Skip to content

Commit

Permalink
refactor(ARCO-212): Introduce batch processor for chain hashes
Browse files Browse the repository at this point in the history
  • Loading branch information
boecklim committed Nov 26, 2024
1 parent 08a38f5 commit 0a086c0
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 20 deletions.
90 changes: 90 additions & 0 deletions batcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package p2p

import (
"context"
"errors"
"sync"
"time"

"github.com/libsv/go-p2p/chaincfg/chainhash"
)

// BatchProcessor for chain hashes runs a specified function on a batch of chain hashes in specified intervals or if the batch has reached a specified size
type BatchProcessor struct {
fn func([]*chainhash.Hash)
batchSize int
runInterval time.Duration
batch []*chainhash.Hash
hashChannel chan *chainhash.Hash
wg *sync.WaitGroup
cancelAll context.CancelFunc
ctx context.Context
}

func NewBatchProcessor(batchSize int, runInterval time.Duration, fn func(batch []*chainhash.Hash), bufferSize int) (*BatchProcessor, error) {
if batchSize == 0 {
return nil, errors.New("batch size must be greater than zero")
}

b := &BatchProcessor{
fn: fn,
batchSize: batchSize,
runInterval: runInterval,
hashChannel: make(chan *chainhash.Hash, bufferSize),
wg: &sync.WaitGroup{},
}
ctx, cancel := context.WithCancel(context.Background())
b.ctx = ctx
b.cancelAll = cancel

b.start()

return b, nil
}

func (b *BatchProcessor) Put(item *chainhash.Hash) {
b.hashChannel <- item
}

func (b *BatchProcessor) Shutdown() {
if b.cancelAll != nil {
b.cancelAll()
b.wg.Wait()
}
}

func (b *BatchProcessor) runFunction() {
copyBatch := make([]*chainhash.Hash, len(b.batch))

copy(copyBatch, b.batch)

b.batch = b.batch[:0] // Clear the batch slice without reallocating the underlying memory

go b.fn(copyBatch)
}

func (b *BatchProcessor) start() {
runTicker := time.NewTicker(b.runInterval)
b.wg.Add(1)
go func() {
defer b.wg.Done()
for {
select {
case <-b.ctx.Done():
runTicker.Stop()
return
case item := <-b.hashChannel:
b.batch = append(b.batch, item)

if len(b.batch) >= b.batchSize {
b.runFunction()
}

case <-runTicker.C:
if len(b.batch) > 0 {
b.runFunction()
}
}
}
}()
}
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ require (
github.com/cbeuw/connutil v0.0.0-20200411215123-966bfaa51ee3
github.com/cenkalti/backoff/v4 v4.2.1
github.com/davecgh/go-spew v1.1.1
github.com/ordishs/go-utils v1.0.24
github.com/ory/dockertest/v3 v3.10.0
github.com/stretchr/testify v1.8.1
)
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,6 @@ github.com/opencontainers/runc v1.1.5 h1:L44KXEpKmfWDcS02aeGm8QNTFXTo2D+8MYGDIJ/
github.com/opencontainers/runc v1.1.5/go.mod h1:1J5XiS+vdZ3wCyZybsuxXZWGrgSr8fFJHLXuG2PsnNg=
github.com/opencontainers/runtime-spec v1.0.3-0.20210326190908-1c3f411f0417/go.mod h1:jwyrGlmzljRJv/Fgzds9SsS/C5hL+LL3ko9hs6T5lQ0=
github.com/opencontainers/selinux v1.10.0/go.mod h1:2i0OySw99QjzBBQByd1Gr9gSjvuho1lHsJxIJ3gGbJI=
github.com/ordishs/go-utils v1.0.24 h1:QIVYaaN4LE5SnCRbLii7OOrkC2Qqvg4FVKe9zzA3Yd8=
github.com/ordishs/go-utils v1.0.24/go.mod h1:k9G7Bbv2GwoOn9fwZx70yM5jwwIymkv+90FUKLudtyc=
github.com/ory/dockertest/v3 v3.10.0 h1:4K3z2VMe8Woe++invjaTB7VRyQXQy5UY+loujO4aNE4=
github.com/ory/dockertest/v3 v3.10.0/go.mod h1:nr57ZbRWMqfsdGdFNLHz5jjNdDb7VVFnzAeW1n5N1Lg=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
31 changes: 21 additions & 10 deletions peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@ import (
"time"

"github.com/cenkalti/backoff/v4"

"github.com/libsv/go-p2p/bsvutil"
"github.com/libsv/go-p2p/chaincfg/chainhash"
"github.com/libsv/go-p2p/wire"
"github.com/ordishs/go-utils"
"github.com/ordishs/go-utils/batcher"
)

