Skip to content

Commit

Permalink
feat: Enumeration Management Group
Browse files Browse the repository at this point in the history
  • Loading branch information
sgfeniex committed Nov 9, 2024
1 parent 43ee844 commit 0e3949e
Show file tree
Hide file tree
Showing 4 changed files with 267 additions and 0 deletions.
3 changes: 3 additions & 0 deletions docs/enumeration_management.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Enumeration Management

::: smp.enumeration_management
123 changes: 123 additions & 0 deletions smp/enumeration_management.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
"""The Simple Management Protocol (SMP) Enumeration Management group."""

from __future__ import annotations

from enum import IntEnum, unique
from typing import List, Tuple

from pydantic import BaseModel, ConfigDict

import smp.error as smperr
import smp.header as smphdr
import smp.message as smpmsg


class GroupCountRequest(smpmsg.ReadRequest):
"""Read the number of SMP server groups."""

_GROUP_ID = smphdr.GroupId.ENUM_MANAGEMENT
_COMMAND_ID = smphdr.CommandId.EnumManagement.GROUP_COUNT


class GroupCountResponse(smpmsg.ReadResponse):
"""SMP group count response."""

_GROUP_ID = smphdr.GroupId.ENUM_MANAGEMENT
_COMMAND_ID = smphdr.CommandId.EnumManagement.GROUP_COUNT

count: int


class ListOfGroupsRequest(smpmsg.ReadRequest):
"""List the available SMP groups."""

_GROUP_ID = smphdr.GroupId.ENUM_MANAGEMENT
_COMMAND_ID = smphdr.CommandId.EnumManagement.LIST_OF_GROUPS


class ListOfGroupsResponse(smpmsg.ReadResponse):
"""SMP group list response."""

_GROUP_ID = smphdr.GroupId.ENUM_MANAGEMENT
_COMMAND_ID = smphdr.CommandId.EnumManagement.LIST_OF_GROUPS

groups: Tuple[int, ...]


class GroupIdRequest(smpmsg.ReadRequest):
"""List a SMP group by index."""

_GROUP_ID = smphdr.GroupId.ENUM_MANAGEMENT
_COMMAND_ID = smphdr.CommandId.EnumManagement.GROUP_ID

index: int | None = None


class GroupIdResponse(smpmsg.ReadResponse):
"""SMP group at index response."""

_GROUP_ID = smphdr.GroupId.ENUM_MANAGEMENT
_COMMAND_ID = smphdr.CommandId.EnumManagement.GROUP_ID

group: int
end: bool | None = None


class GroupDetailsRequest(smpmsg.ReadRequest):
"""List a SMP group by index."""

_GROUP_ID = smphdr.GroupId.ENUM_MANAGEMENT
_COMMAND_ID = smphdr.CommandId.EnumManagement.GROUP_DETAILS

groups: Tuple[int, ...]


class GroupDetails(BaseModel):
"""Group Details"""

model_config = ConfigDict(extra="forbid", frozen=True)

id: int
name: str | None = None
handlers: int | None = None


class GroupDetailsResponse(smpmsg.ReadResponse):
"""SMP group details response."""

_GROUP_ID = smphdr.GroupId.ENUM_MANAGEMENT
_COMMAND_ID = smphdr.CommandId.EnumManagement.GROUP_DETAILS

groups: Tuple[GroupDetails, ...]


@unique
class ENUM_MGMT_ERR(IntEnum):
"""Return codes for the enumeration management group."""

OK = 0
"""No error, this is implied if there is no ret value in the response."""

UNKNOWN = 1
"""Unknown error occurred."""

ERR_TOO_MANY_GROUP_ENTRIES = 2
"""Too many entries were provided."""

ERR_INSUFFICIENT_HEAP_FOR_ENTRIES = 3
"""Insufficient heap memory to store entry data."""

ENUM_MGMT_ERR_INDEX_TOO_LARGE = 4
"""Provided index is larger than the number of supported groups."""


class EnumManagementErrorV1(smperr.ErrorV1):
"""Error response to a enumeration management command."""

_GROUP_ID = smphdr.GroupId.ENUM_MANAGEMENT


class EnumManagementErrorV2(smperr.ErrorV2[ENUM_MGMT_ERR]):
"""Error response to a enumeration management command."""

_GROUP_ID = smphdr.GroupId.ENUM_MANAGEMENT
8 changes: 8 additions & 0 deletions smp/header.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,13 @@ class FileManagement(IntEnum):
SUPPORTED_FILE_HASH_CHECKSUM_TYPES = 3
FILE_CLOSE = 4

