From 84a9779a2b86c86d006af036646108524ac2ea34 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aurimas=20Bla=C5=BEulionis?= Date: Mon, 20 Jan 2025 06:00:44 +0000 Subject: [PATCH] Add cramless snapshot/event-log verification --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/lib.rs | 116 ++++++++++++++++++++++++++++++++++++++++++++++++++-- src/main.rs | 4 +- 4 files changed, 116 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 943b69f..ec144cf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1429,7 +1429,7 @@ checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" [[package]] name = "pierport" -version = "0.1.3" +version = "0.1.4" dependencies = [ "anyhow", "async-compression 0.4.7", diff --git a/Cargo.toml b/Cargo.toml index d29b46b..74cebfc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pierport" -version = "0.1.3" +version = "0.1.4" edition = "2021" rust-version = "1.76" authors = ["Aurimas Blažulionis "] diff --git a/src/lib.rs b/src/lib.rs index 5727187..019099d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -178,13 +178,15 @@ pub async fn find_files_by_suffix( } /// Actions to be taken after pier is unpacked. -#[derive(Clone, Debug, Default, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize)] #[serde(default)] pub struct PostUnpackCfg { /// Catch up the event log. prep: bool, /// Cram and verify that the cram hasn't changed after subsequent steps. verify_cram: bool, + /// Store snapshot and event log event count, and verify it. + verify_events: bool, /// Run vere pack. pack: bool, /// Run vere meld. @@ -193,6 +195,19 @@ pub struct PostUnpackCfg { chop: bool, } +impl Default for PostUnpackCfg { + fn default() -> Self { + Self { + prep: true, + verify_cram: false, + verify_events: true, + pack: true, + meld: true, + chop: true, + } + } +} + impl PostUnpackCfg { pub fn verify_cram(self, verify_cram: bool) -> Self { Self { @@ -201,6 +216,13 @@ impl PostUnpackCfg { } } + pub fn verify_events(self, verify_events: bool) -> Self { + Self { + verify_events, + ..self + } + } + pub fn prep(self, prep: bool) -> Self { Self { prep, ..self } } @@ -220,6 +242,7 @@ impl PostUnpackCfg { pub fn all() -> Self { Self { verify_cram: true, + verify_events: true, prep: true, pack: true, meld: true, @@ -228,10 +251,58 @@ impl PostUnpackCfg { } } +#[derive(Debug, PartialEq, Eq, Clone, Copy)] +struct EventCount { + snapshot: u64, + disk: u64, +} + +impl EventCount { + fn from_info_stderr(info: &str) -> anyhow::Result { + let mut lines = info.lines(); + + while let Some(line) = lines.next() { + let line = line.trim(); + + // Parse snapshot event number + let Some(line) = line.strip_prefix("urbit: ") else { + continue; + }; + let Some((_, line)) = line.split_once("at event ") else { + continue; + }; + let Ok(snapshot) = line.parse::() else { + continue; + }; + + trace!("Parsed snapshot event number: {snapshot}"); + + let Some(line) = lines.next() else { continue }; + let line = line.trim(); + + // Parse the disk (event log) event number + if !line.contains("disk:") { + continue; + } + let Some((_, line)) = line.split_once("event=") else { + continue; + }; + let Ok(disk) = line.parse::() else { + continue; + }; + + return Ok(Self { snapshot, disk }); + } + + Err(anyhow!("Could not parse info output")) + } +} + #[derive(Clone)] pub struct StandardUnpack> { path: T, loom: Option, + event_count: Option, } impl> StandardUnpack { @@ -269,7 +340,11 @@ impl> StandardUnpack { fs::create_dir_all(path.as_ref()).await?; - Ok(Self { path, loom }) + Ok(Self { + path, + loom, + event_count: None, + }) } pub async fn detect_loom(&mut self) -> Result> { @@ -281,7 +356,7 @@ impl> StandardUnpack { self.loom = loom; } - async fn run_cmd(&mut self, args: &[&str]) -> Result<()> { + async fn run_cmd(&mut self, args: &[&str]) -> Result { let mut cmd = Command::new("./.run"); cmd.current_dir(&**self).args(args); @@ -302,7 +377,32 @@ impl> StandardUnpack { ) .into()) } else { - Ok(()) + Ok(String::from_utf8_lossy(&output.stderr).into()) + } + } + + pub async fn store_events(mut self) -> Result> { + debug!("Pre-work event count"); + let err = self.run_cmd(&["-R"]).await?; + self.event_count = Some(EventCount::from_info_stderr(&err)?); + Ok(self) + } + + pub async fn verify_events(mut self) -> Result> { + debug!("Pre-work event count"); + let err = self.run_cmd(&["-R"]).await?; + let event_count = EventCount::from_info_stderr(&err)?; + let Some(events) = self.event_count else { + return Err(anyhow!( + "verify_events called without previous store_events" + ).into()); + }; + if event_count == events { + Ok(self) + } else { + Err(anyhow!( + "Event count mismatch between prev={events:?} and cur={event_count:?}" + ).into()) } } @@ -442,6 +542,10 @@ impl> StandardUnpack { self = self.cram().await?; } + if cfg.verify_events { + self = self.store_events().await?; + } + if cfg.pack { self = self.pack().await?; } @@ -458,6 +562,10 @@ impl> StandardUnpack { self = self.verify_cram().await?; } + if cfg.verify_events { + self = self.verify_events().await?; + } + Ok(self) } diff --git a/src/main.rs b/src/main.rs index 435c624..75cdfd0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -84,8 +84,8 @@ struct Config { upload_limit: usize, /// Cleanup steps to run after the pier has been unpacked. /// - /// Defaults to all steps, unless there is a section in the config negating it (sufficient to - /// declare empty post_unpack section). + /// Defaults to all steps, except cram, unless there is a section in the config negating it + /// (sufficient to declare empty post_unpack section). post_unpack: PostUnpackCfg, /// How often should we cleanup complete sessions. session_check_interval_seconds: u64,