-
Hello everyone, I have a thread generating some data and sending it through a channel, I can't figure out how to, then, expose it to a websocket for clients to listen to. I'm aware of #2652 but it's still not clear. The code I wrote so far is here.
Do you have any pointers on how to solve this? |
Beta Was this translation helpful? Give feedback.
Answered by
Krahos
May 11, 2022
Replies: 1 comment 1 reply
-
Solved: I had to implement a different StreamHandler for the type I'm generating in the server. Full example code for anyone else stumbling on this: use std::{net::TcpListener, thread, time::Duration};
use actix::{Actor, AsyncContext, SpawnHandle, StreamHandler};
use actix_web::{
web::{self, Data, Payload},
App, Error, HttpRequest, HttpResponse, HttpServer,
};
use actix_web_actors::ws::{self};
use tokio::sync::watch::{channel, Receiver};
#[actix_web::main]
async fn main() -> std::io::Result<()> {
// Define communication channel.
let (sender, receiver) = channel(Message { int: 0 });
// Spawn sender thread.
let _handle = thread::spawn(move || loop {
thread::sleep(Duration::from_secs(2));
println!("Generated a message");
sender
.send(Message { int: 3 })
.expect("Failed to write to channel");
println!("Sent a message");
});
// Define server.
let listener = TcpListener::bind("127.0.0.1:5568")?;
let app_state = Data::new(AppState { receiver });
let _server = HttpServer::new(move || {
App::new()
.route("/", web::get().to(index))
.app_data(app_state.clone())
})
.listen(listener)?
.run()
.await;
Ok(())
}
#[derive(Clone)]
struct AppState {
receiver: Receiver<Message>,
}
#[derive(Debug)]
pub struct Message {
pub int: i8,
}
impl ToString for Message {
fn to_string(&self) -> String {
self.int.to_string()
}
}
struct MyWs {
receiver: Receiver<Message>,
spawn_handle: Option<SpawnHandle>,
}
impl Actor for MyWs {
type Context = ws::WebsocketContext<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
let mut receiver = self.receiver.clone();
self.spawn_handle = Some(ctx.add_stream(async_stream::stream! {
while receiver.changed().await.is_ok() {
yield receiver.borrow().to_string()
};
}));
}
}
impl StreamHandler<String> for MyWs {
fn handle(&mut self, msg: String, ctx: &mut Self::Context) {
ctx.text(msg);
}
}
async fn index(
req: HttpRequest,
stream: Payload,
app_state: Data<AppState>,
) -> Result<HttpResponse, Error> {
ws::start(
MyWs {
receiver: app_state.receiver.clone(),
spawn_handle: None,
},
&req,
stream,
)
}
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for MyWs {
fn handle(&mut self, _msg: Result<ws::Message, ws::ProtocolError>, _ctx: &mut Self::Context) {
print!("Received a message")
}
} |
Beta Was this translation helpful? Give feedback.
1 reply
Answer selected by
Krahos
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Solved: I had to implement a different StreamHandler for the type I'm generating in the server. Full example code for anyone else stumbling on this: