From 9a7741a6000a3ea98c0ba5a18b5c40d340cdaead Mon Sep 17 00:00:00 2001 From: Adam Fisk Date: Fri, 6 Dec 2024 10:43:09 -0700 Subject: [PATCH] Use worker pool instead of waitgroup --- fronted.go | 76 ++++++++++++++++++++++++++++--------------------- fronted_test.go | 62 +++++++++++++--------------------------- go.mod | 1 + go.sum | 2 ++ 4 files changed, 67 insertions(+), 74 deletions(-) diff --git a/fronted.go b/fronted.go index d3574d5..7cb6ed0 100644 --- a/fronted.go +++ b/fronted.go @@ -21,6 +21,8 @@ import ( "github.com/getlantern/golog" "github.com/getlantern/ops" + + "github.com/alitto/pond/v2" ) const ( @@ -210,38 +212,66 @@ func Vet(m *Masquerade, pool *x509.CertPool, testURL string) bool { return masq.postCheck(conn, testURL) } -// findWorkingFronts finds working domain fronts by vetting them in batches and in -// parallel. Speed is of the essence here, as without working fronts, users will +// findWorkingFronts finds working domain fronts by testing them using a worker pool. Speed +// is of the essence here, as without working fronts, users will // be unable to fetch proxy configurations, particularly in the case of a first time // user who does not have proxies cached on disk. func (f *fronted) findWorkingFronts() { - // vet fronts in batches - const batchSize int = 40 - // Keep looping through all fronts making sure we have working ones. - i := 0 for { - // Continually loop through the fronts in batches until we have 4 working ones, - // always looping around to the beginning if we reach the end. + // Continually loop through the fronts until we have 4 working ones. // This is important, for example, when the user goes offline and all fronts start failing. // We want to just keep trying in that case so that we find working fronts as soon as they // come back online. - if f.connectingFronts.size() < 4 { - f.vetBatch(i, batchSize) - i = index(i, batchSize, f.frontSize()) + if !f.hasEnoughWorkingFronts() { + // Note that trying all fronts takes awhile, as it only completes when we either + // have enough working fronts, or we've tried all of them. + log.Debug("findWorkingFronts::Trying all fronts") + f.tryAllFronts() + log.Debug("findWorkingFronts::Tried all fronts") } else { + log.Debug("findWorkingFronts::Enough working fronts...sleeping") select { case <-f.stopCh: - log.Debug("Stopping parallel dialing") + log.Debug("findWorkingFronts::Stopping parallel dialing") return case <-time.After(time.Duration(rand.IntN(12000)) * time.Millisecond): + // Run again after a random time between 0 and 12 seconds } } } } -func index(i, batchSize, size int) int { - return (i + batchSize) % size +func (f *fronted) tryAllFronts() { + // Vet fronts using a worker pool of 40 goroutines. + pool := pond.NewPool(40) + + // Submit all fronts to the worker pool. + for i := 0; i < f.frontSize(); i++ { + i := i + m := f.frontAt(i) + pool.Submit(func() { + log.Debugf("Running task #%d with front %v", i, m.getIpAddress()) + if f.hasEnoughWorkingFronts() { + // We have enough working fronts, so no need to continue. + log.Debug("Enough working fronts...ignoring task") + return + } + working := f.vetFront(m) + if working { + f.connectingFronts.onConnected(m) + } else { + m.markFailed() + } + }) + } + + // Stop the pool and wait for all submitted tasks to complete + pool.StopAndWait() +} + +func (f *fronted) hasEnoughWorkingFronts() bool { + return f.connectingFronts.size() >= 4 } func (f *fronted) frontSize() int { @@ -256,24 +286,6 @@ func (f *fronted) frontAt(i int) Front { return f.fronts[i] } -func (f *fronted) vetBatch(start, batchSize int) { - log.Debugf("Vetting masquerade batch %d-%d", start, start+batchSize) - var wg sync.WaitGroup - for i := start; i < start+batchSize && i < f.frontSize(); i++ { - wg.Add(1) - go func(m Front) { - defer wg.Done() - working := f.vetFront(m) - if working { - f.connectingFronts.onConnected(m) - } else { - m.markFailed() - } - }(f.frontAt(i)) - } - wg.Wait() -} - func (f *fronted) vetFront(m Front) bool { conn, masqueradeGood, err := f.dialFront(m) if err != nil { diff --git a/fronted_test.go b/fronted_test.go index a82ef65..e1a5b21 100644 --- a/fronted_test.go +++ b/fronted_test.go @@ -799,33 +799,33 @@ func TestFindWorkingMasquerades(t *testing.T) { { name: "All successful", masquerades: []*mockFront{ - newMockMasquerade("domain1.com", "1.1.1.1", 0, true), - newMockMasquerade("domain2.com", "2.2.2.2", 0, true), - newMockMasquerade("domain3.com", "3.3.3.3", 0, true), - newMockMasquerade("domain4.com", "4.4.4.4", 0, true), - newMockMasquerade("domain1.com", "1.1.1.1", 0, true), - newMockMasquerade("domain1.com", "1.1.1.1", 0, true), + newMockFront("domain1.com", "1.1.1.1", 0, true), + newMockFront("domain2.com", "2.2.2.2", 0, true), + newMockFront("domain3.com", "3.3.3.3", 0, true), + newMockFront("domain4.com", "4.4.4.4", 0, true), + newMockFront("domain1.com", "1.1.1.1", 0, true), + newMockFront("domain1.com", "1.1.1.1", 0, true), }, expectedSuccessful: 4, }, { name: "Some successful", masquerades: []*mockFront{ - newMockMasquerade("domain1.com", "1.1.1.1", 0, true), - newMockMasquerade("domain2.com", "2.2.2.2", 0, false), - newMockMasquerade("domain3.com", "3.3.3.3", 0, true), - newMockMasquerade("domain4.com", "4.4.4.4", 0, false), - newMockMasquerade("domain1.com", "1.1.1.1", 0, true), + newMockFront("domain1.com", "1.1.1.1", 0, true), + newMockFront("domain2.com", "2.2.2.2", 0, false), + newMockFront("domain3.com", "3.3.3.3", 0, true), + newMockFront("domain4.com", "4.4.4.4", 0, false), + newMockFront("domain1.com", "1.1.1.1", 0, true), }, expectedSuccessful: 2, }, { name: "None successful", masquerades: []*mockFront{ - newMockMasquerade("domain1.com", "1.1.1.1", 0, false), - newMockMasquerade("domain2.com", "2.2.2.2", 0, false), - newMockMasquerade("domain3.com", "3.3.3.3", 0, false), - newMockMasquerade("domain4.com", "4.4.4.4", 0, false), + newMockFront("domain1.com", "1.1.1.1", 0, false), + newMockFront("domain2.com", "2.2.2.2", 0, false), + newMockFront("domain3.com", "3.3.3.3", 0, false), + newMockFront("domain4.com", "4.4.4.4", 0, false), }, expectedSuccessful: 0, }, @@ -834,7 +834,7 @@ func TestFindWorkingMasquerades(t *testing.T) { masquerades: func() []*mockFront { var masquerades []*mockFront for i := 0; i < 50; i++ { - masquerades = append(masquerades, newMockMasquerade(fmt.Sprintf("domain%d.com", i), fmt.Sprintf("1.1.1.%d", i), 0, i%2 == 0)) + masquerades = append(masquerades, newMockFront(fmt.Sprintf("domain%d.com", i), fmt.Sprintf("1.1.1.%d", i), 0, i%2 == 0)) } return masquerades }(), @@ -855,7 +855,7 @@ func TestFindWorkingMasquerades(t *testing.T) { f.fronts[i] = m } - f.vetBatch(0, 10) + f.tryAllFronts() tries := 0 for f.connectingFronts.size() < tt.expectedSuccessful && tries < 100 { @@ -900,37 +900,15 @@ func TestLoadFronts(t *testing.T) { } } -func TestIndex(t *testing.T) { - tests := []struct { - i, batchSize, size int - expected int - }{ - {i: 0, batchSize: 10, size: 100, expected: 10}, - {i: 5, batchSize: 10, size: 100, expected: 15}, - {i: 95, batchSize: 10, size: 100, expected: 5}, - {i: 99, batchSize: 10, size: 100, expected: 9}, - {i: 0, batchSize: 5, size: 20, expected: 5}, - {i: 15, batchSize: 5, size: 20, expected: 0}, - {i: 18, batchSize: 5, size: 20, expected: 3}, - } - - for _, test := range tests { - t.Run(fmt.Sprintf("i=%d,batchSize=%d,size=%d", test.i, test.batchSize, test.size), func(t *testing.T) { - result := index(test.i, test.batchSize, test.size) - assert.Equal(t, test.expected, result) - }) - } -} - // Generate a mock of a MasqueradeInterface with a Dial method that can optionally // return an error after a specified number of milliseconds. -func newMockMasquerade(domain string, ipAddress string, timeout time.Duration, passesCheck bool) *mockFront { - return newMockMasqueradeWithLastSuccess(domain, ipAddress, timeout, passesCheck, time.Time{}) +func newMockFront(domain string, ipAddress string, timeout time.Duration, passesCheck bool) *mockFront { + return newMockFrontWithLastSuccess(domain, ipAddress, timeout, passesCheck, time.Time{}) } // Generate a mock of a MasqueradeInterface with a Dial method that can optionally // return an error after a specified number of milliseconds. -func newMockMasqueradeWithLastSuccess(domain string, ipAddress string, timeout time.Duration, passesCheck bool, lastSucceededTime time.Time) *mockFront { +func newMockFrontWithLastSuccess(domain string, ipAddress string, timeout time.Duration, passesCheck bool, lastSucceededTime time.Time) *mockFront { return &mockFront{ Domain: domain, IpAddress: ipAddress, diff --git a/go.mod b/go.mod index 16df9fc..c4523a9 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.22.6 toolchain go1.23.3 require ( + github.com/alitto/pond/v2 v2.1.5 github.com/getlantern/golog v0.0.0-20230503153817-8e72de7e0a65 github.com/getlantern/keyman v0.0.0-20200819205636-76fef27c39f1 github.com/getlantern/netx v0.0.0-20240814210628-0984f52e2d18 diff --git a/go.sum b/go.sum index 444b727..93f5e6b 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +github.com/alitto/pond/v2 v2.1.5 h1:2pp/KAPcb02NSpHsjjnxnrTDzogMLsq+vFf/L0DB84A= +github.com/alitto/pond/v2 v2.1.5/go.mod h1:xkjYEgQ05RSpWdfSd1nM3OVv7TBhLdy7rMp3+2Nq+yE= github.com/andybalholm/brotli v1.0.6 h1:Yf9fFpf49Zrxb9NlQaluyE92/+X7UVHlhMNJN2sxfOI= github.com/andybalholm/brotli v1.0.6/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=