diff --git a/agent/benches/labeler.rs b/agent/benches/labeler.rs index a59cd1c1bdd..27ebd5056a9 100644 --- a/agent/benches/labeler.rs +++ b/agent/benches/labeler.rs @@ -139,7 +139,7 @@ fn bench_labeler(c: &mut Criterion) { fn bench_policy(c: &mut Criterion) { fn generate_table() -> FirstPath { - let mut first = FirstPath::new(1, 8, 1 << 16, false); + let mut first = FirstPath::new(1, 8, 1 << 16, false, false); let acl = Acl::new( 1, vec![10], diff --git a/agent/src/config/handler.rs b/agent/src/config/handler.rs index b07bd798255..e75c5c95061 100755 --- a/agent/src/config/handler.rs +++ b/agent/src/config/handler.rs @@ -17,12 +17,12 @@ use std::borrow::Cow; use std::cmp::{max, min}; use std::collections::{HashMap, HashSet}; -use std::fmt; use std::hash::{Hash, Hasher}; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; +use std::{fmt, u32}; use arc_swap::{access::Map, ArcSwap}; use base64::{prelude::BASE64_STANDARD, Engine}; @@ -43,6 +43,7 @@ use sysinfo::SystemExt; use sysinfo::{CpuRefreshKind, RefreshKind, System}; use tokio::runtime::Runtime; +use super::config::ProcessorsFlowLogTunning; #[cfg(any(target_os = "linux", target_os = "android"))] use super::config::{Ebpf, EbpfFileIoEvent, ProcessMatcher, SymbolTable}; use super::{ @@ -441,6 +442,7 @@ pub struct FlowConfig { pub cloud_gateway_traffic: bool, pub collector_enabled: bool, pub l7_log_tap_types: [bool; 256], + pub capture_mode: PacketCaptureType, pub capacity: u32, pub hash_slots: u32, @@ -492,6 +494,7 @@ impl From<(&UserConfig, &DynamicConfig)> for FlowConfig { FlowConfig { agent_id: dynamic_config.agent_id() as u16, agent_type: conf.global.common.agent_type, + capture_mode: conf.inputs.cbpf.common.capture_mode, cloud_gateway_traffic: conf .inputs .cbpf @@ -675,6 +678,14 @@ impl FlowConfig { self.packet_segmentation_reassembly.contains(&src_port) || self.packet_segmentation_reassembly.contains(&dst_port) } + + pub fn flow_capacity(&self) -> u32 { + let default_capacity = ProcessorsFlowLogTunning::default().concurrent_flow_limit; + match self.capture_mode { + PacketCaptureType::Analyzer if self.capacity <= default_capacity => u32::MAX, + _ => self.capacity, + } + } } impl fmt::Debug for FlowConfig { @@ -694,6 +705,7 @@ impl fmt::Debug for FlowConfig { .collect::>(), ) .field("capacity", &self.capacity) + .field("flow_capacity", &self.flow_capacity()) .field("hash_slots", &self.hash_slots) .field("packet_delay", &self.packet_delay) .field("flush_interval", &self.flush_interval) @@ -2646,7 +2658,7 @@ impl ConfigHandler { if af_packet.vlan_pcp_in_physical_mirror_traffic != new_af_packet.vlan_pcp_in_physical_mirror_traffic { - info!("Update inputs.cbpf.af_packet.vlan_pcp_in_physical_mirror_traffic from {:?} to {:?}.", + info!("Update inputs.cbpf.af_packet.vlan_pcp_in_physical_mirror_traffic from {:?} to {:?}.", af_packet.vlan_pcp_in_physical_mirror_traffic, new_af_packet.vlan_pcp_in_physical_mirror_traffic); af_packet.vlan_pcp_in_physical_mirror_traffic = new_af_packet.vlan_pcp_in_physical_mirror_traffic; @@ -2734,7 +2746,7 @@ impl ConfigHandler { if physical_mirror.private_cloud_gateway_traffic != new_physical_mirror.private_cloud_gateway_traffic { - info!("Update inputs.cbpf.physical_mirror.private_cloud_gateway_traffic from {:?} to {:?}.", + info!("Update inputs.cbpf.physical_mirror.private_cloud_gateway_traffic from {:?} to {:?}.", physical_mirror.private_cloud_gateway_traffic, new_physical_mirror.private_cloud_gateway_traffic); physical_mirror.private_cloud_gateway_traffic = new_physical_mirror.private_cloud_gateway_traffic; @@ -2743,7 +2755,7 @@ impl ConfigHandler { if physical_mirror.default_capture_network_type != new_physical_mirror.default_capture_network_type { - info!("Update inputs.cbpf.physical_mirror.default_capture_network_type from {:?} to {:?}.", + info!("Update inputs.cbpf.physical_mirror.default_capture_network_type from {:?} to {:?}.", physical_mirror.default_capture_network_type, new_physical_mirror.default_capture_network_type); physical_mirror.default_capture_network_type = new_physical_mirror.default_capture_network_type; @@ -2802,13 +2814,13 @@ impl ConfigHandler { let physical_switch = &mut special_network.physical_switch; let new_physical_switch = &mut new_special_network.physical_switch; if physical_switch.netflow_ports != new_physical_switch.netflow_ports { - info!("Update inputs.cbpf.special_network.physical_switch.netflow_ports from {:?} to {:?}.", + info!("Update inputs.cbpf.special_network.physical_switch.netflow_ports from {:?} to {:?}.", physical_switch.netflow_ports , new_physical_switch.netflow_ports ); physical_switch.netflow_ports = new_physical_switch.netflow_ports.clone(); restart_agent = !first_run; } if physical_switch.sflow_ports != new_physical_switch.sflow_ports { - info!("Update inputs.cbpf.special_network.physical_switch.sflow_ports from {:?} to {:?}.", + info!("Update inputs.cbpf.special_network.physical_switch.sflow_ports from {:?} to {:?}.", physical_switch.sflow_ports , new_physical_switch.sflow_ports ); physical_switch.sflow_ports = new_physical_switch.sflow_ports.clone(); restart_agent = !first_run; @@ -2816,7 +2828,7 @@ impl ConfigHandler { if special_network.vhost_user.vhost_socket_path != new_special_network.vhost_user.vhost_socket_path { - info!("Update inputs.cbpf.special_network.vhost_user.vhost_socket_path from {:?} to {:?}.", + info!("Update inputs.cbpf.special_network.vhost_user.vhost_socket_path from {:?} to {:?}.", special_network.vhost_user.vhost_socket_path, new_special_network.vhost_user.vhost_socket_path); special_network.vhost_user.vhost_socket_path = new_special_network.vhost_user.vhost_socket_path.clone(); @@ -3081,7 +3093,7 @@ impl ConfigHandler { if preprocess.out_of_order_reassembly_cache_size != new_preprocess.out_of_order_reassembly_cache_size { - info!("Update inputs.ebpf.socket.preprocess.out_of_order_reassembly_cache_size from {:?} to {:?}.", + info!("Update inputs.ebpf.socket.preprocess.out_of_order_reassembly_cache_size from {:?} to {:?}.", preprocess.out_of_order_reassembly_cache_size, new_preprocess.out_of_order_reassembly_cache_size); preprocess.out_of_order_reassembly_cache_size = new_preprocess.out_of_order_reassembly_cache_size; @@ -3090,7 +3102,7 @@ impl ConfigHandler { if preprocess.out_of_order_reassembly_protocols != new_preprocess.out_of_order_reassembly_protocols { - info!("Update inputs.ebpf.socket.preprocess.out_of_order_reassembly_protocols from {:?} to {:?}.", + info!("Update inputs.ebpf.socket.preprocess.out_of_order_reassembly_protocols from {:?} to {:?}.", preprocess.out_of_order_reassembly_protocols, new_preprocess.out_of_order_reassembly_protocols); preprocess.out_of_order_reassembly_protocols = new_preprocess.out_of_order_reassembly_protocols.clone(); @@ -3099,7 +3111,7 @@ impl ConfigHandler { if preprocess.segmentation_reassembly_protocols != new_preprocess.segmentation_reassembly_protocols { - info!("Update inputs.ebpf.socket.preprocess.segmentation_reassembly_protocols from {:?} to {:?}.", + info!("Update inputs.ebpf.socket.preprocess.segmentation_reassembly_protocols from {:?} to {:?}.", preprocess.segmentation_reassembly_protocols, new_preprocess.segmentation_reassembly_protocols); preprocess.segmentation_reassembly_protocols = new_preprocess.segmentation_reassembly_protocols.clone(); @@ -3301,7 +3313,7 @@ impl ConfigHandler { if private_cloud.hypervisor_resource_enabled != new_private_cloud.hypervisor_resource_enabled { - info!("Update inputs.resources.private_cloud.hypervisor_resource_enabled from {:?} to {:?}.", + info!("Update inputs.resources.private_cloud.hypervisor_resource_enabled from {:?} to {:?}.", private_cloud.hypervisor_resource_enabled, new_private_cloud.hypervisor_resource_enabled); private_cloud.hypervisor_resource_enabled = new_private_cloud.hypervisor_resource_enabled; @@ -3335,13 +3347,13 @@ impl ConfigHandler { if pull_resource.only_kubernetes_pod_ip_in_local_cluster != new_pull_resource.only_kubernetes_pod_ip_in_local_cluster { - info!("Update inputs.resources.pull_resource_from_controller.only_kubernetes_pod_ip_in_local_cluster from {:?} to {:?}.", + info!("Update inputs.resources.pull_resource_from_controller.only_kubernetes_pod_ip_in_local_cluster from {:?} to {:?}.", pull_resource.only_kubernetes_pod_ip_in_local_cluster, new_pull_resource.only_kubernetes_pod_ip_in_local_cluster); pull_resource.only_kubernetes_pod_ip_in_local_cluster = new_pull_resource.only_kubernetes_pod_ip_in_local_cluster; } if pull_resource.domain_filter != new_pull_resource.domain_filter { - info!("Update inputs.resources.pull_resource_from_controller.domain_filter from {:?} to {:?}.", + info!("Update inputs.resources.pull_resource_from_controller.domain_filter from {:?} to {:?}.", pull_resource.domain_filter, new_pull_resource.domain_filter); pull_resource.domain_filter = new_pull_resource.domain_filter.clone(); } @@ -3475,7 +3487,7 @@ impl ConfigHandler { let relative_sys_load = &mut circuit_breakers.relative_sys_load; let new_relative_sys_load = &mut new_circuit_breakers.relative_sys_load; if relative_sys_load.recovery_threshold != new_relative_sys_load.recovery_threshold { - info!("Update global.circuit_breakers.relative_sys_load.recovery_threshold from {:?} to {:?}.", + info!("Update global.circuit_breakers.relative_sys_load.recovery_threshold from {:?} to {:?}.", relative_sys_load.recovery_threshold, new_relative_sys_load.recovery_threshold); relative_sys_load.recovery_threshold = new_relative_sys_load.recovery_threshold; } @@ -3487,14 +3499,14 @@ impl ConfigHandler { relative_sys_load.metric = new_relative_sys_load.metric; } if relative_sys_load.trigger_threshold != new_relative_sys_load.trigger_threshold { - info!("Update global.circuit_breakers.relative_sys_load.trigger_threshold from {:?} to {:?}.", + info!("Update global.circuit_breakers.relative_sys_load.trigger_threshold from {:?} to {:?}.", relative_sys_load.trigger_threshold, new_relative_sys_load.trigger_threshold); relative_sys_load.trigger_threshold = new_relative_sys_load.trigger_threshold; } let sys_memory_percentage = &mut circuit_breakers.sys_memory_percentage; let new_sys_memory_percentage = &mut new_circuit_breakers.sys_memory_percentage; if sys_memory_percentage.trigger_threshold != new_sys_memory_percentage.trigger_threshold { - info!("Update global.circuit_breakers.sys_memory_percentage.trigger_threshold from {:?} to {:?}.", + info!("Update global.circuit_breakers.sys_memory_percentage.trigger_threshold from {:?} to {:?}.", sys_memory_percentage.trigger_threshold, new_sys_memory_percentage.trigger_threshold); sys_memory_percentage.trigger_threshold = new_sys_memory_percentage.trigger_threshold; } @@ -3522,7 +3534,7 @@ impl ConfigHandler { if tx_throughput.throughput_monitoring_interval != new_tx_throughput.throughput_monitoring_interval { - info!("Update global.circuit_breakers.tx_throughput.throughput_monitoring_interval from {:?} to {:?}.", + info!("Update global.circuit_breakers.tx_throughput.throughput_monitoring_interval from {:?} to {:?}.", tx_throughput.throughput_monitoring_interval, new_tx_throughput.throughput_monitoring_interval); tx_throughput.throughput_monitoring_interval = new_tx_throughput.throughput_monitoring_interval; @@ -3980,7 +3992,7 @@ impl ConfigHandler { } if filters.inactive_server_port_aggregation != new_filters.inactive_server_port_aggregation { - info!("Update outputs.flow_metrics.filters.inactive_server_port_aggregation from {:?} to {:?}.", + info!("Update outputs.flow_metrics.filters.inactive_server_port_aggregation from {:?} to {:?}.", filters.inactive_server_port_aggregation, new_filters.inactive_server_port_aggregation); filters.inactive_server_port_aggregation = new_filters.inactive_server_port_aggregation; } @@ -4254,25 +4266,25 @@ impl ConfigHandler { let new_flow_generation = &mut new_conntrack.flow_generation; if flow_generation.cloud_traffic_ignore_mac != new_flow_generation.cloud_traffic_ignore_mac { - info!("Update processors.flow_log.conntrack.flow_generation.cloud_traffic_ignore_mac from {:?} to {:?}.", + info!("Update processors.flow_log.conntrack.flow_generation.cloud_traffic_ignore_mac from {:?} to {:?}.", flow_generation.cloud_traffic_ignore_mac, new_flow_generation.cloud_traffic_ignore_mac); flow_generation.cloud_traffic_ignore_mac = new_flow_generation.cloud_traffic_ignore_mac; restart_agent = !first_run; } if flow_generation.idc_traffic_ignore_vlan != new_flow_generation.idc_traffic_ignore_vlan { - info!("Update processors.flow_log.conntrack.flow_generation.idc_traffic_ignore_vlan from {:?} to {:?}.", + info!("Update processors.flow_log.conntrack.flow_generation.idc_traffic_ignore_vlan from {:?} to {:?}.", flow_generation.idc_traffic_ignore_vlan, new_flow_generation.idc_traffic_ignore_vlan); flow_generation.idc_traffic_ignore_vlan = new_flow_generation.idc_traffic_ignore_vlan; restart_agent = !first_run; } if flow_generation.ignore_l2_end != new_flow_generation.ignore_l2_end { - info!("Update processors.flow_log.conntrack.flow_generation.ignore_l2_end from {:?} to {:?}.", + info!("Update processors.flow_log.conntrack.flow_generation.ignore_l2_end from {:?} to {:?}.", flow_generation.ignore_l2_end, new_flow_generation.ignore_l2_end); flow_generation.ignore_l2_end = new_flow_generation.ignore_l2_end; restart_agent = !first_run; } if flow_generation.server_ports != new_flow_generation.server_ports { - info!("Update processors.flow_log.conntrack.flow_generation.server_ports from {:?} to {:?}.", + info!("Update processors.flow_log.conntrack.flow_generation.server_ports from {:?} to {:?}.", flow_generation.server_ports, new_flow_generation.server_ports); flow_generation.server_ports = new_flow_generation.server_ports.clone(); restart_agent = !first_run; @@ -4316,13 +4328,13 @@ impl ConfigHandler { let time_window = &mut flow_log.time_window; let new_time_window = &mut new_flow_log.time_window; if time_window.extra_tolerable_flow_delay != new_time_window.extra_tolerable_flow_delay { - info!("Update processors.flow_log.time_window.extra_tolerable_flow_delay from {:?} to {:?}.", + info!("Update processors.flow_log.time_window.extra_tolerable_flow_delay from {:?} to {:?}.", time_window.extra_tolerable_flow_delay, new_time_window.extra_tolerable_flow_delay); time_window.extra_tolerable_flow_delay = new_time_window.extra_tolerable_flow_delay; restart_agent = !first_run; } if time_window.max_tolerable_packet_delay != new_time_window.max_tolerable_packet_delay { - info!("Update processors.flow_log.time_window.max_tolerable_packet_delay from {:?} to {:?}.", + info!("Update processors.flow_log.time_window.max_tolerable_packet_delay from {:?} to {:?}.", time_window.max_tolerable_packet_delay, new_time_window.max_tolerable_packet_delay); time_window.max_tolerable_packet_delay = new_time_window.max_tolerable_packet_delay; restart_agent = !first_run; @@ -4379,7 +4391,7 @@ impl ConfigHandler { restart_agent = !first_run; } if tunning.quadruple_generator_queue_size != new_tunning.quadruple_generator_queue_size { - info!("Update processors.flow_log.tunning.quadruple_generator_queue_size from {:?} to {:?}.", + info!("Update processors.flow_log.tunning.quadruple_generator_queue_size from {:?} to {:?}.", tunning.quadruple_generator_queue_size, new_tunning.quadruple_generator_queue_size); tunning.quadruple_generator_queue_size = new_tunning.quadruple_generator_queue_size; restart_agent = !first_run; @@ -4390,25 +4402,25 @@ impl ConfigHandler { let app = &mut request_log.application_protocol_inference; let new_app = &mut new_request_log.application_protocol_inference; if app.enabled_protocols != new_app.enabled_protocols { - info!("Update processors.request_log.application_protocol_inference.enabled_protocols from {:?} to {:?}.", + info!("Update processors.request_log.application_protocol_inference.enabled_protocols from {:?} to {:?}.", app.enabled_protocols, new_app.enabled_protocols); app.enabled_protocols = new_app.enabled_protocols.clone(); restart_agent = !first_run; } if app.inference_max_retries != new_app.inference_max_retries { - info!("Update processors.request_log.application_protocol_inference.inference_max_retries from {:?} to {:?}.", + info!("Update processors.request_log.application_protocol_inference.inference_max_retries from {:?} to {:?}.", app.inference_max_retries, new_app.inference_max_retries); app.inference_max_retries = new_app.inference_max_retries; restart_agent = !first_run; } if app.inference_result_ttl != new_app.inference_result_ttl { - info!("Update processors.request_log.application_protocol_inference.inference_result_ttl from {:?} to {:?}.", + info!("Update processors.request_log.application_protocol_inference.inference_result_ttl from {:?} to {:?}.", app.inference_result_ttl, new_app.inference_result_ttl); app.inference_result_ttl = new_app.inference_result_ttl; restart_agent = !first_run; } if app.protocol_special_config != new_app.protocol_special_config { - info!("Update processors.request_log.application_protocol_inference.protocol_special_config from {:?} to {:?}.", + info!("Update processors.request_log.application_protocol_inference.protocol_special_config from {:?} to {:?}.", app.protocol_special_config, new_app.protocol_special_config); app.protocol_special_config = new_app.protocol_special_config; restart_agent = !first_run; @@ -4434,7 +4446,7 @@ impl ConfigHandler { if filters.unconcerned_dns_nxdomain_response_suffixes != new_filters.unconcerned_dns_nxdomain_response_suffixes { - info!("Update processors.request_log.filters.unconcerned_dns_nxdomain_response_suffixes from {:?} to {:?}.", + info!("Update processors.request_log.filters.unconcerned_dns_nxdomain_response_suffixes from {:?} to {:?}.", filters.unconcerned_dns_nxdomain_response_suffixes, new_filters.unconcerned_dns_nxdomain_response_suffixes); filters.unconcerned_dns_nxdomain_response_suffixes = new_filters .unconcerned_dns_nxdomain_response_suffixes @@ -4460,7 +4472,7 @@ impl ConfigHandler { restart_agent = !first_run; } if tag_extraction.obfuscate_protocols != new_tag_extraction.obfuscate_protocols { - info!("Update processors.request_log.tag_extraction.obfuscate_protocols from {:?} to {:?}.", + info!("Update processors.request_log.tag_extraction.obfuscate_protocols from {:?} to {:?}.", tag_extraction.obfuscate_protocols, new_tag_extraction.obfuscate_protocols); tag_extraction.obfuscate_protocols = new_tag_extraction.obfuscate_protocols.clone(); restart_agent = !first_run; @@ -4478,7 +4490,7 @@ impl ConfigHandler { if tunning.consistent_timestamp_in_l7_metrics != new_tunning.consistent_timestamp_in_l7_metrics { - info!("Update processors.request_log.tunning.consistent_timestamp_in_l7_metrics from {:?} to {:?}.", + info!("Update processors.request_log.tunning.consistent_timestamp_in_l7_metrics from {:?} to {:?}.", tunning.consistent_timestamp_in_l7_metrics, new_tunning.consistent_timestamp_in_l7_metrics); tunning.consistent_timestamp_in_l7_metrics = new_tunning.consistent_timestamp_in_l7_metrics; @@ -4492,7 +4504,7 @@ impl ConfigHandler { tunning.payload_truncation = new_tunning.payload_truncation; } if tunning.session_aggregate_slot_capacity != new_tunning.session_aggregate_slot_capacity { - info!("Update processors.request_log.tunning.session_aggregate_slot_capacity from {:?} to {:?}.", + info!("Update processors.request_log.tunning.session_aggregate_slot_capacity from {:?} to {:?}.", tunning.session_aggregate_slot_capacity, new_tunning.session_aggregate_slot_capacity); tunning.session_aggregate_slot_capacity = new_tunning.session_aggregate_slot_capacity; restart_agent = !first_run; @@ -4607,14 +4619,15 @@ impl ConfigHandler { let max_cpus = 1.max(system.cpus().len()) as u32; let max_millicpus = max_cpus * 1000; - if candidate_config.environment.max_memory != max_memory { + new_config.environment.max_memory = max_memory; + new_config.environment.max_millicpus = max_millicpus; + + if candidate_config.environment.max_memory != new_config.environment.max_memory { info!("memory set ulimit when capture_mode=analyzer"); - candidate_config.environment.max_memory = max_memory; } - if candidate_config.environment.max_millicpus != max_millicpus { + if candidate_config.environment.max_millicpus != new_config.environment.max_millicpus { info!("cpu set ulimit when capture_mode=analyzer"); - candidate_config.environment.max_millicpus = max_millicpus; } } diff --git a/agent/src/flow_generator/flow_map.rs b/agent/src/flow_generator/flow_map.rs index 3214100e70d..f12e438e5cd 100644 --- a/agent/src/flow_generator/flow_map.rs +++ b/agent/src/flow_generator/flow_map.rs @@ -334,7 +334,7 @@ impl FlowMap { None }, stats_collector, - capacity: config.capacity as usize, + capacity: config.flow_capacity() as usize, size: 0, } } @@ -2547,7 +2547,7 @@ pub fn _new_flow_map_and_receiver( flow_timeout: Option, ignore_idc_vlan: bool, ) -> (ModuleConfig, FlowMap, Receiver>>) { - let (_, mut policy_getter) = Policy::new(1, 0, 1 << 10, 1 << 14, false); + let (_, mut policy_getter) = Policy::new(1, 0, 1 << 10, 1 << 14, false, false); policy_getter.disable(); let queue_debugger = QueueDebugger::new(); let (output_queue_sender, output_queue_receiver, _) = diff --git a/agent/src/policy/first_path.rs b/agent/src/policy/first_path.rs index d5185ea1308..d26fe0645ca 100644 --- a/agent/src/policy/first_path.rs +++ b/agent/src/policy/first_path.rs @@ -248,6 +248,7 @@ pub struct FirstPath { fast: FastPath, fast_disable: bool, + memory_check_disable: bool, memory_limit: AtomicU64, } @@ -261,7 +262,13 @@ impl FirstPath { const POLICY_LIMIT: u64 = 500000; const MEMORY_LIMIT: u64 = 1 << 20; - pub fn new(queue_count: usize, level: usize, map_size: usize, fast_disable: bool) -> FirstPath { + pub fn new( + queue_count: usize, + level: usize, + map_size: usize, + fast_disable: bool, + memory_check_disable: bool, + ) -> FirstPath { FirstPath { group_ip_map: Some(AHashMap::new()), vector_4: Vector4::default(), @@ -281,6 +288,7 @@ impl FirstPath { fast: FastPath::new(queue_count, map_size), fast_disable, + memory_check_disable, memory_limit: AtomicU64::new(0), } } @@ -365,6 +373,10 @@ impl FirstPath { } fn memory_check(&self, size: u64) -> bool { + if self.memory_check_disable { + return true; + } + let Ok(current) = get_memory_rss() else { warn!("Cannot check policy memory: Get process memory failed."); return true; @@ -804,7 +816,7 @@ mod tests { } fn generate_table() -> PResult { - let mut first = FirstPath::new(1, 8, 1 << 16, false); + let mut first = FirstPath::new(1, 8, 1 << 16, false, false); let acl = Acl::new( 1, vec![10], diff --git a/agent/src/policy/policy.rs b/agent/src/policy/policy.rs index ae8e09ecf1f..7aad11cd42c 100644 --- a/agent/src/policy/policy.rs +++ b/agent/src/policy/policy.rs @@ -21,7 +21,7 @@ use std::sync::{ }; use ahash::AHashMap; -use log::{debug, warn}; +use log::{debug, info, warn}; use pnet::datalink; use public::enums::IpProtocol; @@ -108,10 +108,21 @@ impl Policy { map_size: usize, forward_capacity: usize, fast_disable: bool, + memory_check_disable: bool, ) -> (PolicySetter, PolicyGetter) { + if memory_check_disable { + info!("The policy module does not check the memory."); + } + let policy = Box::into_raw(Box::new(Policy { labeler: Labeler::default(), - table: FirstPath::new(queue_count, level, map_size, fast_disable), + table: FirstPath::new( + queue_count, + level, + map_size, + fast_disable, + memory_check_disable, + ), forward: Forward::new(queue_count, forward_capacity), nat: RwLock::new(vec![AHashMap::new(), AHashMap::new()]), first_hit: 0, @@ -738,7 +749,7 @@ mod test { #[test] fn test_policy_normal() { - let (mut setter, mut getter) = Policy::new(10, 0, 1024, 1024, false); + let (mut setter, mut getter) = Policy::new(10, 0, 1024, 1024, false, false); let interface: PlatformData = PlatformData { mac: 0x002233445566, ips: vec![IpSubnet { diff --git a/agent/src/trident.rs b/agent/src/trident.rs index 0cbd6256355..0948e98986c 100644 --- a/agent/src/trident.rs +++ b/agent/src/trident.rs @@ -2034,6 +2034,7 @@ impl AgentComponents { user_config.get_fast_path_map_size(candidate_config.dispatcher.max_memory), user_config.processors.packet.policy.forward_table_capacity, user_config.processors.packet.policy.fast_path_disabled, + candidate_config.capture_mode == PacketCaptureType::Analyzer, ); synchronizer.add_flow_acl_listener(Box::new(policy_setter)); policy_setter.set_memory_limit(max_memory);