Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
feat(BUX-598): broadcast in EF if possible
Browse files Browse the repository at this point in the history
  • Loading branch information
arkadiuszos4chain committed Feb 17, 2024
1 parent dde5783 commit 7b28325
Show file tree
Hide file tree
Showing 16 changed files with 272 additions and 138 deletions.
19 changes: 9 additions & 10 deletions action_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,25 +11,24 @@ import (
"github.com/BuxOrg/bux/utils"
"github.com/bitcoin-sv/go-broadcast-client/broadcast"
"github.com/libsv/go-bc"
"github.com/libsv/go-bt"
"github.com/libsv/go-bt/v2"
"github.com/mrz1836/go-datastore"
)

// RecordTransaction will parse the transaction and save it into the Datastore
//
// Internal (known) transactions: there is a corresponding `draft_transaction` via `draft_id`
// External (known) transactions: there are output(s) related to the destination `reference_id`, tx is valid (mempool/on-chain)
// External (unknown) transactions: no reference id but some output(s) match known outputs, tx is valid (mempool/on-chain)
// Unknown transactions: no matching outputs, tx will be disregarded
//
// RecordTransaction will parse the outgoing transaction and save it into the Datastore
// xPubKey is the raw public xPub
// txHex is the raw transaction hex
// draftID is the unique draft id from a previously started New() transaction (draft_transaction.ID)
// opts are model options and can include "metadata"
func (c *Client) RecordTransaction(ctx context.Context, xPubKey, txHex, draftID string, opts ...ModelOps) (*Transaction, error) {
ctx = c.GetOrStartTxn(ctx, "record_transaction")

rts, err := getRecordTxStrategy(ctx, c, xPubKey, txHex, draftID)
tx, err := bt.NewTxFromString(txHex)
if err != nil {
return nil, fmt.Errorf("invalid hex: %w", err)
}

rts, err := getOutgoingTxRecordStrategy(xPubKey, tx, draftID)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -135,7 +134,7 @@ func (c *Client) GetTransactionByHex(ctx context.Context, hex string) (*Transact
return nil, err
}

return c.GetTransaction(ctx, "", tx.GetTxID())
return c.GetTransaction(ctx, "", tx.TxID())
}

