diff --git a/bulk.go b/bulk.go index 251a191..d2f4455 100644 --- a/bulk.go +++ b/bulk.go @@ -36,8 +36,19 @@ func (c *Migrator) NewBulkWorker(docCount *int, pb *pb.ProgressBar, wg *sync.Wai docBuf := bytes.Buffer{} docEnc := json.NewEncoder(&docBuf) + idleDuration := 5 * time.Second + idleTimeout := time.NewTimer(idleDuration) + defer idleTimeout.Stop() + + taskTimeOutDuration := 5 * time.Minute + taskTimeout := time.NewTimer(taskTimeOutDuration) + defer taskTimeout.Stop() + + READ_DOCS: for { + idleTimeout.Reset(idleDuration) + taskTimeout.Reset(taskTimeOutDuration) select { case docI, open := <-c.DocChan: var err error @@ -138,10 +149,10 @@ READ_DOCS: bulkItemSize++ docBuf.Reset() (*docCount)++ - case <-time.After(time.Second * 5): + case <-idleTimeout.C: log.Debug("5s no message input") goto CLEAN_BUFFER - case <-time.After(time.Minute * 5): + case <-taskTimeout.C: log.Warn("5m no message input, close worker") goto WORKER_DONE } diff --git a/http.go b/http.go index dd1daf8..838d0ec 100644 --- a/http.go +++ b/http.go @@ -284,9 +284,12 @@ func Request(method string,r string,auth *Auth,body *bytes.Buffer,proxy string)( } func DecodeJson(jsonStream string, o interface{})(error) { + + decoder := json.NewDecoder(strings.NewReader(jsonStream)) // UseNumber causes the Decoder to unmarshal a number into an interface{} as a Number instead of as a float64. decoder.UseNumber() + decoder. if err := decoder.Decode(o); err != nil { fmt.Println("error:", err) diff --git a/main.go b/main.go index a56616d..527bda3 100644 --- a/main.go +++ b/main.go @@ -10,6 +10,7 @@ import ( "github.com/mattn/go-isatty" "io" "io/ioutil" + "net/http" _ "net/http/pprof" "os" "runtime" @@ -23,21 +24,21 @@ func main() { runtime.GOMAXPROCS(runtime.NumCPU()) - //go func() { - // //log.Infof("pprof listen at: http://%s/debug/pprof/", app.httpprof) - // mux := http.NewServeMux() - // - // // register pprof handler - // mux.HandleFunc("/debug/pprof/", func(w http.ResponseWriter, r *http.Request) { - // http.DefaultServeMux.ServeHTTP(w, r) - // }) - // - // // register metrics handler - // //mux.HandleFunc("/debug/vars", app.metricsHandler) - // - // endpoint := http.ListenAndServe("0.0.0.0:6060", mux) - // log.Debug("stop pprof server: %v", endpoint) - //}() + go func() { + //log.Infof("pprof listen at: http://%s/debug/pprof/", app.httpprof) + mux := http.NewServeMux() + + // register pprof handler + mux.HandleFunc("/debug/pprof/", func(w http.ResponseWriter, r *http.Request) { + http.DefaultServeMux.ServeHTTP(w, r) + }) + + // register metrics handler + //mux.HandleFunc("/debug/vars", app.metricsHandler) + + endpoint := http.ListenAndServe("0.0.0.0:6060", mux) + log.Debug("stop pprof server: %v", endpoint) + }() var err error c := &Config{} @@ -298,9 +299,12 @@ func main() { } // wait for cluster state to be okay before moving - timer := time.NewTimer(time.Second * 3) - + idleDuration := 3 * time.Second + timer := time.NewTimer(idleDuration) + defer timer.Stop() for { + timer.Reset(idleDuration) + if len(c.SourceEs) > 0 { if status, ready := migrator.ClusterReady(migrator.SourceESAPI); !ready { log.Infof("%s at %s is %s, delaying migration ", status.Name, c.SourceEs, status.Status) @@ -316,7 +320,6 @@ func main() { continue } } - timer.Stop() break }