Skip to content

Commit

Permalink
Make value for SET SESSION wsrep_sync_wait= configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
Al2Klimov committed Mar 8, 2024
1 parent 2aa1778 commit 2f2c943
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 11 deletions.
6 changes: 5 additions & 1 deletion pkg/config/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,11 @@ func (d *Database) Open(logger *logging.Logger) (*icingadb.DB, error) {

config.DBName = d.Database
config.Timeout = time.Minute
config.Params = map[string]string{"sql_mode": "ANSI_QUOTES"}

config.Params = map[string]string{
"sql_mode": "ANSI_QUOTES",
"wsrep_sync_wait": strconv.FormatInt(int64(d.Options.WsrepSyncWait), 10),
}

tlsConfig, err := d.TlsOptions.MakeConfig(d.Host)
if err != nil {
Expand Down
35 changes: 26 additions & 9 deletions pkg/driver/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"database/sql/driver"
"github.com/go-sql-driver/mysql"
"github.com/pkg/errors"
"strconv"
)

// MySQLDriver extends mysql.MySQLDriver with auto-SETting Galera cluster options.
Expand All @@ -24,19 +25,32 @@ func (md MySQLDriver) Open(name string) (driver.Conn, error) {

// OpenConnector implements the driver.DriverContext interface.
func (md MySQLDriver) OpenConnector(name string) (driver.Connector, error) {
var wsrepSyncWait int64

if config, err := mysql.ParseDSN(name); err == nil {
if s, ok := config.Params["wsrep_sync_wait"]; ok {
if i, err := strconv.ParseInt(s, 10, 64); err == nil {
wsrepSyncWait = i
delete(config.Params, "wsrep_sync_wait")
name = config.FormatDSN()
}
}
}

connector, err := md.MySQLDriver.OpenConnector(name)
if err != nil {
return nil, err
}

return &galeraAwareConnector{connector, md}, nil
return &galeraAwareConnector{connector, wsrepSyncWait, md}, nil
}

// galeraAwareConnector extends mysql.connector with auto-SETting Galera cluster options.
type galeraAwareConnector struct {
driver.Connector

driver driver.Driver
wsrepSyncWait int64
driver driver.Driver
}

// Connect implements the driver.Connector interface.
Expand All @@ -46,7 +60,7 @@ func (gac *galeraAwareConnector) Connect(ctx context.Context) (driver.Conn, erro
return nil, err
}

if err := setGaleraOpts(ctx, conn); err != nil {
if err := setGaleraOpts(ctx, conn, gac.wsrepSyncWait); err != nil {
_ = conn.Close()
return nil, err
}
Expand All @@ -61,22 +75,25 @@ func (gac *galeraAwareConnector) Driver() driver.Driver {

var errUnknownSysVar = &mysql.MySQLError{Number: 1193}

// setGaleraOpts tries SET SESSION wsrep_sync_wait=7.
// setGaleraOpts tries SET SESSION wsrep_sync_wait.
//
// This ensures causality checks will take place before executing anything,
// This ensures causality checks will take place before execution,
// ensuring that every statement is executed on a fully synced node.
// https://mariadb.com/kb/en/galera-cluster-system-variables/#wsrep_sync_wait
//
// It prevents running into foreign key errors while inserting into linked tables on different MySQL nodes.
// Error 1193 "Unknown system variable" is ignored to support MySQL single nodes.
func setGaleraOpts(ctx context.Context, conn driver.Conn) error {
const galeraOpts = "SET SESSION wsrep_sync_wait=7"
func setGaleraOpts(ctx context.Context, conn driver.Conn, wsrepSyncWait int64) error {
const galeraOpts = "SET SESSION wsrep_sync_wait=?"

stmt, err := conn.(driver.ConnPrepareContext).PrepareContext(ctx, galeraOpts)
if err != nil {
err = errors.Wrap(err, "can't prepare "+galeraOpts)
} else if _, err = stmt.(driver.StmtExecContext).ExecContext(ctx, nil); err != nil {
err = errors.Wrap(err, "can't execute "+galeraOpts)
} else {
_, err = stmt.(driver.StmtExecContext).ExecContext(ctx, []driver.NamedValue{{Value: wsrepSyncWait}})
if err != nil {
err = errors.Wrap(err, "can't execute "+galeraOpts)
}
}

if err != nil && errors.Is(err, errUnknownSysVar) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/driver/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func TestSetGaleraOpts(t *testing.T) {

for _, st := range subtests {
t.Run(st.name, func(t *testing.T) {
assert.ErrorIs(t, setGaleraOpts(context.Background(), &st.input), st.output)
assert.ErrorIs(t, setGaleraOpts(context.Background(), &st.input, 7), st.output)
assert.GreaterOrEqual(t, st.input.prepareCalls, uint8(1))

if ts, ok := st.input.preparedStmt.(*testStmt); ok {
Expand Down
4 changes: 4 additions & 0 deletions pkg/icingadb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ type Options struct {
// MaxRowsPerTransaction defines the maximum number of rows per transaction.
// The default is 2^13, which in our tests showed the best performance in terms of execution time and parallelism.
MaxRowsPerTransaction int `yaml:"max_rows_per_transaction" default:"8192"`

// WsrepSyncWait defines which kinds of SQL statements catch up all pending sync between nodes first, see:
// https://mariadb.com/kb/en/galera-cluster-system-variables/#wsrep_sync_wait
WsrepSyncWait int `yaml:"wsrep_sync_wait" default:"7"`
}

// Validate checks constraints in the supplied database options and returns an error if they are violated.
Expand Down

0 comments on commit 2f2c943

Please sign in to comment.