Skip to content

Commit

Permalink
fix: fix stream update and create
Browse files Browse the repository at this point in the history
  • Loading branch information
kianaza committed Jun 29, 2024
1 parent 96d0ce0 commit 2a4fe84
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 41 deletions.
6 changes: 4 additions & 2 deletions internal/config/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,18 @@ func Default() Config {
},
NATS: natsclient.Config{
NewStreamAllow: true,
Stream: natsclient.Stream{
Name: "stream",
Streams: []natsclient.Stream{{
Name: "test",

Check warning on line 21 in internal/config/default.go

View check run for this annotation

Codecov / codecov/patch

internal/config/default.go#L20-L21

Added lines #L20 - L21 were not covered by tests
Subject: "test",
},
},

Check warning on line 24 in internal/config/default.go

View check run for this annotation

Codecov / codecov/patch

internal/config/default.go#L24

Added line #L24 was not covered by tests
URL: "localhost:4222",
PublishInterval: 2 * time.Second,
RequestTimeout: 50 * time.Millisecond,
MaxPubAcksInflight: 1000,
QueueSubscriptionGroup: "group",
FlushTimeout: 2 * time.Second,
ClientName: "localhost",

Check warning on line 31 in internal/config/default.go

View check run for this annotation

Codecov / codecov/patch

internal/config/default.go#L31

Added line #L31 was not covered by tests
},
Metric: metric.Config{
Server: metric.Server{Address: ":8080"},
Expand Down
3 changes: 2 additions & 1 deletion internal/natsclient/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ import "time"

type Config struct {
NewStreamAllow bool `json:"new_stream_allow" koanf:"new_stream_allow"`
Stream Stream `json:"stream,omitempty" koanf:"stream"`
Streams []Stream `json:"stream,omitempty" koanf:"stream"`
URL string `json:"url,omitempty" koanf:"url"`
PublishInterval time.Duration `json:"publish_interval" koanf:"publish_interval"`
RequestTimeout time.Duration `json:"request_timeout" koanf:"request_timeout"`
MaxPubAcksInflight int `json:"max_pub_acks_inflight" koanf:"max_pub_acks_inflight"`
QueueSubscriptionGroup string `json:"queue_subscription_group" koanf:"queue_subscription_group"`
FlushTimeout time.Duration `json:"flush_timeout" koanf:"flush_timeout"`
ClientName string `json:"client_name" koanf:"client_name"`
}

type Stream struct {
Expand Down
87 changes: 60 additions & 27 deletions internal/natsclient/jetstream.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
package natsclient

import (
"slices"
"time"

"github.com/nats-io/nats.go"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)

var (
successfulSubscribe = "successful subscribe"
failedPublish = "failed publish"
successfulPublish = "successful publish"
)

type Message struct {
Subject string
Data []byte
Expand All @@ -26,7 +34,7 @@ func NewJetstream(config Config, logger *zap.Logger) *Jetstream {
j := &Jetstream{
config: &config,
logger: logger,
metrics: NewMetrics(),
metrics: NewMetrics(config.ClientName),

Check warning on line 37 in internal/natsclient/jetstream.go

View check run for this annotation

Codecov / codecov/patch

internal/natsclient/jetstream.go#L37

Added line #L37 was not covered by tests
}

j.connect()
Expand Down Expand Up @@ -65,32 +73,48 @@ func (j *Jetstream) createJetstreamContext() {
}

func (j *Jetstream) createStream() {
_, err := j.jetstream.StreamInfo(j.config.Stream.Name)
if err == nil {
_, err = j.jetstream.UpdateStream(&nats.StreamConfig{
Name: j.config.Stream.Name,
Subjects: []string{j.config.Stream.Subject},
})
if err != nil {
j.logger.Panic("could not add subject to existing stream", zap.Error(err))
}
} else if err == nats.ErrStreamNotFound && j.config.NewStreamAllow {
_, err = j.jetstream.AddStream(&nats.StreamConfig{
Name: j.config.Stream.Name,
Subjects: []string{j.config.Stream.Subject},
})
if err != nil {
j.logger.Panic("could not add stream", zap.Error(err))
for _, stream := range j.config.Streams {
info, err := j.jetstream.StreamInfo(stream.Name)
if err == nil {
j.updateStream(stream, info)
} else if err == nats.ErrStreamNotFound && j.config.NewStreamAllow {
j.addStream(stream)
} else {
j.logger.Panic("could not add subject", zap.Error(err))

Check warning on line 83 in internal/natsclient/jetstream.go

View check run for this annotation

Codecov / codecov/patch

internal/natsclient/jetstream.go#L76-L83

Added lines #L76 - L83 were not covered by tests
}
} else {
j.logger.Panic("could not add subject", zap.Error(err))
}
}
func (j *Jetstream) updateStream(stream Stream, info *nats.StreamInfo) {
subjects := append(info.Config.Subjects, stream.Subject)
slices.Sort(subjects)
subjects = slices.Compact(subjects)
_, err := j.jetstream.UpdateStream(&nats.StreamConfig{
Name: stream.Name,
Subjects: subjects,
})
if err != nil {
j.logger.Panic("could not add subject to existing stream", zap.Error(err))

Check warning on line 96 in internal/natsclient/jetstream.go

View check run for this annotation

Codecov / codecov/patch

internal/natsclient/jetstream.go#L87-L96

Added lines #L87 - L96 were not covered by tests
}
j.logger.Info("stream updated")

Check warning on line 98 in internal/natsclient/jetstream.go

View check run for this annotation

Codecov / codecov/patch

internal/natsclient/jetstream.go#L98

Added line #L98 was not covered by tests
}

func (j *Jetstream) addStream(stream Stream) {
_, err := j.jetstream.AddStream(&nats.StreamConfig{
Name: stream.Name,
Subjects: []string{stream.Subject},
})
if err != nil {
j.logger.Panic("could not add stream", zap.Error(err))

Check warning on line 107 in internal/natsclient/jetstream.go

View check run for this annotation

Codecov / codecov/patch

internal/natsclient/jetstream.go#L101-L107

Added lines #L101 - L107 were not covered by tests
}
j.logger.Info("add new stream")

Check warning on line 109 in internal/natsclient/jetstream.go

View check run for this annotation

Codecov / codecov/patch

internal/natsclient/jetstream.go#L109

Added line #L109 was not covered by tests
}

func (j *Jetstream) StartBlackboxTest() {
messageChannel := j.createSubscribe(j.config.Stream.Subject)
go j.jetstreamPublish(j.config.Stream.Subject)
go j.jetstreamSubscribe(messageChannel)
for _, stream := range j.config.Streams {
messageChannel := j.createSubscribe(stream.Subject)
go j.jetstreamPublish(stream.Subject, stream.Name)
go j.jetstreamSubscribe(messageChannel, stream.Name)

Check warning on line 116 in internal/natsclient/jetstream.go

View check run for this annotation

Codecov / codecov/patch

internal/natsclient/jetstream.go#L113-L116

Added lines #L113 - L116 were not covered by tests
}
}

// Subscribe subscribes to a list of subjects and returns a channel with incoming messages
Expand All @@ -115,7 +139,7 @@ func (j *Jetstream) createSubscribe(subject string) chan *Message {

}

func (j *Jetstream) jetstreamSubscribe(h chan *Message) {
func (j *Jetstream) jetstreamSubscribe(h chan *Message, streamName string) {

Check warning on line 142 in internal/natsclient/jetstream.go

View check run for this annotation

Codecov / codecov/patch

internal/natsclient/jetstream.go#L142

Added line #L142 was not covered by tests
for msg := range h {
var publishTime time.Time
err := publishTime.UnmarshalBinary(msg.Data)
Expand All @@ -126,20 +150,26 @@ func (j *Jetstream) jetstreamSubscribe(h chan *Message) {
}
latency := time.Since(publishTime).Seconds()
j.metrics.Latency.Observe(latency)
j.metrics.SuccessCounter.WithLabelValues("successful subscribe").Add(1)
j.metrics.SuccessCounter.With(prometheus.Labels{
"type": successfulSubscribe,
"stream": streamName,
}).Add(1)

Check warning on line 156 in internal/natsclient/jetstream.go

View check run for this annotation

Codecov / codecov/patch

internal/natsclient/jetstream.go#L153-L156

Added lines #L153 - L156 were not covered by tests
j.logger.Info("Received message: ", zap.String("subject", msg.Subject), zap.Float64("latency", latency))
}
}

func (j *Jetstream) jetstreamPublish(subject string) {
func (j *Jetstream) jetstreamPublish(subject string, streamName string) {

Check warning on line 161 in internal/natsclient/jetstream.go

View check run for this annotation

Codecov / codecov/patch

internal/natsclient/jetstream.go#L161

Added line #L161 was not covered by tests
for {
t, err := time.Now().MarshalBinary()
if err != nil {
j.logger.Error("could not marshal current time.", zap.Error(err))
}

if ack, err := j.jetstream.Publish(subject, t); err != nil {
j.metrics.SuccessCounter.WithLabelValues("failed publish").Add(1)
j.metrics.SuccessCounter.With(prometheus.Labels{
"type": failedPublish,
"stream": streamName,
}).Add(1)

Check warning on line 172 in internal/natsclient/jetstream.go

View check run for this annotation

Codecov / codecov/patch

internal/natsclient/jetstream.go#L169-L172

Added lines #L169 - L172 were not covered by tests
if err == nats.ErrTimeout {
j.logger.Error("Request timeout: No response received within the timeout period.")
} else if err == nats.ErrNoStreamResponse {
Expand All @@ -148,7 +178,10 @@ func (j *Jetstream) jetstreamPublish(subject string) {
j.logger.Error("Request failed: %v", zap.Error(err))
}
} else {
j.metrics.SuccessCounter.WithLabelValues("successful publish").Add(1)
j.metrics.SuccessCounter.With(prometheus.Labels{
"type": successfulPublish,
"stream": streamName,
}).Add(1)

Check warning on line 184 in internal/natsclient/jetstream.go

View check run for this annotation

Codecov / codecov/patch

internal/natsclient/jetstream.go#L181-L184

Added lines #L181 - L184 were not covered by tests
j.logger.Info("receive ack", zap.String("stream", ack.Stream))
}

Expand Down
22 changes: 11 additions & 11 deletions internal/natsclient/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,19 @@ import (

const (
Namespace = "nats_blackbox_exporter"
Subsystem = "client"
)

var latencyBuckets = []float64{
0.0001,
0.0005,
0.0007,
0.001,
0.0013,
0.0015,
0.0017,
0.002,
0.0023,
0.0025,
0.0027,
0.003,
0.004,
0.005,
0.006,
}

// Metrics has all the client metrics.
Expand Down Expand Up @@ -68,30 +68,30 @@ func newCounterVec(counterOpts prometheus.CounterOpts, labelNames []string) prom
return *ev
}

func NewMetrics() Metrics {
func NewMetrics(clinetName string) Metrics {

Check warning on line 71 in internal/natsclient/metric.go

View check run for this annotation

Codecov / codecov/patch

internal/natsclient/metric.go#L71

Added line #L71 was not covered by tests
return Metrics{
Connection: newCounterVec(prometheus.CounterOpts{
Namespace: Namespace,
Subsystem: Subsystem,
Subsystem: clinetName,

Check warning on line 75 in internal/natsclient/metric.go

View check run for this annotation

Codecov / codecov/patch

internal/natsclient/metric.go#L75

Added line #L75 was not covered by tests
Name: "connection_errors_total",
Help: "total number of disconnections and reconnections",
ConstLabels: nil,
}, []string{"type"}),
// nolint: exhaustruct
Latency: newHistogram(prometheus.HistogramOpts{
Namespace: Namespace,
Subsystem: Subsystem,
Subsystem: clinetName,

Check warning on line 83 in internal/natsclient/metric.go

View check run for this annotation

Codecov / codecov/patch

internal/natsclient/metric.go#L83

Added line #L83 was not covered by tests
Name: "latency",
Help: "from publish to consume duration in seconds",
ConstLabels: nil,
Buckets: latencyBuckets,
}),
SuccessCounter: newCounterVec(prometheus.CounterOpts{
Namespace: Namespace,
Subsystem: Subsystem,
Subsystem: clinetName,

Check warning on line 91 in internal/natsclient/metric.go

View check run for this annotation

Codecov / codecov/patch

internal/natsclient/metric.go#L91

Added line #L91 was not covered by tests
Name: "success_counter",
Help: "publish and consume success rate",
ConstLabels: nil,
}, []string{"type"}),
}, []string{"type", "stream"}),

Check warning on line 95 in internal/natsclient/metric.go

View check run for this annotation

Codecov / codecov/patch

internal/natsclient/metric.go#L95

Added line #L95 was not covered by tests
}
}

0 comments on commit 2a4fe84

Please sign in to comment.