Skip to content

Commit

Permalink
Use worker pool instead of waitgroup
Browse files Browse the repository at this point in the history
  • Loading branch information
myleshorton committed Dec 6, 2024
1 parent 67fc744 commit 9a7741a
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 74 deletions.
76 changes: 44 additions & 32 deletions fronted.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (

"github.com/getlantern/golog"
"github.com/getlantern/ops"

"github.com/alitto/pond/v2"
)

const (
Expand Down Expand Up @@ -210,38 +212,66 @@ func Vet(m *Masquerade, pool *x509.CertPool, testURL string) bool {
return masq.postCheck(conn, testURL)
}

// findWorkingFronts finds working domain fronts by vetting them in batches and in
// parallel. Speed is of the essence here, as without working fronts, users will
// findWorkingFronts finds working domain fronts by testing them using a worker pool. Speed
// is of the essence here, as without working fronts, users will
// be unable to fetch proxy configurations, particularly in the case of a first time
// user who does not have proxies cached on disk.
func (f *fronted) findWorkingFronts() {
// vet fronts in batches
const batchSize int = 40

// Keep looping through all fronts making sure we have working ones.
i := 0
for {
// Continually loop through the fronts in batches until we have 4 working ones,
// always looping around to the beginning if we reach the end.
// Continually loop through the fronts until we have 4 working ones.
// This is important, for example, when the user goes offline and all fronts start failing.
// We want to just keep trying in that case so that we find working fronts as soon as they
// come back online.
if f.connectingFronts.size() < 4 {
f.vetBatch(i, batchSize)
i = index(i, batchSize, f.frontSize())
if !f.hasEnoughWorkingFronts() {
// Note that trying all fronts takes awhile, as it only completes when we either
// have enough working fronts, or we've tried all of them.
log.Debug("findWorkingFronts::Trying all fronts")
f.tryAllFronts()
log.Debug("findWorkingFronts::Tried all fronts")
} else {
log.Debug("findWorkingFronts::Enough working fronts...sleeping")
select {
case <-f.stopCh:
log.Debug("Stopping parallel dialing")
log.Debug("findWorkingFronts::Stopping parallel dialing")
return
case <-time.After(time.Duration(rand.IntN(12000)) * time.Millisecond):
// Run again after a random time between 0 and 12 seconds
}
}
}
}

func index(i, batchSize, size int) int {
return (i + batchSize) % size
func (f *fronted) tryAllFronts() {
// Vet fronts using a worker pool of 40 goroutines.
pool := pond.NewPool(40)

// Submit all fronts to the worker pool.
for i := 0; i < f.frontSize(); i++ {
i := i
m := f.frontAt(i)
pool.Submit(func() {
log.Debugf("Running task #%d with front %v", i, m.getIpAddress())
if f.hasEnoughWorkingFronts() {
// We have enough working fronts, so no need to continue.
log.Debug("Enough working fronts...ignoring task")
return
}
working := f.vetFront(m)
if working {
f.connectingFronts.onConnected(m)
} else {
m.markFailed()
}
})
}

// Stop the pool and wait for all submitted tasks to complete
pool.StopAndWait()
}

func (f *fronted) hasEnoughWorkingFronts() bool {
return f.connectingFronts.size() >= 4
}

func (f *fronted) frontSize() int {
Expand All @@ -256,24 +286,6 @@ func (f *fronted) frontAt(i int) Front {
return f.fronts[i]
}

func (f *fronted) vetBatch(start, batchSize int) {
log.Debugf("Vetting masquerade batch %d-%d", start, start+batchSize)
var wg sync.WaitGroup
for i := start; i < start+batchSize && i < f.frontSize(); i++ {
wg.Add(1)
go func(m Front) {
defer wg.Done()
working := f.vetFront(m)
if working {
f.connectingFronts.onConnected(m)
} else {
m.markFailed()
}
}(f.frontAt(i))
}
wg.Wait()
}

func (f *fronted) vetFront(m Front) bool {
conn, masqueradeGood, err := f.dialFront(m)
if err != nil {
Expand Down
62 changes: 20 additions & 42 deletions fronted_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -799,33 +799,33 @@ func TestFindWorkingMasquerades(t *testing.T) {
{
name: "All successful",
masquerades: []*mockFront{
newMockMasquerade("domain1.com", "1.1.1.1", 0, true),
newMockMasquerade("domain2.com", "2.2.2.2", 0, true),
newMockMasquerade("domain3.com", "3.3.3.3", 0, true),
newMockMasquerade("domain4.com", "4.4.4.4", 0, true),
newMockMasquerade("domain1.com", "1.1.1.1", 0, true),
newMockMasquerade("domain1.com", "1.1.1.1", 0, true),
newMockFront("domain1.com", "1.1.1.1", 0, true),
newMockFront("domain2.com", "2.2.2.2", 0, true),
newMockFront("domain3.com", "3.3.3.3", 0, true),
newMockFront("domain4.com", "4.4.4.4", 0, true),
newMockFront("domain1.com", "1.1.1.1", 0, true),
newMockFront("domain1.com", "1.1.1.1", 0, true),
},
expectedSuccessful: 4,
},
{
name: "Some successful",
masquerades: []*mockFront{
newMockMasquerade("domain1.com", "1.1.1.1", 0, true),
newMockMasquerade("domain2.com", "2.2.2.2", 0, false),
newMockMasquerade("domain3.com", "3.3.3.3", 0, true),
newMockMasquerade("domain4.com", "4.4.4.4", 0, false),
newMockMasquerade("domain1.com", "1.1.1.1", 0, true),
newMockFront("domain1.com", "1.1.1.1", 0, true),
newMockFront("domain2.com", "2.2.2.2", 0, false),
newMockFront("domain3.com", "3.3.3.3", 0, true),
newMockFront("domain4.com", "4.4.4.4", 0, false),
newMockFront("domain1.com", "1.1.1.1", 0, true),
},
expectedSuccessful: 2,
},
{
name: "None successful",
masquerades: []*mockFront{
newMockMasquerade("domain1.com", "1.1.1.1", 0, false),
newMockMasquerade("domain2.com", "2.2.2.2", 0, false),
newMockMasquerade("domain3.com", "3.3.3.3", 0, false),
newMockMasquerade("domain4.com", "4.4.4.4", 0, false),
newMockFront("domain1.com", "1.1.1.1", 0, false),
newMockFront("domain2.com", "2.2.2.2", 0, false),
newMockFront("domain3.com", "3.3.3.3", 0, false),
newMockFront("domain4.com", "4.4.4.4", 0, false),
},
expectedSuccessful: 0,
},
Expand All @@ -834,7 +834,7 @@ func TestFindWorkingMasquerades(t *testing.T) {
masquerades: func() []*mockFront {
var masquerades []*mockFront
for i := 0; i < 50; i++ {
masquerades = append(masquerades, newMockMasquerade(fmt.Sprintf("domain%d.com", i), fmt.Sprintf("1.1.1.%d", i), 0, i%2 == 0))
masquerades = append(masquerades, newMockFront(fmt.Sprintf("domain%d.com", i), fmt.Sprintf("1.1.1.%d", i), 0, i%2 == 0))
}
return masquerades
}(),
Expand All @@ -855,7 +855,7 @@ func TestFindWorkingMasquerades(t *testing.T) {
f.fronts[i] = m
}

f.vetBatch(0, 10)
f.tryAllFronts()

tries := 0
for f.connectingFronts.size() < tt.expectedSuccessful && tries < 100 {
Expand Down Expand Up @@ -900,37 +900,15 @@ func TestLoadFronts(t *testing.T) {
}
}

func TestIndex(t *testing.T) {
tests := []struct {
i, batchSize, size int
expected int
}{
{i: 0, batchSize: 10, size: 100, expected: 10},
{i: 5, batchSize: 10, size: 100, expected: 15},
{i: 95, batchSize: 10, size: 100, expected: 5},
{i: 99, batchSize: 10, size: 100, expected: 9},
{i: 0, batchSize: 5, size: 20, expected: 5},
{i: 15, batchSize: 5, size: 20, expected: 0},
{i: 18, batchSize: 5, size: 20, expected: 3},
}

for _, test := range tests {
t.Run(fmt.Sprintf("i=%d,batchSize=%d,size=%d", test.i, test.batchSize, test.size), func(t *testing.T) {
result := index(test.i, test.batchSize, test.size)
assert.Equal(t, test.expected, result)
})
}
}

// Generate a mock of a MasqueradeInterface with a Dial method that can optionally
// return an error after a specified number of milliseconds.
func newMockMasquerade(domain string, ipAddress string, timeout time.Duration, passesCheck bool) *mockFront {
return newMockMasqueradeWithLastSuccess(domain, ipAddress, timeout, passesCheck, time.Time{})
func newMockFront(domain string, ipAddress string, timeout time.Duration, passesCheck bool) *mockFront {
return newMockFrontWithLastSuccess(domain, ipAddress, timeout, passesCheck, time.Time{})
}

// Generate a mock of a MasqueradeInterface with a Dial method that can optionally
// return an error after a specified number of milliseconds.
func newMockMasqueradeWithLastSuccess(domain string, ipAddress string, timeout time.Duration, passesCheck bool, lastSucceededTime time.Time) *mockFront {
func newMockFrontWithLastSuccess(domain string, ipAddress string, timeout time.Duration, passesCheck bool, lastSucceededTime time.Time) *mockFront {
return &mockFront{
Domain: domain,
IpAddress: ipAddress,
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.22.6
toolchain go1.23.3

require (
github.com/alitto/pond/v2 v2.1.5
github.com/getlantern/golog v0.0.0-20230503153817-8e72de7e0a65
github.com/getlantern/keyman v0.0.0-20200819205636-76fef27c39f1
github.com/getlantern/netx v0.0.0-20240814210628-0984f52e2d18
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
github.com/alitto/pond/v2 v2.1.5 h1:2pp/KAPcb02NSpHsjjnxnrTDzogMLsq+vFf/L0DB84A=
github.com/alitto/pond/v2 v2.1.5/go.mod h1:xkjYEgQ05RSpWdfSd1nM3OVv7TBhLdy7rMp3+2Nq+yE=
github.com/andybalholm/brotli v1.0.6 h1:Yf9fFpf49Zrxb9NlQaluyE92/+X7UVHlhMNJN2sxfOI=
github.com/andybalholm/brotli v1.0.6/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
Expand Down

0 comments on commit 9a7741a

Please sign in to comment.