Skip to content

Commit

Permalink
core: integrating go-eth2-client for for v3 endpoint (#2913)
Browse files Browse the repository at this point in the history
Integrated the forked version of go-eth2-client that supporting UniversalProposal interface.

The changes for go-eth2-client can be found here: attestantio/go-eth2-client#109

category: feature
ticket: #2749
  • Loading branch information
pinebit authored Mar 1, 2024
1 parent 5b3478d commit 9a9e065
Show file tree
Hide file tree
Showing 18 changed files with 584 additions and 44 deletions.
31 changes: 31 additions & 0 deletions app/eth2wrap/eth2wrap_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions app/eth2wrap/genwrap/genwrap.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

133 changes: 122 additions & 11 deletions core/dutydb/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,18 @@ import (
// NewMemDB returns a new in-memory dutyDB instance.
func NewMemDB(deadliner core.Deadliner) *MemDB {
return &MemDB{
attDuties: make(map[attKey]*eth2p0.AttestationData),
attPubKeys: make(map[pkKey]core.PubKey),
attKeysBySlot: make(map[uint64][]pkKey),
builderProDuties: make(map[uint64]*eth2api.VersionedBlindedProposal),
proDuties: make(map[uint64]*eth2api.VersionedProposal),
aggDuties: make(map[aggKey]core.AggregatedAttestation),
aggKeysBySlot: make(map[uint64][]aggKey),
contribDuties: make(map[contribKey]*altair.SyncCommitteeContribution),
contribKeysBySlot: make(map[uint64][]contribKey),
shutdown: make(chan struct{}),
deadliner: deadliner,
attDuties: make(map[attKey]*eth2p0.AttestationData),
attPubKeys: make(map[pkKey]core.PubKey),
attKeysBySlot: make(map[uint64][]pkKey),
builderProDuties: make(map[uint64]*eth2api.VersionedBlindedProposal),
proDuties: make(map[uint64]*eth2api.VersionedProposal),
universalProDuties: make(map[uint64]*eth2api.VersionedUniversalProposal),
aggDuties: make(map[aggKey]core.AggregatedAttestation),
aggKeysBySlot: make(map[uint64][]aggKey),
contribDuties: make(map[contribKey]*altair.SyncCommitteeContribution),
contribKeysBySlot: make(map[uint64][]contribKey),
shutdown: make(chan struct{}),
deadliner: deadliner,
}
}

Expand All @@ -51,6 +52,10 @@ type MemDB struct {
proDuties map[uint64]*eth2api.VersionedProposal
proQueries []proQuery

// DutyUniversalProposer
universalProDuties map[uint64]*eth2api.VersionedUniversalProposal
universalProQueries []universalProQuery

// DutyAggregator
aggDuties map[aggKey]core.AggregatedAttestation
aggKeysBySlot map[uint64][]aggKey
Expand Down Expand Up @@ -105,6 +110,18 @@ func (db *MemDB) Store(_ context.Context, duty core.Duty, unsignedSet core.Unsig
}
}
db.resolveBuilderProQueriesUnsafe()
case core.DutyUniversalProposer:
// Sanity check max one universal proposer per slot
if len(unsignedSet) > 1 {
return errors.New("unexpected universal proposer data set length", z.Int("n", len(unsignedSet)))
}
for _, unsignedData := range unsignedSet {
err := db.storeUniversalProposalUnsafe(unsignedData)
if err != nil {
return err
}
}
db.resolveUniversalProQueriesUnsafe()
case core.DutyAttester:
for pubkey, unsignedData := range unsignedSet {
err := db.storeAttestationUnsafe(pubkey, unsignedData)
Expand Down Expand Up @@ -204,6 +221,31 @@ func (db *MemDB) AwaitBlindedProposal(ctx context.Context, slot uint64) (*eth2ap
}
}

// AwaitUniversalProposal implements core.DutyDB, see its godoc.
func (db *MemDB) AwaitUniversalProposal(ctx context.Context, slot uint64) (*eth2api.VersionedUniversalProposal, error) {
cancel := make(chan struct{})
defer close(cancel)
response := make(chan *eth2api.VersionedUniversalProposal, 1)

db.mu.Lock()
db.universalProQueries = append(db.universalProQueries, universalProQuery{
Key: slot,
Response: response,
Cancel: cancel,
})
db.resolveUniversalProQueriesUnsafe()
db.mu.Unlock()

select {
case <-db.shutdown:
return nil, errors.New("dutydb shutdown")
case <-ctx.Done():
return nil, ctx.Err()
case block := <-response:
return block, nil
}
}

// AwaitAttestation implements core.DutyDB, see its godoc.
func (db *MemDB) AwaitAttestation(ctx context.Context, slot uint64, commIdx uint64) (*eth2p0.AttestationData, error) {
cancel := make(chan struct{})
Expand Down Expand Up @@ -489,6 +531,45 @@ func (db *MemDB) storeProposalUnsafe(unsignedData core.UnsignedData) error {
return nil
}

// storeUniversalProposalUnsafe stores the unsigned UniversalProposal.
// It is unsafe since it assumes the lock is held.
func (db *MemDB) storeUniversalProposalUnsafe(unsignedData core.UnsignedData) error {
cloned, err := unsignedData.Clone() // Clone before storing.
if err != nil {
return err
}

proposal, ok := cloned.(core.VersionedUniversalProposal)
if !ok {
return errors.New("invalid versioned universal proposal")
}

slot, err := proposal.Slot()
if err != nil {
return err
}

if existing, ok := db.universalProDuties[uint64(slot)]; ok {
existingRoot, err := existing.Root()
if err != nil {
return errors.Wrap(err, "universal proposal root")
}

providedRoot, err := proposal.Root()
if err != nil {
return errors.Wrap(err, "universal proposal root")
}

if existingRoot != providedRoot {
return errors.New("clashing blocks")
}
} else {
db.universalProDuties[uint64(slot)] = &proposal.VersionedUniversalProposal
}

return nil
}

// storeBlindedBeaconBlockUnsafe stores the unsigned BlindedBeaconBlock. It is unsafe since it assumes the lock is held.
func (db *MemDB) storeBlindedBeaconBlockUnsafe(unsignedData core.UnsignedData) error {
cloned, err := unsignedData.Clone() // Clone before storing.
Expand Down Expand Up @@ -569,6 +650,27 @@ func (db *MemDB) resolveProQueriesUnsafe() {
db.proQueries = unresolved
}

// resolveUniversalProQueriesUnsafe resolve any universalProQuery to a result if found.
// It is unsafe since it assume that the lock is held.
func (db *MemDB) resolveUniversalProQueriesUnsafe() {
var unresolved []universalProQuery
for _, query := range db.universalProQueries {
if cancelled(query.Cancel) {
continue // Drop cancelled queries.
}

value, ok := db.universalProDuties[query.Key]
if !ok {
unresolved = append(unresolved, query)
continue
}

query.Response <- value
}

db.universalProQueries = unresolved
}

// resolveAggQueriesUnsafe resolve any aggQuery to a result if found.
// It is unsafe since it assume that the lock is held.
func (db *MemDB) resolveAggQueriesUnsafe() {
Expand Down Expand Up @@ -639,6 +741,8 @@ func (db *MemDB) deleteDutyUnsafe(duty core.Duty) error {
delete(db.proDuties, duty.Slot)
case core.DutyBuilderProposer:
delete(db.builderProDuties, duty.Slot)
case core.DutyUniversalProposer:
delete(db.universalProDuties, duty.Slot)
case core.DutyAttester:
for _, key := range db.attKeysBySlot[duty.Slot] {
delete(db.attPubKeys, key)
Expand Down Expand Up @@ -702,6 +806,13 @@ type proQuery struct {
Cancel <-chan struct{}
}

// universalProQuery is a waiting universalProQuery with a response channel.
type universalProQuery struct {
Key uint64
Response chan<- *eth2api.VersionedUniversalProposal
Cancel <-chan struct{}
}

// aggQuery is a waiting aggQuery with a response channel.
type aggQuery struct {
Key aggKey
Expand Down
5 changes: 5 additions & 0 deletions core/dutydb/memory_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ func TestCancelledQueries(t *testing.T) {
_, err = db.AwaitBlindedProposal(ctx, slot)
require.ErrorContains(t, err, "shutdown")

_, err = db.AwaitUniversalProposal(ctx, slot)
require.ErrorContains(t, err, "shutdown")

_, err = db.AwaitSyncContribution(ctx, slot, 0, eth2p0.Root{})
require.ErrorContains(t, err, "shutdown")

Expand All @@ -42,13 +45,15 @@ func TestCancelledQueries(t *testing.T) {
require.NotEmpty(t, db.proQueries)
require.NotEmpty(t, db.aggQueries)
require.NotEmpty(t, db.builderProQueries)
require.NotEmpty(t, db.universalProQueries)

// Resolve queries
db.resolveAggQueriesUnsafe()
db.resolveAttQueriesUnsafe()
db.resolveContribQueriesUnsafe()
db.resolveProQueriesUnsafe()
db.resolveBuilderProQueriesUnsafe()
db.resolveUniversalProQueriesUnsafe()

// Ensure all queries are gone.
require.Empty(t, db.contribQueries)
Expand Down
Loading

0 comments on commit 9a9e065

Please sign in to comment.