How to use Schedule with Fastapi #374
-
I'm new to this architecture and read multiple examples but I still have some questions. Basically you need to run 3 things right?
import taskiq_fastapi
from taskiq import TaskiqScheduler, ZeroMQBroker
from taskiq.schedule_sources import LabelScheduleSource
broker = ZeroMQBroker()
scheduler = TaskiqScheduler(
broker=broker,
sources=[LabelScheduleSource(broker)],
)
taskiq_fastapi.init(broker, "main:app")
@broker.task(schedule=[{"cron": "*/1 * * * *"}])
async def add_one() -> None:
print("Hello")
@asynccontextmanager
async def lifespan(app: FastAPI):
if not broker.is_worker_process:
print("Starting worker")
await broker.startup()
yield
if not broker.is_worker_process:
print("Shutdown worker")
await broker.shutdown()
app = FastAPI(lifespan=lifespan)
uvicorn main:app --port 8080 What does this do? From my understanding, this starts FastAPI with the broker. But does it start the scheduler?
[2024-11-06 12:52:29,946][INFO ][run:run_scheduler:206] Starting scheduler.
Traceback (most recent call last):
...
zmq.error.ZMQError: Address already in use (addr='tcp://0.0.0.0:5555')
$ taskiq worker schedule.taskiq_scheduler:broker This returns a small part of my fastapi [2024-11-06 12:46:24,926][taskiq.worker][INFO ][MainProcess] Starting 2 worker processes.
[2024-11-06 12:46:24,929][taskiq.process-manager][INFO ][MainProcess] Started process worker-0 with pid 36479
[2024-11-06 12:46:24,930][taskiq.process-manager][INFO ][MainProcess] Started process worker-1 with pid 36481
INFO: [06-11-2024 12:46:25] main.py <module>(81): Not using GZIP
INFO: [06-11-2024 12:46:25] main.py <module>(81): Not using GZIP
INFO: [06-11-2024 12:46:25] receiver.py listen(330): Listening started.
INFO: [06-11-2024 12:46:25] receiver.py listen(330): Listening started. |
Beta Was this translation helpful? Give feedback.
Replies: 1 comment
-
The problem core is in the way how ZeroMQ works. Please switch to another broker for correct behavior. The problem is that ZeroMQ publisher needs to bind a port so subscribers can connect to this address. Since scheduler is sending tasks, it needs to bind a socket for publishing, but your fastapi application has already bound this port for publishing because of the broker that runs inside the app. Also, yes, scheduler needs to run in order to schedule tasks. So my suggestion would be to try another broker like redis, nats or rabbitmq. |
Beta Was this translation helpful? Give feedback.
The problem core is in the way how ZeroMQ works. Please switch to another broker for correct behavior.
The problem is that ZeroMQ publisher needs to bind a port so subscribers can connect to this address. Since scheduler is sending tasks, it needs to bind a socket for publishing, but your fastapi application has already bound this port for publishing because of the broker that runs inside the app.
Also, yes, scheduler needs to run in order to schedule tasks. So my suggestion would be to try another broker like redis, nats or rabbitmq.