diff --git a/client/client_test.go b/client/client_test.go index d485ab2..f333490 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -5,6 +5,7 @@ import ( rt "github.com/appscode/g2/pkg/runtime" "github.com/appscode/log" + "time" ) const ( @@ -72,7 +73,7 @@ func TestClientDoCron(t *testing.T) { } func TestClientDoAt(t *testing.T) { - handle, err := client.DoAt("scheduledJobTest", 1484160580, []byte("test data")) + handle, err := client.DoAt("scheduledJobTest", time.Now().Add(10*time.Second).Unix(), []byte("test data")) if err != nil { t.Fatal(err) } diff --git a/pkg/server/server.go b/pkg/server/server.go index b8d3749..8c4d1b7 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -57,11 +57,10 @@ func (s *Server) loadAllJobs() { log.Error(err) return } - - log.Debugf("%+v", jobs) for _, j := range jobs { j.ProcessBy = 0 //no body handle it now j.CreateBy = 0 //clear + log.Debugf("handle: %v\tfunc: %v\tbg: %v", j.Handle, j.FuncName, j.IsBackGround) s.doAddJob(j) } } @@ -72,11 +71,12 @@ func (s *Server) loadAllCronJobs() { log.Error(err) return } - log.Debugf("load scheduled job: %+v", schedJobs) for _, sj := range schedJobs { if epoch, ok := s.ExpressionToEpoch(sj.Expression); ok { + log.Debugf("handle: %v func: %v schedule: %v", sj.Handle, sj.JobTemplete.FuncName, time.Unix(epoch, 0).Local()) s.doAddEpochJob(sj, epoch) } else { + log.Debugf("handle: %v func: %v expr: %v", sj.Handle, sj.JobTemplete.FuncName, sj.Expression) s.doAddCronJob(sj) } } @@ -92,7 +92,7 @@ func (s *Server) Start(addr string) { log.Debug("listening on", addr) go registerWebHandler(s) - + go s.WatcherLoop() if s.cronSvc != nil { s.cronSvc.Start() } @@ -172,7 +172,7 @@ func (s *Server) doAddJob(j *Job) { func (s *Server) doAddAndPersistJob(j *Job) { // persistent job - log.Debugf("add job %+v", j) + log.Debugf("add job with handle: %v func: %v bg: %v", j.Handle, j.FuncName, j.IsBackGround) if s.store != nil { if err := s.store.AddJob(j); err != nil { log.Warning(err) @@ -271,7 +271,7 @@ func (s *Server) wakeupWorker(funcName string) bool { continue } - log.Debug("wakeup sessionId", w.SessionId) + log.Debug("wakeup sessionId ", w.SessionId) w.Send(wakeupReply) return true @@ -320,7 +320,7 @@ func (s *Server) handleCloseSession(e *event) error { } } if c, ok := s.client[sessionId]; ok { - log.Debug("removeClient sessionId", sessionId) + log.Debug("removeClient with sessionId ", sessionId) delete(s.client, c.SessionId) } e.result <- true //notify close finish @@ -334,7 +334,7 @@ func (s *Server) handleGetWorker(e *event) (err error) { e.result <- string(buf) }() cando := e.args.t0.(string) - log.Debug("get worker", cando) + log.Debug("get worker ", cando) if len(cando) == 0 { workers := make([]*Worker, 0, len(s.worker)) for _, v := range s.worker { @@ -348,9 +348,8 @@ func (s *Server) handleGetWorker(e *event) (err error) { return nil } - log.Debugf("%+v", s.funcWorker) if jw, ok := s.funcWorker[cando]; ok { - log.Debug(cando, jw.workers.Len()) + log.Debugf("func: %v #workers: %v", cando, jw.workers.Len()) workers := make([]*Worker, 0, jw.workers.Len()) for it := jw.workers.Front(); it != nil; it = it.Next() { workers = append(workers, it.Value.(*Worker)) @@ -367,7 +366,7 @@ func (s *Server) handleGetWorker(e *event) (err error) { } func (s *Server) handleGetJob(e *event) (err error) { - log.Debug("get jobs", e.handle) + log.Debug("get jobs ", e.handle) var buf []byte defer func() { e.result <- string(buf) @@ -399,7 +398,7 @@ func (s *Server) handleGetJob(e *event) (err error) { } func (s *Server) handleGetCronJob(e *event) (err error) { - log.Debug("get cronjobs", e.handle) + log.Debug("get cronjobs ", e.handle) var buf []byte defer func() { e.result <- string(buf) @@ -501,7 +500,6 @@ func (s *Server) handleCronJob(e *event) { sj.Handle = allocSchedJobId() e.result <- sj.Handle // persistent Cron Job - log.Debugf("add scheduled job %+v", sj) id := s.doAddCronJob(sj) sj.CronEntryID = int(id) scdT, err := NewCronSchedule(sj.Expression) @@ -514,7 +512,7 @@ func (s *Server) handleCronJob(e *event) { log.Errorln(err) } } - log.Debugf("Scheduled cron job added with function name `%s`, data '%s' and cron SpecScheduleTime - '%+v'\n", string(sj.JobTemplete.FuncName), string(sj.JobTemplete.Data), sj.Expression) + log.Debugf("add cron job with handle: %v func: %v expr: %v", sj.Handle, sj.JobTemplete.FuncName, sj.Expression) } func (s *Server) handleSubmitEpochJob(e *event) { @@ -545,13 +543,13 @@ func (s *Server) handleSubmitEpochJob(e *event) { epoch, _ := s.ExpressionToEpoch(sj.Expression) sj.Next = time.Unix(epoch, 0) // persistent Cron Job - log.Debugf("add scheduled epoch job %+v", sj) s.doAddEpochJob(sj, val) if s.store != nil { if err := s.store.AddCronJob(sj); err != nil { log.Errorln(err) } } + log.Debugf("add epoch job with handle: %v func: %v schedule: %v", sj.Handle, sj.JobTemplete.FuncName, time.Unix(epoch, 0).Local()) } func (s *Server) handleWorkReport(e *event) { @@ -666,7 +664,7 @@ func (s *Server) handleProtoEvt(e *event) { } w.status = wsSleep - log.Debugf("worker sessionId %d sleep", sessionId) + log.Debugf("worker with sessionId %d sleep", sessionId) //check if there are any jobs for this worker for k := range w.canDo { if s.wakeupWorker(k) { @@ -731,6 +729,46 @@ func (s *Server) EvtLoop() { } } +func (s *Server) WatcherLoop() { + ticker := time.NewTicker(10 * time.Minute) + for { + select { + case <-ticker.C: + cjs, err := s.store.GetCronJobs() + if err != nil { + log.Error(err) + continue + } + rep := 0 + one := 0 + for _, cj := range cjs { + if _, isOne := s.ExpressionToEpoch(cj.Expression); isOne { + one++ + } else { + rep++ + } + } + log.Infof("total cron job: %v #repeated job: %v #onetime job: %v", len(cjs), rep, one) + js, err := s.store.GetJobs() + if err != nil { + log.Error(err) + continue + } + var b, r int = 0, 0 + for _, j := range js { + if j.IsBackGround { + b++ + } + if j.Running { + r++ + } + } + log.Infof("total job: %v #background: %v #running: %v", len(js), b, r) + } + } + +} + func (s *Server) allocSessionId() int64 { return atomic.AddInt64(&s.startSessionId, 1) } @@ -738,7 +776,7 @@ func (s *Server) allocSessionId() int64 { func (s *Server) DeleteCronJob(cj *CronJob) error { sj, err := s.store.DeleteCronJob(cj) if err == lberror.ErrNotFound { - log.Errorf("handle `%v` not found\n", cj.Handle) + log.Errorf("handle `%v` not found", cj.Handle) return fmt.Errorf("handle `%v` not found", cj.Handle) } if err != nil { @@ -746,7 +784,7 @@ func (s *Server) DeleteCronJob(cj *CronJob) error { return err } s.cronSvc.Remove(cron.EntryID(sj.CronEntryID)) - log.Debugf("job `%v` successfully cancelled.\n", cj.Handle) + log.Debugf("job `%v` successfully cancelled.", cj.Handle) return nil } diff --git a/pkg/server/session.go b/pkg/server/session.go index 4b4cae4..c8daeda 100644 --- a/pkg/server/session.go +++ b/pkg/server/session.go @@ -43,8 +43,7 @@ func (se *session) handleConnection(s *Server, conn net.Conn) { close(inbox) //notify writer to quit } }() - - log.Debug("new sessionId", sessionId, "address:", conn.RemoteAddr()) + log.Debugf("new session with sessionId %v and address: %v", sessionId, conn.RemoteAddr()) go queueingWriter(inbox, out) go writer(conn, out) @@ -70,16 +69,16 @@ func (se *session) handleBinaryConnection(s *Server, conn net.Conn, r *bufio.Rea for { tp, buf, err := ReadMessage(r) if err != nil { - log.Debug(err, "sessionId", sessionId) + log.Debugf("%v with sessionId %v", err, sessionId) return } args, ok := decodeArgs(tp, buf) if !ok { - log.Debug("tp:", tp.String(), "argc not match", "details:", string(buf)) + log.Debugf("pt: %v argc not match details: %v", tp.String(), string(buf)) return } - log.Debug("sessionId", sessionId, "tp:", tp.String(), "len(args):", len(args), "details:", string(buf)) + log.Debugf("sessionId: %v pt: %v len(args): %v details: %v", sessionId, tp.String(), len(args), string(buf)) switch tp { case PT_CanDo, PT_CanDoTimeout: //todo: CAN_DO_TIMEOUT timeout support @@ -107,7 +106,7 @@ func (se *session) handleBinaryConnection(s *Server, conn net.Conn, r *bufio.Rea s.protoEvtCh <- e job := (<-e.result).(*Job) if job == nil { - log.Debug("sessionId", sessionId, "no job") + log.Debugf("sessionId %v has no job", sessionId) sendReplyResult(inbox, nojobReply) break }