Skip to content

Commit

Permalink
support get current version in coordinator
Browse files Browse the repository at this point in the history
Signed-off-by: Lei Wang <[email protected]>
  • Loading branch information
doudoubobo committed Oct 21, 2024
1 parent 7eb11f9 commit 7533a49
Show file tree
Hide file tree
Showing 12 changed files with 120 additions and 7 deletions.
5 changes: 2 additions & 3 deletions apps/rdbms/insert_db_txn.py
Original file line number Diff line number Diff line change
Expand Up @@ -510,17 +510,16 @@ def process_forum(line):
insert_simple_edges("22", "/forum_hasModerator_person_0_0.csv", "forum_hasmoderator")

insert_simple_edges("23", "/forum_hasTag_tag_0_0.csv", "forum_hastag")

if True:

insert_simple_edges("24", "/person_hasInterest_tag_0_0.csv", "person_hasinterest")

insert_simple_edges("25", "/person_isLocatedIn_place_0_0.csv", "person_islocationin")

# insert edge tables with additional properties


insert_prop_edges("26", "/forum_hasMember_person_0_0.csv", "forum_hasmember")

if True:
insert_prop_edges("27", "/person_knows_person_0_0.csv", "knows")

insert_prop_edges("28", "/person_likes_comment_0_0.csv", "likes_comment")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ def bind_datasource_in_batch(graph_id, schema_mapping): # noqa: E501
)
return (response.text, response.status_code)


def get_datasource_by_id(graph_id): # noqa: E501
"""get_datasource_by_id
Expand Down
23 changes: 23 additions & 0 deletions coordinator/flex/server/controllers/graph_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,29 @@ def delete_vertex_type_by_name(graph_id, type_name): # noqa: E501
return 'do some magic!'


def get_current_chosen_version(graph_id): # noqa: E501
"""get_current_chosen_version
Get the current chosen version of the graph # noqa: E501
:param graph_id:
:type graph_id: str
:rtype: Union[GetGraphVersionResponse, Tuple[GetGraphVersionResponse, int], Tuple[GetGraphVersionResponse, int, Dict[str, str]]
"""
if graph_id != GRAPH_ID:
return (f"Graph {graph_id} does not exist...", 500)

gart_controller_server = os.getenv("GART_CONTROLLER_SERVER", "127.0.0.1:8080")
if not gart_controller_server.startswith(("http://", "https://")):
gart_controller_server = f"http://{gart_controller_server}"

response = requests.get(f"{gart_controller_server}/get-graph-current-version")
if response.status_code != 200:
return (response.text, response.status_code)

return (GetGraphVersionResponse.from_dict(response.json()), 200)

def get_graph_all_available_versions(graph_id): # noqa: E501
"""get_graph_all_available_versions
Expand Down
4 changes: 2 additions & 2 deletions coordinator/flex/server/controllers/job_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ def get_job_by_id(job_id): # noqa: E501
return (JobStatus.from_dict(result_dict), 200)

return (JobStatus.from_dict(result_dict), 200)


def list_jobs(): # noqa: E501
"""list_jobs
Expand Down
28 changes: 28 additions & 0 deletions coordinator/flex/server/openapi/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1897,6 +1897,34 @@ paths:
tags:
- Graph
x-openapi-router-controller: flex.server.controllers.graph_controller
/api/v1/graph/{graph_id}/version/current-version:
get:
description: Get the current chosen version of the graph
operationId: get_current_chosen_version
parameters:
- explode: false
in: path
name: graph_id
required: true
schema:
type: string
style: simple
responses:
"200":
content:
application/json:
schema:
$ref: '#/components/schemas/GetGraphVersionResponse'
description: Successfully returned the current chosen versions of the graph
"500":
content:
application/json:
schema:
$ref: '#/components/schemas/Error'
description: Server error
tags:
- Graph
x-openapi-router-controller: flex.server.controllers.graph_controller
/api/v1/graph/{graph_id}/version/{timestamp}:
get:
description: Get the latest version by providing a timestamp
Expand Down
15 changes: 15 additions & 0 deletions coordinator/flex/server/test/test_graph_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,21 @@ def test_delete_vertex_type_by_name(self):
self.assert200(response,
'Response body is : ' + response.data.decode('utf-8'))

def test_get_current_chosen_version(self):
"""Test case for get_current_chosen_version
"""
headers = {
'Accept': 'application/json',
}
response = self.client.open(
'/api/v1/graph/{graph_id}/version/current-version'.format(graph_id='graph_id_example'),
method='GET',
headers=headers)
self.assert200(response,
'Response body is : ' + response.data.decode('utf-8'))

def test_get_graph_all_available_versions(self):
"""Test case for get_graph_all_available_versions
Expand Down
27 changes: 26 additions & 1 deletion scripts/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,28 @@ def get_read_epoch_by_timestamp():
return json.dumps({}), 200


@app.route("/get-graph-current-version", methods=["GET"])
def get_graph_current_version():
if previous_read_epoch is None:
return json.dumps({}), 200
all_epochs = get_all_available_read_epochs_internal()[0]
result = {}
for idx in range(len(all_epochs)):
if all_epochs[idx][0] == int(previous_read_epoch):
begin_time = all_epochs[idx][1]
end_time = all_epochs[idx][2]
num_vertices = all_epochs[idx][3]
num_edges = all_epochs[idx][4]
result = {
"version_id": str(previous_read_epoch),
"begin_time": begin_time,
"end_time": end_time,
"num_vertices": num_vertices,
"num_edges": num_edges,
}
return json.dumps(result), 200


@app.route("/run-gae-task", methods=["POST"])
def run_gae_task():
command = ""
Expand Down Expand Up @@ -426,7 +448,6 @@ def change_read_epoch():
return "No available read epoch", 400
global previous_read_epoch
if previous_read_epoch is None or previous_read_epoch != read_epoch:
previous_read_epoch = read_epoch
etcd_server = os.getenv("ETCD_SERVICE", "etcd")
if not etcd_server.startswith(("http://", "https://")):
etcd_server = f"http://{etcd_server}"
Expand Down Expand Up @@ -488,6 +509,8 @@ def change_read_epoch():
):
break
time.sleep(0.5)
# make sure change read epoch successfully
previous_read_epoch = read_epoch
return f"Read version changed to version {read_epoch}", 200


