Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LITE-31232 Support for bulk_relate_cqrs_serialization #160

Merged
merged 4 commits into from
Oct 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -228,8 +228,8 @@ Unit testing

Run tests with various RDBMS:
- `cd integration_tests`
- `DB=postgres docker-compose -f docker-compose.yml -f rdbms.yml run app_test`
- `DB=mysql docker-compose -f docker-compose.yml -f rdbms.yml run app_test`
- `DB=postgres docker compose -f docker-compose.yml -f rdbms.yml run app_test`
- `DB=mysql docker compose -f docker-compose.yml -f rdbms.yml run app_test`

Check code style: `flake8`
Run tests: `pytest`
Expand All @@ -244,6 +244,6 @@ To generate HTML coverage reports use:

Integrational testing
------
1. docker-compose
1. docker compose
2. `cd integration_tests`
3. `docker-compose run master`
3. `docker compose run master`
14 changes: 11 additions & 3 deletions dj_cqrs/mixins.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright © 2023 Ingram Micro Inc. All rights reserved.
# Copyright © 2024 Ingram Micro Inc. All rights reserved.

import logging

Expand All @@ -20,6 +20,7 @@
from dj_cqrs.managers import MasterManager, ReplicaManager
from dj_cqrs.metas import MasterMeta, ReplicaMeta
from dj_cqrs.signals import MasterSignals, post_bulk_create, post_update
from dj_cqrs.state import cqrs_state


logger = logging.getLogger('django-cqrs')
Expand Down Expand Up @@ -292,9 +293,16 @@ def _class_serialization(self, using, sync=False):
if sync:
instance = self
else:
instance = None
db = using if using is not None else self._state.db
qs = self.__class__._default_manager.using(db)
instance = self.relate_cqrs_serialization(qs).get(pk=self.pk)

bulk_relate_cm = cqrs_state.bulk_relate_cm
if bulk_relate_cm:
instance = bulk_relate_cm.get_cached_instance(self, db)

if not instance:
qs = self.__class__._default_manager.using(db)
instance = self.relate_cqrs_serialization(qs).get(pk=self.pk)

data = self._cqrs_serializer_cls(instance).data
data['cqrs_revision'] = instance.cqrs_revision
Expand Down
7 changes: 6 additions & 1 deletion dj_cqrs/signals.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright © 2023 Ingram Micro Inc. All rights reserved.
# Copyright © 2024 Ingram Micro Inc. All rights reserved.

import logging

Expand All @@ -9,6 +9,7 @@
from dj_cqrs.constants import SignalType
from dj_cqrs.controller import producer
from dj_cqrs.dataclasses import TransportPayload
from dj_cqrs.state import cqrs_state
from dj_cqrs.utils import get_message_expiration_dt


Expand Down Expand Up @@ -64,6 +65,10 @@ def post_save(cls, sender, **kwargs):

using = kwargs['using']

bulk_relate_cm = cqrs_state.bulk_relate_cm
if bulk_relate_cm:
bulk_relate_cm.register(instance, using)

sync = kwargs.get('sync', False)
queue = kwargs.get('queue', None)

Expand Down
7 changes: 7 additions & 0 deletions dj_cqrs/state.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Copyright © 2024 Ingram Micro Inc. All rights reserved.

import threading


cqrs_state = threading.local()
cqrs_state.bulk_relate_cm = None
62 changes: 61 additions & 1 deletion dj_cqrs/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# Copyright © 2023 Ingram Micro Inc. All rights reserved.
# Copyright © 2024 Ingram Micro Inc. All rights reserved.

import logging
from collections import defaultdict
from contextlib import ContextDecorator
from datetime import date, datetime, timedelta
from uuid import UUID

Expand All @@ -10,6 +12,7 @@

from dj_cqrs.constants import DB_VENDOR_PG, SUPPORTED_TIMEOUT_DB_VENDORS
from dj_cqrs.logger import install_last_query_capturer
from dj_cqrs.state import cqrs_state


logger = logging.getLogger('django-cqrs')
Expand Down Expand Up @@ -80,3 +83,60 @@ def apply_query_timeouts(model_cls): # pragma: no cover
cursor.execute(statement, params=(query_timeout,))

install_last_query_capturer(model_cls)


class _BulkRelateCM(ContextDecorator):
def __init__(self, cqrs_id=None):
self._cqrs_id = cqrs_id
self._mapping = defaultdict(lambda: defaultdict(set))
self._cache = {}

def register(self, instance, using=None):
instance_cqrs_id = getattr(instance, 'CQRS_ID', None)
if (not instance_cqrs_id) or (self._cqrs_id and instance_cqrs_id != self._cqrs_id):
return

self._mapping[instance_cqrs_id][using].add(instance.pk)

def get_cached_instance(self, instance, using=None):
instance_cqrs_id = getattr(instance, 'CQRS_ID', None)
if (not instance_cqrs_id) or (self._cqrs_id and instance_cqrs_id != self._cqrs_id):
return

instance_pk = instance.pk
cached_instances = self._cache.get(instance_cqrs_id, {}).get(using, {})
if cached_instances:
return cached_instances.get(instance_pk)

cached_pks = self._mapping[instance_cqrs_id][using]
if not cached_pks:
return

