Skip to content

Commit

Permalink
fix: merge old pipeline data to new pipeline when flusher is unready (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
Abingcbc authored Feb 24, 2025
1 parent ae061ff commit 9a278cd
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 8 deletions.
35 changes: 35 additions & 0 deletions pluginmanager/config_update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,3 +239,38 @@ func (s *configUpdateTestSuite) TestStopAllExitTimeout() {
time.Sleep(time.Second * time.Duration(5))
s.Equal(0, checkFlusher.GetLogCount())
}

func (s *configUpdateTestSuite) TestUnreadyFlusher() {
// block config
LogtailConfigLock.RLock()
config := LogtailConfig[updateConfigName]
LogtailConfigLock.RUnlock()
s.NotNil(config, "%s logstrore config should exist", updateConfigName)
checkFlusher, ok := GetConfigFlushers(config.PluginRunner)[0].(*checker.FlusherChecker)
s.True(ok)
mockInput, ok := GetConfigInputs(config.PluginRunner)[0].(*mockd.ServiceMock)
s.True(ok)
s.Equal(0, checkFlusher.GetLogCount(), "the block flusher checker doesn't have any logs")

// make flusher unready
checkFlusher.Ready = false
// unblock input
mockInput.Block = false
time.Sleep(time.Second * time.Duration(3))

// update config
s.NoError(Stop(updateConfigName, false))
s.Equal(0, checkFlusher.GetLogCount(), "the hold on block flusher checker doesn't have any logs")
_ = LoadAndStartMockConfig(updateConfigName, updateConfigName, updateConfigName, GetTestConfig(updateConfigName))
LogtailConfigLock.RLock()
config = LogtailConfig[updateConfigName]
LogtailConfigLock.RUnlock()
s.NotNil(config, "%s logstrore config should exist", updateConfigName)
checkFlusher, ok = GetConfigFlushers(config.PluginRunner)[0].(*checker.FlusherChecker)
s.True(ok)
mockInput, ok = GetConfigInputs(config.PluginRunner)[0].(*mockd.ServiceMock)
s.True(ok)
mockInput.Block = false
time.Sleep(time.Second * time.Duration(3))
s.Equal(20000+10000, checkFlusher.GetLogCount(), "the new flusher checker should merge the old logs")
}
6 changes: 5 additions & 1 deletion pluginmanager/logstore_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,10 @@ func createLogstoreConfig(project string, logstore string, configName string, lo
if logstoreC.PluginRunner, err = initPluginRunner(logstoreC); err != nil {
return nil, err
}
if lastConfigRunner, hasLastConfig := LastUnsendBuffer[configName]; hasLastConfig {
// Move unsent LogGroups from last config to new config.
logstoreC.PluginRunner.Merge(lastConfigRunner)
}

logstoreC.ContainerLabelSet = make(map[string]struct{})
logstoreC.EnvSet = make(map[string]struct{})
Expand Down Expand Up @@ -652,7 +656,7 @@ func initPluginRunner(lc *LogstoreConfig) (PluginRunner, error) {
func LoadLogstoreConfig(project string, logstore string, configName string, logstoreKey int64, jsonStr string) error {
if len(jsonStr) == 0 {
logger.Info(context.Background(), "delete config", configName, "logstore", logstore)
DeleteLogstoreConfigFromLogtailConfig(configName)
DeleteLogstoreConfigFromLogtailConfig(configName, true)
return nil
}
logger.Info(context.Background(), "load config", configName, "logstore", logstore)
Expand Down
17 changes: 11 additions & 6 deletions pluginmanager/plugin_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ var ContainerConfig *LogstoreConfig
var DisabledLogtailConfigLock sync.RWMutex
var DisabledLogtailConfig = make(map[*LogstoreConfig]struct{})

var LastUnsendBuffer = make(map[string]PluginRunner)

// Two built-in logtail configs to report statistics and alarm (from system and other logtail configs).
var AlarmConfig *LogstoreConfig

Expand Down Expand Up @@ -131,7 +133,7 @@ func timeoutStop(config *LogstoreConfig, removedFlag bool) bool {
return
}
logger.Info(context.Background(), "Valid but slow stop config", config.ConfigName, "LogstoreConfig", addressStr)
DeleteLogstoreConfig(config)
DeleteLogstoreConfig(config, removedFlag)
delete(DisabledLogtailConfig, config)

DisabledLogtailConfigLock.Unlock()
Expand Down Expand Up @@ -175,7 +177,7 @@ func StopAllPipelines(withInput bool) error {
DisabledLogtailConfig[logstoreConfig] = struct{}{}
DisabledLogtailConfigLock.Unlock()
} else {
DeleteLogstoreConfig(logstoreConfig)
DeleteLogstoreConfig(logstoreConfig, true)
}
toDeleteConfigNames[configName] = struct{}{}
}
Expand All @@ -187,7 +189,7 @@ func StopAllPipelines(withInput bool) error {
return nil
}

func DeleteLogstoreConfig(config *LogstoreConfig) {
func DeleteLogstoreConfig(config *LogstoreConfig, removedFlag bool) {
if actualObject, ok := config.Context.(*ContextImp); ok {
actualObject.logstoreC = nil
}
Expand Down Expand Up @@ -227,13 +229,16 @@ func DeleteLogstoreConfig(config *LogstoreConfig) {
}
runner.LogstoreConfig = nil
}
if !removedFlag {
LastUnsendBuffer[config.ConfigName] = config.PluginRunner
}
config.PluginRunner = nil
}

func DeleteLogstoreConfigFromLogtailConfig(configName string) {
func DeleteLogstoreConfigFromLogtailConfig(configName string, removedFlag bool) {
LogtailConfigLock.Lock()
if config, ok := LogtailConfig[configName]; ok {
DeleteLogstoreConfig(config)
DeleteLogstoreConfig(config, removedFlag)
delete(LogtailConfig, configName)
}
LogtailConfigLock.Unlock()
Expand Down Expand Up @@ -282,7 +287,7 @@ func Stop(configName string, removedFlag bool) error {
} else {
logger.Info(config.Context.GetRuntimeContext(), "Stop config now", configName)
LogtailConfigLock.Lock()
DeleteLogstoreConfig(config)
DeleteLogstoreConfig(config, removedFlag)
delete(LogtailConfig, configName)
LogtailConfigLock.Unlock()
}
Expand Down
4 changes: 3 additions & 1 deletion plugins/flusher/checker/flusher_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@ type FlusherChecker struct {
context pipeline.Context
LogGroup protocol.LogGroup
Lock sync.RWMutex
Ready bool
}

func (p *FlusherChecker) Init(context pipeline.Context) error {
p.context = context
p.Ready = true
return nil
}

Expand Down Expand Up @@ -139,7 +141,7 @@ func (p *FlusherChecker) Flush(projectName string, logstoreName string, configNa

// IsReady is ready to flush
func (p *FlusherChecker) IsReady(projectName string, logstoreName string, logstoreKey int64) bool {
return true
return p.Ready
}

// Stop ...
Expand Down

0 comments on commit 9a278cd

Please sign in to comment.