Skip to content

Commit

Permalink
Add gart cli
Browse files Browse the repository at this point in the history
Signed-off-by: Lei Wang <[email protected]>
  • Loading branch information
doudoubobo committed Jul 1, 2024
1 parent decd7d3 commit 5b4dbf3
Show file tree
Hide file tree
Showing 9 changed files with 251 additions and 12 deletions.
2 changes: 1 addition & 1 deletion charts/gart/templates/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ data:
{{- else }}
SECONDS_PER_EPOCH: "60"
{{- end }}
{{- if not .Values.analyzer.enabled }}
{{- if not .Values.dataconfig.useGAE }}
GIE_EXECUTOR_POD_BASE_NAME: {{ include "gart.writer.fullname" . | quote }}
GIE_EXECUTOR_POD_SERVICE_NAME: {{ include "gart.fullname" . }}-gie-executor-service
GIE_EXECUTOR_POD_SERVICE_PORT: {{ .Values.gie_executor.HTTP_SERVICE_PORT | quote }}
Expand Down
2 changes: 2 additions & 0 deletions charts/gart/templates/gie_frontend/deployment.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
{{- if not .Values.dataconfig.useGAE }}
{{- $etcd_service_name := .Values.etcd.fullnameOverride }}
{{- $etcd_service_port := int .Values.etcd.containerPorts.client }}
{{- $etcd_service := printf "%s:%d" $etcd_service_name $etcd_service_port }}
Expand Down Expand Up @@ -40,3 +41,4 @@ spec:



{{- end }}
4 changes: 3 additions & 1 deletion charts/gart/templates/gie_frontend/svc.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
{{- if not .Values.dataconfig.useGAE }}
apiVersion: v1
kind: Service
metadata:
Expand All @@ -12,4 +13,5 @@ spec:
ports:
- protocol: TCP
port: {{ .Values.gie_frontend.gremlinPort }}
targetPort: gremlin
targetPort: gremlin
{{- end }}
2 changes: 1 addition & 1 deletion charts/gart/templates/writer/statefulset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ spec:
volumeMounts:
- name: shared-socket
mountPath: /tmp/shared
{{- if .Values.analyzer.enabled }}
{{- if .Values.dataconfig.useGAE }}
- name: analyzer
image: {{ include "gart.analyzer.image" . }}
imagePullPolicy: {{ .Values.analyzer.image.pullPolicy | quote }}
Expand Down
2 changes: 1 addition & 1 deletion charts/gart/templates/writer/svc.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{{- if not .Values.analyzer.enabled }}
{{- if not .Values.dataconfig.useGAE }}
apiVersion: v1
kind: Service
metadata:
Expand Down
9 changes: 7 additions & 2 deletions charts/gart/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,6 @@ writer:

# analyzer config
analyzer:
enabled: false
image:
repository: gart-analyzer
tag: latest
Expand Down Expand Up @@ -237,11 +236,17 @@ dataconfig:
dbPassword: "123456"
dbType: "postgresql"
dbName: "ldbc"
#optional
v6dSocket: "/tmp/shared/v6d.sock"
#optional
v6dSize: "750G"
#optional
etcdPrefix: "gart_meta_"
enableBulkload: 1
# if you want to need an epoch after a give time interval, set useSecondsPerEpoch to 1
# To start a new epoch after a given time interval, set useSecondsPerEpoch to 1.
# If not set, use logsPerEpoch
# useSecondsPerEpoch: 1
logsPerEpoch: 10000
# secondsPerEpoch: 60
# To run GAE tasks, set useGAE to 1; otherwise GIE tasks are run. The default (0) runs GIE tasks.
useGAE: 0
58 changes: 52 additions & 6 deletions scripts/controller.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env python3

from flask import Flask, request
from flask import Flask, request, jsonify
import subprocess
import os
from kubernetes import client, config
Expand All @@ -18,6 +18,34 @@
previous_read_epoch = None


