diff --git a/crates/wick/wick-component-wasm/src/wasm_host.rs b/crates/wick/wick-component-wasm/src/wasm_host.rs index c9077fd8..a5998e1a 100644 --- a/crates/wick/wick-component-wasm/src/wasm_host.rs +++ b/crates/wick/wick-component-wasm/src/wasm_host.rs @@ -205,7 +205,7 @@ impl WasmHost { } #[allow(clippy::needless_pass_by_value)] - pub fn call(&self, invocation: Invocation, config: Option) -> Result { + pub fn call(&self, mut invocation: Invocation, config: Option) -> Result { let _span = self.span.enter(); let component_name = invocation.target.operation_id(); let inherent = invocation.inherent; diff --git a/crates/wick/wick-packet/src/packet_stream.rs b/crates/wick/wick-packet/src/packet_stream.rs index eaffb61a..edc631b0 100644 --- a/crates/wick/wick-packet/src/packet_stream.rs +++ b/crates/wick/wick-packet/src/packet_stream.rs @@ -33,7 +33,7 @@ pin_project! { pub struct PacketStream { #[pin] inner: std::sync::Arc> + Send + Unpin>>, - config: std::sync::Arc>>, + config: Option, span: Span } } @@ -79,8 +79,8 @@ impl PacketStream { self.span = span; } - pub fn set_context(&self, context: RuntimeConfig, inherent: InherentData) { - self.config.lock().replace((context, inherent)); + pub fn set_context(&mut self, context: RuntimeConfig, inherent: InherentData) { + self.config.replace((context, inherent)); } pub fn new_channels() -> (PacketSender, Self) { @@ -103,9 +103,12 @@ impl Stream for PacketStream { type Item = Result; fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll> { - let config = self.config.lock().take(); + let mut this = self; + #[allow(unsafe_code)] // this is the implementation of futures::pin_mut!() + let mut this = unsafe { Pin::new_unchecked(&mut this) }; + let config = this.config.take(); let poll = { - let mut stream = self.inner.lock(); + let mut stream = this.inner.lock(); Pin::new(&mut *stream).poll_next(cx) }; @@ -119,14 +122,14 @@ impl Stream for PacketStream { ); tracing::trace!("attached context to packet on port '{}'", packet.port()); if cfg!(debug_assertions) { - self.span.in_scope(|| { + this.span.in_scope(|| { if span_enabled!(tracing::Level::TRACE) { let debug_packet = packet .clone() .decode_value() .map_or_else(|_| format!("{:?}", packet.payload()), |j| j.to_string()); let until = std::cmp::min(debug_packet.len(), 2048); - self.span.in_scope(|| { + this.span.in_scope(|| { tracing::trace!(flags=packet.flags(), port=packet.port(), packet=%&debug_packet[..until], "packet"); }); } @@ -135,21 +138,21 @@ impl Stream for PacketStream { Poll::Ready(Some(Ok(packet))) } x => { - self.config.lock().replace(config); + this.config.replace(config); x } } } else { if let Poll::Ready(Some(Ok(packet))) = &poll { if cfg!(debug_assertions) { - self.span.in_scope(|| { + this.span.in_scope(|| { if span_enabled!(tracing::Level::TRACE) { let debug_packet = packet .clone() .decode_value() .map_or_else(|_| format!("{:?}", packet.payload()), |j| j.to_string()); let until = std::cmp::min(debug_packet.len(), 2048); - self.span.in_scope(|| { + this.span.in_scope(|| { tracing::trace!(flags=packet.flags(), port=packet.port(), packet=%&debug_packet[..until], "packet"); }); }