Skip to content

Commit

Permalink
Merge pull request #8 from numat/add_tags
Browse files Browse the repository at this point in the history
Prep for adding tags by adding tests
  • Loading branch information
JamesJeffryes authored May 19, 2020
2 parents 3a868cc + 487dd36 commit 437ba39
Show file tree
Hide file tree
Showing 7 changed files with 300 additions and 28 deletions.
12 changes: 9 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
clickplc
========

Python ≥3.5 driver and command-line tool for [Koyo Ethernet ClickPLCs](https://www.automationdirect.com/adc/Overview/Catalog/Programmable_Controllers/CLICK_Series_PLCs_(Stackable_Micro_Brick)).
Python ≥3.6 driver and command-line tool for [Koyo Ethernet ClickPLCs](https://www.automationdirect.com/adc/Overview/Catalog/Programmable_Controllers/CLICK_Series_PLCs_(Stackable_Micro_Brick)).

<p align="center">
<img src="https://www.automationdirect.com/microsites/clickplcs/images/expandedclick.jpg" />
Expand Down Expand Up @@ -57,5 +57,11 @@ The entire API is `get` and `set`, and takes a range of inputs:
>>> await plc.set('y101', True) # Sets Y101 to true
```

Currently, only X, Y, C, DS, and DF are supported. I personally haven't needed to
use the other categories, but they are straightforward to add if needed.
Currently, only X, Y, C, DS, and DF are supported:
| x | bool | Input point |
| y | bool | Output point |
| c | bool | (C)ontrol relay |
| df | float | (D)ata register, (f)loating point |
| ds | int16 | (D)ata register, (s)igned int |
I personally haven't needed to use the other categories, but they are
straightforward to add if needed.
45 changes: 45 additions & 0 deletions azure-pipelines.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# Python Django
# Test a Django project on multiple versions of Python.
# Add steps that analyze code, save build artifacts, deploy, and more:
# https://docs.microsoft.com/azure/devops/pipelines/languages/python

trigger:
- master

pool:
vmImage: 'ubuntu-latest'
strategy:
matrix:
Python37:
PYTHON_VERSION: '3.7'
maxParallel: 3

steps:
- task: UsePythonVersion@0
inputs:
versionSpec: '$(PYTHON_VERSION)'
architecture: 'x64'

- script: |
python -m pip install --upgrade pip
pip install '.[test]'
displayName: 'Install dependencies'

- script: |
cd clickplc
pytest --junitxml=../reports/test-coverage.xml --cov=. --cov-report=xml
env:
USER_TOKEN: $(USER_TOKEN)
displayName: 'Run tests'

- task: PublishTestResults@2
inputs:
testResultsFiles: 'reports/test-coverage.xml'
testRunTitle: '$(Agent.OS) - $(Build.BuildNumber)[$(Agent.JobName)] - Python $(python.version)'
condition: succeededOrFailed()

- task: PublishCodeCoverageResults@1
inputs:
codeCoverageTool: Cobertura
summaryFileLocation: '$(System.DefaultWorkingDirectory)/**/coverage.xml'
reportDirectory: '$(System.DefaultWorkingDirectory)/**/htmlcov'
2 changes: 1 addition & 1 deletion clickplc/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/usr/bin/python
#!/usr/bin/python3
"""
A Python driver for Koyo ClickPLC ethernet units.
Expand Down
56 changes: 36 additions & 20 deletions clickplc/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@
A Python driver for Koyo ClickPLC ethernet units.
Distributed under the GNU General Public License v2
Copyright (C) 2019 NuMat Technologies
Copyright (C) 2020 NuMat Technologies
"""
import pydoc
from string import digits
from typing import Union, List

from pymodbus.constants import Endian
from pymodbus.payload import BinaryPayloadDecoder, BinaryPayloadBuilder

Expand All @@ -17,9 +21,15 @@ class ClickPLC(AsyncioModbusClient):
abstracting corner cases and providing a simple asynchronous interface.
"""

supported = ['x', 'y', 'c', 'df', 'ds']
data_types = {
'x': 'bool', # Input point
'y': 'bool', # Output point
'c': 'bool', # (C)ontrol relay
'df': 'float', # (D)ata register (f)loating point
'ds': 'int16', # (D)ata register (s)igned int
}

async def get(self, address):
async def get(self, address: str) -> dict:
"""Get variables from the ClickPLC.
Args:
Expand Down Expand Up @@ -47,7 +57,7 @@ async def get(self, address):

if end_index is not None and end_index < start_index:
raise ValueError("End address must be greater than start address.")
if category not in self.supported:
if category not in self.data_types:
raise ValueError("{} currently unsupported.".format(category))
if end is not None and end[:i].lower() != category:
raise ValueError("Inter-category ranges are unsupported.")
Expand All @@ -73,11 +83,17 @@ async def set(self, address, data):

i = address.index(next(s for s in address if s.isdigit()))
category, index = address[:i].lower(), int(address[i:])
if category not in self.supported:
raise ValueError("{} currently unsupported.".format(category))
if category not in self.data_types:
raise ValueError(f"{category} currently unsupported.")
data_type = self.data_types[category].rstrip(digits)
for datum in data:
if type(datum) == int and data_type == 'float':
datum = float(datum)
if type(datum) != pydoc.locate(data_type):
raise ValueError(f"Expected {address} as a {data_type}.")
return await getattr(self, '_set_' + category)(index, data)

async def _get_x(self, start, end):
async def _get_x(self, start: int, end: int) -> dict:
"""Read X addresses. Called by `get`.
X entries start at 0 (1 in the Click software's 1-indexed
Expand Down Expand Up @@ -122,13 +138,13 @@ async def _get_x(self, start, end):
if current > end:
break
elif current % 100 <= 16:
output['x{:03d}'.format(current)] = bit
output[f'x{current:03}'] = bit
elif current % 100 == 32:
current += 100 - 32
current += 1
return output

async def _get_y(self, start, end):
async def _get_y(self, start: int, end: int) -> dict:
"""Read Y addresses. Called by `get`.
Y entries start at 8192 (8193 in the Click software's 1-indexed
Expand Down Expand Up @@ -173,13 +189,13 @@ async def _get_y(self, start, end):
if current > end:
break
elif current % 100 <= 16:
output['y{:03d}'.format(current)] = bit
output[f'y{current:03}'] = bit
elif current % 100 == 32:
current += 100 - 32
current += 1
return output

async def _get_c(self, start, end):
async def _get_c(self, start: int, end: int) -> Union[dict, bool]:
"""Read C addresses. Called by `get`.
C entries start at 16384 (16385 in the Click software's 1-indexed
Expand All @@ -204,9 +220,9 @@ async def _get_c(self, start, end):
coils = await self.read_coils(start_coil, count)
if count == 1:
return coils.bits[0]
return {'c{:d}'.format(start + i): bit for i, bit in enumerate(coils.bits)}
return {f'c{(start + i)}': bit for i, bit in enumerate(coils.bits)}

async def _get_df(self, start, end):
async def _get_df(self, start: int, end: int) -> Union[dict, float]:
"""Read DF registers. Called by `get`.
DF entries start at Modbus address 28672 (28673 in the Click software's
Expand All @@ -228,7 +244,7 @@ async def _get_df(self, start, end):
return decoder.decode_32bit_float()
return {f'df{n}': decoder.decode_32bit_float() for n in range(start, end + 1)}

async def _get_ds(self, start, end):
async def _get_ds(self, start: int, end: int) -> Union[dict, int]:
"""Read DS registers. Called by `get`.
DS entries start at Modbus address 0 (1 in the Click software's
Expand All @@ -249,7 +265,7 @@ async def _get_ds(self, start, end):
return decoder.decode_16bit_int()
return {f'ds{n}': decoder.decode_16bit_int() for n in range(start, end + 1)}

async def _set_x(self, start, data):
async def _set_x(self, start: int, data: Union[List[bool], bool]):
"""Set X addresses. Called by `set`.
For more information on the quirks of X coils, read the `_get_x`
Expand Down Expand Up @@ -277,7 +293,7 @@ async def _set_x(self, start, data):
else:
await self.write_coil(coil, data)

async def _set_y(self, start, data):
async def _set_y(self, start: int, data: Union[List[bool], bool]):
"""Set Y addresses. Called by `set`.
For more information on the quirks of Y coils, read the `_get_y`
Expand Down Expand Up @@ -305,7 +321,7 @@ async def _set_y(self, start, data):
else:
await self.write_coil(coil, data)

async def _set_c(self, start, data):
async def _set_c(self, start: int, data: Union[List[bool], bool]):
"""Set C addresses. Called by `set`.
For more information on the quirks of C coils, read the `_get_c`
Expand All @@ -322,7 +338,7 @@ async def _set_c(self, start, data):
else:
await self.write_coil(coil, data)

async def _set_df(self, start, data):
async def _set_df(self, start: int, data: Union[List[float], float]):
"""Set DF registers. Called by `set`.
The ClickPLC is little endian, but on registers ("words") instead
Expand Down Expand Up @@ -351,13 +367,13 @@ def _pack(value):
else:
await self.write_register(address, _pack(data), skip_encode=True)

async def _set_ds(self, start, data):
async def _set_ds(self, start: int, data: Union[List[int], int]):
"""Set DS registers. Called by `set`.
See _get_ds for more information.
"""
if start < 1 or start > 4500:
raise ValueError('DS must be in [1, 500]')
raise ValueError('DS must be in [1, 4500]')
address = (start - 1)

def _pack(value):
Expand Down
55 changes: 55 additions & 0 deletions clickplc/mock.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
from collections import defaultdict
from unittest.mock import MagicMock

from pymodbus.bit_read_message import ReadCoilsResponse, ReadDiscreteInputsResponse
from pymodbus.bit_write_message import WriteSingleCoilResponse, WriteMultipleCoilsResponse
from pymodbus.register_read_message import ReadHoldingRegistersResponse
from pymodbus.register_write_message import WriteMultipleRegistersResponse

from clickplc.driver import ClickPLC as realClickPLC


class AsyncMock(MagicMock):
"""Magic mock that works with async methods"""
async def __call__(self, *args, **kwargs):
return super(AsyncMock, self).__call__(*args, **kwargs)


class ClickPLC(realClickPLC):
"""A version of the driver with the remote communication replaced with local data storage
for testing"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.client = AsyncMock()
self._coils = defaultdict(bool)
self._discrete_inputs = defaultdict(bool)
self._registers = defaultdict(bytes)

async def _request(self, method, *args, **kwargs):
if method == 'read_coils':
address, count = args
return ReadCoilsResponse([self._coils[address + i] for i in range(count)])
if method == 'read_discrete_inputs':
address, count = args
return ReadDiscreteInputsResponse([self._discrete_inputs[address + i]
for i in range(count)])
elif method == 'read_holding_registers':
address, count = args
return ReadHoldingRegistersResponse([int.from_bytes(self._registers[address + i],
byteorder='big')
for i in range(count)])
elif method == 'write_coil':
address, data = args
self._coils[address] = data
return WriteSingleCoilResponse(address, data)
elif method == 'write_coils':
address, data = args
for i, d in enumerate(data):
self._coils[address + i] = d
return WriteMultipleCoilsResponse(address, data)
elif method == 'write_registers':
address, data = args
for i, d in enumerate(data):
self._registers[address + i] = d
return WriteMultipleRegistersResponse(address, data)
return NotImplementedError(f'Unrecognised method: {method}')
Loading

0 comments on commit 437ba39

Please sign in to comment.