Skip to content

Commit

Permalink
Merge pull request #199 from bitcoin-sv/callbacker
Browse files Browse the repository at this point in the history
Callbacker
  • Loading branch information
shotasilagadzetaal authored Dec 12, 2023
2 parents e25644c + bba7867 commit c5d6de6
Show file tree
Hide file tree
Showing 12 changed files with 362 additions and 103 deletions.
28 changes: 0 additions & 28 deletions cmd/metamorph.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,14 @@ import (
"net/url"
"os"
"path"
"path/filepath"
"strconv"
"time"

awscfg "github.com/aws/aws-sdk-go-v2/config"
awsdynamodb "github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/bitcoin-sv/arc/asynccaller"
"github.com/bitcoin-sv/arc/blocktx"
"github.com/bitcoin-sv/arc/blocktx/blocktx_api"
blockTxStore "github.com/bitcoin-sv/arc/blocktx/store"
"github.com/bitcoin-sv/arc/callbacker"
"github.com/bitcoin-sv/arc/callbacker/callbacker_api"
"github.com/bitcoin-sv/arc/config"
"github.com/bitcoin-sv/arc/metamorph"
"github.com/bitcoin-sv/arc/metamorph/store"
Expand Down Expand Up @@ -115,29 +111,6 @@ func StartMetamorph(logger utils.Logger) (func(), error) {

pm, statusMessageCh := initPeerManager(logger, s)

callbackerAddress := viper.GetString("callbacker.dialAddr")
if callbackerAddress == "" {
logger.Fatalf("no callbacker.dialAddr setting found")
}
cb := callbacker.NewClient(callbackerAddress)

callbackRegisterPath, err := filepath.Abs(path.Join(folder, "callback-register"))
if err != nil {
logger.Fatalf("Could not get absolute path: %v", err)
}

// create an async caller to callbacker
var cbAsyncCaller *asynccaller.AsyncCaller[callbacker_api.Callback]
cbAsyncCaller, err = asynccaller.New[callbacker_api.Callback](
logger,
callbackRegisterPath,
10*time.Second,
metamorph.NewRegisterCallbackClient(cb),
)
if err != nil {
logger.Fatalf("error creating async caller: %v", err)
}

mapExpiryStr := viper.GetString("metamorph.processorCacheExpiryTime")
mapExpiry, err := time.ParseDuration(mapExpiryStr)
if err != nil {
Expand All @@ -159,7 +132,6 @@ func StartMetamorph(logger utils.Logger) (func(), error) {
metamorphProcessor, err := metamorph.NewProcessor(
s,
pm,
cbAsyncCaller.GetChannel(),
btx,
metamorph.WithCacheExpiryTime(mapExpiry),
metamorph.WithProcessorLogger(processorLogger),
Expand Down
97 changes: 97 additions & 0 deletions metamorph/callbacker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package metamorph

import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"log/slog"
"net/http"
"strconv"
"time"

"github.com/bitcoin-sv/arc/api"
"github.com/bitcoin-sv/arc/metamorph/metamorph_api"
"github.com/bitcoin-sv/arc/metamorph/store"
"github.com/ordishs/go-utils"
)

const (
CallbackTries = 5
CallbackIntervalSeconds = 5
)

func SendCallback(logger *slog.Logger, s store.MetamorphStore, tx *store.StoreData) {
sleepDuration := CallbackIntervalSeconds
for i := 0; i < CallbackTries; i++ {
statusString := metamorph_api.Status(tx.Status).String()
blockHash := ""
if tx.BlockHash != nil {
blockHash = utils.ReverseAndHexEncodeSlice(tx.BlockHash.CloneBytes())
}

logger.Info("sending callback for transaction", slog.String("token", tx.CallbackToken), slog.String("hash", tx.Hash.String()), slog.String("url", tx.CallbackUrl), slog.Uint64("block height", tx.BlockHeight), slog.String("block hash", blockHash))

status := &api.TransactionStatus{
BlockHash: &blockHash,
BlockHeight: &tx.BlockHeight,
TxStatus: &statusString,
Txid: tx.Hash.String(),
Timestamp: time.Now(),
}
statusBytes, err := json.Marshal(status)
if err != nil {
logger.Error("Couldn't marshal status - ", err)
return
}

var request *http.Request
request, err = http.NewRequest("POST", tx.CallbackUrl, bytes.NewBuffer(statusBytes))
if err != nil {
logger.Error("Couldn't marshal status - ", errors.Join(err, fmt.Errorf("failed to post callback for transaction id %s", tx.Hash)))
return
}
request.Header.Set("Content-Type", "application/json; charset=UTF-8")
if tx.CallbackToken != "" {
request.Header.Set("Authorization", "Bearer "+tx.CallbackToken)
}

// default http client
httpClient := http.Client{
Timeout: 5 * time.Second,
}

var response *http.Response
response, err = httpClient.Do(request)
if err != nil {
logger.Error("Couldn't send transaction info through callback url - ", err)
continue
}
defer response.Body.Close()

// if callback was sent successfully we stop here
if response.StatusCode == http.StatusOK {
err = s.RemoveCallbacker(context.Background(), tx.Hash)
if err != nil {
logger.Error("Couldn't update/remove callback url - ", err)
continue
}
return
}

logger.Error("callback response status code not ok - ", slog.String("status", strconv.Itoa(response.StatusCode)))

// sleep before trying again
time.Sleep(time.Duration(sleepDuration) * time.Second)
// increase intervals on each failure
sleepDuration *= 2
}

err := s.RemoveCallbacker(context.Background(), tx.Hash)
if err != nil {
logger.Error("Couldn't update/remove callback url - ", err)
return
}
logger.Error("Couldn't send transaction info through callback url after tries: ", slog.String("status", strconv.Itoa(CallbackTries)))
}
50 changes: 50 additions & 0 deletions metamorph/mocks/store_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 3 additions & 23 deletions metamorph/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (

"github.com/bitcoin-sv/arc/blocktx"
"github.com/bitcoin-sv/arc/blocktx/blocktx_api"
"github.com/bitcoin-sv/arc/callbacker/callbacker_api"
"github.com/bitcoin-sv/arc/metamorph/metamorph_api"
"github.com/bitcoin-sv/arc/metamorph/processor_response"
"github.com/bitcoin-sv/arc/metamorph/store"
Expand Down Expand Up @@ -41,7 +40,6 @@ const (

type Processor struct {
store store.MetamorphStore
cbChannel chan *callbacker_api.Callback
ProcessorResponseMap *ProcessorResponseMap
pm p2p.PeerManagerI
btc blocktx.ClientI
Expand Down Expand Up @@ -72,8 +70,7 @@ type Processor struct {

type Option func(f *Processor)

func NewProcessor(s store.MetamorphStore, pm p2p.PeerManagerI,
cbChannel chan *callbacker_api.Callback, btc blocktx.ClientI, opts ...Option) (*Processor, error) {
func NewProcessor(s store.MetamorphStore, pm p2p.PeerManagerI, btc blocktx.ClientI, opts ...Option) (*Processor, error) {
if s == nil {
return nil, errors.New("store cannot be nil")
}
Expand All @@ -85,7 +82,6 @@ func NewProcessor(s store.MetamorphStore, pm p2p.PeerManagerI,
p := &Processor{
startTime: time.Now().UTC(),
store: s,
cbChannel: cbChannel,
pm: pm,
btc: btc,
dataRetentionPeriod: dataRetentionPeriodDefault,
Expand Down Expand Up @@ -151,9 +147,6 @@ func (p *Processor) Shutdown() {
p.processExpiredSeenTxsTicker.Stop()
p.processExpiredTxsTicker.Stop()
p.ProcessorResponseMap.Close()
if p.cbChannel != nil {
close(p.cbChannel)
}
}

func (p *Processor) unlockItems() error {
Expand Down Expand Up @@ -387,21 +380,8 @@ func (p *Processor) SendStatusMinedForTransaction(hash *chainhash.Hash, blockHas
resp.Close()
p.ProcessorResponseMap.Delete(hash)

if p.cbChannel != nil {
data, _ := p.store.Get(spanCtx, hash[:])

if data != nil && data.CallbackUrl != "" {
p.cbChannel <- &callbacker_api.Callback{
Hash: data.Hash[:],
Url: data.CallbackUrl,
Token: data.CallbackToken,
Status: int32(data.Status),
BlockHash: data.BlockHash[:],
BlockHeight: data.BlockHeight,
}
}
}

data, _ := p.store.Get(spanCtx, hash[:])
go SendCallback(p.logger, p.store, data)
},
})

Expand Down
Loading

0 comments on commit c5d6de6

Please sign in to comment.