Skip to content

Commit

Permalink
refactor(sink): move signals from generic txn wrapper to function in …
Browse files Browse the repository at this point in the history
…functions
  • Loading branch information
Ian2012 committed Sep 26, 2024
1 parent dd76076 commit a44c39c
Show file tree
Hide file tree
Showing 2 changed files with 128 additions and 98 deletions.
210 changes: 123 additions & 87 deletions platform_plugin_aspects/signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,73 +64,78 @@ def receive_course_enrollment_changed( # pylint: disable=unused-argument # pra
)


def on_user_profile_updated(instance, **kwargs):
"""
Queues the UserProfile dump job when the parent transaction is committed.
"""
# import here, because signal is registered at startup, but items in tasks are not yet able to be loaded
from platform_plugin_aspects.tasks import ( # pylint: disable=import-outside-toplevel
dump_data_to_clickhouse,
)

sink = UserProfileSink(None, None)
dump_data_to_clickhouse.delay(
sink_module=sink.__module__,
sink_name=sink.__class__.__name__,
object_id=str(instance.id),
)


def on_txn_wrapper(func, *args, **kwargs):
def on_user_profile_updated_txn(**kwargs):
"""
Handle user_profile saves in the middle of a transaction.
Handle saves in the middle of a transaction.
If this gets fired before the transaction commits, the task may try to
query an id that doesn't exist yet and throw an error. This should postpone
queuing the Celery task until after the transaction is committed.
"""

def on_txn(**kwargs):
transaction.on_commit(lambda: func(*args, **kwargs))
def on_user_profile_updated(instance):
"""
Queues the UserProfile dump job when the parent transaction is committed.
"""
# import here, because signal is registered at startup, but items in tasks are not yet able to be loaded
from platform_plugin_aspects.tasks import ( # pylint: disable=import-outside-toplevel
dump_data_to_clickhouse,
)

return on_txn
sink = UserProfileSink(None, None)
dump_data_to_clickhouse.delay(
sink_module=sink.__module__,
sink_name=sink.__class__.__name__,
object_id=str(instance.id),
)

transaction.on_commit(lambda: on_user_profile_updated(**kwargs))

Check failure on line 92 in platform_plugin_aspects/signals.py

View workflow job for this annotation

GitHub Actions / tests (ubuntu-24.04, 3.11, django42)

Missing coverage

Missing coverage on lines 76-92


# Connect the UserProfile.post_save signal handler only if we have a model to attach to.
# (prevents celery errors during tests)
_user_profile = get_model("user_profile")
if _user_profile:
post_save.connect(
on_txn_wrapper(on_user_profile_updated), sender=_user_profile
on_user_profile_updated_txn, sender=_user_profile
) # pragma: no cover


def on_externalid_saved( # pylint: disable=unused-argument # pragma: no cover
sender, instance, **kwargs
):
def on_externalid_saved_txn(*args, **kwargs):
"""
Receives post save signal and queues the dump job.
Handle external_id saves in the middle of a transaction.
Handle saves in the middle of a transaction.
If this gets fired before the transaction commits, the task may try to
query an id that doesn't exist yet and throw an error. This should postpone
queuing the Celery task until after the transaction is committed.
"""
# import here, because signal is registered at startup, but items in tasks are not yet able to be loaded
from platform_plugin_aspects.tasks import ( # pylint: disable=import-outside-toplevel
dump_data_to_clickhouse,
)

sink = ExternalIdSink(None, None)
dump_data_to_clickhouse.delay(
sink_module=sink.__module__,
sink_name=sink.__class__.__name__,
object_id=str(instance.id),
)
def on_externalid_saved( # pylint: disable=unused-argument # pragma: no cover
sender, instance, **kwargs
):
"""
Receives post save signal and queues the dump job.
"""
# import here, because signal is registered at startup, but items in tasks are not yet able to be loaded
from platform_plugin_aspects.tasks import ( # pylint: disable=import-outside-toplevel
dump_data_to_clickhouse,
)

sink = ExternalIdSink(None, None)
dump_data_to_clickhouse.delay(
sink_module=sink.__module__,
sink_name=sink.__class__.__name__,
object_id=str(instance.id),
)

transaction.on_commit(lambda: on_externalid_saved(*args, **kwargs))


