Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cag/patch #98

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 19 additions & 3 deletions platform_plugin_aspects/signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

from platform_plugin_aspects.sinks import (
CourseEnrollmentSink,
CourseOverviewSink,
ExternalIdSink,
ObjectTagSink,
TagSink,
Expand Down Expand Up @@ -36,6 +37,9 @@ def receive_course_publish( # pylint: disable=unused-argument # pragma: no cov
dump_course_to_clickhouse,
)

if not CourseOverviewSink.is_enabled():
return

dump_course_to_clickhouse.delay(str(course_key))


Expand All @@ -51,11 +55,13 @@ def receive_course_enrollment_changed( # pylint: disable=unused-argument # pra

user = kwargs.get("user")
course_id = kwargs.get("course_id")
sink = CourseEnrollmentSink(None, None)

CourseEnrollment = get_model("course_enrollment")
instance = CourseEnrollment.objects.get(user=user, course_id=course_id)
if not CourseEnrollmentSink.is_enabled():
return

sink = CourseEnrollmentSink(None, None)
CourseEnrollment = sink.get_model()
instance = CourseEnrollment.objects.get(user=user, course_id=course_id)

dump_data_to_clickhouse.delay(
sink_module=sink.__module__,
Expand Down Expand Up @@ -90,6 +96,8 @@ def on_user_profile_updated(instance, **kwargs):
object_id=str(instance.id),
)

if not UserProfileSink.is_enabled():
return
transaction.on_commit(lambda: on_user_profile_updated(*args, **kwargs))


Expand Down Expand Up @@ -130,6 +138,8 @@ def on_externalid_saved( # pylint: disable=unused-argument # pragma: no cover
object_id=str(instance.id),
)

if not ExternalIdSink.is_enabled():
return
transaction.on_commit(lambda: on_externalid_saved(*args, **kwargs))


Expand Down Expand Up @@ -188,6 +198,8 @@ def on_tag_saved( # pylint: disable=unused-argument # pragma: no cover
object_id=str(instance.id),
)

if not TagSink.is_enabled():
return
transaction.on_commit(lambda: on_tag_saved(*args, **kwargs))


Expand Down Expand Up @@ -226,6 +238,8 @@ def on_taxonomy_saved( # pylint: disable=unused-argument # pragma: no cover
object_id=str(instance.id),
)

if not TaxonomySink.is_enabled():
return
transaction.on_commit(lambda: on_taxonomy_saved(*args, **kwargs))


Expand Down Expand Up @@ -264,6 +278,8 @@ def on_object_tag_saved(sender, instance, **kwargs): # pragma: no cover

on_object_tag_deleted(sender, instance, **kwargs)

if not ObjectTagSink.is_enabled():
return
transaction.on_commit(lambda: on_object_tag_saved(*args, **kwargs))


Expand Down
13 changes: 11 additions & 2 deletions platform_plugin_aspects/sinks/base_sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import csv
import datetime
import io
import logging
from collections import namedtuple

import requests
Expand Down Expand Up @@ -342,7 +343,7 @@ def is_enabled(cls):
"""
Return True if the sink is enabled, False otherwise
"""
enabled = getattr(
enabled_settings = getattr(
settings,
f"{WAFFLE_FLAG_NAMESPACE.upper()}_{cls.model.upper()}_ENABLED",
False,
Expand All @@ -358,7 +359,15 @@ def is_enabled(cls):
__name__,
)

return enabled or waffle_flag.is_enabled()
enabled = enabled_settings or waffle_flag.is_enabled()

if enabled and not get_model(cls.model):
logging.warning(
f"Event Sink Model {cls.model} is not installed, but is enabled in settings or waffle flag."
)
enabled = False

return enabled

@classmethod
def get_sink_by_model_name(cls, model):
Expand Down
6 changes: 4 additions & 2 deletions platform_plugin_aspects/sinks/tests/test_base_sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,16 +270,18 @@ def test_is_not_enabled_waffle(self, mock_waffle_flag_is_enabled):
mock_waffle_flag_is_enabled.return_value = False
self.assertEqual(self.child_sink.__class__.is_enabled(), False)

@patch("platform_plugin_aspects.sinks.base_sink.get_model")
@patch("platform_plugin_aspects.sinks.base_sink.WaffleFlag.is_enabled")
def test_is_enabled_waffle(self, mock_waffle_flag_is_enabled):
def test_is_enabled_waffle(self, mock_waffle_flag_is_enabled, mock_get_model):
"""
Test that is_enable() returns the correct data.
"""
mock_waffle_flag_is_enabled.return_value = True
self.assertEqual(self.child_sink.__class__.is_enabled(), True)

@override_settings(EVENT_SINK_CLICKHOUSE_CHILD_MODEL_ENABLED=True)
def test_is_enabled(self):
@patch("platform_plugin_aspects.sinks.base_sink.get_model")
def test_is_enabled(self, mock_get_model):
"""
Test that is_enable() returns the correct data.
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
registry=OrderedRegistry
)
@override_settings(EVENT_SINK_CLICKHOUSE_COURSE_OVERVIEW_ENABLED=True)
@patch("platform_plugin_aspects.sinks.base_sink.get_model")
@patch("platform_plugin_aspects.sinks.course_overview_sink.get_tags_for_block")
@patch("platform_plugin_aspects.sinks.CourseOverviewSink.serialize_item")
@patch("platform_plugin_aspects.sinks.CourseOverviewSink.get_model")
Expand All @@ -47,6 +48,7 @@ def test_course_publish_success(
mock_overview,
mock_serialize_item,
mock_get_tags,
mock_get_model,
):
"""
Test of a successful end-to-end run.
Expand Down Expand Up @@ -110,13 +112,19 @@ def test_course_publish_success(
@responses.activate( # pylint: disable=unexpected-keyword-arg,no-value-for-parameter
registry=OrderedRegistry
)
@patch("platform_plugin_aspects.sinks.base_sink.get_model")
@patch("platform_plugin_aspects.sinks.CourseOverviewSink.serialize_item")
@patch("platform_plugin_aspects.sinks.CourseOverviewSink.get_model")
@patch("platform_plugin_aspects.sinks.course_overview_sink.get_detached_xblock_types")
@patch("platform_plugin_aspects.sinks.course_overview_sink.get_modulestore")
# pytest:disable=unused-argument
def test_course_publish_clickhouse_error(
mock_modulestore, mock_detached, mock_overview, mock_serialize_item, caplog
mock_modulestore,
mock_detached,
mock_overview,
mock_serialize_item,
mock_get_model,
caplog,
):
"""
Test the case where a ClickHouse POST fails.
Expand Down
25 changes: 9 additions & 16 deletions platform_plugin_aspects/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,14 @@ def dump_course_to_clickhouse(course_key_string, connection_overrides=None):
connection_overrides (dict): overrides to ClickHouse connection
parameters specified in `settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG`.
"""
if CourseOverviewSink.is_enabled(): # pragma: no cover
course_key = CourseKey.from_string(course_key_string)
sink = CourseOverviewSink(
connection_overrides=connection_overrides, log=celery_log
)
sink.dump(course_key)
course_key = CourseKey.from_string(course_key_string)
sink = CourseOverviewSink(connection_overrides=connection_overrides, log=celery_log)
sink.dump(course_key)

ccx_courses = get_ccx_courses(course_key)
for ccx_course in ccx_courses:
ccx_course_key = str(ccx_course.locator)
sink.dump(ccx_course_key)
ccx_courses = get_ccx_courses(course_key)
for ccx_course in ccx_courses:
ccx_course_key = str(ccx_course.locator)
sink.dump(ccx_course_key)


@shared_task
Expand All @@ -56,9 +53,5 @@ def dump_data_to_clickhouse(
"""
Sink = getattr(import_module(sink_module), sink_name)

if Sink.is_enabled():
sink = Sink(connection_overrides=connection_overrides, log=celery_log)
sink.dump(object_id)
return "Dumped"

return "Disabled"
sink = Sink(connection_overrides=connection_overrides, log=celery_log)
sink.dump(object_id)
Loading