const (
Expand Down Expand Up @@ -68,8 +67,8 @@ type Peer struct {
sentVerAck atomic.Bool
receivedVerAck atomic.Bool
batchDelay time.Duration
invBatcher *batcher.Batcher[chainhash.Hash]
dataBatcher *batcher.Batcher[chainhash.Hash]
invBatchProcessor *BatchProcessor
dataBatchProcessor *BatchProcessor
maximumMessageSize int64
isHealthy atomic.Bool
userAgentName *string
Expand Down Expand Up @@ -130,6 +129,17 @@ func NewPeer(logger *slog.Logger, address string, peerHandler PeerHandlerI, netw
return nil, fmt.Errorf("failed to apply option, %v", err)
}
}
invBatchProcessor, err := NewBatchProcessor(500, p.batchDelay, p.sendInvBatch, 10000)
if err != nil {
return nil, fmt.Errorf("failed to create inv batch processor, %v", err)
}
p.invBatchProcessor = invBatchProcessor

dataBatchProcessor, err := NewBatchProcessor(500, p.batchDelay, p.sendDataBatch, 10000)
if err != nil {
return nil, fmt.Errorf("failed to create data batch processor, %v", err)
}
p.dataBatchProcessor = dataBatchProcessor

p.start()

Expand All @@ -146,9 +156,6 @@ func (p *Peer) start() {

p.startMonitorPingPong()

p.invBatcher = batcher.New(500, p.batchDelay, p.sendInvBatch, true)
p.dataBatcher = batcher.New(500, p.batchDelay, p.sendDataBatch, true)

if p.incomingConn != nil {
go func() {
err := p.connectAndStartReadWriteHandlers()
Expand Down Expand Up @@ -304,7 +311,8 @@ func (p *Peer) Connecting() bool {
}

func (p *Peer) WriteMsg(msg wire.Message) error {
utils.SafeSend(p.writeChan, msg)
p.writeChan <- msg

return nil
}

Expand Down Expand Up @@ -601,11 +609,11 @@ func (p *Peer) handleGetDataMsg(dataMsg *wire.MsgGetData, logger *slog.Logger) {
}

func (p *Peer) AnnounceTransaction(hash *chainhash.Hash) {
p.invBatcher.Put(hash)
p.invBatchProcessor.Put(hash)
}

func (p *Peer) RequestTransaction(hash *chainhash.Hash) {
p.dataBatcher.Put(hash)
p.dataBatchProcessor.Put(hash)
}

func (p *Peer) AnnounceBlock(blockHash *chainhash.Hash) {
Expand Down Expand Up @@ -871,5 +879,8 @@ func (p *Peer) Shutdown() {
p.writerWg.Wait()
p.readerWg.Wait()

p.invBatchProcessor.Shutdown()
p.dataBatchProcessor.Shutdown()

p.logger.Info("Shutdown complete")
}
7 changes: 0 additions & 7 deletions test/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@ package test

import (
"encoding/hex"
"time"

"github.com/libsv/go-p2p/chaincfg/chainhash"
"github.com/ordishs/go-utils"
)

var (
Expand All @@ -17,9 +15,4 @@ var (
TX2Hash, _ = chainhash.NewHashFromStr(TX2)
TX2Raw = "01000000010000000000000000000000000000000000000000000000000000000000000000ffffffff1a0385c40b2f7461616c2e636f6d2f0fde4a8a8fbbf5c636010000ffffffff02c9d74425000000001976a914522cf9e7626d9bd8729e5a1398ece40dad1b6a2f88ac0000000000000000fda502006a04ac1eed884d53027b2276657273696f6e223a22302e31222c22686569676874223a3737313230352c22707265764d696e65724964223a22303365393264336535633366376264393435646662663438653761393933393362316266623366313166333830616533306432383665376666326165633561323730222c22707265764d696e65724964536967223a2233303435303232313030643736333630653464323133333163613836663031386330343665353763393338663139373735303734373333333533363062653337303438636165316166333032323030626536363034353430323162663934363465393966356139353831613938633963663439353430373539386335396234373334623266646234383262663937222c226d696e65724964223a22303365393264336535633366376264393435646662663438653761393933393362316266623366313166333830616533306432383665376666326165633561323730222c2276637478223a7b2274784964223a2235373962343335393235613930656533396133376265336230306239303631653734633330633832343133663664306132303938653162656137613235313566222c22766f7574223a307d2c226d696e6572436f6e74616374223a7b22656d61696c223a22696e666f407461616c2e636f6d222c226e616d65223a225441414c20446973747269627574656420496e666f726d6174696f6e20546563686e6f6c6f67696573222c226d65726368616e74415049456e64506f696e74223a2268747470733a2f2f6d65726368616e746170692e7461616c2e636f6d2f227d7d473045022100f7a4d02865a503d202b4e3d1b045a7efaef830b3d27efd6482c939c376e58c7802202d9332ca0a8b90001fa143512b4b6f3fe00f227d9d882861cc87975cb8778da800000000"
TX2RawBytes, _ = hex.DecodeString(TX2Raw)
TX3 = "3f63399b3d9d94ba9c5b7398b9328dcccfcfd50f07ad8b214e766168c391642b"
TX3Bytes, _ = utils.DecodeAndReverseHexString(TX3)
TX4 = "88eab41a8d0b7b4bc395f8f988ea3d6e63c8bc339526fd2f00cb7ce6fd7df0f7"
TX4Bytes, _ = utils.DecodeAndReverseHexString(TX4)
Time = time.Date(2009, 1, 03, 18, 15, 05, 0, time.UTC)
)

0 comments on commit 0a086c0

Please sign in to comment.