
Separately track connecting fronts and do not clear them on new configs #50

Merged: 26 commits into main, Dec 6, 2024

Conversation

@myleshorton (Contributor)

The current fronted code suffers from several big issues. First, we keep a huge list of fronts sorted by last connect time, but when we're connecting from multiple goroutines, new connections may be completing all the time while we just iterate through the master list. We added re-sorting of that list during iteration, but that's pretty awkward code, and it's also super slow to re-sort thousands of fronts on every iteration.

Second, especially on startup, we very quickly load either the embedded or the saved global config, soon followed by a global config fetched from the network. Previously, the newly fetched config would just abort any previous work testing fronts and start from scratch. This code instead prepends the new fronts to the master list and keeps iterating.
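In sketch form, that prepend-and-keep-iterating behavior might look something like this (the fields and the body of addFronts here are assumptions for illustration, not necessarily the PR's exact code):

// Illustrative sketch: fronts from a newly fetched config go to the head of
// the list instead of replacing it, so in-flight vetting isn't thrown away.
func (f *fronted) addFronts(newFronts []Front) {
	f.frontsMu.Lock()
	defer f.frontsMu.Unlock()
	f.fronts = append(newFronts, f.fronts...)
}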

@coveralls

coveralls commented Nov 30, 2024

Coverage Status: 83.697% (+2.2%), up from 81.467%, when pulling 7bc4f00 on myles/track-connected into a556be1 on main.

}
}
}
}
Contributor Author (myleshorton)

The above is a key section

return
case <-time.After(time.Duration(rand.IntN(12000)) * time.Millisecond):
}
}
Contributor Author (myleshorton)

This is also a key change @garmr-ulfr

Contributor

So this is meant to run forever (unless stopped) ensuring that we always have at least 4 working fronts, if possible, correct? I like it!

Contributor

Since we need to find 4 as soon as possible, it would be much faster to have X workers running independently instead of in a group. Right now, we can't vet the next batch until the entire current batch has been vetted. We could be waiting on just one to timeout even though the rest have already finished.

Something like this:

func (f *fronted) findWorkingFronts() {
	const workers = 40
	frontCh := make(chan Front, workers)
	for i := 0; i < workers; i++ {
		go f.vetFrontWorker(frontCh)
	}

	// Keep looping through all fronts making sure we have working ones.
	i := 0
	for {
		// Continually loop through the fronts until we have 4 working ones, always looping around
		// to the beginning if we reach the end. This is important, for example, when the user goes
		// offline and all fronts start failing. We want to just keep trying in that case so that we
		// find working fronts as soon as they come back online.
		if f.connectingFronts.size() < 4 {
			// keep sending fronts to the workers
			select {
			case <-f.stopCh:
				return
			case frontCh <- f.frontAt(i):
				i++
				if i >= f.frontSize() {
					i = 0
				}
			}
		} else {
			// wait for a bit
			select {
			case <-f.stopCh:
				return
			case <-time.After(time.Duration(rand.IntN(12000)) * time.Millisecond):
			}
		}
	}
}

func (f *fronted) vetFrontWorker(frontCh <-chan Front) {
	for {
		select {
		case <-f.stopCh:
			return
		case m := <-frontCh:
			working := f.vetFront(m)
			if working {
				f.connectingFronts.onConnected(m)
			} else {
				m.markFailed()
			}
		}
	}
}

This isn't tested and might not be complete, but it's just an idea.

Contributor Author (myleshorton)

Ah interesting, so make sure X goroutines are always running... gotta think about that, but I like the idea.

Contributor Author (@myleshorton), Dec 6, 2024

The idea of just using workers instead of a waitgroup is interesting. My only hesitation is that the pattern tends to be that, if a front is going to fail, it usually times out, in which case the entire batch fails at the 5-second mark. That said, that's not always the case, and sometimes they fail right away on a cert mismatch or something, so I do agree with the change -- will look more at that today.
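For context, a rough sketch of the batch pattern being described (illustrative only; the batch variable and waitgroup usage are assumptions, not the PR's exact code): every front in a batch is vetted concurrently, but the next batch can't start until the slowest one returns.

var wg sync.WaitGroup
for _, m := range batch {
	wg.Add(1)
	go func(m Front) {
		defer wg.Done()
		if f.vetFront(m) {
			f.connectingFronts.onConnected(m)
		} else {
			m.markFailed()
		}
	}(m)
}
// The whole batch blocks on the single slowest front -- often a ~5 second timeout.
wg.Wait()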

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
req = req.WithContext(ctx)
}
Contributor Author (myleshorton)

Now the timeout is just based on the request context, which seems much cleaner to me.

@myleshorton (Contributor, Author)

You guys mind taking a look at this? Lots of name tweaks, but I highlighted some of the key sections. This ditches the confusing context.go stuff in favor of just having a single fronted instance that periodically gets updated with new fronts from the global config.

@garmr-ulfr (Contributor)

I can take a look. How soon do you need it done? I'm currently going through reflog's vmess PRs that I owe him, but I don't think he's in a rush.

@garmr-ulfr (Contributor)

Reviewing it now.

@myleshorton (Contributor, Author)

Oh hey, sorry, just seeing your comment, but there's no huge rush. I basically want to get the client a little more functional, and then I think we should really shift our gaze to your work on the Outline SDK.

Comment on lines +21 to +28
// newConnectingFronts creates a new ConnectingFronts struct with a channel of fronts that have
// successfully connected.
func newConnectingFronts(size int) *connecting {
	return &connecting{
		// We overallocate the channel to avoid blocking.
		frontsCh: make(chan Front, size),
	}
}
Contributor

I think the doc is out of sync; frontsCh would be empty.

Contributor Author (myleshorton)

Right -- it would be empty, but with a large capacity based on this call from fronted.go:

connectingFronts: newConnectingFronts(4000),

The idea is just to make sure that it won't block on adding connecting fronts.
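For what it's worth, a non-blocking send would make that guarantee independent of the buffer size; a hypothetical variant, not the PR's code:

func (c *connecting) onConnected(m Front) {
	select {
	case c.frontsCh <- m:
	default:
		// Buffer full -- drop rather than block the caller (illustrative only).
	}
}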

Contributor (@garmr-ulfr), Dec 6, 2024

"of fronts that have" kind of implies it would already contain some successful fronts by the time newConnectingFronts returns. I could see that being a "gotcha" for someone. Changing it to "for fronts" would avoid confusion.

}
}

// AddFront adds a new front to the list of fronts.
Contributor

This is also out of sync.

Comment on lines +25 to +26
// We overallocate the channel to avoid blocking.
frontsCh: make(chan Front, size),
Contributor

Is this supposed to allocate more than size?

func (m *front) isSucceeding() bool {
	m.mx.RLock()
	defer m.mx.RUnlock()
	return m.LastSucceeded.After(time.Time{})
Contributor

This returns true if m has succeeded at any point. LastSucceeded > 0

Contributor Author (myleshorton)

Ah good catch!

Contributor Author (myleshorton)

Ah, except markFailed directly above that does this, so I think it works:

func (m *front) markFailed() {
	m.mx.Lock()
	defer m.mx.Unlock()
	m.LastSucceeded = time.Time{}
}

Contributor

Oh, OK. I see now. That's a bit confusing, but it does work, as long as markFailed is called.
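In sketch form, the invariant being relied on (a markSucceeded counterpart is assumed here for illustration; it isn't shown in this diff): isSucceeding stays true only between a success and the next markFailed, because markFailed zeroes LastSucceeded.

func (m *front) markSucceeded() {
	m.mx.Lock()
	defer m.mx.Unlock()
	m.LastSucceeded = time.Now()
}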

fronted.go (resolved)

@WendelHime (Contributor)

Just a reminder: d7ef158 can be reverted. Jovis recently added support for the newest utls version to all required packages, and it should work with the latest flashlight and lantern-client.

@myleshorton (Contributor, Author)

OK, I really like the idea of just using a worker group vs a waitgroup for vetting masquerades, but I'd like to do that as a separate PR in the interest of getting this out.

@myleshorton (Contributor, Author)

Actually, scratch that -- change incoming!

@myleshorton (Contributor, Author)

OK I just made that worker pool switch @garmr-ulfr -- see what you think. It seems to be working pretty well.

@garmr-ulfr self-requested a review on December 6, 2024 at 18:49
fronted.go (outdated)

// Submit all fronts to the worker pool.
for i := 0; i < f.frontSize(); i++ {
i := i
Contributor

i := i isn't necessary anymore. I believe the per-iteration loop variable change landed in Go 1.22.

fronted.go (outdated)
Comment on lines 245 to 271
func (f *fronted) tryAllFronts() {
	// Vet fronts using a worker pool of 40 goroutines.
	pool := pond.NewPool(40)

	// Submit all fronts to the worker pool.
	for i := 0; i < f.frontSize(); i++ {
		i := i
		m := f.frontAt(i)
		pool.Submit(func() {
			log.Debugf("Running task #%d with front %v", i, m.getIpAddress())
			if f.hasEnoughWorkingFronts() {
				// We have enough working fronts, so no need to continue.
				log.Debug("Enough working fronts...ignoring task")
				return
			}
			working := f.vetFront(m)
			if working {
				f.connectingFronts.onConnected(m)
			} else {
				m.markFailed()
			}
		})
	}

	// Stop the pool and wait for all submitted tasks to complete
	pool.StopAndWait()
}
Contributor

We should probably still stop testing fronts when Close is called. Other than that, this looks great!

Contributor Author (myleshorton)

OK, I just added a check in the funcs to see if we're stopped -- there's otherwise no good way to kill the extant workers.
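Roughly what that check looks like inside each submitted task, in sketch form (reusing the existing stopCh; not necessarily the final code):

pool.Submit(func() {
	select {
	case <-f.stopCh:
		// fronted was closed -- skip vetting this front.
		return
	default:
	}
	if f.hasEnoughWorkingFronts() {
		return
	}
	if f.vetFront(m) {
		f.connectingFronts.onConnected(m)
	} else {
		m.markFailed()
	}
})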

Comment on lines +107 to +127
func (f *fronted) UpdateConfig(pool *x509.CertPool, providers map[string]*Provider) {
	// Make copies just to avoid any concurrency issues with access that may be happening on the
	// caller side.
	log.Debug("Updating fronted configuration")
	if len(providers) == 0 {
		log.Errorf("No providers configured")
		return
	}
	providersCopy := copyProviders(providers)
	f.frontedMu.Lock()
	defer f.frontedMu.Unlock()
	f.addProviders(providersCopy)
	f.addFronts(loadFronts(providersCopy))

	f.certPool.Store(pool)

	// The goroutine for finding working fronts runs forever, so only start it once.
	f.crawlOnce.Do(func() {
		go f.findWorkingFronts()
	})
}
Contributor

This just occurred to me: we should return an error here if fronted has been stopped, or track whether findWorkingFronts is running and restart it if we receive a new config after it's been stopped. Using sync.Once could be problematic: if a goroutine doesn't check that fronted hasn't been Closed, it could call UpdateConfig and carry on happily assuming findWorkingFronts is running, which could be difficult to debug.
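A sketch of that suggestion (illustrative, not the PR's code): have UpdateConfig refuse new configs once fronted has been stopped, so callers can't silently assume findWorkingFronts is still running.

func (f *fronted) UpdateConfig(pool *x509.CertPool, providers map[string]*Provider) error {
	select {
	case <-f.stopCh:
		return errors.New("fronted is closed; ignoring new configuration")
	default:
	}
	if len(providers) == 0 {
		return errors.New("no providers configured")
	}
	providersCopy := copyProviders(providers)
	f.frontedMu.Lock()
	defer f.frontedMu.Unlock()
	f.addProviders(providersCopy)
	f.addFronts(loadFronts(providersCopy))
	f.certPool.Store(pool)
	// Only start the crawler once; it runs until fronted is stopped.
	f.crawlOnce.Do(func() {
		go f.findWorkingFronts()
	})
	return nil
}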

Contributor Author (myleshorton)

Hmm, interesting. The only time Close is intended to be called is when the app is closed, but I suppose safeguarding against random closing is a good idea.

Contributor Author (myleshorton)

I honestly think we should trust callers to not randomly stop fronted

@myleshorton (Contributor, Author)

I'm going to pull this in so we don't leave it dangling over the weekend. Thanks for the great review @garmr-ulfr!!

@myleshorton merged commit 24178df into main on Dec 6, 2024
1 check passed
@myleshorton deleted the myles/track-connected branch on December 6, 2024 at 21:02