From f40db38b7196b86de7fc772858d30301c8212129 Mon Sep 17 00:00:00 2001 From: ssilagadze Date: Fri, 7 Jun 2024 09:48:39 +0200 Subject: [PATCH] collect stats from callbacker --- cmd/arc/services/metamorph.go | 7 ++- internal/metamorph/callbacker.go | 77 +++++++++++++++++++++++++++++--- 2 files changed, 77 insertions(+), 7 deletions(-) diff --git a/cmd/arc/services/metamorph.go b/cmd/arc/services/metamorph.go index 908d8ae51..556b76987 100644 --- a/cmd/arc/services/metamorph.go +++ b/cmd/arc/services/metamorph.go @@ -128,6 +128,11 @@ func StartMetamorph(logger *slog.Logger) (func(), error) { return nil, err } + callbacker, err := metamorph.NewCallbacker(&http.Client{Timeout: 5 * time.Second}) + if err != nil { + return nil, err + } + metamorphProcessor, err := metamorph.NewProcessor( metamorphStore, pm, @@ -138,7 +143,7 @@ func StartMetamorph(logger *slog.Logger) (func(), error) { metamorph.WithMessageQueueClient(mqClient), metamorph.WithMinedTxsChan(minedTxsChan), metamorph.WithProcessStatusUpdatesInterval(processStatusUpdateInterval), - metamorph.WithCallbackSender(metamorph.NewCallbacker(&http.Client{Timeout: 5 * time.Second})), + metamorph.WithCallbackSender(callbacker), metamorph.WithStatTimeLimits(statsNotSeenTimeLimit, statsNotMinedTimeLimit), metamorph.WithMaxRetries(maxRetries), metamorph.WithMinimumHealthyConnections(minimumHealthyConnections), diff --git a/internal/metamorph/callbacker.go b/internal/metamorph/callbacker.go index a8a9e79cf..9f8d7a212 100644 --- a/internal/metamorph/callbacker.go +++ b/internal/metamorph/callbacker.go @@ -11,7 +11,9 @@ import ( "time" "github.com/bitcoin-sv/arc/internal/metamorph/store" + "github.com/bitcoin-sv/arc/pkg/metamorph/metamorph_api" "github.com/ordishs/go-utils" + "github.com/prometheus/client_golang/prometheus" ) const ( @@ -19,6 +21,14 @@ const ( CallbackIntervalSeconds = 5 ) +type CallbackerStats struct { + callbackSeenOnNetworkCount prometheus.Gauge + callbackSeenInOrphanMempoolCount prometheus.Gauge + callbackRejectedCount prometheus.Gauge + callbackMinedCount prometheus.Gauge + callbackFailedCount prometheus.Gauge +} + type Callback struct { BlockHash *string `json:"blockHash,omitempty"` BlockHeight *uint64 `json:"blockHeight,omitempty"` @@ -30,16 +40,52 @@ type Callback struct { } type Callbacker struct { - httpClient HttpClient - wg sync.WaitGroup - mu sync.Mutex - disposed bool + httpClient HttpClient + wg sync.WaitGroup + mu sync.Mutex + disposed bool + callbackerStats *CallbackerStats } -func NewCallbacker(httpClient HttpClient) *Callbacker { - return &Callbacker{ +func NewCallbacker(httpClient HttpClient) (*Callbacker, error) { + callbacker := &Callbacker{ httpClient: httpClient, + callbackerStats: &CallbackerStats{ + callbackSeenOnNetworkCount: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "arc_callback_seen_on_network_count", + Help: "Number of arc_callback_seen_on_network_count transactions", + }), + callbackSeenInOrphanMempoolCount: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "arc_callback_seen_in_orphan_mempool_count", + Help: "Number of arc_callback_seen_in_orphan_mempool_count transactions", + }), + callbackRejectedCount: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "arc_callback_rejected_count", + Help: "Number of arc_callback_rejected_count transactions", + }), + callbackMinedCount: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "arc_callback_mined_count", + Help: "Number of arc_callback_mined_count transactions", + }), + callbackFailedCount: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "arc_callback_failed_count", + Help: "Number of arc_callback_failed_count transactions", + }), + }, + } + + err := registerStats( + callbacker.callbackerStats.callbackSeenOnNetworkCount, + callbacker.callbackerStats.callbackSeenInOrphanMempoolCount, + callbacker.callbackerStats.callbackRejectedCount, + callbacker.callbackerStats.callbackMinedCount, + callbacker.callbackerStats.callbackFailedCount, + ) + if err != nil { + return nil, err } + + return callbacker, nil } type HttpClient interface { @@ -104,6 +150,16 @@ func (p *Callbacker) SendCallback(logger *slog.Logger, tx *store.StoreData) { // if callback was sent successfully we stop here if response.StatusCode == http.StatusOK { + switch tx.Status { + case metamorph_api.Status_SEEN_ON_NETWORK: + p.callbackerStats.callbackSeenOnNetworkCount.Inc() + case metamorph_api.Status_SEEN_IN_ORPHAN_MEMPOOL: + p.callbackerStats.callbackSeenInOrphanMempoolCount.Inc() + case metamorph_api.Status_MINED: + p.callbackerStats.callbackMinedCount.Inc() + case metamorph_api.Status_REJECTED: + p.callbackerStats.callbackRejectedCount.Inc() + } return } @@ -115,6 +171,7 @@ func (p *Callbacker) SendCallback(logger *slog.Logger, tx *store.StoreData) { sleepDuration *= 2 } + p.callbackerStats.callbackFailedCount.Inc() logger.Warn("Couldn't send transaction callback after tries", slog.String("url", tx.CallbackUrl), slog.String("token", tx.CallbackToken), slog.String("hash", tx.Hash.String()), slog.Int("retries", CallbackTries)) } @@ -126,6 +183,14 @@ func (p *Callbacker) Shutdown(logger *slog.Logger) { return } + unregisterStats( + p.callbackerStats.callbackSeenOnNetworkCount, + p.callbackerStats.callbackSeenInOrphanMempoolCount, + p.callbackerStats.callbackRejectedCount, + p.callbackerStats.callbackMinedCount, + p.callbackerStats.callbackFailedCount, + ) + p.disposed = true p.mu.Unlock()