Skip to content

Commit

Permalink
Fix bugs when skipping unused tables
Browse files Browse the repository at this point in the history
Signed-off-by: Sijie Shen <[email protected]>
  • Loading branch information
ds-ssj committed May 31, 2024
1 parent d343633 commit 54211e2
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 20 deletions.
28 changes: 16 additions & 12 deletions converter/binlog_convert.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,14 @@ int main(int argc, char** argv) {
LogEntry log_entry;
GART_CHECK_OK(parser.parse(log_entry, line, epoch));

++init_logs;

// skip invalid log entry (unused tables)
if (!log_entry.valid()) {
consumer->delete_message(msg);
continue;
}

while (log_entry.more_entires()) {
ostream << log_entry.to_string() << flush;
GART_CHECK_OK(parser.parse(log_entry, line, epoch));
Expand All @@ -86,12 +94,6 @@ int main(int argc, char** argv) {
}
}

++init_logs;
if (!log_entry.valid()) {
consumer->delete_message(msg);
continue;
}

ostream << log_entry.to_string() << flush;
consumer->delete_message(msg);

Expand Down Expand Up @@ -145,6 +147,14 @@ int main(int argc, char** argv) {
LogEntry log_entry;
GART_CHECK_OK(parser.parse(log_entry, line, epoch));

++log_count;

// skip invalid log entry (unused tables)
if (!log_entry.valid()) {
consumer->delete_message(msg);
continue;
}

#ifndef USE_DEBEZIUM
epoch = log_count / FLAGS_logs_per_epoch;
#else
Expand Down Expand Up @@ -173,13 +183,7 @@ int main(int argc, char** argv) {
}
}

if (!log_entry.valid()) {
continue;
}

ostream << log_entry.to_string() << flush;
consumer->delete_message(msg);

log_count++;
}
}
21 changes: 15 additions & 6 deletions converter/parser.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ string LogEntry::to_string() const {
: op_type == OpType::UPDATE ? "update"
: op_type == OpType::DELETE ? "delete"
: "unknown";
if (base == "unknown") {
LOG(ERROR) << "Unknown operation type: " << static_cast<int>(op_type);
assert(false);
}
if (entity_type == EntityType::VERTEX) {
append_str(base, string("vertex"), '_');
} else {
Expand Down Expand Up @@ -139,8 +143,8 @@ gart::Status TxnLogParser::init(const string& etcd_endpoint,
const string& table_name = vdef[idx]["dataSourceName"].as<string>();
const string& label = vdef[idx]["type_name"].as<string>();
table2label_names_[table_name].push_back(label);
useful_tables_.emplace(table_name, true);
is_vlable_names_.emplace(label, true);
useful_tables_.insert(table_name);
vlable_names_.insert(label);
vertex_id_columns_.emplace(label, vdef[idx]["idFieldName"].as<string>());
vertex_label2ids_.emplace(label, id);
YAML::Node properties = vdef[idx]["mappings"];
Expand All @@ -156,7 +160,7 @@ gart::Status TxnLogParser::init(const string& etcd_endpoint,
// parse edges
for (int idx = 0; idx < elabel_num; ++idx) {
auto table_name = edef[idx]["dataSourceName"].as<string>();
useful_tables_.emplace(table_name, true);
useful_tables_.insert(table_name);
auto src_label = edef[idx]["type_pair"]["source_vertex"].as<string>();
auto dst_label = edef[idx]["type_pair"]["destination_vertex"].as<string>();
auto label = edef[idx]["type_pair"]["edge"].as<string>();
Expand Down Expand Up @@ -222,6 +226,10 @@ gart::Status TxnLogParser::parse(LogEntry& out, const string& log_str,
auto useful_tables_it = useful_tables_.find(table_name);
if (useful_tables_it == useful_tables_.end()) {
// skip unused tables
if (unused_tables_.find(table_name) == unused_tables_.end()) {
LOG(INFO) << "Skip unused table: " << table_name;
unused_tables_.insert(table_name);
}
return gart::Status::OK();
}
#ifndef USE_DEBEZIUM
Expand All @@ -230,8 +238,8 @@ gart::Status TxnLogParser::parse(LogEntry& out, const string& log_str,
type = log["op"].get<string>();
#endif
auto label_name = table2label_names_.find(table_name)->second[out.table_idx];
auto is_vlabel_name_it = is_vlable_names_.find(label_name);
if (is_vlabel_name_it != is_vlable_names_.end()) {
auto is_vlabel_name_it = vlable_names_.find(label_name);
if (is_vlabel_name_it != vlable_names_.end()) {
out.entity_type = LogEntry::EntityType::VERTEX;
} else {
out.entity_type = LogEntry::EntityType::EDGE;
Expand Down Expand Up @@ -298,7 +306,6 @@ gart::Status TxnLogParser::parse(LogEntry& out, const string& log_str,
fill_prop(out, log);
}

out.valid_ = true;
if (out.update_has_finish_delete != true) {
out.table_idx++;
}
Expand All @@ -308,6 +315,8 @@ gart::Status TxnLogParser::parse(LogEntry& out, const string& log_str,
out.all_labels_have_process = true;
out.table_idx = 0;
}

out.valid_ = true;
return gart::Status::OK();
}

Expand Down
6 changes: 4 additions & 2 deletions converter/parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#define CONVERTER_PARSER_H_

#include <map>
#include <set>
#include <string>
#include <utility>
#include <vector>
Expand Down Expand Up @@ -120,8 +121,9 @@ class TxnLogParser {
// used for schema mapping (unchanged after init)
int vlabel_num_;
int subgraph_num_;
std::map<std::string, bool> useful_tables_;
std::map<std::string, bool> is_vlable_names_;
std::set<std::string> useful_tables_;
std::set<std::string> unused_tables_;
std::set<std::string> vlable_names_;
// table_name -> vertex_label names (one table may responses to multiple
// vertex labels)
std::map<std::string, std::vector<std::string>> table2label_names_;
Expand Down

0 comments on commit 54211e2

Please sign in to comment.