Skip to content

Commit

Permalink
Merge pull request #45 from getlantern/myles/parallel-masquerade-checks
Browse files Browse the repository at this point in the history
Parallelize masquerade lookup
  • Loading branch information
myleshorton authored Oct 28, 2024
2 parents 8358df2 + 59aaab8 commit 6616677
Show file tree
Hide file tree
Showing 8 changed files with 557 additions and 331 deletions.
29 changes: 20 additions & 9 deletions cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@ package fronted
import (
"encoding/json"
"os"
"path/filepath"
"time"
)

func (d *direct) initCaching(cacheFile string) {
func (d *fronted) initCaching(cacheFile string) {
d.prepopulateMasquerades(cacheFile)
go d.maintainCache(cacheFile)
}

func (d *direct) prepopulateMasquerades(cacheFile string) {
func (d *fronted) prepopulateMasquerades(cacheFile string) {
bytes, err := os.ReadFile(cacheFile)
if err != nil {
// This is not a big deal since we'll just fill the cache later
Expand All @@ -38,16 +39,16 @@ func (d *direct) prepopulateMasquerades(cacheFile string) {
// update last succeeded status of masquerades based on cached values
for _, m := range d.masquerades {
for _, cm := range cachedMasquerades {
sameMasquerade := cm.ProviderID == m.ProviderID && cm.Domain == m.Domain && cm.IpAddress == m.IpAddress
cachedValueFresh := now.Sub(m.LastSucceeded) < d.maxAllowedCachedAge
sameMasquerade := cm.ProviderID == m.getProviderID() && cm.Domain == m.getDomain() && cm.IpAddress == m.getIpAddress()
cachedValueFresh := now.Sub(m.lastSucceeded()) < d.maxAllowedCachedAge
if sameMasquerade && cachedValueFresh {
m.LastSucceeded = cm.LastSucceeded
m.setLastSucceeded(cm.LastSucceeded)
}
}
}
}

func (d *direct) markCacheDirty() {
func (d *fronted) markCacheDirty() {
select {
case d.cacheDirty <- nil:
// okay
Expand All @@ -56,7 +57,7 @@ func (d *direct) markCacheDirty() {
}
}

func (d *direct) maintainCache(cacheFile string) {
func (d *fronted) maintainCache(cacheFile string) {
for {
select {
case <-d.cacheClosed:
Expand All @@ -72,7 +73,7 @@ func (d *direct) maintainCache(cacheFile string) {
}
}

func (d *direct) updateCache(cacheFile string) {
func (d *fronted) updateCache(cacheFile string) {
log.Debugf("Updating cache at %v", cacheFile)
cache := d.masquerades.sortedCopy()
sizeToSave := len(cache)
Expand All @@ -87,10 +88,20 @@ func (d *direct) updateCache(cacheFile string) {
err = os.WriteFile(cacheFile, b, 0644)
if err != nil {
log.Errorf("Unable to save cache to disk: %v", err)
// Log the directory of the cache file and if it exists for debugging purposes
parent := filepath.Dir(cacheFile)
// check if the parent directory exists
if _, err := os.Stat(parent); err == nil {
// parent directory exists
log.Debugf("Parent directory of cache file exists: %v", parent)
} else {
// parent directory does not exist
log.Debugf("Parent directory of cache file does not exist: %v", parent)
}
}
}

func (d *direct) closeCache() {
func (d *fronted) closeCache() {
d.closeCacheOnce.Do(func() {
close(d.cacheClosed)
})
Expand Down
4 changes: 2 additions & 2 deletions cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ func TestCaching(t *testing.T) {
cloudsackID: NewProvider(nil, "", nil, nil, nil, nil, nil),
}

makeDirect := func() *direct {
d := &direct{
makeDirect := func() *fronted {
d := &fronted{
masquerades: make(sortedMasquerades, 0, 1000),
maxAllowedCachedAge: 250 * time.Millisecond,
maxCacheSize: 4,
Expand Down
53 changes: 14 additions & 39 deletions context.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ func Configure(pool *x509.CertPool, providers map[string]*Provider, defaultProvi
}
}

// NewDirect creates a new http.RoundTripper that does direct domain fronting
// NewFronted creates a new http.RoundTripper that does direct domain fronting
// using the default context. If the default context isn't configured within
// the given timeout, this method returns nil, false.
func NewDirect(timeout time.Duration) (http.RoundTripper, bool) {
return DefaultContext.NewDirect(timeout)
func NewFronted(timeout time.Duration) (http.RoundTripper, bool) {
return DefaultContext.NewFronted(timeout)
}

// Close closes any existing cache file in the default context
Expand Down Expand Up @@ -68,55 +68,30 @@ func (fctx *FrontingContext) ConfigureWithHello(pool *x509.CertPool, providers m

_existing, ok := fctx.instance.Get(0)
if ok && _existing != nil {
existing := _existing.(*direct)
existing := _existing.(*fronted)
log.Debugf("Closing cache from existing instance for %s context", fctx.name)
existing.closeCache()
}

size := 0
for _, p := range providers {
size += len(p.Masquerades)
_, err := newFronted(pool, providers, defaultProviderID, cacheFile, clientHelloID, func(f *fronted) {
fctx.instance.Set(f)
})
if err != nil {
return err
}

if size == 0 {
return fmt.Errorf("no masquerades for %s context", fctx.name)
}

d := &direct{
certPool: pool,
masquerades: make(sortedMasquerades, 0, size),
maxAllowedCachedAge: defaultMaxAllowedCachedAge,
maxCacheSize: defaultMaxCacheSize,
cacheSaveInterval: defaultCacheSaveInterval,
cacheDirty: make(chan interface{}, 1),
cacheClosed: make(chan interface{}),
defaultProviderID: defaultProviderID,
providers: make(map[string]*Provider),
clientHelloID: clientHelloID,
}

// copy providers
for k, p := range providers {
d.providers[k] = NewProvider(p.HostAliases, p.TestURL, p.Masquerades, p.Validator, p.PassthroughPatterns, p.SNIConfig, p.VerifyHostname)
}

d.loadCandidates(d.providers)
if cacheFile != "" {
d.initCaching(cacheFile)
}
go d.vet(numberToVetInitially)
fctx.instance.Set(d)
return nil
}

// NewDirect creates a new http.RoundTripper that does direct domain fronting.
// NewFronted creates a new http.RoundTripper that does direct domain fronting.
// If the context isn't configured within the given timeout, this method
// returns nil, false.
func (fctx *FrontingContext) NewDirect(timeout time.Duration) (http.RoundTripper, bool) {
func (fctx *FrontingContext) NewFronted(timeout time.Duration) (http.RoundTripper, bool) {
instance, ok := fctx.instance.Get(timeout)
if !ok {
log.Errorf("No DirectHttpClient available within %v for context %s", timeout, fctx.name)
return nil, false
} else {
log.Debugf("DirectHttpClient available for context %s", fctx.name)
}
return instance.(http.RoundTripper), true
}
Expand All @@ -125,7 +100,7 @@ func (fctx *FrontingContext) NewDirect(timeout time.Duration) (http.RoundTripper
func (fctx *FrontingContext) Close() {
_existing, ok := fctx.instance.Get(0)
if ok && _existing != nil {
existing := _existing.(*direct)
existing := _existing.(*fronted)
log.Debugf("Closing cache from existing instance in %s context", fctx.name)
existing.closeCache()
}
Expand Down
4 changes: 2 additions & 2 deletions default_masquerades.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ var DefaultTrustedCAs = []*CA{
var DefaultAkamaiMasquerades = []*Masquerade{
{
Domain: "a248.e.akamai.net",
IpAddress: "23.53.122.84",
IpAddress: "104.117.247.143",
},
{
Domain: "a248.e.akamai.net",
IpAddress: "2.19.198.29",
IpAddress: "23.47.194.73",
},
}

Expand Down
Loading

0 comments on commit 6616677

Please sign in to comment.