Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Detect interface changes and start/stop new/removed lldp discovery #73

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
6 changes: 6 additions & 0 deletions cmd/internal/core/core.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package core

import (
"sync"
"time"

v1 "github.com/metal-stack/metal-api/pkg/api/v1"
Expand Down Expand Up @@ -29,6 +30,9 @@ type Core struct {

driver metalgo.Client
eventServiceClient v1.EventServiceClient

interfaces sync.Map
interfaceCancelFuncs sync.Map
}

type Config struct {
Expand Down Expand Up @@ -73,5 +77,7 @@ func New(c Config) *Core {
frrTplFile: c.FrrTplFile,
driver: c.Driver,
eventServiceClient: c.EventServiceClient,
interfaces: sync.Map{},
interfaceCancelFuncs: sync.Map{},
}
}
68 changes: 68 additions & 0 deletions cmd/internal/core/detect-changes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package core

import (
"context"
"net"
"strings"
"time"

"github.com/metal-stack/go-lldpd/pkg/lldp"
)

const (
// detectChangesInterval is the period of the ticker that drives the
// interface change check in DetectInterfaceChanges.
detectChangesInterval = time.Minute
)

// DetectInterfaceChanges periodically compares the switch port interfaces
// ("swp" prefix) present on the host with the interfaces that are currently
// tracked in c.interfaces. When the two sets differ, the switch is
// re-registered, LLDP discovery is stopped for removed interfaces and started
// for added interfaces.
//
// The check runs every detectChangesInterval. The function blocks until ctx is
// done. Newly started discoveries are handed discoveryResultChan.
func (c *Core) DetectInterfaceChanges(ctx context.Context, discoveryResultChan chan lldp.DiscoveryResult) {
	ticker := time.NewTicker(detectChangesInterval)
	// release the ticker once the loop is left
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			c.log.Info("checking for port changes")
			ifs, err := net.Interfaces()
			if err != nil {
				c.log.Errorw("unable to gather interfaces, ignoring", "error", err)
				continue
			}
			actualInterfaces := []string{}
			for _, iface := range ifs {
				// consider only switch port interfaces
				if !strings.HasPrefix(iface.Name, "swp") {
					continue
				}
				actualInterfaces = append(actualInterfaces, iface.Name)
			}
			existingInterfaces := []string{}
			c.interfaces.Range(func(key, value any) bool {
				existingInterfaces = append(existingInterfaces, key.(string))
				return true
			})

			addedInterfaces, removedInterfaces := difference(existingInterfaces, actualInterfaces)

			if len(addedInterfaces) == 0 && len(removedInterfaces) == 0 {
				c.log.Info("no port changes detected")
				continue
			}

			c.log.Infow("switch interfaces changed, re-register switch", "added", addedInterfaces, "removed", removedInterfaces)
			// a failed registration must not prevent the discovery bookkeeping
			// below, so the error is reported and processing continues
			if err := c.RegisterSwitch(); err != nil {
				c.log.Errorw("unable to re-register switch", "error", err)
			}

			for _, i := range removedInterfaces {
				c.log.Infow("remove lldp discovery for", "interfaces", i)
				c.stopLLDPDiscovery(i)
			}
			for _, i := range addedInterfaces {
				// the interface may have vanished again since it was listed
				iface, err := net.InterfaceByName(i)
				if err != nil {
					c.log.Errorw("unable to get interface by name", "interface", i, "error", err)
					continue
				}
				c.log.Infow("add lldp discovery for", "interfaces", *iface)
				c.startLLDPDiscovery(ctx, discoveryResultChan, *iface)
			}
		}
	}
}
112 changes: 68 additions & 44 deletions cmd/internal/core/phone-home.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"sync"
"time"

"golang.org/x/exp/slices"