// GetTransactions will get all the transactions from the Datastore
Expand Down
14 changes: 9 additions & 5 deletions chainstate/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ var (
//
// NOTE: if successful (in-mempool), no error will be returned
// NOTE: function register the fastest successful broadcast into 'completeChannel' so client doesn't need to wait for other providers
func (c *Client) broadcast(ctx context.Context, id, hex string, timeout time.Duration, completeChannel, errorChannel chan string) {
func (c *Client) broadcast(ctx context.Context, id, hex string, format HexFormatFlag, timeout time.Duration, completeChannel, errorChannel chan string) {
// Create a context (to cancel or timeout)
ctxWithCancel, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
Expand All @@ -55,7 +55,7 @@ func (c *Client) broadcast(ctx context.Context, id, hex string, timeout time.Dur
resultsChannel := make(chan broadcastResult)
status := newBroadcastStatus(completeChannel)

for _, broadcastProvider := range createActiveProviders(c, id, hex) {
for _, broadcastProvider := range createActiveProviders(c, id, hex, format) {
wg.Add(1)
go func(provider txBroadcastProvider) {
defer wg.Done()
Expand Down Expand Up @@ -85,11 +85,15 @@ func (c *Client) broadcast(ctx context.Context, id, hex string, timeout time.Dur
}
}

func createActiveProviders(c *Client, txID, txHex string) []txBroadcastProvider {
providers := make([]txBroadcastProvider, 0, 10)
func createActiveProviders(c *Client, txID, txHex string, format HexFormatFlag) []txBroadcastProvider {
providers := make([]txBroadcastProvider, 0)

switch c.ActiveProvider() {
case ProviderMinercraft:
if format != RawTx {
panic("MAPI doesn't support other broadcast format than RawTx")
}

for _, miner := range c.options.config.minercraftConfig.broadcastMiners {
if miner == nil {
continue
Expand All @@ -99,7 +103,7 @@ func createActiveProviders(c *Client, txID, txHex string) []txBroadcastProvider
providers = append(providers, &pvdr)
}
case ProviderBroadcastClient:
pvdr := broadcastClientProvider{txID: txID, txHex: txHex}
pvdr := broadcastClientProvider{txID: txID, txHex: txHex, format: format}
providers = append(providers, &pvdr)
default:
c.options.logger.Warn().Msg("no active provider for broadcast")
Expand Down
40 changes: 25 additions & 15 deletions chainstate/broadcast_providers.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ type mapiBroadcastProvider struct {
txID, txHex string
}

func (provider mapiBroadcastProvider) getName() string {
func (provider *mapiBroadcastProvider) getName() string {
return provider.miner.Name
}

func (provider mapiBroadcastProvider) broadcast(ctx context.Context, c *Client) error {
func (provider *mapiBroadcastProvider) broadcast(ctx context.Context, c *Client) error {
return broadcastMAPI(ctx, c, provider.miner, provider.txID, provider.txHex)
}

Expand Down Expand Up @@ -83,36 +83,46 @@ func emptyBroadcastResponseErr(txID string) error {
// BroadcastClient provider
type broadcastClientProvider struct {
txID, txHex string
format HexFormatFlag
}

func (provider broadcastClientProvider) getName() string {
func (provider *broadcastClientProvider) getName() string {
return ProviderBroadcastClient
}

// Broadcast using BroadcastClient
func (provider broadcastClientProvider) broadcast(ctx context.Context, c *Client) error {
return broadcastWithBroadcastClient(ctx, c, provider.txID, provider.txHex)
}

func broadcastWithBroadcastClient(ctx context.Context, client *Client, txID, hex string) error {
debugLog(client, txID, "executing broadcast request for "+ProviderBroadcastClient)
func (provider *broadcastClientProvider) broadcast(ctx context.Context, c *Client) error {
c.options.logger.Debug().
Str("txID", provider.txID).
Msgf("executing broadcast request for %s", provider.getName())

tx := broadcast.Transaction{
Hex: hex,
Hex: provider.txHex,
}

formatOpt := broadcast.WithRawFormat()
if provider.format.Contains(Ef) {
formatOpt = broadcast.WithEfFormat()
}

result, err := client.BroadcastClient().SubmitTransaction(
result, err := c.BroadcastClient().SubmitTransaction(
ctx,
&tx,
broadcast.WithRawFormat(),
broadcast.WithCallback(client.options.config.callbackURL, client.options.config.callbackToken),
formatOpt,
broadcast.WithCallback(c.options.config.callbackURL, c.options.config.callbackToken),
)

if err != nil {
debugLog(client, txID, "error broadcast request for "+ProviderBroadcastClient+" failed: "+err.Error())
c.options.logger.Debug().
Str("txID", provider.txID).
Msgf("error broadcast request for %s failed: %s", provider.getName(), err.Error())

return err
}

debugLog(client, txID, "result broadcast request for "+ProviderBroadcastClient+" blockhash: "+result.BlockHash+" status: "+result.TxStatus.String())
c.options.logger.Debug().
Str("txID", provider.txID).
Msgf("result broadcast request for %s blockhash: %s status: %s", provider.getName(), result.BlockHash, result.TxStatus.String())

return nil
}
8 changes: 4 additions & 4 deletions chainstate/broadcast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestClient_Broadcast(t *testing.T) {

// when
provider, err := c.Broadcast(
context.Background(), "", onChainExample1TxHex, defaultBroadcastTimeOut,
context.Background(), "", onChainExample1TxHex, RawTx, defaultBroadcastTimeOut,
)

// then
Expand All @@ -55,7 +55,7 @@ func TestClient_Broadcast(t *testing.T) {

// when
provider, err := c.Broadcast(
context.Background(), onChainExample1TxID, "", defaultBroadcastTimeOut,
context.Background(), onChainExample1TxID, "", RawTx, defaultBroadcastTimeOut,
)

// then
Expand All @@ -78,7 +78,7 @@ func TestClient_Broadcast_MAPI(t *testing.T) {

// when
providers, err := c.Broadcast(
context.Background(), broadcastExample1TxID, broadcastExample1TxHex, defaultBroadcastTimeOut,
context.Background(), broadcastExample1TxID, broadcastExample1TxHex, RawTx, defaultBroadcastTimeOut,
)

// then
Expand Down Expand Up @@ -112,7 +112,7 @@ func TestClient_Broadcast_BroadcastClient(t *testing.T) {

// when
providers, err := c.Broadcast(
context.Background(), broadcastExample1TxID, broadcastExample1TxHex, defaultBroadcastTimeOut,
context.Background(), broadcastExample1TxID, broadcastExample1TxHex, RawTx, defaultBroadcastTimeOut,
)

// then
Expand Down
32 changes: 26 additions & 6 deletions chainstate/chainstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,44 @@ import (
"time"
)

type HexFormatFlag byte

const (
RawTx HexFormatFlag = 1
Ef HexFormatFlag = 3
)

func (flag HexFormatFlag) Contains(other HexFormatFlag) bool {
return (flag & other) == other
}

// Broadcast will attempt to broadcast a transaction using the given providers
func (c *Client) Broadcast(ctx context.Context, id, txHex string, timeout time.Duration) (string, error) {
func (c *Client) SupportedBroadcastFormats() HexFormatFlag {
switch c.ActiveProvider() {
case ProviderMinercraft:
return RawTx

case ProviderBroadcastClient:
return RawTx + Ef

default:
return RawTx
}
}

func (c *Client) Broadcast(ctx context.Context, id, txHex string, format HexFormatFlag, timeout time.Duration) (string, error) {
// Basic validation
if len(id) < 50 {
return "", ErrInvalidTransactionID
} else if len(txHex) <= 0 { // todo: validate the tx hex
return "", ErrInvalidTransactionHex
}

// Debug the id and hex
c.DebugLog("tx_id: " + id)
c.DebugLog("tx_hex: " + txHex)

// Broadcast or die
successCompleteCh := make(chan string)
errorCh := make(chan string)

go c.broadcast(ctx, id, txHex, timeout, successCompleteCh, errorCh)
go c.broadcast(ctx, id, txHex, format, timeout, successCompleteCh, errorCh)

// wait for first success
success := <-successCompleteCh
Expand Down
3 changes: 2 additions & 1 deletion chainstate/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ type HTTPInterface interface {

// ChainService is the chain related methods
type ChainService interface {
Broadcast(ctx context.Context, id, txHex string, timeout time.Duration) (string, error)
SupportedBroadcastFormats() HexFormatFlag
Broadcast(ctx context.Context, id, txHex string, format HexFormatFlag, timeout time.Duration) (string, error)
QueryTransaction(
ctx context.Context, id string, requiredIn RequiredIn, timeout time.Duration,
) (*TransactionInfo, error)
Expand Down
4 changes: 2 additions & 2 deletions chainstate/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,14 @@ func (c *Client) fastestQuery(ctx context.Context, id string, requiredIn Require
}
case ProviderBroadcastClient:
wg.Add(1)
go func(ctx context.Context, client *Client, id string, requiredIn RequiredIn) {
go func(ctx context.Context, client *Client, wg *sync.WaitGroup, id string, requiredIn RequiredIn) {
defer wg.Done()
if resp, err := queryBroadcastClient(
ctx, client, id,
); err == nil && checkRequirementArc(requiredIn, id, resp) {
resultsChannel <- resp
}
}(ctxWithCancel, c, id, requiredIn)
}(ctxWithCancel, c, &wg, id, requiredIn)
default:
c.options.logger.Warn().Msg("no active provider for fastestQuery")
}
Expand Down
68 changes: 68 additions & 0 deletions ef_tx.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package bux

import (
"context"
"encoding/hex"

"github.com/libsv/go-bt/v2"
)

func ToEfHex(ctx context.Context, tx *Transaction, store TransactionGetter) (efHex string, ok bool) {
btTx := tx.parsedTx

if btTx == nil {
var err error
btTx, err = bt.NewTxFromString(tx.Hex)
if err != nil {
return "", false
}
}

needToHydrate := false
for _, input := range btTx.Inputs {
if input.PreviousTxScript == nil {
needToHydrate = true
break
}
}

if needToHydrate {
if ok := hydrate(ctx, btTx, store); !ok {
return "", false
}
}

return hex.EncodeToString(btTx.ExtendedBytes()), true
}

func hydrate(ctx context.Context, tx *bt.Tx, store TransactionGetter) (ok bool) {
txToGet := make([]string, 0, len(tx.Inputs))

for _, input := range tx.Inputs {
txToGet = append(txToGet, input.PreviousTxIDStr())
}

parentTxs, err := store.GetTransactionsByIDs(ctx, txToGet)
if err != nil {
return false
}
if len(parentTxs) != len(tx.Inputs) {
return false
}

for _, input := range tx.Inputs {
prevTxId := input.PreviousTxIDStr()
pTx := find(parentTxs, func(tx *Transaction) bool { return tx.ID == prevTxId })

pbtTx, err := bt.NewTxFromString((*pTx).Hex)
if err != nil {
return false
}

o := pbtTx.Outputs[input.PreviousTxOutIndex]
input.PreviousTxSatoshis = o.Satoshis
input.PreviousTxScript = o.LockingScript
}

return true
}
6 changes: 5 additions & 1 deletion mock_chainstate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ type chainStateEverythingInMempool struct {
chainStateBase
}

func (c *chainStateEverythingInMempool) Broadcast(context.Context, string, string, time.Duration) (string, error) {
func (c *chainStateEverythingInMempool) Broadcast(context.Context, string, string, chainstate.HexFormatFlag, time.Duration) (string, error) {
return "", nil
}

Expand Down Expand Up @@ -102,6 +102,10 @@ type chainStateEverythingOnChain struct {
chainStateEverythingInMempool
}

func (c *chainStateEverythingOnChain) SupportedBroadcastFormats() chainstate.HexFormatFlag {
return chainstate.RawTx
}

func (c *chainStateEverythingOnChain) BroadcastClient() broadcast.Client {
return nil
}
Expand Down
9 changes: 9 additions & 0 deletions model_transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,15 @@ func newTransactionWithDraftID(txHex, draftID string, opts ...ModelOps) (*Transa
return tx, nil
}

func txFromBtTx(btTx *bt.Tx, opts ...ModelOps) *Transaction {
tx := emptyTx(opts...)
tx.ID = btTx.TxID()
tx.Hex = btTx.String()
tx.parsedTx = btTx

return tx
}

// setXPubID will set the xPub ID on the model
func (m *Transaction) setXPubID() {
if len(m.rawXpubKey) > 0 && len(m.XPubID) == 0 {
Expand Down
Loading

0 comments on commit 7b28325

Please sign in to comment.