diff --git a/etc/faraday/proxy.sample.ini b/etc/faraday/proxy.sample.ini index e0d54d7c..c6f4fd33 100644 --- a/etc/faraday/proxy.sample.ini +++ b/etc/faraday/proxy.sample.ini @@ -16,6 +16,7 @@ TESTMODE=0 TESTRATE=1 TESTCALLSIGN=REPLACEME TESTNODEID=REPLACEME +PAYLOADSIZE=39 [UNIT0] CALLSIGN=REPLACEME diff --git a/faraday/proxy.py b/faraday/proxy.py index 00caedfd..070ae5ea 100644 --- a/faraday/proxy.py +++ b/faraday/proxy.py @@ -21,6 +21,8 @@ import time import argparse import shutil +import socket +import struct from flask import Flask from flask import request @@ -72,6 +74,7 @@ parser.add_argument('--test-callsign', dest='testcallsign', help='Set Faraday test mode callsign') parser.add_argument('--test-nodeid', dest='testnodeid', type=int, help='Set Faraday test mode nodeid') parser.add_argument('--test-rate', dest='testrate', default=1, type=int, help='Set Faraday test mode rate') +parser.add_argument('--payload-size', dest='payloadsize', default=39, type=int, help='Set Faraday data mode payload size') # Proxy database options parser.add_argument('--database', help='Set Faraday Proxy database') @@ -197,6 +200,8 @@ def configureProxy(args, proxyConfigPath): config.set('PROXY', 'testnodeid', args.testnodeid) if args.testrate: config.set('PROXY', 'testrate', args.testrate) + if args.payloadsize: + config.set('PROXY', 'payloadsize', args.payloadsize) #Configure Proxy databases if args.database is not None: @@ -252,14 +257,37 @@ def configureProxy(args, proxyConfigPath): logger.warning("--start option not present, exiting Proxy server!") sys.exit(0) -# Create and initialize dictionary queues -postDict = {} +# Create and initialize dictionary queues for post/get dictionaries +# would like to not be global but Flask currently forces this postDicts = {} getDicts = {} -unitDict = {} -def uart_worker(modem, getDicts, units, log): +def startServer(modem, dataPort): + # Start socket + s = socket.socket() + host = socket.gethostname() + port = dataPort + result = 0 + + # Check port status + while result == 0: + try: + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + result = 1 + result = sock.connect_ex((host, port)) + + except IOError as e: + logger.warning(e) + + logger.info("Started server on {0}:{1}".format(host, port)) + + s.bind((host, port)) + + return s + + +def uart_worker(modem, getDicts, postDicts, units, log): """ Interface Faraday ports over USB UART @@ -279,6 +307,7 @@ def uart_worker(modem, getDicts, units, log): # Place data into the FIFO coming from UART try: for port in modem['com'].RxPortListOpen(): + if(modem['com'].RxPortHasItem(port)): for i in range(0, modem['com'].RxPortItemCount(port)): # Data is available @@ -290,7 +319,7 @@ def uart_worker(modem, getDicts, units, log): getDicts[modem['unit']][port].append(item) except: - getDicts[modem['unit']][port] = deque([], maxlen=100) + getDicts[modem['unit']][port] = deque([]) getDicts[modem['unit']][port].append(item) # Check for Proxy logging and save to SQL if true @@ -311,17 +340,23 @@ def uart_worker(modem, getDicts, units, log): # COM ports and create the necessary buffers on the fly for port in postDicts[modem['unit']].keys(): try: + count = len(postDicts[modem['unit']][port]) + except: # Port simply doesn't exist so don't bother + pass else: for num in range(count): # Data is available, pop off [unit][port] queue # and convert to BASE64 before sending to UART - message = postDicts[modem['unit']][port].popleft() - message = base64.b64decode(message) - modem['com'].POST(port, len(message), message) + try: + message = postDicts[modem['unit']][port].popleft() + message = base64.b64decode(message) + modem['com'].POST(port, len(message), message) + except StandardError as e: + logger.error(e) # Slow down while loop to something reasonable time.sleep(0.01) @@ -382,7 +417,7 @@ def testdb_read_worker(): getDicts[unit][port].append(item) except: - getDicts[unit][port] = deque([], maxlen=100) + getDicts[unit][port] = deque([]) getDicts[unit][port].append(item) logger.debug('Appended packet: id [ {} ] port [ {} ]' @@ -393,6 +428,253 @@ def testdb_read_worker(): row = cursor.fetchone() +def acceptConnection(server, faraday): + conn, addr = server.accept() + logger.debug("Got connection from {0} on {1}".format(addr, faraday)) + return conn, addr + + +def closeConnection(conn, addr, faraday): + # close the connection + logger.info("Closing connection with {0} on {1}".format(addr[0], faraday)) + time.sleep(0.01) # Need to give time for any TX to finish + try: + conn.close() + except IOError as e: + logger.error(e) + + +def extractBytes(data, dataBuffer, unit): + # data is a BASE64 string with unknown length, iterate and separete + #data = data.decode('base64', 'strict') + + for byte in data: + try: + byte = struct.unpack("c", byte)[0] + dataBuffer[unit].append(byte) + + except: + logger.debug("Creating dataBuffer") + dataBuffer[unit] = deque([]) + dataBuffer[unit].append(byte) + + +def receiveData(conn, addr, dataBuffer, unit): + while True: + try: + data = conn.recv(4096) + except IOError as e: + logger.warning(e) + closeConnection(conn, addr, unit) + break + + if not data: + closeConnection(conn, addr, unit) + break + + # Expect BASE64 so decode it + extractBytes(data, dataBuffer, unit) + + +def sendData(conn, addr, getDicts, unit, payloadSize): + # Enter infinite loop serving up data as it arrives until socket closes + while True: + # clear list every cycle + dataQueue = [] + + # Check if getDicts[unit][1] exists + try: + dataQueue = getDicts[unit][1] + except KeyError: + # Port 1 data doesn't exist yet + logger.debug("port 1 doesn't exist") + pass + + if len(dataQueue) > 0: + try: + # pop off a data entry from the left of getDicts + temp = dataQueue.popleft() + except IndexError as e: + # Empty queue + logger.error(e) + + try: + # We have a data item so decode it + data = temp['data'].decode('base64', 'strict') + + except UnicodeError as e: + logger.error(e) + + # unpack 123 byte frames and retrieve data originally sent to socket + if len(data) == 123: + try: + dataList = struct.unpack("BB121s", data) + dataList2 = struct.unpack("B{0}s".format(payloadSize), dataList[2][:payloadSize + 1]) + socketData = dataList2[1][:dataList2[0]] + + except struct.error as e: + logger.warning(e) + logger.warning(len(data)) + logger.warning(repr(data)) + + except StandardError as e: + logger.warning(e) + + else: + # If previous try is successful, then send data + try: + conn.sendall(socketData) + + except IOError as e: + # Socket has probably closed, break out of loop + break + + # unpack 42 byte frames and retrieve data originally sent to socket + if len(data) == 42: + try: + dataList = struct.unpack("BB{0}s".format(payloadSize + 1), data) + dataList2 = struct.unpack("B{0}s".format(payloadSize), dataList[2][:payloadSize + 1]) + socketData = dataList2[1][:dataList2[0]] + + except struct.error as e: + logger.warning(e) + logger.warning(len(data)) + logger.warning(repr(data)) + + except StandardError as e: + logger.warning(e) + + else: + # If previous try is successful, then send data + try: + conn.sendall(socketData) + + except IOError as e: + # Socket has probably closed, break out of loop + break + + +def socket_worker(modem, getDicts, dataPort, dataBuffer): + """ + Create sockets for data connections + + """ + logger.debug('Starting socket_worker thread') + unit = modem['unit'] + dataBuffer[unit] = {} + + # Start a socket server for the modem + server = startServer(unit, dataPort) + + # Listen to server in infinit loop + server.listen(5) + while True: + # continuously accept connections and read data from socket to buffers + conn, addr = acceptConnection(server, unit) + logger.info("Connected TX data thread for {0} to {1} on IP port {2}".format(addr[0], unit, dataPort)) + receiveData(conn, addr, dataBuffer, unit) + + +def socket_worker_RX(modem, getDicts, dataPort, dataBuffer, payloadSize): + """ + Create sockets for data connections + + """ + logger.debug('Starting socket_worker thread') + unit = modem['unit'] + dataBuffer[unit] = {} + + # Start a socket server for the modem + server = startServer(unit, dataPort) + + # Listen to server in infinit loop + server.listen(5) + while True: + conn, addr = acceptConnection(server, unit) + logger.info("Connected RX data thread for {0} to {1} on IP port {2}".format(addr[0], unit, dataPort)) + while True: + try: + sendData(conn, addr, getDicts, unit, payloadSize) + except socket.error as e: + logger.info(e) + break + except StandardError as e: + logger.error("Type") + logger.error(type(e).__name__) + break + + # Reached the end of the loop, exit + closeConnection(conn, addr, unit) + break + + +def createPacket(data, size): + # initialize temp variable list and packet + temp = [] + packet = '' + + # Pop off "size" bytes and append to temporary list + i = 0 + for i in range(size): + try: + a = data.popleft() + temp.append(a) + + except IndexError: + # simply an empty queue + pass + # Join list together and append two control bytes, convert to BASE64 + try: + # TODO: Use better method of MSP430 header allocation + payload = ''.join(temp) + preamble = struct.pack("BB", 0, 0) # Header for MSP430 firmware + size = struct.pack("B", len(payload)) + framedPayload = size + payload + packet = (preamble + framedPayload).encode('base64', 'strict') # Proxy expects BASE64 + + except TypeError as e: + logger.error(e) + + except struct.error as e: + logger.warning(e) + + except UnicodeError as e: + logger.error(e) + + except StandardError as e: + logger.info("StandardError") + logger.error(e) + + return packet + + +def stagePacket(postDicts, unit, packetData): + # Append formatted packet to postDicts for corrent unit/port to send + # Create if postDicts entry doesn't exists + + try: + # Hardcoded for port 1 + postDicts[unit][1].append(packetData) + except: + postDicts[unit][1] = deque([]) + postDicts[unit][1].append(packetData) + + +def bufferWorker(modem, postDicts, dataBuffer, payloadSize): + logger.debug("Starting bufferWorker Thread") + + unit = modem['unit'] + + while True: + try: + + if len(dataBuffer[unit]) > 0: + packetData = createPacket(dataBuffer[unit], payloadSize) + stagePacket(postDicts, unit, packetData) + except: + pass + + # Initialize Flask microframework app = Flask(__name__) @@ -799,9 +1081,13 @@ def sqlInsert(data): def main(): + dataBuffer = {} + unitDict = {} + try: log = proxyConfig.getboolean('PROXY', 'LOG') testmode = proxyConfig.getboolean('PROXY', 'TESTMODE') + payloadSize = proxyConfig.getint('PROXY', 'PAYLOADSIZE') except ConfigParser.Error as e: logger.error("ConfigParse.Error: " + str(e)) sys.exit(1) # Sys.exit(1) is an error @@ -827,11 +1113,26 @@ def main(): except: logger.error('Could not connect to {0} on {1}'.format(node, values["com"])) + dataPort = 10000 for key in unitDict: - logger.info('Starting Thread For Unit: {0}'.format(str(key))) + logger.debug('Starting Thread For Unit: {0}'.format(str(key))) tempdict = {"unit": key, 'com': unitDict[key]} - t = threading.Thread(target=uart_worker, args=(tempdict, getDicts, units, log)) + t = threading.Thread(target=uart_worker, args=(tempdict, getDicts, postDicts, units, log)) t.start() + + logger.debug("starting socket_worker") + u = threading.Thread(target=socket_worker, args=(tempdict, getDicts, dataPort, dataBuffer)) + u.start() + + logger.debug("starting socket_worker_RX") + w = threading.Thread(target=socket_worker_RX, args=(tempdict, getDicts, dataPort + 10, dataBuffer, payloadSize)) + w.start() + + logger.debug("starting bufferWorker") + v = threading.Thread(target=bufferWorker, args=(tempdict, postDicts, dataBuffer, payloadSize)) + v.start() + + dataPort += 1 else: t = threading.Thread(target=testdb_read_worker) t.start() @@ -844,6 +1145,7 @@ def main(): logger.error("ConfigParse.Error: " + str(e)) sys.exit(1) # Sys.exit(1) is an error + # Start Flask server app.run(host=proxyHost, port=proxyPort, threaded=True) diff --git a/setup.cfg b/setup.cfg index e8f2f137..722ad53b 100644 --- a/setup.cfg +++ b/setup.cfg @@ -10,7 +10,7 @@ summary = FaradayRF amateur radio open source software description-file = README home-page = https://github.com/FaradayRF/Faraday-Software license = GPLv3 -version = 0.0.1015 +version = 0.0.1016 classifier = "Development Status :: 2 - Pre-Alpha" "Framework :: Flask" @@ -51,7 +51,7 @@ data_files = etc/faraday/deviceconfiguration.sample.ini etc/faraday/faraday_config.sample.ini etc/faraday/data.sample.ini - + [entry_points] console_scripts = faraday-proxy = faraday.proxy:main @@ -60,4 +60,4 @@ console_scripts = faraday-simpleui = faraday.simpleui:main faraday-deviceconfiguration = faraday.deviceconfiguration:main faraday-simpleconfig = faraday.simpleconfig:main - faraday-data = faraday.data:main \ No newline at end of file + faraday-data = faraday.data:main