diff --git a/agent/Dockerfile b/agent/Dockerfile index 60817e1..8bb3af0 100644 --- a/agent/Dockerfile +++ b/agent/Dockerfile @@ -1,10 +1,49 @@ -FROM python:3 +FROM ubuntu:17.04 + +RUN \ + apt-get update \ + && apt-get install -y \ + apt-utils \ + build-essential \ + cmake \ + git \ + wget \ + libncurses5-dev \ + libreadline-dev \ + nettle-dev \ + libgnutls28-dev \ + libuv1-dev \ + libmsgpack-dev \ + libargon2-0-dev \ + libssl-dev \ + net-tools \ + nmap \ + && apt-get dist-upgrade -y \ + && apt-get clean + +RUN wget https://www.python.org/ftp/python/3.6.3/Python-3.6.3.tgz \ + && tar xfzv Python-3.6.3.tgz \ + && cd Python-3.6.3 \ + && ./configure \ + && make \ + && make install \ + && pip3 install cython + +RUN git clone --branch 1.3.6 https://github.com/savoirfairelinux/opendht.git \ + && cd opendht \ + && mkdir build \ + && cd build \ + && cmake .. -DCMAKE_INSTALL_PREFIX=/usr -DOPENDHT_PYTHON=On -DOPENDHT_LTO=On \ + && make -j8 \ + && make install \ + && cd ../.. \ + && rm -rf opendht ENV PYTHONUNBUFFERED 1 WORKDIR /code ADD requirements.txt /code -RUN pip install -r requirements.txt +RUN pip3 install -r requirements.txt ADD . 
/code diff --git a/agent/activate.settings.sh b/agent/activate.settings.sh deleted file mode 100644 index 8dd27f2..0000000 --- a/agent/activate.settings.sh +++ /dev/null @@ -1,26 +0,0 @@ -#!/usr/bin/env bash -# App settings go here, they're validated in app.settings - an attempt to coerce to the internal type is made at run time - -# Boolean values can be any of: '1', 'TRUE', 'YES', 'ON' - case insensitive - -# the AIO_ env variables are used by `adev runserver` when serving your app for development -#export AIO_APP_PATH="app/" -#export AIO_STATIC_PATH="${AGENT_SRC_DIR}/sn_agent_ui/static/" -export AIO_LIVERELOAD="true" -export AIO_DEBUG_TOOLBAR="true" -export AIO_PRECHECK="true" -export AIO_PORT=8080 -export AIO_AUX_PORT=8081 - - -export SN_DB_URL="sqlite://" -export SN_AGENT_ETH_CLIENT="http://localhost:8545" - -# this is the key used to encrypt cookies. Keep it safe! -# you can generate a new key with `./tools.sh cookie` -export SN_AGENT_COOKIE_SECRET="M0a6HuukMqXf9VMmMMf9RiZFKKPc5etebG4-R8IPTQc=" - -# this should be changed for every agent - we need a way to create these and store the data persistently - perhaps a file? 
-export SN_NETWORK_AGENT_ID='b545478a-971a-48ec-bc56-4b9b7176799c' - -export SN_SERVICE_ADAPTER_CONFIG_FILE="${AGENT_DIR}/service_adapter_config_example.yml" diff --git a/agent/agent.py b/agent/agent.py index 368b5d3..92c7433 100644 --- a/agent/agent.py +++ b/agent/agent.py @@ -1,8 +1,23 @@ from aiohttp import web +from sn_agent.agent import AgentSettings from sn_agent.app import create_app +import ssl + +import logging + +logger = logging.getLogger(__name__) app = create_app() +# sslcontext = ssl.SSLContext(ssl.PROTOCOL_SSLv23) +# sslcontext.load_cert_chain('server.crt', 'server.key') + # TODO Make the port configurable from the ENV -web.run_app(app, port=8000) +# web.run_app(app, port=8000, ssl_context=sslcontext) + +settings = AgentSettings() + +logger.info('Host setting: %s', settings.WEB_HOST) + +web.run_app(app, port=settings.WEB_PORT) diff --git a/agent/agent.sh b/agent/agent.sh old mode 100644 new mode 100755 index 3d2bdfa..d884b9f --- a/agent/agent.sh +++ b/agent/agent.sh @@ -3,6 +3,8 @@ set -o errexit set -o nounset +export SN_AGENT_WEB_HOST=$(netstat -nr | grep '^0\.0\.0\.0' | awk '{print $2}') + function run_tests { py.test --verbose --cov-config .coveragerc --cov-report html --cov=sn_agent tests } @@ -13,7 +15,7 @@ noop) ;; run) - python agent.py + python3 agent.py ;; docs) diff --git a/agent/requirements.txt b/agent/requirements.txt index 4be2f53..6dd2ce6 100644 --- a/agent/requirements.txt +++ b/agent/requirements.txt @@ -22,9 +22,9 @@ sphinx-autobuild sphinx_rtd_theme recommonmark -msgpack-python -ipaddress -miniupnpc -apscheduler -m2crypto -pynacl +aiohttp-jinja2 +bson + +tensorflow + +web3 diff --git a/agent/sn_agent/agent/settings.py b/agent/sn_agent/agent/settings.py index c8deb21..d354ff9 100644 --- a/agent/sn_agent/agent/settings.py +++ b/agent/sn_agent/agent/settings.py @@ -8,4 +8,6 @@ def __init__(self, **custom_settings): self._ENV_PREFIX = 'SN_AGENT_' self.CLASS = 'sn_agent.agent.test.TestAgent' self.ID = Required(uuid.UUID) + 
self.WEB_HOST = "0.0.0.0" + self.WEB_PORT = 8000 super().__init__(**custom_settings) diff --git a/agent/sn_agent/api.py b/agent/sn_agent/api.py deleted file mode 100644 index 0aa4727..0000000 --- a/agent/sn_agent/api.py +++ /dev/null @@ -1,21 +0,0 @@ -from aiohttp import web -from jsonrpcserver.aio import methods - - -@methods.add -async def ping(): - return 'pong' - - -@methods.add -async def disable_service(service_descriptor): - return 'pong' - - -async def handler(request): - request = await request.text() - response = await methods.dispatch(request) - if response.is_notification: - return web.Response() - else: - return web.json_response(response, status=response.http_status) diff --git a/agent/sn_agent/api/__init__.py b/agent/sn_agent/api/__init__.py new file mode 100644 index 0000000..26bb37a --- /dev/null +++ b/agent/sn_agent/api/__init__.py @@ -0,0 +1,79 @@ +import logging +import os +from aiohttp import web, WSMsgType +from aiohttp.web_response import Response +from jsonrpcserver.aio import methods + +from sn_agent.api.job import submit_job + +logger = logging.getLogger(__name__) + + +async def http_handler(request): + request = await request.text() + response = await methods.dispatch(request) + if response.is_notification: + return web.Response() + else: + return web.json_response(response, status=response.http_status) + + +WS_FILE = os.path.join(os.path.dirname(__file__), 'websocket.html') + + +async def ws_handler(request): + logger.debug('WebSocket Handler started') + + app = request.app + + resp = web.WebSocketResponse() + + ok, protocol = resp.can_prepare(request) + if not ok: + with open(WS_FILE, 'rb') as fp: + return Response(body=fp.read(), content_type='text/html') + + await resp.prepare(request) + + logger.debug('WebSocket data received') + + try: + + request.app['sockets'].append(resp) + + async for msg in resp: + + logger.debug('Processing WebSocket message: %s', msg.type) + + if msg.type == WSMsgType.TEXT: + + response = await 
methods.dispatch(msg.data, app) + if not response.is_notification: + await resp.send_str(str(response)) + + elif msg.type == WSMsgType.ERROR: + logger.debug('ws connection closed with exception %s' % resp.exception()) + + else: + logger.debug("Unhandled message type") + return resp + return resp + + finally: + request.app['sockets'].remove(resp) + logger.debug('Someone disconnected.') + + +async def on_shutdown(app): + for ws in app['sockets']: + await ws.close() + +def setup_api(app): + methods.add(submit_job) + + app['sockets'] = [] + + app.router.add_post('/api', http_handler) + app.router.add_get('/api/ws', ws_handler) + + app.on_shutdown.append(on_shutdown) diff --git a/agent/sn_agent/api/job.py b/agent/sn_agent/api/job.py new file mode 100644 index 0000000..ae31a3f --- /dev/null +++ b/agent/sn_agent/api/job.py @@ -0,0 +1,76 @@ +import logging +from urllib.parse import urlparse + +import aiohttp + +from sn_agent import ontology + +logger = logging.getLogger(__name__) + + +async def submit_job(context=None): + await process_job(context) + + return 'pong' + + +async def process_job(app): + logger.debug("Job submission") + + blockchain = app['blockchain'] + dht = app['dht'] + + ontology_id = ontology.DOCUMENT_SUMMARIZER_ID + agent_ids = blockchain.get_agents_for_ontology(ontology_id) + + available_agents = [] + for agent_id in agent_ids: + + connection_info = dht.get(agent_id) + + for value in connection_info: + logger.debug('received value: %s', value) + + if isinstance(value, dict): + if 'url' in value.keys(): + url = urlparse(value['url']) + url_str = url.geturl() + + logger.debug('Connection URL: %s', url_str) + # if url.scheme == 'ws' or url.scheme == 'wss': + + try: + session = aiohttp.ClientSession() + async with session.ws_connect(url_str, heartbeat=10000) as ws: + + logger.debug("************** Successfully connected to %s", url) + async for msg in ws: + if msg.type == aiohttp.WSMsgType.TEXT: + if msg.data == 'close cmd': + await ws.close() + break + else: + 
await ws.send_str(msg.data + '/answer') + elif msg.type == aiohttp.WSMsgType.CLOSED: + break + elif msg.type == aiohttp.WSMsgType.ERROR: + break + + except aiohttp.ClientConnectorError: + logger.error('Client Connector error for: %s', url_str) + pass + + except aiohttp.ServerDisconnectedError: + logger.error('Server disconnected error for: %s', url_str) + pass + + except aiohttp.WSServerHandshakeError: + logger.error('Incorrect WS handshake for: %s', url_str) + pass + + except aiohttp.ClientOSError: + logger.error('Client OS error for: %s', url_str) + pass + + finally: + session.close() diff --git a/agent/sn_agent/network/geth/poller.py b/agent/sn_agent/api/poller.py similarity index 100% rename from agent/sn_agent/network/geth/poller.py rename to agent/sn_agent/api/poller.py diff --git a/agent/sn_agent/app.py b/agent/sn_agent/app.py index 3125608..598f26d 100644 --- a/agent/sn_agent/app.py +++ b/agent/sn_agent/app.py @@ -5,15 +5,21 @@ from aiohttp import web from sn_agent.agent import setup_agent +from sn_agent.api import setup_api from sn_agent.log import setup_logging -from sn_agent.network import setup_network, join_network +from sn_agent.network import setup_network from sn_agent.ontology import setup_ontology from sn_agent.routes import setup_routes from sn_agent.service_adapter import setup_service_manager +from sn_agent.ui import setup_ui logger = logging.getLogger(__name__) +async def startup(app): + await app['network'].startup() + + def create_app(): # Significant performance improvement: https://github.com/MagicStack/uvloop asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) @@ -25,10 +31,12 @@ def create_app(): setup_ontology(app) setup_network(app) setup_service_manager(app) + setup_api(app) setup_agent(app) - - join_network(app) + setup_ui(app) app['name'] = 'SingularityNET Agent' + app.on_startup.append(startup) + return app diff --git a/agent/sn_agent/job/job_descriptor.py b/agent/sn_agent/job/job_descriptor.py index 8dbbaab..0025974 100644 --- 
a/agent/sn_agent/job/job_descriptor.py +++ b/agent/sn_agent/job/job_descriptor.py @@ -11,6 +11,7 @@ test_jobs = {} + class JobDescriptor(object): def __init__(self, service: ServiceDescriptor, job_parameters: dict = None): self.service = service @@ -33,8 +34,10 @@ def __iter__(self): def __delitem__(self, key): self.job_parameters.__delitem__(key) + def __getitem__(self, key): return self.job_parameters.__getitem__(key) + def __setitem__(self, key, value): self.job_parameters.__setitem__(key, value) @@ -58,9 +61,9 @@ def init_test_jobs(): test_jobs[ontology.ENTITY_EXTRACTER_ID] = [] job_parameters = {'input_type': 'file', - 'input_url': 'http://test.com/inputs/test_input.txt', - 'output_type': 'file_url_put', - 'output_url': 'test_output.txt'} + 'input_url': 'http://test.com/inputs/test_input.txt', + 'output_type': 'file_url_put', + 'output_url': 'test_output.txt'} job_parameters_2 = {'input_type': 'file', 'input_url': 'http://test.com/inputs/test_input_2.txt', 'output_type': 'file_url_put', diff --git a/agent/sn_agent/log.py b/agent/sn_agent/log.py index 12c00df..60c45a6 100644 --- a/agent/sn_agent/log.py +++ b/agent/sn_agent/log.py @@ -4,10 +4,10 @@ def setup_logging(): _logging = { 'version': 1, - 'disable_existing_loggers': True, + 'disable_existing_loggers': False, 'root': { - 'level': 'WARNING', + 'level': 'INFO', 'handlers': ['console'], }, diff --git a/agent/sn_agent/network/__init__.py b/agent/sn_agent/network/__init__.py index 97912f0..915b206 100644 --- a/agent/sn_agent/network/__init__.py +++ b/agent/sn_agent/network/__init__.py @@ -15,18 +15,3 @@ def setup_network(app): def join_network(app): app['network'].join_network() - - -class BadMessage(Exception): - """ Raised when a message can't be parsed or a timeout occurs """ - pass - - -class MaxSizeException(Exception): - """ Maximum size of something is reached """ - pass - - -class NetworkError(Exception): - """ Network error """ - pass diff --git a/agent/sn_agent/network/blockchain.py 
b/agent/sn_agent/network/blockchain.py new file mode 100644 index 0000000..72d1feb --- /dev/null +++ b/agent/sn_agent/network/blockchain.py @@ -0,0 +1,19 @@ +from web3 import Web3 +from web3 import HTTPProvider + +from sn_agent import ontology + + +class BlockChain: + def __init__(self, client_url) -> None: + self.conn = Web3(HTTPProvider(client_url)) + + def get_agents_for_ontology(self, ontology_id): + + # Total short circuit here - this needs to go out to the blockchain + if ontology_id == ontology.DOCUMENT_SUMMARIZER_ID: + # Send the top part to Alice + return ['b545478a-971a-48ec-bc56-4b9b7176799c', ] + else: + # Everything else goes to Bob + return ['c545478a-971a-48ec-bc56-aaaaaaaaaaaa'] diff --git a/agent/sn_agent/network/dht.py b/agent/sn_agent/network/dht.py new file mode 100644 index 0000000..a5fb29f --- /dev/null +++ b/agent/sn_agent/network/dht.py @@ -0,0 +1,51 @@ +import logging +import opendht as dht + +import bson + +logger = logging.getLogger(__name__) + + +class DHT: + def __init__(self, boot_host, boot_port): + + node = dht.DhtRunner() + self.node = node + + node.run() + + node.bootstrap(boot_host, boot_port) + + def put(self, key, value, value_id=None, type_id=None): + logger.debug('Putting key and value into DHT: %s - %s', key, value) + + value_bin = bson.dumps(value) + + key_hash = dht.InfoHash.get(key) + connection_data = dht.Value(value_bin) + + if value_id: + connection_data.id = value_id + + if type_id: + connection_data.user_type = type_id + + self.node.put(key_hash, connection_data) + + logger.debug('key and value put into DHT') + + def get(self, key): + logger.debug('Getting data info for key: %s', key) + + key_hash = dht.InfoHash.get(key) + data_bin = self.node.get(key_hash) + + data = [] + + # Deserialize from BSON - another serializer could be used but BSON works well here + for d in data_bin: + value = bson.loads(d.data) + data.append(value) + + logger.debug('Key Value info: %s', data) + return data diff --git a/agent/sn_agent/network/dht/__init__.py 
b/agent/sn_agent/network/dht/__init__.py deleted file mode 100644 index a104fef..0000000 --- a/agent/sn_agent/network/dht/__init__.py +++ /dev/null @@ -1,48 +0,0 @@ -import logging - -from sn_agent.agent.base import AgentABC -from sn_agent.network.base import NetworkABC -from sn_agent.network.dht.dht import DHT -from sn_agent.network.enum import NetworkStatus -from sn_agent.ontology.service_descriptor import ServiceDescriptor - -logger = logging.getLogger(__name__) - - -class DHTNetwork(NetworkABC): - def __init__(self, app): - super().__init__(app) - - self.dht = DHT() - - def update_ontology(self): - super().update_ontology() - - def is_agent_a_member(self, agent: AgentABC) -> bool: - return super().is_agent_a_member(agent) - - def leave_network(self) -> bool: - return super().leave_network() - - def join_network(self) -> bool: - agent = self.app['agent'] - agent_id = str(agent.agent_id) - self.dht[agent_id] = self.dht.my_id - - def get_network_status(self) -> NetworkStatus: - return super().get_network_status() - - def advertise_service(self, service: ServiceDescriptor): - super().advertise_service(service) - - def logon_network(self) -> bool: - return super().logon_network() - - def find_service_providers(self, service: ServiceDescriptor) -> list: - return super().find_service_providers(service) - - def remove_service_advertisement(self, service: ServiceDescriptor): - super().remove_service_advertisement(service) - - def logoff_network(self) -> bool: - return super().logoff_network() diff --git a/agent/sn_agent/network/dht/bucketset.py b/agent/sn_agent/network/dht/bucketset.py deleted file mode 100644 index f6b896c..0000000 --- a/agent/sn_agent/network/dht/bucketset.py +++ /dev/null @@ -1,43 +0,0 @@ -import heapq -import threading - -from .peer import Peer - - -def largest_differing_bit(value1, value2): - distance = value1 ^ value2 - length = -1 - while distance: - distance >>= 1 - length += 1 - return max(0, length) - - -class BucketSet(object): - def 
__init__(self, bucket_size, buckets, id): - self.id = id - self.bucket_size = bucket_size - self.buckets = [list() for _ in range(buckets)] - self.lock = threading.Lock() - - def insert(self, peer): - if peer.id != self.id: - bucket_number = largest_differing_bit(self.id, peer.id) - peer_triple = peer.astriple() - with self.lock: - bucket = self.buckets[bucket_number] - if peer_triple in bucket: - bucket.pop(bucket.index(peer_triple)) - elif len(bucket) >= self.bucket_size: - bucket.pop(0) - bucket.append(peer_triple) - - def nearest_nodes(self, key): - - with self.lock: - def keyfunction(peer): - return key ^ peer[2] # ideally there would be a better way with names? Instead of storing triples it would be nice to have a dict - - peers = (peer for bucket in self.buckets for peer in bucket) - best_peers = heapq.nsmallest(self.bucket_size, peers, keyfunction) - return [Peer(*peer) for peer in best_peers] diff --git a/agent/sn_agent/network/dht/dht.py b/agent/sn_agent/network/dht/dht.py deleted file mode 100644 index 28abf85..0000000 --- a/agent/sn_agent/network/dht/dht.py +++ /dev/null @@ -1,124 +0,0 @@ -import logging -import random -import threading -import time - -import nacl.encoding -import nacl.signing - -from .bucketset import BucketSet -from .handler import DHTRequestHandler -from .hashing import hash_function, random_id -from .peer import Peer -from .server import DHTServer -from .settings import DHTSettings -from .shortlist import Shortlist - -logger = logging.getLogger(__name__) - - -class DHT(object): - def __init__(self): - self.settings = DHTSettings() - - self.my_id = random_id() - - host = self.settings.HOST - - port = self.settings.PORT - if port == 0: - port = random.randint(5000, 10000) - - self.my_key = nacl.signing.SigningKey.generate() - - self.peer = Peer(host, port, self.my_id) - self.data = {} - self.buckets = BucketSet(self.settings.K, self.settings.ID_BITS, self.peer.id) - self.rpc_ids = {} # should probably have a lock for this - - 
self.server = DHTServer(self.peer.address(), DHTRequestHandler) - self.server.dht = self - self.server_thread = threading.Thread(target=self.server.serve_forever) - self.server_thread.daemon = True - self.server_thread.start() - - if self.settings.USE_UPNP: - self.server.try_upnp_portmap(port) - - if self.settings.NEEDS_BOOTING: - self.is_boot_node = False - self.bootstrap(self.settings.BOOT_HOST, self.settings.BOOT_PORT) - else: - self.is_boot_node = True - - logger.debug('DHT Server started') - - def iterative_find_nodes(self, key, boot_peer=None): - logger.debug('Finding nearest nodes...') - shortlist = Shortlist(self.settings.K, key) - shortlist.update(self.buckets.nearest_nodes(key)) - - if boot_peer: - logger.debug('This node a boot node: %s', boot_peer) - rpc_id = random.getrandbits(self.settings.ID_BITS) - self.rpc_ids[rpc_id] = shortlist - boot_peer.find_node(key, rpc_id, socket=self.server.socket, peer_id=self.peer.id) - - while (not shortlist.complete()) or boot_peer: - nearest_nodes = shortlist.get_next_iteration(self.settings.ALPHA) - for peer in nearest_nodes: - logger.debug('Nearest Node: %s', peer) - shortlist.mark(peer) - rpc_id = random.getrandbits(self.settings.ID_BITS) - self.rpc_ids[rpc_id] = shortlist - peer.find_node(key, rpc_id, socket=self.server.socket, peer_id=self.peer.id) - time.sleep(self.settings.ITERATION_SLEEP) - boot_peer = None - - return shortlist.results() - - def iterative_find_value(self, key): - shortlist = Shortlist(self.settings.K, key) - shortlist.update(self.buckets.nearest_nodes(key)) - while not shortlist.complete(): - nearest_nodes = shortlist.get_next_iteration(self.settings.ALPHA) - for peer in nearest_nodes: - shortlist.mark(peer) - rpc_id = random.getrandbits(self.settings.ID_BITS) - self.rpc_ids[rpc_id] = shortlist - peer.find_value(key, rpc_id, socket=self.server.socket, peer_id=self.peer.id) - time.sleep(self.settings.ITERATION_SLEEP) - return shortlist.completion_result() - - def bootstrap(self, boot_host, 
boot_port): - - logger.debug("Bootstrapping for %s:%s", boot_host, boot_port) - - boot_peer = Peer(boot_host, boot_port, 0) - self.iterative_find_nodes(self.peer.id, boot_peer=boot_peer) - - def __getitem__(self, key, bypass=0): - hashed_key = hash_function(key.encode("ascii")) - if hashed_key in self.data: - return self.data[hashed_key]["content"] - result = self.iterative_find_value(hashed_key) - if result: - return result["content"] - - raise KeyError - - def __setitem__(self, key, content): - content = str(content) - hashed_key = hash_function(key.encode("ascii")) - nearest_nodes = self.iterative_find_nodes(hashed_key) - value = { - "content": content, - "key": self.my_key.verify_key.encode(encoder=nacl.encoding.Base64Encoder).decode("utf-8"), - "signature": nacl.encoding.Base64Encoder.encode(self.my_key.sign(content.encode("ascii"))).decode("utf-8") - } - - if not nearest_nodes: - self.data[hashed_key] = value - - for node in nearest_nodes: - node.store(hashed_key, value, socket=self.server.socket, peer_id=self.peer.id) diff --git a/agent/sn_agent/network/dht/handler.py b/agent/sn_agent/network/dht/handler.py deleted file mode 100644 index 028596a..0000000 --- a/agent/sn_agent/network/dht/handler.py +++ /dev/null @@ -1,102 +0,0 @@ -import json -import logging -import socketserver - -import nacl.encoding -import nacl.signing - -from .peer import Peer - -logger = logging.getLogger(__name__) - - -class DHTRequestHandler(socketserver.BaseRequestHandler): - def handle(self): - try: - message = json.loads(self.request[0].decode("utf-8").strip()) - message_type = message["message_type"] - - logger.debug('Handling message type" %s', message_type) - - if message_type == "ping": - self.handle_ping(message) - elif message_type == "pong": - self.handle_pong(message) - elif message_type == "find_node": - self.handle_find(message) - elif message_type == "find_value": - self.handle_find(message, find_value=True) - elif message_type == "found_nodes": - 
self.handle_found_nodes(message) - elif message_type == "found_value": - self.handle_found_value(message) - elif message_type == "store": - self.handle_store(message) - elif message_type == "push": - self.handle_push(message) - except: - return - - client_host, client_port = self.client_address - peer_id = message["peer_id"] - new_peer = Peer(client_host, client_port, peer_id) - self.server.dht.buckets.insert(new_peer) - - def handle_ping(self, message): - client_host, client_port = self.client_address - id = message["peer_id"] - peer = Peer(client_host, client_port, id) - peer.pong(socket=self.server.socket, peer_id=self.server.dht.peer.id, lock=self.server.send_lock) - - def handle_pong(self, message): - pass - - def handle_find(self, message, find_value=False): - key = message["id"] - id = message["peer_id"] - client_host, client_port = self.client_address - peer = Peer(client_host, client_port, id) - response_socket = self.request[1] - if find_value and (key in self.server.dht.data): - value = self.server.dht.data[key] - peer.found_value(id, value, message["rpc_id"], socket=response_socket, peer_id=self.server.dht.peer.id, lock=self.server.send_lock) - else: - nearest_nodes = self.server.dht.buckets.nearest_nodes(id) - if not nearest_nodes: - nearest_nodes.append(self.server.dht.peer) - nearest_nodes = [nearest_peer.astriple() for nearest_peer in nearest_nodes] - peer.found_nodes(id, nearest_nodes, message["rpc_id"], socket=response_socket, peer_id=self.server.dht.peer.id, lock=self.server.send_lock) - - def handle_found_nodes(self, message): - rpc_id = message["rpc_id"] - shortlist = self.server.dht.rpc_ids[rpc_id] - del self.server.dht.rpc_ids[rpc_id] - nearest_nodes = [Peer(*peer) for peer in message["nearest_nodes"]] - shortlist.update(nearest_nodes) - - def handle_found_value(self, message): - rpc_id = message["rpc_id"] - shortlist = self.server.dht.rpc_ids[rpc_id] - del self.server.dht.rpc_ids[rpc_id] - shortlist.set_complete(message["value"]) - - def 
handle_store(self, message): - key = message["id"] - - # Verify updated message is signed with same key. - if key in self.server.dht.data: - # Signature is valid. - # (Raises exception if not.) - ret = nacl.signing.VerifyKey(self.server.dht.data[key]["key"], encoder=nacl.encoding.Base64Encoder).verify(nacl.encoding.Base64Encoder.decode(message["value"]["signature"])) - if type(ret) == bytes: - ret = ret.decode("utf-8") - - # Check that the signature corresponds to this message. - message_content = message["value"]["content"] - if ret != message_content: - return - - self.server.dht.data[key] = message["value"] - - def handle_push(self, message): - pass diff --git a/agent/sn_agent/network/dht/hashing.py b/agent/sn_agent/network/dht/hashing.py deleted file mode 100644 index 0c542ca..0000000 --- a/agent/sn_agent/network/dht/hashing.py +++ /dev/null @@ -1,14 +0,0 @@ -import hashlib -import random - -id_bits = 128 - - -def hash_function(data): - return int(hashlib.md5(data).hexdigest(), 16) - - -def random_id(seed=None): - if seed: - random.seed(seed) - return random.randint(0, (2 ** id_bits) - 1) diff --git a/agent/sn_agent/network/dht/peer.py b/agent/sn_agent/network/dht/peer.py deleted file mode 100644 index 2b3af77..0000000 --- a/agent/sn_agent/network/dht/peer.py +++ /dev/null @@ -1,95 +0,0 @@ -import json - -import logging - -logger = logging.getLogger(__name__) - - -class Peer(object): - ''' DHT Peer Information''' - - def __init__(self, host, port, id): - self.host, self.port, self.id = host, port, id - - def astriple(self): - return self.host, self.port, self.id - - def address(self): - return self.host, self.port - - def __repr__(self): - return repr(self.astriple()) - - def _sendmessage(self, message, sock=None, peer_id=None, lock=None): - message["peer_id"] = peer_id # more like sender_id - encoded = json.dumps(message) - - try: - encoded = str(encoded).encode("ascii") - except: - logger.error('There was a problem encoding the encode JSON') - return - - 
logger.debug('Sending Message: %s to %s on port %s via %s', message, self.host, self.port, type(sock)) - - if sock: - if lock: - with lock: - sock.sendto(encoded, (self.host, self.port)) - else: - - sock.sendto(encoded, (self.host, self.port)) - - def ping(self, socket=None, peer_id=None, lock=None): - message = { - "message_type": "ping" - } - self._sendmessage(message, socket, peer_id=peer_id, lock=lock) - - def pong(self, socket=None, peer_id=None, lock=None): - message = { - "message_type": "pong" - } - self._sendmessage(message, socket, peer_id=peer_id, lock=lock) - - def store(self, key, value, socket=None, peer_id=None, lock=None): - message = { - "message_type": "store", - "id": key, - "value": value - } - self._sendmessage(message, socket, peer_id=peer_id, lock=lock) - - def find_node(self, id, rpc_id, socket=None, peer_id=None, lock=None): - message = { - "message_type": "find_node", - "id": id, - "rpc_id": rpc_id - } - self._sendmessage(message, socket, peer_id=peer_id, lock=lock) - - def found_nodes(self, id, nearest_nodes, rpc_id, socket=None, peer_id=None, lock=None): - message = { - "message_type": "found_nodes", - "id": id, - "nearest_nodes": nearest_nodes, - "rpc_id": rpc_id - } - self._sendmessage(message, socket, peer_id=peer_id, lock=lock) - - def find_value(self, id, rpc_id, socket=None, peer_id=None, lock=None): - message = { - "message_type": "find_value", - "id": id, - "rpc_id": rpc_id - } - self._sendmessage(message, socket, peer_id=peer_id, lock=lock) - - def found_value(self, id, value, rpc_id, socket=None, peer_id=None, lock=None): - message = { - "message_type": "found_value", - "id": id, - "value": value, - "rpc_id": rpc_id - } - self._sendmessage(message, socket, peer_id=peer_id, lock=lock) diff --git a/agent/sn_agent/network/dht/server.py b/agent/sn_agent/network/dht/server.py deleted file mode 100644 index ce66a87..0000000 --- a/agent/sn_agent/network/dht/server.py +++ /dev/null @@ -1,27 +0,0 @@ -import logging -import miniupnpc 
-import socketserver -import threading - -logger = logging.getLogger(__name__) - - -class DHTServer(socketserver.ThreadingMixIn, socketserver.UDPServer): - def __init__(self, host_address, handler_cls): - - logger.debug("Starting DHT Sever on %s", host_address) - - socketserver.UDPServer.__init__(self, host_address, handler_cls) - self.send_lock = threading.Lock() - - def try_upnp_portmap(self, port): - - upnp = miniupnpc.UPnP() - upnp.discover() - - try: - upnp.selectigd() - result = upnp.addportmapping(port, 'TCP', upnp.lanaddr, port, 'SN Agent dht port: %u' % port, '', ) - logger.debug('UPnP result: %s', result) - except: - logging.error("Unable to port map using UPnP") diff --git a/agent/sn_agent/network/dht/settings.py b/agent/sn_agent/network/dht/settings.py deleted file mode 100644 index f3f4660..0000000 --- a/agent/sn_agent/network/dht/settings.py +++ /dev/null @@ -1,22 +0,0 @@ -from sn_agent import SettingsBase - - -class DHTSettings(SettingsBase): - def __init__(self, **custom_settings): - - self.ITERATION_SLEEP = 1 - self.ID_BITS = 128 - self.ALPHA = 3 - self.K = 20 - self._ENV_PREFIX = 'SN_DHT_' - - self.USE_UPNP = True - - self.NEEDS_BOOTING = True - self.BOOT_HOST = 'localhost' - self.BOOT_PORT = 6881 - - self.HOST = '0.0.0.0' - self.PORT = 6881 - - super().__init__(**custom_settings) diff --git a/agent/sn_agent/network/dht/shortlist.py b/agent/sn_agent/network/dht/shortlist.py deleted file mode 100644 index 154e203..0000000 --- a/agent/sn_agent/network/dht/shortlist.py +++ /dev/null @@ -1,82 +0,0 @@ -import logging -import threading - -from .peer import Peer - -logger = logging.getLogger(__name__) - - -class Shortlist(object): - def __init__(self, k, key): - self.k = k - self.key = key - self.list = list() - self.lock = threading.Lock() - self.completion_value = None - - def set_complete(self, value): - with self.lock: - self.completion_value = value - - def completion_result(self): - with self.lock: - return self.completion_value - - def 
update(self, nodes): - for node in nodes: - self._update_one(node) - - def _update_one(self, node): - if node.id == self.key or self.completion_value: - return - with self.lock: - for i in range(len(self.list)): - if node.id == self.list[i][0][2]: - break - if node.id ^ self.key < self.list[i][0][2] ^ self.key: - self.list.insert(i, (node.astriple(), False)) - self.list = self.list[:self.k] - break - else: - if len(self.list) < self.k: - self.list.append((node.astriple(), False)) - - def mark(self, node): - with self.lock: - for i in range(len(self.list)): - if node.id == self.list[i][0][2]: - self.list[i] = (node.astriple(), True) - - def complete(self): - - logger.debug('Completion value is: %s', self.completion_value) - if self.completion_value: - return True - - with self.lock: - for node, completed in self.list: - if not completed: - return False - return True - - def get_next_iteration(self, alpha): - - logger.debug('Getting next iteration alpha: %s', alpha) - - if self.completion_value: - return [] - - next_iteration = [] - with self.lock: - for node, completed in self.list: - logger.debug('Getting next iteration %s, %s', node, completed) - if not completed: - next_iteration.append(Peer(*node)) - if len(next_iteration) >= alpha: - break - - return next_iteration - - def results(self): - with self.lock: - return [Peer(*node) for (node, completed) in self.list] diff --git a/agent/sn_agent/network/geth/__init__.py b/agent/sn_agent/network/geth/__init__.py deleted file mode 100644 index edd2995..0000000 --- a/agent/sn_agent/network/geth/__init__.py +++ /dev/null @@ -1,145 +0,0 @@ - -from abc import abstractmethod -from sn_agent.network.base import NetworkABC -from sn_agent.network.enum import NetworkStatus -from sn_agent.network.geth.poller import setup_poller -from sn_agent.ontology.service_descriptor import ServiceDescriptor - - -class GethNetwork(NetworkABC): - def remove_service_advertisement(self, service: ServiceDescriptor): - pass - - def 
update_ontology(self): - pass - - def get_network_status(self) -> NetworkStatus: - pass - - def logoff_network(self) -> bool: - pass - - def advertise_service(self, service: ServiceDescriptor): - pass - - def leave_network(self) -> bool: - pass - - def logon_network(self) -> bool: - pass - - def find_service_providers(self, service: ServiceDescriptor) -> list: - pass - - def join_network(self) -> bool: - pass - - def __init__(self, app): - super().__init__(app) - setup_poller(app) - - @abstractmethod - def join(self) -> bool: - """ - Agent calls this the first time to connect to the network. An Private and Public key should be returned - """ - pass - - @abstractmethod - def leave(self) -> bool: - """ - Should this do something in the blockchain or just delete the public and private keys? - """ - pass - - @abstractmethod - def status(self) -> bool: - """ - Determine what the current network status is (joined or not joined) - :return: - """ - pass - - @abstractmethod - def get_ontology(self): - """ - Asks for the latest ontology from wherever it is stored. 
- :return: - """ - raise NotImplementedError() - - @abstractmethod - def advertise(self, agent_id: str, service: ServiceDescriptor) -> bool: - """ - Given an ontology, advertise it as a service that the agent provides - :param agent_id: - :param service: - :return: - """ - pass - - @abstractmethod - def deadvertise(self, agent_id: str, service: ServiceDescriptor) -> bool: - """ - Remove the advertisement of the service for a given agent - :param agent_id: - :param service: - :return: - """ - pass - - @abstractmethod - def find_providers(self, service: ServiceDescriptor) -> list: - """ - Called by the UI as well as find_provider - should return a list that contains information about all the providers that have indicated that they can proved the designated service - :param service: - :return: - """ - pass - - @abstractmethod - def ask_agent_if_can_perform(self, agent_id, service: ServiceDescriptor) -> bool: - """ - :param agent_id: - :param service: - :return: - """ - pass - - @abstractmethod - def ask_agent_to_perform(self, agent_id, service: ServiceDescriptor, json_content) -> bool: - """ - - :return: - """ - pass - - @abstractmethod - def ask_agent_for_their_providers(self, agent_id, service: ServiceDescriptor) -> list: - """ - This is used for creating the tree of services behind a given ontology - - :param agent_id: - :param service: - :return: - """ - pass - - def can_i_perform(self, service: ServiceDescriptor) -> bool: - """ - This is a request coming from the network asking if I can actually do the service - - :param service: - :return: - """ - pass - - def perform(self, service: ServiceDescriptor, json_content) -> bool: - """ - This will instruct the worker to do the task requested - - :param service: - :param json_content: - :return: - """ - pass diff --git a/agent/sn_agent/network/geth/settings.py b/agent/sn_agent/network/geth/settings.py deleted file mode 100644 index 659ea62..0000000 --- a/agent/sn_agent/network/geth/settings.py +++ /dev/null @@ -1,10 
+0,0 @@ -from urllib3.util import Url - -from sn_agent import SettingsBase, Required - - -class GetSettings(SettingsBase): - def __init__(self, **custom_settings): - self._ENV_PREFIX = 'SN_GETH_' - self.ETH_CLIENT = Required(Url) - super().__init__(**custom_settings) diff --git a/agent/sn_agent/network/poller.py b/agent/sn_agent/network/poller.py new file mode 100644 index 0000000..2027803 --- /dev/null +++ b/agent/sn_agent/network/poller.py @@ -0,0 +1,56 @@ +import asyncio +import datetime +import logging +from contextlib import suppress + +from aiohttp import web + +logger = logging.getLogger(__file__) + + +class Periodic: + def __init__(self, func, time): + self.func = func + self.time = time + self.is_started = False + self._task = None + + async def start(self): + logger.debug('Starting periodic task') + if not self.is_started: + self.is_started = True + # Start task to call func periodically: + self._task = asyncio.ensure_future(self._run()) + + async def stop(self): + logger.debug('Stopping periodic task') + if self.is_started: + self.is_started = False + # Stop task and await it stopped: + self._task.cancel() + with suppress(asyncio.CancelledError): + await self._task + + async def _run(self): + while True: + await asyncio.sleep(self.time) + self.func() + + +def task_to_run(): + print('Periodic Task: %s' % datetime.datetime.now()) + + +async def startup(app: web.Application): + poller = Periodic(task_to_run, 5) + await poller.start() + app['eth_client_poller'] = poller + + +async def cleanup(app: web.Application): + await app['eth_client_poller'].stop() + + +def setup_poller(app): + app.on_startup.append(startup) + app.on_cleanup.append(cleanup) diff --git a/agent/sn_agent/network/settings.py b/agent/sn_agent/network/settings.py index b8cbf00..d618de1 100644 --- a/agent/sn_agent/network/settings.py +++ b/agent/sn_agent/network/settings.py @@ -1,8 +1,21 @@ -from sn_agent import SettingsBase +from urllib3.util import Url + +from sn_agent import SettingsBase, 
Required class NetworkSettings(SettingsBase): def __init__(self, **custom_settings): + self.CLIENT_URL = 'http://192.168.16.17:8545' self._ENV_PREFIX = 'SN_NETWORK_' - self.CLASS = 'sn_agent.network.dht.DHTNetwork' + + self.WEB_PORT = 8000 + self.WEB_HOST = '0.0.0.0' + + self.CLASS = 'sn_agent.network.sn.SNNetwork' + + self.BOOT_HOST = 'bootstrap.ring.cx' + self.BOOT_PORT = "4222" + + # self.BLOCKCHAIN_CLIENT = Required(Url) + super().__init__(**custom_settings) diff --git a/agent/sn_agent/network/sn.py b/agent/sn_agent/network/sn.py new file mode 100644 index 0000000..68587a4 --- /dev/null +++ b/agent/sn_agent/network/sn.py @@ -0,0 +1,75 @@ +import json +import logging + +from sn_agent.agent.base import AgentABC +from sn_agent.network import NetworkSettings +from sn_agent.network.base import NetworkABC +from sn_agent.network.blockchain import BlockChain +from sn_agent.network.dht import DHT +from sn_agent.network.enum import NetworkStatus +from sn_agent.ontology.service_descriptor import ServiceDescriptor + +logger = logging.getLogger(__name__) + + +class SNNetwork(NetworkABC): + def __init__(self, app): + super().__init__(app) + self.settings = NetworkSettings() + self.blockchain = BlockChain(self.settings.CLIENT_URL) + self.dht = DHT(self.settings.BOOT_HOST, self.settings.BOOT_PORT) + + async def startup(self): + logger.debug('Registering agent on DHT') + + agent = self.app['agent'] + agent_id = agent.agent_id + agent_id_str = str(agent_id) + + self.dht.put(agent_id_str, {'url': "%s://%s:%s/api/ws" % ('http', self.settings.WEB_HOST, self.settings.WEB_PORT)}, 1) + + def find_service_providers(self, service: ServiceDescriptor) -> list: + return self.blockchain.get_agents_for_ontology(service.ontology_node_id) + + def logoff_network(self) -> bool: + return super().logoff_network() + + def update_ontology(self): + super().update_ontology() + + def join_network(self) -> bool: + def getAddressByName(addresses, name): + for key, value in addresses.items(): + if key == 
name: + return value + + def parseAbi(data): + for key, value in data.items(): + if key == 'abi': + return value + + payload = {'from': web3.eth.coinbase, 'gas': 1500000, 'gasPrice': 30000000000000} + agentFactoryAbi = parseAbi(json.loads(open('../build/contracts/AgentFactory.json', 'r').read())) + agentFactoryAddress = getAddressByName(json.loads(open('../addresses.json', 'r').read()), 'AgentFactory') + agentFactoryContract = web3.eth.contract(abi=agentFactoryAbi, address=agentFactoryAddress) + + return agentFactoryContract.transact(payload).create() + + def remove_service_advertisement(self, service: ServiceDescriptor): + super().remove_service_advertisement(service) + + def advertise_service(self, service: ServiceDescriptor): + super().advertise_service(service) + + def is_agent_a_member(self, agent: AgentABC) -> bool: + return super().is_agent_a_member(agent) + + def logon_network(self) -> bool: + return super().logon_network() + + def get_network_status(self) -> NetworkStatus: + return super().get_network_status() + + def leave_network(self) -> bool: + return super().leave_network() + diff --git a/agent/sn_agent/network/test.py b/agent/sn_agent/network/test.py index b19959e..1bd8d09 100644 --- a/agent/sn_agent/network/test.py +++ b/agent/sn_agent/network/test.py @@ -76,8 +76,8 @@ def find_service_providers(self, service: ServiceDescriptor) -> list: """ Called by the UI as well as find_provider - should return a list that contains information about all the providers that have indicated that they can proved - the designated service. + the designated service. This is a lookup from the blockchain. 
:param service: :return: a list of external agents which provide the service requested """ - pass + return ['ALICE',] diff --git a/agent/sn_agent/routes.py b/agent/sn_agent/routes.py index e5e8b09..e3f4bf8 100644 --- a/agent/sn_agent/routes.py +++ b/agent/sn_agent/routes.py @@ -1,5 +1,6 @@ from sn_agent import api +from sn_agent.ui.handlers import index def setup_routes(app): - app.router.add_post('/api', api.handler) + app.router.add_get('', index) diff --git a/agent/sn_agent/ui/__init__.py b/agent/sn_agent/ui/__init__.py new file mode 100644 index 0000000..da3445d --- /dev/null +++ b/agent/sn_agent/ui/__init__.py @@ -0,0 +1,5 @@ +import aiohttp_jinja2 +import jinja2 + +def setup_ui(app): + aiohttp_jinja2.setup(app, loader=jinja2.FileSystemLoader('sn_agent/ui/templates')) diff --git a/agent/sn_agent/ui/handlers.py b/agent/sn_agent/ui/handlers.py new file mode 100644 index 0000000..cdfb99d --- /dev/null +++ b/agent/sn_agent/ui/handlers.py @@ -0,0 +1,6 @@ +import aiohttp_jinja2 as aiohttp_jinja2 + + +@aiohttp_jinja2.template('index.jinja2') +def index(request): + return {'name': 'Andrew', 'surname': 'Svetlov'} diff --git a/agent/sn_agent/ui/templates/index.jinja2 b/agent/sn_agent/ui/templates/index.jinja2 new file mode 100644 index 0000000..fd9cec9 --- /dev/null +++ b/agent/sn_agent/ui/templates/index.jinja2 @@ -0,0 +1,46 @@ + + +
+ + + + + + +Back to basics + + diff --git a/docker-compose.demo.yml b/docker-compose.demo.yml index 9b750e5..d16075a 100644 --- a/docker-compose.demo.yml +++ b/docker-compose.demo.yml @@ -46,10 +46,6 @@ services: volumes: - ./parity-data:/data - - truffle: - build: truffle - ipfs: build: ipfs environment: diff --git a/docker-compose.yml b/docker-compose.yml index bcfeab9..7ddb205 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -3,29 +3,27 @@ version: "3" services: - agent: + alice: build: agent environment: - SN_AGENT_ID=b545478a-971a-48ec-bc56-4b9b7176799c + - SN_AGENT_WEB_PORT=8000 - SN_SERVICE_ADAPTER_CONFIG_FILE=service_adapter_config_example.yml - PYTHONPATH=/code - - SN_DHT_BOOT_HOST=172.17.0.1 - - SN_DHT_PORT=3000 ports: - - "3000:3000/udp" + - "8000:8000" volumes: - ./agent-data:/data - agent2: + bob: build: agent environment: - SN_AGENT_ID=c545478a-971a-48ec-bc56-aaaaaaaaaaaa + - SN_AGENT_WEB_PORT=8001 - SN_SERVICE_ADAPTER_CONFIG_FILE=service_adapter_config_example.yml - PYTHONPATH=/code - - SN_DHT_NEEDS_BOOTING=False - - SN_DHT_PORT=6881 ports: - - "6881:6881/udp" + - "8001:8001" volumes: - ./agent-data:/data @@ -36,3 +34,6 @@ services: environment: - SN_WEB_COOKIE_SECRET=kubr6DvIuYj4GREdgXq5CCoL5qHQWglj8IECTsI79mY= - PYTHONPATH=/code + + truffle: + build: truffle diff --git a/tools.sh b/tools.sh index c9114fd..a1af4de 100755 --- a/tools.sh +++ b/tools.sh @@ -5,25 +5,20 @@ set -o verbose set -o xtrace set -o nounset -function recreate_agent_image { - docker-compose create --build --force-recreate agent - docker-compose create --build --force-recreate agent2 -} - case "$1" in demo) docker-compose up --build --force-recreate ;; -agent) - recreate_agent_image - docker-compose run --service-ports agent ./agent.sh run +alice) + docker-compose create --build --force-recreate alice + docker-compose run --service-ports alice ./agent.sh run ;; -agent2) - recreate_agent_image - docker-compose run --service-ports agent2 ./agent.sh run +bob) + 
docker-compose create --build --force-recreate bob + docker-compose run --service-ports bob ./agent.sh run ;; agent-docs) @@ -52,8 +47,10 @@ parity) docker-compose run --service-ports parity ;; -truffle) - docker-compose run --service-ports truffle +prepare) + docker-compose run --service-ports truffle compile --all dao + docker-compose run --service-ports truffle migrate --reset dao + docker-compose run --service-ports truffle test dao ;; ipfs) @@ -65,8 +62,9 @@ clean) ;; hard-clean) + docker image prune docker-compose down --rmi all --remove-orphans - docker kill `docker ps -q` + docker kill `docker ps -q` || true docker rm `docker ps -a -q` docker rmi `docker images -q` docker volume rm `docker volume ls -qf dangling=true` @@ -77,11 +75,8 @@ create-web-cookie) ;; gen-ssl) - openssl genrsa -des3 -passout pass:x -out server.pass.key 2048 - openssl rsa -passin pass:x -in server.pass.key -out server.key - rm server.pass.key - openssl req -new -key server.key -out server.csr -subj "/C=UK/ST=Warwickshire/L=Leamington/O=OrgName/OU=IT Department/CN=example.com" - openssl x509 -req -days 365 -in server.csr -signkey server.key -out server.crt + cd agent + openssl req -nodes -new -x509 -keyout server.key -out server.crt -subj '/CN=localhost' ;; *) echo 'No operation specified'