Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scheduler can send task twice #296

Open
sector119 opened this issue Feb 22, 2024 · 5 comments
Open

Scheduler can send task twice #296

sector119 opened this issue Feb 22, 2024 · 5 comments

Comments

@sector119
Copy link

Hello

In the run_scheduler_loop function, consider how next_minute is calculated: if datetime.now() is 17:22:59.55555, we replace the seconds and microseconds with 0 and add timedelta(minutes=1), which gives next_minute = 17:23:00.
Then, when we calculate the delay with delay = next_minute - datetime.now(), the current time can already be later than next_minute, or only slightly earlier, so we get a negative delay such as -0.000299 or a tiny positive one such as 0.000373.

Don't you think that we should check delay like:

delay = (next_minute - datetime.now()).total_seconds()

if int(delay) <= 0:
    delay = 60.0

await asyncio.sleep(delay)

Logs:

Next minute: 2024-02-22 14:13:00
Now: 2024-02-22 14:13:00.000299
Delay: -1 day, 23:59:59.999701
Delay total seconds: -0.000299
[2024-02-22 14:13:00,000][INFO ][run:delayed_send:130] Sending task billing:sync-payments.

Next minute: 2024-02-22 14:14:00
Now: 2024-02-22 14:13:00.001998
Delay: 0:00:59.998002
Delay total seconds: 59.998002
[2024-02-22 14:13:00,002][INFO ][run:delayed_send:130] Sending task billing:sync-payments.

@s3rius
Copy link
Member

s3rius commented Apr 19, 2024

Actually you're right, but I think it can be resolved by calculating next_minute right after sending all tasks.

diff --git a/taskiq/cli/scheduler/run.py b/taskiq/cli/scheduler/run.py
index 6a17a11..73f06f8 100644
--- a/taskiq/cli/scheduler/run.py
+++ b/taskiq/cli/scheduler/run.py
@@ -144,9 +144,6 @@ async def run_scheduler_loop(scheduler: TaskiqScheduler) -> None:
     running_schedules = set()
     while True:
         # We use this method to correctly sleep for one minute.
-        next_minute = datetime.now().replace(second=0, microsecond=0) + timedelta(
-            minutes=1,
-        )
         scheduled_tasks = await get_all_schedules(scheduler)
         for source, task_list in scheduled_tasks.items():
             for task in task_list:
@@ -165,7 +162,9 @@ async def run_scheduler_loop(scheduler: TaskiqScheduler) -> None:
                     )
                     running_schedules.add(send_task)
                     send_task.add_done_callback(running_schedules.discard)
-
+        next_minute = datetime.now().replace(second=0, microsecond=0) + timedelta(
+            minutes=1,
+        )
         delay = next_minute - datetime.now()
         await asyncio.sleep(delay.total_seconds())
 

With this change, it would be impossible to end up with a negative number of seconds. What do you think?

@taras0024
Copy link

Hello
I have the same problem: a scheduled task is sent twice. Is there any solution to this problem?

@s3rius
Copy link
Member

s3rius commented Sep 6, 2024

What version of taskiq are you on? And can you please give some more information about what is happening?

@taras0024
Copy link

taskiq==0.11.7

tasks file (main.py)

import datetime
import os

from taskiq import Context, TaskiqDepends
from taskiq.serializers import JSONSerializer
from taskiq_redis import ListQueueBroker, RedisAsyncResultBackend

# Broker on Redis db 0 with a result backend on Redis db 1; results are
# serialized as JSON and ``result_ex_time`` is ten minutes expressed in seconds.
broker = ListQueueBroker("redis://localhost:6379/0").with_result_backend(
    RedisAsyncResultBackend(
        "redis://localhost:6379/1",
        result_ex_time=int(datetime.timedelta(minutes=10).total_seconds()),
        serializer=JSONSerializer(),
    ),
)


