From 06260cc85552eb7f801cd912f7ac9085b9281e40 Mon Sep 17 00:00:00 2001 From: "shota.silagadze" Date: Tue, 14 Jan 2025 11:10:01 +0400 Subject: [PATCH] send mined status --- internal/metamorph/client.go | 6 ++-- internal/metamorph/processor.go | 18 ++++++++++ internal/metamorph/response_processor.go | 18 +++++++++- internal/metamorph/server.go | 18 +++++++--- test/submit_01_single_test.go | 45 ++++++++++++++++++++++++ 5 files changed, 97 insertions(+), 8 deletions(-) diff --git a/internal/metamorph/client.go b/internal/metamorph/client.go index c9992d61d..18054a98a 100644 --- a/internal/metamorph/client.go +++ b/internal/metamorph/client.go @@ -3,6 +3,7 @@ package metamorph import ( "context" "errors" + "fmt" "log/slog" "os" "runtime" @@ -283,7 +284,7 @@ func (m *Metamorph) SubmitTransaction(ctx context.Context, tx *sdkTx.Transaction deadline, _ := ctx.Deadline() // increase time to make sure that expiration happens from inside the metramorph function - newDeadline := deadline.Add(time.Second * 2) + newDeadline := deadline.Add(time.Second * 10) // Create a new context with the updated deadline newCtx, newCancel := context.WithDeadline(context.Background(), newDeadline) @@ -293,6 +294,7 @@ func (m *Metamorph) SubmitTransaction(ctx context.Context, tx *sdkTx.Transaction if err != nil { return nil, err } + fmt.Println("shotuna response", response) txStatus = &TransactionStatus{ TxID: response.GetTxid(), Status: response.GetStatus().String(), @@ -345,7 +347,7 @@ func (m *Metamorph) SubmitTransactions(ctx context.Context, txs sdkTx.Transactio deadline, _ := ctx.Deadline() // decrease time to get initial deadline - newDeadline := deadline.Add(time.Second * 5) + newDeadline := deadline.Add(time.Second * 10) // increase time to make sure that expiration happens from inside the metramorph function newCtx, newCancel := context.WithDeadline(context.Background(), newDeadline) diff --git a/internal/metamorph/processor.go b/internal/metamorph/processor.go index dea8596b5..5002791d3 100644 --- a/internal/metamorph/processor.go +++ b/internal/metamorph/processor.go @@ -274,6 +274,15 @@ func (p *Processor) StartProcessMinedCallbacks() { txsBlocks = append(txsBlocks, txBlock) + fmt.Println("shotuna", time.Now()) + // if we have a pending request with given transaction hash, provide mined status + if len(txBlock.TransactionHash) != 0 { + hash, err := chainhash.NewHash(txBlock.TransactionHash) + if err == nil { + fmt.Println("shotuna 3", time.Now(), hash.String()) + } + } + if len(txsBlocks) < p.processMinedBatchSize { continue } @@ -315,6 +324,13 @@ func (p *Processor) updateMined(ctx context.Context, txsBlocks []*blocktx_api.Tr } for _, data := range updatedData { + // if we have a pending request with given transaction hash, provide mined status + fmt.Println("shota meore", time.Now(), data.Hash.String()) + p.responseProcessor.UpdateStatus(data.Hash, StatusAndError{ + Hash: data.Hash, + Status: metamorph_api.Status_MINED, + }) + if len(data.Callbacks) > 0 { requests := toSendRequest(data) for _, request := range requests { @@ -738,6 +754,8 @@ func (p *Processor) ProcessTransaction(ctx context.Context, req *ProcessorReques tracing.EndTracing(span, err) }() + dl, ok := ctx.Deadline() + fmt.Println("shota registering", dl, ok) statusResponse := NewStatusResponse(ctx, req.Data.Hash, req.ResponseChannel) // check if tx already stored, return it diff --git a/internal/metamorph/response_processor.go b/internal/metamorph/response_processor.go index b1a3a38b1..180f20a68 100644 --- a/internal/metamorph/response_processor.go +++ b/internal/metamorph/response_processor.go @@ -2,7 +2,9 @@ package metamorph import ( "context" + "fmt" "sync" + "time" "github.com/libsv/go-p2p/chaincfg/chainhash" ) @@ -26,11 +28,12 @@ func (r *StatusResponse) UpdateStatus(statusAndError StatusAndError) { if r.statusCh == nil || r.ctx == nil { return } - + fmt.Println("shotuna sending status") select { case <-r.ctx.Done(): return default: + fmt.Println("shotuna sending status", statusAndError.Status) r.statusCh <- StatusAndError{ Hash: r.Hash, Status: statusAndError.Status, @@ -53,6 +56,9 @@ func (p *ResponseProcessor) Add(statusResponse *StatusResponse) { return } + dl, ok := statusResponse.ctx.Deadline() + fmt.Println("shota registering 2", dl, ok) + _, loaded := p.resMap.LoadOrStore(*statusResponse.Hash, statusResponse) if loaded { return @@ -61,21 +67,31 @@ func (p *ResponseProcessor) Add(statusResponse *StatusResponse) { // we no longer need status response object after response has been returned go func() { <-statusResponse.ctx.Done() + fmt.Println("shota expired", time.Now()) p.resMap.Delete(*statusResponse.Hash) }() } func (p *ResponseProcessor) UpdateStatus(hash *chainhash.Hash, statusAndError StatusAndError) (found bool) { val, ok := p.resMap.Load(*hash) + p.resMap.Range(func(key, value any) bool { + fmt.Println(key, value) + return true + }) if !ok { + fmt.Println("shota hash not found") return false } statusResponse, ok := val.(*StatusResponse) + fmt.Println("shota hash not found 2") + if !ok { return false } + fmt.Println("shota hash found 3") + go statusResponse.UpdateStatus(statusAndError) return true } diff --git a/internal/metamorph/server.go b/internal/metamorph/server.go index da73ae22d..e4f30756d 100644 --- a/internal/metamorph/server.go +++ b/internal/metamorph/server.go @@ -4,6 +4,7 @@ import ( "context" "encoding/hex" "errors" + "fmt" "log/slog" "runtime" "strings" @@ -154,8 +155,8 @@ func (s *Server) PutTransaction(ctx context.Context, req *metamorph_api.Transact // decrease time to get initial deadline newDeadline := deadline - if time.Now().Add(2 * time.Second).Before(deadline) { - newDeadline = deadline.Add(-(time.Second * 2)) + if time.Now().Add(10 * time.Second).Before(deadline) { + newDeadline = deadline.Add(-(time.Second * 10)) } // Create a new context with the updated deadline @@ -183,8 +184,8 @@ func (s *Server) PutTransactions(ctx context.Context, req *metamorph_api.Transac // decrease time to get initial deadline newDeadline := deadline - if time.Now().Add(2 * time.Second).Before(deadline) { - newDeadline = deadline.Add(-(time.Second * 2)) + if time.Now().Add(10 * time.Second).Before(deadline) { + newDeadline = deadline.Add(-(time.Second * 10)) } // Create a new context with the updated deadline @@ -304,6 +305,7 @@ func (s *Server) processTransaction(ctx context.Context, waitForStatus metamorph for { select { case <-ctx.Done(): + fmt.Println("shotuna time", time.Now()) // Ensure that function returns at latest when context times out returnedStatus.TimedOut = true return returnedStatus @@ -322,7 +324,9 @@ func (s *Server) processTransaction(ctx context.Context, waitForStatus metamorph return tx } case res := <-responseChannel: + fmt.Println("shotuna 7", res) returnedStatus.Status = res.Status + fmt.Println("shotuna st", returnedStatus, time.Now()) if span != nil { span.AddEvent("status change", trace.WithAttributes(attribute.String("status", returnedStatus.Status.String()))) @@ -333,22 +337,26 @@ func (s *Server) processTransaction(ctx context.Context, waitForStatus metamorph } if res.Err != nil { + fmt.Println("shotuna 23") returnedStatus.RejectReason = res.Err.Error() // Note: return here so that user doesn't have to wait for timeout in case of an error return returnedStatus } else { + fmt.Println("shotuna 24") returnedStatus.RejectReason = "" if res.Status == metamorph_api.Status_MINED { + fmt.Println("shotuna 26") var tx *metamorph_api.TransactionStatus tx, err = s.GetTransactionStatus(ctx, &metamorph_api.TransactionStatusRequest{ Txid: txID, }) if err != nil { + fmt.Println("shotuna 28") s.logger.Error("failed to get mined transaction from storage", slog.String("err", err.Error())) returnedStatus.RejectReason = err.Error() return returnedStatus } - + fmt.Println("shotuna 20") return tx } } diff --git a/test/submit_01_single_test.go b/test/submit_01_single_test.go index 55aa66942..69ba8dfb8 100644 --- a/test/submit_01_single_test.go +++ b/test/submit_01_single_test.go @@ -193,6 +193,51 @@ func TestSubmitMined(t *testing.T) { }) } +func TestReturnMinedStatus(t *testing.T) { + t.Run("submit mined tx", func(t *testing.T) { + // submit an unregistered, already mined transaction. ARC should return the status as MINED for the transaction. + + // given + address, _ := node_client.FundNewWallet(t, bitcoind) + utxos := node_client.GetUtxos(t, bitcoind, address) + + rawTx, _ := bitcoind.GetRawTransaction(utxos[0].Txid) + tx, _ := sdkTx.NewTransactionFromHex(rawTx.Hex) + exRawTx := tx.String() + + callbackReceivedChan := make(chan *TransactionResponse) + callbackErrChan := make(chan error) + + callbackURL, token, shutdown := startCallbackSrv(t, callbackReceivedChan, callbackErrChan, nil) + defer shutdown() + + // when + fmt.Println("shotuna 2", time.Now()) + transactionResponse := postRequest[TransactionResponse](t, arcEndpointV1Tx, createPayload(t, TransactionRequest{RawTx: exRawTx}), + map[string]string{ + "X-WaitFor": StatusMined, + "X-CallbackUrl": callbackURL, + "X-CallbackToken": token, + "X-MaxTimeout": "10", + }, http.StatusOK) + + // wait for callback + callbackTimeout := time.After(15 * time.Second) + + select { + case status := <-callbackReceivedChan: + require.Equal(t, rawTx.TxID, status.Txid) + require.Equal(t, StatusMined, status.TxStatus) + case err := <-callbackErrChan: + t.Fatalf("callback error: %v", err) + case <-callbackTimeout: + t.Fatal("callback exceeded timeout") + } + + require.Equal(t, StatusMined, transactionResponse.TxStatus) + }) +} + func TestSubmitQueued(t *testing.T) { t.Run("queued", func(t *testing.T) { address, privateKey := node_client.FundNewWallet(t, bitcoind)