From 78ce04d8318c13d635b0fdc15833ea4e5c2cb267 Mon Sep 17 00:00:00 2001 From: kianaza Date: Sat, 18 May 2024 19:02:07 +0330 Subject: [PATCH 01/15] feat: add metrics ConnectionErrors, Latency and SuccessCounter --- cmd/nats-blackbox-exporter/main.go | 9 ++- go.mod | 10 ++- go.sum | 13 +++- internal/client/client.go | 67 +++++++++++------ internal/client/metric.go | 116 +++++++++++++++++++++++++++++ internal/config/config.go | 2 + internal/config/default.go | 5 ++ internal/metric/config.go | 6 ++ internal/metric/server.go | 51 +++++++++++++ 9 files changed, 250 insertions(+), 29 deletions(-) create mode 100644 internal/client/metric.go create mode 100644 internal/metric/config.go create mode 100644 internal/metric/server.go diff --git a/cmd/nats-blackbox-exporter/main.go b/cmd/nats-blackbox-exporter/main.go index 52c8588..65b24d5 100644 --- a/cmd/nats-blackbox-exporter/main.go +++ b/cmd/nats-blackbox-exporter/main.go @@ -8,6 +8,7 @@ import ( "github.com/snapp-incubator/nats-blackbox-exporter/internal/client" "github.com/snapp-incubator/nats-blackbox-exporter/internal/config" "github.com/snapp-incubator/nats-blackbox-exporter/internal/logger" + "github.com/snapp-incubator/nats-blackbox-exporter/internal/metric" ) func main() { @@ -16,13 +17,13 @@ func main() { logger := logger.New(cfg.Logger) - nc := client.Connect(logger, natsConfig) + metric.NewServer(cfg.Metric).Start(logger.Named("metric")) - natsClient := client.New(nc, logger, natsConfig) + natsClient := client.New(logger, natsConfig) - go natsClient.Subscribe("subject1") + go natsClient.Subscribe("") - go natsClient.Publish("subject1") + go natsClient.Publish("") sig := make(chan os.Signal, 1) signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM) diff --git a/go.mod b/go.mod index e8aa73c..36ac070 100644 --- a/go.mod +++ b/go.mod @@ -6,23 +6,31 @@ require ( github.com/knadh/koanf v1.5.0 github.com/knadh/koanf/v2 v2.1.1 github.com/nats-io/nats.go v1.31.0 + github.com/prometheus/client_golang v1.11.1 github.com/tidwall/pretty v1.2.1 go.uber.org/zap v1.17.0 ) require ( + github.com/beorn7/perks v1.0.1 // indirect + github.com/cespare/xxhash/v2 v2.1.1 // indirect github.com/fatih/structs v1.1.0 // indirect github.com/fsnotify/fsnotify v1.4.9 // indirect github.com/go-viper/mapstructure/v2 v2.0.0-alpha.1 // indirect + github.com/golang/protobuf v1.5.2 // indirect github.com/klauspost/compress v1.17.0 // indirect + github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect github.com/mitchellh/copystructure v1.2.0 // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/nats-io/nkeys v0.4.5 // indirect github.com/nats-io/nuid v1.0.1 // indirect + github.com/prometheus/client_model v0.2.0 // indirect + github.com/prometheus/common v0.26.0 // indirect + github.com/prometheus/procfs v0.6.0 // indirect go.uber.org/atomic v1.7.0 // indirect go.uber.org/multierr v1.6.0 // indirect golang.org/x/crypto v0.6.0 // indirect golang.org/x/sys v0.5.0 // indirect - golang.org/x/text v0.13.0 // indirect + google.golang.org/protobuf v1.26.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 458fd3b..8275636 100644 --- a/go.sum +++ b/go.sum @@ -23,9 +23,11 @@ github.com/aws/aws-sdk-go-v2/service/sts v1.7.2/go.mod h1:8EzeIqfWt2wWT4rJVu3f21 github.com/aws/smithy-go v1.8.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= +github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= @@ -77,6 +79,7 @@ github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QD github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= @@ -88,6 +91,7 @@ github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.7 h1:81/ik6ipDQS2aGcBfIN5dHDB36BwrStyeAQquSYCV4o= github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= @@ -161,6 +165,7 @@ github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hd github.com/mattn/go-isatty v0.0.10/go.mod h1:qgIWMr58cqv1PHHyhnkY9lrL7etaEgOFcMEpPG5Rm84= github.com/mattn/go-isatty v0.0.11/go.mod h1:PhnuNfih5lzO57/f3n+odYbM4JtupLOxQOAqxQCu2WE= github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= +github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/miekg/dns v1.1.26/go.mod h1:bPDLeHnStXmXAq1m/Ch/hvfNHr14JKNPMBo3VZKjuso= github.com/miekg/dns v1.1.41/go.mod h1:p6aan82bvRIyn+zDIv9xYNUpwa73JcSh9BKwknJysuI= @@ -208,17 +213,21 @@ github.com/posener/complete v1.2.3/go.mod h1:WZIdtGGp+qx0sLrYKtIRAruyNpv6hFCicSg github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M= +github.com/prometheus/client_golang v1.11.1 h1:+4eQaD7vAZ6DsfsxB15hbE0odUjGI5ARs9yskGu1v4s= github.com/prometheus/client_golang v1.11.1/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/prometheus/client_model v0.2.0 h1:uq5h0d+GuxiXLJLNABMgp2qUWDPiLvgCzz2dUR+/W/M= github.com/prometheus/client_model v0.2.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo= +github.com/prometheus/common v0.26.0 h1:iMAkS2TDoNWnKM+Kopnx/8tnEStIfpYA0ur0xQzzhMQ= github.com/prometheus/common v0.26.0/go.mod h1:M7rCNAaPfAosfx8veZJCuw84e35h3Cfd9VFqTh1DIvc= github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= +github.com/prometheus/procfs v0.6.0 h1:mxy4L2jP6qMonqmq+aTtOx1ifVWUgG/TAmntgbh3xv4= github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/rhnvrm/simples3 v0.6.1/go.mod h1:Y+3vYm2V7Y4VijFoJHHTrja6OgPrJ2cBti8dPGkC3sA= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= @@ -336,8 +345,6 @@ golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= -golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= @@ -353,6 +360,7 @@ golang.org/x/tools v0.1.2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= @@ -380,6 +388,7 @@ google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2 google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/asn1-ber.v1 v1.0.0-20181015200546-f715ec2f112d/go.mod h1:cuepJuh7vyXfUyUwEgHQXw849cJrilpS5NeIjOWESAw= diff --git a/internal/client/client.go b/internal/client/client.go index f38dba5..a2a73b0 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -8,38 +8,47 @@ import ( ) type Client struct { - Conn *nats.Conn - Logger *zap.Logger - Config Config + Conn *nats.Conn + Logger *zap.Logger + Config Config + Metrics Metrics } -func Connect(logger *zap.Logger, cfg Config) *nats.Conn { - nc, err := nats.Connect(cfg.URL) - if err != nil { - logger.Fatal("nats connection failed", zap.Error(err)) - } +func New(logger *zap.Logger, cfg Config) *Client { + nc := connect(logger, cfg) - logger.Info("nats connection successful", - zap.String("connected-addr", nc.ConnectedAddr()), - zap.Strings("discovered-servers", nc.DiscoveredServers())) + metrics := NewMetrics() + + client := &Client{ + Conn: nc, + Logger: logger, + Config: cfg, + Metrics: NewMetrics(), + } nc.SetDisconnectErrHandler(func(_ *nats.Conn, err error) { - logger.Fatal("nats disconnected", zap.Error(err)) + metrics.ConnectionErrors.Add(1) + logger.Error("nats disconnected", zap.Error(err)) }) nc.SetReconnectHandler(func(_ *nats.Conn) { logger.Warn("nats reconnected") }) - return nc + return client } -func New(nc *nats.Conn, logger *zap.Logger, cfg Config) *Client { - return &Client{ - Conn: nc, - Logger: logger, - Config: cfg, +func connect(logger *zap.Logger, cfg Config) *nats.Conn { + nc, err := nats.Connect(cfg.URL) + if err != nil { + logger.Fatal("nats connection failed", zap.Error(err)) } + + logger.Info("nats connection successful", + zap.String("connected-addr", nc.ConnectedAddr()), + zap.Strings("discovered-servers", nc.DiscoveredServers())) + + return nc } func (c *Client) Publish(subject string) { @@ -47,8 +56,11 @@ func (c *Client) Publish(subject string) { subject = c.Config.DefaultSubject } for { - msg, err := c.Conn.Request(subject, []byte("Hello, NATS!"), c.Config.RequestTimeout) + t := time.Now() + tt, _ := t.MarshalBinary() + msg, err := c.Conn.Request(subject, tt, c.Config.RequestTimeout) if err != nil { + c.Metrics.SuccessCounter.WithLabelValues("failed publish").Add(1) if err == nats.ErrTimeout { c.Logger.Error("Request timeout: No response received within the timeout period.") } else if err == nats.ErrNoResponders { @@ -57,6 +69,7 @@ func (c *Client) Publish(subject string) { c.Logger.Error("Request failed: %v", zap.Error(err)) } } else { + c.Metrics.SuccessCounter.WithLabelValues("successful publish").Add(1) c.Logger.Info("Received response successfully:", zap.ByteString("response", msg.Data)) } @@ -69,14 +82,24 @@ func (c *Client) Subscribe(subject string) { subject = c.Config.DefaultSubject } _, err := c.Conn.Subscribe(subject, func(msg *nats.Msg) { - c.Logger.Info("Received message successfully: ", zap.ByteString("message", msg.Data)) - err := c.Conn.Publish(msg.Reply, []byte("Hi!")) + var publishTime time.Time + err := publishTime.UnmarshalBinary(msg.Data) + if err != nil { + c.Logger.Error("Received message successfully but message is not valid time value") + } else { + latency := time.Since(publishTime).Seconds() + c.Metrics.Latency.Observe(latency) + c.Logger.Info("Received message successfully: ", zap.Float64("latency", latency)) + } + c.Metrics.SuccessCounter.WithLabelValues("subscribe").Add(1) + + err = c.Conn.Publish(msg.Reply, []byte("ack!")) if err != nil { c.Logger.Error("Failed to publish response: %v", zap.Error(err)) } }) if err != nil { - c.Logger.Error("Failed to subscribe to subject 'subject1': %v", zap.Error(err)) + c.Logger.Error("Failed to subscribe to subject '%v': %v", zap.String(subject, subject), zap.Error(err)) } } diff --git a/internal/client/metric.go b/internal/client/metric.go new file mode 100644 index 0000000..a339aeb --- /dev/null +++ b/internal/client/metric.go @@ -0,0 +1,116 @@ +package client + +import ( + "errors" + + "github.com/prometheus/client_golang/prometheus" +) + +const ( + Namespace = "nats_blackbox_exporter" + Subsystem = "client" +) + +var latencyBuckets = []float64{ + 0.0001, // 0.1 ms + 0.0005, // 0.5 ms + 0.0007, // 0.7 ms + 0.001, // 1 ms + 0.002, // 2 ms + 0.003, // 3 ms + 0.004, // 4 ms + 0.005, // 5 ms + 0.006, // 6 ms +} + +// Metrics has all the client metrics. +type Metrics struct { + ConnectionErrors prometheus.Counter + Latency prometheus.Histogram + SuccessCounter prometheus.CounterVec +} + +// nolint: ireturn +func newCounter(counterOpts prometheus.CounterOpts) prometheus.Counter { + ev := prometheus.NewCounter(counterOpts) + + if err := prometheus.Register(ev); err != nil { + var are prometheus.AlreadyRegisteredError + if ok := errors.As(err, &are); ok { + ev, ok = are.ExistingCollector.(prometheus.Counter) + if !ok { + panic("different metric type registration") + } + } else { + panic(err) + } + } + + return ev +} + +// nolint: ireturn +func newHistogram(histogramOpts prometheus.HistogramOpts) prometheus.Histogram { + ev := prometheus.NewHistogram(histogramOpts) + + if err := prometheus.Register(ev); err != nil { + var are prometheus.AlreadyRegisteredError + if ok := errors.As(err, &are); ok { + ev, ok = are.ExistingCollector.(prometheus.Histogram) + if !ok { + panic("different metric type registration") + } + } else { + panic(err) + } + } + + return ev +} + +// nolint: ireturn +func newCounterVec(counterOpts prometheus.CounterOpts, labelNames []string) prometheus.CounterVec { + ev := prometheus.NewCounterVec(counterOpts, labelNames) + + if err := prometheus.Register(ev); err != nil { + var are prometheus.AlreadyRegisteredError + if ok := errors.As(err, &are); ok { + ev, ok = are.ExistingCollector.(*prometheus.CounterVec) + if !ok { + panic("different metric type registration") + } + } else { + panic(err) + } + } + + return *ev +} + +func NewMetrics() Metrics { + return Metrics{ + ConnectionErrors: newCounter(prometheus.CounterOpts{ + Namespace: Namespace, + Subsystem: Subsystem, + Name: "connection_errors_total", + Help: "total number of connection errors", + ConstLabels: nil, + }), + // nolint: exhaustruct + Latency: newHistogram(prometheus.HistogramOpts{ + Namespace: Namespace, + Subsystem: Subsystem, + Name: "latency", + Help: "from publish to consume duration in seconds", + ConstLabels: nil, + Buckets: latencyBuckets, + }), + SuccessCounter: newCounterVec(prometheus.CounterOpts{ + Namespace: Namespace, + Subsystem: Subsystem, + Name: "success_counter", + Help: "success rate", + ConstLabels: nil, + }, []string{"type"}), + } +} diff --git a/internal/config/config.go b/internal/config/config.go index 12ce330..2cbd54d 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -12,6 +12,7 @@ import ( "github.com/knadh/koanf/v2" "github.com/snapp-incubator/nats-blackbox-exporter/internal/client" "github.com/snapp-incubator/nats-blackbox-exporter/internal/logger" + "github.com/snapp-incubator/nats-blackbox-exporter/internal/metric" "github.com/tidwall/pretty" ) @@ -25,6 +26,7 @@ type ( Config struct { Logger logger.Config `json:"logger,omitempty" koanf:"logger"` NATS client.Config `json:"nats,omitempty" koanf:"nats"` + Metric metric.Config `json:"metric,omitempty" koanf:"metric"` } ) diff --git a/internal/config/default.go b/internal/config/default.go index b7ac57c..72868b3 100644 --- a/internal/config/default.go +++ b/internal/config/default.go @@ -5,6 +5,7 @@ import ( "github.com/snapp-incubator/nats-blackbox-exporter/internal/client" "github.com/snapp-incubator/nats-blackbox-exporter/internal/logger" + "github.com/snapp-incubator/nats-blackbox-exporter/internal/metric" ) // Default return default configuration. @@ -20,5 +21,9 @@ func Default() Config { RequestTimeout: 50 * time.Millisecond, DefaultSubject: "test", }, + Metric: metric.Config{ + Address: ":8080", + Enabled: true, + }, } } diff --git a/internal/metric/config.go b/internal/metric/config.go new file mode 100644 index 0000000..9d15d35 --- /dev/null +++ b/internal/metric/config.go @@ -0,0 +1,6 @@ +package metric + +type Config struct { + Address string `json:"address,omitempty" koanf:"address"` + Enabled bool `json:"enabled,omitempty" koanf:"enabled"` +} diff --git a/internal/metric/server.go b/internal/metric/server.go new file mode 100644 index 0000000..1844436 --- /dev/null +++ b/internal/metric/server.go @@ -0,0 +1,51 @@ +package metric + +import ( + "errors" + "net/http" + "time" + + "github.com/prometheus/client_golang/prometheus/promhttp" + "go.uber.org/zap" +) + +// Server contains information about metrics server. +type Server struct { + srv *http.ServeMux + address string +} + +// NewServer creates a new monitoring server. +func NewServer(cfg Config) Server { + var srv *http.ServeMux + + if cfg.Enabled { + srv = http.NewServeMux() + srv.Handle("/metrics", promhttp.Handler()) + } + + return Server{ + address: cfg.Address, + srv: srv, + } +} + +// Start creates and run a metric server for prometheus in new go routine. +// nolint: mnd +func (s Server) Start(logger *zap.Logger) { + go func() { + // nolint: exhaustruct + srv := http.Server{ + Addr: s.address, + Handler: s.srv, + ReadTimeout: 10 * time.Second, + WriteTimeout: 10 * time.Second, + IdleTimeout: 30 * time.Second, + TLSConfig: nil, + } + + if err := srv.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) { + logger.Error("metric server initiation failed", zap.Error(err)) + } + }() +} From 54cfd0407ae948e03a8360e7053c0f7c1aaf56f7 Mon Sep 17 00:00:00 2001 From: kianaza Date: Sun, 19 May 2024 15:31:53 +0330 Subject: [PATCH 02/15] feat: change module instruction --- .golangci.yml | 54 ++++++++++++++--------------- cmd/nats-blackbox-exporter/main.go | 30 ++-------------- go.mod | 3 ++ go.sum | 7 ++++ internal/.DS_Store | Bin 0 -> 6148 bytes internal/client/client.go | 5 +++ internal/cmd/main.go | 25 +++++++++++++ internal/cmd/root.go | 34 ++++++++++++++++++ 8 files changed, 103 insertions(+), 55 deletions(-) create mode 100644 internal/.DS_Store create mode 100644 internal/cmd/main.go create mode 100644 internal/cmd/root.go diff --git a/.golangci.yml b/.golangci.yml index d489167..bbc4e3a 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -1,27 +1,27 @@ ---- -linters: - enable-all: true - disable: - - depguard - # we don't use json with camel-case - - tagliatelle - - nolintlint - # it should improve to support more known patterns - - varnamelen - - ireturn - # deprecated linters - - maligned - - scopelint - - golint - - ifshort - - interfacer - - exhaustivestruct - - nosnakecase - - varcheck - - deadcode - - structcheck - - gomnd - - execinquery - # temporarily disabled - - tagalign - - gochecknoglobals +# --- +# linters: +# enable-all: true +# disable: +# - depguard +# # we don't use json with camel-case +# - tagliatelle +# - nolintlint +# # it should improve to support more known patterns +# - varnamelen +# - ireturn +# # deprecated linters +# - maligned +# - scopelint +# - golint +# - ifshort +# - interfacer +# - exhaustivestruct +# - nosnakecase +# - varcheck +# - deadcode +# - structcheck +# - gomnd +# - execinquery +# # temporarily disabled +# - tagalign +# - gochecknoglobals diff --git a/cmd/nats-blackbox-exporter/main.go b/cmd/nats-blackbox-exporter/main.go index 65b24d5..2555321 100644 --- a/cmd/nats-blackbox-exporter/main.go +++ b/cmd/nats-blackbox-exporter/main.go @@ -1,33 +1,7 @@ package main -import ( - "os" - "os/signal" - "syscall" - - "github.com/snapp-incubator/nats-blackbox-exporter/internal/client" - "github.com/snapp-incubator/nats-blackbox-exporter/internal/config" - "github.com/snapp-incubator/nats-blackbox-exporter/internal/logger" - "github.com/snapp-incubator/nats-blackbox-exporter/internal/metric" -) +import "github.com/snapp-incubator/nats-blackbox-exporter/internal/cmd" func main() { - cfg := config.New() - natsConfig := cfg.NATS - - logger := logger.New(cfg.Logger) - - metric.NewServer(cfg.Metric).Start(logger.Named("metric")) - - natsClient := client.New(logger, natsConfig) - - go natsClient.Subscribe("") - - go natsClient.Publish("") - - sig := make(chan os.Signal, 1) - signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM) - <-sig - logger.Info("Received termination signal. Exiting...") - os.Exit(0) + cmd.Execute() } diff --git a/go.mod b/go.mod index 36ac070..9d15ee3 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/knadh/koanf/v2 v2.1.1 github.com/nats-io/nats.go v1.31.0 github.com/prometheus/client_golang v1.11.1 + github.com/spf13/cobra v1.8.0 github.com/tidwall/pretty v1.2.1 go.uber.org/zap v1.17.0 ) @@ -18,6 +19,7 @@ require ( github.com/fsnotify/fsnotify v1.4.9 // indirect github.com/go-viper/mapstructure/v2 v2.0.0-alpha.1 // indirect github.com/golang/protobuf v1.5.2 // indirect + github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/klauspost/compress v1.17.0 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect github.com/mitchellh/copystructure v1.2.0 // indirect @@ -27,6 +29,7 @@ require ( github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/common v0.26.0 // indirect github.com/prometheus/procfs v0.6.0 // indirect + github.com/spf13/pflag v1.0.5 // indirect go.uber.org/atomic v1.7.0 // indirect go.uber.org/multierr v1.6.0 // indirect golang.org/x/crypto v0.6.0 // indirect diff --git a/go.sum b/go.sum index 8275636..7293e7b 100644 --- a/go.sum +++ b/go.sum @@ -34,6 +34,7 @@ github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGX github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= +github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -131,6 +132,8 @@ github.com/hashicorp/vault/sdk v0.1.13/go.mod h1:B+hVj7TpuQY1Y/GPbCpffmgd+tSEwvh github.com/hashicorp/yamux v0.0.0-20180604194846-3520598351bb/go.mod h1:+NfK9FKeTrX5uv1uIXGdwYDTeHna2qgaIlx54MXqjAM= github.com/hashicorp/yamux v0.0.0-20181012175058-2f1d1f20f75d/go.mod h1:+NfK9FKeTrX5uv1uIXGdwYDTeHna2qgaIlx54MXqjAM= github.com/hjson/hjson-go/v4 v4.0.0/go.mod h1:KaYt3bTw3zhBjYqnXkYywcYctk0A2nxeEFTse3rH13E= +github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= +github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= github.com/joho/godotenv v1.3.0/go.mod h1:7hK45KPybAkOC6peb+G5yklZfMxEjkZhHbwpqxOKXbg= @@ -231,6 +234,7 @@ github.com/prometheus/procfs v0.6.0 h1:mxy4L2jP6qMonqmq+aTtOx1ifVWUgG/TAmntgbh3x github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/rhnvrm/simples3 v0.6.1/go.mod h1:Y+3vYm2V7Y4VijFoJHHTrja6OgPrJ2cBti8dPGkC3sA= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= +github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= github.com/ryanuber/columnize v2.1.0+incompatible/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= github.com/ryanuber/go-glob v1.0.0/go.mod h1:807d1WSdnB0XRJzKNil9Om6lcp/3a0v4qIHxIXzX/Yc= @@ -238,6 +242,9 @@ github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= +github.com/spf13/cobra v1.8.0 h1:7aJaZx1B85qltLMc546zn58BxxfZdR/W22ej9CFoEf0= +github.com/spf13/cobra v1.8.0/go.mod h1:WXLWApfZ71AjXPya3WOlMsY9yMs7YeiHhFVlvLyhcho= +github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/internal/.DS_Store b/internal/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..7ee04fa20b2c0f41d8492b2f2d9b2ebc0630a371 GIT binary patch literal 6148 zcmeHK!Ab)$5S`H?1us4JBIFAM{~(rn@Z2Ae)>?$cg`)TTk)NpV&5WgM_gq9~mdwj0 zZ?egQO)?^)i;w3!k(r20QHaXQh_QLJ>mtaPK(#U6hC?*+?g^ zvX$ZdDat*{?U$KNpWGh27w~sSYbE#dgxi6;!?TtBkgGSpxHfpYR3H^d1yX@j;0y}j z%%)AR9n+@*sX!`lP=NbGK_S+_=4e|7oTUK337XAN=T(?7C9noINA$qdLxCQukz%Na zqrC*L1~x|z7megY}s= Ifio!Z4Y80j4FCWD literal 0 HcmV?d00001 diff --git a/internal/client/client.go b/internal/client/client.go index a2a73b0..c5f8437 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -51,6 +51,11 @@ func connect(logger *zap.Logger, cfg Config) *nats.Conn { return nc } +func (c *Client) StartMessaging() { + go c.Subscribe("") + go c.Publish("") +} + func (c *Client) Publish(subject string) { if subject == "" { subject = c.Config.DefaultSubject diff --git a/internal/cmd/main.go b/internal/cmd/main.go new file mode 100644 index 0000000..121dc9b --- /dev/null +++ b/internal/cmd/main.go @@ -0,0 +1,25 @@ +package cmd + +import ( + "os" + "os/signal" + "syscall" + + "github.com/snapp-incubator/nats-blackbox-exporter/internal/client" + "github.com/snapp-incubator/nats-blackbox-exporter/internal/config" + "go.uber.org/zap" +) + +func main(cfg config.Config, logger *zap.Logger) { + natsConfig := cfg.NATS + + natsClient := client.New(logger, natsConfig) + + natsClient.StartMessaging() + + sig := make(chan os.Signal, 1) + signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM) + <-sig + logger.Info("Received termination signal. Exiting...") + os.Exit(0) +} diff --git a/internal/cmd/root.go b/internal/cmd/root.go new file mode 100644 index 0000000..1a63389 --- /dev/null +++ b/internal/cmd/root.go @@ -0,0 +1,34 @@ +package cmd + +import ( + "os" + + "github.com/snapp-incubator/nats-blackbox-exporter/internal/config" + "github.com/snapp-incubator/nats-blackbox-exporter/internal/logger" + "github.com/snapp-incubator/nats-blackbox-exporter/internal/metric" + "github.com/spf13/cobra" +) + +// ExitFailure status code. +const ExitFailure = 1 + +func Execute() { + cfg := config.New() + + logger := logger.New(cfg.Logger) + + metric.NewServer(cfg.Metric).Start(logger.Named("metric")) + + // nolint: exhaustruct + root := &cobra.Command{ + Use: "nats-blackbox-exporter", + Short: "ping pong with nats broker", + Run: func(_ *cobra.Command, _ []string) { + main(cfg, logger) + }, + } + + if err := root.Execute(); err != nil { + os.Exit(ExitFailure) + } +} From ad77fbd24eaf501f0ca83e3238850f9b04b7b29b Mon Sep 17 00:00:00 2001 From: kianaza Date: Mon, 20 May 2024 20:07:01 +0330 Subject: [PATCH 03/15] fix: Fix some issues caused by merge --- internal/config/config.go | 6 +++--- internal/natsclient/client.go | 23 +++++++++-------------- internal/natsclient/metric.go | 2 +- 3 files changed, 13 insertions(+), 18 deletions(-) diff --git a/internal/config/config.go b/internal/config/config.go index 21ae582..871e3c8 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -24,9 +24,9 @@ const ( type ( // Config holds all configurations. Config struct { - Logger logger.Config `json:"logger,omitempty" koanf:"logger"` - NATS client.Config `json:"nats,omitempty" koanf:"nats"` - Metric metric.Config `json:"metric,omitempty" koanf:"metric"` + Logger logger.Config `json:"logger,omitempty" koanf:"logger"` + NATS natsclient.Config `json:"nats,omitempty" koanf:"nats"` + Metric metric.Config `json:"metric,omitempty" koanf:"metric"` } ) diff --git a/internal/natsclient/client.go b/internal/natsclient/client.go index 254d062..0db4e3c 100644 --- a/internal/natsclient/client.go +++ b/internal/natsclient/client.go @@ -14,12 +14,12 @@ type NatsClient struct { Metrics Metrics } -func New(logger *zap.Logger, cfg Config) *Client { +func New(logger *zap.Logger, cfg Config) *NatsClient { nc := connect(logger, cfg) metrics := NewMetrics() - client := &Client{ + client := &NatsClient{ Conn: nc, Logger: logger, Config: cfg, @@ -56,18 +56,13 @@ func (c *NatsClient) StartMessaging() { go c.Publish("") } -func New(nc *nats.Conn, logger *zap.Logger, cfg Config) *NatsClient { - return &NatsClient{ - Conn: nc, - Logger: logger, - Config: cfg, - } -} - -func (c *NatsClient) StartMessaging() { - go c.Subscribe("") - go c.Publish("") -} +// func New(nc *nats.Conn, logger *zap.Logger, cfg Config) *NatsClient { +// return &NatsClient{ +// Conn: nc, +// Logger: logger, +// Config: cfg, +// } +// } func (c *NatsClient) Publish(subject string) { if subject == "" { diff --git a/internal/natsclient/metric.go b/internal/natsclient/metric.go index a339aeb..0fa1ea7 100644 --- a/internal/natsclient/metric.go +++ b/internal/natsclient/metric.go @@ -1,4 +1,4 @@ -package client +package natsclient import ( "errors" From 75a4786555983972f08bd8093893d20461d77636 Mon Sep 17 00:00:00 2001 From: kianaza Date: Tue, 21 May 2024 02:43:20 +0330 Subject: [PATCH 04/15] feat: add jetstream messaging --- internal/cmd/main.go | 6 +- internal/config/default.go | 11 +- internal/natsclient/config.go | 11 +- internal/natsclient/jetstream.go | 123 +++++++++++++++++++++ internal/natsclient/{client.go => nats.go} | 2 +- 5 files changed, 142 insertions(+), 11 deletions(-) create mode 100644 internal/natsclient/jetstream.go rename internal/natsclient/{client.go => nats.go} (98%) diff --git a/internal/cmd/main.go b/internal/cmd/main.go index 88efd97..c8e1407 100644 --- a/internal/cmd/main.go +++ b/internal/cmd/main.go @@ -13,10 +13,12 @@ import ( func main(cfg config.Config, logger *zap.Logger) { natsConfig := cfg.NATS - client := natsclient.New(logger, natsConfig) - + client := natsclient.New(natsConfig, logger) client.StartMessaging() + jetstreamClient := natsclient.NewJetstream(natsConfig, logger) + jetstreamClient.StartJetstreamMessaging() + sig := make(chan os.Signal, 1) signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM) <-sig diff --git a/internal/config/default.go b/internal/config/default.go index 8717e1c..484a4c7 100644 --- a/internal/config/default.go +++ b/internal/config/default.go @@ -16,10 +16,13 @@ func Default() Config { Level: "debug", }, NATS: natsclient.Config{ - URL: "localhost:4222", - PublishInterval: 2 * time.Second, - RequestTimeout: 50 * time.Millisecond, - DefaultSubject: "test", + URL: "localhost:4222", + PublishInterval: 2 * time.Second, + RequestTimeout: 50 * time.Millisecond, + DefaultSubject: "test", + MaxPubAcksInflight: 10, + QueueSubscriptionGroup: "group", + FlushTimeout: 2 * time.Second, }, Metric: metric.Config{ Address: ":8080", diff --git a/internal/natsclient/config.go b/internal/natsclient/config.go index 357d69e..8eff014 100644 --- a/internal/natsclient/config.go +++ b/internal/natsclient/config.go @@ -3,8 +3,11 @@ package natsclient import "time" type Config struct { - URL string `json:"url,omitempty" koanf:"url"` - PublishInterval time.Duration `json:"publish_interval" koanf:"publish_interval"` - RequestTimeout time.Duration `json:"request_timeout" koanf:"request_timeout"` - DefaultSubject string `json:"default_subject" koanf:"default_subject"` + URL string `json:"url,omitempty" koanf:"url"` + PublishInterval time.Duration `json:"publish_interval" koanf:"publish_interval"` + RequestTimeout time.Duration `json:"request_timeout" koanf:"request_timeout"` + DefaultSubject string `json:"default_subject" koanf:"default_subject"` + MaxPubAcksInflight int `json:"max_pubAcks_inflight" koanf:"max_pubAcks_inflight"` + QueueSubscriptionGroup string `json:"queue_subscription_group" koanf:"queue_subscription_group"` + FlushTimeout time.Duration `json:"flush_timeout" koanf:"flush_timeout"` } diff --git a/internal/natsclient/jetstream.go b/internal/natsclient/jetstream.go new file mode 100644 index 0000000..ec4d0cf --- /dev/null +++ b/internal/natsclient/jetstream.go @@ -0,0 +1,123 @@ +package natsclient + +import ( + "time" + + "github.com/nats-io/nats.go" + "go.uber.org/zap" +) + +type Message struct { + Subject string + Data []byte +} + +// Jetstream represents the NATS core handler +type Jetstream struct { + connection *nats.Conn + jetstream nats.JetStreamContext + config *Config + logger *zap.Logger +} + +// NewJetstream initializes NATS JetStream connection +func NewJetstream(config Config, logger *zap.Logger) *Jetstream { + j := Jetstream{ + config: &config, + logger: logger, + } + + var err error + j.connection, err = nats.Connect(j.config.URL) + if err != nil { + logger.Panic("could not connect to nats", zap.Error(err)) + } + + // JetStream connection + j.jetstream, err = j.connection.JetStream() + if err != nil { + logger.Panic("could not connect to jetstream", zap.Error(err)) + } + + // Create a stream + _, err = j.jetstream.AddStream(&nats.StreamConfig{ + Name: "test", + Subjects: []string{"test.*"}, + }) + if err != nil { + logger.Panic("could not add stream", zap.Error(err)) + } + + return &j +} + +func (j *Jetstream) StartJetstreamMessaging() { + messageChannel := j.CreateSubscribe("test.1") + go j.JetstreamPublish("test.1") + go j.JetstreamSubscribe(messageChannel) +} + +// Subscribe subscribes to a list of subjects and returns a channel with incoming messages +func (j *Jetstream) CreateSubscribe(subject string) chan *Message { + + messageHandler, h := messageHandlerFactoryJetstream() + _, err := j.jetstream.Subscribe( + subject, + messageHandler, + nats.DeliverNew(), + nats.ReplayInstant(), + nats.AckExplicit(), + nats.MaxAckPending(j.config.MaxPubAcksInflight), + ) + if err != nil { + j.logger.Error("could not Subscribe", zap.Error(err)) + } else { + j.logger.Info("Subscribed to %s successfully", zap.String("subject", subject)) + } + + return h + +} + +func (j *Jetstream) JetstreamSubscribe(h chan *Message) { + for msg := range h { + var publishTime time.Time + publishTime.UnmarshalBinary(msg.Data) + j.logger.Info("Received message: ", zap.String("subject", msg.Subject), zap.Float64("latency", time.Since(publishTime).Seconds())) + } +} + +func (j *Jetstream) JetstreamPublish(subject string) { + for { + t := time.Now() + tt, _ := t.MarshalBinary() + + if ack, err := j.jetstream.Publish(subject, tt); err != nil { + j.logger.Error("jetstream publish message failed", zap.Error(err)) + } else { + j.logger.Info("receive ack", zap.String("ack", ack.Stream)) + } + time.Sleep(j.config.PublishInterval) + } +} + +// Close closes NATS connection +func (j *Jetstream) Close() { + if err := j.connection.FlushTimeout(j.config.FlushTimeout); err != nil { + j.logger.Error("could not flush", zap.Error(err)) + } + + j.connection.Close() + j.logger.Info("NATS is closed.") +} + +func messageHandlerFactoryJetstream() (nats.MsgHandler, chan *Message) { + ch := make(chan *Message) + return func(msg *nats.Msg) { + ch <- &Message{ + Subject: msg.Subject, + Data: msg.Data, + } + msg.Ack() + }, ch +} diff --git a/internal/natsclient/client.go b/internal/natsclient/nats.go similarity index 98% rename from internal/natsclient/client.go rename to internal/natsclient/nats.go index 0db4e3c..8385c9f 100644 --- a/internal/natsclient/client.go +++ b/internal/natsclient/nats.go @@ -14,7 +14,7 @@ type NatsClient struct { Metrics Metrics } -func New(logger *zap.Logger, cfg Config) *NatsClient { +func New(cfg Config, logger *zap.Logger) *NatsClient { nc := connect(logger, cfg) metrics := NewMetrics() From 3a99debdd8560b70214f083b34485d0553d291fa Mon Sep 17 00:00:00 2001 From: kianaza Date: Fri, 24 May 2024 21:40:05 +0330 Subject: [PATCH 05/15] feat: improve Jetstream functionality and metrics --- internal/cmd/main.go | 4 +- internal/config/default.go | 2 +- internal/natsclient/jetstream.go | 82 +++++++++++++++++++++++--------- internal/natsclient/metric.go | 18 +++---- internal/natsclient/nats.go | 2 +- 5 files changed, 72 insertions(+), 36 deletions(-) diff --git a/internal/cmd/main.go b/internal/cmd/main.go index c8e1407..ceda6a0 100644 --- a/internal/cmd/main.go +++ b/internal/cmd/main.go @@ -13,8 +13,8 @@ import ( func main(cfg config.Config, logger *zap.Logger) { natsConfig := cfg.NATS - client := natsclient.New(natsConfig, logger) - client.StartMessaging() + // client := natsclient.New(natsConfig, logger) + // client.StartMessaging() jetstreamClient := natsclient.NewJetstream(natsConfig, logger) jetstreamClient.StartJetstreamMessaging() diff --git a/internal/config/default.go b/internal/config/default.go index 484a4c7..36a9520 100644 --- a/internal/config/default.go +++ b/internal/config/default.go @@ -20,7 +20,7 @@ func Default() Config { PublishInterval: 2 * time.Second, RequestTimeout: 50 * time.Millisecond, DefaultSubject: "test", - MaxPubAcksInflight: 10, + MaxPubAcksInflight: 1000, QueueSubscriptionGroup: "group", FlushTimeout: 2 * time.Second, }, diff --git a/internal/natsclient/jetstream.go b/internal/natsclient/jetstream.go index ec4d0cf..27d5220 100644 --- a/internal/natsclient/jetstream.go +++ b/internal/natsclient/jetstream.go @@ -18,47 +18,69 @@ type Jetstream struct { jetstream nats.JetStreamContext config *Config logger *zap.Logger + metrics Metrics } // NewJetstream initializes NATS JetStream connection func NewJetstream(config Config, logger *zap.Logger) *Jetstream { j := Jetstream{ - config: &config, - logger: logger, + config: &config, + logger: logger, + metrics: NewMetrics(), } + j.connect() + + j.createJetstreamContext() + + j.createStream() + + return &j +} + +func (j *Jetstream) connect() { var err error j.connection, err = nats.Connect(j.config.URL) if err != nil { - logger.Panic("could not connect to nats", zap.Error(err)) + j.logger.Panic("could not connect to nats", zap.Error(err)) } - // JetStream connection + j.connection.SetDisconnectErrHandler(func(_ *nats.Conn, err error) { + j.metrics.ConnectionErrors.Add(1) + j.logger.Error("nats disconnected", zap.Error(err)) + }) + + j.connection.SetReconnectHandler(func(_ *nats.Conn) { + j.logger.Warn("nats reconnected") + }) +} + +func (j *Jetstream) createJetstreamContext() { + var err error j.jetstream, err = j.connection.JetStream() if err != nil { - logger.Panic("could not connect to jetstream", zap.Error(err)) + j.logger.Panic("could not connect to jetstream", zap.Error(err)) } +} - // Create a stream - _, err = j.jetstream.AddStream(&nats.StreamConfig{ +func (j *Jetstream) createStream() { + _, err := j.jetstream.AddStream(&nats.StreamConfig{ Name: "test", Subjects: []string{"test.*"}, }) if err != nil { - logger.Panic("could not add stream", zap.Error(err)) + j.logger.Panic("could not add stream", zap.Error(err)) } - - return &j } func (j *Jetstream) StartJetstreamMessaging() { - messageChannel := j.CreateSubscribe("test.1") - go j.JetstreamPublish("test.1") - go j.JetstreamSubscribe(messageChannel) + messageChannel := j.createSubscribe("test.1") + go j.jetstreamPublish("test.1") + go j.jetstreamSubscribe(messageChannel) } // Subscribe subscribes to a list of subjects and returns a channel with incoming messages -func (j *Jetstream) CreateSubscribe(subject string) chan *Message { +func (j *Jetstream) createSubscribe(subject string) chan *Message { messageHandler, h := messageHandlerFactoryJetstream() _, err := j.jetstream.Subscribe( @@ -79,30 +101,44 @@ func (j *Jetstream) CreateSubscribe(subject string) chan *Message { } -func (j *Jetstream) JetstreamSubscribe(h chan *Message) { +func (j *Jetstream) jetstreamSubscribe(h chan *Message) { for msg := range h { var publishTime time.Time publishTime.UnmarshalBinary(msg.Data) - j.logger.Info("Received message: ", zap.String("subject", msg.Subject), zap.Float64("latency", time.Since(publishTime).Seconds())) + latency := time.Since(publishTime).Seconds() + j.metrics.Latency.Observe(latency) + j.metrics.SuccessCounter.WithLabelValues("successful subscribe").Add(1) + j.logger.Info("Received message: ", zap.String("subject", msg.Subject), zap.Float64("latency", latency)) } } -func (j *Jetstream) JetstreamPublish(subject string) { +func (j *Jetstream) jetstreamPublish(subject string) { for { - t := time.Now() - tt, _ := t.MarshalBinary() + t, err := time.Now().MarshalBinary() + if err != nil { + j.logger.Error("could not marshal current time.", zap.Error(err)) + } - if ack, err := j.jetstream.Publish(subject, tt); err != nil { - j.logger.Error("jetstream publish message failed", zap.Error(err)) + if ack, err := j.jetstream.Publish(subject, t); err != nil { + j.metrics.SuccessCounter.WithLabelValues("failed publish").Add(1) + if err == nats.ErrTimeout { + j.logger.Error("Request timeout: No response received within the timeout period.") + } else if err == nats.ErrNoStreamResponse { + j.logger.Error("Request failed: No Stream available for the subject.") + } else { + j.logger.Error("Request failed: %v", zap.Error(err)) + } } else { - j.logger.Info("receive ack", zap.String("ack", ack.Stream)) + j.metrics.SuccessCounter.WithLabelValues("successful publish").Add(1) + j.logger.Info("receive ack", zap.String("stream", ack.Stream)) } + time.Sleep(j.config.PublishInterval) } } // Close closes NATS connection -func (j *Jetstream) Close() { +func (j *Jetstream) close() { if err := j.connection.FlushTimeout(j.config.FlushTimeout); err != nil { j.logger.Error("could not flush", zap.Error(err)) } diff --git a/internal/natsclient/metric.go b/internal/natsclient/metric.go index 0fa1ea7..0944099 100644 --- a/internal/natsclient/metric.go +++ b/internal/natsclient/metric.go @@ -12,15 +12,15 @@ const ( ) var latencyBuckets = []float64{ - 0.0001, // 0.1 ms - 0.0005, // 0.5 ms - 0.0007, // 0.7 ms - 0.001, // 1 ms - 0.002, // 2 ms - 0.003, // 3 ms - 0.004, // 4 ms - 0.005, // 5 ms - 0.006, // 6 ms + 0.0001, + 0.0005, + 0.0007, + 0.001, + 0.002, + 0.003, + 0.004, + 0.005, + 0.006, } // Metrics has all the client metrics. diff --git a/internal/natsclient/nats.go b/internal/natsclient/nats.go index 8385c9f..096dc3e 100644 --- a/internal/natsclient/nats.go +++ b/internal/natsclient/nats.go @@ -104,7 +104,7 @@ func (c *NatsClient) Subscribe(subject string) { c.Metrics.Latency.Observe(latency) c.Logger.Info("Received message successfully: ", zap.Float64("latency", latency)) } - c.Metrics.SuccessCounter.WithLabelValues("subscribe").Add(1) + c.Metrics.SuccessCounter.WithLabelValues("successful subscribe").Add(1) err = c.Conn.Publish(msg.Reply, []byte("ack!")) if err != nil { From 9b8cd4b5282ffa305a32e1798bd766fd3f3f5f79 Mon Sep 17 00:00:00 2001 From: kianaza Date: Fri, 24 May 2024 22:00:58 +0330 Subject: [PATCH 06/15] fix: fix golint errors --- internal/natsclient/jetstream.go | 26 ++++++++++++-------------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/internal/natsclient/jetstream.go b/internal/natsclient/jetstream.go index 27d5220..03b61af 100644 --- a/internal/natsclient/jetstream.go +++ b/internal/natsclient/jetstream.go @@ -82,7 +82,7 @@ func (j *Jetstream) StartJetstreamMessaging() { // Subscribe subscribes to a list of subjects and returns a channel with incoming messages func (j *Jetstream) createSubscribe(subject string) chan *Message { - messageHandler, h := messageHandlerFactoryJetstream() + messageHandler, h := j.messageHandlerFactoryJetstream() _, err := j.jetstream.Subscribe( subject, messageHandler, @@ -104,7 +104,12 @@ func (j *Jetstream) createSubscribe(subject string) chan *Message { func (j *Jetstream) jetstreamSubscribe(h chan *Message) { for msg := range h { var publishTime time.Time - publishTime.UnmarshalBinary(msg.Data) + err := publishTime.UnmarshalBinary(msg.Data) + if err != nil { + j.logger.Error("unable to unmarshal binary data for publishTime.") + j.logger.Info("received message but could not calculate latency due to unmarshalling error.", zap.String("subject", msg.Subject)) + return + } latency := time.Since(publishTime).Seconds() j.metrics.Latency.Observe(latency) j.metrics.SuccessCounter.WithLabelValues("successful subscribe").Add(1) @@ -137,23 +142,16 @@ func (j *Jetstream) jetstreamPublish(subject string) { } } -// Close closes NATS connection -func (j *Jetstream) close() { - if err := j.connection.FlushTimeout(j.config.FlushTimeout); err != nil { - j.logger.Error("could not flush", zap.Error(err)) - } - - j.connection.Close() - j.logger.Info("NATS is closed.") -} - -func messageHandlerFactoryJetstream() (nats.MsgHandler, chan *Message) { +func (j *Jetstream) messageHandlerFactoryJetstream() (nats.MsgHandler, chan *Message) { ch := make(chan *Message) return func(msg *nats.Msg) { ch <- &Message{ Subject: msg.Subject, Data: msg.Data, } - msg.Ack() + err := msg.Ack() + if err != nil { + j.logger.Error("Failed to acknowledge the message", zap.Error(err)) + } }, ch } From ebedea303b3d357d8a79a4d80a09dd3b981d77a7 Mon Sep 17 00:00:00 2001 From: kianaza Date: Fri, 24 May 2024 22:04:40 +0330 Subject: [PATCH 07/15] Remove .DS_Store files --- internal/.DS_Store | Bin 6148 -> 0 bytes 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 internal/.DS_Store diff --git a/internal/.DS_Store b/internal/.DS_Store deleted file mode 100644 index 7ee04fa20b2c0f41d8492b2f2d9b2ebc0630a371..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 6148 zcmeHK!Ab)$5S`H?1us4JBIFAM{~(rn@Z2Ae)>?$cg`)TTk)NpV&5WgM_gq9~mdwj0 zZ?egQO)?^)i;w3!k(r20QHaXQh_QLJ>mtaPK(#U6hC?*+?g^ zvX$ZdDat*{?U$KNpWGh27w~sSYbE#dgxi6;!?TtBkgGSpxHfpYR3H^d1yX@j;0y}j z%%)AR9n+@*sX!`lP=NbGK_S+_=4e|7oTUK337XAN=T(?7C9noINA$qdLxCQukz%Na zqrC*L1~x|z7megY}s= Ifio!Z4Y80j4FCWD From 0215bb3332d1ce9b2eff6cf70554dd30ca08a8a1 Mon Sep 17 00:00:00 2001 From: kianaza Date: Sat, 25 May 2024 14:53:58 +0330 Subject: [PATCH 08/15] fix: change start messaging function name --- internal/cmd/main.go | 2 +- internal/natsclient/jetstream.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/cmd/main.go b/internal/cmd/main.go index ceda6a0..22351e2 100644 --- a/internal/cmd/main.go +++ b/internal/cmd/main.go @@ -17,7 +17,7 @@ func main(cfg config.Config, logger *zap.Logger) { // client.StartMessaging() jetstreamClient := natsclient.NewJetstream(natsConfig, logger) - jetstreamClient.StartJetstreamMessaging() + jetstreamClient.Startblackboxtest() sig := make(chan os.Signal, 1) signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM) diff --git a/internal/natsclient/jetstream.go b/internal/natsclient/jetstream.go index 03b61af..ebc6acf 100644 --- a/internal/natsclient/jetstream.go +++ b/internal/natsclient/jetstream.go @@ -73,7 +73,7 @@ func (j *Jetstream) createStream() { } } -func (j *Jetstream) StartJetstreamMessaging() { +func (j *Jetstream) Startblackboxtest() { messageChannel := j.createSubscribe("test.1") go j.jetstreamPublish("test.1") go j.jetstreamSubscribe(messageChannel) From ee44a485126f3f393cf2836f13b5e47962935f8e Mon Sep 17 00:00:00 2001 From: kianaza Date: Sat, 25 May 2024 15:56:31 +0330 Subject: [PATCH 09/15] feat: Add stream configuration into config --- internal/cmd/main.go | 1 + internal/config/default.go | 5 +++++ internal/natsclient/config.go | 6 ++++++ internal/natsclient/jetstream.go | 26 +++++++++++++++++++------- 4 files changed, 31 insertions(+), 7 deletions(-) diff --git a/internal/cmd/main.go b/internal/cmd/main.go index 22351e2..51930e0 100644 --- a/internal/cmd/main.go +++ b/internal/cmd/main.go @@ -22,6 +22,7 @@ func main(cfg config.Config, logger *zap.Logger) { sig := make(chan os.Signal, 1) signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM) <-sig + jetstreamClient.Close() logger.Info("Received termination signal. Exiting...") os.Exit(0) } diff --git a/internal/config/default.go b/internal/config/default.go index 36a9520..2be9f70 100644 --- a/internal/config/default.go +++ b/internal/config/default.go @@ -16,6 +16,11 @@ func Default() Config { Level: "debug", }, NATS: natsclient.Config{ + Streams: []natsclient.Stream{natsclient.Stream{ + Name: "test", + Subjects: []string{"test.*"}, + }, + }, URL: "localhost:4222", PublishInterval: 2 * time.Second, RequestTimeout: 50 * time.Millisecond, diff --git a/internal/natsclient/config.go b/internal/natsclient/config.go index 8eff014..364729c 100644 --- a/internal/natsclient/config.go +++ b/internal/natsclient/config.go @@ -3,6 +3,7 @@ package natsclient import "time" type Config struct { + Streams []Stream `json:"streams,omitempty" koanf:"streams"` URL string `json:"url,omitempty" koanf:"url"` PublishInterval time.Duration `json:"publish_interval" koanf:"publish_interval"` RequestTimeout time.Duration `json:"request_timeout" koanf:"request_timeout"` @@ -11,3 +12,8 @@ type Config struct { QueueSubscriptionGroup string `json:"queue_subscription_group" koanf:"queue_subscription_group"` FlushTimeout time.Duration `json:"flush_timeout" koanf:"flush_timeout"` } + +type Stream struct { + Name string `json:"name,omitempty" koanf:"name"` + Subjects []string `json:"subjects,omitempty" koanf:"subjects"` +} diff --git a/internal/natsclient/jetstream.go b/internal/natsclient/jetstream.go index ebc6acf..17e35e2 100644 --- a/internal/natsclient/jetstream.go +++ b/internal/natsclient/jetstream.go @@ -64,12 +64,14 @@ func (j *Jetstream) createJetstreamContext() { } func (j *Jetstream) createStream() { - _, err := j.jetstream.AddStream(&nats.StreamConfig{ - Name: "test", - Subjects: []string{"test.*"}, - }) - if err != nil { - j.logger.Panic("could not add stream", zap.Error(err)) + for _, streamConf := range j.config.Streams { + _, err := j.jetstream.AddStream(&nats.StreamConfig{ + Name: streamConf.Name, + Subjects: streamConf.Subjects, + }) + if err != nil { + j.logger.Panic("could not add stream", zap.Error(err)) + } } } @@ -92,7 +94,7 @@ func (j *Jetstream) createSubscribe(subject string) chan *Message { nats.MaxAckPending(j.config.MaxPubAcksInflight), ) if err != nil { - j.logger.Error("could not Subscribe", zap.Error(err)) + j.logger.Panic("could not Subscribe", zap.Error(err)) } else { j.logger.Info("Subscribed to %s successfully", zap.String("subject", subject)) } @@ -155,3 +157,13 @@ func (j *Jetstream) messageHandlerFactoryJetstream() (nats.MsgHandler, chan *Mes } }, ch } + +// Close closes NATS connection +func (j *Jetstream) Close() { + if err := j.connection.FlushTimeout(j.config.FlushTimeout); err != nil { + j.logger.Error("could not flush", zap.Error(err)) + } + + j.connection.Close() + j.logger.Info("NATS is closed.") +} From 3a32c717e13c2b4f09fd413fb5a9e78a28bb3f92 Mon Sep 17 00:00:00 2001 From: kianaza Date: Sat, 25 May 2024 16:18:01 +0330 Subject: [PATCH 10/15] feat: Add reconnection counter to metrics --- internal/natsclient/jetstream.go | 3 ++- internal/natsclient/metric.go | 33 +++++++------------------------- internal/natsclient/nats.go | 3 ++- 3 files changed, 11 insertions(+), 28 deletions(-) diff --git a/internal/natsclient/jetstream.go b/internal/natsclient/jetstream.go index 17e35e2..c33c8e9 100644 --- a/internal/natsclient/jetstream.go +++ b/internal/natsclient/jetstream.go @@ -46,12 +46,13 @@ func (j *Jetstream) connect() { } j.connection.SetDisconnectErrHandler(func(_ *nats.Conn, err error) { - j.metrics.ConnectionErrors.Add(1) + j.metrics.Connection.WithLabelValues("disconnection").Add(1) j.logger.Error("nats disconnected", zap.Error(err)) }) j.connection.SetReconnectHandler(func(_ *nats.Conn) { j.logger.Warn("nats reconnected") + j.metrics.Connection.WithLabelValues("reconnection").Add(1) }) } diff --git a/internal/natsclient/metric.go b/internal/natsclient/metric.go index 0944099..97467cf 100644 --- a/internal/natsclient/metric.go +++ b/internal/natsclient/metric.go @@ -25,28 +25,9 @@ var latencyBuckets = []float64{ // Metrics has all the client metrics. type Metrics struct { - ConnectionErrors prometheus.Counter - Latency prometheus.Histogram - SuccessCounter prometheus.CounterVec -} - -// nolint: ireturn -func newCounter(counterOpts prometheus.CounterOpts) prometheus.Counter { - ev := prometheus.NewCounter(counterOpts) - - if err := prometheus.Register(ev); err != nil { - var are prometheus.AlreadyRegisteredError - if ok := errors.As(err, &are); ok { - ev, ok = are.ExistingCollector.(prometheus.Counter) - if !ok { - panic("different metric type registration") - } - } else { - panic(err) - } - } - - return ev + Connection prometheus.CounterVec + Latency prometheus.Histogram + SuccessCounter prometheus.CounterVec } // nolint: ireturn @@ -89,13 +70,13 @@ func newCounterVec(counterOpts prometheus.CounterOpts, labelNames []string) prom func NewMetrics() Metrics { return Metrics{ - ConnectionErrors: newCounter(prometheus.CounterOpts{ + Connection: newCounterVec(prometheus.CounterOpts{ Namespace: Namespace, Subsystem: Subsystem, Name: "connection_errors_total", - Help: "total number of connection errors", + Help: "total number of disconnections and reconnections", ConstLabels: nil, - }), + }, []string{"type"}), // nolint: exhaustruct Latency: newHistogram(prometheus.HistogramOpts{ Namespace: Namespace, @@ -109,7 +90,7 @@ func NewMetrics() Metrics { Namespace: Namespace, Subsystem: Subsystem, Name: "success_counter", - Help: "success rate", + Help: "publish and consume success rate", ConstLabels: nil, }, []string{"type"}), } diff --git a/internal/natsclient/nats.go b/internal/natsclient/nats.go index 096dc3e..35e8c5d 100644 --- a/internal/natsclient/nats.go +++ b/internal/natsclient/nats.go @@ -27,11 +27,12 @@ func New(cfg Config, logger *zap.Logger) *NatsClient { } nc.SetDisconnectErrHandler(func(_ *nats.Conn, err error) { - metrics.ConnectionErrors.Add(1) + metrics.Connection.WithLabelValues("disconnection").Add(1) logger.Error("nats disconnected", zap.Error(err)) }) nc.SetReconnectHandler(func(_ *nats.Conn) { + metrics.Connection.WithLabelValues("reconnection").Add(1) logger.Warn("nats reconnected") }) From 38e4f9d51deef6742fbcc8172e7a6f6279f97ec2 Mon Sep 17 00:00:00 2001 From: kianaza Date: Sat, 25 May 2024 16:20:12 +0330 Subject: [PATCH 11/15] fix: Fix NewJetstream function --- internal/natsclient/jetstream.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/natsclient/jetstream.go b/internal/natsclient/jetstream.go index c33c8e9..51967ae 100644 --- a/internal/natsclient/jetstream.go +++ b/internal/natsclient/jetstream.go @@ -23,7 +23,7 @@ type Jetstream struct { // NewJetstream initializes NATS JetStream connection func NewJetstream(config Config, logger *zap.Logger) *Jetstream { - j := Jetstream{ + j := &Jetstream{ config: &config, logger: logger, metrics: NewMetrics(), @@ -35,7 +35,7 @@ func NewJetstream(config Config, logger *zap.Logger) *Jetstream { j.createStream() - return &j + return j } func (j *Jetstream) connect() { From 33b45b9f9d9b87057049319057a7b67fae9eba30 Mon Sep 17 00:00:00 2001 From: kianaza Date: Sat, 25 May 2024 16:26:49 +0330 Subject: [PATCH 12/15] fix: Update configuration for improved clarity --- internal/config/default.go | 2 +- internal/metric/config.go | 6 +++++- internal/metric/server.go | 12 ++++++------ 3 files changed, 12 insertions(+), 8 deletions(-) diff --git a/internal/config/default.go b/internal/config/default.go index 2be9f70..c355b06 100644 --- a/internal/config/default.go +++ b/internal/config/default.go @@ -30,7 +30,7 @@ func Default() Config { FlushTimeout: 2 * time.Second, }, Metric: metric.Config{ - Address: ":8080", + Server: metric.Server{Address: ":8080"}, Enabled: true, }, } diff --git a/internal/metric/config.go b/internal/metric/config.go index 9d15d35..a5e3aea 100644 --- a/internal/metric/config.go +++ b/internal/metric/config.go @@ -1,6 +1,10 @@ package metric type Config struct { - Address string `json:"address,omitempty" koanf:"address"` + Server Server `json:"server,omitempty" koanf:"server"` Enabled bool `json:"enabled,omitempty" koanf:"enabled"` } + +type Server struct { + Address string `json:"address,omitempty" koanf:"address"` +} diff --git a/internal/metric/server.go b/internal/metric/server.go index 1844436..d4d8a70 100644 --- a/internal/metric/server.go +++ b/internal/metric/server.go @@ -9,14 +9,14 @@ import ( "go.uber.org/zap" ) -// Server contains information about metrics server. -type Server struct { +// MetricServer contains information about metrics server. +type MetricServer struct { srv *http.ServeMux address string } // NewServer creates a new monitoring server. -func NewServer(cfg Config) Server { +func NewServer(cfg Config) MetricServer { var srv *http.ServeMux if cfg.Enabled { @@ -24,15 +24,15 @@ func NewServer(cfg Config) Server { srv.Handle("/metrics", promhttp.Handler()) } - return Server{ - address: cfg.Address, + return MetricServer{ + address: cfg.Server.Address, srv: srv, } } // Start creates and run a metric server for prometheus in new go routine. // nolint: mnd -func (s Server) Start(logger *zap.Logger) { +func (s MetricServer) Start(logger *zap.Logger) { go func() { // nolint: exhaustruct srv := http.Server{ From 9e337d69cc975e7ff830b7b261a5ff03b78081d8 Mon Sep 17 00:00:00 2001 From: kianaza Date: Sat, 25 May 2024 16:35:45 +0330 Subject: [PATCH 13/15] fix: Change MetricServer name to resolve golint issue --- internal/metric/server.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/internal/metric/server.go b/internal/metric/server.go index d4d8a70..e3e2f3c 100644 --- a/internal/metric/server.go +++ b/internal/metric/server.go @@ -10,13 +10,13 @@ import ( ) // MetricServer contains information about metrics server. -type MetricServer struct { +type server struct { srv *http.ServeMux address string } // NewServer creates a new monitoring server. -func NewServer(cfg Config) MetricServer { +func NewServer(cfg Config) server { var srv *http.ServeMux if cfg.Enabled { @@ -24,7 +24,7 @@ func NewServer(cfg Config) MetricServer { srv.Handle("/metrics", promhttp.Handler()) } - return MetricServer{ + return server{ address: cfg.Server.Address, srv: srv, } @@ -32,7 +32,7 @@ func NewServer(cfg Config) MetricServer { // Start creates and run a metric server for prometheus in new go routine. // nolint: mnd -func (s MetricServer) Start(logger *zap.Logger) { +func (s server) Start(logger *zap.Logger) { go func() { // nolint: exhaustruct srv := http.Server{ From d951eb0856ca45aaf129a998a81bd79fed1fc301 Mon Sep 17 00:00:00 2001 From: kianaza Date: Sat, 25 May 2024 16:41:21 +0330 Subject: [PATCH 14/15] fix: Change MetricServer name to resolve golint issue --- internal/metric/server.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/internal/metric/server.go b/internal/metric/server.go index e3e2f3c..a7450c3 100644 --- a/internal/metric/server.go +++ b/internal/metric/server.go @@ -10,13 +10,13 @@ import ( ) // MetricServer contains information about metrics server. -type server struct { +type ServerInfo struct { srv *http.ServeMux address string } // NewServer creates a new monitoring server. -func NewServer(cfg Config) server { +func NewServer(cfg Config) ServerInfo { var srv *http.ServeMux if cfg.Enabled { @@ -24,7 +24,7 @@ func NewServer(cfg Config) server { srv.Handle("/metrics", promhttp.Handler()) } - return server{ + return ServerInfo{ address: cfg.Server.Address, srv: srv, } @@ -32,7 +32,7 @@ func NewServer(cfg Config) server { // Start creates and run a metric server for prometheus in new go routine. // nolint: mnd -func (s server) Start(logger *zap.Logger) { +func (s ServerInfo) Start(logger *zap.Logger) { go func() { // nolint: exhaustruct srv := http.Server{ From b7ee604064c3eb9ffb262c52aa4a0dccdb1dd224 Mon Sep 17 00:00:00 2001 From: kianaza Date: Sat, 25 May 2024 16:50:10 +0330 Subject: [PATCH 15/15] Fix: fix Stream struct --- internal/config/default.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/config/default.go b/internal/config/default.go index c355b06..064ee1a 100644 --- a/internal/config/default.go +++ b/internal/config/default.go @@ -16,7 +16,7 @@ func Default() Config { Level: "debug", }, NATS: natsclient.Config{ - Streams: []natsclient.Stream{natsclient.Stream{ + Streams: []natsclient.Stream{{ Name: "test", Subjects: []string{"test.*"}, },