diff --git a/pkg/chain/ethereum/tbtc.go b/pkg/chain/ethereum/tbtc.go index ea2c1f0ff7..7c5765f7e5 100644 --- a/pkg/chain/ethereum/tbtc.go +++ b/pkg/chain/ethereum/tbtc.go @@ -1965,3 +1965,10 @@ func (tc *TbtcChain) GetRedemptionMaxSize() (uint16, error) { func (tc *TbtcChain) GetRedemptionRequestMinAge() (uint32, error) { return tc.walletCoordinator.RedemptionRequestMinAge() } + +func (tc *TbtcChain) ValidateHeartbeatProposal( + proposal *tbtc.HeartbeatProposal, +) error { + // TODO: Implementation. + panic("not implemented yet") +} diff --git a/pkg/tbtc/coordination.go b/pkg/tbtc/coordination.go index d03860a9dd..d4dc1b2829 100644 --- a/pkg/tbtc/coordination.go +++ b/pkg/tbtc/coordination.go @@ -5,11 +5,12 @@ import ( "crypto/sha256" "encoding/binary" "fmt" + "math/rand" + "sort" + "github.com/keep-network/keep-core/pkg/internal/pb" "go.uber.org/zap" "golang.org/x/exp/slices" - "math/rand" - "sort" "github.com/keep-network/keep-core/pkg/bitcoin" "github.com/keep-network/keep-core/pkg/chain" @@ -195,6 +196,13 @@ func (cf *coordinationFault) String() string { ) } +// CoordinationProposalRequest represents a request for a coordination proposal. +type CoordinationProposalRequest struct { + WalletPublicKeyHash [20]byte + WalletOperators []chain.Address + ActionsChecklist []WalletActionType +} + // CoordinationProposalGenerator is a component responsible for generating // coordination proposals. type CoordinationProposalGenerator interface { @@ -204,10 +212,7 @@ type CoordinationProposalGenerator interface { // expected to return a proposal for the first action from the checklist // that is valid for the given wallet's state. If none of the actions are // valid, the generator should return a no-op proposal. - Generate( - walletPublicKeyHash [20]byte, - actionsChecklist []WalletActionType, - ) (CoordinationProposal, error) + Generate(request *CoordinationProposalRequest) (CoordinationProposal, error) } // CoordinationProposal represents a single action proposal for the given wallet. @@ -370,14 +375,22 @@ func (ce *coordinationExecutor) coordinate( execLogger.Info("actions checklist is: [%v]", actionsChecklist) - // Set up a context that is cancelled when the active phase of the - // coordination window ends. + // Set up a context that is automatically cancelled when the active phase + // of the coordination window ends. + // + // The coordination leader keeps that context active for the lifetime of the + // active phase to provide retransmissions of the coordination message thus + // maximize the chance that all followers receive it on time. The only case + // when the leader cancels the context prematurely is when the leader's + // routine fails. + // + // The coordination follower cancels the context as soon as it receives + // the coordination message. ctx, cancelCtx := withCancelOnBlock( context.Background(), window.activePhaseEndBlock(), ce.waitForBlockFn, ) - defer cancelCtx() var proposal CoordinationProposal var faults []*coordinationFault @@ -391,6 +404,10 @@ func (ce *coordinationExecutor) coordinate( actionsChecklist, ) if err != nil { + // Cancel the context upon leader's routine failure. There is + // no point to keep the context active as retransmissions do not + // occur anyway. + cancelCtx() return nil, fmt.Errorf( "failed to execute leader's routine: [%v]", err, @@ -401,6 +418,9 @@ func (ce *coordinationExecutor) coordinate( } else { execLogger.Info("executing follower's routine") + // Cancel the context upon follower's routine completion. + defer cancelCtx() + proposal, faults, err = ce.executeFollowerRoutine( ctx, leader, @@ -558,8 +578,11 @@ func (ce *coordinationExecutor) executeLeaderRoutine( walletPublicKeyHash := ce.walletPublicKeyHash() proposal, err := ce.proposalGenerator.Generate( - walletPublicKeyHash, - actionsChecklist, + &CoordinationProposalRequest{ + WalletPublicKeyHash: walletPublicKeyHash, + WalletOperators: ce.coordinatedWallet.signingGroupOperators, + ActionsChecklist: actionsChecklist, + }, ) if err != nil { return nil, fmt.Errorf("failed to generate proposal: [%v]", err) diff --git a/pkg/tbtc/coordination_test.go b/pkg/tbtc/coordination_test.go index d8a6ea53b3..1fccf8aac7 100644 --- a/pkg/tbtc/coordination_test.go +++ b/pkg/tbtc/coordination_test.go @@ -6,6 +6,12 @@ import ( "crypto/sha256" "encoding/hex" "fmt" + "math/big" + "math/rand" + "reflect" + "testing" + "time" + "github.com/go-test/deep" "github.com/keep-network/keep-core/pkg/bitcoin" "github.com/keep-network/keep-core/pkg/chain" @@ -17,11 +23,6 @@ import ( "github.com/keep-network/keep-core/pkg/protocol/group" "github.com/keep-network/keep-core/pkg/tecdsa" "golang.org/x/exp/slices" - "math/big" - "math/rand" - "reflect" - "testing" - "time" "github.com/keep-network/keep-core/internal/testutils" ) @@ -703,7 +704,7 @@ func TestCoordinationExecutor_ExecuteLeaderRoutine(t *testing.T) { for _, action := range actionsChecklist { if walletPublicKeyHash == publicKeyHash && action == ActionHeartbeat { return &HeartbeatProposal{ - Message: []byte("heartbeat message"), + Message: [16]byte{0x01, 0x02}, }, nil } } @@ -764,7 +765,7 @@ func TestCoordinationExecutor_ExecuteLeaderRoutine(t *testing.T) { <-ctx.Done() expectedProposal := &HeartbeatProposal{ - Message: []byte("heartbeat message"), + Message: [16]byte{0x01, 0x02}, } if !reflect.DeepEqual(expectedProposal, proposal) { @@ -990,7 +991,7 @@ func TestCoordinationExecutor_ExecuteFollowerRoutine(t *testing.T) { coordinationBlock: 900, walletPublicKeyHash: executor.walletPublicKeyHash(), proposal: &HeartbeatProposal{ - Message: []byte("heartbeat message"), + Message: [16]byte{0x01, 0x02}, }, }) if err != nil { @@ -1117,7 +1118,7 @@ func TestCoordinationExecutor_ExecuteFollowerRoutine_WithIdleLeader(t *testing.T provider := netlocal.Connect() - broadcastChannel, err := provider.BroadcastChannelFor("test") + broadcastChannel, err := provider.BroadcastChannelFor("test-idle") if err != nil { t.Fatal(err) } @@ -1185,8 +1186,7 @@ func newMockCoordinationProposalGenerator( } func (mcpg *mockCoordinationProposalGenerator) Generate( - walletPublicKeyHash [20]byte, - actionsChecklist []WalletActionType, + request *CoordinationProposalRequest, ) (CoordinationProposal, error) { - return mcpg.delegate(walletPublicKeyHash, actionsChecklist) + return mcpg.delegate(request.WalletPublicKeyHash, request.ActionsChecklist) } diff --git a/pkg/tbtc/deduplicator.go b/pkg/tbtc/deduplicator.go index 96982146b9..a674cea370 100644 --- a/pkg/tbtc/deduplicator.go +++ b/pkg/tbtc/deduplicator.go @@ -1,9 +1,6 @@ package tbtc import ( - "bytes" - "crypto/sha256" - "encoding/binary" "encoding/hex" "math/big" "strconv" @@ -19,15 +16,6 @@ const ( // DKGResultHashCachePeriod is the time period the cache maintains // the given DKG result hash. DKGResultHashCachePeriod = 7 * 24 * time.Hour - // HeartbeatRequestCachePeriod is the time period the cache maintains - // the given heartbeat request. - HeartbeatRequestCachePeriod = 24 * time.Hour - // DepositSweepProposalCachePeriod is the time period the cache maintains - // the given deposit sweep proposal. - DepositSweepProposalCachePeriod = 7 * 24 * time.Hour - // RedemptionProposalCachePeriod is the time period the cache maintains - // the given redemption proposal. - RedemptionProposalCachePeriod = 24 * time.Hour ) // deduplicator decides whether the given event should be handled by the @@ -43,24 +31,15 @@ const ( // Those events are supported: // - DKG started // - DKG result submitted -// - Heartbeat request submission -// - Deposit sweep proposal submission -// - Redemption proposal submission type deduplicator struct { - dkgSeedCache *cache.TimeCache - dkgResultHashCache *cache.TimeCache - heartbeatRequestCache *cache.TimeCache - depositSweepProposalCache *cache.TimeCache - redemptionProposalCache *cache.TimeCache + dkgSeedCache *cache.TimeCache + dkgResultHashCache *cache.TimeCache } func newDeduplicator() *deduplicator { return &deduplicator{ - dkgSeedCache: cache.NewTimeCache(DKGSeedCachePeriod), - dkgResultHashCache: cache.NewTimeCache(DKGResultHashCachePeriod), - heartbeatRequestCache: cache.NewTimeCache(HeartbeatRequestCachePeriod), - depositSweepProposalCache: cache.NewTimeCache(DepositSweepProposalCachePeriod), - redemptionProposalCache: cache.NewTimeCache(RedemptionProposalCachePeriod), + dkgSeedCache: cache.NewTimeCache(DKGSeedCachePeriod), + dkgResultHashCache: cache.NewTimeCache(DKGResultHashCachePeriod), } } @@ -111,113 +90,3 @@ func (d *deduplicator) notifyDKGResultSubmitted( // proceed with the execution. return false } - -// notifyHeartbeatRequestSubmitted notifies the client wants to start some -// actions upon the heartbeat request submitted. It returns boolean indicating -// whether the client should proceed with the actions or ignore the event as -// a duplicate. -func (d *deduplicator) notifyHeartbeatRequestSubmitted( - walletPublicKeyHash [20]byte, - message []byte, -) bool { - d.heartbeatRequestCache.Sweep() - - var buffer bytes.Buffer - buffer.Write(walletPublicKeyHash[:]) - buffer.Write(message) - - bufferSha256 := sha256.Sum256(buffer.Bytes()) - cacheKey := hex.EncodeToString(bufferSha256[:]) - - // If the key is not in the cache, that means the request was not handled - // yet and the client should proceed with the execution. - if !d.heartbeatRequestCache.Has(cacheKey) { - d.heartbeatRequestCache.Add(cacheKey) - return true - } - - // Otherwise, the request is a duplicate and the client should not - // proceed with the execution. - return false -} - -// notifyDepositSweepProposalSubmitted notifies the client wants to start some -// actions upon the deposit sweep proposal submission. It returns boolean -// indicating whether the client should proceed with the actions or ignore the -// event as a duplicate. -func (d *deduplicator) notifyDepositSweepProposalSubmitted( - newProposal *DepositSweepProposal, -) bool { - d.depositSweepProposalCache.Sweep() - - // We build the cache key by hashing the concatenation of relevant fields - // of the proposal. It may be tempting to extract that code into a general - // "hash code" function exposed by the DepositSweepProposal type but this - // is not necessarily a good idea. The deduplicator is responsible for - // detecting duplicates and construction of cache keys is part of that job. - // Extracting this logic outside would push that responsibility out of the - // deduplicator control. That is dangerous as deduplication logic could - // be implicitly changeable from the outside and lead to serious bugs. - var buffer bytes.Buffer - buffer.Write(newProposal.WalletPublicKeyHash[:]) - for _, depositKey := range newProposal.DepositsKeys { - buffer.Write(depositKey.FundingTxHash[:]) - fundingOutputIndex := make([]byte, 4) - binary.BigEndian.PutUint32(fundingOutputIndex, depositKey.FundingOutputIndex) - buffer.Write(fundingOutputIndex) - } - buffer.Write(newProposal.SweepTxFee.Bytes()) - - bufferSha256 := sha256.Sum256(buffer.Bytes()) - cacheKey := hex.EncodeToString(bufferSha256[:]) - - // If the key is not in the cache, that means the proposal was not handled - // yet and the client should proceed with the execution. - if !d.depositSweepProposalCache.Has(cacheKey) { - d.depositSweepProposalCache.Add(cacheKey) - return true - } - - // Otherwise, the proposal is a duplicate and the client should not - // proceed with the execution. - return false -} - -// notifyRedemptionProposalSubmitted notifies the client wants to start some -// actions upon the redemption proposal submission. It returns boolean -// indicating whether the client should proceed with the actions or ignore the -// event as a duplicate. -func (d *deduplicator) notifyRedemptionProposalSubmitted( - newProposal *RedemptionProposal, -) bool { - d.redemptionProposalCache.Sweep() - - // We build the cache key by hashing the concatenation of relevant fields - // of the proposal. It may be tempting to extract that code into a general - // "hash code" function exposed by the RedemptionProposal type but this - // is not necessarily a good idea. The deduplicator is responsible for - // detecting duplicates and construction of cache keys is part of that job. - // Extracting this logic outside would push that responsibility out of the - // deduplicator control. That is dangerous as deduplication logic could - // be implicitly changeable from the outside and lead to serious bugs. - var buffer bytes.Buffer - buffer.Write(newProposal.WalletPublicKeyHash[:]) - for _, script := range newProposal.RedeemersOutputScripts { - buffer.Write(script) - } - buffer.Write(newProposal.RedemptionTxFee.Bytes()) - - bufferSha256 := sha256.Sum256(buffer.Bytes()) - cacheKey := hex.EncodeToString(bufferSha256[:]) - - // If the key is not in the cache, that means the proposal was not handled - // yet and the client should proceed with the execution. - if !d.redemptionProposalCache.Has(cacheKey) { - d.redemptionProposalCache.Add(cacheKey) - return true - } - - // Otherwise, the proposal is a duplicate and the client should not - // proceed with the execution. - return false -} diff --git a/pkg/tbtc/deduplicator_test.go b/pkg/tbtc/deduplicator_test.go index eea6e8e045..4fdeb5ec7c 100644 --- a/pkg/tbtc/deduplicator_test.go +++ b/pkg/tbtc/deduplicator_test.go @@ -6,16 +6,11 @@ import ( "testing" "time" - "github.com/keep-network/keep-core/pkg/bitcoin" - "github.com/keep-network/keep-common/pkg/cache" ) const testDKGSeedCachePeriod = 1 * time.Second const testDKGResultHashCachePeriod = 1 * time.Second -const testHeartbeatRequestCachePeriod = 1 * time.Second -const testDepositSweepProposalCachePeriod = 1 * time.Second -const testRedemptionProposalCachePeriod = 1 * time.Second func TestNotifyDKGStarted(t *testing.T) { deduplicator := deduplicator{ @@ -117,286 +112,3 @@ func TestNotifyDKGResultSubmitted(t *testing.T) { t.Fatal("should be allowed to process") } } - -func TestNotifyHeartbeatRequestSubmitted(t *testing.T) { - deduplicator := deduplicator{ - heartbeatRequestCache: cache.NewTimeCache(testHeartbeatRequestCachePeriod), - } - - walletPublicKeyHash1 := [20]byte{1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5} - walletPublicKeyHash2 := [20]byte{2, 2, 3, 4, 5, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 1, 2, 3, 4, 6} - heartbeatMessage1 := []byte{1, 2} - heartbeatMessage2 := []byte{3, 4} - - // Original heartbeat request - canProcess := deduplicator.notifyHeartbeatRequestSubmitted(walletPublicKeyHash1, heartbeatMessage1) - if !canProcess { - t.Fatal("should be allowed to process") - } - - // With another message - canProcess = deduplicator.notifyHeartbeatRequestSubmitted(walletPublicKeyHash1, heartbeatMessage2) - if !canProcess { - t.Fatal("should be allowed to process") - } - - // With another wallet - canProcess = deduplicator.notifyHeartbeatRequestSubmitted(walletPublicKeyHash2, heartbeatMessage1) - if !canProcess { - t.Fatal("should be allowed to process") - } - - // With the same wallet and message - canProcess = deduplicator.notifyHeartbeatRequestSubmitted(walletPublicKeyHash1, heartbeatMessage1) - if canProcess { - t.Fatal("should not be allowed to process") - } - - // Wait until caching period elapses. - time.Sleep(testHeartbeatRequestCachePeriod) - - // With the same wallet and message again - canProcess = deduplicator.notifyHeartbeatRequestSubmitted(walletPublicKeyHash1, heartbeatMessage1) - if !canProcess { - t.Fatal("should be allowed to process") - } -} - -func TestNotifyDepositSweepProposalSubmitted(t *testing.T) { - deduplicator := deduplicator{ - depositSweepProposalCache: cache.NewTimeCache( - testDepositSweepProposalCachePeriod, - ), - } - - newHash := func(t *testing.T, value string) bitcoin.Hash { - hash, err := bitcoin.NewHashFromString(value, bitcoin.InternalByteOrder) - if err != nil { - t.Fatal(err) - } - - return hash - } - - // Original proposal. - proposal := &DepositSweepProposal{ - WalletPublicKeyHash: [20]byte{1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5}, - DepositsKeys: []struct { - FundingTxHash bitcoin.Hash - FundingOutputIndex uint32 - }{ - {newHash(t, "74d0e353cdba99a6c17ce2cfeab62a26c09b5eb756eccdcfb83dbc12e67b18bc"), 4}, - {newHash(t, "f8eaf242a55ea15e602f9f990e33f67f99dfbe25d1802bbde63cc1caabf99668"), 0}, - }, - SweepTxFee: big.NewInt(1000), - } - - // Proposal with different wallet. - proposalDiffWallet := &DepositSweepProposal{ - WalletPublicKeyHash: [20]byte{2, 2, 3, 4, 5, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5}, - DepositsKeys: []struct { - FundingTxHash bitcoin.Hash - FundingOutputIndex uint32 - }{ - {newHash(t, "74d0e353cdba99a6c17ce2cfeab62a26c09b5eb756eccdcfb83dbc12e67b18bc"), 4}, - {newHash(t, "f8eaf242a55ea15e602f9f990e33f67f99dfbe25d1802bbde63cc1caabf99668"), 0}, - }, - SweepTxFee: big.NewInt(1000), - } - - // Proposal with different deposits. - proposalDiffDeposits := &DepositSweepProposal{ - WalletPublicKeyHash: [20]byte{1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5}, - DepositsKeys: []struct { - FundingTxHash bitcoin.Hash - FundingOutputIndex uint32 - }{ - {newHash(t, "84d0e353cdba99a6c17ce2cfeab62a26c09b5eb756eccdcfb83dbc12e67b18bc"), 4}, - {newHash(t, "f8eaf242a55ea15e602f9f990e33f67f99dfbe25d1802bbde63cc1caabf99668"), 0}, - }, - SweepTxFee: big.NewInt(1000), - } - - // Proposal with same deposits but in different order. - proposalDiffDepositsOrder := &DepositSweepProposal{ - WalletPublicKeyHash: [20]byte{1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5}, - DepositsKeys: []struct { - FundingTxHash bitcoin.Hash - FundingOutputIndex uint32 - }{ - {newHash(t, "f8eaf242a55ea15e602f9f990e33f67f99dfbe25d1802bbde63cc1caabf99668"), 0}, - {newHash(t, "74d0e353cdba99a6c17ce2cfeab62a26c09b5eb756eccdcfb83dbc12e67b18bc"), 4}, - }, - SweepTxFee: big.NewInt(1000), - } - - // Proposal with different sweep tx fee. - proposalDiffSweepTxFee := &DepositSweepProposal{ - WalletPublicKeyHash: [20]byte{1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5}, - DepositsKeys: []struct { - FundingTxHash bitcoin.Hash - FundingOutputIndex uint32 - }{ - {newHash(t, "74d0e353cdba99a6c17ce2cfeab62a26c09b5eb756eccdcfb83dbc12e67b18bc"), 4}, - {newHash(t, "f8eaf242a55ea15e602f9f990e33f67f99dfbe25d1802bbde63cc1caabf99668"), 0}, - }, - SweepTxFee: big.NewInt(1001), - } - - // Add the original proposal. - canProcess := deduplicator.notifyDepositSweepProposalSubmitted(proposal) - if !canProcess { - t.Fatal("should be allowed to process") - } - - // Add the original proposal before caching period elapses. - canProcess = deduplicator.notifyDepositSweepProposalSubmitted(proposal) - if canProcess { - t.Fatal("should not be allowed to process") - } - - // Add the proposal with different wallet. - canProcess = deduplicator.notifyDepositSweepProposalSubmitted(proposalDiffWallet) - if !canProcess { - t.Fatal("should be allowed to process") - } - - // Add the proposal with different deposits. - canProcess = deduplicator.notifyDepositSweepProposalSubmitted(proposalDiffDeposits) - if !canProcess { - t.Fatal("should be allowed to process") - } - - // Add the proposal with different deposits order. - canProcess = deduplicator.notifyDepositSweepProposalSubmitted(proposalDiffDepositsOrder) - if !canProcess { - t.Fatal("should be allowed to process") - } - - // Add the proposal with different sweep tx fee. - canProcess = deduplicator.notifyDepositSweepProposalSubmitted(proposalDiffSweepTxFee) - if !canProcess { - t.Fatal("should be allowed to process") - } - - // Wait until caching period elapses. - time.Sleep(testDepositSweepProposalCachePeriod) - - // Add the original proposal again. - canProcess = deduplicator.notifyDepositSweepProposalSubmitted(proposal) - if !canProcess { - t.Fatal("should be allowed to process") - } -} - -func TestNotifyRedemptionProposalSubmitted(t *testing.T) { - deduplicator := deduplicator{ - redemptionProposalCache: cache.NewTimeCache( - testRedemptionProposalCachePeriod, - ), - } - - newScript := func(t *testing.T, value string) bitcoin.Script { - hash, err := hex.DecodeString(value) - if err != nil { - t.Fatal(err) - } - - return hash - } - - // Original proposal. - proposal := &RedemptionProposal{ - WalletPublicKeyHash: [20]byte{1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5}, - RedeemersOutputScripts: []bitcoin.Script{ - newScript(t, "74d0e353cdba99a6c17ce2cfeab62a26c09b5eb756eccdcfb83dbc12e67b18bc"), - newScript(t, "f8eaf242a55ea15e602f9f990e33f67f99dfbe25d1802bbde63cc1caabf99668"), - }, - RedemptionTxFee: big.NewInt(1000), - } - - // Proposal with different wallet. - proposalDiffWallet := &RedemptionProposal{ - WalletPublicKeyHash: [20]byte{2, 2, 3, 4, 5, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5}, - RedeemersOutputScripts: []bitcoin.Script{ - newScript(t, "74d0e353cdba99a6c17ce2cfeab62a26c09b5eb756eccdcfb83dbc12e67b18bc"), - newScript(t, "f8eaf242a55ea15e602f9f990e33f67f99dfbe25d1802bbde63cc1caabf99668"), - }, - RedemptionTxFee: big.NewInt(1000), - } - - // Proposal with different redeemer scripts. - proposalDiffScripts := &RedemptionProposal{ - WalletPublicKeyHash: [20]byte{1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5}, - RedeemersOutputScripts: []bitcoin.Script{ - newScript(t, "84d0e353cdba99a6c17ce2cfeab62a26c09b5eb756eccdcfb83dbc12e67b18bc"), - newScript(t, "f8eaf242a55ea15e602f9f990e33f67f99dfbe25d1802bbde63cc1caabf99668"), - }, - RedemptionTxFee: big.NewInt(1000), - } - - // Proposal with same redeemer scripts but in different order. - proposalDiffScriptsOrder := &RedemptionProposal{ - WalletPublicKeyHash: [20]byte{1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5}, - RedeemersOutputScripts: []bitcoin.Script{ - newScript(t, "f8eaf242a55ea15e602f9f990e33f67f99dfbe25d1802bbde63cc1caabf99668"), - newScript(t, "74d0e353cdba99a6c17ce2cfeab62a26c09b5eb756eccdcfb83dbc12e67b18bc"), - }, - RedemptionTxFee: big.NewInt(1000), - } - - // Proposal with different redemption tx fee. - proposalDiffRedemptionTxFee := &RedemptionProposal{ - WalletPublicKeyHash: [20]byte{1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5}, - RedeemersOutputScripts: []bitcoin.Script{ - newScript(t, "74d0e353cdba99a6c17ce2cfeab62a26c09b5eb756eccdcfb83dbc12e67b18bc"), - newScript(t, "f8eaf242a55ea15e602f9f990e33f67f99dfbe25d1802bbde63cc1caabf99668"), - }, - RedemptionTxFee: big.NewInt(1001), - } - - // Add the original proposal. - canProcess := deduplicator.notifyRedemptionProposalSubmitted(proposal) - if !canProcess { - t.Fatal("should be allowed to process") - } - - // Add the original proposal before caching period elapses. - canProcess = deduplicator.notifyRedemptionProposalSubmitted(proposal) - if canProcess { - t.Fatal("should not be allowed to process") - } - - // Add the proposal with different wallet. - canProcess = deduplicator.notifyRedemptionProposalSubmitted(proposalDiffWallet) - if !canProcess { - t.Fatal("should be allowed to process") - } - - // Add the proposal with different redeemer scripts. - canProcess = deduplicator.notifyRedemptionProposalSubmitted(proposalDiffScripts) - if !canProcess { - t.Fatal("should be allowed to process") - } - - // Add the proposal with different redeemer scripts order. - canProcess = deduplicator.notifyRedemptionProposalSubmitted(proposalDiffScriptsOrder) - if !canProcess { - t.Fatal("should be allowed to process") - } - - // Add the proposal with different redemption tx fee. - canProcess = deduplicator.notifyRedemptionProposalSubmitted(proposalDiffRedemptionTxFee) - if !canProcess { - t.Fatal("should be allowed to process") - } - - // Wait until caching period elapses. - time.Sleep(testRedemptionProposalCachePeriod) - - // Add the original proposal again. - canProcess = deduplicator.notifyRedemptionProposalSubmitted(proposal) - if !canProcess { - t.Fatal("should be allowed to process") - } -} diff --git a/pkg/tbtc/deposit_sweep.go b/pkg/tbtc/deposit_sweep.go index 63c66ca21d..dd9696d525 100644 --- a/pkg/tbtc/deposit_sweep.go +++ b/pkg/tbtc/deposit_sweep.go @@ -19,34 +19,31 @@ const ( // another actions. The value of 1200 blocks is roughly 4 hours, assuming // 12 seconds per block. depositSweepProposalValidityBlocks = 1200 - // depositSweepProposalConfirmationBlocks determines the block length of the - // confirmation period on the host chain that is preserved after a deposit - // sweep proposal submission. - depositSweepProposalConfirmationBlocks = 20 // DepositSweepRequiredFundingTxConfirmations determines the minimum // number of confirmations that are needed for a deposit funding Bitcoin // transaction in order to consider it a valid part of the deposit sweep // proposal. DepositSweepRequiredFundingTxConfirmations = 6 - // depositSweepSigningTimeoutSafetyMargin determines the duration of the - // safety margin that must be preserved between the signing timeout + // depositSweepSigningTimeoutSafetyMarginBlocks determines the duration of + // the safety margin that must be preserved between the signing timeout // and the timeout of the entire deposit sweep action. This safety // margin prevents against the case where signing completes late and there // is not enough time to broadcast the sweep transaction properly. // In such a case, wallet signatures may leak and make the wallet subject // of fraud accusations. Usage of the safety margin ensures there is enough // time to perform post-signing steps of the deposit sweep action. - depositSweepSigningTimeoutSafetyMargin = 1 * time.Hour + // The value of 300 blocks is roughly 1 hour, assuming 12 seconds per block. + depositSweepSigningTimeoutSafetyMarginBlocks = 300 // depositSweepBroadcastTimeout determines the time window for deposit // sweep transaction broadcast. It is guaranteed that at least - // depositSweepSigningTimeoutSafetyMargin is preserved for the broadcast + // depositSweepSigningTimeoutSafetyMarginBlocks is preserved for the broadcast // step. However, the happy path for the broadcast step is usually quick // and few retries are needed to recover from temporary problems. That // said, if the broadcast step does not succeed in a tight timeframe, // there is no point to retry for the entire possible time window. // Hence, the timeout for broadcast step is set as 25% of the entire - // time widow determined by depositSweepSigningTimeoutSafetyMargin. - depositSweepBroadcastTimeout = depositSweepSigningTimeoutSafetyMargin / 4 + // time widow determined by depositSweepSigningTimeoutSafetyMarginBlocks. + depositSweepBroadcastTimeout = 15 * time.Minute // depositSweepBroadcastCheckDelay determines the delay that must // be preserved between transaction broadcast and the check that ensures // the transaction is known on the Bitcoin chain. This delay is needed @@ -86,12 +83,12 @@ type depositSweepAction struct { proposal *DepositSweepProposal proposalProcessingStartBlock uint64 - proposalExpiresAt time.Time + proposalExpiryBlock uint64 - requiredFundingTxConfirmations uint - signingTimeoutSafetyMargin time.Duration - broadcastTimeout time.Duration - broadcastCheckDelay time.Duration + requiredFundingTxConfirmations uint + signingTimeoutSafetyMarginBlocks uint64 + broadcastTimeout time.Duration + broadcastCheckDelay time.Duration } func newDepositSweepAction( @@ -102,27 +99,29 @@ func newDepositSweepAction( signingExecutor walletSigningExecutor, proposal *DepositSweepProposal, proposalProcessingStartBlock uint64, - proposalExpiresAt time.Time, + proposalExpiryBlock uint64, + waitForBlockFn waitForBlockFn, ) *depositSweepAction { transactionExecutor := newWalletTransactionExecutor( btcChain, sweepingWallet, signingExecutor, + waitForBlockFn, ) return &depositSweepAction{ - logger: logger, - chain: chain, - btcChain: btcChain, - sweepingWallet: sweepingWallet, - transactionExecutor: transactionExecutor, - proposal: proposal, - proposalProcessingStartBlock: proposalProcessingStartBlock, - proposalExpiresAt: proposalExpiresAt, - requiredFundingTxConfirmations: DepositSweepRequiredFundingTxConfirmations, - signingTimeoutSafetyMargin: depositSweepSigningTimeoutSafetyMargin, - broadcastTimeout: depositSweepBroadcastTimeout, - broadcastCheckDelay: depositSweepBroadcastCheckDelay, + logger: logger, + chain: chain, + btcChain: btcChain, + sweepingWallet: sweepingWallet, + transactionExecutor: transactionExecutor, + proposal: proposal, + proposalProcessingStartBlock: proposalProcessingStartBlock, + proposalExpiryBlock: proposalExpiryBlock, + requiredFundingTxConfirmations: DepositSweepRequiredFundingTxConfirmations, + signingTimeoutSafetyMarginBlocks: depositSweepSigningTimeoutSafetyMarginBlocks, + broadcastTimeout: depositSweepBroadcastTimeout, + broadcastCheckDelay: depositSweepBroadcastCheckDelay, } } @@ -188,11 +187,16 @@ func (dsa *depositSweepAction) execute() error { zap.String("step", "signTransaction"), ) + // Just in case. This should never happen. + if dsa.proposalExpiryBlock < dsa.signingTimeoutSafetyMarginBlocks { + return fmt.Errorf("invalid proposal expiry block") + } + sweepTx, err := dsa.transactionExecutor.signTransaction( signTxLogger, unsignedSweepTx, dsa.proposalProcessingStartBlock, - dsa.proposalExpiresAt.Add(-dsa.signingTimeoutSafetyMargin), + dsa.proposalExpiryBlock-dsa.signingTimeoutSafetyMarginBlocks, ) if err != nil { return fmt.Errorf("sign transaction step failed: [%v]", err) diff --git a/pkg/tbtc/deposit_sweep_test.go b/pkg/tbtc/deposit_sweep_test.go index 2174855fcb..d5f7ea14ee 100644 --- a/pkg/tbtc/deposit_sweep_test.go +++ b/pkg/tbtc/deposit_sweep_test.go @@ -1,6 +1,7 @@ package tbtc import ( + "context" "fmt" "math/big" "testing" @@ -22,8 +23,6 @@ func TestDepositSweepAction_Execute(t *testing.T) { for _, scenario := range scenarios { t.Run(scenario.Title, func(t *testing.T) { - now := time.Now() - hostChain := Connect() bitcoinChain := newLocalBitcoinChain() @@ -132,7 +131,8 @@ func TestDepositSweepAction_Execute(t *testing.T) { // Choose an arbitrary start block and expiration time. proposalProcessingStartBlock := uint64(100) - proposalExpiresAt := now.Add(4 * time.Hour) + proposalExpiryBlock := proposalProcessingStartBlock + + depositSweepProposalValidityBlocks // Simulate the on-chain proposal validation passes with success. err = hostChain.setDepositSweepProposalValidationResult( @@ -189,7 +189,10 @@ func TestDepositSweepAction_Execute(t *testing.T) { signingExecutor, proposal, proposalProcessingStartBlock, - proposalExpiresAt, + proposalExpiryBlock, + func(ctx context.Context, blockHeight uint64) error { + return nil + }, ) // Modify the default parameters of the action to make diff --git a/pkg/tbtc/heartbeat.go b/pkg/tbtc/heartbeat.go index b4945d91d4..807292b565 100644 --- a/pkg/tbtc/heartbeat.go +++ b/pkg/tbtc/heartbeat.go @@ -3,12 +3,10 @@ package tbtc import ( "context" "fmt" - "math/big" - "time" - "github.com/ipfs/go-log/v2" "github.com/keep-network/keep-core/pkg/bitcoin" "github.com/keep-network/keep-core/pkg/tecdsa" + "math/big" ) const ( @@ -18,20 +16,17 @@ const ( // take another actions. The value of 300 blocks is roughly 1 hour, assuming // 12 seconds per block. heartbeatProposalValidityBlocks = 300 - // heartbeatRequestConfirmationBlocks determines the block length of the - // confirmation period on the host chain that is preserved after a heartbeat - // request submission. - heartbeatRequestConfirmationBlocks = 3 - // heartbeatRequestTimeoutSafetyMargin determines the duration of the + // heartbeatRequestTimeoutSafetyMarginBlocks determines the duration of the // safety margin that must be preserved between the signing timeout // and the timeout of the entire heartbeat action. This safety // margin prevents against the case where signing completes too late and // another action has been already requested by the coordinator. - heartbeatRequestTimeoutSafetyMargin = 5 * time.Minute + // The value of 25 blocks is roughly 5 minutes, assuming 12 seconds per block. + heartbeatRequestTimeoutSafetyMarginBlocks = 25 ) type HeartbeatProposal struct { - Message []byte + Message [16]byte } func (hp *HeartbeatProposal) ActionType() WalletActionType { @@ -55,12 +50,13 @@ type heartbeatSigningExecutor interface { // heartbeatAction is a walletAction implementation handling heartbeat requests // from the wallet coordinator. type heartbeatAction struct { - logger log.StandardLogger - executingWallet wallet - signingExecutor heartbeatSigningExecutor - message []byte - startBlock uint64 - requestExpiresAt time.Time + logger log.StandardLogger + executingWallet wallet + signingExecutor heartbeatSigningExecutor + message []byte + startBlock uint64 + expiryBlock uint64 + waitForBlockFn waitForBlockFn } func newHeartbeatAction( @@ -69,15 +65,17 @@ func newHeartbeatAction( signingExecutor heartbeatSigningExecutor, message []byte, startBlock uint64, - requestExpiresAt time.Time, + expiryBlock uint64, + waitForBlockFn waitForBlockFn, ) *heartbeatAction { return &heartbeatAction{ - logger: logger, - executingWallet: executingWallet, - signingExecutor: signingExecutor, - message: message, - startBlock: startBlock, - requestExpiresAt: requestExpiresAt, + logger: logger, + executingWallet: executingWallet, + signingExecutor: signingExecutor, + message: message, + startBlock: startBlock, + expiryBlock: expiryBlock, + waitForBlockFn: waitForBlockFn, } } @@ -88,9 +86,15 @@ func (ha *heartbeatAction) execute() error { messageBytes := bitcoin.ComputeHash(ha.message) messageToSign := new(big.Int).SetBytes(messageBytes[:]) - heartbeatCtx, cancelHeartbeatCtx := context.WithTimeout( + // Just in case. This should never happen. + if ha.expiryBlock < heartbeatRequestTimeoutSafetyMarginBlocks { + return fmt.Errorf("invalid proposal expiry block") + } + + heartbeatCtx, cancelHeartbeatCtx := withCancelOnBlock( context.Background(), - time.Until(ha.requestExpiresAt.Add(-heartbeatRequestTimeoutSafetyMargin)), + ha.expiryBlock-heartbeatRequestTimeoutSafetyMarginBlocks, + ha.waitForBlockFn, ) defer cancelHeartbeatCtx() diff --git a/pkg/tbtc/heartbeat_test.go b/pkg/tbtc/heartbeat_test.go index a717bb4282..a218887f31 100644 --- a/pkg/tbtc/heartbeat_test.go +++ b/pkg/tbtc/heartbeat_test.go @@ -4,16 +4,15 @@ import ( "context" "encoding/hex" "fmt" - "math/big" - "testing" - "time" - "github.com/keep-network/keep-core/internal/testutils" "github.com/keep-network/keep-core/pkg/tecdsa" + "math/big" + "testing" ) func TestHeartbeatAction_HappyPath(t *testing.T) { startBlock := uint64(10) + expiryBlock := startBlock + heartbeatProposalValidityBlocks messageToSign, err := hex.DecodeString("FFFFFFFFFFFFFFFF0000000000000001") if err != nil { t.Fatal(err) @@ -31,7 +30,10 @@ func TestHeartbeatAction_HappyPath(t *testing.T) { mockExecutor, messageToSign, startBlock, - time.Now(), + expiryBlock, + func(ctx context.Context, blockHeight uint64) error { + return nil + }, ) err = action.execute() @@ -55,6 +57,7 @@ func TestHeartbeatAction_HappyPath(t *testing.T) { func TestHeartbeatAction_SigningError(t *testing.T) { startBlock := uint64(10) + expiryBlock := startBlock + heartbeatProposalValidityBlocks messageToSign, err := hex.DecodeString("FFFFFFFFFFFFFFFF0000000000000001") if err != nil { t.Fatal(err) @@ -69,7 +72,10 @@ func TestHeartbeatAction_SigningError(t *testing.T) { mockExecutor, messageToSign, startBlock, - time.Now(), + expiryBlock, + func(ctx context.Context, blockHeight uint64) error { + return nil + }, ) err = action.execute() diff --git a/pkg/tbtc/marshaling.go b/pkg/tbtc/marshaling.go index 88dacb5183..eb6899dc45 100644 --- a/pkg/tbtc/marshaling.go +++ b/pkg/tbtc/marshaling.go @@ -261,7 +261,7 @@ func (np *NoopProposal) Unmarshal([]byte) error { func (hp *HeartbeatProposal) Marshal() ([]byte, error) { return proto.Marshal( &pb.HeartbeatProposal{ - Message: hp.Message, + Message: hp.Message[:], }, ) } @@ -273,7 +273,17 @@ func (hp *HeartbeatProposal) Unmarshal(bytes []byte) error { return fmt.Errorf("failed to unmarshal HeartbeatProposal: [%v]", err) } - hp.Message = pbMsg.Message + if len(pbMsg.Message) != 16 { + return fmt.Errorf( + "invalid heartbeat message length: [%v]", + len(pbMsg.Message), + ) + } + + var message [16]byte + copy(message[:], pbMsg.Message) + + hp.Message = message return nil } diff --git a/pkg/tbtc/marshaling_test.go b/pkg/tbtc/marshaling_test.go index a2fbb73996..9092eb2ef4 100644 --- a/pkg/tbtc/marshaling_test.go +++ b/pkg/tbtc/marshaling_test.go @@ -134,7 +134,7 @@ func TestCoordinationMessage_MarshalingRoundtrip(t *testing.T) { }, "with heartbeat proposal": { proposal: &HeartbeatProposal{ - Message: []byte("heartbeat message"), + Message: [16]byte{0x01, 0x02}, }, }, "with deposit sweep proposal": { diff --git a/pkg/tbtc/node.go b/pkg/tbtc/node.go index 01fd69109e..c8738b06bd 100644 --- a/pkg/tbtc/node.go +++ b/pkg/tbtc/node.go @@ -8,7 +8,6 @@ import ( "math/big" "sort" "sync" - "time" "github.com/keep-network/keep-core/pkg/bitcoin" "github.com/keep-network/keep-core/pkg/chain" @@ -407,40 +406,25 @@ func (n *node) getCoordinationExecutor( return executor, true, nil } -// handleHeartbeatRequest handles an incoming wallet heartbeat request. -// First, it determines whether the node is supposed to do an action by checking -// whether any of the request's target wallet signers are under the node's control. -// If so, this function orchestrates and dispatches an appropriate wallet action. -func (n *node) handleHeartbeatRequest( - walletPublicKeyHash [20]byte, - message []byte, - requestExpiresAt time.Time, +// handleHeartbeatProposal handles an incoming heartbeat proposal by +// orchestrating and dispatching an appropriate wallet action. +func (n *node) handleHeartbeatProposal( + wallet wallet, + proposal *HeartbeatProposal, startBlock uint64, - delayBlocks uint64, + expiryBlock uint64, ) { - wallet, ok := n.walletRegistry.getWalletByPublicKeyHash( - walletPublicKeyHash, - ) - if !ok { - logger.Infof( - "node does not control signers of wallet PKH [0x%x]; "+ - "ignoring the received heartbeat request", - walletPublicKeyHash, - ) - return - } + walletPublicKeyHash := bitcoin.PublicKeyHash(wallet.publicKey) signingExecutor, ok, err := n.getSigningExecutor(wallet.publicKey) if err != nil { logger.Errorf("cannot get signing executor: [%v]", err) return } - // This check is actually redundant. We know the node controls some // wallet signers as we just got the wallet from the registry using their // public key hash. However, we are doing it just in case. The API - // contract of getWalletByPublicKeyHash and/or getSigningExecutor may - // change one day. + // contract of getSigningExecutor may change one day. if !ok { logger.Infof( "node does not control signers of wallet PKH [0x%x]; "+ @@ -464,15 +448,11 @@ func (n *node) handleHeartbeatRequest( walletPublicKeyBytes, ) - // The request processing started after a confirmation period represented - // by the delayBlocks parameter. Hence, we must add it to the original - // startBlock. - heartbeatRequestProcessingStartBlock := startBlock + delayBlocks - walletActionLogger := logger.With( zap.String("wallet", fmt.Sprintf("0x%x", walletPublicKeyBytes)), zap.String("action", ActionHeartbeat.String()), - zap.Uint64("startBlock", heartbeatRequestProcessingStartBlock), + zap.Uint64("startBlock", startBlock), + zap.Uint64("expiryBlock", expiryBlock), ) walletActionLogger.Infof("dispatching wallet action") @@ -480,9 +460,10 @@ func (n *node) handleHeartbeatRequest( walletActionLogger, wallet, signingExecutor, - message, - heartbeatRequestProcessingStartBlock, - requestExpiresAt, + proposal.Message[:], + startBlock, + expiryBlock, + n.waitForBlockHeight, ) err = n.walletDispatcher.dispatch(action) @@ -494,28 +475,14 @@ func (n *node) handleHeartbeatRequest( walletActionLogger.Infof("wallet action dispatched successfully") } -// handleDepositSweepProposal handles an incoming deposit sweep proposal. -// First, it determines whether the node is supposed to do an action by checking -// whether any of the proposal's target wallet signers are under node's control. -// If so, this function orchestrates and dispatches an appropriate wallet action. +// handleDepositSweepProposal handles an incoming deposit sweep proposal by +// orchestrating and dispatching an appropriate wallet action. func (n *node) handleDepositSweepProposal( + wallet wallet, proposal *DepositSweepProposal, - proposalExpiresAt time.Time, startBlock uint64, - delayBlocks uint64, + expiryBlock uint64, ) { - wallet, ok := n.walletRegistry.getWalletByPublicKeyHash( - proposal.WalletPublicKeyHash, - ) - if !ok { - logger.Infof( - "node does not control signers of wallet PKH [0x%x]; "+ - "ignoring the received deposit sweep proposal", - proposal.WalletPublicKeyHash, - ) - return - } - signingExecutor, ok, err := n.getSigningExecutor(wallet.publicKey) if err != nil { logger.Errorf("cannot get signing executor: [%v]", err) @@ -524,8 +491,7 @@ func (n *node) handleDepositSweepProposal( // This check is actually redundant. We know the node controls some // wallet signers as we just got the wallet from the registry using their // public key hash. However, we are doing it just in case. The API - // contract of getWalletByPublicKeyHash and/or getSigningExecutor may - // change one day. + // contract of getSigningExecutor may change one day. if !ok { logger.Infof( "node does not control signers of wallet PKH [0x%x]; "+ @@ -549,15 +515,11 @@ func (n *node) handleDepositSweepProposal( walletPublicKeyBytes, ) - // The proposal's processing started after a confirmation period represented - // by the delayBlocks parameter. Hence, we must add it to the original - // startBlock. - proposalProcessingStartBlock := startBlock + delayBlocks - walletActionLogger := logger.With( zap.String("wallet", fmt.Sprintf("0x%x", walletPublicKeyBytes)), zap.String("action", ActionDepositSweep.String()), - zap.Uint64("startBlock", proposalProcessingStartBlock), + zap.Uint64("startBlock", startBlock), + zap.Uint64("expiryBlock", expiryBlock), ) walletActionLogger.Infof("dispatching wallet action") @@ -568,8 +530,9 @@ func (n *node) handleDepositSweepProposal( wallet, signingExecutor, proposal, - proposalProcessingStartBlock, - proposalExpiresAt, + startBlock, + expiryBlock, + n.waitForBlockHeight, ) err = n.walletDispatcher.dispatch(action) @@ -581,28 +544,14 @@ func (n *node) handleDepositSweepProposal( walletActionLogger.Infof("wallet action dispatched successfully") } -// handleRedemptionProposal handles an incoming redemption proposal. -// First, it determines whether the node is supposed to do an action by checking -// whether any of the proposal's target wallet signers are under node's control. -// If so, this function orchestrates and dispatches an appropriate wallet action. +// handleRedemptionProposal handles an incoming redemption proposal by +// orchestrating and dispatching an appropriate wallet action. func (n *node) handleRedemptionProposal( + wallet wallet, proposal *RedemptionProposal, - proposalExpiresAt time.Time, startBlock uint64, - delayBlocks uint64, + expiryBlock uint64, ) { - wallet, ok := n.walletRegistry.getWalletByPublicKeyHash( - proposal.WalletPublicKeyHash, - ) - if !ok { - logger.Infof( - "node does not control signers of wallet PKH [0x%x]; "+ - "ignoring the received redemption proposal", - proposal.WalletPublicKeyHash, - ) - return - } - signingExecutor, ok, err := n.getSigningExecutor(wallet.publicKey) if err != nil { logger.Errorf("cannot get signing executor: [%v]", err) @@ -611,8 +560,7 @@ func (n *node) handleRedemptionProposal( // This check is actually redundant. We know the node controls some // wallet signers as we just got the wallet from the registry using their // public key hash. However, we are doing it just in case. The API - // contract of getWalletByPublicKeyHash and/or getSigningExecutor may - // change one day. + // contract of getSigningExecutor may change one day. if !ok { logger.Infof( "node does not control signers of wallet PKH [0x%x]; "+ @@ -636,15 +584,11 @@ func (n *node) handleRedemptionProposal( walletPublicKeyBytes, ) - // The proposal's processing started after a confirmation period represented - // by the delayBlocks parameter. Hence, we must add it to the original - // startBlock. - proposalProcessingStartBlock := startBlock + delayBlocks - walletActionLogger := logger.With( zap.String("wallet", fmt.Sprintf("0x%x", walletPublicKeyBytes)), zap.String("action", ActionRedemption.String()), - zap.Uint64("startBlock", proposalProcessingStartBlock), + zap.Uint64("startBlock", startBlock), + zap.Uint64("expiryBlock", expiryBlock), ) walletActionLogger.Infof("dispatching wallet action") @@ -655,8 +599,9 @@ func (n *node) handleRedemptionProposal( wallet, signingExecutor, proposal, - proposalProcessingStartBlock, - proposalExpiresAt, + startBlock, + expiryBlock, + n.waitForBlockHeight, ) err = n.walletDispatcher.dispatch(action) @@ -1066,16 +1011,59 @@ func executeCoordinationProcedure( func processCoordinationResult(node *node, result *coordinationResult) { logger.Infof("processing coordination result [%s]", result) - // TODO: Record coordination faults. + // TODO: In the future, create coordination faults cache and + // record faults from the processed results there. + + startBlock := result.window.endBlock() + expiryBlock := startBlock + result.proposal.ValidityBlocks() - // TODO: Detect proposal type and run the appropriate handler. switch result.proposal.ActionType() { case ActionHeartbeat: - // node.handleHeartbeatRequest() + if proposal, ok := result.proposal.(*HeartbeatProposal); ok { + node.handleHeartbeatProposal( + result.wallet, + proposal, + startBlock, + expiryBlock, + ) + } case ActionDepositSweep: - // node.handleDepositSweepProposal() + if proposal, ok := result.proposal.(*DepositSweepProposal); ok { + node.handleDepositSweepProposal( + result.wallet, + proposal, + startBlock, + expiryBlock, + ) + } case ActionRedemption: - // node.handleRedemptionProposal() + if proposal, ok := result.proposal.(*RedemptionProposal); ok { + node.handleRedemptionProposal( + result.wallet, + proposal, + startBlock, + expiryBlock, + ) + } + // TODO: Uncomment when moving funds support is implemented. + // case ActionMovingFunds: + // if proposal, ok := result.proposal.(*MovingFundsProposal); ok { + // node.handleMovingFundsProposal( + // result.wallet, + // proposal, + // startBlock, + // expiryBlock, + // ) + // } + // case ActionMovedFundsSweep: + // if proposal, ok := result.proposal.(*MovedFundsSweepProposal); ok { + // node.handleMovedFundsSweepProposal( + // result.wallet, + // proposal, + // startBlock, + // expiryBlock, + // ) + // } default: logger.Errorf("no handler for coordination result [%s]", result) } diff --git a/pkg/tbtc/redemption.go b/pkg/tbtc/redemption.go index c675cf7313..55ba2d293d 100644 --- a/pkg/tbtc/redemption.go +++ b/pkg/tbtc/redemption.go @@ -21,11 +21,7 @@ const ( // another actions. The value of 600 blocks is roughly 2 hours, assuming // 12 seconds per block. redemptionProposalValidityBlocks = 600 - // redemptionProposalConfirmationBlocks determines the block length of the - // confirmation period on the host chain that is preserved after a - // redemption proposal submission. - redemptionProposalConfirmationBlocks = 20 - // redemptionSigningTimeoutSafetyMargin determines the duration of the + // redemptionSigningTimeoutSafetyMarginBlocks determines the duration of the // safety margin that must be preserved between the signing timeout // and the timeout of the entire redemption action. This safety // margin prevents against the case where signing completes late and there @@ -33,17 +29,18 @@ const ( // In such a case, wallet signatures may leak and make the wallet subject // of fraud accusations. Usage of the safety margin ensures there is enough // time to perform post-signing steps of the redemption action. - redemptionSigningTimeoutSafetyMargin = 1 * time.Hour + // The value of 300 blocks is roughly 1 hour, assuming 12 seconds per block. + redemptionSigningTimeoutSafetyMarginBlocks = 300 // redemptionBroadcastTimeout determines the time window for redemption // transaction broadcast. It is guaranteed that at least - // redemptionSigningTimeoutSafetyMargin is preserved for the broadcast + // redemptionSigningTimeoutSafetyMarginBlocks is preserved for the broadcast // step. However, the happy path for the broadcast step is usually quick // and few retries are needed to recover from temporary problems. That // said, if the broadcast step does not succeed in a tight timeframe, // there is no point to retry for the entire possible time window. // Hence, the timeout for broadcast step is set as 25% of the entire - // time widow determined by redemptionSigningTimeoutSafetyMargin. - redemptionBroadcastTimeout = redemptionSigningTimeoutSafetyMargin / 4 + // time widow determined by redemptionSigningTimeoutSafetyMarginBlocks. + redemptionBroadcastTimeout = 15 * time.Minute // redemptionBroadcastCheckDelay determines the delay that must // be preserved between transaction broadcast and the check that ensures // the transaction is known on the Bitcoin chain. This delay is needed @@ -115,11 +112,11 @@ type redemptionAction struct { proposal *RedemptionProposal proposalProcessingStartBlock uint64 - proposalExpiresAt time.Time + proposalExpiryBlock uint64 - signingTimeoutSafetyMargin time.Duration - broadcastTimeout time.Duration - broadcastCheckDelay time.Duration + signingTimeoutSafetyMarginBlocks uint64 + broadcastTimeout time.Duration + broadcastCheckDelay time.Duration feeDistribution redemptionFeeDistributionFn transactionShape RedemptionTransactionShape @@ -133,30 +130,32 @@ func newRedemptionAction( signingExecutor walletSigningExecutor, proposal *RedemptionProposal, proposalProcessingStartBlock uint64, - proposalExpiresAt time.Time, + proposalExpiryBlock uint64, + waitForBlockFn waitForBlockFn, ) *redemptionAction { transactionExecutor := newWalletTransactionExecutor( btcChain, redeemingWallet, signingExecutor, + waitForBlockFn, ) feeDistribution := withRedemptionTotalFee(proposal.RedemptionTxFee.Int64()) return &redemptionAction{ - logger: logger, - chain: chain, - btcChain: btcChain, - redeemingWallet: redeemingWallet, - transactionExecutor: transactionExecutor, - proposal: proposal, - proposalProcessingStartBlock: proposalProcessingStartBlock, - proposalExpiresAt: proposalExpiresAt, - signingTimeoutSafetyMargin: redemptionSigningTimeoutSafetyMargin, - broadcastTimeout: redemptionBroadcastTimeout, - broadcastCheckDelay: redemptionBroadcastCheckDelay, - feeDistribution: feeDistribution, - transactionShape: RedemptionChangeFirst, + logger: logger, + chain: chain, + btcChain: btcChain, + redeemingWallet: redeemingWallet, + transactionExecutor: transactionExecutor, + proposal: proposal, + proposalProcessingStartBlock: proposalProcessingStartBlock, + proposalExpiryBlock: proposalExpiryBlock, + signingTimeoutSafetyMarginBlocks: redemptionSigningTimeoutSafetyMarginBlocks, + broadcastTimeout: redemptionBroadcastTimeout, + broadcastCheckDelay: redemptionBroadcastCheckDelay, + feeDistribution: feeDistribution, + transactionShape: RedemptionChangeFirst, } } @@ -227,11 +226,16 @@ func (ra *redemptionAction) execute() error { zap.String("step", "signTransaction"), ) + // Just in case. This should never happen. + if ra.proposalExpiryBlock < ra.signingTimeoutSafetyMarginBlocks { + return fmt.Errorf("invalid proposal expiry block") + } + redemptionTx, err := ra.transactionExecutor.signTransaction( signTxLogger, unsignedRedemptionTx, ra.proposalProcessingStartBlock, - ra.proposalExpiresAt.Add(-ra.signingTimeoutSafetyMargin), + ra.proposalExpiryBlock-ra.signingTimeoutSafetyMarginBlocks, ) if err != nil { return fmt.Errorf("sign transaction step failed: [%v]", err) diff --git a/pkg/tbtc/redemption_test.go b/pkg/tbtc/redemption_test.go index 6a15f10410..8d0f2b624f 100644 --- a/pkg/tbtc/redemption_test.go +++ b/pkg/tbtc/redemption_test.go @@ -1,6 +1,7 @@ package tbtc import ( + "context" "github.com/keep-network/keep-core/pkg/tecdsa" "math/big" "testing" @@ -22,8 +23,6 @@ func TestRedemptionAction_Execute(t *testing.T) { for _, scenario := range scenarios { t.Run(scenario.Title, func(t *testing.T) { - now := time.Now() - hostChain := Connect() bitcoinChain := newLocalBitcoinChain() @@ -77,7 +76,8 @@ func TestRedemptionAction_Execute(t *testing.T) { // Choose an arbitrary start block and expiration time. proposalProcessingStartBlock := uint64(100) - proposalExpiresAt := now.Add(4 * time.Hour) + proposalExpiryBlock := proposalProcessingStartBlock + + redemptionProposalValidityBlocks // Simulate the on-chain proposal validation passes with success. err = hostChain.setRedemptionProposalValidationResult( @@ -130,7 +130,10 @@ func TestRedemptionAction_Execute(t *testing.T) { signingExecutor, proposal, proposalProcessingStartBlock, - proposalExpiresAt, + proposalExpiryBlock, + func(ctx context.Context, blockHeight uint64) error { + return nil + }, ) // Modify the default parameters of the action to make diff --git a/pkg/tbtc/tbtc.go b/pkg/tbtc/tbtc.go index a3e0ccb010..e90356b2e9 100644 --- a/pkg/tbtc/tbtc.go +++ b/pkg/tbtc/tbtc.go @@ -264,310 +264,6 @@ func Initialize( }() }) - // Set up a handler of a wallet heartbeat request coming from the - // WalletCoordinator on-chain contract. Once an event is seen, a handler - // goroutine makes sure that the observed event is not duplicate, waits - // a fixed confirmation period, and ensures the on-chain state justifies - // the occurrence of the event. Once done, the original event is used - // to trigger the heartbeat request action. The handler does not care about - // possible subsequent events being a result of chain reorgs. This is because - // the WalletCoordinator contract is just a coordination point based on - // the chain consensus. If enough clients received the event, they should - // follow it and execute a heartbeat signing. The message for that heartbeat - // request was validated in the contract, so even if there was a reorg and - // another event landed on the canonical chain later, the signature - // will still be valid and not lead to fraud. The only reason the handler - // waits a fixed confirmation period after receiving the coordination event - // is to make sure the right type of action is executed given different types - // of actions may have different lock times. We do not want to run into a - // situation when the majority of clients execute heartbeat with N blocks - // wallet lock time and the chain has M < N blocks wallet lock time because - // the canonical chain - as a result of a reorg - is supposed to execute - // e.g. redemption. - _ = chain.OnHeartbeatRequestSubmitted(func(event *HeartbeatRequestSubmittedEvent) { - go func() { - walletPublicKeyHash := event.WalletPublicKeyHash - message := event.Message - - if ok := deduplicator.notifyHeartbeatRequestSubmitted( - walletPublicKeyHash, - message, - ); !ok { - logger.Infof( - "heartbeat request for wallet PKH [0x%x] and message [0x%x] "+ - "has been already processed", - walletPublicKeyHash, - message, - ) - return - } - - confirmationBlock := event.BlockNumber + - heartbeatRequestConfirmationBlocks - - logger.Infof( - "observed heartbeat request for wallet PKH [0x%x] "+ - "at block [%v]; waiting for block [%v] to confirm", - walletPublicKeyHash, - event.BlockNumber, - confirmationBlock, - ) - - err := node.waitForBlockHeight(ctx, confirmationBlock) - if err != nil { - logger.Errorf( - "failed to confirm heartbeat request for "+ - "wallet PKH [0x%x]: [%v]", - walletPublicKeyHash, - err, - ) - return - } - - expiresAt, cause, err := chain.GetWalletLock( - walletPublicKeyHash, - ) - if err != nil { - logger.Errorf( - "failed to get lock for wallet PKH [0x%x]: [%v]", - walletPublicKeyHash, - err, - ) - return - } - - // The event is confirmed if the wallet is locked due to a heartbeat - // action. - if time.Now().Before(expiresAt) && cause == ActionHeartbeat { - logger.Infof( - "heartbeat request submitted for "+ - "wallet PKH [0x%x] at block [%v] by [%v]", - walletPublicKeyHash, - event.BlockNumber, - event.Coordinator, - ) - - node.handleHeartbeatRequest( - walletPublicKeyHash, - message, - expiresAt, - event.BlockNumber, - heartbeatRequestConfirmationBlocks, - ) - } else { - logger.Infof( - "heartbeat request for wallet PKH [0x%x] "+ - "at block [%v] was not confirmed; existing wallet lock "+ - "has unexpected expiration time [%s] and/or cause [%v]", - walletPublicKeyHash, - event.BlockNumber, - expiresAt, - cause, - ) - } - }() - }) - - // Set up a handler of deposit sweep proposals coming from the - // WalletCoordinator on-chain contract. Once an event is seen, a handler - // goroutine makes sure that the observed event is not a duplicate, waits - // a fixed confirmation period, and ensures the on-chain state justifies - // the occurrence of the event. Once done, the original event is used - // to trigger the deposit sweep action. The handler does not care about - // possible subsequent events being result of chain reorgs. This is because - // the WalletCoordinator contract is just a coordination point based on - // the chain consensus. If enough clients received the event, they should - // follow it and execute a signature. All input parameters for that - // signature are validated, so even if there was a reorg and another event - // landed on the canonical chain later, the first signature will still be - // valid and approved by Bitcoin. The only reason the handler waits a - // fixed confirmation period after receiving the coordination event is to - // make sure the right type of action is executed given different types of - // actions may have different lock times. We do not want to run into a - // situation when the majority of clients execute sweep with N blocks wallet - // lock time and the chain has M < N blocks wallet lock time because the - // canonical chain - as a result of a reorg - is supposed to execute - // e.g. redemption. - _ = chain.OnDepositSweepProposalSubmitted(func(event *DepositSweepProposalSubmittedEvent) { - go func() { - walletPublicKeyHash := event.Proposal.WalletPublicKeyHash - - if ok := deduplicator.notifyDepositSweepProposalSubmitted( - event.Proposal, - ); !ok { - logger.Infof( - "deposit sweep proposal for wallet PKH [0x%x] "+ - "has been already processed", - walletPublicKeyHash, - ) - return - } - - confirmationBlock := event.BlockNumber + - depositSweepProposalConfirmationBlocks - - logger.Infof( - "observed deposit sweep proposal for wallet PKH [0x%x] "+ - "at block [%v]; waiting for block [%v] to confirm", - walletPublicKeyHash, - event.BlockNumber, - confirmationBlock, - ) - - err := node.waitForBlockHeight(ctx, confirmationBlock) - if err != nil { - logger.Errorf( - "failed to confirm deposit sweep proposal for "+ - "wallet PKH [0x%x]: [%v]", - walletPublicKeyHash, - err, - ) - return - } - - expiresAt, cause, err := chain.GetWalletLock( - walletPublicKeyHash, - ) - if err != nil { - logger.Errorf( - "failed to get lock for wallet PKH [0x%x]: [%v]", - walletPublicKeyHash, - err, - ) - return - } - - // The event is confirmed if the wallet is locked due to a deposit - // sweep action. - if time.Now().Before(expiresAt) && cause == ActionDepositSweep { - logger.Infof( - "deposit sweep proposal submitted for "+ - "wallet PKH [0x%x] at block [%v] by [%v]", - walletPublicKeyHash, - event.BlockNumber, - event.Coordinator, - ) - - node.handleDepositSweepProposal( - event.Proposal, - expiresAt, - event.BlockNumber, - depositSweepProposalConfirmationBlocks, - ) - } else { - logger.Infof( - "deposit sweep proposal for wallet PKH [0x%x] "+ - "at block [%v] was not confirmed; existing wallet lock "+ - "has unexpected expiration time [%s] and/or cause [%v]", - walletPublicKeyHash, - event.BlockNumber, - expiresAt, - cause, - ) - } - }() - }) - - // Set up a handler of redemption proposals coming from the - // WalletCoordinator on-chain contract. Once an event is seen, a handler - // goroutine makes sure that the observed event is not a duplicate, waits - // a fixed confirmation period, and ensures the on-chain state justifies - // the occurrence of the event. Once done, the original event is used - // to trigger the redemption action. The handler does not care about - // possible subsequent events being result of chain reorgs. This is because - // the WalletCoordinator contract is just a coordination point based on - // the chain consensus. If enough clients received the event, they should - // follow it and execute a signature. All input parameters for that - // signature are validated, so even if there was a reorg and another event - // landed on the canonical chain later, the first signature will still be - // valid and approved by Bitcoin. The only reason the handler waits a - // fixed confirmation period after receiving the coordination event is to - // make sure the right type of action is executed given different types of - // actions may have different lock times. We do not want to run into a - // situation when the majority of clients execute redemption with N blocks - // wallet lock time and the chain has M < N blocks wallet lock time because - // the canonical chain - as a result of a reorg - is supposed to execute - // e.g. wallet heartbeat. - _ = chain.OnRedemptionProposalSubmitted(func(event *RedemptionProposalSubmittedEvent) { - go func() { - walletPublicKeyHash := event.Proposal.WalletPublicKeyHash - - if ok := deduplicator.notifyRedemptionProposalSubmitted( - event.Proposal, - ); !ok { - logger.Infof( - "redemption proposal for wallet PKH [0x%x] "+ - "has been already processed", - walletPublicKeyHash, - ) - return - } - - confirmationBlock := event.BlockNumber + - redemptionProposalConfirmationBlocks - - logger.Infof( - "observed redemption proposal for wallet PKH [0x%x] "+ - "at block [%v]; waiting for block [%v] to confirm", - walletPublicKeyHash, - event.BlockNumber, - confirmationBlock, - ) - - err := node.waitForBlockHeight(ctx, confirmationBlock) - if err != nil { - logger.Errorf( - "failed to confirm redemption proposal for "+ - "wallet PKH [0x%x]: [%v]", - walletPublicKeyHash, - err, - ) - return - } - - expiresAt, cause, err := chain.GetWalletLock( - walletPublicKeyHash, - ) - if err != nil { - logger.Errorf( - "failed to get lock for wallet PKH [0x%x]: [%v]", - walletPublicKeyHash, - err, - ) - return - } - - // The event is confirmed if the wallet is locked due to a - // redemption action. - if time.Now().Before(expiresAt) && cause == ActionRedemption { - logger.Infof( - "redemption proposal submitted for "+ - "wallet PKH [0x%x] at block [%v] by [%v]", - walletPublicKeyHash, - event.BlockNumber, - event.Coordinator, - ) - - node.handleRedemptionProposal( - event.Proposal, - expiresAt, - event.BlockNumber, - redemptionProposalConfirmationBlocks, - ) - } else { - logger.Infof( - "redemption proposal for wallet PKH [0x%x] "+ - "at block [%v] was not confirmed; existing wallet lock "+ - "has unexpected expiration time [%s] and/or cause [%v]", - walletPublicKeyHash, - event.BlockNumber, - expiresAt, - cause, - ) - } - }() - }) - return nil } diff --git a/pkg/tbtc/wallet.go b/pkg/tbtc/wallet.go index 9c34c69b44..e5201a702a 100644 --- a/pkg/tbtc/wallet.go +++ b/pkg/tbtc/wallet.go @@ -199,17 +199,21 @@ type walletTransactionExecutor struct { executingWallet wallet signingExecutor walletSigningExecutor + + waitForBlockFn waitForBlockFn } func newWalletTransactionExecutor( btcChain bitcoin.Chain, executingWallet wallet, signingExecutor walletSigningExecutor, + waitForBlockFn waitForBlockFn, ) *walletTransactionExecutor { return &walletTransactionExecutor{ btcChain: btcChain, executingWallet: executingWallet, signingExecutor: signingExecutor, + waitForBlockFn: waitForBlockFn, } } @@ -220,7 +224,7 @@ func (wte *walletTransactionExecutor) signTransaction( signTxLogger log.StandardLogger, unsignedTx *bitcoin.TransactionBuilder, signingStartBlock uint64, - signingTimesOutAt time.Time, + signingTimeoutBlock uint64, ) (*bitcoin.Transaction, error) { signTxLogger.Infof("computing transaction's sig hashes") @@ -234,9 +238,10 @@ func (wte *walletTransactionExecutor) signTransaction( signTxLogger.Infof("signing transaction's sig hashes") - signingCtx, cancelSigningCtx := context.WithTimeout( + signingCtx, cancelSigningCtx := withCancelOnBlock( context.Background(), - time.Until(signingTimesOutAt), + signingTimeoutBlock, + wte.waitForBlockFn, ) defer cancelSigningCtx() diff --git a/pkg/tbtcpg/chain.go b/pkg/tbtcpg/chain.go index e616a5bc26..23114615d9 100644 --- a/pkg/tbtcpg/chain.go +++ b/pkg/tbtcpg/chain.go @@ -1,30 +1,19 @@ package tbtcpg import ( - "github.com/keep-network/keep-core/pkg/bitcoin" - "github.com/keep-network/keep-core/pkg/chain" "math/big" "time" + "github.com/keep-network/keep-core/pkg/bitcoin" + "github.com/keep-network/keep-core/pkg/chain" + "github.com/keep-network/keep-core/pkg/tbtc" ) // Chain represents the interface that the wallet maintainer module expects // to interact with the anchoring blockchain on. type Chain interface { - // GetDepositRequest gets the on-chain deposit request for the given - // funding transaction hash and output index.The returned values represent: - // - deposit request which is non-nil only when the deposit request was - // found, - // - boolean value which is true if the deposit request was found, false - // otherwise, - // - error which is non-nil only when the function execution failed. It will - // be nil if the deposit request was not found, but the function execution - // succeeded. - GetDepositRequest( - fundingTxHash bitcoin.Hash, - fundingOutputIndex uint32, - ) (*tbtc.DepositChainRequest, bool, error) + tbtc.BridgeChain // PastNewWalletRegisteredEvents fetches past new wallet registered events // according to the provided filter or unfiltered if the filter is nil. Returned @@ -86,22 +75,6 @@ type Chain interface { // a processing. GetRedemptionRequestMinAge() (uint32, error) - // PastDepositRevealedEvents fetches past deposit reveal events according - // to the provided filter or unfiltered if the filter is nil. Returned - // events are sorted by the block number in the ascending order, i.e. the - // latest event is at the end of the slice. - PastDepositRevealedEvents( - filter *tbtc.DepositRevealedEventFilter, - ) ([]*tbtc.DepositRevealedEvent, error) - - // GetPendingRedemptionRequest gets the on-chain pending redemption request - // for the given wallet public key hash and redeemer output script. - // The returned bool value indicates whether the request was found or not. - GetPendingRedemptionRequest( - walletPublicKeyHash [20]byte, - redeemerOutputScript bitcoin.Script, - ) (*tbtc.RedemptionRequest, bool, error) - // ValidateDepositSweepProposal validates the given deposit sweep proposal // against the chain. It requires some additional data about the deposits // that must be fetched externally. Returns an error if the proposal is @@ -126,4 +99,9 @@ type Chain interface { BlockCounter() (chain.BlockCounter, error) AverageBlockTime() time.Duration + + // ValidateHeartbeatProposal validates the given heartbeat proposal + // against the chain. Returns an error if the proposal is not valid or + // nil otherwise. + ValidateHeartbeatProposal(proposal *tbtc.HeartbeatProposal) error } diff --git a/pkg/tbtcpg/chain_test.go b/pkg/tbtcpg/chain_test.go index f264c91c3d..b1e09bc9f6 100644 --- a/pkg/tbtcpg/chain_test.go +++ b/pkg/tbtcpg/chain_test.go @@ -48,6 +48,7 @@ type LocalChain struct { averageBlockTime time.Duration pendingRedemptionRequests map[[32]byte]*tbtc.RedemptionRequest redemptionProposalValidations map[[32]byte]bool + heartbeatProposalValidations map[[16]byte]bool } func NewLocalChain() *LocalChain { @@ -59,6 +60,7 @@ func NewLocalChain() *LocalChain { pastRedemptionRequestedEvents: make(map[[32]byte][]*tbtc.RedemptionRequestedEvent), pendingRedemptionRequests: make(map[[32]byte]*tbtc.RedemptionRequest), redemptionProposalValidations: make(map[[32]byte]bool), + heartbeatProposalValidations: make(map[[16]byte]bool), } } @@ -568,6 +570,34 @@ func (lc *LocalChain) SetRedemptionProposalValidationResult( return nil } +func (lc *LocalChain) ValidateHeartbeatProposal( + proposal *tbtc.HeartbeatProposal, +) error { + lc.mutex.Lock() + defer lc.mutex.Unlock() + + result, ok := lc.heartbeatProposalValidations[proposal.Message] + if !ok { + return fmt.Errorf("validation result unknown") + } + + if !result { + return fmt.Errorf("validation failed") + } + + return nil +} + +func (lc *LocalChain) SetHeartbeatProposalValidationResult( + proposal *tbtc.HeartbeatProposal, + result bool, +) { + lc.mutex.Lock() + defer lc.mutex.Unlock() + + lc.heartbeatProposalValidations[proposal.Message] = result +} + func buildRedemptionProposalValidationKey( proposal *tbtc.RedemptionProposal, ) ([32]byte, error) { @@ -634,6 +664,17 @@ func (lc *LocalChain) SetAverageBlockTime(averageBlockTime time.Duration) { lc.averageBlockTime = averageBlockTime } +func (lc *LocalChain) GetWallet(walletPublicKeyHash [20]byte) ( + *tbtc.WalletChainData, + error, +) { + panic("unsupported") +} + +func (lc *LocalChain) ComputeMainUtxoHash(mainUtxo *bitcoin.UnspentTransactionOutput) [32]byte { + panic("unsupported") +} + type MockBlockCounter struct { mutex sync.Mutex currentBlock uint64 diff --git a/pkg/tbtcpg/deposit_sweep.go b/pkg/tbtcpg/deposit_sweep.go index 8df52b76d5..771e0c5934 100644 --- a/pkg/tbtcpg/deposit_sweep.go +++ b/pkg/tbtcpg/deposit_sweep.go @@ -2,12 +2,13 @@ package tbtcpg import ( "fmt" - "github.com/ipfs/go-log/v2" - "go.uber.org/zap" "math" "math/big" "sort" + "github.com/ipfs/go-log/v2" + "go.uber.org/zap" + "github.com/keep-network/keep-core/internal/hexutils" "github.com/keep-network/keep-core/pkg/bitcoin" "github.com/keep-network/keep-core/pkg/tbtc" @@ -31,11 +32,13 @@ func NewDepositSweepTask( } } -func (dst *DepositSweepTask) Run(walletPublicKeyHash [20]byte) ( +func (dst *DepositSweepTask) Run(request *tbtc.CoordinationProposalRequest) ( tbtc.CoordinationProposal, bool, error, ) { + walletPublicKeyHash := request.WalletPublicKeyHash + taskLogger := logger.With( zap.String("task", dst.ActionType().String()), zap.String("walletPKH", fmt.Sprintf("0x%x", walletPublicKeyHash)), diff --git a/pkg/tbtcpg/heartbeat.go b/pkg/tbtcpg/heartbeat.go new file mode 100644 index 0000000000..7f4bf36d67 --- /dev/null +++ b/pkg/tbtcpg/heartbeat.go @@ -0,0 +1,64 @@ +package tbtcpg + +import ( + "crypto/sha256" + "encoding/binary" + "fmt" + + "github.com/keep-network/keep-core/pkg/tbtc" +) + +// HeartbeatTask is a task that may produce a heartbeat proposal. +type HeartbeatTask struct { + chain Chain +} + +func NewHeartbeatTask(chain Chain) *HeartbeatTask { + return &HeartbeatTask{ + chain: chain, + } +} + +func (ht *HeartbeatTask) Run(request *tbtc.CoordinationProposalRequest) ( + tbtc.CoordinationProposal, + bool, + error, +) { + walletPublicKeyHash := request.WalletPublicKeyHash + + blockCounter, err := ht.chain.BlockCounter() + if err != nil { + return nil, false, fmt.Errorf("failed to get block counter: [%v]", err) + } + + block, err := blockCounter.CurrentBlock() + if err != nil { + return nil, false, fmt.Errorf("failed to get current block: [%v]", err) + } + blockBytes := make([]byte, 8) + binary.BigEndian.PutUint64(blockBytes, block) + + hash := sha256.Sum256(append(walletPublicKeyHash[:], blockBytes...)) + + message := [16]byte{ + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + hash[0], hash[1], hash[2], hash[3], hash[4], hash[5], hash[6], hash[7], + } + + proposal := &tbtc.HeartbeatProposal{ + Message: message, + } + + if err := ht.chain.ValidateHeartbeatProposal(proposal); err != nil { + return nil, false, fmt.Errorf( + "failed to verify heartbeat proposal: [%v]", + err, + ) + } + + return proposal, true, nil +} + +func (ht *HeartbeatTask) ActionType() tbtc.WalletActionType { + return tbtc.ActionHeartbeat +} diff --git a/pkg/tbtcpg/heartbeat_test.go b/pkg/tbtcpg/heartbeat_test.go new file mode 100644 index 0000000000..8970510ae5 --- /dev/null +++ b/pkg/tbtcpg/heartbeat_test.go @@ -0,0 +1,88 @@ +package tbtcpg + +import ( + "fmt" + "reflect" + "testing" + + "github.com/keep-network/keep-core/internal/testutils" + "github.com/keep-network/keep-core/pkg/tbtc" +) + +func TestHeartbeatTask_Run(t *testing.T) { + tests := map[string]struct { + validationResult bool + expectedProposal tbtc.CoordinationProposal + expectedOk bool + expectedErr error + }{ + "valid proposal": { + validationResult: true, + expectedProposal: &tbtc.HeartbeatProposal{ + Message: [16]byte{ + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xe0, 0xd7, 0x5a, 0xec, 0xd2, 0x9e, 0x5b, 0xca, + }, + }, + expectedOk: true, + expectedErr: nil, + }, + "invalid proposal": { + validationResult: false, + expectedProposal: nil, + expectedOk: false, + expectedErr: fmt.Errorf( + "failed to verify heartbeat proposal: [validation failed]", + ), + }, + } + + for testName, test := range tests { + t.Run(testName, func(t *testing.T) { + tbtcChain := NewLocalChain() + blockCounter := NewMockBlockCounter() + + blockCounter.SetCurrentBlock(900) + tbtcChain.SetBlockCounter(blockCounter) + + tbtcChain.SetHeartbeatProposalValidationResult( + &tbtc.HeartbeatProposal{ + Message: [16]byte{ + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xe0, 0xd7, 0x5a, 0xec, 0xd2, 0x9e, 0x5b, 0xca, + }, + }, + test.validationResult, + ) + + walletPublicKeyHash := [20]byte{0x01, 0x02} + + task := NewHeartbeatTask(tbtcChain) + + proposal, ok, err := task.Run( + &tbtc.CoordinationProposalRequest{ + // Set only relevant fields. + WalletPublicKeyHash: walletPublicKeyHash, + }, + ) + + if !reflect.DeepEqual(test.expectedErr, err) { + t.Errorf( + "unexpected error\nexpected: [%v]\nactual: [%v]", + test.expectedErr, + err, + ) + } + + testutils.AssertBoolsEqual(t, "boolean flag", test.expectedOk, ok) + + if !reflect.DeepEqual(test.expectedProposal, proposal) { + t.Errorf( + "unexpected proposal\nexpected: [%v]\nactual: [%v]", + test.expectedProposal, + proposal, + ) + } + }) + } +} diff --git a/pkg/tbtcpg/redemptions.go b/pkg/tbtcpg/redemptions.go index 54fc6aac1e..b1a993dcb0 100644 --- a/pkg/tbtcpg/redemptions.go +++ b/pkg/tbtcpg/redemptions.go @@ -2,12 +2,13 @@ package tbtcpg import ( "fmt" - "github.com/ipfs/go-log/v2" - "go.uber.org/zap" "math/big" "sort" "time" + "github.com/ipfs/go-log/v2" + "go.uber.org/zap" + "github.com/keep-network/keep-core/internal/hexutils" "github.com/keep-network/keep-core/pkg/bitcoin" "github.com/keep-network/keep-core/pkg/tbtc" @@ -29,11 +30,13 @@ func NewRedemptionTask( } } -func (rt *RedemptionTask) Run(walletPublicKeyHash [20]byte) ( +func (rt *RedemptionTask) Run(request *tbtc.CoordinationProposalRequest) ( tbtc.CoordinationProposal, bool, error, ) { + walletPublicKeyHash := request.WalletPublicKeyHash + taskLogger := logger.With( zap.String("task", rt.ActionType().String()), zap.String("walletPKH", fmt.Sprintf("0x%x", walletPublicKeyHash)), @@ -93,11 +96,12 @@ type RedemptionRequest struct { RequestedAmount uint64 } -// FindPendingRedemptions finds pending redemptions requests according to -// the provided filter. The returned value is a map, where the key is -// a 20-byte public key hash of a specific wallet and the value is a list -// of pending requests targeting this wallet. It is guaranteed that an existing -// key has always a non-empty slice as value. +// FindPendingRedemptions finds pending redemptions requests for the +// provided wallet. The returned value is a list of redeemers output +// scripts that come from detected pending requests targeting this wallet. +// The maxNumberOfRequests parameter is used as a ceiling for the number of +// requests in the result. If number of discovered requests meets the +// maxNumberOfRequests the function will stop fetching more requests. func (rt *RedemptionTask) FindPendingRedemptions( taskLogger log.StandardLogger, walletPublicKeyHash [20]byte, diff --git a/pkg/tbtcpg/tbtcpg.go b/pkg/tbtcpg/tbtcpg.go index 92449ff1c6..24d5434b3e 100644 --- a/pkg/tbtcpg/tbtcpg.go +++ b/pkg/tbtcpg/tbtcpg.go @@ -2,6 +2,7 @@ package tbtcpg import ( "fmt" + "github.com/ipfs/go-log/v2" "github.com/keep-network/keep-core/pkg/bitcoin" "github.com/keep-network/keep-core/pkg/tbtc" @@ -16,7 +17,10 @@ var logger = log.Logger("keep-tbtcpg") type ProposalTask interface { // Run executes the task and returns a proposal, a boolean flag indicating // whether the proposal was generated and an error if any. - Run(walletPublicKeyHash [20]byte) (tbtc.CoordinationProposal, bool, error) + Run( + request *tbtc.CoordinationProposalRequest, + ) (tbtc.CoordinationProposal, bool, error) + // ActionType returns the type of the action proposal. ActionType() tbtc.WalletActionType } @@ -35,7 +39,7 @@ func NewProposalGenerator( tasks := []ProposalTask{ NewDepositSweepTask(chain, btcChain), NewRedemptionTask(chain, btcChain), - // newHeartbeatTask(chain, btcChain), + NewHeartbeatTask(chain), // TODO: Uncomment when moving funds support is implemented. // newMovedFundsSweepTask(), // newMovingFundsTask(), @@ -53,19 +57,21 @@ func NewProposalGenerator( // given wallet's state. If none of the actions are valid, the generator // returns a no-op proposal. func (pg *ProposalGenerator) Generate( - walletPublicKeyHash [20]byte, - actionsChecklist []tbtc.WalletActionType, + request *tbtc.CoordinationProposalRequest, ) (tbtc.CoordinationProposal, error) { walletLogger := logger.With( - zap.String("walletPKH", fmt.Sprintf("0x%x", walletPublicKeyHash)), + zap.String( + "walletPKH", + fmt.Sprintf("0x%x", request.WalletPublicKeyHash), + ), ) walletLogger.Info( "starting proposal generation with tasks checklist [%v]", - actionsChecklist, + request.ActionsChecklist, ) - for _, action := range actionsChecklist { + for _, action := range request.ActionsChecklist { walletLogger.Infof("starting proposal task [%s]", action) taskIndex := slices.IndexFunc(pg.tasks, func(task ProposalTask) bool { @@ -77,7 +83,7 @@ func (pg *ProposalGenerator) Generate( continue } - proposal, ok, err := pg.tasks[taskIndex].Run(walletPublicKeyHash) + proposal, ok, err := pg.tasks[taskIndex].Run(request) if err != nil { return nil, fmt.Errorf( "error while running proposal task [%s]: [%v]", diff --git a/pkg/tbtcpg/tbtcpg_test.go b/pkg/tbtcpg/tbtcpg_test.go index b90cf238c4..675537ccdd 100644 --- a/pkg/tbtcpg/tbtcpg_test.go +++ b/pkg/tbtcpg/tbtcpg_test.go @@ -2,9 +2,10 @@ package tbtcpg import ( "fmt" - "github.com/keep-network/keep-core/pkg/tbtc" "reflect" "testing" + + "github.com/keep-network/keep-core/pkg/tbtc" ) func TestProposalGenerator_Generate(t *testing.T) { @@ -131,8 +132,11 @@ func TestProposalGenerator_Generate(t *testing.T) { } proposal, err := generator.Generate( - walletPublicKeyHash, - test.actionsChecklist, + &tbtc.CoordinationProposalRequest{ + WalletPublicKeyHash: walletPublicKeyHash, + WalletOperators: nil, + ActionsChecklist: test.actionsChecklist, + }, ) if !reflect.DeepEqual(test.expectedErr, err) { @@ -167,12 +171,14 @@ type mockProposalTask struct { results map[[20]byte]mockProposalTaskResult } -func (mpt *mockProposalTask) Run(walletPublicKeyHash [20]byte) ( +func (mpt *mockProposalTask) Run( + request *tbtc.CoordinationProposalRequest, +) ( tbtc.CoordinationProposal, bool, error, ) { - result, ok := mpt.results[walletPublicKeyHash] + result, ok := mpt.results[request.WalletPublicKeyHash] if !ok { panic("unexpected wallet public key hash") }