Skip to content

Commit

Permalink
feat: add NTP sync on dataway
Browse files Browse the repository at this point in the history
  • Loading branch information
谭彪 committed Sep 19, 2024
1 parent cadffe5 commit bdb34e1
Show file tree
Hide file tree
Showing 14 changed files with 410 additions and 64 deletions.
8 changes: 8 additions & 0 deletions cmd/datakit/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/httpapi"
dkio "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/io"
"gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/metrics"
"gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/ntp"
plRemote "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/pipeline/remote"
"gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/plugins/inputs"
_ "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/plugins/inputs/all"
Expand Down Expand Up @@ -285,6 +286,13 @@ func doRun() error {

startIO()

// start NTP syncer on dataway.
if n := config.Cfg.Dataway.NTP; n != nil {
ntp.StartNTP(config.Cfg.Dataway,
n.Interval,
uint64(n.SyncOnDiff/time.Second))
}

checkutil.CheckConditionExit(func() bool {
if err := dnswatcher.StartWatch(); err != nil {
return false
Expand Down
18 changes: 18 additions & 0 deletions internal/config/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,24 @@ func (c *Config) loadDatawayEnvs() {
c.Dataway.GlobalCustomerKeys = dataway.ParseGlobalCustomerKeys(v)
l.Infof("set global custom keys to %v", c.Dataway.GlobalCustomerKeys)
}

if c.Dataway.NTP != nil {
if v := datakit.GetEnv("ENV_DATAWAY_NTP_INTERVAL"); v != "" {
if du, err := time.ParseDuration(v); err == nil {
c.Dataway.NTP.Interval = du
} else {
l.Warnf("invalid ENV_DATAWAY_NTP_INTERVAL: %q: %s, ignored", v, err.Error())
}
}

if v := datakit.GetEnv("ENV_DATAWAY_NTP_DIFF"); v != "" {
if du, err := time.ParseDuration(v); err == nil {
c.Dataway.NTP.SyncOnDiff = du
} else {
l.Warnf("invalid ENV_DATAWAY_NTP_DIFF: %q: %s, ignored", v, err.Error())
}
}
}
}

func (c *Config) loadElectionEnvs() {
Expand Down
131 changes: 70 additions & 61 deletions internal/config/env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,21 @@ func TestLoadEnv(t *testing.T) {
envs map[string]string
expect *Config
}{
{
name: "test-dataway-ntp",
envs: map[string]string{
"ENV_DATAWAY_NTP_INTERVAL": "5m",
"ENV_DATAWAY_NTP_DIFF": "30s",
},

expect: func() *Config {
cfg := DefaultConfig()
cfg.Dataway.NTP.Interval = 5 * time.Minute
cfg.Dataway.NTP.SyncOnDiff = 30 * time.Second

return cfg
}(),
},
{
name: "test-recorder-envs",
envs: map[string]string{
Expand Down Expand Up @@ -115,21 +130,21 @@ func TestLoadEnv(t *testing.T) {
cfg := DefaultConfig()

cfg.Name = "testing-datakit"
cfg.Dataway = &dataway.Dataway{
URLs: []string{"http://host1.org", "http://host2.com"},
MaxIdleConnsPerHost: 123,
HTTPProxy: "http://1.2.3.4:1234",
EnableHTTPTrace: true,
IdleTimeout: 90 * time.Second,
HTTPTimeout: 30 * time.Second,
ContentEncoding: "v2",
MaxRetryCount: dataway.DefaultRetryCount,
InsecureSkipVerify: true,
RetryDelay: dataway.DefaultRetryDelay,
MaxRawBodySize: dataway.DefaultMaxRawBodySize,
GlobalCustomerKeys: []string{},
GZip: true,
}

cfg.Dataway.URLs = []string{"http://host1.org", "http://host2.com"}

cfg.Dataway.MaxIdleConnsPerHost = 123
cfg.Dataway.HTTPProxy = "http://1.2.3.4:1234"
cfg.Dataway.EnableHTTPTrace = true
cfg.Dataway.IdleTimeout = 90 * time.Second
cfg.Dataway.HTTPTimeout = 30 * time.Second
cfg.Dataway.ContentEncoding = "v2"
cfg.Dataway.MaxRetryCount = dataway.DefaultRetryCount
cfg.Dataway.InsecureSkipVerify = true
cfg.Dataway.RetryDelay = dataway.DefaultRetryDelay
cfg.Dataway.MaxRawBodySize = dataway.DefaultMaxRawBodySize
cfg.Dataway.GlobalCustomerKeys = []string{}
cfg.Dataway.GZip = true

cfg.HTTPAPI.RUMOriginIPHeader = "not-set"
cfg.HTTPAPI.Listen = "localhost:9559"
Expand Down Expand Up @@ -352,21 +367,19 @@ func TestLoadEnv(t *testing.T) {

cfg.ProtectMode = false

cfg.Dataway = &dataway.Dataway{
URLs: []string{"http://host1.org", "http://host2.com"},
MaxIdleConnsPerHost: 0,
MaxIdleConns: 100,
EnableHTTPTrace: true,
IdleTimeout: 100 * time.Second,
HTTPTimeout: time.Minute,
GlobalCustomerKeys: []string{"key1", "key2"},
MaxRetryCount: 8,
RetryDelay: time.Second * 5,
MaxRawBodySize: 1024 * 32,
ContentEncoding: "v2",
EnableSinker: true,
GZip: true,
}
cfg.Dataway.URLs = []string{"http://host1.org", "http://host2.com"}
cfg.Dataway.MaxIdleConnsPerHost = 0
cfg.Dataway.MaxIdleConns = 100
cfg.Dataway.EnableHTTPTrace = true
cfg.Dataway.IdleTimeout = 100 * time.Second
cfg.Dataway.HTTPTimeout = time.Minute
cfg.Dataway.GlobalCustomerKeys = []string{"key1", "key2"}
cfg.Dataway.MaxRetryCount = 8
cfg.Dataway.RetryDelay = time.Second * 5
cfg.Dataway.MaxRawBodySize = 1024 * 32
cfg.Dataway.ContentEncoding = "v2"
cfg.Dataway.EnableSinker = true
cfg.Dataway.GZip = true

return cfg
}(),
Expand Down Expand Up @@ -394,21 +407,19 @@ func TestLoadEnv(t *testing.T) {
expect: func() *Config {
cfg := DefaultConfig()

cfg.Dataway = &dataway.Dataway{
URLs: []string{"http://host1.org", "http://host2.com"},
MaxIdleConnsPerHost: 0,
MaxIdleConns: 100,
EnableHTTPTrace: true,
IdleTimeout: 100 * time.Second,
HTTPTimeout: time.Minute,
GlobalCustomerKeys: []string{"key1", "key2"},
MaxRetryCount: 8,
RetryDelay: time.Second * 5,
MaxRawBodySize: dataway.MinimalRawBodySize,
ContentEncoding: "v2",
EnableSinker: true,
GZip: true,
}
cfg.Dataway.URLs = []string{"http://host1.org", "http://host2.com"}
cfg.Dataway.MaxIdleConnsPerHost = 0
cfg.Dataway.MaxIdleConns = 100
cfg.Dataway.EnableHTTPTrace = true
cfg.Dataway.IdleTimeout = 100 * time.Second
cfg.Dataway.HTTPTimeout = time.Minute
cfg.Dataway.GlobalCustomerKeys = []string{"key1", "key2"}
cfg.Dataway.MaxRetryCount = 8
cfg.Dataway.RetryDelay = time.Second * 5
cfg.Dataway.MaxRawBodySize = dataway.MinimalRawBodySize
cfg.Dataway.ContentEncoding = "v2"
cfg.Dataway.EnableSinker = true
cfg.Dataway.GZip = true

return cfg
}(),
Expand All @@ -435,22 +446,20 @@ func TestLoadEnv(t *testing.T) {
expect: func() *Config {
cfg := DefaultConfig()

cfg.Dataway = &dataway.Dataway{
URLs: []string{"http://host1.org", "http://host2.com"},
MaxIdleConnsPerHost: 0,
MaxIdleConns: 100,
EnableHTTPTrace: true,
IdleTimeout: 100 * time.Second,
HTTPTimeout: time.Minute,
GlobalCustomerKeys: []string{"key1", "key2"},
MaxRetryCount: 8,
RetryDelay: time.Second * 5,
MaxRawBodySize: 1024 * 1024 * 32,
ContentEncoding: "v2",
EnableSinker: true,
GZip: true,
InsecureSkipVerify: true,
}
cfg.Dataway.URLs = []string{"http://host1.org", "http://host2.com"}
cfg.Dataway.MaxIdleConnsPerHost = 0
cfg.Dataway.MaxIdleConns = 100
cfg.Dataway.EnableHTTPTrace = true
cfg.Dataway.IdleTimeout = 100 * time.Second
cfg.Dataway.HTTPTimeout = time.Minute
cfg.Dataway.GlobalCustomerKeys = []string{"key1", "key2"}
cfg.Dataway.MaxRetryCount = 8
cfg.Dataway.RetryDelay = time.Second * 5
cfg.Dataway.MaxRawBodySize = 1024 * 1024 * 32
cfg.Dataway.ContentEncoding = "v2"
cfg.Dataway.EnableSinker = true
cfg.Dataway.GZip = true
cfg.Dataway.InsecureSkipVerify = true

return cfg
}(),
Expand Down
1 change: 1 addition & 0 deletions internal/datakit/datakit.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ const (
ListDataWay = "/v2/list/dataway"
TokenCheck = "/v1/check/token"
UsageTrace = "/v1/datakit/usage_trace"
NTPSync = "/v1/ntp"

StrGitRepos = "gitrepos"
StrPipelineRemote = "pipeline_remote"
Expand Down
9 changes: 9 additions & 0 deletions internal/datakit/dkconf.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,15 @@ ulimit = 64000
global_customer_keys = []
enable_sinker = false # disable sinker
# use dataway as NTP server
[dataway.ntp]
interval = "5m" # sync dataway time each 5min
# if datakit local time and dataway time's ABS value reach the diff,
# datakit's soft time will update to the dataway time.
# NOTE: diff MUST larger than "1s"
diff = "30s"
################################################
# Datakit logging configure
################################################
Expand Down
15 changes: 15 additions & 0 deletions internal/export/non_input_docs.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,21 @@ func envDataway() []*inputs.ENVInfo {
Desc: "Enable self-signed TLS certificate on Dataway [:octicons-tag-24: Version-1.29.0](changelog.md#cl-1.29.0)",
DescZh: "允许对应的 Dataway 上的证书是自签证书 [:octicons-tag-24: Version-1.29.0](changelog.md#cl-1.29.0)",
},

// NTP
{
ENVName: "ENV_DATAWAY_NTP_INTERVAL",
Type: doc.String,
Desc: "Set NTP sync interval [:octicons-tag-24: Version-1.39.0](changelog.md#cl-1.39.0)",
DescZh: "设置 NTP 时间同步间隔 [:octicons-tag-24: Version-1.39.0](changelog.md#cl-1.39.0)",
},

{
ENVName: "ENV_DATAWAY_NTP_DIFF",
Type: doc.String,
Desc: "Set NTP sync difference [:octicons-tag-24: Version-1.39.0](changelog.md#cl-1.39.0)",
DescZh: "设置 NTP 时间同步的误差[:octicons-tag-24: Version-1.39.0](changelog.md#cl-1.39.0)",
},
}

for idx := range infos {
Expand Down
12 changes: 12 additions & 0 deletions internal/io/dataway/dw.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ var (
datakit.ProfilingUpload,
datakit.TokenCheck,
datakit.UsageTrace,
datakit.NTPSync,
}

AvailableDataways = []string{}
Expand All @@ -78,9 +79,18 @@ func NewDefaultDataway() *Dataway {
GZip: true,
MaxRetryCount: DefaultRetryCount,
RetryDelay: DefaultRetryDelay,
NTP: &ntp{
Interval: time.Minute * 5,
SyncOnDiff: time.Second * 30,
},
}
}

type ntp struct {
Interval time.Duration `toml:"interval"`
SyncOnDiff time.Duration `toml:"diff"`
}

type Dataway struct {
URLs []string `toml:"urls"`

Expand Down Expand Up @@ -125,6 +135,8 @@ type Dataway struct {

globalTags map[string]string
globalTagsHTTPHeaderValue string

NTP *ntp `toml:"ntp"`
}

type dwopt func(*Dataway)
Expand Down
67 changes: 67 additions & 0 deletions internal/io/dataway/dwapis.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"io"
"net/http"
"strings"
"time"

"gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/datakit"
)
Expand Down Expand Up @@ -368,3 +369,69 @@ func (dw *Dataway) Pull(args string) ([]byte, error) {

return dw.eps[0].datakitPull(args)
}

type ntpResp struct {
TimestampSec int64 `json:"timestamp_sec"`
}

// TimeDiff implement ntp time sync interface.
func (dw *Dataway) TimeDiff() int64 {
if d, err := dw.doTimeDiff(); err != nil {
log.Errorf("doTimeDiff: %s", err.Error())
return 0
} else {
return d
}
}

func (dw *Dataway) doTimeDiff() (int64, error) {
if len(dw.eps) == 0 {
return 0, fmt.Errorf("no dataway available")
}

ep := dw.eps[0]
requrl, ok := ep.categoryURL[datakit.NTPSync]
if !ok {
return 0, fmt.Errorf("url %s not available", datakit.NTPSync)
}

log.Debugf("NewRequest: %s", requrl)
req, err := http.NewRequest(http.MethodGet, requrl, nil)
if err != nil {
return 0, fmt.Errorf("http.NewRequest: %w", err)
}

// Common HTTP headers appended, such as User-Agent, X-Global-Tags
for k, v := range ep.httpHeaders {
req.Header.Set(k, v)
}

resp, err := ep.sendReq(req)
if err != nil {
return 0, fmt.Errorf("doSendReq: %w", err)
}

respBody, err := io.ReadAll(resp.Body)
if err != nil {
return 0, fmt.Errorf("io.readAll: %w", err)
}

defer resp.Body.Close() //nolint:errcheck
switch resp.StatusCode / 100 {
case 2:
log.Debugf("ntp ok")

var nr ntpResp

if err := json.Unmarshal(respBody, &nr); err != nil {
log.Errorf("Unmarshal: %s", string(respBody))

return 0, fmt.Errorf(`json.Unmarshal: %w`, err)
}

return nr.TimestampSec - time.Now().Unix(), nil

default:
return 0, fmt.Errorf("ntp failed(status: %d): %s", resp.StatusCode, string(respBody))
}
}
Loading

0 comments on commit bdb34e1

Please sign in to comment.