Skip to content

Commit

Permalink
Bump go-conflux-util to improve metrics (#233)
Browse files Browse the repository at this point in the history
  • Loading branch information
wanliqun authored Oct 12, 2024
1 parent fa85473 commit b122901
Show file tree
Hide file tree
Showing 17 changed files with 118 additions and 106 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ go 1.22

require (
github.com/Conflux-Chain/go-conflux-sdk v1.5.10
github.com/Conflux-Chain/go-conflux-util v0.2.2-0.20240718082304-6c749d77bd5a
github.com/Conflux-Chain/web3pay-service v0.0.0-20240718085234-2c8c8f7586bf
github.com/Conflux-Chain/go-conflux-util v0.2.2-0.20241011103527-e8cea809b65b
github.com/Conflux-Chain/web3pay-service v0.0.0-20241012013327-2958dd644fcd
github.com/btcsuite/btcutil v1.0.3-0.20201208143702-a53e38424cce
github.com/buraksezer/consistent v0.9.0
github.com/cespare/xxhash v1.1.0
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/Conflux-Chain/go-conflux-sdk v1.5.10 h1:zvoJmQZyEY3AgdTapghBeAO9xKUd3JTRs0Gs02NaFkI=
github.com/Conflux-Chain/go-conflux-sdk v1.5.10/go.mod h1:nJHYufKOuTqtZkVA1c/peRU9rsx5Bt5QhIW+q/Bg1q0=
github.com/Conflux-Chain/go-conflux-util v0.2.2-0.20240718082304-6c749d77bd5a h1:3ZiUTUU/rNkjKh3uHzfL6qcm0uLZg+1hGzUcapg0dl0=
github.com/Conflux-Chain/go-conflux-util v0.2.2-0.20240718082304-6c749d77bd5a/go.mod h1:ThmsVGfj0muqxC3i7Rvyg1GVb+B39mQFuEoHnqF1MBA=
github.com/Conflux-Chain/web3pay-service v0.0.0-20240718085234-2c8c8f7586bf h1:E4GIdTK6pIIQtNOIwA4SSVm5NjCRGWwby5jzV+vHUPU=
github.com/Conflux-Chain/web3pay-service v0.0.0-20240718085234-2c8c8f7586bf/go.mod h1:5WO9DCYr7/o0jRCbkkmKVIND5yEtxH3i6AdOoJskmGY=
github.com/Conflux-Chain/go-conflux-util v0.2.2-0.20241011103527-e8cea809b65b h1:dg0GewlNnRP4PKK7FSxvQvH56TZ1NebHXy+336x1J1c=
github.com/Conflux-Chain/go-conflux-util v0.2.2-0.20241011103527-e8cea809b65b/go.mod h1:ThmsVGfj0muqxC3i7Rvyg1GVb+B39mQFuEoHnqF1MBA=
github.com/Conflux-Chain/web3pay-service v0.0.0-20241012013327-2958dd644fcd h1:kfggrI/ddbU6xlZHYOAuhobcNdMkx4ikN2kDm/mijKE=
github.com/Conflux-Chain/web3pay-service v0.0.0-20241012013327-2958dd644fcd/go.mod h1:l05H92cO5zzvUfL7KaWTN30FQX9PQITWhCU7X5SVcJ4=
github.com/DataDog/zstd v1.4.5 h1:EndNeuB0l9syBZhut0wns3gV1hL8zX8LIu6ZiVHWLIQ=
github.com/DataDog/zstd v1.4.5/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY=
Expand Down
4 changes: 2 additions & 2 deletions node/node_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (s *Status) MarshalJSON() ([]byte, error) {
LatestHeartBeatErrs []string `json:"latestHeartBeatErrs"`
}

availability := metricUtil.GetOrRegisterTimeWindowPercentageDefault(s.metric.availability).Value()
availability := metricUtil.GetOrRegisterTimeWindowPercentageDefault(0, s.metric.availability).Snapshot().Value()
latency := metricUtil.GetOrRegisterHistogram(s.metric.latency).Snapshot()

scopy := Status{
Expand Down Expand Up @@ -205,7 +205,7 @@ func (sm *statusMetrics) update(start time.Time, err error) {
metricUtil.GetOrRegisterHistogram(sm.latency).Update(time.Since(start).Nanoseconds())
}

metricUtil.GetOrRegisterTimeWindowPercentageDefault(sm.availability).Mark(err == nil)
metricUtil.GetOrRegisterTimeWindowPercentageDefault(0, sm.availability).Mark(err == nil)
}

func (sm *statusMetrics) unregisterAll() {
Expand Down
5 changes: 3 additions & 2 deletions store/epoch_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package store

import (
"fmt"
"time"

citypes "github.com/Conflux-Chain/confura/types"
"github.com/Conflux-Chain/confura/util"
Expand Down Expand Up @@ -94,8 +95,8 @@ func (epoch *EpochData) IsContinuousTo(prev *EpochData) (continuous bool, desc s

// QueryEpochData queries blockchain data for the specified epoch number.
func QueryEpochData(cfx sdk.ClientOperator, epochNumber uint64, useBatch bool) (EpochData, error) {
updater := metrics.Registry.Sync.QueryEpochData("cfx")
defer updater.Update()
startTime := time.Now()
defer metrics.Registry.Sync.QueryEpochData("cfx").UpdateSince(startTime)

data, err := queryEpochData(cfx, epochNumber, useBatch)
metrics.Registry.Sync.QueryEpochDataAvailability("cfx").
Expand Down
5 changes: 3 additions & 2 deletions store/eth_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/Conflux-Chain/confura/util/metrics"
"github.com/Conflux-Chain/go-conflux-util/viper"
Expand Down Expand Up @@ -245,8 +246,8 @@ func QueryEthData(
blockNumber uint64,
opts ...QueryOption,
) (*EthData, error) {
updater := metrics.Registry.Sync.QueryEpochData("eth")
defer updater.Update()
startTime := time.Now()
defer metrics.Registry.Sync.QueryEpochData("eth").UpdateSince(startTime)

var opt QueryOption
if len(opts) > 0 {
Expand Down
13 changes: 7 additions & 6 deletions store/mysql/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"io"
"sort"
"time"

"github.com/Conflux-Chain/confura/store"
citypes "github.com/Conflux-Chain/confura/types"
Expand Down Expand Up @@ -100,8 +101,8 @@ func (ms *MysqlStore) PushnWithFinalizer(dataSlice []*store.EpochData, finalizer
return err
}

updater := metrics.Registry.Store.Push("mysql")
defer updater.Update()
startTime := time.Now()
defer metrics.Registry.Store.Push("mysql").UpdateSince(startTime)

// the log partition to write universal event logs
var logPartition bnPartition
Expand Down Expand Up @@ -207,8 +208,8 @@ func (ms *MysqlStore) PopnWithFinalizer(epochUntil uint64, finalizer func(*gorm.
return nil
}

updater := metrics.Registry.Store.Pop("mysql")
defer updater.Update()
startTime := time.Now()
defer metrics.Registry.Store.Pop("mysql").UpdateSince(startTime)

return ms.baseStore.db.Transaction(func(dbTx *gorm.DB) error {
if !ms.disabler.IsChainBlockDisabled() {
Expand Down Expand Up @@ -264,8 +265,8 @@ func (ms *MysqlStore) PopnWithFinalizer(epochUntil uint64, finalizer func(*gorm.
}

func (ms *MysqlStore) GetLogs(ctx context.Context, storeFilter store.LogFilter) ([]*store.Log, error) {
updater := metrics.Registry.Store.GetLogs()
defer updater.Update()
startTime := time.Now()
defer metrics.Registry.Store.GetLogs().UpdateSince(startTime)

contracts := storeFilter.Contracts.ToSlice()

Expand Down
8 changes: 4 additions & 4 deletions store/redis/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,8 @@ func (rs *RedisStore) Pushn(dataSlice []*store.EpochData) error {
return nil
}

updater := metrics.Registry.Store.Push("redis")
defer updater.Update()
startTime := time.Now()
defer metrics.Registry.Store.Push("redis").UpdateSince(startTime)

_, lastEpoch, err := rs.GetGlobalEpochRange()
if rs.IsRecordNotFound(err) {
Expand Down Expand Up @@ -467,8 +467,8 @@ func (rs *RedisStore) remove(epochFrom, epochTo uint64, option store.EpochRemove
return errors.Errorf("invalid epoch range (%v,%v)", epochFrom, epochTo)
}

updater := metrics.Registry.Store.Pop("redis")
defer updater.Update()
startTime := time.Now()
defer metrics.Registry.Store.Pop("redis").UpdateSince(startTime)

numEpochs := epochTo - epochFrom + 1
watchKeys := make([]string, 0, numEpochs)
Expand Down
61 changes: 30 additions & 31 deletions util/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,14 @@ func (*RpcMetrics) UpdateDuration(method string, err error, start time.Time) {
}

// Overall rate statistics
metricUtil.GetOrRegisterTimeWindowPercentageDefault("infura/rpc/rate/success").Mark(isNilErr)
metricUtil.GetOrRegisterTimeWindowPercentageDefault("infura/rpc/rate/rpcErr").Mark(isRpcErr)
metricUtil.GetOrRegisterTimeWindowPercentageDefault("infura/rpc/rate/nonRpcErr").Mark(!isNilErr && !isRpcErr)
metricUtil.GetOrRegisterTimeWindowPercentageDefault(100, "infura/rpc/rate/success").Mark(isNilErr)
metricUtil.GetOrRegisterTimeWindowPercentageDefault(0, "infura/rpc/rate/rpcErr").Mark(isRpcErr)
metricUtil.GetOrRegisterTimeWindowPercentageDefault(0, "infura/rpc/rate/nonRpcErr").Mark(!isNilErr && !isRpcErr)

// RPC rate statistics
metricUtil.GetOrRegisterTimeWindowPercentageDefault("infura/rpc/rate/success/%v", method).Mark(isNilErr)
metricUtil.GetOrRegisterTimeWindowPercentageDefault("infura/rpc/rate/rpcErr/%v", method).Mark(isRpcErr)
metricUtil.GetOrRegisterTimeWindowPercentageDefault("infura/rpc/rate/nonRpcErr/%v", method).Mark(!isNilErr && !isRpcErr)
metricUtil.GetOrRegisterTimeWindowPercentageDefault(100, "infura/rpc/rate/success/%v", method).Mark(isNilErr)
metricUtil.GetOrRegisterTimeWindowPercentageDefault(0, "infura/rpc/rate/rpcErr/%v", method).Mark(isRpcErr)
metricUtil.GetOrRegisterTimeWindowPercentageDefault(0, "infura/rpc/rate/nonRpcErr/%v", method).Mark(!isNilErr && !isRpcErr)

// Only update QPS & latency on success or RPC error, because an I/O error usually takes
// a long time and would skew the average latency.
Expand All @@ -65,19 +65,19 @@ func (*RpcMetrics) UpdateDuration(method string, err error, start time.Time) {
// RPC metrics - inputs

func (*RpcMetrics) InputEpoch(method, epoch string) metricUtil.Percentage {
return metricUtil.GetOrRegisterTimeWindowPercentageDefault("infura/rpc/input/epoch/%v/%v", method, epoch)
return metricUtil.GetOrRegisterTimeWindowPercentageDefault(0, "infura/rpc/input/epoch/%v/%v", method, epoch)
}

func (*RpcMetrics) InputBlockHash(method string) metricUtil.Percentage {
return metricUtil.GetOrRegisterTimeWindowPercentageDefault("infura/rpc/input/blockHash/%v", method)
return metricUtil.GetOrRegisterTimeWindowPercentageDefault(0, "infura/rpc/input/blockHash/%v", method)
}

// InputEpochGap returns the histogram that metricUtil.GetOrRegisterHistogram
// yields for the name pattern "infura/rpc/input/epoch/gap/%v" with method as
// its argument (presumably substituted fmt-style into the metric name —
// confirm against go-conflux-util).
func (*RpcMetrics) InputEpochGap(method string) metrics.Histogram {
return metricUtil.GetOrRegisterHistogram("infura/rpc/input/epoch/gap/%v", method)
}

func (*RpcMetrics) InputBlock(method, block string) metricUtil.Percentage {
return metricUtil.GetOrRegisterTimeWindowPercentageDefault("infura/rpc/input/block/%v/%v", method, block)
return metricUtil.GetOrRegisterTimeWindowPercentageDefault(0, "infura/rpc/input/block/%v/%v", method, block)
}

func (*RpcMetrics) InputBlockGap(method string) metrics.Histogram {
Expand All @@ -93,14 +93,14 @@ func (*RpcMetrics) LogFilterSplit(method, name string) metrics.Histogram {
// RPC metrics - percentages

func (*RpcMetrics) Percentage(method, name string) metricUtil.Percentage {
return metricUtil.GetOrRegisterTimeWindowPercentageDefault("infura/rpc/percentage/%v/%v", method, name)
return metricUtil.GetOrRegisterTimeWindowPercentageDefault(0, "infura/rpc/percentage/%v/%v", method, name)
}

// RPC metrics - store hit ratio

func (*RpcMetrics) StoreHit(method, storeName string) metricUtil.Percentage {
// use rpc method to distinguish cfx and eth
return metricUtil.GetOrRegisterTimeWindowPercentageDefault("infura/rpc/store/hit/%v/%v", storeName, method)
return metricUtil.GetOrRegisterTimeWindowPercentageDefault(0, "infura/rpc/store/hit/%v/%v", storeName, method)
}

// RPC metrics - fullnode
Expand All @@ -115,18 +115,18 @@ func (*RpcMetrics) FullnodeQps(node, space, method string, err error) metrics.Ti

func (*RpcMetrics) FullnodeErrorRate(node ...string) metricUtil.Percentage {
if len(node) == 0 {
return metricUtil.GetOrRegisterTimeWindowPercentageDefault("infura/rpc/fullnode/rate/error")
return metricUtil.GetOrRegisterTimeWindowPercentageDefault(0, "infura/rpc/fullnode/rate/error")
}

return metricUtil.GetOrRegisterTimeWindowPercentageDefault("infura/rpc/fullnode/rate/error/%v", node[0])
return metricUtil.GetOrRegisterTimeWindowPercentageDefault(0, "infura/rpc/fullnode/rate/error/%v", node[0])
}

func (*RpcMetrics) FullnodeNonRpcErrorRate(node ...string) metricUtil.Percentage {
if len(node) == 0 {
return metricUtil.GetOrRegisterTimeWindowPercentageDefault("infura/rpc/fullnode/rate/nonRpcErr")
return metricUtil.GetOrRegisterTimeWindowPercentageDefault(0, "infura/rpc/fullnode/rate/nonRpcErr")
}

return metricUtil.GetOrRegisterTimeWindowPercentageDefault("infura/rpc/fullnode/rate/nonRpcErr/%v", node[0])
return metricUtil.GetOrRegisterTimeWindowPercentageDefault(0, "infura/rpc/fullnode/rate/nonRpcErr/%v", node[0])
}

// Sync service metrics
Expand All @@ -144,27 +144,27 @@ func (*SyncMetrics) SyncOnceSize(space, storeName string) metrics.Histogram {
return metricUtil.GetOrRegisterHistogram("infura/sync/%v/%v/once/size", space, storeName)
}

func (*SyncMetrics) QueryEpochData(space string) metricUtil.TimerUpdater {
return metricUtil.NewTimerUpdaterByName(fmt.Sprintf("infura/sync/%v/fullnode", space))
func (*SyncMetrics) QueryEpochData(space string) metrics.Timer {
return metricUtil.GetOrRegisterTimer("infura/sync/%v/fullnode", space)
}

func (*SyncMetrics) QueryEpochDataAvailability(space string) metricUtil.Percentage {
return metricUtil.GetOrRegisterTimeWindowPercentageDefault("infura/sync/%v/fullnode/availability", space)
return metricUtil.GetOrRegisterTimeWindowPercentageDefault(0, "infura/sync/%v/fullnode/availability", space)
}

// Store metrics
type StoreMetrics struct{}

func (*StoreMetrics) Push(storeName string) metricUtil.TimerUpdater {
return metricUtil.NewTimerUpdaterByName(fmt.Sprintf("infura/store/%v/push", storeName))
func (*StoreMetrics) Push(storeName string) metrics.Timer {
return metricUtil.GetOrRegisterTimer("infura/store/%v/push", storeName)
}

func (*StoreMetrics) Pop(storeName string) metricUtil.TimerUpdater {
return metricUtil.NewTimerUpdaterByName(fmt.Sprintf("infura/store/%v/pop", storeName))
func (*StoreMetrics) Pop(storeName string) metrics.Timer {
return metricUtil.GetOrRegisterTimer("infura/store/%v/pop", storeName)
}

func (*StoreMetrics) GetLogs() metricUtil.TimerUpdater {
return metricUtil.NewTimerUpdaterByName("infura/store/mysql/getlogs")
func (*StoreMetrics) GetLogs() metrics.Timer {
return metricUtil.GetOrRegisterTimer("infura/store/mysql/getlogs")
}

// Node manager metrics
Expand All @@ -190,7 +190,7 @@ func (*PubSubMetrics) Sessions(space, topic, node string) metrics.Gauge {
}

func (*PubSubMetrics) InputLogFilter(space string) metricUtil.Percentage {
return metricUtil.GetOrRegisterTimeWindowPercentageDefault("infura/pubsub/%v/input/logFilter", space)
return metricUtil.GetOrRegisterTimeWindowPercentageDefault(0, "infura/pubsub/%v/input/logFilter", space)
}

// Virtual filter metrics
Expand All @@ -212,16 +212,15 @@ func (*VirtualFilterMetrics) PollOnceSize(space, node string) metrics.Histogram
return metricUtil.GetOrRegisterHistogram("infura/virtualFilter/%v/poll/%v/once/size", space, node)
}

func (*VirtualFilterMetrics) PersistFilterChanges(space, node, store string) metricUtil.TimerUpdater {
return metricUtil.NewTimerUpdaterByName(fmt.Sprintf("infura/virtualFilter/%v/persist/%v/filterChanges/%v", space, node, store))
func (*VirtualFilterMetrics) PersistFilterChanges(space, node, store string) metrics.Timer {
return metricUtil.GetOrRegisterTimer("infura/virtualFilter/%v/persist/%v/filterChanges/%v", space, node, store)
}

func (*VirtualFilterMetrics) QueryFilterChanges(space, node, store string) metricUtil.TimerUpdater {
metricName := fmt.Sprintf("infura/virtualFilter/%v/query/%v/filterChanges/%v", space, node, store)
return metricUtil.NewTimerUpdaterByName(metricName)
func (*VirtualFilterMetrics) QueryFilterChanges(space, node, store string) metrics.Timer {
return metricUtil.GetOrRegisterTimer("infura/virtualFilter/%v/query/%v/filterChanges/%v", space, node, store)
}

func (*VirtualFilterMetrics) StoreQueryPercentage(space string, node, store string) metricUtil.Percentage {
metricName := fmt.Sprintf("infura/virtualFilter/%v/percentage/query/%v/filterChanges/%v", space, node, store)
return metricUtil.GetOrRegisterTimeWindowPercentageDefault(metricName)
return metricUtil.GetOrRegisterTimeWindowPercentageDefault(0, metricName)
}
5 changes: 4 additions & 1 deletion util/metrics/service/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,10 @@ func (r *Registry) GetOrRegisterMeter(name string, args ...interface{}) metrics.
func (r *Registry) GetOrRegisterHistogram(name string, args ...interface{}) metrics.Histogram {
metricName := fmt.Sprintf(name, args...)
return r.Registry.GetOrRegister(name, func() metrics.Histogram {
return &Histogram{clientMetric{metricName, r.updater}, metricUtil.NewDefaultHistogram()}
return &Histogram{
clientMetric{metricName, r.updater},
metrics.NewHistogram(metrics.NewExpDecaySample(1028, 0.015)),
}
}).(metrics.Histogram)
}

Expand Down
12 changes: 6 additions & 6 deletions util/metrics/service/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,15 +100,15 @@ func (api *MetricsAPI) StopTimer(name string) {
}

// Percentage
func (api *MetricsAPI) MarkPercentage(name string, marked bool) {
metricUtil.GetOrRegisterPercentage(name).Mark(marked)
func (api *MetricsAPI) MarkPercentage(defaultVal float64, name string, marked bool) {
metricUtil.GetOrRegisterPercentage(defaultVal, name).Mark(marked)
}

// TimeWindowPercentage
func (api *MetricsAPI) MarkTimeWindowPercentageDefault(name string, marked bool) {
metricUtil.GetOrRegisterTimeWindowPercentageDefault(name).Mark(marked)
func (api *MetricsAPI) MarkTimeWindowPercentageDefault(defaultVal float64, name string, marked bool) {
metricUtil.GetOrRegisterTimeWindowPercentageDefault(defaultVal, name).Mark(marked)
}

func (api *MetricsAPI) MarkTimeWindowPercentage(name string, marked bool, slots int, slotIntervalNanos int64) {
metricUtil.GetOrRegisterTimeWindowPercentage(time.Duration(slotIntervalNanos), slots, name).Mark(marked)
func (api *MetricsAPI) MarkTimeWindowPercentage(defaultVal float64, name string, marked bool, slots int, slotIntervalNanos int64) {
metricUtil.GetOrRegisterTimeWindowPercentage(defaultVal, time.Duration(slotIntervalNanos), slots, name).Mark(marked)
}
Loading

0 comments on commit b122901

Please sign in to comment.