Skip to content

Commit

Permalink
Finalize Relay Distribution with Streaming Layer and Redis Pubsub Int…
Browse files Browse the repository at this point in the history
…egration (#113)

* implement unoptimized stream

* add topic to subscription log

* encode decode stream prepending slot

* fix main

* enable relay-distribution-publish-submissions by default

* remove badger init duplicate

* add stream cache to main.go

* gert rid of publishers

* fix publish topic

* fix warehouse service log

* enable distribution flag

* remove debug statement

* add state

* update streamBlock

* add metric for cache hit in GetPayload

* add metrics to GetPayload

* fix processingTimeMs field

* Make stream structs lightweight (#157)

* make stream structs lightweight

* fix main

* resolve todo

* remove cache from datastore

* fix cache nil pointer

* fix nil channels in stream

* fix nil pointer block cache stream

* add fallthrough to decode

* add flag distributed

* remove unnecessary flag

* add cache hit metric per function

* make payload cache multi-slot

* put back fast boot flag

* set stream queue sizes

* fix auctioneer hardcoded const

* init relay subscriber

* fix block cache stream sub

* fix nil pointer panic in non-distributed version

* make bid interface

add slot to log

add json tags to structs

make BuilderBidExtended interface

fix auctioneer

add logs about stream receivals

add logs about stream receivals

fix nil pointer

remove debug

* separate pubsub and storage redis

* separate read and write redis clients

* Per parent MEV (#160)

* implement bid capellla ssz encoding

* fix bid format

* Optimize Stream Functionality and Implement SSZ Encode/Decode for Improved Performance (#164)

* add ssz marshal of BlockAndTraceExtended

* untested unmarshalSSZ

* enable ssz for capella block cache stream

* fix ExecutionPayload ssz encoding

* add test for block cache ssz encoding

* add working test for BlockAndTrace SSZ encoding decoding

* refactor BlockAndTraceExtended, so that extension is the taiñ

* Correct lib version

* fix stream encoding

* decode forkFormat

* improve encoding decoding

* fix relay

* refactor bid extended

* refactor payload cache counter

* add stream publish metrics

* Per parent beacon state: withdrawals + randao (#165)

* Revert dep to not failing

* make randao and withdrawals per parentHash

* fix merge

* fix build

* optimize slot processing

* fix randao set

* fix mutex pointer

* fix infinite loop

* fix mutex pointer

* fix updateWithdrawalsAndRandao

* add log to diff parent blockhash

* fix state set

* remove unused log fields

---------

Co-authored-by: Łukasz Miłkowski <[email protected]>

* fix metric for bandwidth

* add log of send and receive from stream

* add size to log

* add field to easily identify events for same stream item

* fix function name metric

* add log before publishing

* add gzip close

* clean old maps

* add size to block submissions

* Additional redis metrics (#167)

* add request limit to getHeader

* log blockHash

* improved err logging on beacon Run

---------

Co-authored-by: Łukasz Miłkowski <[email protected]>
  • Loading branch information
aratz-lasa and lukanus authored Jun 6, 2023
1 parent c51a50d commit 9cb272a
Show file tree
Hide file tree
Showing 26 changed files with 2,282 additions and 457 deletions.
96 changes: 50 additions & 46 deletions auction/auctioneer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ import (
)

type Auctioneer struct {
auctions [3]*Auction
auctions [structs.NumberOfSlotsInState]*Auction
}

type Auction struct {
mu sync.RWMutex
Slot uint64
maxProfit map[types.Hash]*structs.CompleteBlockstruct
latestBlockByBuilder map[LatestKey]*structs.CompleteBlockstruct
maxProfit map[types.Hash]structs.BuilderBidExtended
latestBlockByBuilder map[LatestKey]structs.BuilderBidExtended
}

type LatestKey struct {
Expand All @@ -24,84 +24,88 @@ type LatestKey struct {
}

func NewAuctioneer() *Auctioneer {
return &Auctioneer{
auctions: [3]*Auction{
{ // slot - 1
latestBlockByBuilder: make(map[LatestKey]*structs.CompleteBlockstruct),
maxProfit: make(map[types.Hash]*structs.CompleteBlockstruct),
},
{ // slot
latestBlockByBuilder: make(map[LatestKey]*structs.CompleteBlockstruct),
maxProfit: make(map[types.Hash]*structs.CompleteBlockstruct),
},
{ // slot + 1
latestBlockByBuilder: make(map[LatestKey]*structs.CompleteBlockstruct),
maxProfit: make(map[types.Hash]*structs.CompleteBlockstruct),
},
},
a := &Auctioneer{}
for i := 0; i < structs.NumberOfSlotsInState; i++ {
a.auctions[i] = &Auction{
Slot: uint64(i),
latestBlockByBuilder: make(map[LatestKey]structs.BuilderBidExtended),
maxProfit: make(map[types.Hash]structs.BuilderBidExtended),
}
}

return a

}

func (a *Auctioneer) AddBlock(block *structs.CompleteBlockstruct) bool {
auction := a.auctions[block.Header.Trace.Slot%3]
parent := block.Header.Header.GetParentHash()
func (a *Auctioneer) AddBlock(bid structs.BuilderBidExtended) bool {
auction := a.auctions[bid.Slot()%structs.NumberOfSlotsInState]
parent := bid.BuilderBid().Header().GetParentHash()
bbid := bid.BuilderBid()

auction.mu.Lock()
defer auction.mu.Unlock()

// always discard submissions lower than latest slot
if auction.Slot > block.Header.Trace.Slot {
if auction.Slot > bid.Slot() {
return false
}

// always set new value and bigger slot
if auction.Slot < block.Header.Trace.Slot {
a.auctions[block.Header.Trace.Slot%3] = &Auction{
Slot: block.Header.Trace.Slot,
latestBlockByBuilder: make(map[LatestKey]*structs.CompleteBlockstruct),
maxProfit: make(map[types.Hash]*structs.CompleteBlockstruct),
if auction.Slot < bid.Slot() {
a.auctions[bid.Slot()%structs.NumberOfSlotsInState] = &Auction{
Slot: bid.Slot(),
latestBlockByBuilder: make(map[LatestKey]structs.BuilderBidExtended),
maxProfit: make(map[types.Hash]structs.BuilderBidExtended),
}

auction.latestBlockByBuilder[LatestKey{ParentHash: parent, Pk: block.Header.Trace.BuilderPubkey}] = block
auction.maxProfit[parent] = block
auction.maxProfit[parent] = bid
auction.latestBlockByBuilder[LatestKey{ParentHash: parent, Pk: bbid.Pubkey()}] = bid
return true
}

auction.latestBlockByBuilder[LatestKey{ParentHash: parent, Pk: block.Header.Trace.BuilderPubkey}] = block

auction.latestBlockByBuilder[LatestKey{ParentHash: parent, Pk: bbid.Pubkey()}] = bid
mp, ok := auction.maxProfit[parent]
if !ok {
auction.maxProfit[parent] = block
auction.maxProfit[parent] = bid
return true
}

// accept bigger bid
if mp.Header.Trace.Value.Cmp(&block.Header.Trace.Value) <= 0 {
auction.maxProfit[parent] = block
maxBidValue := mp.BuilderBid().Value()
bidValue := bid.BuilderBid().Value()
if maxBidValue.Cmp(&bidValue) <= 0 {
auction.maxProfit[parent] = bid
return true
}

// reassign biggest for resubmission from the same builder with lower bid
if mp.Header.Trace.BuilderPubkey == block.Header.Trace.BuilderPubkey &&
mp.Header.Trace.Value.Cmp(&block.Header.Trace.Value) > 0 {
auction.maxProfit[parent] = block
if mp.BuilderBid().Pubkey() == bbid.Pubkey() &&
maxBidValue.Cmp(&bidValue) > 0 {
auction.maxProfit[parent] = bid
for _, b := range auction.latestBlockByBuilder {
if mp.Header.Trace.Slot == b.Header.Trace.Slot && // Only check the current slot
mp.Header.Trace.Value.Cmp(&b.Header.Trace.Value) <= 0 {
auction.maxProfit[parent] = b
if mp.Slot() == b.Slot() { // Only check the current slot
mp, ok := auction.maxProfit[parent]
if !ok {
continue
}
bidValue := b.BuilderBid().Value()
maxBidValue = mp.BuilderBid().Value()
if maxBidValue.Cmp(&bidValue) <= 0 {
auction.maxProfit[parent] = b
}
}
}
}

return block == mp
return bid == mp
}

func (a *Auctioneer) MaxProfitBlock(slot structs.Slot, parentHash types.Hash) (*structs.CompleteBlockstruct, bool) {
auction := a.auctions[slot%3]
func (a *Auctioneer) MaxProfitBlock(slot structs.Slot, parentHash types.Hash) (structs.BuilderBidExtended, bool) {
auction := a.auctions[slot%structs.NumberOfSlotsInState]

auction.mu.RLock()
defer auction.mu.RUnlock()

if auction.maxProfit != nil {
if mp, ok := auction.maxProfit[parentHash]; ok && structs.Slot(mp.Header.Trace.Slot) == slot {
if mp, ok := auction.maxProfit[parentHash]; ok && structs.Slot(mp.Slot()) == slot {
return mp, true
}
}
Expand Down
Loading

0 comments on commit 9cb272a

Please sign in to comment.