Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(ARCO-212): Introduce batch processor for chain hashes #34

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 90 additions & 0 deletions batcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package p2p

import (
"context"
"errors"
"sync"
"time"

"github.com/libsv/go-p2p/chaincfg/chainhash"
)

// BatchProcessor for chain hashes runs a specified function on a batch of chain hashes in specified intervals or if the batch has reached a specified size
type BatchProcessor struct {
fn func([]*chainhash.Hash)
batchSize int
runInterval time.Duration
batch []*chainhash.Hash
hashChannel chan *chainhash.Hash
wg *sync.WaitGroup
cancelAll context.CancelFunc
ctx context.Context
}

func NewBatchProcessor(batchSize int, runInterval time.Duration, fn func(batch []*chainhash.Hash), bufferSize int) (*BatchProcessor, error) {
if batchSize == 0 {
return nil, errors.New("batch size must be greater than zero")
}

b := &BatchProcessor{
fn: fn,
batchSize: batchSize,
runInterval: runInterval,
hashChannel: make(chan *chainhash.Hash, bufferSize),
wg: &sync.WaitGroup{},
}
ctx, cancel := context.WithCancel(context.Background())
b.ctx = ctx
b.cancelAll = cancel

b.start()

return b, nil
}

func (b *BatchProcessor) Put(item *chainhash.Hash) {
b.hashChannel <- item
}

func (b *BatchProcessor) Shutdown() {
if b.cancelAll != nil {
b.cancelAll()
b.wg.Wait()
}
}

func (b *BatchProcessor) runFunction() {
copyBatch := make([]*chainhash.Hash, len(b.batch))

copy(copyBatch, b.batch)

b.batch = b.batch[:0] // Clear the batch slice without reallocating the underlying memory

go b.fn(copyBatch)
}

func (b *BatchProcessor) start() {
runTicker := time.NewTicker(b.runInterval)
b.wg.Add(1)
go func() {
defer b.wg.Done()
for {
select {
case <-b.ctx.Done():
runTicker.Stop()
return
case item := <-b.hashChannel:
b.batch = append(b.batch, item)

if len(b.batch) >= b.batchSize {
b.runFunction()
}

case <-runTicker.C:
if len(b.batch) > 0 {
b.runFunction()
}
}
}
}()
}
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ require (
github.com/cbeuw/connutil v0.0.0-20200411215123-966bfaa51ee3
github.com/cenkalti/backoff/v4 v4.2.1
github.com/davecgh/go-spew v1.1.1
github.com/ordishs/go-utils v1.0.24
github.com/ory/dockertest/v3 v3.10.0
github.com/stretchr/testify v1.8.1
)
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,6 @@ github.com/opencontainers/runc v1.1.5 h1:L44KXEpKmfWDcS02aeGm8QNTFXTo2D+8MYGDIJ/
github.com/opencontainers/runc v1.1.5/go.mod h1:1J5XiS+vdZ3wCyZybsuxXZWGrgSr8fFJHLXuG2PsnNg=
github.com/opencontainers/runtime-spec v1.0.3-0.20210326190908-1c3f411f0417/go.mod h1:jwyrGlmzljRJv/Fgzds9SsS/C5hL+LL3ko9hs6T5lQ0=
github.com/opencontainers/selinux v1.10.0/go.mod h1:2i0OySw99QjzBBQByd1Gr9gSjvuho1lHsJxIJ3gGbJI=
github.com/ordishs/go-utils v1.0.24 h1:QIVYaaN4LE5SnCRbLii7OOrkC2Qqvg4FVKe9zzA3Yd8=
github.com/ordishs/go-utils v1.0.24/go.mod h1:k9G7Bbv2GwoOn9fwZx70yM5jwwIymkv+90FUKLudtyc=
github.com/ory/dockertest/v3 v3.10.0 h1:4K3z2VMe8Woe++invjaTB7VRyQXQy5UY+loujO4aNE4=
github.com/ory/dockertest/v3 v3.10.0/go.mod h1:nr57ZbRWMqfsdGdFNLHz5jjNdDb7VVFnzAeW1n5N1Lg=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
31 changes: 21 additions & 10 deletions peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@ import (
"time"

"github.com/cenkalti/backoff/v4"

"github.com/libsv/go-p2p/bsvutil"
"github.com/libsv/go-p2p/chaincfg/chainhash"
"github.com/libsv/go-p2p/wire"
"github.com/ordishs/go-utils"
"github.com/ordishs/go-utils/batcher"
)

