Skip to content

Commit

Permalink
various cleanups
Browse files Browse the repository at this point in the history
  • Loading branch information
swi-jared committed May 3, 2024
1 parent e5d2a95 commit fd7438b
Show file tree
Hide file tree
Showing 5 changed files with 199 additions and 253 deletions.
211 changes: 14 additions & 197 deletions internal/oboe/oboe.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,17 @@ const (
type Oboe interface {
UpdateSetting(sType int32, layer string, flags []byte, value int64, ttl int64, args map[string][]byte)
CheckSettingsTimeout()
GetSetting() (*oboeSettings, bool)
GetSetting() (*settings, bool)
RemoveSetting()
HasDefaultSetting() bool
OboeSampleRequest(continued bool, url string, triggerTrace TriggerTraceMode, swState w3cfmt.SwTraceState) SampleDecision
SampleRequest(continued bool, url string, triggerTrace TriggerTraceMode, swState w3cfmt.SwTraceState) SampleDecision
FlushRateCounts() map[string]*metrics.RateCounts
}

func NewOboe() Oboe {
return &oboe{
cfg: &oboeSettingsCfg{
settings: make(map[oboeSettingKey]*oboeSettings),
settings: make(map[settingKey]*settings),
},
}
}
Expand All @@ -72,7 +72,7 @@ var _ Oboe = &oboe{}

// Current settings configuration
type oboeSettingsCfg struct {
settings map[oboeSettingKey]*oboeSettings
settings map[settingKey]*settings
lock sync.RWMutex
}

Expand All @@ -90,37 +90,6 @@ func (o *oboe) FlushRateCounts() map[string]*metrics.RateCounts {
return rcs
}

type oboeSettings struct {
timestamp time.Time
// the flags which may be modified through merging local settings.
flags settingFlag
// the original flags retrieved from the remote collector.
originalFlags settingFlag
// The sample rate. It could be the original value got from remote server
// or a new value after negotiating with local config
value int
// The sample source after negotiating with local config
source SampleSource
ttl int64
layer string
TriggerToken []byte
bucket *tokenBucket
triggerTraceRelaxedBucket *tokenBucket
triggerTraceStrictBucket *tokenBucket
}

func (s *oboeSettings) hasOverrideFlag() bool {
return s.originalFlags&FLAG_OVERRIDE != 0
}

func newOboeSettings() *oboeSettings {
return &oboeSettings{
bucket: &tokenBucket{},
triggerTraceRelaxedBucket: &tokenBucket{},
triggerTraceStrictBucket: &tokenBucket{},
}
}

// token bucket
type tokenBucket struct {
ratePerSec float64
Expand All @@ -142,30 +111,6 @@ func (b *tokenBucket) setRateCap(rate, cap float64) {
}
}

// The identifying keys for a setting
type oboeSettingKey struct {
sType settingType
layer string
}

// Global configuration settings
//var globalSettingsCfg = &oboeSettingsCfg{
// settings: make(map[oboeSettingKey]*oboeSettings),
//}

// The global token bucket. Trace decisions of all the requests are controlled
// by this single bucket.
//
// The rate and capacity will be initialized by the values fetched from the remote
// server, therefore it's initialized with only the default values.
//var globalTokenBucket = &tokenBucket{}

// The token bucket exclusively for trigger trace from authenticated clients
//var triggerTraceRelaxedBucket = &tokenBucket{}

// The token bucket exclusively for trigger trace from unauthenticated clients
//var triggerTraceStrictBucket = &tokenBucket{}

func (b *tokenBucket) count(sampled, hasMetadata, rateLimit bool) bool {
b.RequestedInc()

Expand Down Expand Up @@ -326,27 +271,17 @@ func (tm TriggerTraceMode) Requested() bool {
}
}

func (o *oboe) OboeSampleRequest(continued bool, url string, triggerTrace TriggerTraceMode, swState w3cfmt.SwTraceState) SampleDecision {
// TODO: ick!
//if usingTestReporter {
// if r, ok := globalReporter.(*TestReporter); ok {
// if !r.UseSettings {
// return SampleDecision{r.ShouldTrace, 0, SAMPLE_SOURCE_NONE, true, ttEmpty, 0, 0, false} // trace tests
// }
// }
//}

var setting *oboeSettings
var ok bool
func (o *oboe) SampleRequest(continued bool, url string, triggerTrace TriggerTraceMode, swState w3cfmt.SwTraceState) SampleDecision {
diceRolled := false
if setting, ok = o.GetSetting(); !ok {
setting, ok := o.GetSetting()
if !ok {
return SampleDecision{false, 0, SAMPLE_SOURCE_NONE, false, ttSettingsNotAvailable, 0, 0, diceRolled}
}

retval := false
doRateLimiting := false

sampleRate, flags, source := mergeURLSetting(setting, url)
sampleRate, flags, source := setting.mergeURLSetting(url)

// Choose an appropriate bucket
bucket := setting.bucket
Expand Down Expand Up @@ -375,7 +310,7 @@ func (o *oboe) OboeSampleRequest(continued bool, url string, triggerTrace Trigge
rsp = ttTriggerTracingDisabled
}
}
ttCap, ttRate := getTokenBucketSetting(setting, triggerTrace)
ttCap, ttRate := setting.getTokenBucketSetting(triggerTrace)
return SampleDecision{ret, -1, SAMPLE_SOURCE_UNSET, flags.Enabled(), rsp, ttRate, ttCap, diceRolled}
}

Expand Down Expand Up @@ -418,7 +353,7 @@ func (o *oboe) OboeSampleRequest(continued bool, url string, triggerTrace Trigge
if unsetBucketAndSampleKVs {
bucketCap, bucketRate, sampleRate, source = -1, -1, -1, SAMPLE_SOURCE_UNSET
} else {
bucketCap, bucketRate = getTokenBucketSetting(setting, ModeTriggerTraceNotPresent)
bucketCap, bucketRate = setting.getTokenBucketSetting(ModeTriggerTraceNotPresent)
}

return SampleDecision{
Expand All @@ -433,24 +368,6 @@ func (o *oboe) OboeSampleRequest(continued bool, url string, triggerTrace Trigge
}
}

func getTokenBucketSetting(setting *oboeSettings, ttMode TriggerTraceMode) (capacity float64, rate float64) {
var bucket *tokenBucket

switch ttMode {
case ModeRelaxedTriggerTrace:
bucket = setting.triggerTraceRelaxedBucket
case ModeStrictTriggerTrace:
bucket = setting.triggerTraceStrictBucket
case ModeTriggerTraceNotPresent, ModeInvalidTriggerTrace:
bucket = setting.bucket
default:
log.Warningf("Could not determine token bucket setting for invalid TriggerTraceMode: %#v", ttMode)
return 0, 0
}

return bucket.capacity, bucket.ratePerSec
}

func bytesToFloat64(b []byte) (float64, error) {
if len(b) != 8 {
return -1, fmt.Errorf("invalid length: %d", len(b))
Expand Down Expand Up @@ -493,53 +410,6 @@ func ParseInt32(args map[string][]byte, key string, fb int32) int32 {
return ret
}

// mergeLocalSetting follow the predefined precedence to decide which one to
// pick from: either the local configs or the remote ones, or the combination.
//
// Note: This function modifies the argument in place.
func mergeLocalSetting(remote *oboeSettings) *oboeSettings {
if remote.hasOverrideFlag() && config.SamplingConfigured() {
// Choose the lower sample rate and merge the flags
if remote.value > config.GetSampleRate() {
remote.value = config.GetSampleRate()
remote.source = SAMPLE_SOURCE_FILE
}
remote.flags &= NewTracingMode(config.GetTracingMode()).toFlags()
} else if config.SamplingConfigured() {
// Use local sample rate and tracing mode config
remote.value = config.GetSampleRate()
remote.flags = NewTracingMode(config.GetTracingMode()).toFlags()
remote.source = SAMPLE_SOURCE_FILE
}

if !config.GetTriggerTrace() {
remote.flags = remote.flags &^ (1 << FlagTriggerTraceOffset)
}
return remote
}

// mergeURLSetting merges the service level setting (merged from remote and local
// settings) and the per-URL sampling flags, if any.
func mergeURLSetting(setting *oboeSettings, url string) (int, settingFlag, SampleSource) {
if url == "" {
return setting.value, setting.flags, setting.source
}

urlTracingMode := urls.GetTracingMode(url)
if urlTracingMode.isUnknown() {
return setting.value, setting.flags, setting.source
}

flags := urlTracingMode.toFlags()
source := SAMPLE_SOURCE_FILE

if setting.hasOverrideFlag() {
flags &= setting.originalFlags
}

return setting.value, flags, source
}

func adjustSampleRate(rate int64) int {
if rate < 0 {
log.Debugf("Invalid sample rate: %d", rate)
Expand Down Expand Up @@ -580,7 +450,7 @@ func (o *oboe) UpdateSetting(sType int32, layer string, flags []byte, value int6

merged := mergeLocalSetting(ns)

key := oboeSettingKey{
key := settingKey{
sType: settingType(sType),
layer: layer,
}
Expand Down Expand Up @@ -608,12 +478,12 @@ func (sc *oboeSettingsCfg) checkSettingsTimeout() {
}
}

func (o *oboe) GetSetting() (*oboeSettings, bool) {
func (o *oboe) GetSetting() (*settings, bool) {
o.cfg.lock.RLock()
defer o.cfg.lock.RUnlock()

// for now only look up the default settings
key := oboeSettingKey{
key := settingKey{
sType: TYPE_DEFAULT,
layer: "",
}
Expand All @@ -628,7 +498,7 @@ func (o *oboe) RemoveSetting() {
o.cfg.lock.Lock()
defer o.cfg.lock.Unlock()

key := oboeSettingKey{
key := settingKey{
sType: TYPE_DEFAULT,
layer: "",
}
Expand Down Expand Up @@ -714,56 +584,3 @@ func (tm TracingMode) ToString() string {
return string(config.UnknownTracingMode)
}
}

type settingType int
type settingFlag uint16

// setting types
const (
TYPE_DEFAULT settingType = iota // default setting which serves as a fallback if no other settings found
TYPE_LAYER // layer specific settings
)

// setting flags offset
const (
FlagInvalidOffset = iota
FlagOverrideOffset
FlagSampleStartOffset
FlagSampleThroughOffset
FlagSampleThroughAlwaysOffset
FlagTriggerTraceOffset
)

// setting flags
const (
FLAG_OK settingFlag = 0x0
FLAG_INVALID settingFlag = 1 << FlagInvalidOffset
FLAG_OVERRIDE settingFlag = 1 << FlagOverrideOffset
FLAG_SAMPLE_START settingFlag = 1 << FlagSampleStartOffset
FLAG_SAMPLE_THROUGH settingFlag = 1 << FlagSampleThroughOffset
FLAG_SAMPLE_THROUGH_ALWAYS settingFlag = 1 << FlagSampleThroughAlwaysOffset
FLAG_TRIGGER_TRACE settingFlag = 1 << FlagTriggerTraceOffset
)

// Enabled returns if the trace is enabled or not.
func (f settingFlag) Enabled() bool {
return f&(FLAG_SAMPLE_START|FLAG_SAMPLE_THROUGH_ALWAYS) != 0
}

// TriggerTraceEnabled returns if the trigger trace is enabled
func (f settingFlag) TriggerTraceEnabled() bool {
return f&FLAG_TRIGGER_TRACE != 0
}

func (st settingType) toSampleSource() SampleSource {
var source SampleSource
switch st {
case TYPE_DEFAULT:
source = SAMPLE_SOURCE_DEFAULT
case TYPE_LAYER:
source = SAMPLE_SOURCE_LAYER
default:
source = SAMPLE_SOURCE_NONE
}
return source
}
Loading

0 comments on commit fd7438b

Please sign in to comment.