Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add inflight quote cache #2963

Merged
merged 5 commits into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 75 additions & 0 deletions services/rfq/relayer/inventory/inflight.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package inventory

import (
"context"
"fmt"
"github.com/synapsecns/sanguine/services/rfq/relayer/reldb"
"sync"
"time"
)

// inFlightManager stores in-flight quotes and allows retrieval via the db.
// it is thread-safe.
type inFlightManager struct {
ttl time.Duration
db reldb.Service
mux sync.RWMutex
entry *inFlightQuoteCacheEntry
}

// inFlightQuoteCacheEntry represents an entry in the in-flight quote cache.
type inFlightQuoteCacheEntry struct {
createdAt time.Time
quotes []reldb.QuoteRequest
}

// QuoterOption defines a type for functional options.
type QuoterOption func(*inFlightManager)

// WithTTL sets the TTL for the inFlightManager.
func WithTTL(ttl time.Duration) QuoterOption {
return func(q *inFlightManager) {
q.ttl = ttl
}
}

const defaultTTL = 250 * time.Millisecond

// newInflightManager creates a new inFlightManager with the given options.
func newInflightManager(options ...QuoterOption) *inFlightManager {
// Default TTL to 250ms
quoter := &inFlightManager{
ttl: defaultTTL,
}

// Apply options
for _, opt := range options {
opt(quoter)
}

return quoter
}

func (q *inFlightManager) GetInFlightQuotes(ctx context.Context, skipCache bool) (quotes []reldb.QuoteRequest, err error) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logic: Consider adding logging for cache hits and misses to aid in debugging and performance monitoring.

if skipCache || q.shouldRefresh() {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logic: Ensure that the database query handles large datasets efficiently to avoid performance bottlenecks.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logic: Ensure that the database query handles large datasets efficiently to avoid performance bottlenecks.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logic: Ensure that the database query handles large datasets efficiently to avoid performance bottlenecks.

inFlightQuotes, err := q.db.GetQuoteResultsByStatus(ctx, reldb.CommittedPending, reldb.CommittedConfirmed, reldb.RelayStarted)
if err != nil {
return nil, fmt.Errorf("could not get in flight quotes: %w", err)
}
q.mux.Lock()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: Consider moving the defer statement to immediately after acquiring the lock to ensure it is always released.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: Consider moving the defer statement to immediately after acquiring the lock to ensure it is always released.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: Consider moving the defer statement to immediately after acquiring the lock to ensure it is always released.

q.entry = &inFlightQuoteCacheEntry{
createdAt: time.Now(),
quotes: inFlightQuotes,
}
q.mux.Unlock()
}

return q.entry.quotes, nil
}

func (q *inFlightManager) shouldRefresh() bool {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: Potential race condition if shouldRefresh is called concurrently with GetInFlightQuotes. Ensure proper locking.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: Potential race condition if shouldRefresh is called concurrently with GetInFlightQuotes. Ensure proper locking.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: Potential race condition if shouldRefresh is called concurrently with GetInFlightQuotes. Ensure proper locking.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: Potential race condition if shouldRefresh is called concurrently with GetInFlightQuotes. Ensure proper locking.

q.mux.RLock()
defer q.mux.RUnlock()

return q.entry == nil || time.Since(q.entry.createdAt) > q.ttl
}
30 changes: 17 additions & 13 deletions services/rfq/relayer/inventory/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ type inventoryManagerImpl struct {
meter metric.Meter
// balanceGauge is the histogram for balance
balanceGauge metric.Float64ObservableGauge
// inFlightQuoteManager is the cache for in flight quotes
inFlightQuoteManager *inFlightManager
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: Ensure cache invalidation logic is robust to prevent stale data issues.

}