@unique
class EnumManagement(IntEnum):
GROUP_COUNT = 0
LIST_OF_GROUPS = 1
GROUP_ID = 2
GROUP_DETAILS = 3

@unique
class ZephyrManagement(IntEnum):
ERASE_STORAGE = 0
Expand All @@ -80,6 +87,7 @@ class GroupId(IntEnum):
TEST_CRASH = 7
FILE_MANAGEMENT = 8
SHELL_MANAGEMENT = 9
ENUM_MANAGEMENT = 10
ZEPHYR_MANAGEMENT = 63


Expand Down
133 changes: 133 additions & 0 deletions tests/test_enumeration_management.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
"""Test the SMP Enumeration Management group."""

from __future__ import annotations

from typing import Any, Dict, Type, TypeVar

import cbor2
import pytest
from pydantic import BaseModel

from smp import enumeration_management as smpenum
from smp import header as smphdr
from smp import message as smpmsg
from tests.helpers import make_assert_header

T = TypeVar("T", bound=smpmsg._MessageBase)


def _do_test(
msg: Type[T],
op: smphdr.OP,
command_id: smphdr.CommandId.EnumManagement,
data: Dict[str, Any],
nested_model: Type[BaseModel] | None = None,
) -> T:
cbor = cbor2.dumps(data, canonical=True)
assert_header = make_assert_header(smphdr.GroupId.ENUM_MANAGEMENT, op, command_id, len(cbor))

def _assert_common(r: smpmsg._MessageBase) -> None:
assert_header(r)
for k, v in data.items():
if type(v) is tuple and nested_model is not None:
for v2 in v:
assert v2 == nested_model(**v2).model_dump()
else:
assert v == getattr(r, k)
assert cbor == r.BYTES[8:]

r = msg(**data)

_assert_common(r) # serialize
_assert_common(msg.loads(r.BYTES)) # deserialize

return r


def test_GroupCountRequest() -> None:
_do_test(
smpenum.GroupCountRequest,
smphdr.OP.READ,
smphdr.CommandId.EnumManagement.GROUP_COUNT,
{},
)


def test_GroupCountResponse() -> None:
r = _do_test(
smpenum.GroupCountResponse,
smphdr.OP.READ_RSP,
smphdr.CommandId.EnumManagement.GROUP_COUNT,
{"count": 2},
)
assert r.count == 2


def test_ListOfGroupsRequest() -> None:
_do_test(
smpenum.ListOfGroupsRequest,
smphdr.OP.READ,
smphdr.CommandId.EnumManagement.LIST_OF_GROUPS,
{},
)


def test_ListOfGroupsResponse() -> None:
r = _do_test(
smpenum.ListOfGroupsResponse,
smphdr.OP.READ_RSP,
smphdr.CommandId.EnumManagement.LIST_OF_GROUPS,
{"groups": (2, 5, 15)},
)
assert r.groups == (2, 5, 15)


@pytest.mark.parametrize("index", [0, 1, None])
def test_GroupIdRequest(index: int | None) -> None:
_do_test(
smpenum.GroupIdRequest,
smphdr.OP.READ,
smphdr.CommandId.EnumManagement.GROUP_ID,
{"index": index},
)


def test_GroupIdResponse() -> None:
r = _do_test(
smpenum.GroupIdResponse,
smphdr.OP.READ_RSP,
smphdr.CommandId.EnumManagement.GROUP_ID,
{"group": 2},
)
assert r.group == 2
assert not r.end


def test_GroupDetailsRequest() -> None:
_do_test(
smpenum.GroupDetailsRequest,
smphdr.OP.READ,
smphdr.CommandId.EnumManagement.GROUP_DETAILS,
{"groups": (2, 5, 15)},
)


def test_GroupDetailsResponse() -> None:
r = _do_test(
smpenum.GroupDetailsResponse,
smphdr.OP.READ_RSP,
smphdr.CommandId.EnumManagement.GROUP_DETAILS,
{
"groups": (
{"id": 2, "name": "group2", "handlers": 2},
{"id": 5, "name": "group5", "handlers": 5},
{"id": 15, "name": "group15", "handlers": 15},
)
},
nested_model=smpenum.GroupDetails,
)
assert r.groups == (
smpenum.GroupDetails(id=2, name="group2", handlers=2),
smpenum.GroupDetails(id=5, name="group5", handlers=5),
smpenum.GroupDetails(id=15, name="group15", handlers=15),
)

0 comments on commit 0e3949e

Please sign in to comment.