pack TCP failure reason into tcp_stats_t
brycekahle committed Nov 5, 2024
1 parent 5917430 commit 7fd542a
Showing 24 changed files with 76 additions and 449 deletions.
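In short, an illustrative sketch of the shape change (surrounding fields elided; not literal repository code): the dedicated conn_failed_t event and its conn_fail_event perf/ring buffer are removed, and the failure reason instead rides along in tcp_stats_t on the existing connection-close path.

// Before: failures were flushed out-of-band as their own event type.
typedef struct {
    conn_tuple_t tup;
    __u32 failure_reason;
} conn_failed_t;

// After: the reason is packed into the stats that already accompany a close.
typedef struct {
    // ... retransmits, RTT, and other fields elided ...
    __u16 state_transitions; // bit mask of observed TCP state transitions
    __u16 failure_reason;    // errno-style reason; 0 for a clean close
} tcp_stats_t;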
4 changes: 2 additions & 2 deletions pkg/network/ebpf/c/co-re/tracer-fentry.c
@@ -244,7 +244,7 @@ int BPF_PROG(tcp_close, struct sock *sk, long timeout) {

bpf_map_delete_elem(&tcp_ongoing_connect_pid, &skp_conn);

- cleanup_conn(ctx, &t, sk);
+ cleanup_conn(ctx, &t, sk, 0);
return 0;
}

@@ -543,7 +543,7 @@ static __always_inline int handle_udp_destroy_sock(void *ctx, struct sock *sk) {

__u16 lport = 0;
if (valid_tuple) {
- cleanup_conn(ctx, &tup, sk);
+ cleanup_conn(ctx, &tup, sk, 0);
lport = tup.sport;
} else {
// get the port for the current sock
17 changes: 9 additions & 8 deletions pkg/network/ebpf/c/tracer.c
@@ -220,7 +220,7 @@ int BPF_BYPASSABLE_KPROBE(kprobe__tcp_done, struct sock *sk) {
return 0;
}

// connection timeouts will have 0 pids as they are cleaned up by an idle process.
// resets can also have kernel pids as they are triggered by receiving an RST packet from the server
// get the pid from the ongoing failure map in this case, as it should have been set in connect(); otherwise bail
pid_ts_t *failed_conn_pid = bpf_map_lookup_elem(&tcp_ongoing_connect_pid, &skp_conn);
@@ -237,9 +237,8 @@
// skip EEXIST errors for telemetry since it is an expected error
__u64 timestamp = bpf_ktime_get_ns();
if (bpf_map_update_with_telemetry(conn_close_flushed, &t, &timestamp, BPF_NOEXIST, -EEXIST) == 0) {
- cleanup_conn(ctx, &t, sk);
+ cleanup_conn(ctx, &t, sk, err);
increment_telemetry_count(tcp_done_connection_flush);
- flush_tcp_failure(ctx, &t, err);
} else {
bpf_map_delete_elem(&conn_close_flushed, &t);
increment_telemetry_count(double_flush_attempts_done);
@@ -278,7 +277,7 @@ int BPF_BYPASSABLE_KPROBE(kprobe__tcp_close, struct sock *sk) {
bpf_map_delete_elem(&tcp_ongoing_connect_pid, &skp_conn);

if (!tcp_failed_connections_enabled()) {
- cleanup_conn(ctx, &t, sk);
+ cleanup_conn(ctx, &t, sk, 0);
return 0;
}

@@ -287,14 +286,16 @@
// skip EEXIST errors for telemetry since it is an expected error
__u64 timestamp = bpf_ktime_get_ns();
if (bpf_map_update_with_telemetry(conn_close_flushed, &t, &timestamp, BPF_NOEXIST, -EEXIST) == 0) {
- cleanup_conn(ctx, &t, sk);
- increment_telemetry_count(tcp_close_connection_flush);
+ __u16 tcp_failure_reason = 0;
int err = 0;
bpf_probe_read_kernel_with_telemetry(&err, sizeof(err), (&sk->sk_err));
if (err == TCP_CONN_FAILED_RESET || err == TCP_CONN_FAILED_TIMEOUT || err == TCP_CONN_FAILED_REFUSED) {
increment_telemetry_count(tcp_close_target_failures);
- flush_tcp_failure(ctx, &t, err);
+ tcp_failure_reason = err;
}

+ cleanup_conn(ctx, &t, sk, tcp_failure_reason);
+ increment_telemetry_count(tcp_close_connection_flush);
} else {
bpf_map_delete_elem(&conn_close_flushed, &t);
increment_telemetry_count(double_flush_attempts_close);
@@ -1047,7 +1048,7 @@ static __always_inline int handle_udp_destroy_sock(void *ctx, struct sock *skp)

__u16 lport = 0;
if (valid_tuple) {
- cleanup_conn(ctx, &tup, skp);
+ cleanup_conn(ctx, &tup, skp, 0);
lport = tup.sport;
} else {
lport = read_sport(skp);
23 changes: 5 additions & 18 deletions pkg/network/ebpf/c/tracer/events.h
@@ -44,7 +44,7 @@ __maybe_unused static __always_inline void submit_closed_conn_event(void *ctx, i
}
}

- static __always_inline void cleanup_conn(void *ctx, conn_tuple_t *tup, struct sock *sk) {
+ static __always_inline void cleanup_conn(void *ctx, conn_tuple_t *tup, struct sock *sk, __u16 tcp_failure_reason) {
u32 cpu = bpf_get_smp_processor_id();
// Will hold the full connection data to send through the perf or ring buffer
conn_t conn = { .tup = *tup };
@@ -70,6 +70,10 @@ static __always_inline void cleanup_conn(void *ctx, conn_tuple_t *tup, struct so
conn.tup.pid = tup->pid;

conn.tcp_stats.state_transitions |= (1 << TCP_CLOSE);
+ conn.tcp_stats.failure_reason = tcp_failure_reason;
+ if (tcp_failure_reason) {
+ increment_telemetry_count(tcp_failed_connect);
+ }
}

cst = bpf_map_lookup_elem(&conn_stats, &(conn.tup));
@@ -137,23 +141,6 @@ static __always_inline void cleanup_conn(void *ctx, conn_tuple_t *tup, struct so
}
}

- // This function is used to flush the conn_failed_t to the perf or ring buffer.
- static __always_inline void flush_tcp_failure(void *ctx, conn_tuple_t *tup, int failure_reason) {
- conn_failed_t failure = {};
- failure.tup = *tup;
- failure.failure_reason = failure_reason;
-
- __u64 ringbuffers_enabled = 0;
- LOAD_CONSTANT("ringbuffers_enabled", ringbuffers_enabled);
- if (ringbuffers_enabled > 0) {
- bpf_ringbuf_output(&conn_fail_event, &failure, sizeof(conn_failed_t), 0);
- } else {
- u32 cpu = bpf_get_smp_processor_id();
- bpf_perf_event_output(ctx, &conn_fail_event, cpu, &failure, sizeof(conn_failed_t));
- }
- increment_telemetry_count(tcp_failed_connect);
- }

static __always_inline void flush_conn_close_if_full(void *ctx) {
u32 cpu = bpf_get_smp_processor_id();
batch_t *batch_ptr = bpf_map_lookup_elem(&conn_close_batch, &cpu);
3 changes: 0 additions & 3 deletions pkg/network/ebpf/c/tracer/maps.h
@@ -34,9 +34,6 @@ BPF_HASH_MAP(conn_close_flushed, conn_tuple_t, __u64, 16384)
*/
BPF_PERF_EVENT_ARRAY_MAP(conn_close_event, __u32)

- /* Will hold TCP failed connections */
- BPF_PERF_EVENT_ARRAY_MAP(conn_fail_event, __u32)

/* We use this map as a container for batching closed tcp/udp connections
* The key represents the CPU core. Ideally we should use a BPF_MAP_TYPE_PERCPU_HASH map
* or BPF_MAP_TYPE_PERCPU_ARRAY, but they are not available in
6 changes: 1 addition & 5 deletions pkg/network/ebpf/c/tracer/tracer.h
@@ -70,6 +70,7 @@ typedef struct {

// Bit mask containing all TCP state transitions tracked by our tracer
__u16 state_transitions;
+ __u16 failure_reason;
} tcp_stats_t;

// Full data for a tcp connection
@@ -80,11 +81,6 @@
conn_stats_ts_t conn_stats;
} conn_t;

- typedef struct {
- conn_tuple_t tup;
- __u32 failure_reason;
- } conn_failed_t;

// Must match the number of conn_t objects embedded in the batch_t struct
#ifndef CONN_CLOSED_BATCH_SIZE
#define CONN_CLOSED_BATCH_SIZE 4
2 changes: 0 additions & 2 deletions pkg/network/ebpf/kprobe_types.go
@@ -20,7 +20,6 @@ type ConnTuple C.conn_tuple_t
type TCPStats C.tcp_stats_t
type ConnStats C.conn_stats_ts_t
type Conn C.conn_t
- type FailedConn C.conn_failed_t
type SkpConn C.skp_conn_tuple_t
type PidTs C.pid_ts_t
type Batch C.batch_t
@@ -60,7 +59,6 @@ const TCPFailureConnTimeout = C.TCP_CONN_FAILED_TIMEOUT
const TCPFailureConnRefused = C.TCP_CONN_FAILED_REFUSED

const SizeofConn = C.sizeof_conn_t
- const SizeofFailedConn = C.sizeof_conn_failed_t

type ClassificationProgram = uint32

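For reference, the TCP_CONN_FAILED_* constants mirrored above carry errno values; the diff never spells out the numbers, but assuming the conventional Linux numbering (ECONNRESET = 104, ETIMEDOUT = 110, ECONNREFUSED = 111 — note the test below uses key 110), every reason fits in the new __u16 field. A standalone sanity check under that assumption:

#include <errno.h>
#include <stdint.h>
#include <stdio.h>

int main(void) {
    // Assumed mapping of the agent's failure reasons onto Linux errno values.
    int reasons[] = { ECONNRESET, ETIMEDOUT, ECONNREFUSED }; // 104, 110, 111 on Linux
    for (size_t i = 0; i < sizeof(reasons) / sizeof(reasons[0]); i++) {
        uint16_t packed = (uint16_t)reasons[i]; // lossless: errno values are far below 65536
        printf("reason %d packs to %u\n", reasons[i], packed);
    }
    return 0;
}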
8 changes: 1 addition & 7 deletions pkg/network/ebpf/kprobe_types_linux.go

Generated file; diff not rendered by default.

2 changes: 0 additions & 2 deletions pkg/network/ebpf/probes/probes.go
@@ -198,8 +198,6 @@ const (
ConnCloseFlushed BPFMapName = "conn_close_flushed"
// ConnCloseEventMap is the map storing connection close events
ConnCloseEventMap BPFMapName = "conn_close_event"
- // FailedConnEventMap is the map for storing failed connection events
- FailedConnEventMap BPFMapName = "conn_fail_event"
// TracerStatusMap is the map storing the status of the tracer
TracerStatusMap BPFMapName = "tracer_status"
// ConntrackStatusMap is the map storing the status of the conntrack
2 changes: 1 addition & 1 deletion pkg/network/encoding/encoding_test.go
@@ -251,7 +251,7 @@ func TestSerialization(t *testing.T) {
},
},
},
- TCPFailures: map[uint32]uint32{
+ TCPFailures: map[uint16]uint32{
110: 1,
},
},
2 changes: 1 addition & 1 deletion pkg/network/encoding/marshal/format.go
@@ -112,7 +112,7 @@ func FormatConnection(builder *model.ConnectionBuilder, conn network.ConnectionS
if len(conn.TCPFailures) > 0 {
builder.AddTcpFailuresByErrCode(func(w *model.Connection_TcpFailuresByErrCodeEntryBuilder) {
for k, v := range conn.TCPFailures {
- w.SetKey(k)
+ w.SetKey(uint32(k))
w.SetValue(v)
}
})
2 changes: 1 addition & 1 deletion pkg/network/event_common.go
@@ -269,7 +269,7 @@ type ConnectionStats struct {
}
DNSStats map[dns.Hostname]map[dns.QueryType]dns.Stats
// TCPFailures stores the number of failures for a POSIX error code
- TCPFailures map[uint32]uint32
+ TCPFailures map[uint16]uint32

ConnectionTuple

2 changes: 1 addition & 1 deletion pkg/network/event_windows.go
@@ -120,7 +120,7 @@ func FlowToConnStat(cs *ConnectionStats, flow *driver.PerFlowData, enableMonoton

tf := flow.TCPFlow()
if tf != nil {
- cs.TCPFailures = make(map[uint32]uint32)
+ cs.TCPFailures = make(map[uint16]uint32)
cs.Monotonic.Retransmits = uint32(tf.RetransmitCount)
cs.RTT = uint32(tf.SRTT)
cs.RTTVar = uint32(tf.RttVariance)
63 changes: 38 additions & 25 deletions pkg/network/tracer/connection/ebpf_tracer.go
@@ -32,7 +32,6 @@ import (
netebpf "github.com/DataDog/datadog-agent/pkg/network/ebpf"
"github.com/DataDog/datadog-agent/pkg/network/ebpf/probes"
"github.com/DataDog/datadog-agent/pkg/network/protocols"
- "github.com/DataDog/datadog-agent/pkg/network/tracer/connection/failure"
"github.com/DataDog/datadog-agent/pkg/network/tracer/connection/fentry"
"github.com/DataDog/datadog-agent/pkg/network/tracer/connection/kprobe"
"github.com/DataDog/datadog-agent/pkg/network/tracer/connection/util"
@@ -47,6 +46,7 @@
)

var tcpOngoingConnectMapTTL = 30 * time.Minute.Nanoseconds()
+ var connClosedFlushMapTTL = 10 * time.Millisecond.Nanoseconds()

var EbpfTracerTelemetry = struct {
connections telemetry.Gauge
@@ -75,6 +75,7 @@ var EbpfTracerTelemetry = struct {
PidCollisions *telemetry.StatCounterWrapper
iterationDups telemetry.Counter
iterationAborts telemetry.Counter
+ closedConnFlushedCleaned telemetry.Counter

lastTcpFailedConnects *atomic.Int64
LastTcpSentMiscounts *atomic.Int64
@@ -120,6 +121,7 @@ var EbpfTracerTelemetry = struct {
telemetry.NewStatCounterWrapper(connTracerModuleName, "pid_collisions", []string{}, "Counter measuring number of process collisions"),
telemetry.NewCounter(connTracerModuleName, "iteration_dups", []string{}, "Counter measuring the number of connections iterated more than once"),
telemetry.NewCounter(connTracerModuleName, "iteration_aborts", []string{}, "Counter measuring how many times ebpf iteration of connection map was aborted"),
+ telemetry.NewCounter("network_tracer__tcp_failure", "closed_conn_flushed_cleaned", []string{}, "Counter measuring the number of conn_close_flushed entries cleaned in userspace"),
atomic.NewInt64(0),
atomic.NewInt64(0),
atomic.NewInt64(0),
@@ -149,11 +151,10 @@ type ebpfTracer struct {

// tcp_close events
closeConsumer *tcpCloseConsumer
- // tcp failure events
- failedConnConsumer *failure.TCPFailedConnConsumer

// periodically clean the ongoing connection pid map
ongoingConnectCleaner *ddebpf.MapCleaner[netebpf.SkpConn, netebpf.PidTs]
+ connCloseFlushCleaner *ddebpf.MapCleaner[netebpf.ConnTuple, int64]

removeTuple *netebpf.ConnTuple

@@ -255,25 +256,20 @@ func newEbpfTracer(config *config.Config, _ telemetryComponent.Component) (Trace

closeConsumer := newTCPCloseConsumer(connCloseEventHandler, batchMgr)

- var failedConnConsumer *failure.TCPFailedConnConsumer
// Failed connections are not supported on prebuilt
if tracerType == TracerTypeKProbePrebuilt {
config.TCPFailedConnectionsEnabled = false
}
- if config.FailedConnectionsSupported() {
- failedConnConsumer = failure.NewFailedConnConsumer(failedConnsHandler, m, config.MaxFailedConnectionsBuffered)
- }

tr := &ebpfTracer{
- m: m,
- config: config,
- closeConsumer: closeConsumer,
- failedConnConsumer: failedConnConsumer,
- removeTuple: &netebpf.ConnTuple{},
- closeTracer: closeTracerFn,
- ebpfTracerType: tracerType,
- exitTelemetry: make(chan struct{}),
- ch: newCookieHasher(),
+ m: m,
+ config: config,
+ closeConsumer: closeConsumer,
+ removeTuple: &netebpf.ConnTuple{},
+ closeTracer: closeTracerFn,
+ ebpfTracerType: tracerType,
+ exitTelemetry: make(chan struct{}),
+ ch: newCookieHasher(),
}

tr.setupMapCleaner(m)
@@ -327,7 +323,6 @@ func (t *ebpfTracer) Start(callback func(*network.ConnectionStats)) (err error)
}

t.closeConsumer.Start(callback)
- t.failedConnConsumer.Start()
return nil
}

@@ -348,22 +343,15 @@ func (t *ebpfTracer) FlushPending() {
t.closeConsumer.FlushPending()
}

- func (t *ebpfTracer) GetFailedConnections() *failure.FailedConns {
- if t.failedConnConsumer == nil {
- return nil
- }
- return t.failedConnConsumer.FailedConns
- }

func (t *ebpfTracer) Stop() {
t.stopOnce.Do(func() {
close(t.exitTelemetry)
ddebpf.RemoveNameMappings(t.m)
ebpftelemetry.UnregisterTelemetry(t.m)
_ = t.m.Stop(manager.CleanAll)
t.closeConsumer.Stop()
- t.failedConnConsumer.Stop()
t.ongoingConnectCleaner.Stop()
+ t.connCloseFlushCleaner.Stop()
if t.closeTracer != nil {
t.closeTracer()
}
@@ -730,6 +718,26 @@ func (t *ebpfTracer) setupMapCleaner(m *manager.Manager) {
})

t.ongoingConnectCleaner = tcpOngoingConnectPidCleaner

+ if t.config.FailedConnectionsSupported() {
+ connCloseFlushMap, _, err := m.GetMap(probes.ConnCloseFlushed)
+ if err != nil {
+ log.Errorf("error getting %v map: %s", probes.ConnCloseFlushed, err)
+ }
+ connCloseFlushCleaner, err := ddebpf.NewMapCleaner[netebpf.ConnTuple, int64](connCloseFlushMap, 1024)
+ if err != nil {
+ log.Errorf("error creating map cleaner: %s", err)
+ return
+ }
+ connCloseFlushCleaner.Clean(time.Second*1, nil, nil, func(now int64, _ netebpf.ConnTuple, val int64) bool {
+ expired := val > 0 && now-val > connClosedFlushMapTTL
+ if expired {
+ EbpfTracerTelemetry.closedConnFlushedCleaned.Inc()
+ }
+ return expired
+ })
+ t.connCloseFlushCleaner = connCloseFlushCleaner
+ }
}

func populateConnStats(stats *network.ConnectionStats, t *netebpf.ConnTuple, s *netebpf.ConnStats, ch *cookieHasher) {
@@ -802,5 +810,10 @@ func updateTCPStats(conn *network.ConnectionStats, tcpStats *netebpf.TCPStats) {
conn.Monotonic.TCPClosed = tcpStats.State_transitions >> netebpf.Close & 1
conn.RTT = tcpStats.Rtt
conn.RTTVar = tcpStats.Rtt_var
+ if tcpStats.Failure_reason > 0 {
+ conn.TCPFailures = map[uint16]uint32{
+ tcpStats.Failure_reason: 1,
+ }
+ }
}
}
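A side effect of dropping the kernel-side flush_tcp_failure path: entries in conn_close_flushed (which dedupe flushes between tcp_done and tcp_close) are now reaped from userspace by the new map cleaner. A minimal C sketch of the expiry predicate it applies, assuming the same nanosecond timestamps the eBPF side writes:

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

#define CONN_CLOSE_FLUSHED_TTL_NS (10LL * 1000 * 1000) // 10ms, matching connClosedFlushMapTTL

// An entry expires once its flush timestamp is set and older than the TTL;
// each expiry bumps the closed_conn_flushed_cleaned telemetry counter in Go.
static bool conn_close_flushed_expired(int64_t now_ns, int64_t flushed_at_ns) {
    return flushed_at_ns > 0 && (now_ns - flushed_at_ns) > CONN_CLOSE_FLUSHED_TTL_NS;
}

int main(void) {
    int64_t now = 50LL * 1000 * 1000; // 50ms after boot, for illustration
    printf("%d\n", conn_close_flushed_expired(now, now - 5 * 1000 * 1000));  // 0: only 5ms old
    printf("%d\n", conn_close_flushed_expired(now, now - 20 * 1000 * 1000)); // 1: 20ms old, purged
    return 0;
}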
