Skip to content

Commit

Permalink
Lots of cleanups to more cleanly handle continually finding working fronts
Browse files Browse the repository at this point in the history
  • Loading branch information
myleshorton committed Dec 3, 2024
1 parent 12a4450 commit 6b6fc13
Show file tree
Hide file tree
Showing 12 changed files with 336 additions and 387 deletions.
9 changes: 5 additions & 4 deletions cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func (d *fronted) prepopulateMasquerades(cacheFile string) {
}

log.Debugf("Attempting to prepopulate masquerades from cache file: %v", cacheFile)
var cachedMasquerades []*masquerade
var cachedMasquerades []*front
if err := json.Unmarshal(bytes, &cachedMasquerades); err != nil {
log.Errorf("Error reading cached masquerades: %v", err)
return
Expand All @@ -37,7 +37,7 @@ func (d *fronted) prepopulateMasquerades(cacheFile string) {
now := time.Now()

// update last succeeded status of masquerades based on cached values
for _, m := range d.masquerades {
for _, m := range d.fronts {
for _, cm := range cachedMasquerades {
sameMasquerade := cm.ProviderID == m.getProviderID() && cm.Domain == m.getDomain() && cm.IpAddress == m.getIpAddress()
cachedValueFresh := now.Sub(m.lastSucceeded()) < d.maxAllowedCachedAge
Expand Down Expand Up @@ -75,7 +75,7 @@ func (d *fronted) maintainCache(cacheFile string) {

func (d *fronted) updateCache(cacheFile string) {
log.Debugf("Updating cache at %v", cacheFile)
cache := d.masquerades.sortedCopy()
cache := d.fronts.sortedCopy()
sizeToSave := len(cache)
if d.maxCacheSize < sizeToSave {
sizeToSave = d.maxCacheSize
Expand All @@ -101,8 +101,9 @@ func (d *fronted) updateCache(cacheFile string) {
}
}

func (d *fronted) closeCache() {
// Close shuts down this fronted instance: it closes the cache exactly once
// and signals the background loop reading stopCh to stop.
//
// The send on stopCh is kept inside the sync.Once so that Close is
// idempotent — in the original, a second call skipped the Once but still
// performed `d.stopCh <- nil`, blocking forever on the unbuffered channel
// once the receiver had already exited.
// NOTE(review): assumes exactly one goroutine drains stopCh — confirm.
func (d *fronted) Close() {
	d.closeCacheOnce.Do(func() {
		close(d.cacheClosed)
		d.stopCh <- nil
	})
}
38 changes: 19 additions & 19 deletions cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ func TestCaching(t *testing.T) {
cloudsackID: NewProvider(nil, "", nil, nil, nil, nil, nil),
}

makeDirect := func() *fronted {
d := &fronted{
masquerades: make(sortedMasquerades, 0, 1000),
makeFronted := func() *fronted {
f := &fronted{
fronts: make(sortedFronts, 0, 1000),
maxAllowedCachedAge: 250 * time.Millisecond,
maxCacheSize: 4,
cacheSaveInterval: 50 * time.Millisecond,
Expand All @@ -37,20 +37,20 @@ func TestCaching(t *testing.T) {
providers: providers,
defaultProviderID: cloudsackID,
}
go d.maintainCache(cacheFile)
return d
go f.maintainCache(cacheFile)
return f
}

now := time.Now()
mb := &masquerade{Masquerade: Masquerade{Domain: "b", IpAddress: "2"}, LastSucceeded: now, ProviderID: testProviderID}
mc := &masquerade{Masquerade: Masquerade{Domain: "c", IpAddress: "3"}, LastSucceeded: now, ProviderID: ""} // defaulted
md := &masquerade{Masquerade: Masquerade{Domain: "d", IpAddress: "4"}, LastSucceeded: now, ProviderID: "sadcloud"} // skipped
mb := &front{Masquerade: Masquerade{Domain: "b", IpAddress: "2"}, LastSucceeded: now, ProviderID: testProviderID}
mc := &front{Masquerade: Masquerade{Domain: "c", IpAddress: "3"}, LastSucceeded: now, ProviderID: ""} // defaulted
md := &front{Masquerade: Masquerade{Domain: "d", IpAddress: "4"}, LastSucceeded: now, ProviderID: "sadcloud"} // skipped

d := makeDirect()
d.masquerades = append(d.masquerades, mb, mc, md)
f := makeFronted()
f.fronts = append(f.fronts, mb, mc, md)

readCached := func() []*masquerade {
var result []*masquerade
readCached := func() []*front {
var result []*front
b, err := os.ReadFile(cacheFile)
require.NoError(t, err, "Unable to read cache file")
err = json.Unmarshal(b, &result)
Expand All @@ -59,22 +59,22 @@ func TestCaching(t *testing.T) {
}

// Save the cache
d.markCacheDirty()
time.Sleep(d.cacheSaveInterval * 2)
d.closeCache()
f.markCacheDirty()
time.Sleep(f.cacheSaveInterval * 2)
f.Close()

time.Sleep(50 * time.Millisecond)

// Reopen cache file and make sure right data was in there
d = makeDirect()
d.prepopulateMasquerades(cacheFile)
f = makeFronted()
f.prepopulateMasquerades(cacheFile)
masquerades := readCached()
require.Len(t, masquerades, 3, "Wrong number of masquerades read")
for i, expected := range []*masquerade{mb, mc, md} {
for i, expected := range []*front{mb, mc, md} {
require.Equal(t, expected.Domain, masquerades[i].Domain, "Wrong masquerade at position %d", i)
require.Equal(t, expected.IpAddress, masquerades[i].IpAddress, "Masquerade at position %d has wrong IpAddress", 0)
require.Equal(t, expected.ProviderID, masquerades[i].ProviderID, "Masquerade at position %d has wrong ProviderID", 0)
require.Equal(t, now.Unix(), masquerades[i].LastSucceeded.Unix(), "Masquerade at position %d has wrong LastSucceeded", 0)
}
d.closeCache()
f.Close()
}
69 changes: 27 additions & 42 deletions connecting_fronts.go
Original file line number Diff line number Diff line change
@@ -1,68 +1,53 @@
package fronted

import (
"errors"
"sort"
"sync"
"time"
"context"
)

type connectTimeFront struct {
MasqueradeInterface
connectTime time.Duration
// workingFronts tracks fronts that have successfully connected and hands
// them out to callers needing a working front.
type workingFronts interface {
	// onConnected records a front that has just connected successfully.
	onConnected(m Front)
	// connectingFront blocks until a working front is available or the
	// context is done, whichever happens first.
	connectingFront(context.Context) (Front, error)
	// size reports how many fronts are currently tracked.
	size() int
}

type connectingFronts struct {
fronts []connectTimeFront
//frontsChan chan MasqueradeInterface
sync.RWMutex
// Create a channel of fronts that are connecting.
frontsCh chan Front
}

// Compile-time check that connectingFronts implements workingFronts.
var _ workingFronts = &connectingFronts{}

// newConnectingFronts creates a new ConnectingFronts struct with an empty slice of Masquerade IPs and domains.
func newConnectingFronts() *connectingFronts {
func newConnectingFronts(size int) *connectingFronts {
return &connectingFronts{
fronts: make([]connectTimeFront, 0),
//frontsChan: make(chan MasqueradeInterface),
// We overallocate the channel to avoid blocking.
frontsCh: make(chan Front, size),
}
}

// AddFront adds a new front to the list of fronts.
func (cf *connectingFronts) onConnected(m MasqueradeInterface, connectTime time.Duration) {
cf.Lock()
defer cf.Unlock()

cf.fronts = append(cf.fronts, connectTimeFront{
MasqueradeInterface: m,
connectTime: connectTime,
})
// Sort fronts by connect time.
sort.Slice(cf.fronts, func(i, j int) bool {
return cf.fronts[i].connectTime < cf.fronts[j].connectTime
})
//cf.frontsChan <- m
// onConnected records a front that has just connected successfully by
// queuing it on the channel of working fronts.
// NOTE(review): this send blocks if frontsCh is full — presumably the
// channel is sized to the total number of candidate fronts; confirm.
func (cf *connectingFronts) onConnected(m Front) {
	cf.frontsCh <- m
}

func (cf *connectingFronts) onError(m MasqueradeInterface) {
cf.Lock()
defer cf.Unlock()

// Remove the front from connecting fronts.
for i, front := range cf.fronts {
if front.MasqueradeInterface == m {
cf.fronts = append(cf.fronts[:i], cf.fronts[i+1:]...)
return
// connectingFront blocks until a working front becomes available or ctx is
// done. A queued front that has stopped succeeding in the meantime is
// discarded and the wait continues; a front that is still succeeding is
// re-queued (so other callers can also use it) and returned.
func (cf *connectingFronts) connectingFront(ctx context.Context) (Front, error) {
	for {
		var m Front
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		case m = <-cf.frontsCh:
		}
		if !m.isSucceeding() {
			// Stale front — drop it and keep waiting for another.
			continue
		}
		// Still healthy: put it back on the channel before returning it.
		cf.frontsCh <- m
		return m, nil
	}
}

func (cf *connectingFronts) workingFront() (MasqueradeInterface, error) {
cf.RLock()
defer cf.RUnlock()
if len(cf.fronts) == 0 {
return nil, errors.New("no fronts available")
}
return cf.fronts[0].MasqueradeInterface, nil
// size reports the number of fronts currently queued on the channel.
func (cf *connectingFronts) size() int {
	return len(cf.frontsCh)
}
39 changes: 39 additions & 0 deletions connecting_fronts_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package fronted

import (
"testing"
)

// TestConnectingFrontsSize verifies that size() tracks the number of fronts
// queued on the channel: zero for a fresh instance, one after a single
// successful connection has been recorded.
func TestConnectingFrontsSize(t *testing.T) {
	cases := []struct {
		name  string
		setup func() *connectingFronts
		want  int
	}{
		{
			name: "empty channel",
			setup: func() *connectingFronts {
				return newConnectingFronts(10)
			},
			want: 0,
		},
		{
			name: "non-empty channel",
			setup: func() *connectingFronts {
				cf := newConnectingFronts(10)
				cf.onConnected(&mockFront{})
				return cf
			},
			want: 1,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			if got := tc.setup().size(); got != tc.want {
				t.Errorf("size() = %d, want %d", got, tc.want)
			}
		})
	}
}
121 changes: 0 additions & 121 deletions context.go

This file was deleted.

Loading

0 comments on commit 6b6fc13

Please sign in to comment.