From 33bf3ab3957e404f63fb37f76bb05f4c92fa221a Mon Sep 17 00:00:00 2001 From: Christian Jensen Date: Tue, 3 Oct 2017 14:16:34 -0700 Subject: [PATCH] Initial version of the DHT --- agent/sn_agent/network/dht/__init__.py | 59 +++++++++++ agent/sn_agent/network/dht/bucketset.py | 43 ++++++++ agent/sn_agent/network/dht/dht.py | 124 ++++++++++++++++++++++++ agent/sn_agent/network/dht/handler.py | 102 +++++++++++++++++++ agent/sn_agent/network/dht/hashing.py | 14 +++ agent/sn_agent/network/dht/lazymq.py | 0 agent/sn_agent/network/dht/peer.py | 95 ++++++++++++++++++ agent/sn_agent/network/dht/server.py | 23 +++++ agent/sn_agent/network/dht/settings.py | 18 ++++ agent/sn_agent/network/dht/shortlist.py | 82 ++++++++++++++++ docker-compose.demo.yml | 58 +++++++++++ docker-compose.yml | 28 ------ 12 files changed, 618 insertions(+), 28 deletions(-) create mode 100644 agent/sn_agent/network/dht/__init__.py create mode 100644 agent/sn_agent/network/dht/bucketset.py create mode 100644 agent/sn_agent/network/dht/dht.py create mode 100644 agent/sn_agent/network/dht/handler.py create mode 100644 agent/sn_agent/network/dht/hashing.py create mode 100644 agent/sn_agent/network/dht/lazymq.py create mode 100644 agent/sn_agent/network/dht/peer.py create mode 100644 agent/sn_agent/network/dht/server.py create mode 100644 agent/sn_agent/network/dht/settings.py create mode 100644 agent/sn_agent/network/dht/shortlist.py create mode 100644 docker-compose.demo.yml diff --git a/agent/sn_agent/network/dht/__init__.py b/agent/sn_agent/network/dht/__init__.py new file mode 100644 index 0000000..886c63e --- /dev/null +++ b/agent/sn_agent/network/dht/__init__.py @@ -0,0 +1,59 @@ +import logging + +from sn_agent.network.dht.dht import DHT + +from sn_agent.agent.base import AgentABC +from sn_agent.network.base import NetworkABC +from sn_agent.network.enum import NetworkStatus +from sn_agent.ontology.service_descriptor import ServiceDescriptor + +logger = logging.getLogger(__name__) + + +class 
def largest_differing_bit(value1: int, value2: int) -> int:
    """Return the index of the highest bit in which two integers differ.

    The index is zero-based, counted from the least significant bit.  When
    the values are equal (no differing bit) the result is clamped to 0, so
    the return value is always usable as a list index.

    Args:
        value1: First integer (a node ID in this package).
        value2: Second integer.

    Returns:
        ``max(0, bit_length(value1 ^ value2) - 1)``.
    """
    # int.bit_length() replaces the manual shift-and-count loop; unlike that
    # loop it also terminates when the XOR happens to be negative.
    return max(0, (value1 ^ value2).bit_length() - 1)
def __init__(self, host=None, port=None, key=None, my_id=None):
    """Create a DHT node, start its UDP server thread and optionally bootstrap.

    Args:
        host: Address to bind; falls back to "0.0.0.0" when falsy.
        port: UDP port to bind; a random port in 5000..10000 is chosen
            when falsy.
        key: Signing key used for stored values; a fresh
            ``nacl.signing.SigningKey`` is generated when falsy.
        my_id: Numeric node ID; ``random_id()`` is used when falsy.

    Side effects:
        Binds a ``DHTServer`` on the peer address and serves it from a
        daemon thread, attempts a UPnP port mapping when
        ``settings.USE_UPNP`` is set, and contacts the configured boot
        node when ``settings.NEEDS_BOOTING`` is set.
    """
    self.settings = DHTSettings()

    if not my_id:
        my_id = random_id()

    if not host:
        host = "0.0.0.0"

    if not port:
        port = random.randint(5000, 10000)

    if not key:
        key = nacl.signing.SigningKey.generate()
    self.my_key = key

    self.peer = Peer(host, port, my_id)
    self.data = {}
    self.buckets = BucketSet(self.settings.K, self.settings.ID_BITS, self.peer.id)
    self.rpc_ids = {}  # should probably have a lock for this

    self.server = DHTServer(self.peer.address(), DHTRequestHandler)
    # Request handlers reach back into this object through server.dht.
    self.server.dht = self
    self.server_thread = threading.Thread(target=self.server.serve_forever)
    self.server_thread.daemon = True
    self.server_thread.start()

    if self.settings.USE_UPNP:
        # try_upnp_portmap() reads the port with host_address[1], so it needs
        # the (host, port) pair; passing the bare port number raised
        # TypeError before the mapping was even attempted.
        self.server.try_upnp_portmap(self.peer.address())

    if self.settings.NEEDS_BOOTING:
        self.is_boot_node = False
        self.bootstrap(self.settings.BOOT_HOST, self.settings.BOOT_PORT)
    else:
        self.is_boot_node = True

    logger.debug('DHT Server started')
