Skip to content

Commit

Permalink
fix: the issue of duplicate metrics data
Browse files Browse the repository at this point in the history
  • Loading branch information
yuanchaoa committed Mar 6, 2025
1 parent ea4bff0 commit bc5470a
Showing 1 changed file with 28 additions and 79 deletions.
107 changes: 28 additions & 79 deletions agent/src/collector/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ impl Stash {
..Default::default()
};
let key = StashKey::new(&tagger, Ipv4Addr::UNSPECIFIED.into(), None, 0);
self.add(key, tagger, Meter::Usage(usage_meter), flow.close_type);
self.add(key, tagger, Meter::Usage(usage_meter));
}
let id_map = &acc_flow.id_maps[1];
for (&acl_gid, &ip_id) in id_map.iter() {
Expand All @@ -482,7 +482,7 @@ impl Stash {
..Default::default()
};
let key = StashKey::new(&tagger, Ipv4Addr::UNSPECIFIED.into(), None, 0);
self.add(key, tagger, Meter::Usage(usage_meter), flow.close_type);
self.add(key, tagger, Meter::Usage(usage_meter));
}
}

Expand Down Expand Up @@ -535,10 +535,10 @@ impl Stash {
None,
0,
0,
acc_flow.l7_protocol,
L7Protocol::Unknown,
self.context.agent_mode,
);
self.fill_single_l4_stats(tagger, flow_meter, acc_flow.flow.close_type);
self.fill_single_l4_stats(tagger, flow_meter);
}
let tagger = get_edge_tagger(
self.global_thread_id,
Expand All @@ -550,12 +550,12 @@ impl Stash {
None,
0,
0,
acc_flow.l7_protocol,
L7Protocol::Unknown,
self.context.agent_mode,
);
// edge_stats: If the direction of a certain end is known, the statistical data
// will be recorded with the direction (corresponding tap-side), up to two times
self.fill_edge_l4_stats(tagger, acc_flow.flow_meter, acc_flow.flow.close_type);
self.fill_edge_l4_stats(tagger, acc_flow.flow_meter);
}
// edge_stats: If both ends of direction are None, record the
// statistical data with direction=0 (corresponding tap-side=rest)
Expand All @@ -576,19 +576,14 @@ impl Stash {
None,
0,
0,
acc_flow.l7_protocol,
L7Protocol::Unknown,
self.context.agent_mode,
);
self.fill_edge_l4_stats(tagger, acc_flow.flow_meter, acc_flow.flow.close_type);
self.fill_edge_l4_stats(tagger, acc_flow.flow_meter);
}
}

