Skip to content

Latest commit

 

History

History
1130 lines (846 loc) · 33.5 KB

File metadata and controls

1130 lines (846 loc) · 33.5 KB

Caso de uso: publicador/consumidor

A lo largo de este capítulo exploraremos las capacidades de asyncio utilizando como base el excelente tutorial de Lynn Root, asyncio: We Did It Wrong.

Los escenarios donde múltiples tareas se comunican a través de una cola son muchos. El tutorial de Lynn Root nos insta a emular Chaos Monkey, una herramienta para mandar señales de terminación a máquinas virtuales y así obligar a los ingenieros a crear sistemas más tolerantes a fallos.

Nuestro objetivo es atender a los numerosos mensajes de terminación emulando la finalización de las máquinas virtuales. Los mensajes los produciremos nosotros y los consumiremos nosotros, sin que productores y consumidores se comuniquen directamente sino a través de una cola.

Configuración inicial

Crea un proyecto nuevo y crea un fichero mayhem.py con el siguiente contenido:

import asyncio
import logging
import random
import string
from dataclasses import dataclass, field


logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s,%(msecs)d %(levelname)s: %(message)s",
    datefmt="%H:%M:%S",
)


@dataclass
class PubSubMessage:
    instance_name: str
    message_id: int = field(repr=False)
    hostname: str = field(repr=False, init=False)
    restarted: bool = field(repr=False, default=False)

    def __post_init__(self):
        self.hostname = f'{self.instance_name}.example.net'


async def restart_host(msg):
    await asyncio.sleep(random.random())
    msg.restarted = True
    logging.info(f'Restarted {msg.hostname}')


async def publish(queue, n):
    choices = string.ascii_lowercase + string.digits
    for x in range(1, n + 1):
        host_id = ''.join(random.choices(choices, k=4))
        instance_name = f'cattle-{host_id}'
        msg = PubSubMessage(message_id=x, instance_name=instance_name)
        # publish an item
        await queue.put(msg)
        logging.info(f'Published {x} of {n} messages')

    # indicate the publisher is done
    await queue.put(None)


async def consume(queue):
    while True:
        # wait for an item from the publisher
        msg = await queue.get()

        # the publisher emits None to indicate that it is done
        if msg is None:
            break

        # process the msg
        logging.info(f'Consumed {msg}')
        await restart_host(msg)


def main():
    queue = asyncio.Queue()
    asyncio.run(publish(queue, 5))
    asyncio.run(consume(queue))


if __name__ == '__main__':
    main()

Como ves, PubSubMessage es una clase orientada a datos que, sencillamente, simula un mensaje con un identificador dirigido a una determinada instancia virtual expuesta a través de un nombre de dominio.

La corrutina publish() toma una cola de tipo asyncio.Queue donde publicar los mensajes y un número de mensajes, y, por cada mensaje, se inventa el nombre de una instancia y deja un mensaje en la cola dirigido a ella. Cuando ha terminado, deja un mensaje None.

La corrutine consume() toma una cola de la que recoge los mensajes, en principio, indefinidamente, hasta que da con el mensaje especial None, y simula que los procesa mediante una llamada a asyncio.sleep().

La función main() ejecuta primero dos tareas en lote, primero la tarea productora, y luego la consumidora.

Lanza el programa y observa cómo se comporta. Asíncrono, sin duda, pero no muy efectivo.

De programa de una sola ejecución a servicio

El problema con nuestra función main() es que ejecuta una tarea, luego otra, y termina. Lo que queremos es que nuestro programa se comporte como un servicio y corra de manera indefinida.

Para ello cambia la función main() de la siguiente forma:

def main():
    queue = asyncio.Queue()
    loop = asyncio.get_event_loop()
    loop.create_task(publish(queue, 5))
    loop.create_task(consume(queue))
    loop.run_forever()
    loop.close()
    logging.info('Successfully shutdown the Mayhem service.')

Vuelve a lanzar el programa y observa como ahora no termina. Necesitas pulsar ctrl + C (o pulsar stop en PyCharm).

Cuando interrumpas el bucle observarás que jamás se imprime el mensaje de finalización del servicio. La excepción se produce en run_forever() y nunca se ejecutan las dos últimas instrucciones.

Una ligera modificación involucrando la sentencia try ... except ... soluciona este problema:

