diff --git a/converter/binlog_convert.cc b/converter/binlog_convert.cc index 7e39f39..01a7bd2 100644 --- a/converter/binlog_convert.cc +++ b/converter/binlog_convert.cc @@ -77,6 +77,14 @@ int main(int argc, char** argv) { LogEntry log_entry; GART_CHECK_OK(parser.parse(log_entry, line, epoch)); + ++init_logs; + + // skip invalid log entry (unused tables) + if (!log_entry.valid()) { + consumer->delete_message(msg); + continue; + } + while (log_entry.more_entires()) { ostream << log_entry.to_string() << flush; GART_CHECK_OK(parser.parse(log_entry, line, epoch)); @@ -86,12 +94,6 @@ int main(int argc, char** argv) { } } - ++init_logs; - if (!log_entry.valid()) { - consumer->delete_message(msg); - continue; - } - ostream << log_entry.to_string() << flush; consumer->delete_message(msg); @@ -145,6 +147,14 @@ int main(int argc, char** argv) { LogEntry log_entry; GART_CHECK_OK(parser.parse(log_entry, line, epoch)); + ++log_count; + + // skip invalid log entry (unused tables) + if (!log_entry.valid()) { + consumer->delete_message(msg); + continue; + } + #ifndef USE_DEBEZIUM epoch = log_count / FLAGS_logs_per_epoch; #else @@ -173,13 +183,7 @@ int main(int argc, char** argv) { } } - if (!log_entry.valid()) { - continue; - } - ostream << log_entry.to_string() << flush; consumer->delete_message(msg); - - log_count++; } } diff --git a/converter/parser.cc b/converter/parser.cc index eba2ef9..66242a3 100644 --- a/converter/parser.cc +++ b/converter/parser.cc @@ -78,6 +78,10 @@ string LogEntry::to_string() const { : op_type == OpType::UPDATE ? "update" : op_type == OpType::DELETE ? "delete" : "unknown"; + if (base == "unknown") { + LOG(ERROR) << "Unknown operation type: " << static_cast(op_type); + assert(false); + } if (entity_type == EntityType::VERTEX) { append_str(base, string("vertex"), '_'); } else { @@ -139,8 +143,8 @@ gart::Status TxnLogParser::init(const string& etcd_endpoint, const string& table_name = vdef[idx]["dataSourceName"].as(); const string& label = vdef[idx]["type_name"].as(); table2label_names_[table_name].push_back(label); - useful_tables_.emplace(table_name, true); - is_vlable_names_.emplace(label, true); + useful_tables_.insert(table_name); + vlable_names_.insert(label); vertex_id_columns_.emplace(label, vdef[idx]["idFieldName"].as()); vertex_label2ids_.emplace(label, id); YAML::Node properties = vdef[idx]["mappings"]; @@ -156,7 +160,7 @@ gart::Status TxnLogParser::init(const string& etcd_endpoint, // parse edges for (int idx = 0; idx < elabel_num; ++idx) { auto table_name = edef[idx]["dataSourceName"].as(); - useful_tables_.emplace(table_name, true); + useful_tables_.insert(table_name); auto src_label = edef[idx]["type_pair"]["source_vertex"].as(); auto dst_label = edef[idx]["type_pair"]["destination_vertex"].as(); auto label = edef[idx]["type_pair"]["edge"].as(); @@ -222,6 +226,10 @@ gart::Status TxnLogParser::parse(LogEntry& out, const string& log_str, auto useful_tables_it = useful_tables_.find(table_name); if (useful_tables_it == useful_tables_.end()) { // skip unused tables + if (unused_tables_.find(table_name) == unused_tables_.end()) { + LOG(INFO) << "Skip unused table: " << table_name; + unused_tables_.insert(table_name); + } return gart::Status::OK(); } #ifndef USE_DEBEZIUM @@ -230,8 +238,8 @@ gart::Status TxnLogParser::parse(LogEntry& out, const string& log_str, type = log["op"].get(); #endif auto label_name = table2label_names_.find(table_name)->second[out.table_idx]; - auto is_vlabel_name_it = is_vlable_names_.find(label_name); - if (is_vlabel_name_it != is_vlable_names_.end()) { + auto is_vlabel_name_it = vlable_names_.find(label_name); + if (is_vlabel_name_it != vlable_names_.end()) { out.entity_type = LogEntry::EntityType::VERTEX; } else { out.entity_type = LogEntry::EntityType::EDGE; @@ -298,7 +306,6 @@ gart::Status TxnLogParser::parse(LogEntry& out, const string& log_str, fill_prop(out, log); } - out.valid_ = true; if (out.update_has_finish_delete != true) { out.table_idx++; } @@ -308,6 +315,8 @@ gart::Status TxnLogParser::parse(LogEntry& out, const string& log_str, out.all_labels_have_process = true; out.table_idx = 0; } + + out.valid_ = true; return gart::Status::OK(); } diff --git a/converter/parser.h b/converter/parser.h index 2664b90..594ccb9 100644 --- a/converter/parser.h +++ b/converter/parser.h @@ -17,6 +17,7 @@ #define CONVERTER_PARSER_H_ #include +#include #include #include #include @@ -120,8 +121,9 @@ class TxnLogParser { // used for schema mapping (unchanged after init) int vlabel_num_; int subgraph_num_; - std::map useful_tables_; - std::map is_vlable_names_; + std::set useful_tables_; + std::set unused_tables_; + std::set vlable_names_; // table_name -> vertex_label names (one table may responses to multiple // vertex labels) std::map> table2label_names_;