Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

feat(BUX-598): broadcast in EF if possible #584

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 9 additions & 10 deletions action_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,25 +11,24 @@ import (
"github.com/BuxOrg/bux/utils"
"github.com/bitcoin-sv/go-broadcast-client/broadcast"
"github.com/libsv/go-bc"
"github.com/libsv/go-bt"
"github.com/libsv/go-bt/v2"
"github.com/mrz1836/go-datastore"
)

// RecordTransaction will parse the transaction and save it into the Datastore
//
// Internal (known) transactions: there is a corresponding `draft_transaction` via `draft_id`
// External (known) transactions: there are output(s) related to the destination `reference_id`, tx is valid (mempool/on-chain)
// External (unknown) transactions: no reference id but some output(s) match known outputs, tx is valid (mempool/on-chain)
// Unknown transactions: no matching outputs, tx will be disregarded
//
// RecordTransaction will parse the outgoing transaction and save it into the Datastore
// xPubKey is the raw public xPub
// txHex is the raw transaction hex
// draftID is the unique draft id from a previously started New() transaction (draft_transaction.ID)
// opts are model options and can include "metadata"
func (c *Client) RecordTransaction(ctx context.Context, xPubKey, txHex, draftID string, opts ...ModelOps) (*Transaction, error) {
ctx = c.GetOrStartTxn(ctx, "record_transaction")

rts, err := getRecordTxStrategy(ctx, c, xPubKey, txHex, draftID)
tx, err := bt.NewTxFromString(txHex)
if err != nil {
return nil, fmt.Errorf("invalid hex: %w", err)
}

rts, err := getOutgoingTxRecordStrategy(xPubKey, tx, draftID)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -135,7 +134,7 @@ func (c *Client) GetTransactionByHex(ctx context.Context, hex string) (*Transact
return nil, err
}

return c.GetTransaction(ctx, "", tx.GetTxID())
return c.GetTransaction(ctx, "", tx.TxID())
}