def main():
    queue = asyncio.Queue()
    loop = asyncio.get_event_loop()

    try:
        loop.create_task(publish(queue, 5))
        loop.create_task(consume(queue))
        loop.run_forever()
    except KeyboardInterrupt:
        logging.info('Process interrupted')
    finally:
        loop.close()
        logging.info('Successfully shutdown the Mayhem service.')

Concurrencia efectiva

Como hemos visto en la lección anterior, no basta con utilizar async y await para obtener una concurrencia efectiva. Si esperamos, por el publicador, a que termine y luego por el consumidor ¿qué diferencia hay con la versión síncrona?

Vamos a eliminar algunos bloqueos. Actualiza el código fuente de la siguiente forma:

import uuid

...

@dataclass
class PubSubMessage:
    instance_name: str
    message_id: str = field(repr=False)
    hostname: str = field(repr=False, init=False)
    restarted: bool = field(repr=False, default=False)

    def __post_init__(self):
        self.hostname = f'{self.instance_name}.example.net'

...

async def publish(queue):
    choices = string.ascii_lowercase + string.digits

    while True:
        msg_id = str(uuid.uuid4())
        host_id = ''.join(random.choices(choices, k=4))
        instance_name = f'cattle-{host_id}'
        msg = PubSubMessage(message_id=msg_id, instance_name=instance_name)
        # publish an item
        asyncio.create_task(queue.put(msg))
        logging.info(f'Published message {msg}')
        # simulate randomness of publishing messages
        await asyncio.sleep(random.random())

...

def main():
    queue = asyncio.Queue()
    loop = asyncio.get_event_loop()

    try:
        loop.create_task(publish(queue))
        loop.run_forever()
    except KeyboardInterrupt:
        logging.info('Process interrupted')
    finally:
        loop.close()
        logging.info('Successfully shutdown the Mayhem service.')

Para empezar, ahora publish() no deja de producir mensajes (también hemos adaptado main() para no pasar el número de mensajes que han de producirse). Pero el cambio más relevante es que ya no esperamos a que la publicación del mensaje termine.

En vez de utilizar await, que "aguanta" la corrutina hasta que se publique el mensaje, con asyncio.create_task estamos queriendo decir "que planifique la publicación y continúe".

Lo que sí que hacemos es esperar un pequeño tiempo al azar para simular la sobrecarga de publicar el mensaje. Para forzar que algunas publicaciones tomen más que otras.

¡Ahora somos asíncronos! ¿Se te ocurre cómo podríamos darnos cuenta? Planificando más tareas es la resupuesta pero... ¿podrías planificar más de un publicador?

Cuando te convenzas de que tenemos un publicador asíncrono, cambia el consumidor para que también sea asíncrono, no teniendo que esperar por el reinicio de la máquina.

Lanza tu código y verás cómo se intercalan los mensajes del publicador, el consumidor y el reiniciador, y no siempre en el mismo orden. La finalización fuera de orden es una característica típica de la asincronía.

Gestión avanzada de mensajes

El contenido de tu fichero mayhem.py debería ser algo así, en estos momentos:

import asyncio
import logging
import random
import string
import uuid
from dataclasses import dataclass, field

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s,%(msecs)d %(levelname)s: %(message)s",
    datefmt="%H:%M:%S",
)


@dataclass
class PubSubMessage:
    instance_name: str
    message_id: int = field(repr=False)
    hostname: str = field(repr=False, init=False)
    restarted: bool = field(repr=False, default=False)

    def __post_init__(self):
        self.hostname = f'{self.instance_name}.example.net'


async def restart_host(msg):
    await asyncio.sleep(random.random())
    msg.restarted = True
    logging.info(f'Restarted {msg.hostname}')


async def publish(queue):
    choices = string.ascii_lowercase + string.digits

    while True:
        msg_id = str(uuid.uuid4())
        host_id = ''.join(random.choices(choices, k=4))
        instance_name = f'cattle-{host_id}'
        msg = PubSubMessage(message_id=msg_id, instance_name=instance_name)
        # publish an item
        asyncio.create_task(queue.put(msg))
        logging.info(f'Published message {msg}')
        # simulate randomness of publishing messages
        await asyncio.sleep(random.random())


async def consume(queue):
    while True:
        # wait for an item from the publisher
        msg = await queue.get()

        # the publisher emits None to indicate that it is done
        if msg is None:
            break

        # process the msg
        logging.info(f'Consumed {msg}')
        asyncio.create_task(restart_host(msg))


