From f1264b0e66bcbdd0045748381c06a31f3dc227bb Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Mon, 4 Apr 2022 14:10:24 +0100 Subject: [PATCH] Logger interface, more queue metrics --- multicast/multicast.go | 5 ++--- router/queuefairfifo.go | 32 ++++++++++++++++++++------------ router/router.go | 4 ++-- sessions/sessions.go | 5 ++--- types/logger.go | 20 ++++++++++++++++++++ 5 files changed, 46 insertions(+), 20 deletions(-) create mode 100644 types/logger.go diff --git a/multicast/multicast.go b/multicast/multicast.go index 4a9dde7..c4d9116 100644 --- a/multicast/multicast.go +++ b/multicast/multicast.go @@ -20,7 +20,6 @@ import ( "encoding/binary" "encoding/hex" "fmt" - "log" "net" "sync" "time" @@ -36,7 +35,7 @@ const MulticastGroupPort = 60606 type Multicast struct { r *router.Router - log *log.Logger + log types.Logger ctx context.Context cancel context.CancelFunc id string @@ -56,7 +55,7 @@ type multicastInterface struct { } func NewMulticast( - log *log.Logger, r *router.Router, + log types.Logger, r *router.Router, ) *Multicast { public := r.PublicKey() m := &Multicast{ diff --git a/router/queuefairfifo.go b/router/queuefairfifo.go index 8e73b8b..6ca6d34 100644 --- a/router/queuefairfifo.go +++ b/router/queuefairfifo.go @@ -25,12 +25,14 @@ import ( const fairFIFOQueueSize = 16 type fairFIFOQueue struct { - queues map[uint16]chan *types.Frame // queue ID -> frame, map for randomness - num uint16 // how many queues should we have? - count int // how many queued items in total? - n uint16 // which queue did we last iterate on? - offset uint64 // adds an element of randomness to queue assignment - mutex sync.Mutex + queues map[uint16]chan *types.Frame // queue ID -> frame, map for randomness + num uint16 // how many queues should we have? + count int // how many queued items in total? + n uint16 // which queue did we last iterate on? + offset uint64 // adds an element of randomness to queue assignment + total uint64 // how many packets handled? + dropped uint64 // how many packets dropped? + mutex sync.Mutex } func newFairFIFOQueue(num uint16) *fairFIFOQueue { @@ -87,11 +89,13 @@ func (q *fairFIFOQueue) push(frame *types.Frame) bool { default: // The queue is full - perform a head drop <-q.queues[h] + q.dropped++ if q.count-1 == 0 { h = 0 } q.queues[h] <- frame } + q.total++ return true } @@ -141,13 +145,17 @@ func (q *fairFIFOQueue) MarshalJSON() ([]byte, error) { q.mutex.Lock() defer q.mutex.Unlock() res := struct { - Count int `json:"count"` - Size int `json:"size"` - Queues map[uint16]int `json:"queues"` + Count int `json:"count"` + Size int `json:"size"` + Queues map[uint16]int `json:"queues"` + Total uint64 `json:"packets_total"` + Dropped uint64 `json:"packets_dropped"` }{ - Count: q.count, - Size: int(q.num) * fairFIFOQueueSize, - Queues: map[uint16]int{}, + Count: q.count, + Size: int(q.num) * fairFIFOQueueSize, + Queues: map[uint16]int{}, + Total: q.total, + Dropped: q.dropped, } for h, queue := range q.queues { if c := len(queue); c > 0 { diff --git a/router/router.go b/router/router.go index b3f4a87..ff35229 100644 --- a/router/router.go +++ b/router/router.go @@ -40,7 +40,7 @@ const trafficBuffer = math.MaxUint8 - 1 type Router struct { phony.Inbox - log *log.Logger + log types.Logger context context.Context cancel context.CancelFunc public types.PublicKey @@ -53,7 +53,7 @@ type Router struct { _subscribers map[chan<- events.Event]*phony.Inbox } -func NewRouter(logger *log.Logger, sk ed25519.PrivateKey, debug bool) *Router { +func NewRouter(logger types.Logger, sk ed25519.PrivateKey, debug bool) *Router { if logger == nil { logger = log.New(ioutil.Discard, "", 0) } diff --git a/sessions/sessions.go b/sessions/sessions.go index 5e6eae8..bd1c534 100644 --- a/sessions/sessions.go +++ b/sessions/sessions.go @@ -24,7 +24,6 @@ import ( "encoding/hex" "encoding/pem" "fmt" - "log" "math/big" "net" "sync" @@ -38,7 +37,7 @@ import ( type Sessions struct { r *router.Router - log *log.Logger // logger + log types.Logger // logger context context.Context // router context cancel context.CancelFunc // shut down the router streams chan net.Conn // accepted connections @@ -49,7 +48,7 @@ type Sessions struct { utpSocket *utp.Socket // } -func NewSessions(log *log.Logger, r *router.Router) *Sessions { +func NewSessions(log types.Logger, r *router.Router) *Sessions { ctx, cancel := context.WithCancel(context.Background()) s, err := utp.NewSocketFromPacketConnNoClose(r) if err != nil { diff --git a/types/logger.go b/types/logger.go new file mode 100644 index 0000000..d2d1d06 --- /dev/null +++ b/types/logger.go @@ -0,0 +1,20 @@ +// Copyright 2022 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package types + +type Logger interface { + Println(...interface{}) + Printf(string, ...interface{}) +}