Skip to content

Commit

Permalink
Reenable reporting of origin_bytes to Teleport
Browse files Browse the repository at this point in the history
  • Loading branch information
oxtoacart committed Oct 20, 2023
1 parent 895b616 commit 37a8238
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 118 deletions.
67 changes: 24 additions & 43 deletions http_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,11 +261,11 @@ func (p *Proxy) ListenAndServe(ctx context.Context) error {
OKDoesNotWaitForUpstream: !p.ConnectOKWaitsForUpstream,
OnError: instrumentedErrorHandler,
})
stopTraces := p.configureTraces()
defer stopTraces()
stopProxiedBytes := p.configureTeleportProxiedBytes()
defer stopProxiedBytes()

stopTeleport := p.configureTeleport()
defer stopTeleport()
stopOriginBytes := p.configureTeleportOriginBytes()
defer stopOriginBytes()

stopMetrics, err := p.configureOTELMetrics()
if err != nil {
Expand Down Expand Up @@ -671,58 +671,39 @@ func (p *Proxy) createFilterChain(bl *blacklist.Blacklist) (filters.Chain, proxy
}, nil
}

func (p *Proxy) configureTraces() func() {
if true {
log.Debug("Tracing currently disabled until we figure out how to handle traces in the centralized OTEL collector")
func (p *Proxy) configureTeleportProxiedBytes() func() {
if p.TeleportSampleRate <= 0 {
log.Debug("Not configuring Teleport proxied bytes")
return func() {}
}

if p.TracesSampleRate <= 0 {
log.Debug("Not configuring tracing")
return func() {}
log.Debug("Configuring Teleport proxied bytes")
tp, stop := otel.BuildTracerProvider(p.buildOTELOpts(teleportHost, true))
if tp != nil {
go p.instrument.ReportProxiedBytesPeriodically(1*time.Hour, tp)
ogStop := stop
stop = func() {
p.instrument.ReportProxiedBytes(tp)
ogStop()
}
}

log.Debug("Configuring tracing")
return p.configureOTEL(
teleportHost,
p.TracesSampleRate,
1*time.Minute,
false,
true,
)
return stop
}

func (p *Proxy) configureTeleport() func() {
func (p *Proxy) configureTeleportOriginBytes() func() {
if p.TeleportSampleRate <= 0 {
log.Debug("Not configuring Teleport")
log.Debug("Not configuring Teleport origin bytes")
return func() {}
}

log.Debug("Configuring Teleport")
return p.configureOTEL(
teleportHost,
p.TeleportSampleRate,
1*time.Hour,
true,
true,
)
}

func (p *Proxy) configureOTEL(
endpoint string,
sampleRate int,
reportingInterval time.Duration,
includeDeviceIDs bool,
includeProxyIdentity bool,
) func() {
opts := p.buildOTELOpts(endpoint, includeProxyIdentity)
opts.SampleRate = sampleRate
tp, stop := otel.BuildTracerProvider(opts)
log.Debug("Configuring Teleport origin bytes")
// Note - we do not include the proxy name here to avoid associating origin site usage with devices on that proxy name
tp, stop := otel.BuildTracerProvider(p.buildOTELOpts(teleportHost, false))
if tp != nil {
go p.instrument.ReportToOTELPeriodically(reportingInterval, tp, includeDeviceIDs)
go p.instrument.ReportOriginBytesPeriodically(1*time.Hour, tp)
ogStop := stop
stop = func() {
p.instrument.ReportToOTEL(tp, includeDeviceIDs)
p.instrument.ReportOriginBytes(tp)
ogStop()
}
}
Expand Down
123 changes: 62 additions & 61 deletions instrument/instrument.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,10 @@ type Instrument interface {
SuspectedProbing(ctx context.Context, fromIP net.IP, reason string)
VersionCheck(ctx context.Context, redirect bool, method, reason string)
ProxiedBytes(ctx context.Context, sent, recv int, platform, version, app, locale, dataCapCohort, probingError string, clientIP net.IP, deviceID, originHost string)
ReportToOTELPeriodically(interval time.Duration, tp *sdktrace.TracerProvider, includeDeviceID bool)
ReportToOTEL(tp *sdktrace.TracerProvider, includeDeviceID bool)
ReportProxiedBytesPeriodically(interval time.Duration, tp *sdktrace.TracerProvider)
ReportProxiedBytes(tp *sdktrace.TracerProvider)
ReportOriginBytesPeriodically(interval time.Duration, tp *sdktrace.TracerProvider)
ReportOriginBytes(tp *sdktrace.TracerProvider)
quicSentPacket(ctx context.Context)
quicLostPacket(ctx context.Context)
}
Expand Down Expand Up @@ -71,12 +73,14 @@ func (i NoInstrument) SuspectedProbing(ctx context.Context, fromIP net.IP, reaso
func (i NoInstrument) VersionCheck(ctx context.Context, redirect bool, method, reason string) {}
func (i NoInstrument) ProxiedBytes(ctx context.Context, sent, recv int, platform, version, app, locale, dataCapCohort, probingError string, clientIP net.IP, deviceID, originHost string) {
}
func (i NoInstrument) ReportToOTELPeriodically(interval time.Duration, tp *sdktrace.TracerProvider, includeDeviceID bool) {
func (i NoInstrument) ReportProxiedBytesPeriodically(interval time.Duration, tp *sdktrace.TracerProvider) {
}
func (i NoInstrument) ReportToOTEL(tp *sdktrace.TracerProvider, includeDeviceID bool) {
func (i NoInstrument) ReportProxiedBytes(tp *sdktrace.TracerProvider) {}
func (i NoInstrument) ReportOriginBytesPeriodically(interval time.Duration, tp *sdktrace.TracerProvider) {
}
func (i NoInstrument) quicSentPacket(ctx context.Context) {}
func (i NoInstrument) quicLostPacket(ctx context.Context) {}
func (i NoInstrument) ReportOriginBytes(tp *sdktrace.TracerProvider) {}
func (i NoInstrument) quicSentPacket(ctx context.Context) {}
func (i NoInstrument) quicLostPacket(ctx context.Context) {}

// CommonLabels defines a set of common labels apply to all metrics instrumented.
type CommonLabels struct {
Expand All @@ -90,14 +94,13 @@ type CommonLabels struct {
// defaultInstrument is an implementation of Instrument which exports metrics
// via open telemetry.
type defaultInstrument struct {
countryLookup geo.CountryLookup
ispLookup geo.ISPLookup
filters map[string]filters.Filter
errorHandlers map[string]func(conn net.Conn, err error)
clientStats map[clientDetails]*usage
clientStatsWithDeviceID map[clientDetails]*usage
originStats map[originDetails]*usage
statsMx sync.Mutex
countryLookup geo.CountryLookup
ispLookup geo.ISPLookup
filters map[string]filters.Filter
errorHandlers map[string]func(conn net.Conn, err error)
clientStats map[clientDetails]*usage
originStats map[originDetails]*usage
statsMx sync.Mutex
}

func NewDefault(countryLookup geo.CountryLookup, ispLookup geo.ISPLookup) (*defaultInstrument, error) {
Expand All @@ -106,13 +109,12 @@ func NewDefault(countryLookup geo.CountryLookup, ispLookup geo.ISPLookup) (*defa
}

p := &defaultInstrument{
countryLookup: countryLookup,
ispLookup: ispLookup,
filters: make(map[string]filters.Filter),
errorHandlers: make(map[string]func(conn net.Conn, err error)),
clientStats: make(map[clientDetails]*usage),
clientStatsWithDeviceID: make(map[clientDetails]*usage),
originStats: make(map[originDetails]*usage),
countryLookup: countryLookup,
ispLookup: ispLookup,
filters: make(map[string]filters.Filter),
errorHandlers: make(map[string]func(conn net.Conn, err error)),
clientStats: make(map[clientDetails]*usage),
originStats: make(map[originDetails]*usage),
}

return p, nil
Expand Down Expand Up @@ -271,15 +273,6 @@ func (ins *defaultInstrument) ProxiedBytes(ctx context.Context, sent, recv int,
)

clientKey := clientDetails{
platform: platform,
version: version,
locale: locale,
country: country,
isp: isp,
asn: asn,
probingError: probingError,
}
clientKeyWithDeviceID := clientDetails{
deviceID: deviceID,
platform: platform,
version: version,
Expand Down Expand Up @@ -308,7 +301,6 @@ func (ins *defaultInstrument) ProxiedBytes(ctx context.Context, sent, recv int,

ins.statsMx.Lock()
ins.clientStats[clientKey] = ins.clientStats[clientKey].add(sent, recv)
ins.clientStatsWithDeviceID[clientKeyWithDeviceID] = ins.clientStatsWithDeviceID[clientKeyWithDeviceID].add(sent, recv)
if hasOriginKey {
ins.originStats[originKey] = ins.originStats[originKey].add(sent, recv)
}
Expand Down Expand Up @@ -395,30 +387,23 @@ func (u *usage) add(sent int, recv int) *usage {
return u
}

func (ins *defaultInstrument) ReportToOTELPeriodically(interval time.Duration, tp *sdktrace.TracerProvider, includeDeviceID bool) {
func (ins *defaultInstrument) ReportProxiedBytesPeriodically(interval time.Duration, tp *sdktrace.TracerProvider) {
for {
// We randomize the sleep time to avoid bursty submission to OpenTelemetry.
// Even though each proxy sends relatively little data, proxies often run fairly
// closely synchronized since they all update to a new binary and restart around the same
// time. By randomizing each proxy's interval, we smooth out the pattern of submissions.
sleepInterval := rand.Int63n(int64(interval * 2))
time.Sleep(time.Duration(sleepInterval))
ins.ReportToOTEL(tp, includeDeviceID)
ins.ReportProxiedBytes(tp)
}
}

func (ins *defaultInstrument) ReportToOTEL(tp *sdktrace.TracerProvider, includeDeviceID bool) {
func (ins *defaultInstrument) ReportProxiedBytes(tp *sdktrace.TracerProvider) {
var clientStats map[clientDetails]*usage
ins.statsMx.Lock()
if includeDeviceID {
clientStats = ins.clientStatsWithDeviceID
ins.clientStatsWithDeviceID = make(map[clientDetails]*usage)
} else {
clientStats = ins.clientStats
ins.clientStats = make(map[clientDetails]*usage)
}
originStats := ins.originStats
ins.originStats = make(map[originDetails]*usage)
clientStats = ins.clientStats
ins.clientStats = make(map[clientDetails]*usage)
ins.statsMx.Unlock()

for key, value := range clientStats {
Expand All @@ -440,24 +425,40 @@ func (ins *defaultInstrument) ReportToOTEL(tp *sdktrace.TracerProvider, includeD
attribute.String("probing_error", key.probingError)))
span.End()
}
if !includeDeviceID {
// In order to prevent associating origins with device IDs, only report
// origin stats if we're not including device IDs.
for key, value := range originStats {
_, span := tp.Tracer("").
Start(
context.Background(),
"origin_bytes",
trace.WithAttributes(
attribute.Int("origin_bytes_sent", value.sent),
attribute.Int("origin_bytes_recv", value.recv),
attribute.Int("origin_bytes_total", value.sent+value.recv),
attribute.String("origin", key.origin),
attribute.String("client_platform", key.platform),
attribute.String("client_version", key.version),
attribute.String("client_country", key.country)))
span.End()
}
}

func (ins *defaultInstrument) ReportOriginBytesPeriodically(interval time.Duration, tp *sdktrace.TracerProvider) {
for {
// We randomize the sleep time to avoid bursty submission to OpenTelemetry.
// Even though each proxy sends relatively little data, proxies often run fairly
// closely synchronized since they all update to a new binary and restart around the same
// time. By randomizing each proxy's interval, we smooth out the pattern of submissions.
sleepInterval := rand.Int63n(int64(interval * 2))
time.Sleep(time.Duration(sleepInterval))
ins.ReportOriginBytes(tp)
}
}

func (ins *defaultInstrument) ReportOriginBytes(tp *sdktrace.TracerProvider) {
ins.statsMx.Lock()
originStats := ins.originStats
ins.originStats = make(map[originDetails]*usage)
ins.statsMx.Unlock()

for key, value := range originStats {
_, span := tp.Tracer("").
Start(
context.Background(),
"origin_bytes",
trace.WithAttributes(
attribute.Int("origin_bytes_sent", value.sent),
attribute.Int("origin_bytes_recv", value.recv),
attribute.Int("origin_bytes_total", value.sent+value.recv),
attribute.String("origin", key.origin),
attribute.String("client_platform", key.platform),
attribute.String("client_version", key.version),
attribute.String("client_country", key.country)))
span.End()
}
}

Expand Down
25 changes: 11 additions & 14 deletions otel/otel.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,17 @@ var (
)

type Opts struct {
Endpoint string
Headers map[string]string
SampleRate int
ProxyName string
Track string
Provider string
DC string
FrontendProvider string
FrontendDC string
ProxyProtocol string
Addr string
IsPro bool
IncludeProxyIdentity bool
Endpoint string
Headers map[string]string
ProxyName string
Track string
Provider string
DC string
FrontendProvider string
FrontendDC string
ProxyProtocol string
Addr string
IsPro bool
}

func (opts *Opts) buildResource() *resource.Resource {
Expand Down Expand Up @@ -115,7 +113,6 @@ func BuildTracerProvider(opts *Opts) (*sdktrace.TracerProvider, func()) {
sdktrace.WithBlocking(), // it's okay to use blocking mode right now because we're just submitting bandwidth data in a goroutine that doesn't block real work
),
sdktrace.WithResource(opts.buildResource()),
sdktrace.WithSampler(sdktrace.ParentBased(newDeterministicSampler(opts.SampleRate))),
)

stop := func() {
Expand Down

0 comments on commit 37a8238

Please sign in to comment.