Skip to content

Commit

Permalink
feat(ARCO-291): fix listener
Browse files Browse the repository at this point in the history
  • Loading branch information
boecklim committed Jan 17, 2025
1 parent a80f843 commit e8d128e
Showing 1 changed file with 13 additions and 7 deletions.
20 changes: 13 additions & 7 deletions internal/blocktx/bcnet/mcast/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,22 @@ import (
"errors"
"log/slog"
"os"
"time"

"github.com/libsv/go-p2p/wire"

"github.com/bitcoin-sv/arc/internal/blocktx/bcnet"
"github.com/bitcoin-sv/arc/internal/blocktx/store"
"github.com/bitcoin-sv/arc/internal/multicast"
"github.com/libsv/go-p2p/wire"
)

var ErrUnableToCastWireMessage = errors.New("unable to cast wire.Message to blockchain.BlockMessage")

const (
lockTime = 5 * time.Minute
maxBlocksInProgress = 1
)

var _ multicast.MessageHandlerI = (*Listener)(nil)

// Listener is a multicast message listener specifically designed for processing blocks messages.
Expand Down Expand Up @@ -83,11 +90,12 @@ func (l *Listener) OnReceive(msg wire.Message) {
hash := blockMsg.Hash

l.logger.Info("Received BLOCK msg from multicast group", slog.String("hash", hash.String()))

processedBy, err := l.store.SetBlockProcessing(context.Background(), hash, l.hostname)
processedBy, err := l.store.SetBlockProcessing(context.Background(), hash, l.hostname, lockTime, maxBlocksInProgress)
if err != nil {
// block is already being processed by another blocktx instance
if errors.Is(err, store.ErrBlockProcessingDuplicateKey) {
if errors.Is(err, store.ErrBlockProcessingMaximumReached) {
l.logger.Debug("block processing maximum reached", slog.String("hash", hash.String()), slog.String("processed_by", processedBy))
return
} else if errors.Is(err, store.ErrBlockProcessingInProgress) {
l.logger.Debug("block processing already in progress", slog.String("hash", hash.String()), slog.String("processed_by", processedBy))
return
}
Expand All @@ -96,8 +104,6 @@ func (l *Listener) OnReceive(msg wire.Message) {
return
}

// p.startBlockProcessGuard(p.ctx, hash) // handle it somehow

l.receiveCh <- blockMsg
}

Expand Down

0 comments on commit e8d128e

Please sign in to comment.