Skip to content

Commit

Permalink
Fix the operator nonsigning rate (#271)
Browse files Browse the repository at this point in the history
  • Loading branch information
jianoaix authored Feb 26, 2024
1 parent 8859b3b commit b368acd
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 30 deletions.
31 changes: 22 additions & 9 deletions disperser/dataapi/subgraph/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type (
}

api struct {
uiMonitoringGgl *graphql.Client
uiMonitoringGql *graphql.Client
operatorStateGql *graphql.Client
}
)
Expand All @@ -38,10 +38,10 @@ var _ Api = (*api)(nil)

func NewApi(uiMonitoringSocketAddr string, operatorStateSocketAddr string) *api {
once.Do(func() {
uiMonitoringGgl := graphql.NewClient(uiMonitoringSocketAddr, nil)
uiMonitoringGql := graphql.NewClient(uiMonitoringSocketAddr, nil)
operatorStateGql := graphql.NewClient(operatorStateSocketAddr, nil)
instance = &api{
uiMonitoringGgl: uiMonitoringGgl,
uiMonitoringGql: uiMonitoringGql,
operatorStateGql: operatorStateGql,
}
})
Expand All @@ -60,7 +60,7 @@ func (a *api) QueryBatches(ctx context.Context, descending bool, orderByField st
"skip": graphql.Int(skip),
}
result := new(queryBatches)
err := a.uiMonitoringGgl.Query(ctx, result, variables)
err := a.uiMonitoringGql.Query(ctx, result, variables)
if err != nil {
return nil, err
}
Expand All @@ -74,13 +74,26 @@ func (a *api) QueryBatchesByBlockTimestampRange(ctx context.Context, start, end
"blockTimestamp_gte": graphql.Int(start),
"blockTimestamp_lte": graphql.Int(end),
}
skip := 0
query := new(queryBatchesByBlockTimestampRange)
err := a.uiMonitoringGgl.Query(ctx, query, variables)
if err != nil {
return nil, err
result := make([]*Batches, 0)
for {
variables["first"] = graphql.Int(maxEntriesPerQuery)
variables["skip"] = graphql.Int(skip)

err := a.uiMonitoringGql.Query(ctx, &query, variables)
if err != nil {
return nil, err
}

if len(query.Batches) == 0 {
break
}
result = append(result, query.Batches...)
skip += maxEntriesPerQuery
}

return query.Batches, nil
return result, nil
}

func (a *api) QueryOperators(ctx context.Context, first int) ([]*Operator, error) {
Expand Down Expand Up @@ -109,7 +122,7 @@ func (a *api) QueryBatchNonSigningOperatorIdsInInterval(ctx context.Context, int
variables["first"] = graphql.Int(maxEntriesPerQuery)
variables["skip"] = graphql.Int(skip)

err := a.uiMonitoringGgl.Query(ctx, &result, variables)
err := a.uiMonitoringGql.Query(ctx, &result, variables)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion disperser/dataapi/subgraph/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ type (
Batches []*Batches `graphql:"batches(orderDirection: $orderDirection, orderBy: $orderBy, first: $first, skip: $skip)"`
}
queryBatchesByBlockTimestampRange struct {
Batches []*Batches `graphql:"batches(first: $first, orderBy: blockTimestamp, where: {and: [{ blockTimestamp_gte: $blockTimestamp_gte}, {blockTimestamp_lte: $blockTimestamp_lte}]})"`
Batches []*Batches `graphql:"batches(first: $first, skip: $skip, orderBy: blockTimestamp, where: {and: [{ blockTimestamp_gte: $blockTimestamp_gte}, {blockTimestamp_lte: $blockTimestamp_lte}]})"`
}
queryOperatorRegistereds struct {
OperatorRegistereds []*Operator `graphql:"operatorRegistereds(first: $first)"`
Expand Down
58 changes: 38 additions & 20 deletions disperser/dataapi/subgraph_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,23 +174,38 @@ func (sc *subgraphClient) QueryNumBatchesByOperatorsInThePastBlockTimestamp(ctx
intervalEventsPool = workerpool.New(maxWorkerPoolSize)
currentTs = uint64(time.Now().Unix())
)
type timeInterval struct {
start, end uint64
}
// Caching the number of batches in a time interval so we don't need to query
// subgraph repeatedly. In usual case, most operators will have no opt-in/opt-out
// events in recent time window, so all of them will just query the same time
// interval.
numBatchesCache := make(map[timeInterval]int)
for _, ie := range intervalEvents {
interval := ie
intervalEventsPool.Submit(func() {
end := interval.end
if end == 0 {
end = currentTs
}
batches, err := sc.api.QueryBatchesByBlockTimestampRange(ctx, interval.start, end)
if err != nil {
sc.logger.Error("failed to query batches by block timestamp range", "start", interval.start, "end", end, "err", err)
return
}
if len(batches) > 0 {
timeRange := timeInterval{start: interval.start, end: end}
mu.Lock()
_, ok := numBatchesCache[timeRange]
mu.Unlock()
if !ok {
batches, err := sc.api.QueryBatchesByBlockTimestampRange(ctx, interval.start, end)
if err != nil {
sc.logger.Error("failed to query batches by block timestamp range", "start", interval.start, "end", end, "err", err)
return
}
mu.Lock()
numBatchesByOperator[interval.OperatorId] += len(batches)
numBatchesCache[timeRange] = len(batches)
mu.Unlock()
}
mu.Lock()
numBatchesByOperator[interval.OperatorId] += numBatchesCache[timeRange]
mu.Unlock()
})
}
intervalEventsPool.StopWait()
Expand Down Expand Up @@ -263,17 +278,15 @@ func (sc *subgraphClient) getOperatorsWithRegisteredDeregisteredIntervalEvents(
reg := registeredOperators[i]
operatorId := string(reg.OperatorId)

// If the operator is not a nonsigner, skip it.
if _, ok := operators[operatorId]; !ok {
operators[operatorId] = OperatorEvents{
RegistrationEvents: []uint64{},
DeregistrationEvents: []uint64{},
}
continue
}
operator := operators[operatorId]
timestamp, err := strconv.ParseUint(string(reg.BlockTimestamp), 10, 64)
if err != nil {
return nil, err
}
operator := operators[operatorId]
operator.RegistrationEvents = append(operator.RegistrationEvents, timestamp)
operators[operatorId] = operator
}
Expand All @@ -282,15 +295,17 @@ func (sc *subgraphClient) getOperatorsWithRegisteredDeregisteredIntervalEvents(
dereg := deregisteredOperators[i]
operatorId := string(dereg.OperatorId)

// If the operator is not a nonsigner, skip it.
if _, ok := operators[operatorId]; !ok {
continue
}
timestamp, err := strconv.ParseUint(string(dereg.BlockTimestamp), 10, 64)
if err != nil || timestamp == 0 {
return nil, err
}
operator := operators[operatorId]
if _, ok := operators[operatorId]; ok {
operator.DeregistrationEvents = append(operator.DeregistrationEvents, timestamp)
operators[operatorId] = operator
}
operator.DeregistrationEvents = append(operator.DeregistrationEvents, timestamp)
operators[operatorId] = operator
}

events, err := getOperatorInterval(ctx, operators, blockTimestamp, nonSigners)
Expand Down Expand Up @@ -326,6 +341,9 @@ func getOperatorInterval(
return nil, fmt.Errorf("The number of registration and deregistration events cannot differ by more than one, num registration events: %d, num deregistration events: %d, operatorId: %s", len(reg), len(dereg), operatorId)
}

// Note: if an operator registered at block A and then deregistered
// at block B, the range of liveness will be [A, B), i.e. the operator
// will not be responsible for signing at block B.
if len(reg) == 0 && len(dereg) == 0 {
// The operator has no registration/deregistration events: it's live
// for the entire time window.
Expand All @@ -340,7 +358,7 @@ func getOperatorInterval(
intervals = append(intervals, OperatorInterval{
OperatorId: operatorId,
start: blockTimestamp,
end: dereg[0],
end: dereg[0] - 1,
})
} else if len(dereg) == 0 {
// The operator has only registration event: it's live from registration to
Expand All @@ -361,7 +379,7 @@ func getOperatorInterval(
intervals = append(intervals, OperatorInterval{
OperatorId: operatorId,
start: reg[i],
end: dereg[i],
end: dereg[i] - 1,
})
} else {
intervals = append(intervals, OperatorInterval{
Expand All @@ -377,14 +395,14 @@ func getOperatorInterval(
intervals = append(intervals, OperatorInterval{
OperatorId: operatorId,
start: blockTimestamp,
end: dereg[0],
end: dereg[0] - 1,
})
for i := 0; i < len(reg); i++ {
if i+1 < len(dereg) {
intervals = append(intervals, OperatorInterval{
OperatorId: operatorId,
start: reg[i],
end: dereg[i+1],
end: dereg[i+1] - 1,
})
} else {
intervals = append(intervals, OperatorInterval{
Expand Down

0 comments on commit b368acd

Please sign in to comment.