Skip to content

Commit

Permalink
fix: verify is sink enabled before sending celery task
Browse files Browse the repository at this point in the history
  • Loading branch information
Ian2012 committed Oct 4, 2024
1 parent 6501bcb commit 870b69d
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 18 deletions.
21 changes: 19 additions & 2 deletions platform_plugin_aspects/signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
TaxonomySink,
UserProfileSink,
UserRetirementSink,
CourseOverviewSink
)
from platform_plugin_aspects.utils import get_model

Expand All @@ -36,6 +37,9 @@ def receive_course_publish( # pylint: disable=unused-argument # pragma: no cov
dump_course_to_clickhouse,
)

if not CourseOverviewSink.is_enabled():
return

dump_course_to_clickhouse.delay(str(course_key))


Expand All @@ -51,11 +55,14 @@ def receive_course_enrollment_changed( # pylint: disable=unused-argument # pra

user = kwargs.get("user")
course_id = kwargs.get("course_id")
sink = CourseEnrollmentSink(None, None)

CourseEnrollment = get_model("course_enrollment")
if not CourseEnrollmentSink.is_enabled():
return

CourseEnrollment = sink.get_model()
instance = CourseEnrollment.objects.get(user=user, course_id=course_id)

sink = CourseEnrollmentSink(None, None)

dump_data_to_clickhouse.delay(
sink_module=sink.__module__,
Expand Down Expand Up @@ -90,6 +97,8 @@ def on_user_profile_updated(instance, **kwargs):
object_id=str(instance.id),
)

if not UserProfileSink.is_enabled():
return
transaction.on_commit(lambda: on_user_profile_updated(*args, **kwargs))


Expand Down Expand Up @@ -130,6 +139,8 @@ def on_externalid_saved( # pylint: disable=unused-argument # pragma: no cover
object_id=str(instance.id),
)

if not ExternalIdSink.is_enabled():
return
transaction.on_commit(lambda: on_externalid_saved(*args, **kwargs))


Expand Down Expand Up @@ -188,6 +199,8 @@ def on_tag_saved( # pylint: disable=unused-argument # pragma: no cover
object_id=str(instance.id),
)

if not TagSink.is_enabled():
return
transaction.on_commit(lambda: on_tag_saved(*args, **kwargs))


Expand Down Expand Up @@ -226,6 +239,8 @@ def on_taxonomy_saved( # pylint: disable=unused-argument # pragma: no cover
object_id=str(instance.id),
)

if not TaxonomySink.is_enabled():
return
transaction.on_commit(lambda: on_taxonomy_saved(*args, **kwargs))


Expand Down Expand Up @@ -264,6 +279,8 @@ def on_object_tag_saved(sender, instance, **kwargs): # pragma: no cover

on_object_tag_deleted(sender, instance, **kwargs)

if not ObjectTagSink.is_enabled():
return
transaction.on_commit(lambda: on_object_tag_saved(*args, **kwargs))


Expand Down
28 changes: 12 additions & 16 deletions platform_plugin_aspects/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,16 @@ def dump_course_to_clickhouse(course_key_string, connection_overrides=None):
connection_overrides (dict): overrides to ClickHouse connection
parameters specified in `settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG`.
"""
if CourseOverviewSink.is_enabled(): # pragma: no cover
course_key = CourseKey.from_string(course_key_string)
sink = CourseOverviewSink(
connection_overrides=connection_overrides, log=celery_log
)
sink.dump(course_key)
course_key = CourseKey.from_string(course_key_string)
sink = CourseOverviewSink(
connection_overrides=connection_overrides, log=celery_log
)
sink.dump(course_key)

ccx_courses = get_ccx_courses(course_key)
for ccx_course in ccx_courses:
ccx_course_key = str(ccx_course.locator)
sink.dump(ccx_course_key)
ccx_courses = get_ccx_courses(course_key)
for ccx_course in ccx_courses:
ccx_course_key = str(ccx_course.locator)
sink.dump(ccx_course_key)


@shared_task
Expand All @@ -56,9 +55,6 @@ def dump_data_to_clickhouse(
"""
Sink = getattr(import_module(sink_module), sink_name)

if Sink.is_enabled():
sink = Sink(connection_overrides=connection_overrides, log=celery_log)
sink.dump(object_id)
return "Dumped"

return "Disabled"
sink = Sink(connection_overrides=connection_overrides, log=celery_log)
sink.dump(object_id)
return "Dumped"

0 comments on commit 870b69d

Please sign in to comment.