From 68aa7c98b649a5fac4216c6cc9791291bb16e1a4 Mon Sep 17 00:00:00 2001 From: zwj Date: Sat, 2 Nov 2024 12:04:45 +0800 Subject: [PATCH] support local default pipeline script --- internal/config/env.go | 10 +++++++++ internal/config/mainconf.go | 1 + internal/datakit/dkconf.go | 6 ++++++ internal/export/doc/en/datakit-conf.md | 20 ++++++++++++++++++ internal/export/doc/zh/datakit-conf.md | 20 ++++++++++++++++++ internal/export/non_input_docs.go | 8 ++++++++ internal/pipeline/plval/cfg.go | 1 + internal/pipeline/plval/plval.go | 28 ++++++++++++++++++++++++++ internal/pipeline/remote/remote.go | 3 ++- 9 files changed, 96 insertions(+), 1 deletion(-) diff --git a/internal/config/env.go b/internal/config/env.go index 874d48888f..dd17c32732 100644 --- a/internal/config/env.go +++ b/internal/config/env.go @@ -472,6 +472,16 @@ func (c *Config) loadElectionEnvs() { c.Election.Tags["election_namespace"] = c.Election.Namespace } + if v := datakit.GetEnv("ENV_PIPELINE_DEFAULT_PIPELINE"); v != "" { + var result map[string]string + if err := json.Unmarshal([]byte(v), &result); err != nil { + l.Errorf("unmarshal `ENV_PIPELINE_DEFAULT_PIPELINE` failed: %s", + err.Error()) + } else { + c.Pipeline.DefaultPipeline = result + } + } + for _, x := range []string{ "ENV_GLOBAL_ELECTION_TAGS", "ENV_GLOBAL_ENV_TAGS", // Deprecated diff --git a/internal/config/mainconf.go b/internal/config/mainconf.go index a5e09c0ad0..068204fc3c 100644 --- a/internal/config/mainconf.go +++ b/internal/config/mainconf.go @@ -201,6 +201,7 @@ func DefaultConfig() *Config { RemotePullInterval: "1m", ReferTableURL: "", ReferTablePullInterval: "5m", + DefaultPipeline: map[string]string{}, Offload: &offload.OffloadConfig{ Receiver: offload.DKRcv, Addresses: []string{}, diff --git a/internal/datakit/dkconf.go b/internal/datakit/dkconf.go index f053033a01..84c733fc5d 100644 --- a/internal/datakit/dkconf.go +++ b/internal/datakit/dkconf.go @@ -99,6 +99,12 @@ ulimit = 64000 # append run info disable_append_run_info = 
false + # default pipeline + [pipeline.default_pipeline] + # logging = "" + # metric = "" + # tracing = "" + # Offload data processing tasks to post-level data processors. [pipeline.offload] receiver = "datakit-http" diff --git a/internal/export/doc/en/datakit-conf.md b/internal/export/doc/en/datakit-conf.md index 21faeeddbd..aac6c81a02 100644 --- a/internal/export/doc/en/datakit-conf.md +++ b/internal/export/doc/en/datakit-conf.md @@ -430,6 +430,26 @@ The lookup priority is defined as follows: 2. If not found in *git_repos* , go to the */pipeline* directory for the Pipeline script, or go to the */python.d* directory for the Python script. +### Set the Default Pipeline Script Locally {#pipeline-settings} + +[:octicons-tag-24: Version-1.61.0](changelog.md#cl-1.61.0) + +DataKit supports setting the default Pipeline script locally. If the local setting conflicts with the default script set remotely, the local setting takes precedence. + +It can be configured in two ways: + +- For host deployment, specify the default script for each category in the DataKit main configuration file, as follows: + + ```toml + # default pipeline + [pipeline.default_pipeline] + # logging = "" + # metric = "" + # tracing = "" + ``` + +- For container deployment, set the environment variable `ENV_PIPELINE_DEFAULT_PIPELINE` to a JSON object that maps each category to its script, for example `{"logging":"abc.p","metric":"xyz.p"}`. + ### Set the Maximum Value of Open File Descriptor {#enable-max-fd} In a Linux environment, you can configure the ulimit entry in the Datakit main configuration file to set the maximum number of open files for Datakit, as follows: diff --git a/internal/export/doc/zh/datakit-conf.md b/internal/export/doc/zh/datakit-conf.md index 4d2d97b5c5..b6c9f8601e 100644 --- a/internal/export/doc/zh/datakit-conf.md +++ b/internal/export/doc/zh/datakit-conf.md @@ -351,6 +351,26 @@ Kubernetes 下部署相关配置参见[这里](datakit-daemonset-deploy.md#env-d 参见[这里](git-config-how-to.md) +### 本地设置 Pipeline 默认脚本 {#pipeline-settings} + +[:octicons-tag-24: 
Version-1.61.0](changelog.md#cl-1.61.0) + +支持通过本地设置默认 Pipeline 脚本,如果与远程设置的默认脚本冲突,则倾向本地设置。 + +可通过两种方式配置: + +- 主机方式部署,可在 DataKit 主配置文件中指定各类别的默认脚本,如下: + + ```toml + # default pipeline + [pipeline.default_pipeline] + # logging = "" + # metric = "" + # tracing = "" + ``` + +- 容器方式部署,可使用环境变量,`ENV_PIPELINE_DEFAULT_PIPELINE`,其值例如 `{"logging":"abc.p","metric":"xyz.p"}` + ### 设置打开的文件描述符的最大值 {#enable-max-fd} Linux 环境下,可以在 Datakit 主配置文件中配置 `ulimit` 项,以设置 Datakit 的最大可打开文件数,如下: diff --git a/internal/export/non_input_docs.go b/internal/export/non_input_docs.go index 67d90e9c0e..5b0f417904 100644 --- a/internal/export/non_input_docs.go +++ b/internal/export/non_input_docs.go @@ -73,6 +73,14 @@ func envCommon() []*inputs.ENVInfo { Desc: "Global tag, multiple tags are divided by English commas. The old `ENV_GLOBAL_TAGS` will be discarded", DescZh: "全局 tag,多个 tag 之间以英文逗号分割", }, + { + ENVName: "ENV_PIPELINE_DEFAULT_PIPELINE", + Type: doc.Map, + Example: "`{\"logging\":\"abc.p\",\"metric\":\"xyz.p\"}`", + Desc: "Set the default Pipeline script for the specified data category. 
" + + "This setting takes precedence when it conflicts with the remote setting.", + DescZh: "为指定数据类别设置默认 Pipeline 脚本;与远程设置冲突时,此设置优先", + }, { ENVName: "~~ENV_GLOBAL_TAGS~~", Type: doc.List, diff --git a/internal/pipeline/plval/cfg.go b/internal/pipeline/plval/cfg.go index 48b929b2a3..f5b095434d 100644 --- a/internal/pipeline/plval/cfg.go +++ b/internal/pipeline/plval/cfg.go @@ -40,6 +40,7 @@ type PipelineCfg struct { SQLiteMemMode bool `toml:"sqlite_mem_mode"` Offload *offload.OffloadConfig `toml:"offload"` EnableDebugFields bool `toml:"_"` + DefaultPipeline map[string]string `toml:"default_pipeline"` DeprecatedDisableAppendRunInfo bool `toml:"disable_append_run_info"` } diff --git a/internal/pipeline/plval/plval.go b/internal/pipeline/plval/plval.go index ddfc85a275..32cfe86428 100644 --- a/internal/pipeline/plval/plval.go +++ b/internal/pipeline/plval/plval.go @@ -89,6 +89,24 @@ func GetOffload() (*offload.OffloadWorker, bool) { const maxCustomer = 16 +var localDefaultPipeline map[point.Category]string + +func GetLocalDefaultPipeline() map[point.Category]string { + return localDefaultPipeline +} + +func PreferLocalDefaultPipeline(m map[point.Category]string) map[point.Category]string { + result := map[point.Category]string{} + for k, v := range m { + result[k] = v + } + for k, v := range GetLocalDefaultPipeline() { + result[k] = v + } + + return result +} + func InitPlVal(cfg *PipelineCfg, upFn plmap.UploadFunc, gTags map[string]string, installDir string, ) error { @@ -124,6 +142,16 @@ func InitPlVal(cfg *PipelineCfg, upFn plmap.UploadFunc, gTags map[string]string, _enableAppendRunInfo = true } + if cfg != nil && len(cfg.DefaultPipeline) > 0 { + mp := map[point.Category]string{} + for k, v := range cfg.DefaultPipeline { + mp[point.CatString(k)] = v + } + localDefaultPipeline = mp + managerIns.UpdateDefaultScript(mp) + l.Infof("set default pipeline: %v", mp) + } + // init refer-table if cfg != nil && cfg.ReferTableURL != "" { dur, err := 
time.ParseDuration(cfg.ReferTablePullInterval) diff --git a/internal/pipeline/remote/remote.go b/internal/pipeline/remote/remote.go index 5530011f02..8bd8d94689 100644 --- a/internal/pipeline/remote/remote.go +++ b/internal/pipeline/remote/remote.go @@ -266,7 +266,8 @@ func doPull(pathConfig, pathRelation, siteURL string, ipr IPipelineRemote) error l.Debug("dumpFiles succeeded") loadContentPipeline(mFiles) - managerWkr.UpdateDefaultScript(defaultPl) + combineLocal := plval.PreferLocalDefaultPipeline(defaultPl) + managerWkr.UpdateDefaultScript(combineLocal) err = updatePipelineRemoteConfig(pathConfig, siteURL, updateTime, ipr) if err != nil {