Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add sd/sc to allow setting date/time in Click PLC #43

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 140 additions & 8 deletions clickplc/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class ClickPLC(AsyncioModbusClient):
'c': 'bool', # (C)ontrol relay
't': 'bool', # (T)imer
'ct': 'bool', # (C)oun(t)er
'sc': 'bool', # (S)ystem (c)ontrol relays
'ds': 'int16', # (D)ata register (s)ingle
'dd': 'int32', # (D)ata register, (d)ouble
'dh': 'int16', # (D)ata register, (h)ex
Expand Down Expand Up @@ -112,7 +113,7 @@ async def get(self, address: str | None = None) -> dict:
category, start_index = start[:i].lower(), int(start[i:])
end_index = None if end is None else int(end[i:])

if end_index is not None and end_index < start_index:
if end_index is not None and end_index <= start_index:
raise ValueError("End address must be greater than start address.")
if category not in self.data_types:
raise ValueError(f"{category} currently unsupported.")
Expand Down Expand Up @@ -255,7 +256,7 @@ async def _get_c(self, start: int, end: int | None) -> dict | bool:
"""Read C addresses. Called by `get`.

C entries start at 16384 (16385 in the Click software's 1-indexed
notation). This continues for 2000 bits, ending at 18383.
notation) and span a total of 2000 bits, ending at 18383.

The response always returns a full byte of data. If you request
a number of addresses not divisible by 8, it will have extra data. The
Expand All @@ -279,7 +280,7 @@ async def _get_t(self, start: int, end: int | None) -> dict | bool:
"""Read T addresses.

T entries start at 45056 (45057 in the Click software's 1-indexed
notation). This continues for 500 bits, ending at 45555.
notation) and span a total of 500 bits, ending at 45555.

The response always returns a full byte of data. If you request
a number of addresses not divisible by 8, it will have extra data. The
Expand All @@ -304,7 +305,7 @@ async def _get_ct(self, start: int, end: int | None) -> dict | bool:
"""Read CT addresses.

CT entries start at 49152 (49153 in the Click software's 1-indexed
notation). This continues for 250 bits, ending at 49402.
notation) and span a total of 250 bits, ending at 49401.

The response always returns a full byte of data. If you request
a number of addresses not divisible by 8, it will have extra data. The
Expand All @@ -326,6 +327,39 @@ async def _get_ct(self, start: int, end: int | None) -> dict | bool:
coils = await self.read_coils(start_coil, count)
return {f'ct{(start + i)}': bit for i, bit in enumerate(coils.bits) if i < count}

async def _get_sc(self, start: int, end: int | None) -> dict | bool:
"""Read SC addresses. Called by `get`.

SC entries start at 61440 (61441 in the Click software's 1-indexed
notation) and span a total of 1000 bits, ending at 62439.

Args:
start: Starting SC address (1-indexed as per ClickPLC).
end: Optional ending SC address (inclusive, 1-indexed).

Returns:
A dictionary of SC values if `end` is provided, or a single bool
value if `end` is None.

Raises:
ValueError: If the start or end address is out of range or invalid.
"""
if start < 1 or start > 1000:
raise ValueError('SC start address must be in [1, 1000]')
if end is not None and (end <= start or end > 1000):
raise ValueError("SC end address must be >= start and <= 1000.")

start_coil = 61440 + (start - 1) # Modbus coil address for SC
if end is None:
# Read a single coil
return (await self.read_coils(start_coil, 1)).bits[0]

end_coil = 61440 + (end - 1)
count = end_coil - start_coil + 1
coils = await self.read_coils(start_coil, count)
return {f'sc{start + i}': bit for i, bit in enumerate(coils.bits) if i < count}


async def _get_ds(self, start: int, end: int | None) -> dict | int:
"""Read DS registers. Called by `get`.

Expand Down Expand Up @@ -459,10 +493,10 @@ async def _get_sd(self, start: int, end: int | None) -> dict | int:
SD entries start at Modbus address 361440 (361441 in the Click software's
1-indexed notation). Each SD entry takes 16 bits.
"""
if start < 1 or start > 4500:
raise ValueError('SD must be in [1, 4500]')
if end is not None and (end < 1 or end > 4500):
raise ValueError('SD end must be in [1, 4500]')
if start < 1 or start > 1000:
raise ValueError('SD must be in [1, 1000]')
if end is not None and (end < 1 or end > 1000):
raise ValueError('SD end must be in [1, 1000]')

address = 61440 + start - 1
count = 1 if end is None else (end - start + 1)
Expand Down Expand Up @@ -559,6 +593,48 @@ async def _set_c(self, start: int, data: list[bool] | bool):
else:
await self.write_coil(coil, data)

async def _set_sc(self, start: int, data: list[bool] | bool):
"""Set SC addresses. Called by `set`.

SC entries start at 61440 (61441 in the Click software's 1-indexed
notation). This continues for 1000 bits.

Args:
start: Starting SC address (1-indexed as per ClickPLC).
data: Single value or list of values to set.

Raises:
ValueError: If the start address is out of range or is not writable,
or if the data list exceeds the allowed writable range.