@app.route("/submit-config", methods=["POST"])
def submit_config():
    """Store an uploaded config file in etcd.

    Expects a multipart upload under the form field ``file``. The raw file
    bytes are written to the etcd key ``<ETCD_PREFIX>gart_rg_mapping_yaml``.

    Returns:
        ``("Config submitted", 200)`` on success; a JSON ``{"error": ...}``
        body with status 400 for a missing/unnamed upload or an unexpected
        error, or 503 if etcd still rejects the write after all retries.
    """
    if "file" not in request.files:
        return jsonify({"error": "No file part in the request"}), 400
    file = request.files["file"]
    if file.filename == "":
        return jsonify({"error": "No selected file"}), 400

    # Bound the retries so an unreachable etcd cannot block the request
    # (and the worker serving it) forever.
    max_attempts = 12
    retry_delay_seconds = 5

    try:
        content = file.read()
        etcd_server = os.getenv("ETCD_SERVICE", "etcd")
        if not etcd_server.startswith("http://"):
            etcd_server = f"http://{etcd_server}"
        etcd_prefix = os.getenv("ETCD_PREFIX", "gart_meta_")
        # Split "host[:port]" once; the default ETCD_SERVICE value ("etcd")
        # carries no port, so fall back to etcd's standard client port.
        etcd_host, _, etcd_port = etcd_server.split("://", 1)[1].partition(":")
        etcd_client = etcd3.client(
            host=etcd_host, port=int(etcd_port) if etcd_port else 2379
        )
        last_error = None
        for attempt in range(max_attempts):
            try:
                etcd_client.put(etcd_prefix + "gart_rg_mapping_yaml", content)
                return "Config submitted", 200
            except Exception as e:
                last_error = e
                # No point sleeping after the final attempt.
                if attempt + 1 < max_attempts:
                    time.sleep(retry_delay_seconds)
        return (
            jsonify({"error": f"Failed to write config to etcd: {last_error}"}),
            503,
        )
    except Exception as e:
        return jsonify({"error": str(e)}), 400


