From 7e815189ad8b05db2e71d78e60973d8ca2df9b8b Mon Sep 17 00:00:00 2001 From: denglijun Date: Thu, 22 Jul 2021 15:56:33 +0800 Subject: [PATCH] [*] fix dialtesting connection bug --- plugins/inputs/dialtesting/input.go | 65 ++++++++++++++++++----------- 1 file changed, 40 insertions(+), 25 deletions(-) diff --git a/plugins/inputs/dialtesting/input.go b/plugins/inputs/dialtesting/input.go index c42d63d8c7..c6f28d8e35 100644 --- a/plugins/inputs/dialtesting/input.go +++ b/plugins/inputs/dialtesting/input.go @@ -127,37 +127,47 @@ func (d *Input) Run() { func (d *Input) doServerTask() { - du, err := time.ParseDuration(d.PullInterval) - if err != nil { - l.Warnf("invalid frequency: %s, use default", d.PullInterval) - du = time.Minute - } - if du > 24*time.Hour || du < time.Minute { - l.Warnf("invalid frequency: %s, use default", d.PullInterval) - du = time.Minute - } + var f rtpanic.RecoverCallback - tick := time.NewTicker(du) + f = func(stack []byte, err error) { - for { - select { - case <-tick.C: - j, err := d.pullTask() - if err != nil { - l.Warnf(`%s,ignore`, err.Error()) - continue - } - l.Debugf(`task: %s %v`, string(j), d.pos) - d.dispatchTasks(j) + defer rtpanic.Recover(f, nil) + + du, err := time.ParseDuration(d.PullInterval) + if err != nil { + l.Warnf("invalid frequency: %s, use default", d.PullInterval) + du = time.Minute + } + if du > 24*time.Hour || du < time.Minute { + l.Warnf("invalid frequency: %s, use default", d.PullInterval) + du = time.Minute + } - case <-datakit.Exit.Wait(): - l.Info("exit") - return + tick := time.NewTicker(du) + defer tick.Stop() - // TODO: 调接口发送每个任务的执行情况,便于中心对任务的管理 + for { + select { + case <-tick.C: + j, err := d.pullTask() + if err != nil { + l.Warnf(`%s,ignore`, err.Error()) + continue + } + l.Debugf(`task: %s %v`, string(j), d.pos) + d.dispatchTasks(j) + + case <-datakit.Exit.Wait(): + l.Info("exit") + return + + // TODO: 调接口发送每个任务的执行情况,便于中心对任务的管理 + } } } + f(nil, nil) + } func (d *Input) doLocalTask(path string) { @@ -416,6 +426,7 @@ func (d *Input) pullHTTPTask(reqURL *url.URL, sinceUs int64) ([]byte, error) { bodymd5 := fmt.Sprintf("%x", md5.Sum([]byte(""))) req.Header.Set("Date", time.Now().Format(http.TimeFormat)) req.Header.Set("Content-MD5", bodymd5) + req.Header.Set("Connection", "close") signReq(req, d.AK, d.SK) resp, err := d.cli.Do(req) @@ -460,8 +471,12 @@ func init() { curTasks: map[string]*dialer{}, wg: sync.WaitGroup{}, cli: &http.Client{ + Timeout: 10 * time.Second, Transport: &http.Transport{ - TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + TLSHandshakeTimeout: 10 * time.Second, + MaxIdleConns: 100, + MaxIdleConnsPerHost: 100, }, }, }