Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Forward to correct recipient #121

Open
wants to merge 10 commits into
base: wip-proxyRequests
Choose a base branch
from
Open
10 changes: 7 additions & 3 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,15 @@ RUN apk add build-base

FROM base

COPY . /bumper

WORKDIR /bumper
COPY requirements.txt /requirements.txt

# install required python packages
RUN pip3 install -r requirements.txt

WORKDIR /bumper

# Copy only required folders instead of all
COPY create_certs/ create_certs/
COPY bumper/ bumper/

ENTRYPOINT ["python3", "-m", "bumper"]
104 changes: 68 additions & 36 deletions bumper/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,13 @@ def strtobool(strbool):
# os.environ['PYTHONASYNCIODEBUG'] = '1' # Uncomment to enable ASYNCIODEBUG
bumper_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir))

log_to_stdout = os.environ.get("LOG_TO_STDOUT")

# Set defaults from environment variables first
# Folders
logs_dir = os.environ.get("BUMPER_LOGS") or os.path.join(bumper_dir, "logs")
os.makedirs(logs_dir, exist_ok=True) # Ensure logs directory exists or create
if not log_to_stdout:
logs_dir = os.environ.get("BUMPER_LOGS") or os.path.join(bumper_dir, "logs")
os.makedirs(logs_dir, exist_ok=True) # Ensure logs directory exists or create
data_dir = os.environ.get("BUMPER_DATA") or os.path.join(bumper_dir, "data")
os.makedirs(data_dir, exist_ok=True) # Ensure data directory exists or create
certs_dir = os.environ.get("BUMPER_CERTS") or os.path.join(bumper_dir, "certs")
Expand Down Expand Up @@ -81,27 +84,36 @@ def strtobool(strbool):
)

bumperlog = logging.getLogger("bumper")
bumper_rotate = RotatingFileHandler("logs/bumper.log", maxBytes=5000000, backupCount=5)
bumper_rotate.setFormatter(logformat)
bumperlog.addHandler(bumper_rotate)
if not log_to_stdout:
bumper_rotate = RotatingFileHandler("logs/bumper.log", maxBytes=5000000, backupCount=5)
bumper_rotate.setFormatter(logformat)
bumperlog.addHandler(bumper_rotate)
else:
bumperlog.addHandler(logging.StreamHandler(sys.stdout))
# Override the logging level
# bumperlog.setLevel(logging.INFO)

confserverlog = logging.getLogger("confserver")
conf_rotate = RotatingFileHandler(
"logs/confserver.log", maxBytes=5000000, backupCount=5
)
conf_rotate.setFormatter(logformat)
confserverlog.addHandler(conf_rotate)
if not log_to_stdout:
conf_rotate = RotatingFileHandler(
"logs/confserver.log", maxBytes=5000000, backupCount=5
)
conf_rotate.setFormatter(logformat)
confserverlog.addHandler(conf_rotate)
else:
confserverlog.addHandler(logging.StreamHandler(sys.stdout))
# Override the logging level
# confserverlog.setLevel(logging.INFO)

mqttserverlog = logging.getLogger("mqttserver")
mqtt_rotate = RotatingFileHandler(
"logs/mqttserver.log", maxBytes=5000000, backupCount=5
)
mqtt_rotate.setFormatter(logformat)
mqttserverlog.addHandler(mqtt_rotate)
if not log_to_stdout:
mqtt_rotate = RotatingFileHandler(
"logs/mqttserver.log", maxBytes=5000000, backupCount=5
)
mqtt_rotate.setFormatter(logformat)
mqttserverlog.addHandler(mqtt_rotate)
else:
mqttserverlog.addHandler(logging.StreamHandler(sys.stdout))
# Override the logging level
# mqttserverlog.setLevel(logging.INFO)

Expand All @@ -117,53 +129,73 @@ def strtobool(strbool):

### Additional MQTT Logs
translog = logging.getLogger("transitions")
translog.addHandler(mqtt_rotate)
if not log_to_stdout:
translog.addHandler(mqtt_rotate)
else:
translog.addHandler(logging.StreamHandler(sys.stdout))
translog.setLevel(logging.CRITICAL + 1) # Ignore this logger
logging.getLogger("passlib").setLevel(logging.CRITICAL + 1) # Ignore this logger
brokerlog = logging.getLogger("hbmqtt.broker")
#brokerlog.setLevel(
# logging.CRITICAL + 1
#) # Ignore this logger #There are some sublogs that could be set if needed (.plugins)
brokerlog.addHandler(mqtt_rotate)
if not log_to_stdout:
brokerlog.addHandler(mqtt_rotate)
else:
brokerlog.addHandler(logging.StreamHandler(sys.stdout))
protolog = logging.getLogger("hbmqtt.mqtt.protocol")
#protolog.setLevel(
# logging.CRITICAL + 1
#) # Ignore this logger
protolog.addHandler(mqtt_rotate)
if not log_to_stdout:
protolog.addHandler(mqtt_rotate)
else:
protolog.addHandler(logging.StreamHandler(sys.stdout))
clientlog = logging.getLogger("hbmqtt.client")
#clientlog.setLevel(logging.CRITICAL + 1) # Ignore this logger
clientlog.addHandler(mqtt_rotate)

