Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SDKS-9085] Adding large segment healthcheck monitor #297

Merged
merged 4 commits into from
Dec 2, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/gin-gonic/gin v1.10.0
github.com/google/uuid v1.3.0
github.com/splitio/gincache v1.0.1
github.com/splitio/go-split-commons/v6 v6.0.2-0.20241125153044-959311072c68
github.com/splitio/go-split-commons/v6 v6.0.2-0.20241202144122-6b3a0c7817bc
github.com/splitio/go-toolkit/v5 v5.4.0
github.com/stretchr/testify v1.9.0
go.etcd.io/bbolt v1.3.6
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,10 @@ github.com/rogpeppe/go-internal v1.8.0 h1:FCbCCtXNOY3UtUuHUYaghJg4y7Fd14rXifAYUA
github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE=
github.com/splitio/gincache v1.0.1 h1:dLYdANY/BqH4KcUMCe/LluLyV5WtuE/LEdQWRE06IXU=
github.com/splitio/gincache v1.0.1/go.mod h1:CcgJDSM9Af75kyBH0724v55URVwMBuSj5x1eCWIOECY=
github.com/splitio/go-split-commons/v6 v6.0.2-0.20241125153044-959311072c68 h1:Nr48cVYJZCOQzfKPGPsYcHykzEa4M/ADPkzO+eo3GOI=
github.com/splitio/go-split-commons/v6 v6.0.2-0.20241125153044-959311072c68/go.mod h1:D/XIY/9Hmfk9ivWsRsJVp439kEdmHbzUi3PKzQQDOXY=
github.com/splitio/go-split-commons/v6 v6.0.2-0.20241128203315-8c123543a54e h1:d/bRCPlzszazKemFu7UOyzJJHc+Ren43u178vd1Do5c=
github.com/splitio/go-split-commons/v6 v6.0.2-0.20241128203315-8c123543a54e/go.mod h1:D/XIY/9Hmfk9ivWsRsJVp439kEdmHbzUi3PKzQQDOXY=
github.com/splitio/go-split-commons/v6 v6.0.2-0.20241202144122-6b3a0c7817bc h1:pud1qA/GUJxe2VS2E23wg2BjZ/5q7hSKto+wUt5ZNFA=
github.com/splitio/go-split-commons/v6 v6.0.2-0.20241202144122-6b3a0c7817bc/go.mod h1:D/XIY/9Hmfk9ivWsRsJVp439kEdmHbzUi3PKzQQDOXY=
github.com/splitio/go-toolkit/v5 v5.4.0 h1:g5WFpRhQomnXCmvfsNOWV4s5AuUrWIZ+amM68G8NBKM=
github.com/splitio/go-toolkit/v5 v5.4.0/go.mod h1:xYhUvV1gga9/1029Wbp5pjnR6Cy8nvBpjw99wAbsMko=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand Down
2 changes: 1 addition & 1 deletion splitio/producer/initialization.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func Start(logger logging.LoggerInterface, cfg *conf.Main) error {

// Healcheck Monitor
splitsConfig, segmentsConfig, storageConfig := getAppCounterConfigs(storages.SplitStorage)
appMonitor := hcApplication.NewMonitorImp(splitsConfig, segmentsConfig, &storageConfig, logger)
appMonitor := hcApplication.NewMonitorImp(splitsConfig, segmentsConfig, nil, &storageConfig, logger)
servicesMonitor := hcServices.NewMonitorImp(getServicesCountersConfig(advanced), logger)

impressionsCounter := strategy.NewImpressionsCounter()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,6 @@ const (
Low
)

const (
// Splits counter type
Splits = iota
// Segments counter type
Segments
// Storage counter type
Storage
)

// HealthyResult description
type HealthyResult struct {
Name string
Expand Down
71 changes: 43 additions & 28 deletions splitio/provisional/healthcheck/application/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"sync"
"time"

hc "github.com/splitio/go-split-commons/v6/healthcheck/application"
"github.com/splitio/go-toolkit/v5/logging"
toolkitsync "github.com/splitio/go-toolkit/v5/sync"
"github.com/splitio/split-synchronizer/v5/splitio/provisional/healthcheck/application/counter"
Expand All @@ -21,13 +22,12 @@ type MonitorIterface interface {

// MonitorImp description
type MonitorImp struct {
splitsCounter counter.ThresholdCounterInterface
segmentsCounter counter.ThresholdCounterInterface
storageCounter counter.PeriodicCounterInterface
producerMode toolkitsync.AtomicBool
healthySince *time.Time
lock sync.RWMutex
logger logging.LoggerInterface
counters map[int]counter.ThresholdCounterInterface
storageCounter counter.PeriodicCounterInterface
producerMode toolkitsync.AtomicBool
healthySince *time.Time
lock sync.RWMutex
logger logging.LoggerInterface
}

// HealthDto struct
Expand Down Expand Up @@ -56,7 +56,7 @@ func (m *MonitorImp) getHealthySince(healthy bool) *time.Time {

func checkIfIsHealthy(result []ItemDto) bool {
for _, r := range result {
if r.Healthy == false && r.Severity == counter.Critical {
if !r.Healthy && r.Severity == counter.Critical {
return false
}
}
Expand All @@ -71,7 +71,10 @@ func (m *MonitorImp) GetHealthStatus() HealthDto {

var items []ItemDto
var results []counter.HealthyResult
results = append(results, m.splitsCounter.IsHealthy(), m.segmentsCounter.IsHealthy())

for _, mc := range m.counters {
results = append(results, mc.IsHealthy())
}

if m.producerMode.IsSet() {
results = append(results, m.storageCounter.IsHealthy())
Expand Down Expand Up @@ -104,12 +107,12 @@ func (m *MonitorImp) NotifyEvent(counterType int) {

m.logger.Debug(fmt.Sprintf("Notify Event. Type: %d.", counterType))

switch counterType {
case counter.Splits:
m.splitsCounter.NotifyHit()
case counter.Segments:
m.segmentsCounter.NotifyHit()
counter, ok := m.counters[counterType]
if !ok {
m.logger.Debug(fmt.Sprintf("wrong counterType: %d", counterType))
return
}
counter.NotifyHit()
}

// Reset counter value
Expand All @@ -119,21 +122,23 @@ func (m *MonitorImp) Reset(counterType int, value int) {

m.logger.Debug(fmt.Sprintf("Reset. Type: %d. Value: %d", counterType, value))

switch counterType {
case counter.Splits:
m.splitsCounter.ResetThreshold(value)
case counter.Segments:
m.segmentsCounter.ResetThreshold(value)
counter, ok := m.counters[counterType]
if !ok {
m.logger.Debug(fmt.Sprintf("wrong counterType: %d", counterType))
return
}
counter.ResetThreshold(value)
}

// Start counters
func (m *MonitorImp) Start() {
m.lock.Lock()
defer m.lock.Unlock()

m.splitsCounter.Start()
m.segmentsCounter.Start()
for _, counter := range m.counters {
counter.Start()
}

if m.producerMode.IsSet() {
m.storageCounter.Start()
}
Expand All @@ -146,8 +151,9 @@ func (m *MonitorImp) Stop() {
m.lock.Lock()
defer m.lock.Unlock()

m.splitsCounter.Stop()
m.segmentsCounter.Stop()
for _, counter := range m.counters {
counter.Stop()
}

if m.producerMode.IsSet() {
m.storageCounter.Stop()
Expand All @@ -158,16 +164,25 @@ func (m *MonitorImp) Stop() {
func NewMonitorImp(
splitsConfig counter.ThresholdConfig,
segmentsConfig counter.ThresholdConfig,
largeSegmentsConfig *counter.ThresholdConfig,
storageConfig *counter.PeriodicConfig,
logger logging.LoggerInterface,
) *MonitorImp {
now := time.Now()
splitsCounter := counter.NewThresholdCounter(splitsConfig, logger)
segmentsCounter := counter.NewThresholdCounter(segmentsConfig, logger)
monitor := &MonitorImp{
splitsCounter: counter.NewThresholdCounter(splitsConfig, logger),
segmentsCounter: counter.NewThresholdCounter(segmentsConfig, logger),
producerMode: *toolkitsync.NewAtomicBool(storageConfig != nil),
logger: logger,
healthySince: &now,
counters: map[int]counter.ThresholdCounterInterface{},
producerMode: *toolkitsync.NewAtomicBool(storageConfig != nil),
logger: logger,
healthySince: &now,
}

monitor.counters[hc.Splits] = splitsCounter
monitor.counters[hc.Segments] = segmentsCounter

if largeSegmentsConfig != nil {
monitor.counters[hc.LargeSegments] = counter.NewThresholdCounter(*largeSegmentsConfig, logger)
}

if monitor.producerMode.IsSet() {
Expand Down
8 changes: 7 additions & 1 deletion splitio/provisional/healthcheck/application/monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ func TestMonitor(t *testing.T) {
Severity: counter.Critical,
}

lsCfg := counter.ThresholdConfig{
Name: "LargeSegments",
Period: 10,
Severity: counter.Critical,
}

storageCfg := counter.PeriodicConfig{
Name: "Storage",
Period: 10,
Expand All @@ -46,7 +52,7 @@ func TestMonitor(t *testing.T) {
},
}

monitor := NewMonitorImp(splitsCfg, segmentsCfg, &storageCfg, logging.NewLogger(nil))
monitor := NewMonitorImp(splitsCfg, segmentsCfg, &lsCfg, &storageCfg, logging.NewLogger(nil))

monitor.Start()

Expand Down
6 changes: 4 additions & 2 deletions splitio/proxy/conf/sections.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package conf

import (
cconf "github.com/splitio/go-split-commons/v6/conf"
"github.com/splitio/go-split-commons/v6/service/api/specs"
"github.com/splitio/split-synchronizer/v5/splitio/common/conf"
)

Expand All @@ -21,7 +22,6 @@ type Main struct {
Healthcheck Healthcheck `json:"healthcheck" s-nested:"true"`
Observability Observability `json:"observability" s-nested:"true"`
FlagSpecVersion string `json:"flagSpecVersion" s-cli:"flag-spec-version" s-def:"1.2" s-desc:"Spec version for flags"`
LargeSegmentVersion string `json:"largeSegmentVersion" s-cli:"largesegment-version" s-def:"1.0" s-desc:"Spec version for large segments"`
}

// BuildAdvancedConfig generates a commons-compatible advancedconfig with default + overriden parameters
Expand All @@ -34,6 +34,8 @@ func (m *Main) BuildAdvancedConfig() *cconf.AdvancedConfig {
tmp.SplitsRefreshRate = int(m.Sync.SplitRefreshRateMs / 1000)
tmp.SegmentsRefreshRate = int(m.Sync.SegmentRefreshRateMs / 1000)
tmp.LargeSegment.LazyLoad = m.Sync.Advanced.LargeSegmentLazyLoad
tmp.LargeSegment.RefreshRate = int(m.Sync.LargeSegmentRefreshRateMs / 1000)
tmp.LargeSegment.Version = specs.LARGESEGMENT_V10
return tmp
}

Expand Down Expand Up @@ -72,7 +74,7 @@ type Persistent struct {
type Sync struct {
SplitRefreshRateMs int64 `json:"splitRefreshRateMs" s-cli:"split-refresh-rate-ms" s-def:"60000" s-desc:"How often to refresh feature flags"`
SegmentRefreshRateMs int64 `json:"segmentRefreshRateMs" s-cli:"segment-refresh-rate-ms" s-def:"60000" s-desc:"How often to refresh segments"`
LargeSegmentRefreshRateMs int64 `json:"largeSegmentRefreshRateMs" s-cli:"largesegment-refresh-rate-ms" s-def:"3600000" s-desc:"How often to refresh large segments"`
LargeSegmentRefreshRateMs int64 `json:"largeSegmentRefreshRateMs" s-cli:"largesegment-refresh-rate-ms" s-def:"600000" s-desc:"How often to refresh large segments"`
Advanced AdvancedSync `json:"advanced" s-nested:"true"`
}

Expand Down
10 changes: 7 additions & 3 deletions splitio/proxy/controllers/sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,14 @@ func (c *SdkServerController) SplitChanges(ctx *gin.Context) {
return
}

spec, _ := ctx.GetQuery("s")
if spec != specs.FLAG_V1_1 {
spec = specs.FLAG_V1_0
sParam, _ := ctx.GetQuery("s")
spec, err := specs.ParseAndValidate(sParam)
if err != nil {
c.logger.Error(fmt.Sprintf("error getting split changes: %s.", err))
ctx.JSON(http.StatusBadRequest, gin.H{"code": 400, "message": err.Error()})
return
}

splits.Splits = c.patchUnsupportedMatchers(splits.Splits, spec)

ctx.JSON(http.StatusOK, splits)
Expand Down
11 changes: 6 additions & 5 deletions splitio/proxy/initialization.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ func Start(logger logging.LoggerInterface, cfg *pconf.Main) error {
)

// Healcheck Monitor
splitsConfig, segmentsConfig := getAppCounterConfigs()
appMonitor := hcApplication.NewMonitorImp(splitsConfig, segmentsConfig, nil, logger)
splitsConfig, segmentsConfig, lsConfig := getAppCounterConfigs()
appMonitor := hcApplication.NewMonitorImp(splitsConfig, segmentsConfig, &lsConfig, nil, logger)
servicesMonitor := hcServices.NewMonitorImp(getServicesCountersConfig(*advanced), logger)

// Creating Workers and Tasks
Expand Down Expand Up @@ -137,7 +137,7 @@ func Start(logger logging.LoggerInterface, cfg *pconf.Main) error {
ImpressionSyncTask: impressionTask,
ImpressionsCountSyncTask: impressionCountTask,
EventSyncTask: eventsTask,
LargeSegmentSyncTask: tasks.NewFetchLargeSegmentsTask(workers.LargeSegmentUpdater, splitStorage, int(cfg.Sync.LargeSegmentRefreshRateMs/1000), advanced.LargeSegment.Workers, advanced.LargeSegment.QueueSize, logger),
LargeSegmentSyncTask: tasks.NewFetchLargeSegmentsTask(workers.LargeSegmentUpdater, splitStorage, advanced.LargeSegment.RefreshRate, advanced.LargeSegment.Workers, advanced.LargeSegment.QueueSize, logger, appMonitor),
}

// Creating Synchronizer for tasks
Expand Down Expand Up @@ -321,11 +321,12 @@ func startBGSyng(m synchronizer.Manager, mstatus chan int, haveSnapshot bool, on

}

func getAppCounterConfigs() (hcAppCounter.ThresholdConfig, hcAppCounter.ThresholdConfig) {
func getAppCounterConfigs() (hcAppCounter.ThresholdConfig, hcAppCounter.ThresholdConfig, hcAppCounter.ThresholdConfig) {
splitsConfig := hcAppCounter.DefaultThresholdConfig("Splits")
segmentsConfig := hcAppCounter.DefaultThresholdConfig("Segments")
LargeSegmentsConfig := hcAppCounter.DefaultThresholdConfig("LargeSegments")

return splitsConfig, segmentsConfig
return splitsConfig, segmentsConfig, LargeSegmentsConfig
}

func getServicesCountersConfig(advanced conf.AdvancedConfig) []hcServicesCounter.Config {
Expand Down