Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

feat(BUX-368): Arc Callbacks #558

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ go.list
# Jetbrains
.idea/

#VSCode
.vscode/

# Eclipse
.project

Expand Down
35 changes: 35 additions & 0 deletions action_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (

"github.com/BuxOrg/bux/chainstate"
"github.com/BuxOrg/bux/utils"
"github.com/bitcoin-sv/go-broadcast-client/broadcast"
"github.com/libsv/go-bc"
"github.com/libsv/go-bt"
"github.com/mrz1836/go-datastore"
)
Expand Down Expand Up @@ -381,6 +383,39 @@ func (c *Client) RevertTransaction(ctx context.Context, id string) error {
return err
}

// UpdateTransaction will update the broadcast callback transaction info, like: block height, block hash, status, bump.
func (c *Client) UpdateTransaction(ctx context.Context, callbackResp *broadcast.SubmittedTx) error {
bump, err := bc.NewBUMPFromStr(callbackResp.MerklePath)
if err != nil {
c.options.logger.Err(err).Msgf("failed to parse merkle path from broadcast callback - tx: %v", callbackResp)
return err
}

txInfo := &chainstate.TransactionInfo{
BlockHash: callbackResp.BlockHash,
BlockHeight: callbackResp.BlockHeight,
ID: callbackResp.TxID,
TxStatus: callbackResp.TxStatus,
BUMP: bump,
// it's not possible to get confirmations from broadcast client; zero would be treated as "not confirmed" that's why -1
Confirmations: -1,
}

tx, err := c.GetTransaction(ctx, "", txInfo.ID)
if err != nil {
c.options.logger.Err(err).Msgf("failed to get transaction by id: %v", txInfo.ID)
return err
}

syncTx, err := GetSyncTransactionByTxID(ctx, txInfo.ID, c.DefaultModelOptions()...)
if err != nil {
c.options.logger.Err(err).Msgf("failed to get sync transaction by tx id: %v", txInfo.ID)
return err
}

return processSyncTxSave(ctx, txInfo, syncTx, tx)
}

