diff --git a/.github/pull_request_template.md b/.github/pull_request_template.md index 5d75d5822..31bc21942 100644 --- a/.github/pull_request_template.md +++ b/.github/pull_request_template.md @@ -2,14 +2,8 @@ Provide a brief description of the changes you've made. -## Linked Issues / Tickets - -Reference any related issues or tickets, e.g. "Closes #123". - ## Testing Procedure -Describe the tests you've added or any testing steps you've taken. - - [ ] I have added new unit tests - [ ] All tests pass locally - [ ] I have tested manually in my local environment @@ -18,5 +12,4 @@ Describe the tests you've added or any testing steps you've taken. - [ ] I have performed a self-review of my own code - [ ] I have made corresponding changes to the documentation -- [ ] My changes generate no new warnings - [ ] I have updated `CHANGELOG.md` with my changes diff --git a/.github/workflows/go.yaml b/.github/workflows/go.yaml index 061c6d3a8..e2d445d5c 100644 --- a/.github/workflows/go.yaml +++ b/.github/workflows/go.yaml @@ -4,10 +4,8 @@ name: Go on: - push: - branches: ["main"] pull_request: - branches: ["main", "reorg-support"] # TODO: to be removed before merge to main + branches: ["**"] jobs: build: diff --git a/cmd/arc/services/blocktx.go b/cmd/arc/services/blocktx.go index cf6c44389..c3183d95e 100644 --- a/cmd/arc/services/blocktx.go +++ b/cmd/arc/services/blocktx.go @@ -8,18 +8,17 @@ import ( "go.opentelemetry.io/otel/attribute" - "github.com/bitcoin-sv/arc/internal/grpc_opts" - "github.com/bitcoin-sv/arc/internal/message_queue/nats/client/nats_core" - "github.com/bitcoin-sv/arc/internal/message_queue/nats/client/nats_jetstream" - "github.com/bitcoin-sv/arc/internal/message_queue/nats/nats_connection" - "github.com/bitcoin-sv/arc/internal/tracing" "github.com/libsv/go-p2p" "github.com/bitcoin-sv/arc/config" "github.com/bitcoin-sv/arc/internal/blocktx" "github.com/bitcoin-sv/arc/internal/blocktx/store" "github.com/bitcoin-sv/arc/internal/blocktx/store/postgresql" - + "github.com/bitcoin-sv/arc/internal/grpc_opts" + "github.com/bitcoin-sv/arc/internal/message_queue/nats/client/nats_core" + "github.com/bitcoin-sv/arc/internal/message_queue/nats/client/nats_jetstream" + "github.com/bitcoin-sv/arc/internal/message_queue/nats/nats_connection" + "github.com/bitcoin-sv/arc/internal/tracing" "github.com/bitcoin-sv/arc/internal/version" ) diff --git a/internal/api/handler/default.go b/internal/api/handler/default.go index 594f8e667..f8cb061e3 100644 --- a/internal/api/handler/default.go +++ b/internal/api/handler/default.go @@ -851,13 +851,6 @@ func (ArcDefaultHandler) handleError(_ context.Context, transaction *sdkTx.Trans return status, arcError } -// ContextKey type. -type ContextKey int - -const ( - ContextSizings ContextKey = iota -) - // PtrTo returns a pointer to the given value. func PtrTo[T any](v T) *T { return &v diff --git a/internal/blocktx/processor.go b/internal/blocktx/processor.go index 36c3613cb..f463c5db9 100644 --- a/internal/blocktx/processor.go +++ b/internal/blocktx/processor.go @@ -3,6 +3,7 @@ package blocktx import ( "bytes" "context" + "encoding/hex" "errors" "fmt" "log/slog" @@ -916,10 +917,10 @@ func (p *Processor) calculateMerklePaths(ctx context.Context, txs []store.BlockT // gather all transactions with missing merkle paths for each block in a map // to avoid getting all transaction from the same block multiple times - blockTxsMap := make(map[string][]store.BlockTransactionWithMerklePath, 0) + blockTxsMap := make(map[string][]store.BlockTransactionWithMerklePath) for _, tx := range txs { - blockTxsMap[string(tx.BlockHash)] = append(blockTxsMap[string(tx.BlockHash)], store.BlockTransactionWithMerklePath{ + blockTransactionWithMerklePath := store.BlockTransactionWithMerklePath{ BlockTransaction: store.BlockTransaction{ TxHash: tx.TxHash, BlockHash: tx.BlockHash, @@ -927,17 +928,26 @@ func (p *Processor) calculateMerklePaths(ctx context.Context, txs []store.BlockT MerkleTreeIndex: tx.MerkleTreeIndex, BlockStatus: tx.BlockStatus, }, - }) + } + + blockTxsMap[hex.EncodeToString(tx.BlockHash)] = append(blockTxsMap[hex.EncodeToString(tx.BlockHash)], blockTransactionWithMerklePath) } - for _, blockTxs := range blockTxsMap { - blockHash := blockTxs[0].BlockHash + for bh, blockTxs := range blockTxsMap { + blockHash, err := hex.DecodeString(bh) + if err != nil { + return nil, err + } txHashes, err := p.store.GetBlockTransactionsHashes(ctx, blockHash) if err != nil { return nil, errors.Join(ErrFailedToGetBlockTransactions, fmt.Errorf("block hash %s", getHashStringNoErr(blockHash)), err) } + if len(txHashes) == 0 { + continue + } + merkleTree := bc.BuildMerkleTreeStoreChainHash(txHashes) for _, tx := range blockTxs { diff --git a/internal/blocktx/processor_opts.go b/internal/blocktx/processor_opts.go index fe0ac49ed..29b9291db 100644 --- a/internal/blocktx/processor_opts.go +++ b/internal/blocktx/processor_opts.go @@ -7,61 +7,61 @@ import ( "go.opentelemetry.io/otel/attribute" ) -func WithMessageQueueClient(mqClient MessageQueueClient) func(handler *Processor) { +func WithMessageQueueClient(mqClient MessageQueueClient) func(*Processor) { return func(p *Processor) { p.mqClient = mqClient } } -func WithTransactionBatchSize(size int) func(handler *Processor) { +func WithTransactionBatchSize(size int) func(*Processor) { return func(p *Processor) { p.transactionStorageBatchSize = size } } -func WithRetentionDays(dataRetentionDays int) func(handler *Processor) { +func WithRetentionDays(dataRetentionDays int) func(*Processor) { return func(p *Processor) { p.dataRetentionDays = dataRetentionDays } } -func WithRegisterTxsInterval(d time.Duration) func(handler *Processor) { +func WithRegisterTxsInterval(d time.Duration) func(*Processor) { return func(p *Processor) { p.registerTxsInterval = d } } -func WithRegisterRequestTxsInterval(d time.Duration) func(handler *Processor) { +func WithRegisterRequestTxsInterval(d time.Duration) func(*Processor) { return func(p *Processor) { p.registerRequestTxsInterval = d } } -func WithRegisterTxsChan(registerTxsChan chan []byte) func(handler *Processor) { - return func(handler *Processor) { - handler.registerTxsChan = registerTxsChan +func WithRegisterTxsChan(registerTxsChan chan []byte) func(*Processor) { + return func(processor *Processor) { + processor.registerTxsChan = registerTxsChan } } -func WithRequestTxChan(requestTxChannel chan []byte) func(handler *Processor) { - return func(handler *Processor) { - handler.requestTxChannel = requestTxChannel +func WithRequestTxChan(requestTxChannel chan []byte) func(*Processor) { + return func(processor *Processor) { + processor.requestTxChannel = requestTxChannel } } -func WithRegisterTxsBatchSize(size int) func(handler *Processor) { - return func(handler *Processor) { - handler.registerTxsBatchSize = size +func WithRegisterTxsBatchSize(size int) func(*Processor) { + return func(processor *Processor) { + processor.registerTxsBatchSize = size } } -func WithRegisterRequestTxsBatchSize(size int) func(handler *Processor) { - return func(handler *Processor) { - handler.registerRequestTxsBatchSize = size +func WithRegisterRequestTxsBatchSize(size int) func(*Processor) { + return func(processor *Processor) { + processor.registerRequestTxsBatchSize = size } } -func WithTracer(attr ...attribute.KeyValue) func(s *Processor) { +func WithTracer(attr ...attribute.KeyValue) func(*Processor) { return func(p *Processor) { p.tracingEnabled = true if len(attr) > 0 { @@ -74,8 +74,8 @@ func WithTracer(attr ...attribute.KeyValue) func(s *Processor) { } } -func WithMaxBlockProcessingDuration(d time.Duration) func(handler *Processor) { - return func(handler *Processor) { - handler.maxBlockProcessingDuration = d +func WithMaxBlockProcessingDuration(d time.Duration) func(*Processor) { + return func(processor *Processor) { + processor.maxBlockProcessingDuration = d } } diff --git a/internal/blocktx/processor_test.go b/internal/blocktx/processor_test.go index d226fc794..c1330b5d9 100644 --- a/internal/blocktx/processor_test.go +++ b/internal/blocktx/processor_test.go @@ -205,6 +205,7 @@ func TestHandleBlock(t *testing.T) { require.NoError(t, err) blockMessage := &p2p.BlockMessage{ + // Hash: testdata.Block1Hash, Header: &wire.BlockHeader{ Version: 541065216, PrevBlock: tc.prevBlockHash, diff --git a/internal/message_queue/nats/client/nats_core/nats_core_client.go b/internal/message_queue/nats/client/nats_core/nats_core_client.go index e2cb1e4a2..1eabeed54 100644 --- a/internal/message_queue/nats/client/nats_core/nats_core_client.go +++ b/internal/message_queue/nats/client/nats_core/nats_core_client.go @@ -91,7 +91,7 @@ func (c Client) Publish(ctx context.Context, topic string, data []byte) (err err } func (c Client) PublishMarshal(ctx context.Context, topic string, m proto.Message) (err error) { - ctx, span := tracing.StartTracing(ctx, "Publish", c.tracingEnabled, c.tracingAttributes...) + ctx, span := tracing.StartTracing(ctx, "PublishMarshal", c.tracingEnabled, c.tracingAttributes...) defer func() { tracing.EndTracing(span, err) }() diff --git a/internal/metamorph/processor.go b/internal/metamorph/processor.go index 827daea91..34a3da922 100644 --- a/internal/metamorph/processor.go +++ b/internal/metamorph/processor.go @@ -798,7 +798,7 @@ func (p *Processor) ProcessTransaction(ctx context.Context, req *ProcessorReques // register transaction in blocktx using message queue if err = p.mqClient.Publish(ctx, RegisterTxTopic, req.Data.Hash[:]); err != nil { - p.logger.Error("failed to register tx in blocktx", slog.String("hash", req.Data.Hash.String()), slog.String("err", err.Error())) + p.logger.Error("Failed to register tx in blocktx", slog.String("hash", req.Data.Hash.String()), slog.String("err", err.Error())) } // broadcast that transaction is stored to client diff --git a/internal/validator/default/default_validator.go b/internal/validator/default/default_validator.go index 74c4c64e4..6d0097e2b 100644 --- a/internal/validator/default/default_validator.go +++ b/internal/validator/default/default_validator.go @@ -62,11 +62,11 @@ func (v *DefaultValidator) ValidateTransaction(ctx context.Context, tx *sdkTx.Tr // 11) Reject if transaction fee would be too low (minRelayTxFee) to get into an empty block. switch feeValidation { case validator.StandardFeeValidation: - if err = standardCheckFees(tx, internalApi.FeesToFeeModel(v.policy.MinMiningTxFee)); err != nil { + if err = checkStandardFees(tx, internalApi.FeesToFeeModel(v.policy.MinMiningTxFee)); err != nil { return err } case validator.CumulativeFeeValidation: - if err = cumulativeCheckFees(ctx, v.txFinder, tx, internalApi.FeesToFeeModel(v.policy.MinMiningTxFee), tracingEnabled, tracingAttributes...); err != nil { + if err = checkCumulativeFees(ctx, v.txFinder, tx, internalApi.FeesToFeeModel(v.policy.MinMiningTxFee), tracingEnabled, tracingAttributes...); err != nil { return err } case validator.NoneFeeValidation: @@ -107,7 +107,7 @@ func isExtended(tx *sdkTx.Transaction) bool { return true } -func standardCheckFees(tx *sdkTx.Transaction, feeModel sdkTx.FeeModel) *validator.Error { +func checkStandardFees(tx *sdkTx.Transaction, feeModel sdkTx.FeeModel) *validator.Error { feesOK, expFeesPaid, actualFeePaid, err := isFeePaidEnough(feeModel, tx) if err != nil { return validator.NewError(err, api.ErrStatusFees) @@ -121,9 +121,9 @@ func standardCheckFees(tx *sdkTx.Transaction, feeModel sdkTx.FeeModel) *validato return nil } -func cumulativeCheckFees(ctx context.Context, txFinder validator.TxFinderI, tx *sdkTx.Transaction, feeModel *fees.SatoshisPerKilobyte, tracingEnabled bool, tracingAttributes ...attribute.KeyValue) (vErr *validator.Error) { +func checkCumulativeFees(ctx context.Context, txFinder validator.TxFinderI, tx *sdkTx.Transaction, feeModel *fees.SatoshisPerKilobyte, tracingEnabled bool, tracingAttributes ...attribute.KeyValue) (vErr *validator.Error) { var spanErr error - ctx, span := tracing.StartTracing(ctx, "cumulativeCheckFees", tracingEnabled, tracingAttributes...) + ctx, span := tracing.StartTracing(ctx, "checkCumulativeFees", tracingEnabled, tracingAttributes...) defer func() { if vErr != nil { spanErr = vErr.Err diff --git a/internal/validator/default/default_validator_test.go b/internal/validator/default/default_validator_test.go index 89422795b..449968c56 100644 --- a/internal/validator/default/default_validator_test.go +++ b/internal/validator/default/default_validator_test.go @@ -233,8 +233,8 @@ func TestStandardCheckFees(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if err := standardCheckFees(tt.args.tx, tt.args.feeModel); (err != nil) != tt.wantErr { - t.Errorf("standardCheckFees() error = %v, wantErr %v", err, tt.wantErr) + if err := checkStandardFees(tt.args.tx, tt.args.feeModel); (err != nil) != tt.wantErr { + t.Errorf("checkStandardFees() error = %v, wantErr %v", err, tt.wantErr) } }) } @@ -248,7 +248,7 @@ func TestStandardCheckFeesTxs(t *testing.T) { sut := &fees.SatoshisPerKilobyte{Satoshis: 50} // when - actualError := standardCheckFees(tx, sut) + actualError := checkStandardFees(tx, sut) // then require.Nil(t, actualError) @@ -438,7 +438,7 @@ func TestCumulativeCheckFees(t *testing.T) { require.NoError(t, err) // when - actualError := cumulativeCheckFees(context.TODO(), txFinder, tx, tc.feeModel, false) + actualError := checkCumulativeFees(context.TODO(), txFinder, tx, tc.feeModel, false) // then if tc.expectedErr == nil {