From 05d0d3a323f05f5b84e6ed297b0f73f54f37be3a Mon Sep 17 00:00:00 2001 From: Ryan Bales Date: Thu, 21 Nov 2024 11:23:36 -0500 Subject: [PATCH] ignoring unchanged updates in entity sync --- entity/sync.py | 178 +++++++++++++++++++++++----------- entity/tests/sync_tests.py | 2 +- entity/version.py | 2 +- release_notes.md | 2 + requirements/requirements.txt | 1 + 5 files changed, 125 insertions(+), 60 deletions(-) diff --git a/entity/sync.py b/entity/sync.py index 6326977..4b7f1b7 100644 --- a/entity/sync.py +++ b/entity/sync.py @@ -2,21 +2,25 @@ Provides functions for syncing entities and their relationships to the Entity and EntityRelationship tables. """ +from __future__ import annotations import logging +from collections import defaultdict from time import sleep +from typing import TYPE_CHECKING +import manager_utils +import pgbulk import wrapt -from collections import defaultdict - from activatable_model import model_activations_changed from django import db from django.contrib.contenttypes.models import ContentType -import manager_utils -from django.db import transaction, connection +from django.db import connection, transaction from entity.config import entity_registry -from entity.models import Entity, EntityRelationship, EntityKind +from entity.models import Entity, EntityKind, EntityRelationship +if TYPE_CHECKING: + from django.db.models import QuerySet LOG = logging.getLogger(__name__) @@ -249,7 +253,7 @@ def sync_entities_watching(instance): sync_entities(*model_objs) -class EntitySyncer(object): +class EntitySyncer: """ A class that will handle the syncing of entities """ @@ -270,8 +274,6 @@ def sync(self): LOG.debug('sync_entities') LOG.debug(self.model_objs) - # Determine if we are syncing all - sync_all = not self.model_objs model_objs_map = { (ContentType.objects.get_for_model(model_obj, for_concrete_model=False), model_obj.id): model_obj for model_obj in self.model_objs @@ -301,12 +303,17 @@ def sync(self): # For each ctype, obtain super entities. This is a dict keyed on ctype. Each value # is a dict keyed on the ctype of the super entity with a list of tuples for # IDs of sub/super entity relationships - super_entities_by_ctype = _get_super_entities_by_ctype(model_objs_by_ctype, model_ids_to_sync, sync_all) + super_entities_by_ctype = _get_super_entities_by_ctype(model_objs_by_ctype, model_ids_to_sync, self.sync_all) # Now that we have all models we need to sync, fetch them so that we can extract # metadata and entity kinds. If we are syncing all entities, we've already fetched # everything and can fill in this data struct without doing another DB hit - model_objs_to_sync = _get_model_objs_to_sync(model_ids_to_sync, model_objs_map, model_objs_by_ctype, sync_all) + model_objs_to_sync = _get_model_objs_to_sync( + model_ids_to_sync, + model_objs_map, + model_objs_by_ctype, + self.sync_all, + ) # Obtain all entity kind tuples associated with the models entity_kind_tuples_to_sync = set() @@ -384,16 +391,9 @@ def sync(self): if (ctype.id, model_obj.id) in entities_map ] - if self.sync_all: - # If we're syncing everything, just sync against the entire entity relationship - # table instead of doing a complex __in query - sync_against = EntityRelationship.objects.all() - else: - sync_against = EntityRelationship.objects.filter(sub_entity_id__in=original_entity_ids) - # Sync the relations self.upsert_entity_relationships( - queryset=sync_against, + original_entity_ids, entity_relationships=entity_relationships_to_sync ) @@ -437,14 +437,12 @@ def upsert_entity_kinds(self, entity_kinds): list(EntityKind.all_objects.all().order_by('id').select_for_update().values_list('id', flat=True)) # Upsert the entity kinds - upserted_enitity_kinds = manager_utils.bulk_upsert( - queryset=EntityKind.all_objects.filter( - name__in=[entity_kind.name for entity_kind in changed_entity_kinds] - ), - model_objs=changed_entity_kinds, - unique_fields=['name'], - update_fields=['display_name'], - return_upserts=True + upserted_enitity_kinds = pgbulk.upsert( + EntityKind, + changed_entity_kinds, + ['name'], + ['display_name'], + returning=True, ) # Return all the entity kinds @@ -504,24 +502,34 @@ def upsert_entities(self, entities, sync=False): for entity in initial_queryset.values_list('id', 'is_active') } - # Sync all the entities if the sync flag is passed - if sync: - upserted_entities = manager_utils.sync( - queryset=initial_queryset, - model_objs=entities, - unique_fields=['entity_type_id', 'entity_id'], - update_fields=['entity_kind_id', 'entity_meta', 'display_name', 'is_active'], - return_upserts=True - ) - # Otherwise we want to upsert our entities - else: - upserted_entities = manager_utils.bulk_upsert( - queryset=initial_queryset, - model_objs=entities, - unique_fields=['entity_type_id', 'entity_id'], - update_fields=['entity_kind_id', 'entity_meta', 'display_name', 'is_active'], - return_upserts=True - ) + # Upsert entities + pgbulk.upsert( + queryset=initial_queryset, + model_objs=entities, + unique_fields=['entity_type_id', 'entity_id'], + update_fields=['entity_kind_id', 'entity_meta', 'display_name', 'is_active'], + ignore_unchanged=True, + ) + + upserted_entities = [] + if entities: + upserted_entities = list(Entity.all_objects.extra( + where=['(entity_type_id, entity_id) IN %s'], + params=[tuple( + (entity.entity_type_id, entity.entity_id) + for entity in entities + )] + )) + + # Delete unreferenced entities if sync=True + if sync: + models_to_delete = [ + model + for model in initial_queryset + if model.id not in [entity.id for entity in upserted_entities] + ] + if models_to_delete: + initial_queryset.exclude(id__in=[entity.id for entity in upserted_entities]).delete() # Compute the current state of the entities current_entity_activation_state = { @@ -550,29 +558,83 @@ def upsert_entities(self, entities, sync=False): return upserted_entities, changed_entity_activation_state @transaction_atomic_with_retry() - def upsert_entity_relationships(self, queryset, entity_relationships): + def upsert_entity_relationships(self, original_entity_ids, entity_relationships): """ - Upsert entity relationships to the database - :param queryset: The base queryset to use + Sync entity relationships to the database + + :param queryset: The ids of the original entity objects that are being sync'd :param entity_relationships: The entity relationships to ensure exist in the database """ + initial_queryset = self._get_entity_relationships_to_sync(original_entity_ids) + initial_queryset_ids = None # Select the relationships for update if entity_relationships: - list(queryset.order_by('id').select_for_update().values_list( - 'id', - flat=True - )) + initial_queryset_ids = list( + initial_queryset.order_by('id').select_for_update().values_list( + 'id', + flat=True + ) + ) - # Sync the relationships - return manager_utils.sync( - queryset=queryset, - model_objs=entity_relationships, - unique_fields=['sub_entity_id', 'super_entity_id'], - update_fields=[], - return_upserts=True + # Upsert the relationships + pgbulk.upsert( + initial_queryset, + entity_relationships, + ['sub_entity_id', 'super_entity_id'], + ignore_unchanged=True, ) + syncd_relationships = [] + syncd_relationship_ids = [] + if entity_relationships: + # Get the new and updated relationships that were upserted + syncd_relationships = list( + EntityRelationship.objects.extra( + where=['(sub_entity_id, super_entity_id) IN %s'], + params=[tuple( + (relationship.sub_entity_id, relationship.super_entity_id) + for relationship in entity_relationships + )] + ) + ) + syncd_relationship_ids = [relationship.id for relationship in syncd_relationships] + + # Determine whether there are relationships that should be deleted + # ---------------------------------------------------------------- + # If there were relationships explicitly provided, then we selected the initial set for update + # and have a reference to those ids. We can simply compare the updated/sync'd set of ids to + # determine if there are old relationships that have to be deleted + if entity_relationships: + relationships_to_delete = [ + relationship_id for relationship_id in initial_queryset_ids + if relationship_id not in syncd_relationship_ids + ] + EntityRelationship.objects.filter(id__in=relationships_to_delete).delete() + # Else, just exclude the sync'd ids from the initial queryset to delete + else: + self._get_entity_relationships_to_sync(original_entity_ids).exclude( + id__in=syncd_relationship_ids + ).delete() + + return syncd_relationships + + def _get_entity_relationships_to_sync(self, original_entity_ids) -> QuerySet: + """ + Given the calling context and the ids of the entities this process was originally syncing, + return the entity relationship queryset that should be replaced/sync'd with an updated set of relationships + + :param original_entity_ids: The list of the entities originally meant to be sync'd by this process + """ + queryset = EntityRelationship.objects.filter(sub_entity_id__in=original_entity_ids) + + if self.sync_all: + # If we're syncing everything, just sync against the entire entity relationship + # table instead of doing a complex __in query + queryset = EntityRelationship.objects.all() + + return queryset + def send_entity_activation_events(self, changed_entity_activation_state): """ Given a changed entity state dict, fire the appropriate signals diff --git a/entity/tests/sync_tests.py b/entity/tests/sync_tests.py index 7435b7b..3074882 100644 --- a/entity/tests/sync_tests.py +++ b/entity/tests/sync_tests.py @@ -996,7 +996,7 @@ def test_sync_all_optimal_queries(self): with patch('entity.sync.entity_registry') as mock_entity_registry: mock_entity_registry.entity_registry = new_registry.entity_registry ContentType.objects.clear_cache() - with self.assertNumQueries(20): + with self.assertNumQueries(22): sync_entities() self.assertEqual(Entity.objects.filter(entity_type=ContentType.objects.get_for_model(Account)).count(), 5) diff --git a/entity/version.py b/entity/version.py index 3fec2bf..89d98bf 100644 --- a/entity/version.py +++ b/entity/version.py @@ -1 +1 @@ -__version__ = '6.2.3' +__version__ = '6.3.0' diff --git a/release_notes.md b/release_notes.md index 400493c..eea05c9 100644 --- a/release_notes.md +++ b/release_notes.md @@ -1,5 +1,7 @@ ## Release Notes +- 6.3.0: + - Ignore no-op updates when syncing entities and entity relationships in order to avoid downstream cost - 6.2.3: - Update the `defer_entity_syncing` decorator to support an optional handler. - 6.2.2: diff --git a/requirements/requirements.txt b/requirements/requirements.txt index 3f16bd4..68d1547 100644 --- a/requirements/requirements.txt +++ b/requirements/requirements.txt @@ -1,5 +1,6 @@ Django>=3.2 django-activatable-model>=3.1.0 +django-pgbulk==3.2.0 django-manager-utils>=3.1.0 python3-utils>=0.3 wrapt>=1.10.5