From b6a1dceffa6525c16efed5100b07ea956b40335e Mon Sep 17 00:00:00 2001 From: Sijie Shen Date: Wed, 12 Jun 2024 11:20:37 +0800 Subject: [PATCH] Add MySQL GTID parse and config Signed-off-by: Sijie Shen --- converter/parser.cc | 40 ++++++++++++++++++- .../tutorials/data-source-config/mysql.rst | 4 ++ scripts/install-mysql.sh | 4 ++ 3 files changed, 47 insertions(+), 1 deletion(-) diff --git a/converter/parser.cc b/converter/parser.cc index 2ad64b5..0f81827 100644 --- a/converter/parser.cc +++ b/converter/parser.cc @@ -63,6 +63,25 @@ inline string to_lower_case(const string& input) { return output; } +// for MySQL GTID = source_id:transaction_id +inline int extract_tx_id(const std::string& gtid_str) { + size_t last_colon = gtid_str.find_last_of(':'); + if (last_colon != std::string::npos) { + try { + return stoi(gtid_str.substr(last_colon + 1)); + } catch (const std::invalid_argument& e) { + LOG(ERROR) << "Invalid argument: no conversion could be performed"; + } catch (const std::out_of_range& e) { + LOG(ERROR) << "Out of range: the converted value would fall out of the " + "range of the result type"; + } + } else { + LOG(ERROR) << "Error: GTID format invalid. No colon found." << std::endl; + } + + return -1; +} + } // namespace namespace converter { @@ -266,13 +285,32 @@ gart::Status TxnLogParser::parse(LogEntry& out, const string& log_str, : LogEntry::OpType::UNKNOWN; out.snapshot = LogEntry::Snapshot::FALSE; #else - out.tx_id = log["source"].value("txId", -1); out.op_type = type == "c" ? LogEntry::OpType::INSERT : type == "u" ? LogEntry::OpType::UPDATE : type == "d" ? LogEntry::OpType::DELETE : type == "r" ? LogEntry::OpType::INSERT : LogEntry::OpType::UNKNOWN; + // parse transaction id (tx_id) + // default for PostgreSQL, -1 for MySQL + // TODO(SSJ): Hardcode for PostgreSQL and MySQL + int tx_id = log["source"].value("txId", -1); + if (tx_id == -1) { + // MySQL + const string& gtid_str = log["source"].value("gtid", string()); + if (!gtid_str.empty()) { + // parse the format: GTID = source_id:transaction_id + tx_id = extract_tx_id(gtid_str); + } else if (type == "r") { + // for snapshot status, GTID is null + tx_id = 0; + } else { + LOG(ERROR) << "Please open GTID for MySQL."; + } + } + + out.tx_id = tx_id; + // Special snapshot status during snapshot: // first: firstRecordInTable && firstTable // last: lastRecordInTable && lastTable diff --git a/docs/documentation/tutorials/data-source-config/mysql.rst b/docs/documentation/tutorials/data-source-config/mysql.rst index 1e80381..34f0173 100644 --- a/docs/documentation/tutorials/data-source-config/mysql.rst +++ b/docs/documentation/tutorials/data-source-config/mysql.rst @@ -17,6 +17,10 @@ MySQL configuration file ``/etc/mysql/my.cnf``: binlog_format=row binlog_row_image=full + # Enable GTID for consistency + gtid_mode=ON + enforce_gtid_consistency=ON + # The databases captured. GART will capture all databases if not specified. binlog-do-db=ldbc # change the name to your database binlog-do-db=... # change the name to your database diff --git a/scripts/install-mysql.sh b/scripts/install-mysql.sh index 711f1ec..0e3fcae 100755 --- a/scripts/install-mysql.sh +++ b/scripts/install-mysql.sh @@ -13,6 +13,10 @@ log-bin=mysql-bin binlog_format=row binlog_row_image=full +# Enable GTID for consistency +gtid_mode=ON +enforce_gtid_consistency=ON + # The databases captured. GART will capture all databases if not specified. binlog-do-db=ldbc # change the name to your database