Skip to content

Commit

Permalink
Registration cache (#63)
Browse files Browse the repository at this point in the history
* Initial

* working version

* change in process manager constructor

* Add time check and jitter for ttl

* Correct unix timestamp

* removed todo
  • Loading branch information
lukanus authored Dec 21, 2022
1 parent f65e55c commit 1bb72a6
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 8 deletions.
14 changes: 12 additions & 2 deletions cmd/dreamboat/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"context"
"fmt"
"math"
"net/http"
"os"
"os/signal"
Expand Down Expand Up @@ -160,6 +161,12 @@ var flags = []cli.Flag{
Value: 1_000,
EnvVars: []string{"RELAY_PAYLOAD_CACHE_SIZE"},
},
&cli.IntFlag{
Name: "relay-registrations-cache-size",
Usage: "relay registrations cache size",
Value: 600_000,
EnvVars: []string{"RELAY_REGISTRATIONS_CACHE_SIZE"},
},
}

var (
Expand Down Expand Up @@ -278,7 +285,7 @@ func run() cli.ActionFunc {
hc := datastore.NewHeaderController(config.RelayHeaderMemorySlotLag, config.RelayHeaderMemorySlotTimeLag)
hc.AttachMetrics(m)

ds, err := datastore.NewDatastore(&datastore.TTLDatastoreBatcher{storage}, storage.DB, hc, c.Int("relay-payload-cache-size")) // TODO: make cache size parameter
ds, err := datastore.NewDatastore(&datastore.TTLDatastoreBatcher{storage}, storage.DB, hc, c.Int("relay-payload-cache-size"))
if err != nil {
return fmt.Errorf("fail to create datastore: %w", err)
}
Expand All @@ -292,7 +299,10 @@ func run() cli.ActionFunc {

go ds.MemoryCleanup(c.Context, config.RelayHeaderMemoryPurgeInterval, config.TTL)

regMgr := relay.NewProcessManager(config.Log, c.Uint("relay-verify-queue-size"), c.Uint("relay-store-queue-size"))
regMgr, err := relay.NewProcessManager(config.Log, int(math.Floor(config.TTL.Seconds()/2)), c.Uint("relay-verify-queue-size"), c.Uint("relay-store-queue-size"), c.Int("relay-registrations-cache-size"))
if err != nil {
return fmt.Errorf("fail to create relay process manager: %w", err)
}
regMgr.AttachMetrics(m)
loadRegistrations(ds, regMgr)

Expand Down
18 changes: 18 additions & 0 deletions pkg/relay/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,14 @@ func (rm *ProcessManager) initMetrics() {
Name: "mapSize",
Help: "Size of internal map",
})

rm.m.StoreSize = prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: "dreamboat",
Subsystem: "relayprocess",
Name: "storeSize",
Help: "Size of stored",
})

}

func (rm *ProcessManager) AttachMetrics(m *metrics.Metrics) {
Expand All @@ -72,6 +80,8 @@ func (rm *ProcessManager) AttachMetrics(m *metrics.Metrics) {

type RelayMetrics struct {
Timing *prometheus.HistogramVec

RegistrationsCacheHits *prometheus.CounterVec
}

func (r *Relay) initMetrics() {
Expand All @@ -81,8 +91,16 @@ func (r *Relay) initMetrics() {
Name: "timing",
Help: "Duration of requests per function",
}, []string{"function", "type"})

r.m.RegistrationsCacheHits = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "dreamboat",
Subsystem: "relayprocess",
Name: "registrationCache",
Help: "cache hit/miss",
}, []string{"result"})
}

func (r *Relay) AttachMetrics(m *metrics.Metrics) {
m.Register(r.m.Timing)
m.Register(r.m.RegistrationsCacheHits)
}
56 changes: 50 additions & 6 deletions pkg/relay/processmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@ package relay
import (
"context"
"fmt"
"math/rand"
"sync"
"sync/atomic"
"time"

"github.com/blocknative/dreamboat/pkg/structs"
"github.com/flashbots/go-boost-utils/bls"
"github.com/flashbots/go-boost-utils/types"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/lthibault/log"
"github.com/prometheus/client_golang/prometheus"
)
Expand All @@ -20,6 +23,9 @@ const (
)