def main():
    queue = asyncio.Queue()
    loop = asyncio.get_event_loop()

    try:
        loop.create_task(publish(queue))
        loop.create_task(consume(queue))
        loop.run_forever()
    except KeyboardInterrupt:
        logging.info('Process interrupted')
    finally:
        loop.close()
        logging.info('Successfully shutdown the Mayhem service.')


if __name__ == '__main__':
    main()

Pongamos que además de reiniciar la máquina, queremos guardar el mensaje y, finalmente, realizar algo de limpieza y marcarlo como procesado. Modifica la clase PubSubMessage y añade las siguientes corrutinas:

@dataclass
class PubSubMessage:
    instance_name: str
    message_id: int = field(repr=False)
    hostname: str = field(repr=False, init=False)
    restarted: bool = field(repr=False, default=False)
    saved: bool = field(repr=False, default=False)
    ack: bool = field(repr=False, default=False)

    def __post_init__(self):
        self.hostname = f'{self.instance_name}.example.net'


async def save(msg):
    await asyncio.sleep(random.random())
    msg.saved = True
    logging.info(f'Saved {msg}')


async def cleanup(msg):
    msg.ack = True
    logging.info(f'Done. Acked {msg}')


async def handle_message(msg):
    """Replace this with your handling code."""

Ahora rellena handle_message() para que salve, reinicie y, finalmente, limpie el mensaje. Limpiar es lo único que necesariamente tiene que ocurrir tras salvar y reiniciar, respecto a salvar y reiniciar, no importa el orden.

Acuérdate de reemplazar restart_host() por handle_message() en el consumidor.

Intenta hacerlo lo más asíncrono posible. Recuerda que tienes a tu disposición asyncio.create_task(), asyncio.gather() y await.

Gestión de señales y finalización del bucle

El código de tu fichero mayhem.py debería tener esta pinta:

import asyncio
import logging
import random
import string
import uuid
from dataclasses import dataclass, field

logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s,%(msecs)d %(levelname)s: %(message)s",
    datefmt="%H:%M:%S",
)


@dataclass
class PubSubMessage:
    instance_name: str
    message_id: int = field(repr=False)
    hostname: str = field(repr=False, init=False)
    restarted: bool = field(repr=False, default=False)
    saved: bool = field(repr=False, default=False)
    ack: bool = field(repr=False, default=False)

    def __post_init__(self):
        self.hostname = f'{self.instance_name}.example.net'


async def save(msg):
    await asyncio.sleep(random.random())
    msg.saved = True
    logging.info(f'Saved {msg}')


async def cleanup(msg):
    msg.ack = True
    logging.info(f'Done. Acked {msg}')


async def handle_message(msg):
    await asyncio.gather(restart_host(msg), save(msg))
    await cleanup(msg)


async def restart_host(msg):
    await asyncio.sleep(random.random())
    msg.restarted = True
    logging.info(f'Restarted {msg.hostname}')


async def publish(queue):
    choices = string.ascii_lowercase + string.digits

    while True:
        msg_id = str(uuid.uuid4())
        host_id = ''.join(random.choices(choices, k=4))
        instance_name = f'cattle-{host_id}'
        msg = PubSubMessage(message_id=msg_id, instance_name=instance_name)
        # publish an item
        asyncio.create_task(queue.put(msg))
        logging.info(f'Published message {msg}')
        # simulate randomness of publishing messages
        await asyncio.sleep(random.random())


async def consume(queue):
    while True:
        # wait for an item from the publisher
        msg = await queue.get()

        # process the msg
        logging.info(f'Consumed {msg}')
        asyncio.create_task(handle_message(msg))


def main():
    queue = asyncio.Queue()
    loop = asyncio.get_event_loop()

    try:
        loop.create_task(publish(queue))
        loop.create_task(consume(queue))
        loop.run_forever()
    except KeyboardInterrupt:
        logging.info('Process interrupted')
    finally:
        loop.close()
        logging.info('Successfully shutdown the Mayhem service.')


if __name__ == '__main__':
    main()

Si ejecutas el código, y lo interrumpes, verás varios mensajes de error acerca de tareas planificadas pero nunca ejecutadas. Si corres el programa desde una terminal y le mandas un mensaje SIGQUIT (ctrl + 4 o ctrl + \ si el layout del teclado es inglés), verás que la clausula finally ni se ejecuta.

