diff --git a/cmd/arc/services/metamorph.go b/cmd/arc/services/metamorph.go index 2d637fd51..3df64f51d 100644 --- a/cmd/arc/services/metamorph.go +++ b/cmd/arc/services/metamorph.go @@ -4,14 +4,12 @@ import ( "context" "fmt" "log/slog" - "net/url" "os" "time" "go.opentelemetry.io/otel/attribute" "github.com/libsv/go-p2p" - "github.com/ordishs/go-bitcoin" "google.golang.org/grpc" "github.com/bitcoin-sv/arc/internal/cache" @@ -179,24 +177,6 @@ func StartMetamorph(logger *slog.Logger, arcConfig *config.ArcConfig, cacheStore return nil, fmt.Errorf("failed to start metamorph processor: %v", err) } - if mtmConfig.CheckUtxos { - peerRPC := arcConfig.PeerRPC - - rpcURL, err := url.Parse(fmt.Sprintf("rpc://%s:%s@%s:%d", peerRPC.User, peerRPC.Password, peerRPC.Host, peerRPC.Port)) - if err != nil { - stopFn() - return nil, fmt.Errorf("failed to parse rpc URL: %v", err) - } - - node, err := bitcoin.NewFromURL(rpcURL, false) - if err != nil { - stopFn() - return nil, err - } - - optsServer = append(optsServer, metamorph.WithForceCheckUtxos(node)) - } - server, err = metamorph.NewServer(arcConfig.Prometheus.Endpoint, arcConfig.GrpcMessageSize, logger, metamorphStore, processor, arcConfig.Tracing, optsServer...) if err != nil { diff --git a/config/config.go b/config/config.go index 793bfe117..04426964d 100644 --- a/config/config.go +++ b/config/config.go @@ -109,7 +109,6 @@ type MetamorphConfig struct { ProcessStatusUpdateInterval time.Duration `mapstructure:"processStatusUpdateInterval"` RecheckSeen RecheckSeen `mapstructure:"recheckSeen"` MonitorPeers bool `mapstructure:"monitorPeers"` - CheckUtxos bool `mapstructure:"checkUtxos"` Health *HealthConfig `mapstructure:"health"` RejectCallbackContaining []string `mapstructure:"rejectCallbackContaining"` Stats *StatsConfig `mapstructure:"stats"` diff --git a/config/defaults.go b/config/defaults.go index 869d44492..e5047a70d 100644 --- a/config/defaults.go +++ b/config/defaults.go @@ -102,7 +102,6 @@ func getMetamorphConfig() *MetamorphConfig { UntilAgo: 1 * time.Hour, }, MonitorPeers: false, - CheckUtxos: false, Health: &HealthConfig{ SeverDialAddr: "localhost:8005", MinimumHealthyConnections: 2, diff --git a/config/example_config.yaml b/config/example_config.yaml index 6f6f20c6f..27ff1c80d 100644 --- a/config/example_config.yaml +++ b/config/example_config.yaml @@ -78,7 +78,6 @@ metamorph: fromAgo: 24h # start of moving time window untilAgo: 1h # end of moving time window monitorPeers: false # if enabled, peers which do not receive alive signal from nodes will be restarted - checkUtxos: false # force check each utxo for validity. If enabled ARC connects to bitcoin node using rpc for each utxo health: serverDialAddr: localhost:8005 # address at which the grpc health server is exposed minimumHealthyConnections: 2 # minimum number of healthy peer connections for the processor to be considered healthy diff --git a/go.mod b/go.mod index 8e8d332eb..33e70724a 100644 --- a/go.mod +++ b/go.mod @@ -45,7 +45,7 @@ require ( go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.28.0 go.opentelemetry.io/otel/sdk v1.28.0 go.opentelemetry.io/otel/trace v1.31.0 - golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 + golang.org/x/sync v0.10.0 google.golang.org/genproto/googleapis/rpc v0.0.0-20240805194559-2c9e96a0b5d4 google.golang.org/grpc v1.65.0 google.golang.org/protobuf v1.34.2 @@ -156,13 +156,13 @@ require ( go.opentelemetry.io/proto/otlp v1.3.1 // indirect go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect - golang.org/x/crypto v0.28.0 // indirect + golang.org/x/crypto v0.32.0 // indirect + golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect golang.org/x/net v0.30.0 // indirect golang.org/x/oauth2 v0.22.0 // indirect - golang.org/x/sync v0.8.0 // indirect - golang.org/x/sys v0.26.0 // indirect - golang.org/x/term v0.25.0 // indirect - golang.org/x/text v0.19.0 // indirect + golang.org/x/sys v0.29.0 // indirect + golang.org/x/term v0.28.0 // indirect + golang.org/x/text v0.21.0 // indirect golang.org/x/time v0.7.0 // indirect golang.org/x/tools v0.24.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240805194559-2c9e96a0b5d4 // indirect diff --git a/go.sum b/go.sum index ee2756a92..e993f8e85 100644 --- a/go.sum +++ b/go.sum @@ -213,8 +213,6 @@ github.com/libsv/go-bt v1.0.4 h1:2Css5lfomk/J97tM5Gk56Lp+tTK6xWYnmHNc/fGO6lE= github.com/libsv/go-bt v1.0.4/go.mod h1:AfXoLFYEbY/TvCq/84xTce2xGjPUuC5imokHmcykF2k= github.com/libsv/go-bt/v2 v2.2.5 h1:VoggBLMRW9NYoFujqe5bSYKqnw5y+fYfufgERSoubog= github.com/libsv/go-bt/v2 v2.2.5/go.mod h1:cV45+jDlPOLfhJLfpLmpQoWzrIvVth9Ao2ZO1f6CcqU= -github.com/libsv/go-p2p v0.3.2 h1:O32CzkqM+jhSuleRHJln6JjL2pKH8aaRTx8lAfhIiic= -github.com/libsv/go-p2p v0.3.2/go.mod h1:TENFxbTT/bfSfuiirjU6l+PfAWxwZgF8GYUxs5tzc/M= github.com/libsv/go-p2p v0.3.3 h1:5h+69MsGgFwQWyD8MEqyPeqbqKGRpKLzzOcI5cSLfgY= github.com/libsv/go-p2p v0.3.3/go.mod h1:TENFxbTT/bfSfuiirjU6l+PfAWxwZgF8GYUxs5tzc/M= github.com/lmittmann/tint v1.0.5 h1:NQclAutOfYsqs2F1Lenue6OoWCajs5wJcP3DfWVpePw= @@ -408,8 +406,8 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= -golang.org/x/crypto v0.28.0 h1:GBDwsMXVQi34v5CCYUm2jkJvu4cbtru2U4TN2PSyQnw= -golang.org/x/crypto v0.28.0/go.mod h1:rmgy+3RHxRZMyY0jjAJShp2zgEdOqj2AO7U0pYmeQ7U= +golang.org/x/crypto v0.32.0 h1:euUpcYgM8WcP71gNpTqQCn6rC2t6ULUPiOzfWaXVVfc= +golang.org/x/crypto v0.32.0/go.mod h1:ZnnJkOaASj8g0AjIduWNlq2NRxL0PlBrbKVyZ6V/Ugc= golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 h1:2dVuKD2vS7b0QIHQbpyTISPd0LeHDbnYEryqj5Q1ug8= golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56/go.mod h1:M4RDyNAINzryxdtnbRXRL/OHtkFuWGRjvuhBJpk2IlY= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= @@ -427,8 +425,8 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= -golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ= +golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -439,17 +437,17 @@ golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo= -golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU= +golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= -golang.org/x/term v0.25.0 h1:WtHI/ltw4NvSUig5KARz9h521QvRC8RmF/cuYqifU24= -golang.org/x/term v0.25.0/go.mod h1:RPyXicDX+6vLxogjjRxjgD2TKtmAO6NZBsBRfrOLu7M= +golang.org/x/term v0.28.0 h1:/Ts8HFuMR2E6IP/jlo7QVLZHggjKQbhu/7H0LJFr3Gg= +golang.org/x/term v0.28.0/go.mod h1:Sw/lC2IAUZ92udQNf3WodGtn4k/XoLyZoh8v/8uiwek= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= -golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM= -golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= +golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= golang.org/x/time v0.7.0 h1:ntUhktv3OPE6TgYxXWv9vKvUSJyIFJlyohwbkEwPrKQ= golang.org/x/time v0.7.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/internal/blocktx/background_workers.go b/internal/blocktx/background_workers.go index 7ef9cf22b..d2d4ee44d 100644 --- a/internal/blocktx/background_workers.go +++ b/internal/blocktx/background_workers.go @@ -6,13 +6,14 @@ import ( "sync" "time" - "github.com/bitcoin-sv/arc/internal/blocktx/store" "github.com/libsv/go-p2p" + + "github.com/bitcoin-sv/arc/internal/blocktx/store" ) type BackgroundWorkers struct { - l *slog.Logger - s store.BlocktxStore + logger *slog.Logger + store store.BlocktxStore workersWg sync.WaitGroup ctx context.Context @@ -23,8 +24,8 @@ func NewBackgroundWorkers(store store.BlocktxStore, logger *slog.Logger) *Backgr ctx, cancel := context.WithCancel(context.Background()) return &BackgroundWorkers{ - s: store, - l: logger.With(slog.String("module", "background workers")), + store: store, + logger: logger.With(slog.String("module", "background workers")), ctx: ctx, cancelAll: cancel, @@ -32,12 +33,12 @@ func NewBackgroundWorkers(store store.BlocktxStore, logger *slog.Logger) *Backgr } func (w *BackgroundWorkers) GracefulStop() { - w.l.Info("Shutting down") + w.logger.Info("Shutting down") w.cancelAll() w.workersWg.Wait() - w.l.Info("Shutdown complete") + w.logger.Info("Shutdown complete") } func (w *BackgroundWorkers) StartFillGaps(peers []p2p.PeerI, interval time.Duration, retentionDays int, blockRequestingCh chan<- BlockRequest) { @@ -46,20 +47,20 @@ func (w *BackgroundWorkers) StartFillGaps(peers []p2p.PeerI, interval time.Durat go func() { defer w.workersWg.Done() - t := time.NewTicker(interval) + ticker := time.NewTicker(interval) i := 0 for { select { - case <-t.C: + case <-ticker.C: i = i % len(peers) err := w.fillGaps(peers[i], retentionDays, blockRequestingCh) if err != nil { - w.l.Error("failed to fill blocks gaps", slog.String("err", err.Error())) + w.logger.Error("failed to fill blocks gaps", slog.String("err", err.Error())) } i++ - t.Reset(interval) + ticker.Reset(interval) case <-w.ctx.Done(): return @@ -75,7 +76,7 @@ func (w *BackgroundWorkers) fillGaps(peer p2p.PeerI, retentionDays int, blockReq ) heightRange := retentionDays * hoursPerDay * blocksPerHour - blockHeightGaps, err := w.s.GetBlockGaps(w.ctx, heightRange) + blockHeightGaps, err := w.store.GetBlockGaps(w.ctx, heightRange) if err != nil || len(blockHeightGaps) == 0 { return err } @@ -85,7 +86,7 @@ func (w *BackgroundWorkers) fillGaps(peer p2p.PeerI, retentionDays int, blockReq break } - w.l.Info("adding request for missing block to request channel", + w.logger.Info("adding request for missing block to request channel", slog.String("hash", block.Hash.String()), slog.Uint64("height", block.Height), slog.String("peer", peer.String()), diff --git a/internal/metamorph/client.go b/internal/metamorph/client.go index 18054a98a..113a8118b 100644 --- a/internal/metamorph/client.go +++ b/internal/metamorph/client.go @@ -3,7 +3,6 @@ package metamorph import ( "context" "errors" - "fmt" "log/slog" "os" "runtime" @@ -213,7 +212,7 @@ func (m *Metamorph) GetTransactionStatus(ctx context.Context, txID string) (txSt return txStatus, nil } -// GetTransactionStatusეს gets the statusეს of all transactions. +// GetTransactionStatuses gets the status of all transactions. func (m *Metamorph) GetTransactionStatuses(ctx context.Context, txIDs []string) (txStatus []*TransactionStatus, err error) { ctx, span := tracing.StartTracing(ctx, "GetTransactionStatus", m.tracingEnabled, append(m.tracingAttributes, attribute.String("txIDs", txIDs[0]))...) defer func() { @@ -284,7 +283,7 @@ func (m *Metamorph) SubmitTransaction(ctx context.Context, tx *sdkTx.Transaction deadline, _ := ctx.Deadline() // increase time to make sure that expiration happens from inside the metramorph function - newDeadline := deadline.Add(time.Second * 10) + newDeadline := deadline.Add(time.Second * 30) // Create a new context with the updated deadline newCtx, newCancel := context.WithDeadline(context.Background(), newDeadline) @@ -294,7 +293,6 @@ func (m *Metamorph) SubmitTransaction(ctx context.Context, tx *sdkTx.Transaction if err != nil { return nil, err } - fmt.Println("shotuna response", response) txStatus = &TransactionStatus{ TxID: response.GetTxid(), Status: response.GetStatus().String(), @@ -347,7 +345,7 @@ func (m *Metamorph) SubmitTransactions(ctx context.Context, txs sdkTx.Transactio deadline, _ := ctx.Deadline() // decrease time to get initial deadline - newDeadline := deadline.Add(time.Second * 10) + newDeadline := deadline.Add(time.Second * 30) // increase time to make sure that expiration happens from inside the metramorph function newCtx, newCancel := context.WithDeadline(context.Background(), newDeadline) diff --git a/internal/metamorph/processor.go b/internal/metamorph/processor.go index 5002791d3..827daea91 100644 --- a/internal/metamorph/processor.go +++ b/internal/metamorph/processor.go @@ -274,15 +274,6 @@ func (p *Processor) StartProcessMinedCallbacks() { txsBlocks = append(txsBlocks, txBlock) - fmt.Println("shotuna", time.Now()) - // if we have a pending request with given transaction hash, provide mined status - if len(txBlock.TransactionHash) != 0 { - hash, err := chainhash.NewHash(txBlock.TransactionHash) - if err == nil { - fmt.Println("shotuna 3", time.Now(), hash.String()) - } - } - if len(txsBlocks) < p.processMinedBatchSize { continue } @@ -325,7 +316,6 @@ func (p *Processor) updateMined(ctx context.Context, txsBlocks []*blocktx_api.Tr for _, data := range updatedData { // if we have a pending request with given transaction hash, provide mined status - fmt.Println("shota meore", time.Now(), data.Hash.String()) p.responseProcessor.UpdateStatus(data.Hash, StatusAndError{ Hash: data.Hash, Status: metamorph_api.Status_MINED, @@ -754,8 +744,6 @@ func (p *Processor) ProcessTransaction(ctx context.Context, req *ProcessorReques tracing.EndTracing(span, err) }() - dl, ok := ctx.Deadline() - fmt.Println("shota registering", dl, ok) statusResponse := NewStatusResponse(ctx, req.Data.Hash, req.ResponseChannel) // check if tx already stored, return it diff --git a/internal/metamorph/response_processor.go b/internal/metamorph/response_processor.go index 180f20a68..b1a3a38b1 100644 --- a/internal/metamorph/response_processor.go +++ b/internal/metamorph/response_processor.go @@ -2,9 +2,7 @@ package metamorph import ( "context" - "fmt" "sync" - "time" "github.com/libsv/go-p2p/chaincfg/chainhash" ) @@ -28,12 +26,11 @@ func (r *StatusResponse) UpdateStatus(statusAndError StatusAndError) { if r.statusCh == nil || r.ctx == nil { return } - fmt.Println("shotuna sending status") + select { case <-r.ctx.Done(): return default: - fmt.Println("shotuna sending status", statusAndError.Status) r.statusCh <- StatusAndError{ Hash: r.Hash, Status: statusAndError.Status, @@ -56,9 +53,6 @@ func (p *ResponseProcessor) Add(statusResponse *StatusResponse) { return } - dl, ok := statusResponse.ctx.Deadline() - fmt.Println("shota registering 2", dl, ok) - _, loaded := p.resMap.LoadOrStore(*statusResponse.Hash, statusResponse) if loaded { return @@ -67,31 +61,21 @@ func (p *ResponseProcessor) Add(statusResponse *StatusResponse) { // we no longer need status response object after response has been returned go func() { <-statusResponse.ctx.Done() - fmt.Println("shota expired", time.Now()) p.resMap.Delete(*statusResponse.Hash) }() } func (p *ResponseProcessor) UpdateStatus(hash *chainhash.Hash, statusAndError StatusAndError) (found bool) { val, ok := p.resMap.Load(*hash) - p.resMap.Range(func(key, value any) bool { - fmt.Println(key, value) - return true - }) if !ok { - fmt.Println("shota hash not found") return false } statusResponse, ok := val.(*StatusResponse) - fmt.Println("shota hash not found 2") - if !ok { return false } - fmt.Println("shota hash found 3") - go statusResponse.UpdateStatus(statusAndError) return true } diff --git a/internal/metamorph/server.go b/internal/metamorph/server.go index e4f30756d..06e5ab0a9 100644 --- a/internal/metamorph/server.go +++ b/internal/metamorph/server.go @@ -4,7 +4,6 @@ import ( "context" "encoding/hex" "errors" - "fmt" "log/slog" "runtime" "strings" @@ -57,19 +56,10 @@ type Server struct { processor ProcessorI store store.MetamorphStore checkStatusInterval time.Duration - bitcoinNode BitcoinNode - forceCheckUtxos bool tracingEnabled bool tracingAttributes []attribute.KeyValue } -func WithForceCheckUtxos(bitcoinNode BitcoinNode) func(*Server) { - return func(s *Server) { - s.bitcoinNode = bitcoinNode - s.forceCheckUtxos = true - } -} - func WithCheckStatusInterval(d time.Duration) func(*Server) { return func(s *Server) { s.checkStatusInterval = d @@ -102,7 +92,6 @@ func NewServer(prometheusEndpoint string, maxMsgSize int, logger *slog.Logger, processor: processor, store: store, checkStatusInterval: checkStatusIntervalDefault, - forceCheckUtxos: false, } for _, opt := range opts { @@ -155,8 +144,8 @@ func (s *Server) PutTransaction(ctx context.Context, req *metamorph_api.Transact // decrease time to get initial deadline newDeadline := deadline - if time.Now().Add(10 * time.Second).Before(deadline) { - newDeadline = deadline.Add(-(time.Second * 10)) + if time.Now().Add(30 * time.Second).Before(deadline) { + newDeadline = deadline.Add(-(time.Second * 30)) } // Create a new context with the updated deadline @@ -184,8 +173,8 @@ func (s *Server) PutTransactions(ctx context.Context, req *metamorph_api.Transac // decrease time to get initial deadline newDeadline := deadline - if time.Now().Add(10 * time.Second).Before(deadline) { - newDeadline = deadline.Add(-(time.Second * 10)) + if time.Now().Add(30 * time.Second).Before(deadline) { + newDeadline = deadline.Add(-(time.Second * 30)) } // Create a new context with the updated deadline @@ -305,28 +294,21 @@ func (s *Server) processTransaction(ctx context.Context, waitForStatus metamorph for { select { case <-ctx.Done(): - fmt.Println("shotuna time", time.Now()) // Ensure that function returns at latest when context times out returnedStatus.TimedOut = true return returnedStatus case <-checkStatusTicker.C: - - // Check in intervals whether the tx was seen on network & updated on DB by another metamorph instance - if waitForStatus != metamorph_api.Status_SEEN_ON_NETWORK { - continue - } - + // it's possible the transaction status was received and updated in db by another metamorph + // check if that's the case and we have a new tx status to return var tx *metamorph_api.TransactionStatus tx, err = s.GetTransactionStatus(ctx, &metamorph_api.TransactionStatusRequest{ Txid: txID, }) - if err == nil && tx.Status == waitForStatus { + if err == nil && tx.Status >= waitForStatus { return tx } case res := <-responseChannel: - fmt.Println("shotuna 7", res) returnedStatus.Status = res.Status - fmt.Println("shotuna st", returnedStatus, time.Now()) if span != nil { span.AddEvent("status change", trace.WithAttributes(attribute.String("status", returnedStatus.Status.String()))) @@ -337,26 +319,22 @@ func (s *Server) processTransaction(ctx context.Context, waitForStatus metamorph } if res.Err != nil { - fmt.Println("shotuna 23") returnedStatus.RejectReason = res.Err.Error() // Note: return here so that user doesn't have to wait for timeout in case of an error return returnedStatus } else { - fmt.Println("shotuna 24") returnedStatus.RejectReason = "" if res.Status == metamorph_api.Status_MINED { - fmt.Println("shotuna 26") var tx *metamorph_api.TransactionStatus tx, err = s.GetTransactionStatus(ctx, &metamorph_api.TransactionStatusRequest{ Txid: txID, }) if err != nil { - fmt.Println("shotuna 28") s.logger.Error("failed to get mined transaction from storage", slog.String("err", err.Error())) returnedStatus.RejectReason = err.Error() return returnedStatus } - fmt.Println("shotuna 20") + return tx } } diff --git a/internal/metamorph/server_test.go b/internal/metamorph/server_test.go index 919b237f6..dedd442f4 100644 --- a/internal/metamorph/server_test.go +++ b/internal/metamorph/server_test.go @@ -169,7 +169,6 @@ func TestPutTransaction(t *testing.T) { timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() actualStatus, err := sut.PutTransaction(timeoutCtx, txRequest) - fmt.Println("shota ", err) // then assert.NoError(t, err) diff --git a/test/config/config.yaml b/test/config/config.yaml index 09b5dfc62..f65539a5e 100644 --- a/test/config/config.yaml +++ b/test/config/config.yaml @@ -68,7 +68,6 @@ metamorph: checkSeenOnNetworkOlderThan: 3h checkSeenOnNetworkPeriod: 4h monitorPeers: true - checkUtxos: false profilerAddr: localhost:9992 health: serverDialAddr: localhost:8005 @@ -98,7 +97,7 @@ blocktx: registerTxsInterval: 200ms fillGaps: enabled: true - interval: 1s + interval: 30s maxAllowedBlockHeightMismatch: 3 api: diff --git a/test/submit_01_single_test.go b/test/submit_01_single_test.go index 69ba8dfb8..df0c93d88 100644 --- a/test/submit_01_single_test.go +++ b/test/submit_01_single_test.go @@ -6,6 +6,7 @@ import ( "embed" "encoding/hex" "fmt" + "net" "net/http" "strconv" "testing" @@ -14,6 +15,7 @@ import ( sdkTx "github.com/bitcoin-sv/go-sdk/transaction" "github.com/libsv/go-bc" "github.com/libsv/go-p2p/chaincfg/chainhash" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/bitcoin-sv/arc/internal/node_client" @@ -166,19 +168,42 @@ func TestSubmitMined(t *testing.T) { callbackReceivedChan := make(chan *TransactionResponse) callbackErrChan := make(chan error) - callbackURL, token, shutdown := startCallbackSrv(t, callbackReceivedChan, callbackErrChan, nil) - defer shutdown() + lis, err := net.Listen("tcp", ":9000") + require.NoError(t, err) + mux := http.NewServeMux() + defer func() { + t.Log("closing listener") + err = lis.Close() + require.NoError(t, err) + }() + + callbackURL, token := registerHandlerForCallback(t, callbackReceivedChan, callbackErrChan, nil, mux) + defer func() { + t.Log("closing channels") + + close(callbackReceivedChan) + close(callbackErrChan) + }() + + go func() { + t.Logf("starting callback server") + err = http.Serve(lis, mux) + if err != nil { + t.Log("callback server stopped") + } + }() // when - _ = postRequest[TransactionResponse](t, arcEndpointV1Tx, createPayload(t, TransactionRequest{RawTx: exRawTx}), + transactionResponse := postRequest[TransactionResponse](t, arcEndpointV1Tx, createPayload(t, TransactionRequest{RawTx: exRawTx}), map[string]string{ "X-WaitFor": StatusMined, "X-CallbackUrl": callbackURL, "X-CallbackToken": token, + "X-MaxTimeout": "10", }, http.StatusOK) // wait for callback - callbackTimeout := time.After(10 * time.Second) + callbackTimeout := time.After(15 * time.Second) select { case status := <-callbackReceivedChan: @@ -190,6 +215,10 @@ func TestSubmitMined(t *testing.T) { case <-callbackTimeout: t.Fatal("callback exceeded timeout") } + + require.Equal(t, rawTx.TxID, transactionResponse.Txid) + require.Equal(t, StatusMined, transactionResponse.TxStatus) + require.Equal(t, merklePathStr, *transactionResponse.MerklePath) }) } @@ -208,11 +237,31 @@ func TestReturnMinedStatus(t *testing.T) { callbackReceivedChan := make(chan *TransactionResponse) callbackErrChan := make(chan error) - callbackURL, token, shutdown := startCallbackSrv(t, callbackReceivedChan, callbackErrChan, nil) - defer shutdown() + lis, err := net.Listen("tcp", ":9000") + require.NoError(t, err) + mux := http.NewServeMux() + defer func() { + t.Log("closing listener") + err = lis.Close() + require.NoError(t, err) + }() + + callbackURL, token := registerHandlerForCallback(t, callbackReceivedChan, callbackErrChan, nil, mux) + defer func() { + t.Log("closing channels") + + close(callbackReceivedChan) + close(callbackErrChan) + }() + + go func() { + t.Logf("starting callback server") + err = http.Serve(lis, mux) + if err != nil { + t.Log("callback server stopped") + } + }() - // when - fmt.Println("shotuna 2", time.Now()) transactionResponse := postRequest[TransactionResponse](t, arcEndpointV1Tx, createPayload(t, TransactionRequest{RawTx: exRawTx}), map[string]string{ "X-WaitFor": StatusMined, @@ -329,7 +378,7 @@ func TestCallback(t *testing.T) { type callbackServer struct { url, token string - responseChan chan *TransactionResponse + responseChan chan TransactionResponse errChan chan error } @@ -337,15 +386,30 @@ func TestCallback(t *testing.T) { t.Run(tc.name, func(t *testing.T) { // given + lis, err := net.Listen("tcp", ":9000") + require.NoError(t, err) + mux := http.NewServeMux() + defer func() { + err = lis.Close() + require.NoError(t, err) + }() // setup callback servers const callbacksNumber = 2 // cannot be greater than 5 callbackServers := make([]*callbackServer, 0, tc.numberOfCallbackServers) for range tc.numberOfCallbackServers { - callbackReceivedChan, callbackErrChan, calbackResponseFn := prepareCallback(t, callbacksNumber) - callbackURL, token, shutdown := startCallbackSrv(t, callbackReceivedChan, callbackErrChan, calbackResponseFn) - defer shutdown() + callbackReceivedChan := make(chan TransactionResponse, 100) // do not block callback server responses + callbackErrChan := make(chan error, 100) + + callbackResponseFn := getResponseFunc[TransactionResponse](t, callbacksNumber) + callbackURL, token := registerHandlerForCallback(t, callbackReceivedChan, callbackErrChan, callbackResponseFn, mux) + defer func() { + t.Log("closing channels") + + close(callbackReceivedChan) + close(callbackErrChan) + }() callbackServers = append(callbackServers, &callbackServer{ url: callbackURL, @@ -355,6 +419,14 @@ func TestCallback(t *testing.T) { }) } + go func() { + t.Logf("starting callback server") + err = http.Serve(lis, mux) + if err != nil { + t.Log("callback server stopped") + } + }() + // create transactions address, privateKey := node_client.GetNewWalletAddress(t, bitcoind) for i := range tc.numberOfTxs { @@ -393,6 +465,8 @@ func TestCallback(t *testing.T) { // then + var errs []error + // verify callbacks were received correctly for i, srv := range callbackServers { t.Logf("listen callbacks on server %s", srv.url) @@ -410,7 +484,7 @@ func TestCallback(t *testing.T) { case callback := <-srv.responseChan: require.NotNil(t, callback) - t.Logf("callback server %d iteration %d, txid: %s result: %s", i, j, callback.Txid, callback.TxStatus) + t.Logf("callback received - server: %d, iteration: %d, txid: %s result: %s", i, j, callback.Txid, callback.TxStatus) visitNumber, expectedTx := expectedTxsCallbacks[callback.Txid] require.True(t, expectedTx) @@ -423,13 +497,19 @@ func TestCallback(t *testing.T) { require.Equal(t, StatusMined, callback.TxStatus) - case err := <-srv.errChan: - t.Fatalf("callback server %d received - failed to parse %d callback %v", i, j, err) + case err = <-srv.errChan: + errs = append(errs, fmt.Errorf("callback received with error - server: %d, callback: %d, err: %v", i, j, err)) + t.Fail() case <-callbackTimeout: - t.Fatalf("callback server %d not received %d callback - timeout", i, j) + errs = append(errs, fmt.Errorf("callback not received - server: %d callback: %d - timeout", i, j)) + t.Fail() } } + for _, err = range errs { + assert.NoError(t, err) + } + require.Empty(t, expectedTxsCallbacks) // ensure all expected callbacks were received } }) @@ -473,7 +553,7 @@ func TestBatchCallback(t *testing.T) { type callbackServer struct { url, token string - responseChan chan *CallbackBatchResponse + responseChan chan CallbackBatchResponse errChan chan error } @@ -485,12 +565,25 @@ func TestBatchCallback(t *testing.T) { const callbacksNumber = 2 // cannot be greater than 5 callbackServers := make([]*callbackServer, 0, tc.numberOfCallbackServers) - + lis, err := net.Listen("tcp", ":9000") + require.NoError(t, err) + mux := http.NewServeMux() + defer func() { + err = lis.Close() + require.NoError(t, err) + }() for range tc.numberOfCallbackServers { - callbackReceivedChan, callbackErrChan, calbackResponseFn := prepareBatchCallback(t, callbacksNumber) - callbackURL, token, shutdown := startBatchCallbackSrv(t, callbackReceivedChan, callbackErrChan, calbackResponseFn) - defer shutdown() + callbackReceivedChan := make(chan CallbackBatchResponse, 100) // do not block callback server responses + callbackErrChan := make(chan error, 100) + + calbackResponseFn := getResponseFunc[CallbackBatchResponse](t, callbacksNumber) + callbackURL, token := registerHandlerForCallback(t, callbackReceivedChan, callbackErrChan, calbackResponseFn, mux) + defer func() { + t.Log("closing channels") + close(callbackReceivedChan) + close(callbackErrChan) + }() callbackServers = append(callbackServers, &callbackServer{ url: callbackURL, token: token, @@ -499,6 +592,14 @@ func TestBatchCallback(t *testing.T) { }) } + go func() { + t.Logf("starting callback server") + err = http.Serve(lis, mux) + if err != nil { + t.Log("callback server stopped") + } + }() + // create transactions address, privateKey := node_client.GetNewWalletAddress(t, bitcoind) for i := range tc.numberOfTxs { diff --git a/test/submit_04_beef_test.go b/test/submit_04_beef_test.go index 72fd73d5c..2982143df 100644 --- a/test/submit_04_beef_test.go +++ b/test/submit_04_beef_test.go @@ -4,6 +4,7 @@ package test import ( "fmt" + "net" "net/http" "strconv" "testing" @@ -32,8 +33,29 @@ func TestBeef(t *testing.T) { callbackReceivedChan := make(chan *TransactionResponse, expectedCallbacks) // do not block callback server responses callbackErrChan := make(chan error, expectedCallbacks) - callbackURL, token, shutdown := startCallbackSrv(t, callbackReceivedChan, callbackErrChan, nil) - defer shutdown() + lis, err := net.Listen("tcp", ":9000") + require.NoError(t, err) + mux := http.NewServeMux() + defer func() { + err = lis.Close() + require.NoError(t, err) + }() + + callbackURL, token := registerHandlerForCallback(t, callbackReceivedChan, callbackErrChan, nil, mux) + defer func() { + t.Log("closing channels") + + close(callbackReceivedChan) + close(callbackErrChan) + }() + + go func() { + t.Logf("starting callback server") + err = http.Serve(lis, mux) + if err != nil { + t.Log("callback server stopped") + } + }() waitForStatusTimeoutSeconds := 30 diff --git a/test/submit_05_reorg_test.go b/test/submit_05_reorg_test.go index 50f336c48..3badd0672 100644 --- a/test/submit_05_reorg_test.go +++ b/test/submit_05_reorg_test.go @@ -4,12 +4,14 @@ package test import ( "fmt" + "net" "net/http" "testing" "time" - "github.com/bitcoin-sv/arc/internal/node_client" "github.com/stretchr/testify/require" + + "github.com/bitcoin-sv/arc/internal/node_client" ) func TestReorg(t *testing.T) { @@ -54,12 +56,32 @@ func TestReorg(t *testing.T) { tx2, err := node_client.CreateTx(privateKey, address, utxo) require.NoError(t, err) + lis, err := net.Listen("tcp", ":9000") + require.NoError(t, err) + mux := http.NewServeMux() + defer func() { + err = lis.Close() + require.NoError(t, err) + }() // prepare a callback server for tx2 callbackReceivedChan := make(chan *TransactionResponse) callbackErrChan := make(chan error) - callbackURL, token, shutdown := startCallbackSrv(t, callbackReceivedChan, callbackErrChan, nil) - defer shutdown() + callbackURL, token := registerHandlerForCallback(t, callbackReceivedChan, callbackErrChan, nil, mux) + defer func() { + t.Log("closing channels") + + close(callbackReceivedChan) + close(callbackErrChan) + }() + + go func() { + t.Logf("starting callback server") + err = http.Serve(lis, mux) + if err != nil { + t.Log("callback server stopped") + } + }() // submit tx2 rawTx, err = tx2.EFHex() diff --git a/test/utils.go b/test/utils.go index 2eabe6124..7d7215a07 100644 --- a/test/utils.go +++ b/test/utils.go @@ -4,7 +4,6 @@ package test import ( "bytes" - "context" "encoding/json" "fmt" "io" @@ -55,11 +54,23 @@ type TransactionResponse struct { Txid string `json:"txid"` } +func (c TransactionResponse) GetTxID() string { + return c.Txid +} + type CallbackBatchResponse struct { Count int `json:"count"` Callbacks []*TransactionResponse `json:"callbacks,omitempty"` } +func (c CallbackBatchResponse) GetTxID() string { + return c.Callbacks[0].Txid +} + +type Response interface { + GetTxID() string +} + type ErrorFee struct { Detail string `json:"detail"` Txid string `json:"txid"` @@ -124,13 +135,8 @@ func generateRandomString(length int) string { return string(b) } -type ( - callbackResponseFn func(w http.ResponseWriter, rc chan *TransactionResponse, ec chan error, status *TransactionResponse) - callbackBatchResponseFn func(w http.ResponseWriter, rc chan *CallbackBatchResponse, ec chan error, status *CallbackBatchResponse) -) - -// use buffered channels for multiple callbacks -func startCallbackSrv(t *testing.T, receivedChan chan *TransactionResponse, errChan chan error, alternativeResponseFn callbackResponseFn) (callbackURL, token string, shutdownFn func()) { +// registerHandlerForCallback registers a new handler function that responds to callbacks with bad request response first and at second try with success or alternative given response function. It returns the callback URL and token to be used. +func registerHandlerForCallback[T any](t *testing.T, receivedChan chan T, errChan chan error, alternativeResponseFn func(w http.ResponseWriter, rc chan T, ec chan error, status T), mux *http.ServeMux) (callbackURL, token string) { t.Helper() callback := generateRandomString(16) @@ -142,74 +148,7 @@ func startCallbackSrv(t *testing.T, receivedChan chan *TransactionResponse, errC callbackURL = fmt.Sprintf("http://%s:9000/%s", hostname, callback) - http.HandleFunc(fmt.Sprintf("/%s", callback), func(w http.ResponseWriter, req *http.Request) { - // check auth - if expectedAuthHeader != req.Header.Get("Authorization") { - errChan <- fmt.Errorf("auth header %s not as expected %s", expectedAuthHeader, req.Header.Get("Authorization")) - err = respondToCallback(w, false) - if err != nil { - t.Fatalf("Failed to respond to callback: %v", err) - } - return - } - - status, err := readPayload[*TransactionResponse](t, req) - if err != nil { - errChan <- fmt.Errorf("read callback payload failed: %v", err) - return - } - - if alternativeResponseFn != nil { - alternativeResponseFn(w, receivedChan, errChan, status) - } else { - t.Log("callback received, responding success") - err = respondToCallback(w, true) - if err != nil { - t.Fatalf("Failed to respond to callback: %v", err) - } - - receivedChan <- status - } - }) - - srv := &http.Server{Addr: ":9000"} - shutdownFn = func() { - t.Logf("shutting down callback listener %s", callbackURL) - - if err := srv.Shutdown(context.TODO()); err != nil { - t.Fatal("failed to shut down server") - } - t.Log("callback listener is down") - - close(receivedChan) - close(errChan) - } - - go func(server *http.Server) { - t.Logf("starting callback server %s", callbackURL) - err := server.ListenAndServe() - if err != nil { - return - } - }(srv) - - return -} - -// use buffered channels for multiple callbacks -func startBatchCallbackSrv(t *testing.T, receivedChan chan *CallbackBatchResponse, errChan chan error, alternativeResponseFn callbackBatchResponseFn) (callbackURL, token string, shutdownFn func()) { - t.Helper() - - callback := generateRandomString(16) - token = "1234" - expectedAuthHeader := fmt.Sprintf("Bearer %s", token) - - hostname, err := os.Hostname() - require.NoError(t, err) - - callbackURL = fmt.Sprintf("http://%s:9000/%s/batch", hostname, callback) - - http.HandleFunc(fmt.Sprintf("/%s/batch", callback), func(w http.ResponseWriter, req *http.Request) { + mux.HandleFunc(fmt.Sprintf("/%s", callback), func(w http.ResponseWriter, req *http.Request) { // check auth if expectedAuthHeader != req.Header.Get("Authorization") { errChan <- fmt.Errorf("auth header %s not as expected %s", expectedAuthHeader, req.Header.Get("Authorization")) @@ -220,7 +159,7 @@ func startBatchCallbackSrv(t *testing.T, receivedChan chan *CallbackBatchRespons return } - status, err := readPayload[*CallbackBatchResponse](t, req) + status, err := readPayload[T](t, req) if err != nil { errChan <- fmt.Errorf("read callback payload failed: %v", err) return @@ -239,27 +178,7 @@ func startBatchCallbackSrv(t *testing.T, receivedChan chan *CallbackBatchRespons } }) - srv := &http.Server{Addr: ":9000"} - shutdownFn = func() { - t.Logf("shutting down callback listener %s", callbackURL) - close(receivedChan) - close(errChan) - - if err := srv.Shutdown(context.TODO()); err != nil { - t.Fatal("failed to shut down server") - } - t.Log("callback listener is down") - } - - go func(server *http.Server) { - t.Logf("starting callback server %s", callbackURL) - err := server.ListenAndServe() - if err != nil { - return - } - }(srv) - - return + return callbackURL, token } func readPayload[T any](t *testing.T, req *http.Request) (T, error) { @@ -320,59 +239,24 @@ func testTxSubmission(t *testing.T, callbackURL string, token string, callbackBa require.Equal(t, StatusSeenOnNetwork, response.TxStatus) } -func prepareCallback(t *testing.T, callbackNumbers int) (chan *TransactionResponse, chan error, callbackResponseFn) { +func getResponseFunc[T Response](t *testing.T, respondSuccessAtCallbacks int) func(w http.ResponseWriter, rc chan T, ec chan error, status T) { t.Helper() - callbackReceivedChan := make(chan *TransactionResponse, 100) // do not block callback server responses - callbackErrChan := make(chan error, 100) - responseVisitMap := make(map[string]int) mu := &sync.Mutex{} - calbackResponseFn := func(w http.ResponseWriter, rc chan *TransactionResponse, _ chan error, status *TransactionResponse) { + calbackResponseFn := func(w http.ResponseWriter, rc chan T, _ chan error, status T) { mu.Lock() - callbackNumber := responseVisitMap[status.Txid] - callbackNumber++ - responseVisitMap[status.Txid] = callbackNumber + txID := status.GetTxID() + callbackCounter := responseVisitMap[txID] + callbackCounter++ + responseVisitMap[txID] = callbackCounter mu.Unlock() - // Let ARC send the same callback few times. Respond with success on the last one. - respondWithSuccess := false - if callbackNumber < callbackNumbers { - respondWithSuccess = false - } else { - respondWithSuccess = true - } - - err := respondToCallback(w, respondWithSuccess) - if err != nil { - t.Fatalf("Failed to respond to callback: %v", err) - } - - rc <- status - } - return callbackReceivedChan, callbackErrChan, calbackResponseFn -} - -func prepareBatchCallback(t *testing.T, callbackNumbers int) (chan *CallbackBatchResponse, chan error, callbackBatchResponseFn) { - t.Helper() - - callbackReceivedChan := make(chan *CallbackBatchResponse, 100) // do not block callback server responses - callbackErrChan := make(chan error, 100) - responseVisitMap := make(map[string]int) - mu := &sync.Mutex{} - - calbackResponseFn := func(w http.ResponseWriter, rc chan *CallbackBatchResponse, _ chan error, status *CallbackBatchResponse) { - mu.Lock() - callbackNumber := responseVisitMap[status.Callbacks[0].Txid] - callbackNumber++ - responseVisitMap[status.Callbacks[0].Txid] = callbackNumber - mu.Unlock() - // Let ARC send the same callback few times. Respond with success on the last one. + // Let ARC send the same callback few times respondWithSuccess := false - if callbackNumber < callbackNumbers { - respondWithSuccess = false - } else { + if callbackCounter >= respondSuccessAtCallbacks { + // Respond with success at specified number of requests respondWithSuccess = true } @@ -383,7 +267,7 @@ func prepareBatchCallback(t *testing.T, callbackNumbers int) (chan *CallbackBatc rc <- status } - return callbackReceivedChan, callbackErrChan, calbackResponseFn + return calbackResponseFn } func generateNewUnlockingScriptFromRandomKey() (*script.Script, error) {