Skip to content

Commit

Permalink
Add option for TLS (#32)
Browse files Browse the repository at this point in the history
Add support for providing client/CA certs/keys to enable TLS connections to an MQTT broker.
  • Loading branch information
iconnor authored Apr 4, 2021
1 parent 4294158 commit 5e30b04
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 24 deletions.
2 changes: 2 additions & 0 deletions .pep8speaks.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pycodestyle: # Same as scanner.linter value. Other option is flake8
max-line-length: 120 # Default is 79 in PEP 8 - sorry IBM 3270 terminal users
89 changes: 65 additions & 24 deletions modbus4mqtt/modbus4mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,29 @@

MAX_DECIMAL_POINTS = 8


class mqtt_interface():
def __init__(self, hostname, port, username, password, config_file, mqtt_topic_prefix):
def __init__(self, hostname, port, username, password, config_file, mqtt_topic_prefix,
use_tls=True, insecure=False, cafile=None, cert=None, key=None):
self.hostname = hostname
self._port = port
self.username = username
self.password = password
self.config = self._load_modbus_config(config_file)
self.use_tls = use_tls
self.insecure = insecure
self.cafile = cafile
self.cert = cert
self.key = key
if not mqtt_topic_prefix.endswith('/'):
mqtt_topic_prefix = mqtt_topic_prefix + '/'
self.prefix = mqtt_topic_prefix
self.address_offset = self.config.get('address_offset', 0)
self.registers = self.config['registers']
for register in self.registers:
register['address'] += self.address_offset
self.modbus_connect_retries = -1 # Retry forever by default
self.modbus_reconnect_sleep_interval = 5 # Wait this many seconds between modbus connection attempts
self.modbus_connect_retries = -1 # Retry forever by default
self.modbus_reconnect_sleep_interval = 5 # Wait this many seconds between modbus connection attempts

def connect(self):
# Connects to modbus and MQTT.
Expand Down Expand Up @@ -65,6 +72,9 @@ def connect_mqtt(self):
self._mqtt_client._on_disconnect = self._on_disconnect
self._mqtt_client._on_message = self._on_message
self._mqtt_client._on_subscribe = self._on_subscribe
if self.use_tls:
self._mqtt_client.tls_set(ca_certs=self.cafile, certfile=self.cert, keyfile=self.key)
self._mqtt_client.tls_insecure_set(self.insecure)
self._mqtt_client.connect(self.hostname, self._port, 60)
self._mqtt_client.loop_start()

Expand All @@ -87,8 +97,9 @@ def poll(self):
for register in self._get_registers_with('pub_topic'):
try:
value = self._mb.get_value(register.get('table', 'holding'), register['address'])
except:
logging.warning("Couldn't get value from register {} in table {}".format(register['address'], register.get('table', 'holding')))
except Exception:
logging.warning("Couldn't get value from register {} in table {}".format(register['address'],
register.get('table', 'holding')))
continue
# Filter the value through the mask, if present.
value &= register.get('mask', 0xFFFF)
Expand Down Expand Up @@ -157,24 +168,28 @@ def _on_message(self, client, userdata, msg):
if 'value_map' in register:
try:
value = str(value, 'utf-8')
if not value in register['value_map']:
logging.warning("Value not in value_map. Topic: {}, value: {}, valid values: {}".format(topic, value, register['value_map'].keys()))
if value not in register['value_map']:
logging.warning("Value not in value_map. Topic: {}, value: {}, valid values: {}".format(topic,
value, register['value_map'].keys()))
continue
# Map the value from the human-readable form into the raw modbus number
value = register['value_map'][value]
except UnicodeDecodeError:
logging.warning("Failed to decode MQTT payload as UTF-8. Can't compare it to the value_map for register {}".format(register))
logging.warning("Failed to decode MQTT payload as UTF-8. "
"Can't compare it to the value_map for register {}".format(register))
continue
try:
# Scale the value, if required.
value = float(value)
value = round(value/register.get('scale', 1))
except ValueError:
logging.error("Failed to convert register value for writing. Bad/missing value_map? Topic: {}, Value: {}".format(topic, value))
logging.error("Failed to convert register value for writing. "
"Bad/missing value_map? Topic: {}, Value: {}".format(topic, value))
continue
type = register.get('type', 'uint16')
value = modbus_interface._convert_from_type_to_uint16(value, type)
self._mb.set_value(register.get('table', 'holding'), register['address'], int(value), register.get('mask', 0xFFFF))
self._mb.set_value(register.get('table', 'holding'), register['address'], int(value),
register.get('mask', 0xFFFF))

# This throws ValueError exceptions if the imported registers are invalid
@staticmethod
Expand All @@ -197,27 +212,34 @@ def _validate_registers(registers):
duplicate_json_keys[register['pub_topic']] = []
retain_setting[register['pub_topic']] = set()
if 'json_key' in register and 'set_topic' in register:
raise ValueError("Bad YAML configuration. Register with set_topic '{}' has a json_key specified. This is invalid. See https://github.com/tjhowse/modbus4mqtt/issues/23 for details.".format(register['set_topic']))
raise ValueError("Bad YAML configuration. Register with set_topic '{}' has a json_key specified. "
"This is invalid. See https://github.com/tjhowse/modbus4mqtt/issues/23 for details."
.format(register['set_topic']))
all_pub_topics.add(register['pub_topic'])

# Check that all registers with duplicate pub topics have json_keys
for register in registers:
if register['pub_topic'] in duplicate_pub_topics:
if 'json_key' not in register:
raise ValueError("Bad YAML configuration. pub_topic '{}' duplicated across registers without json_key field. Registers that share a pub_topic must also have a unique json_key.".format(register['pub_topic']))
raise ValueError("Bad YAML configuration. pub_topic '{}' duplicated across registers without "
"json_key field. Registers that share a pub_topic must also have a unique "
"json_key.".format(register['pub_topic']))
if register['json_key'] in duplicate_json_keys[register['pub_topic']]:
raise ValueError("Bad YAML configuration. pub_topic '{}' duplicated across registers with a duplicated json_key field. Registers that share a pub_topic must also have a unique json_key.".format(register['pub_topic']))
raise ValueError("Bad YAML configuration. pub_topic '{}' duplicated across registers with a "
"duplicated json_key field. Registers that share a pub_topic must also have "
"a unique json_key.".format(register['pub_topic']))
duplicate_json_keys[register['pub_topic']] += [register['json_key']]
if 'retain' in register:
retain_setting[register['pub_topic']].add(register['retain'])
# Check that there are no disagreements as to whether this pub_topic should be retained or not.
for topic, retain_set in retain_setting.items():
if len(retain_set) > 1:
raise ValueError("Bad YAML configuration. pub_topic '{}' has conflicting retain settings.".format(topic))
raise ValueError("Bad YAML configuration. pub_topic '{}' has conflicting retain settings."
.format(topic))

def _load_modbus_config(self, path):
yaml=YAML(typ='safe')
result = yaml.load(open(path,'r').read())
yaml = YAML(typ='safe')
result = yaml.load(open(path, 'r').read())
registers = [register for register in result['registers'] if 'pub_topic' in register]
mqtt_interface._validate_registers(registers)
return result
Expand All @@ -228,22 +250,41 @@ def loop_forever(self):
self.poll()
sleep(self.config['update_rate'])


@click.command()
@click.option('--hostname', default='localhost', help='The hostname or IP address of the MQTT server.', show_default=True)
@click.option('--port', default=1883, help='The port of the MQTT server.', show_default=True)
@click.option('--username', default='username', help='The username to authenticate to the MQTT server.', show_default=True)
@click.option('--password', default='password', help='The password to authenticate to the MQTT server.', show_default=True)
@click.option('--config', default='./Sungrow_SH5k_20.yaml', help='The YAML config file for your modbus device.', show_default=True)
@click.option('--mqtt_topic_prefix', default='modbus4mqtt', help='A prefix for published MQTT topics.', show_default=True)
def main(hostname, port, username, password, config, mqtt_topic_prefix):
@click.option('--hostname', default='localhost',
help='The hostname or IP address of the MQTT server.', show_default=True)
@click.option('--port', default=1883,
help='The port of the MQTT server.', show_default=True)
@click.option('--username', default='username',
help='The username to authenticate to the MQTT server.', show_default=True)
@click.option('--password', default='password',
help='The password to authenticate to the MQTT server.', show_default=True)
@click.option('--mqtt_topic_prefix', default='modbus4mqtt',
help='A prefix for published MQTT topics.', show_default=True)
@click.option('--config', default='./Sungrow_SH5k_20.yaml',
help='The YAML config file for your modbus device.', show_default=True)
@click.option('--use_tls', default=False,
help='Configure network encryption and authentication options. Enables SSL/TLS.', show_default=True)
@click.option('--insecure', default=True,
help='Do not check that the server certificate hostname matches the remote hostname.', show_default=True)
@click.option('--cafile', default=None,
help='The path to a file containing trusted CA certificates to enable encryption.', show_default=True)
@click.option('--cert', default=None,
help='Client certificate for authentication, if required by server.', show_default=True)
@click.option('--key', default=None,
help='Client private key for authentication, if required by server.', show_default=True)
def main(hostname, port, username, password, config, mqtt_topic_prefix, use_tls, insecure, cafile, cert, key):
logging.basicConfig(
format='%(asctime)s %(levelname)-8s %(message)s',
level=logging.INFO,
datefmt='%Y-%m-%d %H:%M:%S')
logging.info("Starting modbus4mqtt v{}".format(version.version))
i = mqtt_interface(hostname, port, username, password, config, mqtt_topic_prefix)
i = mqtt_interface(hostname, port, username, password, config, mqtt_topic_prefix,
use_tls, insecure, cafile, cert, key)
i.connect()
i.loop_forever()


if __name__ == '__main__':
main()

0 comments on commit 5e30b04

Please sign in to comment.