Ser um microframework (inspirado no flask) para facilitar a escrita de workers de RabbitMQ.
from asyncworker import App
app = App(host="127.0.0.1", user="guest", password="guest", prefetch_count=256)
@app.route(["asgard/counts", "asgard/counts/errors"], vhost="fluentd")
async def drain_handler(message):
logger.info(message)
app.run()
Nesse exemplo, o handler drain_handler()
recebe mensagens de ambas as filas: asgard/counts
e asgard/counts/errors
.
Se o handler lançar alguma exception, a mensagem é automaticamente devolvida para a fila (reject com requeue=True); Se o handler rodar sem erros, a mensagem é automaticamente confirmada (ack).
Na versão 0.2.0
criamos a possibilidade de receber mensagens em lote. E a partir dessa versão
a assinatura do handler mudo para:
from asyncworker.rabbitmq.message import Message
async def handler(messages: List[Message]):
pass
As instâncias do objeto asyncworker.rabbitmq.RabbitMQMessage
já vêm por padrão configurado para receber ack()
depois queo handler retornar (sem exception), mas o handler pode mudar isso
chamando o método message.reject()
para cada mensagem que precisar ser devolvida para a fila.
O conteúdo da mensagem original está agora no atributo message.body
. Então um handler antigo que era assim:
from asyncworker import App
app = App(host="127.0.0.1", user="guest", password="guest", prefetch_count=256)
@app.route(["asgard/counts", "asgard/counts/errors"], vhost="fluentd")
async def drain_handler(message):
logger.info(message)
app.run()
passa a ser assim:
from asyncworker import App
app = App(host="127.0.0.1", user="guest", password="guest", prefetch_count=256)
@app.route(["asgard/counts", "asgard/counts/errors"], vhost="fluentd")
async def drain_handler(messages):
for m in messages:
logger.info(message.body)
app.run()