Skip to content

Commit

Permalink
fix(interactive): add runtime test tool (#4156)
Browse files Browse the repository at this point in the history
as titled.
  • Loading branch information
liulx20 authored Aug 20, 2024
1 parent b04f585 commit ae51397
Show file tree
Hide file tree
Showing 18 changed files with 484 additions and 43 deletions.
9 changes: 9 additions & 0 deletions flex/bin/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ add_executable(rt_admin rt_admin.cc)
target_link_libraries(rt_admin flex_utils flex_rt_mutable_graph flex_graph_db)
install_without_export_flex_target(rt_admin)


add_executable(adhoc_runner adhoc_runner.cc)
target_link_libraries(adhoc_runner flex_utils flex_graph_db)
install_without_export_flex_target(adhoc_runner)

add_executable(flex_analytical_engine flex_analytical_engine.cc)
target_link_libraries(flex_analytical_engine flex_immutable_graph flex_bsp ${GLOG_LIBRARIES} ${GFLAGS_LIBRARIES})
install_without_export_flex_target(flex_analytical_engine)
Expand All @@ -36,3 +41,7 @@ include_directories(${Boost_INCLUDE_DIRS})
add_executable(bulk_loader bulk_loader.cc)
target_link_libraries(bulk_loader flex_rt_mutable_graph flex_utils ${GLOG_LIBRARIES} ${GFLAGS_LIBRARIES} ${Boost_LIBRARIES})
install_without_export_flex_target(bulk_loader)

add_executable(stored_procedure_runner stored_procedure_runner.cc)
target_link_libraries(stored_procedure_runner flex_rt_mutable_graph flex_utils flex_graph_db ${GLOG_LIBRARIES} ${GFLAGS_LIBRARIES} ${Boost_LIBRARIES})
install_without_export_flex_target(stored_procedure_runner)
207 changes: 207 additions & 0 deletions flex/bin/adhoc_runner.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
/** Copyright 2020 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "grape/util.h"

#include <boost/program_options.hpp>
#include <fstream>
#include <iostream>
#include <vector>
#include "flex/engines/graph_db/database/graph_db.h"
#include "flex/engines/graph_db/runtime/adhoc/runtime.h"

namespace bpo = boost::program_options;

std::string read_pb(const std::string& filename) {
std::ifstream file(filename, std::ios::binary);

if (!file.is_open()) {
LOG(FATAL) << "open pb file: " << filename << " failed...";
return "";
}

file.seekg(0, std::ios::end);
size_t size = file.tellg();
file.seekg(0, std::ios::beg);

std::string buffer;
buffer.resize(size);

file.read(&buffer[0], size);

file.close();

return buffer;
}

void load_params(const std::string& filename,
std::vector<std::map<std::string, std::string>>& map) {
std::ifstream in(filename);
if (!in.is_open()) {
LOG(FATAL) << "open params file: " << filename << " failed...";
return;
}
std::string line;
std::vector<std::string> keys;
std::getline(in, line);
std::stringstream ss(line);
std::string key;
while (std::getline(ss, key, '|')) {
keys.push_back(key);
LOG(INFO) << key;
}
while (std::getline(in, line)) {
std::map<std::string, std::string> m;
std::stringstream ss(line);
std::string value;
for (auto& key : keys) {
std::getline(ss, value, '|');
m[key] = value;
}
map.push_back(m);
}
}

int main(int argc, char** argv) {
bpo::options_description desc("Usage:");
desc.add_options()("help", "Display help message")(
"version,v", "Display version")("shard-num,s",
bpo::value<uint32_t>()->default_value(1),
"shard number of actor system")(
"data-path,d", bpo::value<std::string>(), "data directory path")(
"graph-config,g", bpo::value<std::string>(), "graph schema config file")(
"query-file,q", bpo::value<std::string>(), "query file")(
"params_file,p", bpo::value<std::string>(), "params file")(
"query-num,n", bpo::value<int>()->default_value(0))(
"output-file,o", bpo::value<std::string>(), "output file");

google::InitGoogleLogging(argv[0]);
FLAGS_logtostderr = true;

bpo::variables_map vm;
bpo::store(bpo::command_line_parser(argc, argv).options(desc).run(), vm);
bpo::notify(vm);

if (vm.count("help")) {
std::cout << desc << std::endl;
return 0;
}
if (vm.count("version")) {
std::cout << "GraphScope/Flex version " << FLEX_VERSION << std::endl;
return 0;
}

uint32_t shard_num = vm["shard-num"].as<uint32_t>();

std::string graph_schema_path = "";
std::string data_path = "";
std::string output_path = "";
int query_num = vm["query-num"].as<int>();

if (!vm.count("graph-config")) {
LOG(ERROR) << "graph-config is required";
return -1;
}
graph_schema_path = vm["graph-config"].as<std::string>();
if (!vm.count("data-path")) {
LOG(ERROR) << "data-path is required";
return -1;
}
data_path = vm["data-path"].as<std::string>();
if (vm.count("output-file")) {
output_path = vm["output-file"].as<std::string>();
}

setenv("TZ", "Asia/Shanghai", 1);
tzset();

double t0 = -grape::GetCurrentTime();
auto& db = gs::GraphDB::get();

auto schema = gs::Schema::LoadFromYaml(graph_schema_path);
if (!schema.ok()) {
LOG(ERROR) << "Failed to load graph schema from " << graph_schema_path;
return -1;
}
db.Open(schema.value(), data_path, shard_num);

t0 += grape::GetCurrentTime();

LOG(INFO) << "Finished loading graph, elapsed " << t0 << " s";
std::string req_file = vm["query-file"].as<std::string>();
std::string query = read_pb(req_file);
auto txn = db.GetReadTransaction();
std::vector<std::map<std::string, std::string>> map;
load_params(vm["params_file"].as<std::string>(), map);
size_t params_num = map.size();

physical::PhysicalPlan pb;
pb.ParseFromString(query);

if (query_num == 0) {
query_num = params_num;
}
std::vector<std::vector<char>> outputs(query_num);

double t1 = -grape::GetCurrentTime();
for (int i = 0; i < query_num; ++i) {
auto& m = map[i % params_num];
auto ctx = gs::runtime::runtime_eval(pb, txn, m);
gs::Encoder output(outputs[i]);
gs::runtime::eval_sink(ctx, txn, output);
}
t1 += grape::GetCurrentTime();

double t2 = -grape::GetCurrentTime();
for (int i = 0; i < query_num; ++i) {
auto& m = map[i % params_num];
auto ctx = gs::runtime::runtime_eval(pb, txn, m);
outputs[i].clear();
gs::Encoder output(outputs[i]);
gs::runtime::eval_sink(ctx, txn, output);
}
t2 += grape::GetCurrentTime();

double t3 = -grape::GetCurrentTime();
for (int i = 0; i < query_num; ++i) {
auto& m = map[i % params_num];
auto ctx = gs::runtime::runtime_eval(pb, txn, m);
outputs[i].clear();
gs::Encoder output(outputs[i]);
gs::runtime::eval_sink(ctx, txn, output);
}
t3 += grape::GetCurrentTime();

LOG(INFO) << "Finished run " << query_num << " queries, elapsed " << t1
<< " s, avg " << t1 / static_cast<double>(query_num) * 1000000
<< " us";
LOG(INFO) << "Finished run " << query_num << " queries, elapsed " << t2
<< " s, avg " << t2 / static_cast<double>(query_num) * 1000000
<< " us";
LOG(INFO) << "Finished run " << query_num << " queries, elapsed " << t3
<< " s, avg " << t3 / static_cast<double>(query_num) * 1000000
<< " us";

if (!output_path.empty()) {
FILE* fout = fopen(output_path.c_str(), "a");
for (auto& output : outputs) {
fwrite(output.data(), sizeof(char), output.size(), fout);
}
fflush(fout);
fclose(fout);
}

return 0;
}
Loading

0 comments on commit ae51397

Please sign in to comment.