From d0da60193c9ccc32b3c4b51de06bb433e1ede45d Mon Sep 17 00:00:00 2001 From: "pierre.turpin@insa-lyon.fr" Date: Fri, 15 Nov 2013 17:47:25 +0100 Subject: [PATCH] Ajout de la partie filtre avec receiver/emitter --- Blender/PositionController.py | 28 +++++++---- Filter/Emitter.py | 20 ++++++++ Filter/FilterHandler.py | 20 ++++++++ Filter/Receiver.py | 28 +++++++++++ Filter/ThreadedFilter.py | 64 ++++++++++++++++++++++++++ Filter/__init__.py | 1 + Server/AbstractServer.py | 1 + Server/ClientHandler.py | 8 +++- Server/Handler.py | 4 +- Server/Listener.py | 19 +++++--- Blender/TestServer.py => TestServer.py | 6 +-- 11 files changed, 176 insertions(+), 23 deletions(-) create mode 100644 Filter/Emitter.py create mode 100644 Filter/FilterHandler.py create mode 100644 Filter/Receiver.py create mode 100755 Filter/ThreadedFilter.py create mode 100644 Filter/__init__.py rename Blender/TestServer.py => TestServer.py (86%) diff --git a/Blender/PositionController.py b/Blender/PositionController.py index 04dc968..07cdde8 100644 --- a/Blender/PositionController.py +++ b/Blender/PositionController.py @@ -10,15 +10,12 @@ from Server import ClientHandler # Shared memory -relPosition = mathutils.Vector((0, 1, 0)) +relPosition = [0, 0, 0, 0, 0, 0] lockPosition = threading.Lock() # Sample trajectory to test the module import math class server(ClientHandler): - def __init__(self, addr): - ClientHandler.__init__(self, addr) - def _parseRecv(self, data_raw): global lockPosition global relPosition @@ -26,13 +23,16 @@ def _parseRecv(self, data_raw): data_line = data_raw.decode('utf-8').split('\n') for data in data_line: data_array = data.split(' ') - # Read data parsed like 'POSITION XXX YYY ZZZ' to test - if len(data_array) == 4 and data_array[0] == 'POSITION': + # Read data parsed like 'POSITION X Y Z THETA PHI PSY' to test + if len(data_array) == 7 and data_array[0] == 'TOGETIC': try: # Get the position x = float(data_array[1]) y = float(data_array[2]) z = float(data_array[3]) + theta = float(data_array[4]) + phi = float(data_array[5]) + psy = float(data_array[6]) except ValueError: continue @@ -41,6 +41,9 @@ def _parseRecv(self, data_raw): relPosition[0] = x relPosition[1] = y relPosition[2] = z + relPosition[3] = theta + relPosition[4] = phi + relPosition[5] = psy lockPosition.release() def _msgToSend(self): @@ -62,9 +65,9 @@ def __init__(self, controller): self._server.start() def __del__(self): - if self._server is not None: + if self._server is not None and self._server.isAlive(): self._server.stop() - self._server.join() + self._server.join(2) def run(self, controller): global lockPosition @@ -73,9 +76,16 @@ def run(self, controller): owner = controller.owner lockPosition.acquire(True) - owner.worldPosition = self._initPosition + relPosition + x = relPosition[0] + y = relPosition[1] + z = relPosition[2] + theta = relPosition[3] + phi = relPosition[4] + psy = relPosition[5] lockPosition.release() + owner.worldPosition = self._initPosition + mathutils.Vector((x, y, z)) + static_controller = None def main(controller): global static_controller diff --git a/Filter/Emitter.py b/Filter/Emitter.py new file mode 100644 index 0000000..b7ddaf3 --- /dev/null +++ b/Filter/Emitter.py @@ -0,0 +1,20 @@ +#!/bin/env python3 + +import sys + +sys.path += ['..'] +from Server import Handler + +def Emitter(shm): + class _Emitter(Handler): + def _msgToSend(self): + data = shm.data + data_str = 'TOGETIC ' + ' '.join(map(str, data)) + return bytes(data_str + '\n', 'UTF-8') + + def _parseRecv(self, data_raw): + pass + + def _run(self): + pass + return _Emitter diff --git a/Filter/FilterHandler.py b/Filter/FilterHandler.py new file mode 100644 index 0000000..a6e0b00 --- /dev/null +++ b/Filter/FilterHandler.py @@ -0,0 +1,20 @@ +#!/bin/env python3 + +import sys + +sys.path += ['..'] +from Server.AbstractServer import AbstractServer + +class FilterHandler(AbstractServer): + def __init__(self, input_shm, output_shm): + AbstractServer.__init__(self) + self._in_shm = input_shm + self._out_shm = output_shm + + def _serve(self): + in_data = self._in_shm.data + out_data = in_data + self._out_shm.data = out_data + + def _free(self): + pass diff --git a/Filter/Receiver.py b/Filter/Receiver.py new file mode 100644 index 0000000..ceb1b0a --- /dev/null +++ b/Filter/Receiver.py @@ -0,0 +1,28 @@ +#!/bin/env python3 + +import sys + +sys.path += ['..'] +from Server import ClientHandler + +class Receiver(ClientHandler): + def __init__(self, addr, shm): + ClientHandler.__init__(self, addr) + self._shm = shm + + def _msgToSend(self): + pass + + def _parseRecv(self, data_raw): + data_line = data_raw.decode('UTF-8').split('\n') + for data in data_line: + data_array = data.split(' ') + if len(data_array) == 7 and data_array[0] == 'TOGETIC': + try: + info = map(float, data_array[1:]) + except ValueError: + return + self._shm.data = info + + def _run(self): + pass diff --git a/Filter/ThreadedFilter.py b/Filter/ThreadedFilter.py new file mode 100755 index 0000000..07dbf4f --- /dev/null +++ b/Filter/ThreadedFilter.py @@ -0,0 +1,64 @@ +#!/bin/env python3 + +import sys +from threading import Lock + +sys.path += ['..'] +from Server.AbstractServer import AbstractServer +from Server.Listener import Listener + +from Filter.Receiver import Receiver +from Filter.Emitter import Emitter +from Filter.FilterHandler import FilterHandler + +class shm: + def __init__(self): + self._data = None + self._lock = Lock() + + def set(self, data): + with self._lock: + self._data = data + + def get(self): + with self._lock: + return self._data + + data = property(get, set) + +class ThreadedFilter(AbstractServer): + def __init__(self, addr_input, addr_output): + AbstractServer.__init__(self) + + shm_in = shm() + shm_out = shm() + self._receiver = Receiver(addr_input, shm_in) + self._handler = FilterHandler(shm_in, shm_out) + self._emitter = Listener(addr_output, Emitter(shm_out)) + + def start(self): + self._receiver.start() + self._handler.start() + self._emitter.start() + AbstractServer.start(self) + + def _serve(self): + pass + + def _free(self): + for s in [self._receiver, self._handler, self._emitter]: + s.stop() + if s.isAlive(): + s.join(2) + +if __name__ == '__main__': + addr_in = '/tmp/togetic-input' + addr_out = '/tmp/togetic-out' + f = ThreadedFilter(addr_in, addr_out) + try: + f.start() + except KeyboardInterrupt: + f.stop() + f.join(2) + if f.isAlive(): + f.terminate() diff --git a/Filter/__init__.py b/Filter/__init__.py new file mode 100644 index 0000000..a369f6b --- /dev/null +++ b/Filter/__init__.py @@ -0,0 +1 @@ +from ThreadedFilter import ThreadedFilter diff --git a/Server/AbstractServer.py b/Server/AbstractServer.py index c9f7ae7..c336f65 100644 --- a/Server/AbstractServer.py +++ b/Server/AbstractServer.py @@ -42,6 +42,7 @@ def run(self): except Exception as e: print('Server `', self, '` stopped by an exception :', e, file=sys.stderr) self.stop() + raise self._free() def _serve(self): diff --git a/Server/ClientHandler.py b/Server/ClientHandler.py index 1c766b4..ffc2fa3 100644 --- a/Server/ClientHandler.py +++ b/Server/ClientHandler.py @@ -5,10 +5,14 @@ class ClientHandler(Handler): def __init__(self, addr): # Connect to the socket as a client + self._addr = addr sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + Handler.__init__(self, (sock, addr)) + + def start(self): try: - sock.connect(addr) + self._socket.connect(self._addr) except (FileNotFoundError, ConnectionRefusedError): raise - Handler.__init__(self, (sock, addr)) + Handler.start(self) diff --git a/Server/Handler.py b/Server/Handler.py index 21fc870..27a74fd 100644 --- a/Server/Handler.py +++ b/Server/Handler.py @@ -1,4 +1,4 @@ -import select +from select import select from Server.AbstractServer import AbstractServer @@ -22,7 +22,7 @@ def _serve(self): """ \brief Select usable IOs and send/recv in consequence. """ - selection = select.select([self._socket], [self._socket], [], 0) + selection = select([self._socket], [self._socket], [], 0) if selection[0]: data = self._socket.recv(4096) self._parseRecv(data) diff --git a/Server/Listener.py b/Server/Listener.py index 15815e5..a07a2ab 100644 --- a/Server/Listener.py +++ b/Server/Listener.py @@ -24,19 +24,26 @@ def __init__(self, addr, pipe): a new instance of `clientServer` and start it immedialty. """ AbstractServer.__init__(self) + self._addr = addr self._socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - if os.path.exists(addr): + self._clients = [] + self._pipe = pipe + + def start(self): + if os.path.exists(self._addr): try: - os.remove(addr) + os.remove(self._addr) except OSError: raise try: - self._socket.bind(addr) + self._socket.bind(self._addr) except socket.error: self._socket.close() raise - self._clients = [] - self._pipe = pipe + except OSError: + self._socket.close() + raise + AbstractServer.start(self) def _serve(self): """ @@ -63,7 +70,5 @@ def _free(self): for client in self._clients: client.stop() client.join(2) - if client.isAlive(): - client.terminate() self._socket.close() diff --git a/Blender/TestServer.py b/TestServer.py similarity index 86% rename from Blender/TestServer.py rename to TestServer.py index 1efe840..3246ba8 100755 --- a/Blender/TestServer.py +++ b/TestServer.py @@ -15,8 +15,8 @@ def __init__(self, client): self._dt = 0.01 def _msgToSend(self): - x, y, z = path(self._time) - data = 'POSITION ' + str(x) + ' ' + str(y) + ' ' + str(z) + data_int = path(self._time) + data = 'TOGETIC ' + ' '.join(map(str, data_int)) return bytes(data + '\n', 'UTF-8') def _parseRecv(self, data): @@ -30,7 +30,7 @@ def _run(self): def dummyPath(t): import math - return (10 * math.cos(t), 10 * math.sin(t), 0) + return (10 * math.cos(t), 10 * math.sin(t), 0, 0, 0, 0) if __name__ == '__main__': parser = argparse.ArgumentParser()