From fed9f187926f5d1390f5f3c4bdfcea037aabab7f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Willy=20Rom=C3=A3o?= Date: Sat, 18 Nov 2023 22:30:33 +0000 Subject: [PATCH] wip: testing all capabilities --- mfm_machine/mfm_machine_macros/src/lib.rs | 3 +- mfm_machine/tests/default_impls.rs | 166 ++++++++++++++++++ mfm_machine/tests/public_api_test.rs | 135 ++------------ .../retry_workflow_state_machine_test.rs | 28 +++ 4 files changed, 208 insertions(+), 124 deletions(-) create mode 100644 mfm_machine/tests/default_impls.rs create mode 100644 mfm_machine/tests/retry_workflow_state_machine_test.rs diff --git a/mfm_machine/mfm_machine_macros/src/lib.rs b/mfm_machine/mfm_machine_macros/src/lib.rs index 8717712..176c7f2 100644 --- a/mfm_machine/mfm_machine_macros/src/lib.rs +++ b/mfm_machine/mfm_machine_macros/src/lib.rs @@ -7,9 +7,8 @@ use syn::{parse_macro_input, DeriveInput}; #[proc_macro_derive(StateConfigReqs)] pub fn state_reqs_derive(input: TokenStream) -> TokenStream { let input = parse_macro_input!(input as DeriveInput); - let ident = &input.ident; - //FIXME: import and fix paths for mfm_machine lib + let expanded = quote! { impl StateConfig for #ident { fn label(&self) -> &Label { diff --git a/mfm_machine/tests/default_impls.rs b/mfm_machine/tests/default_impls.rs new file mode 100644 index 0000000..7faecdc --- /dev/null +++ b/mfm_machine/tests/default_impls.rs @@ -0,0 +1,166 @@ +use anyhow::anyhow; +use anyhow::Error; +use mfm_machine::state::context::Context; +use mfm_machine::state::DependencyStrategy; +use mfm_machine::state::Label; +use mfm_machine::state::StateConfig; +use mfm_machine::state::StateError; +use mfm_machine::state::StateErrorRecoverability; +use mfm_machine::state::StateHandler; +use mfm_machine::state::StateResult; +use mfm_machine::state::Tag; +use mfm_machine::StateConfigReqs; +use rand::Rng; +use serde_derive::{Deserialize, Serialize}; +use serde_json::{json, Value}; + +#[derive(Serialize, Deserialize)] +pub struct ContextA { + pub a: String, + pub b: u64, +} + +impl ContextA { + pub fn new(a: String, b: u64) -> Self { + Self { a, b } + } +} + +impl Context for ContextA { + fn read_input(&self) -> Result { + serde_json::to_value(self).map_err(|e| anyhow!(e)) + } + + fn write_output(&mut self, value: &Value) -> Result<(), Error> { + let ctx: ContextA = serde_json::from_value(value.clone()).map_err(|e| anyhow!(e))?; + self.a = ctx.a; + self.b = ctx.b; + Ok(()) + } +} + +#[derive(Debug, Clone, PartialEq, StateConfigReqs)] +pub struct Setup { + label: Label, + tags: Vec, + depends_on: Vec, + depends_on_strategy: DependencyStrategy, +} + +impl Default for Setup { + fn default() -> Self { + Self::new() + } +} + +impl Setup { + pub fn new() -> Self { + Self { + label: Label::new("setup_state").unwrap(), + tags: vec![Tag::new("setup").unwrap()], + depends_on: vec![Tag::new("setup").unwrap()], + depends_on_strategy: DependencyStrategy::Latest, + } + } +} + +impl StateHandler for Setup { + fn handler(&self, context: &mut dyn Context) -> StateResult { + let _data: ContextA = serde_json::from_value(context.read_input().unwrap()).unwrap(); + let mut rng = rand::thread_rng(); + let data = json!({ "a": "setting up", "b": rng.gen_range(0..9) }); + match context.write_output(&data) { + Ok(()) => Ok(()), + Err(e) => Err(StateError::StorageAccess( + StateErrorRecoverability::Recoverable, + e, + )), + } + } +} + +#[derive(Debug, Clone, PartialEq, StateConfigReqs)] +pub struct ComputePrice { + label: Label, + tags: Vec, + depends_on: Vec, + depends_on_strategy: DependencyStrategy, +} + +impl Default for ComputePrice { + fn default() -> Self { + Self::new() + } +} + +impl ComputePrice { + pub fn new() -> Self { + Self { + label: Label::new("compute_price").unwrap(), + tags: vec![Tag::new("computation").unwrap()], + depends_on: vec![Tag::new("setup").unwrap()], + depends_on_strategy: DependencyStrategy::Latest, + } + } +} + +impl StateHandler for ComputePrice { + fn handler(&self, context: &mut dyn Context) -> StateResult { + let _data: ContextA = serde_json::from_value(context.read_input().unwrap()).unwrap(); + if _data.b % 2 == 0 { + return Err(StateError::ParsingInput( + StateErrorRecoverability::Recoverable, + anyhow!("the input is even, should be odd"), + )); + } + + let data = json!({ "a": "the input number is odd", "b": _data.b }); + match context.write_output(&data) { + Ok(()) => Ok(()), + Err(e) => Err(StateError::StorageAccess( + StateErrorRecoverability::Unrecoverable, + e, + )), + } + } +} + +#[derive(Debug, Clone, PartialEq, StateConfigReqs)] +pub struct Report { + label: Label, + tags: Vec, + depends_on: Vec, + depends_on_strategy: DependencyStrategy, +} + +impl Default for Report { + fn default() -> Self { + Self::new() + } +} + +impl Report { + pub fn new() -> Self { + Self { + label: Label::new("report_state").unwrap(), + tags: vec![Tag::new("report").unwrap()], + depends_on: vec![Tag::new("setup").unwrap()], + depends_on_strategy: DependencyStrategy::Latest, + } + } +} + +impl StateHandler for Report { + fn handler(&self, context: &mut dyn Context) -> StateResult { + let _data: ContextA = serde_json::from_value(context.read_input().unwrap()).unwrap(); + let data = + json!({ "a": format!("{}: {}", "some new data reported", _data.a), "b": _data.b }); + match context.write_output(&data) { + Ok(()) => Ok(()), + Err(e) => Err(StateError::StorageAccess( + StateErrorRecoverability::Recoverable, + e, + )), + } + } +} diff --git a/mfm_machine/tests/public_api_test.rs b/mfm_machine/tests/public_api_test.rs index d0c15c2..dffe0d2 100644 --- a/mfm_machine/tests/public_api_test.rs +++ b/mfm_machine/tests/public_api_test.rs @@ -1,127 +1,16 @@ -extern crate mfm_machine; +mod default_impls; -use anyhow::anyhow; -use anyhow::Error; +use default_impls::{ContextA, Report, Setup}; use mfm_machine::state::context::Context; use mfm_machine::state::DependencyStrategy; use mfm_machine::state::Label; -use mfm_machine::state::StateConfig; -use mfm_machine::state::StateError; -use mfm_machine::state::StateErrorRecoverability; use mfm_machine::state::StateHandler; -use mfm_machine::state::StateResult; use mfm_machine::state::Tag; -use mfm_machine::StateConfigReqs; -use serde_derive::{Deserialize, Serialize}; -use serde_json::{json, Value}; - -#[derive(Serialize, Deserialize)] -struct ContextA { - a: String, - b: u64, -} - -impl ContextA { - fn _new(a: String, b: u64) -> Self { - Self { a, b } - } -} - -impl mfm_machine::state::context::Context for ContextA { - fn read_input(&self) -> Result { - serde_json::to_value(self).map_err(|e| anyhow!(e)) - } - - fn write_output(&mut self, value: &Value) -> Result<(), Error> { - let ctx: ContextA = serde_json::from_value(value.clone()).map_err(|e| anyhow!(e))?; - self.a = ctx.a; - self.b = ctx.b; - Ok(()) - } -} - -#[derive(Debug, Clone, PartialEq, StateConfigReqs)] -pub struct Setup { - label: Label, - tags: Vec, - depends_on: Vec, - depends_on_strategy: DependencyStrategy, -} - -impl Default for Setup { - fn default() -> Self { - Self::new() - } -} - -impl Setup { - fn new() -> Self { - Self { - label: Label::new("setup_state").unwrap(), - tags: vec![Tag::new("setup").unwrap()], - depends_on: vec![Tag::new("setup").unwrap()], - depends_on_strategy: DependencyStrategy::Latest, - } - } -} - -impl StateHandler for Setup { - fn handler(&self, context: &mut dyn Context) -> StateResult { - let _data: ContextA = serde_json::from_value(context.read_input().unwrap()).unwrap(); - let data = json!({ "a": "setting up", "b": 1 }); - match context.write_output(&data) { - Ok(()) => Ok(()), - Err(e) => Err(StateError::StorageAccess( - StateErrorRecoverability::Recoverable, - e, - )), - } - } -} - -#[derive(Debug, Clone, PartialEq, StateConfigReqs)] -pub struct Report { - label: Label, - tags: Vec, - depends_on: Vec, - depends_on_strategy: DependencyStrategy, -} - -impl Default for Report { - fn default() -> Self { - Self::new() - } -} - -#[cfg(test)] -impl Report { - pub fn new() -> Self { - Self { - label: Label::new("report_state").unwrap(), - tags: vec![Tag::new("report").unwrap()], - depends_on: vec![Tag::new("setup").unwrap()], - depends_on_strategy: DependencyStrategy::Latest, - } - } -} - -impl StateHandler for Report { - fn handler(&self, context: &mut dyn Context) -> StateResult { - let _data: ContextA = serde_json::from_value(context.read_input().unwrap()).unwrap(); - let data = json!({ "a": "some new data reported", "b": 7 }); - match context.write_output(&data) { - Ok(()) => Ok(()), - Err(e) => Err(StateError::StorageAccess( - StateErrorRecoverability::Recoverable, - e, - )), - } - } -} +use mfm_machine::state_machine::StateMachine; +use serde_json::json; #[test] fn test_state_machine_execute() { - use mfm_machine::state::state_machine::StateMachine; use std::sync::Arc; let setup_state = Box::new(Setup::new()); @@ -136,17 +25,17 @@ fn test_state_machine_execute() { .map(|is| { ( is.label().clone(), - is.tags().clone(), - is.depends_on().clone(), + is.tags(), + is.depends_on(), is.depends_on_strategy().clone(), ) }) .collect(); - let state_machine = StateMachine::new(initial_states); + let mut state_machine = StateMachine::new(initial_states); - let mut context = ContextA::_new(String::from("hello"), 7); - let result = state_machine.execute(&mut context); + let context: &mut dyn Context = &mut ContextA::new(String::from("hello"), 7); + let result = state_machine.execute(context); let last_ctx_message = context.read_input().unwrap(); assert_eq!(state_machine.states.len(), iss.len()); @@ -160,9 +49,11 @@ fn test_state_machine_execute() { }, ); + let last_ctx_data: ContextA = serde_json::from_value(last_ctx_message).unwrap(); + assert!(result.is_ok()); assert_eq!( - last_ctx_message, - json!({"a": "some new data reported", "b": 7}) + last_ctx_data.a, + String::from("some new data reported: setting up") ); } diff --git a/mfm_machine/tests/retry_workflow_state_machine_test.rs b/mfm_machine/tests/retry_workflow_state_machine_test.rs new file mode 100644 index 0000000..91733e9 --- /dev/null +++ b/mfm_machine/tests/retry_workflow_state_machine_test.rs @@ -0,0 +1,28 @@ +mod default_impls; + +use default_impls::{ComputePrice, ContextA, Report, Setup}; +use mfm_machine::state::context::Context; +use mfm_machine::state::StateHandler; +use mfm_machine::state_machine::StateMachine; +use std::sync::Arc; + +#[test] +fn test_retry_workflow_state_machine() { + let setup_state = Box::new(Setup::new()); + let compute_price_state = Box::new(ComputePrice::new()); + let report_state = Box::new(Report::new()); + + let context: &mut dyn Context = &mut ContextA::new(String::from("hello"), 7); + + let initial_states: Arc<[Box]> = Arc::new([ + setup_state.clone(), + compute_price_state.clone(), + report_state.clone(), + ]); + + let mut state_machine = StateMachine::new(initial_states); + + let result = state_machine.execute(context); + + assert!(result.is_ok()); +}