diff --git a/pkg/config/database.go b/pkg/config/database.go index b42ff8e1c..72f59ca78 100644 --- a/pkg/config/database.go +++ b/pkg/config/database.go @@ -61,7 +61,11 @@ func (d *Database) Open(logger *logging.Logger) (*icingadb.DB, error) { config.DBName = d.Database config.Timeout = time.Minute - config.Params = map[string]string{"sql_mode": "ANSI_QUOTES"} + + config.Params = map[string]string{ + "sql_mode": "ANSI_QUOTES", + "wsrep_sync_wait": strconv.FormatInt(int64(d.Options.WsrepSyncWait), 10), + } tlsConfig, err := d.TlsOptions.MakeConfig(d.Host) if err != nil { diff --git a/pkg/driver/mysql.go b/pkg/driver/mysql.go index 728ca2bd9..c59870139 100644 --- a/pkg/driver/mysql.go +++ b/pkg/driver/mysql.go @@ -5,6 +5,7 @@ import ( "database/sql/driver" "github.com/go-sql-driver/mysql" "github.com/pkg/errors" + "strconv" ) // MySQLDriver extends mysql.MySQLDriver with auto-SETting Galera cluster options. @@ -24,19 +25,34 @@ func (md MySQLDriver) Open(name string) (driver.Conn, error) { // OpenConnector implements the driver.DriverContext interface. func (md MySQLDriver) OpenConnector(name string) (driver.Connector, error) { + var wsrepSyncWait int64 + + if config, err := mysql.ParseDSN(name); err == nil { + if s, ok := config.Params["wsrep_sync_wait"]; ok { + if i, err := strconv.ParseInt(s, 10, 64); err == nil { + // MySQL single nodes don't know wsrep_sync_wait and fail with error 1193 "Unknown system variable". + // We have to SET it manually later and swallow error 1193 not to fail our connections. + wsrepSyncWait = i + delete(config.Params, "wsrep_sync_wait") + name = config.FormatDSN() + } + } + } + connector, err := md.MySQLDriver.OpenConnector(name) if err != nil { return nil, err } - return &galeraAwareConnector{connector, md}, nil + return &galeraAwareConnector{connector, wsrepSyncWait, md}, nil } // galeraAwareConnector extends mysql.connector with auto-SETting Galera cluster options. type galeraAwareConnector struct { driver.Connector - driver driver.Driver + wsrepSyncWait int64 + driver driver.Driver } // Connect implements the driver.Connector interface. @@ -46,7 +62,7 @@ func (gac *galeraAwareConnector) Connect(ctx context.Context) (driver.Conn, erro return nil, err } - if err := setGaleraOpts(ctx, conn); err != nil { + if err := setGaleraOpts(ctx, conn, gac.wsrepSyncWait); err != nil { _ = conn.Close() return nil, err } @@ -61,22 +77,25 @@ func (gac *galeraAwareConnector) Driver() driver.Driver { var errUnknownSysVar = &mysql.MySQLError{Number: 1193} -// setGaleraOpts tries SET SESSION wsrep_sync_wait=7. +// setGaleraOpts tries SET SESSION wsrep_sync_wait. // -// This ensures causality checks will take place before executing anything, +// This ensures causality checks will take place before execution, // ensuring that every statement is executed on a fully synced node. // https://mariadb.com/kb/en/galera-cluster-system-variables/#wsrep_sync_wait // // It prevents running into foreign key errors while inserting into linked tables on different MySQL nodes. // Error 1193 "Unknown system variable" is ignored to support MySQL single nodes. -func setGaleraOpts(ctx context.Context, conn driver.Conn) error { - const galeraOpts = "SET SESSION wsrep_sync_wait=7" +func setGaleraOpts(ctx context.Context, conn driver.Conn, wsrepSyncWait int64) error { + const galeraOpts = "SET SESSION wsrep_sync_wait=?" stmt, err := conn.(driver.ConnPrepareContext).PrepareContext(ctx, galeraOpts) if err != nil { err = errors.Wrap(err, "can't prepare "+galeraOpts) - } else if _, err = stmt.(driver.StmtExecContext).ExecContext(ctx, nil); err != nil { - err = errors.Wrap(err, "can't execute "+galeraOpts) + } else { + _, err = stmt.(driver.StmtExecContext).ExecContext(ctx, []driver.NamedValue{{Value: wsrepSyncWait}}) + if err != nil { + err = errors.Wrap(err, "can't execute "+galeraOpts) + } } if err != nil && errors.Is(err, errUnknownSysVar) { diff --git a/pkg/driver/mysql_test.go b/pkg/driver/mysql_test.go index 38afc531a..16587674a 100644 --- a/pkg/driver/mysql_test.go +++ b/pkg/driver/mysql_test.go @@ -96,7 +96,7 @@ func TestSetGaleraOpts(t *testing.T) { for _, st := range subtests { t.Run(st.name, func(t *testing.T) { - assert.ErrorIs(t, setGaleraOpts(context.Background(), &st.input), st.output) + assert.ErrorIs(t, setGaleraOpts(context.Background(), &st.input, 7), st.output) assert.GreaterOrEqual(t, st.input.prepareCalls, uint8(1)) if ts, ok := st.input.preparedStmt.(*testStmt); ok { diff --git a/pkg/icingadb/db.go b/pkg/icingadb/db.go index 4ff3e0dde..91901608d 100644 --- a/pkg/icingadb/db.go +++ b/pkg/icingadb/db.go @@ -54,6 +54,10 @@ type Options struct { // MaxRowsPerTransaction defines the maximum number of rows per transaction. // The default is 2^13, which in our tests showed the best performance in terms of execution time and parallelism. MaxRowsPerTransaction int `yaml:"max_rows_per_transaction" default:"8192"` + + // WsrepSyncWait defines which kinds of SQL statements catch up all pending sync between nodes first, see: + // https://mariadb.com/kb/en/galera-cluster-system-variables/#wsrep_sync_wait + WsrepSyncWait int `yaml:"wsrep_sync_wait" default:"7"` } // Validate checks constraints in the supplied database options and returns an error if they are violated.