Skip to content

Commit

Permalink
Merge branch 'main' into develop
Browse files Browse the repository at this point in the history
  • Loading branch information
zhanglei1949 authored Aug 14, 2024
2 parents 4d1ae93 + e69a05a commit 09a6231
Show file tree
Hide file tree
Showing 3 changed files with 156 additions and 56 deletions.
60 changes: 45 additions & 15 deletions flex/engines/http_server/actor/admin_actor.act.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,12 @@ std::string merge_graph_and_plugin_meta(

nlohmann::json res;
for (auto& graph_meta : res_graph_metas) {
res.push_back(nlohmann::json::parse(graph_meta.ToJson()));
try {
res.push_back(nlohmann::json::parse(graph_meta.ToJson()));
} catch (const std::exception& e) {
LOG(ERROR) << "Fail to parse graph meta: " << e.what()
<< graph_meta.ToJson();
}
}
return res.empty() ? "{}" : res.dump();
}
Expand Down Expand Up @@ -348,20 +353,38 @@ gs::Status invoke_delete_plugin_meta(

// util functions

std::string to_json_str(const std::vector<gs::PluginMeta>& plugin_metas) {
nlohmann::json res;
for (auto& plugin_meta : plugin_metas) {
res.push_back(nlohmann::json::parse(plugin_meta.ToJson()));
gs::Result<seastar::sstring> to_json_str(
const std::vector<gs::PluginMeta>& plugin_metas) {
try {
nlohmann::json res;
for (auto& plugin_meta : plugin_metas) {
res.push_back(nlohmann::json::parse(plugin_meta.ToJson()));
}
return res.empty() ? gs::Result<seastar::sstring>("{}")
: gs::Result<seastar::sstring>(res.dump());
} catch (const std::exception& e) {
LOG(ERROR) << "Fail to parse plugin meta from json string: " << e.what();
return gs::Result<seastar::sstring>(
gs::Status(gs::StatusCode::InternalError,
"Fail to parse plugin meta: " + std::string(e.what())));
}
return res.empty() ? "{}" : res.dump();
}

std::string to_json_str(const std::vector<gs::JobMeta>& job_metas) {
nlohmann::json res;
for (auto& job_meta : job_metas) {
res.push_back(nlohmann::json::parse(job_meta.ToJson(true)));
gs::Result<seastar::sstring> to_json_str(
const std::vector<gs::JobMeta>& job_metas) {
try {
nlohmann::json res;
for (auto& job_meta : job_metas) {
res.push_back(nlohmann::json::parse(job_meta.ToJson()));
}
return res.empty() ? gs::Result<seastar::sstring>("{}")
: gs::Result<seastar::sstring>(res.dump());
} catch (const std::exception& e) {
LOG(ERROR) << "Fail to parse job meta from json string: " << e.what();
return gs::Result<seastar::sstring>(
gs::Status(gs::StatusCode::InternalError,
"Fail to parse job meta: " + std::string(e.what())));
}
return res.empty() ? "{}" : res.dump();
}

admin_actor::~admin_actor() {
Expand Down Expand Up @@ -711,7 +734,7 @@ seastar::future<admin_query_result> admin_actor::get_procedures_by_graph_name(
graph_meta_res.value().plugin_metas.begin(),
graph_meta_res.value().plugin_metas.end());
return seastar::make_ready_future<admin_query_result>(
gs::Result<seastar::sstring>(to_json_str(all_plugin_metas)));
to_json_str(all_plugin_metas));
} else {
LOG(ERROR) << "Fail to get all procedures: "
<< get_all_procedure_res.status().error_message();
Expand Down Expand Up @@ -1146,7 +1169,15 @@ seastar::future<admin_query_result> admin_actor::service_status(
graph_meta.plugin_metas.emplace_back(plugin_meta);
}
}
res["graph"] = nlohmann::json::parse(graph_meta.ToJson());
try {
res["graph"] = nlohmann::json::parse(graph_meta.ToJson());
} catch (std::exception& e) {
LOG(ERROR) << "Fail to parse graph meta: " << e.what();
return seastar::make_exception_future<admin_query_result>(
gs::Status(
gs::StatusCode::InternalError,
"Fail to parse graph meta: " + std::string(e.what())));
}
} else {
LOG(ERROR) << "Fail to get all procedures: "
<< get_all_procedure_res.status().error_message();
Expand Down Expand Up @@ -1222,9 +1253,8 @@ seastar::future<admin_query_result> admin_actor::list_jobs(
auto list_res = metadata_store_->GetAllJobMeta();
if (list_res.ok()) {
VLOG(10) << "Successfully list jobs";
auto list_job_metas_str = to_json_str(list_res.value());
return seastar::make_ready_future<admin_query_result>(
gs::Result<seastar::sstring>(std::move(list_job_metas_str)));
to_json_str(list_res.value()));
} else {
LOG(ERROR) << "Fail to list jobs: " << list_res.status().error_message();
return seastar::make_ready_future<admin_query_result>(list_res.status());
Expand Down
13 changes: 8 additions & 5 deletions flex/storages/metadata/default_graph_meta_store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,9 @@ Result<bool> DefaultGraphMetaStore::UpdateGraphMeta(
json = nlohmann::json::parse(old_meta);
} catch (const std::exception& e) {
LOG(ERROR) << "Fail to parse old graph meta:" << e.what();
return Result<std::string>(Status(StatusCode::InternalError,
"Fail to parse old graph meta"));
return Result<std::string>(
Status(StatusCode::InternalError,
std::string("Fail to parse old graph meta: ") + e.what()));
}
auto graph_meta = GraphMeta::FromJson(json);
if (request.graph_name.has_value()) {
Expand Down Expand Up @@ -197,8 +198,9 @@ Result<bool> DefaultGraphMetaStore::UpdatePluginMeta(
json = nlohmann::json::parse(old_meta);
} catch (const std::exception& e) {
LOG(ERROR) << "Fail to parse old plugin meta:" << e.what();
return Result<std::string>(Status(StatusCode::InternalError,
"Fail to parse old plugin meta"));
return Result<std::string>(Status(
StatusCode::InternalError,
std::string("Fail to parse old plugin meta: ") + e.what()));
}
auto plugin_meta = PluginMeta::FromJson(json);
if (plugin_meta.bound_graph != graph_id) {
Expand Down Expand Up @@ -288,7 +290,8 @@ Result<bool> DefaultGraphMetaStore::UpdateJobMeta(
} catch (const std::exception& e) {
LOG(ERROR) << "Fail to parse old job meta:" << e.what();
return Result<std::string>(
Status(StatusCode::InternalError, "Fail to parse old job meta"));
Status(StatusCode::InternalError,
std::string("Fail to parse old job meta: ") + e.what()));
}
auto job_meta = JobMeta::FromJson(json);
if (update_request.status.has_value()) {
Expand Down
139 changes: 103 additions & 36 deletions flex/storages/metadata/graph_meta_store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,21 +65,40 @@ std::string GraphMeta::ToJson() const {
json["creation_time"] = creation_time;
json["data_update_time"] = data_update_time;
if (!data_import_config.empty()) {
json["data_import_config"] = nlohmann::json::parse(data_import_config);
try {
json["data_import_config"] = nlohmann::json::parse(data_import_config);
} catch (const std::exception& e) {
LOG(ERROR) << "Invalid data_import_config: " << data_import_config << " "
<< e.what();
}
}
try {
json["schema"] = nlohmann::json::parse(schema);
} catch (const std::exception& e) {
LOG(ERROR) << "Invalid schema: " << schema << " " << e.what();
}
json["schema"] = nlohmann::json::parse(schema);
json["stored_procedures"] = nlohmann::json::array();
for (auto& plugin_meta : plugin_metas) {
json["stored_procedures"].push_back(
nlohmann::json::parse(plugin_meta.ToJson()));
try {
json["stored_procedures"].push_back(
nlohmann::json::parse(plugin_meta.ToJson()));
} catch (const std::exception& e) {
LOG(ERROR) << "Invalid plugin_meta: " << plugin_meta.ToJson() << " "
<< e.what();
}
}
json["store_type"] = store_type;
return json.dump();
}

GraphMeta GraphMeta::FromJson(const std::string& json_str) {
auto j = nlohmann::json::parse(json_str);
return GraphMeta::FromJson(j);
try {
auto j = nlohmann::json::parse(json_str);
return GraphMeta::FromJson(j);
} catch (const std::exception& e) {
LOG(ERROR) << "Invalid json string: " << json_str << " " << e.what();
return GraphMeta();
}
}

GraphMeta GraphMeta::FromJson(const nlohmann::json& json) {
Expand Down Expand Up @@ -119,8 +138,13 @@ GraphMeta GraphMeta::FromJson(const nlohmann::json& json) {
}

PluginMeta PluginMeta::FromJson(const std::string& json_str) {
auto j = nlohmann::json::parse(json_str);
return PluginMeta::FromJson(j);
try {
auto j = nlohmann::json::parse(json_str);
return PluginMeta::FromJson(j);
} catch (const std::exception& e) {
LOG(ERROR) << "Invalid json string: " << json_str << " " << e.what();
return PluginMeta();
}
}

PluginMeta PluginMeta::FromJson(const nlohmann::json& json) {
Expand Down Expand Up @@ -217,33 +241,46 @@ void PluginMeta::setParamsFromJsonString(const std::string& json_str) {
json_str == "nu") {
return;
}
auto j = nlohmann::json::parse(json_str);
if (j.is_array()) {
for (auto& param : j) {
Parameter p;
p.name = param["name"].get<std::string>();
p.type = param["type"].get<PropertyType>();
params.push_back(p);
try {
auto j = nlohmann::json::parse(json_str);
if (j.is_array()) {
for (auto& param : j) {
Parameter p;
p.name = param["name"].get<std::string>();
p.type = param["type"].get<PropertyType>();
params.push_back(p);
}
} else {
LOG(ERROR) << "Invalid params string: " << json_str;
}
} else {
LOG(ERROR) << "Invalid params string: " << json_str;
} catch (const std::exception& e) {
LOG(ERROR) << "Invalid params string: " << json_str << " " << e.what();
}
}

void PluginMeta::setReturnsFromJsonString(const std::string& json_str) {
auto j = nlohmann::json::parse(json_str);
for (auto& ret : j) {
Parameter p;
p.name = ret["name"].get<std::string>();
p.type = ret["type"].get<PropertyType>();
returns.push_back(p);
try {
auto j = nlohmann::json::parse(json_str);
for (auto& ret : j) {
Parameter p;
p.name = ret["name"].get<std::string>();
p.type = ret["type"].get<PropertyType>();
returns.push_back(p);
}
} catch (const std::exception& e) {
LOG(ERROR) << "Invalid returns string: " << json_str << " " << e.what();
}
}

void PluginMeta::setOptionFromJsonString(const std::string& json_str) {
auto j = nlohmann::json::parse(json_str);
for (auto& opt : j.items()) {
option[opt.key()] = opt.value().get<std::string>();
try {
auto j = nlohmann::json::parse(json_str);
for (auto& opt : j.items()) {
option[opt.key()] = opt.value().get<std::string>();
}
} catch (const std::exception& e) {
LOG(ERROR) << "Invalid option string: " << json_str;
return;
}
}

Expand All @@ -265,8 +302,14 @@ std::string JobMeta::ToJson(bool print_log) const {
}

JobMeta JobMeta::FromJson(const std::string& json_str) {
auto j = nlohmann::json::parse(json_str);
return JobMeta::FromJson(j);
try {
auto j = nlohmann::json::parse(json_str);
return JobMeta::FromJson(j);
} catch (const std::exception& e) {
LOG(ERROR) << "Fail to parse JobMeta from json: " << json_str << " "
<< e.what();
return JobMeta();
}
}

JobMeta JobMeta::FromJson(const nlohmann::json& json) {
Expand Down Expand Up @@ -354,7 +397,11 @@ std::string CreateGraphMetaRequest::ToString() const {
nlohmann::json json;
json["name"] = name;
json["description"] = description;
json["schema"] = nlohmann::json::parse(schema);
try {
json["schema"] = nlohmann::json::parse(schema);
} catch (const std::exception& e) {
LOG(ERROR) << "Invalid schema: " << schema << " " << e.what();
}
if (data_update_time.has_value()) {
json["data_update_time"] = data_update_time.value();
} else {
Expand All @@ -363,8 +410,13 @@ std::string CreateGraphMetaRequest::ToString() const {
json["creation_time"] = creation_time;
json["stored_procedures"] = nlohmann::json::array();
for (auto& plugin_meta : plugin_metas) {
json["stored_procedures"].push_back(
nlohmann::json::parse(plugin_meta.ToJson()));
try {
json["stored_procedures"].push_back(
nlohmann::json::parse(plugin_meta.ToJson()));
} catch (const std::exception& e) {
LOG(ERROR) << "Invalid plugin_meta: " << plugin_meta.ToJson() << " "
<< e.what();
}
}
return json.dump();
}
Expand Down Expand Up @@ -438,8 +490,14 @@ std::string CreatePluginMetaRequest::ToString() const {

CreatePluginMetaRequest CreatePluginMetaRequest::FromJson(
const std::string& json) {
auto j = nlohmann::json::parse(json);
return CreatePluginMetaRequest::FromJson(j);
try {
auto j = nlohmann::json::parse(json);
return CreatePluginMetaRequest::FromJson(j);
} catch (const std::exception& e) {
LOG(ERROR) << "CreatePluginMetaRequest::FromJson error: " << json << ", "
<< e.what();
return CreatePluginMetaRequest();
}
}

CreatePluginMetaRequest CreatePluginMetaRequest::FromJson(
Expand Down Expand Up @@ -561,7 +619,8 @@ UpdatePluginMetaRequest UpdatePluginMetaRequest::FromJson(
request.enable = j["enable"].get<bool>();
}
} catch (const std::exception& e) {
LOG(ERROR) << "UpdatePluginMetaRequest::FromJson error: " << e.what();
LOG(ERROR) << "UpdatePluginMetaRequest::FromJson error: " << e.what() << " "
<< json;
}
return request;
}
Expand Down Expand Up @@ -741,8 +800,16 @@ std::string GraphStatistics::ToJson() const {
}

Result<GraphStatistics> GraphStatistics::FromJson(const std::string& json_str) {
auto j = nlohmann::json::parse(json_str);
return GraphStatistics::FromJson(j);
try {
auto j = nlohmann::json::parse(json_str);
return GraphStatistics::FromJson(j);
} catch (const std::exception& e) {
LOG(ERROR) << "Invalid json string: " << json_str << " " << e.what();
return Result<GraphStatistics>(Status(
StatusCode::InternalError,
"Invalid json string when parsing graph statistics : " + json_str +
" " + e.what()));
}
}

Result<GraphStatistics> GraphStatistics::FromJson(const nlohmann::json& json) {
Expand Down

0 comments on commit 09a6231

Please sign in to comment.