Skip to content

Commit

Permalink
refactor: removed mutexes in PacketStream, made Invocation state erro…
Browse files Browse the repository at this point in the history
…r-proof
  • Loading branch information
jsoverson committed Sep 12, 2023
1 parent 935a7f3 commit 33b6785
Show file tree
Hide file tree
Showing 39 changed files with 435 additions and 352 deletions.
11 changes: 6 additions & 5 deletions crates/components/wick-http-client/src/component.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,15 +91,15 @@ impl Component for HttpClientComponent {
let config = self.config.clone();
let baseurl = self.base.clone();
let codec = config.codec().copied();
let opdef = get_op_by_name(&config, invocation.target.operation_id());
let opdef = get_op_by_name(&config, invocation.target().operation_id());
let path_template = opdef
.as_ref()
.and_then(|op| self.path_templates.get(op.name()).cloned());
let client = self.client.clone();

Box::pin(async move {
let (tx, rx) = invocation.make_response();
let span = invocation.span.clone();
let span = invocation.span().clone();
let fut = handle(
opdef,
tx.clone(),
Expand Down Expand Up @@ -134,7 +134,7 @@ fn get_op_by_name(config: &HttpClientComponentConfig, name: &str) -> Option<Http
async fn handle(
opdef: Option<HttpClientOperationDefinition>,
tx: FluxChannel<Packet, wick_packet::Error>,
mut invocation: Invocation,
invocation: Invocation,
root_config: Option<RuntimeConfig>,
op_config: Option<RuntimeConfig>,
codec: Option<Codec>,
Expand All @@ -146,14 +146,15 @@ async fn handle(
return Err(Error::InvalidBaseUrl(baseurl).into());
}
let Some(opdef) = opdef else {
return Err(Error::OpNotFound(invocation.target.operation_id().to_owned()).into());
return Err(Error::OpNotFound(invocation.target().operation_id().to_owned()).into());
};
// Defer to operation codec, then to client codec, then to default.
let codec = opdef.codec().copied().unwrap_or(codec.unwrap_or_default());
let template = path_template.unwrap();

let input_list: Vec<_> = opdef.inputs().iter().map(|i| i.name.clone()).collect();
let mut inputs = wick_packet::StreamMap::from_stream(invocation.eject_stream(), input_list);
let (invocation, stream) = invocation.split();
let mut inputs = wick_packet::StreamMap::from_stream(stream, input_list);
let mut handles = Vec::new();

'outer: loop {
Expand Down
37 changes: 5 additions & 32 deletions crates/components/wick-sql/src/component.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,34 +74,6 @@ impl DatabaseProvider for Client {
self.inner().get_connection().await
}
}
// pub(crate) struct Transaction<'a>(Arc<dyn ClientConnection + Sync + Send + 'a>);

// impl<'a> Transaction<'a> {
// fn new(conn: Arc<dyn ClientConnection + Sync + Send + 'a>) -> Self {
// Self(conn)
// }
// fn end_transaction(&self) -> BoxFuture<Result<(), Error>> {
// Box::pin(async move { Ok(()) })
// }
// }

// #[async_trait::async_trait]
// impl<'a> ClientConnection for Transaction<'a> {
// async fn query(&mut self, stmt: &str, bound_args: Vec<SqlWrapper>) -> Result<BoxStream<Result<Value, Error>>, Error> {
// self.0.query(stmt, bound_args).await
// }
// async fn exec(&mut self, stmt: &str, bound_args: Vec<SqlWrapper>) -> Result<(), Error> {
// self.0.exec(stmt, bound_args).await
// }

// async fn handle_error(&mut self, e: Error, behavior: ErrorBehavior) -> Result<(), Error> {
// self.0.handle_error(e, behavior).await
// }

// async fn finish(&mut self) -> Result<(), Error> {
// todo!()
// }
// }

/// The Azure SQL Wick component.
#[derive(Clone)]
Expand Down Expand Up @@ -159,23 +131,24 @@ impl SqlComponent {
impl Component for SqlComponent {
fn handle(
&self,
mut invocation: Invocation,
invocation: Invocation,
_data: Option<RuntimeConfig>, // TODO: this needs to be used
_callback: Arc<RuntimeCallback>,
) -> BoxFuture<Result<PacketStream, ComponentError>> {
let client = self.provider.clone();
let opdef = self
.config
.get_operation(invocation.target.operation_id())
.ok_or_else(|| Error::MissingOperation(invocation.target.operation_id().to_owned()))
.get_operation(invocation.target().operation_id())
.ok_or_else(|| Error::MissingOperation(invocation.target().operation_id().to_owned()))
.cloned();

Box::pin(async move {
let opdef = opdef?;
let stmt = client.get_statement(opdef.name()).unwrap().to_owned();

let input_names: Vec<_> = opdef.inputs().iter().map(|i| i.name.clone()).collect();
let input_streams = wick_packet::split_stream(invocation.eject_stream(), input_names);
let (invocation, stream) = invocation.split();
let input_streams = wick_packet::split_stream(stream, input_names);
let (tx, rx) = invocation.make_response();
tokio::spawn(async move {
let start = SystemTime::now();
Expand Down
28 changes: 16 additions & 12 deletions crates/wick/flow-graph-interpreter/src/interpreter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,38 +239,42 @@ impl Component for Interpreter {
}
hosted
};
let span = invocation.span.clone();
let span = invocation.span().clone();

span.in_scope(|| trace!(target=%invocation.target_url(),tx_id=%invocation.tx_id,id=%invocation.id, "invoking"));
let from_exposed = self.exposed_ops.get(invocation.target.operation_id());
span
.in_scope(|| trace!(target=%invocation.target().url(),tx_id=%invocation.tx_id(),id=%invocation.id(), "invoking"));
let from_exposed = self.exposed_ops.get(invocation.target().operation_id());

Box::pin(async move {
let stream = match &invocation.target {
let stream = match invocation.target() {
Entity::Operation(ns, _) => {
if ns == SelfComponent::ID || ns == Entity::LOCAL || Some(ns) == self.namespace.as_ref() {
if let Some(handler) = from_exposed {
let new_target = Entity::operation(handler.namespace(), invocation.target.operation_id());
span.in_scope(|| trace!(origin=%invocation.origin,original_target=%invocation.target, %new_target, "invoke::exposed::operation"));
invocation.target = new_target;
let new_target = Entity::operation(handler.namespace(), invocation.target().operation_id());
span.in_scope(|| trace!(origin=%invocation.origin(),original_target=%invocation.target(), %new_target, "invoke::exposed::operation"));
invocation = invocation.redirect(new_target);
return handler.component.handle(invocation, config, cb).await;
}
span
.in_scope(|| trace!(origin=%invocation.origin,target=%invocation.target, "invoke::composite::operation"));
span.in_scope(
|| trace!(origin=%invocation.origin(),target=%invocation.target(), "invoke::composite::operation"),
);
self
.self_component
.handle(invocation, config, self.get_callback())
.await?
} else if let Some(handler) = self.components.get(ns) {
span.in_scope(|| trace!(origin=%invocation.origin,target=%invocation.target, "invoke::handler::operation"));
span.in_scope(
|| trace!(origin=%invocation.origin(),target=%invocation.target(), "invoke::handler::operation"),
);
handler.component.handle(invocation, config, cb).await?
} else {
return Err(ComponentError::new(Error::TargetNotFound(
invocation.target.clone(),
invocation.target().clone(),
known_targets(),
)));
}
}
_ => return Err(ComponentError::new(Error::InvalidEntity(invocation.target))),
_ => return Err(ComponentError::new(Error::InvalidEntity(invocation.target().clone()))),
};

Ok::<_, ComponentError>(stream)
Expand Down
14 changes: 9 additions & 5 deletions crates/wick/flow-graph-interpreter/src/interpreter/channel.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use flow_graph::{NodeIndex, PortReference};
use tracing::Span;
use uuid::Uuid;
use wick_packet::{Invocation, PacketPayload};
use wick_packet::{Invocation, PacketPayload, PacketStream};

pub(crate) use self::error::Error;
use super::executor::error::ExecutionError;
Expand Down Expand Up @@ -45,7 +45,7 @@ impl Event {
#[allow(clippy::exhaustive_enums)]
pub enum EventKind {
Ping(usize),
ExecutionStart(Box<ExecutionContext>),
ExecutionStart(Box<ExecutionContext>, PacketStream),
ExecutionDone,
PortData(PortReference),
Invocation(NodeIndex, Box<Invocation>),
Expand All @@ -57,7 +57,7 @@ impl EventKind {
pub(crate) const fn name(&self) -> &str {
match self {
EventKind::Ping(_) => "ping",
EventKind::ExecutionStart(_) => "exec_start",
EventKind::ExecutionStart(_, _) => "exec_start",
EventKind::ExecutionDone => "exec_done",
EventKind::PortData(_) => "port_data",
EventKind::Invocation(_, _) => "invocation",
Expand Down Expand Up @@ -166,8 +166,12 @@ impl InterpreterDispatchChannel {
self.dispatch(Event::new(CHANNEL_UUID, EventKind::Close(error), self.span.clone()));
}

pub(crate) fn dispatch_start(&self, ctx: Box<ExecutionContext>) {
self.dispatch(Event::new(ctx.id(), EventKind::ExecutionStart(ctx), self.span.clone()));
pub(crate) fn dispatch_start(&self, ctx: Box<ExecutionContext>, stream: PacketStream) {
self.dispatch(Event::new(
ctx.id(),
EventKind::ExecutionStart(ctx, stream),
self.span.clone(),
));
}

pub(crate) fn dispatch_call_complete(&self, ctx_id: Uuid, op_index: usize) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,12 @@ impl Component for ComponentComponent {
_config: Option<RuntimeConfig>,
_callback: std::sync::Arc<RuntimeCallback>,
) -> BoxFuture<Result<PacketStream, ComponentError>> {
invocation.trace(|| debug!(target = %invocation.target, namespace = Self::ID));
invocation.trace(|| debug!(target = %invocation.target(), namespace = Self::ID));

// This handler handles the components:: namespace and outputs the entity
// to link to.
let target_name = invocation.target.operation_id().to_owned();
let entity = Entity::component(invocation.target.operation_id());
let target_name = invocation.target().operation_id().to_owned();
let entity = Entity::component(invocation.target().operation_id());

let contains_components = self.signature.operations.iter().any(|op| op.name == target_name);
let all_components: Vec<_> = self.signature.operations.iter().map(|op| op.name.clone()).collect();
Expand All @@ -63,7 +63,7 @@ impl Component for ComponentComponent {

tx.send(Packet::encode(
port_name,
ComponentReference::new(invocation.origin.clone(), Entity::component(entity.component_id())),
ComponentReference::new(invocation.origin().clone(), Entity::component(entity.component_id())),
))
.map_err(ComponentError::new)?;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ impl CoreComponent {
macro_rules! core_op {
($type:ty, $inv:expr, $name:expr, $callback:expr, $data:ident) => {{
let config = <$type>::decode_config($data)?;
let ctx = Context::new(config, &$inv.inherent, $callback);
let ctx = Context::new(config, $inv.inherent(), $callback);
$name.handle($inv, ctx).await
}};
}
Expand All @@ -165,17 +165,17 @@ impl Component for CoreComponent {
data: Option<RuntimeConfig>,
callback: std::sync::Arc<RuntimeCallback>,
) -> BoxFuture<Result<PacketStream, ComponentError>> {
invocation.trace(|| debug!(target = %invocation.target, namespace = Self::ID));
invocation.trace(|| debug!(target = %invocation.target(), namespace = Self::ID));

let task = async move {
match invocation.target.operation_id() {
match invocation.target().operation_id() {
sender::Op::ID => core_op! {sender::Op, invocation, self.sender, callback, data},
pluck::Op::ID => core_op! {pluck::Op, invocation, self.pluck, callback, data},
merge::Op::ID => core_op! {merge::Op, invocation, self.merge, callback, data},
switch::Op::ID => core_op! {switch::Op, invocation, self.switch, callback, data},
collect::Op::ID => core_op! {collect::Op, invocation, self.collect, callback, data},
_ => {
panic!("Core operation {} not handled.", invocation.target.operation_id());
panic!("Core operation {} not handled.", invocation.target().operation_id());
}
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,17 @@ impl Operation for Op {
type Config = Config;
fn handle(
&self,
mut invocation: Invocation,
invocation: Invocation,
context: Context<Self::Config>,
) -> BoxFuture<Result<PacketStream, ComponentError>> {
let (tx, rx) = invocation.make_response();

tokio::spawn(async move {
let mut ports: HashMap<String, Vec<Value>> = context.config.inputs.iter().map(|n| (n.clone(), vec![])).collect();
let mut array_levels: HashMap<String, i16> = HashMap::new();
let mut stream = invocation.into_stream();

while let Some(next) = invocation.packets.next().await {
while let Some(next) = stream.next().await {
if let Err(e) = next {
ports
.entry(Packet::FATAL_ERROR.to_owned())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ impl Operation for Op {
context: Context<Self::Config>,
) -> BoxFuture<Result<PacketStream, ComponentError>> {
let (tx, rx) = invocation.make_response();
let mut map = StreamMap::from_stream(invocation.packets, self.input_names(&context.config));
let stream = invocation.into_stream();
let mut map = StreamMap::from_stream(stream, self.input_names(&context.config));
tokio::spawn(async move {
while let Ok(next) = map.next_set().await {
if next.is_none() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use flow_component::{ComponentError, Context, Operation, RenderConfiguration};
use futures::{FutureExt, StreamExt};
use serde_json::Value;
use wick_interface_types::{operation, OperationSignature};
use wick_packet::{Invocation, Packet, PacketStream, RuntimeConfig, StreamMap};
use wick_packet::{Invocation, Packet, PacketStream, RuntimeConfig};

use crate::BoxFuture;
pub(crate) struct Op {
Expand Down Expand Up @@ -77,34 +77,36 @@ impl Operation for Op {
invocation: Invocation,
context: Context<Self::Config>,
) -> BoxFuture<Result<PacketStream, ComponentError>> {
let mut map = StreamMap::from_stream(invocation.packets, self.input_names(&context.config));
let mapped = map.take("input").map_err(ComponentError::new).map(|input| {
input
.map(move |next| {
let field = context.config.field.clone();
next.and_then(move |packet| {
if packet.has_data() {
let obj = packet.decode_value()?;
let value = pluck(&obj, &field).map_or_else(
// let mut map = StreamMap::from_stream(invocation.packets, self.input_names(&context.config));
let field = context.config.field.clone();
let stream = invocation.into_stream();
let mapped = stream.filter_map(move |next| {
let a = next.map_or_else(
|e| Some(Err(e)),
|packet| {
if packet.port() != "input" {
return None;
}
if packet.has_data() {
Some(packet.decode_value().map(|obj| {
pluck(&obj, &field).map_or_else(
|| {
Packet::err(
"output",
format!("could not retrieve data from object path [{}]", field.join(",")),
)
},
|value| Packet::encode("output", value),
);

Ok(value)
} else {
Ok(packet.set_port("output"))
}
})
})
.boxed()
.into()
)
}))
} else {
Some(Ok(packet.set_port("output")))
}
},
);
futures::future::ready(a)
});
async move { mapped }.boxed()
async move { Ok(PacketStream::new(mapped)) }.boxed()
}

fn get_signature(&self, _config: Option<&Self::Config>) -> &OperationSignature {
Expand Down
Loading

0 comments on commit 33b6785

Please sign in to comment.