The features "signal" didn't work when combined with Axum #66

Open
linyihai opened this issue May 14, 2024 · 3 comments
Labels
bug Something isn't working

Comments

@linyihai

I use tokio-cron-scheduler to run scheduled jobs alongside an Axum server:

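use axum::Extension;
use tower::ServiceBuilder;

// `Context`, `metrics`, and `scheduler` come from the reporter's own project and are not shown here.
#[tokio::main]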
async fn main() {
    // Build our application with a single route.
    let ctx = Context::new();
    let app = axum::Router::new()
        .route("/", axum::routing::get(|| async move { "Hello, World!" }))
        .route("/metrics", axum::routing::get(metrics::test_metrics))
        .layer(ServiceBuilder::new().layer(Extension(ctx.test_metrics())));

    let listener = tokio::net::TcpListener::bind("0.0.0.0:3030").await.unwrap();
    scheduler::schedule(&ctx).await.unwrap();
    axum::serve(listener, app).await.unwrap();
}

With this setup, I can't stop the running process with Ctrl+C.

@linyihai
Author

However, this example code can be stopped with Ctrl+C:

use std::time::Duration;
use tokio_cron_scheduler::{Job, JobScheduler, JobSchedulerError};

#[tokio::main]
async fn main() -> Result<(), JobSchedulerError> {
    let mut sched = JobScheduler::new().await?;

    // Add basic cron job
    sched.add(
        Job::new("1/10 * * * * *", |_uuid, _l| {
            println!("I run every 10 seconds");
        })?
    ).await?;

    // Add async job
    sched.add(
        Job::new_async("1/7 * * * * *", |uuid, mut l| {
            Box::pin(async move {
                println!("I run async every 7 seconds");

                // Query the next execution time for this job
                let next_tick = l.next_tick_for_job(uuid).await;
                match next_tick {
                    Ok(Some(ts)) => println!("Next time for 7s job is {:?}", ts),
                    _ => println!("Could not get next tick for 7s job"),
                }
            })
        })?
    ).await?;

    // Add one-shot job with given duration
    sched.add(
        Job::new_one_shot(Duration::from_secs(18), |_uuid, _l| {
            println!("I only run once");
        })?
    ).await?;

    // Create repeated job with given duration, make it mutable to edit it afterwards
    let mut jj = Job::new_repeated(Duration::from_secs(8), |_uuid, _l| {
        println!("I run repeatedly every 8 seconds");
    })?;

    // Add actions to be executed when the jobs starts/stop etc.
    jj.on_start_notification_add(&sched, Box::new(|job_id, notification_id, type_of_notification| {
        Box::pin(async move {
            println!("Job {:?} was started, notification {:?} ran ({:?})", job_id, notification_id, type_of_notification);
        })
    })).await?;

    jj.on_stop_notification_add(&sched, Box::new(|job_id, notification_id, type_of_notification| {
        Box::pin(async move {
            println!("Job {:?} was completed, notification {:?} ran ({:?})", job_id, notification_id, type_of_notification);
        })
    })).await?;

    jj.on_removed_notification_add(&sched, Box::new(|job_id, notification_id, type_of_notification| {
        Box::pin(async move {
            println!("Job {:?} was removed, notification {:?} ran ({:?})", job_id, notification_id, type_of_notification);
        })
    })).await?;
    sched.add(jj).await?;

    // Feature 'signal' must be enabled
    sched.shutdown_on_ctrl_c();

    // Add code to be run during/after shutdown
    sched.set_shutdown_handler(Box::new(|| {
        Box::pin(async move {
            println!("Shut down done");
        })
    }));

    // Start the scheduler
    sched.start().await?;

    // Wait while the jobs run
    tokio::time::sleep(Duration::from_secs(100)).await;

    Ok(())
}

@mvniekerk
Owner

Hi @linyihai
I'm wondering, seeing that Axum uses Tokio, if they're also using the ctrl+c adapter we're using? In that Tokio must handle both?
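
One way to picture the question above is a single Ctrl+C event being delivered to several independent listeners. The sketch below is a standard-library model only, not Tokio's or tokio-cron-scheduler's implementation; `SignalHub`, `subscribe`, and `notify` are hypothetical names introduced for illustration.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical stand-in for a process-wide signal source.
struct SignalHub {
    listeners: Vec<Sender<()>>,
}

impl SignalHub {
    fn new() -> Self {
        SignalHub { listeners: Vec::new() }
    }

    /// Registers a new listener and returns its receiving end.
    fn subscribe(&mut self) -> Receiver<()> {
        let (tx, rx) = channel();
        self.listeners.push(tx);
        rx
    }

    /// Delivers one event to every listener; returns how many accepted it.
    fn notify(&self) -> usize {
        self.listeners
            .iter()
            .filter(|tx| tx.send(()).is_ok())
            .count()
    }
}

fn main() {
    let mut hub = SignalHub::new();
    // Two independent consumers of the same event, e.g. a scheduler and a server.
    let scheduler_rx = hub.subscribe();
    let server_rx = hub.subscribe();

    let delivered = hub.notify();
    println!("delivered to {} listeners", delivered);
    println!("scheduler saw it: {}", scheduler_rx.try_recv().is_ok());
    println!("server saw it: {}", server_rx.try_recv().is_ok());
}
```

In this model every subscriber gets its own copy of the event, so one listener consuming it does not hide it from the other.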

@mvniekerk added the bug label Aug 9, 2024
@linyihai
Author

I'm wondering, seeing that Axum uses Tokio, if they're also using the ctrl+c adapter we're using? In that Tokio must handle both?

In my earlier code, I didn't register a Ctrl+C handler for the Axum server instance, so the process could not be terminated with Ctrl+C.

The smallest reproducible example is as follows:

use std::time::Duration;
use tokio::signal;
use tokio_cron_scheduler::{Job, JobScheduler, JobSchedulerError};

#[tokio::main]
async fn main() {
    // Example of using tokio-cron-scheduler
    schedule().await.unwrap();
    // Axum example, only reply ping with pong.
    let app = axum::Router::new().route("/ping", axum::routing::get(|| async move { "pong" }));
    let listener = tokio::net::TcpListener::bind("0.0.0.0:3030").await.unwrap();
    axum::serve(listener, app)
        // Commenting out this line prevents the process from being terminated by CTRL+C
        .with_graceful_shutdown(shutdown_signal())
        .await
        .unwrap();
}

pub async fn schedule() -> Result<(), JobSchedulerError> {
    let mut sched = JobScheduler::new().await?;

    // Add basic cron job
    sched
        .add(Job::new("1/10 * * * * *", |_uuid, _l| {
            println!("I run every 10 seconds");
        })?)
        .await?;

    // Add async job
    sched
        .add(Job::new_async("1/7 * * * * *", |uuid, mut l| {
            Box::pin(async move {
                println!("I run async every 7 seconds");

                // Query the next execution time for this job
                let next_tick = l.next_tick_for_job(uuid).await;
                match next_tick {
                    Ok(Some(ts)) => println!("Next time for 7s job is {:?}", ts),
                    _ => println!("Could not get next tick for 7s job"),
                }
            })
        })?)
        .await?;

    // Add one-shot job with given duration
    sched
        .add(Job::new_one_shot(Duration::from_secs(18), |_uuid, _l| {
            println!("I only run once");
        })?)
        .await?;

    // Create repeated job with given duration, make it mutable to edit it afterwards
    let mut jj = Job::new_repeated(Duration::from_secs(8), |_uuid, _l| {
        println!("I run repeatedly every 8 seconds");
    })?;

    // Add actions to be executed when the jobs starts/stop etc.
    jj.on_start_notification_add(
        &sched,
        Box::new(|job_id, notification_id, type_of_notification| {
            Box::pin(async move {
                println!(
                    "Job {:?} was started, notification {:?} ran ({:?})",
                    job_id, notification_id, type_of_notification
                );
            })
        }),
    )
    .await?;

    jj.on_stop_notification_add(
        &sched,
        Box::new(|job_id, notification_id, type_of_notification| {
            Box::pin(async move {
                println!(
                    "Job {:?} was completed, notification {:?} ran ({:?})",
                    job_id, notification_id, type_of_notification
                );
            })
        }),
    )
    .await?;

    jj.on_removed_notification_add(
        &sched,
        Box::new(|job_id, notification_id, type_of_notification| {
            Box::pin(async move {
                println!(
                    "Job {:?} was removed, notification {:?} ran ({:?})",
                    job_id, notification_id, type_of_notification
                );
            })
        }),
    )
    .await?;
    sched.add(jj).await?;

    // Feature 'signal' must be enabled
    sched.shutdown_on_ctrl_c();

    // Add code to be run during/after shutdown
    sched.set_shutdown_handler(Box::new(|| {
        Box::pin(async move {
            println!("Shut down done");
        })
    }));

    // Start the scheduler
    sched.start().await?;

    Ok(())
}
async fn shutdown_signal() {
    let ctrl_c = async {
        signal::ctrl_c()
            .await
            .expect("failed to install Ctrl+C handler");
    };

    #[cfg(unix)]
    let terminate = async {
        signal::unix::signal(signal::unix::SignalKind::terminate())
            .expect("failed to install signal handler")
            .recv()
            .await;
    };

    #[cfg(not(unix))]
    let terminate = std::future::pending::<()>();

    tokio::select! {
        _ = ctrl_c => { println!("shutdown_signal");},
        _ = terminate => {println!("terminating");},
    }
}

If this line is kept (not commented out), the problem is solved:

.with_graceful_shutdown(shutdown_signal())

It seems that if sched.shutdown_on_ctrl_c() is used, the Axum server must also handle Ctrl+C itself.
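
One possible reading of this behaviour, offered as an assumption rather than a confirmed diagnosis: Tokio's documentation notes that once a Ctrl+C listener is registered, the default terminate-on-Ctrl+C behaviour no longer applies for the rest of the process. Any component that does not watch the signal itself therefore keeps running. The sketch below models that with the standard library only; `run_component` and `Exit` are hypothetical names, and the shared flag merely stands in for the signal.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// How a component's loop ended.
#[derive(Debug, PartialEq)]
enum Exit {
    /// The component saw the shutdown flag and stopped early.
    OnSignal,
    /// The component never saw a shutdown request and used its full budget.
    RanToCompletion,
}

/// Runs at most `budget` iterations. When `shutdown` is `Some`, the flag is
/// checked on every iteration; when it is `None`, the loop cannot be interrupted.
fn run_component(shutdown: Option<Arc<AtomicBool>>, budget: u32) -> Exit {
    for _ in 0..budget {
        if let Some(flag) = &shutdown {
            if flag.load(Ordering::SeqCst) {
                return Exit::OnSignal;
            }
        }
        // One unit of work would go here.
    }
    Exit::RanToCompletion
}

fn main() {
    // Stands in for "Ctrl+C was pressed and a handler recorded it".
    let ctrl_c = Arc::new(AtomicBool::new(true));

    // Scheduler: watches the flag (comparable to calling `shutdown_on_ctrl_c`).
    let scheduler = run_component(Some(Arc::clone(&ctrl_c)), 1_000);
    // Server without a shutdown hook (comparable to omitting `with_graceful_shutdown`).
    let server_without_hook = run_component(None, 1_000);
    // Server with a shutdown hook (comparable to adding `with_graceful_shutdown`).
    let server_with_hook = run_component(Some(Arc::clone(&ctrl_c)), 1_000);

    println!("scheduler: {:?}", scheduler); // prints "scheduler: OnSignal"
    println!("server without hook: {:?}", server_without_hook); // prints "server without hook: RanToCompletion"
    println!("server with hook: {:?}", server_with_hook); // prints "server with hook: OnSignal"
}
```

The model matches the observation in this thread: the scheduler stops on the signal, while the server only stops once it is given its own way to observe it.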
