From 705f50e0f988ae82bcfb446fea05dbcb0c171aea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Javier=20Jaramago=20Fern=C3=A1ndez?= Date: Thu, 26 Sep 2024 11:46:05 +0200 Subject: [PATCH 1/3] Add initial POC for PostgreSQL monitoring support --- include/PgSQL_HostGroups_Manager.h | 2 +- include/PgSQL_Monitor.hpp | 71 ++ include/PgSQL_Thread.h | 6 +- include/PgSQL_Variables.h | 1 - include/cpp.h | 1 + include/proxysql_structs.h | 31 + lib/Base_HostGroups_Manager.cpp | 1 + lib/Makefile | 2 +- lib/PgSQL_Monitor.cpp | 1735 ++++++++++++++++++++++++++++ lib/PgSQL_Thread.cpp | 72 +- lib/ProxySQL_Poll.cpp | 2 +- src/main.cpp | 30 +- 12 files changed, 1926 insertions(+), 28 deletions(-) create mode 100644 include/PgSQL_Monitor.hpp create mode 100644 lib/PgSQL_Monitor.cpp diff --git a/include/PgSQL_HostGroups_Manager.h b/include/PgSQL_HostGroups_Manager.h index 8a8e94f04..4662107db 100644 --- a/include/PgSQL_HostGroups_Manager.h +++ b/include/PgSQL_HostGroups_Manager.h @@ -585,7 +585,7 @@ class PgSQL_HostGroups_Manager : public Base_HostGroups_Manager { /** * @brief Mutex used to guard 'pgsql_servers_to_monitor' resulset. */ - std::mutex pgsql_servers_to_monitor_mutex; + std::mutex pgsql_servers_to_monitor_mutex {}; /** * @brief Resulset containing the latest 'pgsql_servers' present in 'mydb'. * @details This resulset should be updated via 'update_table_pgsql_servers_for_monitor' each time actions diff --git a/include/PgSQL_Monitor.hpp b/include/PgSQL_Monitor.hpp new file mode 100644 index 000000000..6a6b9e18d --- /dev/null +++ b/include/PgSQL_Monitor.hpp @@ -0,0 +1,71 @@ +#ifndef __PGSQL_MONITOR_H +#define __PGSQL_MONITOR_H + +#include "libpq-fe.h" + +#include "sqlite3db.h" +#include "proxysql_structs.h" + +#include +#include +#include + +#define MONITOR_SQLITE_TABLE_PGSQL_SERVER_CONNECT_LOG "CREATE TABLE pgsql_server_connect_log (hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 3306 , time_start_us INT NOT NULL DEFAULT 0 , connect_success_time_us INT DEFAULT 0 , connect_error VARCHAR , PRIMARY KEY (hostname, port, time_start_us))" + +#define MONITOR_SQLITE_TABLE_PGSQL_SERVER_PING_LOG "CREATE TABLE pgsql_server_ping_log ( hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 3306 , time_start_us INT NOT NULL DEFAULT 0 , ping_success_time_us INT DEFAULT 0 , ping_error VARCHAR , PRIMARY KEY (hostname, port, time_start_us))" + +#define MONITOR_SQLITE_TABLE_PGSQL_SERVERS "CREATE TABLE pgsql_servers (hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 3306 , status INT CHECK (status IN (0, 1, 2, 3, 4)) NOT NULL DEFAULT 0 , use_ssl INT CHECK (use_ssl IN(0,1)) NOT NULL DEFAULT 0 , PRIMARY KEY (hostname, port) )" + +#define MONITOR_SQLITE_TABLE_PROXYSQL_SERVERS "CREATE TABLE proxysql_servers (hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 6032 , weight INT CHECK (weight >= 0) NOT NULL DEFAULT 0 , comment VARCHAR NOT NULL DEFAULT '' , PRIMARY KEY (hostname, port) )" + +struct PgSQL_Monitor { + // @brief Flags if monitoring threads should be shutdown. + bool shutdown = false; + // @brief Mutex to hold to update `monitor_internal.pgsql_servers` + std::mutex pgsql_srvs_mutex {}; + // @brief Mutex to hold to update/read `pgsql_servers` to monitor + std::mutex pgsql_srvs_to_monitor_mutex {}; + // @brief Used to access monitor database + SQLite3DB monitordb {}; + // @brief Used to access internal monitor database + SQLite3DB monitor_internal_db {}; + // Internal counters for metrics + /////////////////////////////////////////////////////////////////////////// + uint64_t connect_check_ERR { 0 }; + uint64_t connect_check_OK { 0 }; + uint64_t ping_check_ERR { 0 }; + uint64_t ping_check_OK { 0 }; + /////////////////////////////////////////////////////////////////////////// + + std::vector tables_defs_monitor { + { + const_cast("pgsql_server_connect_log"), + const_cast(MONITOR_SQLITE_TABLE_PGSQL_SERVER_CONNECT_LOG) + }, + { + const_cast("pgsql_server_ping_log"), + const_cast(MONITOR_SQLITE_TABLE_PGSQL_SERVER_PING_LOG) + } + }; + + std::vector tables_defs_monitor_internal { + { + const_cast("pgsql_servers"), + const_cast(MONITOR_SQLITE_TABLE_PGSQL_SERVERS) + } + }; + + PgSQL_Monitor(); +}; + +struct pgsql_conn_t { + PGconn* conn { nullptr }; + int fd { 0 }; + uint64_t last_used { 0 }; + ASYNC_ST state { ASYNC_ST::ASYNC_CONNECT_FAILED }; + mf_unique_ptr err {}; +}; + +void* PgSQL_monitor_scheduler_thread(); + +#endif diff --git a/include/PgSQL_Thread.h b/include/PgSQL_Thread.h index 1f86acacb..672694eaf 100644 --- a/include/PgSQL_Thread.h +++ b/include/PgSQL_Thread.h @@ -6,7 +6,6 @@ #include "proxysql.h" #include "Base_Thread.h" -#include "cpp.h" #include "ProxySQL_Poll.h" #include "PgSQL_Variables.h" #ifdef IDLE_THREADS @@ -825,6 +824,7 @@ class PgSQL_Threads_Handler //! Read only check timeout. Unit: 'ms'. int monitor_replication_lag_timeout; int monitor_replication_lag_count; +/* TODO: Remove int monitor_groupreplication_healthcheck_interval; int monitor_groupreplication_healthcheck_timeout; int monitor_groupreplication_healthcheck_max_timeout_count; @@ -836,9 +836,13 @@ class PgSQL_Threads_Handler int monitor_query_interval; int monitor_query_timeout; int monitor_slave_lag_when_null; +*/ + int monitor_threads; +/* TODO: Remove int monitor_threads_min; int monitor_threads_max; int monitor_threads_queue_maxsize; +*/ int monitor_local_dns_cache_ttl; int monitor_local_dns_cache_refresh_interval; int monitor_local_dns_resolver_queue_maxsize; diff --git a/include/PgSQL_Variables.h b/include/PgSQL_Variables.h index d8a5ccbef..d276181d1 100644 --- a/include/PgSQL_Variables.h +++ b/include/PgSQL_Variables.h @@ -2,7 +2,6 @@ #define PGSQL_VARIABLES_H #include "proxysql.h" -#include "cpp.h" #include #include diff --git a/include/cpp.h b/include/cpp.h index 73a9b078c..12f965997 100644 --- a/include/cpp.h +++ b/include/cpp.h @@ -15,6 +15,7 @@ #include "sqlite3db.h" //#include "StatCounters.h" #include "MySQL_Monitor.hpp" +#include "PgSQL_Monitor.hpp" //#include "MySQL_Protocol.h" //#include "MySQL_Authentication.hpp" //#include "MySQL_LDAP_Authentication.hpp" diff --git a/include/proxysql_structs.h b/include/proxysql_structs.h index 268f5d27e..47b149081 100644 --- a/include/proxysql_structs.h +++ b/include/proxysql_structs.h @@ -647,6 +647,7 @@ enum PROXYSQL_MYSQL_ERR { ER_PROXYSQL_AWS_HEALTH_CHECK_CONN_TIMEOUT = 9017, ER_PROXYSQL_AWS_HEALTH_CHECK_TIMEOUT = 9018, ER_PROXYSQL_SRV_NULL_REPLICATION_LAG = 9019, + ER_PROXYSQL_CONNECT_TIMEOUT = 9020, }; enum proxysql_session_type { @@ -1079,6 +1080,21 @@ __thread char* pgsql_thread___firewall_whitelist_errormsg; __thread bool pgsql_thread___firewall_whitelist_enabled; __thread int pgsql_thread___query_processor_iterations; __thread int pgsql_thread___query_processor_regex; + +__thread bool pgsql_thread___monitor_enabled; +__thread int pgsql_thread___monitor_history; +__thread int pgsql_thread___monitor_connect_interval; +__thread int pgsql_thread___monitor_connect_timeout; +__thread int pgsql_thread___monitor_ping_interval; +__thread int pgsql_thread___monitor_ping_max_failures; +__thread int pgsql_thread___monitor_ping_timeout; +__thread int pgsql_thread___monitor_read_only_interval; +__thread int pgsql_thread___monitor_read_only_timeout; +__thread int pgsql_thread___monitor_read_only_max_timeout_count; +__thread int pgsql_thread___monitor_threads; +__thread char* pgsql_thread___monitor_username; +__thread char* pgsql_thread___monitor_password; + //--------------------------- __thread char *mysql_thread___default_schema; @@ -1351,6 +1367,21 @@ extern __thread char* pgsql_thread___firewall_whitelist_errormsg; extern __thread bool pgsql_thread___firewall_whitelist_enabled; extern __thread int pgsql_thread___query_processor_iterations; extern __thread int pgsql_thread___query_processor_regex; + +extern __thread bool pgsql_thread___monitor_enabled; +extern __thread int pgsql_thread___monitor_history; +extern __thread int pgsql_thread___monitor_connect_interval; +extern __thread int pgsql_thread___monitor_connect_timeout; +extern __thread int pgsql_thread___monitor_ping_interval; +extern __thread int pgsql_thread___monitor_ping_max_failures; +extern __thread int pgsql_thread___monitor_ping_timeout; +extern __thread int pgsql_thread___monitor_read_only_interval; +extern __thread int pgsql_thread___monitor_read_only_timeout; +extern __thread int pgsql_thread___monitor_read_only_max_timeout_count; +extern __thread int pgsql_thread___monitor_threads; +extern __thread char* pgsql_thread___monitor_username; +extern __thread char* pgsql_thread___monitor_password; + //--------------------------- extern __thread char *mysql_thread___default_schema; diff --git a/lib/Base_HostGroups_Manager.cpp b/lib/Base_HostGroups_Manager.cpp index 4a27e7888..734b94448 100644 --- a/lib/Base_HostGroups_Manager.cpp +++ b/lib/Base_HostGroups_Manager.cpp @@ -53,6 +53,7 @@ template void Base_HostGroups_Manager::wrlock(); template void Base_HostGroups_Manager::wrunlock(); template SQLite3_result * Base_HostGroups_Manager::execute_query(char*, char**); +template SQLite3_result * Base_HostGroups_Manager::execute_query(char*, char**); #if 0 #define SAFE_SQLITE3_STEP(_stmt) do {\ diff --git a/lib/Makefile b/lib/Makefile index d80d97b98..8585f4f40 100644 --- a/lib/Makefile +++ b/lib/Makefile @@ -147,7 +147,7 @@ _OBJ_CXX := ProxySQL_GloVars.oo network.oo debug.oo configfile.oo Query_Cache.oo Base_Session.oo Base_Thread.oo \ proxy_protocol_info.oo \ proxysql_find_charset.oo ProxySQL_Poll.oo \ - PgSQL_Protocol.oo PgSQL_Thread.oo PgSQL_Data_Stream.oo PgSQL_Session.oo PgSQL_Variables.oo PgSQL_HostGroups_Manager.oo PgSQL_Connection.oo PgSQL_Backend.oo PgSQL_Logger.oo PgSQL_Authentication.oo PgSQL_Error_Helper.oo + PgSQL_Protocol.oo PgSQL_Thread.oo PgSQL_Data_Stream.oo PgSQL_Session.oo PgSQL_Variables.oo PgSQL_HostGroups_Manager.oo PgSQL_Connection.oo PgSQL_Backend.oo PgSQL_Logger.oo PgSQL_Authentication.oo PgSQL_Error_Helper.oo PgSQL_Monitor.oo OBJ_CXX := $(patsubst %,$(ODIR)/%,$(_OBJ_CXX)) HEADERS := ../include/*.h ../include/*.hpp diff --git a/lib/PgSQL_Monitor.cpp b/lib/PgSQL_Monitor.cpp new file mode 100644 index 000000000..8fa95d9b3 --- /dev/null +++ b/lib/PgSQL_Monitor.cpp @@ -0,0 +1,1735 @@ +#include "PgSQL_HostGroups_Manager.h" +#include "PgSQL_Monitor.hpp" +#include "PgSQL_Thread.h" + +#include "gen_utils.h" + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using std::function; +using std::unique_ptr; +using std::vector; +using std::list; + +extern PgSQL_Monitor* GloPgMon; +extern PgSQL_Threads_Handler* GloPTH; + +const char RESP_SERVERS_QUERY_T[] { + "SELECT 1 FROM (" + "SELECT hostname,port,ping_error FROM pgsql_server_ping_log" + " WHERE hostname='%s' AND port=%d" + " ORDER BY time_start_us DESC LIMIT %d" + ") a WHERE" + " ping_error IS NOT NULL" + " AND ping_error NOT LIKE '%%password authentication failed for user%%'" + " GROUP BY hostname,port HAVING COUNT(*)=%d" +}; + +bool server_responds_to_ping( + SQLite3DB& db, const char* addr, int port, int max_fails +) { + bool res = true; + + cfmt_t query_fmt { cstr_format(RESP_SERVERS_QUERY_T, addr, port, max_fails, max_fails) }; + + char* err { nullptr }; + unique_ptr result { db.execute_statement(query_fmt.str.c_str(), &err) }; + + if (!err && result && result->rows_count) { + res = false; + } else if (err) { + proxy_error( + "Internal error querying 'pgsql_server_ping_log'. Aborting query=%s error=%s\n", + query_fmt.str.c_str(), err + ); + free(err); + assert(0); + } + + return res; +} + +void check_and_build_standard_tables(SQLite3DB& db, const vector& tables_defs) { + db.execute("PRAGMA foreign_keys = OFF"); + + for (const auto& def : tables_defs) { + db.check_and_build_table(def.table_name, def.table_def); + } + + db.execute("PRAGMA foreign_keys = ON"); +} + +PgSQL_Monitor::PgSQL_Monitor() { + int rc = monitordb.open( + const_cast("file:mem_monitordb?mode=memory&cache=shared"), + SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | SQLITE_OPEN_FULLMUTEX + ); + assert(rc == 0 && "Failed to open 'monitordb' for PgSQL Monitor"); + + rc = monitor_internal_db.open( + const_cast("file:mem_monitor_internal_db?mode=memory&cache=shared"), + SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | SQLITE_OPEN_FULLMUTEX + ); + assert(rc == 0 && "Failed to open 'internal_monitordb' for PgSQL Monitor"); + + rc = monitordb.execute( + "ATTACH DATABASE 'file:mem_monitor_internal_db?mode=memory&cache=shared' AS 'monitor_internal'" + ); + assert(rc == 1 && "Failed to attach 'monitor_internal' for PgSQL Monitor"); + + check_and_build_standard_tables(this->monitordb, this->tables_defs_monitor); + check_and_build_standard_tables(this->monitor_internal_db, this->tables_defs_monitor_internal); + + // Explicit index creation + monitordb.execute("CREATE INDEX IF NOT EXISTS idx_connect_log_time_start ON pgsql_server_connect_log (time_start_us)"); + monitordb.execute("CREATE INDEX IF NOT EXISTS idx_ping_log_time_start ON pgsql_server_ping_log (time_start_us)"); + // TODO: Futher investigate + monitordb.execute("CREATE INDEX IF NOT EXISTS idx_ping_2 ON pgsql_server_ping_log (hostname, port, time_start_us)"); +} + +/** + * @brief Initializes the structures related with a PgSQL_Thread. + * @details It doesn't initialize a real thread, just the structures associated with it. + * @return The created and initialized 'PgSQL_Thread'. + */ +unique_ptr init_pgsql_thread_struct() { + unique_ptr pgsql_thr { new PgSQL_Thread() }; + pgsql_thr->curtime = monotonic_time(); + pgsql_thr->refresh_variables(); + + return pgsql_thr; +} + +// Helper function for binding text +void sqlite_bind_text(sqlite3_stmt* stmt, int index, const char* text) { + int rc = (*proxy_sqlite3_bind_text)(stmt, index, text, -1, SQLITE_TRANSIENT); + ASSERT_SQLITE3_OK(rc, sqlite3_db_handle(stmt)); +} + +// Helper function for binding integers +void sqlite_bind_int(sqlite3_stmt* stmt, int index, int value) { + int rc = (*proxy_sqlite3_bind_int)(stmt, index, value); + ASSERT_SQLITE3_OK(rc, sqlite3_db_handle(stmt)); +} + +// Helper function for binding 64-bit integers +void sqlite_bind_int64(sqlite3_stmt* stmt, int index, long long value) { + int rc = (*proxy_sqlite3_bind_int64)(stmt, index, value); + ASSERT_SQLITE3_OK(rc, sqlite3_db_handle(stmt)); +} + +// Helper function for executing a statement +void sqlite_execute_statement(sqlite3_stmt* stmt) { + int rc = 0; + do { + rc = (*proxy_sqlite3_step)(stmt); + if (rc == SQLITE_LOCKED || rc == SQLITE_BUSY) { + usleep(100); + } + } while (rc == SQLITE_LOCKED || rc == SQLITE_BUSY); +} + +// Helper function for clearing bindings +void sqlite_clear_bindings(sqlite3_stmt* stmt) { + int rc = (*proxy_sqlite3_clear_bindings)(stmt); + ASSERT_SQLITE3_OK(rc, sqlite3_db_handle(stmt)); +} + +// Helper function for resetting a statement +void sqlite_reset_statement(sqlite3_stmt* stmt) { + int rc = (*proxy_sqlite3_reset)(stmt); + ASSERT_SQLITE3_OK(rc, sqlite3_db_handle(stmt)); +} + +// Helper function for finalizing a statement +void sqlite_finalize_statement(sqlite3_stmt* stmt) { + (*proxy_sqlite3_finalize)(stmt); +} + +void update_monitor_pgsql_servers(SQLite3_result* rs, SQLite3DB* db) { + std::lock_guard monitor_db_guard { GloPgMon->pgsql_srvs_mutex }; + + if (rs != nullptr) { + db->execute("DELETE FROM monitor_internal.pgsql_servers"); + + sqlite3_stmt* stmt1 = nullptr; + int rc = db->prepare_v2( + "INSERT INTO monitor_internal.pgsql_servers VALUES (?, ?, ?, ?)", &stmt1 + ); + ASSERT_SQLITE_OK(rc, db); + + sqlite3_stmt* stmt32 = nullptr; + rc = db->prepare_v2( + ("INSERT INTO monitor_internal.pgsql_servers VALUES " + + generate_multi_rows_query(32, 4)).c_str(), + &stmt32 + ); + ASSERT_SQLITE_OK(rc, db); + + // Iterate through rows + int row_idx = 0; + int max_bulk_row_idx = (rs->rows_count / 32) * 32; + for (const auto& r1 : rs->rows) { + int idx = row_idx % 32; + + if (row_idx < max_bulk_row_idx) { // Bulk insert + sqlite_bind_text(stmt32, (idx * 4) + 1, r1->fields[0]); + sqlite_bind_int64(stmt32, (idx * 4) + 2, std::atoll(r1->fields[1])); + sqlite_bind_int64(stmt32, (idx * 4) + 3, std::atoll(r1->fields[2])); + sqlite_bind_int64(stmt32, (idx * 4) + 4, std::atoll(r1->fields[3])); + + if (idx == 31) { + sqlite_execute_statement(stmt32); + sqlite_clear_bindings(stmt32); + sqlite_reset_statement(stmt32); + } + } else { // Single row insert + sqlite_bind_text(stmt1, 1, r1->fields[0]); + sqlite_bind_int64(stmt1, 2, std::atoll(r1->fields[1])); + sqlite_bind_int64(stmt1, 3, std::atoll(r1->fields[2])); + sqlite_bind_int64(stmt1, 4, std::atoll(r1->fields[3])); + + sqlite_execute_statement(stmt1); + sqlite_clear_bindings(stmt1); + sqlite_reset_statement(stmt1); + } + + row_idx++; + } + + // Finalize statements + sqlite_finalize_statement(stmt1); + sqlite_finalize_statement(stmt32); + } +} + +enum class task_type_t { ping, connect, readonly }; + +struct mon_srv_t { + string addr; + uint16_t port; + bool ssl; +}; + +struct mon_user_t { + string user; + string pass; + string schema; +}; + +struct ping_params_t { + int32_t interval; + int32_t timeout; + int32_t max_failures; +}; + +struct ping_conf_t { + unique_ptr srvs_info; + ping_params_t params; +}; + +struct connect_params_t { + int32_t interval; + int32_t timeout; + int32_t ping_max_failures; + int32_t ping_interval; +}; + +struct connect_conf_t { + unique_ptr srvs_info; + connect_params_t params; +}; + +struct readonly_params_t { + int32_t interval; + int32_t timeout; + int32_t max_timeout_count; +}; + +struct readonly_conf_t { + unique_ptr srvs_info; + readonly_params_t params; +}; + +struct mon_tasks_conf_t { + ping_conf_t ping; + connect_conf_t connect; + readonly_conf_t readonly; + mon_user_t user_info; +}; + +unique_ptr fetch_mon_srvs_conf(PgSQL_Monitor* mon, const char query[]) { + char* err = nullptr; + unique_ptr srvs { mon->monitordb.execute_statement(query, &err) }; + + if (err) { + proxy_error("SQLite3 error. Shutting down msg=%s\n", err); + free(err); + assert(0); + } + + return srvs; +} + +unique_ptr fetch_hgm_srvs_conf(PgSQL_HostGroups_Manager* hgm, const char query[]) { + char* err = nullptr; + unique_ptr srvs { hgm->execute_query(const_cast(query), &err) }; + + if (err) { + proxy_error("SQLite3 error. Shutting down msg=%s\n", err); + free(err); + assert(0); + } + + return srvs; +} + +vector ext_srvs(const unique_ptr& srvs_info) { + vector srvs {}; + + for (const auto& row : srvs_info->rows) { + srvs.push_back({ + string { row->fields[0] }, + static_cast(std::atoi(row->fields[1])), + static_cast(std::atoi(row->fields[2])) + }); + } + + return srvs; +} + +// First part of fetchStatusConfig :: [(resulset,config)] +mon_tasks_conf_t fetch_updated_conf(PgSQL_Monitor* mon, PgSQL_HostGroups_Manager* hgm) { + // Update the 'monitor_internal.pgsql_servers' servers info. + { + try { + std::lock_guard pgsql_srvs_guard(hgm->pgsql_servers_to_monitor_mutex); + update_monitor_pgsql_servers(hgm->pgsql_servers_to_monitor, &GloPgMon->monitordb); + } catch (const std::exception& e) { + proxy_error("Exception e=%s\n", e.what()); + } + } + + unique_ptr ping_srvrs { fetch_mon_srvs_conf(mon, + "SELECT hostname, port, MAX(use_ssl) use_ssl FROM monitor_internal.pgsql_servers" + " GROUP BY hostname, port ORDER BY RANDOM()" + )}; + + unique_ptr connect_srvrs { fetch_mon_srvs_conf(mon, + "SELECT hostname, port, MAX(use_ssl) use_ssl FROM monitor_internal.pgsql_servers" + " GROUP BY hostname, port ORDER BY RANDOM()" + )}; + + unique_ptr readonly_srvs { fetch_hgm_srvs_conf(hgm, + "SELECT hostname, port, MAX(use_ssl) use_ssl, check_type, reader_hostgroup" + " FROM pgsql_servers JOIN pgsql_replication_hostgroups" + " ON hostgroup_id=writer_hostgroup OR hostgroup_id=reader_hostgroup" + " WHERE status NOT IN (2,3) GROUP BY hostname, port ORDER BY RANDOM()" + )}; + + + return mon_tasks_conf_t { + ping_conf_t { + std::move(ping_srvrs), + ping_params_t { + pgsql_thread___monitor_ping_interval * 1000, + pgsql_thread___monitor_ping_timeout * 1000, + pgsql_thread___monitor_ping_max_failures + } + }, + connect_conf_t { + std::move(connect_srvrs), + connect_params_t { + pgsql_thread___monitor_connect_interval * 1000, + pgsql_thread___monitor_connect_timeout * 1000, + // TODO: Revisit this logic; For now identical to previous + // - Used for server responsiveness + pgsql_thread___monitor_ping_max_failures, + // - Used for connection cleanup + pgsql_thread___monitor_ping_interval * 1000 + } + }, + readonly_conf_t { + std::move(readonly_srvs), + readonly_params_t { + pgsql_thread___monitor_read_only_interval * 1000, + pgsql_thread___monitor_read_only_timeout * 1000, + pgsql_thread___monitor_read_only_max_timeout_count + } + }, + mon_user_t { + pgsql_thread___monitor_username, + pgsql_thread___monitor_password + } + }; +} + +using task_params_t = std::unique_ptr>; + +struct op_st_t { + uint64_t start; + uint64_t end; + mon_srv_t srv_info; + mon_user_t user_info; + task_params_t task_params; +}; + +struct task_st_t { + uint64_t start; + uint64_t end; + task_type_t type; + op_st_t op_st; +}; + +struct state_t { + pgsql_conn_t conn; + task_st_t task; +}; + +enum class task_status_t { success, failure }; + +mf_unique_ptr strdup_no_lf(const char* input) { + if (input == nullptr) return nullptr; + size_t length = std::strlen(input); + + if (length > 0 && input[length - 1] == '\n') { + length--; + } + + char* result = static_cast(malloc(length + 1)); + + std::strncpy(result, input, length); + result[length] = '\0'; + + return mf_unique_ptr(result); +} + +short handle_pg_event(state_t& st, short event) { + pgsql_conn_t& pgconn { st.conn }; + short req_event = 0; + +#ifdef DEBUG + const char* host { PQhostaddr(pgconn.conn) }; + const char* port { PQport(pgconn.conn) }; + + proxy_debug(PROXY_DEBUG_MONITOR, 5, + "Handling event for conn fd=%d addr='%s:%s' event=%d state=%d thread=%lu\n", + pgconn.fd, host, port, event, st.conn.state, pthread_self() + ); +#endif + +next_immediate: + + switch (pgconn.state) { + case ASYNC_ST::ASYNC_CONNECT_CONT: { + PostgresPollingStatusType poll_res = PQconnectPoll(pgconn.conn); + + switch (poll_res) { + case PGRES_POLLING_WRITING: + // Continue writing + req_event |= POLLOUT; + break; + case PGRES_POLLING_ACTIVE: + case PGRES_POLLING_READING: + // Switch to reading + req_event |= POLLIN; + break; + case PGRES_POLLING_OK: + pgconn.state = ASYNC_ST::ASYNC_CONNECT_END; + + if (st.task.type == task_type_t::connect) { + st.task.end = monotonic_time(); + } else if (st.task.type == task_type_t::ping) { + goto next_immediate; + } else { + assert(0 && "Non-implemented task-type"); + } + break; + case PGRES_POLLING_FAILED: { + // During connection phase use `PQerrorMessage` + auto err { strdup_no_lf(PQerrorMessage(pgconn.conn)) }; + st.task.end = monotonic_time(); + proxy_error("Monitor connect FAILED error='%s'\n", err.get()); + + pgconn.state = ASYNC_ST::ASYNC_CONNECT_FAILED; + pgconn.err = std::move(err); + break; + } + } + break; + } + case ASYNC_ST::ASYNC_CONNECT_END: { + // Check if NOTHING, comment works + int rc = PQsendQuery(pgconn.conn, ""); + if (rc == 0) { + const auto err { strdup_no_lf(PQerrorMessage(pgconn.conn)) }; + proxy_error("Monitor ping start FAILED error='%s'\n", err.get()); + + pgconn.state = ASYNC_ST::ASYNC_PING_FAILED; + } else { + int res = PQflush(pgconn.conn); + + if (res < 0) { + const auto err { strdup_no_lf(PQerrorMessage(pgconn.conn)) }; + proxy_error("Monitor ping start FAILED error='%s'\n", err.get()); + + pgconn.state = ASYNC_ST::ASYNC_PING_FAILED; + } else { + req_event |= res > 0 ? POLLOUT : POLLIN; + pgconn.state = ASYNC_ST::ASYNC_PING_CONT; + } + } + break; + } + case ASYNC_ST::ASYNC_PING_CONT: { + // Single command queries; 'PQisBusy' and 'PQconsumeInput' not required + PGresult* res { PQgetResult(pgconn.conn) }; + + // Wait for the result asynchronously + if (res == NULL) { + pgconn.state = ASYNC_ST::ASYNC_PING_END; + st.task.end = monotonic_time(); + } else { + // Check for errors in the query execution + ExecStatusType status = PQresultStatus(res); + + if (status == PGRES_EMPTY_QUERY) { + const auto err { strdup_no_lf(PQerrorMessage(pgconn.conn)) }; + pgconn.state = ASYNC_ST::ASYNC_PING_END; + st.task.end = monotonic_time(); + + // Cleanup of resultset required for conn reuse + PQclear(PQgetResult(pgconn.conn)); + } else if (status != PGRES_COMMAND_OK) { + const auto err { strdup_no_lf(PQerrorMessage(pgconn.conn)) }; + proxy_error("Monitor ping FAILED status=%d error='%s'\n", status, err.get()); + pgconn.state = ASYNC_ST::ASYNC_PING_FAILED; + } + } + + // Clear always; we assume no resultset on ping + PQclear(res); + break; + } + case ASYNC_ST::ASYNC_PING_END: { + pgconn.state = ASYNC_ST::ASYNC_CONNECT_END; + break; + } + default: { + // Should not be reached + assert(0 && "State matching should be exhaustive"); + break; + } + } + + return req_event; +} + +string build_conn_str(const task_st_t& task_st) { + const mon_srv_t& srv_info { task_st.op_st.srv_info }; + const mon_user_t& user_info { task_st.op_st.user_info }; + + return string { + "host='" + srv_info.addr + "' " + + "port='" + std::to_string(srv_info.port) + "' " + + "user='" + user_info.user + "' " + + "password='" + user_info.pass + "' " + + "dbname='" + user_info.schema + "' " + + "application_name=ProxySQL-Monitor" + }; +} + +struct conn_pool_t { + unordered_map> conn_map; + std::mutex mutex; +}; + +conn_pool_t mon_conn_pool {}; + +pair get_conn( + conn_pool_t& conn_pool, const mon_srv_t& srv_info, uint64_t intv +) { + bool found { false }; + pgsql_conn_t found_conn {}; + vector expired_conns {}; + + { + std::lock_guard lock(mon_conn_pool.mutex); + + const string key { srv_info.addr + ":" + std::to_string(srv_info.port) }; + auto it = mon_conn_pool.conn_map.find(key); + + if (it != mon_conn_pool.conn_map.end()) { + list& conn_list = it->second; + auto now = monotonic_time(); + + for (auto it = conn_list.begin(); it != conn_list.end();) { + // TODO: Tune this value; keeping alive too many conns per-host + // - Connect always create new connections + // - Low connect intervals guarantee to keep up to N conns per host + if (now - it->last_used > 3 * intv) { + expired_conns.emplace_back(std::move(*it)); + it = conn_list.erase(it); + } else { + ++it; + } + } + + if (!conn_list.empty()) { + found = true; + found_conn = std::move(conn_list.front()); + + conn_list.pop_front(); + } + } + } + + for (pgsql_conn_t& conn : expired_conns) { + PQfinish(conn.conn); + } + + return pair(found, std::move(found_conn)); +} + +void put_conn(conn_pool_t& conn_pool, const mon_srv_t& srv_info, pgsql_conn_t conn) { + std::lock_guard lock(conn_pool.mutex); + + const string key { srv_info.addr + ":" + std::to_string(srv_info.port) }; + conn_pool.conn_map[key].emplace_back(std::move(conn)); +} + +uint64_t get_connpool_cleanup_intv(task_st_t& task) { + uint64_t res = 0; + + if (task.type == task_type_t::connect) { + connect_params_t* params { + static_cast(task.op_st.task_params.get()) + }; + + res = params->ping_interval; + } else if (task.type == task_type_t::ping) { + ping_params_t* params { + static_cast(task.op_st.task_params.get()) + }; + + res = params->interval; + } else { + assert(0 && "Non-implemented task-type"); + } + + return res; +} + +pair get_task_conn(conn_pool_t& conn_pool, task_st_t& task_st) { + if (task_st.type == task_type_t::connect) { + return pair { false, pgsql_conn_t {} }; + } else { + const mon_srv_t& mon_srv { task_st.op_st.srv_info }; + uint64_t cleanup_intv { get_connpool_cleanup_intv(task_st) }; + + return get_conn(conn_pool, mon_srv, cleanup_intv); + } +} + +pgsql_conn_t create_conn(task_st_t& task_st) { + const mon_srv_t& srv { task_st.op_st.srv_info }; + + // Initialize connection parameters + const string conn_str { build_conn_str(task_st) }; + // Count the task as already started (conn acquisition) + task_st.start = monotonic_time(); + // Get task from connpool if task types allows it + pair conn_pool_res { get_task_conn(mon_conn_pool, task_st) }; + + if (conn_pool_res.first) { + proxy_debug(PROXY_DEBUG_MONITOR, 5, + "Fetched conn from pool addr='%s:%d' thread=%lu\n", + srv.addr.c_str(), srv.port, pthread_self() + ); + + return std::move(conn_pool_res.second); + } else { +#ifdef DEBUG + if (task_st.type != task_type_t::connect) { + proxy_debug(PROXY_DEBUG_MONITOR, 5, + "No suitable conn found in pool addr='%s:%d' thread=%lu\n", + srv.addr.c_str(), srv.port, pthread_self() + ); + } +#endif + + pgsql_conn_t pg_conn {}; + pg_conn.conn = PQconnectStart(conn_str.c_str()); + + if (pg_conn.conn == NULL || PQstatus(pg_conn.conn) == CONNECTION_BAD) { + if (pg_conn.conn) { + // WARNING: DO NOT RELEASE this PGresult + const PGresult* result = PQgetResultFromPGconn(pg_conn.conn); + const char* error { PQresultErrorField(result, PG_DIAG_MESSAGE_PRIMARY) }; + proxy_error("Monitor connect failed error='%s'\n", error); + } else { + proxy_error("Monitor connect failed error='%s'\n", "Out of memory"); + } + } else { + if (PQsetnonblocking(pg_conn.conn, 1) != 0) { + // WARNING: DO NOT RELEASE this PGresult + const PGresult* result = PQgetResultFromPGconn(pg_conn.conn); + const char* error { PQresultErrorField(result, PG_DIAG_MESSAGE_PRIMARY) }; + proxy_error("Failed to set non-blocking mode error=%s\n", error); + } else { + pg_conn.state = ASYNC_ST::ASYNC_CONNECT_CONT; + pg_conn.fd = PQsocket(pg_conn.conn); + } + } + + return pg_conn; + } +} + +// Previous tasks results +struct tasks_stats_t { + uint64_t start; + uint64_t end; + uint64_t count; +}; + +// Compute the required number of threads for the current interval +uint32_t required_worker_threads( + tasks_stats_t prev, + uint64_t worker_threads, + uint64_t new_tasks_intv, + uint64_t new_tasks_count +) { + uint64_t req_worker_threads = worker_threads; + + double prev_intv_rate = double(prev.count) / (prev.end - prev.start); + double est_intv_proc_tasks = new_tasks_intv * prev_intv_rate; + + if (est_intv_proc_tasks < new_tasks_count && prev.count != 0) { + // Estimate of number of tasks consumed per worker + double tasks_per_worker = double(prev.count) / worker_threads; + req_worker_threads = ceil(new_tasks_count / tasks_per_worker); + } + + return req_worker_threads; +} + +struct next_tasks_intvs_t { + uint64_t next_ping_at; + uint64_t next_connect_at; +}; + +struct task_poll_t { + std::vector fds {}; + std::vector tasks {}; + size_t size = 0; +}; + +void add_task( + task_poll_t& task_poll, short int events, state_t&& task +) { + if (task_poll.size < task_poll.fds.size()) { + task_poll.fds[task_poll.size] = pollfd { task.conn.fd, events, 0 }; + } else { + task_poll.fds.emplace_back(pollfd { task.conn.fd, events, 0 }); + } + if (task_poll.size < task_poll.tasks.size()) { + task_poll.tasks[task_poll.size] = std::move(task); + } else { + task_poll.tasks.emplace_back(std::move(task)); + } + + task_poll.size++; +} + +void rm_task_fast(task_poll_t& task_poll, size_t idx) { + if (idx > task_poll.size || idx < 0) { + proxy_error("Receveid invalid task index idx=%lu", idx); + assert(0); + } + + task_poll.fds[idx] = task_poll.fds[task_poll.size - 1]; + task_poll.tasks[idx] = std::move(task_poll.tasks[task_poll.size - 1]); + task_poll.size--; +} + +struct task_queue_t { + int comm_fd[2]; + std::queue queue {}; + std::mutex mutex {}; + + task_queue_t() { + int rc = pipe(comm_fd); + assert(rc == 0 && "Failed to create pipe for Monitor worker thread"); + } +}; + +struct task_res_t { + task_status_t status; + task_st_t task; +}; + +struct result_queue_t { + std::queue queue {}; + std::mutex mutex {}; +}; + +tasks_stats_t compute_intv_stats(result_queue_t& results) { + std::lock_guard lock_queue { results.mutex }; + + tasks_stats_t stats {}; + + if (results.queue.size() != 0) { + stats = tasks_stats_t { + results.queue.front().task.op_st.start, + results.queue.back().task.op_st.end, + results.queue.size() + }; + } else { + stats = tasks_stats_t { 0, 0, 0 }; + } + + results.queue = {}; + + return stats; +} + +vector create_ping_tasks( + uint64_t curtime, + const mon_user_t user_info, + const ping_conf_t& conf +) { + vector tasks {}; + const vector srvs_info { ext_srvs(conf.srvs_info) }; + + for (const auto& srv_info : srvs_info) { + tasks.push_back(task_st_t { + curtime, + 0, + task_type_t::ping, + op_st_t { + 0, + 0, + srv_info, + user_info, + task_params_t { + new ping_params_t { conf.params }, + [] (void* v) { delete static_cast(v); } + } + } + }); + } + + return tasks; +} + +vector create_connect_tasks( + uint64_t curtime, + const mon_user_t user_info, + const connect_conf_t& conf +) { + vector tasks {}; + const vector srvs_info { ext_srvs(conf.srvs_info) }; + + for (const auto& srv_info : srvs_info) { + tasks.push_back(task_st_t { + curtime, + 0, + task_type_t::connect, + op_st_t { + 0, + 0, + srv_info, + user_info, + task_params_t { + new connect_params_t { conf.params }, + [] (void* v) { delete static_cast(v); } + } + } + }); + } + + return tasks; +} + +struct thread_t { + pthread_t handle; + + thread_t(const thread_t&) = delete; + thread_t(thread_t&) = delete; + + thread_t() : handle(0) {}; + thread_t(pthread_t hndl) : handle(hndl) {}; + thread_t(thread_t&& other) : handle(other.handle) { + other.handle = 0; + }; + + ~thread_t() { + if (handle == 0) return; + + // NOTE: Not required since **right now** threads are joined by scheduler + // //////////////////////////////////////////////////////////////////// + // Detach the thread if it's not already detached. + // int detach_result = pthread_detach(handle); + // assert(detach_result == 0 && "Failed to detach thread during destruction."); + // //////////////////////////////////////////////////////////////////// + + // Cancel the thread if it's not already canceled. + int cancel_result = pthread_cancel(handle); + assert(cancel_result == 0 && "Failed to cancel thread during destruction."); + } +}; + +using worker_queue_t = pair; +using worker_thread_t = pair>; + +std::pair create_thread(size_t stack_size, void*(*routine)(void*), void* args) { + pthread_attr_t attr; + int result = pthread_attr_init(&attr); + assert(result == 0 && "Failed to initialize thread attributes."); + + result = pthread_attr_setstacksize(&attr, stack_size); + assert(result == 0 && "Invalid stack size provided for thread creation."); + + pthread_t thread; + result = pthread_create(&thread, &attr, routine, args); + pthread_attr_destroy(&attr); + + if (result != 0) { + return std::make_pair(result, thread_t {}); + } else { + return std::make_pair(result, thread_t { thread }); + } +} + +void write_signal(int fd, uint8_t val) { + uint8_t s { val }; + + for (;;) { + int rc = write(fd, &s, 1); + + if (rc >= 0) { + break; + } else if (errno == EINTR || errno == EAGAIN) { + continue; + } else { + proxy_error( + "Failed to signal Monitor workers. Aborting rc=%d errno=%d\n", rc, errno + ); + assert(0); + } + } +} + +uint8_t read_signal(int fd) { + uint8_t s { 0 }; + + for (;;) { + int rc = read(fd, &s, 1); + + if (rc >= 0) { + break; + } else if (errno == EINTR || errno == EAGAIN) { + continue; + } else { + proxy_error( + "Failed to read scheduler signal. Aborting rc=%d errno=%d\n", rc, errno + ); + assert(0); + } + } + + return s; +} + +/** + * @brief At worst ⌊A/B⌋ + (B - 1) extra elements for the final thread. + * @details TODO: Improve batch scheduling to avoid network burst. + * + * @param worker_threads + * @param new_tasks + */ +void schedule_tasks( + vector& worker_threads, vector tasks +) { + size_t tasks_per_thread { tasks.size() / worker_threads.size() }; + size_t task_idx = 0; + + for (size_t i = 0; i < worker_threads.size(); i++) { + task_queue_t& task_queue { worker_threads[i].second->first }; + std::lock_guard lock_queue { task_queue.mutex }; + + if (i == worker_threads.size() - 1) { + for (size_t j = task_idx; j < tasks.size(); j++) { + task_queue.queue.push(std::move(tasks[j])); + } + } else { + for (uint64_t t = 0; t < tasks_per_thread; t++, task_idx++) { + task_queue.queue.push(std::move(tasks[task_idx])); + } + } + } + + // Signal all threads to process queues + for (size_t i = 0; i < worker_threads.size(); i++) { + task_queue_t& task_queue { worker_threads[i].second->first }; + write_signal(task_queue.comm_fd[1], 0); + } +} + +uint64_t CONN_RATE_LIMIT = 50; + +void schedule_tasks_batches( + vector& worker_threads, vector tasks +) { + size_t batch_c = tasks.size() / CONN_RATE_LIMIT; + size_t f_batch = tasks.size() % CONN_RATE_LIMIT; + +#ifdef DEBUG + // TODO: Should give info about the kind/count of tasks scheduled + proxy_debug(PROXY_DEBUG_MONITOR, 5, + "Scheduling tasks batches batch_count=%lu final_batch=%lu\n", + batch_c, f_batch + ); +#endif + + vector> batches {}; + + for (size_t i = 0; i <= batch_c; i++) { + vector new_batch {}; + + if (i < batch_c) { + for (size_t j = i * CONN_RATE_LIMIT; j < CONN_RATE_LIMIT * (i + 1); j++) { + new_batch.push_back(std::move(tasks[j])); + } + } else { + for (size_t j = i * CONN_RATE_LIMIT; j < f_batch; j++) { + new_batch.push_back(std::move(tasks[j])); + } + } + + batches.push_back(std::move(new_batch)); + } + + for (size_t i = 0; i < batches.size(); i++) { + schedule_tasks(worker_threads, std::move(batches[i])); + usleep(CONN_RATE_LIMIT * 1000); + } +} + +bool check_success(pgsql_conn_t& c, task_st_t& st) { + return + ((c.state != ASYNC_ST::ASYNC_CONNECT_FAILED && c.state != ASYNC_CONNECT_TIMEOUT) + || (c.state != ASYNC_ST::ASYNC_PING_FAILED && c.state != ASYNC_PING_TIMEOUT)) + && ((c.state == ASYNC_ST::ASYNC_CONNECT_END && st.type == task_type_t::connect) + || (c.state == ASYNC_ST::ASYNC_PING_END && st.type == task_type_t::ping)); +} + +bool is_task_finish(pgsql_conn_t& c, task_st_t& st) { + return + ((c.state == ASYNC_ST::ASYNC_CONNECT_FAILED || c.state == ASYNC_ST::ASYNC_CONNECT_TIMEOUT) + || (c.state == ASYNC_ST::ASYNC_PING_FAILED || c.state == ASYNC_ST::ASYNC_PING_TIMEOUT)) + || (c.state == ASYNC_ST::ASYNC_CONNECT_END && st.type == task_type_t::connect) + || (c.state == ASYNC_ST::ASYNC_PING_END && st.type == task_type_t::ping); +} + +void update_connect_table(SQLite3DB* db, state_t& state) { + sqlite3_stmt* stmt = nullptr; + int rc = db->prepare_v2( + "INSERT OR REPLACE INTO pgsql_server_connect_log VALUES (?1 , ?2 , ?3 , ?4 , ?5)", &stmt + ); + ASSERT_SQLITE_OK(rc, db); + + sqlite_bind_text(stmt, 1, state.task.op_st.srv_info.addr.c_str()); + sqlite_bind_int(stmt, 2, state.task.op_st.srv_info.port); + + uint64_t op_dur_us = state.task.end - state.task.start; + // TODO: Revisit this; maybe a better way? + uint64_t time_start_us = realtime_time() - op_dur_us; + sqlite_bind_int64(stmt, 3, time_start_us); + + uint64_t conn_succ_time_us { check_success(state.conn, state.task) ? op_dur_us : 0 }; + sqlite_bind_int64(stmt, 4, conn_succ_time_us); + sqlite_bind_text(stmt, 5, state.conn.err.get()); + + SAFE_SQLITE3_STEP2(stmt); + + sqlite_clear_bindings(stmt); + sqlite_reset_statement(stmt); + sqlite_finalize_statement(stmt); + + if (state.conn.err) { + const mon_srv_t& srv { state.task.op_st.srv_info }; + int err_code { 0 }; + + if (state.conn.state != ASYNC_ST::ASYNC_CONNECT_TIMEOUT) { + err_code = 9100 + state.conn.state; + } else { + err_code = ER_PROXYSQL_CONNECT_TIMEOUT; + }; + + PgHGM->p_update_pgsql_error_counter( + p_pgsql_error_type::proxysql, + 0, + const_cast(srv.addr.c_str()), + srv.port, + err_code + ); + __sync_fetch_and_add(&GloPgMon->connect_check_ERR, 1); + } else { + __sync_fetch_and_add(&GloPgMon->connect_check_OK, 1); + } +} + +void update_ping_table(SQLite3DB* db, state_t& state) { + sqlite3_stmt* stmt = nullptr; + int rc = db->prepare_v2( + "INSERT OR REPLACE INTO pgsql_server_ping_log VALUES (?1 , ?2 , ?3 , ?4 , ?5)", &stmt + ); + ASSERT_SQLITE_OK(rc, db); + + sqlite_bind_text(stmt, 1, state.task.op_st.srv_info.addr.c_str()); + sqlite_bind_int(stmt, 2, state.task.op_st.srv_info.port); + + uint64_t op_dur_us = state.task.end - state.task.start; + // TODO: Revisit this; maybe a better way? + uint64_t time_start_us = realtime_time() - op_dur_us; + sqlite_bind_int64(stmt, 3, time_start_us); + + uint64_t conn_succ_time_us { check_success(state.conn, state.task) ? op_dur_us : 0 }; + sqlite_bind_int64(stmt, 4, conn_succ_time_us); + sqlite_bind_text(stmt, 5, state.conn.err.get()); + + SAFE_SQLITE3_STEP2(stmt); + + sqlite_clear_bindings(stmt); + sqlite_reset_statement(stmt); + sqlite_finalize_statement(stmt); + + if (state.conn.err) { + const mon_srv_t& srv { state.task.op_st.srv_info }; + int err_code { 0 }; + + if (state.conn.state != ASYNC_ST::ASYNC_PING_TIMEOUT) { + err_code = 9100 + state.conn.state; + } else { + err_code = ER_PROXYSQL_PING_TIMEOUT; + }; + + PgHGM->p_update_pgsql_error_counter( + p_pgsql_error_type::proxysql, + 0, + const_cast(srv.addr.c_str()), + srv.port, + err_code + ); + __sync_fetch_and_add(&GloPgMon->ping_check_ERR, 1); + } else { + __sync_fetch_and_add(&GloPgMon->ping_check_OK, 1); + } +} + +const char MAINT_PING_LOG_QUERY[] { + "DELETE FROM pgsql_server_ping_log WHERE time_start_us < ?1" +}; + +const char MAINT_CONNECT_LOG_QUERY[] { + "DELETE FROM pgsql_server_connect_log WHERE time_start_us < ?1" +}; + +void maint_monitor_table(SQLite3DB* db, const char query[], const ping_params_t& params) { + sqlite3_stmt* stmt { nullptr }; + int rc = db->prepare_v2(query, &stmt); + ASSERT_SQLITE_OK(rc, db); + + if (pgsql_thread___monitor_history < (params.interval * (params.max_failures + 1)) / 1000) { + if (static_cast(params.interval) < uint64_t(3600000) * 1000) { + pgsql_thread___monitor_history = (params.interval * (params.max_failures + 1)) / 1000; + } + } + + uint64_t max_history_age { realtime_time() - uint64_t(pgsql_thread___monitor_history)*1000 }; + sqlite_bind_int64(stmt, 1, max_history_age); + SAFE_SQLITE3_STEP2(stmt); + + sqlite_clear_bindings(stmt); + sqlite_reset_statement(stmt); + sqlite_finalize_statement(stmt); +} + +const char PING_MON_HOSTS_QUERY[] { + "SELECT DISTINCT" + " a.hostname," + " a.port" + " FROM" + " monitor_internal.pgsql_servers a" + " JOIN pgsql_server_ping_log b ON a.hostname = b.hostname" + " WHERE" + " b.ping_error IS NOT NULL" + " AND b.ping_error NOT LIKE '%%password authentication failed for user%%'" +}; + +const char HOST_TO_SHUNN_QUERY[] { + "SELECT 1" + " FROM" + " (" + " SELECT hostname, port, ping_error" + " FROM pgsql_server_ping_log" + " WHERE hostname = '%s' AND port = '%s'" + " ORDER BY time_start_us DESC" + " LIMIT % d" + " ) a" + " WHERE" + " ping_error IS NOT NULL" + " AND ping_error NOT LIKE '%%password authentication failed for user%%'" + " GROUP BY" + " hostname," + " port" + " HAVING" + " COUNT(*) = %d" +}; + +void shunn_non_resp_srvs(SQLite3DB* db, state_t& state) { + ping_params_t* params { static_cast(state.task.op_st.task_params.get()) }; + char* err { nullptr }; + + unique_ptr resultset { db->execute_statement(PING_MON_HOSTS_QUERY, &err) }; + if (err) { + proxy_error( + "Internal query error. Aborting query=%s error=%s\n", PING_MON_HOSTS_QUERY, err + ); + free(err); + assert(0); + } + + vector> addr_port_p {}; + + for (const SQLite3_row* row : resultset->rows) { + char* addr { row->fields[0] }; + char* port { row->fields[1] }; + int32_t max_fails { params->max_failures }; + + cfmt_t query_fmt { + cstr_format(HOST_TO_SHUNN_QUERY, addr, port, max_fails, max_fails) + }; + char* err { nullptr }; + unique_ptr resultset { + db->execute_statement(query_fmt.str.c_str(), &err) + }; + + if (!err && resultset && resultset->rows_count) { + bool shunned { PgHGM->shun_and_killall(addr, atoi(port)) }; + if (shunned) { + proxy_error( + "Server %s:%s missed %d heartbeats, shunning it and killing all the connections." + " Disabling other checks until the node comes back online.\n", + addr, port, max_fails + ); + } + } else if (err) { + proxy_error( + "Internal query error. Aborting query=%s error=%s\n", + query_fmt.str.c_str(), err + ); + free(err); + assert(0); + } + } +} + +const char PING_SRVS_NO_ERRORS[] { + "SELECT DISTINCT a.hostname, a.port" + " FROM" + " monitor_internal.pgsql_servers a" + " JOIN pgsql_server_ping_log b ON a.hostname = b.hostname" + " WHERE b.ping_error IS NULL" +}; + +const char UPD_SRVS_LATENCY_QUERY[] { + "SELECT" + " hostname, port, COALESCE(CAST(AVG(ping_success_time_us) AS INTEGER), 10000)" + " FROM" + " (" + " SELECT hostname, port, ping_success_time_us, ping_error" + " FROM pgsql_server_ping_log" + " WHERE hostname = '%s' AND port = '%s'" + " ORDER BY time_start_us DESC" + " LIMIT 3" + " ) a" + " WHERE ping_error IS NULL" + " GROUP BY hostname, port" +}; + +void upd_srvs_latency(SQLite3DB* db, state_t& state) { + char* err { nullptr }; + + unique_ptr resultset { db->execute_statement(PING_SRVS_NO_ERRORS, &err) }; + if (err) { + proxy_error( + "Internal query error. Aborting query=%s error=%s\n", PING_SRVS_NO_ERRORS, err + ); + free(err); + assert(0); + } + + for (const SQLite3_row* row : resultset->rows) { + char* addr { row->fields[0] }; + char* port { row->fields[1] }; + + cfmt_t query_fmt { cstr_format(UPD_SRVS_LATENCY_QUERY, addr, port) }; + char* err { nullptr }; + unique_ptr resultset { + db->execute_statement(query_fmt.str.c_str(), &err) + }; + + if (!err && resultset && resultset->rows_count) { + for (const SQLite3_row* srv : resultset->rows) { + char* cur_latency { srv->fields[2] }; + PgHGM->set_server_current_latency_us(addr, atoi(port), atoi(cur_latency)); + } + } else if (err) { + proxy_error( + "Internal query error. Aborting query=%s error=%s\n", query_fmt.str.c_str(), err + ); + free(err); + assert(0); + } + } +} + +void perf_ping_actions(SQLite3DB* db, state_t& state) { + // Update table entries + update_ping_table(db, state); + + // TODO: Checks for the following potential actions take most of the processing time. + // The actions should be redesign so the checks themselves are cheap operations, + // actions could remain expensive, as they should be the exception, not the norm. + ///////////////////////////////////////////////////////////////////////////////////// + // Shunn all problematic hosts + shunn_non_resp_srvs(db, state); + + // Update 'current_lantency_ms' + upd_srvs_latency(db, state); + ///////////////////////////////////////////////////////////////////////////////////// +} + +void proc_task_state(state_t& state) { + pgsql_conn_t& pg_conn { state.conn }; + + if (state.task.type == task_type_t::connect) { + connect_params_t* params { + static_cast(state.task.op_st.task_params.get()) + }; + + if (monotonic_time() - state.task.start > static_cast(params->timeout)) { + // TODO: Unified state processing + pg_conn.state = ASYNC_ST::ASYNC_CONNECT_TIMEOUT; + state.task.end = monotonic_time(); + pg_conn.err = mf_unique_ptr(strdup("Operation timed out")); + + // TODO: proxy_error + metrics update + update_connect_table(&GloPgMon->monitordb, state); + } else if (is_task_finish(state.conn, state.task)) { + // Perform the dumping + update_connect_table(&GloPgMon->monitordb, state); + } + } else if (state.task.type == task_type_t::ping) { + ping_params_t* params { + static_cast(state.task.op_st.task_params.get()) + }; + + if (monotonic_time() - state.task.start > static_cast(params->timeout)) { + // TODO: Unified state processing + pg_conn.state = ASYNC_ST::ASYNC_PING_TIMEOUT; + state.task.end = monotonic_time(); + pg_conn.err = mf_unique_ptr(strdup("Operation timed out")); + + // TODO: proxy_error + metrics update + perf_ping_actions(&GloPgMon->monitordb, state); + } else if (is_task_finish(state.conn, state.task)) { + // Perform the dumping + perf_ping_actions(&GloPgMon->monitordb, state); + } + } else { + assert(0 && "Non-implemented task-type"); + } +} + +void add_scheduler_comm_task(const task_queue_t& tasks_queue, task_poll_t& task_poll) { + state_t dummy_state { + pgsql_conn_t { + nullptr, + tasks_queue.comm_fd[0], + 0, + ASYNC_ST::ASYNC_CONNECT_FAILED, + {} + }, + task_st_t {} + }; + + add_task(task_poll, POLLIN, std::move(dummy_state)); +} + +uint64_t MAX_CHECK_DELAY_US = 500000; + +uint64_t get_task_timeout(state_t& state) { + uint64_t task_to = 0; + + if (state.task.type == task_type_t::connect) { + connect_params_t* params { + static_cast(state.task.op_st.task_params.get()) + }; + + task_to = params->timeout; + } else if (state.task.type == task_type_t::ping) { + ping_params_t* params { + static_cast(state.task.op_st.task_params.get()) + }; + + task_to = params->timeout; + } else { + assert(0 && "Non-implemented task-type"); + } + + return task_to; +} + +void* worker_thread(void* args) { + pair* queues { + static_cast*>(args) + }; + +#ifdef DEBUG + pthread_t self = pthread_self(); +#endif + + task_queue_t& tasks_queue = queues->first; + result_queue_t& _ = queues->second; + bool recv_stop_signal = 0; + + queue next_tasks {}; + task_poll_t task_poll {}; + // Insert dummy task for scheduler comms + add_scheduler_comm_task(tasks_queue, task_poll); + + while (recv_stop_signal == false) { + // Process wakup signal from scheduler + if (task_poll.fds[0].revents & POLLIN) { + recv_stop_signal = read_signal(task_poll.fds[0].fd); + + if (recv_stop_signal == 1) { + proxy_info("Received exit signal, stopping worker thread=%ld\n", self); + continue; + } + } + + // Fetch the next tasks from the queue + { + std::lock_guard tasks_mutex { tasks_queue.mutex }; +#ifdef DEBUG + if (tasks_queue.queue.size()) { + proxy_debug(PROXY_DEBUG_MONITOR, 5, + "Fetching tasks from queue size=%lu thread=%lu\n", + tasks_queue.queue.size(), self + ); + } +#endif + while (tasks_queue.queue.size()) { + next_tasks.push(std::move(tasks_queue.queue.front())); + tasks_queue.queue.pop(); + } + } + + // Start processing the new tasks; create/fetch conns + while (next_tasks.size()) { + task_st_t task { std::move(next_tasks.front()) }; + next_tasks.pop(); + + + if (task.type != task_type_t::ping) { + // Check if server is responsive; if not, only ping tasks are processed + const mon_srv_t& srv { task.op_st.srv_info }; + + connect_params_t* params { + static_cast(task.op_st.task_params.get()) + }; + int32_t max_fails = params->ping_max_failures; + + bool srv_resp { + server_responds_to_ping( + GloPgMon->monitordb, srv.addr.c_str(), srv.port, max_fails + ) + }; + + if (srv_resp == false) { + proxy_debug(PROXY_DEBUG_MONITOR, 6, + "Skipping unresponsive server addr='%s:%d' thread=%lu\n", + srv.addr.c_str(), srv.port, self + ); + continue; + } + } + + pgsql_conn_t conn { create_conn(task) }; + state_t init_st { std::move(conn), std::move(task) }; + +#ifdef DEBUG + const mon_srv_t& srv { init_st.task.op_st.srv_info }; + proxy_debug(PROXY_DEBUG_MONITOR, 6, + "Adding new task to poll addr='%s:%d' fd=%d thread=%lu\n", + srv.addr.c_str(), srv.port, conn.fd, self + ); +#endif + + add_task(task_poll, POLLOUT, std::move(init_st)); + } + + uint64_t next_timeout_at = ULONG_LONG_MAX; + + // Continue processing tasks; Next async operation + for (size_t i = 1; i < task_poll.size; i++) { +#if DEBUG + pollfd& pfd { task_poll.fds[i] }; + state_t& task_st { task_poll.tasks[i] }; + proxy_debug(PROXY_DEBUG_MONITOR, 5, + "Processing task fd=%d revents=%d type=%d state=%d thread=%ld\n", + pfd.fd, pfd.revents, int(task_st.task.type), task_st.conn.state, self + ); +#endif + + // filtering is possible here for the task + if (task_poll.fds[i].revents) { + task_poll.fds[i].events = handle_pg_event( + task_poll.tasks[i], task_poll.fds[i].revents + ); + } + + // Reference invalidated by 'rm_task_fast'. + pgsql_conn_t& conn { task_poll.tasks[i].conn }; + + // TODO: Dump all relevant task state and changes due 'pg_event' + proxy_debug(PROXY_DEBUG_MONITOR, 5, + "Updating task state fd=%d conn_st=%d thread=%lu\n", + conn.fd, static_cast(conn.state), self + ); + + // Process task status; Update final state if finished + proc_task_state(task_poll.tasks[i]); + + // TODO: Dump all relevant task state + proxy_debug(PROXY_DEBUG_MONITOR, 5, + "Updated task state fd=%d conn_st=%d thread=%lu\n", + conn.fd, static_cast(conn.state), self + ); + + // Failed/finished task; resuse conn / cleanup resources + if (is_task_finish(conn, task_poll.tasks[i].task)) { + // TODO: Dump all relevant task state + proxy_debug(PROXY_DEBUG_MONITOR, 5, + "Finished task fd=%d conn_st=%d thread=%ld\n", + conn.fd, static_cast(conn.state), pthread_self() + ); + + if (check_success(task_poll.tasks[i].conn, task_poll.tasks[i].task)) { + const mon_srv_t& srv { task_poll.tasks[i].task.op_st.srv_info }; + + // TODO: Better unified design to update state + task_poll.tasks[i].conn.state = ASYNC_ST::ASYNC_CONNECT_END; + task_poll.tasks[i].conn.last_used = task_poll.tasks[i].task.start; + + put_conn(mon_conn_pool, srv, std::move(task_poll.tasks[i].conn)); + + proxy_debug(PROXY_DEBUG_MONITOR, 5, + "Succeed task conn returned to pool fd=%d conn_st=%d thread=%ld\n", + conn.fd, static_cast(conn.state), pthread_self() + ); + } else { + PQfinish(task_poll.tasks[i].conn.conn); + proxy_debug(PROXY_DEBUG_MONITOR, 5, + "Failed task conn killed fd=%d conn_st=%d thread=%ld\n", + conn.fd, static_cast(conn.state), pthread_self() + ); + } + + // Remove from poll; after conn cleanup + rm_task_fast(task_poll, i); + } else { + uint64_t task_to = get_task_timeout(task_poll.tasks[i]); + uint64_t task_due_to = task_poll.tasks[i].task.start + task_to; + next_timeout_at = next_timeout_at > task_due_to ? task_due_to : next_timeout_at; + } + } + + uint64_t curtime = monotonic_time(); + uint64_t next_to_wait = next_timeout_at - curtime; + uint64_t poll_wait = next_to_wait > MAX_CHECK_DELAY_US ? MAX_CHECK_DELAY_US : next_to_wait; + + proxy_debug(PROXY_DEBUG_MONITOR, 5, + "Waiting for poll fds_len=%lu wait=%lu thread=%ld\n", task_poll.size, poll_wait, self + ); + int rc = poll(task_poll.fds.data(), task_poll.size, poll_wait); + proxy_debug(PROXY_DEBUG_MONITOR, 5, + "Wokeup from poll fds_len=%lu thread=%ld\n", task_poll.size, self + ); + + if (rc == -1 && errno == EINTR) + continue; + if (rc == -1) { + proxy_error("Call to 'poll' failed. Aborting rc=%d errno=%d\n", rc, errno); + assert(0); + } + } + + return NULL; +} + +void* PgSQL_monitor_scheduler_thread() { + proxy_info("Started Monitor scheduler thread for PgSQL servers\n"); + + // Quick exit during shutdown/restart + if (!GloPTH) { return NULL; } + + // Initial Monitor thread variables version + unsigned int PgSQL_Thread__variables_version = GloPTH->get_global_version(); + // PgSQL thread structure used for variable refreshing + unique_ptr pgsql_thread { init_pgsql_thread_struct() }; + + task_queue_t conn_tasks {}; + result_queue_t conn_results {}; + + uint32_t worker_threads_count = pgsql_thread___monitor_threads; + vector worker_threads {}; + + // TODO: Threads are right now fixed on startup. After startup, they should be dynamically + // resized based on the processing rate of the queues. We need to fix contingency in the + // current approach before this scaling is a viable option. + for (uint32_t i = 0; i < worker_threads_count; i++) { + unique_ptr worker_queue { new worker_queue_t {} }; + auto [err, th] { create_thread(2048 * 1024, worker_thread, worker_queue.get()) }; + assert(err == 0 && "Thread creation failed"); + + worker_threads.emplace_back(worker_thread_t { std::move(th), std::move(worker_queue) }); + } + + uint64_t cur_intv_start = 0; + next_tasks_intvs_t next_tasks_intvs {}; + + while (GloPgMon->shutdown == false && pgsql_thread___monitor_enabled == true) { + cur_intv_start = monotonic_time(); + + if ( + cur_intv_start < next_tasks_intvs.next_ping_at + && cur_intv_start < next_tasks_intvs.next_connect_at + ) { + uint64_t closest_intv = std::min( + next_tasks_intvs.next_connect_at, next_tasks_intvs.next_ping_at + ); + uint64_t next_check_delay = 0; + + if (closest_intv > MAX_CHECK_DELAY_US) { + next_check_delay = MAX_CHECK_DELAY_US; + } else { + next_check_delay = closest_intv; + } + + usleep(next_check_delay); + continue; + } + + // Quick exit during shutdown/restart + if (!GloPTH) { return NULL; } + + // Check variable version changes; refresh if needed + unsigned int glover = GloPTH->get_global_version(); + if (PgSQL_Thread__variables_version < glover) { + PgSQL_Thread__variables_version = glover; + pgsql_thread->refresh_variables(); + // TODO: Invalidate the connection pool? Changed monitor username / password? + } + + // Fetch config for next task scheduling + mon_tasks_conf_t tasks_conf { fetch_updated_conf(GloPgMon, PgHGM) }; + + // TODO: Compute metrics from worker queues from previous processing interval + // tasks_stats_t prev_intv_stats { compute_intv_stats(worker_queues->second) }; + + // Schedule next tasks / Compute next task interval + uint64_t cur_intv_start = monotonic_time(); + + // Create the tasks from config for this interval + vector intv_tasks {}; + + if (next_tasks_intvs.next_ping_at < cur_intv_start) { + maint_monitor_table( + &GloPgMon->monitordb, MAINT_PING_LOG_QUERY, tasks_conf.ping.params + ); + + vector ping_tasks { + create_ping_tasks(cur_intv_start, tasks_conf.user_info, tasks_conf.ping), + }; + intv_tasks.insert( + intv_tasks.end(), + std::make_move_iterator(ping_tasks.begin()), + std::make_move_iterator(ping_tasks.end()) + ); + + // Schedule next interval + next_tasks_intvs.next_ping_at = cur_intv_start + tasks_conf.ping.params.interval; + } + + if (next_tasks_intvs.next_connect_at < cur_intv_start) { + maint_monitor_table( + &GloPgMon->monitordb, MAINT_CONNECT_LOG_QUERY, tasks_conf.ping.params + ); + + vector conn_tasks { + create_connect_tasks(cur_intv_start, tasks_conf.user_info, tasks_conf.connect) + }; + + intv_tasks.insert( + intv_tasks.end(), + std::make_move_iterator(conn_tasks.begin()), + std::make_move_iterator(conn_tasks.end()) + ); + + // Schedule next interval + next_tasks_intvs.next_connect_at = cur_intv_start + tasks_conf.connect.params.interval; + } + + // TODO: With previous stats compute/resize number of working threads + // uint32_t _ = required_worker_threads( + // prev_intv_stats, + // worker_threads_count, + // tasks_conf.ping.params.interval, + // intv_tasks.size() + // ); + + // Schedule the tasks for the worker threads; dummy even distribution + schedule_tasks_batches(worker_threads, std::move(intv_tasks)); + } + + proxy_info("Exiting PgSQL_Monitor scheduling thread\n"); + + // Wakeup workers for shutdown + { + for (worker_thread_t& worker : worker_threads) { + write_signal(worker.second->first.comm_fd[1], 1); + } + for (worker_thread_t& worker : worker_threads) { + pthread_join(worker.first.handle, NULL); + } + } + + return nullptr; +} diff --git a/lib/PgSQL_Thread.cpp b/lib/PgSQL_Thread.cpp index a199d1dc3..3931eba3a 100644 --- a/lib/PgSQL_Thread.cpp +++ b/lib/PgSQL_Thread.cpp @@ -45,6 +45,7 @@ static PgSQL_Session* sess_stopat; extern PgSQL_Query_Processor* GloPgQPro; extern PgSQL_Threads_Handler* GloPTH; extern MySQL_Monitor* GloMyMon; +extern PgSQL_Monitor* GloPgMon; extern PgSQL_Logger* GloPgSQL_Logger; typedef struct mythr_st_vars { @@ -314,6 +315,7 @@ static char* pgsql_thread_variables_names[] = { (char*)"monitor_ping_interval", (char*)"monitor_ping_max_failures", (char*)"monitor_ping_timeout", +/* (char*)"monitor_aws_rds_topology_discovery_interval", (char*)"monitor_read_only_interval", (char*)"monitor_read_only_timeout", @@ -330,12 +332,17 @@ static char* pgsql_thread_variables_names[] = { (char*)"monitor_galera_healthcheck_interval", (char*)"monitor_galera_healthcheck_timeout", (char*)"monitor_galera_healthcheck_max_timeout_count", +*/ (char*)"monitor_username", (char*)"monitor_password", +/* (char*)"monitor_replication_lag_use_percona_heartbeat", (char*)"monitor_query_interval", (char*)"monitor_query_timeout", (char*)"monitor_slave_lag_when_null", +*/ + (char*)"monitor_threads", +/* (char*)"monitor_threads_min", (char*)"monitor_threads_max", (char*)"monitor_threads_queue_maxsize", @@ -344,6 +351,7 @@ static char* pgsql_thread_variables_names[] = { (char*)"monitor_local_dns_resolver_queue_maxsize", (char*)"monitor_wait_timeout", (char*)"monitor_writer_is_also_reader", +*/ (char*)"max_allowed_packet", (char*)"tcp_keepalive_time", (char*)"use_tcp_keepalive", @@ -936,6 +944,7 @@ PgSQL_Threads_Handler::PgSQL_Threads_Handler() { variables.monitor_replication_lag_interval = 10000; variables.monitor_replication_lag_timeout = 1000; variables.monitor_replication_lag_count = 1; +/* TODO: Remove variables.monitor_groupreplication_healthcheck_interval = 5000; variables.monitor_groupreplication_healthcheck_timeout = 800; variables.monitor_groupreplication_healthcheck_max_timeout_count = 3; @@ -950,12 +959,16 @@ PgSQL_Threads_Handler::PgSQL_Threads_Handler() { variables.monitor_threads_min = 8; variables.monitor_threads_max = 128; variables.monitor_threads_queue_maxsize = 128; +*/ + variables.monitor_threads = 2; variables.monitor_local_dns_cache_ttl = 300000; variables.monitor_local_dns_cache_refresh_interval = 60000; variables.monitor_local_dns_resolver_queue_maxsize = 128; variables.monitor_username = strdup((char*)"monitor"); variables.monitor_password = strdup((char*)"monitor"); +/* TODO: Remove variables.monitor_replication_lag_use_percona_heartbeat = strdup((char*)""); +*/ variables.monitor_wait_timeout = true; variables.monitor_writer_is_also_reader = true; variables.max_allowed_packet = 64 * 1024 * 1024; @@ -1168,7 +1181,9 @@ char* PgSQL_Threads_Handler::get_variable_string(char* name) { if (!strncmp(name, "monitor_", 8)) { if (!strcmp(name, "monitor_username")) return strdup(variables.monitor_username); if (!strcmp(name, "monitor_password")) return strdup(variables.monitor_password); +/* if (!strcmp(name, "monitor_replication_lag_use_percona_heartbeat")) return strdup(variables.monitor_replication_lag_use_percona_heartbeat); +*/ } if (!strncmp(name, "ssl_", 4)) { if (!strcmp(name, "ssl_p2s_ca")) { @@ -1489,7 +1504,9 @@ char* PgSQL_Threads_Handler::get_variable(char* name) { // this is the public fu if (!strncasecmp(name, "monitor_", 8)) { if (!strcasecmp(name, "monitor_username")) return strdup(variables.monitor_username); if (!strcasecmp(name, "monitor_password")) return strdup(variables.monitor_password); +/* if (!strcasecmp(name, "monitor_replication_lag_use_percona_heartbeat")) return strdup(variables.monitor_replication_lag_use_percona_heartbeat); +*/ } if (!strcasecmp(name, "threads")) { sprintf(intbuf, "%d", (num_threads ? num_threads : DEFAULT_NUM_THREADS)); @@ -2068,11 +2085,13 @@ char** PgSQL_Threads_Handler::get_variables_list() { VariablesPointers_int["monitor_ping_timeout"] = make_tuple(&variables.monitor_ping_timeout, 100, 600 * 1000, false); VariablesPointers_int["monitor_ping_max_failures"] = make_tuple(&variables.monitor_ping_max_failures, 1, 1000 * 1000, false); +/* VariablesPointers_int["monitor_aws_rds_topology_discovery_interval"] = make_tuple(&variables.monitor_aws_rds_topology_discovery_interval, 1, 100000, false); +*/ VariablesPointers_int["monitor_read_only_interval"] = make_tuple(&variables.monitor_read_only_interval, 100, 7 * 24 * 3600 * 1000, false); VariablesPointers_int["monitor_read_only_timeout"] = make_tuple(&variables.monitor_read_only_timeout, 100, 600 * 1000, false); VariablesPointers_int["monitor_read_only_max_timeout_count"] = make_tuple(&variables.monitor_read_only_max_timeout_count, 1, 1000 * 1000, false); - +/* VariablesPointers_int["monitor_replication_lag_interval"] = make_tuple(&variables.monitor_replication_lag_interval, 100, 7 * 24 * 3600 * 1000, false); VariablesPointers_int["monitor_replication_lag_timeout"] = make_tuple(&variables.monitor_replication_lag_timeout, 100, 600 * 1000, false); VariablesPointers_int["monitor_replication_lag_count"] = make_tuple(&variables.monitor_replication_lag_count, 1, 10, false); @@ -2089,13 +2108,17 @@ char** PgSQL_Threads_Handler::get_variables_list() { VariablesPointers_int["monitor_query_interval"] = make_tuple(&variables.monitor_query_interval, 100, 7 * 24 * 3600 * 1000, false); VariablesPointers_int["monitor_query_timeout"] = make_tuple(&variables.monitor_query_timeout, 100, 600 * 1000, false); +*/ + VariablesPointers_int["monitor_threads"] = make_tuple(&variables.monitor_threads, 2, 256, false); +/* VariablesPointers_int["monitor_threads_min"] = make_tuple(&variables.monitor_threads_min, 2, 256, false); VariablesPointers_int["monitor_threads_max"] = make_tuple(&variables.monitor_threads_max, 4, 1024, false); VariablesPointers_int["monitor_slave_lag_when_null"] = make_tuple(&variables.monitor_slave_lag_when_null, 0, 604800, false); - VariablesPointers_int["monitor_threads_queue_maxsize"] = make_tuple(&variables.monitor_threads_queue_maxsize, 16, 1024, false); + VariablesPointers_int["monitor_threads_queue_maxsize"] = make_tuple(&variables.monitor_threads_queue_maxsize, 16, 1024, false); +*/ VariablesPointers_int["monitor_local_dns_cache_ttl"] = make_tuple(&variables.monitor_local_dns_cache_ttl, 0, 7 * 24 * 3600 * 1000, false); VariablesPointers_int["monitor_local_dns_cache_refresh_interval"] = make_tuple(&variables.monitor_local_dns_cache_refresh_interval, 0, 7 * 24 * 3600 * 1000, false); VariablesPointers_int["monitor_local_dns_resolver_queue_maxsize"] = make_tuple(&variables.monitor_local_dns_resolver_queue_maxsize, 16, 1024, false); @@ -3783,16 +3806,25 @@ void PgSQL_Thread::refresh_variables() { mysql_thread___monitor_wait_timeout = (bool)GloPTH->get_variable_int((char*)"monitor_wait_timeout"); mysql_thread___monitor_writer_is_also_reader = (bool)GloPTH->get_variable_int((char*)"monitor_writer_is_also_reader"); - mysql_thread___monitor_enabled = (bool)GloPTH->get_variable_int((char*)"monitor_enabled"); - mysql_thread___monitor_history = GloPTH->get_variable_int((char*)"monitor_history"); - mysql_thread___monitor_connect_interval = GloPTH->get_variable_int((char*)"monitor_connect_interval"); - mysql_thread___monitor_connect_timeout = GloPTH->get_variable_int((char*)"monitor_connect_timeout"); - mysql_thread___monitor_ping_interval = GloPTH->get_variable_int((char*)"monitor_ping_interval"); - mysql_thread___monitor_ping_max_failures = GloPTH->get_variable_int((char*)"monitor_ping_max_failures"); - mysql_thread___monitor_ping_timeout = GloPTH->get_variable_int((char*)"monitor_ping_timeout"); + */ + + pgsql_thread___monitor_enabled = (bool)GloPTH->get_variable_int((char*)"monitor_enabled"); + pgsql_thread___monitor_history = GloPTH->get_variable_int((char*)"monitor_history"); + pgsql_thread___monitor_connect_interval = GloPTH->get_variable_int((char*)"monitor_connect_interval"); + pgsql_thread___monitor_connect_timeout = GloPTH->get_variable_int((char*)"monitor_connect_timeout"); + pgsql_thread___monitor_ping_interval = GloPTH->get_variable_int((char*)"monitor_ping_interval"); + pgsql_thread___monitor_ping_max_failures = GloPTH->get_variable_int((char*)"monitor_ping_max_failures"); + pgsql_thread___monitor_ping_timeout = GloPTH->get_variable_int((char*)"monitor_ping_timeout"); + pgsql_thread___monitor_read_only_interval = GloPTH->get_variable_int((char*)"monitor_read_only_interval"); + pgsql_thread___monitor_read_only_timeout = GloPTH->get_variable_int((char*)"monitor_read_only_timeout"); + pgsql_thread___monitor_threads = GloPTH->get_variable_int((char*)"monitor_threads"); + if (pgsql_thread___monitor_username) free(pgsql_thread___monitor_username); + pgsql_thread___monitor_username = GloPTH->get_variable_string((char*)"monitor_username"); + if (pgsql_thread___monitor_password) free(pgsql_thread___monitor_password); + pgsql_thread___monitor_password = GloPTH->get_variable_string((char*)"monitor_password"); + + /* mysql_thread___monitor_aws_rds_topology_discovery_interval = GloPTH->get_variable_int((char *)"monitor_aws_rds_topology_discovery_interval"); - mysql_thread___monitor_read_only_interval = GloPTH->get_variable_int((char*)"monitor_read_only_interval"); - mysql_thread___monitor_read_only_timeout = GloPTH->get_variable_int((char*)"monitor_read_only_timeout"); mysql_thread___monitor_read_only_max_timeout_count = GloPTH->get_variable_int((char*)"monitor_read_only_max_timeout_count"); mysql_thread___monitor_replication_lag_group_by_host = (bool)GloPTH->get_variable_int((char*)"monitor_replication_lag_group_by_host"); mysql_thread___monitor_replication_lag_interval = GloPTH->get_variable_int((char*)"monitor_replication_lag_interval"); @@ -4377,30 +4409,32 @@ SQLite3_result* PgSQL_Threads_Handler::SQL3_GlobalStatus(bool _memory) { pta[1] = buf; result->add_row(pta); } + */ { - pta[0] = (char*)"MySQL_Monitor_connect_check_OK"; - sprintf(buf, "%llu", GloMyMon->connect_check_OK); + pta[0] = (char*)"PgSQL_Monitor_connect_check_OK"; + sprintf(buf, "%lu", GloPgMon->connect_check_OK); pta[1] = buf; result->add_row(pta); } { - pta[0] = (char*)"MySQL_Monitor_connect_check_ERR"; - sprintf(buf, "%llu", GloMyMon->connect_check_ERR); + pta[0] = (char*)"PgSQL_Monitor_connect_check_ERR"; + sprintf(buf, "%lu", GloPgMon->connect_check_ERR); pta[1] = buf; result->add_row(pta); } { - pta[0] = (char*)"MySQL_Monitor_ping_check_OK"; - sprintf(buf, "%llu", GloMyMon->ping_check_OK); + pta[0] = (char*)"PgSQL_Monitor_ping_check_OK"; + sprintf(buf, "%lu", GloPgMon->ping_check_OK); pta[1] = buf; result->add_row(pta); } { - pta[0] = (char*)"MySQL_Monitor_ping_check_ERR"; - sprintf(buf, "%llu", GloMyMon->ping_check_ERR); + pta[0] = (char*)"PgSQL_Monitor_ping_check_ERR"; + sprintf(buf, "%lu", GloPgMon->ping_check_ERR); pta[1] = buf; result->add_row(pta); } + /* { pta[0] = (char*)"MySQL_Monitor_read_only_check_OK"; sprintf(buf, "%llu", GloMyMon->read_only_check_OK); diff --git a/lib/ProxySQL_Poll.cpp b/lib/ProxySQL_Poll.cpp index 78cb47c46..66135ad67 100644 --- a/lib/ProxySQL_Poll.cpp +++ b/lib/ProxySQL_Poll.cpp @@ -175,4 +175,4 @@ int ProxySQL_Poll::find_index(int fd) { } template class ProxySQL_Poll; -template class ProxySQL_Poll; \ No newline at end of file +template class ProxySQL_Poll; diff --git a/src/main.cpp b/src/main.cpp index d49c6a03e..8cb5f854b 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -34,6 +34,7 @@ using json = nlohmann::json; #include "proxysql_restapi.h" #include "Web_Interface.hpp" #include "proxysql_utils.h" +#include "PgSQL_Monitor.hpp" #include "libdaemon/dfork.h" #include "libdaemon/dsignal.h" @@ -82,6 +83,7 @@ void * __mysql_ldap_auth; volatile create_Web_Interface_t * create_Web_Interface = NULL; void * __web_interface; +std::thread* pgsql_monitor_thread = nullptr; extern int ProxySQL_create_or_load_TLS(bool bootstrap, std::string& msg); @@ -458,6 +460,7 @@ Web_Interface *GloWebInterface; MySQL_STMT_Manager_v14 *GloMyStmt; MySQL_Monitor *GloMyMon; +PgSQL_Monitor *GloPgMon; std::thread *MyMon_thread = NULL; MySQL_Logger *GloMyLogger; @@ -1007,7 +1010,9 @@ void ProxySQL_Main_join_all_threads() { if (GloMyMon) { GloMyMon->shutdown=true; } - + if (GloPgMon) { + GloPgMon->shutdown=true; + } // join GloMyMon thread if (GloMyMon && MyMon_thread) { cpu_timer t; @@ -1018,7 +1023,15 @@ void ProxySQL_Main_join_all_threads() { std::cerr << "GloMyMon joined in "; #endif } - + if (GloPgMon && pgsql_monitor_thread) { + cpu_timer t; + pgsql_monitor_thread->join(); + delete pgsql_monitor_thread; + pgsql_monitor_thread = NULL; +#ifdef DEBUG + std::cerr << "GloPgMon joined in "; +#endif + } // join GloQC thread if (GloQC) { cpu_timer t; @@ -1041,7 +1054,14 @@ void ProxySQL_Main_shutdown_all_modules() { std::cerr << "GloMyMon shutdown in "; #endif } - + if (GloPgMon) { + cpu_timer t; + delete GloPgMon; + GloPgMon=NULL; +#ifdef DEBUG + std::cerr << "GloPgMon shutdown in "; +#endif + } if (GloQC) { cpu_timer t; delete GloQC; @@ -1302,7 +1322,6 @@ void ProxySQL_Main_init_phase2___not_started(const bootstrap_info_t& boostrap_in } } - void ProxySQL_Main_init_phase3___start_all() { { @@ -1323,6 +1342,7 @@ void ProxySQL_Main_init_phase3___start_all() { } // Initialized monitor, no matter if it will be started or not GloMyMon = new MySQL_Monitor(); + GloPgMon = new PgSQL_Monitor(); // load all mysql servers to GloHGH { cpu_timer t; @@ -1427,6 +1447,8 @@ void ProxySQL_Main_init_phase3___start_all() { // Load the config not previously loaded for these modules GloAdmin->load_http_server(); GloAdmin->load_restapi_server(); + + pgsql_monitor_thread = new std::thread(&PgSQL_monitor_scheduler_thread); } From 6af8b842205f1cdd5f709a51b37babb08fa3d44e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Javier=20Jaramago=20Fern=C3=A1ndez?= Date: Thu, 26 Sep 2024 11:47:51 +0200 Subject: [PATCH 2/3] Cleanup: Remove dead/commented code and minor styling --- include/proxysql_debug.h | 69 +++++++++++----------------------------- lib/debug.cpp | 51 ++++++++++------------------- lib/set_parser.cpp | 68 ++------------------------------------- 3 files changed, 37 insertions(+), 151 deletions(-) diff --git a/include/proxysql_debug.h b/include/proxysql_debug.h index c6416497e..569f92e00 100644 --- a/include/proxysql_debug.h +++ b/include/proxysql_debug.h @@ -1,14 +1,3 @@ - -/* -#ifdef DEBUG -#ifndef DEBUG_EXTERN -#define DEBUG_EXTERN -extern debug_level *gdbg_lvl; -extern int gdbg; -#endif -#endif -*/ - #ifndef __PROXYSQL_DEBUG_H #define __PROXYSQL_DEBUG_H @@ -46,7 +35,6 @@ class Timer { #ifdef DEBUG #define PROXY_TRACE() { proxy_debug(PROXY_DEBUG_GENERIC,10,"TRACE\n"); } -//#define PROXY_TRACE2() { proxy_info("TRACE\n"); } #define PROXY_TRACE2() #else #define PROXY_TRACE() @@ -64,7 +52,6 @@ class Timer { } \ } while (0) #elif defined(__linux__) -//#ifdef SYS_gettid #define proxy_debug(module, verbosity, fmt, ...) \ do { if (GloVars.global.gdbg) { \ proxy_debug_func(module, verbosity, syscall(SYS_gettid), __FILE__, __LINE__, __func__ , fmt, ## __VA_ARGS__); \ @@ -76,9 +63,6 @@ class Timer { #define proxy_debug(module, verbosity, fmt, ...) #endif /* DEBUG */ -/* -#ifdef DEBUG -*/ #define proxy_error(fmt, ...) \ do { \ time_t __timer; \ @@ -111,23 +95,7 @@ class Timer { strftime(__buffer, 25, "%Y-%m-%d %H:%M:%S", &__tm_info); \ proxy_error_func(0, "%s %s:%d:%s(): [ERROR] " fmt, __buffer, fi, li, fu , ## __VA_ARGS__); \ } while(0) -/* -#else -#define proxy_error(fmt, ...) \ - do { \ - time_t __timer; \ - char __buffer[25]; \ - struct tm *__tm_info; \ - time(&__timer); \ - __tm_info = localtime(&__timer); \ - strftime(__buffer, 25, "%Y-%m-%d %H:%M:%S", __tm_info); \ - proxy_error_func("%s [ERROR] " fmt , __buffer , ## __VA_ARGS__); \ - } while(0) -#endif -*/ -/* -#ifdef DEBUG -*/ + #define proxy_warning(fmt, ...) \ do { \ time_t __timer; \ @@ -150,20 +118,6 @@ class Timer { proxy_error_func(ecode, "%s %s:%d:%s(): [WARNING] " fmt, __buffer, __FILE__, __LINE__, __func__ , ## __VA_ARGS__); \ } while(0) -/* -#else -#define proxy_warning(fmt, ...) \ - do { \ - time_t __timer; \ - char __buffer[25]; \ - struct tm *__tm_info; \ - time(&__timer); \ - __tm_info = localtime(&__timer); \ - strftime(__buffer, 25, "%Y-%m-%d %H:%M:%S", __tm_info); \ - proxy_error_func("%s [WARNING] " fmt , __buffer , ## __VA_ARGS__); \ - } while(0) -#endif -*/ #ifdef DEBUG #define proxy_info(fmt, ...) \ do { \ @@ -211,13 +165,26 @@ class Timer { #endif #ifdef DEBUG -//void *debug_logger(); #endif +#define NULL_DB_MSG "The pointer to sqlite3 database is NULL. Cannot get error message." + #define ASSERT_SQLITE_OK(rc, db) \ do { \ if (rc!=SQLITE_OK) { \ - proxy_error("SQLite3 error with return code %d. Error message: %s. Shutting down.\n", rc, db?(*proxy_sqlite3_errmsg)(db->get_db()):"The pointer to sqlite3 database is null. Cannot get error message."); \ + proxy_error( \ + "SQLite3 error. Shutting down rc=%d msg='%s'\n", \ + rc, db ? (*proxy_sqlite3_errmsg)(db->get_db()) : NULL_DB_MSG); \ + assert(0); \ + } \ + } while(0) + +#define ASSERT_SQLITE3_OK(rc, db) \ + do { \ + if (rc!=SQLITE_OK) { \ + proxy_error( \ + "SQLite3 error. Shutting down rc=%d msg='%s'\n", \ + rc, db ? (*proxy_sqlite3_errmsg)(db) : NULL_DB_MSG); \ assert(0); \ } \ } while(0) @@ -243,7 +210,7 @@ SQLite3_result* proxysql_get_message_stats(bool reset=false); */ void proxysql_init_debug_prometheus_metrics(); - +class SQLite3DB; /** * @brief Set or unset if Admin has debugdb_disk fully initialized */ @@ -251,4 +218,4 @@ void proxysql_set_admin_debugdb_disk(SQLite3DB *_db); void proxysql_set_admin_debug_output(unsigned int _do); -#endif +#endif // DEBUG diff --git a/lib/debug.cpp b/lib/debug.cpp index 4f3755c9c..08729b166 100644 --- a/lib/debug.cpp +++ b/lib/debug.cpp @@ -15,37 +15,16 @@ using std::string; using std::unordered_map; #ifdef DEBUG -#ifdef DEBUG_EXTERN -#undef DEBUG_EXTERN -#endif /* DEBUG_EXTERN */ -#endif /* DEBUG */ - -#ifndef CLOCK_MONOTONIC -#define CLOCK_MONOTONIC SYSTEM_CLOCK -#endif // CLOCK_MONOTONIC -#ifdef DEBUG __thread unsigned long long pretime=0; static pthread_mutex_t debug_mutex; static pthread_rwlock_t filters_rwlock; static SQLite3DB * debugdb_disk = NULL; sqlite3_stmt *statement1=NULL; static unsigned int debug_output = 1; -#endif /* DEBUG */ - -/* -static inline unsigned long long debug_monotonic_time() { - struct timespec ts; - clock_gettime(CLOCK_MONOTONIC, &ts); - return (((unsigned long long) ts.tv_sec) * 1000000) + (ts.tv_nsec / 1000); -} -*/ #define DEBUG_MSG_MAXSIZE 1024 -#ifdef DEBUG - - /** * @brief Contains all filters related to debug. * @details The convention for key value is `filename:line:function`. This key structure also applies also @@ -58,7 +37,6 @@ static inline unsigned long long debug_monotonic_time() { std::set* debug_filters = nullptr; static bool filter_debug_entry(const char *__file, int __line, const char *__func) { - //pthread_mutex_lock(&debug_mutex); pthread_rwlock_rdlock(&filters_rwlock); bool to_filter = false; if (debug_filters && debug_filters->size()) { // if the set is empty we aren't performing any filter, so we won't search @@ -99,7 +77,6 @@ static bool filter_debug_entry(const char *__file, int __line, const char *__fun } } } - //pthread_mutex_unlock(&debug_mutex); pthread_rwlock_unlock(&filters_rwlock); return to_filter; } @@ -107,19 +84,16 @@ static bool filter_debug_entry(const char *__file, int __line, const char *__fun // we use this function to sent the filters to Admin // we hold here the lock on filters_rwlock void proxy_debug_get_filters(std::set& f) { - //pthread_mutex_lock(&debug_mutex); pthread_rwlock_rdlock(&filters_rwlock); if (debug_filters) { f = *debug_filters; } pthread_rwlock_unlock(&filters_rwlock); - //pthread_mutex_unlock(&debug_mutex); } // we use this function to get the filters from Admin // we hold here the lock on filters_rwlock void proxy_debug_load_filters(std::set& f) { - //pthread_mutex_lock(&debug_mutex); pthread_rwlock_wrlock(&filters_rwlock); if (debug_filters) { debug_filters->erase(debug_filters->begin(), debug_filters->end()); @@ -128,11 +102,19 @@ void proxy_debug_load_filters(std::set& f) { debug_filters = new std::set(f); } pthread_rwlock_unlock(&filters_rwlock); - //pthread_mutex_unlock(&debug_mutex); } // REMINDER: This function should always save/restore 'errno', otherwise it could influence error handling. -void proxy_debug_func(enum debug_module module, int verbosity, int thr, const char *__file, int __line, const char *__func, const char *fmt, ...) { +void proxy_debug_func( + enum debug_module module, + int verbosity, + int thr, + const char *__file, + int __line, + const char *__func, + const char *fmt, + ... +) { int saved_errno = errno; assert(module quote_symbol = {"\"", "'", "`"}; vector var_patterns = {}; @@ -198,19 +185,9 @@ void SetParser::generateRE_parse1v2() { string vp = "NULL"; // NULL var_patterns.push_back(vp); - //vp = "\\w+"; // single word - //var_patterns.push_back(vp); + { string vp0 = "(?:\\w|\\d)+"; // single word with letters and digits , for example utf8mb4 and latin1 - //var_patterns.push_back(vp); -/* - string vp1 = "(?:" + vp0 + "(?:," + vp0 + ")*)"; // multiple words (letters and digits) separated by commas WITHOUT any spaces between words . Used also for sql_mode , example: ONLY_FULL_GROUP_BY,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO - //var_patterns.push_back(vp1); // do NOT add without quote - for (auto it = quote_symbol.begin(); it != quote_symbol.end(); it++) { - string s = *it + vp1 + *it; - var_patterns.push_back(s); // add with quote - } -*/ string vp2 = "(?:" + vp0 + "(?:-" + vp0 + ")*)"; // multiple words (letters and digits) separated by dash, WITHOUT any spaces between words . Used also for transaction isolation var_patterns.push_back(vp2); for (auto it = quote_symbol.begin(); it != quote_symbol.end(); it++) { @@ -218,12 +195,6 @@ void SetParser::generateRE_parse1v2() { var_patterns.push_back(s); // add with quote } } - //vp = "(?:\\w|\\d)+(?:-|\\w|\\d+)*"; // multiple words (letters and digits) separated by dash, WITHOUT any spaces between words . Used ialso for transaction isolation - //var_patterns.push_back(vp); -// for (auto it = quote_symbol.begin(); it != quote_symbol.end(); it++) { -// string s = *it + vp + *it; -// var_patterns.push_back(s); // add with quote -// } vp = "\\w+(?:,\\w+)+"; // multiple words separated by commas, WITHOUT any spaces between words // NOTE: we do not use multiple words without quotes @@ -268,7 +239,6 @@ void SetParser::generateRE_parse1v2() { vp = "(?:| *(?:\\+|\\-) *)\\d+(?:|\\.\\d+)"; // a signed or unsigned integer or decimal , N7 = merge of N3 and N6 var_patterns.push_back(vp); - { // time_zone in numeric format: // - +/- sign @@ -297,8 +267,6 @@ void SetParser::generateRE_parse1v2() { var_patterns.push_back(s); // add with quote } - - string var_value = "("; for (auto it = var_patterns.begin(); it != var_patterns.end(); it++) { string s = "(?:" + *it + ")"; @@ -317,9 +285,6 @@ void SetParser::generateRE_parse1v2() { parse1v2_opt2->set_case_sensitive(false); parse1v2_opt2->set_longest_match(false); - - - string var_1_0 = "(?:@\\w+|\\w+)"; // @name|name string var_1 = "(" + var_1_0 + "|`" + var_1_0 + "`)"; // var_1_0|`var_1_0` var_1 = SESSION_P1 + var_1; @@ -328,9 +293,6 @@ void SetParser::generateRE_parse1v2() { string name_value = "("; for (auto it = quote_symbol.begin(); it != quote_symbol.end(); it++) { string s = "(?:" + *it + charset_name + *it + ")"; - //auto it2 = it; - //it2++; - //if (it2 != quote_symbol.end()) s += "|"; name_value += s; } @@ -344,22 +306,7 @@ void SetParser::generateRE_parse1v2() { } #endif -#ifdef PARSERDEBUG -// delete opt2; -// return result; -#endif - -/* -#define QUOTES "(?:'|\"|`)?" -#define SPACES " *" -#define NAMES "(NAMES)" -#define NAME_VALUE "((?:\\w|\\d)+)" -*/ - - - //const std::string pattern="(?:" NAMES SPACES QUOTES NAME_VALUE QUOTES "(?: +COLLATE +" QUOTES NAME_VALUE QUOTES "|)" "|" SESSION_P1 VAR_P1 SPACES "(?:|:)=" SPACES QUOTES VAR_VALUE_P1 QUOTES ") *,? *"; const std::string pattern="(?:" NAMES SPACES + name_value + "(?: +COLLATE +" + name_value + "|)" "|" + var_1 + SPACES "(?:|:)=" SPACES + var_value + ") *,? *"; - //const std::string pattern=var_1 + SPACES "(?:|:)=" SPACES + var_value; #ifdef DEBUG VALGRIND_DISABLE_ERROR_REPORTING; #endif // DEBUG @@ -368,7 +315,6 @@ VALGRIND_DISABLE_ERROR_REPORTING; cout << pattern << endl; } #endif - //re2::RE2 re(pattern, *opt2); parse1v2_pattern = pattern; parse1v2_re = new re2::RE2(parse1v2_pattern, *parse1v2_opt2); parse1v2_init = true; @@ -460,16 +406,8 @@ std::map> SetParser::parse2() { std::map> result; -// regex used: -// SET(?: +)(|SESSION +)TRANSACTION(?: +)(?:(?:(ISOLATION(?: +)LEVEL)(?: +)(REPEATABLE(?: +)READ|READ(?: +)COMMITTED|READ(?: +)UNCOMMITTED|SERIALIZABLE))|(?:(READ)(?: +)(WRITE|ONLY))) -/* -#define SESSION_P2 "(|SESSION)" -#define VAR_P2 "(ISOLATION LEVEL|READ)" -//#define VAR_VALUE "((?:[\\w/\\d:\\+\\-]|,)+)" -//#define VAR_VALUE "((?:CONCAT\\((?:(REPLACE|CONCAT)\\()+@@sql_mode,(?:(?:'|\\w|,| |\"|\\))+(?:\\)))|(?:[@\\w/\\d:\\+\\-]|,)+|(?:)))" -#define VAR_VALUE_P2 "(((?:CONCAT\\()*(?:((?: )*REPLACE|IFNULL|CONCAT)\\()+(?: )*(?:NULL|@OLD_SQL_MODE|@@sql_mode),(?:(?:'|\\w|,| |\"|\\))+(?:\\))*)|(?:[@\\w/\\d:\\+\\-]|,)+|(?:)))" -*/ - //const std::string pattern="(?:" NAMES SPACES QUOTES NAME_VALUE QUOTES "(?: +COLLATE +" QUOTES NAME_VALUE QUOTES "|)" "|" SESSION_P1 VAR_P1 SPACES "(?:|:)=" SPACES QUOTES VAR_VALUE_P1 QUOTES ") *,? *"; + // Regex used: + // SET(?: +)(|SESSION +)TRANSACTION(?: +)(?:(?:(ISOLATION(?: +)LEVEL)(?: +)(REPEATABLE(?: +)READ|READ(?: +)COMMITTED|READ(?: +)UNCOMMITTED|SERIALIZABLE))|(?:(READ)(?: +)(WRITE|ONLY))) const std::string pattern="(|SESSION) *TRANSACTION(?: +)(?:(?:(ISOLATION(?: +)LEVEL)(?: +)(REPEATABLE(?: +)READ|READ(?: +)COMMITTED|READ(?: +)UNCOMMITTED|SERIALIZABLE))|(?:(READ)(?: +)(WRITE|ONLY)))"; re2::RE2 re(pattern, *opt2); std::string var; From 1ebe70bd9e40455b23a175e38757a8a854004b64 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Javier=20Jaramago=20Fern=C3=A1ndez?= Date: Thu, 26 Sep 2024 18:10:20 +0200 Subject: [PATCH 3/3] Cleanup: Fix RELEASE build and removed non-used variable --- lib/PgSQL_Monitor.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/PgSQL_Monitor.cpp b/lib/PgSQL_Monitor.cpp index 8fa95d9b3..5642d830f 100644 --- a/lib/PgSQL_Monitor.cpp +++ b/lib/PgSQL_Monitor.cpp @@ -643,7 +643,9 @@ pair get_task_conn(conn_pool_t& conn_pool, task_st_t& task_st } pgsql_conn_t create_conn(task_st_t& task_st) { +#ifdef DEBUG const mon_srv_t& srv { task_st.op_st.srv_info }; +#endif // Initialize connection parameters const string conn_str { build_conn_str(task_st) }; @@ -1410,12 +1412,10 @@ void* worker_thread(void* args) { static_cast*>(args) }; -#ifdef DEBUG pthread_t self = pthread_self(); -#endif - task_queue_t& tasks_queue = queues->first; - result_queue_t& _ = queues->second; + // TODO: Not used for now; results should be used by scheduler + // result_queue_t& _ = queues->second; bool recv_stop_signal = 0; queue next_tasks {};