diff --git a/Makefile b/Makefile index 4974ff03a8..85e8d3de4b 100644 --- a/Makefile +++ b/Makefile @@ -74,10 +74,13 @@ default: build_src .PHONY: debug debug: build_src_debug +.PHONY: testaurora_random +testaurora_random: build_src_testaurora_random + .PHONY: testaurora testaurora: build_src_testaurora - cd test/tap && OPTZ="${O0} -ggdb -DDEBUG -DTEST_AURORA" CC=${CC} CXX=${CXX} ${MAKE} - cd test/tap/tests && OPTZ="${O0} -ggdb -DDEBUG -DTEST_AURORA" CC=${CC} CXX=${CXX} ${MAKE} $(MAKECMDGOALS) + # cd test/tap && OPTZ="${O0} -ggdb -DDEBUG -DTEST_AURORA" CC=${CC} CXX=${CXX} ${MAKE} + # cd test/tap/tests && OPTZ="${O0} -ggdb -DDEBUG -DTEST_AURORA" CC=${CC} CXX=${CXX} ${MAKE} $(MAKECMDGOALS) .PHONY: testgalera testgalera: build_src_testgalera @@ -128,10 +131,18 @@ build_lib_debug: build_deps_debug build_src_testaurora: build_lib_testaurora cd src && OPTZ="${O0} -ggdb -DDEBUG -DTEST_AURORA" CC=${CC} CXX=${CXX} ${MAKE} +.PHONY: build_src_testaurora_random +build_src_testaurora_random: build_lib_testaurora_random + cd src && OPTZ="${O0} -ggdb -DDEBUG -DTEST_AURORA -DTEST_AURORA_RANDOM" CC=${CC} CXX=${CXX} ${MAKE} + .PHONY: build_lib_testaurora build_lib_testaurora: build_deps_debug cd lib && OPTZ="${O0} -ggdb -DDEBUG -DTEST_AURORA" CC=${CC} CXX=${CXX} ${MAKE} +.PHONY: build_lib_testaurora_random +build_lib_testaurora_random: build_deps_debug + cd lib && OPTZ="${O0} -ggdb -DDEBUG -DTEST_AURORA -DTEST_AURORA_RANDOM" CC=${CC} CXX=${CXX} ${MAKE} + .PHONY: build_src_testgalera build_src_testgalera: build_lib_testgalera cd src && OPTZ="${O0} -ggdb -DDEBUG -DTEST_GALERA" CC=${CC} CXX=${CXX} ${MAKE} diff --git a/include/MySQL_HostGroups_Manager.h b/include/MySQL_HostGroups_Manager.h index 675cf090e2..bb8adaeedf 100644 --- a/include/MySQL_HostGroups_Manager.h +++ b/include/MySQL_HostGroups_Manager.h @@ -432,6 +432,27 @@ enum REPLICATION_LAG_SERVER_T { RLS__SIZE }; +/** + * @brief Contains the minimal info for server creation. 
+ */ +struct srv_info_t { + /* @brief Server address */ + string addr; + /* @brief Server port */ + uint16_t port; + /* @brief Server type identifier, used for logging, e.g: 'Aurora AWS', 'GR', etc... */ + string kind; +}; + +/** + * @brief Contains options to be specified during server creation. + */ +struct srv_opts_t { + int64_t weigth; + int64_t max_conns; + int32_t use_ssl; +}; + class MySQL_HostGroups_Manager { private: SQLite3DB *admindb; @@ -534,8 +555,26 @@ class MySQL_HostGroups_Manager { MySQL_HostGroups_Manager* myHGM; }; + /** + * @brief Used by 'MySQL_Monitor::read_only' to hold a mapping between servers and hostgroups. + * @details The hostgroup mapping holds the MySrvC for each of the hostgroups in which the servers is + * present, distinguishing between 'READER' and 'WRITER' hostgroups. + */ std::unordered_map> hostgroup_server_mapping; + /** + * @brief Holds the previous computed checksum for 'mysql_servers'. + * @details Used to check if the servers checksums has changed during 'commit', if a change is detected, + * the member 'hostgroup_server_mapping' is required to be regenerated. + * + * This is only updated during 'read_only_action_v2', since the action itself modifies + * 'hostgroup_server_mapping' in case any actions needs to be performed against the servers. + */ uint64_t hgsm_mysql_servers_checksum = 0; + /** + * @brief Holds the previous checksum for the 'MYSQL_REPLICATION_HOSTGROUPS'. + * @details Used during 'commit' to determine if config has changed for 'MYSQL_REPLICATION_HOSTGROUPS', + * and 'hostgroup_server_mapping' should be rebuild. + */ uint64_t hgsm_mysql_replication_hostgroups_checksum = 0; @@ -585,6 +624,17 @@ class MySQL_HostGroups_Manager { SQLite3_result *incoming_replication_hostgroups; void generate_mysql_group_replication_hostgroups_table(); + /** + * @brief Regenerates the resultset used by 'MySQL_Monitor' containing the servers to be monitored. 
+ * @details This function is required to be called after any action that results in the addition of a new + * server that 'MySQL_Monitor' should be aware of for 'group_replication', i.e. a server added to the + * hostgroups present in any entry of 'mysql_group_replication_hostgroups'. E.g: + * - Inside 'generate_mysql_group_replication_hostgroups_table'. + * - Autodiscovery. + * + * NOTE: This is a common pattern for all the clusters monitoring. + */ + void generate_mysql_group_replication_hostgroups_monitor_resultset(); SQLite3_result *incoming_group_replication_hostgroups; pthread_mutex_t Group_Replication_Info_mutex; @@ -748,7 +798,10 @@ class MySQL_HostGroups_Manager { void wrlock(); void wrunlock(); int servers_add(SQLite3_result *resultset); + std::string gen_global_mysql_servers_checksum(); bool commit(SQLite3_result* runtime_mysql_servers = nullptr, const std::string& checksum = "", const time_t epoch = 0); + void commit_generate_mysql_servers_table(SQLite3_result* runtime_mysql_servers = nullptr); + void commit_update_checksum_from_mysql_servers(SpookyHash& myhash, bool& init); void commit_update_checksums_from_tables(SpookyHash& myhash, bool& init); void CUCFT1(SpookyHash& myhash, bool& init, const string& TableName, const string& ColumnName, uint64_t& raw_checksum); // used by commit_update_checksums_from_tables() @@ -787,6 +840,34 @@ class MySQL_HostGroups_Manager { MyHGC * MyHGC_lookup(unsigned int); void MyConn_add_to_pool(MySQL_Connection *); + /** + * @brief Creates a new server in the target hostgroup if isn't already present. + * @details If the server is found already in the target hostgroup, no action is taken, unless its status + * is 'OFFLINE_HARD'. In case of finding it as 'OFFLINE_HARD': + * 1. Server hostgroup attributes are reset to known values, so they can be updated. + * 2. Server attributes are updated to either table definition values, or hostgroup 'servers_defaults'. + * 3. Server is bring back as 'ONLINE'. 
+ * @param hid The hostgroup in which the server is to be created (or to bring it back as 'ONLINE'). + * @param srv_info Basic server info to be used during creation. + * @param srv_opts Server creation options. + * @return 0 in case of success, -1 in case of failure. + */ + int create_new_server_in_hg(uint32_t hid, const srv_info_t& srv_info, const srv_opts_t& srv_opts); + /** + * @brief Completely removes server from the target hostgroup if found. + * @details Several actions are taken if server is found: + * - Set the server as 'OFFLINE_HARD'. + * - Drop all current FREE connections to the server. + * - Delete the server from the 'myhgm.mysql_servers' table. + * + * This later step is not required if the caller is already going to perform a full deletion of the + * servers in the target hostgroup. Which is a common operation during table regeneration. + * @param hid Target hostgroup id. + * @param addr Target server address. + * @param port Target server port. + * @return 0 in case of success, -1 in case of failure. + */ + int remove_server_in_hg(uint32_t hid, const string& addr, uint16_t port); MySQL_Connection * get_MyConn_from_pool(unsigned int hid, MySQL_Session *sess, bool ff, char * gtid_uuid, uint64_t gtid_trxid, int max_lag_ms); @@ -812,6 +893,33 @@ class MySQL_HostGroups_Manager { void update_group_replication_set_offline(char *_hostname, int _port, int _writer_hostgroup, char *error); void update_group_replication_set_read_only(char *_hostname, int _port, int _writer_hostgroup, char *error); void update_group_replication_set_writer(char *_hostname, int _port, int _writer_hostgroup); + /** + * @brief Tries to add a new server found during GR autodiscovery to the supplied hostgroup. + * @details For adding the new server, several actions are performed: + * 1. Lookup the target server in the corresponding MyHGC for the supplied hostgroup. + * 2. If server is found, and it's status isn't 'OFFLINE_HARD' do nothing. 
Otherwise: + * - If server is found as 'OFFLINE_HARD', reset the internal values corresponding to + * 'servers_defaults' values to '-1', update the defaulted values to the ones in its 'MyHGC', lastly + * re-enable the server and log the action. + * - If server isn't found, create it in the corresponding reader hostgroup of the supplied writer + * hostgroup, setting all 'servers_defaults' params as '-1', log the action. + * - After any of the two previous actions, always regenerate servers data structures. + * + * NOTE: Server data structures regeneration requires: + * 1. Purging the 'mysql_servers_table' (Lazy removal of 'OFFLINE_HARD' servers.) + * 2. Regenerate the actual 'myhgm::mysql_servers' table from memory structures. + * 3. Update the 'mysql_servers' resultset used for monitoring. This resultset is used for general + * monitoring actions like 'ping', 'connect'. + * 4. Regenerate the specific resultset for 'Group Replication' monitoring. This resultset is the way to + * communicate back to the main monitoring thread that servers config has changed, and a new thread + * shall be created with the new servers config. This same principle is used for Aurora. + * + * @param _host Server address. + * @param _port Server port. + * @param _wr_hg Writer hostgroup of the cluster being monitored. Autodiscovered servers are always added + * to the reader hostgroup by default, later monitoring actions will re-position the server is required. 
+ */ + void update_group_replication_add_autodiscovered(const std::string& _host, int _port, int _wr_hg); void converge_group_replication_config(int _writer_hostgroup); /** * @brief Set the supplied server as SHUNNED, this function shall be called @@ -850,6 +958,19 @@ class MySQL_HostGroups_Manager { bool aws_aurora_replication_lag_action(int _whid, int _rhid, char *server_id, float current_replication_lag_ms, bool enable, bool is_writer, bool verbose=true); void update_aws_aurora_set_writer(int _whid, int _rhid, char *server_id, bool verbose=true); void update_aws_aurora_set_reader(int _whid, int _rhid, char *server_id); + /** + * @brief Updates the resultset and corresponding checksum used by Monitor for AWS Aurora. + * @details This is required to be called when: + * - The 'mysql_aws_aurora_hostgroups' table is regenerated (via 'commit'). + * - When new servers are discovered, and created in already monitored Aurora clusters. + * + * The resultset holds the servers that are present in 'mysql_servers' table, and share hostgroups with + * the **active** clusters specified in 'mysql_aws_aurora_hostgroups'. See query + * 'SELECT_AWS_AURORA_SERVERS_FOR_MONITOR'. + * @param lock Wether if both 'AWS_Aurora_Info_mutex' and 'MySQL_Monitor::aws_aurora_mutex' mutexes should + * be taken or not. + */ + void update_aws_aurora_hosts_monitor_resultset(bool lock=false); SQLite3_result * get_stats_mysql_gtid_executed(); void generate_mysql_gtid_executed_tables(); diff --git a/include/MySQL_Monitor.hpp b/include/MySQL_Monitor.hpp index 25388264f3..8454cf939e 100644 --- a/include/MySQL_Monitor.hpp +++ b/include/MySQL_Monitor.hpp @@ -70,6 +70,14 @@ A single AWS_Aurora_monitor_node will have a AWS_Aurora_status_entry per check. 
*/ +#ifdef TEST_AURORA + +#define TEST_AURORA_MONITOR_BASE_QUERY \ + "SELECT SERVER_ID, SESSION_ID, LAST_UPDATE_TIMESTAMP, REPLICA_LAG_IN_MILLISECONDS, CPU"\ + " FROM REPLICA_HOST_STATUS ORDER BY SERVER_ID " + +#endif + class AWS_Aurora_replica_host_status_entry { public: char * server_id = NULL; @@ -200,6 +208,17 @@ enum class MySQL_Monitor_State_Data_Task_Result { TASK_RESULT_PENDING }; +/** + * @brief Holds the info from a GR server definition. + */ +struct gr_host_def_t { + string host; + int port; + int use_ssl; + bool writer_is_also_reader; + int max_transactions_behind; + int max_transactions_behind_count; +}; class MySQL_Monitor_State_Data { public: @@ -237,6 +256,11 @@ class MySQL_Monitor_State_Data { * @details Currently only used by 'group_replication'. */ uint64_t init_time = 0; + /** + * @brief Used by GroupReplication to determine if servers reported by cluster 'members' are already monitored. + * @details This way we avoid non-needed locking on 'MySQL_HostGroups_Manager' for server search. + */ + const std::vector* cur_monitored_gr_srvs = nullptr; MySQL_Monitor_State_Data(MySQL_Monitor_State_Data_Task_Type task_type, char* h, int p, bool _use_ssl = 0, int g = 0); ~MySQL_Monitor_State_Data(); diff --git a/include/SQLite3_Server.h b/include/SQLite3_Server.h index ce33a6b6ec..fe304c9a94 100644 --- a/include/SQLite3_Server.h +++ b/include/SQLite3_Server.h @@ -5,6 +5,7 @@ #include "proxysql.h" #include "cpp.h" #include +#include class SQLite3_Session { public: @@ -14,7 +15,7 @@ class SQLite3_Session { }; #ifdef TEST_GROUPREP -using group_rep_status = std::tuple; +using group_rep_status = std::tuple; #endif class SQLite3_Server { @@ -70,7 +71,12 @@ class SQLite3_Server { unsigned int num_aurora_servers[3]; unsigned int max_num_aurora_servers; pthread_mutex_t aurora_mutex; - void populate_aws_aurora_table(MySQL_Session *sess); + /** + * @brief Handles queries to table 'REPLICA_HOST_STATUS'. 
+ * @details This function needs to be called with lock on mutex aurora_mutex already acquired. + * @param sess The session which request is to be handled. + */ + void populate_aws_aurora_table(MySQL_Session *sess, uint32_t whg); void init_aurora_ifaces_string(std::string& s); #endif // TEST_AURORA #ifdef TEST_GALERA diff --git a/include/proxysql_admin.h b/include/proxysql_admin.h index 24a365294e..cf1f6981d1 100644 --- a/include/proxysql_admin.h +++ b/include/proxysql_admin.h @@ -544,6 +544,8 @@ class ProxySQL_Admin { #ifdef TEST_AURORA void enable_aurora_testing(); + void enable_aurora_testing_populate_mysql_servers(); + void enable_aurora_testing_populate_mysql_aurora_hostgroups(); #endif // TEST_AURORA #ifdef TEST_GALERA diff --git a/include/proxysql_glovars.hpp b/include/proxysql_glovars.hpp index fe23e756fd..e4ff075fe8 100644 --- a/include/proxysql_glovars.hpp +++ b/include/proxysql_glovars.hpp @@ -5,27 +5,17 @@ #define CLUSTER_SYNC_INTERFACES_MYSQL "('mysql-interfaces')" #include +#include #include #include "configfile.hpp" #include "proxy_defines.h" +#include "proxysql_utils.h" namespace ez { class ezOptionParser; }; -/** - * @brief Helper function used to replace spaces and zeros by '0' char in the supplied checksum buffer. - * @param checksum Input buffer containing the checksum. 
- */ -inline void replace_checksum_zeros(char* checksum) { - for (int i=2; i<18; i++) { - if (checksum[i]==' ' || checksum[i]==0) { - checksum[i]='0'; - } - } -} - #ifndef ProxySQL_Checksum_Value_LENGTH #define ProxySQL_Checksum_Value_LENGTH 20 #endif diff --git a/include/proxysql_utils.h b/include/proxysql_utils.h index eb510357a9..a474955a96 100644 --- a/include/proxysql_utils.h +++ b/include/proxysql_utils.h @@ -2,6 +2,7 @@ #define __PROXYSQL_UTILS_H #include +#include #include #include #include @@ -11,6 +12,12 @@ #include #include +#include "sqlite3db.h" + +#ifndef ProxySQL_Checksum_Value_LENGTH +#define ProxySQL_Checksum_Value_LENGTH 20 +#endif + #ifndef ETIME // ETIME is not defined on FreeBSD // ETIME is used internaly to report API timer expired @@ -206,8 +213,44 @@ uint64_t get_timestamp_us(); */ std::string replace_str(const std::string& str, const std::string& match, const std::string& repl); +/** + * @brief Split a string into a vector of strings with the provided 'char' delimiter. + * @param s String to be split. + * @param delimiter Delimiter to be used. + * @return Vector with the string splits. Empty if none is found. + */ +std::vector split_str(const std::string& s, char delimiter); std::string generate_multi_rows_query(int rows, int params); void close_all_non_term_fd(std::vector excludeFDs); + +/** + * @brief Helper function used to replace spaces and zeros by '0' char in the supplied checksum buffer. + * @param checksum Input buffer containing the checksum. + */ +inline void replace_checksum_zeros(char* checksum) { + for (int i=2; i<18; i++) { + if (checksum[i]==' ' || checksum[i]==0) { + checksum[i]='0'; + } + } +} + +/** + * @brief Generates a ProxySQL checksum as a string from the supplied integer hash. + * @param hash The integer hash to be formated as a string. + * @return String representation of the supplied hash. 
+ */ +std::string get_checksum_from_hash(uint64_t hash); + +/** + * @brief Remove the rows from the resultset matching the supplied predicate. + * @param resultset The resultset which rows are to be removed. + * @param pred Predicate that should return 'true' for the rows to be removed. + */ +void remove_sqlite3_resultset_rows( + std::unique_ptr& resultset, const std::function& pred +); + #endif diff --git a/include/sqlite3db.h b/include/sqlite3db.h index b4d3c52f3b..977a8cc296 100644 --- a/include/sqlite3db.h +++ b/include/sqlite3db.h @@ -1,9 +1,11 @@ #ifndef __CLASS_SQLITE3DB_H #define __CLASS_SQLITE3DB_H +#include #include "sqlite3.h" #undef swap #undef min #undef max +#include #include #define PROXYSQL_SQLITE3DB_PTHREAD_MUTEX diff --git a/lib/MySQL_HostGroups_Manager.cpp b/lib/MySQL_HostGroups_Manager.cpp index 22c13d6398..77172cf66c 100644 --- a/lib/MySQL_HostGroups_Manager.cpp +++ b/lib/MySQL_HostGroups_Manager.cpp @@ -1614,6 +1614,11 @@ void MySQL_HostGroups_Manager::CUCFT1(SpookyHash& myhash, bool& init, const stri } void MySQL_HostGroups_Manager::commit_update_checksums_from_tables(SpookyHash& myhash, bool& init) { + // Always reset the current table values before recomputing + for (size_t i = 0; i < table_resultset_checksum.size(); i++) { + if (i != HGM_TABLES::MYSQL_SERVERS) { table_resultset_checksum[i] = 0; } + } + CUCFT1(myhash,init,"mysql_replication_hostgroups","writer_hostgroup", table_resultset_checksum[HGM_TABLES::MYSQL_REPLICATION_HOSTGROUPS]); CUCFT1(myhash,init,"mysql_group_replication_hostgroups","writer_hostgroup", table_resultset_checksum[HGM_TABLES::MYSQL_GROUP_REPLICATION_HOSTGROUPS]); CUCFT1(myhash,init,"mysql_galera_hostgroups","writer_hostgroup", table_resultset_checksum[HGM_TABLES::MYSQL_GALERA_HOSTGROUPS]); @@ -1621,6 +1626,177 @@ void MySQL_HostGroups_Manager::commit_update_checksums_from_tables(SpookyHash& m CUCFT1(myhash,init,"mysql_hostgroup_attributes","hostgroup_id", 
table_resultset_checksum[HGM_TABLES::MYSQL_HOSTGROUP_ATTRIBUTES]); } +const char MYSQL_SERVERS_CHECKSUM_QUERY[] { + "SELECT hostgroup_id, hostname, port, gtid_port, CASE WHEN status=0 OR status=1 OR status=4 THEN 0 ELSE status END status," + " weight, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment FROM mysql_servers" + " WHERE status<>3 ORDER BY hostgroup_id, hostname, port" +}; + +/** + * @brief Generates a resultset which is used to compute the current 'mysql_servers' checksum. + * @details The resultset should report all servers status as ONLINE(0), with the exception of 'OFFLINE_HARD'. + * Servers with this status should be excluded from the resultset. + * @param mydb The db in which to perform the query, typically 'MySQL_HostGroups_Manager::mydb'. + * @return An SQLite3 resultset for the query 'MYSQL_SERVERS_CHECKSUM_QUERY'. + */ +unique_ptr get_mysql_servers_checksum_resultset(SQLite3DB* mydb) { + char* error = nullptr; + int cols = 0; + int affected_rows = 0; + SQLite3_result* resultset = nullptr; + + mydb->execute_statement(MYSQL_SERVERS_CHECKSUM_QUERY, &error, &cols, &affected_rows, &resultset); + + if (error) { + proxy_error("Checksum generation query for 'mysql_servers' failed with error '%s'\n", error); + assert(0); + } + + return unique_ptr(resultset); +} + +/** + * @brief Generates a resultset holding the current Admin 'runtime_mysql_servers' as reported by Admin. + * @param mydb The db in which to perform the query, typically 'MySQL_HostGroups_Manager::mydb'. + * @return An SQLite3 resultset for the query 'MYHGM_GEN_ADMIN_RUNTIME_SERVERS'. 
+ */ +unique_ptr get_admin_runtime_mysql_servers(SQLite3DB* mydb) { + char* error = nullptr; + int cols = 0; + int affected_rows = 0; + SQLite3_result* resultset = nullptr; + + mydb->execute_statement(MYHGM_GEN_ADMIN_RUNTIME_SERVERS, &error, &cols, &affected_rows, &resultset); + + return unique_ptr(resultset); +} + +/** + * @brief Removes rows with 'OFFLINE_HARD' servers in the supplied resultset. + * @details It assumes that the supplied resultset is generated via 'MYHGM_GEN_ADMIN_RUNTIME_SERVERS'. + * @param resultset The resultset from which rows are to be removed. + */ +void remove_resultset_offline_hard_servers(unique_ptr& resultset) { + if (resultset->columns < 5) { + return; + } + + const auto is_offline = [] (SQLite3_row* row) { + if (strcasecmp(row->fields[4], "OFFLINE_HARD") == 0) { + return true; + } else { + return false; + } + }; + + remove_sqlite3_resultset_rows(resultset, is_offline); +} + +/** + * @brief Updates the global 'mysql_servers' checksum. + * @details If the new computed checksum matches the supplied 'cluster_checksum', the epoch used for the + * checksum, is the supplied epoch instead of current time. This way we ensure the preservation of the + * checksum and epoch fetched from the ProxySQL cluster peer node. + * + * @param new_checksum The new computed checksum by ProxySQL. + * @param old_checksum A checksum, previously fetched from ProxySQL cluster. Should be left empty if the + * update isn't considering this scenario. + * @param epoch The epoch to be preserved in case the supplied 'cluster_checksum' matches the new computed + * checksum. 
+ */ +void update_glovars_mysql_servers_checksum( + const std::string& new_checksum, const std::string& cluster_checksum = "", const time_t epoch = 0 +) { + GloVars.checksums_values.mysql_servers.set_checksum(const_cast(new_checksum.c_str())); + GloVars.checksums_values.mysql_servers.version++; + + time_t t = time(NULL); + bool computed_checksum_matches { + cluster_checksum != "" && GloVars.checksums_values.mysql_servers.checksum == cluster_checksum + }; + + if (epoch != 0 && computed_checksum_matches) { + GloVars.checksums_values.mysql_servers.epoch = epoch; + } else { + GloVars.checksums_values.mysql_servers.epoch = t; + } + + GloVars.checksums_values.updates_cnt++; + GloVars.generate_global_checksum(); + GloVars.epoch_version = t; +} + +void MySQL_HostGroups_Manager::commit_generate_mysql_servers_table(SQLite3_result* runtime_mysql_servers) { + mydb->execute("DELETE FROM mysql_servers"); + generate_mysql_servers_table(); + + if (runtime_mysql_servers == nullptr) { + unique_ptr resultset { get_admin_runtime_mysql_servers(mydb) }; + + // Remove 'OFFLINE_HARD' servers since they are not relevant to propagate to other Cluster nodes, or + // relevant for checksum computation. If this step isn't performed, this could cause mismatching + // checksums between different primary nodes in a ProxySQL cluster, since OFFLINE_HARD servers + // preservation depends on unknown and unpredictable connections conditions. 
+ remove_resultset_offline_hard_servers(resultset); + save_runtime_mysql_servers(resultset.release()); + } else { + save_runtime_mysql_servers(runtime_mysql_servers); + } +} + +void MySQL_HostGroups_Manager::commit_update_checksum_from_mysql_servers(SpookyHash& myhash, bool& init) { + unique_ptr mysrvs_checksum_resultset { get_mysql_servers_checksum_resultset(mydb) }; + + // Reset table checksum value before recomputing + table_resultset_checksum[HGM_TABLES::MYSQL_SERVERS] = 0; + + if (mysrvs_checksum_resultset) { + if (mysrvs_checksum_resultset->rows_count) { + if (init == false) { + init = true; + myhash.Init(19,3); + } + uint64_t hash1_ = mysrvs_checksum_resultset->raw_checksum(); + table_resultset_checksum[HGM_TABLES::MYSQL_SERVERS] = hash1_; + + myhash.Update(&hash1_, sizeof(hash1_)); + proxy_info("Checksum for table %s is 0x%lX\n", "mysql_servers", hash1_); + } + } else { + proxy_info("Checksum for table %s is 0x%lX\n", "mysql_servers", (long unsigned int)0); + } +} + +std::string MySQL_HostGroups_Manager::gen_global_mysql_servers_checksum() { + SpookyHash global_hash; + bool init = false; + + // Regenerate 'mysql_servers' and generate new checksum, initialize the new 'global_hash' + commit_update_checksum_from_mysql_servers(global_hash, init); + + // Complete the hash with the rest of the unchanged modules + for (size_t i = 0; i < table_resultset_checksum.size(); i++) { + uint64_t hash_val = table_resultset_checksum[i]; + + if (i != HGM_TABLES::MYSQL_SERVERS && hash_val != 0) { + if (init == false) { + init = true; + global_hash.Init(19, 3); + } + + global_hash.Update(&hash_val, sizeof(hash_val)); + } + } + + uint64_t hash_1 = 0, hash_2 = 0; + if (init) { + global_hash.Final(&hash_1,&hash_2); + } + + string mysrvs_checksum { get_checksum_from_hash(hash_1) }; + return mysrvs_checksum; +} + bool MySQL_HostGroups_Manager::commit( SQLite3_result* runtime_mysql_servers, const std::string& checksum, const time_t epoch ) { @@ -1873,94 +2049,24 @@ bool 
MySQL_HostGroups_Manager::commit( // Checksums are always generated - 'admin-checksum_*' deprecated { - uint64_t hash1 = 0, hash2 = 0; + commit_generate_mysql_servers_table(runtime_mysql_servers); + SpookyHash myhash; - char buf[ProxySQL_Checksum_Value_LENGTH]; bool init = false; - { - mydb->execute("DELETE FROM mysql_servers"); - generate_mysql_servers_table(); - char *error=NULL; - int cols=0; - int affected_rows=0; - SQLite3_result *resultset=NULL; - char *query=(char *)"SELECT hostgroup_id, hostname, port, gtid_port, CASE status WHEN 0 OR 1 OR 4 THEN 0 ELSE status END status, weight, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment FROM mysql_servers WHERE status<>3 ORDER BY hostgroup_id, hostname, port"; - mydb->execute_statement(query, &error , &cols , &affected_rows , &resultset); - if (runtime_mysql_servers == nullptr) { - char* error = NULL; - int cols = 0; - int affected_rows = 0; - SQLite3_result* resultset = NULL; - - mydb->execute_statement(MYHGM_GEN_ADMIN_RUNTIME_SERVERS, &error, &cols, &affected_rows, &resultset); - - // Remove 'OFFLINE_HARD' servers since they are not relevant to propagate to other Cluster - // nodes, or relevant for checksum computation. 
- const size_t init_row_count = resultset->rows_count; - size_t rm_rows_count = 0; - const auto is_offline_server = [&rm_rows_count] (SQLite3_row* row) { - if (strcasecmp(row->fields[4], "OFFLINE_HARD") == 0) { - rm_rows_count += 1; - return true; - } else { - return false; - } - }; - resultset->rows.erase( - std::remove_if(resultset->rows.begin(), resultset->rows.end(), is_offline_server), - resultset->rows.end() - ); - resultset->rows_count = init_row_count - rm_rows_count; - - save_runtime_mysql_servers(resultset); - } else { - save_runtime_mysql_servers(runtime_mysql_servers); - } - - // reset all checksum - table_resultset_checksum.fill(0); - - if (resultset) { - if (resultset->rows_count) { - if (init == false) { - init = true; - myhash.Init(19,3); - } - uint64_t hash1_ = resultset->raw_checksum(); - - table_resultset_checksum[HGM_TABLES::MYSQL_SERVERS] = hash1_; - - myhash.Update(&hash1_, sizeof(hash1_)); - proxy_info("Checksum for table %s is 0x%lX\n", "mysql_servers", hash1_); - } - delete resultset; - } else { - proxy_info("Checksum for table %s is 0x%lX\n", "mysql_servers", (long unsigned int)0); - } - } + commit_update_checksum_from_mysql_servers(myhash, init); commit_update_checksums_from_tables(myhash, init); + uint64_t hash1 = 0, hash2 = 0; if (init == true) { myhash.Final(&hash1, &hash2); } - uint32_t d32[2]; - memcpy(&d32,&hash1,sizeof(hash1)); - sprintf(buf,"0x%0X%0X", d32[0], d32[1]); + + string new_checksum { get_checksum_from_hash(hash1) }; + proxy_info("New computed global checksum for 'mysql_servers' is '%s'\n", new_checksum.c_str()); + pthread_mutex_lock(&GloVars.checksum_mutex); - GloVars.checksums_values.mysql_servers.set_checksum(buf); - GloVars.checksums_values.mysql_servers.version++; - //struct timespec ts; - //clock_gettime(CLOCK_REALTIME, &ts); - time_t t = time(NULL); - if (epoch != 0 && checksum != "" && GloVars.checksums_values.mysql_servers.checksum == checksum) { - GloVars.checksums_values.mysql_servers.epoch = epoch; - } else { 
- GloVars.checksums_values.mysql_servers.epoch = t; - } - GloVars.checksums_values.updates_cnt++; - GloVars.generate_global_checksum(); - GloVars.epoch_version = t; + update_glovars_mysql_servers_checksum(new_checksum, checksum, epoch); pthread_mutex_unlock(&GloVars.checksum_mutex); } @@ -2427,6 +2533,11 @@ void MySQL_HostGroups_Manager::generate_mysql_group_replication_hostgroups_table // it is now time to build a new structure in Monitor + generate_mysql_group_replication_hostgroups_monitor_resultset(); + pthread_mutex_unlock(&Group_Replication_Info_mutex); +} + +void MySQL_HostGroups_Manager::generate_mysql_group_replication_hostgroups_monitor_resultset() { pthread_mutex_lock(&GloMyMon->group_replication_mutex); { char *error=NULL; @@ -2445,8 +2556,6 @@ void MySQL_HostGroups_Manager::generate_mysql_group_replication_hostgroups_table } } pthread_mutex_unlock(&GloMyMon->group_replication_mutex); - - pthread_mutex_unlock(&Group_Replication_Info_mutex); } void MySQL_HostGroups_Manager::generate_mysql_galera_hostgroups_table() { @@ -3424,23 +3533,13 @@ inline double get_prometheus_counter_val( return current_val; } -void MySQL_HostGroups_Manager::add(MySrvC *mysrvc, unsigned int _hid) { - proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Adding MySrvC %p (%s:%d) for hostgroup %d\n", mysrvc, mysrvc->address, mysrvc->port, _hid); - - // Since metrics for servers are stored per-endpoint; the metrics for a particular endpoint can live longer than the - // 'MySrvC' itself. For example, a failover or a server config change could remove the server from a particular - // hostgroup, and a subsequent one bring it back to the original hostgroup. For this reason, everytime a 'mysrvc' is - // created and added to a particular hostgroup, we update the endpoint metrics for it. 
- std::string endpoint_id { std::to_string(_hid) + ":" + string { mysrvc->address } + ":" + std::to_string(mysrvc->port) }; - - mysrvc->bytes_recv = get_prometheus_counter_val(this->status.p_conn_pool_bytes_data_recv_map, endpoint_id); - mysrvc->bytes_sent = get_prometheus_counter_val(this->status.p_conn_pool_bytes_data_sent_map, endpoint_id); - mysrvc->connect_ERR = get_prometheus_counter_val(this->status.p_connection_pool_conn_err_map, endpoint_id); - mysrvc->connect_OK = get_prometheus_counter_val(this->status.p_connection_pool_conn_ok_map, endpoint_id); - mysrvc->queries_sent = get_prometheus_counter_val(this->status.p_connection_pool_queries_map, endpoint_id); - - MyHGC *myhgc=MyHGC_lookup(_hid); +void reset_hg_attrs_server_defaults(MySrvC* mysrvc) { + mysrvc->weight = -1; + mysrvc->max_connections = -1; + mysrvc->use_ssl = -1; +} +void update_hg_attrs_server_defaults(MySrvC* mysrvc, MyHGC* myhgc) { if (mysrvc->weight == -1) { if (myhgc->servers_defaults.weight != -1) { mysrvc->weight = myhgc->servers_defaults.weight; @@ -3465,7 +3564,25 @@ void MySQL_HostGroups_Manager::add(MySrvC *mysrvc, unsigned int _hid) { mysrvc->use_ssl = 0; } } +} + +void MySQL_HostGroups_Manager::add(MySrvC *mysrvc, unsigned int _hid) { + proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Adding MySrvC %p (%s:%d) for hostgroup %d\n", mysrvc, mysrvc->address, mysrvc->port, _hid); + + // Since metrics for servers are stored per-endpoint; the metrics for a particular endpoint can live longer than the + // 'MySrvC' itself. For example, a failover or a server config change could remove the server from a particular + // hostgroup, and a subsequent one bring it back to the original hostgroup. For this reason, everytime a 'mysrvc' is + // created and added to a particular hostgroup, we update the endpoint metrics for it. 
+ std::string endpoint_id { std::to_string(_hid) + ":" + string { mysrvc->address } + ":" + std::to_string(mysrvc->port) }; + mysrvc->bytes_recv = get_prometheus_counter_val(this->status.p_conn_pool_bytes_data_recv_map, endpoint_id); + mysrvc->bytes_sent = get_prometheus_counter_val(this->status.p_conn_pool_bytes_data_sent_map, endpoint_id); + mysrvc->connect_ERR = get_prometheus_counter_val(this->status.p_connection_pool_conn_err_map, endpoint_id); + mysrvc->connect_OK = get_prometheus_counter_val(this->status.p_connection_pool_conn_ok_map, endpoint_id); + mysrvc->queries_sent = get_prometheus_counter_val(this->status.p_connection_pool_queries_map, endpoint_id); + + MyHGC *myhgc=MyHGC_lookup(_hid); + update_hg_attrs_server_defaults(mysrvc, myhgc); myhgc->mysrvs->add(mysrvc); } @@ -4754,21 +4871,22 @@ void MySQL_HostGroups_Manager::read_only_action_v2(const std::listchecksum_variables.checksum_mysql_servers) { + // NOTE: We are always required to remove 'OFFLINE_HARD' servers, since we are not interested in + // their propagation to other cluster members. 
+ unique_ptr runtime_servers_resultset { get_admin_runtime_mysql_servers(mydb) }; + remove_resultset_offline_hard_servers(runtime_servers_resultset); + save_runtime_mysql_servers(runtime_servers_resultset.release()); + char* error = NULL; int cols = 0; int affected_rows = 0; SQLite3_result* resultset = NULL; - mydb->execute_statement(MYHGM_GEN_ADMIN_RUNTIME_SERVERS, &error, &cols, &affected_rows, &resultset); - save_runtime_mysql_servers(resultset); // assigning runtime_mysql_servers with updated mysql server resultset - - resultset = NULL; // reset mysql_server checksum table_resultset_checksum[HGM_TABLES::MYSQL_SERVERS] = 0; hgsm_mysql_servers_checksum = 0; - char* query = (char*)"SELECT hostgroup_id, hostname, port, gtid_port, CASE status WHEN 0 OR 1 OR 4 THEN 0 ELSE status END status, weight, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment FROM mysql_servers WHERE status<>3 ORDER BY hostgroup_id, hostname, port"; - mydb->execute_statement(query, &error, &cols, &affected_rows, &resultset); + mydb->execute_statement(MYSQL_SERVERS_CHECKSUM_QUERY, &error, &cols, &affected_rows, &resultset); if (resultset) { if (resultset->rows_count) { @@ -5645,6 +5763,75 @@ void MySQL_HostGroups_Manager::converge_group_replication_config(int _writer_hos pthread_mutex_unlock(&Group_Replication_Info_mutex); } +void MySQL_HostGroups_Manager::update_group_replication_add_autodiscovered( + const string& _host, int _port, int _wr_hg +) { + pthread_mutex_lock(&Group_Replication_Info_mutex); + const auto gr_info_map_it = this->Group_Replication_Info_Map.find(_wr_hg); + int32_t reader_hg = -1; + + if (gr_info_map_it == Group_Replication_Info_Map.end()) { + assert(0); + } else { + reader_hg = gr_info_map_it->second->reader_hostgroup; + } + pthread_mutex_unlock(&Group_Replication_Info_mutex); + + wrlock(); + + MyHGC *myhgc = MyHGC_lookup(reader_hg); + bool srv_found = false; + bool srv_found_offline = false; + + for (uint32_t j = 0; j < 
myhgc->mysrvs->cnt(); j++) { + MySrvC* mysrvc = static_cast(myhgc->mysrvs->servers->index(j)); + + // If the server is found as 'OFFLINE_HARD' we reset the 'MySrvC' values corresponding with the + // 'servers_defaults' (as in a new 'MySrvC' creation). We then later update these values with the + // 'servers_defaults' attributes from its corresponding 'MyHGC'. This way we ensure uniform behavior + // of new servers, and 'OFFLINE_HARD' ones when a user updates 'servers_defaults' values, and reloads + // the servers to runtime. + if (strcmp(mysrvc->address,_host.c_str())==0 && mysrvc->port==_port) { + srv_found = true; + if (mysrvc->status == MYSQL_SERVER_STATUS_OFFLINE_HARD) { + reset_hg_attrs_server_defaults(mysrvc); + update_hg_attrs_server_defaults(mysrvc, mysrvc->myhgc); + proxy_info( + "Found healthy previously discovered GR node %s:%d as 'OFFLINE_HARD', setting back as 'ONLINE' with:" + " hostgroup=%d, weight=%ld, max_connections=%ld, use_ssl=%d\n", + _host.c_str(), _port, reader_hg, mysrvc->weight, mysrvc->max_connections, mysrvc->use_ssl + ); + mysrvc->status = MYSQL_SERVER_STATUS_ONLINE; + srv_found_offline = true; + } + } + } + + if (srv_found == false) { + MySrvC* mysrvc = new MySrvC( + const_cast(_host.c_str()), _port, 0, -1, MYSQL_SERVER_STATUS_ONLINE, 0, -1, 0, -1, 0, const_cast("") + ); + add(mysrvc, reader_hg); + proxy_info( + "Adding new discovered GR node %s:%d with: hostgroup=%d, weight=%ld, max_connections=%ld, use_ssl=%d\n", + _host.c_str(), _port, reader_hg, mysrvc->weight, mysrvc->max_connections, mysrvc->use_ssl + ); + } + + if (srv_found == false || srv_found_offline) { + purge_mysql_servers_table(); + + mydb->execute("DELETE FROM mysql_servers"); + proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "DELETE FROM mysql_servers\n"); + + generate_mysql_servers_table(); + update_table_mysql_servers_for_monitor(false); + generate_mysql_group_replication_hostgroups_monitor_resultset(); + } + + wrunlock(); +} + Galera_Info::Galera_Info(int w, int b, int r, 
int o, int mw, int mtb, bool _a, int _w, char *c) { comment=NULL; if (c) { @@ -7178,23 +7365,7 @@ void MySQL_HostGroups_Manager::generate_mysql_aws_aurora_hostgroups_table() { // it is now time to build a new structure in Monitor pthread_mutex_lock(&GloMyMon->aws_aurora_mutex); - { - char *error=NULL; - int cols=0; - int affected_rows=0; - SQLite3_result *resultset=NULL; - char *query=(char *)"SELECT writer_hostgroup, reader_hostgroup, hostname, port, MAX(use_ssl) use_ssl , max_lag_ms , check_interval_ms , check_timeout_ms , " - "add_lag_ms , min_lag_ms , lag_num_checks FROM mysql_servers JOIN mysql_aws_aurora_hostgroups ON hostgroup_id=writer_hostgroup OR " - "hostgroup_id=reader_hostgroup WHERE active=1 AND status NOT IN (2,3) GROUP BY writer_hostgroup, hostname, port"; - mydb->execute_statement(query, &error , &cols , &affected_rows , &resultset); - if (resultset) { - if (GloMyMon->AWS_Aurora_Hosts_resultset) { - delete GloMyMon->AWS_Aurora_Hosts_resultset; - } - GloMyMon->AWS_Aurora_Hosts_resultset=resultset; - GloMyMon->AWS_Aurora_Hosts_resultset_checksum=resultset->raw_checksum(); - } - } + update_aws_aurora_hosts_monitor_resultset(false); pthread_mutex_unlock(&GloMyMon->aws_aurora_mutex); pthread_mutex_unlock(&AWS_Aurora_Info_mutex); @@ -7307,6 +7478,86 @@ bool MySQL_HostGroups_Manager::aws_aurora_replication_lag_action(int _whid, int return ret; } +int MySQL_HostGroups_Manager::create_new_server_in_hg( + uint32_t hid, const srv_info_t& srv_info, const srv_opts_t& srv_opts +) { + int32_t res = -1; + MySrvC* mysrvc = find_server_in_hg(hid, srv_info.addr, srv_info.port); + + if (mysrvc == nullptr) { + char* c_hostname { const_cast(srv_info.addr.c_str()) }; + MySrvC* mysrvc = new MySrvC( + c_hostname, srv_info.port, 0, srv_opts.weigth, MYSQL_SERVER_STATUS_ONLINE, 0, srv_opts.max_conns, 0, + srv_opts.use_ssl, 0, const_cast("") + ); + add(mysrvc,hid); + proxy_info( + "Adding new discovered %s node %s:%d with: hostgroup=%d, weight=%ld, max_connections=%ld, 
use_ssl=%d\n", + srv_info.kind.c_str(), c_hostname, srv_info.port, hid, mysrvc->weight, mysrvc->max_connections, + mysrvc->use_ssl + ); + + res = 0; + } else { + // If the server is found as 'OFFLINE_HARD' we reset the 'MySrvC' values corresponding with the + // 'servers_defaults' (as in a new 'MySrvC' creation). We then later update these values with the + // 'servers_defaults' attributes from its corresponding 'MyHGC'. This way we ensure uniform behavior + // of new servers, and 'OFFLINE_HARD' ones when a user update 'servers_defaults' values, and reloads + // the servers to runtime. + if (mysrvc && mysrvc->status == MYSQL_SERVER_STATUS_OFFLINE_HARD) { + reset_hg_attrs_server_defaults(mysrvc); + update_hg_attrs_server_defaults(mysrvc, mysrvc->myhgc); + mysrvc->status = MYSQL_SERVER_STATUS_ONLINE; + + proxy_info( + "Found healthy previously discovered %s node %s:%d as 'OFFLINE_HARD', setting back as 'ONLINE' with:" + " hostgroup=%d, weight=%ld, max_connections=%ld, use_ssl=%d\n", + srv_info.kind.c_str(), srv_info.addr.c_str(), srv_info.port, hid, mysrvc->weight, + mysrvc->max_connections, mysrvc->use_ssl + ); + + res = 0; + } + } + + return res; +} + +int MySQL_HostGroups_Manager::remove_server_in_hg(uint32_t hid, const string& addr, uint16_t port) { + MySrvC* mysrvc = find_server_in_hg(hid, addr, port); + if (mysrvc == nullptr) { + return -1; + } + + uint64_t mysrvc_addr = reinterpret_cast(mysrvc); + + proxy_warning( + "Removed server at address %ld, hostgroup %d, address %s port %d." + " Setting status OFFLINE HARD and immediately dropping all free connections." + " Used connections will be dropped when trying to use them\n", + mysrvc_addr, hid, mysrvc->address, mysrvc->port + ); + + // Set the server status + mysrvc->status=MYSQL_SERVER_STATUS_OFFLINE_HARD; + mysrvc->ConnectionsFree->drop_all_connections(); + + // TODO-NOTE: This is only required in case the caller isn't going to perform: + // - Full deletion of servers in the target 'hid'. 
+ // - Table regeneration for the servers in the target 'hid'. + // This is a very common pattern when further operations have been performed over the + // servers, e.g. a set of servers additions and deletions over the target hostgroups. + // //////////////////////////////////////////////////////////////////////// + + // Remove the server from the table + const string del_srv_query { "DELETE FROM mysql_servers WHERE mem_pointer=" + std::to_string(mysrvc_addr) }; + mydb->execute(del_srv_query.c_str()); + + // //////////////////////////////////////////////////////////////////////// + + return 0; +} + // FIXME: complete this!! void MySQL_HostGroups_Manager::update_aws_aurora_set_writer(int _whid, int _rhid, char *_server_id, bool verbose) { int cols=0; @@ -7352,26 +7603,8 @@ void MySQL_HostGroups_Manager::update_aws_aurora_set_writer(int _whid, int _rhid free(error); error=NULL; } - //free(query); if (resultset) { -/* - // let's get info about this cluster - pthread_mutex_lock(&AWS_Aurora_Info_mutex); - std::map::iterator it2; - it2 = AWS_Aurora_Info_Map.find(_writer_hostgroup); - AWS_Aurora_Info *info=NULL; - if (it2!=AWS_Aurora_Info_Map.end()) { - info=it2->second; - writer_is_also_reader=info->writer_is_also_reader; - new_reader_weight = info->new_reader_weight; - read_HG = info->reader_hostgroup; - //need_converge=info->need_converge; - //info->need_converge=false; - //max_writers = info->max_writers; - } - pthread_mutex_unlock(&AWS_Aurora_Info_mutex); -*/ if (resultset->rows_count) { for (std::vector::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) { SQLite3_row *r=*it; @@ -7386,41 +7619,17 @@ void MySQL_HostGroups_Manager::update_aws_aurora_set_writer(int _whid, int _rhid } } } -/* - if (need_converge == false) { - SQLite3_result *resultset2=NULL; - q = (char *)"SELECT COUNT(*) FROM mysql_servers WHERE hostgroup_id=%d AND status=0"; - query=(char *)malloc(strlen(q)+32); - sprintf(query,q,_writer_hostgroup); - 
mydb->execute_statement(query, &error, &cols , &affected_rows , &resultset2); - if (resultset2) { - if (resultset2->rows_count) { - for (std::vector::iterator it = resultset2->rows.begin() ; it != resultset2->rows.end(); ++it) { - SQLite3_row *r=*it; - int nwriters = atoi(r->fields[0]); - if (nwriters > max_writers) { - proxy_warning("Galera: too many writers in HG %d. Max=%d, current=%d\n", _writer_hostgroup, max_writers, nwriters); - need_converge = true; - } - } - } - delete resultset2; + + if (found_writer) { // maybe no-op + if ( + (writer_is_also_reader==0 && found_reader==false) + || + (writer_is_also_reader > 0 && found_reader==true) + ) { // either both true or both false + delete resultset; + resultset=NULL; } - free(query); } -*/ -// if (need_converge==false) { - if (found_writer) { // maybe no-op - if ( - (writer_is_also_reader==0 && found_reader==false) - || - (writer_is_also_reader > 0 && found_reader==true) - ) { // either both true or both false - delete resultset; - resultset=NULL; - } - } -// } } if (resultset) { @@ -7428,8 +7637,6 @@ void MySQL_HostGroups_Manager::update_aws_aurora_set_writer(int _whid, int _rhid // This should be the case most of the time, // because the calling function knows if an action is required. 
if (resultset->rows_count) { - //need_converge=false; - GloAdmin->mysql_servers_wrlock(); mydb->execute("DELETE FROM mysql_servers_incoming"); q=(char *)"INSERT INTO mysql_servers_incoming SELECT hostgroup_id, hostname, port, gtid_port, weight, status, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment FROM mysql_servers WHERE hostgroup_id=%d"; @@ -7439,17 +7646,12 @@ void MySQL_HostGroups_Manager::update_aws_aurora_set_writer(int _whid, int _rhid sprintf(query, q, _writer_hostgroup, _server_id, domain_name, aurora_port); mydb->execute(query); q=(char *)"UPDATE OR IGNORE mysql_servers_incoming SET hostgroup_id=%d WHERE hostname='%s%s' AND port=%d AND hostgroup_id<>%d"; - //query=(char *)malloc(strlen(q)+strlen(_hostname)+1024); // increased this buffer as it is used for other queries too sprintf(query, q, _writer_hostgroup, _server_id, domain_name, aurora_port, _writer_hostgroup); mydb->execute(query); - //free(query); q=(char *)"DELETE FROM mysql_servers_incoming WHERE hostname='%s%s' AND port=%d AND hostgroup_id<>%d"; - //query=(char *)malloc(strlen(q)+strlen(_hostname)+64); sprintf(query, q, _server_id, domain_name, aurora_port, _writer_hostgroup); mydb->execute(query); - //free(query); q=(char *)"UPDATE mysql_servers_incoming SET status=0 WHERE hostname='%s%s' AND port=%d AND hostgroup_id=%d"; - //query=(char *)malloc(strlen(q)+strlen(_hostname)+64); sprintf(query, q, _server_id, domain_name, aurora_port, _writer_hostgroup); mydb->execute(query); @@ -7461,13 +7663,13 @@ void MySQL_HostGroups_Manager::update_aws_aurora_set_writer(int _whid, int _rhid sprintf(query,q,_rhid, new_reader_weight, _whid); mydb->execute(query); - //free(query); if (writer_is_also_reader && read_HG>=0) { q=(char *)"INSERT OR IGNORE INTO mysql_servers_incoming (hostgroup_id,hostname,port,gtid_port,status,weight,compression,max_connections,max_replication_lag,use_ssl,max_latency_ms,comment) SELECT 
%d,hostname,port,gtid_port,status,weight,compression,max_connections,max_replication_lag,use_ssl,max_latency_ms,comment FROM mysql_servers_incoming WHERE hostgroup_id=%d AND hostname='%s%s' AND port=%d"; sprintf(query, q, read_HG, _writer_hostgroup, _server_id, domain_name, aurora_port); mydb->execute(query); q = (char *)"UPDATE mysql_servers_incoming SET weight=%d WHERE hostgroup_id=%d AND hostname='%s%s' AND port=%d"; sprintf(query, q, new_reader_weight, read_HG, _server_id, domain_name, aurora_port); + mydb->execute(query); } uint64_t checksum_current = 0; uint64_t checksum_incoming = 0; @@ -7513,30 +7715,11 @@ void MySQL_HostGroups_Manager::update_aws_aurora_set_writer(int _whid, int _rhid mydb->execute(query); commit(); wrlock(); -/* - SQLite3_result *resultset2=NULL; - q=(char *)"SELECT writer_hostgroup, reader_hostgroup FROM mysql_aws_aurora_hostgroups WHERE writer_hostgroup=%d"; - sprintf(query,q,_writer_hostgroup); - mydb->execute_statement(query, &error, &cols , &affected_rows , &resultset2); - if (resultset2) { - if (resultset2->rows_count) { - for (std::vector::iterator it = resultset2->rows.begin() ; it != resultset2->rows.end(); ++it) { - SQLite3_row *r=*it; - int writer_hostgroup=atoi(r->fields[0]); - int reader_hostgroup=atoi(r->fields[1]); -*/ - q=(char *)"DELETE FROM mysql_servers WHERE hostgroup_id IN (%d , %d)"; - sprintf(query,q,_whid,_rhid); - mydb->execute(query); - generate_mysql_servers_table(&_whid); - generate_mysql_servers_table(&_rhid); -/* - } - } - delete resultset2; - resultset2=NULL; - } -*/ + q=(char *)"DELETE FROM mysql_servers WHERE hostgroup_id IN (%d , %d)"; + sprintf(query,q,_whid,_rhid); + mydb->execute(query); + generate_mysql_servers_table(&_whid); + generate_mysql_servers_table(&_rhid); wrunlock(); } else { if (GloMTH->variables.hostgroup_manager_verbose > 1) { @@ -7547,45 +7730,57 @@ void MySQL_HostGroups_Manager::update_aws_aurora_set_writer(int _whid, int _rhid free(query); query = NULL; } else { + string full_hostname 
{ string { _server_id } + string { domain_name } }; + GloAdmin->mysql_servers_wrlock(); - mydb->execute("DELETE FROM mysql_servers_incoming"); - q=(char *)"INSERT INTO mysql_servers_incoming SELECT hostgroup_id, hostname, port, gtid_port, weight, status, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment FROM mysql_servers WHERE hostname<>'%s%s'"; - sprintf(query,q, _server_id, domain_name); - mydb->execute(query); + wrlock(); - unsigned int max_max_connections = 1000; - unsigned int max_use_ssl = 0; - MyHGC *myhgc = MyHGC_lookup(_whid); - for (int j = 0; j < (int) myhgc->mysrvs->cnt(); j++) { - MySrvC *mysrvc = (MySrvC *) myhgc->mysrvs->servers->index(j); - if (mysrvc->max_connections > max_max_connections) { - max_max_connections = mysrvc->max_connections; - } - if (mysrvc->use_ssl > max_use_ssl) { - max_use_ssl = mysrvc->use_ssl; - } + srv_info_t srv_info { full_hostname, static_cast(aurora_port), "Aurora AWS" }; + srv_opts_t wr_srv_opts { -1, -1, -1 }; + + int wr_res = create_new_server_in_hg(_writer_hostgroup, srv_info, wr_srv_opts); + int rd_res = -1; + + // WRITER can also be placed as READER, or could previously be one + if (writer_is_also_reader && read_HG >= 0) { + srv_opts_t rd_srv_opts { new_reader_weight, -1, -1 }; + rd_res = create_new_server_in_hg(read_HG, srv_info, rd_srv_opts); } - q=(char *)"INSERT INTO mysql_servers_incoming (hostgroup_id, hostname, port, weight, max_connections, use_ssl) VALUES (%d, '%s%s', %d, %d, %d, %d)"; - sprintf(query,q, _writer_hostgroup, _server_id, domain_name, aurora_port, new_reader_weight, max_max_connections, max_use_ssl); - mydb->execute(query); - if (writer_is_also_reader && read_HG>=0) { - q=(char *)"INSERT INTO mysql_servers_incoming (hostgroup_id, hostname, port, weight, max_connections, use_ssl) VALUES (%d, '%s%s', %d, %d, %d, %d)"; - sprintf(query, q, read_HG, _server_id, domain_name, aurora_port, new_reader_weight, max_max_connections, max_use_ssl); - mydb->execute(query); + 
// A new server has been created, or an OFFLINE_HARD brought back as ONLINE + if (wr_res == 0 || rd_res == 0) { + proxy_info( + "AWS Aurora: setting new auto-discovered host %s:%d as writer\n", full_hostname.c_str(), aurora_port + ); + purge_mysql_servers_table(); + + const char del_srvs_query_t[] { "DELETE FROM mysql_servers WHERE hostgroup_id IN (%d , %d)" }; + const string del_srvs_query { cstr_format(del_srvs_query_t, _whid, _rhid).str }; + mydb->execute(del_srvs_query.c_str()); + + generate_mysql_servers_table(&_whid); + generate_mysql_servers_table(&_rhid); + + // Update the global checksums after 'mysql_servers' regeneration + { + unique_ptr resultset { get_admin_runtime_mysql_servers(mydb) }; + remove_resultset_offline_hard_servers(resultset); + save_runtime_mysql_servers(resultset.release()); + + string mysrvs_checksum { gen_global_mysql_servers_checksum() }; + pthread_mutex_lock(&GloVars.checksum_mutex); + update_glovars_mysql_servers_checksum(mysrvs_checksum); + pthread_mutex_unlock(&GloVars.checksum_mutex); + } + + // Because 'commit' isn't called, we are required to update 'mysql_servers_for_monitor'. 
+ update_table_mysql_servers_for_monitor(false); + // Update AWS Aurora resultset used for monitoring + update_aws_aurora_hosts_monitor_resultset(true); } - proxy_info("AWS Aurora: setting new auto-discovered host %s%s:%d as writer\n", _server_id, domain_name, aurora_port); - commit(); - wrlock(); - q=(char *)"DELETE FROM mysql_servers WHERE hostgroup_id IN (%d , %d)"; - sprintf(query,q,_whid,_rhid); - mydb->execute(query); - generate_mysql_servers_table(&_whid); - generate_mysql_servers_table(&_rhid); + wrunlock(); GloAdmin->mysql_servers_wrunlock(); - free(query); - query = NULL; } } if (resultset) { @@ -7625,9 +7820,9 @@ void MySQL_HostGroups_Manager::update_aws_aurora_set_reader(int _whid, int _rhid } pthread_mutex_unlock(&AWS_Aurora_Info_mutex); } - q=(char *)"SELECT hostgroup_id FROM mysql_servers JOIN mysql_aws_aurora_hostgroups ON hostgroup_id=writer_hostgroup OR hostgroup_id=reader_hostgroup WHERE hostname='%s%s' AND port=%d AND status<>3"; - query=(char *)malloc(strlen(q)+strlen(_server_id)+strlen(domain_name)+32); - sprintf(query, q, _server_id, domain_name, aurora_port); + q = (char*)"SELECT hostgroup_id FROM mysql_servers JOIN mysql_aws_aurora_hostgroups ON hostgroup_id=writer_hostgroup OR hostgroup_id=reader_hostgroup WHERE hostname='%s%s' AND port=%d AND status<>3 AND hostgroup_id IN (%d,%d)"; + query=(char *)malloc(strlen(q)+strlen(_server_id)+strlen(domain_name)+32+32+32); + sprintf(query, q, _server_id, domain_name, aurora_port, _whid, _rhid); mydb->execute_statement(query, &error, &cols , &affected_rows , &resultset); if (error) { free(error); @@ -7640,124 +7835,74 @@ void MySQL_HostGroups_Manager::update_aws_aurora_set_reader(int _whid, int _rhid GloAdmin->mysql_servers_wrlock(); mydb->execute("DELETE FROM mysql_servers_incoming"); mydb->execute("INSERT INTO mysql_servers_incoming SELECT hostgroup_id, hostname, port, gtid_port, weight, status, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment FROM 
mysql_servers"); - q=(char *)"UPDATE OR IGNORE mysql_servers_incoming SET hostgroup_id=%d WHERE hostname='%s%s' AND port=%d AND hostgroup_id<>%d"; + // If server present as WRITER try moving it to 'reader_hostgroup'. + q=(char *)"UPDATE OR IGNORE mysql_servers_incoming SET hostgroup_id=%d WHERE hostname='%s%s' AND port=%d AND hostgroup_id=%d"; query=(char *)malloc(strlen(q)+strlen(_server_id)+strlen(domain_name)+512); - sprintf(query, q, _rhid, _server_id, domain_name, aurora_port, _rhid); + sprintf(query, q, _rhid, _server_id, domain_name, aurora_port, _whid); mydb->execute(query); - //free(query); - q=(char *)"DELETE FROM mysql_servers_incoming WHERE hostname='%s%s' AND port=%d AND hostgroup_id<>%d"; - //query=(char *)malloc(strlen(q)+strlen(_hostname)+64); - sprintf(query, q, _server_id, domain_name, aurora_port, _rhid); + // Reader could previously be also a reader, in which case previous operation 'UPDATE OR IGNORE' + // did nothing. If server is still in the 'writer_hostgroup', we should remove it. 
+ q=(char *)"DELETE FROM mysql_servers_incoming WHERE hostname='%s%s' AND port=%d AND hostgroup_id=%d"; + sprintf(query, q, _server_id, domain_name, aurora_port, _whid); mydb->execute(query); - //free(query); q=(char *)"UPDATE mysql_servers_incoming SET status=0 WHERE hostname='%s%s' AND port=%d AND hostgroup_id=%d"; sprintf(query, q, _server_id, domain_name, aurora_port, _rhid); mydb->execute(query); - //free(query); - //converge_galera_config(_writer_hostgroup); commit(); wrlock(); -/* - SQLite3_result *resultset2=NULL; - q=(char *)"SELECT writer_hostgroup, reader_hostgroup FROM mysql_galera_hostgroups WHERE writer_hostgroup=%d"; - //query=(char *)malloc(strlen(q)+strlen(_hostname)+64); - sprintf(query,q,_writer_hostgroup); - mydb->execute_statement(query, &error, &cols , &affected_rows , &resultset2); - if (resultset2) { - if (resultset2->rows_count) { - for (std::vector::iterator it = resultset2->rows.begin() ; it != resultset2->rows.end(); ++it) { - SQLite3_row *r=*it; - int writer_hostgroup=atoi(r->fields[0]); - int backup_writer_hostgroup=atoi(r->fields[1]); - int reader_hostgroup=atoi(r->fields[2]); - int offline_hostgroup=atoi(r->fields[3]); -*/ - q=(char *)"DELETE FROM mysql_servers WHERE hostgroup_id IN (%d , %d)"; - sprintf(query,q,_whid,_rhid); - mydb->execute(query); - generate_mysql_servers_table(&_whid); - generate_mysql_servers_table(&_rhid); -/* - generate_mysql_servers_table(&writer_hostgroup); - generate_mysql_servers_table(&backup_writer_hostgroup); - generate_mysql_servers_table(&reader_hostgroup); - generate_mysql_servers_table(&offline_hostgroup); - } - } - delete resultset2; - resultset2=NULL; - } -*/ + + q=(char *)"DELETE FROM mysql_servers WHERE hostgroup_id IN (%d , %d)"; + sprintf(query,q,_whid,_rhid); + mydb->execute(query); + generate_mysql_servers_table(&_whid); + generate_mysql_servers_table(&_rhid); + wrunlock(); GloAdmin->mysql_servers_wrunlock(); free(query); } else { // we couldn't find the server // autodiscovery algorithm here 
- char *full_hostname=(char *)malloc(strlen(_server_id)+strlen(domain_name)+1); - sprintf(full_hostname, "%s%s", _server_id, domain_name); - bool found = false; + string full_hostname { string { _server_id } + string { domain_name } }; GloAdmin->mysql_servers_wrlock(); - unsigned int max_max_connections = 10; - unsigned int max_use_ssl = 0; wrlock(); - MyHGC *myhgc=MyHGC_lookup(_rhid); - { - for (int j=0; j<(int)myhgc->mysrvs->cnt(); j++) { - MySrvC *mysrvc=(MySrvC *)myhgc->mysrvs->servers->index(j); - if (mysrvc->max_connections > max_max_connections) { - max_max_connections = mysrvc->max_connections; - } - if (mysrvc->use_ssl > max_use_ssl) { - max_use_ssl = mysrvc->use_ssl; - } - if (strcmp(mysrvc->address,full_hostname)==0 && mysrvc->port==aurora_port) { - found = true; - // we found the server, we just configure it online if it was offline - if (mysrvc->status == MYSQL_SERVER_STATUS_OFFLINE_HARD) { - mysrvc->status = MYSQL_SERVER_STATUS_ONLINE; - } - } - } - if (found == false) { // the server doesn't exist - MySrvC *mysrvc=new MySrvC(full_hostname, aurora_port, 0, new_reader_weight, MYSQL_SERVER_STATUS_ONLINE, 0, max_max_connections, 0, max_use_ssl, 0, (char *)""); // add new fields here if adding more columns in mysql_servers - proxy_info("Adding new discovered AWS Aurora node %s:%d with: hostgroup=%d, weight=%d, max_connections=%d\n" , full_hostname, aurora_port, _rhid , new_reader_weight, max_max_connections); - add(mysrvc,_rhid); - } - q=(char *)"DELETE FROM mysql_servers WHERE hostgroup_id IN (%d , %d)"; - query = (char *)malloc(strlen(q)+64); - sprintf(query,q,_whid,_rhid); - mydb->execute(query); + + srv_info_t srv_info { full_hostname, static_cast(aurora_port), "Aurora AWS" }; + srv_opts_t srv_opts { new_reader_weight, -1, -1 }; + int wr_res = create_new_server_in_hg(_rhid, srv_info, srv_opts); + + // A new server has been created, or an OFFLINE_HARD brought back as ONLINE + if (wr_res == 0) { + purge_mysql_servers_table(); + + const char 
del_srvs_query_t[] { "DELETE FROM mysql_servers WHERE hostgroup_id IN (%d , %d)" }; + const string del_srvs_query { cstr_format(del_srvs_query_t, _whid, _rhid).str }; + mydb->execute(del_srvs_query.c_str()); + generate_mysql_servers_table(&_whid); generate_mysql_servers_table(&_rhid); - free(query); - } - // NOTE: Because 'commit' isn't called, we are required to update 'mysql_servers_for_monitor'. - // Also note that 'generate_mysql_servers' is previously called. - update_table_mysql_servers_for_monitor(false); - wrunlock(); - // it is now time to build a new structure in Monitor - pthread_mutex_lock(&AWS_Aurora_Info_mutex); - pthread_mutex_lock(&GloMyMon->aws_aurora_mutex); - { - char *error=NULL; - int cols=0; - int affected_rows=0; - SQLite3_result *resultset=NULL; - char *query=(char *)"SELECT writer_hostgroup, reader_hostgroup, hostname, port, MAX(use_ssl) use_ssl , max_lag_ms , check_interval_ms , check_timeout_ms, add_lag_ms, min_lag_ms, lag_num_checks FROM mysql_servers JOIN mysql_aws_aurora_hostgroups ON hostgroup_id=writer_hostgroup OR hostgroup_id=reader_hostgroup WHERE active=1 AND status NOT IN (2,3) GROUP BY hostname, port"; - mydb->execute_statement(query, &error , &cols , &affected_rows , &resultset); - if (resultset) { - if (GloMyMon->AWS_Aurora_Hosts_resultset) { - delete GloMyMon->AWS_Aurora_Hosts_resultset; - } - GloMyMon->AWS_Aurora_Hosts_resultset=resultset; - GloMyMon->AWS_Aurora_Hosts_resultset_checksum=resultset->raw_checksum(); + + // Update the global checksums after 'mysql_servers' regeneration + { + unique_ptr resultset { get_admin_runtime_mysql_servers(mydb) }; + remove_resultset_offline_hard_servers(resultset); + save_runtime_mysql_servers(resultset.release()); + + string mysrvs_checksum { gen_global_mysql_servers_checksum() }; + proxy_info("New computed global checksum for 'mysql_servers' is '%s'\n", mysrvs_checksum.c_str()); + pthread_mutex_lock(&GloVars.checksum_mutex); + update_glovars_mysql_servers_checksum(mysrvs_checksum); + 
pthread_mutex_unlock(&GloVars.checksum_mutex); } + + // Because 'commit' isn't called, we are required to update 'mysql_servers_for_monitor'. + update_table_mysql_servers_for_monitor(false); + // Update AWS Aurora resultset used for monitoring + update_aws_aurora_hosts_monitor_resultset(true); } - pthread_mutex_unlock(&GloMyMon->aws_aurora_mutex); - pthread_mutex_unlock(&AWS_Aurora_Info_mutex); + + wrunlock(); GloAdmin->mysql_servers_wrunlock(); - free(full_hostname); } } if (resultset) { @@ -7767,6 +7912,42 @@ void MySQL_HostGroups_Manager::update_aws_aurora_set_reader(int _whid, int _rhid free(domain_name); } +const char SELECT_AWS_AURORA_SERVERS_FOR_MONITOR[] { + "SELECT writer_hostgroup, reader_hostgroup, hostname, port, MAX(use_ssl) use_ssl, max_lag_ms, check_interval_ms," + " check_timeout_ms, add_lag_ms, min_lag_ms, lag_num_checks FROM mysql_servers" + " JOIN mysql_aws_aurora_hostgroups ON" + " hostgroup_id=writer_hostgroup OR hostgroup_id=reader_hostgroup WHERE active=1 AND status NOT IN (2,3)" + " GROUP BY writer_hostgroup, hostname, port" +}; + +void MySQL_HostGroups_Manager::update_aws_aurora_hosts_monitor_resultset(bool lock) { + if (lock) { + pthread_mutex_lock(&AWS_Aurora_Info_mutex); + pthread_mutex_lock(&GloMyMon->aws_aurora_mutex); + } + + SQLite3_result* resultset = nullptr; + { + char* error = nullptr; + int cols = 0; + int affected_rows = 0; + mydb->execute_statement(SELECT_AWS_AURORA_SERVERS_FOR_MONITOR, &error, &cols, &affected_rows, &resultset); + } + + if (resultset) { + if (GloMyMon->AWS_Aurora_Hosts_resultset) { + delete GloMyMon->AWS_Aurora_Hosts_resultset; + } + GloMyMon->AWS_Aurora_Hosts_resultset=resultset; + GloMyMon->AWS_Aurora_Hosts_resultset_checksum=resultset->raw_checksum(); + } + + if (lock) { + pthread_mutex_unlock(&GloMyMon->aws_aurora_mutex); + pthread_mutex_unlock(&AWS_Aurora_Info_mutex); + } +} + MySrvC* MySQL_HostGroups_Manager::find_server_in_hg(unsigned int _hid, const std::string& addr, int port) { MySrvC* f_server = 
nullptr; diff --git a/lib/MySQL_Monitor.cpp b/lib/MySQL_Monitor.cpp index 01a0df6135..17acd2ef87 100644 --- a/lib/MySQL_Monitor.cpp +++ b/lib/MySQL_Monitor.cpp @@ -212,6 +212,9 @@ class MySQL_Monitor_Connection_Pool { __conn_register_label: for (unsigned int i=0; ilen; i++) { MYSQL *my1 = (MYSQL *)conns->index(i); + // 'my1' can be NULL due to connection cleanup + if (my1 == nullptr) continue; + assert(my!=my1); //assert(my->net.fd!=my1->net.fd); // FIXME: we changed this with the next section of code if (my->net.fd == my1->net.fd) { @@ -232,6 +235,20 @@ class MySQL_Monitor_Connection_Pool { #endif // DEBUG return; }; + /** + * @brief Unregister the conn from the supplied 'mmsd'. + * @details DEBUG only helper function useful for checking the get/put connection flow + * for 'MySQL_Monitor_Connection_Pool'. This function should be called whenever a monitoring action does + * no longer require the conn of its 'MMSD' and the conn has been considered 'non-suited' for being + * returned to the conn pool. This can be due to a failure in the data querying from the server itself, + * or due to unexpected data retrieved from the server. Due to this, the flow for calling this function + * during 'async' monitoring actions is: + * - If an error has taken place during the fetching itself, this function shall be called as soon as + * the failure is detected by the async state machine. + * - In case no error has taken place (TASK_RESULT_SUCCESS), this function should be called by the + * task-handler if it determines that the retrieved data is malformed. See handle_mmsd_mysql_conn. + * @param mmsd The 'mmsd' whose conn should be unregistered. 
+ */ void conn_unregister(MySQL_Monitor_State_Data *mmsd) { #ifdef DEBUG std::lock_guard lock(mutex); @@ -322,6 +339,9 @@ MYSQL * MySQL_Monitor_Connection_Pool::get_connection(char *hostname, int port, if (my) { for (unsigned int j=0; jlen; j++) { MYSQL *my1 = (MYSQL *)conns->index(j); + // 'my1' can be NULL due to connection cleanup + if (!my1) continue; + assert(my!=my1); assert(my->net.fd!=my1->net.fd); } @@ -488,19 +508,23 @@ void MySQL_Monitor_Connection_Pool::put_connection(char *hostname, int port, MYS /** * @brief MySQL 8 status query for Group Replication members. * @details Since 'MySQL 8' we rely on 'COUNT_TRANSACTIONS_REMOTE_IN_APPLIER_QUEUE', deprecating the previously - * required 'sys.gr_member_routing_candidate_status' view. + * required 'sys.gr_member_routing_candidate_status' view. Other additions: + * - A new field 'members' has been added to the query, containing the current cluster members as seen by the + * queried node. This field is used for auto discovery. + * - Server state 'RECOVERING' is now also considered when detecting if a member is a 'viable' candidate. 
*/ const char MYSQL_8_GR_QUERY[] { "SELECT (SELECT IF (" "MEMBER_STATE='ONLINE' AND (" - "(SELECT COUNT(*) FROM performance_schema.replication_group_members WHERE MEMBER_STATE != 'ONLINE') >=" + "(SELECT COUNT(*) FROM performance_schema.replication_group_members WHERE MEMBER_STATE NOT IN ('ONLINE', 'RECOVERING')) >=" " ((SELECT COUNT(*) FROM performance_schema.replication_group_members)/2) = 0)" ", 'YES', 'NO')) AS viable_candidate," " (SELECT IF (@@read_only, 'YES', 'NO')) as read_only," - " COUNT_TRANSACTIONS_REMOTE_IN_APPLIER_QUEUE AS transactions_behind " + " COUNT_TRANSACTIONS_REMOTE_IN_APPLIER_QUEUE AS transactions_behind, " + " (SELECT GROUP_CONCAT(CONCAT(member_host, \":\", member_port)) FROM performance_schema.replication_group_members) AS members " "FROM " "performance_schema.replication_group_members " - "JOIN performance_schema.replication_group_member_stats rgms USING(member_id)" + "JOIN performance_schema.replication_group_member_stats rgms USING(member_id) " "WHERE rgms.MEMBER_ID=@@SERVER_UUID" }; @@ -609,7 +633,7 @@ void MySQL_Monitor_State_Data::init_async() { async_state_machine_ = ASYNC_QUERY_START; #ifdef TEST_GROUPREP { - query_ = "SELECT viable_candidate,read_only,transactions_behind FROM GR_MEMBER_ROUTING_CANDIDATE_STATUS "; + query_ = "SELECT viable_candidate,read_only,transactions_behind,members FROM GR_MEMBER_ROUTING_CANDIDATE_STATUS "; query_ += std::string(hostname) + ":" + std::to_string(port); } #else @@ -1861,7 +1885,7 @@ void * monitor_group_replication_thread(void *arg) { mmsd->interr=0; // reset the value #ifdef TEST_GROUPREP { - std::string s { "SELECT viable_candidate,read_only,transactions_behind FROM GR_MEMBER_ROUTING_CANDIDATE_STATUS" }; + std::string s { "SELECT viable_candidate,read_only,transactions_behind,members FROM GR_MEMBER_ROUTING_CANDIDATE_STATUS" }; s += " " + std::string(mmsd->hostname) + ":" + std::to_string(mmsd->port); mmsd->async_exit_status=mysql_query_start(&mmsd->interr,mmsd->mysql,s.c_str()); } @@ -3367,18 
+3391,6 @@ set extract_writer_hgs(SQLite3_result* Group_Replication_Hosts_results return writer_hgs; } -/** - * @brief Holds the info from a GR server definition. - */ -typedef struct _gr_host_def_t { - string host; - int port; - int use_ssl; - bool writer_is_also_reader; - int max_transactions_behind; - int max_transactions_behind_count; -} gr_host_def_t; - /** * @brief Extracts a 'MySQL_Monitor_State_Data' from the provided 'SQLite3_result*'. * @details The expected contents of the provided 'SQLite3_result*' are the ones generated by @@ -3499,13 +3511,67 @@ unique_ptr init_mmsd_with_conn( return mmsd; } +using gr_srv_addr_t = pair; + struct gr_srv_st_t { bool viable_candidate = false; bool read_only = true; int64_t transactions_behind = -1; bool inv_srv_state = false; + vector gr_members {}; }; +#define GR_MEMBER_ENTRY_ERR "%s '%s' in 'members' field from GR query to server '%s:%d'. Autodiscovery action aborted.\n" + +vector> parse_gr_members_addrs( + const MySQL_Monitor_State_Data* mmsd, const vector& gr_cluster_members +) { +#ifdef DEBUG + nlohmann::ordered_json members { gr_cluster_members }; + proxy_debug( + PROXY_DEBUG_MONITOR, 7, "Received 'members' field '%s' from GR query to server '%s:%d'\n", members.dump().c_str(), + mmsd->hostname, mmsd->port + ); +#endif + vector> result {}; + + for (const auto& cluster_member : gr_cluster_members) { + const vector gr_member_host_port { split_str(cluster_member, ':') }; + if (gr_member_host_port.size() != 2) { + proxy_error(GR_MEMBER_ENTRY_ERR, "Invalid server entry", cluster_member.c_str(), mmsd->hostname, mmsd->port); + break; + } + + const string srv_host { gr_member_host_port[0] }; + const char* c_str_port { gr_member_host_port[1].c_str() }; + + int32_t srv_port = -1; + + { + char* p_end = nullptr; + long port = std::strtol(c_str_port, &p_end, 10); + + if (c_str_port == p_end) { + proxy_error( + GR_MEMBER_ENTRY_ERR, "Failed to parse port for server entry", cluster_member.c_str(), mmsd->hostname, mmsd->port + ); + 
break; + } else { + srv_port = port; + } + } + + result.push_back({srv_host, srv_port}); + } + + // If any entry fails to parse, we invalidate the whole action + if (gr_cluster_members.size() != result.size()) { + return {}; + } else { + return result; + } +} + gr_srv_st_t extract_gr_srv_st(MySQL_Monitor_State_Data* mmsd) { gr_srv_st_t gr_srv_st {}; @@ -3516,14 +3582,21 @@ gr_srv_st_t extract_gr_srv_st(MySQL_Monitor_State_Data* mmsd) { num_fields = mysql_num_fields(mmsd->result); num_rows = mysql_num_rows(mmsd->result); - if (fields == NULL || num_fields!=3 || num_rows!=1) { - proxy_error( - "'mysql_fetch_fields' returns 'NULL', or 'mysql_num_fields(%d)', or 'mysql_num_rows(%d)' are incorrect." - " Server %s:%d. See bug #1994\n", - num_fields, num_rows, mmsd->hostname, mmsd->port - ); + if (fields == NULL || num_fields!=4 || num_rows!=1) { + if (num_rows == 0) { + proxy_error( + "Empty resultset for GR monitoring query from server %s:%d. Server is likely misconfigured\n", + mmsd->hostname, mmsd->port + ); + } else { + proxy_error( + "Invalid resultset for GR monitoring query from server %s:%d. Either 'mysql_fetch_fields=NULL' or unexpected 'mysql_num_fields=%d'." 
+ " Please report this incident\n", + mmsd->hostname, mmsd->port, num_fields + ); + } if (mmsd->mysql_error_msg == NULL) { - mmsd->mysql_error_msg = strdup("Unknown error"); + mmsd->mysql_error_msg = strdup("Invalid or malformed resultset"); } gr_srv_st.inv_srv_state = true; } else { @@ -3537,11 +3610,17 @@ gr_srv_st_t extract_gr_srv_st(MySQL_Monitor_State_Data* mmsd) { if (row[2]) { gr_srv_st.transactions_behind=atol(row[2]); } + if (mmsd->cur_monitored_gr_srvs && row[3]) { + const string str_members_addrs { row[3] }; + const vector members_addrs { split_str(str_members_addrs, ',') }; + + gr_srv_st.gr_members = parse_gr_members_addrs(mmsd, members_addrs); + } } } proxy_debug( - PROXY_DEBUG_MONITOR, 4, + PROXY_DEBUG_MONITOR, 7, "Fetched %u:%s:%d info - interr: %d, error: %s, viable_candidate:'%d', read_only:'%d'," " transactions_behind:'%ld'\n", mmsd->hostgroup_id, mmsd->hostname, mmsd->port, mmsd->interr, mmsd->mysql_error_msg, @@ -3572,7 +3651,6 @@ gr_node_info_t gr_update_hosts_map( // NOTE: This isn't specified in the initializer list due to current standard limitations gr_node_info_t node_info {}; node_info.srv_st = gr_srv_st; - MySQL_Monitor_State_Data_Task_Result task_result = mmsd->get_task_result(); // Consider 'time_now' to be 'now - fetch_duration' unsigned long long time_now=realtime_time(); @@ -3665,6 +3743,24 @@ void gr_mon_action_over_resp_srv(MySQL_Monitor_State_Data* mmsd, const gr_node_i MyHGM->group_replication_lag_action( mmsd->writer_hostgroup, mmsd->hostname, mmsd->port, node_info.lag_counts, node_info.srv_st.read_only, enable ); + + if (mmsd->cur_monitored_gr_srvs && node_info.srv_st.gr_members.empty() == false) { + for (const gr_srv_addr_t& gr_member : node_info.srv_st.gr_members) { + const string& srv_host { gr_member.first }; + const int32_t srv_port { gr_member.second }; + bool found = false; + + for (const gr_host_def_t& host_def : *mmsd->cur_monitored_gr_srvs) { + if (srv_host == host_def.host && srv_port == host_def.port) { + found = 
true; + } + } + + if (found == false) { + MyHGM->update_group_replication_add_autodiscovered(srv_host, srv_port, mmsd->writer_hostgroup); + } + } + } } } } @@ -3696,12 +3792,16 @@ void gr_handle_actions_over_unresp_srvs(const vector& hosts_defs, * before placing the connection back into the 'ConnectionPool', on failure, we discard the connection. * @param mmsd The mmsd wrapper holding all information for returning the connection. */ -void handle_mmsd_mysql_conn(MySQL_Monitor_State_Data* mmsd, bool unregister_conn_on_failure) { +void handle_mmsd_mysql_conn(MySQL_Monitor_State_Data* mmsd) { if (mmsd == nullptr) return; if (mmsd->mysql) { if (mmsd->interr || mmsd->mysql_error_msg) { - if (unregister_conn_on_failure) { + // If 'MySQL_Monitor_State_Data' reaches the end of a task_handler without 'TASK_RESULT_UNKNOWN': + // 1. Connection failed to be created, 'task_result' should be 'TASK_RESULT_UNKNOWN'. No + // unregister needed. + // 2. Fetching operation failed, the async fetching handler already handled the 'unregister'. + if (mmsd->get_task_result() == MySQL_Monitor_State_Data_Task_Result::TASK_RESULT_SUCCESS) { GloMyMon->My_Conn_Pool->conn_unregister(mmsd); } mysql_close(mmsd->mysql); @@ -3718,7 +3818,7 @@ void handle_mmsd_mysql_conn(MySQL_Monitor_State_Data* mmsd, bool unregister_conn MyHGM->p_update_mysql_error_counter( p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port, mysql_errno(mmsd->mysql) ); - if (unregister_conn_on_failure) { + if (mmsd->get_task_result() == MySQL_Monitor_State_Data_Task_Result::TASK_RESULT_SUCCESS) { GloMyMon->My_Conn_Pool->conn_unregister(mmsd); } mysql_close(mmsd->mysql); @@ -3767,9 +3867,9 @@ void gr_report_fetching_errs(MySQL_Monitor_State_Data* mmsd) { * @param mmsd The server 'MySQL_Monitor_State_Data' after the fetching is completed. It should either * hold a valid 'MYSQL_RES' or an error. 
*/ -void async_gr_mon_actions_handler(MySQL_Monitor_State_Data* mmsd, bool unregister_conn_on_failure) { +void async_gr_mon_actions_handler(MySQL_Monitor_State_Data* mmsd) { // We base 'start_time' on the conn init for 'MySQL_Monitor_State_Data'. If a conn creation was - // required, we take into account this time into account, otherwise we asume that 'start_time=t1'. + // required, we take this time into account, otherwise we assume that 'start_time=t1'. uint64_t start_time = 0; if (mmsd->created_conn) { start_time = mmsd->init_time; @@ -3790,7 +3890,7 @@ void* monitor_GR_thread_HG(void *arg) { } // Handle 'mmsd' MySQL conn return to 'ConnectionPool' - handle_mmsd_mysql_conn(mmsd, unregister_conn_on_failure); + handle_mmsd_mysql_conn(mmsd); } /** @@ -3895,6 +3995,11 @@ void* monitor_GR_thread_HG(void *arg) { } } + int rnd_discoverer = conn_mmsds.size() == 0 ? -1 : rand() % conn_mmsds.size(); + if (rnd_discoverer != -1) { + conn_mmsds[rnd_discoverer]->cur_monitored_gr_srvs = &hosts_defs; + } + // TODO: This needs to be reworked once we change the way monitoring actions work on clusters, taking // the full cluster fetch data to avoid transient states. 
For now, since we perform the monitoring // actions independently, we workaround the limitation of 'Monitor_Poll' of only handling @@ -3906,7 +4011,7 @@ void* monitor_GR_thread_HG(void *arg) { // Handle 'mmsds' that failed to optain conns for (const unique_ptr& mmsd : fail_mmsds) { - async_gr_mon_actions_handler(mmsd.get(), true); + async_gr_mon_actions_handler(mmsd.get()); } // Update 't1' for subsequent fetch operations and reset errors @@ -3924,6 +4029,10 @@ void* monitor_GR_thread_HG(void *arg) { /////////////////////////////////////////////////////////////////////////////////////// + if (rnd_discoverer != -1) { + conn_mmsds[rnd_discoverer]->cur_monitored_gr_srvs = nullptr; + } + // Set the time for the next iteration next_check_time = curtime + mysql_thread___monitor_groupreplication_healthcheck_interval * 1000; } @@ -5781,12 +5890,12 @@ void * monitor_AWS_Aurora_thread_HG(void *arg) { rnd %= num_hosts; rc_ping = GloMyMon->server_responds_to_ping(hpa[rnd].host, hpa[rnd].port); //proxy_info("Looping Monitor thread for AWS Aurora writer HG %u\n", wHG); -#ifdef TEST_AURORA +#ifdef TEST_AURORA_RANDOM if (rand() % 100 < 30) { // we randomly fail 30% of the requests rc_ping = false; } -#endif // TEST_AURORA +#endif // TEST_AURORA_RANDOM if (rc_ping) { found_pingable_host = true; cur_host_idx = rnd; @@ -5809,12 +5918,12 @@ void * monitor_AWS_Aurora_thread_HG(void *arg) { } } -#ifdef TEST_AURORA +#ifdef TEST_AURORA_RANDOM if (rand() % 200 == 0) { // we randomly fail 0.5% of the requests found_pingable_host = false; } -#endif // TEST_AURORA +#endif // TEST_AURORA_RANDOM if (found_pingable_host == false) { proxy_error("No node is pingable for AWS Aurora cluster with writer HG %u\n", wHG); @@ -5868,7 +5977,10 @@ void * monitor_AWS_Aurora_thread_HG(void *arg) { mmsd->t1=monotonic_time(); mmsd->interr=0; // reset the value #ifdef TEST_AURORA - mmsd->async_exit_status = mysql_query_start(&mmsd->interr, mmsd->mysql, "SELECT SERVER_ID, SESSION_ID, LAST_UPDATE_TIMESTAMP, 
REPLICA_LAG_IN_MILLISECONDS, CPU FROM REPLICA_HOST_STATUS ORDER BY SERVER_ID"); + { + string query { TEST_AURORA_MONITOR_BASE_QUERY + std::to_string(wHG) }; + mmsd->async_exit_status = mysql_query_start(&mmsd->interr, mmsd->mysql, query.c_str()); + } #else // for reference we list the old queries. // original implementation: @@ -6076,34 +6188,6 @@ void * monitor_AWS_Aurora_thread_HG(void *arg) { } } } -/* - mmsd->writer_hostgroup=atoi(r->fields[0]); - mmsd->writer_is_also_reader=atoi(r->fields[4]); - mmsd->max_transactions_behind=atoi(r->fields[5]); - mmsd->mondb=monitordb; - WorkItem* item; - item=new WorkItem(mmsd,monitor_AWS_Aurora_thread); - GloMyMon->queue.add(item); - usleep(us); -*/ -// } - -/* - for - for (std::vector::iterator it = Galera_Hosts_resultset->rows.begin() ; it != Galera_Hosts_resultset->rows.end(); ++it) { - - } - SQLite3_row *r=*it; - bool rc_ping = true; - rc_ping = server_responds_to_ping(r->fields[1],atoi(r->fields[2])); - if (rc_ping) { // only if server is responding to pings - MySQL_Monitor_State_Data *mmsd=new MySQL_Monitor_State_Data(r->fields[1],atoi(r->fields[2]), NULL, atoi(r->fields[3])); - mmsd->writer_hostgroup=atoi(r->fields[0]); - mmsd->writer_is_also_reader=atoi(r->fields[4]); - mmsd->max_transactions_behind=atoi(r->fields[5]); - mmsd->mondb=monitordb; - -*/ } __exit_monitor_AWS_Aurora_thread_HG_now: if (mmsd) { @@ -6223,66 +6307,6 @@ void * MySQL_Monitor::monitor_aws_aurora() { pthread_mutex_unlock(&aws_aurora_mutex); } -/* - if (t1 < next_loop_at) { - goto __sleep_monitor_aws_aurora; - } - - if (next_loop_at == 0) { - // free the queue - - } - - next_loop_at=t1+1000*mysql_thread___monitor_galera_healthcheck_interval; - pthread_mutex_lock(&aws_aurora_mutex); - if (AWS_Aurora_Hosts_resultset==NULL) { - goto __end_monitor_aws_aurora_loop; - } else { - if (AWS_Aurora_Hosts_resultset->rows_count==0) { - goto __end_monitor_aws_aurora_loop; - } - int us=100; - if (AWS_Aurora_Hosts_resultset->rows_count) { - 
us=mysql_thread___monitor_read_only_interval/2/Galera_Hosts_resultset->rows_count; - } - for (std::vector::iterator it = Galera_Hosts_resultset->rows.begin() ; it != Galera_Hosts_resultset->rows.end(); ++it) { - SQLite3_row *r=*it; - bool rc_ping = true; - rc_ping = server_responds_to_ping(r->fields[1],atoi(r->fields[2])); - if (rc_ping) { // only if server is responding to pings - MySQL_Monitor_State_Data *mmsd=new MySQL_Monitor_State_Data(r->fields[1],atoi(r->fields[2]), NULL, atoi(r->fields[3])); - mmsd->writer_hostgroup=atoi(r->fields[0]); - mmsd->writer_is_also_reader=atoi(r->fields[4]); - mmsd->max_transactions_behind=atoi(r->fields[5]); - mmsd->mondb=monitordb; - WorkItem* item; - item=new WorkItem(mmsd,monitor_AWS_Aurora_thread); - GloMyMon->queue.add(item); - usleep(us); - } - if (GloMyMon->shutdown) { - pthread_mutex_unlock(&galera_mutex); - return NULL; - } - } - } - -__end_monitor_aws_aurora_loop: - pthread_mutex_unlock(&aws_aurora_mutex); - if (mysql_thread___monitor_enabled==true) { - } - -__sleep_monitor_aws_aurora: - t2=monotonic_time(); - if (t2 200000) { - st = 200000; - } - usleep(st); - } -*/ usleep(10000); } if (mysql_thr) { @@ -6324,6 +6348,18 @@ unsigned int MySQL_Monitor::estimate_lag(char* server_id, AWS_Aurora_status_entr return mlag; } +void print_aws_aurora_status_entry(AWS_Aurora_status_entry* aase) { + if (aase && aase->start_time) { + if (aase->host_statuses->size()) { + for (AWS_Aurora_replica_host_status_entry* hse : *aase->host_statuses) { + if (hse) { + fprintf(stderr,"%s %s %s %f %f\n", hse->server_id, hse->session_id, hse->last_update_timestamp, hse->replica_lag_ms , hse->cpu); + } + } + } + } +} + void MySQL_Monitor::evaluate_aws_aurora_results(unsigned int wHG, unsigned int rHG, AWS_Aurora_status_entry **lasts_ase, unsigned int ase_idx, unsigned int max_latency_ms, unsigned int add_lag_ms, unsigned int min_lag_ms, unsigned int lag_num_checks) { #ifdef TEST_AURORA unsigned int i = 0; @@ -6341,16 +6377,7 @@ void 
MySQL_Monitor::evaluate_aws_aurora_results(unsigned int wHG, unsigned int r for (i=0; i < N_L_ASE; i++) { AWS_Aurora_status_entry *aase = lasts_ase[i]; if (ev == true || i == ase_idx) { - if (aase && aase->start_time) { - if ( aase->host_statuses->size() ) { - for (std::vector::iterator it3 = aase->host_statuses->begin(); it3!=aase->host_statuses->end(); ++it3) { - AWS_Aurora_replica_host_status_entry *hse = *it3; - if (hse) { - fprintf(stderr,"%s %s %s %f %f\n", hse->server_id, hse->session_id, hse->last_update_timestamp, hse->replica_lag_ms , hse->cpu); - } - } - } - } + print_aws_aurora_status_entry(aase); } } } @@ -6384,7 +6411,7 @@ void MySQL_Monitor::evaluate_aws_aurora_results(unsigned int wHG, unsigned int r if (strcmp(prev_hse->server_id,hse->server_id)==0) { bool prev_enabled = true; - unsigned int prev_lag_ms = estimate_lag(hse->server_id, lasts_ase, ase_idx, add_lag_ms, min_lag_ms, lag_num_checks); + unsigned int prev_lag_ms = estimate_lag(hse->server_id, lasts_ase, prev_ase_idx, add_lag_ms, min_lag_ms, lag_num_checks); if (prev_lag_ms > max_latency_ms) { prev_enabled = false; } @@ -7563,7 +7590,7 @@ bool MySQL_Monitor::monitor_group_replication_process_ready_tasks_2( for (MySQL_Monitor_State_Data* mmsd : mmsds) { const MySQL_Monitor_State_Data_Task_Result task_result = mmsd->get_task_result(); assert(task_result != MySQL_Monitor_State_Data_Task_Result::TASK_RESULT_PENDING); - async_gr_mon_actions_handler(mmsd, false); + async_gr_mon_actions_handler(mmsd); } return true; diff --git a/lib/ProxySQL_Admin.cpp b/lib/ProxySQL_Admin.cpp index 5e46ffc161..b106e42759 100644 --- a/lib/ProxySQL_Admin.cpp +++ b/lib/ProxySQL_Admin.cpp @@ -13942,17 +13942,13 @@ void ProxySQL_Admin::enable_galera_testing() { } #endif // TEST_GALERA #ifdef TEST_AURORA -void ProxySQL_Admin::enable_aurora_testing() { - proxy_info("Admin is enabling AWS Aurora Testing using SQLite3 Server and HGs from 1271 to 1276\n"); + +void 
ProxySQL_Admin::enable_aurora_testing_populate_mysql_servers() { sqlite3_stmt *statement=NULL; - //sqlite3 *mydb3=admindb->get_db(); unsigned int num_aurora_servers = GloSQLite3Server->num_aurora_servers[0]; - int rc; - mysql_servers_wrlock(); admindb->execute("DELETE FROM mysql_servers WHERE hostgroup_id BETWEEN 1271 AND 1276"); char *query=(char *)"INSERT INTO mysql_servers (hostgroup_id,hostname,use_ssl,comment) VALUES (?1, ?2, ?3, ?4)"; - //rc=(*proxy_sqlite3_prepare_v2)(mydb3, query, -1, &statement, 0); - rc = admindb->prepare_v2(query, &statement); + int rc = admindb->prepare_v2(query, &statement); ASSERT_SQLITE_OK(rc, admindb); for (unsigned int j=1; j<4; j++) { proxy_info("Admin is enabling AWS Aurora Testing using SQLite3 Server and HGs 127%d and 127%d\n" , j*2-1 , j*2); @@ -13966,7 +13962,6 @@ void ProxySQL_Admin::enable_aurora_testing() { } else { if (j==3) { serverid = "host.1." + std::to_string(i+11) + ".aws-test.com"; - //serverid = "host." + std::to_string(j) + "." + std::to_string(i+11) + ".aws.3.test.com"; } } } @@ -13982,24 +13977,35 @@ void ProxySQL_Admin::enable_aurora_testing() { } } (*proxy_sqlite3_finalize)(statement); +} + +void ProxySQL_Admin::enable_aurora_testing_populate_mysql_aurora_hostgroups() { +#ifndef TEST_AURORA_RANDOM + admindb->execute("INSERT INTO mysql_aws_aurora_hostgroups (writer_hostgroup, reader_hostgroup, active, domain_name, max_lag_ms, check_interval_ms, check_timeout_ms, writer_is_also_reader, new_reader_weight, add_lag_ms, min_lag_ms, lag_num_checks, comment) VALUES (1271, 1272, 1, '.aws-test.com', 25, 1000, 90, 1, 1, 10, 20, 5, 'Automated Aurora Testing Cluster 1')"); + admindb->execute("INSERT INTO mysql_aws_aurora_hostgroups (writer_hostgroup, reader_hostgroup, active, domain_name, max_lag_ms, check_interval_ms, check_timeout_ms, writer_is_also_reader, new_reader_weight, add_lag_ms, min_lag_ms, lag_num_checks, comment) VALUES (1273, 1274, 1, '.cluster2.aws.test', 25, 1000, 90, 0, 1, 10, 20, 5, 'Automated Aurora 
Testing Cluster 2')"); + admindb->execute("INSERT INTO mysql_aws_aurora_hostgroups (writer_hostgroup, reader_hostgroup, active, domain_name, max_lag_ms, check_interval_ms, check_timeout_ms, writer_is_also_reader, new_reader_weight, add_lag_ms, min_lag_ms, lag_num_checks, comment) VALUES (1275, 1276, 1, '.aws-test.com', 25, 1000, 90, 0, 2, 10, 20, 5, 'Automated Aurora Testing Cluster 3')"); +#else admindb->execute("INSERT INTO mysql_aws_aurora_hostgroups (writer_hostgroup, reader_hostgroup, active, domain_name, max_lag_ms, check_interval_ms, check_timeout_ms, writer_is_also_reader, new_reader_weight, add_lag_ms, min_lag_ms, lag_num_checks, comment) VALUES (1271, 1272, 1, '.aws-test.com', 25, 120, 90, 1, 1, 10, 20, 5, 'Automated Aurora Testing Cluster 1')"); admindb->execute("INSERT INTO mysql_aws_aurora_hostgroups (writer_hostgroup, reader_hostgroup, active, domain_name, max_lag_ms, check_interval_ms, check_timeout_ms, writer_is_also_reader, new_reader_weight, add_lag_ms, min_lag_ms, lag_num_checks, comment) VALUES (1273, 1274, 1, '.cluster2.aws.test', 25, 120, 90, 0, 1, 10, 20, 5, 'Automated Aurora Testing Cluster 2')"); admindb->execute("INSERT INTO mysql_aws_aurora_hostgroups (writer_hostgroup, reader_hostgroup, active, domain_name, max_lag_ms, check_interval_ms, check_timeout_ms, writer_is_also_reader, new_reader_weight, add_lag_ms, min_lag_ms, lag_num_checks, comment) VALUES (1275, 1276, 1, '.aws-test.com', 25, 120, 90, 0, 2, 10, 20, 5, 'Automated Aurora Testing Cluster 3')"); - //admindb->execute("INSERT INTO mysql_aws_aurora_hostgroups (writer_hostgroup, reader_hostgroup, active, domain_name, max_lag_ms, check_interval_ms, check_timeout_ms, writer_is_also_reader, new_reader_weight, add_lag_ms, min_lag_ms, lag_num_checks, comment) VALUES (1275, 1276, 1, '.aws.3.test.com', 25, 120, 90, 0, 2, 10, 20, 5, 'Automated Aurora Testing Cluster 3')"); +#endif admindb->execute("UPDATE mysql_aws_aurora_hostgroups SET active=1"); - //admindb->execute("update mysql_servers 
set max_replication_lag=20"); +} + +void ProxySQL_Admin::enable_aurora_testing() { + proxy_info("Admin is enabling AWS Aurora Testing using SQLite3 Server and HGs from 1271 to 1276\n"); + mysql_servers_wrlock(); + enable_aurora_testing_populate_mysql_servers(); + enable_aurora_testing_populate_mysql_aurora_hostgroups(); load_mysql_servers_to_runtime(); mysql_servers_wrunlock(); - //admindb->execute("UPDATE global_variables SET variable_value=3000 WHERE variable_name='mysql-monitor_ping_interval'"); - //admindb->execute("UPDATE global_variables SET variable_value=1500 WHERE variable_name='mysql-monitor_ping_timeout'"); - //admindb->execute("UPDATE global_variables SET variable_value=3000 WHERE variable_name='mysql-monitor_replication_lag_interval'"); - //admindb->execute("UPDATE global_variables SET variable_value=1500 WHERE variable_name='mysql-monitor_replication_lag_timeout'"); - admindb->execute("UPDATE global_variables SET variable_value=200 WHERE variable_name='mysql-monitor_ping_interval'"); + admindb->execute("UPDATE global_variables SET variable_value=1000 WHERE variable_name='mysql-monitor_ping_interval'"); admindb->execute("UPDATE global_variables SET variable_value=3000 WHERE variable_name='mysql-monitor_ping_timeout'"); - admindb->execute("UPDATE global_variables SET variable_value=200 WHERE variable_name='mysql-monitor_replication_lag_interval'"); + admindb->execute("UPDATE global_variables SET variable_value=1000 WHERE variable_name='mysql-monitor_replication_lag_interval'"); admindb->execute("UPDATE global_variables SET variable_value=3000 WHERE variable_name='mysql-monitor_replication_lag_timeout'"); admindb->execute("UPDATE global_variables SET variable_value='percona.heartbeat' WHERE variable_name='mysql-monitor_replication_lag_use_percona_heartbeat'"); load_mysql_variables_to_runtime(); + admindb->execute("DELETE FROM mysql_users WHERE username LIKE '%aurora%'"); admindb->execute("INSERT INTO mysql_users (username,password,default_hostgroup) 
VALUES ('aurora1','pass1',1271), ('aurora2','pass2',1273), ('aurora3','pass3',1275)"); init_users(); admindb->execute("INSERT INTO mysql_query_rules (active, username, match_pattern, destination_hostgroup, apply) VALUES (1, 'aurora1', '^SELECT.*max_lag_ms', 1272, 1)"); diff --git a/lib/ProxySQL_Cluster.cpp b/lib/ProxySQL_Cluster.cpp index 6d9d42f9f0..63d39983d6 100644 --- a/lib/ProxySQL_Cluster.cpp +++ b/lib/ProxySQL_Cluster.cpp @@ -967,17 +967,6 @@ void ProxySQL_Node_Entry::set_checksums(MYSQL_RES *_r) { } } -std::string get_checksum_from_hash(uint64_t hash) { - uint32_t d32[2] = { 0 }; - memcpy(&d32, &hash, sizeof(hash)); - - vector s_buf(20, 0); - sprintf(&s_buf[0],"0x%0X%0X", d32[0], d32[1]); - replace_checksum_zeros(&s_buf[0]); - - return string { &s_buf.front() }; -} - /** * @brief Computes the checksum from a MySQL resultset in the same we already do in 'SQLite3_result::raw_checksum'. * @details For each received column computing the field length via 'strlen' is required, this is because we diff --git a/lib/proxysql_utils.cpp b/lib/proxysql_utils.cpp index 27d8051beb..8f2841cee4 100644 --- a/lib/proxysql_utils.cpp +++ b/lib/proxysql_utils.cpp @@ -1,5 +1,6 @@ #include "proxysql_utils.h" +#include #include #include #include @@ -13,7 +14,9 @@ #include #include +using std::function; using std::string; +using std::unique_ptr; using std::vector; __attribute__((__format__ (__printf__, 1, 2))) @@ -367,6 +370,18 @@ int wexecvp( return child_err; } +std::vector split_str(const std::string& s, char delimiter) { + std::vector tokens {}; + std::string token {}; + std::istringstream tokenStream(s); + + while (std::getline(tokenStream, token, delimiter)) { + tokens.push_back(token); + } + + return tokens; +} + std::string replace_str(const std::string& str, const std::string& match, const std::string& repl) { if(match.empty()) { return str; @@ -431,3 +446,24 @@ void close_all_non_term_fd(std::vector excludeFDs) { } } } + +std::string get_checksum_from_hash(uint64_t hash) { 
+ uint32_t d32[2] = { 0 }; + memcpy(&d32, &hash, sizeof(hash)); + + vector s_buf(ProxySQL_Checksum_Value_LENGTH, 0); + sprintf(&s_buf[0],"0x%0X%0X", d32[0], d32[1]); + replace_checksum_zeros(&s_buf[0]); + + return string { &s_buf.front() }; +} + +void remove_sqlite3_resultset_rows( + unique_ptr& resultset, const function& pred +) { + const vector::iterator remove_it { + std::remove_if(resultset->rows.begin(), resultset->rows.end(), pred) + }; + resultset->rows.erase(remove_it, resultset->rows.end()); + resultset->rows_count = resultset->rows.size(); +} diff --git a/src/SQLite3_Server.cpp b/src/SQLite3_Server.cpp index c7e72aa013..ea55fa43a0 100644 --- a/src/SQLite3_Server.cpp +++ b/src/SQLite3_Server.cpp @@ -33,6 +33,8 @@ #include #include +using std::string; + #define SELECT_VERSION_COMMENT "select @@version_comment limit 1" #define SELECT_VERSION_COMMENT_LEN 32 #define SELECT_DB_USER "select DATABASE(), USER() limit 1" @@ -317,6 +319,44 @@ bool match_monitor_query(const std::string& monitor_query, const std::string& qu } #endif // TEST_GROUPREP +#ifdef TEST_AURORA + +using std::vector; + +using aurora_hg_info_t = std::tuple; +enum AURORA_HG_INFO { + WRITER_HG, + READER_HG, + DOMAIN_NAME +}; + +vector get_hgs_info(SQLite3DB* db) { + vector whgs {}; + + char* error = NULL; + int cols = 0; + int affected_rows = 0; + SQLite3_result* resultset = NULL; + + GloAdmin->admindb->execute_statement( + "SELECT writer_hostgroup,reader_hostgroup,domain_name FROM mysql_aws_aurora_hostgroups", + &error, &cols, &affected_rows, &resultset + ); + + for (const SQLite3_row* r : resultset->rows) { + uint32_t writer_hg = atoi(r->fields[0]); + uint32_t reader_hg = atoi(r->fields[1]); + string domain_name { r->fields[2] }; + + whgs.push_back({writer_hg, reader_hg, domain_name}); + } + + return whgs; +} + + +#endif + void SQLite3_Server_session_handler(MySQL_Session *sess, void *_pa, PtrSize_t *pkt) { char *error=NULL; @@ -735,7 +775,34 @@ void SQLite3_Server_session_handler(MySQL_Session 
*sess, void *_pa, PtrSize_t *p #ifdef TEST_AURORA if (strstr(query_no_space,(char *)"REPLICA_HOST_STATUS")) { pthread_mutex_lock(&GloSQLite3Server->aurora_mutex); - GloSQLite3Server->populate_aws_aurora_table(sess); + + if (strcasestr(query_no_space, TEST_AURORA_MONITOR_BASE_QUERY)) { + string s_whg { query_no_space + strlen(TEST_AURORA_MONITOR_BASE_QUERY) }; + uint32_t whg = atoi(s_whg.c_str()); + + GloSQLite3Server->populate_aws_aurora_table(sess, whg); + vector hgs_info { get_hgs_info(GloAdmin->admindb) }; + + const auto match_writer = [&whg](const aurora_hg_info_t& hg_info) { + return std::get(hg_info) == whg; + }; + const auto hg_info_it = std::find_if(hgs_info.begin(), hgs_info.end(), match_writer); + string select_query { + "SELECT SERVER_ID,SESSION_ID,LAST_UPDATE_TIMESTAMP,REPLICA_LAG_IN_MILLISECONDS,CPU" + " FROM REPLICA_HOST_STATUS " + }; + + if (hg_info_it == hgs_info.end()) { + select_query += " LIMIT 0"; + } else { + const string& domain_name { std::get(*hg_info_it) }; + select_query += " WHERE DOMAIN_NAME='" + domain_name + "' ORDER BY SERVER_ID"; + } + + free(query); + query = static_cast(malloc(select_query.length() + 1)); + strcpy(query, select_query.c_str()); + } } #endif // TEST_AURORA #ifdef TEST_GALERA @@ -751,7 +818,7 @@ void SQLite3_Server_session_handler(MySQL_Session *sess, void *_pa, PtrSize_t *p // NOTE: This query should be in one place that can be reused by // 'ProxySQL_Monitor' module. 
const std::string grouprep_monitor_test_query_start { - "SELECT viable_candidate,read_only,transactions_behind " + "SELECT viable_candidate,read_only,transactions_behind,members " "FROM GR_MEMBER_ROUTING_CANDIDATE_STATUS " }; @@ -769,14 +836,15 @@ void SQLite3_Server_session_handler(MySQL_Session *sess, void *_pa, PtrSize_t *p free(query); std::string t_select_as_query { - "SELECT '%s' AS viable_candidate, '%s' AS read_only, %d AS transactions_behind" + "SELECT '%s' AS viable_candidate, '%s' AS read_only, %d AS transactions_behind, '%s' AS members" }; std::string select_as_query {}; string_format( t_select_as_query, select_as_query, std::get<0>(gr_srv_status) ? "YES" : "NO", std::get<1>(gr_srv_status) ? "YES" : "NO", - std::get<2>(gr_srv_status) + std::get<2>(gr_srv_status), + std::get<3>(gr_srv_status).c_str() ); query = static_cast(malloc(select_as_query.length() + 1)); @@ -842,10 +910,12 @@ void SQLite3_Server_session_handler(MySQL_Session *sess, void *_pa, PtrSize_t *p #ifdef TEST_AURORA if (strstr(query_no_space,(char *)"REPLICA_HOST_STATUS")) { pthread_mutex_unlock(&GloSQLite3Server->aurora_mutex); +#ifdef TEST_AURORA_RANDOM if (rand() % 100 == 0) { // randomly add some latency on 1% of the traffic sleep(2); } +#endif } #endif // TEST_AURORA #ifdef TEST_GALERA @@ -913,7 +983,7 @@ void SQLite3_Server_session_handler(MySQL_Session *sess, void *_pa, PtrSize_t *p #ifdef TEST_GROUPREP group_rep_status SQLite3_Server::grouprep_test_value(const std::string& srv_addr) { - group_rep_status cur_srv_st { "YES", "YES", 0 }; + group_rep_status cur_srv_st { "YES", "YES", 0, "" }; auto it = grouprep_map.find(srv_addr); if (it != grouprep_map.end()) { @@ -1378,27 +1448,187 @@ void SQLite3_Server::populate_galera_table(MySQL_Session *sess) { #endif // TEST_GALERA #ifdef TEST_AURORA -void SQLite3_Server::populate_aws_aurora_table(MySQL_Session *sess) { - // this function needs to be called with lock on mutex aurora_mutex already acquired - sessdb->execute("DELETE FROM 
REPLICA_HOST_STATUS"); - sqlite3_stmt *statement=NULL; - //sqlite3 *mydb3=sessdb->get_db(); - int rc; - char *query=(char *)"INSERT INTO REPLICA_HOST_STATUS VALUES (?1, ?2, ?3, ?4, ?5)"; - //rc=sqlite3_prepare_v2(mydb3, query, -1, &statement, 0); - rc = sessdb->prepare_v2(query, &statement); - ASSERT_SQLITE_OK(rc, sessdb); + +float get_rand_cpu() { + int cpu_i = rand() % 10000; + float cpu = static_cast(cpu_i) / 100; + + return cpu; +} + +string get_curtime_str() { time_t __timer; char lut[30]; struct tm __tm_info; time(&__timer); localtime_r(&__timer, &__tm_info); strftime(lut, 25, "%Y-%m-%d %H:%M:%S", &__tm_info); + string s = string(lut); + return s; +} + +void bind_query_params( + SQLite3DB* db, + sqlite3_stmt* stmt, + const string& server_id, + const string& domain, + const string& session_id, + float cpu, + const string& lut, + int32_t lag_ms +) { + int rc = 0; + + rc=sqlite3_bind_text(stmt, 1, server_id.c_str(), -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, db); + rc=sqlite3_bind_text(stmt, 2, domain.c_str(), -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, db); + rc=sqlite3_bind_text(stmt, 3, session_id.c_str(), -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, db); + rc=sqlite3_bind_double(stmt, 4, cpu); ASSERT_SQLITE_OK(rc, db); + rc=sqlite3_bind_text(stmt, 5, lut.c_str(), -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, db); + rc=sqlite3_bind_double(stmt, 6, lag_ms); ASSERT_SQLITE_OK(rc, db); + SAFE_SQLITE3_STEP2(stmt); + rc=sqlite3_clear_bindings(stmt); ASSERT_SQLITE_OK(rc, db); + rc=sqlite3_reset(stmt); ASSERT_SQLITE_OK(rc, db); +} + +/** + * @brief Extracts SERVER_ID from the supplied hostname using DOMAIN_NAME. + * @param hostname The server hostname (SERVER_ID + DOMAIN_NAME)). + * @param domain_name The server DOMAIN_NAME as in 'mysql_aws_aurora_hostgroups' + * @return Either the SERVER_ID in the supplied hostname or empty if DOMAIN_NAME failed to match. 
/**
 * @brief Extracts the SERVER_ID portion of an Aurora hostname.
 * @details Hostnames are expected to have the shape 'SERVER_ID + DOMAIN_NAME';
 *   the result is everything preceding the first occurrence of 'domain_name'
 *   within 'hostname'.
 * @param hostname The full server hostname.
 * @param domain_name The server domain as configured in 'mysql_aws_aurora_hostgroups'.
 * @return The SERVER_ID prefix, or an empty string when 'domain_name' does not
 *   occur in 'hostname'.
 */
std::string get_server_id(const std::string& hostname, const std::string& domain_name) {
	const std::string::size_type dom_pos { hostname.find(domain_name) };

	return dom_pos == std::string::npos ? std::string {} : hostname.substr(0, dom_pos);
}
srv_hostname.find(aurora_domain) == string::npos || + std::find(proc_srvs.begin(), proc_srvs.end(), srv_hostname) != proc_srvs.end() + ) { + continue; + } + + const string server_id { + get_server_id(srv_hostname, std::get(hg_info)) + }; + + string session_id {}; + + if ( + (mysrv_it == resultset->rows.end() && writer_set == false) || + (srv_hg_id == std::get(hg_info) && writer_set == false) + ) { + session_id = "MASTER_SESSION_ID"; + writer_set = true; + } else { + session_id = "TESTID-" + server_id + aurora_domain + "-R"; + } + + const float cpu = get_rand_cpu(); + const string lut { get_curtime_str() }; + const int lag_ms = 0; + + bind_query_params(sessdb, stmt, server_id, aurora_domain, session_id, cpu, lut, lag_ms); + proc_srvs.push_back(srv_hostname); + } + } + + sqlite3_finalize(stmt); + delete resultset; + } else { + // We just re-generate deterministic 'SESSION_IDS', preserving 'MASTER_SESSION_ID' values: + // 'SESSION_IDS' are preserved, 'MASTER_SESSION_ID' or others. + for (SQLite3_row* row : host_status->rows) { + const char* server_id = row->fields[0]; + const char* domain_name = row->fields[1]; + + const char update_query_t[] { + "UPDATE REPLICA_HOST_STATUS SET SESSION_ID='%s',CPU=%f,LAST_UPDATE_TIMESTAMP='%s'" + " WHERE SERVER_ID='%s' AND DOMAIN_NAME='%s' AND SESSION_ID!='MASTER_SESSION_ID'" + }; + + const string session_id { "TESTID-" + string { server_id } + domain_name + "-R" }; + const float cpu = get_rand_cpu(); + const string lut { get_curtime_str() }; + + const string update_query { + cstr_format(update_query_t, session_id.c_str(), cpu, lut.c_str(), server_id, domain_name).str + }; + + sessdb->execute(update_query.c_str()); + } + } + + delete host_status; +#else + sessdb->execute("DELETE FROM REPLICA_HOST_STATUS"); + + string lut { get_curtime_str() }; string myip = string(sess->client_myds->proxy_addr.addr); string clu_id_s = myip.substr(6,1); unsigned int cluster_id = atoi(clu_id_s.c_str()); cluster_id--; - //if (rand() % 200 == 0) { + if 
(rand() % 20000 == 0) { // simulate a failover cur_aurora_writer[cluster_id] = rand() % num_aurora_servers[cluster_id]; @@ -1419,14 +1649,12 @@ void SQLite3_Server::populate_aws_aurora_table(MySQL_Session *sess) { } } for (unsigned int i=0; ifields[2] } == "YES" ? true : false, std::string { r->fields[3] } == "YES" ? true : false, - atoi(r->fields[4]) + atoi(r->fields[4]), + std::string { r->fields[5] } }; this->grouprep_map[srv_addr] = srv_status; @@ -1509,16 +1730,16 @@ void SQLite3_Server::populate_grouprep_table(MySQL_Session *sess, int txs_behind int hostgroup_id = atoi(r->fields[2]); const std::string t_insert_query { "INSERT INTO GR_MEMBER_ROUTING_CANDIDATE_STATUS" - " (hostname, port, viable_candidate, read_only, transactions_behind) VALUES" - " ('%s', %d, '%s', '%s', 0)" + " (hostname, port, viable_candidate, read_only, transactions_behind, members) VALUES" + " ('%s', %d, '%s', '%s', 0, '%s')" }; std::string insert_query {}; if (hostgroup_id % 4 == 0) { - string_format(t_insert_query, insert_query, hostname.c_str(), port, "YES", "NO"); + string_format(t_insert_query, insert_query, hostname.c_str(), port, "YES", "NO", ""); sessdb->execute(insert_query.c_str()); } else { - string_format(t_insert_query, insert_query, hostname.c_str(), port, "YES", "YES"); + string_format(t_insert_query, insert_query, hostname.c_str(), port, "YES", "YES", ""); sessdb->execute(insert_query.c_str()); } } @@ -1582,7 +1803,12 @@ bool SQLite3_Server::init() { tables_defs_aurora = new std::vector; insert_into_tables_defs(tables_defs_aurora, (const char *)"REPLICA_HOST_STATUS", - (const char *)"CREATE TABLE REPLICA_HOST_STATUS (SERVER_ID VARCHAR NOT NULL, SESSION_ID VARCHAR NOT NULL, CPU REAL NOT NULL, LAST_UPDATE_TIMESTAMP VARCHAR NOT NULL, REPLICA_LAG_IN_MILLISECONDS REAL NOT NULL)"); + "CREATE TABLE REPLICA_HOST_STATUS (" + " SERVER_ID VARCHAR NOT NULL , DOMAIN_NAME VARCHAR NOT NULL , SESSION_ID VARCHAR NOT NULL ," + " CPU REAL NOT NULL , LAST_UPDATE_TIMESTAMP VARCHAR NOT NULL , 
REPLICA_LAG_IN_MILLISECONDS REAL NOT NULL ," + " PRIMARY KEY (SERVER_ID, DOMAIN_NAME)" + ")" + ); check_and_build_standard_tables(sessdb, tables_defs_aurora); GloAdmin->enable_aurora_testing(); #endif // TEST_AURORA @@ -1599,7 +1825,7 @@ bool SQLite3_Server::init() { insert_into_tables_defs(tables_defs_grouprep, (const char *)"GR_MEMBER_ROUTING_CANDIDATE_STATUS", (const char*)"CREATE TABLE GR_MEMBER_ROUTING_CANDIDATE_STATUS (" - "hostname VARCHAR NOT NULL, port INT NOT NULL, viable_candidate varchar not null, read_only varchar not null, transactions_behind int not null, PRIMARY KEY (hostname, port)" + "hostname VARCHAR NOT NULL, port INT NOT NULL, viable_candidate varchar not null, read_only varchar not null, transactions_behind int not null, members VARCHAR NOT NULL, PRIMARY KEY (hostname, port)" ")" );