diff --git a/internal/plugins/externals/ebpf/internal/c/apiflow/apiflow.c b/internal/plugins/externals/ebpf/internal/c/apiflow/apiflow.c index 91bbeaf1bb..c020ba0561 100644 --- a/internal/plugins/externals/ebpf/internal/c/apiflow/apiflow.c +++ b/internal/plugins/externals/ebpf/internal/c/apiflow/apiflow.c @@ -93,36 +93,36 @@ FN_KPROBE(tcp_close) __u64 pid_tgid = bpf_get_current_pid_tgid(); struct sock *sk = (struct sock *)PT_REGS_PARM1(ctx); - conn_uni_id_t uni_id = {0}; - get_conn_uni_id(sk, pid_tgid, &uni_id); - del_conn_uni_id(sk); - __u32 index = get_sock_buf_index(sk); - del_sock_buf_index(sk); if (sk == NULL) { return 0; } - netwrk_data_t *dst = get_netwrk_data_percpu(); - if (dst != NULL) + net_data_t *dst = get_net_data_percpu(); + if (dst == NULL) { - if (read_connection_info(sk, &dst->meta.conn, pid_tgid, CONN_L4_TCP) == 0) - { - dst->meta.index = index; - dst->meta.func_id = P_SYSCALL_CLOSE; - dst->meta.tid_utid = pid_tgid << 32; - __u64 *goid = bpf_map_lookup_elem(&bmap_tid2goid, &pid_tgid); - if (goid != NULL) - { - dst->meta.tid_utid |= *goid; - } - - __builtin_memcpy(&dst->meta.uni_id, &uni_id, sizeof(conn_uni_id_t)); - __u64 cpu = bpf_get_smp_processor_id(); - bpf_perf_event_output(ctx, &mp_upload_netwrk_data, cpu, dst, sizeof(netwrk_data_t)); - } + return 0; + } + + __u8 found = 0; + found = get_sk_inf(sk, &dst->meta.sk_inf, 0); + if (found == 0) + { + return 0; } + del_sk_inf(sk); + + dst->meta.func_id = P_SYSCALL_CLOSE; + dst->meta.tid_utid = pid_tgid << 32; + __u64 *goid = bpf_map_lookup_elem(&bmap_tid2goid, &pid_tgid); + if (goid != NULL) + { + dst->meta.tid_utid |= *goid; + } + + try_upload_net_events(ctx, dst); + clean_protocol_filter(pid_tgid, sk); return 0; @@ -163,6 +163,26 @@ FN_UPROBE(SSL_shutdown) return 0; } +FN_KPROBE(sched_getaffinity) +{ + __u64 cpu = bpf_get_smp_processor_id(); + __s32 index = 0; + network_events_t *events = bpf_map_lookup_elem(&mp_network_events, &index); + if (events == NULL) + { + return 0; + } + + if (events->pos.num > 0) + { + bpf_perf_event_output(ctx, &mp_upload_netwrk_events, cpu, events, sizeof(network_events_t)); + events->pos.len = 0; + events->pos.num = 0; + } + + return 0; +} + char _license[] SEC("license") = "GPL"; // this number will be interpreted by eBPF(Cilium) elf-loader // to set the current running kernel version diff --git a/internal/plugins/externals/ebpf/internal/c/apiflow/bpfmap_l7.h b/internal/plugins/externals/ebpf/internal/c/apiflow/bpfmap_l7.h index d37a22d89c..34d649cc34 100644 --- a/internal/plugins/externals/ebpf/internal/c/apiflow/bpfmap_l7.h +++ b/internal/plugins/externals/ebpf/internal/c/apiflow/bpfmap_l7.h @@ -14,13 +14,17 @@ BPF_HASH_MAP(mp_syscall_rw_arg, __u64, syscall_rw_arg_t, 1024) BPF_HASH_MAP(mp_syscall_rw_v_arg, __u64, syscall_rw_v_arg_t, 1024) -BPF_HASH_MAP(mp_sock_buf_index, void *, __u32, 65535) -BPF_HASH_MAP(mp_conn_uni_id, void *, conn_uni_id_t, 65535) +BPF_HASH_MAP(mp_sk_inf, void *, sk_inf_t, 65535) -BPF_PERCPU_MAP(mp_netwrk_data_pool, netwrk_data_t) -BPF_PERF_EVENT_MAP(mp_upload_netwrk_data) +BPF_PERCPU_ARRAY(mp_uni_id_per_cpu, id_generator_t) -BPF_HASH_MAP(bpfmap_ssl_read_args, __u64, ssl_read_args_t, 1024); +BPF_PERCPU_ARRAY(mp_network_data, net_data_t) + +BPF_PERCPU_ARRAY(mp_network_events, network_events_t) + +BPF_PERF_EVENT_MAP(mp_upload_netwrk_events) + +BPF_HASH_MAP(bpfmap_ssl_read_args, __u64, ssl_read_args_t, 1024) BPF_HASH_MAP(bpfmap_bio_new_socket_args, __u64, __u32, 1024) // k: pid_tgid v: sockfd diff --git a/internal/plugins/externals/ebpf/internal/c/apiflow/l7_stats.h b/internal/plugins/externals/ebpf/internal/c/apiflow/l7_stats.h index 9b23bf9dc7..aedc101df5 100644 --- a/internal/plugins/externals/ebpf/internal/c/apiflow/l7_stats.h +++ b/internal/plugins/externals/ebpf/internal/c/apiflow/l7_stats.h @@ -6,12 +6,14 @@ enum { -#define L7_BUFFER_LEFT_SHIFT 12 - L7_BUFFER_SIZE = (1 << L7_BUFFER_LEFT_SHIFT), // 2^10 +#define L7_BUFFER_LEFT_SHIFT 11 + + L7_BUFFER_SIZE = (1 << (L7_BUFFER_LEFT_SHIFT)), // 2^10 #define L7_BUFFER_SIZE L7_BUFFER_SIZE -#define IOVEC_LEFT_SHIFT 11 - BUF_IOVEC_LEN = (1 << IOVEC_LEFT_SHIFT), +#define IOVEC_LEFT_SHIFT 10 + + BUF_IOVEC_LEN = (1 << (IOVEC_LEFT_SHIFT)), #define BUF_IOVEC_LEN BUF_IOVEC_LEN }; @@ -58,12 +60,22 @@ typedef struct pidtid } pidtid_t; // 由于数据乱序上传,我们需要使用一个唯一值标示连接 -typedef struct conn_uni_id +// cpu id | ktime | id(auto increment) +typedef struct id_generator { - __u64 sk; - __u32 ktime; - __u32 prandom; -} conn_uni_id_t; + __u8 init; + __u8 _pad; + __u16 cpu_id; + __u32 id; + __u64 ktime; +} id_generator_t; + +typedef struct sk_inf +{ + id_generator_t uni_id; + __u64 index; + conn_inf_t conn; +} sk_inf_t; typedef struct netdata_meta { @@ -72,18 +84,15 @@ typedef struct netdata_meta __u64 tid_utid; __u8 comm[KERNEL_TASK_COMM_LEN]; - conn_uni_id_t uni_id; + sk_inf_t sk_inf; - struct connection_info conn; __u32 tcp_seq; __u16 _pad0; __u16 func_id; - __s32 fd; - __s32 buf_len; - __s32 act_size; - __u32 index; + __s32 original_size; + __s32 capture_size; } netdata_meta_t; // TODO: 考虑暂存此对象减少上报次数 @@ -91,7 +100,72 @@ typedef struct netwrk_data { netdata_meta_t meta; __u8 payload[L7_BUFFER_SIZE]; -} netwrk_data_t; +} net_data_t; + +typedef struct event_rec +{ + __u32 num; + __u32 len; +} event_rec_t; + +enum +{ + L7_EVENT_SIZE = (L7_BUFFER_SIZE * 2 - sizeof(event_rec_t)), +#define L7_EVENT_SIZE L7_EVENT_SIZE +}; + +typedef struct network_events +{ + event_rec_t pos; + __u8 payload[L7_EVENT_SIZE]; +} network_events_t; + +typedef enum +{ + BUF_DIV8 = L7_BUFFER_SIZE / 8, +#define BUF_DIV8 BUF_DIV8 + + BUF_DIV4 = L7_BUFFER_SIZE / 4, +#define BUF_DIV4 BUF_DIV4 + + BUF_DIV2 = L7_BUFFER_SIZE / 2, +#define BUF_DIV2 BUF_DIV2 + + BUF_DIV1 = L7_BUFFER_SIZE, +#define BUF_DIV1 BUF_DIV1 +} buf_div_t; + +typedef struct net_event_comm +{ + __u32 idx; + __u32 len; + + netdata_meta_t meta; +} net_event_comm_t; + +typedef struct +{ + net_event_comm_t event_comm; + __u8 payload[BUF_DIV8]; +} net_event_div8_t; + +typedef struct +{ + net_event_comm_t event_comm; + __u8 payload[BUF_DIV4]; +} net_event_div4_t; + +typedef struct +{ + net_event_comm_t event_comm; + __u8 payload[BUF_DIV2]; +} net_event_div2_t; + +typedef struct +{ + net_event_comm_t event_comm; + __u8 payload[BUF_DIV1]; +} net_event_div1_t; typedef struct ssl_read_args { diff --git a/internal/plugins/externals/ebpf/internal/c/apiflow/l7_utils.h b/internal/plugins/externals/ebpf/internal/c/apiflow/l7_utils.h index 35454399f9..6e9b138c64 100644 --- a/internal/plugins/externals/ebpf/internal/c/apiflow/l7_utils.h +++ b/internal/plugins/externals/ebpf/internal/c/apiflow/l7_utils.h @@ -44,95 +44,100 @@ enum HTTP_METHOD_TRACE }; -static __always_inline void fill_conn_uni_id(conn_uni_id_t *id, void *sk, __u64 k_time) +static __always_inline void get_uni_id(id_generator_t *dst) { - id->ktime = (__u32)k_time; - id->sk = (__u64)sk; - id->prandom = bpf_get_prandom_u32(); -} - -static __always_inline void get_conn_uni_id(void *sk, __u64 pid_tgid, conn_uni_id_t *dst) -{ - conn_uni_id_t *id = NULL; - id = bpf_map_lookup_elem(&mp_conn_uni_id, &sk); - if (id == NULL) - { - __u64 k_time = bpf_ktime_get_ns(); - fill_conn_uni_id(dst, sk, k_time); - bpf_map_update_elem(&mp_conn_uni_id, &sk, dst, BPF_NOEXIST); - id = bpf_map_lookup_elem(&mp_conn_uni_id, &sk); - if (id != NULL) - { - __builtin_memcpy(dst, id, sizeof(conn_uni_id_t)); - } + __u32 index = 0; + id_generator_t *val = NULL; + val = (id_generator_t *)bpf_map_lookup_elem(&mp_uni_id_per_cpu, &index); + if (val == NULL) + { + return; } - else + + // initialize + if (val->init == 0) { - __builtin_memcpy(dst, id, sizeof(conn_uni_id_t)); + __u16 cpu_id = (__u16)bpf_get_smp_processor_id(); + val->cpu_id = cpu_id; + val->init = 1; } -} -static __always_inline void del_conn_uni_id(void *sk) -{ - bpf_map_delete_elem(&mp_conn_uni_id, &sk); + __u64 ktime = bpf_ktime_get_ns(); + val->ktime = ktime; + + val->id++; + + __builtin_memcpy(dst, val, sizeof(id_generator_t)); } -static __always_inline __u32 -get_sock_buf_index(void *sk) +static __always_inline __u8 get_sk_inf(void *sk, sk_inf_t *dst, __u8 force) { - __u32 i = 0; - __u32 *idx = bpf_map_lookup_elem(&mp_sock_buf_index, &sk); - if (idx == NULL) + sk_inf_t *inf = NULL; + + inf = (sk_inf_t *)bpf_map_lookup_elem(&mp_sk_inf, &sk); + if (inf == NULL) { - bpf_map_update_elem(&mp_sock_buf_index, &sk, &i, BPF_NOEXIST); - __u32 *idx = bpf_map_lookup_elem(&mp_sock_buf_index, &sk); - if (idx != NULL) + if (force != 0) { - i = *idx; + sk_inf_t i = {0}; + i.index = 1; + get_uni_id(&i.uni_id); + __u64 pid_tgid = bpf_get_current_pid_tgid(); + + if (read_connection_info(sk, &i.conn, pid_tgid, CONN_L4_TCP) != 0) + { + return 0; + } + + // May fail due to exceeding the upper limit of the number of elements + bpf_map_update_elem(&mp_sk_inf, &sk, &i, BPF_NOEXIST); + + inf = (sk_inf_t *)bpf_map_lookup_elem(&mp_sk_inf, &sk); } } - else + + if (inf != NULL) { - i = *idx; + __builtin_memcpy(dst, inf, sizeof(sk_inf_t)); + __sync_fetch_and_add(&inf->index, 1); + return 1; } - i += 1; - bpf_map_update_elem(&mp_sock_buf_index, &sk, &i, BPF_ANY); - return i; + + return 0; } -static __always_inline void del_sock_buf_index(void *sk) +static __always_inline void del_sk_inf(void *sk) { - bpf_map_delete_elem(&mp_sock_buf_index, &sk); + bpf_map_delete_elem(&mp_sk_inf, &sk); } // args: syscall_rw_arg_t, syscall_rw_v_arg_t; dst: netwrk_data_t -#define read_net_meta(args, pid_tgid, dst) \ - do \ - { \ - __u64 ts = bpf_ktime_get_ns(); \ - if (!conn_info_from_skt(args->skt, &dst->meta.conn, pid_tgid)) \ - { \ - goto cleanup; \ - } \ - \ - struct sock *sk = NULL; \ - enum sock_type sktype = 0; \ - get_sock_from_skt(args->skt, &sk, &sktype); \ - get_conn_uni_id(sk, pid_tgid, &dst->meta.uni_id); \ - dst->meta.index = get_sock_buf_index(sk); \ - dst->meta.ts = args->ts; \ - dst->meta.ts_tail = ts; \ - dst->meta.tid_utid = pid_tgid << 32; \ - dst->meta.tcp_seq = args->tcp_seq; \ - dst->meta.func_id = fn; \ - dst->meta.fd = args->fd; \ - dst->meta.act_size = ctx->ret; \ - \ - __u64 *goid = bpf_map_lookup_elem(&bmap_tid2goid, &pid_tgid); \ - if (goid != NULL) \ - { \ - dst->meta.tid_utid |= *goid; \ - } \ +#define read_net_meta(args, pid_tgid, dst) \ + do \ + { \ + __u64 ts = bpf_ktime_get_ns(); \ + struct sock *sk = NULL; \ + if (!get_sk_with_typ(args->skt, &sk, SOCK_STREAM)) \ + { \ + goto cleanup; \ + }; \ + __u8 found = get_sk_inf(sk, &dst->meta.sk_inf, 1); \ + if (found == 0) \ + { \ + goto cleanup; \ + } \ + dst->meta.ts = args->ts; \ + dst->meta.ts_tail = ts; \ + dst->meta.tid_utid = pid_tgid << 32; \ + dst->meta.tcp_seq = args->tcp_seq; \ + dst->meta.func_id = fn; \ + dst->meta.original_size = ctx->ret; \ + \ + __u64 *goid = bpf_map_lookup_elem(&bmap_tid2goid, &pid_tgid); \ + if (goid != NULL) \ + { \ + dst->meta.tid_utid |= *goid; \ + } \ } while (0) static __always_inline bool proto_filter(__u64 pid_tgid, void *sock_ptr) @@ -216,7 +221,7 @@ struct buf_iovec __u8 data[BUF_IOVEC_LEN]; }; -static __always_inline void read_network_data_from_vec(netwrk_data_t *dst, struct iovec *vec, +static __always_inline void read_network_data_from_vec(net_data_t *dst, struct iovec *vec, __u64 vlen, __s64 len_or_errno) { if (len_or_errno <= 0) @@ -263,10 +268,10 @@ static __always_inline void read_network_data_from_vec(netwrk_data_t *dst, struc } } - dst->meta.buf_len = offset; + dst->meta.capture_size = offset; } -static __always_inline void read_netwrk_data(netwrk_data_t *dst, __u8 *buf, __s64 len_or_errno) +static __always_inline void read_netwrk_data(net_data_t *dst, __u8 *buf, __s64 len_or_errno) { if (len_or_errno <= 0) { @@ -282,7 +287,7 @@ static __always_inline void read_netwrk_data(netwrk_data_t *dst, __u8 *buf, __s6 len_or_errno = len_or_errno & (sizeof(dst->payload) - 1); } bpf_probe_read(&dst->payload, len_or_errno, buf); - dst->meta.buf_len = len_or_errno; + dst->meta.capture_size = len_or_errno; } static __always_inline struct socket *get_socket_from_fd( @@ -336,10 +341,7 @@ static __always_inline struct socket *get_socket_from_fd( struct socket *skt = NULL; offset = load_offset_file_private_data(); - bpf_probe_read( - &skt, sizeof(skt), - (__u8 *)skfile + - offset); // bpf_probe_read(&skt, sizeof(skt), &skfile->private_data); + bpf_probe_read(&skt, sizeof(skt), (__u8 *)skfile + offset); // bpf_probe_read(&skt, sizeof(skt), &skfile->private_data); if (skt == NULL) { return NULL; @@ -359,9 +361,9 @@ static __always_inline struct socket *get_socket_from_fd( return skt; } -static __always_inline int get_sock_from_skt(struct socket *skt, - struct sock **sk, - enum sock_type *sktype) +static __always_inline int get_sk(struct socket *skt, + struct sock **sk, + enum sock_type *sktype) { __u64 offset_socket_sk = load_offset_socket_sk(); @@ -385,27 +387,16 @@ static __always_inline void init_ssl_sockfd(void *ssl_ctx, __u32 fd) bpf_map_update_elem(&bpfmap_ssl_ctx_sockfd, &ssl_ctx, &fd, BPF_ANY); } -static __always_inline bool conn_info_from_skt( - struct socket *skt, struct connection_info *conn, __u64 pid_tgid) +static __always_inline bool get_sk_with_typ(struct socket *skt, struct sock **sk_ptr, enum sock_type sk_type) { - struct sock *sk = NULL; - enum sock_type sktype = 0; + enum sock_type typ = 0; - if (get_sock_from_skt(skt, &sk, &sktype) != 0) + if (get_sk(skt, sk_ptr, &typ) != 0 || typ != sk_type) { return false; } - // tcp only - switch (sktype) - { - case SOCK_STREAM: - break; - default: - return false; - } - - if (read_connection_info(sk, conn, pid_tgid, CONN_L4_TCP) != 0) + if (*sk_ptr == NULL) { return false; } @@ -413,10 +404,10 @@ static __always_inline bool conn_info_from_skt( return true; } -static __always_inline netwrk_data_t *get_netwrk_data_percpu() +static __always_inline net_data_t *get_net_data_percpu() { __s32 index = 0; - netwrk_data_t *data = bpf_map_lookup_elem(&mp_netwrk_data_pool, &index); + net_data_t *data = bpf_map_lookup_elem(&mp_network_data, &index); if (data == NULL) { return NULL; @@ -427,6 +418,115 @@ static __always_inline netwrk_data_t *get_netwrk_data_percpu() return data; } +static __always_inline int buf_copy_thr_bprobe(void *dst, const int max_size_base2, int size, void *src) +{ + if (size <= 0) + { + return 0; + } + if (size >= max_size_base2) + { + size = max_size_base2; + } + else + { + size &= (max_size_base2 - 1); + } + + bpf_probe_read(dst, size, src); + return size; +} + +#define write_net_event(ctx, cpu, data, typ) \ + do \ + { \ + __s32 index = 0; \ + network_events_t *events = bpf_map_lookup_elem(&mp_network_events, &index); \ + if (events == NULL) \ + { \ + return; \ + } \ + \ + if (sizeof(typ) > L7_EVENT_SIZE - events->pos.len) \ + { \ + bpf_perf_event_output(ctx, &mp_upload_netwrk_events, cpu, events, sizeof(network_events_t)); \ + events->pos.len = 0; \ + events->pos.num = 0; \ + } \ + \ + if (sizeof(typ) + events->pos.len <= L7_EVENT_SIZE) \ + { \ + typ *elem = (typ *)((__u8 *)(events->payload) + events->pos.len); \ + \ + events->pos.num += 1; \ + elem->event_comm.idx = events->pos.num; \ + \ + bpf_probe_read(&elem->event_comm.meta, sizeof(data->meta), &data->meta); \ + events->pos.len += sizeof(elem->event_comm); \ + \ + int capture_size = data->meta.capture_size; \ + if (capture_size < 0) \ + { \ + capture_size = 0; \ + } \ + if (capture_size > 0) \ + { \ + capture_size = buf_copy_thr_bprobe(&elem->payload, sizeof(elem->payload), capture_size, &data->payload); \ + } \ + \ + events->pos.len += capture_size; \ + elem->event_comm.len = capture_size; \ + } \ + } while (0); + +static __always_inline void try_upload_net_events(void *ctx, net_data_t *data) +{ + __u64 cpu = bpf_get_smp_processor_id(); + + s32 capture_size = 0; + capture_size = data->meta.capture_size; + + if (capture_size <= BUF_DIV8) + { + write_net_event(ctx, cpu, data, net_event_div8_t); + } + else if (capture_size <= BUF_DIV4) + { + write_net_event(ctx, cpu, data, net_event_div4_t); + } + else if (capture_size <= BUF_DIV2) + { + write_net_event(ctx, cpu, data, net_event_div2_t); + } + else if (capture_size <= BUF_DIV1) + { + write_net_event(ctx, cpu, data, net_event_div1_t); + } + else + { +#ifdef __DKE_DEBUG_RW_V__ + bpf_printk("act_size %d\n", capture_size); +#endif + // something wrong + } +} + +static __always_inline void flush_net_events(void *ctx) +{ + __s32 index = 0; + network_events_t *events = bpf_map_lookup_elem(&mp_network_events, &index); + if (events == NULL || events->pos.num == 0) + { + return; + } + + __u64 cpu = bpf_get_smp_processor_id(); + + bpf_perf_event_output(ctx, &mp_network_events, cpu, events, sizeof(network_events_t)); + events->pos.len = 0; + events->pos.num = 0; +} + static __always_inline bool put_rw_args(tp_syscall_rw_args_t *ctx, void *bpf_map, enum MSG_RW rw) { if ((ctx == NULL) || ctx->fd < 3) @@ -451,9 +551,8 @@ static __always_inline bool put_rw_args(tp_syscall_rw_args_t *ctx, void *bpf_map }; struct sock *sk = NULL; - enum sock_type sktype = 0; - if (get_sock_from_skt(skt, &sk, &sktype) != 0 || sktype != SOCK_STREAM) + if (!get_sk_with_typ(skt, &sk, SOCK_STREAM)) { return false; } @@ -520,9 +619,8 @@ static __always_inline bool put_rw_v_args(tp_syscall_rw_v_args_t *ctx, void *bpf }; struct sock *sk = NULL; - enum sock_type sktype = 0; - if (get_sock_from_skt(skt, &sk, &sktype) != 0 || sktype != SOCK_STREAM) + if (!get_sk_with_typ(skt, &sk, SOCK_STREAM)) { return false; } @@ -580,7 +678,7 @@ static __always_inline void read_rw_data(tp_syscall_exit_args_t *ctx, void *bpf_ goto cleanup; } - netwrk_data_t *dst = get_netwrk_data_percpu(); + net_data_t *dst = get_net_data_percpu(); if (dst == NULL) { goto cleanup; @@ -590,12 +688,11 @@ static __always_inline void read_rw_data(tp_syscall_exit_args_t *ctx, void *bpf_ read_netwrk_data(dst, rw_args->buf, ctx->ret); - __u64 cpu = bpf_get_smp_processor_id(); - bpf_perf_event_output(ctx, &mp_upload_netwrk_data, cpu, dst, sizeof(netwrk_data_t)); + try_upload_net_events(ctx, dst); #ifdef __DKE_DEBUG_RW__ - bpf_printk("act len: %d %d\n", dst->meta.act_size, ctx->ret); - bpf_printk("fn: %d, len %d, data: %s\n", fn, dst->meta.buf_len, dst->payload); + bpf_printk("cap len: %d %d\n", dst->meta.capture_size, ctx->ret); + bpf_printk("fn: %d, len %d, data: %s\n", fn, dst->meta.original_size, dst->payload); #endif cleanup: @@ -623,21 +720,21 @@ static __always_inline void read_rw_v_data(tp_syscall_exit_args_t *ctx, void *bp goto cleanup; } - netwrk_data_t *dst = get_netwrk_data_percpu(); + net_data_t *dst = get_net_data_percpu(); if (dst == NULL) { goto cleanup; } read_net_meta(rwv_args, pid_tgid, dst); + read_network_data_from_vec(dst, rwv_args->vec, vlen, ctx->ret); - __u64 cpu = bpf_get_smp_processor_id(); - bpf_perf_event_output(ctx, &mp_upload_netwrk_data, cpu, dst, sizeof(netwrk_data_t)); + try_upload_net_events(ctx, dst); #ifdef __DKE_DEBUG_RW_V__ - bpf_printk("act len: %d %d\n", dst->meta.act_size, ctx->ret); - bpf_printk("fn: %d, len %d, data: %s\n", fn, dst->meta.buf_len, dst->payload); + bpf_printk("cap len: %d %d\n", dst->meta.capture_size, ctx->ret); + bpf_printk("fn: %d, len %d, data: %s\n", fn, dst->meta.original_size, dst->payload); #endif cleanup: diff --git a/internal/plugins/externals/ebpf/internal/c/common/bpf_helpers.h b/internal/plugins/externals/ebpf/internal/c/common/bpf_helpers.h index 14b3af710c..2d141051a3 100644 --- a/internal/plugins/externals/ebpf/internal/c/common/bpf_helpers.h +++ b/internal/plugins/externals/ebpf/internal/c/common/bpf_helpers.h @@ -102,7 +102,7 @@ struct bpf_map_def char namespace[BUF_SIZE_MAP_NS]; }; -#define BPF_PERCPU_MAP(map_name, value_type) \ +#define BPF_PERCPU_ARRAY(map_name, value_type) \ struct bpf_map_def SEC("maps/" #map_name) map_name = { \ .type = BPF_MAP_TYPE_PERCPU_ARRAY, \ .key_size = sizeof(__u32), \ diff --git a/internal/plugins/externals/ebpf/internal/c/netflow/conn_stats.h b/internal/plugins/externals/ebpf/internal/c/netflow/conn_stats.h index 576dfa2a6e..1c45ab0f9e 100644 --- a/internal/plugins/externals/ebpf/internal/c/netflow/conn_stats.h +++ b/internal/plugins/externals/ebpf/internal/c/netflow/conn_stats.h @@ -49,7 +49,7 @@ enum ConnLayerP }; // key of bpf map conn_stats -struct connection_info +typedef struct connection_info { __be32 saddr[4]; // src ip address; Use the last element to store the IPv4 address __be32 daddr[4]; // dst ip address @@ -58,7 +58,7 @@ struct connection_info __u32 pid; __u32 netns; // network namespace __u32 meta; // first byte: 0x0000|IPv4 or 0x0001|IPv6; second byte 0x0000|TCP or 0x0100|UDP; ... -}; +} conn_inf_t; struct connection_stats { diff --git a/internal/plugins/externals/ebpf/internal/cmd/run/run_test.go b/internal/plugins/externals/ebpf/internal/cmd/run/run_test.go index 8942fe600c..150d5e803b 100644 --- a/internal/plugins/externals/ebpf/internal/cmd/run/run_test.go +++ b/internal/plugins/externals/ebpf/internal/cmd/run/run_test.go @@ -14,7 +14,7 @@ func TestDKE(t *testing.T) { PprofPort: "6267", Service: "ebpf", - Enabled: []string{"bpf-netlog"}, + Enabled: []string{"ebpf-net", "ebpf-trace"}, EBPFNet: FlagNet{ L7NetEnabled: []string{"httpflow"}, @@ -23,7 +23,7 @@ func TestDKE(t *testing.T) { BPFNetLog: FlagBPFNetLog{ EnableLog: true, EnableMetric: true, - L7LogProtocols: []string{"http"}, + L7LogProtocols: []string{}, }, EBPFTrace: FlagTrace{ diff --git a/internal/plugins/externals/ebpf/internal/l4log/nic_monitor.go b/internal/plugins/externals/ebpf/internal/l4log/nic_monitor.go index 279177e882..7e292b230c 100644 --- a/internal/plugins/externals/ebpf/internal/l4log/nic_monitor.go +++ b/internal/plugins/externals/ebpf/internal/l4log/nic_monitor.go @@ -334,7 +334,7 @@ func ListContainersAndHostNetNS(ctrLi []cruntime.ContainerRuntime, allowLo bool, for _, c := range ctrs { nsH, err := netns.GetFromPid(c.Pid) if err != nil { - log.Error("get netns from pid: %w", err) + log.Errorf("get netns from pid: %w", err) continue } nsHStr := NSInode(nsH) diff --git a/internal/plugins/externals/ebpf/internal/l7flow/comm/comm.go b/internal/plugins/externals/ebpf/internal/l7flow/comm/comm.go index 7959e36169..395321fc71 100644 --- a/internal/plugins/externals/ebpf/internal/l7flow/comm/comm.go +++ b/internal/plugins/externals/ebpf/internal/l7flow/comm/comm.go @@ -46,6 +46,7 @@ const ( DirectionIncoming = "incoming" DirectionUnknown = "unknown" + FieldPid = "pid" FieldUserThread = "tid_usr" FieldKernelThread = "tid" FieldKernelTime = "ktime" @@ -191,30 +192,25 @@ func FnInOut(fn FnID) NICDirection { } } -type ConnUniID struct { - SkPtr uint64 `json:"sk_ptr"` - Ktime uint32 `json:"ktime"` - Rand uint32 `json:"rand"` -} type NetwrkData struct { - Conn ConnectionInfo `json:"conn"` - ConnUniID ConnUniID `json:"conn_uni_id"` - ActSize int `json:"act_size"` - TCPSeq uint32 `json:"tcp_seq"` - Thread [2]int32 `json:"thread"` - TS uint64 `json:"ts"` - TSTail uint64 `json:"ts_tail"` - Index uint32 `json:"index"` - Fn FnID `json:"fn_id"` - Payload []byte `json:"payload"` + Conn ConnectionInfo `json:"conn"` + CaptureSize int `json:"act_size"` + FnCallSize int `json:"fn_call_size"` + TCPSeq uint32 `json:"tcp_seq"` + Thread [2]int32 `json:"thread"` + TS uint64 `json:"ts"` + TSTail uint64 `json:"ts_tail"` + Index uint64 `json:"index"` + Fn FnID `json:"fn_id"` + Payload []byte `json:"payload"` } func (d NetwrkData) String() string { str := fmt.Sprintf("\tconn %s\n", d.Conn.String()) - str += fmt.Sprintf("\nptr: %x, thread %d, user thread %d, idx %d\n", - d.ConnUniID, d.Thread[0], d.Thread[1], d.Index) + str += fmt.Sprintf("\nthread %d, user thread %d, idx %d\n", + d.Thread[0], d.Thread[1], d.Index) str += fmt.Sprintf("\tfn %s, size %d, tcp seq: %d\n", d.Fn.String(), - d.ActSize, d.TCPSeq) + d.CaptureSize, d.TCPSeq) ts := d.TS tsNano := ts % uint64(time.Second) @@ -259,14 +255,14 @@ type ThreadTrace struct { sync.RWMutex // only for incoming requests - Threads map[int32]*ThrEntry + Threads map[uint64]*ThrEntry lastTS uint64 delCount int } -func (thrTr *ThreadTrace) Insert(d Direcion, thrID2 [2]int32, ts0_1 uint64) (id int64) { +func (thrTr *ThreadTrace) Insert(d Direcion, pid int32, thrID2 [2]int32, ts0_1 uint64) (id int64) { switch d { //nolint:exhaustive case DIn: default: @@ -280,6 +276,8 @@ func (thrTr *ThreadTrace) Insert(d Direcion, thrID2 [2]int32, ts0_1 uint64) (id thrID = thrID2[0] } + ptid := uint64(pid)<<32 | uint64(thrID) + thrTr.Lock() defer thrTr.Unlock() @@ -288,7 +286,7 @@ func (thrTr *ThreadTrace) Insert(d Direcion, thrID2 [2]int32, ts0_1 uint64) (id } if thrTr.Threads == nil { - thrTr.Threads = make(map[int32]*ThrEntry) + thrTr.Threads = make(map[uint64]*ThrEntry) } id = randInnerID() @@ -297,10 +295,10 @@ func (thrTr *ThreadTrace) Insert(d Direcion, thrID2 [2]int32, ts0_1 uint64) (id innerID: id, } - if tailTr, ok := thrTr.Threads[thrID]; ok { + if tailTr, ok := thrTr.Threads[ptid]; ok { if ts0_1 >= tailTr.ts { insertEntry.prv = tailTr - thrTr.Threads[thrID] = insertEntry + thrTr.Threads[ptid] = insertEntry return } @@ -314,12 +312,12 @@ func (thrTr *ThreadTrace) Insert(d Direcion, thrID2 [2]int32, ts0_1 uint64) (id return } } else { - thrTr.Threads[thrID] = insertEntry + thrTr.Threads[ptid] = insertEntry } return id } -func (thrTr *ThreadTrace) GetInnerID(thrID2 [2]int32, ts uint64) int64 { +func (thrTr *ThreadTrace) GetInnerID(pid int32, thrID2 [2]int32, ts uint64) int64 { thrTr.RLock() defer thrTr.RUnlock() @@ -329,8 +327,9 @@ func (thrTr *ThreadTrace) GetInnerID(thrID2 [2]int32, ts uint64) int64 { } else { thrID = thrID2[0] } + ptid := uint64(pid)<<32 | uint64(thrID) - if tailTr, ok := thrTr.Threads[thrID]; ok { + if tailTr, ok := thrTr.Threads[ptid]; ok { if ts >= tailTr.ts { return tailTr.innerID } @@ -350,7 +349,7 @@ func (thrTr *ThreadTrace) Cleanup() { defer thrTr.Unlock() lastTS := thrTr.lastTS - var del []int32 + var del []uint64 for k, v := range thrTr.Threads { if v == nil { continue @@ -374,7 +373,7 @@ func (thrTr *ThreadTrace) Cleanup() { thrTr.delCount += len(del) if thrTr.delCount > 1e3 && thrTr.delCount >= len(thrTr.Threads) { - mp := make(map[int32]*ThrEntry) + mp := make(map[uint64]*ThrEntry) for k, v := range thrTr.Threads { mp[k] = v } diff --git a/internal/plugins/externals/ebpf/internal/l7flow/l7flow.go b/internal/plugins/externals/ebpf/internal/l7flow/l7flow.go index 9b234d3703..3c156c877b 100644 --- a/internal/plugins/externals/ebpf/internal/l7flow/l7flow.go +++ b/internal/plugins/externals/ebpf/internal/l7flow/l7flow.go @@ -20,7 +20,6 @@ import ( "github.com/GuanceCloud/cliutils/logger" "github.com/GuanceCloud/cliutils/point" "github.com/cilium/ebpf" - "github.com/cilium/ebpf/perf" dkebpf "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/plugins/externals/ebpf/internal/c" "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/plugins/externals/ebpf/internal/exporter" "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/plugins/externals/ebpf/internal/k8sinfo" @@ -72,7 +71,9 @@ var ( type ( CLayer7Http C.struct_layer7_http CHTTPReqFinished C.struct_http_req_finished - CL7Buffer C.struct_netwrk_data + CNetEventComm C.struct_net_event_comm + CNetEvents C.struct_network_events + CUniID C.struct_id_generator ConnectionInfoC dknetflow.ConnectionInfoC @@ -104,8 +105,8 @@ type ( } ) -func readMeta(buf *CL7Buffer, dst *comm.ConnectionInfo) { - conn := buf.meta.conn +func readMeta(buf *CNetEventComm, dst *comm.ConnectionInfo) { + conn := buf.meta.sk_inf.conn // TODO: record thread name // @@ -116,6 +117,7 @@ func readMeta(buf *CL7Buffer, dst *comm.ConnectionInfo) { cmdCpy = bytes.TrimSpace(cmdCpy) taskComm := string(cmdCpy) + // 暂时屏蔽 uds,其 ip port 为 0; ebpf 暂时不采集此类 socket dst.Saddr = (*(*[4]uint32)(unsafe.Pointer(&conn.saddr))) //nolint:gosec dst.Daddr = (*(*[4]uint32)(unsafe.Pointer(&conn.daddr))) //nolint:gosec dst.Sport = uint32(conn.sport) @@ -166,7 +168,7 @@ var ( } ) -type perferEventHandle func(record *perf.Record, perfmap *manager.PerfMap, +type perferEventHandle func(CPU int, data []byte, perfmap *manager.PerfMap, manager *manager.Manager) func NewHTTPFlowManger(constEditor []manager.ConstantEditor, bmaps map[string]*ebpf.Map, @@ -241,6 +243,12 @@ func NewHTTPFlowManger(constEditor []manager.ConstantEditor, bmaps map[string]*e UID: "tcp_close_apiflow", }, }, + { + ProbeIdentificationPair: manager.ProbeIdentificationPair{ + EBPFFuncName: "kprobe__sched_getaffinity", + UID: "kprobe_sched_getaffinity_apiflow", + }, + }, { ProbeIdentificationPair: manager.ProbeIdentificationPair{ EBPFFuncName: "tracepoint__sys_enter_sendfile64", @@ -255,12 +263,12 @@ func NewHTTPFlowManger(constEditor []manager.ConstantEditor, bmaps map[string]*e PerfMaps: []*manager.PerfMap{ { Map: manager.Map{ - Name: "mp_upload_netwrk_data", + Name: "mp_upload_netwrk_events", }, PerfMapOptions: manager.PerfMapOptions{ - // pagesize ~= 4k, - PerfRingBufferSize: 8 * 1024 * os.Getpagesize(), - RecordHandler: bufHandler, + // 1k * pagesize ~= 1k * 4k, + PerfRingBufferSize: 1024 * os.Getpagesize(), + DataHandler: bufHandler, LostHandler: func(CPU int, count uint64, perfMap *manager.PerfMap, manager *manager.Manager) { log.Warnf("lost %d events on cpu %d\n", count, CPU) }, @@ -419,6 +427,8 @@ func (tracer *APIFlowTracer) Run(ctx context.Context, constEditor []manager.Cons return err } + newKpFlushTrigger(ctx) + if err := bpfManger.Start(); err != nil { log.Error(err) return err diff --git a/internal/plugins/externals/ebpf/internal/l7flow/net_tracer.go b/internal/plugins/externals/ebpf/internal/l7flow/net_tracer.go index 21807772ef..4dc641f939 100644 --- a/internal/plugins/externals/ebpf/internal/l7flow/net_tracer.go +++ b/internal/plugins/externals/ebpf/internal/l7flow/net_tracer.go @@ -5,14 +5,13 @@ package l7flow import ( "context" - "math" "strconv" + "sync" "time" "unsafe" manager "github.com/DataDog/ebpf-manager" "github.com/GuanceCloud/cliutils/point" - "github.com/cilium/ebpf/perf" "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/plugins/externals/ebpf/internal/k8sinfo" "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/plugins/externals/ebpf/internal/l7flow/comm" "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/plugins/externals/ebpf/internal/l7flow/protodec" @@ -21,15 +20,15 @@ import ( "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/plugins/externals/ebpf/pkg/spanid" ) -type ProcNetworkTrace struct { - Pid int - - // 对于该进程, ingress 请求抵达时,后续的网络请求都应该继承此生成的 innter trace id +type NetTrace struct { + // 对于每个进程, ingress 请求抵达时,后续的网络请求都应该继承此生成的 innter trace id ESpanLinkDuration time.Duration // (kernel) sock ptr and random id ==> network flow pipe - ConnMap map[comm.ConnUniID]*FlowPipe - delConnMapCount int + ConnMap map[CUniID]*FlowPipe + ConnMapClosed map[CUniID]*FlowPipe + + ConnAndClosedDelCount [2]int threadInnerID comm.ThreadTrace @@ -39,44 +38,73 @@ type ProcNetworkTrace struct { allowESPan bool } -func (netTrace *ProcNetworkTrace) StreamHandle(data *comm.NetwrkData, +func (netTrace *NetTrace) StreamHandle(tn int64, uniID CUniID, data *comm.NetwrkData, aggPool map[protodec.L7Protocol]protodec.AggPool, allowTrace bool, protoLi map[protodec.L7Protocol]struct{}, ) { if netTrace.ConnMap == nil { - netTrace.ConnMap = make(map[comm.ConnUniID]*FlowPipe) + netTrace.ConnMap = make(map[CUniID]*FlowPipe) } - ptr := data.ConnUniID + var pipe *FlowPipe + var inClosedMap bool + + // check in closed map first + if p, ok := netTrace.ConnMapClosed[uniID]; ok { + pipe = p + inClosedMap = true + } - pipe, ok := netTrace.ConnMap[ptr] - if !ok { - pipe = &FlowPipe{ - Conn: data.Conn, - sort: netdata{prvDataPos: 0}, + if pipe == nil { + var ok bool + pipe, ok = netTrace.ConnMap[uniID] + if !ok { + pipe = &FlowPipe{ + Conn: data.Conn, + sort: dataQueue{prvDataPos: 0}, + } + netTrace.ConnMap[uniID] = pipe } - netTrace.ConnMap[ptr] = pipe } - dataLi := pipe.sort.Queue(data) - defer func(datli []*comm.NetwrkData) { - for _, v := range dataLi { - putNetwrkData(v) + pipe.lastTime = tn + + // check conn close + if data.Fn == comm.FnSysClose { + pipe.connClosed = true + + delete(netTrace.ConnMap, uniID) + netTrace.ConnMapClosed[uniID] = pipe + inClosedMap = true + } + + var dataLi []*comm.NetwrkData + if pipe.detecTimes < 64 || pipe.Decoder != nil { + dataLi = pipe.sort.Queue(data) + } else { + pipe.sort.li = nil + } + + defer func(li []*comm.NetwrkData) { + for _, d := range li { + putNetwrkData(d) } }(dataLi) var connClose bool for _, d := range dataLi { + if d.Fn == comm.FnSysClose { + connClose = true + continue + } + txRx := comm.FnInOut(d.Fn) if txRx == comm.NICDUnknown { - if d.Fn == comm.FnSysClose { - connClose = true - } continue } if pipe.Proto == protodec.ProtoUnknown { - if proto, dec, ok := protodec.ProtoDetector(d.Payload, d.ActSize); ok { + if proto, dec, ok := protodec.ProtoDetector(d.Payload, d.CaptureSize); ok { pipe.Proto = proto if _, ok := protoLi[pipe.Proto]; !ok && pipe.Proto != protodec.ProtoHTTP { continue @@ -93,7 +121,7 @@ func (netTrace *ProcNetworkTrace) StreamHandle(data *comm.NetwrkData, } } - if pipe.Decoder != nil && d.ActSize > 0 { + if pipe.Decoder != nil && d.CaptureSize > 0 { pipe.Decoder.Decode(txRx, d, time.Now().UnixNano(), &netTrace.threadInnerID) } } @@ -103,15 +131,12 @@ func (netTrace *ProcNetworkTrace) StreamHandle(data *comm.NetwrkData, if pipe.Decoder != nil { pipe.Decoder.ConnClose() } - delete(netTrace.ConnMap, ptr) - netTrace.delConnMapCount++ - if netTrace.delConnMapCount >= 1e3 { - mp := make(map[comm.ConnUniID]*FlowPipe) - for k, v := range netTrace.ConnMap { - mp[k] = v - } - netTrace.ConnMap = mp - netTrace.delConnMapCount = 0 + if inClosedMap { + netTrace.ConnAndClosedDelCount[1]++ + delete(netTrace.ConnMapClosed, uniID) + } else { + netTrace.ConnAndClosedDelCount[0]++ + delete(netTrace.ConnMap, uniID) } } @@ -139,15 +164,16 @@ type FlowPipe struct { Proto protodec.L7Protocol detecTimes int - sort netdata + sort dataQueue + + lastTime int64 connClosed bool } -type PidWatcher struct { - pidMap map[int]*ProcNetworkTrace - ch chan *comm.NetwrkData - aggPool map[protodec.L7Protocol]protodec.AggPool +type ConnWatcher struct { + netracer *NetTrace + aggPool map[protodec.L7Protocol]protodec.AggPool procFilter *tracing.ProcessFilter @@ -158,66 +184,100 @@ type PidWatcher struct { enableTrace bool enabledProto map[protodec.L7Protocol]struct{} + + sync.Mutex } -func (p *PidWatcher) start(ctx context.Context) { - ticker := time.NewTicker(time.Second * 5) +func (watcher *ConnWatcher) handle(tn int64, uniID CUniID, netdata *comm.NetwrkData) { + watcher.Lock() + defer watcher.Unlock() + + pid := int(netdata.Conn.Pid) + + nettracer := watcher.netracer + if nettracer == nil { + return + } + + if watcher.enableTrace && watcher.procFilter != nil { + nettracer.allowESPan = false + if v, ok := watcher.procFilter.GetProcInfo(pid); ok { + nettracer.allowESPan = v.AllowTrace + netdata.Conn.ProcessName = v.Name + netdata.Conn.ServiceName = v.Service + } + } + + nettracer.StreamHandle(tn, uniID, netdata, watcher.aggPool, watcher.enableTrace, watcher.enabledProto) +} + +func (watcher *ConnWatcher) start(ctx context.Context) { + ticker := time.NewTicker(time.Second * 10) tickerClean := time.NewTicker(time.Minute * 5) + tickerCheck := time.NewTicker(time.Minute * 2) for { select { - case netdata := <-p.ch: - pid := int(netdata.Conn.Pid) - var nettracer *ProcNetworkTrace - if v, ok := p.pidMap[pid]; !ok { - if p.pidMap == nil { - p.pidMap = make(map[int]*ProcNetworkTrace) + case <-tickerCheck.C: + watcher.Lock() + groupTime := time.Now().UnixNano() + for uniID, pipe := range watcher.netracer.ConnMap { + if groupTime-pipe.lastTime > int64(time.Minute)*3 { + delete(watcher.netracer.ConnMap, uniID) + watcher.netracer.ConnAndClosedDelCount[0]++ + } + } + + for uniID, pipe := range watcher.netracer.ConnMapClosed { + if groupTime-pipe.lastTime > int64(time.Minute) { + delete(watcher.netracer.ConnMapClosed, uniID) + watcher.netracer.ConnAndClosedDelCount[1]++ } - nettracer = &ProcNetworkTrace{ - Pid: pid, - ConnMap: make(map[comm.ConnUniID]*FlowPipe), + } + + if watcher.netracer.ConnAndClosedDelCount[0] > 160_000 { + watcher.netracer.ConnAndClosedDelCount[0] = 0 + connMap := make(map[CUniID]*FlowPipe) + for k, v := range watcher.netracer.ConnMap { + connMap[k] = v } - p.pidMap[pid] = nettracer - } else { - nettracer = v } - if p.enableTrace && p.procFilter != nil { - nettracer.allowESPan = false - if v, ok := p.procFilter.GetProcInfo(pid); ok { - nettracer.allowESPan = v.AllowTrace - netdata.Conn.ProcessName = v.Name - netdata.Conn.ServiceName = v.Service + + if watcher.netracer.ConnAndClosedDelCount[1] > 160_000 { + watcher.netracer.ConnAndClosedDelCount[1] = 0 + connMap := make(map[CUniID]*FlowPipe) + for k, v := range watcher.netracer.ConnMapClosed { + connMap[k] = v } } - nettracer.StreamHandle(netdata, p.aggPool, p.enableTrace, p.enabledProto) + watcher.Unlock() case <-tickerClean.C: - for _, v := range p.pidMap { - v.threadInnerID.Cleanup() + watcher.Lock() + if tracer := watcher.netracer; tracer != nil { + tracer.threadInnerID.Cleanup() } + watcher.Unlock() case <-ticker.C: - for _, v := range p.pidMap { - for _, pt := range v.ptsPrv { - setInnerID(pt, &v.threadInnerID) + watcher.Lock() + if tracer := watcher.netracer; tracer != nil { + for _, pt := range tracer.ptsPrv { + setInnerID(pt, &tracer.threadInnerID) } - if err := feed(p.tracePostURL, v.ptsPrv, false); err != nil { + if err := feed(watcher.tracePostURL, tracer.ptsPrv, false); err != nil { log.Error(err) } // feed("http://0.0.0.0:9529/v1/write/logging?input=ebpf-net%2Fespan", v.ptsPrv, false) - v.ptsPrv = v.ptsCur - v.ptsCur = nil + tracer.ptsPrv = tracer.ptsCur + tracer.ptsCur = nil } - + watcher.Unlock() case <-ctx.Done(): return } } } -func (p *PidWatcher) Handle(v *comm.NetwrkData) { - p.ch <- v -} - func setInnerID(pt *point.Point, threadInnerID *comm.ThreadTrace) { d := pt.Get(spanid.Direction).(string) if d != comm.DirectionOutgoing { @@ -241,14 +301,22 @@ func setInnerID(pt *point.Point, threadInnerID *comm.ThreadTrace) { ktime = uint64(v) } } - id := threadInnerID.GetInnerID(tid, ktime) + var pid int32 + if v := pt.Get(comm.FieldPid); v != nil { + if v, ok := v.(int64); ok { + pid = int32(v) + } + } + id := threadInnerID.GetInnerID(pid, tid, ktime) pt.Add(spanid.ThrTraceID, id) } -func newPidMap(ctx context.Context, cfg *pidMapConfig) *PidWatcher { - p := &PidWatcher{ - pidMap: make(map[int]*ProcNetworkTrace), - ch: make(chan *comm.NetwrkData, 32), +func newConnWatcher(ctx context.Context, cfg *connWatcherConfig) *ConnWatcher { + p := &ConnWatcher{ + netracer: &NetTrace{ + ConnMap: make(map[CUniID]*FlowPipe), + ConnMapClosed: make(map[CUniID]*FlowPipe), + }, aggPool: cfg.aggPool, procFilter: cfg.procFilter, tags: cfg.tags, @@ -263,7 +331,7 @@ func newPidMap(ctx context.Context, cfg *pidMapConfig) *PidWatcher { } type Tracer struct { - pidMap [5]*PidWatcher + connWatcher *ConnWatcher aggPool map[protodec.L7Protocol]protodec.AggPool @@ -295,57 +363,57 @@ func (tracer *Tracer) Start(ctx context.Context, interval time.Duration) { } } -func (tracer *Tracer) PerfEventHandle(record *perf.Record, +func (tracer *Tracer) PerfEventHandle(cpu int, data []byte, perfmap *manager.PerfMap, manager *manager.Manager, ) { - bufferC := (*CL7Buffer)(unsafe.Pointer(&record.RawSample[0])) //nolint:gosec + events := (*CNetEvents)(unsafe.Pointer(&data[0])) //nolint:gosec - actLen := int(bufferC.meta.act_size) - bufLen := int(bufferC.meta.buf_len) + eventsNum := int(events.pos.num) + // eventsLen := int(events.pos.len) - if bufLen > 0 { - if bufLen > PayloadBufSize { - bufLen = PayloadBufSize - } - if actLen > 0 && actLen > bufLen { - actLen = bufLen - } - } + hdrSize := unsafe.Sizeof(CNetEventComm{}) // nolint:gosec - // 需要使 actLen 不会在后续发生改变,否则内存池会出问题 - netdata := getNetwrkData(actLen) + pos := int(unsafe.Sizeof(events.pos)) // nolint:gosec - readMeta(bufferC, &netdata.Conn) + groupTime := time.Now().UnixNano() - if bufLen > 0 && actLen > 0 { - b := (*[PayloadBufSize]byte)(unsafe.Pointer(&bufferC.payload)) //nolint:gosec - netdata.Payload = append(netdata.Payload, b[:actLen]...) - } + for i := 0; i < eventsNum; i++ { + commHdr := *(*CNetEventComm)(unsafe.Pointer(&data[pos])) //nolint:gosec + pos += int(hdrSize) - pid := int(netdata.Conn.Pid) - if pid == selfPid { - return - } + netdata := getNetwrkData(int(commHdr.len)) - netdata.ConnUniID.Ktime = uint32(bufferC.meta.uni_id.ktime) - netdata.ConnUniID.SkPtr = uint64(bufferC.meta.uni_id.sk) - netdata.ConnUniID.Rand = uint32(bufferC.meta.uni_id.prandom) + readMeta(&commHdr, &netdata.Conn) + + if int(commHdr.len) > 0 { + v := unsafe.Slice((*byte)(unsafe.Pointer(&data[pos])), int(commHdr.len)) //nolint:gosec + netdata.Payload = append(netdata.Payload, v...) + + pos += int(commHdr.len) + } + + pid := int(netdata.Conn.Pid) + if pid == selfPid { + return + } - netdata.Fn = comm.FnID(bufferC.meta.func_id) + netdata.Fn = comm.FnID(commHdr.meta.func_id) - netdata.ActSize = actLen - netdata.TCPSeq = uint32(bufferC.meta.tcp_seq) - netdata.Thread = [2]int32{int32(bufferC.meta.tid_utid >> 32), (int32(bufferC.meta.tid_utid))} - netdata.TS = uint64(bufferC.meta.ts) - netdata.TSTail = uint64(bufferC.meta.ts_tail) - netdata.Index = uint32(bufferC.meta.index) + netdata.CaptureSize = int(commHdr.len) + netdata.FnCallSize = int(commHdr.meta.original_size) + netdata.TCPSeq = uint32(commHdr.meta.tcp_seq) + netdata.Thread = [2]int32{int32(commHdr.meta.tid_utid >> 32), (int32(commHdr.meta.tid_utid))} + netdata.TS = uint64(commHdr.meta.ts) + netdata.TSTail = uint64(commHdr.meta.ts_tail) + netdata.Index = uint64(commHdr.meta.sk_inf.index) - // log.Info(netdata.String()) + // log.Info(netdata.String()) - tracer.pidMap[pid%5].Handle(netdata) + tracer.connWatcher.handle(groupTime, CUniID(commHdr.meta.sk_inf.uni_id), netdata) + } } -type pidMapConfig struct { +type connWatcherConfig struct { apiTracerConfig aggPool map[protodec.L7Protocol]protodec.AggPool } @@ -355,18 +423,13 @@ func newTracer(ctx context.Context, cfg *apiTracerConfig) *Tracer { return nil } - mps := [5]*PidWatcher{} - aggP := protodec.NewProtoAggregators() - for i := 0; i < 5; i++ { - mps[i] = newPidMap(ctx, &pidMapConfig{ - apiTracerConfig: *cfg, - aggPool: aggP, - }) - } return &Tracer{ - pidMap: mps, + connWatcher: newConnWatcher(ctx, &connWatcherConfig{ + apiTracerConfig: *cfg, + aggPool: aggP, + }), aggPool: aggP, tags: cfg.tags, k8sInfo: cfg.k8sNetInfo, @@ -376,73 +439,6 @@ func newTracer(ctx context.Context, cfg *apiTracerConfig) *Tracer { } } -type netdata struct { - li []*comm.NetwrkData - // 从 1 开始索引,如果值为 0,视为发生翻转 - prvDataPos uint32 -} - -func (n *netdata) Queue(data *comm.NetwrkData) []*comm.NetwrkData { - var val []*comm.NetwrkData - if data == nil { - return val - } - - lenQ := len(n.li) - switch lenQ { - case 0: - if n.prvDataPos+1 == data.Index { - n.prvDataPos = data.Index - return []*comm.NetwrkData{data} - } else { - n.li = append(n.li, data) - } - default: - for i := 0; i < lenQ; i++ { - if idxLess(data.Index, n.li[i].Index) { - n.li = append(n.li, nil) - copy(n.li[i+1:], n.li[i:]) - n.li[i] = data - break - } - if i+1 == lenQ { - n.li = append(n.li, data) - break - } - } - } - - // try flush cache - i := 0 - for ; i < len(n.li); i++ { - cur := n.li[i].Index - if cur == n.prvDataPos+1 { - val = append(val, n.li[i]) - n.prvDataPos = cur - } else { - break - } - } - - // clean cache - if i >= len(n.li) { - n.li = n.li[:0] - } else if i > 0 { - copy(n.li, n.li[i:]) - n.li = n.li[:len(n.li)-i] - } - - // 可能存在数据丢失情况 - if len(n.li) >= 1024 && len(val) == 0 { - x := 128 - val = append(val, n.li[:x]...) - n.li = n.li[x:] - n.prvDataPos = val[x-1].Index - } - - return val -} - func genPts(data []*protodec.ProtoData, conn *comm.ConnectionInfo) []*point.Point { var pts []*point.Point for _, v := range data { @@ -506,7 +502,7 @@ func genPts(data []*protodec.ProtoData, conn *comm.ConnectionInfo) []*point.Poin } else { v.KVs = v.KVs.Add("service", conn.ServiceName, false, true) } - v.KVs = v.KVs.Add("pid", strconv.Itoa(int(conn.Pid)), false, true) + v.KVs = v.KVs.Add(comm.FieldPid, strconv.Itoa(int(conn.Pid)), false, true) // conn info isV6 := !netflow.ConnAddrIsIPv4(conn.Meta) @@ -530,12 +526,3 @@ func genPts(data []*protodec.ProtoData, conn *comm.ConnectionInfo) []*point.Poin } return pts } - -func idxLess(l, r uint32) bool { - // 可能发生回绕现象,预留窗口应与 buffer 长度相近 - if l > math.MaxUint32-1025 && r <= 1025 { - return true - } - - return l < r -} diff --git a/internal/plugins/externals/ebpf/internal/l7flow/net_tracer_test.go b/internal/plugins/externals/ebpf/internal/l7flow/net_tracer_test.go index 1bab3ff28f..21e0bba237 100644 --- a/internal/plugins/externals/ebpf/internal/l7flow/net_tracer_test.go +++ b/internal/plugins/externals/ebpf/internal/l7flow/net_tracer_test.go @@ -6,23 +6,15 @@ package l7flow import ( "math" "testing" - "unsafe" "github.com/stretchr/testify/assert" "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/plugins/externals/ebpf/internal/l7flow/comm" ) -func TestSizeof(t *testing.T) { - assert.Equal(t, 4224, int(unsafe.Sizeof(CL7Buffer{}))) - - // 此处不匹配,则会造成从 pool 取不到对象,导致 panic - assert.Equal(t, 4096, int(unsafe.Sizeof(CL7Buffer{}.payload))) -} - func TestSort(t *testing.T) { - fn := func(cases []uint32, expected []uint32, start ...uint32) { - rst := []uint32{} - netdata := netdata{} + fn := func(cases []uint64, expected []uint64, start ...uint64) { + rst := []uint64{} + netdata := dataQueue{} if len(start) > 0 { netdata.prvDataPos = start[0] } @@ -36,57 +28,59 @@ func TestSort(t *testing.T) { } t.Run("c1", func(t *testing.T) { - li := []uint32{1, 4, 3, 5, 2} - fn(li, []uint32{1, 2, 3, 4, 5}) + li := []uint64{1, 4, 3, 5, 2} + fn(li, []uint64{1, 2, 3, 4, 5}) }) t.Run("c2", func(t *testing.T) { - li := []uint32{5, 4, 3, 2, 1} - fn(li, []uint32{1, 2, 3, 4, 5}) + li := []uint64{5, 4, 3, 2, 1} + fn(li, []uint64{1, 2, 3, 4, 5}) }) t.Run("c3", func(t *testing.T) { - li := []uint32{1, 2, 3, 4, 5} - fn(li, []uint32{1, 2, 3, 4, 5}) + li := []uint64{1, 2, 3, 4, 5} + fn(li, []uint64{1, 2, 3, 4, 5}) }) t.Run("c4", func(t *testing.T) { - li := []uint32{1, 2} - fn(li, []uint32{1, 2}) + li := []uint64{1, 2} + fn(li, []uint64{1, 2}) }) t.Run("c5", func(t *testing.T) { - li := []uint32{2, 1} - fn(li, []uint32{1, 2}) + li := []uint64{2, 1} + fn(li, []uint64{1, 2}) }) t.Run("c5", func(t *testing.T) { - li := []uint32{2, 4, 3, 1} - fn(li, []uint32{1, 2, 3, 4}) + li := []uint64{2, 4, 3, 1} + fn(li, []uint64{1, 2, 3, 4}) }) t.Run("c6", func(t *testing.T) { - li := []uint32{1, 6, 2, 3, 4, 5, 7, 9, 10, 8, 11, 14, 13, 12} - fn(li, []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14}) + li := []uint64{1, 6, 2, 3, 4, 5, 7, 9, 10, 8, 11, 14, 13, 12} + fn(li, []uint64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14}) }) t.Run("c7", func(t *testing.T) { - var li []uint32 - for i := 2; i < 2+1024; i++ { - li = append(li, uint32(i)) + var li []uint64 + var rst []uint64 + for i := 2; i < 2+queueWindow; i++ { + rst = append(rst, uint64(i)) + li = append(li, uint64(i)) } - fn(li, li[:128]) + fn(li, rst) }) t.Run("c8", func(t *testing.T) { - var li, rst []uint32 - startPos := uint32(math.MaxUint32 - 100) - for i := startPos + 1; i < math.MaxUint32; i++ { + var li, rst []uint64 + startPos := uint64(math.MaxUint64 - 100) + for i := startPos + 1; i < math.MaxUint64; i++ { li = append(li, i) rst = append(rst, i) } - li = append(li, 1, math.MaxUint32, 2, 0, 3) - rst = append(rst, math.MaxUint32, 0, 1, 2, 3) + li = append(li, 1, math.MaxUint64, 2, 0, 3) + rst = append(rst, math.MaxUint64, 0, 1, 2, 3) fn(li, rst, startPos) }) } diff --git a/internal/plugins/externals/ebpf/internal/l7flow/pool.go b/internal/plugins/externals/ebpf/internal/l7flow/pool.go index f9fb9f00d2..cda54ebaa0 100644 --- a/internal/plugins/externals/ebpf/internal/l7flow/pool.go +++ b/internal/plugins/externals/ebpf/internal/l7flow/pool.go @@ -88,16 +88,14 @@ func putNetwrkData(data *comm.NetwrkData) { func resetNetwrkData(data *comm.NetwrkData) *comm.NetwrkData { data.Conn = comm.ConnectionInfo{} - data.ConnUniID = comm.ConnUniID{} - - data.ActSize = 0 + data.FnCallSize = 0 + data.CaptureSize = 0 data.TCPSeq = 0 - data.Thread[0] = 0 - data.Thread[1] = 0 + data.Thread = [2]int32{} data.TS = 0 - data.Fn = 0 data.TSTail = 0 data.Index = 0 + data.Fn = 0 data.Payload = data.Payload[:0] return data diff --git a/internal/plugins/externals/ebpf/internal/l7flow/protodec/amqp.go b/internal/plugins/externals/ebpf/internal/l7flow/protodec/amqp.go index fa2f28e39b..55e6091f8e 100644 --- a/internal/plugins/externals/ebpf/internal/l7flow/protodec/amqp.go +++ b/internal/plugins/externals/ebpf/internal/l7flow/protodec/amqp.go @@ -96,7 +96,7 @@ func (dec *amqpDecPipe) decode(txRx comm.NICDirection, data *comm.NetwrkData, inf.readyToExport = true inf.ts = ts - inf.reqBytes += data.ActSize + inf.reqBytes += data.FnCallSize if dec.direction == comm.DUnknown { switch txRx { //nolint:exhaustive @@ -196,7 +196,8 @@ func (dec *amqpDecPipe) decode(txRx comm.NICDirection, data *comm.NetwrkData, inf.ts = ts if dec.direction == comm.DIn { - inf.meta.InnerID = thrTr.Insert(dec.direction, data.Thread, data.TSTail) + inf.meta.InnerID = thrTr.Insert(dec.direction, int32(data.Conn.Pid), + data.Thread, data.TSTail) } inf.meta.ReqTCPSeq = data.TCPSeq @@ -232,16 +233,16 @@ func (dec *amqpDecPipe) decode(txRx comm.NICDirection, data *comm.NetwrkData, case comm.DIn: switch txRx { //nolint:exhaustive case comm.NICDIngress: - dec.inf.reqBytes += data.ActSize + dec.inf.reqBytes += data.FnCallSize case comm.NICDEgress: - dec.inf.respBytes += data.ActSize + dec.inf.respBytes += data.FnCallSize } case comm.DOut: switch txRx { //nolint:exhaustive case comm.NICDIngress: - dec.inf.respBytes += data.ActSize + dec.inf.respBytes += data.FnCallSize case comm.NICDEgress: - dec.inf.reqBytes += data.ActSize + dec.inf.reqBytes += data.FnCallSize } } diff --git a/internal/plugins/externals/ebpf/internal/l7flow/protodec/datadump_test.go b/internal/plugins/externals/ebpf/internal/l7flow/protodec/datadump_test.go index 82ec709e61..658040d43c 100644 --- a/internal/plugins/externals/ebpf/internal/l7flow/protodec/datadump_test.go +++ b/internal/plugins/externals/ebpf/internal/l7flow/protodec/datadump_test.go @@ -14,7 +14,7 @@ import ( ) type multiStream struct { - uniid comm.ConnUniID + conn comm.ConnectionInfo netdata []comm.NetwrkData } @@ -35,7 +35,7 @@ func TestJsonDump(t *testing.T) { json.Unmarshal(d[i], &n) var find bool for _, v := range cases { - if v.uniid == n.ConnUniID { + if v.conn == n.Conn { v.netdata = append(v.netdata, n) find = true break @@ -44,7 +44,7 @@ func TestJsonDump(t *testing.T) { if !find { cases = append(cases, &multiStream{ netdata: []comm.NetwrkData{n}, - uniid: n.ConnUniID, + conn: n.Conn, }) } } @@ -55,7 +55,7 @@ func TestJsonDump(t *testing.T) { t.Run(strconv.FormatInt(int64(i), 10), func(t *testing.T) { for _, data := range oneCase.netdata { if impl == nil { - _, impl, _ = MysqlProtoDetect(data.Payload, data.ActSize) + _, impl, _ = MysqlProtoDetect(data.Payload, data.CaptureSize) } if impl != nil { impl.Decode(comm.FnInOut(data.Fn), &data, 0, nil) diff --git a/internal/plugins/externals/ebpf/internal/l7flow/protodec/http.go b/internal/plugins/externals/ebpf/internal/l7flow/protodec/http.go index f30fa73266..f210fa4f64 100644 --- a/internal/plugins/externals/ebpf/internal/l7flow/protodec/http.go +++ b/internal/plugins/externals/ebpf/internal/l7flow/protodec/http.go @@ -206,7 +206,7 @@ func (dec *httpDecPipe) Decode(txRx comm.NICDirection, data *comm.NetwrkData, } if dec.direction == comm.DIn { - inf.meta.InnerID = thrTr.Insert(dec.direction, data.Thread, data.TSTail) + inf.meta.InnerID = thrTr.Insert(dec.direction, int32(data.Conn.Pid), data.Thread, data.TSTail) } inf.meta.ReqTCPSeq = data.TCPSeq @@ -243,16 +243,16 @@ func (dec *httpDecPipe) Decode(txRx comm.NICDirection, data *comm.NetwrkData, case comm.DIn: switch txRx { //nolint:exhaustive case comm.NICDIngress: - inf.reqBytes += data.ActSize + inf.reqBytes += data.CaptureSize case comm.NICDEgress: - inf.respBytes += data.ActSize + inf.respBytes += data.CaptureSize } case comm.DOut: switch txRx { //nolint:exhaustive case comm.NICDIngress: - inf.respBytes += data.ActSize + inf.respBytes += data.CaptureSize case comm.NICDEgress: - inf.reqBytes += data.ActSize + inf.reqBytes += data.CaptureSize } } diff --git a/internal/plugins/externals/ebpf/internal/l7flow/protodec/http2.go b/internal/plugins/externals/ebpf/internal/l7flow/protodec/http2.go index e8022890b0..f9c5599f5c 100644 --- a/internal/plugins/externals/ebpf/internal/l7flow/protodec/http2.go +++ b/internal/plugins/externals/ebpf/internal/l7flow/protodec/http2.go @@ -126,7 +126,7 @@ func (dec *h2DecPipe) Decode(txRx comm.NICDirection, data *comm.NetwrkData, } if dec.direction == comm.DIn { - elem.meta.InnerID = thrTr.Insert(dec.direction, data.Thread, data.TSTail) + elem.meta.InnerID = thrTr.Insert(dec.direction, int32(data.Conn.Pid), data.Thread, data.TSTail) } case l4log.H2HdrPath: diff --git a/internal/plugins/externals/ebpf/internal/l7flow/protodec/mysql.go b/internal/plugins/externals/ebpf/internal/l7flow/protodec/mysql.go index fbe251113a..d33b1e5cdb 100644 --- a/internal/plugins/externals/ebpf/internal/l7flow/protodec/mysql.go +++ b/internal/plugins/externals/ebpf/internal/l7flow/protodec/mysql.go @@ -436,7 +436,8 @@ func (dec *mysqlDecPipe) Decode(txRx comm.NICDirection, data *comm.NetwrkData, } if dec.direction == comm.DIn { - inf.meta.InnerID = thrTr.Insert(dec.direction, data.Thread, data.TSTail) + inf.meta.InnerID = thrTr.Insert(dec.direction, int32(data.Conn.Pid), + data.Thread, data.TSTail) } inf.meta.ReqTCPSeq = firstSeq @@ -476,16 +477,16 @@ func (dec *mysqlDecPipe) Decode(txRx comm.NICDirection, data *comm.NetwrkData, case comm.DIn: switch txRx { //nolint:exhaustive case comm.NICDIngress: - inf.reqBytes += data.ActSize + inf.reqBytes += data.CaptureSize case comm.NICDEgress: - inf.respBytes += data.ActSize + inf.respBytes += data.CaptureSize } case comm.DOut: switch txRx { //nolint:exhaustive case comm.NICDIngress: - inf.respBytes += data.ActSize + inf.respBytes += data.CaptureSize case comm.NICDEgress: - inf.reqBytes += data.ActSize + inf.reqBytes += data.CaptureSize } } diff --git a/internal/plugins/externals/ebpf/internal/l7flow/protodec/pcap_test.go b/internal/plugins/externals/ebpf/internal/l7flow/protodec/pcap_test.go index e04645b553..34da903622 100644 --- a/internal/plugins/externals/ebpf/internal/l7flow/protodec/pcap_test.go +++ b/internal/plugins/externals/ebpf/internal/l7flow/protodec/pcap_test.go @@ -73,7 +73,7 @@ func TestRedisProto(t *testing.T) { var impl ProtoDecPipe for _, c := range cases { netdata := c[1].(*comm.NetwrkData) - _, impl, _ = RedisProtoDetect(netdata.Payload, netdata.ActSize) + _, impl, _ = RedisProtoDetect(netdata.Payload, netdata.CaptureSize) if impl != nil { break } @@ -189,7 +189,7 @@ func TestMySQLProto(t *testing.T) { var impl ProtoDecPipe for _, c := range cases { netdata := c[1].(*comm.NetwrkData) - _, impl, _ = MysqlProtoDetect(netdata.Payload, netdata.ActSize) + _, impl, _ = MysqlProtoDetect(netdata.Payload, netdata.CaptureSize) if impl != nil { break } @@ -310,9 +310,9 @@ func (s *netStream) Get(localIP, foreignIP net.IP, localPort, foreignPort uint16 func wrapPacket(buf []byte, dir comm.NICDirection, tcpSeq, sPort, dPort uint32) *comm.NetwrkData { netdata := &comm.NetwrkData{ - ActSize: len(buf), - Payload: buf, - TCPSeq: tcpSeq, + CaptureSize: len(buf), + Payload: buf, + TCPSeq: tcpSeq, Conn: comm.ConnectionInfo{ Sport: sPort, Dport: dPort, diff --git a/internal/plugins/externals/ebpf/internal/l7flow/protodec/postgresql.go b/internal/plugins/externals/ebpf/internal/l7flow/protodec/postgresql.go index 9cd1595517..a4ce521ee4 100644 --- a/internal/plugins/externals/ebpf/internal/l7flow/protodec/postgresql.go +++ b/internal/plugins/externals/ebpf/internal/l7flow/protodec/postgresql.go @@ -281,7 +281,8 @@ func (dec *pgsqlDecPipe) Decode(txRx comm.NICDirection, data *comm.NetwrkData, } if dec.direction == comm.DIn { - inf.meta.InnerID = thrTr.Insert(dec.direction, data.Thread, data.TSTail) + inf.meta.InnerID = thrTr.Insert(dec.direction, int32(data.Conn.Pid), + data.Thread, data.TSTail) } inf.ts = ts @@ -304,16 +305,16 @@ func (dec *pgsqlDecPipe) Decode(txRx comm.NICDirection, data *comm.NetwrkData, case comm.DIn: switch txRx { //nolint:exhaustive case comm.NICDIngress: - inf.reqBytes += data.ActSize + inf.reqBytes += data.FnCallSize case comm.NICDEgress: - inf.respBytes += data.ActSize + inf.respBytes += data.FnCallSize } case comm.DOut: switch txRx { //nolint:exhaustive case comm.NICDIngress: - inf.respBytes += data.ActSize + inf.respBytes += data.FnCallSize case comm.NICDEgress: - inf.reqBytes += data.ActSize + inf.reqBytes += data.FnCallSize } } diff --git a/internal/plugins/externals/ebpf/internal/l7flow/protodec/proto.go b/internal/plugins/externals/ebpf/internal/l7flow/protodec/proto.go index feb751b33a..2bf4ccd823 100644 --- a/internal/plugins/externals/ebpf/internal/l7flow/protodec/proto.go +++ b/internal/plugins/externals/ebpf/internal/l7flow/protodec/proto.go @@ -152,8 +152,8 @@ type ProtoData struct { } type threadTrace interface { - Insert(d comm.Direcion, thrID [2]int32, ts0_1 uint64) (id int64) - GetInnerID(thrID [2]int32, ts uint64) int64 + Insert(d comm.Direcion, pid int32, thrID [2]int32, ts0_1 uint64) (id int64) + GetInnerID(pid int32, thrID [2]int32, ts uint64) int64 } type ProtoDecPipe interface { diff --git a/internal/plugins/externals/ebpf/internal/l7flow/protodec/redis.go b/internal/plugins/externals/ebpf/internal/l7flow/protodec/redis.go index ca23f52a4c..b5d5637948 100644 --- a/internal/plugins/externals/ebpf/internal/l7flow/protodec/redis.go +++ b/internal/plugins/externals/ebpf/internal/l7flow/protodec/redis.go @@ -107,7 +107,8 @@ func (dec *redisDecPipe) Decode(txRx comm.NICDirection, data *comm.NetwrkData, } if dec.direction == comm.DIn { - inf.meta.InnerID = thrTr.Insert(dec.direction, data.Thread, data.TSTail) + inf.meta.InnerID = thrTr.Insert(dec.direction, int32(data.Conn.Pid), + data.Thread, data.TSTail) } inf.meta.ReqTCPSeq = data.TCPSeq @@ -140,16 +141,16 @@ func (dec *redisDecPipe) Decode(txRx comm.NICDirection, data *comm.NetwrkData, case comm.DIn: switch txRx { //nolint:exhaustive case comm.NICDIngress: - inf.reqBytes += data.ActSize + inf.reqBytes += data.CaptureSize case comm.NICDEgress: - inf.respBytes += data.ActSize + inf.respBytes += data.CaptureSize } case comm.DOut: switch txRx { //nolint:exhaustive case comm.NICDIngress: - inf.respBytes += data.ActSize + inf.respBytes += data.CaptureSize case comm.NICDEgress: - inf.reqBytes += data.ActSize + inf.reqBytes += data.CaptureSize } } diff --git a/internal/plugins/externals/ebpf/internal/l7flow/queue.go b/internal/plugins/externals/ebpf/internal/l7flow/queue.go new file mode 100644 index 0000000000..bdc3d6290b --- /dev/null +++ b/internal/plugins/externals/ebpf/internal/l7flow/queue.go @@ -0,0 +1,82 @@ +//go:build linux +// +build linux + +package l7flow + +import ( + "math" + + "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/plugins/externals/ebpf/internal/l7flow/comm" +) + +const ( + queueWindow = 36 + queuePopCount = 18 + queueWindowX6 = queueWindow * 6 +) + +func idxLess(l, r uint64) bool { + // 可能发生回绕现象,预留窗口应与 buffer 长度相近 + if l > math.MaxUint64-(queueWindowX6) && r <= queueWindowX6 { + return true + } + + return l < r +} + +type dataQueue struct { + li []*comm.NetwrkData + // 从 1 开始索引,如果值为 0,视为发生翻转 + prvDataPos uint64 +} + +func (q *dataQueue) insertSort(data *comm.NetwrkData) { + for i := 0; i < len(q.li); i++ { + if idxLess(data.Index, q.li[i].Index) { + q.li = append(q.li, nil) + copy(q.li[i+1:], q.li[i:]) + q.li[i] = data + return + } + } + + q.li = append(q.li, data) +} + +func (q *dataQueue) Queue(data *comm.NetwrkData) []*comm.NetwrkData { + var val []*comm.NetwrkData + if data == nil { + return val + } + + q.insertSort(data) + + if len(q.li) >= queueWindow { + x := queuePopCount + val = append(val, q.li[:x]...) + + copy(q.li, q.li[x:]) + q.li = q.li[:len(q.li)-x] + + q.prvDataPos = val[x-1].Index + } + + var nxt int + for i := 0; i < len(q.li); i++ { + if q.li[i].Index == q.prvDataPos+1 { + val = append(val, q.li[i]) + q.prvDataPos = q.li[i].Index + nxt = i + 1 + } + } + + if nxt > 0 { + if nxt == len(q.li) { + q.li = q.li[:0] + } else { + copy(q.li, q.li[nxt:]) + q.li = q.li[:len(q.li)-nxt] + } + } + return val +} diff --git a/internal/plugins/externals/ebpf/internal/l7flow/setaffinity.go b/internal/plugins/externals/ebpf/internal/l7flow/setaffinity.go new file mode 100644 index 0000000000..8e26034f53 --- /dev/null +++ b/internal/plugins/externals/ebpf/internal/l7flow/setaffinity.go @@ -0,0 +1,51 @@ +//go:build linux +// +build linux + +package l7flow + +import ( + "context" + "fmt" + "runtime" + "time" + + "golang.org/x/sys/unix" +) + +func SetAffinity(cpuID int) error { + var newMask unix.CPUSet + newMask.Set(cpuID) + + err := unix.SchedSetaffinity(0, &newMask) + if err != nil { + return fmt.Errorf("set affinity error: %w", err) + } + return nil +} + +func newKpFlushTrigger(ctx context.Context) { + for i := 0; i < runtime.NumCPU(); i++ { + go func(i int) { + runtime.LockOSThread() + defer runtime.UnlockOSThread() + + if err := SetAffinity(i); err != nil { + log.Errorf("set affinity error: %s, cpu id: %d", err.Error(), i) + } else { + log.Infof("set affinity, thread id: %d, cpu id: %d", unix.Gettid(), i) + } + + ticker := time.NewTicker(time.Second * 5) + + cpuSet := unix.CPUSet{} + for { + select { + case <-ticker.C: + _ = unix.SchedGetaffinity(0, &cpuSet) + case <-ctx.Done(): + return + } + } + }(i) + } +}