From 04350604d75545a1098a7a90cb011b7ca374463c Mon Sep 17 00:00:00 2001 From: Jean-Christophe Hugly Date: Thu, 7 Nov 2024 14:25:44 +0100 Subject: [PATCH 1/4] router: small refactoring to remove the need for a batchconn.WriteTo() API. WriteTo needs to be given the destination address explicitly. WriteBatch, on the other hand, can either find it in each packet structure, or rely on the connection's destination. WriteTo is only used to send BFD packets. It turns out that BFD packets can also very easily be sent via the regular forwarders that use WriteBatch. The motivation to do that is to simplify the interface between the dataplane and the forwarders, in view of supporting multiple underlays with possibly very different interfaces. So the channels around the processor tasks seem like a good universal interface. --- router/control/conf.go | 13 +++++---- router/dataplane.go | 59 +++++++++++++++++++++++++++++++------- router/dataplane_test.go | 33 +++++++++++---------- router/mock_router/mock.go | 16 ----------- 4 files changed, 71 insertions(+), 50 deletions(-) diff --git a/router/control/conf.go b/router/control/conf.go index fc8c0520f9..99c9859a8b 100644 --- a/router/control/conf.go +++ b/router/control/conf.go @@ -196,13 +196,14 @@ func confExternalInterfaces(dp Dataplane, cfg *Config) error { _, owned := cfg.BR.IFs[ifID] if !owned { - // XXX The current implementation effectively uses IP/UDP tunnels to create - // the SCION network as an overlay, with forwarding to local hosts being a special case. - // When setting up external interfaces that belong to other routers in the AS, they - // are basically IP/UDP tunnels between the two border routers, and as such is - // configured in the data plane. + // The current implementation effectively uses IP/UDP tunnels to create the SCION + // network as an overlay, with forwarding to local hosts being a special case. 
When + // setting up external interfaces that belong to other routers in the AS, they are + // basically IP/UDP tunnels between the two border routers. Those are described as a + // link from the internal address of the local router to the internal address of the + // sibling router; and not to the router in the remote AS. linkInfo.Local.Addr = cfg.BR.InternalAddr - linkInfo.Remote.Addr = iface.InternalAddr + linkInfo.Remote.Addr = iface.InternalAddr // i.e. via sibling router. // For internal BFD always use the default configuration. linkInfo.BFD = BFD{} } diff --git a/router/dataplane.go b/router/dataplane.go index 1701fcc8ab..3b696c347f 100644 --- a/router/dataplane.go +++ b/router/dataplane.go @@ -86,7 +86,6 @@ type bfdSession interface { // BatchConn is a connection that supports batch reads and writes. type BatchConn interface { ReadBatch(underlayconn.Messages) (int, error) - WriteTo([]byte, netip.AddrPort) (int, error) WriteBatch(msgs underlayconn.Messages, flags int) (int, error) Close() error } @@ -193,6 +192,10 @@ type DataPlane struct { ExperimentalSCMPAuthentication bool + // The forwarding queues. Each is consumed by a forwarder process and fed by + // one bfd sender and the packet processors. + fwQs map[uint16]chan *packet + // The pool that stores all the packet buffers as described in the design document. 
See // https://github.com/scionproto/scion/blob/master/doc/dev/design/BorderRouter.rst // To avoid garbage collection, most the meta-data that is produced during the processing of a @@ -228,6 +231,7 @@ var ( errBFDSessionDown = errors.New("bfd session down") expiredHop = errors.New("expired hop") ingressInterfaceInvalid = errors.New("ingress interface invalid") + egressInterfaceInvalid = errors.New("egress interface invalid") macVerificationFailed = errors.New("MAC verification failed") badPacketSize = errors.New("bad packet size") @@ -361,7 +365,7 @@ func (d *DataPlane) AddExternalInterface(ifID uint16, conn BatchConn, if conn == nil || !src.Addr.IsValid() || !dst.Addr.IsValid() { return emptyValue } - err := d.addExternalInterfaceBFD(ifID, conn, src, dst, cfg) + err := d.addExternalInterfaceBFD(ifID, src, dst, cfg) if err != nil { return serrors.Wrap("adding external BFD", err, "if_id", ifID) } @@ -439,7 +443,7 @@ func (d *DataPlane) AddRemotePeer(local, remote uint16) error { } // AddExternalInterfaceBFD adds the inter AS connection BFD session. 
-func (d *DataPlane) addExternalInterfaceBFD(ifID uint16, conn BatchConn, +func (d *DataPlane) addExternalInterfaceBFD(ifID uint16, src, dst control.LinkEnd, cfg control.BFD) error { if *cfg.Disable { @@ -459,7 +463,7 @@ func (d *DataPlane) addExternalInterfaceBFD(ifID uint16, conn BatchConn, PacketsReceived: d.Metrics.BFDPacketsReceived.With(labels), } } - s, err := newBFDSend(conn, src.IA, dst.IA, src.Addr, dst.Addr, ifID, d.macFactory()) + s, err := newBFDSend(d, src.IA, dst.IA, src.Addr, dst.Addr, ifID, d.macFactory()) if err != nil { return err } @@ -599,7 +603,7 @@ func (d *DataPlane) addNextHopBFD(ifID uint16, src, dst netip.AddrPort, cfg cont } } - s, err := newBFDSend(d.internal, d.localIA, d.localIA, src, dst, 0, d.macFactory()) + s, err := newBFDSend(d, d.localIA, d.localIA, src, dst, 0, d.macFactory()) if err != nil { return err } @@ -621,7 +625,6 @@ type RunConfig struct { func (d *DataPlane) Run(ctx context.Context, cfg *RunConfig) error { d.mtx.Lock() - d.setRunning() d.initMetrics() processorQueueSize := max( @@ -630,7 +633,9 @@ func (d *DataPlane) Run(ctx context.Context, cfg *RunConfig) error { d.initPacketPool(cfg, processorQueueSize) procQs, fwQs, slowQs := initQueues(cfg, d.interfaces, processorQueueSize) + d.fwQs = fwQs // Shared with BFD senders + d.setRunning() for ifID, conn := range d.interfaces { go func(ifID uint16, conn BatchConn) { defer log.HandlePanic() @@ -759,7 +764,7 @@ func (d *DataPlane) runReceiver(ifID uint16, conn BatchConn, cfg *RunConfig, // Give a new buffer to the msgs elements that have been used in the previous loop. 
for i := 0; i < cfg.BatchSize-numReusable; i++ { - p := <-d.packetPool + p := d.getPacketFromPool() p.reset() packets[i] = p msgs[i].Buffers[0] = p.rawPacket @@ -805,6 +810,10 @@ func computeProcID(data []byte, numProcRoutines int, hashSeed uint32) (uint32, e return s % uint32(numProcRoutines), nil } +func (d *DataPlane) getPacketFromPool() *packet { + return <-d.packetPool +} + func (d *DataPlane) returnPacketToPool(pkt *packet) { d.packetPool <- pkt } @@ -2316,7 +2325,8 @@ func updateSCIONLayer(rawPkt []byte, s slayers.SCION, buffer gopacket.SerializeB } type bfdSend struct { - conn BatchConn + dataPlane *DataPlane + ifID uint16 srcAddr, dstAddr netip.AddrPort scn *slayers.SCION ohp *onehop.Path @@ -2326,7 +2336,7 @@ type bfdSend struct { } // newBFDSend creates and initializes a BFD Sender -func newBFDSend(conn BatchConn, srcIA, dstIA addr.IA, srcAddr, dstAddr netip.AddrPort, +func newBFDSend(d *DataPlane, srcIA, dstIA addr.IA, srcAddr, dstAddr netip.AddrPort, ifID uint16, mac hash.Hash) (*bfdSend, error) { scn := &slayers.SCION{ @@ -2373,8 +2383,13 @@ func newBFDSend(conn BatchConn, srcIA, dstIA addr.IA, srcAddr, dstAddr netip.Add scn.Path = ohp } + // bfdSend includes a reference to the dataplane. In general this must not be used until the + // dataplane is running. This is ensured by the fact that bfdSend objects are owned by bfd + // sessions, which are started by dataplane.Run() itself. + return &bfdSend{ - conn: conn, + dataPlane: d, + ifID: ifID, srcAddr: srcAddr, dstAddr: dstAddr, scn: scn, @@ -2405,7 +2420,29 @@ func (b *bfdSend) Send(bfd *layers.BFD) error { if err != nil { return err } - _, err = b.conn.WriteTo(b.buffer.Bytes(), b.dstAddr) + + fwChan, ok := b.dataPlane.fwQs[b.ifID] + if !ok { + // WTF? May be we should just treat that as a panic-able offense + return egressInterfaceInvalid + } + + p := b.dataPlane.getPacketFromPool() + p.reset() + + // FIXME: would rather build the pkt directly into the rawPacket's buffer. 
+ sz := copy(p.rawPacket, b.buffer.Bytes()) + p.rawPacket = p.rawPacket[:sz] + if b.ifID == 0 { + // Using the internal interface: must specify the destination address + updateNetAddrFromAddrPort(p.dstAddr, b.dstAddr) + } + // No need to specify pkt.egress. It isn't used downstream from here. + select { + case fwChan <- p: + default: + b.dataPlane.returnPacketToPool(p) // Do we care enough to have a metric? + } return err } diff --git a/router/dataplane_test.go b/router/dataplane_test.go index 87f2e9f1b4..1c812f9816 100644 --- a/router/dataplane_test.go +++ b/router/dataplane_test.go @@ -306,7 +306,7 @@ func TestDataPlaneRun(t *testing.T) { }, ).Times(1) mExternal.EXPECT().ReadBatch(gomock.Any()).Return(0, nil).AnyTimes() - mExternal.EXPECT().WriteTo(gomock.Any(), gomock.Any()).Return(0, nil).AnyTimes() + mExternal.EXPECT().WriteBatch(gomock.Any(), gomock.Any()).Return(0, nil).AnyTimes() l := control.LinkEnd{ IA: addr.MustParseIA("1-ff00:0:1"), Addr: netip.MustParseAddrPort("10.0.0.100:0"), @@ -373,10 +373,9 @@ func TestDataPlaneRun(t *testing.T) { }, ).Times(1) mInternal.EXPECT().ReadBatch(gomock.Any()).Return(0, nil).AnyTimes() - - mInternal.EXPECT().WriteTo(gomock.Any(), gomock.Any()).DoAndReturn( - func(data []byte, _ netip.AddrPort) (int, error) { - pkt := gopacket.NewPacket(data, + mInternal.EXPECT().WriteBatch(gomock.Any(), gomock.Any()).DoAndReturn( + func(msgs underlayconn.Messages, _ int) (int, error) { + pkt := gopacket.NewPacket(msgs[0].Buffers[0], slayers.LayerTypeSCION, gopacket.Default) if b := pkt.Layer(layers.LayerTypeBFD); b != nil { v := b.(*layers.BFD).YourDiscriminator @@ -391,7 +390,7 @@ func TestDataPlaneRun(t *testing.T) { return 0, fmt.Errorf("no valid BFD message") }).MinTimes(1) - mInternal.EXPECT().WriteTo(gomock.Any(), gomock.Any()).Return(0, nil).AnyTimes() + mInternal.EXPECT().WriteBatch(gomock.Any(), gomock.Any()).Return(0, nil).AnyTimes() local := netip.MustParseAddrPort("10.0.200.100:0") _ = ret.SetKey([]byte("randomkeyformacs")) 
@@ -410,9 +409,9 @@ func TestDataPlaneRun(t *testing.T) { localAddr := netip.MustParseAddrPort("10.0.200.100:0") remoteAddr := netip.MustParseAddrPort("10.0.200.200:0") mInternal := mock_router.NewMockBatchConn(ctrl) - mInternal.EXPECT().WriteTo(gomock.Any(), gomock.Any()).DoAndReturn( - func(data []byte, _ netip.AddrPort) (int, error) { - pkt := gopacket.NewPacket(data, + mInternal.EXPECT().WriteBatch(gomock.Any(), gomock.Any()).DoAndReturn( + func(msgs underlayconn.Messages, _ int) (int, error) { + pkt := gopacket.NewPacket(msgs[0].Buffers[0], slayers.LayerTypeSCION, gopacket.Default) if b := pkt.Layer(layers.LayerTypeBFD); b == nil { @@ -464,9 +463,9 @@ func TestDataPlaneRun(t *testing.T) { mExternal := mock_router.NewMockBatchConn(ctrl) mExternal.EXPECT().ReadBatch(gomock.Any()).Return(0, nil).AnyTimes() - mExternal.EXPECT().WriteTo(gomock.Any(), gomock.Any()).DoAndReturn( - func(data []byte, _ netip.AddrPort) (int, error) { - pkt := gopacket.NewPacket(data, + mExternal.EXPECT().WriteBatch(gomock.Any(), gomock.Any()).DoAndReturn( + func(msgs underlayconn.Messages, _ int) (int, error) { + pkt := gopacket.NewPacket(msgs[0].Buffers[0], slayers.LayerTypeSCION, gopacket.Default) if b := pkt.Layer(layers.LayerTypeBFD); b == nil { @@ -491,7 +490,7 @@ func TestDataPlaneRun(t *testing.T) { done <- struct{}{} return 1, nil }).MinTimes(1) - mExternal.EXPECT().WriteTo(gomock.Any(), gomock.Any()).Return(0, nil).AnyTimes() + mExternal.EXPECT().WriteBatch(gomock.Any(), gomock.Any()).Return(0, nil).AnyTimes() local := control.LinkEnd{ IA: addr.MustParseIA("1-ff00:0:1"), @@ -552,9 +551,9 @@ func TestDataPlaneRun(t *testing.T) { ).Times(1) mExternal.EXPECT().ReadBatch(gomock.Any()).Return(0, nil).AnyTimes() - mExternal.EXPECT().WriteTo(gomock.Any(), gomock.Any()).DoAndReturn( - func(data []byte, _ netip.AddrPort) (int, error) { - pkt := gopacket.NewPacket(data, + mExternal.EXPECT().WriteBatch(gomock.Any(), gomock.Any()).DoAndReturn( + func(msgs underlayconn.Messages, _ int) 
(int, error) { + pkt := gopacket.NewPacket(msgs[0].Buffers[0], slayers.LayerTypeSCION, gopacket.Default) if b := pkt.Layer(layers.LayerTypeBFD); b != nil { @@ -569,7 +568,7 @@ func TestDataPlaneRun(t *testing.T) { } return 0, fmt.Errorf("no valid BFD message") }).MinTimes(1) - mExternal.EXPECT().WriteTo(gomock.Any(), gomock.Any()).Return(0, nil).AnyTimes() + mExternal.EXPECT().WriteBatch(gomock.Any(), gomock.Any()).Return(0, nil).AnyTimes() local := control.LinkEnd{ IA: addr.MustParseIA("1-ff00:0:1"), diff --git a/router/mock_router/mock.go b/router/mock_router/mock.go index d2c5a6fb87..b5238fb4fa 100644 --- a/router/mock_router/mock.go +++ b/router/mock_router/mock.go @@ -5,7 +5,6 @@ package mock_router import ( - netip "net/netip" reflect "reflect" gomock "github.com/golang/mock/gomock" @@ -78,18 +77,3 @@ func (mr *MockBatchConnMockRecorder) WriteBatch(arg0, arg1 interface{}) *gomock. mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WriteBatch", reflect.TypeOf((*MockBatchConn)(nil).WriteBatch), arg0, arg1) } - -// WriteTo mocks base method. -func (m *MockBatchConn) WriteTo(arg0 []byte, arg1 netip.AddrPort) (int, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "WriteTo", arg0, arg1) - ret0, _ := ret[0].(int) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// WriteTo indicates an expected call of WriteTo. -func (mr *MockBatchConnMockRecorder) WriteTo(arg0, arg1 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WriteTo", reflect.TypeOf((*MockBatchConn)(nil).WriteTo), arg0, arg1) -} From 2ac2725663d199cb4c73df0f6ab175e4700ed50a Mon Sep 17 00:00:00 2001 From: Jean-Christophe Hugly Date: Tue, 12 Nov 2024 17:23:12 +0100 Subject: [PATCH 2/4] Remove the field "internal" from the dataplane structure. It was duplicating interfaces[0] and existed only for compatibility (backward, I presume) with some tests. 
--- router/dataplane.go | 11 ++++------- router/dataplane_internal_test.go | 4 ++-- router/export_test.go | 3 ++- 3 files changed, 8 insertions(+), 10 deletions(-) diff --git a/router/dataplane.go b/router/dataplane.go index 3b696c347f..de4872ba17 100644 --- a/router/dataplane.go +++ b/router/dataplane.go @@ -176,7 +176,6 @@ type DataPlane struct { linkTypes map[uint16]topology.LinkType neighborIAs map[uint16]addr.IA peerInterfaces map[uint16]uint16 - internal BatchConn internalIP netip.Addr internalNextHops map[uint16]netip.AddrPort svc *services @@ -338,14 +337,12 @@ func (d *DataPlane) AddInternalInterface(conn BatchConn, ip netip.Addr) error { if conn == nil { return emptyValue } - if d.internal != nil { - return alreadySet - } if d.interfaces == nil { d.interfaces = make(map[uint16]BatchConn) + } else if d.interfaces[0] != nil { + return alreadySet } d.interfaces[0] = conn - d.internal = conn d.internalIP = ip return nil } @@ -481,7 +478,7 @@ func (d *DataPlane) getInterfaceState(ifID uint16) control.InterfaceState { return control.InterfaceUp } -func (d *DataPlane) addBFDController(ifID uint16, s *bfdSend, cfg control.BFD, +func (d *DataPlane) addBFDController(ifID uint16, s bfd.Sender, cfg control.BFD, metrics bfd.Metrics) error { if d.bfdSessions == nil { @@ -1830,6 +1827,7 @@ func (p *scionPacketProcessor) handleEgressRouterAlert() disposition { return pForward } if _, ok := p.d.external[p.pkt.egress]; !ok { + // the egress router is not this one. 
return pForward } *alert = false @@ -1999,7 +1997,6 @@ func (p *scionPacketProcessor) process() disposition { if disp := p.validateEgressUp(); disp != pForward { return disp } - if _, ok := p.d.external[egressID]; ok { // Not ASTransit in if disp := p.processEgress(); disp != pForward { diff --git a/router/dataplane_internal_test.go b/router/dataplane_internal_test.go index 86b91f4088..f6274f5f3c 100644 --- a/router/dataplane_internal_test.go +++ b/router/dataplane_internal_test.go @@ -89,7 +89,7 @@ func TestReceiver(t *testing.T) { dp.setRunning() dp.initMetrics() go func() { - dp.runReceiver(0, dp.internal, runConfig, procCh) + dp.runReceiver(0, dp.interfaces[0], runConfig, procCh) }() ptrMap := make(map[uintptr]struct{}) for i := 0; i < 21; i++ { @@ -182,7 +182,7 @@ func TestForwarder(t *testing.T) { initialPoolSize := len(dp.packetPool) dp.setRunning() dp.initMetrics() - go dp.runForwarder(0, dp.internal, runConfig, fwCh[0]) + go dp.runForwarder(0, dp.interfaces[0], runConfig, fwCh[0]) dstAddr := &net.UDPAddr{IP: net.IP{10, 0, 200, 200}} for i := 0; i < 255; i++ { diff --git a/router/export_test.go b/router/export_test.go index e5f4cc0d18..1c87bcce8b 100644 --- a/router/export_test.go +++ b/router/export_test.go @@ -76,6 +76,7 @@ func NewDP( key []byte) *DataPlane { dp := &DataPlane{ + interfaces: map[uint16]BatchConn{0: internal}, localIA: local, external: external, linkTypes: linkTypes, @@ -84,10 +85,10 @@ func NewDP( dispatchedPortStart: uint16(dispatchedPortStart), dispatchedPortEnd: uint16(dispatchedPortEnd), svc: &services{m: svc}, - internal: internal, internalIP: netip.MustParseAddr("198.51.100.1"), Metrics: metrics, } + if err := dp.SetKey(key); err != nil { panic(err) } From c487886308f0c6362fb1df2955572562274201b4 Mon Sep 17 00:00:00 2001 From: Jean-Christophe Hugly Date: Wed, 13 Nov 2024 19:40:04 +0100 Subject: [PATCH 3/4] Implement reviewer's suggestions. 
--- router/dataplane.go | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/router/dataplane.go b/router/dataplane.go index de4872ba17..869e95cb7b 100644 --- a/router/dataplane.go +++ b/router/dataplane.go @@ -230,7 +230,6 @@ var ( errBFDSessionDown = errors.New("bfd session down") expiredHop = errors.New("expired hop") ingressInterfaceInvalid = errors.New("ingress interface invalid") - egressInterfaceInvalid = errors.New("egress interface invalid") macVerificationFailed = errors.New("MAC verification failed") badPacketSize = errors.New("bad packet size") @@ -382,7 +381,7 @@ func (d *DataPlane) AddExternalInterface(ifID uint16, conn BatchConn, // AddNeighborIA adds the neighboring IA for a given interface ID. If an IA for // the given ID is already set, this method will return an error. This can only -// be called on a yet running dataplane. +// be called on a not yet running dataplane. func (d *DataPlane) AddNeighborIA(ifID uint16, remote addr.IA) error { d.mtx.Lock() defer d.mtx.Unlock() @@ -2418,16 +2417,16 @@ func (b *bfdSend) Send(bfd *layers.BFD) error { return err } - fwChan, ok := b.dataPlane.fwQs[b.ifID] - if !ok { - // WTF? May be we should just treat that as a panic-able offense - return egressInterfaceInvalid - } + // BfdControllers and fwQs are initialized from the same set of ifIDs. So not finding + // the forwarding queue is an serious internal error. Let that panic. + fwChan, _ := b.dataPlane.fwQs[b.ifID] p := b.dataPlane.getPacketFromPool() p.reset() - // FIXME: would rather build the pkt directly into the rawPacket's buffer. + // TODO: it would be best to serialize directly into the packet buffer. This would require + // a custom SerializeBuffer implementation and some changes to the packet structure. To be + // considered in a future refactoring. 
sz := copy(p.rawPacket, b.buffer.Bytes()) p.rawPacket = p.rawPacket[:sz] if b.ifID == 0 { @@ -2438,7 +2437,9 @@ func (b *bfdSend) Send(bfd *layers.BFD) error { select { case fwChan <- p: default: - b.dataPlane.returnPacketToPool(p) // Do we care enough to have a metric? + // We do not care if some BFD packets get bounced under high load. If it becomes a problem, + // the solution is do use BFD's demand-mode. To be considered in a future refactoring. + b.dataPlane.returnPacketToPool(p) } return err } From 6e7fd1dd305ab600009a1a14ceb908f5e3ce852a Mon Sep 17 00:00:00 2001 From: Jean-Christophe Hugly Date: Fri, 15 Nov 2024 11:00:28 +0100 Subject: [PATCH 4/4] make BFD send fail fast if the interface doesn't exist. --- router/dataplane.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/router/dataplane.go b/router/dataplane.go index 869e95cb7b..c1322a5f4a 100644 --- a/router/dataplane.go +++ b/router/dataplane.go @@ -2419,7 +2419,7 @@ func (b *bfdSend) Send(bfd *layers.BFD) error { // BfdControllers and fwQs are initialized from the same set of ifIDs. So not finding // the forwarding queue is an serious internal error. Let that panic. - fwChan, _ := b.dataPlane.fwQs[b.ifID] + fwChan := b.dataPlane.fwQs[b.ifID] p := b.dataPlane.getPacketFromPool() p.reset()