fn fill_single_l4_stats(
&mut self,
tagger: Tagger,
flow_meter: FlowMeter,
close_type: CloseType,
) {
fn fill_single_l4_stats(&mut self, tagger: Tagger, flow_meter: FlowMeter) {
// We collect the single-ended metrics data from Packet, XFlow, EBPF, Otel to the table (vtap_app_port).
// In the case of signal_source grouping, the single_stats data is not duplicate.
// Only data whose direction is c|s|local|None has flow_meter.
Expand All @@ -598,18 +593,18 @@ impl Stash {
|| tagger.direction == Direction::None
{
let key = StashKey::new(&tagger, tagger.ip, None, 0);
self.add(key, tagger, Meter::Flow(flow_meter), close_type);
self.add(key, tagger, Meter::Flow(flow_meter));
}
}

fn fill_edge_l4_stats(&mut self, tagger: Tagger, flow_meter: FlowMeter, close_type: CloseType) {
fn fill_edge_l4_stats(&mut self, tagger: Tagger, flow_meter: FlowMeter) {
// network metrics (vtap_flow_edge_port)
// Packet data and XFlow data have L4 info
if tagger.signal_source == SignalSource::Packet
|| tagger.signal_source == SignalSource::XFlow
{
let key = StashKey::new(&tagger, tagger.ip, Some(tagger.ip1), 0);
self.add(key, tagger, Meter::Flow(flow_meter), close_type);
self.add(key, tagger, Meter::Flow(flow_meter));
}
}

Expand Down Expand Up @@ -729,12 +724,7 @@ impl Stash {
self.context.agent_mode,
);
tagger.code |= Code::L7_PROTOCOL;
self.fill_single_l7_stats(
tagger,
meter.endpoint_hash,
meter.app_meter,
flow.close_type,
);
self.fill_single_l7_stats(tagger, meter.endpoint_hash, meter.app_meter);
}
let mut tagger = get_edge_tagger(
self.global_thread_id,
Expand All @@ -752,12 +742,7 @@ impl Stash {
tagger.code |= Code::L7_PROTOCOL;
// edge_stats: If the direction of a certain end is known, the statistical data
// will be recorded with the direction (corresponding tap-side), up to two times
self.fill_edge_l7_stats(
tagger,
meter.endpoint_hash,
meter.app_meter,
flow.close_type,
);
self.fill_edge_l7_stats(tagger, meter.endpoint_hash, meter.app_meter);
}
// edge_stats: If both ends of direction are None, record the
// statistical data with direction=0 (corresponding tap-side=rest)
Expand All @@ -782,22 +767,11 @@ impl Stash {
self.context.agent_mode,
);
tagger.code |= Code::L7_PROTOCOL;
self.fill_edge_l7_stats(
tagger,
meter.endpoint_hash,
meter.app_meter,
flow.close_type,
);
self.fill_edge_l7_stats(tagger, meter.endpoint_hash, meter.app_meter);
}
}

fn fill_single_l7_stats(
&mut self,
tagger: Tagger,
endpoint_hash: u32,
app_meter: AppMeter,
close_type: CloseType,
) {
fn fill_single_l7_stats(&mut self, tagger: Tagger, endpoint_hash: u32, app_meter: AppMeter) {
// The l7_protocol of otel data may not be available, so report all otel data metrics.
if tagger.l7_protocol != L7Protocol::Unknown || tagger.signal_source == SignalSource::OTel {
// Only data whose direction is c|s|local|c-p|s-p|c-app|s-app|app has app_meter.
Expand All @@ -809,23 +783,17 @@ impl Stash {
|| tagger.signal_source != SignalSource::Packet
{
let key = StashKey::new(&tagger, tagger.ip, None, endpoint_hash);
self.add(key, tagger, Meter::App(app_meter), close_type);
self.add(key, tagger, Meter::App(app_meter));
}
}
}

fn fill_edge_l7_stats(
&mut self,
tagger: Tagger,
endpoint_hash: u32,
app_meter: AppMeter,
close_type: CloseType,
) {
fn fill_edge_l7_stats(&mut self, tagger: Tagger, endpoint_hash: u32, app_meter: AppMeter) {
// The l7_protocol of otel data may not be available, so report all otel data metrics.
// application metrics (vtap_app_edge_port)
if tagger.l7_protocol != L7Protocol::Unknown || tagger.signal_source == SignalSource::OTel {
let key = StashKey::new(&tagger, tagger.ip, Some(tagger.ip1), endpoint_hash);
self.add(key, tagger, Meter::App(app_meter), close_type);
self.add(key, tagger, Meter::App(app_meter));
}
}

Expand All @@ -839,35 +807,16 @@ impl Stash {
}
}

fn add(&mut self, key: StashKey, tagger: Tagger, meter: Meter, close_type: CloseType) {
if close_type != CloseType::Unknown && close_type != CloseType::ForcedReport {
match self.inner.entry(key) {
Entry::Occupied(o) => {
let mut doc = o.remove();
doc.meter.sequential_merge(&meter);
doc.timestamp = self.start_time.as_secs() as u32;
doc.flags |= self.doc_flag;
self.push_closed_doc(BoxedDocument(Box::new(doc)));
}
Entry::Vacant(_) => {
let mut doc = Document::new(meter);
doc.tagger = tagger;
doc.timestamp = self.start_time.as_secs() as u32;
doc.flags |= self.doc_flag;
self.push_closed_doc(BoxedDocument(Box::new(doc)));
}
fn add(&mut self, key: StashKey, tagger: Tagger, meter: Meter) {
match self.inner.entry(key) {
Entry::Occupied(mut o) => {
let doc = o.get_mut();
doc.meter.sequential_merge(&meter);
}
} else {
match self.inner.entry(key) {
Entry::Occupied(mut o) => {
let doc = o.get_mut();
doc.meter.sequential_merge(&meter);
}
Entry::Vacant(o) => {
let mut doc = Document::new(meter);
doc.tagger = tagger;
o.insert(doc);
}
Entry::Vacant(o) => {
let mut doc = Document::new(meter);
doc.tagger = tagger;
o.insert(doc);
}
}
}
Expand Down

0 comments on commit bc5470a

Please sign in to comment.