Gestionar señales es una capacidad del bucle de eventos. Escribe primero el código de finalización:

async def shutdown(loop, signal):
    logging.info(f'Received exit signal {signal.name}...')
    logging.info('Closing database connections')

    logging.info('Nacking outstanding messages')
    tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
    logging.info(f'Cancelling {len(tasks)} outstanding tasks')
    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks)

    logging.info(f'Flushing metrics')
    loop.stop()

Los distintos logging.info() no hacen nada, están ahí para ilustrar las muchas operaciones que haríamos durante la finalización del servicio.

Ahora modifica el método main() para añadir este manejador a distintas señales:

import signal

...

def main():
    queue = asyncio.Queue()
    loop = asyncio.get_event_loop()
    signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT, signal.SIGQUIT)
    for s in signals:
        loop.add_signal_handler(
            s, lambda s=s: asyncio.create_task(shutdown(loop, s)))

    try:
        loop.create_task(publish(queue))
        loop.create_task(consume(queue))
        loop.run_forever()
    finally:
        loop.close()
        logging.info('Successfully shutdown the Mayhem service.')

Los manajeadores de señales se añaden con el método loop.add_signal_handler(). El propio manejador programa una nueva tarea al bucle de eventos, que se encargará de finalizarlo convenientemente. Fíjate también en la lambda, donde tenemos que dar el valor por defecto a s para evitar los efectos del enlazamiento tardío (late binding) de variables.

Ahora ejecuta el código y mira qué pasa al hacer ctrl + 4 o ctrl + c. ¿Notas algo extraño? Corrígelo añadiendo el parámetro return_exceptions=True a la función asyncio.gather() de la corrutina shutdown().

¿Qué ha pasado? Lo que ha pasado es que en shutdown() recogemos todas las tareas menos la actual (que es precisamente shutdown()) y las cancelamos. Esto deja las tareas en un estado de "rechazo por cancelación". La expresión await asyncio.gather() lanza una excepción al encontrar la primera tarea rechazada, lo que interrumpe la ejecución en este punto y por eso no vemos el mensaje referente a las métricas.

Sin embargo, el bucle de eventos está pensado para ser tolerante a las excepciones, así que "engulle" y "silencia" la excepción, permitiendo que continúe.

La segunda vez que envíamos una señal, el manejador programará otra tarea shutdown() que, cuando le toque (presumiblemente de manera inmediata, pues todo lo demás fué cancelado), recogerá las tareas pendientes que no sean la actual, es decir, ninguna y, por tanto, el bucle for no cancelará nada, la expresión await asyncio.gather() no esperará por nada, alcanzaremos la instrucción loop.stop()y, finalmente, llegaremos a la claúsula finally con la que finalizará la ejecución del programa satisfactoriamente.

Bastaría con que hubiéramos envuelto await asyncio.gather() en un bloque try ... except ... para silenciar la excepción y permitir a shutdown() progresar. Sin embargo, pasamos a asyncio.gather() el parámetro return_exceptions=True. Con esto conseguimos que la función no lance, sino que interprete las excepciones como posibles resultados. Veremos para qué sirve esto en un par de secciones.

Manejo de excepciones

Para este momento, tu fichero mayhem.py debería parecerse a:

import asyncio
import logging
import random
import string
import uuid
import signal
from dataclasses import dataclass, field

logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s,%(msecs)d %(levelname)s: %(message)s",
    datefmt="%H:%M:%S",
)


@dataclass
class PubSubMessage:
    instance_name: str
    message_id: int = field(repr=False)
    hostname: str = field(repr=False, init=False)
    restarted: bool = field(repr=False, default=False)
    saved: bool = field(repr=False, default=False)
    ack: bool = field(repr=False, default=False)

    def __post_init__(self):
        self.hostname = f'{self.instance_name}.example.net'


async def save(msg):
    await asyncio.sleep(random.random())
    msg.saved = True
    logging.info(f'Saved {msg}')


async def cleanup(msg):
    msg.ack = True
    logging.info(f'Done. Acked {msg}')


async def handle_message(msg):
    await asyncio.gather(restart_host(msg), save(msg))
    await cleanup(msg)


async def restart_host(msg):
    await asyncio.sleep(random.random())
    msg.restarted = True
    logging.info(f'Restarted {msg.hostname}')


