From 77c6853f2b6d6fe2d9710a66a348d2d821af2b88 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Willy=20Rom=C3=A3o?= Date: Sun, 26 Nov 2023 17:14:01 +0000 Subject: [PATCH] feat(tracker): add history capabilities for all tracks --- mfm_machine/src/state/context.rs | 22 ++-- mfm_machine/src/state_machine/mod.rs | 40 +++---- mfm_machine/src/state_machine/tracker.rs | 136 ++++++++++++++++++++--- mfm_machine/tests/default_impls.rs | 16 +-- mfm_machine/tests/public_api_test.rs | 2 +- 5 files changed, 154 insertions(+), 62 deletions(-) diff --git a/mfm_machine/src/state/context.rs b/mfm_machine/src/state/context.rs index 27a7414..1d5aac7 100644 --- a/mfm_machine/src/state/context.rs +++ b/mfm_machine/src/state/context.rs @@ -6,8 +6,8 @@ use serde_json::Value; pub type ContextWrapper = Arc>>; pub trait Context { - fn read_input(&self) -> Result; - fn write_output(&mut self, value: &Value) -> Result<(), Error>; + fn read(&self) -> Result; + fn write(&mut self, value: &Value) -> Result<(), Error>; } pub fn wrap_context(context: C) -> ContextWrapper { @@ -35,11 +35,11 @@ mod test { } impl Context for ContextA { - fn read_input(&self) -> Result { + fn read(&self) -> Result { serde_json::to_value(self).map_err(|e| anyhow!(e)) } - fn write_output(&mut self, value: &Value) -> Result<(), Error> { + fn write(&mut self, value: &Value) -> Result<(), Error> { let ctx: ContextA = serde_json::from_value(value.clone()).map_err(|e| anyhow!(e))?; self.a = ctx.a; self.b = ctx.b; @@ -52,18 +52,10 @@ mod test { let context_a: &mut dyn Context = &mut ContextA::_new(String::from("hello"), 7); let context_b: &dyn Context = &ContextA::_new(String::from("hellow"), 9); - assert_ne!( - context_a.read_input().unwrap(), - context_b.read_input().unwrap() - ); + assert_ne!(context_a.read().unwrap(), context_b.read().unwrap()); - context_a - .write_output(&context_b.read_input().unwrap()) - .unwrap(); + context_a.write(&context_b.read().unwrap()).unwrap(); - assert_eq!( - context_a.read_input().unwrap(), - context_b.read_input().unwrap() - ); + assert_eq!(context_a.read().unwrap(), context_b.read().unwrap()); } } diff --git a/mfm_machine/src/state_machine/mod.rs b/mfm_machine/src/state_machine/mod.rs index 75030e9..cea9760 100644 --- a/mfm_machine/src/state_machine/mod.rs +++ b/mfm_machine/src/state_machine/mod.rs @@ -75,13 +75,6 @@ impl StateMachine { } let state = &self.states[state_index]; - let tracker = self.tracker.as_mut(); - // TODO: should state.label() return an &Label or Label? - // TODO: remove this unwrap for proper handling - tracker.track( - Index::new(state_index, state.label(), state.tags()), - context.clone(), - ); // if thats true, means that no state was executed before and this is the first one if last_state_result.is_none() { @@ -99,13 +92,16 @@ impl StateMachine { // we should implement an well defined rule for the whole dependency // system between states, and follow this definition here as well. let state_depends_on = state.depends_on(); - let indexes_state_deps = - tracker.search_by_tag(state_depends_on.first().unwrap()); + let indexes_state_deps = self + .tracker + .search_by_tag(state_depends_on.first().unwrap()); let last_index_of_first_dep = indexes_state_deps.last().unwrap().clone(); - let last_index_state_ctx = - tracker.recover(last_index_of_first_dep.clone()).unwrap(); + let last_index_state_ctx = self + .tracker + .recover(last_index_of_first_dep.clone()) + .unwrap(); context = last_index_state_ctx.clone(); @@ -134,9 +130,13 @@ impl StateMachine { return Ok(()); } - let current_state = &self.states[next_state_index]; + let state = &self.states[next_state_index]; - let result = current_state.handler(context.clone()); + let result = state.handler(context.clone()); + self.tracker.as_mut().track( + Index::new(next_state_index, state.label(), state.tags()), + context.clone(), + ); self.execute_rec(context, next_state_index, Option::Some(result)) } @@ -176,11 +176,11 @@ mod test { } impl Context for ContextA { - fn read_input(&self) -> Result { + fn read(&self) -> Result { serde_json::to_value(self).map_err(|e| anyhow!(e)) } - fn write_output(&mut self, value: &Value) -> Result<(), Error> { + fn write(&mut self, value: &Value) -> Result<(), Error> { let ctx: ContextA = serde_json::from_value(value.clone()).map_err(|e| anyhow!(e))?; self.a = ctx.a; self.b = ctx.b; @@ -209,10 +209,10 @@ mod test { impl StateHandler for Setup { fn handler(&self, context: ContextWrapper) -> StateResult { - let value = context.lock().unwrap().read_input().unwrap(); + let value = context.lock().unwrap().read().unwrap(); let _data: ContextA = serde_json::from_value(value).unwrap(); let data = json!({ "a": "setting up", "b": 1 }); - match context.lock().as_mut().unwrap().write_output(&data) { + match context.lock().as_mut().unwrap().write(&data) { Ok(()) => Ok(()), Err(e) => Err(StateError::StorageAccess( StateErrorRecoverability::Recoverable, @@ -244,10 +244,10 @@ mod test { impl StateHandler for Report { fn handler(&self, context: ContextWrapper) -> StateResult { - let value = context.lock().unwrap().read_input().unwrap(); + let value = context.lock().unwrap().read().unwrap(); let _data: ContextA = serde_json::from_value(value).unwrap(); let data = json!({ "a": "some new data reported", "b": 7 }); - match context.lock().as_mut().unwrap().write_output(&data) { + match context.lock().as_mut().unwrap().write(&data) { Ok(()) => Ok(()), Err(e) => Err(StateError::StorageAccess( StateErrorRecoverability::Recoverable, @@ -297,7 +297,7 @@ mod test { let context = wrap_context(ContextA::new(String::from("hello"), 7)); let result = state_machine.execute(context.clone()); - let last_ctx_message = context.lock().unwrap().read_input().unwrap(); + let last_ctx_message = context.lock().unwrap().read().unwrap(); assert_eq!(state_machine.states.len(), iss.len()); diff --git a/mfm_machine/src/state_machine/tracker.rs b/mfm_machine/src/state_machine/tracker.rs index d368d08..9d3d09a 100644 --- a/mfm_machine/src/state_machine/tracker.rs +++ b/mfm_machine/src/state_machine/tracker.rs @@ -1,19 +1,61 @@ -use std::collections::HashMap; +use std::{collections::HashMap, fmt::Debug}; use anyhow::{anyhow, Error}; use crate::state::{context::ContextWrapper, Label, Tag}; -pub trait Tracker { +pub trait TrackerMetadata { + fn indexes(&self) -> Vec; + fn search_by_tag(&self, tag: &Tag) -> Vec; + fn history(&self) -> TrackerHistory; +} + +#[derive(Clone)] +pub struct TrackerHistory(Vec<(usize, Index, ContextWrapper)>); + +impl Debug for TrackerHistory { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.0 + .iter() + .map(|(history_id, index, _)| { + writeln!( + f, + "history_id ({}); index ({:?}); context (ptr)", + history_id, index + ) + }) + .collect() + } +} + +impl TrackerHistory { + pub fn new() -> Self { + TrackerHistory(Vec::new()) + } + + pub fn len(&self) -> usize { + self.0.len() + } + + pub fn push(&mut self, index: Index, context: ContextWrapper) { + self.0.push((self.len(), index, context)) + } +} + +impl IntoIterator for TrackerHistory { + type Item = (usize, Index, ContextWrapper); + type IntoIter = std::vec::IntoIter; + + fn into_iter(self) -> Self::IntoIter { + self.0.into_iter() + } +} + +//TODO: should consider add an state_metadata where the state have +// flexibility to say what kind of execution data he wants to store +pub trait Tracker: TrackerMetadata { fn track(&mut self, index: Index, context: ContextWrapper) -> Result; fn recover(&self, index: Index) -> Result; - // TODO: may be this Label should be tag? - // it may be an `search_by_tag`, and to do that we need - // to carry the tags of an state inside the index - // maybe and StateMetadata::from(state) can be an good idea, - // StateMetadata should be Hashable, Cloneable and Copiable - // - fn search_by_tag(&self, tag: &Tag) -> Vec; } // TODO: should it be public? may export methods to access it @@ -34,11 +76,17 @@ impl Index { } } -pub struct HashMapTracker(HashMap); +pub struct HashMapTracker { + tracker: HashMap, + history: TrackerHistory, +} impl HashMapTracker { pub fn new() -> Self { - HashMapTracker(HashMap::new()) + Self { + tracker: HashMap::new(), + history: TrackerHistory::new(), + } } } @@ -49,25 +97,37 @@ impl Default for HashMapTracker { } impl Tracker for HashMapTracker { + // TODO: add validations fn track(&mut self, index: Index, context: ContextWrapper) -> Result { - Ok(self.0.insert(index, context).is_none()) + self.history.push(index.clone(), context.clone()); + Ok(self.tracker.insert(index, context).is_none()) } fn recover(&self, index: Index) -> Result { - self.0 + self.tracker .get(&index) .cloned() .clone() .ok_or(anyhow!("index not found")) } +} +impl TrackerMetadata for HashMapTracker { fn search_by_tag(&self, tag: &Tag) -> Vec { - self.0 + self.tracker .keys() .filter(|index| index.state_tags.contains(tag)) .cloned() .collect() } + + fn indexes(&self) -> Vec { + self.tracker.keys().cloned().collect() + } + + fn history(&self) -> TrackerHistory { + self.history.clone() + } } #[cfg(test)] @@ -96,11 +156,11 @@ mod test { } impl Context for ContextA { - fn read_input(&self) -> Result { + fn read(&self) -> Result { serde_json::to_value(self).map_err(|e| anyhow!(e)) } - fn write_output(&mut self, value: &Value) -> Result<(), Error> { + fn write(&mut self, value: &Value) -> Result<(), Error> { let ctx: ContextA = serde_json::from_value(value.clone()).map_err(|e| anyhow!(e))?; self.a = ctx.a; self.b = ctx.b; @@ -143,8 +203,8 @@ mod test { for i in 0..indexes.len() { let context_recovered = tracker.recover(indexes[i].clone()).unwrap(); - let value_recovered = context_recovered.lock().unwrap().read_input().unwrap(); - let value_expected = contexts[i].lock().unwrap().read_input().unwrap(); + let value_recovered = context_recovered.lock().unwrap().read().unwrap(); + let value_expected = contexts[i].lock().unwrap().read().unwrap(); assert_eq!(value_expected, value_recovered); } @@ -191,4 +251,44 @@ mod test { Label::new("value_two").unwrap() ); } + + #[test] + fn test_list() { + let tracker: &mut dyn Tracker = &mut HashMapTracker::new(); + + let contexts: Vec = vec![ + wrap_context(ContextA::new("value".to_string(), 1)), + wrap_context(ContextA::new("value".to_string(), 2)), + wrap_context(ContextA::new("value".to_string(), 3)), + ]; + let indexes = vec![ + Index::new( + 1, + Label::new("value_one").unwrap(), + vec![Tag::new("tag_one").unwrap()], + ), + Index::new( + 2, + Label::new("value_two").unwrap(), + vec![Tag::new("tag_two").unwrap()], + ), + Index::new( + 3, + Label::new("value_three").unwrap(), + vec![Tag::new("tag_three").unwrap()], + ), + ]; + + for i in 0..indexes.len() { + tracker + .track(indexes[i].clone(), contexts[i].clone()) + .unwrap(); + } + + let indexes = tracker.indexes(); + + assert_eq!(indexes.len(), 3); + + println!("indexes: {:?}", indexes); + } } diff --git a/mfm_machine/tests/default_impls.rs b/mfm_machine/tests/default_impls.rs index 3cad381..153ef5e 100644 --- a/mfm_machine/tests/default_impls.rs +++ b/mfm_machine/tests/default_impls.rs @@ -28,11 +28,11 @@ impl ContextA { } impl Context for ContextA { - fn read_input(&self) -> Result { + fn read(&self) -> Result { serde_json::to_value(self).map_err(|e| anyhow!(e)) } - fn write_output(&mut self, value: &Value) -> Result<(), Error> { + fn write(&mut self, value: &Value) -> Result<(), Error> { let ctx: ContextA = serde_json::from_value(value.clone()).map_err(|e| anyhow!(e))?; self.a = ctx.a; self.b = ctx.b; @@ -66,13 +66,13 @@ impl Setup { impl StateHandler for Setup { fn handler(&self, context: ContextWrapper) -> StateResult { - let value = context.lock().unwrap().read_input().unwrap(); + let value = context.lock().unwrap().read().unwrap(); let _data: ContextA = serde_json::from_value(value).unwrap(); let mut rng = rand::thread_rng(); let data = json!({ "a": "setting up", "b": rng.gen_range(0..9) }); - match context.lock().as_mut().unwrap().write_output(&data) { + match context.lock().as_mut().unwrap().write(&data) { Ok(()) => Ok(()), Err(e) => Err(StateError::StorageAccess( StateErrorRecoverability::Recoverable, @@ -108,7 +108,7 @@ impl ComputePrice { impl StateHandler for ComputePrice { fn handler(&self, context: ContextWrapper) -> StateResult { - let value = context.lock().unwrap().read_input().unwrap(); + let value = context.lock().unwrap().read().unwrap(); let _data: ContextA = serde_json::from_value(value).unwrap(); if _data.b % 2 == 0 { return Err(StateError::ParsingInput( @@ -118,7 +118,7 @@ impl StateHandler for ComputePrice { } let data = json!({ "a": "the input number is odd", "b": _data.b }); - match context.lock().as_mut().unwrap().write_output(&data) { + match context.lock().as_mut().unwrap().write(&data) { Ok(()) => Ok(()), Err(e) => Err(StateError::StorageAccess( StateErrorRecoverability::Unrecoverable, @@ -154,11 +154,11 @@ impl Report { impl StateHandler for Report { fn handler(&self, context: ContextWrapper) -> StateResult { - let value = context.lock().unwrap().read_input().unwrap(); + let value = context.lock().unwrap().read().unwrap(); let _data: ContextA = serde_json::from_value(value).unwrap(); let data = json!({ "a": format!("{}: {}", "some new data reported", _data.a), "b": _data.b }); - match context.lock().as_mut().unwrap().write_output(&data) { + match context.lock().as_mut().unwrap().write(&data) { Ok(()) => Ok(()), Err(e) => Err(StateError::StorageAccess( StateErrorRecoverability::Recoverable, diff --git a/mfm_machine/tests/public_api_test.rs b/mfm_machine/tests/public_api_test.rs index 35cc070..963e884 100644 --- a/mfm_machine/tests/public_api_test.rs +++ b/mfm_machine/tests/public_api_test.rs @@ -34,7 +34,7 @@ fn test_state_machine_execute() { let context = wrap_context(ContextA::new(String::from("hello"), 7)); let result = state_machine.execute(context.clone()); - let last_ctx_message = context.lock().unwrap().read_input().unwrap(); + let last_ctx_message = context.lock().unwrap().read().unwrap(); assert_eq!(state_machine.states.len(), iss.len());