def __setitem__(self, key, content):
    """Sign ``content`` and publish it under ``key``.

    The content is converted with ``str()``, signed with this node's key
    and sent as a "store" message to every node returned by
    ``iterative_find_nodes`` for the hashed key.  When that lookup returns
    no nodes the value is kept in the local ``self.data`` instead.

    Args:
        key: Text key; hashed with ``hash_function`` after ASCII encoding
            (the same encoding ``__getitem__`` uses).
        content: Any object; stored as its ``str()`` representation.
    """
    content = str(content)
    hashed_key = hash_function(key.encode("ascii"))
    nearest_nodes = self.iterative_find_nodes(hashed_key)

    # Sign the UTF-8 bytes: identical to the previous ASCII bytes for ASCII
    # text, but no longer raises UnicodeEncodeError for other characters.
    # The store handler decodes the verified payload as UTF-8 as well.
    signed = self.my_key.sign(content.encode("utf-8"))
    verify_key = self.my_key.verify_key.encode(encoder=nacl.encoding.Base64Encoder)
    value = {
        "content": content,
        "key": verify_key.decode("utf-8"),
        "signature": nacl.encoding.Base64Encoder.encode(signed).decode("utf-8"),
    }

    if not nearest_nodes:
        self.data[hashed_key] = value

    for node in nearest_nodes:
        node.store(hashed_key, value, socket=self.server.socket, peer_id=self.peer.id)
def handle_find(self, message, find_value=False):
    """Answer a "find_node" or "find_value" request.

    Replies to the sender with "found_value" when ``find_value`` is set and
    the requested key is held in the local data store; otherwise replies
    with "found_nodes" carrying the known peers nearest to the requested
    key (or this node itself when no peers are known).

    Args:
        message: Decoded request; reads ``"id"`` (the key being looked up),
            ``"peer_id"`` (the sender's node ID) and ``"rpc_id"``.
        find_value: True for a "find_value" request.
    """
    key = message["id"]
    requester_id = message["peer_id"]
    rpc_id = message["rpc_id"]
    client_host, client_port = self.client_address
    requester = Peer(client_host, client_port, requester_id)
    dht = self.server.dht
    reply_options = {
        "socket": self.request[1],  # datagram handlers get (data, socket)
        "peer_id": dht.peer.id,
        "lock": self.server.send_lock,
    }

    if find_value and key in dht.data:
        requester.found_value(key, dht.data[key], rpc_id, **reply_options)
        return

    # Look up peers closest to the *requested key*.  The previous code used
    # the requester's own ID here, so every lookup other than a node
    # searching for itself returned peers near the asker, not the target.
    nearest_nodes = dht.buckets.nearest_nodes(key)
    if not nearest_nodes:
        nearest_nodes.append(dht.peer)
    triples = [nearest_peer.astriple() for nearest_peer in nearest_nodes]
    requester.found_nodes(key, triples, rpc_id, **reply_options)
class Peer(object):
    """Address and ID of one DHT node, plus helpers that send it messages.

    Every message helper builds a dict with a ``"message_type"`` entry and
    hands it to ``_sendmessage``, which JSON-encodes it and sends it as a
    single datagram to ``(host, port)`` through the socket supplied by the
    caller.
    """

    def __init__(self, host, port, id):
        self.host, self.port, self.id = host, port, id

    def astriple(self):
        """Return ``(host, port, id)``."""
        return self.host, self.port, self.id

    def address(self):
        """Return ``(host, port)``."""
        return self.host, self.port

    def __repr__(self):
        return repr(self.astriple())

    def _sendmessage(self, message, sock=None, peer_id=None, lock=None):
        """JSON-encode ``message`` and send it to this peer.

        Args:
            message: Dict to send; it is modified in place to carry
                ``peer_id`` under the ``"peer_id"`` key.
            sock: Object with a ``sendto(bytes, address)`` method.  Nothing
                is sent when it is falsy.
            peer_id: ID of the sending node.
            lock: Optional context-manager lock held around the send.
        """
        message["peer_id"] = peer_id  # more like sender_id

        # json.dumps() is the step that can fail (non-serializable value,
        # circular reference); its default ASCII-only output then always
        # encodes cleanly.  Guard it explicitly instead of a bare except.
        try:
            encoded = json.dumps(message).encode("ascii")
        except (TypeError, ValueError):
            logger.error('There was a problem encoding the encode JSON')
            return

        logger.debug('Sending Message: %s to %s on port %s via %s', message, self.host, self.port, type(sock))

        if not sock:
            return

        destination = (self.host, self.port)
        if lock:
            with lock:
                sock.sendto(encoded, destination)
        else:
            sock.sendto(encoded, destination)

    def ping(self, socket=None, peer_id=None, lock=None):
        """Send a "ping" message."""
        message = {
            "message_type": "ping"
        }
        self._sendmessage(message, socket, peer_id=peer_id, lock=lock)

    def pong(self, socket=None, peer_id=None, lock=None):
        """Send a "pong" message."""
        message = {
            "message_type": "pong"
        }
        self._sendmessage(message, socket, peer_id=peer_id, lock=lock)

    def store(self, key, value, socket=None, peer_id=None, lock=None):
        """Send a "store" message asking the peer to keep ``value`` under ``key``."""
        message = {
            "message_type": "store",
            "id": key,
            "value": value
        }
        self._sendmessage(message, socket, peer_id=peer_id, lock=lock)

    def find_node(self, id, rpc_id, socket=None, peer_id=None, lock=None):
        """Send a "find_node" request for ``id`` tagged with ``rpc_id``."""
        message = {
            "message_type": "find_node",
            "id": id,
            "rpc_id": rpc_id
        }
        self._sendmessage(message, socket, peer_id=peer_id, lock=lock)

    def found_nodes(self, id, nearest_nodes, rpc_id, socket=None, peer_id=None, lock=None):
        """Send a "found_nodes" reply carrying ``nearest_nodes`` for ``rpc_id``."""
        message = {
            "message_type": "found_nodes",
            "id": id,
            "nearest_nodes": nearest_nodes,
            "rpc_id": rpc_id
        }
        self._sendmessage(message, socket, peer_id=peer_id, lock=lock)

    def find_value(self, id, rpc_id, socket=None, peer_id=None, lock=None):
        """Send a "find_value" request for ``id`` tagged with ``rpc_id``."""
        message = {
            "message_type": "find_value",
            "id": id,
            "rpc_id": rpc_id
        }
        self._sendmessage(message, socket, peer_id=peer_id, lock=lock)

    def found_value(self, id, value, rpc_id, socket=None, peer_id=None, lock=None):
        """Send a "found_value" reply carrying ``value`` for ``rpc_id``."""
        message = {
            "message_type": "found_value",
            "id": id,
            "value": value,
            "rpc_id": rpc_id
        }
        self._sendmessage(message, socket, peer_id=peer_id, lock=lock)
