mirror of
https://github.com/matrix-org/pinecone.git
synced 2026-01-16 23:00:32 +00:00
Revert "Simplify a bit by not putting asc/desc routes into routing table separately"
This reverts commit a9c3c3b311.
This commit is contained in:
parent
a9c3c3b311
commit
a9dad1b2fc
2 changed files with 197 additions and 96 deletions
|
|
@ -205,6 +205,10 @@ func (t *virtualSnake) bootstrapNow() {
|
|||
if t.r.IsRoot() || t.r.PeerCount(-1) == 0 {
|
||||
return
|
||||
}
|
||||
ts, err := util.SignedTimestamp(t.r.private)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
var payload [8 + ed25519.PublicKeySize]byte
|
||||
bootstrap := types.VirtualSnakeBootstrap{
|
||||
RootPublicKey: t.r.RootPublicKey(),
|
||||
|
|
@ -220,7 +224,7 @@ func (t *virtualSnake) bootstrapNow() {
|
|||
frame.Type = types.TypeVirtualSnakeBootstrap
|
||||
frame.DestinationKey = t.r.PublicKey()
|
||||
frame.Source = t.r.Coords()
|
||||
frame.Payload = append(frame.Payload[:0], payload[:]...)
|
||||
frame.Payload = append(frame.Payload[:0], append(payload[:], ts...)...)
|
||||
t.r.send <- frame
|
||||
}
|
||||
|
||||
|
|
@ -359,14 +363,6 @@ func (t *virtualSnake) getVirtualSnakeNextHop(from *Peer, destKey types.PublicKe
|
|||
}
|
||||
}
|
||||
|
||||
// Check our keyspace neighbours
|
||||
if asc := t._ascending; asc != nil {
|
||||
newCheckedCandidate(asc.PublicKey, asc.Port)
|
||||
}
|
||||
if desc := t._descending; desc != nil {
|
||||
newCheckedCandidate(desc.PublicKey, desc.Port)
|
||||
}
|
||||
|
||||
// Check our DHT entries
|
||||
for dhtKey, entry := range t._table {
|
||||
if entry.Valid() {
|
||||
|
|
@ -486,12 +482,18 @@ func (t *virtualSnake) handleBootstrap(from *Peer, rx *types.Frame) error {
|
|||
}
|
||||
// Unmarshal the bootstrap.
|
||||
var bootstrap types.VirtualSnakeBootstrap
|
||||
if _, err := bootstrap.UnmarshalBinary(rx.Payload); err != nil {
|
||||
n, err := bootstrap.UnmarshalBinary(rx.Payload)
|
||||
if err != nil {
|
||||
return fmt.Errorf("bootstrap.UnmarshalBinary: %w", err)
|
||||
}
|
||||
// Check if the packet has a valid signed timestamp.
|
||||
if !util.VerifySignedTimestamp(rx.DestinationKey, rx.Payload[n:]) {
|
||||
return fmt.Errorf("util.VerifySignedTimestamp")
|
||||
}
|
||||
if !bootstrap.RootPublicKey.EqualTo(t.r.RootPublicKey()) {
|
||||
return fmt.Errorf("bootstrap root key doesn't match")
|
||||
}
|
||||
acknowledge := false
|
||||
desc := t._descending
|
||||
switch {
|
||||
case rx.SourceKey.EqualTo(t.r.public):
|
||||
|
|
@ -499,46 +501,54 @@ func (t *virtualSnake) handleBootstrap(from *Peer, rx *types.Frame) error {
|
|||
// so either another node has forwarded it to us incorrectly, or
|
||||
// a routing loop has occurred somewhere. Don't act on the bootstrap
|
||||
// in that case.
|
||||
return fmt.Errorf("received loopback bootstrap")
|
||||
case desc != nil && desc.PublicKey.EqualTo(rx.DestinationKey):
|
||||
// We've received another bootstrap from our direct descending node.
|
||||
// Send back an acknowledgement as this is OK.
|
||||
acknowledge = true
|
||||
case desc != nil && time.Since(desc.LastSeen) >= virtualSnakeNeighExpiryPeriod:
|
||||
// We already have a direct descending node, but we haven't seen it
|
||||
// recently, so it's quite possible that it has disappeared. We'll
|
||||
// therefore handle this bootstrap instead. If the original node comes
|
||||
// back later and is closer to us then we'll end up using it again.
|
||||
acknowledge = true
|
||||
case desc == nil && util.LessThan(rx.DestinationKey, t.r.public):
|
||||
// We don't know about a descending node and at the moment we don't know
|
||||
// any better candidates, so we'll accept a bootstrap from a node with a
|
||||
// key lower than ours (so that it matches descending order).
|
||||
acknowledge = true
|
||||
case desc != nil && util.DHTOrdered(desc.PublicKey, rx.DestinationKey, t.r.public):
|
||||
// We know about a descending node already but it turns out that this
|
||||
// new node that we've received a bootstrap from is actually closer to
|
||||
// us than the previous node. We'll update our record to use the new
|
||||
// node instead and then send back a bootstrap ACK.
|
||||
case desc == nil && util.LessThan(rx.DestinationKey, t.r.public):
|
||||
// We don't know about a descending node and at the moment we don't know
|
||||
// any better candidates, so we'll accept a bootstrap from a node with a
|
||||
// key lower than ours (so that it matches descending order).
|
||||
acknowledge = true
|
||||
default:
|
||||
// The bootstrap conditions weren't met. This might just be because
|
||||
// there's a node out there that hasn't converged to a closer node
|
||||
// yet, so we'll just ignore the bootstrap.
|
||||
return fmt.Errorf("no bootstrap conditions met")
|
||||
}
|
||||
bootstrapACK := types.VirtualSnakeBootstrapACK{ // nolint:gosimple
|
||||
PathID: bootstrap.PathID,
|
||||
RootPublicKey: bootstrap.RootPublicKey,
|
||||
if acknowledge {
|
||||
bootstrapACK := types.VirtualSnakeBootstrapACK{ // nolint:gosimple
|
||||
PathID: bootstrap.PathID,
|
||||
RootPublicKey: bootstrap.RootPublicKey,
|
||||
}
|
||||
var buf [8 + ed25519.PublicKeySize]byte
|
||||
if _, err := bootstrapACK.MarshalBinary(buf[:]); err != nil {
|
||||
return fmt.Errorf("bootstrapACK.MarshalBinary: %w", err)
|
||||
}
|
||||
ts, err := util.SignedTimestamp(t.r.private)
|
||||
if err != nil {
|
||||
return fmt.Errorf("util.SignedTimestamp: %w", err)
|
||||
}
|
||||
frame := types.GetFrame()
|
||||
frame.Type = types.TypeVirtualSnakeBootstrapACK
|
||||
frame.Destination = rx.Source
|
||||
frame.DestinationKey = rx.DestinationKey
|
||||
frame.Source = t.r.Coords()
|
||||
frame.SourceKey = t.r.public
|
||||
frame.Payload = append(frame.Payload[:0], append(buf[:], ts...)...)
|
||||
t.r.send <- frame
|
||||
}
|
||||
var buf [8 + ed25519.PublicKeySize]byte
|
||||
if _, err := bootstrapACK.MarshalBinary(buf[:]); err != nil {
|
||||
return fmt.Errorf("bootstrapACK.MarshalBinary: %w", err)
|
||||
}
|
||||
frame := types.GetFrame()
|
||||
frame.Type = types.TypeVirtualSnakeBootstrapACK
|
||||
frame.Destination = rx.Source
|
||||
frame.DestinationKey = rx.DestinationKey
|
||||
frame.Source = t.r.Coords()
|
||||
frame.SourceKey = t.r.public
|
||||
frame.Payload = append(frame.Payload[:0], buf[:]...)
|
||||
t.r.send <- frame
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
@ -547,12 +557,18 @@ func (t *virtualSnake) handleBootstrapACK(from *Peer, rx *types.Frame) error {
|
|||
defer t.mutex.Unlock()
|
||||
// Unmarshal the bootstrap ACK.
|
||||
var bootstrapACK types.VirtualSnakeBootstrapACK
|
||||
if _, err := bootstrapACK.UnmarshalBinary(rx.Payload); err != nil {
|
||||
n, err := bootstrapACK.UnmarshalBinary(rx.Payload)
|
||||
if err != nil {
|
||||
return fmt.Errorf("bootstrapACK.UnmarshalBinary: %w", err)
|
||||
}
|
||||
// Check if the packet has a valid signed timestamp.
|
||||
if !util.VerifySignedTimestamp(rx.SourceKey, rx.Payload[n:]) {
|
||||
return fmt.Errorf("util.VerifySignedTimestamp")
|
||||
}
|
||||
if !bootstrapACK.RootPublicKey.EqualTo(t.r.RootPublicKey()) {
|
||||
return fmt.Errorf("bootstrap ACK root key doesn't match")
|
||||
}
|
||||
update := false
|
||||
asc := t._ascending
|
||||
switch {
|
||||
case rx.SourceKey.EqualTo(t.r.public):
|
||||
|
|
@ -560,59 +576,67 @@ func (t *virtualSnake) handleBootstrapACK(from *Peer, rx *types.Frame) error {
|
|||
// so either another node has forwarded it to us incorrectly, or
|
||||
// a routing loop has occurred somewhere. Don't act on the bootstrap
|
||||
// in that case.
|
||||
return fmt.Errorf("received loopback bootstrap ACK")
|
||||
case asc != nil && asc.PublicKey.EqualTo(rx.SourceKey) && asc.PathID != bootstrapACK.PathID:
|
||||
// We've received another bootstrap ACK from our direct ascending node.
|
||||
// Just refresh the record and then send a new path setup message to
|
||||
// that node.
|
||||
update = true
|
||||
case asc != nil && time.Since(asc.LastSeen) >= virtualSnakeNeighExpiryPeriod:
|
||||
// We already have a direct ascending node, but we haven't seen it
|
||||
// recently, so it's quite possible that it has disappeared. We'll
|
||||
// therefore handle this bootstrap ACK instead. If the original node comes
|
||||
// back later and is closer to us then we'll end up using it again.
|
||||
update = true
|
||||
case asc == nil && util.LessThan(t.r.public, rx.SourceKey):
|
||||
// We don't know about an ascending node and at the moment we don't know
|
||||
// any better candidates, so we'll accept a bootstrap ACK from a node with a
|
||||
// key higher than ours (so that it matches descending order).
|
||||
update = true
|
||||
case asc != nil && util.DHTOrdered(t.r.public, rx.SourceKey, asc.PublicKey):
|
||||
// We know about an ascending node already but it turns out that this
|
||||
// new node that we've received a bootstrap from is actually closer to
|
||||
// us than the previous node. We'll update our record to use the new
|
||||
// node instead and then send a new path setup message to it.
|
||||
case asc == nil && util.LessThan(t.r.public, rx.SourceKey):
|
||||
// We don't know about an ascending node and at the moment we don't know
|
||||
// any better candidates, so we'll accept a bootstrap ACK from a node with a
|
||||
// key higher than ours (so that it matches descending order).
|
||||
update = true
|
||||
default:
|
||||
// The bootstrap ACK conditions weren't met. This might just be because
|
||||
// there's a node out there that hasn't converged to a closer node
|
||||
// yet, so we'll just ignore the acknowledgement.
|
||||
return fmt.Errorf("no bootstrap ACK conditions met")
|
||||
}
|
||||
if asc != nil {
|
||||
// Tear down the previous path, if there was one.
|
||||
t.teardownPath(asc.PublicKey, asc.PathID, asc.Port, true)
|
||||
}
|
||||
t._ascending = &virtualSnakeNeighbour{
|
||||
PublicKey: rx.SourceKey,
|
||||
Port: from.port,
|
||||
LastSeen: time.Now(),
|
||||
Coords: rx.Source,
|
||||
PathID: bootstrapACK.PathID,
|
||||
RootPublicKey: bootstrapACK.RootPublicKey,
|
||||
}
|
||||
setup := types.VirtualSnakeSetup{ // nolint:gosimple
|
||||
PathID: bootstrapACK.PathID,
|
||||
RootPublicKey: bootstrapACK.RootPublicKey,
|
||||
}
|
||||
var buf [8 + ed25519.PublicKeySize]byte
|
||||
if _, err := setup.MarshalBinary(buf[:]); err != nil {
|
||||
return fmt.Errorf("setup.MarshalBinary: %w", err)
|
||||
}
|
||||
if update {
|
||||
if asc != nil {
|
||||
// Tear down the previous path, if there was one.
|
||||
t.teardownPath(asc.PublicKey, asc.PathID, asc.Port, true)
|
||||
}
|
||||
t._ascending = &virtualSnakeNeighbour{
|
||||
PublicKey: rx.SourceKey,
|
||||
Port: from.port,
|
||||
LastSeen: time.Now(),
|
||||
Coords: rx.Source,
|
||||
PathID: bootstrapACK.PathID,
|
||||
RootPublicKey: bootstrapACK.RootPublicKey,
|
||||
}
|
||||
setup := types.VirtualSnakeSetup{ // nolint:gosimple
|
||||
PathID: bootstrapACK.PathID,
|
||||
RootPublicKey: bootstrapACK.RootPublicKey,
|
||||
}
|
||||
var buf [8 + ed25519.PublicKeySize]byte
|
||||
if _, err := setup.MarshalBinary(buf[:]); err != nil {
|
||||
return fmt.Errorf("setup.MarshalBinary: %w", err)
|
||||
}
|
||||
ts, err := util.SignedTimestamp(t.r.private)
|
||||
if err != nil {
|
||||
return fmt.Errorf("util.SignedTimestamp: %w", err)
|
||||
}
|
||||
|
||||
frame := types.GetFrame()
|
||||
frame.Type = types.TypeVirtualSnakeSetup
|
||||
frame.Destination = rx.Source
|
||||
frame.DestinationKey = rx.SourceKey
|
||||
frame.SourceKey = t.r.public
|
||||
frame.Payload = append(frame.Payload[:0], buf[:]...)
|
||||
t.r.send <- frame
|
||||
frame := types.GetFrame()
|
||||
frame.Type = types.TypeVirtualSnakeSetup
|
||||
frame.Destination = rx.Source
|
||||
frame.DestinationKey = rx.SourceKey
|
||||
frame.SourceKey = t.r.public
|
||||
frame.Payload = append(frame.Payload[:0], append(buf[:], ts...)...)
|
||||
t.r.send <- frame
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
@ -623,12 +647,18 @@ func (t *virtualSnake) handleBootstrapACK(from *Peer, rx *types.Frame) error {
|
|||
func (t *virtualSnake) handleSetup(from *Peer, rx *types.Frame, nextHops types.SwitchPorts) error {
|
||||
t.mutex.Lock()
|
||||
defer t.mutex.Unlock()
|
||||
|
||||
// Unmarshal the setup.
|
||||
var setup types.VirtualSnakeSetup
|
||||
if _, err := setup.UnmarshalBinary(rx.Payload); err != nil {
|
||||
n, err := setup.UnmarshalBinary(rx.Payload)
|
||||
if err != nil {
|
||||
return fmt.Errorf("setup.UnmarshalBinary: %w", err)
|
||||
}
|
||||
|
||||
// Check if the setup packet has a valid signed timestamp.
|
||||
if !util.VerifySignedTimestamp(rx.SourceKey, rx.Payload[n:]) {
|
||||
t.teardownPath(rx.SourceKey, setup.PathID, from.port, false)
|
||||
return fmt.Errorf("invalid signature")
|
||||
}
|
||||
if !setup.RootPublicKey.EqualTo(t.r.RootPublicKey()) {
|
||||
t.teardownPath(rx.SourceKey, setup.PathID, from.port, false)
|
||||
return fmt.Errorf("setup root key doesn't match")
|
||||
|
|
@ -642,8 +672,11 @@ func (t *virtualSnake) handleSetup(from *Peer, rx *types.Frame, nextHops types.S
|
|||
}
|
||||
}
|
||||
|
||||
var addToRoutingTable bool
|
||||
|
||||
// Is the setup a duplicate of one we already have in our table?
|
||||
if _, ok := t._table[virtualSnakeIndex{rx.SourceKey, setup.PathID}]; ok {
|
||||
_, ok := t._table[virtualSnakeIndex{rx.SourceKey, setup.PathID}]
|
||||
if ok {
|
||||
t.teardownPath(rx.SourceKey, setup.PathID, 0, false)
|
||||
t.teardownPath(rx.SourceKey, setup.PathID, from.port, false)
|
||||
return fmt.Errorf("setup is a duplicate")
|
||||
|
|
@ -652,52 +685,60 @@ func (t *virtualSnake) handleSetup(from *Peer, rx *types.Frame, nextHops types.S
|
|||
// If we're at the destination of the setup then update our predecessor
|
||||
// with information from the bootstrap.
|
||||
if rx.DestinationKey.EqualTo(t.r.public) {
|
||||
update := false
|
||||
desc := t._descending
|
||||
switch {
|
||||
case rx.SourceKey.EqualTo(t.r.public):
|
||||
// We received a setup from ourselves. This shouldn't happen,
|
||||
// We received a bootstrap from ourselves. This shouldn't happen,
|
||||
// so either another node has forwarded it to us incorrectly, or
|
||||
// a routing loop has occurred somewhere. Don't act on the setup
|
||||
// a routing loop has occurred somewhere. Don't act on the bootstrap
|
||||
// in that case.
|
||||
t.teardownPath(rx.SourceKey, setup.PathID, from.port, false)
|
||||
return fmt.Errorf("received loopback setup")
|
||||
case desc != nil && desc.PublicKey.EqualTo(rx.SourceKey):
|
||||
// We've received another setup from our direct descending node.
|
||||
// Just refresh the record.
|
||||
// We've received another bootstrap from our direct descending node.
|
||||
// Just refresh the record and then send back an acknowledgement.
|
||||
update = true
|
||||
case desc != nil && time.Since(desc.LastSeen) >= virtualSnakeNeighExpiryPeriod:
|
||||
// We already have a direct descending node, but we haven't seen it
|
||||
// recently, so it's quite possible that it has disappeared. We'll
|
||||
// therefore handle this setup instead. If the original node comes
|
||||
// therefore handle this bootstrap instead. If the original node comes
|
||||
// back later and is closer to us then we'll end up using it again.
|
||||
case desc != nil && util.DHTOrdered(desc.PublicKey, rx.SourceKey, t.r.public):
|
||||
// We know about a descending node already but it turns out that this
|
||||
// new node that we've received a setup from is actually closer to
|
||||
// us than the previous node. We'll update our record to use the new
|
||||
// node instead and then send back a bootstrap ACK.
|
||||
update = true
|
||||
case desc == nil && util.LessThan(rx.SourceKey, t.r.public):
|
||||
// We don't know about a descending node and at the moment we don't know
|
||||
// any better candidates, so we'll accept a setup from a node with a
|
||||
// any better candidates, so we'll accept a bootstrap from a node with a
|
||||
// key lower than ours (so that it matches descending order).
|
||||
update = true
|
||||
case desc != nil && util.DHTOrdered(desc.PublicKey, rx.SourceKey, t.r.public):
|
||||
// We know about a descending node already but it turns out that this
|
||||
// new node that we've received a bootstrap from is actually closer to
|
||||
// us than the previous node. We'll update our record to use the new
|
||||
// node instead and then send back a bootstrap ACK.
|
||||
update = true
|
||||
default:
|
||||
// The setup conditions weren't met. This might just be because
|
||||
// The bootstrap conditions weren't met. This might just be because
|
||||
// there's a node out there that hasn't converged to a closer node
|
||||
// yet, so we'll just ignore the setup.
|
||||
t.teardownPath(rx.SourceKey, setup.PathID, from.port, false)
|
||||
return fmt.Errorf("no setup conditions met")
|
||||
// yet, so we'll just ignore the bootstrap.
|
||||
}
|
||||
if desc != nil {
|
||||
// Tear down the previous path, if there was one.
|
||||
t.teardownPath(desc.PublicKey, desc.PathID, desc.Port, false)
|
||||
}
|
||||
t._descending = &virtualSnakeNeighbour{
|
||||
PublicKey: rx.SourceKey,
|
||||
Port: from.port,
|
||||
LastSeen: time.Now(),
|
||||
Coords: rx.Source,
|
||||
PathID: setup.PathID,
|
||||
RootPublicKey: setup.RootPublicKey,
|
||||
if update {
|
||||
if desc != nil {
|
||||
// Tear down the previous path, if there was one.
|
||||
t.teardownPath(desc.PublicKey, desc.PathID, desc.Port, false)
|
||||
}
|
||||
t._descending = &virtualSnakeNeighbour{
|
||||
PublicKey: rx.SourceKey,
|
||||
Port: from.port,
|
||||
LastSeen: time.Now(),
|
||||
Coords: rx.Source,
|
||||
PathID: setup.PathID,
|
||||
RootPublicKey: setup.RootPublicKey,
|
||||
}
|
||||
addToRoutingTable = true
|
||||
}
|
||||
} else {
|
||||
addToRoutingTable = true
|
||||
}
|
||||
|
||||
if addToRoutingTable {
|
||||
// Add a new routing table entry.
|
||||
index := virtualSnakeIndex{
|
||||
PublicKey: rx.SourceKey,
|
||||
|
|
@ -712,7 +753,10 @@ func (t *virtualSnake) handleSetup(from *Peer, rx *types.Frame, nextHops types.S
|
|||
entry.DestinationPort = nextHops[0]
|
||||
}
|
||||
t._table[index] = entry
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
t.teardownPath(rx.SourceKey, setup.PathID, from.port, false)
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
57
util/signedts.go
Normal file
57
util/signedts.go
Normal file
|
|
@ -0,0 +1,57 @@
|
|||
// Copyright 2021 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package util
|
||||
|
||||
import (
|
||||
"crypto/ed25519"
|
||||
"fmt"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/matrix-org/pinecone/types"
|
||||
)
|
||||
|
||||
func SignedTimestamp(private types.PrivateKey) ([]byte, error) {
|
||||
ts, err := types.Varu64(time.Now().Unix()).MarshalBinary()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("types.Varu64.MarshalBinary: %w", err)
|
||||
}
|
||||
if _, ok := os.LookupEnv("PINECONE_DISABLE_SIGNATURES"); !ok {
|
||||
ts = append(ts, ed25519.Sign(private[:], ts)...)
|
||||
} else {
|
||||
ts = append(ts, make([]byte, ed25519.SignatureSize)...)
|
||||
}
|
||||
return ts, nil
|
||||
}
|
||||
|
||||
func VerifySignedTimestamp(public types.PublicKey, payload []byte) bool {
|
||||
if len(payload) < ed25519.SignatureSize+1 {
|
||||
return false
|
||||
}
|
||||
var ts types.Varu64
|
||||
if err := ts.UnmarshalBinary(payload); err != nil {
|
||||
return false
|
||||
}
|
||||
offset := ts.Length()
|
||||
if _, ok := os.LookupEnv("PINECONE_DISABLE_SIGNATURES"); !ok {
|
||||
if !ed25519.Verify(public[:], payload[:offset], payload[offset:]) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
if time.Since(time.Unix(int64(ts), 0)) > time.Minute {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
Loading…
Add table
Reference in a new issue