Skip to content

Commit

Permalink
Add support for AWS RDS MySQL Multi-AZ Cluster auto-discovery
Browse files Browse the repository at this point in the history
  • Loading branch information
anphucbui committed Dec 20, 2023
1 parent e662667 commit 5a30377
Show file tree
Hide file tree
Showing 9 changed files with 426 additions and 59 deletions.
18 changes: 18 additions & 0 deletions include/MySQL_HostGroups_Manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -814,6 +814,24 @@ class MySQL_HostGroups_Manager {
void set_server_current_latency_us(char *hostname, int port, unsigned int _current_latency_us);
unsigned long long Get_Memory_Stats();

struct serverDetails {
long int hostgroup_id;
string hostname;
uint16_t port;
uint16_t gtid_port;
string status;
int64_t weight;
unsigned int compression;
int64_t max_connections;
unsigned int max_replication_lag;
int32_t use_ssl;
unsigned int max_latency_ms;
string comment;
};

int add_discovered_servers_to_mysql_servers_and_replication_hostgroups(vector<string> servers_to_add, unordered_map<string, MySQL_HostGroups_Manager::serverDetails> hostname_values_mapping);
void rebuild_hostname_hostgroup_mapping();

void update_group_replication_set_offline(char *_hostname, int _port, int _writer_hostgroup, char *error);
void update_group_replication_set_read_only(char *_hostname, int _port, int _writer_hostgroup, char *error);
void update_group_replication_set_writer(char *_hostname, int _port, int _writer_hostgroup);
Expand Down
4 changes: 4 additions & 0 deletions include/MySQL_Monitor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ struct cmp_str {

#define N_L_ASE 16

#define AWS_ENDPOINT_SUFFIX_STRING "rds.amazonaws.com"

/*
Implementation of monitoring in AWS Aurora will be different than previous modules
Expand Down Expand Up @@ -418,6 +420,8 @@ class MySQL_Monitor {
static bool update_dns_cache_from_mysql_conn(const MYSQL* mysql);
static void trigger_dns_cache_update();

vector<MYSQL_ROW> discover_topology(const char* hostname, int port);
void discover_topology_and_add_to_mysql_servers();

private:
std::vector<table_def_t *> *tables_defs_monitor;
Expand Down
3 changes: 3 additions & 0 deletions include/MySQL_Thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,7 @@ struct p_th_gauge {
mysql_monitor_ping_interval,
mysql_monitor_ping_timeout,
mysql_monitor_ping_max_failures,
mysql_monitor_topology_discovery_interval,
mysql_monitor_read_only_interval,
mysql_monitor_read_only_timeout,
mysql_monitor_writer_is_also_reader,
Expand Down Expand Up @@ -386,6 +387,8 @@ class MySQL_Threads_Handler
int monitor_ping_max_failures;
//! Monitor ping timeout. Unit: 'ms'.
int monitor_ping_timeout;
//! Monitor topology discovery interval. Unit: 'one discovery check per X monitor_read_only checks'.
int monitor_topology_discovery_interval;
//! Monitor read only timeout. Unit: 'ms'.
int monitor_read_only_interval;
//! Monitor read only timeout. Unit: 'ms'.
Expand Down
2 changes: 2 additions & 0 deletions include/proxysql_structs.h
Original file line number Diff line number Diff line change
Expand Up @@ -899,6 +899,7 @@ __thread int mysql_thread___monitor_connect_timeout;
__thread int mysql_thread___monitor_ping_interval;
__thread int mysql_thread___monitor_ping_max_failures;
__thread int mysql_thread___monitor_ping_timeout;
__thread int mysql_thread___monitor_topology_discovery_interval;
__thread int mysql_thread___monitor_read_only_interval;
__thread int mysql_thread___monitor_read_only_timeout;
__thread int mysql_thread___monitor_read_only_max_timeout_count;
Expand Down Expand Up @@ -1068,6 +1069,7 @@ extern __thread int mysql_thread___monitor_connect_timeout;
extern __thread int mysql_thread___monitor_ping_interval;
extern __thread int mysql_thread___monitor_ping_max_failures;
extern __thread int mysql_thread___monitor_ping_timeout;
extern __thread int mysql_thread___monitor_topology_discovery_interval;
extern __thread int mysql_thread___monitor_read_only_interval;
extern __thread int mysql_thread___monitor_read_only_timeout;
extern __thread int mysql_thread___monitor_read_only_max_timeout_count;
Expand Down
2 changes: 2 additions & 0 deletions include/proxysql_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -218,4 +218,6 @@ void close_all_non_term_fd(std::vector<int> excludeFDs);
*/
std::pair<int,const char*> get_dollar_quote_error(const char* version);

long parseLong(const char* s);

#endif
177 changes: 118 additions & 59 deletions lib/MySQL_HostGroups_Manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1969,65 +1969,7 @@ bool MySQL_HostGroups_Manager::commit(
if (hgsm_mysql_servers_checksum != table_resultset_checksum[HGM_TABLES::MYSQL_SERVERS] ||
hgsm_mysql_replication_hostgroups_checksum != table_resultset_checksum[HGM_TABLES::MYSQL_REPLICATION_HOSTGROUPS])
{
proxy_info("Rebuilding 'Hostgroup_Manager_Mapping' due to checksums change - mysql_servers { old: 0x%lX, new: 0x%lX }, mysql_replication_hostgroups { old:0x%lX, new:0x%lX }\n",
hgsm_mysql_servers_checksum, table_resultset_checksum[HGM_TABLES::MYSQL_SERVERS],
hgsm_mysql_replication_hostgroups_checksum, table_resultset_checksum[HGM_TABLES::MYSQL_REPLICATION_HOSTGROUPS]);

char* error = NULL;
int cols = 0;
int affected_rows = 0;
SQLite3_result* resultset = NULL;

const char* query = "SELECT DISTINCT hostname, port, '1' is_writer, status, reader_hostgroup, writer_hostgroup, mem_pointer FROM mysql_replication_hostgroups JOIN mysql_servers ON hostgroup_id=writer_hostgroup WHERE status<>3 \
UNION \
SELECT DISTINCT hostname, port, '0' is_writer, status, reader_hostgroup, writer_hostgroup, mem_pointer FROM mysql_replication_hostgroups JOIN mysql_servers ON hostgroup_id=reader_hostgroup WHERE status<>3 \
ORDER BY hostname, port";

mydb->execute_statement(query, &error, &cols, &affected_rows, &resultset);

hostgroup_server_mapping.clear();

if (resultset && resultset->rows_count) {
std::string fetched_server_id;
HostGroup_Server_Mapping* fetched_server_mapping = NULL;

for (std::vector<SQLite3_row*>::iterator it = resultset->rows.begin(); it != resultset->rows.end(); ++it) {
SQLite3_row* r = *it;

const std::string& server_id = std::string(r->fields[0]) + ":::" + r->fields[1];

if (fetched_server_mapping == NULL || server_id != fetched_server_id) {

auto itr = hostgroup_server_mapping.find(server_id);

if (itr == hostgroup_server_mapping.end()) {
std::unique_ptr<HostGroup_Server_Mapping> server_mapping(new HostGroup_Server_Mapping(this));
fetched_server_mapping = server_mapping.get();
hostgroup_server_mapping.insert( std::pair<std::string,std::unique_ptr<MySQL_HostGroups_Manager::HostGroup_Server_Mapping>> {
server_id, std::move(server_mapping)
} );
}
else {
fetched_server_mapping = itr->second.get();
}

fetched_server_id = server_id;
}

HostGroup_Server_Mapping::Node node;
//node.server_status = static_cast<MySerStatus>(atoi(r->fields[3]));
node.reader_hostgroup_id = atoi(r->fields[4]);
node.writer_hostgroup_id = atoi(r->fields[5]);
node.srv = reinterpret_cast<MySrvC*>(atoll(r->fields[6]));

HostGroup_Server_Mapping::Type type = (r->fields[2] && r->fields[2][0] == '1') ? HostGroup_Server_Mapping::Type::WRITER : HostGroup_Server_Mapping::Type::READER;
fetched_server_mapping->add(type, node);
}
}
delete resultset;

hgsm_mysql_servers_checksum = table_resultset_checksum[HGM_TABLES::MYSQL_SERVERS];
hgsm_mysql_replication_hostgroups_checksum = table_resultset_checksum[HGM_TABLES::MYSQL_REPLICATION_HOSTGROUPS];
rebuild_hostname_hostgroup_mapping();
}

ev_async_send(gtid_ev_loop, gtid_ev_async);
Expand Down Expand Up @@ -7977,3 +7919,120 @@ void MySQL_HostGroups_Manager::HostGroup_Server_Mapping::remove_HGM(MySrvC* srv)
srv->status = MYSQL_SERVER_STATUS_OFFLINE_HARD;
srv->ConnectionsFree->drop_all_connections();
}

/**
* @brief Updates replication hostgroups by adding autodiscovered mysql servers.
* @details Adds each server from 'servers_to_add' to the 'runtime_mysql_servers' table.
* We then rebuild the 'mysql_servers' table as well as the internal 'hostname_hostgroup_mapping'.
* @param servers_to_add A vector containing the strings representing the hostnames of servers to add to 'mysql_servers'.
* @param hostname_values_mapping A mapping containing the hostname of the discovered server mapped to the metadata of server from which it was discovered, stored in a 'serverDetails' struct.
*
* @return Returns EXIT_FAILURE code on failure and EXIT_SUCCESS code on success.
*/
int MySQL_HostGroups_Manager::add_discovered_servers_to_mysql_servers_and_replication_hostgroups(vector<string> servers_to_add, unordered_map<string, MySQL_HostGroups_Manager::serverDetails> hostname_values_mapping) {
int exit_code = EXIT_SUCCESS;
wrlock();

try {
for (string host : servers_to_add) {
long int hostgroup_id = hostname_values_mapping[host].hostgroup_id;
uint16_t port = hostname_values_mapping[host].port;

uint16_t gtid_port = hostname_values_mapping[host].gtid_port;
int64_t weight = hostname_values_mapping[host].weight;
unsigned int compression = hostname_values_mapping[host].compression;
int64_t max_connections = hostname_values_mapping[host].max_connections;
unsigned int max_replication_lag = hostname_values_mapping[host].max_replication_lag;
int32_t use_ssl = hostname_values_mapping[host].use_ssl;
unsigned int max_latency_ms = hostname_values_mapping[host].max_latency_ms;

MySrvC* mysrvc = new MySrvC(
const_cast<char*>(host.c_str()), port, gtid_port, weight, MYSQL_SERVER_STATUS_ONLINE,
compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, const_cast<char*>("Discovered endpoint")
);
add(mysrvc, hostgroup_id);

proxy_info(
"Adding new discovered server %s:%d with: hostgroup=%d, weight=%ld, max_connections=%ld, use_ssl=%d\n",
host.c_str(), port, hostgroup_id, mysrvc->weight, mysrvc->max_connections, mysrvc->use_ssl
);
}

purge_mysql_servers_table();
mydb->execute("DELETE FROM mysql_servers");
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "DELETE FROM mysql_servers\n");
generate_mysql_servers_table();
update_table_mysql_servers_for_monitor(false);
rebuild_hostname_hostgroup_mapping();

} catch (...) {
exit_code = EXIT_FAILURE;
}

wrunlock();
return exit_code;
}

/**
* @brief Rebuilds the 'hostname_hostgroup_mapping'
* @details Rebuilds the internal 'hostname_hostgroup_mapping' assuming new data has been entered
* and calculates new checksums for 'mysql_servers' and 'mysql_replication_hostgroups'.
*/
void MySQL_HostGroups_Manager::rebuild_hostname_hostgroup_mapping() {
proxy_info("Rebuilding 'Hostgroup_Manager_Mapping' due to checksums change - mysql_servers { old: 0x%lX, new: 0x%lX }, mysql_replication_hostgroups { old:0x%lX, new:0x%lX }\n",
hgsm_mysql_servers_checksum, table_resultset_checksum[HGM_TABLES::MYSQL_SERVERS],
hgsm_mysql_replication_hostgroups_checksum, table_resultset_checksum[HGM_TABLES::MYSQL_REPLICATION_HOSTGROUPS]);

char* error = NULL;
int cols = 0;
int affected_rows = 0;
SQLite3_result* resultset = NULL;

const char* query = "SELECT DISTINCT hostname, port, '1' is_writer, status, reader_hostgroup, writer_hostgroup, mem_pointer FROM mysql_replication_hostgroups JOIN mysql_servers ON hostgroup_id=writer_hostgroup WHERE status<>3 \
UNION \
SELECT DISTINCT hostname, port, '0' is_writer, status, reader_hostgroup, writer_hostgroup, mem_pointer FROM mysql_replication_hostgroups JOIN mysql_servers ON hostgroup_id=reader_hostgroup WHERE status<>3 \
ORDER BY hostname, port";

mydb->execute_statement(query, &error, &cols, &affected_rows, &resultset);

hostgroup_server_mapping.clear();

if (resultset && resultset->rows_count) {
std::string fetched_server_id;
HostGroup_Server_Mapping* fetched_server_mapping = NULL;

for (std::vector<SQLite3_row*>::iterator it = resultset->rows.begin(); it != resultset->rows.end(); it++) {
SQLite3_row *r = *it;

const std::string& server_id = std::string(r->fields[0]) + ":::" + r->fields[1];

if (fetched_server_mapping == NULL || server_id != fetched_server_id) {
auto itr = hostgroup_server_mapping.find(server_id);

if (itr == hostgroup_server_mapping.end()) {
std::unique_ptr<HostGroup_Server_Mapping> server_mapping(new HostGroup_Server_Mapping(this));
fetched_server_mapping = server_mapping.get();
hostgroup_server_mapping.insert( std::pair<std::string,std::unique_ptr<MySQL_HostGroups_Manager::HostGroup_Server_Mapping>> {
server_id, std::move(server_mapping)
} );
} else {
fetched_server_mapping = itr->second.get();
}

fetched_server_id = server_id;
}

HostGroup_Server_Mapping::Node node;
node.reader_hostgroup_id = atoi(r->fields[4]);
node.writer_hostgroup_id = atoi(r->fields[5]);
node.srv = reinterpret_cast<MySrvC*>(atoll(r->fields[6]));

HostGroup_Server_Mapping::Type type = (r->fields[2] && r->fields[2][0] == '1') ? HostGroup_Server_Mapping::Type::WRITER : HostGroup_Server_Mapping::Type::READER;
fetched_server_mapping->add(type, node);
}
}
delete resultset;

hgsm_mysql_servers_checksum = table_resultset_checksum[HGM_TABLES::MYSQL_SERVERS];
hgsm_mysql_replication_hostgroups_checksum = table_resultset_checksum[HGM_TABLES::MYSQL_REPLICATION_HOSTGROUPS];
}
Loading

0 comments on commit 5a30377

Please sign in to comment.