Skip to content

Commit

Permalink
Merge branch 'main' into moving-funds-commitment
Browse files Browse the repository at this point in the history
  • Loading branch information
tomaszslabon committed Dec 5, 2023
2 parents c35f245 + 130c988 commit a0647a0
Show file tree
Hide file tree
Showing 24 changed files with 525 additions and 1,001 deletions.
7 changes: 7 additions & 0 deletions pkg/chain/ethereum/tbtc.go
Original file line number Diff line number Diff line change
Expand Up @@ -1965,3 +1965,10 @@ func (tc *TbtcChain) GetRedemptionMaxSize() (uint16, error) {
func (tc *TbtcChain) GetRedemptionRequestMinAge() (uint32, error) {
return tc.walletCoordinator.RedemptionRequestMinAge()
}

func (tc *TbtcChain) ValidateHeartbeatProposal(
proposal *tbtc.HeartbeatProposal,
) error {
// TODO: Implementation.
panic("not implemented yet")
}
45 changes: 34 additions & 11 deletions pkg/tbtc/coordination.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ import (
"crypto/sha256"
"encoding/binary"
"fmt"
"math/rand"
"sort"

"github.com/keep-network/keep-core/pkg/internal/pb"
"go.uber.org/zap"
"golang.org/x/exp/slices"
"math/rand"
"sort"

"github.com/keep-network/keep-core/pkg/bitcoin"
"github.com/keep-network/keep-core/pkg/chain"
Expand Down Expand Up @@ -195,6 +196,13 @@ func (cf *coordinationFault) String() string {
)
}

// CoordinationProposalRequest represents a request for a coordination proposal.
type CoordinationProposalRequest struct {
WalletPublicKeyHash [20]byte
WalletOperators []chain.Address
ActionsChecklist []WalletActionType
}

// CoordinationProposalGenerator is a component responsible for generating
// coordination proposals.
type CoordinationProposalGenerator interface {
Expand All @@ -204,10 +212,7 @@ type CoordinationProposalGenerator interface {
// expected to return a proposal for the first action from the checklist
// that is valid for the given wallet's state. If none of the actions are
// valid, the generator should return a no-op proposal.
Generate(
walletPublicKeyHash [20]byte,
actionsChecklist []WalletActionType,
) (CoordinationProposal, error)
Generate(request *CoordinationProposalRequest) (CoordinationProposal, error)
}

// CoordinationProposal represents a single action proposal for the given wallet.
Expand Down Expand Up @@ -370,14 +375,22 @@ func (ce *coordinationExecutor) coordinate(

execLogger.Info("actions checklist is: [%v]", actionsChecklist)

// Set up a context that is cancelled when the active phase of the
// coordination window ends.
// Set up a context that is automatically cancelled when the active phase
// of the coordination window ends.
//
// The coordination leader keeps that context active for the lifetime of the
// active phase to provide retransmissions of the coordination message thus
// maximize the chance that all followers receive it on time. The only case
// when the leader cancels the context prematurely is when the leader's
// routine fails.
//
// The coordination follower cancels the context as soon as it receives
// the coordination message.
ctx, cancelCtx := withCancelOnBlock(
context.Background(),
window.activePhaseEndBlock(),
ce.waitForBlockFn,
)
defer cancelCtx()

var proposal CoordinationProposal
var faults []*coordinationFault
Expand All @@ -391,6 +404,10 @@ func (ce *coordinationExecutor) coordinate(
actionsChecklist,
)
if err != nil {
// Cancel the context upon leader's routine failure. There is
// no point to keep the context active as retransmissions do not
// occur anyway.
cancelCtx()
return nil, fmt.Errorf(
"failed to execute leader's routine: [%v]",
err,
Expand All @@ -401,6 +418,9 @@ func (ce *coordinationExecutor) coordinate(
} else {
execLogger.Info("executing follower's routine")

// Cancel the context upon follower's routine completion.
defer cancelCtx()

proposal, faults, err = ce.executeFollowerRoutine(
ctx,
leader,
Expand Down Expand Up @@ -558,8 +578,11 @@ func (ce *coordinationExecutor) executeLeaderRoutine(
walletPublicKeyHash := ce.walletPublicKeyHash()

proposal, err := ce.proposalGenerator.Generate(
walletPublicKeyHash,
actionsChecklist,
&CoordinationProposalRequest{
WalletPublicKeyHash: walletPublicKeyHash,
WalletOperators: ce.coordinatedWallet.signingGroupOperators,
ActionsChecklist: actionsChecklist,
},
)
if err != nil {
return nil, fmt.Errorf("failed to generate proposal: [%v]", err)
Expand Down
24 changes: 12 additions & 12 deletions pkg/tbtc/coordination_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@ import (
"crypto/sha256"
"encoding/hex"
"fmt"
"math/big"
"math/rand"
"reflect"
"testing"
"time"

"github.com/go-test/deep"
"github.com/keep-network/keep-core/pkg/bitcoin"
"github.com/keep-network/keep-core/pkg/chain"
Expand All @@ -17,11 +23,6 @@ import (
"github.com/keep-network/keep-core/pkg/protocol/group"
"github.com/keep-network/keep-core/pkg/tecdsa"
"golang.org/x/exp/slices"
"math/big"
"math/rand"
"reflect"
"testing"
"time"

"github.com/keep-network/keep-core/internal/testutils"
)
Expand Down Expand Up @@ -703,7 +704,7 @@ func TestCoordinationExecutor_ExecuteLeaderRoutine(t *testing.T) {
for _, action := range actionsChecklist {
if walletPublicKeyHash == publicKeyHash && action == ActionHeartbeat {
return &HeartbeatProposal{
Message: []byte("heartbeat message"),
Message: [16]byte{0x01, 0x02},
}, nil
}
}
Expand Down Expand Up @@ -764,7 +765,7 @@ func TestCoordinationExecutor_ExecuteLeaderRoutine(t *testing.T) {
<-ctx.Done()

expectedProposal := &HeartbeatProposal{
Message: []byte("heartbeat message"),
Message: [16]byte{0x01, 0x02},
}

if !reflect.DeepEqual(expectedProposal, proposal) {
Expand Down Expand Up @@ -990,7 +991,7 @@ func TestCoordinationExecutor_ExecuteFollowerRoutine(t *testing.T) {
coordinationBlock: 900,
walletPublicKeyHash: executor.walletPublicKeyHash(),
proposal: &HeartbeatProposal{
Message: []byte("heartbeat message"),
Message: [16]byte{0x01, 0x02},
},
})
if err != nil {
Expand Down Expand Up @@ -1117,7 +1118,7 @@ func TestCoordinationExecutor_ExecuteFollowerRoutine_WithIdleLeader(t *testing.T

provider := netlocal.Connect()

broadcastChannel, err := provider.BroadcastChannelFor("test")
broadcastChannel, err := provider.BroadcastChannelFor("test-idle")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -1185,8 +1186,7 @@ func newMockCoordinationProposalGenerator(
}

func (mcpg *mockCoordinationProposalGenerator) Generate(
walletPublicKeyHash [20]byte,
actionsChecklist []WalletActionType,
request *CoordinationProposalRequest,
) (CoordinationProposal, error) {
return mcpg.delegate(walletPublicKeyHash, actionsChecklist)
return mcpg.delegate(request.WalletPublicKeyHash, request.ActionsChecklist)
}
139 changes: 4 additions & 135 deletions pkg/tbtc/deduplicator.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
package tbtc

import (
"bytes"
"crypto/sha256"
"encoding/binary"
"encoding/hex"
"math/big"
"strconv"
Expand All @@ -19,15 +16,6 @@ const (
// DKGResultHashCachePeriod is the time period the cache maintains
// the given DKG result hash.
DKGResultHashCachePeriod = 7 * 24 * time.Hour
// HeartbeatRequestCachePeriod is the time period the cache maintains
// the given heartbeat request.
HeartbeatRequestCachePeriod = 24 * time.Hour
// DepositSweepProposalCachePeriod is the time period the cache maintains
// the given deposit sweep proposal.
DepositSweepProposalCachePeriod = 7 * 24 * time.Hour
// RedemptionProposalCachePeriod is the time period the cache maintains
// the given redemption proposal.
RedemptionProposalCachePeriod = 24 * time.Hour
)

// deduplicator decides whether the given event should be handled by the
Expand All @@ -43,24 +31,15 @@ const (
// Those events are supported:
// - DKG started
// - DKG result submitted
// - Heartbeat request submission
// - Deposit sweep proposal submission
// - Redemption proposal submission
type deduplicator struct {
dkgSeedCache *cache.TimeCache
dkgResultHashCache *cache.TimeCache
heartbeatRequestCache *cache.TimeCache
depositSweepProposalCache *cache.TimeCache
redemptionProposalCache *cache.TimeCache
dkgSeedCache *cache.TimeCache
dkgResultHashCache *cache.TimeCache
}

func newDeduplicator() *deduplicator {
return &deduplicator{
dkgSeedCache: cache.NewTimeCache(DKGSeedCachePeriod),
dkgResultHashCache: cache.NewTimeCache(DKGResultHashCachePeriod),
heartbeatRequestCache: cache.NewTimeCache(HeartbeatRequestCachePeriod),
depositSweepProposalCache: cache.NewTimeCache(DepositSweepProposalCachePeriod),
redemptionProposalCache: cache.NewTimeCache(RedemptionProposalCachePeriod),
dkgSeedCache: cache.NewTimeCache(DKGSeedCachePeriod),
dkgResultHashCache: cache.NewTimeCache(DKGResultHashCachePeriod),
}
}

Expand Down Expand Up @@ -111,113 +90,3 @@ func (d *deduplicator) notifyDKGResultSubmitted(
// proceed with the execution.
return false
}

// notifyHeartbeatRequestSubmitted notifies the client wants to start some
// actions upon the heartbeat request submitted. It returns boolean indicating
// whether the client should proceed with the actions or ignore the event as
// a duplicate.
func (d *deduplicator) notifyHeartbeatRequestSubmitted(
walletPublicKeyHash [20]byte,
message []byte,
) bool {
d.heartbeatRequestCache.Sweep()

var buffer bytes.Buffer
buffer.Write(walletPublicKeyHash[:])
buffer.Write(message)

bufferSha256 := sha256.Sum256(buffer.Bytes())
cacheKey := hex.EncodeToString(bufferSha256[:])

// If the key is not in the cache, that means the request was not handled
// yet and the client should proceed with the execution.
if !d.heartbeatRequestCache.Has(cacheKey) {
d.heartbeatRequestCache.Add(cacheKey)
return true
}

// Otherwise, the request is a duplicate and the client should not
// proceed with the execution.
return false
}

// notifyDepositSweepProposalSubmitted notifies the client wants to start some
// actions upon the deposit sweep proposal submission. It returns boolean
// indicating whether the client should proceed with the actions or ignore the
// event as a duplicate.
func (d *deduplicator) notifyDepositSweepProposalSubmitted(
newProposal *DepositSweepProposal,
) bool {
d.depositSweepProposalCache.Sweep()

// We build the cache key by hashing the concatenation of relevant fields
// of the proposal. It may be tempting to extract that code into a general
// "hash code" function exposed by the DepositSweepProposal type but this
// is not necessarily a good idea. The deduplicator is responsible for
// detecting duplicates and construction of cache keys is part of that job.
// Extracting this logic outside would push that responsibility out of the
// deduplicator control. That is dangerous as deduplication logic could
// be implicitly changeable from the outside and lead to serious bugs.
var buffer bytes.Buffer
buffer.Write(newProposal.WalletPublicKeyHash[:])
for _, depositKey := range newProposal.DepositsKeys {
buffer.Write(depositKey.FundingTxHash[:])
fundingOutputIndex := make([]byte, 4)
binary.BigEndian.PutUint32(fundingOutputIndex, depositKey.FundingOutputIndex)
buffer.Write(fundingOutputIndex)
}
buffer.Write(newProposal.SweepTxFee.Bytes())

bufferSha256 := sha256.Sum256(buffer.Bytes())
cacheKey := hex.EncodeToString(bufferSha256[:])

// If the key is not in the cache, that means the proposal was not handled
// yet and the client should proceed with the execution.
if !d.depositSweepProposalCache.Has(cacheKey) {
d.depositSweepProposalCache.Add(cacheKey)
return true
}

// Otherwise, the proposal is a duplicate and the client should not
// proceed with the execution.
return false
}

// notifyRedemptionProposalSubmitted notifies the client wants to start some
// actions upon the redemption proposal submission. It returns boolean
// indicating whether the client should proceed with the actions or ignore the
// event as a duplicate.
func (d *deduplicator) notifyRedemptionProposalSubmitted(
newProposal *RedemptionProposal,
) bool {
d.redemptionProposalCache.Sweep()

// We build the cache key by hashing the concatenation of relevant fields
// of the proposal. It may be tempting to extract that code into a general
// "hash code" function exposed by the RedemptionProposal type but this
// is not necessarily a good idea. The deduplicator is responsible for
// detecting duplicates and construction of cache keys is part of that job.
// Extracting this logic outside would push that responsibility out of the
// deduplicator control. That is dangerous as deduplication logic could
// be implicitly changeable from the outside and lead to serious bugs.
var buffer bytes.Buffer
buffer.Write(newProposal.WalletPublicKeyHash[:])
for _, script := range newProposal.RedeemersOutputScripts {
buffer.Write(script)
}
buffer.Write(newProposal.RedemptionTxFee.Bytes())

bufferSha256 := sha256.Sum256(buffer.Bytes())
cacheKey := hex.EncodeToString(bufferSha256[:])

// If the key is not in the cache, that means the proposal was not handled
// yet and the client should proceed with the execution.
if !d.redemptionProposalCache.Has(cacheKey) {
d.redemptionProposalCache.Add(cacheKey)
return true
}

// Otherwise, the proposal is a duplicate and the client should not
// proceed with the execution.
return false
}
Loading

0 comments on commit a0647a0

Please sign in to comment.