diff --git a/batcher.go b/batcher.go new file mode 100644 index 0000000..7f0e486 --- /dev/null +++ b/batcher.go @@ -0,0 +1,90 @@ +package p2p + +import ( + "context" + "errors" + "sync" + "time" + + "github.com/libsv/go-p2p/chaincfg/chainhash" +) + +// BatchProcessor for chain hashes runs a specified function on a batch of chain hashes in specified intervals or if the batch has reached a specified size +type BatchProcessor struct { + fn func([]*chainhash.Hash) + batchSize int + runInterval time.Duration + batch []*chainhash.Hash + hashChannel chan *chainhash.Hash + wg *sync.WaitGroup + cancelAll context.CancelFunc + ctx context.Context +} + +func NewBatchProcessor(batchSize int, runInterval time.Duration, fn func(batch []*chainhash.Hash), bufferSize int) (*BatchProcessor, error) { + if batchSize == 0 { + return nil, errors.New("batch size must be greater than zero") + } + + b := &BatchProcessor{ + fn: fn, + batchSize: batchSize, + runInterval: runInterval, + hashChannel: make(chan *chainhash.Hash, bufferSize), + wg: &sync.WaitGroup{}, + } + ctx, cancel := context.WithCancel(context.Background()) + b.ctx = ctx + b.cancelAll = cancel + + b.start() + + return b, nil +} + +func (b *BatchProcessor) Put(item *chainhash.Hash) { + b.hashChannel <- item +} + +func (b *BatchProcessor) Shutdown() { + if b.cancelAll != nil { + b.cancelAll() + b.wg.Wait() + } +} + +func (b *BatchProcessor) runFunction() { + copyBatch := make([]*chainhash.Hash, len(b.batch)) + + copy(copyBatch, b.batch) + + b.batch = b.batch[:0] // Clear the batch slice without reallocating the underlying memory + + go b.fn(copyBatch) +} + +func (b *BatchProcessor) start() { + runTicker := time.NewTicker(b.runInterval) + b.wg.Add(1) + go func() { + defer b.wg.Done() + for { + select { + case <-b.ctx.Done(): + runTicker.Stop() + return + case item := <-b.hashChannel: + b.batch = append(b.batch, item) + + if len(b.batch) >= b.batchSize { + b.runFunction() + } + + case <-runTicker.C: + if len(b.batch) > 0 { + b.runFunction() + } + } + } + }() +} diff --git a/go.mod b/go.mod index 1ffeeea..388e9f3 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,6 @@ require ( github.com/cbeuw/connutil v0.0.0-20200411215123-966bfaa51ee3 github.com/cenkalti/backoff/v4 v4.2.1 github.com/davecgh/go-spew v1.1.1 - github.com/ordishs/go-utils v1.0.24 github.com/ory/dockertest/v3 v3.10.0 github.com/stretchr/testify v1.8.1 ) diff --git a/go.sum b/go.sum index f463d5a..625ffc9 100644 --- a/go.sum +++ b/go.sum @@ -68,8 +68,6 @@ github.com/opencontainers/runc v1.1.5 h1:L44KXEpKmfWDcS02aeGm8QNTFXTo2D+8MYGDIJ/ github.com/opencontainers/runc v1.1.5/go.mod h1:1J5XiS+vdZ3wCyZybsuxXZWGrgSr8fFJHLXuG2PsnNg= github.com/opencontainers/runtime-spec v1.0.3-0.20210326190908-1c3f411f0417/go.mod h1:jwyrGlmzljRJv/Fgzds9SsS/C5hL+LL3ko9hs6T5lQ0= github.com/opencontainers/selinux v1.10.0/go.mod h1:2i0OySw99QjzBBQByd1Gr9gSjvuho1lHsJxIJ3gGbJI= -github.com/ordishs/go-utils v1.0.24 h1:QIVYaaN4LE5SnCRbLii7OOrkC2Qqvg4FVKe9zzA3Yd8= -github.com/ordishs/go-utils v1.0.24/go.mod h1:k9G7Bbv2GwoOn9fwZx70yM5jwwIymkv+90FUKLudtyc= github.com/ory/dockertest/v3 v3.10.0 h1:4K3z2VMe8Woe++invjaTB7VRyQXQy5UY+loujO4aNE4= github.com/ory/dockertest/v3 v3.10.0/go.mod h1:nr57ZbRWMqfsdGdFNLHz5jjNdDb7VVFnzAeW1n5N1Lg= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= diff --git a/peer.go b/peer.go index 4173de8..120c099 100644 --- a/peer.go +++ b/peer.go @@ -16,11 +16,10 @@ import ( "time" "github.com/cenkalti/backoff/v4" + "github.com/libsv/go-p2p/bsvutil" "github.com/libsv/go-p2p/chaincfg/chainhash" "github.com/libsv/go-p2p/wire" - "github.com/ordishs/go-utils" - "github.com/ordishs/go-utils/batcher" ) const ( @@ -68,8 +67,8 @@ type Peer struct { sentVerAck atomic.Bool receivedVerAck atomic.Bool batchDelay time.Duration - invBatcher *batcher.Batcher[chainhash.Hash] - dataBatcher *batcher.Batcher[chainhash.Hash] + invBatchProcessor *BatchProcessor + dataBatchProcessor *BatchProcessor maximumMessageSize int64 isHealthy atomic.Bool userAgentName *string @@ -130,6 +129,17 @@ func NewPeer(logger *slog.Logger, address string, peerHandler PeerHandlerI, netw return nil, fmt.Errorf("failed to apply option, %v", err) } } + invBatchProcessor, err := NewBatchProcessor(500, p.batchDelay, p.sendInvBatch, 10000) + if err != nil { + return nil, fmt.Errorf("failed to create inv batch processor, %v", err) + } + p.invBatchProcessor = invBatchProcessor + + dataBatchProcessor, err := NewBatchProcessor(500, p.batchDelay, p.sendDataBatch, 10000) + if err != nil { + return nil, fmt.Errorf("failed to create data batch processor, %v", err) + } + p.dataBatchProcessor = dataBatchProcessor p.start() @@ -146,9 +156,6 @@ func (p *Peer) start() { p.startMonitorPingPong() - p.invBatcher = batcher.New(500, p.batchDelay, p.sendInvBatch, true) - p.dataBatcher = batcher.New(500, p.batchDelay, p.sendDataBatch, true) - if p.incomingConn != nil { go func() { err := p.connectAndStartReadWriteHandlers() @@ -304,7 +311,8 @@ func (p *Peer) Connecting() bool { } func (p *Peer) WriteMsg(msg wire.Message) error { - utils.SafeSend(p.writeChan, msg) + p.writeChan <- msg + return nil } @@ -601,11 +609,11 @@ func (p *Peer) handleGetDataMsg(dataMsg *wire.MsgGetData, logger *slog.Logger) { } func (p *Peer) AnnounceTransaction(hash *chainhash.Hash) { - p.invBatcher.Put(hash) + p.invBatchProcessor.Put(hash) } func (p *Peer) RequestTransaction(hash *chainhash.Hash) { - p.dataBatcher.Put(hash) + p.dataBatchProcessor.Put(hash) } func (p *Peer) AnnounceBlock(blockHash *chainhash.Hash) { @@ -871,5 +879,8 @@ func (p *Peer) Shutdown() { p.writerWg.Wait() p.readerWg.Wait() + p.invBatchProcessor.Shutdown() + p.dataBatchProcessor.Shutdown() + p.logger.Info("Shutdown complete") } diff --git a/test/data.go b/test/data.go index 528976f..1a92ceb 100644 --- a/test/data.go +++ b/test/data.go @@ -2,10 +2,8 @@ package test import ( "encoding/hex" - "time" "github.com/libsv/go-p2p/chaincfg/chainhash" - "github.com/ordishs/go-utils" ) var ( @@ -17,9 +15,4 @@ var ( TX2Hash, _ = chainhash.NewHashFromStr(TX2) TX2Raw = "01000000010000000000000000000000000000000000000000000000000000000000000000ffffffff1a0385c40b2f7461616c2e636f6d2f0fde4a8a8fbbf5c636010000ffffffff02c9d74425000000001976a914522cf9e7626d9bd8729e5a1398ece40dad1b6a2f88ac0000000000000000fda502006a04ac1eed884d53027b2276657273696f6e223a22302e31222c22686569676874223a3737313230352c22707265764d696e65724964223a22303365393264336535633366376264393435646662663438653761393933393362316266623366313166333830616533306432383665376666326165633561323730222c22707265764d696e65724964536967223a2233303435303232313030643736333630653464323133333163613836663031386330343665353763393338663139373735303734373333333533363062653337303438636165316166333032323030626536363034353430323162663934363465393966356139353831613938633963663439353430373539386335396234373334623266646234383262663937222c226d696e65724964223a22303365393264336535633366376264393435646662663438653761393933393362316266623366313166333830616533306432383665376666326165633561323730222c2276637478223a7b2274784964223a2235373962343335393235613930656533396133376265336230306239303631653734633330633832343133663664306132303938653162656137613235313566222c22766f7574223a307d2c226d696e6572436f6e74616374223a7b22656d61696c223a22696e666f407461616c2e636f6d222c226e616d65223a225441414c20446973747269627574656420496e666f726d6174696f6e20546563686e6f6c6f67696573222c226d65726368616e74415049456e64506f696e74223a2268747470733a2f2f6d65726368616e746170692e7461616c2e636f6d2f227d7d473045022100f7a4d02865a503d202b4e3d1b045a7efaef830b3d27efd6482c939c376e58c7802202d9332ca0a8b90001fa143512b4b6f3fe00f227d9d882861cc87975cb8778da800000000" TX2RawBytes, _ = hex.DecodeString(TX2Raw) - TX3 = "3f63399b3d9d94ba9c5b7398b9328dcccfcfd50f07ad8b214e766168c391642b" - TX3Bytes, _ = utils.DecodeAndReverseHexString(TX3) - TX4 = "88eab41a8d0b7b4bc395f8f988ea3d6e63c8bc339526fd2f00cb7ce6fd7df0f7" - TX4Bytes, _ = utils.DecodeAndReverseHexString(TX4) - Time = time.Date(2009, 1, 03, 18, 15, 05, 0, time.UTC) )