Skip to content

Commit

Permalink
[*] fix dialtesting connection bug
Browse files Browse the repository at this point in the history
  • Loading branch information
denglijun committed Jul 22, 2021
1 parent d7e332b commit 7e81518
Showing 1 changed file with 40 additions and 25 deletions.
65 changes: 40 additions & 25 deletions plugins/inputs/dialtesting/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,37 +127,47 @@ func (d *Input) Run() {

func (d *Input) doServerTask() {

du, err := time.ParseDuration(d.PullInterval)
if err != nil {
l.Warnf("invalid frequency: %s, use default", d.PullInterval)
du = time.Minute
}
if du > 24*time.Hour || du < time.Minute {
l.Warnf("invalid frequency: %s, use default", d.PullInterval)
du = time.Minute
}
var f rtpanic.RecoverCallback

tick := time.NewTicker(du)
f = func(stack []byte, err error) {

for {
select {
case <-tick.C:
j, err := d.pullTask()
if err != nil {
l.Warnf(`%s,ignore`, err.Error())
continue
}
l.Debugf(`task: %s %v`, string(j), d.pos)
d.dispatchTasks(j)
defer rtpanic.Recover(f, nil)

du, err := time.ParseDuration(d.PullInterval)
if err != nil {
l.Warnf("invalid frequency: %s, use default", d.PullInterval)
du = time.Minute
}
if du > 24*time.Hour || du < time.Minute {
l.Warnf("invalid frequency: %s, use default", d.PullInterval)
du = time.Minute
}

case <-datakit.Exit.Wait():
l.Info("exit")
return
tick := time.NewTicker(du)
defer tick.Stop()

// TODO: 调接口发送每个任务的执行情况,便于中心对任务的管理
for {
select {
case <-tick.C:
j, err := d.pullTask()
if err != nil {
l.Warnf(`%s,ignore`, err.Error())
continue
}
l.Debugf(`task: %s %v`, string(j), d.pos)
d.dispatchTasks(j)

case <-datakit.Exit.Wait():
l.Info("exit")
return

// TODO: 调接口发送每个任务的执行情况,便于中心对任务的管理
}
}
}

f(nil, nil)

}

func (d *Input) doLocalTask(path string) {
Expand Down Expand Up @@ -416,6 +426,7 @@ func (d *Input) pullHTTPTask(reqURL *url.URL, sinceUs int64) ([]byte, error) {
bodymd5 := fmt.Sprintf("%x", md5.Sum([]byte("")))
req.Header.Set("Date", time.Now().Format(http.TimeFormat))
req.Header.Set("Content-MD5", bodymd5)
req.Header.Set("Connection", "close")
signReq(req, d.AK, d.SK)

resp, err := d.cli.Do(req)
Expand Down Expand Up @@ -460,8 +471,12 @@ func init() {
curTasks: map[string]*dialer{},
wg: sync.WaitGroup{},
cli: &http.Client{
Timeout: 10 * time.Second,
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
TLSHandshakeTimeout: 10 * time.Second,
MaxIdleConns: 100,
MaxIdleConnsPerHost: 100,
},
},
}
Expand Down

0 comments on commit 7e81518

Please sign in to comment.