Skip to content

Commit

Permalink
Update discovery logic to add discovered servers with default values …
Browse files Browse the repository at this point in the history
…instead of originating server's values, add new field in mmsd for reader hostgroup, and query monitor db instead of admin db
  • Loading branch information
anphucbui committed Mar 4, 2024
1 parent 0bcddd0 commit d04173b
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 86 deletions.
17 changes: 1 addition & 16 deletions include/MySQL_HostGroups_Manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -1019,22 +1019,7 @@ class MySQL_HostGroups_Manager {
void set_server_current_latency_us(char *hostname, int port, unsigned int _current_latency_us);
unsigned long long Get_Memory_Stats();

struct serverDetails {
long int hostgroup_id;
string originating_hostname;
uint16_t port;
uint16_t gtid_port;
string status;
int64_t weight;
unsigned int compression;
int64_t max_connections;
unsigned int max_replication_lag;
int32_t use_ssl;
unsigned int max_latency_ms;
string comment;
};

int add_discovered_servers_to_mysql_servers_and_replication_hostgroups(unordered_map<string, MySQL_HostGroups_Manager::serverDetails> new_server_values_mapping);
int add_discovered_servers_to_mysql_servers_and_replication_hostgroups(vector<tuple<string, int, int>> new_servers);
void rebuild_hostname_hostgroup_mapping();

void update_group_replication_set_offline(char *_hostname, int _port, int _writer_hostgroup, char *error);
Expand Down
3 changes: 2 additions & 1 deletion include/MySQL_Monitor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ class MySQL_Monitor_State_Data {
char *hostname;
int port;
int writer_hostgroup; // used only by group replication
int reader_hostgroup;
bool writer_is_also_reader; // used only by group replication
int max_transactions_behind; // used only by group replication
int max_transactions_behind_count; // used only by group replication
Expand Down Expand Up @@ -446,7 +447,7 @@ class MySQL_Monitor {
static bool update_dns_cache_from_mysql_conn(const MYSQL* mysql);
static void trigger_dns_cache_update();

void process_discovered_topology(const std::string& originating_server_hostname, vector<MYSQL_ROW> discovered_servers);
void process_discovered_topology(const std::string& originating_server_hostname, vector<MYSQL_ROW> discovered_servers, int reader_hostgroup);

private:
std::vector<table_def_t *> *tables_defs_monitor;
Expand Down
32 changes: 9 additions & 23 deletions lib/MySQL_HostGroups_Manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8168,40 +8168,26 @@ void MySQL_HostGroups_Manager::HostGroup_Server_Mapping::remove_HGM(MySrvC* srv)

/**
* @brief Updates replication hostgroups by adding autodiscovered mysql servers.
* @details Adds each server from 'new_server_values_mapping' to the 'runtime_mysql_servers' table.
* @details Adds each server from 'new_servers' to the 'runtime_mysql_servers' table.
* We then rebuild the 'mysql_servers' table as well as the internal 'hostname_hostgroup_mapping'.
* @param new_server_values_mapping A mapping containing the hostname of the discovered server mapped to the metadata of server from which it was discovered, stored in a 'serverDetails' struct.
* @param new_servers A vector of tuples where each tuple contains the values needed to add each new server.
*
* @return Returns EXIT_FAILURE code on failure and EXIT_SUCCESS code on success.
*/
int MySQL_HostGroups_Manager::add_discovered_servers_to_mysql_servers_and_replication_hostgroups(unordered_map<string, MySQL_HostGroups_Manager::serverDetails> new_server_values_mapping) {
int MySQL_HostGroups_Manager::add_discovered_servers_to_mysql_servers_and_replication_hostgroups(vector<tuple<string, int, int>> new_servers) {
int exit_code = EXIT_SUCCESS;
bool added_new_server = false;
wrlock();

try {
for (const auto &s : new_server_values_mapping) {
if (new_server_values_mapping.find(s.first) == new_server_values_mapping.end()) {
continue;
}

string host = s.first;
MySQL_HostGroups_Manager::serverDetails new_server_values = new_server_values_mapping[host];

long int hostgroup_id = new_server_values.hostgroup_id;
uint16_t port = new_server_values.port;

uint16_t gtid_port = new_server_values.gtid_port;
int64_t weight = new_server_values.weight;
unsigned int compression = new_server_values.compression;
int64_t max_connections = new_server_values.max_connections;
unsigned int max_replication_lag = new_server_values.max_replication_lag;
int32_t use_ssl = new_server_values.use_ssl;
unsigned int max_latency_ms = new_server_values.max_latency_ms;
for (tuple<string, int, int> s : new_servers) {
string host = std::get<0>(s);
uint16_t port = std::get<1>(s);
long int hostgroup_id = std::get<2>(s);

// Add the discovered server with default values
MySrvC* mysrvc = new MySrvC(
const_cast<char*>(host.c_str()), port, gtid_port, weight, MYSQL_SERVER_STATUS_ONLINE,
compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, const_cast<char*>("Discovered endpoint")
const_cast<char*>(host.c_str()), port, 0, 1, MYSQL_SERVER_STATUS_ONLINE, 0, -1, 0, -1, 0, const_cast<char*>("Discovered endpoint")
);
add(mysrvc, hostgroup_id);

Expand Down
67 changes: 21 additions & 46 deletions lib/MySQL_Monitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3290,77 +3290,51 @@ VALGRIND_ENABLE_ERROR_REPORTING;

/**
* @brief Processes the discovered servers to eventually add them to 'runtime_mysql_servers'.
* @details This method takes a vector of discovered servers, compares them against the existing servers, and adds the new servers to 'runtime_mysql_servers' with the
* values from their originating server.
* @details This method takes a vector of discovered servers, compares them against the existing servers, and adds the new servers to 'runtime_mysql_servers'.
* @param originating_server_hostname A string which denotes the hostname of the originating server, from which the discovered servers were queried and found.
* @param discovered_servers A vector of servers discovered when querying the cluster's topology.
* @param reader_hostgroup Reader hostgroup to which we will add the discovered servers.
*/
void MySQL_Monitor::process_discovered_topology(const std::string& originating_server_hostname, vector<MYSQL_ROW> discovered_servers) {
void MySQL_Monitor::process_discovered_topology(const std::string& originating_server_hostname, vector<MYSQL_ROW> discovered_servers, int reader_hostgroup) {
char *error = NULL;
int cols = 0;
int affected_rows = 0;
SQLite3_result *runtime_mysql_servers = NULL;

char *query=(char *)"SELECT hostgroup_id,hostname,port,gtid_port,status,weight,compression,max_connections,max_replication_lag,use_ssl,max_latency_ms,comment FROM main.runtime_mysql_servers ORDER BY hostgroup_id, hostname, port";
char *query=(char *)"SELECT DISTINCT hostname FROM monitor_internal.mysql_servers ORDER BY hostname";
proxy_debug(PROXY_DEBUG_ADMIN, 4, "%s\n", query);
admindb->execute_statement(query, &error , &cols , &affected_rows , &runtime_mysql_servers);
monitordb->execute_statement(query, &error, &cols, &affected_rows, &runtime_mysql_servers);

if (error) {
proxy_error("Error on %s : %s\n", query, error);
} else {
set<string> existing_runtime_servers;
unordered_map<string, MySQL_HostGroups_Manager::serverDetails> new_server_values_mapping;
vector<tuple<string, int, int>> new_servers;
vector<string> saved_hostnames;
saved_hostnames.push_back(originating_server_hostname);

// Do an initial loop through the query results to keep track of existing runtime server hostnames
// Do an initial loop through the query results to save existing runtime server hostnames
for (std::vector<SQLite3_row *>::iterator it = runtime_mysql_servers->rows.begin(); it != runtime_mysql_servers->rows.end(); it++) {
SQLite3_row *r1 = *it;
string current_runtime_hostname = r1->fields[0];

string current_runtime_hostname = r1->fields[1];
if (std::find(existing_runtime_servers.begin(), existing_runtime_servers.end(), current_runtime_hostname) == existing_runtime_servers.end()) {
existing_runtime_servers.insert(current_runtime_hostname);
}
saved_hostnames.push_back(current_runtime_hostname);
}

// Loop through discovered servers and process the ones we plan to add
// Loop through discovered servers and process the ones we haven't saved yet
for (MYSQL_ROW s : discovered_servers) {
string current_discovered_id = s[1];
string current_discovered_hostname = s[2];
string current_discovered_port = s[3];

// We only add the discovered server if it is not the originating server and it does not already exist in 'runtime_mysql_servers' and it is not already saved to be added
bool already_exists = std::find(existing_runtime_servers.begin(), existing_runtime_servers.end(), current_discovered_hostname) != existing_runtime_servers.end();
bool already_saved = new_server_values_mapping.find(current_discovered_hostname) != new_server_values_mapping.end();
if (current_discovered_hostname != originating_server_hostname && !already_exists && !already_saved) {
// Search for the originating server's values and store it
for (std::vector<SQLite3_row *>::iterator it = runtime_mysql_servers->rows.begin(); it != runtime_mysql_servers->rows.end(); it++) {
SQLite3_row *r1 = *it;

string current_runtime_hostname = r1->fields[1];
if (current_runtime_hostname == originating_server_hostname) {
MySQL_HostGroups_Manager::serverDetails originating_server_values;

originating_server_values.hostgroup_id = parseLong(r1->fields[0]);
originating_server_values.originating_hostname = current_runtime_hostname;
originating_server_values.port = parseLong(current_discovered_port.c_str());
originating_server_values.gtid_port = parseLong(r1->fields[3]);
originating_server_values.status = r1->fields[4]; // not used
originating_server_values.weight = parseLong(r1->fields[5]);
originating_server_values.compression = parseLong(r1->fields[6]);
originating_server_values.max_connections = parseLong(r1->fields[7]);
originating_server_values.max_replication_lag = parseLong(r1->fields[8]);
originating_server_values.use_ssl = parseLong(r1->fields[9]);
originating_server_values.max_latency_ms = parseLong(r1->fields[10]);
originating_server_values.comment = r1->fields[11]; // not used

new_server_values_mapping[current_discovered_hostname] = originating_server_values;
}
}
if (find(saved_hostnames.begin(), saved_hostnames.end(), current_discovered_hostname) == saved_hostnames.end()) {
tuple<string, int, int> new_server(current_discovered_hostname, parseLong(current_discovered_port.c_str()), reader_hostgroup);
new_servers.push_back(new_server);
saved_hostnames.push_back(current_discovered_hostname);
}
}

// Add the new servers if any
if (!new_server_values_mapping.empty()) {
int successfully_added_all_servers = MyHGM->add_discovered_servers_to_mysql_servers_and_replication_hostgroups(new_server_values_mapping);
if (!new_servers.empty()) {
int successfully_added_all_servers = MyHGM->add_discovered_servers_to_mysql_servers_and_replication_hostgroups(new_servers);

if (successfully_added_all_servers == EXIT_FAILURE) {
proxy_info("Inserting auto-discovered servers failed.\n");
Expand Down Expand Up @@ -3394,7 +3368,7 @@ void * MySQL_Monitor::monitor_read_only() {
char *error=NULL;
SQLite3_result *resultset=NULL;
// add support for SSL
char *query=(char *)"SELECT hostname, port, MAX(use_ssl) use_ssl, check_type FROM mysql_servers JOIN mysql_replication_hostgroups ON hostgroup_id=writer_hostgroup OR hostgroup_id=reader_hostgroup WHERE status NOT IN (2,3) GROUP BY hostname, port ORDER BY RANDOM()";
char *query=(char *)"SELECT hostname, port, MAX(use_ssl) use_ssl, check_type, reader_hostgroup FROM mysql_servers JOIN mysql_replication_hostgroups ON hostgroup_id=writer_hostgroup OR hostgroup_id=reader_hostgroup WHERE status NOT IN (2,3) GROUP BY hostname, port ORDER BY RANDOM()";
t1=monotonic_time();

if (!GloMTH) return NULL; // quick exit during shutdown/restart
Expand Down Expand Up @@ -7399,7 +7373,7 @@ VALGRIND_ENABLE_ERROR_REPORTING;
// Process the discovered servers and add them to 'runtime_mysql_servers'
if (!discovered_servers.empty()) {
try {
process_discovered_topology(originating_server_hostname, discovered_servers);
process_discovered_topology(originating_server_hostname, discovered_servers, mmsd->reader_hostgroup);
} catch (std::runtime_error &e) {
proxy_error("Error during topology auto-discovery: %s\n", e.what());
} catch (...) {
Expand Down Expand Up @@ -7500,6 +7474,7 @@ void MySQL_Monitor::monitor_read_only_async(SQLite3_result* resultset, bool do_d
std::unique_ptr<MySQL_Monitor_State_Data> mmsd(
new MySQL_Monitor_State_Data(task_type, r->fields[0], atoi(r->fields[1]), atoi(r->fields[2])));

mmsd->reader_hostgroup = atoi(r->fields[4]); // set reader_hostgroup
mmsd->mondb = monitordb;
mmsd->mysql = My_Conn_Pool->get_connection(mmsd->hostname, mmsd->port, mmsd.get());

Expand Down

0 comments on commit d04173b

Please sign in to comment.