# Connect the ExternalId.post_save signal handler only if we have a model to attach to.
# (prevents celery errors during tests)
_external_id = get_model("external_id")
if _external_id:
post_save.connect(
on_txn_wrapper(on_externalid_saved), sender=_external_id
) # pragma: no cover
post_save.connect(on_externalid_saved_txn, sender=_external_id) # pragma: no cover


@receiver(USER_RETIRE_LMS_MISC)
Expand All @@ -153,77 +158,108 @@ def on_user_retirement( # pylint: disable=unused-argument # pragma: no cover
)


def on_tag_saved( # pylint: disable=unused-argument # pragma: no cover
sender, instance, **kwargs
):
def on_tag_saved_txn(*args, **kwargs):
"""
Receives post save signal and queues the dump job.
Handle external_id saves in the middle of a transaction.
Handle saves in the middle of a transaction.
If this gets fired before the transaction commits, the task may try to
query an id that doesn't exist yet and throw an error. This should postpone
queuing the Celery task until after the transaction is committed.
"""
# import here, because signal is registered at startup, but items in tasks are not yet able to be loaded
from platform_plugin_aspects.tasks import ( # pylint: disable=import-outside-toplevel
dump_data_to_clickhouse,
)

sink = TagSink(None, None)
dump_data_to_clickhouse.delay(
sink_module=sink.__module__,
sink_name=sink.__class__.__name__,
object_id=str(instance.id),
)
def on_tag_saved( # pylint: disable=unused-argument # pragma: no cover
sender, instance, **kwargs
):
"""
Receives post save signal and queues the dump job.
"""
# import here, because signal is registered at startup, but items in tasks are not yet able to be loaded
from platform_plugin_aspects.tasks import ( # pylint: disable=import-outside-toplevel
dump_data_to_clickhouse,
)

sink = TagSink(None, None)
dump_data_to_clickhouse.delay(
sink_module=sink.__module__,
sink_name=sink.__class__.__name__,
object_id=str(instance.id),
)

transaction.on_commit(lambda: on_tag_saved(*args, **kwargs))

Check failure on line 188 in platform_plugin_aspects/signals.py

View workflow job for this annotation

GitHub Actions / tests (ubuntu-24.04, 3.11, django42)

Missing coverage

Missing coverage on line 188


# Connect the ExternalId.post_save signal handler only if we have a model to attach to.
# (prevents celery errors during tests)
_tag = get_model("tag")
if _tag:
post_save.connect(on_txn_wrapper(on_tag_saved), sender=_tag) # pragma: no cover
post_save.connect(on_tag_saved_txn, sender=_tag) # pragma: no cover


def on_taxonomy_saved( # pylint: disable=unused-argument # pragma: no cover
sender, instance, **kwargs
):
def on_taxonomy_saved_txn(*args, **kwargs):
"""
Receives post save signal and queues the dump job.
Handle external_id saves in the middle of a transaction.
Handle saves in the middle of a transaction.
If this gets fired before the transaction commits, the task may try to
query an id that doesn't exist yet and throw an error. This should postpone
queuing the Celery task until after the transaction is committed.
"""
# import here, because signal is registered at startup, but items in tasks are not yet able to be loaded
from platform_plugin_aspects.tasks import ( # pylint: disable=import-outside-toplevel
dump_data_to_clickhouse,
)

sink = TaxonomySink(None, None)
dump_data_to_clickhouse.delay(
sink_module=sink.__module__,
sink_name=sink.__class__.__name__,
object_id=str(instance.id),
)
def on_taxonomy_saved( # pylint: disable=unused-argument # pragma: no cover
sender, instance, **kwargs
):
"""
Receives post save signal and queues the dump job.
"""
# import here, because signal is registered at startup, but items in tasks are not yet able to be loaded
from platform_plugin_aspects.tasks import ( # pylint: disable=import-outside-toplevel
dump_data_to_clickhouse,
)

sink = TaxonomySink(None, None)
dump_data_to_clickhouse.delay(
sink_module=sink.__module__,
sink_name=sink.__class__.__name__,
object_id=str(instance.id),
)

transaction.on_commit(lambda: on_taxonomy_saved(*args, **kwargs))

Check failure on line 225 in platform_plugin_aspects/signals.py

View workflow job for this annotation

GitHub Actions / tests (ubuntu-24.04, 3.11, django42)

