Skip to content

Commit

Permalink
fix(interactive): Fix some bug of interactive admin service (#3938)
Browse files Browse the repository at this point in the history
Fix some bugs.
Fix #3927 
TODO: add a test that test bulk loading should fails.
  • Loading branch information
zhanglei1949 authored Jul 3, 2024
1 parent 70188a6 commit d5760e4
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 14 deletions.
2 changes: 1 addition & 1 deletion flex/bin/bulk_loader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ void signal_handler(int signal) {
<< ",Clearing directory: " << work_dir << ", exiting...";
// remove all files in work_dir
std::filesystem::remove_all(work_dir);
exit(0);
exit(signal);
} else {
LOG(ERROR) << "Received unexpected signal " << signal << ", exiting...";
exit(1);
Expand Down
4 changes: 3 additions & 1 deletion flex/engines/http_server/service/hqps_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,9 @@ void HQPSService::init(const ServiceConfig& config) {
}
}
if (config.start_compiler) {
start_compiler_subprocess();
if (!start_compiler_subprocess()) {
LOG(FATAL) << "Failed to start compiler subprocess! exiting...";
}
}
start_time_.store(gs::GetCurrentTimeStamp());
}
Expand Down
56 changes: 47 additions & 9 deletions flex/interactive/sdk/python/test/test_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def test_example(self):
self._graph_id = self.createGraph()
self.bulkLoading()
self.bulkLoadingUploading()
self.waitJobFinish()
self.bulkLoadingFailure()
self.list_graph()
self.runCypherQuery()
self.runGremlinQuery()
Expand Down Expand Up @@ -157,7 +157,8 @@ def bulkLoading(self):
)
resp = self._sess.bulk_loading(self._graph_id, schema_mapping)
assert resp.is_ok()
self._job_id = resp.get_value().job_id
job_id = resp.get_value().job_id
assert self.waitJobFinish(job_id)


def bulkLoadingUploading(self):
Expand Down Expand Up @@ -191,22 +192,59 @@ def bulkLoadingUploading(self):
)
resp = self._sess.bulk_loading(self._graph_id, schema_mapping)
assert resp.is_ok()
self._job_id = resp.get_value().job_id
job_id = resp.get_value().job_id
assert self.waitJobFinish(job_id)

def waitJobFinish(self):
assert self._job_id is not None
def waitJobFinish(self, job_id: str):
assert job_id is not None
while True:
resp = self._sess.get_job(self._job_id)
resp = self._sess.get_job(job_id)
assert resp.is_ok()
status = resp.get_value().status
print("job status: ", status)
if status == "SUCCESS":
break
return True
elif status == "FAILED":
raise Exception("job failed")
return False
else:
time.sleep(1)
print("job finished")

def bulkLoadingFailure(self):
"""
Submit a bulk loading job with invalid data, and expect the job to fail.
"""
assert os.environ.get("FLEX_DATA_DIR") is not None
person_csv_path = os.path.join(os.environ.get("FLEX_DATA_DIR"), "person.csv")
knows_csv_path = os.path.join(
os.environ.get("FLEX_DATA_DIR"), "person_knows_person.csv"
)
print("test bulk loading: ", self._graph_id)
schema_mapping = SchemaMapping(
loading_config=SchemaMappingLoadingConfig(
import_option="init",
format=SchemaMappingLoadingConfigFormat(type="csv"),
),
vertex_mappings=[
# Intentionally use the wrong file for the vertex mapping
VertexMapping(type_name="person", inputs=[knows_csv_path])
],
edge_mappings=[
EdgeMapping(
type_triplet=EdgeMappingTypeTriplet(
edge="knows",
source_vertex="person",
destination_vertex="person",
),
# Intentionally use the wrong file for the edge mapping
inputs=[person_csv_path],
)
],
)
resp = self._sess.bulk_loading(self._graph_id, schema_mapping)
assert resp.is_ok()
job_id = resp.get_value().job_id
# Expect to fail
assert self.waitJobFinish(job_id) == False

def list_graph(self):
resp = self._sess.list_graphs()
Expand Down
13 changes: 10 additions & 3 deletions flex/tests/interactive/test_call_proc.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,14 +104,21 @@ def callProcedureWithEncoder(self, graph_id : str):
print("call count_vertex_num failed: ", resp.get_status_message())
exit(1)

# plus_one, should be with id 3
# construct a byte array with bytes: the 4 bytes of integer 1, and a byte 3
byte_string = bytes([0,0,0,0,2]) # 4 bytes of integer 1, and a byte 3
# plus_one, should be with id 2
# construct a byte array with bytes: the 4 bytes of integer 1, and a byte 2
value = 1
byte_string = value.to_bytes(4, byteorder=sys.byteorder) + bytes([2])
# byte_string = bytes([1,0,0,0,2]) # 4 bytes of integer 1, and a byte 3
params = byte_string.decode('utf-8')
resp = self._sess.call_procedure_raw(graph_id, params)
if not resp.is_ok():
print("call plus_one failed: ", resp.get_status_message())
exit(1)
res = resp.get_value()
assert len(res) == 4
# the four byte represent a integer
res = int.from_bytes(res, byteorder=sys.byteorder)
assert(res == 2)

if __name__ == "__main__":
#parse command line args
Expand Down

0 comments on commit d5760e4

Please sign in to comment.