mirror of
https://github.com/matrix-org/pinecone.git
synced 2026-01-16 23:00:32 +00:00
Logger interface, more queue metrics
This commit is contained in:
parent
7a39d14c6c
commit
f1264b0e66
5 changed files with 46 additions and 20 deletions
|
|
@ -20,7 +20,6 @@ import (
|
|||
"encoding/binary"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"log"
|
||||
"net"
|
||||
"sync"
|
||||
"time"
|
||||
|
|
@ -36,7 +35,7 @@ const MulticastGroupPort = 60606
|
|||
|
||||
type Multicast struct {
|
||||
r *router.Router
|
||||
log *log.Logger
|
||||
log types.Logger
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
id string
|
||||
|
|
@ -56,7 +55,7 @@ type multicastInterface struct {
|
|||
}
|
||||
|
||||
func NewMulticast(
|
||||
log *log.Logger, r *router.Router,
|
||||
log types.Logger, r *router.Router,
|
||||
) *Multicast {
|
||||
public := r.PublicKey()
|
||||
m := &Multicast{
|
||||
|
|
|
|||
|
|
@ -25,12 +25,14 @@ import (
|
|||
const fairFIFOQueueSize = 16
|
||||
|
||||
type fairFIFOQueue struct {
|
||||
queues map[uint16]chan *types.Frame // queue ID -> frame, map for randomness
|
||||
num uint16 // how many queues should we have?
|
||||
count int // how many queued items in total?
|
||||
n uint16 // which queue did we last iterate on?
|
||||
offset uint64 // adds an element of randomness to queue assignment
|
||||
mutex sync.Mutex
|
||||
queues map[uint16]chan *types.Frame // queue ID -> frame, map for randomness
|
||||
num uint16 // how many queues should we have?
|
||||
count int // how many queued items in total?
|
||||
n uint16 // which queue did we last iterate on?
|
||||
offset uint64 // adds an element of randomness to queue assignment
|
||||
total uint64 // how many packets handled?
|
||||
dropped uint64 // how many packets dropped?
|
||||
mutex sync.Mutex
|
||||
}
|
||||
|
||||
func newFairFIFOQueue(num uint16) *fairFIFOQueue {
|
||||
|
|
@ -87,11 +89,13 @@ func (q *fairFIFOQueue) push(frame *types.Frame) bool {
|
|||
default:
|
||||
// The queue is full - perform a head drop
|
||||
<-q.queues[h]
|
||||
q.dropped++
|
||||
if q.count-1 == 0 {
|
||||
h = 0
|
||||
}
|
||||
q.queues[h] <- frame
|
||||
}
|
||||
q.total++
|
||||
return true
|
||||
}
|
||||
|
||||
|
|
@ -141,13 +145,17 @@ func (q *fairFIFOQueue) MarshalJSON() ([]byte, error) {
|
|||
q.mutex.Lock()
|
||||
defer q.mutex.Unlock()
|
||||
res := struct {
|
||||
Count int `json:"count"`
|
||||
Size int `json:"size"`
|
||||
Queues map[uint16]int `json:"queues"`
|
||||
Count int `json:"count"`
|
||||
Size int `json:"size"`
|
||||
Queues map[uint16]int `json:"queues"`
|
||||
Total uint64 `json:"packets_total"`
|
||||
Dropped uint64 `json:"packets_dropped"`
|
||||
}{
|
||||
Count: q.count,
|
||||
Size: int(q.num) * fairFIFOQueueSize,
|
||||
Queues: map[uint16]int{},
|
||||
Count: q.count,
|
||||
Size: int(q.num) * fairFIFOQueueSize,
|
||||
Queues: map[uint16]int{},
|
||||
Total: q.total,
|
||||
Dropped: q.dropped,
|
||||
}
|
||||
for h, queue := range q.queues {
|
||||
if c := len(queue); c > 0 {
|
||||
|
|
|
|||
|
|
@ -40,7 +40,7 @@ const trafficBuffer = math.MaxUint8 - 1
|
|||
|
||||
type Router struct {
|
||||
phony.Inbox
|
||||
log *log.Logger
|
||||
log types.Logger
|
||||
context context.Context
|
||||
cancel context.CancelFunc
|
||||
public types.PublicKey
|
||||
|
|
@ -53,7 +53,7 @@ type Router struct {
|
|||
_subscribers map[chan<- events.Event]*phony.Inbox
|
||||
}
|
||||
|
||||
func NewRouter(logger *log.Logger, sk ed25519.PrivateKey, debug bool) *Router {
|
||||
func NewRouter(logger types.Logger, sk ed25519.PrivateKey, debug bool) *Router {
|
||||
if logger == nil {
|
||||
logger = log.New(ioutil.Discard, "", 0)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -24,7 +24,6 @@ import (
|
|||
"encoding/hex"
|
||||
"encoding/pem"
|
||||
"fmt"
|
||||
"log"
|
||||
"math/big"
|
||||
"net"
|
||||
"sync"
|
||||
|
|
@ -38,7 +37,7 @@ import (
|
|||
|
||||
type Sessions struct {
|
||||
r *router.Router
|
||||
log *log.Logger // logger
|
||||
log types.Logger // logger
|
||||
context context.Context // router context
|
||||
cancel context.CancelFunc // shut down the router
|
||||
streams chan net.Conn // accepted connections
|
||||
|
|
@ -49,7 +48,7 @@ type Sessions struct {
|
|||
utpSocket *utp.Socket //
|
||||
}
|
||||
|
||||
func NewSessions(log *log.Logger, r *router.Router) *Sessions {
|
||||
func NewSessions(log types.Logger, r *router.Router) *Sessions {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
s, err := utp.NewSocketFromPacketConnNoClose(r)
|
||||
if err != nil {
|
||||
|
|
|
|||
20
types/logger.go
Normal file
20
types/logger.go
Normal file
|
|
@ -0,0 +1,20 @@
|
|||
// Copyright 2022 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package types
|
||||
|
||||
type Logger interface {
|
||||
Println(...interface{})
|
||||
Printf(string, ...interface{})
|
||||
}
|
||||
Loading…
Add table
Reference in a new issue