type ProcessManager struct {
RegistrationCache *lru.Cache[types.PublicKey, types.RegisterValidatorRequestMessage]
storeTTLHalftimeSeconds int

LastRegTime map[string]uint64 // [pubkey]timestamp
lrtl sync.RWMutex // LastRegTime RWLock

Expand All @@ -37,19 +43,25 @@ type ProcessManager struct {
m ProcessManagerMetrics
}

func NewProcessManager(l log.Logger, verifySize, storeSize uint) *ProcessManager {
rm := &ProcessManager{
l: l,
LastRegTime: make(map[string]uint64),
func NewProcessManager(l log.Logger, storeTTLHalftimeSeconds int, verifySize, storeSize uint, registrationCacheSize int) (*ProcessManager, error) {
cache, err := lru.New[types.PublicKey, types.RegisterValidatorRequestMessage](registrationCacheSize)
if err != nil {
return nil, err
}

rm := &ProcessManager{
l: l,
storeTTLHalftimeSeconds: storeTTLHalftimeSeconds,
LastRegTime: make(map[string]uint64),
RegistrationCache: cache,
VerifySubmitBlockCh: make(chan VerifyReq, verifySize),
VerifyRegisterValidatorCh: make(chan VerifyReq, verifySize),
VerifyOtherCh: make(chan VerifyReq, verifySize),

StoreCh: make(chan StoreReq, storeSize),
}
rm.initMetrics()
return rm
return rm, nil
}

func (pm *ProcessManager) Close(ctx context.Context) {
Expand Down Expand Up @@ -134,7 +146,6 @@ func (rm *ProcessManager) SendStore(request StoreReq) {
if atomic.LoadInt32(&(rm.isClosed)) == 0 {
rm.StoreCh <- request
}

}

func (rm *ProcessManager) VerifyChan() chan VerifyReq {
Expand All @@ -152,6 +163,19 @@ func (rm *ProcessManager) GetVerifyChan(stack uint) chan VerifyReq {
}
}

func (rm *ProcessManager) Check(rvg *types.RegisterValidatorRequestMessage) bool {
v, ok := rm.RegistrationCache.Get(rvg.Pubkey)
if !ok {
return false
}

if uint64(time.Now().Unix())-v.Timestamp > uint64(rm.storeTTLHalftimeSeconds+rand.Intn(rm.storeTTLHalftimeSeconds)) {
return false
}

return v.FeeRecipient == rvg.FeeRecipient && v.GasLimit == rvg.GasLimit
}

func (rm *ProcessManager) Get(k string) (value uint64, ok bool) {
rm.lrtl.RLock()
defer rm.lrtl.RUnlock()
Expand All @@ -172,6 +196,7 @@ func (pm *ProcessManager) ParallelStore(datas Datastore, ttl time.Duration) {
pm.m.StoreSize.Observe(float64(len(payload.Items)))
if err := pm.storeRegistration(ctx, datas, ttl, payload); err != nil {
pm.l.Errorf("error storing registration - %w ", err)
continue
}
}
}
Expand Down Expand Up @@ -202,6 +227,10 @@ func (pm *ProcessManager) storeRegistration(ctx context.Context, datas Datastore
pm.m.StoreErrorRate.Inc()
return err
}
pm.RegistrationCache.Add(i.Pubkey, types.RegisterValidatorRequestMessage{
FeeRecipient: i.FeeRecipient,
Timestamp: i.Time,
GasLimit: i.GasLimit})
t.ObserveDuration()
}
return nil
Expand Down Expand Up @@ -289,6 +318,21 @@ func (s *StoreResp) Done() chan error {
return s.done
}

func (s *StoreResp) SkipOne() {
s.rLock.Lock()
defer s.rLock.Unlock()

if s.IsClosed() {
return
}

s.numAll -= 1
if s.numAll == len(s.nonErrors) {
s.close()
return
}
}

func (s *StoreResp) IsClosed() bool {
return atomic.LoadInt32(&(s.isClosed)) != 0
}
Expand Down
14 changes: 14 additions & 0 deletions pkg/relay/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ type StoreReqItem struct {
RawPayload json.RawMessage
Time uint64
Pubkey types.PublicKey

// additional params
FeeRecipient types.Address
GasLimit uint64
}

// Resp respone structure
Expand Down Expand Up @@ -80,6 +84,13 @@ SendPayloads:
default:
}

if rs.regMngr.Check(p.Message) {
response.SkipOne()
rs.m.RegistrationsCacheHits.WithLabelValues("hit").Inc()
continue SendPayloads
}
rs.m.RegistrationsCacheHits.WithLabelValues("miss").Inc()

checkTime := time.Now()
o, ok := verifyOther(be, rs.regMngr, i, p)
if !ok {
Expand Down Expand Up @@ -122,6 +133,9 @@ SendPayloads:
Time: p.Message.Timestamp,
Pubkey: p.Message.Pubkey,
RawPayload: p.Raw,

FeeRecipient: p.Message.FeeRecipient,
GasLimit: p.Message.GasLimit,
}
}
rs.regMngr.SendStore(request)
Expand Down
2 changes: 2 additions & 0 deletions pkg/relay/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ type RegistrationManager interface {

SendStore(sReq StoreReq)
Get(k string) (value uint64, ok bool)

Check(*types.RegisterValidatorRequestMessage) bool
}

type RelayConfig struct {
Expand Down

0 comments on commit 1bb72a6

Please sign in to comment.