Skip to content

Commit

Permalink
Pre-populate validator cache (#82)
Browse files Browse the repository at this point in the history
* Prepopulate validator cache

* Debug removal
  • Loading branch information
lukanus authored Feb 2, 2023
1 parent e0d75dc commit fe7c23e
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 10 deletions.
44 changes: 35 additions & 9 deletions cmd/dreamboat/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,13 +206,6 @@ var (
config pkg.Config
)

func waitForSignal(cancel context.CancelFunc, osSig chan os.Signal) {
for range osSig {
cancel()
return
}
}

// Main starts the relay
func main() {
app := &cli.App{
Expand Down Expand Up @@ -347,7 +340,7 @@ func run() cli.ActionFunc {

dbURL := c.String("relay-validator-database-url")
// VALIDATOR MANAGEMENT
var valDS validators.ValidatorStore
var valDS ValidatorStore
if dbURL != "" {
valPG, err := trPostgres.Open(dbURL, 10, 10, 10) // TODO(l): make configurable
if err != nil {
Expand All @@ -364,6 +357,9 @@ func run() cli.ActionFunc {
return fmt.Errorf("fail to initialize validator cache: %w", err)
}

// lazyload validators cache, it's optional and we don't care if it errors out
go preloadValidators(c.Context, logger, valDS, validatorCache)

validatorStoreManager := validators.NewStoreManager(config.Log, validatorCache, valDS, int(math.Floor(config.TTL.Seconds()/2)), c.Uint("relay-store-queue-size"))
validatorStoreManager.AttachMetrics(m)
validatorStoreManager.RunStore(c.Uint("relay-workers-store-validator"))
Expand Down Expand Up @@ -418,7 +414,6 @@ func run() cli.ActionFunc {
}
return err
}(m)

// wait for the relay service to be ready
select {
case <-cContext.Done():
Expand Down Expand Up @@ -473,6 +468,37 @@ func run() cli.ActionFunc {
}
}

type ValidatorStore interface {
GetRegistration(context.Context, types.PublicKey) (types.SignedValidatorRegistration, error)
PutNewerRegistration(ctx context.Context, pk types.PublicKey, registration types.SignedValidatorRegistration) error
PopulateAllRegistrations(ctx context.Context, out chan structs.ValidatorCacheEntry) error
}

func waitForSignal(cancel context.CancelFunc, osSig chan os.Signal) {
for range osSig {
cancel()
return
}
}

func asyncPopulateAllRegistrations(ctx context.Context, l log.Logger, vs ValidatorStore, ch chan structs.ValidatorCacheEntry) {
defer close(ch)
err := vs.PopulateAllRegistrations(ctx, ch)
if err != nil {
l.WithError(err).Warn("Cache population error")
}
}

func preloadValidators(ctx context.Context, l log.Logger, vs ValidatorStore, vc *lru.Cache[types.PublicKey, structs.ValidatorCacheEntry]) {
ch := make(chan structs.ValidatorCacheEntry, 100)
go asyncPopulateAllRegistrations(ctx, l, vs, ch)
for v := range ch {
v := v
vc.ContainsOrAdd(v.Entry.Message.Pubkey, v)
}
l.With(log.F{"count": vc.Len()}).Info("Loaded cache validators")
}

func initBeacon(ctx context.Context, config pkg.Config) (pkg.BeaconClient, error) {
clients := make([]pkg.BeaconClient, 0, len(config.BeaconEndpoints))

Expand Down
6 changes: 6 additions & 0 deletions pkg/datastore/validator/badger/badger.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/blocknative/dreamboat/pkg/datastore"
"github.com/blocknative/dreamboat/pkg/structs"
"github.com/flashbots/go-boost-utils/types"
ds "github.com/ipfs/go-datastore"
)
Expand Down Expand Up @@ -78,3 +79,8 @@ func (s *Datastore) GetRegistration(ctx context.Context, pk types.PublicKey) (sv
err = json.Unmarshal(data, &svr)
return svr, err
}

func (s *Datastore) PopulateAllRegistrations(ctx context.Context, out chan structs.ValidatorCacheEntry) error {
// NOOP - to be supported
return nil
}
68 changes: 67 additions & 1 deletion pkg/datastore/validator/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/blocknative/dreamboat/pkg/datastore"
"github.com/blocknative/dreamboat/pkg/structs"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/flashbots/go-boost-utils/types"
)
Expand All @@ -18,6 +19,68 @@ func NewDatastore(db *sql.DB) *Datastore {
return &Datastore{DB: db}
}

func (s *Datastore) PopulateAllRegistrations(ctx context.Context, out chan structs.ValidatorCacheEntry) error {
rows, err := s.DB.QueryContext(ctx, `SELECT signature, updated_at, pubkey, fee_recipient, gas_limit, reg_time FROM validator_registrations`)
if err != nil {
return err
}
defer rows.Close()

var (
signature string
feeRecipient string
pubkey string
registrationTime time.Time
)

for rows.Next() {
vce := structs.ValidatorCacheEntry{
Entry: types.SignedValidatorRegistration{
Message: &types.RegisterValidatorRequestMessage{},
},
}

if err := rows.Scan(&signature, &vce.Time, &pubkey, &feeRecipient, &vce.Entry.Message.GasLimit, &registrationTime); err != nil {
if err == sql.ErrNoRows {
return datastore.ErrNotFound
}
return err
}

vce.Entry.Message.Timestamp = uint64(registrationTime.Unix())

decsig, err := hexutil.Decode(signature)
if err != nil {
return err
}

if err = vce.Entry.Signature.FromSlice(decsig); err != nil {
return err
}

decRecip, err := hexutil.Decode(feeRecipient)
if err != nil {
return err
}

if err = vce.Entry.Message.FeeRecipient.FromSlice(decRecip); err != nil {
return err
}

decPub, err := hexutil.Decode(pubkey)
if err != nil {
return err
}

if err = vce.Entry.Message.Pubkey.FromSlice(decPub); err != nil {
return err
}

out <- vce
}
return nil
}

func (s *Datastore) GetRegistration(ctx context.Context, pk types.PublicKey) (types.SignedValidatorRegistration, error) {
row := s.DB.QueryRowContext(ctx, `SELECT signature, fee_recipient, gas_limit, reg_time FROM validator_registrations WHERE pubkey = $1 LIMIT 1;`, pk.String())
reg := types.SignedValidatorRegistration{
Expand All @@ -31,7 +94,10 @@ func (s *Datastore) GetRegistration(ctx context.Context, pk types.PublicKey) (ty
t time.Time
)

if err := row.Scan(&signature, &feeRecipient, &reg.Message.GasLimit, &t); err == sql.ErrNoRows {
if err := row.Scan(&signature, &feeRecipient, &reg.Message.GasLimit, &t); err != nil {
if err == sql.ErrNoRows {
return reg, datastore.ErrNotFound
}
return reg, datastore.ErrNotFound
}

Expand Down

0 comments on commit fe7c23e

Please sign in to comment.