Skip to content

Commit

Permalink
Merge branch 'main' into update-mined-status
Browse files Browse the repository at this point in the history
  • Loading branch information
shotasilagadzetaal committed Jan 15, 2025
2 parents 06260cc + 4c36025 commit c812e52
Show file tree
Hide file tree
Showing 17 changed files with 241 additions and 290 deletions.
20 changes: 0 additions & 20 deletions cmd/arc/services/metamorph.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,12 @@ import (
"context"
"fmt"
"log/slog"
"net/url"
"os"
"time"

"go.opentelemetry.io/otel/attribute"

"github.com/libsv/go-p2p"
"github.com/ordishs/go-bitcoin"
"google.golang.org/grpc"

"github.com/bitcoin-sv/arc/internal/cache"
Expand Down Expand Up @@ -179,24 +177,6 @@ func StartMetamorph(logger *slog.Logger, arcConfig *config.ArcConfig, cacheStore
return nil, fmt.Errorf("failed to start metamorph processor: %v", err)
}

if mtmConfig.CheckUtxos {
peerRPC := arcConfig.PeerRPC

rpcURL, err := url.Parse(fmt.Sprintf("rpc://%s:%s@%s:%d", peerRPC.User, peerRPC.Password, peerRPC.Host, peerRPC.Port))
if err != nil {
stopFn()
return nil, fmt.Errorf("failed to parse rpc URL: %v", err)
}

node, err := bitcoin.NewFromURL(rpcURL, false)
if err != nil {
stopFn()
return nil, err
}

optsServer = append(optsServer, metamorph.WithForceCheckUtxos(node))
}

server, err = metamorph.NewServer(arcConfig.Prometheus.Endpoint, arcConfig.GrpcMessageSize, logger,
metamorphStore, processor, arcConfig.Tracing, optsServer...)
if err != nil {
Expand Down
1 change: 0 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@ type MetamorphConfig struct {
ProcessStatusUpdateInterval time.Duration `mapstructure:"processStatusUpdateInterval"`
RecheckSeen RecheckSeen `mapstructure:"recheckSeen"`
MonitorPeers bool `mapstructure:"monitorPeers"`
CheckUtxos bool `mapstructure:"checkUtxos"`
Health *HealthConfig `mapstructure:"health"`
RejectCallbackContaining []string `mapstructure:"rejectCallbackContaining"`
Stats *StatsConfig `mapstructure:"stats"`
Expand Down
1 change: 0 additions & 1 deletion config/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ func getMetamorphConfig() *MetamorphConfig {
UntilAgo: 1 * time.Hour,
},
MonitorPeers: false,
CheckUtxos: false,
Health: &HealthConfig{
SeverDialAddr: "localhost:8005",
MinimumHealthyConnections: 2,
Expand Down
1 change: 0 additions & 1 deletion config/example_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ metamorph:
fromAgo: 24h # start of moving time window
untilAgo: 1h # end of moving time window
monitorPeers: false # if enabled, peers which do not receive alive signal from nodes will be restarted
checkUtxos: false # force check each utxo for validity. If enabled ARC connects to bitcoin node using rpc for each utxo
health:
serverDialAddr: localhost:8005 # address at which the grpc health server is exposed
minimumHealthyConnections: 2 # minimum number of healthy peer connections for the processor to be considered healthy
Expand Down
12 changes: 6 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ require (
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.28.0
go.opentelemetry.io/otel/sdk v1.28.0
go.opentelemetry.io/otel/trace v1.31.0
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56
golang.org/x/sync v0.10.0
google.golang.org/genproto/googleapis/rpc v0.0.0-20240805194559-2c9e96a0b5d4
google.golang.org/grpc v1.65.0
google.golang.org/protobuf v1.34.2
Expand Down Expand Up @@ -156,13 +156,13 @@ require (
go.opentelemetry.io/proto/otlp v1.3.1 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.28.0 // indirect
golang.org/x/crypto v0.32.0 // indirect
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect
golang.org/x/net v0.30.0 // indirect
golang.org/x/oauth2 v0.22.0 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/sys v0.26.0 // indirect
golang.org/x/term v0.25.0 // indirect
golang.org/x/text v0.19.0 // indirect
golang.org/x/sys v0.29.0 // indirect
golang.org/x/term v0.28.0 // indirect
golang.org/x/text v0.21.0 // indirect
golang.org/x/time v0.7.0 // indirect
golang.org/x/tools v0.24.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240805194559-2c9e96a0b5d4 // indirect
Expand Down
22 changes: 10 additions & 12 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,6 @@ github.com/libsv/go-bt v1.0.4 h1:2Css5lfomk/J97tM5Gk56Lp+tTK6xWYnmHNc/fGO6lE=
github.com/libsv/go-bt v1.0.4/go.mod h1:AfXoLFYEbY/TvCq/84xTce2xGjPUuC5imokHmcykF2k=
github.com/libsv/go-bt/v2 v2.2.5 h1:VoggBLMRW9NYoFujqe5bSYKqnw5y+fYfufgERSoubog=
github.com/libsv/go-bt/v2 v2.2.5/go.mod h1:cV45+jDlPOLfhJLfpLmpQoWzrIvVth9Ao2ZO1f6CcqU=
github.com/libsv/go-p2p v0.3.2 h1:O32CzkqM+jhSuleRHJln6JjL2pKH8aaRTx8lAfhIiic=
github.com/libsv/go-p2p v0.3.2/go.mod h1:TENFxbTT/bfSfuiirjU6l+PfAWxwZgF8GYUxs5tzc/M=
github.com/libsv/go-p2p v0.3.3 h1:5h+69MsGgFwQWyD8MEqyPeqbqKGRpKLzzOcI5cSLfgY=
github.com/libsv/go-p2p v0.3.3/go.mod h1:TENFxbTT/bfSfuiirjU6l+PfAWxwZgF8GYUxs5tzc/M=
github.com/lmittmann/tint v1.0.5 h1:NQclAutOfYsqs2F1Lenue6OoWCajs5wJcP3DfWVpePw=
Expand Down Expand Up @@ -408,8 +406,8 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.28.0 h1:GBDwsMXVQi34v5CCYUm2jkJvu4cbtru2U4TN2PSyQnw=
golang.org/x/crypto v0.28.0/go.mod h1:rmgy+3RHxRZMyY0jjAJShp2zgEdOqj2AO7U0pYmeQ7U=
golang.org/x/crypto v0.32.0 h1:euUpcYgM8WcP71gNpTqQCn6rC2t6ULUPiOzfWaXVVfc=
golang.org/x/crypto v0.32.0/go.mod h1:ZnnJkOaASj8g0AjIduWNlq2NRxL0PlBrbKVyZ6V/Ugc=
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 h1:2dVuKD2vS7b0QIHQbpyTISPd0LeHDbnYEryqj5Q1ug8=
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56/go.mod h1:M4RDyNAINzryxdtnbRXRL/OHtkFuWGRjvuhBJpk2IlY=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
Expand All @@ -427,8 +425,8 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand All @@ -439,17 +437,17 @@ golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo=
golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU=
golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.25.0 h1:WtHI/ltw4NvSUig5KARz9h521QvRC8RmF/cuYqifU24=
golang.org/x/term v0.25.0/go.mod h1:RPyXicDX+6vLxogjjRxjgD2TKtmAO6NZBsBRfrOLu7M=
golang.org/x/term v0.28.0 h1:/Ts8HFuMR2E6IP/jlo7QVLZHggjKQbhu/7H0LJFr3Gg=
golang.org/x/term v0.28.0/go.mod h1:Sw/lC2IAUZ92udQNf3WodGtn4k/XoLyZoh8v/8uiwek=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM=
golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo=
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
golang.org/x/time v0.7.0 h1:ntUhktv3OPE6TgYxXWv9vKvUSJyIFJlyohwbkEwPrKQ=
golang.org/x/time v0.7.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
Expand Down
27 changes: 14 additions & 13 deletions internal/blocktx/background_workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ import (
"sync"
"time"

"github.com/bitcoin-sv/arc/internal/blocktx/store"
"github.com/libsv/go-p2p"

"github.com/bitcoin-sv/arc/internal/blocktx/store"
)

type BackgroundWorkers struct {
l *slog.Logger
s store.BlocktxStore
logger *slog.Logger
store store.BlocktxStore

workersWg sync.WaitGroup
ctx context.Context
Expand All @@ -23,21 +24,21 @@ func NewBackgroundWorkers(store store.BlocktxStore, logger *slog.Logger) *Backgr
ctx, cancel := context.WithCancel(context.Background())

return &BackgroundWorkers{
s: store,
l: logger.With(slog.String("module", "background workers")),
store: store,
logger: logger.With(slog.String("module", "background workers")),

ctx: ctx,
cancelAll: cancel,
}
}

func (w *BackgroundWorkers) GracefulStop() {
w.l.Info("Shutting down")
w.logger.Info("Shutting down")

w.cancelAll()
w.workersWg.Wait()

w.l.Info("Shutdown complete")
w.logger.Info("Shutdown complete")
}

func (w *BackgroundWorkers) StartFillGaps(peers []p2p.PeerI, interval time.Duration, retentionDays int, blockRequestingCh chan<- BlockRequest) {
Expand All @@ -46,20 +47,20 @@ func (w *BackgroundWorkers) StartFillGaps(peers []p2p.PeerI, interval time.Durat
go func() {
defer w.workersWg.Done()

t := time.NewTicker(interval)
ticker := time.NewTicker(interval)
i := 0

for {
select {
case <-t.C:
case <-ticker.C:
i = i % len(peers)
err := w.fillGaps(peers[i], retentionDays, blockRequestingCh)
if err != nil {
w.l.Error("failed to fill blocks gaps", slog.String("err", err.Error()))
w.logger.Error("failed to fill blocks gaps", slog.String("err", err.Error()))
}

i++
t.Reset(interval)
ticker.Reset(interval)

case <-w.ctx.Done():
return
Expand All @@ -75,7 +76,7 @@ func (w *BackgroundWorkers) fillGaps(peer p2p.PeerI, retentionDays int, blockReq
)

heightRange := retentionDays * hoursPerDay * blocksPerHour
blockHeightGaps, err := w.s.GetBlockGaps(w.ctx, heightRange)
blockHeightGaps, err := w.store.GetBlockGaps(w.ctx, heightRange)
if err != nil || len(blockHeightGaps) == 0 {
return err
}
Expand All @@ -85,7 +86,7 @@ func (w *BackgroundWorkers) fillGaps(peer p2p.PeerI, retentionDays int, blockReq
break
}

w.l.Info("adding request for missing block to request channel",
w.logger.Info("adding request for missing block to request channel",
slog.String("hash", block.Hash.String()),
slog.Uint64("height", block.Height),
slog.String("peer", peer.String()),
Expand Down
8 changes: 3 additions & 5 deletions internal/metamorph/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package metamorph
import (
"context"
"errors"
"fmt"
"log/slog"
"os"
"runtime"
Expand Down Expand Up @@ -213,7 +212,7 @@ func (m *Metamorph) GetTransactionStatus(ctx context.Context, txID string) (txSt
return txStatus, nil
}

// GetTransactionStatusეს gets the statusეს of all transactions.
// GetTransactionStatuses gets the status of all transactions.
func (m *Metamorph) GetTransactionStatuses(ctx context.Context, txIDs []string) (txStatus []*TransactionStatus, err error) {
ctx, span := tracing.StartTracing(ctx, "GetTransactionStatus", m.tracingEnabled, append(m.tracingAttributes, attribute.String("txIDs", txIDs[0]))...)
defer func() {
Expand Down Expand Up @@ -284,7 +283,7 @@ func (m *Metamorph) SubmitTransaction(ctx context.Context, tx *sdkTx.Transaction

deadline, _ := ctx.Deadline()
// increase time to make sure that expiration happens from inside the metramorph function
newDeadline := deadline.Add(time.Second * 10)
newDeadline := deadline.Add(time.Second * 30)

// Create a new context with the updated deadline
newCtx, newCancel := context.WithDeadline(context.Background(), newDeadline)
Expand All @@ -294,7 +293,6 @@ func (m *Metamorph) SubmitTransaction(ctx context.Context, tx *sdkTx.Transaction
if err != nil {
return nil, err
}
fmt.Println("shotuna response", response)
txStatus = &TransactionStatus{
TxID: response.GetTxid(),
Status: response.GetStatus().String(),
Expand Down Expand Up @@ -347,7 +345,7 @@ func (m *Metamorph) SubmitTransactions(ctx context.Context, txs sdkTx.Transactio

deadline, _ := ctx.Deadline()
// decrease time to get initial deadline
newDeadline := deadline.Add(time.Second * 10)
newDeadline := deadline.Add(time.Second * 30)

// increase time to make sure that expiration happens from inside the metramorph function
newCtx, newCancel := context.WithDeadline(context.Background(), newDeadline)
Expand Down
12 changes: 0 additions & 12 deletions internal/metamorph/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,15 +274,6 @@ func (p *Processor) StartProcessMinedCallbacks() {

txsBlocks = append(txsBlocks, txBlock)

fmt.Println("shotuna", time.Now())
// if we have a pending request with given transaction hash, provide mined status
if len(txBlock.TransactionHash) != 0 {
hash, err := chainhash.NewHash(txBlock.TransactionHash)
if err == nil {
fmt.Println("shotuna 3", time.Now(), hash.String())
}
}

if len(txsBlocks) < p.processMinedBatchSize {
continue
}
Expand Down Expand Up @@ -325,7 +316,6 @@ func (p *Processor) updateMined(ctx context.Context, txsBlocks []*blocktx_api.Tr

for _, data := range updatedData {
// if we have a pending request with given transaction hash, provide mined status
fmt.Println("shota meore", time.Now(), data.Hash.String())
p.responseProcessor.UpdateStatus(data.Hash, StatusAndError{
Hash: data.Hash,
Status: metamorph_api.Status_MINED,
Expand Down Expand Up @@ -754,8 +744,6 @@ func (p *Processor) ProcessTransaction(ctx context.Context, req *ProcessorReques
tracing.EndTracing(span, err)
}()

dl, ok := ctx.Deadline()
fmt.Println("shota registering", dl, ok)
statusResponse := NewStatusResponse(ctx, req.Data.Hash, req.ResponseChannel)

// check if tx already stored, return it
Expand Down
18 changes: 1 addition & 17 deletions internal/metamorph/response_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@ package metamorph

import (
"context"
"fmt"
"sync"
"time"

"github.com/libsv/go-p2p/chaincfg/chainhash"
)
Expand All @@ -28,12 +26,11 @@ func (r *StatusResponse) UpdateStatus(statusAndError StatusAndError) {
if r.statusCh == nil || r.ctx == nil {
return
}
fmt.Println("shotuna sending status")

select {
case <-r.ctx.Done():
return
default:
fmt.Println("shotuna sending status", statusAndError.Status)
r.statusCh <- StatusAndError{
Hash: r.Hash,
Status: statusAndError.Status,
Expand All @@ -56,9 +53,6 @@ func (p *ResponseProcessor) Add(statusResponse *StatusResponse) {
return
}

dl, ok := statusResponse.ctx.Deadline()
fmt.Println("shota registering 2", dl, ok)

_, loaded := p.resMap.LoadOrStore(*statusResponse.Hash, statusResponse)
if loaded {
return
Expand All @@ -67,31 +61,21 @@ func (p *ResponseProcessor) Add(statusResponse *StatusResponse) {
// we no longer need status response object after response has been returned
go func() {
<-statusResponse.ctx.Done()
fmt.Println("shota expired", time.Now())
p.resMap.Delete(*statusResponse.Hash)
}()
}

func (p *ResponseProcessor) UpdateStatus(hash *chainhash.Hash, statusAndError StatusAndError) (found bool) {
val, ok := p.resMap.Load(*hash)
p.resMap.Range(func(key, value any) bool {
fmt.Println(key, value)
return true
})
if !ok {
fmt.Println("shota hash not found")
return false
}

statusResponse, ok := val.(*StatusResponse)
fmt.Println("shota hash not found 2")

if !ok {
return false
}

fmt.Println("shota hash found 3")

go statusResponse.UpdateStatus(statusAndError)
return true
}
Expand Down
Loading

0 comments on commit c812e52

Please sign in to comment.