diff --git a/Cargo.lock b/Cargo.lock index fe570b6f31..b670d370b9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7386,6 +7386,7 @@ dependencies = [ "tempfile", "termcolor", "test-case", + "tokio", "tremor-codec", "tremor-common", "tremor-influx", diff --git a/src/system.rs b/src/system.rs index 7a7905ee12..d024f04b33 100644 --- a/src/system.rs +++ b/src/system.rs @@ -24,24 +24,26 @@ use tremor_connectors::ConnectorBuilder; use tremor_script::{ast, highlighter::Highlighter}; use tremor_system::{ killswitch::{KillSwitch, ShutdownMode}, - selector::{PluginType, RuleSelector, RuleSelectorBuilder}, + selector::{PluginType, Rules, RulesBuilder}, }; /// Runtime builder for configuring the runtime /// note that includees and excludes are handled in order! -/// In other wordds using with_connector("foo").without_connector("foo") will result in foo being included +/// In other wordds using `with_connector("foo").without_connector("foo`") will result in foo being included /// this is especially important when using the type based inclusions and exclusions pub struct RuntimeBuilder { - connectors: RuleSelectorBuilder, + connectors: RulesBuilder, } impl RuntimeBuilder { /// Marks a given connector as includec by name + #[must_use] pub fn with_connector(mut self, connector: &str) -> Self { self.connectors = self.connectors.include(connector); self } /// Marks multiple connectors as includec by name + #[must_use] pub fn with_connectors(mut self, connectors: &[&str]) -> Self { for connector in connectors { self.connectors = self.connectors.include(*connector); @@ -49,11 +51,13 @@ impl RuntimeBuilder { self } /// Marks a given connector as excludec by name + #[must_use] pub fn without_connector(mut self, connector: &str) -> Self { self.connectors = self.connectors.exclude(connector); self } /// Marks multiple connectors as excludec by name + #[must_use] pub fn without_connectors(mut self, connectors: &[&str]) -> Self { for connector in connectors { self.connectors = self.connectors.exclude(*connector); @@ -62,33 +66,39 @@ impl RuntimeBuilder { } /// includes debug connectors + #[must_use] pub fn with_debug_connectors(mut self) -> Self { self.connectors = self.connectors.include(PluginType::Debug); self } /// excludes debug connectors + #[must_use] pub fn without_debug_connectors(mut self) -> Self { self.connectors = self.connectors.exclude(PluginType::Debug); self } /// includes all normal (non debug) connectors + #[must_use] pub fn with_normal_connectors(mut self) -> Self { self.connectors = self.connectors.include(PluginType::Normal); self } /// excludes all normal (non debug) connectors + #[must_use] pub fn without_normal_connectors(mut self) -> Self { self.connectors = self.connectors.exclude(PluginType::Normal); self } /// If no rule matches, include the connector + #[must_use] pub fn default_include_connectors(self) -> RuntimeConfig { let connectors = self.connectors.default_include(); RuntimeConfig { connectors } } /// If no rule matches, exclude the connector + #[must_use] pub fn default_exclude_connectors(self) -> RuntimeConfig { let connectors = self.connectors.default_exclude(); RuntimeConfig { connectors } @@ -99,11 +109,13 @@ impl RuntimeBuilder { /// Configuration for the runtime pub struct RuntimeConfig { /// if debug connectors should be loaded - connectors: RuleSelector, + connectors: Rules, } impl RuntimeConfig { /// Builds the runtime + /// # Errors + /// if the runtime can't be started pub async fn build(self) -> Result<(Runtime, JoinHandle>)> { Runtime::start(self).await } @@ -121,9 +133,10 @@ pub struct Runtime { impl Runtime { /// creates a runtime builder + #[must_use] pub fn builder() -> RuntimeBuilder { RuntimeBuilder { - connectors: RuleSelector::builder(), + connectors: Rules::builder(), } } /// Instantiate a flow from diff --git a/tremor-archive/src/lib.rs b/tremor-archive/src/lib.rs index 8811a093f7..3adeed9f2f 100644 --- a/tremor-archive/src/lib.rs +++ b/tremor-archive/src/lib.rs @@ -187,8 +187,6 @@ pub(crate) async fn build_archive_from_source( } }; let mut other_warnings = BTreeSet::new(); - let reg = &*FN_REGISTRY.read().map_err(|_| Error::ReadLock)?; - let helper = Helper::new(reg, &aggr_reg); for stmt in &deploy.deploy.stmts { match stmt { @@ -224,6 +222,9 @@ pub(crate) async fn build_archive_from_source( .0 .iter() .map(|(k, v)| { + let reg = &*FN_REGISTRY.read().map_err(|_| Error::ReadLock)?; + let helper = Helper::new(reg, &aggr_reg); + Ok(( k.to_string(), v.clone() @@ -286,16 +287,16 @@ pub(crate) async fn build_archive_from_source( header.set_cksum(); ar.append_data(&mut header, "main.troy", src.as_bytes()) .await?; - - for (id, paths) in MODULES + let modules = MODULES .read() .map_err(|_| Error::ReadLock)? .modules() .iter() - .map(|m| (m.arena_idx, m.paths())) - { + .map(|m| (m.arena_idx, m.paths().to_vec())) + .collect::>(); + for (id, paths) in modules { if let Some(src) = Arena::get(id)? { - for p in paths { + for p in &paths { let mut file: PathBuf = p.module().iter().collect(); file.push(p.id()); let mut header = Header::new_gnu(); diff --git a/tremor-common/src/alias.rs b/tremor-common/src/alias.rs index 8bdbfd3b3b..972e8cbdbd 100644 --- a/tremor-common/src/alias.rs +++ b/tremor-common/src/alias.rs @@ -211,7 +211,7 @@ mod test { let instance = Instance::from("test3"); assert_eq!(instance.to_string(), "test3"); - let instance = Instance::from(instance); + let instance = instance; assert_eq!(instance.to_string(), "test3"); } diff --git a/tremor-connectors/src/lib.rs b/tremor-connectors/src/lib.rs index 6471a5cabe..dbcf6cf7e5 100644 --- a/tremor-connectors/src/lib.rs +++ b/tremor-connectors/src/lib.rs @@ -936,7 +936,7 @@ pub struct ConnectorType(String); impl ConnectorType { /// name of the connector - pub fn name(&self) -> &str { + #[must_use] pub fn name(&self) -> &str { self.0.as_str() } } diff --git a/tremor-script/Cargo.toml b/tremor-script/Cargo.toml index f74a213f30..1a03b35e3c 100644 --- a/tremor-script/Cargo.toml +++ b/tremor-script/Cargo.toml @@ -59,6 +59,7 @@ tremor-kv = "0.6" unicode-xid = "0.2" url = "2" xz2 = "0.1" +tokio = { version = "1", default-features = false, features = [] } [build-dependencies] lalrpop = "0.20" diff --git a/tremor-system/src/selector.rs b/tremor-system/src/selector.rs index 6cb60c2a8d..8339b8945d 100644 --- a/tremor-system/src/selector.rs +++ b/tremor-system/src/selector.rs @@ -26,8 +26,10 @@ pub enum PluginType { #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Hash, Eq)] /// Single selector rule +#[derive(Default)] pub enum Selector { /// Select all plugins + #[default] All, /// Select plugins by name Name(String), @@ -35,11 +37,6 @@ pub enum Selector { Type(PluginType), } -impl Default for Selector { - fn default() -> Self { - Selector::All - } -} impl Selector { /// Test if a plugin should be selected pub fn test(&self, name: &(impl AsRef + ?Sized), t: PluginType) -> bool { @@ -75,18 +72,18 @@ pub enum Deposition { Exclude, } -impl Into for bool { - fn into(self) -> Deposition { - if self { +impl From for Deposition { + fn from(val: bool) -> Self { + if val { Deposition::Include } else { Deposition::Exclude } } } -impl Into for Deposition { - fn into(self) -> bool { - match self { +impl From for bool { + fn from(val: Deposition) -> Self { + match val { Deposition::Include => true, Deposition::Exclude => false, } @@ -95,48 +92,52 @@ impl Into for Deposition { #[derive(Debug, Clone)] /// Selector combiner for includes and excludes. Excludes take precedence over incluedes. -pub struct RuleSelector { +pub struct Rules { rules: Vec<(Selector, Deposition)>, default: Deposition, } /// Rule selector builder -pub struct RuleSelectorBuilder { +pub struct RulesBuilder { rules: Vec<(Selector, Deposition)>, } -impl RuleSelectorBuilder { +impl RulesBuilder { /// adds an include rule - + #[must_use] pub fn include(mut self, s: impl Into) -> Self { self.rules.push((s.into(), Deposition::Include)); self } /// adds an exclude rule + #[must_use] pub fn exclude(mut self, s: impl Into) -> Self { self.rules.push((s.into(), Deposition::Exclude)); self } /// default disposition include - pub fn default_include(self) -> RuleSelector { - RuleSelector { + #[must_use] + pub fn default_include(self) -> Rules { + Rules { rules: self.rules, default: Deposition::Include, } } /// default disposition exclude - pub fn default_exclude(self) -> RuleSelector { - RuleSelector { + #[must_use] + pub fn default_exclude(self) -> Rules { + Rules { rules: self.rules, default: Deposition::Exclude, } } } -impl RuleSelector { +impl Rules { /// Create a new rule selector builder - pub fn builder() -> RuleSelectorBuilder { - RuleSelectorBuilder { rules: vec![] } + #[must_use] + pub fn builder() -> RulesBuilder { + RulesBuilder { rules: vec![] } } /// Test if a plugin should be selected pub fn test(&self, name: &(impl AsRef + ?Sized), t: PluginType) -> bool { @@ -161,25 +162,25 @@ mod test { } #[test] fn test_rule_selector() { - let rs = RuleSelector::builder() + let rs = Rules::builder() .include("foo") .exclude(PluginType::Debug) .default_include(); - assert_eq!(rs.test("foo", PluginType::Normal), true); - assert_eq!(rs.test("foo", PluginType::Debug), true); - assert_eq!(rs.test("bar", PluginType::Normal), true); - assert_eq!(rs.test("bar", PluginType::Debug), false); + assert!(rs.test("foo", PluginType::Normal)); + assert!(rs.test("foo", PluginType::Debug)); + assert!(rs.test("bar", PluginType::Normal)); + assert!(!rs.test("bar", PluginType::Debug)); } #[test] fn test_rule_selector_builder() { - let rs = RuleSelector::builder() + let rs = Rules::builder() .include("foo") .exclude(PluginType::Debug) .default_exclude(); - assert_eq!(rs.test("foo", PluginType::Normal), true); - assert_eq!(rs.test("foo", PluginType::Debug), true); - assert_eq!(rs.test("bar", PluginType::Normal), false); - assert_eq!(rs.test("bar", PluginType::Debug), false); + assert!(rs.test("foo", PluginType::Normal)); + assert!(rs.test("foo", PluginType::Debug)); + assert!(!rs.test("bar", PluginType::Normal)); + assert!(!rs.test("bar", PluginType::Debug)); } }