// GetTransactions will get all the transactions from the Datastore
Expand Down
14 changes: 9 additions & 5 deletions chainstate/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ var (
//
// NOTE: if successful (in-mempool), no error will be returned
// NOTE: function register the fastest successful broadcast into 'completeChannel' so client doesn't need to wait for other providers
func (c *Client) broadcast(ctx context.Context, id, hex string, timeout time.Duration, completeChannel, errorChannel chan string) {
func (c *Client) broadcast(ctx context.Context, id, hex string, format HexFormatFlag, timeout time.Duration, completeChannel, errorChannel chan string) {
// Create a context (to cancel or timeout)
ctxWithCancel, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
Expand All @@ -55,7 +55,7 @@ func (c *Client) broadcast(ctx context.Context, id, hex string, timeout time.Dur
resultsChannel := make(chan broadcastResult)
status := newBroadcastStatus(completeChannel)

for _, broadcastProvider := range createActiveProviders(c, id, hex) {
for _, broadcastProvider := range createActiveProviders(c, id, hex, format) {
wg.Add(1)
go func(provider txBroadcastProvider) {
defer wg.Done()
Expand Down Expand Up @@ -85,11 +85,15 @@ func (c *Client) broadcast(ctx context.Context, id, hex string, timeout time.Dur
}
}

func createActiveProviders(c *Client, txID, txHex string) []txBroadcastProvider {
providers := make([]txBroadcastProvider, 0, 10)
func createActiveProviders(c *Client, txID, txHex string, format HexFormatFlag) []txBroadcastProvider {
providers := make([]txBroadcastProvider, 0)

switch c.ActiveProvider() {
case ProviderMinercraft:
if format != RawTx {
panic("MAPI doesn't support other broadcast format than RawTx")
}

for _, miner := range c.options.config.minercraftConfig.broadcastMiners {
if miner == nil {
continue
Expand All @@ -99,7 +103,7 @@ func createActiveProviders(c *Client, txID, txHex string) []txBroadcastProvider
providers = append(providers, &pvdr)
}
case ProviderBroadcastClient:
pvdr := broadcastClientProvider{txID: txID, txHex: txHex}
pvdr := broadcastClientProvider{txID: txID, txHex: txHex, format: format}
providers = append(providers, &pvdr)
default:
c.options.logger.Warn().Msg("no active provider for broadcast")
Expand Down
60 changes: 43 additions & 17 deletions chainstate/broadcast_providers.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,20 @@ type mapiBroadcastProvider struct {
txID, txHex string
}

func (provider mapiBroadcastProvider) getName() string {
func (provider *mapiBroadcastProvider) getName() string {
return provider.miner.Name
}

func (provider mapiBroadcastProvider) broadcast(ctx context.Context, c *Client) error {
func (provider *mapiBroadcastProvider) broadcast(ctx context.Context, c *Client) error {
return broadcastMAPI(ctx, c, provider.miner, provider.txID, provider.txHex)
}

// broadcastMAPI will broadcast a transaction to a miner using mAPI
func broadcastMAPI(ctx context.Context, client ClientInterface, miner *minercraft.Miner, id, hex string) error {
debugLog(client, id, "executing broadcast request in mapi using miner: "+miner.Name)
func broadcastMAPI(ctx context.Context, client *Client, miner *minercraft.Miner, id, hex string) error {
logger := client.options.logger
logger.Debug().
Str("txID", id).
Msgf("executing broadcast request in mapi using miner: %s", miner.Name)

resp, err := client.Minercraft().SubmitTransaction(ctx, miner, &minercraft.Transaction{
CallBackEncryption: "", // todo: allow customizing the payload
Expand All @@ -44,7 +47,10 @@ func broadcastMAPI(ctx context.Context, client ClientInterface, miner *minercraf
RawTx: hex,
})
if err != nil {
debugLog(client, id, "error executing request in mapi using miner: "+miner.Name+" failed: "+err.Error())
logger.Debug().
Str("txID", id).
Msgf("error executing request in mapi using miner: %s failed: %s", miner.Name, err.Error())

return err
}

Expand Down Expand Up @@ -83,36 +89,56 @@ func emptyBroadcastResponseErr(txID string) error {
// BroadcastClient provider
type broadcastClientProvider struct {
txID, txHex string
format HexFormatFlag
}

func (provider broadcastClientProvider) getName() string {
func (provider *broadcastClientProvider) getName() string {
return ProviderBroadcastClient
}

// Broadcast using BroadcastClient
func (provider broadcastClientProvider) broadcast(ctx context.Context, c *Client) error {
return broadcastWithBroadcastClient(ctx, c, provider.txID, provider.txHex)
}
func (provider *broadcastClientProvider) broadcast(ctx context.Context, c *Client) error {
logger := c.options.logger

func broadcastWithBroadcastClient(ctx context.Context, client *Client, txID, hex string) error {
debugLog(client, txID, "executing broadcast request for "+ProviderBroadcastClient)
logger.Debug().
Str("txID", provider.txID).
Msgf("executing broadcast request for %s", provider.getName())

tx := broadcast.Transaction{
Hex: hex,
Hex: provider.txHex,
}

formatOpt := broadcast.WithRawFormat()
if provider.format.Contains(Ef) {
formatOpt = broadcast.WithEfFormat()

logger.Debug().
Str("txID", provider.txID).
Msgf("boradcast with broadcast-client in Extended Format")
chris-4chain marked this conversation as resolved.
Show resolved Hide resolved
} else {
logger.Debug().
Str("txID", provider.txID).
Msgf("boradcast with broadcast-client in RawTx format")
}

result, err := client.BroadcastClient().SubmitTransaction(
result, err := c.BroadcastClient().SubmitTransaction(
ctx,
&tx,
broadcast.WithRawFormat(),
broadcast.WithCallback(client.options.config.callbackURL, client.options.config.callbackToken),
formatOpt,
broadcast.WithCallback(c.options.config.callbackURL, c.options.config.callbackToken),
)

if err != nil {
debugLog(client, txID, "error broadcast request for "+ProviderBroadcastClient+" failed: "+err.Error())
logger.Debug().
Str("txID", provider.txID).
Msgf("error broadcast request for %s failed: %s", provider.getName(), err.Error())

return err
}

debugLog(client, txID, "result broadcast request for "+ProviderBroadcastClient+" blockhash: "+result.BlockHash+" status: "+result.TxStatus.String())
logger.Debug().
Str("txID", provider.txID).
Msgf("result broadcast request for %s blockhash: %s status: %s", provider.getName(), result.BlockHash, result.TxStatus.String())

return nil
}
8 changes: 4 additions & 4 deletions chainstate/broadcast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestClient_Broadcast(t *testing.T) {

// when
provider, err := c.Broadcast(
context.Background(), "", onChainExample1TxHex, defaultBroadcastTimeOut,
context.Background(), "", onChainExample1TxHex, RawTx, defaultBroadcastTimeOut,
)

// then
Expand All @@ -55,7 +55,7 @@ func TestClient_Broadcast(t *testing.T) {

// when
provider, err := c.Broadcast(
context.Background(), onChainExample1TxID, "", defaultBroadcastTimeOut,
context.Background(), onChainExample1TxID, "", RawTx, defaultBroadcastTimeOut,
)

// then
Expand All @@ -78,7 +78,7 @@ func TestClient_Broadcast_MAPI(t *testing.T) {

// when
providers, err := c.Broadcast(
context.Background(), broadcastExample1TxID, broadcastExample1TxHex, defaultBroadcastTimeOut,
context.Background(), broadcastExample1TxID, broadcastExample1TxHex, RawTx, defaultBroadcastTimeOut,
)

// then
Expand Down Expand Up @@ -112,7 +112,7 @@ func TestClient_Broadcast_BroadcastClient(t *testing.T) {

// when
providers, err := c.Broadcast(
context.Background(), broadcastExample1TxID, broadcastExample1TxHex, defaultBroadcastTimeOut,
context.Background(), broadcastExample1TxID, broadcastExample1TxHex, RawTx, defaultBroadcastTimeOut,
)

// then
Expand Down
32 changes: 26 additions & 6 deletions chainstate/chainstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,44 @@ import (
"time"
)

type HexFormatFlag byte
chris-4chain marked this conversation as resolved.
Show resolved Hide resolved

const (
RawTx HexFormatFlag = 1
arkadiuszos4chain marked this conversation as resolved.
Show resolved Hide resolved
Ef HexFormatFlag = 2
)
chris-4chain marked this conversation as resolved.
Show resolved Hide resolved

func (flag HexFormatFlag) Contains(other HexFormatFlag) bool {
return (flag & other) == other
}

// Broadcast will attempt to broadcast a transaction using the given providers
func (c *Client) Broadcast(ctx context.Context, id, txHex string, timeout time.Duration) (string, error) {
func (c *Client) SupportedBroadcastFormats() HexFormatFlag {
chris-4chain marked this conversation as resolved.
Show resolved Hide resolved
switch c.ActiveProvider() {
case ProviderMinercraft:
return RawTx

case ProviderBroadcastClient:
return RawTx | Ef

default:
return RawTx
}
}

func (c *Client) Broadcast(ctx context.Context, id, txHex string, format HexFormatFlag, timeout time.Duration) (string, error) {
chris-4chain marked this conversation as resolved.
Show resolved Hide resolved
// Basic validation
if len(id) < 50 {
return "", ErrInvalidTransactionID
} else if len(txHex) <= 0 { // todo: validate the tx hex
return "", ErrInvalidTransactionHex
}

// Debug the id and hex
c.DebugLog("tx_id: " + id)
c.DebugLog("tx_hex: " + txHex)

// Broadcast or die
successCompleteCh := make(chan string)
errorCh := make(chan string)

go c.broadcast(ctx, id, txHex, timeout, successCompleteCh, errorCh)
go c.broadcast(ctx, id, txHex, format, timeout, successCompleteCh, errorCh)

// wait for first success
success := <-successCompleteCh
Expand Down
3 changes: 2 additions & 1 deletion chainstate/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ type HTTPInterface interface {

// ChainService is the chain related methods
type ChainService interface {
Broadcast(ctx context.Context, id, txHex string, timeout time.Duration) (string, error)
SupportedBroadcastFormats() HexFormatFlag
Broadcast(ctx context.Context, id, txHex string, format HexFormatFlag, timeout time.Duration) (string, error)
QueryTransaction(
ctx context.Context, id string, requiredIn RequiredIn, timeout time.Duration,
) (*TransactionInfo, error)
Expand Down
4 changes: 2 additions & 2 deletions chainstate/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,14 @@ func (c *Client) fastestQuery(ctx context.Context, id string, requiredIn Require
}
case ProviderBroadcastClient:
wg.Add(1)
go func(ctx context.Context, client *Client, id string, requiredIn RequiredIn) {
go func(ctx context.Context, client *Client, wg *sync.WaitGroup, id string, requiredIn RequiredIn) {
defer wg.Done()
if resp, err := queryBroadcastClient(
ctx, client, id,
); err == nil && checkRequirementArc(requiredIn, id, resp) {
resultsChannel <- resp
}
}(ctxWithCancel, c, id, requiredIn)
}(ctxWithCancel, c, &wg, id, requiredIn)
default:
c.options.logger.Warn().Msg("no active provider for fastestQuery")
}
Expand Down
68 changes: 68 additions & 0 deletions ef_tx.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package bux

import (
"context"
"encoding/hex"

"github.com/libsv/go-bt/v2"
)

func ToEfHex(ctx context.Context, tx *Transaction, store TransactionGetter) (efHex string, ok bool) {
chris-4chain marked this conversation as resolved.
Show resolved Hide resolved
btTx := tx.parsedTx

if btTx == nil {
var err error
btTx, err = bt.NewTxFromString(tx.Hex)
if err != nil {
return "", false
}
}

needToHydrate := false
for _, input := range btTx.Inputs {
if input.PreviousTxScript == nil {
needToHydrate = true
break
}
}

if needToHydrate {
if ok := hydrate(ctx, btTx, store); !ok {
return "", false
}
}

return hex.EncodeToString(btTx.ExtendedBytes()), true
}

func hydrate(ctx context.Context, tx *bt.Tx, store TransactionGetter) (ok bool) {
txToGet := make([]string, 0, len(tx.Inputs))

for _, input := range tx.Inputs {
txToGet = append(txToGet, input.PreviousTxIDStr())
}

parentTxs, err := store.GetTransactionsByIDs(ctx, txToGet)
if err != nil {
return false
}
arkadiuszos4chain marked this conversation as resolved.
Show resolved Hide resolved
if len(parentTxs) != len(tx.Inputs) {
return false
}

for _, input := range tx.Inputs {
prevTxId := input.PreviousTxIDStr()
chris-4chain marked this conversation as resolved.
Show resolved Hide resolved
pTx := find(parentTxs, func(tx *Transaction) bool { return tx.ID == prevTxId })

pbtTx, err := bt.NewTxFromString((*pTx).Hex)
if err != nil {
return false
}

o := pbtTx.Outputs[input.PreviousTxOutIndex]
input.PreviousTxSatoshis = o.Satoshis
input.PreviousTxScript = o.LockingScript
}

return true
}
6 changes: 5 additions & 1 deletion mock_chainstate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ type chainStateEverythingInMempool struct {
chainStateBase
}

func (c *chainStateEverythingInMempool) Broadcast(context.Context, string, string, time.Duration) (string, error) {
func (c *chainStateEverythingInMempool) Broadcast(context.Context, string, string, chainstate.HexFormatFlag, time.Duration) (string, error) {
return "", nil
}

Expand Down Expand Up @@ -102,6 +102,10 @@ type chainStateEverythingOnChain struct {
chainStateEverythingInMempool
}

func (c *chainStateEverythingOnChain) SupportedBroadcastFormats() chainstate.HexFormatFlag {
return chainstate.RawTx
}

func (c *chainStateEverythingOnChain) BroadcastClient() broadcast.Client {
return nil
}
Expand Down
Loading
Loading