Separately track connecting fronts and do not clear them on new configs #50

Merged
merged 26 commits on Dec 6, 2024

Changes from 2 commits

Commits (26)
d16339d
Change to separately track working fronts
myleshorton Nov 22, 2024
cc002f7
Naming tweaks
myleshorton Nov 25, 2024
41251ed
fix for test compile error
myleshorton Nov 26, 2024
008e810
remove log
myleshorton Nov 30, 2024
9ee0721
tweak else statement
myleshorton Nov 30, 2024
cfa8beb
tone down logging
myleshorton Nov 30, 2024
bd8e939
use go test fmt and check for nil param
myleshorton Nov 30, 2024
2a65be3
try new test format
myleshorton Nov 30, 2024
7f93522
Another try at cleaner test output
myleshorton Nov 30, 2024
b2c702b
do not fail fast
myleshorton Nov 30, 2024
800f367
no pipefail
myleshorton Nov 30, 2024
55e5821
install node
myleshorton Nov 30, 2024
12a4450
add node
myleshorton Nov 30, 2024
6b6fc13
Lots of cleanups to more cleanly handle continually finding working f…
myleshorton Dec 3, 2024
9b577b9
Change to only modify global config details via update call
myleshorton Dec 3, 2024
fe2273f
Merge branch 'main' into myles/track-connected
myleshorton Dec 3, 2024
988625b
no eventual
myleshorton Dec 3, 2024
7eddee1
Fix test
myleshorton Dec 3, 2024
83d7257
Improve naming and make sure requests have a context
myleshorton Dec 3, 2024
8be6775
Updated to return fronted instance
myleshorton Dec 3, 2024
d7ef158
downgraded some dependencies
myleshorton Dec 4, 2024
b9307b7
Improved cert pool handling
myleshorton Dec 4, 2024
67fc744
Added comments
myleshorton Dec 6, 2024
9a7741a
Use worker pool instead of waitgroup
myleshorton Dec 6, 2024
2954726
Use a random range to avoid quick checks
myleshorton Dec 6, 2024
7bc4f00
Do not keep vetting if stopped
myleshorton Dec 6, 2024
3 changes: 2 additions & 1 deletion connecting_fronts.go
@@ -27,14 +27,15 @@ func newConnectingFronts(size int) *connecting {
}
}

// AddFront adds a new front to the list of fronts.
// onConnected adds a working front to the channel of working fronts.
func (cf *connecting) onConnected(m Front) {
cf.frontsCh <- m
}

func (cf *connecting) connectingFront(ctx context.Context) (Front, error) {
for {
select {
// This is typically the context of the HTTP request. If the context is done, return an error.
case <-ctx.Done():
return nil, ctx.Err()
case m := <-cf.frontsCh:
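As a side note, here is a hedged sketch of how a caller might consume this API; the dialWithWorkingFront helper and its 30-second timeout are illustrative, not part of this diff, and only connectingFront and its (Front, error) return shape come from the code above (assuming the context and time packages are imported).

func dialWithWorkingFront(ctx context.Context, cf *connecting) (Front, error) {
	// Bound the wait with the request's context so a cancelled or timed-out
	// request stops blocking on a working front.
	ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()

	front, err := cf.connectingFront(ctx)
	if err != nil {
		// The context was cancelled or expired before any front connected.
		return nil, err
	}
	return front, nil
}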
77 changes: 45 additions & 32 deletions fronted.go
@@ -21,6 +21,8 @@ import (

"github.com/getlantern/golog"
"github.com/getlantern/ops"

"github.com/alitto/pond/v2"
)

const (
@@ -144,6 +146,7 @@ func loadFronts(providers map[string]*Provider) sortedFronts {

fronts := make(sortedFronts, size)

// Note that map iteration order is random, so the order of the providers is automatically randomized.
index := 0
for key, p := range providers {
arr := p.Masquerades
@@ -209,38 +212,66 @@ func Vet(m *Masquerade, pool *x509.CertPool, testURL string) bool {
return masq.postCheck(conn, testURL)
}

// findWorkingFronts finds working domain fronts by vetting them in batches and in
// parallel. Speed is of the essence here, as without working fronts, users will
// findWorkingFronts finds working domain fronts by testing them using a worker pool. Speed
// is of the essence here, as without working fronts, users will
// be unable to fetch proxy configurations, particularly in the case of a first time
// user who does not have proxies cached on disk.
func (f *fronted) findWorkingFronts() {
// vet fronts in batches
const batchSize int = 40

// Keep looping through all fronts making sure we have working ones.
i := 0
for {
// Continually loop through the fronts in batches until we have 4 working ones,
// always looping around to the beginning if we reach the end.
// Continually loop through the fronts until we have 4 working ones.
// This is important, for example, when the user goes offline and all fronts start failing.
// We want to just keep trying in that case so that we find working fronts as soon as they
// come back online.
if f.connectingFronts.size() < 4 {
f.vetBatch(i, batchSize)
i = index(i, batchSize, f.frontSize())
if !f.hasEnoughWorkingFronts() {
// Note that trying all fronts takes a while, as it only completes when we either
// have enough working fronts, or we've tried all of them.
log.Debug("findWorkingFronts::Trying all fronts")
f.tryAllFronts()
log.Debug("findWorkingFronts::Tried all fronts")
} else {
log.Debug("findWorkingFronts::Enough working fronts...sleeping")
select {
case <-f.stopCh:
log.Debug("Stopping parallel dialing")
log.Debug("findWorkingFronts::Stopping parallel dialing")
return
case <-time.After(time.Duration(rand.IntN(12000)) * time.Millisecond):
// Run again after a random time between 0 and 12 seconds
}
}
Contributor Author:
This is also a key change @garmr-ulfr

Contributor:
So this is meant to run forever (unless stopped) ensuring that we always have at least 4 working fronts, if possible, correct? I like it!

Contributor:
Since we need to find 4 as soon as possible, it would be much faster to have X workers running independently instead of in a group. Right now, we can't vet the next batch until the entire current batch has been vetted. We could be waiting on just one to time out even though the rest have already finished.

Something like this:

func (f *fronted) findWorkingFronts() {
	const workers = 40
	frontCh := make(chan Front, workers)
	for i := 0; i < workers; i++ {
		go f.vetFrontWorker(frontCh)
	}

	// Keep looping through all fronts making sure we have working ones.
	i := 0
	for {
		// Continually loop through the fronts until we have 4 working ones, always looping around
		// to the beginning if we reach the end. This is important, for example, when the user goes
		// offline and all fronts start failing. We want to just keep trying in that case so that we
		// find working fronts as soon as they come back online.
		if f.connectingFronts.size() < 4 {
			// keep sending fronts to the workers
			select {
			case <-f.stopCh:
				return
			case frontCh <- f.frontAt(i):
				i++
				if i >= f.frontSize() {
					i = 0
				}
			}
		} else {
			// wait for a bit
			select {
			case <-f.stopCh:
				return
			case <-time.After(time.Duration(rand.IntN(12000)) * time.Millisecond):
			}
		}
	}
}

func (f *fronted) vetFrontWorker(frontCh <-chan Front) {
	for {
		select {
		case <-f.stopCh:
			return
		case m := <-frontCh:
			working := f.vetFront(m)
			if working {
				f.connectingFronts.onConnected(m)
			} else {
				m.markFailed()
			}
		}
	}
}

this isn't tested or anything and it might not be complete. But, just an idea.

Contributor Author:
Ah interesting, so make sure X goroutines are always running... gotta think about that, but I like the idea

Contributor Author (myleshorton, Dec 6, 2024):
The idea of just using workers instead of a waitgroup is interesting. My only hesitation is that the pattern tends to be that, if a front is going to fail, it's usually going to time out, in which case the entire batch just fails at the 5-second mark. That said, that's not always the case, and sometimes they fail right away on a cert mismatch or something, so I do agree with the change -- will look more at that today

}
}

func index(i, batchSize, size int) int {
return (i + batchSize) % size
func (f *fronted) tryAllFronts() {
// Vet fronts using a worker pool of 40 goroutines.
pool := pond.NewPool(40)

// Submit all fronts to the worker pool.
for i := 0; i < f.frontSize(); i++ {
i := i
Contributor:
i := i isn't necessary anymore. I think it was Go 1.22 that fixed that.

m := f.frontAt(i)
pool.Submit(func() {
log.Debugf("Running task #%d with front %v", i, m.getIpAddress())
if f.hasEnoughWorkingFronts() {
// We have enough working fronts, so no need to continue.
log.Debug("Enough working fronts...ignoring task")
return
}
working := f.vetFront(m)
if working {
f.connectingFronts.onConnected(m)
} else {
m.markFailed()
}
})
}

// Stop the pool and wait for all submitted tasks to complete
pool.StopAndWait()
}
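Regarding the "i := i" review comment above: a small standalone illustration of the loop-variable change, not code from this PR. With the go 1.22.6 directive already in go.mod, the shadowing line is redundant.

package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		// Before Go 1.22 this loop needed `i := i` here so each goroutine
		// captured its own copy; since Go 1.22, i is scoped per iteration,
		// so the shadowing is unnecessary.
		wg.Add(1)
		go func() {
			defer wg.Done()
			fmt.Println(i) // prints 0, 1, and 2 (in some order)
		}()
	}
	wg.Wait()
}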
Contributor:
We should probably still stop testing fronts when Close is called. Other than that, this looks great!

Contributor Author:
ok I just added a check in the funcs to see if we're stopped -- there's otherwise no good way to kill the extant workers.
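For reference, a hedged sketch of what such a stop check inside each submitted task might look like; the code actually merged may differ. Every identifier used here (stopCh, hasEnoughWorkingFronts, vetFront, connectingFronts.onConnected, markFailed) appears elsewhere in this diff.

	pool.Submit(func() {
		select {
		case <-f.stopCh:
			// The fronted instance was closed; skip vetting this front.
			return
		default:
		}
		if f.hasEnoughWorkingFronts() {
			return
		}
		if f.vetFront(m) {
			f.connectingFronts.onConnected(m)
		} else {
			m.markFailed()
		}
	})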


func (f *fronted) hasEnoughWorkingFronts() bool {
return f.connectingFronts.size() >= 4
}

func (f *fronted) frontSize() int {
@@ -255,24 +286,6 @@ func (f *fronted) frontAt(i int) Front {
return f.fronts[i]
}

func (f *fronted) vetBatch(start, batchSize int) {
log.Debugf("Vetting masquerade batch %d-%d", start, start+batchSize)
var wg sync.WaitGroup
for i := start; i < start+batchSize && i < f.frontSize(); i++ {
wg.Add(1)
go func(m Front) {
defer wg.Done()
working := f.vetFront(m)
if working {
f.connectingFronts.onConnected(m)
} else {
m.markFailed()
}
}(f.frontAt(i))
}
wg.Wait()
}

func (f *fronted) vetFront(m Front) bool {
conn, masqueradeGood, err := f.dialFront(m)
if err != nil {
62 changes: 20 additions & 42 deletions fronted_test.go
@@ -799,33 +799,33 @@ func TestFindWorkingMasquerades(t *testing.T) {
{
name: "All successful",
masquerades: []*mockFront{
newMockMasquerade("domain1.com", "1.1.1.1", 0, true),
newMockMasquerade("domain2.com", "2.2.2.2", 0, true),
newMockMasquerade("domain3.com", "3.3.3.3", 0, true),
newMockMasquerade("domain4.com", "4.4.4.4", 0, true),
newMockMasquerade("domain1.com", "1.1.1.1", 0, true),
newMockMasquerade("domain1.com", "1.1.1.1", 0, true),
newMockFront("domain1.com", "1.1.1.1", 0, true),
newMockFront("domain2.com", "2.2.2.2", 0, true),
newMockFront("domain3.com", "3.3.3.3", 0, true),
newMockFront("domain4.com", "4.4.4.4", 0, true),
newMockFront("domain1.com", "1.1.1.1", 0, true),
newMockFront("domain1.com", "1.1.1.1", 0, true),
},
expectedSuccessful: 4,
},
{
name: "Some successful",
masquerades: []*mockFront{
newMockMasquerade("domain1.com", "1.1.1.1", 0, true),
newMockMasquerade("domain2.com", "2.2.2.2", 0, false),
newMockMasquerade("domain3.com", "3.3.3.3", 0, true),
newMockMasquerade("domain4.com", "4.4.4.4", 0, false),
newMockMasquerade("domain1.com", "1.1.1.1", 0, true),
newMockFront("domain1.com", "1.1.1.1", 0, true),
newMockFront("domain2.com", "2.2.2.2", 0, false),
newMockFront("domain3.com", "3.3.3.3", 0, true),
newMockFront("domain4.com", "4.4.4.4", 0, false),
newMockFront("domain1.com", "1.1.1.1", 0, true),
},
expectedSuccessful: 2,
},
{
name: "None successful",
masquerades: []*mockFront{
newMockMasquerade("domain1.com", "1.1.1.1", 0, false),
newMockMasquerade("domain2.com", "2.2.2.2", 0, false),
newMockMasquerade("domain3.com", "3.3.3.3", 0, false),
newMockMasquerade("domain4.com", "4.4.4.4", 0, false),
newMockFront("domain1.com", "1.1.1.1", 0, false),
newMockFront("domain2.com", "2.2.2.2", 0, false),
newMockFront("domain3.com", "3.3.3.3", 0, false),
newMockFront("domain4.com", "4.4.4.4", 0, false),
},
expectedSuccessful: 0,
},
@@ -834,7 +834,7 @@
masquerades: func() []*mockFront {
var masquerades []*mockFront
for i := 0; i < 50; i++ {
masquerades = append(masquerades, newMockMasquerade(fmt.Sprintf("domain%d.com", i), fmt.Sprintf("1.1.1.%d", i), 0, i%2 == 0))
masquerades = append(masquerades, newMockFront(fmt.Sprintf("domain%d.com", i), fmt.Sprintf("1.1.1.%d", i), 0, i%2 == 0))
}
return masquerades
}(),
@@ -855,7 +855,7 @@ func TestFindWorkingMasquerades(t *testing.T) {
f.fronts[i] = m
}

f.vetBatch(0, 10)
f.tryAllFronts()

tries := 0
for f.connectingFronts.size() < tt.expectedSuccessful && tries < 100 {
@@ -900,37 +900,15 @@ func TestLoadFronts(t *testing.T) {
}
}

func TestIndex(t *testing.T) {
tests := []struct {
i, batchSize, size int
expected int
}{
{i: 0, batchSize: 10, size: 100, expected: 10},
{i: 5, batchSize: 10, size: 100, expected: 15},
{i: 95, batchSize: 10, size: 100, expected: 5},
{i: 99, batchSize: 10, size: 100, expected: 9},
{i: 0, batchSize: 5, size: 20, expected: 5},
{i: 15, batchSize: 5, size: 20, expected: 0},
{i: 18, batchSize: 5, size: 20, expected: 3},
}

for _, test := range tests {
t.Run(fmt.Sprintf("i=%d,batchSize=%d,size=%d", test.i, test.batchSize, test.size), func(t *testing.T) {
result := index(test.i, test.batchSize, test.size)
assert.Equal(t, test.expected, result)
})
}
}

// Generate a mock of a MasqueradeInterface with a Dial method that can optionally
// return an error after a specified number of milliseconds.
func newMockMasquerade(domain string, ipAddress string, timeout time.Duration, passesCheck bool) *mockFront {
return newMockMasqueradeWithLastSuccess(domain, ipAddress, timeout, passesCheck, time.Time{})
func newMockFront(domain string, ipAddress string, timeout time.Duration, passesCheck bool) *mockFront {
return newMockFrontWithLastSuccess(domain, ipAddress, timeout, passesCheck, time.Time{})
}

// Generate a mock of a MasqueradeInterface with a Dial method that can optionally
// return an error after a specified number of milliseconds.
func newMockMasqueradeWithLastSuccess(domain string, ipAddress string, timeout time.Duration, passesCheck bool, lastSucceededTime time.Time) *mockFront {
func newMockFrontWithLastSuccess(domain string, ipAddress string, timeout time.Duration, passesCheck bool, lastSucceededTime time.Time) *mockFront {
return &mockFront{
Domain: domain,
IpAddress: ipAddress,
1 change: 1 addition & 0 deletions go.mod
@@ -5,6 +5,7 @@ go 1.22.6
toolchain go1.23.3

require (
github.com/alitto/pond/v2 v2.1.5
github.com/getlantern/golog v0.0.0-20230503153817-8e72de7e0a65
github.com/getlantern/keyman v0.0.0-20200819205636-76fef27c39f1
github.com/getlantern/netx v0.0.0-20240814210628-0984f52e2d18
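For anyone applying this branch locally, the new dependency would typically be added with standard Go tooling, pinned to the version recorded in go.sum below:

	go get github.com/alitto/pond/v2@v2.1.5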
2 changes: 2 additions & 0 deletions go.sum
@@ -1,3 +1,5 @@
github.com/alitto/pond/v2 v2.1.5 h1:2pp/KAPcb02NSpHsjjnxnrTDzogMLsq+vFf/L0DB84A=
github.com/alitto/pond/v2 v2.1.5/go.mod h1:xkjYEgQ05RSpWdfSd1nM3OVv7TBhLdy7rMp3+2Nq+yE=
github.com/andybalholm/brotli v1.0.6 h1:Yf9fFpf49Zrxb9NlQaluyE92/+X7UVHlhMNJN2sxfOI=
github.com/andybalholm/brotli v1.0.6/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=