From 49d028b70cc7d0431446b398c3d686f48f6b4aa7 Mon Sep 17 00:00:00 2001 From: Brian Mesick Date: Fri, 4 Oct 2024 15:45:34 -0400 Subject: [PATCH 1/3] fix: Event sink errors when enabled models are missing When event sinks are enabled in the config or waffle flag, but the model itself doesn't exist we were throwing errors that caused downstream errors. We now warn on this condition and return the model as disabled instead. --- platform_plugin_aspects/sinks/base_sink.py | 13 +++++++++++-- .../sinks/tests/test_base_sink.py | 6 ++++-- .../sinks/tests/test_course_overview_sink.py | 5 ++++- 3 files changed, 19 insertions(+), 5 deletions(-) diff --git a/platform_plugin_aspects/sinks/base_sink.py b/platform_plugin_aspects/sinks/base_sink.py index 7e80ea2..155498e 100644 --- a/platform_plugin_aspects/sinks/base_sink.py +++ b/platform_plugin_aspects/sinks/base_sink.py @@ -5,6 +5,7 @@ import csv import datetime import io +import logging from collections import namedtuple import requests @@ -342,7 +343,7 @@ def is_enabled(cls): """ Return True if the sink is enabled, False otherwise """ - enabled = getattr( + enabled_settings = getattr( settings, f"{WAFFLE_FLAG_NAMESPACE.upper()}_{cls.model.upper()}_ENABLED", False, @@ -358,7 +359,15 @@ def is_enabled(cls): __name__, ) - return enabled or waffle_flag.is_enabled() + enabled = enabled_settings or waffle_flag.is_enabled() + + if enabled and not get_model(cls.model): + logging.warning( + f"Event Sink Model {cls.model} is not installed, but is enabled in settings or waffle flag." + ) + enabled = False + + return enabled @classmethod def get_sink_by_model_name(cls, model): diff --git a/platform_plugin_aspects/sinks/tests/test_base_sink.py b/platform_plugin_aspects/sinks/tests/test_base_sink.py index 054b1b8..9819e35 100644 --- a/platform_plugin_aspects/sinks/tests/test_base_sink.py +++ b/platform_plugin_aspects/sinks/tests/test_base_sink.py @@ -270,8 +270,9 @@ def test_is_not_enabled_waffle(self, mock_waffle_flag_is_enabled): mock_waffle_flag_is_enabled.return_value = False self.assertEqual(self.child_sink.__class__.is_enabled(), False) + @patch("platform_plugin_aspects.sinks.base_sink.get_model") @patch("platform_plugin_aspects.sinks.base_sink.WaffleFlag.is_enabled") - def test_is_enabled_waffle(self, mock_waffle_flag_is_enabled): + def test_is_enabled_waffle(self, mock_waffle_flag_is_enabled, mock_get_model): """ Test that is_enable() returns the correct data. """ @@ -279,7 +280,8 @@ def test_is_enabled_waffle(self, mock_waffle_flag_is_enabled): self.assertEqual(self.child_sink.__class__.is_enabled(), True) @override_settings(EVENT_SINK_CLICKHOUSE_CHILD_MODEL_ENABLED=True) - def test_is_enabled(self): + @patch("platform_plugin_aspects.sinks.base_sink.get_model") + def test_is_enabled(self, mock_get_model): """ Test that is_enable() returns the correct data. """ diff --git a/platform_plugin_aspects/sinks/tests/test_course_overview_sink.py b/platform_plugin_aspects/sinks/tests/test_course_overview_sink.py index f7ad9c1..bf06d69 100644 --- a/platform_plugin_aspects/sinks/tests/test_course_overview_sink.py +++ b/platform_plugin_aspects/sinks/tests/test_course_overview_sink.py @@ -34,6 +34,7 @@ registry=OrderedRegistry ) @override_settings(EVENT_SINK_CLICKHOUSE_COURSE_OVERVIEW_ENABLED=True) +@patch("platform_plugin_aspects.sinks.base_sink.get_model") @patch("platform_plugin_aspects.sinks.course_overview_sink.get_tags_for_block") @patch("platform_plugin_aspects.sinks.CourseOverviewSink.serialize_item") @patch("platform_plugin_aspects.sinks.CourseOverviewSink.get_model") @@ -47,6 +48,7 @@ def test_course_publish_success( mock_overview, mock_serialize_item, mock_get_tags, + mock_get_model, ): """ Test of a successful end-to-end run. @@ -110,13 +112,14 @@ def test_course_publish_success( @responses.activate( # pylint: disable=unexpected-keyword-arg,no-value-for-parameter registry=OrderedRegistry ) +@patch("platform_plugin_aspects.sinks.base_sink.get_model") @patch("platform_plugin_aspects.sinks.CourseOverviewSink.serialize_item") @patch("platform_plugin_aspects.sinks.CourseOverviewSink.get_model") @patch("platform_plugin_aspects.sinks.course_overview_sink.get_detached_xblock_types") @patch("platform_plugin_aspects.sinks.course_overview_sink.get_modulestore") # pytest:disable=unused-argument def test_course_publish_clickhouse_error( - mock_modulestore, mock_detached, mock_overview, mock_serialize_item, caplog + mock_modulestore, mock_detached, mock_overview, mock_serialize_item, mock_get_model, caplog ): """ Test the case where a ClickHouse POST fails. From 6501bcb9f75c83d82121263d99c35d41b640fc13 Mon Sep 17 00:00:00 2001 From: Brian Mesick Date: Fri, 4 Oct 2024 15:48:45 -0400 Subject: [PATCH 2/3] style: Fix formatting --- .../sinks/tests/test_course_overview_sink.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/platform_plugin_aspects/sinks/tests/test_course_overview_sink.py b/platform_plugin_aspects/sinks/tests/test_course_overview_sink.py index bf06d69..2bb5801 100644 --- a/platform_plugin_aspects/sinks/tests/test_course_overview_sink.py +++ b/platform_plugin_aspects/sinks/tests/test_course_overview_sink.py @@ -119,7 +119,12 @@ def test_course_publish_success( @patch("platform_plugin_aspects.sinks.course_overview_sink.get_modulestore") # pytest:disable=unused-argument def test_course_publish_clickhouse_error( - mock_modulestore, mock_detached, mock_overview, mock_serialize_item, mock_get_model, caplog + mock_modulestore, + mock_detached, + mock_overview, + mock_serialize_item, + mock_get_model, + caplog, ): """ Test the case where a ClickHouse POST fails. From d1569d09c80f56b095f544e8e5d5401b5e15a299 Mon Sep 17 00:00:00 2001 From: Cristhian Garcia Date: Fri, 4 Oct 2024 16:04:54 -0500 Subject: [PATCH 3/3] fix: verify is sink enabled before sending celery task --- platform_plugin_aspects/signals.py | 22 +++++++++++++++++++--- platform_plugin_aspects/tasks.py | 25 +++++++++---------------- 2 files changed, 28 insertions(+), 19 deletions(-) diff --git a/platform_plugin_aspects/signals.py b/platform_plugin_aspects/signals.py index 3dba75b..edd94f0 100644 --- a/platform_plugin_aspects/signals.py +++ b/platform_plugin_aspects/signals.py @@ -9,6 +9,7 @@ from platform_plugin_aspects.sinks import ( CourseEnrollmentSink, + CourseOverviewSink, ExternalIdSink, ObjectTagSink, TagSink, @@ -36,6 +37,9 @@ def receive_course_publish( # pylint: disable=unused-argument # pragma: no cov dump_course_to_clickhouse, ) + if not CourseOverviewSink.is_enabled(): + return + dump_course_to_clickhouse.delay(str(course_key)) @@ -51,11 +55,13 @@ def receive_course_enrollment_changed( # pylint: disable=unused-argument # pra user = kwargs.get("user") course_id = kwargs.get("course_id") + sink = CourseEnrollmentSink(None, None) - CourseEnrollment = get_model("course_enrollment") - instance = CourseEnrollment.objects.get(user=user, course_id=course_id) + if not CourseEnrollmentSink.is_enabled(): + return - sink = CourseEnrollmentSink(None, None) + CourseEnrollment = sink.get_model() + instance = CourseEnrollment.objects.get(user=user, course_id=course_id) dump_data_to_clickhouse.delay( sink_module=sink.__module__, @@ -90,6 +96,8 @@ def on_user_profile_updated(instance, **kwargs): object_id=str(instance.id), ) + if not UserProfileSink.is_enabled(): + return transaction.on_commit(lambda: on_user_profile_updated(*args, **kwargs)) @@ -130,6 +138,8 @@ def on_externalid_saved( # pylint: disable=unused-argument # pragma: no cover object_id=str(instance.id), ) + if not ExternalIdSink.is_enabled(): + return transaction.on_commit(lambda: on_externalid_saved(*args, **kwargs)) @@ -188,6 +198,8 @@ def on_tag_saved( # pylint: disable=unused-argument # pragma: no cover object_id=str(instance.id), ) + if not TagSink.is_enabled(): + return transaction.on_commit(lambda: on_tag_saved(*args, **kwargs)) @@ -226,6 +238,8 @@ def on_taxonomy_saved( # pylint: disable=unused-argument # pragma: no cover object_id=str(instance.id), ) + if not TaxonomySink.is_enabled(): + return transaction.on_commit(lambda: on_taxonomy_saved(*args, **kwargs)) @@ -264,6 +278,8 @@ def on_object_tag_saved(sender, instance, **kwargs): # pragma: no cover on_object_tag_deleted(sender, instance, **kwargs) + if not ObjectTagSink.is_enabled(): + return transaction.on_commit(lambda: on_object_tag_saved(*args, **kwargs)) diff --git a/platform_plugin_aspects/tasks.py b/platform_plugin_aspects/tasks.py index b0b880e..6c7209b 100644 --- a/platform_plugin_aspects/tasks.py +++ b/platform_plugin_aspects/tasks.py @@ -27,17 +27,14 @@ def dump_course_to_clickhouse(course_key_string, connection_overrides=None): connection_overrides (dict): overrides to ClickHouse connection parameters specified in `settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG`. """ - if CourseOverviewSink.is_enabled(): # pragma: no cover - course_key = CourseKey.from_string(course_key_string) - sink = CourseOverviewSink( - connection_overrides=connection_overrides, log=celery_log - ) - sink.dump(course_key) + course_key = CourseKey.from_string(course_key_string) + sink = CourseOverviewSink(connection_overrides=connection_overrides, log=celery_log) + sink.dump(course_key) - ccx_courses = get_ccx_courses(course_key) - for ccx_course in ccx_courses: - ccx_course_key = str(ccx_course.locator) - sink.dump(ccx_course_key) + ccx_courses = get_ccx_courses(course_key) + for ccx_course in ccx_courses: + ccx_course_key = str(ccx_course.locator) + sink.dump(ccx_course_key) @shared_task @@ -56,9 +53,5 @@ def dump_data_to_clickhouse( """ Sink = getattr(import_module(sink_module), sink_name) - if Sink.is_enabled(): - sink = Sink(connection_overrides=connection_overrides, log=celery_log) - sink.dump(object_id) - return "Dumped" - - return "Disabled" + sink = Sink(connection_overrides=connection_overrides, log=celery_log) + sink.dump(object_id)