From 4ed8a0ebb0483f41d8a9509a92b6f67343d164ce Mon Sep 17 00:00:00 2001 From: kianaza Date: Sun, 14 Jul 2024 18:27:40 +0330 Subject: [PATCH 1/5] update: update jetstream to new client --- go.mod | 3 +- go.sum | 2 + internal/cmd/main.go | 6 +- internal/config/default.go | 2 +- internal/natsclient/config.go | 22 ++--- internal/natsclient/jetstream.go | 151 +++++++++++++++---------------- 6 files changed, 94 insertions(+), 92 deletions(-) diff --git a/go.mod b/go.mod index 9d15ee3..71dd3e8 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/nats-io/nats.go v1.31.0 github.com/prometheus/client_golang v1.11.1 github.com/spf13/cobra v1.8.0 + github.com/spf13/pflag v1.0.5 github.com/tidwall/pretty v1.2.1 go.uber.org/zap v1.17.0 ) @@ -29,11 +30,11 @@ require ( github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/common v0.26.0 // indirect github.com/prometheus/procfs v0.6.0 // indirect - github.com/spf13/pflag v1.0.5 // indirect go.uber.org/atomic v1.7.0 // indirect go.uber.org/multierr v1.6.0 // indirect golang.org/x/crypto v0.6.0 // indirect golang.org/x/sys v0.5.0 // indirect + golang.org/x/text v0.13.0 // indirect google.golang.org/protobuf v1.26.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 7293e7b..e61ca92 100644 --- a/go.sum +++ b/go.sum @@ -352,6 +352,8 @@ golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= +golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/internal/cmd/main.go b/internal/cmd/main.go index 1f53937..bb6a890 100644 --- a/internal/cmd/main.go +++ b/internal/cmd/main.go @@ -1,6 +1,7 @@ package cmd import ( + "context" "os" "os/signal" "syscall" @@ -13,7 +14,10 @@ import ( func main(cfg config.Config, logger *zap.Logger) { natsConfig := cfg.NATS - jetstreamClient := natsclient.NewJetstream(natsConfig, logger) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + jetstreamClient := natsclient.NewJetstream(natsConfig, logger, &ctx) jetstreamClient.StartBlackboxTest() sig := make(chan os.Signal, 1) diff --git a/internal/config/default.go b/internal/config/default.go index 267bebc..58b6b49 100644 --- a/internal/config/default.go +++ b/internal/config/default.go @@ -18,7 +18,7 @@ func Default() Config { NATS: natsclient.Config{ AllExistingStreams: false, NewStreamAllow: true, - Streams: []natsclient.Stream{{ + StreamsConfig: []natsclient.StreamConfig{{ Name: "test", Subject: "test", }, diff --git a/internal/natsclient/config.go b/internal/natsclient/config.go index fc4f6ed..9da973e 100644 --- a/internal/natsclient/config.go +++ b/internal/natsclient/config.go @@ -3,19 +3,19 @@ package natsclient import "time" type Config struct { - AllExistingStreams bool `json:"all_existing_streams" koanf:"all_existing_streams"` - NewStreamAllow bool `json:"new_stream_allow" koanf:"new_stream_allow"` - Streams []Stream `json:"streams,omitempty" koanf:"streams"` - URL string `json:"url,omitempty" koanf:"url"` - PublishInterval time.Duration `json:"publish_interval" koanf:"publish_interval"` - RequestTimeout time.Duration `json:"request_timeout" koanf:"request_timeout"` - MaxPubAcksInflight int `json:"max_pub_acks_inflight" koanf:"max_pub_acks_inflight"` - QueueSubscriptionGroup string `json:"queue_subscription_group" koanf:"queue_subscription_group"` - FlushTimeout time.Duration `json:"flush_timeout" koanf:"flush_timeout"` - ClientName string `json:"client_name" koanf:"client_name"` + AllExistingStreams bool `json:"all_existing_streams" koanf:"all_existing_streams"` + NewStreamAllow bool `json:"new_stream_allow" koanf:"new_stream_allow"` + StreamsConfig []StreamConfig `json:"streams,omitempty" koanf:"streams"` + URL string `json:"url,omitempty" koanf:"url"` + PublishInterval time.Duration `json:"publish_interval" koanf:"publish_interval"` + RequestTimeout time.Duration `json:"request_timeout" koanf:"request_timeout"` + MaxPubAcksInflight int `json:"max_pub_acks_inflight" koanf:"max_pub_acks_inflight"` + QueueSubscriptionGroup string `json:"queue_subscription_group" koanf:"queue_subscription_group"` + FlushTimeout time.Duration `json:"flush_timeout" koanf:"flush_timeout"` + ClientName string `json:"client_name" koanf:"client_name"` } -type Stream struct { +type StreamConfig struct { Name string `json:"name,omitempty" koanf:"name"` Subject string `json:"subject,omitempty" koanf:"subject"` } diff --git a/internal/natsclient/jetstream.go b/internal/natsclient/jetstream.go index 994cf4a..c23da07 100644 --- a/internal/natsclient/jetstream.go +++ b/internal/natsclient/jetstream.go @@ -1,10 +1,13 @@ package natsclient import ( - "slices" + "context" + "fmt" + "log" "time" "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" ) @@ -21,21 +24,30 @@ type Message struct { Data []byte } +type CreatedStream struct { + name string + subject string + streamClient jetstream.Stream +} + // Jetstream represents the NATS core handler -type Jetstream struct { - connection *nats.Conn - jetstream nats.JetStreamContext - config *Config - logger *zap.Logger - metrics Metrics +type JetstreamClient struct { + connection *nats.Conn + jetstream jetstream.JetStream + config *Config + logger *zap.Logger + metrics Metrics + ctx context.Context + createdStreams []CreatedStream } // NewJetstream initializes NATS JetStream connection -func NewJetstream(config Config, logger *zap.Logger) *Jetstream { - j := &Jetstream{ +func NewJetstream(config Config, logger *zap.Logger, ctx *context.Context) *JetstreamClient { + j := &JetstreamClient{ config: &config, logger: logger, metrics: NewMetrics(), + ctx: *ctx, } j.connect() @@ -47,7 +59,7 @@ func NewJetstream(config Config, logger *zap.Logger) *Jetstream { return j } -func (j *Jetstream) connect() { +func (j *JetstreamClient) connect() { var err error j.connection, err = nats.Connect(j.config.URL) if err != nil { @@ -65,99 +77,82 @@ func (j *Jetstream) connect() { }) } -func (j *Jetstream) createJetstreamContext() { +func (j *JetstreamClient) createJetstreamContext() { var err error - j.jetstream, err = j.connection.JetStream() + j.jetstream, err = jetstream.New(j.connection) if err != nil { j.logger.Panic("could not connect to jetstream", zap.Error(err)) } } -func (j *Jetstream) UpdateOrCreateStream() { - if j.config.AllExistingStreams { - streamNames := j.jetstream.StreamNames() - for stream := range streamNames { - j.config.Streams = append(j.config.Streams, Stream{Name: stream}) - } - } - for i, stream := range j.config.Streams { - if stream.Subject == "" { - j.config.Streams[i].Subject = stream.Name + subjectSuffix +func (j *JetstreamClient) UpdateOrCreateStream() { + for _, stream := range j.config.StreamsConfig { + name := stream.Name + subject := stream.Subject + if subject == "" { + subject = stream.Name + subjectSuffix } - info, err := j.jetstream.StreamInfo(stream.Name) + str, err := j.jetstream.CreateOrUpdateStream(j.ctx, jetstream.StreamConfig{ + Name: name, + Subjects: []string{subject}, + }) if err == nil { - j.updateStream(j.config.Streams[i], info) - } else if err == nats.ErrStreamNotFound && j.config.NewStreamAllow { - j.createStream(j.config.Streams[i]) + j.createdStreams = append(j.createdStreams, CreatedStream{ + name: name, + subject: subject, + streamClient: str, + }) + j.logger.Info("create or update stream", zap.String("stream", stream.Name)) } else { j.logger.Error("could not add subject", zap.String("stream", stream.Name), zap.Error(err)) } } } -func (j *Jetstream) updateStream(stream Stream, info *nats.StreamInfo) { - subjects := append(info.Config.Subjects, stream.Subject) - slices.Sort(subjects) - subjects = slices.Compact(subjects) - _, err := j.jetstream.UpdateStream(&nats.StreamConfig{ - Name: stream.Name, - Subjects: subjects, - }) - if err != nil { - j.logger.Error("could not add subject to existing stream", zap.String("stream", stream.Name), zap.Error(err)) - } - j.logger.Info("stream updated") -} -func (j *Jetstream) createStream(stream Stream) { - _, err := j.jetstream.AddStream(&nats.StreamConfig{ - Name: stream.Name, - Subjects: []string{stream.Subject}, - }) - if err != nil { - j.logger.Error("could not add stream", zap.String("stream", stream.Name), zap.Error(err)) - } - j.logger.Info("add new stream") -} - -func (j *Jetstream) StartBlackboxTest() { - if j.config.Streams == nil { +func (j *JetstreamClient) StartBlackboxTest() { + if j.createdStreams == nil { j.logger.Panic("at least one stream is required.") } - for _, stream := range j.config.Streams { - messageChannel := j.createSubscribe(stream.Subject) - go j.jetstreamPublish(stream.Subject, stream.Name) - go j.jetstreamSubscribe(messageChannel, stream.Name) + for _, stream := range j.createdStreams { + consumer := j.createConsumer(stream.streamClient, stream.subject) + go j.jetstreamPublish(stream.subject, stream.name) + go j.jetstreamSubscribe(consumer, stream.name) } } // Subscribe subscribes to a list of subjects and returns a channel with incoming messages -func (j *Jetstream) createSubscribe(subject string) chan *Message { +func (j *JetstreamClient) createConsumer(str jetstream.Stream, subject string) jetstream.Consumer { - messageHandler, h := j.messageHandlerFactoryJetstream() - _, err := j.jetstream.Subscribe( - subject, - messageHandler, - nats.DeliverNew(), - nats.ReplayInstant(), - nats.AckExplicit(), - nats.MaxAckPending(j.config.MaxPubAcksInflight), - ) + c, err := str.CreateOrUpdateConsumer(j.ctx, jetstream.ConsumerConfig{ + AckPolicy: jetstream.AckExplicitPolicy, + FilterSubject: subject, + DeliverPolicy: jetstream.DeliverNewPolicy, + }) if err != nil { j.logger.Panic("could not Subscribe", zap.Error(err)) } else { - j.logger.Info("Subscribed to %s successfully", zap.String("subject", subject)) + j.logger.Info("Subscribed to %s successfully", zap.String("stream", str.CachedInfo().Config.Name)) } - return h - + return c } -func (j *Jetstream) jetstreamSubscribe(h chan *Message, streamName string) { +func (j *JetstreamClient) jetstreamSubscribe(c jetstream.Consumer, streamName string) { + messageHandler, h := j.messageHandlerFactoryJetstream() + + cc, err := c.Consume(messageHandler, jetstream.ConsumeErrHandler(func(consumeCtx jetstream.ConsumeContext, err error) { + fmt.Println(err) + })) + if err != nil { + log.Fatal(err) + } + defer cc.Stop() + clusterName := j.connection.ConnectedClusterName() for msg := range h { var publishTime time.Time - err := publishTime.UnmarshalBinary(msg.Data) + err = publishTime.UnmarshalBinary(msg.Data) if err != nil { j.logger.Error("unable to unmarshal binary data for publishTime.") j.logger.Info("received message but could not calculate latency due to unmarshalling error.", zap.String("subject", msg.Subject)) @@ -177,14 +172,14 @@ func (j *Jetstream) jetstreamSubscribe(h chan *Message, streamName string) { } } -func (j *Jetstream) jetstreamPublish(subject string, streamName string) { +func (j *JetstreamClient) jetstreamPublish(subject string, streamName string) { clusterName := j.connection.ConnectedClusterName() for { t, err := time.Now().MarshalBinary() if err != nil { j.logger.Error("could not marshal current time.", zap.Error(err)) } - if ack, err := j.jetstream.Publish(subject, t); err != nil { + if ack, err := j.jetstream.Publish(j.ctx, subject, t); err != nil { j.metrics.SuccessCounter.With(prometheus.Labels{ "type": failedPublish, "stream": streamName, @@ -210,12 +205,12 @@ func (j *Jetstream) jetstreamPublish(subject string, streamName string) { } } -func (j *Jetstream) messageHandlerFactoryJetstream() (nats.MsgHandler, chan *Message) { +func (j *JetstreamClient) messageHandlerFactoryJetstream() (jetstream.MessageHandler, chan *Message) { ch := make(chan *Message) - return func(msg *nats.Msg) { + return func(msg jetstream.Msg) { ch <- &Message{ - Subject: msg.Subject, - Data: msg.Data, + Subject: msg.Subject(), + Data: msg.Data(), } err := msg.Ack() if err != nil { @@ -225,7 +220,7 @@ func (j *Jetstream) messageHandlerFactoryJetstream() (nats.MsgHandler, chan *Mes } // Close closes NATS connection -func (j *Jetstream) Close() { +func (j *JetstreamClient) Close() { if err := j.connection.FlushTimeout(j.config.FlushTimeout); err != nil { j.logger.Error("could not flush", zap.Error(err)) } From 52dbd5c1fb216e7ab1690f167b4911c6d3b1c15f Mon Sep 17 00:00:00 2001 From: kianaza Date: Tue, 16 Jul 2024 14:00:41 +0330 Subject: [PATCH 2/5] fix: debug creating and updating stream --- internal/natsclient/jetstream.go | 56 ++++++++++++++++++++++++++------ 1 file changed, 46 insertions(+), 10 deletions(-) diff --git a/internal/natsclient/jetstream.go b/internal/natsclient/jetstream.go index c23da07..cf4cd55 100644 --- a/internal/natsclient/jetstream.go +++ b/internal/natsclient/jetstream.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "log" + "slices" "time" "github.com/nats-io/nats.go" @@ -93,21 +94,56 @@ func (j *JetstreamClient) UpdateOrCreateStream() { subject = stream.Name + subjectSuffix } - str, err := j.jetstream.CreateOrUpdateStream(j.ctx, jetstream.StreamConfig{ - Name: name, - Subjects: []string{subject}, - }) + var str jetstream.Stream + s, err := j.jetstream.Stream(j.ctx, name) if err == nil { - j.createdStreams = append(j.createdStreams, CreatedStream{ - name: name, - subject: subject, - streamClient: str, - }) - j.logger.Info("create or update stream", zap.String("stream", stream.Name)) + info, _ := s.Info(j.ctx) + str, err = j.updateStream(name, subject, info.Config) + } else if err == nats.ErrStreamNotFound { + str, err = j.createStream(name, subject) } else { j.logger.Error("could not add subject", zap.String("stream", stream.Name), zap.Error(err)) + continue + } + + if err != nil { + j.logger.Error("could not add subject", zap.String("stream", stream.Name), zap.Error(err)) + continue } + + j.createdStreams = append(j.createdStreams, CreatedStream{ + name: name, + subject: subject, + streamClient: str, + }) + j.logger.Info("create or update stream", zap.String("stream", stream.Name)) + } +} + +func (j *JetstreamClient) updateStream(name string, subject string, config jetstream.StreamConfig) (jetstream.Stream, error) { + subjects := append(config.Subjects, subject) + slices.Sort(subjects) + config.Subjects = slices.Compact(subjects) + str, err := j.jetstream.UpdateStream(j.ctx, config) + if err != nil { + j.logger.Error("could not add subject to existing stream", zap.String("stream", name), zap.Error(err)) + return nil, err + } + j.logger.Info("stream updated") + return str, nil +} + +func (j *JetstreamClient) createStream(name string, subject string) (jetstream.Stream, error) { + str, err := j.jetstream.CreateStream(j.ctx, jetstream.StreamConfig{ + Name: name, + Subjects: []string{subject}, + }) + if err != nil { + j.logger.Error("could not add stream", zap.String("stream", name), zap.Error(err)) + return nil, err } + j.logger.Info("add new stream") + return str, nil } func (j *JetstreamClient) StartBlackboxTest() { From e855ee0f08fe73c50780415d3a35ff8f0b0633e3 Mon Sep 17 00:00:00 2001 From: kianaza Date: Tue, 16 Jul 2024 14:06:29 +0330 Subject: [PATCH 3/5] fix: delete unused configs --- config.example.yaml | 1 - internal/config/default.go | 3 --- internal/natsclient/config.go | 3 --- 3 files changed, 7 deletions(-) diff --git a/config.example.yaml b/config.example.yaml index 9609e88..f381f38 100644 --- a/config.example.yaml +++ b/config.example.yaml @@ -2,7 +2,6 @@ logger: level: "debug" nats: - new_stream_allow: true streams: name: "stream" subject: "test" # optional diff --git a/internal/config/default.go b/internal/config/default.go index 58b6b49..1e37c05 100644 --- a/internal/config/default.go +++ b/internal/config/default.go @@ -16,8 +16,6 @@ func Default() Config { Level: "debug", }, NATS: natsclient.Config{ - AllExistingStreams: false, - NewStreamAllow: true, StreamsConfig: []natsclient.StreamConfig{{ Name: "test", Subject: "test", @@ -29,7 +27,6 @@ func Default() Config { MaxPubAcksInflight: 1000, QueueSubscriptionGroup: "group", FlushTimeout: 2 * time.Second, - ClientName: "localhost", }, Metric: metric.Config{ Server: metric.Server{Address: ":8080"}, diff --git a/internal/natsclient/config.go b/internal/natsclient/config.go index 9da973e..b10880a 100644 --- a/internal/natsclient/config.go +++ b/internal/natsclient/config.go @@ -3,8 +3,6 @@ package natsclient import "time" type Config struct { - AllExistingStreams bool `json:"all_existing_streams" koanf:"all_existing_streams"` - NewStreamAllow bool `json:"new_stream_allow" koanf:"new_stream_allow"` StreamsConfig []StreamConfig `json:"streams,omitempty" koanf:"streams"` URL string `json:"url,omitempty" koanf:"url"` PublishInterval time.Duration `json:"publish_interval" koanf:"publish_interval"` @@ -12,7 +10,6 @@ type Config struct { MaxPubAcksInflight int `json:"max_pub_acks_inflight" koanf:"max_pub_acks_inflight"` QueueSubscriptionGroup string `json:"queue_subscription_group" koanf:"queue_subscription_group"` FlushTimeout time.Duration `json:"flush_timeout" koanf:"flush_timeout"` - ClientName string `json:"client_name" koanf:"client_name"` } type StreamConfig struct { From d27d577d7c849af1b339975655f170ca539e565c Mon Sep 17 00:00:00 2001 From: kianaza Date: Tue, 16 Jul 2024 14:17:42 +0330 Subject: [PATCH 4/5] fix: error handling --- internal/natsclient/jetstream.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/natsclient/jetstream.go b/internal/natsclient/jetstream.go index cf4cd55..d71f912 100644 --- a/internal/natsclient/jetstream.go +++ b/internal/natsclient/jetstream.go @@ -99,7 +99,7 @@ func (j *JetstreamClient) UpdateOrCreateStream() { if err == nil { info, _ := s.Info(j.ctx) str, err = j.updateStream(name, subject, info.Config) - } else if err == nats.ErrStreamNotFound { + } else if err == jetstream.ErrStreamNotFound { str, err = j.createStream(name, subject) } else { j.logger.Error("could not add subject", zap.String("stream", stream.Name), zap.Error(err)) From 2b39ef815716668f91aa4ee0ae7470365d20605b Mon Sep 17 00:00:00 2001 From: kianaza Date: Tue, 16 Jul 2024 14:39:00 +0330 Subject: [PATCH 5/5] fix: logging and error handling --- internal/natsclient/jetstream.go | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/internal/natsclient/jetstream.go b/internal/natsclient/jetstream.go index d71f912..092d149 100644 --- a/internal/natsclient/jetstream.go +++ b/internal/natsclient/jetstream.go @@ -2,8 +2,6 @@ package natsclient import ( "context" - "fmt" - "log" "slices" "time" @@ -177,11 +175,9 @@ func (j *JetstreamClient) createConsumer(str jetstream.Stream, subject string) j func (j *JetstreamClient) jetstreamSubscribe(c jetstream.Consumer, streamName string) { messageHandler, h := j.messageHandlerFactoryJetstream() - cc, err := c.Consume(messageHandler, jetstream.ConsumeErrHandler(func(consumeCtx jetstream.ConsumeContext, err error) { - fmt.Println(err) - })) + cc, err := c.Consume(messageHandler) if err != nil { - log.Fatal(err) + j.logger.Fatal("unable to consume messages", zap.Error(err)) } defer cc.Stop()