Expand Down Expand Up @@ -600,6 +623,8 @@ def get_all_available_read_epochs_internal():
schema_str, _ = etcd_client.get(schema_key)
schema = json.loads(schema_str)
unix_timestamp = schema["timestamp"]
num_vertices += schema["total_vertex_num"]
num_edges += schema["total_edge_num"]
if latest_timestamp is None or unix_timestamp > latest_timestamp:
latest_timestamp = unix_timestamp
converted_time = datetime.fromtimestamp(latest_timestamp)
Expand Down
3 changes: 3 additions & 0 deletions vegito/src/graph/graph_ops/add_edge.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ void process_add_edge(const StringViewList& cmd,
dst_fid != graph_store->get_local_pid()) {
seggraph::SegGraph* ov_graph = graph_store->get_ov_graph(dst_label);
auto ov_writer = ov_graph->create_graph_writer(write_epoch);
graph_store->add_total_edge_num_by_one();
#ifdef USE_MULTI_THREADS
auto outer_vertex_label_mutex =
graph_store->get_outer_vertex_label_mutex(dst_label);
Expand Down Expand Up @@ -185,6 +186,8 @@ void process_add_edge(const StringViewList& cmd,
vertex_t dst_lid =
graph_store->id_parser.GenerateId(0, dst_label, dst_offset);

graph_store->add_total_edge_num_by_one();

// inner edges
src_writer.put_edge(src_offset, elabel, seggraph::EOUT, dst_lid, edge_data);
dst_writer.put_edge(dst_offset, elabel, seggraph::EIN, src_lid, edge_data);
Expand Down
3 changes: 2 additions & 1 deletion vegito/src/graph/graph_ops/del_edge.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ void process_del_edge(const StringViewList& cmd,
dst_offset = max_outer_id_offset - dst_offset_reverse;
src_graph = graph_store->get_graph<seggraph::SegGraph>(src_label);
dst_graph = graph_store->get_ov_graph(dst_label);

graph_store->del_total_edge_num_by_one();
} else if (src_fid != graph_store->get_local_pid() &&
dst_fid == graph_store->get_local_pid()) {
src_offset_reverse = graph_store->get_lid(src_label, src_vid);
Expand All @@ -83,6 +83,7 @@ void process_del_edge(const StringViewList& cmd,
dst_offset_reverse = dst_offset;
src_graph = graph_store->get_graph<seggraph::SegGraph>(src_label);
dst_graph = graph_store->get_graph<seggraph::SegGraph>(dst_label);
graph_store->del_total_edge_num_by_one();
}

{
Expand Down
2 changes: 2 additions & 0 deletions vegito/src/graph/graph_ops/del_vertex.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ void process_del_vertex(const StringViewList& cmd,
graph_store->delete_inner(v_label,
v_offset); // delete vertex from vertex table
src_graph->add_deleted_inner_num(1);
graph_store->del_total_vertex_num_by_one();

// delete ralated edges
auto src_writer =
Expand Down Expand Up @@ -151,6 +152,7 @@ void process_del_vertex(const StringViewList& cmd,

src_writer.put_edge(v_offset, elabel, seggraph::EOUT, dst_loc,
edge_data);
graph_store->del_total_edge_num_by_one();

if (dst_offset < graph_store->get_vtable_max_inner(
dst_label)) { // dst is an inner vertex
Expand Down
6 changes: 6 additions & 0 deletions vegito/src/graph/graph_store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,10 @@ void GraphStore::put_blob_json_etcd(uint64_t write_epoch) {
std::time_t currentTime = std::time(nullptr);
// put the currentTime as a int64_t into blob schema
blob_schema["timestamp"] = currentTime;
blob_schema["total_vertex_num"] =
total_vertex_num_.load(std::memory_order_relaxed);
blob_schema["total_edge_num"] =
total_edge_num_.load(std::memory_order_relaxed);
blob_schema["string_buffer_object_id"] =
string_buffer_manager_.get_buffer_oid();
auto blob_schemas = fetch_blob_schema(write_epoch);
Expand Down Expand Up @@ -532,6 +536,8 @@ bool GraphStore::insert_inner_vertex(int epoch, uint64_t gid,
return false; // not in this partition
}

add_total_vertex_num_by_one();

#ifndef USE_GLOBAL_VERTEX_MAP
// local vertex map
if (external_id_dtype_[vlabel] == PropertyDataType::LONG) {
Expand Down
10 changes: 10 additions & 0 deletions vegito/src/graph/graph_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,14 @@ class GraphStore {
return max_memory_usage_[vlabel];
}

void add_total_vertex_num_by_one() { total_vertex_num_++; }

void add_total_edge_num_by_one() { total_edge_num_++; }

void del_total_vertex_num_by_one() { total_vertex_num_--; }

void del_total_edge_num_by_one() { total_edge_num_--; }

#ifdef USE_MULTI_THREADS
std::shared_timed_mutex* get_vertex_label_mutex(uint64_t vlabel) {
return vertex_label_mutexes_[vlabel];
Expand Down Expand Up @@ -533,6 +541,8 @@ class GraphStore {
const int local_pnum_; // number of partitions in the machine
const int total_partitions_; // total number of partitions
int total_vertex_label_num_;
std::atomic<size_t> total_vertex_num_{0};
std::atomic<size_t> total_edge_num_{0};

SparseArrayAllocator array_allocator_;

Expand Down

0 comments on commit 7533a49

Please sign in to comment.