From 2a4fe84f5a59e9fc26d74de0a5a4285817a044ea Mon Sep 17 00:00:00 2001 From: kianaza Date: Sat, 29 Jun 2024 16:05:53 +0330 Subject: [PATCH] fix: fix stream update and create --- internal/config/default.go | 6 ++- internal/natsclient/config.go | 3 +- internal/natsclient/jetstream.go | 87 ++++++++++++++++++++++---------- internal/natsclient/metric.go | 22 ++++---- 4 files changed, 77 insertions(+), 41 deletions(-) diff --git a/internal/config/default.go b/internal/config/default.go index 0c521da..cd1ae32 100644 --- a/internal/config/default.go +++ b/internal/config/default.go @@ -17,16 +17,18 @@ func Default() Config { }, NATS: natsclient.Config{ NewStreamAllow: true, - Stream: natsclient.Stream{ - Name: "stream", + Streams: []natsclient.Stream{{ + Name: "test", Subject: "test", }, + }, URL: "localhost:4222", PublishInterval: 2 * time.Second, RequestTimeout: 50 * time.Millisecond, MaxPubAcksInflight: 1000, QueueSubscriptionGroup: "group", FlushTimeout: 2 * time.Second, + ClientName: "localhost", }, Metric: metric.Config{ Server: metric.Server{Address: ":8080"}, diff --git a/internal/natsclient/config.go b/internal/natsclient/config.go index db78074..9e69f84 100644 --- a/internal/natsclient/config.go +++ b/internal/natsclient/config.go @@ -4,13 +4,14 @@ import "time" type Config struct { NewStreamAllow bool `json:"new_stream_allow" koanf:"new_stream_allow"` - Stream Stream `json:"stream,omitempty" koanf:"stream"` + Streams []Stream `json:"stream,omitempty" koanf:"stream"` URL string `json:"url,omitempty" koanf:"url"` PublishInterval time.Duration `json:"publish_interval" koanf:"publish_interval"` RequestTimeout time.Duration `json:"request_timeout" koanf:"request_timeout"` MaxPubAcksInflight int `json:"max_pub_acks_inflight" koanf:"max_pub_acks_inflight"` QueueSubscriptionGroup string `json:"queue_subscription_group" koanf:"queue_subscription_group"` FlushTimeout time.Duration `json:"flush_timeout" koanf:"flush_timeout"` + ClientName string `json:"client_name" koanf:"client_name"` } type Stream struct { diff --git a/internal/natsclient/jetstream.go b/internal/natsclient/jetstream.go index 3ca25a9..5ee88c7 100644 --- a/internal/natsclient/jetstream.go +++ b/internal/natsclient/jetstream.go @@ -1,12 +1,20 @@ package natsclient import ( + "slices" "time" "github.com/nats-io/nats.go" + "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" ) +var ( + successfulSubscribe = "successful subscribe" + failedPublish = "failed publish" + successfulPublish = "successful publish" +) + type Message struct { Subject string Data []byte @@ -26,7 +34,7 @@ func NewJetstream(config Config, logger *zap.Logger) *Jetstream { j := &Jetstream{ config: &config, logger: logger, - metrics: NewMetrics(), + metrics: NewMetrics(config.ClientName), } j.connect() @@ -65,32 +73,48 @@ func (j *Jetstream) createJetstreamContext() { } func (j *Jetstream) createStream() { - _, err := j.jetstream.StreamInfo(j.config.Stream.Name) - if err == nil { - _, err = j.jetstream.UpdateStream(&nats.StreamConfig{ - Name: j.config.Stream.Name, - Subjects: []string{j.config.Stream.Subject}, - }) - if err != nil { - j.logger.Panic("could not add subject to existing stream", zap.Error(err)) - } - } else if err == nats.ErrStreamNotFound && j.config.NewStreamAllow { - _, err = j.jetstream.AddStream(&nats.StreamConfig{ - Name: j.config.Stream.Name, - Subjects: []string{j.config.Stream.Subject}, - }) - if err != nil { - j.logger.Panic("could not add stream", zap.Error(err)) + for _, stream := range j.config.Streams { + info, err := j.jetstream.StreamInfo(stream.Name) + if err == nil { + j.updateStream(stream, info) + } else if err == nats.ErrStreamNotFound && j.config.NewStreamAllow { + j.addStream(stream) + } else { + j.logger.Panic("could not add subject", zap.Error(err)) } - } else { - j.logger.Panic("could not add subject", zap.Error(err)) } } +func (j *Jetstream) updateStream(stream Stream, info *nats.StreamInfo) { + subjects := append(info.Config.Subjects, stream.Subject) + slices.Sort(subjects) + subjects = slices.Compact(subjects) + _, err := j.jetstream.UpdateStream(&nats.StreamConfig{ + Name: stream.Name, + Subjects: subjects, + }) + if err != nil { + j.logger.Panic("could not add subject to existing stream", zap.Error(err)) + } + j.logger.Info("stream updated") +} + +func (j *Jetstream) addStream(stream Stream) { + _, err := j.jetstream.AddStream(&nats.StreamConfig{ + Name: stream.Name, + Subjects: []string{stream.Subject}, + }) + if err != nil { + j.logger.Panic("could not add stream", zap.Error(err)) + } + j.logger.Info("add new stream") +} func (j *Jetstream) StartBlackboxTest() { - messageChannel := j.createSubscribe(j.config.Stream.Subject) - go j.jetstreamPublish(j.config.Stream.Subject) - go j.jetstreamSubscribe(messageChannel) + for _, stream := range j.config.Streams { + messageChannel := j.createSubscribe(stream.Subject) + go j.jetstreamPublish(stream.Subject, stream.Name) + go j.jetstreamSubscribe(messageChannel, stream.Name) + } } // Subscribe subscribes to a list of subjects and returns a channel with incoming messages @@ -115,7 +139,7 @@ func (j *Jetstream) createSubscribe(subject string) chan *Message { } -func (j *Jetstream) jetstreamSubscribe(h chan *Message) { +func (j *Jetstream) jetstreamSubscribe(h chan *Message, streamName string) { for msg := range h { var publishTime time.Time err := publishTime.UnmarshalBinary(msg.Data) @@ -126,12 +150,15 @@ func (j *Jetstream) jetstreamSubscribe(h chan *Message) { } latency := time.Since(publishTime).Seconds() j.metrics.Latency.Observe(latency) - j.metrics.SuccessCounter.WithLabelValues("successful subscribe").Add(1) + j.metrics.SuccessCounter.With(prometheus.Labels{ + "type": successfulSubscribe, + "stream": streamName, + }).Add(1) j.logger.Info("Received message: ", zap.String("subject", msg.Subject), zap.Float64("latency", latency)) } } -func (j *Jetstream) jetstreamPublish(subject string) { +func (j *Jetstream) jetstreamPublish(subject string, streamName string) { for { t, err := time.Now().MarshalBinary() if err != nil { @@ -139,7 +166,10 @@ func (j *Jetstream) jetstreamPublish(subject string) { } if ack, err := j.jetstream.Publish(subject, t); err != nil { - j.metrics.SuccessCounter.WithLabelValues("failed publish").Add(1) + j.metrics.SuccessCounter.With(prometheus.Labels{ + "type": failedPublish, + "stream": streamName, + }).Add(1) if err == nats.ErrTimeout { j.logger.Error("Request timeout: No response received within the timeout period.") } else if err == nats.ErrNoStreamResponse { @@ -148,7 +178,10 @@ func (j *Jetstream) jetstreamPublish(subject string) { j.logger.Error("Request failed: %v", zap.Error(err)) } } else { - j.metrics.SuccessCounter.WithLabelValues("successful publish").Add(1) + j.metrics.SuccessCounter.With(prometheus.Labels{ + "type": successfulPublish, + "stream": streamName, + }).Add(1) j.logger.Info("receive ack", zap.String("stream", ack.Stream)) } diff --git a/internal/natsclient/metric.go b/internal/natsclient/metric.go index 97467cf..853dffe 100644 --- a/internal/natsclient/metric.go +++ b/internal/natsclient/metric.go @@ -8,19 +8,19 @@ import ( const ( Namespace = "nats_blackbox_exporter" - Subsystem = "client" ) var latencyBuckets = []float64{ - 0.0001, - 0.0005, - 0.0007, 0.001, + 0.0013, + 0.0015, + 0.0017, 0.002, + 0.0023, + 0.0025, + 0.0027, 0.003, 0.004, - 0.005, - 0.006, } // Metrics has all the client metrics. @@ -68,11 +68,11 @@ func newCounterVec(counterOpts prometheus.CounterOpts, labelNames []string) prom return *ev } -func NewMetrics() Metrics { +func NewMetrics(clinetName string) Metrics { return Metrics{ Connection: newCounterVec(prometheus.CounterOpts{ Namespace: Namespace, - Subsystem: Subsystem, + Subsystem: clinetName, Name: "connection_errors_total", Help: "total number of disconnections and reconnections", ConstLabels: nil, @@ -80,7 +80,7 @@ func NewMetrics() Metrics { // nolint: exhaustruct Latency: newHistogram(prometheus.HistogramOpts{ Namespace: Namespace, - Subsystem: Subsystem, + Subsystem: clinetName, Name: "latency", Help: "from publish to consume duration in seconds", ConstLabels: nil, @@ -88,10 +88,10 @@ func NewMetrics() Metrics { }), SuccessCounter: newCounterVec(prometheus.CounterOpts{ Namespace: Namespace, - Subsystem: Subsystem, + Subsystem: clinetName, Name: "success_counter", Help: "publish and consume success rate", ConstLabels: nil, - }, []string{"type"}), + }, []string{"type", "stream"}), } }