Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Prototype] nftables flowtable support #9458

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions felix/dataplane/linux/endpoint_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ type endpointManager struct {
epMarkMapper rules.EndpointMarkMapper
newMatch func() generictables.MatchCriteria
actions generictables.ActionFactory
maps nftables.MapsDataplane

ifceHandler nftables.InterfaceHandler

// Pending updates, cleared in CompleteDeferredWork as the data is copied to the activeXYZ
// fields.
Expand Down Expand Up @@ -225,6 +228,8 @@ func newEndpointManager(
wlInterfacePrefixes []string,
onWorkloadEndpointStatusUpdate EndpointStatusUpdateCallback,
defaultRPFilter string,
maps nftables.MapsDataplane,
ifces nftables.InterfaceHandler,
bpfEnabled bool,
bpfEndpointManager hepListener,
callbacks *common.Callbacks,
Expand All @@ -245,6 +250,8 @@ func newEndpointManager(
writeProcSys,
os.Stat,
defaultRPFilter,
maps,
ifces,
bpfEnabled,
bpfEndpointManager,
callbacks,
Expand All @@ -267,6 +274,8 @@ func newEndpointManagerWithShims(
procSysWriter procSysWriter,
osStat func(name string) (os.FileInfo, error),
defaultRPFilter string,
maps nftables.MapsDataplane,
ifces nftables.InterfaceHandler,
bpfEnabled bool,
bpfEndpointManager hepListener,
callbacks *common.Callbacks,
Expand All @@ -288,6 +297,8 @@ func newEndpointManagerWithShims(
wlIfacesRegexp: wlIfacesRegexp,
kubeIPVSSupportEnabled: kubeIPVSSupportEnabled,
bpfEnabled: bpfEnabled,
maps: maps,
ifceHandler: ifces,
bpfEndpointManager: bpfEndpointManager,
floatingIPsEnabled: floatingIPsEnabled,

Expand Down Expand Up @@ -679,6 +690,12 @@ func (m *endpointManager) resolveWorkloadEndpoints() {
delete(m.activeWlEndpoints, id)
}

if m.bpfEnabled && m.maps != nil {
// Remove nftables maps if running in BPF mode, as dispatch chains are not used.
m.maps.RemoveMap(rules.NftablesFromWorkloadDispatchMap)
m.maps.RemoveMap(rules.NftablesToWorkloadDispatchMap)
}

// Repeat the following loop until the pending update map is empty. Note that it's possible
// for an endpoint deletion to add a further update into the map (for a previously shadowed
// endpoint), so we cannot assume that a single iteration will always be enough.
Expand Down Expand Up @@ -828,6 +845,23 @@ func (m *endpointManager) resolveWorkloadEndpoints() {
}

if !m.bpfEnabled && m.needToCheckDispatchChains {
if m.maps != nil {
// Update dispatch verdict maps if needed.
fromMappings, toMappings := m.ruleRenderer.DispatchMappings(m.activeWlEndpoints)
m.maps.AddOrReplaceMap(nftables.MapMetadata{ID: rules.NftablesFromWorkloadDispatchMap, Type: nftables.MapTypeInterfaceMatch}, fromMappings)
m.maps.AddOrReplaceMap(nftables.MapMetadata{ID: rules.NftablesToWorkloadDispatchMap, Type: nftables.MapTypeInterfaceMatch}, toMappings)

if m.ifceHandler != nil {
// Also update the interface handler to be aware of all local interfaces.
// TODO: these should be detected not hardcoded.
ifces := []string{"ens4", "vxlan.calico"}
for i := range fromMappings {
ifces = append(ifces, i)
}
m.ifceHandler.SetInterfaces(ifces)
}
}

// Rewrite the dispatch chains if they've changed.
newDispatchChains := m.ruleRenderer.WorkloadDispatchChains(m.activeWlEndpoints)
m.updateDispatchChains(m.activeWlDispatchChains, newDispatchChains, m.filterTable)
Expand Down
1 change: 1 addition & 0 deletions felix/dataplane/linux/endpoint_mgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -784,6 +784,7 @@ func endpointManagerTests(ipVersion uint8) func() {
mockProcSys.write,
mockProcSys.stat,
"1",
nil,
false,
hepListener,
common.NewCallbacks(),
Expand Down
14 changes: 14 additions & 0 deletions felix/dataplane/linux/int_dataplane.go
Original file line number Diff line number Diff line change
Expand Up @@ -904,6 +904,11 @@ func NewIntDataplaneDriver(config Config) *InternalDataplane {
}
}

var nftMaps nftables.MapsDataplane
if config.RulesConfig.NFTables {
nftMaps = nftablesV4RootTable.(nftables.MapsDataplane)
}

epManager := newEndpointManager(
rawTableV4,
mangleTableV4,
Expand All @@ -916,6 +921,8 @@ func NewIntDataplaneDriver(config Config) *InternalDataplane {
config.RulesConfig.WorkloadIfacePrefixes,
dp.endpointStatusCombiner.OnEndpointStatusUpdate,
string(defaultRPFilter),
nftMaps,
nftablesV4RootTable.(nftables.InterfaceHandler),
config.BPFEnabled,
bpfEndpointManager,
callbacks,
Expand Down Expand Up @@ -1037,6 +1044,11 @@ func NewIntDataplaneDriver(config Config) *InternalDataplane {
dp.RegisterManager(newRawEgressPolicyManager(rawTableV6, ruleRenderer, 6, ipSetsV6.SetFilter))
}

var nftMapsV6 nftables.MapsDataplane
if config.RulesConfig.NFTables {
nftMapsV6 = nftablesV6RootTable.(nftables.MapsDataplane)
}

dp.RegisterManager(newEndpointManager(
rawTableV6,
mangleTableV6,
Expand All @@ -1049,6 +1061,8 @@ func NewIntDataplaneDriver(config Config) *InternalDataplane {
config.RulesConfig.WorkloadIfacePrefixes,
dp.endpointStatusCombiner.OnEndpointStatusUpdate,
"",
nftMapsV6,
nftablesV6RootTable.(nftables.InterfaceHandler),
config.BPFEnabled,
nil,
callbacks,
Expand Down
1 change: 1 addition & 0 deletions felix/generictables/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type ActionFactory interface {
Masq(toPorts string) Action
SetConnmark(mark, mask uint32) Action
Reject(with RejectWith) Action
FlowOffload(table string) Action
}

type RejectWith string
Expand Down
4 changes: 4 additions & 0 deletions felix/generictables/match_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ type MatchCriteria interface {
NotICMPV6Type(t uint8) MatchCriteria
ICMPV6TypeAndCode(t, c uint8) MatchCriteria
NotICMPV6TypeAndCode(t, c uint8) MatchCriteria

// Only supported in nftables.
InInterfaceVMAP(mapname string) MatchCriteria
OutInterfaceVMAP(mapname string) MatchCriteria
}

type AddrType string
Expand Down
4 changes: 4 additions & 0 deletions felix/iptables/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ func (s *actionFactory) SetConnmark(mark, mask uint32) generictables.Action {
}
}

func (s *actionFactory) FlowOffload(ft string) generictables.Action {
return nil
}

type Referrer interface {
ReferencedChain() string
}
Expand Down
10 changes: 10 additions & 0 deletions felix/iptables/match_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,16 @@ func (m matchCriteria) NotICMPV6TypeAndCode(t, c uint8) generictables.MatchCrite
return append(m, fmt.Sprintf("-m icmp6 ! --icmpv6-type %d/%d", t, c))
}

func (m matchCriteria) InInterfaceVMAP(mapname string) generictables.MatchCriteria {
log.Panic("InInterfaceVMAP not supported in iptables")
return m
}

func (m matchCriteria) OutInterfaceVMAP(mapname string) generictables.MatchCriteria {
log.Panic("OutInterfaceVMAP not supported in iptables")
return m
}

func PortsToMultiport(ports []uint16) string {
portFragments := make([]string, len(ports))
for i, port := range ports {
Expand Down
17 changes: 17 additions & 0 deletions felix/nftables/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ func (s *actionSet) SetConnmark(mark, mask uint32) generictables.Action {
}
}

func (s *actionSet) FlowOffload(ft string) generictables.Action {
return FlowOffloadAction{FlowTable: ft}
}

type Referrer interface {
ReferencedChain() string
}
Expand Down Expand Up @@ -406,3 +410,16 @@ func (c SetConnMarkAction) ToFragment(features *environment.Features) string {
func (c SetConnMarkAction) String() string {
return fmt.Sprintf("SetConnMarkWithMask:%#x/%#x", c.Mark, c.Mask)
}

type FlowOffloadAction struct {
FlowTable string
TypeFlowOffload struct{}
}

func (c FlowOffloadAction) ToFragment(features *environment.Features) string {
return fmt.Sprintf("flow offload @%s", c.FlowTable)
}

func (c FlowOffloadAction) String() string {
return fmt.Sprintf("FlowOffload:%s", c.FlowTable)
}
4 changes: 2 additions & 2 deletions felix/nftables/ipsets.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,12 +334,12 @@ func (s *IPSets) ApplyUpdates() {

for attempt := 0; attempt < 10; attempt++ {
if attempt > 0 {
s.logCxt.Info("Retrying after an ipsets update failure...")
s.logCxt.Info("Retrying after an nftables set update failure...")
}
if s.resyncRequired {
// Compare our in-memory state against the dataplane and queue up
// modifications to fix any inconsistencies.
s.logCxt.Debug("Resyncing ipsets with dataplane.")
s.logCxt.Debug("Resyncing nftables sets with dataplane.")
s.opReporter.RecordOperation(fmt.Sprint("resync-nft-sets-v", s.IPVersionConfig.Family.Version()))

if err := s.tryResync(); err != nil {
Expand Down
Loading