Skip to content

Commit

Permalink
Merge pull request #476 from winged/feat_convenience_methods
Browse files Browse the repository at this point in the history
feat(models): (re)introduce some convenience methods for working with scope trees
  • Loading branch information
winged authored Jun 12, 2024
2 parents e0c80f2 + b96014f commit 538b4df
Show file tree
Hide file tree
Showing 6 changed files with 223 additions and 40 deletions.
59 changes: 58 additions & 1 deletion emeis/core/models.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import operator
import unicodedata
import uuid
from functools import reduce

from django.conf import settings
from django.contrib.auth.models import AbstractBaseUser, UserManager
Expand Down Expand Up @@ -158,6 +160,58 @@ def is_authenticated(self):
return True


class ScopeQuerySet(TreeQuerySet):
# django-tree-queries sadly does not (yet?) support ancestors query
# for QS - only for single nodes. So we're providing all_descendants()
# and all_ancestors() queryset methods.

def all_descendants(self, include_self=False):
"""Return a QS that contains all descendants of the given QS.
This is a workaround for django-tree-queries, which currently does
not support this query (it can only do it on single nodes).
This is in contrast to .descendants(), which can only give the descendants
of one model instance.
"""
descendants_q = reduce(
operator.or_,
[
models.Q(
pk__in=entry.descendants(include_self=include_self).values("pk")
)
for entry in self
],
models.Q(),
)
return self.model.objects.filter(descendants_q)

def all_ancestors(self, include_self=False):
"""Return a QS that contains all ancestors of the given QS.
This is a workaround for django-tree-queries, which currently does
not support this query (it can only do it on single nodes).
This is in contrast to .ancestors(), which can only give the ancestors
of one model instance.
"""

descendants_q = reduce(
operator.or_,
[
models.Q(pk__in=entry.ancestors(include_self=include_self).values("pk"))
for entry in self
],
models.Q(),
)
return self.model.objects.filter(descendants_q)

def all_roots(self):
return Scope.objects.all().filter(
pk__in=[scope.ancestors(include_self=True).first().pk for scope in self]
)


class Scope(TreeNode, UUIDModel):
name = LocalizedCharField(_("scope name"), blank=False, null=False, required=False)

Expand All @@ -170,7 +224,10 @@ class Scope(TreeNode, UUIDModel):
)
is_active = models.BooleanField(default=True)

objects = TreeQuerySet.as_manager(with_tree_fields=True)
objects = ScopeQuerySet.as_manager(with_tree_fields=True)

def get_root(self):
return self.ancestors(include_self=True).first()

def save(self, *args, **kwargs):
# django-tree-queries does validation in TreeNode.clean(), which is not
Expand Down
8 changes: 7 additions & 1 deletion emeis/core/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,13 @@ def get_level(self, obj):
# Sometimes, the model object may come out of a non-django-tree-queries
# QS, and thus would not have the `tree_*` attributes amended. Then we
# need to go the "slow path"
return obj.ancestors().count()
if not obj.pk and obj.parent_id:
# unsaved object, sometimes used in unit tests etc
return self.get_level(obj.parent) + 1

if obj.parent_id:
return obj.ancestors().count()
return 0

