From 2d9d27bf0daf2ad7ae217a7da168ef8e82e05f88 Mon Sep 17 00:00:00 2001 From: Haixuan Xavier Tao Date: Wed, 15 Jan 2025 17:37:44 +0100 Subject: [PATCH] Make unknown output acceptable (#755) Make unknown output acceptable and warn once in case of typo --- apis/rust/node/src/node/mod.rs | 35 +++++++++++++++++++++++++++++----- 1 file changed, 30 insertions(+), 5 deletions(-) diff --git a/apis/rust/node/src/node/mod.rs b/apis/rust/node/src/node/mod.rs index 009e1b81c..29340ea20 100644 --- a/apis/rust/node/src/node/mod.rs +++ b/apis/rust/node/src/node/mod.rs @@ -24,12 +24,12 @@ use dora_message::{ use eyre::{bail, WrapErr}; use shared_memory_extended::{Shmem, ShmemConf}; use std::{ - collections::{HashMap, VecDeque}, + collections::{BTreeSet, HashMap, VecDeque}, ops::{Deref, DerefMut}, sync::Arc, time::Duration, }; -use tracing::info; +use tracing::{info, warn}; #[cfg(feature = "tracing")] use dora_tracing::set_up_tracing; @@ -52,6 +52,7 @@ pub struct DoraNode { cache: VecDeque, dataflow_descriptor: Descriptor, + warned_unknown_output: BTreeSet, } impl DoraNode { @@ -157,10 +158,23 @@ impl DoraNode { drop_stream, cache: VecDeque::new(), dataflow_descriptor, + warned_unknown_output: BTreeSet::new(), }; Ok((node, event_stream)) } + fn validate_output(&mut self, output_id: &DataId) -> bool { + if !self.node_config.outputs.contains(output_id) { + if !self.warned_unknown_output.contains(output_id) { + warn!("Ignoring output `{output_id}` not in node's output list."); + self.warned_unknown_output.insert(output_id.clone()); + } + false + } else { + true + } + } + /// Send data from the node to the other nodes. /// We take a closure as an input to enable zero copy on send. /// @@ -194,6 +208,9 @@ impl DoraNode { where F: FnOnce(&mut [u8]), { + if !self.validate_output(&output_id) { + return Ok(()); + }; let mut sample = self.allocate_data_sample(data_len)?; data(&mut sample); @@ -208,6 +225,10 @@ impl DoraNode { parameters: MetadataParameters, data: impl Array, ) -> eyre::Result<()> { + if !self.validate_output(&output_id) { + return Ok(()); + }; + let arrow_array = data.to_data(); let total_len = required_data_size(&arrow_array); @@ -228,6 +249,9 @@ impl DoraNode { data_len: usize, data: &[u8], ) -> eyre::Result<()> { + if !self.validate_output(&output_id) { + return Ok(()); + }; self.send_output_raw(output_id, parameters, data_len, |sample| { sample.copy_from_slice(data) }) @@ -244,6 +268,10 @@ impl DoraNode { where F: FnOnce(&mut [u8]), { + if !self.validate_output(&output_id) { + return Ok(()); + }; + let mut sample = self.allocate_data_sample(data_len)?; data(&mut sample); @@ -259,9 +287,6 @@ impl DoraNode { ) -> eyre::Result<()> { self.handle_finished_drop_tokens()?; - if !self.node_config.outputs.contains(&output_id) { - eyre::bail!("unknown dora node output `{output_id}` called by `send_output`. Double-check if this output is defined within your dataflow YAML file.",); - } let metadata = Metadata::from_parameters(self.clock.new_timestamp(), type_info, parameters); let (data, shmem) = match sample {