diff --git a/disperser/dataapi/subgraph/api.go b/disperser/dataapi/subgraph/api.go index 8552d106e7..835ec91ddb 100644 --- a/disperser/dataapi/subgraph/api.go +++ b/disperser/dataapi/subgraph/api.go @@ -74,13 +74,26 @@ func (a *api) QueryBatchesByBlockTimestampRange(ctx context.Context, start, end "blockTimestamp_gte": graphql.Int(start), "blockTimestamp_lte": graphql.Int(end), } + skip := 0 query := new(queryBatchesByBlockTimestampRange) - err := a.uiMonitoringGgl.Query(ctx, query, variables) - if err != nil { - return nil, err + result := make([]*Batches, 0) + for { + variables["first"] = graphql.Int(maxEntriesPerQuery) + variables["skip"] = graphql.Int(skip) + + err := a.uiMonitoringGgl.Query(ctx, &query, variables) + if err != nil { + return nil, err + } + + if len(query.Batches) == 0 { + break + } + result = append(result, query.Batches...) + skip += maxEntriesPerQuery } - return query.Batches, nil + return result, nil } func (a *api) QueryOperators(ctx context.Context, first int) ([]*Operator, error) { diff --git a/disperser/dataapi/subgraph/queries.go b/disperser/dataapi/subgraph/queries.go index 21a49e07d9..f4c3ae547f 100644 --- a/disperser/dataapi/subgraph/queries.go +++ b/disperser/dataapi/subgraph/queries.go @@ -62,7 +62,7 @@ type ( Batches []*Batches `graphql:"batches(orderDirection: $orderDirection, orderBy: $orderBy, first: $first, skip: $skip)"` } queryBatchesByBlockTimestampRange struct { - Batches []*Batches `graphql:"batches(first: $first, orderBy: blockTimestamp, where: {and: [{ blockTimestamp_gte: $blockTimestamp_gte}, {blockTimestamp_lte: $blockTimestamp_lte}]})"` + Batches []*Batches `graphql:"batches(first: $first, skip: $skip, orderBy: blockTimestamp, where: {and: [{ blockTimestamp_gte: $blockTimestamp_gte}, {blockTimestamp_lte: $blockTimestamp_lte}]})"` } queryOperatorRegistereds struct { OperatorRegistereds []*Operator `graphql:"operatorRegistereds(first: $first)"` diff --git a/disperser/dataapi/subgraph_client.go b/disperser/dataapi/subgraph_client.go index 8c0ad4439f..7bf5555624 100644 --- a/disperser/dataapi/subgraph_client.go +++ b/disperser/dataapi/subgraph_client.go @@ -174,6 +174,14 @@ func (sc *subgraphClient) QueryNumBatchesByOperatorsInThePastBlockTimestamp(ctx intervalEventsPool = workerpool.New(maxWorkerPoolSize) currentTs = uint64(time.Now().Unix()) ) + type timeInterval struct { + start, end uint64 + } + // Caching the number of batches in a time interval so we don't need to query + // subgraph repeatedly. In usual case, most operators will have no opt-in/opt-out + // events in recent time window, so all of them will just query the same time + // interval. + numBatchesCache := make(map[timeInterval]int) for _, ie := range intervalEvents { interval := ie intervalEventsPool.Submit(func() { @@ -181,16 +189,23 @@ func (sc *subgraphClient) QueryNumBatchesByOperatorsInThePastBlockTimestamp(ctx if end == 0 { end = currentTs } - batches, err := sc.api.QueryBatchesByBlockTimestampRange(ctx, interval.start, end) - if err != nil { - sc.logger.Error("failed to query batches by block timestamp range", "start", interval.start, "end", end, "err", err) - return - } - if len(batches) > 0 { + timeRange := timeInterval{start: interval.start, end: end} + mu.Lock() + _, ok := numBatchesCache[timeRange] + mu.Unlock() + if !ok { + batches, err := sc.api.QueryBatchesByBlockTimestampRange(ctx, interval.start, end) + if err != nil { + sc.logger.Error("failed to query batches by block timestamp range", "start", interval.start, "end", end, "err", err) + return + } mu.Lock() - numBatchesByOperator[interval.OperatorId] += len(batches) + numBatchesCache[timeRange] = len(batches) mu.Unlock() } + mu.Lock() + numBatchesByOperator[interval.OperatorId] += numBatchesCache[timeRange] + mu.Unlock() }) } intervalEventsPool.StopWait() @@ -263,17 +278,15 @@ func (sc *subgraphClient) getOperatorsWithRegisteredDeregisteredIntervalEvents( reg := registeredOperators[i] operatorId := string(reg.OperatorId) + // If the operator is not a nonsigner, skip it. if _, ok := operators[operatorId]; !ok { - operators[operatorId] = OperatorEvents{ - RegistrationEvents: []uint64{}, - DeregistrationEvents: []uint64{}, - } + continue } - operator := operators[operatorId] timestamp, err := strconv.ParseUint(string(reg.BlockTimestamp), 10, 64) if err != nil { return nil, err } + operator := operators[operatorId] operator.RegistrationEvents = append(operator.RegistrationEvents, timestamp) operators[operatorId] = operator } @@ -282,15 +295,17 @@ func (sc *subgraphClient) getOperatorsWithRegisteredDeregisteredIntervalEvents( dereg := deregisteredOperators[i] operatorId := string(dereg.OperatorId) + // If the operator is not a nonsigner, skip it. + if _, ok := operators[operatorId]; !ok { + continue + } timestamp, err := strconv.ParseUint(string(dereg.BlockTimestamp), 10, 64) if err != nil || timestamp == 0 { return nil, err } operator := operators[operatorId] - if _, ok := operators[operatorId]; ok { - operator.DeregistrationEvents = append(operator.DeregistrationEvents, timestamp) - operators[operatorId] = operator - } + operator.DeregistrationEvents = append(operator.DeregistrationEvents, timestamp) + operators[operatorId] = operator } events, err := getOperatorInterval(ctx, operators, blockTimestamp, nonSigners) @@ -326,6 +341,10 @@ func getOperatorInterval( return nil, fmt.Errorf("The number of registration and deregistration events cannot differ by more than one, num registration events: %d, num deregistration events: %d, operatorId: %s", len(reg), len(dereg), operatorId) } + // Note: if an operator registered at block A and then deregistered + // at block B, the range of liveness will be [A, B), i.e. the operator + // will not be responsible for signing at block B. + if len(reg) == 0 && len(dereg) == 0 { // The operator has no registration/deregistration events: it's live // for the entire time window. @@ -340,7 +359,7 @@ func getOperatorInterval( intervals = append(intervals, OperatorInterval{ OperatorId: operatorId, start: blockTimestamp, - end: dereg[0], + end: dereg[0] - 1, }) } else if len(dereg) == 0 { // The operator has only registration event: it's live from registration to @@ -361,7 +380,7 @@ func getOperatorInterval( intervals = append(intervals, OperatorInterval{ OperatorId: operatorId, start: reg[i], - end: dereg[i], + end: dereg[i] - 1, }) } else { intervals = append(intervals, OperatorInterval{ @@ -377,14 +396,14 @@ func getOperatorInterval( intervals = append(intervals, OperatorInterval{ OperatorId: operatorId, start: blockTimestamp, - end: dereg[0], + end: dereg[0] - 1, }) for i := 0; i < len(reg); i++ { if i+1 < len(dereg) { intervals = append(intervals, OperatorInterval{ OperatorId: operatorId, start: reg[i], - end: dereg[i+1], + end: dereg[i+1] - 1, }) } else { intervals = append(intervals, OperatorInterval{