From de604c00dc920b60b0d5a4fa2c60bdd871e7218d Mon Sep 17 00:00:00 2001 From: Anthony Virtuoso Date: Mon, 18 Mar 2024 08:59:29 -0400 Subject: [PATCH] Adding an example of lock-free telemetry aggregation between kernel space (eBPF) and user space. This example makes use of PerCpuHashMaps, shows deeper packet inspection via tc context, and loads multiple eBPF programs enabling (ingress and egress) from a single user space application. --- examples/tc-byte-count/.cargo/config.toml | 2 + examples/tc-byte-count/.gitignore | 13 ++ examples/tc-byte-count/.vim/coc-settings.json | 3 + examples/tc-byte-count/.vscode/settings.json | 3 + examples/tc-byte-count/Cargo.toml | 3 + examples/tc-byte-count/README.md | 43 ++++ .../tc-byte-count-common/Cargo.toml | 14 ++ .../tc-byte-count-common/src/lib.rs | 1 + .../tc-byte-count-ebpf/.cargo/config.toml | 6 + .../tc-byte-count-ebpf/.vim/coc-settings.json | 4 + .../tc-byte-count-ebpf/.vscode/settings.json | 4 + .../tc-byte-count-ebpf/Cargo.toml | 34 +++ .../tc-byte-count-ebpf/rust-toolchain.toml | 2 + .../tc-byte-count-ebpf/src/main.rs | 184 ++++++++++++++++ .../tc-byte-count/tc-byte-count/Cargo.toml | 30 +++ .../tc-byte-count/tc-byte-count/src/lib.rs | 124 +++++++++++ .../tc-byte-count/tc-byte-count/src/main.rs | 199 ++++++++++++++++++ examples/tc-byte-count/xtask/Cargo.toml | 9 + .../tc-byte-count/xtask/src/build_ebpf.rs | 67 ++++++ examples/tc-byte-count/xtask/src/main.rs | 33 +++ examples/tc-byte-count/xtask/src/run.rs | 71 +++++++ 21 files changed, 849 insertions(+) create mode 100644 examples/tc-byte-count/.cargo/config.toml create mode 100644 examples/tc-byte-count/.gitignore create mode 100644 examples/tc-byte-count/.vim/coc-settings.json create mode 100644 examples/tc-byte-count/.vscode/settings.json create mode 100644 examples/tc-byte-count/Cargo.toml create mode 100644 examples/tc-byte-count/README.md create mode 100644 examples/tc-byte-count/tc-byte-count-common/Cargo.toml create mode 100644 
examples/tc-byte-count/tc-byte-count-common/src/lib.rs create mode 100644 examples/tc-byte-count/tc-byte-count-ebpf/.cargo/config.toml create mode 100644 examples/tc-byte-count/tc-byte-count-ebpf/.vim/coc-settings.json create mode 100644 examples/tc-byte-count/tc-byte-count-ebpf/.vscode/settings.json create mode 100644 examples/tc-byte-count/tc-byte-count-ebpf/Cargo.toml create mode 100644 examples/tc-byte-count/tc-byte-count-ebpf/rust-toolchain.toml create mode 100644 examples/tc-byte-count/tc-byte-count-ebpf/src/main.rs create mode 100644 examples/tc-byte-count/tc-byte-count/Cargo.toml create mode 100644 examples/tc-byte-count/tc-byte-count/src/lib.rs create mode 100644 examples/tc-byte-count/tc-byte-count/src/main.rs create mode 100644 examples/tc-byte-count/xtask/Cargo.toml create mode 100644 examples/tc-byte-count/xtask/src/build_ebpf.rs create mode 100644 examples/tc-byte-count/xtask/src/main.rs create mode 100644 examples/tc-byte-count/xtask/src/run.rs diff --git a/examples/tc-byte-count/.cargo/config.toml b/examples/tc-byte-count/.cargo/config.toml new file mode 100644 index 00000000..f0ccbc9a --- /dev/null +++ b/examples/tc-byte-count/.cargo/config.toml @@ -0,0 +1,2 @@ +[alias] +xtask = "run --package xtask --" \ No newline at end of file diff --git a/examples/tc-byte-count/.gitignore b/examples/tc-byte-count/.gitignore new file mode 100644 index 00000000..54f741e1 --- /dev/null +++ b/examples/tc-byte-count/.gitignore @@ -0,0 +1,13 @@ +### https://raw.github.com/github/gitignore/master/Rust.gitignore + +# Generated by Cargo +# will have compiled files and executables +debug/ +target/ + +# Remove Cargo.lock from gitignore if creating an executable, leave it for libraries +# More information here https://doc.rust-lang.org/cargo/guide/cargo-toml-vs-cargo-lock.html +Cargo.lock + +# These are backup files generated by rustfmt +**/*.rs.bk diff --git a/examples/tc-byte-count/.vim/coc-settings.json b/examples/tc-byte-count/.vim/coc-settings.json new file mode 
100644 index 00000000..bfe7d44f --- /dev/null +++ b/examples/tc-byte-count/.vim/coc-settings.json @@ -0,0 +1,3 @@ +{ + "rust-analyzer.linkedProjects": ["Cargo.toml", "tc-byte-count-ebpf/Cargo.toml"] +} diff --git a/examples/tc-byte-count/.vscode/settings.json b/examples/tc-byte-count/.vscode/settings.json new file mode 100644 index 00000000..bfe7d44f --- /dev/null +++ b/examples/tc-byte-count/.vscode/settings.json @@ -0,0 +1,3 @@ +{ + "rust-analyzer.linkedProjects": ["Cargo.toml", "tc-byte-count-ebpf/Cargo.toml"] +} diff --git a/examples/tc-byte-count/Cargo.toml b/examples/tc-byte-count/Cargo.toml new file mode 100644 index 00000000..8b00b142 --- /dev/null +++ b/examples/tc-byte-count/Cargo.toml @@ -0,0 +1,3 @@ +[workspace] +members = ["tc-byte-count", "tc-byte-count-common", "xtask"] +resolver="2" \ No newline at end of file diff --git a/examples/tc-byte-count/README.md b/examples/tc-byte-count/README.md new file mode 100644 index 00000000..ab89ef1f --- /dev/null +++ b/examples/tc-byte-count/README.md @@ -0,0 +1,43 @@ +# tc-byte-count + +This application is intended to demonstrate how to use several important BPF primitives via aya, including: + +1. Basic eBPF program attachment for Traffic Control (TC). +1. How to instrument ingress and egress traffic. +1. Lock-free telemetry aggregation between kernel (eBPF) and userspace programs. + +The example program itself uses a realistic scenario where the author wishes to emit metrics about ingress and +egress network traffic data rates grouped by remote port. The data is gathered using eBPF programs attached to +the tc_egress and tc_ingress instrumentation points in the kernel. Each packet is inspected and then the byte +count of the packet is incremented for the packet's remote port in a "per-cpu" map. These "per-cpu" maps allow +us to avoid using locks for our get-increment-put operation on the map. 
The userspace program then periodically +aggregates the map data, summing the per-cpu values in order to produce a set of percentiles that are logged +every second. This is precisely the kind of data one might want when sizing or monitoring a bursty network +application that is also sensitive to data loss. + +## Prerequisites + +1. Install a rust stable toolchain: `rustup install stable` +1. Install a rust nightly toolchain: `rustup install nightly` +1. Install bpf-linker: `cargo install bpf-linker` + +## Build eBPF + +```bash +cargo xtask build-ebpf +``` + +To perform a release build you can use the `--release` flag. +You may also change the target architecture with the `--target` flag. + +## Build Userspace + +```bash +cargo build +``` + +## Run + +```bash +cargo xtask run +``` diff --git a/examples/tc-byte-count/tc-byte-count-common/Cargo.toml b/examples/tc-byte-count/tc-byte-count-common/Cargo.toml new file mode 100644 index 00000000..eb80de8c --- /dev/null +++ b/examples/tc-byte-count/tc-byte-count-common/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "tc-byte-count-common" +version = "0.1.0" +edition = "2021" + +[features] +default = [] +user = ["aya"] + +[dependencies] +aya = { git = "https://github.com/aya-rs/aya", optional = true } + +[lib] +path = "src/lib.rs" diff --git a/examples/tc-byte-count/tc-byte-count-common/src/lib.rs b/examples/tc-byte-count/tc-byte-count-common/src/lib.rs new file mode 100644 index 00000000..0c9ac1ac --- /dev/null +++ b/examples/tc-byte-count/tc-byte-count-common/src/lib.rs @@ -0,0 +1 @@ +#![no_std] diff --git a/examples/tc-byte-count/tc-byte-count-ebpf/.cargo/config.toml b/examples/tc-byte-count/tc-byte-count-ebpf/.cargo/config.toml new file mode 100644 index 00000000..5d7e5915 --- /dev/null +++ b/examples/tc-byte-count/tc-byte-count-ebpf/.cargo/config.toml @@ -0,0 +1,6 @@ +[build] +target-dir = "../target" +target = "bpfel-unknown-none" + +[unstable] +build-std = ["core"] \ No newline at end of file diff --git 
a/examples/tc-byte-count/tc-byte-count-ebpf/.vim/coc-settings.json b/examples/tc-byte-count/tc-byte-count-ebpf/.vim/coc-settings.json new file mode 100644 index 00000000..e2211a64 --- /dev/null +++ b/examples/tc-byte-count/tc-byte-count-ebpf/.vim/coc-settings.json @@ -0,0 +1,4 @@ +{ + "rust-analyzer.cargo.target": "bpfel-unknown-none", + "rust-analyzer.checkOnSave.allTargets": false +} diff --git a/examples/tc-byte-count/tc-byte-count-ebpf/.vscode/settings.json b/examples/tc-byte-count/tc-byte-count-ebpf/.vscode/settings.json new file mode 100644 index 00000000..e2211a64 --- /dev/null +++ b/examples/tc-byte-count/tc-byte-count-ebpf/.vscode/settings.json @@ -0,0 +1,4 @@ +{ + "rust-analyzer.cargo.target": "bpfel-unknown-none", + "rust-analyzer.checkOnSave.allTargets": false +} diff --git a/examples/tc-byte-count/tc-byte-count-ebpf/Cargo.toml b/examples/tc-byte-count/tc-byte-count-ebpf/Cargo.toml new file mode 100644 index 00000000..fad62dda --- /dev/null +++ b/examples/tc-byte-count/tc-byte-count-ebpf/Cargo.toml @@ -0,0 +1,34 @@ +[package] +name = "tc-byte-count-ebpf" +version = "0.1.0" +edition = "2021" + +[dependencies] +aya-ebpf = { git = "https://github.com/aya-rs/aya" } +aya-log-ebpf = { git = "https://github.com/aya-rs/aya" } +tc-byte-count-common = { path = "../tc-byte-count-common" } +memoffset = "0.8" +network-types = "0.0.4" + +[[bin]] +name = "tc-byte-count" +path = "src/main.rs" + +[profile.dev] +opt-level = 3 +debug = false +debug-assertions = false +overflow-checks = false +lto = true +panic = "abort" +incremental = false +codegen-units = 1 +rpath = false + +[profile.release] +lto = true +panic = "abort" +codegen-units = 1 + +[workspace] +members = [] diff --git a/examples/tc-byte-count/tc-byte-count-ebpf/rust-toolchain.toml b/examples/tc-byte-count/tc-byte-count-ebpf/rust-toolchain.toml new file mode 100644 index 00000000..5d56faf9 --- /dev/null +++ b/examples/tc-byte-count/tc-byte-count-ebpf/rust-toolchain.toml @@ -0,0 +1,2 @@ +[toolchain] +channel = 
"nightly" diff --git a/examples/tc-byte-count/tc-byte-count-ebpf/src/main.rs b/examples/tc-byte-count/tc-byte-count-ebpf/src/main.rs new file mode 100644 index 00000000..fa80fd45 --- /dev/null +++ b/examples/tc-byte-count/tc-byte-count-ebpf/src/main.rs @@ -0,0 +1,184 @@ +#![no_std] +#![no_main] + +use core::ops::Add; + +use aya_ebpf::{ + bindings::{BPF_ANY, TC_ACT_PIPE}, + macros::{classifier, map}, + maps::LruPerCpuHashMap, + programs::TcContext, +}; + +use network_types::{ + eth::{EthHdr, EtherType}, + ip::{IpProto, Ipv4Hdr}, + tcp::TcpHdr, +}; + +/// An LRU map that will hold information about bytes transmited. +/// The map is keyed on remote port. Values in this map are per-cpu, +/// meaning no locking is required to ensure consistent updates. +/// Values are evicted only when space is needed in the map. +#[map] +static EGRESS: LruPerCpuHashMap = LruPerCpuHashMap::with_max_entries(1024, 0); + +/// An LRU map that will hold information about bytes received. +/// The map is keyed on remote port. Values in this map are per-cpu, +/// meaning no locking is required to ensure consistent updates. +/// Values are evicted only when space is needed in the map. +#[map] +static INGRESS: LruPerCpuHashMap = LruPerCpuHashMap::with_max_entries(1024, 0); + +/// Entry point for our TrafficControl "EGRESS" eBPF attachment point. +#[classifier] +pub fn tc_egress(ctx: TcContext) -> i32 { + let _res = try_tc_egress(ctx); + + //Always allow the packet to continue to its intended destination + TC_ACT_PIPE +} + +/// Entry point for our TrafficControl "INGRESS" eBPF attachment point. +#[classifier] +pub fn tc_ingress(ctx: TcContext) -> i32 { + let _res = try_tc_ingress(ctx); + + //Always allow the packet to continue to its intended destination + TC_ACT_PIPE +} + +/// In order to reduce the total memory and cpu resource expended in +/// producing this telemetry, we collapse remote port ranges that are unlikely +/// to be of interest. 
For example, interesting ports include: +/// mysql: 3306, sqlserver: 1433, postgres: 5432, oracle: 1521, ephemeral: 32768 - 65535 +#[inline(always)] +fn map_port(port: u16) -> u16 { + if (32768..=65535).contains(&port) { + //ephemeral range, we can collapse these entirely + 0 + } else { + // anything else, let's track it specifically + port + } +} + +/// Handles accounting of EGRESS packets, storing telemetry by map_port(REMOTE port) +/// There is some duplicate code between this method and try_tc_ingress. This was intentional +/// as the effort to de-duplicate wasn't worth it given it's only a handful of duplicate lines +#[inline(always)] +fn try_tc_egress(ctx: TcContext) -> Result<(), ()> { + let ethhdr: EthHdr = ctx.load(0).map_err(|_| ())?; + + // We are using match instead of simple if because the eBPF verifier doesn't seem to like ether_type used + // directly in an if without a stack variable copy. I think this is because EthHdr is a c-style "packed" + // struct that is memory aligned. Still Day-1 for rust and eBPF I guess + match ethhdr.ether_type { + EtherType::Ipv4 => { + // Since IPv6 adoption is low, we make a simplifying assumption that we can monitor only IPv4 + } + _ => return Ok(()), + } + + //Grab the IP Header so we can read the protocol and size of packet. + let ipv4hdr: Ipv4Hdr = ctx.load(EthHdr::LEN).map_err(|_| ())?; + if ipv4hdr.proto != IpProto::Tcp { + return Ok(()); + } + + //Calculate the offset of the TCP Header. The vast majority of the time + //the IP header is a fixed 20 bytes but.. it's possible to occasionally + //have IP Options set that change the header size. So this handles that to + //avoid blowing out the telemetry by reading random data instead of real ports + let offset = if ipv4hdr.ihl() != 5 { + EthHdr::LEN + (ipv4hdr.ihl() * 4) as usize + } else { + EthHdr::LEN + Ipv4Hdr::LEN + }; + + //Grab the TCP Header so we can read the remote port. 
+ let tcphdr: TcpHdr = ctx.load(offset).map_err(|_| ())?; + let dst_port = map_port(u16::from_be(tcphdr.dest)); + let len = u16::from_be(ipv4hdr.tot_len); + + //Grab an existing value for this port (if present) and add the size of this packet. + //No locking is needed since this is a per-cpu map. + let val = unsafe { + match EGRESS.get(&dst_port) { + Some(val) => (len as u64).add(val), + None => len as u64, + } + }; + + //Update the map with the new value. No locking is needed since this is a per-cpu map. + let _res = EGRESS.insert(&dst_port, &val, BPF_ANY.into()); + + Ok(()) +} + +/// Handles accounting of INGRESS packets, storing telemetry by map_port(REMOTE port) +/// There is some duplicate code between this method and try_tc_egress. This was intentional +/// as the effort to de-duplicate wasn't worth it given it's only a handful of duplicate lines +#[inline(always)] +fn try_tc_ingress(ctx: TcContext) -> Result<(), ()> { + let ethhdr: EthHdr = ctx.load(0).map_err(|_| ())?; + + // We are using match instead of simple if because the eBPF verifier doesn't seem to like ether_type used + // directly in an if without a stack variable copy. I think this is because EthHdr is a c-style "packed" + // struct that is memory aligned. Still Day-1 for rust and eBPF I guess + match ethhdr.ether_type { + EtherType::Ipv4 => { + // Since IPv6 adoption is low, we make a simplifying assumption that we can monitor only IPv4 + } + _ => return Ok(()), + } + + //Grab the IP Header so we can read the protocol and size of packet. + let ipv4hdr: Ipv4Hdr = ctx.load(EthHdr::LEN).map_err(|_| ())?; + if ipv4hdr.proto != IpProto::Tcp { + return Ok(()); + } + + match ipv4hdr.proto { + IpProto::Tcp => {} + _ => return Ok(()), + } + + //Calculate the offset of the TCP Header. The vast majority of the time + //the IP header is a fixed 20 bytes but.. it's possible to occasionally + //have IP Options set that change the header size. 
So this handles that to + //avoid blowing out the telemetry by reading random data instead of real ports + let offset = if ipv4hdr.ihl() != 5 { + EthHdr::LEN + (ipv4hdr.ihl() * 4) as usize + } else { + EthHdr::LEN + Ipv4Hdr::LEN + }; + + //Grab the TCP Header so we can read the remote port. + let tcphdr: TcpHdr = ctx.load(offset).map_err(|_| ())?; + let src_port = map_port(u16::from_be(tcphdr.source)); + + //Grab the size of the packet (excluding the ethernet header, slightly inaccurate but its a rounding error) + let len = u16::from_be(ipv4hdr.tot_len); + + //Grab an existing value for this port (if present) and add the size of this packet. + //No locking is needed since this is a per-cpu map. + let val = unsafe { + match INGRESS.get(&src_port) { + Some(val) => (len as u64).add(val), + None => len as u64, + } + }; + + //Update the map with the new value. No locking is needed since this is a per-cpu map. + let _res = INGRESS.insert(&src_port, &val, BPF_ANY.into()); + + Ok(()) +} + +/// This is never used, its something that is required to satisfy the eBPF verifier since AYA and Rust support +/// are still pretty new. +#[panic_handler] +fn panic(_info: &core::panic::PanicInfo) -> ! 
{ + unsafe { core::hint::unreachable_unchecked() } +} diff --git a/examples/tc-byte-count/tc-byte-count/Cargo.toml b/examples/tc-byte-count/tc-byte-count/Cargo.toml new file mode 100644 index 00000000..dafde5b0 --- /dev/null +++ b/examples/tc-byte-count/tc-byte-count/Cargo.toml @@ -0,0 +1,30 @@ +[package] +name = "tc-byte-count" +version = "0.1.0" +edition = "2021" +publish = false + +[dependencies] +aya = { git = "https://github.com/aya-rs/aya", features = ["async_tokio"] } +aya-log = { git = "https://github.com/aya-rs/aya" } +tc-byte-count-common = { path = "../tc-byte-count-common", features = ["user"] } +anyhow = "1.0" +clap = { version = "4.1", features = ["derive"] } +log = "0.4" +tokio = { version = "1.25", features = [ + "macros", + "rt", + "rt-multi-thread", + "net", + "signal", + "time" +] } +bytes = "1" +env_logger = "0.11" +lru = "0.12" +stats-cli = "3.0" + + +[[bin]] +name = "tc-byte-count" +path = "src/main.rs" diff --git a/examples/tc-byte-count/tc-byte-count/src/lib.rs b/examples/tc-byte-count/tc-byte-count/src/lib.rs new file mode 100644 index 00000000..514cf275 --- /dev/null +++ b/examples/tc-byte-count/tc-byte-count/src/lib.rs @@ -0,0 +1,124 @@ +use anyhow::{bail, Result}; +use inc_stats::Percentiles; +use std::time::Instant; + +/// Structure used to hold all aggregated values. 
+pub struct FlowStat { + time: Instant, + port: u16, + rx: Percentiles, + rx_total: u128, + rx_counters: Vec, + tx: Percentiles, + tx_total: u128, + tx_counters: Vec, +} + +impl FlowStat { + pub fn new(port: u16, num_cpus: usize) -> Self { + Self { + time: Instant::now(), + port, + rx: Percentiles::new(), + rx_total: 0, + rx_counters: vec![0; num_cpus], + tx: Percentiles::new(), + tx_total: 0, + tx_counters: vec![0; num_cpus], + } + } + + pub fn add_rx(&mut self, rx: u64) { + self.rx_total += rx as u128; + self.rx.add(&(rx as f64)); + } + + pub fn add_tx(&mut self, tx: u64) { + self.tx_total += tx as u128; + self.tx.add(&(tx as f64)); + } + + pub fn aggregate_rx(&mut self, index: usize, rx_new: u64) -> u64 { + let rx_prev = self.rx_counters[index]; + let delta = if rx_prev > rx_new { + rx_new + } else { + rx_new - rx_prev + }; + self.rx_counters[index] = rx_new; + delta + } + + pub fn aggregate_tx(&mut self, index: usize, tx_new: u64) -> u64 { + let tx_prev = self.tx_counters[index]; + let delta = if tx_prev > tx_new { + tx_new + } else { + tx_new - tx_prev + }; + self.tx_counters[index] = tx_new; + delta + } + + pub fn has_traffic(&self) -> bool { + self.rx_total > 0 || self.tx_total > 0 + } + + pub fn reset(&mut self) { + self.time = Instant::now(); + self.rx_total = 0; + self.rx = Percentiles::new(); + self.tx_total = 0; + self.tx = Percentiles::new(); + } + + /// Produce a summary of the FlowStat since the last summary. The Percentiles are managed here. 
+ pub fn summary(&mut self, reset: bool) -> Result { + let (rx_p100, rx_p90, rx_p75, rx_p50) = match self.rx.percentiles(&[1.0, 0.90, 0.75, 0.50]) + { + Ok(Some(result)) => (result[0], result[1], result[2], result[3]), + _ => bail!("Error generating percentiles"), + }; + // .ok_or_else(|| anyhow!("Unexpected error with percentiles"))??; + let (tx_p100, tx_p90, tx_p75, tx_p50) = match self.tx.percentiles(&[1.0, 0.90, 0.75, 0.50]) + { + Ok(Some(result)) => (result[0], result[1], result[2], result[3]), + _ => bail!("Error generating percentiles"), + }; + let sample = FlowStatSummary { + start_time: self.time, + port: self.port, + rx_p50, + rx_p75, + rx_p90, + rx_p100, + rx_total: self.rx_total, + tx_p50, + tx_p75, + tx_p90, + tx_p100, + tx_total: self.tx_total, + }; + + if reset { + self.reset(); + } + + Ok(sample) + } +} + +pub struct FlowStatSummary { + pub start_time: Instant, + pub port: u16, + pub rx_p50: f64, + pub rx_p75: f64, + pub rx_p90: f64, + pub rx_p100: f64, + pub rx_total: u128, + pub tx_p50: f64, + pub tx_p75: f64, + pub tx_p90: f64, + pub tx_p100: f64, + pub tx_total: u128, +} diff --git a/examples/tc-byte-count/tc-byte-count/src/main.rs b/examples/tc-byte-count/tc-byte-count/src/main.rs new file mode 100644 index 00000000..7375d837 --- /dev/null +++ b/examples/tc-byte-count/tc-byte-count/src/main.rs @@ -0,0 +1,199 @@ +use anyhow::Result; +use aya::{ + include_bytes_aligned, + maps::{MapData, PerCpuHashMap, PerCpuValues}, + programs::{tc, SchedClassifier, TcAttachType}, + Ebpf, +}; +use aya_log::EbpfLogger; +use clap::Parser; +use log::{error, info, warn}; +use lru::LruCache; +use std::{num::NonZeroUsize, time::Instant}; +use tc_byte_count::FlowStat; +use tokio::time::{sleep, Duration}; + +#[derive(Debug, Parser)] +struct Opt { + #[clap(short, long, default_value = "eth0")] + iface: String, +} + +#[tokio::main] +async fn main() -> Result<()> { + let opt = Opt::parse(); + + env_logger::init(); + info!("Starting..."); + + // This will include your eBPF 
object file as raw bytes at compile-time and load it at + // runtime. This approach is recommended for most real-world use cases. If you would + // like to specify the eBPF program at runtime rather than at compile-time, you can + // reach for `Ebpf::load_file` instead. + #[cfg(debug_assertions)] + let mut bpf = Ebpf::load(include_bytes_aligned!( + "../../target/bpfel-unknown-none/debug/tc-byte-count" + ))?; + #[cfg(not(debug_assertions))] + let mut bpf = Ebpf::load(include_bytes_aligned!( + "../../target/bpfel-unknown-none/release/tc-byte-count" + ))?; + + if let Err(e) = EbpfLogger::init(&mut bpf) { + // This can happen if you remove all log statements from your eBPF program. + warn!("failed to initialize eBPF logger: {}. This can be ignored if you have no log lines in your eBPF program.", e); + } + + info!("Preparing eBPF programs and attachements..."); + + // error adding clsact to the interface if it is already added is harmless + // the full cleanup can be done with 'sudo tc qdisc del dev eth0 clsact'. + let _ = tc::qdisc_add_clsact(&opt.iface); + + // Prepare, load, and attach the eBPF program with entry-point function "tc_egress" + // into the Kernel at the Egress attach point in the Traffic Control module. + let egress_prg: &mut SchedClassifier = + bpf.program_mut("tc_egress").unwrap().try_into()?; + egress_prg.load()?; + egress_prg.attach(&opt.iface, TcAttachType::Egress)?; + + // Prepare, load, and attach the eBPF program with entry-point function "tc_ingress" + // into the Kernel at the Ingress attach point in the Traffic Control module. + let ingress_prg: &mut SchedClassifier = + bpf.program_mut("tc_ingress").unwrap().try_into()?; + ingress_prg.load()?; + ingress_prg.attach(&opt.iface, TcAttachType::Ingress)?; + + info!("Starting to gather telemetry..."); + + // Get a handle to the INGRESS telemetry map from the tc_ingress eBPF program. + // This map is a PerCpuHashMap so no locking is needed to coordinate aggregation. 
+ let ingress: PerCpuHashMap<&MapData, u16, u64> = + PerCpuHashMap::try_from(bpf.map("INGRESS").unwrap())?; + + // Get a handle to the EGRESS telemetry map from the tc_egress eBPF program. + // This map is a PerCpuHashMap so no locking is needed to coordinate aggregation. + let egress: PerCpuHashMap<&MapData, u16, u64> = + PerCpuHashMap::try_from(bpf.map("EGRESS").unwrap())?; + + // Prepare an LRU Map for this userspace process to use when preparing aggregations across + // time windows, cpus, and ports. + let mut flows: LruCache = + LruCache::new(NonZeroUsize::new(1000).unwrap()); + + let sample_interval_millis = 20; + let summary_interval_millis = 1000; + let mut last_summary = Instant::now(); + loop { + // Simple loop that drives the main execution of our userspace aggregator. + // Every sample_interval (e.g. 20 ms) milliseconds, we aggregate all per-cpu + // values for every port in the egress and ingress map. The bytes transferred since + // the previous aggregation are then stored by port for both rx and tx. After + // summary_interval (e.g. 1000 ms) samples are taken at the sample_interval_millis + // we produce a summary for each port that was active since the previous summary that includes: + // total_bytes transferred since the last summary, p50, p75, p90, and p100 bytes transferred in a sample. + sleep(Duration::from_millis(sample_interval_millis)).await; + + //Aggregate the raw RX and TX data from the eBPF program via the PerCpuHashMap + aggregate_map(Direction::EGRESS, &mut flows, &egress); + aggregate_map(Direction::INGRESS, &mut flows, &ingress); + + // Print a summary of the last summary_interval_samples samples for each port that had traffic since the last summary. + if last_summary.elapsed().as_millis() >= summary_interval_millis { + summarize_map(&mut flows); + last_summary = Instant::now(); + } + } +} + +/// For each Port present in the given PerCpuMap, aggregate its values for this sample interval +/// into a FlowStat in our LRU cache. 
+fn aggregate_map( + direction: Direction, + cache: &mut LruCache, + map: &PerCpuHashMap<&MapData, u16, u64>, +) { + for next_item in map.iter() { + match next_item { + Ok((key, value)) => { + aggregate_port(&direction, cache, (key, value)); + } + Err(e) => { + info!("error: {:?}", e); + } + } + } +} + +/// For a given port record, aggregate its telemetry across the PerCPU map entries and also +/// add record the aggregate for this sample into the FlowStat for later use in percentile +/// calculations +fn aggregate_port( + direction: &Direction, + cache: &mut LruCache, + record: (u16, PerCpuValues), +) { + let mut total = 0; + let len = record.1.len(); + let port = record.0; + + //Setup the function that will produce the 'default' value when a new entry is needed in the cache + let default = move || FlowStat::new(port, len); + + //Get the existing record from the cache for this port - or the default record we will use to seed the port. + let existing_record = cache.get_or_insert_mut(record.0, default); + + //For each CPU in the PerCPU map, aggregate the values for this sample into the FlowStat + for (i, new_value) in record.1.iter().enumerate() { + //keep a running total so we know when to add the value to the population used by the percentile calculator + total += match direction { + Direction::INGRESS => existing_record.aggregate_rx(i, *new_value), + Direction::EGRESS => existing_record.aggregate_tx(i, *new_value), + }; + } + + //add the aggregated value to the percentile population + match direction { + Direction::INGRESS => existing_record.add_rx(total), + Direction::EGRESS => existing_record.add_tx(total), + }; +} + +/// For each port in the given LRU cache, produce a summary of the samples since the last summary. 
+fn summarize_map(flows: &mut LruCache) { + for (key, value) in flows.iter_mut() { + //Only consider ports that had any traffic in the interval + if value.has_traffic() { + //When producing the summary, ensure the FlowStat resets its internal state in preparation + //for the next sample interval. + match value.summary(true) { + Ok(summary) => { + info!( + "Port: {} - rxt:{}, rx50:{}, rx75:{}, rx90:{}, rx100:{} - txt:{}, tx50:{}, tx75:{}, tx90:{}, tx100:{}", + summary.port, + summary.rx_total, + summary.rx_p50, + summary.rx_p75, + summary.rx_p90, + summary.rx_p100, + summary.tx_total, + summary.tx_p50, + summary.tx_p75, + summary.tx_p90, + summary.tx_p100, + ); + } + Err(e) => { + error!("Unable to process summary for {} {:?}", key, e); + } + } + } else { + value.reset(); + } + } +} + +enum Direction { + INGRESS, + EGRESS, +} diff --git a/examples/tc-byte-count/xtask/Cargo.toml b/examples/tc-byte-count/xtask/Cargo.toml new file mode 100644 index 00000000..9ea3688a --- /dev/null +++ b/examples/tc-byte-count/xtask/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "xtask" +version = "0.1.0" +edition = "2021" + +[dependencies] +anyhow = "1" +clap = { version = "4.1", features = ["derive"] } +aya-tool = { git = "https://github.com/aya-rs/aya" } diff --git a/examples/tc-byte-count/xtask/src/build_ebpf.rs b/examples/tc-byte-count/xtask/src/build_ebpf.rs new file mode 100644 index 00000000..d12b94d9 --- /dev/null +++ b/examples/tc-byte-count/xtask/src/build_ebpf.rs @@ -0,0 +1,67 @@ +use std::{path::PathBuf, process::Command}; + +use clap::Parser; + +#[derive(Debug, Copy, Clone)] +pub enum Architecture { + BpfEl, + BpfEb, +} + +impl std::str::FromStr for Architecture { + type Err = String; + + fn from_str(s: &str) -> Result { + Ok(match s { + "bpfel-unknown-none" => Architecture::BpfEl, + "bpfeb-unknown-none" => Architecture::BpfEb, + _ => return Err("invalid target".to_owned()), + }) + } +} + +impl std::fmt::Display for Architecture { + fn fmt(&self, f: &mut 
std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(match self { + Architecture::BpfEl => "bpfel-unknown-none", + Architecture::BpfEb => "bpfeb-unknown-none", + }) + } +} + +#[derive(Debug, Parser)] +pub struct Options { + /// Set the endianness of the BPF target + #[clap(default_value = "bpfel-unknown-none", long)] + pub target: Architecture, + /// Build the release target + #[clap(long)] + pub release: bool, +} + +pub fn build_ebpf(opts: Options) -> Result<(), anyhow::Error> { + let dir = PathBuf::from("tc-byte-count-ebpf"); + let target = format!("--target={}", opts.target); + let mut args = vec![ + "build", + target.as_str(), + "-Z", + "build-std=core", + ]; + if opts.release { + args.push("--release") + } + + // Command::new creates a child process which inherits all env variables. This means env + // vars set by the cargo xtask command are also inherited. RUSTUP_TOOLCHAIN is removed + // so the rust-toolchain.toml file in the -ebpf folder is honored. + + let status = Command::new("cargo") + .current_dir(&dir) + .env_remove("RUSTUP_TOOLCHAIN") + .args(&args) + .status() + .expect("failed to build bpf program"); + assert!(status.success()); + Ok(()) +} diff --git a/examples/tc-byte-count/xtask/src/main.rs b/examples/tc-byte-count/xtask/src/main.rs new file mode 100644 index 00000000..c1c594e0 --- /dev/null +++ b/examples/tc-byte-count/xtask/src/main.rs @@ -0,0 +1,33 @@ +mod build_ebpf; +mod run; + +use std::process::exit; + +use clap::Parser; + +#[derive(Debug, Parser)] +pub struct Options { + #[clap(subcommand)] + command: Command, +} + +#[derive(Debug, Parser)] +enum Command { + BuildEbpf(build_ebpf::Options), + Run(run::Options), +} + +fn main() { + let opts = Options::parse(); + + use Command::*; + let ret = match opts.command { + BuildEbpf(opts) => build_ebpf::build_ebpf(opts), + Run(opts) => run::run(opts), + }; + + if let Err(e) = ret { + eprintln!("{e:#}"); + exit(1); + } +} diff --git a/examples/tc-byte-count/xtask/src/run.rs 
b/examples/tc-byte-count/xtask/src/run.rs new file mode 100644 index 00000000..f9754b4d --- /dev/null +++ b/examples/tc-byte-count/xtask/src/run.rs @@ -0,0 +1,71 @@ +use std::process::Command; + +use anyhow::Context as _; +use clap::Parser; + +use crate::build_ebpf::{build_ebpf, Architecture, Options as BuildOptions}; + +#[derive(Debug, Parser)] +pub struct Options { + /// Set the endianness of the BPF target + #[clap(default_value = "bpfel-unknown-none", long)] + pub bpf_target: Architecture, + /// Build and run the release target + #[clap(long)] + pub release: bool, + /// The command used to wrap your application + #[clap(short, long, default_value = "sudo -E")] + pub runner: String, + /// Arguments to pass to your application + #[clap(name = "args", last = true)] + pub run_args: Vec, +} + +/// Build the project +fn build(opts: &Options) -> Result<(), anyhow::Error> { + let mut args = vec!["build"]; + if opts.release { + args.push("--release") + } + let status = Command::new("cargo") + .args(&args) + .status() + .expect("failed to build userspace"); + assert!(status.success()); + Ok(()) +} + +/// Build and run the project +pub fn run(opts: Options) -> Result<(), anyhow::Error> { + // build our ebpf program followed by our application + build_ebpf(BuildOptions { + target: opts.bpf_target, + release: opts.release, + }) + .context("Error while building eBPF program")?; + build(&opts).context("Error while building userspace application")?; + + // profile we are building (release or debug) + let profile = if opts.release { "release" } else { "debug" }; + let bin_path = format!("target/{profile}/tc-byte-count"); + + // arguments to pass to the application + let mut run_args: Vec<_> = + opts.run_args.iter().map(String::as_str).collect(); + + // configure args + let mut args: Vec<_> = opts.runner.trim().split_terminator(' ').collect(); + args.push(bin_path.as_str()); + args.append(&mut run_args); + + // run the command + let status = Command::new(args.first().expect("No 
first argument")) + .args(args.iter().skip(1)) + .status() + .expect("failed to run the command"); + + if !status.success() { + anyhow::bail!("Failed to run `{}`", args.join(" ")); + } + Ok(()) +}