@broker.task(schedule=[{"cron": "*/1 * * * *"}])
async def foo(context: Context = TaskiqDepends()) -> None:
    """Print the current process ID, the task ID and the current local time.

    The task is registered with the cron expression ``*/1 * * * *`` (every
    minute), so each printed block corresponds to one execution; two blocks
    within the same minute indicate a duplicate send.

    :param context: task context injected through ``TaskiqDepends``; only
        ``context.message.task_id`` is read.
    """
    print(f"\n--- {os.getpid()} ---")
    print("Task ID:", context.message.task_id)
    print(datetime.datetime.now())
    # Plain literal: nothing is interpolated here, so no f-prefix is needed.
    print("---------------------\n")

scheduler file (schedule.py)

from taskiq.cli.scheduler.args import SchedulerArgs
from taskiq.cli.scheduler.run import get_all_schedules
from taskiq.schedule_sources import LabelScheduleSource
from taskiq import TaskiqScheduler

from task.main import broker
from task.lib import run_scheduler

# Scheduler bound to the shared broker, with one LabelScheduleSource as its only source.
scheduler = TaskiqScheduler(broker=broker, sources=[LabelScheduleSource(broker)])


async def main() -> None:
    """Print every schedule known to the scheduler, then run the scheduler."""
    known_schedules = await get_all_schedules(scheduler)
    print(known_schedules)

    scheduler_args = SchedulerArgs(
        scheduler=scheduler,
        modules=[],
        skip_first_run=True,
    )
    await run_scheduler(scheduler_args)


if __name__ == "__main__":
    # Imported here because it is only needed when the file is run as a script.
    import asyncio

    # Drive main() to completion on a new event loop.
    asyncio.run(main())

taskiq runner fork file with added prints (lib.py)

import asyncio
import sys
from datetime import datetime, timedelta
from logging import basicConfig, getLevelName, getLogger
from typing import Dict, List, Optional

import pytz
from pycron import is_now

from taskiq.abc.schedule_source import ScheduleSource
from taskiq.cli.scheduler.args import SchedulerArgs
from taskiq.cli.scheduler.run import get_all_schedules, get_task_delay, delayed_send
from taskiq.cli.utils import import_object, import_tasks
from taskiq.scheduler.scheduled_task import ScheduledTask
from taskiq.scheduler.scheduler import TaskiqScheduler

# Module-level logger, named after this module.
logger = getLogger(__name__)


async def run_scheduler_loop(scheduler: TaskiqScheduler) -> None:
    """Dispatch due schedules once per wall-clock minute, forever.

    On every pass all schedules are fetched with ``get_all_schedules``. Each
    task for which ``get_task_delay`` returns a value is handed to
    ``delayed_send`` in a background asyncio task. The loop then waits until
    the start of the next minute before the next pass.

    After sleeping, the wall clock is checked again: if the sleep returned
    marginally before the minute boundary, the loop keeps waiting, so a pass
    never starts in a minute that was already handled (which would send the
    same tasks twice).

    :param scheduler: scheduler whose schedules are polled and sent.
    """
    # Inside a coroutine the running loop is the one to schedule tasks on.
    loop = asyncio.get_running_loop()
    # Strong references to in-flight send tasks; each task removes itself
    # from the set when it finishes.
    running_schedules = set()
    # A premature wake-up is a matter of milliseconds. A larger gap means the
    # local clock was moved (e.g. a DST change), and must not be slept through.
    max_early_wakeup = timedelta(seconds=1)
    while True:
        scheduled_tasks = await get_all_schedules(scheduler)
        for source, task_list in scheduled_tasks.items():
            for task in task_list:
                try:
                    task_delay = get_task_delay(task)
                except ValueError:
                    logger.warning(
                        "Cannot parse cron: %s for task: %s, schedule_id: %s",
                        task.cron,
                        task.task_name,
                        task.schedule_id,
                    )
                    continue
                if task_delay is not None:
                    send_task = loop.create_task(
                        delayed_send(scheduler, source, task, task_delay),
                    )
                    running_schedules.add(send_task)
                    send_task.add_done_callback(running_schedules.discard)

        # The target minute is computed only after everything has been sent,
        # so a slow pass cannot produce a target that is already in the past.
        print("\n=== Running schedules ===")
        dt1 = datetime.now()
        print(f"{dt1=}")
        next_minute = dt1.replace(second=0, microsecond=0) + timedelta(
            minutes=1,
        )
        print(f"{next_minute=}")
        delay = next_minute - (dt2 := datetime.now())
        print(f"{dt2=}")
        print(f"{delay=}")
        print("================\n")
        await asyncio.sleep(delay.total_seconds())

        # asyncio timers are driven by the loop's monotonic clock and may fire
        # a hair before the wall clock reaches ``next_minute``. Starting the
        # next pass now would evaluate the schedules against the minute that
        # was just handled, so wait out the remainder first.
        while (
            timedelta(0) < (early := next_minute - datetime.now()) < max_early_wakeup
        ):
            await asyncio.sleep(early.total_seconds())


