
redis namespace not properly combined with abort and cancel keys #26

Open
huwylphimet opened this issue Apr 11, 2024 · 4 comments

Comments

@huwylphimet
Contributor

huwylphimet commented Apr 11, 2024

When abort() or cancel() is called, a key is pushed to Redis. That key seems to be named redis namespace + dramatiq message_id, but the namespace is not separated from the message_id by a colon (':'), see here.

For example, if my Redis namespace is "myapp" and the dramatiq message_id is "task123", then aborting that message pushes the key "myappabort:task123" instead of "myapp:abort:task123" (notice the colon just after the namespace).
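A minimal sketch of the two naming schemes, assuming the abort key is built by plain string concatenation of the namespace and an `abort:` prefix, as the example above suggests. The helpers `concat_key` and `joined_key` are hypothetical and not part of dramatiq-abort.

```python
def concat_key(namespace: str, message_id: str) -> str:
    # Behaviour described above: the namespace is prepended as-is.
    return namespace + "abort:" + message_id


def joined_key(namespace: str, message_id: str, sep: str = ":") -> str:
    # Hypothetical alternative: always insert a separator after a non-empty namespace.
    parts = [namespace, "abort", message_id] if namespace else ["abort", message_id]
    return sep.join(parts)


print(concat_key("myapp", "task123"))   # myappabort:task123
print(concat_key("myapp:", "task123"))  # myapp:abort:task123
print(joined_key("myapp", "task123"))   # myapp:abort:task123
```

With plain concatenation, passing "myapp:" as the namespace already yields the colon-separated key.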

Are the abort / cancel key names expected to be like that?

Also, before processing a message, the middleware checks the abort / cancel key, see here.
Is the Redis namespace taken into account there?

Does aborting / cancelling a dramatiq message work when a custom Redis namespace has been defined?

@dp-alvarez
Contributor

dp-alvarez commented Apr 12, 2024

When I implemented it, I expected users to bring their own colons, as you can see here. I guess it would be better for the library to give you one for free.

Both cases you mentioned should work with namespaces, since the namespace is applied by the backend in the _encode_key function, but you are welcome to test it.
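To illustrate why writing and checking the key agree, here is a toy in-memory backend in which every key passes through one `_encode_key` method. Only the method name comes from the comment above; the class `ToyBackend` and its body are assumptions that mirror the "bring your own colon" behaviour, not dramatiq-abort's actual code.

```python
from typing import Dict, Optional


class ToyBackend:
    """Hypothetical stand-in for an abort backend (not dramatiq-abort's code)."""

    def __init__(self, namespace: str = "") -> None:
        self.namespace = namespace
        self.store: Dict[str, str] = {}

    def _encode_key(self, key: str) -> str:
        # Assumed behaviour: the namespace is prepended verbatim, no separator added.
        return self.namespace + key

    def set(self, key: str, value: str) -> None:
        self.store[self._encode_key(key)] = value

    def get(self, key: str) -> Optional[str]:
        return self.store.get(self._encode_key(key))


backend = ToyBackend(namespace="myapp")
backend.set("abort:task123", "1")    # the write that abort() would do
print(backend.get("abort:task123"))  # the check the middleware would do: 1
print(list(backend.store))           # ['myappabort:task123']
```

Because both the write and the read go through the same encoding step, the namespace is applied consistently even though no colon is inserted.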

@huwylphimet
Contributor Author

huwylphimet commented Apr 12, 2024

Ah yes, I see, thank you for pointing that out. So in that case I am free to add a separator such as a colon (or not) when defining the namespace. I think this is fine too. It just differs from other middleware like ResultMiddleware, where the colon separator seems to be hardcoded between the namespace and the rest of the key, see here.

Now I have one additional question: I saw that the requests (e.g. abort) are identified by threads, see here and here.

I guess this technique would not survive a reboot of the worker handling the actor, since the thread would be restarted and get a new threading instance.
Is it somehow possible to override the get_current_thread() method to return some persistent worker/actor identifier, so that any actor would still abort any message for which a pending abort key exists, even after the worker reboots?

@isra17
Member

isra17 commented Apr 12, 2024

The thread identifier is not stored in Redis; it's in process memory. If the worker reboots, it rebuilds this mapping with the new threads. In short, as a worker processes messages, it keeps a mapping of message => thread so that it knows which thread to kill. If the worker reboots, it picks the messages up again and rebuilds the mapping as it does so.
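A minimal sketch of such an in-process mapping, assuming a dict keyed by message id that stores `threading.get_ident()` of the thread running the actor. The class `ThreadRegistry` and its methods are hypothetical, not dramatiq-abort's internals. Nothing is persisted, so a restarted worker starts empty and refills the map as its threads pick messages up again.

```python
import threading
from typing import Dict, Optional


class ThreadRegistry:
    """Hypothetical in-memory map of message_id -> thread id."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._threads: Dict[str, int] = {}

    def register(self, message_id: str) -> None:
        # Called by the worker thread just before it runs the actor.
        with self._lock:
            self._threads[message_id] = threading.get_ident()

    def unregister(self, message_id: str) -> None:
        # Called once the message has been processed (or has failed).
        with self._lock:
            self._threads.pop(message_id, None)

    def thread_for(self, message_id: str) -> Optional[int]:
        # Looked up when an abort request for this message is seen.
        with self._lock:
            return self._threads.get(message_id)


registry = ThreadRegistry()
seen = {}


def process(message_id: str) -> None:
    registry.register(message_id)
    # While the message is in progress, it maps to the current thread.
    seen[message_id] = registry.thread_for(message_id) == threading.get_ident()
    registry.unregister(message_id)


t = threading.Thread(target=process, args=("task123",))
t.start()
t.join()
print(seen["task123"], registry.thread_for("task123"))  # True None
```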

> When I implemented it I expected for users to bring their own colons, as you can see in here. I guess it would be better for the library to give you a free one.

I guess it makes sense; consistency is good. PRs are welcome.

@huwylphimet
Contributor Author

huwylphimet commented Apr 12, 2024

Actually, if I understand correctly, calling abort() does not delete the dramatiq message from the Redis queue hash but "only" signals actors to abort / cancel the current task.
If I don't want the worker to retry that message, I should (try to) end the task gracefully within the actor, either by catching the raised Abort exception or when abort_requested() returns a positive value, so that dramatiq marks the message as "handled" (meaning it is popped off the queue).
But if the worker never receives the abort / cancel signal because it is down, and I want to delete the remaining aborted dramatiq message once the abort timeout has expired, I would have to do it manually on the sender side, for example like this (I only found a way in dramatiq to flush a whole queue, not to delete a single message):

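```python
# `broker` is the RedisBroker; `message` is the Message returned by .send()
redis_msg_id = message.options.get("redis_message_id")
if redis_msg_id is not None:
    redis_queue = f"{broker.namespace}:{message.queue_name}"
    # Drop the id from the queue list and the payload from the ".msgs" hash
    broker.client.lrem(redis_queue, 0, redis_msg_id)
    broker.client.hdel(f"{redis_queue}.msgs", redis_msg_id)
```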
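The same deletion can be wrapped in a helper that applies both commands in one MULTI/EXEC pipeline, so the id and its payload disappear together. This is a sketch: `delete_enqueued_message` is a hypothetical name, and it assumes the key layout used in the snippet above (a list `<namespace>:<queue_name>` of message ids plus a `<namespace>:<queue_name>.msgs` hash). It only uses redis-py's `pipeline`, `lrem`, `hdel` and `execute`. A message that a worker has already fetched may no longer be in that list, in which case both counts come back as 0.

```python
from typing import Any, Tuple


def delete_enqueued_message(client: Any, namespace: str, queue_name: str,
                            redis_message_id: str) -> Tuple[int, int]:
    """Hypothetical helper: remove one queued message; return (list, hash) counts."""
    queue_key = f"{namespace}:{queue_name}"
    # transaction=True wraps both commands in MULTI/EXEC.
    with client.pipeline(transaction=True) as pipe:
        pipe.lrem(queue_key, 0, redis_message_id)         # count=0: remove all matches
        pipe.hdel(f"{queue_key}.msgs", redis_message_id)  # drop the encoded message
        removed_from_list, removed_from_hash = pipe.execute()
    return removed_from_list, removed_from_hash
```

Usage would look like `delete_enqueued_message(broker.client, broker.namespace, message.queue_name, message.options["redis_message_id"])`.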

I guess this "force-deleting" of the dramatiq message after the abort timeout has been reached would be out of scope for the abort middleware, right?

As for the colon suffix after the Redis namespace, in the end I'm no longer sure it needs to be changed. I guess the colon is just a common convention rather than a standard rule, so maybe it should instead be un-hardcoded in the dramatiq library, I don't know...
