Skip to content

Commit

Permalink
fix bridge after updating cargo
Browse files Browse the repository at this point in the history
  • Loading branch information
k2d222 committed Oct 19, 2024
1 parent 82dc438 commit 2f4cdb2
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 72 deletions.
68 changes: 16 additions & 52 deletions server/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ vek = { version = "0.16", features = ["az", "rgba", "serde", "uv"] }
thiserror = "1.0.44"
sanitize-filename = "0.5.0"
serde_with = "3.3.0"
url = "2.4.1"
rand = "0.8.5"
either = "1.9.0"
itertools = "0.11.0"
tower_governor = { version = "0.4.3", features = ["axum"] }
tokio-tungstenite = { version = "0.24.0", features = ["rustls-tls-webpki-roots"] }


[lib]
Expand Down
43 changes: 30 additions & 13 deletions server/src/bridge.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
use std::{collections::HashMap, net::SocketAddr, sync::Arc};

use axum_tungstenite::Message as WebSocketMessage;
use axum_tungstenite::WebSocket;
use axum::extract::ws::{Message as WebSocketMessage, WebSocket};
use futures::{
channel::mpsc::{unbounded, UnboundedSender},
StreamExt, TryStreamExt,
SinkExt, StreamExt, TryStreamExt,
};
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use tokio_tungstenite::tungstenite::protocol::{frame::coding::CloseCode, CloseFrame};
use tokio_tungstenite::tungstenite::Message as TungsteniteMessage;

use crate::{error::Error, protocol::*, room::Peer, server::Server, util::*};

use futures_util::{future::Either, stream, SinkExt};
use futures_util::{future::Either, stream};
use tokio_tungstenite::connect_async;

type Tx = UnboundedSender<WebSocketMessage>;
Expand Down Expand Up @@ -81,8 +82,7 @@ impl Server {
WebSocketMessage::Close(_) => futures::future::err(Error::BridgeClosed),
WebSocketMessage::Binary(_)
| WebSocketMessage::Ping(_)
| WebSocketMessage::Pong(_)
| WebSocketMessage::Frame(_) => futures::future::ok(()),
| WebSocketMessage::Pong(_) => futures::future::ok(()),
}
});

Expand Down Expand Up @@ -185,15 +185,15 @@ impl Server {
/// Open a bridge with a remote server
pub async fn open_bridge(server: Arc<Server>, cfg: BridgeConfig) -> Result<(), Error> {
server.close_bridge();
let url = url::Url::parse(&cfg.url).map_err(|_| Error::BridgeNotFound)?;

let (socket, _resp) = connect_async(&url)
let (socket, _resp) = connect_async(&cfg.url)
.await
.map_err(|_| Error::BridgeNotFound)?;
let (mut ws_send, ws_recv) = socket.split();

ws_send
.send(WebSocketMessage::Text(serde_json::to_string(&cfg).unwrap()))
.send(TungsteniteMessage::Text(
serde_json::to_string(&cfg).unwrap(),
))
.await
.map_err(|_| Error::BridgeClosed)?;

Expand All @@ -209,7 +209,7 @@ impl Server {
.map_err(|_| Error::BridgeClosed)
.try_for_each(move |v| {
let (addr_msg, payload_msg) = match v.into_iter().collect_tuple() {
Some((WebSocketMessage::Text(m1), WebSocketMessage::Text(m2))) => (m1, m2),
Some((TungsteniteMessage::Text(m1), TungsteniteMessage::Text(m2))) => (m1, m2),
_ => return futures::future::err(Error::BridgeClosed),
};

Expand All @@ -227,6 +227,23 @@ impl Server {
payload,
])
})
// we need to do a bit of juggling here because we combine axum's tungstenite
// with tokio's tungstenite
.map(|msg| match msg {
WebSocketMessage::Text(x) => TungsteniteMessage::Text(x),
WebSocketMessage::Binary(x) => TungsteniteMessage::Binary(x),
WebSocketMessage::Ping(x) => TungsteniteMessage::Ping(x),
WebSocketMessage::Pong(x) => TungsteniteMessage::Pong(x),
WebSocketMessage::Close(Some(close)) => {
TungsteniteMessage::Close(Some(CloseFrame {
code: CloseCode::from(close.code),
reason: close.reason,
}))
}
WebSocketMessage::Close(None) => {
TungsteniteMessage::Close(None)
}
})
.map(Ok)
.forward(tx.clone());
tokio::spawn(fut);
Expand Down Expand Up @@ -273,8 +290,8 @@ impl Server {
futures::future::ready(res.ok_or(Error::BridgeFailure))
});

log::info!("bridge connected to {}", &url);
let url = url.clone();
let url = cfg.url.clone();
log::info!("bridge connected to {}", url);

let handle = tokio::spawn(async move {
match futures::future::select(fut_send, fut_recv).await {
Expand Down
7 changes: 3 additions & 4 deletions server/src/bridge_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,13 @@ use std::{
sync::Arc,
};

use axum::extract::ws::{Message as WebSocketMessage, WebSocketUpgrade};
use axum::{
extract::{ConnectInfo, Path, State},
headers::UserAgent,
response::IntoResponse,
Json, TypedHeader,
Json,
};
use axum_tungstenite::Message as WebSocketMessage;
use axum_tungstenite::WebSocketUpgrade;
use axum_extra::{headers::UserAgent, TypedHeader};
use futures::channel::mpsc::unbounded;
use futures_util::StreamExt;

Expand Down
4 changes: 2 additions & 2 deletions server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ mod twmap_map_checks;
mod twmap_map_edit;
mod util;

#[cfg(any(feature = "bridge_in", feature = "bridge_out"))]
#[cfg(feature = "bridge")]
mod bridge;
#[cfg(any(feature = "bridge_in", feature = "bridge_out"))]
#[cfg(feature = "bridge")]
mod bridge_router;

use room::Room;
Expand Down

0 comments on commit 2f4cdb2

Please sign in to comment.