"github.com/metal-stack/go-lldpd/pkg/lldp"
v1 "github.com/metal-stack/metal-api/pkg/api/v1"
"go.uber.org/zap"
Expand All @@ -23,40 +25,17 @@ const (
// provisioning event to metal-api for each machine that sent at least one
// phone-home LLDP package to any interface of the host machine
// during this interval.
func (c *Core) ConstantlyPhoneHome() {
// FIXME this list of interfaces is only read on startup
// if additional interfaces are configured, no new lldpd client is started and therefore no
// phoned home events are sent for these interfaces.
// Solution:
// - either ensure metal-core is restarted on interfaces added/removed
// - dynamically detect changes and stop/start goroutines for the lldpd client per interface
func (c *Core) ConstantlyPhoneHome(ctx context.Context, discoveryResultChan chan lldp.DiscoveryResult) {
ifs, err := net.Interfaces()
if err != nil {
c.log.Errorw("unable to find interfaces", "error", err)
os.Exit(1)
}

discoveryResultChan := make(chan lldp.DiscoveryResult)

// FIXME context should come from caller and canceled on shutdown
ctx := context.Background()

phoneHomeMessages := sync.Map{}
// initial interface discovery
for _, iface := range ifs {
// consider only switch port interfaces
if !strings.HasPrefix(iface.Name, "swp") {
continue
}
lldpcli, err := lldp.NewClient(ctx, iface)
if err != nil {
c.log.Errorw("unable to start LLDP client", "interface", iface.Name, "error", err)
continue
}
c.log.Infow("start lldp client", "interface", iface.Name)

// constantly observe LLDP traffic on current machine and current interface
go lldpcli.Start(discoveryResultChan)

c.startLLDPDiscovery(ctx, discoveryResultChan, iface)
}
// extract phone home messages from fetched LLDP packages
go func() {
Expand All @@ -73,26 +52,24 @@ func (c *Core) ConstantlyPhoneHome() {

// send arrived messages on a ticker basis
ticker := time.NewTicker(phonedHomeInterval)
go func() {
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
msgs := []phoneHomeMessage{}
phoneHomeMessages.Range(func(key, value interface{}) bool {
msg, ok := value.(phoneHomeMessage)
if !ok {
return true
}
phoneHomeMessages.Delete(key)
msgs = append(msgs, msg)
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
msgs := []phoneHomeMessage{}
phoneHomeMessages.Range(func(key, value any) bool {
msg, ok := value.(phoneHomeMessage)
if !ok {
return true
})
c.phoneHome(msgs)
}
}
phoneHomeMessages.Delete(key)
msgs = append(msgs, msg)
return true
})
c.phoneHome(msgs)
}
}()
}
}

func (c *Core) send(event *v1.EventServiceSendRequest) (*v1.EventServiceSendResponse, error) {
Expand Down Expand Up @@ -152,3 +129,50 @@ func toPhoneHomeMessage(discoveryResult lldp.DiscoveryResult) *phoneHomeMessage
}
return nil
}

func (c *Core) startLLDPDiscovery(ctx context.Context, discoveryResultChan chan lldp.DiscoveryResult, iface net.Interface) {
// consider only switch port interfaces
if !strings.HasPrefix(iface.Name, "swp") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In cmd/internal/core/detect-changes.go line 32 the interface is checked for "swp" prefix.
If the same check is kept in cmd/internal/core/phone-home.go, then the behaviour is consistent across callers (both only call this function for "swp" interfaces) and the duplicate check here could be dropped.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it is a bit too spread out. I will think of a solution, maybe factoring the interface listing out into a separate func (which could also detect the switch OS in the future; SONiC has "Ethernet.." )

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please check the PR #68 for a possible solution.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes

return
}
ifacectx, cancel := context.WithCancel(ctx)
lldpcli, err := lldp.NewClient(ifacectx, iface)
if err != nil {
c.log.Errorw("unable to start LLDP client", "interface", iface.Name, "error", err)
cancel()
return
}
c.log.Infow("start lldp client", "interface", iface.Name)

// constantly observe LLDP traffic on current machine and current interface
go lldpcli.Start(discoveryResultChan)

c.interfaces.Store(iface.Name, iface)
c.interfaceCancelFuncs.Store(iface.Name, cancel)
}

// stopLLDPDiscovery invokes the cancel function stored for the interface with
// the given name and removes the interface from both bookkeeping maps.
// Interface names without a stored cancel function are ignored.
func (c *Core) stopLLDPDiscovery(iface string) {
	if entry, found := c.interfaceCancelFuncs.Load(iface); found {
		cancel := entry.(context.CancelFunc)
		cancel()

		c.interfaceCancelFuncs.Delete(iface)
		c.interfaces.Delete(iface)
	}
}

// difference reports how new deviates from old: added holds the elements of
// new that do not occur in old, removed holds the elements of old that do not
// occur in new. Both results keep the order of the slice they were taken from
// and are nil when nothing was found.
func difference[E comparable](old, new []E) (added, removed []E) {
	// missing collects, in order, every candidate that is absent from reference.
	missing := func(candidates, reference []E) []E {
		var out []E
		for i := range candidates {
			if slices.Contains(reference, candidates[i]) {
				continue
			}
			out = append(out, candidates[i])
		}
		return out
	}

	added = missing(new, old)
	removed = missing(old, new)
	return added, removed
}
59 changes: 59 additions & 0 deletions cmd/internal/core/phone-home_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package core

import (
"testing"

"golang.org/x/exp/slices"
)

