Skip to content

Commit

Permalink
Merge pull request #50 from getlantern/myles/track-connected
Browse files Browse the repository at this point in the history
Separately track connecting fronts and do not clear them on new configs
  • Loading branch information
myleshorton authored Dec 6, 2024
2 parents a556be1 + 7bc4f00 commit 24178df
Show file tree
Hide file tree
Showing 13 changed files with 551 additions and 554 deletions.
18 changes: 15 additions & 3 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,24 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-node@v4
with:
node-version: 18
- name: Set up Go
uses: actions/setup-go@v4
uses: actions/setup-go@v5
with:
go-version-file: "go.mod"
- name: Run unit tests
run: go test -failfast -coverprofile=profile.cov
- name: Install go go-ctrf-json-reporter
run: go install github.com/ctrf-io/go-ctrf-json-reporter/cmd/go-ctrf-json-reporter@latest
- name: Run tests
run: go test -json -race -tags="headless" -coverprofile=profile.cov ./... | go-ctrf-json-reporter -output ctrf-report.json
- name: Upload test results
uses: actions/upload-artifact@v4
with:
name: ctrf-report
path: ctrf-report.json
- name: Publish Test Summary Results
run: npx github-actions-ctrf ctrf-report.json
- name: Install goveralls
run: go install github.com/mattn/goveralls@latest
- name: Send coverage
Expand Down
32 changes: 14 additions & 18 deletions cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ import (
)

func (d *fronted) initCaching(cacheFile string) {
d.prepopulateMasquerades(cacheFile)
d.prepopulateFronts(cacheFile)
go d.maintainCache(cacheFile)
}

