Skip to content

Commit

Permalink
feat: emit events on slashed fp (#81)
Browse files Browse the repository at this point in the history
  • Loading branch information
gusin13 authored Dec 3, 2024
1 parent a94996d commit 0ec19f9
Show file tree
Hide file tree
Showing 8 changed files with 95 additions and 32 deletions.
4 changes: 2 additions & 2 deletions consumer/event_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (

type EventConsumer interface {
Start() error
PushStakingEvent(ev *client.StakingEvent) error
PushUnbondingEvent(ev *client.StakingEvent) error
PushActiveStakingEvent(ev *client.StakingEvent) error
PushUnbondingStakingEvent(ev *client.StakingEvent) error
Stop() error
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.23.1
require (
github.com/avast/retry-go/v4 v4.5.1
github.com/babylonlabs-io/babylon v0.17.1
github.com/babylonlabs-io/staking-queue-client v0.4.7-0.20241129112518-b417aa03ec62
github.com/babylonlabs-io/staking-queue-client v0.4.7-0.20241201083346-9e79b1ae1e4c
github.com/btcsuite/btcd v0.24.3-0.20241011125836-24eb815168f4
github.com/btcsuite/btcd/btcec/v2 v2.3.4
github.com/btcsuite/btcd/btcutil v1.1.6
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,8 @@ github.com/babylonlabs-io/babylon v0.17.1 h1:lyWGdR7B49qDw5pllLyTW/HAM5uQWXXPZef
github.com/babylonlabs-io/babylon v0.17.1/go.mod h1:sT+KG2U+M0tDMNZZ2L5CwlXX0OpagGEs56BiWXqaZFw=
github.com/babylonlabs-io/staking-queue-client v0.4.7-0.20241129112518-b417aa03ec62 h1:lMj/YjQMUCaynl4EBOZIqQNvTX7muiNKhHeell2PRU4=
github.com/babylonlabs-io/staking-queue-client v0.4.7-0.20241129112518-b417aa03ec62/go.mod h1:n3fr3c+9LNiJlyETmcrVk94Zn76rAADhGZKxX+rVf+Q=
github.com/babylonlabs-io/staking-queue-client v0.4.7-0.20241201083346-9e79b1ae1e4c h1:SmAd+0HvmOYzP0Q7MMvHMhD8LplaZZaGekjn+ubj1Bs=
github.com/babylonlabs-io/staking-queue-client v0.4.7-0.20241201083346-9e79b1ae1e4c/go.mod h1:n3fr3c+9LNiJlyETmcrVk94Zn76rAADhGZKxX+rVf+Q=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
Expand Down
28 changes: 28 additions & 0 deletions internal/db/delegation.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,34 @@ func (db *Database) UpdateDelegationsStateByFinalityProvider(
return nil
}

func (db *Database) GetDelegationsByFinalityProvider(
ctx context.Context,
fpBTCPKHex string,
) ([]*model.BTCDelegationDetails, error) {
filter := bson.M{
"finality_provider_btc_pks_hex": fpBTCPKHex,
}

cursor, err := db.client.Database(db.dbName).
Collection(model.BTCDelegationDetailsCollection).
Find(ctx, filter)
if err != nil {
return nil, fmt.Errorf("failed to find delegations: %w", err)
}
defer cursor.Close(ctx)

var delegations []*model.BTCDelegationDetails
if err := cursor.All(ctx, &delegations); err != nil {
return nil, fmt.Errorf("failed to decode delegations: %w", err)
}

log.Printf("Found %d delegations for finality provider %s",
len(delegations),
fpBTCPKHex,
)
return delegations, nil
}

func (db *Database) SaveBTCDelegationSlashingTxHex(
ctx context.Context, stakingTxHash string, slashingTxHex string,
) error {
Expand Down
7 changes: 7 additions & 0 deletions internal/db/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,13 @@ type DbInterface interface {
UpdateDelegationsStateByFinalityProvider(
ctx context.Context, fpBtcPkHex string, newState types.DelegationState,
) error
/**
* GetDelegationsByFinalityProvider retrieves the BTC delegations by the finality provider public key.
* @param ctx The context
* @param fpBtcPkHex The finality provider public key
* @return The BTC delegations or an error
*/
GetDelegationsByFinalityProvider(ctx context.Context, fpBtcPkHex string) ([]*model.BTCDelegationDetails, error)
/**
* SaveNewTimeLockExpire saves a new timelock expire to the database.
* If the timelock expire already exists, DuplicateKeyError will be returned.
Expand Down
31 changes: 6 additions & 25 deletions internal/services/consumer_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,53 +3,34 @@ package services
import (
"context"
"fmt"
"net/http"

"github.com/babylonlabs-io/babylon-staking-indexer/internal/db/model"
"github.com/babylonlabs-io/babylon-staking-indexer/internal/types"
queuecli "github.com/babylonlabs-io/staking-queue-client/client"
)

func (s *Service) emitConsumerEvent(
ctx context.Context, newState types.DelegationState, delegation *model.BTCDelegationDetails,
) *types.Error {
switch newState {
case types.StateActive:
return s.sendActiveDelegationEvent(ctx, delegation)
case types.StateUnbonding:
return s.sendUnbondingDelegationEvent(ctx, delegation)
default:
return types.NewError(
http.StatusInternalServerError,
types.InternalServiceError,
fmt.Errorf("unknown delegation state: %s", newState),
)
}
}

// TODO: fix the queue event schema
func (s *Service) sendActiveDelegationEvent(ctx context.Context, delegation *model.BTCDelegationDetails) *types.Error {
stakingEvent := queuecli.NewActiveStakingEventV2(
func (s *Service) emitActiveDelegationEvent(ctx context.Context, delegation *model.BTCDelegationDetails) *types.Error {
stakingEvent := queuecli.NewActiveStakingEvent(
delegation.StakingTxHashHex,
delegation.StakerBtcPkHex,
delegation.FinalityProviderBtcPksHex,
delegation.StakingAmount,
)

if err := s.queueManager.PushStakingEvent(&stakingEvent); err != nil {
if err := s.queueManager.PushActiveStakingEvent(&stakingEvent); err != nil {
return types.NewInternalServiceError(fmt.Errorf("failed to push the staking event to the queue: %w", err))
}
return nil
}

func (s *Service) sendUnbondingDelegationEvent(ctx context.Context, delegation *model.BTCDelegationDetails) *types.Error {
ev := queuecli.NewUnbondingStakingEventV2(
func (s *Service) emitUnbondingDelegationEvent(ctx context.Context, delegation *model.BTCDelegationDetails) *types.Error {
ev := queuecli.NewUnbondingStakingEvent(
delegation.StakingTxHashHex,
delegation.StakerBtcPkHex,
delegation.FinalityProviderBtcPksHex,
delegation.StakingAmount,
)
if err := s.queueManager.PushUnbondingEvent(&ev); err != nil {
if err := s.queueManager.PushUnbondingStakingEvent(&ev); err != nil {
return types.NewInternalServiceError(fmt.Errorf("failed to push the unbonding event to the queue: %w", err))
}
return nil
Expand Down
23 changes: 19 additions & 4 deletions internal/services/delegation.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func (s *Service) processCovenantQuorumReachedEvent(
newState := types.DelegationState(covenantQuorumReachedEvent.NewState)
// Emit consumer event if the new state is active
if newState == types.StateActive {
err = s.emitConsumerEvent(ctx, newState, delegation)
err = s.emitActiveDelegationEvent(ctx, delegation)
if err != nil {
return err
}
Expand Down Expand Up @@ -205,7 +205,7 @@ func (s *Service) processBTCDelegationInclusionProofReceivedEvent(
newState := types.DelegationState(inclusionProofEvent.NewState)
// Emit consumer event if the new state is active
if newState == types.StateActive {
err = s.emitConsumerEvent(ctx, newState, delegation)
err = s.emitActiveDelegationEvent(ctx, delegation)
if err != nil {
return err
}
Expand Down Expand Up @@ -256,7 +256,7 @@ func (s *Service) processBTCDelegationUnbondedEarlyEvent(
}

// Emit consumer event
if err := s.emitConsumerEvent(ctx, types.StateUnbonding, delegation); err != nil {
if err := s.emitUnbondingDelegationEvent(ctx, delegation); err != nil {
return err
}

Expand Down Expand Up @@ -348,7 +348,7 @@ func (s *Service) processBTCDelegationExpiredEvent(
}

// Emit consumer event
if err := s.emitConsumerEvent(ctx, types.StateUnbonding, delegation); err != nil {
if err := s.emitUnbondingDelegationEvent(ctx, delegation); err != nil {
return err
}

Expand Down Expand Up @@ -424,5 +424,20 @@ func (s *Service) processSlashedFinalityProviderEvent(
)
}

delegations, dbErr := s.db.GetDelegationsByFinalityProvider(ctx, fpBTCPKHex)
if dbErr != nil {
return types.NewError(
http.StatusInternalServerError,
types.InternalServiceError,
fmt.Errorf("failed to get BTC delegations by finality provider: %w", dbErr),
)
}

for _, delegation := range delegations {
if err := s.emitUnbondingDelegationEvent(ctx, delegation); err != nil {
return err
}
}

return nil
}
30 changes: 30 additions & 0 deletions tests/mocks/mock_db_client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 0ec19f9

Please sign in to comment.