From 941291ae9f6c6093321a47ac361bd5cacaa68a34 Mon Sep 17 00:00:00 2001 From: kianaza Date: Sat, 6 Jul 2024 02:17:17 +0330 Subject: [PATCH] feat: add flag for settings path --- internal/cmd/main.go | 3 --- internal/cmd/root.go | 11 ++++++++++- internal/config/config.go | 4 ++-- internal/natsclient/jetstream.go | 23 ++++++++++++++--------- internal/natsclient/metric.go | 15 ++++++++------- 5 files changed, 34 insertions(+), 22 deletions(-) diff --git a/internal/cmd/main.go b/internal/cmd/main.go index 1cbb932..1f53937 100644 --- a/internal/cmd/main.go +++ b/internal/cmd/main.go @@ -13,9 +13,6 @@ import ( func main(cfg config.Config, logger *zap.Logger) { natsConfig := cfg.NATS - // client := natsclient.New(natsConfig, logger) - // client.StartMessaging() - jetstreamClient := natsclient.NewJetstream(natsConfig, logger) jetstreamClient.StartBlackboxTest() diff --git a/internal/cmd/root.go b/internal/cmd/root.go index 1a63389..5357de9 100644 --- a/internal/cmd/root.go +++ b/internal/cmd/root.go @@ -1,19 +1,28 @@ package cmd import ( + "fmt" "os" "github.com/snapp-incubator/nats-blackbox-exporter/internal/config" "github.com/snapp-incubator/nats-blackbox-exporter/internal/logger" "github.com/snapp-incubator/nats-blackbox-exporter/internal/metric" "github.com/spf13/cobra" + "github.com/spf13/pflag" ) // ExitFailure status code. const ExitFailure = 1 +var settingsPath string + func Execute() { - cfg := config.New() + pflag.StringVar(&settingsPath, "settings", "/opt/nats-blackbox-exporter/settings.yml", "Path to settings file") + pflag.Parse() + + fmt.Println("settingsPath", settingsPath) + + cfg := config.New(settingsPath) logger := logger.New(cfg.Logger) diff --git a/internal/config/config.go b/internal/config/config.go index 9596a02..719447d 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -31,7 +31,7 @@ type ( ) // New reads configuration with koanf. -func New() Config { +func New(settingsPath string) Config { var instance Config k := koanf.New(".") @@ -42,7 +42,7 @@ func New() Config { } // load configuration from file - if err := k.Load(file.Provider("setting/config.yaml"), yaml.Parser()); err != nil { + if err := k.Load(file.Provider(settingsPath), yaml.Parser()); err != nil { log.Printf("error loading config.yaml") } diff --git a/internal/natsclient/jetstream.go b/internal/natsclient/jetstream.go index df4c79b..f1b0b96 100644 --- a/internal/natsclient/jetstream.go +++ b/internal/natsclient/jetstream.go @@ -34,7 +34,7 @@ func NewJetstream(config Config, logger *zap.Logger) *Jetstream { j := &Jetstream{ config: &config, logger: logger, - metrics: NewMetrics(config.ClientName), + metrics: NewMetrics(), } j.connect() @@ -140,6 +140,7 @@ func (j *Jetstream) createSubscribe(subject string) chan *Message { } func (j *Jetstream) jetstreamSubscribe(h chan *Message, streamName string) { + clusterName := j.connection.ConnectedClusterName() for msg := range h { var publishTime time.Time err := publishTime.UnmarshalBinary(msg.Data) @@ -150,27 +151,30 @@ func (j *Jetstream) jetstreamSubscribe(h chan *Message, streamName string) { } latency := time.Since(publishTime).Seconds() j.metrics.Latency.With(prometheus.Labels{ - "stream": streamName, + "stream": streamName, + "cluster": clusterName, }).Observe(latency) j.metrics.SuccessCounter.With(prometheus.Labels{ - "type": successfulSubscribe, - "stream": streamName, + "type": successfulSubscribe, + "stream": streamName, + "cluster": clusterName, }).Add(1) j.logger.Info("Received message: ", zap.String("subject", msg.Subject), zap.Float64("latency", latency)) } } func (j *Jetstream) jetstreamPublish(subject string, streamName string) { + clusterName := j.connection.ConnectedClusterName() for { t, err := time.Now().MarshalBinary() if err != nil { j.logger.Error("could not marshal current time.", zap.Error(err)) } - if ack, err := j.jetstream.Publish(subject, t); err != nil { j.metrics.SuccessCounter.With(prometheus.Labels{ - "type": failedPublish, - "stream": streamName, + "type": failedPublish, + "stream": streamName, + "cluster": clusterName, }).Add(1) if err == nats.ErrTimeout { j.logger.Error("Request timeout: No response received within the timeout period.") @@ -181,8 +185,9 @@ func (j *Jetstream) jetstreamPublish(subject string, streamName string) { } } else { j.metrics.SuccessCounter.With(prometheus.Labels{ - "type": successfulPublish, - "stream": streamName, + "type": successfulPublish, + "stream": streamName, + "cluster": clusterName, }).Add(1) j.logger.Info("receive ack", zap.String("stream", ack.Stream)) } diff --git a/internal/natsclient/metric.go b/internal/natsclient/metric.go index b8f6075..35de9f5 100644 --- a/internal/natsclient/metric.go +++ b/internal/natsclient/metric.go @@ -8,6 +8,7 @@ import ( const ( Namespace = "nats_blackbox_exporter" + Subsystem = "client" ) var latencyBuckets = []float64{ @@ -68,30 +69,30 @@ func newCounterVec(counterOpts prometheus.CounterOpts, labelNames []string) prom return *ev } -func NewMetrics(clinetName string) Metrics { +func NewMetrics() Metrics { return Metrics{ Connection: newCounterVec(prometheus.CounterOpts{ Namespace: Namespace, - Subsystem: clinetName, + Subsystem: Subsystem, Name: "connection_errors_total", Help: "total number of disconnections and reconnections", ConstLabels: nil, - }, []string{"type"}), + }, []string{"type", "cluster"}), // nolint: exhaustruct Latency: newHistogramVec(prometheus.HistogramOpts{ Namespace: Namespace, - Subsystem: clinetName, + Subsystem: Subsystem, Name: "latency", Help: "from publish to consume duration in seconds", ConstLabels: nil, Buckets: latencyBuckets, - }, []string{"stream"}), + }, []string{"stream", "cluster"}), SuccessCounter: newCounterVec(prometheus.CounterOpts{ Namespace: Namespace, - Subsystem: clinetName, + Subsystem: Subsystem, Name: "success_counter", Help: "publish and consume success rate", ConstLabels: nil, - }, []string{"type", "stream"}), + }, []string{"type", "stream", "cluster"}), } }