-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Create simple box device for memory input/output (#70)
The envisioned use of this class is where you wish to simulate a network interface but the internal hardware logic is either not needed or very simple. A custom adapter can be made for the network interface and can simply read and write to an IoBox. I wondered if a lot of people would write devices like this and if it would make sense for tickit to provide a simple helper device. Any thoughts welcome. --------- Co-authored-by: Garry O'Donnell <garry.o'[email protected]>
- Loading branch information
1 parent
464663e
commit cd428de
Showing
2 changed files
with
180 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,130 @@ | ||
import logging | ||
from typing import Any, Dict, Generic, List, Tuple, TypeVar | ||
|
||
from typing_extensions import NotRequired | ||
|
||
from tickit.core.components.component import Component, ComponentConfig | ||
from tickit.core.components.device_simulation import DeviceSimulation | ||
from tickit.core.device import Device, DeviceUpdate | ||
from tickit.core.typedefs import SimTime | ||
from tickit.utils.compat.typing_compat import TypedDict | ||
|
||
LOGGER = logging.getLogger(__name__) | ||
|
||
A = TypeVar("A") | ||
V = TypeVar("V") | ||
|
||
|
||
class IoBoxDevice(Device, Generic[A, V]): | ||
""" | ||
A simple device which can take and store key-value pairs from both | ||
network adapters and the ticket graph. | ||
Adapters should write values to the device via device.write(addr, value) | ||
or device[addr] = value. The writes will be pending until the scheduler | ||
interrupts the device. For example: | ||
```python | ||
device[foo] | ||
>> 5 | ||
device[foo] = 6 | ||
device[foo] | ||
>> 5 | ||
interrupt() | ||
device[foo] | ||
>> 6 | ||
``` | ||
Linked devices can send values to be written via inputs and receive changes | ||
via outputs. For example: | ||
```python | ||
update = box.update(SimTime(0), {"updates": [(4, "foo")]}) | ||
assert update.outputs["updates"] == [(4, "foo")] | ||
>> 6 | ||
``` | ||
The two modes of I/O may used independently, optionally and interoperably. | ||
This device is useful for simulating a basic block of memory. | ||
The envisioned use of this class is where you wish to simulate a network | ||
interface but the internal hardware logic is either not needed or very simple. | ||
A custom adapter can be made for the network interface and can simply | ||
read and write to an IoBox. | ||
""" | ||
|
||
#: A typed mapping containing the 'input' input value | ||
Inputs: TypedDict = TypedDict( | ||
"Inputs", {"updates": NotRequired[List[Tuple[Any, Any]]]} | ||
) | ||
#: An empty typed mapping of device outputs | ||
Outputs: TypedDict = TypedDict( | ||
"Outputs", {"updates": NotRequired[List[Tuple[Any, Any]]]} | ||
) | ||
|
||
_memory: Dict[A, V] | ||
_change_buffer: List[Tuple[A, V]] | ||
|
||
def __init__(self) -> None: # noqa: D107 | ||
self._memory = {} | ||
self._change_buffer = [] | ||
|
||
def write(self, addr: A, value: V) -> None: | ||
"""Write a value to an address. | ||
The value will only make it into memory when update() is called | ||
e.g. by an interrupt. | ||
Args: | ||
addr (A): Address to store value | ||
value (V): Value to store | ||
""" | ||
self._change_buffer.append((addr, value)) | ||
|
||
def read(self, addr: A) -> V: | ||
"""Read a value from an address. | ||
Args: | ||
addr (A): Address to find value | ||
Returns: | ||
V: Value at address | ||
Raises: | ||
ValueError: If no value stored at address | ||
""" | ||
return self._memory[addr] | ||
|
||
# As well as read and write, can use device[addr] and device[addr] = "foo" | ||
__getitem__ = read | ||
__setitem__ = write | ||
|
||
def update(self, time: SimTime, inputs: Inputs) -> DeviceUpdate[Outputs]: | ||
"""Write all pending values to their addresses. | ||
Args: | ||
time (SimTime): (Simulated) time at which this is called | ||
inputs (Inputs): Inputs to this device, may contain addresses | ||
and values to update | ||
Returns: | ||
DeviceUpdate[Outputs]: Outputs and update, may contain addresses | ||
and values that have been updated, either via inputs or | ||
an adapter | ||
""" | ||
self._change_buffer += inputs.get("updates", []) | ||
updates = [] | ||
while self._change_buffer: | ||
addr, value = self._change_buffer.pop() | ||
self._memory[addr] = value | ||
updates.append((addr, value)) | ||
return DeviceUpdate(IoBoxDevice.Outputs(updates=updates), None) | ||
|
||
|
||
class IoBox(ComponentConfig): | ||
"""Arbitrary box of key-value pairs.""" | ||
|
||
def __call__(self) -> Component: # noqa: D102 | ||
return DeviceSimulation( | ||
name=self.name, | ||
device=IoBoxDevice(), | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
from typing import Any | ||
|
||
import pytest | ||
|
||
from tickit.core.typedefs import SimTime | ||
from tickit.devices.iobox import IoBoxDevice | ||
|
||
|
||
@pytest.fixture | ||
def box() -> IoBoxDevice[int, Any]: | ||
return IoBoxDevice() | ||
|
||
|
||
def test_raises_error_if_no_values(box: IoBoxDevice[int, Any]) -> None: | ||
with pytest.raises(KeyError): | ||
box.read(4) | ||
|
||
|
||
def test_writes_pending_until_update(box: IoBoxDevice[int, Any]) -> None: | ||
box.write(4, "foo") | ||
box.update(SimTime(0), {}) | ||
assert "foo" == box.read(4) | ||
box.write(4, "bar") | ||
assert "foo" == box.read(4) | ||
box.update(SimTime(0), {}) | ||
assert "bar" == box.read(4) | ||
|
||
|
||
def test_outputs_change(box: IoBoxDevice[int, Any]) -> None: | ||
box.write(4, "foo") | ||
update = box.update(SimTime(0), {}) | ||
assert update.outputs["updates"] == [(4, "foo")] | ||
|
||
|
||
def test_outputs_only_last_changes(box: IoBoxDevice[int, Any]) -> None: | ||
box.write(4, "foo") | ||
box.update(SimTime(0), {}) | ||
box.write(3, "bar") | ||
update = box.update(SimTime(0), {}) | ||
assert update.outputs["updates"] == [(3, "bar")] | ||
|
||
|
||
def test_writes_input(box: IoBoxDevice[int, Any]) -> None: | ||
box.update(SimTime(0), {"updates": [(4, "foo")]}) | ||
assert box.read(4) == "foo" | ||
|
||
|
||
def test_propagates_input(box: IoBoxDevice[int, Any]) -> None: | ||
update = box.update(SimTime(0), {"updates": [(4, "foo")]}) | ||
assert update.outputs["updates"] == [(4, "foo")] |