Skip to content

Commit

Permalink
enable block submission warehouse (#152)
Browse files Browse the repository at this point in the history
* enable block submission warehouse

* change warehouse output files from json to txt

* change warehouse output files from txt to csv

* fix DataType
  • Loading branch information
aratz-lasa authored May 9, 2023
1 parent ee1142a commit bd81b1e
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 24 deletions.
2 changes: 1 addition & 1 deletion datastore/warehouse/warehouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func newWorker(id int, datadir string, logger log.Logger) *worker {

func (w *worker) getOrCreateFile(req StoreRequest) (*os.File, error) {
// get
filename := fmt.Sprintf("%s/%s/output_%d_%d.json", w.datadir, req.DataType, req.Slot, w.id)
filename := fmt.Sprintf("%s/%s/%d_%d.csv", w.datadir, req.DataType, req.Slot, w.id)
if fileWithTs, ok := w.files[filename]; ok {
fileWithTs.ts = time.Now()
return fileWithTs.File, nil
Expand Down
46 changes: 23 additions & 23 deletions relay/submit.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/lthibault/log"

"github.com/blocknative/dreamboat/beacon"
wh "github.com/blocknative/dreamboat/datastore/warehouse"
rpctypes "github.com/blocknative/dreamboat/client/sim/types"
"github.com/blocknative/dreamboat/structs"
"github.com/blocknative/dreamboat/structs/forks/bellatrix"
Expand Down Expand Up @@ -98,24 +99,23 @@ func (rs *Relay) SubmitBlock(ctx context.Context, m *structs.MetricGroup, uc str
return err
}

// TODO
// if rs.wh != nil {
// tStoreWarehouse := time.Now()
// req := wh.StoreRequest{
// DataType: wh.SubmitBlockRequest,
// Data: sbr.Raw(),
// Slot: sbr.Slot(),
// Id: sbr.BlockHash().String(),
// Timestamp: tStart,
// }
// if err := rs.wh.StoreAsync(context.Background(), req); err != nil {
// logger.WithError(err).Warn("failed to store in warehouse")
// // we should not return error because it's already been stored for delivery
// } else {
// m.AppendSince(tStoreWarehouse, "submitBlock", "storeWarehouse")
// logger.Debug("stored in warehouse")
// }
// }
if rs.wh != nil {
tStoreWarehouse := time.Now()
req := wh.StoreRequest{
DataType: "SubmitBlockRequest",
Data: sbr.Raw(),
Slot: sbr.Slot(),
Id: sbr.BlockHash().String(),
Timestamp: tStart,
}
if err := rs.wh.StoreAsync(context.Background(), req); err != nil {
logger.WithError(err).Warn("failed to store in warehouse")
// we should not return error because it's already been stored for delivery
} else {
m.AppendSince(tStoreWarehouse, "submitBlock", "storeWarehouse")
logger.Debug("stored in warehouse")
}
}

processingTime := time.Since(tStart)
// subtract the retry waiting times
Expand Down Expand Up @@ -277,11 +277,11 @@ func verifyBlock(sbr structs.SubmitBlockRequest, beaconState State) (retry bool,
time.Sleep(StateRecheckDelay) // recheck sync state for early blocks
randao := beaconState.Randao(sbr.Slot() - 1)
if randao.Randao == "" {
prev, next := beaconState.Randao(sbr.Slot() - 2), beaconState.Randao(sbr.Slot())
return true, fmt.Errorf("randao for slot %d not found. Previous: %s and Next:%s", sbr.Slot(), prev.Randao, next.Randao)
prev, next := beaconState.Randao(sbr.Slot()-2), beaconState.Randao(sbr.Slot())
return true, fmt.Errorf("randao for slot %d not found. Previous: %s and Next:%s", sbr.Slot(), prev.Randao, next.Randao)
}
if randao.Randao != sbr.Random().String() {
prev, next := beaconState.Randao(sbr.Slot() - 2), beaconState.Randao(sbr.Slot())
prev, next := beaconState.Randao(sbr.Slot()-2), beaconState.Randao(sbr.Slot())
return true, fmt.Errorf("%w: got %s, expected %s. Previous: %s and Next:%s", ErrInvalidRandao, sbr.Random().String(), randao.Randao, prev.Randao, next.Randao)
}
return true, nil
Expand Down Expand Up @@ -316,7 +316,7 @@ func verifyWithdrawals(state State, submitBlockRequest structs.SubmitBlockReques
retried = true
withdrawalState = state.Withdrawals(submitBlockRequest.Slot() - 1)
if withdrawalState.Slot == 0 {
prev, next := state.Withdrawals(submitBlockRequest.Slot() - 2), state.Withdrawals(submitBlockRequest.Slot())
prev, next := state.Withdrawals(submitBlockRequest.Slot()-2), state.Withdrawals(submitBlockRequest.Slot())
return root, retried, fmt.Errorf("withdrawals for slot %d not found. Previous: %s and Next: %s", submitBlockRequest.Slot(), prev.Root.String(), next.Root.String())
}
}
Expand All @@ -330,7 +330,7 @@ func verifyWithdrawals(state State, submitBlockRequest structs.SubmitBlockReques

root = types.Root(withdrawalsRoot)
if withdrawalState.Root != withdrawalsRoot {
prev, next := state.Withdrawals(submitBlockRequest.Slot() - 2), state.Withdrawals(submitBlockRequest.Slot())
prev, next := state.Withdrawals(submitBlockRequest.Slot()-2), state.Withdrawals(submitBlockRequest.Slot())
err = fmt.Errorf("%w: got %s, expected %s. Previous: %s and Next: %s", ErrInvalidWithdrawalRoot, types.Root(withdrawalsRoot).String(), withdrawalState.Root.String(), prev.Root.String(), next.Root.String())
}

Expand Down

0 comments on commit bd81b1e

Please sign in to comment.