Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Separately track connecting fronts and do not clear them on new configs #50

Merged
merged 26 commits into from
Dec 6, 2024
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
d16339d
Change to separately track working fronts
myleshorton Nov 22, 2024
cc002f7
Naming tweaks
myleshorton Nov 25, 2024
41251ed
fix for test compile error
myleshorton Nov 26, 2024
008e810
remove log
myleshorton Nov 30, 2024
9ee0721
tweak else statement
myleshorton Nov 30, 2024
cfa8beb
tone down logging
myleshorton Nov 30, 2024
bd8e939
use go test fmt and check for nil param
myleshorton Nov 30, 2024
2a65be3
try new test format
myleshorton Nov 30, 2024
7f93522
Another try at cleaner test output
myleshorton Nov 30, 2024
b2c702b
do not fail fast
myleshorton Nov 30, 2024
800f367
no pipefail
myleshorton Nov 30, 2024
55e5821
install node
myleshorton Nov 30, 2024
12a4450
add node
myleshorton Nov 30, 2024
6b6fc13
Lots of cleanups to more cleanly handle continually finding working f…
myleshorton Dec 3, 2024
9b577b9
Change to only modify global config details via update call
myleshorton Dec 3, 2024
fe2273f
Merge branch 'main' into myles/track-connected
myleshorton Dec 3, 2024
988625b
no eventual
myleshorton Dec 3, 2024
7eddee1
Fix test
myleshorton Dec 3, 2024
83d7257
Improve naming and make sure requests have a context
myleshorton Dec 3, 2024
8be6775
Updated to return fronted instance
myleshorton Dec 3, 2024
d7ef158
downgraded some dependencies
myleshorton Dec 4, 2024
b9307b7
Improved cert pool handling
myleshorton Dec 4, 2024
67fc744
Added comments
myleshorton Dec 6, 2024
9a7741a
Use worker pool instead of waitgroup
myleshorton Dec 6, 2024
2954726
Use a random range to avoid quick checks
myleshorton Dec 6, 2024
7bc4f00
Do not keep vetting if stopped
myleshorton Dec 6, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,24 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-node@v4
with:
node-version: 18
- name: Set up Go
uses: actions/setup-go@v4
uses: actions/setup-go@v5
with:
go-version-file: "go.mod"
- name: Run unit tests
run: go test -failfast -coverprofile=profile.cov
- name: Install go go-ctrf-json-reporter
run: go install github.com/ctrf-io/go-ctrf-json-reporter/cmd/go-ctrf-json-reporter@latest
- name: Run tests
run: go test -json -race -tags="headless" -coverprofile=profile.cov ./... | go-ctrf-json-reporter -output ctrf-report.json
- name: Upload test results
uses: actions/upload-artifact@v4
with:
name: ctrf-report
path: ctrf-report.json
- name: Publish Test Summary Results
run: npx github-actions-ctrf ctrf-report.json
- name: Install goveralls
run: go install github.com/mattn/goveralls@latest
- name: Send coverage
Expand Down
29 changes: 16 additions & 13 deletions cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ import (
)

func (d *fronted) initCaching(cacheFile string) {
d.prepopulateMasquerades(cacheFile)
d.prepopulateFronts(cacheFile)
go d.maintainCache(cacheFile)
}

