Skip to content

Commit

Permalink
feat(tracker): add history capabilities for all tracks
Browse files Browse the repository at this point in the history
  • Loading branch information
willyrgf committed Nov 26, 2023
1 parent c6dd615 commit 77c6853
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 62 deletions.
22 changes: 7 additions & 15 deletions mfm_machine/src/state/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use serde_json::Value;
pub type ContextWrapper = Arc<Mutex<Box<dyn Context>>>;

pub trait Context {
fn read_input(&self) -> Result<Value, Error>;
fn write_output(&mut self, value: &Value) -> Result<(), Error>;
fn read(&self) -> Result<Value, Error>;
fn write(&mut self, value: &Value) -> Result<(), Error>;
}

pub fn wrap_context<C: Context + 'static>(context: C) -> ContextWrapper {
Expand Down Expand Up @@ -35,11 +35,11 @@ mod test {
}

impl Context for ContextA {
fn read_input(&self) -> Result<Value, Error> {
fn read(&self) -> Result<Value, Error> {
serde_json::to_value(self).map_err(|e| anyhow!(e))
}

fn write_output(&mut self, value: &Value) -> Result<(), Error> {
fn write(&mut self, value: &Value) -> Result<(), Error> {
let ctx: ContextA = serde_json::from_value(value.clone()).map_err(|e| anyhow!(e))?;
self.a = ctx.a;
self.b = ctx.b;
Expand All @@ -52,18 +52,10 @@ mod test {
let context_a: &mut dyn Context = &mut ContextA::_new(String::from("hello"), 7);
let context_b: &dyn Context = &ContextA::_new(String::from("hellow"), 9);

assert_ne!(
context_a.read_input().unwrap(),
context_b.read_input().unwrap()
);
assert_ne!(context_a.read().unwrap(), context_b.read().unwrap());

context_a
.write_output(&context_b.read_input().unwrap())
.unwrap();
context_a.write(&context_b.read().unwrap()).unwrap();

assert_eq!(
context_a.read_input().unwrap(),
context_b.read_input().unwrap()
);
assert_eq!(context_a.read().unwrap(), context_b.read().unwrap());
}
}
40 changes: 20 additions & 20 deletions mfm_machine/src/state_machine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,6 @@ impl StateMachine {
}

let state = &self.states[state_index];
let tracker = self.tracker.as_mut();
// TODO: should state.label() return an &Label or Label?
// TODO: remove this unwrap for proper handling
tracker.track(
Index::new(state_index, state.label(), state.tags()),
context.clone(),
);

// if thats true, means that no state was executed before and this is the first one
if last_state_result.is_none() {
Expand All @@ -99,13 +92,16 @@ impl StateMachine {
// we should implement an well defined rule for the whole dependency
// system between states, and follow this definition here as well.
let state_depends_on = state.depends_on();
let indexes_state_deps =
tracker.search_by_tag(state_depends_on.first().unwrap());
let indexes_state_deps = self
.tracker
.search_by_tag(state_depends_on.first().unwrap());

let last_index_of_first_dep = indexes_state_deps.last().unwrap().clone();

let last_index_state_ctx =
tracker.recover(last_index_of_first_dep.clone()).unwrap();
let last_index_state_ctx = self
.tracker
.recover(last_index_of_first_dep.clone())
.unwrap();

context = last_index_state_ctx.clone();

Expand Down Expand Up @@ -134,9 +130,13 @@ impl StateMachine {
return Ok(());
}

let current_state = &self.states[next_state_index];
let state = &self.states[next_state_index];

let result = current_state.handler(context.clone());
let result = state.handler(context.clone());
self.tracker.as_mut().track(
Index::new(next_state_index, state.label(), state.tags()),
context.clone(),
);

Check warning on line 139 in mfm_machine/src/state_machine/mod.rs

View workflow job for this annotation

GitHub Actions / clippy

unused `std::result::Result` that must be used

warning: unused `std::result::Result` that must be used --> mfm_machine/src/state_machine/mod.rs:136:9 | 136 | / self.tracker.as_mut().track( 137 | | Index::new(next_state_index, state.label(), state.tags()), 138 | | context.clone(), 139 | | ); | |_________^ | = note: this `Result` may be an `Err` variant, which should be handled = note: `#[warn(unused_must_use)]` on by default help: use `let _ = ...` to ignore the resulting value | 136 | let _ = self.tracker.as_mut().track( | +++++++

self.execute_rec(context, next_state_index, Option::Some(result))
}
Expand Down Expand Up @@ -176,11 +176,11 @@ mod test {
}

impl Context for ContextA {
fn read_input(&self) -> Result<Value, Error> {
fn read(&self) -> Result<Value, Error> {
serde_json::to_value(self).map_err(|e| anyhow!(e))
}

fn write_output(&mut self, value: &Value) -> Result<(), Error> {
fn write(&mut self, value: &Value) -> Result<(), Error> {
let ctx: ContextA = serde_json::from_value(value.clone()).map_err(|e| anyhow!(e))?;
self.a = ctx.a;
self.b = ctx.b;
Expand Down Expand Up @@ -209,10 +209,10 @@ mod test {

impl StateHandler for Setup {
fn handler(&self, context: ContextWrapper) -> StateResult {
let value = context.lock().unwrap().read_input().unwrap();
let value = context.lock().unwrap().read().unwrap();
let _data: ContextA = serde_json::from_value(value).unwrap();
let data = json!({ "a": "setting up", "b": 1 });
match context.lock().as_mut().unwrap().write_output(&data) {
match context.lock().as_mut().unwrap().write(&data) {
Ok(()) => Ok(()),
Err(e) => Err(StateError::StorageAccess(
StateErrorRecoverability::Recoverable,
Expand Down Expand Up @@ -244,10 +244,10 @@ mod test {

impl StateHandler for Report {
fn handler(&self, context: ContextWrapper) -> StateResult {
let value = context.lock().unwrap().read_input().unwrap();
let value = context.lock().unwrap().read().unwrap();
let _data: ContextA = serde_json::from_value(value).unwrap();
let data = json!({ "a": "some new data reported", "b": 7 });
match context.lock().as_mut().unwrap().write_output(&data) {
match context.lock().as_mut().unwrap().write(&data) {
Ok(()) => Ok(()),
Err(e) => Err(StateError::StorageAccess(
StateErrorRecoverability::Recoverable,
Expand Down Expand Up @@ -297,7 +297,7 @@ mod test {

let context = wrap_context(ContextA::new(String::from("hello"), 7));
let result = state_machine.execute(context.clone());
let last_ctx_message = context.lock().unwrap().read_input().unwrap();
let last_ctx_message = context.lock().unwrap().read().unwrap();

assert_eq!(state_machine.states.len(), iss.len());

Expand Down
136 changes: 118 additions & 18 deletions mfm_machine/src/state_machine/tracker.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,61 @@
use std::collections::HashMap;
use std::{collections::HashMap, fmt::Debug};

use anyhow::{anyhow, Error};

use crate::state::{context::ContextWrapper, Label, Tag};

pub trait Tracker {
pub trait TrackerMetadata {
fn indexes(&self) -> Vec<Index>;
fn search_by_tag(&self, tag: &Tag) -> Vec<Index>;
fn history(&self) -> TrackerHistory;
}

#[derive(Clone)]
pub struct TrackerHistory(Vec<(usize, Index, ContextWrapper)>);

impl Debug for TrackerHistory {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0
.iter()
.map(|(history_id, index, _)| {
writeln!(
f,
"history_id ({}); index ({:?}); context (ptr)",
history_id, index
)
})
.collect()

Check warning on line 27 in mfm_machine/src/state_machine/tracker.rs

View workflow job for this annotation

GitHub Actions / clippy

`.map().collect()` can be replaced with `.try_for_each()`

warning: `.map().collect()` can be replaced with `.try_for_each()` --> mfm_machine/src/state_machine/tracker.rs:18:9 | 18 | / self.0 19 | | .iter() 20 | | .map(|(history_id, index, _)| { 21 | | writeln!( ... | 26 | | }) 27 | | .collect() | |______________________^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#map_collect_result_unit = note: `#[warn(clippy::map_collect_result_unit)]` on by default help: try | 18 ~ self.0 19 + .iter().try_for_each(|(history_id, index, _)| { 20 + writeln!( 21 + f, 22 + "history_id ({}); index ({:?}); context (ptr)", 23 + history_id, index 24 + ) 25 + }) |
}
}

impl TrackerHistory {
pub fn new() -> Self {
TrackerHistory(Vec::new())
}

Check warning on line 34 in mfm_machine/src/state_machine/tracker.rs

View workflow job for this annotation

GitHub Actions / clippy

you should consider adding a `Default` implementation for `TrackerHistory`

warning: you should consider adding a `Default` implementation for `TrackerHistory` --> mfm_machine/src/state_machine/tracker.rs:32:5 | 32 | / pub fn new() -> Self { 33 | | TrackerHistory(Vec::new()) 34 | | } | |_____^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#new_without_default = note: `#[warn(clippy::new_without_default)]` on by default help: try adding this | 31 + impl Default for TrackerHistory { 32 + fn default() -> Self { 33 + Self::new() 34 + } 35 + } |

pub fn len(&self) -> usize {

Check warning on line 36 in mfm_machine/src/state_machine/tracker.rs

View workflow job for this annotation

GitHub Actions / clippy

struct `TrackerHistory` has a public `len` method, but no `is_empty` method

warning: struct `TrackerHistory` has a public `len` method, but no `is_empty` method --> mfm_machine/src/state_machine/tracker.rs:36:5 | 36 | pub fn len(&self) -> usize { | ^^^^^^^^^^^^^^^^^^^^^^^^^^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#len_without_is_empty = note: `#[warn(clippy::len_without_is_empty)]` on by default
self.0.len()
}

pub fn push(&mut self, index: Index, context: ContextWrapper) {
self.0.push((self.len(), index, context))
}
}

impl IntoIterator for TrackerHistory {
type Item = (usize, Index, ContextWrapper);
type IntoIter = std::vec::IntoIter<Self::Item>;

fn into_iter(self) -> Self::IntoIter {
self.0.into_iter()
}
}

//TODO: should consider add an state_metadata where the state have
// flexibility to say what kind of execution data he wants to store
pub trait Tracker: TrackerMetadata {
fn track(&mut self, index: Index, context: ContextWrapper) -> Result<bool, Error>;
fn recover(&self, index: Index) -> Result<ContextWrapper, Error>;
// TODO: may be this Label should be tag?
// it may be an `search_by_tag`, and to do that we need
// to carry the tags of an state inside the index
// maybe and StateMetadata::from(state) can be an good idea,
// StateMetadata should be Hashable, Cloneable and Copiable
//
fn search_by_tag(&self, tag: &Tag) -> Vec<Index>;
}

// TODO: should it be public? may export methods to access it
Expand All @@ -34,11 +76,17 @@ impl Index {
}
}

pub struct HashMapTracker(HashMap<Index, ContextWrapper>);
pub struct HashMapTracker {
tracker: HashMap<Index, ContextWrapper>,
history: TrackerHistory,
}

impl HashMapTracker {
pub fn new() -> Self {
HashMapTracker(HashMap::new())
Self {
tracker: HashMap::new(),
history: TrackerHistory::new(),
}
}
}

Expand All @@ -49,25 +97,37 @@ impl Default for HashMapTracker {
}

impl Tracker for HashMapTracker {
// TODO: add validations
fn track(&mut self, index: Index, context: ContextWrapper) -> Result<bool, Error> {
Ok(self.0.insert(index, context).is_none())
self.history.push(index.clone(), context.clone());
Ok(self.tracker.insert(index, context).is_none())
}

fn recover(&self, index: Index) -> Result<ContextWrapper, Error> {
self.0
self.tracker
.get(&index)
.cloned()
.clone()
.ok_or(anyhow!("index not found"))
}
}

impl TrackerMetadata for HashMapTracker {
fn search_by_tag(&self, tag: &Tag) -> Vec<Index> {
self.0
self.tracker
.keys()
.filter(|index| index.state_tags.contains(tag))
.cloned()
.collect()
}

fn indexes(&self) -> Vec<Index> {
self.tracker.keys().cloned().collect()
}

fn history(&self) -> TrackerHistory {
self.history.clone()
}
}

#[cfg(test)]
Expand Down Expand Up @@ -96,11 +156,11 @@ mod test {
}

impl Context for ContextA {
fn read_input(&self) -> Result<Value, Error> {
fn read(&self) -> Result<Value, Error> {
serde_json::to_value(self).map_err(|e| anyhow!(e))
}

fn write_output(&mut self, value: &Value) -> Result<(), Error> {
fn write(&mut self, value: &Value) -> Result<(), Error> {
let ctx: ContextA = serde_json::from_value(value.clone()).map_err(|e| anyhow!(e))?;
self.a = ctx.a;
self.b = ctx.b;
Expand Down Expand Up @@ -143,8 +203,8 @@ mod test {
for i in 0..indexes.len() {
let context_recovered = tracker.recover(indexes[i].clone()).unwrap();

let value_recovered = context_recovered.lock().unwrap().read_input().unwrap();
let value_expected = contexts[i].lock().unwrap().read_input().unwrap();
let value_recovered = context_recovered.lock().unwrap().read().unwrap();
let value_expected = contexts[i].lock().unwrap().read().unwrap();

assert_eq!(value_expected, value_recovered);
}
Expand Down Expand Up @@ -191,4 +251,44 @@ mod test {
Label::new("value_two").unwrap()
);
}

#[test]
fn test_list() {
let tracker: &mut dyn Tracker = &mut HashMapTracker::new();

let contexts: Vec<ContextWrapper> = vec![
wrap_context(ContextA::new("value".to_string(), 1)),
wrap_context(ContextA::new("value".to_string(), 2)),
wrap_context(ContextA::new("value".to_string(), 3)),
];
let indexes = vec![
Index::new(
1,
Label::new("value_one").unwrap(),
vec![Tag::new("tag_one").unwrap()],
),
Index::new(
2,
Label::new("value_two").unwrap(),
vec![Tag::new("tag_two").unwrap()],
),
Index::new(
3,
Label::new("value_three").unwrap(),
vec![Tag::new("tag_three").unwrap()],
),
];

for i in 0..indexes.len() {
tracker
.track(indexes[i].clone(), contexts[i].clone())
.unwrap();
}

let indexes = tracker.indexes();

assert_eq!(indexes.len(), 3);

println!("indexes: {:?}", indexes);
}
}
Loading

0 comments on commit 77c6853

Please sign in to comment.