From 9d0ae9fcb04230385b9c4362733173f00ca44cc4 Mon Sep 17 00:00:00 2001 From: Rod Hynes Date: Mon, 14 Oct 2024 15:36:03 -0400 Subject: [PATCH] Reduce server allocations, GC churn, and CPU overhead for tactics fetches - Add tactics.Server.cachedTacticsData to cache prepared tactics payloads and hash tags for repeated tactics requests matching the same filters. - Designate a single in-proxy proxy worker as the tactics checker and fetcher. --- psiphon/common/inproxy/api.go | 9 +- psiphon/common/inproxy/broker.go | 27 ++--- psiphon/common/inproxy/proxy.go | 15 ++- psiphon/common/tactics/tactics.go | 141 +++++++++++++++++++++---- psiphon/common/tactics/tactics_test.go | 71 +++++++++++++ psiphon/server/tactics.go | 11 ++ 6 files changed, 237 insertions(+), 37 deletions(-) diff --git a/psiphon/common/inproxy/api.go b/psiphon/common/inproxy/api.go index 9bde2342c..0202e1e41 100644 --- a/psiphon/common/inproxy/api.go +++ b/psiphon/common/inproxy/api.go @@ -215,7 +215,7 @@ type ClientMetrics struct { // ProxyAnnounceRequest is an API request sent from a proxy to a broker, // announcing that it is available for a client connection. Proxies send one // ProxyAnnounceRequest for each available client connection. The broker will -// match the proxy with a a client and return WebRTC connection information +// match the proxy with a client and return WebRTC connection information // in the response. // // PersonalCompartmentIDs limits the clients to those that supply one of the @@ -223,11 +223,18 @@ type ClientMetrics struct { // proxy operators to client users out-of-band and provide optional access // control. // +// When CheckTactics is set, the broker will check for new tactics or indicate +// that the proxy's cached tactics TTL may be extended. Tactics information +// is returned in the response TacticsPayload. To minimize broker processing +// overhead, proxies with multiple workers should designate just one worker +// to set CheckTactics. 
+// // The proxy's session public key is an implicit and cryptographically // verified proxy ID. type ProxyAnnounceRequest struct { PersonalCompartmentIDs []ID `cbor:"1,keyasint,omitempty"` Metrics *ProxyMetrics `cbor:"2,keyasint,omitempty"` + CheckTactics bool `cbor:"3,keyasint,omitempty"` } // WebRTCSessionDescription is compatible with pion/webrtc.SessionDescription diff --git a/psiphon/common/inproxy/broker.go b/psiphon/common/inproxy/broker.go index 5ebadf6e3..8be4409b3 100644 --- a/psiphon/common/inproxy/broker.go +++ b/psiphon/common/inproxy/broker.go @@ -584,22 +584,25 @@ func (b *Broker) handleProxyAnnounce( // proxy can store and apply the new tactics before announcing again. var tacticsPayload []byte - tacticsPayload, newTacticsTag, err = b.config.GetTacticsPayload(geoIPData, apiParams) - if err != nil { - return nil, errors.Trace(err) - } - - if tacticsPayload != nil && newTacticsTag != "" { - responsePayload, err := MarshalProxyAnnounceResponse( - &ProxyAnnounceResponse{ - TacticsPayload: tacticsPayload, - NoMatch: true, - }) + if announceRequest.CheckTactics { + tacticsPayload, newTacticsTag, err = + b.config.GetTacticsPayload(geoIPData, apiParams) if err != nil { return nil, errors.Trace(err) } - return responsePayload, nil + if tacticsPayload != nil && newTacticsTag != "" { + responsePayload, err := MarshalProxyAnnounceResponse( + &ProxyAnnounceResponse{ + TacticsPayload: tacticsPayload, + NoMatch: true, + }) + if err != nil { + return nil, errors.Trace(err) + } + + return responsePayload, nil + } } // AllowProxy may be used to disallow proxies from certain geolocations, diff --git a/psiphon/common/inproxy/proxy.go b/psiphon/common/inproxy/proxy.go index b5d46c4c6..f221bc31f 100644 --- a/psiphon/common/inproxy/proxy.go +++ b/psiphon/common/inproxy/proxy.go @@ -202,14 +202,16 @@ func (p *Proxy) Run(ctx context.Context) { // trip is awaited so that: // // - The first announce response will arrive with any new tactics, - // avoiding a start up case 
where MaxClients initial, concurrent - // announces all return with no-match and a tactics payload. + // which may be applied before launching additional workers. // // - The first worker gets no announcement delay and is also guaranteed to // be the shared session establisher. Since the announcement delays are // applied _after_ waitToShareSession, it would otherwise be possible, // with a race of MaxClient initial, concurrent announces, for the // session establisher to be a different worker than the no-delay worker. + // + // The first worker is the only proxy worker which sets + // ProxyAnnounceRequest.CheckTactics. signalFirstAnnounceCtx, signalFirstAnnounceDone := context.WithCancel(context.Background()) @@ -612,6 +614,10 @@ func (p *Proxy) proxyOneClient( } p.nextAnnounceMutex.Unlock() + // Only the first worker, which has signalAnnounceDone configured, checks + // for tactics. + checkTactics := signalAnnounceDone != nil + // A proxy ID is implicitly sent with requests; it's the proxy's session // public key. // @@ -625,6 +631,7 @@ func (p *Proxy) proxyOneClient( &ProxyAnnounceRequest{ PersonalCompartmentIDs: personalCompartmentIDs, Metrics: metrics, + CheckTactics: checkTactics, }) if logAnnounce() { p.config.Logger.WithTraceFields(common.LogFields{ @@ -645,7 +652,9 @@ func (p *Proxy) proxyOneClient( // discovery but proceed with handling the proxy announcement // response as there may still be a match. 
- if p.config.HandleTacticsPayload(tacticsNetworkID, announceResponse.TacticsPayload) { + if p.config.HandleTacticsPayload( + tacticsNetworkID, announceResponse.TacticsPayload) { + p.resetNetworkDiscovery() } } diff --git a/psiphon/common/tactics/tactics.go b/psiphon/common/tactics/tactics.go index cc0503a27..f9bd2b77c 100644 --- a/psiphon/common/tactics/tactics.go +++ b/psiphon/common/tactics/tactics.go @@ -157,6 +157,7 @@ import ( "io/ioutil" "net/http" "sort" + "strings" "time" "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common" @@ -164,6 +165,7 @@ import ( "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/obfuscator" "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/parameters" "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/prng" + lrucache "github.com/cognusion/go-cache-lru" "golang.org/x/crypto/nacl/box" ) @@ -189,6 +191,7 @@ const ( AGGREGATION_MINIMUM = "Minimum" AGGREGATION_MAXIMUM = "Maximum" AGGREGATION_MEDIAN = "Median" + PAYLOAD_CACHE_SIZE = 256 ) var ( @@ -250,6 +253,9 @@ type Server struct { logger common.Logger logFieldFormatter common.APIParameterLogFieldFormatter apiParameterValidator common.APIParameterValidator + + cachedTacticsData *lrucache.Cache + filterMatches []bool } const ( @@ -442,6 +448,8 @@ func NewServer( logger: logger, logFieldFormatter: logFieldFormatter, apiParameterValidator: apiParameterValidator, + cachedTacticsData: lrucache.NewWithLRU( + lrucache.NoExpiration, 1*time.Minute, PAYLOAD_CACHE_SIZE), } server.ReloadableFile = common.NewReloadableFile( @@ -467,6 +475,18 @@ func NewServer( server.DefaultTactics = newServer.DefaultTactics server.FilteredTactics = newServer.FilteredTactics + // Any cached, merged tactics data is flushed when the + // configuration changes. + // + // A single filterMatches, used in getTactics, is allocated here + // to avoid allocating a slice per getTactics call. 
+ // + // Server.ReloadableFile.RLock/RUnlock is the mutex for accessing + // these and other Server fields. + + server.cachedTacticsData.Flush() + server.filterMatches = make([]bool, len(server.FilteredTactics)) + server.initLookups() server.loaded = true @@ -730,6 +750,8 @@ func (server *Server) GetFilterGeoIPScope(geoIPData common.GeoIPData) int { // // Elements of the returned Payload, e.g., tactics parameters, will point to // data in DefaultTactics and FilteredTactics and must not be modifed. +// +// Callers must not mutate returned tactics data, which is cached. func (server *Server) GetTacticsPayload( geoIPData common.GeoIPData, apiParams common.APIParameters) (*Payload, error) { @@ -737,22 +759,17 @@ func (server *Server) GetTacticsPayload( // includeServerSideOnly is false: server-side only parameters are not // used by the client, so including them wastes space and unnecessarily // exposes the values. - tactics, err := server.GetTactics(false, geoIPData, apiParams) + tacticsData, err := server.getTactics(false, geoIPData, apiParams) if err != nil { return nil, errors.Trace(err) } - if tactics == nil { + if tacticsData == nil { return nil, nil } - marshaledTactics, tag, err := marshalTactics(tactics) - if err != nil { - return nil, errors.Trace(err) - } - payload := &Payload{ - Tag: tag, + Tag: tacticsData.tag, } // New clients should always send STORED_TACTICS_TAG_PARAMETER_NAME. When they have no @@ -777,7 +794,7 @@ func (server *Server) GetTacticsPayload( } if sendPayloadTactics { - payload.Tactics = marshaledTactics + payload.Tactics = tacticsData.payload } return payload, nil @@ -797,37 +814,63 @@ func marshalTactics(tactics *Tactics) ([]byte, string, error) { } // GetTacticsWithTag returns a GetTactics value along with the associated tag value. +// +// Callers must not mutate returned tactics data, which is cached. 
func (server *Server) GetTacticsWithTag( includeServerSideOnly bool, geoIPData common.GeoIPData, apiParams common.APIParameters) (*Tactics, string, error) { - tactics, err := server.GetTactics( + tacticsData, err := server.getTactics( includeServerSideOnly, geoIPData, apiParams) if err != nil { return nil, "", errors.Trace(err) } - if tactics == nil { + if tacticsData == nil { return nil, "", nil } - _, tag, err := marshalTactics(tactics) + return tacticsData.tactics, tacticsData.tag, nil +} + +// tacticsData is cached tactics data, including the merged Tactics object, +// the JSON marshaled payload, and hashed tag. +type tacticsData struct { + tactics *Tactics + payload []byte + tag string +} + +func newTacticsData(tactics *Tactics) (*tacticsData, error) { + + payload, err := json.Marshal(tactics) if err != nil { - return nil, "", errors.Trace(err) + return nil, errors.Trace(err) } - return tactics, tag, nil + // MD5 hash is used solely as a data checksum and not for any security + // purpose. + digest := md5.Sum(payload) + tag := hex.EncodeToString(digest[:]) + + return &tacticsData{ + tactics: tactics, + payload: payload, + tag: tag, + }, nil } // GetTactics assembles and returns tactics data for a client with the // specified GeoIP, API parameter, and speed test attributes. // // The tactics return value may be nil. -func (server *Server) GetTactics( +// +// Callers must not mutate returned tactics data, which is cached. 
+func (server *Server) getTactics( includeServerSideOnly bool, geoIPData common.GeoIPData, - apiParams common.APIParameters) (*Tactics, error) { + apiParams common.APIParameters) (*tacticsData, error) { server.ReloadableFile.RLock() defer server.ReloadableFile.RUnlock() @@ -837,11 +880,19 @@ func (server *Server) GetTactics( return nil, nil } - tactics := server.DefaultTactics.clone(includeServerSideOnly) + // Two passes are performed, one to get the list of matching filters, and + // then, if no merged tactics data is found for that filter match set, + // another pass to merge all the tactics parameters. var aggregatedValues map[string]int + filterMatchCount := 0 - for _, filteredTactics := range server.FilteredTactics { + // Use the preallocated slice to avoid an allocation per getTactics call. + filterMatches := server.filterMatches + + for filterIndex, filteredTactics := range server.FilteredTactics { + + filterMatches[filterIndex] = false if len(filteredTactics.Filter.Regions) > 0 { if filteredTactics.Filter.regionLookup != nil { @@ -944,15 +995,63 @@ func (server *Server) GetTactics( } } - tactics.merge(includeServerSideOnly, &filteredTactics.Tactics) + filterMatchCount += 1 + filterMatches[filterIndex] = true + + // Continue to check for more matches. Last matching tactics filter + // has priority for any field. + } + + // For any filter match set, the merged tactics parameters are the same, + // so the resulting merge is cached, along with the JSON encoding of the + // payload and hash tag. This cache reduces, for repeated tactics + // requests, heavy allocations from the JSON marshal and CPU load from + // both the marshal and hashing the marshal result. + // + // getCacheKey still allocates a strings.Builder buffer. + + cacheKey := getCacheKey(filterMatchCount > 0, filterMatches) - // Continue to apply more matches. Last matching tactics has priority for any field. 
+ cacheValue, ok := server.cachedTacticsData.Get(cacheKey) + if ok { + return cacheValue.(*tacticsData), nil + } + + tactics := server.DefaultTactics.clone(includeServerSideOnly) + if filterMatchCount > 0 { + for filterIndex, filteredTactics := range server.FilteredTactics { + if filterMatches[filterIndex] { + tactics.merge(includeServerSideOnly, &filteredTactics.Tactics) + } + } } // See Tactics.Probability doc comment. tactics.Probability = 1.0 - return tactics, nil + tacticsData, err := newTacticsData(tactics) + if err != nil { + return nil, errors.Trace(err) + } + + server.cachedTacticsData.Set(cacheKey, tacticsData, 0) + + return tacticsData, nil +} + +func getCacheKey(hasFilterMatches bool, filterMatches []bool) string { + // When no filters match, the key is "". The input hasFilterMatches allows + // for skipping the strings.Builder setup and loop entirely. + if !hasFilterMatches { + return "" + } + var b strings.Builder + for filterIndex, match := range filterMatches { + if match { + fmt.Fprintf(&b, "%x-", filterIndex) + } + } + return b.String() } // TODO: refactor this copy of psiphon/server.getStringRequestParam into common? 
diff --git a/psiphon/common/tactics/tactics_test.go b/psiphon/common/tactics/tactics_test.go index de189b82e..aa3bda087 100644 --- a/psiphon/common/tactics/tactics_test.go +++ b/psiphon/common/tactics/tactics_test.go @@ -93,6 +93,16 @@ func TestTactics(t *testing.T) { } } }, + { + "Filter" : { + "APIParameters" : {"client_platform" : ["P2"], "client_version": ["V2"]} + }, + "Tactics" : { + "Parameters" : { + "ConnectionWorkerPoolSize" : 1 + } + } + }, { "Filter" : { "Regions": ["R2"] @@ -323,6 +333,24 @@ func TestTactics(t *testing.T) { } } + // Helper to check server-side cachedTacticsData state + + checkServerCache := func(cacheEntryFilterMatches ...[]bool) { + + cacheItems := server.cachedTacticsData.Items() + if len(cacheItems) != len(cacheEntryFilterMatches) { + t.Fatalf("Unexpected cachedTacticsData size: %v", len(cacheItems)) + } + + for _, filterMatches := range cacheEntryFilterMatches { + cacheKey := getCacheKey(true, filterMatches) + _, ok := server.cachedTacticsData.Get(cacheKey) + if !ok { + t.Fatalf("Unexpected missing cachedTacticsData entry: %v", filterMatches) + } + } + } + // Initial tactics request; will also run a speed test // Request should complete in < 1 second @@ -352,6 +380,10 @@ func TestTactics(t *testing.T) { checkParameters(initialFetchTacticsRecord) + // Server should be caching tactics data for tactics matching first two + // filters. 
+ checkServerCache([]bool{true, true, false, false, false}) + // There should now be cached local tactics storedTacticsRecord, err := UseStoredTactics(storer, networkID) @@ -434,6 +466,9 @@ func TestTactics(t *testing.T) { checkParameters(fetchTacticsRecord) + // Server cache should be the same + checkServerCache([]bool{true, true, false, false, false}) + // Modify tactics configuration to change payload tacticsConnectionWorkerPoolSize = 6 @@ -474,6 +509,9 @@ func TestTactics(t *testing.T) { t.Fatalf("Server config failed to reload") } + // Server cache should be flushed + checkServerCache() + // Next fetch should return a different payload fetchTacticsRecord, err = FetchTactics( @@ -509,6 +547,8 @@ func TestTactics(t *testing.T) { checkParameters(fetchTacticsRecord) + checkServerCache([]bool{true, true, false, false, false}) + // Exercise handshake transport of tactics // Wait for tactics to expire; handshake should renew @@ -563,6 +603,8 @@ func TestTactics(t *testing.T) { checkParameters(handshakeTacticsRecord) + checkServerCache([]bool{true, true, false, false, false}) + // Now there should be stored tactics storedTacticsRecord, err = UseStoredTactics(storer, networkID) @@ -596,6 +638,35 @@ func TestTactics(t *testing.T) { t.Fatalf("unexpected stored tactics record") } + // Server should cache a new entry for different filter matches + + apiParams2 := common.APIParameters{ + "client_platform": "P2", + "client_version": "V2"} + + fetchTacticsRecord, err = FetchTactics( + context.Background(), + params, + storer, + getNetworkID, + apiParams2, + endPointProtocol, + endPointRegion, + encodedRequestPublicKey, + encodedObfuscatedKey, + obfuscatedRoundTripper) + if err != nil { + t.Fatalf("FetchTactics failed: %s", err) + } + + if fetchTacticsRecord == nil { + t.Fatalf("expected tactics record") + } + + checkServerCache( + []bool{true, true, false, false, false}, + []bool{false, false, true, false, false}) + // Exercise speed test sample truncation maxSamples := 
params.Get().Int(parameters.SpeedTestMaxSampleCount) diff --git a/psiphon/server/tactics.go b/psiphon/server/tactics.go index 0445fa824..78ea0e14a 100644 --- a/psiphon/server/tactics.go +++ b/psiphon/server/tactics.go @@ -135,6 +135,17 @@ func (c *ServerTacticsParametersCache) Get( // Construct parameters from tactics. + // Note: since ServerTacticsParametersCache was implemented, + // tactics.Server.cachedTacticsData was added. This new cache is + // primarily intended to reduce server allocations and computations + // when _clients_ request tactics. cachedTacticsData also impacts + // GetTacticsWithTag. + // + // ServerTacticsParametersCache still optimizes performance for + // server-side tactics, since cachedTacticsData doesn't avoid filter + // checks, and ServerTacticsParametersCache includes a prepared + // parameters.ParametersAccessor. + tactics, tag, err := c.support.TacticsServer.GetTacticsWithTag( true, common.GeoIPData(geoIPData), make(common.APIParameters)) if err != nil {