From 26274f4b851a08fe7c7311354106b69a8dd58099 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20B=C3=B6ckli?= Date: Mon, 24 Jun 2024 10:16:58 +0200 Subject: [PATCH 01/14] Log time instead of string --- internal/metamorph/processor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/metamorph/processor.go b/internal/metamorph/processor.go index d7a84fb9a..bd8a8647b 100644 --- a/internal/metamorph/processor.go +++ b/internal/metamorph/processor.go @@ -416,7 +416,7 @@ func (p *Processor) StartProcessExpiredTransactions() { } if announced > 0 || requested > 0 { - p.logger.Info("Retried unmined transactions", slog.Int("announced", announced), slog.Int("requested", requested), slog.String("since", getUnminedSince.String())) + p.logger.Info("Retried unmined transactions", slog.Int("announced", announced), slog.Int("requested", requested), slog.Time("since", getUnminedSince)) } } } From 3caa50c4db675bb5ad49b94609e7f83d32a3b377 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20B=C3=B6ckli?= Date: Fri, 21 Jun 2024 12:54:03 +0200 Subject: [PATCH 02/14] Refactor utxo consolidator and utxo creator into separate structs --- .../app/utxos/consolidate/consolidate.go | 41 ++- .../app/utxos/create/create.go | 39 ++- internal/broadcaster/broadcaster.go | 135 +++++++++ internal/broadcaster/utxo_consolidator.go | 211 ++++++++++++++ internal/broadcaster/utxo_creator.go | 264 ++++++++++++++++++ 5 files changed, 647 insertions(+), 43 deletions(-) create mode 100644 internal/broadcaster/broadcaster.go create mode 100644 internal/broadcaster/utxo_consolidator.go create mode 100644 internal/broadcaster/utxo_creator.go diff --git a/cmd/broadcaster-cli/app/utxos/consolidate/consolidate.go b/cmd/broadcaster-cli/app/utxos/consolidate/consolidate.go index 996dee61f..5a35927c2 100644 --- a/cmd/broadcaster-cli/app/utxos/consolidate/consolidate.go +++ b/cmd/broadcaster-cli/app/utxos/consolidate/consolidate.go @@ -1,17 +1,15 @@ package consolidate import ( - "context" "errors" "fmt" - "log/slog" - "strings" - "time" "github.com/bitcoin-sv/arc/cmd/broadcaster-cli/helper" "github.com/bitcoin-sv/arc/internal/broadcaster" "github.com/bitcoin-sv/arc/internal/woc_client" + "github.com/bitcoin-sv/arc/pkg/keyset" "github.com/spf13/cobra" + "strings" ) var Cmd = &cobra.Command{ @@ -85,32 +83,31 @@ var Cmd = &cobra.Command{ keyFiles := strings.Split(keyFile, ",") - for _, kf := range keyFiles { + fundingKeySets := make([]*keyset.KeySet, len(keyFiles)) - if wocApiKey == "" { - time.Sleep(1 * time.Second) - } + for i, kf := range keyFiles { fundingKeySet, _, err := helper.GetKeySetsKeyFile(kf) if err != nil { return fmt.Errorf("failed to get key sets: %v", err) } - rateBroadcaster, err := broadcaster.NewRateBroadcaster(logger, client, fundingKeySet, wocClient, - broadcaster.WithFees(miningFeeSat), - broadcaster.WithIsTestnet(isTestnet), - broadcaster.WithCallback(callbackURL, callbackToken), - broadcaster.WithFullstatusUpdates(fullStatusUpdates), - ) - if err != nil { - return fmt.Errorf("failed to create broadcaster: %v", err) - } + fundingKeySets[i] = fundingKeySet + } - logger.Info("consolidating utxos", slog.String("key", kf), slog.String("address", fundingKeySet.Address(!isTestnet))) - err = rateBroadcaster.Consolidate(context.Background()) - if err != nil { - return fmt.Errorf("failed to consolidate utxos: %v", err) - } + rateBroadcaster, err := broadcaster.NewUTXOConsolidator(logger, client, fundingKeySets, wocClient, + broadcaster.WithFees(miningFeeSat), + broadcaster.WithIsTestnet(isTestnet), + broadcaster.WithCallback(callbackURL, callbackToken), + broadcaster.WithFullstatusUpdates(fullStatusUpdates), + ) + if err != nil { + return fmt.Errorf("failed to create broadcaster: %v", err) + } + + err = rateBroadcaster.Consolidate() + if err != nil { + return fmt.Errorf("failed to consolidate utxos: %v", err) } return nil diff --git a/cmd/broadcaster-cli/app/utxos/create/create.go b/cmd/broadcaster-cli/app/utxos/create/create.go index 70ebe490a..6ef6d78dc 100644 --- a/cmd/broadcaster-cli/app/utxos/create/create.go +++ b/cmd/broadcaster-cli/app/utxos/create/create.go @@ -1,17 +1,15 @@ package create import ( - "context" "errors" "fmt" "log" - "log/slog" "strings" - "time" "github.com/bitcoin-sv/arc/cmd/broadcaster-cli/helper" "github.com/bitcoin-sv/arc/internal/broadcaster" "github.com/bitcoin-sv/arc/internal/woc_client" + "github.com/bitcoin-sv/arc/pkg/keyset" "github.com/spf13/cobra" "github.com/spf13/viper" ) @@ -103,32 +101,31 @@ var Cmd = &cobra.Command{ keyFiles := strings.Split(keyFile, ",") - for _, kf := range keyFiles { + fundingKeySets := make([]*keyset.KeySet, len(keyFiles)) - if wocApiKey == "" { - time.Sleep(1 * time.Second) - } + for i, kf := range keyFiles { fundingKeySet, _, err := helper.GetKeySetsKeyFile(kf) if err != nil { return fmt.Errorf("failed to get key sets: %v", err) } - rateBroadcaster, err := broadcaster.NewRateBroadcaster(logger, client, fundingKeySet, wocClient, - broadcaster.WithFees(miningFeeSat), - broadcaster.WithIsTestnet(isTestnet), - broadcaster.WithCallback(callbackURL, callbackToken), - broadcaster.WithFullstatusUpdates(fullStatusUpdates), - ) - if err != nil { - return fmt.Errorf("failed to create broadcaster: %v", err) - } + fundingKeySets[i] = fundingKeySet + } - logger.Info("creating utxos", slog.String("key", kf), slog.String("address", fundingKeySet.Address(!isTestnet))) - err = rateBroadcaster.CreateUtxos(context.Background(), outputs, satoshisPerOutput) - if err != nil { - return fmt.Errorf("failed to create utxos: %v", err) - } + rateBroadcaster, err := broadcaster.NewUTXOCreator(logger, client, fundingKeySets, wocClient, + broadcaster.WithFees(miningFeeSat), + broadcaster.WithIsTestnet(isTestnet), + broadcaster.WithCallback(callbackURL, callbackToken), + broadcaster.WithFullstatusUpdates(fullStatusUpdates), + ) + if err != nil { + return fmt.Errorf("failed to create broadcaster: %v", err) + } + + err = rateBroadcaster.CreateUtxos(outputs, satoshisPerOutput) + if err != nil { + return fmt.Errorf("failed to create utxos: %v", err) } return nil diff --git a/internal/broadcaster/broadcaster.go b/internal/broadcaster/broadcaster.go new file mode 100644 index 000000000..3310dd0da --- /dev/null +++ b/internal/broadcaster/broadcaster.go @@ -0,0 +1,135 @@ +package broadcaster + +import ( + "context" + "github.com/bitcoin-sv/arc/pkg/keyset" + "github.com/libsv/go-bt/v2" + "log/slog" + "math" +) + +const ( + maxInputsDefault = 100 + batchSizeDefault = 20 + isTestnetDefault = true + millisecondsPerSecond = 1000 +) + +type Broadcaster struct { + logger *slog.Logger + client ArcClient + keySets []*keyset.KeySet + isTestnet bool + feeQuote *bt.FeeQuote + utxoClient UtxoClient + standardMiningFee bt.FeeUnit + callbackURL string + callbackToken string + fullStatusUpdates bool + cancelAll context.CancelFunc + ctx context.Context + maxInputs int + batchSize int +} + +func WithIsTestnet(isTestnet bool) func(broadcaster *Broadcaster) { + return func(broadcaster *Broadcaster) { + broadcaster.isTestnet = isTestnet + } +} + +func WithBatchSize(batchSize int) func(broadcaster *Broadcaster) { + return func(broadcaster *Broadcaster) { + broadcaster.batchSize = batchSize + } +} + +func WithMaxInputs(maxInputs int) func(broadcaster *Broadcaster) { + return func(broadcaster *Broadcaster) { + broadcaster.maxInputs = maxInputs + } +} + +func WithCallback(callbackURL string, callbackToken string) func(broadcaster *Broadcaster) { + return func(broadcaster *Broadcaster) { + broadcaster.callbackURL = callbackURL + broadcaster.callbackToken = callbackToken + } +} + +func WithFullstatusUpdates(fullStatusUpdates bool) func(broadcaster *Broadcaster) { + return func(broadcaster *Broadcaster) { + broadcaster.fullStatusUpdates = fullStatusUpdates + } +} + +func WithFees(miningFeeSatPerKb int) func(broadcaster *Broadcaster) { + return func(broadcaster *Broadcaster) { + var fq = bt.NewFeeQuote() + + newStdFee := *stdFeeDefault + newDataFee := *dataFeeDefault + + newStdFee.MiningFee.Satoshis = miningFeeSatPerKb + newDataFee.MiningFee.Satoshis = miningFeeSatPerKb + + fq.AddQuote(bt.FeeTypeData, &newStdFee) + fq.AddQuote(bt.FeeTypeStandard, &newDataFee) + + broadcaster.feeQuote = fq + } +} + +func NewBroadcaster(logger *slog.Logger, client ArcClient, fromKeySet []*keyset.KeySet, utxoClient UtxoClient, opts ...func(p *Broadcaster)) (*Broadcaster, error) { + + b := &Broadcaster{ + logger: logger, + client: client, + keySets: fromKeySet, + isTestnet: isTestnetDefault, + batchSize: batchSizeDefault, + maxInputs: maxInputsDefault, + feeQuote: bt.NewFeeQuote(), + utxoClient: utxoClient, + } + + standardFee, err := b.feeQuote.Fee(bt.FeeTypeStandard) + if err != nil { + return nil, err + } + + b.standardMiningFee = standardFee.MiningFee + + for _, opt := range opts { + opt(b) + } + + ctx, cancelAll := context.WithCancel(context.Background()) + b.cancelAll = cancelAll + b.ctx = ctx + + return b, nil +} + +func (b *Broadcaster) calculateFeeSat(tx *bt.Tx) uint64 { + size, err := tx.EstimateSizeWithTypes() + if err != nil { + return 0 + } + varIntUpper := bt.VarInt(tx.OutputCount()).UpperLimitInc() + if varIntUpper == -1 { + return 0 + } + + changeOutputFee := varIntUpper + changeP2pkhByteLen := uint64(8 + 1 + 25) + + totalBytes := size.TotalStdBytes + changeP2pkhByteLen + + miningFeeSat := float64(totalBytes*uint64(b.standardMiningFee.Satoshis)) / float64(b.standardMiningFee.Bytes) + + sFees := uint64(math.Ceil(miningFeeSat)) + txFees := sFees + uint64(changeOutputFee) + + return txFees +} diff --git a/internal/broadcaster/utxo_consolidator.go b/internal/broadcaster/utxo_consolidator.go new file mode 100644 index 000000000..93b7c2923 --- /dev/null +++ b/internal/broadcaster/utxo_consolidator.go @@ -0,0 +1,211 @@ +package broadcaster + +import ( + "container/list" + "context" + "encoding/hex" + "errors" + "fmt" + "log/slog" + "math" + "time" + + "github.com/bitcoin-sv/arc/pkg/keyset" + "github.com/bitcoin-sv/arc/pkg/metamorph/metamorph_api" + "github.com/libsv/go-bt/v2" + "github.com/libsv/go-bt/v2/unlocker" +) + +type UTXOConsolidator struct { + Broadcaster +} + +func NewUTXOConsolidator(logger *slog.Logger, client ArcClient, keySets []*keyset.KeySet, utxoClient UtxoClient, opts ...func(p *Broadcaster)) (*UTXOConsolidator, error) { + + b, err := NewBroadcaster(logger, client, keySets, utxoClient, opts...) + if err != nil { + return nil, err + } + + consolidator := &UTXOConsolidator{ + Broadcaster: *b, + } + + return consolidator, nil +} + +func (b *UTXOConsolidator) Consolidate() error { + for _, ks := range b.keySets { + b.logger.Info("consolidating utxos", slog.String("address", ks.Address(!b.isTestnet))) + _, unconfirmed, err := b.utxoClient.GetBalanceWithRetries(b.ctx, !b.isTestnet, ks.Address(!b.isTestnet), 1*time.Second, 5) + if err != nil { + return err + } + if math.Abs(float64(unconfirmed)) > 0 { + return fmt.Errorf("key with address %s balance has unconfirmed amount %d sat", ks.Address(!b.isTestnet), unconfirmed) + } + + utxoSet, err := b.utxoClient.GetUTXOsListWithRetries(b.ctx, !b.isTestnet, ks.Script, ks.Address(!b.isTestnet), 1*time.Second, 5) + if err != nil { + return fmt.Errorf("failed to get utxos: %v", err) + } + + if utxoSet.Len() == 1 { + b.logger.Info("utxos already consolidated") + return nil + } + + satoshiMap := map[string]uint64{} + lastUtxoSetLen := 100_000_000 + + for { + if lastUtxoSetLen <= utxoSet.Len() { + b.logger.Error("utxo set length hasn't changed since last iteration") + break + } + lastUtxoSetLen = utxoSet.Len() + + // if requested outputs satisfied, return + if utxoSet.Len() == 1 { + break + } + + b.logger.Info("consolidating outputs", slog.Int("remaining", utxoSet.Len())) + + consolidationTxsBatches, err := b.createConsolidationTxs(utxoSet, satoshiMap, ks) + if err != nil { + return fmt.Errorf("failed to create consolidation txs: %v", err) + } + + for i, batch := range consolidationTxsBatches { + time.Sleep(100 * time.Millisecond) // do not performance test ARC + + nrOutputs := 0 + nrInputs := 0 + for _, txBatch := range batch { + nrOutputs += len(txBatch.Outputs) + nrInputs += len(txBatch.Inputs) + } + + b.logger.Info(fmt.Sprintf("broadcasting consolidation batch %d/%d", i+1, len(consolidationTxsBatches)), slog.Int("size", len(batch)), slog.Int("inputs", nrInputs), slog.Int("outputs", nrOutputs)) + + resp, err := b.client.BroadcastTransactions(context.Background(), batch, metamorph_api.Status_SEEN_ON_NETWORK, b.callbackURL, b.callbackToken, b.fullStatusUpdates, false) + if err != nil { + return fmt.Errorf("failed to broadcast consolidation txs: %v", err) + } + + for _, res := range resp { + if res.Status == metamorph_api.Status_REJECTED || res.Status == metamorph_api.Status_SEEN_IN_ORPHAN_MEMPOOL { + b.logger.Error("consolidation tx was not successful", slog.String("status", res.Status.String()), slog.String("hash", res.Txid), slog.String("reason", res.RejectReason)) + for _, tx := range batch { + if tx.TxID() == res.Txid { + b.logger.Debug(tx.String()) + break + } + } + continue + } + + txIDBytes, err := hex.DecodeString(res.Txid) + if err != nil { + b.logger.Error("failed to decode txid", slog.String("err", err.Error())) + continue + } + + newUtxo := &bt.UTXO{ + TxID: txIDBytes, + Vout: 0, + LockingScript: ks.Script, + Satoshis: satoshiMap[res.Txid], + } + + delete(satoshiMap, res.Txid) + + utxoSet.PushBack(newUtxo) + } + } + } + } + return nil +} + +func (b *UTXOConsolidator) createConsolidationTxs(utxoSet *list.List, satoshiMap map[string]uint64, fundingKeySet *keyset.KeySet) ([][]*bt.Tx, error) { + tx := bt.NewTx() + txSatoshis := uint64(0) + txsConsolidationBatches := make([][]*bt.Tx, 0) + txsConsolidation := make([]*bt.Tx, 0) + const consolidateBatchSize = 20 + + var next *list.Element + for front := utxoSet.Front(); front != nil; front = next { + next = front.Next() + utxoSet.Remove(front) + utxo, ok := front.Value.(*bt.UTXO) + if !ok { + return nil, errors.New("failed to parse value to utxo") + } + + txSatoshis += utxo.Satoshis + if next == nil { + if len(tx.Inputs) > 0 { + err := tx.FromUTXOs(utxo) + if err != nil { + return nil, err + } + + err = b.consolidateToFundingKeyset(tx, txSatoshis, fundingKeySet) + if err != nil { + return nil, err + } + + txsConsolidation = append(txsConsolidation, tx) + satoshiMap[tx.TxID()] = tx.TotalOutputSatoshis() + } + + if len(txsConsolidation) > 0 { + txsConsolidationBatches = append(txsConsolidationBatches, txsConsolidation) + } + break + } + + err := tx.FromUTXOs(utxo) + if err != nil { + return nil, err + } + + if len(tx.Inputs) >= b.maxInputs { + err = b.consolidateToFundingKeyset(tx, txSatoshis, fundingKeySet) + if err != nil { + return nil, err + } + + txsConsolidation = append(txsConsolidation, tx) + + satoshiMap[tx.TxID()] = tx.TotalOutputSatoshis() + tx = bt.NewTx() + txSatoshis = 0 + } + + if len(txsConsolidation) >= consolidateBatchSize { + txsConsolidationBatches = append(txsConsolidationBatches, txsConsolidation) + txsConsolidation = make([]*bt.Tx, 0) + } + + } + + return txsConsolidationBatches, nil +} + +func (b *UTXOConsolidator) consolidateToFundingKeyset(tx *bt.Tx, txSatoshis uint64, fundingKeySet *keyset.KeySet) error { + fee := b.calculateFeeSat(tx) + err := tx.PayTo(fundingKeySet.Script, txSatoshis-fee) + if err != nil { + return err + } + unlockerGetter := unlocker.Getter{PrivateKey: fundingKeySet.PrivateKey} + err = tx.FillAllInputs(context.Background(), &unlockerGetter) + if err != nil { + return err + } + return nil +} diff --git a/internal/broadcaster/utxo_creator.go b/internal/broadcaster/utxo_creator.go new file mode 100644 index 000000000..18bc2136c --- /dev/null +++ b/internal/broadcaster/utxo_creator.go @@ -0,0 +1,264 @@ +package broadcaster + +import ( + "container/list" + "context" + "encoding/hex" + "errors" + "fmt" + "log/slog" + "time" + + "github.com/bitcoin-sv/arc/pkg/keyset" + "github.com/bitcoin-sv/arc/pkg/metamorph/metamorph_api" + "github.com/libsv/go-bt/v2" + "github.com/libsv/go-bt/v2/unlocker" +) + +type UTXOCreator struct { + Broadcaster +} + +func NewUTXOCreator(logger *slog.Logger, client ArcClient, keySets []*keyset.KeySet, utxoClient UtxoClient, opts ...func(p *Broadcaster)) (*UTXOCreator, error) { + b, err := NewBroadcaster(logger, client, keySets, utxoClient, opts...) + if err != nil { + return nil, err + } + + creator := &UTXOCreator{ + Broadcaster: *b, + } + + return creator, nil +} + +func (b *UTXOCreator) CreateUtxos(requestedOutputs int, requestedSatoshisPerOutput uint64) error { + for _, ks := range b.keySets { + b.logger.Info("creating utxos", slog.String("address", ks.Address(!b.isTestnet))) + + requestedOutputsSatoshis := int64(requestedOutputs) * int64(requestedSatoshisPerOutput) + + confirmed, unconfirmed, err := b.utxoClient.GetBalanceWithRetries(b.ctx, !b.isTestnet, ks.Address(!b.isTestnet), 1*time.Second, 5) + if err != nil { + return err + } + + if unconfirmed > 0 { + return fmt.Errorf("total balance not confirmed yet") + } + + balance := confirmed + unconfirmed + + if requestedOutputsSatoshis > balance { + return fmt.Errorf("requested total of satoshis %d exceeds balance on funding keyset %d", requestedOutputsSatoshis, balance) + } + + utxos, err := b.utxoClient.GetUTXOsWithRetries(b.ctx, !b.isTestnet, ks.Script, ks.Address(!b.isTestnet), 1*time.Second, 5) + if err != nil { + return err + } + + utxoSet := list.New() + for _, utxo := range utxos { + // collect right sized utxos + if utxo.Satoshis >= requestedSatoshisPerOutput { + utxoSet.PushBack(utxo) + continue + } + } + + // if requested outputs satisfied, return + if utxoSet.Len() >= requestedOutputs { + b.logger.Info("utxo set", slog.Int("ready", utxoSet.Len()), slog.Int("requested", requestedOutputs), slog.Uint64("satoshis", requestedSatoshisPerOutput)) + return nil + } + + satoshiMap := map[string][]splittingOutput{} + lastUtxoSetLen := 0 + + // if requested outputs not satisfied, create them + for { + if lastUtxoSetLen >= utxoSet.Len() { + b.logger.Error("utxo set length hasn't changed since last iteration") + break + } + lastUtxoSetLen = utxoSet.Len() + + // if requested outputs satisfied, return + if utxoSet.Len() >= requestedOutputs { + break + } + + b.logger.Info("splitting outputs", slog.Int("ready", utxoSet.Len()), slog.Int("requested", requestedOutputs), slog.Uint64("satoshis", requestedSatoshisPerOutput)) + + // create splitting txs + txsSplitBatches, err := b.splitOutputs(requestedOutputs, requestedSatoshisPerOutput, utxoSet, satoshiMap, ks) + if err != nil { + return err + } + + for i, batch := range txsSplitBatches { + nrOutputs := 0 + nrInputs := 0 + for _, txBatch := range batch { + nrOutputs += len(txBatch.Outputs) + nrInputs += len(txBatch.Inputs) + } + + b.logger.Info(fmt.Sprintf("broadcasting splitting batch %d/%d", i+1, len(txsSplitBatches)), slog.Int("size", len(batch)), slog.Int("inputs", nrInputs), slog.Int("outputs", nrOutputs)) + + resp, err := b.client.BroadcastTransactions(context.Background(), batch, metamorph_api.Status_SEEN_ON_NETWORK, b.callbackURL, b.callbackToken, b.fullStatusUpdates, false) + if err != nil { + return fmt.Errorf("failed to braodcast tx: %v", err) + } + + for _, res := range resp { + if res.Status == metamorph_api.Status_REJECTED || res.Status == metamorph_api.Status_SEEN_IN_ORPHAN_MEMPOOL { + b.logger.Error("splitting tx was not successful", slog.String("status", res.Status.String()), slog.String("hash", res.Txid), slog.String("reason", res.RejectReason)) + for _, tx := range batch { + if tx.TxID() == res.Txid { + b.logger.Debug(tx.String()) + break + } + } + continue + } + + txIDBytes, err := hex.DecodeString(res.Txid) + if err != nil { + b.logger.Error("failed to decode txid", slog.String("err", err.Error())) + continue + } + + foundOutputs, found := satoshiMap[res.Txid] + if !found { + b.logger.Error("output not found", slog.String("hash", res.Txid)) + continue + } + + for _, foundOutput := range foundOutputs { + newUtxo := &bt.UTXO{ + TxID: txIDBytes, + Vout: foundOutput.vout, + LockingScript: ks.Script, + Satoshis: foundOutput.satoshis, + } + + utxoSet.PushBack(newUtxo) + } + delete(satoshiMap, res.Txid) + } + + // do not performance test ARC when creating the utxos + time.Sleep(100 * time.Millisecond) + } + } + + b.logger.Info("utxo set", slog.Int("ready", utxoSet.Len()), slog.Int("requested", requestedOutputs), slog.Uint64("satoshis", requestedSatoshisPerOutput)) + } + return nil +} + +func (b *UTXOCreator) splitOutputs(requestedOutputs int, requestedSatoshisPerOutput uint64, utxoSet *list.List, satoshiMap map[string][]splittingOutput, fundingKeySet *keyset.KeySet) ([][]*bt.Tx, error) { + txsSplitBatches := make([][]*bt.Tx, 0) + txsSplit := make([]*bt.Tx, 0) + outputs := utxoSet.Len() + var err error + + var next *list.Element + for front := utxoSet.Front(); front != nil; front = next { + next = front.Next() + + if outputs >= requestedOutputs { + break + } + + utxo, ok := front.Value.(*bt.UTXO) + if !ok { + return nil, errors.New("failed to parse value to utxo") + } + + tx := bt.NewTx() + err = tx.FromUTXOs(utxo) + if err != nil { + return nil, err + } + + // only split if splitting increases nr of outputs + const feeMargin = 50 + if utxo.Satoshis < 2*requestedSatoshisPerOutput+feeMargin { + continue + } + + addedOutputs, err := b.splitToFundingKeyset(tx, utxo.Satoshis, requestedSatoshisPerOutput, requestedOutputs-outputs, fundingKeySet) + if err != nil { + return nil, err + } + utxoSet.Remove(front) + + outputs += addedOutputs + + txsSplit = append(txsSplit, tx) + + txOutputs := make([]splittingOutput, len(tx.Outputs)) + for i, txOutput := range tx.Outputs { + txOutputs[i] = splittingOutput{satoshis: txOutput.Satoshis, vout: uint32(i)} + } + + satoshiMap[tx.TxID()] = txOutputs + + if len(txsSplit) == b.batchSize { + txsSplitBatches = append(txsSplitBatches, txsSplit) + txsSplit = make([]*bt.Tx, 0) + } + } + + if len(txsSplit) > 0 { + txsSplitBatches = append(txsSplitBatches, txsSplit) + } + return txsSplitBatches, nil +} + +func (b *UTXOCreator) splitToFundingKeyset(tx *bt.Tx, splitSatoshis uint64, requestedSatoshis uint64, requestedOutputs int, fundingKeySet *keyset.KeySet) (addedOutputs int, err error) { + + if requestedSatoshis > splitSatoshis { + return 0, fmt.Errorf("requested satoshis %d greater than satoshis to be split %d", requestedSatoshis, splitSatoshis) + } + + counter := 0 + + remaining := int64(splitSatoshis) + for remaining > int64(requestedSatoshis) && counter < requestedOutputs { + if uint64(remaining)-requestedSatoshis < b.calculateFeeSat(tx) { + break + } + + err := tx.PayTo(fundingKeySet.Script, requestedSatoshis) + if err != nil { + return 0, err + } + + remaining -= int64(requestedSatoshis) + counter++ + + } + + fee := b.calculateFeeSat(tx) + err = tx.PayTo(fundingKeySet.Script, uint64(remaining)-fee) + if err != nil { + return 0, err + } + + unlockerGetter := unlocker.Getter{PrivateKey: fundingKeySet.PrivateKey} + err = tx.FillAllInputs(context.Background(), &unlockerGetter) + if err != nil { + return 0, err + } + + return counter, nil +} + +type splittingOutput struct { + satoshis uint64 + vout uint32 +} From 134863ac0d6dfe7fa2eceb85fabae11d5f70524d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20B=C3=B6ckli?= Date: Fri, 21 Jun 2024 13:06:28 +0200 Subject: [PATCH 03/14] Refactor rate broadcaster --- .../app/utxos/broadcast/broadcast.go | 79 +- internal/broadcaster/rate_broadcaster.go | 788 +++--------------- 2 files changed, 135 insertions(+), 732 deletions(-) diff --git a/cmd/broadcaster-cli/app/utxos/broadcast/broadcast.go b/cmd/broadcaster-cli/app/utxos/broadcast/broadcast.go index 5221944f3..dcb8b6ebc 100644 --- a/cmd/broadcaster-cli/app/utxos/broadcast/broadcast.go +++ b/cmd/broadcaster-cli/app/utxos/broadcast/broadcast.go @@ -1,21 +1,17 @@ package broadcast import ( - "context" "errors" "fmt" - "io" "log" "os" "os/signal" - "path/filepath" "strings" - "sync" - "time" "github.com/bitcoin-sv/arc/cmd/broadcaster-cli/helper" "github.com/bitcoin-sv/arc/internal/broadcaster" "github.com/bitcoin-sv/arc/internal/woc_client" + "github.com/bitcoin-sv/arc/pkg/keyset" "github.com/spf13/cobra" "github.com/spf13/viper" ) @@ -46,11 +42,6 @@ var Cmd = &cobra.Command{ return err } - store, err := helper.GetBool("store") - if err != nil { - return err - } - fullStatusUpdates, err := helper.GetBool("fullStatusUpdates") if err != nil { return err @@ -116,63 +107,27 @@ var Cmd = &cobra.Command{ return fmt.Errorf("failed to create client: %v", err) } - rbs := make([]*broadcaster.RateBroadcaster, len(keyFiles)) - - wg := &sync.WaitGroup{} - - var resultsPath string - if store { - network := "mainnet" - if isTestnet { - network = "testnet" - } - resultsPath = filepath.Join(".", fmt.Sprintf("results/%s-%s-rate-%d-batchsize-%d", network, time.Now().Format(time.DateTime), rateTxsPerSecond, batchSize)) - err := os.MkdirAll(resultsPath, os.ModePerm) - if err != nil { - return err - } - } - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - + fundingKeySets := make([]*keyset.KeySet, len(keyFiles)) for i, kf := range keyFiles { - wg.Add(1) - fundingKeySet, _, err := helper.GetKeySetsKeyFile(kf) if err != nil { return fmt.Errorf("failed to get key sets: %v", err) } - wocClient := woc_client.New(woc_client.WithAuth(wocApiKey), woc_client.WithLogger(logger)) - - var writer io.Writer - if store { - - _, keyFileName := filepath.Split(kf) - - file, err := os.Create(fmt.Sprintf("%s/%s.json", resultsPath, keyFileName)) - if err != nil { - return err - } - - writer = file - - defer file.Close() - } + fundingKeySets[i] = fundingKeySet + } - rateBroadcaster, err := broadcaster.NewRateBroadcaster(logger, client, fundingKeySet, wocClient, broadcaster.WithFees(miningFeeSat), broadcaster.WithIsTestnet(isTestnet), broadcaster.WithCallback(callbackURL, callbackToken), broadcaster.WithFullstatusUpdates(fullStatusUpdates), broadcaster.WithBatchSize(batchSize), broadcaster.WithStoreWriter(writer, 50)) - if err != nil { - return fmt.Errorf("failed to create rate broadcaster: %v", err) - } + wocClient := woc_client.New(woc_client.WithAuth(wocApiKey), woc_client.WithLogger(logger)) - rbs[i] = rateBroadcaster + rateBroadcaster, err := broadcaster.NewRateBroadcaster(logger, client, fundingKeySets, wocClient, broadcaster.WithFees(miningFeeSat), broadcaster.WithIsTestnet(isTestnet), broadcaster.WithCallback(callbackURL, callbackToken), broadcaster.WithFullstatusUpdates(fullStatusUpdates), broadcaster.WithBatchSize(batchSize)) + if err != nil { + return fmt.Errorf("failed to create rate broadcaster: %v", err) + } - err = rateBroadcaster.StartRateBroadcaster(ctx, rateTxsPerSecond, limit, wg) - if err != nil { - return fmt.Errorf("failed to start rate broadcaster: %v", err) - } + err = rateBroadcaster.StartRateBroadcaster(rateTxsPerSecond, limit) + if err != nil { + return fmt.Errorf("failed to start rate broadcaster: %v", err) } go func() { @@ -180,11 +135,9 @@ var Cmd = &cobra.Command{ signal.Notify(signalChan, os.Interrupt) // Signal from Ctrl+C <-signalChan - cancel() + rateBroadcaster.Shutdown() }() - wg.Wait() - return nil }, } @@ -209,10 +162,4 @@ func init() { if err != nil { log.Fatal(err) } - - Cmd.Flags().Bool("store", false, "Store results in a json file instead of printing") - err = viper.BindPFlag("store", Cmd.Flags().Lookup("store")) - if err != nil { - log.Fatal(err) - } } diff --git a/internal/broadcaster/rate_broadcaster.go b/internal/broadcaster/rate_broadcaster.go index ff5c1c25d..a6a205501 100644 --- a/internal/broadcaster/rate_broadcaster.go +++ b/internal/broadcaster/rate_broadcaster.go @@ -1,17 +1,15 @@ package broadcaster import ( - "bufio" "container/list" "context" "encoding/hex" - "encoding/json" "errors" "fmt" - "io" "log/slog" "math" "sync" + "sync/atomic" "time" "github.com/bitcoin-sv/arc/pkg/keyset" @@ -21,14 +19,6 @@ import ( "github.com/libsv/go-bt/v2/unlocker" ) -const ( - maxInputsDefault = 100 - batchSizeDefault = 20 - isTestnetDefault = true - millisecondsPerSecond = 1000 - resultsIterationsDefault = 50 -) - type UtxoClient interface { GetUTXOs(ctx context.Context, mainnet bool, lockingScript *bscript.Script, address string) ([]*bt.UTXO, error) GetUTXOsWithRetries(ctx context.Context, mainnet bool, lockingScript *bscript.Script, address string, constantBackoff time.Duration, retries uint64) ([]*bt.UTXO, error) @@ -40,113 +30,37 @@ type UtxoClient interface { } type RateBroadcaster struct { - logger *slog.Logger - client ArcClient - fundingKeyset *keyset.KeySet - isTestnet bool - callbackURL string - callbackToken string - fullStatusUpdates bool - feeQuote *bt.FeeQuote - utxoClient UtxoClient - standardMiningFee bt.FeeUnit - responseWriter io.Writer - responseWriteIterationInterval int - - shutdown chan struct{} - - maxInputs int - batchSize int - + Broadcaster + wg sync.WaitGroup + totalTxs int64 + shutdown chan struct{} mu sync.RWMutex satoshiMap map[string]uint64 - totalTxs int64 -} - -func WithFees(miningFeeSatPerKb int) func(broadcaster *RateBroadcaster) { - return func(broadcaster *RateBroadcaster) { - var fq = bt.NewFeeQuote() - - newStdFee := *stdFeeDefault - newDataFee := *dataFeeDefault - - newStdFee.MiningFee.Satoshis = miningFeeSatPerKb - newDataFee.MiningFee.Satoshis = miningFeeSatPerKb - - fq.AddQuote(bt.FeeTypeData, &newStdFee) - fq.AddQuote(bt.FeeTypeStandard, &newDataFee) - - broadcaster.feeQuote = fq - } -} - -func WithBatchSize(batchSize int) func(broadcaster *RateBroadcaster) { - return func(broadcaster *RateBroadcaster) { - broadcaster.batchSize = batchSize - } -} - -func WithMaxInputs(maxInputs int) func(broadcaster *RateBroadcaster) { - return func(broadcaster *RateBroadcaster) { - broadcaster.maxInputs = maxInputs - } } -func WithIsTestnet(isTestnet bool) func(broadcaster *RateBroadcaster) { - return func(broadcaster *RateBroadcaster) { - broadcaster.isTestnet = isTestnet - } -} - -func WithCallback(callbackURL string, callbackToken string) func(broadcaster *RateBroadcaster) { - return func(broadcaster *RateBroadcaster) { - broadcaster.callbackURL = callbackURL - broadcaster.callbackToken = callbackToken - } -} - -func WithFullstatusUpdates(fullStatusUpdates bool) func(broadcaster *RateBroadcaster) { - return func(broadcaster *RateBroadcaster) { - broadcaster.fullStatusUpdates = fullStatusUpdates - } -} +func NewRateBroadcaster(logger *slog.Logger, client ArcClient, keySets []*keyset.KeySet, utxoClient UtxoClient, opts ...func(p *Broadcaster)) (*RateBroadcaster, error) { -func WithStoreWriter(storeWriter io.Writer, resultIterations int) func(broadcaster *RateBroadcaster) { - return func(broadcaster *RateBroadcaster) { - broadcaster.responseWriter = storeWriter - broadcaster.responseWriteIterationInterval = resultIterations - } -} - -func NewRateBroadcaster(logger *slog.Logger, client ArcClient, fromKeySet *keyset.KeySet, utxoClient UtxoClient, opts ...func(p *RateBroadcaster)) (*RateBroadcaster, error) { - broadcaster := &RateBroadcaster{ - logger: logger, - client: client, - fundingKeyset: fromKeySet, - isTestnet: isTestnetDefault, - feeQuote: bt.NewFeeQuote(), - utxoClient: utxoClient, - batchSize: batchSizeDefault, - maxInputs: maxInputsDefault, - responseWriter: nil, - responseWriteIterationInterval: resultsIterationsDefault, - - shutdown: make(chan struct{}, 10), - satoshiMap: map[string]uint64{}, - } - - for _, opt := range opts { - opt(broadcaster) - } - - standardFee, err := broadcaster.feeQuote.Fee(bt.FeeTypeStandard) + b, err := NewBroadcaster(logger, client, keySets, utxoClient, opts...) if err != nil { return nil, err } - broadcaster.standardMiningFee = standardFee.MiningFee + rb := &RateBroadcaster{ + Broadcaster: *b, + totalTxs: 0, + mu: sync.RWMutex{}, + shutdown: make(chan struct{}, 10), + satoshiMap: map[string]uint64{}, + wg: sync.WaitGroup{}, + } + + go func() { + for range rb.shutdown { + rb.cancelAll() + } + }() - return broadcaster, nil + return rb, nil } func (b *RateBroadcaster) calculateFeeSat(tx *bt.Tx) uint64 { @@ -172,587 +86,100 @@ func (b *RateBroadcaster) calculateFeeSat(tx *bt.Tx) uint64 { return txFees } -func (b *RateBroadcaster) splitToFundingKeyset(tx *bt.Tx, splitSatoshis uint64, requestedSatoshis uint64, requestedOutputs int) (addedOutputs int, err error) { - - if requestedSatoshis > splitSatoshis { - return 0, fmt.Errorf("requested satoshis %d greater than satoshis to be split %d", requestedSatoshis, splitSatoshis) - } - - counter := 0 - - remaining := int64(splitSatoshis) - for remaining > int64(requestedSatoshis) && counter < requestedOutputs { - if uint64(remaining)-requestedSatoshis < b.calculateFeeSat(tx) { - break - } - - err := tx.PayTo(b.fundingKeyset.Script, requestedSatoshis) - if err != nil { - return 0, err - } - - remaining -= int64(requestedSatoshis) - counter++ - - } - - fee := b.calculateFeeSat(tx) - err = tx.PayTo(b.fundingKeyset.Script, uint64(remaining)-fee) - if err != nil { - return 0, err - } - - unlockerGetter := unlocker.Getter{PrivateKey: b.fundingKeyset.PrivateKey} - err = tx.FillAllInputs(context.Background(), &unlockerGetter) - if err != nil { - return 0, err - } - - return counter, nil -} - -func (b *RateBroadcaster) createConsolidationTxs(utxoSet *list.List, satoshiMap map[string]uint64) ([][]*bt.Tx, error) { - tx := bt.NewTx() - txSatoshis := uint64(0) - txsConsolidationBatches := make([][]*bt.Tx, 0) - txsConsolidation := make([]*bt.Tx, 0) - const consolidateBatchSize = 20 - - var next *list.Element - for front := utxoSet.Front(); front != nil; front = next { - next = front.Next() - utxoSet.Remove(front) - utxo, ok := front.Value.(*bt.UTXO) - if !ok { - return nil, errors.New("failed to parse value to utxo") - } - - txSatoshis += utxo.Satoshis - if next == nil { - if len(tx.Inputs) > 0 { - err := tx.FromUTXOs(utxo) - if err != nil { - return nil, err - } - - err = b.consolidateToFundingKeyset(tx, txSatoshis) - if err != nil { - return nil, err - } - - txsConsolidation = append(txsConsolidation, tx) - satoshiMap[tx.TxID()] = tx.TotalOutputSatoshis() - } - - if len(txsConsolidation) > 0 { - txsConsolidationBatches = append(txsConsolidationBatches, txsConsolidation) - } - break - } +func (b *RateBroadcaster) StartRateBroadcaster(rateTxsPerSecond int, limit int64) error { + for _, ks := range b.keySets { - err := tx.FromUTXOs(utxo) + _, unconfirmed, err := b.utxoClient.GetBalanceWithRetries(b.ctx, !b.isTestnet, ks.Address(!b.isTestnet), 1*time.Second, 5) if err != nil { - return nil, err - } - - if len(tx.Inputs) >= b.maxInputs { - err = b.consolidateToFundingKeyset(tx, txSatoshis) - if err != nil { - return nil, err - } - - txsConsolidation = append(txsConsolidation, tx) - - satoshiMap[tx.TxID()] = tx.TotalOutputSatoshis() - tx = bt.NewTx() - txSatoshis = 0 - } - - if len(txsConsolidation) >= consolidateBatchSize { - txsConsolidationBatches = append(txsConsolidationBatches, txsConsolidation) - txsConsolidation = make([]*bt.Tx, 0) - } - - } - - return txsConsolidationBatches, nil -} - -func (b *RateBroadcaster) consolidateToFundingKeyset(tx *bt.Tx, txSatoshis uint64) error { - fee := b.calculateFeeSat(tx) - err := tx.PayTo(b.fundingKeyset.Script, txSatoshis-fee) - if err != nil { - return err - } - unlockerGetter := unlocker.Getter{PrivateKey: b.fundingKeyset.PrivateKey} - err = tx.FillAllInputs(context.Background(), &unlockerGetter) - if err != nil { - return err - } - return nil -} - -func (b *RateBroadcaster) Consolidate(ctx context.Context) error { - _, unconfirmed, err := b.utxoClient.GetBalanceWithRetries(ctx, !b.isTestnet, b.fundingKeyset.Address(!b.isTestnet), 1*time.Second, 5) - if err != nil { - return err - } - if math.Abs(float64(unconfirmed)) > 0 { - return fmt.Errorf("key with address %s balance has unconfirmed amount %d sat", b.fundingKeyset.Address(!b.isTestnet), unconfirmed) - } - - utxoSet, err := b.utxoClient.GetUTXOsListWithRetries(ctx, !b.isTestnet, b.fundingKeyset.Script, b.fundingKeyset.Address(!b.isTestnet), 1*time.Second, 5) - if err != nil { - return fmt.Errorf("failed to get utxos: %v", err) - } - - if utxoSet.Len() == 1 { - b.logger.Info("utxos already consolidated") - return nil - } - - satoshiMap := map[string]uint64{} - lastUtxoSetLen := 100_000_000 - - for { - if lastUtxoSetLen <= utxoSet.Len() { - b.logger.Error("utxo set length hasn't changed since last iteration") - break + return err } - lastUtxoSetLen = utxoSet.Len() - - // if requested outputs satisfied, return - if utxoSet.Len() == 1 { - break + if math.Abs(float64(unconfirmed)) > 0 { + return fmt.Errorf("key with address %s balance has unconfirmed amount %d sat", ks.Address(!b.isTestnet), unconfirmed) } - b.logger.Info("consolidating outputs", slog.Int("remaining", utxoSet.Len())) - - consolidationTxsBatches, err := b.createConsolidationTxs(utxoSet, satoshiMap) + utxoSet, err := b.utxoClient.GetUTXOsWithRetries(b.ctx, !b.isTestnet, ks.Script, ks.Address(!b.isTestnet), 1*time.Second, 5) if err != nil { - return fmt.Errorf("failed to create consolidation txs: %v", err) + return fmt.Errorf("failed to get utxos: %v", err) } - for i, batch := range consolidationTxsBatches { - time.Sleep(100 * time.Millisecond) // do not performance test ARC - - nrOutputs := 0 - nrInputs := 0 - for _, txBatch := range batch { - nrOutputs += len(txBatch.Outputs) - nrInputs += len(txBatch.Inputs) - } - - b.logger.Info(fmt.Sprintf("broadcasting consolidation batch %d/%d", i+1, len(consolidationTxsBatches)), slog.Int("size", len(batch)), slog.Int("inputs", nrInputs), slog.Int("outputs", nrOutputs)) - - resp, err := b.client.BroadcastTransactions(context.Background(), batch, metamorph_api.Status_SEEN_ON_NETWORK, b.callbackURL, b.callbackToken, b.fullStatusUpdates, false) - if err != nil { - return fmt.Errorf("failed to broadcast consolidation txs: %v", err) - } - - for _, res := range resp { - if res.Status == metamorph_api.Status_REJECTED || res.Status == metamorph_api.Status_SEEN_IN_ORPHAN_MEMPOOL { - b.logger.Error("consolidation tx was not successful", slog.String("status", res.Status.String()), slog.String("hash", res.Txid), slog.String("reason", res.RejectReason)) - for _, tx := range batch { - if tx.TxID() == res.Txid { - b.logger.Debug(tx.String()) - break - } - } - continue - } - - txIDBytes, err := hex.DecodeString(res.Txid) - if err != nil { - b.logger.Error("failed to decode txid", slog.String("err", err.Error())) - continue - } - - newUtxo := &bt.UTXO{ - TxID: txIDBytes, - Vout: 0, - LockingScript: b.fundingKeyset.Script, - Satoshis: satoshiMap[res.Txid], - } + b.logger.Info("starting broadcasting", slog.Int("rate [txs/s]", rateTxsPerSecond), slog.Int("batch size", b.batchSize), slog.String("address", ks.Address(!b.isTestnet))) - delete(satoshiMap, res.Txid) + submitBatchesPerSecond := float64(rateTxsPerSecond) / float64(b.batchSize) - utxoSet.PushBack(newUtxo) - } + if submitBatchesPerSecond > millisecondsPerSecond { + return fmt.Errorf("submission rate %d [txs/s] and batch size %d [txs] result in submission frequency %.2f greater than 1000 [/s]", rateTxsPerSecond, b.batchSize, submitBatchesPerSecond) } - } - - return nil -} - -type splittingOutput struct { - satoshis uint64 - vout uint32 -} - -func (b *RateBroadcaster) CreateUtxos(ctx context.Context, requestedOutputs int, requestedSatoshisPerOutput uint64) error { - - requestedOutputsSatoshis := int64(requestedOutputs) * int64(requestedSatoshisPerOutput) - - confirmed, unconfirmed, err := b.utxoClient.GetBalanceWithRetries(ctx, !b.isTestnet, b.fundingKeyset.Address(!b.isTestnet), 1*time.Second, 5) - if err != nil { - return err - } - - if unconfirmed > 0 { - return fmt.Errorf("total balance not confirmed yet") - } - - balance := confirmed + unconfirmed - - if requestedOutputsSatoshis > balance { - return fmt.Errorf("requested total of satoshis %d exceeds balance on funding keyset %d", requestedOutputsSatoshis, balance) - } - utxos, err := b.utxoClient.GetUTXOsWithRetries(ctx, !b.isTestnet, b.fundingKeyset.Script, b.fundingKeyset.Address(!b.isTestnet), 1*time.Second, 5) - if err != nil { - return err - } - - utxoSet := list.New() - for _, utxo := range utxos { - // collect right sized utxos - if utxo.Satoshis >= requestedSatoshisPerOutput { - utxoSet.PushBack(utxo) - continue + if len(utxoSet) < b.batchSize { + return fmt.Errorf("size of utxo set %d is smaller than requested batch size %d - create more utxos first", len(utxoSet), b.batchSize) } - } - // if requested outputs satisfied, return - if utxoSet.Len() >= requestedOutputs { - b.logger.Info("utxo set", slog.Int("ready", utxoSet.Len()), slog.Int("requested", requestedOutputs), slog.Uint64("satoshis", requestedSatoshisPerOutput)) - return nil - } - - satoshiMap := map[string][]splittingOutput{} - lastUtxoSetLen := 0 + utxoCh := make(chan *bt.UTXO, len(utxoSet)) - // if requested outputs not satisfied, create them - for { - if lastUtxoSetLen >= utxoSet.Len() { - b.logger.Error("utxo set length hasn't changed since last iteration") - break + for _, utxo := range utxoSet { + utxoCh <- utxo } - lastUtxoSetLen = utxoSet.Len() - // if requested outputs satisfied, return - if utxoSet.Len() >= requestedOutputs { - break - } + submitBatchInterval := time.Duration(millisecondsPerSecond/float64(submitBatchesPerSecond)) * time.Millisecond + submitBatchTicker := time.NewTicker(submitBatchInterval) - b.logger.Info("splitting outputs", slog.Int("ready", utxoSet.Len()), slog.Int("requested", requestedOutputs), slog.Uint64("satoshis", requestedSatoshisPerOutput)) + logSummaryTicker := time.NewTicker(3 * time.Second) - // create splitting txs - txsSplitBatches, err := b.splitOutputs(requestedOutputs, requestedSatoshisPerOutput, utxoSet, satoshiMap) - if err != nil { - return err - } + responseCh := make(chan *metamorph_api.TransactionStatus, 100) + errCh := make(chan error, 100) - for i, batch := range txsSplitBatches { - nrOutputs := 0 - nrInputs := 0 - for _, txBatch := range batch { - nrOutputs += len(txBatch.Outputs) - nrInputs += len(txBatch.Inputs) - } + resultsMap := map[metamorph_api.Status]int64{} - b.logger.Info(fmt.Sprintf("broadcasting splitting batch %d/%d", i+1, len(txsSplitBatches)), slog.Int("size", len(batch)), slog.Int("inputs", nrInputs), slog.Int("outputs", nrOutputs)) + counter := 0 - resp, err := b.client.BroadcastTransactions(context.Background(), batch, metamorph_api.Status_SEEN_ON_NETWORK, b.callbackURL, b.callbackToken, b.fullStatusUpdates, false) - if err != nil { - return fmt.Errorf("failed to braodcast tx: %v", err) - } + b.wg.Add(1) + go func() { - for _, res := range resp { - if res.Status == metamorph_api.Status_REJECTED || res.Status == metamorph_api.Status_SEEN_IN_ORPHAN_MEMPOOL { - b.logger.Error("splitting tx was not successful", slog.String("status", res.Status.String()), slog.String("hash", res.Txid), slog.String("reason", res.RejectReason)) - for _, tx := range batch { - if tx.TxID() == res.Txid { - b.logger.Debug(tx.String()) - break - } - } - continue - } + defer func() { - txIDBytes, err := hex.DecodeString(res.Txid) - if err != nil { - b.logger.Error("failed to decode txid", slog.String("err", err.Error())) - continue - } + b.logger.Info("shutting down broadcaster", slog.String("address", ks.Address(!b.isTestnet))) + b.wg.Done() + }() - foundOutputs, found := satoshiMap[res.Txid] - if !found { - b.logger.Error("output not found", slog.String("hash", res.Txid)) - continue - } + for { + select { + case <-b.shutdown: + return + case <-b.ctx.Done(): + return + case <-submitBatchTicker.C: - for _, foundOutput := range foundOutputs { - newUtxo := &bt.UTXO{ - TxID: txIDBytes, - Vout: foundOutput.vout, - LockingScript: b.fundingKeyset.Script, - Satoshis: foundOutput.satoshis, + txs, err := b.createSelfPayingTxs(utxoCh, ks) + if err != nil { + b.logger.Error("failed to create self paying txs", slog.String("err", err.Error())) + b.shutdown <- struct{}{} + continue } - utxoSet.PushBack(newUtxo) - } - delete(satoshiMap, res.Txid) - } - - // do not performance test ARC when creating the utxos - time.Sleep(100 * time.Millisecond) - } - } - - b.logger.Info("utxo set", slog.Int("ready", utxoSet.Len()), slog.Int("requested", requestedOutputs), slog.Uint64("satoshis", requestedSatoshisPerOutput)) - - return nil -} - -func (b *RateBroadcaster) splitOutputs(requestedOutputs int, requestedSatoshisPerOutput uint64, utxoSet *list.List, satoshiMap map[string][]splittingOutput) ([][]*bt.Tx, error) { - txsSplitBatches := make([][]*bt.Tx, 0) - txsSplit := make([]*bt.Tx, 0) - outputs := utxoSet.Len() - var err error - - var next *list.Element - for front := utxoSet.Front(); front != nil; front = next { - next = front.Next() - - if front == nil || outputs >= requestedOutputs { - break - } - - utxo, ok := front.Value.(*bt.UTXO) - if !ok { - return nil, errors.New("failed to parse value to utxo") - } - - tx := bt.NewTx() - err = tx.FromUTXOs(utxo) - if err != nil { - return nil, err - } - - // only split if splitting increases nr of outputs - const feeMargin = 50 - if utxo.Satoshis < 2*requestedSatoshisPerOutput+feeMargin { - continue - } - - addedOutputs, err := b.splitToFundingKeyset(tx, utxo.Satoshis, requestedSatoshisPerOutput, requestedOutputs-outputs) - if err != nil { - return nil, err - } - utxoSet.Remove(front) - - outputs += addedOutputs - - txsSplit = append(txsSplit, tx) - - txOutputs := make([]splittingOutput, len(tx.Outputs)) - for i, txOutput := range tx.Outputs { - txOutputs[i] = splittingOutput{satoshis: txOutput.Satoshis, vout: uint32(i)} - } - - satoshiMap[tx.TxID()] = txOutputs - - if len(txsSplit) == b.batchSize { - txsSplitBatches = append(txsSplitBatches, txsSplit) - txsSplit = make([]*bt.Tx, 0) - } - } - - if len(txsSplit) > 0 { - txsSplitBatches = append(txsSplitBatches, txsSplit) - } - return txsSplitBatches, nil -} - -func (b *RateBroadcaster) StartRateBroadcaster(ctx context.Context, rateTxsPerSecond int, limit int64, wg *sync.WaitGroup) error { - - _, unconfirmed, err := b.utxoClient.GetBalanceWithRetries(ctx, !b.isTestnet, b.fundingKeyset.Address(!b.isTestnet), 1*time.Second, 5) - if err != nil { - return err - } - if math.Abs(float64(unconfirmed)) > 0 { - return fmt.Errorf("key with address %s balance has unconfirmed amount %d sat", b.fundingKeyset.Address(!b.isTestnet), unconfirmed) - } - - utxoSet, err := b.utxoClient.GetUTXOsWithRetries(ctx, !b.isTestnet, b.fundingKeyset.Script, b.fundingKeyset.Address(!b.isTestnet), 1*time.Second, 5) - if err != nil { - return fmt.Errorf("failed to get utxos: %v", err) - } - - b.logger.Info("starting broadcasting", slog.Int("rate [txs/s]", rateTxsPerSecond), slog.Int("batch size", b.batchSize), slog.String("address", b.fundingKeyset.Address(!b.isTestnet))) - - submitBatchesPerSecond := float64(rateTxsPerSecond) / float64(b.batchSize) + go b.broadcastBatch(txs, responseCh, errCh, utxoCh, false, metamorph_api.Status_STORED, limit, ks) - if submitBatchesPerSecond > millisecondsPerSecond { - return fmt.Errorf("submission rate %d [txs/s] and batch size %d [txs] result in submission frequency %.2f greater than 1000 [/s]", rateTxsPerSecond, b.batchSize, submitBatchesPerSecond) - } - - if len(utxoSet) < b.batchSize { - return fmt.Errorf("size of utxo set %d is smaller than requested batch size %d - create more utxos first", len(utxoSet), b.batchSize) - } - - utxoCh := make(chan *bt.UTXO, len(utxoSet)) - - for _, utxo := range utxoSet { - utxoCh <- utxo - } - - submitBatchInterval := time.Duration(millisecondsPerSecond/float64(submitBatchesPerSecond)) * time.Millisecond - submitBatchTicker := time.NewTicker(submitBatchInterval) - - logSummaryTicker := time.NewTicker(3 * time.Second) - - responseCh := make(chan *metamorph_api.TransactionStatus, 100) - errCh := make(chan error, 100) - - var writer *bufio.Writer - if b.responseWriter != nil { - writer = bufio.NewWriter(b.responseWriter) - if err != nil { - return err - } - err = writeJsonArrayStart(writer) - if err != nil { - return err - } - } + case <-logSummaryTicker.C: - resultsMap := map[metamorph_api.Status]int64{} + total := atomic.LoadInt64(&b.totalTxs) + b.logger.Info("summary", + slog.String("address", ks.Address(!b.isTestnet)), + slog.Int("utxo set length", len(utxoCh)), + slog.Int64("total", total), + ) - counter := 0 - go func() { - - defer func() { - if writer != nil { - err = writeJsonArrayFinish(writer) - if err != nil { - b.logger.Error("failed to write json array finish", slog.String("err", err.Error())) - } - - err = writer.Flush() - if err != nil { - b.logger.Error("failed flush writer", slog.String("err", err.Error())) + case responseErr := <-errCh: + b.logger.Error("failed to submit transactions", slog.String("err", responseErr.Error())) + counter++ + case res := <-responseCh: + resultsMap[res.Status]++ } } - - b.logger.Info("shutting down broadcaster", slog.String("address", b.fundingKeyset.Address(!b.isTestnet))) - wg.Done() }() - - for { - select { - case <-b.shutdown: - return - case <-ctx.Done(): - return - case <-submitBatchTicker.C: - - txs, err := b.createSelfPayingTxs(utxoCh) - if err != nil { - b.logger.Error("failed to create self paying txs", slog.String("err", err.Error())) - b.shutdown <- struct{}{} - continue - } - - go b.broadcastBatch(ctx, txs, responseCh, errCh, utxoCh, false, metamorph_api.Status_STORED, limit) - - case <-logSummaryTicker.C: - - b.mu.Lock() - totalTxs := b.totalTxs - b.mu.Unlock() - - b.logger.Info("summary", - slog.String("address", b.fundingKeyset.Address(!b.isTestnet)), - slog.Int("utxo set length", len(utxoCh)), - slog.Int64("total", totalTxs), - ) - - case responseErr := <-errCh: - b.logger.Error("failed to submit transactions", slog.String("err", responseErr.Error())) - if writer == nil { - continue - } - - counter++ - err = b.write(&counter, writer, fmt.Sprintf(",%s\n", responseErr.Error())) - if err != nil { - b.logger.Error("failed to write", slog.String("err", err.Error())) - continue - } - case res := <-responseCh: - resultsMap[res.Status]++ - if writer == nil { - continue - } - // if writer is given, store the response in the writer - counter++ - resBytes, err := json.Marshal(res) - if err != nil { - b.logger.Error("failed to marshal response", slog.String("err", err.Error())) - continue - } - - err = b.write(&counter, writer, fmt.Sprintf(",%s\n", string(resBytes))) - if err != nil { - b.logger.Error("failed to write", slog.String("err", err.Error())) - continue - } - } - } - }() - - return nil -} - -func writeJsonArrayStart(writer *bufio.Writer) error { - _, err := fmt.Fprint(writer, "[") - if err != nil { - return fmt.Errorf("failed to print response: %v", err) - } - - return nil -} - -func writeJsonArrayFinish(writer *bufio.Writer) error { - _, err := fmt.Fprint(writer, "]") - if err != nil { - return fmt.Errorf("failed to print response: %v", err) - } - - return nil -} - -func (b *RateBroadcaster) write(counter *int, writer *bufio.Writer, content string) error { - _, err := fmt.Fprint(writer, content) - if err != nil { - return fmt.Errorf("failed to print response: %v", err) } - - if *counter%b.responseWriteIterationInterval == 0 { - err = writer.Flush() - if err != nil { - return fmt.Errorf("failed flush writer: %v", err) - } - - *counter = 0 - } - return nil } -func (b *RateBroadcaster) createSelfPayingTxs(utxos chan *bt.UTXO) ([]*bt.Tx, error) { +func (b *RateBroadcaster) createSelfPayingTxs(utxos chan *bt.UTXO, ks *keyset.KeySet) ([]*bt.Tx, error) { txs := make([]*bt.Tx, 0, b.batchSize) for utxo := range utxos { @@ -769,12 +196,12 @@ func (b *RateBroadcaster) createSelfPayingTxs(utxos chan *bt.UTXO) ([]*bt.Tx, er continue } - err = tx.PayTo(b.fundingKeyset.Script, utxo.Satoshis-fee) + err = tx.PayTo(ks.Script, utxo.Satoshis-fee) if err != nil { return nil, err } - unlockerGetter := unlocker.Getter{PrivateKey: b.fundingKeyset.PrivateKey} + unlockerGetter := unlocker.Getter{PrivateKey: ks.PrivateKey} err = tx.FillAllInputs(context.Background(), &unlockerGetter) if err != nil { return nil, err @@ -793,22 +220,17 @@ func (b *RateBroadcaster) createSelfPayingTxs(utxos chan *bt.UTXO) ([]*bt.Tx, er return txs, nil } -func (b *RateBroadcaster) broadcastBatch(ctx context.Context, txs []*bt.Tx, resultCh chan *metamorph_api.TransactionStatus, errCh chan error, utxoCh chan *bt.UTXO, skipFeeValidation bool, waitForStatus metamorph_api.Status, limit int64) { - b.mu.Lock() - if limit > 0 && b.totalTxs >= limit { +func (b *RateBroadcaster) broadcastBatch(txs []*bt.Tx, resultCh chan *metamorph_api.TransactionStatus, errCh chan error, utxoCh chan *bt.UTXO, skipFeeValidation bool, waitForStatus metamorph_api.Status, limit int64, ks *keyset.KeySet) { + if limit > 0 && atomic.LoadInt64(&b.totalTxs) >= limit { return } - b.mu.Unlock() limitReachedNotified := false - ctxWithTimeout, cancel := context.WithTimeout(ctx, 10*time.Minute) - defer cancel() - - resp, err := b.client.BroadcastTransactions(ctxWithTimeout, txs, waitForStatus, b.callbackURL, b.callbackToken, b.fullStatusUpdates, skipFeeValidation) + resp, err := b.client.BroadcastTransactions(b.ctx, txs, waitForStatus, b.callbackURL, b.callbackToken, b.fullStatusUpdates, skipFeeValidation) if err != nil { if errors.Is(err, context.Canceled) { - b.logger.Debug("broadcasting canceled", slog.String("address", b.fundingKeyset.Address(!b.isTestnet))) + b.logger.Debug("broadcasting canceled", slog.String("address", ks.Address(!b.isTestnet))) return } errCh <- err @@ -826,18 +248,52 @@ func (b *RateBroadcaster) broadcastBatch(ctx context.Context, txs []*bt.Tx, resu newUtxo := &bt.UTXO{ TxID: txIDBytes, Vout: 0, - LockingScript: b.fundingKeyset.Script, + LockingScript: ks.Script, Satoshis: b.satoshiMap[res.Txid], } utxoCh <- newUtxo delete(b.satoshiMap, res.Txid) - b.totalTxs++ - if limit > 0 && b.totalTxs >= limit && !limitReachedNotified { - b.logger.Info("limit reached", slog.Int64("total", b.totalTxs), slog.String("address", b.fundingKeyset.Address(!b.isTestnet))) + + atomic.AddInt64(&b.totalTxs, 1) + if limit > 0 && atomic.LoadInt64(&b.totalTxs) >= limit && !limitReachedNotified { + b.logger.Info("limit reached", slog.Int64("total", atomic.LoadInt64(&b.totalTxs)), slog.String("address", ks.Address(!b.isTestnet))) b.shutdown <- struct{}{} limitReachedNotified = true } b.mu.Unlock() } } + +func (b *RateBroadcaster) Shutdown() { + if b.cancelAll != nil { + b.cancelAll() + b.wg.Wait() + } +} + +// +//ch := make(chan bool) +//var count int32 +//go func() { +// var m runtime.MemStats +// var writer = uilive.New() +// writer.Start() +// for { +// +// b.logger.Info("summary", +// slog.String("address", b.fundingKeyset.Address(!b.isTestnet)), +// slog.Int("utxo set length", len(utxoCh)), +// slog.Int64("total", b.totalTxs), +// ) +// +// <-ch +// _, _ = fmt.Fprintf(writer, "Current connections count: %d\n", atomic.LoadInt32(&count)) +// _, _ = fmt.Fprintf(writer, "Current connections count: %d\n", b.totalTxs.Load()) +// runtime.ReadMemStats(&m) +// _, _ = fmt.Fprintf(writer, "Alloc = %v MiB\n", m.Alloc/1024/1024) +// _, _ = fmt.Fprintf(writer, "TotalAlloc = %v MiB\n", m.TotalAlloc/1024/1024) +// _, _ = fmt.Fprintf(writer, "Sys = %v MiB\n", m.Sys/1024/1024) +// _, _ = fmt.Fprintf(writer, "NumGC = %v\n", m.NumGC) +// } +//}() From dda0a994457d23a5d750b94ad17283e459222ea6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20B=C3=B6ckli?= Date: Fri, 21 Jun 2024 13:06:51 +0200 Subject: [PATCH 04/14] Wait only for status received --- internal/broadcaster/rate_broadcaster.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/broadcaster/rate_broadcaster.go b/internal/broadcaster/rate_broadcaster.go index a6a205501..f0229c642 100644 --- a/internal/broadcaster/rate_broadcaster.go +++ b/internal/broadcaster/rate_broadcaster.go @@ -156,7 +156,7 @@ func (b *RateBroadcaster) StartRateBroadcaster(rateTxsPerSecond int, limit int64 continue } - go b.broadcastBatch(txs, responseCh, errCh, utxoCh, false, metamorph_api.Status_STORED, limit, ks) + go b.broadcastBatch(txs, responseCh, errCh, utxoCh, false, metamorph_api.Status_RECEIVED, limit, ks) case <-logSummaryTicker.C: From 2515030e59724f5d92d2d5c3fb3fc8a70efd7184 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20B=C3=B6ckli?= Date: Fri, 21 Jun 2024 13:50:41 +0200 Subject: [PATCH 05/14] Print stats --- go.mod | 1 + go.sum | 2 + internal/broadcaster/rate_broadcaster.go | 125 +++++++++++------------ 3 files changed, 65 insertions(+), 63 deletions(-) diff --git a/go.mod b/go.mod index 2cd14f374..7d65ba845 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/getkin/kin-openapi v0.103.0 github.com/go-testfixtures/testfixtures/v3 v3.9.0 github.com/golang-migrate/migrate/v4 v4.16.2 + github.com/gosuri/uilive v0.0.4 github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.1 github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0 github.com/jedib0t/go-pretty/v6 v6.5.4 diff --git a/go.sum b/go.sum index 0759744a0..7dee1e8e6 100644 --- a/go.sum +++ b/go.sum @@ -254,6 +254,8 @@ github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5m github.com/googleapis/google-cloud-go-testing v0.0.0-20200911160855-bcd43fbb19e8/go.mod h1:dvDLG8qkwmyD9a/MJJN3XJcT3xFxOKAvTZGvuZmac9g= github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= +github.com/gosuri/uilive v0.0.4 h1:hUEBpQDj8D8jXgtCdBu7sWsy5sbW/5GhuO8KBwJ2jyY= +github.com/gosuri/uilive v0.0.4/go.mod h1:V/epo5LjjlDE5RJUcqx8dbw+zc93y5Ya3yg8tfZ74VI= github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.1 h1:qnpSQwGEnkcRpTqNOIR6bJbR0gAorgP9CSALpRcKoAA= github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.1/go.mod h1:lXGCsh6c22WGtjr+qGHj1otzZpV/1kwTMAqkwZsnWRU= github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0 h1:pRhl55Yx1eC7BZ1N+BBWwnKaMyD8uC+34TLdndZMAKk= diff --git a/internal/broadcaster/rate_broadcaster.go b/internal/broadcaster/rate_broadcaster.go index f0229c642..04589772c 100644 --- a/internal/broadcaster/rate_broadcaster.go +++ b/internal/broadcaster/rate_broadcaster.go @@ -6,8 +6,10 @@ import ( "encoding/hex" "errors" "fmt" + "github.com/gosuri/uilive" "log/slog" "math" + "runtime" "sync" "sync/atomic" "time" @@ -31,11 +33,14 @@ type UtxoClient interface { type RateBroadcaster struct { Broadcaster - wg sync.WaitGroup - totalTxs int64 - shutdown chan struct{} - mu sync.RWMutex - satoshiMap map[string]uint64 + wg sync.WaitGroup + totalTxs int64 + connectionCount int32 + shutdown chan struct{} + connectionCh chan struct{} + utxoCh chan *bt.UTXO + mu sync.RWMutex + satoshiMap map[string]uint64 } func NewRateBroadcaster(logger *slog.Logger, client ArcClient, keySets []*keyset.KeySet, utxoClient UtxoClient, opts ...func(p *Broadcaster)) (*RateBroadcaster, error) { @@ -46,17 +51,25 @@ func NewRateBroadcaster(logger *slog.Logger, client ArcClient, keySets []*keyset } rb := &RateBroadcaster{ - Broadcaster: *b, - totalTxs: 0, - mu: sync.RWMutex{}, - shutdown: make(chan struct{}, 10), - satoshiMap: map[string]uint64{}, - wg: sync.WaitGroup{}, + Broadcaster: *b, + totalTxs: 0, + mu: sync.RWMutex{}, + shutdown: make(chan struct{}, 10), + connectionCh: make(chan struct{}, 10), + satoshiMap: map[string]uint64{}, + wg: sync.WaitGroup{}, } + rb.startPrintStats() + go func() { - for range rb.shutdown { - rb.cancelAll() + for { + select { + case <-rb.shutdown: + rb.cancelAll() + case <-rb.ctx.Done(): + return + } } }() @@ -114,7 +127,7 @@ func (b *RateBroadcaster) StartRateBroadcaster(rateTxsPerSecond int, limit int64 return fmt.Errorf("size of utxo set %d is smaller than requested batch size %d - create more utxos first", len(utxoSet), b.batchSize) } - utxoCh := make(chan *bt.UTXO, len(utxoSet)) + b.utxoCh = make(chan *bt.UTXO, len(utxoSet)) for _, utxo := range utxoSet { utxoCh <- utxo @@ -123,8 +136,6 @@ func (b *RateBroadcaster) StartRateBroadcaster(rateTxsPerSecond int, limit int64 submitBatchInterval := time.Duration(millisecondsPerSecond/float64(submitBatchesPerSecond)) * time.Millisecond submitBatchTicker := time.NewTicker(submitBatchInterval) - logSummaryTicker := time.NewTicker(3 * time.Second) - responseCh := make(chan *metamorph_api.TransactionStatus, 100) errCh := make(chan error, 100) @@ -133,11 +144,9 @@ func (b *RateBroadcaster) StartRateBroadcaster(rateTxsPerSecond int, limit int64 counter := 0 b.wg.Add(1) - go func() { - + go func(keySet *keyset.KeySet) { defer func() { - - b.logger.Info("shutting down broadcaster", slog.String("address", ks.Address(!b.isTestnet))) + b.logger.Info("shutting down broadcaster", slog.String("address", keySet.Address(!b.isTestnet))) b.wg.Done() }() @@ -149,23 +158,14 @@ func (b *RateBroadcaster) StartRateBroadcaster(rateTxsPerSecond int, limit int64 return case <-submitBatchTicker.C: - txs, err := b.createSelfPayingTxs(utxoCh, ks) + txs, err := b.createSelfPayingTxs(b.utxoCh, keySet) if err != nil { b.logger.Error("failed to create self paying txs", slog.String("err", err.Error())) b.shutdown <- struct{}{} continue } - go b.broadcastBatch(txs, responseCh, errCh, utxoCh, false, metamorph_api.Status_RECEIVED, limit, ks) - - case <-logSummaryTicker.C: - - total := atomic.LoadInt64(&b.totalTxs) - b.logger.Info("summary", - slog.String("address", ks.Address(!b.isTestnet)), - slog.Int("utxo set length", len(utxoCh)), - slog.Int64("total", total), - ) + go b.broadcastBatch(txs, responseCh, errCh, b.utxoCh, false, metamorph_api.Status_RECEIVED, limit, keySet) case responseErr := <-errCh: b.logger.Error("failed to submit transactions", slog.String("err", responseErr.Error())) @@ -174,7 +174,7 @@ func (b *RateBroadcaster) StartRateBroadcaster(rateTxsPerSecond int, limit int64 resultsMap[res.Status]++ } } - }() + }(ks) } return nil } @@ -221,10 +221,13 @@ func (b *RateBroadcaster) createSelfPayingTxs(utxos chan *bt.UTXO, ks *keyset.Ke } func (b *RateBroadcaster) broadcastBatch(txs []*bt.Tx, resultCh chan *metamorph_api.TransactionStatus, errCh chan error, utxoCh chan *bt.UTXO, skipFeeValidation bool, waitForStatus metamorph_api.Status, limit int64, ks *keyset.KeySet) { + if limit > 0 && atomic.LoadInt64(&b.totalTxs) >= limit { return } + b.connectionCh <- struct{}{} + limitReachedNotified := false resp, err := b.client.BroadcastTransactions(b.ctx, txs, waitForStatus, b.callbackURL, b.callbackToken, b.fullStatusUpdates, skipFeeValidation) @@ -252,8 +255,8 @@ func (b *RateBroadcaster) broadcastBatch(txs []*bt.Tx, resultCh chan *metamorph_ Satoshis: b.satoshiMap[res.Txid], } utxoCh <- newUtxo - delete(b.satoshiMap, res.Txid) + b.mu.Unlock() atomic.AddInt64(&b.totalTxs, 1) if limit > 0 && atomic.LoadInt64(&b.totalTxs) >= limit && !limitReachedNotified { @@ -261,39 +264,35 @@ func (b *RateBroadcaster) broadcastBatch(txs []*bt.Tx, resultCh chan *metamorph_ b.shutdown <- struct{}{} limitReachedNotified = true } - b.mu.Unlock() } } func (b *RateBroadcaster) Shutdown() { - if b.cancelAll != nil { - b.cancelAll() - b.wg.Wait() - } + b.cancelAll() + b.wg.Wait() } -// -//ch := make(chan bool) -//var count int32 -//go func() { -// var m runtime.MemStats -// var writer = uilive.New() -// writer.Start() -// for { -// -// b.logger.Info("summary", -// slog.String("address", b.fundingKeyset.Address(!b.isTestnet)), -// slog.Int("utxo set length", len(utxoCh)), -// slog.Int64("total", b.totalTxs), -// ) -// -// <-ch -// _, _ = fmt.Fprintf(writer, "Current connections count: %d\n", atomic.LoadInt32(&count)) -// _, _ = fmt.Fprintf(writer, "Current connections count: %d\n", b.totalTxs.Load()) -// runtime.ReadMemStats(&m) -// _, _ = fmt.Fprintf(writer, "Alloc = %v MiB\n", m.Alloc/1024/1024) -// _, _ = fmt.Fprintf(writer, "TotalAlloc = %v MiB\n", m.TotalAlloc/1024/1024) -// _, _ = fmt.Fprintf(writer, "Sys = %v MiB\n", m.Sys/1024/1024) -// _, _ = fmt.Fprintf(writer, "NumGC = %v\n", m.NumGC) -// } -//}() +func (b *RateBroadcaster) startPrintStats() { + b.wg.Add(1) + go func() { + defer b.wg.Done() + var m runtime.MemStats + var writer = uilive.New() + writer.Start() + for { + select { + case <-b.connectionCh: + _, _ = fmt.Fprintf(writer, "Current connections count: %d\n", atomic.LoadInt32(&b.connectionCount)) + _, _ = fmt.Fprintf(writer, "Tx count: %d\n", b.totalTxs.Load()) + _, _ = fmt.Fprintf(writer, "UTXO set length: %d\n", len(b.utxoCh)) + runtime.ReadMemStats(&m) + _, _ = fmt.Fprintf(writer, "Alloc = %v MiB\n", m.Alloc/1024/1024) + _, _ = fmt.Fprintf(writer, "TotalAlloc = %v MiB\n", m.TotalAlloc/1024/1024) + _, _ = fmt.Fprintf(writer, "Sys = %v MiB\n", m.Sys/1024/1024) + _, _ = fmt.Fprintf(writer, "NumGC = %v\n", m.NumGC) + case <-b.ctx.Done(): + return + } + } + }() +} From 66e1ada2cbab1b3f003378aef7f5626210376bb4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20B=C3=B6ckli?= Date: Fri, 21 Jun 2024 14:14:28 +0200 Subject: [PATCH 06/14] Use sync map instead of mutex --- internal/broadcaster/rate_broadcaster.go | 112 +++++++++++------------ 1 file changed, 52 insertions(+), 60 deletions(-) diff --git a/internal/broadcaster/rate_broadcaster.go b/internal/broadcaster/rate_broadcaster.go index 04589772c..91a6b5df5 100644 --- a/internal/broadcaster/rate_broadcaster.go +++ b/internal/broadcaster/rate_broadcaster.go @@ -33,14 +33,13 @@ type UtxoClient interface { type RateBroadcaster struct { Broadcaster - wg sync.WaitGroup totalTxs int64 connectionCount int32 shutdown chan struct{} connectionCh chan struct{} utxoCh chan *bt.UTXO - mu sync.RWMutex - satoshiMap map[string]uint64 + wg sync.WaitGroup + satoshiMap sync.Map } func NewRateBroadcaster(logger *slog.Logger, client ArcClient, keySets []*keyset.KeySet, utxoClient UtxoClient, opts ...func(p *Broadcaster)) (*RateBroadcaster, error) { @@ -53,10 +52,8 @@ func NewRateBroadcaster(logger *slog.Logger, client ArcClient, keySets []*keyset rb := &RateBroadcaster{ Broadcaster: *b, totalTxs: 0, - mu: sync.RWMutex{}, shutdown: make(chan struct{}, 10), connectionCh: make(chan struct{}, 10), - satoshiMap: map[string]uint64{}, wg: sync.WaitGroup{}, } @@ -130,19 +127,14 @@ func (b *RateBroadcaster) StartRateBroadcaster(rateTxsPerSecond int, limit int64 b.utxoCh = make(chan *bt.UTXO, len(utxoSet)) for _, utxo := range utxoSet { - utxoCh <- utxo + b.utxoCh <- utxo } submitBatchInterval := time.Duration(millisecondsPerSecond/float64(submitBatchesPerSecond)) * time.Millisecond submitBatchTicker := time.NewTicker(submitBatchInterval) - responseCh := make(chan *metamorph_api.TransactionStatus, 100) errCh := make(chan error, 100) - resultsMap := map[metamorph_api.Status]int64{} - - counter := 0 - b.wg.Add(1) go func(keySet *keyset.KeySet) { defer func() { @@ -158,20 +150,22 @@ func (b *RateBroadcaster) StartRateBroadcaster(rateTxsPerSecond int, limit int64 return case <-submitBatchTicker.C: - txs, err := b.createSelfPayingTxs(b.utxoCh, keySet) + txs, err := b.createSelfPayingTxs(keySet) if err != nil { b.logger.Error("failed to create self paying txs", slog.String("err", err.Error())) b.shutdown <- struct{}{} continue } - go b.broadcastBatch(txs, responseCh, errCh, b.utxoCh, false, metamorph_api.Status_RECEIVED, limit, keySet) + if limit > 0 && atomic.LoadInt64(&b.totalTxs) >= limit { + b.logger.Info("limit reached", slog.Int64("total", atomic.LoadInt64(&b.totalTxs)), slog.String("address", ks.Address(!b.isTestnet))) + b.shutdown <- struct{}{} + } + + b.broadcastBatchAsync(txs, keySet, errCh, metamorph_api.Status_RECEIVED) case responseErr := <-errCh: b.logger.Error("failed to submit transactions", slog.String("err", responseErr.Error())) - counter++ - case res := <-responseCh: - resultsMap[res.Status]++ } } }(ks) @@ -179,10 +173,10 @@ func (b *RateBroadcaster) StartRateBroadcaster(rateTxsPerSecond int, limit int64 return nil } -func (b *RateBroadcaster) createSelfPayingTxs(utxos chan *bt.UTXO, ks *keyset.KeySet) ([]*bt.Tx, error) { +func (b *RateBroadcaster) createSelfPayingTxs(ks *keyset.KeySet) ([]*bt.Tx, error) { txs := make([]*bt.Tx, 0, b.batchSize) - for utxo := range utxos { + for utxo := range b.utxoCh { tx := bt.NewTx() err := tx.FromUTXOs(utxo) @@ -207,9 +201,8 @@ func (b *RateBroadcaster) createSelfPayingTxs(utxos chan *bt.UTXO, ks *keyset.Ke return nil, err } - b.mu.Lock() - b.satoshiMap[tx.TxID()] = tx.Outputs[0].Satoshis - b.mu.Unlock() + b.satoshiMap.Store(tx.TxID(), tx.Outputs[0].Satoshis) + txs = append(txs, tx) if len(txs) >= b.batchSize { @@ -220,51 +213,50 @@ func (b *RateBroadcaster) createSelfPayingTxs(utxos chan *bt.UTXO, ks *keyset.Ke return txs, nil } -func (b *RateBroadcaster) broadcastBatch(txs []*bt.Tx, resultCh chan *metamorph_api.TransactionStatus, errCh chan error, utxoCh chan *bt.UTXO, skipFeeValidation bool, waitForStatus metamorph_api.Status, limit int64, ks *keyset.KeySet) { +func (b *RateBroadcaster) broadcastBatchAsync(txs []*bt.Tx, ks *keyset.KeySet, errCh chan error, waitForStatus metamorph_api.Status) { - if limit > 0 && atomic.LoadInt64(&b.totalTxs) >= limit { - return - } + b.wg.Add(1) + go func() { + defer b.wg.Done() - b.connectionCh <- struct{}{} + resp, err := b.client.BroadcastTransactions(b.ctx, txs, waitForStatus, b.callbackURL, b.callbackToken, b.fullStatusUpdates, false) + if err != nil { + if errors.Is(err, context.Canceled) { + b.logger.Debug("broadcasting canceled", slog.String("address", ks.Address(!b.isTestnet))) + return + } + errCh <- err + } + atomic.AddInt32(&b.connectionCount, 1) + b.connectionCh <- struct{}{} - limitReachedNotified := false + for _, res := range resp { - resp, err := b.client.BroadcastTransactions(b.ctx, txs, waitForStatus, b.callbackURL, b.callbackToken, b.fullStatusUpdates, skipFeeValidation) - if err != nil { - if errors.Is(err, context.Canceled) { - b.logger.Debug("broadcasting canceled", slog.String("address", ks.Address(!b.isTestnet))) - return - } - errCh <- err - } + txIDBytes, err := hex.DecodeString(res.Txid) + if err != nil { + b.logger.Error("failed to decode txid", slog.String("err", err.Error())) + continue + } - for _, res := range resp { - resultCh <- res + sat, found := b.satoshiMap.Load(res.Txid) + satoshis, isValid := sat.(uint64) - txIDBytes, err := hex.DecodeString(res.Txid) - if err != nil { - b.logger.Error("failed to decode txid", slog.String("err", err.Error())) - continue - } - b.mu.Lock() - newUtxo := &bt.UTXO{ - TxID: txIDBytes, - Vout: 0, - LockingScript: ks.Script, - Satoshis: b.satoshiMap[res.Txid], - } - utxoCh <- newUtxo - delete(b.satoshiMap, res.Txid) - b.mu.Unlock() - - atomic.AddInt64(&b.totalTxs, 1) - if limit > 0 && atomic.LoadInt64(&b.totalTxs) >= limit && !limitReachedNotified { - b.logger.Info("limit reached", slog.Int64("total", atomic.LoadInt64(&b.totalTxs)), slog.String("address", ks.Address(!b.isTestnet))) - b.shutdown <- struct{}{} - limitReachedNotified = true + if found && isValid { + newUtxo := &bt.UTXO{ + TxID: txIDBytes, + Vout: 0, + LockingScript: ks.Script, + Satoshis: satoshis, + } + b.utxoCh <- newUtxo + } + + b.satoshiMap.Delete(res.Txid) + + atomic.AddInt64(&b.totalTxs, 1) } - } + + }() } func (b *RateBroadcaster) Shutdown() { @@ -283,7 +275,7 @@ func (b *RateBroadcaster) startPrintStats() { select { case <-b.connectionCh: _, _ = fmt.Fprintf(writer, "Current connections count: %d\n", atomic.LoadInt32(&b.connectionCount)) - _, _ = fmt.Fprintf(writer, "Tx count: %d\n", b.totalTxs.Load()) + _, _ = fmt.Fprintf(writer, "Tx count: %d\n", atomic.LoadInt64(&b.totalTxs)) _, _ = fmt.Fprintf(writer, "UTXO set length: %d\n", len(b.utxoCh)) runtime.ReadMemStats(&m) _, _ = fmt.Fprintf(writer, "Alloc = %v MiB\n", m.Alloc/1024/1024) From cc53d0742618ef6b713ac50c729dcb1e8084407d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20B=C3=B6ckli?= Date: Fri, 21 Jun 2024 14:17:30 +0200 Subject: [PATCH 07/14] Move interface --- internal/broadcaster/broadcaster.go | 18 ++++++++++++++++-- internal/broadcaster/rate_broadcaster.go | 14 ++------------ 2 files changed, 18 insertions(+), 14 deletions(-) diff --git a/internal/broadcaster/broadcaster.go b/internal/broadcaster/broadcaster.go index 3310dd0da..7e2e35d41 100644 --- a/internal/broadcaster/broadcaster.go +++ b/internal/broadcaster/broadcaster.go @@ -1,11 +1,15 @@ package broadcaster import ( + "container/list" "context" - "github.com/bitcoin-sv/arc/pkg/keyset" - "github.com/libsv/go-bt/v2" "log/slog" "math" + "time" + + "github.com/bitcoin-sv/arc/pkg/keyset" + "github.com/libsv/go-bt/v2" + "github.com/libsv/go-bt/v2/bscript" ) const ( @@ -15,6 +19,16 @@ const ( millisecondsPerSecond = 1000 ) +type UtxoClient interface { + GetUTXOs(ctx context.Context, mainnet bool, lockingScript *bscript.Script, address string) ([]*bt.UTXO, error) + GetUTXOsWithRetries(ctx context.Context, mainnet bool, lockingScript *bscript.Script, address string, constantBackoff time.Duration, retries uint64) ([]*bt.UTXO, error) + GetUTXOsList(ctx context.Context, mainnet bool, lockingScript *bscript.Script, address string) (*list.List, error) + GetUTXOsListWithRetries(ctx context.Context, mainnet bool, lockingScript *bscript.Script, address string, constantBackoff time.Duration, retries uint64) (*list.List, error) + GetBalance(ctx context.Context, mainnet bool, address string) (int64, int64, error) + GetBalanceWithRetries(ctx context.Context, mainnet bool, address string, constantBackoff time.Duration, retries uint64) (int64, int64, error) + TopUp(ctx context.Context, mainnet bool, address string) error +} + type Broadcaster struct { logger *slog.Logger client ArcClient diff --git a/internal/broadcaster/rate_broadcaster.go b/internal/broadcaster/rate_broadcaster.go index 91a6b5df5..9daa35587 100644 --- a/internal/broadcaster/rate_broadcaster.go +++ b/internal/broadcaster/rate_broadcaster.go @@ -1,7 +1,6 @@ package broadcaster import ( - "container/list" "context" "encoding/hex" "errors" @@ -17,20 +16,9 @@ import ( "github.com/bitcoin-sv/arc/pkg/keyset" "github.com/bitcoin-sv/arc/pkg/metamorph/metamorph_api" "github.com/libsv/go-bt/v2" - "github.com/libsv/go-bt/v2/bscript" "github.com/libsv/go-bt/v2/unlocker" ) -type UtxoClient interface { - GetUTXOs(ctx context.Context, mainnet bool, lockingScript *bscript.Script, address string) ([]*bt.UTXO, error) - GetUTXOsWithRetries(ctx context.Context, mainnet bool, lockingScript *bscript.Script, address string, constantBackoff time.Duration, retries uint64) ([]*bt.UTXO, error) - GetUTXOsList(ctx context.Context, mainnet bool, lockingScript *bscript.Script, address string) (*list.List, error) - GetUTXOsListWithRetries(ctx context.Context, mainnet bool, lockingScript *bscript.Script, address string, constantBackoff time.Duration, retries uint64) (*list.List, error) - GetBalance(ctx context.Context, mainnet bool, address string) (int64, int64, error) - GetBalanceWithRetries(ctx context.Context, mainnet bool, address string, constantBackoff time.Duration, retries uint64) (int64, int64, error) - TopUp(ctx context.Context, mainnet bool, address string) error -} - type RateBroadcaster struct { Broadcaster totalTxs int64 @@ -195,6 +183,8 @@ func (b *RateBroadcaster) createSelfPayingTxs(ks *keyset.KeySet) ([]*bt.Tx, erro return nil, err } + // Todo: Add OP_RETURN with text "ARC testing" so that WoC can tag it + unlockerGetter := unlocker.Getter{PrivateKey: ks.PrivateKey} err = tx.FillAllInputs(context.Background(), &unlockerGetter) if err != nil { From 44a5aa77237c0acde300126e8de690b739f15618 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20B=C3=B6ckli?= Date: Fri, 21 Jun 2024 14:39:15 +0200 Subject: [PATCH 08/14] Wait for finish --- cmd/broadcaster-cli/app/utxos/broadcast/broadcast.go | 10 +++++----- internal/broadcaster/rate_broadcaster.go | 7 +++++-- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/cmd/broadcaster-cli/app/utxos/broadcast/broadcast.go b/cmd/broadcaster-cli/app/utxos/broadcast/broadcast.go index dcb8b6ebc..4f3ec6c55 100644 --- a/cmd/broadcaster-cli/app/utxos/broadcast/broadcast.go +++ b/cmd/broadcaster-cli/app/utxos/broadcast/broadcast.go @@ -125,11 +125,6 @@ var Cmd = &cobra.Command{ return fmt.Errorf("failed to create rate broadcaster: %v", err) } - err = rateBroadcaster.StartRateBroadcaster(rateTxsPerSecond, limit) - if err != nil { - return fmt.Errorf("failed to start rate broadcaster: %v", err) - } - go func() { signalChan := make(chan os.Signal, 1) signal.Notify(signalChan, os.Interrupt) // Signal from Ctrl+C @@ -138,6 +133,11 @@ var Cmd = &cobra.Command{ rateBroadcaster.Shutdown() }() + err = rateBroadcaster.StartRateBroadcaster(rateTxsPerSecond, limit) + if err != nil { + return fmt.Errorf("failed to start rate broadcaster: %v", err) + } + return nil }, } diff --git a/internal/broadcaster/rate_broadcaster.go b/internal/broadcaster/rate_broadcaster.go index 9daa35587..b4ccd6b57 100644 --- a/internal/broadcaster/rate_broadcaster.go +++ b/internal/broadcaster/rate_broadcaster.go @@ -158,6 +158,9 @@ func (b *RateBroadcaster) StartRateBroadcaster(rateTxsPerSecond int, limit int64 } }(ks) } + + b.wg.Wait() + return nil } @@ -193,6 +196,8 @@ func (b *RateBroadcaster) createSelfPayingTxs(ks *keyset.KeySet) ([]*bt.Tx, erro b.satoshiMap.Store(tx.TxID(), tx.Outputs[0].Satoshis) + fmt.Println(tx.String()) + txs = append(txs, tx) if len(txs) >= b.batchSize { @@ -245,13 +250,11 @@ func (b *RateBroadcaster) broadcastBatchAsync(txs []*bt.Tx, ks *keyset.KeySet, e atomic.AddInt64(&b.totalTxs, 1) } - }() } func (b *RateBroadcaster) Shutdown() { b.cancelAll() - b.wg.Wait() } func (b *RateBroadcaster) startPrintStats() { From 04d98c70b2280d52a5006704bdbc016f4d6cd9fa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20B=C3=B6ckli?= Date: Fri, 21 Jun 2024 16:17:06 +0200 Subject: [PATCH 09/14] Multi rate broadcaster --- .../app/utxos/broadcast/broadcast.go | 6 +- internal/broadcaster/broadcaster.go | 5 +- .../broadcaster/multi_rate_broadcaster.go | 108 +++++++++ internal/broadcaster/rate_broadcaster.go | 227 +++++++----------- internal/broadcaster/utxo_consolidator.go | 4 +- internal/broadcaster/utxo_creator.go | 4 +- 6 files changed, 211 insertions(+), 143 deletions(-) create mode 100644 internal/broadcaster/multi_rate_broadcaster.go diff --git a/cmd/broadcaster-cli/app/utxos/broadcast/broadcast.go b/cmd/broadcaster-cli/app/utxos/broadcast/broadcast.go index 4f3ec6c55..47e92a9d5 100644 --- a/cmd/broadcaster-cli/app/utxos/broadcast/broadcast.go +++ b/cmd/broadcaster-cli/app/utxos/broadcast/broadcast.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "log" + "log/slog" "os" "os/signal" "strings" @@ -120,7 +121,7 @@ var Cmd = &cobra.Command{ wocClient := woc_client.New(woc_client.WithAuth(wocApiKey), woc_client.WithLogger(logger)) - rateBroadcaster, err := broadcaster.NewRateBroadcaster(logger, client, fundingKeySets, wocClient, broadcaster.WithFees(miningFeeSat), broadcaster.WithIsTestnet(isTestnet), broadcaster.WithCallback(callbackURL, callbackToken), broadcaster.WithFullstatusUpdates(fullStatusUpdates), broadcaster.WithBatchSize(batchSize)) + rateBroadcaster, err := broadcaster.NewMultiRateBroadcaster(logger, client, fundingKeySets, wocClient, broadcaster.WithFees(miningFeeSat), broadcaster.WithIsTestnet(isTestnet), broadcaster.WithCallback(callbackURL, callbackToken), broadcaster.WithFullstatusUpdates(fullStatusUpdates), broadcaster.WithBatchSize(batchSize)) if err != nil { return fmt.Errorf("failed to create rate broadcaster: %v", err) } @@ -133,7 +134,8 @@ var Cmd = &cobra.Command{ rateBroadcaster.Shutdown() }() - err = rateBroadcaster.StartRateBroadcaster(rateTxsPerSecond, limit) + logger.Info("starting broadcasting", slog.Int("rate [txs/s]", rateTxsPerSecond), slog.Int("batch size", batchSize)) + err = rateBroadcaster.Start(rateTxsPerSecond, limit) if err != nil { return fmt.Errorf("failed to start rate broadcaster: %v", err) } diff --git a/internal/broadcaster/broadcaster.go b/internal/broadcaster/broadcaster.go index 7e2e35d41..fa4be39f5 100644 --- a/internal/broadcaster/broadcaster.go +++ b/internal/broadcaster/broadcaster.go @@ -7,7 +7,6 @@ import ( "math" "time" - "github.com/bitcoin-sv/arc/pkg/keyset" "github.com/libsv/go-bt/v2" "github.com/libsv/go-bt/v2/bscript" ) @@ -32,7 +31,6 @@ type UtxoClient interface { type Broadcaster struct { logger *slog.Logger client ArcClient - keySets []*keyset.KeySet isTestnet bool feeQuote *bt.FeeQuote utxoClient UtxoClient @@ -94,12 +92,11 @@ func WithFees(miningFeeSatPerKb int) func(broadcaster *Broadcaster) { } } -func NewBroadcaster(logger *slog.Logger, client ArcClient, fromKeySet []*keyset.KeySet, utxoClient UtxoClient, opts ...func(p *Broadcaster)) (*Broadcaster, error) { +func NewBroadcaster(logger *slog.Logger, client ArcClient, utxoClient UtxoClient, opts ...func(p *Broadcaster)) (*Broadcaster, error) { b := &Broadcaster{ logger: logger, client: client, - keySets: fromKeySet, isTestnet: isTestnetDefault, batchSize: batchSizeDefault, maxInputs: maxInputsDefault, diff --git a/internal/broadcaster/multi_rate_broadcaster.go b/internal/broadcaster/multi_rate_broadcaster.go new file mode 100644 index 000000000..c867c3f9e --- /dev/null +++ b/internal/broadcaster/multi_rate_broadcaster.go @@ -0,0 +1,108 @@ +package broadcaster + +import ( + "context" + "fmt" + "log/slog" + "runtime" + "sync" + "time" + + "github.com/bitcoin-sv/arc/pkg/keyset" + "github.com/gosuri/uilive" +) + +type MultiRateBroadcaster struct { + rbs []*RateBroadcaster + + cancelAll context.CancelFunc + ctx context.Context + wg sync.WaitGroup +} + +func NewMultiRateBroadcaster(logger *slog.Logger, client ArcClient, keySets []*keyset.KeySet, utxoClient UtxoClient, opts ...func(p *Broadcaster)) (*MultiRateBroadcaster, error) { + + b, err := NewBroadcaster(logger, client, utxoClient, opts...) + if err != nil { + return nil, err + } + rbs := make([]*RateBroadcaster, 0, len(keySets)) + for _, key := range keySets { + rb := &RateBroadcaster{ + Broadcaster: *b, + totalTxs: 0, + shutdown: make(chan struct{}, 10), + wg: sync.WaitGroup{}, + ks: key, + } + rbs = append(rbs, rb) + } + + mrb := &MultiRateBroadcaster{ + rbs: rbs, + } + + ctx, cancelAll := context.WithCancel(context.Background()) + mrb.cancelAll = cancelAll + mrb.ctx = ctx + + return mrb, nil +} + +func (mrb *MultiRateBroadcaster) Start(rateTxsPerSecond int, limit int64) error { + mrb.startPrintStats() + + for _, rb := range mrb.rbs { + err := rb.Start(rateTxsPerSecond, limit) + if err != nil { + return err + } + } + + for _, rb := range mrb.rbs { + rb.wg.Wait() + } + + return nil +} + +func (mrb *MultiRateBroadcaster) Shutdown() { + for _, rb := range mrb.rbs { + rb.Shutdown() + } + + mrb.cancelAll() + + mrb.wg.Wait() +} + +func (mrb *MultiRateBroadcaster) startPrintStats() { + mrb.wg.Add(1) + go func() { + defer mrb.wg.Done() + var m runtime.MemStats + var writer = uilive.New() + writer.Start() + for { + select { + case <-time.NewTicker(500 * time.Millisecond).C: + + for _, rb := range mrb.rbs { + totalTxsCount := rb.GetTxCount() + totalConnectionCount := rb.GetConnectionCount() + totalUtxoSetLength := rb.GetUtxoSetLen() + + _, _ = fmt.Fprintf(writer, "Address: %s\tCurrent connections count: %d\tTx count: %d\tUTXO set length: %d\n", rb.ks.Address(!rb.isTestnet), totalConnectionCount, totalTxsCount, totalUtxoSetLength) + } + + runtime.ReadMemStats(&m) + _, _ = fmt.Fprintf(writer, "Alloc:\t\t%v MiB\n", m.Alloc/1024/1024) + _, _ = fmt.Fprintf(writer, "TotalAlloc:\t%v MiB\n", m.TotalAlloc/1024/1024) + _, _ = fmt.Fprintf(writer, "Sys:\t\t%v MiB\n", m.Sys/1024/1024) + _, _ = fmt.Fprintf(writer, "NumGC:\t\t%v\n", m.NumGC) + case <-mrb.ctx.Done(): + return + } + } + }() +} diff --git a/internal/broadcaster/rate_broadcaster.go b/internal/broadcaster/rate_broadcaster.go index b4ccd6b57..ab633a509 100644 --- a/internal/broadcaster/rate_broadcaster.go +++ b/internal/broadcaster/rate_broadcaster.go @@ -5,10 +5,8 @@ import ( "encoding/hex" "errors" "fmt" - "github.com/gosuri/uilive" "log/slog" "math" - "runtime" "sync" "sync/atomic" "time" @@ -22,43 +20,12 @@ import ( type RateBroadcaster struct { Broadcaster totalTxs int64 - connectionCount int32 + connectionCount int64 shutdown chan struct{} - connectionCh chan struct{} utxoCh chan *bt.UTXO wg sync.WaitGroup satoshiMap sync.Map -} - -func NewRateBroadcaster(logger *slog.Logger, client ArcClient, keySets []*keyset.KeySet, utxoClient UtxoClient, opts ...func(p *Broadcaster)) (*RateBroadcaster, error) { - - b, err := NewBroadcaster(logger, client, keySets, utxoClient, opts...) - if err != nil { - return nil, err - } - - rb := &RateBroadcaster{ - Broadcaster: *b, - totalTxs: 0, - shutdown: make(chan struct{}, 10), - connectionCh: make(chan struct{}, 10), - wg: sync.WaitGroup{}, - } - - rb.startPrintStats() - - go func() { - for { - select { - case <-rb.shutdown: - rb.cancelAll() - case <-rb.ctx.Done(): - return - } - } - }() - - return rb, nil + ks *keyset.KeySet } func (b *RateBroadcaster) calculateFeeSat(tx *bt.Tx) uint64 { @@ -84,89 +51,93 @@ func (b *RateBroadcaster) calculateFeeSat(tx *bt.Tx) uint64 { return txFees } -func (b *RateBroadcaster) StartRateBroadcaster(rateTxsPerSecond int, limit int64) error { - for _, ks := range b.keySets { +func (b *RateBroadcaster) Start(rateTxsPerSecond int, limit int64) error { - _, unconfirmed, err := b.utxoClient.GetBalanceWithRetries(b.ctx, !b.isTestnet, ks.Address(!b.isTestnet), 1*time.Second, 5) - if err != nil { - return err - } - if math.Abs(float64(unconfirmed)) > 0 { - return fmt.Errorf("key with address %s balance has unconfirmed amount %d sat", ks.Address(!b.isTestnet), unconfirmed) + b.wg.Add(1) + go func() { + defer b.wg.Done() + for { + select { + case <-b.shutdown: + b.cancelAll() + case <-b.ctx.Done(): + return + } } + }() - utxoSet, err := b.utxoClient.GetUTXOsWithRetries(b.ctx, !b.isTestnet, ks.Script, ks.Address(!b.isTestnet), 1*time.Second, 5) - if err != nil { - return fmt.Errorf("failed to get utxos: %v", err) - } + _, unconfirmed, err := b.utxoClient.GetBalanceWithRetries(b.ctx, !b.isTestnet, b.ks.Address(!b.isTestnet), 1*time.Second, 5) + if err != nil { + return err + } + if math.Abs(float64(unconfirmed)) > 0 { + return fmt.Errorf("key with address %s balance has unconfirmed amount %d sat", b.ks.Address(!b.isTestnet), unconfirmed) + } - b.logger.Info("starting broadcasting", slog.Int("rate [txs/s]", rateTxsPerSecond), slog.Int("batch size", b.batchSize), slog.String("address", ks.Address(!b.isTestnet))) + utxoSet, err := b.utxoClient.GetUTXOsWithRetries(b.ctx, !b.isTestnet, b.ks.Script, b.ks.Address(!b.isTestnet), 1*time.Second, 5) + if err != nil { + return fmt.Errorf("failed to get utxos: %v", err) + } - submitBatchesPerSecond := float64(rateTxsPerSecond) / float64(b.batchSize) + submitBatchesPerSecond := float64(rateTxsPerSecond) / float64(b.batchSize) - if submitBatchesPerSecond > millisecondsPerSecond { - return fmt.Errorf("submission rate %d [txs/s] and batch size %d [txs] result in submission frequency %.2f greater than 1000 [/s]", rateTxsPerSecond, b.batchSize, submitBatchesPerSecond) - } + if submitBatchesPerSecond > millisecondsPerSecond { + return fmt.Errorf("submission rate %d [txs/s] and batch size %d [txs] result in submission frequency %.2f greater than 1000 [/s]", rateTxsPerSecond, b.batchSize, submitBatchesPerSecond) + } - if len(utxoSet) < b.batchSize { - return fmt.Errorf("size of utxo set %d is smaller than requested batch size %d - create more utxos first", len(utxoSet), b.batchSize) - } + if len(utxoSet) < b.batchSize { + return fmt.Errorf("size of utxo set %d is smaller than requested batch size %d - create more utxos first", len(utxoSet), b.batchSize) + } - b.utxoCh = make(chan *bt.UTXO, len(utxoSet)) + b.utxoCh = make(chan *bt.UTXO, len(utxoSet)) - for _, utxo := range utxoSet { - b.utxoCh <- utxo - } + for _, utxo := range utxoSet { + b.utxoCh <- utxo + } - submitBatchInterval := time.Duration(millisecondsPerSecond/float64(submitBatchesPerSecond)) * time.Millisecond - submitBatchTicker := time.NewTicker(submitBatchInterval) - - errCh := make(chan error, 100) - - b.wg.Add(1) - go func(keySet *keyset.KeySet) { - defer func() { - b.logger.Info("shutting down broadcaster", slog.String("address", keySet.Address(!b.isTestnet))) - b.wg.Done() - }() - - for { - select { - case <-b.shutdown: - return - case <-b.ctx.Done(): - return - case <-submitBatchTicker.C: - - txs, err := b.createSelfPayingTxs(keySet) - if err != nil { - b.logger.Error("failed to create self paying txs", slog.String("err", err.Error())) - b.shutdown <- struct{}{} - continue - } - - if limit > 0 && atomic.LoadInt64(&b.totalTxs) >= limit { - b.logger.Info("limit reached", slog.Int64("total", atomic.LoadInt64(&b.totalTxs)), slog.String("address", ks.Address(!b.isTestnet))) - b.shutdown <- struct{}{} - } - - b.broadcastBatchAsync(txs, keySet, errCh, metamorph_api.Status_RECEIVED) - - case responseErr := <-errCh: - b.logger.Error("failed to submit transactions", slog.String("err", responseErr.Error())) + submitBatchInterval := time.Duration(millisecondsPerSecond/float64(submitBatchesPerSecond)) * time.Millisecond + submitBatchTicker := time.NewTicker(submitBatchInterval) + + errCh := make(chan error, 100) + + b.wg.Add(1) + go func() { + defer func() { + b.logger.Info("shutting down broadcaster", slog.String("address", b.ks.Address(!b.isTestnet))) + b.wg.Done() + }() + + for { + select { + case <-b.ctx.Done(): + return + case <-submitBatchTicker.C: + + txs, err := b.createSelfPayingTxs() + if err != nil { + b.logger.Error("failed to create self paying txs", slog.String("err", err.Error())) + b.shutdown <- struct{}{} + continue } - } - }(ks) - } - b.wg.Wait() + if limit > 0 && atomic.LoadInt64(&b.totalTxs) >= limit { + b.logger.Info("limit reached", slog.Int64("total", atomic.LoadInt64(&b.totalTxs)), slog.String("address", b.ks.Address(!b.isTestnet))) + b.shutdown <- struct{}{} + } + + b.broadcastBatchAsync(txs, errCh, metamorph_api.Status_RECEIVED) + + case responseErr := <-errCh: + b.logger.Error("failed to submit transactions", slog.String("err", responseErr.Error())) + } + } + }() return nil } -func (b *RateBroadcaster) createSelfPayingTxs(ks *keyset.KeySet) ([]*bt.Tx, error) { +func (b *RateBroadcaster) createSelfPayingTxs() ([]*bt.Tx, error) { txs := make([]*bt.Tx, 0, b.batchSize) - for utxo := range b.utxoCh { tx := bt.NewTx() @@ -181,14 +152,14 @@ func (b *RateBroadcaster) createSelfPayingTxs(ks *keyset.KeySet) ([]*bt.Tx, erro continue } - err = tx.PayTo(ks.Script, utxo.Satoshis-fee) + err = tx.PayTo(b.ks.Script, utxo.Satoshis-fee) if err != nil { return nil, err } // Todo: Add OP_RETURN with text "ARC testing" so that WoC can tag it - unlockerGetter := unlocker.Getter{PrivateKey: ks.PrivateKey} + unlockerGetter := unlocker.Getter{PrivateKey: b.ks.PrivateKey} err = tx.FillAllInputs(context.Background(), &unlockerGetter) if err != nil { return nil, err @@ -196,8 +167,6 @@ func (b *RateBroadcaster) createSelfPayingTxs(ks *keyset.KeySet) ([]*bt.Tx, erro b.satoshiMap.Store(tx.TxID(), tx.Outputs[0].Satoshis) - fmt.Println(tx.String()) - txs = append(txs, tx) if len(txs) >= b.batchSize { @@ -208,22 +177,22 @@ func (b *RateBroadcaster) createSelfPayingTxs(ks *keyset.KeySet) ([]*bt.Tx, erro return txs, nil } -func (b *RateBroadcaster) broadcastBatchAsync(txs []*bt.Tx, ks *keyset.KeySet, errCh chan error, waitForStatus metamorph_api.Status) { - +func (b *RateBroadcaster) broadcastBatchAsync(txs []*bt.Tx, errCh chan error, waitForStatus metamorph_api.Status) { b.wg.Add(1) go func() { defer b.wg.Done() + atomic.AddInt64(&b.connectionCount, 1) resp, err := b.client.BroadcastTransactions(b.ctx, txs, waitForStatus, b.callbackURL, b.callbackToken, b.fullStatusUpdates, false) if err != nil { if errors.Is(err, context.Canceled) { - b.logger.Debug("broadcasting canceled", slog.String("address", ks.Address(!b.isTestnet))) + b.logger.Debug("broadcasting canceled", slog.String("address", b.ks.Address(!b.isTestnet))) return } errCh <- err } - atomic.AddInt32(&b.connectionCount, 1) - b.connectionCh <- struct{}{} + + atomic.AddInt64(&b.connectionCount, -1) for _, res := range resp { @@ -240,7 +209,7 @@ func (b *RateBroadcaster) broadcastBatchAsync(txs []*bt.Tx, ks *keyset.KeySet, e newUtxo := &bt.UTXO{ TxID: txIDBytes, Vout: 0, - LockingScript: ks.Script, + LockingScript: b.ks.Script, Satoshis: satoshis, } b.utxoCh <- newUtxo @@ -252,32 +221,20 @@ func (b *RateBroadcaster) broadcastBatchAsync(txs []*bt.Tx, ks *keyset.KeySet, e } }() } - func (b *RateBroadcaster) Shutdown() { b.cancelAll() + + b.wg.Wait() } -func (b *RateBroadcaster) startPrintStats() { - b.wg.Add(1) - go func() { - defer b.wg.Done() - var m runtime.MemStats - var writer = uilive.New() - writer.Start() - for { - select { - case <-b.connectionCh: - _, _ = fmt.Fprintf(writer, "Current connections count: %d\n", atomic.LoadInt32(&b.connectionCount)) - _, _ = fmt.Fprintf(writer, "Tx count: %d\n", atomic.LoadInt64(&b.totalTxs)) - _, _ = fmt.Fprintf(writer, "UTXO set length: %d\n", len(b.utxoCh)) - runtime.ReadMemStats(&m) - _, _ = fmt.Fprintf(writer, "Alloc = %v MiB\n", m.Alloc/1024/1024) - _, _ = fmt.Fprintf(writer, "TotalAlloc = %v MiB\n", m.TotalAlloc/1024/1024) - _, _ = fmt.Fprintf(writer, "Sys = %v MiB\n", m.Sys/1024/1024) - _, _ = fmt.Fprintf(writer, "NumGC = %v\n", m.NumGC) - case <-b.ctx.Done(): - return - } - } - }() +func (b *RateBroadcaster) GetTxCount() int64 { + return atomic.LoadInt64(&b.totalTxs) +} + +func (b *RateBroadcaster) GetConnectionCount() int64 { + return atomic.LoadInt64(&b.connectionCount) +} + +func (b *RateBroadcaster) GetUtxoSetLen() int { + return len(b.utxoCh) } diff --git a/internal/broadcaster/utxo_consolidator.go b/internal/broadcaster/utxo_consolidator.go index 93b7c2923..15e5b70e1 100644 --- a/internal/broadcaster/utxo_consolidator.go +++ b/internal/broadcaster/utxo_consolidator.go @@ -18,17 +18,19 @@ import ( type UTXOConsolidator struct { Broadcaster + keySets []*keyset.KeySet } func NewUTXOConsolidator(logger *slog.Logger, client ArcClient, keySets []*keyset.KeySet, utxoClient UtxoClient, opts ...func(p *Broadcaster)) (*UTXOConsolidator, error) { - b, err := NewBroadcaster(logger, client, keySets, utxoClient, opts...) + b, err := NewBroadcaster(logger, client, utxoClient, opts...) if err != nil { return nil, err } consolidator := &UTXOConsolidator{ Broadcaster: *b, + keySets: keySets, } return consolidator, nil diff --git a/internal/broadcaster/utxo_creator.go b/internal/broadcaster/utxo_creator.go index 18bc2136c..6baab162a 100644 --- a/internal/broadcaster/utxo_creator.go +++ b/internal/broadcaster/utxo_creator.go @@ -17,16 +17,18 @@ import ( type UTXOCreator struct { Broadcaster + keySets []*keyset.KeySet } func NewUTXOCreator(logger *slog.Logger, client ArcClient, keySets []*keyset.KeySet, utxoClient UtxoClient, opts ...func(p *Broadcaster)) (*UTXOCreator, error) { - b, err := NewBroadcaster(logger, client, keySets, utxoClient, opts...) + b, err := NewBroadcaster(logger, client, utxoClient, opts...) if err != nil { return nil, err } creator := &UTXOCreator{ Broadcaster: *b, + keySets: keySets, } return creator, nil From df195785fbe3516abe7739f287244afae6dd7b69 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20B=C3=B6ckli?= Date: Fri, 21 Jun 2024 16:47:11 +0200 Subject: [PATCH 10/14] New broadcaster for each rate broadcaster --- internal/broadcaster/broadcaster.go | 8 ++++---- internal/broadcaster/multi_rate_broadcaster.go | 10 +++++----- internal/broadcaster/utxo_consolidator.go | 2 +- internal/broadcaster/utxo_creator.go | 4 ++-- 4 files changed, 12 insertions(+), 12 deletions(-) diff --git a/internal/broadcaster/broadcaster.go b/internal/broadcaster/broadcaster.go index fa4be39f5..09855e4a9 100644 --- a/internal/broadcaster/broadcaster.go +++ b/internal/broadcaster/broadcaster.go @@ -92,9 +92,9 @@ func WithFees(miningFeeSatPerKb int) func(broadcaster *Broadcaster) { } } -func NewBroadcaster(logger *slog.Logger, client ArcClient, utxoClient UtxoClient, opts ...func(p *Broadcaster)) (*Broadcaster, error) { +func NewBroadcaster(logger *slog.Logger, client ArcClient, utxoClient UtxoClient, opts ...func(p *Broadcaster)) (Broadcaster, error) { - b := &Broadcaster{ + b := Broadcaster{ logger: logger, client: client, isTestnet: isTestnetDefault, @@ -106,13 +106,13 @@ func NewBroadcaster(logger *slog.Logger, client ArcClient, utxoClient UtxoClient standardFee, err := b.feeQuote.Fee(bt.FeeTypeStandard) if err != nil { - return nil, err + return Broadcaster{}, err } b.standardMiningFee = standardFee.MiningFee for _, opt := range opts { - opt(b) + opt(&b) } ctx, cancelAll := context.WithCancel(context.Background()) diff --git a/internal/broadcaster/multi_rate_broadcaster.go b/internal/broadcaster/multi_rate_broadcaster.go index c867c3f9e..409c7b544 100644 --- a/internal/broadcaster/multi_rate_broadcaster.go +++ b/internal/broadcaster/multi_rate_broadcaster.go @@ -22,14 +22,14 @@ type MultiRateBroadcaster struct { func NewMultiRateBroadcaster(logger *slog.Logger, client ArcClient, keySets []*keyset.KeySet, utxoClient UtxoClient, opts ...func(p *Broadcaster)) (*MultiRateBroadcaster, error) { - b, err := NewBroadcaster(logger, client, utxoClient, opts...) - if err != nil { - return nil, err - } rbs := make([]*RateBroadcaster, 0, len(keySets)) for _, key := range keySets { + b, err := NewBroadcaster(logger, client, utxoClient, opts...) + if err != nil { + return nil, err + } rb := &RateBroadcaster{ - Broadcaster: *b, + Broadcaster: b, totalTxs: 0, shutdown: make(chan struct{}, 10), wg: sync.WaitGroup{}, diff --git a/internal/broadcaster/utxo_consolidator.go b/internal/broadcaster/utxo_consolidator.go index 15e5b70e1..568078c17 100644 --- a/internal/broadcaster/utxo_consolidator.go +++ b/internal/broadcaster/utxo_consolidator.go @@ -29,7 +29,7 @@ func NewUTXOConsolidator(logger *slog.Logger, client ArcClient, keySets []*keyse } consolidator := &UTXOConsolidator{ - Broadcaster: *b, + Broadcaster: b, keySets: keySets, } diff --git a/internal/broadcaster/utxo_creator.go b/internal/broadcaster/utxo_creator.go index 6baab162a..1bdceaf74 100644 --- a/internal/broadcaster/utxo_creator.go +++ b/internal/broadcaster/utxo_creator.go @@ -27,7 +27,7 @@ func NewUTXOCreator(logger *slog.Logger, client ArcClient, keySets []*keyset.Key } creator := &UTXOCreator{ - Broadcaster: *b, + Broadcaster: b, keySets: keySets, } @@ -171,7 +171,7 @@ func (b *UTXOCreator) splitOutputs(requestedOutputs int, requestedSatoshisPerOut for front := utxoSet.Front(); front != nil; front = next { next = front.Next() - if outputs >= requestedOutputs { + if front == nil || outputs >= requestedOutputs { break } From f85d8cc21e2c0789d04a02b73fbf5c922effb954 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20B=C3=B6ckli?= Date: Fri, 21 Jun 2024 16:47:20 +0200 Subject: [PATCH 11/14] Rm debug log --- internal/broadcaster/rate_broadcaster.go | 1 - 1 file changed, 1 deletion(-) diff --git a/internal/broadcaster/rate_broadcaster.go b/internal/broadcaster/rate_broadcaster.go index ab633a509..a353decb2 100644 --- a/internal/broadcaster/rate_broadcaster.go +++ b/internal/broadcaster/rate_broadcaster.go @@ -186,7 +186,6 @@ func (b *RateBroadcaster) broadcastBatchAsync(txs []*bt.Tx, errCh chan error, wa resp, err := b.client.BroadcastTransactions(b.ctx, txs, waitForStatus, b.callbackURL, b.callbackToken, b.fullStatusUpdates, false) if err != nil { if errors.Is(err, context.Canceled) { - b.logger.Debug("broadcasting canceled", slog.String("address", b.ks.Address(!b.isTestnet))) return } errCh <- err From 940b8dcb148f763cfb6b666d95ff3e8b351e8184 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20B=C3=B6ckli?= Date: Mon, 24 Jun 2024 15:39:52 +0200 Subject: [PATCH 12/14] Creation of rate broadcasting txs can be aborted --- .../app/utxos/broadcast/broadcast.go | 13 ++- .../app/utxos/consolidate/consolidate.go | 3 +- .../app/utxos/create/create.go | 3 +- go.mod | 1 - go.sum | 2 - internal/broadcaster/broadcaster.go | 19 ++-- .../broadcaster/multi_rate_broadcaster.go | 77 ++++++++-------- internal/broadcaster/rate_broadcaster.go | 87 ++++++++++++------- internal/broadcaster/utxo_consolidator.go | 4 +- internal/broadcaster/utxo_creator.go | 4 +- 10 files changed, 118 insertions(+), 95 deletions(-) diff --git a/cmd/broadcaster-cli/app/utxos/broadcast/broadcast.go b/cmd/broadcaster-cli/app/utxos/broadcast/broadcast.go index 47e92a9d5..07b70f399 100644 --- a/cmd/broadcaster-cli/app/utxos/broadcast/broadcast.go +++ b/cmd/broadcaster-cli/app/utxos/broadcast/broadcast.go @@ -80,6 +80,9 @@ var Cmd = &cobra.Command{ if err != nil { return err } + + slog.Default().Info("mining fee set", "mining fee satoshis", miningFeeSat) + if miningFeeSat == 0 { return errors.New("no mining fee was given") } @@ -121,7 +124,12 @@ var Cmd = &cobra.Command{ wocClient := woc_client.New(woc_client.WithAuth(wocApiKey), woc_client.WithLogger(logger)) - rateBroadcaster, err := broadcaster.NewMultiRateBroadcaster(logger, client, fundingKeySets, wocClient, broadcaster.WithFees(miningFeeSat), broadcaster.WithIsTestnet(isTestnet), broadcaster.WithCallback(callbackURL, callbackToken), broadcaster.WithFullstatusUpdates(fullStatusUpdates), broadcaster.WithBatchSize(batchSize)) + rateBroadcaster, err := broadcaster.NewMultiKeyRateBroadcaster(logger, client, fundingKeySets, wocClient, isTestnet, + broadcaster.WithFees(miningFeeSat), + broadcaster.WithCallback(callbackURL, callbackToken), + broadcaster.WithFullstatusUpdates(fullStatusUpdates), + broadcaster.WithBatchSize(batchSize), + ) if err != nil { return fmt.Errorf("failed to create rate broadcaster: %v", err) } @@ -131,10 +139,11 @@ var Cmd = &cobra.Command{ signal.Notify(signalChan, os.Interrupt) // Signal from Ctrl+C <-signalChan + logger.Info("Shutting down broadcaster") rateBroadcaster.Shutdown() }() - logger.Info("starting broadcasting", slog.Int("rate [txs/s]", rateTxsPerSecond), slog.Int("batch size", batchSize)) + logger.Info("Starting broadcaster", slog.Int("rate [txs/s]", rateTxsPerSecond), slog.Int("batch size", batchSize)) err = rateBroadcaster.Start(rateTxsPerSecond, limit) if err != nil { return fmt.Errorf("failed to start rate broadcaster: %v", err) diff --git a/cmd/broadcaster-cli/app/utxos/consolidate/consolidate.go b/cmd/broadcaster-cli/app/utxos/consolidate/consolidate.go index 5a35927c2..9be99e9a5 100644 --- a/cmd/broadcaster-cli/app/utxos/consolidate/consolidate.go +++ b/cmd/broadcaster-cli/app/utxos/consolidate/consolidate.go @@ -95,9 +95,8 @@ var Cmd = &cobra.Command{ fundingKeySets[i] = fundingKeySet } - rateBroadcaster, err := broadcaster.NewUTXOConsolidator(logger, client, fundingKeySets, wocClient, + rateBroadcaster, err := broadcaster.NewUTXOConsolidator(logger, client, fundingKeySets, wocClient, isTestnet, broadcaster.WithFees(miningFeeSat), - broadcaster.WithIsTestnet(isTestnet), broadcaster.WithCallback(callbackURL, callbackToken), broadcaster.WithFullstatusUpdates(fullStatusUpdates), ) diff --git a/cmd/broadcaster-cli/app/utxos/create/create.go b/cmd/broadcaster-cli/app/utxos/create/create.go index 6ef6d78dc..17966aa14 100644 --- a/cmd/broadcaster-cli/app/utxos/create/create.go +++ b/cmd/broadcaster-cli/app/utxos/create/create.go @@ -113,9 +113,8 @@ var Cmd = &cobra.Command{ fundingKeySets[i] = fundingKeySet } - rateBroadcaster, err := broadcaster.NewUTXOCreator(logger, client, fundingKeySets, wocClient, + rateBroadcaster, err := broadcaster.NewUTXOCreator(logger, client, fundingKeySets, wocClient, isTestnet, broadcaster.WithFees(miningFeeSat), - broadcaster.WithIsTestnet(isTestnet), broadcaster.WithCallback(callbackURL, callbackToken), broadcaster.WithFullstatusUpdates(fullStatusUpdates), ) diff --git a/go.mod b/go.mod index 7d65ba845..2cd14f374 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,6 @@ require ( github.com/getkin/kin-openapi v0.103.0 github.com/go-testfixtures/testfixtures/v3 v3.9.0 github.com/golang-migrate/migrate/v4 v4.16.2 - github.com/gosuri/uilive v0.0.4 github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.1 github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0 github.com/jedib0t/go-pretty/v6 v6.5.4 diff --git a/go.sum b/go.sum index 7dee1e8e6..0759744a0 100644 --- a/go.sum +++ b/go.sum @@ -254,8 +254,6 @@ github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5m github.com/googleapis/google-cloud-go-testing v0.0.0-20200911160855-bcd43fbb19e8/go.mod h1:dvDLG8qkwmyD9a/MJJN3XJcT3xFxOKAvTZGvuZmac9g= github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= -github.com/gosuri/uilive v0.0.4 h1:hUEBpQDj8D8jXgtCdBu7sWsy5sbW/5GhuO8KBwJ2jyY= -github.com/gosuri/uilive v0.0.4/go.mod h1:V/epo5LjjlDE5RJUcqx8dbw+zc93y5Ya3yg8tfZ74VI= github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.1 h1:qnpSQwGEnkcRpTqNOIR6bJbR0gAorgP9CSALpRcKoAA= github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.1/go.mod h1:lXGCsh6c22WGtjr+qGHj1otzZpV/1kwTMAqkwZsnWRU= github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0 h1:pRhl55Yx1eC7BZ1N+BBWwnKaMyD8uC+34TLdndZMAKk= diff --git a/internal/broadcaster/broadcaster.go b/internal/broadcaster/broadcaster.go index 09855e4a9..d4a9dfc07 100644 --- a/internal/broadcaster/broadcaster.go +++ b/internal/broadcaster/broadcaster.go @@ -14,7 +14,6 @@ import ( const ( maxInputsDefault = 100 batchSizeDefault = 20 - isTestnetDefault = true millisecondsPerSecond = 1000 ) @@ -44,12 +43,6 @@ type Broadcaster struct { batchSize int } -func WithIsTestnet(isTestnet bool) func(broadcaster *Broadcaster) { - return func(broadcaster *Broadcaster) { - broadcaster.isTestnet = isTestnet - } -} - func WithBatchSize(batchSize int) func(broadcaster *Broadcaster) { return func(broadcaster *Broadcaster) { broadcaster.batchSize = batchSize @@ -92,18 +85,22 @@ func WithFees(miningFeeSatPerKb int) func(broadcaster *Broadcaster) { } } -func NewBroadcaster(logger *slog.Logger, client ArcClient, utxoClient UtxoClient, opts ...func(p *Broadcaster)) (Broadcaster, error) { +func NewBroadcaster(logger *slog.Logger, client ArcClient, utxoClient UtxoClient, isTestnet bool, opts ...func(p *Broadcaster)) (Broadcaster, error) { b := Broadcaster{ logger: logger, client: client, - isTestnet: isTestnetDefault, + isTestnet: isTestnet, batchSize: batchSizeDefault, maxInputs: maxInputsDefault, feeQuote: bt.NewFeeQuote(), utxoClient: utxoClient, } + for _, opt := range opts { + opt(&b) + } + standardFee, err := b.feeQuote.Fee(bt.FeeTypeStandard) if err != nil { return Broadcaster{}, err @@ -111,10 +108,6 @@ func NewBroadcaster(logger *slog.Logger, client ArcClient, utxoClient UtxoClient b.standardMiningFee = standardFee.MiningFee - for _, opt := range opts { - opt(&b) - } - ctx, cancelAll := context.WithCancel(context.Background()) b.cancelAll = cancelAll b.ctx = ctx diff --git a/internal/broadcaster/multi_rate_broadcaster.go b/internal/broadcaster/multi_rate_broadcaster.go index 409c7b544..761ce644e 100644 --- a/internal/broadcaster/multi_rate_broadcaster.go +++ b/internal/broadcaster/multi_rate_broadcaster.go @@ -2,44 +2,36 @@ package broadcaster import ( "context" - "fmt" + "github.com/bitcoin-sv/arc/pkg/keyset" "log/slog" "runtime" "sync" "time" - - "github.com/bitcoin-sv/arc/pkg/keyset" - "github.com/gosuri/uilive" ) -type MultiRateBroadcaster struct { - rbs []*RateBroadcaster +type MultiKeyRateBroadcaster struct { + rbs []*RateBroadcaster + logger *slog.Logger cancelAll context.CancelFunc ctx context.Context wg sync.WaitGroup } -func NewMultiRateBroadcaster(logger *slog.Logger, client ArcClient, keySets []*keyset.KeySet, utxoClient UtxoClient, opts ...func(p *Broadcaster)) (*MultiRateBroadcaster, error) { - +func NewMultiKeyRateBroadcaster(logger *slog.Logger, client ArcClient, keySets []*keyset.KeySet, utxoClient UtxoClient, isTestnet bool, opts ...func(p *Broadcaster)) (*MultiKeyRateBroadcaster, error) { rbs := make([]*RateBroadcaster, 0, len(keySets)) - for _, key := range keySets { - b, err := NewBroadcaster(logger, client, utxoClient, opts...) + for _, ks := range keySets { + rb, err := NewRateBroadcaster(logger, client, ks, utxoClient, isTestnet, opts...) if err != nil { return nil, err } - rb := &RateBroadcaster{ - Broadcaster: b, - totalTxs: 0, - shutdown: make(chan struct{}, 10), - wg: sync.WaitGroup{}, - ks: key, - } + rbs = append(rbs, rb) } - mrb := &MultiRateBroadcaster{ - rbs: rbs, + mrb := &MultiKeyRateBroadcaster{ + rbs: rbs, + logger: logger, } ctx, cancelAll := context.WithCancel(context.Background()) @@ -49,8 +41,8 @@ func NewMultiRateBroadcaster(logger *slog.Logger, client ArcClient, keySets []*k return mrb, nil } -func (mrb *MultiRateBroadcaster) Start(rateTxsPerSecond int, limit int64) error { - mrb.startPrintStats() +func (mrb *MultiKeyRateBroadcaster) Start(rateTxsPerSecond int, limit int64) error { + mrb.logStats() for _, rb := range mrb.rbs { err := rb.Start(rateTxsPerSecond, limit) @@ -66,40 +58,47 @@ func (mrb *MultiRateBroadcaster) Start(rateTxsPerSecond int, limit int64) error return nil } -func (mrb *MultiRateBroadcaster) Shutdown() { +func (mrb *MultiKeyRateBroadcaster) Shutdown() { + mrb.cancelAll() for _, rb := range mrb.rbs { rb.Shutdown() } - mrb.cancelAll() - mrb.wg.Wait() } -func (mrb *MultiRateBroadcaster) startPrintStats() { +func (mrb *MultiKeyRateBroadcaster) logStats() { mrb.wg.Add(1) + bToMb := func(b uint64) uint64 { + return b / 1024 / 1024 + } go func() { defer mrb.wg.Done() - var m runtime.MemStats - var writer = uilive.New() - writer.Start() + var mem runtime.MemStats for { select { - case <-time.NewTicker(500 * time.Millisecond).C: + case <-time.NewTicker(2 * time.Second).C: + totalTxsCount := int64(0) + totalConnectionCount := int64(0) + totalUtxoSetLength := 0 for _, rb := range mrb.rbs { - totalTxsCount := rb.GetTxCount() - totalConnectionCount := rb.GetConnectionCount() - totalUtxoSetLength := rb.GetUtxoSetLen() + totalTxsCount += rb.GetTxCount() + totalConnectionCount += rb.GetConnectionCount() + totalUtxoSetLength += rb.GetUtxoSetLen() - _, _ = fmt.Fprintf(writer, "Address: %s\tCurrent connections count: %d\tTx count: %d\tUTXO set length: %d\n", rb.ks.Address(!rb.isTestnet), totalConnectionCount, totalTxsCount, totalUtxoSetLength) } - - runtime.ReadMemStats(&m) - _, _ = fmt.Fprintf(writer, "Alloc:\t\t%v MiB\n", m.Alloc/1024/1024) - _, _ = fmt.Fprintf(writer, "TotalAlloc:\t%v MiB\n", m.TotalAlloc/1024/1024) - _, _ = fmt.Fprintf(writer, "Sys:\t\t%v MiB\n", m.Sys/1024/1024) - _, _ = fmt.Fprintf(writer, "NumGC:\t\t%v\n", m.NumGC) + mrb.logger.Info("summary", + slog.Int64("txs", totalTxsCount), + slog.Int64("connections", totalConnectionCount), + slog.Int("utxos", totalUtxoSetLength), + ) + mrb.logger.Debug("stats", + slog.Uint64("Alloc [MiB]", bToMb(mem.Alloc)), + slog.Uint64("TotalAlloc [MiB]", bToMb(mem.TotalAlloc)), + slog.Uint64("Sys [MiB]", bToMb(mem.Sys)), + slog.Int64("NumGC [MiB]", int64(mem.NumGC)), + ) case <-mrb.ctx.Done(): return } diff --git a/internal/broadcaster/rate_broadcaster.go b/internal/broadcaster/rate_broadcaster.go index a353decb2..90f06fdb1 100644 --- a/internal/broadcaster/rate_broadcaster.go +++ b/internal/broadcaster/rate_broadcaster.go @@ -28,6 +28,26 @@ type RateBroadcaster struct { ks *keyset.KeySet } +func NewRateBroadcaster(logger *slog.Logger, client ArcClient, ks *keyset.KeySet, utxoClient UtxoClient, isTestnet bool, opts ...func(p *Broadcaster)) (*RateBroadcaster, error) { + + b, err := NewBroadcaster(logger.With(slog.String("address", ks.Address(isTestnet))), client, utxoClient, isTestnet, opts...) + if err != nil { + return nil, err + } + rb := &RateBroadcaster{ + Broadcaster: b, + shutdown: make(chan struct{}, 1), + utxoCh: nil, + wg: sync.WaitGroup{}, + satoshiMap: sync.Map{}, + ks: ks, + totalTxs: 0, + connectionCount: 0, + } + + return rb, nil +} + func (b *RateBroadcaster) calculateFeeSat(tx *bt.Tx) uint64 { size, err := tx.EstimateSizeWithTypes() if err != nil { @@ -89,8 +109,7 @@ func (b *RateBroadcaster) Start(rateTxsPerSecond int, limit int64) error { return fmt.Errorf("size of utxo set %d is smaller than requested batch size %d - create more utxos first", len(utxoSet), b.batchSize) } - b.utxoCh = make(chan *bt.UTXO, len(utxoSet)) - + b.utxoCh = make(chan *bt.UTXO, 100000) for _, utxo := range utxoSet { b.utxoCh <- utxo } @@ -103,7 +122,7 @@ func (b *RateBroadcaster) Start(rateTxsPerSecond int, limit int64) error { b.wg.Add(1) go func() { defer func() { - b.logger.Info("shutting down broadcaster", slog.String("address", b.ks.Address(!b.isTestnet))) + b.logger.Info("shutting down broadcaster") b.wg.Done() }() @@ -121,7 +140,7 @@ func (b *RateBroadcaster) Start(rateTxsPerSecond int, limit int64) error { } if limit > 0 && atomic.LoadInt64(&b.totalTxs) >= limit { - b.logger.Info("limit reached", slog.Int64("total", atomic.LoadInt64(&b.totalTxs)), slog.String("address", b.ks.Address(!b.isTestnet))) + b.logger.Info("limit reached", slog.Int64("total", atomic.LoadInt64(&b.totalTxs)), slog.Int64("limit", limit)) b.shutdown <- struct{}{} } @@ -138,43 +157,51 @@ func (b *RateBroadcaster) Start(rateTxsPerSecond int, limit int64) error { func (b *RateBroadcaster) createSelfPayingTxs() ([]*bt.Tx, error) { txs := make([]*bt.Tx, 0, b.batchSize) - for utxo := range b.utxoCh { - tx := bt.NewTx() - err := tx.FromUTXOs(utxo) - if err != nil { - return nil, err - } + for { + select { + case <-b.ctx.Done(): + return txs, nil + case <-time.NewTimer(1 * time.Second).C: + return txs, nil + case utxo := <-b.utxoCh: + tx := bt.NewTx() - fee := b.calculateFeeSat(tx) + err := tx.FromUTXOs(utxo) + if err != nil { + return nil, err + } - if utxo.Satoshis <= fee { - continue - } + fee := b.calculateFeeSat(tx) - err = tx.PayTo(b.ks.Script, utxo.Satoshis-fee) - if err != nil { - return nil, err - } + if utxo.Satoshis <= fee { + continue + } - // Todo: Add OP_RETURN with text "ARC testing" so that WoC can tag it + err = tx.PayTo(b.ks.Script, utxo.Satoshis-fee) + if err != nil { + return nil, err + } - unlockerGetter := unlocker.Getter{PrivateKey: b.ks.PrivateKey} - err = tx.FillAllInputs(context.Background(), &unlockerGetter) - if err != nil { - return nil, err - } + // Todo: Add OP_RETURN with text "ARC testing" so that WoC can tag it - b.satoshiMap.Store(tx.TxID(), tx.Outputs[0].Satoshis) + unlockerGetter := unlocker.Getter{PrivateKey: b.ks.PrivateKey} + err = tx.FillAllInputs(context.Background(), &unlockerGetter) + if err != nil { + return nil, err + } + + b.satoshiMap.Store(tx.TxID(), tx.Outputs[0].Satoshis) - txs = append(txs, tx) + txs = append(txs, tx) - if len(txs) >= b.batchSize { - break + if len(txs) >= b.batchSize { + return txs, nil + } } - } - return txs, nil + return txs, nil + } } func (b *RateBroadcaster) broadcastBatchAsync(txs []*bt.Tx, errCh chan error, waitForStatus metamorph_api.Status) { diff --git a/internal/broadcaster/utxo_consolidator.go b/internal/broadcaster/utxo_consolidator.go index 568078c17..ab31ea97a 100644 --- a/internal/broadcaster/utxo_consolidator.go +++ b/internal/broadcaster/utxo_consolidator.go @@ -21,9 +21,9 @@ type UTXOConsolidator struct { keySets []*keyset.KeySet } -func NewUTXOConsolidator(logger *slog.Logger, client ArcClient, keySets []*keyset.KeySet, utxoClient UtxoClient, opts ...func(p *Broadcaster)) (*UTXOConsolidator, error) { +func NewUTXOConsolidator(logger *slog.Logger, client ArcClient, keySets []*keyset.KeySet, utxoClient UtxoClient, isTestnet bool, opts ...func(p *Broadcaster)) (*UTXOConsolidator, error) { - b, err := NewBroadcaster(logger, client, utxoClient, opts...) + b, err := NewBroadcaster(logger, client, utxoClient, isTestnet, opts...) if err != nil { return nil, err } diff --git a/internal/broadcaster/utxo_creator.go b/internal/broadcaster/utxo_creator.go index 1bdceaf74..c27e35235 100644 --- a/internal/broadcaster/utxo_creator.go +++ b/internal/broadcaster/utxo_creator.go @@ -20,8 +20,8 @@ type UTXOCreator struct { keySets []*keyset.KeySet } -func NewUTXOCreator(logger *slog.Logger, client ArcClient, keySets []*keyset.KeySet, utxoClient UtxoClient, opts ...func(p *Broadcaster)) (*UTXOCreator, error) { - b, err := NewBroadcaster(logger, client, utxoClient, opts...) +func NewUTXOCreator(logger *slog.Logger, client ArcClient, keySets []*keyset.KeySet, utxoClient UtxoClient, isTestnet bool, opts ...func(p *Broadcaster)) (*UTXOCreator, error) { + b, err := NewBroadcaster(logger, client, utxoClient, isTestnet, opts...) if err != nil { return nil, err } From 8282f913833ef14a13ff699b3f25fa2c5d3ed433 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20B=C3=B6ckli?= Date: Mon, 24 Jun 2024 16:53:44 +0200 Subject: [PATCH 13/14] Separate memory stats and broadcaster stats --- internal/broadcaster/multi_rate_broadcaster.go | 11 ++++++++--- internal/broadcaster/rate_broadcaster.go | 17 ++++++++++------- 2 files changed, 18 insertions(+), 10 deletions(-) diff --git a/internal/broadcaster/multi_rate_broadcaster.go b/internal/broadcaster/multi_rate_broadcaster.go index 761ce644e..0020a67ac 100644 --- a/internal/broadcaster/multi_rate_broadcaster.go +++ b/internal/broadcaster/multi_rate_broadcaster.go @@ -72,12 +72,16 @@ func (mrb *MultiKeyRateBroadcaster) logStats() { bToMb := func(b uint64) uint64 { return b / 1024 / 1024 } + + logStatsTicker := time.NewTicker(2 * time.Second) + logMemStatsTicker := time.NewTicker(10 * time.Second) + go func() { defer mrb.wg.Done() var mem runtime.MemStats for { select { - case <-time.NewTicker(2 * time.Second).C: + case <-logStatsTicker.C: totalTxsCount := int64(0) totalConnectionCount := int64(0) totalUtxoSetLength := 0 @@ -88,12 +92,13 @@ func (mrb *MultiKeyRateBroadcaster) logStats() { totalUtxoSetLength += rb.GetUtxoSetLen() } - mrb.logger.Info("summary", + mrb.logger.Info("stats", slog.Int64("txs", totalTxsCount), slog.Int64("connections", totalConnectionCount), slog.Int("utxos", totalUtxoSetLength), ) - mrb.logger.Debug("stats", + case <-logMemStatsTicker.C: + mrb.logger.Info("memory", slog.Uint64("Alloc [MiB]", bToMb(mem.Alloc)), slog.Uint64("TotalAlloc [MiB]", bToMb(mem.TotalAlloc)), slog.Uint64("Sys [MiB]", bToMb(mem.Sys)), diff --git a/internal/broadcaster/rate_broadcaster.go b/internal/broadcaster/rate_broadcaster.go index 90f06fdb1..6eec44d5c 100644 --- a/internal/broadcaster/rate_broadcaster.go +++ b/internal/broadcaster/rate_broadcaster.go @@ -30,7 +30,7 @@ type RateBroadcaster struct { func NewRateBroadcaster(logger *slog.Logger, client ArcClient, ks *keyset.KeySet, utxoClient UtxoClient, isTestnet bool, opts ...func(p *Broadcaster)) (*RateBroadcaster, error) { - b, err := NewBroadcaster(logger.With(slog.String("address", ks.Address(isTestnet))), client, utxoClient, isTestnet, opts...) + b, err := NewBroadcaster(logger.With(slog.String("address", ks.Address(!isTestnet))), client, utxoClient, isTestnet, opts...) if err != nil { return nil, err } @@ -117,6 +117,8 @@ func (b *RateBroadcaster) Start(rateTxsPerSecond int, limit int64) error { submitBatchInterval := time.Duration(millisecondsPerSecond/float64(submitBatchesPerSecond)) * time.Millisecond submitBatchTicker := time.NewTicker(submitBatchInterval) + b.logger.Info("batch interval", slog.Duration("interval", submitBatchInterval)) + errCh := make(chan error, 100) b.wg.Add(1) @@ -158,12 +160,13 @@ func (b *RateBroadcaster) Start(rateTxsPerSecond int, limit int64) error { func (b *RateBroadcaster) createSelfPayingTxs() ([]*bt.Tx, error) { txs := make([]*bt.Tx, 0, b.batchSize) +utxoLoop: for { select { case <-b.ctx.Done(): return txs, nil - case <-time.NewTimer(1 * time.Second).C: - return txs, nil + //case <-time.NewTimer(1 * time.Second).C: + // return txs, nil case utxo := <-b.utxoCh: tx := bt.NewTx() @@ -175,6 +178,7 @@ func (b *RateBroadcaster) createSelfPayingTxs() ([]*bt.Tx, error) { fee := b.calculateFeeSat(tx) if utxo.Satoshis <= fee { + b.logger.Debug("fees too low", slog.Uint64("fee", fee), slog.Uint64("utxo sat", utxo.Satoshis)) continue } @@ -196,19 +200,18 @@ func (b *RateBroadcaster) createSelfPayingTxs() ([]*bt.Tx, error) { txs = append(txs, tx) if len(txs) >= b.batchSize { - return txs, nil + break utxoLoop } } - - return txs, nil } + + return txs, nil } func (b *RateBroadcaster) broadcastBatchAsync(txs []*bt.Tx, errCh chan error, waitForStatus metamorph_api.Status) { b.wg.Add(1) go func() { defer b.wg.Done() - atomic.AddInt64(&b.connectionCount, 1) resp, err := b.client.BroadcastTransactions(b.ctx, txs, waitForStatus, b.callbackURL, b.callbackToken, b.fullStatusUpdates, false) if err != nil { From 4b3ac4527c2e909c0d2be18e160d2d8c0e40f8a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20B=C3=B6ckli?= Date: Tue, 25 Jun 2024 09:43:22 +0200 Subject: [PATCH 14/14] Rm unused logs --- cmd/broadcaster-cli/app/utxos/broadcast/broadcast.go | 2 -- internal/broadcaster/rate_broadcaster.go | 3 --- 2 files changed, 5 deletions(-) diff --git a/cmd/broadcaster-cli/app/utxos/broadcast/broadcast.go b/cmd/broadcaster-cli/app/utxos/broadcast/broadcast.go index 07b70f399..607f5fb76 100644 --- a/cmd/broadcaster-cli/app/utxos/broadcast/broadcast.go +++ b/cmd/broadcaster-cli/app/utxos/broadcast/broadcast.go @@ -81,8 +81,6 @@ var Cmd = &cobra.Command{ return err } - slog.Default().Info("mining fee set", "mining fee satoshis", miningFeeSat) - if miningFeeSat == 0 { return errors.New("no mining fee was given") } diff --git a/internal/broadcaster/rate_broadcaster.go b/internal/broadcaster/rate_broadcaster.go index 6eec44d5c..f982f7d3c 100644 --- a/internal/broadcaster/rate_broadcaster.go +++ b/internal/broadcaster/rate_broadcaster.go @@ -117,8 +117,6 @@ func (b *RateBroadcaster) Start(rateTxsPerSecond int, limit int64) error { submitBatchInterval := time.Duration(millisecondsPerSecond/float64(submitBatchesPerSecond)) * time.Millisecond submitBatchTicker := time.NewTicker(submitBatchInterval) - b.logger.Info("batch interval", slog.Duration("interval", submitBatchInterval)) - errCh := make(chan error, 100) b.wg.Add(1) @@ -178,7 +176,6 @@ utxoLoop: fee := b.calculateFeeSat(tx) if utxo.Satoshis <= fee { - b.logger.Debug("fees too low", slog.Uint64("fee", fee), slog.Uint64("utxo sat", utxo.Satoshis)) continue }