Skip to content

Commit

Permalink
Database Registrations (#81)
Browse files Browse the repository at this point in the history
* Initial commit

* Change submit

* Next batch

* remove duplicated line

* Regenerate mocks

* Fix relay's tests

* Submit data cache add

* add postgress + fix issues

* change cache struct

* Fix service tests

* Cleanup

* cleanup

* next cleanup

* Extend proposer duties warn.

* Add registration cache flag
  • Loading branch information
lukanus authored Feb 1, 2023
1 parent a2b7465 commit e0d75dc
Show file tree
Hide file tree
Showing 33 changed files with 3,342 additions and 1,365 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,5 @@ cmd/scratch/*
# profiling
*.prof
*.svg

.data
9 changes: 4 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
build:
go build ./cmd/dreamboat

build-cli:
go build ./cmd/test-cli
build-migration:
go build -o migration-postgres ./cmd/migration/postgres

# Mock testing
mocks: clean-mocks
# This roundabout call to 'go generate' allows us to:
# - use modules
# - prevent grep missing (totally fine) from causing nonzero exit
# - mirror the pkg/ structure under internal/test/mock
# - prevent grep missing (totally fine) from causing nonzero exit
@find . -name '*.go' | xargs -I{} grep -l '//go:generate' {} | xargs -I{} -P 10 go generate {}

clean-mocks:
@find . -name 'mock_*.go' | xargs -I{} rm {}
@find . -name 'mocks.go' | xargs -I{} rm {}
95 changes: 56 additions & 39 deletions cmd/dreamboat/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,18 @@ import (
"github.com/blocknative/dreamboat/pkg/auction"
"github.com/blocknative/dreamboat/pkg/datastore"
relay "github.com/blocknative/dreamboat/pkg/relay"
"github.com/blocknative/dreamboat/pkg/structs"
"github.com/blocknative/dreamboat/pkg/validators"
"github.com/blocknative/dreamboat/pkg/verify"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/flashbots/go-boost-utils/types"

trPostgres "github.com/blocknative/dreamboat/pkg/datastore/transport/postgres"
valPostgres "github.com/blocknative/dreamboat/pkg/datastore/validator/postgres"

valBadger "github.com/blocknative/dreamboat/pkg/datastore/validator/badger"

lru "github.com/hashicorp/golang-lru/v2"
badger "github.com/ipfs/go-ds-badger2"
"github.com/lthibault/log"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -168,12 +176,24 @@ var flags = []cli.Flag{
Value: 600_000,
EnvVars: []string{"RELAY_REGISTRATIONS_CACHE_SIZE"},
},
&cli.DurationFlag{
Name: "relay-registrations-cache-ttl",
Usage: "registrations cache ttl",
Value: time.Hour,
EnvVars: []string{"RELAY_REGISTRATIONS_CACHE_TTL"},
},
&cli.BoolFlag{
Name: "relay-publish-block",
Usage: "flag for publishing payloads to beacon nodes after a delivery",
Value: false,
EnvVars: []string{"RELAY_PUBLISH_BLOCK"},
},
&cli.StringFlag{
Name: "relay-validator-database-url",
Usage: "address of postgress database for validator registrations, if empty - default, badger will be used",
Value: "",
EnvVars: []string{"RELAY_VALIDATOR_DATABASE_URL"},
},
&cli.BoolFlag{
Name: "relay-fast-boot",
Usage: "speed up booting up of relay, adding temporary inconsistency on the builder_blocks_received endpoint",
Expand Down Expand Up @@ -284,7 +304,7 @@ func run() cli.ActionFunc {
}).Info("data store initialized")

timeRelayStart := time.Now()
as := &pkg.AtomicState{}
state := &pkg.AtomicState{}

hc := datastore.NewHeaderController(config.RelayHeaderMemorySlotLag, config.RelayHeaderMemorySlotTimeLag)
hc.AttachMetrics(m)
Expand Down Expand Up @@ -322,39 +342,50 @@ func run() cli.ActionFunc {
}
beacon.AttachMetrics(m)

v := verify.NewVerificationManager(config.Log, c.Uint("relay-verify-queue-size"))
verificator := verify.NewVerificationManager(config.Log, c.Uint("relay-verify-queue-size"))
verificator.RunVerify(c.Uint("relay-workers-verify"))

dbURL := c.String("relay-validator-database-url")
// VALIDATOR MANAGEMENT
var valDS validators.ValidatorStore
if dbURL != "" {
valPG, err := trPostgres.Open(dbURL, 10, 10, 10) // TODO(l): make configurable
if err != nil {
return fmt.Errorf("failed to connect to the database: %w", err)
}
m.RegisterDB(valPG, "registrations")
valDS = valPostgres.NewDatastore(valPG)
} else { // by default use existsing storage
valDS = valBadger.NewDatastore(storage, config.TTL)
}

validatorCache, err := lru.New[types.PublicKey, structs.ValidatorCacheEntry](c.Int("relay-registrations-cache-size"))
if err != nil {
return fmt.Errorf("fail to initialize validator cache: %w", err)
}

validatorStoreManager := validators.NewStoreManager(config.Log, validatorCache, valDS, int(math.Floor(config.TTL.Seconds()/2)), c.Uint("relay-store-queue-size"))
validatorStoreManager.AttachMetrics(m)
validatorStoreManager.RunStore(c.Uint("relay-workers-store-validator"))

validatorRelay := validators.NewRegister(config.Log, domainBuilder, state, verificator, validatorStoreManager)
validatorRelay.AttachMetrics(m)
service := pkg.NewService(config.Log, config, state)

auctioneer := auction.NewAuctioneer()
r := relay.NewRelay(config.Log, relay.RelayConfig{
BuilderSigningDomain: domainBuilder,
ProposerSigningDomain: domainBeaconProposer,
PubKey: config.PubKey,
SecretKey: config.SecretKey,
RegistrationCacheTTL: c.Duration("relay-registrations-cache-ttl"),
TTL: config.TTL,
PublishBlock: c.Bool("relay-publish-block"),
}, beacon, v, as, ds, auctioneer)
}, beacon, validatorCache, valDS, verificator, state, ds, auctioneer)
r.AttachMetrics(m)

service := pkg.NewService(config.Log, config, ds, as)

regStr, err := validators.NewStoreManager(config.Log, int(math.Floor(config.TTL.Seconds()/2)), c.Uint("relay-store-queue-size"), c.Int("relay-registrations-cache-size"))
if err != nil {
return fmt.Errorf("fail to initialize store manager: %w", err)
}
regStr.AttachMetrics(m)
if !c.Bool("relay-fast-boot") {
loadRegistrations(ds, regStr, logger)
}
go regStr.RunCleanup(uint64(config.TTL), time.Hour)

regM := validators.NewRegister(config.Log, domainBuilder, as, v, regStr, ds)
regM.AttachMetrics(m)

a := api.NewApi(config.Log, r, regM)
a := api.NewApi(config.Log, r, validatorRelay)
a.AttachMetrics(m)

regStr.RunStore(ds, config.TTL, c.Uint("relay-workers-store-validator"))
v.RunVerify(c.Uint("relay-workers-verify"))

logger.With(log.F{
"service": "relay",
"startTimeMs": time.Since(timeRelayStart).Milliseconds(),
Expand All @@ -363,7 +394,7 @@ func run() cli.ActionFunc {
cContext, cancel := context.WithCancel(c.Context)
go func(s *pkg.Service) error {
config.Log.Info("initialized beacon")
err := s.RunBeacon(cContext, beacon)
err := s.RunBeacon(cContext, beacon, validatorStoreManager, validatorCache)
if err != nil {
cancel()
}
Expand Down Expand Up @@ -430,7 +461,7 @@ func run() cli.ActionFunc {
ctx, closeC = context.WithTimeout(context.Background(), shutdownTimeout/2)
defer closeC()
finish := make(chan struct{})
go closemanager(ctx, finish, regStr)
go closemanager(ctx, finish, validatorStoreManager)

select {
case <-finish:
Expand Down Expand Up @@ -460,20 +491,6 @@ func closemanager(ctx context.Context, finish chan struct{}, regMgr *validators.
finish <- struct{}{}
}

func loadRegistrations(ds *datastore.Datastore, regMgr *validators.StoreManager, logger log.Logger) {
reg, err := ds.GetAllRegistration()
if err == nil {
for k, v := range reg {
regMgr.Set(k, v.Message.Timestamp)
}

logger.With(log.F{
"service": "registration",
"count-elements": len(reg),
}).Info("registrations loaded")
}
}

func logger(c *cli.Context) log.Logger {
return log.New(
withLevel(c),
Expand Down
92 changes: 92 additions & 0 deletions cmd/migration/postgres/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package main

import (
"database/sql"
"embed"
"errors"
"flag"
"log"
"os"

migrate "github.com/golang-migrate/migrate/v4"
_ "github.com/golang-migrate/migrate/v4/database/postgres"

"github.com/golang-migrate/migrate/v4/source/iofs"
)

//go:embed migrations/*
var content embed.FS

type flags struct {
databaseURL string
migrationType string
version uint
verbose bool
}

var cf = flags{}

func init() {
flag.StringVar(&cf.databaseURL, "db", "", "Database URL")
flag.StringVar(&cf.migrationType, "migrations", "", `Type of migration (available: "validators")`)
flag.BoolVar(&cf.verbose, "verbose", true, "Verbosity of logs during run")
flag.UintVar(&cf.version, "version", 0, "Version parameter sets db changes to specified revision (up or down)")
flag.Parse()
}

func main() {
log.SetOutput(os.Stdout)
// Initialize configuration
if cf.databaseURL == "" {
log.Fatal(errors.New("database url is not set"))
}

if err := migrateDB(cf.databaseURL); err != nil {
log.Fatal(err)
}
}

func migrateDB(dburl string) error {
db, err := sql.Open("postgres", dburl)
if err != nil {
log.Fatal(err)
}
defer db.Close()

if cf.migrationType != "validators" { // it will have more
log.Fatal("migration type (`migrations`) is not properly set")
}

d, err := iofs.New(content, "migrations/"+cf.migrationType)
if err != nil {
log.Fatal(err)
}

m, err := migrate.NewWithSourceInstance("iofs", d, dburl)
if err != nil {
return err
}
defer m.Close()

if cf.version > 0 {
if cf.verbose {
log.Println("Migrating to version: ", cf.version)
}

if err := m.Migrate(cf.version); err != nil {
return err
}
} else {
err = m.Up()
}

if err != nil {
if err != migrate.ErrNoChange {
return err
}
if cf.verbose {
log.Println("No change")
}
}
return nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
DROP TABLE IF EXISTS validator_registrations;

Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
CREATE TABLE IF NOT EXISTS validator_registrations (
pubkey VARCHAR(98) NOT NULL,
fee_recipient VARCHAR(42) NOT NULL,
gas_limit bigint NOT NULL,
reg_time timestamp,
signature VARCHAR NOT NULL,
updated_at timestamp,
UNIQUE(pubkey)
);
7 changes: 5 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.19
require (
github.com/dgraph-io/badger/v2 v2.2007.3
github.com/ethereum/go-ethereum v1.10.25
github.com/golang-migrate/migrate/v4 v4.15.2
github.com/golang/mock v1.6.0
github.com/google/uuid v1.3.0
github.com/hashicorp/golang-lru/v2 v2.0.1
Expand Down Expand Up @@ -41,13 +42,15 @@ require (
github.com/go-stack/stack v1.8.0 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/ipfs/go-log/v2 v2.5.1 // indirect
github.com/jbenet/goprocess v0.1.4 // indirect
github.com/klauspost/cpuid/v2 v2.1.0 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/lib/pq v1.10.0 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect
github.com/mattn/go-runewidth v0.0.9 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect
github.com/minio/sha256-simd v1.0.0 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/olekukonko/tablewriter v0.0.5 // indirect
Expand Down
Loading

0 comments on commit e0d75dc

Please sign in to comment.