Skip to content

Commit

Permalink
fix: update db after processing event (#89)
Browse files Browse the repository at this point in the history
  • Loading branch information
gusin13 authored Dec 6, 2024
1 parent 7da2561 commit 4d546a2
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 59 deletions.
16 changes: 11 additions & 5 deletions internal/services/consumer_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,18 @@ import (
queuecli "github.com/babylonlabs-io/staking-queue-client/client"
)

func (s *Service) emitActiveDelegationEvent(ctx context.Context, delegation *model.BTCDelegationDetails) *types.Error {
func (s *Service) emitActiveDelegationEvent(
ctx context.Context,
stakingTxHashHex string,
stakerBtcPkHex string,
finalityProviderBtcPksHex []string,
stakingAmount uint64,
) *types.Error {
stakingEvent := queuecli.NewActiveStakingEvent(
delegation.StakingTxHashHex,
delegation.StakerBtcPkHex,
delegation.FinalityProviderBtcPksHex,
delegation.StakingAmount,
stakingTxHashHex,
stakerBtcPkHex,
finalityProviderBtcPksHex,
stakingAmount,
)

if err := s.queueManager.PushActiveStakingEvent(&stakingEvent); err != nil {
Expand Down
109 changes: 73 additions & 36 deletions internal/services/delegation.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,22 +140,6 @@ func (s *Service) processCovenantQuorumReachedEvent(
return nil
}

// Update delegation state
newState := types.DelegationState(covenantQuorumReachedEvent.NewState)
if dbErr := s.db.UpdateBTCDelegationState(
ctx,
covenantQuorumReachedEvent.StakingTxHash,
types.QualifiedStatesForCovenantQuorumReached(covenantQuorumReachedEvent.NewState),
newState,
nil,
); dbErr != nil {
return types.NewError(
http.StatusInternalServerError,
types.InternalServiceError,
fmt.Errorf("failed to update BTC delegation state: %w", dbErr),
)
}

// Emit event and register spend notification
delegation, dbErr := s.db.GetBTCDelegationByStakingTxHash(ctx, covenantQuorumReachedEvent.StakingTxHash)
if dbErr != nil {
Expand All @@ -165,18 +149,52 @@ func (s *Service) processCovenantQuorumReachedEvent(
fmt.Errorf("failed to get BTC delegation by staking tx hash: %w", dbErr),
)
}

newState := types.DelegationState(covenantQuorumReachedEvent.NewState)
if newState == types.StateActive {
err = s.emitActiveDelegationEvent(ctx, delegation)
log.Debug().
Str("staking_tx", covenantQuorumReachedEvent.StakingTxHash).
Str("staking_start_height", strconv.FormatUint(uint64(delegation.StartHeight), 10)).
Str("event_type", EventCovenantQuorumReached.String()).
Msg("handling active state")

err = s.emitActiveDelegationEvent(
ctx,
delegation.StakingTxHashHex,
delegation.StakerBtcPkHex,
delegation.FinalityProviderBtcPksHex,
delegation.StakingAmount,
)
if err != nil {
return err
}

// Register spend notification
if err := s.registerStakingSpendNotification(ctx, delegation); err != nil {
if err := s.registerStakingSpendNotification(
ctx,
delegation.StakingTxHashHex,
delegation.StakingTxHex,
delegation.StakingOutputIdx,
delegation.StartHeight,
); err != nil {
return err
}
}

// Update delegation state
if dbErr := s.db.UpdateBTCDelegationState(
ctx,
covenantQuorumReachedEvent.StakingTxHash,
types.QualifiedStatesForCovenantQuorumReached(covenantQuorumReachedEvent.NewState),
newState,
nil,
); dbErr != nil {
return types.NewError(
http.StatusInternalServerError,
types.InternalServiceError,
fmt.Errorf("failed to update BTC delegation state: %w", dbErr),
)
}

return nil
}

Expand All @@ -199,19 +217,6 @@ func (s *Service) processBTCDelegationInclusionProofReceivedEvent(
return nil
}

// Update delegation details
if dbErr := s.db.UpdateBTCDelegationDetails(
ctx,
inclusionProofEvent.StakingTxHash,
model.FromEventBTCDelegationInclusionProofReceived(inclusionProofEvent),
); dbErr != nil {
return types.NewError(
http.StatusInternalServerError,
types.InternalServiceError,
fmt.Errorf("failed to update BTC delegation details: %w", dbErr),
)
}

// Emit event and register spend notification
delegation, dbErr := s.db.GetBTCDelegationByStakingTxHash(ctx, inclusionProofEvent.StakingTxHash)
if dbErr != nil {
Expand All @@ -223,17 +228,48 @@ func (s *Service) processBTCDelegationInclusionProofReceivedEvent(
}
newState := types.DelegationState(inclusionProofEvent.NewState)
if newState == types.StateActive {
err = s.emitActiveDelegationEvent(ctx, delegation)
stakingStartHeight, _ := strconv.ParseUint(inclusionProofEvent.StartHeight, 10, 32)

log.Debug().
Str("staking_tx", inclusionProofEvent.StakingTxHash).
Str("staking_start_height", inclusionProofEvent.StartHeight).
Str("event_type", EventBTCDelegationInclusionProofReceived.String()).
Msg("handling active state")

err = s.emitActiveDelegationEvent(
ctx,
inclusionProofEvent.StakingTxHash,
delegation.StakerBtcPkHex,
delegation.FinalityProviderBtcPksHex,
delegation.StakingAmount,
)
if err != nil {
return err
}

// Register spend notification
if err := s.registerStakingSpendNotification(ctx, delegation); err != nil {
if err := s.registerStakingSpendNotification(ctx,
delegation.StakingTxHashHex,
delegation.StakingTxHex,
delegation.StakingOutputIdx,
uint32(stakingStartHeight),
); err != nil {
return err
}
}

// Update delegation details
if dbErr := s.db.UpdateBTCDelegationDetails(
ctx,
inclusionProofEvent.StakingTxHash,
model.FromEventBTCDelegationInclusionProofReceived(inclusionProofEvent),
); dbErr != nil {
return types.NewError(
http.StatusInternalServerError,
types.InternalServiceError,
fmt.Errorf("failed to update BTC delegation details: %w", dbErr),
)
}

return nil
}

Expand Down Expand Up @@ -304,7 +340,8 @@ func (s *Service) processBTCDelegationUnbondedEarlyEvent(
Str("unbonding_time", strconv.FormatUint(uint64(delegation.UnbondingTime), 10)).
Str("unbonding_expire_height", strconv.FormatUint(uint64(unbondingExpireHeight), 10)).
Str("sub_state", subState.String()).
Msg("updating delegation state to early unbonding")
Str("event_type", EventBTCDelgationUnbondedEarly.String()).
Msg("updating delegation state")

// Update delegation state
if err := s.db.UpdateBTCDelegationState(
Expand Down
23 changes: 11 additions & 12 deletions internal/services/delegation_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,12 @@ func (s *Service) registerUnbondingSpendNotification(

func (s *Service) registerStakingSpendNotification(
ctx context.Context,
delegation *model.BTCDelegationDetails,
stakingTxHashHex string,
stakingTxHex string,
stakingOutputIdx uint32,
stakingStartHeight uint32,
) *types.Error {
stakingTxHash, err := chainhash.NewHashFromStr(delegation.StakingTxHashHex)
stakingTxHash, err := chainhash.NewHashFromStr(stakingTxHashHex)
if err != nil {
return types.NewError(
http.StatusInternalServerError,
Expand All @@ -79,7 +82,7 @@ func (s *Service) registerStakingSpendNotification(
)
}

stakingTx, err := utils.DeserializeBtcTransactionFromHex(delegation.StakingTxHex)
stakingTx, err := utils.DeserializeBtcTransactionFromHex(stakingTxHex)
if err != nil {
return types.NewError(
http.StatusInternalServerError,
Expand All @@ -88,30 +91,26 @@ func (s *Service) registerStakingSpendNotification(
)
}

log.Debug().
Str("staking_tx", delegation.StakingTxHashHex).
Msg("registering staking spend notification")

stakingOutpoint := wire.OutPoint{
Hash: *stakingTxHash,
Index: delegation.StakingOutputIdx,
Index: stakingOutputIdx,
}

spendEv, err := s.btcNotifier.RegisterSpendNtfn(
&stakingOutpoint,
stakingTx.TxOut[delegation.StakingOutputIdx].PkScript,
delegation.StartHeight,
stakingTx.TxOut[stakingOutputIdx].PkScript,
stakingStartHeight,
)
if err != nil {
return types.NewError(
http.StatusInternalServerError,
types.InternalServiceError,
fmt.Errorf("failed to register spend ntfn for staking tx %s: %w", delegation.StakingTxHashHex, err),
fmt.Errorf("failed to register spend ntfn for staking tx %s: %w", stakingTxHashHex, err),
)
}

s.wg.Add(1)
go s.watchForSpendStakingTx(spendEv, delegation)
go s.watchForSpendStakingTx(spendEv, stakingTxHashHex)

return nil
}
4 changes: 4 additions & 0 deletions internal/services/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ type EventTypes string

type EventCategory string

func (e EventTypes) String() string {
return string(e)
}

const (
BlockCategory EventCategory = "block"
TxCategory EventCategory = "tx"
Expand Down
8 changes: 7 additions & 1 deletion internal/services/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,13 @@ func (s *Service) ResubscribeToMissedBtcNotifications(ctx context.Context) {

for _, delegation := range delegations {
// Register spend notification
if err := s.registerStakingSpendNotification(ctx, delegation); err != nil {
if err := s.registerStakingSpendNotification(
ctx,
delegation.StakingTxHashHex,
delegation.StakingTxHex,
delegation.StakingOutputIdx,
delegation.StartHeight,
); err != nil {
log.Fatal().Msgf("Failed to register spend notification: %v", err)
}
}
Expand Down
15 changes: 10 additions & 5 deletions internal/services/watch_btc_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (

func (s *Service) watchForSpendStakingTx(
spendEvent *notifier.SpendEvent,
delegation *model.BTCDelegationDetails,
stakingTxHashHex string,
) {
defer s.wg.Done()
quitCtx, cancel := s.quitContext()
Expand All @@ -32,19 +32,19 @@ func (s *Service) watchForSpendStakingTx(
select {
case spendDetail := <-spendEvent.Spend:
log.Debug().
Str("staking_tx", delegation.StakingTxHashHex).
Str("staking_tx", stakingTxHashHex).
Str("spending_tx", spendDetail.SpendingTx.TxHash().String()).
Msg("staking tx has been spent")
if err := s.handleSpendingStakingTransaction(
quitCtx,
spendDetail.SpendingTx,
spendDetail.SpenderInputIndex,
uint32(spendDetail.SpendingHeight),
delegation,
stakingTxHashHex,
); err != nil {
log.Error().
Err(err).
Str("staking_tx", delegation.StakingTxHashHex).
Str("staking_tx", stakingTxHashHex).
Str("spending_tx", spendDetail.SpendingTx.TxHash().String()).
Msg("failed to handle spending staking transaction")
return
Expand Down Expand Up @@ -158,8 +158,13 @@ func (s *Service) handleSpendingStakingTransaction(
spendingTx *wire.MsgTx,
spendingInputIdx uint32,
spendingHeight uint32,
delegation *model.BTCDelegationDetails,
stakingTxHashHex string,
) error {
delegation, err := s.db.GetBTCDelegationByStakingTxHash(ctx, stakingTxHashHex)
if err != nil {
return fmt.Errorf("failed to get BTC delegation by staking tx hash: %w", err)
}

params, err := s.db.GetStakingParams(ctx, delegation.ParamsVersion)
if err != nil {
return fmt.Errorf("failed to get staking params: %w", err)
Expand Down

0 comments on commit 4d546a2

Please sign in to comment.