Skip to content

Commit

Permalink
refactor(interactive): Add test to ensure the robustness of Interacti…
Browse files Browse the repository at this point in the history
…ve (#4078)

Add a unit test to test the robustness of `InteractiveServer`.

Still in progress, need to add more test cases.


Fix #4070 #4034 #4010
  • Loading branch information
zhanglei1949 authored Aug 6, 2024
1 parent 34f25ea commit f2215cf
Show file tree
Hide file tree
Showing 14 changed files with 834 additions and 27 deletions.
11 changes: 9 additions & 2 deletions .github/workflows/interactive.yml
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ jobs:
# build compiler
cd ${GIE_HOME}/
mvn clean install -Pexperimental -DskipTests
mvn clean install -Pexperimental -DskipTests -q
- name: Prepare dataset and workspace
env:
Expand Down Expand Up @@ -166,6 +166,13 @@ jobs:
bash hqps_sdk_test.sh ${TMP_INTERACTIVE_WORKSPACE} ./engine_config_test.yaml python
sed -i 's/temp_workspace/interactive_workspace/g' ./engine_config_test.yaml
sed -i 's/thread_num_per_worker: 4/thread_num_per_worker: 1/g' ./engine_config_test.yaml
- name: Robustness test
env:
INTERACTIVE_WORKSPACE: /tmp/interactive_workspace
run: |
cd ${GITHUB_WORKSPACE}/flex/tests/hqps
bash hqps_robust_test.sh ${INTERACTIVE_WORKSPACE} ./engine_config_test.yaml
- name: Sample Query test
env:
Expand Down Expand Up @@ -305,7 +312,7 @@ jobs:
sdk_version=$(grep -oPm1 "(?<=<version>)[^<]+" ${GITHUB_WORKSPACE}/flex/interactive/sdk/java/pom.xml)
sed -i "s/<interactive.sdk.version>.*<\/interactive.sdk.version>/<interactive.sdk.version>${sdk_version}<\/interactive.sdk.version>/" ${GITHUB_WORKSPACE}/interactive_engine/pom.xml
cd ${GITHUB_WORKSPACE}/interactive_engine/
mvn clean install -Pexperimental -DskipTests
mvn clean install -Pexperimental -DskipTests -q
- name: Run End-to-End cypher adhoc ldbc query test
env:
Expand Down
19 changes: 12 additions & 7 deletions flex/engines/hqps_db/core/operator/sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,8 @@ void template_set_value(common::Value* value, T v) {
}

template <typename T,
typename std::enable_if<
(std::is_same_v<T, uint64_t>) &&
(!std::is_same_v<uint64_t, unsigned long>)>::type* = nullptr>
typename std::enable_if<(std::is_same_v<T, uint64_t>) &&(
!std::is_same_v<uint64_t, unsigned long>)>::type* = nullptr>
void template_set_value(common::Value* value, T v) {
value->set_i64(v);
}
Expand Down Expand Up @@ -429,8 +428,12 @@ class SinkOp {
// get all property for two labels vertex
auto& schema = graph.schema();
std::array<std::vector<std::string>, 2> prop_names;
prop_names[0] = schema.get_vertex_property_names(labels[0]);
prop_names[1] = schema.get_vertex_property_names(labels[1]);
if (labels[0] < schema.vertex_label_num()) {
prop_names[0] = schema.get_vertex_property_names(labels[0]);
}
if (labels[1] < schema.vertex_label_num()) {
prop_names[1] = schema.get_vertex_property_names(labels[1]);
}
// get all properties
std::array<std::vector<std::shared_ptr<RefColumnBase>>, 2> column_ptrs;
for (size_t i = 0; i < prop_names[0].size(); ++i) {
Expand Down Expand Up @@ -479,7 +482,7 @@ class SinkOp {
for (size_t i : repeat_offsets) {
num_rows += i;
}
CHECK(num_rows == results_vec.results_size())
CHECK((int32_t) num_rows == results_vec.results_size())
<< num_rows << " " << results_vec.results_size();
}
size_t cur_ind = 0;
Expand Down Expand Up @@ -526,7 +529,9 @@ class SinkOp {
results::CollectiveResults& results_vec,
const std::vector<size_t>& repeat_offsets, int32_t tag_id) {
auto& schema = graph.schema();
auto prop_names = schema.get_vertex_property_names(label);
auto prop_names = label < schema.vertex_label_num()
? schema.get_vertex_property_names(label)
: std::vector<std::string>();
// get all properties
std::vector<std::shared_ptr<RefColumnBase>> column_ptrs;
for (size_t i = 0; i < prop_names.size(); ++i) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ class UnTypedEdgeSet {
std::vector<Direction> get_directions() const {
std::vector<Direction> res;
auto edge_triplet = get_edge_triplets();
for (auto src_label_ind = 0; src_label_ind < src_labels_.size();
for (size_t src_label_ind = 0; src_label_ind < src_labels_.size();
++src_label_ind) {
auto src_label = src_labels_[src_label_ind];
std::vector<std::tuple<LabelT, LabelT, LabelT, Direction>> tmp;
Expand Down Expand Up @@ -627,14 +627,13 @@ class UnTypedEdgeSet {
auto src_vid = src_vertices_[i];
auto& cur_edge_iters = edge_iters[i];
auto src_label_ind = label_indices_[i];
auto src_label = src_labels_[src_label_ind];

for (size_t j = 0; j < cur_edge_iters.size(); ++j) {
auto& cur_iter = cur_edge_iters[j];
while (cur_iter.IsValid()) {
auto dst_vid = cur_iter.GetDstId();
auto data = cur_iter.GetData();
for (auto k = 0; k < repeat_array[cur_ind]; ++k) {
for (size_t k = 0; k < repeat_array[cur_ind]; ++k) {
dst_eles.emplace_back(std::make_tuple(src_vid, dst_vid, data));
label_triplet_indices.emplace_back(sizes[src_label_ind] + j);
}
Expand Down Expand Up @@ -716,7 +715,7 @@ class UnTypedEdgeSet {
std::vector<std::vector<std::tuple<LabelT, LabelT, LabelT>>>
get_edge_triplets() const {
std::vector<std::vector<std::tuple<LabelT, LabelT, LabelT>>> ret;
for (auto src_label_ind = 0; src_label_ind < src_labels_.size();
for (size_t src_label_ind = 0; src_label_ind < src_labels_.size();
++src_label_ind) {
auto src_label = src_labels_[src_label_ind];
std::vector<std::tuple<LabelT, LabelT, LabelT>> tmp;
Expand Down
8 changes: 6 additions & 2 deletions flex/engines/http_server/workdir_manipulator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
#include "flex/engines/http_server/workdir_manipulator.h"
#include "flex/engines/http_server/codegen_proxy.h"

#include <boost/uuid/uuid.hpp> // uuid class
#include <boost/uuid/uuid_generators.hpp> // generators
#include <boost/uuid/uuid_io.hpp> // streaming operators etc.

// Write a macro to define the function, to check whether a filed presents in a
// json object.
#define CHECK_JSON_FIELD(json, field) \
Expand Down Expand Up @@ -668,8 +672,8 @@ gs::Result<std::string> WorkDirManipulator::CreateFile(
}

// get the timestamp as the file name
auto time_stamp = std::to_string(gs::GetCurrentTimeStamp());
auto file_name = GetUploadDir() + "/" + time_stamp;
boost::uuids::uuid uuid = boost::uuids::random_generator()();
auto file_name = GetUploadDir() + "/" + boost::uuids::to_string(uuid);
std::ofstream fout(file_name);
if (!fout.is_open()) {
return {gs::Status(gs::StatusCode::PermissionError, "Fail to open file")};
Expand Down
12 changes: 11 additions & 1 deletion flex/interactive/sdk/python/gs_interactive/client/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,14 @@ def __init__(
self._gremlin_endpoint = gremlin_endpoint
self._session = None
self.init_host_and_port()
self._neo4j_driver = None

def close(self):
if self._neo4j_driver is not None:
self._neo4j_driver.close()

def __del(self):
self.close()

def init_host_and_port(self):
# prepend http:// to self._admin_endpoint
Expand Down Expand Up @@ -118,7 +126,9 @@ def get_port(self) -> int:
def getNeo4jSessionImpl(self, **config) -> Neo4jSession:
if self._cypher_endpoint is None:
self._cypher_endpoint = self.getNeo4jEndpoint()
return GraphDatabase.driver(self._cypher_endpoint, auth=None).session(**config)
if self._neo4j_driver is None:
self._neo4j_driver = GraphDatabase.driver(self._cypher_endpoint, auth=None)
return self._neo4j_driver.session(**config)

def getNeo4jEndpoint(self) -> str:
"""
Expand Down
18 changes: 18 additions & 0 deletions flex/interactive/sdk/python/gs_interactive/tests/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright 2020 Alibaba Group Holding Limited. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

Loading

0 comments on commit f2215cf

Please sign in to comment.