From 37a823823d1bd8bb0d3f17b66f8af87138d2fb88 Mon Sep 17 00:00:00 2001 From: Ox Cart Date: Fri, 20 Oct 2023 10:10:34 -0500 Subject: [PATCH] Reenable reporting of origin_bytes to Teleport --- http_proxy.go | 67 ++++++++------------- instrument/instrument.go | 123 ++++++++++++++++++++------------------- otel/otel.go | 25 ++++---- 3 files changed, 97 insertions(+), 118 deletions(-) diff --git a/http_proxy.go b/http_proxy.go index 968d6d2d..730fffb9 100644 --- a/http_proxy.go +++ b/http_proxy.go @@ -261,11 +261,11 @@ func (p *Proxy) ListenAndServe(ctx context.Context) error { OKDoesNotWaitForUpstream: !p.ConnectOKWaitsForUpstream, OnError: instrumentedErrorHandler, }) - stopTraces := p.configureTraces() - defer stopTraces() + stopProxiedBytes := p.configureTeleportProxiedBytes() + defer stopProxiedBytes() - stopTeleport := p.configureTeleport() - defer stopTeleport() + stopOriginBytes := p.configureTeleportOriginBytes() + defer stopOriginBytes() stopMetrics, err := p.configureOTELMetrics() if err != nil { @@ -671,58 +671,39 @@ func (p *Proxy) createFilterChain(bl *blacklist.Blacklist) (filters.Chain, proxy }, nil } -func (p *Proxy) configureTraces() func() { - if true { - log.Debug("Tracing currently disabled until we figure out how to handle traces in the centralized OTEL collector") +func (p *Proxy) configureTeleportProxiedBytes() func() { + if p.TeleportSampleRate <= 0 { + log.Debug("Not configuring Teleport proxied bytes") return func() {} } - if p.TracesSampleRate <= 0 { - log.Debug("Not configuring tracing") - return func() {} + log.Debug("Configuring Teleport proxied bytes") + tp, stop := otel.BuildTracerProvider(p.buildOTELOpts(teleportHost, true)) + if tp != nil { + go p.instrument.ReportProxiedBytesPeriodically(1*time.Hour, tp) + ogStop := stop + stop = func() { + p.instrument.ReportProxiedBytes(tp) + ogStop() + } } - - log.Debug("Configuring tracing") - return p.configureOTEL( - teleportHost, - p.TracesSampleRate, - 1*time.Minute, - false, - true, - ) + return stop } -func (p *Proxy) configureTeleport() func() { +func (p *Proxy) configureTeleportOriginBytes() func() { if p.TeleportSampleRate <= 0 { - log.Debug("Not configuring Teleport") + log.Debug("Not configuring Teleport origin bytes") return func() {} } - log.Debug("Configuring Teleport") - return p.configureOTEL( - teleportHost, - p.TeleportSampleRate, - 1*time.Hour, - true, - true, - ) -} - -func (p *Proxy) configureOTEL( - endpoint string, - sampleRate int, - reportingInterval time.Duration, - includeDeviceIDs bool, - includeProxyIdentity bool, -) func() { - opts := p.buildOTELOpts(endpoint, includeProxyIdentity) - opts.SampleRate = sampleRate - tp, stop := otel.BuildTracerProvider(opts) + log.Debug("Configuring Teleport origin bytes") + // Note - we do not include the proxy name here to avoid associating origin site usage with devices on that proxy name + tp, stop := otel.BuildTracerProvider(p.buildOTELOpts(teleportHost, false)) if tp != nil { - go p.instrument.ReportToOTELPeriodically(reportingInterval, tp, includeDeviceIDs) + go p.instrument.ReportOriginBytesPeriodically(1*time.Hour, tp) ogStop := stop stop = func() { - p.instrument.ReportToOTEL(tp, includeDeviceIDs) + p.instrument.ReportOriginBytes(tp) ogStop() } } diff --git a/instrument/instrument.go b/instrument/instrument.go index 3863c9aa..c7bab9cf 100644 --- a/instrument/instrument.go +++ b/instrument/instrument.go @@ -37,8 +37,10 @@ type Instrument interface { SuspectedProbing(ctx context.Context, fromIP net.IP, reason string) VersionCheck(ctx context.Context, redirect bool, method, reason string) ProxiedBytes(ctx context.Context, sent, recv int, platform, version, app, locale, dataCapCohort, probingError string, clientIP net.IP, deviceID, originHost string) - ReportToOTELPeriodically(interval time.Duration, tp *sdktrace.TracerProvider, includeDeviceID bool) - ReportToOTEL(tp *sdktrace.TracerProvider, includeDeviceID bool) + ReportProxiedBytesPeriodically(interval time.Duration, tp *sdktrace.TracerProvider) + ReportProxiedBytes(tp *sdktrace.TracerProvider) + ReportOriginBytesPeriodically(interval time.Duration, tp *sdktrace.TracerProvider) + ReportOriginBytes(tp *sdktrace.TracerProvider) quicSentPacket(ctx context.Context) quicLostPacket(ctx context.Context) } @@ -71,12 +73,14 @@ func (i NoInstrument) SuspectedProbing(ctx context.Context, fromIP net.IP, reaso func (i NoInstrument) VersionCheck(ctx context.Context, redirect bool, method, reason string) {} func (i NoInstrument) ProxiedBytes(ctx context.Context, sent, recv int, platform, version, app, locale, dataCapCohort, probingError string, clientIP net.IP, deviceID, originHost string) { } -func (i NoInstrument) ReportToOTELPeriodically(interval time.Duration, tp *sdktrace.TracerProvider, includeDeviceID bool) { +func (i NoInstrument) ReportProxiedBytesPeriodically(interval time.Duration, tp *sdktrace.TracerProvider) { } -func (i NoInstrument) ReportToOTEL(tp *sdktrace.TracerProvider, includeDeviceID bool) { +func (i NoInstrument) ReportProxiedBytes(tp *sdktrace.TracerProvider) {} +func (i NoInstrument) ReportOriginBytesPeriodically(interval time.Duration, tp *sdktrace.TracerProvider) { } -func (i NoInstrument) quicSentPacket(ctx context.Context) {} -func (i NoInstrument) quicLostPacket(ctx context.Context) {} +func (i NoInstrument) ReportOriginBytes(tp *sdktrace.TracerProvider) {} +func (i NoInstrument) quicSentPacket(ctx context.Context) {} +func (i NoInstrument) quicLostPacket(ctx context.Context) {} // CommonLabels defines a set of common labels apply to all metrics instrumented. type CommonLabels struct { @@ -90,14 +94,13 @@ type CommonLabels struct { // defaultInstrument is an implementation of Instrument which exports metrics // via open telemetry. type defaultInstrument struct { - countryLookup geo.CountryLookup - ispLookup geo.ISPLookup - filters map[string]filters.Filter - errorHandlers map[string]func(conn net.Conn, err error) - clientStats map[clientDetails]*usage - clientStatsWithDeviceID map[clientDetails]*usage - originStats map[originDetails]*usage - statsMx sync.Mutex + countryLookup geo.CountryLookup + ispLookup geo.ISPLookup + filters map[string]filters.Filter + errorHandlers map[string]func(conn net.Conn, err error) + clientStats map[clientDetails]*usage + originStats map[originDetails]*usage + statsMx sync.Mutex } func NewDefault(countryLookup geo.CountryLookup, ispLookup geo.ISPLookup) (*defaultInstrument, error) { @@ -106,13 +109,12 @@ func NewDefault(countryLookup geo.CountryLookup, ispLookup geo.ISPLookup) (*defa } p := &defaultInstrument{ - countryLookup: countryLookup, - ispLookup: ispLookup, - filters: make(map[string]filters.Filter), - errorHandlers: make(map[string]func(conn net.Conn, err error)), - clientStats: make(map[clientDetails]*usage), - clientStatsWithDeviceID: make(map[clientDetails]*usage), - originStats: make(map[originDetails]*usage), + countryLookup: countryLookup, + ispLookup: ispLookup, + filters: make(map[string]filters.Filter), + errorHandlers: make(map[string]func(conn net.Conn, err error)), + clientStats: make(map[clientDetails]*usage), + originStats: make(map[originDetails]*usage), } return p, nil @@ -271,15 +273,6 @@ func (ins *defaultInstrument) ProxiedBytes(ctx context.Context, sent, recv int, ) clientKey := clientDetails{ - platform: platform, - version: version, - locale: locale, - country: country, - isp: isp, - asn: asn, - probingError: probingError, - } - clientKeyWithDeviceID := clientDetails{ deviceID: deviceID, platform: platform, version: version, @@ -308,7 +301,6 @@ func (ins *defaultInstrument) ProxiedBytes(ctx context.Context, sent, recv int, ins.statsMx.Lock() ins.clientStats[clientKey] = ins.clientStats[clientKey].add(sent, recv) - ins.clientStatsWithDeviceID[clientKeyWithDeviceID] = ins.clientStatsWithDeviceID[clientKeyWithDeviceID].add(sent, recv) if hasOriginKey { ins.originStats[originKey] = ins.originStats[originKey].add(sent, recv) } @@ -395,7 +387,7 @@ func (u *usage) add(sent int, recv int) *usage { return u } -func (ins *defaultInstrument) ReportToOTELPeriodically(interval time.Duration, tp *sdktrace.TracerProvider, includeDeviceID bool) { +func (ins *defaultInstrument) ReportProxiedBytesPeriodically(interval time.Duration, tp *sdktrace.TracerProvider) { for { // We randomize the sleep time to avoid bursty submission to OpenTelemetry. // Even though each proxy sends relatively little data, proxies often run fairly @@ -403,22 +395,15 @@ func (ins *defaultInstrument) ReportToOTELPeriodically(interval time.Duration, t // time. By randomizing each proxy's interval, we smooth out the pattern of submissions. sleepInterval := rand.Int63n(int64(interval * 2)) time.Sleep(time.Duration(sleepInterval)) - ins.ReportToOTEL(tp, includeDeviceID) + ins.ReportProxiedBytes(tp) } } -func (ins *defaultInstrument) ReportToOTEL(tp *sdktrace.TracerProvider, includeDeviceID bool) { +func (ins *defaultInstrument) ReportProxiedBytes(tp *sdktrace.TracerProvider) { var clientStats map[clientDetails]*usage ins.statsMx.Lock() - if includeDeviceID { - clientStats = ins.clientStatsWithDeviceID - ins.clientStatsWithDeviceID = make(map[clientDetails]*usage) - } else { - clientStats = ins.clientStats - ins.clientStats = make(map[clientDetails]*usage) - } - originStats := ins.originStats - ins.originStats = make(map[originDetails]*usage) + clientStats = ins.clientStats + ins.clientStats = make(map[clientDetails]*usage) ins.statsMx.Unlock() for key, value := range clientStats { @@ -440,24 +425,40 @@ func (ins *defaultInstrument) ReportToOTEL(tp *sdktrace.TracerProvider, includeD attribute.String("probing_error", key.probingError))) span.End() } - if !includeDeviceID { - // In order to prevent associating origins with device IDs, only report - // origin stats if we're not including device IDs. - for key, value := range originStats { - _, span := tp.Tracer(""). - Start( - context.Background(), - "origin_bytes", - trace.WithAttributes( - attribute.Int("origin_bytes_sent", value.sent), - attribute.Int("origin_bytes_recv", value.recv), - attribute.Int("origin_bytes_total", value.sent+value.recv), - attribute.String("origin", key.origin), - attribute.String("client_platform", key.platform), - attribute.String("client_version", key.version), - attribute.String("client_country", key.country))) - span.End() - } +} + +func (ins *defaultInstrument) ReportOriginBytesPeriodically(interval time.Duration, tp *sdktrace.TracerProvider) { + for { + // We randomize the sleep time to avoid bursty submission to OpenTelemetry. + // Even though each proxy sends relatively little data, proxies often run fairly + // closely synchronized since they all update to a new binary and restart around the same + // time. By randomizing each proxy's interval, we smooth out the pattern of submissions. + sleepInterval := rand.Int63n(int64(interval * 2)) + time.Sleep(time.Duration(sleepInterval)) + ins.ReportOriginBytes(tp) + } +} + +func (ins *defaultInstrument) ReportOriginBytes(tp *sdktrace.TracerProvider) { + ins.statsMx.Lock() + originStats := ins.originStats + ins.originStats = make(map[originDetails]*usage) + ins.statsMx.Unlock() + + for key, value := range originStats { + _, span := tp.Tracer(""). + Start( + context.Background(), + "origin_bytes", + trace.WithAttributes( + attribute.Int("origin_bytes_sent", value.sent), + attribute.Int("origin_bytes_recv", value.recv), + attribute.Int("origin_bytes_total", value.sent+value.recv), + attribute.String("origin", key.origin), + attribute.String("client_platform", key.platform), + attribute.String("client_version", key.version), + attribute.String("client_country", key.country))) + span.End() } } diff --git a/otel/otel.go b/otel/otel.go index cf6eeb61..bc44abb1 100644 --- a/otel/otel.go +++ b/otel/otel.go @@ -29,19 +29,17 @@ var ( ) type Opts struct { - Endpoint string - Headers map[string]string - SampleRate int - ProxyName string - Track string - Provider string - DC string - FrontendProvider string - FrontendDC string - ProxyProtocol string - Addr string - IsPro bool - IncludeProxyIdentity bool + Endpoint string + Headers map[string]string + ProxyName string + Track string + Provider string + DC string + FrontendProvider string + FrontendDC string + ProxyProtocol string + Addr string + IsPro bool } func (opts *Opts) buildResource() *resource.Resource { @@ -115,7 +113,6 @@ func BuildTracerProvider(opts *Opts) (*sdktrace.TracerProvider, func()) { sdktrace.WithBlocking(), // it's okay to use blocking mode right now because we're just submitting bandwidth data in a goroutine that doesn't block real work ), sdktrace.WithResource(opts.buildResource()), - sdktrace.WithSampler(sdktrace.ParentBased(newDeterministicSampler(opts.SampleRate))), ) stop := func() {