diff --git a/datastore/datastore_test.go b/datastore/datastore_test.go index 8278d253..f0969951 100644 --- a/datastore/datastore_test.go +++ b/datastore/datastore_test.go @@ -15,9 +15,6 @@ import ( "github.com/flashbots/go-boost-utils/bls" "github.com/flashbots/go-boost-utils/types" - ds "github.com/ipfs/go-datastore" - - ds_sync "github.com/ipfs/go-datastore/sync" "github.com/stretchr/testify/require" ) @@ -28,7 +25,7 @@ func TestPutGetPayload(t *testing.T) { defer cancel() store := newMockDatastore() - cache, _ := lru.New[structs.PayloadKey, *structs.BlockBidAndTrace](10) + cache, _ := lru.New[structs.PayloadKey, structs.BlockBidAndTrace](10) ds := datastore.Datastore{TTLStorage: store, PayloadCache: cache} payload := randomBlockBidAndTrace() @@ -158,32 +155,3 @@ func random256Bytes() (b [256]byte) { rand.Read(b[:]) return b } - -var _ datastore.TTLStorage = (*mockDatastore)(nil) - -type mockDatastore struct{ ds.Datastore } - -func newMockDatastore() mockDatastore { - return mockDatastore{ds_sync.MutexWrap(ds.NewMapDatastore())} -} - -func (d mockDatastore) PutWithTTL(ctx context.Context, key ds.Key, value []byte, ttl time.Duration) error { - go func() { - time.Sleep(ttl) - d.Delete(ctx, key) - }() - - return d.Datastore.Put(ctx, key, value) -} - -func (d mockDatastore) GetBatch(ctx context.Context, keys []ds.Key) (batch [][]byte, err error) { - for _, key := range keys { - data, err := d.Datastore.Get(ctx, key) - if err != nil { - continue - } - batch = append(batch, data) - } - - return -} diff --git a/datastore/evidence/badger/badger_test.go b/datastore/evidence/badger/payload_test.go similarity index 89% rename from datastore/evidence/badger/badger_test.go rename to datastore/evidence/badger/payload_test.go index a8b859ba..1649a09e 100644 --- a/datastore/evidence/badger/badger_test.go +++ b/datastore/evidence/badger/payload_test.go @@ -12,7 +12,7 @@ import ( dbbadger "github.com/ipfs/go-ds-badger2" - "github.com/blocknative/dreamboat/pkg/datastore/evidence/badger" + "github.com/blocknative/dreamboat/datastore/evidence/badger" "github.com/stretchr/testify/require" ) @@ -59,14 +59,14 @@ func TestPutGetHeaderDelivered(t *testing.T) { require.ErrorIs(t, err, ds.ErrNotFound) // get by block number - _, err = d.GetDeliveredPayloads(ctx, uint64(slotInt+1), structs.PayloadTraceQuery{BlockNum: header.Header.BlockNumber}) + _, err = d.GetDeliveredPayloads(ctx, uint64(slotInt+1), structs.PayloadTraceQuery{BlockNum: header.Header.GetBlockNumber()}) require.ErrorIs(t, err, ds.ErrNotFound) _, err = d.GetDeliveredPayloads(ctx, uint64(slotInt+1), structs.PayloadTraceQuery{Pubkey: header.Trace.ProposerPubkey}) require.ErrorIs(t, err, ds.ErrNotFound) // set as delivered and retrieve again - err = d.PutDelivered(ctx, slot, structs.DeliveredTrace{Trace: *header.Trace, BlockNumber: header.Header.BlockNumber}, time.Minute) + err = d.PutDelivered(ctx, slot, structs.DeliveredTrace{Trace: header.Trace, BlockNumber: header.Header.GetBlockNumber()}, time.Minute) require.NoError(t, err) // get @@ -80,7 +80,7 @@ func TestPutGetHeaderDelivered(t *testing.T) { require.EqualValues(t, header.Trace.Value, gotHeader[0].BidTrace.Value) // get by block number - gotHeader, err = d.GetDeliveredPayloads(ctx, uint64(slotInt+1), structs.PayloadTraceQuery{BlockNum: header.Header.BlockNumber}) + gotHeader, err = d.GetDeliveredPayloads(ctx, uint64(slotInt+1), structs.PayloadTraceQuery{BlockNum: header.Header.GetBlockNumber()}) require.NoError(t, err) require.EqualValues(t, header.Trace.Value, gotHeader[0].BidTrace.Value) diff --git a/datastore/evidence/postgres/blocks.go b/datastore/evidence/postgres/blocks.go new file mode 100644 index 00000000..5356e0bf --- /dev/null +++ b/datastore/evidence/postgres/blocks.go @@ -0,0 +1,131 @@ +package dspostgres + +import ( + "context" + "database/sql" + "fmt" + "strconv" + "strings" + "time" + + "github.com/blocknative/dreamboat/structs" +) + +func (s *Datastore) PutBuilderBlockSubmission(ctx context.Context, bid structs.BidTraceWithTimestamp, isMostProfitable bool) (err error) { + _, err = s.DB.ExecContext(ctx, `INSERT INTO builder_block_submission + ( relay_id, slot, parent_hash, block_hash, builder_pubkey, proposer_pubkey, proposer_fee_recipient, + gas_used, gas_limit, value, epoch, num_tx, block_number, was_most_profitable, block_time) + VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15) + ON CONFLICT (relay_id, slot, proposer_pubkey, block_hash) + DO UPDATE SET + parent_hash = EXCLUDED.parent_hash, + builder_pubkey = EXCLUDED.builder_pubkey, + proposer_fee_recipient = EXCLUDED.proposer_fee_recipient, + gas_used = EXCLUDED.gas_used, + gas_limit = EXCLUDED.gas_limit, + value = EXCLUDED.value, + epoch = EXCLUDED.epoch, + num_tx = EXCLUDED.num_tx, + block_number = EXCLUDED.block_number, + was_most_profitable = EXCLUDED.was_most_profitable, + block_time = EXCLUDED.block_time, + inserted_at = NOW()`, + s.RelayID, bid.Slot, bid.ParentHash.String(), bid.BlockHash.String(), bid.BuilderPubkey.String(), bid.ProposerPubkey.String(), + bid.ProposerFeeRecipient.String(), bid.GasUsed, bid.GasLimit, bid.Value.String(), uint64(bid.Slot)/uint64(SlotsPerEpoch), + bid.NumTx, bid.BlockNumber, isMostProfitable, time.UnixMilli(int64(bid.TimestampMs))) + return err +} + +func (s *Datastore) GetBuilderBlockSubmissions(ctx context.Context, headSlot uint64, payload structs.SubmissionTraceQuery) (bts []structs.BidTraceWithTimestamp, err error) { + var i = 1 + parts := []string{"relay_id = $" + strconv.Itoa(i)} + data := []interface{}{s.RelayID} + i++ + + if payload.Slot > 0 { + parts = append(parts, "slot = $"+strconv.Itoa(i)) + data = append(data, payload.Slot) + i++ + } + + if payload.BlockHash != [32]byte{} { + parts = append(parts, "block_hash = $"+strconv.Itoa(i)) + data = append(data, payload.BlockHash.String()) + i++ + } + + if payload.BlockNum > 0 { + parts = append(parts, "block_number = $"+strconv.Itoa(i)) + data = append(data, payload.BlockNum) + i++ + } + /* + if payload. != "" { + parts = append(parts, "builder_pubkey = $"+strconv.Itoa(i)) + data = append(data, payload.BuilderPubkey) + i++ + } + */ + qBuilder := strings.Builder{} + qBuilder.WriteString(`SELECT block_time, slot, builder_pubkey, proposer_pubkey, proposer_fee_recipient, parent_hash, block_hash, value, gas_used, gas_limit, block_number, num_tx FROM builder_block_submission `) + + if len(parts) > 0 { + qBuilder.WriteString(" WHERE ") + for i, par := range parts { + if i != 0 { + qBuilder.WriteString(" AND ") + } + qBuilder.WriteString(par) + } + } + + qBuilder.WriteString(` ORDER BY slot DESC, block_time DESC, block_hash DESC LIMIT $` + strconv.Itoa(i)) + data = append(data, payload.Limit) + rows, err := s.DB.QueryContext(ctx, qBuilder.String(), data...) + switch { + case err == sql.ErrNoRows: + return nil, ErrNoRows + case err != nil: + return nil, fmt.Errorf("query error: %w", err) + default: + } + + defer rows.Close() + var ( + builderpubkey []byte + proposerPubkey []byte + proposerFeeRecipient []byte + parentHash []byte + blockHash []byte + value []byte + ) + for rows.Next() { + + bt := structs.BidTraceWithTimestamp{} + t := time.Time{} + err = rows.Scan(&t, &bt.Slot, &builderpubkey, &proposerPubkey, &proposerFeeRecipient, &parentHash, &blockHash, &value, + &bt.GasUsed, &bt.GasLimit, &bt.BlockNumber, &bt.NumTx) + if err != nil { + return nil, err + } + bt.BuilderPubkey.UnmarshalText(builderpubkey) + bt.ProposerPubkey.UnmarshalText(proposerPubkey) + bt.ProposerFeeRecipient.UnmarshalText(proposerFeeRecipient) + bt.ParentHash.UnmarshalText(parentHash) + bt.BlockHash.UnmarshalText(blockHash) + bt.Value.UnmarshalText(value) + + bt.Timestamp = uint64(t.Unix()) + bt.TimestampMs = uint64(t.UnixMicro()) + bts = append(bts, bt) + } + return bts, err +} + +type GetBuilderSubmissionsFilters struct { + Slot uint64 + Limit uint64 + BlockHash string + BlockNumber uint64 + BuilderPubkey string +} diff --git a/datastore/evidence/postgres/payload.go b/datastore/evidence/postgres/payload.go new file mode 100644 index 00000000..8fbda23e --- /dev/null +++ b/datastore/evidence/postgres/payload.go @@ -0,0 +1,162 @@ +package dspostgres + +import ( + "context" + "database/sql" + "fmt" + "strconv" + "strings" + "time" + + "github.com/blocknative/dreamboat/structs" +) + +func (s *Datastore) PutDelivered(ctx context.Context, slot structs.Slot, payload structs.DeliveredTrace, ttl time.Duration) (err error) { + _, err = s.DB.ExecContext(ctx, `INSERT INTO payload_delivered + ( relay_id, slot, epoch, builder_pubkey, proposer_pubkey, proposer_fee_recipient, parent_hash, block_hash, num_tx, block_number, gas_used, gas_limit, value ) + VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13) + ON CONFLICT (relay_id, slot, proposer_pubkey, block_hash) + DO UPDATE SET + parent_hash = EXCLUDED.parent_hash, + builder_pubkey = EXCLUDED.builder_pubkey, + proposer_fee_recipient = EXCLUDED.proposer_fee_recipient, + gas_used = EXCLUDED.gas_used, + gas_limit = EXCLUDED.gas_limit, + value = EXCLUDED.value, + epoch = EXCLUDED.epoch, + num_tx = EXCLUDED.num_tx, + block_number = EXCLUDED.block_number, + inserted_at = NOW()`, + s.RelayID, uint64(slot), uint64(slot)/uint64(SlotsPerEpoch), payload.Trace.BidTraceExtended.BuilderPubkey.String(), payload.Trace.BidTraceExtended.ProposerPubkey.String(), + payload.Trace.BidTraceExtended.ProposerFeeRecipient.String(), payload.Trace.BidTraceExtended.ParentHash.String(), payload.Trace.BidTraceExtended.BlockHash.String(), + payload.Trace.BidTraceExtended.NumTx, payload.BlockNumber, payload.Trace.BidTraceExtended.GasUsed, payload.Trace.BidTraceExtended.GasLimit, + payload.Trace.BidTraceExtended.Value.String()) + return err +} + +func (s *Datastore) GetDeliveredPayloads(ctx context.Context, headSlot uint64, queryArgs structs.PayloadTraceQuery) (bts []structs.BidTraceExtended, err error) { + var i = 1 + parts := []string{"relay_id = $" + strconv.Itoa(i)} + data := []interface{}{s.RelayID} + i++ + + if queryArgs.Slot > 0 { + parts = append(parts, "slot = $"+strconv.Itoa(i)) + data = append(data, queryArgs.Slot) + i++ + } else if queryArgs.Cursor > 0 { + parts = append(parts, "slot <= $"+strconv.Itoa(i)) + data = append(data, queryArgs.Cursor) + i++ + } + + if queryArgs.BlockHash != [32]byte{} { + parts = append(parts, "block_hash = $"+strconv.Itoa(i)) + data = append(data, queryArgs.BlockHash) + i++ + } + + if queryArgs.BlockNum > 0 { + parts = append(parts, "block_number = $"+strconv.Itoa(i)) + data = append(data, queryArgs.BlockNum) + i++ + } + + if queryArgs.Pubkey.String() != "" { + parts = append(parts, "builder_pubkey = $"+strconv.Itoa(i)) + data = append(data, queryArgs.Pubkey.String()) + i++ + } + + // TODO(l): BUG? Unsupported in relay? + /* + if queryArgs.ProposerPubkey != "" { + parts = append(parts, "proposer_pubkey = $"+strconv.Itoa(i)) + data = append(data, queryArgs.ProposerPubkey) + i++ + } + */ + qBuilder := strings.Builder{} + qBuilder.WriteString(`SELECT slot, builder_pubkey, proposer_pubkey, proposer_fee_recipient, parent_hash, block_hash, block_number, num_tx, value, gas_used, gas_limit FROM payload_delivered `) + + if len(parts) > 0 { + qBuilder.WriteString(" WHERE ") + for i, par := range parts { + if i != 0 { + qBuilder.WriteString(" AND ") + } + qBuilder.WriteString(par) + } + } + + // if filters.OrderByValue > 0 { + // qBuilder.WriteString(` ORDER BY value ASC `) + // } else if filters.OrderByValue < 0 { + // qBuilder.WriteString(` ORDER BY value DESC `) + // } else { + qBuilder.WriteString(` ORDER BY slot DESC, inserted_at DESC `) + + if queryArgs.Limit > 0 { + qBuilder.WriteString(` LIMIT $` + strconv.Itoa(i)) + data = append(data, queryArgs.Limit) + } + + rows, err := s.DB.QueryContext(ctx, qBuilder.String(), data...) + switch { + case err == sql.ErrNoRows: + return nil, ErrNoRows + case err != nil: + return nil, fmt.Errorf("query error: %w", err) + default: + } + + defer rows.Close() + + var ( + builderpubkey []byte + proposerPubkey []byte + proposerFeeRecipient []byte + parentHash []byte + blockHash []byte + value []byte + ) + for rows.Next() { + bt := structs.BidTraceExtended{} + err = rows.Scan(&bt.Slot, &builderpubkey, &proposerPubkey, &proposerFeeRecipient, &parentHash, &blockHash, &bt.BlockNumber, &bt.NumTx, &value, &bt.GasUsed, &bt.GasLimit) + if err != nil { + return nil, err + } + + bt.BuilderPubkey.UnmarshalText(builderpubkey) + bt.ProposerPubkey.UnmarshalText(proposerPubkey) + bt.ProposerFeeRecipient.UnmarshalText(proposerFeeRecipient) + bt.ParentHash.UnmarshalText(parentHash) + bt.BlockHash.UnmarshalText(blockHash) + bt.Value.UnmarshalText(value) + + bts = append(bts, bt) + } + return bts, err +} + +/* +func (s *Datastore) CheckSlotDelivered(ctx context.Context, slot uint64) (bool, error) { + var sl uint64 + err := s.DB.QueryRowContext(ctx, "SELECT slot FROM payload_delivered WHERE slot = $1 LIMIT 1", slot).Scan(&sl) + if err == sql.ErrNoRows { + return false, nil + } + return sl == slot, err +} +*/ + +type GetDeliveredPayloadsFilters struct { + Slot uint64 + Cursor uint64 + Limit uint64 + BlockHash string + BlockNumber uint64 + ProposerPubkey string + BuilderPubkey string + OrderByValue int8 +} diff --git a/datastore/evidence/postgres/postgres.go b/datastore/evidence/postgres/postgres.go index 04f3e907..53fd48d1 100644 --- a/datastore/evidence/postgres/postgres.go +++ b/datastore/evidence/postgres/postgres.go @@ -1,15 +1,8 @@ package dspostgres import ( - "context" "database/sql" "errors" - "fmt" - "strconv" - "strings" - "time" - - "github.com/blocknative/dreamboat/structs" ) var SlotsPerEpoch = 32 @@ -25,270 +18,3 @@ func NewDatastore(db *sql.DB, relayID uint64) *Datastore { DB: db, RelayID: relayID} } - -func (s *Datastore) PutBuilderBlockSubmission(ctx context.Context, bid structs.BidTraceWithTimestamp, isMostProfitable bool) (err error) { - _, err = s.DB.ExecContext(ctx, `INSERT INTO builder_block_submission - ( relay_id, slot, parent_hash, block_hash, builder_pubkey, proposer_pubkey, proposer_fee_recipient, - gas_used, gas_limit, value, epoch, num_tx, block_number, was_most_profitable, block_time) - VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15) - ON CONFLICT (relay_id, slot, proposer_pubkey, block_hash) - DO UPDATE SET - parent_hash = EXCLUDED.parent_hash, - builder_pubkey = EXCLUDED.builder_pubkey, - proposer_fee_recipient = EXCLUDED.proposer_fee_recipient, - gas_used = EXCLUDED.gas_used, - gas_limit = EXCLUDED.gas_limit, - value = EXCLUDED.value, - epoch = EXCLUDED.epoch, - num_tx = EXCLUDED.num_tx, - block_number = EXCLUDED.block_number, - was_most_profitable = EXCLUDED.was_most_profitable, - block_time = EXCLUDED.block_time, - inserted_at = NOW()`, - s.RelayID, bid.Slot, bid.ParentHash.String(), bid.BlockHash.String(), bid.BuilderPubkey.String(), bid.ProposerPubkey.String(), - bid.ProposerFeeRecipient.String(), bid.GasUsed, bid.GasLimit, bid.Value.String(), uint64(bid.Slot)/uint64(SlotsPerEpoch), - bid.NumTx, bid.BlockNumber, isMostProfitable, time.UnixMilli(int64(bid.TimestampMs))) - return err -} - -func (s *Datastore) GetBuilderBlockSubmissions(ctx context.Context, headSlot uint64, payload structs.SubmissionTraceQuery) (bts []structs.BidTraceWithTimestamp, err error) { - var i = 1 - parts := []string{"relay_id = $" + strconv.Itoa(i)} - data := []interface{}{s.RelayID} - i++ - - if payload.Slot > 0 { - parts = append(parts, "slot = $"+strconv.Itoa(i)) - data = append(data, payload.Slot) - i++ - } - - if payload.BlockHash != [32]byte{} { - parts = append(parts, "block_hash = $"+strconv.Itoa(i)) - data = append(data, payload.BlockHash.String()) - i++ - } - - if payload.BlockNum > 0 { - parts = append(parts, "block_number = $"+strconv.Itoa(i)) - data = append(data, payload.BlockNum) - i++ - } - /* - if payload. != "" { - parts = append(parts, "builder_pubkey = $"+strconv.Itoa(i)) - data = append(data, payload.BuilderPubkey) - i++ - } - */ - qBuilder := strings.Builder{} - qBuilder.WriteString(`SELECT block_time, slot, builder_pubkey, proposer_pubkey, proposer_fee_recipient, parent_hash, block_hash, value, gas_used, gas_limit, block_number, num_tx FROM builder_block_submission `) - - if len(parts) > 0 { - qBuilder.WriteString(" WHERE ") - for i, par := range parts { - if i != 0 { - qBuilder.WriteString(" AND ") - } - qBuilder.WriteString(par) - } - } - - qBuilder.WriteString(` ORDER BY slot DESC, block_time DESC, block_hash DESC LIMIT $` + strconv.Itoa(i)) - data = append(data, payload.Limit) - rows, err := s.DB.QueryContext(ctx, qBuilder.String(), data...) - switch { - case err == sql.ErrNoRows: - return nil, ErrNoRows - case err != nil: - return nil, fmt.Errorf("query error: %w", err) - default: - } - - defer rows.Close() - var ( - builderpubkey []byte - proposerPubkey []byte - proposerFeeRecipient []byte - parentHash []byte - blockHash []byte - value []byte - ) - for rows.Next() { - - bt := structs.BidTraceWithTimestamp{} - t := time.Time{} - err = rows.Scan(&t, &bt.Slot, &builderpubkey, &proposerPubkey, &proposerFeeRecipient, &parentHash, &blockHash, &value, - &bt.GasUsed, &bt.GasLimit, &bt.BlockNumber, &bt.NumTx) - if err != nil { - return nil, err - } - bt.BuilderPubkey.UnmarshalText(builderpubkey) - bt.ProposerPubkey.UnmarshalText(proposerPubkey) - bt.ProposerFeeRecipient.UnmarshalText(proposerFeeRecipient) - bt.ParentHash.UnmarshalText(parentHash) - bt.BlockHash.UnmarshalText(blockHash) - bt.Value.UnmarshalText(value) - - bt.Timestamp = uint64(t.Unix()) - bt.TimestampMs = uint64(t.UnixMicro()) - bts = append(bts, bt) - } - return bts, err -} - -func (s *Datastore) PutDelivered(ctx context.Context, slot structs.Slot, payload structs.DeliveredTrace, ttl time.Duration) (err error) { - _, err = s.DB.ExecContext(ctx, `INSERT INTO payload_delivered - ( relay_id, slot, epoch, builder_pubkey, proposer_pubkey, proposer_fee_recipient, parent_hash, block_hash, num_tx, block_number, gas_used, gas_limit, value ) - VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13) - ON CONFLICT (relay_id, slot, proposer_pubkey, block_hash) - DO UPDATE SET - parent_hash = EXCLUDED.parent_hash, - builder_pubkey = EXCLUDED.builder_pubkey, - proposer_fee_recipient = EXCLUDED.proposer_fee_recipient, - gas_used = EXCLUDED.gas_used, - gas_limit = EXCLUDED.gas_limit, - value = EXCLUDED.value, - epoch = EXCLUDED.epoch, - num_tx = EXCLUDED.num_tx, - block_number = EXCLUDED.block_number, - inserted_at = NOW()`, - s.RelayID, uint64(slot), uint64(slot)/uint64(SlotsPerEpoch), payload.Trace.BidTraceExtended.BuilderPubkey.String(), payload.Trace.BidTraceExtended.ProposerPubkey.String(), - payload.Trace.BidTraceExtended.ProposerFeeRecipient.String(), payload.Trace.BidTraceExtended.ParentHash.String(), payload.Trace.BidTraceExtended.BlockHash.String(), - payload.Trace.BidTraceExtended.NumTx, payload.BlockNumber, payload.Trace.BidTraceExtended.GasUsed, payload.Trace.BidTraceExtended.GasLimit, - payload.Trace.BidTraceExtended.Value.String()) - return err -} - -func (s *Datastore) GetDeliveredPayloads(ctx context.Context, headSlot uint64, queryArgs structs.PayloadTraceQuery) (bts []structs.BidTraceExtended, err error) { - var i = 1 - parts := []string{"relay_id = $" + strconv.Itoa(i)} - data := []interface{}{s.RelayID} - i++ - - if queryArgs.Slot > 0 { - parts = append(parts, "slot = $"+strconv.Itoa(i)) - data = append(data, queryArgs.Slot) - i++ - } else if queryArgs.Cursor > 0 { - parts = append(parts, "slot <= $"+strconv.Itoa(i)) - data = append(data, queryArgs.Cursor) - i++ - } - - if queryArgs.BlockHash != [32]byte{} { - parts = append(parts, "block_hash = $"+strconv.Itoa(i)) - data = append(data, queryArgs.BlockHash) - i++ - } - - if queryArgs.BlockNum > 0 { - parts = append(parts, "block_number = $"+strconv.Itoa(i)) - data = append(data, queryArgs.BlockNum) - i++ - } - - if queryArgs.Pubkey.String() != "" { - parts = append(parts, "builder_pubkey = $"+strconv.Itoa(i)) - data = append(data, queryArgs.Pubkey.String()) - i++ - } - - // TODO(l): BUG? Unsupported in relay? - /* - if queryArgs.ProposerPubkey != "" { - parts = append(parts, "proposer_pubkey = $"+strconv.Itoa(i)) - data = append(data, queryArgs.ProposerPubkey) - i++ - } - */ - qBuilder := strings.Builder{} - qBuilder.WriteString(`SELECT slot, builder_pubkey, proposer_pubkey, proposer_fee_recipient, parent_hash, block_hash, block_number, num_tx, value, gas_used, gas_limit FROM payload_delivered `) - - if len(parts) > 0 { - qBuilder.WriteString(" WHERE ") - for i, par := range parts { - if i != 0 { - qBuilder.WriteString(" AND ") - } - qBuilder.WriteString(par) - } - } - - // if filters.OrderByValue > 0 { - // qBuilder.WriteString(` ORDER BY value ASC `) - // } else if filters.OrderByValue < 0 { - // qBuilder.WriteString(` ORDER BY value DESC `) - // } else { - qBuilder.WriteString(` ORDER BY slot DESC, inserted_at DESC `) - - if queryArgs.Limit > 0 { - qBuilder.WriteString(` LIMIT $` + strconv.Itoa(i)) - data = append(data, queryArgs.Limit) - } - - rows, err := s.DB.QueryContext(ctx, qBuilder.String(), data...) - switch { - case err == sql.ErrNoRows: - return nil, ErrNoRows - case err != nil: - return nil, fmt.Errorf("query error: %w", err) - default: - } - - defer rows.Close() - - var ( - builderpubkey []byte - proposerPubkey []byte - proposerFeeRecipient []byte - parentHash []byte - blockHash []byte - value []byte - ) - for rows.Next() { - bt := structs.BidTraceExtended{} - err = rows.Scan(&bt.Slot, &builderpubkey, &proposerPubkey, &proposerFeeRecipient, &parentHash, &blockHash, &bt.BlockNumber, &bt.NumTx, &value, &bt.GasUsed, &bt.GasLimit) - if err != nil { - return nil, err - } - - bt.BuilderPubkey.UnmarshalText(builderpubkey) - bt.ProposerPubkey.UnmarshalText(proposerPubkey) - bt.ProposerFeeRecipient.UnmarshalText(proposerFeeRecipient) - bt.ParentHash.UnmarshalText(parentHash) - bt.BlockHash.UnmarshalText(blockHash) - bt.Value.UnmarshalText(value) - - bts = append(bts, bt) - } - return bts, err -} - -func (s *Datastore) CheckSlotDelivered(ctx context.Context, slot uint64) (bool, error) { - var sl uint64 - err := s.DB.QueryRowContext(ctx, "SELECT slot FROM payload_delivered WHERE slot = $1 LIMIT 1", slot).Scan(&sl) - if err == sql.ErrNoRows { - return false, nil - } - return sl == slot, err -} - -type GetBuilderSubmissionsFilters struct { - Slot uint64 - Limit uint64 - BlockHash string - BlockNumber uint64 - BuilderPubkey string -} - -type GetDeliveredPayloadsFilters struct { - Slot uint64 - Cursor uint64 - Limit uint64 - BlockHash string - BlockNumber uint64 - ProposerPubkey string - BuilderPubkey string - OrderByValue int8 -} diff --git a/relay/submit_test.go b/relay/submit_test.go index dab3edd0..51443ade 100644 --- a/relay/submit_test.go +++ b/relay/submit_test.go @@ -25,6 +25,7 @@ import ( type fields struct { d Datastore + das DataAPIStore a Auctioneer ver Verifier config RelayConfig @@ -41,6 +42,7 @@ func simpletest(t require.TestingT, ctrl *gomock.Controller, fork structs.ForkVe types.DomainTypeBeaconProposer, types.Root{}.String()) + require.NoError(t, err) conf := RelayConfig{ ProposerSigningDomain: map[structs.ForkVersion]types.Domain{ 0: proposerSigningDomain, @@ -51,6 +53,8 @@ func simpletest(t require.TestingT, ctrl *gomock.Controller, fork structs.ForkVe } ds := mocks.NewMockDatastore(ctrl) + + das := mocks.NewMockDataAPIStore(ctrl) state := mocks.NewMockState(ctrl) cache := mocks.NewMockValidatorCache(ctrl) vstore := mocks.NewMockValidatorStore(ctrl) @@ -59,7 +63,6 @@ func simpletest(t require.TestingT, ctrl *gomock.Controller, fork structs.ForkVe a := mocks.NewMockAuctioneer(ctrl) // Submit Block - state.EXPECT().Genesis().MaxTimes(1).Return( structs.GenesisInfo{GenesisTime: genesisTime}, ) @@ -73,7 +76,7 @@ func simpletest(t require.TestingT, ctrl *gomock.Controller, fork structs.ForkVe structs.ValidatorCacheEntry{}, false, ) - state.EXPECT().ForkVersion(structs.Slot(submitRequest.Slot())).Times(2).Return(fork) + state.EXPECT().ForkVersion(structs.Slot(submitRequest.Slot())).AnyTimes().Return(fork) vstore.EXPECT().GetRegistration(context.Background(), submitRequest.ProposerPubkey()).MaxTimes(1).Return( types.SignedValidatorRegistration{ @@ -93,12 +96,12 @@ func simpletest(t require.TestingT, ctrl *gomock.Controller, fork structs.ForkVe switch fork { case structs.ForkBellatrix: bvc.EXPECT().ValidateBlock(context.Background(), &rpctypes.BuilderBlockValidationRequest{ - SubmitBlockRequest: submitRequest, + SubmitBlockRequest: submitRequest.(*bellatrix.SubmitBlockRequest), RegisteredGasLimit: 3_000_000, }).Return(nil) case structs.ForkCapella: bvc.EXPECT().ValidateBlockV2(context.Background(), &rpctypes.BuilderBlockValidationRequestV2{ - SubmitBlockRequest: submitRequest, + SubmitBlockRequest: submitRequest.(*capella.SubmitBlockRequest), RegisteredGasLimit: 3_000_000, }).Return(nil) } @@ -115,6 +118,8 @@ func simpletest(t require.TestingT, ctrl *gomock.Controller, fork structs.ForkVe return true }) + das.EXPECT().PutBuilderBlockSubmission(context.Background(), contents.Header.Trace, true).Times(1) + //// GetHeader a.EXPECT().MaxProfitBlock(structs.Slot(submitRequest.Slot())).Times(1). DoAndReturn(func(slot structs.Slot) (block *structs.CompleteBlockstruct, a bool) { @@ -153,6 +158,7 @@ func simpletest(t require.TestingT, ctrl *gomock.Controller, fork structs.ForkVe return fields{ config: conf, d: ds, + das: das, a: a, ver: verify, cache: cache, @@ -223,6 +229,7 @@ func TestRelay_SubmitBlock(t *testing.T) { f.ver, f.beaconState, f.d, + f.das, f.a, f.bvc)