From db17cf2ec529eb47801ae06db22e42b90f5e4cea Mon Sep 17 00:00:00 2001 From: iczero Date: Wed, 23 Aug 2023 18:13:22 -0700 Subject: [PATCH] implement output to directory --- Cargo.lock | 220 +++++++++++++- parse-tcp/.gitignore | 1 + parse-tcp/Cargo.toml | 5 +- parse-tcp/src/bin/reassemble.rs | 261 ++++++----------- parse-tcp/src/connection.rs | 17 +- parse-tcp/src/flow_table.rs | 64 +++-- parse-tcp/src/handler.rs | 495 ++++++++++++++++++++++++++++++++ parse-tcp/src/lib.rs | 12 +- parse-tcp/src/parser.rs | 16 +- parse-tcp/src/stream.rs | 18 +- 10 files changed, 892 insertions(+), 217 deletions(-) create mode 100644 parse-tcp/.gitignore create mode 100644 parse-tcp/src/handler.rs diff --git a/Cargo.lock b/Cargo.lock index c2bc110..a4c30f8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -26,6 +26,55 @@ dependencies = [ "memchr", ] +[[package]] +name = "anstream" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ca84f3628370c59db74ee214b3263d58f9aadd9b4fe7e711fd87dc452b7f163" +dependencies = [ + "anstyle", + "anstyle-parse", + "anstyle-query", + "anstyle-wincon", + "colorchoice", + "is-terminal", + "utf8parse", +] + +[[package]] +name = "anstyle" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15c4c2c83f81532e5845a733998b6971faca23490340a418e9b72a3ec9de12ea" + +[[package]] +name = "anstyle-parse" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "938874ff5980b03a87c5524b3ae5b59cf99b1d6bc836848df7bc5ada9643c333" +dependencies = [ + "utf8parse", +] + +[[package]] +name = "anstyle-query" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ca11d4be1bab0c8bc8734a9aa7bf4ee8316d462a08c6ac5052f888fef5b494b" +dependencies = [ + "windows-sys", +] + +[[package]] +name = "anstyle-wincon" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c677ab05e09154296dd37acecd46420c17b9713e8366facafa8fc0885167cf4c" +dependencies = [ + "anstyle", + "windows-sys", +] + [[package]] name = "arrayvec" version = "0.7.4" @@ -59,6 +108,12 @@ version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" +[[package]] +name = "bitflags" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4682ae6287fcf752ecaabbfcc7b6f9b72aa33933dc23a554d853aea8eea8635" + [[package]] name = "bytes" version = "1.4.0" @@ -86,6 +141,47 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b0fc239e0f6cb375d2402d48afb92f76f5404fd1df208a41930ec81eda078bea" +[[package]] +name = "clap" +version = "4.3.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb690e81c7840c0d7aade59f242ea3b41b9bc27bcd5997890e7702ae4b32e487" +dependencies = [ + "clap_builder", + "clap_derive", + "once_cell", +] + +[[package]] +name = "clap_builder" +version = "4.3.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ed2e96bc16d8d740f6f48d663eddf4b8a0983e79210fd55479b7bcd0a69860e" +dependencies = [ + "anstream", + "anstyle", + "clap_lex", + "strsim", +] + +[[package]] +name = "clap_derive" +version = "4.3.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "54a9bb5758fc5dfe728d1019941681eccaf0cf8a4189b692a0ee2f2ecf90a050" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "clap_lex" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2da6da31387c7e4ef160ffab6d5e7f00c42626fe39aea70a7b0f1773f7dd6c1b" + [[package]] name = "color-eyre" version = "0.6.2" @@ -113,6 +209,12 @@ dependencies = [ "tracing-error", ] +[[package]] +name = "colorchoice" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" + [[package]] name = "crossbeam-channel" version = "0.5.8" @@ -132,6 +234,27 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "errno" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6b30f669a7961ef1631673d2766cc92f52d64f7ef354d4fe0ddfd30ed52f0f4f" +dependencies = [ + "errno-dragonfly", + "libc", + "windows-sys", +] + +[[package]] +name = "errno-dragonfly" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aa68f1b12764fab894d2755d2518754e71b4fd80ecfb822714a1206c2aab39bf" +dependencies = [ + "cc", + "libc", +] + [[package]] name = "etherparse" version = "0.13.0" @@ -168,6 +291,12 @@ version = "0.27.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6c80984affa11d98d1b88b66ac8853f143217b399d3c74116778ff8fdb4ed2e" +[[package]] +name = "heck" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" + [[package]] name = "hermit-abi" version = "0.3.2" @@ -180,6 +309,23 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ce23b50ad8242c51a442f3ff322d56b02f08852c77e4c0b4d3fd684abc89c683" +[[package]] +name = "is-terminal" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb0889898416213fab133e1d33a0e5858a48177452750691bde3666d0fdbaf8b" +dependencies = [ + "hermit-abi", + "rustix", + "windows-sys", +] + +[[package]] +name = "itoa" +version = "1.0.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af150ab688ff2122fcef229be89cb50dd66af9e01a4ff320cc137eecc9bacc38" + [[package]] name = "kinesin" version = "0.1.0" @@ -231,6 +377,12 @@ version = "0.2.147" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b4668fb0ea861c1df094127ac5f1da3409a82116a4ba74fca2e58ef927159bb3" +[[package]] +name = "linux-raw-sys" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57bcfdad1b858c2db7c38303a6d2ad4dfaf5eb53dfeb0910128b2c26d6158503" + [[package]] name = "lock_api" version = "0.4.10" @@ -372,12 +524,15 @@ dependencies = [ name = "parse-tcp" version = "0.1.0" dependencies = [ + "clap", "color-eyre", "etherparse", "eyre", "kinesin-rdt", "parking_lot", "pcap-parser", + "serde", + "serde_json", "tokio", "tracing", "tracing-error", @@ -426,7 +581,7 @@ version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "567664f262709473930a4bf9e51bf2ebf3348f2e748ccc50dea20646858f8f29" dependencies = [ - "bitflags", + "bitflags 1.3.2", ] [[package]] @@ -488,12 +643,62 @@ dependencies = [ "nom", ] +[[package]] +name = "rustix" +version = "0.38.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19ed4fa021d81c8392ce04db050a3da9a60299050b7ae1cf482d862b54a7218f" +dependencies = [ + "bitflags 2.4.0", + "errno", + "libc", + "linux-raw-sys", + "windows-sys", +] + +[[package]] +name = "ryu" +version = "1.0.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ad4cc8da4ef723ed60bced201181d83791ad433213d8c24efffda1eec85d741" + [[package]] name = "scopeguard" version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "serde" +version = "1.0.185" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be9b6f69f1dfd54c3b568ffa45c310d6973a5e5148fd40cf515acaf38cf5bc31" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.185" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc59dfdcbad1437773485e0367fea4b090a2e0a16d9ffc46af47764536a298ec" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "serde_json" +version = "1.0.105" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "693151e1ac27563d6dbcec9dee9fbd5da8539b20fa14ad3752b2e6d363ace360" +dependencies = [ + "itoa", + "ryu", + "serde", +] + [[package]] name = "sharded-slab" version = "0.1.4" @@ -528,6 +733,12 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "strsim" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" + [[package]] name = "syn" version = "2.0.29" @@ -678,6 +889,12 @@ version = "1.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "301abaae475aa91687eb82514b328ab47a211a533026cb25fc3e519b86adfc3c" +[[package]] +name = "utf8parse" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" + [[package]] name = "uuid" version = "1.4.1" @@ -685,6 +902,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "79daa5ed5740825c40b389c5e50312b9c86df53fccd33f281df655642b43869d" dependencies = [ "getrandom", + "serde", ] [[package]] diff --git a/parse-tcp/.gitignore b/parse-tcp/.gitignore new file mode 100644 index 0000000..8c0a17c --- /dev/null +++ b/parse-tcp/.gitignore @@ -0,0 +1 @@ +/test-output diff --git a/parse-tcp/Cargo.toml b/parse-tcp/Cargo.toml index f4ca296..952c67d 100644 --- a/parse-tcp/Cargo.toml +++ b/parse-tcp/Cargo.toml @@ -6,14 +6,17 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +clap = { version = "4.3.24", features = ["derive"] } color-eyre = "0.6.2" etherparse = "0.13.0" eyre = "0.6.8" kinesin-rdt = { path = '../kinesin-rdt' } parking_lot = "0.12.1" pcap-parser = "0.14.1" +serde = { version = "1.0.185", features = ["derive"] } +serde_json = "1.0.105" tokio = { version = "1.27.0", features = ["tracing", "full"] } tracing = "0.1.37" tracing-error = "0.2.0" tracing-subscriber = { version = "0.3.16", features = ["env-filter"] } -uuid = { version = "1.4.1", features = ["v4"] } +uuid = { version = "1.4.1", features = ["v4", "serde"] } diff --git a/parse-tcp/src/bin/reassemble.rs b/parse-tcp/src/bin/reassemble.rs index 3d6ea9d..c90b6fa 100644 --- a/parse-tcp/src/bin/reassemble.rs +++ b/parse-tcp/src/bin/reassemble.rs @@ -1,203 +1,116 @@ use std::fs::File; -use std::io::BufWriter; -use std::io::Write; -use std::ops::Range; +use std::io::Read; +use std::path::PathBuf; +use std::sync::Arc; -use eyre::{eyre, Context}; -use parse_tcp::connection::{Connection, Direction}; +use clap::Parser as ClapParser; +use eyre::Context; use parse_tcp::flow_table::FlowTable; -use parse_tcp::parser::{ParseLayer, Parser}; -use parse_tcp::stream::{SegmentInfo, SegmentType}; -use parse_tcp::{initialize_logging, ConnectionHandler, PacketExtra}; +use parse_tcp::handler::{DirectoryOutputHandler, DirectoryOutputSharedInfoInner, DumpHandler}; +use parse_tcp::parser::{ParseLayer, TcpParser}; +use parse_tcp::{initialize_logging, PacketExtra, TcpMeta}; use pcap_parser::traits::PcapReaderIterator; use pcap_parser::{LegacyPcapReader, Linktype, PcapBlockOwned, PcapError}; use tracing::info; -use tracing::trace; -/* -fn dump_as_ascii(buf: &[u8]) { - let mut writer = BufWriter::new(std::io::stdout()); - buf - .iter() - .copied() - .flat_map(std::ascii::escape_default) - .for_each(|b| { - writer.write_all(&[b]).expect("failed write"); - }); - let _ = writer.write_all(b"\n"); +#[derive(ClapParser, Debug)] +struct Args { + /// Input capture file + #[arg(short = 'f', long)] + input: PathBuf, + /// Directory to write stream data. If not provided, will dump to stdout. + #[arg(short = 'd', long)] + output_dir: Option, } -*/ -fn dump_as_readable_ascii(buf: &[u8], newline: bool) { - let mut writer = BufWriter::new(std::io::stdout()); - buf.iter() - .copied() - .map(|v| { - if (b' '..=b'~').contains(&v) || v == b'\n' { - v - } else { - b'.' - } - }) - .for_each(|v| writer.write_all(&[v]).expect("failed write")); - if newline { - let _ = writer.write_all(b"\n"); +fn main() -> eyre::Result<()> { + initialize_logging(); + info!("Hello, world!"); + let args = Args::parse(); + let file = File::open(args.input).wrap_err("cannot open file")?; + if let Some(out_dir) = args.output_dir { + write_to_dir(file, out_dir)?; + } else { + dump_to_stdout(file)?; } + Ok(()) } -struct DumpHandler { - gaps: Vec>, - segments: Vec, - buf: Vec, - forward_has_data: bool, - reverse_has_data: bool, -} +fn dump_to_stdout(file: File) -> eyre::Result<()> { + let mut flowtable: FlowTable = FlowTable::new(()); -impl DumpHandler { - fn dump_stream(&mut self, connection: &mut Connection, direction: Direction) { - self.gaps.clear(); - self.segments.clear(); - self.buf.clear(); - // indiscriminately dump everything to stdout - let mut flow = connection.forward_flow.clone(); - if direction == Direction::Reverse { - flow.reverse(); - } - println!("\n====================\n{} ({})", flow, connection.uuid); - let stream = connection.get_stream(direction); - let dump_len = if stream.readable_buffered_length() > 0 { - stream.readable_buffered_length() - } else { - stream.total_buffered_length().min(128 << 10) - }; - let end_offset = stream.buffer_start() + dump_len as u64; - println!(" length: {dump_len}\n\n"); - stream.read_next(end_offset, &mut self.segments, &mut self.gaps, |slice| { - let (a, b) = slice.as_slices(); - self.buf.extend_from_slice(a); - if let Some(b) = b { - self.buf.extend_from_slice(b); - } - }); - info!("gaps (length {})", self.gaps.len()); - for gap in &self.gaps { - info!(" gap {} -> {}", gap.start, gap.end); - } - info!("segments (length {})", self.segments.len()); - for segment in &self.segments { - info!(" offset {}", segment.offset); - info!(" reverse acked: {}", segment.reverse_acked); - match segment.data { - SegmentType::Data { len, is_retransmit } => { - info!(" type: data"); - info!(" len {len}, retransmit {is_retransmit}"); - } - SegmentType::Ack { window } => { - info!(" type: ack"); - info!(" window: {window}"); - } - SegmentType::Fin { end_offset } => { - info!(" type: fin"); - info!(" end offset: {end_offset}"); - } - } - } - info!("data (length {})", self.buf.len()); - dump_as_readable_ascii(&self.buf, true); - } + parse_packets(file, |meta, data, extra| { + let _ = flowtable.handle_packet(meta, data, extra); + Ok(()) + })?; + + flowtable.close(); + Ok(()) } -impl ConnectionHandler for DumpHandler { - type InitialData = (); - fn new(_init: (), _conn: &mut Connection) -> Self { - DumpHandler { - gaps: Vec::new(), - segments: Vec::new(), - buf: Vec::new(), - forward_has_data: false, - reverse_has_data: false, - } - } +fn write_to_dir(file: File, out_dir: PathBuf) -> eyre::Result<()> { + let shared_info = DirectoryOutputSharedInfoInner::new(out_dir)?; + let mut flowtable: FlowTable = FlowTable::new(shared_info.clone()); - fn data_received(&mut self, connection: &mut Connection, direction: Direction) { - let (fwd_data, rev_data) = match direction { - Direction::Forward => (&mut self.forward_has_data, &mut self.reverse_has_data), - Direction::Reverse => (&mut self.reverse_has_data, &mut self.forward_has_data), - }; - let fwd_readable_len = connection.get_stream(direction).readable_buffered_length(); - *fwd_data = fwd_readable_len > 0; + parse_packets(file, |meta, data, extra| { + let _ = flowtable.handle_packet(meta, data, extra); + Ok(()) + })?; - // dump reverse stream buffer if it has data - if *rev_data { - let rev_stream = connection.get_stream(direction.swap()); - if rev_stream.readable_buffered_length() > 0 { - trace!("reverse stream has data, will dump"); - self.dump_stream(connection, direction.swap()); - } - } + flowtable.close(); + drop(flowtable); + Arc::into_inner(shared_info).unwrap().close()?; + Ok(()) +} - // dump forward stream if limits hit - let fwd_stream = connection.get_stream(direction); - if fwd_readable_len > 64 << 10 || fwd_stream.total_buffered_length() > 256 << 10 { - trace!("forward stream exceeded limits, will dump"); - self.dump_stream(connection, direction); +fn parse_packets( + reader: impl Read, + mut handler: impl FnMut(TcpMeta, &[u8], PacketExtra) -> eyre::Result<()>, +) -> eyre::Result<()> { + let mut parser = TcpParser::new(); + let mut packet_counter = 0u64; + read_pcap_legacy(reader, |block| match block { + PcapBlockOwned::LegacyHeader(hdr) => { + info!("pcap linktype: {:?}", hdr.network); + let layer = match hdr.network { + Linktype::ETHERNET => ParseLayer::Link, + Linktype::RAW => ParseLayer::IP, + Linktype::IPV4 => ParseLayer::IP, + Linktype::IPV6 => ParseLayer::IP, + Linktype::NULL => ParseLayer::BsdLoopback, + _ => eyre::bail!("pcap header: unknown link type {:?}", hdr.network), + }; + parser.layer = layer; + Ok(()) } - } + PcapBlockOwned::Legacy(packet) => { + let index = packet_counter; + packet_counter += 1; + let extra = PacketExtra::LegacyPcap { + index, + ts_sec: packet.ts_sec, + ts_usec: packet.ts_usec, + }; - fn will_retire(&mut self, connection: &mut Connection) { - if connection.get_stream(Direction::Forward).total_buffered_length() > 0 { - self.dump_stream(connection, Direction::Forward); - } - if connection.get_stream(Direction::Reverse).total_buffered_length() > 0 { - self.dump_stream(connection, Direction::Reverse); + if let Some((meta, data)) = parser.parse_packet(packet.data) { + handler(meta, data, extra)?; + }; + Ok(()) } - } + PcapBlockOwned::NG(_) => unreachable!("read pcapng block in plain pcap"), + }) } -fn main() -> eyre::Result<()> { - initialize_logging(); - info!("Hello, world!"); - - let file_name = std::env::args() - .nth(1) - .ok_or_else(|| eyre!("no filename provided"))?; - let file = File::open(file_name).wrap_err("cannot open file")?; +fn read_pcap_legacy( + reader: impl Read, + mut handler: impl FnMut(PcapBlockOwned<'_>) -> eyre::Result<()>, +) -> eyre::Result<()> { let mut pcap_reader = - LegacyPcapReader::new(65536, file).wrap_err("failed to create LegacyPcapReader")?; - let mut parser = Parser::new(); - let mut flowtable: FlowTable = FlowTable::new(()); - let mut packet_counter = 0u64; + LegacyPcapReader::new(65536, reader).wrap_err("failed to create LegacyPcapReader")?; loop { match pcap_reader.next() { Ok((offset, block)) => { - match block { - PcapBlockOwned::LegacyHeader(hdr) => { - info!("pcap linktype: {:?}", hdr.network); - let layer = match hdr.network { - Linktype::ETHERNET => ParseLayer::Link, - Linktype::RAW => ParseLayer::IP, - Linktype::IPV4 => ParseLayer::IP, - Linktype::IPV6 => ParseLayer::IP, - Linktype::NULL => ParseLayer::BsdLoopback, - _ => eyre::bail!("pcap header: unknown link type {:?}", hdr.network), - }; - parser.layer = layer; - } - PcapBlockOwned::Legacy(packet) => { - let index = packet_counter; - packet_counter += 1; - let extra = PacketExtra::LegacyPcap { - index, - ts_sec: packet.ts_sec, - ts_usec: packet.ts_usec, - }; - - if let Some((meta, data)) = parser.parse_packet(packet.data) { - flowtable.handle_packet(meta, data, extra); - }; - } - PcapBlockOwned::NG(_) => unreachable!("read pcapng block in plain pcap"), - } + handler(block)?; pcap_reader.consume(offset); } Err(PcapError::Eof) => break, @@ -222,7 +135,5 @@ fn main() -> eyre::Result<()> { } } } - - flowtable.close(); Ok(()) } diff --git a/parse-tcp/src/connection.rs b/parse-tcp/src/connection.rs index 7f352ef..1942dec 100644 --- a/parse-tcp/src/connection.rs +++ b/parse-tcp/src/connection.rs @@ -101,7 +101,10 @@ pub enum HandlePacketResult { impl Connection { /// create new connection with flow - pub fn new(forward_flow: Flow, handler_init_data: H::InitialData) -> Connection { + pub fn new( + forward_flow: Flow, + handler_init_data: H::InitialData, + ) -> Result, H::ConstructError> { let mut conn = Connection { uuid: Uuid::new_v4(), forward_flow, @@ -112,9 +115,9 @@ impl Connection { reverse_stream: Stream::new(), event_handler: None, }; - let handler = H::new(handler_init_data, &mut conn); + let handler = H::new(handler_init_data, &mut conn)?; conn.event_handler = Some(handler); - conn + Ok(conn) } /// get stream in direction @@ -461,6 +464,7 @@ impl Connection { mod test { use crate::{initialize_logging, ConnectionHandler, PacketExtra, TcpFlags, TcpMeta}; use parking_lot::Mutex; + use std::convert::Infallible; use std::mem; use super::{Connection, Direction}; @@ -490,8 +494,9 @@ mod test { struct TestHandler; impl ConnectionHandler for TestHandler { type InitialData = (); - fn new(_init: (), _conn: &mut Connection) -> Self { - TestHandler + type ConstructError = Infallible; + fn new(_init: (), _conn: &mut Connection) -> Result { + Ok(TestHandler) } fn handshake_done(&mut self, _conn: &mut Connection) { let mut guard = HANDSHAKE_DONE.lock(); @@ -539,7 +544,7 @@ mod test { option_timestamp: None, }; - let mut conn: Connection = Connection::new(hs1.clone().into(), ()); + let mut conn: Connection = Connection::new((&hs1).into(), ()).unwrap(); assert!(conn.handle_packet(hs1.clone(), &[], PacketExtra::None)); let mut hs2 = swap_meta(&hs1); hs2.seq_number = 315848; diff --git a/parse-tcp/src/flow_table.rs b/parse-tcp/src/flow_table.rs index 3697950..3f68ee4 100644 --- a/parse-tcp/src/flow_table.rs +++ b/parse-tcp/src/flow_table.rs @@ -30,7 +30,7 @@ impl Flow { /// compare to TcpMeta pub fn compare_tcp_meta(&self, other: &TcpMeta) -> FlowCompare { - self.compare(&other.clone().into()) + self.compare(&other.into()) } /// compare to other @@ -55,8 +55,8 @@ impl Flow { } } -impl From for Flow { - fn from(value: TcpMeta) -> Self { +impl From<&TcpMeta> for Flow { + fn from(value: &TcpMeta) -> Self { Flow { src_addr: value.src_addr, src_port: value.src_port, @@ -148,9 +148,33 @@ impl FlowTable { } } - /// handle a packet - pub fn handle_packet(&mut self, meta: TcpMeta, data: &[u8], extra: PacketExtra) -> bool { - let flow = meta.clone().into(); + /// handle a packet, creating a flow if necessary + pub fn handle_packet( + &mut self, + meta: TcpMeta, + data: &[u8], + extra: PacketExtra, + ) -> Result { + match self.handle_packet_direct(meta, data, extra) { + Ok(b) => Ok(b), + Err((meta, extra)) => { + self.create_flow((&meta).into(), self.handler_init_data.clone())?; + Ok(self + .handle_packet_direct(meta, data, extra) + .map_err(|_| ()) + .expect("no flow after created flow")) + } + } + } + + /// handle a packet, return Err if flow does not exist (and return args) + pub fn handle_packet_direct( + &mut self, + meta: TcpMeta, + data: &[u8], + extra: PacketExtra, + ) -> Result { + let flow = (&meta).into(); let did_something; match self.map.get_mut(&flow) { Some(conn) => { @@ -159,24 +183,30 @@ impl FlowTable { // remove flow if connection is no more self.retire_flow(flow); } - did_something - } - None => { - let conn = Connection::new(flow.clone(), self.handler_init_data.clone()); - debug!("new flow: {} {flow}", conn.uuid); - self.map.insert(flow, conn); - self.handle_packet(meta, data, extra) + Ok(did_something) } + None => Err((meta, extra)), } } - pub fn retire_flow(&mut self, flow_id: Flow) { - let Some(mut conn) = self.map.remove(&flow_id) else { - warn!("retire_flow called on non-existent flow?: {flow_id}"); + /// create flow + pub fn create_flow( + &mut self, + flow: Flow, + init_data: H::InitialData, + ) -> Result>, H::ConstructError> { + let conn = Connection::new(flow.clone(), init_data)?; + debug!("new flow: {} {flow}", conn.uuid); + Ok(self.map.insert(flow, conn)) + } + + pub fn retire_flow(&mut self, flow: Flow) { + let Some(mut conn) = self.map.remove(&flow) else { + warn!("retire_flow called on non-existent flow?: {flow}"); return; }; - debug!("remove flow: {} {flow_id}", conn.uuid); + debug!("remove flow: {} {flow}", conn.uuid); conn.will_retire(); self.retired.push_back(conn); } diff --git a/parse-tcp/src/handler.rs b/parse-tcp/src/handler.rs new file mode 100644 index 0000000..c9a4a00 --- /dev/null +++ b/parse-tcp/src/handler.rs @@ -0,0 +1,495 @@ +use std::convert::Infallible; +use std::fs::File; +use std::io::{BufWriter, Seek, SeekFrom, Write}; +use std::net::IpAddr; +use std::ops::Range; +use std::path::PathBuf; +use std::sync::Arc; + +use eyre::Context; +use parking_lot::Mutex; +use serde::Serialize; +use tracing::{debug, info, trace}; +use uuid::Uuid; + +use crate::connection::{Connection, Direction}; +use crate::flow_table::Flow; +use crate::stream::{SegmentInfo, SegmentType}; +use crate::{ConnectionHandler, PacketExtra}; + +pub fn dump_as_readable_ascii(buf: &[u8], newline: bool) { + let mut writer = BufWriter::new(std::io::stdout()); + buf.iter() + .copied() + .map(|v| { + if (b' '..=b'~').contains(&v) || v == b'\n' { + v + } else { + b'.' + } + }) + .for_each(|v| writer.write_all(&[v]).expect("failed write")); + if newline { + let _ = writer.write_all(b"\n"); + } +} + +/// ConnectionHandler to dump data to stdout +pub struct DumpHandler { + pub gaps: Vec>, + pub segments: Vec, + pub buf: Vec, + pub forward_has_data: bool, + pub reverse_has_data: bool, +} + +impl DumpHandler { + pub fn dump_stream_segments(&self) { + info!("segments (length {})", self.segments.len()); + for segment in &self.segments { + info!(" offset: {}", segment.offset); + info!(" reverse acked: {}", segment.reverse_acked); + match segment.data { + SegmentType::Data { len, is_retransmit } => { + info!(" type: data"); + info!(" len {len}, retransmit {is_retransmit}"); + } + SegmentType::Ack { window } => { + info!(" type: ack"); + info!(" window: {window}"); + } + SegmentType::Fin { end_offset } => { + info!(" type: fin"); + info!(" end offset: {end_offset}"); + } + } + } + } + + pub fn dump_stream( + &mut self, + connection: &mut Connection, + direction: Direction, + dump_len: usize, + ) { + self.gaps.clear(); + self.segments.clear(); + self.buf.clear(); + // indiscriminately dump everything to stdout + let mut flow = connection.forward_flow.clone(); + if direction == Direction::Reverse { + flow.reverse(); + } + let uuid = connection.uuid; + let stream = connection.get_stream(direction); + + let start_offset = stream.buffer_start(); + let end_offset = start_offset + dump_len as u64; + if dump_len > 0 { + trace!("requesting {dump_len} bytes for direction {direction}"); + stream.read_next(end_offset, &mut self.segments, &mut self.gaps, |slice| { + let (a, b) = slice.as_slices(); + self.buf.extend_from_slice(a); + if let Some(b) = b { + self.buf.extend_from_slice(b); + } + }); + + info!("gaps (length {})", self.gaps.len()); + for gap in &self.gaps { + info!(" gap {} -> {}", gap.start, gap.end); + } + self.dump_stream_segments(); + + info!("data (length {})", self.buf.len()); + println!("\n====================\n{} ({})", flow, uuid); + println!(" offset: {start_offset}"); + println!(" length: {dump_len}\n"); + if !self.gaps.is_empty() { + let gaps_len: u64 = self.gaps.iter().map(|r| r.end - r.start).sum(); + println!(" gap bytes: {gaps_len}"); + } + dump_as_readable_ascii(&self.buf, true); + } else { + // read segments only + stream.read_segments_until(end_offset, &mut self.segments); + info!("no new data, dumping segments only"); + self.dump_stream_segments(); + } + } + + pub fn write_remaining(&mut self, connection: &mut Connection, direction: Direction) { + debug!( + "connection {} direction {direction} writing remaining segments", + connection.uuid + ); + let remaining = connection.get_stream(direction).total_buffered_length(); + self.dump_stream(connection, direction, remaining); + } +} + +impl ConnectionHandler for DumpHandler { + type InitialData = (); + type ConstructError = Infallible; + fn new(_init: (), _conn: &mut Connection) -> Result { + Ok(DumpHandler { + gaps: Vec::new(), + segments: Vec::new(), + buf: Vec::new(), + forward_has_data: false, + reverse_has_data: false, + }) + } + + fn data_received(&mut self, connection: &mut Connection, direction: Direction) { + let (fwd_data, rev_data) = match direction { + Direction::Forward => (&mut self.forward_has_data, &mut self.reverse_has_data), + Direction::Reverse => (&mut self.reverse_has_data, &mut self.forward_has_data), + }; + let fwd_readable_len = connection.get_stream(direction).readable_buffered_length(); + *fwd_data = fwd_readable_len > 0; + + // dump reverse stream buffer if it has data + if *rev_data { + let rev_stream = connection.get_stream(direction.swap()); + let readable = rev_stream.readable_buffered_length(); + if readable > 0 { + trace!("reverse stream has data, will dump"); + self.dump_stream(connection, direction.swap(), readable); + } + } + + // dump forward stream if limits hit + let fwd_stream = connection.get_stream(direction); + if fwd_readable_len > 64 << 10 || fwd_stream.segments_info.len() > 16 << 10 { + trace!("forward stream exceeded threshold, will dump"); + self.dump_stream(connection, direction, fwd_readable_len); + } else if fwd_stream.total_buffered_length() > 256 << 10 { + trace!("forward stream exceeded total buffer size threshold, will dump"); + self.dump_stream(connection, direction, 128 << 10); + } + } + + fn will_retire(&mut self, connection: &mut Connection) { + self.write_remaining(connection, Direction::Forward); + self.write_remaining(connection, Direction::Reverse); + } +} + +#[derive(Serialize)] +pub struct ConnInfo { + pub id: Uuid, + pub src_addr: IpAddr, + pub src_port: u16, + pub dst_addr: IpAddr, + pub dst_port: u16, +} + +impl ConnInfo { + pub fn new(uuid: Uuid, flow: &Flow) -> Self { + ConnInfo { + id: uuid, + src_addr: flow.src_addr, + src_port: flow.src_port, + dst_addr: flow.dst_addr, + dst_port: flow.dst_port, + } + } +} + +/// shared state for DirectoryOutputHandler +pub struct DirectoryOutputSharedInfoInner { + pub base_dir: PathBuf, + pub conn_info_file: Mutex, +} + +impl DirectoryOutputSharedInfoInner { + /// create with output path + pub fn new(base_dir: PathBuf) -> std::io::Result> { + let mut conn_info_file = File::create(base_dir.join("connections.json"))?; + conn_info_file.write_all(b"[\n")?; + Ok(Arc::new(DirectoryOutputSharedInfoInner { + base_dir, + conn_info_file: Mutex::new(conn_info_file), + })) + } + + /// write connection info + pub fn record_conn_info(self: &Arc, uuid: Uuid, flow: &Flow) -> std::io::Result<()> { + let mut serialized = serde_json::to_string(&ConnInfo::new(uuid, flow)) + .expect("failed to serialize ConnInfo"); + serialized += ",\n"; + let mut file = self.conn_info_file.lock(); + file.write_all(serialized.as_bytes()) + } + + /// close connection info file + pub fn close(self) -> std::io::Result<()> { + let mut conn_info_file = self.conn_info_file.into_inner(); + let current_pos = conn_info_file.stream_position()?; + if current_pos > 2 { + // overwrite trailing comma and close array + conn_info_file.seek(SeekFrom::Current(-2))?; + conn_info_file.write_all(b"\n]\n")?; + } else { + // no connections, just close the array + conn_info_file.write_all(b"]\n")?; + } + Ok(()) + } +} + +type DirectoryOutputSharedInfo = Arc; + +#[derive(Serialize)] +#[serde(tag = "type")] +pub enum SerializedSegment { + #[serde(rename = "data")] + Data { + offset: u64, + len: usize, + is_retransmit: bool, + reverse_acked: u64, + #[serde(flatten)] + extra: PacketExtra, + }, + #[serde(rename = "ack")] + Ack { + offset: u64, + window: usize, + reverse_acked: u64, + #[serde(flatten)] + extra: PacketExtra, + }, + #[serde(rename = "fin")] + Fin { + offset: u64, + reverse_acked: u64, + #[serde(flatten)] + extra: PacketExtra, + }, + #[serde(rename = "gap")] + Gap { offset: u64, len: u64 }, +} + +impl SerializedSegment { + pub fn new_gap(offset: u64, len: u64) -> Self { + Self::Gap { offset, len } + } +} + +impl From<&SegmentInfo> for SerializedSegment { + fn from(info: &SegmentInfo) -> Self { + match info.data { + SegmentType::Data { len, is_retransmit } => Self::Data { + offset: info.offset, + len, + is_retransmit, + reverse_acked: info.reverse_acked, + extra: info.extra.clone(), + }, + SegmentType::Ack { window } => Self::Ack { + offset: info.offset, + window, + reverse_acked: info.reverse_acked, + extra: info.extra.clone(), + }, + SegmentType::Fin { end_offset } => Self::Fin { + offset: end_offset, + reverse_acked: info.reverse_acked, + extra: info.extra.clone(), + }, + } + } +} + +/// ConnectionHandler to write data to a directory +pub struct DirectoryOutputHandler { + pub shared_info: DirectoryOutputSharedInfo, + pub id: Uuid, + pub gaps: Vec>, + pub segments: Vec, + + pub forward_data: File, + pub forward_segments: File, + pub reverse_data: File, + pub reverse_segments: File, +} + +impl DirectoryOutputHandler { + pub fn write_stream_data( + &mut self, + connection: &mut Connection, + direction: Direction, + dump_len: usize, + ) -> std::io::Result<()> { + self.gaps.clear(); + self.segments.clear(); + + let (data_file, mut segments_file) = match direction { + Direction::Forward => ( + &mut self.forward_data, + BufWriter::new(&mut self.forward_segments), + ), + Direction::Reverse => ( + &mut self.reverse_data, + BufWriter::new(&mut self.reverse_segments), + ), + }; + + let stream = connection.get_stream(direction); + if dump_len > 0 { + trace!("write_stream_data: requesting {dump_len} bytes from stream for {direction}"); + let start_offset = stream.buffer_start(); + let end_offset = start_offset + dump_len as u64; + stream + .read_next(end_offset, &mut self.segments, &mut self.gaps, |slice| { + let (a, b) = slice.as_slices(); + trace!("write_stream_data: writing {} data bytes", a.len()); + data_file.write_all(a)?; + if let Some(b) = b { + trace!("write_stream_data: writing {} data bytes", b.len()); + data_file.write_all(b)?; + } + Result::<(), std::io::Error>::Ok(()) + }) + .expect("read_next cannot fulfill range")?; + } else if !stream.segments_info.is_empty() { + // only dump remaining segments + stream.read_segments_until(stream.buffer_start(), &mut self.segments); + } else { + // nothing to do + return Ok(()); + } + + // write gaps and segments in order + let mut gaps_iter = self.gaps.iter().peekable(); + let mut segments_iter = self.segments.iter().peekable(); + loop { + enum WhichNext { + Gap, + Segment, + } + // figure out which to write next + let which = match (gaps_iter.peek(), segments_iter.peek()) { + (None, None) => break, + (None, Some(_)) => WhichNext::Segment, + (Some(_), None) => WhichNext::Gap, + (Some(&gap), Some(&segment)) => { + if gap.start < segment.offset { + WhichNext::Gap + } else { + WhichNext::Segment + } + } + }; + + // serialize and write + match which { + WhichNext::Gap => { + let gap = gaps_iter.next().unwrap(); + let info = SerializedSegment::new_gap(gap.start, gap.end - gap.start); + serde_json::to_writer(&mut segments_file, &info)?; + segments_file.write_all(b"\n")?; + } + WhichNext::Segment => { + let segment = segments_iter.next().unwrap(); + let info: SerializedSegment = segment.into(); + serde_json::to_writer(&mut segments_file, &info)?; + segments_file.write_all(b"\n")?; + } + } + } + + self.gaps.clear(); + self.segments.clear(); + Ok(()) + } + + pub fn write_remaining( + &mut self, + connection: &mut Connection, + direction: Direction, + ) -> std::io::Result<()> { + debug!( + "connection {} direction {direction} writing remaining segments", + connection.uuid + ); + let remaining = connection.get_stream(direction).total_buffered_length(); + self.write_stream_data(connection, direction, remaining) + } +} + +macro_rules! log_error { + ($result:expr, $what:expr) => { + if let Err(e) = $result { + ::tracing::error!(concat!($what, ": {:?}"), e); + } + }; +} + +impl ConnectionHandler for DirectoryOutputHandler { + type InitialData = DirectoryOutputSharedInfo; + type ConstructError = eyre::Report; + fn new( + shared_info: Self::InitialData, + connection: &mut Connection, + ) -> eyre::Result { + let id = connection.uuid; + let base_dir = &shared_info.base_dir; + let forward_data = File::create(base_dir.join(format!("{id}.f.data"))) + .wrap_err("creating forward data file")?; + let forward_segments = File::create(base_dir.join(format!("{id}.f.jsonl"))) + .wrap_err("creating forward segments file")?; + let reverse_data = File::create(base_dir.join(format!("{id}.r.data"))) + .wrap_err("creating reverse data file")?; + let reverse_segments = File::create(base_dir.join(format!("{id}.r.jsonl"))) + .wrap_err("creating reverse segments file")?; + + Ok(DirectoryOutputHandler { + shared_info, + id, + gaps: Vec::new(), + segments: Vec::new(), + forward_data, + forward_segments, + reverse_data, + reverse_segments, + }) + } + + fn handshake_done(&mut self, connection: &mut Connection) { + log_error!( + self.shared_info + .record_conn_info(connection.uuid, &connection.forward_flow), + "failed to write connection info" + ); + } + + fn data_received(&mut self, connection: &mut Connection, direction: Direction) { + let stream = connection.get_stream(direction); + let readable_len = stream.readable_buffered_length(); + if readable_len > 64 << 10 || stream.segments_info.len() > 16 << 10 { + log_error!( + self.write_stream_data(connection, direction, readable_len), + "failed to write stream data" + ); + } else if stream.total_buffered_length() > 256 << 10 { + log_error!( + self.write_stream_data(connection, direction, 128 << 10), + "failed to write stream data" + ); + } + } + + fn will_retire(&mut self, connection: &mut Connection) { + log_error!( + self.write_remaining(connection, Direction::Forward), + "failed to write stream data" + ); + log_error!( + self.write_remaining(connection, Direction::Reverse), + "failed to write stream data" + ); + } +} diff --git a/parse-tcp/src/lib.rs b/parse-tcp/src/lib.rs index 164df39..d35f551 100644 --- a/parse-tcp/src/lib.rs +++ b/parse-tcp/src/lib.rs @@ -2,9 +2,11 @@ use std::fmt::Debug; use std::net::IpAddr; use connection::{Connection, Direction}; +use serde::Serialize; pub mod connection; pub mod flow_table; +pub mod handler; pub mod parser; pub mod stream; @@ -88,8 +90,13 @@ where { /// initial data provided to new type InitialData: Clone; + /// error type raised from new + type ConstructError; /// construct handler object - fn new(init_data: Self::InitialData, connection: &mut Connection) -> Self; + fn new( + init_data: Self::InitialData, + connection: &mut Connection, + ) -> Result; /// called on handshake finish (or incomplete handshake) fn handshake_done(&mut self, _connection: &mut Connection) {} /// called on data received @@ -107,7 +114,8 @@ where } /// extra information that may be associated with the packet -#[derive(Clone)] +#[derive(Clone, Serialize)] +#[serde(untagged)] pub enum PacketExtra { None, LegacyPcap { diff --git a/parse-tcp/src/parser.rs b/parse-tcp/src/parser.rs index 3d03da3..5ca2c0f 100644 --- a/parse-tcp/src/parser.rs +++ b/parse-tcp/src/parser.rs @@ -1,17 +1,18 @@ use std::net::IpAddr; -use etherparse::{InternetSlice, SlicedPacket, TransportSlice, TcpOptionElement}; +use etherparse::{InternetSlice, SlicedPacket, TcpOptionElement, TransportSlice}; use tracing::{debug, trace}; -use crate::{TcpMeta, TcpFlags}; +use crate::{TcpFlags, TcpMeta}; -pub struct Parser { +/// parses only TCP packets with etherparse +pub struct TcpParser { pub layer: ParseLayer, pub failed_parse: usize, pub ignored: usize, } -impl Parser { +impl TcpParser { pub fn new() -> Self { Self { layer: ParseLayer::Link, @@ -20,6 +21,7 @@ impl Parser { } } + /// parse tcp packets into TcpMeta and data pub fn parse_packet<'a>(&mut self, data: &'a [u8]) -> Option<(TcpMeta, &'a [u8])> { let parse_result = match self.layer { ParseLayer::Link => SlicedPacket::from_ethernet(data), @@ -64,12 +66,12 @@ impl Parser { match opt { Ok(TcpOptionElement::WindowScale(scale)) => { option_window_scale = Some(scale); - }, + } Ok(TcpOptionElement::Timestamp(a, b)) => { option_timestamp = Some((a, b)); } // ignore all other options - _ => {}, + _ => {} } } @@ -95,7 +97,7 @@ impl Parser { } } -impl Default for Parser { +impl Default for TcpParser { fn default() -> Self { Self::new() } diff --git a/parse-tcp/src/stream.rs b/parse-tcp/src/stream.rs index 74e60c9..d56218f 100644 --- a/parse-tcp/src/stream.rs +++ b/parse-tcp/src/stream.rs @@ -486,25 +486,25 @@ impl Stream { } /// read state until offset - pub fn read_next( + pub fn read_next( &mut self, end_offset: u64, in_segments: &mut Vec, in_gaps: &mut Vec>, - read_fn: impl FnOnce(RingBufSlice<'_, u8>), - ) -> bool { + read_fn: impl FnOnce(RingBufSlice<'_, u8>) -> T, + ) -> Option { let start_offset = self.state.buffer_offset; if end_offset < start_offset { warn!("requested read of range that no longer exists"); - return false; + return None; } if end_offset == start_offset { // don't return zero-length reads - return false; + return None; } if (end_offset - start_offset) as usize > self.state.buffer.len() { warn!("requested read of range past end of buffer"); - return false; + return None; } self.read_segments_until(end_offset, in_segments); self.read_gaps(start_offset..end_offset, in_gaps); @@ -514,10 +514,10 @@ impl Stream { let Some(slice) = self.state.read_segment(start_offset..end_offset) else { panic!("InboundStreamState says range is not available"); }; - read_fn(slice); + let ret = read_fn(slice); // advance backing buffer self.state.advance_buffer(end_offset); - true + Some(ret) } } @@ -528,6 +528,7 @@ impl Default for Stream { } /// information on each segment received +#[derive(Clone)] pub struct SegmentInfo { /// offset into stream of this segment pub offset: u64, @@ -540,6 +541,7 @@ pub struct SegmentInfo { } /// type-specific information for each segment +#[derive(Clone)] pub enum SegmentType { Data { len: usize, is_retransmit: bool }, Ack { window: usize },