feat: Impl the file upload interface for FLEX dataloading #3557

Merged · 4 commits · Feb 19, 2024
Changes from all commits
1 change: 1 addition & 0 deletions flex/coordinator/.openapi-generator/FILES
@@ -16,6 +16,7 @@ gs_flex_coordinator/models/column_mapping.py
gs_flex_coordinator/models/connection.py
gs_flex_coordinator/models/connection_status.py
gs_flex_coordinator/models/deployment_info.py
gs_flex_coordinator/models/deployment_info_graphs_info_value.py
gs_flex_coordinator/models/deployment_status.py
gs_flex_coordinator/models/edge_mapping.py
gs_flex_coordinator/models/edge_mapping_destination_vertex_mappings_inner.py
2 changes: 1 addition & 1 deletion flex/coordinator/.openapi-generator/VERSION
@@ -1 +1 @@
7.2.0
7.3.0
@@ -0,0 +1,22 @@
import connexion
from typing import Dict
from typing import Tuple
from typing import Union

from gs_flex_coordinator.core import client_wrapper
from gs_flex_coordinator.core import handle_api_exception
from gs_flex_coordinator import util


@handle_api_exception()
def upload_file(filestorage=None): # noqa: E501
"""upload_file

# noqa: E501

:param filestorage:
:type filestorage: str

:rtype: Union[str, Tuple[str, int], Tuple[str, int, Dict[str, str]]]
"""
return client_wrapper.upload_file(filestorage)
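For illustration, a minimal sketch of how a client might exercise this controller once the coordinator is running; the route and port below are assumptions, not taken from this PR:

import requests

# Hypothetical endpoint; check the generated OpenAPI spec for the real route.
url = "http://localhost:8080/api/v1/file/uploading"
with open("person.csv", "rb") as f:
    # Connexion passes the uploaded file to the controller as the
    # "filestorage" form field (a werkzeug FileStorage).
    resp = requests.post(url, files={"filestorage": ("person.csv", f)})
print(resp.text)  # the coordinator replies with the saved file's path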
73 changes: 68 additions & 5 deletions flex/coordinator/gs_flex_coordinator/core/client_wrapper.py
@@ -19,15 +19,21 @@
import datetime
import itertools
import logging
import os
import pickle
import socket
import threading
from typing import List, Union

import psutil
from gs_flex_coordinator.core.config import (CLUSTER_TYPE, INSTANCE_NAME,
SOLUTION)
from gs_flex_coordinator.core.config import (CLUSTER_TYPE,
COORDINATOR_STARTING_TIME,
DATASET_WORKSPACE, INSTANCE_NAME,
SOLUTION, WORKSPACE)
from gs_flex_coordinator.core.interactive import init_hqps_client
from gs_flex_coordinator.core.utils import encode_datetime
from gs_flex_coordinator.core.scheduler import schedule
from gs_flex_coordinator.core.utils import (GraphInfo, decode_datetimestr,
encode_datetime, get_current_time)
from gs_flex_coordinator.models import (DeploymentInfo, Graph, JobStatus,
ModelSchema, NodeStatus, Procedure,
SchemaMapping, ServiceStatus,
@@ -45,6 +51,34 @@
self._lock = threading.RLock()
# initialize specific client
self._client = self._initialize_client()
# graphs info
self._graphs_info = {}
# pickle path
self._pickle_path = os.path.join(WORKSPACE, "graphs_info.pickle")
# recover
self._try_to_recover_from_disk()

def _try_to_recover_from_disk(self):
try:
if os.path.exists(self._pickle_path):
logger.info("Recover graphs info from file %s", self._pickle_path)
with open(self._pickle_path, "rb") as f:
self._graphs_info = pickle.load(f)

Check warning on line 66 in flex/coordinator/gs_flex_coordinator/core/client_wrapper.py (codefactor.io / CodeFactor): Pickle and modules that wrap it can be unsafe when used to deserialize untrusted data, possible security issue. (B301)
except Exception as e:
logger.warn("Failed to recover graphs info: %s", str(e))
# set default graph info
for g in self.list_graphs():
if g.name not in self._graphs_info:
self._graphs_info[g.name] = GraphInfo(
name=g.name, creation_time=COORDINATOR_STARTING_TIME
)

def _pickle_graphs_info_impl(self):
try:
with open(self._pickle_path, "wb") as f:
pickle.dump(self._graphs_info, f)
except Exception as e:
logger.warn("Failed to dump graphs info: %s", str(e))

def _initialize_client(self):
service_initializer = {"INTERACTIVE": init_hqps_client}
@@ -73,10 +107,18 @@
graph_dict = graph.to_dict()
if "_schema" in graph_dict:
graph_dict["schema"] = graph_dict.pop("_schema")
return self._client.create_graph(graph_dict)
rlt = self._client.create_graph(graph_dict)
self._graphs_info[graph.name] = GraphInfo(
name=graph.name, creation_time=get_current_time()
)
self._pickle_graphs_info_impl()
return rlt

def delete_graph_by_name(self, graph_name: str) -> str:
return self._client.delete_graph_by_name(graph_name)
rlt = self._client.delete_graph_by_name(graph_name)
self._graphs_info.pop(graph_name, None)
self._pickle_graphs_info_impl()
return rlt

def create_procedure(self, graph_name: str, procedure: Procedure) -> str:
procedure_dict = procedure.to_dict()
@@ -111,10 +153,25 @@
return rlt

def get_deployment_info(self) -> DeploymentInfo:
# update graphs info
for job in self.list_jobs():
if (
job.detail["graph_name"] in self._graphs_info
and job.end_time is not None
):
self._graphs_info[job.detail["graph_name"]].last_dataloading_time = (
decode_datetimestr(job.end_time)
)
self._pickle_graphs_info_impl()
graphs_info = {}
for name, info in self._graphs_info.items():
graphs_info[name] = info.to_dict()
info = {
"name": INSTANCE_NAME,
"cluster_type": CLUSTER_TYPE,
"version": __version__,
"solution": SOLUTION,
"graphs_info": graphs_info,
}
return DeploymentInfo.from_dict(info)

@@ -159,5 +216,11 @@
job_id = self._client.create_dataloading_job(graph_name, schema_mapping_dict)
return job_id

def upload_file(self, filestorage) -> str:
if CLUSTER_TYPE == "HOSTS":
filepath = os.path.join(DATASET_WORKSPACE, filestorage.filename)
filestorage.save(filepath)
return str(filepath)


client_wrapper = ClientWrapper()
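One caveat in upload_file above: filestorage.filename is client-controlled, so joining it directly into DATASET_WORKSPACE permits path traversal (e.g. a name containing "../"). A hedged sketch of a stricter variant, assuming the incoming object is a werkzeug FileStorage (werkzeug ships secure_filename):

import os
from werkzeug.utils import secure_filename

def upload_file_safely(filestorage, dataset_workspace):
    # Drop directory components and unsafe characters from the client-supplied name.
    safe_name = secure_filename(filestorage.filename)
    if not safe_name:
        raise ValueError("invalid upload filename")
    filepath = os.path.join(dataset_workspace, safe_name)
    filestorage.save(filepath)
    return str(filepath)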
10 changes: 10 additions & 0 deletions flex/coordinator/gs_flex_coordinator/core/config.py
@@ -16,6 +16,7 @@
# limitations under the License.
#

import datetime
import logging
import os
import tempfile
@@ -63,6 +64,11 @@ def config_logging(log_level: str):
os.makedirs(ALERT_WORKSPACE, exist_ok=True)


# dataset workspace
DATASET_WORKSPACE = os.path.join(WORKSPACE, "dataset")
os.makedirs(DATASET_WORKSPACE, exist_ok=True)


# the solution encompasses the various applications and use cases of the
# product across different industries and business scenarios, e.g. "INTERACTIVE",
# "GRAPHSCOPE INSIGHT".
@@ -79,3 +85,7 @@

# interactive configuration
HQPS_ADMIN_SERVICE_PORT = os.environ.get("HIACTOR_ADMIN_SERVICE_PORT", 7777)


# coordinator starting time
COORDINATOR_STARTING_TIME = datetime.datetime.now()
35 changes: 25 additions & 10 deletions flex/coordinator/gs_flex_coordinator/core/interactive/hqps.py
@@ -22,13 +22,26 @@
from typing import List, Union

import hqps_client
from hqps_client import (Graph, JobResponse, JobStatus, ModelSchema, Procedure,
SchemaMapping, Service)

from gs_flex_coordinator.core.config import (CLUSTER_TYPE,
HQPS_ADMIN_SERVICE_PORT,
WORKSPACE)
from gs_flex_coordinator.core.utils import encode_datetime, get_internal_ip
from hqps_client import (
Graph,
JobResponse,
JobStatus,
ModelSchema,
Procedure,
SchemaMapping,
Service,
)

from gs_flex_coordinator.core.config import (
CLUSTER_TYPE,
HQPS_ADMIN_SERVICE_PORT,
WORKSPACE,
)
from gs_flex_coordinator.core.utils import (
encode_datetime,
get_internal_ip,
get_public_ip,
)
from gs_flex_coordinator.models import StartServiceRequest

logger = logging.getLogger("graphscope")
@@ -130,13 +143,15 @@ def get_service_status(self) -> dict:
response = api_instance.get_service_status()
# transfer
if CLUSTER_TYPE == "HOSTS":
internal_ip = get_internal_ip()
host = get_public_ip()
if host is None:
host = get_internal_ip()
return {
"status": response.status,
"graph_name": response.graph_name,
"sdk_endpoints": {
"cypher": f"neo4j://{internal_ip}:{response.bolt_port}",
"hqps": f"http://{internal_ip}:{response.hqps_port}",
"cypher": f"neo4j://{host}:{response.bolt_port}",
"hqps": f"http://{host}:{response.hqps_port}",
},
}

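For context, a sketch of how the returned sdk_endpoints might be consumed, assuming the wrapper exposes get_service_status() and the standard neo4j Python driver is installed; neither is part of this diff:

from neo4j import GraphDatabase

status = client_wrapper.get_service_status()
cypher_uri = status["sdk_endpoints"]["cypher"]  # e.g. "neo4j://<host>:<bolt_port>"
driver = GraphDatabase.driver(cypher_uri)
with driver.session() as session:
    # A trivial query to confirm the Cypher endpoint is reachable.
    record = session.run("MATCH (n) RETURN count(n) AS cnt").single()
    print(record["cnt"])
driver.close()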
67 changes: 67 additions & 0 deletions flex/coordinator/gs_flex_coordinator/core/utils.py
@@ -22,6 +22,9 @@
import random
import socket
import string
from typing import Union

import requests

logger = logging.getLogger("graphscope")

@@ -72,6 +75,10 @@ def random_string(nlen):
return "".join([random.choice(string.ascii_lowercase) for _ in range(nlen)])


def get_current_time() -> datetime.datetime:
return datetime.datetime.now()


def str_to_bool(s):
if isinstance(s, bool):
return s
@@ -82,3 +89,63 @@ def get_internal_ip() -> str:
hostname = socket.gethostname()
internal_ip = socket.gethostbyname(hostname)
return internal_ip


def get_public_ip() -> Union[str, None]:
try:
response = requests.get("https://api.ipify.org?format=json")
if response.status_code == 200:
data = response.json()
return data["ip"]
else:
return None
except requests.exceptions.RequestException as e:
logger.warn("Failed to get public ip: %s", str(e))
return None


class GraphInfo(object):
def __init__(
self, name, creation_time, update_time=None, last_dataloading_time=None
):
self._name = name
self._creation_time = creation_time
self._update_time = update_time
if self._update_time is None:
self._update_time = self._creation_time
self._last_dataloading_time = last_dataloading_time

@property
def name(self):
return self._name

@property
def creation_time(self):
return self._creation_time

@property
def update_time(self):
return self._update_time

@property
def last_dataloading_time(self):
return self._last_dataloading_time

@update_time.setter
def update_time(self, new_time):
self._update_time = new_time

@last_dataloading_time.setter
def last_dataloading_time(self, new_time):
if self._last_dataloading_time is None:
self._last_dataloading_time = new_time
elif new_time > self._last_dataloading_time:
self._last_dataloading_time = new_time

def to_dict(self):
return {
"name": self._name,
"creation_time": encode_datetime(self._creation_time),
"update_time": encode_datetime(self._update_time),
"last_dataloading_time": encode_datetime(self._last_dataloading_time),
}
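A short usage sketch of GraphInfo, illustrating that the last_dataloading_time setter only moves forward in time; the graph name and datetimes are illustrative:

import datetime

from gs_flex_coordinator.core.utils import GraphInfo

info = GraphInfo(name="modern_graph", creation_time=datetime.datetime(2024, 2, 1))
info.last_dataloading_time = datetime.datetime(2024, 2, 10)
info.last_dataloading_time = datetime.datetime(2024, 2, 5)  # earlier timestamp, ignored
assert info.last_dataloading_time == datetime.datetime(2024, 2, 10)
print(info.to_dict())  # datetimes are serialized via encode_datetime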
1 change: 1 addition & 0 deletions flex/coordinator/gs_flex_coordinator/models/__init__.py
@@ -7,6 +7,7 @@
from gs_flex_coordinator.models.connection import Connection
from gs_flex_coordinator.models.connection_status import ConnectionStatus
from gs_flex_coordinator.models.deployment_info import DeploymentInfo
from gs_flex_coordinator.models.deployment_info_graphs_info_value import DeploymentInfoGraphsInfoValue
from gs_flex_coordinator.models.deployment_status import DeploymentStatus
from gs_flex_coordinator.models.edge_mapping import EdgeMapping
from gs_flex_coordinator.models.edge_mapping_destination_vertex_mappings_inner import EdgeMappingDestinationVertexMappingsInner