qs = instance.__class__._default_manager.using(using)
instances_cache = {
instance.pk: instance
for instance in instance.__class__.relate_cqrs_serialization(qs)
.filter(
pk__in=cached_pks,
)
.order_by()
.all()
}
self._cache.update(
{
instance_cqrs_id: {
using: instances_cache,
},
}
)
return instances_cache.get(instance_pk)

def __enter__(self):
cqrs_state.bulk_relate_cm = self

def __exit__(self, exc_type, exc_val, exc_tb):
cqrs_state.bulk_relate_cm = None


def bulk_relate_cqrs_serialization(cqrs_id=None):
return _BulkRelateCM(cqrs_id=cqrs_id)
12 changes: 6 additions & 6 deletions examples/demo_project/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ It's a simple demo project contains 2 services:
## Start project:

```
docker-compose up -d db_pgsql db_mysql
docker-compose run master ./manage.py migrate
docker-compose run replica ./manage.py migrate
docker-compose up -d
docker-compose run master ./manage.py cqrs_sync --cqrs-id=user -f={}
docker-compose run master ./manage.py cqrs_sync --cqrs-id=product -f={}
docker compose up -d db_pgsql db_mysql
docker compose run master ./manage.py migrate
docker compose run replica ./manage.py migrate
docker compose up -d
docker compose run master ./manage.py cqrs_sync --cqrs-id=user -f={}
docker compose run master ./manage.py cqrs_sync --cqrs-id=product -f={}
```

It starts master WEB app on [http://127.0.0.1:8000](http://127.0.0.1:8000) and replica on [http://127.0.0.1:8001](http://127.0.0.1:8001)
Expand Down
22 changes: 11 additions & 11 deletions integration_tests/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,40 +3,40 @@
.DEFAULT_GOAL := pika

build:
docker-compose build
docker compose build

build_master_v1:
docker-compose -f docker-compose.yml -f masterV1.yml build
docker compose -f docker-compose.yml -f masterV1.yml build

build_replica_v1:
docker-compose -f docker-compose.yml -f replicaV1.yml build
docker compose -f docker-compose.yml -f replicaV1.yml build

pika: build
@echo "Run PIKA integration tests..."
docker-compose run master
docker compose run master
@echo "Stopping running containers..."
docker-compose down --remove-orphans
docker compose down --remove-orphans
@echo "Done!"

kombu: build
@echo "Run KOMBU integration tests..."
docker-compose -f docker-compose.yml -f kombu.yml run master
docker compose -f docker-compose.yml -f kombu.yml run master
@echo "Stopping running containers..."
docker-compose -f docker-compose.yml -f kombu.yml down --remove-orphans
docker compose -f docker-compose.yml -f kombu.yml down --remove-orphans
@echo "Done!"

master_v1: build_master_v1
@echo "Run regression tests Master v1.3.1..."
docker-compose -f docker-compose.yml -f masterV1.yml run master
docker compose -f docker-compose.yml -f masterV1.yml run master
@echo "Stopping running containers..."
docker-compose -f docker-compose.yml -f masterV1.yml down --remove-orphans
docker compose -f docker-compose.yml -f masterV1.yml down --remove-orphans
@echo "Done!"

replica_v1: build_replica_v1
@echo "Run regression tests Replica v1.3.1..."
docker-compose -f docker-compose.yml -f replicaV1.yml run master
docker compose -f docker-compose.yml -f replicaV1.yml run master
@echo "Stopping running containers..."
docker-compose -f docker-compose.yml -f replicaV1.yml down --remove-orphans
docker compose -f docker-compose.yml -f replicaV1.yml down --remove-orphans
@echo "Done!"

all: pika kombu master_v1 replica_v1
31 changes: 30 additions & 1 deletion tests/test_master/test_signals.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright © 2023 Ingram Micro Inc. All rights reserved.
# Copyright © 2024 Ingram Micro Inc. All rights reserved.

from datetime import datetime, timezone

Expand All @@ -8,6 +8,7 @@

from dj_cqrs.constants import SignalType
from dj_cqrs.signals import post_bulk_create, post_update
from dj_cqrs.utils import bulk_relate_cqrs_serialization
from tests.dj_master import models
from tests.utils import assert_is_sub_dict, assert_publisher_once_called_with_args

Expand Down Expand Up @@ -127,6 +128,34 @@ def test_manual_post_bulk_create(mocker):
assert publisher_mock.call_count == 3


@pytest.mark.django_db(transaction=True)
@pytest.mark.parametrize('count', (1, 3, 5))
def test_bulk_relate_cqrs_serialization(
django_assert_num_queries,
django_v_trans_q_count_sup,
mocker,
count,
settings,
):
mocker.patch('dj_cqrs.controller.producer.produce')

if settings.DB_ENGINE == 'sqlite' and django_v_trans_q_count_sup == 0:
suppl = 1
else:
suppl = django_v_trans_q_count_sup

opt_query_count = count + 2 + suppl
with django_assert_num_queries(opt_query_count):
with bulk_relate_cqrs_serialization():
with transaction.atomic(savepoint=False):
[models.Author.objects.create(id=i) for i in range(count)]

not_opt_query_count = count + count * 2 + suppl
with django_assert_num_queries(not_opt_query_count):
with transaction.atomic(savepoint=False):
[models.Author.objects.create(id=10 + i) for i in range(count)]


@pytest.mark.django_db(transaction=True)
def test_automatic_post_bulk_create(mocker):
publisher_mock = mocker.patch('dj_cqrs.controller.producer.produce')
Expand Down
Loading
Loading