From d306ca6f24691c0c19b97f42e9ea3b6b1bd99c45 Mon Sep 17 00:00:00 2001 From: Neng Li Date: Tue, 27 Aug 2024 14:33:26 +0800 Subject: [PATCH] refactor(interactive): Pass worker_id as a parameter instead of lazy_static (#4060) Fixes #4173 --- .../engine/pegasus/common/src/codec/mod.rs | 1 + .../engine/pegasus/network/src/send/mod.rs | 1 + .../pegasus/src/communication/channel.rs | 55 ++++++++++++------- .../src/communication/decorator/aggregate.rs | 14 +++-- .../src/communication/decorator/broadcast.rs | 9 +-- .../src/communication/decorator/evented.rs | 12 ++-- .../src/communication/decorator/exchange.rs | 48 ++++++++++------ .../src/communication/decorator/mod.rs | 4 +- .../pegasus/src/communication/input/input.rs | 7 ++- .../pegasus/src/communication/input/mod.rs | 5 +- .../pegasus/pegasus/src/communication/mod.rs | 6 +- .../src/communication/output/builder.rs | 3 +- .../src/communication/output/output.rs | 3 +- .../pegasus/src/communication/output/tee.rs | 3 +- .../engine/pegasus/pegasus/src/dataflow.rs | 15 +++-- .../engine/pegasus/pegasus/src/lib.rs | 2 +- .../pegasus/src/operator/concise/any.rs | 2 +- .../pegasus/src/operator/concise/correlate.rs | 17 +++--- .../pegasus/src/operator/concise/count.rs | 29 +++++----- .../pegasus/src/operator/concise/fold.rs | 12 ++-- .../pegasus/src/operator/concise/keyed/mod.rs | 2 +- .../src/operator/iteration/feedback.rs | 3 +- .../pegasus/src/operator/iteration/mod.rs | 3 +- .../pegasus/pegasus/src/operator/mod.rs | 16 ++++-- .../pegasus/src/operator/primitives/unary.rs | 2 +- .../engine/pegasus/pegasus/src/progress.rs | 17 ++---- .../pegasus/pegasus/src/schedule/mod.rs | 6 +- .../pegasus/pegasus/src/schedule/operator.rs | 16 ++++-- .../pegasus/src/schedule/state/inbound.rs | 15 ++--- .../engine/pegasus/pegasus/src/stream.rs | 1 + .../engine/pegasus/pegasus/src/worker.rs | 14 +++-- 31 files changed, 205 insertions(+), 138 deletions(-) diff --git a/interactive_engine/executor/engine/pegasus/common/src/codec/mod.rs b/interactive_engine/executor/engine/pegasus/common/src/codec/mod.rs index e4fac682904c..6ff00116996f 100644 --- a/interactive_engine/executor/engine/pegasus/common/src/codec/mod.rs +++ b/interactive_engine/executor/engine/pegasus/common/src/codec/mod.rs @@ -344,6 +344,7 @@ impl Decode for Option { mod shade; mod third_party; + pub use shade::ShadeCodec; #[cfg(feature = "serde")] pub use third_party::serde_bin as serde; diff --git a/interactive_engine/executor/engine/pegasus/network/src/send/mod.rs b/interactive_engine/executor/engine/pegasus/network/src/send/mod.rs index f6d3fecceba5..743f238d1652 100644 --- a/interactive_engine/executor/engine/pegasus/network/src/send/mod.rs +++ b/interactive_engine/executor/engine/pegasus/network/src/send/mod.rs @@ -31,6 +31,7 @@ use crate::message::MessageHeader; use crate::{NetError, Server}; mod encode; + pub use encode::{GeneralEncoder, MessageEncoder, SimpleEncoder, SlabEncoder}; mod net_tx; diff --git a/interactive_engine/executor/engine/pegasus/pegasus/src/communication/channel.rs b/interactive_engine/executor/engine/pegasus/pegasus/src/communication/channel.rs index 45d13c33dfab..94c7aefcb777 100644 --- a/interactive_engine/executor/engine/pegasus/pegasus/src/communication/channel.rs +++ b/interactive_engine/executor/engine/pegasus/pegasus/src/communication/channel.rs @@ -162,14 +162,13 @@ impl Channel { } impl Channel { - fn build_pipeline(self, target: Port, id: ChannelId) -> MaterializedChannel { + fn build_pipeline(self, target: Port, id: ChannelId, worker_id: u32) -> MaterializedChannel { let (tx, rx) = crate::data_plane::pipeline::>(id); let scope_level = self.get_scope_level(); let ch_info = ChannelInfo::new(id, scope_level, 1, 1, self.source, target); - let push = MicroBatchPush::Pipeline(LocalMicroBatchPush::new(ch_info, tx)); - let worker = crate::worker_id::get_current_worker().index; - let ch = CancelHandle::SC(SingleConsCancel::new(worker)); - let push = PerChannelPush::new(ch_info, self.scope_delta, push, ch); + let push = MicroBatchPush::Pipeline(LocalMicroBatchPush::new(ch_info, tx, worker_id)); + let ch = CancelHandle::SC(SingleConsCancel::new(worker_id)); + let push = PerChannelPush::new(ch_info, self.scope_delta, push, ch, worker_id); MaterializedChannel { push, pull: rx.into(), notify: None } } @@ -179,14 +178,14 @@ impl Channel { (ChannelInfo, Vec>, GeneralPull>, GeneralPush>), BuildJobError, > { - let (mut raw, pull) = crate::communication::build_channel::>(id, &dfb.config)?.take(); - let worker_index = crate::worker_id::get_current_worker().index as usize; + let (mut raw, pull) = + crate::communication::build_channel::>(id, &dfb.config, dfb.worker_id)?.take(); + let worker_index = dfb.worker_id.index as usize; let notify = raw.swap_remove(worker_index); let ch_info = ChannelInfo::new(id, scope_level, raw.len(), raw.len(), self.source, target); let mut pushes = Vec::with_capacity(raw.len()); - let source = dfb.worker_id.index; for (idx, p) in raw.into_iter().enumerate() { - let push = EventEmitPush::new(ch_info, source, idx as u32, p, dfb.event_emitter.clone()); + let push = EventEmitPush::new(ch_info, dfb.worker_id, idx as u32, p, dfb.event_emitter.clone()); pushes.push(push); } Ok((ch_info, pushes, pull, notify)) @@ -212,12 +211,12 @@ impl Channel { } if dfb.worker_id.total_peers() == 1 { - return Ok(self.build_pipeline(target, id)); + return Ok(self.build_pipeline(target, id, dfb.worker_id.index)); } let kind = std::mem::replace(&mut self.kind, ChannelKind::Pipeline); match kind { - ChannelKind::Pipeline => Ok(self.build_pipeline(target, id)), + ChannelKind::Pipeline => Ok(self.build_pipeline(target, id, dfb.worker_id.index)), ChannelKind::Shuffle(r) => { let (info, pushes, pull, notify) = self.build_remote(scope_level, target, id, dfb)?; let mut buffers = Vec::with_capacity(pushes.len()); @@ -225,38 +224,56 @@ impl Channel { let b = ScopeBufferPool::new(batch_size, batch_capacity, scope_level); buffers.push(b); } - let push = ExchangeByDataPush::new(info, r, buffers, pushes); + let push = ExchangeByDataPush::new(info, r, buffers, pushes, dfb.worker_id); let ch = push.get_cancel_handle(); - let push = PerChannelPush::new(info, self.scope_delta, MicroBatchPush::Exchange(push), ch); + let push = PerChannelPush::new( + info, + self.scope_delta, + MicroBatchPush::Exchange(push), + ch, + dfb.worker_id.index, + ); Ok(MaterializedChannel { push, pull: pull.into(), notify: Some(notify) }) } ChannelKind::BatchShuffle(route) => { let (info, pushes, pull, notify) = self.build_remote(scope_level, target, id, dfb)?; - let push = ExchangeByBatchPush::new(info, route, pushes); + let push = ExchangeByBatchPush::new(info, route, pushes, dfb.worker_id); let cancel = push.get_cancel_handle(); let push = PerChannelPush::new( info, self.scope_delta, MicroBatchPush::ExchangeByBatch(push), cancel, + dfb.worker_id.index, ); Ok(MaterializedChannel { push, pull: pull.into(), notify: Some(notify) }) } ChannelKind::Broadcast => { let (info, pushes, pull, notify) = self.build_remote(scope_level, target, id, dfb)?; - let push = BroadcastBatchPush::new(info, pushes); + let push = BroadcastBatchPush::new(info, pushes, dfb.worker_id.total_peers()); let ch = push.get_cancel_handle(); - let push = PerChannelPush::new(info, self.scope_delta, MicroBatchPush::Broadcast(push), ch); + let push = PerChannelPush::new( + info, + self.scope_delta, + MicroBatchPush::Broadcast(push), + ch, + dfb.worker_id.index, + ); Ok(MaterializedChannel { push, pull: pull.into(), notify: Some(notify) }) } ChannelKind::Aggregate => { let (mut ch_info, pushes, pull, notify) = self.build_remote(scope_level, target, id, dfb)?; ch_info.target_peers = 1; - let push = AggregateBatchPush::new(ch_info, pushes); + let push = AggregateBatchPush::new(ch_info, pushes, dfb.worker_id); let cancel = push.get_cancel_handle(); - let push = - PerChannelPush::new(ch_info, self.scope_delta, MicroBatchPush::Aggregate(push), cancel); + let push = PerChannelPush::new( + ch_info, + self.scope_delta, + MicroBatchPush::Aggregate(push), + cancel, + dfb.worker_id.index, + ); Ok(MaterializedChannel { push, pull: pull.into(), notify: Some(notify) }) } } diff --git a/interactive_engine/executor/engine/pegasus/pegasus/src/communication/decorator/aggregate.rs b/interactive_engine/executor/engine/pegasus/pegasus/src/communication/decorator/aggregate.rs index 36c6add8455d..179058e38cbb 100644 --- a/interactive_engine/executor/engine/pegasus/pegasus/src/communication/decorator/aggregate.rs +++ b/interactive_engine/executor/engine/pegasus/pegasus/src/communication/decorator/aggregate.rs @@ -9,7 +9,7 @@ use crate::communication::decorator::exchange::ExchangeByBatchPush; use crate::data::MicroBatch; use crate::data_plane::Push; use crate::errors::IOError; -use crate::Data; +use crate::{Data, WorkerId}; struct ScopedAggregate(PhantomData); @@ -30,14 +30,18 @@ pub struct AggregateBatchPush { } impl AggregateBatchPush { - pub fn new(info: ChannelInfo, pushes: Vec>) -> Self { + pub fn new(info: ChannelInfo, pushes: Vec>, worker_id: WorkerId) -> Self { if info.scope_level == 0 { - let push = ExchangeByBatchPush::new(info, BatchRoute::AllToOne(0), pushes); + let push = ExchangeByBatchPush::new(info, BatchRoute::AllToOne(0), pushes, worker_id); AggregateBatchPush { push } } else { let chancel_handle = DynSingleConsCancelPtr::new(info.scope_level, pushes.len()); - let mut push = - ExchangeByBatchPush::new(info, BatchRoute::Dyn(Box::new(ScopedAggregate::new())), pushes); + let mut push = ExchangeByBatchPush::new( + info, + BatchRoute::Dyn(Box::new(ScopedAggregate::new())), + pushes, + worker_id, + ); push.update_cancel_handle(CancelHandle::DSC(chancel_handle)); AggregateBatchPush { push } } diff --git a/interactive_engine/executor/engine/pegasus/pegasus/src/communication/decorator/broadcast.rs b/interactive_engine/executor/engine/pegasus/pegasus/src/communication/decorator/broadcast.rs index 6e58b9170cb2..831eddf0a7f6 100644 --- a/interactive_engine/executor/engine/pegasus/pegasus/src/communication/decorator/broadcast.rs +++ b/interactive_engine/executor/engine/pegasus/pegasus/src/communication/decorator/broadcast.rs @@ -11,12 +11,13 @@ pub struct BroadcastBatchPush { pub ch_info: ChannelInfo, pushes: Vec>, cancel_handle: MultiConsCancelPtr, + total_peers: u32, } impl BroadcastBatchPush { - pub fn new(ch_info: ChannelInfo, pushes: Vec>) -> Self { + pub fn new(ch_info: ChannelInfo, pushes: Vec>, total_peers: u32) -> Self { let cancel_handle = MultiConsCancelPtr::new(ch_info.scope_level, pushes.len()); - BroadcastBatchPush { ch_info, pushes, cancel_handle } + BroadcastBatchPush { ch_info, pushes, cancel_handle, total_peers } } pub(crate) fn get_cancel_handle(&self) -> CancelHandle { @@ -38,14 +39,14 @@ impl BroadcastBatchPush { if let Some(mut end) = batch.take_end() { if end.peers().value() == 1 && end.peers_contains(self.pushes[target].source_worker) { - end.update_peers(DynPeers::all()); + end.update_peers(DynPeers::all(self.total_peers), self.total_peers); batch.set_end(end); self.pushes[target].push(batch)?; } else { if !batch.is_empty() { self.pushes[target].push(batch)?; } - self.pushes[target].sync_end(end, DynPeers::all())?; + self.pushes[target].sync_end(end, DynPeers::all(self.total_peers))?; } } else { self.pushes[target].push(batch)?; diff --git a/interactive_engine/executor/engine/pegasus/pegasus/src/communication/decorator/evented.rs b/interactive_engine/executor/engine/pegasus/pegasus/src/communication/decorator/evented.rs index 6bad2ae92196..1c96e3491e85 100644 --- a/interactive_engine/executor/engine/pegasus/pegasus/src/communication/decorator/evented.rs +++ b/interactive_engine/executor/engine/pegasus/pegasus/src/communication/decorator/evented.rs @@ -21,12 +21,13 @@ use crate::event::emitter::EventEmitter; use crate::event::{Event, EventKind}; use crate::progress::{DynPeers, EndOfScope, EndSyncSignal}; use crate::tag::tools::map::TidyTagMap; -use crate::PROFILE_COMM_FLAG; use crate::{Data, Tag}; +use crate::{WorkerId, PROFILE_COMM_FLAG}; #[allow(dead_code)] pub struct EventEmitPush { pub ch_info: ChannelInfo, + pub total_peers: u32, pub source_worker: u32, pub target_worker: u32, inner: GeneralPush>, @@ -38,13 +39,14 @@ pub struct EventEmitPush { #[allow(dead_code)] impl EventEmitPush { pub fn new( - info: ChannelInfo, source_worker: u32, target_worker: u32, push: GeneralPush>, + info: ChannelInfo, worker_id: WorkerId, target_worker: u32, push: GeneralPush>, emitter: EventEmitter, ) -> Self { let push_counts = TidyTagMap::new(info.scope_level); EventEmitPush { ch_info: info, - source_worker, + total_peers: worker_id.total_peers(), + source_worker: worker_id.index, target_worker, inner: push, event_emitter: emitter, @@ -75,14 +77,14 @@ impl EventEmitPush { end.peers(), children ); - end.update_peers(children); + end.update_peers(children, self.total_peers); let end_batch = MicroBatch::last(self.source_worker, end); self.push(end_batch) } else { Ok(()) } } else { - end.update_peers(children); + end.update_peers(children, self.total_peers); let end_batch = MicroBatch::last(self.source_worker, end); self.push(end_batch) } diff --git a/interactive_engine/executor/engine/pegasus/pegasus/src/communication/decorator/exchange.rs b/interactive_engine/executor/engine/pegasus/pegasus/src/communication/decorator/exchange.rs index b9ac8d0822d8..0403954c9030 100644 --- a/interactive_engine/executor/engine/pegasus/pegasus/src/communication/decorator/exchange.rs +++ b/interactive_engine/executor/engine/pegasus/pegasus/src/communication/decorator/exchange.rs @@ -31,7 +31,7 @@ use crate::errors::{IOError, IOErrorKind}; use crate::graph::Port; use crate::progress::{DynPeers, EndOfScope}; use crate::tag::tools::map::TidyTagMap; -use crate::{Data, Tag}; +use crate::{Data, Tag, WorkerId}; struct Exchange { magic: Magic, @@ -61,6 +61,7 @@ pub(crate) struct ExchangeByDataPush { pub src: u32, pub port: Port, index: u32, + total_peers: u32, scope_level: u32, buffers: Vec>, pushes: Vec>, @@ -72,13 +73,13 @@ pub(crate) struct ExchangeByDataPush { impl ExchangeByDataPush { pub(crate) fn new( info: ChannelInfo, router: Box>, buffers: Vec>, - pushes: Vec>, + pushes: Vec>, worker_id: WorkerId, ) -> Self { - let src = crate::worker_id::get_current_worker().index; let len = pushes.len(); let cancel_handle = MultiConsCancelPtr::new(info.scope_level, len); ExchangeByDataPush { - src, + src: worker_id.index, + total_peers: worker_id.total_peers(), port: info.source_port, index: info.index(), scope_level: info.scope_level, @@ -192,7 +193,7 @@ impl ExchangeByDataPush { push_stat.push(pushes); } - let mut weight = DynPeers::all(); + let mut weight = DynPeers::all(self.total_peers); if self.scope_level != 0 { weight = DynPeers::empty(); for (i, p) in push_stat.iter().enumerate() { @@ -344,7 +345,7 @@ impl Push> for ExchangeByDataPush { let mut new_end = end.clone(); new_end.total_send = 0; new_end.global_total_send = 0; - new_end.update_peers(DynPeers::single(self.src)); + new_end.update_peers(DynPeers::single(self.src), self.total_peers); p.push_end(new_end, DynPeers::single(self.src))?; } } else { @@ -375,7 +376,7 @@ impl Push> for ExchangeByDataPush { new_end.total_send = 0; new_end.global_total_send = 0; if end.tag.is_root() { - p.sync_end(new_end, DynPeers::all())?; + p.sync_end(new_end, DynPeers::all(self.total_peers))?; } else { p.sync_end(new_end, DynPeers::empty())?; } @@ -408,7 +409,7 @@ impl Push> for ExchangeByDataPush { let mut new_end = end.clone(); new_end.total_send = 0; new_end.global_total_send = 0; - p.sync_end(new_end, DynPeers::all())?; + p.sync_end(new_end, DynPeers::all(self.total_peers))?; } } } else { @@ -445,7 +446,7 @@ impl Push> for ExchangeByDataPush { if batch.get_seq() == 0 { // multi source; self.pushes[target].push(batch)?; - let children = DynPeers::all(); + let children = DynPeers::all(self.total_peers); for i in 0..self.pushes.len() { let mut new_end = end.clone(); if i != target { @@ -467,7 +468,7 @@ impl Push> for ExchangeByDataPush { new_end.global_total_send = g; if i == target { let mut batch = std::mem::replace(&mut batch, MicroBatch::empty()); - new_end.update_peers(children); + new_end.update_peers(children, self.total_peers); batch.set_end(new_end); self.pushes[i].push(batch)?; } else { @@ -596,6 +597,7 @@ impl BlockPush for ExchangeByDataPush { pub struct ExchangeByBatchPush { pub ch_info: ChannelInfo, src: u32, + total_peers: u32, scope_level: u32, pushes: Vec>, magic: Magic, @@ -604,17 +606,27 @@ pub struct ExchangeByBatchPush { } impl ExchangeByBatchPush { - pub fn new(ch_info: ChannelInfo, route: BatchRoute, pushes: Vec>) -> Self { + pub fn new( + ch_info: ChannelInfo, route: BatchRoute, pushes: Vec>, worker_id: WorkerId, + ) -> Self { let len = pushes.len(); let magic = Magic::new(len); - let src = crate::worker_id::get_current_worker().index; let cancel_handle = match route { BatchRoute::AllToOne(t) => CancelHandle::SC(SingleConsCancel::new(t)), BatchRoute::Dyn(_) => CancelHandle::MC(MultiConsCancelPtr::new(ch_info.scope_level, len)), }; let scope_level = ch_info.scope_level; - ExchangeByBatchPush { ch_info, src, scope_level, pushes, magic, route, cancel_handle } + ExchangeByBatchPush { + ch_info, + src: worker_id.index, + total_peers: worker_id.total_peers(), + scope_level, + pushes, + magic, + route, + cancel_handle, + } } pub(crate) fn update_cancel_handle(&mut self, cancel_handle: CancelHandle) { @@ -639,7 +651,7 @@ impl ExchangeByBatchPush { push_stat.push(pushes); } - let mut weight = DynPeers::all(); + let mut weight = DynPeers::all(self.total_peers); if self.scope_level != 0 { weight = DynPeers::empty(); for (i, p) in push_stat.iter().enumerate() { @@ -674,7 +686,7 @@ impl ExchangeByBatchPush { let mut new_end = end.clone(); new_end.total_send = 0; new_end.global_total_send = 0; - new_end.update_peers(DynPeers::single(self.src)); + new_end.update_peers(DynPeers::single(self.src), self.total_peers); p.push_end(new_end, DynPeers::single(self.src))?; } } else { @@ -744,7 +756,7 @@ impl ExchangeByBatchPush { } } - end.update_peers(DynPeers::single(target as u32)); + end.update_peers(DynPeers::single(target as u32), self.total_peers); end.total_send = total_send; end.global_total_send = total_send; batch.set_end(end); @@ -758,7 +770,7 @@ impl ExchangeByBatchPush { new_end.global_total_send = g; if i == target { let mut batch = std::mem::replace(&mut batch, MicroBatch::empty()); - new_end.update_peers(c); + new_end.update_peers(c, self.total_peers); batch.set_end(new_end); self.pushes[i].push(batch)?; } else { @@ -852,7 +864,7 @@ impl Push> for ExchangeByBatchPush { let mut new_end = end.clone(); new_end.total_send = 0; new_end.global_total_send = 0; - p.sync_end(new_end, DynPeers::all())?; + p.sync_end(new_end, DynPeers::all(self.total_peers))?; } } } else { diff --git a/interactive_engine/executor/engine/pegasus/pegasus/src/communication/decorator/mod.rs b/interactive_engine/executor/engine/pegasus/pegasus/src/communication/decorator/mod.rs index 695baf6da79a..ee1824dc228a 100644 --- a/interactive_engine/executor/engine/pegasus/pegasus/src/communication/decorator/mod.rs +++ b/interactive_engine/executor/engine/pegasus/pegasus/src/communication/decorator/mod.rs @@ -27,6 +27,7 @@ use crate::progress::EndOfScope; use crate::tag::tools::map::TidyTagMap; use crate::PROFILE_COMM_FLAG; use crate::{Data, Tag}; + pub mod aggregate; pub mod broadcast; pub mod evented; @@ -76,9 +77,8 @@ pub struct LocalMicroBatchPush { } impl LocalMicroBatchPush { - pub fn new(ch_info: ChannelInfo, push: ThreadPush>) -> Self { + pub fn new(ch_info: ChannelInfo, push: ThreadPush>, worker_index: u32) -> Self { let push_counts = TidyTagMap::new(ch_info.scope_level); - let worker_index = crate::worker_id::get_current_worker().index; LocalMicroBatchPush { ch_info, src: worker_index, inner: push, push_counts } } } diff --git a/interactive_engine/executor/engine/pegasus/pegasus/src/communication/input/input.rs b/interactive_engine/executor/engine/pegasus/pegasus/src/communication/input/input.rs index 3fa4c4768e21..c6efee5f4c35 100644 --- a/interactive_engine/executor/engine/pegasus/pegasus/src/communication/input/input.rs +++ b/interactive_engine/executor/engine/pegasus/pegasus/src/communication/input/input.rs @@ -31,7 +31,7 @@ use crate::event::emitter::EventEmitter; use crate::event::{Event, EventKind}; use crate::progress::EndOfScope; use crate::tag::tools::map::TidyTagMap; -use crate::{Data, Tag}; +use crate::{Data, Tag, WorkerId}; pub struct InputBlockGuard { pub tag: Tag, @@ -55,11 +55,13 @@ pub struct InputHandle { // scope skip manager: cancel: TidyTagMap<()>, parent_cancel: AHashSet, + worker_id: WorkerId, } impl InputHandle { pub fn new( ch_info: ChannelInfo, pull: GeneralPull>, event_emitter: EventEmitter, + worker_id: WorkerId, ) -> Self { let scope_level = ch_info.scope_level; InputHandle { @@ -72,6 +74,7 @@ impl InputHandle { event_emitter, cancel: TidyTagMap::new(scope_level), parent_cancel: AHashSet::new(), + worker_id, } } @@ -410,7 +413,7 @@ impl InputHandle { } pub fn propagate_cancel(&mut self, tag: &Tag) { - let source = crate::worker_id::get_current_worker().index; + let source = self.worker_id.index; let ch = self.ch_info.id.index; let event = Event::new(source, self.ch_info.source_port, EventKind::Cancel((ch, tag.clone()))); let result = if self.ch_info.source_peers > 1 { diff --git a/interactive_engine/executor/engine/pegasus/pegasus/src/communication/input/mod.rs b/interactive_engine/executor/engine/pegasus/pegasus/src/communication/input/mod.rs index 9f7802bce330..43434e09a869 100644 --- a/interactive_engine/executor/engine/pegasus/pegasus/src/communication/input/mod.rs +++ b/interactive_engine/executor/engine/pegasus/pegasus/src/communication/input/mod.rs @@ -21,7 +21,7 @@ use crate::data_plane::GeneralPull; use crate::errors::IOResult; use crate::event::emitter::EventEmitter; use crate::progress::EndOfScope; -use crate::{Data, Tag}; +use crate::{Data, Tag, WorkerId}; /// Input abstraction without data type; pub trait InputProxy: AsAny + Send { @@ -46,8 +46,9 @@ pub use session::InputSession; #[inline] pub(crate) fn new_input( ch_info: ChannelInfo, pull: GeneralPull>, event_emitter: &EventEmitter, + worker_id: WorkerId, ) -> Box { - let input = InputHandle::new(ch_info, pull, event_emitter.clone()); + let input = InputHandle::new(ch_info, pull, event_emitter.clone(), worker_id); Box::new(RefWrapInput::wrap(input)) as Box } diff --git a/interactive_engine/executor/engine/pegasus/pegasus/src/communication/mod.rs b/interactive_engine/executor/engine/pegasus/pegasus/src/communication/mod.rs index 7e03c5e19cb3..b5950369b142 100644 --- a/interactive_engine/executor/engine/pegasus/pegasus/src/communication/mod.rs +++ b/interactive_engine/executor/engine/pegasus/pegasus/src/communication/mod.rs @@ -19,7 +19,7 @@ use std::collections::{HashMap, LinkedList}; use crate::data_plane::ChannelResource; use crate::errors::{BuildJobError, IOError}; -use crate::{Data, JobConf}; +use crate::{Data, JobConf, WorkerId}; mod buffer; pub(crate) mod cancel; @@ -27,6 +27,7 @@ pub(crate) mod channel; pub(crate) mod decorator; pub(crate) mod input; pub(crate) mod output; + pub use channel::Channel; use crate::channel_id::ChannelId; @@ -67,9 +68,8 @@ impl Magic { } pub(crate) fn build_channel( - ch_id: ChannelId, conf: &JobConf, + ch_id: ChannelId, conf: &JobConf, worker_id: WorkerId, ) -> Result, BuildJobError> { - let worker_id = crate::worker_id::get_current_worker(); let ch = CHANNEL_RESOURCES.with(|res| { let mut map = res.borrow_mut(); map.get_mut(&ch_id) diff --git a/interactive_engine/executor/engine/pegasus/pegasus/src/communication/output/builder.rs b/interactive_engine/executor/engine/pegasus/pegasus/src/communication/output/builder.rs index 1e0d29f5ad1f..0e4aaed502ce 100644 --- a/interactive_engine/executor/engine/pegasus/pegasus/src/communication/output/builder.rs +++ b/interactive_engine/executor/engine/pegasus/pegasus/src/communication/output/builder.rs @@ -159,6 +159,7 @@ impl OutputBuilder for OutputBuilderImpl { if let Some(main_push) = main_push { let meta = self.meta.borrow(); + let src = main_push.src; let mut tee = Tee::::new(meta.port, meta.scope_level, main_push); for p in shared.iter_mut() { if let Some(push) = p.take() { @@ -166,7 +167,7 @@ impl OutputBuilder for OutputBuilderImpl { } } - let output = OutputHandle::new(*meta, tee); + let output = OutputHandle::new(*meta, tee, src); Some(Box::new(RefWrapOutput::wrap(output)) as Box) } else { None diff --git a/interactive_engine/executor/engine/pegasus/pegasus/src/communication/output/output.rs b/interactive_engine/executor/engine/pegasus/pegasus/src/communication/output/output.rs index eccba4698cf4..9d82750e89df 100644 --- a/interactive_engine/executor/engine/pegasus/pegasus/src/communication/output/output.rs +++ b/interactive_engine/executor/engine/pegasus/pegasus/src/communication/output/output.rs @@ -53,7 +53,7 @@ pub struct OutputHandle { } impl OutputHandle { - pub(crate) fn new(meta: OutputMeta, output: Tee) -> Self { + pub(crate) fn new(meta: OutputMeta, output: Tee, src: u32) -> Self { let batch_capacity = meta.batch_capacity as usize; let scope_level = meta.scope_level; debug_worker!( @@ -63,7 +63,6 @@ impl OutputHandle { batch_capacity ); let buf_pool = ScopeBufferPool::new(meta.batch_size, batch_capacity, scope_level); - let src = crate::worker_id::get_current_worker().index; let parent_level = if scope_level == 0 { 0 } else { scope_level - 1 }; OutputHandle { port: meta.port, diff --git a/interactive_engine/executor/engine/pegasus/pegasus/src/communication/output/tee.rs b/interactive_engine/executor/engine/pegasus/pegasus/src/communication/output/tee.rs index e1e5e92ecbf3..49f17c564a1e 100644 --- a/interactive_engine/executor/engine/pegasus/pegasus/src/communication/output/tee.rs +++ b/interactive_engine/executor/engine/pegasus/pegasus/src/communication/output/tee.rs @@ -154,9 +154,8 @@ pub(crate) struct PerChannelPush { impl PerChannelPush { pub(crate) fn new( - ch_info: ChannelInfo, delta: MergedScopeDelta, push: MicroBatchPush, ch: CancelHandle, + ch_info: ChannelInfo, delta: MergedScopeDelta, push: MicroBatchPush, ch: CancelHandle, src: u32, ) -> Self { - let src = crate::worker_id::get_current_worker().index; let cancel_handle = ChannelCancelPtr::new(ch_info.scope_level, delta.clone(), ch); let re_seq = TidyTagMap::new(ch_info.scope_level); PerChannelPush { ch_info, src, delta, push, cancel_handle, re_seq } diff --git a/interactive_engine/executor/engine/pegasus/pegasus/src/dataflow.rs b/interactive_engine/executor/engine/pegasus/pegasus/src/dataflow.rs index c146d48bf9b7..818e6d5cf10f 100644 --- a/interactive_engine/executor/engine/pegasus/pegasus/src/dataflow.rs +++ b/interactive_engine/executor/engine/pegasus/pegasus/src/dataflow.rs @@ -85,7 +85,7 @@ impl DataflowBuilder { let index = self.operators.borrow().len() + 1; let info = OperatorInfo::new(name, index, scope_level); let core = Box::new(construct(&info)); - let op_b = OperatorBuilder::new(info, GeneralOperator::Simple(core)); + let op_b = OperatorBuilder::new(info, GeneralOperator::Simple(core), self.worker_id); self.operators.borrow_mut().push(op_b); OperatorRef::new(index, self.operators.clone(), self.config.clone()) } @@ -100,7 +100,7 @@ impl DataflowBuilder { let index = self.operators.borrow().len() + 1; let info = OperatorInfo::new(name, index, scope_level); let core = Box::new(construct(&info)); - let op_b = OperatorBuilder::new(info, GeneralOperator::Notifiable(core)); + let op_b = OperatorBuilder::new(info, GeneralOperator::Notifiable(core), self.worker_id); self.operators.borrow_mut().push(op_b); OperatorRef::new(index, self.operators.clone(), self.config.clone()) } @@ -131,19 +131,24 @@ impl DataflowBuilder { let mut op_names = vec![]; op_names.push("root".to_owned()); let mut depends = Dependency::default(); - sch.add_schedule_op(0, 0, vec![], vec![]); + sch.add_schedule_op(self.worker_id, 0, 0, vec![], vec![]); let sinks = self.sinks.replace(vec![]); depends.set_sinks(sinks); for e in self.edges.borrow().iter() { depends.add(e); } - for (i, mut op_b) in builds.drain(..).enumerate() { let op_index = op_b.index(); assert_eq!(i + 1, op_index, "{:?}", op_b.info); let inputs_notify = op_b.take_inputs_notify(); let outputs_cancel = op_b.build_outputs_cancel(); - sch.add_schedule_op(op_index, op_b.info.scope_level, inputs_notify, outputs_cancel); + sch.add_schedule_op( + self.worker_id, + op_index, + op_b.info.scope_level, + inputs_notify, + outputs_cancel, + ); let op = op_b.build(); op_names.push(op.info.name.clone()); if report { diff --git a/interactive_engine/executor/engine/pegasus/pegasus/src/lib.rs b/interactive_engine/executor/engine/pegasus/pegasus/src/lib.rs index cbd49ff95da6..5f0412875f9c 100644 --- a/interactive_engine/executor/engine/pegasus/pegasus/src/lib.rs +++ b/interactive_engine/executor/engine/pegasus/pegasus/src/lib.rs @@ -25,6 +25,7 @@ extern crate core; use std::cell::Cell; use std::sync::atomic::AtomicUsize; +use std::sync::Once; use std::sync::{Arc, Mutex, RwLock}; mod config; @@ -56,7 +57,6 @@ use std::collections::{HashMap, HashSet}; use std::fmt::Debug; use std::net::SocketAddr; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Once; pub use config::{read_from, Configuration, JobConf, ServerConf}; pub use data::Data; diff --git a/interactive_engine/executor/engine/pegasus/pegasus/src/operator/concise/any.rs b/interactive_engine/executor/engine/pegasus/pegasus/src/operator/concise/any.rs index 9fd27bddba02..733f8c6b2a20 100644 --- a/interactive_engine/executor/engine/pegasus/pegasus/src/operator/concise/any.rs +++ b/interactive_engine/executor/engine/pegasus/pegasus/src/operator/concise/any.rs @@ -39,6 +39,7 @@ fn has_any(mut stream: Stream) -> Result, BuildJobE stream .set_upstream_batch_capacity(1) .set_upstream_batch_size(1); + let worker = stream.get_worker_id().index; let x = stream.unary("any_global", |info| { let mut any_map = TidyTagMap::<()>::new(info.scope_level); move |input, output| { @@ -60,7 +61,6 @@ fn has_any(mut stream: Stream) -> Result, BuildJobE if let Some(end) = batch.take_end() { if any_map.remove(batch.tag()).is_none() { - let worker = crate::worker_id::get_current_worker().index; if end.peers_contains(worker) { output .new_session(batch.tag())? diff --git a/interactive_engine/executor/engine/pegasus/pegasus/src/operator/concise/correlate.rs b/interactive_engine/executor/engine/pegasus/pegasus/src/operator/concise/correlate.rs index 9ae7e67bbf62..c6cca1807608 100644 --- a/interactive_engine/executor/engine/pegasus/pegasus/src/operator/concise/correlate.rs +++ b/interactive_engine/executor/engine/pegasus/pegasus/src/operator/concise/correlate.rs @@ -31,7 +31,7 @@ use crate::operator::{Notifiable, OperatorCore}; use crate::progress::{DynPeers, EndOfScope}; use crate::stream::{Single, SingleItem, Stream}; use crate::tag::tools::map::TidyTagMap; -use crate::{BuildJobError, Data, Tag}; +use crate::{BuildJobError, Data, Tag, WorkerId}; impl CorrelatedSubTask for Stream { fn apply(self, func: F) -> Result, BuildJobError> @@ -47,13 +47,15 @@ impl CorrelatedSubTask for Stream { T: Data, F: FnOnce(Stream) -> Result, BuildJobError>, { + let worker_id = self.get_worker_id(); + let total_peers = worker_id.total_peers(); let entered = self.enter()?; let scope_level = entered.get_scope_level(); let fork_guard = UnsafeRcPtr::new(RefCell::new(TidyTagMap::new(scope_level - 1))); let join_guard = fork_guard.clone(); let (main, mut sub): (Stream, Stream) = entered .binary_branch_notify("fork_subtask", |info| { - ForkSubtaskOperator::::new(info.scope_level, max_parallel, fork_guard) + ForkSubtaskOperator::::new(info.scope_level, max_parallel, fork_guard, worker_id) })?; sub.set_upstream_batch_capacity(1) .set_upstream_batch_size(1); @@ -62,7 +64,7 @@ impl CorrelatedSubTask for Stream { .set_upstream_batch_capacity(1) .set_upstream_batch_size(1); main.union_transform_notify("zip_subtasks", inner, move |info| { - ZipSubtaskOperator::::new(info.scope_level, join_guard) + ZipSubtaskOperator::::new(info.scope_level, join_guard, total_peers) })? .leave() } @@ -114,8 +116,10 @@ struct ForkSubtaskOperator { } impl ForkSubtaskOperator { - fn new(scope_level: u32, max_parallel: u32, fork_guard: UnsafeRcPtr>>) -> Self { - let id = crate::worker_id::get_current_worker(); + fn new( + scope_level: u32, max_parallel: u32, fork_guard: UnsafeRcPtr>>, + id: WorkerId, + ) -> Self { ForkSubtaskOperator { worker_index: id.index, peers: id.total_peers(), @@ -248,8 +252,7 @@ struct ZipSubtaskOperator { } impl ZipSubtaskOperator { - fn new(scope_level: u32, zip_guard: UnsafeRcPtr>>) -> Self { - let peers = crate::worker_id::get_current_worker().total_peers(); + fn new(scope_level: u32, zip_guard: UnsafeRcPtr>>, peers: u32) -> Self { let mut parent_parent_ends = Vec::with_capacity(scope_level as usize - 1); for _ in 0..scope_level - 1 { parent_parent_ends.push(vec![]); diff --git a/interactive_engine/executor/engine/pegasus/pegasus/src/operator/concise/count.rs b/interactive_engine/executor/engine/pegasus/pegasus/src/operator/concise/count.rs index 4fc085c864d2..b6bea0e3973a 100644 --- a/interactive_engine/executor/engine/pegasus/pegasus/src/operator/concise/count.rs +++ b/interactive_engine/executor/engine/pegasus/pegasus/src/operator/concise/count.rs @@ -6,6 +6,8 @@ use crate::{BuildJobError, Data}; impl Count for Stream { fn count(self) -> Result, BuildJobError> { + let worker_id = self.get_worker_id().index; + let total_peers = self.get_worker_id().total_peers(); if self.get_partitions() > 1 { let mut stream = self.unary("count_local", |info| { let mut table = TidyTagMap::::new(info.scope_level); @@ -22,10 +24,9 @@ impl Count for Stream { if end.tag.len() > 0 { let mut new_end = end.clone(); let mut new_peers = end.peers().clone(); - let owner_index = batch.tag.current_uncheck() - % crate::worker_id::get_current_worker().total_peers(); + let owner_index = batch.tag.current_uncheck() % total_peers; new_peers.add_source(owner_index); - new_end.update_peers(new_peers); + new_end.update_peers(new_peers, total_peers); session.give_last(cnt, new_end)?; } else { session.give_last(cnt, end)?; @@ -43,28 +44,26 @@ impl Count for Stream { if end.tag.len() > 0 { let mut new_end = end.clone(); let mut new_peers = end.peers().clone(); - let owner_index = batch.tag.current_uncheck() - % crate::worker_id::get_current_worker().total_peers(); + let owner_index = batch.tag.current_uncheck() % total_peers; new_peers.add_source(owner_index); - new_end.update_peers(new_peers); + new_end.update_peers(new_peers, total_peers); session.give_last(cnt, new_end)?; } else { session.give_last(cnt, end)?; } } else { - let worker = crate::worker_id::get_current_worker().index; + let worker = worker_id; let new_end = if end.tag.len() > 0 { let mut new_end = end.clone(); let mut new_peers = end.peers().clone(); - let owner_index = batch.tag.current_uncheck() - % crate::worker_id::get_current_worker().total_peers(); + let owner_index = batch.tag.current_uncheck() % total_peers; new_peers.add_source(owner_index); - new_end.update_peers(new_peers); + new_end.update_peers(new_peers, total_peers); new_end } else { end }; - if new_end.contains_source(worker) { + if new_end.contains_source(worker, total_peers) { let mut session = output.new_session(&batch.tag)?; trace_worker!("local count {} of {:?}", 0, batch.tag); session.give_last(0, new_end)?; @@ -107,8 +106,8 @@ impl Count for Stream { trace_worker!("emit global count = {} of {:?};", sum, end.tag); session.give_last(Single(sum), end)?; } else { - let index = crate::worker_id::get_current_worker().index; - if end.contains_source(index) { + let index = worker_id; + if end.contains_source(index, total_peers) { let mut session = output.new_session(&batch.tag)?; trace_worker!("emit global count = {} of {:?};", 0, end.tag); session.give_last(Single(0), end)?; @@ -149,8 +148,8 @@ impl Count for Stream { trace_worker!("global count {} of {:?}", cnt, batch.tag); session.give_last(Single(cnt), end)?; } else { - let worker = crate::worker_id::get_current_worker().index; - if end.contains_source(worker) { + let worker = worker_id; + if end.contains_source(worker, total_peers) { let mut session = output.new_session(&batch.tag)?; trace_worker!("global count {} of {:?}", 0, batch.tag); session.give_last(Single(0), end)?; diff --git a/interactive_engine/executor/engine/pegasus/pegasus/src/operator/concise/fold.rs b/interactive_engine/executor/engine/pegasus/pegasus/src/operator/concise/fold.rs index f7e8b013514d..6058e01e3552 100644 --- a/interactive_engine/executor/engine/pegasus/pegasus/src/operator/concise/fold.rs +++ b/interactive_engine/executor/engine/pegasus/pegasus/src/operator/concise/fold.rs @@ -14,6 +14,8 @@ impl Fold for Stream { F: FnMut(B, D) -> FnResult + Send + 'static, C: Fn() -> F + Send + 'static, { + let worker = self.get_worker_id().index; + let total_peers = self.get_worker_id().total_peers(); let s = self.aggregate().unary("fold", |info| { let mut table = TidyTagMap::<(B, F)>::new(info.scope_level); move |input, output| { @@ -44,9 +46,8 @@ impl Fold for Stream { } else { // decide if it need to output a default value when upstream is empty; // but only one default value should be output; - let worker = crate::worker_id::get_current_worker().index; if (end.tag.is_root() && worker == 0) - || (!end.tag.is_root() && end.contains_source(worker)) + || (!end.tag.is_root() && end.contains_source(worker, total_peers)) { let mut session = output.new_session(&batch.tag)?; session.give_last(Single(init.clone()), end)? @@ -68,6 +69,8 @@ impl Fold for Stream { F: FnMut(B, D) -> FnResult + Send + 'static, C: Fn() -> F + Send + 'static, { + let worker = self.get_worker_id().index; + let total_peers = self.get_worker_id().total_peers(); let s = self.unary("fold_partition", |info| { let mut table = TidyTagMap::<(B, F)>::new(info.scope_level); move |input, output| { @@ -88,8 +91,9 @@ impl Fold for Stream { let mut session = output.new_session(&batch.tag)?; session.give_last(Single(accum), end)?; } else { - let worker = crate::worker_id::get_current_worker().index; - if end.tag.is_root() || (!end.tag.is_root() && end.contains_source(worker)) { + if end.tag.is_root() + || (!end.tag.is_root() && end.contains_source(worker, total_peers)) + { let mut session = output.new_session(&batch.tag)?; session.give_last(Single(init.clone()), end)? } else { diff --git a/interactive_engine/executor/engine/pegasus/pegasus/src/operator/concise/keyed/mod.rs b/interactive_engine/executor/engine/pegasus/pegasus/src/operator/concise/keyed/mod.rs index 44deccb5048f..f2e28b6e280d 100644 --- a/interactive_engine/executor/engine/pegasus/pegasus/src/operator/concise/keyed/mod.rs +++ b/interactive_engine/executor/engine/pegasus/pegasus/src/operator/concise/keyed/mod.rs @@ -36,7 +36,7 @@ impl KeyBy for Stream { impl PartitionByKey for Stream { fn partition_by_key(self) -> Stream { - let job_id = crate::worker_id::get_current_worker().job_id; + let job_id = self.get_worker_id().job_id; let bh = ahash::RandomState::with_seeds(job_id, job_id & 3, job_id & 7, job_id & 15); let router = KeyRouter::new(bh); diff --git a/interactive_engine/executor/engine/pegasus/pegasus/src/operator/iteration/feedback.rs b/interactive_engine/executor/engine/pegasus/pegasus/src/operator/iteration/feedback.rs index 8e5a34f14b03..44886d04d0b2 100644 --- a/interactive_engine/executor/engine/pegasus/pegasus/src/operator/iteration/feedback.rs +++ b/interactive_engine/executor/engine/pegasus/pegasus/src/operator/iteration/feedback.rs @@ -16,8 +16,7 @@ pub(crate) struct FeedbackOperator { } impl FeedbackOperator { - pub fn new(scope_level: u32, max_iters: u32) -> Self { - let worker_index = crate::worker_id::get_current_worker().index; + pub fn new(scope_level: u32, max_iters: u32, worker_index: u32) -> Self { FeedbackOperator { scope_level, worker_index, diff --git a/interactive_engine/executor/engine/pegasus/pegasus/src/operator/iteration/mod.rs b/interactive_engine/executor/engine/pegasus/pegasus/src/operator/iteration/mod.rs index 40465ff63ee7..9d907cb08b39 100644 --- a/interactive_engine/executor/engine/pegasus/pegasus/src/operator/iteration/mod.rs +++ b/interactive_engine/executor/engine/pegasus/pegasus/src/operator/iteration/mod.rs @@ -55,6 +55,7 @@ where F: FnOnce(Stream) -> Result, BuildJobError>, { let max_iters = until.max_iters; + let worker_index = stream.get_worker_id().index; let (mut leave, enter) = stream .enter()? .binary_branch_notify("switch", |info| { @@ -65,7 +66,7 @@ where let feedback: Stream = after_body .sync_state() .transform_notify("feedback", move |info| { - FeedbackOperator::::new(info.scope_level, max_iters) + FeedbackOperator::::new(info.scope_level, max_iters, worker_index) })?; let feedback_partitions = feedback.get_partitions(); feedback.feedback_to(index)?; diff --git a/interactive_engine/executor/engine/pegasus/pegasus/src/operator/mod.rs b/interactive_engine/executor/engine/pegasus/pegasus/src/operator/mod.rs index 91c37f96f1b1..e366ac9b7bb6 100644 --- a/interactive_engine/executor/engine/pegasus/pegasus/src/operator/mod.rs +++ b/interactive_engine/executor/engine/pegasus/pegasus/src/operator/mod.rs @@ -33,7 +33,7 @@ use crate::progress::EndOfScope; use crate::schedule::state::inbound::InputEndNotify; use crate::schedule::state::outbound::OutputCancelState; use crate::tag::tools::map::TidyTagMap; -use crate::{Data, Tag}; +use crate::{Data, Tag, WorkerId}; use crate::{PROFILE_COMM_FLAG, PROFILE_TIME_FLAG}; pub trait Notifiable: Send + 'static { @@ -426,11 +426,19 @@ pub struct OperatorBuilder { inputs_notify: Vec>>, outputs: Vec>, core: GeneralOperator, + worker_id: WorkerId, } impl OperatorBuilder { - pub fn new(meta: OperatorInfo, core: GeneralOperator) -> Self { - OperatorBuilder { info: meta, inputs: vec![], inputs_notify: vec![], outputs: vec![], core } + pub fn new(meta: OperatorInfo, core: GeneralOperator, worker_id: WorkerId) -> Self { + OperatorBuilder { + info: meta, + inputs: vec![], + inputs_notify: vec![], + outputs: vec![], + core, + worker_id, + } } pub fn index(&self) -> usize { @@ -442,7 +450,7 @@ impl OperatorBuilder { notify: Option>>, event_emitter: &EventEmitter, ) { assert_eq!(ch_info.target_port.port, self.inputs.len()); - let input = new_input(ch_info, pull, event_emitter); + let input = new_input(ch_info, pull, event_emitter, self.worker_id); self.inputs.push(input); let n = notify.map(|p| Box::new(p) as Box); self.inputs_notify.push(n); diff --git a/interactive_engine/executor/engine/pegasus/pegasus/src/operator/primitives/unary.rs b/interactive_engine/executor/engine/pegasus/pegasus/src/operator/primitives/unary.rs index dbe209ec0d29..628e9e74420c 100644 --- a/interactive_engine/executor/engine/pegasus/pegasus/src/operator/primitives/unary.rs +++ b/interactive_engine/executor/engine/pegasus/pegasus/src/operator/primitives/unary.rs @@ -63,7 +63,7 @@ impl Unary for Stream { { self.transform(name, |info| { let func = construct(info); - Box::new(UnaryOperator::new(func)) + UnaryOperator::new(func) }) } } diff --git a/interactive_engine/executor/engine/pegasus/pegasus/src/progress.rs b/interactive_engine/executor/engine/pegasus/pegasus/src/progress.rs index 3e6d9e2c6274..878e0f51a20e 100644 --- a/interactive_engine/executor/engine/pegasus/pegasus/src/progress.rs +++ b/interactive_engine/executor/engine/pegasus/pegasus/src/progress.rs @@ -81,8 +81,7 @@ impl DynPeers { DynPeers { mask: PeerSet::Partial(set) } } - pub fn all() -> Self { - let peers = crate::worker_id::get_current_worker().total_peers(); + pub fn all(peers: u32) -> Self { DynPeers { mask: PeerSet::All(peers) } } @@ -245,14 +244,9 @@ impl EndOfScope { self.global_total_send += other.global_total_send; } - pub fn update_peers(&mut self, mut peers: DynPeers) { + pub fn update_peers(&mut self, mut peers: DynPeers, total_peers: u32) { if peers.value() == 0 { - let owner = if self.tag.len() == 0 { - 0 - } else { - let peers = crate::worker_id::get_current_worker().total_peers(); - self.tag.current_uncheck() % peers - }; + let owner = if self.tag.len() == 0 { 0 } else { self.tag.current_uncheck() % total_peers }; peers = DynPeers::single(owner); } trace_worker!("update peers from {:?} to {:?} of scope {:?}", self.peers, peers, self.tag); @@ -267,11 +261,10 @@ impl EndOfScope { &self.peers } - pub(crate) fn contains_source(&self, src: u32) -> bool { + pub(crate) fn contains_source(&self, src: u32, total_peers: u32) -> bool { if !self.tag.is_root() { // get owner worker index of subtask - let owner_index = - self.tag.current_uncheck() % crate::worker_id::get_current_worker().total_peers(); + let owner_index = self.tag.current_uncheck() % total_peers; if owner_index != src { // current worker has input self.peers.value() > 0 && self.peers.contains_source(src) diff --git a/interactive_engine/executor/engine/pegasus/pegasus/src/schedule/mod.rs b/interactive_engine/executor/engine/pegasus/pegasus/src/schedule/mod.rs index 69a57569f15e..8e21107499d7 100644 --- a/interactive_engine/executor/engine/pegasus/pegasus/src/schedule/mod.rs +++ b/interactive_engine/executor/engine/pegasus/pegasus/src/schedule/mod.rs @@ -23,6 +23,7 @@ use crate::event::Event; use crate::schedule::operator::OperatorScheduler; use crate::schedule::state::inbound::InputEndNotify; use crate::schedule::state::outbound::OutputCancelState; +use crate::WorkerId; pub(crate) mod operator; pub(crate) mod state; @@ -59,10 +60,11 @@ impl Schedule { } pub fn add_schedule_op( - &mut self, index: usize, scope_level: u32, inputs_notify: Vec>>, + &mut self, worker_id: WorkerId, index: usize, scope_level: u32, + inputs_notify: Vec>>, outputs_cancel: Vec>, ) { - let op = OperatorScheduler::new(index, scope_level, inputs_notify, outputs_cancel); + let op = OperatorScheduler::new(worker_id, index, scope_level, inputs_notify, outputs_cancel); self.sch_ops.push(op); } diff --git a/interactive_engine/executor/engine/pegasus/pegasus/src/schedule/operator.rs b/interactive_engine/executor/engine/pegasus/pegasus/src/schedule/operator.rs index a2822dd2a76d..14e1f64b25ac 100644 --- a/interactive_engine/executor/engine/pegasus/pegasus/src/schedule/operator.rs +++ b/interactive_engine/executor/engine/pegasus/pegasus/src/schedule/operator.rs @@ -5,10 +5,11 @@ use crate::event::{Event, EventKind}; use crate::graph::Port; use crate::schedule::state::inbound::{InboundStreamState, InputEndNotify}; use crate::schedule::state::outbound::OutputCancelState; -use crate::Tag; +use crate::{Tag, WorkerId}; pub struct OperatorScheduler { pub index: usize, + worker_id: WorkerId, inputs_notify: Vec>, outputs_cancel: Vec>, discards: VecDeque<(Port, Tag)>, @@ -16,21 +17,28 @@ pub struct OperatorScheduler { impl OperatorScheduler { pub fn new( - index: usize, scope_level: u32, inputs_notify: Vec>>, + worker_id: WorkerId, index: usize, scope_level: u32, + inputs_notify: Vec>>, outputs_cancel: Vec>, ) -> Self { let mut input_events = Vec::with_capacity(inputs_notify.len()); for (i, notify) in inputs_notify.into_iter().enumerate() { if let Some(notify) = notify { let port = Port::new(index, i); - let state = InboundStreamState::new(port, scope_level, notify); + let state = InboundStreamState::new(port, worker_id, scope_level, notify); input_events.push(Some(state)); } else { input_events.push(None); } } - OperatorScheduler { index, inputs_notify: input_events, outputs_cancel, discards: VecDeque::new() } + OperatorScheduler { + index, + worker_id, + inputs_notify: input_events, + outputs_cancel, + discards: VecDeque::new(), + } } pub fn accept(&mut self, event: Event) -> IOResult<()> { diff --git a/interactive_engine/executor/engine/pegasus/pegasus/src/schedule/state/inbound.rs b/interactive_engine/executor/engine/pegasus/pegasus/src/schedule/state/inbound.rs index a3a788554918..a2c8f7b2dd13 100644 --- a/interactive_engine/executor/engine/pegasus/pegasus/src/schedule/state/inbound.rs +++ b/interactive_engine/executor/engine/pegasus/pegasus/src/schedule/state/inbound.rs @@ -4,7 +4,7 @@ use crate::data_plane::{GeneralPush, Push}; use crate::graph::Port; use crate::progress::{DynPeers, EndOfScope, EndSyncSignal}; use crate::tag::tools::map::TidyTagMap; -use crate::{Data, Tag}; +use crate::{Data, Tag, WorkerId}; #[allow(dead_code)] struct ScopeEndPanel { @@ -34,7 +34,7 @@ impl ScopeEndPanel { } } - fn merge(&mut self, src: u32, end: EndSyncSignal) -> Option { + fn merge(&mut self, src: u32, end: EndSyncSignal, total_peers: u32) -> Option { let (end, children) = end.take(); assert_eq!(end.tag, self.tag); assert_eq!(end.peers(), &self.expect_src); @@ -49,7 +49,7 @@ impl ScopeEndPanel { let mut new_end = end.clone(); new_end.total_send = self.count; new_end.global_total_send = self.global_count; - new_end.update_peers(src); + new_end.update_peers(src, total_peers); Some(new_end) } else { None @@ -82,18 +82,19 @@ impl InputEndNotify for GeneralPush> { pub struct InboundStreamState { port: Port, + worker_id: WorkerId, scope_level: u32, notify_guards: Vec>, notify: Box, } impl InboundStreamState { - pub fn new(port: Port, scope_level: u32, notify: Box) -> Self { + pub fn new(port: Port, worker_id: WorkerId, scope_level: u32, notify: Box) -> Self { let mut notify_guards = Vec::new(); for i in 0..scope_level + 1 { notify_guards.push(TidyTagMap::new(i)); } - InboundStreamState { port, scope_level, notify_guards, notify } + InboundStreamState { port, worker_id, scope_level, notify_guards, notify } } pub fn on_end(&mut self, src: u32, end: EndSyncSignal) -> IOResult<()> { @@ -107,7 +108,7 @@ impl InboundStreamState { end.peers(), child ); - end.update_peers(child); + end.update_peers(child, self.worker_id.total_peers()); return self.notify.notify(end); } @@ -131,7 +132,7 @@ impl InboundStreamState { // } if let Some(mut p) = self.notify_guards[idx].remove(end.tag()) { - if let Some(e) = p.merge(src, end) { + if let Some(e) = p.merge(src, end, self.worker_id.total_peers()) { trace_worker!( "input[{:?}] get end of {:?}, total pushed {} to me, global pushed {} to {:?}", self.port, diff --git a/interactive_engine/executor/engine/pegasus/pegasus/src/stream.rs b/interactive_engine/executor/engine/pegasus/pegasus/src/stream.rs index f673281f7157..80e65b4664f4 100644 --- a/interactive_engine/executor/engine/pegasus/pegasus/src/stream.rs +++ b/interactive_engine/executor/engine/pegasus/pegasus/src/stream.rs @@ -370,6 +370,7 @@ impl Stream { let mut op = self .builder .add_operator(name, self.get_scope_level(), builder); + let edge = self.connect(&mut op)?; self.builder.add_edge(edge); Ok(op) diff --git a/interactive_engine/executor/engine/pegasus/pegasus/src/worker.rs b/interactive_engine/executor/engine/pegasus/pegasus/src/worker.rs index d0d76dd4429b..d313a823b9ce 100644 --- a/interactive_engine/executor/engine/pegasus/pegasus/src/worker.rs +++ b/interactive_engine/executor/engine/pegasus/pegasus/src/worker.rs @@ -61,7 +61,6 @@ impl Worker { if peer_guard.fetch_add(1, Ordering::SeqCst) == 0 { pegasus_memory::alloc::new_task(conf.job_id as usize); } - Worker { conf: conf.clone(), id, @@ -83,8 +82,11 @@ impl Worker { { // set current worker's id into tls variable to make it accessible at anywhere; let _g = crate::worker_id::guard(self.id); - let resource = - crate::communication::build_channel::(ChannelId::new(self.id.job_id, 0), &self.conf)?; + let resource = crate::communication::build_channel::( + ChannelId::new(self.id.job_id, 0), + &self.conf, + self.id, + )?; if resource.ch_id.index != 0 { return Err(BuildJobError::InternalError(String::from("Event channel index must be 0"))); } @@ -117,7 +119,7 @@ impl Worker { let root = Box::new(root_builder) .build() .expect("no output;"); - let end = EndOfScope::new(Tag::Root, DynPeers::all(), 0, 0); + let end = EndOfScope::new(Tag::Root, DynPeers::all(self.id.total_peers()), 0, 0); root.notify_end(end).ok(); root.close().ok(); Ok(()) @@ -134,7 +136,7 @@ impl Worker { .insert(key, Box::new(resource)); } - fn check_cancel(&mut self) -> bool { + fn check_cancel(&self) -> bool { if self.conf.time_limit > 0 { let elapsed = self.start.elapsed().as_millis() as u64; if elapsed >= self.conf.time_limit { @@ -242,6 +244,7 @@ impl Task for Worker { self.span .set_status(trace::Status::error("Job is canceled")); self.span.end(); + self.sink.set_cancel_hook(true); return TaskState::Finished; } @@ -266,7 +269,6 @@ impl Task for Worker { .set_attribute(KeyValue::new("used_ms", elapsed.to_string())); self.span.set_status(trace::Status::Ok); self.span.end(); - // if this is last worker, return Finished if self.peer_guard.fetch_sub(1, Ordering::SeqCst) == 1 { state