Move to config part 3 (#174)
* Init validation config

* Skip on empty

* move secretKey into config

* latest changes

* next changes

* Correct fallback

* Distributed config

* next batch

* Next batch of configuration changes

* next batch

* cleanup

* reenable distributed check

* remove time package

* Change TTL place

* Remove comments

* Type and init fixes
lukanus authored Jun 15, 2023
1 parent e95a97a commit a2425ab
Showing 24 changed files with 471 additions and 458 deletions.
4 changes: 3 additions & 1 deletion .gitignore
@@ -29,4 +29,6 @@ cmd/scratch/*

.data
/dreamboat
/migration-postgres
/migration-postgres

config.ini
6 changes: 3 additions & 3 deletions api/api_test.go
@@ -111,7 +111,7 @@ func TestServerRouting(t *testing.T) {

service := mock_relay.NewMockRelay(ctrl)
c, _ := lru.New[[48]byte, *rate.Limiter](1000)
server := api.NewApi(logger, &ee, service, nil, nil, api.NewLimitter(1, 1, c, nil), TestDataLimit, false)
server := api.NewApi(logger, &ee, service, nil, nil, api.NewLimitter(1, 1, c), TestDataLimit, false)
m := http.NewServeMux()
server.AttachToHandler(m)

@@ -403,7 +403,7 @@ func BenchmarkAPISequential(b *testing.B) {
service := mock_relay.NewMockRelay(ctrl)
register := mock_relay.NewMockRegistrations(ctrl)
//Log: log.New(log.WithWriter(ioutil.Discard)),
server := api.NewApi(logger, &ee, service, register, nil, api.NewLimitter(1, 1, nil, nil), TestDataLimit, false)
server := api.NewApi(logger, &ee, service, register, nil, api.NewLimitter(1, 1, nil), TestDataLimit, false)
m := http.NewServeMux()
server.AttachToHandler(m)

@@ -434,7 +434,7 @@ func BenchmarkAPIParallel(b *testing.B) {

register := mock_relay.NewMockRegistrations(ctrl)
//Log: log.New(log.WithWriter(ioutil.Discard)),
server := api.NewApi(logger, &ee, service, register, nil, api.NewLimitter(1, 1, nil, nil), TestDataLimit, false)
server := api.NewApi(logger, &ee, service, register, nil, api.NewLimitter(1, 1, nil), TestDataLimit, false)
m := http.NewServeMux()
server.AttachToHandler(m)

39 changes: 26 additions & 13 deletions api/limitter.go
@@ -25,12 +25,11 @@ type Limitter struct {
LimitterCacheSize int
}

func NewLimitter(ratel int, burst int, c Cache, ab map[[48]byte]struct{}) *Limitter {
func NewLimitter(ratel int, burst int, c Cache) *Limitter {
return &Limitter{
AllowedBuilders: ab,
c: c,
RateLimit: rate.Limit(ratel),
Burst: burst,
c: c,
RateLimit: rate.Limit(ratel),
Burst: burst,
}
}

@@ -51,6 +50,11 @@ func (l *Limitter) Allow(ctx context.Context, pubkey [48]byte) error {
}

return nil
}

func (l *Limitter) ParseInitialConfig(keys []string) (err error) {
l.AllowedBuilders, err = makeKeyMap(keys)
return err

}

@@ -68,16 +72,25 @@ func (l *Limitter) OnConfigChange(c structs.OldNew) (err error) {
}
case "AllowedBuilders":
if keys, ok := c.New.([]string); ok {
newKeys := make(map[[48]byte]struct{})
for _, key := range keys {
var pk types.PublicKey
if err = pk.UnmarshalText([]byte(key)); err != nil {
return fmt.Errorf("ALLOWED BUILDER NOT ADDED - wrong public key: %s - %w", key, err)
}
newKeys[pk] = struct{}{}
ab, err := makeKeyMap(keys)
if err != nil {
return err
}
l.AllowedBuilders = newKeys
l.c.Purge()
l.AllowedBuilders = ab
}
}
return nil
}

func makeKeyMap(keys []string) (map[[48]byte]struct{}, error) {
newKeys := make(map[[48]byte]struct{})
for _, key := range keys {
var pk types.PublicKey
if err := pk.UnmarshalText([]byte(key)); err != nil {
return nil, fmt.Errorf("allowed builder not added - wrong public key: %s - %w", key, err)
}
newKeys[pk] = struct{}{}
}
return newKeys, nil
}
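
For orientation, here is a minimal usage sketch of the reworked limitter wiring (the module import path and the placeholder key are assumptions, not part of this commit): the allowed-builder map is no longer a constructor argument; it is parsed from config strings via ParseInitialConfig, and OnConfigChange can later replace it after purging the rate-limiter cache.

package main

import (
	"log"
	"strings"

	lru "github.com/hashicorp/golang-lru/v2"
	"golang.org/x/time/rate"

	"github.com/blocknative/dreamboat/api" // import path assumed
)

func main() {
	// Placeholder 48-byte pubkey (96 hex chars); real values come from
	// the allowed_builders config entry.
	allowed := []string{"0x" + strings.Repeat("aa", 48)}

	// LRU cache keyed by builder pubkey, as in the tests above.
	c, err := lru.New[[48]byte, *rate.Limiter](1000)
	if err != nil {
		log.Fatal(err)
	}

	// New signature: no allowed-builder map argument.
	l := api.NewLimitter(1, 1, c)

	// The initial allow-list now comes from config strings.
	if err := l.ParseInitialConfig(allowed); err != nil {
		log.Fatal(err)
	}
}
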
4 changes: 0 additions & 4 deletions beacon/manager.go
@@ -14,10 +14,6 @@ import (
"github.com/lthibault/log"
)

const (
Version = "0.3.6"
)

var (
ErrUnkownFork = errors.New("beacon node fork is unknown")
)
81 changes: 50 additions & 31 deletions cmd/dreamboat/config/config.go
@@ -44,9 +44,9 @@ type Config struct {
}

var DefaultHTTPConfig = &HTTPConfig{
ReadTimeout: 2 * time.Second,
WriteTimeout: 2 * time.Second,
IdleTimeout: 2 * time.Second,
ReadTimeout: 5 * time.Second,
WriteTimeout: 5 * time.Second,
IdleTimeout: 5 * time.Second,
}

type HTTPConfig struct {
@@ -73,7 +73,7 @@ type SQLConfig struct {
}

var DefaultBadgerDBConfig = &BadgerDBConfig{
TTL: 48 * time.Hour,
TTL: 24 * time.Hour,
}

type BadgerDBConfig struct {
@@ -110,8 +110,9 @@ type ApiConfig struct {
}

var DefaultRelayConfig = &RelayConfig{
PublishBlock: true,
MaxBlockPublishDelay: 500 * time.Millisecond,
PublishBlock: true,
GetPayloadResponseDelay: 800 * time.Millisecond,
GetPayloadRequestTimeLimit: 4 * time.Second,
}

type RelayConfig struct {
@@ -127,7 +128,10 @@ type RelayConfig struct {
PublishBlock bool `config:"publish_block"`

// block publish delay
MaxBlockPublishDelay time.Duration `config:"max_block_publish_delay,allow_dynamic"`
GetPayloadResponseDelay time.Duration `config:"get_payload_response_delay,allow_dynamic"`

// deadline for calling get Payload
GetPayloadRequestTimeLimit time.Duration `config:"get_payload_request_time_limit,allow_dynamic"`

// comma separated list of allowed builder pubkeys"
AllowedBuilders []string `config:"allowed_builders,allow_dynamic"` // map[[48]byte]struct{}
@@ -136,7 +140,7 @@ type RelayConfig struct {
var DefaultBeaconConfig = &BeaconConfig{
PayloadAttributesSubscription: true,
EventRestart: 5,
EventTimeout: 26 * time.Second,
EventTimeout: 16 * time.Second,
QueryTimeout: 20 * time.Second,
}

@@ -162,12 +166,18 @@ type BeaconConfig struct {
QueryTimeout time.Duration `config:"query_timeout"`
}

var DefaultBlockSimulation = &BlockSimulationConfig{}
var DefaultBlockSimulation = &BlockSimulationConfig{
WS: &BlockSimulationWSConfig{
Retry: true,
},
RPC: &BlockSimulationRPCConfig{},
HTTP: &BlockSimulationHTTPConfig{},
}

type BlockSimulationConfig struct {
RPC BlockSimulationRPCConfig `config:"rpc"`
WS BlockSimulationWSConfig `config:"ws"`
HTTP BlockSimulationHTTPConfig `config:"http"`
RPC *BlockSimulationRPCConfig `config:"rpc"`
WS *BlockSimulationWSConfig `config:"ws"`
HTTP *BlockSimulationHTTPConfig `config:"http"`
}

type BlockSimulationRPCConfig struct {
@@ -180,7 +190,7 @@ type BlockSimulationWSConfig struct {
// block validation endpoint address (comma separated list)
Address []string `config:"address,allow_dynamic"`
// retry to other websocket connections on failure"
Retry bool `config:"retry"`
Retry bool `config:"retry,allow_dynamic"`
}

type BlockSimulationHTTPConfig struct {
Expand All @@ -196,11 +206,11 @@ type ValidatorsConfig struct {
// The size of response queue, should be set to expected number of validators in one request
QueueSize uint `config:"queue_size"`
// Number of workers storing validators in parallel
StoreWorkersNum uint64 `config:"store_workers"`
StoreWorkersNum uint `config:"store_workers"`
// Registrations cache size
RegistrationsCacheSize int `config:"registrations_cache_size"`
// Registrations cache ttl
RegistrationsCacheTTL time.Duration `config:"registrations_cache_ttl"`
RegistrationsReadCacheTTL time.Duration `config:"registrations_cache_ttl"`
// Registrations cache ttl
RegistrationsWriteCacheTTL time.Duration `config:"registrations_write_cache_ttl"`
}
@@ -211,7 +221,7 @@ var DefaultValidatorsConfig = &ValidatorsConfig{
QueueSize: 100_000,
StoreWorkersNum: 400,
RegistrationsCacheSize: 600_000,
RegistrationsCacheTTL: time.Hour,
RegistrationsReadCacheTTL: time.Hour,
RegistrationsWriteCacheTTL: 12 * time.Hour,
}

@@ -243,37 +253,45 @@ type PayloadConfig struct {
// BadgerDB config
Badger BadgerDBConfig `config:"badger"`
// number of payloads to cache for fast in-memory reads

CacheSize int `config:"cache_size"`

// Redis config
Redis RedisDBConfig `config:"redis"`

// TTL of payload data
TTL time.Duration `config:"TTL,allow_dynamic"`
}

var DefaultPayloadConfig = &PayloadConfig{
Badger: *DefaultBadgerDBConfig,
TTL: 24 * time.Hour,
CacheSize: 1_000,
Redis: *DefaultRedisDBConfig,
}

type RedisDBConfig struct {
Master RedisConfig `config:"master"`
Replica RedisConfig `config:"replica"`
Read *RedisConfig `config:"read"`
Write *RedisConfig `config:"write"`
}

type WarehouseConfig struct {
Enabled bool `config:"enabled"`
var DefaultRedisDBConfig = &RedisDBConfig{
Read: &RedisConfig{},
Write: &RedisConfig{},
}

type WarehouseConfig struct {
// Data directory where the data is stored in the warehouse
Directory string `config:"directory"`

// Number of workers for storing data in warehouse, if 0, then data is not exported
WorkerNumber int `config:"workers"`

// Size of the buffer for processing requests
Buffer int `config:"directory"`
Buffer int `config:"buffer"`
}

var DefaultWarehouseConfig = &WarehouseConfig{
Enabled: true,
Directory: "/data/relay/warehouse",
WorkerNumber: 32,
Buffer: 1_000,
@@ -282,24 +300,25 @@ var DefaultWarehouseConfig = &WarehouseConfig{
type DistributedConfig struct {
Redis *RedisStreamConfig `config:"redis"`

Enabled bool `config:"enabled"`
InstanceID string `config:"id"`

// Number of workers for storing data in warehouse, if 0, then data is not exported
WorkerNumber int `config:"workers"`

// publish all submitted blocks into pubsub. If false, only blocks returned in GetHeader are published
PublishOnSubmission bool `config:"publish_on_submission"`

// Stream internal channel size
StreamQueueSize int
StreamQueueSize int `config:"stream_queue_size"`

// stream entire block for every bid that is served in GetHeader requests.
StreamServedBids bool `config:"stream_served_bids"`
}

var DefaultDistributedConfig = &DistributedConfig{
Enabled: true,
WorkerNumber: 100,
PublishOnSubmission: false,
StreamQueueSize: 200,
Redis: &RedisStreamConfig{
Topic: "relay",
},
WorkerNumber: 100,
StreamQueueSize: 100,
StreamServedBids: true,
}

type RedisStreamConfig struct {
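
To make the reshaped defaults easier to scan, a small sketch that prints them (module import path assumed; the fields are the ones shown in the diff above): the single MaxBlockPublishDelay knob is split into a getPayload response delay plus a hard request deadline, payload Redis is split into read/write endpoints instead of master/replica, and the distributed stream gets an explicit queue size and a served-bids flag.

package main

import (
	"fmt"

	"github.com/blocknative/dreamboat/cmd/dreamboat/config" // import path assumed
)

func main() {
	r := config.DefaultRelayConfig
	// MaxBlockPublishDelay is replaced by a response delay and a request
	// deadline for getPayload; both carry allow_dynamic.
	fmt.Println(r.PublishBlock, r.GetPayloadResponseDelay, r.GetPayloadRequestTimeLimit)

	p := config.DefaultPayloadConfig
	// Redis read/write endpoints replace the master/replica pair.
	fmt.Printf("cache=%d ttl=%s read=%+v write=%+v\n",
		p.CacheSize, p.TTL, p.Redis.Read, p.Redis.Write)

	d := config.DefaultDistributedConfig
	fmt.Println(d.WorkerNumber, d.StreamQueueSize, d.StreamServedBids, d.Redis.Topic)
}
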
2 changes: 2 additions & 0 deletions cmd/dreamboat/config/manager.go
@@ -33,6 +33,8 @@ func DefaultConfig() Config {
Validators: DefaultValidatorsConfig,
Payload: DefaultPayloadConfig,
DataAPI: DefaultDataAPIConfig,
Warehouse: DefaultWarehouseConfig,
Distributed: DefaultDistributedConfig,
}
c.ExternalHttp.Address = "0.0.0.0:18550"
c.InternalHttp.Address = "0.0.0.0:19550"
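
And a quick check (same assumed import path) that the manager wires the new sections in: DefaultConfig now carries the warehouse and distributed defaults alongside the existing validators, payload and data-API ones.

package main

import (
	"fmt"

	"github.com/blocknative/dreamboat/cmd/dreamboat/config" // import path assumed
)

func main() {
	cfg := config.DefaultConfig()
	fmt.Println(cfg.Warehouse.Directory)     // "/data/relay/warehouse"
	fmt.Println(cfg.Distributed.Redis.Topic) // "relay"
	fmt.Println(cfg.ExternalHttp.Address)    // "0.0.0.0:18550"
}
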