diff --git a/include/MySQL_HostGroups_Manager.h b/include/MySQL_HostGroups_Manager.h index 1f5c1d8dcc..ea75310aa6 100644 --- a/include/MySQL_HostGroups_Manager.h +++ b/include/MySQL_HostGroups_Manager.h @@ -814,6 +814,24 @@ class MySQL_HostGroups_Manager { void set_server_current_latency_us(char *hostname, int port, unsigned int _current_latency_us); unsigned long long Get_Memory_Stats(); + struct serverDetails { + long int hostgroup_id; + string hostname; + uint16_t port; + uint16_t gtid_port; + string status; + int64_t weight; + unsigned int compression; + int64_t max_connections; + unsigned int max_replication_lag; + int32_t use_ssl; + unsigned int max_latency_ms; + string comment; + }; + + int add_discovered_servers_to_mysql_servers_and_replication_hostgroups(vector servers_to_add, unordered_map hostname_values_mapping); + void rebuild_hostname_hostgroup_mapping(); + void update_group_replication_set_offline(char *_hostname, int _port, int _writer_hostgroup, char *error); void update_group_replication_set_read_only(char *_hostname, int _port, int _writer_hostgroup, char *error); void update_group_replication_set_writer(char *_hostname, int _port, int _writer_hostgroup); diff --git a/include/MySQL_Monitor.hpp b/include/MySQL_Monitor.hpp index 25388264f3..1d5eb3bc82 100644 --- a/include/MySQL_Monitor.hpp +++ b/include/MySQL_Monitor.hpp @@ -56,6 +56,8 @@ struct cmp_str { #define N_L_ASE 16 +#define AWS_ENDPOINT_SUFFIX_STRING "rds.amazonaws.com" + /* Implementation of monitoring in AWS Aurora will be different than previous modules @@ -418,6 +420,8 @@ class MySQL_Monitor { static bool update_dns_cache_from_mysql_conn(const MYSQL* mysql); static void trigger_dns_cache_update(); + vector discover_topology(const char* hostname, int port); + void discover_topology_and_add_to_mysql_servers(); private: std::vector *tables_defs_monitor; diff --git a/include/MySQL_Thread.h b/include/MySQL_Thread.h index e1915a549a..20e39a504d 100644 --- a/include/MySQL_Thread.h +++ 
b/include/MySQL_Thread.h @@ -305,6 +305,7 @@ struct p_th_gauge { mysql_monitor_ping_interval, mysql_monitor_ping_timeout, mysql_monitor_ping_max_failures, + mysql_monitor_topology_discovery_interval, mysql_monitor_read_only_interval, mysql_monitor_read_only_timeout, mysql_monitor_writer_is_also_reader, @@ -386,6 +387,8 @@ class MySQL_Threads_Handler int monitor_ping_max_failures; //! Monitor ping timeout. Unit: 'ms'. int monitor_ping_timeout; + //! Monitor topology discovery interval. Unit: 'one discovery check per X monitor_read_only checks'. + int monitor_topology_discovery_interval; //! Monitor read only timeout. Unit: 'ms'. int monitor_read_only_interval; //! Monitor read only timeout. Unit: 'ms'. diff --git a/include/proxysql_structs.h b/include/proxysql_structs.h index d2abf393ea..56e55682c1 100644 --- a/include/proxysql_structs.h +++ b/include/proxysql_structs.h @@ -899,6 +899,7 @@ __thread int mysql_thread___monitor_connect_timeout; __thread int mysql_thread___monitor_ping_interval; __thread int mysql_thread___monitor_ping_max_failures; __thread int mysql_thread___monitor_ping_timeout; +__thread int mysql_thread___monitor_topology_discovery_interval; __thread int mysql_thread___monitor_read_only_interval; __thread int mysql_thread___monitor_read_only_timeout; __thread int mysql_thread___monitor_read_only_max_timeout_count; @@ -1068,6 +1069,7 @@ extern __thread int mysql_thread___monitor_connect_timeout; extern __thread int mysql_thread___monitor_ping_interval; extern __thread int mysql_thread___monitor_ping_max_failures; extern __thread int mysql_thread___monitor_ping_timeout; +extern __thread int mysql_thread___monitor_topology_discovery_interval; extern __thread int mysql_thread___monitor_read_only_interval; extern __thread int mysql_thread___monitor_read_only_timeout; extern __thread int mysql_thread___monitor_read_only_max_timeout_count; diff --git a/include/proxysql_utils.h b/include/proxysql_utils.h index ffe3fe1bba..2984648c38 100644 --- 
a/include/proxysql_utils.h +++ b/include/proxysql_utils.h @@ -218,4 +218,6 @@ void close_all_non_term_fd(std::vector excludeFDs); */ std::pair get_dollar_quote_error(const char* version); +long parseLong(const char* s); + #endif diff --git a/lib/MySQL_HostGroups_Manager.cpp b/lib/MySQL_HostGroups_Manager.cpp index 80f3608271..90e8994437 100644 --- a/lib/MySQL_HostGroups_Manager.cpp +++ b/lib/MySQL_HostGroups_Manager.cpp @@ -1969,65 +1969,7 @@ bool MySQL_HostGroups_Manager::commit( if (hgsm_mysql_servers_checksum != table_resultset_checksum[HGM_TABLES::MYSQL_SERVERS] || hgsm_mysql_replication_hostgroups_checksum != table_resultset_checksum[HGM_TABLES::MYSQL_REPLICATION_HOSTGROUPS]) { - proxy_info("Rebuilding 'Hostgroup_Manager_Mapping' due to checksums change - mysql_servers { old: 0x%lX, new: 0x%lX }, mysql_replication_hostgroups { old:0x%lX, new:0x%lX }\n", - hgsm_mysql_servers_checksum, table_resultset_checksum[HGM_TABLES::MYSQL_SERVERS], - hgsm_mysql_replication_hostgroups_checksum, table_resultset_checksum[HGM_TABLES::MYSQL_REPLICATION_HOSTGROUPS]); - - char* error = NULL; - int cols = 0; - int affected_rows = 0; - SQLite3_result* resultset = NULL; - - const char* query = "SELECT DISTINCT hostname, port, '1' is_writer, status, reader_hostgroup, writer_hostgroup, mem_pointer FROM mysql_replication_hostgroups JOIN mysql_servers ON hostgroup_id=writer_hostgroup WHERE status<>3 \ - UNION \ - SELECT DISTINCT hostname, port, '0' is_writer, status, reader_hostgroup, writer_hostgroup, mem_pointer FROM mysql_replication_hostgroups JOIN mysql_servers ON hostgroup_id=reader_hostgroup WHERE status<>3 \ - ORDER BY hostname, port"; - - mydb->execute_statement(query, &error, &cols, &affected_rows, &resultset); - - hostgroup_server_mapping.clear(); - - if (resultset && resultset->rows_count) { - std::string fetched_server_id; - HostGroup_Server_Mapping* fetched_server_mapping = NULL; - - for (std::vector::iterator it = resultset->rows.begin(); it != resultset->rows.end(); 
++it) { - SQLite3_row* r = *it; - - const std::string& server_id = std::string(r->fields[0]) + ":::" + r->fields[1]; - - if (fetched_server_mapping == NULL || server_id != fetched_server_id) { - - auto itr = hostgroup_server_mapping.find(server_id); - - if (itr == hostgroup_server_mapping.end()) { - std::unique_ptr server_mapping(new HostGroup_Server_Mapping(this)); - fetched_server_mapping = server_mapping.get(); - hostgroup_server_mapping.insert( std::pair> { - server_id, std::move(server_mapping) - } ); - } - else { - fetched_server_mapping = itr->second.get(); - } - - fetched_server_id = server_id; - } - - HostGroup_Server_Mapping::Node node; - //node.server_status = static_cast(atoi(r->fields[3])); - node.reader_hostgroup_id = atoi(r->fields[4]); - node.writer_hostgroup_id = atoi(r->fields[5]); - node.srv = reinterpret_cast(atoll(r->fields[6])); - - HostGroup_Server_Mapping::Type type = (r->fields[2] && r->fields[2][0] == '1') ? HostGroup_Server_Mapping::Type::WRITER : HostGroup_Server_Mapping::Type::READER; - fetched_server_mapping->add(type, node); - } - } - delete resultset; - - hgsm_mysql_servers_checksum = table_resultset_checksum[HGM_TABLES::MYSQL_SERVERS]; - hgsm_mysql_replication_hostgroups_checksum = table_resultset_checksum[HGM_TABLES::MYSQL_REPLICATION_HOSTGROUPS]; + rebuild_hostname_hostgroup_mapping(); } ev_async_send(gtid_ev_loop, gtid_ev_async); @@ -7977,3 +7919,120 @@ void MySQL_HostGroups_Manager::HostGroup_Server_Mapping::remove_HGM(MySrvC* srv) srv->status = MYSQL_SERVER_STATUS_OFFLINE_HARD; srv->ConnectionsFree->drop_all_connections(); } + +/** +* @brief Updates replication hostgroups by adding autodiscovered mysql servers. +* @details Adds each server from 'servers_to_add' to the 'runtime_mysql_servers' table. +* We then rebuild the 'mysql_servers' table as well as the internal 'hostname_hostgroup_mapping'. +* @param servers_to_add A vector containing the strings representing the hostnames of servers to add to 'mysql_servers'. 
+* @param hostname_values_mapping A mapping containing the hostname of the discovered server mapped to the metadata of server from which it was discovered, stored in a 'serverDetails' struct. +* +* @return Returns EXIT_FAILURE code on failure and EXIT_SUCCESS code on success. +*/ +int MySQL_HostGroups_Manager::add_discovered_servers_to_mysql_servers_and_replication_hostgroups(vector servers_to_add, unordered_map hostname_values_mapping) { + int exit_code = EXIT_SUCCESS; + wrlock(); + + try { + for (string host : servers_to_add) { + long int hostgroup_id = hostname_values_mapping[host].hostgroup_id; + uint16_t port = hostname_values_mapping[host].port; + + uint16_t gtid_port = hostname_values_mapping[host].gtid_port; + int64_t weight = hostname_values_mapping[host].weight; + unsigned int compression = hostname_values_mapping[host].compression; + int64_t max_connections = hostname_values_mapping[host].max_connections; + unsigned int max_replication_lag = hostname_values_mapping[host].max_replication_lag; + int32_t use_ssl = hostname_values_mapping[host].use_ssl; + unsigned int max_latency_ms = hostname_values_mapping[host].max_latency_ms; + + MySrvC* mysrvc = new MySrvC( + const_cast(host.c_str()), port, gtid_port, weight, MYSQL_SERVER_STATUS_ONLINE, + compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, const_cast("Discovered endpoint") + ); + add(mysrvc, hostgroup_id); + + proxy_info( + "Adding new discovered server %s:%d with: hostgroup=%d, weight=%ld, max_connections=%ld, use_ssl=%d\n", + host.c_str(), port, hostgroup_id, mysrvc->weight, mysrvc->max_connections, mysrvc->use_ssl + ); + } + + purge_mysql_servers_table(); + mydb->execute("DELETE FROM mysql_servers"); + proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "DELETE FROM mysql_servers\n"); + generate_mysql_servers_table(); + update_table_mysql_servers_for_monitor(false); + rebuild_hostname_hostgroup_mapping(); + + } catch (...) 
{ + exit_code = EXIT_FAILURE; + } + + wrunlock(); + return exit_code; +} + +/** +* @brief Rebuilds the 'hostname_hostgroup_mapping' +* @details Rebuilds the internal 'hostname_hostgroup_mapping' assuming new data has been entered +* and calculates new checksums for 'mysql_servers' and 'mysql_replication_hostgroups'. +*/ +void MySQL_HostGroups_Manager::rebuild_hostname_hostgroup_mapping() { + proxy_info("Rebuilding 'Hostgroup_Manager_Mapping' due to checksums change - mysql_servers { old: 0x%lX, new: 0x%lX }, mysql_replication_hostgroups { old:0x%lX, new:0x%lX }\n", + hgsm_mysql_servers_checksum, table_resultset_checksum[HGM_TABLES::MYSQL_SERVERS], + hgsm_mysql_replication_hostgroups_checksum, table_resultset_checksum[HGM_TABLES::MYSQL_REPLICATION_HOSTGROUPS]); + + char* error = NULL; + int cols = 0; + int affected_rows = 0; + SQLite3_result* resultset = NULL; + + const char* query = "SELECT DISTINCT hostname, port, '1' is_writer, status, reader_hostgroup, writer_hostgroup, mem_pointer FROM mysql_replication_hostgroups JOIN mysql_servers ON hostgroup_id=writer_hostgroup WHERE status<>3 \ + UNION \ + SELECT DISTINCT hostname, port, '0' is_writer, status, reader_hostgroup, writer_hostgroup, mem_pointer FROM mysql_replication_hostgroups JOIN mysql_servers ON hostgroup_id=reader_hostgroup WHERE status<>3 \ + ORDER BY hostname, port"; + + mydb->execute_statement(query, &error, &cols, &affected_rows, &resultset); + + hostgroup_server_mapping.clear(); + + if (resultset && resultset->rows_count) { + std::string fetched_server_id; + HostGroup_Server_Mapping* fetched_server_mapping = NULL; + + for (std::vector::iterator it = resultset->rows.begin(); it != resultset->rows.end(); it++) { + SQLite3_row *r = *it; + + const std::string& server_id = std::string(r->fields[0]) + ":::" + r->fields[1]; + + if (fetched_server_mapping == NULL || server_id != fetched_server_id) { + auto itr = hostgroup_server_mapping.find(server_id); + + if (itr == hostgroup_server_mapping.end()) { + 
std::unique_ptr server_mapping(new HostGroup_Server_Mapping(this)); + fetched_server_mapping = server_mapping.get(); + hostgroup_server_mapping.insert( std::pair> { + server_id, std::move(server_mapping) + } ); + } else { + fetched_server_mapping = itr->second.get(); + } + + fetched_server_id = server_id; + } + + HostGroup_Server_Mapping::Node node; + node.reader_hostgroup_id = atoi(r->fields[4]); + node.writer_hostgroup_id = atoi(r->fields[5]); + node.srv = reinterpret_cast(atoll(r->fields[6])); + + HostGroup_Server_Mapping::Type type = (r->fields[2] && r->fields[2][0] == '1') ? HostGroup_Server_Mapping::Type::WRITER : HostGroup_Server_Mapping::Type::READER; + fetched_server_mapping->add(type, node); + } + } + delete resultset; + + hgsm_mysql_servers_checksum = table_resultset_checksum[HGM_TABLES::MYSQL_SERVERS]; + hgsm_mysql_replication_hostgroups_checksum = table_resultset_checksum[HGM_TABLES::MYSQL_REPLICATION_HOSTGROUPS]; +} \ No newline at end of file diff --git a/lib/MySQL_Monitor.cpp b/lib/MySQL_Monitor.cpp index 2ec9822433..536d312dbd 100644 --- a/lib/MySQL_Monitor.cpp +++ b/lib/MySQL_Monitor.cpp @@ -3256,6 +3256,240 @@ VALGRIND_ENABLE_ERROR_REPORTING; return ret; } + +/** +* @brief Discovers the topology of a server. +* @details Discovers the topology of the server specified by hostname and port. +* The monitor user must explicitly be granted permissions to view 'mysql.rds_topology'. +* @param hostname Hostname of the server. +* @param port Server port. +* +* @return Returns a vector of 'MYSQL_ROW' objects which contain the discovered servers. 
+*/ +vector MySQL_Monitor::discover_topology(const char* hostname, int port) { + std::unique_ptr mmsd(new MySQL_Monitor_State_Data(MON_CONNECT, const_cast (hostname), port)); + mmsd->mondb = monitordb; + mmsd->mysql = My_Conn_Pool->get_connection(mmsd->hostname, mmsd->port, mmsd.get()); + + unsigned long long start_time = monotonic_time(); + mmsd->t1=start_time; + + bool read_only_success = false; + bool crc = false; + if (mmsd->mysql == NULL) { // we don't have a connection, let's create it + bool rc; + rc = mmsd->create_new_connection(); + if (mmsd->mysql) { + GloMyMon->My_Conn_Pool->conn_register(mmsd.get()); + } + crc = true; + if (rc == false) { + unsigned long long now = monotonic_time(); + char *new_error = (char *) malloc(50 + strlen(mmsd->mysql_error_msg)); + snprintf(new_error, sizeof(mmsd->mysql_error_msg), "timeout on creating new connection: %s", mmsd->mysql_error_msg); + free(mmsd->mysql_error_msg); + mmsd->mysql_error_msg = new_error; + proxy_error("Timeout on discover_topology check for %s:%d after %lldms. Unable to create a connection. If the server is overload, increase mysql-monitor_connect_timeout. Error: %s.\n", mmsd->hostname, mmsd->port, (now-mmsd->t1)/1000, new_error); + MyHGM->p_update_mysql_error_counter(p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port, ER_PROXYSQL_READ_ONLY_CHECK_CONN_TIMEOUT); + goto __exit_monitor_discover_topology; + } + } + + mmsd->interr = 0; // reset the value + mmsd->async_exit_status = mysql_query_start(&mmsd->interr,mmsd->mysql, "SELECT * from mysql.rds_topology"); + while (mmsd->async_exit_status) { + const unsigned long long now = monotonic_time(); + mmsd->async_exit_status = wait_for_mysql(mmsd->mysql, mmsd->async_exit_status); + + if (now > mmsd->t1 + mysql_thread___monitor_read_only_timeout * 1000) { + mmsd->mysql_error_msg = strdup("timeout check"); + proxy_error("Timeout on discover_topology check for %s:%d after %lldms. 
If the server is overload, increase mysql-monitor_read_only_timeout.\n", mmsd->hostname, mmsd->port, (now-mmsd->t1)/1000); + MyHGM->p_update_mysql_error_counter(p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port, ER_PROXYSQL_READ_ONLY_CHECK_TIMEOUT); + goto __exit_monitor_discover_topology; + } + + if (mmsd->interr) { + // error during query + mmsd->mysql_error_msg = strdup(mysql_error(mmsd->mysql)); + MyHGM->p_update_mysql_error_counter(p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port, mysql_errno(mmsd->mysql)); + goto __exit_monitor_discover_topology; + } + + if ((mmsd->async_exit_status & MYSQL_WAIT_TIMEOUT) == 0) { + mmsd->async_exit_status = mysql_query_cont(&mmsd->interr, mmsd->mysql, mmsd->async_exit_status); + } + } + + if (mmsd->interr) { + // error during query + mmsd->mysql_error_msg = strdup(mysql_error(mmsd->mysql)); + MyHGM->p_update_mysql_error_counter(p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port, mysql_errno(mmsd->mysql)); + goto __exit_monitor_discover_topology; + } + + mmsd->async_exit_status = mysql_store_result_start(&mmsd->result,mmsd->mysql); + while (mmsd->async_exit_status && ((mmsd->async_exit_status & MYSQL_WAIT_TIMEOUT) == 0)) { + mmsd->async_exit_status = wait_for_mysql(mmsd->mysql, mmsd->async_exit_status); + const unsigned long long now = monotonic_time(); + + if (now > mmsd->t1 + mysql_thread___monitor_read_only_timeout * 1000) { + mmsd->mysql_error_msg = strdup("timeout check"); + proxy_error("Timeout on discover_topology check for %s:%d after %lldms. 
If the server is overload, increase mysql-monitor_read_only_timeout.\n", mmsd->hostname, mmsd->port, (now-mmsd->t1)/1000); + MyHGM->p_update_mysql_error_counter(p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port, ER_PROXYSQL_READ_ONLY_CHECK_TIMEOUT); + goto __exit_monitor_discover_topology; + } + + if ((mmsd->async_exit_status & MYSQL_WAIT_TIMEOUT) == 0) { + mmsd->async_exit_status = mysql_store_result_cont(&mmsd->result, mmsd->mysql, mmsd->async_exit_status); + } + } + + if (mmsd->interr) { // ping failed + mmsd->mysql_error_msg = strdup(mysql_error(mmsd->mysql)); + MyHGM->p_update_mysql_error_counter(p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port, mysql_errno(mmsd->mysql)); + } + +__exit_monitor_discover_topology: + if (mmsd->mysql) { + // if we reached here we didn't put the connection back + if (mmsd->mysql_error_msg) { + MyHGM->p_update_mysql_error_counter(p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port, mysql_errno(mmsd->mysql)); + GloMyMon->My_Conn_Pool->conn_unregister(mmsd.get()); + mysql_close(mmsd->mysql); // if we reached here we should destroy it + mmsd->mysql = NULL; + } else { + if (crc) { + bool rc = mmsd->set_wait_timeout(); + if (rc) { + GloMyMon->My_Conn_Pool->put_connection(mmsd->hostname, mmsd->port, mmsd->mysql); + } else { + MyHGM->p_update_mysql_error_counter(p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port, mysql_errno(mmsd->mysql)); + GloMyMon->My_Conn_Pool->conn_unregister(mmsd.get()); + mysql_close(mmsd->mysql); // set_wait_timeout failed + } + mmsd->mysql = NULL; + } else { // really not sure how we reached here, drop it + MyHGM->p_update_mysql_error_counter(p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port, mysql_errno(mmsd->mysql)); + GloMyMon->My_Conn_Pool->conn_unregister(mmsd.get()); + mysql_close(mmsd->mysql); + mmsd->mysql = NULL; + } + } + } + + // Process the output of the query, if any + 
vector discovered_rows; + if (mmsd->result) { + MYSQL_FIELD *fields = mysql_fetch_fields(mmsd->result); + int num_fields = mysql_num_fields(mmsd->result); + + for (int i = 0; i < num_fields; i++) { + MYSQL_ROW curr_row = mysql_fetch_row(mmsd->result); + string discovered_hostname = curr_row[1]; + string discovered_port = curr_row[2]; + + if (strcmp(hostname, curr_row[1]) != 0) { + discovered_rows.push_back(curr_row); + } + } + + mysql_free_result(mmsd->result); + mmsd->result = NULL; + } else { + proxy_info("Unable to query for topology.\n"); + } + + return discovered_rows; +} + +/** +* @brief Discovers the topology of a server and adds the discovered servers to 'mysql_servers'. +* @details Helper method which calls the 'discover_topology' method as well as +* 'MySQL_HostGroups_Manager::add_discovered_servers_to_mysql_servers_and_replication_hostgroups' in order to discover topology +* and then add it to 'mysql_servers'. +*/ +void MySQL_Monitor::discover_topology_and_add_to_mysql_servers() { + char *error = NULL; + int cols = 0; + int affected_rows = 0; + SQLite3_result *runtime_mysql_servers = NULL; + + char *query=(char *)"SELECT hostgroup_id,hostname,port,gtid_port,status,weight,compression,max_connections,max_replication_lag,use_ssl,max_latency_ms,comment FROM main.runtime_mysql_servers ORDER BY hostgroup_id, hostname, port"; + proxy_debug(PROXY_DEBUG_ADMIN, 4, "%s\n", query); + admindb->execute_statement(query, &error , &cols , &affected_rows , &runtime_mysql_servers); + + if (error) { + proxy_error("Error on %s : %s\n", query, error); + } else { + set existing_servers; + vector servers_to_add; + unordered_map hostname_values_mapping; + + // Do an initial loop through query results to keep track of existing server hostnames + for (std::vector::iterator it = runtime_mysql_servers->rows.begin(); it != runtime_mysql_servers->rows.end(); it++) { + SQLite3_row *r1 = *it; + + string current_hostname = r1->fields[1]; + if (std::find(existing_servers.begin(), 
existing_servers.end(), current_hostname) == existing_servers.end()) { + existing_servers.insert(current_hostname); + } + } + + // Discover topology for each server in runtime_mysql_servers that have an aws endpoint + for (std::vector::iterator it = runtime_mysql_servers->rows.begin(); it != runtime_mysql_servers->rows.end(); it++) { + SQLite3_row *r1 = *it; + + long int hostgroup = parseLong(r1->fields[0]); + string current_hostname = r1->fields[1]; + long int port = parseLong(r1->fields[2]); + + if (current_hostname.find(AWS_ENDPOINT_SUFFIX_STRING) != std::string::npos) { + vector discovered_servers = GloMyMon->discover_topology(current_hostname.c_str(), port); + + if (!discovered_servers.empty()) { + for (MYSQL_ROW s: discovered_servers) { + vector value_vector; + string discovered_id = s[0]; + string discovered_hostname = s[1]; + string discovered_port = s[2]; + + // Add discovered servers that don't already exist in runtime_mysql_servers + if (std::find(existing_servers.begin(), existing_servers.end(), discovered_hostname) == existing_servers.end()) { + servers_to_add.push_back(discovered_hostname); + + MySQL_HostGroups_Manager::serverDetails original_server_values = { + parseLong(r1->fields[0]), // hostgroup_id + r1->fields[1], // hostname + parseLong(discovered_port.c_str()), // port, use from topology discovery instead of from originating server + parseLong(r1->fields[3]), // gtid_port + r1->fields[4], // status, but not using it + parseLong(r1->fields[5]), // weight + parseLong(r1->fields[6]), // compression + parseLong(r1->fields[7]), // max_connections + parseLong(r1->fields[8]), // max_replication_lag + parseLong(r1->fields[9]), // use_ssl + parseLong(r1->fields[10]), // max_latency_ms + r1->fields[11] // comment, but not using it + }; + + hostname_values_mapping[discovered_hostname] = original_server_values; + } + } + } + } + } + if (!servers_to_add.empty()) { + int successfully_added_all_servers = 
MyHGM->add_discovered_servers_to_mysql_servers_and_replication_hostgroups(servers_to_add, hostname_values_mapping); + + if (successfully_added_all_servers == EXIT_FAILURE) { + proxy_info("Inserting auto-discovered servers failed.\n"); + } else { + proxy_info("Inserting auto-discovered servers succeeded.\n"); + } + } + } +} + void * MySQL_Monitor::monitor_read_only() { mysql_close(mysql_init(NULL)); // initialize the MySQL Thread (note: this is not a real thread, just the structures associated with it) @@ -3269,6 +3503,8 @@ void * MySQL_Monitor::monitor_read_only() { unsigned long long t1; unsigned long long t2; unsigned long long next_loop_at=0; + int topology_loop = 0; + int topology_loop_max = mysql_thread___monitor_topology_discovery_interval; while (GloMyMon->shutdown==false && mysql_thread___monitor_enabled==true) { @@ -3287,6 +3523,18 @@ void * MySQL_Monitor::monitor_read_only() { next_loop_at=0; } + if (topology_loop >= topology_loop_max) { + try { + discover_topology_and_add_to_mysql_servers(); + topology_loop = 0; + } catch (std::runtime_error &e) { + proxy_error("Error during topology auto-discovery: %s\n", e.what()); + } catch (...) 
{ + proxy_error("Unknown error during topology auto-discovery.\n"); + } + } + topology_loop += 1; + if (t1 < next_loop_at) { goto __sleep_monitor_read_only; } diff --git a/lib/MySQL_Thread.cpp b/lib/MySQL_Thread.cpp index 945ebe332f..1efbbfb191 100644 --- a/lib/MySQL_Thread.cpp +++ b/lib/MySQL_Thread.cpp @@ -307,6 +307,7 @@ static char * mysql_thread_variables_names[]= { (char *)"monitor_ping_interval", (char *)"monitor_ping_max_failures", (char *)"monitor_ping_timeout", + (char *)"monitor_topology_discovery_interval", (char *)"monitor_read_only_interval", (char *)"monitor_read_only_timeout", (char *)"monitor_read_only_max_timeout_count", @@ -822,6 +823,12 @@ th_metrics_map = std::make_tuple( "Reached maximum ping attempts from monitor.", metric_tags {} ), + std::make_tuple ( + p_th_gauge::mysql_monitor_topology_discovery_interval, + "proxysql_mysql_monitor_topology_discovery_interval", + "How frequently a topology discovery is performed, e.g. a value of 500 means one topology discovery every 500 read-only checks ", + metric_tags {} + ), std::make_tuple ( p_th_gauge::mysql_monitor_read_only_interval, "proxysql_mysql_monitor_read_only_interval_seconds", @@ -912,6 +919,7 @@ MySQL_Threads_Handler::MySQL_Threads_Handler() { variables.monitor_ping_interval=8000; variables.monitor_ping_max_failures=3; variables.monitor_ping_timeout=1000; + variables.monitor_topology_discovery_interval=1000; variables.monitor_read_only_interval=1000; variables.monitor_read_only_timeout=800; variables.monitor_read_only_max_timeout_count=3; @@ -2021,6 +2029,7 @@ char ** MySQL_Threads_Handler::get_variables_list() { VariablesPointers_int["monitor_ping_timeout"] = make_tuple(&variables.monitor_ping_timeout, 100, 600*1000, false); VariablesPointers_int["monitor_ping_max_failures"] = make_tuple(&variables.monitor_ping_max_failures, 1, 1000*1000, false); + VariablesPointers_int["monitor_topology_discovery_interval"] = make_tuple(&variables.monitor_topology_discovery_interval, 1, 100000, false); 
VariablesPointers_int["monitor_read_only_interval"] = make_tuple(&variables.monitor_read_only_interval, 100, 7*24*3600*1000, false); VariablesPointers_int["monitor_read_only_timeout"] = make_tuple(&variables.monitor_read_only_timeout, 100, 600*1000, false); VariablesPointers_int["monitor_read_only_max_timeout_count"] = make_tuple(&variables.monitor_read_only_max_timeout_count, 1, 1000*1000, false); @@ -3922,6 +3931,7 @@ void MySQL_Thread::refresh_variables() { mysql_thread___monitor_ping_interval=GloMTH->get_variable_int((char *)"monitor_ping_interval"); mysql_thread___monitor_ping_max_failures=GloMTH->get_variable_int((char *)"monitor_ping_max_failures"); mysql_thread___monitor_ping_timeout=GloMTH->get_variable_int((char *)"monitor_ping_timeout"); + mysql_thread___monitor_topology_discovery_interval=GloMTH->get_variable_int((char *)"monitor_topology_discovery_interval"); mysql_thread___monitor_read_only_interval=GloMTH->get_variable_int((char *)"monitor_read_only_interval"); mysql_thread___monitor_read_only_timeout=GloMTH->get_variable_int((char *)"monitor_read_only_timeout"); mysql_thread___monitor_read_only_max_timeout_count=GloMTH->get_variable_int((char *)"monitor_read_only_max_timeout_count"); @@ -5201,6 +5211,7 @@ void MySQL_Threads_Handler::p_update_metrics() { this->status_variables.p_gauge_array[p_th_gauge::mysql_monitor_enabled]->Set(this->variables.monitor_enabled); this->status_variables.p_gauge_array[p_th_gauge::mysql_monitor_ping_timeout]->Set(this->variables.monitor_ping_timeout/1000.0); this->status_variables.p_gauge_array[p_th_gauge::mysql_monitor_ping_max_failures]->Set(this->variables.monitor_ping_max_failures); + this->status_variables.p_gauge_array[p_th_gauge::mysql_monitor_topology_discovery_interval]->Set(this->variables.monitor_topology_discovery_interval); this->status_variables.p_gauge_array[p_th_gauge::mysql_monitor_read_only_interval]->Set(this->variables.monitor_read_only_interval/1000.0); 
/**
* @brief Parses a string into a long.
* @details Parses a string into a long, with error checks. The whole string must be consumed by the
*  conversion. The base is auto-detected by 'strtol' (base 0), so a "0x" prefix is read as hexadecimal
*  and a leading "0" as octal.
* @param s The string to parse. A NULL pointer is rejected like any other unparsable input.
*
* @return The parsed value of the string as a long.
* @throws std::runtime_error If 's' is NULL, holds no digits, has trailing characters or the value
*  does not fit in a long.
*/
long parseLong(const char* s) {
	if (s == NULL) {
		// strtol() must not be called with a NULL string
		throw std::runtime_error("Could not parse long.");
	}

	errno = 0;
	char *end = NULL;
	const long val = strtol(s, &end, 0);

	const bool no_digits = (end == s);
	const bool trailing_characters = (*end != '\0');
	const bool out_of_range = (errno == ERANGE); // set by strtol() on overflow/underflow

	if (no_digits || trailing_characters || out_of_range) {
		throw std::runtime_error("Could not parse long.");
	}

	return val;
}