diff --git a/.env b/.env deleted file mode 100644 index b99628a..0000000 --- a/.env +++ /dev/null @@ -1,3 +0,0 @@ -TOKEN= -RAVENVERSE_URL=http://0.0.0.0:9999 -RAVENVERSE_FTP_URL=0.0.0.0 diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..0501126 --- /dev/null +++ b/.env.example @@ -0,0 +1,4 @@ +TOKEN= +RAVENVERSE_URL=http://0.0.0.0:8081 +RAVENVERSE_FTP_URL=0.0.0.0 +RAVENAUTH_URL=http://0.0.0.0:8000 \ No newline at end of file diff --git a/.gitignore b/.gitignore index 090a1f0..857d7de 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ .idea .DS_Store +.env \ No newline at end of file diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..85ebeb8 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,13 @@ +FROM python:3.7.11 + +ARG DEBIAN_FRONTEND=noninteractive + +COPY . / + +RUN python -m ensurepip --upgrade +RUN python -m pip install --upgrade pip + +RUN pip install -r requirements.txt + +CMD ["run_distributed_client.py"] +ENTRYPOINT ["python"] \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..eff9764 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,4 @@ +version: '3.3' +services: + ravsock: + build: . 
"""Eel-based desktop GUI for the ravpy distributed client.

Flat script: it pins the Ravenverse endpoints via environment variables,
wires a custom logging handler that forwards log records to the web UI,
exposes a handful of JS-callable functions, then blocks in eel.start().

NOTE(review): statement order in this module is load-bearing — the
os.environ assignments must happen BEFORE `from ravpy.globals import g`
below, because ravpy.config reads those variables at import time.
"""
import eel
import logging
import os
import psutil
import shutil
from hurry.filesize import size

# Point the client at the production Ravenverse services.  These must be
# set before any ravpy module is imported (ravpy.config reads them once).
os.environ['RAVENVERSE_URL'] = "http://server.ravenverse.ai"
os.environ['RAVENVERSE_FTP_HOST'] = "server.ravenverse.ai"
os.environ['RAVENVERSE_FTP_URL'] = "server.ravenverse.ai"
os.environ['RAVENAUTH_URL'] = "https://auth.ravenverse.ai"

# 'web' is the directory eel serves static assets (main.html etc.) from.
eel.init('web')


@eel.expose
def disconnect():
    """Disconnect the socket client from the server, if connected.

    Always returns True (the GUI treats this as fire-and-forget).
    """
    if g.client.connected:
        g.logger.debug("Disconnecting...")
        # Re-checked because the connection may drop between the outer
        # check and the emit — presumably defensive; TODO confirm intent.
        if g.client.connected:
            g.client.emit("disconnect", namespace="/client")
        g.logger.debug("Disconnected")
        g.logger.debug("")

    return True


def close_callback(a, b):
    # eel invokes this with (page, sockets) when the GUI window closes;
    # both arguments are unused — we only need to tear down the socket.
    disconnect()


# Imported here (not at the top) so the os.environ assignments above take
# effect before ravpy.config is evaluated.
from ravpy.globals import g


class CustomHandler(logging.Handler):
    """Logging handler that mirrors every record into the web UI.

    NOTE(review): this reads record.asctime and record.message, which only
    exist after a Formatter has processed the record — verify that records
    reaching this handler were formatted upstream, otherwise emit() will
    raise AttributeError.
    """

    def __init__(self):
        logging.Handler.__init__(self)

    def emit(self, record):
        print("Custom", record)
        # Forward the record fields to the JS-side getLog() callback.
        eel.getLog({"asctime": record.asctime, "threadName": record.threadName, "levelname": record.levelname,
                    "message": record.message})
        return record


# Route ravpy's logger output through the custom handler at DEBUG level.
log_formatter = logging.Formatter("%(asctime)s [%(threadName)-12.12s] [%(levelname)-5.5s] %(message)s")
my_handler = CustomHandler()
my_handler.setLevel(logging.DEBUG)
my_handler.setFormatter(log_formatter)
g.logger.addHandler(my_handler)


@eel.expose
def verify_access_token(access_token):
    """Validate an access token against ravenauth.

    Returns a 3-item list consumed by the JS side:
    [token, "success"|"failure", error-message-or-empty].
    """
    from ravpy.utils import verify_token
    if verify_token(access_token):
        return [access_token, "success", ""]
    else:
        return [access_token, "failure", "Invalid access token!"]


@eel.expose
def get_system_config():
    """Collect host hardware stats (RAM, CPU, disk) for display in the UI.

    RAM figures are human-readable strings (hurry.filesize); storage
    figures are integers in GiB.
    """
    ram_total = str(size(psutil.virtual_memory().total))
    ram_available = str(size(psutil.virtual_memory().available))
    cpu_count = psutil.cpu_count(logical=False)
    cpu_percent = psutil.cpu_percent()
    total, used, free = shutil.disk_usage("/")
    storage_total = total // (2 ** 30)
    storage_available = free // (2 ** 30)

    return {"ram_total": ram_total, "ram_available": ram_available,
            "cpu_count": cpu_count, "cpu_percent": cpu_percent, "storage_total": storage_total,
            "storage_available": storage_available}


@eel.expose
def get_logs(skip, limit):
    """Read debug.log, skipping the first `skip` lines.

    NOTE(review): `limit` is accepted but never used, and the lines are
    only printed, not returned — the JS caller receives None.  Looks
    unfinished; confirm intended contract before relying on it.
    """
    with open("debug.log", "r") as f:
        logs = f.readlines()[skip:]
        print(logs)


@eel.expose
def participate(token):
    """Full join flow: connect socket, open FTP, run the benchmark.

    Returns False (after notifying the UI via clientDisconnected) if either
    connection step fails; True once benchmarking completes and the client
    is waiting for work.
    """
    # Imported lazily so module import cost / side effects are deferred
    # until the user actually clicks "participate".
    from ravpy.distributed.benchmarking import benchmark
    from ravpy.utils import initialize_ftp_client
    from ravpy.initialize import initialize

    # Initialize
    socket_client = initialize(ravenverse_token=token)

    if socket_client is None:
        disconnect()
        eel.clientDisconnected()
        return False
    else:
        eel.clientConnected()

    # get ftp client
    ftp_client = initialize_ftp_client()

    if ftp_client is None:
        disconnect()
        eel.clientDisconnected()
        return False

    # do benchmark
    benchmark()

    g.logger.debug("")
    g.logger.debug("Ravpy is waiting for graphs/subgraphs/ops...")
    g.logger.debug("Warning: Do not close Ravpy if you like to "
                   "keep participating and keep earning Raven tokens\n")

    return True


@eel.expose
def get_subgraphs():
    """Return all locally recorded subgraphs as plain dicts for the UI."""
    subgraphs = g.ravdb.get_subgraphs()
    subgraphs = [sg.as_dict() for sg in subgraphs]
    return subgraphs


@eel.expose
def delete_subgraphs():
    """Delete every locally recorded subgraph.  Always returns True."""
    g.ravdb.delete_subgraphs()
    return True


# Ensure the local SQLite database and its tables exist before the UI
# starts issuing queries.
g.ravdb.create_database()
g.ravdb.create_tables()


# Blocks until the GUI window is closed; close_callback then disconnects.
eel.start('main.html', close_callback=close_callback)
class DBManager:
    """Persistence layer over the local SQLite database (DATABASE_URI).

    Owns the SQLAlchemy engine and provides CRUD helpers for Subgraph rows.
    A logger is optional: attach one with set_logger() to get debug output.

    Fix: previously self.logger was initialised to None and add_subgraph()
    called self.logger.debug(...) unconditionally, raising AttributeError
    whenever set_logger() had not been called first.  All logging now goes
    through the None-safe _log() helper.
    """

    def __init__(self):
        # Make sure the database file exists before binding an engine to it.
        self.create_database()
        self.engine = self.connect()
        self.logger = None

    def set_logger(self, logger):
        """Attach a logger used for debug output (optional)."""
        self.logger = logger

    def _log(self, message):
        # None-safe debug logging: silently skipped until set_logger() runs.
        if self.logger is not None:
            self.logger.debug(message)

    def get_session(self):
        """Return a session factory bound to the engine.

        expire_on_commit=False so returned ORM objects stay readable after
        their session is closed (callers access attributes post-commit).
        """
        Session = sessionmaker(bind=self.engine, expire_on_commit=False)
        return Session

    def connect(self):
        """Create the engine and bind the declarative metadata to it."""
        engine = db.create_engine(DATABASE_URI, isolation_level='READ UNCOMMITTED')
        Base.metadata.bind = engine
        return engine

    def create_database(self):
        """Create the database if it does not already exist."""
        if not database_exists(DATABASE_URI):
            cd(DATABASE_URI)
            print('Database created')

    def drop_database(self):
        """Drop the database if it exists."""
        if database_exists(DATABASE_URI):
            dba(DATABASE_URI)
            print('Database dropped')

    def create_tables(self):
        """Create all tables declared on Base (no-op when already present)."""
        Base.metadata.create_all(self.engine, checkfirst=True)

    def add_subgraph(self, **kwargs):
        """Create a subgraph row unless one already exists.

        :param kwargs: column values; must include graph_id and subgraph_id
        :return: the new or pre-existing Subgraph object
        """
        Session = self.get_session()
        with Session.begin() as session:
            # NOTE(review): find_subgraph opens its own session inside this
            # one; harmless for SQLite reads but worth confirming intent.
            subgraph = self.find_subgraph(graph_id=kwargs['graph_id'], subgraph_id=kwargs['subgraph_id'])
            if subgraph is None:
                # create new subgraph
                subgraph = Subgraph()
                for key, value in kwargs.items():
                    setattr(subgraph, key, value)
                session.add(subgraph)
                self._log("Subgraph created")
            else:
                self._log("Subgraph available")
            return subgraph

    def find_subgraph(self, graph_id, subgraph_id):
        """Find a subgraph by (graph_id, subgraph_id).

        :param graph_id: Graph id
        :param subgraph_id: subgraph id
        :return: Subgraph object, or None when no row matches
        """
        Session = self.get_session()
        with Session.begin() as session:
            subgraph = (
                session.query(Subgraph).filter(
                    Subgraph.graph_id == graph_id, Subgraph.subgraph_id == subgraph_id,
                ).first()
            )
            return subgraph

    def update_subgraph(self, subgraph, **kwargs):
        """Update columns on an existing subgraph.

        :param subgraph: subgraph object
        :param kwargs: column values to set
        :return: the updated Subgraph object
        """
        Session = self.get_session()
        with Session.begin() as session:
            for key, value in kwargs.items():
                setattr(subgraph, key, value)
            session.add(subgraph)
            return subgraph

    def delete_subgraph(self, obj):
        """Delete one subgraph row.

        :param obj: subgraph object
        :return: None
        """
        Session = self.get_session()
        with Session.begin() as session:
            session.delete(obj)

    def get_subgraphs(self):
        """Fetch all subgraphs, newest first.

        :return: list of Subgraph objects
        """
        Session = self.get_session()
        with Session.begin() as session:
            return session.query(Subgraph).order_by(Subgraph.created_at.desc()).all()

    def delete_subgraphs(self):
        """Delete every subgraph row.

        :return: None
        """
        Session = self.get_session()
        with Session.begin() as session:
            session.query(Subgraph).delete()


