Skip to content

Commit

Permalink
Make unknown output acceptable (#755)
Browse files Browse the repository at this point in the history
Make unknown output acceptable and warn once in case of typo
  • Loading branch information
haixuanTao authored Jan 15, 2025
1 parent 2148ec2 commit 2d9d27b
Showing 1 changed file with 30 additions and 5 deletions.
35 changes: 30 additions & 5 deletions apis/rust/node/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ use dora_message::{
use eyre::{bail, WrapErr};
use shared_memory_extended::{Shmem, ShmemConf};
use std::{
collections::{HashMap, VecDeque},
collections::{BTreeSet, HashMap, VecDeque},
ops::{Deref, DerefMut},
sync::Arc,
time::Duration,
};
use tracing::info;
use tracing::{info, warn};

#[cfg(feature = "tracing")]
use dora_tracing::set_up_tracing;
Expand All @@ -52,6 +52,7 @@ pub struct DoraNode {
cache: VecDeque<ShmemHandle>,

dataflow_descriptor: Descriptor,
warned_unknown_output: BTreeSet<DataId>,
}

impl DoraNode {
Expand Down Expand Up @@ -157,10 +158,23 @@ impl DoraNode {
drop_stream,
cache: VecDeque::new(),
dataflow_descriptor,
warned_unknown_output: BTreeSet::new(),
};
Ok((node, event_stream))
}

fn validate_output(&mut self, output_id: &DataId) -> bool {
if !self.node_config.outputs.contains(output_id) {
if !self.warned_unknown_output.contains(output_id) {
warn!("Ignoring output `{output_id}` not in node's output list.");
self.warned_unknown_output.insert(output_id.clone());
}
false
} else {
true
}
}

/// Send data from the node to the other nodes.
/// We take a closure as an input to enable zero copy on send.
///
Expand Down Expand Up @@ -194,6 +208,9 @@ impl DoraNode {
where
F: FnOnce(&mut [u8]),
{
if !self.validate_output(&output_id) {
return Ok(());
};
let mut sample = self.allocate_data_sample(data_len)?;
data(&mut sample);

Expand All @@ -208,6 +225,10 @@ impl DoraNode {
parameters: MetadataParameters,
data: impl Array,
) -> eyre::Result<()> {
if !self.validate_output(&output_id) {
return Ok(());
};

let arrow_array = data.to_data();

let total_len = required_data_size(&arrow_array);
Expand All @@ -228,6 +249,9 @@ impl DoraNode {
data_len: usize,
data: &[u8],
) -> eyre::Result<()> {
if !self.validate_output(&output_id) {
return Ok(());
};
self.send_output_raw(output_id, parameters, data_len, |sample| {
sample.copy_from_slice(data)
})
Expand All @@ -244,6 +268,10 @@ impl DoraNode {
where
F: FnOnce(&mut [u8]),
{
if !self.validate_output(&output_id) {
return Ok(());
};

let mut sample = self.allocate_data_sample(data_len)?;
data(&mut sample);

Expand All @@ -259,9 +287,6 @@ impl DoraNode {
) -> eyre::Result<()> {
self.handle_finished_drop_tokens()?;

if !self.node_config.outputs.contains(&output_id) {
eyre::bail!("unknown dora node output `{output_id}` called by `send_output`. Double-check if this output is defined within your dataflow YAML file.",);
}
let metadata = Metadata::from_parameters(self.clock.new_timestamp(), type_info, parameters);

let (data, shmem) = match sample {
Expand Down

0 comments on commit 2d9d27b

Please sign in to comment.