From 6e1cbdaa78939b1c753fc4e53cbb47ea967141ad Mon Sep 17 00:00:00 2001 From: Daniel Strobusch <1847260+dastrobu@users.noreply.github.com> Date: Tue, 19 Nov 2024 10:45:46 +0100 Subject: [PATCH] bump dskit to v0.0.0-20241115082728-f2a7eb3aa0e9 to leverage benefits for context causes for DoBatch calls. See https://github.com/grafana/dskit/issues/576 Signed-off-by: Daniel Strobusch <1847260+dastrobu@users.noreply.github.com> --- CHANGELOG.md | 1 + cmd/tempo-serverless/cloud-run/go.mod | 2 +- cmd/tempo-serverless/cloud-run/go.sum | 4 +- cmd/tempo-serverless/lambda/go.mod | 2 +- cmd/tempo-serverless/lambda/go.sum | 4 +- go.mod | 2 +- go.sum | 2 + modules/distributor/distributor_test.go | 16 +- .../grafana/dskit/concurrency/runner.go | 5 +- .../grafana/dskit/concurrency/worker.go | 21 +- .../grafana/dskit/grpcclient/grpcclient.go | 26 +- .../grafana/dskit/httpgrpc/httpgrpc.go | 13 +- .../grafana/dskit/httpgrpc/server/server.go | 2 +- .../grafana/dskit/internal/math/math.go | 9 - .../dskit/kv/memberlist/memberlist_client.go | 115 +++++++-- .../dskit/kv/memberlist/tcp_transport.go | 110 +++++++-- vendor/github.com/grafana/dskit/kv/mock.go | 61 +++++ .../grafana/dskit/middleware/http_tracing.go | 5 + .../grafana/dskit/middleware/logging.go | 25 +- .../grafana/dskit/ring/basic_lifecycler.go | 6 +- vendor/github.com/grafana/dskit/ring/batch.go | 6 +- .../grafana/dskit/ring/lifecycler.go | 95 +++++-- .../grafana/dskit/ring/lifecycler_metrics.go | 6 + vendor/github.com/grafana/dskit/ring/model.go | 99 +++++++- .../dskit/ring/partition_instance_ring.go | 16 +- .../grafana/dskit/ring/replication_set.go | 2 +- vendor/github.com/grafana/dskit/ring/ring.go | 232 ++++++++++++++---- .../github.com/grafana/dskit/ring/ring.pb.go | 155 +++++++++--- .../github.com/grafana/dskit/ring/ring.proto | 11 + .../grafana/dskit/ring/ring_http.go | 46 ++-- .../grafana/dskit/ring/ring_status.gohtml | 11 +- .../grafana/dskit/ring/shard/shard.go | 3 + .../grafana/dskit/runtimeconfig/manager.go | 32 ++- .../github.com/grafana/dskit/server/server.go | 8 +- .../grafana/dskit/services/basic_service.go | 48 +++- .../grafana/dskit/services/failure_watcher.go | 53 +++- .../grafana/dskit/services/manager.go | 56 ++++- .../grafana/dskit/services/service.go | 5 +- .../grafana/dskit/spanlogger/spanlogger.go | 84 ++++--- .../grpc/experimental/experimental.go | 65 ----- vendor/modules.txt | 4 +- 41 files changed, 1132 insertions(+), 336 deletions(-) delete mode 100644 vendor/github.com/grafana/dskit/internal/math/math.go delete mode 100644 vendor/google.golang.org/grpc/experimental/experimental.go diff --git a/CHANGELOG.md b/CHANGELOG.md index b9242a2ab06..a393c5f3817 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ * [FEATURE] TraceQL support for instrumentation scope [#3967](https://github.com/grafana/tempo/pull/3967) (@ie-pham) * [FEATURE] Export cost attribution usage metrics from distributor [#4162](https://github.com/grafana/tempo/pull/4162) (@mdisibio) * [FEATURE] TraceQL metrics: avg_over_time [#4073](https://github.com/grafana/tempo/pull/4073) (@javiermolinar) +* [ENHANCEMENT] Update to the latest dskit [#4341](https://github.com/grafana/tempo/pull/4341) (@dastrobu) * [ENHANCEMENT] Changed log level from INFO to DEBUG for the TempoDB Find operation using traceId to reduce excessive/unwanted logs in log search. 
[#4179](https://github.com/grafana/tempo/pull/4179) (@Aki0x137) * [ENHANCEMENT] Pushdown collection of results from generators in the querier [#4119](https://github.com/grafana/tempo/pull/4119) (@electron0zero) * [ENHANCEMENT] The span multiplier now also sources its value from the resource attributes. [#4210](https://github.com/grafana/tempo/pull/4210) diff --git a/cmd/tempo-serverless/cloud-run/go.mod b/cmd/tempo-serverless/cloud-run/go.mod index d0fed2b6eb2..e12856b14b0 100644 --- a/cmd/tempo-serverless/cloud-run/go.mod +++ b/cmd/tempo-serverless/cloud-run/go.mod @@ -50,7 +50,7 @@ require ( github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect github.com/googleapis/gax-go/v2 v2.13.0 // indirect github.com/gorilla/mux v1.8.1 // indirect - github.com/grafana/dskit v0.0.0-20240801171758-736c44c85382 // indirect + github.com/grafana/dskit v0.0.0-20241115082728-f2a7eb3aa0e9 // indirect github.com/grafana/gomemcache v0.0.0-20240229205252-cd6a66d6fb56 // indirect github.com/grafana/pyroscope-go/godeltaprof v0.1.8 // indirect github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc // indirect diff --git a/cmd/tempo-serverless/cloud-run/go.sum b/cmd/tempo-serverless/cloud-run/go.sum index 22c145f16b0..751dd41e98b 100644 --- a/cmd/tempo-serverless/cloud-run/go.sum +++ b/cmd/tempo-serverless/cloud-run/go.sum @@ -139,8 +139,8 @@ github.com/googleapis/gax-go/v2 v2.13.0 h1:yitjD5f7jQHhyDsnhKEBU52NdvvdSeGzlAnDP github.com/googleapis/gax-go/v2 v2.13.0/go.mod h1:Z/fvTZXF8/uw7Xu5GuslPw+bplx6SS338j1Is2S+B7A= github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY= github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ= -github.com/grafana/dskit v0.0.0-20240801171758-736c44c85382 h1:hcVxOw584ov31MyEAuNNGjrj4vPZX626F39zuOtWQs0= -github.com/grafana/dskit v0.0.0-20240801171758-736c44c85382/go.mod h1:lcjGB6SuaZ2o44A9nD6p/tR4QXSPbzViRY520Gy6pTQ= +github.com/grafana/dskit v0.0.0-20241115082728-f2a7eb3aa0e9 h1:Dx7+6aU/fhwD2vkMr0PUcyxGat1sjUssHAKQKaS7sDM= +github.com/grafana/dskit v0.0.0-20241115082728-f2a7eb3aa0e9/go.mod h1:SPLNCARd4xdjCkue0O6hvuoveuS1dGJjDnfxYe405YQ= github.com/grafana/gomemcache v0.0.0-20240229205252-cd6a66d6fb56 h1:X8IKQ0wu40wpvYcKfBcc5T4QnhdQjUhtUtB/1CY89lE= github.com/grafana/gomemcache v0.0.0-20240229205252-cd6a66d6fb56/go.mod h1:PGk3RjYHpxMM8HFPhKKo+vve3DdlPUELZLSDEFehPuU= github.com/grafana/pyroscope-go/godeltaprof v0.1.8 h1:iwOtYXeeVSAeYefJNaxDytgjKtUuKQbJqgAIjlnicKg= diff --git a/cmd/tempo-serverless/lambda/go.mod b/cmd/tempo-serverless/lambda/go.mod index d488de58e40..b8ccb0b0940 100644 --- a/cmd/tempo-serverless/lambda/go.mod +++ b/cmd/tempo-serverless/lambda/go.mod @@ -53,7 +53,7 @@ require ( github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect github.com/googleapis/gax-go/v2 v2.13.0 // indirect github.com/gorilla/mux v1.8.1 // indirect - github.com/grafana/dskit v0.0.0-20240801171758-736c44c85382 // indirect + github.com/grafana/dskit v0.0.0-20241115082728-f2a7eb3aa0e9 // indirect github.com/grafana/gomemcache v0.0.0-20240229205252-cd6a66d6fb56 // indirect github.com/grafana/pyroscope-go/godeltaprof v0.1.8 // indirect github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc // indirect diff --git a/cmd/tempo-serverless/lambda/go.sum b/cmd/tempo-serverless/lambda/go.sum index 259a30d11d9..e1a77a7d813 100644 --- a/cmd/tempo-serverless/lambda/go.sum +++ b/cmd/tempo-serverless/lambda/go.sum @@ -143,8 +143,8 @@ github.com/googleapis/gax-go/v2 v2.13.0 h1:yitjD5f7jQHhyDsnhKEBU52NdvvdSeGzlAnDP 
github.com/googleapis/gax-go/v2 v2.13.0/go.mod h1:Z/fvTZXF8/uw7Xu5GuslPw+bplx6SS338j1Is2S+B7A= github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY= github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ= -github.com/grafana/dskit v0.0.0-20240801171758-736c44c85382 h1:hcVxOw584ov31MyEAuNNGjrj4vPZX626F39zuOtWQs0= -github.com/grafana/dskit v0.0.0-20240801171758-736c44c85382/go.mod h1:lcjGB6SuaZ2o44A9nD6p/tR4QXSPbzViRY520Gy6pTQ= +github.com/grafana/dskit v0.0.0-20241115082728-f2a7eb3aa0e9 h1:Dx7+6aU/fhwD2vkMr0PUcyxGat1sjUssHAKQKaS7sDM= +github.com/grafana/dskit v0.0.0-20241115082728-f2a7eb3aa0e9/go.mod h1:SPLNCARd4xdjCkue0O6hvuoveuS1dGJjDnfxYe405YQ= github.com/grafana/gomemcache v0.0.0-20240229205252-cd6a66d6fb56 h1:X8IKQ0wu40wpvYcKfBcc5T4QnhdQjUhtUtB/1CY89lE= github.com/grafana/gomemcache v0.0.0-20240229205252-cd6a66d6fb56/go.mod h1:PGk3RjYHpxMM8HFPhKKo+vve3DdlPUELZLSDEFehPuU= github.com/grafana/pyroscope-go/godeltaprof v0.1.8 h1:iwOtYXeeVSAeYefJNaxDytgjKtUuKQbJqgAIjlnicKg= diff --git a/go.mod b/go.mod index d51195234f4..95b7e563849 100644 --- a/go.mod +++ b/go.mod @@ -25,7 +25,7 @@ require ( github.com/google/go-cmp v0.6.0 github.com/google/uuid v1.6.0 github.com/gorilla/mux v1.8.1 - github.com/grafana/dskit v0.0.0-20240801171758-736c44c85382 + github.com/grafana/dskit v0.0.0-20241115082728-f2a7eb3aa0e9 github.com/grafana/e2e v0.1.1 github.com/hashicorp/go-hclog v1.6.3 // indirect github.com/hashicorp/go-plugin v1.6.0 // indirect diff --git a/go.sum b/go.sum index ac6867461ad..7e9295e0db5 100644 --- a/go.sum +++ b/go.sum @@ -415,6 +415,8 @@ github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWm github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/grafana/dskit v0.0.0-20240801171758-736c44c85382 h1:hcVxOw584ov31MyEAuNNGjrj4vPZX626F39zuOtWQs0= github.com/grafana/dskit v0.0.0-20240801171758-736c44c85382/go.mod h1:lcjGB6SuaZ2o44A9nD6p/tR4QXSPbzViRY520Gy6pTQ= +github.com/grafana/dskit v0.0.0-20241115082728-f2a7eb3aa0e9 h1:Dx7+6aU/fhwD2vkMr0PUcyxGat1sjUssHAKQKaS7sDM= +github.com/grafana/dskit v0.0.0-20241115082728-f2a7eb3aa0e9/go.mod h1:SPLNCARd4xdjCkue0O6hvuoveuS1dGJjDnfxYe405YQ= github.com/grafana/e2e v0.1.1 h1:/b6xcv5BtoBnx8cZnCiey9DbjEc8z7gXHO5edoeRYxc= github.com/grafana/e2e v0.1.1/go.mod h1:RpNLgae5VT+BUHvPE+/zSypmOXKwEu4t+tnEMS1ATaE= github.com/grafana/gomemcache v0.0.0-20240229205252-cd6a66d6fb56 h1:X8IKQ0wu40wpvYcKfBcc5T4QnhdQjUhtUtB/1CY89lE= diff --git a/modules/distributor/distributor_test.go b/modules/distributor/distributor_test.go index 95ad35e5ce2..87fe761e339 100644 --- a/modules/distributor/distributor_test.go +++ b/modules/distributor/distributor_test.go @@ -5,6 +5,7 @@ import ( "context" "encoding/hex" "encoding/json" + "errors" "flag" "fmt" "maps" @@ -1082,11 +1083,12 @@ func TestLogDiscardedSpansWhenContextCancelled(t *testing.T) { } traces := batchesToTraces(t, tc.batches) - ctx, cancelFunc := context.WithCancel(ctx) - cancelFunc() // cancel to force all spans to be discarded + ctx, cancelFunc := context.WithCancelCause(ctx) + cause := errors.New("test cause") + cancelFunc(cause) // cancel to force all spans to be discarded _, err := d.PushTraces(ctx, traces) - assert.ErrorContains(t, err, "context canceled") + assert.Equal(t, cause, err) assert.ElementsMatch(t, tc.expectedLogsSpan, actualLogSpan(t, buf)) }) @@ -1664,6 +1666,14 @@ type mockRing struct { replicationFactor uint32 } +func (r mockRing) WritableInstancesWithTokensCount() int { + 
panic("implement me if required for testing") +} + +func (r mockRing) WritableInstancesWithTokensInZoneCount(string) int { + panic("implement me if required for testing") +} + var _ ring.ReadRing = (*mockRing)(nil) func (r mockRing) Get(key uint32, _ ring.Operation, buf []ring.InstanceDesc, _, _ []string) (ring.ReplicationSet, error) { diff --git a/vendor/github.com/grafana/dskit/concurrency/runner.go b/vendor/github.com/grafana/dskit/concurrency/runner.go index fcc89299714..f3ee57c857f 100644 --- a/vendor/github.com/grafana/dskit/concurrency/runner.go +++ b/vendor/github.com/grafana/dskit/concurrency/runner.go @@ -7,7 +7,6 @@ import ( "go.uber.org/atomic" "golang.org/x/sync/errgroup" - "github.com/grafana/dskit/internal/math" "github.com/grafana/dskit/multierror" ) @@ -31,7 +30,7 @@ func ForEachUser(ctx context.Context, userIDs []string, concurrency int, userFun errsMx := sync.Mutex{} wg := sync.WaitGroup{} - for ix := 0; ix < math.Min(concurrency, len(userIDs)); ix++ { + for ix := 0; ix < min(concurrency, len(userIDs)); ix++ { wg.Add(1) go func() { defer wg.Done() @@ -108,7 +107,7 @@ func ForEachJob(ctx context.Context, jobs int, concurrency int, jobFunc func(ctx // Start workers to process jobs. g, ctx := errgroup.WithContext(ctx) - for ix := 0; ix < math.Min(concurrency, jobs); ix++ { + for ix := 0; ix < min(concurrency, jobs); ix++ { g.Go(func() error { for ctx.Err() == nil { idx := int(indexes.Inc()) diff --git a/vendor/github.com/grafana/dskit/concurrency/worker.go b/vendor/github.com/grafana/dskit/concurrency/worker.go index f40f0334800..10a59e60023 100644 --- a/vendor/github.com/grafana/dskit/concurrency/worker.go +++ b/vendor/github.com/grafana/dskit/concurrency/worker.go @@ -5,12 +5,18 @@ package concurrency // If all workers are busy, Go() will spawn a new goroutine to run the workload. func NewReusableGoroutinesPool(size int) *ReusableGoroutinesPool { p := &ReusableGoroutinesPool{ - jobs: make(chan func()), + jobs: make(chan func()), + closed: make(chan struct{}), } for i := 0; i < size; i++ { go func() { - for f := range p.jobs { - f() + for { + select { + case f := <-p.jobs: + f() + case <-p.closed: + return + } } }() } @@ -18,7 +24,8 @@ func NewReusableGoroutinesPool(size int) *ReusableGoroutinesPool { } type ReusableGoroutinesPool struct { - jobs chan func() + jobs chan func() + closed chan struct{} } // Go will run the given function in a worker of the pool. @@ -32,7 +39,9 @@ func (p *ReusableGoroutinesPool) Go(f func()) { } // Close stops the workers of the pool. -// No new Do() calls should be performed after calling Close(). +// No new Go() calls should be performed after calling Close(). // Close does NOT wait for all jobs to finish, it is the caller's responsibility to ensure that in the provided workloads. // Close is intended to be used in tests to ensure that no goroutines are leaked. 
-func (p *ReusableGoroutinesPool) Close() { close(p.jobs) } +func (p *ReusableGoroutinesPool) Close() { + close(p.closed) +} diff --git a/vendor/github.com/grafana/dskit/grpcclient/grpcclient.go b/vendor/github.com/grafana/dskit/grpcclient/grpcclient.go index 75189904715..a8f728c61e2 100644 --- a/vendor/github.com/grafana/dskit/grpcclient/grpcclient.go +++ b/vendor/github.com/grafana/dskit/grpcclient/grpcclient.go @@ -2,6 +2,8 @@ package grpcclient import ( "flag" + "slices" + "strings" "time" "github.com/pkg/errors" @@ -40,6 +42,9 @@ type Config struct { Middleware []grpc.UnaryClientInterceptor `yaml:"-"` StreamMiddleware []grpc.StreamClientInterceptor `yaml:"-"` + + // CustomCompressors allows configuring custom compressors. + CustomCompressors []string `yaml:"-"` } // RegisterFlags registers flags. @@ -55,9 +60,19 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { cfg.InitialStreamWindowSize = defaultInitialWindowSize cfg.InitialConnectionWindowSize = defaultInitialWindowSize + var supportedCompressors strings.Builder + supportedCompressors.WriteString("Use compression when sending messages. Supported values are: 'gzip', 'snappy'") + for _, cmp := range cfg.CustomCompressors { + supportedCompressors.WriteString(", ") + supportedCompressors.WriteString("'") + supportedCompressors.WriteString(cmp) + supportedCompressors.WriteString("'") + } + supportedCompressors.WriteString(" and '' (disable compression)") + f.IntVar(&cfg.MaxRecvMsgSize, prefix+".grpc-max-recv-msg-size", 100<<20, "gRPC client max receive message size (bytes).") f.IntVar(&cfg.MaxSendMsgSize, prefix+".grpc-max-send-msg-size", 100<<20, "gRPC client max send message size (bytes).") - f.StringVar(&cfg.GRPCCompression, prefix+".grpc-compression", "", "Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression)") + f.StringVar(&cfg.GRPCCompression, prefix+".grpc-compression", "", supportedCompressors.String()) f.Float64Var(&cfg.RateLimit, prefix+".grpc-client-rate-limit", 0., "Rate limit for gRPC client; 0 means disabled.") f.IntVar(&cfg.RateLimitBurst, prefix+".grpc-client-rate-limit-burst", 0, "Rate limit burst for gRPC client.") f.BoolVar(&cfg.BackoffOnRatelimits, prefix+".backoff-on-ratelimits", false, "Enable backoff and retry when we hit rate limits.") @@ -74,11 +89,10 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { } func (cfg *Config) Validate() error { - switch cfg.GRPCCompression { - case gzip.Name, snappy.Name, "": - // valid - default: - return errors.Errorf("unsupported compression type: %s", cfg.GRPCCompression) + supportedCompressors := []string{gzip.Name, snappy.Name, ""} + supportedCompressors = append(supportedCompressors, cfg.CustomCompressors...) + if !slices.Contains(supportedCompressors, cfg.GRPCCompression) { + return errors.Errorf("unsupported compression type: %q", cfg.GRPCCompression) } return nil } diff --git a/vendor/github.com/grafana/dskit/httpgrpc/httpgrpc.go b/vendor/github.com/grafana/dskit/httpgrpc/httpgrpc.go index 02e6e493736..616023899b7 100644 --- a/vendor/github.com/grafana/dskit/httpgrpc/httpgrpc.go +++ b/vendor/github.com/grafana/dskit/httpgrpc/httpgrpc.go @@ -106,16 +106,23 @@ func FromHeader(hs http.Header) []*Header { return result } -// Errorf returns a HTTP gRPC error than is correctly forwarded over +// Error returns a HTTP gRPC error that is correctly forwarded over // gRPC, and can eventually be converted back to a HTTP response with // HTTPResponseFromError. 
-func Errorf(code int, tmpl string, args ...interface{}) error { +func Error(code int, msg string) error { return ErrorFromHTTPResponse(&HTTPResponse{ Code: int32(code), - Body: []byte(fmt.Sprintf(tmpl, args...)), + Body: []byte(msg), }) } +// Errorf returns a HTTP gRPC error that is correctly forwarded over +// gRPC, and can eventually be converted back to a HTTP response with +// HTTPResponseFromError. +func Errorf(code int, tmpl string, args ...interface{}) error { + return Error(code, fmt.Sprintf(tmpl, args...)) +} + // ErrorFromHTTPResponse converts an HTTP response into a grpc error, and uses HTTP response body as an error message. // Note that if HTTP response body contains non-utf8 string, then returned error cannot be marshalled by protobuf. func ErrorFromHTTPResponse(resp *HTTPResponse) error { diff --git a/vendor/github.com/grafana/dskit/httpgrpc/server/server.go b/vendor/github.com/grafana/dskit/httpgrpc/server/server.go index 6a831dac0f8..935ec0fc5e3 100644 --- a/vendor/github.com/grafana/dskit/httpgrpc/server/server.go +++ b/vendor/github.com/grafana/dskit/httpgrpc/server/server.go @@ -186,7 +186,7 @@ func NewClient(address string) (*Client, error) { ), } - conn, err := grpc.Dial(address, dialOptions...) + conn, err := grpc.NewClient(address, dialOptions...) if err != nil { return nil, err } diff --git a/vendor/github.com/grafana/dskit/internal/math/math.go b/vendor/github.com/grafana/dskit/internal/math/math.go deleted file mode 100644 index 9d6422e50e3..00000000000 --- a/vendor/github.com/grafana/dskit/internal/math/math.go +++ /dev/null @@ -1,9 +0,0 @@ -package math - -// Min returns the minimum of two ints. -func Min(a, b int) int { - if a < b { - return a - } - return b -} diff --git a/vendor/github.com/grafana/dskit/kv/memberlist/memberlist_client.go b/vendor/github.com/grafana/dskit/kv/memberlist/memberlist_client.go index a7eefe92fc2..452798e04e7 100644 --- a/vendor/github.com/grafana/dskit/kv/memberlist/memberlist_client.go +++ b/vendor/github.com/grafana/dskit/kv/memberlist/memberlist_client.go @@ -137,6 +137,7 @@ type KVConfig struct { GossipToTheDeadTime time.Duration `yaml:"gossip_to_dead_nodes_time" category:"advanced"` DeadNodeReclaimTime time.Duration `yaml:"dead_node_reclaim_time" category:"advanced"` EnableCompression bool `yaml:"compression_enabled" category:"advanced"` + NotifyInterval time.Duration `yaml:"notify_interval" category:"advanced"` // ip:port to advertise other cluster members. Used for NAT traversal AdvertiseAddr string `yaml:"advertise_addr"` @@ -195,6 +196,7 @@ func (cfg *KVConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) { f.DurationVar(&cfg.DeadNodeReclaimTime, prefix+"memberlist.dead-node-reclaim-time", mlDefaults.DeadNodeReclaimTime, "How soon can dead node's name be reclaimed with new address. 0 to disable.") f.IntVar(&cfg.MessageHistoryBufferBytes, prefix+"memberlist.message-history-buffer-bytes", 0, "How much space to use for keeping received and sent messages in memory for troubleshooting (two buffers). 0 to disable.") f.BoolVar(&cfg.EnableCompression, prefix+"memberlist.compression-enabled", mlDefaults.EnableCompression, "Enable message compression. This can be used to reduce bandwidth usage at the cost of slightly more CPU utilization.") + f.DurationVar(&cfg.NotifyInterval, prefix+"memberlist.notify-interval", 0, "How frequently to notify watchers when a key changes. Can reduce CPU activity in large memberlist deployments. 
0 to notify without delay.") f.StringVar(&cfg.AdvertiseAddr, prefix+"memberlist.advertise-addr", mlDefaults.AdvertiseAddr, "Gossip address to advertise to other members in the cluster. Used for NAT traversal.") f.IntVar(&cfg.AdvertisePort, prefix+"memberlist.advertise-port", mlDefaults.AdvertisePort, "Gossip port to advertise to other members in the cluster. Used for NAT traversal.") f.StringVar(&cfg.ClusterLabel, prefix+"memberlist.cluster-label", mlDefaults.Label, "The cluster label is an optional string to include in outbound packets and gossip streams. Other members in the memberlist cluster will discard any message whose label doesn't match the configured one, unless the 'cluster-label-verification-disabled' configuration option is set to true.") @@ -251,6 +253,10 @@ type KV struct { watchers map[string][]chan string prefixWatchers map[string][]chan string + // Delayed notifications for watchers + notifMu sync.Mutex + keyNotifications map[string]struct{} + // Buffers with sent and received messages. Used for troubleshooting only. // New messages are appended, old messages (based on configured size limit) removed from the front. messagesMu sync.Mutex @@ -359,17 +365,18 @@ func NewKV(cfg KVConfig, logger log.Logger, dnsProvider DNSProvider, registerer cfg.TCPTransport.MetricsNamespace = cfg.MetricsNamespace mlkv := &KV{ - cfg: cfg, - logger: logger, - registerer: registerer, - provider: dnsProvider, - store: make(map[string]ValueDesc), - codecs: make(map[string]codec.Codec), - watchers: make(map[string][]chan string), - prefixWatchers: make(map[string][]chan string), - workersChannels: make(map[string]chan valueUpdate), - shutdown: make(chan struct{}), - maxCasRetries: maxCasRetries, + cfg: cfg, + logger: logger, + registerer: registerer, + provider: dnsProvider, + store: make(map[string]ValueDesc), + codecs: make(map[string]codec.Codec), + watchers: make(map[string][]chan string), + keyNotifications: make(map[string]struct{}), + prefixWatchers: make(map[string][]chan string), + workersChannels: make(map[string]chan valueUpdate), + shutdown: make(chan struct{}), + maxCasRetries: maxCasRetries, } mlkv.createAndRegisterMetrics() @@ -486,6 +493,13 @@ func (m *KV) running(ctx context.Context) error { return errFailedToJoinCluster } + if m.cfg.NotifyInterval > 0 { + // Start delayed key notifications. + notifTicker := time.NewTicker(m.cfg.NotifyInterval) + defer notifTicker.Stop() + go m.monitorKeyNotifications(ctx, notifTicker.C) + } + var tickerChan <-chan time.Time if m.cfg.RejoinInterval > 0 && len(m.cfg.JoinMembers) > 0 { t := time.NewTicker(m.cfg.RejoinInterval) @@ -552,7 +566,7 @@ func (m *KV) fastJoinMembersOnStartup(ctx context.Context) { for toJoin > 0 && len(nodes) > 0 && ctx.Err() == nil { reached, err := m.memberlist.Join(nodes[0:1]) // Try to join single node only. if err != nil { - level.Debug(m.logger).Log("msg", "fast-joining node failed", "node", nodes[0], "err", err) + level.Info(m.logger).Log("msg", "fast-joining node failed", "node", nodes[0], "err", err) } totalJoined += reached @@ -905,7 +919,59 @@ func removeWatcherChannel(k string, w chan string, watchers map[string][]chan st } } +// notifyWatchers sends notification to all watchers of given key. If delay is +// enabled, it accumulates them for later sending. 
func (m *KV) notifyWatchers(key string) { + if m.cfg.NotifyInterval <= 0 { + m.notifyWatchersSync(key) + return + } + + m.notifMu.Lock() + defer m.notifMu.Unlock() + m.keyNotifications[key] = struct{}{} +} + +// monitorKeyNotifications sends accumulated notifications to all watchers of +// respective keys when the given channel ticks. +func (m *KV) monitorKeyNotifications(ctx context.Context, tickChan <-chan time.Time) { + if m.cfg.NotifyInterval <= 0 { + panic("sendNotifications called with NotifyInterval <= 0") + } + + for { + select { + case <-tickChan: + m.sendKeyNotifications() + case <-ctx.Done(): + return + } + } +} + +// sendKeyNotifications sends accumulated notifications to watchers of respective keys. +func (m *KV) sendKeyNotifications() { + newNotifs := func() map[string]struct{} { + // Grab and clear accumulated notifications. + m.notifMu.Lock() + defer m.notifMu.Unlock() + + if len(m.keyNotifications) == 0 { + return nil + } + newMap := make(map[string]struct{}) + notifs := m.keyNotifications + m.keyNotifications = newMap + return notifs + } + + for key := range newNotifs() { + m.notifyWatchersSync(key) + } +} + +// notifyWatcherSync immediately sends notification to all watchers of given key. +func (m *KV) notifyWatchersSync(key string) { m.watchersMu.Lock() defer m.watchersMu.Unlock() @@ -1018,14 +1084,16 @@ func (m *KV) trySingleCas(key string, codec codec.Codec, f func(in interface{}) } // Don't even try - r, ok := out.(Mergeable) - if !ok || r == nil { + incomingValue, ok := out.(Mergeable) + if !ok || incomingValue == nil { return nil, 0, retry, fmt.Errorf("invalid type: %T, expected Mergeable", out) } // To support detection of removed items from value, we will only allow CAS operation to // succeed if version hasn't changed, i.e. state hasn't changed since running 'f'. - change, newver, err := m.mergeValueForKey(key, r, ver, codec) + // Supplied function may have kept a reference to the returned "incoming value". + // If KV store will keep this value as well, it needs to make a clone. + change, newver, err := m.mergeValueForKey(key, incomingValue, true, ver, codec) if err == errVersionMismatch { return nil, 0, retry, err } @@ -1379,14 +1447,15 @@ func (m *KV) mergeBytesValueForKey(key string, incomingData []byte, codec codec. return nil, 0, fmt.Errorf("expected Mergeable, got: %T", decodedValue) } - return m.mergeValueForKey(key, incomingValue, 0, codec) + // No need to clone this "incomingValue", since we have just decoded it from bytes, and won't be using it. + return m.mergeValueForKey(key, incomingValue, false, 0, codec) } // Merges incoming value with value we have in our store. Returns "a change" that can be sent to other // cluster members to update their state, and new version of the value. // If CAS version is specified, then merging will fail if state has changed already, and errVersionMismatch is reported. // If no modification occurred, new version is 0. 
-func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, casVersion uint, codec codec.Codec) (Mergeable, uint, error) { +func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, incomingValueRequiresClone bool, casVersion uint, codec codec.Codec) (Mergeable, uint, error) { m.storeMu.Lock() defer m.storeMu.Unlock() @@ -1398,7 +1467,7 @@ func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, casVersion ui if casVersion > 0 && curr.Version != casVersion { return nil, 0, errVersionMismatch } - result, change, err := computeNewValue(incomingValue, curr.value, casVersion > 0) + result, change, err := computeNewValue(incomingValue, incomingValueRequiresClone, curr.value, casVersion > 0) if err != nil { return nil, 0, err } @@ -1441,8 +1510,16 @@ func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, casVersion ui } // returns [result, change, error] -func computeNewValue(incoming Mergeable, oldVal Mergeable, cas bool) (Mergeable, Mergeable, error) { +func computeNewValue(incoming Mergeable, incomingValueRequiresClone bool, oldVal Mergeable, cas bool) (Mergeable, Mergeable, error) { if oldVal == nil { + // It's OK to return the same value twice (once as result, once as change), because "change" will be cloned + // in mergeValueForKey if needed. + + if incomingValueRequiresClone { + clone := incoming.Clone() + return clone, clone, nil + } + return incoming, incoming, nil } diff --git a/vendor/github.com/grafana/dskit/kv/memberlist/tcp_transport.go b/vendor/github.com/grafana/dskit/kv/memberlist/tcp_transport.go index 751ad1163a9..241d25b7174 100644 --- a/vendor/github.com/grafana/dskit/kv/memberlist/tcp_transport.go +++ b/vendor/github.com/grafana/dskit/kv/memberlist/tcp_transport.go @@ -19,7 +19,6 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" - "go.uber.org/atomic" dstls "github.com/grafana/dskit/crypto/tls" "github.com/grafana/dskit/flagext" @@ -52,7 +51,13 @@ type TCPTransportConfig struct { // Timeout for writing packet data. Zero = no timeout. PacketWriteTimeout time.Duration `yaml:"packet_write_timeout" category:"advanced"` - // Transport logs lot of messages at debug level, so it deserves an extra flag for turning it on + // Maximum number of concurrent writes to other nodes. + MaxConcurrentWrites int `yaml:"max_concurrent_writes" category:"advanced"` + + // Timeout for acquiring one of the concurrent write slots. + AcquireWriterTimeout time.Duration `yaml:"acquire_writer_timeout" category:"advanced"` + + // Transport logs lots of messages at debug level, so it deserves an extra flag for turning it on TransportDebug bool `yaml:"-" category:"advanced"` // Where to put custom metrics. nil = don't register. 
@@ -73,12 +78,19 @@ func (cfg *TCPTransportConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix s f.IntVar(&cfg.BindPort, prefix+"memberlist.bind-port", 7946, "Port to listen on for gossip messages.") f.DurationVar(&cfg.PacketDialTimeout, prefix+"memberlist.packet-dial-timeout", 2*time.Second, "Timeout used when connecting to other nodes to send packet.") f.DurationVar(&cfg.PacketWriteTimeout, prefix+"memberlist.packet-write-timeout", 5*time.Second, "Timeout for writing 'packet' data.") + f.IntVar(&cfg.MaxConcurrentWrites, prefix+"memberlist.max-concurrent-writes", 3, "Maximum number of concurrent writes to other nodes.") + f.DurationVar(&cfg.AcquireWriterTimeout, prefix+"memberlist.acquire-writer-timeout", 250*time.Millisecond, "Timeout for acquiring one of the concurrent write slots. After this time, the message will be dropped.") f.BoolVar(&cfg.TransportDebug, prefix+"memberlist.transport-debug", false, "Log debug transport messages. Note: global log.level must be at debug level as well.") f.BoolVar(&cfg.TLSEnabled, prefix+"memberlist.tls-enabled", false, "Enable TLS on the memberlist transport layer.") cfg.TLS.RegisterFlagsWithPrefix(prefix+"memberlist", f) } +type writeRequest struct { + b []byte + addr string +} + // TCPTransport is a memberlist.Transport implementation that uses TCP for both packet and stream // operations ("packet" and "stream" are terms used by memberlist). // It uses a new TCP connections for each operation. There is no connection reuse. @@ -91,7 +103,11 @@ type TCPTransport struct { tcpListeners []net.Listener tlsConfig *tls.Config - shutdown atomic.Int32 + shutdownMu sync.RWMutex + shutdown bool + writeCh chan writeRequest // this channel is protected by shutdownMu + + writeWG sync.WaitGroup advertiseMu sync.RWMutex advertiseAddr string @@ -107,6 +123,7 @@ type TCPTransport struct { sentPackets prometheus.Counter sentPacketsBytes prometheus.Counter sentPacketsErrors prometheus.Counter + droppedPackets prometheus.Counter unknownConnections prometheus.Counter } @@ -119,11 +136,21 @@ func NewTCPTransport(config TCPTransportConfig, logger log.Logger, registerer pr // Build out the new transport. var ok bool + concurrentWrites := config.MaxConcurrentWrites + if concurrentWrites <= 0 { + concurrentWrites = 1 + } t := TCPTransport{ cfg: config, logger: log.With(logger, "component", "memberlist TCPTransport"), packetCh: make(chan *memberlist.Packet), connCh: make(chan net.Conn), + writeCh: make(chan writeRequest), + } + + for i := 0; i < concurrentWrites; i++ { + t.writeWG.Add(1) + go t.writeWorker() } var err error @@ -205,7 +232,10 @@ func (t *TCPTransport) tcpListen(tcpLn net.Listener) { for { conn, err := tcpLn.Accept() if err != nil { - if s := t.shutdown.Load(); s == 1 { + t.shutdownMu.RLock() + isShuttingDown := t.shutdown + t.shutdownMu.RUnlock() + if isShuttingDown { break } @@ -424,29 +454,50 @@ func (t *TCPTransport) getAdvertisedAddr() string { // WriteTo is a packet-oriented interface that fires off the given // payload to the given address. func (t *TCPTransport) WriteTo(b []byte, addr string) (time.Time, error) { - t.sentPackets.Inc() - t.sentPacketsBytes.Add(float64(len(b))) - - err := t.writeTo(b, addr) - if err != nil { - t.sentPacketsErrors.Inc() - - logLevel := level.Warn(t.logger) - if strings.Contains(err.Error(), "connection refused") { - // The connection refused is a common error that could happen during normal operations when a node - // shutdown (or crash). It shouldn't be considered a warning condition on the sender side. 
- logLevel = t.debugLog() - } - logLevel.Log("msg", "WriteTo failed", "addr", addr, "err", err) + t.shutdownMu.RLock() + defer t.shutdownMu.RUnlock() // Unlock at the end to protect the chan + if t.shutdown { + return time.Time{}, errors.New("transport is shutting down") + } + // Send the packet to the write workers + // If this blocks for too long (as configured), abort and log an error. + select { + case <-time.After(t.cfg.AcquireWriterTimeout): + // Dropped packets are not an issue, the memberlist protocol will retry later. + level.Debug(t.logger).Log("msg", "WriteTo failed to acquire a writer. Dropping message", "timeout", t.cfg.AcquireWriterTimeout, "addr", addr) + t.droppedPackets.Inc() // WriteTo is used to send "UDP" packets. Since we use TCP, we can detect more errors, // but memberlist library doesn't seem to cope with that very well. That is why we return nil instead. return time.Now(), nil + case t.writeCh <- writeRequest{b: b, addr: addr}: + // OK } return time.Now(), nil } +func (t *TCPTransport) writeWorker() { + defer t.writeWG.Done() + for req := range t.writeCh { + b, addr := req.b, req.addr + t.sentPackets.Inc() + t.sentPacketsBytes.Add(float64(len(b))) + err := t.writeTo(b, addr) + if err != nil { + t.sentPacketsErrors.Inc() + + logLevel := level.Warn(t.logger) + if strings.Contains(err.Error(), "connection refused") { + // The connection refused is a common error that could happen during normal operations when a node + // shutdown (or crash). It shouldn't be considered a warning condition on the sender side. + logLevel = t.debugLog() + } + logLevel.Log("msg", "WriteTo failed", "addr", addr, "err", err) + } + } +} + func (t *TCPTransport) writeTo(b []byte, addr string) error { // Open connection, write packet header and data, data hash, close. Simple. c, err := t.getConnection(addr, t.cfg.PacketDialTimeout) @@ -559,17 +610,31 @@ func (t *TCPTransport) StreamCh() <-chan net.Conn { // Shutdown is called when memberlist is shutting down; this gives the // transport a chance to clean up any listeners. +// This will avoid log spam about errors when we shut down. func (t *TCPTransport) Shutdown() error { + t.shutdownMu.Lock() // This will avoid log spam about errors when we shut down. - t.shutdown.Store(1) + if t.shutdown { + t.shutdownMu.Unlock() + return nil // already shut down + } + + // Set the shutdown flag and close the write channel. + t.shutdown = true + close(t.writeCh) + t.shutdownMu.Unlock() // Rip through all the connections and shut them down. for _, conn := range t.tcpListeners { _ = conn.Close() } + // Wait until all write workers have finished. + t.writeWG.Wait() + // Block until all the listener threads have died. t.wg.Wait() + return nil } @@ -618,6 +683,13 @@ func (t *TCPTransport) registerMetrics(registerer prometheus.Registerer) { Help: "Number of errors when receiving memberlist packets", }) + t.droppedPackets = promauto.With(registerer).NewCounter(prometheus.CounterOpts{ + Namespace: t.cfg.MetricsNamespace, + Subsystem: subsystem, + Name: "packets_dropped_total", + Help: "Number of dropped memberlist packets. 
These packets were not sent due to timeout waiting for a writer.", + }) + t.sentPackets = promauto.With(registerer).NewCounter(prometheus.CounterOpts{ Namespace: t.cfg.MetricsNamespace, Subsystem: subsystem, diff --git a/vendor/github.com/grafana/dskit/kv/mock.go b/vendor/github.com/grafana/dskit/kv/mock.go index 59d7430676c..99c84e58d6e 100644 --- a/vendor/github.com/grafana/dskit/kv/mock.go +++ b/vendor/github.com/grafana/dskit/kv/mock.go @@ -5,6 +5,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" + "go.uber.org/atomic" ) // The mockClient does not anything. @@ -37,3 +38,63 @@ func (m mockClient) WatchKey(_ context.Context, _ string, _ func(interface{}) bo func (m mockClient) WatchPrefix(_ context.Context, _ string, _ func(string, interface{}) bool) { } + +// MockCountingClient is a wrapper around the Client interface that counts the number of times its functions are called. +// This is used for testing only. +type MockCountingClient struct { + client Client + + ListCalls *atomic.Uint32 + GetCalls *atomic.Uint32 + DeleteCalls *atomic.Uint32 + CASCalls *atomic.Uint32 + WatchKeyCalls *atomic.Uint32 + WatchPrefixCalls *atomic.Uint32 +} + +func NewMockCountingClient(client Client) *MockCountingClient { + return &MockCountingClient{ + client: client, + ListCalls: atomic.NewUint32(0), + GetCalls: atomic.NewUint32(0), + DeleteCalls: atomic.NewUint32(0), + CASCalls: atomic.NewUint32(0), + WatchKeyCalls: atomic.NewUint32(0), + WatchPrefixCalls: atomic.NewUint32(0), + } +} + +func (mc *MockCountingClient) List(ctx context.Context, prefix string) ([]string, error) { + mc.ListCalls.Inc() + + return mc.client.List(ctx, prefix) +} +func (mc *MockCountingClient) Get(ctx context.Context, key string) (interface{}, error) { + mc.GetCalls.Inc() + + return mc.client.Get(ctx, key) +} + +func (mc *MockCountingClient) Delete(ctx context.Context, key string) error { + mc.DeleteCalls.Inc() + + return mc.client.Delete(ctx, key) +} + +func (mc *MockCountingClient) CAS(ctx context.Context, key string, f func(in interface{}) (out interface{}, retry bool, err error)) error { + mc.CASCalls.Inc() + + return mc.client.CAS(ctx, key, f) +} + +func (mc *MockCountingClient) WatchKey(ctx context.Context, key string, f func(interface{}) bool) { + mc.WatchKeyCalls.Inc() + + mc.client.WatchKey(ctx, key, f) +} + +func (mc *MockCountingClient) WatchPrefix(ctx context.Context, key string, f func(string, interface{}) bool) { + mc.WatchPrefixCalls.Inc() + + mc.client.WatchPrefix(ctx, key, f) +} diff --git a/vendor/github.com/grafana/dskit/middleware/http_tracing.go b/vendor/github.com/grafana/dskit/middleware/http_tracing.go index d75535ebe38..b7dfe2d59fc 100644 --- a/vendor/github.com/grafana/dskit/middleware/http_tracing.go +++ b/vendor/github.com/grafana/dskit/middleware/http_tracing.go @@ -38,6 +38,11 @@ func (t Tracer) Wrap(next http.Handler) http.Handler { sp.SetTag("http.user_agent", userAgent) } + // add the content type, useful when query requests are sent as POST + if ct := r.Header.Get("Content-Type"); ct != "" { + sp.SetTag("http.content_type", ct) + } + // add a tag with the client's sourceIPs to the span, if a // SourceIPExtractor is given. 
if t.SourceIPs != nil { diff --git a/vendor/github.com/grafana/dskit/middleware/logging.go b/vendor/github.com/grafana/dskit/middleware/logging.go index c2306292b3f..920976b3ce2 100644 --- a/vendor/github.com/grafana/dskit/middleware/logging.go +++ b/vendor/github.com/grafana/dskit/middleware/logging.go @@ -94,14 +94,25 @@ func (l Log) Wrap(next http.Handler) http.Handler { if writeErr != nil { if errors.Is(writeErr, context.Canceled) { if l.LogRequestAtInfoLevel { - level.Info(requestLog).Log("msg", dskit_log.LazySprintf("%s %s %s, request cancelled: %s ws: %v; %s", r.Method, uri, time.Since(begin), writeErr, IsWSHandshakeRequest(r), headers)) + if l.LogRequestHeaders && headers != nil { + level.Info(requestLog).Log("msg", dskit_log.LazySprintf("%s %s %s, request cancelled: %s ws: %v; %s", r.Method, uri, time.Since(begin), writeErr, IsWSHandshakeRequest(r), headers)) + } else { + level.Info(requestLog).Log("msg", dskit_log.LazySprintf("%s %s %s, request cancelled: %s ws: %v", r.Method, uri, time.Since(begin), writeErr, IsWSHandshakeRequest(r))) + } } else { - level.Debug(requestLog).Log("msg", dskit_log.LazySprintf("%s %s %s, request cancelled: %s ws: %v; %s", r.Method, uri, time.Since(begin), writeErr, IsWSHandshakeRequest(r), headers)) + if l.LogRequestHeaders && headers != nil { + level.Debug(requestLog).Log("msg", dskit_log.LazySprintf("%s %s %s, request cancelled: %s ws: %v; %s", r.Method, uri, time.Since(begin), writeErr, IsWSHandshakeRequest(r), headers)) + } else { + level.Debug(requestLog).Log("msg", dskit_log.LazySprintf("%s %s %s, request cancelled: %s ws: %v", r.Method, uri, time.Since(begin), writeErr, IsWSHandshakeRequest(r))) + } } } else { - level.Warn(requestLog).Log("msg", dskit_log.LazySprintf("%s %s %s, error: %s ws: %v; %s", r.Method, uri, time.Since(begin), writeErr, IsWSHandshakeRequest(r), headers)) + if l.LogRequestHeaders && headers != nil { + level.Warn(requestLog).Log("msg", dskit_log.LazySprintf("%s %s %s, error: %s ws: %v; %s", r.Method, uri, time.Since(begin), writeErr, IsWSHandshakeRequest(r), headers)) + } else { + level.Warn(requestLog).Log("msg", dskit_log.LazySprintf("%s %s %s, error: %s ws: %v", r.Method, uri, time.Since(begin), writeErr, IsWSHandshakeRequest(r))) + } } - return } @@ -125,7 +136,11 @@ func (l Log) Wrap(next http.Handler) http.Handler { } } default: - level.Warn(requestLog).Log("msg", dskit_log.LazySprintf("%s %s (%d) %s Response: %q ws: %v; %s", r.Method, uri, statusCode, time.Since(begin), buf.Bytes(), IsWSHandshakeRequest(r), headers)) + if l.LogRequestHeaders && headers != nil { + level.Warn(requestLog).Log("msg", dskit_log.LazySprintf("%s %s (%d) %s Response: %q ws: %v; %s", r.Method, uri, statusCode, time.Since(begin), buf.Bytes(), IsWSHandshakeRequest(r), headers)) + } else { + level.Warn(requestLog).Log("msg", dskit_log.LazySprintf("%s %s (%d) %s", r.Method, uri, statusCode, time.Since(begin))) + } } }) } diff --git a/vendor/github.com/grafana/dskit/ring/basic_lifecycler.go b/vendor/github.com/grafana/dskit/ring/basic_lifecycler.go index 0b72ef17117..1675cafac92 100644 --- a/vendor/github.com/grafana/dskit/ring/basic_lifecycler.go +++ b/vendor/github.com/grafana/dskit/ring/basic_lifecycler.go @@ -73,6 +73,8 @@ Rather, it's the delegate's responsibility to call [BasicLifecycler.ChangeState] - The lifecycler will then periodically, based on the [ring.BasicLifecyclerConfig.TokensObservePeriod], attempt to verify that its tokens have been added to the ring, after which it will call 
[ring.BasicLifecyclerDelegate.OnRingInstanceTokens]. - The lifecycler will update they key/value store with heartbeats and state changes based on the [ring.BasicLifecyclerConfig.HeartbeatPeriod], calling [ring.BasicLifecyclerDelegate.OnRingInstanceHeartbeat] each time. - When the BasicLifecycler is stopped, it will call [ring.BasicLifecyclerDelegate.OnRingInstanceStopping]. + +BasicLifecycler does not support read only instances for now. */ type BasicLifecycler struct { *services.BasicService @@ -316,7 +318,7 @@ func (l *BasicLifecycler) registerInstance(ctx context.Context) error { // Always overwrite the instance in the ring (even if already exists) because some properties // may have changed (stated, tokens, zone, address) and even if they didn't the heartbeat at // least did. - instanceDesc = ringDesc.AddIngester(l.cfg.ID, l.cfg.Addr, l.cfg.Zone, tokens, state, registeredAt) + instanceDesc = ringDesc.AddIngester(l.cfg.ID, l.cfg.Addr, l.cfg.Zone, tokens, state, registeredAt, false, time.Time{}) return ringDesc, true, nil }) @@ -443,7 +445,7 @@ func (l *BasicLifecycler) updateInstance(ctx context.Context, update func(*Desc, // a resharding of tenants among instances: to guarantee query correctness we need to update the // registration timestamp to current time. registeredAt := time.Now() - instanceDesc = ringDesc.AddIngester(l.cfg.ID, l.cfg.Addr, l.cfg.Zone, l.GetTokens(), l.GetState(), registeredAt) + instanceDesc = ringDesc.AddIngester(l.cfg.ID, l.cfg.Addr, l.cfg.Zone, l.GetTokens(), l.GetState(), registeredAt, false, time.Time{}) } prevTimestamp := instanceDesc.Timestamp diff --git a/vendor/github.com/grafana/dskit/ring/batch.go b/vendor/github.com/grafana/dskit/ring/batch.go index f982bd6c68c..e107cab830f 100644 --- a/vendor/github.com/grafana/dskit/ring/batch.go +++ b/vendor/github.com/grafana/dskit/ring/batch.go @@ -131,7 +131,7 @@ func DoBatchWithOptions(ctx context.Context, op Operation, r DoBatchRing, keys [ // Get call below takes ~1 microsecond for ~500 instances. // Checking every 10K calls would be every 10ms. if i%10e3 == 0 { - if err := ctx.Err(); err != nil { + if err := context.Cause(ctx); err != nil { o.Cleanup() return err } @@ -161,7 +161,7 @@ func DoBatchWithOptions(ctx context.Context, op Operation, r DoBatchRing, keys [ } // One last check before calling the callbacks: it doesn't make sense if context is canceled. - if err := ctx.Err(); err != nil { + if err := context.Cause(ctx); err != nil { o.Cleanup() return err } @@ -196,7 +196,7 @@ func DoBatchWithOptions(ctx context.Context, op Operation, r DoBatchRing, keys [ case <-tracker.done: return nil case <-ctx.Done(): - return ctx.Err() + return context.Cause(ctx) } } diff --git a/vendor/github.com/grafana/dskit/ring/lifecycler.go b/vendor/github.com/grafana/dskit/ring/lifecycler.go index 7c54eabdd87..083f112bdf1 100644 --- a/vendor/github.com/grafana/dskit/ring/lifecycler.go +++ b/vendor/github.com/grafana/dskit/ring/lifecycler.go @@ -147,10 +147,12 @@ type Lifecycler struct { // We need to remember the ingester state, tokens and registered timestamp just in case the KV store // goes away and comes back empty. The state changes during lifecycle of instance. 
- stateMtx sync.RWMutex - state InstanceState - tokens Tokens - registeredAt time.Time + stateMtx sync.RWMutex + state InstanceState + tokens Tokens + registeredAt time.Time + readOnly bool + readOnlyLastUpdated time.Time // Controls the ready-reporting readyLock sync.Mutex @@ -161,6 +163,7 @@ type Lifecycler struct { countersLock sync.RWMutex healthyInstancesCount int instancesCount int + readOnlyInstancesCount int healthyInstancesInZoneCount int instancesInZoneCount int zonesCount int @@ -349,6 +352,26 @@ func (i *Lifecycler) ChangeState(ctx context.Context, state InstanceState) error return <-errCh } +func (i *Lifecycler) ChangeReadOnlyState(ctx context.Context, readOnly bool) error { + errCh := make(chan error) + fn := func() { + prevReadOnly, _ := i.GetReadOnlyState() + if prevReadOnly == readOnly { + errCh <- nil + return + } + + level.Info(i.logger).Log("msg", "changing read-only state of instance in the ring", "readOnly", readOnly, "ring", i.RingName) + i.setReadOnlyState(readOnly, time.Now()) + errCh <- i.updateConsul(ctx) + } + + if err := i.sendToLifecyclerLoop(fn); err != nil { + return err + } + return <-errCh +} + func (i *Lifecycler) getTokens() Tokens { i.stateMtx.RLock() defer i.stateMtx.RUnlock() @@ -379,6 +402,26 @@ func (i *Lifecycler) setRegisteredAt(registeredAt time.Time) { i.registeredAt = registeredAt } +// GetReadOnlyState returns the read-only state of this instance -- whether instance is read-only, and when what the last +// update of read-only state (possibly zero). +func (i *Lifecycler) GetReadOnlyState() (bool, time.Time) { + i.stateMtx.RLock() + defer i.stateMtx.RUnlock() + return i.readOnly, i.readOnlyLastUpdated +} + +func (i *Lifecycler) setReadOnlyState(readOnly bool, readOnlyLastUpdated time.Time) { + i.stateMtx.Lock() + defer i.stateMtx.Unlock() + i.readOnly = readOnly + i.readOnlyLastUpdated = readOnlyLastUpdated + if readOnly { + i.lifecyclerMetrics.readonly.Set(1) + } else { + i.lifecyclerMetrics.readonly.Set(0) + } +} + // ClaimTokensFor takes all the tokens for the supplied ingester and assigns them to this ingester. // // For this method to work correctly (especially when using gossiping), source ingester (specified by @@ -442,6 +485,14 @@ func (i *Lifecycler) InstancesCount() int { return i.instancesCount } +// ReadOnlyInstancesCount returns the total number of instances in the ring that are read only, updated during the last heartbeat period. +func (i *Lifecycler) ReadOnlyInstancesCount() int { + i.countersLock.RLock() + defer i.countersLock.RUnlock() + + return i.readOnlyInstancesCount +} + // HealthyInstancesInZoneCount returns the number of healthy instances in the ring that are registered in // this lifecycler's zone, updated during the last heartbeat period. func (i *Lifecycler) HealthyInstancesInZoneCount() int { @@ -629,10 +680,11 @@ func (i *Lifecycler) initRing(ctx context.Context) error { instanceDesc, ok := ringDesc.Ingesters[i.ID] if !ok { - // The instance doesn't exist in the ring, so it's safe to set the registered timestamp - // as of now. - registeredAt := time.Now() - i.setRegisteredAt(registeredAt) + now := time.Now() + // The instance doesn't exist in the ring, so it's safe to set the registered timestamp as of now. + i.setRegisteredAt(now) + // Clear read-only state, and set last update time to "zero". + i.setReadOnlyState(false, time.Time{}) // We use the tokens from the file only if it does not exist in the ring yet. 
if len(tokensFromFile) > 0 { @@ -640,14 +692,16 @@ func (i *Lifecycler) initRing(ctx context.Context) error { if len(tokensFromFile) >= i.cfg.NumTokens { i.setState(ACTIVE) } - ringDesc.AddIngester(i.ID, i.Addr, i.Zone, tokensFromFile, i.GetState(), registeredAt) + ro, rots := i.GetReadOnlyState() + ringDesc.AddIngester(i.ID, i.Addr, i.Zone, tokensFromFile, i.GetState(), i.getRegisteredAt(), ro, rots) i.setTokens(tokensFromFile) return ringDesc, true, nil } // Either we are a new ingester, or consul must have restarted level.Info(i.logger).Log("msg", "instance not found in ring, adding with no tokens", "ring", i.RingName) - ringDesc.AddIngester(i.ID, i.Addr, i.Zone, []uint32{}, i.GetState(), registeredAt) + ro, rots := i.GetReadOnlyState() + ringDesc.AddIngester(i.ID, i.Addr, i.Zone, []uint32{}, i.GetState(), i.getRegisteredAt(), ro, rots) return ringDesc, true, nil } @@ -655,6 +709,9 @@ func (i *Lifecycler) initRing(ctx context.Context) error { // but we need to update the local state accordingly. i.setRegisteredAt(instanceDesc.GetRegisteredAt()) + // Set lifecycler read-only state from ring entry. We will not modify ring entry's read-only state. + i.setReadOnlyState(instanceDesc.GetReadOnlyState()) + // If the ingester is in the JOINING state this means it crashed due to // a failed token transfer or some other reason during startup. We want // to set it back to PENDING in order to start the lifecycle from the @@ -667,8 +724,8 @@ func (i *Lifecycler) initRing(ctx context.Context) error { } tokens := Tokens(instanceDesc.Tokens) - level.Info(i.logger).Log("msg", "existing instance found in ring", "state", instanceDesc.State, "tokens", - len(tokens), "ring", i.RingName) + ro, rots := instanceDesc.GetReadOnlyState() + level.Info(i.logger).Log("msg", "existing instance found in ring", "state", instanceDesc.State, "tokens", len(tokens), "ring", i.RingName, "readOnly", ro, "readOnlyStateUpdate", rots) // If the ingester fails to clean its ring entry up or unregister_on_shutdown=false, it can leave behind its // ring state as LEAVING. Make sure to switch to the ACTIVE state. @@ -747,7 +804,8 @@ func (i *Lifecycler) verifyTokens(ctx context.Context) bool { ringTokens = append(ringTokens, newTokens...) 
sort.Sort(ringTokens) - ringDesc.AddIngester(i.ID, i.Addr, i.Zone, ringTokens, i.GetState(), i.getRegisteredAt()) + ro, rots := i.GetReadOnlyState() + ringDesc.AddIngester(i.ID, i.Addr, i.Zone, ringTokens, i.GetState(), i.getRegisteredAt(), ro, rots) i.setTokens(ringTokens) @@ -855,7 +913,8 @@ func (i *Lifecycler) autoJoin(ctx context.Context, targetState InstanceState) er sort.Sort(myTokens) i.setTokens(myTokens) - ringDesc.AddIngester(i.ID, i.Addr, i.Zone, i.getTokens(), i.GetState(), i.getRegisteredAt()) + ro, rots := i.GetReadOnlyState() + ringDesc.AddIngester(i.ID, i.Addr, i.Zone, i.getTokens(), i.GetState(), i.getRegisteredAt(), ro, rots) return ringDesc, true, nil }) @@ -889,7 +948,8 @@ func (i *Lifecycler) updateConsul(ctx context.Context) error { tokens = instanceDesc.Tokens } - ringDesc.AddIngester(i.ID, i.Addr, i.Zone, tokens, i.GetState(), i.getRegisteredAt()) + ro, rots := i.GetReadOnlyState() + ringDesc.AddIngester(i.ID, i.Addr, i.Zone, tokens, i.GetState(), i.getRegisteredAt(), ro, rots) return ringDesc, true, nil }) @@ -922,6 +982,7 @@ func (i *Lifecycler) changeState(ctx context.Context, state InstanceState) error func (i *Lifecycler) updateCounters(ringDesc *Desc) { healthyInstancesCount := 0 instancesCount := 0 + readOnlyInstancesCount := 0 zones := map[string]int{} healthyInstancesInZone := map[string]int{} @@ -931,6 +992,9 @@ func (i *Lifecycler) updateCounters(ringDesc *Desc) { for _, ingester := range ringDesc.Ingesters { zones[ingester.Zone]++ instancesCount++ + if ingester.ReadOnly { + readOnlyInstancesCount++ + } // Count the number of healthy instances for Write operation. if ingester.IsHealthy(Write, i.cfg.RingConfig.HeartbeatTimeout, now) { @@ -944,6 +1008,7 @@ func (i *Lifecycler) updateCounters(ringDesc *Desc) { i.countersLock.Lock() i.healthyInstancesCount = healthyInstancesCount i.instancesCount = instancesCount + i.readOnlyInstancesCount = readOnlyInstancesCount i.healthyInstancesInZoneCount = healthyInstancesInZone[i.cfg.Zone] i.instancesInZoneCount = zones[i.cfg.Zone] i.zonesCount = len(zones) diff --git a/vendor/github.com/grafana/dskit/ring/lifecycler_metrics.go b/vendor/github.com/grafana/dskit/ring/lifecycler_metrics.go index fe29cdfd5fc..e5f85e4e423 100644 --- a/vendor/github.com/grafana/dskit/ring/lifecycler_metrics.go +++ b/vendor/github.com/grafana/dskit/ring/lifecycler_metrics.go @@ -8,6 +8,7 @@ import ( type LifecyclerMetrics struct { consulHeartbeats prometheus.Counter shutdownDuration *prometheus.HistogramVec + readonly prometheus.Gauge } func NewLifecyclerMetrics(ringName string, reg prometheus.Registerer) *LifecyclerMetrics { @@ -23,6 +24,11 @@ func NewLifecyclerMetrics(ringName string, reg prometheus.Registerer) *Lifecycle Buckets: prometheus.ExponentialBuckets(10, 2, 8), // Biggest bucket is 10*2^(9-1) = 2560, or 42 mins. 
ConstLabels: prometheus.Labels{"name": ringName}, }, []string{"op", "status"}), + readonly: promauto.With(reg).NewGauge(prometheus.GaugeOpts{ + Name: "lifecycler_read_only", + Help: "Set to 1 if this lifecycler's instance entry is in read-only state.", + ConstLabels: prometheus.Labels{"name": ringName}, + }), } } diff --git a/vendor/github.com/grafana/dskit/ring/model.go b/vendor/github.com/grafana/dskit/ring/model.go index 334b027d0f8..c4ba6446693 100644 --- a/vendor/github.com/grafana/dskit/ring/model.go +++ b/vendor/github.com/grafana/dskit/ring/model.go @@ -45,26 +45,30 @@ func NewDesc() *Desc { } } +func timeToUnixSecons(t time.Time) int64 { + if t.IsZero() { + return 0 + } + return t.Unix() +} + // AddIngester adds the given ingester to the ring. Ingester will only use supplied tokens, // any other tokens are removed. -func (d *Desc) AddIngester(id, addr, zone string, tokens []uint32, state InstanceState, registeredAt time.Time) InstanceDesc { +func (d *Desc) AddIngester(id, addr, zone string, tokens []uint32, state InstanceState, registeredAt time.Time, readOnly bool, readOnlyUpdated time.Time) InstanceDesc { if d.Ingesters == nil { d.Ingesters = map[string]InstanceDesc{} } - registeredTimestamp := int64(0) - if !registeredAt.IsZero() { - registeredTimestamp = registeredAt.Unix() - } - ingester := InstanceDesc{ - Id: id, - Addr: addr, - Timestamp: time.Now().Unix(), - RegisteredTimestamp: registeredTimestamp, - State: state, - Tokens: tokens, - Zone: zone, + Id: id, + Addr: addr, + Timestamp: time.Now().Unix(), + State: state, + Tokens: tokens, + Zone: zone, + RegisteredTimestamp: timeToUnixSecons(registeredAt), + ReadOnly: readOnly, + ReadOnlyUpdatedTimestamp: timeToUnixSecons(readOnlyUpdated), } d.Ingesters[id] = ingester @@ -142,6 +146,20 @@ func (i *InstanceDesc) GetRegisteredAt() time.Time { return time.Unix(i.RegisteredTimestamp, 0) } +// GetReadOnlyState returns the read-only state and timestamp of last read-only state update. 
+func (i *InstanceDesc) GetReadOnlyState() (bool, time.Time) { + if i == nil { + return false, time.Time{} + } + + ts := time.Time{} + if i.ReadOnlyUpdatedTimestamp > 0 { + ts = time.Unix(i.ReadOnlyUpdatedTimestamp, 0) + } + + return i.ReadOnly, ts +} + func (i *InstanceDesc) IsHealthy(op Operation, heartbeatTimeout time.Duration, now time.Time) bool { healthy := op.IsInstanceInStateHealthy(i.State) @@ -552,6 +570,53 @@ func (d *Desc) instancesWithTokensCountPerZone() map[string]int { return instancesCountPerZone } +func (d *Desc) writableInstancesWithTokensCount() int { + writableInstancesWithTokensCount := 0 + if d != nil { + for _, ingester := range d.Ingesters { + if len(ingester.Tokens) > 0 && !ingester.ReadOnly { + writableInstancesWithTokensCount++ + } + } + } + return writableInstancesWithTokensCount +} + +func (d *Desc) writableInstancesWithTokensCountPerZone() map[string]int { + instancesCountPerZone := map[string]int{} + if d != nil { + for _, ingester := range d.Ingesters { + if len(ingester.Tokens) > 0 && !ingester.ReadOnly { + instancesCountPerZone[ingester.Zone]++ + } + } + } + return instancesCountPerZone +} + +func (d *Desc) readOnlyInstancesAndOldestReadOnlyUpdatedTimestamp() (int, int64) { + readOnlyInstances := 0 + oldestReadOnlyUpdatedTimestamp := int64(0) + first := true + + if d != nil { + for _, ingester := range d.Ingesters { + if !ingester.ReadOnly { + continue + } + + readOnlyInstances++ + if first { + oldestReadOnlyUpdatedTimestamp = ingester.ReadOnlyUpdatedTimestamp + } else { + oldestReadOnlyUpdatedTimestamp = min(oldestReadOnlyUpdatedTimestamp, ingester.ReadOnlyUpdatedTimestamp) + } + first = false + } + } + return readOnlyInstances, oldestReadOnlyUpdatedTimestamp +} + type CompareResult int // CompareResult responses @@ -600,6 +665,14 @@ func (d *Desc) RingCompare(o *Desc) CompareResult { return Different } + if ing.ReadOnly != oing.ReadOnly { + return Different + } + + if ing.ReadOnlyUpdatedTimestamp != oing.ReadOnlyUpdatedTimestamp { + return Different + } + if len(ing.Tokens) != len(oing.Tokens) { return Different } diff --git a/vendor/github.com/grafana/dskit/ring/partition_instance_ring.go b/vendor/github.com/grafana/dskit/ring/partition_instance_ring.go index 2fb15d8af98..cffa4b2fcc5 100644 --- a/vendor/github.com/grafana/dskit/ring/partition_instance_ring.go +++ b/vendor/github.com/grafana/dskit/ring/partition_instance_ring.go @@ -13,15 +13,25 @@ type PartitionRingReader interface { PartitionRing() *PartitionRing } +type InstanceRingReader interface { + // GetInstance return the InstanceDesc for the given instanceID or an error + // if the instance doesn't exist in the ring. The returned InstanceDesc is NOT a + // deep copy, so the caller should never modify it. + GetInstance(string) (InstanceDesc, error) + + // InstancesCount returns the number of instances in the ring. + InstancesCount() int +} + // PartitionInstanceRing holds a partitions ring and a instances ring, and provide functions // to look up the intersection of the two (e.g. healthy instances by partition). 
type PartitionInstanceRing struct { partitionsRingReader PartitionRingReader - instancesRing *Ring + instancesRing InstanceRingReader heartbeatTimeout time.Duration } -func NewPartitionInstanceRing(partitionsRingWatcher PartitionRingReader, instancesRing *Ring, heartbeatTimeout time.Duration) *PartitionInstanceRing { +func NewPartitionInstanceRing(partitionsRingWatcher PartitionRingReader, instancesRing InstanceRingReader, heartbeatTimeout time.Duration) *PartitionInstanceRing { return &PartitionInstanceRing{ partitionsRingReader: partitionsRingWatcher, instancesRing: instancesRing, @@ -33,7 +43,7 @@ func (r *PartitionInstanceRing) PartitionRing() *PartitionRing { return r.partitionsRingReader.PartitionRing() } -func (r *PartitionInstanceRing) InstanceRing() *Ring { +func (r *PartitionInstanceRing) InstanceRing() InstanceRingReader { return r.instancesRing } diff --git a/vendor/github.com/grafana/dskit/ring/replication_set.go b/vendor/github.com/grafana/dskit/ring/replication_set.go index ffdcf80ab52..ae378202025 100644 --- a/vendor/github.com/grafana/dskit/ring/replication_set.go +++ b/vendor/github.com/grafana/dskit/ring/replication_set.go @@ -316,7 +316,7 @@ func DoUntilQuorumWithoutSuccessfulContextCancellation[T any](ctx context.Contex ext.Error.Set(cfg.Logger.Span, true) } - contextTracker.cancelAllContexts(cancellation.NewErrorf(cause)) + contextTracker.cancelAllContexts(cancellation.NewError(errors.New(cause))) cleanupResultsAlreadyReceived() return nil, err } diff --git a/vendor/github.com/grafana/dskit/ring/ring.go b/vendor/github.com/grafana/dskit/ring/ring.go index e1c1f6a5159..d47eb8fe256 100644 --- a/vendor/github.com/grafana/dskit/ring/ring.go +++ b/vendor/github.com/grafana/dskit/ring/ring.go @@ -20,7 +20,6 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" "github.com/grafana/dskit/flagext" - dsmath "github.com/grafana/dskit/internal/math" "github.com/grafana/dskit/internal/slices" "github.com/grafana/dskit/kv" shardUtil "github.com/grafana/dskit/ring/shard" @@ -36,6 +35,7 @@ const ( ) // ReadRing represents the read interface to the ring. +// Support for read-only instances requires use of ShuffleShard or ShuffleShardWithLookback prior to getting a ReplicationSet. type ReadRing interface { // Get returns n (or more) instances which form the replicas for the given key. // bufDescs, bufHosts and bufZones are slices to be overwritten for the return value @@ -88,6 +88,12 @@ type ReadRing interface { // InstancesWithTokensInZoneCount returns the number of instances in the ring that are registered in given zone and have tokens. InstancesWithTokensInZoneCount(zone string) int + // WritableInstancesWithTokensCount returns the number of writable instances in the ring that have tokens. + WritableInstancesWithTokensCount() int + + // WritableInstancesWithTokensInZoneCount returns the number of writable instances in the ring that are registered in given zone and have tokens. + WritableInstancesWithTokensInZoneCount(zone string) int + // ZonesCount returns the number of zones for which there's at least 1 instance registered in the ring. ZonesCount() int } @@ -167,6 +173,7 @@ type instanceInfo struct { } // Ring is a Service that maintains an in-memory copy of a ring and watches for changes. +// Support for read-only instances requires use of ShuffleShard or ShuffleShardWithLookback prior to getting a ReplicationSet. type Ring struct { services.Service @@ -184,6 +191,12 @@ type Ring struct { // then this value will be 0. 
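The two Writable* methods added to ReadRing above can be consumed without knowing the concrete ring type. A sketch under the assumption that the caller already knows its zone names; the function and its output format are illustrative.

package example

import (
    "fmt"

    "github.com/grafana/dskit/ring"
)

// logWritableCapacity contrasts total token-holding instances with the
// writable (non read-only) subset, overall and per zone.
func logWritableCapacity(r ring.ReadRing, zones []string) {
    fmt.Printf("instances with tokens: %d total, %d writable\n",
        r.InstancesWithTokensCount(), r.WritableInstancesWithTokensCount())
    for _, zone := range zones {
        fmt.Printf("zone %s: %d writable of %d with tokens\n",
            zone, r.WritableInstancesWithTokensInZoneCount(zone), r.InstancesWithTokensInZoneCount(zone))
    }
}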
oldestRegisteredTimestamp int64 + readOnlyInstances *int // Number of instances with ReadOnly flag set. Only valid if not nil. + // Oldest value of ReadOnlyUpdatedTimestamp for read-only instances. If there are no read-only instances, + // or if any read-only instance has ReadOnlyUpdatedTimestamp == 0 (which should not happen), then this value will be 0. + // Only valid if not nil. + oldestReadOnlyUpdatedTimestamp *int64 + // Maps a token with the information of the instance holding it. This map is immutable and // cannot be changed in place because it's shared "as is" between subrings (the only way to // change it is to create a new one and replace it). @@ -202,9 +215,15 @@ type Ring struct { // Number of registered instances per zone. instancesCountPerZone map[string]int - // Nubmber of registered instances with tokens per zone. + // Number of registered instances with tokens per zone. instancesWithTokensCountPerZone map[string]int + // Number of registered instances are writable and have tokens. + writableInstancesWithTokensCount int + + // Number of registered instances with tokens per zone that are writable. + writableInstancesWithTokensCountPerZone map[string]int + // Cache of shuffle-sharded subrings per identifier. Invalidated when topology changes. // If set to nil, no caching is done (used by tests, and subrings). shuffledSubringCache map[subringCacheKey]*Ring @@ -302,7 +321,7 @@ func (r *Ring) starting(ctx context.Context) error { func (r *Ring) loop(ctx context.Context) error { // Update the ring metrics at start of the main loop. r.mtx.Lock() - r.updateRingMetrics(Different) + r.updateRingMetrics() r.mtx.Unlock() r.KVClient.WatchKey(ctx, r.key, func(value interface{}) bool { @@ -343,11 +362,17 @@ func (r *Ring) updateRingState(ringDesc *Desc) { // when watching the ring for updates). 
r.mtx.Lock() r.ringDesc = ringDesc - r.updateRingMetrics(rc) + if rc != Equal { + r.updateRingMetrics() + } r.mtx.Unlock() return } + r.setRingStateFromDesc(ringDesc, true, true, true) +} + +func (r *Ring) setRingStateFromDesc(ringDesc *Desc, updateMetrics, updateRegisteredTimestampCache, updateReadOnlyInstances bool) { now := time.Now() ringTokens := ringDesc.GetTokens() ringTokensByZone := ringDesc.getTokensByZone() @@ -357,6 +382,9 @@ func (r *Ring) updateRingState(ringDesc *Desc) { instancesWithTokensCount := ringDesc.instancesWithTokensCount() instancesCountPerZone := ringDesc.instancesCountPerZone() instancesWithTokensCountPerZone := ringDesc.instancesWithTokensCountPerZone() + writableInstancesWithTokensCount := ringDesc.writableInstancesWithTokensCount() + writableInstancesWithTokensCountPerZone := ringDesc.writableInstancesWithTokensCountPerZone() + readOnlyInstances, oldestReadOnlyUpdatedTimestamp := ringDesc.readOnlyInstancesAndOldestReadOnlyUpdatedTimestamp() r.mtx.Lock() defer r.mtx.Unlock() @@ -368,8 +396,16 @@ func (r *Ring) updateRingState(ringDesc *Desc) { r.instancesWithTokensCount = instancesWithTokensCount r.instancesCountPerZone = instancesCountPerZone r.instancesWithTokensCountPerZone = instancesWithTokensCountPerZone - r.oldestRegisteredTimestamp = oldestRegisteredTimestamp + r.writableInstancesWithTokensCount = writableInstancesWithTokensCount + r.writableInstancesWithTokensCountPerZone = writableInstancesWithTokensCountPerZone + if updateRegisteredTimestampCache { + r.oldestRegisteredTimestamp = oldestRegisteredTimestamp + } r.lastTopologyChange = now + if updateReadOnlyInstances { + r.readOnlyInstances = &readOnlyInstances + r.oldestReadOnlyUpdatedTimestamp = &oldestReadOnlyUpdatedTimestamp + } // Invalidate all cached subrings. if r.shuffledSubringCache != nil { @@ -379,7 +415,9 @@ func (r *Ring) updateRingState(ringDesc *Desc) { r.shuffledSubringWithLookbackCache = make(map[subringCacheKey]cachedSubringWithLookback[*Ring]) } - r.updateRingMetrics(rc) + if updateMetrics { + r.updateRingMetrics() + } } // Get returns n (or more) instances which form the replicas for the given key. @@ -423,7 +461,7 @@ func (r *Ring) findInstancesForKey(key uint32, op Operation, bufDescs []Instance distinctHosts = bufHosts[:0] distinctZones = bufZones[:0] ) - for i := start; len(distinctHosts) < dsmath.Min(maxInstances, n) && len(distinctZones) < maxZones && iterations < len(r.ringTokens); i++ { + for i := start; len(distinctHosts) < min(maxInstances, n) && len(distinctZones) < maxZones && iterations < len(r.ringTokens); i++ { iterations++ // Wrap i around in the ring. i %= len(r.ringTokens) @@ -528,7 +566,7 @@ func (r *Ring) GetReplicationSetForOperation(op Operation) (ReplicationSet, erro // Given data is replicated to RF different zones, we can tolerate a number of // RF/2 failing zones. However, we need to protect from the case the ring currently // contains instances in a number of zones < RF. - numReplicatedZones := dsmath.Min(len(r.ringZones), r.cfg.ReplicationFactor) + numReplicatedZones := min(len(r.ringZones), r.cfg.ReplicationFactor) minSuccessZones := (numReplicatedZones / 2) + 1 maxUnavailableZones = minSuccessZones - 1 @@ -619,11 +657,7 @@ func (r *Desc) CountTokens() map[string]int64 { } // updateRingMetrics updates ring metrics. Caller must be holding the Write lock! 
-func (r *Ring) updateRingMetrics(compareResult CompareResult) { - if compareResult == Equal { - return - } - +func (r *Ring) updateRingMetrics() { numByState := map[string]int{} oldestTimestampByState := map[string]int64{} @@ -651,10 +685,6 @@ func (r *Ring) updateRingMetrics(compareResult CompareResult) { r.oldestTimestampGaugeVec.WithLabelValues(state).Set(float64(timestamp)) } - if compareResult == EqualButStatesAndTimestamps { - return - } - r.totalTokensGauge.Set(float64(len(r.ringTokens))) } @@ -677,17 +707,19 @@ func (r *Ring) updateRingMetrics(compareResult CompareResult) { // // - Shuffling: probabilistically, for a large enough cluster each identifier gets a different // set of instances, with a reduced number of overlapping instances between two identifiers. +// +// Subring returned by this method does not contain instances that have read-only field set. func (r *Ring) ShuffleShard(identifier string, size int) ReadRing { - // Nothing to do if the shard size is not smaller then the actual ring. - if size <= 0 || r.InstancesCount() <= size { - return r - } - if cached := r.getCachedShuffledSubring(identifier, size); cached != nil { return cached } - result := r.shuffleShard(identifier, size, 0, time.Now()) + var result *Ring + if size <= 0 { + result = r.filterOutReadOnlyInstances(0, time.Now()) + } else { + result = r.shuffleShard(identifier, size, 0, time.Now()) + } // Only cache subring if it is different from this ring, to avoid deadlocks in getCachedShuffledSubring, // when we update the cached ring. if result != r { @@ -704,17 +736,20 @@ func (r *Ring) ShuffleShard(identifier string, size int) ReadRing { // // This function supports caching, but the cache will only be effective if successive calls for the // same identifier are with the same lookbackPeriod and increasing values of now. +// +// Subring returned by this method does not contain read-only instances that have changed their state +// before the lookback period. func (r *Ring) ShuffleShardWithLookback(identifier string, size int, lookbackPeriod time.Duration, now time.Time) ReadRing { - // Nothing to do if the shard size is not smaller then the actual ring. - if size <= 0 || r.InstancesCount() <= size { - return r - } - if cached := r.getCachedShuffledSubringWithLookback(identifier, size, lookbackPeriod, now); cached != nil { return cached } - result := r.shuffleShard(identifier, size, lookbackPeriod, now) + var result *Ring + if size <= 0 { + result = r.filterOutReadOnlyInstances(lookbackPeriod, now) + } else { + result = r.shuffleShard(identifier, size, lookbackPeriod, now) + } if result != r { r.setCachedShuffledSubringWithLookback(identifier, size, lookbackPeriod, now, result) @@ -735,6 +770,9 @@ func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Dur // // If any instance had RegisteredTimestamp equal to 0 (it would not cause additional lookup of next instance), // then r.oldestRegisteredTimestamp is zero too, and we skip this optimization. + // + // Even if some instances are read-only, they must have changed their read-only status within lookback window + // (because they were all registered within lookback window), so they would be included in the result. 
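The ShuffleShard changes above are easiest to see from the caller's side: even a shard size of 0 now goes through read-only filtering instead of returning the full ring, while the lookback variant keeps read-only instances that switched state recently. A sketch with an illustrative tenant ID, shard sizes and 12h lookback; the helper itself is not from the patch.

package example

import (
    "time"

    "github.com/grafana/dskit/ring"
)

// shardForTenant builds a write-path and a read-path subring for one tenant.
func shardForTenant(r *ring.Ring, tenantID string, writeSize, readSize int) (write, read ring.ReadRing) {
    // Write path: read-only instances are excluded, even when size <= 0.
    write = r.ShuffleShard(tenantID, writeSize)

    // Read path: instances whose read-only state changed inside the lookback
    // window stay in the subring, so data they may still own remains queryable.
    read = r.ShuffleShardWithLookback(tenantID, readSize, 12*time.Hour, time.Now())
    return write, read
}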
if lookbackPeriod > 0 && r.oldestRegisteredTimestamp > 0 && r.oldestRegisteredTimestamp >= lookbackUntil { return r } @@ -750,13 +788,26 @@ func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Dur actualZones = []string{""} } - shard := make(map[string]InstanceDesc, size) + shard := make(map[string]InstanceDesc, min(len(r.ringDesc.Ingesters), size)) // We need to iterate zones always in the same order to guarantee stability. for _, zone := range actualZones { var tokens []uint32 if r.cfg.ZoneAwarenessEnabled { + // If we're going to include all instances from this zone, we can simply filter out + // unwanted instances, and avoid iterating through tokens. + if numInstancesPerZone >= r.instancesCountPerZone[zone] { + for id, inst := range r.ringDesc.Ingesters { + if inst.Zone == zone && shouldIncludeReadonlyInstanceInTheShard(inst, lookbackPeriod, lookbackUntil) { + shard[id] = inst + } + } + + // We can go to the next zone, no need to iterate tokens. + continue + } + tokens = r.ringTokensByZone[zone] } else { // When zone-awareness is disabled, we just iterate over 1 single fake zone @@ -797,6 +848,11 @@ func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Dur instanceID := info.InstanceID instance := r.ringDesc.Ingesters[instanceID] + + if !shouldIncludeReadonlyInstanceInTheShard(instance, lookbackPeriod, lookbackUntil) { + continue + } + // Include instance in the subring. shard[instanceID] = instance // If the lookback is enabled and this instance has been registered within the lookback period @@ -805,6 +861,15 @@ func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Dur continue } + // If the lookback is enabled, and this instance is read only or has switched its read-only state + // within the lookback period, then we should include it in the subring, but continue selecting more instances. + // + // * If instance switched to read-only state within the lookback period, then next instance is currently receiving data that previously belonged to this instance. + // * If instance switched to read-write state (read-only=false) within the lookback period, then there was another instance that received data that now belongs back to this instance. + if lookbackPeriod > 0 && (instance.ReadOnly || instance.ReadOnlyUpdatedTimestamp >= lookbackUntil) { + continue + } + found = true break } @@ -818,21 +883,72 @@ func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Dur } } - // Build a read-only ring for the shard. + return r.buildRingForTheShard(shard) +} + +// shouldIncludeReadonlyInstanceInTheShard returns true if instance is not read-only, or when it is read-only and should be included in the shuffle shard. +func shouldIncludeReadonlyInstanceInTheShard(instance InstanceDesc, lookbackPeriod time.Duration, lookbackUntil int64) bool { + if !instance.ReadOnly { + return true + } + // The lookbackPeriod is 0 when this function is called by ShuffleShard(). In this case, we want read only instances excluded. + if lookbackPeriod == 0 { + return false + } + // With lookback period >0, read only instances are only included if they have not changed read-only status in the lookback window. + // If ReadOnlyUpdatedTimestamp is not set, we include the instance, and extend the shard later. 
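The inclusion rule described in the comment above and completed just below is easier to follow with concrete numbers. The standalone helper here only restates that unexported logic for illustration and is not part of the patch.

package example

import "time"

// includeReadOnlyInstance mirrors shouldIncludeReadonlyInstanceInTheShard.
// Example: with now = 12:00 and a 12h lookback (window opens at 00:00), an
// instance that became read-only at 06:00 is kept, while one that became
// read-only at 18:00 the previous day is dropped.
func includeReadOnlyInstance(readOnly bool, readOnlyUpdated time.Time, lookback time.Duration, now time.Time) bool {
    if !readOnly {
        return true // writable instances are always eligible
    }
    if lookback == 0 {
        return false // ShuffleShard (write path): drop every read-only instance
    }
    lookbackUntil := now.Add(-lookback).Unix()
    if ts := readOnlyUpdated.Unix(); !readOnlyUpdated.IsZero() && ts > 0 && ts < lookbackUntil {
        return false // switched before the window: its data has already moved on
    }
    return true // switched recently or timestamp unknown: keep it, the shard is extended later
}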
+ if lookbackPeriod > 0 && instance.ReadOnlyUpdatedTimestamp > 0 && instance.ReadOnlyUpdatedTimestamp < lookbackUntil { + return false + } + return true +} + +// filterOutReadOnlyInstances removes all read-only instances from the ring, and returns the resulting ring. +func (r *Ring) filterOutReadOnlyInstances(lookbackPeriod time.Duration, now time.Time) *Ring { + lookbackUntil := now.Add(-lookbackPeriod).Unix() + + r.mtx.RLock() + defer r.mtx.RUnlock() + + // If there are no read-only instances, there's no need to do any filtering. + if r.readOnlyInstances != nil && *r.readOnlyInstances == 0 { + return r + } + + // If all readOnlyUpdatedTimestamp values are within lookback window, we can return the ring without any filtering. + if lookbackPeriod > 0 && r.oldestReadOnlyUpdatedTimestamp != nil && *r.oldestReadOnlyUpdatedTimestamp >= lookbackUntil { + return r + } + + shard := make(map[string]InstanceDesc, len(r.ringDesc.Ingesters)) + + for id, inst := range r.ringDesc.Ingesters { + if shouldIncludeReadonlyInstanceInTheShard(inst, lookbackPeriod, lookbackUntil) { + shard[id] = inst + } + } + + return r.buildRingForTheShard(shard) +} + +// buildRingForTheShard builds read-only ring for the shard (this ring won't be updated in the future). +func (r *Ring) buildRingForTheShard(shard map[string]InstanceDesc) *Ring { shardDesc := &Desc{Ingesters: shard} shardTokensByZone := shardDesc.getTokensByZone() shardTokens := mergeTokenGroups(shardTokensByZone) return &Ring{ - cfg: r.cfg, - strategy: r.strategy, - ringDesc: shardDesc, - ringTokens: shardTokens, - ringTokensByZone: shardTokensByZone, - ringZones: getZones(shardTokensByZone), - instancesWithTokensCount: shardDesc.instancesWithTokensCount(), - instancesCountPerZone: shardDesc.instancesCountPerZone(), - instancesWithTokensCountPerZone: shardDesc.instancesWithTokensCountPerZone(), + cfg: r.cfg, + strategy: r.strategy, + ringDesc: shardDesc, + ringTokens: shardTokens, + ringTokensByZone: shardTokensByZone, + ringZones: getZones(shardTokensByZone), + instancesWithTokensCount: shardDesc.instancesWithTokensCount(), + instancesCountPerZone: shardDesc.instancesCountPerZone(), + instancesWithTokensCountPerZone: shardDesc.instancesWithTokensCountPerZone(), + writableInstancesWithTokensCount: shardDesc.writableInstancesWithTokensCount(), + writableInstancesWithTokensCountPerZone: shardDesc.writableInstancesWithTokensCountPerZone(), oldestRegisteredTimestamp: shardDesc.getOldestRegisteredTimestamp(), @@ -1036,11 +1152,12 @@ func (r *Ring) setCachedShuffledSubringWithLookback(identifier string, size int, validForLookbackWindowsStartingBefore := int64(math.MaxInt64) for _, instance := range subring.ringDesc.Ingesters { - registeredDuringLookbackWindow := instance.RegisteredTimestamp >= lookbackWindowStart - - if registeredDuringLookbackWindow && instance.RegisteredTimestamp < validForLookbackWindowsStartingBefore { + if instance.RegisteredTimestamp >= lookbackWindowStart && instance.RegisteredTimestamp < validForLookbackWindowsStartingBefore { validForLookbackWindowsStartingBefore = instance.RegisteredTimestamp } + if instance.ReadOnlyUpdatedTimestamp >= lookbackWindowStart && instance.ReadOnlyUpdatedTimestamp < validForLookbackWindowsStartingBefore { + validForLookbackWindowsStartingBefore = instance.ReadOnlyUpdatedTimestamp + } } r.mtx.Lock() @@ -1141,6 +1258,22 @@ func (r *Ring) InstancesWithTokensInZoneCount(zone string) int { return r.instancesWithTokensCountPerZone[zone] } +// WritableInstancesWithTokensCount returns the number of writable instances 
in the ring that have tokens. +func (r *Ring) WritableInstancesWithTokensCount() int { + r.mtx.RLock() + defer r.mtx.RUnlock() + + return r.writableInstancesWithTokensCount +} + +// WritableInstancesWithTokensInZoneCount returns the number of writable instances in the ring that are registered in given zone and have tokens. +func (r *Ring) WritableInstancesWithTokensInZoneCount(zone string) int { + r.mtx.RLock() + defer r.mtx.RUnlock() + + return r.writableInstancesWithTokensCountPerZone[zone] +} + func (r *Ring) ZonesCount() int { r.mtx.RLock() defer r.mtx.RUnlock() @@ -1148,6 +1281,19 @@ func (r *Ring) ZonesCount() int { return len(r.ringZones) } +// readOnlyInstanceCount returns the number of read only instances in the ring. +func (r *Ring) readOnlyInstanceCount() int { + r.mtx.RLock() + c := 0 + for _, i := range r.ringDesc.Ingesters { + if i.ReadOnly { + c++ + } + } + r.mtx.RUnlock() + return c +} + // Operation describes which instances can be included in the replica set, based on their state. // // Implemented as bitmap, with upper 16-bits used for encoding extendReplicaSet, and lower 16-bits used for encoding healthy states. diff --git a/vendor/github.com/grafana/dskit/ring/ring.pb.go b/vendor/github.com/grafana/dskit/ring/ring.pb.go index fb9d76c480a..f976b7e994d 100644 --- a/vendor/github.com/grafana/dskit/ring/ring.pb.go +++ b/vendor/github.com/grafana/dskit/ring/ring.pb.go @@ -128,6 +128,15 @@ type InstanceDesc struct { RegisteredTimestamp int64 `protobuf:"varint,8,opt,name=registered_timestamp,json=registeredTimestamp,proto3" json:"registered_timestamp,omitempty"` // ID of the instance. This value is the same as the key in the ingesters map in Desc. Id string `protobuf:"bytes,9,opt,name=id,proto3" json:"id,omitempty"` + // Unix timestamp (with seconds precision) of when the read_only flag was updated. This + // is used to find other instances that could have possibly owned a specific token in + // the past on the write path, due to *this* instance being read-only. This value should + // only increase. + ReadOnlyUpdatedTimestamp int64 `protobuf:"varint,10,opt,name=read_only_updated_timestamp,json=readOnlyUpdatedTimestamp,proto3" json:"read_only_updated_timestamp,omitempty"` + // Indicates whether this instance is read only. + // Read-only instances go through standard state changes, and special handling is applied to them + // during shuffle shards. 
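The field comment above requires read_only_updated_timestamp to only ever increase. A sketch, under the assumption that some component owns and rewrites its own ring entry, of how that invariant can be kept while toggling the flag; how the entry is read from and written back to the KV store is out of scope here.

package example

import (
    "time"

    "github.com/grafana/dskit/ring"
)

// markReadOnly flips the read-only flag and bumps the update timestamp
// monotonically; an unchanged flag leaves the timestamp untouched.
func markReadOnly(inst *ring.InstanceDesc, readOnly bool, now time.Time) {
    if inst.ReadOnly == readOnly {
        return // no state change, keep the existing timestamp
    }
    inst.ReadOnly = readOnly
    if ts := now.Unix(); ts > inst.ReadOnlyUpdatedTimestamp {
        inst.ReadOnlyUpdatedTimestamp = ts // the value should only increase
    }
}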
+ ReadOnly bool `protobuf:"varint,11,opt,name=read_only,json=readOnly,proto3" json:"read_only,omitempty"` } func (m *InstanceDesc) Reset() { *m = InstanceDesc{} } @@ -211,6 +220,20 @@ func (m *InstanceDesc) GetId() string { return "" } +func (m *InstanceDesc) GetReadOnlyUpdatedTimestamp() int64 { + if m != nil { + return m.ReadOnlyUpdatedTimestamp + } + return 0 +} + +func (m *InstanceDesc) GetReadOnly() bool { + if m != nil { + return m.ReadOnly + } + return false +} + func init() { proto.RegisterEnum("ring.InstanceState", InstanceState_name, InstanceState_value) proto.RegisterType((*Desc)(nil), "ring.Desc") @@ -221,35 +244,37 @@ func init() { func init() { proto.RegisterFile("ring.proto", fileDescriptor_26381ed67e202a6e) } var fileDescriptor_26381ed67e202a6e = []byte{ - // 435 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x54, 0x92, 0x41, 0x8b, 0xd3, 0x40, - 0x1c, 0xc5, 0xe7, 0x9f, 0x4c, 0xb3, 0xe9, 0xbf, 0x6e, 0x09, 0xb3, 0x22, 0x71, 0x91, 0x31, 0xec, - 0x29, 0x0a, 0x76, 0xb1, 0x7a, 0x10, 0xc1, 0xc3, 0xae, 0x1b, 0x25, 0xa5, 0xd4, 0x25, 0x96, 0xbd, - 0x4a, 0xda, 0x8c, 0x31, 0xac, 0x4d, 0x96, 0x64, 0x2a, 0xac, 0x27, 0x3f, 0x82, 0x5f, 0xc0, 0xbb, - 0x1f, 0x65, 0x8f, 0x3d, 0xee, 0x49, 0x6c, 0x0a, 0xe2, 0x71, 0x3f, 0x82, 0xcc, 0xa4, 0x5a, 0x7b, - 0x7b, 0xbf, 0xbc, 0x37, 0xef, 0x4d, 0x60, 0x10, 0xcb, 0x2c, 0x4f, 0x7b, 0x17, 0x65, 0x21, 0x0b, - 0x46, 0x95, 0xde, 0x7f, 0x94, 0x66, 0xf2, 0xc3, 0x7c, 0xd2, 0x9b, 0x16, 0xb3, 0xc3, 0xb4, 0x48, - 0x8b, 0x43, 0x6d, 0x4e, 0xe6, 0xef, 0x35, 0x69, 0xd0, 0xaa, 0x39, 0x74, 0xf0, 0x0d, 0x90, 0x9e, - 0x88, 0x6a, 0xca, 0x5e, 0x60, 0x3b, 0xcb, 0x53, 0x51, 0x49, 0x51, 0x56, 0x2e, 0x78, 0xa6, 0xdf, - 0xe9, 0xdf, 0xed, 0xe9, 0x76, 0x65, 0xf7, 0xc2, 0xbf, 0x5e, 0x90, 0xcb, 0xf2, 0xf2, 0x98, 0x5e, - 0xfd, 0xb8, 0x4f, 0xa2, 0xcd, 0x89, 0xfd, 0x53, 0xec, 0x6e, 0x47, 0x98, 0x83, 0xe6, 0xb9, 0xb8, - 0x74, 0xc1, 0x03, 0xbf, 0x1d, 0x29, 0xc9, 0x7c, 0x6c, 0x7d, 0x8a, 0x3f, 0xce, 0x85, 0x6b, 0x78, - 0xe0, 0x77, 0xfa, 0xac, 0xa9, 0x0f, 0xf3, 0x4a, 0xc6, 0xf9, 0x54, 0xa8, 0x99, 0xa8, 0x09, 0x3c, - 0x37, 0x9e, 0xc1, 0x80, 0xda, 0x86, 0x63, 0x1e, 0xfc, 0x02, 0xbc, 0xf5, 0x7f, 0x82, 0x31, 0xa4, - 0x71, 0x92, 0x94, 0xeb, 0x5e, 0xad, 0xd9, 0x3d, 0x6c, 0xcb, 0x6c, 0x26, 0x2a, 0x19, 0xcf, 0x2e, - 0x74, 0xb9, 0x19, 0x6d, 0x3e, 0xb0, 0x07, 0xd8, 0xaa, 0x64, 0x2c, 0x85, 0x6b, 0x7a, 0xe0, 0x77, - 0xfb, 0x7b, 0xdb, 0xb3, 0x6f, 0x95, 0x15, 0x35, 0x09, 0x76, 0x07, 0x2d, 0x59, 0x9c, 0x8b, 0xbc, - 0x72, 0x2d, 0xcf, 0xf4, 0x77, 0xa3, 0x35, 0xa9, 0xd1, 0xcf, 0x45, 0x2e, 0xdc, 0x9d, 0x66, 0x54, - 0x69, 0xf6, 0x18, 0x6f, 0x97, 0x22, 0xcd, 0xd4, 0x1f, 0x8b, 0xe4, 0xdd, 0x66, 0xdf, 0xd6, 0xfb, - 0x7b, 0x1b, 0x6f, 0xfc, 0xef, 0x26, 0x5d, 0x34, 0xb2, 0xc4, 0x6d, 0xeb, 0x12, 0x23, 0x4b, 0x06, - 0xd4, 0xa6, 0x4e, 0x6b, 0x40, 0xed, 0x96, 0x63, 0x3d, 0x1c, 0xe2, 0xee, 0xd6, 0x95, 0x18, 0xa2, - 0x75, 0xf4, 0x72, 0x1c, 0x9e, 0x05, 0x0e, 0x61, 0x1d, 0xdc, 0x19, 0x06, 0x47, 0x67, 0xe1, 0xe8, - 0xb5, 0x03, 0x0a, 0x4e, 0x83, 0xd1, 0x89, 0x02, 0x43, 0xc1, 0xe0, 0x4d, 0x38, 0x52, 0x60, 0x32, - 0x1b, 0xe9, 0x30, 0x78, 0x35, 0x76, 0xe8, 0xf1, 0xd3, 0xc5, 0x92, 0x93, 0xeb, 0x25, 0x27, 0x37, - 0x4b, 0x0e, 0x5f, 0x6a, 0x0e, 0xdf, 0x6b, 0x0e, 0x57, 0x35, 0x87, 0x45, 0xcd, 0xe1, 0x67, 0xcd, - 0xe1, 0x77, 0xcd, 0xc9, 0x4d, 0xcd, 0xe1, 0xeb, 0x8a, 0x93, 0xc5, 0x8a, 0x93, 0xeb, 0x15, 0x27, - 0x13, 0x4b, 0xbf, 0x89, 0x27, 0x7f, 0x02, 0x00, 0x00, 0xff, 0xff, 0xad, 0x7a, 0xa4, 0x89, 0x56, - 0x02, 0x00, 0x00, + // 478 bytes of a gzipped FileDescriptorProto + 0x1f, 
0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x54, 0x92, 0x31, 0x6f, 0xd3, 0x40, + 0x1c, 0xc5, 0x7d, 0xf6, 0xc5, 0xb5, 0xff, 0xa1, 0x91, 0x75, 0x45, 0xc8, 0xb4, 0xe8, 0xb0, 0x3a, + 0x19, 0x24, 0x52, 0x11, 0x18, 0x10, 0x52, 0x87, 0x96, 0x1a, 0xe4, 0x28, 0x4a, 0x2b, 0x13, 0xba, + 0x46, 0x4e, 0x7c, 0x18, 0xab, 0x89, 0x1d, 0xd9, 0x17, 0xa4, 0x30, 0xf1, 0x11, 0xf8, 0x02, 0xec, + 0x7c, 0x0e, 0xa6, 0x8e, 0x19, 0x3b, 0x21, 0xe2, 0x2c, 0x8c, 0xfd, 0x08, 0xe8, 0xce, 0x6d, 0xdd, + 0x6c, 0xef, 0xe5, 0xbd, 0xff, 0xef, 0xe5, 0x24, 0x03, 0xe4, 0x49, 0x1a, 0xb7, 0x67, 0x79, 0xc6, + 0x33, 0x82, 0x85, 0xde, 0x7d, 0x11, 0x27, 0xfc, 0xcb, 0x7c, 0xd4, 0x1e, 0x67, 0xd3, 0x83, 0x38, + 0x8b, 0xb3, 0x03, 0x19, 0x8e, 0xe6, 0x9f, 0xa5, 0x93, 0x46, 0xaa, 0xea, 0x68, 0xff, 0x27, 0x02, + 0x7c, 0xc2, 0x8a, 0x31, 0x39, 0x04, 0x33, 0x49, 0x63, 0x56, 0x70, 0x96, 0x17, 0x36, 0x72, 0x34, + 0xb7, 0xd9, 0x79, 0xdc, 0x96, 0x74, 0x11, 0xb7, 0xfd, 0xdb, 0xcc, 0x4b, 0x79, 0xbe, 0x38, 0xc6, + 0x97, 0x7f, 0x9e, 0x2a, 0x41, 0x7d, 0xb1, 0x7b, 0x06, 0xad, 0xcd, 0x0a, 0xb1, 0x40, 0xbb, 0x60, + 0x0b, 0x1b, 0x39, 0xc8, 0x35, 0x03, 0x21, 0x89, 0x0b, 0x8d, 0xaf, 0xe1, 0x64, 0xce, 0x6c, 0xd5, + 0x41, 0x6e, 0xb3, 0x43, 0x2a, 0xbc, 0x9f, 0x16, 0x3c, 0x4c, 0xc7, 0x4c, 0xcc, 0x04, 0x55, 0xe1, + 0xad, 0xfa, 0x06, 0x75, 0xb1, 0xa1, 0x5a, 0xda, 0xfe, 0x6f, 0x15, 0x1e, 0xdc, 0x6f, 0x10, 0x02, + 0x38, 0x8c, 0xa2, 0xfc, 0x86, 0x2b, 0x35, 0x79, 0x02, 0x26, 0x4f, 0xa6, 0xac, 0xe0, 0xe1, 0x74, + 0x26, 0xe1, 0x5a, 0x50, 0xff, 0x40, 0x9e, 0x41, 0xa3, 0xe0, 0x21, 0x67, 0xb6, 0xe6, 0x20, 0xb7, + 0xd5, 0xd9, 0xd9, 0x9c, 0xfd, 0x28, 0xa2, 0xa0, 0x6a, 0x90, 0x47, 0xa0, 0xf3, 0xec, 0x82, 0xa5, + 0x85, 0xad, 0x3b, 0x9a, 0xbb, 0x1d, 0xdc, 0x38, 0x31, 0xfa, 0x2d, 0x4b, 0x99, 0xbd, 0x55, 0x8d, + 0x0a, 0x4d, 0x5e, 0xc2, 0xc3, 0x9c, 0xc5, 0x89, 0x78, 0x31, 0x8b, 0x86, 0xf5, 0xbe, 0x21, 0xf7, + 0x77, 0xea, 0x6c, 0x70, 0xf7, 0x4f, 0x5a, 0xa0, 0x26, 0x91, 0x6d, 0x4a, 0x88, 0x9a, 0x44, 0xe4, + 0x10, 0xf6, 0x72, 0x16, 0x46, 0xc3, 0x2c, 0x9d, 0x2c, 0x86, 0xf3, 0x59, 0x14, 0xf2, 0x0d, 0x12, + 0x48, 0x92, 0x2d, 0x2a, 0xa7, 0xe9, 0x64, 0xf1, 0xa9, 0x2a, 0xd4, 0xb8, 0x3d, 0x30, 0xef, 0xce, + 0xed, 0xa6, 0x83, 0x5c, 0x23, 0x30, 0x6e, 0xcb, 0x5d, 0x6c, 0x60, 0xab, 0xd1, 0xc5, 0x46, 0xc3, + 0xd2, 0x9f, 0xf7, 0x60, 0x7b, 0xe3, 0xb9, 0x04, 0x40, 0x3f, 0x7a, 0x37, 0xf0, 0xcf, 0x3d, 0x4b, + 0x21, 0x4d, 0xd8, 0xea, 0x79, 0x47, 0xe7, 0x7e, 0xff, 0x83, 0x85, 0x84, 0x39, 0xf3, 0xfa, 0x27, + 0xc2, 0xa8, 0xc2, 0x74, 0x4f, 0xfd, 0xbe, 0x30, 0x1a, 0x31, 0x00, 0xf7, 0xbc, 0xf7, 0x03, 0x0b, + 0x1f, 0xbf, 0x5e, 0xae, 0xa8, 0x72, 0xb5, 0xa2, 0xca, 0xf5, 0x8a, 0xa2, 0xef, 0x25, 0x45, 0xbf, + 0x4a, 0x8a, 0x2e, 0x4b, 0x8a, 0x96, 0x25, 0x45, 0x7f, 0x4b, 0x8a, 0xfe, 0x95, 0x54, 0xb9, 0x2e, + 0x29, 0xfa, 0xb1, 0xa6, 0xca, 0x72, 0x4d, 0x95, 0xab, 0x35, 0x55, 0x46, 0xba, 0xfc, 0xde, 0x5e, + 0xfd, 0x0f, 0x00, 0x00, 0xff, 0xff, 0x6a, 0x5b, 0x75, 0x81, 0xb2, 0x02, 0x00, 0x00, } func (x InstanceState) String() string { @@ -335,6 +360,12 @@ func (this *InstanceDesc) Equal(that interface{}) bool { if this.Id != that1.Id { return false } + if this.ReadOnlyUpdatedTimestamp != that1.ReadOnlyUpdatedTimestamp { + return false + } + if this.ReadOnly != that1.ReadOnly { + return false + } return true } func (this *Desc) GoString() string { @@ -363,7 +394,7 @@ func (this *InstanceDesc) GoString() string { if this == nil { return "nil" } - s := make([]string, 0, 11) + s := make([]string, 0, 13) s = append(s, "&ring.InstanceDesc{") s = append(s, "Addr: "+fmt.Sprintf("%#v", this.Addr)+",\n") s = append(s, 
"Timestamp: "+fmt.Sprintf("%#v", this.Timestamp)+",\n") @@ -372,6 +403,8 @@ func (this *InstanceDesc) GoString() string { s = append(s, "Zone: "+fmt.Sprintf("%#v", this.Zone)+",\n") s = append(s, "RegisteredTimestamp: "+fmt.Sprintf("%#v", this.RegisteredTimestamp)+",\n") s = append(s, "Id: "+fmt.Sprintf("%#v", this.Id)+",\n") + s = append(s, "ReadOnlyUpdatedTimestamp: "+fmt.Sprintf("%#v", this.ReadOnlyUpdatedTimestamp)+",\n") + s = append(s, "ReadOnly: "+fmt.Sprintf("%#v", this.ReadOnly)+",\n") s = append(s, "}") return strings.Join(s, "") } @@ -450,6 +483,21 @@ func (m *InstanceDesc) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.ReadOnly { + i-- + if m.ReadOnly { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i-- + dAtA[i] = 0x58 + } + if m.ReadOnlyUpdatedTimestamp != 0 { + i = encodeVarintRing(dAtA, i, uint64(m.ReadOnlyUpdatedTimestamp)) + i-- + dAtA[i] = 0x50 + } if len(m.Id) > 0 { i -= len(m.Id) copy(dAtA[i:], m.Id) @@ -570,6 +618,12 @@ func (m *InstanceDesc) Size() (n int) { if l > 0 { n += 1 + l + sovRing(uint64(l)) } + if m.ReadOnlyUpdatedTimestamp != 0 { + n += 1 + sovRing(uint64(m.ReadOnlyUpdatedTimestamp)) + } + if m.ReadOnly { + n += 2 + } return n } @@ -611,6 +665,8 @@ func (this *InstanceDesc) String() string { `Zone:` + fmt.Sprintf("%v", this.Zone) + `,`, `RegisteredTimestamp:` + fmt.Sprintf("%v", this.RegisteredTimestamp) + `,`, `Id:` + fmt.Sprintf("%v", this.Id) + `,`, + `ReadOnlyUpdatedTimestamp:` + fmt.Sprintf("%v", this.ReadOnlyUpdatedTimestamp) + `,`, + `ReadOnly:` + fmt.Sprintf("%v", this.ReadOnly) + `,`, `}`, }, "") return s @@ -1063,6 +1119,45 @@ func (m *InstanceDesc) Unmarshal(dAtA []byte) error { } m.Id = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex + case 10: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field ReadOnlyUpdatedTimestamp", wireType) + } + m.ReadOnlyUpdatedTimestamp = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRing + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.ReadOnlyUpdatedTimestamp |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 11: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field ReadOnly", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRing + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.ReadOnly = bool(v != 0) default: iNdEx = preIndex skippy, err := skipRing(dAtA[iNdEx:]) diff --git a/vendor/github.com/grafana/dskit/ring/ring.proto b/vendor/github.com/grafana/dskit/ring/ring.proto index 08bf885096c..7795e8493fc 100644 --- a/vendor/github.com/grafana/dskit/ring/ring.proto +++ b/vendor/github.com/grafana/dskit/ring/ring.proto @@ -44,6 +44,17 @@ message InstanceDesc { // ID of the instance. This value is the same as the key in the ingesters map in Desc. string id = 9; + + // Unix timestamp (with seconds precision) of when the read_only flag was updated. This + // is used to find other instances that could have possibly owned a specific token in + // the past on the write path, due to *this* instance being read-only. This value should + // only increase. + int64 read_only_updated_timestamp = 10; + + // Indicates whether this instance is read only. + // Read-only instances go through standard state changes, and special handling is applied to them + // during shuffle shards. 
+ bool read_only = 11; } enum InstanceState { diff --git a/vendor/github.com/grafana/dskit/ring/ring_http.go b/vendor/github.com/grafana/dskit/ring/ring_http.go index 7300430ddac..67249e2b496 100644 --- a/vendor/github.com/grafana/dskit/ring/ring_http.go +++ b/vendor/github.com/grafana/dskit/ring/ring_http.go @@ -24,9 +24,9 @@ var defaultPageTemplate = template.Must(template.New("webpage").Funcs(template.F if t.IsZero() { return "" } - return t.Format(time.RFC3339Nano) + return t.Format(time.RFC3339) }, - "durationSince": func(t time.Time) string { return time.Since(t).Truncate(time.Millisecond).String() }, + "durationSince": func(t time.Time) string { return time.Since(t).Truncate(time.Second).String() }, }).Parse(defaultPageContent)) type httpResponse struct { @@ -36,15 +36,17 @@ type httpResponse struct { } type ingesterDesc struct { - ID string `json:"id"` - State string `json:"state"` - Address string `json:"address"` - HeartbeatTimestamp time.Time `json:"timestamp"` - RegisteredTimestamp time.Time `json:"registered_timestamp"` - Zone string `json:"zone"` - Tokens []uint32 `json:"tokens"` - NumTokens int `json:"-"` - Ownership float64 `json:"-"` + ID string `json:"id"` + State string `json:"state"` + Address string `json:"address"` + HeartbeatTimestamp time.Time `json:"timestamp"` + RegisteredTimestamp time.Time `json:"registered_timestamp"` + ReadOnly bool `json:"read_only"` + ReadOnlyUpdatedTimestamp time.Time `json:"read_only_updated_timestamp"` + Zone string `json:"zone"` + Tokens []uint32 `json:"tokens"` + NumTokens int `json:"-"` + Ownership float64 `json:"-"` } type ringAccess interface { @@ -110,16 +112,20 @@ func (h *ringPageHandler) handle(w http.ResponseWriter, req *http.Request) { state = "UNHEALTHY" } + ro, rots := ing.GetReadOnlyState() + ingesters = append(ingesters, ingesterDesc{ - ID: id, - State: state, - Address: ing.Addr, - HeartbeatTimestamp: time.Unix(ing.Timestamp, 0).UTC(), - RegisteredTimestamp: ing.GetRegisteredAt().UTC(), - Tokens: ing.Tokens, - Zone: ing.Zone, - NumTokens: len(ing.Tokens), - Ownership: (float64(ownedTokens[id]) / float64(math.MaxUint32)) * 100, + ID: id, + State: state, + Address: ing.Addr, + HeartbeatTimestamp: time.Unix(ing.Timestamp, 0).UTC(), + RegisteredTimestamp: ing.GetRegisteredAt().UTC(), + ReadOnly: ro, + ReadOnlyUpdatedTimestamp: rots.UTC(), + Tokens: ing.Tokens, + Zone: ing.Zone, + NumTokens: len(ing.Tokens), + Ownership: (float64(ownedTokens[id]) / float64(math.MaxUint32)) * 100, }) } diff --git a/vendor/github.com/grafana/dskit/ring/ring_status.gohtml b/vendor/github.com/grafana/dskit/ring/ring_status.gohtml index 80e5b6a8f76..157f8d89e63 100644 --- a/vendor/github.com/grafana/dskit/ring/ring_status.gohtml +++ b/vendor/github.com/grafana/dskit/ring/ring_status.gohtml @@ -18,6 +18,8 @@ State Address Registered At + Read-Only + Read-Only Updated Last Heartbeat Tokens Ownership @@ -36,6 +38,13 @@ {{ .State }} {{ .Address }} {{ .RegisteredTimestamp | timeOrEmptyString }} + {{ if .ReadOnly }} + {{ .ReadOnly }} + {{ .ReadOnlyUpdatedTimestamp | durationSince }} ago ({{ .ReadOnlyUpdatedTimestamp.Format "15:04:05.999" }}) + {{ else }} + + {{ .ReadOnlyUpdatedTimestamp | timeOrEmptyString }} + {{ end }} {{ .HeartbeatTimestamp | durationSince }} ago ({{ .HeartbeatTimestamp.Format "15:04:05.999" }}) {{ .NumTokens }} {{ .Ownership | humanFloat }}% @@ -66,4 +75,4 @@ {{ end }} - \ No newline at end of file + diff --git a/vendor/github.com/grafana/dskit/ring/shard/shard.go b/vendor/github.com/grafana/dskit/ring/shard/shard.go index 
26d695514c2..bbc96670731 100644 --- a/vendor/github.com/grafana/dskit/ring/shard/shard.go +++ b/vendor/github.com/grafana/dskit/ring/shard/shard.go @@ -30,6 +30,9 @@ func ShuffleShardSeed(identifier, zone string) int64 { // zone when zone-aware replication is enabled. The algorithm expects the shard size to be divisible // by the number of zones, in order to have nodes balanced across zones. If it's not, we do round up. func ShuffleShardExpectedInstancesPerZone(shardSize, numZones int) int { + if shardSize == math.MaxInt { + return math.MaxInt + } return int(math.Ceil(float64(shardSize) / float64(numZones))) } diff --git a/vendor/github.com/grafana/dskit/runtimeconfig/manager.go b/vendor/github.com/grafana/dskit/runtimeconfig/manager.go index 84b69de766d..e74d011d995 100644 --- a/vendor/github.com/grafana/dskit/runtimeconfig/manager.go +++ b/vendor/github.com/grafana/dskit/runtimeconfig/manager.go @@ -2,12 +2,14 @@ package runtimeconfig import ( "bytes" + "compress/gzip" "context" "crypto/sha256" "flag" "fmt" "io" "os" + "strings" "sync" "time" @@ -183,8 +185,8 @@ func (om *Manager) loadConfig() error { mergedConfig := map[string]interface{}{} for _, f := range om.cfg.LoadPath { - yamlFile := map[string]interface{}{} - err := yaml.Unmarshal(rawData[f], &yamlFile) + data := rawData[f] + yamlFile, err := om.unmarshalMaybeGzipped(f, data) if err != nil { om.configLoadSuccess.Set(0) return errors.Wrapf(err, "unmarshal file %q", f) @@ -218,6 +220,32 @@ func (om *Manager) loadConfig() error { return nil } +func (om *Manager) unmarshalMaybeGzipped(filename string, data []byte) (map[string]any, error) { + yamlFile := map[string]any{} + if strings.HasSuffix(filename, ".gz") { + r, err := gzip.NewReader(bytes.NewReader(data)) + if err != nil { + return nil, errors.Wrap(err, "read gzipped file") + } + defer r.Close() + err = yaml.NewDecoder(r).Decode(&yamlFile) + return yamlFile, errors.Wrap(err, "uncompress/unmarshal gzipped file") + } + + if err := yaml.Unmarshal(data, &yamlFile); err != nil { + // Give a hint if we think that file is gzipped. + if isGzip(data) { + return nil, errors.Wrap(err, "file looks gzipped but doesn't have a .gz extension") + } + return nil, err + } + return yamlFile, nil +} + +func isGzip(data []byte) bool { + return len(data) > 2 && data[0] == 0x1f && data[1] == 0x8b +} + func mergeConfigMaps(a, b map[string]interface{}) map[string]interface{} { out := make(map[string]interface{}, len(a)) for k, v := range a { diff --git a/vendor/github.com/grafana/dskit/server/server.go b/vendor/github.com/grafana/dskit/server/server.go index a23eead3891..7b8e7593d9e 100644 --- a/vendor/github.com/grafana/dskit/server/server.go +++ b/vendor/github.com/grafana/dskit/server/server.go @@ -31,7 +31,6 @@ import ( "golang.org/x/net/netutil" "google.golang.org/grpc" "google.golang.org/grpc/credentials" - "google.golang.org/grpc/experimental" "google.golang.org/grpc/keepalive" "github.com/grafana/dskit/httpgrpc" @@ -197,7 +196,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.GRPCServerMinTimeBetweenPings, "server.grpc.keepalive.min-time-between-pings", 5*time.Minute, "Minimum amount of time a client should wait before sending a keepalive ping. If client sends keepalive ping more often, server will send GOAWAY and close the connection.") f.BoolVar(&cfg.GRPCServerPingWithoutStreamAllowed, "server.grpc.keepalive.ping-without-stream-allowed", false, "If true, server allows keepalive pings even when there are no active streams(RPCs). 
If false, and client sends ping when there are no active streams, server will send GOAWAY and close the connection.") f.BoolVar(&cfg.GRPCServerStatsTrackingEnabled, "server.grpc.stats-tracking-enabled", true, "If true, the request_message_bytes, response_message_bytes, and inflight_requests metrics will be tracked. Enabling this option prevents the use of memory pools for parsing gRPC request bodies and may lead to more memory allocations.") - f.BoolVar(&cfg.GRPCServerRecvBufferPoolsEnabled, "server.grpc.recv-buffer-pools-enabled", false, "If true, gGPC's buffer pools will be used to handle incoming requests. Enabling this feature can reduce memory allocation, but also requires disabling GRPC server stats tracking by setting `server.grpc.stats-tracking-enabled=false`. This is an experimental gRPC feature, so it might be removed in a future version of the gRPC library.") + f.BoolVar(&cfg.GRPCServerRecvBufferPoolsEnabled, "server.grpc.recv-buffer-pools-enabled", false, "Deprecated option, has no effect and will be removed in a future version.") f.IntVar(&cfg.GRPCServerNumWorkers, "server.grpc.num-workers", 0, "If non-zero, configures the amount of GRPC server workers used to serve the requests.") f.StringVar(&cfg.PathPrefix, "server.path-prefix", "", "Base path to serve all API routes from (e.g. /v1/)") f.StringVar(&cfg.LogFormat, "log.format", log.LogfmtFormat, "Output log messages in the given format. Valid formats: [logfmt, json]") @@ -439,10 +438,7 @@ func newServer(cfg Config, metrics *Metrics) (*Server, error) { } if cfg.GRPCServerRecvBufferPoolsEnabled { - if cfg.GRPCServerStatsTrackingEnabled { - return nil, fmt.Errorf("grpc_server_stats_tracking_enabled must be set to false if grpc_server_recv_buffer_pools_enabled is true") - } - grpcOptions = append(grpcOptions, experimental.RecvBufferPool(grpc.NewSharedBufferPool())) + level.Warn(logger).Log("msg", "'server.grpc.recv-buffer-pools-enabled' is a deprecated option that currently has no effect and will be removed in a future version") } grpcOptions = append(grpcOptions, cfg.GRPCOptions...) diff --git a/vendor/github.com/grafana/dskit/services/basic_service.go b/vendor/github.com/grafana/dskit/services/basic_service.go index 185a4a10a23..6ef464e5c23 100644 --- a/vendor/github.com/grafana/dskit/services/basic_service.go +++ b/vendor/github.com/grafana/dskit/services/basic_service.go @@ -3,7 +3,10 @@ package services import ( "context" "fmt" + "slices" "sync" + + "go.uber.org/atomic" ) // StartingFn is called when service enters Starting state. If StartingFn returns @@ -325,26 +328,59 @@ func (b *BasicService) State() State { } // AddListener is part of Service interface. -func (b *BasicService) AddListener(listener Listener) { +func (b *BasicService) AddListener(listener Listener) func() { b.stateMu.Lock() defer b.stateMu.Unlock() if b.state == Terminated || b.state == Failed { // no more state transitions will be done, and channel wouldn't get closed - return + return func() {} } // There are max 4 state transitions. We use buffer to avoid blocking the sender, // which holds service lock. - ch := make(chan func(l Listener), 4) - b.listeners = append(b.listeners, ch) + listenerCh := make(chan func(l Listener), 4) + b.listeners = append(b.listeners, listenerCh) + + stop := make(chan struct{}) + stopClosed := atomic.NewBool(false) + + wg := sync.WaitGroup{} + wg.Add(1) // each listener has its own goroutine, processing events. 
go func() { - for lfn := range ch { - lfn(listener) + defer wg.Done() + for { + select { + // Process events from service. + case lfn, ok := <-listenerCh: + if !ok { + return + } + lfn(listener) + + case <-stop: + return + } } }() + + return func() { + if stopClosed.CompareAndSwap(false, true) { + // Tell listener goroutine to stop. + close(stop) + } + + // Remove channel for notifications from service's list of listeners. + b.stateMu.Lock() + b.listeners = slices.DeleteFunc(b.listeners, func(c chan func(l Listener)) bool { + return listenerCh == c + }) + b.stateMu.Unlock() + + wg.Wait() + } } // lock must be held here. Read lock would be good enough, but since diff --git a/vendor/github.com/grafana/dskit/services/failure_watcher.go b/vendor/github.com/grafana/dskit/services/failure_watcher.go index 657656f50d4..25c59d24b25 100644 --- a/vendor/github.com/grafana/dskit/services/failure_watcher.go +++ b/vendor/github.com/grafana/dskit/services/failure_watcher.go @@ -1,16 +1,22 @@ package services import ( + "sync" + "github.com/pkg/errors" ) var ( errFailureWatcherNotInitialized = errors.New("FailureWatcher has not been initialized") + errFailureWatcherClosed = errors.New("FailureWatcher has been stopped") ) // FailureWatcher waits for service failures, and passed them to the channel. type FailureWatcher struct { - ch chan error + mu sync.Mutex + ch chan error + closed bool + unregisterListeners []func() } func NewFailureWatcher() *FailureWatcher { @@ -35,9 +41,17 @@ func (w *FailureWatcher) WatchService(service Service) { panic(errFailureWatcherNotInitialized) } - service.AddListener(NewListener(nil, nil, nil, nil, func(_ State, failure error) { + w.mu.Lock() + defer w.mu.Unlock() + + if w.closed { + panic(errFailureWatcherClosed) + } + + stop := service.AddListener(NewListener(nil, nil, nil, nil, func(_ State, failure error) { w.ch <- errors.Wrapf(failure, "service %s failed", DescribeService(service)) })) + w.unregisterListeners = append(w.unregisterListeners, stop) } func (w *FailureWatcher) WatchManager(manager *Manager) { @@ -47,7 +61,40 @@ func (w *FailureWatcher) WatchManager(manager *Manager) { panic(errFailureWatcherNotInitialized) } - manager.AddListener(NewManagerListener(nil, nil, func(service Service) { + w.mu.Lock() + defer w.mu.Unlock() + + if w.closed { + panic(errFailureWatcherClosed) + } + + stop := manager.AddListener(NewManagerListener(nil, nil, func(service Service) { w.ch <- errors.Wrapf(service.FailureCase(), "service %s failed", DescribeService(service)) })) + w.unregisterListeners = append(w.unregisterListeners, stop) +} + +// Close stops this failure watcher and closes channel returned by Chan() method. After closing failure watcher, +// it cannot be used to watch additional services or managers. +// Repeated calls to Close() do nothing. +func (w *FailureWatcher) Close() { + // Graceful handle the case FailureWatcher has not been initialized, + // to simplify the code in the components using it. + if w == nil { + return + } + + w.mu.Lock() + defer w.mu.Unlock() + + if w.closed { + return + } + for _, stop := range w.unregisterListeners { + stop() + } + + // All listeners are now stopped, and can't receive more notifications. We can close the channel. 
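AddListener now returns a function that unregisters the listener and stops its goroutine before the service terminates. A sketch of the new caller contract, using the same NewListener shape the FailureWatcher above uses; the function name and log wording are assumptions.

package example

import (
    "fmt"

    "github.com/grafana/dskit/services"
)

// watchFailures registers a failure-only listener and hands the unregister
// function back to the caller, which should eventually invoke it (for
// example via defer) to stop the listener goroutine.
func watchFailures(svc services.Service) func() {
    stop := svc.AddListener(services.NewListener(nil, nil, nil, nil,
        func(_ services.State, failure error) {
            fmt.Println("service failed:", failure)
        }))
    return stop
}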
+ close(w.ch) + w.closed = true } diff --git a/vendor/github.com/grafana/dskit/services/manager.go b/vendor/github.com/grafana/dskit/services/manager.go index da5ebf7331b..7f02c0dc9c9 100644 --- a/vendor/github.com/grafana/dskit/services/manager.go +++ b/vendor/github.com/grafana/dskit/services/manager.go @@ -4,7 +4,10 @@ import ( "context" "errors" "fmt" + "slices" "sync" + + "go.uber.org/atomic" ) type managerState int @@ -31,6 +34,11 @@ type ManagerListener interface { // Manager can start them, and observe their state as a group. // Once all services are running, Manager is said to be Healthy. It is possible for manager to never reach the Healthy state, if some services fail to start. // When all services are stopped (Terminated or Failed), manager is Stopped. +// +// Note: Manager's state is defined by state of services. Services can be started outside of Manager and if all become Running, Manager will be Healthy as well. +// +// Note: Creating a manager immediately installs listeners to all services (to compute manager's state), which may start goroutines. +// To avoid leaking goroutines, make sure to eventually stop all services or the manager (which stops services), even if manager wasn't explicitly started. type Manager struct { services []Service @@ -226,25 +234,61 @@ func (m *Manager) serviceStateChanged(s Service, from State, to State) { // Specifically, a given listener will have its callbacks invoked in the same order as the underlying service enters those states. // Additionally, at most one of the listener's callbacks will execute at once. // However, multiple listeners' callbacks may execute concurrently, and listeners may execute in an order different from the one in which they were registered. -func (m *Manager) AddListener(listener ManagerListener) { +// +// Returned function can be used to stop the listener and free resources used by it (e.g. goroutine). +func (m *Manager) AddListener(listener ManagerListener) func() { m.mu.Lock() defer m.mu.Unlock() if m.state == stopped { // no need to register listener, as no more events will be sent - return + return func() {} } // max number of events is: failed notification for each service + healthy + stopped. // we use buffer to avoid blocking the sender, which holds the manager's lock. - ch := make(chan func(l ManagerListener), len(m.services)+2) - m.listeners = append(m.listeners, ch) + listenerCh := make(chan func(l ManagerListener), len(m.services)+2) + m.listeners = append(m.listeners, listenerCh) + + stop := make(chan struct{}) + stopClosed := atomic.NewBool(false) + + wg := sync.WaitGroup{} + wg.Add(1) + // each listener has its own goroutine, processing events. go func() { - for fn := range ch { - fn(listener) + defer wg.Done() + for { + select { + // Process events from service. + case fn, ok := <-listenerCh: + if !ok { + return + } + fn(listener) + + case <-stop: + return + } } }() + + return func() { + if stopClosed.CompareAndSwap(false, true) { + // Tell listener goroutine to stop. + close(stop) + } + + // Remove channel for notifications from manager's list of listeners. 
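With the new Close method, a FailureWatcher has an explicit end of life: it unregisters its listeners and closes the channel returned by Chan. A sketch of that lifecycle; the surrounding function and its control flow are illustrative.

package example

import (
    "context"

    "github.com/grafana/dskit/services"
)

// runAndWatch waits for either a watched-service failure or context
// cancellation, then releases the watcher's listeners via the deferred Close.
func runAndWatch(ctx context.Context, svc services.Service) error {
    watcher := services.NewFailureWatcher()
    defer watcher.Close() // repeated calls and calls on a nil watcher are no-ops

    watcher.WatchService(svc)

    select {
    case err := <-watcher.Chan():
        return err
    case <-ctx.Done():
        return ctx.Err()
    }
}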
+ m.mu.Lock() + m.listeners = slices.DeleteFunc(m.listeners, func(c chan func(listener ManagerListener)) bool { + return listenerCh == c + }) + m.mu.Unlock() + + wg.Wait() + } } // called with lock diff --git a/vendor/github.com/grafana/dskit/services/service.go b/vendor/github.com/grafana/dskit/services/service.go index e3de5a09cf6..a4c0e934877 100644 --- a/vendor/github.com/grafana/dskit/services/service.go +++ b/vendor/github.com/grafana/dskit/services/service.go @@ -91,7 +91,10 @@ type Service interface { // as the service enters those states. Additionally, at most one of the listener's callbacks will execute // at once. However, multiple listeners' callbacks may execute concurrently, and listeners may execute // in an order different from the one in which they were registered. - AddListener(listener Listener) + // + // Returned function can be used to stop the listener from receiving additional events from the service, + // and release resources used by the listener (e.g. goroutine, if it was started by adding listener). + AddListener(listener Listener) func() } // NamedService extends Service with a name. diff --git a/vendor/github.com/grafana/dskit/spanlogger/spanlogger.go b/vendor/github.com/grafana/dskit/spanlogger/spanlogger.go index 8daad995c95..f32bce6f6bc 100644 --- a/vendor/github.com/grafana/dskit/spanlogger/spanlogger.go +++ b/vendor/github.com/grafana/dskit/spanlogger/spanlogger.go @@ -1,8 +1,13 @@ +// Provenance-includes-location: https://github.com/go-kit/log/blob/main/value.go +// Provenance-includes-license: MIT +// Provenance-includes-copyright: Go kit + package spanlogger import ( "context" "runtime" + "strconv" "strings" "go.uber.org/atomic" // Really just need sync/atomic but there is a lint rule preventing it. @@ -163,9 +168,6 @@ func (s *SpanLogger) getLogger() log.Logger { logger = log.With(logger, "trace_id", traceID) } - // Replace the default valuer for the 'caller' attribute with one that gets the caller of the methods in this file. - logger = log.With(logger, "caller", spanLoggerAwareCaller()) - // If the value has been set by another goroutine, fetch that other value and discard the one we made. if !s.logger.CompareAndSwap(nil, &logger) { pLogger := s.logger.Load() @@ -188,46 +190,64 @@ func (s *SpanLogger) SetSpanAndLogTag(key string, value interface{}) { s.logger.Store(&wrappedLogger) } -// spanLoggerAwareCaller is like log.Caller, but ensures that the caller information is -// that of the caller to SpanLogger, not SpanLogger itself. -func spanLoggerAwareCaller() log.Valuer { - valuer := atomic.NewPointer[log.Valuer](nil) - +// Caller is like github.com/go-kit/log's Caller, but ensures that the caller information is +// that of the caller to SpanLogger (if SpanLogger is being used), not SpanLogger itself. +// +// defaultStackDepth should be the number of stack frames to skip by default, as would be +// passed to github.com/go-kit/log's Caller method. +func Caller(defaultStackDepth int) log.Valuer { return func() interface{} { - // If we've already determined the correct stack depth, use it. - existingValuer := valuer.Load() - if existingValuer != nil { - return (*existingValuer)() - } - - // We haven't been called before, determine the correct stack depth to - // skip the configured logger's internals and the SpanLogger's internals too. - // - // Note that we can't do this in spanLoggerAwareCaller() directly because we - // need to do this when invoked by the configured logger - otherwise we cannot - // measure the stack depth of the logger's internals. 
- - stackDepth := 3 // log.DefaultCaller uses a stack depth of 3, so start searching for the correct stack depth there. + stackDepth := defaultStackDepth + 1 // +1 to account for this method. + seenSpanLogger := false + pc := make([]uintptr, 1) for { - _, file, _, ok := runtime.Caller(stackDepth) + function, file, line, ok := caller(stackDepth, pc) if !ok { // We've run out of possible stack frames. Give up. - valuer.Store(&unknownCaller) - return unknownCaller() + return "" } - if strings.HasSuffix(file, "spanlogger/spanlogger.go") { - stackValuer := log.Caller(stackDepth + 2) // Add one to skip the stack frame for the SpanLogger method, and another to skip the stack frame for the valuer which we'll invoke below. - valuer.Store(&stackValuer) - return stackValuer() + // If we're in a SpanLogger method, we need to continue searching. + // + // Matching on the exact function name like this does mean this will break if we rename or refactor SpanLogger, but + // the tests should catch this. In the worst case scenario, we'll log incorrect caller information, which isn't the + // end of the world. + if function == "github.com/grafana/dskit/spanlogger.(*SpanLogger).Log" || function == "github.com/grafana/dskit/spanlogger.(*SpanLogger).DebugLog" { + seenSpanLogger = true + stackDepth++ + continue } - stackDepth++ + // We need to check for go-kit/log stack frames like this because using log.With, log.WithPrefix or log.WithSuffix + // (including the various level methods like level.Debug, level.Info etc.) to wrap a SpanLogger introduce an + // additional context.Log stack frame that calls into the SpanLogger. This is because the use of SpanLogger + // as the logger means the optimisation to avoid creating a new logger in + // https://github.com/go-kit/log/blob/c7bf81493e581feca11e11a7672b14be3591ca43/log.go#L141-L146 used by those methods + // can't be used, and so the SpanLogger is wrapped in a new logger. + if seenSpanLogger && function == "github.com/go-kit/log.(*context).Log" { + stackDepth++ + continue + } + + return formatCallerInfoForLog(file, line) } } } -var unknownCaller log.Valuer = func() interface{} { - return "" +// caller is like runtime.Caller, but modified to allow reuse of the uintptr slice and return the function name. +func caller(stackDepth int, pc []uintptr) (function string, file string, line int, ok bool) { + n := runtime.Callers(stackDepth+1, pc) + if n < 1 { + return "", "", 0, false + } + + frame, _ := runtime.CallersFrames(pc).Next() + return frame.Function, frame.File, frame.Line, frame.PC != 0 +} + +// This is based on github.com/go-kit/log's Caller, but modified for use by Caller above. +func formatCallerInfoForLog(file string, line int) string { + idx := strings.LastIndexByte(file, '/') + return file[idx+1:] + ":" + strconv.Itoa(line) } diff --git a/vendor/google.golang.org/grpc/experimental/experimental.go b/vendor/google.golang.org/grpc/experimental/experimental.go deleted file mode 100644 index de7f13a2210..00000000000 --- a/vendor/google.golang.org/grpc/experimental/experimental.go +++ /dev/null @@ -1,65 +0,0 @@ -/* - * - * Copyright 2023 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
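Since getLogger above no longer injects a SpanLogger-aware "caller" valuer, the new exported Caller appears intended to be attached by the owning application instead. A sketch, assuming the conventional go-kit stack depth of 3 that the removed code also started from; the depth actually needed depends on how the logger is wrapped.

package example

import (
    "os"

    "github.com/go-kit/log"
    "github.com/grafana/dskit/spanlogger"
)

// newBaseLogger attaches the SpanLogger-aware caller valuer once, at the root
// logger, instead of relying on SpanLogger to add it per span.
func newBaseLogger() log.Logger {
    logger := log.NewLogfmtLogger(os.Stderr)
    return log.With(logger, "caller", spanlogger.Caller(3))
}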
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -// Package experimental is a collection of experimental features that might -// have some rough edges to them. Housing experimental features in this package -// results in a user accessing these APIs as `experimental.Foo`, thereby making -// it explicit that the feature is experimental and using them in production -// code is at their own risk. -// -// All APIs in this package are experimental. -package experimental - -import ( - "google.golang.org/grpc" - "google.golang.org/grpc/internal" -) - -// WithRecvBufferPool returns a grpc.DialOption that configures the use of -// bufferPool for parsing incoming messages on a grpc.ClientConn. Depending on -// the application's workload, this could result in reduced memory allocation. -// -// If you are unsure about how to implement a memory pool but want to utilize -// one, begin with grpc.NewSharedBufferPool. -// -// Note: The shared buffer pool feature will not be active if any of the -// following options are used: WithStatsHandler, EnableTracing, or binary -// logging. In such cases, the shared buffer pool will be ignored. -// -// Note: It is not recommended to use the shared buffer pool when compression is -// enabled. -func WithRecvBufferPool(bufferPool grpc.SharedBufferPool) grpc.DialOption { - return internal.WithRecvBufferPool.(func(grpc.SharedBufferPool) grpc.DialOption)(bufferPool) -} - -// RecvBufferPool returns a grpc.ServerOption that configures the server to use -// the provided shared buffer pool for parsing incoming messages. Depending on -// the application's workload, this could result in reduced memory allocation. -// -// If you are unsure about how to implement a memory pool but want to utilize -// one, begin with grpc.NewSharedBufferPool. -// -// Note: The shared buffer pool feature will not be active if any of the -// following options are used: StatsHandler, EnableTracing, or binary logging. -// In such cases, the shared buffer pool will be ignored. -// -// Note: It is not recommended to use the shared buffer pool when compression is -// enabled. 
-func RecvBufferPool(bufferPool grpc.SharedBufferPool) grpc.ServerOption { - return internal.RecvBufferPool.(func(grpc.SharedBufferPool) grpc.ServerOption)(bufferPool) -} diff --git a/vendor/modules.txt b/vendor/modules.txt index 853fe88c869..37ed12a2023 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -483,7 +483,7 @@ github.com/gorilla/handlers # github.com/gorilla/mux v1.8.1 ## explicit; go 1.20 github.com/gorilla/mux -# github.com/grafana/dskit v0.0.0-20240801171758-736c44c85382 +# github.com/grafana/dskit v0.0.0-20241115082728-f2a7eb3aa0e9 ## explicit; go 1.21 github.com/grafana/dskit/backoff github.com/grafana/dskit/cancellation @@ -499,7 +499,6 @@ github.com/grafana/dskit/grpcutil github.com/grafana/dskit/httpgrpc github.com/grafana/dskit/httpgrpc/server github.com/grafana/dskit/instrument -github.com/grafana/dskit/internal/math github.com/grafana/dskit/internal/slices github.com/grafana/dskit/kv github.com/grafana/dskit/kv/codec @@ -1880,7 +1879,6 @@ google.golang.org/grpc/credentials/oauth google.golang.org/grpc/encoding google.golang.org/grpc/encoding/gzip google.golang.org/grpc/encoding/proto -google.golang.org/grpc/experimental google.golang.org/grpc/grpclog google.golang.org/grpc/health google.golang.org/grpc/health/grpc_health_v1