Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: wait for transaction commit before trying to sink models #90

Merged
merged 8 commits into from
Oct 4, 2024
210 changes: 129 additions & 81 deletions platform_plugin_aspects/signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from django.db import transaction
from django.db.models.signals import post_delete, post_save
from django.dispatch import Signal, receiver
from opaque_keys import InvalidKeyError

from platform_plugin_aspects.sinks import (
CourseEnrollmentSink,
Expand Down Expand Up @@ -63,34 +64,33 @@
)


def on_user_profile_updated(instance):
"""
Queues the UserProfile dump job when the parent transaction is committed.
"""
# import here, because signal is registered at startup, but items in tasks are not yet able to be loaded
from platform_plugin_aspects.tasks import ( # pylint: disable=import-outside-toplevel
dump_data_to_clickhouse,
)

sink = UserProfileSink(None, None)
dump_data_to_clickhouse.delay(
sink_module=sink.__module__,
sink_name=sink.__class__.__name__,
object_id=str(instance.id),
)


def on_user_profile_updated_txn(**kwargs):
def on_user_profile_updated_txn(*args, **kwargs):
    """
    Handle user_profile saves in the middle of a transaction.

    If this gets fired before the transaction commits, the task may try to
    query an id that doesn't exist yet and throw an error. This should postpone
    queuing the Celery task until after the transaction is committed.
    """

    def _queue_user_profile_dump(instance, **kwargs):
        """
        Queue the UserProfile dump job once the parent transaction commits.
        """
        # import here, because signal is registered at startup, but items in tasks are not yet able to be loaded
        from platform_plugin_aspects.tasks import (  # pylint: disable=import-outside-toplevel
            dump_data_to_clickhouse,
        )

        sink = UserProfileSink(None, None)
        dump_data_to_clickhouse.delay(
            sink_module=sink.__module__,
            sink_name=sink.__class__.__name__,
            object_id=str(instance.id),
        )

    # Defer queuing until the enclosing transaction has committed, so the
    # Celery worker can actually find the row it is asked to dump.
    transaction.on_commit(lambda: _queue_user_profile_dump(*args, **kwargs))

Check failure on line 93 in platform_plugin_aspects/signals.py

View workflow job for this annotation

GitHub Actions / tests (ubuntu-24.04, 3.11, django42)

Missing coverage

Missing coverage on lines 77-93


# Connect the UserProfile.post_save signal handler only if we have a model to attach to.
Expand All @@ -102,30 +102,42 @@
) # pragma: no cover


def on_externalid_saved( # pylint: disable=unused-argument # pragma: no cover
sender, instance, **kwargs
):
def on_externalid_saved_txn(*args, **kwargs):
    """
    Handle external_id saves in the middle of a transaction.

    If this gets fired before the transaction commits, the task may try to
    query an id that doesn't exist yet and throw an error. This should postpone
    queuing the Celery task until after the transaction is committed.
    """

    def on_externalid_saved(  # pylint: disable=unused-argument  # pragma: no cover
        sender, instance, **kwargs
    ):
        """
        Queue the ExternalId dump job for the saved instance.
        """
        # import here, because signal is registered at startup, but items in tasks are not yet able to be loaded
        from platform_plugin_aspects.tasks import (  # pylint: disable=import-outside-toplevel
            dump_data_to_clickhouse,
        )

        sink = ExternalIdSink(None, None)
        dump_data_to_clickhouse.delay(
            sink_module=sink.__module__,
            sink_name=sink.__class__.__name__,
            object_id=str(instance.id),
        )

    # Defer queuing until the enclosing transaction has committed, so the
    # Celery worker can actually find the row it is asked to dump.
    transaction.on_commit(lambda: on_externalid_saved(*args, **kwargs))


# Connect the ExternalId.post_save signal handler only if we have a model to attach to.
# (prevents celery errors during tests)
_external_id = get_model("external_id")
if _external_id:
post_save.connect(on_externalid_saved, sender=_external_id) # pragma: no cover
post_save.connect(on_externalid_saved_txn, sender=_external_id) # pragma: no cover


@receiver(USER_RETIRE_LMS_MISC)
Expand All @@ -148,75 +160,111 @@
)


def on_tag_saved( # pylint: disable=unused-argument # pragma: no cover
sender, instance, **kwargs
):
def on_tag_saved_txn(*args, **kwargs):
    """
    Handle tag saves in the middle of a transaction.

    If this gets fired before the transaction commits, the task may try to
    query an id that doesn't exist yet and throw an error. This should postpone
    queuing the Celery task until after the transaction is committed.
    """

    def on_tag_saved(  # pylint: disable=unused-argument  # pragma: no cover
        sender, instance, **kwargs
    ):
        """
        Receives post save signal and queues the dump job.
        """
        # import here, because signal is registered at startup, but items in tasks are not yet able to be loaded
        from platform_plugin_aspects.tasks import (  # pylint: disable=import-outside-toplevel
            dump_data_to_clickhouse,
        )

        sink = TagSink(None, None)
        dump_data_to_clickhouse.delay(
            sink_module=sink.__module__,
            sink_name=sink.__class__.__name__,
            object_id=str(instance.id),
        )

    # Defer queuing until the enclosing transaction has committed, so the
    # Celery worker can actually find the row it is asked to dump.
    transaction.on_commit(lambda: on_tag_saved(*args, **kwargs))

Check failure on line 191 in platform_plugin_aspects/signals.py

View workflow job for this annotation

GitHub Actions / tests (ubuntu-24.04, 3.11, django42)

Missing coverage

Missing coverage on line 191


