From 8151209acd9186885557000dcc610320bcee9277 Mon Sep 17 00:00:00 2001 From: Eguo Wang Date: Mon, 23 Oct 2023 23:56:30 +0800 Subject: [PATCH] Add `once` argument to amp-syncer --- Cargo.lock | 12 ++++++------ Cargo.toml | 2 +- syncer/src/config.rs | 3 +++ syncer/src/handle.rs | 35 ++++++++++++++++++++--------------- syncer/src/main.rs | 16 +++++++++++----- 5 files changed, 41 insertions(+), 27 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7311fe2..bb8884d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -61,7 +61,7 @@ checksum = "0942ffc6dcaadf03badf6e6a2d0228460359d5e34b57ccdc720b7382dfbd5ec5" [[package]] name = "amp-apiserver" -version = "0.6.3" +version = "0.6.4" dependencies = [ "amp-common", "amp-resources", @@ -121,7 +121,7 @@ dependencies = [ [[package]] name = "amp-controllers" -version = "0.6.3" +version = "0.6.4" dependencies = [ "amp-common", "amp-resolver", @@ -146,7 +146,7 @@ dependencies = [ [[package]] name = "amp-crdgen" -version = "0.6.3" +version = "0.6.4" dependencies = [ "amp-common", "clap", @@ -158,7 +158,7 @@ dependencies = [ [[package]] name = "amp-resolver" -version = "0.6.3" +version = "0.6.4" dependencies = [ "amp-common", "amp-resources", @@ -172,7 +172,7 @@ dependencies = [ [[package]] name = "amp-resources" -version = "0.6.3" +version = "0.6.4" dependencies = [ "amp-common", "anyhow", @@ -192,7 +192,7 @@ dependencies = [ [[package]] name = "amp-syncer" -version = "0.6.3" +version = "0.6.4" dependencies = [ "amp-common", "async-nats", diff --git a/Cargo.toml b/Cargo.toml index 58946cb..366f04f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [workspace.package] -version = "0.6.3" +version = "0.6.4" edition = "2021" license = "Apache-2.0" repository = "https://github.com/amphitheatre-app/amphitheatre" diff --git a/syncer/src/config.rs b/syncer/src/config.rs index 7d92cb8..9d7d7b5 100644 --- a/syncer/src/config.rs +++ b/syncer/src/config.rs @@ -40,4 +40,7 @@ pub struct Config { /// The actor name. #[clap(long, env = "AMP_ACTOR")] pub actor: String, + // Exit after sync once (Overwrite). + #[clap(long, action = clap::ArgAction::Set, default_value = "false", env = "AMP_ONCE")] + pub once: bool, } diff --git a/syncer/src/handle.rs b/syncer/src/handle.rs index e8172f9..053daea 100644 --- a/syncer/src/handle.rs +++ b/syncer/src/handle.rs @@ -16,26 +16,27 @@ use std::path::Path; use amp_common::sync::{self, Synchronization}; use tar::Archive; -use tracing::{debug, error, trace, warn}; +use tracing::{debug, error, info, warn}; type Result = std::result::Result>; /// Overwrite workspace's files with payload tarball. -pub fn overwrite(workspace: &Path, req: Synchronization) -> Result<()> { +pub fn overwrite(workspace: &Path, req: &Synchronization) -> Result<()> { debug!("Received overwrite event, workspace: {:?}, req: {:?}", workspace, req); - if let Some(payload) = req.payload { + if let Some(payload) = &req.payload { let mut ar = Archive::new(payload.as_slice()); ar.unpack(workspace)?; + info!("Received overwrite event and unpacked into workspace: {:?}", workspace); } Ok(()) } /// Create new files or directories. -pub fn create(workspace: &Path, req: Synchronization) -> Result<()> { +pub fn create(workspace: &Path, req: &Synchronization) -> Result<()> { debug!("Received create event, workspace: {:?}, req: {:?}", workspace, req); - for path in req.paths { + for path in &req.paths { match path { sync::Path::File(file) => { let path = workspace.join(file); @@ -50,7 +51,8 @@ pub fn create(workspace: &Path, req: Synchronization) -> Result<()> { } } - std::fs::File::create(path)?; + std::fs::File::create(path.clone())?; + info!("Created file: {:?}", path); } sync::Path::Directory(path) => { let path = workspace.join(path); @@ -59,7 +61,8 @@ pub fn create(workspace: &Path, req: Synchronization) -> Result<()> { continue; } - std::fs::create_dir_all(path)?; + std::fs::create_dir_all(path.clone())?; + info!("Created directory: {:?}", path); } } } @@ -68,28 +71,29 @@ pub fn create(workspace: &Path, req: Synchronization) -> Result<()> { } /// Modify existing files or directories. -pub fn modify(workspace: &Path, req: Synchronization) -> Result<()> { +pub fn modify(workspace: &Path, req: &Synchronization) -> Result<()> { debug!("Received modify event, workspace: {:?}, req: {:?}", workspace, req); - if let Some(payload) = req.payload { + if let Some(payload) = &req.payload { let mut ar = Archive::new(payload.as_slice()); ar.unpack(workspace)?; + info!("Received modify event and unpacked into workspace: {:?}", workspace); } Ok(()) } /// Rename existing files or directories. -pub fn rename(_workspace: &Path, _req: Synchronization) -> Result<()> { +pub fn rename(_workspace: &Path, _req: &Synchronization) -> Result<()> { error!("Received rename event, nothing to do!"); Ok(()) } /// Remove existing files or directories. -pub fn remove(workspace: &Path, req: Synchronization) -> Result<()> { +pub fn remove(workspace: &Path, req: &Synchronization) -> Result<()> { debug!("Received remove event, workspace: {:?}, req: {:?}", workspace, req); - for path in req.paths { + for path in &req.paths { match path { sync::Path::File(file) => { let path = workspace.join(file); @@ -98,7 +102,8 @@ pub fn remove(workspace: &Path, req: Synchronization) -> Result<()> { continue; } - std::fs::remove_file(path)?; + std::fs::remove_file(path.clone())?; + info!("Removed file: {:?}", path); } sync::Path::Directory(path) => { let path = workspace.join(path); @@ -107,8 +112,8 @@ pub fn remove(workspace: &Path, req: Synchronization) -> Result<()> { continue; } - trace!("Removing directory: {:?}", path); - std::fs::remove_dir_all(path)?; + std::fs::remove_dir_all(path.clone())?; + info!("Removed directory: {:?}", path); } } } diff --git a/syncer/src/main.rs b/syncer/src/main.rs index b1dbabd..1510621 100644 --- a/syncer/src/main.rs +++ b/syncer/src/main.rs @@ -62,11 +62,11 @@ async fn main() -> Result<(), async_nats::Error> { // Handle the message if let Err(err) = match req.kind { - Create => handle::create(workspace, req), - Modify => handle::modify(workspace, req), - Rename => handle::rename(workspace, req), - Remove => handle::remove(workspace, req), - Overwrite => handle::overwrite(workspace, req), + Create => handle::create(workspace, &req), + Modify => handle::modify(workspace, &req), + Rename => handle::rename(workspace, &req), + Remove => handle::remove(workspace, &req), + Overwrite => handle::overwrite(workspace, &req), Other => { warn!("Received other event, nothing to do!"); Ok(()) @@ -83,6 +83,12 @@ async fn main() -> Result<(), async_nats::Error> { if let Err(err) = message.ack().await { error!("Failed to acknowledge message: {:?}", err); } + + // If we're in once mode, exit after overwrite. + if config.once && req.kind == Overwrite { + info!("Exit after sync once (Overwrite), bye!"); + std::process::exit(0); + } } Ok(())