Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fix stream update and create #7

Merged
merged 2 commits into from
Jun 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions internal/config/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,18 @@
},
NATS: natsclient.Config{
NewStreamAllow: true,
Stream: natsclient.Stream{
Name: "stream",
Streams: []natsclient.Stream{{
Name: "test",

Check warning on line 21 in internal/config/default.go

View check run for this annotation

Codecov / codecov/patch

internal/config/default.go#L20-L21

Added lines #L20 - L21 were not covered by tests
Subject: "test",
},
},

Check warning on line 24 in internal/config/default.go

View check run for this annotation

Codecov / codecov/patch

internal/config/default.go#L24

Added line #L24 was not covered by tests
URL: "localhost:4222",
PublishInterval: 2 * time.Second,
RequestTimeout: 50 * time.Millisecond,
MaxPubAcksInflight: 1000,
QueueSubscriptionGroup: "group",
FlushTimeout: 2 * time.Second,
ClientName: "localhost",

Check warning on line 31 in internal/config/default.go

View check run for this annotation

Codecov / codecov/patch

internal/config/default.go#L31

Added line #L31 was not covered by tests
},
Metric: metric.Config{
Server: metric.Server{Address: ":8080"},
Expand Down
3 changes: 2 additions & 1 deletion internal/natsclient/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ import "time"

type Config struct {
NewStreamAllow bool `json:"new_stream_allow" koanf:"new_stream_allow"`
Stream Stream `json:"stream,omitempty" koanf:"stream"`
Streams []Stream `json:"stream,omitempty" koanf:"stream"`
URL string `json:"url,omitempty" koanf:"url"`
PublishInterval time.Duration `json:"publish_interval" koanf:"publish_interval"`
RequestTimeout time.Duration `json:"request_timeout" koanf:"request_timeout"`
MaxPubAcksInflight int `json:"max_pub_acks_inflight" koanf:"max_pub_acks_inflight"`
QueueSubscriptionGroup string `json:"queue_subscription_group" koanf:"queue_subscription_group"`
FlushTimeout time.Duration `json:"flush_timeout" koanf:"flush_timeout"`
ClientName string `json:"client_name" koanf:"client_name"`
}

type Stream struct {
Expand Down
91 changes: 62 additions & 29 deletions internal/natsclient/jetstream.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
package natsclient

import (
"slices"
"time"

"github.com/nats-io/nats.go"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)

var (
successfulSubscribe = "successful subscribe"
failedPublish = "failed publish"
successfulPublish = "successful publish"
)

type Message struct {
Subject string
Data []byte
Expand All @@ -26,14 +34,14 @@
j := &Jetstream{
config: &config,
logger: logger,
metrics: NewMetrics(),
metrics: NewMetrics(config.ClientName),

Check warning on line 37 in internal/natsclient/jetstream.go

View check run for this annotation

Codecov / codecov/patch

internal/natsclient/jetstream.go#L37

Added line #L37 was not covered by tests
}

j.connect()

j.createJetstreamContext()

j.createStream()
j.UpdateOrCreateStream()

Check warning on line 44 in internal/natsclient/jetstream.go

View check run for this annotation

Codecov / codecov/patch

internal/natsclient/jetstream.go#L44

Added line #L44 was not covered by tests

return j
}
Expand Down Expand Up @@ -64,33 +72,49 @@
}
}

func (j *Jetstream) createStream() {
_, err := j.jetstream.StreamInfo(j.config.Stream.Name)
if err == nil {
_, err = j.jetstream.UpdateStream(&nats.StreamConfig{
Name: j.config.Stream.Name,
Subjects: []string{j.config.Stream.Subject},
})
if err != nil {
j.logger.Panic("could not add subject to existing stream", zap.Error(err))
}
} else if err == nats.ErrStreamNotFound && j.config.NewStreamAllow {
_, err = j.jetstream.AddStream(&nats.StreamConfig{
Name: j.config.Stream.Name,
Subjects: []string{j.config.Stream.Subject},
})
if err != nil {
j.logger.Panic("could not add stream", zap.Error(err))
func (j *Jetstream) UpdateOrCreateStream() {
for _, stream := range j.config.Streams {
info, err := j.jetstream.StreamInfo(stream.Name)
if err == nil {
j.updateStream(stream, info)
} else if err == nats.ErrStreamNotFound && j.config.NewStreamAllow {
j.createStream(stream)
} else {
j.logger.Panic("could not add subject", zap.Error(err))

Check warning on line 83 in internal/natsclient/jetstream.go

View check run for this annotation

Codecov / codecov/patch

internal/natsclient/jetstream.go#L75-L83

Added lines #L75 - L83 were not covered by tests
}
} else {
j.logger.Panic("could not add subject", zap.Error(err))
}
}
func (j *Jetstream) updateStream(stream Stream, info *nats.StreamInfo) {
subjects := append(info.Config.Subjects, stream.Subject)
slices.Sort(subjects)
subjects = slices.Compact(subjects)
_, err := j.jetstream.UpdateStream(&nats.StreamConfig{
Name: stream.Name,
Subjects: subjects,
})
if err != nil {
j.logger.Panic("could not add subject to existing stream", zap.Error(err))

Check warning on line 96 in internal/natsclient/jetstream.go

View check run for this annotation

Codecov / codecov/patch

internal/natsclient/jetstream.go#L87-L96

Added lines #L87 - L96 were not covered by tests
}
j.logger.Info("stream updated")

Check warning on line 98 in internal/natsclient/jetstream.go

View check run for this annotation

Codecov / codecov/patch

internal/natsclient/jetstream.go#L98

Added line #L98 was not covered by tests
}

func (j *Jetstream) createStream(stream Stream) {
_, err := j.jetstream.AddStream(&nats.StreamConfig{
Name: stream.Name,
Subjects: []string{stream.Subject},
})
if err != nil {
j.logger.Panic("could not add stream", zap.Error(err))

Check warning on line 107 in internal/natsclient/jetstream.go

View check run for this annotation

Codecov / codecov/patch

internal/natsclient/jetstream.go#L101-L107

Added lines #L101 - L107 were not covered by tests
}
j.logger.Info("add new stream")

Check warning on line 109 in internal/natsclient/jetstream.go

View check run for this annotation

Codecov / codecov/patch

internal/natsclient/jetstream.go#L109

Added line #L109 was not covered by tests
}

func (j *Jetstream) StartBlackboxTest() {
messageChannel := j.createSubscribe(j.config.Stream.Subject)
go j.jetstreamPublish(j.config.Stream.Subject)
go j.jetstreamSubscribe(messageChannel)
for _, stream := range j.config.Streams {
messageChannel := j.createSubscribe(stream.Subject)
go j.jetstreamPublish(stream.Subject, stream.Name)
go j.jetstreamSubscribe(messageChannel, stream.Name)

Check warning on line 116 in internal/natsclient/jetstream.go

View check run for this annotation

Codecov / codecov/patch

internal/natsclient/jetstream.go#L113-L116

Added lines #L113 - L116 were not covered by tests
}
}

// Subscribe subscribes to a list of subjects and returns a channel with incoming messages
Expand All @@ -115,7 +139,7 @@

}

func (j *Jetstream) jetstreamSubscribe(h chan *Message) {
func (j *Jetstream) jetstreamSubscribe(h chan *Message, streamName string) {

Check warning on line 142 in internal/natsclient/jetstream.go

View check run for this annotation

Codecov / codecov/patch

internal/natsclient/jetstream.go#L142

Added line #L142 was not covered by tests
for msg := range h {
var publishTime time.Time
err := publishTime.UnmarshalBinary(msg.Data)
Expand All @@ -126,20 +150,26 @@
}
latency := time.Since(publishTime).Seconds()
j.metrics.Latency.Observe(latency)
j.metrics.SuccessCounter.WithLabelValues("successful subscribe").Add(1)
j.metrics.SuccessCounter.With(prometheus.Labels{
"type": successfulSubscribe,
"stream": streamName,
}).Add(1)

Check warning on line 156 in internal/natsclient/jetstream.go

View check run for this annotation

Codecov / codecov/patch

internal/natsclient/jetstream.go#L153-L156

Added lines #L153 - L156 were not covered by tests
j.logger.Info("Received message: ", zap.String("subject", msg.Subject), zap.Float64("latency", latency))
}
}

func (j *Jetstream) jetstreamPublish(subject string) {
func (j *Jetstream) jetstreamPublish(subject string, streamName string) {

Check warning on line 161 in internal/natsclient/jetstream.go

View check run for this annotation

Codecov / codecov/patch

internal/natsclient/jetstream.go#L161

Added line #L161 was not covered by tests
for {
t, err := time.Now().MarshalBinary()
if err != nil {
j.logger.Error("could not marshal current time.", zap.Error(err))
}

if ack, err := j.jetstream.Publish(subject, t); err != nil {
j.metrics.SuccessCounter.WithLabelValues("failed publish").Add(1)
j.metrics.SuccessCounter.With(prometheus.Labels{
"type": failedPublish,
"stream": streamName,
}).Add(1)

Check warning on line 172 in internal/natsclient/jetstream.go

View check run for this annotation

Codecov / codecov/patch

internal/natsclient/jetstream.go#L169-L172

Added lines #L169 - L172 were not covered by tests
if err == nats.ErrTimeout {
j.logger.Error("Request timeout: No response received within the timeout period.")
} else if err == nats.ErrNoStreamResponse {
Expand All @@ -148,7 +178,10 @@
j.logger.Error("Request failed: %v", zap.Error(err))
}
} else {
j.metrics.SuccessCounter.WithLabelValues("successful publish").Add(1)
j.metrics.SuccessCounter.With(prometheus.Labels{
"type": successfulPublish,
"stream": streamName,
}).Add(1)

Check warning on line 184 in internal/natsclient/jetstream.go

View check run for this annotation

Codecov / codecov/patch

internal/natsclient/jetstream.go#L181-L184

Added lines #L181 - L184 were not covered by tests
j.logger.Info("receive ack", zap.String("stream", ack.Stream))
}

Expand Down
22 changes: 11 additions & 11 deletions internal/natsclient/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,19 @@

const (
Namespace = "nats_blackbox_exporter"
Subsystem = "client"
)

var latencyBuckets = []float64{
0.0001,
0.0005,
0.0007,
0.001,
0.0013,
0.0015,
0.0017,
0.002,
0.0023,
0.0025,
0.0027,
0.003,
0.004,
0.005,
0.006,
}

// Metrics has all the client metrics.
Expand Down Expand Up @@ -68,30 +68,30 @@
return *ev
}

func NewMetrics() Metrics {
func NewMetrics(clinetName string) Metrics {

Check warning on line 71 in internal/natsclient/metric.go

View check run for this annotation

Codecov / codecov/patch

internal/natsclient/metric.go#L71

Added line #L71 was not covered by tests
return Metrics{
Connection: newCounterVec(prometheus.CounterOpts{
Namespace: Namespace,
Subsystem: Subsystem,
Subsystem: clinetName,

Check warning on line 75 in internal/natsclient/metric.go

View check run for this annotation

Codecov / codecov/patch

internal/natsclient/metric.go#L75

Added line #L75 was not covered by tests
Name: "connection_errors_total",
Help: "total number of disconnections and reconnections",
ConstLabels: nil,
}, []string{"type"}),
// nolint: exhaustruct
Latency: newHistogram(prometheus.HistogramOpts{
Namespace: Namespace,
Subsystem: Subsystem,
Subsystem: clinetName,

Check warning on line 83 in internal/natsclient/metric.go

View check run for this annotation

Codecov / codecov/patch

internal/natsclient/metric.go#L83

Added line #L83 was not covered by tests
Name: "latency",
Help: "from publish to consume duration in seconds",
ConstLabels: nil,
Buckets: latencyBuckets,
}),
SuccessCounter: newCounterVec(prometheus.CounterOpts{
Namespace: Namespace,
Subsystem: Subsystem,
Subsystem: clinetName,

Check warning on line 91 in internal/natsclient/metric.go

View check run for this annotation

Codecov / codecov/patch

internal/natsclient/metric.go#L91

Added line #L91 was not covered by tests
Name: "success_counter",
Help: "publish and consume success rate",
ConstLabels: nil,
}, []string{"type"}),
}, []string{"type", "stream"}),

Check warning on line 95 in internal/natsclient/metric.go

View check run for this annotation

Codecov / codecov/patch

internal/natsclient/metric.go#L95

Added line #L95 was not covered by tests
}
}
Loading