if not log_to_stdout:
clientlog.addHandler(mqtt_rotate)
else:
clientlog.addHandler(logging.StreamHandler(sys.stdout))
helperbotlog = logging.getLogger("helperbot")
helperbot_rotate = RotatingFileHandler(
"logs/helperbot.log", maxBytes=5000000, backupCount=5
)
helperbot_rotate.setFormatter(logformat)
helperbotlog.addHandler(helperbot_rotate)
if not log_to_stdout:
helperbot_rotate = RotatingFileHandler(
"logs/helperbot.log", maxBytes=5000000, backupCount=5
)
helperbot_rotate.setFormatter(logformat)
helperbotlog.addHandler(helperbot_rotate)
else:
helperbotlog.addHandler(logging.StreamHandler(sys.stdout))
# Override the logging level
# helperbotlog.setLevel(logging.INFO)

boterrorlog = logging.getLogger("boterror")
boterrorlog_rotate = RotatingFileHandler(
"logs/boterror.log", maxBytes=5000000, backupCount=5
)
boterrorlog_rotate.setFormatter(logformat)
boterrorlog.addHandler(boterrorlog_rotate)
if not log_to_stdout:
boterrorlog_rotate = RotatingFileHandler(
"logs/boterror.log", maxBytes=5000000, backupCount=5
)
boterrorlog_rotate.setFormatter(logformat)
boterrorlog.addHandler(boterrorlog_rotate)
else:
boterrorlog.addHandler(logging.StreamHandler(sys.stdout))
# Override the logging level
# boterrorlog.setLevel(logging.INFO)

xmppserverlog = logging.getLogger("xmppserver")
xmpp_rotate = RotatingFileHandler(
"logs/xmppserver.log", maxBytes=5000000, backupCount=5
)
xmpp_rotate.setFormatter(logformat)
xmppserverlog.addHandler(xmpp_rotate)
if not log_to_stdout:
xmpp_rotate = RotatingFileHandler(
"logs/xmppserver.log", maxBytes=5000000, backupCount=5
)
xmpp_rotate.setFormatter(logformat)
xmppserverlog.addHandler(xmpp_rotate)
else:
xmppserverlog.addHandler(logging.StreamHandler(sys.stdout))
# Override the logging level
# xmppserverlog.setLevel(logging.INFO)

logging.getLogger("asyncio").setLevel(logging.CRITICAL + 1) # Ignore this logger


# iptables -A PREROUTING -t nat -i wlp0s20f3 -p tcp --dport 443 -j REDIRECT --to-port 8883
mqtt_listen_port = 8883
conf1_listen_port = 443
conf2_listen_port = 8007
Expand Down
2 changes: 1 addition & 1 deletion bumper/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def config_proxyMode_defaults():
{"type":"app","host":"eco-us-api.ecovacs.com","ip":"47.89.135.130","match":"eco-"},
{"type":"app","host":"ecovacs.com","ip":"47.90.210.46"},
{"type":"app","host":"ecouser.net","ip":"116.62.93.217"},
{"type":"mqtt_server","host":"mq-ww.ecouser.net","ip":"47.254.52.46"},
{"type":"mqtt_server","host":"mq-ww.ecouser.net","ip":"47.254.143.26"},
]
opendb = db_get()
with opendb:
Expand Down
35 changes: 22 additions & 13 deletions bumper/mqttserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@
import logging
import asyncio
import os
from typing import Dict

import hbmqtt
import websockets
from hbmqtt.broker import Broker
from hbmqtt.client import MQTTClient
from hbmqtt.client import MQTTClient, ConnectException
from hbmqtt.mqtt.constants import QOS_0, QOS_1, QOS_2
import pkg_resources
import time
Expand Down Expand Up @@ -226,7 +229,9 @@ def __init__(self, address, **kwargs):
mqttserverlog.exception("{}".format(e))

class BumperProxyModeMQTTClient(MQTTClient):
ecohelpername = ""

