Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace Legacy JetStream API and Update updateStream Function #10

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion config.example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ logger:
level: "debug"

nats:
new_stream_allow: true
streams:
name: "stream"
subject: "test" # optional
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/nats-io/nats.go v1.31.0
github.com/prometheus/client_golang v1.11.1
github.com/spf13/cobra v1.8.0
github.com/spf13/pflag v1.0.5
github.com/tidwall/pretty v1.2.1
go.uber.org/zap v1.17.0
)
Expand All @@ -29,11 +30,11 @@ require (
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.26.0 // indirect
github.com/prometheus/procfs v0.6.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
golang.org/x/crypto v0.6.0 // indirect
golang.org/x/sys v0.5.0 // indirect
golang.org/x/text v0.13.0 // indirect
google.golang.org/protobuf v1.26.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,8 @@ golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
Expand Down
6 changes: 5 additions & 1 deletion internal/cmd/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cmd

import (
"context"
"os"
"os/signal"
"syscall"
Expand All @@ -13,7 +14,10 @@
func main(cfg config.Config, logger *zap.Logger) {
natsConfig := cfg.NATS

jetstreamClient := natsclient.NewJetstream(natsConfig, logger)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Check warning on line 18 in internal/cmd/main.go

View check run for this annotation

Codecov / codecov/patch

internal/cmd/main.go#L17-L18

Added lines #L17 - L18 were not covered by tests

jetstreamClient := natsclient.NewJetstream(natsConfig, logger, &ctx)

Check warning on line 20 in internal/cmd/main.go

View check run for this annotation

Codecov / codecov/patch

internal/cmd/main.go#L20

Added line #L20 was not covered by tests
jetstreamClient.StartBlackboxTest()

sig := make(chan os.Signal, 1)
Expand Down
5 changes: 1 addition & 4 deletions internal/config/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@
Level: "debug",
},
NATS: natsclient.Config{
AllExistingStreams: false,
NewStreamAllow: true,
Streams: []natsclient.Stream{{
StreamsConfig: []natsclient.StreamConfig{{

Check warning on line 19 in internal/config/default.go

View check run for this annotation

Codecov / codecov/patch

internal/config/default.go#L19

Added line #L19 was not covered by tests
Name: "test",
Subject: "test",
},
Expand All @@ -29,7 +27,6 @@
MaxPubAcksInflight: 1000,
QueueSubscriptionGroup: "group",
FlushTimeout: 2 * time.Second,
ClientName: "localhost",
},
Metric: metric.Config{
Server: metric.Server{Address: ":8080"},
Expand Down
19 changes: 8 additions & 11 deletions internal/natsclient/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,16 @@ package natsclient
import "time"

type Config struct {
AllExistingStreams bool `json:"all_existing_streams" koanf:"all_existing_streams"`
NewStreamAllow bool `json:"new_stream_allow" koanf:"new_stream_allow"`
Streams []Stream `json:"streams,omitempty" koanf:"streams"`
URL string `json:"url,omitempty" koanf:"url"`
PublishInterval time.Duration `json:"publish_interval" koanf:"publish_interval"`
RequestTimeout time.Duration `json:"request_timeout" koanf:"request_timeout"`
MaxPubAcksInflight int `json:"max_pub_acks_inflight" koanf:"max_pub_acks_inflight"`
QueueSubscriptionGroup string `json:"queue_subscription_group" koanf:"queue_subscription_group"`
FlushTimeout time.Duration `json:"flush_timeout" koanf:"flush_timeout"`
ClientName string `json:"client_name" koanf:"client_name"`
StreamsConfig []StreamConfig `json:"streams,omitempty" koanf:"streams"`
URL string `json:"url,omitempty" koanf:"url"`
PublishInterval time.Duration `json:"publish_interval" koanf:"publish_interval"`
RequestTimeout time.Duration `json:"request_timeout" koanf:"request_timeout"`
MaxPubAcksInflight int `json:"max_pub_acks_inflight" koanf:"max_pub_acks_inflight"`
QueueSubscriptionGroup string `json:"queue_subscription_group" koanf:"queue_subscription_group"`
FlushTimeout time.Duration `json:"flush_timeout" koanf:"flush_timeout"`
}

type Stream struct {
type StreamConfig struct {
Name string `json:"name,omitempty" koanf:"name"`
Subject string `json:"subject,omitempty" koanf:"subject"`
}
159 changes: 93 additions & 66 deletions internal/natsclient/jetstream.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package natsclient

import (
"context"
"slices"
"time"

"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)
Expand All @@ -21,21 +23,30 @@
Data []byte
}

type CreatedStream struct {
name string
subject string
streamClient jetstream.Stream
}

// Jetstream represents the NATS core handler
type Jetstream struct {
connection *nats.Conn
jetstream nats.JetStreamContext
config *Config
logger *zap.Logger
metrics Metrics
type JetstreamClient struct {
connection *nats.Conn
jetstream jetstream.JetStream
config *Config
logger *zap.Logger
metrics Metrics
ctx context.Context
createdStreams []CreatedStream
}

// NewJetstream initializes NATS JetStream connection
func NewJetstream(config Config, logger *zap.Logger) *Jetstream {
j := &Jetstream{
func NewJetstream(config Config, logger *zap.Logger, ctx *context.Context) *JetstreamClient {
j := &JetstreamClient{

Check warning on line 45 in internal/natsclient/jetstream.go

View check run for this annotation

Codecov / codecov/patch

internal/natsclient/jetstream.go#L44-L45

Added lines #L44 - L45 were not covered by tests
config: &config,
logger: logger,
metrics: NewMetrics(),
ctx: *ctx,

Check warning on line 49 in internal/natsclient/jetstream.go

View check run for this annotation

Codecov / codecov/patch

internal/natsclient/jetstream.go#L49

Added line #L49 was not covered by tests
}

j.connect()
Expand All @@ -47,7 +58,7 @@
return j
}

func (j *Jetstream) connect() {
func (j *JetstreamClient) connect() {

Check warning on line 61 in internal/natsclient/jetstream.go

View check run for this annotation

Codecov / codecov/patch

internal/natsclient/jetstream.go#L61

Added line #L61 was not covered by tests
var err error
j.connection, err = nats.Connect(j.config.URL)
if err != nil {
Expand All @@ -65,99 +76,115 @@
})
}

func (j *Jetstream) createJetstreamContext() {
func (j *JetstreamClient) createJetstreamContext() {

Check warning on line 79 in internal/natsclient/jetstream.go

View check run for this annotation

Codecov / codecov/patch

internal/natsclient/jetstream.go#L79

Added line #L79 was not covered by tests
var err error
j.jetstream, err = j.connection.JetStream()
j.jetstream, err = jetstream.New(j.connection)

Check warning on line 81 in internal/natsclient/jetstream.go

View check run for this annotation

Codecov / codecov/patch

internal/natsclient/jetstream.go#L81

Added line #L81 was not covered by tests
if err != nil {
j.logger.Panic("could not connect to jetstream", zap.Error(err))
}
}

func (j *Jetstream) UpdateOrCreateStream() {
if j.config.AllExistingStreams {
streamNames := j.jetstream.StreamNames()
for stream := range streamNames {
j.config.Streams = append(j.config.Streams, Stream{Name: stream})
}
}
for i, stream := range j.config.Streams {
if stream.Subject == "" {
j.config.Streams[i].Subject = stream.Name + subjectSuffix
func (j *JetstreamClient) UpdateOrCreateStream() {
for _, stream := range j.config.StreamsConfig {
name := stream.Name
subject := stream.Subject
if subject == "" {
subject = stream.Name + subjectSuffix

Check warning on line 92 in internal/natsclient/jetstream.go

View check run for this annotation

Codecov / codecov/patch

internal/natsclient/jetstream.go#L87-L92

Added lines #L87 - L92 were not covered by tests
}

info, err := j.jetstream.StreamInfo(stream.Name)
var str jetstream.Stream
s, err := j.jetstream.Stream(j.ctx, name)

Check warning on line 96 in internal/natsclient/jetstream.go

View check run for this annotation

Codecov / codecov/patch

internal/natsclient/jetstream.go#L95-L96

Added lines #L95 - L96 were not covered by tests
if err == nil {
j.updateStream(j.config.Streams[i], info)
} else if err == nats.ErrStreamNotFound && j.config.NewStreamAllow {
j.createStream(j.config.Streams[i])
info, _ := s.Info(j.ctx)
str, err = j.updateStream(name, subject, info.Config)
} else if err == jetstream.ErrStreamNotFound {
str, err = j.createStream(name, subject)

Check warning on line 101 in internal/natsclient/jetstream.go

View check run for this annotation

Codecov / codecov/patch

internal/natsclient/jetstream.go#L98-L101

Added lines #L98 - L101 were not covered by tests
} else {
j.logger.Error("could not add subject", zap.String("stream", stream.Name), zap.Error(err))
continue

Check warning on line 104 in internal/natsclient/jetstream.go

View check run for this annotation

Codecov / codecov/patch

internal/natsclient/jetstream.go#L104

Added line #L104 was not covered by tests
}

if err != nil {
j.logger.Error("could not add subject", zap.String("stream", stream.Name), zap.Error(err))
continue

Check warning on line 109 in internal/natsclient/jetstream.go

View check run for this annotation

Codecov / codecov/patch

internal/natsclient/jetstream.go#L107-L109

Added lines #L107 - L109 were not covered by tests
}

j.createdStreams = append(j.createdStreams, CreatedStream{
name: name,
subject: subject,
streamClient: str,
})
j.logger.Info("create or update stream", zap.String("stream", stream.Name))

Check warning on line 117 in internal/natsclient/jetstream.go

View check run for this annotation

Codecov / codecov/patch

internal/natsclient/jetstream.go#L112-L117

Added lines #L112 - L117 were not covered by tests
}
}
func (j *Jetstream) updateStream(stream Stream, info *nats.StreamInfo) {
subjects := append(info.Config.Subjects, stream.Subject)

func (j *JetstreamClient) updateStream(name string, subject string, config jetstream.StreamConfig) (jetstream.Stream, error) {
subjects := append(config.Subjects, subject)

Check warning on line 122 in internal/natsclient/jetstream.go

View check run for this annotation

Codecov / codecov/patch

internal/natsclient/jetstream.go#L121-L122

Added lines #L121 - L122 were not covered by tests
slices.Sort(subjects)
subjects = slices.Compact(subjects)
_, err := j.jetstream.UpdateStream(&nats.StreamConfig{
Name: stream.Name,
Subjects: subjects,
})
config.Subjects = slices.Compact(subjects)
str, err := j.jetstream.UpdateStream(j.ctx, config)

Check warning on line 125 in internal/natsclient/jetstream.go

View check run for this annotation

Codecov / codecov/patch

internal/natsclient/jetstream.go#L124-L125

Added lines #L124 - L125 were not covered by tests
if err != nil {
j.logger.Error("could not add subject to existing stream", zap.String("stream", stream.Name), zap.Error(err))
j.logger.Error("could not add subject to existing stream", zap.String("stream", name), zap.Error(err))
return nil, err

Check warning on line 128 in internal/natsclient/jetstream.go

View check run for this annotation

Codecov / codecov/patch

internal/natsclient/jetstream.go#L127-L128

Added lines #L127 - L128 were not covered by tests
}
j.logger.Info("stream updated")
return str, nil

Check warning on line 131 in internal/natsclient/jetstream.go

View check run for this annotation

Codecov / codecov/patch

internal/natsclient/jetstream.go#L131

Added line #L131 was not covered by tests
}

func (j *Jetstream) createStream(stream Stream) {
_, err := j.jetstream.AddStream(&nats.StreamConfig{
Name: stream.Name,
Subjects: []string{stream.Subject},
func (j *JetstreamClient) createStream(name string, subject string) (jetstream.Stream, error) {
str, err := j.jetstream.CreateStream(j.ctx, jetstream.StreamConfig{
Name: name,
Subjects: []string{subject},

Check warning on line 137 in internal/natsclient/jetstream.go

View check run for this annotation

Codecov / codecov/patch

internal/natsclient/jetstream.go#L134-L137

Added lines #L134 - L137 were not covered by tests
})
if err != nil {
j.logger.Error("could not add stream", zap.String("stream", stream.Name), zap.Error(err))
j.logger.Error("could not add stream", zap.String("stream", name), zap.Error(err))
return nil, err

Check warning on line 141 in internal/natsclient/jetstream.go

View check run for this annotation

Codecov / codecov/patch

internal/natsclient/jetstream.go#L140-L141

Added lines #L140 - L141 were not covered by tests
}
j.logger.Info("add new stream")
return str, nil

Check warning on line 144 in internal/natsclient/jetstream.go

View check run for this annotation

Codecov / codecov/patch

internal/natsclient/jetstream.go#L144

Added line #L144 was not covered by tests
}

func (j *Jetstream) StartBlackboxTest() {
if j.config.Streams == nil {
func (j *JetstreamClient) StartBlackboxTest() {
if j.createdStreams == nil {

Check warning on line 148 in internal/natsclient/jetstream.go

View check run for this annotation

Codecov / codecov/patch

internal/natsclient/jetstream.go#L147-L148

Added lines #L147 - L148 were not covered by tests
j.logger.Panic("at least one stream is required.")
}
for _, stream := range j.config.Streams {
messageChannel := j.createSubscribe(stream.Subject)
go j.jetstreamPublish(stream.Subject, stream.Name)
go j.jetstreamSubscribe(messageChannel, stream.Name)
for _, stream := range j.createdStreams {
consumer := j.createConsumer(stream.streamClient, stream.subject)
go j.jetstreamPublish(stream.subject, stream.name)
go j.jetstreamSubscribe(consumer, stream.name)

Check warning on line 154 in internal/natsclient/jetstream.go

View check run for this annotation

Codecov / codecov/patch

internal/natsclient/jetstream.go#L151-L154

Added lines #L151 - L154 were not covered by tests
}
}

// Subscribe subscribes to a list of subjects and returns a channel with incoming messages
func (j *Jetstream) createSubscribe(subject string) chan *Message {
func (j *JetstreamClient) createConsumer(str jetstream.Stream, subject string) jetstream.Consumer {

Check warning on line 159 in internal/natsclient/jetstream.go

View check run for this annotation

Codecov / codecov/patch

internal/natsclient/jetstream.go#L159

Added line #L159 was not covered by tests

messageHandler, h := j.messageHandlerFactoryJetstream()
_, err := j.jetstream.Subscribe(
subject,
messageHandler,
nats.DeliverNew(),
nats.ReplayInstant(),
nats.AckExplicit(),
nats.MaxAckPending(j.config.MaxPubAcksInflight),
)
c, err := str.CreateOrUpdateConsumer(j.ctx, jetstream.ConsumerConfig{
AckPolicy: jetstream.AckExplicitPolicy,
FilterSubject: subject,
DeliverPolicy: jetstream.DeliverNewPolicy,
})

Check warning on line 165 in internal/natsclient/jetstream.go

View check run for this annotation

Codecov / codecov/patch

internal/natsclient/jetstream.go#L161-L165

Added lines #L161 - L165 were not covered by tests
if err != nil {
j.logger.Panic("could not Subscribe", zap.Error(err))
} else {
j.logger.Info("Subscribed to %s successfully", zap.String("subject", subject))
j.logger.Info("Subscribed to %s successfully", zap.String("stream", str.CachedInfo().Config.Name))

Check warning on line 169 in internal/natsclient/jetstream.go

View check run for this annotation

Codecov / codecov/patch

internal/natsclient/jetstream.go#L169

Added line #L169 was not covered by tests
}

return h

return c

Check warning on line 172 in internal/natsclient/jetstream.go

View check run for this annotation

Codecov / codecov/patch

internal/natsclient/jetstream.go#L172

Added line #L172 was not covered by tests
}

func (j *Jetstream) jetstreamSubscribe(h chan *Message, streamName string) {
func (j *JetstreamClient) jetstreamSubscribe(c jetstream.Consumer, streamName string) {
messageHandler, h := j.messageHandlerFactoryJetstream()

Check warning on line 176 in internal/natsclient/jetstream.go

View check run for this annotation

Codecov / codecov/patch

internal/natsclient/jetstream.go#L175-L176

Added lines #L175 - L176 were not covered by tests

cc, err := c.Consume(messageHandler)
if err != nil {
j.logger.Fatal("unable to consume messages", zap.Error(err))

Check warning on line 180 in internal/natsclient/jetstream.go

View check run for this annotation

Codecov / codecov/patch

internal/natsclient/jetstream.go#L178-L180

Added lines #L178 - L180 were not covered by tests
}
defer cc.Stop()

Check warning on line 182 in internal/natsclient/jetstream.go

View check run for this annotation

Codecov / codecov/patch

internal/natsclient/jetstream.go#L182

Added line #L182 was not covered by tests

clusterName := j.connection.ConnectedClusterName()
for msg := range h {
var publishTime time.Time
err := publishTime.UnmarshalBinary(msg.Data)
err = publishTime.UnmarshalBinary(msg.Data)

Check warning on line 187 in internal/natsclient/jetstream.go

View check run for this annotation

Codecov / codecov/patch

internal/natsclient/jetstream.go#L187

Added line #L187 was not covered by tests
if err != nil {
j.logger.Error("unable to unmarshal binary data for publishTime.")
j.logger.Info("received message but could not calculate latency due to unmarshalling error.", zap.String("subject", msg.Subject))
Expand All @@ -177,14 +204,14 @@
}
}

func (j *Jetstream) jetstreamPublish(subject string, streamName string) {
func (j *JetstreamClient) jetstreamPublish(subject string, streamName string) {

Check warning on line 207 in internal/natsclient/jetstream.go

View check run for this annotation

Codecov / codecov/patch

internal/natsclient/jetstream.go#L207

Added line #L207 was not covered by tests
clusterName := j.connection.ConnectedClusterName()
for {
t, err := time.Now().MarshalBinary()
if err != nil {
j.logger.Error("could not marshal current time.", zap.Error(err))
}
if ack, err := j.jetstream.Publish(subject, t); err != nil {
if ack, err := j.jetstream.Publish(j.ctx, subject, t); err != nil {

Check warning on line 214 in internal/natsclient/jetstream.go

View check run for this annotation

Codecov / codecov/patch

internal/natsclient/jetstream.go#L214

Added line #L214 was not covered by tests
j.metrics.SuccessCounter.With(prometheus.Labels{
"type": failedPublish,
"stream": streamName,
Expand All @@ -210,12 +237,12 @@
}
}

func (j *Jetstream) messageHandlerFactoryJetstream() (nats.MsgHandler, chan *Message) {
func (j *JetstreamClient) messageHandlerFactoryJetstream() (jetstream.MessageHandler, chan *Message) {

Check warning on line 240 in internal/natsclient/jetstream.go

View check run for this annotation

Codecov / codecov/patch

internal/natsclient/jetstream.go#L240

Added line #L240 was not covered by tests
ch := make(chan *Message)
return func(msg *nats.Msg) {
return func(msg jetstream.Msg) {

Check warning on line 242 in internal/natsclient/jetstream.go

View check run for this annotation

Codecov / codecov/patch

internal/natsclient/jetstream.go#L242

Added line #L242 was not covered by tests
ch <- &Message{
Subject: msg.Subject,
Data: msg.Data,
Subject: msg.Subject(),
Data: msg.Data(),

Check warning on line 245 in internal/natsclient/jetstream.go

View check run for this annotation

Codecov / codecov/patch

internal/natsclient/jetstream.go#L244-L245

Added lines #L244 - L245 were not covered by tests
}
err := msg.Ack()
if err != nil {
Expand All @@ -225,7 +252,7 @@
}

// Close closes NATS connection
func (j *Jetstream) Close() {
func (j *JetstreamClient) Close() {

Check warning on line 255 in internal/natsclient/jetstream.go

View check run for this annotation

Codecov / codecov/patch

internal/natsclient/jetstream.go#L255

Added line #L255 was not covered by tests
if err := j.connection.FlushTimeout(j.config.FlushTimeout); err != nil {
j.logger.Error("could not flush", zap.Error(err))
}
Expand Down
Loading