From 1bb72a6fe9d40bd75eb6108f032edf7b9dbb8e2d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Mi=C5=82kowski?= Date: Wed, 21 Dec 2022 14:30:02 +0100 Subject: [PATCH] Registration cache (#63) * Initial * working version * change in process manager constructor * Add time check and jitter for ttl * Correct unix timestamp * removed todo --- cmd/dreamboat/main.go | 14 ++++++++-- pkg/relay/metrics.go | 18 ++++++++++++ pkg/relay/processmanager.go | 56 +++++++++++++++++++++++++++++++++---- pkg/relay/register.go | 14 ++++++++++ pkg/relay/relay.go | 2 ++ 5 files changed, 96 insertions(+), 8 deletions(-) diff --git a/cmd/dreamboat/main.go b/cmd/dreamboat/main.go index b9e46342..f69c440e 100644 --- a/cmd/dreamboat/main.go +++ b/cmd/dreamboat/main.go @@ -3,6 +3,7 @@ package main import ( "context" "fmt" + "math" "net/http" "os" "os/signal" @@ -160,6 +161,12 @@ var flags = []cli.Flag{ Value: 1_000, EnvVars: []string{"RELAY_PAYLOAD_CACHE_SIZE"}, }, + &cli.IntFlag{ + Name: "relay-registrations-cache-size", + Usage: "relay registrations cache size", + Value: 600_000, + EnvVars: []string{"RELAY_REGISTRATIONS_CACHE_SIZE"}, + }, } var ( @@ -278,7 +285,7 @@ func run() cli.ActionFunc { hc := datastore.NewHeaderController(config.RelayHeaderMemorySlotLag, config.RelayHeaderMemorySlotTimeLag) hc.AttachMetrics(m) - ds, err := datastore.NewDatastore(&datastore.TTLDatastoreBatcher{storage}, storage.DB, hc, c.Int("relay-payload-cache-size")) // TODO: make cache size parameter + ds, err := datastore.NewDatastore(&datastore.TTLDatastoreBatcher{storage}, storage.DB, hc, c.Int("relay-payload-cache-size")) if err != nil { return fmt.Errorf("fail to create datastore: %w", err) } @@ -292,7 +299,10 @@ func run() cli.ActionFunc { go ds.MemoryCleanup(c.Context, config.RelayHeaderMemoryPurgeInterval, config.TTL) - regMgr := relay.NewProcessManager(config.Log, c.Uint("relay-verify-queue-size"), c.Uint("relay-store-queue-size")) + regMgr, err := relay.NewProcessManager(config.Log, int(math.Floor(config.TTL.Seconds()/2)), c.Uint("relay-verify-queue-size"), c.Uint("relay-store-queue-size"), c.Int("relay-registrations-cache-size")) + if err != nil { + return fmt.Errorf("fail to create relay process manager: %w", err) + } regMgr.AttachMetrics(m) loadRegistrations(ds, regMgr) diff --git a/pkg/relay/metrics.go b/pkg/relay/metrics.go index b6393f25..0a04e01d 100644 --- a/pkg/relay/metrics.go +++ b/pkg/relay/metrics.go @@ -59,6 +59,14 @@ func (rm *ProcessManager) initMetrics() { Name: "mapSize", Help: "Size of internal map", }) + + rm.m.StoreSize = prometheus.NewHistogram(prometheus.HistogramOpts{ + Namespace: "dreamboat", + Subsystem: "relayprocess", + Name: "storeSize", + Help: "Size of stored", + }) + } func (rm *ProcessManager) AttachMetrics(m *metrics.Metrics) { @@ -72,6 +80,8 @@ func (rm *ProcessManager) AttachMetrics(m *metrics.Metrics) { type RelayMetrics struct { Timing *prometheus.HistogramVec + + RegistrationsCacheHits *prometheus.CounterVec } func (r *Relay) initMetrics() { @@ -81,8 +91,16 @@ func (r *Relay) initMetrics() { Name: "timing", Help: "Duration of requests per function", }, []string{"function", "type"}) + + r.m.RegistrationsCacheHits = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "dreamboat", + Subsystem: "relayprocess", + Name: "registrationCache", + Help: "cache hit/miss", + }, []string{"result"}) } func (r *Relay) AttachMetrics(m *metrics.Metrics) { m.Register(r.m.Timing) + m.Register(r.m.RegistrationsCacheHits) } diff --git a/pkg/relay/processmanager.go b/pkg/relay/processmanager.go index 1c0f0b6d..02369430 100644 --- a/pkg/relay/processmanager.go +++ b/pkg/relay/processmanager.go @@ -3,12 +3,15 @@ package relay import ( "context" "fmt" + "math/rand" "sync" "sync/atomic" "time" "github.com/blocknative/dreamboat/pkg/structs" "github.com/flashbots/go-boost-utils/bls" + "github.com/flashbots/go-boost-utils/types" + lru "github.com/hashicorp/golang-lru/v2" "github.com/lthibault/log" "github.com/prometheus/client_golang/prometheus" ) @@ -20,6 +23,9 @@ const ( ) type ProcessManager struct { + RegistrationCache *lru.Cache[types.PublicKey, types.RegisterValidatorRequestMessage] + storeTTLHalftimeSeconds int + LastRegTime map[string]uint64 // [pubkey]timestamp lrtl sync.RWMutex // LastRegTime RWLock @@ -37,11 +43,17 @@ type ProcessManager struct { m ProcessManagerMetrics } -func NewProcessManager(l log.Logger, verifySize, storeSize uint) *ProcessManager { - rm := &ProcessManager{ - l: l, - LastRegTime: make(map[string]uint64), +func NewProcessManager(l log.Logger, storeTTLHalftimeSeconds int, verifySize, storeSize uint, registrationCacheSize int) (*ProcessManager, error) { + cache, err := lru.New[types.PublicKey, types.RegisterValidatorRequestMessage](registrationCacheSize) + if err != nil { + return nil, err + } + rm := &ProcessManager{ + l: l, + storeTTLHalftimeSeconds: storeTTLHalftimeSeconds, + LastRegTime: make(map[string]uint64), + RegistrationCache: cache, VerifySubmitBlockCh: make(chan VerifyReq, verifySize), VerifyRegisterValidatorCh: make(chan VerifyReq, verifySize), VerifyOtherCh: make(chan VerifyReq, verifySize), @@ -49,7 +61,7 @@ func NewProcessManager(l log.Logger, verifySize, storeSize uint) *ProcessManager StoreCh: make(chan StoreReq, storeSize), } rm.initMetrics() - return rm + return rm, nil } func (pm *ProcessManager) Close(ctx context.Context) { @@ -134,7 +146,6 @@ func (rm *ProcessManager) SendStore(request StoreReq) { if atomic.LoadInt32(&(rm.isClosed)) == 0 { rm.StoreCh <- request } - } func (rm *ProcessManager) VerifyChan() chan VerifyReq { @@ -152,6 +163,19 @@ func (rm *ProcessManager) GetVerifyChan(stack uint) chan VerifyReq { } } +func (rm *ProcessManager) Check(rvg *types.RegisterValidatorRequestMessage) bool { + v, ok := rm.RegistrationCache.Get(rvg.Pubkey) + if !ok { + return false + } + + if uint64(time.Now().Unix())-v.Timestamp > uint64(rm.storeTTLHalftimeSeconds+rand.Intn(rm.storeTTLHalftimeSeconds)) { + return false + } + + return v.FeeRecipient == rvg.FeeRecipient && v.GasLimit == rvg.GasLimit +} + func (rm *ProcessManager) Get(k string) (value uint64, ok bool) { rm.lrtl.RLock() defer rm.lrtl.RUnlock() @@ -172,6 +196,7 @@ func (pm *ProcessManager) ParallelStore(datas Datastore, ttl time.Duration) { pm.m.StoreSize.Observe(float64(len(payload.Items))) if err := pm.storeRegistration(ctx, datas, ttl, payload); err != nil { pm.l.Errorf("error storing registration - %w ", err) + continue } } } @@ -202,6 +227,10 @@ func (pm *ProcessManager) storeRegistration(ctx context.Context, datas Datastore pm.m.StoreErrorRate.Inc() return err } + pm.RegistrationCache.Add(i.Pubkey, types.RegisterValidatorRequestMessage{ + FeeRecipient: i.FeeRecipient, + Timestamp: i.Time, + GasLimit: i.GasLimit}) t.ObserveDuration() } return nil @@ -289,6 +318,21 @@ func (s *StoreResp) Done() chan error { return s.done } +func (s *StoreResp) SkipOne() { + s.rLock.Lock() + defer s.rLock.Unlock() + + if s.IsClosed() { + return + } + + s.numAll -= 1 + if s.numAll == len(s.nonErrors) { + s.close() + return + } +} + func (s *StoreResp) IsClosed() bool { return atomic.LoadInt32(&(s.isClosed)) != 0 } diff --git a/pkg/relay/register.go b/pkg/relay/register.go index 92d40884..559557ac 100644 --- a/pkg/relay/register.go +++ b/pkg/relay/register.go @@ -42,6 +42,10 @@ type StoreReqItem struct { RawPayload json.RawMessage Time uint64 Pubkey types.PublicKey + + // additional params + FeeRecipient types.Address + GasLimit uint64 } // Resp respone structure @@ -80,6 +84,13 @@ SendPayloads: default: } + if rs.regMngr.Check(p.Message) { + response.SkipOne() + rs.m.RegistrationsCacheHits.WithLabelValues("hit").Inc() + continue SendPayloads + } + rs.m.RegistrationsCacheHits.WithLabelValues("miss").Inc() + checkTime := time.Now() o, ok := verifyOther(be, rs.regMngr, i, p) if !ok { @@ -122,6 +133,9 @@ SendPayloads: Time: p.Message.Timestamp, Pubkey: p.Message.Pubkey, RawPayload: p.Raw, + + FeeRecipient: p.Message.FeeRecipient, + GasLimit: p.Message.GasLimit, } } rs.regMngr.SendStore(request) diff --git a/pkg/relay/relay.go b/pkg/relay/relay.go index 4a0b7c37..b2be6e75 100644 --- a/pkg/relay/relay.go +++ b/pkg/relay/relay.go @@ -57,6 +57,8 @@ type RegistrationManager interface { SendStore(sReq StoreReq) Get(k string) (value uint64, ok bool) + + Check(*types.RegisterValidatorRequestMessage) bool } type RelayConfig struct {