Adding an example of lock-free telemetry aggregation between kernel s… #154

Open: wants to merge 1 commit into main
2 changes: 2 additions & 0 deletions examples/tc-byte-count/.cargo/config.toml
@@ -0,0 +1,2 @@
[alias]
xtask = "run --package xtask --"
13 changes: 13 additions & 0 deletions examples/tc-byte-count/.gitignore
@@ -0,0 +1,13 @@
### https://raw.github.com/github/gitignore/master/Rust.gitignore

# Generated by Cargo
# will have compiled files and executables
debug/
target/

# Remove Cargo.lock from gitignore if creating an executable, leave it for libraries
# More information here https://doc.rust-lang.org/cargo/guide/cargo-toml-vs-cargo-lock.html
Cargo.lock

# These are backup files generated by rustfmt
**/*.rs.bk
3 changes: 3 additions & 0 deletions examples/tc-byte-count/.vim/coc-settings.json
@@ -0,0 +1,3 @@
{
"rust-analyzer.linkedProjects": ["Cargo.toml", "tc-byte-count-ebpf/Cargo.toml"]
}
3 changes: 3 additions & 0 deletions examples/tc-byte-count/.vscode/settings.json
@@ -0,0 +1,3 @@
{
"rust-analyzer.linkedProjects": ["Cargo.toml", "tc-byte-count-ebpf/Cargo.toml"]
}
3 changes: 3 additions & 0 deletions examples/tc-byte-count/Cargo.toml
@@ -0,0 +1,3 @@
[workspace]
members = ["tc-byte-count", "tc-byte-count-common", "xtask"]
resolver="2"
43 changes: 43 additions & 0 deletions examples/tc-byte-count/README.md
@@ -0,0 +1,43 @@
# tc-byte-count

This application is intended to demonstrate how to use several important BPF primitives via aya, including:

1. Basic eBPF program attachment for Traffic Control (TC).
1. How to instrument ingress and egress traffic.
1. Lock-free telemetry aggregation between kernel (eBPF) and userspace programs.

The example program itself uses a realistic scenario where the author wishes to emit metrics about ingress and
egress network traffic data rates grouped by remote port. The data is gathered using eBPF programs (`tc_egress`
and `tc_ingress`) attached to the TC egress and ingress hooks in the kernel. Each TCP-over-IPv4 packet is inspected
and the byte count for the packet's remote port is incremented in a "per-cpu" map. Because each CPU only updates
its own copy of a value, these per-cpu maps allow us to avoid using locks for our get-increment-put operation on the
map. The userspace program then periodically aggregates the map data, summing the per-cpu values in order to produce
a set of percentiles that are logged every second. This is precisely the kind of data one might want when sizing or
monitoring a bursty network application that is also sensitive to data loss.
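The userspace program is not part of this README, so the following is only a minimal, hypothetical sketch of the aggregation step. It assumes the per-cpu values for each port have already been read out of the `EGRESS`/`INGRESS` maps into plain `Vec<u64>`s (the aya calls for that are left out), and that percentiles are taken over per-port byte counts for one interval. Because the eBPF programs only ever add to their counters, a per-second rate is the difference between two successive summed snapshots. `sum_per_cpu`, `deltas` and `percentile` are illustrative names, and nearest-rank is just one of several percentile definitions.

```rust
use std::collections::HashMap;

/// Combine one port's per-CPU counters into a single total.
/// Each CPU writes only its own slot in the kernel, so no lock is needed there;
/// the reader does the (cheap) work of adding the slots together.
fn sum_per_cpu(values: &[u64]) -> u64 {
    values.iter().sum()
}

/// Bytes seen per port since the previous snapshot of summed totals.
/// The kernel counters are cumulative, so the rate is `current - previous`.
/// If a port's total went down (e.g. its LRU entry was evicted and re-created),
/// the new total is used as the delta instead of underflowing.
fn deltas(prev: &HashMap<u16, u64>, curr: &HashMap<u16, u64>) -> HashMap<u16, u64> {
    curr.iter()
        .map(|(&port, &total)| {
            let before = prev.get(&port).copied().unwrap_or(0);
            (port, total.checked_sub(before).unwrap_or(total))
        })
        .collect()
}

/// Nearest-rank percentile, `p` in 0.0..=100.0. Returns None for an empty sample.
fn percentile(samples: &[u64], p: f64) -> Option<u64> {
    if samples.is_empty() {
        return None;
    }
    let mut sorted = samples.to_vec();
    sorted.sort_unstable();
    // Rank is 1-based: ceil(p / 100 * n), clamped to a valid index.
    let rank = ((p / 100.0) * sorted.len() as f64).ceil() as usize;
    let idx = rank.saturating_sub(1).min(sorted.len() - 1);
    Some(sorted[idx])
}
```

With a one-second polling interval, the values returned by `deltas` are bytes per second per remote port, which could then be fed to `percentile`. Keeping the summation in userspace moves all cross-CPU work out of the packet path.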

## Prerequisites

1. Install a Rust stable toolchain: `rustup install stable`
1. Install a Rust nightly toolchain: `rustup install nightly`
1. Install bpf-linker: `cargo install bpf-linker`

## Build eBPF

```bash
cargo xtask build-ebpf
```

To perform a release build you can use the `--release` flag.
You may also change the target architecture with the `--target` flag.

## Build Userspace

```bash
cargo build
```

## Run

```bash
cargo xtask run
```
14 changes: 14 additions & 0 deletions examples/tc-byte-count/tc-byte-count-common/Cargo.toml
@@ -0,0 +1,14 @@
[package]
name = "tc-byte-count-common"
version = "0.1.0"
edition = "2021"

[features]
default = []
user = ["aya"]

[dependencies]
aya = { git = "https://github.com/aya-rs/aya", optional = true }

[lib]
path = "src/lib.rs"
1 change: 1 addition & 0 deletions examples/tc-byte-count/tc-byte-count-common/src/lib.rs
@@ -0,0 +1 @@
#![no_std]
6 changes: 6 additions & 0 deletions examples/tc-byte-count/tc-byte-count-ebpf/.cargo/config.toml
@@ -0,0 +1,6 @@
[build]
target-dir = "../target"
target = "bpfel-unknown-none"

[unstable]
build-std = ["core"]
@@ -0,0 +1,4 @@
{
"rust-analyzer.cargo.target": "bpfel-unknown-none",
"rust-analyzer.checkOnSave.allTargets": false
}
@@ -0,0 +1,4 @@
{
"rust-analyzer.cargo.target": "bpfel-unknown-none",
"rust-analyzer.checkOnSave.allTargets": false
}
34 changes: 34 additions & 0 deletions examples/tc-byte-count/tc-byte-count-ebpf/Cargo.toml
@@ -0,0 +1,34 @@
[package]
name = "tc-byte-count-ebpf"
version = "0.1.0"
edition = "2021"

[dependencies]
aya-ebpf = { git = "https://github.com/aya-rs/aya" }
aya-log-ebpf = { git = "https://github.com/aya-rs/aya" }
tc-byte-count-common = { path = "../tc-byte-count-common" }
memoffset = "0.8"
network-types = "0.0.4"

[[bin]]
name = "tc-byte-count"
path = "src/main.rs"

[profile.dev]
opt-level = 3
debug = false
debug-assertions = false
overflow-checks = false
lto = true
panic = "abort"
incremental = false
codegen-units = 1
rpath = false

[profile.release]
lto = true
panic = "abort"
codegen-units = 1

[workspace]
members = []
2 changes: 2 additions & 0 deletions examples/tc-byte-count/tc-byte-count-ebpf/rust-toolchain.toml
@@ -0,0 +1,2 @@
[toolchain]
channel = "nightly"
184 changes: 184 additions & 0 deletions examples/tc-byte-count/tc-byte-count-ebpf/src/main.rs
@@ -0,0 +1,184 @@
#![no_std]
#![no_main]

use core::ops::Add;

use aya_ebpf::{
bindings::{BPF_ANY, TC_ACT_PIPE},
macros::{classifier, map},
maps::LruPerCpuHashMap,
programs::TcContext,
};

use network_types::{
eth::{EthHdr, EtherType},
ip::{IpProto, Ipv4Hdr},
tcp::TcpHdr,
};

/// An LRU map that will hold information about bytes transmitted.
/// The map is keyed on remote port. Values in this map are per-cpu,
/// meaning no locking is required to ensure consistent updates.
/// Values are evicted only when space is needed in the map.
#[map]
static EGRESS: LruPerCpuHashMap<u16, u64> = LruPerCpuHashMap::with_max_entries(1024, 0);

/// An LRU map that will hold information about bytes received.
/// The map is keyed on remote port. Values in this map are per-cpu,
/// meaning no locking is required to ensure consistent updates.
/// Values are evicted only when space is needed in the map.
#[map]
static INGRESS: LruPerCpuHashMap<u16, u64> = LruPerCpuHashMap::with_max_entries(1024, 0);

/// Entry point for our Traffic Control "EGRESS" eBPF attachment point.
#[classifier]
pub fn tc_egress(ctx: TcContext) -> i32 {
let _res = try_tc_egress(ctx);

//Always allow the packet to continue to its intended destination
TC_ACT_PIPE
}

/// Entry point for our Traffic Control "INGRESS" eBPF attachment point.
#[classifier]
pub fn tc_ingress(ctx: TcContext) -> i32 {
let _res = try_tc_ingress(ctx);

//Always allow the packet to continue to its intended destination
TC_ACT_PIPE
}

/// In order to reduce the total memory and CPU resources expended in
/// producing this telemetry, we collapse remote port ranges that are unlikely
/// to be of interest. For example, interesting ports include:
/// mysql: 3306, sqlserver: 1433, postgres: 5432, oracle: 1521. Ephemeral ports (32768-65535) are collapsed into port 0.
#[inline(always)]
fn map_port(port: u16) -> u16 {
if (32768..=65535).contains(&port) {
//ephemeral range, we can collapse these entirely
0
} else {
// anything else, let's track it specifically
port
}
}

/// Handles accounting of EGRESS packets, storing telemetry by map_port(REMOTE port).
/// There is some duplicate code between this method and try_tc_ingress. This was intentional,
/// as the effort to de-duplicate wasn't worth it given it's only a handful of duplicate lines.
#[inline(always)]
fn try_tc_egress(ctx: TcContext) -> Result<(), ()> {
let ethhdr: EthHdr = ctx.load(0).map_err(|_| ())?;

// We use match instead of a simple if because the eBPF verifier doesn't seem to accept ether_type used
// directly in an if without first copying it to a stack variable. I suspect this is related to how the
// C-style EthHdr struct is laid out. Still Day-1 for Rust and eBPF, I guess.
match ethhdr.ether_type {
EtherType::Ipv4 => {
// Since IPv6 adoption is low, we make the simplifying assumption that we only need to monitor IPv4.
}
_ => return Ok(()),
}

//Grab the IP Header so we can read the protocol and size of packet.
let ipv4hdr: Ipv4Hdr = ctx.load(EthHdr::LEN).map_err(|_| ())?;
if ipv4hdr.proto != IpProto::Tcp {
return Ok(());
}

//Calculate the offset of the TCP Header. IHL is the IPv4 header length in
//32-bit words, so the vast majority of the time it is 5 (a fixed 20 bytes), but
//it's possible to occasionally have IP Options set that change the header size.
//This handles that to avoid blowing out the telemetry by reading random data instead of real ports.
let offset = if ipv4hdr.ihl() != 5 {
EthHdr::LEN + (ipv4hdr.ihl() * 4) as usize
} else {
EthHdr::LEN + Ipv4Hdr::LEN
};

//Grab the TCP Header so we can read the remote port.
let tcphdr: TcpHdr = ctx.load(offset).map_err(|_| ())?;
let dst_port = map_port(u16::from_be(tcphdr.dest));
let len = u16::from_be(ipv4hdr.tot_len);

//Grab an existing value for this port (if present) and add the size of this packet.
//No locking is needed since this is a per-cpu map.
let val = unsafe {
match EGRESS.get(&dst_port) {
Some(val) => (len as u64).add(val),
None => len as u64,
}
};

//Update the map with the new value. No locking is needed since this is a per-cpu map.
let _res = EGRESS.insert(&dst_port, &val, BPF_ANY.into());

Ok(())
}

/// Handles accounting of INGRESS packets, storing telemetry by map_port(REMOTE port).
/// There is some duplicate code between this method and try_tc_egress. This was intentional,
/// as the effort to de-duplicate wasn't worth it given it's only a handful of duplicate lines.
#[inline(always)]
fn try_tc_ingress(ctx: TcContext) -> Result<(), ()> {
let ethhdr: EthHdr = ctx.load(0).map_err(|_| ())?;

// We use match instead of a simple if because the eBPF verifier doesn't seem to accept ether_type used
// directly in an if without first copying it to a stack variable. I suspect this is related to how the
// C-style EthHdr struct is laid out. Still Day-1 for Rust and eBPF, I guess.
match ethhdr.ether_type {
EtherType::Ipv4 => {
// Since IPv6 adoption is low, we make the simplifying assumption that we only need to monitor IPv4.
}
_ => return Ok(()),
}

//Grab the IP Header so we can read the protocol and size of packet.
let ipv4hdr: Ipv4Hdr = ctx.load(EthHdr::LEN).map_err(|_| ())?;
if ipv4hdr.proto != IpProto::Tcp {
return Ok(());
}

//Calculate the offset of the TCP Header. IHL is the IPv4 header length in
//32-bit words, so the vast majority of the time it is 5 (a fixed 20 bytes), but
//it's possible to occasionally have IP Options set that change the header size.
//This handles that to avoid blowing out the telemetry by reading random data instead of real ports.
let offset = if ipv4hdr.ihl() != 5 {
EthHdr::LEN + (ipv4hdr.ihl() * 4) as usize
} else {
EthHdr::LEN + Ipv4Hdr::LEN
};

//Grab the TCP Header so we can read the remote port.
let tcphdr: TcpHdr = ctx.load(offset).map_err(|_| ())?;
let src_port = map_port(u16::from_be(tcphdr.source));

//Grab the size of the packet (the IP total length, which excludes the Ethernet header; slightly inaccurate, but it's a rounding error)
let len = u16::from_be(ipv4hdr.tot_len);

//Grab an existing value for this port (if present) and add the size of this packet.
//No locking is needed since this is a per-cpu map.
let val = unsafe {
match INGRESS.get(&src_port) {
Some(val) => (len as u64).add(val),
None => len as u64,
}
};

//Update the map with the new value. No locking is needed since this is a per-cpu map.
let _res = INGRESS.insert(&src_port, &val, BPF_ANY.into());

Ok(())
}

/// This is never expected to be called. Rust requires a `#[panic_handler]` for every `no_std` binary,
/// so one is provided here and marked unreachable.
#[panic_handler]
fn panic(_info: &core::panic::PanicInfo) -> ! {
unsafe { core::hint::unreachable_unchecked() }
}
30 changes: 30 additions & 0 deletions examples/tc-byte-count/tc-byte-count/Cargo.toml
@@ -0,0 +1,30 @@
[package]
name = "tc-byte-count"
version = "0.1.0"
edition = "2021"
publish = false

[dependencies]
aya = { git = "https://github.com/aya-rs/aya", features = ["async_tokio"] }
aya-log = { git = "https://github.com/aya-rs/aya" }
tc-byte-count-common = { path = "../tc-byte-count-common", features = ["user"] }
anyhow = "1.0"
clap = { version = "4.1", features = ["derive"] }
log = "0.4"
tokio = { version = "1.25", features = [
"macros",
"rt",
"rt-multi-thread",
"net",
"signal",
"time"
] }
bytes = "1"
env_logger = "0.11"
lru = "0.12"
stats-cli = "3.0"


[[bin]]
name = "tc-byte-count"
path = "src/main.rs"