Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Use old-style prometheus metrics. #141

Merged
merged 1 commit into from
Oct 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions eth/filters/dropped_tx_subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,13 +124,13 @@ func (api *FilterAPI) DroppedTransactions(ctx context.Context) (*rpc.Subscriptio
dropped := make(chan core.DropTxsEvent)
droppedSub := api.sys.backend.SubscribeDropTxsEvent(dropped)

metricsDroppedTxsNew.Inc()
defer metricsDroppedTxsEnd.Inc()
metricsDroppedTxsNew.Inc(1)
defer metricsDroppedTxsEnd.Inc(1)

for {
select {
case d := <-dropped:
metricsDroppedTxsReceived.Add(float64(len(d.Txs)))
metricsDroppedTxsReceived.Inc(int64(len(d.Txs)))
for _, tx := range d.Txs {
notification := &dropNotification{
Tx: newRPCPendingTransaction(tx),
Expand All @@ -142,7 +142,7 @@ func (api *FilterAPI) DroppedTransactions(ctx context.Context) (*rpc.Subscriptio
peerid, _ := txPeerMap.Get(tx.Hash())
notification.Peer, _ = peerIDMap.Load(peerid)
}
metricsDroppedTxsSent.Inc()
metricsDroppedTxsSent.Inc(1)
if err := notifier.Notify(rpcSub.ID, notification); err != nil {
log.Error("dropped_txs_stream: failed to notify", "err", err)
return
Expand Down
225 changes: 23 additions & 202 deletions eth/filters/metrics.go
Original file line number Diff line number Diff line change
@@ -1,209 +1,30 @@
package filters

import (
"os"

"github.com/prometheus/client_golang/prometheus"

bnPrometheus "github.com/ethereum/go-ethereum/bn/prometheus"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
)

const streamSubsystem string = "stream"

var (
metricsHostName string

metricsPendingTxsNew = prometheus.NewCounter(
prometheus.CounterOpts{
Subsystem: streamSubsystem,
Name: "pending_txs_new",
Help: "Number of pending tx streams created",
},
)

metricsPendingTxsEnd = prometheus.NewCounter(
prometheus.CounterOpts{
Subsystem: streamSubsystem,
Name: "pending_txs_end",
Help: "Number of pending tx streams ended",
},
)

metricsPendingTxsReceived = prometheus.NewCounter(
prometheus.CounterOpts{
Subsystem: streamSubsystem,
Name: "pending_txs_received",
Help: "Number of pending txs received",
},
)

metricsPendingTxsGasTooLow = prometheus.NewCounter(
prometheus.CounterOpts{
Subsystem: streamSubsystem,
Name: "pending_txs_gas_too_low",
Help: "Number txs ignore because of gas",
},
)

metricsPendingTxsTraceSuccess = prometheus.NewCounter(
prometheus.CounterOpts{
Subsystem: streamSubsystem,
Name: "pending_txs_trace_success",
Help: "Number txs successfully traced",
},
)

metricsPendingTxsTraceFailed = prometheus.NewCounter(
prometheus.CounterOpts{
Subsystem: streamSubsystem,
Name: "pending_txs_trace_failed",
Help: "Number txs failed to trace",
},
)

metricsPendingTxsSent = prometheus.NewCounter(
prometheus.CounterOpts{
Subsystem: streamSubsystem,
Name: "pending_txs_sent",
Help: "Number of pending txs sent",
},
)

metricsBlocksNew = prometheus.NewCounter(
prometheus.CounterOpts{
Subsystem: streamSubsystem,
Name: "blocks_new",
Help: "Number of block streams created",
},
)

metricsBlocksEnd = prometheus.NewCounter(
prometheus.CounterOpts{
Subsystem: streamSubsystem,
Name: "blocks_end",
Help: "Number of block streams ended",
},
)

metricsBlocksReceived = prometheus.NewCounter(
prometheus.CounterOpts{
Subsystem: streamSubsystem,
Name: "blocks_received",
Help: "Number of blocks received",
},
)

metricsBlocksTraceSuccess = prometheus.NewCounter(
prometheus.CounterOpts{
Subsystem: streamSubsystem,
Name: "blocks_trace_success",
Help: "Number of blocks successfully traced",
},
)

metricsBlocksTraceFailed = prometheus.NewCounter(
prometheus.CounterOpts{
Subsystem: streamSubsystem,
Name: "blocks_trace_failed",
Help: "Number of blocks failed to trace",
},
)

metricsBlocksSent = prometheus.NewCounter(
prometheus.CounterOpts{
Subsystem: streamSubsystem,
Name: "blocks_sent",
Help: "Number of blocks sent",
},
)

metricsDroppedTxsNew = prometheus.NewCounter(
prometheus.CounterOpts{
Subsystem: streamSubsystem,
Name: "dropped_txs_new",
Help: "Number of dropped tx streams created",
},
)

metricsDroppedTxsEnd = prometheus.NewCounter(
prometheus.CounterOpts{
Subsystem: streamSubsystem,
Name: "dropped_txs_end",
Help: "Number of dropped tx streams ended",
},
)

metricsDroppedTxsReceived = prometheus.NewCounter(
prometheus.CounterOpts{
Subsystem: streamSubsystem,
Name: "dropped_txs_received",
Help: "Number of dropped txs received",
},
)

metricsDroppedTxsSent = prometheus.NewCounter(
prometheus.CounterOpts{
Subsystem: streamSubsystem,
Name: "dropped_txs_sent",
Help: "Number of dropped txs sent",
},
)

metricsTracePendingTxTimer = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Subsystem: streamSubsystem,
Name: "trace_pending_tx_duration",
Help: "Trace pending tx duration in seconds",
Buckets: []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10},
},
[]string{},
)

metricsTraceBlockTimer = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Subsystem: streamSubsystem,
Name: "trace_blocks_duration",
Help: "Trace blocks duration in seconds",
Buckets: []float64{.01, .025, .05, .1, .25, .5, 1, 5, 10, 15},
},
[]string{},
)
metricsPendingTxsNew = metrics.NewRegisteredCounter("stream/pending_txs/new", nil)
metricsPendingTxsEnd = metrics.NewRegisteredCounter("stream/pending_txs/end", nil)
metricsPendingTxsReceived = metrics.NewRegisteredCounter("stream/pending_txs/received", nil)
metricsPendingTxsGasTooLow = metrics.NewRegisteredCounter("stream/pending_txs/gas_too_low", nil)
metricsPendingTxsTraceSuccess = metrics.NewRegisteredCounter("stream/pending_txs/trace_success", nil)
metricsPendingTxsTraceFailed = metrics.NewRegisteredCounter("stream/pending_txs/trace_failed", nil)
metricsPendingTxsSent = metrics.NewRegisteredCounter("stream/pending_txs/sent", nil)

metricsBlocksNew = metrics.NewRegisteredCounter("stream/blocks/new", nil)
metricsBlocksEnd = metrics.NewRegisteredCounter("stream/blocks/end", nil)
metricsBlocksReceived = metrics.NewRegisteredCounter("stream/blocks/received", nil)
metricsBlocksTraceSuccess = metrics.NewRegisteredCounter("stream/blocks/trace_success", nil)
metricsBlocksTraceFailed = metrics.NewRegisteredCounter("stream/blocks/trace_failed", nil)
metricsBlocksSent = metrics.NewRegisteredCounter("stream/blocks/sent", nil)

metricsDroppedTxsNew = metrics.NewRegisteredCounter("stream/dropped_txs/new", nil)
metricsDroppedTxsEnd = metrics.NewRegisteredCounter("stream/dropped_txs/end", nil)
metricsDroppedTxsReceived = metrics.NewRegisteredCounter("stream/dropped_txs/received", nil)
metricsDroppedTxsSent = metrics.NewRegisteredCounter("stream/dropped_txs/sent", nil)

metricsTracePendingTxTimer = metrics.NewRegisteredHistogram("stream/pending_txs/trace_duration", nil, metrics.NewExpDecaySample(1028, 0.015))
metricsTraceBlockTimer = metrics.NewRegisteredHistogram("stream/blocks/trace_duration", nil, metrics.NewExpDecaySample(1028, 0.015))
)

func init() {
var err error
metricsHostName, err = os.Hostname()
if err != nil {
log.Error("failed to get hostname for metrics", "err", err)
}

register := func(c prometheus.Collector) {
if err := bnPrometheus.Metrics.Register(c); err != nil {
log.Error("failed to register metrics", "err", err)
}
}

register(metricsPendingTxsNew)
register(metricsPendingTxsEnd)
register(metricsPendingTxsReceived)
register(metricsPendingTxsGasTooLow)
register(metricsPendingTxsTraceSuccess)
register(metricsPendingTxsTraceFailed)
register(metricsPendingTxsSent)

register(metricsBlocksNew)
register(metricsBlocksEnd)
register(metricsBlocksReceived)
register(metricsBlocksTraceSuccess)
register(metricsBlocksTraceFailed)
register(metricsBlocksSent)

register(metricsDroppedTxsNew)
register(metricsDroppedTxsEnd)
register(metricsDroppedTxsReceived)
register(metricsDroppedTxsSent)

register(metricsTracePendingTxTimer)
register(metricsTraceBlockTimer)
}
37 changes: 18 additions & 19 deletions eth/filters/trace_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ import (
"errors"
"fmt"
"math/big"

"github.com/prometheus/client_golang/prometheus"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
Expand Down Expand Up @@ -54,8 +53,8 @@ func (api *FilterAPI) NewPendingTransactionsWithTrace(ctx context.Context, trace
return
}

metricsPendingTxsNew.Inc()
defer metricsPendingTxsEnd.Inc()
metricsPendingTxsNew.Inc(1)
defer metricsPendingTxsEnd.Inc(1)

for {
select {
Expand Down Expand Up @@ -92,7 +91,7 @@ func (api *FilterAPI) NewPendingTransactionsWithTrace(ctx context.Context, trace
return
}

metricsPendingTxsReceived.Add(float64(len(txs)))
metricsPendingTxsReceived.Inc(int64(len(txs)))
for _, tx := range txs {
msg, _ = core.TransactionToMessage(tx, signer, header.BaseFee)
if err != nil {
Expand All @@ -102,20 +101,20 @@ func (api *FilterAPI) NewPendingTransactionsWithTrace(ctx context.Context, trace

if msg.GasFeeCap.Cmp(header.BaseFee) < 0 {
log.Trace("pending_txs_stream: tx gas fee too low", "tx", tx.Hash(), "gasFeeCap", msg.GasFeeCap, "baseFee", header.BaseFee)
metricsPendingTxsGasTooLow.Inc()
metricsPendingTxsGasTooLow.Inc(1)
continue
}

traceCtx.TxHash = tx.Hash()
timer := prometheus.NewTimer(metricsTracePendingTxTimer.With(nil))
startTime := time.Now()
trace, err := traceTx(msg, traceCtx, blockCtx, chainConfig, statedb, tracerOpts)
if err != nil {
log.Error("pending_txs_stream: failed to trace tx", "err", err, "tx", tx.Hash())
metricsPendingTxsTraceFailed.Inc()
metricsPendingTxsTraceFailed.Inc(1)
continue
}
timer.ObserveDuration()
metricsPendingTxsTraceSuccess.Inc()
metricsTracePendingTxTimer.Update(time.Since(startTime).Milliseconds())
metricsPendingTxsTraceSuccess.Inc(1)

gasPrice := hexutil.Big(*tx.GasPrice())
rpcTx := newRPCPendingTransaction(tx)
Expand All @@ -135,7 +134,7 @@ func (api *FilterAPI) NewPendingTransactionsWithTrace(ctx context.Context, trace
log.Error("pending_txs_stream: failed to notify", "err", err)
return
}
metricsPendingTxsSent.Add(float64(len(tracedTxs)))
metricsPendingTxsSent.Inc(int64(len(tracedTxs)))
case <-rpcSub.Err():
return
case <-notifier.Closed():
Expand Down Expand Up @@ -172,8 +171,8 @@ func (api *FilterAPI) NewFullBlocksWithTrace(ctx context.Context, tracerOptsJSON
return
}

metricsBlocksNew.Inc()
defer metricsBlocksEnd.Inc()
metricsBlocksNew.Inc(1)
defer metricsBlocksEnd.Inc(1)

var hashes []common.Hash
for {
Expand All @@ -195,7 +194,7 @@ func (api *FilterAPI) NewFullBlocksWithTrace(ctx context.Context, tracerOptsJSON
return
}

metricsBlocksReceived.Add(float64(len(hashes)))
metricsBlocksReceived.Inc(int64(len(hashes)))
for _, hash := range hashes {
block, err := api.sys.backend.BlockByHash(ctx, hash)
if err != nil {
Expand All @@ -212,11 +211,11 @@ func (api *FilterAPI) NewFullBlocksWithTrace(ctx context.Context, tracerOptsJSON

trace, err := traceBlock(block, chainConfig, api.sys.chain, tracerOpts)
if err != nil {
metricsBlocksTraceFailed.Inc()
metricsBlocksTraceFailed.Inc(1)
log.Error("block_stream: failed to trace block", "err", err, "block", block.Number())
continue
}
metricsBlocksTraceSuccess.Inc()
metricsBlocksTraceSuccess.Inc(1)
marshalBlock["trace"] = trace

marshalReceipts := make(map[common.Hash]map[string]interface{})
Expand Down Expand Up @@ -254,7 +253,7 @@ func (api *FilterAPI) NewFullBlocksWithTrace(ctx context.Context, tracerOptsJSON
return
}
log.Info("block_stream: sent block", "hash", hash, "number", block.Number(), "sub_id", rpcSub.ID)
metricsBlocksSent.Inc()
metricsBlocksSent.Inc(1)
}
}
}()
Expand Down Expand Up @@ -301,7 +300,7 @@ func traceBlock(block *types.Block, chainConfig *params.ChainConfig, chain *core
results = make([]*blocknative.Trace, len(txs))
)

timer := prometheus.NewTimer(metricsTraceBlockTimer.With(nil))
startTime := time.Now()
for i, tx := range txs {
msg, err := core.TransactionToMessage(tx, signer, block.BaseFee())
if err != nil {
Expand All @@ -319,7 +318,7 @@ func traceBlock(block *types.Block, chainConfig *params.ChainConfig, chain *core
}
statedb.Finalise(is158)
}
timer.ObserveDuration()
metricsTraceBlockTimer.Update(time.Since(startTime).Milliseconds())

return results, nil
}
Expand Down