Skip to content

Commit

Permalink
support local default pipeline script
Browse files Browse the repository at this point in the history
  • Loading branch information
zwj authored and 谭彪 committed Nov 2, 2024
1 parent fd9d3e4 commit 68aa7c9
Show file tree
Hide file tree
Showing 9 changed files with 96 additions and 1 deletion.
10 changes: 10 additions & 0 deletions internal/config/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,16 @@ func (c *Config) loadElectionEnvs() {
c.Election.Tags["election_namespace"] = c.Election.Namespace
}

if v := datakit.GetEnv("ENV_PIPELINE_DEFAULT_PIPELINE"); v != "" {
var result map[string]string
if err := json.Unmarshal([]byte(v), &result); err != nil {
l.Errorf("unmarshal `ENV_PIPELINE_DEFAULT_PIPELINE` failed: %s",
err.Error())
} else {
c.Pipeline.DefaultPipeline = result
}
}

for _, x := range []string{
"ENV_GLOBAL_ELECTION_TAGS",
"ENV_GLOBAL_ENV_TAGS", // Deprecated
Expand Down
1 change: 1 addition & 0 deletions internal/config/mainconf.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ func DefaultConfig() *Config {
RemotePullInterval: "1m",
ReferTableURL: "",
ReferTablePullInterval: "5m",
DefaultPipeline: map[string]string{},
Offload: &offload.OffloadConfig{
Receiver: offload.DKRcv,
Addresses: []string{},
Expand Down
6 changes: 6 additions & 0 deletions internal/datakit/dkconf.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,12 @@ ulimit = 64000
# append run info
disable_append_run_info = false
# default pipeline
[pipeline.default_pipeline]
# logging = "<your_script.p>"
# metric = "<your_script.p>"
# tracing = "<your_script.p>"
# Offload data processing tasks to post-level data processors.
[pipeline.offload]
receiver = "datakit-http"
Expand Down
20 changes: 20 additions & 0 deletions internal/export/doc/en/datakit-conf.md
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,26 @@ The lookup priority is defined as follows:

2. If not found in *git_repos* , go to the *<Datakit Installation Directory\>/pipeline* directory for the Pipeline script, or go to the *<Datakit Installation Directory\>/python.d* directory for the Python script.

### Locally set Pipeline default script {#pipeline-settings}

[:octicons-tag-24: Version-1.61.0](changelog.md#cl-1.61.0)

Supports setting the default Pipeline script locally. If it conflicts with the default script set remotely, the local setting is preferred.

It can be configured in two ways:

- Host deployment, you can specify the default scripts for each category in the DataKit main configuration file, as follows:

```toml
# default pipeline
[pipeline.default_pipeline]
# logging = "<your_script.p>"
# metric = "<your_script.p>"
# tracing = "<your_script.p>"
```

- Container deployment, you can use the environment variable, `ENV_PIPELINE_DEFAULT_PIPELINE`, its value is, for example, `{"logging":"abc.p","metric":"xyz.p"}`

### Set the Maximum Value of Open File Descriptor {#enable-max-fd}

In a Linux environment, you can configure the ulimit entry in the Datakit main configuration file to set the maximum number of open files for Datakit, as follows:
Expand Down
20 changes: 20 additions & 0 deletions internal/export/doc/zh/datakit-conf.md
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,26 @@ Kubernetes 下部署相关配置参见[这里](datakit-daemonset-deploy.md#env-d

参见[这里](git-config-how-to.md)

### 本地设置 Pipeline 默认脚本 {#pipeline-settings}

[:octicons-tag-24: Version-1.61.0](changelog.md#cl-1.61.0)

支持通过本地设置默认 Pipeline 脚本,如果与远程设置的默认脚本冲突,则倾向本地设置。

可通过两种方式配置:

- 主机方式部署,可在 DataKit 主配置文件中指定各类别的默认脚本,如下:

```toml
# default pipeline
[pipeline.default_pipeline]
# logging = "<your_script.p>"
# metric = "<your_script.p>"
# tracing = "<your_script.p>"
```

- 容器方式部署,可使用环境变量,`ENV_PIPELINE_DEFAULT_PIPELINE`,其值例如 `{"logging":"abc.p","metric":"xyz.p"}`

### 设置打开的文件描述符的最大值 {#enable-max-fd}

Linux 环境下,可以在 Datakit 主配置文件中配置 `ulimit` 项,以设置 Datakit 的最大可打开文件数,如下:
Expand Down
8 changes: 8 additions & 0 deletions internal/export/non_input_docs.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,14 @@ func envCommon() []*inputs.ENVInfo {
Desc: "Global tag, multiple tags are divided by English commas. The old `ENV_GLOBAL_TAGS` will be discarded",
DescZh: "全局 tag,多个 tag 之间以英文逗号分割",
},
{
ENVName: "ENV_PIPELINE_DEFAULT_PIPELINE",
Type: doc.Map,
Example: "`{\"logging\":\"abc.p\",\"metric\":\"xyz.p\"}`",
Desc: "Set the default Pipeline script for the specified data category. " +
"This setting takes precedence when it conflicts with the remote setting.",
DescZh: "为指定数据类别设置默认 Pipeline 脚本;与远程设置冲突时,此设置优先",
},
{
ENVName: "~~ENV_GLOBAL_TAGS~~",
Type: doc.List,
Expand Down
1 change: 1 addition & 0 deletions internal/pipeline/plval/cfg.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type PipelineCfg struct {
SQLiteMemMode bool `toml:"sqlite_mem_mode"`
Offload *offload.OffloadConfig `toml:"offload"`
EnableDebugFields bool `toml:"_"`
DefaultPipeline map[string]string `toml:"default_pipeline"`

DeprecatedDisableAppendRunInfo bool `toml:"disable_append_run_info"`
}
Expand Down
28 changes: 28 additions & 0 deletions internal/pipeline/plval/plval.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,24 @@ func GetOffload() (*offload.OffloadWorker, bool) {

const maxCustomer = 16

var localDefaultPipeline map[point.Category]string

func GetLocalDefaultPipeline() map[point.Category]string {
return localDefaultPipeline
}

func PreferLocalDefaultPipeline(m map[point.Category]string) map[point.Category]string {
result := map[point.Category]string{}
for k, v := range m {
result[k] = v
}
for k, v := range GetLocalDefaultPipeline() {
result[k] = v
}

return result
}

func InitPlVal(cfg *PipelineCfg, upFn plmap.UploadFunc, gTags map[string]string,
installDir string,
) error {
Expand Down Expand Up @@ -124,6 +142,16 @@ func InitPlVal(cfg *PipelineCfg, upFn plmap.UploadFunc, gTags map[string]string,
_enableAppendRunInfo = true
}

if cfg != nil && len(cfg.DefaultPipeline) > 0 {
mp := map[point.Category]string{}
for k, v := range cfg.DefaultPipeline {
mp[point.CatString(k)] = v
}
localDefaultPipeline = mp
managerIns.UpdateDefaultScript(mp)
l.Infof("set default pipeline: %v", mp)
}

// init refer-table
if cfg != nil && cfg.ReferTableURL != "" {
dur, err := time.ParseDuration(cfg.ReferTablePullInterval)
Expand Down
3 changes: 2 additions & 1 deletion internal/pipeline/remote/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,8 @@ func doPull(pathConfig, pathRelation, siteURL string, ipr IPipelineRemote) error
l.Debug("dumpFiles succeeded")

loadContentPipeline(mFiles)
managerWkr.UpdateDefaultScript(defaultPl)
combineLocal := plval.PreferLocalDefaultPipeline(defaultPl)
managerWkr.UpdateDefaultScript(combineLocal)

err = updatePipelineRemoteConfig(pathConfig, siteURL, updateTime, ipr)
if err != nil {
Expand Down

0 comments on commit 68aa7c9

Please sign in to comment.