func generateTxIDFilterConditions(txIDs []string) *map[string]interface{} {
orConditions := make([]map[string]interface{}, len(txIDs))

Expand Down
9 changes: 7 additions & 2 deletions chainstate/broadcast_providers.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,19 @@ func (provider broadcastClientProvider) broadcast(ctx context.Context, c *Client
return broadcastWithBroadcastClient(ctx, c, provider.txID, provider.txHex)
}

func broadcastWithBroadcastClient(ctx context.Context, client ClientInterface, txID, hex string) error {
func broadcastWithBroadcastClient(ctx context.Context, client *Client, txID, hex string) error {
debugLog(client, txID, "executing broadcast request for "+ProviderBroadcastClient)

tx := broadcast.Transaction{
Hex: hex,
}

result, err := client.BroadcastClient().SubmitTransaction(ctx, &tx, broadcast.WithRawFormat())
result, err := client.BroadcastClient().SubmitTransaction(
ctx,
&tx,
broadcast.WithRawFormat(),
broadcast.WithCallback(client.options.config.callbackURL, client.options.config.callbackToken),
chris-4chain marked this conversation as resolved.
Show resolved Hide resolved
)
if err != nil {
debugLog(client, txID, "error broadcast request for "+ProviderBroadcastClient+" failed: "+err.Error())
return err
Expand Down
2 changes: 2 additions & 0 deletions chainstate/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ type (

// syncConfig holds all the configuration about the different sync processes
syncConfig struct {
callbackURL string // Broadcast callback URL
callbackToken string // Broadcast callback access token
excludedProviders []string // List of provider names
httpClient HTTPInterface // Custom HTTP client (Minercraft, WOC)
minercraftConfig *minercraftConfig // minercraftConfig configuration
Expand Down
8 changes: 8 additions & 0 deletions chainstate/client_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,3 +165,11 @@ func WithConnectionToPulse(url, authToken string) ClientOps {
c.config.pulseClient = newPulseClientProvider(url, authToken)
}
}

// WithCallback will set broadcast callback settings
func WithCallback(callbackURL, callbackAuthToken string) ClientOps {
return func(c *clientOptions) {
c.config.callbackURL = callbackURL
c.config.callbackToken = callbackAuthToken
}
}
7 changes: 7 additions & 0 deletions client_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -662,3 +662,10 @@ func WithBroadcastClient(broadcastClient broadcast.Client) ClientOps {
c.chainstate.options = append(c.chainstate.options, chainstate.WithBroadcastClient(broadcastClient))
}
}

// WithCallback set callback settings
func WithCallback(callbackURL string, callbackToken string) ClientOps {
return func(c *clientOptions) {
c.chainstate.options = append(c.chainstate.options, chainstate.WithCallback(callbackURL, callbackToken))
}
}
2 changes: 1 addition & 1 deletion cron_job_declarations.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (c *Client) cronJobs() taskmanager.CronJobs {
Handler: handler(taskBroadcastTransactions),
},
CronJobNameSyncTransactionSync: {
Period: 120 * time.Second,
Period: 600 * time.Second,
Handler: handler(taskSyncTransactions),
},
}
Expand Down
2 changes: 1 addition & 1 deletion definitions.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ const (
dustLimit = uint64(1) // Dust limit
mongoTestVersion = "6.0.4" // Mongo Testing Version
sqliteTestVersion = "3.37.0" // SQLite Testing Version (dummy version for now)
version = "v0.13.0" // bux version
version = "v0.14.2" // bux version
)

// All the base models
Expand Down
2 changes: 1 addition & 1 deletion go.mod

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/BuxOrg/bux/cluster"
"github.com/BuxOrg/bux/notifications"
"github.com/BuxOrg/bux/taskmanager"
"github.com/bitcoin-sv/go-broadcast-client/broadcast"
"github.com/bitcoin-sv/go-paymail"
"github.com/mrz1836/go-cachestore"
"github.com/mrz1836/go-datastore"
Expand Down Expand Up @@ -132,6 +133,7 @@ type TransactionService interface {
RecordTransaction(ctx context.Context, xPubKey, txHex, draftID string,
opts ...ModelOps) (*Transaction, error)
RecordRawTransaction(ctx context.Context, txHex string, opts ...ModelOps) (*Transaction, error)
UpdateTransaction(ctx context.Context, txInfo *broadcast.SubmittedTx) error
UpdateTransactionMetadata(ctx context.Context, xPubID, id string, metadata Metadata) (*Transaction, error)
RevertTransaction(ctx context.Context, id string) error
}
Expand Down
19 changes: 19 additions & 0 deletions sync_tx_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,25 @@ func GetSyncTransactionByID(ctx context.Context, id string, opts ...ModelOps) (*
return txs[0], nil
}

// GetSyncTransactionByTxID will get a sync transaction by it's transaction id.
func GetSyncTransactionByTxID(ctx context.Context, txID string, opts ...ModelOps) (*SyncTransaction, error) {
// Get the records by status
txs, err := _getSyncTransactionsByConditions(ctx,
map[string]interface{}{
idField: txID,
},
nil, opts...,
)
if err != nil {
return nil, err
}
if len(txs) != 1 {
return nil, nil
}

return txs[0], nil
}

/*** /exported funcs ***/

/*** public unexported funcs ***/
Expand Down
14 changes: 6 additions & 8 deletions sync_tx_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,14 +221,17 @@ func _syncTxDataFromChain(ctx context.Context, syncTx *SyncTransaction, transact
}
return err
}
return processSyncTxSave(ctx, txInfo, syncTx, transaction)
}

func processSyncTxSave(ctx context.Context, txInfo *chainstate.TransactionInfo, syncTx *SyncTransaction, transaction *Transaction) error {
if !txInfo.Valid() {
syncTx.Client().Logger().Warn().
Str("txID", syncTx.ID).
Msgf("txInfo is invalid, will try again later")

if syncTx.Client().IsDebug() {
txInfoJSON, _ := json.Marshal(txInfo) //nolint:errchkjson // error is not needed
txInfoJSON, _ := json.Marshal(txInfo)
syncTx.Client().Logger().Debug().
Str("txID", syncTx.ID).
Msgf("txInfo: %s", string(txInfoJSON))
Expand All @@ -238,18 +241,15 @@ func _syncTxDataFromChain(ctx context.Context, syncTx *SyncTransaction, transact

transaction.setChainInfo(txInfo)

// Create status message
message := "transaction was found on-chain by " + chainstate.ProviderBroadcastClient

// Save the transaction (should NOT error)
if err = transaction.Save(ctx); err != nil {
if err := transaction.Save(ctx); err != nil {
_bailAndSaveSyncTransaction(
ctx, syncTx, SyncStatusError, syncActionSync, "internal", err.Error(),
)
return err
}

// Update the sync status
syncTx.SyncStatus = SyncStatusComplete
syncTx.Results.LastMessage = message
syncTx.Results.Results = append(syncTx.Results.Results, &SyncResult{
Expand All @@ -259,16 +259,14 @@ func _syncTxDataFromChain(ctx context.Context, syncTx *SyncTransaction, transact
StatusMessage: message,
})

// Update the sync transaction record
if err = syncTx.Save(ctx); err != nil {
if err := syncTx.Save(ctx); err != nil {
_bailAndSaveSyncTransaction(ctx, syncTx, SyncStatusError, syncActionSync, "internal", err.Error())
return err
}

syncTx.Client().Logger().Info().
Str("txID", syncTx.ID).
Msgf("Transaction processed successfully")
// Done!
return nil
}

Expand Down
19 changes: 19 additions & 0 deletions utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"crypto/sha256"
"encoding/binary"
"encoding/hex"
"fmt"
"hash/adler32"
"math"
"strconv"

Expand Down Expand Up @@ -95,6 +97,23 @@ func LittleEndianBytes64(value uint64, resultLength uint32) []byte {
return buf
}

// HashAdler32 returns computed string calculated with Adler32 function.
func HashAdler32(input string) (string, error) {
if input == "" {
return "", fmt.Errorf("input string is empty - cannot apply adler32 hash function")
}
data := []byte(input)
hasher := adler32.New()
_, err := hasher.Write(data)
if err != nil {
return "", err
}

sum := hasher.Sum32()

return fmt.Sprintf("%08x", sum), nil
}

// SafeAssign - Assigns value (not pointer) the src to dest if src is not nil
func SafeAssign[T any](dest *T, src *T) {
if src != nil {
Expand Down
Loading