From bdb34e15dc8fcd46fd50dfa8f616598de40ec18b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B0=AD=E5=BD=AA?= Date: Thu, 19 Sep 2024 09:25:17 +0800 Subject: [PATCH] feat: add NTP sync on dataway --- cmd/datakit/main.go | 8 ++ internal/config/env.go | 18 ++++ internal/config/env_test.go | 131 +++++++++++++----------- internal/datakit/datakit.go | 1 + internal/datakit/dkconf.go | 9 ++ internal/export/non_input_docs.go | 15 +++ internal/io/dataway/dw.go | 12 +++ internal/io/dataway/dwapis.go | 67 ++++++++++++ internal/io/dataway/dwapis_test.go | 34 ++++++ internal/io/dataway/endpoint.go | 4 +- internal/ntp/metrics.go | 44 ++++++++ internal/ntp/ntp.go | 74 +++++++++++++ internal/ntp/ntp_test.go | 54 ++++++++++ internal/plugins/inputs/chrony/input.go | 3 +- 14 files changed, 410 insertions(+), 64 deletions(-) create mode 100644 internal/ntp/metrics.go create mode 100644 internal/ntp/ntp.go create mode 100644 internal/ntp/ntp_test.go diff --git a/cmd/datakit/main.go b/cmd/datakit/main.go index a0ab435f47..044bfe9605 100644 --- a/cmd/datakit/main.go +++ b/cmd/datakit/main.go @@ -30,6 +30,7 @@ import ( "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/httpapi" dkio "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/io" "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/metrics" + "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/ntp" plRemote "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/pipeline/remote" "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/plugins/inputs" _ "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/plugins/inputs/all" @@ -285,6 +286,13 @@ func doRun() error { startIO() + // start NTP syncer on dataway. + if n := config.Cfg.Dataway.NTP; n != nil { + ntp.StartNTP(config.Cfg.Dataway, + n.Interval, + uint64(n.SyncOnDiff/time.Second)) + } + checkutil.CheckConditionExit(func() bool { if err := dnswatcher.StartWatch(); err != nil { return false diff --git a/internal/config/env.go b/internal/config/env.go index 92d4ebe2ca..3ae977c738 100644 --- a/internal/config/env.go +++ b/internal/config/env.go @@ -384,6 +384,24 @@ func (c *Config) loadDatawayEnvs() { c.Dataway.GlobalCustomerKeys = dataway.ParseGlobalCustomerKeys(v) l.Infof("set global custom keys to %v", c.Dataway.GlobalCustomerKeys) } + + if c.Dataway.NTP != nil { + if v := datakit.GetEnv("ENV_DATAWAY_NTP_INTERVAL"); v != "" { + if du, err := time.ParseDuration(v); err == nil { + c.Dataway.NTP.Interval = du + } else { + l.Warnf("invalid ENV_DATAWAY_NTP_INTERVAL: %q: %s, ignored", v, err.Error()) + } + } + + if v := datakit.GetEnv("ENV_DATAWAY_NTP_DIFF"); v != "" { + if du, err := time.ParseDuration(v); err == nil { + c.Dataway.NTP.SyncOnDiff = du + } else { + l.Warnf("invalid ENV_DATAWAY_NTP_DIFF: %q: %s, ignored", v, err.Error()) + } + } + } } func (c *Config) loadElectionEnvs() { diff --git a/internal/config/env_test.go b/internal/config/env_test.go index e2ae8d5b90..02cf046812 100644 --- a/internal/config/env_test.go +++ b/internal/config/env_test.go @@ -23,6 +23,21 @@ func TestLoadEnv(t *testing.T) { envs map[string]string expect *Config }{ + { + name: "test-dataway-ntp", + envs: map[string]string{ + "ENV_DATAWAY_NTP_INTERVAL": "5m", + "ENV_DATAWAY_NTP_DIFF": "30s", + }, + + expect: func() *Config { + cfg := DefaultConfig() + cfg.Dataway.NTP.Interval = 5 * time.Minute + cfg.Dataway.NTP.SyncOnDiff = 30 * time.Second + + return cfg + }(), + }, { name: "test-recorder-envs", envs: map[string]string{ @@ -115,21 +130,21 @@ func TestLoadEnv(t *testing.T) { cfg := DefaultConfig() cfg.Name = "testing-datakit" - cfg.Dataway = &dataway.Dataway{ - URLs: []string{"http://host1.org", "http://host2.com"}, - MaxIdleConnsPerHost: 123, - HTTPProxy: "http://1.2.3.4:1234", - EnableHTTPTrace: true, - IdleTimeout: 90 * time.Second, - HTTPTimeout: 30 * time.Second, - ContentEncoding: "v2", - MaxRetryCount: dataway.DefaultRetryCount, - InsecureSkipVerify: true, - RetryDelay: dataway.DefaultRetryDelay, - MaxRawBodySize: dataway.DefaultMaxRawBodySize, - GlobalCustomerKeys: []string{}, - GZip: true, - } + + cfg.Dataway.URLs = []string{"http://host1.org", "http://host2.com"} + + cfg.Dataway.MaxIdleConnsPerHost = 123 + cfg.Dataway.HTTPProxy = "http://1.2.3.4:1234" + cfg.Dataway.EnableHTTPTrace = true + cfg.Dataway.IdleTimeout = 90 * time.Second + cfg.Dataway.HTTPTimeout = 30 * time.Second + cfg.Dataway.ContentEncoding = "v2" + cfg.Dataway.MaxRetryCount = dataway.DefaultRetryCount + cfg.Dataway.InsecureSkipVerify = true + cfg.Dataway.RetryDelay = dataway.DefaultRetryDelay + cfg.Dataway.MaxRawBodySize = dataway.DefaultMaxRawBodySize + cfg.Dataway.GlobalCustomerKeys = []string{} + cfg.Dataway.GZip = true cfg.HTTPAPI.RUMOriginIPHeader = "not-set" cfg.HTTPAPI.Listen = "localhost:9559" @@ -352,21 +367,19 @@ func TestLoadEnv(t *testing.T) { cfg.ProtectMode = false - cfg.Dataway = &dataway.Dataway{ - URLs: []string{"http://host1.org", "http://host2.com"}, - MaxIdleConnsPerHost: 0, - MaxIdleConns: 100, - EnableHTTPTrace: true, - IdleTimeout: 100 * time.Second, - HTTPTimeout: time.Minute, - GlobalCustomerKeys: []string{"key1", "key2"}, - MaxRetryCount: 8, - RetryDelay: time.Second * 5, - MaxRawBodySize: 1024 * 32, - ContentEncoding: "v2", - EnableSinker: true, - GZip: true, - } + cfg.Dataway.URLs = []string{"http://host1.org", "http://host2.com"} + cfg.Dataway.MaxIdleConnsPerHost = 0 + cfg.Dataway.MaxIdleConns = 100 + cfg.Dataway.EnableHTTPTrace = true + cfg.Dataway.IdleTimeout = 100 * time.Second + cfg.Dataway.HTTPTimeout = time.Minute + cfg.Dataway.GlobalCustomerKeys = []string{"key1", "key2"} + cfg.Dataway.MaxRetryCount = 8 + cfg.Dataway.RetryDelay = time.Second * 5 + cfg.Dataway.MaxRawBodySize = 1024 * 32 + cfg.Dataway.ContentEncoding = "v2" + cfg.Dataway.EnableSinker = true + cfg.Dataway.GZip = true return cfg }(), @@ -394,21 +407,19 @@ func TestLoadEnv(t *testing.T) { expect: func() *Config { cfg := DefaultConfig() - cfg.Dataway = &dataway.Dataway{ - URLs: []string{"http://host1.org", "http://host2.com"}, - MaxIdleConnsPerHost: 0, - MaxIdleConns: 100, - EnableHTTPTrace: true, - IdleTimeout: 100 * time.Second, - HTTPTimeout: time.Minute, - GlobalCustomerKeys: []string{"key1", "key2"}, - MaxRetryCount: 8, - RetryDelay: time.Second * 5, - MaxRawBodySize: dataway.MinimalRawBodySize, - ContentEncoding: "v2", - EnableSinker: true, - GZip: true, - } + cfg.Dataway.URLs = []string{"http://host1.org", "http://host2.com"} + cfg.Dataway.MaxIdleConnsPerHost = 0 + cfg.Dataway.MaxIdleConns = 100 + cfg.Dataway.EnableHTTPTrace = true + cfg.Dataway.IdleTimeout = 100 * time.Second + cfg.Dataway.HTTPTimeout = time.Minute + cfg.Dataway.GlobalCustomerKeys = []string{"key1", "key2"} + cfg.Dataway.MaxRetryCount = 8 + cfg.Dataway.RetryDelay = time.Second * 5 + cfg.Dataway.MaxRawBodySize = dataway.MinimalRawBodySize + cfg.Dataway.ContentEncoding = "v2" + cfg.Dataway.EnableSinker = true + cfg.Dataway.GZip = true return cfg }(), @@ -435,22 +446,20 @@ func TestLoadEnv(t *testing.T) { expect: func() *Config { cfg := DefaultConfig() - cfg.Dataway = &dataway.Dataway{ - URLs: []string{"http://host1.org", "http://host2.com"}, - MaxIdleConnsPerHost: 0, - MaxIdleConns: 100, - EnableHTTPTrace: true, - IdleTimeout: 100 * time.Second, - HTTPTimeout: time.Minute, - GlobalCustomerKeys: []string{"key1", "key2"}, - MaxRetryCount: 8, - RetryDelay: time.Second * 5, - MaxRawBodySize: 1024 * 1024 * 32, - ContentEncoding: "v2", - EnableSinker: true, - GZip: true, - InsecureSkipVerify: true, - } + cfg.Dataway.URLs = []string{"http://host1.org", "http://host2.com"} + cfg.Dataway.MaxIdleConnsPerHost = 0 + cfg.Dataway.MaxIdleConns = 100 + cfg.Dataway.EnableHTTPTrace = true + cfg.Dataway.IdleTimeout = 100 * time.Second + cfg.Dataway.HTTPTimeout = time.Minute + cfg.Dataway.GlobalCustomerKeys = []string{"key1", "key2"} + cfg.Dataway.MaxRetryCount = 8 + cfg.Dataway.RetryDelay = time.Second * 5 + cfg.Dataway.MaxRawBodySize = 1024 * 1024 * 32 + cfg.Dataway.ContentEncoding = "v2" + cfg.Dataway.EnableSinker = true + cfg.Dataway.GZip = true + cfg.Dataway.InsecureSkipVerify = true return cfg }(), diff --git a/internal/datakit/datakit.go b/internal/datakit/datakit.go index 4b9793a28b..fe4f7b9cdd 100644 --- a/internal/datakit/datakit.go +++ b/internal/datakit/datakit.go @@ -87,6 +87,7 @@ const ( ListDataWay = "/v2/list/dataway" TokenCheck = "/v1/check/token" UsageTrace = "/v1/datakit/usage_trace" + NTPSync = "/v1/ntp" StrGitRepos = "gitrepos" StrPipelineRemote = "pipeline_remote" diff --git a/internal/datakit/dkconf.go b/internal/datakit/dkconf.go index 00f4ddf92a..c372afd853 100644 --- a/internal/datakit/dkconf.go +++ b/internal/datakit/dkconf.go @@ -249,6 +249,15 @@ ulimit = 64000 global_customer_keys = [] enable_sinker = false # disable sinker + # use dataway as NTP server + [dataway.ntp] + interval = "5m" # sync dataway time each 5min + + # if datakit local time and dataway time's ABS value reach the diff, + # datakit's soft time will update to the dataway time. + # NOTE: diff MUST larger than "1s" + diff = "30s" + ################################################ # Datakit logging configure ################################################ diff --git a/internal/export/non_input_docs.go b/internal/export/non_input_docs.go index 207f4bfde4..c5eec5ef23 100644 --- a/internal/export/non_input_docs.go +++ b/internal/export/non_input_docs.go @@ -144,6 +144,21 @@ func envDataway() []*inputs.ENVInfo { Desc: "Enable self-signed TLS certificate on Dataway [:octicons-tag-24: Version-1.29.0](changelog.md#cl-1.29.0)", DescZh: "允许对应的 Dataway 上的证书是自签证书 [:octicons-tag-24: Version-1.29.0](changelog.md#cl-1.29.0)", }, + + // NTP + { + ENVName: "ENV_DATAWAY_NTP_INTERVAL", + Type: doc.String, + Desc: "Set NTP sync interval [:octicons-tag-24: Version-1.39.0](changelog.md#cl-1.39.0)", + DescZh: "设置 NTP 时间同步间隔 [:octicons-tag-24: Version-1.39.0](changelog.md#cl-1.39.0)", + }, + + { + ENVName: "ENV_DATAWAY_NTP_DIFF", + Type: doc.String, + Desc: "Set NTP sync difference [:octicons-tag-24: Version-1.39.0](changelog.md#cl-1.39.0)", + DescZh: "设置 NTP 时间同步的误差[:octicons-tag-24: Version-1.39.0](changelog.md#cl-1.39.0)", + }, } for idx := range infos { diff --git a/internal/io/dataway/dw.go b/internal/io/dataway/dw.go index 4f482eeefc..fb432da8f3 100644 --- a/internal/io/dataway/dw.go +++ b/internal/io/dataway/dw.go @@ -60,6 +60,7 @@ var ( datakit.ProfilingUpload, datakit.TokenCheck, datakit.UsageTrace, + datakit.NTPSync, } AvailableDataways = []string{} @@ -78,9 +79,18 @@ func NewDefaultDataway() *Dataway { GZip: true, MaxRetryCount: DefaultRetryCount, RetryDelay: DefaultRetryDelay, + NTP: &ntp{ + Interval: time.Minute * 5, + SyncOnDiff: time.Second * 30, + }, } } +type ntp struct { + Interval time.Duration `toml:"interval"` + SyncOnDiff time.Duration `toml:"diff"` +} + type Dataway struct { URLs []string `toml:"urls"` @@ -125,6 +135,8 @@ type Dataway struct { globalTags map[string]string globalTagsHTTPHeaderValue string + + NTP *ntp `toml:"ntp"` } type dwopt func(*Dataway) diff --git a/internal/io/dataway/dwapis.go b/internal/io/dataway/dwapis.go index 4c40a0603f..24cb180000 100644 --- a/internal/io/dataway/dwapis.go +++ b/internal/io/dataway/dwapis.go @@ -12,6 +12,7 @@ import ( "io" "net/http" "strings" + "time" "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/datakit" ) @@ -368,3 +369,69 @@ func (dw *Dataway) Pull(args string) ([]byte, error) { return dw.eps[0].datakitPull(args) } + +type ntpResp struct { + TimestampSec int64 `json:"timestamp_sec"` +} + +// TimeDiff implement ntp time sync interface. +func (dw *Dataway) TimeDiff() int64 { + if d, err := dw.doTimeDiff(); err != nil { + log.Errorf("doTimeDiff: %s", err.Error()) + return 0 + } else { + return d + } +} + +func (dw *Dataway) doTimeDiff() (int64, error) { + if len(dw.eps) == 0 { + return 0, fmt.Errorf("no dataway available") + } + + ep := dw.eps[0] + requrl, ok := ep.categoryURL[datakit.NTPSync] + if !ok { + return 0, fmt.Errorf("url %s not available", datakit.NTPSync) + } + + log.Debugf("NewRequest: %s", requrl) + req, err := http.NewRequest(http.MethodGet, requrl, nil) + if err != nil { + return 0, fmt.Errorf("http.NewRequest: %w", err) + } + + // Common HTTP headers appended, such as User-Agent, X-Global-Tags + for k, v := range ep.httpHeaders { + req.Header.Set(k, v) + } + + resp, err := ep.sendReq(req) + if err != nil { + return 0, fmt.Errorf("doSendReq: %w", err) + } + + respBody, err := io.ReadAll(resp.Body) + if err != nil { + return 0, fmt.Errorf("io.readAll: %w", err) + } + + defer resp.Body.Close() //nolint:errcheck + switch resp.StatusCode / 100 { + case 2: + log.Debugf("ntp ok") + + var nr ntpResp + + if err := json.Unmarshal(respBody, &nr); err != nil { + log.Errorf("Unmarshal: %s", string(respBody)) + + return 0, fmt.Errorf(`json.Unmarshal: %w`, err) + } + + return nr.TimestampSec - time.Now().Unix(), nil + + default: + return 0, fmt.Errorf("ntp failed(status: %d): %s", resp.StatusCode, string(respBody)) + } +} diff --git a/internal/io/dataway/dwapis_test.go b/internal/io/dataway/dwapis_test.go index 18d5a6aa5a..c707323824 100644 --- a/internal/io/dataway/dwapis_test.go +++ b/internal/io/dataway/dwapis_test.go @@ -7,15 +7,49 @@ package dataway import ( "bytes" + "encoding/json" "fmt" "io" "net/http" "net/http/httptest" T "testing" + "time" "github.com/stretchr/testify/assert" ) +func TestNTP(t *T.T) { + t.Run(`basic`, func(t *T.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + assert.NotEmpty(t, r.URL.Query().Get("token")) + + n := ntpResp{ + TimestampSec: time.Now().Unix(), + } + j, err := json.Marshal(n) + assert.NoError(t, err) + + w.WriteHeader(200) + w.Write(j) + })) + + dw := NewDefaultDataway() + dw.URLs[0] = fmt.Sprintf("%s?token=tkn_xxxxxxxx", ts.URL) + dw.NTP = &ntp{ + Interval: time.Second, + SyncOnDiff: time.Second, + } + + assert.NoError(t, dw.Init()) + + diff, err := dw.doTimeDiff() + + assert.NoErrorf(t, err, "dataway: %+#v", dw) + + assert.Equal(t, int64(0), diff) + }) +} + func TestDWAPIs(t *T.T) { t.Run("apis-with-global-tags", func(t *T.T) { var dw *Dataway diff --git a/internal/io/dataway/endpoint.go b/internal/io/dataway/endpoint.go index 90777879d3..c8878c7225 100644 --- a/internal/io/dataway/endpoint.go +++ b/internal/io/dataway/endpoint.go @@ -183,7 +183,7 @@ func newEndpoint(urlstr string, opts ...endPointOption) (*endPoint, error) { api) } - log.Infof("endpoint regist dataway API %q ok", api) + log.Infof("endpoint regist dataway API %q:%q ok", api, ep.categoryURL[api]) } switch ep.scheme { @@ -524,7 +524,7 @@ func (ep *endPoint) sendReq(req *http.Request) (resp *http.Response, err error) maxRetry = DefaultRetryCount } - log.Debugf("retry %q with delay %s on %d retrying", req.URL.String(), delay, maxRetry) + log.Debugf("retry %q with delay %s on %d retrying", req.URL.Path, delay, maxRetry) if err := retry.Do( func() error { diff --git a/internal/ntp/metrics.go b/internal/ntp/metrics.go new file mode 100644 index 0000000000..c30d690634 --- /dev/null +++ b/internal/ntp/metrics.go @@ -0,0 +1,44 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the MIT License. +// This product includes software developed at Guance Cloud (https://www.guance.com/). +// Copyright 2022-present Guance, Inc. + +package ntp + +import ( + "github.com/GuanceCloud/cliutils/metrics" + p8s "github.com/prometheus/client_golang/prometheus" +) + +var ( + ntpSyncCount = p8s.NewCounter( + p8s.CounterOpts{ + Namespace: "datakit", + Subsystem: "ntp", + Name: "sync_total", + Help: "Total count synced with remote NTP server", + }, + ) + + ntpSyncSummary = p8s.NewSummary( + p8s.SummaryOpts{ + Namespace: "datakit", + Subsystem: "ntp", + Name: "time_diff", + Help: "Time difference(seconds) between remote NTP server", + Objectives: map[float64]float64{ + 0.5: .05, + 0.9: .01, + 0.99: .001, + }, + }, + ) +) + +// nolint:gochecknoinits +func init() { + metrics.MustRegister( + ntpSyncCount, + ntpSyncSummary, + ) +} diff --git a/internal/ntp/ntp.go b/internal/ntp/ntp.go new file mode 100644 index 0000000000..ef6988e303 --- /dev/null +++ b/internal/ntp/ntp.go @@ -0,0 +1,74 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the MIT License. +// This product includes software developed at Guance Cloud (https://www.guance.com/). +// Copyright 2022-present Guance, Inc. + +// Package ntp sync network time. +package ntp + +import ( + "context" + "math" + "sync/atomic" + "time" + + "github.com/GuanceCloud/cliutils/logger" + "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/datakit" +) + +var ( + localTimeSecDiff atomic.Int64 + l = logger.DefaultSLogger("ntp") +) + +type syncer interface { + TimeDiff() int64 +} + +func doSync(diffSec int64, abs uint64) { + if uint64(math.Abs(float64(diffSec))) >= uint64(math.Abs(float64(abs))) { + localTimeSecDiff.Store(diffSec) + ntpSyncCount.Add(1) + l.Infof("update local time diff %s", time.Duration(localTimeSecDiff.Load())*time.Second) + } + + ntpSyncSummary.Observe(float64(diffSec)) +} + +func StartNTP(s syncer, syncInterval time.Duration, diffAbsRangeSecond uint64) { + g := datakit.G("ntp") + l = logger.SLogger("ntp") + + // sync ASAP + doSync(s.TimeDiff(), diffAbsRangeSecond) + + g.Go(func(_ context.Context) error { + tick := time.NewTicker(syncInterval) + defer tick.Stop() + + for { + select { + case <-tick.C: + doSync(s.TimeDiff(), diffAbsRangeSecond) + + case <-datakit.Exit.Wait(): + l.Infof("ntp exit") + return nil + } + } + }) +} + +// NTPTime get synced network time. +func NTPTime() time.Time { + local := time.Now() + + // if ntp time > local time, then localTimeSecDiff > 0, so add the difference. + // if ntp time < local time, localTimeSecDiff < 0, the minus the difference. + return local.Add(time.Duration(localTimeSecDiff.Load()) * time.Second) +} + +// LocalTime get local machine time. +func LocalTime() time.Time { + return time.Now() +} diff --git a/internal/ntp/ntp_test.go b/internal/ntp/ntp_test.go new file mode 100644 index 0000000000..33351ee39a --- /dev/null +++ b/internal/ntp/ntp_test.go @@ -0,0 +1,54 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the MIT License. +// This product includes software developed at Guance Cloud (https://www.guance.com/). +// Copyright 2022-present Guance, Inc. + +// Package ntp sync network time. +package ntp + +import ( + T "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +type mockNSec struct { + n int64 +} + +func (s *mockNSec) TimeDiff() int64 { + return s.n // always with n sec diff +} + +func TestNTPTime(t *T.T) { + t.Run("10s", func(t *T.T) { + m := &mockNSec{ + n: 10, + } + StartNTP(m, time.Minute, 1) + + time.Sleep(time.Second) // wait worker ok + + local := LocalTime() + ntpTime := NTPTime() + + assert.Equalf(t, int64(10), ntpTime.Unix()-local.Unix(), "local: %d, ntp: %d", local.Unix(), ntpTime.Unix()) + t.Logf("local: %d, ntp: %d", local.Unix(), ntpTime.Unix()) + }) + + t.Run("-10s", func(t *T.T) { + m := &mockNSec{ + n: -10, + } + StartNTP(m, time.Minute, 1) + + time.Sleep(time.Second) // wait worker ok + + local := LocalTime() + ntpTime := NTPTime() + + assert.Equalf(t, int64(-10), ntpTime.Unix()-local.Unix(), "local: %d, ntp: %d", local.Unix(), ntpTime.Unix()) + t.Logf("local: %d, ntp: %d", local.Unix(), ntpTime.Unix()) + }) +} diff --git a/internal/plugins/inputs/chrony/input.go b/internal/plugins/inputs/chrony/input.go index d3a400a4fa..827a75837d 100644 --- a/internal/plugins/inputs/chrony/input.go +++ b/internal/plugins/inputs/chrony/input.go @@ -28,6 +28,7 @@ import ( "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/getdatassh" dkio "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/io" "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/metrics" + "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/ntp" "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/plugins/inputs" ) @@ -230,7 +231,7 @@ func (ipt *Input) getData() ([]*getdatassh.SSHData, error) { } func (ipt *Input) getPts(data []*getdatassh.SSHData) error { - ts := time.Now() + ts := ntp.NTPTime() opts := point.DefaultMetricOptions() opts = append(opts, point.WithTime(ts))