From 8fbb86204ab19f8651e625575e5f0f94b13b568c Mon Sep 17 00:00:00 2001 From: Jian Xiao <99709935+jianoaix@users.noreply.github.com> Date: Tue, 19 Mar 2024 16:30:30 -0700 Subject: [PATCH] Make sure the operator quorum events are all fetched (#361) --- disperser/dataapi/subgraph/api.go | 42 ++++++++++++---- disperser/dataapi/subgraph/queries.go | 14 +++--- disperser/dataapi/subgraph_client.go | 38 +++++++++++--- disperser/dataapi/subgraph_client_test.go | 61 ++++++++++++----------- 4 files changed, 105 insertions(+), 50 deletions(-) diff --git a/disperser/dataapi/subgraph/api.go b/disperser/dataapi/subgraph/api.go index 71f115d4c3..e838ea87be 100644 --- a/disperser/dataapi/subgraph/api.go +++ b/disperser/dataapi/subgraph/api.go @@ -219,12 +219,23 @@ func (a *api) QueryOperatorAddedToQuorum(ctx context.Context, startBlock, endBlo "blockNumber_gt": graphql.Int(startBlock - 1), "blockNumber_lt": graphql.Int(endBlock + 1), } - query := new(queryOperatorAddedToQuorum) - err := a.operatorStateGql.Query(ctx, &query, variables) - if err != nil { - return nil, err + skip := 0 + result := new(queryOperatorAddedToQuorum) + addedToQuorums := make([]*OperatorQuorum, 0) + for { + variables["first"] = graphql.Int(maxEntriesPerQuery) + variables["skip"] = graphql.Int(skip) + err := a.operatorStateGql.Query(ctx, &result, variables) + if err != nil { + return nil, err + } + if len(result.OperatorAddedToQuorum) == 0 { + break + } + addedToQuorums = append(addedToQuorums, result.OperatorAddedToQuorum...) + skip += maxEntriesPerQuery } - return query.OperatorAddedToQuorum, nil + return addedToQuorums, nil } // QueryOperatorRemovedFromQuorum finds operators' quorum opt-out history in range [startBlock, endBlock]. @@ -236,10 +247,21 @@ func (a *api) QueryOperatorRemovedFromQuorum(ctx context.Context, startBlock, en "blockNumber_gt": graphql.Int(startBlock - 1), "blockNumber_lt": graphql.Int(endBlock + 1), } - query := new(queryOperatorRemovedFromQuorum) - err := a.operatorStateGql.Query(ctx, &query, variables) - if err != nil { - return nil, err + skip := 0 + result := new(queryOperatorRemovedFromQuorum) + removedFromQuorums := make([]*OperatorQuorum, 0) + for { + variables["first"] = graphql.Int(maxEntriesPerQuery) + variables["skip"] = graphql.Int(skip) + err := a.operatorStateGql.Query(ctx, &result, variables) + if err != nil { + return nil, err + } + if len(result.OperatorRemovedFromQuorum) == 0 { + break + } + removedFromQuorums = append(removedFromQuorums, result.OperatorRemovedFromQuorum...) + skip += maxEntriesPerQuery } - return query.OperatorRemovedFromQuorum, nil + return removedFromQuorums, nil } diff --git a/disperser/dataapi/subgraph/queries.go b/disperser/dataapi/subgraph/queries.go index 7cb830c275..2a04527c5d 100644 --- a/disperser/dataapi/subgraph/queries.go +++ b/disperser/dataapi/subgraph/queries.go @@ -30,10 +30,11 @@ type ( TransactionHash graphql.String } OperatorQuorum struct { - Id graphql.String - Operator graphql.String - QuorumNumbers graphql.String - BlockNumber graphql.String + Id graphql.String + Operator graphql.String + QuorumNumbers graphql.String + BlockNumber graphql.String + BlockTimestamp graphql.String } BatchNonSigningOperatorIds struct { NonSigning struct { @@ -54,6 +55,7 @@ type ( OperatorId graphql.String `graphql:"operatorId"` } `graphql:"nonSigners"` } `graphql:"nonSigning"` + BlockNumber graphql.String } SocketUpdates struct { Socket graphql.String @@ -102,9 +104,9 @@ type ( Operator IndexedOperatorInfo `graphql:"operator(id: $id)"` } queryOperatorAddedToQuorum struct { - OperatorAddedToQuorum []*OperatorQuorum `graphql:"operatorAddedToQuorums(orderBy: blockTimestamp, where: {and: [{blockNumber_gt: $blockNumber_gt}, {blockNumber_lt: $blockNumber_lt}]})"` + OperatorAddedToQuorum []*OperatorQuorum `graphql:"operatorAddedToQuorums(first: $first, skip: $skip, orderBy: blockTimestamp, where: {and: [{blockNumber_gt: $blockNumber_gt}, {blockNumber_lt: $blockNumber_lt}]})"` } queryOperatorRemovedFromQuorum struct { - OperatorRemovedFromQuorum []*OperatorQuorum `graphql:"operatorRemovedFromQuorums(orderBy: blockTimestamp, where: {and: [{blockNumber_gt: $blockNumber_gt}, {blockNumber_lt: $blockNumber_lt}]})"` + OperatorRemovedFromQuorum []*OperatorQuorum `graphql:"operatorRemovedFromQuorums(first: $first, skip: $skip, orderBy: blockTimestamp, where: {and: [{blockNumber_gt: $blockNumber_gt}, {blockNumber_lt: $blockNumber_lt}]})"` } ) diff --git a/disperser/dataapi/subgraph_client.go b/disperser/dataapi/subgraph_client.go index 4534d55fff..034d8ccd55 100644 --- a/disperser/dataapi/subgraph_client.go +++ b/disperser/dataapi/subgraph_client.go @@ -52,9 +52,10 @@ type ( TransactionHash []byte } OperatorQuorum struct { - Operator string - QuorumNumbers []byte - BlockNumber uint32 + Operator string + QuorumNumbers []byte + BlockNumber uint32 + BlockTimestamp uint64 } OperatorQuorumEvents struct { // AddedToQuorum is mapping from operator address to a list of sorted events @@ -79,6 +80,7 @@ type ( Count int } BatchNonSigningInfo struct { + BlockNumber uint32 QuorumNumbers []uint8 ReferenceBlockNumber uint32 // The operatorIds of nonsigners for the batch. @@ -358,10 +360,29 @@ func parseOperatorQuorum(operatorQuorum []*subgraph.OperatorQuorum) ([]*Operator if err != nil { return nil, err } + blockTimestamp, err := strconv.ParseUint(string(opq.BlockTimestamp), 10, 64) + if err != nil { + return nil, err + } + if len(opq.QuorumNumbers) < 2 || len(opq.QuorumNumbers)%2 != 0 { + return nil, fmt.Errorf("the QuorumNumbers is expected to start with 0x and have an even length, QuorumNumbers: %s", string(opq.QuorumNumbers)) + } + // The quorum numbers string starts with "0x", so we should skip it. + quorumStr := string(opq.QuorumNumbers)[2:] + quorumNumbers := make([]byte, 0) + for i := 0; i < len(quorumStr); i += 2 { + pair := quorumStr[i : i+2] + quorum, err := strconv.Atoi(pair) + if err != nil { + return nil, err + } + quorumNumbers = append(quorumNumbers, uint8(quorum)) + } parsed[i] = &OperatorQuorum{ - Operator: string(opq.Operator), - QuorumNumbers: []byte(opq.QuorumNumbers), - BlockNumber: uint32(blockNum), + Operator: string(opq.Operator), + QuorumNumbers: quorumNumbers, + BlockNumber: uint32(blockNum), + BlockTimestamp: blockTimestamp, } } // Sort the quorum events by ascending order of block number. @@ -387,12 +408,17 @@ func convertNonSigningInfo(infoGql *subgraph.BatchNonSigningInfo) (*BatchNonSign if err != nil { return nil, err } + confirmBlockNum, err := strconv.ParseUint(string(infoGql.BlockNumber), 10, 64) + if err != nil { + return nil, err + } nonSigners := make([]string, len(infoGql.NonSigning.NonSigners)) for i, nonSigner := range infoGql.NonSigning.NonSigners { nonSigners[i] = string(nonSigner.OperatorId) } return &BatchNonSigningInfo{ + BlockNumber: uint32(confirmBlockNum), QuorumNumbers: quorums, ReferenceBlockNumber: uint32(blockNum), NonSigners: nonSigners, diff --git a/disperser/dataapi/subgraph_client_test.go b/disperser/dataapi/subgraph_client_test.go index 3c443d64dd..562880a4eb 100644 --- a/disperser/dataapi/subgraph_client_test.go +++ b/disperser/dataapi/subgraph_client_test.go @@ -121,36 +121,42 @@ var ( operatorAddedToQuorum = []*subgraph.OperatorQuorum{ { - Operator: "operator-2", - QuorumNumbers: "2", - BlockNumber: "82", + Operator: "operator-2", + QuorumNumbers: "0x02", + BlockNumber: "82", + BlockTimestamp: "1702666070", }, { - Operator: "operator-1", - QuorumNumbers: "2", - BlockNumber: "82", + Operator: "operator-1", + QuorumNumbers: "0x02", + BlockNumber: "82", + BlockTimestamp: "1702666070", }, { - Operator: "operator-1", - QuorumNumbers: "01", - BlockNumber: "80", + Operator: "operator-1", + QuorumNumbers: "0x01", + BlockNumber: "80", + BlockTimestamp: "1702666046", }, } operatorRemovedFromQuorum = []*subgraph.OperatorQuorum{ { - Operator: "operator-1", - QuorumNumbers: "0", - BlockNumber: "81", + Operator: "operator-1", + QuorumNumbers: "0x00", + BlockNumber: "81", + BlockTimestamp: "1702666058", }, { - Operator: "operator-2", - QuorumNumbers: "2", - BlockNumber: "83", + Operator: "operator-2", + QuorumNumbers: "0x02", + BlockNumber: "83", + BlockTimestamp: "1702666082", }, { - Operator: "operator-1", - QuorumNumbers: "1", - BlockNumber: "83", + Operator: "operator-1", + QuorumNumbers: "0x01", + BlockNumber: "83", + BlockTimestamp: "1702666082", }, } @@ -184,6 +190,7 @@ var ( }, }, }, + BlockNumber: "83", }, { BatchId: "0", @@ -211,6 +218,7 @@ var ( }, }, }, + BlockNumber: "82", }, } @@ -527,15 +535,12 @@ func TestQueryOperatorQuorumEvent(t *testing.T) { assert.Equal(t, 2, len(added1)) assert.Equal(t, "operator-1", added1[0].Operator) assert.Equal(t, uint32(80), added1[0].BlockNumber) - assert.Equal(t, 2, len(added1[0].QuorumNumbers)) - // Note: the quorumId is 48 not 01 is because the string "01" is in UTF-8 - // encoding (the default in golang), and it corresponding to 48 in decimal. - assert.Equal(t, uint8(48), added1[0].QuorumNumbers[0]) - assert.Equal(t, uint8(49), added1[0].QuorumNumbers[1]) + assert.Equal(t, 1, len(added1[0].QuorumNumbers)) + assert.Equal(t, uint8(1), added1[0].QuorumNumbers[0]) assert.Equal(t, "operator-1", added1[1].Operator) assert.Equal(t, uint32(82), added1[1].BlockNumber) assert.Equal(t, 1, len(added1[1].QuorumNumbers)) - assert.Equal(t, uint8(50), added1[1].QuorumNumbers[0]) + assert.Equal(t, uint8(2), added1[1].QuorumNumbers[0]) // Quorum events for operator-2. added2, ok := addedMap["operator-2"] assert.True(t, ok) @@ -543,7 +548,7 @@ func TestQueryOperatorQuorumEvent(t *testing.T) { assert.Equal(t, "operator-2", added2[0].Operator) assert.Equal(t, uint32(82), added2[0].BlockNumber) assert.Equal(t, 1, len(added2[0].QuorumNumbers)) - assert.Equal(t, uint8(50), added2[0].QuorumNumbers[0]) + assert.Equal(t, uint8(2), added2[0].QuorumNumbers[0]) removedMap := result.RemovedFromQuorum assert.Equal(t, 2, len(removedMap)) @@ -554,11 +559,11 @@ func TestQueryOperatorQuorumEvent(t *testing.T) { assert.Equal(t, "operator-1", removed1[0].Operator) assert.Equal(t, uint32(81), removed1[0].BlockNumber) assert.Equal(t, 1, len(removed1[0].QuorumNumbers)) - assert.Equal(t, uint8(48), removed1[0].QuorumNumbers[0]) + assert.Equal(t, uint8(0), removed1[0].QuorumNumbers[0]) assert.Equal(t, "operator-1", removed1[1].Operator) assert.Equal(t, uint32(83), removed1[1].BlockNumber) assert.Equal(t, 1, len(removed1[1].QuorumNumbers)) - assert.Equal(t, uint8(49), removed1[1].QuorumNumbers[0]) + assert.Equal(t, uint8(1), removed1[1].QuorumNumbers[0]) // Quorum events for operator-2. removed2, ok := removedMap["operator-2"] assert.True(t, ok) @@ -566,5 +571,5 @@ func TestQueryOperatorQuorumEvent(t *testing.T) { assert.Equal(t, "operator-2", removed2[0].Operator) assert.Equal(t, uint32(83), removed2[0].BlockNumber) assert.Equal(t, 1, len(removed2[0].QuorumNumbers)) - assert.Equal(t, uint8(50), removed2[0].QuorumNumbers[0]) + assert.Equal(t, uint8(2), removed2[0].QuorumNumbers[0]) }