Skip to content

Commit

Permalink
Merge pull request #477 from bitcoin-sv/refactor/broadcaster
Browse files Browse the repository at this point in the history
Refactor/broadcaster
  • Loading branch information
boecklim authored Jun 25, 2024
2 parents 0220143 + 4b3ac45 commit ebb9f63
Show file tree
Hide file tree
Showing 9 changed files with 920 additions and 819 deletions.
88 changes: 22 additions & 66 deletions cmd/broadcaster-cli/app/utxos/broadcast/broadcast.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,18 @@
package broadcast

import (
"context"
"errors"
"fmt"
"io"
"log"
"log/slog"
"os"
"os/signal"
"path/filepath"
"strings"
"sync"
"time"

"github.com/bitcoin-sv/arc/cmd/broadcaster-cli/helper"
"github.com/bitcoin-sv/arc/internal/broadcaster"
"github.com/bitcoin-sv/arc/internal/woc_client"
"github.com/bitcoin-sv/arc/pkg/keyset"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
Expand Down Expand Up @@ -46,11 +43,6 @@ var Cmd = &cobra.Command{
return err
}

store, err := helper.GetBool("store")
if err != nil {
return err
}

fullStatusUpdates, err := helper.GetBool("fullStatusUpdates")
if err != nil {
return err
Expand Down Expand Up @@ -88,6 +80,7 @@ var Cmd = &cobra.Command{
if err != nil {
return err
}

if miningFeeSat == 0 {
return errors.New("no mining fee was given")
}
Expand Down Expand Up @@ -116,74 +109,43 @@ var Cmd = &cobra.Command{
return fmt.Errorf("failed to create client: %v", err)
}

rbs := make([]*broadcaster.RateBroadcaster, len(keyFiles))

wg := &sync.WaitGroup{}

var resultsPath string
if store {
network := "mainnet"
if isTestnet {
network = "testnet"
}
resultsPath = filepath.Join(".", fmt.Sprintf("results/%s-%s-rate-%d-batchsize-%d", network, time.Now().Format(time.DateTime), rateTxsPerSecond, batchSize))
err := os.MkdirAll(resultsPath, os.ModePerm)
if err != nil {
return err
}
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

fundingKeySets := make([]*keyset.KeySet, len(keyFiles))
for i, kf := range keyFiles {

wg.Add(1)

fundingKeySet, _, err := helper.GetKeySetsKeyFile(kf)
if err != nil {
return fmt.Errorf("failed to get key sets: %v", err)
}

wocClient := woc_client.New(woc_client.WithAuth(wocApiKey), woc_client.WithLogger(logger))

var writer io.Writer
if store {

_, keyFileName := filepath.Split(kf)

file, err := os.Create(fmt.Sprintf("%s/%s.json", resultsPath, keyFileName))
if err != nil {
return err
}

writer = file

defer file.Close()
}

rateBroadcaster, err := broadcaster.NewRateBroadcaster(logger, client, fundingKeySet, wocClient, broadcaster.WithFees(miningFeeSat), broadcaster.WithIsTestnet(isTestnet), broadcaster.WithCallback(callbackURL, callbackToken), broadcaster.WithFullstatusUpdates(fullStatusUpdates), broadcaster.WithBatchSize(batchSize), broadcaster.WithStoreWriter(writer, 50))
if err != nil {
return fmt.Errorf("failed to create rate broadcaster: %v", err)
}
fundingKeySets[i] = fundingKeySet
}

rbs[i] = rateBroadcaster
wocClient := woc_client.New(woc_client.WithAuth(wocApiKey), woc_client.WithLogger(logger))

err = rateBroadcaster.StartRateBroadcaster(ctx, rateTxsPerSecond, limit, wg)
if err != nil {
return fmt.Errorf("failed to start rate broadcaster: %v", err)
}
rateBroadcaster, err := broadcaster.NewMultiKeyRateBroadcaster(logger, client, fundingKeySets, wocClient, isTestnet,
broadcaster.WithFees(miningFeeSat),
broadcaster.WithCallback(callbackURL, callbackToken),
broadcaster.WithFullstatusUpdates(fullStatusUpdates),
broadcaster.WithBatchSize(batchSize),
)
if err != nil {
return fmt.Errorf("failed to create rate broadcaster: %v", err)
}

go func() {
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, os.Interrupt) // Signal from Ctrl+C
<-signalChan

cancel()
logger.Info("Shutting down broadcaster")
rateBroadcaster.Shutdown()
}()

wg.Wait()
logger.Info("Starting broadcaster", slog.Int("rate [txs/s]", rateTxsPerSecond), slog.Int("batch size", batchSize))
err = rateBroadcaster.Start(rateTxsPerSecond, limit)
if err != nil {
return fmt.Errorf("failed to start rate broadcaster: %v", err)
}

return nil
},
Expand All @@ -209,10 +171,4 @@ func init() {
if err != nil {
log.Fatal(err)
}

Cmd.Flags().Bool("store", false, "Store results in a json file instead of printing")
err = viper.BindPFlag("store", Cmd.Flags().Lookup("store"))
if err != nil {
log.Fatal(err)
}
}
40 changes: 18 additions & 22 deletions cmd/broadcaster-cli/app/utxos/consolidate/consolidate.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
package consolidate

import (
"context"
"errors"
"fmt"
"log/slog"
"strings"
"time"

"github.com/bitcoin-sv/arc/cmd/broadcaster-cli/helper"
"github.com/bitcoin-sv/arc/internal/broadcaster"
"github.com/bitcoin-sv/arc/internal/woc_client"
"github.com/bitcoin-sv/arc/pkg/keyset"
"github.com/spf13/cobra"
"strings"
)

var Cmd = &cobra.Command{
Expand Down Expand Up @@ -85,32 +83,30 @@ var Cmd = &cobra.Command{

keyFiles := strings.Split(keyFile, ",")

for _, kf := range keyFiles {
fundingKeySets := make([]*keyset.KeySet, len(keyFiles))

if wocApiKey == "" {
time.Sleep(1 * time.Second)
}
for i, kf := range keyFiles {

fundingKeySet, _, err := helper.GetKeySetsKeyFile(kf)
if err != nil {
return fmt.Errorf("failed to get key sets: %v", err)
}

rateBroadcaster, err := broadcaster.NewRateBroadcaster(logger, client, fundingKeySet, wocClient,
broadcaster.WithFees(miningFeeSat),
broadcaster.WithIsTestnet(isTestnet),
broadcaster.WithCallback(callbackURL, callbackToken),
broadcaster.WithFullstatusUpdates(fullStatusUpdates),
)
if err != nil {
return fmt.Errorf("failed to create broadcaster: %v", err)
}
fundingKeySets[i] = fundingKeySet
}

logger.Info("consolidating utxos", slog.String("key", kf), slog.String("address", fundingKeySet.Address(!isTestnet)))
err = rateBroadcaster.Consolidate(context.Background())
if err != nil {
return fmt.Errorf("failed to consolidate utxos: %v", err)
}
rateBroadcaster, err := broadcaster.NewUTXOConsolidator(logger, client, fundingKeySets, wocClient, isTestnet,
broadcaster.WithFees(miningFeeSat),
broadcaster.WithCallback(callbackURL, callbackToken),
broadcaster.WithFullstatusUpdates(fullStatusUpdates),
)
if err != nil {
return fmt.Errorf("failed to create broadcaster: %v", err)
}

err = rateBroadcaster.Consolidate()
if err != nil {
return fmt.Errorf("failed to consolidate utxos: %v", err)
}

return nil
Expand Down
38 changes: 17 additions & 21 deletions cmd/broadcaster-cli/app/utxos/create/create.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
package create

import (
"context"
"errors"
"fmt"
"log"
"log/slog"
"strings"
"time"

"github.com/bitcoin-sv/arc/cmd/broadcaster-cli/helper"
"github.com/bitcoin-sv/arc/internal/broadcaster"
"github.com/bitcoin-sv/arc/internal/woc_client"
"github.com/bitcoin-sv/arc/pkg/keyset"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
Expand Down Expand Up @@ -103,32 +101,30 @@ var Cmd = &cobra.Command{

keyFiles := strings.Split(keyFile, ",")

for _, kf := range keyFiles {
fundingKeySets := make([]*keyset.KeySet, len(keyFiles))

if wocApiKey == "" {
time.Sleep(1 * time.Second)
}
for i, kf := range keyFiles {

fundingKeySet, _, err := helper.GetKeySetsKeyFile(kf)
if err != nil {
return fmt.Errorf("failed to get key sets: %v", err)
}

rateBroadcaster, err := broadcaster.NewRateBroadcaster(logger, client, fundingKeySet, wocClient,
broadcaster.WithFees(miningFeeSat),
broadcaster.WithIsTestnet(isTestnet),
broadcaster.WithCallback(callbackURL, callbackToken),
broadcaster.WithFullstatusUpdates(fullStatusUpdates),
)
if err != nil {
return fmt.Errorf("failed to create broadcaster: %v", err)
}
fundingKeySets[i] = fundingKeySet
}

logger.Info("creating utxos", slog.String("key", kf), slog.String("address", fundingKeySet.Address(!isTestnet)))
err = rateBroadcaster.CreateUtxos(context.Background(), outputs, satoshisPerOutput)
if err != nil {
return fmt.Errorf("failed to create utxos: %v", err)
}
rateBroadcaster, err := broadcaster.NewUTXOCreator(logger, client, fundingKeySets, wocClient, isTestnet,
broadcaster.WithFees(miningFeeSat),
broadcaster.WithCallback(callbackURL, callbackToken),
broadcaster.WithFullstatusUpdates(fullStatusUpdates),
)
if err != nil {
return fmt.Errorf("failed to create broadcaster: %v", err)
}

err = rateBroadcaster.CreateUtxos(outputs, satoshisPerOutput)
if err != nil {
return fmt.Errorf("failed to create utxos: %v", err)
}

return nil
Expand Down
Loading

0 comments on commit ebb9f63

Please sign in to comment.