Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement async confluent-kafka producer #775

Merged
merged 2 commits into from
Jan 22, 2024

Conversation

matyaskuti
Copy link
Contributor

@matyaskuti matyaskuti commented Dec 6, 2023

About this change - What it does

  • Create a wrapper around the existing, confluent-kafka based KafkaProducer with an async send method and a poll-thread to send messages continuously in the background
  • Dropping the usage of an SSL context for the async producer, as confluent-kafka has no support for this
  • Change the REST web application shutdown hook from on_cleanup to on_shutdown - this was an existing bug preventing any kind of closing of resources (Kafka clients, consumer manager, etc.) to actually run, further explanation in commit message

Why this way

@matyaskuti matyaskuti force-pushed the matyaskuti/confluent_kafka_asyncio branch from a5e4bbe to 7e6d22f Compare December 6, 2023 12:52
@matyaskuti matyaskuti force-pushed the matyaskuti/confluent_kafka_sync_consumer branch from 4740bab to 0112a8c Compare December 6, 2023 12:55
@matyaskuti matyaskuti force-pushed the matyaskuti/confluent_kafka_asyncio branch 2 times, most recently from 56642a0 to cdd9907 Compare December 6, 2023 13:03
@matyaskuti matyaskuti force-pushed the matyaskuti/confluent_kafka_sync_consumer branch from 0112a8c to 981fa69 Compare December 6, 2023 13:23
@matyaskuti matyaskuti force-pushed the matyaskuti/confluent_kafka_asyncio branch from cdd9907 to 3a2fa65 Compare December 6, 2023 15:05
@matyaskuti matyaskuti force-pushed the matyaskuti/confluent_kafka_sync_consumer branch from 981fa69 to 0039127 Compare December 6, 2023 15:05
@matyaskuti matyaskuti force-pushed the matyaskuti/confluent_kafka_asyncio branch 2 times, most recently from f112c8f to 334035e Compare December 6, 2023 15:11
@matyaskuti matyaskuti force-pushed the matyaskuti/confluent_kafka_sync_consumer branch from 0039127 to f86d68d Compare December 7, 2023 11:52
@matyaskuti matyaskuti force-pushed the matyaskuti/confluent_kafka_asyncio branch from 334035e to f5e9ef3 Compare December 7, 2023 11:53
@matyaskuti matyaskuti changed the title Implement async confluent-kafka Kafka producer Implement async confluent-kafka producer Dec 8, 2023
@matyaskuti matyaskuti force-pushed the matyaskuti/confluent_kafka_asyncio branch 5 times, most recently from 096fda4 to 8eeade1 Compare December 8, 2023 10:10
@matyaskuti matyaskuti force-pushed the matyaskuti/confluent_kafka_sync_consumer branch 13 times, most recently from 8e0f0d4 to e91aafa Compare December 13, 2023 12:51
Copy link
Contributor

@eliax1996 eliax1996 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel confident in merging once you reply to my doubts :)

karapace/kafka/producer.py Outdated Show resolved Hide resolved
karapace/kafka/producer.py Outdated Show resolved Hide resolved
@aiven-anton
Copy link
Contributor

Dropping the usage of an SSL context for the async producer, as confluent-kafka has no support for this

Just to make sure this doesn't become a future blocker, it could perhaps be worth experimenting with handling SSL external to confluent-kafka, and making it communicate through an already setup TLS connection.

karapace/kafka/producer.py Show resolved Hide resolved
karapace/kafka/producer.py Outdated Show resolved Hide resolved
tests/integration/kafka/test_producer.py Outdated Show resolved Hide resolved
tests/integration/kafka/test_producer.py Outdated Show resolved Hide resolved
tests/integration/kafka/test_producer.py Show resolved Hide resolved
@matyaskuti matyaskuti force-pushed the matyaskuti/confluent_kafka_asyncio branch 4 times, most recently from 3a3f265 to 5787118 Compare January 18, 2024 15:12
karapace/kafka_rest_apis/__init__.py Outdated Show resolved Hide resolved
karapace/kafka_rest_apis/__init__.py Outdated Show resolved Hide resolved
@matyaskuti matyaskuti force-pushed the matyaskuti/confluent_kafka_asyncio branch 11 times, most recently from 89db3f9 to 19d8b16 Compare January 22, 2024 07:41
aiven-anton
aiven-anton previously approved these changes Jan 22, 2024
Copy link
Contributor

@aiven-anton aiven-anton left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, I added a nit-level comment, but I think this is mergable as-is. Nice work!

karapace/kafka/producer.py Outdated Show resolved Hide resolved
Mátyás Kuti added 2 commits January 22, 2024 11:01
THe async `AIOKafkaProducer` is implemented as a wrapper around
`KafkaProducer` with an async `send` method and a poll-thread to
continuously send messages in the background, making the result of
`send` an awaitable `asyncio.Future`.

The example of confluent-kafka has been followed:
https://github.com/confluentinc/confluent-kafka-python/blob/master/examples/asyncio_example.py
REST webapp improvement:

Using `on_cleanup` is not the correct hook to use here, as this would
run _after_ the event loop has closed, making it unsuitable for
cancelling background tasks for example. `on_shutdown` is triggered
before the REST app shuts down, thus it's able to clean up eg. Kafka
clients, background tasks, etc. properly.

Before this change, the symptom of the bug is most prevalent in the
Karapace REST proxy and its "idle proxy janitor" background task.
Stopping the application when the janitor task is not running is
straightforward, however when any `UserRestProxy` is present (ie. some
requests have already been handled) and the task is running, stopping
the REST proxy hangs or needs multiple signals to shut down.
With the new `AIOKafkaProducer` implementation (which runs a poll-thread
in the background) this results in an application that is unable to
gracefully shutdown, only SIGKILL works.

Using the `on_shutdown` hook fixes this issue, as we still have an event
loop available to be able to cancel background tasks, etc.
@aiven-anton aiven-anton merged commit 4698577 into main Jan 22, 2024
8 checks passed
@aiven-anton aiven-anton deleted the matyaskuti/confluent_kafka_asyncio branch January 22, 2024 10:36
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants