Skip to content

Microframework para escrever consumer assíncrono para RabbitMQ

Notifications You must be signed in to change notification settings

marianaruddy/async-worker

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

49 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Build Status codecov

Async Worker

Propósito

Ser um microframework (inspirado no flask) para facilitar a escrita de workers de RabbitMQ.

Exemplo

from asyncworker import App

app = App(host="127.0.0.1", user="guest", password="guest", prefetch_count=256)

@app.route(["asgard/counts", "asgard/counts/errors"], vhost="fluentd")
async def drain_handler(message):
    logger.info(message)


app.run()

Nesse exemplo, o handler drain_handler() recebe mensagens de ambas as filas: asgard/counts e asgard/counts/errors.

Se o handler lançar alguma exception, a mensagem é automaticamente devolvida para a fila (reject com requeue=True); Se o handler rodar sem erros, a mensagem é automaticamente confirmada (ack).

Atualizando o async-worker no seu projeto

0.1.0 para 0.2.0

Na versão 0.2.0 criamos a possibilidade de receber mensagens em lote. E a partir dessa versão a assinatura do handler mudo para:

from asyncworker.rabbitmq.message import Message

async def handler(messages: List[Message]):
  pass

As instâncias do objeto asyncworker.rabbitmq.RabbitMQMessage já vêm por padrão configurado para receber ack() depois queo handler retornar (sem exception), mas o handler pode mudar isso chamando o método message.reject() para cada mensagem que precisar ser devolvida para a fila.

O conteúdo da mensagem original está agora no atributo message.body. Então um handler antigo que era assim:

from asyncworker import App

app = App(host="127.0.0.1", user="guest", password="guest", prefetch_count=256)

@app.route(["asgard/counts", "asgard/counts/errors"], vhost="fluentd")
async def drain_handler(message):
    logger.info(message)


app.run()

passa a ser assim:

from asyncworker import App

app = App(host="127.0.0.1", user="guest", password="guest", prefetch_count=256)

@app.route(["asgard/counts", "asgard/counts/errors"], vhost="fluentd")
async def drain_handler(messages):
    for m in messages:
      logger.info(message.body)


app.run()

About

Microframework para escrever consumer assíncrono para RabbitMQ

Resources

Stars

Watchers

Forks

Packages

No packages published

Languages