diff --git a/.gitignore b/.gitignore index c7dc39ef..8c28f278 100644 --- a/.gitignore +++ b/.gitignore @@ -29,4 +29,6 @@ cmd/scratch/* .data /dreamboat -/migration-postgres \ No newline at end of file +/migration-postgres + +config.ini \ No newline at end of file diff --git a/api/api_test.go b/api/api_test.go index 7447d5a3..3c3f25a6 100644 --- a/api/api_test.go +++ b/api/api_test.go @@ -111,7 +111,7 @@ func TestServerRouting(t *testing.T) { service := mock_relay.NewMockRelay(ctrl) c, _ := lru.New[[48]byte, *rate.Limiter](1000) - server := api.NewApi(logger, &ee, service, nil, nil, api.NewLimitter(1, 1, c, nil), TestDataLimit, false) + server := api.NewApi(logger, &ee, service, nil, nil, api.NewLimitter(1, 1, c), TestDataLimit, false) m := http.NewServeMux() server.AttachToHandler(m) @@ -403,7 +403,7 @@ func BenchmarkAPISequential(b *testing.B) { service := mock_relay.NewMockRelay(ctrl) register := mock_relay.NewMockRegistrations(ctrl) //Log: log.New(log.WithWriter(ioutil.Discard)), - server := api.NewApi(logger, &ee, service, register, nil, api.NewLimitter(1, 1, nil, nil), TestDataLimit, false) + server := api.NewApi(logger, &ee, service, register, nil, api.NewLimitter(1, 1, nil), TestDataLimit, false) m := http.NewServeMux() server.AttachToHandler(m) @@ -434,7 +434,7 @@ func BenchmarkAPIParallel(b *testing.B) { register := mock_relay.NewMockRegistrations(ctrl) //Log: log.New(log.WithWriter(ioutil.Discard)), - server := api.NewApi(logger, &ee, service, register, nil, api.NewLimitter(1, 1, nil, nil), TestDataLimit, false) + server := api.NewApi(logger, &ee, service, register, nil, api.NewLimitter(1, 1, nil), TestDataLimit, false) m := http.NewServeMux() server.AttachToHandler(m) diff --git a/api/limitter.go b/api/limitter.go index 1cbc1dd6..545d3c34 100644 --- a/api/limitter.go +++ b/api/limitter.go @@ -25,12 +25,11 @@ type Limitter struct { LimitterCacheSize int } -func NewLimitter(ratel int, burst int, c Cache, ab map[[48]byte]struct{}) *Limitter { +func NewLimitter(ratel int, burst int, c Cache) *Limitter { return &Limitter{ - AllowedBuilders: ab, - c: c, - RateLimit: rate.Limit(ratel), - Burst: burst, + c: c, + RateLimit: rate.Limit(ratel), + Burst: burst, } } @@ -51,6 +50,11 @@ func (l *Limitter) Allow(ctx context.Context, pubkey [48]byte) error { } return nil +} + +func (l *Limitter) ParseInitialConfig(keys []string) (err error) { + l.AllowedBuilders, err = makeKeyMap(keys) + return err } @@ -68,16 +72,25 @@ func (l *Limitter) OnConfigChange(c structs.OldNew) (err error) { } case "AllowedBuilders": if keys, ok := c.New.([]string); ok { - newKeys := make(map[[48]byte]struct{}) - for _, key := range keys { - var pk types.PublicKey - if err = pk.UnmarshalText([]byte(key)); err != nil { - return fmt.Errorf("ALLOWED BUILDER NOT ADDED - wrong public key: %s - %w", key, err) - } - newKeys[pk] = struct{}{} + ab, err := makeKeyMap(keys) + if err != nil { + return err } - l.AllowedBuilders = newKeys + l.c.Purge() + l.AllowedBuilders = ab } } return nil } + +func makeKeyMap(keys []string) (map[[48]byte]struct{}, error) { + newKeys := make(map[[48]byte]struct{}) + for _, key := range keys { + var pk types.PublicKey + if err := pk.UnmarshalText([]byte(key)); err != nil { + return nil, fmt.Errorf("allowed builder not added - wrong public key: %s - %w", key, err) + } + newKeys[pk] = struct{}{} + } + return newKeys, nil +} diff --git a/beacon/manager.go b/beacon/manager.go index ded154a0..fc8eca1f 100644 --- a/beacon/manager.go +++ b/beacon/manager.go @@ -14,10 +14,6 @@ import ( "github.com/lthibault/log" ) -const ( - Version = "0.3.6" -) - var ( ErrUnkownFork = errors.New("beacon node fork is unknown") ) diff --git a/cmd/dreamboat/config/config.go b/cmd/dreamboat/config/config.go index 36decdf9..726f2b16 100644 --- a/cmd/dreamboat/config/config.go +++ b/cmd/dreamboat/config/config.go @@ -44,9 +44,9 @@ type Config struct { } var DefaultHTTPConfig = &HTTPConfig{ - ReadTimeout: 2 * time.Second, - WriteTimeout: 2 * time.Second, - IdleTimeout: 2 * time.Second, + ReadTimeout: 5 * time.Second, + WriteTimeout: 5 * time.Second, + IdleTimeout: 5 * time.Second, } type HTTPConfig struct { @@ -73,7 +73,7 @@ type SQLConfig struct { } var DefaultBadgerDBConfig = &BadgerDBConfig{ - TTL: 48 * time.Hour, + TTL: 24 * time.Hour, } type BadgerDBConfig struct { @@ -110,8 +110,9 @@ type ApiConfig struct { } var DefaultRelayConfig = &RelayConfig{ - PublishBlock: true, - MaxBlockPublishDelay: 500 * time.Millisecond, + PublishBlock: true, + GetPayloadResponseDelay: 800 * time.Millisecond, + GetPayloadRequestTimeLimit: 4 * time.Second, } type RelayConfig struct { @@ -127,7 +128,10 @@ type RelayConfig struct { PublishBlock bool `config:"publish_block"` // block publish delay - MaxBlockPublishDelay time.Duration `config:"max_block_publish_delay,allow_dynamic"` + GetPayloadResponseDelay time.Duration `config:"get_payload_response_delay,allow_dynamic"` + + // deadline for calling get Payload + GetPayloadRequestTimeLimit time.Duration `config:"get_payload_request_time_limit,allow_dynamic"` // comma separated list of allowed builder pubkeys" AllowedBuilders []string `config:"allowed_builders,allow_dynamic"` // map[[48]byte]struct{} @@ -136,7 +140,7 @@ type RelayConfig struct { var DefaultBeaconConfig = &BeaconConfig{ PayloadAttributesSubscription: true, EventRestart: 5, - EventTimeout: 26 * time.Second, + EventTimeout: 16 * time.Second, QueryTimeout: 20 * time.Second, } @@ -162,12 +166,18 @@ type BeaconConfig struct { QueryTimeout time.Duration `config:"query_timeout"` } -var DefaultBlockSimulation = &BlockSimulationConfig{} +var DefaultBlockSimulation = &BlockSimulationConfig{ + WS: &BlockSimulationWSConfig{ + Retry: true, + }, + RPC: &BlockSimulationRPCConfig{}, + HTTP: &BlockSimulationHTTPConfig{}, +} type BlockSimulationConfig struct { - RPC BlockSimulationRPCConfig `config:"rpc"` - WS BlockSimulationWSConfig `config:"ws"` - HTTP BlockSimulationHTTPConfig `config:"http"` + RPC *BlockSimulationRPCConfig `config:"rpc"` + WS *BlockSimulationWSConfig `config:"ws"` + HTTP *BlockSimulationHTTPConfig `config:"http"` } type BlockSimulationRPCConfig struct { @@ -180,7 +190,7 @@ type BlockSimulationWSConfig struct { // block validation endpoint address (comma separated list) Address []string `config:"address,allow_dynamic"` // retry to other websocket connections on failure" - Retry bool `config:"retry"` + Retry bool `config:"retry,allow_dynamic"` } type BlockSimulationHTTPConfig struct { @@ -196,11 +206,11 @@ type ValidatorsConfig struct { // The size of response queue, should be set to expected number of validators in one request QueueSize uint `config:"queue_size"` // Number of workers storing validators in parallel - StoreWorkersNum uint64 `config:"store_workers"` + StoreWorkersNum uint `config:"store_workers"` // Registrations cache size RegistrationsCacheSize int `config:"registrations_cache_size"` // Registrations cache ttl - RegistrationsCacheTTL time.Duration `config:"registrations_cache_ttl"` + RegistrationsReadCacheTTL time.Duration `config:"registrations_cache_ttl"` // Registrations cache ttl RegistrationsWriteCacheTTL time.Duration `config:"registrations_write_cache_ttl"` } @@ -211,7 +221,7 @@ var DefaultValidatorsConfig = &ValidatorsConfig{ QueueSize: 100_000, StoreWorkersNum: 400, RegistrationsCacheSize: 600_000, - RegistrationsCacheTTL: time.Hour, + RegistrationsReadCacheTTL: time.Hour, RegistrationsWriteCacheTTL: 12 * time.Hour, } @@ -243,25 +253,34 @@ type PayloadConfig struct { // BadgerDB config Badger BadgerDBConfig `config:"badger"` // number of payloads to cache for fast in-memory reads + CacheSize int `config:"cache_size"` // Redis config Redis RedisDBConfig `config:"redis"` + + // TTL of payload data + TTL time.Duration `config:"TTL,allow_dynamic"` } var DefaultPayloadConfig = &PayloadConfig{ Badger: *DefaultBadgerDBConfig, + TTL: 24 * time.Hour, CacheSize: 1_000, + Redis: *DefaultRedisDBConfig, } type RedisDBConfig struct { - Master RedisConfig `config:"master"` - Replica RedisConfig `config:"replica"` + Read *RedisConfig `config:"read"` + Write *RedisConfig `config:"write"` } -type WarehouseConfig struct { - Enabled bool `config:"enabled"` +var DefaultRedisDBConfig = &RedisDBConfig{ + Read: &RedisConfig{}, + Write: &RedisConfig{}, +} +type WarehouseConfig struct { // Data directory where the data is stored in the warehouse Directory string `config:"directory"` @@ -269,11 +288,10 @@ type WarehouseConfig struct { WorkerNumber int `config:"workers"` // Size of the buffer for processing requests - Buffer int `config:"directory"` + Buffer int `config:"buffer"` } var DefaultWarehouseConfig = &WarehouseConfig{ - Enabled: true, Directory: "/data/relay/warehouse", WorkerNumber: 32, Buffer: 1_000, @@ -282,24 +300,25 @@ var DefaultWarehouseConfig = &WarehouseConfig{ type DistributedConfig struct { Redis *RedisStreamConfig `config:"redis"` - Enabled bool `config:"enabled"` InstanceID string `config:"id"` // Number of workers for storing data in warehouse, if 0, then data is not exported WorkerNumber int `config:"workers"` - // publish all submitted blocks into pubsub. If false, only blocks returned in GetHeader are published - PublishOnSubmission bool `config:"publish_on_submission"` - // Stream internal channel size - StreamQueueSize int + StreamQueueSize int `config:"stream_queue_size"` + + // stream entire block for every bid that is served in GetHeader requests. + StreamServedBids bool `config:"stream_served_bids"` } var DefaultDistributedConfig = &DistributedConfig{ - Enabled: true, - WorkerNumber: 100, - PublishOnSubmission: false, - StreamQueueSize: 200, + Redis: &RedisStreamConfig{ + Topic: "relay", + }, + WorkerNumber: 100, + StreamQueueSize: 100, + StreamServedBids: true, } type RedisStreamConfig struct { diff --git a/cmd/dreamboat/config/manager.go b/cmd/dreamboat/config/manager.go index dd20f4ea..f2cd23e9 100644 --- a/cmd/dreamboat/config/manager.go +++ b/cmd/dreamboat/config/manager.go @@ -33,6 +33,8 @@ func DefaultConfig() Config { Validators: DefaultValidatorsConfig, Payload: DefaultPayloadConfig, DataAPI: DefaultDataAPIConfig, + Warehouse: DefaultWarehouseConfig, + Distributed: DefaultDistributedConfig, } c.ExternalHttp.Address = "0.0.0.0:18550" c.InternalHttp.Address = "0.0.0.0:19550" diff --git a/cmd/dreamboat/config/source/file/file.go b/cmd/dreamboat/config/source/file/file.go index 074c62c7..4ab0f8ea 100644 --- a/cmd/dreamboat/config/source/file/file.go +++ b/cmd/dreamboat/config/source/file/file.go @@ -47,6 +47,7 @@ func parseIni(r io.Reader, cfg *config.Config, initial bool) (e error) { var ( currentSection *reflect.Value + currentTag string sRoot config.Propagator ) @@ -66,19 +67,12 @@ func parseIni(r io.Reader, cfg *config.Config, initial bool) (e error) { return errors.New("parse failure") } tag = strings.TrimSpace(tag) - for i := 0; i < t.NumField(); i++ { f := t.Field(i) if name, ok := f.Tag.Lookup("config"); ok && name == tag { a := elem.FieldByName(f.Name) + currentTag = tag currentSection = &a - /* - for j := 0; j < currentSection.NumMethod(); j++ { - m := t.Method(j) - if m.Name == "Propagate" { - sRoot = currentSection.Interface().(config.Propagator) - } - }*/ continue } } @@ -99,11 +93,11 @@ func parseIni(r io.Reader, cfg *config.Config, initial bool) (e error) { if !currentSection.IsNil() { a := currentSection.Elem() - if err := parseParam(&a, sRoot, strings.TrimSpace(key), strings.TrimSpace(value), initial); err != nil { + if err := parseParam(&a, sRoot, currentTag+"."+strings.TrimSpace(key), strings.TrimSpace(key), strings.TrimSpace(value), initial); err != nil { return err } } else { - if err := parseParam(currentSection, sRoot, strings.TrimSpace(key), strings.TrimSpace(value), initial); err != nil { + if err := parseParam(currentSection, sRoot, currentTag+"."+strings.TrimSpace(key), strings.TrimSpace(key), strings.TrimSpace(value), initial); err != nil { return err } } @@ -112,7 +106,7 @@ func parseIni(r io.Reader, cfg *config.Config, initial bool) (e error) { if currentSection.Kind() == reflect.Struct { a := currentSection.Elem() - if err := parseParam(&a, sRoot, strings.TrimSpace(key), strings.TrimSpace(value), initial); err != nil { + if err := parseParam(&a, sRoot, currentTag+"."+strings.TrimSpace(key), strings.TrimSpace(key), strings.TrimSpace(value), initial); err != nil { return err } continue @@ -129,15 +123,10 @@ func parseIni(r io.Reader, cfg *config.Config, initial bool) (e error) { return nil } -func parseParam(currentSection *reflect.Value, subscribtionRoot config.Propagator, key, value string, initial bool) error { +func parseParam(currentSection *reflect.Value, subscribtionRoot config.Propagator, fullPath, key, value string, initial bool) error { key, rest, _ := strings.Cut(key, ".") sRoot := subscribtionRoot - /* - if currentSection.Kind() == reflect.Ptr { - a := currentSection.Elem() - currentSection = &a - }*/ t := currentSection.Type() for j := 0; j < t.NumMethod(); j++ { m := t.Method(j) @@ -169,9 +158,10 @@ func parseParam(currentSection *reflect.Value, subscribtionRoot config.Propagato } if sRoot != nil { sRoot.Propagate(structs.OldNew{ - Name: f.Name, // or maybe `name` - New: el, - Old: b, + Name: f.Name, + ParamPath: fullPath, + New: el, + Old: b, }) } el.Set(reflect.ValueOf(b)) @@ -182,7 +172,7 @@ func parseParam(currentSection *reflect.Value, subscribtionRoot config.Propagato switch f.Type.Kind() { case reflect.Pointer: v := el.Elem() - if err := parseParam(&v, sRoot, rest, value, initial); err != nil { + if err := parseParam(&v, sRoot, fullPath, rest, value, initial); err != nil { return err } @@ -193,28 +183,27 @@ func parseParam(currentSection *reflect.Value, subscribtionRoot config.Propagato } if el.Bool() != b { if !initial { - log.Println("different value, setting bool ", b) if !inArr(params, "allow_dynamic") { return fmt.Errorf("dynamic change of %s parameter is not allowed ", key) } if sRoot != nil { sRoot.Propagate(structs.OldNew{ - Name: f.Name, - New: b, - Old: el.Bool(), + Name: f.Name, + ParamPath: fullPath, + New: b, + Old: el.Bool(), }) } } el.SetBool(b) } - case reflect.Uint: + case reflect.Uint, reflect.Uint64: uintP, err := paramParseUint(v) if err != nil { return err } if el.Uint() != uintP { if !initial { - log.Println("different value, setting ui ", uintP) if !inArr(params, "allow_dynamic") { return fmt.Errorf("dynamic change of %s parameter is not allowed ", key) @@ -222,15 +211,16 @@ func parseParam(currentSection *reflect.Value, subscribtionRoot config.Propagato if sRoot != nil { sRoot.Propagate(structs.OldNew{ - Name: f.Name, - New: uintP, - Old: el.Uint(), + Name: f.Name, + ParamPath: fullPath, + New: uintP, + Old: el.Uint(), }) } } el.SetUint(uintP) } - case reflect.Int: + case reflect.Int, reflect.Int64: intP, err := paramParseInt(v) if err != nil { return err @@ -240,13 +230,12 @@ func parseParam(currentSection *reflect.Value, subscribtionRoot config.Propagato if !inArr(params, "allow_dynamic") { return fmt.Errorf("dynamic change of %s parameter is not allowed ", key) } - - log.Println("different value, setting i ", intP) if sRoot != nil { sRoot.Propagate(structs.OldNew{ - Name: f.Name, - New: intP, - Old: el.Int(), + Name: f.Name, + ParamPath: fullPath, + New: intP, + Old: el.Int(), }) } } @@ -263,12 +252,13 @@ func parseParam(currentSection *reflect.Value, subscribtionRoot config.Propagato return fmt.Errorf("dynamic change of %s parameter is not allowed ", key) } - log.Println("different value, setting s ", s) + // log.Println("different value, setting s ", s) if sRoot != nil { sRoot.Propagate(structs.OldNew{ - Name: f.Name, - New: s, - Old: el.String(), + Name: f.Name, + ParamPath: fullPath, + New: s, + Old: el.String(), }) } } @@ -290,9 +280,10 @@ func parseParam(currentSection *reflect.Value, subscribtionRoot config.Propagato } if sRoot != nil { sRoot.Propagate(structs.OldNew{ - Name: f.Name, - New: s, - Old: el, + Name: f.Name, + ParamPath: fullPath, + New: s, + Old: el, }) } } @@ -302,7 +293,7 @@ func parseParam(currentSection *reflect.Value, subscribtionRoot config.Propagato log.Panic("unsupported slice type: ", f.Type.Elem().Kind()) } case reflect.Struct: - err := parseParam(&el, sRoot, rest, value, initial) + err := parseParam(&el, sRoot, fullPath, rest, value, initial) if err != nil { return err } @@ -313,9 +304,7 @@ func parseParam(currentSection *reflect.Value, subscribtionRoot config.Propagato } else { log.Panic("unsupported type ", currentSection.Kind()) } - } - log.Println("key", key, value, currentSection.Kind(), el) } } return nil diff --git a/cmd/dreamboat/main.go b/cmd/dreamboat/main.go index 6a164369..ed555d62 100644 --- a/cmd/dreamboat/main.go +++ b/cmd/dreamboat/main.go @@ -8,7 +8,6 @@ import ( "net/http" "os" "os/signal" - "strings" "syscall" "time" @@ -24,10 +23,8 @@ import ( "github.com/blocknative/dreamboat/blstools" wh "github.com/blocknative/dreamboat/datastore/warehouse" "github.com/blocknative/dreamboat/metrics" + "github.com/blocknative/dreamboat/sim" "github.com/blocknative/dreamboat/sim/client/fallback" - "github.com/blocknative/dreamboat/sim/client/transport/gethhttp" - "github.com/blocknative/dreamboat/sim/client/transport/gethrpc" - "github.com/blocknative/dreamboat/sim/client/transport/gethws" "github.com/blocknative/dreamboat/stream" "github.com/google/uuid" badger "github.com/ipfs/go-ds-badger2" @@ -72,65 +69,8 @@ var ( datadir string configFile string - flagAddr string - flagInternalAddr string - - flagTimeout time.Duration - flagBeaconList string - flagBeaconPublishList string - - flagNetwork string - flagSecretKey string - flagTTL time.Duration - - flagWorkersVerify uint64 - flagWorkersStoreValidator uint64 - flagVerifyQueueSize uint64 - flagStoreQueueSize uint64 - flagPayloadCacheSize int - - flagRegistrationsCacheSize int - flagRegistrationsCacheReadTTL time.Duration - flagRegistrationsCacheWriteTTL time.Duration - - flagPublishBlock bool - - flagValidatorDatabaseUrl string - flagDataapiDatabaseUrl string - - flagAllowListedBuilderList string - - flagBlockValidationEndpointHTTP string - flagBlockValidationEndpointWSList string - flagBlockValidationWSRetry bool - flagBlockValidationRPC string - - flagDistribution bool - flagDistributionID string - - flagDistributionStreamWorkers uint64 - flagDistributionStreamServedBids bool - - flagDistributionStreamTTL time.Duration - flagDistributionStreamTopic string - flagDistributionStreamQueue int - - flagStorageReadRedisURI string - flagStorageWriteRedisURI string - flagPubsubRedisURI string - - flagGetPayloadResponseDelay time.Duration - flagGetPayloadRequestTimeLimit time.Duration - - flagWarehouse bool - flagWarehouseDir string - flagWarehouseWorkers int - flagWarehouseBuffer int - - flagBeaconEventRestart int - flagBeaconEventTimeout time.Duration - flagBeaconQueryTimeout time.Duration - flagBeaconPayloadAttributesSubscription bool + flagDistribution bool + flagWarehouse bool ) func init() { @@ -139,73 +79,8 @@ func init() { flag.StringVar(&configFile, "config", "", "configuration file needed for relay to run") flag.StringVar(&datadir, "datadir", "/tmp/relay", "data directory where blocks and validators are stored in the default datastore implementation") - flag.StringVar(&flagAddr, "addr", "localhost:18550", "server listen address") - flag.StringVar(&flagInternalAddr, "internalAddr", "0.0.0.0:19550", "internal server listen address") - flag.DurationVar(&flagTimeout, "timeout", time.Second*5, "request timeout") - flag.StringVar(&flagBeaconList, "beacon", "", "`url` for beacon endpoint") - flag.StringVar(&flagBeaconPublishList, "beacon-publish", "", "`url` for beacon endpoints that publish blocks") - - flag.StringVar(&flagNetwork, "network", "mainnet", "the networks the relay works on") - - flag.StringVar(&flagSecretKey, "secretKey", "", "secret key used to sign messages") - flag.DurationVar(&flagTTL, "ttl", 24*time.Hour, "ttl of the data") - - flag.Uint64Var(&flagWorkersVerify, "relay-workers-verify", 2000, "number of workers running verify in parallel") - - flag.Uint64Var(&flagWorkersStoreValidator, "relay-workers-store-validator", 400, "number of workers storing validators in parallel") - - flag.Uint64Var(&flagVerifyQueueSize, "relay-verify-queue-size", 100_000, "size of verify queue") - flag.Uint64Var(&flagStoreQueueSize, "relay-store-queue-size", 100_000, "size of store queue") - - flag.IntVar(&flagPayloadCacheSize, "relay-payload-cache-size", 1_000, "number of payloads to cache for fast in-memory reads") - - flag.IntVar(&flagRegistrationsCacheSize, "relay-registrations-cache-size", 600_000, "relay registrations cache size") - - flag.DurationVar(&flagRegistrationsCacheReadTTL, "relay-registrations-cache-read-ttl", time.Hour, "registrations cache ttl for reading") - flag.DurationVar(&flagRegistrationsCacheWriteTTL, "relay-registrations-cache-write-ttl", 12*time.Hour, "registrations cache ttl for writing") - - flag.BoolVar(&flagPublishBlock, "relay-publish-block", true, "relay registrations cache size") - - flag.StringVar(&flagValidatorDatabaseUrl, "relay-validator-database-url", "", "address of postgress database for validator registrations, if empty - default, badger will be used") - flag.StringVar(&flagDataapiDatabaseUrl, "relay-dataapi-database-url", "", "address of postgress database for dataapi, if empty - default, badger will be used") - - flag.StringVar(&flagAllowListedBuilderList, "relay-allow-listed-builder", "", "comma separated list of allowed builder pubkeys") - - flag.StringVar(&flagBlockValidationEndpointHTTP, "block-validation-endpoint-http", "", "http block validation endpoint address") - flag.StringVar(&flagBlockValidationEndpointWSList, "block-validation-endpoint-ws", "", "ws block validation endpoint address (comma separated list)") - flag.BoolVar(&flagBlockValidationWSRetry, "block-validation-ws-retry", false, "retry to other connection on failure") - flag.StringVar(&flagBlockValidationRPC, "block-validation-endpoint-rpc", "", "rpc block validation rawurl (eg. ipc path)") - flag.BoolVar(&flagDistribution, "relay-distribution", false, "run relay as a distributed system with multiple replicas") - flag.StringVar(&flagDistributionID, "relay-distribution-id", "", "the id of the relay to differentiate from other replicas") - flag.Uint64Var(&flagDistributionStreamWorkers, "relay-distribution-stream-workers", 100, "number of workers publishing and processing subscriptions in the stream") - - flag.BoolVar(&flagDistributionStreamServedBids, "relay-distribution-stream-served-bids", true, "stream entire block for every bid that is served in GetHeader requests.") - - flag.DurationVar(&flagDistributionStreamTTL, "relay-distribution-stream-ttl", time.Minute, "TTL of the data that is distributed") - - flag.StringVar(&flagDistributionStreamTopic, "relay-distribution-stream-topic", "relay", "Pubsub topic for streaming payloads") - flag.IntVar(&flagDistributionStreamQueue, "relay-distribution-stream-queue", 100, "Pubsub publish queue size") - - flag.StringVar(&flagStorageReadRedisURI, "relay-storage-read-redis-uri", "", "Redis Storage URI for read replica") - flag.StringVar(&flagStorageWriteRedisURI, "relay-storage-write-redis-uri", "", "Redis Storage URI for write") - flag.StringVar(&flagPubsubRedisURI, "relay-pubsub-redis-uri", "", "Redis Pub/Sub URI") - - flag.DurationVar(&flagGetPayloadResponseDelay, "getpayload-response-delay", 1*time.Second, "Delay between block publication and returning request to validator") - flag.DurationVar(&flagGetPayloadRequestTimeLimit, "getpayload-request-time-limit", 4*time.Second, "Time allowed for GetPayload requests since the slot started") - flag.BoolVar(&flagWarehouse, "warehouse", true, "Enable warehouse storage of data") - - flag.StringVar(&flagWarehouseDir, "warehouse-dir", "/data/relay/warehouse", "Data directory where the data is stored in the warehouse") - - flag.IntVar(&flagWarehouseWorkers, "warehouse-workers", 32, "Number of workers for storing data in warehouse, if 0, then data is not exported") - flag.IntVar(&flagWarehouseBuffer, "warehouse-buffer", 1_000, "Size of the buffer for processing requests") - - flag.DurationVar(&flagBeaconEventTimeout, "beacon-event-timeout", 16*time.Second, "The maximum time allowed to wait for head events from the beacon, we recommend setting it to 'durationPerSlot * 1.25'") - flag.IntVar(&flagBeaconEventRestart, "beacon-event-restart", 5, "The number of consecutive timeouts allowed before restarting the head event subscription") - flag.DurationVar(&flagBeaconQueryTimeout, "beacon-query-timeout", 20*time.Second, "The maximum time allowed to wait for a response from the beacon") - flag.BoolVar(&flagBeaconPayloadAttributesSubscription, "beacon-payload-attributes-subscription", true, "instead of polling withdrawals+prevRandao, use SSE event (requires Prysm v4+)") - flag.Parse() } @@ -233,9 +108,9 @@ func main() { } chainCfg := config.NewChainConfig() - chainCfg.LoadNetwork(flagNetwork) + chainCfg.LoadNetwork(cfg.Relay.Network) if chainCfg.GenesisForkVersion == "" { - if err := chainCfg.ReadNetworkConfig(datadir, flagNetwork); err != nil { + if err := chainCfg.ReadNetworkConfig(datadir, cfg.Relay.Network); err != nil { logger.WithError(err).Fatal("failed read chain configuration") return } @@ -261,17 +136,30 @@ func main() { return } + state := &beacon.MultiSlotState{} + if flagDistribution { readClient := redis.NewClient(&redis.Options{ - Addr: flagStorageReadRedisURI, + Addr: cfg.Payload.Redis.Read.Address, }) m.RegisterRedis("redis", "readreplica", readClient) writeClient := redis.NewClient(&redis.Options{ - Addr: flagStorageWriteRedisURI, + Addr: cfg.Payload.Redis.Write.Address, }) m.RegisterRedis("redis", "master", writeClient) storage = &dsRedis.RedisDatastore{Read: readClient, Write: writeClient} + + redisClient := redis.NewClient(&redis.Options{ + Addr: cfg.Distributed.Redis.Address, + }) + + m.RegisterRedis("redis", "stream", redisClient) + streamer, err = initStreamer(ctx, cfg.Distributed, redisClient, logger, m, state) + if err != nil { + logger.WithError(err).Error("fail to create streamer") + return + } } else { storage = badgerDs } @@ -282,78 +170,47 @@ func main() { }).Info("data store initialized") timeRelayStart := time.Now() - state := &beacon.MultiSlotState{} ds := datastore.NewDatastore(storage, badgerDs.DB) if err != nil { logger.WithError(err).Error("failed to create datastore") return } - if flagDistribution { - redisClient := redis.NewClient(&redis.Options{ - Addr: flagPubsubRedisURI, - }) - - m.RegisterRedis("redis", "stream", redisClient) - streamer, err = initStreamer(ctx, redisClient, logger, m, state) - if err != nil { - logger.WithError(err).Error("fail to create streamer") - return - } - } - beaconConfig := bcli.BeaconConfig{ - BeaconEventTimeout: flagBeaconEventTimeout, - BeaconEventRestart: flagBeaconEventRestart, - BeaconQueryTimeout: flagBeaconQueryTimeout, + BeaconEventTimeout: cfg.Beacon.EventTimeout, + BeaconEventRestart: cfg.Beacon.EventRestart, + BeaconQueryTimeout: cfg.Beacon.QueryTimeout, } - beaconCli, err := initBeaconClients(logger, strings.Split(flagBeaconList, ","), m, beaconConfig) + beaconCli, err := initBeaconClients(logger, cfg.Beacon.Addresses, m, beaconConfig) if err != nil { logger.WithError(err).Error("fail to initialize beacon") return } - beaconPubCli, err := initBeaconClients(logger, strings.Split(flagBeaconPublishList, ","), m, beaconConfig) + beaconPubCli, err := initBeaconClients(logger, cfg.Beacon.PublishAddresses, m, beaconConfig) if err != nil { logger.WithError(err).Error("fail to initialize publish beacon") return } - // SIM Client simFallb := fallback.NewFallback() simFallb.AttachMetrics(m) - if simHttpAddr := flagBlockValidationRPC; simHttpAddr != "" { - simRPCCli := gethrpc.NewClient(gethSimNamespace, simHttpAddr) - if err := simRPCCli.Dial(ctx); err != nil { - logger.WithError(err).WithField("address", simHttpAddr).Error("fail to initialize rpc connection") - return - } - simFallb.AddClient(simRPCCli) - } - - if simWSAddr := flagBlockValidationEndpointWSList; simWSAddr != "" { - simWSConn := gethws.NewReConn(logger) - for _, s := range strings.Split(simWSAddr, ",") { - input := make(chan []byte, 1000) - go simWSConn.KeepConnection(s, input) - } - simWSCli := gethws.NewClient(simWSConn, gethSimNamespace, flagBlockValidationWSRetry, logger) - simFallb.AddClient(simWSCli) - } - if simHttpAddr := flagBlockValidationEndpointHTTP; simHttpAddr != "" { - simHTTPCli := gethhttp.NewClient(simHttpAddr, gethSimNamespace, logger) - simFallb.AddClient(simHTTPCli) + simManager := sim.NewManager(logger, simFallb) + simManager.AddRPCClient(ctx, cfg.BlockSimulation.RPC.Address) + simManager.AddHTTPClient(ctx, cfg.BlockSimulation.HTTP.Address) + for _, addr := range cfg.BlockSimulation.WS.Address { + simManager.AddWsClients(ctx, addr, cfg.BlockSimulation.WS.Retry) } - verificator := verify.NewVerificationManager(logger, uint(flagVerifyQueueSize)) - verificator.RunVerify(uint(flagWorkersVerify)) + verificator := verify.NewVerificationManager(logger, cfg.Verify.QueueSize) + verificator.RunVerify(uint(cfg.Verify.WorkersNum)) // VALIDATOR MANAGEMENT var valDS ValidatorStore - if flagValidatorDatabaseUrl != "" { - valPG, err := trPostgres.Open(flagValidatorDatabaseUrl, 10, 10, 10*time.Second) + if cfg.Validators.DB.URL != "" { + valPG, err := trPostgres.Open(cfg.Validators.DB.URL, 10, 10, 10*time.Second) if err != nil { logger.WithError(err).Error("failed to connect to validator database") return @@ -361,10 +218,10 @@ func main() { m.RegisterDB(valPG, "registrations") valDS = valPostgres.NewDatastore(valPG) } else { // by default use existsing storage - valDS = valBadger.NewDatastore(storage, flagTTL) + valDS = valBadger.NewDatastore(storage, cfg.Validators.Badger.TTL) } - validatorCache, err := lru.New[types.PublicKey, structs.ValidatorCacheEntry](flagRegistrationsCacheSize) + validatorCache, err := lru.New[types.PublicKey, structs.ValidatorCacheEntry](cfg.Validators.RegistrationsCacheSize) if err != nil { logger.WithError(err).Error("fail to initialize validator cache") return @@ -372,8 +229,8 @@ func main() { // DATAAPI var daDS relay.DataAPIStore - if flagDataapiDatabaseUrl != "" { - valPG, err := trPostgres.Open(flagDataapiDatabaseUrl, 10, 10, 10*time.Second) // TODO(l): make configurable + if cfg.DataAPI.DB.URL != "" { + valPG, err := trPostgres.Open(cfg.DataAPI.DB.URL, 10, 10, 10*time.Second) // TODO(l): make configurable if err != nil { logger.WithError(err).Error("failed to connect to dataapi database") } @@ -384,16 +241,16 @@ func main() { defer valPG.Close() } else { // by default use badger - daDS = daBadger.NewDatastore(storage, badgerDs.DB, flagTTL) + daDS = daBadger.NewDatastore(storage, badgerDs.DB, cfg.DataAPI.Badger.TTL) } // lazyload validators cache, it's optional and we don't care if it errors out go preloadValidators(ctx, logger, valDS, validatorCache) - validatorStoreManager := validators.NewStoreManager(logger, validatorCache, valDS, flagRegistrationsCacheWriteTTL, uint(flagStoreQueueSize)) + validatorStoreManager := validators.NewStoreManager(logger, validatorCache, valDS, cfg.Validators.RegistrationsWriteCacheTTL, cfg.Validators.QueueSize) validatorStoreManager.AttachMetrics(m) - if flagWorkersStoreValidator > 0 { - validatorStoreManager.RunStore(uint(flagWorkersStoreValidator)) + if cfg.Validators.StoreWorkersNum > 0 { + validatorStoreManager.RunStore(uint(cfg.Validators.StoreWorkersNum)) } domainBuilder, err := ComputeDomain(types.DomainTypeAppBuilder, chainCfg.GenesisForkVersion, types.Root{}.String()) @@ -407,25 +264,10 @@ func main() { b := beacon.NewManager(logger, beacon.Config{ BellatrixForkVersion: chainCfg.BellatrixForkVersion, CapellaForkVersion: chainCfg.CapellaForkVersion, - RunPayloadAttributesSubscription: flagBeaconPayloadAttributesSubscription, + RunPayloadAttributesSubscription: cfg.Beacon.PayloadAttributesSubscription, }) - auctioneer := auction.NewAuctioneer() - - var allowed map[[48]byte]struct{} - if flagAllowListedBuilderList != "" { - allowed = make(map[[48]byte]struct{}) - for _, k := range strings.Split(flagAllowListedBuilderList, ",") { - var pk types.PublicKey - if err := pk.UnmarshalText([]byte(k)); err != nil { - logger.WithError(err).With(log.F{"key": k}).Error("ALLOWED BUILDER NOT ADDED - wrong public key") - continue - } - allowed[pk] = struct{}{} - } - } - - skBytes, err := hexutil.Decode(flagSecretKey) + skBytes, err := hexutil.Decode(cfg.Relay.SecretKey) if err != nil { logger.WithError(err).Error("fail to decode secretKey") return @@ -450,14 +292,13 @@ func main() { var relayWh *wh.Warehouse if flagWarehouse { - warehouse := wh.NewWarehouse(logger, flagWarehouseBuffer) - - if err := os.MkdirAll(flagWarehouseDir, 0755); err != nil { + warehouse := wh.NewWarehouse(logger, cfg.Warehouse.Buffer) + if err := os.MkdirAll(cfg.Warehouse.Directory, 0755); err != nil { logger.WithError(err).Error("failed to create datadir") return } - if err := warehouse.RunParallel(ctx, flagWarehouseDir, flagWarehouseWorkers); err != nil { + if err := warehouse.RunParallel(ctx, cfg.Warehouse.Directory, cfg.Warehouse.WorkerNumber); err != nil { logger.WithError(err).Error("failed to run data exporter") return } @@ -466,39 +307,44 @@ func main() { logger.With(log.F{ "subService": "warehouse", - "datadir": flagWarehouseDir, - "workers": flagWarehouseWorkers, + "datadir": cfg.Warehouse.Directory, + "workers": cfg.Warehouse.WorkerNumber, }).Info("initialized") relayWh = warehouse } - payloadCache, err := structs.NewMultiSlotPayloadCache(flagPayloadCacheSize) + payloadCache, err := structs.NewMultiSlotPayloadCache(cfg.Payload.CacheSize) if err != nil { logger.WithError(err).Error("fail to initialize stream cache") return } - r := relay.NewRelay(logger, relay.RelayConfig{ + rCfg := relay.RelayConfig{ BuilderSigningDomain: domainBuilder, - GetPayloadResponseDelay: flagGetPayloadResponseDelay, - GetPayloadRequestTimeLimit: flagGetPayloadRequestTimeLimit, + GetPayloadResponseDelay: cfg.Relay.GetPayloadResponseDelay, + GetPayloadRequestTimeLimit: cfg.Relay.GetPayloadRequestTimeLimit, ProposerSigningDomain: map[structs.ForkVersion]types.Domain{ structs.ForkBellatrix: bellatrixBeaconProposer, structs.ForkCapella: capellaBeaconProposer}, - PubKey: pk, - SecretKey: sk, - RegistrationCacheTTL: flagRegistrationsCacheReadTTL, - TTL: flagTTL, - AllowedListedBuilders: allowed, - PublishBlock: flagPublishBlock, - Distributed: flagDistribution, - StreamServedBids: flagDistributionStreamServedBids, - }, beaconPubCli, validatorCache, valDS, verificator, state, payloadCache, ds, daDS, auctioneer, simFallb, relayWh, streamer) + PubKey: pk, + SecretKey: sk, + RegistrationCacheTTL: cfg.Validators.RegistrationsReadCacheTTL, + PayloadDataTTL: cfg.Payload.TTL, + PublishBlock: cfg.Relay.PublishBlock, + Distributed: flagDistribution, + StreamServedBids: cfg.Distributed.StreamServedBids, + } + rCfg.ParseInitialConfig(cfg.Relay.AllowedBuilders) + cfg.Relay.SubscribeForUpdates(&rCfg) + + auctioneer := auction.NewAuctioneer() + r := relay.NewRelay(logger, rCfg, beaconPubCli, validatorCache, valDS, verificator, + state, payloadCache, ds, daDS, auctioneer, simFallb, relayWh, streamer) r.AttachMetrics(m) if flagDistribution { - r.RunSubscribersParallel(ctx, uint(flagDistributionStreamWorkers)) + r.RunSubscribersParallel(ctx, uint(cfg.Distributed.WorkerNumber)) } ee := &api.EnabledEndpoints{ @@ -509,7 +355,8 @@ func main() { iApi := inner.NewAPI(ee, ds) limitterCache, _ := lru.New[[48]byte, *rate.Limiter](cfg.Api.LimitterCacheSize) - apiLimitter := api.NewLimitter(cfg.Api.SubmissionLimitRate, cfg.Api.SubmissionLimitBurst, limitterCache, allowed) + apiLimitter := api.NewLimitter(cfg.Api.SubmissionLimitRate, cfg.Api.SubmissionLimitBurst, limitterCache) + apiLimitter.ParseInitialConfig(cfg.Api.AllowedBuilders) cfg.Api.SubscribeForUpdates(apiLimitter) a := api.NewApi(logger, ee, r, validatorRelay, state, apiLimitter, cfg.Api.DataLimit, cfg.Api.ErrorsOnDisable) @@ -538,7 +385,7 @@ func main() { internalMux.Handle("/metrics", m.Handler()) logger.Info("internal server listening") internalSrv := http.Server{ - Addr: flagInternalAddr, + Addr: cfg.InternalHttp.Address, Handler: internalMux, } if err = internalSrv.ListenAndServe(); err == http.ErrServerClosed { @@ -551,10 +398,10 @@ func main() { a.AttachToHandler(mux) srv := &http.Server{ - Addr: flagAddr, - ReadTimeout: flagTimeout, - WriteTimeout: flagTimeout, - IdleTimeout: flagTimeout, + Addr: cfg.ExternalHttp.Address, + ReadTimeout: cfg.ExternalHttp.ReadTimeout, + WriteTimeout: cfg.ExternalHttp.WriteTimeout, + IdleTimeout: cfg.ExternalHttp.IdleTimeout, Handler: mux, MaxHeaderBytes: 4096, } @@ -660,12 +507,12 @@ func ComputeDomain(domainType types.DomainType, forkVersionHex string, genesisVa return types.ComputeDomain(domainType, forkVersion, genesisValidatorsRoot), nil } -func initStreamer(ctx context.Context, redisClient *redis.Client, l log.Logger, m *metrics.Metrics, st stream.State) (relay.Streamer, error) { +func initStreamer(ctx context.Context, cfg *config.DistributedConfig, redisClient *redis.Client, l log.Logger, m *metrics.Metrics, st stream.State) (relay.Streamer, error) { timeStreamStart := time.Now() pubsub := &redisStream.Pubsub{Redis: redisClient, Logger: l} - id := flagDistributionID + id := cfg.InstanceID if id == "" { id = uuid.NewString() } @@ -673,15 +520,14 @@ func initStreamer(ctx context.Context, redisClient *redis.Client, l log.Logger, streamConfig := stream.StreamConfig{ Logger: l, ID: id, - TTL: flagDistributionStreamTTL, - PubsubTopic: flagDistributionStreamTopic, - StreamQueueSize: flagDistributionStreamQueue, + PubsubTopic: cfg.Redis.Topic, + StreamQueueSize: cfg.StreamQueueSize, } redisStreamer := stream.NewClient(pubsub, st, streamConfig) redisStreamer.AttachMetrics(m) - if err := redisStreamer.RunSubscriberParallel(ctx, uint(flagDistributionStreamWorkers)); err != nil { + if err := redisStreamer.RunSubscriberParallel(ctx, uint(cfg.WorkerNumber)); err != nil { return nil, fmt.Errorf("fail to start stream subscriber: %w", err) } diff --git a/datastore/evidence/badger/payload.go b/datastore/evidence/badger/payload.go index ffa5a421..79565e7b 100644 --- a/datastore/evidence/badger/payload.go +++ b/datastore/evidence/badger/payload.go @@ -7,7 +7,6 @@ import ( "errors" "fmt" "io" - "time" "github.com/blocknative/dreamboat/structs" "github.com/dgraph-io/badger/v2" @@ -33,7 +32,7 @@ func DeliveredPubkeyKey(pk types.PublicKey) ds.Key { return ds.NewKey(fmt.Sprintf("delivered-pk-%s", pk.String())) } -func (s *Datastore) PutDelivered(ctx context.Context, slot structs.Slot, trace structs.DeliveredTrace, ttl time.Duration) error { +func (s *Datastore) PutDelivered(ctx context.Context, slot structs.Slot, trace structs.DeliveredTrace) error { data, err := json.Marshal(trace.Trace) if err != nil { return err @@ -41,35 +40,22 @@ func (s *Datastore) PutDelivered(ctx context.Context, slot structs.Slot, trace s txn := s.DBInter.NewTransaction(true) defer txn.Discard() - if err := txn.SetEntry(badger.NewEntry(DeliveredHashKey(trace.Trace.BlockHash).Bytes(), DeliveredKey(slot).Bytes()).WithTTL(ttl)); err != nil { + if err := txn.SetEntry(badger.NewEntry(DeliveredHashKey(trace.Trace.BlockHash).Bytes(), DeliveredKey(slot).Bytes()).WithTTL(s.TTL)); err != nil { return err } - if err := txn.SetEntry(badger.NewEntry(DeliveredNumKey(trace.BlockNumber).Bytes(), DeliveredKey(slot).Bytes()).WithTTL(ttl)); err != nil { + if err := txn.SetEntry(badger.NewEntry(DeliveredNumKey(trace.BlockNumber).Bytes(), DeliveredKey(slot).Bytes()).WithTTL(s.TTL)); err != nil { return err } - if err := txn.SetEntry(badger.NewEntry(DeliveredPubkeyKey(trace.Trace.ProposerPubkey).Bytes(), DeliveredKey(slot).Bytes()).WithTTL(ttl)); err != nil { + if err := txn.SetEntry(badger.NewEntry(DeliveredPubkeyKey(trace.Trace.ProposerPubkey).Bytes(), DeliveredKey(slot).Bytes()).WithTTL(s.TTL)); err != nil { return err } - if err := txn.SetEntry(badger.NewEntry(DeliveredKey(slot).Bytes(), data).WithTTL(ttl)); err != nil { + if err := txn.SetEntry(badger.NewEntry(DeliveredKey(slot).Bytes(), data).WithTTL(s.TTL)); err != nil { return err } return txn.Commit() } -/* -func (s *Datastore) CheckSlotDelivered(ctx context.Context, slot uint64) (bool, error) { - tx := s.DBInter.NewTransaction(false) - defer tx.Discard() - - _, err := tx.Get(DeliveredKey(structs.Slot(slot)).Bytes()) - if err == badger.ErrKeyNotFound { - return false, nil - } - return (err == nil), err -} -*/ - func (s *Datastore) GetDeliveredPayloads(ctx context.Context, w io.Writer, headSlot uint64, query structs.PayloadTraceQuery) error { var ( key ds.Key diff --git a/datastore/evidence/badger/payload_test.go b/datastore/evidence/badger/payload_test.go index fdfb311b..072dfb52 100644 --- a/datastore/evidence/badger/payload_test.go +++ b/datastore/evidence/badger/payload_test.go @@ -90,7 +90,7 @@ func TestPutGetHeaderDelivered2(t *testing.T) { require.Len(t, gotHeader, 0) // set as delivered and retrieve again - err = d.PutDelivered(ctx, slot, dt, time.Minute) + err = d.PutDelivered(ctx, slot, dt) require.NoError(t, err) buf.Reset() diff --git a/datastore/evidence/postgres/payload.go b/datastore/evidence/postgres/payload.go index e0299e03..e32c2d57 100644 --- a/datastore/evidence/postgres/payload.go +++ b/datastore/evidence/postgres/payload.go @@ -8,12 +8,11 @@ import ( "io" "strconv" "strings" - "time" "github.com/blocknative/dreamboat/structs" ) -func (s *Datastore) PutDelivered(ctx context.Context, slot structs.Slot, payload structs.DeliveredTrace, ttl time.Duration) (err error) { +func (s *Datastore) PutDelivered(ctx context.Context, slot structs.Slot, payload structs.DeliveredTrace) (err error) { _, err = s.DB.ExecContext(ctx, `INSERT INTO payload_delivered ( relay_id, slot, epoch, builder_pubkey, proposer_pubkey, proposer_fee_recipient, parent_hash, block_hash, num_tx, block_number, gas_used, gas_limit, value ) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13) diff --git a/relay/config.go b/relay/config.go new file mode 100644 index 00000000..7f05d72c --- /dev/null +++ b/relay/config.go @@ -0,0 +1,79 @@ +package relay + +import ( + "fmt" + "time" + + "github.com/blocknative/dreamboat/structs" + "github.com/flashbots/go-boost-utils/bls" + "github.com/flashbots/go-boost-utils/types" +) + +type RelayConfig struct { + BuilderSigningDomain types.Domain + ProposerSigningDomain map[structs.ForkVersion]types.Domain + PubKey types.PublicKey + SecretKey *bls.SecretKey + + GetPayloadResponseDelay time.Duration + GetPayloadRequestTimeLimit time.Duration + + PayloadDataTTL time.Duration + RegistrationCacheTTL time.Duration + + Distributed, StreamServedBids, PublishBlock bool + + AllowedListedBuilders map[[48]byte]struct{} +} + +func (rc *RelayConfig) ParseInitialConfig(keys []string) (err error) { + rc.AllowedListedBuilders, err = makeKeyMap(keys) + return err + +} + +func (rc *RelayConfig) OnConfigChange(c structs.OldNew) (err error) { + switch c.Name { + case "PublishBlock": + if b, ok := c.New.(bool); ok { + rc.PublishBlock = b + } + + case "StreamServedBids": + if b, ok := c.New.(bool); ok { + rc.PublishBlock = b + } + + case "GetPayloadResponseDelay": + if dur, ok := c.New.(time.Duration); ok { + rc.GetPayloadResponseDelay = dur + } + + case "GetPayloadRequestTimeLimit": + if dur, ok := c.New.(time.Duration); ok { + rc.GetPayloadRequestTimeLimit = dur + } + + case "AllowedBuilders": + if keys, ok := c.New.([]string); ok { + ab, err := makeKeyMap(keys) + if err != nil { + return err + } + rc.AllowedListedBuilders = ab + } + } + return nil +} + +func makeKeyMap(keys []string) (map[[48]byte]struct{}, error) { + newKeys := make(map[[48]byte]struct{}) + for _, key := range keys { + var pk types.PublicKey + if err := pk.UnmarshalText([]byte(key)); err != nil { + return nil, fmt.Errorf("allowed builder not added - wrong public key: %s - %w", key, err) + } + newKeys[pk] = struct{}{} + } + return newKeys, nil +} diff --git a/relay/mocks/mocks.go b/relay/mocks/mocks.go index 15722e0d..3e0ecb67 100644 --- a/relay/mocks/mocks.go +++ b/relay/mocks/mocks.go @@ -83,17 +83,17 @@ func (mr *MockDataAPIStoreMockRecorder) PutBuilderBlockSubmission(arg0, arg1, ar } // PutDelivered mocks base method. -func (m *MockDataAPIStore) PutDelivered(arg0 context.Context, arg1 structs.Slot, arg2 structs.DeliveredTrace, arg3 time.Duration) error { +func (m *MockDataAPIStore) PutDelivered(arg0 context.Context, arg1 structs.Slot, arg2 structs.DeliveredTrace) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "PutDelivered", arg0, arg1, arg2, arg3) + ret := m.ctrl.Call(m, "PutDelivered", arg0, arg1, arg2) ret0, _ := ret[0].(error) return ret0 } // PutDelivered indicates an expected call of PutDelivered. -func (mr *MockDataAPIStoreMockRecorder) PutDelivered(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { +func (mr *MockDataAPIStoreMockRecorder) PutDelivered(arg0, arg1, arg2 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PutDelivered", reflect.TypeOf((*MockDataAPIStore)(nil).PutDelivered), arg0, arg1, arg2, arg3) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PutDelivered", reflect.TypeOf((*MockDataAPIStore)(nil).PutDelivered), arg0, arg1, arg2) } // MockDatastore is a mock of Datastore interface. diff --git a/relay/relay.go b/relay/relay.go index fa2cb68c..346d9d99 100644 --- a/relay/relay.go +++ b/relay/relay.go @@ -11,7 +11,6 @@ import ( "sync/atomic" "time" - "github.com/flashbots/go-boost-utils/bls" "github.com/flashbots/go-boost-utils/types" "github.com/lthibault/log" @@ -80,8 +79,7 @@ type Verifier interface { } type DataAPIStore interface { - //CheckSlotDelivered(context.Context, uint64) (bool, error) - PutDelivered(context.Context, structs.Slot, structs.DeliveredTrace, time.Duration) error + PutDelivered(context.Context, structs.Slot, structs.DeliveredTrace) error GetDeliveredPayloads(ctx context.Context, w io.Writer, headSlot uint64, queryArgs structs.PayloadTraceQuery) error PutBuilderBlockSubmission(ctx context.Context, bid structs.BidTraceWithTimestamp, isMostProfitable bool) (err error) @@ -120,25 +118,6 @@ type Warehouse interface { StoreAsync(ctx context.Context, req wh.StoreRequest) error } -type RelayConfig struct { - BuilderSigningDomain types.Domain - ProposerSigningDomain map[structs.ForkVersion]types.Domain - PubKey types.PublicKey - SecretKey *bls.SecretKey - GetPayloadResponseDelay time.Duration - GetPayloadRequestTimeLimit time.Duration - - AllowedListedBuilders map[[48]byte]struct{} - - PublishBlock bool - - TTL time.Duration - - RegistrationCacheTTL time.Duration - - Distributed, StreamServedBids bool -} - type Relay struct { d Datastore pc PayloadCache @@ -200,10 +179,6 @@ func (rs *Relay) RunSubscribersParallel(ctx context.Context, num uint) { } } -func (rs *Relay) runSlotDeliveredSubscriber(ctx context.Context) error { - return nil // TODO -} - func (rs *Relay) runSubscriberBlockCache(ctx context.Context) error { for { select { @@ -663,7 +638,7 @@ func (rs *Relay) storeTraceDelivered(logger log.Logger, slot uint64, payload str return } - if err := rs.das.PutDelivered(context.Background(), structs.Slot(slot), trace, rs.config.TTL); err != nil { + if err := rs.das.PutDelivered(context.Background(), structs.Slot(slot), trace); err != nil { logger.WithField("event", "evidence_failure").WithError(err).Warn("failed to set payload after delivery") return } diff --git a/relay/submit.go b/relay/submit.go index dd4e9a68..3fdff958 100644 --- a/relay/submit.go +++ b/relay/submit.go @@ -237,7 +237,7 @@ func (rs *Relay) storeSubmission(ctx context.Context, logger log.Logger, m *stru tPutPayload := time.Now() - if err := rs.d.PutPayload(context.Background(), sbr.ToPayloadKey(), complete.Payload, rs.config.TTL); err != nil { + if err := rs.d.PutPayload(context.Background(), sbr.ToPayloadKey(), complete.Payload, rs.config.PayloadDataTTL); err != nil { return false, fmt.Errorf("%w block as payload: %s", ErrStore, err.Error()) // TODO: multiple err wrapping in Go 1.20 } m.AppendSince(tPutPayload, "submitBlock", "putPayload") diff --git a/relay/submit_test.go b/relay/submit_test.go index 74ac1762..fd5a0d47 100644 --- a/relay/submit_test.go +++ b/relay/submit_test.go @@ -131,7 +131,7 @@ func simpletest(t require.TestingT, ctrl *gomock.Controller, fork structs.ForkVe require.NoError(t, err) log.Debug(contents) - ds.EXPECT().PutPayload(context.Background(), submitRequest.ToPayloadKey(), contents.Payload, conf.TTL).Return(nil) + ds.EXPECT().PutPayload(context.Background(), submitRequest.ToPayloadKey(), contents.Payload, conf.PayloadDataTTL).Return(nil) var bl structs.BuilderBidExtended a.EXPECT().AddBlock(gomock.Any()).Times(1).DoAndReturn(func(block structs.BuilderBidExtended) bool { @@ -174,7 +174,7 @@ func simpletest(t require.TestingT, ctrl *gomock.Controller, fork structs.ForkVe pc.EXPECT().Get(submitRequest.ToPayloadKey()).Times(1) ds.EXPECT().GetPayload(gomock.Any(), fork, submitRequest.ToPayloadKey()).Return(contents.Payload, nil).Times(1) // state.EXPECT().ForkVersion(structs.Slot(submitRequest.Slot())).Times(1).Return(fork) - das.EXPECT().PutDelivered(gomock.Any(), structs.Slot(submitRequest.Slot()), gomock.Any(), conf.TTL).Times(1) + das.EXPECT().PutDelivered(gomock.Any(), structs.Slot(submitRequest.Slot()), gomock.Any()).Times(1) wh.EXPECT().StoreAsync(gomock.Any(), gomock.Any()).Times(1) diff --git a/sim/client/client.go b/sim/client/client.go index da13c8ef..857fd4ba 100644 --- a/sim/client/client.go +++ b/sim/client/client.go @@ -1 +1,14 @@ package client + +import ( + "context" + + "github.com/blocknative/dreamboat/sim/client/types" +) + +type Client interface { + ValidateBlock(ctx context.Context, block *types.BuilderBlockValidationRequest) (err error) + ValidateBlockV2(ctx context.Context, block *types.BuilderBlockValidationRequestV2) (err error) + Kind() string + ID() string +} diff --git a/sim/client/fallback/fallback.go b/sim/client/fallback/fallback.go index ae3c591d..1d8c9b2c 100644 --- a/sim/client/fallback/fallback.go +++ b/sim/client/fallback/fallback.go @@ -8,15 +8,12 @@ import ( "github.com/blocknative/dreamboat/sim/client/types" ) -type Client interface { - ValidateBlock(ctx context.Context, block *types.BuilderBlockValidationRequest) (err error) - ValidateBlockV2(ctx context.Context, block *types.BuilderBlockValidationRequestV2) (err error) - Kind() string -} - type Fallback struct { - clients []Client - m Metrics + clientsRPC []sim.Client + clientsWS []sim.Client + clientsHTTP []sim.Client + atLeastOne bool + m Metrics } func NewFallback() *Fallback { @@ -26,76 +23,152 @@ func NewFallback() *Fallback { } func (f *Fallback) IsSet() bool { - return len(f.clients) > 0 + return f.atLeastOne +} + +func (f *Fallback) AddClient(cli sim.Client) { + switch cli.Kind() { + case "ws": + f.clientsWS = addClient(f.clientsWS, cli) + case "http": + f.clientsHTTP = addClient(f.clientsHTTP, cli) + case "rpc": + f.clientsRPC = addClient(f.clientsRPC, cli) + } + f.atLeastOne = true } -func (f *Fallback) AddClient(cli Client) { - f.clients = append(f.clients, cli) +func (f *Fallback) RemoveClient(kind string, id string) { + switch kind { + case "ws": + f.clientsWS = removeClient(f.clientsWS, id) + case "http": + f.clientsHTTP = removeClient(f.clientsHTTP, id) + case "rpc": + f.clientsRPC = removeClient(f.clientsRPC, id) + } + if len(f.clientsWS) == 0 && len(f.clientsHTTP) == 0 && len(f.clientsRPC) == 0 { + f.atLeastOne = false + } +} + +func addClient(cSlice []sim.Client, cli sim.Client) []sim.Client { + for _, c := range cSlice { + if c.ID() == cli.ID() { + return cSlice + } + } + return append(cSlice, cli) } -func (f *Fallback) Len() int { - return len(f.clients) +func removeClient(cSlice []sim.Client, id string) []sim.Client { + for i, c := range cSlice { + if c.ID() == id { + return append(cSlice[:i], cSlice[i+1:]...) + } + } + return cSlice } func (f *Fallback) ValidateBlock(ctx context.Context, block *types.BuilderBlockValidationRequest) (err error) { - if len(f.clients) == 0 { + if !f.atLeastOne { f.m.ServedFrom.WithLabelValues("none", "error").Inc() return sim.ErrNotFound } - for _, c := range f.clients { - if ctx.Err() != nil { - f.m.ServedFrom.WithLabelValues(c.Kind(), "ctx").Inc() - return ctx.Err() - } - err = c.ValidateBlock(ctx, block) - if err == nil { - f.m.ServedFrom.WithLabelValues(c.Kind(), "ok").Inc() - return + var keepTrying bool + for _, c := range f.clientsRPC { + err, keepTrying = f.validateBlock(ctx, c, block) + if !keepTrying { + return err } + } - if !(errors.Is(err, sim.ErrNotFound) || errors.Is(err, sim.ErrConnectionFailure)) { - f.m.ServedFrom.WithLabelValues(c.Kind(), "error").Inc() + for _, c := range f.clientsWS { + err, keepTrying = f.validateBlock(ctx, c, block) + if !keepTrying { return err } + } - f.m.ServedFrom.WithLabelValues(c.Kind(), "fallback").Inc() + for _, c := range f.clientsHTTP { + err, keepTrying = f.validateBlock(ctx, c, block) + if !keepTrying { + return err + } } + f.m.ServedFrom.WithLabelValues("all", "fatal").Inc() return err } +func (f *Fallback) validateBlock(ctx context.Context, c sim.Client, block *types.BuilderBlockValidationRequest) (err error, keepTrying bool) { + if ctx.Err() != nil { + f.m.ServedFrom.WithLabelValues(c.Kind(), "ctx").Inc() + return ctx.Err(), false + } + err = c.ValidateBlock(ctx, block) + if err == nil { + f.m.ServedFrom.WithLabelValues(c.Kind(), "ok").Inc() + return + } + + if !(errors.Is(err, sim.ErrNotFound) || errors.Is(err, sim.ErrConnectionFailure)) { + f.m.ServedFrom.WithLabelValues(c.Kind(), "error").Inc() + return err, false + } + + f.m.ServedFrom.WithLabelValues(c.Kind(), "fallback").Inc() + return err, true +} + func (f *Fallback) ValidateBlockV2(ctx context.Context, block *types.BuilderBlockValidationRequestV2) (err error) { - if len(f.clients) == 0 { + if !f.atLeastOne { f.m.ServedFrom.WithLabelValues("none", "error").Inc() return sim.ErrNotFound } - for _, c := range f.clients { - if ctx.Err() != nil { - f.m.ServedFrom.WithLabelValues(c.Kind(), "ctx").Inc() - return ctx.Err() - } - err = c.ValidateBlockV2(ctx, block) - if err == nil { - f.m.ServedFrom.WithLabelValues(c.Kind(), "ok").Inc() - return + var keepTrying bool + for _, c := range f.clientsRPC { + err, keepTrying = f.validateBlockV2(ctx, c, block) + if !keepTrying { + return err } + } - if ctx.Err() != nil { - f.m.ServedFrom.WithLabelValues(c.Kind(), "ctx").Inc() - return ctx.Err() - } - if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { - f.m.ServedFrom.WithLabelValues(c.Kind(), "ctx").Inc() + for _, c := range f.clientsWS { + err, keepTrying = f.validateBlockV2(ctx, c, block) + if !keepTrying { return err } + } - if !(errors.Is(err, sim.ErrNotFound) || errors.Is(err, sim.ErrConnectionFailure)) { - f.m.ServedFrom.WithLabelValues(c.Kind(), "error").Inc() + for _, c := range f.clientsHTTP { + err, keepTrying = f.validateBlockV2(ctx, c, block) + if !keepTrying { return err } - - f.m.ServedFrom.WithLabelValues(c.Kind(), "fallback").Inc() } + + f.m.ServedFrom.WithLabelValues("all", "fatal").Inc() return err } + +func (f *Fallback) validateBlockV2(ctx context.Context, c sim.Client, block *types.BuilderBlockValidationRequestV2) (err error, keepTrying bool) { + if ctx.Err() != nil { + f.m.ServedFrom.WithLabelValues(c.Kind(), "ctx").Inc() + return ctx.Err(), false + } + err = c.ValidateBlockV2(ctx, block) + if err == nil { + f.m.ServedFrom.WithLabelValues(c.Kind(), "ok").Inc() + return + } + + if !(errors.Is(err, sim.ErrNotFound) || errors.Is(err, sim.ErrConnectionFailure)) { + f.m.ServedFrom.WithLabelValues(c.Kind(), "error").Inc() + return err, false + } + + f.m.ServedFrom.WithLabelValues(c.Kind(), "fallback").Inc() + return err, true +} diff --git a/sim/client/transport/gethhttp/gethhttp.go b/sim/client/transport/gethhttp/gethhttp.go index d87f6e94..aff869e9 100644 --- a/sim/client/transport/gethhttp/gethhttp.go +++ b/sim/client/transport/gethhttp/gethhttp.go @@ -28,6 +28,10 @@ func NewClient(address string, namespace string, l log.Logger) *Client { } } +func (c *Client) ID() string { + return c.address +} + func (c *Client) Kind() string { return "http" } diff --git a/sim/client/transport/gethrpc/gethrpc.go b/sim/client/transport/gethrpc/gethrpc.go index 36aee215..e31042a6 100644 --- a/sim/client/transport/gethrpc/gethrpc.go +++ b/sim/client/transport/gethrpc/gethrpc.go @@ -20,6 +20,10 @@ func NewClient(namespace string, rawurl string) *Client { } } +func (c *Client) ID() string { + return c.rawurl +} + func (f *Client) IsSet() bool { return f.namespace != "" && f.rawurl != "" } diff --git a/sim/client/transport/gethws/client.go b/sim/client/transport/gethws/client.go index 9209a1d9..eb4f9047 100644 --- a/sim/client/transport/gethws/client.go +++ b/sim/client/transport/gethws/client.go @@ -31,6 +31,10 @@ func NewClient(nodeConn Connectionner, namespace string, try bool, l log.Logger) } } +func (c *Client) ID() string { + return "" +} + func (f *Client) IsSet() bool { return f.namespace != "" && f.nodeConn != nil } diff --git a/sim/sim.go b/sim/sim.go index 5a850783..b6305efc 100644 --- a/sim/sim.go +++ b/sim/sim.go @@ -5,53 +5,50 @@ import ( "github.com/lthibault/log" - "github.com/blocknative/dreamboat/sim/client/fallback" + "github.com/blocknative/dreamboat/sim/client" "github.com/blocknative/dreamboat/sim/client/transport/gethhttp" "github.com/blocknative/dreamboat/sim/client/transport/gethrpc" "github.com/blocknative/dreamboat/sim/client/transport/gethws" - "github.com/blocknative/dreamboat/sim/client/types" ) const ( gethSimNamespace = "flashbots" ) -type Client interface { - ValidateBlock(ctx context.Context, block *types.BuilderBlockValidationRequest) (err error) - ValidateBlockV2(ctx context.Context, block *types.BuilderBlockValidationRequestV2) (err error) - Kind() string -} - type Fallback interface { - AddClient() + AddClient(cli client.Client) } type Manager struct { - fb *fallback.Fallback + fb Fallback l log.Logger ws *gethws.ReConn } -func NewManager(l log.Logger, fb *fallback.Fallback) (m *Manager) { +func NewManager(l log.Logger, fb Fallback) (m *Manager) { return &Manager{ l: l, fb: fb, } } -func (m *Manager) AddRPCClient(ctx context.Context, simHttpAddr string) { - simRPCCli := gethrpc.NewClient(gethSimNamespace, simHttpAddr) +func (m *Manager) AddRPCClient(ctx context.Context, address string) { + if address == "" { + return + } + simRPCCli := gethrpc.NewClient(gethSimNamespace, address) if err := simRPCCli.Dial(ctx); err != nil { - m.l.WithError(err).Fatalf("fail to initialize rpc connection (%s): %w", simHttpAddr, err) + m.l.WithError(err).Fatalf("fail to initialize rpc connection (%s): %w", address, err) return } m.fb.AddClient(simRPCCli) } func (m *Manager) AddWsClients(ctx context.Context, address string, retry bool) { - //if len(cfg.BlockSimulation.WS.Address) > 0 {//} - //for _, s := range cfg.BlockSimulation.WS.Address { + if address == "" { + return + } if m.ws == nil { m.ws = gethws.NewReConn(m.l) simWSCli := gethws.NewClient(m.ws, gethSimNamespace, retry, m.l) @@ -63,8 +60,20 @@ func (m *Manager) AddWsClients(ctx context.Context, address string, retry bool) } -func (m *Manager) AddHTTPClient(ctx context.Context, simHttpAddr string) { - // if simHttpAddr := cfg.BlockSimulation.HTTP.Address; simHttpAddr != "" { //} - simHTTPCli := gethhttp.NewClient(simHttpAddr, gethSimNamespace, m.l) - m.fb.AddClient(simHTTPCli) +func (m *Manager) AddHTTPClient(ctx context.Context, address string) { + if address == "" { + return + } + m.fb.AddClient(gethhttp.NewClient(address, gethSimNamespace, m.l)) +} + +/* +func (m *Manager) OnConfigChange(c structs.OldNew) (err error) { + switch c.ParamPath { + case "block_simulation.ws.address": + case "block_simulation.http.address": + case "block_simulation.rpc.address": + } + return nil } +*/ diff --git a/stream/stream.go b/stream/stream.go index 25e561f0..89891b9f 100644 --- a/stream/stream.go +++ b/stream/stream.go @@ -32,7 +32,6 @@ type Pubsub interface { type StreamConfig struct { Logger log.Logger ID string - TTL time.Duration PubsubTopic string // pubsub topic name for block submissions StreamQueueSize int } diff --git a/structs/config.go b/structs/config.go index ec6fded8..2e673dd0 100644 --- a/structs/config.go +++ b/structs/config.go @@ -1,7 +1,8 @@ package structs type OldNew struct { - Name string - Old any - New any + Name string + ParamPath string + Old any + New any }