Skip to content

Commit

Permalink
Simplify reading a single coil
Browse files Browse the repository at this point in the history
  • Loading branch information
alexrudd2 committed Aug 12, 2024
1 parent 377ff2c commit bab69f3
Showing 1 changed file with 29 additions and 39 deletions.
68 changes: 29 additions & 39 deletions clickplc/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,18 +180,16 @@ async def _get_x(self, start: int, end: int | None) -> dict:

start_coil = 32 * (start // 100) + start % 100 - 1
if end is None:
count = 1
else:
if end % 100 == 0 or end % 100 > 16:
raise ValueError('X end address must be *01-*16.')
if end < 1 or end > 816:
raise ValueError('X end address must be in [001, 816].')
end_coil = 32 * (end // 100) + end % 100 - 1
count = end_coil - start_coil + 1
coils = await self.read_coils(start_coil, 1)
return coils.bits[0]

if end % 100 == 0 or end % 100 > 16:
raise ValueError('X end address must be *01-*16.')
if end < 1 or end > 816:
raise ValueError('X end address must be in [001, 816].')
end_coil = 32 * (end // 100) + end % 100 - 1
count = end_coil - start_coil + 1
coils = await self.read_coils(start_coil, count)
if count == 1:
return coils.bits[0]
output = {}
current = start
for bit in coils.bits:
Expand Down Expand Up @@ -231,18 +229,16 @@ async def _get_y(self, start: int, end: int | None) -> dict:

start_coil = 8192 + 32 * (start // 100) + start % 100 - 1
if end is None:
count = 1
else:
if end % 100 == 0 or end % 100 > 16:
raise ValueError('Y end address must be *01-*16.')
if end < 1 or end > 816:
raise ValueError('Y end address must be in [001, 816].')
end_coil = 8192 + 32 * (end // 100) + end % 100 - 1
count = end_coil - start_coil + 1
coils = await self.read_coils(start_coil, 1)
return coils.bits[0]

if end % 100 == 0 or end % 100 > 16:
raise ValueError('Y end address must be *01-*16.')
if end < 1 or end > 816:
raise ValueError('Y end address must be in [001, 816].')
end_coil = 8192 + 32 * (end // 100) + end % 100 - 1
count = end_coil - start_coil + 1
coils = await self.read_coils(start_coil, count)
if count == 1:
return coils.bits[0]
output = {}
current = start
for bit in coils.bits:
Expand Down Expand Up @@ -270,16 +266,13 @@ async def _get_c(self, start: int, end: int | None) -> dict | bool:

start_coil = 16384 + start - 1
if end is None:
count = 1
else:
if end <= start or end > 2000:
raise ValueError('C end address must be >start and <=2000.')
end_coil = 16384 + end - 1
count = end_coil - start_coil + 1
return (await self.read_coils(start_coil, 1)).bits[0]

if end <= start or end > 2000:
raise ValueError('C end address must be >start and <=2000.')
end_coil = 16384 + end - 1
count = end_coil - start_coil + 1
coils = await self.read_coils(start_coil, count)
if count == 1:
return coils.bits[0]
return {f'c{(start + i)}': bit for i, bit in enumerate(coils.bits) if i < count}

async def _get_t(self, start: int, end: int | None) -> dict | bool:
Expand All @@ -297,16 +290,14 @@ async def _get_t(self, start: int, end: int | None) -> dict | bool:

start_coil = 45057 + start - 1
if end is None:
count = 1
else:
if end <= start or end > 500:
raise ValueError('T end address must be >start and <=500.')
end_coil = 14555 + end - 1
count = end_coil - start_coil + 1
coils = await self.read_coils(start_coil, 1)
return coils.bits[0]

if end <= start or end > 500:
raise ValueError('T end address must be >start and <=500.')
end_coil = 14555 + end - 1
count = end_coil - start_coil + 1
coils = await self.read_coils(start_coil, count)
if count == 1:
return coils.bits[0]
return {f't{(start + i)}': bit for i, bit in enumerate(coils.bits) if i < count}

async def _get_ct(self, start: int, end: int | None) -> dict | bool:
Expand All @@ -324,16 +315,15 @@ async def _get_ct(self, start: int, end: int | None) -> dict | bool:

start_coil = 49152 + start - 1
if end is None:
count = 1
coils = await self.read_coils(start_coil, 1)
return coils.bits[0]
else:
if end <= start or end > 250:
raise ValueError('CT end address must be >start and <=250.')
end_coil = 49401 + end - 1
count = end_coil - start_coil + 1

coils = await self.read_coils(start_coil, count)
if count == 1:
return coils.bits[0]
return {f'ct{(start + i)}': bit for i, bit in enumerate(coils.bits) if i < count}

async def _get_ds(self, start: int, end: int | None) -> dict | int:
Expand Down

0 comments on commit bab69f3

Please sign in to comment.