Skip to content

Commit

Permalink
Reqorking the blockchain code
Browse files Browse the repository at this point in the history
  • Loading branch information
jensenbox committed Oct 20, 2017
1 parent 53929cc commit c8c9fa3
Show file tree
Hide file tree
Showing 27 changed files with 667 additions and 271 deletions.
3 changes: 2 additions & 1 deletion agent/agent.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
set -o errexit
set -o nounset

export SN_AGENT_WEB_HOST=$(netstat -nr | grep '^0\.0\.0\.0' | awk '{print $2}')
export SN_NETWORK_GATEWAY=$(netstat -nr | grep '^0\.0\.0\.0' | awk '{print $2}')
echo $SN_NETWORK_GATEWAY

function run_tests {
py.test --verbose --cov-config .coveragerc --cov-report html --cov=sn_agent tests
Expand Down
82 changes: 21 additions & 61 deletions agent/sn_agent/api/__init__.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,31 @@
import logging
import os
from aiohttp import web, WSMsgType
from aiohttp.web_response import Response

from aiohttp import web
from jsonrpcserver.aio import methods

from sn_agent.api.job import submit_job
from sn_agent import ontology
from sn_agent.api.job import can_perform_service, perform_job
from sn_agent.job.job_descriptor import JobDescriptor
from sn_agent.ontology.service_descriptor import ServiceDescriptor

logger = logging.getLogger(__name__)


@methods.add
async def can_perform(request, context):
# figure out what we are being asked to perform and answer
service = ServiceDescriptor(ontology.DOCUMENT_SUMMARIZER_ID)
app = context
return await can_perform_service(app, service)


@methods.add
async def perform(request, context):
job = JobDescriptor()
app = context
return await perform_job(app, job)


async def http_handler(request):
request = await request.text()
response = await methods.dispatch(request)
Expand All @@ -18,62 +35,5 @@ async def http_handler(request):
return web.json_response(response, status=response.http_status)


WS_FILE = os.path.join(os.path.dirname(__file__), 'websocket.html')


async def ws_handler(request):
logger.debug('WebSocket Handler started')

app = request.app

resp = web.WebSocketResponse()

ok, protocol = resp.can_prepare(request)
if not ok:
with open(WS_FILE, 'rb') as fp:
return Response(body=fp.read(), content_type='text/html')

await resp.prepare(request)

logger.debug('WebSocket data received')

try:

request.app['sockets'].append(resp)

async for msg in resp:

logger.debug('Processing WebSocket message: %s', msg.type)

if msg.type == WSMsgType.TEXT:

response = await methods.dispatch(msg.data, app)
if not response.is_notification:
await resp.send_str(str(response))

elif msg.type == WSMsgType.ERROR:
logger.debug('ws connection closed with exception %s' % resp.exception())

else:
logger.debug("Unhandled message type")
return resp
return resp

finally:
request.app['sockets'].remove(resp)
logger.debug('Someone disconnected.')


async def on_shutdown(app):
for ws in app['sockets']:
await ws.close()

def setup_api(app):
methods.add(submit_job)

app['sockets'] = []

app.router.add_post('/api', http_handler)
app.router.add_get('/api/ws', ws_handler)

app.on_shutdown.append(on_shutdown)
77 changes: 17 additions & 60 deletions agent/sn_agent/api/job.py
Original file line number Diff line number Diff line change
@@ -1,76 +1,33 @@
import logging
from urllib.parse import urlparse

import aiohttp

from sn_agent import ontology
from sn_agent.job.job_descriptor import JobDescriptor
from sn_agent.ontology.service_descriptor import ServiceDescriptor

logger = logging.getLogger(__name__)


async def submit_job(context=None):
process_job(context)

return 'pong'


async def process_job(app):
logger.debug("Job submission")

blockchain = app['blockchain']
dht = app['dht']

ontology_id = ontology.DOCUMENT_SUMMARIZER_ID
agent_ids = blockchain.get_agents_for_ontology(ontology_id)

available_agents = []
for agent_id in agent_ids:

connection_info = dht.get(agent_id)
async def can_perform_service(app, service_descriptor: ServiceDescriptor):
logger.debug("get_can_perform: %s", service_descriptor)

for value in connection_info:
logger.debug('received value: %s', value)
service_manager = app['service_manager']
service_adapter = service_manager.get_service_adapter_for_id(service_descriptor.ontology_node_id)

if isinstance(value, dict):
if 'url' in value.keys():
url = urlparse(value['url'])
url_str = url.geturl()
if service_adapter is None:
raise Exception('Service not available')

logger.debug('Connection URL: %s', url_str)
# if url.scheme == 'ws' or url.scheme == 'wss':
return service_adapter.can_perform()

try:
session = aiohttp.ClientSession()
async with session.ws_connect(url_str, heartbeat=10000) as ws:

logger.debug("************** Successfully connected to %s", url)
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
if msg.data == 'close cmd':
await ws.close()
break
else:
await ws.send_str(msg.data + '/answer')
elif msg.type == aiohttp.WSMsgType.CLOSED:
break
elif msg.type == aiohttp.WSMsgType.ERROR:
break
async def perform_job(app, job_descriptor: JobDescriptor):
logger.debug("perform_job: %s", job_descriptor)

except aiohttp.ClientConnectorError:
logger.error('Client Connector error for: %s', url_str)
pass
service_manager = app['service_manager']

except aiohttp.ServerDisconnectedError:
logger.error('Server disconnected error for: %s', url_str)
pass
service_descriptor = job_descriptor.service

except aiohttp.WSServerHandshakeError:
logger.error('Incorrect WS handshake for: %s', url_str)
pass
service_adapter = service_manager.get_service_adapter_for_id(service_descriptor.ontology_node_id)

except aiohttp.ClientOSError:
logger.error('Client OS error for: %s', url_str)
pass
if service_adapter is None:
raise Exception('Service not available')

finally:
session.close()
return service_adapter.perform(job_descriptor)
4 changes: 0 additions & 4 deletions agent/sn_agent/job/job_descriptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,8 @@ def __delitem__(self, key):
self.job_parameters.__delitem__(key)

def __getitem__(self, key):
<<<<<<< HEAD
return self.job_parameters.__getitem__(key)

=======
return self.job_parameters[key]
>>>>>>> d488c9bcd3f8b3f2850cc2fcb85eb304d3c92a30
def __setitem__(self, key, value):
self.job_parameters[key] = value

Expand Down
4 changes: 0 additions & 4 deletions agent/sn_agent/network/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,3 @@ def setup_network(app):
klass = import_string(settings.CLASS)
logger.debug('Loading network class: %s', klass)
app['network'] = klass(app)


def join_network(app):
app['network'].join_network()
19 changes: 0 additions & 19 deletions agent/sn_agent/network/blockchain.py

This file was deleted.

41 changes: 41 additions & 0 deletions agent/sn_agent/network/data/AgentFactory.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
{
"contract_name": "AgentFactory",
"abi": [
{
"constant": false,
"inputs": [],
"name": "create",
"outputs": [
{
"name": "",
"type": "address"
}
],
"payable": false,
"type": "function"
}
],
"unlinked_binary": "0x6060604052341561000f57600080fd5b5b61087a8061001f6000396000f300606060405263ffffffff7c0100000000000000000000000000000000000000000000000000000000600035041663efc81a8c811461003d575b600080fd5b341561004857600080fd5b610050610079565b60405173ffffffffffffffffffffffffffffffffffffffff909116815260200160405180910390f35b600061008361009f565b604051809103906000f080151561009957600080fd5b90505b90565b60405161079f806100b083390190560060606040525b60008054600160a060020a03191633600160a060020a03161790555b5b61076e806100316000396000f300606060405236156100965763ffffffff7c01000000000000000000000000000000000000000000000000000000006000350416631cc1e7bc811461009a5780632f54bf6e146100ba57806358e7ec3c146100ed5780636c0437731461011c5780638da5cb5b14610149578063a59455dc14610178578063a662031b14610206578063d0a9e0c014610241578063f2fde38b146102cf575b5b5b005b34156100a557600080fd5b61009660048035602481019101356102f0565b005b34156100c557600080fd5b6100d9600160a060020a0360043516610322565b604051901515815260200160405180910390f35b34156100f857600080fd5b610100610339565b604051600160a060020a03909116815260200160405180910390f35b341561012757600080fd5b61009660048035600160a060020a03169060248035908101910135610348565b005b341561015457600080fd5b6101006103f2565b604051600160a060020a03909116815260200160405180910390f35b341561018357600080fd5b61018e600435610401565b60405160208082528190810183818151815260200191508051906020019080838360005b838110156101cb5780820151818401525b6020016101b2565b50505050905090810190601f1680156101f85780820380516001836020036101000a031916815260200191505b509250505060405180910390f35b341561021157600080fd5b610100600160a060020a03600435166104c7565b604051600160a060020a03909116815260200160405180910390f35b341561024c57600080fd5b61018e6004356104f7565b60405160208082528190810183818151815260200191508051906020019080838360005b838110156101cb5780820151818401525b6020016101b2565b50505050905090810190601f1680156101f85780820380516001836020036101000a031916815260200191505b509250505060405180910390f35b34156102da57600080fd5b610096600160a060020a03600435166105b3565b005b6001805480820161030183826105f4565b916000526020600020900160005b5061031b90848461061e565b50505b5050565b600054600160a060020a038281169116145b919050565b600254600160a060020a031681565b61035133610322565b151561035c57600080fd5b82600160a060020a0316631cc1e7bc83836040517c010000000000000000000000000000000000000000000000000000000063ffffffff85160281526020600482019081526024820183905290819060440184848082843782019150509350505050600060405180830381600087803b15156103d757600080fd5b6102c65a03f115156103e857600080fd5b5050505b5b505050565b600054600160a060020a031681565b61040961069d565b600180548390811061041757fe5b906000526020600020900160005b508054600181600116156101000203166002900480601f0160208091040260200160405190810160405280929190818152602001828054600181600116156101000203166002900480156104ba5780601f1061048f576101008083540402835291602001916104ba565b820191906000526020600020905b81548152906001019060200180831161049d57829003601f168201915b505050505090505b919050565b6002805473ffffffffffffffffffffffffffffffffffffffff1916600160a060020a03831617905560005b919050565b600180548290811061050557fe5b906000526020600020900160005b915090508054600181600116156101000203166002900480601f0160208091040260200160405190810160405280929190818152602001828054600181600116156101000203166002900480156105ab5780601f10610580576101008083540402835291602001916105ab565b820191906000526020600020905b81548152906001019060200180831161058e57829003601f168201915b505050505081565b6105bc33610322565b15156105c757600080fd5b6000805473ffffffffffffffffffffffffffffffffffffffff1916600160a060020a0383161790555b5b50565b8154818355818115116103ec576000838152602090206103ec9181019083016106af565b5b505050565b828054600181600116156101000203166002900490600052602060002090601f016020900481019282601f1061065f5782800160ff1982351617855561068c565b8280016001018555821561068c579182015b8281111561068c578235825591602001919060010190610671565b5b506106999291506106d9565b5090565b60206040519081016040526000815290565b6106d691905b808211156106995760006106c982826106fa565b506001016106b5565b5090565b90565b6106d691905b8082111561069957600081556001016106df565b5090565b90565b50805460018160011615610100020316600290046000825580601f1061072057506105f0565b601f0160209004906000526020600020908101906105f091906106d9565b5b505600a165627a7a723058201057da26894506df11bde6d0821ed5926efce4fe5afccb7c0be1b2c3ba655d6c0029a165627a7a72305820b58b7e7739d32b54a3fdfd31bd302af676be79487f7d62a889c75caf502500a80029",
"networks": {
"1508470469290": {
"events": {},
"links": {},
"address": "0x1cf4cb6d4f7672e352342300ecbc15ff0acd79d8",
"updated_at": 1508470475336
},
"1508470589354": {
"events": {},
"links": {},
"address": "0x60ddbd1f9f8c786e6f43e0470ed5a3498f6f4440",
"updated_at": 1508470592733
},
"1508470632092": {
"events": {},
"links": {},
"address": "0xb2eed4473b2e4107e7a43f79a23d3500acfe3ac2",
"updated_at": 1508470635443
}
},
"schema_version": "0.0.5",
"updated_at": 1508470635443
}
Loading

0 comments on commit c8c9fa3

Please sign in to comment.