Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.
/ bux Public archive
generated from mrz1836/go-template

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(BUX-598): broadcast in EF if possible
Browse files Browse the repository at this point in the history
arkadiuszos4chain committed Feb 15, 2024
1 parent 0ace965 commit 981aaae
Showing 16 changed files with 272 additions and 138 deletions.
19 changes: 9 additions & 10 deletions action_transaction.go
Original file line number Diff line number Diff line change
@@ -11,25 +11,24 @@ import (
"github.com/BuxOrg/bux/utils"
"github.com/bitcoin-sv/go-broadcast-client/broadcast"
"github.com/libsv/go-bc"
"github.com/libsv/go-bt"
"github.com/libsv/go-bt/v2"
"github.com/mrz1836/go-datastore"
)

// RecordTransaction will parse the transaction and save it into the Datastore
//
// Internal (known) transactions: there is a corresponding `draft_transaction` via `draft_id`
// External (known) transactions: there are output(s) related to the destination `reference_id`, tx is valid (mempool/on-chain)
// External (unknown) transactions: no reference id but some output(s) match known outputs, tx is valid (mempool/on-chain)
// Unknown transactions: no matching outputs, tx will be disregarded
//
// RecordTransaction will parse the outgoing transaction and save it into the Datastore
// xPubKey is the raw public xPub
// txHex is the raw transaction hex
// draftID is the unique draft id from a previously started New() transaction (draft_transaction.ID)
// opts are model options and can include "metadata"
func (c *Client) RecordTransaction(ctx context.Context, xPubKey, txHex, draftID string, opts ...ModelOps) (*Transaction, error) {
ctx = c.GetOrStartTxn(ctx, "record_transaction")

rts, err := getRecordTxStrategy(ctx, c, xPubKey, txHex, draftID)
tx, err := bt.NewTxFromString(txHex)
if err != nil {
return nil, fmt.Errorf("invalid hex: %w", err)
}

rts, err := getOutgoingTxRecordStrategy(xPubKey, tx, draftID)
if err != nil {
return nil, err
}
@@ -135,7 +134,7 @@ func (c *Client) GetTransactionByHex(ctx context.Context, hex string) (*Transact
return nil, err
}

return c.GetTransaction(ctx, "", tx.GetTxID())
return c.GetTransaction(ctx, "", tx.TxID())
}