// ErrUnsupportedChain is the error for an unsupported chain.
Expand Down Expand Up @@ -106,14 +108,15 @@ func (i *inventoryManagerImpl) GetCommittableBalance(ctx context.Context, chainI
func (i *inventoryManagerImpl) GetCommittableBalances(ctx context.Context, options ...BalanceFetchArgOption) (res map[int]map[common.Address]*big.Int, err error) {
reqOptions := makeOptions(options)
// TODO: hard fail if cache skip breaks
if reqOptions.skipCache {
if reqOptions.shouldRefreshBalances {
// TODO; no need for this if refresh already in flight
_ = i.refreshBalances(ctx)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logic: Avoid redundant balance refreshes if a refresh is already in progress.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logic: Avoid redundant balance refreshes if a refresh is already in progress.

}

// get db first
// Add other committed, but incomplete statuses here
// TODO: clean me up: you can do this by having a IsLiquidityCommitted() method on the type.
inFlightQuotes, err := i.db.GetQuoteResultsByStatus(ctx, reldb.CommittedPending, reldb.CommittedConfirmed, reldb.RelayStarted)
inFlightQuotes, err := i.inFlightQuoteManager.GetInFlightQuotes(ctx, reqOptions.skipDBCache)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: Consider adding error handling for cache retrieval failures.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: Ensure inFlightQuoteManager.GetInFlightQuotes handles cache retrieval failures gracefully.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: Ensure inFlightQuoteManager.GetInFlightQuotes handles cache retrieval failures gracefully.

if err != nil {
return nil, fmt.Errorf("could not get in flight quotes: %w", err)
}
Expand Down Expand Up @@ -195,14 +198,15 @@ func NewInventoryManager(ctx context.Context, clientFetcher submitter.ClientFetc
}

i := inventoryManagerImpl{
relayerAddress: relayer,
handler: handler,
cfg: cfg,
chainClient: clientFetcher,
txSubmitter: txSubmitter,
rebalanceManagers: rebalanceManagers,
db: db,
meter: handler.Meter(meterName),
relayerAddress: relayer,
handler: handler,
cfg: cfg,
chainClient: clientFetcher,
txSubmitter: txSubmitter,
rebalanceManagers: rebalanceManagers,
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: Consider adding error handling for the new inFlightManager initialization.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: Consider adding error handling for the new inFlightManager initialization.

db: db,
meter: handler.Meter(meterName),
inFlightQuoteManager: newInflightManager(),
}

i.balanceGauge, err = i.meter.Float64ObservableGauge("inventory_balance")
Expand Down Expand Up @@ -619,11 +623,11 @@ var logger = log.Logger("inventory")

// refreshBalances refreshes all the token balances.
func (i *inventoryManagerImpl) refreshBalances(ctx context.Context) error {
i.mux.Lock()
defer i.mux.Unlock()
var wg sync.WaitGroup
wg.Add(len(i.tokens))

gasBalances := make(map[int]*big.Int)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logic: Pre-allocate gasBalances map with known chain IDs to improve performance.


// TODO: this can be pre-capped w/ len(cfg.Tokens) for each chain id.
// here we register metrics for exporting through otel. We wait to call these functions until are tokens have been initialized to avoid nil issues.
for cid, tokenMap := range i.tokens {
Expand All @@ -635,7 +639,7 @@ func (i *inventoryManagerImpl) refreshBalances(ctx context.Context) error {

// queue gas token balance fetch
deferredCalls := []w3types.Caller{
eth.Balance(i.relayerAddress, nil).Returns(i.gasBalances[chainID]),
eth.Balance(i.relayerAddress, nil).Returns(gasBalances[chainID]),
}

// queue token balance fetches
Expand Down
16 changes: 12 additions & 4 deletions services/rfq/relayer/inventory/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,25 @@ package inventory

// balanceFetchOptions is is an underlying struct used for option fetching.
type balanceFetchOptions struct {
skipCache bool
shouldRefreshBalances bool
skipDBCache bool
}

// BalanceFetchArgOption is an option that can be passed into a balance fetch request.
// we do this to allow optional args.
type BalanceFetchArgOption func(options *balanceFetchOptions)

// SkipCache allows someone fetching balance(s) to skip the cache.
func SkipCache() BalanceFetchArgOption {
// ShouldRefreshBalances allows someone fetching balance(s) to skip the cache.
func ShouldRefreshBalances() BalanceFetchArgOption {
return func(options *balanceFetchOptions) {
options.skipCache = true
options.shouldRefreshBalances = true
}
Comment on lines +13 to +17
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: Ensure that forcing a balance refresh does not lead to race conditions or inconsistent state.

}

// SkipDBCache allows someone fetching balance(s) to skip the cache.
func SkipDBCache() BalanceFetchArgOption {
return func(options *balanceFetchOptions) {
options.skipDBCache = true
Comment on lines +21 to +23
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logic: Skipping the DB cache might lead to increased load on the database; monitor performance.

}
}

Expand Down
2 changes: 1 addition & 1 deletion services/rfq/relayer/quoter/quoter.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ func (m *Manager) SubmitAllQuotes(ctx context.Context) (err error) {
metrics.EndSpanWithErr(span, err)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: Ensure inventory.SkipDBCache() does not introduce inconsistencies in balance retrieval.

}()

inv, err := m.inventoryManager.GetCommittableBalances(ctx)
inv, err := m.inventoryManager.GetCommittableBalances(ctx, inventory.SkipDBCache())
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logic: Consider the impact of skipping the DB cache on performance and data consistency.

if err != nil {
return fmt.Errorf("error getting committable balances: %w", err)
}
Expand Down
Loading