Notes:
Only the following SC addresses are writable:
SC50, SC51, SC53, SC55, SC60, SC61, SC65, SC66, SC67, SC75, SC76, SC120, SC121.
"""
writable_sc_addresses = {
50, 51, 53, 55, 60, 61, 65, 66, 67, 75, 76, 120, 121
}

if start < 1 or start > 1000:
raise ValueError('SC start address must be in [1, 1000]')
if isinstance(data, list):
if len(data) > 1000 - start + 1:
raise ValueError('Data list longer than available SC addresses.')
for i in range(len(data)):
if (start + i) not in writable_sc_addresses:
raise ValueError(f"SC{start + i} is not writable.")
else:
if start not in writable_sc_addresses:
raise ValueError(f"SC{start} is not writable.")

coil = 61440 + (start - 1)

if isinstance(data, list):
await self.write_coils(coil, data)
else:
await self.write_coil(coil, data)


async def _set_df(self, start: int, data: list[float] | float):
"""Set DF registers. Called by `set`.

Expand Down Expand Up @@ -686,6 +762,62 @@ def _pack(values: list[int]):
else:
await self.write_register(address, _pack([data]), skip_encode=True)

async def _set_sd(self, start: int, data: list[int] | int):
"""Set writable SD registers. Called by `set`.

SD entries start at Modbus address 61440 (61441 in the Click software's
1-indexed notation). Each SD entry takes 16 bits.

Args:
start: Starting SD address (1-indexed as per ClickPLC).
data: Single value or list of values to set.

Raises:
ValueError: If an address is not writable or if data list exceeds the
allowed writable range.

Notes:
Only the following SD addresses are writable:
SD29, SD31, SD32, SD34, SD35, SD36, SD40, SD41, SD42, SD50,
SD51, SD60, SD61, SD106, SD107, SD108, SD112, SD113, SD114,
SD140, SD141, SD142, SD143, SD144, SD145, SD146, SD147,
SD214, SD215
"""
writable_sd_addresses = {
29, 31, 32, 34, 35, 36, 40, 41, 42, 50, 51, 60, 61, 106, 107, 108,
112, 113, 114, 140, 141, 142, 143, 144, 145, 146, 147, 214, 215
}

def validate_address(address: int):
if address not in writable_sd_addresses:
raise ValueError(f"SD{address} is not writable. Only specific SD registers are writable.")

if isinstance(data, list):
for idx, value in enumerate(data):
validate_address(start + idx)
else:
validate_address(start)

address = 61440 + (start - 1)

def _pack(values: list[int]):
builder = BinaryPayloadBuilder(byteorder=self.bigendian,
wordorder=self.lilendian)
for value in values:
builder.add_16bit_int(int(value))
return builder.build()

if isinstance(data, list):
if len(data) > len(writable_sd_addresses):
raise ValueError('Data list contains more elements than writable SD registers.')
payload = _pack(data)
await self.write_registers(address, payload, skip_encode=True)
else:
payload = _pack([data])
await self.write_register(address, payload, skip_encode=True)



