diff --git a/internal/export/doc/en/inputs/ebpf.md b/internal/export/doc/en/inputs/ebpf.md index 4bdd7f71b9..e942715775 100644 --- a/internal/export/doc/en/inputs/ebpf.md +++ b/internal/export/doc/en/inputs/ebpf.md @@ -123,6 +123,11 @@ Configuration items: - Environment variable: `ENV_INPUT_EBPF_L7NET_ENABLED` - Example: `httpflow` +- `interval` + - Description: Set the sampling time interval + - Environment variable: `ENV_INPUT_EBPF_INTERVAL` + - Example: `1m30s` + - `ipv6_disabled` - Description: Whether the system does not support IPv6 - Environment variable: `ENV_INPUT_EBPF_IPV6_DISABLED` diff --git a/internal/export/doc/zh/inputs/ebpf.md b/internal/export/doc/zh/inputs/ebpf.md index 7a0322a0ae..ad5691676b 100644 --- a/internal/export/doc/zh/inputs/ebpf.md +++ b/internal/export/doc/zh/inputs/ebpf.md @@ -120,6 +120,11 @@ setenforce 0 - 环境变量:`ENV_INPUT_EBPF_L7NET_ENABLED` - 示例:`httpflow` +- `interval` + - 描述:设置采样时间间隔 + - 环境变量:`ENV_INPUT_EBPF_INTERVAL` + - 示例:`1m30s` + - `ipv6_disabled` - 描述:系统是否不支持 IPv6 - 环境变量:`ENV_INPUT_EBPF_IPV6_DISABLED` diff --git a/internal/plugins/externals/ebpf/internal/c/apiflow/apiflow.c b/internal/plugins/externals/ebpf/internal/c/apiflow/apiflow.c index c020ba0561..7808730d12 100644 --- a/internal/plugins/externals/ebpf/internal/c/apiflow/apiflow.c +++ b/internal/plugins/externals/ebpf/internal/c/apiflow/apiflow.c @@ -98,6 +98,8 @@ FN_KPROBE(tcp_close) return 0; } + clean_protocol_filter(sk); + net_data_t *dst = get_net_data_percpu(); if (dst == NULL) { @@ -123,8 +125,6 @@ FN_KPROBE(tcp_close) try_upload_net_events(ctx, dst); - clean_protocol_filter(pid_tgid, sk); - return 0; } @@ -166,18 +166,17 @@ FN_UPROBE(SSL_shutdown) FN_KPROBE(sched_getaffinity) { __u64 cpu = bpf_get_smp_processor_id(); - __s32 index = 0; - network_events_t *events = bpf_map_lookup_elem(&mp_network_events, &index); + network_events_t *events = get_net_events(); if (events == NULL) { return 0; } - if (events->pos.num > 0) + if (events->rec.num > 0) { 
bpf_perf_event_output(ctx, &mp_upload_netwrk_events, cpu, events, sizeof(network_events_t)); - events->pos.len = 0; - events->pos.num = 0; + events->rec.bytes = 0; + events->rec.num = 0; } return 0; diff --git a/internal/plugins/externals/ebpf/internal/c/apiflow/bpfmap_l7.h b/internal/plugins/externals/ebpf/internal/c/apiflow/bpfmap_l7.h index 34d649cc34..8709364fdf 100644 --- a/internal/plugins/externals/ebpf/internal/c/apiflow/bpfmap_l7.h +++ b/internal/plugins/externals/ebpf/internal/c/apiflow/bpfmap_l7.h @@ -11,32 +11,39 @@ // ------------------------------------------------------ // ---------------------- BPF MAP ----------------------- -BPF_HASH_MAP(mp_syscall_rw_arg, __u64, syscall_rw_arg_t, 1024) -BPF_HASH_MAP(mp_syscall_rw_v_arg, __u64, syscall_rw_v_arg_t, 1024) +BPF_HASH_MAP(mp_syscall_rw_arg, __u64, syscall_rw_arg_t, 2048) +BPF_HASH_MAP(mp_syscall_rw_v_arg, __u64, syscall_rw_v_arg_t, 2048) -BPF_HASH_MAP(mp_sk_inf, void *, sk_inf_t, 65535) +// Limit the number of connections tracked to 40k conns +BPF_HASH_MAP(mp_sk_inf, void *, sk_inf_t, 40960) BPF_PERCPU_ARRAY(mp_uni_id_per_cpu, id_generator_t) -BPF_PERCPU_ARRAY(mp_network_data, net_data_t) +BPF_PERCPU_ARRAY(mp_network_data_per_cpu, net_data_t) -BPF_PERCPU_ARRAY(mp_network_events, network_events_t) +BPF_PERCPU_ARRAY(mp_network_events_per_cpu, network_events_t) + +static __always_inline network_events_t *get_net_events() +{ + __s32 index = 0; + network_events_t *events = bpf_map_lookup_elem(&mp_network_events_per_cpu, &index); + return events; +} BPF_PERF_EVENT_MAP(mp_upload_netwrk_events) -BPF_HASH_MAP(bpfmap_ssl_read_args, __u64, ssl_read_args_t, 1024) +BPF_HASH_MAP(bpfmap_ssl_read_args, __u64, ssl_read_args_t, 2048) -BPF_HASH_MAP(bpfmap_bio_new_socket_args, __u64, __u32, 1024) // k: pid_tgid v: sockfd +BPF_HASH_MAP(bpfmap_bio_new_socket_args, __u64, __u32, 2048) // k: pid_tgid v: sockfd -BPF_HASH_MAP(bpfmap_ssl_ctx_sockfd, void *, __u64, 1024) +BPF_HASH_MAP(bpfmap_ssl_ctx_sockfd, void *, __u64, 2048) 
-BPF_HASH_MAP(bpfmap_ssl_bio_fd, void *, __u32, 1024) +BPF_HASH_MAP(bpfmap_ssl_bio_fd, void *, __u32, 2048) -BPF_HASH_MAP(bpfmap_ssl_pidtgid_ctx, __u64, void *, 1024) +BPF_HASH_MAP(bpfmap_ssl_pidtgid_ctx, __u64, void *, 2048) -BPF_HASH_MAP(bpfmap_syscall_sendfile_arg, __u64, syscall_sendfile_arg_t, 1024) +BPF_HASH_MAP(bpfmap_syscall_sendfile_arg, __u64, syscall_sendfile_arg_t, 2048) -// TODO: use it -BPF_HASH_MAP(mp_protocol_filter, pid_skptr_t, __u8, 65536) +BPF_HASH_MAP(mp_protocol_filter, void *, __u8, 65536) #endif // !__BPFMAP_L7_H diff --git a/internal/plugins/externals/ebpf/internal/c/apiflow/l7_stats.h b/internal/plugins/externals/ebpf/internal/c/apiflow/l7_stats.h index aedc101df5..53f36e5315 100644 --- a/internal/plugins/externals/ebpf/internal/c/apiflow/l7_stats.h +++ b/internal/plugins/externals/ebpf/internal/c/apiflow/l7_stats.h @@ -74,6 +74,7 @@ typedef struct sk_inf { id_generator_t uni_id; __u64 index; + __u64 skptr; conn_inf_t conn; } sk_inf_t; @@ -105,67 +106,32 @@ typedef struct netwrk_data typedef struct event_rec { __u32 num; - __u32 len; + __u32 bytes; } event_rec_t; enum { - L7_EVENT_SIZE = (L7_BUFFER_SIZE * 2 - sizeof(event_rec_t)), + L7_EVENT_SIZE = (L7_BUFFER_SIZE * 4 - sizeof(event_rec_t)), #define L7_EVENT_SIZE L7_EVENT_SIZE }; typedef struct network_events { - event_rec_t pos; + event_rec_t rec; __u8 payload[L7_EVENT_SIZE]; } network_events_t; -typedef enum -{ - BUF_DIV8 = L7_BUFFER_SIZE / 8, -#define BUF_DIV8 BUF_DIV8 - - BUF_DIV4 = L7_BUFFER_SIZE / 4, -#define BUF_DIV4 BUF_DIV4 - - BUF_DIV2 = L7_BUFFER_SIZE / 2, -#define BUF_DIV2 BUF_DIV2 - - BUF_DIV1 = L7_BUFFER_SIZE, -#define BUF_DIV1 BUF_DIV1 -} buf_div_t; - typedef struct net_event_comm { - __u32 idx; - __u32 len; - + event_rec_t rec; netdata_meta_t meta; } net_event_comm_t; typedef struct { net_event_comm_t event_comm; - __u8 payload[BUF_DIV8]; -} net_event_div8_t; - -typedef struct -{ - net_event_comm_t event_comm; - __u8 payload[BUF_DIV4]; -} net_event_div4_t; - -typedef struct 
-{ - net_event_comm_t event_comm; - __u8 payload[BUF_DIV2]; -} net_event_div2_t; - -typedef struct -{ - net_event_comm_t event_comm; - __u8 payload[BUF_DIV1]; -} net_event_div1_t; + __u8 payload[L7_BUFFER_SIZE]; +} net_event_t; typedef struct ssl_read_args { diff --git a/internal/plugins/externals/ebpf/internal/c/apiflow/l7_utils.h b/internal/plugins/externals/ebpf/internal/c/apiflow/l7_utils.h index 6e9b138c64..c5eb6b01af 100644 --- a/internal/plugins/externals/ebpf/internal/c/apiflow/l7_utils.h +++ b/internal/plugins/externals/ebpf/internal/c/apiflow/l7_utils.h @@ -15,7 +15,7 @@ #include "../netflow/netflow_utils.h" #include "../conntrack/maps.h" -#include "../process_sched/goid2tid.h" +#include "../process_sched/goidtid.h" #include "bpfmap_l7.h" #include "bpf_helpers.h" @@ -89,10 +89,16 @@ static __always_inline __u8 get_sk_inf(void *sk, sk_inf_t *dst, __u8 force) return 0; } + i.skptr = (u64)sk; // May fail due to exceeding the upper limit of the number of elements bpf_map_update_elem(&mp_sk_inf, &sk, &i, BPF_NOEXIST); inf = (sk_inf_t *)bpf_map_lookup_elem(&mp_sk_inf, &sk); + if (inf != NULL) + { + __u8 val = 0; + bpf_map_update_elem(&mp_protocol_filter, &sk, &val, BPF_NOEXIST); + } } } @@ -140,27 +146,26 @@ static __always_inline void del_sk_inf(void *sk) } \ } while (0) -static __always_inline bool proto_filter(__u64 pid_tgid, void *sock_ptr) +static __always_inline bool net_filtered(__u64 pid_tgid, void *sock_ptr) { - pid_skptr_t key = { - .pid = pid_tgid >> 32, - .sk_ptr = (__u64)sock_ptr}; + u32 pid = pid_tgid >> 32; + if (need_filter_proc(&pid)) + { + return false; + } - __u8 *val = bpf_map_lookup_elem(&mp_protocol_filter, &key); + __u8 *val = bpf_map_lookup_elem(&mp_protocol_filter, &sock_ptr); if (val != NULL && *val == 1) { - return true; + return false; } - return false; + + return true; } -static __always_inline void clean_protocol_filter(__u64 pid_tgid, void *sock_ptr) +static __always_inline void clean_protocol_filter(void *sock_ptr) { - 
pid_skptr_t key = { - .pid = pid_tgid >> 32, - .sk_ptr = (__u64)sock_ptr}; - - bpf_map_delete_elem(&mp_protocol_filter, &key); + bpf_map_delete_elem(&mp_protocol_filter, &sock_ptr); } // ret 0: r, 1: w @@ -275,6 +280,7 @@ static __always_inline void read_netwrk_data(net_data_t *dst, __u8 *buf, __s64 l { if (len_or_errno <= 0) { + dst->meta.capture_size = 0; return; } @@ -407,7 +413,7 @@ static __always_inline bool get_sk_with_typ(struct socket *skt, struct sock **sk static __always_inline net_data_t *get_net_data_percpu() { __s32 index = 0; - net_data_t *data = bpf_map_lookup_elem(&mp_network_data, &index); + net_data_t *data = bpf_map_lookup_elem(&mp_network_data_per_cpu, &index); if (data == NULL) { return NULL; @@ -418,113 +424,55 @@ static __always_inline net_data_t *get_net_data_percpu() return data; } -static __always_inline int buf_copy_thr_bprobe(void *dst, const int max_size_base2, int size, void *src) +static __always_inline void try_upload_net_events(void *ctx, net_data_t *data) { - if (size <= 0) - { - return 0; - } - if (size >= max_size_base2) + network_events_t *events = get_net_events(); + if (events == NULL) { - size = max_size_base2; - } - else - { - size &= (max_size_base2 - 1); + return; } - bpf_probe_read(dst, size, src); - return size; -} - -#define write_net_event(ctx, cpu, data, typ) \ - do \ - { \ - __s32 index = 0; \ - network_events_t *events = bpf_map_lookup_elem(&mp_network_events, &index); \ - if (events == NULL) \ - { \ - return; \ - } \ - \ - if (sizeof(typ) > L7_EVENT_SIZE - events->pos.len) \ - { \ - bpf_perf_event_output(ctx, &mp_upload_netwrk_events, cpu, events, sizeof(network_events_t)); \ - events->pos.len = 0; \ - events->pos.num = 0; \ - } \ - \ - if (sizeof(typ) + events->pos.len <= L7_EVENT_SIZE) \ - { \ - typ *elem = (typ *)((__u8 *)(events->payload) + events->pos.len); \ - \ - events->pos.num += 1; \ - elem->event_comm.idx = events->pos.num; \ - \ - bpf_probe_read(&elem->event_comm.meta, sizeof(data->meta), &data->meta); 
\ - events->pos.len += sizeof(elem->event_comm); \ - \ - int capture_size = data->meta.capture_size; \ - if (capture_size < 0) \ - { \ - capture_size = 0; \ - } \ - if (capture_size > 0) \ - { \ - capture_size = buf_copy_thr_bprobe(&elem->payload, sizeof(elem->payload), capture_size, &data->payload); \ - } \ - \ - events->pos.len += capture_size; \ - elem->event_comm.len = capture_size; \ - } \ - } while (0); - -static __always_inline void try_upload_net_events(void *ctx, net_data_t *data) -{ __u64 cpu = bpf_get_smp_processor_id(); - s32 capture_size = 0; - capture_size = data->meta.capture_size; - - if (capture_size <= BUF_DIV8) + int capture_size = data->meta.capture_size; + if (capture_size < 0) { - write_net_event(ctx, cpu, data, net_event_div8_t); + capture_size = 0; } - else if (capture_size <= BUF_DIV4) - { - write_net_event(ctx, cpu, data, net_event_div4_t); - } - else if (capture_size <= BUF_DIV2) - { - write_net_event(ctx, cpu, data, net_event_div2_t); - } - else if (capture_size <= BUF_DIV1) - { - write_net_event(ctx, cpu, data, net_event_div1_t); - } - else + + if (sizeof(net_event_t) + events->rec.bytes > L7_EVENT_SIZE) { -#ifdef __DKE_DEBUG_RW_V__ - bpf_printk("act_size %d\n", capture_size); -#endif - // something wrong + bpf_perf_event_output(ctx, &mp_upload_netwrk_events, cpu, events, sizeof(network_events_t)); + events->rec.bytes = 0; + events->rec.num = 0; } -} -static __always_inline void flush_net_events(void *ctx) -{ - __s32 index = 0; - network_events_t *events = bpf_map_lookup_elem(&mp_network_events, &index); - if (events == NULL || events->pos.num == 0) + if (sizeof(net_event_t) + events->rec.bytes <= L7_EVENT_SIZE) { - return; - } + net_event_t *net_event = (net_event_t *)((__u8 *)(events->payload) + events->rec.bytes); - __u64 cpu = bpf_get_smp_processor_id(); + // update events header + events->rec.bytes += sizeof(net_event_comm_t); + events->rec.num += 1; + // update event rec header + net_event->event_comm.rec.num = events->rec.num; + 
+ // copy meta data from data to event meta + bpf_probe_read(&net_event->event_comm.meta, sizeof(data->meta), &data->meta); - bpf_perf_event_output(ctx, &mp_network_events, cpu, events, sizeof(network_events_t)); - events->pos.len = 0; - events->pos.num = 0; + // copy payload from data to event payload + if (capture_size > 0) + { + bpf_probe_read(&net_event->payload, sizeof(data->payload), &data->payload); + } + else + { + capture_size = 0; + } + + events->rec.bytes += capture_size; + net_event->event_comm.rec.bytes = capture_size; + } } static __always_inline bool put_rw_args(tp_syscall_rw_args_t *ctx, void *bpf_map, enum MSG_RW rw) @@ -557,7 +505,7 @@ static __always_inline bool put_rw_args(tp_syscall_rw_args_t *ctx, void *bpf_map return false; } - if (proto_filter(pid_tgid, sk)) + if (!net_filtered(pid_tgid, sk)) { return false; } @@ -625,7 +573,7 @@ static __always_inline bool put_rw_v_args(tp_syscall_rw_v_args_t *ctx, void *bpf return false; } - if (proto_filter(pid_tgid, sk)) + if (!net_filtered(pid_tgid, sk)) { return false; } diff --git a/internal/plugins/externals/ebpf/internal/c/common/bpf_helpers.h b/internal/plugins/externals/ebpf/internal/c/common/bpf_helpers.h index 2d141051a3..bba48fc866 100644 --- a/internal/plugins/externals/ebpf/internal/c/common/bpf_helpers.h +++ b/internal/plugins/externals/ebpf/internal/c/common/bpf_helpers.h @@ -109,6 +109,13 @@ struct bpf_map_def .value_size = sizeof(value_type), \ .max_entries = 1}; +#define BPF_ARRAY_MAP(map_name, key_type, value_type, map_max_entries) \ + struct bpf_map_def SEC("maps/" #map_name) map_name = { \ + .type = BPF_MAP_TYPE_ARRAY, \ + .key_size = sizeof(key_type), \ + .value_size = sizeof(value_type), \ + .max_entries = map_max_entries}; + #define BPF_HASH_MAP(map_name, key_type, value_type, map_max_entries) \ struct bpf_map_def SEC("maps/" #map_name) map_name = { \ .type = BPF_MAP_TYPE_HASH, \ diff --git a/internal/plugins/externals/ebpf/internal/c/process_sched/bpfmap.h 
b/internal/plugins/externals/ebpf/internal/c/process_sched/bpfmap.h index 6848a2c9f1..b6166bae10 100644 --- a/internal/plugins/externals/ebpf/internal/c/process_sched/bpfmap.h +++ b/internal/plugins/externals/ebpf/internal/c/process_sched/bpfmap.h @@ -6,6 +6,6 @@ BPF_PERF_EVENT_MAP(process_sched_event); -BPF_HASH_MAP(bmap_procinject, u32, proc_inject_t, 4096); +BPF_HASH_MAP(bmap_procinject, u32, proc_inject_t, 12800); #endif diff --git a/internal/plugins/externals/ebpf/internal/c/process_sched/goid2tid.h b/internal/plugins/externals/ebpf/internal/c/process_sched/goid2tid.h deleted file mode 100644 index 83d9c974a5..0000000000 --- a/internal/plugins/externals/ebpf/internal/c/process_sched/goid2tid.h +++ /dev/null @@ -1,9 +0,0 @@ -#ifndef __GOID_TO_TID_H__ -#define __GOID_TO_TID_H__ - -#include "bpf_helpers.h" - -// pid_tgid -> goid -BPF_HASH_MAP(bmap_tid2goid, u64, u64, 12800); - -#endif diff --git a/internal/plugins/externals/ebpf/internal/c/process_sched/goidtid.h b/internal/plugins/externals/ebpf/internal/c/process_sched/goidtid.h new file mode 100644 index 0000000000..b013993015 --- /dev/null +++ b/internal/plugins/externals/ebpf/internal/c/process_sched/goidtid.h @@ -0,0 +1,30 @@ +#ifndef __GOID_TID_H__ +#define __GOID_TID_H__ + +#include +#include "bpf_helpers.h" + +#include "process_sched.h" + +// pid_tgid -> goid +BPF_HASH_MAP(bmap_tid2goid, u64, u64, 12800); + +BPF_HASH_MAP(bmap_proc_filter, u32, proc_filter_info_t, 12800); + +static __always_inline bool need_filter_proc(u32 *pid) +{ + proc_filter_info_t *inf = NULL; + inf = bpf_map_lookup_elem(&bmap_proc_filter, pid); + if (inf == NULL) + { + return false; + } + + if (inf->disable) + { + return true; + } + return false; +} + +#endif diff --git a/internal/plugins/externals/ebpf/internal/c/process_sched/process_sched.c b/internal/plugins/externals/ebpf/internal/c/process_sched/process_sched.c index 8b915b3e13..0c7fecda1b 100644 --- a/internal/plugins/externals/ebpf/internal/c/process_sched/process_sched.c 
+++ b/internal/plugins/externals/ebpf/internal/c/process_sched/process_sched.c @@ -3,16 +3,17 @@ #include "bpf_helpers.h" #include "process_sched.h" #include "bpfmap.h" -#include "goid2tid.h" +#include "goidtid.h" SEC("tracepoint/sched/sched_process_fork") int tracepoint__sched_process_fork(struct tp_sched_process_fork_args *ctx) { + // syscall: clone, clone3, fork, vfork ... __u64 pid_tgid = bpf_get_current_pid_tgid(); rec_process_sched_status_t rec = {0}; rec.status = REC_SCHED_FORK; - rec.prv_pid = ctx->parent_pid; + rec.prv_pid = pid_tgid >> 32; rec.nxt_pid = ctx->child_pid; bpf_probe_read(&rec.comm, sizeof(rec.comm), &ctx->child_comm); @@ -40,7 +41,7 @@ int tracepoint__sched_process_exec(struct tp_sched_process_exec_args *ctx) // set status rec.prv_pid = pid_tgid >> 32; - rec.nxt_pid = (__u32)pid_tgid; + rec.nxt_pid = rec.prv_pid; rec.status = REC_SCHED_EXEC; bpf_get_current_comm(&rec.comm, KERNEL_TASK_COMM_LEN); @@ -48,12 +49,6 @@ int tracepoint__sched_process_exec(struct tp_sched_process_exec_args *ctx) bpf_perf_event_output(ctx, &process_sched_event, cpu, &rec, sizeof(rec_process_sched_status_t)); - // __u64 pid = pid_tgid >> 32; - // if (pid == 157374 || pid == 191010) - // { - // bpf_printk("exec comm %s, pid %d tid %d\n", rec.comm, rec.prv_pid, rec.nxt_pid); - // }; - return 0; } @@ -64,27 +59,22 @@ int tracepoint__sched_process_exit(struct tp_sched_process_exit_args *ctx) bpf_map_delete_elem(&bmap_tid2goid, &pid_tgid); - if ((pid_tgid >> 32) == (__u32)pid_tgid) // process(all threads) exit + int pid = pid_tgid >> 32; + int tid = (u32)pid_tgid; + if (pid == tid) // process(all threads) exit { rec_process_sched_status_t rec = {0}; rec.status = REC_SCHED_EXIT; - rec.prv_pid = pid_tgid >> 32; - rec.nxt_pid = (__u32)pid_tgid; + rec.prv_pid = pid; + rec.nxt_pid = tid; - __u32 pid = pid_tgid >> 32; bpf_map_delete_elem(&bmap_procinject, &pid); + bpf_map_delete_elem(&bmap_proc_filter, &pid); __u64 cpu = bpf_get_smp_processor_id(); bpf_perf_event_output(ctx, 
&process_sched_event, cpu, &rec, sizeof(rec_process_sched_status_t)); } - - // __u64 pid = pid_tgid >> 32; - // if (pid == 157374 || pid == 191010) - // { - // bpf_printk("exit comm %s, pid %d tid %d\n", ctx->comm, pid_tgid >> 32, (__u32)pid_tgid); - // }; - return 0; } @@ -127,11 +117,6 @@ int uprobe__go_runtime_execute(struct pt_regs *ctx) bpf_map_update_elem(&bmap_tid2goid, &pid_tgid, &goid, BPF_ANY); - // if (pid == 157374 || pid == 191010) - // { - // bpf_printk("go exec pid %d tid %d goid %d\n", pid_tgid >> 32, (__u32)pid_tgid, goid); - // }; - return 0; } diff --git a/internal/plugins/externals/ebpf/internal/c/process_sched/process_sched.h b/internal/plugins/externals/ebpf/internal/c/process_sched/process_sched.h index c10fd7bf87..fe0f2a2d0c 100644 --- a/internal/plugins/externals/ebpf/internal/c/process_sched/process_sched.h +++ b/internal/plugins/externals/ebpf/internal/c/process_sched/process_sched.h @@ -21,7 +21,6 @@ typedef struct rec_process_sched_status __s32 prv_pid; // parent_pid or tgid or old_pid __s32 nxt_pid; __s32 __pad; - // __s32 cur_tgid; __u8 comm[KERNEL_TASK_COMM_LEN]; } rec_process_sched_status_t; @@ -65,4 +64,14 @@ typedef struct proc_inject __u64 offset_go_runtime_g_goid; __u64 go_use_register; } proc_inject_t; + + +typedef struct proc_filter_info +{ + __u8 disable; + __u8 pad0; + __u16 pad1; + __u32 pad2; +} proc_filter_info_t; + #endif \ No newline at end of file diff --git a/internal/plugins/externals/ebpf/internal/cmd/run/run.go b/internal/plugins/externals/ebpf/internal/cmd/run/run.go index 3ac83a9fdc..8631166517 100644 --- a/internal/plugins/externals/ebpf/internal/cmd/run/run.go +++ b/internal/plugins/externals/ebpf/internal/cmd/run/run.go @@ -30,7 +30,6 @@ import ( "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/plugins/externals/ebpf/internal/netflow" dkoffset "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/plugins/externals/ebpf/internal/offset" 
"gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/plugins/externals/ebpf/internal/sysmonitor" - "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/plugins/externals/ebpf/internal/tracing" "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/plugins/externals/ebpf/pkg/dumpstd" // nolint:gosec @@ -270,6 +269,7 @@ func mergeOption(cfgFilePath *string, opt *Flag) (*Flag, error) { //nolint:funlen func runCmd(cfgFile *string, fl *Flag) error { + _ = cfgFile fl, gTags, err := parseFlags(fl) if err != nil { return err @@ -302,6 +302,17 @@ func runCmd(cfgFile *string, fl *Flag) error { initResLitmiter(fl, signaIterrrupt) interval := time.Minute + if v, err := time.ParseDuration(fl.Interval); err == nil { + if v > interval { + interval = v + } else { + log.Warnf("%s is less than the minimum interval of 60s", v) + } + } else { + log.Warnf("parse interval failed: %s", err.Error()) + } + + log.Infof("set the time interval to %s", interval) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -320,43 +331,45 @@ func runCmd(cfgFile *string, fl *Flag) error { } if enableEbpfNet { - traceAll := fl.EBPFTrace.TraceAllProc + _ = fl.EBPFTrace.TraceAllProc + + var envWhitelist []string + var envBlacklist []string - envSet := map[string]bool{} for _, e := range fl.EBPFTrace.TraceEnvList { e = strings.TrimSpace(e) if e != "" { - envSet[e] = true + envWhitelist = append(envWhitelist, e) } } for _, e := range fl.EBPFTrace.TraceEnvBlacklist { e = strings.TrimSpace(e) if e != "" { - envSet[e] = false + envBlacklist = append(envBlacklist, e) } } - processSet := map[string]bool{} - processSet["datakit-ebpf"] = false - processSet["datakit"] = false + nameBlacklist := []string{"datakit-ebpf", "datakit"} + var nameWhitelist []string for _, p := range fl.EBPFTrace.TraceNameList { p = strings.TrimSpace(p) if p != "" { - processSet[p] = true + nameWhitelist = append(nameWhitelist, p) } } for _, p := range fl.EBPFTrace.TraceNameBlacklist { p = strings.TrimSpace(p) if p 
!= "" { - processSet[p] = false + nameBlacklist = append(nameBlacklist, p) } } - var enableTraceFilter bool - enableProtos := map[protodec.L7Protocol]struct{}{} + enableProtos := map[protodec.L7Protocol]struct{}{ + protodec.ProtoHTTP: {}, + } if enableTrace && fl.EBPFTrace.TraceServer != "" { protoLi := netproto(fl.EBPFTrace.TraceProtoList, fl.EBPFTrace.TraceProtoBlacklist) var protoStr []string @@ -364,19 +377,25 @@ func runCmd(cfgFile *string, fl *Flag) error { enableProtos[p] = struct{}{} protoStr = append(protoStr, p.StringLower()) } - enableTraceFilter = true - log.Info("trace function enabled") + log.Info("trace feature enabled") log.Info("trace protocols: ", strings.Join(protoStr, ",")) - log.Info("trace all processes: ", traceAll) - log.Info("service name environment variables: ", strings.Join(envAssignAllowed, ",")) - log.Info("enable trace environment variables: ", envSet) - log.Info("process name set: ", processSet) } - log.Info(envAssignAllowed, envSet, processSet, traceAll, !enableTraceFilter) + log.Infof("service env: %v, env w: %v, b: %v, proc w: %v, b: %v", + envAssignAllowed, envWhitelist, envBlacklist, nameWhitelist, nameBlacklist) + + procFilter := sysmonitor.NewProcessFilter(ctx, + sysmonitor.WithSelfPid(os.Getpid()), + + sysmonitor.WithEnvService(envAssignAllowed), + + sysmonitor.WithEnvBlacklist(envBlacklist), + sysmonitor.WithEnvWhitelist(envWhitelist), + + sysmonitor.WithNameBlacklist(nameBlacklist), + sysmonitor.WithNameWhitelist(nameWhitelist), - procFilter := tracing.NewProcessFilter( - envAssignAllowed, envSet, processSet, traceAll, !enableTraceFilter, + sysmonitor.WithTracing(enableTrace), ) schedTracer, err := sysmonitor.NewProcessSchedTracer(procFilter) @@ -449,10 +468,9 @@ func runCmd(cfgFile *string, fl *Flag) error { var bmaps map[string]*ebpf.Map if ctMap != nil { - if bmaps == nil { - bmaps = make(map[string]*ebpf.Map) + bmaps = map[string]*ebpf.Map{ + "bpfmap_conntrack_tuple": ctMap, } - bmaps["bpfmap_conntrack_tuple"] = ctMap } 
netflowTracer := netflow.NewNetFlowTracer(procFilter) @@ -499,7 +517,7 @@ func runCmd(cfgFile *string, fl *Flag) error { constEditor = append(constEditor, httpConst...) // TODO: append conntrack bpf map - bmaps, _ := schedTracer.GetGOSchedMap() + bmaps, _ := schedTracer.GetSchedMap() var traceSvc string if exporter.DataKitTraceServer != "" { traceSvc = fmt.Sprintf("http://%s%s", exporter.DataKitTraceServer, "/v1/bpftracing") @@ -510,6 +528,7 @@ func runCmd(cfgFile *string, fl *Flag) error { url.QueryEscape(inputNameNetHTTP) tracer := l7flow.NewAPIFlowTracer(ctx, + l7flow.WithSelfPid(os.Getpid()), l7flow.WithTags(gTags), l7flow.WithDatakitPostURL(dkPostURL), l7flow.WithTracePostURL(traceSvc), @@ -546,7 +565,7 @@ func runCmd(cfgFile *string, fl *Flag) error { var fnSetEndpoints l4log.CfgFn - if fl.ContainerInfo.Endpoints != nil && len(fl.ContainerInfo.Endpoints) > 0 { + if len(fl.ContainerInfo.Endpoints) > 0 { fnSetEndpoints = l4log.WithCtrEndpointOverride(fl.ContainerInfo.Endpoints) } else { var rootfs string diff --git a/internal/plugins/externals/ebpf/internal/cmd/run/run_test.go b/internal/plugins/externals/ebpf/internal/cmd/run/run_test.go index 150d5e803b..a43d0fd9f1 100644 --- a/internal/plugins/externals/ebpf/internal/cmd/run/run_test.go +++ b/internal/plugins/externals/ebpf/internal/cmd/run/run_test.go @@ -14,7 +14,8 @@ func TestDKE(t *testing.T) { PprofPort: "6267", Service: "ebpf", - Enabled: []string{"ebpf-net", "ebpf-trace"}, + Enabled: []string{"ebpf-net"}, + // "ebpf-trace"}, EBPFNet: FlagNet{ L7NetEnabled: []string{"httpflow"}, diff --git a/internal/plugins/externals/ebpf/internal/l7flow/comm/comm.go b/internal/plugins/externals/ebpf/internal/l7flow/comm/comm.go index 395321fc71..8f8e2c9024 100644 --- a/internal/plugins/externals/ebpf/internal/l7flow/comm/comm.go +++ b/internal/plugins/externals/ebpf/internal/l7flow/comm/comm.go @@ -203,6 +203,7 @@ type NetwrkData struct { Index uint64 `json:"index"` Fn FnID `json:"fn_id"` Payload []byte 
`json:"payload"` + SockPtr uint64 } func (d NetwrkData) String() string { diff --git a/internal/plugins/externals/ebpf/internal/l7flow/l7flow.go b/internal/plugins/externals/ebpf/internal/l7flow/l7flow.go index 3c156c877b..fd071a1427 100644 --- a/internal/plugins/externals/ebpf/internal/l7flow/l7flow.go +++ b/internal/plugins/externals/ebpf/internal/l7flow/l7flow.go @@ -27,7 +27,6 @@ import ( "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/plugins/externals/ebpf/internal/l7flow/protodec" dknetflow "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/plugins/externals/ebpf/internal/netflow" sysmonitor "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/plugins/externals/ebpf/internal/sysmonitor" - "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/plugins/externals/ebpf/internal/tracing" "golang.org/x/sys/unix" expRand "golang.org/x/exp/rand" @@ -321,17 +320,12 @@ func NewHTTPFlowManger(constEditor []manager.ConstantEditor, bmaps map[string]*e } type APIFlowTracer struct { - gTags map[string]string datakitPostURL string tracePostURL string - conv2dd bool - procFilter *tracing.ProcessFilter tracer *Tracer } -var selfPid = 0 - type APITracerOpt func(*apiTracerConfig) type apiTracerConfig struct { @@ -340,9 +334,16 @@ type apiTracerConfig struct { tracePostURL string conv2dd bool enableTrace bool - procFilter *tracing.ProcessFilter + procFilter *sysmonitor.ProcessFilter protos map[protodec.L7Protocol]struct{} k8sNetInfo *k8sinfo.K8sNetInfo + selfPid int +} + +func WithSelfPid(pid int) APITracerOpt { + return func(cfg *apiTracerConfig) { + cfg.selfPid = pid + } } func WithTags(tags map[string]string) APITracerOpt { @@ -375,7 +376,7 @@ func WithEnableTrace(enableTrace bool) APITracerOpt { } } -func WithProcessFilter(procFilter *tracing.ProcessFilter) APITracerOpt { +func WithProcessFilter(procFilter *sysmonitor.ProcessFilter) APITracerOpt { return func(cfg *apiTracerConfig) { cfg.procFilter = procFilter } @@ -401,24 +402,17 @@ func NewAPIFlowTracer(ctx 
context.Context, opts ...APITracerOpt) *APIFlowTracer } } - if selfPid == 0 { - selfPid = os.Getpid() - } return &APIFlowTracer{ - gTags: cfg.tags, datakitPostURL: cfg.datakitPostURL, tracePostURL: cfg.tracePostURL, - conv2dd: cfg.conv2dd, - procFilter: cfg.procFilter, tracer: newTracer(ctx, &cfg), } } +const bpfMapProtocolFilter = "mp_protocol_filter" + func (tracer *APIFlowTracer) Run(ctx context.Context, constEditor []manager.ConstantEditor, bmaps map[string]*ebpf.Map, enableTLS bool, interval time.Duration) error { - if selfPid == 0 { - selfPid = os.Getpid() - } go tracer.tracer.Start(ctx, interval) bpfManger, r, err := NewHTTPFlowManger(constEditor, bmaps, @@ -436,6 +430,22 @@ func (tracer *APIFlowTracer) Run(ctx context.Context, constEditor []manager.Cons log.Info("api tracer starting ...") + var fn func(uint64) + if mp, ok, err := bpfManger.GetMap(bpfMapProtocolFilter); err == nil && ok { + fn = func() func(u uint64) { + return func(u uint64) { + val := uint8(1) + if err := mp.Update(&u, &val, ebpf.UpdateExist); err != nil { + log.Debug(err) + } + } + }() + } else { + fn = func(u uint64) {} + } + + tracer.tracer.protocolFilter.setFn(fn) + if r != nil { r.ScanAndUpdate() r.Monitor(ctx, time.Second*30) diff --git a/internal/plugins/externals/ebpf/internal/l7flow/net_tracer.go b/internal/plugins/externals/ebpf/internal/l7flow/net_tracer.go index 4dc641f939..dbdc526d51 100644 --- a/internal/plugins/externals/ebpf/internal/l7flow/net_tracer.go +++ b/internal/plugins/externals/ebpf/internal/l7flow/net_tracer.go @@ -7,6 +7,7 @@ import ( "context" "strconv" "sync" + "sync/atomic" "time" "unsafe" @@ -16,12 +17,12 @@ import ( "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/plugins/externals/ebpf/internal/l7flow/comm" "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/plugins/externals/ebpf/internal/l7flow/protodec" "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/plugins/externals/ebpf/internal/netflow" - 
"gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/plugins/externals/ebpf/internal/tracing" + sysmonitor "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/plugins/externals/ebpf/internal/sysmonitor" "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/plugins/externals/ebpf/pkg/spanid" ) type NetTrace struct { - // 对于每个进程, ingress 请求抵达时,后续的网络请求都应该继承此生成的 innter trace id + // 对于每个服务,ingress 请求抵达时,后续的网络请求都应该继承此生成的 innter trace id ESpanLinkDuration time.Duration // (kernel) sock ptr and random id ==> network flow pipe @@ -30,7 +31,9 @@ type NetTrace struct { ConnAndClosedDelCount [2]int - threadInnerID comm.ThreadTrace + threadInnerID comm.ThreadTrace + protocolFilter *protoKernelFilter + enabledProto map[protodec.L7Protocol]struct{} ptsPrv []*point.Point ptsCur []*point.Point @@ -38,9 +41,10 @@ type NetTrace struct { allowESPan bool } +const maxDetec = 64 + func (netTrace *NetTrace) StreamHandle(tn int64, uniID CUniID, data *comm.NetwrkData, - aggPool map[protodec.L7Protocol]protodec.AggPool, allowTrace bool, - protoLi map[protodec.L7Protocol]struct{}, + aggPool map[protodec.L7Protocol]protodec.AggPool, ) { if netTrace.ConnMap == nil { netTrace.ConnMap = make(map[CUniID]*FlowPipe) @@ -79,9 +83,12 @@ func (netTrace *NetTrace) StreamHandle(tn int64, uniID CUniID, data *comm.Netwrk } var dataLi []*comm.NetwrkData - if pipe.detecTimes < 64 || pipe.Decoder != nil { + if pipe.detecTimes < maxDetec || pipe.Decoder != nil { dataLi = pipe.sort.Queue(data) } else { + if netTrace.protocolFilter != nil { + netTrace.protocolFilter.filter(data.SockPtr) + } pipe.sort.li = nil } @@ -106,7 +113,8 @@ func (netTrace *NetTrace) StreamHandle(tn int64, uniID CUniID, data *comm.Netwrk if pipe.Proto == protodec.ProtoUnknown { if proto, dec, ok := protodec.ProtoDetector(d.Payload, d.CaptureSize); ok { pipe.Proto = proto - if _, ok := protoLi[pipe.Proto]; !ok && pipe.Proto != protodec.ProtoHTTP { + if _, ok := netTrace.enabledProto[pipe.Proto]; !ok { + pipe.detecTimes = maxDetec + 1 
continue } else { pipe.Decoder = dec @@ -147,7 +155,7 @@ func (netTrace *NetTrace) StreamHandle(tn int64, uniID CUniID, data *comm.Netwrk p.Obs(&data.Conn, v[i]) } } - if _, ok := protoLi[pipe.Proto]; ok { + if _, ok := netTrace.enabledProto[pipe.Proto]; ok { if netTrace.allowESPan && pipe.Conn.ProcessName != "datakit" && pipe.Conn.ProcessName != "datakit-ebpf" { pts := genPts(v, &data.Conn) @@ -172,10 +180,8 @@ type FlowPipe struct { } type ConnWatcher struct { - netracer *NetTrace - aggPool map[protodec.L7Protocol]protodec.AggPool - - procFilter *tracing.ProcessFilter + trace *NetTrace + aggPool map[protodec.L7Protocol]protodec.AggPool tags map[string]string k8sInfo *k8sinfo.K8sNetInfo @@ -183,8 +189,6 @@ type ConnWatcher struct { tracePostURL string enableTrace bool - enabledProto map[protodec.L7Protocol]struct{} - sync.Mutex } @@ -192,23 +196,7 @@ func (watcher *ConnWatcher) handle(tn int64, uniID CUniID, netdata *comm.NetwrkD watcher.Lock() defer watcher.Unlock() - pid := int(netdata.Conn.Pid) - - nettracer := watcher.netracer - if nettracer == nil { - return - } - - if watcher.enableTrace && watcher.procFilter != nil { - nettracer.allowESPan = false - if v, ok := watcher.procFilter.GetProcInfo(pid); ok { - nettracer.allowESPan = v.AllowTrace - netdata.Conn.ProcessName = v.Name - netdata.Conn.ServiceName = v.Service - } - } - - nettracer.StreamHandle(tn, uniID, netdata, watcher.aggPool, watcher.enableTrace, watcher.enabledProto) + watcher.trace.StreamHandle(tn, uniID, netdata, watcher.aggPool) } func (watcher *ConnWatcher) start(ctx context.Context) { @@ -221,32 +209,32 @@ func (watcher *ConnWatcher) start(ctx context.Context) { case <-tickerCheck.C: watcher.Lock() groupTime := time.Now().UnixNano() - for uniID, pipe := range watcher.netracer.ConnMap { + for uniID, pipe := range watcher.trace.ConnMap { if groupTime-pipe.lastTime > int64(time.Minute)*3 { - delete(watcher.netracer.ConnMap, uniID) - watcher.netracer.ConnAndClosedDelCount[0]++ + 
delete(watcher.trace.ConnMap, uniID) + watcher.trace.ConnAndClosedDelCount[0]++ } } - for uniID, pipe := range watcher.netracer.ConnMapClosed { + for uniID, pipe := range watcher.trace.ConnMapClosed { if groupTime-pipe.lastTime > int64(time.Minute) { - delete(watcher.netracer.ConnMapClosed, uniID) - watcher.netracer.ConnAndClosedDelCount[1]++ + delete(watcher.trace.ConnMapClosed, uniID) + watcher.trace.ConnAndClosedDelCount[1]++ } } - if watcher.netracer.ConnAndClosedDelCount[0] > 160_000 { - watcher.netracer.ConnAndClosedDelCount[0] = 0 + if watcher.trace.ConnAndClosedDelCount[0] > 160_000 { + watcher.trace.ConnAndClosedDelCount[0] = 0 connMap := make(map[CUniID]*FlowPipe) - for k, v := range watcher.netracer.ConnMap { + for k, v := range watcher.trace.ConnMap { connMap[k] = v } } - if watcher.netracer.ConnAndClosedDelCount[1] > 160_000 { - watcher.netracer.ConnAndClosedDelCount[1] = 0 + if watcher.trace.ConnAndClosedDelCount[1] > 160_000 { + watcher.trace.ConnAndClosedDelCount[1] = 0 connMap := make(map[CUniID]*FlowPipe) - for k, v := range watcher.netracer.ConnMapClosed { + for k, v := range watcher.trace.ConnMapClosed { connMap[k] = v } } @@ -254,13 +242,13 @@ func (watcher *ConnWatcher) start(ctx context.Context) { watcher.Unlock() case <-tickerClean.C: watcher.Lock() - if tracer := watcher.netracer; tracer != nil { + if tracer := watcher.trace; tracer != nil { tracer.threadInnerID.Cleanup() } watcher.Unlock() case <-ticker.C: watcher.Lock() - if tracer := watcher.netracer; tracer != nil { + if tracer := watcher.trace; tracer != nil { for _, pt := range tracer.ptsPrv { setInnerID(pt, &tracer.threadInnerID) } @@ -313,18 +301,19 @@ func setInnerID(pt *point.Point, threadInnerID *comm.ThreadTrace) { func newConnWatcher(ctx context.Context, cfg *connWatcherConfig) *ConnWatcher { p := &ConnWatcher{ - netracer: &NetTrace{ - ConnMap: make(map[CUniID]*FlowPipe), - ConnMapClosed: make(map[CUniID]*FlowPipe), + trace: &NetTrace{ + ConnMap: make(map[CUniID]*FlowPipe), + 
ConnMapClosed: make(map[CUniID]*FlowPipe), + protocolFilter: cfg.protocolFilter, + enabledProto: cfg.protos, + allowESPan: cfg.enableTrace, }, aggPool: cfg.aggPool, - procFilter: cfg.procFilter, tags: cfg.tags, k8sInfo: cfg.k8sNetInfo, metricPostURL: cfg.datakitPostURL, tracePostURL: cfg.tracePostURL, enableTrace: cfg.enableTrace, - enabledProto: cfg.protos, } go p.start(ctx) return p @@ -338,9 +327,13 @@ type Tracer struct { tags map[string]string k8sInfo *k8sinfo.K8sNetInfo + processFilter *sysmonitor.ProcessFilter + protocolFilter *protoKernelFilter + + selfPid int + metricPostURL string tracePostURL string - enableTrace bool } func (tracer *Tracer) Start(ctx context.Context, interval time.Duration) { @@ -363,59 +356,109 @@ func (tracer *Tracer) Start(ctx context.Context, interval time.Duration) { } } +const ( + //nolint:gosec + eventsHdrSize = int(unsafe.Sizeof(CNetEvents{})) - + int(unsafe.Sizeof(CNetEvents{}.payload)) + //nolint:gosec + commHdrSize = int(unsafe.Sizeof(CNetEventComm{})) +) + func (tracer *Tracer) PerfEventHandle(cpu int, data []byte, perfmap *manager.PerfMap, manager *manager.Manager, ) { events := (*CNetEvents)(unsafe.Pointer(&data[0])) //nolint:gosec - eventsNum := int(events.pos.num) - // eventsLen := int(events.pos.len) - - hdrSize := unsafe.Sizeof(CNetEventComm{}) // nolint:gosec - - pos := int(unsafe.Sizeof(events.pos)) // nolint:gosec + eventsNum := int(events.rec.num) + pos := eventsHdrSize // nolint:gosec groupTime := time.Now().UnixNano() for i := 0; i < eventsNum; i++ { - commHdr := *(*CNetEventComm)(unsafe.Pointer(&data[pos])) //nolint:gosec - pos += int(hdrSize) - - netdata := getNetwrkData(int(commHdr.len)) - - readMeta(&commHdr, &netdata.Conn) - - if int(commHdr.len) > 0 { - v := unsafe.Slice((*byte)(unsafe.Pointer(&data[pos])), int(commHdr.len)) //nolint:gosec + curHdrPos := pos + eventHdr := *(*CNetEventComm)(unsafe.Pointer(&data[curHdrPos])) //nolint:gosec + pos += commHdrSize + curPayloadPos := pos + pos += 
int(eventHdr.rec.bytes) + + netdata := getNetwrkData(int(eventHdr.rec.bytes)) + readMeta(&eventHdr, &netdata.Conn) + if int(eventHdr.rec.bytes) > 0 { + v := unsafe.Slice((*byte)(unsafe.Pointer(&data[curPayloadPos])), int(eventHdr.rec.bytes)) //nolint:gosec netdata.Payload = append(netdata.Payload, v...) - - pos += int(commHdr.len) } + // pos must be calculated before the filter is run pid := int(netdata.Conn.Pid) - if pid == selfPid { - return + if pid == tracer.selfPid { + continue } - netdata.Fn = comm.FnID(commHdr.meta.func_id) - - netdata.CaptureSize = int(commHdr.len) - netdata.FnCallSize = int(commHdr.meta.original_size) - netdata.TCPSeq = uint32(commHdr.meta.tcp_seq) - netdata.Thread = [2]int32{int32(commHdr.meta.tid_utid >> 32), (int32(commHdr.meta.tid_utid))} - netdata.TS = uint64(commHdr.meta.ts) - netdata.TSTail = uint64(commHdr.meta.ts_tail) - netdata.Index = uint64(commHdr.meta.sk_inf.index) - - // log.Info(netdata.String()) + if tracer.processFilter != nil { + if v, ok := tracer.processFilter.GetProcInfo(pid); ok { + if !v.Filtered() { + continue + } + netdata.Conn.ProcessName = v.Name() + netdata.Conn.ServiceName = v.ServiceName() + } else { + tracer.processFilter.AsyncTryAdd(pid) + } + } - tracer.connWatcher.handle(groupTime, CUniID(commHdr.meta.sk_inf.uni_id), netdata) + netdata.Fn = comm.FnID(eventHdr.meta.func_id) + netdata.SockPtr = uint64(eventHdr.meta.sk_inf.skptr) + netdata.CaptureSize = int(eventHdr.rec.bytes) + netdata.FnCallSize = int(eventHdr.meta.original_size) + netdata.TCPSeq = uint32(eventHdr.meta.tcp_seq) + netdata.Thread = [2]int32{int32(eventHdr.meta.tid_utid >> 32), (int32(eventHdr.meta.tid_utid))} + netdata.TS = uint64(eventHdr.meta.ts) + netdata.TSTail = uint64(eventHdr.meta.ts_tail) + netdata.Index = uint64(eventHdr.meta.sk_inf.index) + + tracer.connWatcher.handle(groupTime, CUniID(eventHdr.meta.sk_inf.uni_id), netdata) } } type connWatcherConfig struct { apiTracerConfig - aggPool map[protodec.L7Protocol]protodec.AggPool + 
aggPool map[protodec.L7Protocol]protodec.AggPool + protocolFilter *protoKernelFilter +} + +type protoKernelFilter struct { + fn chan func(uint64) + keySk chan uint64 + + firstRun int64 +} + +func (f *protoKernelFilter) filter(key uint64) { + f.keySk <- key +} + +func (f *protoKernelFilter) setFn(fn func(uint64)) { + f.fn <- fn +} + +func (f *protoKernelFilter) run(ctx context.Context) { + if v := atomic.SwapInt64(&f.firstRun, 1); v != 0 { + return + } + + var kFilter func(uint64) + for { + select { + case fn := <-f.fn: + kFilter = fn + case k := <-f.keySk: + if kFilter != nil { + kFilter(k) + } + case <-ctx.Done(): + return + } + } } func newTracer(ctx context.Context, cfg *apiTracerConfig) *Tracer { @@ -425,17 +468,26 @@ func newTracer(ctx context.Context, cfg *apiTracerConfig) *Tracer { aggP := protodec.NewProtoAggregators() + protoFilter := &protoKernelFilter{ + fn: make(chan func(uint64)), + keySk: make(chan uint64, 16), + } + go protoFilter.run(ctx) + return &Tracer{ connWatcher: newConnWatcher(ctx, &connWatcherConfig{ apiTracerConfig: *cfg, aggPool: aggP, + protocolFilter: protoFilter, }), - aggPool: aggP, - tags: cfg.tags, - k8sInfo: cfg.k8sNetInfo, - metricPostURL: cfg.datakitPostURL, - tracePostURL: cfg.tracePostURL, - enableTrace: cfg.enableTrace, + aggPool: aggP, + tags: cfg.tags, + k8sInfo: cfg.k8sNetInfo, + selfPid: cfg.selfPid, + processFilter: cfg.procFilter, + protocolFilter: protoFilter, + metricPostURL: cfg.datakitPostURL, + tracePostURL: cfg.tracePostURL, } } diff --git a/internal/plugins/externals/ebpf/internal/netflow/netflow_linux.go b/internal/plugins/externals/ebpf/internal/netflow/netflow_linux.go index e506badbbc..16010789d0 100644 --- a/internal/plugins/externals/ebpf/internal/netflow/netflow_linux.go +++ b/internal/plugins/externals/ebpf/internal/netflow/netflow_linux.go @@ -13,7 +13,7 @@ import ( "github.com/cilium/ebpf" "github.com/shirou/gopsutil/host" 
"gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/plugins/externals/ebpf/internal/exporter" - "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/plugins/externals/ebpf/internal/tracing" + "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/plugins/externals/ebpf/internal/sysmonitor" "golang.org/x/net/context" ) @@ -31,14 +31,20 @@ const ( transportUDP = "udp" ) +var enableUDP bool + +func SetEnableUDP(on bool) { + enableUDP = on +} + type NetFlowTracer struct { connStatsRecord *ConnStatsRecord resultCh chan *ConnResult closedEventCh chan *ConncetionClosedInfo - procFilter *tracing.ProcessFilter + procFilter *sysmonitor.ProcessFilter } -func NewNetFlowTracer(procFilter *tracing.ProcessFilter) *NetFlowTracer { +func NewNetFlowTracer(procFilter *sysmonitor.ProcessFilter) *NetFlowTracer { return &NetFlowTracer{ connStatsRecord: newConnStatsRecord(), resultCh: make(chan *ConnResult, 4), @@ -99,7 +105,7 @@ func (tracer *NetFlowTracer) ClosedEventHandler(cpu int, data []byte, if tracer.procFilter != nil { if v, ok := tracer.procFilter.GetProcInfo(int(event.Info.Pid)); ok { - event.Info.ProcessName = v.Name + event.Info.ProcessName = v.Name() } } @@ -189,7 +195,7 @@ func (tracer *NetFlowTracer) connCollectHanllder(ctx context.Context, connStatsM } if tracer.procFilter != nil { if v, ok := tracer.procFilter.GetProcInfo(int(connInfoC.pid)); ok { - connInfo.ProcessName = v.Name + connInfo.ProcessName = v.Name() } } diff --git a/internal/plugins/externals/ebpf/internal/netflow/netflow_test.go b/internal/plugins/externals/ebpf/internal/netflow/netflow_test.go index 3212dc4870..4e1cf64b10 100644 --- a/internal/plugins/externals/ebpf/internal/netflow/netflow_test.go +++ b/internal/plugins/externals/ebpf/internal/netflow/netflow_test.go @@ -367,6 +367,7 @@ func TestConvConn2M(t *testing.T) { }, } + SetEnableUDP(true) for _, v := range cases { connR.result[v.conn] = v.connStats pt, err := ConvConn2M(v.conn, v.connStats, srcNameM, v.tags, diff --git 
a/internal/plugins/externals/ebpf/internal/netflow/utils.go b/internal/plugins/externals/ebpf/internal/netflow/utils.go index dcb6ff3058..8761cea4d9 100644 --- a/internal/plugins/externals/ebpf/internal/netflow/utils.go +++ b/internal/plugins/externals/ebpf/internal/netflow/utils.go @@ -485,6 +485,9 @@ func U32BEToIP(addr [4]uint32, isIPv6 bool) net.IP { // 4. Filter connections with port 0 or ip address :: or 0.0.0.0; // Need to filter, the function returns False. func ConnNotNeedToFilter(conn *ConnectionInfo, connStats *ConnFullStats) bool { + if !enableUDP && !ConnProtocolIsTCP(conn.Meta) { + return false + } if (conn.Saddr[0]|conn.Saddr[1]|conn.Saddr[2]|conn.Saddr[3]) == 0 || (conn.Daddr[0]|conn.Daddr[1]|conn.Daddr[2]|conn.Daddr[3]) == 0 || conn.Sport == 0 || conn.Dport == 0 { diff --git a/internal/plugins/externals/ebpf/internal/sysmonitor/process_filter.go b/internal/plugins/externals/ebpf/internal/sysmonitor/process_filter.go new file mode 100644 index 0000000000..9774aa1382 --- /dev/null +++ b/internal/plugins/externals/ebpf/internal/sysmonitor/process_filter.go @@ -0,0 +1,322 @@ +//go:build linux +// +build linux + +package sysmonitor + +import ( + "context" + "strings" + "sync" + "time" + + "github.com/golang/groupcache/lru" + pr "github.com/shirou/gopsutil/v3/process" +) + +type ProcessFilter struct { + allowTrace bool + nameBlacklist map[string]struct{} + nameWhitelist map[string]struct{} + envBlacklist map[string]struct{} + envWhitelist map[string]struct{} + serviceEnv []string + + selfPid int + asynCh chan int + + procInfo map[int]*ProcInfo + procDel *lru.Cache + + kernerFilter kernelProcFilter + sync.RWMutex +} + +type ProcInfo struct { + pid int + name string + binPath string + serviceName string + + keep bool + allowTrace bool + createTS int64 +} + +func (p *ProcInfo) ServiceName() string { + return p.serviceName +} + +func (p *ProcInfo) Name() string { + return p.name +} + +func (p *ProcInfo) Filtered() bool { + return p.keep +} + +func (p 
*ProcInfo) TraceFilterd() bool { + return p.allowTrace +} + +type FilterOpt func(p *ProcessFilter) + +func convArrToMAP(li []string) map[string]struct{} { + m := map[string]struct{}{} + for _, v := range li { + m[v] = struct{}{} + } + return m +} + +func WithTracing(on bool) FilterOpt { + return func(p *ProcessFilter) { + p.allowTrace = on + } +} + +func WithSelfPid(pid int) FilterOpt { + return func(p *ProcessFilter) { + p.selfPid = pid + } +} + +func WithEnvWhitelist(li []string) FilterOpt { + return func(p *ProcessFilter) { + p.envWhitelist = convArrToMAP(li) + } +} + +func WithEnvBlacklist(li []string) FilterOpt { + return func(p *ProcessFilter) { + p.envBlacklist = convArrToMAP(li) + } +} + +func WithNameWhitelist(li []string) FilterOpt { + return func(p *ProcessFilter) { + p.nameWhitelist = convArrToMAP(li) + } +} + +func WithNameBlacklist(li []string) FilterOpt { + return func(p *ProcessFilter) { + p.nameBlacklist = convArrToMAP(li) + } +} + +func WithEnvService(li []string) FilterOpt { + return func(p *ProcessFilter) { + var l []string + l = append(l, li...) 
+ p.serviceEnv = l + } +} + +func NewProcessFilter(ctx context.Context, opts ...FilterOpt) *ProcessFilter { + filter := &ProcessFilter{ + procInfo: map[int]*ProcInfo{}, + procDel: lru.New(2048), + } + for _, opt := range opts { + if opt != nil { + opt(filter) + } + } + filter.asynCh = make(chan int, 64) + go filter.asynTryAddLoop(ctx) + + return filter +} + +func (p *ProcessFilter) setKernelProcFilter(fn kernelProcFilter) { + p.kernerFilter = fn +} + +func (p *ProcessFilter) asynTryAddLoop(ctx context.Context) { + pidSet := map[int]struct{}{} + tk := time.NewTicker(time.Second) + defer tk.Stop() + for { + select { + case pid := <-p.asynCh: + pidSet[pid] = struct{}{} + case <-tk.C: + if len(pidSet) > 0 { + for pid := range pidSet { + if _, err := p.TryAdd(pid); err != nil { + log.Debug(err) + } + } + pidSet = map[int]struct{}{} + } + case <-ctx.Done(): + return + } + } +} + +func (p *ProcessFilter) tryAdd(pid, ppid int, createTS int64, procName, binPath string, env map[string]string) *ProcInfo { + keep := true + allowTraceAttach := p.allowTrace + + if pid == p.selfPid || strings.HasPrefix(procName, "datakit") { + keep = false + goto notKeep + } + + if len(p.nameBlacklist) > 0 { + if _, ok := p.nameBlacklist[procName]; ok { + keep = false + goto notKeep + } + } + if len(p.nameWhitelist) > 0 { + if _, ok := p.nameWhitelist[procName]; !ok { + keep = false + goto notKeep + } + } + + if p.allowTrace { + if len(p.envBlacklist) > 0 { + for envName := range p.envBlacklist { + if _, ok := env[envName]; ok { + keep = false + goto notKeep + } + } + } + if len(p.envWhitelist) > 0 { + k := false + for envName := range p.envWhitelist { + if _, ok := env[envName]; ok { + k = true + break + } + } + if keep { + keep = k + } + } + } + +notKeep: + var serviceName string + for _, envName := range p.serviceEnv { + if v, ok := env[envName]; ok { + if v != "" { + serviceName = v + break + } + } + } + + if serviceName == "" { + serviceName = procName + } + + if !keep && p.kernerFilter != 
nil { + p.kernerFilter(pid) + } + + if allowTraceAttach && !keep { + allowTraceAttach = false + } + + if binPath == "" { + allowTraceAttach = false + } + + p.Lock() + defer p.Unlock() + if !keep && ppid != 0 { + // inherits the state of the parent process + if v, ok := p.procInfo[ppid]; ok && v.name == procName { + keep = v.keep + } + } + + inf := &ProcInfo{ + pid: pid, + name: procName, + binPath: binPath, + serviceName: serviceName, + keep: keep, + allowTrace: allowTraceAttach, + createTS: createTS, + } + p.procInfo[pid] = inf + return inf +} + +func (p *ProcessFilter) Delete(pid int) { + p.Lock() + defer p.Unlock() + + if v, ok := p.procInfo[pid]; ok { + delete(p.procInfo, pid) + p.procDel.Add(pid, v) + } +} + +func (p *ProcessFilter) GetProcInfo(pid int) (*ProcInfo, bool) { + p.RLock() + defer p.RUnlock() + + if v, ok := p.procInfo[pid]; ok && v != nil { + return v, true + } + + if v, ok := p.procDel.Get(pid); ok { + if v, ok := v.(*ProcInfo); ok && v != nil { + return v, true + } + } + + return nil, false +} + +func (p *ProcessFilter) TryAdd(pid int) (*ProcInfo, error) { + proc, err := pr.NewProcess(int32(pid)) + if err != nil { + return nil, err + } + pname, err := proc.Name() + if err != nil { + return nil, err + } + + var ppid int + if v, _ := proc.Ppid(); v > 0 { + ppid = int(v) + } + + env, _ := proc.Environ() + envMap := map[string]string{} + for _, v := range env { + s := strings.Index(v, "=") + if s > 0 { + envMap[v[:s]] = v[s+1:] + } + } + + exePath, err := proc.Exe() + if err != nil { + return nil, err + } + + exeResolvePath := resolveBinPath(pid, exePath) + if exeResolvePath == "" { + log.Debugf("process: %s, pid: %d, path: %s, resolvepath: %s", + pname, pid, exePath, exeResolvePath) + } else { + exeResolvePath = HostRoot(exeResolvePath) + } + + ts, _ := proc.CreateTime() + return p.tryAdd(pid, ppid, ts, pname, exeResolvePath, envMap), nil +} + +func (p *ProcessFilter) AsyncTryAdd(pid int) { + p.asynCh <- pid +} diff --git 
a/internal/plugins/externals/ebpf/internal/sysmonitor/process_sched.go b/internal/plugins/externals/ebpf/internal/sysmonitor/process_sched.go index af43213c19..7c93e22deb 100644 --- a/internal/plugins/externals/ebpf/internal/sysmonitor/process_sched.go +++ b/internal/plugins/externals/ebpf/internal/sysmonitor/process_sched.go @@ -12,7 +12,6 @@ import ( "math" "os" "runtime" - "strings" "sync" "time" "unsafe" @@ -22,7 +21,6 @@ import ( "github.com/golang/groupcache/lru" pr "github.com/shirou/gopsutil/v3/process" dkebpf "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/plugins/externals/ebpf/internal/c" - "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/plugins/externals/ebpf/internal/tracing" "golang.org/x/sys/unix" ) @@ -31,6 +29,16 @@ import ( import "C" type ProcessSchedC C.struct_rec_process_sched_status + +type ProcFilterInfoC C.struct_proc_filter_info + +func (sc *ProcessSchedC) String() string { + comm := *(*[16]byte)(unsafe.Pointer(&sc.comm[0])) + return fmt.Sprintf("st %d, prv %d, nxt %d, name `%s`", sc.status, + sc.prv_pid, sc.nxt_pid, + string(bytes.TrimRight(comm[:], "\x00"))) +} + type ProcInjectC C.struct_proc_inject type ProcessSchedWithFNameC C.struct_rec_process_sched_status_with_filename @@ -45,6 +53,8 @@ const ( const ( bmapProcInject = "bmap_procinject" + bmapProcFilter = "bmap_proc_filter" + bmapTid2Goid = "bmap_tid2goid" ) type ProcessInfo struct { @@ -138,7 +148,7 @@ var execGoFnName = []string{ "uprobe__go_runtime_execute", } -func NewProcessSchedTracer(filter *tracing.ProcessFilter) (*SchedTracer, error) { +func NewProcessSchedTracer(filter *ProcessFilter) (*SchedTracer, error) { tracer := SchedTracer{ processFilter: filter, } @@ -155,45 +165,68 @@ func NewProcessSchedTracer(filter *tracing.ProcessFilter) (*SchedTracer, error) type SchedTracer struct { Manager *manager.Manager - processFilter *tracing.ProcessFilter + processFilter *ProcessFilter attachInfo ProcessAttachInfo + selfPid int + sync.Mutex } -func (tracer *SchedTracer) 
GetGOSchedMap() (map[string]*ebpf.Map, bool) { +func (tracer *SchedTracer) GetSchedMap() (map[string]*ebpf.Map, bool) { if tracer.Manager == nil { return nil, false } bmaps := map[string]*ebpf.Map{} - // if m, ok, err := tracer.Manager.GetMap("bmap_goid2tid"); !ok || err != nil { - // return nil, false - // } else { - // bmaps["bmap_goid2tid"] = m - // } - if m, ok, err := tracer.Manager.GetMap("bmap_tid2goid"); !ok || err != nil { + if m, ok, err := tracer.Manager.GetMap(bmapTid2Goid); !ok || err != nil { + return nil, false + } else { + bmaps[bmapTid2Goid] = m + } + + if m, ok, err := tracer.Manager.GetMap(bmapProcFilter); !ok || err != nil { return nil, false } else { - bmaps["bmap_tid2goid"] = m + bmaps[bmapProcFilter] = m } + return bmaps, true } +type kernelProcFilter func(int) + func (tracer *SchedTracer) Start(ctx context.Context) error { + tracer.selfPid = os.Getpid() err := tracer.Manager.Start() if err != nil { return err } + fn := func() kernelProcFilter { + if mp, ok, err := tracer.Manager.GetMap(bmapProcFilter); err == nil && ok { + return func(pid int) { + k := uint32(pid) + v := ProcFilterInfoC{ + disable: 1, + } + if err := mp.Update(&k, &v, ebpf.UpdateAny); err != nil { + log.Info(err) + } + } + } + return func(pid int) {} + }() + + tracer.processFilter.setKernelProcFilter(fn) + pses, err := pr.Processes() if err != nil { return nil } - for _, p := range pses { - if err := tracer.goProbeRegister(p); err != nil { + if err := tracer.attachProcess(p); err != nil { log.Warn(err) } } @@ -266,15 +299,12 @@ func (tracer *SchedTracer) ProcessSchedHandler(cpu int, data []byte, switch evetC.status { case SchedFork: case SchedExec: - // eventC := (*ProcessSchedWithFNameC)(unsafe.Pointer(&data[0])) - // pid := eventC.sched_status.nxt_pid - p, err := pr.NewProcess(int32(evetC.nxt_pid)) if err != nil { break } - if err := tracer.goProbeRegister(p); err != nil { + if err := tracer.attachProcess(p); err != nil { log.Debug(err) } case SchedExit: @@ -286,60 +316,36 
@@ func (tracer *SchedTracer) ProcessSchedHandler(cpu int, data []byte, } } -func (tracer *SchedTracer) goProbeRegister(p *pr.Process) error { - pname, err := p.Name() - if err != nil { - return err - } - env, err := p.Environ() - if err != nil { - log.Debug(err) - return nil +func (tracer *SchedTracer) attachProcess(p *pr.Process) error { + if p.Pid < 0 { + return fmt.Errorf("pid < 0") } - envMap := map[string]string{} - for _, v := range env { - s := strings.Index(v, "=") - if s > 0 { - envMap[v[:s]] = v[s+1:] - } - } - exePath, err := p.Exe() + procInfo, err := tracer.processFilter.TryAdd(int(p.Pid)) if err != nil { - log.Debug(err) - return nil + return err } - - pid := p.Pid - exeResolvePath := resolveBinPath(int(pid), exePath) - if exeResolvePath == "" { + if !procInfo.TraceFilterd() || procInfo.binPath == "" { return nil } - exeResolvePath = HostRoot(exeResolvePath) - - if tracer.processFilter != nil { - if !tracer.processFilter.Filter(int(pid), pname, exePath, envMap) { - return nil - } - } - // check file modified - exeFstat, err := os.Stat(exeResolvePath) + binPath := procInfo.binPath + exeFstat, err := os.Stat(binPath) if err != nil { return err } exeModTime := exeFstat.ModTime() - if tmod, ok := tracer.attachInfo.GetCannotAndAttachInfo(exeResolvePath); ok { + if tmod, ok := tracer.attachInfo.GetCannotAndAttachInfo(binPath); ok { if tmod.Equal(exeFstat.ModTime()) { return nil } } var goVer = [2]int{} - inf, err := buildinfo.ReadFile(exeResolvePath) + inf, err := buildinfo.ReadFile(binPath) if err != nil { log.Debug(err) // if the go version is greater than 1.13+, this function can get the go version @@ -352,14 +358,14 @@ func (tracer *SchedTracer) goProbeRegister(p *pr.Process) error { var symbolAddr uint64 = 0 - elfFile, err := elf.Open(exeResolvePath) + elfFile, err := elf.Open(binPath) if err != nil { - return fmt.Errorf("nnable to open elf file %s: %w", exeResolvePath, err) + return fmt.Errorf("nnable to open elf file %s: %w", binPath, err) } if 
syms, err := FindSymbol(elfFile, "runtime.execute"); err == nil { if len(syms) != 1 { - log.Debugf("find symbol runtime.execute, exe %s, count %d", exeResolvePath, len(syms)) + log.Debugf("find symbol runtime.execute, exe %s, count %d", binPath, len(syms)) return nil } symbolAddr = syms[0].Value @@ -367,7 +373,7 @@ func (tracer *SchedTracer) goProbeRegister(p *pr.Process) error { sym, err := getGoUprobeSymbolFromPCLN(elfFile, goVer[1] >= 20, "runtime.execute") if err != nil { log.Debug(err) - tracer.attachInfo.AddCannotAttach(exeResolvePath, exeModTime) + tracer.attachInfo.AddCannotAttach(binPath, exeModTime) return nil } symbolAddr = sym.Start @@ -408,20 +414,20 @@ func (tracer *SchedTracer) goProbeRegister(p *pr.Process) error { } } - pidU32 := (uint32)(pid) + pidU32 := (uint32)(procInfo.pid) if err := emap.Update(unsafe.Pointer(&pidU32), unsafe.Pointer(&val), ebpf.UpdateAny); err != nil { return err } var uid string - if tmod, ok := tracer.attachInfo.GetAttachInfo(exeResolvePath); ok { + if tmod, ok := tracer.attachInfo.GetAttachInfo(binPath); ok { if tmod.Equal(exeModTime) { return nil } - uid = ShortID(exeResolvePath) + uid = ShortID(binPath) - log.Info("DetachHook: file modfied: ", exeResolvePath, " ShortID: ", uid) + log.Info("DetachHook: file modfied: ", binPath, " ShortID: ", uid) for _, fnName := range execGoFnName { p, ok := tracer.Manager.GetProbe(manager.ProbeIdentificationPair{ UID: uid, @@ -445,13 +451,13 @@ func (tracer *SchedTracer) goProbeRegister(p *pr.Process) error { } } - tracer.attachInfo.AddAtach(exeResolvePath, exeModTime) + tracer.attachInfo.AddAtach(binPath, exeModTime) if uid == "" { - uid = ShortID(exeResolvePath) + uid = ShortID(binPath) } - log.Info("AddHook: ", exeResolvePath, " ShortID: ", uid) + log.Info("AddHook: ", binPath, " ShortID: ", uid) for _, fnName := range execGoFnName { if err := tracer.Manager.AddHook("", &manager.Probe{ ProbeIdentificationPair: manager.ProbeIdentificationPair{ @@ -459,7 +465,7 @@ func (tracer 
*SchedTracer) goProbeRegister(p *pr.Process) error { EBPFFuncName: fnName, }, UprobeOffset: symbolAddr, - BinaryPath: exeResolvePath, + BinaryPath: binPath, }); err != nil { log.Warn(err) } diff --git a/internal/plugins/externals/ebpf/internal/sysmonitor/utils.go b/internal/plugins/externals/ebpf/internal/sysmonitor/utils.go index 09d040a4df..5c10858fd6 100644 --- a/internal/plugins/externals/ebpf/internal/sysmonitor/utils.go +++ b/internal/plugins/externals/ebpf/internal/sysmonitor/utils.go @@ -7,9 +7,11 @@ import ( "bytes" "crypto/sha256" "encoding/binary" + "fmt" "os" "path/filepath" "strconv" + "syscall" ddfp "github.com/DataDog/gopsutil/process/filepath" @@ -22,6 +24,11 @@ func SetLogger(nl *logger.Logger) { log = nl } +type fileKey struct { + dev uint64 + ino uint64 +} + func diff(old, cur map[string]struct{}) (map[string]struct{}, map[string]struct{}) { add := map[string]struct{}{} del := map[string]struct{}{} @@ -78,6 +85,22 @@ func GetEnv(key string, dfault string, combineWith ...string) string { } } +func FileInfo(fp string) (fileKey, error) { + if f, err := os.Stat(fp); err == nil { + v := f.Sys() + if sysF, ok := v.(*syscall.Stat_t); ok { + return fileKey{ + dev: sysF.Dev, + ino: sysF.Ino, + }, nil + } + } else { + return fileKey{}, err + } + + return fileKey{}, fmt.Errorf("get file info failed") +} + // HostProc returns the value of the host proc path. // Context from vendor/github.com/shirou/gopsutil/v3/internal/common/common.go:common.HostProc. 
func HostProc(combineWith ...string) string { diff --git a/internal/plugins/externals/ebpf/internal/tracing/utils.go b/internal/plugins/externals/ebpf/internal/tracing/utils.go index f632281389..9bf8fe9b60 100644 --- a/internal/plugins/externals/ebpf/internal/tracing/utils.go +++ b/internal/plugins/externals/ebpf/internal/tracing/utils.go @@ -7,9 +7,7 @@ import ( "encoding/hex" "strconv" "strings" - "sync" - "github.com/golang/groupcache/lru" "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/plugins/externals/ebpf/pkg/spanid" ) @@ -168,140 +166,3 @@ func HexSpanid2ID64(s string) spanid.ID64 { return 0 } } - -type ProcessFilter struct { - SvcAssignEnv []string - RuleEnv map[string]bool - - RuleProcessName map[string]bool - RulePath map[string]bool - - Pid2ProcInfo map[int]*ProcSvcInfo - PidDeleted *lru.Cache - AnyProcess bool - Disable bool - sync.RWMutex -} - -type ProcSvcInfo struct { - Name string - Service string - AllowTrace bool -} - -func NewProcessFilter(svcAssignEnv []string, ruleEnv map[string]bool, ruleProcessName map[string]bool, - anyProcess, disable bool, -) *ProcessFilter { - return &ProcessFilter{ - SvcAssignEnv: svcAssignEnv, - RuleEnv: ruleEnv, - RuleProcessName: ruleProcessName, - - RulePath: map[string]bool{}, - - Pid2ProcInfo: map[int]*ProcSvcInfo{}, - PidDeleted: lru.New(1024), - - AnyProcess: anyProcess, - Disable: disable, - } -} - -func (p *ProcessFilter) Filter(pid int, name, path string, env map[string]string) bool { - p.Lock() - defer p.Unlock() - - var filtered bool - for i := 0; i < 1; i++ { - for k, allow := range p.RuleEnv { - if _, ok := env[k]; ok { - if allow { - filtered = true - } - break - } - } - - if allow, ok := p.RuleProcessName[name]; ok && allow { - filtered = true - break - } - - if _, ok := p.RulePath[path]; ok { - filtered = true - break - } - - if p.AnyProcess { - filtered = true - break - } - } - - if allow, ok := p.RuleProcessName[name]; ok && !allow { - filtered = false - } - - for k, allow := range p.RuleEnv { 
- if _, ok := env[k]; ok && !allow { - filtered = false - } - } - - if p.Disable { - filtered = false - } - - pinfo := &ProcSvcInfo{ - Name: name, - Service: name, - AllowTrace: filtered, - } - - if len(env) > 0 && len(p.SvcAssignEnv) > 0 { - for _, k := range p.SvcAssignEnv { - if v, ok := env[k]; ok { - pinfo.Service = v - break - } - } - } - - p.Pid2ProcInfo[pid] = pinfo - - return filtered -} - -func (p *ProcessFilter) Delete(pid int) { - p.Lock() - defer p.Unlock() - - if v, ok := p.Pid2ProcInfo[pid]; ok { - delete(p.Pid2ProcInfo, pid) - p.PidDeleted.Add(pid, v) - } -} - -func (p *ProcessFilter) GetProcInfo(pid int) (*ProcSvcInfo, bool) { - p.RLock() - if len(p.Pid2ProcInfo) > 0 { - if v, ok := p.Pid2ProcInfo[pid]; ok && v != nil { - p.RUnlock() - return v, true - } - } - p.RUnlock() - - // search deleted proc info from lru map - p.Lock() - defer p.Unlock() - if p.PidDeleted.Len() > 0 { - if v, ok := p.PidDeleted.Get(pid); ok { - if v, ok := v.(*ProcSvcInfo); ok && v != nil { - return v, ok - } - } - } - - return nil, false -}