Skip to content

Commit

Permalink
dynamic device selection
Browse files Browse the repository at this point in the history
  • Loading branch information
benderl committed Oct 12, 2023
1 parent bb0ade5 commit e853c13
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 33 deletions.
12 changes: 4 additions & 8 deletions packages/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,7 @@ def schedule_jobs():
handler = HandlerAlgorithm()
prep = prepare.Prepare()
general_internal_chargepoint_handler = GeneralInternalChargepointHandler()
rfid0 = RfidReader("event0")
rfid1 = RfidReader("event1")
rfid = RfidReader()
event_ev_template = threading.Event()
event_ev_template.set()
event_charge_template = threading.Event()
Expand Down Expand Up @@ -212,12 +211,9 @@ def schedule_jobs():
t_soc = Thread(target=soc.update, args=(), name="SoC")
t_internal_chargepoint = Thread(target=general_internal_chargepoint_handler.handler,
args=(), name="Internal Chargepoint")
if hasattr(rfid0, "input_device"):
t_rfid0 = Thread(target=rfid0.loop, args=(), name="Internal Chargepoint")
t_rfid0.start()
if hasattr(rfid1, "input_device"):
t_rfid1 = Thread(target=rfid1.loop, args=(), name="Internal Chargepoint")
t_rfid1.start()
if rfid.keyboards_detected:
t_rfid = Thread(target=rfid.run, args=(), name="Internal Chargepoint")
t_rfid.start()

t_sub.start()
t_set.start()
Expand Down
152 changes: 127 additions & 25 deletions packages/modules/internal_chargepoint_handler/rfid.py
Original file line number Diff line number Diff line change
@@ -1,43 +1,145 @@
import evdev
import asyncio
import logging
from evdev import InputDevice

from evdev import InputDevice, ecodes, list_devices, categorize
from helpermodules import pub
log = logging.getLogger(__name__)