async def publish(queue):
    choices = string.ascii_lowercase + string.digits

    while True:
        msg_id = str(uuid.uuid4())
        host_id = ''.join(random.choices(choices, k=4))
        instance_name = f'cattle-{host_id}'
        msg = PubSubMessage(message_id=msg_id, instance_name=instance_name)
        # publish an item
        asyncio.create_task(queue.put(msg))
        logging.info(f'Published message {msg}')
        # simulate randomness of publishing messages
        await asyncio.sleep(random.random())


async def consume(queue):
    while True:
        # wait for an item from the publisher
        msg = await queue.get()

        # process the msg
        logging.info(f'Consumed {msg}')
        asyncio.create_task(handle_message(msg))


async def shutdown(loop, signal):
    logging.info(f'Received exit signal {signal.name}...')
    logging.info('Closing database connections')

    logging.info('Nacking outstanding messages')
    tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
    logging.info(f'Cancelling {len(tasks)} outstanding tasks')
    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)

    logging.info(f'Flushing metrics')
    loop.stop()


def main():
    queue = asyncio.Queue()
    loop = asyncio.get_event_loop()
    signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT, signal.SIGQUIT)
    for s in signals:
        loop.add_signal_handler(
            s, lambda s=s: asyncio.create_task(shutdown(loop, s)))

    try:
        loop.create_task(publish(queue))
        loop.create_task(consume(queue))
        loop.run_forever()
    finally:
        loop.close()
        logging.info('Successfully shutdown the Mayhem service.')


if __name__ == '__main__':
    main()

Vamos a modificar el consumidor para lanzar una excepción de vez en cuando:

async def consume(queue):
    while True:
        # wait for an item from the publisher
        msg = await queue.get()

        # randomly fail consuming
        if random.randint(1, 10) == 5:
            raise Exception(f'Could not consume {msg.hostname}')

        # process the msg
        logging.info(f'Consumed {msg}')
        asyncio.create_task(handle_message(msg))

Ejecuta el programa y fíjate cómo se comporta. Tras la primera excepción, el consumidor deja de funcionar y sólo se publicarán mensajes. Además, se registra un error indicando que no hemos recuperado la excepción que se ha producido.

Lo que quisiéramos es tomar alguna acción controlada, como reiniciar el consumidor o apagar el sistema. Para ello conviene instalar un manejador de excepciones global. Modifica las funciones handle_exception y shutdown:

def handle_exception(loop, context):
    # context['message'] will always be there; but context['exception'] may not
    msg = context.get('exception', context['message'])
    logging.error(f'Caught exception: {msg}')
    logging.info('Shutting down...')
    asyncio.create_task(shutdown(loop))


async def shutdown(loop, signal=None):
    if signal:
        logging.info(f'Received exit signal {signal.name}...')

    logging.info('Closing database connections')

    logging.info('Nacking outstanding messages')
    tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
    logging.info(f'Cancelling {len(tasks)} outstanding tasks')
    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)

    logging.info(f'Flushing metrics')
    loop.stop()

Hemos hecho el parámetro signal optativo para poder diferenciar cuando estamos apagando porque nos lo han pedido desde fuera (signal será algo distinto de None) o porque lo hemos decidido en el manejador global.

Falta registrar la corrutina como manejador con el método set_exception_handler().:

def main():
    queue = asyncio.Queue()
    loop = asyncio.get_event_loop()
    signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT, signal.SIGQUIT)
    for s in signals:
        loop.add_signal_handler(
            s, lambda s=s: asyncio.create_task(shutdown(s, loop)))
    loop.set_exception_handler(handle_exception)

    try:
        loop.create_task(publish(queue))
        loop.create_task(consume(queue))
        loop.run_forever()
    finally:
        loop.close()
        logging.info('Successfully shutdown the Mayhem service.')

Ejecuta el código y observa cómo finaliza, grácilmente, ante una excepción. ¿Podrías modificar el código para que, en vez de finalizar, reiniciáramos el consumidor? Crea una subclase de Exception con la información que necesites para llevar a cabo el reinicio.

Gestionando excepciones individualmente

Si todo ha ido bien, tu código en mayhem.py debería lucir tal que...

import asyncio
import logging
import random
import string
import uuid
import signal
from dataclasses import dataclass, field

logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s,%(msecs)d %(levelname)s: %(message)s",
    datefmt="%H:%M:%S",
)


