Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resolve zk hosts in background #77

Merged
merged 1 commit into from
Aug 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func NewApp(configFile, logLevel string) (*App, error) {

func (app *App) connectDCS() error {
var err error
app.dcs, err = dcs.NewZookeeper(&app.config.Zookeeper, app.logger)
app.dcs, err = dcs.NewZookeeper(app.ctx, &app.config.Zookeeper, app.logger)
if err != nil {
return fmt.Errorf("failed to connect to zkDCS: %s", err.Error())
}
Expand Down
10 changes: 6 additions & 4 deletions internal/dcs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,16 @@ type ZookeeperConfig struct {
}

type RandomHostProviderConfig struct {
LookupTimeout time.Duration `config:"lookup_timeout" yaml:"lookup_timeout"`
LookupTTL time.Duration `config:"lookup_ttl" yaml:"lookup_ttl"`
LookupTimeout time.Duration `config:"lookup_timeout" yaml:"lookup_timeout"`
LookupTTL time.Duration `config:"lookup_ttl" yaml:"lookup_ttl"`
LookupTickInterval time.Duration `config:"lookup_tick_interval" yaml:"lookup_tick_interval"`
}

func DefaultRandomHostProviderConfig() RandomHostProviderConfig {
return RandomHostProviderConfig{
LookupTimeout: 3 * time.Second,
LookupTTL: 300 * time.Second,
LookupTimeout: 3 * time.Second,
LookupTTL: 300 * time.Second,
LookupTickInterval: 60 * time.Second,
}
}

Expand Down
5 changes: 3 additions & 2 deletions internal/dcs/zk.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package dcs

import (
"context"
"encoding/json"
"fmt"
"log/slog"
Expand Down Expand Up @@ -50,7 +51,7 @@ func retry(config *ZookeeperConfig, operation func() error) error {
}

// NewZookeeper returns Zookeeper based DCS storage
func NewZookeeper(config *ZookeeperConfig, logger *slog.Logger) (DCS, error) {
func NewZookeeper(ctx context.Context, config *ZookeeperConfig, logger *slog.Logger) (DCS, error) {
if len(config.Hosts) == 0 {
return nil, fmt.Errorf("zookeeper not configured, fill zookeeper/hosts in config")
}
Expand All @@ -72,7 +73,7 @@ func NewZookeeper(config *ZookeeperConfig, logger *slog.Logger) (DCS, error) {

var operation func() error

hostProvider := NewRandomHostProvider(&config.RandomHostProvider, logger)
hostProvider := NewRandomHostProvider(ctx, &config.RandomHostProvider, logger)

if config.UseSSL {
if config.CACert == "" || config.KeyFile == "" || config.CertFile == "" {
Expand Down
181 changes: 106 additions & 75 deletions internal/dcs/zk_host_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,119 +10,150 @@ import (
"time"
)

// zkhost is the cached lookup result for one configured host:port entry.
// Values of this type are stored in RandomHostProvider.hosts, keyed by the
// configured host:port string.
type zkhost struct {
// resolved holds the address:port strings produced by the last successful
// lookup of this host.
resolved []string
// lastLookup is the time at which resolved was stored; it is compared with
// the lookup TTL to decide when the entry must be refreshed.
lastLookup time.Time
}

type RandomHostProvider struct {
lock sync.Mutex
servers []string
resolved []string
tried map[string]struct{}
logger *slog.Logger
lastLookup time.Time
lookupTTL time.Duration
lookupTimeout time.Duration
resolver *net.Resolver
ctx context.Context
hosts sync.Map
hostsKeys []string
tried map[string]struct{}
logger *slog.Logger
lookupTTL time.Duration
lookupTimeout time.Duration
lookupTickInterval time.Duration
resolver *net.Resolver
}

func NewRandomHostProvider(config *RandomHostProviderConfig, logger *slog.Logger) *RandomHostProvider {
func NewRandomHostProvider(ctx context.Context, config *RandomHostProviderConfig, logger *slog.Logger) *RandomHostProvider {
return &RandomHostProvider{
lookupTTL: config.LookupTTL,
lookupTimeout: config.LookupTimeout,
logger: logger,
tried: make(map[string]struct{}),
resolver: &net.Resolver{},
ctx: ctx,
lookupTTL: config.LookupTTL,
lookupTimeout: config.LookupTimeout,
lookupTickInterval: config.LookupTickInterval,
logger: logger,
tried: make(map[string]struct{}),
hosts: sync.Map{},
resolver: &net.Resolver{},
}
}

func (rhp *RandomHostProvider) Init(servers []string) error {
rhp.lock.Lock()
defer rhp.lock.Unlock()
numResolved := 0

rhp.servers = servers

err := rhp.resolveHosts()
for _, host := range servers {
resolved, err := rhp.resolveHost(host)
if err != nil {
rhp.logger.Error(fmt.Sprintf("host definition %s is invalid", host), "error", err)
continue
}
numResolved += len(resolved)
rhp.hosts.Store(host, zkhost{
resolved: resolved,
lastLookup: time.Now(),
})
rhp.hostsKeys = append(rhp.hostsKeys, host)
}

if err != nil {
return fmt.Errorf("failed to init zk host provider %v", err)
if numResolved == 0 {
return fmt.Errorf("unable to resolve any host from %v", servers)
}

go rhp.resolveHosts()

return nil
}

func (rhp *RandomHostProvider) resolveHosts() error {
resolved := []string{}
for _, server := range rhp.servers {
host, port, err := net.SplitHostPort(server)
if err != nil {
return err
}
ctx, cancel := context.WithTimeout(context.Background(), rhp.lookupTimeout)
defer cancel()
addrs, err := rhp.resolver.LookupHost(ctx, host)
if err != nil {
rhp.logger.Error(fmt.Sprintf("unable to resolve %s", host), "error", err)
}
for _, addr := range addrs {
resolved = append(resolved, net.JoinHostPort(addr, port))
// resolveHosts is the background refresh loop. On every tick it re-resolves
// the cached hosts that are empty or older than lookupTTL, and it returns as
// soon as the provider's context is cancelled. It is meant to be run in its
// own goroutine.
func (rhp *RandomHostProvider) resolveHosts() {
	// time.NewTicker panics on a non-positive interval, and a panic in this
	// goroutine would take the whole process down. Fall back to the 60s value
	// used by DefaultRandomHostProviderConfig when the interval was left unset.
	interval := rhp.lookupTickInterval
	if interval <= 0 {
		interval = 60 * time.Second
	}

	ticker := time.NewTicker(interval)
	// Release the ticker when the loop exits so it does not outlive the
	// provider's context.
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			rhp.refreshStaleHosts()
		case <-rhp.ctx.Done():
			return
		}
	}
}

// refreshStaleHosts performs a single refresh pass over hostsKeys. An entry is
// re-resolved when it has no cached addresses or when its last lookup is older
// than lookupTTL. A failed or empty lookup is logged and the previous cache
// entry is kept, so a transient resolver failure never wipes known addresses.
func (rhp *RandomHostProvider) refreshStaleHosts() {
	for _, pair := range rhp.hostsKeys {
		cached, _ := rhp.hosts.Load(pair)
		zhost, ok := cached.(zkhost)
		if !ok {
			// No cache entry (or an unexpected value) for this key: skip it
			// instead of panicking on the type assertion.
			continue
		}

		if len(zhost.resolved) > 0 && time.Since(zhost.lastLookup) <= rhp.lookupTTL {
			// Entry is populated and still fresh.
			continue
		}

		resolved, err := rhp.resolveHost(pair)
		if err != nil || len(resolved) == 0 {
			rhp.logger.Error(fmt.Sprintf("background resolve for %s failed", pair), "error", err)
			continue
		}

		rhp.hosts.Store(pair, zkhost{
			resolved:   resolved,
			lastLookup: time.Now(),
		})
	}
}

if len(resolved) == 0 {
return fmt.Errorf("no hosts resolved for %q", rhp.servers)
func (rhp *RandomHostProvider) resolveHost(pair string) ([]string, error) {
var res []string
host, port, err := net.SplitHostPort(pair)
if err != nil {
return res, err
}
ctx, cancel := context.WithTimeout(rhp.ctx, rhp.lookupTimeout)
defer cancel()
addrs, err := rhp.resolver.LookupHost(ctx, host)
if err != nil {
rhp.logger.Error(fmt.Sprintf("unable to resolve %s", host), "error", err)
}
for _, addr := range addrs {
res = append(res, net.JoinHostPort(addr, port))
}

rhp.lastLookup = time.Now()
rhp.resolved = resolved

rand.Shuffle(len(rhp.resolved), func(i, j int) { rhp.resolved[i], rhp.resolved[j] = rhp.resolved[j], rhp.resolved[i] })

return nil
return res, nil
}

func (rhp *RandomHostProvider) Len() int {
rhp.lock.Lock()
defer rhp.lock.Unlock()
return len(rhp.resolved)
return len(rhp.hostsKeys)
}

func (rhp *RandomHostProvider) Next() (server string, retryStart bool) {
rhp.lock.Lock()
defer rhp.lock.Unlock()
lastTime := time.Since(rhp.lastLookup)
needRetry := false
if lastTime > rhp.lookupTTL {
err := rhp.resolveHosts()
if err != nil {
rhp.logger.Error("resolve zk hosts failed", "error", err)
}
}

notTried := []string{}
var ret string

for _, addr := range rhp.resolved {
if _, ok := rhp.tried[addr]; !ok {
notTried = append(notTried, addr)
for len(ret) == 0 {
notTried := []string{}

for _, host := range rhp.hostsKeys {
if _, ok := rhp.tried[host]; !ok {
notTried = append(notTried, host)
}
}
}

var selected string
var selected string
if len(notTried) == 0 {
needRetry = true
for k := range rhp.tried {
delete(rhp.tried, k)
}
selected = rhp.hostsKeys[rand.Intn(len(rhp.hostsKeys))]
} else {
selected = notTried[rand.Intn(len(notTried))]
}
rhp.tried[selected] = struct{}{}

host, _ := rhp.hosts.Load(selected)
zhost := host.(zkhost)

if len(notTried) == 0 {
needRetry = true
for k := range rhp.tried {
delete(rhp.tried, k)
if len(zhost.resolved) > 0 {
ret = zhost.resolved[rand.Intn(len(zhost.resolved))]
}
selected = rhp.resolved[rand.Intn(len(rhp.resolved))]
} else {
selected = notTried[rand.Intn(len(notTried))]
}

rhp.tried[selected] = struct{}{}

return selected, needRetry
return ret, needRetry
}

func (rhp *RandomHostProvider) Connected() {
rhp.lock.Lock()
defer rhp.lock.Unlock()
for k := range rhp.tried {
delete(rhp.tried, k)
}
Expand Down
Loading