Skip to content

Commit

Permalink
Interim commit
Browse files Browse the repository at this point in the history
  • Loading branch information
jensenbox committed Oct 19, 2017
1 parent 6114247 commit 86ef0fe
Show file tree
Hide file tree
Showing 38 changed files with 542 additions and 828 deletions.
43 changes: 41 additions & 2 deletions agent/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,10 +1,49 @@
FROM python:3
FROM ubuntu:17.04

RUN \
apt-get update \
&& apt-get install -y \
apt-utils \
build-essential \
cmake \
git \
wget \
libncurses5-dev \
libreadline-dev \
nettle-dev \
libgnutls28-dev \
libuv1-dev \
libmsgpack-dev \
libargon2-0-dev \
libssl-dev \
net-tools \
nmap \
&& apt-get dist-upgrade -y \
&& apt-get clean

RUN wget https://www.python.org/ftp/python/3.6.3/Python-3.6.3.tgz \
&& tar xfzv Python-3.6.3.tgz \
&& cd Python-3.6.3 \
&& ./configure \
&& make \
&& make install \
&& pip3 install cython

RUN git clone --branch 1.3.6 https://github.com/savoirfairelinux/opendht.git \
&& cd opendht \
&& mkdir build \
&& cd build \
&& cmake .. -DCMAKE_INSTALL_PREFIX=/usr -DOPENDHT_PYTHON=On -DOPENDHT_LTO=On \
&& make -j8 \
&& make install \
&& cd ../.. \
&& rm -rf opendht

ENV PYTHONUNBUFFERED 1

WORKDIR /code

ADD requirements.txt /code
RUN pip install -r requirements.txt
RUN pip3 install -r requirements.txt

ADD . /code
26 changes: 0 additions & 26 deletions agent/activate.settings.sh

This file was deleted.

17 changes: 16 additions & 1 deletion agent/agent.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,23 @@
from aiohttp import web

from sn_agent.agent import AgentSettings
from sn_agent.app import create_app
import ssl

import logging

logger = logging.getLogger(__name__)

app = create_app()

# sslcontext = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
# sslcontext.load_cert_chain('server.crt', 'server.key')

# TODO Make the port configurable from the ENV
web.run_app(app, port=8000)
# web.run_app(app, port=8000, ssl_context=sslcontext)

settings = AgentSettings()

logger.info('Host setting: %s', settings.WEB_HOST)

web.run_app(app, port=settings.WEB_PORT)
4 changes: 3 additions & 1 deletion agent/agent.sh
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
set -o errexit
set -o nounset

export SN_AGENT_WEB_HOST=$(netstat -nr | grep '^0\.0\.0\.0' | awk '{print $2}')

function run_tests {
py.test --verbose --cov-config .coveragerc --cov-report html --cov=sn_agent tests
}
Expand All @@ -13,7 +15,7 @@ noop)
;;

run)
python agent.py
python3 agent.py
;;

docs)
Expand Down
12 changes: 6 additions & 6 deletions agent/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ sphinx-autobuild
sphinx_rtd_theme
recommonmark

msgpack-python
ipaddress
miniupnpc
apscheduler
m2crypto
pynacl
aiohttp-jinja2
bson

tensorflow

web3
2 changes: 2 additions & 0 deletions agent/sn_agent/agent/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,6 @@ def __init__(self, **custom_settings):
self._ENV_PREFIX = 'SN_AGENT_'
self.CLASS = 'sn_agent.agent.test.TestAgent'
self.ID = Required(uuid.UUID)
self.WEB_HOST = "0.0.0.0"
self.WEB_PORT = 8000
super().__init__(**custom_settings)
21 changes: 0 additions & 21 deletions agent/sn_agent/api.py

This file was deleted.

79 changes: 79 additions & 0 deletions agent/sn_agent/api/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import logging
import os
from aiohttp import web, WSMsgType
from aiohttp.web_response import Response
from jsonrpcserver.aio import methods

from sn_agent.api.job import submit_job

logger = logging.getLogger(__name__)


async def http_handler(request):
request = await request.text()
response = await methods.dispatch(request)
if response.is_notification:
return web.Response()
else:
return web.json_response(response, status=response.http_status)


WS_FILE = os.path.join(os.path.dirname(__file__), 'websocket.html')


async def ws_handler(request):
logger.debug('WebSocket Handler started')

app = request.app

resp = web.WebSocketResponse()

ok, protocol = resp.can_prepare(request)
if not ok:
with open(WS_FILE, 'rb') as fp:
return Response(body=fp.read(), content_type='text/html')

await resp.prepare(request)

logger.debug('WebSocket data received')

try:

request.app['sockets'].append(resp)

async for msg in resp:

logger.debug('Processing WebSocket message: %s', msg.type)

