Skip to content
This repository was archived by the owner on Jan 23, 2019. It is now read-only.

Beautify log #8

Merged
merged 3 commits into from
Jan 25, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

rt "github.com/appscode/g2/pkg/runtime"
"github.com/appscode/log"
"time"
)

const (
Expand Down Expand Up @@ -72,7 +73,7 @@ func TestClientDoCron(t *testing.T) {
}

func TestClientDoAt(t *testing.T) {
handle, err := client.DoAt("scheduledJobTest", 1484160580, []byte("test data"))
handle, err := client.DoAt("scheduledJobTest", time.Now().Add(10*time.Second).Unix(), []byte("test data"))
if err != nil {
t.Fatal(err)
}
Expand Down
74 changes: 56 additions & 18 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,10 @@ func (s *Server) loadAllJobs() {
log.Error(err)
return
}

log.Debugf("%+v", jobs)
for _, j := range jobs {
j.ProcessBy = 0 //no body handle it now
j.CreateBy = 0 //clear
log.Debugf("handle: %v\tfunc: %v\tbg: %v", j.Handle, j.FuncName, j.IsBackGround)
s.doAddJob(j)
}
}
Expand All @@ -72,11 +71,12 @@ func (s *Server) loadAllCronJobs() {
log.Error(err)
return
}
log.Debugf("load scheduled job: %+v", schedJobs)
for _, sj := range schedJobs {
if epoch, ok := s.ExpressionToEpoch(sj.Expression); ok {
log.Debugf("handle: %v func: %v schedule: %v", sj.Handle, sj.JobTemplete.FuncName, time.Unix(epoch, 0).Local())
s.doAddEpochJob(sj, epoch)
} else {
log.Debugf("handle: %v func: %v expr: %v", sj.Handle, sj.JobTemplete.FuncName, sj.Expression)
s.doAddCronJob(sj)
}
}
Expand All @@ -92,7 +92,7 @@ func (s *Server) Start(addr string) {
log.Debug("listening on", addr)

go registerWebHandler(s)

go s.WatcherLoop()
if s.cronSvc != nil {
s.cronSvc.Start()
}
Expand Down Expand Up @@ -172,7 +172,7 @@ func (s *Server) doAddJob(j *Job) {

func (s *Server) doAddAndPersistJob(j *Job) {
// persistent job
log.Debugf("add job %+v", j)
log.Debugf("add job with handle: %v func: %v bg: %v", j.Handle, j.FuncName, j.IsBackGround)
if s.store != nil {
if err := s.store.AddJob(j); err != nil {
log.Warning(err)
Expand Down Expand Up @@ -271,7 +271,7 @@ func (s *Server) wakeupWorker(funcName string) bool {
continue
}

log.Debug("wakeup sessionId", w.SessionId)
log.Debug("wakeup sessionId ", w.SessionId)

w.Send(wakeupReply)
return true
Expand Down Expand Up @@ -320,7 +320,7 @@ func (s *Server) handleCloseSession(e *event) error {
}
}
if c, ok := s.client[sessionId]; ok {
log.Debug("removeClient sessionId", sessionId)
log.Debug("removeClient with sessionId ", sessionId)
delete(s.client, c.SessionId)
}
e.result <- true //notify close finish
Expand All @@ -334,7 +334,7 @@ func (s *Server) handleGetWorker(e *event) (err error) {
e.result <- string(buf)
}()
cando := e.args.t0.(string)
log.Debug("get worker", cando)
log.Debug("get worker ", cando)
if len(cando) == 0 {
workers := make([]*Worker, 0, len(s.worker))
for _, v := range s.worker {
Expand All @@ -348,9 +348,8 @@ func (s *Server) handleGetWorker(e *event) (err error) {
return nil
}

log.Debugf("%+v", s.funcWorker)
if jw, ok := s.funcWorker[cando]; ok {
log.Debug(cando, jw.workers.Len())
log.Debugf("func: %v #workers: %v", cando, jw.workers.Len())
workers := make([]*Worker, 0, jw.workers.Len())
for it := jw.workers.Front(); it != nil; it = it.Next() {
workers = append(workers, it.Value.(*Worker))
Expand All @@ -367,7 +366,7 @@ func (s *Server) handleGetWorker(e *event) (err error) {
}

func (s *Server) handleGetJob(e *event) (err error) {
log.Debug("get jobs", e.handle)
log.Debug("get jobs ", e.handle)
var buf []byte
defer func() {
e.result <- string(buf)
Expand Down Expand Up @@ -399,7 +398,7 @@ func (s *Server) handleGetJob(e *event) (err error) {
}

func (s *Server) handleGetCronJob(e *event) (err error) {
log.Debug("get cronjobs", e.handle)
log.Debug("get cronjobs ", e.handle)
var buf []byte
defer func() {
e.result <- string(buf)
Expand Down Expand Up @@ -501,7 +500,6 @@ func (s *Server) handleCronJob(e *event) {
sj.Handle = allocSchedJobId()
e.result <- sj.Handle
// persistent Cron Job
log.Debugf("add scheduled job %+v", sj)
id := s.doAddCronJob(sj)
sj.CronEntryID = int(id)
scdT, err := NewCronSchedule(sj.Expression)
Expand All @@ -514,7 +512,7 @@ func (s *Server) handleCronJob(e *event) {
log.Errorln(err)
}
}
log.Debugf("Scheduled cron job added with function name `%s`, data '%s' and cron SpecScheduleTime - '%+v'\n", string(sj.JobTemplete.FuncName), string(sj.JobTemplete.Data), sj.Expression)
log.Debugf("add cron job with handle: %v func: %v expr: %v", sj.Handle, sj.JobTemplete.FuncName, sj.Expression)
}

func (s *Server) handleSubmitEpochJob(e *event) {
Expand Down Expand Up @@ -545,13 +543,13 @@ func (s *Server) handleSubmitEpochJob(e *event) {
epoch, _ := s.ExpressionToEpoch(sj.Expression)
sj.Next = time.Unix(epoch, 0)
// persistent Cron Job
log.Debugf("add scheduled epoch job %+v", sj)
s.doAddEpochJob(sj, val)
if s.store != nil {
if err := s.store.AddCronJob(sj); err != nil {
log.Errorln(err)
}
}
log.Debugf("add epoch job with handle: %v func: %v schedule: %v", sj.Handle, sj.JobTemplete.FuncName, time.Unix(epoch, 0).Local())
}

func (s *Server) handleWorkReport(e *event) {
Expand Down Expand Up @@ -666,7 +664,7 @@ func (s *Server) handleProtoEvt(e *event) {
}

w.status = wsSleep
log.Debugf("worker sessionId %d sleep", sessionId)
log.Debugf("worker with sessionId %d sleep", sessionId)
//check if there are any jobs for this worker
for k := range w.canDo {
if s.wakeupWorker(k) {
Expand Down Expand Up @@ -731,22 +729,62 @@ func (s *Server) EvtLoop() {
}
}

func (s *Server) WatcherLoop() {
ticker := time.NewTicker(10 * time.Minute)
for {
select {
case <-ticker.C:
cjs, err := s.store.GetCronJobs()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

stop DB access. read from cron status or gearman status.

if err != nil {
log.Error(err)
continue
}
rep := 0
one := 0
for _, cj := range cjs {
if _, isOne := s.ExpressionToEpoch(cj.Expression); isOne {
one++
} else {
rep++
}
}
log.Infof("total cron job: %v #repeated job: %v #onetime job: %v", len(cjs), rep, one)
js, err := s.store.GetJobs()
if err != nil {
log.Error(err)
continue
}
var b, r int = 0, 0
for _, j := range js {
if j.IsBackGround {
b++
}
if j.Running {
r++
}
}
log.Infof("total job: %v #background: %v #running: %v", len(js), b, r)
}
}

}

func (s *Server) allocSessionId() int64 {
return atomic.AddInt64(&s.startSessionId, 1)
}

func (s *Server) DeleteCronJob(cj *CronJob) error {
sj, err := s.store.DeleteCronJob(cj)
if err == lberror.ErrNotFound {
log.Errorf("handle `%v` not found\n", cj.Handle)
log.Errorf("handle `%v` not found", cj.Handle)
return fmt.Errorf("handle `%v` not found", cj.Handle)
}
if err != nil {
log.Errorln(err)
return err
}
s.cronSvc.Remove(cron.EntryID(sj.CronEntryID))
log.Debugf("job `%v` successfully cancelled.\n", cj.Handle)
log.Debugf("job `%v` successfully cancelled.", cj.Handle)
return nil
}

Expand Down
11 changes: 5 additions & 6 deletions pkg/server/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,7 @@ func (se *session) handleConnection(s *Server, conn net.Conn) {
close(inbox) //notify writer to quit
}
}()

log.Debug("new sessionId", sessionId, "address:", conn.RemoteAddr())
log.Debugf("new session with sessionId %v and address: %v", sessionId, conn.RemoteAddr())

go queueingWriter(inbox, out)
go writer(conn, out)
Expand All @@ -70,16 +69,16 @@ func (se *session) handleBinaryConnection(s *Server, conn net.Conn, r *bufio.Rea
for {
tp, buf, err := ReadMessage(r)
if err != nil {
log.Debug(err, "sessionId", sessionId)
log.Debugf("%v with sessionId %v", err, sessionId)
return
}
args, ok := decodeArgs(tp, buf)
if !ok {
log.Debug("tp:", tp.String(), "argc not match", "details:", string(buf))
log.Debugf("pt: %v argc not match details: %v", tp.String(), string(buf))
return
}

log.Debug("sessionId", sessionId, "tp:", tp.String(), "len(args):", len(args), "details:", string(buf))
log.Debugf("sessionId: %v pt: %v len(args): %v details: %v", sessionId, tp.String(), len(args), string(buf))

switch tp {
case PT_CanDo, PT_CanDoTimeout: //todo: CAN_DO_TIMEOUT timeout support
Expand Down Expand Up @@ -107,7 +106,7 @@ func (se *session) handleBinaryConnection(s *Server, conn net.Conn, r *bufio.Rea
s.protoEvtCh <- e
job := (<-e.result).(*Job)
if job == nil {
log.Debug("sessionId", sessionId, "no job")
log.Debugf("sessionId %v has no job", sessionId)
sendReplyResult(inbox, nojobReply)
break
}
Expand Down