Skip to content

Commit

Permalink
make WebSocket Send + Sync
Browse files Browse the repository at this point in the history
  • Loading branch information
jgraef committed Dec 23, 2024
1 parent 304e109 commit 40aef96
Show file tree
Hide file tree
Showing 5 changed files with 283 additions and 181 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ Thumbs.db
# Rust
/Cargo.lock
/target
/.cargo

# wasm example
/examples/wasm/dist
Expand Down
8 changes: 4 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ json = ["dep:serde", "dep:serde_json"]

[dependencies]
# pin version, see https://github.com/jgraef/reqwest-websocket/pull/33
futures-util = { version = ">=0.3.31", default-features = false, features = ["sink"] }
futures-util = { version = ">=0.3.31", default-features = false, features = ["sink", "async-await-macro"] }
futures-channel = { version = "0.3", default-features = false, features = ["sink", "std"] }
reqwest = { version = "0.12", default-features = false }
thiserror = "2"
tracing = "0.1"
Expand All @@ -38,14 +39,13 @@ tungstenite = { version = "0.24", default-features = false, features = ["handsha

[target.'cfg(target_arch = "wasm32")'.dependencies]
web-sys = { version = "0.3", features = ["WebSocket", "CloseEvent", "ErrorEvent", "Event", "MessageEvent", "BinaryType"] }
tokio = { version = "1", default-features = false, features = ["sync", "macros"] }
wasm-bindgen-futures = "0.4"

[dev-dependencies]
tokio = { version = "1", features = ["macros", "rt"] }
reqwest = { version = "0.12", features = ["default-tls"] }
serde = { version = "1.0", features = ["derive"] }
futures-util = { version = "0.3", default-features = false, features = ["sink", "alloc"] }
futures-util = "0.3"

[target.'cfg(target_arch = "wasm32")'.dev-dependencies]
wasm-bindgen-test = "0.3"
wasm-bindgen-futures = "0.4"
2 changes: 2 additions & 0 deletions examples/wasm/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ pub fn App() -> impl IntoView {

spawn_local(async move {
let websocket = reqwest_websocket::websocket("https://echo.websocket.org/").await.unwrap();
tracing::info!("WebSocket connected");

let (mut sender, mut receiver) = websocket.split();

futures::join!(
Expand Down
40 changes: 34 additions & 6 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ pub enum Error {
#[cfg(target_arch = "wasm32")]
#[cfg_attr(docsrs, doc(cfg(target_arch = "wasm32")))]
#[error("web_sys error")]
WebSys(#[from] wasm::WebSysError),
WebSys(#[from] wasm::Error),

/// Error during serialization/deserialization.
#[error("serde_json error")]
Expand Down Expand Up @@ -178,7 +178,7 @@ impl UpgradedRequestBuilder {
let inner = native::send_request(self.inner, &self.protocols).await?;

#[cfg(target_arch = "wasm32")]
let inner = wasm::WebSysWebSocketStream::new(self.inner.build()?, &self.protocols).await?;
let inner = wasm::WebSocket::new(self.inner.build()?, &self.protocols).await?;

Ok(UpgradeResponse {
inner,
Expand All @@ -198,7 +198,7 @@ pub struct UpgradeResponse {
inner: native::WebSocketResponse,

#[cfg(target_arch = "wasm32")]
inner: wasm::WebSysWebSocketStream,
inner: wasm::WebSocket,

#[allow(dead_code)]
protocols: Vec<String>,
Expand Down Expand Up @@ -229,7 +229,7 @@ impl UpgradeResponse {

#[cfg(target_arch = "wasm32")]
let (inner, protocol) = {
let protocol = self.inner.protocol();
let protocol = self.inner.protocol().to_owned();
(self.inner, Some(protocol))
};

Expand All @@ -252,7 +252,7 @@ pub struct WebSocket {
inner: native::WebSocketStream,

#[cfg(target_arch = "wasm32")]
inner: wasm::WebSysWebSocketStream,
inner: wasm::WebSocket,

protocol: Option<String>,
}
Expand Down Expand Up @@ -283,7 +283,15 @@ impl WebSocket {
}

#[cfg(target_arch = "wasm32")]
self.inner.close(code.into(), reason.unwrap_or_default())?;
{
let mut inner = self.inner;
inner
.send(Message::Close {
code,
reason: reason.unwrap_or_default().to_owned(),
})
.await?;
}

Ok(())
}
Expand Down Expand Up @@ -344,8 +352,22 @@ pub mod tests {
#[cfg(target_arch = "wasm32")]
wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);

use crate::{UpgradeResponse, UpgradedRequestBuilder};

use super::{websocket, CloseCode, Message, RequestBuilderExt, WebSocket};

macro_rules! assert_send {
($ty:ty) => {
const _: () = {
struct Assert<T: Send>(std::marker::PhantomData<T>);
Assert::<$ty>(std::marker::PhantomData);
};
};
}

// unfortunately hyper IO is not sync
assert_send!(WebSocket);

async fn test_websocket(mut websocket: WebSocket) {
let text = "Hello, World!";
websocket
Expand Down Expand Up @@ -467,4 +489,10 @@ pub mod tests {
assert_eq!(byte, 1001u16);
assert_eq!(u16::from(text), 1001u16);
}

// assert that our types are Send + Sync
trait AssertSendSync: Send + Sync {}
impl AssertSendSync for UpgradedRequestBuilder {}
impl AssertSendSync for UpgradeResponse {}
impl AssertSendSync for WebSocket {}
}
Loading

0 comments on commit 40aef96

Please sign in to comment.