Skip to content

Commit

Permalink
fix: the policy does not check memory in analyzer mode
Browse files Browse the repository at this point in the history
  • Loading branch information
yuanchaoa authored and sharang committed Nov 25, 2024
1 parent c735640 commit 8a39507
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 10 deletions.
2 changes: 1 addition & 1 deletion agent/benches/labeler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ fn bench_labeler(c: &mut Criterion) {

fn bench_policy(c: &mut Criterion) {
fn generate_table() -> FirstPath {
let mut first = FirstPath::new(1, 8, 1 << 16, false);
let mut first = FirstPath::new(1, 8, 1 << 16, false, false);
let acl = Acl::new(
1,
vec![10],
Expand Down
17 changes: 15 additions & 2 deletions agent/src/config/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@
use std::borrow::Cow;
use std::cmp::{max, min};
use std::collections::{HashMap, HashSet};
use std::fmt;
use std::hash::{Hash, Hasher};
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use std::{fmt, u32};

use arc_swap::{access::Map, ArcSwap};
use base64::{prelude::BASE64_STANDARD, Engine};
Expand All @@ -43,7 +43,7 @@ use sysinfo::SystemExt;
use sysinfo::{CpuRefreshKind, RefreshKind, System};
use tokio::runtime::Runtime;

use super::config::{ExtraLogFields, L7LogBlacklist, OracleParseConfig};
use super::config::{ExtraLogFields, FlowGeneratorConfig, L7LogBlacklist, OracleParseConfig};
#[cfg(any(target_os = "linux", target_os = "android"))]
use super::{
config::EbpfYamlConfig, OsProcRegexp, OS_PROC_REGEXP_MATCH_ACTION_ACCEPT,
Expand Down Expand Up @@ -428,6 +428,7 @@ impl PluginConfig {
pub struct FlowConfig {
pub vtap_id: u16,
pub trident_type: TridentType,
pub tap_mode: TapMode,
pub cloud_gateway_traffic: bool,
pub collector_enabled: bool,
pub l7_log_tap_types: [bool; 256],
Expand Down Expand Up @@ -480,6 +481,7 @@ impl From<&RuntimeConfig> for FlowConfig {
FlowConfig {
vtap_id: conf.vtap_id as u16,
trident_type: conf.trident_type,
tap_mode: conf.tap_mode,
cloud_gateway_traffic: conf.yaml_config.cloud_gateway_traffic,
collector_enabled: conf.collector_enabled,
l7_log_tap_types: {
Expand Down Expand Up @@ -596,6 +598,7 @@ impl fmt::Debug for FlowConfig {
.collect::<Vec<_>>(),
)
.field("capacity", &self.capacity)
.field("flow_capacity", &self.flow_capacity())
.field("hash_slots", &self.hash_slots)
.field("packet_delay", &self.packet_delay)
.field("flush_interval", &self.flush_interval)
Expand Down Expand Up @@ -628,6 +631,16 @@ impl fmt::Debug for FlowConfig {
}
}

impl FlowConfig {
pub fn flow_capacity(&self) -> u32 {
let default_capacity = FlowGeneratorConfig::default().capacity;
match self.tap_mode {
TapMode::Analyzer if self.capacity <= default_capacity => u32::MAX,
_ => self.capacity,
}
}
}

#[derive(Clone, PartialEq, Eq)]
struct TrieNode {
children: HashMap<char, Box<TrieNode>>,
Expand Down
4 changes: 2 additions & 2 deletions agent/src/flow_generator/flow_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ impl FlowMap {
None
},
stats_collector,
capacity: config.capacity as usize,
capacity: config.flow_capacity() as usize,
size: 0,
}
}
Expand Down Expand Up @@ -2478,7 +2478,7 @@ pub fn _new_flow_map_and_receiver(
flow_timeout: Option<FlowTimeout>,
ignore_idc_vlan: bool,
) -> (ModuleConfig, FlowMap, Receiver<Arc<BatchedBox<TaggedFlow>>>) {
let (_, mut policy_getter) = Policy::new(1, 0, 1 << 10, 1 << 14, false);
let (_, mut policy_getter) = Policy::new(1, 0, 1 << 10, 1 << 14, false, false);
policy_getter.disable();
let queue_debugger = QueueDebugger::new();
let (output_queue_sender, output_queue_receiver, _) =
Expand Down
16 changes: 14 additions & 2 deletions agent/src/policy/first_path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ pub struct FirstPath {
fast: FastPath,

fast_disable: bool,
memory_check_disable: bool,

memory_limit: AtomicU64,
}
Expand All @@ -261,7 +262,13 @@ impl FirstPath {
const POLICY_LIMIT: u64 = 500000;
const MEMORY_LIMIT: u64 = 1 << 20;

pub fn new(queue_count: usize, level: usize, map_size: usize, fast_disable: bool) -> FirstPath {
pub fn new(
queue_count: usize,
level: usize,
map_size: usize,
fast_disable: bool,
memory_check_disable: bool,
) -> FirstPath {
FirstPath {
group_ip_map: Some(AHashMap::new()),
vector_4: Vector4::default(),
Expand All @@ -281,6 +288,7 @@ impl FirstPath {

fast: FastPath::new(queue_count, map_size),
fast_disable,
memory_check_disable,
memory_limit: AtomicU64::new(0),
}
}
Expand Down Expand Up @@ -365,6 +373,10 @@ impl FirstPath {
}

fn memory_check(&self, size: u64) -> bool {
if self.memory_check_disable {
return true;
}

let Ok(current) = get_memory_rss() else {
warn!("Cannot check policy memory: Get process memory failed.");
return true;
Expand Down Expand Up @@ -804,7 +816,7 @@ mod tests {
}

fn generate_table() -> PResult<FirstPath> {
let mut first = FirstPath::new(1, 8, 1 << 16, false);
let mut first = FirstPath::new(1, 8, 1 << 16, false, false);
let acl = Acl::new(
1,
vec![10],
Expand Down
17 changes: 14 additions & 3 deletions agent/src/policy/policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::sync::{
};

use ahash::AHashMap;
use log::{debug, warn};
use log::{debug, info, warn};
use pnet::datalink;
use public::enums::IpProtocol;

Expand Down Expand Up @@ -109,10 +109,21 @@ impl Policy {
map_size: usize,
forward_capacity: usize,
fast_disable: bool,
memory_check_disable: bool,
) -> (PolicySetter, PolicyGetter) {
if memory_check_disable {
info!("The policy module does not check the memory.");
}

let policy = Box::into_raw(Box::new(Policy {
labeler: Labeler::default(),
table: FirstPath::new(queue_count, level, map_size, fast_disable),
table: FirstPath::new(
queue_count,
level,
map_size,
fast_disable,
memory_check_disable,
),
forward: Forward::new(queue_count, forward_capacity),
nat: RwLock::new(vec![AHashMap::new(), AHashMap::new()]),
first_hit: 0,
Expand Down Expand Up @@ -747,7 +758,7 @@ mod test {

#[test]
fn test_policy_normal() {
let (mut setter, mut getter) = Policy::new(10, 0, 1024, 1024, false);
let (mut setter, mut getter) = Policy::new(10, 0, 1024, 1024, false, false);
let interface: PlatformData = PlatformData {
mac: 0x002233445566,
ips: vec![IpSubnet {
Expand Down
1 change: 1 addition & 0 deletions agent/src/trident.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1866,6 +1866,7 @@ impl AgentComponents {
yaml_config.get_fast_path_map_size(candidate_config.dispatcher.max_memory),
yaml_config.forward_capacity,
yaml_config.fast_path_disabled,
candidate_config.tap_mode == TapMode::Analyzer,
);
synchronizer.add_flow_acl_listener(Box::new(policy_setter));
policy_setter.set_memory_limit(max_memory);
Expand Down

0 comments on commit 8a39507

Please sign in to comment.