Skip to content

Commit

Permalink
Add rpc service metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
aditya1702 committed Feb 4, 2025
1 parent b980f4f commit 036f6e3
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 10 deletions.
2 changes: 1 addition & 1 deletion internal/ingest/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func setupDeps(cfg Configs) (services.IngestService, error) {
return nil, fmt.Errorf("creating models: %w", err)
}
httpClient := &http.Client{Timeout: 30 * time.Second}
rpcService, err := services.NewRPCService(cfg.RPCURL, httpClient)
rpcService, err := services.NewRPCService(cfg.RPCURL, httpClient, metricsService)
if err != nil {
return nil, fmt.Errorf("instantiating rpc service: %w", err)
}
Expand Down
98 changes: 91 additions & 7 deletions internal/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,17 @@ type MetricsService struct {
latestLedgerIngested prometheus.Gauge
ingestionDuration *prometheus.SummaryVec

// Account Service Metrics
// Account Metrics
numAccountsRegistered *prometheus.CounterVec
numAccountsDeregistered *prometheus.CounterVec

// RPC Service Metrics
rpcRequestsTotal *prometheus.CounterVec
rpcRequestsDuration *prometheus.SummaryVec
rpcEndpointFailures *prometheus.CounterVec
rpcEndpointSuccesses *prometheus.CounterVec
rpcServiceHealth prometheus.Gauge
rpcLatestLedger prometheus.Gauge
}

// NewMetricsService creates a new metrics service with all metrics registered
Expand All @@ -29,29 +37,27 @@ func NewMetricsService(db *sqlx.DB) *MetricsService {
db: db,
}

// Ingest Service Metrics
m.numPaymentOpsIngestedPerLedger = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "num_payment_ops_ingested_per_ledger",
Help: "Number of payment operations ingested per ledger",
},
[]string{"operation_type"},
)

m.numTssTransactionsIngestedPerLedger = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "num_tss_transactions_ingested_per_ledger",
Help: "Number of tss transactions ingested per ledger",
},
[]string{"status"},
)

m.latestLedgerIngested = prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "latest_ledger_ingested",
Help: "Latest ledger ingested",
},
)

m.ingestionDuration = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Name: "ingestion_duration_seconds",
Expand All @@ -61,14 +67,14 @@ func NewMetricsService(db *sqlx.DB) *MetricsService {
[]string{"type"},
)

// Account Metrics
m.numAccountsRegistered = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "num_accounts_registered",
Help: "Number of accounts registered",
},
[]string{"address"},
)

m.numAccountsDeregistered = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "num_accounts_deregistered",
Expand All @@ -77,6 +83,49 @@ func NewMetricsService(db *sqlx.DB) *MetricsService {
[]string{"address"},
)

// RPC Service Metrics
m.rpcRequestsTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "rpc_requests_total",
Help: "Total number of RPC requests",
},
[]string{"endpoint"},
)
m.rpcRequestsDuration = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Name: "rpc_requests_duration_seconds",
Help: "Duration of RPC requests in seconds",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
},
[]string{"endpoint"},
)
m.rpcEndpointFailures = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "rpc_endpoint_failures_total",
Help: "Total number of RPC endpoint failures",
},
[]string{"endpoint"},
)
m.rpcEndpointSuccesses = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "rpc_endpoint_successes_total",
Help: "Total number of successful RPC requests",
},
[]string{"endpoint"},
)
m.rpcServiceHealth = prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "rpc_service_health",
Help: "RPC service health status (1 for healthy, 0 for unhealthy)",
},
)
m.rpcLatestLedger = prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "rpc_latest_ledger",
Help: "Latest ledger number reported by the RPC service",
},
)

m.registerMetrics()
return m
}
Expand All @@ -90,6 +139,12 @@ func (m *MetricsService) registerMetrics() {
m.registry.MustRegister(m.ingestionDuration)
m.registry.MustRegister(m.numAccountsRegistered)
m.registry.MustRegister(m.numAccountsDeregistered)
m.registry.MustRegister(m.rpcRequestsTotal)
m.registry.MustRegister(m.rpcRequestsDuration)
m.registry.MustRegister(m.rpcEndpointFailures)
m.registry.MustRegister(m.rpcEndpointSuccesses)
m.registry.MustRegister(m.rpcServiceHealth)
m.registry.MustRegister(m.rpcLatestLedger)
}

// GetRegistry returns the prometheus registry
Expand All @@ -115,10 +170,39 @@ func (m *MetricsService) ObserveIngestionDuration(ingestionType string, duration
}