eco_helper_names: Dict[str, str] = {}

async def _connect_coro(self): #Override default to ignore ssl verification
kwargs = dict()

Expand Down Expand Up @@ -278,7 +283,7 @@ async def _connect_coro(self): #Override default to ignore ssl verification
reader = StreamReaderAdapter(conn_reader)
writer = StreamWriterAdapter(conn_writer)
elif scheme in ('ws', 'wss'):
websocket = await websockets.connect(
websocket = await websockets.connect(
self.session.broker_uri,
subprotocols=['mqtt'],
loop=self._loop,
Expand Down Expand Up @@ -322,21 +327,25 @@ async def get_msg(self):
msgdata = str(message.data.decode("utf-8"))

proxymodelog.info(f"MQTT Proxy Client - Message Received From Ecovacs - Topic: {message.topic} - Message: {msgdata}")
ttopic = message.topic.split("/")
self.ecohelpername = ttopic[3]
ttopic[3] = "proxyhelper"
ttopic_comb = "/".join(ttopic)
proxymodelog.info(f"MQTT Proxy Client - Converted Topic From {message.topic} TO {ttopic_comb}")
proxymodelog.info(f"MQTT Proxy Client - Proxy Forward Message to Helperbot - Topic: {ttopic_comb} - Message: {msgdata.encode()}")
topic = message.topic
ttopic = topic.split("/")
if ttopic[1] == "p2p":
self.eco_helper_names[ttopic[10]] = ttopic[3]
ttopic[3] = "proxyhelper"
topic = "/".join(ttopic)
proxymodelog.info(f"MQTT Proxy Client - Converted Topic From {message.topic} TO {topic}")

proxymodelog.info(
f"MQTT Proxy Client - Proxy Forward Message to Robot - Topic: {topic} - Message: {msgdata.encode()}")
await bumper.mqtt_helperbot.Client.publish(
ttopic_comb, msgdata.encode(), QOS_0
topic, msgdata.encode(), QOS_0
)

except Exception as e:
proxymodelog.error(f"MQTT Proxy Client - get_msg Exception - {e}")

class BumperMQTTServer_Plugin:
proxyclients = {}
proxyclients: Dict[str, BumperProxyModeMQTTClient] = {}
def __init__(self, context):
self.context = context
try:
Expand Down Expand Up @@ -395,7 +404,7 @@ async def authenticate(self, *args, **kwargs):

try:
await self.proxyclients[client_id].connect(
f"mqtts://{username}:{password}@{mqtt_server}:8883",
f"mqtts://{username}:{password}@{mqtt_server}:443",
)
except Exception as e:
mqttserverlog.error(f"MQTT Proxy Mode - Exception connecting with proxy to ecovacs - {e}")
Expand Down Expand Up @@ -514,7 +523,7 @@ async def handle_helperbot_msg(self, client_id, message):
if not str(message.topic).split("/")[3] == "proxyhelper": # if from proxyhelper, don't send back to ecovacs...yet
if str(message.topic).split("/")[6] == "proxyhelper":
ttopic = message.topic.split("/")
ttopic[6] = self.proxyclients[client_id].ecohelpername
ttopic[6] = self.proxyclients[client_id].eco_helper_names.pop(ttopic[10], "")
ttopic_join = "/".join(ttopic)
proxymodelog.info(f"MQTT Proxy Client - Bot Message Converted Topic From {message.topic} TO {ttopic_join} with message: {msgdata}")
else:
Expand Down
2 changes: 1 addition & 1 deletion bumper/plugins/bumper_confserver_portal_iot.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ async def handle_devmanager_botcommand(self, request):
try:
json_body = json.loads(await request.text())

randomid = "".join(random.sample(string.ascii_letters, 6))
randomid = "".join(random.sample(string.ascii_letters, 4))
did = ""
if "toId" in json_body: # Its a command
did = json_body["toId"]
Expand Down
3 changes: 2 additions & 1 deletion docs/Env_Var.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ Bumper has a number of environment variables to help with custom deployments and
| BUMPER_KEY | {full path to bumper.key location} | The private server key (bumper.key) to be used by the Bumper server |
| BUMPER_LOGS | {full path to logs directory} | The directory where logs should be stored |
| BUMPER_DATA | {full path to data directory} | The directory where persistent data should be stored (bumper.db) |
| BUMPER_DEBUG | true | Run Bumper with debug mode/logging |
| BUMPER_DEBUG | true | Run Bumper with debug mode/logging |
| LOG_TO_STDOUT | true | Instead of logging to logs/, logs to to STDOUT |