Skip to content

Commit

Permalink
Add MySQL GTID parse and config
Browse files Browse the repository at this point in the history
Signed-off-by: Sijie Shen <[email protected]>
  • Loading branch information
ds-ssj committed Jun 12, 2024
1 parent 8bb7a42 commit 8c48aeb
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 1 deletion.
42 changes: 41 additions & 1 deletion converter/parser.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,25 @@ inline string to_lower_case(const string& input) {
return output;
}

// for MySQL GTID = source_id:transaction_id
inline int extract_tx_id(const std::string& gtid_str) {
size_t last_colon = gtid_str.find_last_of(':');
if (last_colon != std::string::npos) {
try {
return stoi(gtid_str.substr(last_colon + 1));
} catch (const std::invalid_argument& e) {
LOG(ERROR) << "Invalid argument: no conversion could be performed";
} catch (const std::out_of_range& e) {
LOG(ERROR) << "Out of range: the converted value would fall out of the "
"range of the result type";
}
} else {
LOG(ERROR) << "Error: GTID format invalid. No colon found." << std::endl;
}

return -1;
}

} // namespace

namespace converter {
Expand Down Expand Up @@ -266,13 +285,34 @@ gart::Status TxnLogParser::parse(LogEntry& out, const string& log_str,
: LogEntry::OpType::UNKNOWN;
out.snapshot = LogEntry::Snapshot::FALSE;
#else
out.tx_id = log["source"].value("txId", -1);
out.op_type = type == "c" ? LogEntry::OpType::INSERT
: type == "u" ? LogEntry::OpType::UPDATE
: type == "d" ? LogEntry::OpType::DELETE
: type == "r" ? LogEntry::OpType::INSERT
: LogEntry::OpType::UNKNOWN;

// parse transaction id (tx_id)
// default for PostgreSQL, -1 for MySQL
// TODO(SSJ): Hardcode for PostgreSQL and MySQL
int tx_id = log["source"].value("txId", -1);
if (tx_id == -1) {
// MySQL
const json& gtid_json = log["source"]["gtid"];
const string& gtid_str =
gtid_json.is_null() ? string() : gtid_json.get<string>();
if (!gtid_str.empty()) {
// parse the format: GTID = source_id:transaction_id
tx_id = extract_tx_id(gtid_str);
} else if (type == "r") {
// for snapshot status, GTID is null
tx_id = 0;
} else {
LOG(ERROR) << "Please open GTID for MySQL.";
}
}

out.tx_id = tx_id;

// Special snapshot status during snapshot:
// first: firstRecordInTable && firstTable
// last: lastRecordInTable && lastTable
Expand Down
4 changes: 4 additions & 0 deletions docs/documentation/tutorials/data-source-config/mysql.rst
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ MySQL configuration file ``/etc/mysql/my.cnf``:
binlog_format=row
binlog_row_image=full
# Enable GTID for consistency
gtid_mode=ON
enforce_gtid_consistency=ON
# The databases captured. GART will capture all databases if not specified.
binlog-do-db=ldbc # change the name to your database
binlog-do-db=... # change the name to your database
Expand Down
4 changes: 4 additions & 0 deletions scripts/install-mysql.sh
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ log-bin=mysql-bin
binlog_format=row
binlog_row_image=full
# Enable GTID for consistency
gtid_mode=ON
enforce_gtid_consistency=ON
# The databases captured. GART will capture all databases if not specified.
binlog-do-db=ldbc # change the name to your database
Expand Down

0 comments on commit 8c48aeb

Please sign in to comment.