func (d *fronted) prepopulateMasquerades(cacheFile string) {
func (d *fronted) prepopulateFronts(cacheFile string) {
bytes, err := os.ReadFile(cacheFile)
if err != nil {
// This is not a big deal since we'll just fill the cache later
Expand All @@ -27,22 +27,22 @@ func (d *fronted) prepopulateMasquerades(cacheFile string) {
}

log.Debugf("Attempting to prepopulate masquerades from cache file: %v", cacheFile)
var cachedMasquerades []*masquerade
if err := json.Unmarshal(bytes, &cachedMasquerades); err != nil {
var cachedFronts []*front
if err := json.Unmarshal(bytes, &cachedFronts); err != nil {
log.Errorf("Error reading cached masquerades: %v", err)
return
}

log.Debugf("Cache contained %d masquerades", len(cachedMasquerades))
log.Debugf("Cache contained %d masquerades", len(cachedFronts))
now := time.Now()

// update last succeeded status of masquerades based on cached values
for _, m := range d.masquerades {
for _, cm := range cachedMasquerades {
sameMasquerade := cm.ProviderID == m.getProviderID() && cm.Domain == m.getDomain() && cm.IpAddress == m.getIpAddress()
cachedValueFresh := now.Sub(m.lastSucceeded()) < d.maxAllowedCachedAge
if sameMasquerade && cachedValueFresh {
m.setLastSucceeded(cm.LastSucceeded)
for _, f := range d.fronts {
for _, cf := range cachedFronts {
sameFront := cf.ProviderID == f.getProviderID() && cf.Domain == f.getDomain() && cf.IpAddress == f.getIpAddress()
cachedValueFresh := now.Sub(f.lastSucceeded()) < d.maxAllowedCachedAge
if sameFront && cachedValueFresh {
f.setLastSucceeded(cf.LastSucceeded)
}
}
}
Expand Down Expand Up @@ -75,7 +75,7 @@ func (d *fronted) maintainCache(cacheFile string) {

func (d *fronted) updateCache(cacheFile string) {
log.Debugf("Updating cache at %v", cacheFile)
cache := d.masquerades.sortedCopy()
cache := d.fronts.sortedCopy()
sizeToSave := len(cache)
if d.maxCacheSize < sizeToSave {
sizeToSave = d.maxCacheSize
Expand All @@ -98,11 +98,7 @@ func (d *fronted) updateCache(cacheFile string) {
// parent directory does not exist
log.Debugf("Parent directory of cache file does not exist: %v", parent)
}
} else {
log.Debugf("Cache saved to disk")
}
}

func (d *fronted) closeCache() {
d.closeCacheOnce.Do(func() {
close(d.cacheClosed)
})
}
45 changes: 26 additions & 19 deletions cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,31 +26,36 @@ func TestCaching(t *testing.T) {
cloudsackID: NewProvider(nil, "", nil, nil, nil, nil, nil),
}

makeDirect := func() *fronted {
d := &fronted{
masquerades: make(sortedMasquerades, 0, 1000),
log.Debug("Creating fronted")
makeFronted := func() *fronted {
f := &fronted{
fronts: make(sortedFronts, 0, 1000),
maxAllowedCachedAge: 250 * time.Millisecond,
maxCacheSize: 4,
cacheSaveInterval: 50 * time.Millisecond,
cacheDirty: make(chan interface{}, 1),
cacheClosed: make(chan interface{}),
providers: providers,
defaultProviderID: cloudsackID,
stopCh: make(chan interface{}, 10),
}
go d.maintainCache(cacheFile)
return d
go f.maintainCache(cacheFile)
return f
}

now := time.Now()
mb := &masquerade{Masquerade: Masquerade{Domain: "b", IpAddress: "2"}, LastSucceeded: now, ProviderID: testProviderID}
mc := &masquerade{Masquerade: Masquerade{Domain: "c", IpAddress: "3"}, LastSucceeded: now, ProviderID: ""} // defaulted
md := &masquerade{Masquerade: Masquerade{Domain: "d", IpAddress: "4"}, LastSucceeded: now, ProviderID: "sadcloud"} // skipped
mb := &front{Masquerade: Masquerade{Domain: "b", IpAddress: "2"}, LastSucceeded: now, ProviderID: testProviderID}
mc := &front{Masquerade: Masquerade{Domain: "c", IpAddress: "3"}, LastSucceeded: now, ProviderID: ""} // defaulted
md := &front{Masquerade: Masquerade{Domain: "d", IpAddress: "4"}, LastSucceeded: now, ProviderID: "sadcloud"} // skipped

d := makeDirect()
d.masquerades = append(d.masquerades, mb, mc, md)
f := makeFronted()

readCached := func() []*masquerade {
var result []*masquerade
log.Debug("Adding fronts")
f.fronts = append(f.fronts, mb, mc, md)

readCached := func() []*front {
log.Debug("Reading cached fronts")
var result []*front
b, err := os.ReadFile(cacheFile)
require.NoError(t, err, "Unable to read cache file")
err = json.Unmarshal(b, &result)
Expand All @@ -59,22 +64,24 @@ func TestCaching(t *testing.T) {
}

// Save the cache
d.markCacheDirty()
time.Sleep(d.cacheSaveInterval * 2)
d.closeCache()
f.markCacheDirty()

time.Sleep(f.cacheSaveInterval * 2)
f.Close()

time.Sleep(50 * time.Millisecond)

log.Debug("Reopening fronted")
// Reopen cache file and make sure right data was in there
d = makeDirect()
d.prepopulateMasquerades(cacheFile)
f = makeFronted()
f.prepopulateFronts(cacheFile)
masquerades := readCached()
require.Len(t, masquerades, 3, "Wrong number of masquerades read")
for i, expected := range []*masquerade{mb, mc, md} {
for i, expected := range []*front{mb, mc, md} {
require.Equal(t, expected.Domain, masquerades[i].Domain, "Wrong masquerade at position %d", i)
require.Equal(t, expected.IpAddress, masquerades[i].IpAddress, "Masquerade at position %d has wrong IpAddress", 0)
require.Equal(t, expected.ProviderID, masquerades[i].ProviderID, "Masquerade at position %d has wrong ProviderID", 0)
require.Equal(t, now.Unix(), masquerades[i].LastSucceeded.Unix(), "Masquerade at position %d has wrong LastSucceeded", 0)
}
d.closeCache()
f.Close()
}
55 changes: 55 additions & 0 deletions connecting_fronts.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package fronted

import (
"context"
)

type connectingFronts interface {
onConnected(m Front)
connectingFront(context.Context) (Front, error)
size() int
}

type connecting struct {
// Create a channel of fronts that are connecting.
frontsCh chan Front
}

// Make sure that connectingFronts is a connectListener
var _ connectingFronts = &connecting{}

// newConnectingFronts creates a new ConnectingFronts struct with a channel of fronts that have
// successfully connected.
func newConnectingFronts(size int) *connecting {
return &connecting{
// We overallocate the channel to avoid blocking.
frontsCh: make(chan Front, size),
}
}

// onConnected adds a working front to the channel of working fronts.
func (cf *connecting) onConnected(m Front) {
cf.frontsCh <- m
}

func (cf *connecting) connectingFront(ctx context.Context) (Front, error) {
for {
select {
// This is typically the context of the HTTP request. If the context is done, return an error.
case <-ctx.Done():
return nil, ctx.Err()
case m := <-cf.frontsCh:
// The front may have stopped succeeding since we last checked,
// so only return it if it's still succeeding.
if m.isSucceeding() {
// Add the front back to the channel.
cf.frontsCh <- m
return m, nil
}
}
}
}

func (cf *connecting) size() int {
return len(cf.frontsCh)
}
39 changes: 39 additions & 0 deletions connecting_fronts_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package fronted

import (
"testing"
)

func TestConnectingFrontsSize(t *testing.T) {
tests := []struct {
name string
setup func() *connecting
expected int
}{
{
name: "empty channel",
setup: func() *connecting {
return newConnectingFronts(10)
},
expected: 0,
},
{
name: "non-empty channel",
setup: func() *connecting {
cf := newConnectingFronts(10)
cf.onConnected(&mockFront{})
return cf
},
expected: 1,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cf := tt.setup()
if got := cf.size(); got != tt.expected {
t.Errorf("size() = %d, want %d", got, tt.expected)
}
})
}
}
115 changes: 0 additions & 115 deletions context.go

This file was deleted.

Loading

0 comments on commit 24178df

Please sign in to comment.