if msg.type == WSMsgType.TEXT:

response = await methods.dispatch(msg.data, app)
if not response.is_notification:
await resp.send_str(str(response))

elif msg.type == WSMsgType.ERROR:
logger.debug('ws connection closed with exception %s' % resp.exception())

else:
logger.debug("Unhandled message type")
return resp
return resp

finally:
request.app['sockets'].remove(resp)
logger.debug('Someone disconnected.')


async def on_shutdown(app):
for ws in app['sockets']:
await ws.close()

def setup_api(app):
methods.add(submit_job)

app['sockets'] = []

app.router.add_post('/api', http_handler)
app.router.add_get('/api/ws', ws_handler)

app.on_shutdown.append(on_shutdown)
76 changes: 76 additions & 0 deletions agent/sn_agent/api/job.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import logging
from urllib.parse import urlparse

import aiohttp

from sn_agent import ontology

logger = logging.getLogger(__name__)


async def submit_job(context=None):
process_job(context)

return 'pong'


async def process_job(app):
logger.debug("Job submission")

blockchain = app['blockchain']
dht = app['dht']

ontology_id = ontology.DOCUMENT_SUMMARIZER_ID
agent_ids = blockchain.get_agents_for_ontology(ontology_id)

available_agents = []
for agent_id in agent_ids:

connection_info = dht.get(agent_id)

for value in connection_info:
logger.debug('received value: %s', value)

if isinstance(value, dict):
if 'url' in value.keys():
url = urlparse(value['url'])
url_str = url.geturl()

logger.debug('Connection URL: %s', url_str)
# if url.scheme == 'ws' or url.scheme == 'wss':

try:
session = aiohttp.ClientSession()
async with session.ws_connect(url_str, heartbeat=10000) as ws:

logger.debug("************** Successfully connected to %s", url)
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
if msg.data == 'close cmd':
await ws.close()
break
else:
await ws.send_str(msg.data + '/answer')
elif msg.type == aiohttp.WSMsgType.CLOSED:
break
elif msg.type == aiohttp.WSMsgType.ERROR:
break

except aiohttp.ClientConnectorError:
logger.error('Client Connector error for: %s', url_str)
pass

except aiohttp.ServerDisconnectedError:
logger.error('Server disconnected error for: %s', url_str)
pass

except aiohttp.WSServerHandshakeError:
logger.error('Incorrect WS handshake for: %s', url_str)
pass

except aiohttp.ClientOSError:
logger.error('Client OS error for: %s', url_str)
pass

finally:
session.close()
File renamed without changes.
14 changes: 11 additions & 3 deletions agent/sn_agent/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,21 @@
from aiohttp import web

from sn_agent.agent import setup_agent
from sn_agent.api import setup_api
from sn_agent.log import setup_logging
from sn_agent.network import setup_network, join_network
from sn_agent.network import setup_network
from sn_agent.ontology import setup_ontology
from sn_agent.routes import setup_routes
from sn_agent.service_adapter import setup_service_manager
from sn_agent.ui import setup_ui

logger = logging.getLogger(__name__)


async def startup(app):
await app['network'].startup()


def create_app():
# Significant performance improvement: https://github.com/MagicStack/uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
Expand All @@ -25,10 +31,12 @@ def create_app():
setup_ontology(app)
setup_network(app)
setup_service_manager(app)
setup_api(app)
setup_agent(app)

join_network(app)
setup_ui(app)

app['name'] = 'SingularityNET Agent'

app.on_startup.append(startup)

return app
9 changes: 6 additions & 3 deletions agent/sn_agent/job/job_descriptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

test_jobs = {}


class JobDescriptor(object):
def __init__(self, service: ServiceDescriptor, job_parameters: dict = None):
self.service = service
Expand All @@ -33,8 +34,10 @@ def __iter__(self):

def __delitem__(self, key):
self.job_parameters.__delitem__(key)

def __getitem__(self, key):
return self.job_parameters.__getitem__(key)

def __setitem__(self, key, value):
self.job_parameters.__setitem__(key, value)

Expand All @@ -58,9 +61,9 @@ def init_test_jobs():
test_jobs[ontology.ENTITY_EXTRACTER_ID] = []

job_parameters = {'input_type': 'file',
'input_url': 'http://test.com/inputs/test_input.txt',
'output_type': 'file_url_put',
'output_url': 'test_output.txt'}
'input_url': 'http://test.com/inputs/test_input.txt',
'output_type': 'file_url_put',
'output_url': 'test_output.txt'}
job_parameters_2 = {'input_type': 'file',
'input_url': 'http://test.com/inputs/test_input_2.txt',
'output_type': 'file_url_put',
Expand Down
Loading

0 comments on commit 86ef0fe

Please sign in to comment.