diff --git a/cmd/broadcaster-cli/app/utxos/broadcast/broadcast.go b/cmd/broadcaster-cli/app/utxos/broadcast/broadcast.go index 5221944f3..607f5fb76 100644 --- a/cmd/broadcaster-cli/app/utxos/broadcast/broadcast.go +++ b/cmd/broadcaster-cli/app/utxos/broadcast/broadcast.go @@ -1,21 +1,18 @@ package broadcast import ( - "context" "errors" "fmt" - "io" "log" + "log/slog" "os" "os/signal" - "path/filepath" "strings" - "sync" - "time" "github.com/bitcoin-sv/arc/cmd/broadcaster-cli/helper" "github.com/bitcoin-sv/arc/internal/broadcaster" "github.com/bitcoin-sv/arc/internal/woc_client" + "github.com/bitcoin-sv/arc/pkg/keyset" "github.com/spf13/cobra" "github.com/spf13/viper" ) @@ -46,11 +43,6 @@ var Cmd = &cobra.Command{ return err } - store, err := helper.GetBool("store") - if err != nil { - return err - } - fullStatusUpdates, err := helper.GetBool("fullStatusUpdates") if err != nil { return err @@ -88,6 +80,7 @@ var Cmd = &cobra.Command{ if err != nil { return err } + if miningFeeSat == 0 { return errors.New("no mining fee was given") } @@ -116,63 +109,27 @@ var Cmd = &cobra.Command{ return fmt.Errorf("failed to create client: %v", err) } - rbs := make([]*broadcaster.RateBroadcaster, len(keyFiles)) - - wg := &sync.WaitGroup{} - - var resultsPath string - if store { - network := "mainnet" - if isTestnet { - network = "testnet" - } - resultsPath = filepath.Join(".", fmt.Sprintf("results/%s-%s-rate-%d-batchsize-%d", network, time.Now().Format(time.DateTime), rateTxsPerSecond, batchSize)) - err := os.MkdirAll(resultsPath, os.ModePerm) - if err != nil { - return err - } - } - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - + fundingKeySets := make([]*keyset.KeySet, len(keyFiles)) for i, kf := range keyFiles { - wg.Add(1) - fundingKeySet, _, err := helper.GetKeySetsKeyFile(kf) if err != nil { return fmt.Errorf("failed to get key sets: %v", err) } - wocClient := woc_client.New(woc_client.WithAuth(wocApiKey), woc_client.WithLogger(logger)) - - var writer io.Writer - if store { - - _, keyFileName := filepath.Split(kf) - - file, err := os.Create(fmt.Sprintf("%s/%s.json", resultsPath, keyFileName)) - if err != nil { - return err - } - - writer = file - - defer file.Close() - } - - rateBroadcaster, err := broadcaster.NewRateBroadcaster(logger, client, fundingKeySet, wocClient, broadcaster.WithFees(miningFeeSat), broadcaster.WithIsTestnet(isTestnet), broadcaster.WithCallback(callbackURL, callbackToken), broadcaster.WithFullstatusUpdates(fullStatusUpdates), broadcaster.WithBatchSize(batchSize), broadcaster.WithStoreWriter(writer, 50)) - if err != nil { - return fmt.Errorf("failed to create rate broadcaster: %v", err) - } + fundingKeySets[i] = fundingKeySet + } - rbs[i] = rateBroadcaster + wocClient := woc_client.New(woc_client.WithAuth(wocApiKey), woc_client.WithLogger(logger)) - err = rateBroadcaster.StartRateBroadcaster(ctx, rateTxsPerSecond, limit, wg) - if err != nil { - return fmt.Errorf("failed to start rate broadcaster: %v", err) - } + rateBroadcaster, err := broadcaster.NewMultiKeyRateBroadcaster(logger, client, fundingKeySets, wocClient, isTestnet, + broadcaster.WithFees(miningFeeSat), + broadcaster.WithCallback(callbackURL, callbackToken), + broadcaster.WithFullstatusUpdates(fullStatusUpdates), + broadcaster.WithBatchSize(batchSize), + ) + if err != nil { + return fmt.Errorf("failed to create rate broadcaster: %v", err) } go func() { @@ -180,10 +137,15 @@ var Cmd = &cobra.Command{ signal.Notify(signalChan, os.Interrupt) // Signal from Ctrl+C <-signalChan - cancel() + logger.Info("Shutting down broadcaster") + rateBroadcaster.Shutdown() }() - wg.Wait() + logger.Info("Starting broadcaster", slog.Int("rate [txs/s]", rateTxsPerSecond), slog.Int("batch size", batchSize)) + err = rateBroadcaster.Start(rateTxsPerSecond, limit) + if err != nil { + return fmt.Errorf("failed to start rate broadcaster: %v", err) + } return nil }, @@ -209,10 +171,4 @@ func init() { if err != nil { log.Fatal(err) } - - Cmd.Flags().Bool("store", false, "Store results in a json file instead of printing") - err = viper.BindPFlag("store", Cmd.Flags().Lookup("store")) - if err != nil { - log.Fatal(err) - } } diff --git a/cmd/broadcaster-cli/app/utxos/consolidate/consolidate.go b/cmd/broadcaster-cli/app/utxos/consolidate/consolidate.go index 996dee61f..9be99e9a5 100644 --- a/cmd/broadcaster-cli/app/utxos/consolidate/consolidate.go +++ b/cmd/broadcaster-cli/app/utxos/consolidate/consolidate.go @@ -1,17 +1,15 @@ package consolidate import ( - "context" "errors" "fmt" - "log/slog" - "strings" - "time" "github.com/bitcoin-sv/arc/cmd/broadcaster-cli/helper" "github.com/bitcoin-sv/arc/internal/broadcaster" "github.com/bitcoin-sv/arc/internal/woc_client" + "github.com/bitcoin-sv/arc/pkg/keyset" "github.com/spf13/cobra" + "strings" ) var Cmd = &cobra.Command{ @@ -85,32 +83,30 @@ var Cmd = &cobra.Command{ keyFiles := strings.Split(keyFile, ",") - for _, kf := range keyFiles { + fundingKeySets := make([]*keyset.KeySet, len(keyFiles)) - if wocApiKey == "" { - time.Sleep(1 * time.Second) - } + for i, kf := range keyFiles { fundingKeySet, _, err := helper.GetKeySetsKeyFile(kf) if err != nil { return fmt.Errorf("failed to get key sets: %v", err) } - rateBroadcaster, err := broadcaster.NewRateBroadcaster(logger, client, fundingKeySet, wocClient, - broadcaster.WithFees(miningFeeSat), - broadcaster.WithIsTestnet(isTestnet), - broadcaster.WithCallback(callbackURL, callbackToken), - broadcaster.WithFullstatusUpdates(fullStatusUpdates), - ) - if err != nil { - return fmt.Errorf("failed to create broadcaster: %v", err) - } + fundingKeySets[i] = fundingKeySet + } - logger.Info("consolidating utxos", slog.String("key", kf), slog.String("address", fundingKeySet.Address(!isTestnet))) - err = rateBroadcaster.Consolidate(context.Background()) - if err != nil { - return fmt.Errorf("failed to consolidate utxos: %v", err) - } + rateBroadcaster, err := broadcaster.NewUTXOConsolidator(logger, client, fundingKeySets, wocClient, isTestnet, + broadcaster.WithFees(miningFeeSat), + broadcaster.WithCallback(callbackURL, callbackToken), + broadcaster.WithFullstatusUpdates(fullStatusUpdates), + ) + if err != nil { + return fmt.Errorf("failed to create broadcaster: %v", err) + } + + err = rateBroadcaster.Consolidate() + if err != nil { + return fmt.Errorf("failed to consolidate utxos: %v", err) } return nil diff --git a/cmd/broadcaster-cli/app/utxos/create/create.go b/cmd/broadcaster-cli/app/utxos/create/create.go index 70ebe490a..17966aa14 100644 --- a/cmd/broadcaster-cli/app/utxos/create/create.go +++ b/cmd/broadcaster-cli/app/utxos/create/create.go @@ -1,17 +1,15 @@ package create import ( - "context" "errors" "fmt" "log" - "log/slog" "strings" - "time" "github.com/bitcoin-sv/arc/cmd/broadcaster-cli/helper" "github.com/bitcoin-sv/arc/internal/broadcaster" "github.com/bitcoin-sv/arc/internal/woc_client" + "github.com/bitcoin-sv/arc/pkg/keyset" "github.com/spf13/cobra" "github.com/spf13/viper" ) @@ -103,32 +101,30 @@ var Cmd = &cobra.Command{ keyFiles := strings.Split(keyFile, ",") - for _, kf := range keyFiles { + fundingKeySets := make([]*keyset.KeySet, len(keyFiles)) - if wocApiKey == "" { - time.Sleep(1 * time.Second) - } + for i, kf := range keyFiles { fundingKeySet, _, err := helper.GetKeySetsKeyFile(kf) if err != nil { return fmt.Errorf("failed to get key sets: %v", err) } - rateBroadcaster, err := broadcaster.NewRateBroadcaster(logger, client, fundingKeySet, wocClient, - broadcaster.WithFees(miningFeeSat), - broadcaster.WithIsTestnet(isTestnet), - broadcaster.WithCallback(callbackURL, callbackToken), - broadcaster.WithFullstatusUpdates(fullStatusUpdates), - ) - if err != nil { - return fmt.Errorf("failed to create broadcaster: %v", err) - } + fundingKeySets[i] = fundingKeySet + } - logger.Info("creating utxos", slog.String("key", kf), slog.String("address", fundingKeySet.Address(!isTestnet))) - err = rateBroadcaster.CreateUtxos(context.Background(), outputs, satoshisPerOutput) - if err != nil { - return fmt.Errorf("failed to create utxos: %v", err) - } + rateBroadcaster, err := broadcaster.NewUTXOCreator(logger, client, fundingKeySets, wocClient, isTestnet, + broadcaster.WithFees(miningFeeSat), + broadcaster.WithCallback(callbackURL, callbackToken), + broadcaster.WithFullstatusUpdates(fullStatusUpdates), + ) + if err != nil { + return fmt.Errorf("failed to create broadcaster: %v", err) + } + + err = rateBroadcaster.CreateUtxos(outputs, satoshisPerOutput) + if err != nil { + return fmt.Errorf("failed to create utxos: %v", err) } return nil diff --git a/internal/broadcaster/broadcaster.go b/internal/broadcaster/broadcaster.go new file mode 100644 index 000000000..d4a9dfc07 --- /dev/null +++ b/internal/broadcaster/broadcaster.go @@ -0,0 +1,139 @@ +package broadcaster + +import ( + "container/list" + "context" + "log/slog" + "math" + "time" + + "github.com/libsv/go-bt/v2" + "github.com/libsv/go-bt/v2/bscript" +) + +const ( + maxInputsDefault = 100 + batchSizeDefault = 20 + millisecondsPerSecond = 1000 +) + +type UtxoClient interface { + GetUTXOs(ctx context.Context, mainnet bool, lockingScript *bscript.Script, address string) ([]*bt.UTXO, error) + GetUTXOsWithRetries(ctx context.Context, mainnet bool, lockingScript *bscript.Script, address string, constantBackoff time.Duration, retries uint64) ([]*bt.UTXO, error) + GetUTXOsList(ctx context.Context, mainnet bool, lockingScript *bscript.Script, address string) (*list.List, error) + GetUTXOsListWithRetries(ctx context.Context, mainnet bool, lockingScript *bscript.Script, address string, constantBackoff time.Duration, retries uint64) (*list.List, error) + GetBalance(ctx context.Context, mainnet bool, address string) (int64, int64, error) + GetBalanceWithRetries(ctx context.Context, mainnet bool, address string, constantBackoff time.Duration, retries uint64) (int64, int64, error) + TopUp(ctx context.Context, mainnet bool, address string) error +} + +type Broadcaster struct { + logger *slog.Logger + client ArcClient + isTestnet bool + feeQuote *bt.FeeQuote + utxoClient UtxoClient + standardMiningFee bt.FeeUnit + callbackURL string + callbackToken string + fullStatusUpdates bool + cancelAll context.CancelFunc + ctx context.Context + maxInputs int + batchSize int +} + +func WithBatchSize(batchSize int) func(broadcaster *Broadcaster) { + return func(broadcaster *Broadcaster) { + broadcaster.batchSize = batchSize + } +} + +func WithMaxInputs(maxInputs int) func(broadcaster *Broadcaster) { + return func(broadcaster *Broadcaster) { + broadcaster.maxInputs = maxInputs + } +} + +func WithCallback(callbackURL string, callbackToken string) func(broadcaster *Broadcaster) { + return func(broadcaster *Broadcaster) { + broadcaster.callbackURL = callbackURL + broadcaster.callbackToken = callbackToken + } +} + +func WithFullstatusUpdates(fullStatusUpdates bool) func(broadcaster *Broadcaster) { + return func(broadcaster *Broadcaster) { + broadcaster.fullStatusUpdates = fullStatusUpdates + } +} + +func WithFees(miningFeeSatPerKb int) func(broadcaster *Broadcaster) { + return func(broadcaster *Broadcaster) { + var fq = bt.NewFeeQuote() + + newStdFee := *stdFeeDefault + newDataFee := *dataFeeDefault + + newStdFee.MiningFee.Satoshis = miningFeeSatPerKb + newDataFee.MiningFee.Satoshis = miningFeeSatPerKb + + fq.AddQuote(bt.FeeTypeData, &newStdFee) + fq.AddQuote(bt.FeeTypeStandard, &newDataFee) + + broadcaster.feeQuote = fq + } +} + +func NewBroadcaster(logger *slog.Logger, client ArcClient, utxoClient UtxoClient, isTestnet bool, opts ...func(p *Broadcaster)) (Broadcaster, error) { + + b := Broadcaster{ + logger: logger, + client: client, + isTestnet: isTestnet, + batchSize: batchSizeDefault, + maxInputs: maxInputsDefault, + feeQuote: bt.NewFeeQuote(), + utxoClient: utxoClient, + } + + for _, opt := range opts { + opt(&b) + } + + standardFee, err := b.feeQuote.Fee(bt.FeeTypeStandard) + if err != nil { + return Broadcaster{}, err + } + + b.standardMiningFee = standardFee.MiningFee + + ctx, cancelAll := context.WithCancel(context.Background()) + b.cancelAll = cancelAll + b.ctx = ctx + + return b, nil +} + +func (b *Broadcaster) calculateFeeSat(tx *bt.Tx) uint64 { + size, err := tx.EstimateSizeWithTypes() + if err != nil { + return 0 + } + varIntUpper := bt.VarInt(tx.OutputCount()).UpperLimitInc() + if varIntUpper == -1 { + return 0 + } + + changeOutputFee := varIntUpper + changeP2pkhByteLen := uint64(8 + 1 + 25) + + totalBytes := size.TotalStdBytes + changeP2pkhByteLen + + miningFeeSat := float64(totalBytes*uint64(b.standardMiningFee.Satoshis)) / float64(b.standardMiningFee.Bytes) + + sFees := uint64(math.Ceil(miningFeeSat)) + txFees := sFees + uint64(changeOutputFee) + + return txFees +} diff --git a/internal/broadcaster/multi_rate_broadcaster.go b/internal/broadcaster/multi_rate_broadcaster.go new file mode 100644 index 000000000..0020a67ac --- /dev/null +++ b/internal/broadcaster/multi_rate_broadcaster.go @@ -0,0 +1,112 @@ +package broadcaster + +import ( + "context" + "github.com/bitcoin-sv/arc/pkg/keyset" + "log/slog" + "runtime" + "sync" + "time" +) + +type MultiKeyRateBroadcaster struct { + rbs []*RateBroadcaster + logger *slog.Logger + + cancelAll context.CancelFunc + ctx context.Context + wg sync.WaitGroup +} + +func NewMultiKeyRateBroadcaster(logger *slog.Logger, client ArcClient, keySets []*keyset.KeySet, utxoClient UtxoClient, isTestnet bool, opts ...func(p *Broadcaster)) (*MultiKeyRateBroadcaster, error) { + rbs := make([]*RateBroadcaster, 0, len(keySets)) + for _, ks := range keySets { + rb, err := NewRateBroadcaster(logger, client, ks, utxoClient, isTestnet, opts...) + if err != nil { + return nil, err + } + + rbs = append(rbs, rb) + } + + mrb := &MultiKeyRateBroadcaster{ + rbs: rbs, + logger: logger, + } + + ctx, cancelAll := context.WithCancel(context.Background()) + mrb.cancelAll = cancelAll + mrb.ctx = ctx + + return mrb, nil +} + +func (mrb *MultiKeyRateBroadcaster) Start(rateTxsPerSecond int, limit int64) error { + mrb.logStats() + + for _, rb := range mrb.rbs { + err := rb.Start(rateTxsPerSecond, limit) + if err != nil { + return err + } + } + + for _, rb := range mrb.rbs { + rb.wg.Wait() + } + + return nil +} + +func (mrb *MultiKeyRateBroadcaster) Shutdown() { + mrb.cancelAll() + for _, rb := range mrb.rbs { + rb.Shutdown() + } + + mrb.wg.Wait() +} + +func (mrb *MultiKeyRateBroadcaster) logStats() { + mrb.wg.Add(1) + bToMb := func(b uint64) uint64 { + return b / 1024 / 1024 + } + + logStatsTicker := time.NewTicker(2 * time.Second) + logMemStatsTicker := time.NewTicker(10 * time.Second) + + go func() { + defer mrb.wg.Done() + var mem runtime.MemStats + for { + select { + case <-logStatsTicker.C: + totalTxsCount := int64(0) + totalConnectionCount := int64(0) + totalUtxoSetLength := 0 + + for _, rb := range mrb.rbs { + totalTxsCount += rb.GetTxCount() + totalConnectionCount += rb.GetConnectionCount() + totalUtxoSetLength += rb.GetUtxoSetLen() + + } + mrb.logger.Info("stats", + slog.Int64("txs", totalTxsCount), + slog.Int64("connections", totalConnectionCount), + slog.Int("utxos", totalUtxoSetLength), + ) + case <-logMemStatsTicker.C: + mrb.logger.Info("memory", + slog.Uint64("Alloc [MiB]", bToMb(mem.Alloc)), + slog.Uint64("TotalAlloc [MiB]", bToMb(mem.TotalAlloc)), + slog.Uint64("Sys [MiB]", bToMb(mem.Sys)), + slog.Int64("NumGC [MiB]", int64(mem.NumGC)), + ) + case <-mrb.ctx.Done(): + return + } + } + }() +} diff --git a/internal/broadcaster/rate_broadcaster.go b/internal/broadcaster/rate_broadcaster.go index ff5c1c25d..f982f7d3c 100644 --- a/internal/broadcaster/rate_broadcaster.go +++ b/internal/broadcaster/rate_broadcaster.go @@ -1,152 +1,51 @@ package broadcaster import ( - "bufio" - "container/list" "context" "encoding/hex" - "encoding/json" "errors" "fmt" - "io" "log/slog" "math" "sync" + "sync/atomic" "time" "github.com/bitcoin-sv/arc/pkg/keyset" "github.com/bitcoin-sv/arc/pkg/metamorph/metamorph_api" "github.com/libsv/go-bt/v2" - "github.com/libsv/go-bt/v2/bscript" "github.com/libsv/go-bt/v2/unlocker" ) -const ( - maxInputsDefault = 100 - batchSizeDefault = 20 - isTestnetDefault = true - millisecondsPerSecond = 1000 - resultsIterationsDefault = 50 -) - -type UtxoClient interface { - GetUTXOs(ctx context.Context, mainnet bool, lockingScript *bscript.Script, address string) ([]*bt.UTXO, error) - GetUTXOsWithRetries(ctx context.Context, mainnet bool, lockingScript *bscript.Script, address string, constantBackoff time.Duration, retries uint64) ([]*bt.UTXO, error) - GetUTXOsList(ctx context.Context, mainnet bool, lockingScript *bscript.Script, address string) (*list.List, error) - GetUTXOsListWithRetries(ctx context.Context, mainnet bool, lockingScript *bscript.Script, address string, constantBackoff time.Duration, retries uint64) (*list.List, error) - GetBalance(ctx context.Context, mainnet bool, address string) (int64, int64, error) - GetBalanceWithRetries(ctx context.Context, mainnet bool, address string, constantBackoff time.Duration, retries uint64) (int64, int64, error) - TopUp(ctx context.Context, mainnet bool, address string) error -} - type RateBroadcaster struct { - logger *slog.Logger - client ArcClient - fundingKeyset *keyset.KeySet - isTestnet bool - callbackURL string - callbackToken string - fullStatusUpdates bool - feeQuote *bt.FeeQuote - utxoClient UtxoClient - standardMiningFee bt.FeeUnit - responseWriter io.Writer - responseWriteIterationInterval int - - shutdown chan struct{} - - maxInputs int - batchSize int - - mu sync.RWMutex - satoshiMap map[string]uint64 - totalTxs int64 -} - -func WithFees(miningFeeSatPerKb int) func(broadcaster *RateBroadcaster) { - return func(broadcaster *RateBroadcaster) { - var fq = bt.NewFeeQuote() - - newStdFee := *stdFeeDefault - newDataFee := *dataFeeDefault - - newStdFee.MiningFee.Satoshis = miningFeeSatPerKb - newDataFee.MiningFee.Satoshis = miningFeeSatPerKb - - fq.AddQuote(bt.FeeTypeData, &newStdFee) - fq.AddQuote(bt.FeeTypeStandard, &newDataFee) - - broadcaster.feeQuote = fq - } -} - -func WithBatchSize(batchSize int) func(broadcaster *RateBroadcaster) { - return func(broadcaster *RateBroadcaster) { - broadcaster.batchSize = batchSize - } -} - -func WithMaxInputs(maxInputs int) func(broadcaster *RateBroadcaster) { - return func(broadcaster *RateBroadcaster) { - broadcaster.maxInputs = maxInputs - } -} - -func WithIsTestnet(isTestnet bool) func(broadcaster *RateBroadcaster) { - return func(broadcaster *RateBroadcaster) { - broadcaster.isTestnet = isTestnet - } -} - -func WithCallback(callbackURL string, callbackToken string) func(broadcaster *RateBroadcaster) { - return func(broadcaster *RateBroadcaster) { - broadcaster.callbackURL = callbackURL - broadcaster.callbackToken = callbackToken - } -} - -func WithFullstatusUpdates(fullStatusUpdates bool) func(broadcaster *RateBroadcaster) { - return func(broadcaster *RateBroadcaster) { - broadcaster.fullStatusUpdates = fullStatusUpdates - } + Broadcaster + totalTxs int64 + connectionCount int64 + shutdown chan struct{} + utxoCh chan *bt.UTXO + wg sync.WaitGroup + satoshiMap sync.Map + ks *keyset.KeySet } -func WithStoreWriter(storeWriter io.Writer, resultIterations int) func(broadcaster *RateBroadcaster) { - return func(broadcaster *RateBroadcaster) { - broadcaster.responseWriter = storeWriter - broadcaster.responseWriteIterationInterval = resultIterations - } -} - -func NewRateBroadcaster(logger *slog.Logger, client ArcClient, fromKeySet *keyset.KeySet, utxoClient UtxoClient, opts ...func(p *RateBroadcaster)) (*RateBroadcaster, error) { - broadcaster := &RateBroadcaster{ - logger: logger, - client: client, - fundingKeyset: fromKeySet, - isTestnet: isTestnetDefault, - feeQuote: bt.NewFeeQuote(), - utxoClient: utxoClient, - batchSize: batchSizeDefault, - maxInputs: maxInputsDefault, - responseWriter: nil, - responseWriteIterationInterval: resultsIterationsDefault, - - shutdown: make(chan struct{}, 10), - satoshiMap: map[string]uint64{}, - } - - for _, opt := range opts { - opt(broadcaster) - } +func NewRateBroadcaster(logger *slog.Logger, client ArcClient, ks *keyset.KeySet, utxoClient UtxoClient, isTestnet bool, opts ...func(p *Broadcaster)) (*RateBroadcaster, error) { - standardFee, err := broadcaster.feeQuote.Fee(bt.FeeTypeStandard) + b, err := NewBroadcaster(logger.With(slog.String("address", ks.Address(!isTestnet))), client, utxoClient, isTestnet, opts...) if err != nil { return nil, err } + rb := &RateBroadcaster{ + Broadcaster: b, + shutdown: make(chan struct{}, 1), + utxoCh: nil, + wg: sync.WaitGroup{}, + satoshiMap: sync.Map{}, + ks: ks, + totalTxs: 0, + connectionCount: 0, + } - broadcaster.standardMiningFee = standardFee.MiningFee - - return broadcaster, nil + return rb, nil } func (b *RateBroadcaster) calculateFeeSat(tx *bt.Tx) uint64 { @@ -172,426 +71,34 @@ func (b *RateBroadcaster) calculateFeeSat(tx *bt.Tx) uint64 { return txFees } -func (b *RateBroadcaster) splitToFundingKeyset(tx *bt.Tx, splitSatoshis uint64, requestedSatoshis uint64, requestedOutputs int) (addedOutputs int, err error) { - - if requestedSatoshis > splitSatoshis { - return 0, fmt.Errorf("requested satoshis %d greater than satoshis to be split %d", requestedSatoshis, splitSatoshis) - } - - counter := 0 - - remaining := int64(splitSatoshis) - for remaining > int64(requestedSatoshis) && counter < requestedOutputs { - if uint64(remaining)-requestedSatoshis < b.calculateFeeSat(tx) { - break - } - - err := tx.PayTo(b.fundingKeyset.Script, requestedSatoshis) - if err != nil { - return 0, err - } - - remaining -= int64(requestedSatoshis) - counter++ - - } - - fee := b.calculateFeeSat(tx) - err = tx.PayTo(b.fundingKeyset.Script, uint64(remaining)-fee) - if err != nil { - return 0, err - } - - unlockerGetter := unlocker.Getter{PrivateKey: b.fundingKeyset.PrivateKey} - err = tx.FillAllInputs(context.Background(), &unlockerGetter) - if err != nil { - return 0, err - } - - return counter, nil -} - -func (b *RateBroadcaster) createConsolidationTxs(utxoSet *list.List, satoshiMap map[string]uint64) ([][]*bt.Tx, error) { - tx := bt.NewTx() - txSatoshis := uint64(0) - txsConsolidationBatches := make([][]*bt.Tx, 0) - txsConsolidation := make([]*bt.Tx, 0) - const consolidateBatchSize = 20 - - var next *list.Element - for front := utxoSet.Front(); front != nil; front = next { - next = front.Next() - utxoSet.Remove(front) - utxo, ok := front.Value.(*bt.UTXO) - if !ok { - return nil, errors.New("failed to parse value to utxo") - } - - txSatoshis += utxo.Satoshis - if next == nil { - if len(tx.Inputs) > 0 { - err := tx.FromUTXOs(utxo) - if err != nil { - return nil, err - } - - err = b.consolidateToFundingKeyset(tx, txSatoshis) - if err != nil { - return nil, err - } - - txsConsolidation = append(txsConsolidation, tx) - satoshiMap[tx.TxID()] = tx.TotalOutputSatoshis() - } - - if len(txsConsolidation) > 0 { - txsConsolidationBatches = append(txsConsolidationBatches, txsConsolidation) - } - break - } - - err := tx.FromUTXOs(utxo) - if err != nil { - return nil, err - } - - if len(tx.Inputs) >= b.maxInputs { - err = b.consolidateToFundingKeyset(tx, txSatoshis) - if err != nil { - return nil, err - } - - txsConsolidation = append(txsConsolidation, tx) - - satoshiMap[tx.TxID()] = tx.TotalOutputSatoshis() - tx = bt.NewTx() - txSatoshis = 0 - } - - if len(txsConsolidation) >= consolidateBatchSize { - txsConsolidationBatches = append(txsConsolidationBatches, txsConsolidation) - txsConsolidation = make([]*bt.Tx, 0) - } - - } - - return txsConsolidationBatches, nil -} - -func (b *RateBroadcaster) consolidateToFundingKeyset(tx *bt.Tx, txSatoshis uint64) error { - fee := b.calculateFeeSat(tx) - err := tx.PayTo(b.fundingKeyset.Script, txSatoshis-fee) - if err != nil { - return err - } - unlockerGetter := unlocker.Getter{PrivateKey: b.fundingKeyset.PrivateKey} - err = tx.FillAllInputs(context.Background(), &unlockerGetter) - if err != nil { - return err - } - return nil -} - -func (b *RateBroadcaster) Consolidate(ctx context.Context) error { - _, unconfirmed, err := b.utxoClient.GetBalanceWithRetries(ctx, !b.isTestnet, b.fundingKeyset.Address(!b.isTestnet), 1*time.Second, 5) - if err != nil { - return err - } - if math.Abs(float64(unconfirmed)) > 0 { - return fmt.Errorf("key with address %s balance has unconfirmed amount %d sat", b.fundingKeyset.Address(!b.isTestnet), unconfirmed) - } - - utxoSet, err := b.utxoClient.GetUTXOsListWithRetries(ctx, !b.isTestnet, b.fundingKeyset.Script, b.fundingKeyset.Address(!b.isTestnet), 1*time.Second, 5) - if err != nil { - return fmt.Errorf("failed to get utxos: %v", err) - } - - if utxoSet.Len() == 1 { - b.logger.Info("utxos already consolidated") - return nil - } - - satoshiMap := map[string]uint64{} - lastUtxoSetLen := 100_000_000 - - for { - if lastUtxoSetLen <= utxoSet.Len() { - b.logger.Error("utxo set length hasn't changed since last iteration") - break - } - lastUtxoSetLen = utxoSet.Len() - - // if requested outputs satisfied, return - if utxoSet.Len() == 1 { - break - } - - b.logger.Info("consolidating outputs", slog.Int("remaining", utxoSet.Len())) - - consolidationTxsBatches, err := b.createConsolidationTxs(utxoSet, satoshiMap) - if err != nil { - return fmt.Errorf("failed to create consolidation txs: %v", err) - } - - for i, batch := range consolidationTxsBatches { - time.Sleep(100 * time.Millisecond) // do not performance test ARC - - nrOutputs := 0 - nrInputs := 0 - for _, txBatch := range batch { - nrOutputs += len(txBatch.Outputs) - nrInputs += len(txBatch.Inputs) - } - - b.logger.Info(fmt.Sprintf("broadcasting consolidation batch %d/%d", i+1, len(consolidationTxsBatches)), slog.Int("size", len(batch)), slog.Int("inputs", nrInputs), slog.Int("outputs", nrOutputs)) - - resp, err := b.client.BroadcastTransactions(context.Background(), batch, metamorph_api.Status_SEEN_ON_NETWORK, b.callbackURL, b.callbackToken, b.fullStatusUpdates, false) - if err != nil { - return fmt.Errorf("failed to broadcast consolidation txs: %v", err) - } - - for _, res := range resp { - if res.Status == metamorph_api.Status_REJECTED || res.Status == metamorph_api.Status_SEEN_IN_ORPHAN_MEMPOOL { - b.logger.Error("consolidation tx was not successful", slog.String("status", res.Status.String()), slog.String("hash", res.Txid), slog.String("reason", res.RejectReason)) - for _, tx := range batch { - if tx.TxID() == res.Txid { - b.logger.Debug(tx.String()) - break - } - } - continue - } - - txIDBytes, err := hex.DecodeString(res.Txid) - if err != nil { - b.logger.Error("failed to decode txid", slog.String("err", err.Error())) - continue - } - - newUtxo := &bt.UTXO{ - TxID: txIDBytes, - Vout: 0, - LockingScript: b.fundingKeyset.Script, - Satoshis: satoshiMap[res.Txid], - } - - delete(satoshiMap, res.Txid) - - utxoSet.PushBack(newUtxo) - } - } - } - - return nil -} - -type splittingOutput struct { - satoshis uint64 - vout uint32 -} - -func (b *RateBroadcaster) CreateUtxos(ctx context.Context, requestedOutputs int, requestedSatoshisPerOutput uint64) error { - - requestedOutputsSatoshis := int64(requestedOutputs) * int64(requestedSatoshisPerOutput) - - confirmed, unconfirmed, err := b.utxoClient.GetBalanceWithRetries(ctx, !b.isTestnet, b.fundingKeyset.Address(!b.isTestnet), 1*time.Second, 5) - if err != nil { - return err - } - - if unconfirmed > 0 { - return fmt.Errorf("total balance not confirmed yet") - } - - balance := confirmed + unconfirmed - - if requestedOutputsSatoshis > balance { - return fmt.Errorf("requested total of satoshis %d exceeds balance on funding keyset %d", requestedOutputsSatoshis, balance) - } - - utxos, err := b.utxoClient.GetUTXOsWithRetries(ctx, !b.isTestnet, b.fundingKeyset.Script, b.fundingKeyset.Address(!b.isTestnet), 1*time.Second, 5) - if err != nil { - return err - } - - utxoSet := list.New() - for _, utxo := range utxos { - // collect right sized utxos - if utxo.Satoshis >= requestedSatoshisPerOutput { - utxoSet.PushBack(utxo) - continue - } - } - - // if requested outputs satisfied, return - if utxoSet.Len() >= requestedOutputs { - b.logger.Info("utxo set", slog.Int("ready", utxoSet.Len()), slog.Int("requested", requestedOutputs), slog.Uint64("satoshis", requestedSatoshisPerOutput)) - return nil - } - - satoshiMap := map[string][]splittingOutput{} - lastUtxoSetLen := 0 - - // if requested outputs not satisfied, create them - for { - if lastUtxoSetLen >= utxoSet.Len() { - b.logger.Error("utxo set length hasn't changed since last iteration") - break - } - lastUtxoSetLen = utxoSet.Len() - - // if requested outputs satisfied, return - if utxoSet.Len() >= requestedOutputs { - break - } +func (b *RateBroadcaster) Start(rateTxsPerSecond int, limit int64) error { - b.logger.Info("splitting outputs", slog.Int("ready", utxoSet.Len()), slog.Int("requested", requestedOutputs), slog.Uint64("satoshis", requestedSatoshisPerOutput)) - - // create splitting txs - txsSplitBatches, err := b.splitOutputs(requestedOutputs, requestedSatoshisPerOutput, utxoSet, satoshiMap) - if err != nil { - return err - } - - for i, batch := range txsSplitBatches { - nrOutputs := 0 - nrInputs := 0 - for _, txBatch := range batch { - nrOutputs += len(txBatch.Outputs) - nrInputs += len(txBatch.Inputs) - } - - b.logger.Info(fmt.Sprintf("broadcasting splitting batch %d/%d", i+1, len(txsSplitBatches)), slog.Int("size", len(batch)), slog.Int("inputs", nrInputs), slog.Int("outputs", nrOutputs)) - - resp, err := b.client.BroadcastTransactions(context.Background(), batch, metamorph_api.Status_SEEN_ON_NETWORK, b.callbackURL, b.callbackToken, b.fullStatusUpdates, false) - if err != nil { - return fmt.Errorf("failed to braodcast tx: %v", err) - } - - for _, res := range resp { - if res.Status == metamorph_api.Status_REJECTED || res.Status == metamorph_api.Status_SEEN_IN_ORPHAN_MEMPOOL { - b.logger.Error("splitting tx was not successful", slog.String("status", res.Status.String()), slog.String("hash", res.Txid), slog.String("reason", res.RejectReason)) - for _, tx := range batch { - if tx.TxID() == res.Txid { - b.logger.Debug(tx.String()) - break - } - } - continue - } - - txIDBytes, err := hex.DecodeString(res.Txid) - if err != nil { - b.logger.Error("failed to decode txid", slog.String("err", err.Error())) - continue - } - - foundOutputs, found := satoshiMap[res.Txid] - if !found { - b.logger.Error("output not found", slog.String("hash", res.Txid)) - continue - } - - for _, foundOutput := range foundOutputs { - newUtxo := &bt.UTXO{ - TxID: txIDBytes, - Vout: foundOutput.vout, - LockingScript: b.fundingKeyset.Script, - Satoshis: foundOutput.satoshis, - } - - utxoSet.PushBack(newUtxo) - } - delete(satoshiMap, res.Txid) + b.wg.Add(1) + go func() { + defer b.wg.Done() + for { + select { + case <-b.shutdown: + b.cancelAll() + case <-b.ctx.Done(): + return } - - // do not performance test ARC when creating the utxos - time.Sleep(100 * time.Millisecond) - } - } - - b.logger.Info("utxo set", slog.Int("ready", utxoSet.Len()), slog.Int("requested", requestedOutputs), slog.Uint64("satoshis", requestedSatoshisPerOutput)) - - return nil -} - -func (b *RateBroadcaster) splitOutputs(requestedOutputs int, requestedSatoshisPerOutput uint64, utxoSet *list.List, satoshiMap map[string][]splittingOutput) ([][]*bt.Tx, error) { - txsSplitBatches := make([][]*bt.Tx, 0) - txsSplit := make([]*bt.Tx, 0) - outputs := utxoSet.Len() - var err error - - var next *list.Element - for front := utxoSet.Front(); front != nil; front = next { - next = front.Next() - - if front == nil || outputs >= requestedOutputs { - break - } - - utxo, ok := front.Value.(*bt.UTXO) - if !ok { - return nil, errors.New("failed to parse value to utxo") - } - - tx := bt.NewTx() - err = tx.FromUTXOs(utxo) - if err != nil { - return nil, err } + }() - // only split if splitting increases nr of outputs - const feeMargin = 50 - if utxo.Satoshis < 2*requestedSatoshisPerOutput+feeMargin { - continue - } - - addedOutputs, err := b.splitToFundingKeyset(tx, utxo.Satoshis, requestedSatoshisPerOutput, requestedOutputs-outputs) - if err != nil { - return nil, err - } - utxoSet.Remove(front) - - outputs += addedOutputs - - txsSplit = append(txsSplit, tx) - - txOutputs := make([]splittingOutput, len(tx.Outputs)) - for i, txOutput := range tx.Outputs { - txOutputs[i] = splittingOutput{satoshis: txOutput.Satoshis, vout: uint32(i)} - } - - satoshiMap[tx.TxID()] = txOutputs - - if len(txsSplit) == b.batchSize { - txsSplitBatches = append(txsSplitBatches, txsSplit) - txsSplit = make([]*bt.Tx, 0) - } - } - - if len(txsSplit) > 0 { - txsSplitBatches = append(txsSplitBatches, txsSplit) - } - return txsSplitBatches, nil -} - -func (b *RateBroadcaster) StartRateBroadcaster(ctx context.Context, rateTxsPerSecond int, limit int64, wg *sync.WaitGroup) error { - - _, unconfirmed, err := b.utxoClient.GetBalanceWithRetries(ctx, !b.isTestnet, b.fundingKeyset.Address(!b.isTestnet), 1*time.Second, 5) + _, unconfirmed, err := b.utxoClient.GetBalanceWithRetries(b.ctx, !b.isTestnet, b.ks.Address(!b.isTestnet), 1*time.Second, 5) if err != nil { return err } if math.Abs(float64(unconfirmed)) > 0 { - return fmt.Errorf("key with address %s balance has unconfirmed amount %d sat", b.fundingKeyset.Address(!b.isTestnet), unconfirmed) + return fmt.Errorf("key with address %s balance has unconfirmed amount %d sat", b.ks.Address(!b.isTestnet), unconfirmed) } - utxoSet, err := b.utxoClient.GetUTXOsWithRetries(ctx, !b.isTestnet, b.fundingKeyset.Script, b.fundingKeyset.Address(!b.isTestnet), 1*time.Second, 5) + utxoSet, err := b.utxoClient.GetUTXOsWithRetries(b.ctx, !b.isTestnet, b.ks.Script, b.ks.Address(!b.isTestnet), 1*time.Second, 5) if err != nil { return fmt.Errorf("failed to get utxos: %v", err) } - b.logger.Info("starting broadcasting", slog.Int("rate [txs/s]", rateTxsPerSecond), slog.Int("batch size", b.batchSize), slog.String("address", b.fundingKeyset.Address(!b.isTestnet))) - submitBatchesPerSecond := float64(rateTxsPerSecond) / float64(b.batchSize) if submitBatchesPerSecond > millisecondsPerSecond { @@ -602,113 +109,45 @@ func (b *RateBroadcaster) StartRateBroadcaster(ctx context.Context, rateTxsPerSe return fmt.Errorf("size of utxo set %d is smaller than requested batch size %d - create more utxos first", len(utxoSet), b.batchSize) } - utxoCh := make(chan *bt.UTXO, len(utxoSet)) - + b.utxoCh = make(chan *bt.UTXO, 100000) for _, utxo := range utxoSet { - utxoCh <- utxo + b.utxoCh <- utxo } submitBatchInterval := time.Duration(millisecondsPerSecond/float64(submitBatchesPerSecond)) * time.Millisecond submitBatchTicker := time.NewTicker(submitBatchInterval) - logSummaryTicker := time.NewTicker(3 * time.Second) - - responseCh := make(chan *metamorph_api.TransactionStatus, 100) errCh := make(chan error, 100) - var writer *bufio.Writer - if b.responseWriter != nil { - writer = bufio.NewWriter(b.responseWriter) - if err != nil { - return err - } - err = writeJsonArrayStart(writer) - if err != nil { - return err - } - } - - resultsMap := map[metamorph_api.Status]int64{} - - counter := 0 + b.wg.Add(1) go func() { - defer func() { - if writer != nil { - err = writeJsonArrayFinish(writer) - if err != nil { - b.logger.Error("failed to write json array finish", slog.String("err", err.Error())) - } - - err = writer.Flush() - if err != nil { - b.logger.Error("failed flush writer", slog.String("err", err.Error())) - } - } - - b.logger.Info("shutting down broadcaster", slog.String("address", b.fundingKeyset.Address(!b.isTestnet))) - wg.Done() + b.logger.Info("shutting down broadcaster") + b.wg.Done() }() for { select { - case <-b.shutdown: - return - case <-ctx.Done(): + case <-b.ctx.Done(): return case <-submitBatchTicker.C: - txs, err := b.createSelfPayingTxs(utxoCh) + txs, err := b.createSelfPayingTxs() if err != nil { b.logger.Error("failed to create self paying txs", slog.String("err", err.Error())) b.shutdown <- struct{}{} continue } - go b.broadcastBatch(ctx, txs, responseCh, errCh, utxoCh, false, metamorph_api.Status_STORED, limit) - - case <-logSummaryTicker.C: - - b.mu.Lock() - totalTxs := b.totalTxs - b.mu.Unlock() + if limit > 0 && atomic.LoadInt64(&b.totalTxs) >= limit { + b.logger.Info("limit reached", slog.Int64("total", atomic.LoadInt64(&b.totalTxs)), slog.Int64("limit", limit)) + b.shutdown <- struct{}{} + } - b.logger.Info("summary", - slog.String("address", b.fundingKeyset.Address(!b.isTestnet)), - slog.Int("utxo set length", len(utxoCh)), - slog.Int64("total", totalTxs), - ) + b.broadcastBatchAsync(txs, errCh, metamorph_api.Status_RECEIVED) case responseErr := <-errCh: b.logger.Error("failed to submit transactions", slog.String("err", responseErr.Error())) - if writer == nil { - continue - } - - counter++ - err = b.write(&counter, writer, fmt.Sprintf(",%s\n", responseErr.Error())) - if err != nil { - b.logger.Error("failed to write", slog.String("err", err.Error())) - continue - } - case res := <-responseCh: - resultsMap[res.Status]++ - if writer == nil { - continue - } - // if writer is given, store the response in the writer - counter++ - resBytes, err := json.Marshal(res) - if err != nil { - b.logger.Error("failed to marshal response", slog.String("err", err.Error())) - continue - } - - err = b.write(&counter, writer, fmt.Sprintf(",%s\n", string(resBytes))) - if err != nil { - b.logger.Error("failed to write", slog.String("err", err.Error())) - continue - } } } }() @@ -716,128 +155,112 @@ func (b *RateBroadcaster) StartRateBroadcaster(ctx context.Context, rateTxsPerSe return nil } -func writeJsonArrayStart(writer *bufio.Writer) error { - _, err := fmt.Fprint(writer, "[") - if err != nil { - return fmt.Errorf("failed to print response: %v", err) - } - - return nil -} - -func writeJsonArrayFinish(writer *bufio.Writer) error { - _, err := fmt.Fprint(writer, "]") - if err != nil { - return fmt.Errorf("failed to print response: %v", err) - } +func (b *RateBroadcaster) createSelfPayingTxs() ([]*bt.Tx, error) { + txs := make([]*bt.Tx, 0, b.batchSize) - return nil -} +utxoLoop: + for { + select { + case <-b.ctx.Done(): + return txs, nil + //case <-time.NewTimer(1 * time.Second).C: + // return txs, nil + case utxo := <-b.utxoCh: + tx := bt.NewTx() + + err := tx.FromUTXOs(utxo) + if err != nil { + return nil, err + } -func (b *RateBroadcaster) write(counter *int, writer *bufio.Writer, content string) error { - _, err := fmt.Fprint(writer, content) - if err != nil { - return fmt.Errorf("failed to print response: %v", err) - } + fee := b.calculateFeeSat(tx) - if *counter%b.responseWriteIterationInterval == 0 { - err = writer.Flush() - if err != nil { - return fmt.Errorf("failed flush writer: %v", err) - } + if utxo.Satoshis <= fee { + continue + } - *counter = 0 - } + err = tx.PayTo(b.ks.Script, utxo.Satoshis-fee) + if err != nil { + return nil, err + } - return nil -} + // Todo: Add OP_RETURN with text "ARC testing" so that WoC can tag it -func (b *RateBroadcaster) createSelfPayingTxs(utxos chan *bt.UTXO) ([]*bt.Tx, error) { - txs := make([]*bt.Tx, 0, b.batchSize) - - for utxo := range utxos { - tx := bt.NewTx() + unlockerGetter := unlocker.Getter{PrivateKey: b.ks.PrivateKey} + err = tx.FillAllInputs(context.Background(), &unlockerGetter) + if err != nil { + return nil, err + } - err := tx.FromUTXOs(utxo) - if err != nil { - return nil, err - } + b.satoshiMap.Store(tx.TxID(), tx.Outputs[0].Satoshis) - fee := b.calculateFeeSat(tx) + txs = append(txs, tx) - if utxo.Satoshis <= fee { - continue + if len(txs) >= b.batchSize { + break utxoLoop + } } + } - err = tx.PayTo(b.fundingKeyset.Script, utxo.Satoshis-fee) - if err != nil { - return nil, err - } + return txs, nil +} - unlockerGetter := unlocker.Getter{PrivateKey: b.fundingKeyset.PrivateKey} - err = tx.FillAllInputs(context.Background(), &unlockerGetter) +func (b *RateBroadcaster) broadcastBatchAsync(txs []*bt.Tx, errCh chan error, waitForStatus metamorph_api.Status) { + b.wg.Add(1) + go func() { + defer b.wg.Done() + atomic.AddInt64(&b.connectionCount, 1) + resp, err := b.client.BroadcastTransactions(b.ctx, txs, waitForStatus, b.callbackURL, b.callbackToken, b.fullStatusUpdates, false) if err != nil { - return nil, err + if errors.Is(err, context.Canceled) { + return + } + errCh <- err } - b.mu.Lock() - b.satoshiMap[tx.TxID()] = tx.Outputs[0].Satoshis - b.mu.Unlock() - txs = append(txs, tx) + atomic.AddInt64(&b.connectionCount, -1) - if len(txs) >= b.batchSize { - break - } - } + for _, res := range resp { - return txs, nil -} + txIDBytes, err := hex.DecodeString(res.Txid) + if err != nil { + b.logger.Error("failed to decode txid", slog.String("err", err.Error())) + continue + } -func (b *RateBroadcaster) broadcastBatch(ctx context.Context, txs []*bt.Tx, resultCh chan *metamorph_api.TransactionStatus, errCh chan error, utxoCh chan *bt.UTXO, skipFeeValidation bool, waitForStatus metamorph_api.Status, limit int64) { - b.mu.Lock() - if limit > 0 && b.totalTxs >= limit { - return - } - b.mu.Unlock() + sat, found := b.satoshiMap.Load(res.Txid) + satoshis, isValid := sat.(uint64) - limitReachedNotified := false + if found && isValid { + newUtxo := &bt.UTXO{ + TxID: txIDBytes, + Vout: 0, + LockingScript: b.ks.Script, + Satoshis: satoshis, + } + b.utxoCh <- newUtxo + } - ctxWithTimeout, cancel := context.WithTimeout(ctx, 10*time.Minute) - defer cancel() + b.satoshiMap.Delete(res.Txid) - resp, err := b.client.BroadcastTransactions(ctxWithTimeout, txs, waitForStatus, b.callbackURL, b.callbackToken, b.fullStatusUpdates, skipFeeValidation) - if err != nil { - if errors.Is(err, context.Canceled) { - b.logger.Debug("broadcasting canceled", slog.String("address", b.fundingKeyset.Address(!b.isTestnet))) - return + atomic.AddInt64(&b.totalTxs, 1) } - errCh <- err - } + }() +} +func (b *RateBroadcaster) Shutdown() { + b.cancelAll() - for _, res := range resp { - resultCh <- res + b.wg.Wait() +} - txIDBytes, err := hex.DecodeString(res.Txid) - if err != nil { - b.logger.Error("failed to decode txid", slog.String("err", err.Error())) - continue - } - b.mu.Lock() - newUtxo := &bt.UTXO{ - TxID: txIDBytes, - Vout: 0, - LockingScript: b.fundingKeyset.Script, - Satoshis: b.satoshiMap[res.Txid], - } - utxoCh <- newUtxo - - delete(b.satoshiMap, res.Txid) - b.totalTxs++ - if limit > 0 && b.totalTxs >= limit && !limitReachedNotified { - b.logger.Info("limit reached", slog.Int64("total", b.totalTxs), slog.String("address", b.fundingKeyset.Address(!b.isTestnet))) - b.shutdown <- struct{}{} - limitReachedNotified = true - } - b.mu.Unlock() - } +func (b *RateBroadcaster) GetTxCount() int64 { + return atomic.LoadInt64(&b.totalTxs) +} + +func (b *RateBroadcaster) GetConnectionCount() int64 { + return atomic.LoadInt64(&b.connectionCount) +} + +func (b *RateBroadcaster) GetUtxoSetLen() int { + return len(b.utxoCh) } diff --git a/internal/broadcaster/utxo_consolidator.go b/internal/broadcaster/utxo_consolidator.go new file mode 100644 index 000000000..ab31ea97a --- /dev/null +++ b/internal/broadcaster/utxo_consolidator.go @@ -0,0 +1,213 @@ +package broadcaster + +import ( + "container/list" + "context" + "encoding/hex" + "errors" + "fmt" + "log/slog" + "math" + "time" + + "github.com/bitcoin-sv/arc/pkg/keyset" + "github.com/bitcoin-sv/arc/pkg/metamorph/metamorph_api" + "github.com/libsv/go-bt/v2" + "github.com/libsv/go-bt/v2/unlocker" +) + +type UTXOConsolidator struct { + Broadcaster + keySets []*keyset.KeySet +} + +func NewUTXOConsolidator(logger *slog.Logger, client ArcClient, keySets []*keyset.KeySet, utxoClient UtxoClient, isTestnet bool, opts ...func(p *Broadcaster)) (*UTXOConsolidator, error) { + + b, err := NewBroadcaster(logger, client, utxoClient, isTestnet, opts...) + if err != nil { + return nil, err + } + + consolidator := &UTXOConsolidator{ + Broadcaster: b, + keySets: keySets, + } + + return consolidator, nil +} + +func (b *UTXOConsolidator) Consolidate() error { + for _, ks := range b.keySets { + b.logger.Info("consolidating utxos", slog.String("address", ks.Address(!b.isTestnet))) + _, unconfirmed, err := b.utxoClient.GetBalanceWithRetries(b.ctx, !b.isTestnet, ks.Address(!b.isTestnet), 1*time.Second, 5) + if err != nil { + return err + } + if math.Abs(float64(unconfirmed)) > 0 { + return fmt.Errorf("key with address %s balance has unconfirmed amount %d sat", ks.Address(!b.isTestnet), unconfirmed) + } + + utxoSet, err := b.utxoClient.GetUTXOsListWithRetries(b.ctx, !b.isTestnet, ks.Script, ks.Address(!b.isTestnet), 1*time.Second, 5) + if err != nil { + return fmt.Errorf("failed to get utxos: %v", err) + } + + if utxoSet.Len() == 1 { + b.logger.Info("utxos already consolidated") + return nil + } + + satoshiMap := map[string]uint64{} + lastUtxoSetLen := 100_000_000 + + for { + if lastUtxoSetLen <= utxoSet.Len() { + b.logger.Error("utxo set length hasn't changed since last iteration") + break + } + lastUtxoSetLen = utxoSet.Len() + + // if requested outputs satisfied, return + if utxoSet.Len() == 1 { + break + } + + b.logger.Info("consolidating outputs", slog.Int("remaining", utxoSet.Len())) + + consolidationTxsBatches, err := b.createConsolidationTxs(utxoSet, satoshiMap, ks) + if err != nil { + return fmt.Errorf("failed to create consolidation txs: %v", err) + } + + for i, batch := range consolidationTxsBatches { + time.Sleep(100 * time.Millisecond) // do not performance test ARC + + nrOutputs := 0 + nrInputs := 0 + for _, txBatch := range batch { + nrOutputs += len(txBatch.Outputs) + nrInputs += len(txBatch.Inputs) + } + + b.logger.Info(fmt.Sprintf("broadcasting consolidation batch %d/%d", i+1, len(consolidationTxsBatches)), slog.Int("size", len(batch)), slog.Int("inputs", nrInputs), slog.Int("outputs", nrOutputs)) + + resp, err := b.client.BroadcastTransactions(context.Background(), batch, metamorph_api.Status_SEEN_ON_NETWORK, b.callbackURL, b.callbackToken, b.fullStatusUpdates, false) + if err != nil { + return fmt.Errorf("failed to broadcast consolidation txs: %v", err) + } + + for _, res := range resp { + if res.Status == metamorph_api.Status_REJECTED || res.Status == metamorph_api.Status_SEEN_IN_ORPHAN_MEMPOOL { + b.logger.Error("consolidation tx was not successful", slog.String("status", res.Status.String()), slog.String("hash", res.Txid), slog.String("reason", res.RejectReason)) + for _, tx := range batch { + if tx.TxID() == res.Txid { + b.logger.Debug(tx.String()) + break + } + } + continue + } + + txIDBytes, err := hex.DecodeString(res.Txid) + if err != nil { + b.logger.Error("failed to decode txid", slog.String("err", err.Error())) + continue + } + + newUtxo := &bt.UTXO{ + TxID: txIDBytes, + Vout: 0, + LockingScript: ks.Script, + Satoshis: satoshiMap[res.Txid], + } + + delete(satoshiMap, res.Txid) + + utxoSet.PushBack(newUtxo) + } + } + } + } + return nil +} + +func (b *UTXOConsolidator) createConsolidationTxs(utxoSet *list.List, satoshiMap map[string]uint64, fundingKeySet *keyset.KeySet) ([][]*bt.Tx, error) { + tx := bt.NewTx() + txSatoshis := uint64(0) + txsConsolidationBatches := make([][]*bt.Tx, 0) + txsConsolidation := make([]*bt.Tx, 0) + const consolidateBatchSize = 20 + + var next *list.Element + for front := utxoSet.Front(); front != nil; front = next { + next = front.Next() + utxoSet.Remove(front) + utxo, ok := front.Value.(*bt.UTXO) + if !ok { + return nil, errors.New("failed to parse value to utxo") + } + + txSatoshis += utxo.Satoshis + if next == nil { + if len(tx.Inputs) > 0 { + err := tx.FromUTXOs(utxo) + if err != nil { + return nil, err + } + + err = b.consolidateToFundingKeyset(tx, txSatoshis, fundingKeySet) + if err != nil { + return nil, err + } + + txsConsolidation = append(txsConsolidation, tx) + satoshiMap[tx.TxID()] = tx.TotalOutputSatoshis() + } + + if len(txsConsolidation) > 0 { + txsConsolidationBatches = append(txsConsolidationBatches, txsConsolidation) + } + break + } + + err := tx.FromUTXOs(utxo) + if err != nil { + return nil, err + } + + if len(tx.Inputs) >= b.maxInputs { + err = b.consolidateToFundingKeyset(tx, txSatoshis, fundingKeySet) + if err != nil { + return nil, err + } + + txsConsolidation = append(txsConsolidation, tx) + + satoshiMap[tx.TxID()] = tx.TotalOutputSatoshis() + tx = bt.NewTx() + txSatoshis = 0 + } + + if len(txsConsolidation) >= consolidateBatchSize { + txsConsolidationBatches = append(txsConsolidationBatches, txsConsolidation) + txsConsolidation = make([]*bt.Tx, 0) + } + + } + + return txsConsolidationBatches, nil +} + +func (b *UTXOConsolidator) consolidateToFundingKeyset(tx *bt.Tx, txSatoshis uint64, fundingKeySet *keyset.KeySet) error { + fee := b.calculateFeeSat(tx) + err := tx.PayTo(fundingKeySet.Script, txSatoshis-fee) + if err != nil { + return err + } + unlockerGetter := unlocker.Getter{PrivateKey: fundingKeySet.PrivateKey} + err = tx.FillAllInputs(context.Background(), &unlockerGetter) + if err != nil { + return err + } + return nil +} diff --git a/internal/broadcaster/utxo_creator.go b/internal/broadcaster/utxo_creator.go new file mode 100644 index 000000000..c27e35235 --- /dev/null +++ b/internal/broadcaster/utxo_creator.go @@ -0,0 +1,266 @@ +package broadcaster + +import ( + "container/list" + "context" + "encoding/hex" + "errors" + "fmt" + "log/slog" + "time" + + "github.com/bitcoin-sv/arc/pkg/keyset" + "github.com/bitcoin-sv/arc/pkg/metamorph/metamorph_api" + "github.com/libsv/go-bt/v2" + "github.com/libsv/go-bt/v2/unlocker" +) + +type UTXOCreator struct { + Broadcaster + keySets []*keyset.KeySet +} + +func NewUTXOCreator(logger *slog.Logger, client ArcClient, keySets []*keyset.KeySet, utxoClient UtxoClient, isTestnet bool, opts ...func(p *Broadcaster)) (*UTXOCreator, error) { + b, err := NewBroadcaster(logger, client, utxoClient, isTestnet, opts...) + if err != nil { + return nil, err + } + + creator := &UTXOCreator{ + Broadcaster: b, + keySets: keySets, + } + + return creator, nil +} + +func (b *UTXOCreator) CreateUtxos(requestedOutputs int, requestedSatoshisPerOutput uint64) error { + for _, ks := range b.keySets { + b.logger.Info("creating utxos", slog.String("address", ks.Address(!b.isTestnet))) + + requestedOutputsSatoshis := int64(requestedOutputs) * int64(requestedSatoshisPerOutput) + + confirmed, unconfirmed, err := b.utxoClient.GetBalanceWithRetries(b.ctx, !b.isTestnet, ks.Address(!b.isTestnet), 1*time.Second, 5) + if err != nil { + return err + } + + if unconfirmed > 0 { + return fmt.Errorf("total balance not confirmed yet") + } + + balance := confirmed + unconfirmed + + if requestedOutputsSatoshis > balance { + return fmt.Errorf("requested total of satoshis %d exceeds balance on funding keyset %d", requestedOutputsSatoshis, balance) + } + + utxos, err := b.utxoClient.GetUTXOsWithRetries(b.ctx, !b.isTestnet, ks.Script, ks.Address(!b.isTestnet), 1*time.Second, 5) + if err != nil { + return err + } + + utxoSet := list.New() + for _, utxo := range utxos { + // collect right sized utxos + if utxo.Satoshis >= requestedSatoshisPerOutput { + utxoSet.PushBack(utxo) + continue + } + } + + // if requested outputs satisfied, return + if utxoSet.Len() >= requestedOutputs { + b.logger.Info("utxo set", slog.Int("ready", utxoSet.Len()), slog.Int("requested", requestedOutputs), slog.Uint64("satoshis", requestedSatoshisPerOutput)) + return nil + } + + satoshiMap := map[string][]splittingOutput{} + lastUtxoSetLen := 0 + + // if requested outputs not satisfied, create them + for { + if lastUtxoSetLen >= utxoSet.Len() { + b.logger.Error("utxo set length hasn't changed since last iteration") + break + } + lastUtxoSetLen = utxoSet.Len() + + // if requested outputs satisfied, return + if utxoSet.Len() >= requestedOutputs { + break + } + + b.logger.Info("splitting outputs", slog.Int("ready", utxoSet.Len()), slog.Int("requested", requestedOutputs), slog.Uint64("satoshis", requestedSatoshisPerOutput)) + + // create splitting txs + txsSplitBatches, err := b.splitOutputs(requestedOutputs, requestedSatoshisPerOutput, utxoSet, satoshiMap, ks) + if err != nil { + return err + } + + for i, batch := range txsSplitBatches { + nrOutputs := 0 + nrInputs := 0 + for _, txBatch := range batch { + nrOutputs += len(txBatch.Outputs) + nrInputs += len(txBatch.Inputs) + } + + b.logger.Info(fmt.Sprintf("broadcasting splitting batch %d/%d", i+1, len(txsSplitBatches)), slog.Int("size", len(batch)), slog.Int("inputs", nrInputs), slog.Int("outputs", nrOutputs)) + + resp, err := b.client.BroadcastTransactions(context.Background(), batch, metamorph_api.Status_SEEN_ON_NETWORK, b.callbackURL, b.callbackToken, b.fullStatusUpdates, false) + if err != nil { + return fmt.Errorf("failed to braodcast tx: %v", err) + } + + for _, res := range resp { + if res.Status == metamorph_api.Status_REJECTED || res.Status == metamorph_api.Status_SEEN_IN_ORPHAN_MEMPOOL { + b.logger.Error("splitting tx was not successful", slog.String("status", res.Status.String()), slog.String("hash", res.Txid), slog.String("reason", res.RejectReason)) + for _, tx := range batch { + if tx.TxID() == res.Txid { + b.logger.Debug(tx.String()) + break + } + } + continue + } + + txIDBytes, err := hex.DecodeString(res.Txid) + if err != nil { + b.logger.Error("failed to decode txid", slog.String("err", err.Error())) + continue + } + + foundOutputs, found := satoshiMap[res.Txid] + if !found { + b.logger.Error("output not found", slog.String("hash", res.Txid)) + continue + } + + for _, foundOutput := range foundOutputs { + newUtxo := &bt.UTXO{ + TxID: txIDBytes, + Vout: foundOutput.vout, + LockingScript: ks.Script, + Satoshis: foundOutput.satoshis, + } + + utxoSet.PushBack(newUtxo) + } + delete(satoshiMap, res.Txid) + } + + // do not performance test ARC when creating the utxos + time.Sleep(100 * time.Millisecond) + } + } + + b.logger.Info("utxo set", slog.Int("ready", utxoSet.Len()), slog.Int("requested", requestedOutputs), slog.Uint64("satoshis", requestedSatoshisPerOutput)) + } + return nil +} + +func (b *UTXOCreator) splitOutputs(requestedOutputs int, requestedSatoshisPerOutput uint64, utxoSet *list.List, satoshiMap map[string][]splittingOutput, fundingKeySet *keyset.KeySet) ([][]*bt.Tx, error) { + txsSplitBatches := make([][]*bt.Tx, 0) + txsSplit := make([]*bt.Tx, 0) + outputs := utxoSet.Len() + var err error + + var next *list.Element + for front := utxoSet.Front(); front != nil; front = next { + next = front.Next() + + if front == nil || outputs >= requestedOutputs { + break + } + + utxo, ok := front.Value.(*bt.UTXO) + if !ok { + return nil, errors.New("failed to parse value to utxo") + } + + tx := bt.NewTx() + err = tx.FromUTXOs(utxo) + if err != nil { + return nil, err + } + + // only split if splitting increases nr of outputs + const feeMargin = 50 + if utxo.Satoshis < 2*requestedSatoshisPerOutput+feeMargin { + continue + } + + addedOutputs, err := b.splitToFundingKeyset(tx, utxo.Satoshis, requestedSatoshisPerOutput, requestedOutputs-outputs, fundingKeySet) + if err != nil { + return nil, err + } + utxoSet.Remove(front) + + outputs += addedOutputs + + txsSplit = append(txsSplit, tx) + + txOutputs := make([]splittingOutput, len(tx.Outputs)) + for i, txOutput := range tx.Outputs { + txOutputs[i] = splittingOutput{satoshis: txOutput.Satoshis, vout: uint32(i)} + } + + satoshiMap[tx.TxID()] = txOutputs + + if len(txsSplit) == b.batchSize { + txsSplitBatches = append(txsSplitBatches, txsSplit) + txsSplit = make([]*bt.Tx, 0) + } + } + + if len(txsSplit) > 0 { + txsSplitBatches = append(txsSplitBatches, txsSplit) + } + return txsSplitBatches, nil +} + +func (b *UTXOCreator) splitToFundingKeyset(tx *bt.Tx, splitSatoshis uint64, requestedSatoshis uint64, requestedOutputs int, fundingKeySet *keyset.KeySet) (addedOutputs int, err error) { + + if requestedSatoshis > splitSatoshis { + return 0, fmt.Errorf("requested satoshis %d greater than satoshis to be split %d", requestedSatoshis, splitSatoshis) + } + + counter := 0 + + remaining := int64(splitSatoshis) + for remaining > int64(requestedSatoshis) && counter < requestedOutputs { + if uint64(remaining)-requestedSatoshis < b.calculateFeeSat(tx) { + break + } + + err := tx.PayTo(fundingKeySet.Script, requestedSatoshis) + if err != nil { + return 0, err + } + + remaining -= int64(requestedSatoshis) + counter++ + + } + + fee := b.calculateFeeSat(tx) + err = tx.PayTo(fundingKeySet.Script, uint64(remaining)-fee) + if err != nil { + return 0, err + } + + unlockerGetter := unlocker.Getter{PrivateKey: fundingKeySet.PrivateKey} + err = tx.FillAllInputs(context.Background(), &unlockerGetter) + if err != nil { + return 0, err + } + + return counter, nil +} + +type splittingOutput struct { + satoshis uint64 + vout uint32 +} diff --git a/internal/metamorph/processor.go b/internal/metamorph/processor.go index d7a84fb9a..bd8a8647b 100644 --- a/internal/metamorph/processor.go +++ b/internal/metamorph/processor.go @@ -416,7 +416,7 @@ func (p *Processor) StartProcessExpiredTransactions() { } if announced > 0 || requested > 0 { - p.logger.Info("Retried unmined transactions", slog.Int("announced", announced), slog.Int("requested", requested), slog.String("since", getUnminedSince.String())) + p.logger.Info("Retried unmined transactions", slog.Int("announced", announced), slog.Int("requested", requested), slog.Time("since", getUnminedSince)) } } }