Skip to content

Commit

Permalink
[7/N][multi quorum ejection] Cleanup (#351)
Browse files Browse the repository at this point in the history
  • Loading branch information
jianoaix authored Mar 17, 2024
1 parent 1a59c94 commit a002ac2
Show file tree
Hide file tree
Showing 2 changed files with 0 additions and 297 deletions.
275 changes: 0 additions & 275 deletions disperser/dataapi/subgraph_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"fmt"
"sort"
"strconv"
"sync"
"time"

"github.com/Layr-Labs/eigenda/core"
Expand All @@ -28,7 +27,6 @@ type (
QueryBatchNonSigningInfoInInterval(ctx context.Context, intervalSeconds int64) ([]*BatchNonSigningInfo, error)
QueryOperatorQuorumEvent(ctx context.Context, startBlock, endBlock uint32) (*OperatorQuorumEvents, error)
QueryIndexedDeregisteredOperatorsForTimeWindow(ctx context.Context, days int32) (*IndexedDeregisteredOperatorState, error)
QueryNumBatchesByOperatorsInThePastBlockTimestamp(ctx context.Context, blockTimestamp uint64, nonsigers map[string]int) (map[string]int, error)
}
Batch struct {
Id []byte
Expand Down Expand Up @@ -76,24 +74,6 @@ type (
IndexedDeregisteredOperatorState struct {
Operators map[core.OperatorID]*DeregisteredOperatorInfo
}
// OperatorInterval describes a time interval where the operator is live in
// EigenDA.
OperatorInterval struct {
OperatorId string

// The operator is live from start to end.
start uint64
// If the operator is still live now in EigenDA netowrk, end is set to 0.
end uint64
}
// OperatorEvents describes all the registration and deregistration events associated
// with an operator.
OperatorEvents struct {
// Timestamps of operator's registration, in ascending order.
RegistrationEvents []uint64
// Timestamps of operator's deregistration, in ascending order.
DeregistrationEvents []uint64
}
NonSigner struct {
OperatorId string
Count int
Expand Down Expand Up @@ -235,84 +215,6 @@ func (sc *subgraphClient) QueryOperatorQuorumEvent(ctx context.Context, startBlo
}, nil
}

func (sc *subgraphClient) QueryNumBatchesByOperatorsInThePastBlockTimestamp(ctx context.Context, blockTimestamp uint64, nonSigners map[string]int) (map[string]int, error) {
var (
registeredOperators []*subgraph.Operator
deregisteredOperators []*subgraph.Operator
err error
pool = workerpool.New(maxWorkerPoolSize)
)

pool.Submit(func() {
operators, errQ := sc.api.QueryRegisteredOperatorsGreaterThanBlockTimestamp(ctx, blockTimestamp)
if errQ != nil {
err = errQ
}
registeredOperators = operators
})

pool.Submit(func() {
operators, errQ := sc.api.QueryDeregisteredOperatorsGreaterThanBlockTimestamp(ctx, blockTimestamp)
if errQ != nil {
err = errQ
}
deregisteredOperators = operators
})
pool.StopWait()

if err != nil {
return nil, err
}

intervalEvents, err := sc.getOperatorsWithRegisteredDeregisteredIntervalEvents(ctx, registeredOperators, deregisteredOperators, blockTimestamp, nonSigners)
if err != nil {
return nil, err
}

var (
mu sync.Mutex
numBatchesByOperator = make(map[string]int, 0)
intervalEventsPool = workerpool.New(maxWorkerPoolSize)
currentTs = uint64(time.Now().Unix())
)
type timeInterval struct {
start, end uint64
}
// Caching the number of batches in a time interval so we don't need to query
// subgraph repeatedly. In usual case, most operators will have no opt-in/opt-out
// events in recent time window, so all of them will just query the same time
// interval.
numBatchesCache := make(map[timeInterval]int)
for _, ie := range intervalEvents {
interval := ie
intervalEventsPool.Submit(func() {
end := interval.end
if end == 0 {
end = currentTs
}
timeRange := timeInterval{start: interval.start, end: end}
mu.Lock()
_, ok := numBatchesCache[timeRange]
mu.Unlock()
if !ok {
batches, err := sc.api.QueryBatchesByBlockTimestampRange(ctx, interval.start, end)
if err != nil {
sc.logger.Error("failed to query batches by block timestamp range", "start", interval.start, "end", end, "err", err)
return
}
mu.Lock()
numBatchesCache[timeRange] = len(batches)
mu.Unlock()
}
mu.Lock()
numBatchesByOperator[interval.OperatorId] += numBatchesCache[timeRange]
mu.Unlock()
})
}
intervalEventsPool.StopWait()
return numBatchesByOperator, nil
}

func (sc *subgraphClient) QueryIndexedDeregisteredOperatorsForTimeWindow(ctx context.Context, days int32) (*IndexedDeregisteredOperatorState, error) {
// Query all deregistered operators in the last N days.
lastNDayInSeconds := uint64(time.Now().Add(-time.Duration(days) * 24 * time.Hour).Unix())
Expand Down Expand Up @@ -365,183 +267,6 @@ func (sc *subgraphClient) QueryIndexedDeregisteredOperatorsForTimeWindow(ctx con
}, nil
}

func (sc *subgraphClient) getOperatorsWithRegisteredDeregisteredIntervalEvents(
ctx context.Context,
registeredOperators []*subgraph.Operator,
deregisteredOperators []*subgraph.Operator,
blockTimestamp uint64,
nonSigners map[string]int,
) ([]OperatorInterval, error) {
sort.SliceStable(registeredOperators, func(i, j int) bool {
return registeredOperators[i].BlockTimestamp < registeredOperators[j].BlockTimestamp
})

sort.SliceStable(deregisteredOperators, func(i, j int) bool {
return deregisteredOperators[i].BlockTimestamp < deregisteredOperators[j].BlockTimestamp
})

operators := make(map[string]OperatorEvents)
for operatorId := range nonSigners {
operators[operatorId] = OperatorEvents{
RegistrationEvents: []uint64{},
DeregistrationEvents: []uint64{},
}
}
for i := range registeredOperators {
reg := registeredOperators[i]
operatorId := string(reg.OperatorId)

// If the operator is not a nonsigner, skip it.
if _, ok := operators[operatorId]; !ok {
continue
}
timestamp, err := strconv.ParseUint(string(reg.BlockTimestamp), 10, 64)
if err != nil {
return nil, err
}
operator := operators[operatorId]
operator.RegistrationEvents = append(operator.RegistrationEvents, timestamp)
operators[operatorId] = operator
}

for i := range deregisteredOperators {
dereg := deregisteredOperators[i]
operatorId := string(dereg.OperatorId)

// If the operator is not a nonsigner, skip it.
if _, ok := operators[operatorId]; !ok {
continue
}
timestamp, err := strconv.ParseUint(string(dereg.BlockTimestamp), 10, 64)
if err != nil || timestamp == 0 {
return nil, err
}
operator := operators[operatorId]
operator.DeregistrationEvents = append(operator.DeregistrationEvents, timestamp)
operators[operatorId] = operator
}

events, err := getOperatorInterval(ctx, operators, blockTimestamp, nonSigners)
if err != nil {
return nil, err
}

return events, nil
}

func getOperatorInterval(
ctx context.Context,
operators map[string]OperatorEvents,
blockTimestamp uint64,
nonSigners map[string]int,
) ([]OperatorInterval, error) {
currentTs := uint64(time.Now().Unix())
intervals := make([]OperatorInterval, 0)

// For the time window [blockTimestamp, now], compute the sub intervals during
// which the operator is live in EigenDA network for validating batches.
for operatorId := range nonSigners {
reg := operators[operatorId].RegistrationEvents
dereg := operators[operatorId].DeregistrationEvents

// In EigenDA, the registration and deregistration events on timeline for an
// operator will be like reg-dereg-reg-dereg...
//
// The reason is that registering an operator that's already registered will fail
// and deregistering an operator that's already deregistered will also fail. So
// the registeration and deregistration will alternate.
if len(reg)-len(dereg) > 1 || len(dereg)-len(reg) > 1 {
return nil, fmt.Errorf("The number of registration and deregistration events cannot differ by more than one, num registration events: %d, num deregistration events: %d, operatorId: %s", len(reg), len(dereg), operatorId)
}

// Note: if an operator registered at block A and then deregistered
// at block B, the range of liveness will be [A, B), i.e. the operator
// will not be responsible for signing at block B.
if len(reg) == 0 && len(dereg) == 0 {
// The operator has no registration/deregistration events: it's live
// for the entire time window.
intervals = append(intervals, OperatorInterval{
OperatorId: operatorId,
start: blockTimestamp,
end: currentTs,
})
} else if len(reg) == 0 {
// The operator has only deregistration event: it's live from the beginning
// of the time window until the deregistration.
intervals = append(intervals, OperatorInterval{
OperatorId: operatorId,
start: blockTimestamp,
end: dereg[0] - 1,
})
} else if len(dereg) == 0 {
// The operator has only registration event: it's live from registration to
// the end of the time window.
intervals = append(intervals, OperatorInterval{
OperatorId: operatorId,
start: reg[0],
end: currentTs,
})
} else {
// The operator has both registration and deregistration events in the time
// window.
if reg[0] < dereg[0] {
// The first event in the time window is registration. This means at
// the beginning (i.e. blockTimestamp) it's not live.
for i := 0; i < len(reg); i++ {
if i < len(dereg) {
intervals = append(intervals, OperatorInterval{
OperatorId: operatorId,
start: reg[i],
end: dereg[i] - 1,
})
} else {
intervals = append(intervals, OperatorInterval{
OperatorId: operatorId,
start: reg[i],
end: currentTs,
})
}
}
} else {
// The first event in the time window is deregistration. This means at
// the beginning (i.e. blockTimestamp) it's live already.
intervals = append(intervals, OperatorInterval{
OperatorId: operatorId,
start: blockTimestamp,
end: dereg[0] - 1,
})
for i := 0; i < len(reg); i++ {
if i+1 < len(dereg) {
intervals = append(intervals, OperatorInterval{
OperatorId: operatorId,
start: reg[i],
end: dereg[i+1] - 1,
})
} else {
intervals = append(intervals, OperatorInterval{
OperatorId: operatorId,
start: reg[i],
end: currentTs,
})
}
}
}
}
}

// Validate the registration and deregistration events are in timeline order.
for i := 0; i < len(intervals); i++ {
if intervals[i].start > intervals[i].end {
return nil, fmt.Errorf("Start timestamp should not be greater than end or current timestamp for operatorId %s, start timestamp: %d, end or current timestamp: %d", intervals[i].OperatorId, intervals[i].start, intervals[i].end)
}
if i > 0 && intervals[i-1].OperatorId == intervals[i].OperatorId && intervals[i-1].end > intervals[i].start {
return nil, fmt.Errorf("the operator live intervals should never overlap, but found two overlapping intervals [%d, %d] and [%d, %d] for operatorId %s", intervals[i-1].start, intervals[i-1].end, intervals[i].start, intervals[i].end, intervals[i].OperatorId)
}
}

return intervals, nil
}

func convertBatches(subgraphBatches []*subgraph.Batches) ([]*Batch, error) {
batches := make([]*Batch, len(subgraphBatches))
for i, batch := range subgraphBatches {
Expand Down
22 changes: 0 additions & 22 deletions disperser/dataapi/subgraph_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,12 +119,6 @@ var (
},
}

nonSigners = map[string]int{
"0xe1cdae12a0074f20b8fc96a0489376db34075e545ef60c4845d264a732568311": 1,
"0xe1cdae12a0074f20b8fc96a0489376db34075e545ef60c4845d264a732568310": 1,
"0xe22dae12a0074f20b8fc96a0489376db34075e545ef60c4845d264a732568311": 1,
}

operatorAddedToQuorum = []*subgraph.OperatorQuorum{
{
Operator: "operator-2",
Expand Down Expand Up @@ -483,22 +477,6 @@ func TestQueryIndexedDeregisteredOperatorsForTimeWindow(t *testing.T) {
assert.Equal(t, uint64(22), uint64(operator.Metadata.BlockNumber))
}

func TestQueryNumBatchesByOperatorsInThePastBlockTimestamp(t *testing.T) {
mockSubgraphApi := &subgraphmock.MockSubgraphApi{}
mockSubgraphApi.On("QueryRegisteredOperatorsGreaterThanBlockTimestamp").Return(subgraphOperatorRegistereds, nil)
mockSubgraphApi.On("QueryDeregisteredOperatorsGreaterThanBlockTimestamp").Return(subgraphOperatorDeregistereds, nil)
mockSubgraphApi.On("QueryBatchesByBlockTimestampRange").Return(subgraphBatches, nil)
subgraphClient := dataapi.NewSubgraphClient(mockSubgraphApi, logging.NewNoopLogger())
numBatchesByOperators, err := subgraphClient.QueryNumBatchesByOperatorsInThePastBlockTimestamp(context.Background(), uint64(1), nonSigners)
assert.NoError(t, err)

// We compute the num batches for each nonsigning operator.
assert.Equal(t, 3, len(numBatchesByOperators))

numBatches := numBatchesByOperators["0xe1cdae12a0074f20b8fc96a0489376db34075e545ef60c4845d264a732568310"]
assert.Equal(t, 3, numBatches)
}

func TestQueryBatchNonSigningInfoInInterval(t *testing.T) {
mockSubgraphApi := &subgraphmock.MockSubgraphApi{}
mockSubgraphApi.On("QueryBatchNonSigningInfo").Return(batchNonSigningInfo, nil)
Expand Down

0 comments on commit a002ac2

Please sign in to comment.