Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

router: small refactoring to remove the need for a batchconn.WriteTo #4651

Merged
merged 5 commits into from
Nov 15, 2024
Merged
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions router/control/conf.go
Original file line number Diff line number Diff line change
@@ -196,13 +196,14 @@ func confExternalInterfaces(dp Dataplane, cfg *Config) error {

_, owned := cfg.BR.IFs[ifID]
if !owned {
// XXX The current implementation effectively uses IP/UDP tunnels to create
// the SCION network as an overlay, with forwarding to local hosts being a special case.
// When setting up external interfaces that belong to other routers in the AS, they
// are basically IP/UDP tunnels between the two border routers, and as such is
// configured in the data plane.
// The current implementation effectively uses IP/UDP tunnels to create the SCION
// network as an overlay, with forwarding to local hosts being a special case. When
// setting up external interfaces that belong to other routers in the AS, they are
// basically IP/UDP tunnels between the two border routers. Those are described as a
// link from the internal address of the local router to the internal address of the
// sibling router; and not to the router in the remote AS.
linkInfo.Local.Addr = cfg.BR.InternalAddr
linkInfo.Remote.Addr = iface.InternalAddr
linkInfo.Remote.Addr = iface.InternalAddr // i.e. via sibling router.
// For internal BFD always use the default configuration.
linkInfo.BFD = BFD{}
}
73 changes: 54 additions & 19 deletions router/dataplane.go
Original file line number Diff line number Diff line change
@@ -86,7 +86,6 @@ type bfdSession interface {
// BatchConn is a connection that supports batch reads and writes.
type BatchConn interface {
ReadBatch(underlayconn.Messages) (int, error)
WriteTo([]byte, netip.AddrPort) (int, error)
WriteBatch(msgs underlayconn.Messages, flags int) (int, error)
Close() error
}
@@ -177,7 +176,6 @@ type DataPlane struct {
linkTypes map[uint16]topology.LinkType
neighborIAs map[uint16]addr.IA
peerInterfaces map[uint16]uint16
internal BatchConn
internalIP netip.Addr
internalNextHops map[uint16]netip.AddrPort
svc *services
@@ -193,6 +191,10 @@ type DataPlane struct {

ExperimentalSCMPAuthentication bool

// The forwarding queues. Each is consumed by a forwarder process and fed by
// one bfd sender and the packet processors.
fwQs map[uint16]chan *packet

// The pool that stores all the packet buffers as described in the design document. See
// https://github.com/scionproto/scion/blob/master/doc/dev/design/BorderRouter.rst
// To avoid garbage collection, most the meta-data that is produced during the processing of a
@@ -334,14 +336,12 @@ func (d *DataPlane) AddInternalInterface(conn BatchConn, ip netip.Addr) error {
if conn == nil {
return emptyValue
}
if d.internal != nil {
return alreadySet
}
if d.interfaces == nil {
d.interfaces = make(map[uint16]BatchConn)
} else if d.interfaces[0] != nil {
return alreadySet
}
d.interfaces[0] = conn
d.internal = conn
d.internalIP = ip
return nil
}
@@ -361,7 +361,7 @@ func (d *DataPlane) AddExternalInterface(ifID uint16, conn BatchConn,
if conn == nil || !src.Addr.IsValid() || !dst.Addr.IsValid() {
return emptyValue
}
err := d.addExternalInterfaceBFD(ifID, conn, src, dst, cfg)
err := d.addExternalInterfaceBFD(ifID, src, dst, cfg)
if err != nil {
return serrors.Wrap("adding external BFD", err, "if_id", ifID)
}
@@ -381,7 +381,7 @@ func (d *DataPlane) AddExternalInterface(ifID uint16, conn BatchConn,

// AddNeighborIA adds the neighboring IA for a given interface ID. If an IA for
// the given ID is already set, this method will return an error. This can only
// be called on a yet running dataplane.
// be called on a not yet running dataplane.
func (d *DataPlane) AddNeighborIA(ifID uint16, remote addr.IA) error {
d.mtx.Lock()
defer d.mtx.Unlock()
@@ -439,7 +439,7 @@ func (d *DataPlane) AddRemotePeer(local, remote uint16) error {
}

// AddExternalInterfaceBFD adds the inter AS connection BFD session.
func (d *DataPlane) addExternalInterfaceBFD(ifID uint16, conn BatchConn,
func (d *DataPlane) addExternalInterfaceBFD(ifID uint16,
src, dst control.LinkEnd, cfg control.BFD) error {

if *cfg.Disable {
@@ -459,7 +459,7 @@ func (d *DataPlane) addExternalInterfaceBFD(ifID uint16, conn BatchConn,
PacketsReceived: d.Metrics.BFDPacketsReceived.With(labels),
}
}
s, err := newBFDSend(conn, src.IA, dst.IA, src.Addr, dst.Addr, ifID, d.macFactory())
s, err := newBFDSend(d, src.IA, dst.IA, src.Addr, dst.Addr, ifID, d.macFactory())
if err != nil {
return err
}
@@ -477,7 +477,7 @@ func (d *DataPlane) getInterfaceState(ifID uint16) control.InterfaceState {
return control.InterfaceUp
}

func (d *DataPlane) addBFDController(ifID uint16, s *bfdSend, cfg control.BFD,
func (d *DataPlane) addBFDController(ifID uint16, s bfd.Sender, cfg control.BFD,
metrics bfd.Metrics) error {

if d.bfdSessions == nil {
@@ -599,7 +599,7 @@ func (d *DataPlane) addNextHopBFD(ifID uint16, src, dst netip.AddrPort, cfg cont
}
}

s, err := newBFDSend(d.internal, d.localIA, d.localIA, src, dst, 0, d.macFactory())
s, err := newBFDSend(d, d.localIA, d.localIA, src, dst, 0, d.macFactory())
if err != nil {
return err
}
@@ -621,7 +621,6 @@ type RunConfig struct {

func (d *DataPlane) Run(ctx context.Context, cfg *RunConfig) error {
d.mtx.Lock()
d.setRunning()
d.initMetrics()

processorQueueSize := max(
@@ -630,7 +629,9 @@ func (d *DataPlane) Run(ctx context.Context, cfg *RunConfig) error {

d.initPacketPool(cfg, processorQueueSize)
procQs, fwQs, slowQs := initQueues(cfg, d.interfaces, processorQueueSize)
d.fwQs = fwQs // Shared with BFD senders

d.setRunning()
for ifID, conn := range d.interfaces {
go func(ifID uint16, conn BatchConn) {
defer log.HandlePanic()
@@ -759,7 +760,7 @@ func (d *DataPlane) runReceiver(ifID uint16, conn BatchConn, cfg *RunConfig,

// Give a new buffer to the msgs elements that have been used in the previous loop.
for i := 0; i < cfg.BatchSize-numReusable; i++ {
p := <-d.packetPool
p := d.getPacketFromPool()
p.reset()
packets[i] = p
msgs[i].Buffers[0] = p.rawPacket
@@ -805,6 +806,10 @@ func computeProcID(data []byte, numProcRoutines int, hashSeed uint32) (uint32, e
return s % uint32(numProcRoutines), nil
}

func (d *DataPlane) getPacketFromPool() *packet {
return <-d.packetPool
}

func (d *DataPlane) returnPacketToPool(pkt *packet) {
d.packetPool <- pkt
}
@@ -1821,6 +1826,7 @@ func (p *scionPacketProcessor) handleEgressRouterAlert() disposition {
return pForward
}
if _, ok := p.d.external[p.pkt.egress]; !ok {
// the egress router is not this one.
return pForward
}
*alert = false
@@ -1990,7 +1996,6 @@ func (p *scionPacketProcessor) process() disposition {
if disp := p.validateEgressUp(); disp != pForward {
return disp
}

if _, ok := p.d.external[egressID]; ok {
// Not ASTransit in
if disp := p.processEgress(); disp != pForward {
@@ -2316,7 +2321,8 @@ func updateSCIONLayer(rawPkt []byte, s slayers.SCION, buffer gopacket.SerializeB
}

type bfdSend struct {
conn BatchConn
dataPlane *DataPlane
ifID uint16
srcAddr, dstAddr netip.AddrPort
scn *slayers.SCION
ohp *onehop.Path
@@ -2326,7 +2332,7 @@ type bfdSend struct {
}

// newBFDSend creates and initializes a BFD Sender
func newBFDSend(conn BatchConn, srcIA, dstIA addr.IA, srcAddr, dstAddr netip.AddrPort,
func newBFDSend(d *DataPlane, srcIA, dstIA addr.IA, srcAddr, dstAddr netip.AddrPort,
ifID uint16, mac hash.Hash) (*bfdSend, error) {

scn := &slayers.SCION{
@@ -2373,8 +2379,13 @@ func newBFDSend(conn BatchConn, srcIA, dstIA addr.IA, srcAddr, dstAddr netip.Add
scn.Path = ohp
}

// bfdSend includes a reference to the dataplane. In general this must not be used until the
// dataplane is running. This is ensured by the fact that bfdSend objects are owned by bfd
// sessions, which are started by dataplane.Run() itself.

return &bfdSend{
conn: conn,
dataPlane: d,
ifID: ifID,
srcAddr: srcAddr,
dstAddr: dstAddr,
scn: scn,
@@ -2405,7 +2416,31 @@ func (b *bfdSend) Send(bfd *layers.BFD) error {
if err != nil {
return err
}
_, err = b.conn.WriteTo(b.buffer.Bytes(), b.dstAddr)

// BfdControllers and fwQs are initialized from the same set of ifIDs. So not finding
// the forwarding queue is an serious internal error. Let that panic.
fwChan, _ := b.dataPlane.fwQs[b.ifID]

p := b.dataPlane.getPacketFromPool()
p.reset()

// TODO: it would be best to serialize directly into the packet buffer. This would require
// a custom SerializeBuffer implementation and some changes to the packet structure. To be
// considered in a future refactoring.
sz := copy(p.rawPacket, b.buffer.Bytes())
p.rawPacket = p.rawPacket[:sz]
if b.ifID == 0 {
// Using the internal interface: must specify the destination address
updateNetAddrFromAddrPort(p.dstAddr, b.dstAddr)
}
// No need to specify pkt.egress. It isn't used downstream from here.
select {
case fwChan <- p:
default:
// We do not care if some BFD packets get bounced under high load. If it becomes a problem,
// the solution is do use BFD's demand-mode. To be considered in a future refactoring.
b.dataPlane.returnPacketToPool(p)
}
return err
}

4 changes: 2 additions & 2 deletions router/dataplane_internal_test.go
Original file line number Diff line number Diff line change
@@ -89,7 +89,7 @@ func TestReceiver(t *testing.T) {
dp.setRunning()
dp.initMetrics()
go func() {
dp.runReceiver(0, dp.internal, runConfig, procCh)
dp.runReceiver(0, dp.interfaces[0], runConfig, procCh)
}()
ptrMap := make(map[uintptr]struct{})
for i := 0; i < 21; i++ {
@@ -182,7 +182,7 @@ func TestForwarder(t *testing.T) {
initialPoolSize := len(dp.packetPool)
dp.setRunning()
dp.initMetrics()
go dp.runForwarder(0, dp.internal, runConfig, fwCh[0])
go dp.runForwarder(0, dp.interfaces[0], runConfig, fwCh[0])

dstAddr := &net.UDPAddr{IP: net.IP{10, 0, 200, 200}}
for i := 0; i < 255; i++ {
33 changes: 16 additions & 17 deletions router/dataplane_test.go
Original file line number Diff line number Diff line change
@@ -306,7 +306,7 @@ func TestDataPlaneRun(t *testing.T) {
},
).Times(1)
mExternal.EXPECT().ReadBatch(gomock.Any()).Return(0, nil).AnyTimes()
mExternal.EXPECT().WriteTo(gomock.Any(), gomock.Any()).Return(0, nil).AnyTimes()
mExternal.EXPECT().WriteBatch(gomock.Any(), gomock.Any()).Return(0, nil).AnyTimes()
l := control.LinkEnd{
IA: addr.MustParseIA("1-ff00:0:1"),
Addr: netip.MustParseAddrPort("10.0.0.100:0"),
@@ -373,10 +373,9 @@ func TestDataPlaneRun(t *testing.T) {
},
).Times(1)
mInternal.EXPECT().ReadBatch(gomock.Any()).Return(0, nil).AnyTimes()

mInternal.EXPECT().WriteTo(gomock.Any(), gomock.Any()).DoAndReturn(
func(data []byte, _ netip.AddrPort) (int, error) {
pkt := gopacket.NewPacket(data,
mInternal.EXPECT().WriteBatch(gomock.Any(), gomock.Any()).DoAndReturn(
func(msgs underlayconn.Messages, _ int) (int, error) {
pkt := gopacket.NewPacket(msgs[0].Buffers[0],
slayers.LayerTypeSCION, gopacket.Default)
if b := pkt.Layer(layers.LayerTypeBFD); b != nil {
v := b.(*layers.BFD).YourDiscriminator
@@ -391,7 +390,7 @@ func TestDataPlaneRun(t *testing.T) {

return 0, fmt.Errorf("no valid BFD message")
}).MinTimes(1)
mInternal.EXPECT().WriteTo(gomock.Any(), gomock.Any()).Return(0, nil).AnyTimes()
mInternal.EXPECT().WriteBatch(gomock.Any(), gomock.Any()).Return(0, nil).AnyTimes()

local := netip.MustParseAddrPort("10.0.200.100:0")
_ = ret.SetKey([]byte("randomkeyformacs"))
@@ -410,9 +409,9 @@ func TestDataPlaneRun(t *testing.T) {
localAddr := netip.MustParseAddrPort("10.0.200.100:0")
remoteAddr := netip.MustParseAddrPort("10.0.200.200:0")
mInternal := mock_router.NewMockBatchConn(ctrl)
mInternal.EXPECT().WriteTo(gomock.Any(), gomock.Any()).DoAndReturn(
func(data []byte, _ netip.AddrPort) (int, error) {
pkt := gopacket.NewPacket(data,
mInternal.EXPECT().WriteBatch(gomock.Any(), gomock.Any()).DoAndReturn(
func(msgs underlayconn.Messages, _ int) (int, error) {
pkt := gopacket.NewPacket(msgs[0].Buffers[0],
slayers.LayerTypeSCION, gopacket.Default)

if b := pkt.Layer(layers.LayerTypeBFD); b == nil {
@@ -464,9 +463,9 @@ func TestDataPlaneRun(t *testing.T) {

mExternal := mock_router.NewMockBatchConn(ctrl)
mExternal.EXPECT().ReadBatch(gomock.Any()).Return(0, nil).AnyTimes()
mExternal.EXPECT().WriteTo(gomock.Any(), gomock.Any()).DoAndReturn(
func(data []byte, _ netip.AddrPort) (int, error) {
pkt := gopacket.NewPacket(data,
mExternal.EXPECT().WriteBatch(gomock.Any(), gomock.Any()).DoAndReturn(
func(msgs underlayconn.Messages, _ int) (int, error) {
pkt := gopacket.NewPacket(msgs[0].Buffers[0],
slayers.LayerTypeSCION, gopacket.Default)

if b := pkt.Layer(layers.LayerTypeBFD); b == nil {
@@ -491,7 +490,7 @@ func TestDataPlaneRun(t *testing.T) {
done <- struct{}{}
return 1, nil
}).MinTimes(1)
mExternal.EXPECT().WriteTo(gomock.Any(), gomock.Any()).Return(0, nil).AnyTimes()
mExternal.EXPECT().WriteBatch(gomock.Any(), gomock.Any()).Return(0, nil).AnyTimes()

local := control.LinkEnd{
IA: addr.MustParseIA("1-ff00:0:1"),
@@ -552,9 +551,9 @@ func TestDataPlaneRun(t *testing.T) {
).Times(1)
mExternal.EXPECT().ReadBatch(gomock.Any()).Return(0, nil).AnyTimes()

mExternal.EXPECT().WriteTo(gomock.Any(), gomock.Any()).DoAndReturn(
func(data []byte, _ netip.AddrPort) (int, error) {
pkt := gopacket.NewPacket(data,
mExternal.EXPECT().WriteBatch(gomock.Any(), gomock.Any()).DoAndReturn(
func(msgs underlayconn.Messages, _ int) (int, error) {
pkt := gopacket.NewPacket(msgs[0].Buffers[0],
slayers.LayerTypeSCION, gopacket.Default)

if b := pkt.Layer(layers.LayerTypeBFD); b != nil {
@@ -569,7 +568,7 @@ func TestDataPlaneRun(t *testing.T) {
}
return 0, fmt.Errorf("no valid BFD message")
}).MinTimes(1)
mExternal.EXPECT().WriteTo(gomock.Any(), gomock.Any()).Return(0, nil).AnyTimes()
mExternal.EXPECT().WriteBatch(gomock.Any(), gomock.Any()).Return(0, nil).AnyTimes()

local := control.LinkEnd{
IA: addr.MustParseIA("1-ff00:0:1"),
3 changes: 2 additions & 1 deletion router/export_test.go
Original file line number Diff line number Diff line change
@@ -76,6 +76,7 @@ func NewDP(
key []byte) *DataPlane {

dp := &DataPlane{
interfaces: map[uint16]BatchConn{0: internal},
localIA: local,
external: external,
linkTypes: linkTypes,
@@ -84,10 +85,10 @@ func NewDP(
dispatchedPortStart: uint16(dispatchedPortStart),
dispatchedPortEnd: uint16(dispatchedPortEnd),
svc: &services{m: svc},
internal: internal,
internalIP: netip.MustParseAddr("198.51.100.1"),
Metrics: metrics,
}

if err := dp.SetKey(key); err != nil {
panic(err)
}
16 changes: 0 additions & 16 deletions router/mock_router/mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.