// Test_difference checks that difference reports the elements added to and
// removed from a slice, in the order in which they appear in their source
// slice.
func Test_difference(t *testing.T) {
	tests := []struct {
		name        string
		old         []string
		new         []string
		wantAdded   []string
		wantRemoved []string
	}{
		{
			name:        "equal",
			old:         []string{"a", "b", "c"},
			new:         []string{"a", "b", "c"},
			wantAdded:   []string{},
			wantRemoved: []string{},
		},
		{
			name:        "one added",
			old:         []string{"a", "b", "c"},
			new:         []string{"a", "b", "d", "c"},
			wantAdded:   []string{"d"},
			wantRemoved: []string{},
		},
		{
			name:        "one removed",
			old:         []string{"a", "b", "d", "c"},
			new:         []string{"a", "b", "c"},
			wantAdded:   []string{},
			wantRemoved: []string{"d"},
		},
		{
			name:        "more added and removed",
			old:         []string{"a", "x", "b", "d", "c"},
			new:         []string{"a", "b", "c", "z", "j"},
			wantAdded:   []string{"z", "j"},
			wantRemoved: []string{"x", "d"},
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			gotAdded, gotRemoved := difference(tt.old, tt.new)
			// slices.Equal compares length and elements, so a nil result
			// matches an empty expectation
			if !slices.Equal(gotAdded, tt.wantAdded) {
				t.Errorf("difference() gotAdded = %v, want %v", gotAdded, tt.wantAdded)
			}
			if !slices.Equal(gotRemoved, tt.wantRemoved) {
				t.Errorf("difference() gotRemoved = %v, want %v", gotRemoved, tt.wantRemoved)
			}
		})
	}
}
2 changes: 1 addition & 1 deletion cmd/internal/core/reconfigure-switch.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (c *Core) reconfigureSwitch(switchName string) error {
return fmt.Errorf("could not gather information about eth0 nic: %w", err)
}

c.log.Infow("assembled new config for switch", "config", switchConfig)
c.log.Debugw("assembled new config for switch", "config", switchConfig)
if !c.enableReconfigureSwitch {
c.log.Debug("skip config application because of environment setting")
return nil
Expand Down
21 changes: 9 additions & 12 deletions cmd/internal/core/register-switch.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (

sw "github.com/metal-stack/metal-go/api/client/switch_operations"
"github.com/metal-stack/metal-go/api/models"
"github.com/vishvananda/netlink"
"go.uber.org/zap"
"golang.org/x/exp/slices"
)

func (c *Core) RegisterSwitch() error {
Expand Down Expand Up @@ -46,26 +46,23 @@ func (c *Core) RegisterSwitch() error {
c.log.Errorw("unable to register at metal-api, retrying", "error", err)
time.Sleep(30 * time.Second)
}
c.log.Infow("register switch completed")
c.log.Infow("register switch completed", "params", params)
return nil
}

func getNics(log *zap.SugaredLogger, blacklist []string) ([]*models.V1SwitchNic, error) {
var nics []*models.V1SwitchNic
links, err := netlink.LinkList()
links, err := net.Interfaces()
if err != nil {
return nil, fmt.Errorf("unable to get all links: %w", err)
}
links:
for _, l := range links {
attrs := l.Attrs()
name := attrs.Name
mac := attrs.HardwareAddr.String()
for _, b := range blacklist {
if b == name {
log.Debugw("skip interface, because it is contained in the blacklist", "interface", name, "blacklist", blacklist)
continue links
}
name := l.Name
mac := l.HardwareAddr.String()

if slices.Contains(blacklist, name) {
log.Debugw("skip interface, because it is contained in the blacklist", "interface", name, "blacklist", blacklist)
continue
}
if !strings.HasPrefix(name, "swp") {
log.Debugw("skip interface, because only swp* switch ports are reported to metal-api", "interface", name, "MAC", mac)
Expand Down
2 changes: 1 addition & 1 deletion cmd/internal/switcher/test_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"testing"

"github.com/stretchr/testify/require"
"gopkg.in/yaml.v2"
"gopkg.in/yaml.v3"
)

func testApplier(t *testing.T, a Applier, expectedFilename string) {
Expand Down
11 changes: 10 additions & 1 deletion cmd/server.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package cmd

import (
"context"
"fmt"
"os"
"strings"

"github.com/kelseyhightower/envconfig"
"github.com/metal-stack/go-lldpd/pkg/lldp"
"github.com/metal-stack/metal-core/cmd/internal/core"

metalgo "github.com/metal-stack/metal-go"
"github.com/metal-stack/v"
"github.com/prometheus/client_golang/prometheus/promhttp"
Expand Down Expand Up @@ -93,7 +96,13 @@ func Run() {
}

go c.ReconfigureSwitch()
c.ConstantlyPhoneHome()
// FIXME create context with cancel
ctx := context.Background()
discoveryResultChan := make(chan lldp.DiscoveryResult)
go c.ConstantlyPhoneHome(ctx, discoveryResultChan)

// detect changes of switch ports, register switch and start/stop lldp discovery if required
go c.DetectInterfaceChanges(ctx, discoveryResultChan)

if strings.ToUpper(cfg.LogLevel) == "DEBUG" {
_ = os.Setenv("DEBUG", "1")
Expand Down
Loading