class Subgraph(Base):
    """ORM row tracking one subgraph's local processing state.

    created_at defaults to the insertion time (UTC, naive datetime).
    """
    __tablename__ = 'subgraphs'
    id = Column(Integer, primary_key=True)
    graph_id = Column(Integer, nullable=False)
    subgraph_id = Column(Integer, nullable=False)
    status = Column(String(50), nullable=True, default=None)
    progress = Column(Float, nullable=True, default=None)
    tokens = Column(Float, nullable=True, default=None)
    created_at = Column(DateTime, default=datetime.datetime.utcnow)

    def as_dict(self):
        """Return the row as a plain dict with created_at formatted as
        'YYYY-MM-DD HH:MM:SS' (consumed by the GUI layer)."""
        values = {c.name: getattr(self, c.name) for c in self.__table__.columns}
        values['created_at'] = values['created_at'].strftime("%Y-%m-%d %H:%M:%S")
        return values
-[{"op_id": 2, "values": [{"value": [[0.3631658436056542, 0.3771815454382733, 0.8594016439073865, 0.05726714637859631, 0.4275623426080757], [0.8375701874521871, 0.4592154167090502, 0.682271933358012, 0.5646052761737979, 0.26399901595145336], [0.46822458979223125, 0.18634697752694784, 0.5732316786217657, 0.6056714628174507, 0.5282834154533805], [0.7201091813882491, 0.7241274529709905, 0.4056821924851388, 0.07909783412481852, 0.9617915703759579], [0.30318477338898864, 0.1818823691577709, 0.6103249343557879, 0.39524829285981566, 0.5800398402795025]]}], "op_type": "unary", "operator": "square", "params": {}}, {"op_id": 4, "values": [{"value": [[0.3631658436056542, 0.3771815454382733, 0.8594016439073865, 0.05726714637859631, 0.4275623426080757], [0.8375701874521871, 0.4592154167090502, 0.682271933358012, 0.5646052761737979, 0.26399901595145336], [0.46822458979223125, 0.18634697752694784, 0.5732316786217657, 0.6056714628174507, 0.5282834154533805], [0.7201091813882491, 0.7241274529709905, 0.4056821924851388, 0.07909783412481852, 0.9617915703759579], [0.30318477338898864, 0.1818823691577709, 0.6103249343557879, 0.39524829285981566, 0.5800398402795025]]}], "op_type": "unary", "operator": "cube_root", "params": {}}, {"op_id": 6, "values": [{"value": [[0.3631658436056542, 0.3771815454382733, 0.8594016439073865, 0.05726714637859631, 0.4275623426080757], [0.8375701874521871, 0.4592154167090502, 0.682271933358012, 0.5646052761737979, 0.26399901595145336], [0.46822458979223125, 0.18634697752694784, 0.5732316786217657, 0.6056714628174507, 0.5282834154533805], [0.7201091813882491, 0.7241274529709905, 0.4056821924851388, 0.07909783412481852, 0.9617915703759579], [0.30318477338898864, 0.1818823691577709, 0.6103249343557879, 0.39524829285981566, 0.5800398402795025]]}], "op_type": "unary", "operator": "absolute", "params": {}}, {"op_id": 8, "values": [{"value": [[0.3631658436056542, 0.3771815454382733, 0.8594016439073865, 0.05726714637859631, 0.4275623426080757], [0.8375701874521871, 
0.4592154167090502, 0.682271933358012, 0.5646052761737979, 0.26399901595145336], [0.46822458979223125, 0.18634697752694784, 0.5732316786217657, 0.6056714628174507, 0.5282834154533805], [0.7201091813882491, 0.7241274529709905, 0.4056821924851388, 0.07909783412481852, 0.9617915703759579], [0.30318477338898864, 0.1818823691577709, 0.6103249343557879, 0.39524829285981566, 0.5800398402795025]]}], "op_type": "unary", "operator": "matrix_sum", "params": {}}, {"op_id": 10, "values": [{"value": [[0.3631658436056542, 0.3771815454382733, 0.8594016439073865, 0.05726714637859631, 0.4275623426080757], [0.8375701874521871, 0.4592154167090502, 0.682271933358012, 0.5646052761737979, 0.26399901595145336], [0.46822458979223125, 0.18634697752694784, 0.5732316786217657, 0.6056714628174507, 0.5282834154533805], [0.7201091813882491, 0.7241274529709905, 0.4056821924851388, 0.07909783412481852, 0.9617915703759579], [0.30318477338898864, 0.1818823691577709, 0.6103249343557879, 0.39524829285981566, 0.5800398402795025]]}], "op_type": "unary", "operator": "min", "params": {}}, {"op_id": 12, "values": [{"value": [[0.3631658436056542, 0.3771815454382733, 0.8594016439073865, 0.05726714637859631, 0.4275623426080757], [0.8375701874521871, 0.4592154167090502, 0.682271933358012, 0.5646052761737979, 0.26399901595145336], [0.46822458979223125, 0.18634697752694784, 0.5732316786217657, 0.6056714628174507, 0.5282834154533805], [0.7201091813882491, 0.7241274529709905, 0.4056821924851388, 0.07909783412481852, 0.9617915703759579], [0.30318477338898864, 0.1818823691577709, 0.6103249343557879, 0.39524829285981566, 0.5800398402795025]]}], "op_type": "unary", "operator": "max", "params": {}}, {"op_id": 14, "values": [{"value": [[0.3631658436056542, 0.3771815454382733, 0.8594016439073865, 0.05726714637859631, 0.4275623426080757], [0.8375701874521871, 0.4592154167090502, 0.682271933358012, 0.5646052761737979, 0.26399901595145336], [0.46822458979223125, 0.18634697752694784, 0.5732316786217657, 0.6056714628174507, 
0.5282834154533805], [0.7201091813882491, 0.7241274529709905, 0.4056821924851388, 0.07909783412481852, 0.9617915703759579], [0.30318477338898864, 0.1818823691577709, 0.6103249343557879, 0.39524829285981566, 0.5800398402795025]]}], "op_type": "unary", "operator": "argmax", "params": {}}, {"op_id": 16, "values": [{"value": [[0.3631658436056542, 0.3771815454382733, 0.8594016439073865, 0.05726714637859631, 0.4275623426080757], [0.8375701874521871, 0.4592154167090502, 0.682271933358012, 0.5646052761737979, 0.26399901595145336], [0.46822458979223125, 0.18634697752694784, 0.5732316786217657, 0.6056714628174507, 0.5282834154533805], [0.7201091813882491, 0.7241274529709905, 0.4056821924851388, 0.07909783412481852, 0.9617915703759579], [0.30318477338898864, 0.1818823691577709, 0.6103249343557879, 0.39524829285981566, 0.5800398402795025]]}], "op_type": "unary", "operator": "argmin", "params": {}}, {"op_id": 18, "values": [{"value": [[0.3631658436056542, 0.3771815454382733, 0.8594016439073865, 0.05726714637859631, 0.4275623426080757], [0.8375701874521871, 0.4592154167090502, 0.682271933358012, 0.5646052761737979, 0.26399901595145336], [0.46822458979223125, 0.18634697752694784, 0.5732316786217657, 0.6056714628174507, 0.5282834154533805], [0.7201091813882491, 0.7241274529709905, 0.4056821924851388, 0.07909783412481852, 0.9617915703759579], [0.30318477338898864, 0.1818823691577709, 0.6103249343557879, 0.39524829285981566, 0.5800398402795025]]}], "op_type": "unary", "operator": "transpose", "params": {}}, {"op_id": 20, "values": [{"value": [[0.3631658436056542, 0.3771815454382733, 0.8594016439073865, 0.05726714637859631, 0.4275623426080757], [0.8375701874521871, 0.4592154167090502, 0.682271933358012, 0.5646052761737979, 0.26399901595145336], [0.46822458979223125, 0.18634697752694784, 0.5732316786217657, 0.6056714628174507, 0.5282834154533805], [0.7201091813882491, 0.7241274529709905, 0.4056821924851388, 0.07909783412481852, 0.9617915703759579], [0.30318477338898864, 
0.1818823691577709, 0.6103249343557879, 0.39524829285981566, 0.5800398402795025]]}], "op_type": "unary", "operator": "exponential", "params": {}}, {"op_id": 22, "values": [{"value": [[0.3631658436056542, 0.3771815454382733, 0.8594016439073865, 0.05726714637859631, 0.4275623426080757], [0.8375701874521871, 0.4592154167090502, 0.682271933358012, 0.5646052761737979, 0.26399901595145336], [0.46822458979223125, 0.18634697752694784, 0.5732316786217657, 0.6056714628174507, 0.5282834154533805], [0.7201091813882491, 0.7241274529709905, 0.4056821924851388, 0.07909783412481852, 0.9617915703759579], [0.30318477338898864, 0.1818823691577709, 0.6103249343557879, 0.39524829285981566, 0.5800398402795025]]}], "op_type": "unary", "operator": "sort", "params": {}}, {"op_id": 25, "values": [{"value": [[0.2354680068490863, 0.32564373263284785, 0.5046043720985676, 0.22498776205647286, 0.5087178637009997], [0.08929225852698419, 0.10393547471624143, 0.3034992265525378, 0.6318728499269698, 0.09378128963350707], [0.9715402429374916, 0.4821253826417833, 0.34844373662083095, 0.2528055071171341, 0.2573120529971662], [0.7824634922839924, 0.9311343807629027, 0.968404179646788, 0.4992724266843359, 0.8375160726585381], [0.021473677988800843, 0.44050088265597964, 0.283720364881099, 0.37163149181647237, 0.6998221781668889]]}, {"value": [[0.2354680068490863, 0.32564373263284785, 0.5046043720985676, 0.22498776205647286, 0.5087178637009997], [0.08929225852698419, 0.10393547471624143, 0.3034992265525378, 0.6318728499269698, 0.09378128963350707], [0.9715402429374916, 0.4821253826417833, 0.34844373662083095, 0.2528055071171341, 0.2573120529971662], [0.7824634922839924, 0.9311343807629027, 0.968404179646788, 0.4992724266843359, 0.8375160726585381], [0.021473677988800843, 0.44050088265597964, 0.283720364881099, 0.37163149181647237, 0.6998221781668889]]}], "op_type": "binary", "operator": "addition", "params": {}}, {"op_id": 28, "values": [{"value": [[0.2354680068490863, 0.32564373263284785, 
0.5046043720985676, 0.22498776205647286, 0.5087178637009997], [0.08929225852698419, 0.10393547471624143, 0.3034992265525378, 0.6318728499269698, 0.09378128963350707], [0.9715402429374916, 0.4821253826417833, 0.34844373662083095, 0.2528055071171341, 0.2573120529971662], [0.7824634922839924, 0.9311343807629027, 0.968404179646788, 0.4992724266843359, 0.8375160726585381], [0.021473677988800843, 0.44050088265597964, 0.283720364881099, 0.37163149181647237, 0.6998221781668889]]}, {"value": [[0.2354680068490863, 0.32564373263284785, 0.5046043720985676, 0.22498776205647286, 0.5087178637009997], [0.08929225852698419, 0.10393547471624143, 0.3034992265525378, 0.6318728499269698, 0.09378128963350707], [0.9715402429374916, 0.4821253826417833, 0.34844373662083095, 0.2528055071171341, 0.2573120529971662], [0.7824634922839924, 0.9311343807629027, 0.968404179646788, 0.4992724266843359, 0.8375160726585381], [0.021473677988800843, 0.44050088265597964, 0.283720364881099, 0.37163149181647237, 0.6998221781668889]]}], "op_type": "binary", "operator": "subtraction", "params": {}}, {"op_id": 31, "values": [{"value": [[0.2354680068490863, 0.32564373263284785, 0.5046043720985676, 0.22498776205647286, 0.5087178637009997], [0.08929225852698419, 0.10393547471624143, 0.3034992265525378, 0.6318728499269698, 0.09378128963350707], [0.9715402429374916, 0.4821253826417833, 0.34844373662083095, 0.2528055071171341, 0.2573120529971662], [0.7824634922839924, 0.9311343807629027, 0.968404179646788, 0.4992724266843359, 0.8375160726585381], [0.021473677988800843, 0.44050088265597964, 0.283720364881099, 0.37163149181647237, 0.6998221781668889]]}, {"value": [[0.2354680068490863, 0.32564373263284785, 0.5046043720985676, 0.22498776205647286, 0.5087178637009997], [0.08929225852698419, 0.10393547471624143, 0.3034992265525378, 0.6318728499269698, 0.09378128963350707], [0.9715402429374916, 0.4821253826417833, 0.34844373662083095, 0.2528055071171341, 0.2573120529971662], [0.7824634922839924, 0.9311343807629027, 
0.968404179646788, 0.4992724266843359, 0.8375160726585381], [0.021473677988800843, 0.44050088265597964, 0.283720364881099, 0.37163149181647237, 0.6998221781668889]]}], "op_type": "binary", "operator": "multiplication", "params": {}}, {"op_id": 34, "values": [{"value": [[0.2354680068490863, 0.32564373263284785, 0.5046043720985676, 0.22498776205647286, 0.5087178637009997], [0.08929225852698419, 0.10393547471624143, 0.3034992265525378, 0.6318728499269698, 0.09378128963350707], [0.9715402429374916, 0.4821253826417833, 0.34844373662083095, 0.2528055071171341, 0.2573120529971662], [0.7824634922839924, 0.9311343807629027, 0.968404179646788, 0.4992724266843359, 0.8375160726585381], [0.021473677988800843, 0.44050088265597964, 0.283720364881099, 0.37163149181647237, 0.6998221781668889]]}, {"value": [[0.2354680068490863, 0.32564373263284785, 0.5046043720985676, 0.22498776205647286, 0.5087178637009997], [0.08929225852698419, 0.10393547471624143, 0.3034992265525378, 0.6318728499269698, 0.09378128963350707], [0.9715402429374916, 0.4821253826417833, 0.34844373662083095, 0.2528055071171341, 0.2573120529971662], [0.7824634922839924, 0.9311343807629027, 0.968404179646788, 0.4992724266843359, 0.8375160726585381], [0.021473677988800843, 0.44050088265597964, 0.283720364881099, 0.37163149181647237, 0.6998221781668889]]}], "op_type": "binary", "operator": "power", "params": {}}] \ No newline at end of file +[{"op_id": 2, "values": [{"value": [[0.3631658436056542, 0.3771815454382733, 0.8594016439073865, 0.05726714637859631, 0.4275623426080757], [0.8375701874521871, 0.4592154167090502, 0.682271933358012, 0.5646052761737979, 0.26399901595145336], [0.46822458979223125, 0.18634697752694784, 0.5732316786217657, 0.6056714628174507, 0.5282834154533805], [0.7201091813882491, 0.7241274529709905, 0.4056821924851388, 0.07909783412481852, 0.9617915703759579], [0.30318477338898864, 0.1818823691577709, 0.6103249343557879, 0.39524829285981566, 0.5800398402795025]]}], "op_type": "unary", "operator": 
"square", "params": {}}, {"op_id": 4, "values": [{"value": [[0.3631658436056542, 0.3771815454382733, 0.8594016439073865, 0.05726714637859631, 0.4275623426080757], [0.8375701874521871, 0.4592154167090502, 0.682271933358012, 0.5646052761737979, 0.26399901595145336], [0.46822458979223125, 0.18634697752694784, 0.5732316786217657, 0.6056714628174507, 0.5282834154533805], [0.7201091813882491, 0.7241274529709905, 0.4056821924851388, 0.07909783412481852, 0.9617915703759579], [0.30318477338898864, 0.1818823691577709, 0.6103249343557879, 0.39524829285981566, 0.5800398402795025]]}], "op_type": "unary", "operator": "cube_root", "params": {}}, {"op_id": 6, "values": [{"value": [[0.3631658436056542, 0.3771815454382733, 0.8594016439073865, 0.05726714637859631, 0.4275623426080757], [0.8375701874521871, 0.4592154167090502, 0.682271933358012, 0.5646052761737979, 0.26399901595145336], [0.46822458979223125, 0.18634697752694784, 0.5732316786217657, 0.6056714628174507, 0.5282834154533805], [0.7201091813882491, 0.7241274529709905, 0.4056821924851388, 0.07909783412481852, 0.9617915703759579], [0.30318477338898864, 0.1818823691577709, 0.6103249343557879, 0.39524829285981566, 0.5800398402795025]]}], "op_type": "unary", "operator": "absolute", "params": {}}, {"op_id": 8, "values": [{"value": [[0.3631658436056542, 0.3771815454382733, 0.8594016439073865, 0.05726714637859631, 0.4275623426080757], [0.8375701874521871, 0.4592154167090502, 0.682271933358012, 0.5646052761737979, 0.26399901595145336], [0.46822458979223125, 0.18634697752694784, 0.5732316786217657, 0.6056714628174507, 0.5282834154533805], [0.7201091813882491, 0.7241274529709905, 0.4056821924851388, 0.07909783412481852, 0.9617915703759579], [0.30318477338898864, 0.1818823691577709, 0.6103249343557879, 0.39524829285981566, 0.5800398402795025]]}], "op_type": "unary", "operator": "matrix_sum", "params": {}}, {"op_id": 10, "values": [{"value": [[0.3631658436056542, 0.3771815454382733, 0.8594016439073865, 0.05726714637859631, 
0.4275623426080757], [0.8375701874521871, 0.4592154167090502, 0.682271933358012, 0.5646052761737979, 0.26399901595145336], [0.46822458979223125, 0.18634697752694784, 0.5732316786217657, 0.6056714628174507, 0.5282834154533805], [0.7201091813882491, 0.7241274529709905, 0.4056821924851388, 0.07909783412481852, 0.9617915703759579], [0.30318477338898864, 0.1818823691577709, 0.6103249343557879, 0.39524829285981566, 0.5800398402795025]]}], "op_type": "unary", "operator": "min", "params": {}}, {"op_id": 12, "values": [{"value": [[0.3631658436056542, 0.3771815454382733, 0.8594016439073865, 0.05726714637859631, 0.4275623426080757], [0.8375701874521871, 0.4592154167090502, 0.682271933358012, 0.5646052761737979, 0.26399901595145336], [0.46822458979223125, 0.18634697752694784, 0.5732316786217657, 0.6056714628174507, 0.5282834154533805], [0.7201091813882491, 0.7241274529709905, 0.4056821924851388, 0.07909783412481852, 0.9617915703759579], [0.30318477338898864, 0.1818823691577709, 0.6103249343557879, 0.39524829285981566, 0.5800398402795025]]}], "op_type": "unary", "operator": "max", "params": {}}, {"op_id": 14, "values": [{"value": [[0.3631658436056542, 0.3771815454382733, 0.8594016439073865, 0.05726714637859631, 0.4275623426080757], [0.8375701874521871, 0.4592154167090502, 0.682271933358012, 0.5646052761737979, 0.26399901595145336], [0.46822458979223125, 0.18634697752694784, 0.5732316786217657, 0.6056714628174507, 0.5282834154533805], [0.7201091813882491, 0.7241274529709905, 0.4056821924851388, 0.07909783412481852, 0.9617915703759579], [0.30318477338898864, 0.1818823691577709, 0.6103249343557879, 0.39524829285981566, 0.5800398402795025]]}], "op_type": "unary", "operator": "argmax", "params": {}}, {"op_id": 16, "values": [{"value": [[0.3631658436056542, 0.3771815454382733, 0.8594016439073865, 0.05726714637859631, 0.4275623426080757], [0.8375701874521871, 0.4592154167090502, 0.682271933358012, 0.5646052761737979, 0.26399901595145336], [0.46822458979223125, 0.18634697752694784, 
0.5732316786217657, 0.6056714628174507, 0.5282834154533805], [0.7201091813882491, 0.7241274529709905, 0.4056821924851388, 0.07909783412481852, 0.9617915703759579], [0.30318477338898864, 0.1818823691577709, 0.6103249343557879, 0.39524829285981566, 0.5800398402795025]]}], "op_type": "unary", "operator": "argmin", "params": {}}, {"op_id": 18, "values": [{"value": [[0.3631658436056542, 0.3771815454382733, 0.8594016439073865, 0.05726714637859631, 0.4275623426080757], [0.8375701874521871, 0.4592154167090502, 0.682271933358012, 0.5646052761737979, 0.26399901595145336], [0.46822458979223125, 0.18634697752694784, 0.5732316786217657, 0.6056714628174507, 0.5282834154533805], [0.7201091813882491, 0.7241274529709905, 0.4056821924851388, 0.07909783412481852, 0.9617915703759579], [0.30318477338898864, 0.1818823691577709, 0.6103249343557879, 0.39524829285981566, 0.5800398402795025]]}], "op_type": "unary", "operator": "transpose", "params": {}}, {"op_id": 20, "values": [{"value": [[0.3631658436056542, 0.3771815454382733, 0.8594016439073865, 0.05726714637859631, 0.4275623426080757], [0.8375701874521871, 0.4592154167090502, 0.682271933358012, 0.5646052761737979, 0.26399901595145336], [0.46822458979223125, 0.18634697752694784, 0.5732316786217657, 0.6056714628174507, 0.5282834154533805], [0.7201091813882491, 0.7241274529709905, 0.4056821924851388, 0.07909783412481852, 0.9617915703759579], [0.30318477338898864, 0.1818823691577709, 0.6103249343557879, 0.39524829285981566, 0.5800398402795025]]}], "op_type": "unary", "operator": "exponential", "params": {}}, {"op_id": 22, "values": [{"value": [[0.3631658436056542, 0.3771815454382733, 0.8594016439073865, 0.05726714637859631, 0.4275623426080757], [0.8375701874521871, 0.4592154167090502, 0.682271933358012, 0.5646052761737979, 0.26399901595145336], [0.46822458979223125, 0.18634697752694784, 0.5732316786217657, 0.6056714628174507, 0.5282834154533805], [0.7201091813882491, 0.7241274529709905, 0.4056821924851388, 0.07909783412481852, 
0.9617915703759579], [0.30318477338898864, 0.1818823691577709, 0.6103249343557879, 0.39524829285981566, 0.5800398402795025]]}], "op_type": "unary", "operator": "sort", "params": {}}, {"op_id": 25, "values": [{"value": [[0.2354680068490863, 0.32564373263284785, 0.5046043720985676, 0.22498776205647286, 0.5087178637009997], [0.08929225852698419, 0.10393547471624143, 0.3034992265525378, 0.6318728499269698, 0.09378128963350707], [0.9715402429374916, 0.4821253826417833, 0.34844373662083095, 0.2528055071171341, 0.2573120529971662], [0.7824634922839924, 0.9311343807629027, 0.968404179646788, 0.4992724266843359, 0.8375160726585381], [0.021473677988800843, 0.44050088265597964, 0.283720364881099, 0.37163149181647237, 0.6998221781668889]]}, {"value": [[0.2354680068490863, 0.32564373263284785, 0.5046043720985676, 0.22498776205647286, 0.5087178637009997], [0.08929225852698419, 0.10393547471624143, 0.3034992265525378, 0.6318728499269698, 0.09378128963350707], [0.9715402429374916, 0.4821253826417833, 0.34844373662083095, 0.2528055071171341, 0.2573120529971662], [0.7824634922839924, 0.9311343807629027, 0.968404179646788, 0.4992724266843359, 0.8375160726585381], [0.021473677988800843, 0.44050088265597964, 0.283720364881099, 0.37163149181647237, 0.6998221781668889]]}], "op_type": "binary", "operator": "addition", "params": {}}, {"op_id": 28, "values": [{"value": [[0.2354680068490863, 0.32564373263284785, 0.5046043720985676, 0.22498776205647286, 0.5087178637009997], [0.08929225852698419, 0.10393547471624143, 0.3034992265525378, 0.6318728499269698, 0.09378128963350707], [0.9715402429374916, 0.4821253826417833, 0.34844373662083095, 0.2528055071171341, 0.2573120529971662], [0.7824634922839924, 0.9311343807629027, 0.968404179646788, 0.4992724266843359, 0.8375160726585381], [0.021473677988800843, 0.44050088265597964, 0.283720364881099, 0.37163149181647237, 0.6998221781668889]]}, {"value": [[0.2354680068490863, 0.32564373263284785, 0.5046043720985676, 0.22498776205647286, 
0.5087178637009997], [0.08929225852698419, 0.10393547471624143, 0.3034992265525378, 0.6318728499269698, 0.09378128963350707], [0.9715402429374916, 0.4821253826417833, 0.34844373662083095, 0.2528055071171341, 0.2573120529971662], [0.7824634922839924, 0.9311343807629027, 0.968404179646788, 0.4992724266843359, 0.8375160726585381], [0.021473677988800843, 0.44050088265597964, 0.283720364881099, 0.37163149181647237, 0.6998221781668889]]}], "op_type": "binary", "operator": "subtraction", "params": {}}, {"op_id": 31, "values": [{"value": [[0.2354680068490863, 0.32564373263284785, 0.5046043720985676, 0.22498776205647286, 0.5087178637009997], [0.08929225852698419, 0.10393547471624143, 0.3034992265525378, 0.6318728499269698, 0.09378128963350707], [0.9715402429374916, 0.4821253826417833, 0.34844373662083095, 0.2528055071171341, 0.2573120529971662], [0.7824634922839924, 0.9311343807629027, 0.968404179646788, 0.4992724266843359, 0.8375160726585381], [0.021473677988800843, 0.44050088265597964, 0.283720364881099, 0.37163149181647237, 0.6998221781668889]]}, {"value": [[0.2354680068490863, 0.32564373263284785, 0.5046043720985676, 0.22498776205647286, 0.5087178637009997], [0.08929225852698419, 0.10393547471624143, 0.3034992265525378, 0.6318728499269698, 0.09378128963350707], [0.9715402429374916, 0.4821253826417833, 0.34844373662083095, 0.2528055071171341, 0.2573120529971662], [0.7824634922839924, 0.9311343807629027, 0.968404179646788, 0.4992724266843359, 0.8375160726585381], [0.021473677988800843, 0.44050088265597964, 0.283720364881099, 0.37163149181647237, 0.6998221781668889]]}], "op_type": "binary", "operator": "multiplication", "params": {}}, {"op_id": 34, "values": [{"value": [[0.2354680068490863, 0.32564373263284785, 0.5046043720985676, 0.22498776205647286, 0.5087178637009997], [0.08929225852698419, 0.10393547471624143, 0.3034992265525378, 0.6318728499269698, 0.09378128963350707], [0.9715402429374916, 0.4821253826417833, 0.34844373662083095, 0.2528055071171341, 
0.2573120529971662], [0.7824634922839924, 0.9311343807629027, 0.968404179646788, 0.4992724266843359, 0.8375160726585381], [0.021473677988800843, 0.44050088265597964, 0.283720364881099, 0.37163149181647237, 0.6998221781668889]]}, {"value": [[0.2354680068490863, 0.32564373263284785, 0.5046043720985676, 0.22498776205647286, 0.5087178637009997], [0.08929225852698419, 0.10393547471624143, 0.3034992265525378, 0.6318728499269698, 0.09378128963350707], [0.9715402429374916, 0.4821253826417833, 0.34844373662083095, 0.2528055071171341, 0.2573120529971662], [0.7824634922839924, 0.9311343807629027, 0.968404179646788, 0.4992724266843359, 0.8375160726585381], [0.021473677988800843, 0.44050088265597964, 0.283720364881099, 0.37163149181647237, 0.6998221781668889]]}], "op_type": "binary", "operator": "power", "params": {}}] diff --git a/ravpy/distributed/benchmarking.py b/ravpy/distributed/benchmarking.py index c65a220..d492263 100644 --- a/ravpy/distributed/benchmarking.py +++ b/ravpy/distributed/benchmarking.py @@ -1,68 +1,19 @@ -import os -import ast import json +import os import time -import speedtest -from ..strings import functions - from .compute import compute_locally_bm from .evaluate import waitInterval -from ..config import RAVENVERSE_URL, BENCHMARK_FILE_NAME, RAVENVERSE_FTP_URL -from ..ftp import check_credentials -from ..ftp import get_client as get_ftp_client +from ..config import RAVENVERSE_URL, BENCHMARK_FILE_NAME from ..globals import g -from ..utils import download_file, get_key, setTimeout, get_ftp_credentials - - -def initialize(): - credentials = get_ftp_credentials() - - if credentials is None: - g.logger.debug("Unable to fetch credentials") - return - - creds = ast.literal_eval(credentials['ftp_credentials']) - time.sleep(2) - - try: - if RAVENVERSE_FTP_URL != 'localhost' and RAVENVERSE_FTP_URL != '0.0.0.0': - wifi = speedtest.Speedtest() - upload_speed = int(wifi.upload()) - download_speed = int(wifi.download()) - upload_speed = upload_speed / 8 - 
download_speed = download_speed / 8 - if upload_speed <= 3000000: - upload_multiplier = 1 - elif upload_speed < 80000000: - upload_multiplier = int((upload_speed / 80000000) * 1000) - else: - upload_multiplier = 1000 - - if download_speed <= 3000000: - download_multiplier = 1 - elif download_speed < 80000000: - download_multiplier = int((download_speed / 80000000) * 1000) - else: - download_multiplier = 1000 - - g.ftp_upload_blocksize = 8192 * upload_multiplier - g.ftp_download_blocksize = 8192 * download_multiplier - - else: - g.ftp_upload_blocksize = 8192 * 10000 - g.ftp_download_blocksize = 8192 * 10000 - - except Exception as e: - g.ftp_upload_blocksize = 8192 * 1000 - g.ftp_download_blocksize = 8192 * 1000 - - g.ftp_client = get_ftp_client(creds['username'], creds['password']) +from ..strings import functions +from ..utils import download_file, get_key, setTimeout def benchmark(): - g.logger.debug("Benchmarking in progress...") - initialize() + g.logger.debug("") + g.logger.debug("Starting benchmarking...") + client = g.client initialTimeout = g.initialTimeout @@ -75,7 +26,6 @@ def benchmark(): benchmark_results = {} for benchmark_op in benchmark_ops: - # print("BM OP inside enumerate: ",benchmark_op) operator = get_key(benchmark_op['operator'], functions) t1 = time.time() compute_locally_bm(*benchmark_op['values'], op_type=benchmark_op['op_type'], operator=operator) @@ -85,11 +35,11 @@ def benchmark(): for file in os.listdir(): if file.endswith(".zip"): os.remove(file) - + + g.logger.debug("Benchmarking completed successfully!") g.logger.debug("Emitting Benchmark Results...") client.emit("benchmark_callback", data=json.dumps(benchmark_results), namespace="/client") client.sleep(1) g.logger.debug("Benchmarking Complete!") setTimeout(waitInterval, initialTimeout) - return benchmark_results diff --git a/ravpy/distributed/compute.py b/ravpy/distributed/compute.py index bf1dd97..7cd27a5 100644 --- a/ravpy/distributed/compute.py +++ b/ravpy/distributed/compute.py 
@@ -1,41 +1,40 @@ +import ast +import json import os import sys -import numpy as np -import json import time -import ast -from ..globals import g -from ..utils import get_key, dump_data, load_data -from ..ftp import check_credentials as check_credentials from ..config import FTP_DOWNLOAD_FILES_FOLDER +from ..globals import g from ..strings import functions - +from ..utils import get_key, dump_data, load_data from .op_functions import * + def compute_locally_bm(*args, **kwargs): operator = kwargs.get("operator", None) op_type = kwargs.get("op_type", None) - param_args =kwargs.get("params",None) + param_args = kwargs.get("params", None) # print("Operator", operator,"Op Type:",op_type) if op_type == "unary": value1 = args[0] - params={} - t1=time.time() - bm_result = get_unary_result(value1['value'], params, operator) - t2=time.time() - return t2-t1 + params = {} + t1 = time.time() + bm_result = get_unary_result(value1['value'], params, operator) + t2 = time.time() + return t2 - t1 elif op_type == "binary": value1 = args[0]['value'] value2 = args[1]['value'] params = {} - t1=time.time() + t1 = time.time() bm_result = get_binary_result(value1, value2, params, operator) - t2=time.time() - return t2-t1 + t2 = time.time() + return t2 - t1 + -# async +# async def compute_locally(payload, subgraph_id, graph_id): try: # print("Computing ",payload["operator"]) @@ -43,14 +42,14 @@ def compute_locally(payload, subgraph_id, graph_id): values = [] - for i in range(len(payload["values"])): if "value" in payload["values"][i].keys(): if "path" not in payload["values"][i].keys(): values.append(payload["values"][i]["value"]) else: - download_path = os.path.join(FTP_DOWNLOAD_FILES_FOLDER,os.path.basename(payload["values"][i]["path"])) + download_path = os.path.join(FTP_DOWNLOAD_FILES_FOLDER, + os.path.basename(payload["values"][i]["path"])) value = load_data(download_path).tolist() values.append(value) @@ -63,26 +62,32 @@ def compute_locally(payload, subgraph_id, graph_id): op_type = 
payload["op_type"] operator = get_key(payload["operator"], functions) - params=payload['params'] + params = payload['params'] for i in params.keys(): if type(params[i]) == str: temp = ast.literal_eval(params[i]) if type(temp) == dict: params[i] = temp - elif type(params[i]) == dict and 'op_id' in params[i].keys(): - op_id = params[i]["op_id"] - param_value = g.outputs[op_id] + elif type(params[i]) == dict: + if 'op_id' in params[i].keys(): + op_id = params[i]["op_id"] + param_value = g.outputs[op_id] + elif 'value' in params[i].keys(): + download_path = os.path.join(FTP_DOWNLOAD_FILES_FOLDER, + os.path.basename(params[i]["path"])) + param_value = load_data(download_path).tolist() + params[i] = param_value - + if op_type == "unary": result = get_unary_result(payload["values"][0], params, operator) elif op_type == "binary": result = get_binary_result(payload["values"][0], payload["values"][1], params, operator) - if 'sklearn' in str(type(result)): - file_path = upload_result(payload, result, subgraph_id=subgraph_id, graph_id=graph_id) #upload_result(payload, result) + file_path = upload_result(payload, result, subgraph_id=subgraph_id, + graph_id=graph_id) # upload_result(payload, result) return json.dumps({ 'op_type': payload["op_type"], 'file_name': os.path.basename(file_path), @@ -92,7 +97,8 @@ def compute_locally(payload, subgraph_id, graph_id): }) if 'dict' in str(type(result)): - file_path = upload_result(payload, result, subgraph_id=subgraph_id, graph_id=graph_id) #upload_result(payload, result) + file_path = upload_result(payload, result, subgraph_id=subgraph_id, + graph_id=graph_id) # upload_result(payload, result) g.outputs[payload["op_id"]] = result return json.dumps({ @@ -103,13 +109,12 @@ def compute_locally(payload, subgraph_id, graph_id): "status": "success" }) - if not isinstance(result, np.ndarray): result = np.array(result) result_byte_size = result.size * result.itemsize - if result_byte_size < (30 * 1000000)//10000: + if result_byte_size < (30 * 
1000000) // 10000: try: result = result.tolist() except: @@ -118,16 +123,17 @@ def compute_locally(payload, subgraph_id, graph_id): g.outputs[payload["op_id"]] = result return json.dumps({ - 'op_type': payload["op_type"], - 'result': result, - 'operator': payload["operator"], - "op_id": payload["op_id"], - "status": "success" + 'op_type': payload["op_type"], + 'result': result, + 'operator': payload["operator"], + "op_id": payload["op_id"], + "status": "success" }) else: - file_path = upload_result(payload, result, subgraph_id=subgraph_id, graph_id=graph_id) #upload_result(payload, result) + file_path = upload_result(payload, result, subgraph_id=subgraph_id, + graph_id=graph_id) # upload_result(payload, result) g.outputs[payload["op_id"]] = result.tolist() return json.dumps({ @@ -239,7 +245,7 @@ def get_unary_result(value1, params, operator): result = cnn_index_2(value1, params=params) elif operator == 'size': result = size(value1, params=params) - + # Machine Learning Algorithms elif operator == 'kmeans': result = kmeans(value1, params=params) @@ -276,6 +282,7 @@ def get_unary_result(value1, params, operator): return result + def get_binary_result(value1, value2, params, operator): if operator == "add": result = np_add(value1, value2, params=params) @@ -317,7 +324,7 @@ def get_binary_result(value1, value2, params, operator): result = np_random_uniform(value1, value2, params=params) elif operator == "arange": result = np_arange(value1, value2, params=params) - + elif operator == 'gather': result = gather(value1, value2, params=params) @@ -329,7 +336,7 @@ def get_binary_result(value1, value2, params, operator): result = one_hot_encoding(value1, value2, params=params) elif operator == 'find_indices': result = find_indices(value1, value2, params=params) - elif operator == 'concatenate': + elif operator == 'concat': result = concatenate(value1, value2, params=params) elif operator == 'join_to_list': result = join_to_list(value1, value2, params=params) @@ -339,9 +346,9 
@@ def get_binary_result(value1, value2, params, operator): result = cnn_add_at(value1, value2, params=params) elif operator == 'set_value': result = set_value(value1, value2, params=params) - - - + + + # Machine Learning Algorithms elif operator == "linear_regression": result = linear_regression(value1, value2, params=params) @@ -365,7 +372,7 @@ def get_binary_result(value1, value2, params, operator): result = random_forest_classifier(value1, value2, params=params) elif operator == "random_forest_regressor": result = random_forest_regressor(value1, value2, params=params) - + # Losses elif operator == 'square_loss': @@ -381,36 +388,37 @@ def get_binary_result(value1, value2, params, operator): return result + def upload_result(payload, result, subgraph_id=None, graph_id=None): try: result = result.tolist() except: - result=result + result = result - file_path = dump_data(payload['op_id'],result) + file_path = dump_data(payload['op_id'], result) from zipfile import ZipFile with ZipFile('local_{}_{}.zip'.format(subgraph_id, graph_id), 'a') as zipObj2: - zipObj2.write(file_path, os.path.basename(file_path)) - + zipObj2.write(file_path, os.path.basename(file_path)) + os.remove(file_path) return file_path - + + def emit_error(payload, error, subgraph_id, graph_id): - print("Emit Error") g.error = True - error=str(error) + error = str(error) client = g.client - print(error,payload) + print("Emmit error:{}".format(str(error))) client.emit("op_completed", json.dumps({ - 'op_type': payload["op_type"], - 'error': error, - 'operator': payload["operator"], - "op_id": payload["op_id"], - "status": "failure", - "subgraph_id": subgraph_id, - "graph_id": graph_id + 'op_type': payload["op_type"], + 'error': error, + 'operator': payload["operator"], + "op_id": payload["op_id"], + "status": "failure", + "subgraph_id": subgraph_id, + "graph_id": graph_id }), namespace="/client") try: diff --git a/ravpy/distributed/evaluate.py b/ravpy/distributed/evaluate.py index 64cd1d1..8a419d0 
100644 --- a/ravpy/distributed/evaluate.py +++ b/ravpy/distributed/evaluate.py @@ -1,31 +1,37 @@ import json import os import sys +import socket from terminaltables import AsciiTable from zipfile import ZipFile from .compute import compute_locally, emit_error - -from ..utils import setTimeout, stopTimer -from ..globals import g from ..config import FTP_DOWNLOAD_FILES_FOLDER - +from ..globals import g +from ..utils import setTimeout, stopTimer timeoutId = g.timeoutId opTimeout = g.opTimeout initialTimeout = g.initialTimeout client = g.client + @g.client.on('subgraph', namespace="/client") def compute_subgraph(d): global client, timeoutId - + g.logger.debug("") + g.logger.debug("Subgraph received!") + g.logger.debug("Graph id: {}, subgraph id: {}".format(d['graph_id'], d["subgraph_id"])) os.system('clear') # print("Received Subgraph : ",d["subgraph_id"]," of Graph : ",d["graph_id"]) print(AsciiTable([['Provider Dashboard']]).table) g.dashboard_data.append([d["subgraph_id"], d["graph_id"], "Computing"]) print(AsciiTable(g.dashboard_data).table) - + + # create a subgraph row in database + subgraph_obj = g.ravdb.add_subgraph(graph_id=d["graph_id"], subgraph_id=d["subgraph_id"], status="Computing") + g.ravdb.update_subgraph(subgraph=subgraph_obj, status="Computing") + g.has_subgraph = True subgraph_id = d["subgraph_id"] graph_id = d["graph_id"] @@ -37,9 +43,10 @@ def compute_subgraph(d): if subgraph_zip_file_flag == "True": server_file_path = 'zip_{}_{}.zip'.format(subgraph_id, graph_id) - download_path = os.path.join(FTP_DOWNLOAD_FILES_FOLDER,server_file_path) + download_path = os.path.join(FTP_DOWNLOAD_FILES_FOLDER, server_file_path) try: + g.ftp_client.ftp.voidcmd('NOOP') g.ftp_client.download(download_path, os.path.basename(server_file_path)) except Exception as error: @@ -47,9 +54,16 @@ def compute_subgraph(d): g.dashboard_data[-1][2] = "Failed" print(AsciiTable([['Provider Dashboard']]).table) print(AsciiTable(g.dashboard_data).table) + + # update subgraph + 
g.ravdb.update_subgraph(subgraph=subgraph_obj, + status="Failed") + + g.logger.debug("Error: {}".format(str(error))) + g.has_subgraph = False stopTimer(timeoutId) - timeoutId = setTimeout(waitInterval,opTimeout) + timeoutId = setTimeout(waitInterval, opTimeout) delete_dir = FTP_DOWNLOAD_FILES_FOLDER for f in os.listdir(delete_dir): @@ -58,10 +72,10 @@ def compute_subgraph(d): g.delete_files_list = [] g.outputs = {} - print('Error: ', error) emit_error(data[0], error, subgraph_id, graph_id) if 'broken pipe' in str(error).lower() or '421' in str(error).lower(): - print('\n\nYou have encountered an IO based Broken Pipe Error. \nRestart terminal and try connecting again') + g.logger.debug( + '\n\nYou have encountered an IO based Broken Pipe Error. \nRestart terminal and try connecting again') sys.exit() if os.path.exists(download_path): @@ -72,28 +86,33 @@ def compute_subgraph(d): os.remove(download_path) g.ftp_client.delete_file(server_file_path) - for index in data: + for index, op_obj in enumerate(data): - #Perform - operation_type = index["op_type"] - operator = index["operator"] + # Perform + operation_type = op_obj["op_type"] + operator = op_obj["operator"] if operation_type is not None and operator is not None: - result_payload = compute_locally(index, subgraph_id, graph_id) - + result_payload = compute_locally(op_obj, subgraph_id, graph_id) + if not g.error: results.append(result_payload) + + # update subgraph + g.ravdb.update_subgraph(subgraph=subgraph_obj, + progress=((index+1)/len(data))*100) else: break if not g.error: - + # check if file exists zip_file_name = 'local_{}_{}.zip'.format(subgraph_id, graph_id) if os.path.exists(zip_file_name): g.ftp_client.upload(zip_file_name, zip_file_name) os.remove(zip_file_name) - - emit_result_data = {"subgraph_id": d["subgraph_id"],"graph_id":d["graph_id"],"token": g.ravenverse_token, "results":results} + + emit_result_data = {"subgraph_id": d["subgraph_id"], "graph_id": d["graph_id"], "token": g.ravenverse_token, + 
"results": results} client.emit("subgraph_completed", json.dumps(emit_result_data), namespace="/client") # print('Emitted subgraph_completed') @@ -101,12 +120,17 @@ def compute_subgraph(d): g.dashboard_data[-1][2] = "Computed" print(AsciiTable([['Provider Dashboard']]).table) print(AsciiTable(g.dashboard_data).table) - + + # update subgraph + g.ravdb.update_subgraph(subgraph=subgraph_obj, + status="Computed") + + g.logger.debug("Subgraph computed successfully") g.has_subgraph = False - + stopTimer(timeoutId) - timeoutId = setTimeout(waitInterval,opTimeout) + timeoutId = setTimeout(waitInterval, opTimeout) delete_dir = FTP_DOWNLOAD_FILES_FOLDER for f in os.listdir(delete_dir): @@ -119,17 +143,59 @@ def compute_subgraph(d): @g.client.on('ping', namespace="/client") def ping(d): global client + g.ping_timeout_counter = 0 client.emit('pong', d, namespace='/client') +@g.client.on('redundant_subgraph', namespace="/client") +def redundant_subgraph(d): + subgraph_id = d['subgraph_id'] + graph_id = d['graph_id'] + for i in range(len(g.dashboard_data)): + if g.dashboard_data[i][0] == subgraph_id and g.dashboard_data[i][1] == graph_id: + g.dashboard_data[i][2] = "redundant_computation" + os.system('clear') + print(AsciiTable([['Provider Dashboard']]).table) + print(AsciiTable(g.dashboard_data).table) + def waitInterval(): global client, timeoutId, opTimeout, initialTimeout client = g.client + try: + sock = socket.create_connection(('8.8.8.8',53)) + sock.close() + except Exception as e: + print('\n ----------- Device offline -----------') + os._exit(1) + if g.client.connected: if not g.has_subgraph: client.emit("get_op", json.dumps({ - "message": "Send me an aop" + "message": "Send me an aop" }), namespace="/client") stopTimer(timeoutId) - timeoutId = setTimeout(waitInterval, opTimeout) \ No newline at end of file + timeoutId = setTimeout(waitInterval, opTimeout) + + if not g.is_downloading: + if not g.is_uploading: + if g.noop_counter % 17 == 0: + try: + 
g.ftp_client.ftp.voidcmd('NOOP') + g.ping_timeout_counter += 1 + except Exception as e: + exit_handler() + os._exit(1) + + if g.ping_timeout_counter > 10: + exit_handler() + os._exit(1) + + g.noop_counter += 1 + +def exit_handler(): + g.logger.debug('Application is Closing!') + if g.client is not None: + g.logger.debug("Disconnecting...") + if g.client.connected: + g.client.emit("disconnect", namespace="/client") \ No newline at end of file diff --git a/ravpy/distributed/participate.py b/ravpy/distributed/participate.py index be441c4..5fb1801 100644 --- a/ravpy/distributed/participate.py +++ b/ravpy/distributed/participate.py @@ -1,14 +1,21 @@ import os +import eel + +from ..globals import g +from ..utils import initialize_ftp_client, disconnect def participate(): - # Connect - download_path = "./ravpy/distributed/downloads/" - temp_files_path = "./ravpy/distributed/temp_files/" - if not os.path.exists(download_path): - os.makedirs(download_path) - if not os.path.exists(temp_files_path): - os.makedirs(temp_files_path) - - from .benchmarking import benchmark - bm_results = benchmark() + # Initialize and create FTP client + res = initialize_ftp_client() + if res is None: + g.logger.error("quitting") + disconnect() + else: + from .benchmarking import benchmark + benchmark() + + g.logger.debug("") + g.logger.debug("Ravpy is waiting for ops and subgraphs...") + g.logger.debug("Warning: Do not close this terminal if you like to " + "keep participating and keep earning Raven tokens\n") diff --git a/ravpy/ftp/__init__.py b/ravpy/ftp/__init__.py index 05868bd..1679232 100644 --- a/ravpy/ftp/__init__.py +++ b/ravpy/ftp/__init__.py @@ -1,8 +1,16 @@ +import os +import socket from ftplib import FTP from ..config import RAVENVERSE_FTP_URL from ..globals import g +try: + import ssl +except ImportError: + _SSLSocket = None +else: + _SSLSocket = ssl.SSLSocket class FTPClient: def __init__(self, host, user, passwd): @@ -12,12 +20,41 @@ def __init__(self, host, user, passwd): 
self.ftp.set_pasv(True) def download(self, filename, path): - with open(filename, 'wb') as f: - self.ftp.retrbinary('RETR ' + path, f.write, blocksize=g.ftp_download_blocksize) + try: + sock = socket.create_connection(('8.8.8.8',53)) + sock.close() + except Exception as e: + print('\n ----------- Device offline -----------') + os._exit(1) + g.is_downloading = True + + try: + with open(filename, 'wb') as f: + self.ftp.retrbinary('RETR ' + path, f.write, blocksize=g.ftp_download_blocksize) + except Exception as e: + exit_handler() + os._exit(1) + + g.is_downloading = False def upload(self, filename, path): - with open(filename, 'rb') as f: - self.ftp.storbinary('STOR ' + path, f, blocksize=g.ftp_upload_blocksize) + try: + sock = socket.create_connection(('8.8.8.8',53)) + sock.close() + except Exception as e: + print('\n ----------- Device offline -----------') + os._exit(1) + g.is_uploading = True + + try: + with open(filename, 'rb') as f: + # self.ftp.storbinary('STOR ' + path, f, blocksize=g.ftp_upload_blocksize) + self.storbinary('STOR ' + path, f, blocksize=g.ftp_upload_blocksize) + except Exception as e: + exit_handler() + os._exit(1) + + g.is_uploading = False def list_server_files(self): self.ftp.retrlines('LIST') @@ -28,9 +65,52 @@ def delete_file(self, path): def close(self): self.ftp.quit() + def storbinary(self, cmd, fp, blocksize=8192, callback=None, rest=None): + """Store a file in binary mode. A new port is created for you. + + Args: + cmd: A STOR command. + fp: A file-like object with a read(num_bytes) method. + blocksize: The maximum data size to read from fp and send over + the connection at once. [default: 8192] + callback: An optional single parameter callable that is called on + each block of data after it is sent. [default: None] + rest: Passed to transfercmd(). [default: None] + + Returns: + The response code. 
+ """ + self.ftp.voidcmd('TYPE I') + with self.ftp.transfercmd(cmd, rest) as conn: + while 1: + buf = fp.read(blocksize) + if not buf: + break + conn.sendall(buf) + if callback: + callback(buf) + # shutdown ssl layer + if _SSLSocket is not None and isinstance(conn, _SSLSocket): + # conn.unwrap() + pass + return self.ftp.voidresp() + def get_client(username, password): - return FTPClient(host=RAVENVERSE_FTP_URL, user=username, passwd=password) + """ + Create FTP client + :param username: FTP username + :param password: FTP password + :return: FTP client + """ + try: + g.logger.debug("Creating FTP client...") + client = FTPClient(host=RAVENVERSE_FTP_URL, user=username, passwd=password) + g.logger.debug("FTP client created successfully") + return client + except Exception as e: + g.logger.debug("Unable to create FTP client") + os._exit(1) def check_credentials(username, password): @@ -40,3 +120,10 @@ def check_credentials(username, password): except Exception as e: print("Error:{}".format(str(e))) return False + +def exit_handler(): + g.logger.debug('Application is Closing!') + if g.client is not None: + g.logger.debug("Disconnecting...") + if g.client.connected: + g.client.emit("disconnect", namespace="/client") \ No newline at end of file diff --git a/ravpy/globals.py b/ravpy/globals.py index e8d90ca..f59cda4 100644 --- a/ravpy/globals.py +++ b/ravpy/globals.py @@ -1,38 +1,36 @@ -import socketio import os -from .config import RAVENVERSE_URL, TYPE +import socketio + +from .config import RAVENVERSE_URL, TYPE, RAVENVERSE_FTP_URL +from .db import DBManager from .logger import get_logger from .singleton_utils import Singleton def get_client(ravenverse_token): + """ + Connect to Ravebverse and return socket client instance + :param ravenverse_token: authentication token + :return: socket client + """ + g.logger.debug("Connecting to Ravenverse...") + auth_headers = {"token": ravenverse_token} client = socketio.Client(logger=False, request_timeout=100, 
engineio_logger=False) - @client.on('error', namespace='/client') - def check_error(d): - g.logger.debug("\n======= Error: {} =======".format(d)) - client.disconnect() - os._exit(1) - - class MyCustomNamespace(socketio.ClientNamespace): - def on_connect(self): - pass - - def on_disconnect(self): - pass - - client.register_namespace(MyCustomNamespace('/client')) - try: + g.logger.debug("Ravenverse url: {}?type={}".format(RAVENVERSE_URL, TYPE)) + g.logger.debug("Ravenverse FTP host: {}".format(RAVENVERSE_FTP_URL)) client.connect(url="{}?type={}".format(RAVENVERSE_URL, TYPE), - auth=auth_headers, - transports=['websocket'], - namespaces=['/client'], wait_timeout=10) + auth=auth_headers, + transports=['websocket'], + namespaces=['/client'], wait_timeout=100) return client except Exception as e: - print("Exception:{}, Unable to connect to ravsock. Make sure you are using the right hostname and port".format(e)) - exit() + g.logger.error("Error: Unable to connect to Ravenverse. " + "Make sure you are using the right hostname and port. 
\n{}".format(e)) + client.disconnect() + os._exit(1) @Singleton @@ -41,18 +39,25 @@ def __init__(self): self._client = None self._timeoutId = None self._ops = {} - self._opTimeout = 50 - self._initialTimeout = 100 + self._opTimeout = 5000 + self._initialTimeout = 5000 self._outputs = {} self._ftp_client = None self._delete_files_list = [] self._has_subgraph = False + self._is_downloading = False + self._is_uploading = False + self._noop_counter = 0 self._ftp_upload_blocksize = 8192 self._ftp_download_blocksize = 8192 + self._ping_timeout_counter = 0 self._error = False self._ravenverse_token = None self._logger = get_logger() self._dashboard_data = [['Subgraph ID', 'Graph ID', 'Status']] + self._ravdb = DBManager() + self._ravdb.logger = self._logger + self._socket_client = self.get_socket_client() @property def timeoutId(self): @@ -96,12 +101,17 @@ def ops(self, ops): @property def client(self): - if self._client is not None: - return self._client + return self._client + + def get_socket_client(self): + from .socket import SocketClient + self._socket_client = SocketClient(self.logger) + self._client = self._socket_client.client + return self._socket_client - if self._client is None: - self._client = get_client(self._ravenverse_token) - return self._client + def connect_socket_client(self): + self._socket_client.connect(self._ravenverse_token) + self._client = self._socket_client.client @property def ftp_client(self): @@ -127,6 +137,38 @@ def has_subgraph(self): def has_subgraph(self, has_subgraph): self._has_subgraph = has_subgraph + @property + def is_downloading(self): + return self._is_downloading + + @is_downloading.setter + def is_downloading(self, is_downloading): + self._is_downloading = is_downloading + + @property + def is_uploading(self): + return self._is_uploading + + @is_uploading.setter + def is_uploading(self, is_uploading): + self._is_uploading = is_uploading + + @property + def noop_counter(self): + return self._noop_counter + + 
@noop_counter.setter + def noop_counter(self, noop_counter): + self._noop_counter = noop_counter + + @property + def ping_timeout_counter(self): + return self._ping_timeout_counter + + @ping_timeout_counter.setter + def ping_timeout_counter(self, ping_timeout_counter): + self._ping_timeout_counter = ping_timeout_counter + @outputs.setter def outputs(self, outputs): self._outputs = outputs @@ -159,4 +201,9 @@ def dashboard_data(self): def dashboard_data(self, dashboard_data): self._dashboard_data = dashboard_data + @property + def ravdb(self): + return self._ravdb + + g = Globals.Instance() diff --git a/ravpy/initialize.py b/ravpy/initialize.py index c681096..56876fd 100644 --- a/ravpy/initialize.py +++ b/ravpy/initialize.py @@ -1,29 +1,37 @@ import atexit +import os from .globals import g +from .utils import isLatestVersion def exit_handler(): - g.logger.debug('Application is Closing!') + g.logger.debug('Application is closing!') if g.client is not None: g.logger.debug("Disconnecting...") if g.client.connected: g.client.emit("disconnect", namespace="/client") - else: - g.logger.debug("No Client Found.") atexit.register(exit_handler) def initialize(ravenverse_token): + # g.logger.debug("Checking Version of Ravpy...") + # + # if not isLatestVersion('ravpy'): + # g.logger.debug("Please update ravpy to latest version...") + # os._exit(1) + g.logger.debug("Initializing...") g.ravenverse_token = ravenverse_token - '''Add Token Authorization code here.''' + g.connect_socket_client() client = g.client - if client is None: + if not client.connected: g.client.disconnect() - raise Exception("Unable to connect to Ravsock. Make sure you are using the right hostname and port") - - return client + g.logger.error("Unable to connect to ravenverse. 
Make sure you are using the right hostname and port") + return None + else: + g.logger.debug("Initialized successfully\n") + return client diff --git a/ravpy/socket/__init__.py b/ravpy/socket/__init__.py new file mode 100644 index 0000000..9753504 --- /dev/null +++ b/ravpy/socket/__init__.py @@ -0,0 +1 @@ +from .socket_client import SocketClient diff --git a/ravpy/socket/socket_client.py b/ravpy/socket/socket_client.py new file mode 100644 index 0000000..8c2432f --- /dev/null +++ b/ravpy/socket/socket_client.py @@ -0,0 +1,61 @@ +import socketio +from socketio.exceptions import ConnectionError +from ravpy.config import RAVENVERSE_URL, TYPE, RAVENVERSE_FTP_URL + + +class SocketNamespace(socketio.ClientNamespace): + def __init__(self, namespace, logger): + super().__init__(namespace) + self.logger = logger + + def on_connect(self): + self.logger.debug('Connected to Ravenverse successfully!') + + def on_disconnect(self): + self.logger.debug('Disconnected from the server') + + def on_message(self, data): + self.logger.debug('Message received:', data) + + def on_result(self, data): + self.logger.debug(data) + + def on_connect_error(self, e): + self.logger.debug("Error:{}".format(str(e))) + + +class SocketClient(object): + def __init__(self, logger): + self._client = socketio.Client(logger=False, request_timeout=100, engineio_logger=False) + self._client.register_namespace(SocketNamespace('/client', logger)) + self.logger = logger + + def set_logger(self, logger): + self.logger = logger + + def connect(self, token): + self.logger.debug("Connecting to Ravenverse...") + self.logger.debug("Ravenverse url: {}?type={}".format(RAVENVERSE_URL, TYPE)) + self.logger.debug("Ravenverse FTP host: {}".format(RAVENVERSE_FTP_URL)) + auth_headers = {"token": token} + + try: + self._client.connect(url="{}?type={}".format(RAVENVERSE_URL, TYPE), + auth=auth_headers, + transports=['websocket'], + namespaces=['/client'], wait_timeout=100) + except ConnectionError as e: + 
self.logger.error("Error: Unable to connect to Ravenverse. " + "Make sure you are using the right hostname and port. \n{}".format(e)) + self._client.disconnect() + except Exception as e: + self.logger.error("Error: Unable to connect to Ravenverse. " + "Make sure you are using the right hostname and port. \n{}".format(e)) + self._client.disconnect() + + @property + def client(self): + return self._client + + def disconnect(self): + self._client.disconnect() diff --git a/ravpy/utils.py b/ravpy/utils.py index 27a64c5..ff59072 100644 --- a/ravpy/utils.py +++ b/ravpy/utils.py @@ -1,22 +1,54 @@ +import ast import os import pickle as pkl +import speedtest +import time + +from .config import ENCRYPTION, RAVENVERSE_FTP_URL, RAVENAUTH_TOKEN_VERIFY_URL + +if ENCRYPTION: + import tenseal as ts + +from .ftp import get_client as get_ftp_client import shutil import numpy as np import requests from terminaltables import AsciiTable from .config import ENCRYPTION + if ENCRYPTION: import tenseal as ts from .config import BASE_DIR, CONTEXT_FOLDER, RAVENVERSE_URL, FTP_TEMP_FILES_FOLDER from threading import Timer from .globals import g +import json +import urllib.request +from pip._internal.operations.freeze import freeze + +def isLatestVersion(pkgName): + # Get the currently installed version + current_version = '' + for requirement in freeze(local_only=False): + pkg = requirement.split('==') + if pkg[0] == pkgName: + current_version = pkg[1] + # Check pypi for the latest version number + contents = urllib.request.urlopen('https://pypi.org/pypi/'+pkgName+'/json').read() + data = json.loads(contents) + latest_version = data['info']['version'] + # print(‘Current version of ‘+pkgName+’ is ’+current_version) + # print(‘Latest version of ‘+pkgName+’ is ’+latest_version) + return latest_version == current_version def download_file(url, file_name): + g.logger.debug("Downloading benchmark data") headers = {"token": g.ravenverse_token} with requests.get(url, stream=True, headers=headers) as r: 
with open(file_name, 'wb') as f: shutil.copyfileobj(r.raw, f) + g.logger.debug("Benchmark data downloaded") + def get_key(val, dict): for key, value in dict.items(): @@ -24,6 +56,7 @@ def get_key(val, dict): return key return "key doesn't exist" + def analyze_data(data): rank = len(np.array(data).shape) @@ -34,6 +67,7 @@ def analyze_data(data): else: return {"rank": rank, "dtype": np.array(data).dtype.__class__.__name__} + def dump_context(context, cid): filename = "context_{}.txt".format(cid) fpath = os.path.join(BASE_DIR, filename) @@ -42,22 +76,35 @@ def dump_context(context, cid): return filename, fpath + def load_context(file_path): with open(file_path, "rb") as f: return ts.context_from(f.read()) + def fetch_and_load_context(client, context_filename): client.download(os.path.join(CONTEXT_FOLDER, context_filename), context_filename) ckks_context = load_context(os.path.join(CONTEXT_FOLDER, context_filename)) return ckks_context + def get_ftp_credentials(): - # Get + """ + Fetch ftp credentials + :return: json response + """ + g.logger.debug("Fetching ftp credentials...") headers = {"token": g.ravenverse_token} r = requests.get(url="{}/client/ftp_credentials/".format(RAVENVERSE_URL), headers=headers) if r.status_code == 200: + g.logger.debug("Credentials fetched successfully") return r.json() - return None + else: + g.logger.debug("Unable to fetch ftp credentials. 
Try again after some time or " + "contact our team at team@ravenprotocol.com") + g.logger.debug(r.text) + return None + def get_graph(graph_id): # Get graph @@ -68,6 +115,7 @@ def get_graph(graph_id): return r.json() return None + def get_federated_graph(graph_id): # Get graph g.logger.debug("get_federated_graph") @@ -77,10 +125,11 @@ def get_federated_graph(graph_id): return r.json() return None + def list_graphs(approach=None): # Get graphs headers = {"token": g.ravenverse_token} - r = requests.get(url="{}/graph/get/all/?approach={}".format(RAVENVERSE_URL,approach), headers=headers) + r = requests.get(url="{}/graph/get/all/?approach={}".format(RAVENVERSE_URL, approach), headers=headers) if r.status_code != 200: return None @@ -95,13 +144,15 @@ def list_graphs(approach=None): return graphs + def print_graphs(graphs): g.logger.debug("\nGraphs") for graph in graphs: g.logger.debug("\nGraph id:{}\n" - "Name:{}\n" - "Approach:{}\n" - "Rules:{}".format(graph['id'], graph['name'], graph['approach'], graph['rules'])) + "Name:{}\n" + "Approach:{}\n" + "Rules:{}".format(graph['id'], graph['name'], graph['approach'], graph['rules'])) + def get_subgraph_ops(graph_id): # Get subgraph ops @@ -111,10 +162,12 @@ def get_subgraph_ops(graph_id): return r.json()['subgraph_ops'] return None + def get_rank(data): rank = len(np.array(data).shape) return rank + def apply_rules(data_columns, rules, final_column_names): data_silo = [] @@ -136,16 +189,19 @@ def apply_rules(data_columns, rules, final_column_names): data_silo.append(data_column_values) return data_silo + def setTimeout(fn, ms, *args, **kwargs): timeoutId = Timer(ms / 1000., fn, args=args, kwargs=kwargs) timeoutId.start() return timeoutId + def stopTimer(timeoutId): # print("Timer stopped") if timeoutId is not None: timeoutId.cancel() + def dump_data(op_id, value): """ Dump ndarray to file @@ -158,6 +214,7 @@ def dump_data(op_id, value): pkl.dump(value, f) return file_path + def load_data(path): """ Load ndarray from file @@ 
-165,3 +222,81 @@ def load_data(path): with open(path, 'rb') as f: data = pkl.load(f) return np.array(data) + + +def initialize_ftp_client(): + credentials = get_ftp_credentials() + + if credentials is None: + return None + + creds = ast.literal_eval(credentials['ftp_credentials']) + time.sleep(2) + + try: + g.logger.debug("") + g.logger.debug("Testing network speed...") + if RAVENVERSE_FTP_URL != 'localhost' and RAVENVERSE_FTP_URL != '0.0.0.0': + wifi = speedtest.Speedtest() + upload_speed = int(wifi.upload()) + download_speed = int(wifi.download()) + upload_speed = upload_speed / 8 + download_speed = download_speed / 8 + if upload_speed <= 3000000: + upload_multiplier = 1 + elif upload_speed < 80000000: + upload_multiplier = int((upload_speed / 80000000) * 1000) + else: + upload_multiplier = 1000 + + if download_speed <= 3000000: + download_multiplier = 1 + elif download_speed < 80000000: + download_multiplier = int((download_speed / 80000000) * 1000) + else: + download_multiplier = 1000 + + g.ftp_upload_blocksize = 8192 * upload_multiplier + g.ftp_download_blocksize = 8192 * download_multiplier + + else: + g.ftp_upload_blocksize = 8192 * 1000 + g.ftp_download_blocksize = 8192 * 1000 + + except Exception as e: + g.ftp_upload_blocksize = 8192 * 1000 + g.ftp_download_blocksize = 8192 * 1000 + + g.logger.debug("FTP Upload Blocksize:{}".format(g.ftp_upload_blocksize)) + g.logger.debug("FTP Download Blocksize: {}\n".format(g.ftp_download_blocksize)) + + """ + Create ftp client + """ + g.ftp_client = get_ftp_client(creds['username'], creds['password']) + + return g.ftp_client + + +def verify_token(token): + """ + Verify user token + :param token: token + :return: valid or not + """ + r = requests.post(RAVENAUTH_TOKEN_VERIFY_URL, data={"token": token}) + if r.status_code != 200: + g.logger.debug("Error:{}".format(r.text)) + return False + else: + return True + + +def disconnect(): + if g.client.connected: + g.logger.debug("Disconnecting...") + if g.client.connected: + 
g.client.emit("disconnect", namespace="/client") + g.logger.debug("Disconnected") + + return True diff --git a/requirements.txt b/requirements.txt index 39c2dab..4ac95ea 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,4 +9,13 @@ scipy speedtest-cli terminaltables==3.1.10 websocket-client -scikit-learn==1.1.1 \ No newline at end of file +scikit-learn +eel +psutil +hurry.filesize +sqlalchemy +sqlalchemy-utils +pillow +tinyaes +pyinstaller[encryption] +dmgbuild diff --git a/run_distributed_client.py b/run_distributed_client.py index 640cb13..19997c3 100644 --- a/run_distributed_client.py +++ b/run_distributed_client.py @@ -4,9 +4,12 @@ load_dotenv() -from ravpy.distributed.participate import participate -from ravpy.initialize import initialize +from ravpy import participate +from ravpy import initialize if __name__ == '__main__': client = initialize(os.environ.get("TOKEN")) + if not client.connected: + os._exit(1) + participate() diff --git a/settings.py b/settings.py new file mode 100644 index 0000000..fd0b08c --- /dev/null +++ b/settings.py @@ -0,0 +1,266 @@ +import os.path + +# +# Example settings file for dmgbuild +# + +# Use like this: dmgbuild -s settings.py "Test Volume" test.dmg + +# You can actually use this file for your own application (not just TextEdit) +# by doing e.g. +# +# dmgbuild -s settings.py -D app=/path/to/My.app "My Application" MyApp.dmg + +# .. Useful stuff .............................................................. 
+import plistlib + +application = defines.get("app", "dist/ui2.app") # noqa: F821 +appname = os.path.basename(application) + + +def icon_from_app(app_path): + plist_path = os.path.join(app_path, "Contents", "Info.plist") + with open(plist_path, "rb") as f: + plist = plistlib.load(f) + icon_name = plist["CFBundleIconFile"] + icon_root, icon_ext = os.path.splitext(icon_name) + if not icon_ext: + icon_ext = ".icns" + icon_name = icon_root + icon_ext + return os.path.join(app_path, "Contents", "Resources", icon_name) + + +# .. Basics .................................................................... + +# Uncomment to override the output filename +# filename = 'test.dmg' + +# Uncomment to override the output volume name +# volume_name = 'Test' + +# Volume format (see hdiutil create -help) +format = defines.get("format", "UDBZ") # noqa: F821 + +# Compression level (if relevant) +# compression_level = 9 + +# Volume size +size = defines.get("size", None) # noqa: F821 + +# Files to include +files = [application] + +# Symlinks to create +symlinks = {"Applications": "/Applications"} + +# Files to hide +# hide = [ 'Secret.data' ] + +# Files to hide the extension of +# hide_extension = [ 'README.rst' ] + +# Volume icon +# +# You can either define icon, in which case that icon file will be copied to the +# image, *or* you can define badge_icon, in which case the icon file you specify +# will be used to badge the system's Removable Disk icon. Badge icons require +# pyobjc-framework-Quartz. +# +# icon = '/path/to/icon.icns' +badge_icon = icon_from_app(application) + +# Where to put the icons +icon_locations = {appname: (140, 120), "Applications": (500, 120)} + +# .. Window configuration ...................................................... 
+ +# Background +# +# This is a STRING containing any of the following: +# +# #3344ff - web-style RGB color +# #34f - web-style RGB color, short form (#34f == #3344ff) +# rgb(1,0,0) - RGB color, each value is between 0 and 1 +# hsl(120,1,.5) - HSL (hue saturation lightness) color +# hwb(300,0,0) - HWB (hue whiteness blackness) color +# cmyk(0,1,0,0) - CMYK color +# goldenrod - X11/SVG named color +# builtin-arrow - A simple built-in background with a blue arrow +# /foo/bar/baz.png - The path to an image file +# +# The hue component in hsl() and hwb() may include a unit; it defaults to +# degrees ('deg'), but also supports radians ('rad') and gradians ('grad' +# or 'gon'). +# +# Other color components may be expressed either in the range 0 to 1, or +# as percentages (e.g. 60% is equivalent to 0.6). +background = "builtin-arrow" + +show_status_bar = False +show_tab_view = False +show_toolbar = False +show_pathbar = False +show_sidebar = False +sidebar_width = 180 + +# Window position in ((x, y), (w, h)) format +window_rect = ((100, 100), (640, 280)) + +# Select the default view; must be one of +# +# 'icon-view' +# 'list-view' +# 'column-view' +# 'coverflow' +# +default_view = "icon-view" + +# General view configuration +show_icon_preview = False + +# Set these to True to force inclusion of icon/list view settings (otherwise +# we only include settings for the default view) +include_icon_view_settings = "auto" +include_list_view_settings = "auto" + +# .. Icon view configuration ................................................... + +arrange_by = None +grid_offset = (0, 0) +grid_spacing = 100 +scroll_position = (0, 0) +label_pos = "bottom" # or 'right' +text_size = 16 +icon_size = 128 + +# .. List view configuration ................................................... 
+ +# Column names are as follows: +# +# name +# date-modified +# date-created +# date-added +# date-last-opened +# size +# kind +# label +# version +# comments +# +list_icon_size = 16 +list_text_size = 12 +list_scroll_position = (0, 0) +list_sort_by = "name" +list_use_relative_dates = True +list_calculate_all_sizes = (False,) +list_columns = ("name", "date-modified", "size", "kind", "date-added") +list_column_widths = { + "name": 300, + "date-modified": 181, + "date-created": 181, + "date-added": 181, + "date-last-opened": 181, + "size": 97, + "kind": 115, + "label": 100, + "version": 75, + "comments": 300, +} +list_column_sort_directions = { + "name": "ascending", + "date-modified": "descending", + "date-created": "descending", + "date-added": "descending", + "date-last-opened": "descending", + "size": "descending", + "kind": "ascending", + "label": "ascending", + "version": "ascending", + "comments": "ascending", +} + +# .. License configuration ..................................................... + +# Text in the license configuration is stored in the resources, which means +# it gets stored in a legacy Mac encoding according to the language. dmgbuild +# will *try* to convert Unicode strings to the appropriate encoding, *but* +# you should be aware that Python doesn't support all of the necessary encodings; +# in many cases you will need to encode the text yourself and use byte strings +# instead here. 
+ +# Recognized language names are: +# +# af_ZA, ar, be_BY, bg_BG, bn, bo, br, ca_ES, cs_CZ, cy, da_DK, de_AT, de_CH, +# de_DE, dz_BT, el_CY, el_GR, en_AU, en_CA, en_GB, en_IE, en_SG, en_US, eo, +# es_419, es_ES, et_EE, fa_IR, fi_FI, fo_FO, fr_001, fr_BE, fr_CA, fr_CH, +# fr_FR, ga-Latg_IE, ga_IE, gd, grc, gu_IN, gv, he_IL, hi_IN, hr_HR, hu_HU, +# hy_AM, is_IS, it_CH, it_IT, iu_CA, ja_JP, ka_GE, kl, ko_KR, lt_LT, lv_LV, +# mk_MK, mr_IN, mt_MT, nb_NO, ne_NP, nl_BE, nl_NL, nn_NO, pa, pl_PL, pt_BR, +# pt_PT, ro_RO, ru_RU, se, sk_SK, sl_SI, sr_RS, sv_SE, th_TH, to_TO, tr_TR, +# uk_UA, ur_IN, ur_PK, uz_UZ, vi_VN, zh_CN, zh_TW + +license = { + "default-language": "en_US", + "licenses": { + # For each language, the text of the license. This can be plain text, + # RTF (in which case it must start "{\rtf1"), or a path to a file + # containing the license text. If you're using RTF, + # watch out for Python escaping (or read it from a file). + "en_GB": b"""{\\rtf1\\ansi\\ansicpg1252\\cocoartf1504\\cocoasubrtf820 + {\\fonttbl\\f0\\fnil\\fcharset0 Helvetica-Bold;\\f1\\fnil\\fcharset0 Helvetica;} + {\\colortbl;\\red255\\green255\\blue255;\\red0\\green0\\blue0;} + {\\*\\expandedcolortbl;;\\cssrgb\\c0\\c0\\c0;} + \\paperw11905\\paperh16837\\margl1133\\margr1133\\margb1133\\margt1133 + \\deftab720 + \\pard\\pardeftab720\\sa160\\partightenfactor0 + + \\f0\\b\\fs60 \\cf2 \\expnd0\\expndtw0\\kerning0 + \\up0 \\nosupersub \\ulnone \\outl0\\strokewidth0 \\strokec2 Test License\\ + \\pard\\pardeftab720\\sa160\\partightenfactor0 + + \\fs36 \\cf2 \\strokec2 What is this?\\ + \\pard\\pardeftab720\\sa160\\partightenfactor0 + + \\f1\\b0\\fs22 \\cf2 \\strokec2 This is the English license. It says what you are allowed to do with this software.\\ + \\ + }""", + "de_DE": "Ich bin ein Berliner. Bielefeld gibt's doch gar nicht.", + }, + "buttons": { + # For each language, text for the buttons on the licensing window. 
+ # + # Default buttons and text are built-in for the following languages: + # + # da_DK: Danish + # de_DE: German + # en_AU: English (Australian) + # en_GB: English (UK) + # en_NZ: English (New Zealand) + # en_US: English (US) + # es_ES: Spanish + # fr_CA: French (Canadian) + # fr_FR: French + # it_IT: Italian + # ja_JP: Japanese + # nb_NO: Norsk + # nl_BE: Flemish + # nl_NL: Dutch + # pt_BR: Brazilian Portuguese + # pt_PT: Portugese + # sv_SE: Swedish + # zh_CN: Simplified Chinese + # zh_TW: Traditional Chinese + # + # You don't need to specify them for those languages; if you fail to + # specify them for some other language, English will be used instead. + "en_US": ( + b"English", + b"Agree!", + b"Disagree!", + b"Print!", + b"Save!", + b'Do you agree or not? Press "Agree" or "Disagree".', + ), + }, +} diff --git a/setup.py b/setup.py index 55f6988..9a971ee 100644 --- a/setup.py +++ b/setup.py @@ -7,7 +7,7 @@ setup( name="ravpy", - version="0.6", + version="0.12", license='MIT', author="Raven Protocol", author_email='kailash@ravenprotocol.com', @@ -24,10 +24,16 @@ "python-socketio==5.4.1", "requests==2.27.1", "python-dotenv", - "scipy", "speedtest-cli", "terminaltables==3.1.10", "websocket-client", - "scikit-learn==1.1.1" - ] + "pyinstaller", + "scikit-learn", + "psutil", + "hurry.filesize", + "sqlalchemy", + "sqlalchemy-utils", + ], + app=["gui.py"], + setup_requires=["py2app"], ) diff --git a/ui.py b/ui.py new file mode 100644 index 0000000..353e436 --- /dev/null +++ b/ui.py @@ -0,0 +1,70 @@ +from dotenv import load_dotenv + +load_dotenv() + +import tkinter as tk +from ravpy.utils import verify_token +from ravpy.distributed.participate import participate +from ravpy.initialize import initialize + + +class RavpyUI(object): + def __init__(self): + self.root = tk.Tk() + self.root.title('Ravpy') + + self.signin_canvas = None + self.entry_token = None + self.dashboard_canvas = None + + self.create_signin_canvas() + + def create_signin_canvas(self): + 
self.signin_canvas = tk.Canvas(self.root, width=500, height=500) + self.signin_canvas.pack() + + signin_label = tk.Label(self.root, text='Sign in', fg='black', font=('helvetica', 16, 'bold')) + enter_token_label = tk.Label(self.root, text='Enter Token:', fg='black', font=('helvetica', 14)) + + token_string = tk.StringVar() + self.entry_token = tk.Entry(self.root, width=30, textvariable=token_string, font=("helvetica", 20)) + self.entry_token.pack(padx=10, pady=10) + + signin_button = tk.Button(text='Submit', command=self.signin, bg='brown', fg='black') + + self.signin_canvas.create_window(250, 150, window=signin_label) + self.signin_canvas.create_window(250, 200, window=enter_token_label) + self.signin_canvas.create_window(250, 250, window=self.entry_token) + self.signin_canvas.create_window(250, 300, window=signin_button) + + def create_dashboard_canvas(self, token): + self.dashboard_canvas = tk.Canvas(self.root, width=500, height=500) + self.dashboard_canvas.pack() + + label3 = tk.Label(self.root, text='Log\n', fg='black', font=('Arial', 14)) + self.dashboard_canvas.create_window(250, 150, window=label3) + + # canvas1.create_window(250, 375, window=label3) + label3['text'] += "Initializing...\n" + initialize(token) + label3['text'] += "Initialized\n" + participate() + label3['text'] += "Participated\n" + + def start(self): + self.root.mainloop() + + def signin(self): + token = self.entry_token.get() + if token == "": + label3 = tk.Label(self.root, text='Enter token to signin', fg='red', font=('Arial', 14)) + self.signin_canvas.create_window(250, 375, window=label3) + else: + if verify_token(token=token): + self.signin_canvas.pack_forget() + self.create_dashboard_canvas(token) + + +if __name__ == '__main__': + ui = RavpyUI() + ui.start() diff --git a/web/css/main.css b/web/css/main.css new file mode 100644 index 0000000..e69de29 diff --git a/web/img/ravenverse-logo.png b/web/img/ravenverse-logo.png new file mode 100644 index 0000000..1deaa6b Binary files /dev/null 
and b/web/img/ravenverse-logo.png differ diff --git a/web/img/ravpy-icon.png b/web/img/ravpy-icon.png new file mode 100644 index 0000000..f3180a4 Binary files /dev/null and b/web/img/ravpy-icon.png differ diff --git a/web/img/ravpy-icon.svg b/web/img/ravpy-icon.svg new file mode 100644 index 0000000..b539c4b --- /dev/null +++ b/web/img/ravpy-icon.svg @@ -0,0 +1,13 @@ + + + + + + + + + + + + + diff --git a/web/img/reload.svg b/web/img/reload.svg new file mode 100644 index 0000000..a281b0b --- /dev/null +++ b/web/img/reload.svg @@ -0,0 +1 @@ + \ No newline at end of file diff --git a/web/main.html b/web/main.html new file mode 100644 index 0000000..4eab7d0 --- /dev/null +++ b/web/main.html @@ -0,0 +1,387 @@ + + + + + + + + Ravpy + + + + + + + + + + + +
+ + +
+ + + +
+
+ + + + + + + \ No newline at end of file diff --git a/web/main.js b/web/main.js new file mode 100644 index 0000000..e69de29