Skip to content

Commit

Permalink
refactor: removed mutex from context config
Browse files Browse the repository at this point in the history
  • Loading branch information
jsoverson committed Sep 12, 2023
1 parent 9b6380e commit 935a7f3
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 11 deletions.
2 changes: 1 addition & 1 deletion crates/wick/wick-component-wasm/src/wasm_host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ impl WasmHost {
}

#[allow(clippy::needless_pass_by_value)]
pub fn call(&self, invocation: Invocation, config: Option<RuntimeConfig>) -> Result<PacketStream> {
pub fn call(&self, mut invocation: Invocation, config: Option<RuntimeConfig>) -> Result<PacketStream> {
let _span = self.span.enter();
let component_name = invocation.target.operation_id();
let inherent = invocation.inherent;
Expand Down
23 changes: 13 additions & 10 deletions crates/wick/wick-packet/src/packet_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pin_project! {
pub struct PacketStream {
#[pin]
inner: std::sync::Arc<parking_lot::Mutex<dyn Stream<Item = Result<Packet>> + Send + Unpin>>,
config: std::sync::Arc<parking_lot::Mutex<Option<ContextConfig>>>,
config: Option<ContextConfig>,
span: Span
}
}
Expand Down Expand Up @@ -79,8 +79,8 @@ impl PacketStream {
self.span = span;
}

pub fn set_context(&self, context: RuntimeConfig, inherent: InherentData) {
self.config.lock().replace((context, inherent));
pub fn set_context(&mut self, context: RuntimeConfig, inherent: InherentData) {
self.config.replace((context, inherent));
}

pub fn new_channels() -> (PacketSender, Self) {
Expand All @@ -103,9 +103,12 @@ impl Stream for PacketStream {
type Item = Result<Packet>;

fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Option<Self::Item>> {
let config = self.config.lock().take();
let mut this = self;
#[allow(unsafe_code)] // this is the implementation of futures::pin_mut!()
let mut this = unsafe { Pin::new_unchecked(&mut this) };
let config = this.config.take();
let poll = {
let mut stream = self.inner.lock();
let mut stream = this.inner.lock();
Pin::new(&mut *stream).poll_next(cx)
};

Expand All @@ -119,14 +122,14 @@ impl Stream for PacketStream {
);
tracing::trace!("attached context to packet on port '{}'", packet.port());
if cfg!(debug_assertions) {
self.span.in_scope(|| {
this.span.in_scope(|| {
if span_enabled!(tracing::Level::TRACE) {
let debug_packet = packet
.clone()
.decode_value()
.map_or_else(|_| format!("{:?}", packet.payload()), |j| j.to_string());
let until = std::cmp::min(debug_packet.len(), 2048);
self.span.in_scope(|| {
this.span.in_scope(|| {
tracing::trace!(flags=packet.flags(), port=packet.port(), packet=%&debug_packet[..until], "packet");
});
}
Expand All @@ -135,21 +138,21 @@ impl Stream for PacketStream {
Poll::Ready(Some(Ok(packet)))
}
x => {
self.config.lock().replace(config);
this.config.replace(config);
x
}
}
} else {
if let Poll::Ready(Some(Ok(packet))) = &poll {
if cfg!(debug_assertions) {
self.span.in_scope(|| {
this.span.in_scope(|| {
if span_enabled!(tracing::Level::TRACE) {
let debug_packet = packet
.clone()
.decode_value()
.map_or_else(|_| format!("{:?}", packet.payload()), |j| j.to_string());
let until = std::cmp::min(debug_packet.len(), 2048);
self.span.in_scope(|| {
this.span.in_scope(|| {
tracing::trace!(flags=packet.flags(), port=packet.port(), packet=%&debug_packet[..until], "packet");
});
}
Expand Down

0 comments on commit 935a7f3

Please sign in to comment.