From 1f0f728f8a6f8d1fb52dde8d7a115e1335f17a98 Mon Sep 17 00:00:00 2001 From: Gurjot Singh <111540954+gusin13@users.noreply.github.com> Date: Tue, 3 Dec 2024 15:52:09 +0530 Subject: [PATCH] fix: resubscribe to btc notifications (#82) --- go.mod | 4 +-- go.sum | 10 +++----- internal/db/delegation.go | 36 +++++++++++++++++++++++++++ internal/db/interface.go | 7 ++++++ internal/services/delegation.go | 22 ++++++++-------- internal/services/service.go | 2 ++ internal/services/subscription.go | 23 +++++++++++++++-- internal/services/watch_btc_events.go | 5 ++-- internal/types/state.go | 7 +++--- tests/mocks/mock_db_client.go | 30 ++++++++++++++++++++++ 10 files changed, 119 insertions(+), 27 deletions(-) diff --git a/go.mod b/go.mod index 2c86437..5ff2c47 100644 --- a/go.mod +++ b/go.mod @@ -4,8 +4,8 @@ go 1.23.1 require ( github.com/avast/retry-go/v4 v4.5.1 - github.com/babylonlabs-io/babylon v0.17.1 - github.com/babylonlabs-io/staking-queue-client v0.4.7-0.20241201083346-9e79b1ae1e4c + github.com/babylonlabs-io/babylon v0.17.2 + github.com/babylonlabs-io/staking-queue-client v0.4.7-0.20241203052145-e50972fc19c9 github.com/btcsuite/btcd v0.24.3-0.20241011125836-24eb815168f4 github.com/btcsuite/btcd/btcec/v2 v2.3.4 github.com/btcsuite/btcd/btcutil v1.1.6 diff --git a/go.sum b/go.sum index 26ea697..112c5f4 100644 --- a/go.sum +++ b/go.sum @@ -280,12 +280,10 @@ github.com/aws/aws-sdk-go v1.44.122/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX github.com/aws/aws-sdk-go v1.44.312 h1:llrElfzeqG/YOLFFKjg1xNpZCFJ2xraIi3PqSuP+95k= github.com/aws/aws-sdk-go v1.44.312/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI= github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g= -github.com/babylonlabs-io/babylon v0.17.1 h1:lyWGdR7B49qDw5pllLyTW/HAM5uQWXXPZefjFzy/Xy0= -github.com/babylonlabs-io/babylon v0.17.1/go.mod h1:sT+KG2U+M0tDMNZZ2L5CwlXX0OpagGEs56BiWXqaZFw= -github.com/babylonlabs-io/staking-queue-client v0.4.7-0.20241129112518-b417aa03ec62 h1:lMj/YjQMUCaynl4EBOZIqQNvTX7muiNKhHeell2PRU4= -github.com/babylonlabs-io/staking-queue-client v0.4.7-0.20241129112518-b417aa03ec62/go.mod h1:n3fr3c+9LNiJlyETmcrVk94Zn76rAADhGZKxX+rVf+Q= -github.com/babylonlabs-io/staking-queue-client v0.4.7-0.20241201083346-9e79b1ae1e4c h1:SmAd+0HvmOYzP0Q7MMvHMhD8LplaZZaGekjn+ubj1Bs= -github.com/babylonlabs-io/staking-queue-client v0.4.7-0.20241201083346-9e79b1ae1e4c/go.mod h1:n3fr3c+9LNiJlyETmcrVk94Zn76rAADhGZKxX+rVf+Q= +github.com/babylonlabs-io/babylon v0.17.2 h1:CzBXYFOZLg3+ijfqIsisETAzRVp5AGKNO2FG63w0HUo= +github.com/babylonlabs-io/babylon v0.17.2/go.mod h1:sT+KG2U+M0tDMNZZ2L5CwlXX0OpagGEs56BiWXqaZFw= +github.com/babylonlabs-io/staking-queue-client v0.4.7-0.20241203052145-e50972fc19c9 h1:4UGpRPm5oxj3P8QNrGrkiQtlQYTAgsWzXJFi8oD7zpY= +github.com/babylonlabs-io/staking-queue-client v0.4.7-0.20241203052145-e50972fc19c9/go.mod h1:n3fr3c+9LNiJlyETmcrVk94Zn76rAADhGZKxX+rVf+Q= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= diff --git a/internal/db/delegation.go b/internal/db/delegation.go index 7af979e..92b89e9 100644 --- a/internal/db/delegation.go +++ b/internal/db/delegation.go @@ -262,6 +262,7 @@ func (db *Database) SaveBTCDelegationSlashingTxHex( return nil } + func (db *Database) SaveBTCDelegationUnbondingSlashingTxHex( ctx context.Context, stakingTxHash string, unbondingSlashingTxHex string, ) error { @@ -287,3 +288,38 @@ func (db *Database) SaveBTCDelegationUnbondingSlashingTxHex( return nil } + +func (db *Database) GetBTCDelegationsByStates( + ctx context.Context, + states []types.DelegationState, +) ([]*model.BTCDelegationDetails, error) { + // Convert states to a slice of strings + stateStrings := make([]string, len(states)) + for i, state := range states { + stateStrings[i] = state.String() + } + + filter := bson.M{"state": bson.M{"$in": stateStrings}} + + cursor, err := db.client.Database(db.dbName). + Collection(model.BTCDelegationDetailsCollection). + Find(ctx, filter) + if err != nil { + return nil, err + } + defer cursor.Close(ctx) + + var delegations []*model.BTCDelegationDetails + if err := cursor.All(ctx, &delegations); err != nil { + return nil, err + } + + if len(delegations) == 0 { + return nil, &NotFoundError{ + Key: "specified states", + Message: "No BTC delegations found for the specified states", + } + } + + return delegations, nil +} diff --git a/internal/db/interface.go b/internal/db/interface.go index 27d56cb..d63064d 100644 --- a/internal/db/interface.go +++ b/internal/db/interface.go @@ -219,4 +219,11 @@ type DbInterface interface { * @return An error if the operation failed */ SaveBTCDelegationUnbondingSlashingTxHex(ctx context.Context, stakingTxHashHex string, unbondingSlashingTxHex string) error + /** + * GetBTCDelegationsByStates retrieves the BTC delegations by the states. + * @param ctx The context + * @param states The states + * @return The BTC delegations or an error + */ + GetBTCDelegationsByStates(ctx context.Context, states []types.DelegationState) ([]*model.BTCDelegationDetails, error) } diff --git a/internal/services/delegation.go b/internal/services/delegation.go index 1157d71..c95d022 100644 --- a/internal/services/delegation.go +++ b/internal/services/delegation.go @@ -149,12 +149,16 @@ func (s *Service) processCovenantQuorumReachedEvent( ) } newState := types.DelegationState(covenantQuorumReachedEvent.NewState) - // Emit consumer event if the new state is active if newState == types.StateActive { err = s.emitActiveDelegationEvent(ctx, delegation) if err != nil { return err } + + // Register spend notification + if err := s.registerStakingSpendNotification(ctx, delegation); err != nil { + return err + } } if dbErr := s.db.UpdateBTCDelegationState( @@ -203,12 +207,16 @@ func (s *Service) processBTCDelegationInclusionProofReceivedEvent( } newState := types.DelegationState(inclusionProofEvent.NewState) - // Emit consumer event if the new state is active if newState == types.StateActive { err = s.emitActiveDelegationEvent(ctx, delegation) if err != nil { return err } + + // Register spend notification + if err := s.registerStakingSpendNotification(ctx, delegation); err != nil { + return err + } } if dbErr := s.db.UpdateBTCDelegationDetails( @@ -310,11 +318,6 @@ func (s *Service) processBTCDelegationUnbondedEarlyEvent( ) } - // Register unbonding spend notification - if err := s.registerUnbondingSpendNotification(ctx, delegation); err != nil { - return err - } - return nil } @@ -383,11 +386,6 @@ func (s *Service) processBTCDelegationExpiredEvent( ) } - // Register spend notification - if err := s.registerStakingSpendNotification(ctx, delegation); err != nil { - return err - } - return nil } diff --git a/internal/services/service.go b/internal/services/service.go index 0bddfea..39263fb 100644 --- a/internal/services/service.go +++ b/internal/services/service.go @@ -66,6 +66,8 @@ func (s *Service) StartIndexerSync(ctx context.Context) { // Sync global parameters s.SyncGlobalParams(ctx) + // Resubscribe to missed BTC notifications + s.ResubscribeToMissedBtcNotifications(ctx) // Start the expiry checker s.StartExpiryChecker(ctx) // Start the websocket event subscription process diff --git a/internal/services/subscription.go b/internal/services/subscription.go index f9b6685..bee2f49 100644 --- a/internal/services/subscription.go +++ b/internal/services/subscription.go @@ -3,7 +3,8 @@ package services import ( "context" - "github.com/cometbft/cometbft/types" + "github.com/babylonlabs-io/babylon-staking-indexer/internal/types" + ctypes "github.com/cometbft/cometbft/types" "github.com/rs/zerolog/log" ) @@ -26,7 +27,7 @@ func (s *Service) SubscribeToBbnEvents(ctx context.Context) { for { select { case event := <-eventChan: - newBlockEvent, ok := event.Data.(types.EventDataNewBlock) + newBlockEvent, ok := event.Data.(ctypes.EventDataNewBlock) if !ok { log.Fatal().Msg("Event is not a NewBlock event") } @@ -49,3 +50,21 @@ func (s *Service) SubscribeToBbnEvents(ctx context.Context) { } }() } + +// Resubscribe to missed BTC notifications +func (s *Service) ResubscribeToMissedBtcNotifications(ctx context.Context) { + go func() { + defer s.wg.Done() + delegations, err := s.db.GetBTCDelegationsByStates(ctx, []types.DelegationState{types.StateUnbonding, types.StateSlashed}) + if err != nil { + log.Fatal().Msgf("Failed to get BTC delegations: %v", err) + } + + for _, delegation := range delegations { + // Register spend notification + if err := s.registerStakingSpendNotification(ctx, delegation); err != nil { + log.Fatal().Msgf("Failed to register spend notification: %v", err) + } + } + }() +} diff --git a/internal/services/watch_btc_events.go b/internal/services/watch_btc_events.go index 3fa6b3f..49654e9 100644 --- a/internal/services/watch_btc_events.go +++ b/internal/services/watch_btc_events.go @@ -175,8 +175,9 @@ func (s *Service) handleSpendingStakingTransaction( Str("staking_tx", delegation.StakingTxHashHex). Str("unbonding_tx", spendingTx.TxHash().String()). Msg("staking tx has been spent through unbonding path") - // It's a valid unbonding tx, no further action needed at this stage - return nil + + // Register unbonding spend notification + return s.registerUnbondingSpendNotification(ctx, delegation) } // Try to validate as withdrawal transaction diff --git a/internal/types/state.go b/internal/types/state.go index bd61c82..cc00155 100644 --- a/internal/types/state.go +++ b/internal/types/state.go @@ -53,9 +53,10 @@ func QualifiedStatesForExpired() []DelegationState { // QualifiedStatesForWithdrawn returns the qualified current states for Withdrawn event func QualifiedStatesForWithdrawn() []DelegationState { - // StateUnbonding is included because its possible that expiry checker is slow - // and in meanwhile the btc subscription encounters the spending/withdrawal tx - return []DelegationState{StateUnbonding, StateWithdrawable} + // StateActive/StateUnbonding/StateSlashed is included b/c its possible that expiry checker + // or babylon notifications are slow and in meanwhile the btc subscription encounters + // the spending/withdrawal tx + return []DelegationState{StateActive, StateUnbonding, StateWithdrawable, StateSlashed} } // QualifiedStatesForWithdrawable returns the qualified current states for Withdrawable event diff --git a/tests/mocks/mock_db_client.go b/tests/mocks/mock_db_client.go index 6d6082c..a1bc70e 100644 --- a/tests/mocks/mock_db_client.go +++ b/tests/mocks/mock_db_client.go @@ -127,6 +127,36 @@ func (_m *DbInterface) GetBTCDelegationState(ctx context.Context, stakingTxHash return r0, r1 } +// GetBTCDelegationsByStates provides a mock function with given fields: ctx, states +func (_m *DbInterface) GetBTCDelegationsByStates(ctx context.Context, states []types.DelegationState) ([]*model.BTCDelegationDetails, error) { + ret := _m.Called(ctx, states) + + if len(ret) == 0 { + panic("no return value specified for GetBTCDelegationsByStates") + } + + var r0 []*model.BTCDelegationDetails + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, []types.DelegationState) ([]*model.BTCDelegationDetails, error)); ok { + return rf(ctx, states) + } + if rf, ok := ret.Get(0).(func(context.Context, []types.DelegationState) []*model.BTCDelegationDetails); ok { + r0 = rf(ctx, states) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*model.BTCDelegationDetails) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, []types.DelegationState) error); ok { + r1 = rf(ctx, states) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // GetDelegationsByFinalityProvider provides a mock function with given fields: ctx, fpBtcPkHex func (_m *DbInterface) GetDelegationsByFinalityProvider(ctx context.Context, fpBtcPkHex string) ([]*model.BTCDelegationDetails, error) { ret := _m.Called(ctx, fpBtcPkHex)