Skip to content

Commit

Permalink
fix: release pipelines manually before process exit (#2115)
Browse files Browse the repository at this point in the history
  • Loading branch information
Abingcbc authored Mar 3, 2025
1 parent f7f76aa commit b544039
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 11 deletions.
2 changes: 2 additions & 0 deletions core/application/Application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,8 @@ void Application::Exit() {
// TODO: make it common
FlusherSLS::RecycleResourceIfNotUsed();

CollectionPipelineManager::GetInstance()->ClearAllPipelines();

#if defined(_MSC_VER)
ReleaseWindowsSignalObject();
#endif
Expand Down
37 changes: 28 additions & 9 deletions core/collection_pipeline/CollectionPipelineManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,18 @@

#include "collection_pipeline/CollectionPipelineManager.h"

#include "HostMonitorInputRunner.h"
#include "file_server/ConfigManager.h"
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <unordered_map>

#include "file_server/FileServer.h"
#include "go_pipeline/LogtailPlugin.h"
#include "host_monitor/HostMonitorInputRunner.h"
#include "prometheus/PrometheusInputRunner.h"
#if defined(__linux__) && !defined(__ANDROID__)
#include "ebpf/eBPFServer.h"
#endif
#include "collection_pipeline/queue/ProcessQueueManager.h"
#include "collection_pipeline/queue/QueueKeyManager.h"
#include "config/feedbacker/ConfigFeedbackReceiver.h"
#include "runner/ProcessorRunner.h"
#if defined(__ENTERPRISE__) && defined(__linux__) && !defined(__ANDROID__)
Expand Down Expand Up @@ -64,13 +66,16 @@ void logtail::CollectionPipelineManager::UpdatePipelines(CollectionConfigDiff& d
if (isFileServerStarted && isFileServerInputChanged) {
FileServer::GetInstance()->Pause();
}

// other threads only read mPipelineNameEntityMap, so we don't need to lock read here
for (const auto& name : diff.mRemoved) {
auto iter = mPipelineNameEntityMap.find(name);
iter->second->Stop(true);
DecreasePluginUsageCnt(iter->second->GetPluginStatistics());
iter->second->RemoveProcessQueue();
mPipelineNameEntityMap.erase(iter);
{
unique_lock<shared_mutex> lock(mPipelineNameEntityMapMutex);
mPipelineNameEntityMap.erase(name);
}
ConfigFeedbackReceiver::GetInstance().FeedbackContinuousPipelineConfigStatus(name,
ConfigFeedbackStatus::DELETED);
}
Expand All @@ -97,8 +102,10 @@ void logtail::CollectionPipelineManager::UpdatePipelines(CollectionConfigDiff& d
auto iter = mPipelineNameEntityMap.find(config.mName);
iter->second->Stop(false);
DecreasePluginUsageCnt(iter->second->GetPluginStatistics());

mPipelineNameEntityMap[config.mName] = p;
{
unique_lock<shared_mutex> lock(mPipelineNameEntityMapMutex);
mPipelineNameEntityMap[config.mName] = p;
}
IncreasePluginUsageCnt(p->GetPluginStatistics());
p->Start();
ConfigFeedbackReceiver::GetInstance().FeedbackContinuousPipelineConfigStatus(config.mName,
Expand All @@ -122,7 +129,10 @@ void logtail::CollectionPipelineManager::UpdatePipelines(CollectionConfigDiff& d
}
LOG_INFO(sLogger,
("pipeline building for new config succeeded", "begin to start pipeline")("config", config.mName));
mPipelineNameEntityMap[config.mName] = p;
{
unique_lock<shared_mutex> lock(mPipelineNameEntityMapMutex);
mPipelineNameEntityMap[config.mName] = p;
}
IncreasePluginUsageCnt(p->GetPluginStatistics());
p->Start();
ConfigFeedbackReceiver::GetInstance().FeedbackContinuousPipelineConfigStatus(config.mName,
Expand Down Expand Up @@ -157,6 +167,7 @@ void logtail::CollectionPipelineManager::UpdatePipelines(CollectionConfigDiff& d
}

const shared_ptr<CollectionPipeline>& CollectionPipelineManager::FindConfigByName(const string& configName) const {
shared_lock<shared_mutex> lock(mPipelineNameEntityMapMutex);
auto it = mPipelineNameEntityMap.find(configName);
if (it != mPipelineNameEntityMap.end()) {
return it->second;
Expand All @@ -165,6 +176,7 @@ const shared_ptr<CollectionPipeline>& CollectionPipelineManager::FindConfigByNam
}

vector<string> CollectionPipelineManager::GetAllConfigNames() const {
shared_lock<shared_mutex> lock(mPipelineNameEntityMapMutex);
vector<string> res;
for (const auto& item : mPipelineNameEntityMap) {
res.push_back(item.first);
Expand Down Expand Up @@ -203,6 +215,11 @@ void CollectionPipelineManager::StopAllPipelines() {
LOG_INFO(sLogger, ("stop all pipelines", "succeeded"));
}

void CollectionPipelineManager::ClearAllPipelines() {
unique_lock<shared_mutex> lock(mPipelineNameEntityMapMutex);
mPipelineNameEntityMap.clear();
}

shared_ptr<CollectionPipeline> CollectionPipelineManager::BuildPipeline(CollectionConfig&& config) {
shared_ptr<CollectionPipeline> p = make_shared<CollectionPipeline>();
// only config.mDetail is removed, other members can be safely used later
Expand All @@ -213,6 +230,7 @@ shared_ptr<CollectionPipeline> CollectionPipelineManager::BuildPipeline(Collecti
}

void CollectionPipelineManager::FlushAllBatch() {
shared_lock<shared_mutex> lock(mPipelineNameEntityMapMutex);
for (const auto& item : mPipelineNameEntityMap) {
item.second->FlushBatch();
}
Expand All @@ -237,6 +255,7 @@ void CollectionPipelineManager::DecreasePluginUsageCnt(
}

bool CollectionPipelineManager::CheckIfFileServerUpdated(CollectionConfigDiff& diff) {
// private method, no need to lock mPipelineNameEntityMapMutex
for (const auto& name : diff.mRemoved) {
string inputType = mPipelineNameEntityMap[name]->GetConfig()["inputs"][0]["Type"].asString();
if (inputType == "input_file" || inputType == "input_container_stdio") {
Expand Down
4 changes: 4 additions & 0 deletions core/collection_pipeline/CollectionPipelineManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#pragma once

#include <memory>
#include <shared_mutex>
#include <string>
#include <unordered_map>

Expand All @@ -41,12 +42,14 @@ class CollectionPipelineManager {
const std::shared_ptr<CollectionPipeline>& FindConfigByName(const std::string& configName) const;
std::vector<std::string> GetAllConfigNames() const;
std::string GetPluginStatistics() const;

// for shennong only
const std::unordered_map<std::string, std::shared_ptr<CollectionPipeline>>& GetAllPipelines() const {
return mPipelineNameEntityMap;
}
// 过渡使用
void StopAllPipelines();
void ClearAllPipelines(); // only used when exiting

private:
CollectionPipelineManager();
Expand All @@ -61,6 +64,7 @@ class CollectionPipelineManager {
// TODO: 长期过渡使用
bool CheckIfFileServerUpdated(CollectionConfigDiff& diff);

mutable std::shared_mutex mPipelineNameEntityMapMutex;
std::unordered_map<std::string, std::shared_ptr<CollectionPipeline>> mPipelineNameEntityMap;
mutable SpinLock mPluginCntMapLock;
std::unordered_map<std::string, std::unordered_map<std::string, uint32_t>> mPluginCntMap;
Expand Down
4 changes: 2 additions & 2 deletions core/plugin/processor/inner/ProcessorTagNative.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ bool ProcessorTagNative::IsSupportedEvent(const PipelineEventPtr& /*e*/) const {
void ProcessorTagNative::AddTag(PipelineEventGroup& logGroup, TagKey tagKey, const string& value) const {
auto it = mPipelineMetaTagKey.find(tagKey);
if (it != mPipelineMetaTagKey.end()) {
if (!it->second.empty()) {
if (!value.empty() && !it->second.empty()) {
auto sb = logGroup.GetSourceBuffer()->CopyString(value);
logGroup.SetTagNoCopy(it->second, StringView(sb.data, sb.size));
}
Expand All @@ -163,7 +163,7 @@ void ProcessorTagNative::AddTag(PipelineEventGroup& logGroup, TagKey tagKey, con
void ProcessorTagNative::AddTag(PipelineEventGroup& logGroup, TagKey tagKey, StringView value) const {
auto it = mPipelineMetaTagKey.find(tagKey);
if (it != mPipelineMetaTagKey.end()) {
if (!it->second.empty()) {
if (!value.empty() && !it->second.empty()) {
logGroup.SetTagNoCopy(it->second, value);
}
// empty value means delete
Expand Down

0 comments on commit b544039

Please sign in to comment.