@dataclass
class PubSubMessage:
    instance_name: str
    message_id: int = field(repr=False)
    hostname: str = field(repr=False, init=False)
    restarted: bool = field(repr=False, default=False)
    saved: bool = field(repr=False, default=False)
    ack: bool = field(repr=False, default=False)

    def __post_init__(self):
        self.hostname = f'{self.instance_name}.example.net'


class RestartFailed(Exception):
    pass


async def save(msg):
    await asyncio.sleep(random.random())
    msg.saved = True
    logging.info(f'Saved {msg}')


async def cleanup(msg):
    msg.ack = True
    logging.info(f'Done. Acked {msg}')


async def handle_message(msg):
    await asyncio.gather(restart_host(msg), save(msg))
    await cleanup(msg)


async def restart_host(msg):
    await asyncio.sleep(random.random())
    msg.restarted = True
    logging.info(f'Restarted {msg.hostname}')


async def publish(queue):
    choices = string.ascii_lowercase + string.digits

    while True:
        msg_id = str(uuid.uuid4())
        host_id = ''.join(random.choices(choices, k=4))
        instance_name = f'cattle-{host_id}'
        msg = PubSubMessage(message_id=msg_id, instance_name=instance_name)
        # publish an item
        asyncio.create_task(queue.put(msg))
        logging.info(f'Published message {msg}')
        # simulate randomness of publishing messages
        await asyncio.sleep(random.random())


async def consume(queue):
    while True:
        # wait for an item from the publisher
        msg = await queue.get()

        if random.randint(1, 10) == 5:
            raise Exception(f'Could not consume {msg.hostname}')

        # process the msg
        logging.info(f'Consumed {msg}')
        asyncio.create_task(handle_message(msg))


def handle_exception(loop, context):
    # context['message'] will always be there; but context['exception'] may not
    msg = context.get('exception', context['message'])
    logging.error(f'Caught exception: {msg}')
    logging.info('Shutting down...')
    asyncio.create_task(shutdown(loop))


async def shutdown(loop, signal=None):
    if signal:
        logging.info(f'Received exit signal {signal.name}...')

    logging.info('Closing database connections')

    logging.info('Nacking outstanding messages')
    tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
    logging.info(f'Cancelling {len(tasks)} outstanding tasks')
    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)

    logging.info(f'Flushing metrics')
    loop.stop()


def main():
    queue = asyncio.Queue()
    loop = asyncio.get_event_loop()
    signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT, signal.SIGQUIT)
    for s in signals:
        loop.add_signal_handler(
            s, lambda s=s: asyncio.create_task(shutdown(s, loop)))
    loop.set_exception_handler(handle_exception)

    try:
        loop.create_task(publish(queue))
        loop.create_task(consume(queue))
        loop.run_forever()
    finally:
        loop.close()
        logging.info('Successfully shutdown the Mayhem service.')


if __name__ == '__main__':
    main()

Imagina ahora que queremos un control más fino sobre las excepciones que se producen durante el manejo del mensaje. Crea las siguientes clases:

class SaveError(Exception):
    pass


class RestartError(Exception):
    pass

Y modifica las corrutinas restart_host y save para que fallen de vez en cuando:

async def save(msg):
    await asyncio.sleep(random.random())
    if random.randint(1, 5) == 3:
        raise SaveError(f'Could not save msg {msg}')

    msg.saved = True
    logging.info(f'Saved {msg}')


async def restart_host(msg):
    await asyncio.sleep(random.random())
    if random.randint(1, 5) == 3:
        raise RestartError(f'Could not save msg {msg}')
    msg.restarted = True
    logging.info(f'Restarted {msg.hostname}')

La clave radica en el parámetro return_exceptions=True. Esta opción hace que las excepciones se devuelvan como resultados. Podemos modificar la corrutina handle_message() para que delegue en otra corrutina handle_results() la gestión de errores:

async def handle_message(msg):
    results = await asyncio.gather(
        restart_host(msg), save(msg), return_exceptions=True)
    asyncio.create_task(handle_results(results, msg))
    await cleanup(msg)


async def handle_results(results, msg):
    for result in results:
        if isinstance(result, SaveError):
            logging.error(f'Saving msg {msg} failed.')

        elif isinstance(result, RestartError):
            logging.warning(f'Retrying restarting host: {msg.hostname}')

        elif isinstance(result, Exception):
            logging.error(f'Handling general error: {result}')