// GetTransactions will get all the transactions from the Datastore
14 changes: 9 additions & 5 deletions chainstate/broadcast.go
Original file line number Diff line number Diff line change
@@ -45,7 +45,7 @@ var (
//
// NOTE: if successful (in-mempool), no error will be returned
// NOTE: function register the fastest successful broadcast into 'completeChannel' so client doesn't need to wait for other providers
func (c *Client) broadcast(ctx context.Context, id, hex string, timeout time.Duration, completeChannel, errorChannel chan string) {
func (c *Client) broadcast(ctx context.Context, id, hex string, format HexFormatFlag, timeout time.Duration, completeChannel, errorChannel chan string) {
// Create a context (to cancel or timeout)
ctxWithCancel, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
@@ -55,7 +55,7 @@ func (c *Client) broadcast(ctx context.Context, id, hex string, timeout time.Dur
resultsChannel := make(chan broadcastResult)
status := newBroadcastStatus(completeChannel)

for _, broadcastProvider := range createActiveProviders(c, id, hex) {
for _, broadcastProvider := range createActiveProviders(c, id, hex, format) {
wg.Add(1)
go func(provider txBroadcastProvider) {
defer wg.Done()
@@ -85,11 +85,15 @@ func (c *Client) broadcast(ctx context.Context, id, hex string, timeout time.Dur
}
}

func createActiveProviders(c *Client, txID, txHex string) []txBroadcastProvider {
providers := make([]txBroadcastProvider, 0, 10)
func createActiveProviders(c *Client, txID, txHex string, format HexFormatFlag) []txBroadcastProvider {
providers := make([]txBroadcastProvider, 0)

switch c.ActiveProvider() {
case ProviderMinercraft:
if format != RawTx {
panic("MAPI doesn't support other broadcast format than RawTx")
}

for _, miner := range c.options.config.minercraftConfig.broadcastMiners {
if miner == nil {
continue
@@ -99,7 +103,7 @@ func createActiveProviders(c *Client, txID, txHex string) []txBroadcastProvider
providers = append(providers, &pvdr)
}
case ProviderBroadcastClient:
pvdr := broadcastClientProvider{txID: txID, txHex: txHex}
pvdr := broadcastClientProvider{txID: txID, txHex: txHex, format: format}
providers = append(providers, &pvdr)
default:
c.options.logger.Warn().Msg("no active provider for broadcast")
40 changes: 25 additions & 15 deletions chainstate/broadcast_providers.go
Original file line number Diff line number Diff line change
@@ -22,11 +22,11 @@ type mapiBroadcastProvider struct {
txID, txHex string
}

func (provider mapiBroadcastProvider) getName() string {
func (provider *mapiBroadcastProvider) getName() string {
return provider.miner.Name
}

func (provider mapiBroadcastProvider) broadcast(ctx context.Context, c *Client) error {
func (provider *mapiBroadcastProvider) broadcast(ctx context.Context, c *Client) error {
return broadcastMAPI(ctx, c, provider.miner, provider.txID, provider.txHex)
}

@@ -83,36 +83,46 @@ func emptyBroadcastResponseErr(txID string) error {
// BroadcastClient provider
type broadcastClientProvider struct {
txID, txHex string
format HexFormatFlag
}

func (provider broadcastClientProvider) getName() string {
func (provider *broadcastClientProvider) getName() string {
return ProviderBroadcastClient
}

// Broadcast using BroadcastClient
func (provider broadcastClientProvider) broadcast(ctx context.Context, c *Client) error {
return broadcastWithBroadcastClient(ctx, c, provider.txID, provider.txHex)
}

func broadcastWithBroadcastClient(ctx context.Context, client *Client, txID, hex string) error {
debugLog(client, txID, "executing broadcast request for "+ProviderBroadcastClient)
func (provider *broadcastClientProvider) broadcast(ctx context.Context, c *Client) error {
c.options.logger.Debug().
Str("txID", provider.txID).
Msgf("executing broadcast request for %s", provider.getName())

tx := broadcast.Transaction{
Hex: hex,
Hex: provider.txHex,
}

formatOpt := broadcast.WithRawFormat()
if provider.format.Contains(Ef) {
formatOpt = broadcast.WithEfFormat()
}

result, err := client.BroadcastClient().SubmitTransaction(
result, err := c.BroadcastClient().SubmitTransaction(
ctx,
&tx,
broadcast.WithRawFormat(),
broadcast.WithCallback(client.options.config.callbackURL, client.options.config.callbackToken),
formatOpt,
broadcast.WithCallback(c.options.config.callbackURL, c.options.config.callbackToken),
)

if err != nil {
debugLog(client, txID, "error broadcast request for "+ProviderBroadcastClient+" failed: "+err.Error())
c.options.logger.Debug().
Str("txID", provider.txID).
Msgf("error broadcast request for %s failed: %s", provider.getName(), err.Error())

return err
}

debugLog(client, txID, "result broadcast request for "+ProviderBroadcastClient+" blockhash: "+result.BlockHash+" status: "+result.TxStatus.String())
c.options.logger.Debug().
Str("txID", provider.txID).
Msgf("result broadcast request for %s blockhash: %s status: %s", provider.getName(), result.BlockHash, result.TxStatus.String())

return nil
}
8 changes: 4 additions & 4 deletions chainstate/broadcast_test.go
Original file line number Diff line number Diff line change
@@ -39,7 +39,7 @@ func TestClient_Broadcast(t *testing.T) {

// when
provider, err := c.Broadcast(
context.Background(), "", onChainExample1TxHex, defaultBroadcastTimeOut,
context.Background(), "", onChainExample1TxHex, RawTx, defaultBroadcastTimeOut,
)

// then
@@ -55,7 +55,7 @@ func TestClient_Broadcast(t *testing.T) {

// when
provider, err := c.Broadcast(
context.Background(), onChainExample1TxID, "", defaultBroadcastTimeOut,
context.Background(), onChainExample1TxID, "", RawTx, defaultBroadcastTimeOut,
)

// then
@@ -78,7 +78,7 @@ func TestClient_Broadcast_MAPI(t *testing.T) {

// when
providers, err := c.Broadcast(
context.Background(), broadcastExample1TxID, broadcastExample1TxHex, defaultBroadcastTimeOut,
context.Background(), broadcastExample1TxID, broadcastExample1TxHex, RawTx, defaultBroadcastTimeOut,
)

// then
@@ -112,7 +112,7 @@ func TestClient_Broadcast_BroadcastClient(t *testing.T) {

// when
providers, err := c.Broadcast(
context.Background(), broadcastExample1TxID, broadcastExample1TxHex, defaultBroadcastTimeOut,
context.Background(), broadcastExample1TxID, broadcastExample1TxHex, RawTx, defaultBroadcastTimeOut,
)

// then
32 changes: 26 additions & 6 deletions chainstate/chainstate.go
Original file line number Diff line number Diff line change
@@ -9,24 +9,44 @@ import (
"time"
)

type HexFormatFlag byte

const (
RawTx HexFormatFlag = 1
Ef HexFormatFlag = 3
)

func (flag HexFormatFlag) Contains(other HexFormatFlag) bool {
return (flag & other) == other
}

// Broadcast will attempt to broadcast a transaction using the given providers
func (c *Client) Broadcast(ctx context.Context, id, txHex string, timeout time.Duration) (string, error) {
func (c *Client) SupportedBroadcastFormats() HexFormatFlag {
switch c.ActiveProvider() {
case ProviderMinercraft:
return RawTx

case ProviderBroadcastClient:
return RawTx + Ef

default:
return RawTx
}
}

func (c *Client) Broadcast(ctx context.Context, id, txHex string, format HexFormatFlag, timeout time.Duration) (string, error) {
// Basic validation
if len(id) < 50 {
return "", ErrInvalidTransactionID
} else if len(txHex) <= 0 { // todo: validate the tx hex
return "", ErrInvalidTransactionHex
}

// Debug the id and hex
c.DebugLog("tx_id: " + id)
c.DebugLog("tx_hex: " + txHex)

// Broadcast or die
successCompleteCh := make(chan string)
errorCh := make(chan string)

go c.broadcast(ctx, id, txHex, timeout, successCompleteCh, errorCh)
go c.broadcast(ctx, id, txHex, format, timeout, successCompleteCh, errorCh)

// wait for first success
success := <-successCompleteCh
3 changes: 2 additions & 1 deletion chainstate/interface.go
Original file line number Diff line number Diff line change
@@ -18,7 +18,8 @@ type HTTPInterface interface {

// ChainService is the chain related methods
type ChainService interface {
Broadcast(ctx context.Context, id, txHex string, timeout time.Duration) (string, error)
SupportedBroadcastFormats() HexFormatFlag
Broadcast(ctx context.Context, id, txHex string, format HexFormatFlag, timeout time.Duration) (string, error)
QueryTransaction(
ctx context.Context, id string, requiredIn RequiredIn, timeout time.Duration,
) (*TransactionInfo, error)
4 changes: 2 additions & 2 deletions chainstate/transaction.go
Original file line number Diff line number Diff line change
@@ -78,14 +78,14 @@ func (c *Client) fastestQuery(ctx context.Context, id string, requiredIn Require
}
case ProviderBroadcastClient:
wg.Add(1)
go func(ctx context.Context, client *Client, id string, requiredIn RequiredIn) {
go func(ctx context.Context, client *Client, wg *sync.WaitGroup, id string, requiredIn RequiredIn) {
defer wg.Done()
if resp, err := queryBroadcastClient(
ctx, client, id,
); err == nil && checkRequirementArc(requiredIn, id, resp) {
resultsChannel <- resp
}
}(ctxWithCancel, c, id, requiredIn)
}(ctxWithCancel, c, &wg, id, requiredIn)
default:
c.options.logger.Warn().Msg("no active provider for fastestQuery")
}
68 changes: 68 additions & 0 deletions ef_tx.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package bux

import (
"context"
"encoding/hex"

"github.com/libsv/go-bt/v2"
)

func ToEfHex(ctx context.Context, tx *Transaction, store TransactionGetter) (efHex string, ok bool) {
btTx := tx.parsedTx

if btTx == nil {
var err error
btTx, err = bt.NewTxFromString(tx.Hex)
if err != nil {
return "", false
}
}

needToHydrate := false
for _, input := range btTx.Inputs {
if input.PreviousTxScript == nil {
needToHydrate = true
break
}
}

if needToHydrate {
if ok := hydrate(ctx, btTx, store); !ok {
return "", false
}
}

return hex.EncodeToString(btTx.ExtendedBytes()), true
}

func hydrate(ctx context.Context, tx *bt.Tx, store TransactionGetter) (ok bool) {
txToGet := make([]string, 0, len(tx.Inputs))

for _, input := range tx.Inputs {
txToGet = append(txToGet, input.PreviousTxIDStr())
}

parentTxs, err := store.GetTransactionsByIDs(ctx, txToGet)
if err != nil {
return false
}
if len(parentTxs) != len(tx.Inputs) {
return false
}

for _, input := range tx.Inputs {
prevTxId := input.PreviousTxIDStr()
pTx := find(parentTxs, func(tx *Transaction) bool { return tx.ID == prevTxId })

pbtTx, err := bt.NewTxFromString((*pTx).Hex)
if err != nil {
return false
}

o := pbtTx.Outputs[input.PreviousTxOutIndex]
input.PreviousTxSatoshis = o.Satoshis
input.PreviousTxScript = o.LockingScript
}

return true
}
6 changes: 5 additions & 1 deletion mock_chainstate_test.go
Original file line number Diff line number Diff line change
@@ -66,7 +66,7 @@ type chainStateEverythingInMempool struct {
chainStateBase
}

func (c *chainStateEverythingInMempool) Broadcast(context.Context, string, string, time.Duration) (string, error) {
func (c *chainStateEverythingInMempool) Broadcast(context.Context, string, string, chainstate.HexFormatFlag, time.Duration) (string, error) {
return "", nil
}

@@ -102,6 +102,10 @@ type chainStateEverythingOnChain struct {
chainStateEverythingInMempool
}

func (c *chainStateEverythingOnChain) SupportedBroadcastFormats() chainstate.HexFormatFlag {
return chainstate.RawTx
}

func (c *chainStateEverythingOnChain) BroadcastClient() broadcast.Client {
return nil
}
9 changes: 9 additions & 0 deletions model_transactions.go
Original file line number Diff line number Diff line change
@@ -128,6 +128,15 @@ func newTransactionWithDraftID(txHex, draftID string, opts ...ModelOps) (*Transa
return tx, nil
}

func txFromBtTx(btTx *bt.Tx, opts ...ModelOps) *Transaction {
tx := emptyTx(opts...)
tx.ID = btTx.TxID()
tx.Hex = btTx.String()
tx.parsedTx = btTx

return tx
}

// setXPubID will set the xPub ID on the model
func (m *Transaction) setXPubID() {
if len(m.rawXpubKey) > 0 && len(m.XPubID) == 0 {
Loading

0 comments on commit 981aaae

Please sign in to comment.