Skip to content

Commit

Permalink
fix: resubscribe to btc notifications (#82)
Browse files Browse the repository at this point in the history
  • Loading branch information
gusin13 authored Dec 3, 2024
1 parent 0ec19f9 commit 1f0f728
Show file tree
Hide file tree
Showing 10 changed files with 119 additions and 27 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ go 1.23.1

require (
github.com/avast/retry-go/v4 v4.5.1
github.com/babylonlabs-io/babylon v0.17.1
github.com/babylonlabs-io/staking-queue-client v0.4.7-0.20241201083346-9e79b1ae1e4c
github.com/babylonlabs-io/babylon v0.17.2
github.com/babylonlabs-io/staking-queue-client v0.4.7-0.20241203052145-e50972fc19c9
github.com/btcsuite/btcd v0.24.3-0.20241011125836-24eb815168f4
github.com/btcsuite/btcd/btcec/v2 v2.3.4
github.com/btcsuite/btcd/btcutil v1.1.6
Expand Down
10 changes: 4 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -280,12 +280,10 @@ github.com/aws/aws-sdk-go v1.44.122/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX
github.com/aws/aws-sdk-go v1.44.312 h1:llrElfzeqG/YOLFFKjg1xNpZCFJ2xraIi3PqSuP+95k=
github.com/aws/aws-sdk-go v1.44.312/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI=
github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g=
github.com/babylonlabs-io/babylon v0.17.1 h1:lyWGdR7B49qDw5pllLyTW/HAM5uQWXXPZefjFzy/Xy0=
github.com/babylonlabs-io/babylon v0.17.1/go.mod h1:sT+KG2U+M0tDMNZZ2L5CwlXX0OpagGEs56BiWXqaZFw=
github.com/babylonlabs-io/staking-queue-client v0.4.7-0.20241129112518-b417aa03ec62 h1:lMj/YjQMUCaynl4EBOZIqQNvTX7muiNKhHeell2PRU4=
github.com/babylonlabs-io/staking-queue-client v0.4.7-0.20241129112518-b417aa03ec62/go.mod h1:n3fr3c+9LNiJlyETmcrVk94Zn76rAADhGZKxX+rVf+Q=
github.com/babylonlabs-io/staking-queue-client v0.4.7-0.20241201083346-9e79b1ae1e4c h1:SmAd+0HvmOYzP0Q7MMvHMhD8LplaZZaGekjn+ubj1Bs=
github.com/babylonlabs-io/staking-queue-client v0.4.7-0.20241201083346-9e79b1ae1e4c/go.mod h1:n3fr3c+9LNiJlyETmcrVk94Zn76rAADhGZKxX+rVf+Q=
github.com/babylonlabs-io/babylon v0.17.2 h1:CzBXYFOZLg3+ijfqIsisETAzRVp5AGKNO2FG63w0HUo=
github.com/babylonlabs-io/babylon v0.17.2/go.mod h1:sT+KG2U+M0tDMNZZ2L5CwlXX0OpagGEs56BiWXqaZFw=
github.com/babylonlabs-io/staking-queue-client v0.4.7-0.20241203052145-e50972fc19c9 h1:4UGpRPm5oxj3P8QNrGrkiQtlQYTAgsWzXJFi8oD7zpY=
github.com/babylonlabs-io/staking-queue-client v0.4.7-0.20241203052145-e50972fc19c9/go.mod h1:n3fr3c+9LNiJlyETmcrVk94Zn76rAADhGZKxX+rVf+Q=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
Expand Down
36 changes: 36 additions & 0 deletions internal/db/delegation.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ func (db *Database) SaveBTCDelegationSlashingTxHex(

return nil
}

func (db *Database) SaveBTCDelegationUnbondingSlashingTxHex(
ctx context.Context, stakingTxHash string, unbondingSlashingTxHex string,
) error {
Expand All @@ -287,3 +288,38 @@ func (db *Database) SaveBTCDelegationUnbondingSlashingTxHex(

return nil
}

func (db *Database) GetBTCDelegationsByStates(
ctx context.Context,
states []types.DelegationState,
) ([]*model.BTCDelegationDetails, error) {
// Convert states to a slice of strings
stateStrings := make([]string, len(states))
for i, state := range states {
stateStrings[i] = state.String()
}

filter := bson.M{"state": bson.M{"$in": stateStrings}}

cursor, err := db.client.Database(db.dbName).
Collection(model.BTCDelegationDetailsCollection).
Find(ctx, filter)
if err != nil {
return nil, err
}
defer cursor.Close(ctx)

var delegations []*model.BTCDelegationDetails
if err := cursor.All(ctx, &delegations); err != nil {
return nil, err
}

if len(delegations) == 0 {
return nil, &NotFoundError{
Key: "specified states",
Message: "No BTC delegations found for the specified states",
}
}

return delegations, nil
}
7 changes: 7 additions & 0 deletions internal/db/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,4 +219,11 @@ type DbInterface interface {
* @return An error if the operation failed
*/
SaveBTCDelegationUnbondingSlashingTxHex(ctx context.Context, stakingTxHashHex string, unbondingSlashingTxHex string) error
/**
* GetBTCDelegationsByStates retrieves the BTC delegations by the states.
* @param ctx The context
* @param states The states
* @return The BTC delegations or an error
*/
GetBTCDelegationsByStates(ctx context.Context, states []types.DelegationState) ([]*model.BTCDelegationDetails, error)
}
22 changes: 10 additions & 12 deletions internal/services/delegation.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,12 +149,16 @@ func (s *Service) processCovenantQuorumReachedEvent(
)
}
newState := types.DelegationState(covenantQuorumReachedEvent.NewState)
// Emit consumer event if the new state is active
if newState == types.StateActive {
err = s.emitActiveDelegationEvent(ctx, delegation)
if err != nil {
return err
}

// Register spend notification
if err := s.registerStakingSpendNotification(ctx, delegation); err != nil {
return err
}
}

if dbErr := s.db.UpdateBTCDelegationState(
Expand Down Expand Up @@ -203,12 +207,16 @@ func (s *Service) processBTCDelegationInclusionProofReceivedEvent(
}

newState := types.DelegationState(inclusionProofEvent.NewState)
// Emit consumer event if the new state is active
if newState == types.StateActive {
err = s.emitActiveDelegationEvent(ctx, delegation)
if err != nil {
return err
}

// Register spend notification
if err := s.registerStakingSpendNotification(ctx, delegation); err != nil {
return err
}
}

if dbErr := s.db.UpdateBTCDelegationDetails(
Expand Down Expand Up @@ -310,11 +318,6 @@ func (s *Service) processBTCDelegationUnbondedEarlyEvent(
)
}

// Register unbonding spend notification
if err := s.registerUnbondingSpendNotification(ctx, delegation); err != nil {
return err
}

return nil
}

Expand Down Expand Up @@ -383,11 +386,6 @@ func (s *Service) processBTCDelegationExpiredEvent(
)
}

// Register spend notification
if err := s.registerStakingSpendNotification(ctx, delegation); err != nil {
return err
}

return nil
}

