diff --git a/consumer/event_consumer.go b/consumer/event_consumer.go index 6aec021..9f0c3f7 100644 --- a/consumer/event_consumer.go +++ b/consumer/event_consumer.go @@ -6,7 +6,7 @@ import ( type EventConsumer interface { Start() error - PushStakingEvent(ev *client.StakingEvent) error - PushUnbondingEvent(ev *client.StakingEvent) error + PushActiveStakingEvent(ev *client.StakingEvent) error + PushUnbondingStakingEvent(ev *client.StakingEvent) error Stop() error } diff --git a/go.mod b/go.mod index 0aa93a9..2c86437 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.23.1 require ( github.com/avast/retry-go/v4 v4.5.1 github.com/babylonlabs-io/babylon v0.17.1 - github.com/babylonlabs-io/staking-queue-client v0.4.7-0.20241129112518-b417aa03ec62 + github.com/babylonlabs-io/staking-queue-client v0.4.7-0.20241201083346-9e79b1ae1e4c github.com/btcsuite/btcd v0.24.3-0.20241011125836-24eb815168f4 github.com/btcsuite/btcd/btcec/v2 v2.3.4 github.com/btcsuite/btcd/btcutil v1.1.6 diff --git a/go.sum b/go.sum index 5da05a8..26ea697 100644 --- a/go.sum +++ b/go.sum @@ -284,6 +284,8 @@ github.com/babylonlabs-io/babylon v0.17.1 h1:lyWGdR7B49qDw5pllLyTW/HAM5uQWXXPZef github.com/babylonlabs-io/babylon v0.17.1/go.mod h1:sT+KG2U+M0tDMNZZ2L5CwlXX0OpagGEs56BiWXqaZFw= github.com/babylonlabs-io/staking-queue-client v0.4.7-0.20241129112518-b417aa03ec62 h1:lMj/YjQMUCaynl4EBOZIqQNvTX7muiNKhHeell2PRU4= github.com/babylonlabs-io/staking-queue-client v0.4.7-0.20241129112518-b417aa03ec62/go.mod h1:n3fr3c+9LNiJlyETmcrVk94Zn76rAADhGZKxX+rVf+Q= +github.com/babylonlabs-io/staking-queue-client v0.4.7-0.20241201083346-9e79b1ae1e4c h1:SmAd+0HvmOYzP0Q7MMvHMhD8LplaZZaGekjn+ubj1Bs= +github.com/babylonlabs-io/staking-queue-client v0.4.7-0.20241201083346-9e79b1ae1e4c/go.mod h1:n3fr3c+9LNiJlyETmcrVk94Zn76rAADhGZKxX+rVf+Q= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= diff --git a/internal/db/delegation.go b/internal/db/delegation.go index 330d1d4..7af979e 100644 --- a/internal/db/delegation.go +++ b/internal/db/delegation.go @@ -209,6 +209,34 @@ func (db *Database) UpdateDelegationsStateByFinalityProvider( return nil } +func (db *Database) GetDelegationsByFinalityProvider( + ctx context.Context, + fpBTCPKHex string, +) ([]*model.BTCDelegationDetails, error) { + filter := bson.M{ + "finality_provider_btc_pks_hex": fpBTCPKHex, + } + + cursor, err := db.client.Database(db.dbName). + Collection(model.BTCDelegationDetailsCollection). + Find(ctx, filter) + if err != nil { + return nil, fmt.Errorf("failed to find delegations: %w", err) + } + defer cursor.Close(ctx) + + var delegations []*model.BTCDelegationDetails + if err := cursor.All(ctx, &delegations); err != nil { + return nil, fmt.Errorf("failed to decode delegations: %w", err) + } + + log.Printf("Found %d delegations for finality provider %s", + len(delegations), + fpBTCPKHex, + ) + return delegations, nil +} + func (db *Database) SaveBTCDelegationSlashingTxHex( ctx context.Context, stakingTxHash string, slashingTxHex string, ) error { diff --git a/internal/db/interface.go b/internal/db/interface.go index b8ec414..27d56cb 100644 --- a/internal/db/interface.go +++ b/internal/db/interface.go @@ -154,6 +154,13 @@ type DbInterface interface { UpdateDelegationsStateByFinalityProvider( ctx context.Context, fpBtcPkHex string, newState types.DelegationState, ) error + /** + * GetDelegationsByFinalityProvider retrieves the BTC delegations by the finality provider public key. + * @param ctx The context + * @param fpBtcPkHex The finality provider public key + * @return The BTC delegations or an error + */ + GetDelegationsByFinalityProvider(ctx context.Context, fpBtcPkHex string) ([]*model.BTCDelegationDetails, error) /** * SaveNewTimeLockExpire saves a new timelock expire to the database. * If the timelock expire already exists, DuplicateKeyError will be returned. diff --git a/internal/services/consumer_events.go b/internal/services/consumer_events.go index b48b5a3..1fa67b3 100644 --- a/internal/services/consumer_events.go +++ b/internal/services/consumer_events.go @@ -3,53 +3,34 @@ package services import ( "context" "fmt" - "net/http" "github.com/babylonlabs-io/babylon-staking-indexer/internal/db/model" "github.com/babylonlabs-io/babylon-staking-indexer/internal/types" queuecli "github.com/babylonlabs-io/staking-queue-client/client" ) -func (s *Service) emitConsumerEvent( - ctx context.Context, newState types.DelegationState, delegation *model.BTCDelegationDetails, -) *types.Error { - switch newState { - case types.StateActive: - return s.sendActiveDelegationEvent(ctx, delegation) - case types.StateUnbonding: - return s.sendUnbondingDelegationEvent(ctx, delegation) - default: - return types.NewError( - http.StatusInternalServerError, - types.InternalServiceError, - fmt.Errorf("unknown delegation state: %s", newState), - ) - } -} - -// TODO: fix the queue event schema -func (s *Service) sendActiveDelegationEvent(ctx context.Context, delegation *model.BTCDelegationDetails) *types.Error { - stakingEvent := queuecli.NewActiveStakingEventV2( +func (s *Service) emitActiveDelegationEvent(ctx context.Context, delegation *model.BTCDelegationDetails) *types.Error { + stakingEvent := queuecli.NewActiveStakingEvent( delegation.StakingTxHashHex, delegation.StakerBtcPkHex, delegation.FinalityProviderBtcPksHex, delegation.StakingAmount, ) - if err := s.queueManager.PushStakingEvent(&stakingEvent); err != nil { + if err := s.queueManager.PushActiveStakingEvent(&stakingEvent); err != nil { return types.NewInternalServiceError(fmt.Errorf("failed to push the staking event to the queue: %w", err)) } return nil } -func (s *Service) sendUnbondingDelegationEvent(ctx context.Context, delegation *model.BTCDelegationDetails) *types.Error { - ev := queuecli.NewUnbondingStakingEventV2( +func (s *Service) emitUnbondingDelegationEvent(ctx context.Context, delegation *model.BTCDelegationDetails) *types.Error { + ev := queuecli.NewUnbondingStakingEvent( delegation.StakingTxHashHex, delegation.StakerBtcPkHex, delegation.FinalityProviderBtcPksHex, delegation.StakingAmount, ) - if err := s.queueManager.PushUnbondingEvent(&ev); err != nil { + if err := s.queueManager.PushUnbondingStakingEvent(&ev); err != nil { return types.NewInternalServiceError(fmt.Errorf("failed to push the unbonding event to the queue: %w", err)) } return nil diff --git a/internal/services/delegation.go b/internal/services/delegation.go index d55ed88..1157d71 100644 --- a/internal/services/delegation.go +++ b/internal/services/delegation.go @@ -151,7 +151,7 @@ func (s *Service) processCovenantQuorumReachedEvent( newState := types.DelegationState(covenantQuorumReachedEvent.NewState) // Emit consumer event if the new state is active if newState == types.StateActive { - err = s.emitConsumerEvent(ctx, newState, delegation) + err = s.emitActiveDelegationEvent(ctx, delegation) if err != nil { return err } @@ -205,7 +205,7 @@ func (s *Service) processBTCDelegationInclusionProofReceivedEvent( newState := types.DelegationState(inclusionProofEvent.NewState) // Emit consumer event if the new state is active if newState == types.StateActive { - err = s.emitConsumerEvent(ctx, newState, delegation) + err = s.emitActiveDelegationEvent(ctx, delegation) if err != nil { return err } @@ -256,7 +256,7 @@ func (s *Service) processBTCDelegationUnbondedEarlyEvent( } // Emit consumer event - if err := s.emitConsumerEvent(ctx, types.StateUnbonding, delegation); err != nil { + if err := s.emitUnbondingDelegationEvent(ctx, delegation); err != nil { return err } @@ -348,7 +348,7 @@ func (s *Service) processBTCDelegationExpiredEvent( } // Emit consumer event - if err := s.emitConsumerEvent(ctx, types.StateUnbonding, delegation); err != nil { + if err := s.emitUnbondingDelegationEvent(ctx, delegation); err != nil { return err } @@ -424,5 +424,20 @@ func (s *Service) processSlashedFinalityProviderEvent( ) } + delegations, dbErr := s.db.GetDelegationsByFinalityProvider(ctx, fpBTCPKHex) + if dbErr != nil { + return types.NewError( + http.StatusInternalServerError, + types.InternalServiceError, + fmt.Errorf("failed to get BTC delegations by finality provider: %w", dbErr), + ) + } + + for _, delegation := range delegations { + if err := s.emitUnbondingDelegationEvent(ctx, delegation); err != nil { + return err + } + } + return nil } diff --git a/tests/mocks/mock_db_client.go b/tests/mocks/mock_db_client.go index fbcf319..6d6082c 100644 --- a/tests/mocks/mock_db_client.go +++ b/tests/mocks/mock_db_client.go @@ -127,6 +127,36 @@ func (_m *DbInterface) GetBTCDelegationState(ctx context.Context, stakingTxHash return r0, r1 } +// GetDelegationsByFinalityProvider provides a mock function with given fields: ctx, fpBtcPkHex +func (_m *DbInterface) GetDelegationsByFinalityProvider(ctx context.Context, fpBtcPkHex string) ([]*model.BTCDelegationDetails, error) { + ret := _m.Called(ctx, fpBtcPkHex) + + if len(ret) == 0 { + panic("no return value specified for GetDelegationsByFinalityProvider") + } + + var r0 []*model.BTCDelegationDetails + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string) ([]*model.BTCDelegationDetails, error)); ok { + return rf(ctx, fpBtcPkHex) + } + if rf, ok := ret.Get(0).(func(context.Context, string) []*model.BTCDelegationDetails); ok { + r0 = rf(ctx, fpBtcPkHex) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*model.BTCDelegationDetails) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { + r1 = rf(ctx, fpBtcPkHex) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // GetFinalityProviderByBtcPk provides a mock function with given fields: ctx, btcPk func (_m *DbInterface) GetFinalityProviderByBtcPk(ctx context.Context, btcPk string) (*model.FinalityProviderDetails, error) { ret := _m.Called(ctx, btcPk)