Skip to content

Commit

Permalink
ignoring unchanged updates in entity sync
Browse files Browse the repository at this point in the history
  • Loading branch information
thinkt4nk committed Nov 21, 2024
1 parent 68be2ad commit 05d0d3a
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 60 deletions.
178 changes: 120 additions & 58 deletions entity/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,25 @@
Provides functions for syncing entities and their relationships to the
Entity and EntityRelationship tables.
"""
from __future__ import annotations
import logging
from collections import defaultdict
from time import sleep
from typing import TYPE_CHECKING

import manager_utils
import pgbulk
import wrapt
from collections import defaultdict

from activatable_model import model_activations_changed
from django import db
from django.contrib.contenttypes.models import ContentType
import manager_utils
from django.db import transaction, connection
from django.db import connection, transaction

from entity.config import entity_registry
from entity.models import Entity, EntityRelationship, EntityKind
from entity.models import Entity, EntityKind, EntityRelationship

if TYPE_CHECKING:
from django.db.models import QuerySet

LOG = logging.getLogger(__name__)

Expand Down Expand Up @@ -249,7 +253,7 @@ def sync_entities_watching(instance):
sync_entities(*model_objs)


class EntitySyncer(object):
class EntitySyncer:
"""
A class that will handle the syncing of entities
"""
Expand All @@ -270,8 +274,6 @@ def sync(self):
LOG.debug('sync_entities')
LOG.debug(self.model_objs)

# Determine if we are syncing all
sync_all = not self.model_objs
model_objs_map = {
(ContentType.objects.get_for_model(model_obj, for_concrete_model=False), model_obj.id): model_obj
for model_obj in self.model_objs
Expand Down Expand Up @@ -301,12 +303,17 @@ def sync(self):
# For each ctype, obtain super entities. This is a dict keyed on ctype. Each value
# is a dict keyed on the ctype of the super entity with a list of tuples for
# IDs of sub/super entity relationships
super_entities_by_ctype = _get_super_entities_by_ctype(model_objs_by_ctype, model_ids_to_sync, sync_all)
super_entities_by_ctype = _get_super_entities_by_ctype(model_objs_by_ctype, model_ids_to_sync, self.sync_all)

# Now that we have all models we need to sync, fetch them so that we can extract
# metadata and entity kinds. If we are syncing all entities, we've already fetched
# everything and can fill in this data struct without doing another DB hit
model_objs_to_sync = _get_model_objs_to_sync(model_ids_to_sync, model_objs_map, model_objs_by_ctype, sync_all)
model_objs_to_sync = _get_model_objs_to_sync(
model_ids_to_sync,
model_objs_map,
model_objs_by_ctype,
self.sync_all,
)

# Obtain all entity kind tuples associated with the models
entity_kind_tuples_to_sync = set()
Expand Down Expand Up @@ -384,16 +391,9 @@ def sync(self):
if (ctype.id, model_obj.id) in entities_map
]

if self.sync_all:
# If we're syncing everything, just sync against the entire entity relationship
# table instead of doing a complex __in query
sync_against = EntityRelationship.objects.all()
else:
sync_against = EntityRelationship.objects.filter(sub_entity_id__in=original_entity_ids)

# Sync the relations
self.upsert_entity_relationships(
queryset=sync_against,
original_entity_ids,
entity_relationships=entity_relationships_to_sync
)

Expand Down Expand Up @@ -437,14 +437,12 @@ def upsert_entity_kinds(self, entity_kinds):
list(EntityKind.all_objects.all().order_by('id').select_for_update().values_list('id', flat=True))

# Upsert the entity kinds
upserted_enitity_kinds = manager_utils.bulk_upsert(
queryset=EntityKind.all_objects.filter(
name__in=[entity_kind.name for entity_kind in changed_entity_kinds]
),
model_objs=changed_entity_kinds,
unique_fields=['name'],
update_fields=['display_name'],
return_upserts=True
upserted_enitity_kinds = pgbulk.upsert(
EntityKind,
changed_entity_kinds,
['name'],
['display_name'],
returning=True,
)

# Return all the entity kinds
Expand Down Expand Up @@ -504,24 +502,34 @@ def upsert_entities(self, entities, sync=False):
for entity in initial_queryset.values_list('id', 'is_active')
}

# Sync all the entities if the sync flag is passed
if sync:
upserted_entities = manager_utils.sync(
queryset=initial_queryset,
model_objs=entities,
unique_fields=['entity_type_id', 'entity_id'],
update_fields=['entity_kind_id', 'entity_meta', 'display_name', 'is_active'],
return_upserts=True
)
# Otherwise we want to upsert our entities
else:
upserted_entities = manager_utils.bulk_upsert(
queryset=initial_queryset,
model_objs=entities,
unique_fields=['entity_type_id', 'entity_id'],
update_fields=['entity_kind_id', 'entity_meta', 'display_name', 'is_active'],
return_upserts=True
)
# Upsert entities
pgbulk.upsert(
queryset=initial_queryset,
model_objs=entities,
unique_fields=['entity_type_id', 'entity_id'],
update_fields=['entity_kind_id', 'entity_meta', 'display_name', 'is_active'],
ignore_unchanged=True,
)

upserted_entities = []
if entities:
upserted_entities = list(Entity.all_objects.extra(
where=['(entity_type_id, entity_id) IN %s'],
params=[tuple(
(entity.entity_type_id, entity.entity_id)
for entity in entities
)]
))

# Delete unreferenced entities if sync=True
if sync:
models_to_delete = [
model
for model in initial_queryset
if model.id not in [entity.id for entity in upserted_entities]
]
if models_to_delete:
initial_queryset.exclude(id__in=[entity.id for entity in upserted_entities]).delete()

# Compute the current state of the entities
current_entity_activation_state = {
Expand Down Expand Up @@ -550,29 +558,83 @@ def upsert_entities(self, entities, sync=False):
return upserted_entities, changed_entity_activation_state

@transaction_atomic_with_retry()
def upsert_entity_relationships(self, queryset, entity_relationships):
def upsert_entity_relationships(self, original_entity_ids, entity_relationships):
"""
Upsert entity relationships to the database
:param queryset: The base queryset to use
Sync entity relationships to the database
:param original_entity_ids: The ids of the original entity objects that are being sync'd
:param entity_relationships: The entity relationships to ensure exist in the database
"""
initial_queryset = self._get_entity_relationships_to_sync(original_entity_ids)

initial_queryset_ids = None
# Select the relationships for update
if entity_relationships:
list(queryset.order_by('id').select_for_update().values_list(
'id',
flat=True
))
initial_queryset_ids = list(
initial_queryset.order_by('id').select_for_update().values_list(
'id',
flat=True
)
)

# Sync the relationships
return manager_utils.sync(
queryset=queryset,
model_objs=entity_relationships,
unique_fields=['sub_entity_id', 'super_entity_id'],
update_fields=[],
return_upserts=True
# Upsert the relationships
pgbulk.upsert(
initial_queryset,
entity_relationships,
['sub_entity_id', 'super_entity_id'],
ignore_unchanged=True,
)

syncd_relationships = []
syncd_relationship_ids = []
if entity_relationships:
# Get the new and updated relationships that were upserted
syncd_relationships = list(
EntityRelationship.objects.extra(
where=['(sub_entity_id, super_entity_id) IN %s'],
params=[tuple(
(relationship.sub_entity_id, relationship.super_entity_id)
for relationship in entity_relationships
)]
)
)
syncd_relationship_ids = [relationship.id for relationship in syncd_relationships]

# Determine whether there are relationships that should be deleted
# ----------------------------------------------------------------
# If there were relationships explicitly provided, then we selected the initial set for update
# and have a reference to those ids. We can simply compare the updated/sync'd set of ids to
# determine if there are old relationships that have to be deleted
if entity_relationships:
relationships_to_delete = [
relationship_id for relationship_id in initial_queryset_ids
if relationship_id not in syncd_relationship_ids
]
EntityRelationship.objects.filter(id__in=relationships_to_delete).delete()
# Else, just exclude the sync'd ids from the initial queryset to delete
else:
self._get_entity_relationships_to_sync(original_entity_ids).exclude(
id__in=syncd_relationship_ids
).delete()

return syncd_relationships

def _get_entity_relationships_to_sync(self, original_entity_ids) -> QuerySet:
    """
    Build the queryset of existing entity relationships that this sync should replace.

    :param original_entity_ids: The ids of the entities this process was originally meant to sync
    :return: All entity relationships when ``self.sync_all`` is truthy, otherwise only the
        relationships whose ``sub_entity_id`` is in ``original_entity_ids``
    """
    relationships = EntityRelationship.objects
    scoped_to_originals = relationships.filter(sub_entity_id__in=original_entity_ids)

    if not self.sync_all:
        return scoped_to_originals

    # When syncing everything, compare against the whole entity relationship
    # table instead of issuing a complex __in query
    return relationships.all()

def send_entity_activation_events(self, changed_entity_activation_state):
"""
Given a changed entity state dict, fire the appropriate signals
Expand Down
2 changes: 1 addition & 1 deletion entity/tests/sync_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -996,7 +996,7 @@ def test_sync_all_optimal_queries(self):
with patch('entity.sync.entity_registry') as mock_entity_registry:
mock_entity_registry.entity_registry = new_registry.entity_registry
ContentType.objects.clear_cache()
with self.assertNumQueries(20):
with self.assertNumQueries(22):
sync_entities()

self.assertEqual(Entity.objects.filter(entity_type=ContentType.objects.get_for_model(Account)).count(), 5)
Expand Down
2 changes: 1 addition & 1 deletion entity/version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '6.2.3'
__version__ = '6.3.0'
2 changes: 2 additions & 0 deletions release_notes.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
## Release Notes

- 6.3.0:
- Ignore no-op updates when syncing entities and entity relationships in order to avoid downstream cost
- 6.2.3:
- Update the `defer_entity_syncing` decorator to support an optional handler.
- 6.2.2:
Expand Down
1 change: 1 addition & 0 deletions requirements/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
Django>=3.2
django-activatable-model>=3.1.0
django-pgbulk==3.2.0
django-manager-utils>=3.1.0
python3-utils>=0.3
wrapt>=1.10.5

0 comments on commit 05d0d3a

Please sign in to comment.