Skip to content

Commit

Permalink
send mined status
Browse files Browse the repository at this point in the history
  • Loading branch information
shotasilagadzetaal committed Jan 14, 2025
1 parent c2c7851 commit 06260cc
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 8 deletions.
6 changes: 4 additions & 2 deletions internal/metamorph/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package metamorph
import (
"context"
"errors"
"fmt"
"log/slog"
"os"
"runtime"
Expand Down Expand Up @@ -283,7 +284,7 @@ func (m *Metamorph) SubmitTransaction(ctx context.Context, tx *sdkTx.Transaction

deadline, _ := ctx.Deadline()
// increase time to make sure that expiration happens from inside the metramorph function
newDeadline := deadline.Add(time.Second * 2)
newDeadline := deadline.Add(time.Second * 10)

// Create a new context with the updated deadline
newCtx, newCancel := context.WithDeadline(context.Background(), newDeadline)
Expand All @@ -293,6 +294,7 @@ func (m *Metamorph) SubmitTransaction(ctx context.Context, tx *sdkTx.Transaction
if err != nil {
return nil, err
}
fmt.Println("shotuna response", response)
txStatus = &TransactionStatus{
TxID: response.GetTxid(),
Status: response.GetStatus().String(),
Expand Down Expand Up @@ -345,7 +347,7 @@ func (m *Metamorph) SubmitTransactions(ctx context.Context, txs sdkTx.Transactio

deadline, _ := ctx.Deadline()
// decrease time to get initial deadline
newDeadline := deadline.Add(time.Second * 5)
newDeadline := deadline.Add(time.Second * 10)

// increase time to make sure that expiration happens from inside the metramorph function
newCtx, newCancel := context.WithDeadline(context.Background(), newDeadline)
Expand Down
18 changes: 18 additions & 0 deletions internal/metamorph/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,15 @@ func (p *Processor) StartProcessMinedCallbacks() {

txsBlocks = append(txsBlocks, txBlock)

fmt.Println("shotuna", time.Now())
// if we have a pending request with given transaction hash, provide mined status
if len(txBlock.TransactionHash) != 0 {
hash, err := chainhash.NewHash(txBlock.TransactionHash)
if err == nil {
fmt.Println("shotuna 3", time.Now(), hash.String())
}
}

if len(txsBlocks) < p.processMinedBatchSize {
continue
}
Expand Down Expand Up @@ -315,6 +324,13 @@ func (p *Processor) updateMined(ctx context.Context, txsBlocks []*blocktx_api.Tr
}

for _, data := range updatedData {
// if we have a pending request with given transaction hash, provide mined status
fmt.Println("shota meore", time.Now(), data.Hash.String())
p.responseProcessor.UpdateStatus(data.Hash, StatusAndError{
Hash: data.Hash,
Status: metamorph_api.Status_MINED,
})

if len(data.Callbacks) > 0 {
requests := toSendRequest(data)
for _, request := range requests {
Expand Down Expand Up @@ -738,6 +754,8 @@ func (p *Processor) ProcessTransaction(ctx context.Context, req *ProcessorReques
tracing.EndTracing(span, err)
}()

dl, ok := ctx.Deadline()
fmt.Println("shota registering", dl, ok)
statusResponse := NewStatusResponse(ctx, req.Data.Hash, req.ResponseChannel)

// check if tx already stored, return it
Expand Down
18 changes: 17 additions & 1 deletion internal/metamorph/response_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package metamorph

import (
"context"
"fmt"
"sync"
"time"

"github.com/libsv/go-p2p/chaincfg/chainhash"
)
Expand All @@ -26,11 +28,12 @@ func (r *StatusResponse) UpdateStatus(statusAndError StatusAndError) {
if r.statusCh == nil || r.ctx == nil {
return
}

fmt.Println("shotuna sending status")
select {
case <-r.ctx.Done():
return
default:
fmt.Println("shotuna sending status", statusAndError.Status)
r.statusCh <- StatusAndError{
Hash: r.Hash,
Status: statusAndError.Status,
Expand All @@ -53,6 +56,9 @@ func (p *ResponseProcessor) Add(statusResponse *StatusResponse) {
return
}

dl, ok := statusResponse.ctx.Deadline()
fmt.Println("shota registering 2", dl, ok)

_, loaded := p.resMap.LoadOrStore(*statusResponse.Hash, statusResponse)
if loaded {
return
Expand All @@ -61,21 +67,31 @@ func (p *ResponseProcessor) Add(statusResponse *StatusResponse) {
// we no longer need status response object after response has been returned
go func() {
<-statusResponse.ctx.Done()
fmt.Println("shota expired", time.Now())
p.resMap.Delete(*statusResponse.Hash)
}()
}

func (p *ResponseProcessor) UpdateStatus(hash *chainhash.Hash, statusAndError StatusAndError) (found bool) {
val, ok := p.resMap.Load(*hash)
p.resMap.Range(func(key, value any) bool {
fmt.Println(key, value)
return true
})
if !ok {
fmt.Println("shota hash not found")
return false
}

statusResponse, ok := val.(*StatusResponse)
fmt.Println("shota hash not found 2")

if !ok {
return false
}

fmt.Println("shota hash found 3")

go statusResponse.UpdateStatus(statusAndError)
return true
}
Expand Down
18 changes: 13 additions & 5 deletions internal/metamorph/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/hex"
"errors"
"fmt"
"log/slog"
"runtime"
"strings"
Expand Down Expand Up @@ -154,8 +155,8 @@ func (s *Server) PutTransaction(ctx context.Context, req *metamorph_api.Transact

// decrease time to get initial deadline
newDeadline := deadline
if time.Now().Add(2 * time.Second).Before(deadline) {
newDeadline = deadline.Add(-(time.Second * 2))
if time.Now().Add(10 * time.Second).Before(deadline) {
newDeadline = deadline.Add(-(time.Second * 10))
}

// Create a new context with the updated deadline
Expand Down Expand Up @@ -183,8 +184,8 @@ func (s *Server) PutTransactions(ctx context.Context, req *metamorph_api.Transac

// decrease time to get initial deadline
newDeadline := deadline
if time.Now().Add(2 * time.Second).Before(deadline) {
newDeadline = deadline.Add(-(time.Second * 2))
if time.Now().Add(10 * time.Second).Before(deadline) {
newDeadline = deadline.Add(-(time.Second * 10))
}

// Create a new context with the updated deadline
Expand Down Expand Up @@ -304,6 +305,7 @@ func (s *Server) processTransaction(ctx context.Context, waitForStatus metamorph
for {
select {
case <-ctx.Done():
fmt.Println("shotuna time", time.Now())
// Ensure that function returns at latest when context times out
returnedStatus.TimedOut = true
return returnedStatus
Expand All @@ -322,7 +324,9 @@ func (s *Server) processTransaction(ctx context.Context, waitForStatus metamorph
return tx
}
case res := <-responseChannel:
fmt.Println("shotuna 7", res)
returnedStatus.Status = res.Status
fmt.Println("shotuna st", returnedStatus, time.Now())

if span != nil {
span.AddEvent("status change", trace.WithAttributes(attribute.String("status", returnedStatus.Status.String())))
Expand All @@ -333,22 +337,26 @@ func (s *Server) processTransaction(ctx context.Context, waitForStatus metamorph
}

if res.Err != nil {
fmt.Println("shotuna 23")
returnedStatus.RejectReason = res.Err.Error()
// Note: return here so that user doesn't have to wait for timeout in case of an error
return returnedStatus
} else {
fmt.Println("shotuna 24")
returnedStatus.RejectReason = ""
if res.Status == metamorph_api.Status_MINED {
fmt.Println("shotuna 26")
var tx *metamorph_api.TransactionStatus
tx, err = s.GetTransactionStatus(ctx, &metamorph_api.TransactionStatusRequest{
Txid: txID,
})
if err != nil {
fmt.Println("shotuna 28")
s.logger.Error("failed to get mined transaction from storage", slog.String("err", err.Error()))
returnedStatus.RejectReason = err.Error()
return returnedStatus
}

fmt.Println("shotuna 20")
return tx
}
}
Expand Down
45 changes: 45 additions & 0 deletions test/submit_01_single_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,51 @@ func TestSubmitMined(t *testing.T) {
})
}

func TestReturnMinedStatus(t *testing.T) {
t.Run("submit mined tx", func(t *testing.T) {
// submit an unregistered, already mined transaction. ARC should return the status as MINED for the transaction.

// given
address, _ := node_client.FundNewWallet(t, bitcoind)
utxos := node_client.GetUtxos(t, bitcoind, address)

rawTx, _ := bitcoind.GetRawTransaction(utxos[0].Txid)
tx, _ := sdkTx.NewTransactionFromHex(rawTx.Hex)
exRawTx := tx.String()

callbackReceivedChan := make(chan *TransactionResponse)
callbackErrChan := make(chan error)

callbackURL, token, shutdown := startCallbackSrv(t, callbackReceivedChan, callbackErrChan, nil)
defer shutdown()

// when
fmt.Println("shotuna 2", time.Now())
transactionResponse := postRequest[TransactionResponse](t, arcEndpointV1Tx, createPayload(t, TransactionRequest{RawTx: exRawTx}),
map[string]string{
"X-WaitFor": StatusMined,
"X-CallbackUrl": callbackURL,
"X-CallbackToken": token,
"X-MaxTimeout": "10",
}, http.StatusOK)

// wait for callback
callbackTimeout := time.After(15 * time.Second)

select {
case status := <-callbackReceivedChan:
require.Equal(t, rawTx.TxID, status.Txid)
require.Equal(t, StatusMined, status.TxStatus)
case err := <-callbackErrChan:
t.Fatalf("callback error: %v", err)
case <-callbackTimeout:
t.Fatal("callback exceeded timeout")
}

Check failure on line 235 in test/submit_01_single_test.go

View workflow job for this annotation

GitHub Actions / Golangci-lint

undefined: startCallbackSrv (typecheck)

require.Equal(t, StatusMined, transactionResponse.TxStatus)
})
}

func TestSubmitQueued(t *testing.T) {
t.Run("queued", func(t *testing.T) {
address, privateKey := node_client.FundNewWallet(t, bitcoind)
Expand Down

0 comments on commit 06260cc

Please sign in to comment.