diff --git a/internal/pkg/gateway/api.go b/internal/pkg/gateway/api.go index d9343c680c4..6b49902b3c9 100644 --- a/internal/pkg/gateway/api.go +++ b/internal/pkg/gateway/api.go @@ -12,13 +12,13 @@ import ( "io" "math/rand" "strings" - "sync" "github.com/golang/protobuf/proto" "github.com/hyperledger/fabric-protos-go/common" gp "github.com/hyperledger/fabric-protos-go/gateway" ab "github.com/hyperledger/fabric-protos-go/orderer" "github.com/hyperledger/fabric-protos-go/peer" + "github.com/hyperledger/fabric/common/flogging" "github.com/hyperledger/fabric/common/ledger" "github.com/hyperledger/fabric/core/aclmgmt/resources" "github.com/hyperledger/fabric/core/chaincode" @@ -77,7 +77,7 @@ func (gs *Server) Evaluate(ctx context.Context, request *gp.EvaluateRequest) (*g ctx, cancel := context.WithTimeout(ctx, gs.options.EndorsementTimeout) defer cancel() pr, err := endorser.client.ProcessProposal(ctx, signedProposal) - code, message, retry, remove := gs.responseStatus(pr, err) + code, message, retry, remove := responseStatus(pr, err) if code == codes.OK { response = pr.Response // Prefer result from proposal response as Response.Payload is not required to be transaction result @@ -157,11 +157,7 @@ func (gs *Server) Endorse(ctx context.Context, request *gp.EndorseRequest) (*gp. chaincodeID := spec.GetChaincodeSpec().GetChaincodeId().GetName() hasTransientData := len(payload.GetTransientMap()) > 0 - defaultInterest := &peer.ChaincodeInterest{ - Chaincodes: []*peer.ChaincodeCall{{ - Name: chaincodeID, - }}, - } + logger := gs.logger.With("channel", channel, "chaincode", chaincodeID, "txID", request.TransactionId) var plan *plan var action *peer.ChaincodeEndorsedAction @@ -173,177 +169,50 @@ func (gs *Server) Endorse(ctx context.Context, request *gp.EndorseRequest) (*gp. } } else { // The client is delegating choice of endorsers to the gateway. - - // 1. Choose an endorser from the gateway's organization - plan, err = gs.registry.planForOrgs(channel, chaincodeID, []string{gs.registry.localEndorser.mspid}) + plan, err = gs.planFromFirstEndorser(ctx, channel, chaincodeID, hasTransientData, signedProposal, logger) if err != nil { - // No local org endorsers for this channel/chaincode. If transient data is involved, return error - if hasTransientData { - return nil, status.Error(codes.FailedPrecondition, "no endorsers found in the gateway's organization; retry specifying endorsing organization(s) to protect transient data") - } - // Otherwise, just let discovery pick one. - plan, err = gs.registry.endorsementPlan(channel, defaultInterest, nil) - if err != nil { - return nil, status.Error(codes.FailedPrecondition, err.Error()) - } - } - firstEndorser := plan.endorsers()[0] - - gs.logger.Debugw("Sending to first endorser:", "channel", channel, "chaincode", chaincodeID, "txID", request.TransactionId, "MSPID", firstEndorser.mspid, "endpoint", firstEndorser.address) - - // 2. Process the proposal on this endorser - var firstResponse *peer.ProposalResponse - var errDetails []proto.Message - - for firstResponse == nil && firstEndorser != nil { - done := make(chan struct{}) - go func() { - defer close(done) - ctx, cancel := context.WithTimeout(ctx, gs.options.EndorsementTimeout) - defer cancel() - firstResponse, err = firstEndorser.client.ProcessProposal(ctx, signedProposal) - code, message, _, remove := gs.responseStatus(firstResponse, err) - - if code != codes.OK { - logger.Warnw("Endorse call to endorser failed", "channel", request.ChannelId, "txID", request.TransactionId, "endorserAddress", firstEndorser.endpointConfig.address, "endorserMspid", firstEndorser.endpointConfig.mspid, "error", message) - errDetails = append(errDetails, errorDetail(firstEndorser.endpointConfig, message)) - if remove { - gs.registry.removeEndorser(firstEndorser) - } - firstEndorser = plan.nextPeerInGroup(firstEndorser) - firstResponse = nil - } - }() - select { - case <-done: - // Endorser completed normally - case <-ctx.Done(): - // Overall endorsement timeout expired - logger.Warnw("Endorse call timed out while collecting first endorsement", "channel", request.ChannelId, "txID", request.TransactionId) - return nil, newRpcError(codes.DeadlineExceeded, "endorsement timeout expired while collecting first endorsement") - } - } - if firstEndorser == nil || firstResponse == nil { - return nil, newRpcError(codes.Aborted, "failed to endorse transaction, see attached details for more info", errDetails...) - } - - // 3. Extract ChaincodeInterest and SBE policies - // The chaincode interest could be nil for legacy peers and for chaincode functions that don't produce a read-write set - interest := firstResponse.Interest - if len(interest.GetChaincodes()) == 0 { - interest = defaultInterest - } - - // 4. If transient data is involved, then we need to ensure that discovery only returns orgs which own the collections involved. - // Do this by setting NoPrivateReads to false on each collection - if hasTransientData { - for _, call := range interest.GetChaincodes() { - call.NoPrivateReads = false - } - } - - // 5. Get a set of endorsers from discovery via the registry - // The preferred discovery layout will contain the firstEndorser's Org. - plan, err = gs.registry.endorsementPlan(channel, interest, firstEndorser) - if err != nil { - return nil, status.Error(codes.FailedPrecondition, err.Error()) - } - - // 6. Remove the gateway org's endorser, since we've already done that - action, err = plan.processEndorsement(firstEndorser, firstResponse) - if err != nil { - return nil, status.Error(codes.Aborted, err.Error()) + return nil, err } } - var errorDetails []proto.Message - for action == nil { + for plan.completedLayout == nil { // loop through the layouts until one gets satisfied endorsers := plan.endorsers() if endorsers == nil { // no more layouts break } - var wg sync.WaitGroup - responseCh := make(chan *endorserResponse, plan.size) // send to all the endorsers + waitCh := make(chan bool, len(endorsers)) for _, e := range endorsers { - wg.Add(1) go func(e *endorser) { - defer wg.Done() for e != nil { - done := make(chan *endorserResponse) - go func() { - defer close(done) - gs.logger.Debugw("Sending to endorser:", "channel", channel, "chaincode", chaincodeID, "txID", request.TransactionId, "MSPID", e.mspid, "endpoint", e.address) - ctx, cancel := context.WithTimeout(ctx, gs.options.EndorsementTimeout) // timeout of individual endorsement - defer cancel() - response, err := e.client.ProcessProposal(ctx, signedProposal) - // Ignore the retry flag returned by the following responseStatus call. Endorse will retry until all endorsement layouts have been exhausted. - // It tries to get a successful endorsement from each org and minimise the changes of a rogue peer scuppering the transaction. - // If an org is behaving badly, it can move on to a different layout. - code, message, _, remove := gs.responseStatus(response, err) - if code == codes.OK { - logger.Debugw("Endorse call to endorser returned success", "channel", request.ChannelId, "txID", request.TransactionId, "numEndorsers", len(endorsers), "endorserAddress", e.endpointConfig.address, "endorserMspid", e.endpointConfig.mspid, "status", response.Response.Status, "message", response.Response.Message) - - responseMessage := response.GetResponse() - if responseMessage != nil { - responseMessage.Payload = nil // Remove any duplicate response payload - } - - action, err := plan.processEndorsement(e, response) - if err != nil { - done <- &endorserResponse{err: errorDetail(e.endpointConfig, err.Error())} - return - } - done <- &endorserResponse{action: action} - } else { - logger.Warnw("Endorse call to endorser failed", "channel", request.ChannelId, "txID", request.TransactionId, "numEndorsers", len(endorsers), "endorserAddress", e.endpointConfig.address, "endorserMspid", e.endpointConfig.mspid, "error", message) - if remove { - gs.registry.removeEndorser(e) - } - done <- &endorserResponse{err: errorDetail(e.endpointConfig, message)} - } - }() - select { - case resp := <-done: - // Endorser completed normally - if resp.err != nil { - e = plan.nextPeerInGroup(e) - } else { - e = nil - } - responseCh <- resp - case <-ctx.Done(): - // Overall endorsement timeout expired - responseCh <- &endorserResponse{timeoutExpired: true} - return + if gs.processProposal(ctx, plan, e, signedProposal, logger) { + break } + e = plan.nextPeerInGroup(e) } + waitCh <- true }(e) } - wg.Wait() - close(responseCh) - - for response := range responseCh { - if response.timeoutExpired { - logger.Warnw("Endorse call timed out while collecting endorsements", "channel", request.ChannelId, "txID", request.TransactionId, "numEndorsers", len(endorsers)) + for i := 0; i < len(endorsers); i++ { + select { + case <-waitCh: + // Endorser completedLayout normally + case <-ctx.Done(): + logger.Warnw("Endorse call timed out while collecting endorsements", "numEndorsers", len(endorsers)) return nil, newRpcError(codes.DeadlineExceeded, "endorsement timeout expired while collecting endorsements") } - if response.action != nil { - action = response.action - break - } - if response.err != nil { - errorDetails = append(errorDetails, response.err) - } } + } - if action == nil { - return nil, newRpcError(codes.Aborted, "failed to collect enough transaction endorsements, see attached details for more info", errorDetails...) + if plan.completedLayout == nil { + return nil, newRpcError(codes.Aborted, "failed to collect enough transaction endorsements, see attached details for more info", plan.errorDetails...) } + action = &peer.ChaincodeEndorsedAction{ProposalResponsePayload: plan.responsePayload, Endorsements: uniqueEndorsements(plan.completedLayout.endorsements)} + preparedTransaction, err := prepareTransaction(header, payload, action) if err != nil { return nil, status.Errorf(codes.Aborted, "failed to assemble transaction: %s", err) @@ -352,6 +221,144 @@ func (gs *Server) Endorse(ctx context.Context, request *gp.EndorseRequest) (*gp. return &gp.EndorseResponse{PreparedTransaction: preparedTransaction}, nil } +type ppResponse struct { + response *peer.ProposalResponse + err error +} + +// processProposal will invoke the given endorsing peer to process the signed proposal, and will update the plan accordingly. +// This function will timeout and return false if the given context timeout or the EndorsementTimeout option expires. +// Returns boolean true if the endorsement was successful. +func (gs *Server) processProposal(ctx context.Context, plan *plan, endorser *endorser, signedProposal *peer.SignedProposal, logger *flogging.FabricLogger) bool { + var response *peer.ProposalResponse + done := make(chan *ppResponse) + go func() { + defer close(done) + logger.Debugw("Sending to endorser:", "MSPID", endorser.mspid, "endpoint", endorser.address) + ctx, cancel := context.WithTimeout(ctx, gs.options.EndorsementTimeout) // timeout of individual endorsement + defer cancel() + response, err := endorser.client.ProcessProposal(ctx, signedProposal) + done <- &ppResponse{response: response, err: err} + }() + select { + case resp := <-done: + // Endorser completedLayout normally + code, message, _, remove := responseStatus(resp.response, resp.err) + if code != codes.OK { + logger.Warnw("Endorse call to endorser failed", "MSPID", endorser.mspid, "endpoint", endorser.address, "error", message) + if remove { + gs.registry.removeEndorser(endorser) + } + plan.addError(errorDetail(endorser.endpointConfig, message)) + return false + } + response = resp.response + logger.Debugw("Endorse call to endorser returned success", "MSPID", endorser.mspid, "endpoint", endorser.address, "status", response.Response.Status, "message", response.Response.Message) + + responseMessage := response.GetResponse() + if responseMessage != nil { + responseMessage.Payload = nil // Remove any duplicate response payload + } + + return plan.processEndorsement(endorser, response) + case <-ctx.Done(): + // Overall endorsement timeout expired + return false + } +} + +// planFromFirstEndorser implements the gateway's strategy of processing the proposal on a single (preferably local) peer +// and using the ChaincodeInterest from the response to invoke discovery and build an endorsement plan. +// Returns the endorsement plan which can be used to request further endorsements, if required. +func (gs *Server) planFromFirstEndorser(ctx context.Context, channel string, chaincodeID string, hasTransientData bool, signedProposal *peer.SignedProposal, logger *flogging.FabricLogger) (*plan, error) { + defaultInterest := &peer.ChaincodeInterest{ + Chaincodes: []*peer.ChaincodeCall{{ + Name: chaincodeID, + }}, + } + + // 1. Choose an endorser from the gateway's organization + plan, err := gs.registry.planForOrgs(channel, chaincodeID, []string{gs.registry.localEndorser.mspid}) + if err != nil { + // No local org endorsers for this channel/chaincode. If transient data is involved, return error + if hasTransientData { + return nil, status.Error(codes.FailedPrecondition, "no endorsers found in the gateway's organization; retry specifying endorsing organization(s) to protect transient data") + } + // Otherwise, just let discovery pick one. + plan, err = gs.registry.endorsementPlan(channel, defaultInterest, nil) + if err != nil { + return nil, status.Error(codes.FailedPrecondition, err.Error()) + } + } + firstEndorser := plan.endorsers()[0] + + gs.logger.Debugw("Sending to first endorser:", "MSPID", firstEndorser.mspid, "endpoint", firstEndorser.address) + + // 2. Process the proposal on this endorser + var firstResponse *peer.ProposalResponse + var errDetails []proto.Message + + for firstResponse == nil && firstEndorser != nil { + done := make(chan struct{}) + go func() { + defer close(done) + + ctx, cancel := context.WithTimeout(ctx, gs.options.EndorsementTimeout) + defer cancel() + firstResponse, err = firstEndorser.client.ProcessProposal(ctx, signedProposal) + code, message, _, remove := responseStatus(firstResponse, err) + + if code != codes.OK { + logger.Warnw("Endorse call to endorser failed", "endorserAddress", firstEndorser.address, "endorserMspid", firstEndorser.mspid, "error", message) + errDetails = append(errDetails, errorDetail(firstEndorser.endpointConfig, message)) + if remove { + gs.registry.removeEndorser(firstEndorser) + } + firstEndorser = plan.nextPeerInGroup(firstEndorser) + firstResponse = nil + } + }() + select { + case <-done: + // Endorser completedLayout normally + case <-ctx.Done(): + // Overall endorsement timeout expired + logger.Warn("Endorse call timed out while collecting first endorsement") + return nil, newRpcError(codes.DeadlineExceeded, "endorsement timeout expired while collecting first endorsement") + } + } + if firstEndorser == nil || firstResponse == nil { + return nil, newRpcError(codes.Aborted, "failed to endorse transaction, see attached details for more info", errDetails...) + } + + // 3. Extract ChaincodeInterest and SBE policies + // The chaincode interest could be nil for legacy peers and for chaincode functions that don't produce a read-write set + interest := firstResponse.Interest + if len(interest.GetChaincodes()) == 0 { + interest = defaultInterest + } + + // 4. If transient data is involved, then we need to ensure that discovery only returns orgs which own the collections involved. + // Do this by setting NoPrivateReads to false on each collection + if hasTransientData { + for _, call := range interest.GetChaincodes() { + call.NoPrivateReads = false + } + } + + // 5. Get a set of endorsers from discovery via the registry + // The preferred discovery layout will contain the firstEndorser's Org. + plan, err = gs.registry.endorsementPlan(channel, interest, firstEndorser) + if err != nil { + return nil, status.Error(codes.FailedPrecondition, err.Error()) + } + + // 6. Remove the gateway org's endorser, since we've already done that + plan.processEndorsement(firstEndorser, firstResponse) + + return plan, nil +} + // responseStatus unpacks the proposal response and error values that are returned from ProcessProposal and // determines how the gateway should react (retry?, close connection?). // Uses the grpc canonical status error codes and their recommended actions. @@ -360,7 +367,7 @@ func (gs *Server) Endorse(ctx context.Context, request *gp.EndorseRequest) (*gp. // - error message extracted from the err or generated from 500 proposal response (string) // - should the gateway retry (only the Evaluate() uses this) (bool) // - should the gateway close the connection and remove the peer from its registry (bool) -func (gs *Server) responseStatus(response *peer.ProposalResponse, err error) (statusCode codes.Code, message string, retry bool, remove bool) { +func responseStatus(response *peer.ProposalResponse, err error) (statusCode codes.Code, message string, retry bool, remove bool) { if err != nil { if response == nil { // there is no ProposalResponse, so this must have been generated by grpc in response to an unavailable peer diff --git a/internal/pkg/gateway/endorsement.go b/internal/pkg/gateway/endorsement.go index c14ea367462..b3d3d4359af 100644 --- a/internal/pkg/gateway/endorsement.go +++ b/internal/pkg/gateway/endorsement.go @@ -9,9 +9,9 @@ package gateway import ( "bytes" b64 "encoding/base64" - "errors" "sync" + "github.com/golang/protobuf/proto" "github.com/hyperledger/fabric-protos-go/peer" ) @@ -32,6 +32,8 @@ type plan struct { nextLayout int size int responsePayload []byte + completedLayout *layout + errorDetails []proto.Message planLock sync.Mutex } @@ -88,8 +90,9 @@ func (p *plan) endorsers() []*endorser { // Invoke processEndorsement when an endorsement has been successfully received for the given endorser. // All layouts containing the group that contains this endorser are updated with the endorsement. -// Returns a ChaincodeEndorsedAction, if at least one layout in the plan has been satisfied, otherwise nil. -func (p *plan) processEndorsement(endorser *endorser, response *peer.ProposalResponse) (*peer.ChaincodeEndorsedAction, error) { +// Returns Boolean true if endorser returns with a payload that matches the response payloads of +// the other endorsers in the plan. +func (p *plan) processEndorsement(endorser *endorser, response *peer.ProposalResponse) bool { p.planLock.Lock() defer p.planLock.Unlock() @@ -111,7 +114,8 @@ func (p *plan) processEndorsement(endorser *endorser, response *peer.ProposalRes } else { if !bytes.Equal(p.responsePayload, response.GetPayload()) { logger.Warnw("ProposalResponsePayloads do not match (base64)", "payload1", b64.StdEncoding.EncodeToString(p.responsePayload), "payload2", b64.StdEncoding.EncodeToString(response.GetPayload())) - return nil, errors.New("ProposalResponsePayloads do not match") + p.errorDetails = append(p.errorDetails, errorDetail(endorser.endpointConfig, "ProposalResponsePayloads do not match")) + return false } } @@ -128,12 +132,13 @@ func (p *plan) processEndorsement(endorser *endorser, response *peer.ProposalRes delete(layout.required, group) if len(layout.required) == 0 { // no groups left - this layout is now satisfied - return &peer.ChaincodeEndorsedAction{ProposalResponsePayload: p.responsePayload, Endorsements: uniqueEndorsements(layout.endorsements)}, nil + p.completedLayout = layout + return true } } } } - return nil, nil + return true } // Invoke nextPeerInGroup if an endorsement fails for the given endorser. @@ -168,6 +173,12 @@ func (p *plan) nextPeerInGroup(endorser *endorser) *endorser { return nil } +func (p *plan) addError(detail proto.Message) { + p.planLock.Lock() + defer p.planLock.Unlock() + p.errorDetails = append(p.errorDetails, detail) +} + func uniqueEndorsements(endorsements []*peer.Endorsement) []*peer.Endorsement { endorsersUsed := make(map[string]struct{}) var unique []*peer.Endorsement diff --git a/internal/pkg/gateway/endorsement_test.go b/internal/pkg/gateway/endorsement_test.go index 0f40ca842eb..e2c66751486 100644 --- a/internal/pkg/gateway/endorsement_test.go +++ b/internal/pkg/gateway/endorsement_test.go @@ -32,17 +32,17 @@ func TestSingleLayoutPlan(t *testing.T) { response2 := &peer.ProposalResponse{Payload: []byte("p"), Endorsement: &peer.Endorsement{Endorser: []byte("e2")}} response3 := &peer.ProposalResponse{Payload: []byte("p"), Endorsement: &peer.Endorsement{Endorser: []byte("e3")}} - action, err := plan.processEndorsement(peer1Mock, response1) - require.NoError(t, err) - require.Nil(t, action) - action, err = plan.processEndorsement(peer2Mock, response2) - require.NoError(t, err) - require.Nil(t, action) - action, err = plan.processEndorsement(peer3Mock, response3) - require.NoError(t, err) - require.Equal(t, action.GetProposalResponsePayload(), response1.Payload) - require.Len(t, action.GetEndorsements(), 3) - require.ElementsMatch(t, action.GetEndorsements(), []*peer.Endorsement{response1.Endorsement, response2.Endorsement, response3.Endorsement}) + success := plan.processEndorsement(peer1Mock, response1) + require.True(t, success) + require.Nil(t, plan.completedLayout) + success = plan.processEndorsement(peer2Mock, response2) + require.True(t, success) + require.Nil(t, plan.completedLayout) + success = plan.processEndorsement(peer3Mock, response3) + require.True(t, success) + require.Equal(t, plan.responsePayload, response1.Payload) + require.Len(t, plan.completedLayout.endorsements, 3) + require.ElementsMatch(t, plan.completedLayout.endorsements, []*peer.Endorsement{response1.Endorsement, response2.Endorsement, response3.Endorsement}) } func TestSingleLayoutRetry(t *testing.T) { @@ -66,19 +66,19 @@ func TestSingleLayoutRetry(t *testing.T) { retry := plan.nextPeerInGroup(localhostMock) require.Equal(t, peer1Mock, retry) - action, err := plan.processEndorsement(retry, response1) - require.NoError(t, err) - require.Nil(t, action) - action, err = plan.processEndorsement(peer2Mock, response2) - require.NoError(t, err) - require.Nil(t, action) + success := plan.processEndorsement(retry, response1) + require.True(t, success) + require.Nil(t, plan.completedLayout) + success = plan.processEndorsement(peer2Mock, response2) + require.True(t, success) + require.Nil(t, plan.completedLayout) retry = plan.nextPeerInGroup(peer3Mock) require.Equal(t, peer4Mock, retry) - action, err = plan.processEndorsement(retry, response3) - require.NoError(t, err) - require.Equal(t, action.GetProposalResponsePayload(), response1.Payload) - require.Len(t, action.GetEndorsements(), 3) - require.ElementsMatch(t, action.GetEndorsements(), []*peer.Endorsement{response1.Endorsement, response2.Endorsement, response3.Endorsement}) + success = plan.processEndorsement(retry, response3) + require.True(t, success) + require.Equal(t, plan.responsePayload, response1.Payload) + require.Len(t, plan.completedLayout.endorsements, 3) + require.ElementsMatch(t, plan.completedLayout.endorsements, []*peer.Endorsement{response1.Endorsement, response2.Endorsement, response3.Endorsement}) } func TestMultiLayoutRetry(t *testing.T) { @@ -107,9 +107,8 @@ func TestMultiLayoutRetry(t *testing.T) { require.Equal(t, peer1Mock, retry) // peer2 (g2) succeeds - action, err := plan.processEndorsement(peer2Mock, response1) - require.NoError(t, err) - require.Nil(t, action) + success := plan.processEndorsement(peer2Mock, response1) + require.True(t, success) // peer1 (g1) also fails - returns nil, since no more peers in g1 retry = plan.nextPeerInGroup(retry) @@ -121,11 +120,11 @@ func TestMultiLayoutRetry(t *testing.T) { require.Len(t, endorsers, 1) require.Equal(t, peer4Mock, endorsers[0]) - action, err = plan.processEndorsement(peer4Mock, response2) - require.NoError(t, err) - require.Equal(t, action.GetProposalResponsePayload(), response1.Payload) - require.Len(t, action.GetEndorsements(), 2) - require.ElementsMatch(t, action.GetEndorsements(), []*peer.Endorsement{response1.Endorsement, response2.Endorsement}) + success = plan.processEndorsement(peer4Mock, response2) + require.True(t, success) + require.Equal(t, plan.responsePayload, response1.Payload) + require.Len(t, plan.completedLayout.endorsements, 2) + require.ElementsMatch(t, plan.completedLayout.endorsements, []*peer.Endorsement{response1.Endorsement, response2.Endorsement}) } func TestMultiLayoutFailures(t *testing.T) { @@ -150,9 +149,8 @@ func TestMultiLayoutFailures(t *testing.T) { response2 := &peer.ProposalResponse{Payload: []byte("p"), Endorsement: &peer.Endorsement{Endorser: []byte("e2")}} // localhost (g1) succeeds - action, err := plan.processEndorsement(localhostMock, response1) - require.NoError(t, err) - require.Nil(t, action) + success := plan.processEndorsement(localhostMock, response1) + require.True(t, success) // peer2 (g2) fails - returns peer3 to retry retry := plan.nextPeerInGroup(peer2Mock) @@ -163,9 +161,8 @@ func TestMultiLayoutFailures(t *testing.T) { require.Nil(t, g3retry) // retry g2 - succeeds - action, err = plan.processEndorsement(retry, response2) - require.NoError(t, err) - require.Nil(t, action) + success = plan.processEndorsement(retry, response2) + require.True(t, success) // nothing more to try in this layout - get endorsers for next layout // layout 2 requires a second endorsement from g2, but all g2 peers have been tried - only 1 succeeded @@ -218,16 +215,15 @@ func TestMultiPlan(t *testing.T) { // localhost succeeds, remove from plan 2 endorsers = plan2.endorsers() require.Len(t, endorsers, 2) - action, err := plan2.processEndorsement(localhostMock, response1) - require.NoError(t, err) - require.Nil(t, action) + success := plan2.processEndorsement(localhostMock, response1) + require.True(t, success) // peer2 (g2) succeeds - action, err = plan2.processEndorsement(peer2Mock, response2) - require.NoError(t, err) - require.Equal(t, action.GetProposalResponsePayload(), response1.Payload) - require.Len(t, action.GetEndorsements(), 2) - require.ElementsMatch(t, action.GetEndorsements(), []*peer.Endorsement{response1.Endorsement, response2.Endorsement}) + success = plan2.processEndorsement(peer2Mock, response2) + require.True(t, success) + require.Equal(t, plan2.responsePayload, response1.Payload) + require.Len(t, plan2.completedLayout.endorsements, 2) + require.ElementsMatch(t, plan2.completedLayout.endorsements, []*peer.Endorsement{response1.Endorsement, response2.Endorsement}) } func TestMultiPlanNoOverlap(t *testing.T) { @@ -263,46 +259,27 @@ func TestMultiPlanNoOverlap(t *testing.T) { // localhost succeeds, try to remove from plan 2 (should be no-op) endorsers = plan2.endorsers() require.Len(t, endorsers, 2) - action, err := plan2.processEndorsement(localhostMock, response1) - require.NoError(t, err) - require.Nil(t, action) + success := plan2.processEndorsement(localhostMock, response1) + require.True(t, success) // peer2 (g2) succeeds - action, err = plan2.processEndorsement(peer2Mock, response2) - require.NoError(t, err) - require.Nil(t, action) + success = plan2.processEndorsement(peer2Mock, response2) + require.True(t, success) // peer4 (g3) succeeds - action, err = plan2.processEndorsement(peer4Mock, response3) - require.NoError(t, err) - require.Equal(t, action.GetProposalResponsePayload(), response2.Payload) - require.Len(t, action.GetEndorsements(), 2) - require.ElementsMatch(t, action.GetEndorsements(), []*peer.Endorsement{response2.Endorsement, response3.Endorsement}) + success = plan2.processEndorsement(peer4Mock, response3) + require.True(t, success) + require.Equal(t, plan2.responsePayload, response1.Payload) + require.Len(t, plan2.completedLayout.endorsements, 2) + require.ElementsMatch(t, plan2.completedLayout.endorsements, []*peer.Endorsement{response2.Endorsement, response3.Endorsement}) } -func TestDuplicateEndorsements(t *testing.T) { - layouts := []*layout{ - {required: map[string]int{"g1": 2}}, - } - groupEndorsers := map[string][]*endorser{ - "g1": {peer1Mock, peer2Mock}, - } - plan := newPlan(layouts, groupEndorsers) - require.Equal(t, plan.size, 2) // total number of endorsers in all layouts - - endorsers := plan.endorsers() - require.Len(t, endorsers, 2) - require.ElementsMatch(t, endorsers, []*endorser{peer1Mock, peer2Mock}) - - response1 := &peer.ProposalResponse{Payload: []byte("p"), Endorsement: &peer.Endorsement{Endorser: []byte("duplicate")}} - response2 := &peer.ProposalResponse{Payload: []byte("p"), Endorsement: &peer.Endorsement{Endorser: []byte("duplicate")}} - - action, err := plan.processEndorsement(peer1Mock, response1) - require.NoError(t, err) - require.Nil(t, action) - action, err = plan.processEndorsement(peer2Mock, response2) - require.NoError(t, err) - require.Equal(t, action.GetProposalResponsePayload(), response1.Payload) - require.Len(t, action.GetEndorsements(), 1) - require.ElementsMatch(t, action.GetEndorsements(), []*peer.Endorsement{response1.Endorsement}) +func TestUniqueEndorsements(t *testing.T) { + e1 := &peer.Endorsement{Endorser: []byte("endorserA")} + e2 := &peer.Endorsement{Endorser: []byte("endorserB")} + e3 := &peer.Endorsement{Endorser: []byte("endorserA")} + e4 := &peer.Endorsement{Endorser: []byte("endorserC")} + unique := uniqueEndorsements([]*peer.Endorsement{e1, e2, e3, e4}) + require.Len(t, unique, 3) + require.ElementsMatch(t, unique, []*peer.Endorsement{e1, e2, e4}) }