Skip to content

Commit

Permalink
APIGOV-29555 timebox handling
Browse files Browse the repository at this point in the history
  • Loading branch information
alrosca committed Feb 26, 2025
1 parent 6481aeb commit 40832da
Show file tree
Hide file tree
Showing 9 changed files with 165 additions and 256 deletions.
3 changes: 2 additions & 1 deletion pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/Axway/agent-sdk/pkg/cache"
"github.com/Axway/agent-sdk/pkg/config"
"github.com/Axway/agent-sdk/pkg/customunit"
"github.com/Axway/agent-sdk/pkg/traceability/sampling"
"github.com/Axway/agent-sdk/pkg/util"
"github.com/Axway/agent-sdk/pkg/util/errors"
hc "github.com/Axway/agent-sdk/pkg/util/healthcheck"
Expand Down Expand Up @@ -764,7 +765,7 @@ func newHandlers() []handler.Handler {
handlers := []handler.Handler{
handler.NewAPISvcHandler(agent.cacheManager, envName),
handler.NewInstanceHandler(agent.cacheManager, envName),
handler.NewAgentResourceHandler(agent.agentResourceManager),
handler.NewAgentResourceHandler(agent.agentResourceManager, sampling.GetGlobalSampling()),
handler.NewWatchResourceHandler(agent.cacheManager, agent.cfg),
agent.proxyResourceHandler,
}
Expand Down
21 changes: 20 additions & 1 deletion pkg/agent/handler/agentresource.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package handler

import (
"context"
"time"

"github.com/Axway/agent-sdk/pkg/agent/resource"
v1 "github.com/Axway/agent-sdk/pkg/apic/apiserver/models/api/v1"
management "github.com/Axway/agent-sdk/pkg/apic/apiserver/models/management/v1alpha1"
"github.com/Axway/agent-sdk/pkg/watchmanager/proto"
)

Expand All @@ -14,14 +16,20 @@ const (
governanceAgent = "GovernanceAgent"
)

type sampling interface {
EnableSampling(samplingLimit int32, samplingEndTime time.Time)
}

type agentResourceHandler struct {
agentResourceManager resource.Manager
sampler sampling
}

// NewAgentResourceHandler - creates a Handler for Agent resources
func NewAgentResourceHandler(agentResourceManager resource.Manager) Handler {
func NewAgentResourceHandler(agentResourceManager resource.Manager, sampler sampling) Handler {
return &agentResourceHandler{
agentResourceManager: agentResourceManager,
sampler: sampler,
}
}

Expand All @@ -38,5 +46,16 @@ func (h *agentResourceHandler) Handle(ctx context.Context, _ *proto.EventMeta, r
h.agentResourceManager.SetAgentResource(resource)
}
}

if action == proto.Event_SUBRESOURCEUPDATED && resource.Kind == traceabilityAgent {
ta := &management.TraceabilityAgent{}
err := ta.FromInstance(resource)
if err != nil {
return err
}
if ta.Agentstate.Sampling.Enabled {
h.sampler.EnableSampling(ta.Agentstate.Sampling.Limit, time.Time(ta.Agentstate.Sampling.EndTime))
}
}
return nil
}
2 changes: 1 addition & 1 deletion pkg/agent/handler/agentresource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func TestAgentResourceHandler(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
resourceManager := &mockResourceManager{}

handler := NewAgentResourceHandler(resourceManager)
handler := NewAgentResourceHandler(resourceManager, nil)

err := handler.Handle(NewEventContext(tc.action, nil, tc.resource.Kind, tc.resource.Name), nil, tc.resource)
if tc.hasError {
Expand Down
4 changes: 2 additions & 2 deletions pkg/traceability/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,9 @@ func readConfig(cfg *common.Config, info beat.Info) (*Config, error) {

// Setup the sampling config, if central config can not be found assume online mode
if agent.GetCentralConfig() != nil && agent.GetCentralConfig().GetUsageReportingConfig() != nil {
err = sampling.SetupSampling(outputConfig.Sampling, agent.GetCentralConfig().GetUsageReportingConfig().IsOfflineMode())
err = sampling.SetupSampling(outputConfig.Sampling, agent.GetCentralConfig().GetUsageReportingConfig().IsOfflineMode(), agent.GetCentralConfig().GetAPICDeployment())
} else {
err = sampling.SetupSampling(outputConfig.Sampling, false)
err = sampling.SetupSampling(outputConfig.Sampling, false, agent.GetCentralConfig().GetAPICDeployment())
}

if err != nil {
Expand Down
11 changes: 6 additions & 5 deletions pkg/traceability/sampling/definitions.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ package sampling
// SampleKey - the key used in the metadata when a transaction qualifies for sampling and should be sent to Observer
// defaultSamplingRate - the default sampling rate in percentage
const (
SampleKey = "sample"
countMax = 100
defaultSamplingRate = 0
maximumSamplingRate = 10
globalCounter = "global"
SampleKey = "sample"
countMax = 100
defaultSamplingRate = 0
defaultSamplingLimit = 0
maximumSamplingRate = 10
globalCounter = "global"
)

// TransactionDetails - details about the transaction that are used for sampling
Expand Down
26 changes: 16 additions & 10 deletions pkg/traceability/sampling/globalsampling.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (
"strconv"
"strings"
"sync"
"time"

"github.com/Axway/agent-sdk/pkg/agent"
"github.com/Axway/agent-sdk/pkg/util/log"
"github.com/elastic/beats/v7/libbeat/publisher"
"github.com/shopspring/decimal"
Expand All @@ -28,6 +28,9 @@ type Sampling struct {
OnlyErrors bool `config:"onlyErrors" yaml:"onlyErrors"`
countMax int
shouldSampleMax int
enabled bool
endTime time.Time
limit int32
}

// DefaultConfig - returns a default sampling config where all transactions are sent
Expand All @@ -48,13 +51,13 @@ func GetGlobalSamplingPercentage() (float64, error) {
}

// GetGlobalSampling -
func GetGlobalSampling() Sampling {
return agentSamples.config
func GetGlobalSampling() *sample {
return agentSamples
}

func getSamplingPercentageConfig(percentage float64) (float64, error) {
func getSamplingPercentageConfig(percentage float64, apicDeployment string) (float64, error) {
maxAllowedSampling := float64(maximumSamplingRate)
if !strings.HasPrefix(agent.GetCentralConfig().GetAPICDeployment(), "prod") {
if !strings.HasPrefix(apicDeployment, "prod") {
if val := os.Getenv(qaSamplingPercentageEnvVar); val != "" {
if qaSamplingPercentage, err := strconv.ParseFloat(val, 64); err == nil {
log.Tracef("Using %s (%s) rather than the default (%d) for non-production", qaSamplingPercentageEnvVar, val, defaultSamplingRate)
Expand All @@ -75,7 +78,7 @@ func getSamplingPercentageConfig(percentage float64) (float64, error) {
}

// SetupSampling - set up the global sampling for use by traceability
func SetupSampling(cfg Sampling, offlineMode bool) error {
func SetupSampling(cfg Sampling, offlineMode bool, apicDeployment string) error {
var err error

if offlineMode {
Expand All @@ -86,15 +89,18 @@ func SetupSampling(cfg Sampling, offlineMode bool) error {
OnlyErrors: false,
}
} else {
cfg.Percentage, err = getSamplingPercentageConfig(cfg.Percentage)
cfg.Percentage, err = getSamplingPercentageConfig(cfg.Percentage, apicDeployment)
cfg.countMax = int(100 * math.Pow(10, float64(numberOfDecimals(cfg.Percentage))))
cfg.shouldSampleMax = int(float64(cfg.countMax) * cfg.Percentage / 100)
}

agentSamples = &sample{
config: cfg,
currentCounts: make(map[string]int),
counterLock: sync.Mutex{},
config: cfg,
currentCounts: make(map[string]int),
counterLock: sync.Mutex{},
samplingCounter: 0,
counterResetPeriod: time.Minute,
counterResetStopCh: make(chan struct{}),
}

if err != nil {
Expand Down
102 changes: 64 additions & 38 deletions pkg/traceability/sampling/sample.go
Original file line number Diff line number Diff line change
@@ -1,67 +1,93 @@
package sampling

import (
"fmt"
"sync"
"time"

"github.com/elastic/beats/v7/libbeat/publisher"
)

// sample - private struct that is used to keep track of the samples being taken
type sample struct {
config Sampling
currentCounts map[string]int
counterLock sync.Mutex
config Sampling
currentCounts map[string]int
counterLock sync.Mutex
samplingCounter int32
counterResetPeriod time.Duration
counterResetStopCh chan struct{}
}

// ShouldSampleTransaction - receives the transaction details and returns true to sample it false to not
func (s *sample) ShouldSampleTransaction(details TransactionDetails) bool {
hasFailedStatus := details.Status == "Failure"
// sample only failed transaction if OnlyErrors is set to `true` and the transaction summary's status is an error
if !hasFailedStatus && s.config.OnlyErrors {
return false
}
func (s *sample) EnableSampling(samplingLimit int32, samplingEndTime time.Time) {
s.config.enabled = true
s.config.endTime = samplingEndTime
s.config.limit = samplingLimit

sampleGlobal := s.shouldSampleWithCounter(globalCounter)
perAPIEnabled := s.config.PerAPI && details.APIID != ""
s.resetSamplingCounter()

if s.config.PerSub && details.SubID != "" {
apiSamp := false
if perAPIEnabled {
apiSamp = s.shouldSampleWithCounter(details.APIID)
}
return s.shouldSampleWithCounter(fmt.Sprintf("%s-%s", details.APIID, details.SubID)) || apiSamp
}
// start limit reset job; limit is reset every minute
go s.samplingCounterReset()

if perAPIEnabled {
return s.shouldSampleWithCounter(details.APIID)
}
// disable sampling at endTime
go s.disableSampling()
}

func (s *sample) disableSampling() {
disableTimer := time.NewTimer(time.Until(s.config.endTime))
<-disableTimer.C

return sampleGlobal
s.config.enabled = false

// stop limit reset job when sampling is disabled
s.counterResetStopCh <- struct{}{}
}

func (s *sample) shouldSampleWithCounter(counterName string) bool {
func (s *sample) samplingCounterReset() {
nextLimiterPeriod := time.Now().Round(s.counterResetPeriod)
<-time.NewTimer(time.Until(nextLimiterPeriod)).C
s.resetSamplingCounter()

ticker := time.NewTicker(s.counterResetPeriod)

defer ticker.Stop()
for {
select {
case <-s.counterResetStopCh:
return
case <-ticker.C:
s.resetSamplingCounter()
}
}
}

func (s *sample) resetSamplingCounter() {
s.counterLock.Lock()
defer s.counterLock.Unlock()
// check if counter needs initiated
if _, found := s.currentCounts[counterName]; !found {
s.currentCounts[counterName] = 0
s.samplingCounter = 0
}

// ShouldSampleTransaction - receives the transaction details and returns true to sample it false to not
func (s *sample) ShouldSampleTransaction(details TransactionDetails) bool {
// check if sampling is enabled
if !s.config.enabled {
return false
}

// Only sampling on percentage, not currently looking at the details
shouldSample := false
if s.currentCounts[counterName] < s.config.shouldSampleMax {
shouldSample = true
// sampling limit per minute exceeded
if s.config.limit <= s.samplingCounter {
return false
}
s.currentCounts[counterName]++

// reset the count once we hit 100 * 10^(nb_decimals) messages
if s.currentCounts[counterName] == s.config.countMax {
s.currentCounts[counterName] = 0
hasFailedStatus := details.Status == "Failure"
// sample only failed transaction if OnlyErrors is set to `true` and the transaction summary's status is an error
if !hasFailedStatus && s.config.OnlyErrors {
return false
}

// return if we should sample this transaction
return shouldSample
s.counterLock.Lock()
s.samplingCounter++
s.counterLock.Unlock()

return true
}

// FilterEvents - returns an array of events that are part of the sample
Expand Down
Loading

0 comments on commit 40832da

Please sign in to comment.