Skip to content

Commit

Permalink
feat: add flag for settings path
Browse files Browse the repository at this point in the history
  • Loading branch information
kianaza committed Jul 5, 2024
1 parent 276bb3a commit 941291a
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 22 deletions.
3 changes: 0 additions & 3 deletions internal/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,6 @@ import (
func main(cfg config.Config, logger *zap.Logger) {
natsConfig := cfg.NATS

// client := natsclient.New(natsConfig, logger)
// client.StartMessaging()

jetstreamClient := natsclient.NewJetstream(natsConfig, logger)
jetstreamClient.StartBlackboxTest()

Expand Down
11 changes: 10 additions & 1 deletion internal/cmd/root.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,28 @@
package cmd

import (
"fmt"
"os"

"github.com/snapp-incubator/nats-blackbox-exporter/internal/config"
"github.com/snapp-incubator/nats-blackbox-exporter/internal/logger"
"github.com/snapp-incubator/nats-blackbox-exporter/internal/metric"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
)

// ExitFailure status code.
const ExitFailure = 1

var settingsPath string

func Execute() {
cfg := config.New()
pflag.StringVar(&settingsPath, "settings", "/opt/nats-blackbox-exporter/settings.yml", "Path to settings file")
pflag.Parse()

fmt.Println("settingsPath", settingsPath)

cfg := config.New(settingsPath)

logger := logger.New(cfg.Logger)

Expand Down
4 changes: 2 additions & 2 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type (
)

// New reads configuration with koanf.
func New() Config {
func New(settingsPath string) Config {
var instance Config

k := koanf.New(".")
Expand All @@ -42,7 +42,7 @@ func New() Config {
}

// load configuration from file
if err := k.Load(file.Provider("setting/config.yaml"), yaml.Parser()); err != nil {
if err := k.Load(file.Provider(settingsPath), yaml.Parser()); err != nil {
log.Printf("error loading config.yaml")
}

Expand Down
23 changes: 14 additions & 9 deletions internal/natsclient/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func NewJetstream(config Config, logger *zap.Logger) *Jetstream {
j := &Jetstream{
config: &config,
logger: logger,
metrics: NewMetrics(config.ClientName),
metrics: NewMetrics(),
}

j.connect()
Expand Down Expand Up @@ -140,6 +140,7 @@ func (j *Jetstream) createSubscribe(subject string) chan *Message {
}

func (j *Jetstream) jetstreamSubscribe(h chan *Message, streamName string) {
clusterName := j.connection.ConnectedClusterName()
for msg := range h {
var publishTime time.Time
err := publishTime.UnmarshalBinary(msg.Data)
Expand All @@ -150,27 +151,30 @@ func (j *Jetstream) jetstreamSubscribe(h chan *Message, streamName string) {
}
latency := time.Since(publishTime).Seconds()
j.metrics.Latency.With(prometheus.Labels{
"stream": streamName,
"stream": streamName,
"cluster": clusterName,
}).Observe(latency)
j.metrics.SuccessCounter.With(prometheus.Labels{
"type": successfulSubscribe,
"stream": streamName,
"type": successfulSubscribe,
"stream": streamName,
"cluster": clusterName,
}).Add(1)
j.logger.Info("Received message: ", zap.String("subject", msg.Subject), zap.Float64("latency", latency))
}
}

func (j *Jetstream) jetstreamPublish(subject string, streamName string) {
clusterName := j.connection.ConnectedClusterName()
for {
t, err := time.Now().MarshalBinary()
if err != nil {
j.logger.Error("could not marshal current time.", zap.Error(err))
}

if ack, err := j.jetstream.Publish(subject, t); err != nil {
j.metrics.SuccessCounter.With(prometheus.Labels{
"type": failedPublish,
"stream": streamName,
"type": failedPublish,
"stream": streamName,
"cluster": clusterName,
}).Add(1)
if err == nats.ErrTimeout {
j.logger.Error("Request timeout: No response received within the timeout period.")
Expand All @@ -181,8 +185,9 @@ func (j *Jetstream) jetstreamPublish(subject string, streamName string) {
}
} else {
j.metrics.SuccessCounter.With(prometheus.Labels{
"type": successfulPublish,
"stream": streamName,
"type": successfulPublish,
"stream": streamName,
"cluster": clusterName,
}).Add(1)
j.logger.Info("receive ack", zap.String("stream", ack.Stream))
}
Expand Down
15 changes: 8 additions & 7 deletions internal/natsclient/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

const (
Namespace = "nats_blackbox_exporter"
Subsystem = "client"
)

var latencyBuckets = []float64{
Expand Down Expand Up @@ -68,30 +69,30 @@ func newCounterVec(counterOpts prometheus.CounterOpts, labelNames []string) prom
return *ev
}

func NewMetrics(clinetName string) Metrics {
func NewMetrics() Metrics {
return Metrics{
Connection: newCounterVec(prometheus.CounterOpts{
Namespace: Namespace,
Subsystem: clinetName,
Subsystem: Subsystem,
Name: "connection_errors_total",
Help: "total number of disconnections and reconnections",
ConstLabels: nil,
}, []string{"type"}),
}, []string{"type", "cluster"}),
// nolint: exhaustruct
Latency: newHistogramVec(prometheus.HistogramOpts{
Namespace: Namespace,
Subsystem: clinetName,
Subsystem: Subsystem,
Name: "latency",
Help: "from publish to consume duration in seconds",
ConstLabels: nil,
Buckets: latencyBuckets,
}, []string{"stream"}),
}, []string{"stream", "cluster"}),
SuccessCounter: newCounterVec(prometheus.CounterOpts{
Namespace: Namespace,
Subsystem: clinetName,
Subsystem: Subsystem,
Name: "success_counter",
Help: "publish and consume success rate",
ConstLabels: nil,
}, []string{"type", "stream"}),
}, []string{"type", "stream", "cluster"}),
}
}

0 comments on commit 941291a

Please sign in to comment.