Skip to content

Commit

Permalink
Merge pull request #64 from blocknative/clean-logs
Browse files Browse the repository at this point in the history
Clean logs, add metrics and reorg code
  • Loading branch information
aratz-lasa authored Jan 9, 2023
2 parents f81866c + 199fad8 commit 360b00a
Show file tree
Hide file tree
Showing 44 changed files with 1,957 additions and 2,071 deletions.
28 changes: 28 additions & 0 deletions blstools/blstools.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package blstools

import (
"github.com/flashbots/go-boost-utils/bls"
"github.com/flashbots/go-boost-utils/types"
)

func GenerateNewKeypair() (sk *bls.SecretKey, pubKey types.PublicKey, err error) {

sk, pk, err := bls.GenerateNewKeypair()
if err != nil {
return nil, pubKey, err
}
err = pubKey.FromSlice(pk.Compress()) //nolint

return sk, pubKey, err
}

func SecretKeyFromBytes(skBytes []byte) (sk *bls.SecretKey, pk types.PublicKey, err error) {
sk, err = bls.SecretKeyFromBytes(skBytes[:])
if err != nil {
return nil, types.PublicKey{}, err
}

err = pk.FromSlice(bls.PublicKeyFromSecretKey(sk).Compress()) //nolint

return sk, pk, err
}
128 changes: 61 additions & 67 deletions cmd/dreamboat/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,20 @@ import (

"time"

"github.com/blocknative/dreamboat/blstools"
"github.com/blocknative/dreamboat/metrics"
pkg "github.com/blocknative/dreamboat/pkg"
"github.com/blocknative/dreamboat/pkg/api"
"github.com/blocknative/dreamboat/pkg/auction"
"github.com/blocknative/dreamboat/pkg/datastore"
relay "github.com/blocknative/dreamboat/pkg/relay"
"github.com/blocknative/dreamboat/pkg/validators"
"github.com/blocknative/dreamboat/pkg/verify"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/flashbots/go-boost-utils/bls"
"github.com/flashbots/go-boost-utils/types"
badger "github.com/ipfs/go-ds-badger2"
"github.com/lthibault/log"
"github.com/sirupsen/logrus"
blst "github.com/supranational/blst/bindings/go"
"github.com/urfave/cli/v2"
)