async def run_scheduler(args: SchedulerArgs) -> None:
    """Configure logging, start the scheduler and run its loop until cancelled.

    ``args.scheduler`` may be a ``TaskiqScheduler`` instance or a string that
    is resolved with ``import_object``. If the resolved object is not a
    ``TaskiqScheduler``, an error is logged and the process exits with
    status 1.

    When ``args.skip_first_run`` is set, the function waits for the start of
    the next wall-clock minute before entering ``run_scheduler_loop``.

    :param args: scheduler arguments (logging options, scheduler reference,
        task discovery options and ``skip_first_run``).
    """
    if args.configure_logging:
        basicConfig(
            level=getLevelName(args.log_level),
            format=(
                "[%(asctime)s][%(levelname)-7s]"
                "[%(module)s:%(funcName)s:%(lineno)d]"
                " %(message)s"
            ),
        )
    getLogger("taskiq").setLevel(level=getLevelName(args.log_level))
    if isinstance(args.scheduler, str):
        scheduler = import_object(args.scheduler)
    else:
        scheduler = args.scheduler
    if not isinstance(scheduler, TaskiqScheduler):
        logger.error(
            "Imported scheduler is not a subclass of TaskiqScheduler.",
        )
        sys.exit(1)
    scheduler.broker.is_scheduler_process = True
    import_tasks(args.modules, args.tasks_pattern, args.fs_discover)
    for source in scheduler.sources:
        await source.startup()

    logger.info("Starting scheduler.")
    await scheduler.startup()
    logger.info("Startup completed.")
    if args.skip_first_run:
        # One clock reading for both values, so the target minute and the
        # delay can never be derived from different sides of a minute boundary.
        now = datetime.now()
        next_minute = now.replace(second=0, microsecond=0) + timedelta(
            minutes=1,
        )
        delay = next_minute - now
        delay_secs = int(delay.total_seconds())
        logger.info("Skipping first run. Waiting %s seconds.", delay_secs)
        await asyncio.sleep(delay.total_seconds())
        # The sleep may return marginally before the wall clock reaches the
        # boundary; entering the loop then would still run in the minute that
        # was supposed to be skipped. Only sub-second gaps are waited out, so
        # a local clock change (e.g. DST) cannot stall the start-up.
        while (
            timedelta(0)
            < (early := next_minute - datetime.now())
            < timedelta(seconds=1)
        ):
            await asyncio.sleep(early.total_seconds())
        logger.info("First run skipped. The scheduler is now running.")
    try:
        await run_scheduler_loop(scheduler)
    except asyncio.CancelledError:
        # Cancellation is handled as the shutdown signal: the scheduler and
        # every source are shut down before returning.
        logger.warning("Shutting down scheduler.")
        await scheduler.shutdown()
        for source in scheduler.sources:
            await source.shutdown()
        logger.info("Scheduler shut down. Good bye!")

Results
image

@Atanusaha143
Copy link

@s3rius Can you please look into this discussion?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants