Skip to content

Commit

Permalink
in buffer batching with perf buffers for NPM
Browse files Browse the repository at this point in the history
  • Loading branch information
brycekahle committed Nov 22, 2024
1 parent c33fab7 commit 2f1f991
Show file tree
Hide file tree
Showing 27 changed files with 853 additions and 524 deletions.
1 change: 1 addition & 0 deletions .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,7 @@
/pkg/util/funcs/ @DataDog/ebpf-platform
/pkg/util/kernel/ @DataDog/ebpf-platform
/pkg/util/safeelf/ @DataDog/ebpf-platform
/pkg/util/slices/ @DataDog/ebpf-platform
/pkg/util/ktime @DataDog/agent-security
/pkg/util/kubernetes/ @DataDog/container-integrations @DataDog/container-platform @DataDog/container-app
/pkg/util/podman/ @DataDog/container-integrations
Expand Down
2 changes: 2 additions & 0 deletions pkg/config/setup/system_probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ func InitSystemProbeConfig(cfg pkgconfigmodel.Config) {
cfg.BindEnv(join(netNS, "max_failed_connections_buffered"))
cfg.BindEnvAndSetDefault(join(spNS, "closed_connection_flush_threshold"), 0)
cfg.BindEnvAndSetDefault(join(spNS, "closed_channel_size"), 500)
cfg.BindEnvAndSetDefault(join(netNS, "closed_buffer_wakeup_count"), 5)
cfg.BindEnvAndSetDefault(join(spNS, "max_connection_state_buffered"), 75000)

cfg.BindEnvAndSetDefault(join(spNS, "disable_dns_inspection"), false, "DD_DISABLE_DNS_INSPECTION")
Expand All @@ -209,6 +210,7 @@ func InitSystemProbeConfig(cfg pkgconfigmodel.Config) {
cfg.BindEnvAndSetDefault(join(spNS, "enable_conntrack_all_namespaces"), true, "DD_SYSTEM_PROBE_ENABLE_CONNTRACK_ALL_NAMESPACES")
cfg.BindEnvAndSetDefault(join(netNS, "enable_protocol_classification"), true, "DD_ENABLE_PROTOCOL_CLASSIFICATION")
cfg.BindEnvAndSetDefault(join(netNS, "enable_ringbuffers"), true, "DD_SYSTEM_PROBE_NETWORK_ENABLE_RINGBUFFERS")
cfg.BindEnvAndSetDefault(join(netNS, "enable_kernel_batching"), false, "DD_SYSTEM_PROBE_NETWORK_ENABLE_KERNEL_BATCHING")
cfg.BindEnvAndSetDefault(join(netNS, "enable_tcp_failed_connections"), true, "DD_SYSTEM_PROBE_NETWORK_ENABLE_FAILED_CONNS")
cfg.BindEnvAndSetDefault(join(netNS, "ignore_conntrack_init_failure"), false, "DD_SYSTEM_PROBE_NETWORK_IGNORE_CONNTRACK_INIT_FAILURE")
cfg.BindEnvAndSetDefault(join(netNS, "conntrack_init_timeout"), 10*time.Second)
Expand Down
12 changes: 7 additions & 5 deletions pkg/ebpf/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,13 @@ type Modifier interface {

// InitWithOptions is a wrapper around ebpf-manager.Manager.InitWithOptions
func (m *Manager) InitWithOptions(bytecode io.ReaderAt, opts *manager.Options) error {
// we must load the ELF file before initialization,
// to build the collection specs, because some modifiers
// inspect these to make changes to the eBPF resources.
if err := m.LoadELF(bytecode); err != nil {
return fmt.Errorf("failed to load elf from reader: %w", err)
if bytecode != nil {
// we must load the ELF file before initialization,
// to build the collection specs, because some modifiers
// inspect these to make changes to the eBPF resources.
if err := m.LoadELF(bytecode); err != nil {
return fmt.Errorf("failed to load elf from reader: %w", err)
}
}

for _, mod := range m.EnabledModifiers {
Expand Down
213 changes: 213 additions & 0 deletions pkg/ebpf/perf/event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2024-present Datadog, Inc.

//go:build linux_bpf

// Package perf implements types related to eBPF and the perf subsystem, like perf buffers and ring buffers.
package perf

import (
"errors"
"fmt"
"slices"

manager "github.com/DataDog/ebpf-manager"
"github.com/cilium/ebpf"
"github.com/cilium/ebpf/features"
"github.com/cilium/ebpf/perf"
"github.com/cilium/ebpf/ringbuf"

ebpfTelemetry "github.com/DataDog/datadog-agent/pkg/ebpf/telemetry"
ddsync "github.com/DataDog/datadog-agent/pkg/util/sync"
)

var perfPool = ddsync.NewDefaultTypedPool[perf.Record]()
var ringbufPool = ddsync.NewDefaultTypedPool[ringbuf.Record]()

// Flushable is an interface for objects that support flushing
type Flushable interface {
Flush()
}

// EventHandler abstracts consuming data from a perf buffer or ring buffer (depending on availability and options).
// It handles upgrading maps from a ring buffer is desired, and unmarshalling into the desired data type.
type EventHandler struct {
f Flushable
opts EventHandlerOptions
}

// EventHandlerOptions are the options controlling the EventHandler.
// MapName and Handler are required options.
type EventHandlerOptions struct {
MapName string
Handler func([]byte)

TelemetryEnabled bool
UseRingBuffer bool
UpgradePerfBuffer bool

PerfOptions PerfBufferOptions
RingBufOptions RingBufferOptions
}

// PerfBufferOptions are options specifically for perf buffers
//
//nolint:revive
type PerfBufferOptions struct {
BufferSize int

// Watermark - The reader will start processing samples once their sizes in the perf ring buffer
// exceed this value. Must be smaller than PerfRingBufferSize. Defaults to the manager value if not set.
Watermark int

// The number of events required in any per CPU buffer before
// Read will process data. This is mutually exclusive with Watermark.
// The default is zero, which means Watermark will take precedence.
WakeupEvents int
}

// RingBufferOptions are options specifically for ring buffers
type RingBufferOptions struct {
BufferSize int
}

// NewEventHandler creates an event handler with the provided options
func NewEventHandler(opts EventHandlerOptions) (*EventHandler, error) {
if opts.MapName == "" {
return nil, errors.New("invalid options: MapName is required")
}
if opts.Handler == nil {
return nil, errors.New("invalid options: Handler is required")
}
e := &EventHandler{
opts: opts,
}
return e, nil
}

// Init must be called after ebpf-manager.Manager.LoadELF but before ebpf-manager.Manager.Init/InitWithOptions()
func (e *EventHandler) Init(mgr *manager.Manager, mgrOpts *manager.Options) error {
ms, _, _ := mgr.GetMapSpec(e.opts.MapName)
if ms == nil {
return fmt.Errorf("unable to find map spec %q", e.opts.MapName)
}

if e.opts.UseRingBuffer && features.HaveMapType(ebpf.RingBuf) == nil {
if e.opts.UpgradePerfBuffer {
if ms.Type != ebpf.PerfEventArray {
return fmt.Errorf("map %q is not a perf buffer, got %q instead", e.opts.MapName, ms.Type.String())
}
UpgradePerfBuffer(mgr, mgrOpts, e.opts.MapName)
} else if ms.Type != ebpf.RingBuf {
return fmt.Errorf("map %q is not a ring buffer, got %q instead", e.opts.MapName, ms.Type.String())
}

if ms.MaxEntries != uint32(e.opts.RingBufOptions.BufferSize) {
ResizeRingBuffer(mgrOpts, e.opts.MapName, e.opts.RingBufOptions.BufferSize)
}
e.initRingBuffer(mgr)
return nil
}

if ms.Type != ebpf.PerfEventArray {
return fmt.Errorf("map %q is not a perf buffer, got %q instead", e.opts.MapName, ms.Type.String())
}
e.initPerfBuffer(mgr)
return nil
}

// MapType returns the ebpf.MapType of the underlying events map
// This is only valid after calling Init.
func (e *EventHandler) MapType() ebpf.MapType {
switch e.f.(type) {
case *manager.PerfMap:
return ebpf.PerfEventArray
case *manager.RingBuffer:
return ebpf.RingBuf
default:
return ebpf.UnspecifiedMap
}
}

// Flush flushes the pending data from the underlying perfbuf/ringbuf
func (e *EventHandler) Flush() {
e.f.Flush()
}

// ResizeRingBuffer resizes the ring buffer by creating/updating a map spec editor
func ResizeRingBuffer(mgrOpts *manager.Options, mapName string, bufferSize int) {
if mgrOpts.MapSpecEditors == nil {
mgrOpts.MapSpecEditors = make(map[string]manager.MapSpecEditor)
}
specEditor := mgrOpts.MapSpecEditors[mapName]
specEditor.MaxEntries = uint32(bufferSize)
specEditor.EditorFlag |= manager.EditMaxEntries
mgrOpts.MapSpecEditors[mapName] = specEditor
}

func (e *EventHandler) initPerfBuffer(mgr *manager.Manager) {
mgr.PerfMaps = slices.DeleteFunc(mgr.PerfMaps, func(perfMap *manager.PerfMap) bool {
return perfMap.Name == e.opts.MapName
})
pm := &manager.PerfMap{
Map: manager.Map{Name: e.opts.MapName},
PerfMapOptions: manager.PerfMapOptions{
PerfRingBufferSize: e.opts.PerfOptions.BufferSize,
Watermark: e.opts.PerfOptions.Watermark,
WakeupEvents: e.opts.PerfOptions.WakeupEvents,
RecordHandler: e.perfRecordHandler,
LostHandler: nil, // TODO do we need support for Lost?
RecordGetter: perfPool.Get,
TelemetryEnabled: e.opts.TelemetryEnabled,
},
}
mgr.PerfMaps = append(mgr.PerfMaps, pm)
ebpfTelemetry.ReportPerfMapTelemetry(pm)
e.f = pm
}

func (e *EventHandler) perfRecordHandler(record *perf.Record, _ *manager.PerfMap, _ *manager.Manager) {
defer perfPool.Put(record)
e.opts.Handler(record.RawSample)
}

func (e *EventHandler) initRingBuffer(mgr *manager.Manager) {
mgr.RingBuffers = slices.DeleteFunc(mgr.RingBuffers, func(ringBuf *manager.RingBuffer) bool {
return ringBuf.Name == e.opts.MapName
})
rb := &manager.RingBuffer{
Map: manager.Map{Name: e.opts.MapName},
RingBufferOptions: manager.RingBufferOptions{
RecordHandler: e.ringRecordHandler,
RecordGetter: ringbufPool.Get,
TelemetryEnabled: e.opts.TelemetryEnabled,
},
}
mgr.RingBuffers = append(mgr.RingBuffers, rb)
ebpfTelemetry.ReportRingBufferTelemetry(rb)
e.f = rb
}

func (e *EventHandler) ringRecordHandler(record *ringbuf.Record, _ *manager.RingBuffer, _ *manager.Manager) {
defer ringbufPool.Put(record)
e.opts.Handler(record.RawSample)
}

// UpgradePerfBuffer upgrades a perf buffer to a ring buffer by creating a map spec editor
func UpgradePerfBuffer(mgr *manager.Manager, mgrOpts *manager.Options, mapName string) {
if mgrOpts.MapSpecEditors == nil {
mgrOpts.MapSpecEditors = make(map[string]manager.MapSpecEditor)
}
specEditor := mgrOpts.MapSpecEditors[mapName]
specEditor.Type = ebpf.RingBuf
specEditor.KeySize = 0
specEditor.ValueSize = 0
specEditor.EditorFlag |= manager.EditType | manager.EditKeyValue
mgrOpts.MapSpecEditors[mapName] = specEditor

mgr.PerfMaps = slices.DeleteFunc(mgr.PerfMaps, func(perfMap *manager.PerfMap) bool {
return perfMap.Name == mapName
})
}
8 changes: 8 additions & 0 deletions pkg/network/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,9 @@ type Config struct {
// ClosedChannelSize specifies the size for closed channel for the tracer
ClosedChannelSize int

// ClosedBufferWakeupCount specifies the number of events that will buffer in a perf buffer before userspace is woken up.
ClosedBufferWakeupCount int

// ExcludedSourceConnections is a map of source connections to blacklist
ExcludedSourceConnections map[string][]string

Expand Down Expand Up @@ -285,6 +288,9 @@ type Config struct {
// EnableUSMEventStream enables USM to use the event stream instead
// of netlink for receiving process events.
EnableUSMEventStream bool

// KernelBatchingEnabled enables the use of custom batching for eBPF perf events with perf buffers
KernelBatchingEnabled bool
}

// New creates a config for the network tracer
Expand Down Expand Up @@ -317,6 +323,7 @@ func New() *Config {
MaxFailedConnectionsBuffered: uint32(cfg.GetInt64(sysconfig.FullKeyPath(netNS, "max_failed_connections_buffered"))),
ClosedConnectionFlushThreshold: cfg.GetInt(sysconfig.FullKeyPath(spNS, "closed_connection_flush_threshold")),
ClosedChannelSize: cfg.GetInt(sysconfig.FullKeyPath(spNS, "closed_channel_size")),
ClosedBufferWakeupCount: cfg.GetInt(sysconfig.FullKeyPath(netNS, "closed_buffer_wakeup_count")),
MaxConnectionsStateBuffered: cfg.GetInt(sysconfig.FullKeyPath(spNS, "max_connection_state_buffered")),
ClientStateExpiry: 2 * time.Minute,

Expand All @@ -331,6 +338,7 @@ func New() *Config {
ProtocolClassificationEnabled: cfg.GetBool(sysconfig.FullKeyPath(netNS, "enable_protocol_classification")),

NPMRingbuffersEnabled: cfg.GetBool(sysconfig.FullKeyPath(netNS, "enable_ringbuffers")),
KernelBatchingEnabled: cfg.GetBool(sysconfig.FullKeyPath(netNS, "enable_kernel_batching")),

EnableHTTPMonitoring: cfg.GetBool(sysconfig.FullKeyPath(smNS, "enable_http_monitoring")),
EnableHTTP2Monitoring: cfg.GetBool(sysconfig.FullKeyPath(smNS, "enable_http2_monitoring")),
Expand Down
4 changes: 3 additions & 1 deletion pkg/network/ebpf/c/tracer.c
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,9 @@ int BPF_BYPASSABLE_KRETPROBE(kretprobe__tcp_close_clean_protocols) {
bpf_map_delete_elem(&tcp_close_args, &pid_tgid);
}

bpf_tail_call_compat(ctx, &tcp_close_progs, 0);
if (is_batching_enabled()) {
bpf_tail_call_compat(ctx, &tcp_close_progs, 0);
}

return 0;
}
Expand Down
70 changes: 40 additions & 30 deletions pkg/network/ebpf/c/tracer/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ __maybe_unused static __always_inline void submit_closed_conn_event(void *ctx, i
}
}

static __always_inline bool is_batching_enabled() {
__u64 batching_enabled = 0;
LOAD_CONSTANT("batching_enabled", batching_enabled);
return batching_enabled != 0;
}

static __always_inline void cleanup_conn(void *ctx, conn_tuple_t *tup, struct sock *sk, __u16 tcp_failure_reason) {
u32 cpu = bpf_get_smp_processor_id();
// Will hold the full connection data to send through the perf or ring buffer
Expand Down Expand Up @@ -100,44 +106,48 @@ static __always_inline void cleanup_conn(void *ctx, conn_tuple_t *tup, struct so
// if we added another field
conn.conn_stats.duration = bpf_ktime_get_ns() - conn.conn_stats.duration;

// Batch TCP closed connections before generating a perf event
batch_t *batch_ptr = bpf_map_lookup_elem(&conn_close_batch, &cpu);
if (batch_ptr == NULL) {
return;
}
if (is_batching_enabled()) {
// Batch TCP closed connections before generating a perf event
batch_t *batch_ptr = bpf_map_lookup_elem(&conn_close_batch, &cpu);
if (batch_ptr == NULL) {
return;
}

// TODO: Can we turn this into a macro based on TCP_CLOSED_BATCH_SIZE?
switch (batch_ptr->len) {
case 0:
batch_ptr->c0 = conn;
batch_ptr->len++;
return;
case 1:
batch_ptr->c1 = conn;
batch_ptr->len++;
return;
case 2:
batch_ptr->c2 = conn;
batch_ptr->len++;
return;
case 3:
batch_ptr->c3 = conn;
batch_ptr->len++;
// In this case the batch is ready to be flushed, which we defer to kretprobe/tcp_close
// in order to cope with the eBPF stack limitation of 512 bytes.
return;
// TODO: Can we turn this into a macro based on TCP_CLOSED_BATCH_SIZE?
switch (batch_ptr->len) {
case 0:
batch_ptr->c0 = conn;
batch_ptr->len++;
return;
case 1:
batch_ptr->c1 = conn;
batch_ptr->len++;
return;
case 2:
batch_ptr->c2 = conn;
batch_ptr->len++;
return;
case 3:
batch_ptr->c3 = conn;
batch_ptr->len++;
// In this case the batch is ready to be flushed, which we defer to kretprobe/tcp_close
// in order to cope with the eBPF stack limitation of 512 bytes.
return;
}
}

// If we hit this section it means we had one or more interleaved tcp_close calls.
// We send the connection outside of a batch anyway. This is likely not as
// frequent of a case to cause performance issues and avoid cases where
// we drop whole connections, which impacts things USM connection matching.
submit_closed_conn_event(ctx, cpu, &conn, sizeof(conn_t));
if (is_tcp) {
increment_telemetry_count(unbatched_tcp_close);
}
if (is_udp) {
increment_telemetry_count(unbatched_udp_close);
if (is_batching_enabled()) {
if (is_tcp) {
increment_telemetry_count(unbatched_tcp_close);
}
if (is_udp) {
increment_telemetry_count(unbatched_udp_close);
}
}
}

Expand Down
Loading

0 comments on commit 2f1f991

Please sign in to comment.