const (
Expand Down Expand Up @@ -68,8 +67,8 @@ type Peer struct {
sentVerAck atomic.Bool
receivedVerAck atomic.Bool
batchDelay time.Duration
invBatcher *batcher.Batcher[chainhash.Hash]
dataBatcher *batcher.Batcher[chainhash.Hash]
invBatchProcessor *BatchProcessor
dataBatchProcessor *BatchProcessor
maximumMessageSize int64
isHealthy atomic.Bool
userAgentName *string
Expand Down Expand Up @@ -130,6 +129,17 @@ func NewPeer(logger *slog.Logger, address string, peerHandler PeerHandlerI, netw
return nil, fmt.Errorf("failed to apply option, %v", err)
}
}
invBatchProcessor, err := NewBatchProcessor(500, p.batchDelay, p.sendInvBatch, 10000)
if err != nil {
return nil, fmt.Errorf("failed to create inv batch processor, %v", err)
}
p.invBatchProcessor = invBatchProcessor

dataBatchProcessor, err := NewBatchProcessor(500, p.batchDelay, p.sendDataBatch, 10000)
if err != nil {
return nil, fmt.Errorf("failed to create data batch processor, %v", err)
}
p.dataBatchProcessor = dataBatchProcessor

p.start()

Expand All @@ -146,9 +156,6 @@ func (p *Peer) start() {

p.startMonitorPingPong()

p.invBatcher = batcher.New(500, p.batchDelay, p.sendInvBatch, true)
p.dataBatcher = batcher.New(500, p.batchDelay, p.sendDataBatch, true)

if p.incomingConn != nil {
go func() {
err := p.connectAndStartReadWriteHandlers()
Expand Down Expand Up @@ -304,7 +311,8 @@ func (p *Peer) Connecting() bool {
}

func (p *Peer) WriteMsg(msg wire.Message) error {
utils.SafeSend(p.writeChan, msg)
p.writeChan <- msg

return nil
}

Expand Down Expand Up @@ -601,11 +609,11 @@ func (p *Peer) handleGetDataMsg(dataMsg *wire.MsgGetData, logger *slog.Logger) {
}

func (p *Peer) AnnounceTransaction(hash *chainhash.Hash) {
p.invBatcher.Put(hash)
p.invBatchProcessor.Put(hash)
}

func (p *Peer) RequestTransaction(hash *chainhash.Hash) {
p.dataBatcher.Put(hash)
p.dataBatchProcessor.Put(hash)
}

func (p *Peer) AnnounceBlock(blockHash *chainhash.Hash) {
Expand Down Expand Up @@ -871,5 +879,8 @@ func (p *Peer) Shutdown() {
p.writerWg.Wait()
p.readerWg.Wait()

p.invBatchProcessor.Shutdown()
p.dataBatchProcessor.Shutdown()

p.logger.Info("Shutdown complete")
}
7 changes: 0 additions & 7 deletions test/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@ package test

import (
"encoding/hex"
"time"

"github.com/libsv/go-p2p/chaincfg/chainhash"
"github.com/ordishs/go-utils"
)

var (
Expand All @@ -17,9 +15,4 @@ var (
TX2Hash, _ = chainhash.NewHashFromStr(TX2)
TX2Raw = "01000000010000000000000000000000000000000000000000000000000000000000000000ffffffff1a0385c40b2f7461616c2e636f6d2f0fde4a8a8fbbf5c636010000ffffffff02c9d74425000000001976a914522cf9e7626d9bd8729e5a1398ece40dad1b6a2f88ac0000000000000000fda502006a04ac1eed884d53027b2276657273696f6e223a22302e31222c22686569676874223a3737313230352c22707265764d696e65724964223a22303365393264336535633366376264393435646662663438653761393933393362316266623366313166333830616533306432383665376666326165633561323730222c22707265764d696e65724964536967223a2233303435303232313030643736333630653464323133333163613836663031386330343665353763393338663139373735303734373333333533363062653337303438636165316166333032323030626536363034353430323162663934363465393966356139353831613938633963663439353430373539386335396234373334623266646234383262663937222c226d696e65724964223a22303365393264336535633366376264393435646662663438653761393933393362316266623366313166333830616533306432383665376666326165633561323730222c2276637478223a7b2274784964223a2235373962343335393235613930656533396133376265336230306239303631653734633330633832343133663664306132303938653162656137613235313566222c22766f7574223a307d2c226d696e6572436f6e74616374223a7b22656d61696c223a22696e666f407461616c2e636f6d222c226e616d65223a225441414c20446973747269627574656420496e666f726d6174696f6e20546563686e6f6c6f67696573222c226d65726368616e74415049456e64506f696e74223a2268747470733a2f2f6d65726368616e746170692e7461616c2e636f6d2f227d7d473045022100f7a4d02865a503d202b4e3d1b045a7efaef830b3d27efd6482c939c376e58c7802202d9332ca0a8b90001fa143512b4b6f3fe00f227d9d882861cc87975cb8778da800000000"
TX2RawBytes, _ = hex.DecodeString(TX2Raw)
TX3 = "3f63399b3d9d94ba9c5b7398b9328dcccfcfd50f07ad8b214e766168c391642b"
TX3Bytes, _ = utils.DecodeAndReverseHexString(TX3)
TX4 = "88eab41a8d0b7b4bc395f8f988ea3d6e63c8bc339526fd2f00cb7ce6fd7df0f7"
TX4Bytes, _ = utils.DecodeAndReverseHexString(TX4)
Time = time.Date(2009, 1, 03, 18, 15, 05, 0, time.UTC)
)