class DHTServer(socketserver.ThreadingMixIn, socketserver.UDPServer):
    """UDP server for DHT traffic that handles each datagram in its own thread.

    ``send_lock`` is created here for code that wants to serialize writes
    on the shared server socket.
    """

    def __init__(self, host_address, handler_cls):
        socketserver.UDPServer.__init__(self, host_address, handler_cls)
        self.send_lock = threading.Lock()

    @staticmethod
    def _port_of(host_address):
        """Return the port from a bare port number or a ``(host, port)`` pair."""
        if isinstance(host_address, int):
            return host_address
        return host_address[1]

    def try_upnp_portmap(self, host_address):
        """Best-effort UPnP mapping of this server's UDP port.

        Args:
            host_address: Either the ``(host, port)`` pair the server is
                bound to or just the port number.  Both are accepted
                because callers have passed either form.

        Failures during discovery or mapping are logged, never raised.
        """
        port = self._port_of(host_address)
        try:
            upnp = miniupnpc.UPnP()
            upnp.discover()
            upnp.selectigd()
            # This is a UDP server, so the mapping has to be for UDP; a TCP
            # mapping would not forward any DHT datagrams.
            result = upnp.addportmapping(port, 'UDP', upnp.lanaddr, port, 'SN Agent dht port: %u' % port, '')
            logger.debug('UPnP result: %s', result)
        except Exception:
            logger.exception("Unable to port map using UPnP")
def mark(self, node):
    """Flag the shortlist entry for ``node`` as already contacted.

    Every entry whose stored ID equals ``node.id`` is replaced by
    ``(node.astriple(), True)``.  The shortlist is left unchanged when no
    entry matches.

    Args:
        node: Object exposing ``id`` and ``astriple()``.
    """
    with self.lock:
        # Entries are ((host, port, id), contacted) pairs.  Assigning by
        # index while enumerating is safe: the list length never changes.
        for index, (triple, _contacted) in enumerate(self.list):
            if triple[2] == node.id:
                self.list[index] = (node.astriple(), True)
resources: + limits: + cpus: '0.001' + memory: 50M + reservations: + cpus: '0.0001' + memory: 20M + + parity: + build: parity + volumes: + - ./parity-data:/data + + + truffle: + build: truffle + + ipfs: + build: ipfs + environment: + - IPFS_PATH=/data + volumes: + - ./ipfs-data:/data diff --git a/docker-compose.yml b/docker-compose.yml index e4dff6c..50508a8 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -21,31 +21,3 @@ services: environment: - SN_WEB_COOKIE_SECRET=kubr6DvIuYj4GREdgXq5CCoL5qHQWglj8IECTsI79mY= - PYTHONPATH=/code - - geth: - build: geth - ports: - - "8545:8545" - - "8546:8546" - - "30303:30303" - - volumes: - - ./geth-data:/data - - deploy: - resources: - limits: - cpus: '0.001' - memory: 50M - reservations: - cpus: '0.0001' - memory: 20M - - parity: - build: parity - volumes: - - ./parity-data:/data - - - truffle: - build: truffle