@app.route("/control/pause", methods=["POST"])
def pause():
subprocess.run(
Expand Down Expand Up @@ -72,8 +100,21 @@ def get_read_epoch_by_timestamp():
@app.route("/run-gae-task", methods=["POST"])
def run_gae_task():
command = ""
for key, value in request.form.items():
command += f"--{key} {value} "
data = request.json
algorithm_name = data.get("algorithm_name")
graph_version = data.get("graph_version")
latest_epoch = get_latest_read_epoch()
if int(graph_version) > latest_epoch:
return "Invalid read epoch", 400
if latest_epoch == 2**64 - 1:
return "No available read epoch", 400
command += f"--app_name {algorithm_name} "
command += f"--read_epoch {graph_version} "
for key, value in data.items():
if key not in ["algorithm_name", "graph_version"]:
command += f"--{key} {value} "
# for key, value in request.form.items():
# command += f"--{key} {value} "
etcd_server = os.getenv("ETCD_SERVICE", "etcd")
if not etcd_server.startswith("http://"):
etcd_server = f"http://{etcd_server}"
Expand All @@ -91,6 +132,8 @@ def change_read_epoch():
latest_epoch = get_latest_read_epoch()
if int(read_epoch) > latest_epoch:
return "Invalid read epoch", 400
if latest_epoch == 2**64 - 1:
return "No available read epoch", 400
global previous_read_epoch
if previous_read_epoch is None or previous_read_epoch != read_epoch:
previous_read_epoch = read_epoch
Expand Down Expand Up @@ -287,9 +330,12 @@ def get_latest_read_epoch():
latest_epoch = 2**64 - 1
for idx in range(int(num_fragment)):
etcd_key = etcd_prefix + "gart_latest_epoch_p" + str(idx)
etcd_value, _ = etcd_client.get(etcd_key)
if latest_epoch > int(etcd_value):
latest_epoch = int(etcd_value)
try:
etcd_value, _ = etcd_client.get(etcd_key)
if latest_epoch > int(etcd_value):
latest_epoch = int(etcd_value)
except Exception as e:
print(f"Error: {e}")

return latest_epoch

Expand Down
184 changes: 184 additions & 0 deletions scripts/gart_cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
#!/usr/bin/env python3

import click
import os
import json
import socket
import requests
from urllib.parse import urlparse

# JSON file in which the CLI persists its settings (e.g. the endpoint saved by "connect") between runs.
CONFIG_FILE_PATH = "/tmp/gart_cli_config.json"


def save_config(config):
    """Serialize ``config`` as JSON into ``CONFIG_FILE_PATH``, overwriting any existing file."""
    with open(CONFIG_FILE_PATH, "w") as config_file:
        config_file.write(json.dumps(config))


def load_config():
    """Load the CLI configuration stored at ``CONFIG_FILE_PATH``.

    Returns:
        The parsed configuration dict, or an empty dict when the file does
        not exist, is not valid JSON, or holds a JSON value that is not an
        object (callers index the result like a dict).
    """
    try:
        # Open directly instead of checking existence first: the file may
        # disappear between the check and the open.
        with open(CONFIG_FILE_PATH, "r") as f:
            config = json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        return {}
    return config if isinstance(config, dict) else {}


@click.group()
@click.pass_context
def cli(ctx):
    """System Manager CLI"""
    # Guarantee a dict-like context object exists, then replace it with the
    # persisted configuration so every subcommand sees the saved settings.
    ctx.ensure_object(dict)
    ctx.obj = load_config()


@cli.command()
@click.pass_context
@click.argument("endpoint", required=True, type=str)
def connect(ctx, endpoint):
    """Connect to a new service endpoint."""
    # A bare "host:port" is treated as plain HTTP.
    if not endpoint.startswith(("http://", "https://")):
        endpoint = "http://" + endpoint

    # Validate before persisting so a malformed endpoint is never saved.
    parsed_url = urlparse(endpoint)
    host = parsed_url.hostname  # handles IPv6 literals and userinfo correctly
    try:
        port = parsed_url.port  # raises ValueError for a non-numeric/out-of-range port
    except ValueError as e:
        click.echo(f"Invalid endpoint {endpoint}: {e}")
        return
    if not host:
        click.echo(f"Invalid endpoint {endpoint}: missing host")
        return
    if port is None:
        # No explicit port: use the scheme's default rather than passing None
        # to the socket layer.
        port = 443 if parsed_url.scheme == "https" else 80

    # The endpoint is saved even if the reachability probe below fails, so a
    # service that comes up later can still be used without reconnecting.
    ctx.obj["endpoint"] = endpoint
    save_config(ctx.obj)

    # Probe reachability with a short TCP connect (IPv4 or IPv6).
    try:
        with socket.create_connection((host, port), timeout=1):
            click.echo(f"Connected to {endpoint}")
    except OSError as e:
        click.echo(f"Failed to connect to {endpoint}: {e}")


@cli.command()
@click.pass_context
def resume_data_loading(ctx):
    """Resume data loading process."""
    endpoint = ctx.obj.get("endpoint")
    if not endpoint:
        click.echo('Please connect to an endpoint first using the "connect" command.')
        return

    # Report transport failures as a message instead of a traceback.
    try:
        response = requests.post(f"{endpoint}/control/resume")
    except requests.exceptions.RequestException as e:
        click.echo(f"Failed to resume data loading: {e}")
        return
    # Do not claim success when the server answered with an error status.
    if not response.ok:
        click.echo(
            f"Failed to resume data loading (status code {response.status_code}): "
            f"{response.text}"
        )
        return
    click.echo(f"Resumed data loading: {response.text}")


@cli.command()
@click.pass_context
def pause_data_loading(ctx):
    """Pause data loading process."""
    endpoint = ctx.obj.get("endpoint")
    if not endpoint:
        click.echo('Please connect to an endpoint first using the "connect" command.')
        return

    # Report transport failures as a message instead of a traceback.
    try:
        response = requests.post(f"{endpoint}/control/pause")
    except requests.exceptions.RequestException as e:
        click.echo(f"Failed to pause data loading: {e}")
        return
    # Do not claim success when the server answered with an error status.
    if not response.ok:
        click.echo(
            f"Failed to pause data loading (status code {response.status_code}): "
            f"{response.text}"
        )
        return
    click.echo(f"Paused data loading: {response.text}")


@cli.command()
@click.pass_context
def get_all_available_versions(ctx):
    """Get all available versions of the graph at the moment."""
    endpoint = ctx.obj.get("endpoint")
    if not endpoint:
        click.echo('Please connect to an endpoint first using the "connect" command.')
        return

    # Report transport failures as a message instead of a traceback.
    try:
        response = requests.post(f"{endpoint}/get-all-available-read-epochs")
    except requests.exceptions.RequestException as e:
        click.echo(f"Failed to get available versions: {e}")
        return
    # An error body must not be presented as a list of versions.
    if not response.ok:
        click.echo(
            f"Failed to get available versions (status code {response.status_code}): "
            f"{response.text}"
        )
        return
    click.echo(f"Available versions: {response.text}")


@cli.command()
@click.pass_context
@click.argument("timestamp", required=True, type=str)
def get_version_by_timestamp(ctx, timestamp):
    """Get the version of the graph at the given timestamp."""
    endpoint = ctx.obj.get("endpoint")
    if not endpoint:
        click.echo('Please connect to an endpoint first using the "connect" command.')
        return

    # Report transport failures as a message instead of a traceback.
    try:
        response = requests.post(
            f"{endpoint}/get-read-epoch-by-timestamp", data={"timestamp": timestamp}
        )
    except requests.exceptions.RequestException as e:
        click.echo(f"Failed to get version at {timestamp}: {e}")
        return
    # An error body must not be presented as a version number.
    if not response.ok:
        click.echo(
            f"Failed to get version at {timestamp} "
            f"(status code {response.status_code}): {response.text}"
        )
        return
    click.echo(f"Version at {timestamp}: {response.text}")


@cli.command()
@click.pass_context
@click.argument("config_path", type=click.Path(exists=True))
def submit_config(ctx, config_path):
    """Submit a new configuration file."""
    endpoint = ctx.obj.get("endpoint")
    if not endpoint:
        click.echo('Please connect to an endpoint first using the "connect" command.')
        return

    # Upload the file as multipart form data under the "file" field.
    with open(config_path, "rb") as config_file:
        upload = {"file": (config_path, config_file)}
        try:
            response = requests.post(f"{endpoint}/submit-config", files=upload)
            response.raise_for_status()
            click.echo(f"Success: Server responded with {response.status_code} status")
        except Exception as e:
            # One handler covers HTTP error statuses from raise_for_status(),
            # transport errors, and anything else: all get the same message.
            click.echo(f"Failed to submit the configuration file: {e}")


@cli.command()
@click.pass_context
@click.argument("algorithm_name")
@click.argument("graph_version", type=str, required=True)
@click.option("--arg", multiple=True, type=(str, str))
def submit_gae_task(ctx, algorithm_name, graph_version, arg):
    """Submit a new GAE task."""
    endpoint = ctx.obj.get("endpoint")
    if not endpoint:
        click.echo('Please connect to an endpoint first using the "connect" command.')
        return

    # Each --arg is a (key, value) pair. The positional values are assigned
    # last so an --arg cannot override the algorithm name or graph version.
    args_dict = dict(arg)
    args_dict["algorithm_name"] = algorithm_name
    args_dict["graph_version"] = graph_version

    # Report transport failures as a message instead of a traceback.
    try:
        response = requests.post(f"{endpoint}/run-gae-task", json=args_dict)
    except requests.exceptions.RequestException as e:
        click.echo("Failed to execute algorithm!")
        click.echo(str(e))
        return

    if response.status_code == 200:
        click.echo("Algorithm executed successfully!")
        click.echo(response.text)
    else:
        click.echo("Failed to execute algorithm!")
        click.echo(f"Status code: {response.status_code}")
        click.echo(response.text)


@cli.command()
@click.pass_context
@click.argument("graph_version", type=int, required=True)
def change_graph_version_gie(ctx, graph_version):
    """Change the graph version to the given version for GIE."""
    endpoint = ctx.obj.get("endpoint")
    if not endpoint:
        click.echo('Please connect to an endpoint first using the "connect" command.')
        return

    # Report transport failures as a message instead of a traceback.
    try:
        response = requests.post(
            f"{endpoint}/change-read-epoch", data={"read_epoch": graph_version}
        )
    except requests.exceptions.RequestException as e:
        click.echo(f"Failed to change graph version to {graph_version}: {e}")
        return
    # The server rejects unusable epochs with an error status; do not report
    # those replies as a successful change.
    if not response.ok:
        click.echo(
            f"Failed to change graph version to {graph_version} "
            f"(status code {response.status_code}): {response.text}"
        )
        return
    click.echo(f"Changed graph version to {graph_version}: {response.text}")


if __name__ == "__main__":
    # Seed the context with an empty dict; the `cli` group callback then
    # replaces it with the configuration loaded from disk.
    cli(obj={})
Empty file modified scripts/k8s_deployment.py
100644 → 100755
Empty file.

0 comments on commit 5b4dbf3

Please sign in to comment.