class Meta:
model = Scope
Expand Down
37 changes: 0 additions & 37 deletions emeis/core/tests/snapshots/snap_test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,43 +229,6 @@
SELECT (__tree.tree_depth) AS "tree_depth", (__tree.tree_path) AS "tree_path", (__tree.tree_ordering) AS "tree_ordering", "emeis_core_scope"."parent_id", "emeis_core_scope"."created_at", "emeis_core_scope"."modified_at", "emeis_core_scope"."created_by_user_id", "emeis_core_scope"."metainfo", "emeis_core_scope"."id", "emeis_core_scope"."name", "emeis_core_scope"."full_name", "emeis_core_scope"."description", "emeis_core_scope"."is_active" FROM "emeis_core_scope" , "__tree" WHERE ("emeis_core_scope"."parent_id" = \'f561aaf6-ef0b-f14d-4208-bb46a4ccb3ad\'::uuid AND (__tree.tree_pk = emeis_core_scope.id)) ORDER BY ("__tree".tree_ordering) ASC""",
"""INSERT INTO "emeis_core_scope" ("parent_id", "created_at", "modified_at", "created_by_user_id", "metainfo", "id", "name", "full_name", "description", "is_active") VALUES (NULL, \'2017-05-21T00:00:00+00:00\'::timestamptz, \'2017-05-21T00:00:00+00:00\'::timestamptz, \'9336ebf2-5087-d91c-818e-e6e9ec29f8c1\'::uuid, \'{}\', \'f561aaf6-ef0b-f14d-4208-bb46a4ccb3ad\'::uuid, hstore(ARRAY[\'en\',\'de\',\'fr\'], ARRAY[\'Pamela Horton\',\'\',\'\']), hstore(ARRAY[\'en\',\'de\',\'fr\'], ARRAY[\'Pamela Horton\',\'Pamela Horton\',\'Pamela Horton\']), hstore(ARRAY[\'en\',\'de\',\'fr\'], ARRAY[\'Effort meet relationship far. Option program interesting station. First where during teach country talk across.
Argue move appear catch toward help wind. Material minute ago get.','','']), true)""",
"""
WITH RECURSIVE __rank_table(
"id",
"parent_id",
"rank_order"
) AS (
SELECT "emeis_core_scope"."id", "emeis_core_scope"."parent_id", ROW_NUMBER() OVER (ORDER BY "emeis_core_scope"."name" ASC) AS "rank_order" FROM "emeis_core_scope"
),
__tree (
"tree_depth",
"tree_path",
"tree_ordering",
"tree_pk"
) AS (
SELECT
0,
array[T.id],
array[T.rank_order],
T."id"
FROM __rank_table T
WHERE T."parent_id" IS NULL
UNION ALL
SELECT
__tree.tree_depth + 1,
__tree.tree_path || T.id,
__tree.tree_ordering || T.rank_order,
T."id"
FROM __rank_table T
JOIN __tree ON T."parent_id" = __tree.tree_pk
)
SELECT (__tree.tree_depth) AS "tree_depth", (__tree.tree_path) AS "tree_path", (__tree.tree_ordering) AS "tree_ordering", "emeis_core_scope"."parent_id", "emeis_core_scope"."created_at", "emeis_core_scope"."modified_at", "emeis_core_scope"."created_by_user_id", "emeis_core_scope"."metainfo", "emeis_core_scope"."id", "emeis_core_scope"."name", "emeis_core_scope"."full_name", "emeis_core_scope"."description", "emeis_core_scope"."is_active" FROM "emeis_core_scope" , "__tree" WHERE ("emeis_core_scope"."id" = \'f561aaf6-ef0b-f14d-4208-bb46a4ccb3ad\'::uuid AND (__tree.tree_pk = emeis_core_scope.id)) ORDER BY ("__tree".tree_ordering) ASC LIMIT 21""",
],
"request": {
"CONTENT_LENGTH": "614",
Expand Down
4 changes: 3 additions & 1 deletion emeis/core/tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,9 @@ def test_api_detail(fixture, admin_client, viewset, snapshot):


@pytest.mark.freeze_time("2017-05-21")
def test_api_create(transactional_db, fixture, admin_client, viewset, snapshot):
def test_api_create(
transactional_db, deterministic_uuids, fixture, admin_client, viewset, snapshot
):
url = reverse("{0}-list".format(viewset.base_name))

serializer = viewset.serializer_class(fixture)
Expand Down
133 changes: 133 additions & 0 deletions emeis/core/tests/test_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,3 +149,136 @@ def test_update_full_name_of_child(db, scope_factory):

grandchild.refresh_from_db()
assert str(grandchild.full_name) == "r » s » c » g"


@pytest.fixture
def simple_tree_structure(db, scope_factory):
# root1
# - sub1sub1
# - sub1sub1sub1
# - sub1sub1sub2
# - sub1sub2
# root2
# - sub2sub1
# - sub2sub2
root1 = scope_factory(name="root1")
root2 = scope_factory(name="root2")
sub1sub1 = scope_factory(parent=root1, name="sub1sub1")
sub1sub2 = scope_factory(parent=root1, name="sub1sub2")
sub1sub1sub1 = scope_factory(parent=sub1sub1, name="sub1sub1sub1")
sub1sub1sub2 = scope_factory(parent=sub1sub1, name="sub1sub1sub2")

sub2sub1 = scope_factory(parent=root2, name="sub2sub1")
sub2sub2 = scope_factory(parent=root2, name="sub2sub2")
return {
"root1": root1,
"root2": root2,
"sub1sub1": sub1sub1,
"sub1sub2": sub1sub2,
"sub1sub1sub1": sub1sub1sub1,
"sub1sub1sub2": sub1sub1sub2,
"sub2sub1": sub2sub1,
"sub2sub2": sub2sub2,
}


@pytest.mark.parametrize(
"include_self, expect_count",
[
(True, 5),
(False, 3),
],
)
def test_scope_ancestors(db, simple_tree_structure, include_self, expect_count):
qs = Scope.objects.filter(
pk__in=[
simple_tree_structure["sub2sub2"].pk,
simple_tree_structure["sub1sub1sub2"].pk,
]
)

ancestors_qs = qs.all_ancestors(include_self=include_self)
# the direct and indirect ancestors must be there
assert simple_tree_structure["root2"] in ancestors_qs
assert simple_tree_structure["root1"] in ancestors_qs
assert simple_tree_structure["sub1sub1"] in ancestors_qs

if include_self:
assert simple_tree_structure["sub2sub2"] in ancestors_qs
assert simple_tree_structure["sub1sub1sub2"] in ancestors_qs
else:
assert simple_tree_structure["sub2sub2"] not in ancestors_qs
assert simple_tree_structure["sub1sub1sub2"] not in ancestors_qs

# ... and nothing else
assert ancestors_qs.count() == expect_count


@pytest.mark.parametrize(
"include_self, expect_count",
[
(True, 6),
(False, 4),
],
)
def test_scope_descendants(db, simple_tree_structure, include_self, expect_count):
qs = Scope.objects.filter(
pk__in=[simple_tree_structure["sub1sub1"].pk, simple_tree_structure["root2"].pk]
)

descendants_qs = qs.all_descendants(include_self=include_self)
# the direct and indirect descendants must be there
assert simple_tree_structure["sub1sub1sub1"] in descendants_qs
assert simple_tree_structure["sub1sub1sub2"] in descendants_qs
assert simple_tree_structure["sub2sub1"] in descendants_qs
assert simple_tree_structure["sub2sub2"] in descendants_qs

if include_self:
assert simple_tree_structure["sub1sub1"] in descendants_qs
assert simple_tree_structure["root2"] in descendants_qs
else:
assert simple_tree_structure["sub1sub1"] not in descendants_qs
assert simple_tree_structure["root2"] not in descendants_qs

# ... and nothing else
assert descendants_qs.count() == expect_count


def test_get_root(db, simple_tree_structure):
assert (
simple_tree_structure["sub1sub2"].get_root() == simple_tree_structure["root1"]
)
assert (
simple_tree_structure["sub1sub1"].get_root() == simple_tree_structure["root1"]
)
assert (
simple_tree_structure["sub2sub2"].get_root() == simple_tree_structure["root2"]
)
assert (
simple_tree_structure["sub2sub1"].get_root() == simple_tree_structure["root2"]
)
assert (
simple_tree_structure["sub1sub1sub2"].get_root()
== simple_tree_structure["root1"]
)


def test_all_roots(db, simple_tree_structure):
qs1 = Scope.objects.filter(
pk__in=[
simple_tree_structure["sub1sub1sub1"].pk,
simple_tree_structure["sub1sub2"].pk,
]
).all_roots()
assert qs1.count() == 1
assert qs1.filter(pk=simple_tree_structure["root1"].pk).exists()

qs2 = Scope.objects.filter(
pk__in=[
simple_tree_structure["sub1sub1sub1"].pk,
simple_tree_structure["sub2sub2"].pk,
]
).all_roots()
assert qs2.count() == 2
assert qs2.filter(pk=simple_tree_structure["root1"].pk).exists()
assert qs2.filter(pk=simple_tree_structure["root2"].pk).exists()
22 changes: 22 additions & 0 deletions emeis/core/tests/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
HTTP_405_METHOD_NOT_ALLOWED,
)

from emeis.core.serializers import ScopeSerializer


def test_me_200(db, acl, client):
client.force_authenticate(user=acl.user)
Expand Down Expand Up @@ -332,3 +334,23 @@ def test_sorted_scopes_when_forced_language(
]

assert received_names == ["eins", "zwei"]


@pytest.mark.parametrize(
"has_parent, is_in_db, expect_level",
[
(False, False, 0),
(False, True, 0),
(True, False, 1),
(True, True, 1),
],
)
def test_serializer_level(db, scope_factory, has_parent, is_in_db, expect_level):
scope = scope_factory(parent=scope_factory() if has_parent else None)
if not is_in_db:
scope.pk = None

ser = ScopeSerializer(instance=scope)

level = ser.data["level"]
assert level == expect_level

0 comments on commit 538b4df

Please sign in to comment.