Skip to content

Commit

Permalink
Resolve "DataKit 重启后 pipeline 加载落后数据上报"
Browse files Browse the repository at this point in the history
  • Loading branch information
zwj authored and 谭彪 committed Sep 4, 2024
1 parent a2abd1e commit 4f7f331
Show file tree
Hide file tree
Showing 63 changed files with 742 additions and 819 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ require (

require (
github.com/DataDog/ebpf-manager v0.2.16
github.com/GuanceCloud/cliutils v1.1.21-0.20240821062850-3079bb7ed98d
github.com/GuanceCloud/cliutils v1.1.21-0.20240904042137-2a87297900d6
github.com/andrewkroh/sys v0.0.0-20151128191922-287798fe3e43
github.com/brianvoe/gofakeit/v6 v6.28.0
github.com/cilium/ebpf v0.11.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,8 @@ github.com/DataDog/sketches-go v1.4.1 h1:j5G6as+9FASM2qC36lvpvQAj9qsv/jUs3FtO8Cw
github.com/DataDog/sketches-go v1.4.1/go.mod h1:xJIXldczJyyjnbDop7ZZcLxJdV3+7Kra7H1KMgpgkLk=
github.com/GuanceCloud/client_model v0.0.0-20230418154757-93bd4e878a5e h1:i34dA4kiRTfG+KdvkIXCLPDduarVeFlQhGDD3TefgS4=
github.com/GuanceCloud/client_model v0.0.0-20230418154757-93bd4e878a5e/go.mod h1:PMnE48aPzuRu83FmWZugC0O3d54ZupJd/MmiaYxz8sM=
github.com/GuanceCloud/cliutils v1.1.21-0.20240821062850-3079bb7ed98d h1:dM5YEBXuFuwiZ5roAzxfA8RK2VwE0TnHAl5zAsHSKFg=
github.com/GuanceCloud/cliutils v1.1.21-0.20240821062850-3079bb7ed98d/go.mod h1:Qbeedf/Ji3immd8Ka01NDQG6SP6j8JBnbZwsHTtxyqs=
github.com/GuanceCloud/cliutils v1.1.21-0.20240904042137-2a87297900d6 h1:hXvV/9i3aWkVVnpnHLngbpjZwTU+ut36YxOZOS2J4MM=
github.com/GuanceCloud/cliutils v1.1.21-0.20240904042137-2a87297900d6/go.mod h1:Qbeedf/Ji3immd8Ka01NDQG6SP6j8JBnbZwsHTtxyqs=
github.com/GuanceCloud/confd v0.1.101 h1:yjHgfl6YzAlTbFOFMTE4ERpFJzIyovOW7ZFc2/ZssL0=
github.com/GuanceCloud/confd v0.1.101/go.mod h1:o0opIwOX+yNwV9nh56x5ymFMJ+YBD8JuPxBJ7a1mEmo=
github.com/GuanceCloud/dockertest/v3 v3.9.4 h1:ScSNhfA2HSNLfrYoNd1KSRxkrymlKiBE60g4f6eUoOk=
Expand Down
20 changes: 8 additions & 12 deletions internal/cmds/pldbg.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,28 +215,24 @@ func plScriptTmpStore(category point.Category) (*manager.ScriptStore, map[string
errs := map[string]map[string]error{}

{ // default
ns := manager.DefaultScriptNS
ns := manager.NSDefault
plPath := filepath.Join(datakit.InstallDir, "pipeline")
scripts, scriptsPath := manager.ReadPlScriptFromPlStructPath(plPath)
errs[ns] = store.UpdateScriptsWithNS(ns, scripts[category], scriptsPath[category], nil)
scripts, _ := manager.ReadWorkspaceScripts(plPath)
errs[ns] = store.UpdateScriptsWithNS(ns, scripts[category], nil)
}
{ // gitrepo
ns := manager.GitRepoScriptNS
ns := manager.NSGitRepo
plPath := filepath.Join(datakit.GitReposRepoFullPath, "pipeline")
scripts, scriptsPath := manager.ReadPlScriptFromPlStructPath(plPath)
errs[ns] = store.UpdateScriptsWithNS(ns, scripts[category], scriptsPath[category], nil)
scripts, _ := manager.ReadWorkspaceScripts(plPath)
errs[ns] = store.UpdateScriptsWithNS(ns, scripts[category], nil)
}
{ // remote
ns := manager.RemoteScriptNS
ns := manager.NSRemote
plPath := filepath.Join(datakit.PipelineRemoteDir, plremote.GetConentFileName())
if tarMap, err := targzutil.ReadTarToMap(plPath); err == nil {
allCategory := plremote.ConvertContentMapToThreeMap(tarMap)
scripts := allCategory[category.String()]
scriptsPath := map[string]string{}
for k := range scripts {
scriptsPath[k] = filepath.Join(plPath, category.String(), k)
}
errs[ns] = store.UpdateScriptsWithNS(ns, scripts, scriptsPath, nil)
errs[ns] = store.UpdateScriptsWithNS(ns, scripts, nil)
}
}

Expand Down
6 changes: 3 additions & 3 deletions internal/confd/confd.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,9 +497,9 @@ func getPipelineData() {
// Update pipeline script.
l.Debug("before set pipelines from confd ")

if managerwkr, ok := plval.GetManager(); ok && managerwkr != nil {
manager.LoadScripts2StoreFromPlStructPath(
managerwkr, manager.ConfdScriptNS, datakit.ConfdPipelineDir, nil)
if m, ok := plval.GetManager(); ok && m != nil {
m.LoadScriptsFromWorkspace(
manager.NSConfd, datakit.ConfdPipelineDir, nil)
}
}

Expand Down
4 changes: 2 additions & 2 deletions internal/config/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func GetPipelinePath(category point.Category, pipeLineName string) (string, erro
}

{
files := plmanager.SearchPlFilePathFromPlStructPath(datakit.GitReposRepoFullPath)
files := plmanager.SearchWorkspaceScripts(datakit.GitReposRepoFullPath)
if f, ok := files[category]; ok {
if plPath, ok := f[pipeLineName]; ok {
if _, err := os.Stat(plPath); err != nil {
Expand All @@ -246,7 +246,7 @@ func GetPipelinePath(category point.Category, pipeLineName string) (string, erro
}
}

files := plmanager.SearchPlFilePathFromPlStructPath(datakit.PipelineDir)
files := plmanager.SearchWorkspaceScripts(datakit.PipelineDir)
if f, ok := files[category]; ok {
if plPath, ok := f[pipeLineName]; ok {
if _, err := os.Stat(plPath); err != nil {
Expand Down
8 changes: 4 additions & 4 deletions internal/httpapi/api_dca.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,9 +211,8 @@ func ReloadDataKit(ctx context.Context) error {

case 3:
l.Info("before set pipelines")
if managerwkr, ok := plval.GetManager(); ok && managerwkr != nil {
manager.LoadScripts2StoreFromPlStructPath(managerwkr,
manager.GitRepoScriptNS,
if m, ok := plval.GetManager(); ok && m != nil {
m.LoadScriptsFromWorkspace(manager.NSGitRepo,
filepath.Join(datakit.GitReposRepoFullPath, "pipeline"), nil)
}

Expand Down Expand Up @@ -895,7 +894,7 @@ func dcaTestPipelines(c *gin.Context) {

category := point.CatString(body.Category)

pls, errs := pipeline.NewPipelineMulti(category, body.Pipeline[body.Category], nil, nil)
pls, errs := pipeline.NewPipelineMulti(category, body.Pipeline[body.Category], nil)
if err, ok := errs[body.ScriptName]; ok && err != nil {
context.fail(dcaError{ErrorCode: "400", ErrorMsg: fmt.Sprintf("pipeline parse error: %s", err.Error())})
return
Expand Down Expand Up @@ -935,6 +934,7 @@ func dcaTestPipelines(c *gin.Context) {
point.RUM,
point.Security,
point.Tracing,
point.DialTesting,
point.UnknownCategory:

arr, err := dec.Decode([]byte(data))
Expand Down
2 changes: 1 addition & 1 deletion internal/httpapi/api_pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ func apiPipelineDebugHandler(w http.ResponseWriter, req *http.Request, whatever
func parsePipeline(category point.Category, scriptName string,
scripts map[string]string, buks *plmap.AggBuckets,
) (*manager.PlScript, error) {
success, faild := pipeline.NewPipelineMulti(category, scripts, nil, buks)
success, faild := pipeline.NewPipelineMulti(category, scripts, buks)
if err, ok := faild[scriptName]; ok && err != nil {
return nil, err
}
Expand Down
1 change: 1 addition & 0 deletions internal/io/dataway/ptgroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func (ptg *ptGrouper) setExtKVs() {
switch ptg.cat {
case
point.Logging,
point.DialTesting,
point.Network,
point.KeyEvent,
point.RUM:
Expand Down
5 changes: 2 additions & 3 deletions internal/io/feed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,10 @@ func TestRunpl(t *T.T) {
if !ok {
t.Error("!ok")
}
manager.LoadScripts(m, manager.RemoteScriptNS,
m.LoadScripts(manager.NSRemote,
map[point.Category]map[string]string{
point.Logging: {"a.p": "add_key('a', 1)"},
},
nil, nil)
}, nil)
if _, ok := m.QueryScript(point.Logging, "a.p"); !ok {
t.Error("!ok")
}
Expand Down
1 change: 1 addition & 0 deletions internal/io/filter/inputdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func (d *KVs) Setup(category point.Category, pt *point.Point) {
switch category {
case
point.Logging,
point.DialTesting,
point.Network,
point.KeyEvent,
point.RUM:
Expand Down
28 changes: 16 additions & 12 deletions internal/io/pipeline_pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@ package io
import (
"encoding/base64"
"fmt"

"github.com/GuanceCloud/cliutils/point"
)

// PullPipeline returns name/text, updateTime, err.
func PullPipeline(ts, relaTS int64) (mFiles, plRelation map[string]map[string]string,
defaultPl map[string]string, updateTime int64, relationTS int64, err error,
func PullPipeline(ts, relaTS int64) (mFiles, plRelation map[point.Category]map[string]string,
defaultPl map[point.Category]string, updateTime int64, relationTS int64, err error,
) {
defer func() {
if err := recover(); err != nil {
Expand All @@ -35,34 +37,36 @@ func PullPipeline(ts, relaTS int64) (mFiles, plRelation map[string]map[string]st
}

func parsePipelinePullStruct(pulledStruct *pullPipelineReturn) (
map[string]map[string]string, map[string]map[string]string,
map[string]string, int64, int64, error,
map[point.Category]map[string]string, map[point.Category]map[string]string,
map[point.Category]string, int64, int64, error,
) {
mFiles := make(map[string]map[string]string)
defaultPl := make(map[string]string)
mFiles := make(map[point.Category]map[string]string)
defaultPl := make(map[point.Category]string)
for _, v := range pulledStruct.Pipelines {
bys, err := base64.StdEncoding.DecodeString(v.Base64Text)
if err != nil {
return nil, nil, nil, 0, 0, err
}

cat := point.CatString(v.Category)
if v.AsDefault {
defaultPl[v.Category] = v.Name
defaultPl[cat] = v.Name
}

if val, ok := mFiles[v.Category]; ok {
if val, ok := mFiles[cat]; ok {
val[v.Name] = string(bys)
} else {
mf := make(map[string]string)
mf[v.Name] = string(bys)
mFiles[v.Category] = mf
mFiles[cat] = mf
}
}

plRelation := make(map[string]map[string]string)
plRelation := make(map[point.Category]map[string]string)
for _, v := range pulledStruct.Relation {
if m, ok := plRelation[v.Category]; !ok {
plRelation[v.Category] = map[string]string{
cat := point.CatString(v.Category)
if m, ok := plRelation[cat]; !ok {
plRelation[cat] = map[string]string{
v.Source: v.Name,
}
} else {
Expand Down
Loading

0 comments on commit 4f7f331

Please sign in to comment.