Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ARCO-299): merge register and request transactions in blocktx #687

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions cmd/arc/services/blocktx.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ func StartBlockTx(logger *slog.Logger, arcConfig *config.ArcConfig) (func(), err
return nil, fmt.Errorf("failed to create blocktx store: %v", err)
}

registerTxsChan := make(chan []byte, chanBufferSize)
requestTxChannel := make(chan []byte, chanBufferSize)

natsConnection, err := nats_connection.New(arcConfig.MessageQueue.URL, logger)
Expand All @@ -94,7 +93,7 @@ func StartBlockTx(logger *slog.Logger, arcConfig *config.ArcConfig) (func(), err
}

if arcConfig.MessageQueue.Streaming.Enabled {
opts := []nats_jetstream.Option{nats_jetstream.WithSubscribedTopics(blocktx.RegisterTxTopic, blocktx.RequestTxTopic)}
opts := []nats_jetstream.Option{nats_jetstream.WithSubscribedTopics(blocktx.RequestTxTopic)}
if arcConfig.MessageQueue.Streaming.FileStorage {
opts = append(opts, nats_jetstream.WithFileStorage())
}
Expand All @@ -104,7 +103,7 @@ func StartBlockTx(logger *slog.Logger, arcConfig *config.ArcConfig) (func(), err
}

mqClient, err = nats_jetstream.New(natsConnection, logger,
[]string{blocktx.MinedTxsTopic, blocktx.RegisterTxTopic, blocktx.RequestTxTopic},
[]string{blocktx.MinedTxsTopic, blocktx.RequestTxTopic},
opts...,
)
if err != nil {
Expand All @@ -121,9 +120,8 @@ func StartBlockTx(logger *slog.Logger, arcConfig *config.ArcConfig) (func(), err

processorOpts = append(processorOpts,
blocktx.WithRetentionDays(btxConfig.RecordRetentionDays),
blocktx.WithRegisterTxsChan(registerTxsChan),
blocktx.WithRequestTxChan(requestTxChannel),
blocktx.WithRegisterTxsInterval(btxConfig.RegisterTxsInterval),
blocktx.WithRequestTxsInterval(btxConfig.RequestTxsInterval),
blocktx.WithMessageQueueClient(mqClient),
)

Expand Down
3 changes: 1 addition & 2 deletions cmd/arc/services/metamorph.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func StartMetamorph(logger *slog.Logger, arcConfig *config.ArcConfig, cacheStore
}

mqClient, err = nats_jetstream.New(natsClient, logger,
[]string{metamorph.MinedTxsTopic, metamorph.SubmitTxTopic, metamorph.RegisterTxTopic, metamorph.RequestTxTopic},
[]string{metamorph.MinedTxsTopic, metamorph.SubmitTxTopic, metamorph.RequestTxTopic},
opts...,
)
if err != nil {
Expand Down Expand Up @@ -196,7 +196,6 @@ func StartMetamorph(logger *slog.Logger, arcConfig *config.ArcConfig, cacheStore

server, err = metamorph.NewServer(arcConfig.PrometheusEndpoint, arcConfig.GrpcMessageSize, logger,
metamorphStore, processor, arcConfig.Tracing, optsServer...)

if err != nil {
stopFn()
return nil, fmt.Errorf("create GRPCServer failed: %v", err)
Expand Down
2 changes: 1 addition & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ type BlocktxConfig struct {
HealthServerDialAddr string `mapstructure:"healthServerDialAddr"`
Db *DbConfig `mapstructure:"db"`
RecordRetentionDays int `mapstructure:"recordRetentionDays"`
RegisterTxsInterval time.Duration `mapstructure:"registerTxsInterval"`
RequestTxsInterval time.Duration `mapstructure:"requestTxsInterval"`
MonitorPeers bool `mapstructure:"monitorPeers"`
FillGapsInterval time.Duration `mapstructure:"fillGapsInterval"`
MaxAllowedBlockHeightMismatch int `mapstructure:"maxAllowedBlockHeightMismatch"`
Expand Down
2 changes: 1 addition & 1 deletion config/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func getBlocktxConfig() *BlocktxConfig {
HealthServerDialAddr: "localhost:8006",
Db: getDbConfig("blocktx"),
RecordRetentionDays: 28,
RegisterTxsInterval: 10 * time.Second,
RequestTxsInterval: 10 * time.Second,
MonitorPeers: false,
FillGapsInterval: 15 * time.Minute,
MaxAllowedBlockHeightMismatch: 3,
Expand Down
2 changes: 1 addition & 1 deletion config/example_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ blocktx:
maxOpenConns: 80 # maximum open connections
sslMode: disable
recordRetentionDays: 28 # number of days for which data integrity is ensured
registerTxsInterval: 10s # time interval to read from the channel registered transactions
requestTxsInterval: 10s # time interval to register and request mined transactions
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should keep here the name registerTxsInterval and also topic RegisterTxTopic = "register-tx" and drop requestTxsInterval instead because a new tx is always registered. So that is kind a core functionality.

The functionality for requesting a tx because it wasn't mined for too long is rather an exception in case that e.g. blocktx was failing for some reason. So that is like an auxiliary functionality.

monitorPeers: false # if enabled, peers which do not receive alive signal from nodes will be restarted
fillGapsInterval: 15m # time interval to check and fill gaps in processed blocks
maxAllowedBlockHeightMismatch: 3 # maximum number of blocks that can be ahead of current highest block in blocktx, used for merkle roots verification
Expand Down
5 changes: 2 additions & 3 deletions internal/blocktx/mq_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@ import (
)

const (
MinedTxsTopic = "mined-txs"
RegisterTxTopic = "register-tx"
RequestTxTopic = "request-tx"
MinedTxsTopic = "mined-txs"
RequestTxTopic = "request-tx"
)

type MessageQueueClient interface {
Expand Down
117 changes: 30 additions & 87 deletions internal/blocktx/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,19 @@ import (
)

var (
ErrFailedToSubscribeToTopic = errors.New("failed to subscribe to register topic")
ErrFailedToSubscribeToTopic = errors.New("failed to subscribe to request-tx topic")
ErrFailedToCreateBUMP = errors.New("failed to create new bump for tx hash from merkle tree and index")
ErrFailedToGetStringFromBUMPHex = errors.New("failed to get string from bump for tx hash")
ErrFailedToInsertBlockTransactions = errors.New("failed to insert block transactions")
ErrFailedToPublishMinedTransaction = errors.New("failed to publish mined transactions")
)

const (
transactionStoringBatchsizeDefault = 8192 // power of 2 for easier memory allocation
maxRequestBlocks = 10
maxBlocksInProgress = 1
registerTxsIntervalDefault = time.Second * 10
registerRequestTxsIntervalDefault = time.Second * 5
registerTxsBatchSizeDefault = 100
registerRequestTxBatchSizeDefault = 100
requestTxsIntervalDefault = time.Second * 5
requestTxBatchSizeDefault = 100
waitForBlockProcessing = 5 * time.Minute
)

Expand All @@ -52,12 +51,9 @@ type Processor struct {
transactionStorageBatchSize int
dataRetentionDays int
mqClient MessageQueueClient
registerTxsChan chan []byte
requestTxChannel chan []byte
registerTxsInterval time.Duration
registerRequestTxsInterval time.Duration
registerTxsBatchSize int
registerRequestTxsBatchSize int
requestTxsInterval time.Duration
requestTxsBatchSize int
tracingEnabled bool
tracingAttributes []attribute.KeyValue
processGuardsMap sync.Map
Expand Down Expand Up @@ -89,10 +85,8 @@ func NewProcessor(
blockRequestCh: blockRequestCh,
blockProcessCh: blockProcessCh,
transactionStorageBatchSize: transactionStoringBatchsizeDefault,
registerTxsInterval: registerTxsIntervalDefault,
registerRequestTxsInterval: registerRequestTxsIntervalDefault,
registerTxsBatchSize: registerTxsBatchSizeDefault,
registerRequestTxsBatchSize: registerRequestTxBatchSizeDefault,
requestTxsInterval: requestTxsIntervalDefault,
requestTxsBatchSize: requestTxBatchSizeDefault,
hostname: hostname,
stats: newProcessorStats(),
statCollectionInterval: statCollectionIntervalDefault,
Expand All @@ -112,15 +106,7 @@ func NewProcessor(
}

func (p *Processor) Start() error {
err := p.mqClient.Subscribe(RegisterTxTopic, func(msg []byte) error {
p.registerTxsChan <- msg
return nil
})
if err != nil {
return errors.Join(ErrFailedToSubscribeToTopic, fmt.Errorf("topic: %s", RegisterTxTopic), err)
}

err = p.mqClient.Subscribe(RequestTxTopic, func(msg []byte) error {
err := p.mqClient.Subscribe(RequestTxTopic, func(msg []byte) error {
p.requestTxChannel <- msg
return nil
})
Expand All @@ -130,7 +116,6 @@ func (p *Processor) Start() error {

p.StartBlockRequesting()
p.StartBlockProcessing()
p.StartProcessRegisterTxs()
p.StartProcessRequestTxs()

return nil
Expand Down Expand Up @@ -287,47 +272,12 @@ func (p *Processor) unlockBlock(ctx context.Context, hash *chainhash.Hash) {
}
}

func (p *Processor) StartProcessRegisterTxs() {
p.waitGroup.Add(1)
txHashes := make([][]byte, 0, p.registerTxsBatchSize)

ticker := time.NewTicker(p.registerTxsInterval)
go func() {
defer p.waitGroup.Done()
for {
select {
case <-p.ctx.Done():
return
case txHash := <-p.registerTxsChan:
txHashes = append(txHashes, txHash)

if len(txHashes) < p.registerTxsBatchSize {
continue
}

p.registerTransactions(txHashes[:])
txHashes = txHashes[:0]
ticker.Reset(p.registerTxsInterval)

case <-ticker.C:
if len(txHashes) == 0 {
continue
}

p.registerTransactions(txHashes[:])
txHashes = txHashes[:0]
ticker.Reset(p.registerTxsInterval)
}
}
}()
}

func (p *Processor) StartProcessRequestTxs() {
p.waitGroup.Add(1)

txHashes := make([]*chainhash.Hash, 0, p.registerRequestTxsBatchSize)
txHashes := make([][]byte, 0, p.requestTxsBatchSize)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍


ticker := time.NewTicker(p.registerRequestTxsInterval)
ticker := time.NewTicker(p.requestTxsInterval)

go func() {
defer p.waitGroup.Done()
Expand All @@ -337,15 +287,15 @@ func (p *Processor) StartProcessRequestTxs() {
case <-p.ctx.Done():
return
case txHash := <-p.requestTxChannel:
tx, err := chainhash.NewHash(txHash)
_, err := chainhash.NewHash(txHash)
if err != nil {
p.logger.Error("Failed to create hash from byte array", slog.String("err", err.Error()))
continue
}

txHashes = append(txHashes, tx)
txHashes = append(txHashes, txHash)

if len(txHashes) < p.registerRequestTxsBatchSize || len(txHashes) == 0 {
if len(txHashes) < p.requestTxsBatchSize {
continue
}

Expand All @@ -355,8 +305,8 @@ func (p *Processor) StartProcessRequestTxs() {
continue // retry, don't clear the txHashes slice
}

txHashes = make([]*chainhash.Hash, 0, p.registerRequestTxsBatchSize)
ticker.Reset(p.registerRequestTxsInterval)
txHashes = make([][]byte, 0, p.requestTxsBatchSize)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
txHashes = make([][]byte, 0, p.requestTxsBatchSize)
txHashes = txHashes [:0]

ticker.Reset(p.requestTxsInterval)

case <-ticker.C:
if len(txHashes) == 0 {
Expand All @@ -366,54 +316,47 @@ func (p *Processor) StartProcessRequestTxs() {
err := p.publishMinedTxs(txHashes)
if err != nil {
p.logger.Error("failed to publish mined txs", slog.String("err", err.Error()))
ticker.Reset(p.registerRequestTxsInterval)
ticker.Reset(p.requestTxsInterval)
continue // retry, don't clear the txHashes slice
}

txHashes = make([]*chainhash.Hash, 0, p.registerRequestTxsBatchSize)
ticker.Reset(p.registerRequestTxsInterval)
txHashes = make([][]byte, 0, p.requestTxsBatchSize)
ticker.Reset(p.requestTxsInterval)
}
}
}()
}

func (p *Processor) publishMinedTxs(txHashes []*chainhash.Hash) error {
minedTxs, err := p.store.GetMinedTransactions(p.ctx, txHashes)
func (p *Processor) publishMinedTxs(txHashes [][]byte) error {
minedTxs, err := p.store.UpsertAndGetMinedTransactions(p.ctx, txHashes)
if err != nil {
return fmt.Errorf("failed to get mined transactions: %v", err)
}

var publishErr error

for _, minedTx := range minedTxs {
txBlock := &blocktx_api.TransactionBlock{
TransactionHash: minedTx.TxHash,
BlockHash: minedTx.BlockHash,
BlockHeight: minedTx.BlockHeight,
MerklePath: minedTx.MerklePath,
}

err = p.mqClient.PublishMarshal(p.ctx, MinedTxsTopic, txBlock)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
err = p.mqClient.PublishMarshal(p.ctx, MinedTxsTopic, txBlock)
publishErr = p.mqClient.PublishMarshal(p.ctx, MinedTxsTopic, txBlock)

if err != nil {
p.logger.Error("failed to publish mined transaction", slog.String("err", err.Error()))
publishErr = err
}
}

if err != nil {
return fmt.Errorf("failed to publish mined transactions: %v", err)
if publishErr != nil {
return ErrFailedToPublishMinedTransaction
}

return nil
}

func (p *Processor) registerTransactions(txHashes [][]byte) {
updatedTxs, err := p.store.RegisterTransactions(p.ctx, txHashes)
if err != nil {
p.logger.Error("failed to register transactions", slog.String("err", err.Error()))
}

if len(updatedTxs) > 0 {
err = p.publishMinedTxs(updatedTxs)
if err != nil {
p.logger.Error("failed to publish mined txs", slog.String("err", err.Error()))
}
}
}

func (p *Processor) buildMerkleTreeStoreChainHash(ctx context.Context, txids []*chainhash.Hash) []*chainhash.Hash {
_, span := tracing.StartTracing(ctx, "buildMerkleTreeStoreChainHash", p.tracingEnabled, p.tracingAttributes...)
defer tracing.EndTracing(span, nil)
Expand Down
26 changes: 4 additions & 22 deletions internal/blocktx/processor_opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,9 @@ func WithRetentionDays(dataRetentionDays int) func(handler *Processor) {
}
}

func WithRegisterTxsInterval(d time.Duration) func(handler *Processor) {
func WithRequestTxsInterval(d time.Duration) func(handler *Processor) {
return func(p *Processor) {
p.registerTxsInterval = d
}
}

func WithRegisterRequestTxsInterval(d time.Duration) func(handler *Processor) {
return func(p *Processor) {
p.registerRequestTxsInterval = d
}
}

func WithRegisterTxsChan(registerTxsChan chan []byte) func(handler *Processor) {
return func(handler *Processor) {
handler.registerTxsChan = registerTxsChan
p.requestTxsInterval = d
}
}

Expand All @@ -49,15 +37,9 @@ func WithRequestTxChan(requestTxChannel chan []byte) func(handler *Processor) {
}
}

func WithRegisterTxsBatchSize(size int) func(handler *Processor) {
return func(handler *Processor) {
handler.registerTxsBatchSize = size
}
}

func WithRegisterRequestTxsBatchSize(size int) func(handler *Processor) {
func WithRequestTxsBatchSize(size int) func(handler *Processor) {
return func(handler *Processor) {
handler.registerRequestTxsBatchSize = size
handler.requestTxsBatchSize = size
}
}

Expand Down
Loading