class RfidReader:
SCAN_CODES = {
0: None, 1: u'ESC', 2: u'1', 3: u'2', 4: u'3', 5: u'4', 6: u'5', 7: u'6', 8: u'7', 9: u'8',
10: u'9', 11: u'0', 12: u'-', 13: u'=', 14: u'BKSP', 15: u'TAB', 16: u'Q', 17: u'W', 18: u'E', 19: u'R',
20: u'T', 21: u'Y', 22: u'U', 23: u'I', 24: u'O', 25: u'P', 26: u'[', 27: u']', 28: u'CRLF', 29: u'LCTRL',
30: u'A', 31: u'S', 32: u'D', 33: u'F', 34: u'G', 35: u'H', 36: u'J', 37: u'K', 38: u'L', 39: u';',
40: u'"', 41: u'`', 42: u'LSHFT', 43: u'\\', 44: u'Z', 45: u'X', 46: u'C', 47: u'V', 48: u'B', 49: u'N',
50: u'M', 51: u',', 52: u'.', 53: u'/', 54: u'RSHFT', 56: u'LALT', 100: u'RALT'
SCAN_CODE_MAP = {
# function keys
# 0: None,
# ecodes.KEY_ESC: u'ESC',
# ecodes.KEY_BACKSPACE: u'BKSP',
# ecodes.KEY_TAB: u'TAB',
# ecodes.KEY_LEFTBRACE: u'[',
# ecodes.KEY_RIGHTBRACE: u']',
# ecodes.KEY_ENTER: u'CRLF',
# ecodes.KEY_LEFTCTRL: u'LCTRL',
# ecodes.KEY_LEFTSHIFT: u'LSHFT',
# ecodes.KEY_RIGHTSHIFT: u'RSHFT',
# ecodes.KEY_LEFTALT: u'LALT',
# ecodes.KEY_KPENTER: u'CRLF',
# ecodes.KEY_RIGHTCTRL: u'RCTRL,
# ecodes.KEY_RIGHTALT: u'RALT'

# number keys
ecodes.KEY_1: u'1',
ecodes.KEY_2: u'2',
ecodes.KEY_3: u'3',
ecodes.KEY_4: u'4',
ecodes.KEY_5: u'5',
ecodes.KEY_6: u'6',
ecodes.KEY_7: u'7',
ecodes.KEY_8: u'8',
ecodes.KEY_9: u'9',
ecodes.KEY_0: u'0',
ecodes.KEY_KP1: u'1',
ecodes.KEY_KP2: u'2',
ecodes.KEY_KP3: u'3',
ecodes.KEY_KP4: u'4',
ecodes.KEY_KP5: u'5',
ecodes.KEY_KP6: u'6',
ecodes.KEY_KP7: u'7',
ecodes.KEY_KP8: u'8',
ecodes.KEY_KP9: u'9',
ecodes.KEY_KP0: u'0',

# latin letters
ecodes.KEY_A: u'A',
ecodes.KEY_B: u'B',
ecodes.KEY_C: u'C',
ecodes.KEY_D: u'D',
ecodes.KEY_E: u'E',
ecodes.KEY_F: u'F',
ecodes.KEY_G: u'G',
ecodes.KEY_H: u'H',
ecodes.KEY_I: u'I',
ecodes.KEY_J: u'J',
ecodes.KEY_K: u'K',
ecodes.KEY_L: u'L',
ecodes.KEY_M: u'M',
ecodes.KEY_N: u'N',
ecodes.KEY_O: u'O',
ecodes.KEY_P: u'P',
ecodes.KEY_Q: u'Q',
ecodes.KEY_R: u'R',
ecodes.KEY_S: u'S',
ecodes.KEY_T: u'T',
ecodes.KEY_U: u'U',
ecodes.KEY_V: u'V',
ecodes.KEY_W: u'W',
ecodes.KEY_X: u'X',
ecodes.KEY_Y: u'Y',
ecodes.KEY_Z: u'Z',

# punctuation marks and other characters
ecodes.KEY_MINUS: u'-',
ecodes.KEY_EQUAL: u'=',
ecodes.KEY_SEMICOLON: u';',
ecodes.KEY_COMMA: u',',
ecodes.KEY_DOT: u'.',
ecodes.KEY_SLASH: u'/',
ecodes.KEY_KPASTERISK: u'*',
ecodes.KEY_KPMINUS: u'-',
ecodes.KEY_KPPLUS: u'+',
ecodes.KEY_KPDOT: u'.',
ecodes.KEY_KPSLASH: u'/',
# ecodes.KEY_APOSTROPHE: u'"',
# ecodes.KEY_GRAVE: u'`',
# ecodes.KEY_BACKSLASH: u'\\',
}
_detected_keyboards: list[InputDevice] = []

def __init__(self, device: str) -> None:
def __init__(self) -> None:
try:
try:
self.input_device = InputDevice(f"/dev/input/{device}")
except FileNotFoundError:
log.debug(f"Input-Event {device} nicht gefunden.")
devices = [InputDevice(path) for path in list_devices()]
for device in devices:
log.debug(f"**** {device.path} {device.name} {device.phys} ****")
log.debug(device.capabilities(verbose=True))
device_capabilities = device.capabilities()
if ecodes.EV_KEY in device_capabilities:
log.debug("device emits keyboard events")
if ecodes.KEY_ENTER in device_capabilities[1]:
log.debug("detected 'enter' key, device seems to be a keyboard")
self._detected_keyboards.append(device)
else:
log.debug("no 'enter' key detected, skipping device")
else:
log.debug("device does not emit keyboard events, skipping")
log.info("detected keyboard devices:")
for device in self._detected_keyboards:
log.info(f"{device.path} {device.name}")
except Exception:
log.exception("Fehler im Rfid-Modul")

def loop(self):
def keyboards_detected(self) -> bool:
return len(self._detected_keyboards) > 0

async def _read_events(self, device: InputDevice):
log.debug(f"reading events from: {device.path}")
try:
key_string = ""
for event in self.input_device.read_loop():
if event.type == evdev.ecodes.EV_KEY:
data = evdev.categorize(event)
async for event in device.async_read_loop():
if event.type == ecodes.EV_KEY:
data = categorize(event)
if data.keystate == 1:
key_lookup = self.SCAN_CODES.get(data.scancode) or u'UNKNOWN:{}'.format(data.scancode)
key_string += str(format(key_lookup))
if "CRLF" in key_string:
key_string = key_string[:-4]
log.debug(f"RFID-String: {key_string}")
pub.pub_single("openWB/set/internal_chargepoint/last_tag", key_string)
key_string = ""
if data.scancode in (ecodes.KEY_ENTER, ecodes.KEY_KPENTER):
if len(key_string) > 0:
log.debug(f"RFID-String: {key_string}")
pub.pub_single("openWB/set/internal_chargepoint/last_tag", key_string)
key_string = ""
else:
log.debug(f"new key: {data.scancode} - {ecodes.KEY[data.scancode]}")
key_lookup = self.SCAN_CODE_MAP.get(data.scancode) or u''
key_string += str(format(key_lookup))
except Exception:
log.exception("Fehler im Rfid-Modul")

def run(self):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
for device in self._detected_keyboards:
asyncio.ensure_future(self._read_events(device))
loop.run_forever()

0 comments on commit e853c13

Please sign in to comment.