Skip to content

Commit

Permalink
discovery+graph: track job set dependencies in ValidationBarrier
Browse files Browse the repository at this point in the history
This commit does two things:
- removes the concept of allow / deny. Having this in place was a
  minor optimization and removing it makes the solution simpler.
- changes the job dependency tracking to track sets of abstact
  parent jobs rather than individual parent jobs.

As a note, the purpose of the ValidationBarrier is that it allows us
to launch gossip validation jobs in goroutines while still ensuring
that the validation order of these goroutines is adhered to when it
comes to validating ChannelAnnouncement _before_ ChannelUpdate and
_before_ NodeAnnouncement.
  • Loading branch information
Crypt-iQ committed Dec 6, 2024
1 parent 94bc601 commit 6f66837
Show file tree
Hide file tree
Showing 3 changed files with 495 additions and 132 deletions.
35 changes: 25 additions & 10 deletions discovery/gossiper.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,9 @@ type AuthenticatedGossiper struct {
// AuthenticatedGossiper lock.
chanUpdateRateLimiter map[uint64][2]*rate.Limiter

// vb is used to enforce job dependency ordering of gossip messages.
vb *graph.ValidationBarrier

sync.Mutex
}

Expand All @@ -537,6 +540,8 @@ func New(cfg Config, selfKeyDesc *keychain.KeyDescriptor) *AuthenticatedGossiper
banman: newBanman(),
}

gossiper.vb = graph.NewValidationBarrier(1000, gossiper.quit)

gossiper.syncMgr = newSyncManager(&SyncManagerCfg{
ChainHash: cfg.ChainHash,
ChanSeries: cfg.ChanSeries,
Expand Down Expand Up @@ -1398,10 +1403,6 @@ func (d *AuthenticatedGossiper) networkHandler() {
log.Errorf("Unable to rebroadcast stale announcements: %v", err)
}

// We'll use this validation to ensure that we process jobs in their
// dependency order during parallel validation.
validationBarrier := graph.NewValidationBarrier(1000, d.quit)

for {
select {
// A new policy update has arrived. We'll commit it to the
Expand Down Expand Up @@ -1470,11 +1471,17 @@ func (d *AuthenticatedGossiper) networkHandler() {
// We'll set up any dependent, and wait until a free
// slot for this job opens up, this allow us to not
// have thousands of goroutines active.
validationBarrier.InitJobDependencies(announcement.msg)
annJobID, err := d.vb.InitJobDependencies(
announcement.msg,
)
if err != nil {
announcement.err <- err
continue
}

d.wg.Add(1)
go d.handleNetworkMessages(
announcement, &announcements, validationBarrier,
announcement, &announcements, annJobID,
)

// The trickle timer has ticked, which indicates we should
Expand Down Expand Up @@ -1525,18 +1532,18 @@ func (d *AuthenticatedGossiper) networkHandler() {
//
// NOTE: must be run as a goroutine.
func (d *AuthenticatedGossiper) handleNetworkMessages(nMsg *networkMsg,
deDuped *deDupedAnnouncements, vb *graph.ValidationBarrier) {
deDuped *deDupedAnnouncements, jobID graph.JobID) {

defer d.wg.Done()
defer vb.CompleteJob()
defer d.vb.CompleteJob()

// We should only broadcast this message forward if it originated from
// us or it wasn't received as part of our initial historical sync.
shouldBroadcast := !nMsg.isRemote || d.syncMgr.IsGraphSynced()

// If this message has an existing dependency, then we'll wait until
// that has been fully validated before we proceed.
err := vb.WaitForDependants(nMsg.msg)
err := d.vb.WaitForParents(jobID, nMsg.msg)
if err != nil {
log.Debugf("Validating network message %s got err: %v",
nMsg.msg.MsgType(), err)
Expand Down Expand Up @@ -1566,7 +1573,15 @@ func (d *AuthenticatedGossiper) handleNetworkMessages(nMsg *networkMsg,

// If this message had any dependencies, then we can now signal them to
// continue.
vb.SignalDependants(nMsg.msg, allow)
err = d.vb.SignalDependents(nMsg.msg, jobID)
if err != nil {
// Something is wrong if SignalDependents returns an error.
log.Errorf("SignalDependents returned error for msg=%v with "+
"JobID=%v", spew.Sdump(nMsg.msg), jobID)

nMsg.err <- err
return
}

// If the announcement was accepted, then add the emitted announcements
// to our announce batch to be broadcast once the trickle timer ticks
Expand Down
Loading

0 comments on commit 6f66837

Please sign in to comment.