Stream background job results to MPSC channel not ending #266
-
Hello, I've recently discovered this cool library and I wanted to use it to make parallel and scheduled calculations. The issue is that the Monitor never ends, and I don't know why. I think this is related to the original sender not being closed/dropped, but I can't figure out a way to drop it without breaking something. Here is a minimal example of what I described:

```rust
use std::thread::sleep;
use std::time::Duration;

use apalis::prelude::*;
use apalis::{
    prelude::{Data, Job, WorkerBuilder},
    redis::RedisStorage,
};
use futures::channel::mpsc::{channel, Sender};
use futures::StreamExt;
use serde::{Deserialize, Serialize};

#[derive(Clone)]
struct JobContext {
    sender: Sender<i32>,
}

#[derive(Debug, Clone, Deserialize, Serialize)]
struct SomeJob {
    id: i32,
}

impl Job for SomeJob {
    const NAME: &'static str = "apalis::SomeJob";
}

async fn send_id(job: SomeJob, ctx: Data<JobContext>) {
    let mut cloned_sender = ctx.sender.clone();
    sleep(Duration::from_secs(1));
    cloned_sender.try_send(job.id).unwrap();
    println!("Sent {}", job.id);
}

async fn produce_jobs(mut storage: RedisStorage<SomeJob>) {
    for i in 0..12 {
        let job = SomeJob { id: i };
        storage.push(job).await.unwrap();
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = channel::<i32>(1024);
    let conn = apalis::redis::connect("redis://127.0.0.1/").await.unwrap();
    let storage = RedisStorage::new(conn);
    produce_jobs(storage.clone()).await;

    tokio::task::spawn(async move {
        while let Some(x) = rx.next().await {
            println!("Received {:?}", x);
        }
        println!("This is never reached, loop will not finish");
    });

    let job_ctx = JobContext { sender: tx };
    let worker = WorkerBuilder::new("testing")
        .data(job_ctx)
        .with_storage(storage.clone())
        .build_fn(send_id);

    Monitor::<TokioExecutor>::new()
        .register_with_count(3, worker)
        .run()
        .await
        .unwrap();

    println!("This is never reached either, monitor is still waiting for workers to finish");
}
```

Please tell me if there's another way of doing what I'm trying to do! Thanks
-
Hello! Welcome to our discussions. We will walk together and hopefully help you out 🙂. Your approach should work; you might consider sending an enum, something like:

```rust
enum Control {
    Continue(TaskId),
    Break,
}
```

This can be combined with a layer. That said, if you could specify what you want to do with the result, I can write you a basic starter layer.
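For illustration, here is a minimal sketch of the receiving side with such an enum. The `drain` helper and the idea that a worker sends `Break` after the last job are assumptions, not part of the original example:

```rust
use apalis::prelude::*; // assuming `TaskId` is exported here
use futures::channel::mpsc::Receiver;
use futures::StreamExt;

enum Control {
    Continue(TaskId),
    Break,
}

// The loop can now terminate even while senders are still alive:
// some worker sends `Control::Break` once the last job is done.
async fn drain(mut rx: Receiver<Control>) {
    while let Some(msg) = rx.next().await {
        match msg {
            Control::Continue(id) => println!("Received {id:?}"),
            Control::Break => break,
        }
    }
    println!("Loop finished");
}
```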
-
Step 1: Set up both your monitor and async-graphql in `main`.

Step 2: Create a channel in `main`:

```rust
let (tx, rx) = channel::<String>(1024);
```

Step 3: Pass the receiver to async-graphql's `.data` method, e.g. `.data(rx)`.

Step 4: Create a layer that takes in the sender (this assumes your result is a `String` for simplicity):

```rust
use std::task::{Context, Poll};

use futures::channel::mpsc::Sender;
use futures::future::BoxFuture;
use tower::{Layer, Service}; // or the re-exports from apalis, depending on version

pub struct ChannelResultLayer(Sender<String>);

impl<S> Layer<S> for ChannelResultLayer {
    type Service = ChannelResultService<S>;

    fn layer(&self, service: S) -> Self::Service {
        ChannelResultService {
            service,
            sender: self.0.clone(),
        }
    }
}

pub struct ChannelResultService<S> {
    service: S,
    sender: Sender<String>,
}

impl<S, J> Service<Request<J>> for ChannelResultService<S>
where
    S: Service<Request<J>>,
    S::Future: Send + 'static,
    S::Response: std::fmt::Debug + 'static,
    S::Error: std::fmt::Debug + 'static,
    J: Job,
{
    type Response = S::Response;
    type Error = S::Error;
    // Boxed so we can await the inner service and then forward the result.
    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.service.poll_ready(cx)
    }

    fn call(&mut self, request: Request<J>) -> Self::Future {
        let id = format!("{:?}", request.id());
        let fut = self.service.call(request);
        let mut sender = self.sender.clone();
        Box::pin(async move {
            let res = fut.await;
            // Forward a textual representation of the result; a full version
            // should handle a closed or full channel instead of ignoring errors.
            let _ = sender.try_send(format!("{id}: {res:?}"));
            res
        })
    }
}
```

Step 5: Pass our layer to the worker:

```rust
let worker = WorkerBuilder::new("testing")
    .layer(ChannelResultLayer(tx))
    .with_storage(storage.clone())
    .build_fn(send_id);
```

Step 6: Pass the receiver to the subscription:

```rust
#[Subscription]
impl SomeSubscription {
    async fn iterate(&self, ctx: &Context<'_>) -> impl Stream<Item = String> {
        // Sketch only: `ctx.data` hands back a shared reference and `Receiver`
        // is not `Clone`, so in practice the receiver must be shared (e.g.
        // behind `Arc<Mutex<_>>`, as discussed below) or each subscription
        // needs its own channel.
        let rx = ctx.data::<Receiver<String>>();
        rx
    }
}
```

This should demonstrate what you need to do. Let me know if it helps.
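To tie these steps together, a hypothetical wiring in `main` could look like this. `Query`, `Mutation`, and `SomeSubscription` are placeholder schema types; `Schema::build` and `.data` are standard async-graphql calls:

```rust
// Step 2: the channel shared between the worker layer and the subscription.
let (tx, rx) = channel::<String>(1024);

// Step 3: hand the receiver to async-graphql.
let schema = Schema::build(Query, Mutation, SomeSubscription)
    .data(rx)
    .finish();

// Step 5: hand the sender to the worker via the layer.
let worker = WorkerBuilder::new("testing")
    .layer(ChannelResultLayer(tx))
    .with_storage(storage.clone())
    .build_fn(send_id);
```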
-
I think the current approach could be modified:

1. Make the sender send a tuple, e.g. `Res(Id, String)` or something similar.
2. GraphQL subscriptions allow you to specify an input (see https://graphql.org/blog/subscriptions-in-graphql-and-relay/), so you could let the user pass the id they want to subscribe to.
3. Leverage stream combinators to `filter`/`filter_map` the stream so each subscription only receives its own data (see https://docs.rs/futures/latest/futures/stream/trait.StreamExt.html#method.filter and the sketch after this list).

Let me know if that works.
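For point 3, a sketch under the assumption that the workers now send `(id, result)` tuples; `results_for` is a hypothetical helper, not an apalis or async-graphql API:

```rust
use futures::{Stream, StreamExt};

// Hand each subscriber only the results for the id their client asked for.
fn results_for(
    results: impl Stream<Item = (i32, String)>,
    wanted: i32,
) -> impl Stream<Item = String> {
    results.filter_map(move |(id, res)| async move { (id == wanted).then_some(res) })
}
```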
-
You might need to use `Arc<Mutex<T>>`.
Apalis has `Notify` btw, which implements `Stream`.
-
You may need to deref the Mutex guard.
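For instance, a minimal sketch assuming the receiver is shared as `Arc<tokio::sync::Mutex<Receiver<String>>>` (the `shared_stream` helper is hypothetical):

```rust
use std::sync::Arc;

use futures::channel::mpsc::Receiver;
use futures::{Stream, StreamExt};
use tokio::sync::Mutex;

fn shared_stream(rx: Arc<Mutex<Receiver<String>>>) -> impl Stream<Item = String> {
    futures::stream::unfold(rx, |rx| async move {
        let item = {
            let mut guard = rx.lock().await;
            // Deref the guard to reach the inner `Receiver`, which is a Stream.
            (&mut *guard).next().await
        };
        item.map(|item| (item, rx))
    })
}
```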
-
Please check out https://github.com/geofmureithi/apalis/blob/master/packages/apalis-core/src/notify.rs. It has the implementation you are looking for.
Regarding your approach, it's possible, but you may need to create a type for each subscription.
If the only remaining issue is the stream part, I suggest you look at the code I have shared with you. Otherwise, give me until Tuesday as I am on vacation.
-
You might want to consider `dashmap` or another concurrent map.
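For example, a sketch with `dashmap` (`SENDERS` and the helper names are illustrative): `DashMap` shards its locks internally, so lookups from many worker threads don't all contend on a single `Mutex`.

```rust
use dashmap::DashMap;
use futures::channel::mpsc::Sender;
use once_cell::sync::Lazy;

// Hypothetical global registry mapping job ids to result senders.
static SENDERS: Lazy<DashMap<i32, Sender<i32>>> = Lazy::new(DashMap::new);

fn register(id: i32, tx: Sender<i32>) {
    SENDERS.insert(id, tx);
}

fn sender_for(id: i32) -> Option<Sender<i32>> {
    // `get` returns a shard-local read guard; clone the sender out of it.
    SENDERS.get(&id).map(|entry| entry.value().clone())
}
```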
What I've finally done is create a global hashmap that links job ids to senders. When a client makes a request, the server:

1. Creates an MPSC channel `(tx, rx)`
2. Creates a job
3. Adds `tx` to the global hashmap under the job id
4. Adds the job to storage

The worker then:

1. Receives the job
2. Gets the `tx` sender from the global hashmap and clones it
3. Works on the job and sends the results with the cloned `tx`

It seems to work pretty well, though I've not tested it at scale; I wonder whether the hashmap mutex will become a bottleneck when multiple threads try to lock it at the same time. A sketch of this design follows below.
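A sketch of that design, reusing `SomeJob` and `RedisStorage` from the original example (`SENDERS` and `enqueue` are illustrative names, and `once_cell` is assumed for the global):

```rust
use std::collections::HashMap;
use std::sync::Mutex;

use futures::channel::mpsc::Sender;
use once_cell::sync::Lazy;

// Global registry linking job ids to result senders.
static SENDERS: Lazy<Mutex<HashMap<i32, Sender<i32>>>> =
    Lazy::new(|| Mutex::new(HashMap::new()));

// Server side: register the sender, then push the job.
async fn enqueue(mut storage: RedisStorage<SomeJob>, id: i32, tx: Sender<i32>) {
    SENDERS.lock().unwrap().insert(id, tx); // guard dropped at end of statement
    storage.push(SomeJob { id }).await.unwrap();
}

// Worker side: look up and clone the sender, do the work, send the result.
async fn send_id(job: SomeJob) {
    let sender = SENDERS.lock().unwrap().get(&job.id).cloned();
    if let Some(mut tx) = sender {
        // ... actual work here ...
        let _ = tx.try_send(job.id);
    }
}
```

Since the lock is only held for a single insert or lookup and never across an `.await`, contention should stay brief; swapping in `dashmap`, as suggested above, removes even that single point of contention.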