Skip to content

Commit

Permalink
Merge pull request #446 from bitcoin-sv/callback-stats-collect
Browse files Browse the repository at this point in the history
collect stats from callbacker
  • Loading branch information
shotasilagadzetaal authored Jun 7, 2024
2 parents 991649f + f40db38 commit d78430b
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 7 deletions.
7 changes: 6 additions & 1 deletion cmd/arc/services/metamorph.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,11 @@ func StartMetamorph(logger *slog.Logger) (func(), error) {
return nil, err
}

callbacker, err := metamorph.NewCallbacker(&http.Client{Timeout: 5 * time.Second})
if err != nil {
return nil, err
}

metamorphProcessor, err := metamorph.NewProcessor(
metamorphStore,
pm,
Expand All @@ -138,7 +143,7 @@ func StartMetamorph(logger *slog.Logger) (func(), error) {
metamorph.WithMessageQueueClient(mqClient),
metamorph.WithMinedTxsChan(minedTxsChan),
metamorph.WithProcessStatusUpdatesInterval(processStatusUpdateInterval),
metamorph.WithCallbackSender(metamorph.NewCallbacker(&http.Client{Timeout: 5 * time.Second})),
metamorph.WithCallbackSender(callbacker),
metamorph.WithStatTimeLimits(statsNotSeenTimeLimit, statsNotMinedTimeLimit),
metamorph.WithMaxRetries(maxRetries),
metamorph.WithMinimumHealthyConnections(minimumHealthyConnections),
Expand Down
77 changes: 71 additions & 6 deletions internal/metamorph/callbacker.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,24 @@ import (
"time"

"github.com/bitcoin-sv/arc/internal/metamorph/store"
"github.com/bitcoin-sv/arc/pkg/metamorph/metamorph_api"
"github.com/ordishs/go-utils"
"github.com/prometheus/client_golang/prometheus"
)

const (
CallbackTries = 5
CallbackIntervalSeconds = 5
)

type CallbackerStats struct {
callbackSeenOnNetworkCount prometheus.Gauge
callbackSeenInOrphanMempoolCount prometheus.Gauge
callbackRejectedCount prometheus.Gauge
callbackMinedCount prometheus.Gauge
callbackFailedCount prometheus.Gauge
}

type Callback struct {
BlockHash *string `json:"blockHash,omitempty"`
BlockHeight *uint64 `json:"blockHeight,omitempty"`
Expand All @@ -30,16 +40,52 @@ type Callback struct {
}

type Callbacker struct {
httpClient HttpClient
wg sync.WaitGroup
mu sync.Mutex
disposed bool
httpClient HttpClient
wg sync.WaitGroup
mu sync.Mutex
disposed bool
callbackerStats *CallbackerStats
}

func NewCallbacker(httpClient HttpClient) *Callbacker {
return &Callbacker{
func NewCallbacker(httpClient HttpClient) (*Callbacker, error) {
callbacker := &Callbacker{
httpClient: httpClient,
callbackerStats: &CallbackerStats{
callbackSeenOnNetworkCount: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "arc_callback_seen_on_network_count",
Help: "Number of arc_callback_seen_on_network_count transactions",
}),
callbackSeenInOrphanMempoolCount: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "arc_callback_seen_in_orphan_mempool_count",
Help: "Number of arc_callback_seen_in_orphan_mempool_count transactions",
}),
callbackRejectedCount: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "arc_callback_rejected_count",
Help: "Number of arc_callback_rejected_count transactions",
}),
callbackMinedCount: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "arc_callback_mined_count",
Help: "Number of arc_callback_mined_count transactions",
}),
callbackFailedCount: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "arc_callback_failed_count",
Help: "Number of arc_callback_failed_count transactions",
}),
},
}

err := registerStats(
callbacker.callbackerStats.callbackSeenOnNetworkCount,
callbacker.callbackerStats.callbackSeenInOrphanMempoolCount,
callbacker.callbackerStats.callbackRejectedCount,
callbacker.callbackerStats.callbackMinedCount,
callbacker.callbackerStats.callbackFailedCount,
)
if err != nil {
return nil, err
}

return callbacker, nil
}

type HttpClient interface {
Expand Down Expand Up @@ -104,6 +150,16 @@ func (p *Callbacker) SendCallback(logger *slog.Logger, tx *store.StoreData) {

// if callback was sent successfully we stop here
if response.StatusCode == http.StatusOK {
switch tx.Status {
case metamorph_api.Status_SEEN_ON_NETWORK:
p.callbackerStats.callbackSeenOnNetworkCount.Inc()
case metamorph_api.Status_SEEN_IN_ORPHAN_MEMPOOL:
p.callbackerStats.callbackSeenInOrphanMempoolCount.Inc()
case metamorph_api.Status_MINED:
p.callbackerStats.callbackMinedCount.Inc()
case metamorph_api.Status_REJECTED:
p.callbackerStats.callbackRejectedCount.Inc()
}
return
}

Expand All @@ -115,6 +171,7 @@ func (p *Callbacker) SendCallback(logger *slog.Logger, tx *store.StoreData) {
sleepDuration *= 2
}

p.callbackerStats.callbackFailedCount.Inc()
logger.Warn("Couldn't send transaction callback after tries", slog.String("url", tx.CallbackUrl), slog.String("token", tx.CallbackToken), slog.String("hash", tx.Hash.String()), slog.Int("retries", CallbackTries))
}

Expand All @@ -126,6 +183,14 @@ func (p *Callbacker) Shutdown(logger *slog.Logger) {
return
}

unregisterStats(
p.callbackerStats.callbackSeenOnNetworkCount,
p.callbackerStats.callbackSeenInOrphanMempoolCount,
p.callbackerStats.callbackRejectedCount,
p.callbackerStats.callbackMinedCount,
p.callbackerStats.callbackFailedCount,
)

p.disposed = true
p.mu.Unlock()

Expand Down

0 comments on commit d78430b

Please sign in to comment.