Expand Down
2 changes: 2 additions & 0 deletions internal/services/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ func (s *Service) StartIndexerSync(ctx context.Context) {

// Sync global parameters
s.SyncGlobalParams(ctx)
// Resubscribe to missed BTC notifications
s.ResubscribeToMissedBtcNotifications(ctx)
// Start the expiry checker
s.StartExpiryChecker(ctx)
// Start the websocket event subscription process
Expand Down
23 changes: 21 additions & 2 deletions internal/services/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ package services
import (
"context"

"github.com/cometbft/cometbft/types"
"github.com/babylonlabs-io/babylon-staking-indexer/internal/types"
ctypes "github.com/cometbft/cometbft/types"
"github.com/rs/zerolog/log"
)

Expand All @@ -26,7 +27,7 @@ func (s *Service) SubscribeToBbnEvents(ctx context.Context) {
for {
select {
case event := <-eventChan:
newBlockEvent, ok := event.Data.(types.EventDataNewBlock)
newBlockEvent, ok := event.Data.(ctypes.EventDataNewBlock)
if !ok {
log.Fatal().Msg("Event is not a NewBlock event")
}
Expand All @@ -49,3 +50,21 @@ func (s *Service) SubscribeToBbnEvents(ctx context.Context) {
}
}()
}

// Resubscribe to missed BTC notifications
func (s *Service) ResubscribeToMissedBtcNotifications(ctx context.Context) {
go func() {
defer s.wg.Done()
delegations, err := s.db.GetBTCDelegationsByStates(ctx, []types.DelegationState{types.StateUnbonding, types.StateSlashed})
if err != nil {
log.Fatal().Msgf("Failed to get BTC delegations: %v", err)
}

for _, delegation := range delegations {
// Register spend notification
if err := s.registerStakingSpendNotification(ctx, delegation); err != nil {
log.Fatal().Msgf("Failed to register spend notification: %v", err)
}
}
}()
}
5 changes: 3 additions & 2 deletions internal/services/watch_btc_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,9 @@ func (s *Service) handleSpendingStakingTransaction(
Str("staking_tx", delegation.StakingTxHashHex).
Str("unbonding_tx", spendingTx.TxHash().String()).
Msg("staking tx has been spent through unbonding path")
// It's a valid unbonding tx, no further action needed at this stage
return nil

// Register unbonding spend notification
return s.registerUnbondingSpendNotification(ctx, delegation)
}

// Try to validate as withdrawal transaction
Expand Down
7 changes: 4 additions & 3 deletions internal/types/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,10 @@ func QualifiedStatesForExpired() []DelegationState {

// QualifiedStatesForWithdrawn returns the qualified current states for Withdrawn event
func QualifiedStatesForWithdrawn() []DelegationState {
// StateUnbonding is included because its possible that expiry checker is slow
// and in meanwhile the btc subscription encounters the spending/withdrawal tx
return []DelegationState{StateUnbonding, StateWithdrawable}
// StateActive/StateUnbonding/StateSlashed is included b/c its possible that expiry checker
// or babylon notifications are slow and in meanwhile the btc subscription encounters
// the spending/withdrawal tx
return []DelegationState{StateActive, StateUnbonding, StateWithdrawable, StateSlashed}
}

// QualifiedStatesForWithdrawable returns the qualified current states for Withdrawable event
Expand Down
30 changes: 30 additions & 0 deletions tests/mocks/mock_db_client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 1f0f728

Please sign in to comment.