Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: workflows #749

Merged
merged 8 commits into from
Jan 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 0 additions & 7 deletions .github/pull_request_template.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,8 @@

Provide a brief description of the changes you've made.

## Linked Issues / Tickets

Reference any related issues or tickets, e.g. "Closes #123".

## Testing Procedure

Describe the tests you've added or any testing steps you've taken.

- [ ] I have added new unit tests
- [ ] All tests pass locally
- [ ] I have tested manually in my local environment
Expand All @@ -18,5 +12,4 @@ Describe the tests you've added or any testing steps you've taken.

- [ ] I have performed a self-review of my own code
- [ ] I have made corresponding changes to the documentation
- [ ] My changes generate no new warnings
- [ ] I have updated `CHANGELOG.md` with my changes
4 changes: 1 addition & 3 deletions .github/workflows/go.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,8 @@
name: Go

on:
push:
branches: ["main"]
pull_request:
branches: ["main", "reorg-support"] # TODO: to be removed before merge to main
branches: ["**"]

jobs:
build:
Expand Down
11 changes: 5 additions & 6 deletions cmd/arc/services/blocktx.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,17 @@ import (

"go.opentelemetry.io/otel/attribute"

"github.com/bitcoin-sv/arc/internal/grpc_opts"
"github.com/bitcoin-sv/arc/internal/message_queue/nats/client/nats_core"
"github.com/bitcoin-sv/arc/internal/message_queue/nats/client/nats_jetstream"
"github.com/bitcoin-sv/arc/internal/message_queue/nats/nats_connection"
"github.com/bitcoin-sv/arc/internal/tracing"
"github.com/libsv/go-p2p"

"github.com/bitcoin-sv/arc/config"
"github.com/bitcoin-sv/arc/internal/blocktx"
"github.com/bitcoin-sv/arc/internal/blocktx/store"
"github.com/bitcoin-sv/arc/internal/blocktx/store/postgresql"

"github.com/bitcoin-sv/arc/internal/grpc_opts"
"github.com/bitcoin-sv/arc/internal/message_queue/nats/client/nats_core"
"github.com/bitcoin-sv/arc/internal/message_queue/nats/client/nats_jetstream"
"github.com/bitcoin-sv/arc/internal/message_queue/nats/nats_connection"
"github.com/bitcoin-sv/arc/internal/tracing"
"github.com/bitcoin-sv/arc/internal/version"
)

Expand Down
7 changes: 0 additions & 7 deletions internal/api/handler/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -851,13 +851,6 @@ func (ArcDefaultHandler) handleError(_ context.Context, transaction *sdkTx.Trans
return status, arcError
}

// ContextKey type.
type ContextKey int

const (
ContextSizings ContextKey = iota
)

