Skip to content

Commit

Permalink
post_queue async method to hook up any functionality needed after job…
Browse files Browse the repository at this point in the history
… was queued. Closes #6
  • Loading branch information
robinharms committed Oct 28, 2024
1 parent bdfcae0 commit c311e96
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 9 deletions.
5 changes: 3 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@

## dev

* `DeferredJob` messages now enqueued with fully qualified name so they're traceable.
* `DeferredJob` now enqueued with fully qualified name so they're traceable.
* `DeferredJob` results returned now stored as RQ result.
* `DeferredJob` classes can now set RQ defaults message wide via `ttl`, `result_ttl`, `job_timeout`, `failure_ttl`.
* `DeferredJob` can now set RQ defaults message wide via `ttl`, `result_ttl`, `job_timeout`, `failure_ttl`.
* `DeferredJob` async method `post_queue` similar to `pre_queue` but with actual job passed along.

## 1.0.4 (2024-10-07)

Expand Down
3 changes: 2 additions & 1 deletion envelope/deferred_jobs/async_signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,5 +102,6 @@ async def queue_deferred_job(
if isinstance(message, DeferredJob):
await message.pre_queue(consumer=consumer, **kwargs)
if message.should_run:
message.enqueue()
job = message.enqueue()
consumer.last_job = now()
await message.post_queue(job=job, consumer=consumer, **kwargs)
9 changes: 8 additions & 1 deletion envelope/deferred_jobs/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
from envelope.utils import websocket_send_error

if TYPE_CHECKING:
from envelope.consumers.websocket import WebsocketConsumer
from django.db.models import Model
from rq.job import Job

_marker = object()

Expand All @@ -43,7 +45,7 @@ class DeferredJob(Message, ABC):
on_worker: bool = False
should_run: bool = True # Mark as false to abort run

async def pre_queue(self, **kwargs):
async def pre_queue(self, *, consumer: WebsocketConsumer, **kwargs):
"""
Do something before entering the queue. Only applies to when the consumer receives the message.
It's a good idea to avoid using this if it's not needed.
Expand Down Expand Up @@ -136,6 +138,11 @@ def enqueue(self, queue: Queue | None = None, **kwargs):
**kwargs,
)

async def post_queue(self, *, job: Job, consumer: WebsocketConsumer, **kwargs):
"""
Do something after entering the queue. Only called if the message was actually added to the queue.
"""

@abstractmethod
def run_job(self):
"""
Expand Down
13 changes: 8 additions & 5 deletions envelope/deferred_jobs/tests/test_async_signals.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from datetime import datetime
from unittest import mock
from unittest.mock import patch

from asgiref.sync import sync_to_async
Expand Down Expand Up @@ -134,16 +135,15 @@ async def test_maybe_update_connection(self):
self.assertIsInstance(connection.last_action, datetime)

async def test_queue_deferred_job(self):

self.mock_consumer.user_pk = self.user.pk

msg = Subscribe(
mm={"user_pk": self.user.pk, "env": WS_INCOMING},
channel_type="user",
pk=self.user.pk,
)

self.mock_consumer.user_pk = self.user.pk
# self.mock_consumer.connection_update_interval = 10
# self.user = self.user

msg.post_queue = mock.AsyncMock()
with patch(
"django_rq.queues.get_redis_connection",
return_value=self.fake_redis_conn,
Expand All @@ -168,6 +168,9 @@ async def test_queue_deferred_job(self):
},
response.data.dict(),
)
self.assertTrue(msg.post_queue.called)
self.assertIn("job", msg.post_queue.mock_calls[0].kwargs)
self.assertIn("consumer", msg.post_queue.mock_calls[0].kwargs)

async def test_queue_deferred_job_wrong_msg_type(self):
self.mock_consumer.user_pk = self.user.pk
Expand Down

0 comments on commit c311e96

Please sign in to comment.