# Connect the Tag.post_save signal handler only if we have a model to attach to.
# (prevents celery errors during tests)
_tag = get_model("tag")
if _tag:
post_save.connect(on_tag_saved, sender=_tag) # pragma: no cover
post_save.connect(on_tag_saved_txn, sender=_tag) # pragma: no cover


def on_taxonomy_saved( # pylint: disable=unused-argument # pragma: no cover
sender, instance, **kwargs
):
def on_taxonomy_saved_txn(*args, **kwargs):
    """
    Handle taxonomy saves in the middle of a transaction.

    If this gets fired before the transaction commits, the task may try to
    query an id that doesn't exist yet and throw an error. This should postpone
    queuing the Celery task until after the transaction is committed.
    """

    def on_taxonomy_saved(  # pylint: disable=unused-argument  # pragma: no cover
        sender, instance, **kwargs
    ):
        """
        Receives post save signal and queues the dump job.
        """
        # import here, because signal is registered at startup, but items in tasks are not yet able to be loaded
        from platform_plugin_aspects.tasks import (  # pylint: disable=import-outside-toplevel
            dump_data_to_clickhouse,
        )

        sink = TaxonomySink(None, None)
        dump_data_to_clickhouse.delay(
            sink_module=sink.__module__,
            sink_name=sink.__class__.__name__,
            object_id=str(instance.id),
        )

    # Defer queuing until the enclosing transaction has committed, so the
    # Celery worker can actually find the row it is asked to dump.
    transaction.on_commit(lambda: on_taxonomy_saved(*args, **kwargs))

Check failure on line 229 in platform_plugin_aspects/signals.py

View workflow job for this annotation

GitHub Actions / tests (ubuntu-24.04, 3.11, django42)

Missing coverage

Missing coverage on line 229


# Connect the Taxonomy.post_save signal handler only if we have a model to attach to.
# (prevents celery errors during tests)
_taxonomy = get_model("taxonomy")
if _taxonomy:
post_save.connect(on_taxonomy_saved, sender=_taxonomy) # pragma: no cover
post_save.connect(on_taxonomy_saved_txn, sender=_taxonomy) # pragma: no cover


def on_object_tag_saved(sender, instance, **kwargs): # pragma: no cover
def on_object_tag_saved_txn(*args, **kwargs):
    """
    Handle object_tag saves in the middle of a transaction.

    If this gets fired before the transaction commits, the task may try to
    query an id that doesn't exist yet and throw an error. This should postpone
    queuing the Celery task until after the transaction is committed.
    """

    def on_object_tag_saved(sender, instance, **kwargs):  # pragma: no cover
        """
        Receives post save signal and queues the dump job.
        """
        # import here, because signal is registered at startup, but items in tasks are not yet able to be loaded
        from platform_plugin_aspects.tasks import (  # pylint: disable=import-outside-toplevel
            dump_data_to_clickhouse,
        )

        sink = ObjectTagSink(None, None)
        dump_data_to_clickhouse.delay(
            sink_module=sink.__module__,
            sink_name=sink.__class__.__name__,
            object_id=str(instance.id),
        )

        # A saved ObjectTag may replace an older tag on the same object, so
        # also trigger the course re-dump path used for deletions.
        on_object_tag_deleted(sender, instance, **kwargs)

    # Defer queuing until the enclosing transaction has committed, so the
    # Celery worker can actually find the row it is asked to dump.
    transaction.on_commit(lambda: on_object_tag_saved(*args, **kwargs))

Check failure on line 267 in platform_plugin_aspects/signals.py

View workflow job for this annotation

GitHub Actions / tests (ubuntu-24.04, 3.11, django42)

Missing coverage

Missing coverage on line 267


def on_object_tag_deleted( # pylint: disable=unused-argument # pragma: no cover
Expand All @@ -235,13 +283,13 @@
try:
CourseOverview.objects.get(id=instance.object_id)
dump_course_to_clickhouse.delay(instance.object_id)
except CourseOverview.DoesNotExist:
except (CourseOverview.DoesNotExist, InvalidKeyError):
pass


# Connect the ObjectTag.post_save signal handler only if we have a model to attach to.
# (prevents celery errors during tests)
_object_tag = get_model("object_tag")
if _object_tag: # pragma: no cover
post_save.connect(on_object_tag_saved, sender=_object_tag)
post_save.connect(on_object_tag_saved_txn, sender=_object_tag)
post_delete.connect(on_object_tag_deleted, sender=_object_tag)
17 changes: 5 additions & 12 deletions platform_plugin_aspects/tests/test_signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@
from django.test import TestCase

from platform_plugin_aspects.signals import (
on_externalid_saved,
on_externalid_saved_txn,
on_user_retirement,
receive_course_publish,
)
from platform_plugin_aspects.sinks.external_id_sink import ExternalIdSink
from platform_plugin_aspects.sinks.user_retire_sink import UserRetirementSink


Expand All @@ -31,22 +30,16 @@ def test_receive_course_publish(self, mock_dump_task):

mock_dump_task.delay.assert_called_once_with(course_key)

@patch("platform_plugin_aspects.signals.transaction")
def test_on_externalid_saved(self, mock_txn):
    """
    Test that saving an ExternalId defers the dump job to transaction commit.
    """
    sender = Mock()
    instance = Mock()

    on_externalid_saved_txn(sender, instance)

    # The handler must not queue work directly; it only registers an
    # on_commit callback on the (mocked) transaction module.
    mock_txn.on_commit.assert_called_once()

@patch("platform_plugin_aspects.tasks.dump_data_to_clickhouse")
def test_on_user_retirement(self, mock_dump_task):
Expand Down
Loading