Skip to content

Commit

Permalink
feat(ARCO-299): merge register and request transactions in blocktx
Browse files Browse the repository at this point in the history
  • Loading branch information
kuba-4chain committed Dec 2, 2024
1 parent 5de1703 commit c01af3e
Show file tree
Hide file tree
Showing 19 changed files with 126 additions and 501 deletions.
8 changes: 3 additions & 5 deletions cmd/arc/services/blocktx.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ func StartBlockTx(logger *slog.Logger, arcConfig *config.ArcConfig) (func(), err
return nil, fmt.Errorf("failed to create blocktx store: %v", err)
}

registerTxsChan := make(chan []byte, chanBufferSize)
requestTxChannel := make(chan []byte, chanBufferSize)

natsConnection, err := nats_connection.New(arcConfig.MessageQueue.URL, logger)
Expand All @@ -94,7 +93,7 @@ func StartBlockTx(logger *slog.Logger, arcConfig *config.ArcConfig) (func(), err
}

if arcConfig.MessageQueue.Streaming.Enabled {
opts := []nats_jetstream.Option{nats_jetstream.WithSubscribedTopics(blocktx.RegisterTxTopic, blocktx.RequestTxTopic)}
opts := []nats_jetstream.Option{nats_jetstream.WithSubscribedTopics(blocktx.RequestTxTopic)}
if arcConfig.MessageQueue.Streaming.FileStorage {
opts = append(opts, nats_jetstream.WithFileStorage())
}
Expand All @@ -104,7 +103,7 @@ func StartBlockTx(logger *slog.Logger, arcConfig *config.ArcConfig) (func(), err
}

mqClient, err = nats_jetstream.New(natsConnection, logger,
[]string{blocktx.MinedTxsTopic, blocktx.RegisterTxTopic, blocktx.RequestTxTopic},
[]string{blocktx.MinedTxsTopic, blocktx.RequestTxTopic},
opts...,
)
if err != nil {
Expand All @@ -121,9 +120,8 @@ func StartBlockTx(logger *slog.Logger, arcConfig *config.ArcConfig) (func(), err

processorOpts = append(processorOpts,
blocktx.WithRetentionDays(btxConfig.RecordRetentionDays),
blocktx.WithRegisterTxsChan(registerTxsChan),
blocktx.WithRequestTxChan(requestTxChannel),
blocktx.WithRegisterTxsInterval(btxConfig.RegisterTxsInterval),
blocktx.WithRequestTxsInterval(btxConfig.RequestTxsInterval),
blocktx.WithMessageQueueClient(mqClient),
)

Expand Down
3 changes: 1 addition & 2 deletions cmd/arc/services/metamorph.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func StartMetamorph(logger *slog.Logger, arcConfig *config.ArcConfig, cacheStore
}

mqClient, err = nats_jetstream.New(natsClient, logger,
[]string{metamorph.MinedTxsTopic, metamorph.SubmitTxTopic, metamorph.RegisterTxTopic, metamorph.RequestTxTopic},
[]string{metamorph.MinedTxsTopic, metamorph.SubmitTxTopic, metamorph.RequestTxTopic},
opts...,
)
if err != nil {
Expand Down Expand Up @@ -196,7 +196,6 @@ func StartMetamorph(logger *slog.Logger, arcConfig *config.ArcConfig, cacheStore

server, err = metamorph.NewServer(arcConfig.PrometheusEndpoint, arcConfig.GrpcMessageSize, logger,
metamorphStore, processor, arcConfig.Tracing, optsServer...)

if err != nil {
stopFn()
return nil, fmt.Errorf("create GRPCServer failed: %v", err)
Expand Down
2 changes: 1 addition & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ type BlocktxConfig struct {
HealthServerDialAddr string `mapstructure:"healthServerDialAddr"`
Db *DbConfig `mapstructure:"db"`
RecordRetentionDays int `mapstructure:"recordRetentionDays"`
RegisterTxsInterval time.Duration `mapstructure:"registerTxsInterval"`
RequestTxsInterval time.Duration `mapstructure:"requestTxsInterval"`
MonitorPeers bool `mapstructure:"monitorPeers"`
FillGapsInterval time.Duration `mapstructure:"fillGapsInterval"`
MaxAllowedBlockHeightMismatch int `mapstructure:"maxAllowedBlockHeightMismatch"`
Expand Down
2 changes: 1 addition & 1 deletion config/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func getBlocktxConfig() *BlocktxConfig {
HealthServerDialAddr: "localhost:8006",
Db: getDbConfig("blocktx"),
RecordRetentionDays: 28,
RegisterTxsInterval: 10 * time.Second,
RequestTxsInterval: 10 * time.Second,
MonitorPeers: false,
FillGapsInterval: 15 * time.Minute,
MaxAllowedBlockHeightMismatch: 3,
Expand Down
2 changes: 1 addition & 1 deletion config/example_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ blocktx:
maxOpenConns: 80 # maximum open connections
sslMode: disable
recordRetentionDays: 28 # number of days for which data integrity is ensured
registerTxsInterval: 10s # time interval to read from the channel registered transactions
requestTxsInterval: 10s # time interval to register and request mined transactions
monitorPeers: false # if enabled, peers which do not receive alive signal from nodes will be restarted
fillGapsInterval: 15m # time interval to check and fill gaps in processed blocks
maxAllowedBlockHeightMismatch: 3 # maximum number of blocks that can be ahead of current highest block in blocktx, used for merkle roots verification
Expand Down
5 changes: 2 additions & 3 deletions internal/blocktx/mq_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@ import (
)

const (
MinedTxsTopic = "mined-txs"
RegisterTxTopic = "register-tx"
RequestTxTopic = "request-tx"
MinedTxsTopic = "mined-txs"
RequestTxTopic = "request-tx"
)

type MessageQueueClient interface {
Expand Down
117 changes: 30 additions & 87 deletions internal/blocktx/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,19 @@ import (
)

var (
ErrFailedToSubscribeToTopic = errors.New("failed to subscribe to register topic")
ErrFailedToSubscribeToTopic = errors.New("failed to subscribe to request-tx topic")
ErrFailedToCreateBUMP = errors.New("failed to create new bump for tx hash from merkle tree and index")
ErrFailedToGetStringFromBUMPHex = errors.New("failed to get string from bump for tx hash")
ErrFailedToInsertBlockTransactions = errors.New("failed to insert block transactions")
ErrFailedToPublishMinedTransaction = errors.New("failed to publish mined transactions")
)

const (
transactionStoringBatchsizeDefault = 8192 // power of 2 for easier memory allocation
maxRequestBlocks = 10
maxBlocksInProgress = 1
registerTxsIntervalDefault = time.Second * 10
registerRequestTxsIntervalDefault = time.Second * 5
registerTxsBatchSizeDefault = 100
registerRequestTxBatchSizeDefault = 100
requestTxsIntervalDefault = time.Second * 5
requestTxBatchSizeDefault = 100
waitForBlockProcessing = 5 * time.Minute
)

Expand All @@ -52,12 +51,9 @@ type Processor struct {
transactionStorageBatchSize int
dataRetentionDays int
mqClient MessageQueueClient
registerTxsChan chan []byte
requestTxChannel chan []byte
registerTxsInterval time.Duration
registerRequestTxsInterval time.Duration
registerTxsBatchSize int
registerRequestTxsBatchSize int
requestTxsInterval time.Duration
requestTxsBatchSize int
tracingEnabled bool
tracingAttributes []attribute.KeyValue
processGuardsMap sync.Map
Expand Down Expand Up @@ -89,10 +85,8 @@ func NewProcessor(
blockRequestCh: blockRequestCh,
blockProcessCh: blockProcessCh,
transactionStorageBatchSize: transactionStoringBatchsizeDefault,
registerTxsInterval: registerTxsIntervalDefault,
registerRequestTxsInterval: registerRequestTxsIntervalDefault,
registerTxsBatchSize: registerTxsBatchSizeDefault,
registerRequestTxsBatchSize: registerRequestTxBatchSizeDefault,
requestTxsInterval: requestTxsIntervalDefault,
requestTxsBatchSize: requestTxBatchSizeDefault,
hostname: hostname,
stats: newProcessorStats(),
statCollectionInterval: statCollectionIntervalDefault,
Expand All @@ -112,15 +106,7 @@ func NewProcessor(
}

func (p *Processor) Start() error {
err := p.mqClient.Subscribe(RegisterTxTopic, func(msg []byte) error {
p.registerTxsChan <- msg
return nil
})
if err != nil {
return errors.Join(ErrFailedToSubscribeToTopic, fmt.Errorf("topic: %s", RegisterTxTopic), err)
}

err = p.mqClient.Subscribe(RequestTxTopic, func(msg []byte) error {
err := p.mqClient.Subscribe(RequestTxTopic, func(msg []byte) error {
p.requestTxChannel <- msg
return nil
})
Expand All @@ -130,7 +116,6 @@ func (p *Processor) Start() error {

p.StartBlockRequesting()
p.StartBlockProcessing()
p.StartProcessRegisterTxs()
p.StartProcessRequestTxs()

return nil
Expand Down Expand Up @@ -287,47 +272,12 @@ func (p *Processor) unlockBlock(ctx context.Context, hash *chainhash.Hash) {
}
}

func (p *Processor) StartProcessRegisterTxs() {
p.waitGroup.Add(1)
txHashes := make([][]byte, 0, p.registerTxsBatchSize)

ticker := time.NewTicker(p.registerTxsInterval)
go func() {
defer p.waitGroup.Done()
for {
select {
case <-p.ctx.Done():
return
case txHash := <-p.registerTxsChan:
txHashes = append(txHashes, txHash)

if len(txHashes) < p.registerTxsBatchSize {
continue
}

p.registerTransactions(txHashes[:])
txHashes = txHashes[:0]
ticker.Reset(p.registerTxsInterval)

case <-ticker.C:
if len(txHashes) == 0 {
continue
}

p.registerTransactions(txHashes[:])
txHashes = txHashes[:0]
ticker.Reset(p.registerTxsInterval)
}
}
}()
}

func (p *Processor) StartProcessRequestTxs() {
p.waitGroup.Add(1)

txHashes := make([]*chainhash.Hash, 0, p.registerRequestTxsBatchSize)
txHashes := make([][]byte, 0, p.requestTxsBatchSize)

ticker := time.NewTicker(p.registerRequestTxsInterval)
ticker := time.NewTicker(p.requestTxsInterval)

go func() {
defer p.waitGroup.Done()
Expand All @@ -337,15 +287,15 @@ func (p *Processor) StartProcessRequestTxs() {
case <-p.ctx.Done():
return
case txHash := <-p.requestTxChannel:
tx, err := chainhash.NewHash(txHash)
_, err := chainhash.NewHash(txHash)
if err != nil {
p.logger.Error("Failed to create hash from byte array", slog.String("err", err.Error()))
continue
}

txHashes = append(txHashes, tx)
txHashes = append(txHashes, txHash)

if len(txHashes) < p.registerRequestTxsBatchSize || len(txHashes) == 0 {
if len(txHashes) < p.requestTxsBatchSize {
continue
}

Expand All @@ -355,8 +305,8 @@ func (p *Processor) StartProcessRequestTxs() {
continue // retry, don't clear the txHashes slice
}

txHashes = make([]*chainhash.Hash, 0, p.registerRequestTxsBatchSize)
ticker.Reset(p.registerRequestTxsInterval)
txHashes = make([][]byte, 0, p.requestTxsBatchSize)
ticker.Reset(p.requestTxsInterval)

case <-ticker.C:
if len(txHashes) == 0 {
Expand All @@ -366,54 +316,47 @@ func (p *Processor) StartProcessRequestTxs() {
err := p.publishMinedTxs(txHashes)
if err != nil {
p.logger.Error("failed to publish mined txs", slog.String("err", err.Error()))
ticker.Reset(p.registerRequestTxsInterval)
ticker.Reset(p.requestTxsInterval)
continue // retry, don't clear the txHashes slice
}

txHashes = make([]*chainhash.Hash, 0, p.registerRequestTxsBatchSize)
ticker.Reset(p.registerRequestTxsInterval)
txHashes = make([][]byte, 0, p.requestTxsBatchSize)
ticker.Reset(p.requestTxsInterval)
}
}
}()
}

func (p *Processor) publishMinedTxs(txHashes []*chainhash.Hash) error {
minedTxs, err := p.store.GetMinedTransactions(p.ctx, txHashes)
func (p *Processor) publishMinedTxs(txHashes [][]byte) error {
minedTxs, err := p.store.UpsertAndGetMinedTransactions(p.ctx, txHashes)
if err != nil {
return fmt.Errorf("failed to get mined transactions: %v", err)
}

var publishErr error

for _, minedTx := range minedTxs {
txBlock := &blocktx_api.TransactionBlock{
TransactionHash: minedTx.TxHash,
BlockHash: minedTx.BlockHash,
BlockHeight: minedTx.BlockHeight,
MerklePath: minedTx.MerklePath,
}

err = p.mqClient.PublishMarshal(p.ctx, MinedTxsTopic, txBlock)
if err != nil {
p.logger.Error("failed to publish mined transaction", slog.String("err", err.Error()))
publishErr = err
}
}

if err != nil {
return fmt.Errorf("failed to publish mined transactions: %v", err)
if publishErr != nil {
return ErrFailedToPublishMinedTransaction
}

return nil
}

func (p *Processor) registerTransactions(txHashes [][]byte) {
updatedTxs, err := p.store.RegisterTransactions(p.ctx, txHashes)
if err != nil {
p.logger.Error("failed to register transactions", slog.String("err", err.Error()))
}

if len(updatedTxs) > 0 {
err = p.publishMinedTxs(updatedTxs)
if err != nil {
p.logger.Error("failed to publish mined txs", slog.String("err", err.Error()))
}
}
}

func (p *Processor) buildMerkleTreeStoreChainHash(ctx context.Context, txids []*chainhash.Hash) []*chainhash.Hash {
_, span := tracing.StartTracing(ctx, "buildMerkleTreeStoreChainHash", p.tracingEnabled, p.tracingAttributes...)
defer tracing.EndTracing(span, nil)
Expand Down
26 changes: 4 additions & 22 deletions internal/blocktx/processor_opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,9 @@ func WithRetentionDays(dataRetentionDays int) func(handler *Processor) {
}
}

func WithRegisterTxsInterval(d time.Duration) func(handler *Processor) {
func WithRequestTxsInterval(d time.Duration) func(handler *Processor) {
return func(p *Processor) {
p.registerTxsInterval = d
}
}

func WithRegisterRequestTxsInterval(d time.Duration) func(handler *Processor) {
return func(p *Processor) {
p.registerRequestTxsInterval = d
}
}

func WithRegisterTxsChan(registerTxsChan chan []byte) func(handler *Processor) {
return func(handler *Processor) {
handler.registerTxsChan = registerTxsChan
p.requestTxsInterval = d
}
}

Expand All @@ -49,15 +37,9 @@ func WithRequestTxChan(requestTxChannel chan []byte) func(handler *Processor) {
}
}

func WithRegisterTxsBatchSize(size int) func(handler *Processor) {
return func(handler *Processor) {
handler.registerTxsBatchSize = size
}
}

func WithRegisterRequestTxsBatchSize(size int) func(handler *Processor) {
func WithRequestTxsBatchSize(size int) func(handler *Processor) {
return func(handler *Processor) {
handler.registerRequestTxsBatchSize = size
handler.requestTxsBatchSize = size
}
}

Expand Down
Loading

0 comments on commit c01af3e

Please sign in to comment.