// Account Service Metrics
func (m *MetricsService) SetNumAccountsRegistered(address string) {
func (m *MetricsService) IncNumAccountsRegistered(address string) {
m.numAccountsRegistered.WithLabelValues(address).Inc()
}

func (m *MetricsService) SetNumAccountsDeregistered(address string) {
func (m *MetricsService) IncNumAccountsDeregistered(address string) {
m.numAccountsDeregistered.WithLabelValues(address).Inc()
}

// RPC Service Metrics
func (m *MetricsService) IncRPCRequests(endpoint string) {
m.rpcRequestsTotal.WithLabelValues(endpoint).Inc()
}

func (m *MetricsService) ObserveRPCRequestDuration(endpoint string, duration float64) {
m.rpcRequestsDuration.WithLabelValues(endpoint).Observe(duration)
}

func (m *MetricsService) IncRPCEndpointFailure(endpoint string) {
m.rpcEndpointFailures.WithLabelValues(endpoint).Inc()
}

func (m *MetricsService) IncRPCEndpointSuccess(endpoint string) {
m.rpcEndpointSuccesses.WithLabelValues(endpoint).Inc()
}

func (m *MetricsService) SetRPCServiceHealth(healthy bool) {
if healthy {
m.rpcServiceHealth.Set(1)
} else {
m.rpcServiceHealth.Set(0)
}
}

func (m *MetricsService) SetRPCLatestLedger(ledger int64) {
m.rpcLatestLedger.Set(float64(ledger))
}
2 changes: 1 addition & 1 deletion internal/serve/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func initHandlerDeps(cfg Configs) (handlerDeps, error) {
}

httpClient := http.Client{Timeout: time.Duration(30 * time.Second)}
rpcService, err := services.NewRPCService(cfg.RPCURL, &httpClient)
rpcService, err := services.NewRPCService(cfg.RPCURL, &httpClient, metricsService)
if err != nil {
return handlerDeps{}, fmt.Errorf("instantiating rpc service: %w", err)
}
Expand Down
24 changes: 23 additions & 1 deletion internal/services/rpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/stellar/go/support/log"
"github.com/stellar/wallet-backend/internal/entities"
"github.com/stellar/wallet-backend/internal/metrics"
"github.com/stellar/wallet-backend/internal/utils"
)

Expand All @@ -35,25 +36,30 @@ type rpcService struct {
rpcURL string
httpClient utils.HTTPClient
heartbeatChannel chan entities.RPCGetHealthResult
metricsService *metrics.MetricsService
}

var PageLimit = 200

var _ RPCService = (*rpcService)(nil)

func NewRPCService(rpcURL string, httpClient utils.HTTPClient) (*rpcService, error) {
func NewRPCService(rpcURL string, httpClient utils.HTTPClient, metricsService *metrics.MetricsService) (*rpcService, error) {
if rpcURL == "" {
return nil, errors.New("rpcURL cannot be nil")
}
if httpClient == nil {
return nil, errors.New("httpClient cannot be nil")
}
if metricsService == nil {
return nil, errors.New("metricsService cannot be nil")
}

heartbeatChannel := make(chan entities.RPCGetHealthResult, 1)
return &rpcService{
rpcURL: rpcURL,
httpClient: httpClient,
heartbeatChannel: heartbeatChannel,
metricsService: metricsService,
}, nil
}

Expand Down Expand Up @@ -182,20 +188,30 @@ func (r *rpcService) TrackRPCServiceHealth(ctx context.Context) {
return
case <-warningTicker.C:
log.Warn(fmt.Sprintf("rpc service unhealthy for over %s", rpcHealthCheckMaxWaitTime))
r.metricsService.SetRPCServiceHealth(false)
warningTicker.Reset(rpcHealthCheckMaxWaitTime)
case <-healthCheckTicker.C:
result, err := r.GetHealth()
if err != nil {
log.Warnf("rpc health check failed: %v", err)
r.metricsService.SetRPCServiceHealth(false)
continue
}
r.heartbeatChannel <- result
r.metricsService.SetRPCServiceHealth(true)
r.metricsService.SetRPCLatestLedger(int64(result.LatestLedger))
warningTicker.Reset(rpcHealthCheckMaxWaitTime)
}
}
}

func (r *rpcService) sendRPCRequest(method string, params entities.RPCParams) (json.RawMessage, error) {
startTime := time.Now()
r.metricsService.IncRPCRequests(method)
defer func() {
duration := time.Since(startTime).Seconds()
r.metricsService.ObserveRPCRequestDuration(method, duration)
}()

payload := map[string]interface{}{
"jsonrpc": "2.0",
Expand All @@ -210,28 +226,34 @@ func (r *rpcService) sendRPCRequest(method string, params entities.RPCParams) (j

jsonData, err := json.Marshal(payload)
if err != nil {
r.metricsService.IncRPCEndpointFailure(method)
return nil, fmt.Errorf("marshaling payload")
}
resp, err := r.httpClient.Post(r.rpcURL, "application/json", bytes.NewBuffer(jsonData))
if err != nil {
r.metricsService.IncRPCEndpointFailure(method)
return nil, fmt.Errorf("sending POST request to RPC: %w", err)
}
defer resp.Body.Close()

body, err := io.ReadAll(resp.Body)
if err != nil {
r.metricsService.IncRPCEndpointFailure(method)
return nil, fmt.Errorf("unmarshaling RPC response: %w", err)
}

var res entities.RPCResponse
err = json.Unmarshal(body, &res)
if err != nil {
r.metricsService.IncRPCEndpointFailure(method)
return nil, fmt.Errorf("parsing RPC response JSON: %w", err)
}

if res.Result == nil {
r.metricsService.IncRPCEndpointFailure(method)
return nil, fmt.Errorf("response %s missing result field", string(body))
}

r.metricsService.IncRPCEndpointSuccess(method)
return res.Result, nil
}

0 comments on commit 036f6e3

Please sign in to comment.