Skip to content

Commit

Permalink
Fix the operator nonsigning rate
Browse files Browse the repository at this point in the history
  • Loading branch information
jianoaix committed Feb 22, 2024
1 parent 8d5eb99 commit 6f9af75
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 25 deletions.
21 changes: 17 additions & 4 deletions disperser/dataapi/subgraph/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,26 @@ func (a *api) QueryBatchesByBlockTimestampRange(ctx context.Context, start, end
"blockTimestamp_gte": graphql.Int(start),
"blockTimestamp_lte": graphql.Int(end),
}
skip := 0
query := new(queryBatchesByBlockTimestampRange)
err := a.uiMonitoringGgl.Query(ctx, query, variables)
if err != nil {
return nil, err
result := make([]*Batches, 0)
for {
variables["first"] = graphql.Int(maxEntriesPerQuery)
variables["skip"] = graphql.Int(skip)

err := a.uiMonitoringGgl.Query(ctx, &query, variables)
if err != nil {
return nil, err
}

if len(query.Batches) == 0 {
break
}
result = append(result, query.Batches...)
skip += maxEntriesPerQuery
}

return query.Batches, nil
return result, nil
}

func (a *api) QueryOperators(ctx context.Context, first int) ([]*Operator, error) {
Expand Down
2 changes: 1 addition & 1 deletion disperser/dataapi/subgraph/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ type (
Batches []*Batches `graphql:"batches(orderDirection: $orderDirection, orderBy: $orderBy, first: $first, skip: $skip)"`
}
queryBatchesByBlockTimestampRange struct {
Batches []*Batches `graphql:"batches(first: $first, orderBy: blockTimestamp, where: {and: [{ blockTimestamp_gte: $blockTimestamp_gte}, {blockTimestamp_lte: $blockTimestamp_lte}]})"`
Batches []*Batches `graphql:"batches(first: $first, skip: $skip, orderBy: blockTimestamp, where: {and: [{ blockTimestamp_gte: $blockTimestamp_gte}, {blockTimestamp_lte: $blockTimestamp_lte}]})"`
}
queryOperatorRegistereds struct {
OperatorRegistereds []*Operator `graphql:"operatorRegistereds(first: $first)"`
Expand Down
59 changes: 39 additions & 20 deletions disperser/dataapi/subgraph_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,23 +174,38 @@ func (sc *subgraphClient) QueryNumBatchesByOperatorsInThePastBlockTimestamp(ctx
intervalEventsPool = workerpool.New(maxWorkerPoolSize)
currentTs = uint64(time.Now().Unix())
)
type timeInterval struct {
start, end uint64
}
// Caching the number of batches in a time interval so we don't need to query
// subgraph repeatedly. In usual case, most operators will have no opt-in/opt-out
// events in recent time window, so all of them will just query the same time
// interval.
numBatchesCache := make(map[timeInterval]int)
for _, ie := range intervalEvents {
interval := ie
intervalEventsPool.Submit(func() {
end := interval.end
if end == 0 {
end = currentTs
}
batches, err := sc.api.QueryBatchesByBlockTimestampRange(ctx, interval.start, end)
if err != nil {
sc.logger.Error("failed to query batches by block timestamp range", "start", interval.start, "end", end, "err", err)
return
}
if len(batches) > 0 {
timeRange := timeInterval{start: interval.start, end: end}
mu.Lock()
_, ok := numBatchesCache[timeRange]
mu.Unlock()
if !ok {
batches, err := sc.api.QueryBatchesByBlockTimestampRange(ctx, interval.start, end)
if err != nil {
sc.logger.Error("failed to query batches by block timestamp range", "start", interval.start, "end", end, "err", err)
return
}
mu.Lock()
numBatchesByOperator[interval.OperatorId] += len(batches)
numBatchesCache[timeRange] = len(batches)
mu.Unlock()
}
mu.Lock()
numBatchesByOperator[interval.OperatorId] += numBatchesCache[timeRange]
mu.Unlock()
})
}
intervalEventsPool.StopWait()
Expand Down Expand Up @@ -263,17 +278,15 @@ func (sc *subgraphClient) getOperatorsWithRegisteredDeregisteredIntervalEvents(
reg := registeredOperators[i]
operatorId := string(reg.OperatorId)

// If the operator is not a nonsigner, skip it.
if _, ok := operators[operatorId]; !ok {
operators[operatorId] = OperatorEvents{
RegistrationEvents: []uint64{},
DeregistrationEvents: []uint64{},
}
continue
}
operator := operators[operatorId]
timestamp, err := strconv.ParseUint(string(reg.BlockTimestamp), 10, 64)
if err != nil {
return nil, err
}
operator := operators[operatorId]
operator.RegistrationEvents = append(operator.RegistrationEvents, timestamp)
operators[operatorId] = operator
}
Expand All @@ -282,15 +295,17 @@ func (sc *subgraphClient) getOperatorsWithRegisteredDeregisteredIntervalEvents(
dereg := deregisteredOperators[i]
operatorId := string(dereg.OperatorId)

// If the operator is not a nonsigner, skip it.
if _, ok := operators[operatorId]; !ok {
continue
}
timestamp, err := strconv.ParseUint(string(dereg.BlockTimestamp), 10, 64)
if err != nil || timestamp == 0 {
return nil, err
}
operator := operators[operatorId]
if _, ok := operators[operatorId]; ok {
operator.DeregistrationEvents = append(operator.DeregistrationEvents, timestamp)
operators[operatorId] = operator
}
operator.DeregistrationEvents = append(operator.DeregistrationEvents, timestamp)
operators[operatorId] = operator
}

events, err := getOperatorInterval(ctx, operators, blockTimestamp, nonSigners)
Expand Down Expand Up @@ -326,6 +341,10 @@ func getOperatorInterval(
return nil, fmt.Errorf("The number of registration and deregistration events cannot differ by more than one, num registration events: %d, num deregistration events: %d, operatorId: %s", len(reg), len(dereg), operatorId)
}

// Note: if an operator registered at block A and then deregistered
// at block B, the range of liveness will be [A, B), i.e. the operator
// will not be responsible for signing at block B.

if len(reg) == 0 && len(dereg) == 0 {
// The operator has no registration/deregistration events: it's live
// for the entire time window.
Expand All @@ -340,7 +359,7 @@ func getOperatorInterval(
intervals = append(intervals, OperatorInterval{
OperatorId: operatorId,
start: blockTimestamp,
end: dereg[0],
end: dereg[0] - 1,
})
} else if len(dereg) == 0 {
// The operator has only registration event: it's live from registration to
Expand All @@ -361,7 +380,7 @@ func getOperatorInterval(
intervals = append(intervals, OperatorInterval{
OperatorId: operatorId,
start: reg[i],
end: dereg[i],
end: dereg[i] - 1,
})
} else {
intervals = append(intervals, OperatorInterval{
Expand All @@ -377,14 +396,14 @@ func getOperatorInterval(
intervals = append(intervals, OperatorInterval{
OperatorId: operatorId,
start: blockTimestamp,
end: dereg[0],
end: dereg[0] - 1,
})
for i := 0; i < len(reg); i++ {
if i+1 < len(dereg) {
intervals = append(intervals, OperatorInterval{
OperatorId: operatorId,
start: reg[i],
end: dereg[i+1],
end: dereg[i+1] - 1,
})
} else {
intervals = append(intervals, OperatorInterval{
Expand Down

0 comments on commit 6f9af75

Please sign in to comment.