Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add prometheus metrics for monitoring #110

Open
wants to merge 31 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
f84b232
Initialise a common prometheus metrics registry for API and Ingest
aditya1702 Jan 14, 2025
36359c0
Start /metrics server
aditya1702 Jan 14, 2025
7260e67
Start a separate ingest prometheus server
aditya1702 Jan 14, 2025
aa8780f
Add db metrics to ingest and api prom registry
aditya1702 Jan 15, 2025
908fd94
Merge branch 'main' into add-prometheus
aditya1702 Jan 15, 2025
29946f4
Add new metrics to ingest service
aditya1702 Jan 17, 2025
4294b13
Add ingestion metrics - 1
aditya1702 Jan 24, 2025
a58da1a
register db metrics in separate function
aditya1702 Jan 31, 2025
af099e9
Merge remote-tracking branch 'upstream/main' into add-prometheus
aditya1702 Feb 3, 2025
6737111
Move metrics calculation to MetricsService
aditya1702 Feb 3, 2025
1b5d0db
Add account level metrics
aditya1702 Feb 3, 2025
cab649d
Use summary vec for ingestion duration
aditya1702 Feb 4, 2025
b980f4f
Change function names for metric
aditya1702 Feb 4, 2025
036f6e3
Add rpc service metrics
aditya1702 Feb 4, 2025
86f7d7a
Add metrics for API requests
aditya1702 Feb 4, 2025
9c12fb5
Fix failing tests after metrics service changes
aditya1702 Feb 5, 2025
d8465bd
Register all metrics in single function call
aditya1702 Feb 5, 2025
eff26ae
Add tss pool metrics
aditya1702 Feb 5, 2025
43697f1
Remove unused variables
aditya1702 Feb 5, 2025
19fcd4f
Add idle workers to tss pool metrics
aditya1702 Feb 6, 2025
3a13422
Remove httpErrors metric and just use totalRequests metric
aditya1702 Feb 6, 2025
d183895
Just track the number of active registered accounts
aditya1702 Feb 6, 2025
5f6391f
Track db query counts - payments and accounts
aditya1702 Feb 6, 2025
7af1693
Fix tests
aditya1702 Feb 6, 2025
96e36da
Track db query counts - transactions
aditya1702 Feb 7, 2025
052d249
Fix failing tests
aditya1702 Feb 7, 2025
1a82420
Track db query durations
aditya1702 Feb 7, 2025
f08ea5a
Add metrics tests
aditya1702 Feb 7, 2025
fe2ec0f
Add numTSSTransactionsSubmitted
aditya1702 Feb 10, 2025
35c5d18
Add metric for tss txn inclusion time
aditya1702 Feb 12, 2025
ce60411
Add tss txn inclusion time - 2
aditya1702 Feb 12, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@ go 1.23.2
require (
github.com/alitto/pond v1.9.2
github.com/aws/aws-sdk-go v1.55.5
github.com/dlmiddlecote/sqlstats v1.0.2
github.com/getsentry/sentry-go v0.29.1
github.com/go-chi/chi v4.1.2+incompatible
github.com/go-playground/validator/v10 v10.22.1
github.com/google/uuid v1.6.0
github.com/jmoiron/sqlx v1.4.0
github.com/lib/pq v1.10.9
github.com/mattn/go-sqlite3 v1.14.22
github.com/prometheus/client_golang v1.17.0
github.com/rubenv/sql-migrate v1.7.0
github.com/sirupsen/logrus v1.9.3
github.com/spf13/cobra v1.8.1
Expand Down Expand Up @@ -49,7 +52,6 @@ require (
github.com/pelletier/go-toml/v2 v2.2.2 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_golang v1.17.0 // indirect
github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/common v0.44.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
Expand Down
63 changes: 63 additions & 0 deletions go.sum

Large diffs are not rendered by default.

19 changes: 16 additions & 3 deletions internal/data/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,41 @@ package data
import (
"context"
"fmt"
"time"

"github.com/stellar/wallet-backend/internal/db"
"github.com/stellar/wallet-backend/internal/metrics"
)

type AccountModel struct {
DB db.ConnectionPool
DB db.ConnectionPool
MetricsService *metrics.MetricsService
}

func (m *AccountModel) Insert(ctx context.Context, address string) error {
const query = `INSERT INTO accounts (stellar_address) VALUES ($1) ON CONFLICT DO NOTHING`
start := time.Now()
_, err := m.DB.ExecContext(ctx, query, address)
duration := time.Since(start).Seconds()
m.MetricsService.ObserveDBQueryDuration("INSERT", "accounts", duration)
if err != nil {
return fmt.Errorf("inserting address %s: %w", address, err)
}
m.MetricsService.IncDBQuery("INSERT", "accounts")

return nil
}

func (m *AccountModel) Delete(ctx context.Context, address string) error {
const query = `DELETE FROM accounts WHERE stellar_address = $1`
start := time.Now()
_, err := m.DB.ExecContext(ctx, query, address)
duration := time.Since(start).Seconds()
m.MetricsService.ObserveDBQueryDuration("DELETE", "accounts", duration)
if err != nil {
return fmt.Errorf("deleting address %s: %w", address, err)
}

m.MetricsService.IncDBQuery("DELETE", "accounts")
return nil
}

Expand All @@ -43,10 +53,13 @@ func (m *AccountModel) IsAccountFeeBumpEligible(ctx context.Context, address str
)
`
var exists bool
start := time.Now()
err := m.DB.GetContext(ctx, &exists, query, address)
duration := time.Since(start).Seconds()
m.MetricsService.ObserveDBQueryDuration("SELECT", "accounts", duration)
if err != nil {
return false, fmt.Errorf("checking if account %s is fee bump eligible: %w", address, err)
}

m.MetricsService.IncDBQuery("SELECT", "accounts")
return exists, nil
}
19 changes: 16 additions & 3 deletions internal/data/accounts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/stellar/go/keypair"
"github.com/stellar/wallet-backend/internal/db"
"github.com/stellar/wallet-backend/internal/db/dbtest"
"github.com/stellar/wallet-backend/internal/metrics"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand All @@ -19,9 +20,13 @@ func TestAccountModelInsert(t *testing.T) {
dbConnectionPool, err := db.OpenDBConnectionPool(dbt.DSN)
require.NoError(t, err)
defer dbConnectionPool.Close()
sqlxDB, err := dbConnectionPool.SqlxDB(context.Background())
require.NoError(t, err)
metricsService := metrics.NewMetricsService(sqlxDB)

m := &AccountModel{
DB: dbConnectionPool,
DB: dbConnectionPool,
MetricsService: metricsService,
}

ctx := context.Background()
Expand All @@ -44,9 +49,13 @@ func TestAccountModelDelete(t *testing.T) {
dbConnectionPool, err := db.OpenDBConnectionPool(dbt.DSN)
require.NoError(t, err)
defer dbConnectionPool.Close()
sqlxDB, err := dbConnectionPool.SqlxDB(context.Background())
require.NoError(t, err)
metricsService := metrics.NewMetricsService(sqlxDB)

m := &AccountModel{
DB: dbConnectionPool,
DB: dbConnectionPool,
MetricsService: metricsService,
}

ctx := context.Background()
Expand All @@ -72,9 +81,13 @@ func TestAccountModelIsAccountFeeBumpEligible(t *testing.T) {
dbConnectionPool, err := db.OpenDBConnectionPool(dbt.DSN)
require.NoError(t, err)
defer dbConnectionPool.Close()
sqlxDB, err := dbConnectionPool.SqlxDB(context.Background())
require.NoError(t, err)
metricsService := metrics.NewMetricsService(sqlxDB)

m := &AccountModel{
DB: dbConnectionPool,
DB: dbConnectionPool,
MetricsService: metricsService,
}

ctx := context.Background()
Expand Down
7 changes: 4 additions & 3 deletions internal/data/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,21 @@ import (
"errors"

"github.com/stellar/wallet-backend/internal/db"
"github.com/stellar/wallet-backend/internal/metrics"
)

type Models struct {
Payments *PaymentModel
Account *AccountModel
}

func NewModels(db db.ConnectionPool) (*Models, error) {
func NewModels(db db.ConnectionPool, metricsService *metrics.MetricsService) (*Models, error) {
if db == nil {
return nil, errors.New("ConnectionPool must be initialized")
}

return &Models{
Payments: &PaymentModel{DB: db},
Account: &AccountModel{DB: db},
Payments: &PaymentModel{DB: db, MetricsService: metricsService},
Account: &AccountModel{DB: db, MetricsService: metricsService},
}, nil
}
26 changes: 22 additions & 4 deletions internal/data/payments.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ import (
"time"

"github.com/stellar/wallet-backend/internal/db"
"github.com/stellar/wallet-backend/internal/metrics"
)

type PaymentModel struct {
DB db.ConnectionPool
DB db.ConnectionPool
MetricsService *metrics.MetricsService
}

type Payment struct {
Expand All @@ -36,7 +38,10 @@ type Payment struct {

func (m *PaymentModel) GetLatestLedgerSynced(ctx context.Context, cursorName string) (uint32, error) {
var lastSyncedLedger uint32
start := time.Now()
err := m.DB.GetContext(ctx, &lastSyncedLedger, `SELECT value FROM ingest_store WHERE key = $1`, cursorName)
duration := time.Since(start).Seconds()
m.MetricsService.ObserveDBQueryDuration("SELECT", "ingest_store", duration)
// First run, key does not exist yet
if err == sql.ErrNoRows {
return 0, nil
Expand All @@ -53,10 +58,14 @@ func (m *PaymentModel) UpdateLatestLedgerSynced(ctx context.Context, cursorName
INSERT INTO ingest_store (key, value) VALUES ($1, $2)
ON CONFLICT (key) DO UPDATE SET value = excluded.value
`
start := time.Now()
_, err := m.DB.ExecContext(ctx, query, cursorName, ledger)
duration := time.Since(start).Seconds()
m.MetricsService.ObserveDBQueryDuration("INSERT", "ingest_store", duration)
if err != nil {
return fmt.Errorf("updating last synced ledger to %d: %w", ledger, err)
}
m.MetricsService.IncDBQuery("INSERT", "ingest_store")

return nil
}
Expand Down Expand Up @@ -90,12 +99,15 @@ func (m *PaymentModel) AddPayment(ctx context.Context, tx db.Transaction, paymen
memo_type = EXCLUDED.memo_type
;
`
start := time.Now()
_, err := tx.ExecContext(ctx, query, payment.OperationID, payment.OperationType, payment.TransactionID, payment.TransactionHash, payment.FromAddress, payment.ToAddress, payment.SrcAssetCode, payment.SrcAssetIssuer, payment.SrcAssetType, payment.SrcAmount,
payment.DestAssetCode, payment.DestAssetIssuer, payment.DestAssetType, payment.DestAmount, payment.CreatedAt, payment.Memo, payment.MemoType)
duration := time.Since(start).Seconds()
m.MetricsService.ObserveDBQueryDuration("INSERT", "ingest_payments", duration)
if err != nil {
return fmt.Errorf("inserting payment: %w", err)
}

m.MetricsService.IncDBQuery("INSERT", "ingest_payments")
return nil
}

Expand Down Expand Up @@ -142,7 +154,10 @@ func (m *PaymentModel) GetPaymentsPaginated(ctx context.Context, address string,
if err != nil {
return nil, false, false, fmt.Errorf("preparing named query: %w", err)
}
start := time.Now()
err = m.DB.SelectContext(ctx, &payments, query, args...)
duration := time.Since(start).Seconds()
m.MetricsService.ObserveDBQueryDuration("SELECT", "ingest_payments", duration)
if err != nil {
return nil, false, false, fmt.Errorf("fetching payments: %w", err)
}
Expand All @@ -151,7 +166,7 @@ func (m *PaymentModel) GetPaymentsPaginated(ctx context.Context, address string,
if err != nil {
return nil, false, false, fmt.Errorf("checking prev and next pages: %w", err)
}

m.MetricsService.IncDBQuery("SELECT", "ingest_payments")
return payments, prevExists, nextExists, nil
}

Expand Down Expand Up @@ -187,11 +202,14 @@ func (m *PaymentModel) existsPrevNext(ctx context.Context, filteredSetCTE string
}

var prevExists, nextExists bool
start := time.Now()
err = m.DB.QueryRowxContext(ctx, query, args...).Scan(&prevExists, &nextExists)
duration := time.Since(start).Seconds()
m.MetricsService.ObserveDBQueryDuration("SELECT", "ingest_payments", duration)
if err != nil {
return false, false, fmt.Errorf("fetching prev and next exists: %w", err)
}

m.MetricsService.IncDBQuery("SELECT", "ingest_payments")
return prevExists, nextExists, nil
}

Expand Down
25 changes: 21 additions & 4 deletions internal/data/payments_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/stellar/go/xdr"
"github.com/stellar/wallet-backend/internal/db"
"github.com/stellar/wallet-backend/internal/db/dbtest"
"github.com/stellar/wallet-backend/internal/metrics"
"github.com/stellar/wallet-backend/internal/utils"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -21,9 +22,13 @@ func TestPaymentModelAddPayment(t *testing.T) {
dbConnectionPool, err := db.OpenDBConnectionPool(dbt.DSN)
require.NoError(t, err)
defer dbConnectionPool.Close()
sqlxDB, err := dbConnectionPool.SqlxDB(context.Background())
require.NoError(t, err)
metricsService := metrics.NewMetricsService(sqlxDB)

m := &PaymentModel{
DB: dbConnectionPool,
DB: dbConnectionPool,
MetricsService: metricsService,
}
ctx := context.Background()

Expand Down Expand Up @@ -145,10 +150,14 @@ func TestPaymentModelGetLatestLedgerSynced(t *testing.T) {
dbConnectionPool, err := db.OpenDBConnectionPool(dbt.DSN)
require.NoError(t, err)
defer dbConnectionPool.Close()
sqlxDB, err := dbConnectionPool.SqlxDB(context.Background())
require.NoError(t, err)
metricsService := metrics.NewMetricsService(sqlxDB)

ctx := context.Background()
m := &PaymentModel{
DB: dbConnectionPool,
DB: dbConnectionPool,
MetricsService: metricsService,
}

const key = "ingest_store_key"
Expand All @@ -170,10 +179,14 @@ func TestPaymentModelUpdateLatestLedgerSynced(t *testing.T) {
dbConnectionPool, err := db.OpenDBConnectionPool(dbt.DSN)
require.NoError(t, err)
defer dbConnectionPool.Close()
sqlxDB, err := dbConnectionPool.SqlxDB(context.Background())
require.NoError(t, err)
metricsService := metrics.NewMetricsService(sqlxDB)

ctx := context.Background()
m := &PaymentModel{
DB: dbConnectionPool,
DB: dbConnectionPool,
MetricsService: metricsService,
}

const key = "ingest_store_key"
Expand All @@ -192,10 +205,14 @@ func TestPaymentModelGetPaymentsPaginated(t *testing.T) {
dbConnectionPool, err := db.OpenDBConnectionPool(dbt.DSN)
require.NoError(t, err)
defer dbConnectionPool.Close()
sqlxDB, err := dbConnectionPool.SqlxDB(context.Background())
require.NoError(t, err)
metricsService := metrics.NewMetricsService(sqlxDB)

ctx := context.Background()
m := &PaymentModel{
DB: dbConnectionPool,
DB: dbConnectionPool,
MetricsService: metricsService,
}

dbPayments := []Payment{
Expand Down
24 changes: 19 additions & 5 deletions internal/ingest/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ import (
"net/http"
"time"

"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/sirupsen/logrus"
"github.com/stellar/go/support/log"

"github.com/stellar/wallet-backend/internal/apptracker"
"github.com/stellar/wallet-backend/internal/data"
"github.com/stellar/wallet-backend/internal/db"
"github.com/stellar/wallet-backend/internal/metrics"
"github.com/stellar/wallet-backend/internal/services"
"github.com/stellar/wallet-backend/internal/tss"
tssrouter "github.com/stellar/wallet-backend/internal/tss/router"
Expand Down Expand Up @@ -49,22 +51,26 @@ func Ingest(cfg Configs) error {
}

func setupDeps(cfg Configs) (services.IngestService, error) {
// Open DB connection pool
dbConnectionPool, err := db.OpenDBConnectionPool(cfg.DatabaseURL)
if err != nil {
return nil, fmt.Errorf("connecting to the database: %w", err)
}
models, err := data.NewModels(dbConnectionPool)
db, err := dbConnectionPool.SqlxDB(context.Background())
if err != nil {
return nil, fmt.Errorf("getting sqlx db: %w", err)
}
metricsService := metrics.NewMetricsService(db)
models, err := data.NewModels(dbConnectionPool, metricsService)
if err != nil {
return nil, fmt.Errorf("creating models: %w", err)
}
httpClient := &http.Client{Timeout: 30 * time.Second}
rpcService, err := services.NewRPCService(cfg.RPCURL, httpClient)
rpcService, err := services.NewRPCService(cfg.RPCURL, httpClient, metricsService)
if err != nil {
return nil, fmt.Errorf("instantiating rpc service: %w", err)
}
go rpcService.TrackRPCServiceHealth(context.Background())
tssStore, err := tssstore.NewStore(dbConnectionPool)
tssStore, err := tssstore.NewStore(dbConnectionPool, metricsService)
if err != nil {
return nil, fmt.Errorf("instantiating tss store: %w", err)
}
Expand All @@ -75,10 +81,18 @@ func setupDeps(cfg Configs) (services.IngestService, error) {
router := tssrouter.NewRouter(tssRouterConfig)

ingestService, err := services.NewIngestService(
models, cfg.LedgerCursorName, cfg.AppTracker, rpcService, router, tssStore)
models, cfg.LedgerCursorName, cfg.AppTracker, rpcService, router, tssStore, metricsService)
if err != nil {
return nil, fmt.Errorf("instantiating ingest service: %w", err)
}

http.Handle("/ingest-metrics", promhttp.HandlerFor(metricsService.GetRegistry(), promhttp.HandlerOpts{}))
go func() {
err := http.ListenAndServe(":8002", nil)
if err != nil {
log.Ctx(context.Background()).Fatalf("starting ingest metrics server: %v", err)
}
}()

return ingestService, nil
}
Loading