func (d *fronted) prepopulateMasquerades(cacheFile string) {
func (d *fronted) prepopulateFronts(cacheFile string) {
bytes, err := os.ReadFile(cacheFile)
if err != nil {
// This is not a big deal since we'll just fill the cache later
Expand All @@ -27,22 +27,22 @@ func (d *fronted) prepopulateMasquerades(cacheFile string) {
}

log.Debugf("Attempting to prepopulate masquerades from cache file: %v", cacheFile)
var cachedMasquerades []*masquerade
if err := json.Unmarshal(bytes, &cachedMasquerades); err != nil {
var cachedFronts []*front
if err := json.Unmarshal(bytes, &cachedFronts); err != nil {
log.Errorf("Error reading cached masquerades: %v", err)
return
}

log.Debugf("Cache contained %d masquerades", len(cachedMasquerades))
log.Debugf("Cache contained %d masquerades", len(cachedFronts))
now := time.Now()

// update last succeeded status of masquerades based on cached values
for _, m := range d.masquerades {
for _, cm := range cachedMasquerades {
sameMasquerade := cm.ProviderID == m.getProviderID() && cm.Domain == m.getDomain() && cm.IpAddress == m.getIpAddress()
cachedValueFresh := now.Sub(m.lastSucceeded()) < d.maxAllowedCachedAge
if sameMasquerade && cachedValueFresh {
m.setLastSucceeded(cm.LastSucceeded)
for _, f := range d.fronts {
for _, cf := range cachedFronts {
sameFront := cf.ProviderID == f.getProviderID() && cf.Domain == f.getDomain() && cf.IpAddress == f.getIpAddress()
cachedValueFresh := now.Sub(f.lastSucceeded()) < d.maxAllowedCachedAge
if sameFront && cachedValueFresh {
f.setLastSucceeded(cf.LastSucceeded)
}
}
}
Expand Down Expand Up @@ -75,7 +75,7 @@ func (d *fronted) maintainCache(cacheFile string) {

func (d *fronted) updateCache(cacheFile string) {
log.Debugf("Updating cache at %v", cacheFile)
cache := d.masquerades.sortedCopy()
cache := d.fronts.sortedCopy()
sizeToSave := len(cache)
if d.maxCacheSize < sizeToSave {
sizeToSave = d.maxCacheSize
Expand All @@ -98,11 +98,14 @@ func (d *fronted) updateCache(cacheFile string) {
// parent directory does not exist
log.Debugf("Parent directory of cache file does not exist: %v", parent)
}
} else {
log.Debugf("Cache saved to disk")
}
}

func (d *fronted) closeCache() {
func (d *fronted) Close() {
d.closeCacheOnce.Do(func() {
close(d.cacheClosed)
})
d.stopCh <- nil
}
45 changes: 26 additions & 19 deletions cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,31 +26,36 @@ func TestCaching(t *testing.T) {
cloudsackID: NewProvider(nil, "", nil, nil, nil, nil, nil),
}

makeDirect := func() *fronted {
d := &fronted{
masquerades: make(sortedMasquerades, 0, 1000),
log.Debug("Creating fronted")
makeFronted := func() *fronted {
f := &fronted{
fronts: make(sortedFronts, 0, 1000),
maxAllowedCachedAge: 250 * time.Millisecond,
maxCacheSize: 4,
cacheSaveInterval: 50 * time.Millisecond,
cacheDirty: make(chan interface{}, 1),
cacheClosed: make(chan interface{}),
providers: providers,
defaultProviderID: cloudsackID,
stopCh: make(chan interface{}, 10),
}
go d.maintainCache(cacheFile)
return d
go f.maintainCache(cacheFile)
return f
}

now := time.Now()
mb := &masquerade{Masquerade: Masquerade{Domain: "b", IpAddress: "2"}, LastSucceeded: now, ProviderID: testProviderID}
mc := &masquerade{Masquerade: Masquerade{Domain: "c", IpAddress: "3"}, LastSucceeded: now, ProviderID: ""} // defaulted
md := &masquerade{Masquerade: Masquerade{Domain: "d", IpAddress: "4"}, LastSucceeded: now, ProviderID: "sadcloud"} // skipped
mb := &front{Masquerade: Masquerade{Domain: "b", IpAddress: "2"}, LastSucceeded: now, ProviderID: testProviderID}
mc := &front{Masquerade: Masquerade{Domain: "c", IpAddress: "3"}, LastSucceeded: now, ProviderID: ""} // defaulted
md := &front{Masquerade: Masquerade{Domain: "d", IpAddress: "4"}, LastSucceeded: now, ProviderID: "sadcloud"} // skipped

d := makeDirect()
d.masquerades = append(d.masquerades, mb, mc, md)
f := makeFronted()

readCached := func() []*masquerade {
var result []*masquerade
log.Debug("Adding fronts")
f.fronts = append(f.fronts, mb, mc, md)

readCached := func() []*front {
log.Debug("Reading cached fronts")
var result []*front
b, err := os.ReadFile(cacheFile)
require.NoError(t, err, "Unable to read cache file")
err = json.Unmarshal(b, &result)
Expand All @@ -59,22 +64,24 @@ func TestCaching(t *testing.T) {
}

// Save the cache
d.markCacheDirty()
time.Sleep(d.cacheSaveInterval * 2)
d.closeCache()
f.markCacheDirty()

time.Sleep(f.cacheSaveInterval * 2)
f.Close()

time.Sleep(50 * time.Millisecond)

log.Debug("Reopening fronted")
// Reopen cache file and make sure right data was in there
d = makeDirect()
d.prepopulateMasquerades(cacheFile)
f = makeFronted()
f.prepopulateFronts(cacheFile)
masquerades := readCached()
require.Len(t, masquerades, 3, "Wrong number of masquerades read")
for i, expected := range []*masquerade{mb, mc, md} {
for i, expected := range []*front{mb, mc, md} {
require.Equal(t, expected.Domain, masquerades[i].Domain, "Wrong masquerade at position %d", i)
require.Equal(t, expected.IpAddress, masquerades[i].IpAddress, "Masquerade at position %d has wrong IpAddress", 0)
require.Equal(t, expected.ProviderID, masquerades[i].ProviderID, "Masquerade at position %d has wrong ProviderID", 0)
require.Equal(t, now.Unix(), masquerades[i].LastSucceeded.Unix(), "Masquerade at position %d has wrong LastSucceeded", 0)
}
d.closeCache()
f.Close()
}
54 changes: 54 additions & 0 deletions connecting_fronts.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package fronted

import (
"context"
)

type connectingFronts interface {
onConnected(m Front)
connectingFront(context.Context) (Front, error)
size() int
}

type connecting struct {
// Create a channel of fronts that are connecting.
frontsCh chan Front
}

// Make sure that connectingFronts is a connectListener
var _ connectingFronts = &connecting{}

// newConnectingFronts creates a new ConnectingFronts struct with a channel of fronts that have
// successfully connected.
func newConnectingFronts(size int) *connecting {
return &connecting{
// We overallocate the channel to avoid blocking.
frontsCh: make(chan Front, size),
Comment on lines +25 to +26
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this supposed to allocate more than size?

}
}
Comment on lines +21 to +28
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the doc is out of sync, frontCh would be empty.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right -- it would be empty, but with a large capacity based on this call from fronted.go:

connectingFronts: newConnectingFronts(4000),

The idea is to just to make sure that it won't block on adding connecting fronts.

Copy link
Contributor

@garmr-ulfr garmr-ulfr Dec 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"of fronts that have" kind of implies it would already contain some successful fronts by the time newConnectingFronts returns. I could see that being a "gotcha" for someone. Changing it to "for fronts" would avoid confusion.


// AddFront adds a new front to the list of fronts.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is also out of sync.

func (cf *connecting) onConnected(m Front) {
cf.frontsCh <- m
}

func (cf *connecting) connectingFront(ctx context.Context) (Front, error) {
for {
select {
case <-ctx.Done():
return nil, ctx.Err()
case m := <-cf.frontsCh:
// The front may have stopped succeeding since we last checked,
// so only return it if it's still succeeding.
if m.isSucceeding() {
// Add the front back to the channel.
cf.frontsCh <- m
return m, nil
}
}
}
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The above is a key section


func (cf *connecting) size() int {
return len(cf.frontsCh)
}
39 changes: 39 additions & 0 deletions connecting_fronts_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package fronted

import (
"testing"
)

func TestConnectingFrontsSize(t *testing.T) {
tests := []struct {
name string
setup func() *connecting
expected int
}{
{
name: "empty channel",
setup: func() *connecting {
return newConnectingFronts(10)
},
expected: 0,
},
{
name: "non-empty channel",
setup: func() *connecting {
cf := newConnectingFronts(10)
cf.onConnected(&mockFront{})
return cf
},
expected: 1,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cf := tt.setup()
if got := cf.size(); got != tt.expected {
t.Errorf("size() = %d, want %d", got, tt.expected)
}
})
}
}
115 changes: 0 additions & 115 deletions context.go

This file was deleted.

Loading
Loading