async def _set_txt(self, start: int, data: str | list[str]):
"""Set TXT registers. Called by `set`.

Expand Down
116 changes: 116 additions & 0 deletions clickplc/tests/test_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,32 @@ async def test_c_roundtrip(plc_driver):
await plc_driver.set('c2000', True)
assert await plc_driver.get('c2000') is True

@pytest.mark.asyncio(loop_scope='session')
async def test_sc_roundtrip(plc_driver):
"""Confirm writable SC bools are read back correctly after being set."""
# Test writing to SC50 (_PLC_Mode_Change_to_STOP) to stop PLC mode
await plc_driver.set('sc50', True)
assert await plc_driver.get('sc50') is True

# Test writing to SC60 and SC61 (_BT_Disable_Pairing, _BT_Activate_Pairing) to
# manage Bluetooth pairing
await plc_driver.set('sc60', [True, False])
expected = {'sc60': True, 'sc61': False}
assert expected == await plc_driver.get('sc60-sc61')

# Test writing to SC120 (_Network_Time_Request) to start an NTP request
await plc_driver.set('sc120', True)
assert await plc_driver.get('sc120') is True

# Test error handling for non-writable SC62 (_BT_Paired_Devices)
with pytest.raises(ValueError, match="SC62 is not writable"):
await plc_driver.set('sc62', True)

# Test error handling for non-writable SC63 (_BT_Pairing_SW_State)
with pytest.raises(ValueError, match="SC63 is not writable"):
await plc_driver.set('sc63', False)


@pytest.mark.asyncio(loop_scope='session')
async def test_ds_roundtrip(plc_driver):
"""Confirm ds ints are read back correctly after being set."""
Expand Down Expand Up @@ -145,6 +171,35 @@ async def test_dh_roundtrip(plc_driver):
await plc_driver.set('dh500', 500)
assert await plc_driver.get('dh500') == 500

@pytest.mark.asyncio(loop_scope='session')
async def test_sd_roundtrip(plc_driver):
"""Confirm writable SD ints are read back correctly after being set."""
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's split out the roundtrips for sd29 and sd35 into a separate test that fully sets the time, since that's what you actually care about.

At some point I expect there will be explicit functions for each of the 'special' SD registers.

# Test writing to SD29 (_RTC_New_Year) to adjust the RTC year (requires SC53
# trigger)
await plc_driver.set('sd29', 2024)
assert await plc_driver.get('sd29') == 2024

# Test writing to SD34, SD35, SD36 (_RTC_New_Hour, _RTC_New_Minute, _RTC_New_Second)
# to adjust RTC time (requires SC55 trigger)
await plc_driver.set('sd34', 12)
await plc_driver.set('sd35', [30, 45]) # Minute = 30, Second = 45
expected = {'sd34': 12, 'sd35': 30, 'sd36': 45}
assert expected == await plc_driver.get('sd34-sd36')

# Test writing to SD112 (_EIP_Con2_LostCount) to reset lost packets counter for
# Ethernet/IP Connection 2
await plc_driver.set('sd112', 0)
assert await plc_driver.get('sd112') == 0

# Test error handling for non-writable SD62 (_BT_Paired_Device_Count)
with pytest.raises(ValueError, match="SD62 is not writable"):
await plc_driver.set('sd62', 5)

# Test error handling for non-writable SD63 (_SD_Total_Memory_L)
with pytest.raises(ValueError, match="SD63 is not writable"):
await plc_driver.set('sd63', 500)


@pytest.mark.asyncio(loop_scope='session')
async def test_txt_roundtrip(plc_driver):
"""Confirm texts are read back correctly after being set."""
Expand Down Expand Up @@ -215,6 +270,39 @@ async def test_c_error_handling(plc_driver):
with pytest.raises(ValueError, match=r'Data list longer than available addresses.'):
await plc_driver.set('c2000', [True, True])

@pytest.mark.asyncio(loop_scope='session')
async def test_sc_error_handling(plc_driver):
"""Ensure errors are handled for invalid requests of SC registers."""
# Test valid boundary
await plc_driver.set('sc50', True) # Valid writable address
assert await plc_driver.get('sc50') is True

# Test invalid boundary (below range)
with pytest.raises(ValueError, match=r'SC start address must be in \[1, 1000\]'):
await plc_driver.set('sc0', True) # Below valid range

# Test invalid boundary (above range)
with pytest.raises(ValueError, match=r'SC start address must be in \[1, 1000\]'):
await plc_driver.set('sc1001', True) # Above valid range

# Test valid read-only SC
with pytest.raises(ValueError, match=r"SC62 is not writable."):
await plc_driver.set('sc62', True) # Read-only SC

# Test end address below start address
with pytest.raises(ValueError, match=r'End address must be greater than start address.'):
await plc_driver.get('sc100-sc50') # End address less than start address

# Test invalid range crossing writable boundaries
with pytest.raises(ValueError, match=r'SC52 is not writable.'):
# Range includes non-writable SC
await plc_driver.set('sc50', [True, True, True, True])

# Test data type mismatch
with pytest.raises(ValueError, match=r"Expected sc50 as a bool."):
await plc_driver.set('sc50', 123) # SC expects a bool value


@pytest.mark.asyncio(loop_scope='session')
async def test_t_error_handling(plc_driver):
"""Ensure errors are handled for invalid requests of t registers."""
Expand Down Expand Up @@ -295,6 +383,34 @@ async def test_ctd_error_handling(plc_driver):
with pytest.raises(ValueError, match=r'CTD end must be in \[1, 250\]'):
await plc_driver.get('ctd1-ctd251')

@pytest.mark.asyncio(loop_scope='session')
async def test_sd_error_handling(plc_driver):
"""Ensure errors are handled for invalid requests of SD registers."""
# Test out-of-range addresses
with pytest.raises(ValueError, match=r'SD must be in \[1, 1000\]'):
await plc_driver.get('sd1001') # Above valid range
with pytest.raises(ValueError, match=r'SD end must be in \[1, 1000\]'):
await plc_driver.get('sd1-sd1001') # Range includes invalid end address
with pytest.raises(ValueError, match=r'SD1001 is not writable. Only specific SD registers are writable.'):
await plc_driver.set('sd1001', 1) # Above valid range

# Test read-only boundaries
with pytest.raises(ValueError, match=r'SD62 is not writable.'):
await plc_driver.set('sd62', 1) # Read-only SD register
with pytest.raises(ValueError, match=r'SD63 is not writable.'):
await plc_driver.set('sd63', 1) # Read-only SD register

# Test type mismatch
with pytest.raises(ValueError, match=r'Expected sd29 as a int.'):
await plc_driver.set('sd29', 'string') # SD expects an integer value
with pytest.raises(ValueError, match=r'Expected sd29 as a int.'):
await plc_driver.set('sd29', [1, 'string']) # SD expects all integers

# Test valid writable SD
await plc_driver.set('sd29', 2024) # Valid writable address
assert await plc_driver.get('sd29') == 2024


@pytest.mark.asyncio(loop_scope='session')
@pytest.mark.parametrize('prefix', ['y', 'c'])
async def test_bool_typechecking(plc_driver, prefix):
Expand Down