// PtrTo returns a pointer to the given value.
func PtrTo[T any](v T) *T {
return &v
Expand Down
20 changes: 15 additions & 5 deletions internal/blocktx/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package blocktx
import (
"bytes"
"context"
"encoding/hex"
"errors"
"fmt"
"log/slog"
Expand Down Expand Up @@ -916,28 +917,37 @@ func (p *Processor) calculateMerklePaths(ctx context.Context, txs []store.BlockT

// gather all transactions with missing merkle paths for each block in a map
// to avoid getting all transaction from the same block multiple times
blockTxsMap := make(map[string][]store.BlockTransactionWithMerklePath, 0)
blockTxsMap := make(map[string][]store.BlockTransactionWithMerklePath)

for _, tx := range txs {
blockTxsMap[string(tx.BlockHash)] = append(blockTxsMap[string(tx.BlockHash)], store.BlockTransactionWithMerklePath{
blockTransactionWithMerklePath := store.BlockTransactionWithMerklePath{
BlockTransaction: store.BlockTransaction{
TxHash: tx.TxHash,
BlockHash: tx.BlockHash,
BlockHeight: tx.BlockHeight,
MerkleTreeIndex: tx.MerkleTreeIndex,
BlockStatus: tx.BlockStatus,
},
})
}

blockTxsMap[hex.EncodeToString(tx.BlockHash)] = append(blockTxsMap[hex.EncodeToString(tx.BlockHash)], blockTransactionWithMerklePath)
}

for _, blockTxs := range blockTxsMap {
blockHash := blockTxs[0].BlockHash
for bh, blockTxs := range blockTxsMap {
blockHash, err := hex.DecodeString(bh)
if err != nil {
return nil, err
}

txHashes, err := p.store.GetBlockTransactionsHashes(ctx, blockHash)
if err != nil {
return nil, errors.Join(ErrFailedToGetBlockTransactions, fmt.Errorf("block hash %s", getHashStringNoErr(blockHash)), err)
}

if len(txHashes) == 0 {
continue
}

merkleTree := bc.BuildMerkleTreeStoreChainHash(txHashes)

for _, tx := range blockTxs {
Expand Down
42 changes: 21 additions & 21 deletions internal/blocktx/processor_opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,61 +7,61 @@ import (
"go.opentelemetry.io/otel/attribute"
)

func WithMessageQueueClient(mqClient MessageQueueClient) func(handler *Processor) {
func WithMessageQueueClient(mqClient MessageQueueClient) func(*Processor) {
return func(p *Processor) {
p.mqClient = mqClient
}
}

func WithTransactionBatchSize(size int) func(handler *Processor) {
func WithTransactionBatchSize(size int) func(*Processor) {
return func(p *Processor) {
p.transactionStorageBatchSize = size
}
}

func WithRetentionDays(dataRetentionDays int) func(handler *Processor) {
func WithRetentionDays(dataRetentionDays int) func(*Processor) {
return func(p *Processor) {
p.dataRetentionDays = dataRetentionDays
}
}

func WithRegisterTxsInterval(d time.Duration) func(handler *Processor) {
func WithRegisterTxsInterval(d time.Duration) func(*Processor) {
return func(p *Processor) {
p.registerTxsInterval = d
}
}

func WithRegisterRequestTxsInterval(d time.Duration) func(handler *Processor) {
func WithRegisterRequestTxsInterval(d time.Duration) func(*Processor) {
return func(p *Processor) {
p.registerRequestTxsInterval = d
}
}

func WithRegisterTxsChan(registerTxsChan chan []byte) func(handler *Processor) {
return func(handler *Processor) {
handler.registerTxsChan = registerTxsChan
func WithRegisterTxsChan(registerTxsChan chan []byte) func(*Processor) {
return func(processor *Processor) {
processor.registerTxsChan = registerTxsChan
}
}

func WithRequestTxChan(requestTxChannel chan []byte) func(handler *Processor) {
return func(handler *Processor) {
handler.requestTxChannel = requestTxChannel
func WithRequestTxChan(requestTxChannel chan []byte) func(*Processor) {
return func(processor *Processor) {
processor.requestTxChannel = requestTxChannel
}
}

func WithRegisterTxsBatchSize(size int) func(handler *Processor) {
return func(handler *Processor) {
handler.registerTxsBatchSize = size
func WithRegisterTxsBatchSize(size int) func(*Processor) {
return func(processor *Processor) {
processor.registerTxsBatchSize = size
}
}

func WithRegisterRequestTxsBatchSize(size int) func(handler *Processor) {
return func(handler *Processor) {
handler.registerRequestTxsBatchSize = size
func WithRegisterRequestTxsBatchSize(size int) func(*Processor) {
return func(processor *Processor) {
processor.registerRequestTxsBatchSize = size
}
}

func WithTracer(attr ...attribute.KeyValue) func(s *Processor) {
func WithTracer(attr ...attribute.KeyValue) func(*Processor) {
return func(p *Processor) {
p.tracingEnabled = true
if len(attr) > 0 {
Expand All @@ -74,8 +74,8 @@ func WithTracer(attr ...attribute.KeyValue) func(s *Processor) {
}
}

func WithMaxBlockProcessingDuration(d time.Duration) func(handler *Processor) {
return func(handler *Processor) {
handler.maxBlockProcessingDuration = d
func WithMaxBlockProcessingDuration(d time.Duration) func(*Processor) {
return func(processor *Processor) {
processor.maxBlockProcessingDuration = d
}
}
1 change: 1 addition & 0 deletions internal/blocktx/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ func TestHandleBlock(t *testing.T) {
require.NoError(t, err)

blockMessage := &p2p.BlockMessage{
// Hash: testdata.Block1Hash,
Header: &wire.BlockHeader{
Version: 541065216,
PrevBlock: tc.prevBlockHash,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (c Client) Publish(ctx context.Context, topic string, data []byte) (err err
}

func (c Client) PublishMarshal(ctx context.Context, topic string, m proto.Message) (err error) {
ctx, span := tracing.StartTracing(ctx, "Publish", c.tracingEnabled, c.tracingAttributes...)
ctx, span := tracing.StartTracing(ctx, "PublishMarshal", c.tracingEnabled, c.tracingAttributes...)
defer func() {
tracing.EndTracing(span, err)
}()
Expand Down
2 changes: 1 addition & 1 deletion internal/metamorph/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -798,7 +798,7 @@ func (p *Processor) ProcessTransaction(ctx context.Context, req *ProcessorReques

// register transaction in blocktx using message queue
if err = p.mqClient.Publish(ctx, RegisterTxTopic, req.Data.Hash[:]); err != nil {
p.logger.Error("failed to register tx in blocktx", slog.String("hash", req.Data.Hash.String()), slog.String("err", err.Error()))
p.logger.Error("Failed to register tx in blocktx", slog.String("hash", req.Data.Hash.String()), slog.String("err", err.Error()))
}

// broadcast that transaction is stored to client
Expand Down
10 changes: 5 additions & 5 deletions internal/validator/default/default_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,11 @@ func (v *DefaultValidator) ValidateTransaction(ctx context.Context, tx *sdkTx.Tr
// 11) Reject if transaction fee would be too low (minRelayTxFee) to get into an empty block.
switch feeValidation {
case validator.StandardFeeValidation:
if err = standardCheckFees(tx, internalApi.FeesToFeeModel(v.policy.MinMiningTxFee)); err != nil {
if err = checkStandardFees(tx, internalApi.FeesToFeeModel(v.policy.MinMiningTxFee)); err != nil {
return err
}
case validator.CumulativeFeeValidation:
if err = cumulativeCheckFees(ctx, v.txFinder, tx, internalApi.FeesToFeeModel(v.policy.MinMiningTxFee), tracingEnabled, tracingAttributes...); err != nil {
if err = checkCumulativeFees(ctx, v.txFinder, tx, internalApi.FeesToFeeModel(v.policy.MinMiningTxFee), tracingEnabled, tracingAttributes...); err != nil {
return err
}
case validator.NoneFeeValidation:
Expand Down Expand Up @@ -107,7 +107,7 @@ func isExtended(tx *sdkTx.Transaction) bool {
return true
}

func standardCheckFees(tx *sdkTx.Transaction, feeModel sdkTx.FeeModel) *validator.Error {
func checkStandardFees(tx *sdkTx.Transaction, feeModel sdkTx.FeeModel) *validator.Error {
feesOK, expFeesPaid, actualFeePaid, err := isFeePaidEnough(feeModel, tx)
if err != nil {
return validator.NewError(err, api.ErrStatusFees)
Expand All @@ -121,9 +121,9 @@ func standardCheckFees(tx *sdkTx.Transaction, feeModel sdkTx.FeeModel) *validato
return nil
}

func cumulativeCheckFees(ctx context.Context, txFinder validator.TxFinderI, tx *sdkTx.Transaction, feeModel *fees.SatoshisPerKilobyte, tracingEnabled bool, tracingAttributes ...attribute.KeyValue) (vErr *validator.Error) {
func checkCumulativeFees(ctx context.Context, txFinder validator.TxFinderI, tx *sdkTx.Transaction, feeModel *fees.SatoshisPerKilobyte, tracingEnabled bool, tracingAttributes ...attribute.KeyValue) (vErr *validator.Error) {
var spanErr error
ctx, span := tracing.StartTracing(ctx, "cumulativeCheckFees", tracingEnabled, tracingAttributes...)
ctx, span := tracing.StartTracing(ctx, "checkCumulativeFees", tracingEnabled, tracingAttributes...)
defer func() {
if vErr != nil {
spanErr = vErr.Err
Expand Down
8 changes: 4 additions & 4 deletions internal/validator/default/default_validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,8 +233,8 @@ func TestStandardCheckFees(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if err := standardCheckFees(tt.args.tx, tt.args.feeModel); (err != nil) != tt.wantErr {
t.Errorf("standardCheckFees() error = %v, wantErr %v", err, tt.wantErr)
if err := checkStandardFees(tt.args.tx, tt.args.feeModel); (err != nil) != tt.wantErr {
t.Errorf("checkStandardFees() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
Expand All @@ -248,7 +248,7 @@ func TestStandardCheckFeesTxs(t *testing.T) {
sut := &fees.SatoshisPerKilobyte{Satoshis: 50}

// when
actualError := standardCheckFees(tx, sut)
actualError := checkStandardFees(tx, sut)

// then
require.Nil(t, actualError)
Expand Down Expand Up @@ -438,7 +438,7 @@ func TestCumulativeCheckFees(t *testing.T) {
require.NoError(t, err)

// when
actualError := cumulativeCheckFees(context.TODO(), txFinder, tx, tc.feeModel, false)
actualError := checkCumulativeFees(context.TODO(), txFinder, tx, tc.feeModel, false)

// then
if tc.expectedErr == nil {
Expand Down
Loading