Expand Down Expand Up @@ -213,7 +214,11 @@ func main() {

func setup() cli.BeforeFunc {
return func(c *cli.Context) (err error) {
sk, pk, err := setupKeys(c)
skBytes, err := hexutil.Decode(c.String("secretKey"))
if err != nil {
return err
}
sk, pk, err := blstools.SecretKeyFromBytes(skBytes)
if err != nil {
return err
}
Expand Down Expand Up @@ -241,27 +246,14 @@ func setup() cli.BeforeFunc {
}
}

func setupKeys(c *cli.Context) (*blst.SecretKey, types.PublicKey, error) {
skBytes, err := hexutil.Decode(c.String("secretKey"))
if err != nil {
return nil, types.PublicKey{}, err
}
sk, err := bls.SecretKeyFromBytes(skBytes[:])
if err != nil {
return nil, types.PublicKey{}, err
}

var pk types.PublicKey
err = pk.FromSlice(bls.PublicKeyFromSecretKey(sk).Compress())
return sk, pk, err
}

func run() cli.ActionFunc {
return func(c *cli.Context) error {
if err := config.Validate(); err != nil {
return err
}

logger := config.Log

domainBuilder, err := pkg.ComputeDomain(types.DomainTypeAppBuilder, config.GenesisForkVersion, types.Root{}.String())
if err != nil {
return err
Expand All @@ -277,10 +269,10 @@ func run() cli.ActionFunc {

storage, err := badger.NewDatastore(config.Datadir, &badger.DefaultOptions)
if err != nil {
config.Log.WithError(err).Error("failed to initialize datastore")
logger.WithError(err).Error("failed to initialize datastore")
return err
}
config.Log.With(log.F{
logger.With(log.F{
"service": "datastore",
"startTimeMs": time.Since(timeDataStoreStart).Milliseconds(),
}).Info("data store initialized")
Expand All @@ -291,7 +283,7 @@ func run() cli.ActionFunc {
hc := datastore.NewHeaderController(config.RelayHeaderMemorySlotLag, config.RelayHeaderMemorySlotTimeLag)
hc.AttachMetrics(m)

ds, err := datastore.NewDatastore(&datastore.TTLDatastoreBatcher{storage}, storage.DB, hc, c.Int("relay-payload-cache-size"))
ds, err := datastore.NewDatastore(&datastore.TTLDatastoreBatcher{TTLDatastore: storage}, storage.DB, hc, c.Int("relay-payload-cache-size"))
if err != nil {
return fmt.Errorf("fail to create datastore: %w", err)
}
Expand All @@ -305,20 +297,13 @@ func run() cli.ActionFunc {

go ds.MemoryCleanup(c.Context, config.RelayHeaderMemoryPurgeInterval, config.TTL)

regMgr, err := relay.NewProcessManager(config.Log, int(math.Floor(config.TTL.Seconds()/2)), c.Uint("relay-verify-queue-size"), c.Uint("relay-store-queue-size"), c.Int("relay-registrations-cache-size"))
if err != nil {
return fmt.Errorf("fail to create relay process manager: %w", err)
}
regMgr.AttachMetrics(m)
loadRegistrations(ds, regMgr)

go regMgr.RunCleanup(uint64(config.TTL), time.Hour)

beacon, err := initBeacon(c.Context, config)
if err != nil {
return fmt.Errorf("fail to initialize beacon: %w", err)
}
beacon.AttachMetrics(m)

v := verify.NewVerificationManager(config.Log, c.Uint("relay-verify-queue-size"))
auctioneer := auction.NewAuctioneer()
r := relay.NewRelay(config.Log, relay.RelayConfig{
BuilderSigningDomain: domainBuilder,
Expand All @@ -327,19 +312,27 @@ func run() cli.ActionFunc {
SecretKey: config.SecretKey,
TTL: config.TTL,
PublishBlock: c.Bool("relay-publish-block"),
}, beacon, as, ds, regMgr, auctioneer)
}, beacon, v, as, ds, auctioneer)
r.AttachMetrics(m)

service := pkg.NewService(config.Log, config, ds, r, as)
service.AttachMetrics(m)
service := pkg.NewService(config.Log, config, ds, as)

regStr, err := validators.NewStoreManager(config.Log, int(math.Floor(config.TTL.Seconds()/2)), c.Uint("relay-store-queue-size"), c.Int("relay-registrations-cache-size"))
if err != nil {
return fmt.Errorf("fail to initialize store manager: %w", err)
}
regStr.AttachMetrics(m)
loadRegistrations(ds, regStr, logger)
go regStr.RunCleanup(uint64(config.TTL), time.Hour)

api := api.NewApi(config.Log, service)
api.AttachMetrics(m)
regM := validators.NewRegister(config.Log, domainBuilder, as, v, regStr, ds)
a := api.NewApi(config.Log, r, regM)
a.AttachMetrics(m)

regMgr.RunStore(ds, config.TTL, c.Uint("relay-workers-store-validator"))
regMgr.RunVerify(c.Uint("relay-workers-verify"))
regStr.RunStore(ds, config.TTL, c.Uint("relay-workers-store-validator"))
v.RunVerify(c.Uint("relay-workers-verify"))

config.Log.With(log.F{
logger.With(log.F{
"service": "relay",
"startTimeMs": time.Since(timeRelayStart).Milliseconds(),
}).Info("initialized")
Expand All @@ -360,7 +353,7 @@ func run() cli.ActionFunc {
metrics.AttachProfiler(internalMux)

internalMux.Handle("/metrics", m.Handler())
config.Log.Info("internal server listening")
logger.Info("internal server listening")
internalSrv := http.Server{
Addr: c.String("internalAddr"),
Handler: internalMux,
Expand All @@ -379,10 +372,10 @@ func run() cli.ActionFunc {
case <-service.Ready():
}

config.Log.Debug("relay service ready")
logger.Info("relay service ready")

mux := http.NewServeMux()
api.AttachToHandler(mux)
a.AttachToHandler(mux)

var srv http.Server
// run the http server
Expand All @@ -395,66 +388,67 @@ func run() cli.ActionFunc {
Handler: mux,
MaxHeaderBytes: 4096,
}
config.Log.Info("http server listening")
logger.Info("http server listening")
if err = svr.ListenAndServe(); err == http.ErrServerClosed {
err = nil
}
config.Log.Info("http server finished")
logger.Info("http server finished")
return err
}(srv)

<-cContext.Done()

ctx, _ := context.WithTimeout(context.Background(), shutdownTimeout)
log.Info("Shutdown initialized")
ctx, closeC := context.WithTimeout(context.Background(), shutdownTimeout)
defer closeC()
logger.Info("Shutdown initialized")
err = srv.Shutdown(ctx)
log.Info("Shutdown returned ", err)
logger.Info("Shutdown returned ", err)

ctx, _ = context.WithTimeout(context.Background(), shutdownTimeout/2)
ctx, closeC = context.WithTimeout(context.Background(), shutdownTimeout/2)
defer closeC()
finish := make(chan struct{})
go closemanager(ctx, finish, regMgr)
go closemanager(ctx, finish, regStr)

select {
case <-finish:
case <-ctx.Done():
log.Warn("Closing manager deadline exceeded ")
logger.Warn("Closing manager deadline exceeded ")
}
return fmt.Errorf("properly exiting... %w", err) // this surprisingly has to return error

return nil
}
}

func closemanager(ctx context.Context, finish chan struct{}, regMgr *relay.ProcessManager) {
func initBeacon(ctx context.Context, config pkg.Config) (pkg.BeaconClient, error) {
clients := make([]pkg.BeaconClient, 0, len(config.BeaconEndpoints))

for _, endpoint := range config.BeaconEndpoints {
client, err := pkg.NewBeaconClient(endpoint, config)
if err != nil {
return nil, err
}
clients = append(clients, client)
}
return pkg.NewMultiBeaconClient(config.Log, clients), nil
}

func closemanager(ctx context.Context, finish chan struct{}, regMgr *validators.StoreManager) {
regMgr.Close(ctx)
finish <- struct{}{}
}

func loadRegistrations(ds *datastore.Datastore, regMgr *relay.ProcessManager) {
func loadRegistrations(ds *datastore.Datastore, regMgr *validators.StoreManager, logger log.Logger) {
reg, err := ds.GetAllRegistration()
if err == nil {
for k, v := range reg {
regMgr.Set(k, v.Message.Timestamp)
}

config.Log.With(log.F{
logger.With(log.F{
"service": "registration",
"count-elements": len(reg),
}).Info("registrations loaded")
}

}

func initBeacon(ctx context.Context, config pkg.Config) (pkg.BeaconClient, error) {
clients := make([]pkg.BeaconClient, 0, len(config.BeaconEndpoints))

for _, endpoint := range config.BeaconEndpoints {
client, err := pkg.NewBeaconClient(endpoint, config)
if err != nil {
return nil, err
}
clients = append(clients, client)
}
return pkg.NewMultiBeaconClient(config.Log, clients), nil
}

func logger(c *cli.Context) log.Logger {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/blocknative/dreamboat

go 1.18
go 1.19

require (
github.com/dgraph-io/badger/v2 v2.2007.3
Expand Down
Loading

0 comments on commit 360b00a

Please sign in to comment.