Conclusiones

Este es el aspecto final del código en mayhem.py:

import asyncio
import logging
import random
import string
import uuid
import signal
from dataclasses import dataclass, field

logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s,%(msecs)d %(levelname)s: %(message)s",
    datefmt="%H:%M:%S",
)


@dataclass
class PubSubMessage:
    instance_name: str
    message_id: int = field(repr=False)
    hostname: str = field(repr=False, init=False)
    restarted: bool = field(repr=False, default=False)
    saved: bool = field(repr=False, default=False)
    ack: bool = field(repr=False, default=False)

    def __post_init__(self):
        self.hostname = f'{self.instance_name}.example.net'


class SaveError(Exception):
    pass


class RestartError(Exception):
    pass


async def save(msg):
    await asyncio.sleep(random.random())
    if random.randint(1, 5) == 3:
        raise SaveError(f'Could not save msg {msg}')

    msg.saved = True
    logging.info(f'Saved {msg}')


async def cleanup(msg):
    msg.ack = True
    logging.info(f'Done. Acked {msg}')


async def handle_message(msg):
    results = await asyncio.gather(restart_host(msg), save(msg), return_exceptions=True)
    asyncio.create_task(handle_results(results, msg))
    await cleanup(msg)


async def handle_results(results, msg):
    for result in results:
        if isinstance(result, SaveError):
            logging.error(f'Saving msg {msg} failed.')

        elif isinstance(result, RestartError):
            logging.warning(f'Retrying restarting host: {msg.hostname}')

        elif isinstance(result, Exception):
            logging.error(f'Handling general error: {result}')


async def restart_host(msg):
    await asyncio.sleep(random.random())
    if random.randint(1, 5) == 3:
        raise RestartError(f'Could not save msg {msg}')
    msg.restarted = True
    logging.info(f'Restarted {msg.hostname}')


async def publish(queue):
    choices = string.ascii_lowercase + string.digits

    while True:
        msg_id = str(uuid.uuid4())
        host_id = ''.join(random.choices(choices, k=4))
        instance_name = f'cattle-{host_id}'
        msg = PubSubMessage(message_id=msg_id, instance_name=instance_name)
        # publish an item
        asyncio.create_task(queue.put(msg))
        logging.info(f'Published message {msg}')
        # simulate randomness of publishing messages
        await asyncio.sleep(random.random())


async def consume(queue):
    while True:
        # wait for an item from the publisher
        msg = await queue.get()

        if random.randrange(1, 10) == 1:
            raise Exception(f'Could not consume {msg.hostname}')

        # process the msg
        logging.info(f'Consumed {msg}')
        asyncio.create_task(handle_message(msg))


def handle_exception(loop, context):
    # context['message'] will always be there; but context['exception'] may not
    msg = context.get('exception', context['message'])
    logging.error(f'Caught exception: {msg}')
    logging.info('Shutting down...')
    asyncio.create_task(shutdown(loop))


async def shutdown(loop, signal=None):
    if signal:
        logging.info(f'Received exit signal {signal.name}...')

    logging.info('Closing database connections')

    logging.info('Nacking outstanding messages')
    tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
    logging.info(f'Cancelling {len(tasks)} outstanding tasks')
    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)

    logging.info(f'Flushing metrics')
    loop.stop()


def main():
    queue = asyncio.Queue()
    loop = asyncio.get_event_loop()
    signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT, signal.SIGQUIT)
    for s in signals:
        loop.add_signal_handler(
            s, lambda s=s: asyncio.create_task(shutdown(loop, s)))
    loop.set_exception_handler(handle_exception)

    try:
        loop.create_task(publish(queue))
        loop.create_task(consume(queue))
        loop.run_forever()
    finally:
        loop.close()
        logging.info('Successfully shutdown the Mayhem service.')


if __name__ == '__main__':
    main()

A lo largo del camino has aprendido a:

  1. Planificar tareas sin tener que esperar por ellas.
  2. Gestionar señales de interrupción externas.
  3. Manejar excepciones de forma global.
  4. Convertir excepciones en resultados y tomar decisiones en base a ellas.

Este ejercicio tan sólo cubre las primeras tres partes del tutorial en profundidad de Lynn Root y ni siquiera de forma completa. asyncio es una biblioteca compleja y aun quedan por cubrir algunos aspectos extra: