Add broadcast & hop limiting functionality (#65)

* Add optional hop limiting to overlay traffic

* Cleanup commented code

* Add optional broadcast functionality for wakeup messages

* Update simulator to better handle changing network connections.

* Update wireshark plugin

* Fix frame tests

* Fix incorrect broadcast signature verification

* Move broadcast consts to consts file

* Only send broadcasts on the best connection to a peer

* Send broadcast immediately to newly added peer

* Fix broadcast frame marshalling

* Move keepalive timeouts to consts file

* Hook broadcasts into the sim logic

* Add broadcast info to sim ui

* Fix sim broadcast timestamps

* Add proper timestamps to broadcast events

* Drain timer when disabling broadcasts

* Update licensing comments for new broadcast files

* Remove unnecessary logging

* Filter broadcasts based on time since last seen broadcast

* Fix traffic forwarding comment

* Only send broadcasts to peer using best connection

* Ensure bootstraps have same root key and sequence

Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com>
This commit is contained in:
devonh 2022-11-17 21:45:03 +00:00 committed by GitHub
parent 37f2e9b9ba
commit 218c39e0cd
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
34 changed files with 805 additions and 114 deletions

View file

@ -103,7 +103,8 @@ func main() {
}
}
sim.CalculateShortestPaths()
sim.GenerateNetworkGraph()
sim.UpdateRealDistances()
if chaos != nil && *chaos > 0 {
rand.Seed(time.Now().UnixNano())
@ -258,6 +259,11 @@ func userProxyReporter(conn *websocket.Conn, connID uint64, sim *simulator.Simul
snakeEntries = append(snakeEntries, simulator.SnakeRouteEntry{EntryID: entry, PeerID: peer})
}
var broadcastEntries []simulator.BroadcastEntry
for entry, timestamp := range node.BroadcastsReceived {
broadcastEntries = append(broadcastEntries, simulator.BroadcastEntry{PeerID: entry, Time: timestamp})
}
var bandwidthReports simulator.BandwidthReports
for _, report := range node.BandwidthReports {
if report.ReceiveTime != 0 {
@ -274,14 +280,15 @@ func userProxyReporter(conn *websocket.Conn, connID uint64, sim *simulator.Simul
AnnTime: node.Announcement.Time,
Coords: node.Coords,
},
Peers: peerConns,
TreeParent: node.Parent,
SnakeAsc: node.AscendingPeer,
SnakeAscPath: node.AscendingPathID,
SnakeDesc: node.DescendingPeer,
SnakeDescPath: node.DescendingPathID,
SnakeEntries: snakeEntries,
BandwidthReports: bandwidthReports,
Peers: peerConns,
TreeParent: node.Parent,
SnakeAsc: node.AscendingPeer,
SnakeAscPath: node.AscendingPathID,
SnakeDesc: node.DescendingPeer,
SnakeDescPath: node.DescendingPathID,
SnakeEntries: snakeEntries,
BroadcastsReceived: broadcastEntries,
BandwidthReports: bandwidthReports,
}
if batchSize == int(maxBatchSize) || end {
@ -380,6 +387,8 @@ func handleSimEvents(log *log.Logger, conn *websocket.Conn, ch <-chan simulator.
eventType = simulator.SimPingStateUpdated
case simulator.NetworkStatsUpdate:
eventType = simulator.SimNetworkStatsUpdated
case simulator.BroadcastReceived:
eventType = simulator.SimBroadcastReceived
case simulator.BandwidthReport:
eventType = simulator.SimBandwidthReport
}

View file

@ -54,6 +54,7 @@
"Keepalive": "0",
"TreeAnnouncement": "0",
"VirtualSnakeBootstrap": "0",
"WakeupBroadcast": "0",
"OverlayTraffic": "0"
}
}
@ -68,6 +69,7 @@
"Keepalive": "0",
"TreeAnnouncement": "0",
"VirtualSnakeBootstrap": "0",
"WakeupBroadcast": "0",
"OverlayTraffic": "0"
}
}

View file

@ -0,0 +1,8 @@
{
"EventSequence": [
{
"Command": "Debug",
"Data": {}
}
]
}

View file

@ -61,6 +61,7 @@
"Keepalive": "0",
"TreeAnnouncement": "0",
"VirtualSnakeBootstrap": "100",
"WakeupBroadcast": "100",
"OverlayTraffic": "100"
}
}
@ -89,6 +90,7 @@
"Keepalive": "0",
"TreeAnnouncement": "0",
"VirtualSnakeBootstrap": "0",
"WakeupBroadcast": "0",
"OverlayTraffic": "0"
}
}

View file

@ -88,6 +88,7 @@ func defaultFrameCount() PeerFrameCount {
frameCount[types.TypeKeepalive] = atomic.NewUint64(0)
frameCount[types.TypeTreeAnnouncement] = atomic.NewUint64(0)
frameCount[types.TypeBootstrap] = atomic.NewUint64(0)
frameCount[types.TypeWakeupBroadcast] = atomic.NewUint64(0)
frameCount[types.TypeTraffic] = atomic.NewUint64(0)
peerFrameCount := PeerFrameCount{

View file

@ -45,6 +45,7 @@ const (
SimSnakeEntryRemoved
SimPingStateUpdated
SimNetworkStatsUpdated
SimBroadcastReceived
SimBandwidthReport
)
@ -71,17 +72,18 @@ const (
)
type InitialNodeState struct {
PublicKey string
NodeType APINodeType
RootState RootState
Peers []PeerInfo
TreeParent string
SnakeAsc string
SnakeAscPath string
SnakeDesc string
SnakeDescPath string
SnakeEntries []SnakeRouteEntry
BandwidthReports []BandwidthSnapshot
PublicKey string
NodeType APINodeType
RootState RootState
Peers []PeerInfo
TreeParent string
SnakeAsc string
SnakeAscPath string
SnakeDesc string
SnakeDescPath string
SnakeEntries []SnakeRouteEntry
BroadcastsReceived []BroadcastEntry
BandwidthReports []BandwidthSnapshot
}
type RootState struct {
@ -101,6 +103,11 @@ type SnakeRouteEntry struct {
PeerID string
}
type BroadcastEntry struct {
PeerID string
Time uint64
}
type SimEventMsg struct {
UpdateID APIUpdateID
Event SimEvent

View file

@ -131,6 +131,12 @@ func UnmarshalCommandJSON(command *SimCommandMsg) (SimCommand, error) {
} else {
err = fmt.Errorf("%sConfigureAdversaryDefaults.DropRates.VirtualSnakeBootstrap field doesn't exist", FAILURE_PREAMBLE)
}
if subVal, subOk := val.(map[string]interface{})["WakeupBroadcast"]; subOk {
intVal, _ := strconv.Atoi(subVal.(string))
dropRates.Frames[types.TypeWakeupBroadcast] = uint64(intVal)
} else {
err = fmt.Errorf("%sConfigureAdversaryDefaults.DropRates.WakeupBroadcast field doesn't exist", FAILURE_PREAMBLE)
}
if subVal, subOk := val.(map[string]interface{})["OverlayTraffic"]; subOk {
intVal, _ := strconv.Atoi(subVal.(string))
dropRates.Frames[types.TypeTraffic] = uint64(intVal)
@ -181,6 +187,12 @@ func UnmarshalCommandJSON(command *SimCommandMsg) (SimCommand, error) {
} else {
err = fmt.Errorf("%sConfigureAdversaryPeer.DropRates.VirtualSnakeBootstrap field doesn't exist", FAILURE_PREAMBLE)
}
if subVal, subOk := val.(map[string]interface{})["WakeupBroadcast"]; subOk {
intVal, _ := strconv.Atoi(subVal.(string))
dropRates.Frames[types.TypeWakeupBroadcast] = uint64(intVal)
} else {
err = fmt.Errorf("%sConfigureAdversaryPeer.DropRates.WakeupBroadcast field doesn't exist", FAILURE_PREAMBLE)
}
if subVal, subOk := val.(map[string]interface{})["OverlayTraffic"]; subOk {
intVal, _ := strconv.Atoi(subVal.(string))
dropRates.Frames[types.TypeTraffic] = uint64(intVal)
@ -301,6 +313,8 @@ func (c AddPeer) Run(log *log.Logger, sim *Simulator) {
if err := sim.ConnectNodes(c.Node, c.Peer); err != nil {
log.Printf("Failed connecting node %s to node %s: %s", c.Node, c.Peer, err)
}
sim.GenerateNetworkGraph()
sim.UpdateRealDistances()
}
func (c AddPeer) String() string {
@ -318,6 +332,8 @@ func (c RemovePeer) Run(log *log.Logger, sim *Simulator) {
if err := sim.DisconnectNodes(c.Node, c.Peer); err != nil {
log.Printf("Failed disconnecting node %s and node %s: %s", c.Node, c.Peer, err)
}
sim.GenerateNetworkGraph()
sim.UpdateRealDistances()
}
func (c RemovePeer) String() string {

View file

@ -127,6 +127,15 @@ type SnakeEntryRemoved struct {
// Tag SnakeEntryRemoved as an Event
func (e SnakeEntryRemoved) isEvent() {}
type BroadcastReceived struct {
Node string
PeerID string
Time uint64
}
// Tag BroadcastReceived as an Event
func (e BroadcastReceived) isEvent() {}
type BandwidthReport struct {
Node string
Bandwidth BandwidthSnapshot
@ -161,6 +170,8 @@ func (h eventHandler) Run(quit <-chan bool, sim *Simulator) {
sim.handleSnakeEntryAdded(h.node, e.EntryID, e.PeerID)
case events.SnakeEntryRemoved:
sim.handleSnakeEntryRemoved(h.node, e.EntryID)
case events.BroadcastReceived:
sim.handleBroadcastReceived(h.node, e.PeerID, e.Time)
case events.BandwidthReport:
sim.handleBandwidthReport(h.node, e.CaptureTime, e.Peers)
default:

View file

@ -14,22 +14,56 @@
package simulator
import (
"math"
)
func (sim *Simulator) ReportDistance(a, b string, l int64) {
sim.distsMutex.Lock()
defer sim.distsMutex.Unlock()
if _, ok := sim.dists[a]; !ok {
sim.dists[a] = map[string]*Distance{}
}
if _, ok := sim.dists[a][b]; !ok {
sim.dists[a][b] = &Distance{}
}
sim.dists[a][b].Observed = l
if sim.dists[a][b].Real == 0 {
na, _ := sim.graph.GetMapping(a)
nb, _ := sim.graph.GetMapping(b)
path, err := sim.graph.Shortest(na, nb)
if err == nil {
sim.dists[a][b].Real = path.Distance
}
func (sim *Simulator) UpdateRealDistances() {
sim.distsMutex.Lock()
defer sim.distsMutex.Unlock()
for from := range sim.nodes {
for to := range sim.nodes {
if _, ok := sim.dists[from]; !ok {
sim.dists[from] = map[string]*Distance{}
}
if _, ok := sim.dists[from][to]; !ok {
sim.dists[from][to] = &Distance{}
}
a, _ := sim.graph.GetMapping(from)
b, _ := sim.graph.GetMapping(to)
if a != -1 && b != -1 {
path, err := sim.graph.Shortest(a, b)
if err == nil {
sim.dists[from][to].Real = path.Distance
} else {
sim.dists[from][to].Real = math.MaxInt64
}
}
}
}
sim.State.Act(nil, func() {
sim.distsMutex.Lock()
defer sim.distsMutex.Unlock()
sim.State._updateExpectedBroadcasts(sim.dists)
})
}

View file

@ -96,8 +96,6 @@ func (sim *Simulator) ConnectNodes(a, b string) error {
register(pa)
}
sim.CalculateShortestPaths()
sim.log.Printf("Connected node %q to node %q\n", a, b)
return nil
}
@ -119,13 +117,13 @@ func (sim *Simulator) DisconnectNodes(a, b string) error {
sim.wires[firstIndex][secondIndex] = nil
sim.wiresMutex.Unlock()
sim.CalculateShortestPaths()
return wire.Close()
}
func (sim *Simulator) DisconnectAllPeers(disconnectNode string) {
sim.wiresMutex.Lock()
defer sim.wiresMutex.Unlock()
nodeWires := sim.wires[disconnectNode]
for i, conn := range nodeWires {
if conn != nil {
@ -144,7 +142,4 @@ func (sim *Simulator) DisconnectAllPeers(disconnectNode string) {
}
}
}
sim.wiresMutex.Unlock()
sim.CalculateShortestPaths()
}

View file

@ -120,8 +120,6 @@ func (sim *Simulator) CreateNode(t string, nodeType APINodeType) error {
sim.log.Printf("Created node %q\n", t)
}
sim.CalculateShortestPaths()
return nil
}
@ -157,8 +155,6 @@ func (sim *Simulator) RemoveNode(node string) {
sim.nodesMutex.Unlock()
phony.Block(sim.State, func() { sim.State._removeNode(node) })
sim.CalculateShortestPaths()
}
func (sim *Simulator) ConfigureFilterDefaults(node string, rates adversary.DropRates) {
@ -185,6 +181,8 @@ func createDefaultRouter(log *log.Logger, sk ed25519.PrivateKey, quit <-chan boo
}
rtr.rtr.InjectPacketFilter(rtr.PingFilter)
rtr.EnableHopLimiting()
rtr.EnableWakeupBroadcasts()
go rtr.OverlayReadHandler(quit)
return rtr

View file

@ -15,17 +15,10 @@
package simulator
import (
"sync"
"github.com/RyanCarrier/dijkstra"
)
var calculateShortest sync.Mutex
func (sim *Simulator) CalculateShortestPaths() {
calculateShortest.Lock()
defer calculateShortest.Unlock()
func (sim *Simulator) GenerateNetworkGraph() {
sim.log.Println("Building graph")
sim.graph = dijkstra.NewGraph()
sim.maps = make(map[string]int)

View file

@ -60,6 +60,18 @@ func (r *DefaultRouter) Coords() types.Coordinates {
return r.rtr.Coords()
}
func (r *DefaultRouter) EnableHopLimiting() {
r.rtr.EnableHopLimiting()
}
func (r *DefaultRouter) EnableWakeupBroadcasts() {
r.rtr.EnableWakeupBroadcasts()
}
func (r *DefaultRouter) DisableWakeupBroadcasts() {
r.rtr.DisableWakeupBroadcasts()
}
func (r *DefaultRouter) ConfigureFilterDefaults(rates adversary.DropRates) {}
func (r *DefaultRouter) ConfigureFilterPeer(peer types.PublicKey, rates adversary.DropRates) {}

View file

@ -26,6 +26,7 @@ import (
"github.com/Arceliar/phony"
"github.com/RyanCarrier/dijkstra"
"github.com/matrix-org/pinecone/router/events"
"github.com/matrix-org/pinecone/types"
"go.uber.org/atomic"
)
@ -135,11 +136,14 @@ func (sim *Simulator) StartPinging(ping_period time.Duration) {
sim.log.Println("Starting pings...")
tasks := make(chan pair, 2*(len(sim.nodes)*len(sim.nodes)))
for from, fromNode := range sim.nodes {
if fromNode.Type == DefaultNode {
for to, toNode := range sim.nodes {
if toNode.Type == DefaultNode {
tasks <- pair{from, to}
if sim.dists[from][to].Real <= int64(types.NetworkHorizonDistance) {
tasks <- pair{from, to}
}
}
}
}
@ -342,6 +346,12 @@ func (sim *Simulator) handleSnakeEntryRemoved(node string, entryID string) {
sim.State.Act(nil, func() { sim.State._removeSnakeEntry(node, entryName) })
}
func (sim *Simulator) handleBroadcastReceived(node string, peerID string, timestamp uint64) {
if peerNode, err := sim.State.GetNodeName(peerID); err == nil {
sim.State.Act(nil, func() { sim.State._updateBroadcastCache(node, peerNode, timestamp) })
}
}
func (sim *Simulator) handleBandwidthReport(node string, captureTime uint64, peers map[string]events.PeerBandwidthUsage) {
peerBandwidth := make(map[string]PeerBandwidthUsage)
for peer, report := range peers {

View file

@ -19,6 +19,7 @@ import (
"reflect"
"github.com/Arceliar/phony"
"github.com/matrix-org/pinecone/types"
)
type RootAnnouncement struct {
@ -45,39 +46,45 @@ type BandwidthSnapshot struct {
type BandwidthReports []BandwidthSnapshot
type ExpectedBroadcasts map[string]bool
type NodeState struct {
PeerID string
NodeType APINodeType
Connections map[int]string
Parent string
Coords []uint64
Announcement RootAnnouncement
AscendingPeer string
AscendingPathID string
DescendingPeer string
DescendingPathID string
SnakeEntries map[string]string
BandwidthReports BandwidthReports
NextReportIndex uint
PeerID string
NodeType APINodeType
Connections map[int]string
Parent string
Coords []uint64
Announcement RootAnnouncement
AscendingPeer string
AscendingPathID string
DescendingPeer string
DescendingPathID string
SnakeEntries map[string]string
BroadcastsReceived map[string]uint64
BandwidthReports BandwidthReports
NextReportIndex uint
ExpectedBroadcasts ExpectedBroadcasts
}
const MaxBandwidthReports = 10
func NewNodeState(peerID string, nodeType APINodeType) *NodeState {
node := &NodeState{
PeerID: peerID,
NodeType: nodeType,
Connections: make(map[int]string),
Parent: "",
Announcement: RootAnnouncement{},
Coords: []uint64{},
AscendingPeer: "",
AscendingPathID: "",
DescendingPeer: "",
DescendingPathID: "",
SnakeEntries: make(map[string]string),
BandwidthReports: make(BandwidthReports, MaxBandwidthReports),
NextReportIndex: 0,
PeerID: peerID,
NodeType: nodeType,
Connections: make(map[int]string),
Parent: "",
Announcement: RootAnnouncement{},
Coords: []uint64{},
AscendingPeer: "",
AscendingPathID: "",
DescendingPeer: "",
DescendingPathID: "",
SnakeEntries: make(map[string]string),
BroadcastsReceived: make(map[string]uint64),
BandwidthReports: make(BandwidthReports, MaxBandwidthReports),
NextReportIndex: 0,
ExpectedBroadcasts: make(ExpectedBroadcasts),
}
return node
}
@ -209,6 +216,7 @@ func (s *StateAccessor) _addPeerConnection(from string, to string, port int) {
if _, ok := s._state.Nodes[from]; ok {
s._state.Nodes[from].Connections[port] = to
}
s._publish(PeerAdded{Node: from, Peer: to, Port: uint64(port)})
}
@ -216,9 +224,28 @@ func (s *StateAccessor) _removePeerConnection(from string, to string, port int)
if _, ok := s._state.Nodes[from]; ok {
delete(s._state.Nodes[from].Connections, port)
}
s._publish(PeerRemoved{Node: from, Peer: to})
}
func (s *StateAccessor) _updateExpectedBroadcasts(dists map[string]map[string]*Distance) {
for node := range s._state.Nodes {
if _, ok := s._state.Nodes[node]; ok {
s._state.Nodes[node].ExpectedBroadcasts = make(ExpectedBroadcasts)
if _, ok := dists[node]; ok {
for id := range dists[node] {
if id == node {
continue
}
if dists[node][id].Real <= types.NetworkHorizonDistance {
s._state.Nodes[node].ExpectedBroadcasts[id] = false
}
}
}
}
}
}
func (s *StateAccessor) _updateParent(node string, peerID string) {
if _, ok := s._state.Nodes[node]; ok {
prev := s._state.Nodes[node].Parent
@ -268,6 +295,17 @@ func (s *StateAccessor) _removeSnakeEntry(node string, entryID string) {
s._publish(SnakeEntryRemoved{Node: node, EntryID: entryID})
}
func (s *StateAccessor) _updateBroadcastCache(node string, peerID string, timestamp uint64) {
if _, ok := s._state.Nodes[node]; ok {
s._state.Nodes[node].BroadcastsReceived[peerID] = timestamp
if _, ok := s._state.Nodes[node].ExpectedBroadcasts[peerID]; ok {
s._state.Nodes[node].ExpectedBroadcasts[peerID] = true
}
}
s._publish(BroadcastReceived{Node: node, PeerID: peerID, Time: timestamp})
}
func (s *StateAccessor) _updatePeerBandwidthUsage(node string, captureTime uint64, peers map[string]PeerBandwidthUsage) {
bandwidthSnapshot := BandwidthSnapshot{
ReceiveTime: captureTime,

View file

@ -34,6 +34,12 @@ function handleSimMessage(msg) {
}
}
if (value.BroadcastsReceived) {
for (let i = 0; i < value.BroadcastsReceived.length; i++) {
graph.addBroadcast(key, value.BroadcastsReceived[i].PeerID, value.BroadcastsReceived[i].Time);
}
}
if (value.BandwidthReports) {
for (let i = 0; i < value.BandwidthReports.length; i++) {
graph.addBandwidthReport(key, value.BandwidthReports[i]);
@ -89,6 +95,9 @@ function handleSimMessage(msg) {
case APIUpdateID.NetworkStatsUpdated:
graph.updateNetworkStats(event.PathConvergence, event.AverageStretch);
break;
case APIUpdateID.BroadcastReceived:
graph.addBroadcast(event.Node, event.PeerID, event.Time);
break;
case APIUpdateID.BandwidthReport:
graph.addBandwidthReport(event.Node, event.Bandwidth);
break;

View file

@ -773,6 +773,14 @@ class Graph {
handleStatsPanelUpdate();
}
addBroadcast(id, peer, time) {
if (Nodes.has(id)) {
let node = Nodes.get(id);
node.broadcasts.set(peer, time);
this.updateUI(id);
}
}
addBandwidthReport(id, report) {
if (Nodes.has(id)) {
let node = Nodes.get(id);
@ -807,6 +815,7 @@ function newNode(key, type) {
snekDescPath: "",
snekEntries: new Map(),
nextReportIndex: 0,
broadcasts: new Map(),
bandwidthReports: new Array(10).fill({
ReceiveTime: 0,
Peers: new Map(),
@ -837,7 +846,7 @@ function handleNodeHoverUpdate() {
let hoverPanel = document.getElementById('nodePopupText');
if (hoverPanel) {
let time = node.announcement.time.toString();
let date = new Date(node.announcement.time / 1000000) // ns to ms conversion
hoverPanel.innerHTML = "<u><b>Node " + hoverNode + "</b></u>" +
"<br>Key: " + node.key.slice(0, 16).replace(/\"/g, "").toUpperCase() +
"<br>Type: " + ConvertNodeTypeToString(node.nodeType) +
@ -849,7 +858,7 @@ function handleNodeHoverUpdate() {
"<br><br><u>Announcement</u>" +
"<br>Root: Node " + node.announcement.root +
"<br>Sequence: " + node.announcement.sequence +
"<br>Time: " + time.slice(0, time.length - 3) + " ms";
"<br>Time: " + date.toLocaleTimeString();
}
}
@ -903,6 +912,12 @@ function handleNodePanelUpdate() {
snekTable += "<tr><td><code>" + entry + "</code></td><td><code>" + peer + "</code></td></tr>";
}
let broadcasts = node.broadcasts;
let bcastTable = "";
for (var [entry, time] of broadcasts.entries()) {
let date = new Date(time / 1000000) // ns to ms conversion
bcastTable += "<tr><td><code>" + entry + "</code></td><td><code>" + date.toLocaleString() + "</code></td></tr>";
}
if (nodePanel) {
nodePanel.innerHTML +=
@ -926,8 +941,13 @@ function handleNodePanelUpdate() {
"</table>" +
"<hr><h4><u>SNEK Routes (" + routes.size + ")</u></h4>" +
"<table>" +
"<tr><th>Name</th><th>Peer</th></tr>" +
"<tr><th>Dest</th><th>Peer</th></tr>" +
snekTable +
"</table>" +
"<hr><h4><u>Broadcasts Received</u></h4>" +
"<table>" +
"<tr><th>Name</th><th>Time</th></tr>" +
bcastTable +
"</table><hr><br>";
}
}

View file

@ -23,7 +23,8 @@ export const APIUpdateID = {
SnakeEntryRemoved: 10,
PingStateUpdated: 11,
NetworkStatsUpdated: 12,
BandwidthReport: 13,
BroadcastReceived: 13,
BandwidthReport: 14,
};
export const APICommandID = {

View file

@ -332,7 +332,7 @@ function validateEventSequence(content) {
validSimCommands.set("StopPings", []);
let validSubcommands = new Map();
validSubcommands.set("DropRates", ["Overall", "Keepalive", "TreeAnnouncement", "VirtualSnakeBootstrap", "OverlayTraffic"]);
validSubcommands.set("DropRates", ["Overall", "Keepalive", "TreeAnnouncement", "VirtualSnakeBootstrap", "WakeupBroadcast", "OverlayTraffic"]);
for (let i = 0; i < content.EventSequence.length; i++) {
let sequence = content.EventSequence;

View file

@ -72,6 +72,7 @@ export function createNodeOptionsGeneralAdversary() {
"Keepalive": 0,
"TreeAnnouncement": 0,
"VirtualSnakeBootstrap": 0,
"WakeupBroadcast": 0,
"OverlayTraffic": 0,
},
"BlockTreeProtoTraffic": {
@ -79,6 +80,7 @@ export function createNodeOptionsGeneralAdversary() {
"Keepalive": 0,
"TreeAnnouncement": 100,
"VirtualSnakeBootstrap": 0,
"WakeupBroadcast": 0,
"OverlayTraffic": 0,
},
"BlockSNEKProtoTraffic": {
@ -86,12 +88,14 @@ export function createNodeOptionsGeneralAdversary() {
"Keepalive": 0,
"TreeAnnouncement": 0,
"VirtualSnakeBootstrap": 100,
"WakeupBroadcast": 0,
"OverlayTraffic": 0,
},
"BlockOverlayTraffic": {
"Keepalive": 0,
"TreeAnnouncement": 0,
"VirtualSnakeBootstrap": 0,
"WakeupBroadcast": 0,
"OverlayTraffic": 100,
},
};
@ -135,6 +139,9 @@ export function createNodeOptionsGeneralAdversary() {
let snek1 = generateSliderRow("SNEK Bootstrap", "VirtualSnakeBootstrap");
nodeOptions.appendChild(snek1);
let broadcast = generateSliderRow("Wakeup Broadcast", "WakeupBroadcast");
nodeOptions.appendChild(broadcast);
let traffic = generateSliderRow("Overlay Traffic", "OverlayTraffic");
nodeOptions.appendChild(traffic);

View file

@ -19,13 +19,15 @@ local frame_types = {
[0] = "Keepalive",
[1] = "Tree Announcement",
[2] = "Bootstrap",
[3] = "Traffic"
[3] = "Broadcast"
[4] = "Traffic",
}
header_size = 10
f_version_idx = 4
f_type_idx = 5
f_extra_idx = 6
f_hop_limit_idx = 7
f_len_idx = 8
f_payload_idx = header_size
@ -34,6 +36,7 @@ frame_version = ProtoField.uint8("pinecone.version", "Version", base.DEC,
frame_versions)
frame_type = ProtoField.uint8("pinecone.type", "Type", base.DEC, frame_types)
extra_bytes = ProtoField.bytes("pinecone.extra", "Extra Bytes")
hop_limit = ProtoField.bytes("pinecone.hoplimit", "Hop Limit")
frame_len = ProtoField.uint16("pinecone.len", "Frame Length")
destination_len = ProtoField.uint16("pinecone.dstlen", "Destination Length")
@ -61,17 +64,19 @@ sigsig = ProtoField.bytes("pinecone.sigsig", "Signature")
bootstrap_seq = ProtoField.uint32("pinecone.bootstrapseq",
"Bootstrap sequence number")
broadcast_seq = ProtoField.uint32("pinecone.broadcastseq",
"Broadcast sequence number")
watermark_key = ProtoField.bytes("pinecone.wmarkkey", "Watermark public key")
watermark_seq = ProtoField.uint32("pinecone.wmarkseq",
"Watermark sequence number")
pinecone_protocol.fields = {
magic_bytes, frame_version, frame_type, extra_bytes, frame_len,
magic_bytes, frame_version, frame_type, extra_bytes, hop_limit, frame_len,
destination_len, source_len, payload_len, destination, source,
destination_key, source_key, destination_sig, source_sig, payload, rootkey,
rootseq, sigkey, sigport, sigsig, roottgt, bootstrap_seq, watermark_key,
watermark_seq, hop_count, ping_type
watermark_seq, broadcast_seq, hop_count, ping_type
}
function short_pk(key)
@ -113,7 +118,8 @@ local function do_pinecone_dissect(buffer, pinfo, tree)
local subtree = tree:add(pinecone_protocol, buffer(), "Pinecone Protocol")
subtree:add_le(frame_version, buffer(f_version_idx, 1))
subtree:add_le(frame_type, buffer(f_type_idx, 1))
subtree:add_le(extra_bytes, buffer(f_extra_idx, 2))
subtree:add_le(extra_bytes, buffer(f_extra_idx, 1))
subtree:add_le(hop_limit, buffer(f_hop_limit_idx, 1))
subtree:add_le(frame_len, buffer(f_len_idx, 2), buffer(f_len_idx, 2):uint())
local ftype = buffer(5, 1):uint()
@ -188,6 +194,28 @@ local function do_pinecone_dissect(buffer, pinfo, tree)
pinfo.cols.info:set(frame_types[2])
pinfo.cols.info:append(" " .. short_pk(dstkey:bytes():raw()) .. "")
elseif ftype == 3 then
-- Broadcast
local plen = buffer(f_payload_idx, 2):uint()
local srckey = buffer(f_payload_idx + 2, 32)
local pload = buffer(f_payload_idx + 2 + 32, plen)
subtree:add(payload_len, buffer(f_payload_idx, 2), plen)
subtree:add(source_key, srckey)
local psubtree = subtree:add(subtree, pload, "Payload")
psubtree:set_text("Payload")
local seq, offset = varu64(pload(0):bytes())
psubtree:add(broadcast_seq, pload(0, offset), seq)
psubtree:add(rootkey, pload(offset, 32))
local root_seq, root_offset = varu64(pload(offset + 32):bytes())
psubtree:add(rootseq, pload(offset + 32, root_offset), root_seq)
psubtree:add(sigsig, pload(offset + 32 + root_offset, 64))
-- Info column
pinfo.cols.info:set(frame_types[5])
pinfo.cols.info:append(" " .. short_pk(srckey:bytes():raw()) .. "")
else
-- Traffic
local plen = buffer(f_payload_idx, 2):uint()

View file

@ -66,3 +66,11 @@ func (r *Router) Peers() []PeerInfo {
})
return infos
}
func (r *Router) EnableHopLimiting() {
r._hopLimiting.Store(true)
}
func (r *Router) DisableHopLimiting() {
r._hopLimiting.Store(false)
}

View file

@ -22,6 +22,16 @@ import (
const portCount = math.MaxUint8 - 1
const trafficBuffer = math.MaxUint8 - 1
// peerKeepaliveInterval is the frequency at which this
// node will send keepalive packets to other peers if no
// other packets have been sent within the peerKeepaliveInterval.
const peerKeepaliveInterval = time.Second * 3
// peerKeepaliveTimeout is the amount of time that must
// pass without receiving any packet before we
// will assume that the peer is dead.
const peerKeepaliveTimeout = time.Second * 5
// announcementInterval is the frequency at which this
// node will send root announcements to other peers.
const announcementInterval = time.Minute * 30
@ -50,3 +60,17 @@ const coordsCacheLifetime = time.Minute
// coordsCacheMaintainInterval is how often we will clean
// out stale entries from the coords cache.
const coordsCacheMaintainInterval = time.Minute
// wakeupBroadcastInterval is how often we will aim
// to send broadcast messages into the network.
const wakeupBroadcastInterval = time.Minute
// broadcastExpiryPeriod is how long we'll wait to
// expire a seen broadcast.
const broadcastExpiryPeriod = wakeupBroadcastInterval * 3
// broadcastFilterTime is how much time must pass
// before we'll accept a new broadcast from this node.
// This helps to prevent broadcasts from flooding the
// network.
const broadcastFilterTime = wakeupBroadcastInterval / 2

View file

@ -77,6 +77,14 @@ type SnakeEntryRemoved struct {
// Tag SnakeEntryRemoved as an Event
func (e SnakeEntryRemoved) isEvent() {}
type BroadcastReceived struct {
PeerID string
Time uint64
}
// Tag BroadcastReceived as an Event
func (e BroadcastReceived) isEvent() {}
type PeerBandwidthUsage struct {
Protocol struct {
Rx uint64

View file

@ -89,6 +89,7 @@ func (r *Router) WriteTo(p []byte, addr net.Addr) (n int, err error) {
switch ga := addr.(type) {
case types.PublicKey:
frame := getFrame()
frame.HopLimit = types.MaxHopLimit
frame.Type = types.TypeTraffic
frame.DestinationKey = ga
phony.Block(r.state, func() {

View file

@ -33,9 +33,6 @@ import (
// NOTE: Functions prefixed with an underscore (_) are only safe to be called
// from the actor that owns them, in order to prevent data races.
const peerKeepaliveInterval = time.Second * 3
const peerKeepaliveTimeout = time.Second * 5
// Lower numbers for these consts are typically faster connections.
const ( // These need to be a simple int type for gobind/gomobile to export them...
PeerTypePipe int = iota
@ -281,6 +278,7 @@ func (p *peer) _write() {
p.statistics._bytesTxProto += uint64(n)
})
}
wn, err := p.conn.Write(buf[:n])
if err != nil {
p.stop(fmt.Errorf("p.conn.Write: %w", err))
@ -337,7 +335,7 @@ func (p *peer) _read() {
{
n, err := io.ReadFull(p.conn, b[:types.FrameHeaderLength])
if err != nil {
p.stop(fmt.Errorf("io.ReadFull: %w", err))
p.stop(fmt.Errorf("io.ReadFull Initial: %w", err))
return
}
isProtoTraffic = !types.FrameType(b[5]).IsTraffic()
@ -367,7 +365,7 @@ func (p *peer) _read() {
expecting := int(binary.BigEndian.Uint16(b[types.FrameHeaderLength-2 : types.FrameHeaderLength]))
n, err := io.ReadFull(p.conn, b[types.FrameHeaderLength:expecting])
if err != nil {
p.stop(fmt.Errorf("io.ReadFull: %w", err))
p.stop(fmt.Errorf("io.ReadFull Remaining: %w", err))
return
}
@ -423,3 +421,20 @@ func (p *peer) _read() {
// the actor inbox.
p.reader.Act(nil, p._read)
}
func (p *peer) _coords() (types.Coordinates, error) {
var err error
var coords types.Coordinates
if p == p.router.local {
coords = p.router.state._coords()
} else {
if announcement, ok := p.router.state._announcements[p]; ok {
coords = announcement.PeerCoords()
} else {
err = fmt.Errorf("no root announcement found for peer")
}
}
return coords, err
}

View file

@ -45,6 +45,7 @@ type Router struct {
local *peer
state *state
secure bool
_hopLimiting *atomic.Bool
_readDeadline *atomic.Time
_subscribers map[chan<- events.Event]*phony.Inbox
}
@ -67,6 +68,7 @@ func NewRouter(logger types.Logger, sk ed25519.PrivateKey, opts ...RouterOption)
context: ctx,
cancel: cancel,
secure: !insecure,
_hopLimiting: atomic.NewBool(false),
_readDeadline: atomic.NewTime(time.Now().Add(time.Hour * 24 * 365 * 100)), // ~100 years
_subscribers: make(map[chan<- events.Event]*phony.Inbox),
}
@ -96,6 +98,20 @@ func (r *Router) InjectPacketFilter(fn FilterFn) {
})
}
func (r *Router) EnableWakeupBroadcasts() {
r.state.Act(r.state, func() {
r.state._sendBroadcastIn(0)
})
}
func (r *Router) DisableWakeupBroadcasts() {
r.state.Act(r.state, func() {
if !r.state._broadcastTimer.Stop() {
<-r.state._broadcastTimer.C
}
})
}
// _publish notifies each subscriber of a new event.
func (r *Router) _publish(event events.Event) {
for ch, inbox := range r._subscribers {

View file

@ -38,18 +38,20 @@ const BWReportingInterval = time.Minute
type state struct {
phony.Inbox
r *Router
_peers []*peer // All switch ports, connected and disconnected
_descending *virtualSnakeEntry // Next descending node in keyspace
_parent *peer // Our chosen parent in the tree
_announcements announcementTable // Announcements received from our peers
_table virtualSnakeTable // Virtual snake DHT entries
_ordering uint64 // Used to order incoming tree announcements
_sequence uint64 // Used to sequence our root tree announcements
_treetimer *time.Timer // Tree maintenance timer
_snaketimer *time.Timer // Virtual snake maintenance timer
_lastbootstrap time.Time // When did we last bootstrap?
_waiting bool // Is the tree waiting to reparent?
_filterPacket FilterFn // Function called when forwarding packets
_peers []*peer // All switch ports, connected and disconnected
_descending *virtualSnakeEntry // Next descending node in keyspace
_parent *peer // Our chosen parent in the tree
_announcements announcementTable // Announcements received from our peers
_table virtualSnakeTable // Virtual snake DHT entries
_ordering uint64 // Used to order incoming tree announcements
_sequence uint64 // Used to sequence our root tree announcements
_treetimer *time.Timer // Tree maintenance timer
_snaketimer *time.Timer // Virtual snake maintenance timer
_broadcastTimer *time.Timer // Wakeup Broadcast maintenance timer
_seenBroadcasts map[types.PublicKey]broadcastEntry // Cache of previously seen wakeup broadcasts
_lastbootstrap time.Time // When did we last bootstrap?
_waiting bool // Is the tree waiting to reparent?
_filterPacket FilterFn // Function called when forwarding packets
_bandwidthTimer *time.Timer
_coordsCache coordsCacheTable
}
@ -72,6 +74,7 @@ func (s *state) _start() {
s._announcements = make(announcementTable, portCount)
s._table = virtualSnakeTable{}
s._coordsCache = coordsCacheTable{}
s._seenBroadcasts = make(map[types.PublicKey]broadcastEntry)
if s._treetimer == nil {
s._treetimer = time.AfterFunc(announcementInterval, func() {
@ -85,6 +88,12 @@ func (s *state) _start() {
})
}
if s._broadcastTimer == nil {
s._broadcastTimer = time.AfterFunc(wakeupBroadcastInterval, func() {
s.Act(nil, s._maintainBroadcasts)
})
}
if s._bandwidthTimer == nil {
s._bandwidthTimer = time.AfterFunc(time.Until(
time.Now().Round(time.Minute).Add(BWReportingInterval)),
@ -136,6 +145,18 @@ func (s *state) _cleanCachedCoords() {
})
}
// _sendBroadcastIn resets the wakeup broadcast maintenance timer to the
// specified duration.
func (s *state) _sendBroadcastIn(d time.Duration) {
if !s._broadcastTimer.Stop() {
select {
case <-s._broadcastTimer.C:
default:
}
}
s._broadcastTimer.Reset(d)
}
// _reportBandwidthIn resets the bandwidth reporting timer to the
// specified duration.
func (s *state) _reportBandwidthIn(d time.Duration) {
@ -226,6 +247,7 @@ func (s *state) _addPeer(conn net.Conn, public types.PublicKey, uri ConnectionUR
s.r.log.Println("Connected to peer", new.public.String(), "on port", new.port)
v, _ := s.r.active.LoadOrStore(hex.EncodeToString(new.public[:])+string(zone), atomic.NewUint64(0))
v.(*atomic.Uint64).Inc()
new.proto.push(s.r.state._rootAnnouncement().forPeer(new))
new.started.Store(true)
new.reader.Act(nil, new._read)

169
router/state_broadcast.go Normal file
View file

@ -0,0 +1,169 @@
// Copyright 2022 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package router
import (
"crypto/ed25519"
"fmt"
"time"
"github.com/matrix-org/pinecone/router/events"
"github.com/matrix-org/pinecone/types"
)
type broadcastEntry struct {
Sequence types.Varu64
LastSeen time.Time
}
// valid returns true if the broadcast hasn't expired, or false if it has. It is
// required for broadcasts to time out eventually, in the case that nodes leave
// the network and return later.
func (e *broadcastEntry) valid() bool {
return time.Since(e.LastSeen) < broadcastExpiryPeriod
}
// NOTE: Functions prefixed with an underscore (_) are only safe to be called
// from the actor that owns them, in order to prevent data races.
// _maintainBroadcasts sends out wakeup broadcasts to let local nodes know
// of our presence in the network.
func (s *state) _maintainBroadcasts() {
select {
case <-s.r.context.Done():
return
default:
defer s._sendBroadcastIn(wakeupBroadcastInterval)
}
// Clean up any broadcasts that are older than the expiry period.
for k, v := range s._seenBroadcasts {
if !v.valid() {
delete(s._seenBroadcasts, k)
}
}
s._sendWakeupBroadcasts()
}
func (s *state) _createBroadcastFrame() (*types.Frame, error) {
// Construct the broadcast packet.
b := frameBufferPool.Get().(*[types.MaxFrameSize]byte)
defer frameBufferPool.Put(b)
broadcast := types.WakeupBroadcast{
Sequence: types.Varu64(time.Now().UnixMilli()),
Root: s._rootAnnouncement().Root,
}
if s.r.secure {
protected, err := broadcast.ProtectedPayload()
if err != nil {
return nil, err
}
copy(
broadcast.Signature[:],
ed25519.Sign(s.r.private[:], protected),
)
}
n, err := broadcast.MarshalBinary(b[:])
if err != nil {
return nil, err
}
// Construct the frame.
send := getFrame()
send.Type = types.TypeWakeupBroadcast
send.SourceKey = s.r.public
send.HopLimit = types.NetworkHorizonDistance
send.Payload = append(send.Payload[:0], b[:n]...)
return send, nil
}
func (s *state) _sendWakeupBroadcasts() {
broadcast, err := s._createBroadcastFrame()
if err != nil {
s.r.log.Println("Failed creating broadcast frame:", err)
}
s._flood(s.r.local, broadcast, ClassicFlood)
}
func (s *state) _handleBroadcast(p *peer, f *types.Frame) error {
// Unmarshall the broadcast
var broadcast types.WakeupBroadcast
if _, err := broadcast.UnmarshalBinary(f.Payload); err != nil {
return fmt.Errorf("broadcast unmarshal failed: %w", err)
}
if s.r.secure {
// Check that the broadcast message was protected by the node that claims
// to have sent it. Silently drop it if there's a signature problem.
protected, err := broadcast.ProtectedPayload()
if err != nil {
return fmt.Errorf("broadcast payload invalid: %w", err)
}
if !ed25519.Verify(
f.SourceKey[:],
protected,
broadcast.Signature[:],
) {
return fmt.Errorf("broadcast payload signature invalid")
}
}
// Check that the root key in the update matches our current root.
// If they don't match, silently drop the broadcast.
root := s._rootAnnouncement()
if root.RootPublicKey.CompareTo(broadcast.RootPublicKey) != 0 {
return nil
}
// Check the sequence number in the broadcast.
// If we have seen a higher sequence number before then there is no need
// to continue forwarding it.
if existing, ok := s._seenBroadcasts[f.SourceKey]; ok {
sendingTooFast := time.Since(existing.LastSeen) < broadcastFilterTime
repeatedSequence := broadcast.Sequence <= existing.Sequence
if sendingTooFast || repeatedSequence {
return nil
}
}
s._seenBroadcasts[f.SourceKey] = broadcastEntry{
Sequence: broadcast.Sequence,
LastSeen: time.Now(),
}
// send event to subscribers about discovered node
s.r.Act(nil, func() {
s.r._publish(events.BroadcastReceived{PeerID: f.SourceKey.String(), Time: uint64(time.Now().UnixNano())})
})
if f.HopLimit > 1 {
f.HopLimit -= 1
} else {
// The packet has reached the hop limit and shouldn't be forwarded.
return nil
}
// Forward the broadcast to all our peers except for the peer we
// received it from.
if f.HopLimit >= types.NetworkHorizonDistance-1 {
s._flood(p, f, ClassicFlood)
} else {
s._flood(p, f, TreeFlood)
}
return nil
}

View file

@ -22,6 +22,13 @@ import (
"github.com/matrix-org/pinecone/types"
)
type FloodType int
const (
ClassicFlood FloodType = iota
TreeFlood
)
// _nextHopsFor returns the next-hop for the given frame. It will examine the packet
// type and use the correct routing algorithm to determine the next-hop. It is possible
// for this function to return `nil` if there is no suitable candidate.
@ -106,10 +113,25 @@ func (s *state) _forward(p *peer, f *types.Frame) error {
return nil
}
case types.TypeWakeupBroadcast:
// Broadcasts are a special case. The _handleBroadcast function will handle
// forwarding broadcasts as appropriate.
if err := s._handleBroadcast(p, f); err != nil {
return fmt.Errorf("s._handleBroadcast (port %d): %w", p.port, err)
}
return nil
case types.TypeTraffic:
// Traffic type packets are forwarded normally by falling through. There
// are no special rules to apply to these packets, regardless of whether
// they are SNEK-routed or tree-routed.
// Traffic type packets are forwarded normally by falling through unless hop
// limiting is enabled.
if s.r._hopLimiting.Load() {
if f.HopLimit > 1 {
f.HopLimit -= 1
} else {
// The packet has reached the hop limit and shouldn't be forwarded.
return nil
}
}
}
// If the packet's watermark is higher than the previous one or we are
@ -133,3 +155,61 @@ func (s *state) _forward(p *peer, f *types.Frame) error {
return nil
}
// _flood sends a frame to all of our connected peers. This is used for
// flooding the wakeup broadcast to all of our direct peers.
// Classic flooding works by sending frames to all other peers.
// Tree flooding works by only sending frames to peers on the same branch.
func (s *state) _flood(from *peer, f *types.Frame, floodType FloodType) {
floodCandidates := make(map[types.PublicKey]*peer)
for _, newCandidate := range s._peers {
if newCandidate == nil || newCandidate.proto == nil || !newCandidate.started.Load() {
continue
}
if newCandidate == from || newCandidate == s.r.local {
continue
}
if s._filterPacket != nil && s._filterPacket(newCandidate.public, f) {
s.r.log.Printf("Packet of type %s destined for port %d [%s] was dropped due to filter rules", f.Type.String(), newCandidate.port, newCandidate.public.String()[:8])
continue
}
if floodType == TreeFlood {
if coords, err := newCandidate._coords(); err == nil {
if coords.DistanceTo(s._coords()) != 1 {
// This peer is not directly on the same branch.
continue
}
} else {
continue
}
}
if existingCandidate, ok := floodCandidates[newCandidate.public]; ok {
fasterNewPeerType := newCandidate.peertype < existingCandidate.peertype
lowerLatencyNewCandidate := false
if existingAnnouncement, ok := s._announcements[existingCandidate]; ok {
if newAnnouncement, ok := s._announcements[newCandidate]; ok {
if newAnnouncement.receiveOrder < existingAnnouncement.receiveOrder {
lowerLatencyNewCandidate = true
}
}
}
betterCandidate := fasterNewPeerType || lowerLatencyNewCandidate
if !betterCandidate {
continue
}
}
floodCandidates[newCandidate.public] = newCandidate
}
for _, p := range floodCandidates {
frame := getFrame()
f.CopyInto(frame)
p.send(frame)
}
}

View file

@ -304,6 +304,7 @@ func (s *state) _handleTreeAnnouncement(p *peer, f *types.Frame) error {
}
isFirstAnnouncement := false
shouldSendBroadcast := false
// If the peer is replaying an old sequence number to us then we
// assume that they are up to no good.
@ -313,6 +314,18 @@ func (s *state) _handleTreeAnnouncement(p *peer, f *types.Frame) error {
}
} else {
isFirstAnnouncement = true
shouldSendBroadcast = true
for peer, ann := range s._announcements {
if ann != nil {
if peer.public.CompareTo(p.public) == 0 {
// We already have an established connection with this peer so sending another
// broadcast would be redundant.
shouldSendBroadcast = false
break
}
}
}
}
// Get the key of our current root and then work out if the root
@ -371,6 +384,12 @@ func (s *state) _handleTreeAnnouncement(p *peer, f *types.Frame) error {
}
}
if shouldSendBroadcast {
if broadcast, err := s._createBroadcastFrame(); err == nil {
p.send(broadcast)
}
}
return nil
}

83
types/broadcast.go Normal file
View file

@ -0,0 +1,83 @@
// Copyright 2022 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package types
import (
"crypto/ed25519"
"fmt"
)
type WakeupBroadcast struct {
Sequence Varu64
Root
Signature [ed25519.SignatureSize]byte
}
func (w *WakeupBroadcast) ProtectedPayload() ([]byte, error) {
buffer := make([]byte, w.Sequence.Length()+w.Root.Length())
offset := 0
n, err := w.Sequence.MarshalBinary(buffer[:])
if err != nil {
return nil, fmt.Errorf("w.Sequence.MarshalBinary: %w", err)
}
offset += n
offset += copy(buffer[offset:], w.RootPublicKey[:])
n, err = w.RootSequence.MarshalBinary(buffer[offset:])
if err != nil {
return nil, fmt.Errorf("w.RootSequence.MarshalBinary: %w", err)
}
offset += n
return buffer[:offset], nil
}
func (w *WakeupBroadcast) MarshalBinary(buf []byte) (int, error) {
if len(buf) < w.Sequence.Length()+w.Root.Length()+ed25519.SignatureSize {
return 0, fmt.Errorf("buffer too small")
}
offset := 0
n, err := w.Sequence.MarshalBinary(buf[offset:])
if err != nil {
return 0, fmt.Errorf("w.Sequence.MarshalBinary: %w", err)
}
offset += n
offset += copy(buf[offset:], w.RootPublicKey[:])
n, err = w.RootSequence.MarshalBinary(buf[offset:])
if err != nil {
return 0, fmt.Errorf("w.RootSequence.MarshalBinary: %w", err)
}
offset += n
offset += copy(buf[offset:], w.Signature[:])
return offset, nil
}
func (w *WakeupBroadcast) UnmarshalBinary(buf []byte) (int, error) {
if len(buf) < w.Sequence.MinLength()+w.Root.MinLength()+ed25519.SignatureSize {
return 0, fmt.Errorf("buffer too small")
}
offset := 0
n, err := w.Sequence.UnmarshalBinary(buf[offset:])
if err != nil {
return 0, fmt.Errorf("w.Sequence.UnmarshalBinary: %w", err)
}
offset += n
offset += copy(w.RootPublicKey[:], buf[offset:])
n, err = w.RootSequence.UnmarshalBinary(buf[offset:])
if err != nil {
return 0, fmt.Errorf("w.RootSequence.UnmarshalBinary: %w", err)
}
offset += n
offset += copy(w.Signature[:], buf[offset:])
return offset, nil
}

View file

@ -37,6 +37,7 @@ const (
TypeKeepalive FrameType = iota // protocol frame, direct to peers only
TypeTreeAnnouncement // protocol frame, bypasses queues
TypeBootstrap // protocol frame, forwarded using SNEK
TypeWakeupBroadcast // protocol frame, special broadcast forwarding
TypeTraffic // traffic frame, forwarded using tree or SNEK
)
@ -53,10 +54,16 @@ var FrameMagicBytes = []byte{0x70, 0x69, 0x6e, 0x65}
// 4 magic bytes, 1 byte version, 1 byte type, 2 bytes extra, 2 bytes frame length
const FrameHeaderLength = 10
// TODO: what should this be for the network visibility horizon to be what we desire?
// ie. 2-hop 100%, 5-hop >90%, etc.
const MaxHopLimit = 10
const NetworkHorizonDistance = 5
type Frame struct {
Version FrameVersion
Type FrameType
Extra [2]byte
Extra byte
HopLimit uint8
Destination Coordinates
DestinationKey PublicKey
Source Coordinates
@ -67,9 +74,8 @@ type Frame struct {
func (f *Frame) Reset() {
f.Version, f.Type = 0, 0
for i := range f.Extra {
f.Extra[i] = 0
}
f.Extra = 0
f.HopLimit = 0
f.Destination = Coordinates{}
f.DestinationKey = PublicKey{}
f.Source = Coordinates{}
@ -78,10 +84,23 @@ func (f *Frame) Reset() {
f.Payload = f.Payload[:0]
}
func (f *Frame) CopyInto(t *Frame) {
t.Version = f.Version
t.Type = f.Type
t.Extra = f.Extra
t.HopLimit = f.HopLimit
t.DestinationKey = f.DestinationKey
t.SourceKey = f.SourceKey
t.Watermark = f.Watermark
t.Payload = t.Payload[:len(f.Payload)]
copy(t.Payload, f.Payload)
}
func (f *Frame) MarshalBinary(buffer []byte) (int, error) {
copy(buffer[:4], FrameMagicBytes)
buffer[4], buffer[5] = byte(f.Version), byte(f.Type)
copy(buffer[6:], f.Extra[:])
buffer[6] = f.Extra
buffer[7] = f.HopLimit
offset := FrameHeaderLength
switch f.Type {
case TypeKeepalive:
@ -111,6 +130,16 @@ func (f *Frame) MarshalBinary(buffer []byte) (int, error) {
offset += copy(buffer[offset:], f.Payload[:payloadLen])
}
case TypeWakeupBroadcast: // source = key
payloadLen := len(f.Payload)
binary.BigEndian.PutUint16(buffer[offset+0:offset+2], uint16(payloadLen))
offset += 2
offset += copy(buffer[offset:], f.SourceKey[:ed25519.PublicKeySize])
if f.Payload != nil {
f.Payload = f.Payload[:payloadLen]
offset += copy(buffer[offset:], f.Payload[:payloadLen])
}
case TypeTraffic:
payloadLen := len(f.Payload)
binary.BigEndian.PutUint16(buffer[offset+0:offset+2], uint16(payloadLen))
@ -158,8 +187,8 @@ func (f *Frame) UnmarshalBinary(data []byte) (int, error) {
return 0, fmt.Errorf("frame doesn't contain magic bytes")
}
f.Version, f.Type = FrameVersion(data[4]), FrameType(data[5])
f.Extra[0], f.Extra[1] = data[6], data[7]
copy(f.Extra[:], data[6:])
f.Extra = data[6]
f.HopLimit = data[7]
framelen := int(binary.BigEndian.Uint16(data[FrameHeaderLength-2 : FrameHeaderLength]))
if len(data) != framelen {
return 0, fmt.Errorf("frame length incorrect")
@ -196,6 +225,17 @@ func (f *Frame) UnmarshalBinary(data []byte) (int, error) {
offset += copy(f.Payload[:payloadLen], data[offset:])
return offset, nil
case TypeWakeupBroadcast: // source = key
payloadLen := int(binary.BigEndian.Uint16(data[offset+0 : offset+2]))
if payloadLen > cap(f.Payload) {
return 0, fmt.Errorf("payload length exceeds frame capacity")
}
offset += 2
offset += copy(f.SourceKey[:], data[offset:])
f.Payload = f.Payload[:payloadLen]
offset += copy(f.Payload[:payloadLen], data[offset:])
return offset, nil
case TypeTraffic:
payloadLen := int(binary.BigEndian.Uint16(data[offset+0 : offset+2]))
if payloadLen > cap(f.Payload) {
@ -246,6 +286,8 @@ func (t FrameType) String() string {
return "TreeAnnouncement"
case TypeBootstrap:
return "VirtualSnakeBootstrap"
case TypeWakeupBroadcast:
return "WakeupBroadcast"
case TypeTraffic:
return "OverlayTraffic"
default:

View file

@ -38,8 +38,9 @@ func TestMarshalUnmarshalFrameTreeRouted(t *testing.T) {
0x70, 0x69, 0x6e, 0x65, // magic bytes
0, // version 0
byte(TypeTraffic), // type greedy
0, 0, // extra
0, 97, // frame length
0, // extra
0, // hop limit
0, 97, // frame length
0, 7, // payload len
0, 6, 1, 2, 3, 4, 167, 8, // destination (2+6 bytes but 5 ports!)
0, 4, 4, 3, 2, 1, // source (2+4 bytes)
@ -117,8 +118,9 @@ func TestMarshalUnmarshalFrameSnekRouted(t *testing.T) {
0x70, 0x69, 0x6e, 0x65, // magic bytes
0, // version 0
byte(TypeTraffic), // type greedy
0, 0, // extra
0, 124, // frame length
0, // extra
0, // hop limit
0, 124, // frame length
0, 7, // payload len
0, 0, // destination (2+0 bytes)
0, 4, 4, 3, 2, 1, // source (2+4 bytes)
@ -203,8 +205,9 @@ func TestMarshalUnmarshalSNEKBootstrapFrame(t *testing.T) {
0x70, 0x69, 0x6e, 0x65, // magic bytes
0, // version 0
byte(TypeBootstrap), // type greedy
0, 0, // extra
0, 82, // frame length
0, // extra
0, // hop limit
0, 82, // frame length
0, 5, // payload length
}
expected = append(expected, pk...)