Skip to content

Commit

Permalink
Merge branch '2414-iss-wal-queue' into 'dev'
Browse files Browse the repository at this point in the history
chore: rename consumer with compactor

See merge request cloudcare-tools/datakit!3227
  • Loading branch information
谭彪 committed Oct 18, 2024
2 parents 4b5bcf3 + c56adf6 commit 9d5ced6
Show file tree
Hide file tree
Showing 101 changed files with 4,927 additions and 2,177 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,7 @@ code_stat:
# promlinter: show prometheuse metrics defined in Datakit.
# go install github.com/yeya24/promlinter/cmd/promlinter@latest
show_metrics:
@promlinter list . --add-help -o md --with-vendor
@promlinter list . --add-help -o md --with-vendor --add-module

clean:
@rm -rf build/*
Expand Down
25 changes: 5 additions & 20 deletions cmd/awslambda/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func loadLambdaDefaultConf() {
config.Cfg.HTTPAPI.Listen = "0.0.0.0:9529"
config.Cfg.DefaultEnabledInputs = []string{"awslambda", "ddtrace", "opentelemetry", "statsd"}
config.Cfg.Dataway.MaxRawBodySize = dataway.MinimalRawBodySize
config.Cfg.IO.FlushWorkers = 1
config.Cfg.IO.CompactWorkers = 1
}

func run() {
Expand Down Expand Up @@ -129,27 +129,12 @@ func startIO() {
opts := []dkio.IOOption{
dkio.WithFeederOutputer(dkio.NewAwsLambdaOutput()),
dkio.WithDataway(config.Cfg.Dataway),
dkio.WithMaxCacheCount(c.MaxCacheCount),
dkio.WithDiskCache(c.EnableCache),
dkio.WithDiskCacheSize(c.CacheSizeGB),
dkio.WithCompactAt(c.MaxCacheCount),
dkio.WithFilters(c.Filters),
dkio.WithCacheAll(c.CacheAll),
dkio.WithFlushWorkers(c.FlushWorkers),
dkio.WithCompactWorkers(c.CompactWorkers),
dkio.WithRecorder(config.Cfg.Recorder),
dkio.WithConsumer(false),
}

du, err := time.ParseDuration(c.FlushInterval)
if err != nil {
} else {
opts = append(opts, dkio.WithFlushInterval(du))
}

du, err = time.ParseDuration(c.CacheCleanInterval)
if err != nil {
l.Warnf("parse CacheCleanInterval failed: %s, use default 5s", err)
} else {
opts = append(opts, dkio.WithDiskCacheCleanInterval(du))
dkio.WithCompactInterval(c.CompactInterval),
dkio.WithCompactor(false),
}

dkio.Start(opts...)
Expand Down
34 changes: 18 additions & 16 deletions cmd/datakit/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/gitrepo"
"gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/httpapi"
dkio "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/io"
"gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/io/dataway"
"gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/metrics"
"gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/ntp"
plRemote "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/pipeline/remote"
Expand Down Expand Up @@ -219,30 +220,29 @@ func startIO() {
opts := []dkio.IOOption{
dkio.WithFeederOutputer(dkio.NewDatawayOutput(c.FeedChanSize)),
dkio.WithDataway(config.Cfg.Dataway),
dkio.WithMaxCacheCount(c.MaxCacheCount),
dkio.WithDiskCache(c.EnableCache),
dkio.WithDiskCacheSize(c.CacheSizeGB),
dkio.WithCompactAt(c.MaxCacheCount),
dkio.WithFilters(c.Filters),
dkio.WithCacheAll(c.CacheAll),
dkio.WithFlushWorkers(c.FlushWorkers),
dkio.WithCompactWorkers(c.CompactWorkers),
dkio.WithRecorder(config.Cfg.Recorder),
dkio.WithAvailableCPUs(datakit.AvailableCPUs),
}

du, err := time.ParseDuration(c.FlushInterval)
if err != nil {
} else {
opts = append(opts, dkio.WithFlushInterval(du))
}
dkio.Start(opts...)
}

du, err = time.ParseDuration(c.CacheCleanInterval)
if err != nil {
l.Warnf("parse CacheCleanInterval failed: %s, use default 5s", err)
} else {
opts = append(opts, dkio.WithDiskCacheCleanInterval(du))
func startDatawayWorkers() {
dw := config.Cfg.Dataway

// setup extra options on @dw
if dw.WAL.Workers == 0 {
n := datakit.AvailableCPUs * 2
l.Infof("set %d flush WAL workers", n)
dataway.WithWALWorkers(n)(dw)
}

dkio.Start(opts...)
if err := dw.StartFlushWorkers(); err != nil {
l.Errorf("StartFlushWorkers failed: %s", err)
}
}

func gc(du time.Duration) {
Expand Down Expand Up @@ -280,10 +280,12 @@ func doRun() error {
}

cpuLimit := getCurrentCPULimits()
l.Infof("get limited cpu cores: %f", cpuLimit)
if cpuLimit > 1.0 {
datakit.AvailableCPUs = int(cpuLimit)
} // else datakit.AvailableCPUs default to 1

startDatawayWorkers()
startIO()

// start NTP syncer on dataway.
Expand Down
8 changes: 4 additions & 4 deletions cmd/installer/installer/dkconf.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ var (
EnableInputs,
CloudProvider,
Proxy,
Dataway string
DatawayURLs string

HTTPPublicAPIs string

Expand Down Expand Up @@ -313,15 +313,15 @@ func preEnableHostobjectInput(cloud string) []byte {
func getDataway() (*dataway.Dataway, error) {
dw := dataway.NewDefaultDataway()

if Dataway != "" {
dw.URLs = strings.Split(Dataway, ",")
if DatawayURLs != "" {
urls := strings.Split(DatawayURLs, ",")

if Proxy != "" {
l.Debugf("set proxy to %s", Proxy)
dw.HTTPProxy = Proxy
}

if err := dw.Init(); err != nil {
if err := dw.Init(dataway.WithURLs(urls...)); err != nil {
return nil, err
} else {
tokens := dw.GetTokens()
Expand Down
4 changes: 2 additions & 2 deletions cmd/installer/installer/install.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,14 @@ func Install(svc service.Service, userName string) {
mc.DatakitUser = userName

// prepare dataway info and check token format
if len(Dataway) != 0 {
if len(DatawayURLs) != 0 {
mc.Dataway, err = getDataway()
if err != nil {
l.Errorf("getDataway failed: %s", err.Error())
l.Fatal(err)
}

l.Infof("Set dataway to %s", Dataway)
l.Infof("Set dataway to %s", DatawayURLs)

mc.Dataway.GlobalCustomerKeys = dataway.ParseGlobalCustomerKeys(SinkerGlobalCustomerKeys)
mc.Dataway.EnableSinker = (EnableSinker != "")
Expand Down
14 changes: 11 additions & 3 deletions cmd/installer/installer/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/GuanceCloud/cliutils/logger"
"gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/config"
"gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/datakit"
"gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/io/dataway"
)

var l = logger.DefaultSLogger("upgrade")
Expand Down Expand Up @@ -71,6 +72,13 @@ func upgradeMainConfig(c *config.Config) *config.Config {

c.Dataway.DeprecatedHTTPTimeout = "" // always remove the config
}

if c.Dataway.MaxRawBodySize >= dataway.DeprecatedDefaultMaxRawBodySize {
l.Infof("to save memory, set max-raw-body-size from %d to %d",
c.Dataway.MaxRawBodySize, dataway.DefaultMaxRawBodySize)

c.Dataway.MaxRawBodySize = dataway.DefaultMaxRawBodySize
}
}

l.Infof("Set log to %s", c.Logging.Log)
Expand Down Expand Up @@ -126,9 +134,9 @@ func upgradeMainConfig(c *config.Config) *config.Config {
c.IO.MaxCacheCount = 1000
}

if c.IntervalDeprecated != "" {
c.IO.FlushInterval = c.IntervalDeprecated
c.IntervalDeprecated = ""
if c.IntervalDeprecated != time.Duration(0) {
c.IO.CompactInterval = c.IntervalDeprecated
c.IntervalDeprecated = time.Duration(0)
}

if c.IO.FeedChanSize > 1 {
Expand Down
20 changes: 18 additions & 2 deletions cmd/installer/installer/upgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/stretchr/testify/assert"
"gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/config"
"gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/election"
"gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/io/dataway"
)

func Test_setupDefaultInputs(t *T.T) {
Expand Down Expand Up @@ -252,15 +253,15 @@ func TestUpgradeMainConfig(t *T.T) {
old: func() *config.Config {
c := config.DefaultConfig()
c.IOCacheCountDeprecated = 10
c.IntervalDeprecated = "100s"
c.IntervalDeprecated = 100 * time.Second

return c
}(),

expect: func() *config.Config {
c := config.DefaultConfig()
c.IO.MaxCacheCount = 1000 // auto reset to 10000
c.IO.FlushInterval = "100s"
c.IO.CompactInterval = 100 * time.Second

return c
}(),
Expand All @@ -280,6 +281,21 @@ func TestUpgradeMainConfig(t *T.T) {
return c
}(),
},

{
name: "set-default-raw-body-size",
old: func() *config.Config {
c := config.DefaultConfig()
c.Dataway.MaxRawBodySize = dataway.DeprecatedDefaultMaxRawBodySize

return c
}(),

expect: func() *config.Config {
c := config.DefaultConfig()
return c
}(),
},
}

for _, tc := range cases {
Expand Down
8 changes: 4 additions & 4 deletions cmd/installer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func init() {
// flag.StringVar(&flagAPMInstrumentationLibraries, "apm-instrumentation-libraries", "datadog|java,python",
// "install and use the APM library of the specified provider")

flag.StringVar(&installer.Dataway, "dataway", "", "DataWay host(https://guance.openway.com?token=xxx)")
flag.StringVar(&installer.DatawayURLs, "dataway", "", "DataWay host(https://guance.openway.com?token=xxx)")
flag.StringVar(&installer.Proxy, "proxy", "", "http proxy http://ip:port for datakit")
flag.StringVar(&installer.DatakitName, "name", "", "specify DataKit name, example: prod-env-datakit")
flag.StringVar(&installer.EnableInputs, "enable-inputs", "", "default enable inputs(comma splited, example:cpu,mem,disk)")
Expand Down Expand Up @@ -542,12 +542,12 @@ __downloadOK:
setupUserGroup(userName, userName)

if flagInstallOnly != 0 {
l.Warnf("Only install service %q, NOT started", dkservice.Name)
l.Warnf("Only install service %q, NOT started", dkservice.Name())
} else {
if err = service.Control(svc, "start"); err != nil {
l.Warnf("Start service %q failed: %s", dkservice.Name, err.Error())
l.Warnf("Start service %q failed: %s", dkservice.Name(), err.Error())
} else {
l.Infof("Starting service %q ok", dkservice.Name)
l.Infof("Starting service %q ok", dkservice.Name())
}
}

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ require (

require (
github.com/DataDog/ebpf-manager v0.2.16
github.com/GuanceCloud/cliutils v1.1.22-0.20240930074036-255c78c086fd
github.com/GuanceCloud/cliutils v1.1.22-0.20241018104846-17e816f0e123
github.com/GuanceCloud/kubernetes v0.0.0-20230801080916-ca299820872b
github.com/GuanceCloud/zipstream v0.1.0 // indirect
github.com/andrewkroh/sys v0.0.0-20151128191922-287798fe3e43
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ github.com/GuanceCloud/client_model v0.0.0-20230418154757-93bd4e878a5e h1:i34dA4
github.com/GuanceCloud/client_model v0.0.0-20230418154757-93bd4e878a5e/go.mod h1:PMnE48aPzuRu83FmWZugC0O3d54ZupJd/MmiaYxz8sM=
github.com/GuanceCloud/cliutils v1.1.22-0.20240930074036-255c78c086fd h1:KxbB1a1NybivPLnI+xVcR0WPPXlI1+jCyCmPMJ5LnpE=
github.com/GuanceCloud/cliutils v1.1.22-0.20240930074036-255c78c086fd/go.mod h1:5bIAZ9yA6l7W8MMUKw0+SIZJRpmEwxM6ZYLy4vweTgU=
github.com/GuanceCloud/cliutils v1.1.22-0.20241018104846-17e816f0e123 h1:CigTx24h5Lc/49Zghr8d70jrMeoEJ9tJXUE/79uzbQ0=
github.com/GuanceCloud/cliutils v1.1.22-0.20241018104846-17e816f0e123/go.mod h1:5bIAZ9yA6l7W8MMUKw0+SIZJRpmEwxM6ZYLy4vweTgU=
github.com/GuanceCloud/confd v0.1.101 h1:yjHgfl6YzAlTbFOFMTE4ERpFJzIyovOW7ZFc2/ZssL0=
github.com/GuanceCloud/confd v0.1.101/go.mod h1:o0opIwOX+yNwV9nh56x5ymFMJ+YBD8JuPxBJ7a1mEmo=
github.com/GuanceCloud/dockertest/v3 v3.9.4 h1:ScSNhfA2HSNLfrYoNd1KSRxkrymlKiBE60g4f6eUoOk=
Expand Down
6 changes: 6 additions & 0 deletions internal/cmds/debug_bugreport.go
Original file line number Diff line number Diff line change
Expand Up @@ -658,6 +658,12 @@ func (info *datakitInfo) compressDir() (string, error) {
date := time.Now().UnixMilli()
fileName := fmt.Sprintf("info-%d", date)
zipPath := fmt.Sprintf("%s.zip", fileName)

if *flagDebugBugreportTag != "" {
fileName = fmt.Sprintf("%s-info-%d", *flagDebugBugreportTag, date)
zipPath = fmt.Sprintf("%s.zip", fileName)
}

// Open a file to write the compressed data to
zipFile, err := os.Create(filepath.Clean(zipPath))
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion internal/cmds/debug_input.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func debugInput(conf string) error {
dkio.Start(dkio.WithFeederOutputer(dkio.NewDebugOutput()),
// disable filter and consumer, the debug output not implemented the Reader()
dkio.WithFilter(false),
dkio.WithConsumer(false))
dkio.WithCompactor(false))

loadedInputs, err := config.LoadSingleConfFile(conf, inputs.Inputs, false)
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion internal/cmds/debug_upload_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ type successRes struct {
}

func uploadLog(urls []string) error {
dw := dataway.Dataway{URLs: urls}
dw := dataway.NewDefaultDataway()
dw.URLs = urls

if config.Cfg.Dataway != nil {
if len(config.Cfg.Dataway.HTTPProxy) > 0 {
Expand Down
6 changes: 5 additions & 1 deletion internal/cmds/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,11 @@ func setupUploader() (uploader, error) {
}

u := &uploaderImpl{
dw: &dataway.Dataway{URLs: dwURLS},
dw: func() *dataway.Dataway {
x := dataway.NewDefaultDataway()
x.URLs = dwURLS
return x
}(),
}

if err := u.dw.Init(); err != nil {
Expand Down
19 changes: 10 additions & 9 deletions internal/cmds/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,16 @@ import (
)

var moduleMap = map[string]string{
"G": "goroutine",
"B": "basic",
"R": "runtime",
"F": "filter",
"H": "http",
"In": "inputs",
"P": "pipeline",
"IO": "io_stats",
"W": "dataway",
"G": "goroutine",
"B": "basic",
"R": "runtime",
"F": "filter",
"H": "http",
"In": "inputs",
"P": "pipeline",
"IO": "io_stats",
"W": "dataway",
"WAL": "wal",
}

// loadLocalDatakitConf try to find where local datakit listen.
Expand Down
1 change: 1 addition & 0 deletions internal/cmds/parse_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ var (
flagDebugBugreportOSS = fsDebug.String("oss", "", "upload bug report file to specified object storage(format host:bucket:ak:sk)")
flagDebugBugreportDisableProfile = fsDebug.Bool("disable-profile", false, "disable profile collection when running bug-report")
flagDebugBugreportNMetrics = fsDebug.Int("nmetrics", 3, "collect N batch of datakit metrics")
flagDebugBugreportTag = fsDebug.String("tag", "", "ping a tag to current bug report")

flagDebugInputConf = fsDebug.String("input-conf", "", "input TOML conf path")
flagDebugHTTPListen = fsDebug.String("http-listen", "", "setup HTTP server on debugging some inputs(such as some Trace/RUM/...)")
Expand Down
Loading

0 comments on commit 9d5ced6

Please sign in to comment.