Skip to content

Commit

Permalink
add inflight quote cache (#2963)
Browse files Browse the repository at this point in the history
Co-authored-by: Trajan0x <[email protected]>
  • Loading branch information
trajan0x and trajan0x authored Jul 30, 2024
1 parent 9f5bdc2 commit 93aa8e4
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 17 deletions.
81 changes: 81 additions & 0 deletions services/rfq/relayer/inventory/inflight.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package inventory

import (
"context"
"fmt"
"github.com/synapsecns/sanguine/services/rfq/relayer/reldb"
"sync"
"time"
)

// inFlightManager stores in-flight quotes and allows retrieval via the db.
// it is thread-safe.
type inFlightManager struct {
ttl time.Duration
db reldb.Service
mux sync.RWMutex
entry *inFlightQuoteCacheEntry
}

// inFlightQuoteCacheEntry represents an entry in the in-flight quote cache.
type inFlightQuoteCacheEntry struct {
createdAt time.Time
quotes []reldb.QuoteRequest
}

// QuoterOption defines a type for functional options.
type QuoterOption func(*inFlightManager)

// WithTTL sets the TTL for the inFlightManager.
func WithTTL(ttl time.Duration) QuoterOption {
return func(q *inFlightManager) {
q.ttl = ttl
}
}

const defaultTTL = 2 * time.Second

// newInflightManager creates a new inFlightManager with the given options.
func newInflightManager(db reldb.Service, options ...QuoterOption) *inFlightManager {
// Default TTL to 250ms
quoter := &inFlightManager{
ttl: defaultTTL,
db: db,
}

// Apply options
for _, opt := range options {
opt(quoter)
}

return quoter
}

func (q *inFlightManager) GetInFlightQuotes(ctx context.Context, skipCache bool) (quotes []reldb.QuoteRequest, err error) {
if skipCache || q.shouldRefresh() {
inFlightQuotes, err := q.db.GetQuoteResultsByStatus(ctx, reldb.CommittedPending, reldb.CommittedConfirmed, reldb.RelayStarted)
if err != nil {
return nil, fmt.Errorf("could not get in flight quotes: %w", err)
}
q.mux.Lock()
defer q.mux.Unlock()
q.entry = &inFlightQuoteCacheEntry{
createdAt: time.Now(),
quotes: inFlightQuotes,
}

return inFlightQuotes, nil
}

q.mux.RLock()
defer q.mux.RUnlock()

return q.entry.quotes, nil
}

func (q *inFlightManager) shouldRefresh() bool {
q.mux.RLock()
defer q.mux.RUnlock()

return q.entry == nil || time.Since(q.entry.createdAt) > q.ttl
}
26 changes: 14 additions & 12 deletions services/rfq/relayer/inventory/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ type inventoryManagerImpl struct {
meter metric.Meter
// balanceGauge is the histogram for balance
balanceGauge metric.Float64ObservableGauge
// inFlightQuoteManager is the cache for in flight quotes
inFlightQuoteManager *inFlightManager
}

// ErrUnsupportedChain is the error for an unsupported chain.
Expand Down Expand Up @@ -106,14 +108,15 @@ func (i *inventoryManagerImpl) GetCommittableBalance(ctx context.Context, chainI
func (i *inventoryManagerImpl) GetCommittableBalances(ctx context.Context, options ...BalanceFetchArgOption) (res map[int]map[common.Address]*big.Int, err error) {
reqOptions := makeOptions(options)
// TODO: hard fail if cache skip breaks
if reqOptions.skipCache {
if reqOptions.shouldRefreshBalances {
// TODO; no need for this if refresh already in flight
_ = i.refreshBalances(ctx)
}

// get db first
// Add other committed, but incomplete statuses here
// TODO: clean me up: you can do this by having a IsLiquidityCommitted() method on the type.
inFlightQuotes, err := i.db.GetQuoteResultsByStatus(ctx, reldb.CommittedPending, reldb.CommittedConfirmed, reldb.RelayStarted)
inFlightQuotes, err := i.inFlightQuoteManager.GetInFlightQuotes(ctx, reqOptions.skipDBCache)
if err != nil {
return nil, fmt.Errorf("could not get in flight quotes: %w", err)
}
Expand Down Expand Up @@ -195,14 +198,15 @@ func NewInventoryManager(ctx context.Context, clientFetcher submitter.ClientFetc
}

i := inventoryManagerImpl{
relayerAddress: relayer,
handler: handler,
cfg: cfg,
chainClient: clientFetcher,
txSubmitter: txSubmitter,
rebalanceManagers: rebalanceManagers,
db: db,
meter: handler.Meter(meterName),
relayerAddress: relayer,
handler: handler,
cfg: cfg,
chainClient: clientFetcher,
txSubmitter: txSubmitter,
rebalanceManagers: rebalanceManagers,
db: db,
meter: handler.Meter(meterName),
inFlightQuoteManager: newInflightManager(db),
}

i.balanceGauge, err = i.meter.Float64ObservableGauge("inventory_balance")
Expand Down Expand Up @@ -619,8 +623,6 @@ var logger = log.Logger("inventory")

// refreshBalances refreshes all the token balances.
func (i *inventoryManagerImpl) refreshBalances(ctx context.Context) error {
i.mux.Lock()
defer i.mux.Unlock()
var wg sync.WaitGroup
wg.Add(len(i.tokens))

Expand Down
16 changes: 12 additions & 4 deletions services/rfq/relayer/inventory/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,25 @@ package inventory

// balanceFetchOptions is is an underlying struct used for option fetching.
type balanceFetchOptions struct {
skipCache bool
shouldRefreshBalances bool
skipDBCache bool
}

// BalanceFetchArgOption is an option that can be passed into a balance fetch request.
// we do this to allow optional args.
type BalanceFetchArgOption func(options *balanceFetchOptions)

// SkipCache allows someone fetching balance(s) to skip the cache.
func SkipCache() BalanceFetchArgOption {
// ShouldRefreshBalances allows someone fetching balance(s) to skip the cache.
func ShouldRefreshBalances() BalanceFetchArgOption {
return func(options *balanceFetchOptions) {
options.skipCache = true
options.shouldRefreshBalances = true
}
}

// SkipDBCache allows someone fetching balance(s) to skip the cache.
func SkipDBCache() BalanceFetchArgOption {
return func(options *balanceFetchOptions) {
options.skipDBCache = true
}
}

Expand Down
2 changes: 1 addition & 1 deletion services/rfq/relayer/quoter/quoter.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ func (m *Manager) SubmitAllQuotes(ctx context.Context) (err error) {
metrics.EndSpanWithErr(span, err)
}()

inv, err := m.inventoryManager.GetCommittableBalances(ctx)
inv, err := m.inventoryManager.GetCommittableBalances(ctx, inventory.SkipDBCache())
if err != nil {
return fmt.Errorf("error getting committable balances: %w", err)
}
Expand Down

0 comments on commit 93aa8e4

Please sign in to comment.