Skip to content

Commit

Permalink
Reduce server allocations, GC churn, and CPU overhead for tactics fet…
Browse files Browse the repository at this point in the history
…ches

- Add tactics.Server.cachedTacticsData to cache prepared tactics payloads and
  hash tags for repeated tactics requests matching the same filters.

- Designate a single in-proxy proxy worker as the tactics checker and fetcher.
  • Loading branch information
rod-hynes committed Oct 14, 2024
1 parent 5b012e0 commit 9d0ae9f
Show file tree
Hide file tree
Showing 6 changed files with 237 additions and 37 deletions.
9 changes: 8 additions & 1 deletion psiphon/common/inproxy/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,19 +215,26 @@ type ClientMetrics struct {
// ProxyAnnounceRequest is an API request sent from a proxy to a broker,
// announcing that it is available for a client connection. Proxies send one
// ProxyAnnounceRequest for each available client connection. The broker will
// match the proxy with a a client and return WebRTC connection information
// match the proxy with a client and return WebRTC connection information
// in the response.
//
// PersonalCompartmentIDs limits the clients to those that supply one of the
// specified compartment IDs; personal compartment IDs are distributed from
// proxy operators to client users out-of-band and provide optional access
// control.
//
// When CheckTactics is set, the broker will check for new tactics or indicate
// that the proxy's cached tactics TTL may be extended. Tactics information
// is returned in the response TacticsPayload. To minimize broker processing
// overhead, proxies with multiple workers should designate just one worker
// to set CheckTactics.
//
// The proxy's session public key is an implicit and cryptographically
// verified proxy ID.
type ProxyAnnounceRequest struct {
// PersonalCompartmentIDs, when set, limits matching to clients that supply
// one of the specified compartment IDs.
PersonalCompartmentIDs []ID `cbor:"1,keyasint,omitempty"`
// Metrics is an optional ProxyMetrics value sent along with the announce.
Metrics *ProxyMetrics `cbor:"2,keyasint,omitempty"`
// CheckTactics requests that the broker check for new tactics, or indicate
// that the proxy's cached tactics TTL may be extended. Proxies with multiple
// workers should designate just one worker to set this flag.
CheckTactics bool `cbor:"3,keyasint,omitempty"`
}

// WebRTCSessionDescription is compatible with pion/webrtc.SessionDescription
Expand Down
27 changes: 15 additions & 12 deletions psiphon/common/inproxy/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -584,22 +584,25 @@ func (b *Broker) handleProxyAnnounce(
// proxy can store and apply the new tactics before announcing again.

var tacticsPayload []byte
tacticsPayload, newTacticsTag, err = b.config.GetTacticsPayload(geoIPData, apiParams)
if err != nil {
return nil, errors.Trace(err)
}

if tacticsPayload != nil && newTacticsTag != "" {
responsePayload, err := MarshalProxyAnnounceResponse(
&ProxyAnnounceResponse{
TacticsPayload: tacticsPayload,
NoMatch: true,
})
if announceRequest.CheckTactics {
tacticsPayload, newTacticsTag, err =
b.config.GetTacticsPayload(geoIPData, apiParams)
if err != nil {
return nil, errors.Trace(err)
}

return responsePayload, nil
if tacticsPayload != nil && newTacticsTag != "" {
responsePayload, err := MarshalProxyAnnounceResponse(
&ProxyAnnounceResponse{
TacticsPayload: tacticsPayload,
NoMatch: true,
})
if err != nil {
return nil, errors.Trace(err)
}

return responsePayload, nil
}
}

// AllowProxy may be used to disallow proxies from certain geolocations,
Expand Down
15 changes: 12 additions & 3 deletions psiphon/common/inproxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,14 +202,16 @@ func (p *Proxy) Run(ctx context.Context) {
// trip is awaited so that:
//
// - The first announce response will arrive with any new tactics,
// avoiding a start up case where MaxClients initial, concurrent
// announces all return with no-match and a tactics payload.
// which may be applied before launching additional workers.
//
// - The first worker gets no announcement delay and is also guaranteed to
// be the shared session establisher. Since the announcement delays are
// applied _after_ waitToShareSession, it would otherwise be possible,
// with a race of MaxClient initial, concurrent announces, for the
// session establisher to be a different worker than the no-delay worker.
//
// The first worker is the only proxy worker which sets
// ProxyAnnounceRequest.CheckTactics.

signalFirstAnnounceCtx, signalFirstAnnounceDone :=
context.WithCancel(context.Background())
Expand Down Expand Up @@ -612,6 +614,10 @@ func (p *Proxy) proxyOneClient(
}
p.nextAnnounceMutex.Unlock()

// Only the first worker, which has signalAnnounceDone configured, checks
// for tactics.
checkTactics := signalAnnounceDone != nil

// A proxy ID is implicitly sent with requests; it's the proxy's session
// public key.
//
Expand All @@ -625,6 +631,7 @@ func (p *Proxy) proxyOneClient(
&ProxyAnnounceRequest{
PersonalCompartmentIDs: personalCompartmentIDs,
Metrics: metrics,
CheckTactics: checkTactics,
})
if logAnnounce() {
p.config.Logger.WithTraceFields(common.LogFields{
Expand All @@ -645,7 +652,9 @@ func (p *Proxy) proxyOneClient(
// discovery but proceed with handling the proxy announcement
// response as there may still be a match.

if p.config.HandleTacticsPayload(tacticsNetworkID, announceResponse.TacticsPayload) {
if p.config.HandleTacticsPayload(
tacticsNetworkID, announceResponse.TacticsPayload) {

p.resetNetworkDiscovery()
}
}
Expand Down
141 changes: 120 additions & 21 deletions psiphon/common/tactics/tactics.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,13 +157,15 @@ import (
"io/ioutil"
"net/http"
"sort"
"strings"
"time"

"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/obfuscator"
"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/parameters"
"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/prng"
lrucache "github.com/cognusion/go-cache-lru"
"golang.org/x/crypto/nacl/box"
)

Expand All @@ -189,6 +191,7 @@ const (
AGGREGATION_MINIMUM = "Minimum"
AGGREGATION_MAXIMUM = "Maximum"
AGGREGATION_MEDIAN = "Median"
PAYLOAD_CACHE_SIZE = 256
)

var (
Expand Down Expand Up @@ -250,6 +253,9 @@ type Server struct {
logger common.Logger
logFieldFormatter common.APIParameterLogFieldFormatter
apiParameterValidator common.APIParameterValidator

cachedTacticsData *lrucache.Cache
filterMatches []bool
}

const (
Expand Down Expand Up @@ -442,6 +448,8 @@ func NewServer(
logger: logger,
logFieldFormatter: logFieldFormatter,
apiParameterValidator: apiParameterValidator,
cachedTacticsData: lrucache.NewWithLRU(
lrucache.NoExpiration, 1*time.Minute, PAYLOAD_CACHE_SIZE),
}

server.ReloadableFile = common.NewReloadableFile(
Expand All @@ -467,6 +475,18 @@ func NewServer(
server.DefaultTactics = newServer.DefaultTactics
server.FilteredTactics = newServer.FilteredTactics

// Any cached, merged tactics data is flushed when the
// configuration changes.
//
// A single filterMatches, used in getTactics, is allocated here
// to avoid allocating a slice per getTactics call.
//
// Server.ReloadableFile.RLock/RUnlock is the mutex for accessing
// these and other Server fields.

server.cachedTacticsData.Flush()
server.filterMatches = make([]bool, len(server.FilteredTactics))

server.initLookups()

server.loaded = true
Expand Down Expand Up @@ -730,29 +750,26 @@ func (server *Server) GetFilterGeoIPScope(geoIPData common.GeoIPData) int {
//
// Elements of the returned Payload, e.g., tactics parameters, will point to
// data in DefaultTactics and FilteredTactics and must not be modified.
//
// Callers must not mutate returned tactics data, which is cached.
func (server *Server) GetTacticsPayload(
geoIPData common.GeoIPData,
apiParams common.APIParameters) (*Payload, error) {

// includeServerSideOnly is false: server-side only parameters are not
// used by the client, so including them wastes space and unnecessarily
// exposes the values.
tactics, err := server.GetTactics(false, geoIPData, apiParams)
tacticsData, err := server.getTactics(false, geoIPData, apiParams)
if err != nil {
return nil, errors.Trace(err)
}

if tactics == nil {
if tacticsData == nil {
return nil, nil
}

marshaledTactics, tag, err := marshalTactics(tactics)
if err != nil {
return nil, errors.Trace(err)
}

payload := &Payload{
Tag: tag,
Tag: tacticsData.tag,
}

// New clients should always send STORED_TACTICS_TAG_PARAMETER_NAME. When they have no
Expand All @@ -777,7 +794,7 @@ func (server *Server) GetTacticsPayload(
}

if sendPayloadTactics {
payload.Tactics = marshaledTactics
payload.Tactics = tacticsData.payload
}

return payload, nil
Expand All @@ -797,37 +814,63 @@ func marshalTactics(tactics *Tactics) ([]byte, string, error) {
}

// GetTacticsWithTag returns a GetTactics value along with the associated tag value.
//
// Callers must not mutate returned tactics data, which is cached.
func (server *Server) GetTacticsWithTag(
includeServerSideOnly bool,
geoIPData common.GeoIPData,
apiParams common.APIParameters) (*Tactics, string, error) {

tactics, err := server.GetTactics(
tacticsData, err := server.getTactics(
includeServerSideOnly, geoIPData, apiParams)
if err != nil {
return nil, "", errors.Trace(err)
}

if tactics == nil {
if tacticsData == nil {
return nil, "", nil
}

_, tag, err := marshalTactics(tactics)
return tacticsData.tactics, tacticsData.tag, nil
}

// tacticsData is cached tactics data, including the merged Tactics object,
// the JSON marshaled payload, and hashed tag.
type tacticsData struct {
// tactics is the merged Tactics object that payload was marshaled from.
tactics *Tactics
// payload is the JSON encoding of tactics.
payload []byte
// tag is the hex-encoded MD5 digest of payload, used as a data checksum
// and not for any security purpose.
tag string
}

func newTacticsData(tactics *Tactics) (*tacticsData, error) {

payload, err := json.Marshal(tactics)
if err != nil {
return nil, "", errors.Trace(err)
return nil, errors.Trace(err)
}

return tactics, tag, nil
// MD5 hash is used solely as a data checksum and not for any security
// purpose.
digest := md5.Sum(payload)
tag := hex.EncodeToString(digest[:])

return &tacticsData{
tactics: tactics,
payload: payload,
tag: tag,
}, nil
}

// getTactics assembles and returns tactics data for a client with the
// specified GeoIP, API parameter, and speed test attributes.
//
// The tactics return value may be nil.
func (server *Server) GetTactics(
//
// Callers must not mutate returned tactics data, which is cached.
func (server *Server) getTactics(
includeServerSideOnly bool,
geoIPData common.GeoIPData,
apiParams common.APIParameters) (*Tactics, error) {
apiParams common.APIParameters) (*tacticsData, error) {

server.ReloadableFile.RLock()
defer server.ReloadableFile.RUnlock()
Expand All @@ -837,11 +880,19 @@ func (server *Server) GetTactics(
return nil, nil
}

tactics := server.DefaultTactics.clone(includeServerSideOnly)
// Two passes are performed, one to get the list of matching filters, and
// then, if no merged tactics data is found for that filter match set,
// another pass to merge all the tactics parameters.

var aggregatedValues map[string]int
filterMatchCount := 0

for _, filteredTactics := range server.FilteredTactics {
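// Use the preallocated slice to avoid an allocation per getTactics call.
//
// NOTE(review): server.filterMatches is a single shared slice that is
// written below while only ReloadableFile.RLock is held; if getTactics can
// run concurrently, those writes would race -- confirm.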
filterMatches := server.filterMatches

for filterIndex, filteredTactics := range server.FilteredTactics {

filterMatches[filterIndex] = false

if len(filteredTactics.Filter.Regions) > 0 {
if filteredTactics.Filter.regionLookup != nil {
Expand Down Expand Up @@ -944,15 +995,63 @@ func (server *Server) GetTactics(
}
}

tactics.merge(includeServerSideOnly, &filteredTactics.Tactics)
filterMatchCount += 1
filterMatches[filterIndex] = true

// Continue to check for more matches. Last matching tactics filter
// has priority for any field.
}

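// For any filter match set, the merged tactics parameters are the same,
// so the resulting merge is cached, along with the JSON encoding of the
// payload and hash tag. This cache reduces, for repeated tactics
// requests, heavy allocations from the JSON marshal and CPU load from
// both the marshal and hashing the marshal result.
//
// NOTE(review): the cache key is derived from the filter match set only,
// while the merged tactics also depend on includeServerSideOnly; confirm
// that calls with differing includeServerSideOnly values cannot receive
// each other's cached entries, or add that flag to the key.
//
// getCacheKey still allocates a strings.Builder buffer.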

cacheKey := getCacheKey(filterMatchCount > 0, filterMatches)

// Continue to apply more matches. Last matching tactics has priority for any field.
cacheValue, ok := server.cachedTacticsData.Get(cacheKey)
if ok {
return cacheValue.(*tacticsData), nil
}

tactics := server.DefaultTactics.clone(includeServerSideOnly)
if filterMatchCount > 0 {
for filterIndex, filteredTactics := range server.FilteredTactics {
if filterMatches[filterIndex] {
tactics.merge(includeServerSideOnly, &filteredTactics.Tactics)
}
}
}

// See Tactics.Probability doc comment.
tactics.Probability = 1.0

return tactics, nil
tacticsData, err := newTacticsData(tactics)
if err != nil {
return nil, errors.Trace(err)
}

server.cachedTacticsData.Set(cacheKey, tacticsData, 0)

return tacticsData, nil
}

// getCacheKey builds the tactics cache lookup key for a filter match set.
//
// The key is the concatenation, in index order, of the lowercase hex index
// of every matching filter, each followed by "-"; e.g., matches at indexes 0
// and 10 yield "0-a-". When hasFilterMatches is false the key is "" and
// filterMatches is not inspected, which skips the strings.Builder setup and
// the loop entirely.
func getCacheKey(hasFilterMatches bool, filterMatches []bool) string {
	if !hasFilterMatches {
		return ""
	}

	var key strings.Builder
	for i := range filterMatches {
		if !filterMatches[i] {
			continue
		}
		fmt.Fprintf(&key, "%x-", i)
	}

	return key.String()
}

// TODO: refactor this copy of psiphon/server.getStringRequestParam into common?
Expand Down
Loading

0 comments on commit 9d0ae9f

Please sign in to comment.