Skip to content

Commit

Permalink
Fix flow direction (#341)
Browse files Browse the repository at this point in the history
  • Loading branch information
anjmao authored Sep 5, 2024
1 parent ff86f91 commit 924cd8a
Show file tree
Hide file tree
Showing 7 changed files with 38 additions and 28 deletions.
33 changes: 18 additions & 15 deletions e2e/e2e.go
Original file line number Diff line number Diff line change
Expand Up @@ -1068,21 +1068,24 @@ func (t *testCASTAIServer) assertNetflows(ctx context.Context) error {
t.mu.Lock()
items := t.netflows
t.mu.Unlock()
if len(items) > 0 {
f1 := items[0]
r.NotEmpty(f1.Timestamp)
r.NotEmpty(f1.Namespace)
r.NotEmpty(f1.PodName)
r.NotEmpty(f1.ContainerName)
r.NotEmpty(f1.ProcessName)
r.NotEmpty(f1.Addr)
r.NotEmpty(f1.Destinations)
d1 := f1.Destinations[0]
r.NotEmpty(d1.Addr)
r.NotEmpty(d1.Port)
r.NotEmpty(d1.TxBytes + d1.RxBytes)
r.NotEmpty(d1.TxPackets + d1.RxPackets)
return r.error()
for _, f1 := range items {
for _, d1 := range f1.Destinations {
if d1.TxBytes == 0 || d1.RxBytes == 0 {
continue
}
r.NotEmpty(f1.Timestamp)
r.NotEmpty(f1.Namespace)
r.NotEmpty(f1.PodName)
r.NotEmpty(f1.ContainerName)
r.NotEmpty(f1.ProcessName)
r.NotEmpty(f1.Addr)
r.NotEmpty(f1.Destinations)
r.NotEmpty(d1.Addr)
r.NotEmpty(d1.Port)
r.NotEmpty(d1.TxBytes + d1.RxBytes)
r.NotEmpty(d1.TxPackets + d1.RxPackets)
return r.error()
}
}
}
}
Expand Down
1 change: 0 additions & 1 deletion e2e/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ kubectl apply -f ./e2e/dns-generator.yaml
kubectl apply -f ./e2e/magic-write-generator.yaml
kubectl apply -f ./e2e/oom-generator.yaml
kubectl apply -f ./e2e/socks5-generator.yaml
kubectl apply -f ./e2e/iperf.yaml
kubectl apply -f ./e2e/nc-server-client.yaml

echo "Waiting for job to finish"
Expand Down
21 changes: 13 additions & 8 deletions pkg/ebpftracer/c/tracee.bpf.c
Original file line number Diff line number Diff line change
Expand Up @@ -5482,7 +5482,7 @@ statfunc void update_flow_stats(netflowvalue_t *val, u64 bytes, bool ingress) {
__sync_fetch_and_add(&val->rx_packets, 1);
} else {
__sync_fetch_and_add(&val->tx_bytes, bytes);
__sync_fetch_and_add(&val->tx_bytes, 1);
__sync_fetch_and_add(&val->tx_packets, 1);
}
}

Expand Down Expand Up @@ -5674,7 +5674,7 @@ int cgroup_sock_create(struct bpf_sock *ctx)
//
// SKB eBPF programs
//
statfunc u32 cgroup_skb_generic(struct __sk_buff *ctx)
statfunc u32 cgroup_skb_generic(struct __sk_buff *ctx, bool ingress)
{
switch (ctx->family) {
case PF_INET:
Expand Down Expand Up @@ -5706,10 +5706,10 @@ statfunc u32 cgroup_skb_generic(struct __sk_buff *ctx)
return 1;
}

int zero = 0;
net_event_context_t *neteventctx = bpf_map_lookup_elem(&cgroup_skb_events_scratch_map, &zero);
if (unlikely(neteventctx == NULL))
return 0;
// TODO: We may run into stack limit issue here.
// If that happens change event_context_t to be a pointer since we have it inside ebpf map anyway.
net_event_context_t neteventctx_val = {0};
net_event_context_t *neteventctx = &neteventctx_val;

event_context_t *eventctx = &neteventctx->eventctx;
__builtin_memcpy(&eventctx->task, &netctx->taskctx, sizeof(task_context_t));
Expand All @@ -5721,6 +5721,11 @@ statfunc u32 cgroup_skb_generic(struct __sk_buff *ctx)
eventctx->matched_policies = netctx->matched_policies; // pick matched_policies from net ctx
eventctx->syscall = NO_SYSCALL; // ingress has no orig syscall
neteventctx->md.header_size = 0;
if (ingress) {
eventctx->retval = packet_ingress;
} else {
eventctx->retval = packet_egress;
}

nethdrs hdrs = {0}, *nethdrs = &hdrs;
u32 ret = CGROUP_SKB_HANDLE(proto);
Expand All @@ -5730,13 +5735,13 @@ statfunc u32 cgroup_skb_generic(struct __sk_buff *ctx)
SEC("cgroup_skb/ingress")
int cgroup_skb_ingress(struct __sk_buff *ctx)
{
return cgroup_skb_generic(ctx);
return cgroup_skb_generic(ctx, true);
}

SEC("cgroup_skb/egress")
int cgroup_skb_egress(struct __sk_buff *ctx)
{
return cgroup_skb_generic(ctx);
return cgroup_skb_generic(ctx, false);
}

//
Expand Down
3 changes: 3 additions & 0 deletions pkg/ebpftracer/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ func (m *module) load(cfg Config) error {

m.loaded.Store(true)

// Should reduce allocated memory, see https://github.com/cilium/ebpf/issues/1063
btf.FlushKernelSpec()

return nil
}

Expand Down
Binary file modified pkg/ebpftracer/tracer_arm64_bpfel.o
Binary file not shown.
8 changes: 4 additions & 4 deletions pkg/ebpftracer/tracer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func TestTracer(t *testing.T) {
CgroupClient: &ebpftracer.MockCgroupClient{},
MountNamespacePIDStore: getInitializedMountNamespacePIDStore(procHandle),
HomePIDNS: pidNS,
NetflowSampleSubmitIntervalSeconds: 0,
NetflowSampleSubmitIntervalSeconds: 2,
NetflowGrouping: ebpftracer.NetflowGroupingDropSrcPort,
ProcessTreeCollector: processtree.NewNoop(),
})
Expand All @@ -113,20 +113,20 @@ func TestTracer(t *testing.T) {
policy := &ebpftracer.Policy{
Events: []*ebpftracer.EventPolicy{
//{ID: events.NetPacketSSHBase},
{ID: events.SockSetState},
//{ID: events.SockSetState},
//{ID: events.NetPacketTCPBase},
//{ID: events.NetPacketHTTPBase},
// {ID: events.SchedProcessExec},
//{ID: events.MagicWrite},
// {ID: events.SecuritySocketConnect},
// {ID: events.SocketDup},
{ID: events.NetPacketDNSBase},
//{ID: events.NetPacketDNSBase},
//{ID: events.VfsWrite},
//{ID: events.Write},
//{ID: events.StdioViaSocket},
// {ID: events.TtyOpen},
//{ID: events.TtyWrite},
//{ID: events.NetFlowBase},
{ID: events.NetFlowBase},
},
SignatureEvents: signatureEngine.TargetEvents(),
}
Expand Down
Binary file modified pkg/ebpftracer/tracer_x86_bpfel.o
Binary file not shown.

0 comments on commit 924cd8a

Please sign in to comment.