diff --git a/cmd/icingadb-migrate/misc.go b/cmd/icingadb-migrate/misc.go index f1db20cbe..b8d358fff 100644 --- a/cmd/icingadb-migrate/misc.go +++ b/cmd/icingadb-migrate/misc.go @@ -4,7 +4,6 @@ import ( "context" "crypto/sha1" "github.com/icinga/icingadb/pkg/contracts" - "github.com/icinga/icingadb/pkg/driver" "github.com/icinga/icingadb/pkg/icingadb" "github.com/icinga/icingadb/pkg/icingadb/objectpacker" icingadbTypes "github.com/icinga/icingadb/pkg/types" @@ -110,7 +109,7 @@ func sliceIdoHistory[Row any]( args["checkpoint"] = checkpoint args["bulk"] = 20000 - if ht.snapshot.DriverName() != driver.MySQL { + if ht.snapshot.DriverName() != icingadb.MySQL { query = strings.ReplaceAll(query, " USE INDEX (PRIMARY)", "") } diff --git a/config.example.yml b/config.example.yml index f5fd13a06..b8abaa8d6 100644 --- a/config.example.yml +++ b/config.example.yml @@ -50,6 +50,13 @@ database: # It is not possible to set this option to a smaller number than "1". # max_rows_per_transaction: 8192 + # Enforce Galera cluster nodes to perform strict cluster-wide causality checks before executing + # specific SQL queries determined by the number you provided. + # Note: You can only set this option to a number "0 - 15". + # Defaults to 7. + # See https://icinga.com/docs/icinga-db/latest/doc/03-Configuration/#galera-cluster +# wsrep_sync_wait: 7 + # Connection configuration for the Redis server where Icinga 2 writes its configuration, state and history items. # This is the same connection as configured in the 'icingadb' feature of the corresponding Icinga 2 node. # High availability setups require a dedicated Redis server per Icinga 2 node and diff --git a/doc/03-Configuration.md b/doc/03-Configuration.md index 5eb77575a..aa8781b61 100644 --- a/doc/03-Configuration.md +++ b/doc/03-Configuration.md @@ -29,20 +29,20 @@ This is also the database used in [Icinga DB Web](https://icinga.com/docs/icinga-db-web) to view and work with the data. In high availability setups, all Icinga DB instances must write to the same database. -| Option | Description | -|----------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| type | **Optional.** Either `mysql` (default) or `pgsql`. | -| host | **Required.** Database host or absolute Unix socket path. | -| port | **Optional.** Database port. By default, the MySQL or PostgreSQL port, depending on the database type. | -| database | **Required.** Database name. | -| user | **Required.** Database username. | -| password | **Optional.** Database password. | -| tls | **Optional.** Whether to use TLS. | -| cert | **Optional.** Path to TLS client certificate. | -| key | **Optional.** Path to TLS private key. | -| ca | **Optional.** Path to TLS CA certificate. | -| insecure | **Optional.** Whether not to verify the peer. | -| options | **Optional.** List of low-level database options that can be set to influence some Icinga DB internal default behaviours. See [database options](#database-options) for details. | +| Option | Description | +|----------|------------------------------------------------------------------------------------------------------------------------------------------------| +| type | **Optional.** Either `mysql` (default) or `pgsql`. | +| host | **Required.** Database host or absolute Unix socket path. | +| port | **Optional.** Database port. By default, the MySQL or PostgreSQL port, depending on the database type. | +| database | **Required.** Database name. | +| user | **Required.** Database username. | +| password | **Optional.** Database password. | +| tls | **Optional.** Whether to use TLS. | +| cert | **Optional.** Path to TLS client certificate. | +| key | **Optional.** Path to TLS private key. | +| ca | **Optional.** Path to TLS CA certificate. | +| insecure | **Optional.** Whether not to verify the peer. | +| options | **Optional.** List of low-level [database options](#database-options) that can be set to influence some Icinga DB internal default behaviours. | ### Database Options @@ -61,6 +61,7 @@ manual adjustments. | max_connections_per_table | **Optional.** Maximum number of queries Icinga DB is allowed to execute on a single table concurrently. Defaults to `8`. | | max_placeholders_per_statement | **Optional.** Maximum number of placeholders Icinga DB is allowed to use for a single SQL statement. Defaults to `8192`. | | max_rows_per_transaction | **Optional.** Maximum number of rows Icinga DB is allowed to `SELECT`,`DELETE`,`UPDATE` or `INSERT` in a single transaction. Defaults to `8192`. | +| wsrep_sync_wait | **Optional.** Enforce [Galera cluster](#galera-cluster) nodes to perform strict cluster-wide causality checks. Defaults to `7`. | ## Logging Configuration @@ -112,3 +113,21 @@ allowing to keep this information for longer with a smaller storage footprint. A duration string is a sequence of decimal numbers and a unit suffix, such as `"20s"`. Valid units are `"ms"`, `"s"`, `"m"` and `"h"`. + +### Galera Cluster + +Icinga DB expects a more consistent behaviour from its database than a +[Galera cluster](https://mariadb.com/kb/en/what-is-mariadb-galera-cluster/) provides by default. To accommodate this, +Icinga DB sets the [wsrep_sync_wait](https://mariadb.com/kb/en/galera-cluster-system-variables/#wsrep_sync_wait) system +variable for all its database connections. Consequently, strict cluster-wide causality checks are enforced before +executing specific SQL queries, which are determined by the value set in the `wsrep_sync_wait` system variable. +By default, Icinga DB sets this to `7`, which includes `READ, UPDATE, DELETE, INSERT, REPLACE` query types and is +usually sufficient. Unfortunately, this also has the downside that every single Icinga DB query will be blocked until +the cluster nodes resynchronise their states after each executed query, and may result in degraded performance. + +However, this does not necessarily have to be the case if, for instance, Icinga DB is only allowed to connect to a +single cluster node at a time. This is the case when a load balancer does not randomly route connections to all the +nodes evenly, but always to the same node until it fails, or if your database cluster nodes have a virtual IP address +fail over assigned. In such situations, you can set the `wsrep_sync_wait` system variable to `0` in the +`/etc/icingadb/config.yml` file to disable it entirely, as Icinga DB doesn't have to wait for cluster +synchronisation then. diff --git a/pkg/config/database.go b/pkg/config/database.go index 1137b555e..a541d5c9c 100644 --- a/pkg/config/database.go +++ b/pkg/config/database.go @@ -1,25 +1,25 @@ package config import ( + "context" + "database/sql" + "database/sql/driver" "fmt" "github.com/go-sql-driver/mysql" - "github.com/icinga/icingadb/pkg/driver" "github.com/icinga/icingadb/pkg/icingadb" "github.com/icinga/icingadb/pkg/logging" "github.com/icinga/icingadb/pkg/utils" "github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx/reflectx" + "github.com/lib/pq" "github.com/pkg/errors" "net" "net/url" "strconv" "strings" - "sync" "time" ) -var registerDriverOnce sync.Once - // Database defines database client configuration. type Database struct { Type string `yaml:"type" default:"mysql"` @@ -35,11 +35,7 @@ type Database struct { // Open prepares the DSN string and driver configuration, // calls sqlx.Open, but returns *icingadb.DB. func (d *Database) Open(logger *logging.Logger) (*icingadb.DB, error) { - registerDriverOnce.Do(func() { - driver.Register(logger) - }) - - var dsn string + var db *sqlx.DB switch d.Type { case "mysql": config := mysql.NewConfig() @@ -75,7 +71,19 @@ func (d *Database) Open(logger *logging.Logger) (*icingadb.DB, error) { } } - dsn = config.FormatDSN() + _ = mysql.SetLogger(icingadb.MysqlFuncLogger(logger.Debug)) + + c, err := mysql.NewConnector(config) + if err != nil { + return nil, errors.Wrap(err, "can't open mysql database") + } + + wsrepSyncWait := int64(d.Options.WsrepSyncWait) + setWsrepSyncWait := func(ctx context.Context, conn driver.Conn) error { + return setGaleraOpts(ctx, conn, wsrepSyncWait) + } + + db = sqlx.NewDb(sql.OpenDB(icingadb.NewConnector(c, logger, setWsrepSyncWait)), icingadb.MySQL) case "pgsql": uri := &url.URL{ Scheme: "postgres", @@ -123,16 +131,17 @@ func (d *Database) Open(logger *logging.Logger) (*icingadb.DB, error) { } uri.RawQuery = query.Encode() - dsn = uri.String() + + connector, err := pq.NewConnector(uri.String()) + if err != nil { + return nil, errors.Wrap(err, "can't open pgsql database") + } + + db = sqlx.NewDb(sql.OpenDB(icingadb.NewConnector(connector, logger, nil)), icingadb.PostgreSQL) default: return nil, unknownDbType(d.Type) } - db, err := sqlx.Open("icingadb-"+d.Type, dsn) - if err != nil { - return nil, errors.Wrap(err, "can't open database") - } - db.SetMaxIdleConns(d.Options.MaxConnections / 3) db.SetMaxOpenConns(d.Options.MaxConnections) @@ -173,3 +182,36 @@ func (d *Database) isUnixAddr() bool { func unknownDbType(t string) error { return errors.Errorf(`unknown database type %q, must be one of: "mysql", "pgsql"`, t) } + +// setGaleraOpts sets the "wsrep_sync_wait" variable for each session ensures that causality checks are performed +// before execution and that each statement is executed on a fully synchronized node. Doing so prevents foreign key +// violation when inserting into dependent tables on different MariaDB/MySQL nodes. When using MySQL single nodes, +// the "SET SESSION" command will fail with "Unknown system variable (1193)" and will therefore be silently dropped. +// +// https://mariadb.com/kb/en/galera-cluster-system-variables/#wsrep_sync_wait +func setGaleraOpts(ctx context.Context, conn driver.Conn, wsrepSyncWait int64) error { + const galeraOpts = "SET SESSION wsrep_sync_wait=?" + + stmt, err := conn.(driver.ConnPrepareContext).PrepareContext(ctx, galeraOpts) + if err != nil { + if errors.Is(err, &mysql.MySQLError{Number: 1193}) { // Unknown system variable + return nil + } + + return errors.Wrap(err, "cannot prepare "+galeraOpts) + } + // This is just for an unexpected exit and any returned error can safely be ignored and in case + // of the normal function exit, the stmt is closed manually, and its error is handled gracefully. + defer func() { _ = stmt.Close() }() + + _, err = stmt.(driver.StmtExecContext).ExecContext(ctx, []driver.NamedValue{{Value: wsrepSyncWait}}) + if err != nil { + return errors.Wrap(err, "cannot execute "+galeraOpts) + } + + if err = stmt.Close(); err != nil { + return errors.Wrap(err, "cannot close prepared statement "+galeraOpts) + } + + return nil +} diff --git a/pkg/driver/driver.go b/pkg/driver/driver.go deleted file mode 100644 index 1eecef863..000000000 --- a/pkg/driver/driver.go +++ /dev/null @@ -1,114 +0,0 @@ -package driver - -import ( - "context" - "database/sql" - "database/sql/driver" - "github.com/go-sql-driver/mysql" - "github.com/icinga/icingadb/pkg/backoff" - "github.com/icinga/icingadb/pkg/icingaredis/telemetry" - "github.com/icinga/icingadb/pkg/logging" - "github.com/icinga/icingadb/pkg/retry" - "github.com/jmoiron/sqlx" - "github.com/pkg/errors" - "go.uber.org/zap" - "time" -) - -const MySQL = "icingadb-mysql" -const PostgreSQL = "icingadb-pgsql" - -var timeout = time.Minute * 5 - -// RetryConnector wraps driver.Connector with retry logic. -type RetryConnector struct { - driver.Connector - driver Driver -} - -// Connect implements part of the driver.Connector interface. -func (c RetryConnector) Connect(ctx context.Context) (driver.Conn, error) { - var conn driver.Conn - err := errors.Wrap(retry.WithBackoff( - ctx, - func(ctx context.Context) (err error) { - conn, err = c.Connector.Connect(ctx) - return - }, - shouldRetry, - backoff.NewExponentialWithJitter(time.Millisecond*128, time.Minute*1), - retry.Settings{ - Timeout: timeout, - OnError: func(_ time.Duration, _ uint64, err, lastErr error) { - telemetry.UpdateCurrentDbConnErr(err) - - if lastErr == nil || err.Error() != lastErr.Error() { - c.driver.Logger.Warnw("Can't connect to database. Retrying", zap.Error(err)) - } - }, - OnSuccess: func(elapsed time.Duration, attempt uint64, _ error) { - telemetry.UpdateCurrentDbConnErr(nil) - - if attempt > 0 { - c.driver.Logger.Infow("Reconnected to database", - zap.Duration("after", elapsed), zap.Uint64("attempts", attempt+1)) - } - }, - }, - ), "can't connect to database") - return conn, err -} - -// Driver implements part of the driver.Connector interface. -func (c RetryConnector) Driver() driver.Driver { - return c.driver -} - -// Driver wraps a driver.Driver that also must implement driver.DriverContext with logging capabilities and provides our RetryConnector. -type Driver struct { - ctxDriver - Logger *logging.Logger -} - -// OpenConnector implements the DriverContext interface. -func (d Driver) OpenConnector(name string) (driver.Connector, error) { - c, err := d.ctxDriver.OpenConnector(name) - if err != nil { - return nil, err - } - - return &RetryConnector{ - driver: d, - Connector: c, - }, nil -} - -// Register makes our database Driver available under the name "icingadb-*sql". -func Register(logger *logging.Logger) { - sql.Register(MySQL, &Driver{ctxDriver: &mysql.MySQLDriver{}, Logger: logger}) - sql.Register(PostgreSQL, &Driver{ctxDriver: PgSQLDriver{}, Logger: logger}) - _ = mysql.SetLogger(mysqlLogger(logger.Debug)) - sqlx.BindDriver(PostgreSQL, sqlx.DOLLAR) -} - -// ctxDriver helps ensure that we only support drivers that implement driver.Driver and driver.DriverContext. -type ctxDriver interface { - driver.Driver - driver.DriverContext -} - -// mysqlLogger is an adapter that allows ordinary functions to be used as a logger for mysql.SetLogger. -type mysqlLogger func(v ...interface{}) - -// Print implements the mysql.Logger interface. -func (log mysqlLogger) Print(v ...interface{}) { - log(v) -} - -func shouldRetry(err error) bool { - if errors.Is(err, driver.ErrBadConn) { - return true - } - - return retry.Retryable(err) -} diff --git a/pkg/driver/pgsql.go b/pkg/driver/pgsql.go deleted file mode 100644 index 3c88fe05a..000000000 --- a/pkg/driver/pgsql.go +++ /dev/null @@ -1,22 +0,0 @@ -package driver - -import ( - "database/sql/driver" - "github.com/lib/pq" -) - -// PgSQLDriver extends pq.Driver with driver.DriverContext compliance. -type PgSQLDriver struct { - pq.Driver -} - -// Assert interface compliance. -var ( - _ driver.Driver = PgSQLDriver{} - _ driver.DriverContext = PgSQLDriver{} -) - -// OpenConnector implements the driver.DriverContext interface. -func (PgSQLDriver) OpenConnector(name string) (driver.Connector, error) { - return pq.NewConnector(name) -} diff --git a/pkg/icingadb/cleanup.go b/pkg/icingadb/cleanup.go index e57eafaa1..15d36e86f 100644 --- a/pkg/icingadb/cleanup.go +++ b/pkg/icingadb/cleanup.go @@ -5,7 +5,6 @@ import ( "fmt" "github.com/icinga/icingadb/internal" "github.com/icinga/icingadb/pkg/com" - "github.com/icinga/icingadb/pkg/driver" "github.com/icinga/icingadb/pkg/types" "time" ) @@ -20,10 +19,10 @@ type CleanupStmt struct { // Build assembles the cleanup statement for the specified database driver with the given limit. func (stmt *CleanupStmt) Build(driverName string, limit uint64) string { switch driverName { - case driver.MySQL, "mysql": + case MySQL: return fmt.Sprintf(`DELETE FROM %[1]s WHERE environment_id = :environment_id AND %[2]s < :time ORDER BY %[2]s LIMIT %[3]d`, stmt.Table, stmt.Column, limit) - case driver.PostgreSQL, "postgres": + case PostgreSQL: return fmt.Sprintf(`WITH rows AS ( SELECT %[1]s FROM %[2]s WHERE environment_id = :environment_id AND %[3]s < :time ORDER BY %[3]s LIMIT %[4]d ) diff --git a/pkg/icingadb/db.go b/pkg/icingadb/db.go index 4ff3e0dde..1bff6a6c4 100644 --- a/pkg/icingadb/db.go +++ b/pkg/icingadb/db.go @@ -7,7 +7,6 @@ import ( "github.com/icinga/icingadb/pkg/backoff" "github.com/icinga/icingadb/pkg/com" "github.com/icinga/icingadb/pkg/contracts" - "github.com/icinga/icingadb/pkg/driver" "github.com/icinga/icingadb/pkg/logging" "github.com/icinga/icingadb/pkg/periodic" "github.com/icinga/icingadb/pkg/retry" @@ -54,6 +53,12 @@ type Options struct { // MaxRowsPerTransaction defines the maximum number of rows per transaction. // The default is 2^13, which in our tests showed the best performance in terms of execution time and parallelism. MaxRowsPerTransaction int `yaml:"max_rows_per_transaction" default:"8192"` + + // WsrepSyncWait enforces Galera cluster nodes to perform strict cluster-wide causality checks + // before executing specific SQL queries determined by the number you provided. + // Please refer to the below link for a detailed description. + // https://icinga.com/docs/icinga-db/latest/doc/03-Configuration/#galera-cluster + WsrepSyncWait int `yaml:"wsrep_sync_wait" default:"7"` } // Validate checks constraints in the supplied database options and returns an error if they are violated. @@ -70,6 +75,9 @@ func (o *Options) Validate() error { if o.MaxRowsPerTransaction < 1 { return errors.New("max_rows_per_transaction must be at least 1") } + if o.WsrepSyncWait < 0 || o.WsrepSyncWait > 15 { + return errors.New("wsrep_sync_wait can only be set to a number between 0 and 15") + } return nil } @@ -93,9 +101,9 @@ const ( func (db *DB) CheckSchema(ctx context.Context) error { var expectedDbSchemaVersion uint16 switch db.DriverName() { - case driver.MySQL: + case MySQL: expectedDbSchemaVersion = expectedMysqlSchemaVersion - case driver.PostgreSQL: + case PostgreSQL: expectedDbSchemaVersion = expectedPostgresSchemaVersion } @@ -161,10 +169,10 @@ func (db *DB) BuildInsertIgnoreStmt(into interface{}) (string, int) { var clause string switch db.DriverName() { - case driver.MySQL: + case MySQL: // MySQL treats UPDATE id = id as a no-op. clause = fmt.Sprintf(`ON DUPLICATE KEY UPDATE "%s" = "%s"`, columns[0], columns[0]) - case driver.PostgreSQL: + case PostgreSQL: clause = fmt.Sprintf("ON CONFLICT ON CONSTRAINT pk_%s DO NOTHING", table) } @@ -224,10 +232,10 @@ func (db *DB) BuildUpsertStmt(subject interface{}) (stmt string, placeholders in var clause, setFormat string switch db.DriverName() { - case driver.MySQL: + case MySQL: clause = "ON DUPLICATE KEY UPDATE" setFormat = `"%[1]s" = VALUES("%[1]s")` - case driver.PostgreSQL: + case PostgreSQL: clause = fmt.Sprintf("ON CONFLICT ON CONSTRAINT pk_%s DO UPDATE SET", table) setFormat = `"%[1]s" = EXCLUDED."%[1]s"` } diff --git a/pkg/icingadb/driver.go b/pkg/icingadb/driver.go new file mode 100644 index 000000000..e2712ca83 --- /dev/null +++ b/pkg/icingadb/driver.go @@ -0,0 +1,98 @@ +package icingadb + +import ( + "context" + "database/sql/driver" + "github.com/icinga/icingadb/pkg/backoff" + "github.com/icinga/icingadb/pkg/icingaredis/telemetry" + "github.com/icinga/icingadb/pkg/logging" + "github.com/icinga/icingadb/pkg/retry" + "github.com/pkg/errors" + "go.uber.org/zap" + "time" +) + +// Driver names as automatically registered in the database/sql package by themselves. +const ( + MySQL string = "mysql" + PostgreSQL string = "postgres" +) + +type InitConnFunc func(context.Context, driver.Conn) error + +// RetryConnector wraps driver.Connector with retry logic. +type RetryConnector struct { + driver.Connector + + logger *logging.Logger + + // initConn can be used to execute post Connect() arbitrary actions. + // It will be called after successfully initiated a new connection using the connector's Connect method. + initConn InitConnFunc +} + +// NewConnector creates a fully initialized RetryConnector from the given args. +func NewConnector(c driver.Connector, logger *logging.Logger, init InitConnFunc) *RetryConnector { + return &RetryConnector{Connector: c, logger: logger, initConn: init} +} + +// Connect implements part of the driver.Connector interface. +func (c RetryConnector) Connect(ctx context.Context) (driver.Conn, error) { + var conn driver.Conn + err := errors.Wrap(retry.WithBackoff( + ctx, + func(ctx context.Context) (err error) { + conn, err = c.Connector.Connect(ctx) + if err == nil && c.initConn != nil { + if err = c.initConn(ctx, conn); err != nil { + // We're going to retry this, so just don't bother whether Close() fails! + _ = conn.Close() + } + } + + return + }, + shouldRetry, + backoff.NewExponentialWithJitter(time.Millisecond*128, time.Minute*1), + retry.Settings{ + Timeout: time.Minute * 5, + OnError: func(_ time.Duration, _ uint64, err, lastErr error) { + telemetry.UpdateCurrentDbConnErr(err) + + if lastErr == nil || err.Error() != lastErr.Error() { + c.logger.Warnw("Can't connect to database. Retrying", zap.Error(err)) + } + }, + OnSuccess: func(elapsed time.Duration, attempt uint64, _ error) { + telemetry.UpdateCurrentDbConnErr(nil) + + if attempt > 0 { + c.logger.Infow("Reconnected to database", + zap.Duration("after", elapsed), zap.Uint64("attempts", attempt+1)) + } + }, + }, + ), "can't connect to database") + return conn, err +} + +// Driver implements part of the driver.Connector interface. +func (c RetryConnector) Driver() driver.Driver { + return c.Connector.Driver() +} + +// MysqlFuncLogger is an adapter that allows ordinary functions to be used as a logger for mysql.SetLogger. +type MysqlFuncLogger func(v ...interface{}) + +// Print implements the mysql.Logger interface. +func (log MysqlFuncLogger) Print(v ...interface{}) { + log(v) +} + +func shouldRetry(err error) bool { + if errors.Is(err, driver.ErrBadConn) { + return true + } + + return retry.Retryable(err) +} diff --git a/pkg/icingadb/ha.go b/pkg/icingadb/ha.go index 939446d06..7e821c76a 100644 --- a/pkg/icingadb/ha.go +++ b/pkg/icingadb/ha.go @@ -9,7 +9,6 @@ import ( "github.com/icinga/icingadb/internal" "github.com/icinga/icingadb/pkg/backoff" "github.com/icinga/icingadb/pkg/com" - "github.com/icinga/icingadb/pkg/driver" v1 "github.com/icinga/icingadb/pkg/icingadb/v1" "github.com/icinga/icingadb/pkg/icingaredis" icingaredisv1 "github.com/icinga/icingadb/pkg/icingaredis/v1" @@ -262,7 +261,7 @@ func (h *HA) realize( isoLvl := sql.LevelSerializable selectLock := "" - if h.db.DriverName() == driver.MySQL { + if h.db.DriverName() == MySQL { // The RDBMS may actually be a Percona XtraDB Cluster which doesn't // support serializable transactions, but only their following equivalent: isoLvl = sql.LevelRepeatableRead