From fe7c23ed54310e18f37603e88dc8e3163af870ad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Mi=C5=82kowski?= Date: Thu, 2 Feb 2023 18:42:41 +0100 Subject: [PATCH] Pre-populate validator cache (#82) * Prepopulate validator cache * Debug removal --- cmd/dreamboat/main.go | 44 ++++++++++--- pkg/datastore/validator/badger/badger.go | 6 ++ pkg/datastore/validator/postgres/postgres.go | 68 +++++++++++++++++++- 3 files changed, 108 insertions(+), 10 deletions(-) diff --git a/cmd/dreamboat/main.go b/cmd/dreamboat/main.go index 38ca8f3e..254b36b5 100644 --- a/cmd/dreamboat/main.go +++ b/cmd/dreamboat/main.go @@ -206,13 +206,6 @@ var ( config pkg.Config ) -func waitForSignal(cancel context.CancelFunc, osSig chan os.Signal) { - for range osSig { - cancel() - return - } -} - // Main starts the relay func main() { app := &cli.App{ @@ -347,7 +340,7 @@ func run() cli.ActionFunc { dbURL := c.String("relay-validator-database-url") // VALIDATOR MANAGEMENT - var valDS validators.ValidatorStore + var valDS ValidatorStore if dbURL != "" { valPG, err := trPostgres.Open(dbURL, 10, 10, 10) // TODO(l): make configurable if err != nil { @@ -364,6 +357,9 @@ func run() cli.ActionFunc { return fmt.Errorf("fail to initialize validator cache: %w", err) } + // lazyload validators cache, it's optional and we don't care if it errors out + go preloadValidators(c.Context, logger, valDS, validatorCache) + validatorStoreManager := validators.NewStoreManager(config.Log, validatorCache, valDS, int(math.Floor(config.TTL.Seconds()/2)), c.Uint("relay-store-queue-size")) validatorStoreManager.AttachMetrics(m) validatorStoreManager.RunStore(c.Uint("relay-workers-store-validator")) @@ -418,7 +414,6 @@ func run() cli.ActionFunc { } return err }(m) - // wait for the relay service to be ready select { case <-cContext.Done(): @@ -473,6 +468,37 @@ func run() cli.ActionFunc { } } +type ValidatorStore interface { + GetRegistration(context.Context, types.PublicKey) (types.SignedValidatorRegistration, error) + PutNewerRegistration(ctx context.Context, pk types.PublicKey, registration types.SignedValidatorRegistration) error + PopulateAllRegistrations(ctx context.Context, out chan structs.ValidatorCacheEntry) error +} + +func waitForSignal(cancel context.CancelFunc, osSig chan os.Signal) { + for range osSig { + cancel() + return + } +} + +func asyncPopulateAllRegistrations(ctx context.Context, l log.Logger, vs ValidatorStore, ch chan structs.ValidatorCacheEntry) { + defer close(ch) + err := vs.PopulateAllRegistrations(ctx, ch) + if err != nil { + l.WithError(err).Warn("Cache population error") + } +} + +func preloadValidators(ctx context.Context, l log.Logger, vs ValidatorStore, vc *lru.Cache[types.PublicKey, structs.ValidatorCacheEntry]) { + ch := make(chan structs.ValidatorCacheEntry, 100) + go asyncPopulateAllRegistrations(ctx, l, vs, ch) + for v := range ch { + v := v + vc.ContainsOrAdd(v.Entry.Message.Pubkey, v) + } + l.With(log.F{"count": vc.Len()}).Info("Loaded cache validators") +} + func initBeacon(ctx context.Context, config pkg.Config) (pkg.BeaconClient, error) { clients := make([]pkg.BeaconClient, 0, len(config.BeaconEndpoints)) diff --git a/pkg/datastore/validator/badger/badger.go b/pkg/datastore/validator/badger/badger.go index 01ffdfe1..decf1c62 100644 --- a/pkg/datastore/validator/badger/badger.go +++ b/pkg/datastore/validator/badger/badger.go @@ -9,6 +9,7 @@ import ( "time" "github.com/blocknative/dreamboat/pkg/datastore" + "github.com/blocknative/dreamboat/pkg/structs" "github.com/flashbots/go-boost-utils/types" ds "github.com/ipfs/go-datastore" ) @@ -78,3 +79,8 @@ func (s *Datastore) GetRegistration(ctx context.Context, pk types.PublicKey) (sv err = json.Unmarshal(data, &svr) return svr, err } + +func (s *Datastore) PopulateAllRegistrations(ctx context.Context, out chan structs.ValidatorCacheEntry) error { + // NOOP - to be supported + return nil +} diff --git a/pkg/datastore/validator/postgres/postgres.go b/pkg/datastore/validator/postgres/postgres.go index 5604ff64..96f571b1 100644 --- a/pkg/datastore/validator/postgres/postgres.go +++ b/pkg/datastore/validator/postgres/postgres.go @@ -6,6 +6,7 @@ import ( "time" "github.com/blocknative/dreamboat/pkg/datastore" + "github.com/blocknative/dreamboat/pkg/structs" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/flashbots/go-boost-utils/types" ) @@ -18,6 +19,68 @@ func NewDatastore(db *sql.DB) *Datastore { return &Datastore{DB: db} } +func (s *Datastore) PopulateAllRegistrations(ctx context.Context, out chan structs.ValidatorCacheEntry) error { + rows, err := s.DB.QueryContext(ctx, `SELECT signature, updated_at, pubkey, fee_recipient, gas_limit, reg_time FROM validator_registrations`) + if err != nil { + return err + } + defer rows.Close() + + var ( + signature string + feeRecipient string + pubkey string + registrationTime time.Time + ) + + for rows.Next() { + vce := structs.ValidatorCacheEntry{ + Entry: types.SignedValidatorRegistration{ + Message: &types.RegisterValidatorRequestMessage{}, + }, + } + + if err := rows.Scan(&signature, &vce.Time, &pubkey, &feeRecipient, &vce.Entry.Message.GasLimit, ®istrationTime); err != nil { + if err == sql.ErrNoRows { + return datastore.ErrNotFound + } + return err + } + + vce.Entry.Message.Timestamp = uint64(registrationTime.Unix()) + + decsig, err := hexutil.Decode(signature) + if err != nil { + return err + } + + if err = vce.Entry.Signature.FromSlice(decsig); err != nil { + return err + } + + decRecip, err := hexutil.Decode(feeRecipient) + if err != nil { + return err + } + + if err = vce.Entry.Message.FeeRecipient.FromSlice(decRecip); err != nil { + return err + } + + decPub, err := hexutil.Decode(pubkey) + if err != nil { + return err + } + + if err = vce.Entry.Message.Pubkey.FromSlice(decPub); err != nil { + return err + } + + out <- vce + } + return nil +} + func (s *Datastore) GetRegistration(ctx context.Context, pk types.PublicKey) (types.SignedValidatorRegistration, error) { row := s.DB.QueryRowContext(ctx, `SELECT signature, fee_recipient, gas_limit, reg_time FROM validator_registrations WHERE pubkey = $1 LIMIT 1;`, pk.String()) reg := types.SignedValidatorRegistration{ @@ -31,7 +94,10 @@ func (s *Datastore) GetRegistration(ctx context.Context, pk types.PublicKey) (ty t time.Time ) - if err := row.Scan(&signature, &feeRecipient, ®.Message.GasLimit, &t); err == sql.ErrNoRows { + if err := row.Scan(&signature, &feeRecipient, ®.Message.GasLimit, &t); err != nil { + if err == sql.ErrNoRows { + return reg, datastore.ErrNotFound + } return reg, datastore.ErrNotFound }