Missing coverage

Missing coverage on line 225


# Connect the ExternalId.post_save signal handler only if we have a model to attach to.
# (prevents celery errors during tests)
_taxonomy = get_model("taxonomy")
if _taxonomy:
post_save.connect(
on_txn_wrapper(on_taxonomy_saved), sender=_taxonomy
) # pragma: no cover
post_save.connect(on_taxonomy_saved_txn, sender=_taxonomy) # pragma: no cover


def on_object_tag_saved(sender, instance, **kwargs): # pragma: no cover
def on_object_tag_saved_txn(*args, **kwargs):
"""
Receives post save signal and queues the dump job.
Handle external_id saves in the middle of a transaction.
Handle saves in the middle of a transaction.
If this gets fired before the transaction commits, the task may try to
query an id that doesn't exist yet and throw an error. This should postpone
queuing the Celery task until after the transaction is committed.
"""
# import here, because signal is registered at startup, but items in tasks are not yet able to be loaded
from platform_plugin_aspects.tasks import ( # pylint: disable=import-outside-toplevel
dump_data_to_clickhouse,
)

sink = ObjectTagSink(None, None)
dump_data_to_clickhouse.delay(
sink_module=sink.__module__,
sink_name=sink.__class__.__name__,
object_id=str(instance.id),
)
def on_object_tag_saved(sender, instance, **kwargs): # pragma: no cover
"""
Receives post save signal and queues the dump job.
"""
# import here, because signal is registered at startup, but items in tasks are not yet able to be loaded
from platform_plugin_aspects.tasks import ( # pylint: disable=import-outside-toplevel
dump_data_to_clickhouse,
)

sink = ObjectTagSink(None, None)
dump_data_to_clickhouse.delay(
sink_module=sink.__module__,
sink_name=sink.__class__.__name__,
object_id=str(instance.id),
)

on_object_tag_deleted(sender, instance, **kwargs)

on_object_tag_deleted(sender, instance, **kwargs)
transaction.on_commit(lambda: on_object_tag_saved(*args, **kwargs))

Check failure on line 262 in platform_plugin_aspects/signals.py

View workflow job for this annotation

GitHub Actions / tests (ubuntu-24.04, 3.11, django42)

Missing coverage

Missing coverage on line 262


def on_object_tag_deleted( # pylint: disable=unused-argument # pragma: no cover
Expand All @@ -250,5 +286,5 @@ def on_object_tag_deleted( # pylint: disable=unused-argument # pragma: no cove
# (prevents celery errors during tests)
_object_tag = get_model("object_tag")
if _object_tag: # pragma: no cover
post_save.connect(on_txn_wrapper(on_object_tag_saved), sender=_object_tag)
post_delete.connect(on_txn_wrapper(on_object_tag_deleted), sender=_object_tag)
post_save.connect(on_object_tag_saved_txn, sender=_object_tag)
post_delete.connect(on_object_tag_deleted, sender=_object_tag)
16 changes: 5 additions & 11 deletions platform_plugin_aspects/tests/test_signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from django.test import TestCase

from platform_plugin_aspects.signals import (
on_externalid_saved,
on_externalid_saved_txn,
on_user_retirement,
receive_course_publish,
)
Expand All @@ -31,22 +31,16 @@ def test_receive_course_publish(self, mock_dump_task):

mock_dump_task.delay.assert_called_once_with(course_key)

@patch("platform_plugin_aspects.tasks.dump_data_to_clickhouse")
def test_on_externalid_saved(self, mock_dump_task):
@patch("platform_plugin_aspects.signals.transaction")
def test_on_externalid_saved(self, mock_transaction):
"""
Test that on_externalid_saved calls dump_data_to_clickhouse.
"""
instance = Mock()
sender = Mock()
on_externalid_saved(sender, instance)

sink = ExternalIdSink(None, None)
on_externalid_saved_txn(sender, instance)

mock_dump_task.delay.assert_called_once_with(
sink_module=sink.__module__,
sink_name=sink.__class__.__name__,
object_id=str(instance.id),
)
mock_transaction.on_commit.assert_called_once()

@patch("platform_plugin_aspects.tasks.dump_data_to_clickhouse")
def test_on_user_retirement(self, mock_dump_task):
Expand Down

0 comments on commit a44c39c

Please sign in to comment.