diff --git a/apis/rust/node/src/node/mod.rs b/apis/rust/node/src/node/mod.rs index 009e1b81c..c68f16686 100644 --- a/apis/rust/node/src/node/mod.rs +++ b/apis/rust/node/src/node/mod.rs @@ -24,12 +24,12 @@ use dora_message::{ use eyre::{bail, WrapErr}; use shared_memory_extended::{Shmem, ShmemConf}; use std::{ - collections::{HashMap, VecDeque}, + collections::{BTreeSet, HashMap, VecDeque}, ops::{Deref, DerefMut}, sync::Arc, time::Duration, }; -use tracing::info; +use tracing::{info, warn}; #[cfg(feature = "tracing")] use dora_tracing::set_up_tracing; @@ -52,6 +52,7 @@ pub struct DoraNode { cache: VecDeque, dataflow_descriptor: Descriptor, + warn_once: BTreeSet, } impl DoraNode { @@ -157,10 +158,25 @@ impl DoraNode { drop_stream, cache: VecDeque::new(), dataflow_descriptor, + warn_once: BTreeSet::new(), }; Ok((node, event_stream)) } + fn validate_output(&mut self, output_id: &DataId) -> bool { + if !self.node_config.outputs.contains(output_id) { + if self.warn_once.contains(output_id) { + warn!("Output {output_id} is not part of the node's output list."); + self.warn_once.insert(output_id.clone()); + } else { + self.warn_once.insert(output_id.clone()); + } + false + } else { + true + } + } + /// Send data from the node to the other nodes. /// We take a closure as an input to enable zero copy on send. /// @@ -194,6 +210,9 @@ impl DoraNode { where F: FnOnce(&mut [u8]), { + if !self.validate_output(&output_id) { + return Ok(()); + }; let mut sample = self.allocate_data_sample(data_len)?; data(&mut sample); @@ -208,6 +227,10 @@ impl DoraNode { parameters: MetadataParameters, data: impl Array, ) -> eyre::Result<()> { + if !self.validate_output(&output_id) { + return Ok(()); + }; + let arrow_array = data.to_data(); let total_len = required_data_size(&arrow_array); @@ -228,6 +251,9 @@ impl DoraNode { data_len: usize, data: &[u8], ) -> eyre::Result<()> { + if !self.validate_output(&output_id) { + return Ok(()); + }; self.send_output_raw(output_id, parameters, data_len, |sample| { sample.copy_from_slice(data) }) @@ -244,6 +270,10 @@ impl DoraNode { where F: FnOnce(&mut [u8]), { + if !self.validate_output(&output_id) { + return Ok(()); + }; + let mut sample = self.allocate_data_sample(data_len)?; data(&mut sample); @@ -259,9 +289,6 @@ impl DoraNode { ) -> eyre::Result<()> { self.handle_finished_drop_tokens()?; - if !self.node_config.outputs.contains(&output_id) { - eyre::bail!("unknown dora node output `{output_id}` called by `send_output`. Double-check if this output is defined within your dataflow YAML file.",); - } let metadata = Metadata::from_parameters(self.clock.new_timestamp(), type_info, parameters); let (data, shmem) = match sample {