From 6471270ce3780e1385dbc6026b384b1b00e31d20 Mon Sep 17 00:00:00 2001 From: secwall Date: Fri, 23 Aug 2024 18:42:54 +0200 Subject: [PATCH 1/5] Resolve zk hosts in background (#123) --- Makefile | 2 +- internal/app/app.go | 12 +- internal/app/cli.go | 56 +++++----- internal/dcs/config.go | 10 +- internal/dcs/zk.go | 5 +- internal/dcs/zk_host_provider.go | 181 ++++++++++++++++++------------- internal/log/log.go | 10 +- 7 files changed, 155 insertions(+), 121 deletions(-) diff --git a/Makefile b/Makefile index c3035248..3de76f18 100644 --- a/Makefile +++ b/Makefile @@ -6,7 +6,7 @@ format: goimports -w `find . -name '*.go'` lint: - docker run --rm -v ${CURDIR}:/app -w /app golangci/golangci-lint:v1.58 golangci-lint run -v + docker run --rm -v ${CURDIR}:/app -w /app golangci/golangci-lint:v1.60 golangci-lint run -v unittests: go test ./cmd/... ./internal/... diff --git a/internal/app/app.go b/internal/app/app.go index 8bf4dbdb..c6c3d639 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -107,7 +107,7 @@ func (app *App) baseContext() context.Context { func (app *App) connectDCS() error { var err error // TODO: support other DCS systems - app.dcs, err = dcs.NewZookeeper(&app.config.Zookeeper, app.logger) + app.dcs, err = dcs.NewZookeeper(app.baseContext(), &app.config.Zookeeper, app.logger) if err != nil { return fmt.Errorf("failed to connect to zkDCS: %s", err.Error()) } @@ -616,7 +616,7 @@ func (app *App) stateManager() appState { // activeNodes are master + alive running replicas activeNodes, err := app.GetActiveNodes() if err != nil { - app.logger.Errorf(err.Error()) + app.logger.Error(err.Error()) return stateManager } app.logger.Infof("active: %v", activeNodes) @@ -681,7 +681,7 @@ func (app *App) stateManager() appState { } return stateManager } else if err != dcs.ErrNotFound { - app.logger.Errorf(err.Error()) + app.logger.Error(err.Error()) return stateManager } @@ -2303,7 +2303,7 @@ func (app *App) Run() int { err := app.lockFile() if err != nil { - app.logger.Errorf(err.Error()) + app.logger.Error(err.Error()) return 1 } defer app.unlockFile() @@ -2313,14 +2313,14 @@ func (app *App) Run() int { err = app.connectDCS() if err != nil { - app.logger.Errorf(err.Error()) + app.logger.Error(err.Error()) return 1 } defer app.dcs.Close() err = app.newDBCluster() if err != nil { - app.logger.Errorf(err.Error()) + app.logger.Error(err.Error()) return 1 } defer app.cluster.Close() diff --git a/internal/app/cli.go b/internal/app/cli.go index 6756f73f..c5389269 100644 --- a/internal/app/cli.go +++ b/internal/app/cli.go @@ -20,7 +20,7 @@ import ( func (app *App) CliInfo(short bool) int { err := app.connectDCS() if err != nil { - app.logger.Errorf(err.Error()) + app.logger.Error(err.Error()) return 1 } app.dcs.Initialize() @@ -28,12 +28,12 @@ func (app *App) CliInfo(short bool) int { err = app.newDBCluster() if err != nil { - app.logger.Errorf(err.Error()) + app.logger.Error(err.Error()) return 1 } defer app.cluster.Close() if err := app.cluster.UpdateHostsInfo(); err != nil { - app.logger.Errorf(err.Error()) + app.logger.Error(err.Error()) return 1 } @@ -57,7 +57,7 @@ func (app *App) CliInfo(short bool) int { activeNodes, err := app.GetActiveNodes() if err != nil { - app.logger.Errorf(err.Error()) + app.logger.Error(err.Error()) return 1 } sort.Strings(activeNodes) @@ -123,7 +123,7 @@ func (app *App) CliInfo(short bool) int { } else { tree, err = app.dcs.GetTree("") if err != nil { - app.logger.Errorf(err.Error()) + app.logger.Error(err.Error()) return 1 } } @@ -140,20 +140,20 @@ func (app *App) CliInfo(short bool) int { func (app *App) CliState(short bool) int { err := app.connectDCS() if err != nil { - app.logger.Errorf(err.Error()) + app.logger.Error(err.Error()) return 1 } defer app.dcs.Close() app.dcs.Initialize() err = app.newDBCluster() if err != nil { - app.logger.Errorf(err.Error()) + app.logger.Error(err.Error()) return 1 } defer app.cluster.Close() if err := app.cluster.UpdateHostsInfo(); err != nil { - app.logger.Errorf(err.Error()) + app.logger.Error(err.Error()) return 1 } @@ -191,20 +191,20 @@ func (app *App) CliSwitch(switchFrom, switchTo string, waitTimeout time.Duration } err := app.connectDCS() if err != nil { - app.logger.Errorf(err.Error()) + app.logger.Error(err.Error()) return 1 } defer app.dcs.Close() app.dcs.Initialize() err = app.newDBCluster() if err != nil { - app.logger.Errorf(err.Error()) + app.logger.Error(err.Error()) return 1 } defer app.cluster.Close() if err := app.cluster.UpdateHostsInfo(); err != nil { - app.logger.Errorf(err.Error()) + app.logger.Error(err.Error()) return 1 } @@ -223,7 +223,7 @@ func (app *App) CliSwitch(switchFrom, switchTo string, waitTimeout time.Duration } activeNodes, err := app.GetActiveNodes() if err != nil { - app.logger.Errorf(err.Error()) + app.logger.Error(err.Error()) return 1 } @@ -277,12 +277,12 @@ func (app *App) CliSwitch(switchFrom, switchTo string, waitTimeout time.Duration // to avoid switching from one to another, use switch to behavior positions, err := app.getNodePositions(candidates) if err != nil { - app.logger.Errorf(err.Error()) + app.logger.Error(err.Error()) return 1 } toHost, err = getMostDesirableNode(app.logger, positions, app.switchHelper.GetPriorityChoiceMaxLag()) if err != nil { - app.logger.Errorf(err.Error()) + app.logger.Error(err.Error()) return 1 } } @@ -295,7 +295,7 @@ func (app *App) CliSwitch(switchFrom, switchTo string, waitTimeout time.Duration return 2 } if err != dcs.ErrNotFound { - app.logger.Errorf(err.Error()) + app.logger.Error(err.Error()) return 2 } @@ -311,7 +311,7 @@ func (app *App) CliSwitch(switchFrom, switchTo string, waitTimeout time.Duration return 2 } if err != nil { - app.logger.Errorf(err.Error()) + app.logger.Error(err.Error()) return 1 } // wait for switchover to complete @@ -353,7 +353,7 @@ func (app *App) CliEnableMaintenance(waitTimeout time.Duration) int { ctx := app.baseContext() err := app.connectDCS() if err != nil { - app.logger.Errorf(err.Error()) + app.logger.Error(err.Error()) return 1 } defer app.dcs.Close() @@ -365,7 +365,7 @@ func (app *App) CliEnableMaintenance(waitTimeout time.Duration) int { } err = app.dcs.Create(pathMaintenance, maintenance) if err != nil && err != dcs.ErrExists { - app.logger.Errorf(err.Error()) + app.logger.Error(err.Error()) return 1 } // wait for mysync to pause @@ -379,7 +379,7 @@ func (app *App) CliEnableMaintenance(waitTimeout time.Duration) int { case <-ticker.C: err = app.dcs.Get(pathMaintenance, maintenance) if err != nil { - app.logger.Errorf(err.Error()) + app.logger.Error(err.Error()) } if maintenance.MySyncPaused { break Out @@ -404,7 +404,7 @@ func (app *App) CliDisableMaintenance(waitTimeout time.Duration) int { ctx := app.baseContext() err := app.connectDCS() if err != nil { - app.logger.Errorf(err.Error()) + app.logger.Error(err.Error()) return 1 } defer app.dcs.Close() @@ -439,7 +439,7 @@ func (app *App) CliDisableMaintenance(waitTimeout time.Duration) int { break Out } if err != nil { - app.logger.Errorf(err.Error()) + app.logger.Error(err.Error()) } case <-waitCtx.Done(): break Out @@ -495,7 +495,7 @@ func (app *App) CliAbort() int { return 0 } if err != nil { - app.logger.Errorf(err.Error()) + app.logger.Error(err.Error()) return 1 } @@ -514,7 +514,7 @@ func (app *App) CliAbort() int { err = app.dcs.Delete(pathCurrentSwitch) if err != nil { - app.logger.Errorf(err.Error()) + app.logger.Error(err.Error()) return 1 } @@ -526,7 +526,7 @@ func (app *App) CliAbort() int { func (app *App) CliHostList() int { err := app.connectDCS() if err != nil { - app.logger.Errorf(err.Error()) + app.logger.Error(err.Error()) return 1 } app.dcs.Initialize() @@ -534,7 +534,7 @@ func (app *App) CliHostList() int { err = app.newDBCluster() if err != nil { - app.logger.Errorf(err.Error()) + app.logger.Error(err.Error()) return 1 } defer app.cluster.Close() @@ -584,7 +584,7 @@ func (app *App) CliHostAdd(host string, streamFrom *string, priority *int64, dry err = app.newDBCluster() if err != nil { - app.logger.Errorf(err.Error()) + app.logger.Error(err.Error()) return 1 } defer app.cluster.Close() @@ -601,7 +601,7 @@ func (app *App) CliHostAdd(host string, streamFrom *string, priority *int64, dry err = app.cluster.UpdateHostsInfo() if err != nil { - app.logger.Errorf(err.Error()) + app.logger.Error(err.Error()) return 1 } @@ -667,7 +667,7 @@ func (app *App) CliHostRemove(host string) int { err = app.newDBCluster() if err != nil { - app.logger.Errorf(err.Error()) + app.logger.Error(err.Error()) return 1 } defer app.cluster.Close() diff --git a/internal/dcs/config.go b/internal/dcs/config.go index 7884ea4d..6eff2e66 100644 --- a/internal/dcs/config.go +++ b/internal/dcs/config.go @@ -31,14 +31,16 @@ type ZookeeperConfig struct { } type RandomHostProviderConfig struct { - LookupTimeout time.Duration `config:"lookup_timeout" yaml:"lookup_timeout"` - LookupTTL time.Duration `config:"lookup_ttl" yaml:"lookup_ttl"` + LookupTimeout time.Duration `config:"lookup_timeout" yaml:"lookup_timeout"` + LookupTTL time.Duration `config:"lookup_ttl" yaml:"lookup_ttl"` + LookupTickInterval time.Duration `config:"lookup_tick_interval" yaml:"lookup_tick_interval"` } func DefaultRandomHostProviderConfig() RandomHostProviderConfig { return RandomHostProviderConfig{ - LookupTimeout: 3 * time.Second, - LookupTTL: 300 * time.Second, + LookupTimeout: 3 * time.Second, + LookupTTL: 300 * time.Second, + LookupTickInterval: 60 * time.Second, } } diff --git a/internal/dcs/zk.go b/internal/dcs/zk.go index 64373fac..c56e8ccd 100644 --- a/internal/dcs/zk.go +++ b/internal/dcs/zk.go @@ -1,6 +1,7 @@ package dcs import ( + "context" "encoding/json" "fmt" "net" @@ -52,7 +53,7 @@ func retry(config *ZookeeperConfig, operation func() error) error { } // NewZookeeper returns Zookeeper based DCS storage -func NewZookeeper(config *ZookeeperConfig, logger *log.Logger) (DCS, error) { +func NewZookeeper(ctx context.Context, config *ZookeeperConfig, logger *log.Logger) (DCS, error) { if len(config.Hosts) == 0 { return nil, fmt.Errorf("zookeeper not configured, fill zookeeper/hosts in config") } @@ -70,7 +71,7 @@ func NewZookeeper(config *ZookeeperConfig, logger *log.Logger) (DCS, error) { var ec <-chan zk.Event var err error var operation func() error - hostProvider := NewRandomHostProvider(&config.RandomHostProvider, logger) + hostProvider := NewRandomHostProvider(ctx, &config.RandomHostProvider, logger) if config.UseSSL { if config.CACert == "" || config.KeyFile == "" || config.CertFile == "" { return nil, fmt.Errorf("zookeeper ssl not configured, fill ca_cert/key_file/cert_file in config or disable use_ssl flag") diff --git a/internal/dcs/zk_host_provider.go b/internal/dcs/zk_host_provider.go index 5ead36fc..9a50a475 100644 --- a/internal/dcs/zk_host_provider.go +++ b/internal/dcs/zk_host_provider.go @@ -11,119 +11,150 @@ import ( "github.com/yandex/mysync/internal/log" ) +type zkhost struct { + resolved []string + lastLookup time.Time +} + type RandomHostProvider struct { - lock sync.Mutex - servers []string - resolved []string - tried map[string]struct{} - logger *log.Logger - lastLookup time.Time - lookupTTL time.Duration - lookupTimeout time.Duration - resolver *net.Resolver + ctx context.Context + hosts sync.Map + hostsKeys []string + tried map[string]struct{} + logger *log.Logger + lookupTTL time.Duration + lookupTimeout time.Duration + lookupTickInterval time.Duration + resolver *net.Resolver } -func NewRandomHostProvider(config *RandomHostProviderConfig, logger *log.Logger) *RandomHostProvider { +func NewRandomHostProvider(ctx context.Context, config *RandomHostProviderConfig, logger *log.Logger) *RandomHostProvider { return &RandomHostProvider{ - lookupTTL: config.LookupTTL, - lookupTimeout: config.LookupTimeout, - logger: logger, - tried: make(map[string]struct{}), - resolver: &net.Resolver{}, + ctx: ctx, + lookupTTL: config.LookupTTL, + lookupTimeout: config.LookupTimeout, + lookupTickInterval: config.LookupTickInterval, + logger: logger, + tried: make(map[string]struct{}), + hosts: sync.Map{}, + resolver: &net.Resolver{}, } } func (rhp *RandomHostProvider) Init(servers []string) error { - rhp.lock.Lock() - defer rhp.lock.Unlock() + numResolved := 0 - rhp.servers = servers - - err := rhp.resolveHosts() + for _, host := range servers { + resolved, err := rhp.resolveHost(host) + if err != nil { + rhp.logger.Errorf("host definition %s is invalid %v", host, err) + continue + } + numResolved += len(resolved) + rhp.hosts.Store(host, zkhost{ + resolved: resolved, + lastLookup: time.Now(), + }) + rhp.hostsKeys = append(rhp.hostsKeys, host) + } - if err != nil { - return fmt.Errorf("failed to init zk host provider %v", err) + if numResolved == 0 { + return fmt.Errorf("unable to resolve any host from %v", servers) } + go rhp.resolveHosts() + return nil } -func (rhp *RandomHostProvider) resolveHosts() error { - resolved := []string{} - for _, server := range rhp.servers { - host, port, err := net.SplitHostPort(server) - if err != nil { - return err - } - ctx, cancel := context.WithTimeout(context.Background(), rhp.lookupTimeout) - defer cancel() - addrs, err := rhp.resolver.LookupHost(ctx, host) - if err != nil { - rhp.logger.Errorf("unable to resolve %s: %v", host, err) - } - for _, addr := range addrs { - resolved = append(resolved, net.JoinHostPort(addr, port)) +func (rhp *RandomHostProvider) resolveHosts() { + ticker := time.NewTicker(rhp.lookupTickInterval) + for { + select { + case <-ticker.C: + for _, pair := range rhp.hostsKeys { + host, _ := rhp.hosts.Load(pair) + zhost := host.(zkhost) + + if len(zhost.resolved) == 0 || time.Since(zhost.lastLookup) > rhp.lookupTTL { + resolved, err := rhp.resolveHost(pair) + if err != nil || len(resolved) == 0 { + rhp.logger.Errorf("background resolve for %s failed: %v", pair, err) + continue + } + rhp.hosts.Store(pair, zkhost{ + resolved: resolved, + lastLookup: time.Now(), + }) + } + } + case <-rhp.ctx.Done(): + return } } +} - if len(resolved) == 0 { - return fmt.Errorf("no hosts resolved for %q", rhp.servers) +func (rhp *RandomHostProvider) resolveHost(pair string) ([]string, error) { + var res []string + host, port, err := net.SplitHostPort(pair) + if err != nil { + return res, err + } + ctx, cancel := context.WithTimeout(rhp.ctx, rhp.lookupTimeout) + defer cancel() + addrs, err := rhp.resolver.LookupHost(ctx, host) + if err != nil { + rhp.logger.Errorf("unable to resolve %s: %v", host, err) + } + for _, addr := range addrs { + res = append(res, net.JoinHostPort(addr, port)) } - rhp.lastLookup = time.Now() - rhp.resolved = resolved - - rand.Shuffle(len(rhp.resolved), func(i, j int) { rhp.resolved[i], rhp.resolved[j] = rhp.resolved[j], rhp.resolved[i] }) - - return nil + return res, nil } func (rhp *RandomHostProvider) Len() int { - rhp.lock.Lock() - defer rhp.lock.Unlock() - return len(rhp.resolved) + return len(rhp.hostsKeys) } func (rhp *RandomHostProvider) Next() (server string, retryStart bool) { - rhp.lock.Lock() - defer rhp.lock.Unlock() - lastTime := time.Since(rhp.lastLookup) needRetry := false - if lastTime > rhp.lookupTTL { - err := rhp.resolveHosts() - if err != nil { - rhp.logger.Errorf("resolve zk hosts failed: %v", err) - } - } - notTried := []string{} + var ret string - for _, addr := range rhp.resolved { - if _, ok := rhp.tried[addr]; !ok { - notTried = append(notTried, addr) + for len(ret) == 0 { + notTried := []string{} + + for _, host := range rhp.hostsKeys { + if _, ok := rhp.tried[host]; !ok { + notTried = append(notTried, host) + } } - } - var selected string + var selected string + if len(notTried) == 0 { + needRetry = true + for k := range rhp.tried { + delete(rhp.tried, k) + } + selected = rhp.hostsKeys[rand.Intn(len(rhp.hostsKeys))] + } else { + selected = notTried[rand.Intn(len(notTried))] + } + rhp.tried[selected] = struct{}{} + + host, _ := rhp.hosts.Load(selected) + zhost := host.(zkhost) - if len(notTried) == 0 { - needRetry = true - for k := range rhp.tried { - delete(rhp.tried, k) + if len(zhost.resolved) > 0 { + ret = zhost.resolved[rand.Intn(len(zhost.resolved))] } - selected = rhp.resolved[rand.Intn(len(rhp.resolved))] - } else { - selected = notTried[rand.Intn(len(notTried))] } - rhp.tried[selected] = struct{}{} - - return selected, needRetry + return ret, needRetry } func (rhp *RandomHostProvider) Connected() { - rhp.lock.Lock() - defer rhp.lock.Unlock() for k := range rhp.tried { delete(rhp.tried, k) } diff --git a/internal/log/log.go b/internal/log/log.go index 72ff9f0e..7630efd4 100644 --- a/internal/log/log.go +++ b/internal/log/log.go @@ -125,23 +125,23 @@ func (l *Logger) printf(lvl Level, msg string, args ...interface{}) { } func (l *Logger) Debug(msg string) { - l.Debugf(msg) + l.Debugf("%s", msg) } func (l *Logger) Info(msg string) { - l.Infof(msg) + l.Infof("%s", msg) } func (l *Logger) Warn(msg string) { - l.Warnf(msg) + l.Warnf("%s", msg) } func (l *Logger) Error(msg string) { - l.Errorf(msg) + l.Errorf("%s", msg) } func (l *Logger) Fatal(msg string) { - l.Fatalf(msg) + l.Fatalf("%s", msg) } func (l *Logger) Debugf(msg string, args ...interface{}) { From 9a266acdd089289f74317ab2037a1f10ab1ab946 Mon Sep 17 00:00:00 2001 From: secwall Date: Mon, 26 Aug 2024 14:15:39 +0200 Subject: [PATCH 2/5] Fix go mod tidy (#124) --- go.mod | 22 +++++++++++++--------- go.sum | 57 ++++++++++++++++++++++++++++----------------------------- 2 files changed, 41 insertions(+), 38 deletions(-) diff --git a/go.mod b/go.mod index 8fff835e..f20466a9 100644 --- a/go.mod +++ b/go.mod @@ -5,8 +5,8 @@ go 1.22 require ( github.com/cenkalti/backoff/v4 v4.3.0 github.com/cucumber/godog v0.14.1 - github.com/docker/docker v27.1.1+incompatible - github.com/go-mysql-org/go-mysql v1.8.0 + github.com/docker/docker v27.1.2+incompatible + github.com/go-mysql-org/go-mysql v1.9.0 github.com/go-sql-driver/mysql v1.8.1 github.com/go-zookeeper/zk v1.0.4 github.com/gofrs/flock v0.12.1 @@ -43,7 +43,7 @@ require ( github.com/hashicorp/go-memdb v1.3.4 // indirect github.com/hashicorp/golang-lru v0.5.4 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect - github.com/lufia/plan9stats v0.0.0-20240513124658-fba389f38bae // indirect + github.com/lufia/plan9stats v0.0.0-20240819163618-b1d8f4d146e7 // indirect github.com/moby/docker-image-spec v1.3.1 // indirect github.com/moby/term v0.0.0-20220808134915-39b0c02b01ae // indirect github.com/morikuni/aec v1.0.0 // indirect @@ -59,15 +59,19 @@ require ( github.com/tklauser/go-sysconf v0.3.14 // indirect github.com/tklauser/numcpus v0.8.0 // indirect github.com/yusufpapurcu/wmi v1.2.4 // indirect - go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.53.0 // indirect - go.opentelemetry.io/otel v1.28.0 // indirect + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0 // indirect + go.opentelemetry.io/otel v1.29.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.22.0 // indirect - go.opentelemetry.io/otel/metric v1.28.0 // indirect + go.opentelemetry.io/otel/metric v1.29.0 // indirect go.opentelemetry.io/otel/sdk v1.22.0 // indirect - go.opentelemetry.io/otel/trace v1.28.0 // indirect + go.opentelemetry.io/otel/trace v1.29.0 // indirect go.uber.org/atomic v1.11.0 // indirect - golang.org/x/net v0.27.0 // indirect - golang.org/x/sys v0.22.0 // indirect + golang.org/x/net v0.28.0 // indirect + golang.org/x/sys v0.24.0 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20240814211410-ddb44dafa142 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142 // indirect + google.golang.org/grpc v1.65.0 // indirect + google.golang.org/protobuf v1.34.2 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect gotest.tools/v3 v3.4.0 // indirect ) diff --git a/go.sum b/go.sum index 61a48d25..9b983894 100644 --- a/go.sum +++ b/go.sum @@ -48,8 +48,8 @@ github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/distribution/reference v0.6.0 h1:0IXCQ5g4/QMHHkarYzh5l+u8T3t73zM5QvfrDyIgxBk= github.com/distribution/reference v0.6.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E= -github.com/docker/docker v27.1.1+incompatible h1:hO/M4MtV36kzKldqnA37IWhebRA+LnqqcqDja6kVaKY= -github.com/docker/docker v27.1.1+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= +github.com/docker/docker v27.1.2+incompatible h1:AhGzR1xaQIy53qCkxARaFluI00WPGtXn0AJuoQsVYTY= +github.com/docker/docker v27.1.2+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= github.com/docker/go-connections v0.5.0 h1:USnMq7hx7gwdVZq1L49hLXaFtUdTADjXGp+uj1Br63c= github.com/docker/go-connections v0.5.0/go.mod h1:ov60Kzw0kKElRwhNs9UlUHAE/F9Fe6GLaXnqyDdmEXc= github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4= @@ -68,8 +68,8 @@ github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= -github.com/go-mysql-org/go-mysql v1.8.0 h1:bN+/Q5yyQXQOAabXPkI3GZX43w4Tsj2DIthjC9i6CkQ= -github.com/go-mysql-org/go-mysql v1.8.0/go.mod h1:kwbF156Z9Sy8amP3E1SZp7/s/0PuJj/xKaOWToQiq0Y= +github.com/go-mysql-org/go-mysql v1.9.0 h1:2YniuBkyD+Ll8HWfZcaJ3JtibUohZTjwbb27ZWhYdOA= +github.com/go-mysql-org/go-mysql v1.9.0/go.mod h1:+SgFgTlqjqOQoMc98n9oyUWEgn2KkOL1VmXDoq2ONOs= github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= github.com/go-ole/go-ole v1.3.0 h1:Dt6ye7+vXGIKZ7Xtk4s6/xVdGDQynvom7xCFEdWr6uE= github.com/go-ole/go-ole v1.3.0/go.mod h1:5LS6F96DhAwUc7C+1HLexzMXY1xGRSryjyPPKW6zv78= @@ -96,8 +96,6 @@ github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc= github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= -github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= -github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= @@ -181,8 +179,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= -github.com/lufia/plan9stats v0.0.0-20240513124658-fba389f38bae h1:dIZY4ULFcto4tAFlj1FYZl8ztUZ13bdq+PLY+NOfbyI= -github.com/lufia/plan9stats v0.0.0-20240513124658-fba389f38bae/go.mod h1:ilwx/Dta8jXAgpFYFvSWEMwxmbWXyiUHkd5FwyKhb5k= +github.com/lufia/plan9stats v0.0.0-20240819163618-b1d8f4d146e7 h1:5RK988zAqB3/AN3opGfRpoQgAVqr6/A5+qRTi67VUZY= +github.com/lufia/plan9stats v0.0.0-20240819163618-b1d8f4d146e7/go.mod h1:ilwx/Dta8jXAgpFYFvSWEMwxmbWXyiUHkd5FwyKhb5k= github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU= @@ -293,20 +291,20 @@ github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1 github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0= github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= -go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.53.0 h1:4K4tsIXefpVJtvA/8srF4V4y0akAoPHkIslgAkjixJA= -go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.53.0/go.mod h1:jjdQuTGVsXV4vSs+CJ2qYDeDPf9yIJV23qlIzBm73Vg= -go.opentelemetry.io/otel v1.28.0 h1:/SqNcYk+idO0CxKEUOtKQClMK/MimZihKYMruSMViUo= -go.opentelemetry.io/otel v1.28.0/go.mod h1:q68ijF8Fc8CnMHKyzqL6akLO46ePnjkgfIMIjUIX9z4= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0 h1:TT4fX+nBOA/+LUkobKGW1ydGcn+G3vRw9+g5HwCphpk= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0/go.mod h1:L7UH0GbB0p47T4Rri3uHjbpCFYrVrwc1I25QhNPiGK8= +go.opentelemetry.io/otel v1.29.0 h1:PdomN/Al4q/lN6iBJEN3AwPvUiHPMlt93c8bqTG5Llw= +go.opentelemetry.io/otel v1.29.0/go.mod h1:N/WtXPs1CNCUEx+Agz5uouwCba+i+bJGFicT8SR4NP8= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.22.0 h1:9M3+rhx7kZCIQQhQRYaZCdNu1V73tm4TvXs2ntl98C4= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.22.0/go.mod h1:noq80iT8rrHP1SfybmPiRGc9dc5M8RPmGvtwo7Oo7tc= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.22.0 h1:FyjCyI9jVEfqhUh2MoSkmolPjfh5fp2hnV0b0irxH4Q= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.22.0/go.mod h1:hYwym2nDEeZfG/motx0p7L7J1N1vyzIThemQsb4g2qY= -go.opentelemetry.io/otel/metric v1.28.0 h1:f0HGvSl1KRAU1DLgLGFjrwVyismPlnuU6JD6bOeuA5Q= -go.opentelemetry.io/otel/metric v1.28.0/go.mod h1:Fb1eVBFZmLVTMb6PPohq3TO9IIhUisDsbJoL/+uQW4s= +go.opentelemetry.io/otel/metric v1.29.0 h1:vPf/HFWTNkPu1aYeIsc98l4ktOQaL6LeSoeV2g+8YLc= +go.opentelemetry.io/otel/metric v1.29.0/go.mod h1:auu/QWieFVWx+DmQOUMgj0F8LHWdgalxXqvp7BII/W8= go.opentelemetry.io/otel/sdk v1.22.0 h1:6coWHw9xw7EfClIC/+O31R8IY3/+EiRFHevmHafB2Gw= go.opentelemetry.io/otel/sdk v1.22.0/go.mod h1:iu7luyVGYovrRpe2fmj3CVKouQNdTOkxtLzPvPz1DOc= -go.opentelemetry.io/otel/trace v1.28.0 h1:GhQ9cUuQGmNDd5BTCP2dAvv75RdMxEfTmYejp+lkx9g= -go.opentelemetry.io/otel/trace v1.28.0/go.mod h1:jPyXzNPg6da9+38HEwElrQiHlVMTnVfM3/yv2OlIHaI= +go.opentelemetry.io/otel/trace v1.29.0 h1:J/8ZNK4XgR7a21DZUAsbF8pZ5Jcw1VhACmnYt39JTi4= +go.opentelemetry.io/otel/trace v1.29.0/go.mod h1:eHl3w0sp3paPkYstJOmAimxhiFXPg+MMTlEh3nsQgWQ= go.opentelemetry.io/proto/otlp v1.0.0 h1:T0TX0tmXU8a3CbNXzEKGeU5mIVOdf0oykP+u2lIVU/I= go.opentelemetry.io/proto/otlp v1.0.0/go.mod h1:Sy6pihPLfYHkr3NkUbEhGHFhINUSI/v80hjKIs5JXpM= go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= @@ -340,8 +338,8 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= -golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys= -golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE= +golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE= +golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -371,14 +369,14 @@ golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI= -golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg= +golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20181227161524-e6919f6577db/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= -golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= +golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc= +golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 h1:SvFZT6jyqRaOeXpc5h/JSfZenJ2O330aBsf7JfSUXmQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= @@ -400,17 +398,18 @@ golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8T google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= -google.golang.org/genproto v0.0.0-20190404172233-64821d5d2107 h1:xtNn7qFlagY2mQNFHMSRPjT2RkOV4OXM7P5TVy9xATo= google.golang.org/genproto v0.0.0-20190404172233-64821d5d2107/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= -google.golang.org/genproto/googleapis/api v0.0.0-20231002182017-d307bd883b97 h1:W18sezcAYs+3tDZX4F80yctqa12jcP1PUS2gQu1zTPU= -google.golang.org/genproto/googleapis/rpc v0.0.0-20231002182017-d307bd883b97 h1:6GQBEOdGkX6MMTLT9V+TjtIRZCw9VPD5Z+yHY9wMgS0= +google.golang.org/genproto/googleapis/api v0.0.0-20240814211410-ddb44dafa142 h1:wKguEg1hsxI2/L3hUYrpo1RVi48K+uTyzKqprwLXsb8= +google.golang.org/genproto/googleapis/api v0.0.0-20240814211410-ddb44dafa142/go.mod h1:d6be+8HhtEtucleCbxpPW9PA9XwISACu8nvpPqF0BVo= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142 h1:e7S5W7MGGLaSu8j3YjdezkZ+m1/Nm0uRVRMEMGk26Xs= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU= google.golang.org/grpc v1.14.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.22.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= -google.golang.org/grpc v1.60.1 h1:26+wFr+cNqSGFcOXcabYC0lUVJVRa2Sb2ortSK7VrEU= -google.golang.org/grpc v1.60.1/go.mod h1:OlCHIeLYqSSsLi6i49B5QGdzaMZK9+M7LXN2FKz4eGM= -google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I= -google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +google.golang.org/grpc v1.65.0 h1:bs/cUb4lp1G5iImFFd3u5ixQzweKizoZJAwBNLR42lc= +google.golang.org/grpc v1.65.0/go.mod h1:WgYC2ypjlB0EiQi6wdKixMqukr6lBc0Vo+oOgjrM5ZQ= +google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= +google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/asn1-ber.v1 v1.0.0-20181015200546-f715ec2f112d/go.mod h1:cuepJuh7vyXfUyUwEgHQXw849cJrilpS5NeIjOWESAw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= From 9c0f694d937eba5e53c099f36714db91f82c4192 Mon Sep 17 00:00:00 2001 From: munakoiso Date: Tue, 27 Aug 2024 20:17:24 +0500 Subject: [PATCH 3/5] set acl in zk.set, zk.acquire_lock and zk.create functions (#125) --- internal/dcs/zk.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/dcs/zk.go b/internal/dcs/zk.go index c56e8ccd..dad96014 100644 --- a/internal/dcs/zk.go +++ b/internal/dcs/zk.go @@ -332,7 +332,7 @@ func (z *zkDCS) AcquireLock(path string) bool { if err != nil { panic(fmt.Sprintf("failed to serialize to JSON %#v", self)) } - _, err = z.retryCreate(fullPath, data, zk.FlagEphemeral, nil) + _, err = z.retryCreate(fullPath, data, zk.FlagEphemeral, z.acl) if err != nil { if err != zk.ErrNodeExists { z.logger.Errorf("failed to acquire lock %s: %v", fullPath, err) @@ -377,7 +377,7 @@ func (z *zkDCS) create(path string, val interface{}, flags int32) error { if err != nil { panic(fmt.Sprintf("failed to serialize to JSON %#v", val)) } - _, err = z.retryCreate(fullPath, data, flags, nil) + _, err = z.retryCreate(fullPath, data, flags, z.acl) if err != nil { if err == zk.ErrNodeExists { return ErrExists @@ -412,7 +412,7 @@ func (z *zkDCS) set(path string, val interface{}, flags int32) error { if err != nil { return err } - _, err = z.retryCreate(fullPath, data, flags, nil) + _, err = z.retryCreate(fullPath, data, flags, z.acl) if err != nil { z.logger.Errorf("failed to create node %s with %v: %v", fullPath, val, err) } From 1cb0a25b4bddc1b9fd6374e82d337c098fc48c9a Mon Sep 17 00:00:00 2001 From: noname0443 Date: Thu, 29 Aug 2024 10:54:12 +0300 Subject: [PATCH 4/5] large binlog replication fix (#126) * large binlog replication fix * Add logs and fix style * Log style fix --- internal/app/app.go | 54 ++++++++++++++++++++++++++++++++------------- 1 file changed, 39 insertions(+), 15 deletions(-) diff --git a/internal/app/app.go b/internal/app/app.go index c6c3d639..02f79fe4 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -33,6 +33,7 @@ type App struct { cluster *mysql.Cluster filelock *flock.Flock nodeFailedAt map[string]time.Time + slaveReadPositions map[string]string streamFromFailedAt map[string]time.Time daemonState *DaemonState daemonMutex sync.Mutex @@ -71,6 +72,7 @@ func NewApp(configFile, logLevel string, interactive bool) (*App, error) { nodeFailedAt: make(map[string]time.Time), streamFromFailedAt: make(map[string]time.Time), replRepairState: make(map[string]*ReplicationRepairState), + slaveReadPositions: make(map[string]string), externalReplication: externalReplication, switchHelper: switchHelper, } @@ -880,7 +882,7 @@ func (app *App) calcActiveNodes(clusterState, clusterStateDcs map[string]*NodeSt if node.PingDubious || clusterStateDcs[host].PingOk { // we can't rely on ping and slave status if ping was dubios if util.ContainsString(oldActiveNodes, host) { - app.logger.Warnf("calc active nodes: %s is dubious or keep heath lock in dcs, keeping active...", host) + app.logger.Warnf("calc active nodes: %s is dubious or keep health lock in dcs, keeping active...", host) activeNodes = append(activeNodes, host) } continue @@ -896,6 +898,7 @@ func (app *App) calcActiveNodes(clusterState, clusterStateDcs map[string]*NodeSt } } else { app.logger.Errorf("calc active nodes: %s is down, deleting from active...", host) + delete(app.slaveReadPositions, host) } continue } else { @@ -918,7 +921,7 @@ func (app *App) calcActiveNodes(clusterState, clusterStateDcs map[string]*NodeSt return activeNodes, nil } -func (app *App) calcActiveNodesChanges(clusterState map[string]*NodeState, activeNodes []string, oldActiveNodes []string, master string) (becomeActive, becomeInactive []string, err error) { +func (app *App) calcActiveNodesChanges(clusterState map[string]*NodeState, activeNodes []string, oldActiveNodes []string, master string) (becomeActive, becomeInactive, becomeDataLag []string, err error) { masterNode := app.cluster.Get(master) var syncReplicas []string var deadReplicas []string @@ -945,29 +948,39 @@ func (app *App) calcActiveNodesChanges(clusterState map[string]*NodeState, activ } } + var dataLagging []string if len(becomeActive) > 0 { // Some replicas are going to become semi-sync. // We need to check that they downloaded (not replayed) almost all binary logs, // in order to prevent master freezing. // We can't check all the replicas on each iteration, because SHOW BINARY LOGS is pretty heavy request - var dataLagging []string masterBinlogs, err := masterNode.GetBinlogs() if err != nil { app.logger.Errorf("calc active nodes: failed to list master binlogs on %s: %v", master, err) - return nil, nil, err + return nil, nil, nil, err } for _, host := range becomeActive { slaveState := clusterState[host].SlaveState dataLag := calcLagBytes(masterBinlogs, slaveState.MasterLogFile, slaveState.MasterLogPos) if dataLag > app.config.SemiSyncEnableLag { - app.logger.Warnf("calc active nodes: %v should become active, but it has data lag %d, delaying...", host, dataLag) - dataLagging = append(dataLagging, host) - becomeInactive = append(becomeInactive, host) + newBinLogPos := fmt.Sprintf("%s%019d", slaveState.MasterLogFile, slaveState.MasterLogPos) + oldBinLogPos := app.slaveReadPositions[host] + + if newBinLogPos <= oldBinLogPos { + app.logger.Warnf("calc active nodes: %v should become active, but it has data lag %d and it's IO is stopped, delaying...", host, dataLag) + becomeInactive = append(becomeInactive, host) + } else { + app.logger.Warnf("calc active nodes: %v has data lag %d, but it's IO is working. Old binlog: %s, new binlog: %s", host, dataLag, oldBinLogPos, newBinLogPos) + dataLagging = append(dataLagging, host) + } + + app.slaveReadPositions[host] = newBinLogPos } } becomeActive = filterOut(becomeActive, dataLagging) + becomeActive = filterOut(becomeActive, becomeInactive) } - return becomeActive, becomeInactive, nil + return becomeActive, becomeInactive, dataLagging, nil } /* @@ -1006,7 +1019,7 @@ func (app *App) updateActiveNodes(clusterState, clusterStateDcs map[string]*Node return nil } - becomeActive, becomeInactive, err := app.calcActiveNodesChanges(clusterState, activeNodes, oldActiveNodes, master) + becomeActive, becomeInactive, becomeDataLag, err := app.calcActiveNodesChanges(clusterState, activeNodes, oldActiveNodes, master) if err != nil { app.logger.Errorf("update active nodes: failed to calc active nodes changes: %v", err) return err @@ -1043,7 +1056,14 @@ func (app *App) updateActiveNodes(clusterState, clusterStateDcs map[string]*Node } } for _, host := range becomeInactive { - err = app.disableSemiSyncOnSlave(host) + err = app.disableSemiSyncOnSlave(host, true) + if err != nil { + app.logger.Warnf("failed to disable semi-sync on slave %s: %v", host, err) + return err + } + } + for _, host := range becomeDataLag { + err = app.disableSemiSyncOnSlave(host, false) if err != nil { app.logger.Warnf("failed to disable semi-sync on slave %s: %v", host, err) return err @@ -1131,18 +1151,22 @@ func (app *App) enableSemiSyncOnSlave(host string, slaveState, masterState *Node return nil } -func (app *App) disableSemiSyncOnSlave(host string) error { +func (app *App) disableSemiSyncOnSlave(host string, restartIOThread bool) error { node := app.cluster.Get(host) err := node.SemiSyncDisable() if err != nil { app.logger.Errorf("failed to enable semi_sync_slave on %s: %s", host, err) return err } - err = node.RestartSlaveIOThread() - if err != nil { - app.logger.Errorf("failed restart slave io thread after set semi_sync_slave on %s: %s", host, err) - return err + + if restartIOThread { + err = node.RestartSlaveIOThread() + if err != nil { + app.logger.Errorf("failed restart slave io thread after set semi_sync_slave on %s: %s", host, err) + return err + } } + return nil } From dd7ca94f320ed588e2e037c096d424dfe036fb6c Mon Sep 17 00:00:00 2001 From: noname0443 Date: Fri, 30 Aug 2024 15:23:09 +0300 Subject: [PATCH 5/5] Fix tests on jammy (#127) * Run tests on jammy * supervisor fix * fix type --------- Co-authored-by: Aleksandr Shevchuk --- .github/workflows/docker-jepsen.yml | 2 +- .github/workflows/docker-tests-8.0.yml | 4 +- .github/workflows/docker-tests.yml | 4 +- .github/workflows/unit-tests.yml | 2 +- tests/images/base/Dockerfile | 2 +- tests/images/base/setup.sh | 61 +++++++++++++------------- tests/images/zookeeper/setup.sh | 2 +- tests/testutil/docker_composer.go | 2 +- 8 files changed, 39 insertions(+), 40 deletions(-) diff --git a/.github/workflows/docker-jepsen.yml b/.github/workflows/docker-jepsen.yml index 1e08c2b9..1985556b 100644 --- a/.github/workflows/docker-jepsen.yml +++ b/.github/workflows/docker-jepsen.yml @@ -10,7 +10,7 @@ env: jobs: test: name: jepsen - runs-on: ubuntu-20.04 + runs-on: ubuntu-22.04 steps: - name: Set up Go 1.x uses: actions/setup-go@v5 diff --git a/.github/workflows/docker-tests-8.0.yml b/.github/workflows/docker-tests-8.0.yml index 7b4e85f9..01d3a90a 100644 --- a/.github/workflows/docker-tests-8.0.yml +++ b/.github/workflows/docker-tests-8.0.yml @@ -13,7 +13,7 @@ env: jobs: buildimages: name: Build images - runs-on: ubuntu-20.04 + runs-on: ubuntu-22.04 steps: - name: Check out code into the Go module directory uses: actions/checkout@v4 @@ -35,7 +35,7 @@ jobs: test: name: test - runs-on: ubuntu-20.04 + runs-on: ubuntu-22.04 needs: [ buildimages ] strategy: matrix: diff --git a/.github/workflows/docker-tests.yml b/.github/workflows/docker-tests.yml index c5b80118..af7c9fb3 100644 --- a/.github/workflows/docker-tests.yml +++ b/.github/workflows/docker-tests.yml @@ -13,7 +13,7 @@ env: jobs: buildimages: name: Build images - runs-on: ubuntu-20.04 + runs-on: ubuntu-22.04 steps: - name: Check out code into the Go module directory uses: actions/checkout@v4 @@ -35,7 +35,7 @@ jobs: test: name: test - runs-on: ubuntu-20.04 + runs-on: ubuntu-22.04 needs: [ buildimages ] strategy: matrix: diff --git a/.github/workflows/unit-tests.yml b/.github/workflows/unit-tests.yml index 92e0db24..8e5add53 100644 --- a/.github/workflows/unit-tests.yml +++ b/.github/workflows/unit-tests.yml @@ -13,7 +13,7 @@ env: jobs: unittest: name: all_unittests - runs-on: ubuntu-20.04 + runs-on: ubuntu-22.04 steps: - name: Set up Go 1.x uses: actions/setup-go@v5 diff --git a/tests/images/base/Dockerfile b/tests/images/base/Dockerfile index ab7a7995..98ab54bc 100644 --- a/tests/images/base/Dockerfile +++ b/tests/images/base/Dockerfile @@ -1,4 +1,4 @@ -FROM ubuntu:bionic +FROM ubuntu:jammy ENV container docker ENV DEBIAN_FRONTEND noninteractive ENV ZK_VERSION=3.7.1 diff --git a/tests/images/base/setup.sh b/tests/images/base/setup.sh index 313728ea..aa91d2a1 100644 --- a/tests/images/base/setup.sh +++ b/tests/images/base/setup.sh @@ -1,7 +1,6 @@ - set -xe -cat < /etc/apt/apt.conf.d/01buildconfig +cat </etc/apt/apt.conf.d/01buildconfig APT::Install-Recommends "0"; APT::Get::Assume-Yes "true"; APT::Install-Suggests "0"; @@ -10,41 +9,41 @@ EOF apt-get update apt-get install \ - wget \ - ca-certificates \ - lsb-release \ - gpg-agent \ - apt-utils \ - software-properties-common + wget \ + ca-certificates \ + lsb-release \ + gpg-agent \ + apt-utils \ + software-properties-common -apt-key add - < /var/lib/dist/base/percona.gpg -add-apt-repository 'deb http://mirror.yandex.ru/mirrors/percona/percona/apt bionic main' -add-apt-repository 'deb http://mirror.yandex.ru/mirrors/percona/ps-80/apt bionic main' +apt-key add -