diff --git a/apps/rdbms/insert_db_txn.py b/apps/rdbms/insert_db_txn.py index 2f173e0..6a12158 100755 --- a/apps/rdbms/insert_db_txn.py +++ b/apps/rdbms/insert_db_txn.py @@ -510,17 +510,16 @@ def process_forum(line): insert_simple_edges("22", "/forum_hasModerator_person_0_0.csv", "forum_hasmoderator") insert_simple_edges("23", "/forum_hasTag_tag_0_0.csv", "forum_hastag") - -if True: + insert_simple_edges("24", "/person_hasInterest_tag_0_0.csv", "person_hasinterest") insert_simple_edges("25", "/person_isLocatedIn_place_0_0.csv", "person_islocationin") # insert edge tables with additional properties - insert_prop_edges("26", "/forum_hasMember_person_0_0.csv", "forum_hasmember") +if True: insert_prop_edges("27", "/person_knows_person_0_0.csv", "knows") insert_prop_edges("28", "/person_likes_comment_0_0.csv", "likes_comment") diff --git a/coordinator/flex/server/controllers/data_source_controller.py b/coordinator/flex/server/controllers/data_source_controller.py index dffd502..d871056 100644 --- a/coordinator/flex/server/controllers/data_source_controller.py +++ b/coordinator/flex/server/controllers/data_source_controller.py @@ -43,6 +43,7 @@ def bind_datasource_in_batch(graph_id, schema_mapping): # noqa: E501 ) return (response.text, response.status_code) + def get_datasource_by_id(graph_id): # noqa: E501 """get_datasource_by_id diff --git a/coordinator/flex/server/controllers/graph_controller.py b/coordinator/flex/server/controllers/graph_controller.py index becd511..49c603b 100644 --- a/coordinator/flex/server/controllers/graph_controller.py +++ b/coordinator/flex/server/controllers/graph_controller.py @@ -385,6 +385,29 @@ def delete_vertex_type_by_name(graph_id, type_name): # noqa: E501 return 'do some magic!' +def get_current_chosen_version(graph_id): # noqa: E501 + """get_current_chosen_version + + Get the current chosen version of the graph # noqa: E501 + + :param graph_id: + :type graph_id: str + + :rtype: Union[GetGraphVersionResponse, Tuple[GetGraphVersionResponse, int], Tuple[GetGraphVersionResponse, int, Dict[str, str]] + """ + if graph_id != GRAPH_ID: + return (f"Graph {graph_id} does not exist...", 500) + + gart_controller_server = os.getenv("GART_CONTROLLER_SERVER", "127.0.0.1:8080") + if not gart_controller_server.startswith(("http://", "https://")): + gart_controller_server = f"http://{gart_controller_server}" + + response = requests.get(f"{gart_controller_server}/get-graph-current-version") + if response.status_code != 200: + return (response.text, response.status_code) + + return (GetGraphVersionResponse.from_dict(response.json()), 200) + def get_graph_all_available_versions(graph_id): # noqa: E501 """get_graph_all_available_versions diff --git a/coordinator/flex/server/controllers/job_controller.py b/coordinator/flex/server/controllers/job_controller.py index 4f46e9c..d674020 100644 --- a/coordinator/flex/server/controllers/job_controller.py +++ b/coordinator/flex/server/controllers/job_controller.py @@ -87,8 +87,8 @@ def get_job_by_id(job_id): # noqa: E501 return (JobStatus.from_dict(result_dict), 200) return (JobStatus.from_dict(result_dict), 200) - - + + def list_jobs(): # noqa: E501 """list_jobs diff --git a/coordinator/flex/server/openapi/openapi.yaml b/coordinator/flex/server/openapi/openapi.yaml index de42871..f95a980 100644 --- a/coordinator/flex/server/openapi/openapi.yaml +++ b/coordinator/flex/server/openapi/openapi.yaml @@ -1897,6 +1897,34 @@ paths: tags: - Graph x-openapi-router-controller: flex.server.controllers.graph_controller + /api/v1/graph/{graph_id}/version/current-version: + get: + description: Get the current chosen version of the graph + operationId: get_current_chosen_version + parameters: + - explode: false + in: path + name: graph_id + required: true + schema: + type: string + style: simple + responses: + "200": + content: + application/json: + schema: + $ref: '#/components/schemas/GetGraphVersionResponse' + description: Successfully returned the current chosen versions of the graph + "500": + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + description: Server error + tags: + - Graph + x-openapi-router-controller: flex.server.controllers.graph_controller /api/v1/graph/{graph_id}/version/{timestamp}: get: description: Get the latest version by providing a timestamp diff --git a/coordinator/flex/server/test/test_graph_controller.py b/coordinator/flex/server/test/test_graph_controller.py index 0eedb73..54557de 100644 --- a/coordinator/flex/server/test/test_graph_controller.py +++ b/coordinator/flex/server/test/test_graph_controller.py @@ -163,6 +163,21 @@ def test_delete_vertex_type_by_name(self): self.assert200(response, 'Response body is : ' + response.data.decode('utf-8')) + def test_get_current_chosen_version(self): + """Test case for get_current_chosen_version + + + """ + headers = { + 'Accept': 'application/json', + } + response = self.client.open( + '/api/v1/graph/{graph_id}/version/current-version'.format(graph_id='graph_id_example'), + method='GET', + headers=headers) + self.assert200(response, + 'Response body is : ' + response.data.decode('utf-8')) + def test_get_graph_all_available_versions(self): """Test case for get_graph_all_available_versions diff --git a/scripts/controller.py b/scripts/controller.py index 3336f8b..8321acb 100755 --- a/scripts/controller.py +++ b/scripts/controller.py @@ -387,6 +387,28 @@ def get_read_epoch_by_timestamp(): return json.dumps({}), 200 +@app.route("/get-graph-current-version", methods=["GET"]) +def get_graph_current_version(): + if previous_read_epoch is None: + return json.dumps({}), 200 + all_epochs = get_all_available_read_epochs_internal()[0] + result = {} + for idx in range(len(all_epochs)): + if all_epochs[idx][0] == int(previous_read_epoch): + begin_time = all_epochs[idx][1] + end_time = all_epochs[idx][2] + num_vertices = all_epochs[idx][3] + num_edges = all_epochs[idx][4] + result = { + "version_id": str(previous_read_epoch), + "begin_time": begin_time, + "end_time": end_time, + "num_vertices": num_vertices, + "num_edges": num_edges, + } + return json.dumps(result), 200 + + @app.route("/run-gae-task", methods=["POST"]) def run_gae_task(): command = "" @@ -426,7 +448,6 @@ def change_read_epoch(): return "No available read epoch", 400 global previous_read_epoch if previous_read_epoch is None or previous_read_epoch != read_epoch: - previous_read_epoch = read_epoch etcd_server = os.getenv("ETCD_SERVICE", "etcd") if not etcd_server.startswith(("http://", "https://")): etcd_server = f"http://{etcd_server}" @@ -488,6 +509,8 @@ def change_read_epoch(): ): break time.sleep(0.5) + # make sure change read epoch successfully + previous_read_epoch = read_epoch return f"Read version changed to version {read_epoch}", 200 @@ -600,6 +623,8 @@ def get_all_available_read_epochs_internal(): schema_str, _ = etcd_client.get(schema_key) schema = json.loads(schema_str) unix_timestamp = schema["timestamp"] + num_vertices += schema["total_vertex_num"] + num_edges += schema["total_edge_num"] if latest_timestamp is None or unix_timestamp > latest_timestamp: latest_timestamp = unix_timestamp converted_time = datetime.fromtimestamp(latest_timestamp) diff --git a/vegito/src/graph/graph_ops/add_edge.cc b/vegito/src/graph/graph_ops/add_edge.cc index 48982b8..07de009 100644 --- a/vegito/src/graph/graph_ops/add_edge.cc +++ b/vegito/src/graph/graph_ops/add_edge.cc @@ -68,6 +68,7 @@ void process_add_edge(const StringViewList& cmd, dst_fid != graph_store->get_local_pid()) { seggraph::SegGraph* ov_graph = graph_store->get_ov_graph(dst_label); auto ov_writer = ov_graph->create_graph_writer(write_epoch); + graph_store->add_total_edge_num_by_one(); #ifdef USE_MULTI_THREADS auto outer_vertex_label_mutex = graph_store->get_outer_vertex_label_mutex(dst_label); @@ -185,6 +186,8 @@ void process_add_edge(const StringViewList& cmd, vertex_t dst_lid = graph_store->id_parser.GenerateId(0, dst_label, dst_offset); + graph_store->add_total_edge_num_by_one(); + // inner edges src_writer.put_edge(src_offset, elabel, seggraph::EOUT, dst_lid, edge_data); dst_writer.put_edge(dst_offset, elabel, seggraph::EIN, src_lid, edge_data); diff --git a/vegito/src/graph/graph_ops/del_edge.cc b/vegito/src/graph/graph_ops/del_edge.cc index 061ba89..b74c28e 100644 --- a/vegito/src/graph/graph_ops/del_edge.cc +++ b/vegito/src/graph/graph_ops/del_edge.cc @@ -65,7 +65,7 @@ void process_del_edge(const StringViewList& cmd, dst_offset = max_outer_id_offset - dst_offset_reverse; src_graph = graph_store->get_graph(src_label); dst_graph = graph_store->get_ov_graph(dst_label); - + graph_store->del_total_edge_num_by_one(); } else if (src_fid != graph_store->get_local_pid() && dst_fid == graph_store->get_local_pid()) { src_offset_reverse = graph_store->get_lid(src_label, src_vid); @@ -83,6 +83,7 @@ void process_del_edge(const StringViewList& cmd, dst_offset_reverse = dst_offset; src_graph = graph_store->get_graph(src_label); dst_graph = graph_store->get_graph(dst_label); + graph_store->del_total_edge_num_by_one(); } { diff --git a/vegito/src/graph/graph_ops/del_vertex.cc b/vegito/src/graph/graph_ops/del_vertex.cc index c1b998a..3fd8e77 100644 --- a/vegito/src/graph/graph_ops/del_vertex.cc +++ b/vegito/src/graph/graph_ops/del_vertex.cc @@ -41,6 +41,7 @@ void process_del_vertex(const StringViewList& cmd, graph_store->delete_inner(v_label, v_offset); // delete vertex from vertex table src_graph->add_deleted_inner_num(1); + graph_store->del_total_vertex_num_by_one(); // delete ralated edges auto src_writer = @@ -151,6 +152,7 @@ void process_del_vertex(const StringViewList& cmd, src_writer.put_edge(v_offset, elabel, seggraph::EOUT, dst_loc, edge_data); + graph_store->del_total_edge_num_by_one(); if (dst_offset < graph_store->get_vtable_max_inner( dst_label)) { // dst is an inner vertex diff --git a/vegito/src/graph/graph_store.cc b/vegito/src/graph/graph_store.cc index f74fb91..5798bc0 100644 --- a/vegito/src/graph/graph_store.cc +++ b/vegito/src/graph/graph_store.cc @@ -490,6 +490,10 @@ void GraphStore::put_blob_json_etcd(uint64_t write_epoch) { std::time_t currentTime = std::time(nullptr); // put the currentTime as a int64_t into blob schema blob_schema["timestamp"] = currentTime; + blob_schema["total_vertex_num"] = + total_vertex_num_.load(std::memory_order_relaxed); + blob_schema["total_edge_num"] = + total_edge_num_.load(std::memory_order_relaxed); blob_schema["string_buffer_object_id"] = string_buffer_manager_.get_buffer_oid(); auto blob_schemas = fetch_blob_schema(write_epoch); @@ -532,6 +536,8 @@ bool GraphStore::insert_inner_vertex(int epoch, uint64_t gid, return false; // not in this partition } + add_total_vertex_num_by_one(); + #ifndef USE_GLOBAL_VERTEX_MAP // local vertex map if (external_id_dtype_[vlabel] == PropertyDataType::LONG) { diff --git a/vegito/src/graph/graph_store.h b/vegito/src/graph/graph_store.h index 5bdf0b4..49c0193 100644 --- a/vegito/src/graph/graph_store.h +++ b/vegito/src/graph/graph_store.h @@ -497,6 +497,14 @@ class GraphStore { return max_memory_usage_[vlabel]; } + void add_total_vertex_num_by_one() { total_vertex_num_++; } + + void add_total_edge_num_by_one() { total_edge_num_++; } + + void del_total_vertex_num_by_one() { total_vertex_num_--; } + + void del_total_edge_num_by_one() { total_edge_num_--; } + #ifdef USE_MULTI_THREADS std::shared_timed_mutex* get_vertex_label_mutex(uint64_t vlabel) { return vertex_label_mutexes_[vlabel]; @@ -533,6 +541,8 @@ class GraphStore { const int local_pnum_; // number of partitions in the machine const int total_partitions_; // total number of partitions int total_vertex_label_num_; + std::atomic total_vertex_num_{0}; + std::atomic total_edge_num_{0}; SparseArrayAllocator array_allocator_;