Skip to content
This repository was archived by the owner on Jan 23, 2019. It is now read-only.

Commit

Permalink
Beautify log (#8)
Browse files Browse the repository at this point in the history
* Beautify log

* fix - delete unnecessary log
  • Loading branch information
ashiquzzaman33 authored and sadlil committed Jan 25, 2017
1 parent f1bf729 commit 9b1e646
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 25 deletions.
3 changes: 2 additions & 1 deletion client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

rt "github.com/appscode/g2/pkg/runtime"
"github.com/appscode/log"
"time"
)

const (
Expand Down Expand Up @@ -72,7 +73,7 @@ func TestClientDoCron(t *testing.T) {
}

func TestClientDoAt(t *testing.T) {
handle, err := client.DoAt("scheduledJobTest", 1484160580, []byte("test data"))
handle, err := client.DoAt("scheduledJobTest", time.Now().Add(10*time.Second).Unix(), []byte("test data"))
if err != nil {
t.Fatal(err)
}
Expand Down
74 changes: 56 additions & 18 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,10 @@ func (s *Server) loadAllJobs() {
log.Error(err)
return
}

log.Debugf("%+v", jobs)
for _, j := range jobs {
j.ProcessBy = 0 //no body handle it now
j.CreateBy = 0 //clear
log.Debugf("handle: %v\tfunc: %v\tbg: %v", j.Handle, j.FuncName, j.IsBackGround)
s.doAddJob(j)
}
}
Expand All @@ -72,11 +71,12 @@ func (s *Server) loadAllCronJobs() {
log.Error(err)
return
}
log.Debugf("load scheduled job: %+v", schedJobs)
for _, sj := range schedJobs {
if epoch, ok := s.ExpressionToEpoch(sj.Expression); ok {
log.Debugf("handle: %v func: %v schedule: %v", sj.Handle, sj.JobTemplete.FuncName, time.Unix(epoch, 0).Local())
s.doAddEpochJob(sj, epoch)
} else {
log.Debugf("handle: %v func: %v expr: %v", sj.Handle, sj.JobTemplete.FuncName, sj.Expression)
s.doAddCronJob(sj)
}
}
Expand All @@ -92,7 +92,7 @@ func (s *Server) Start(addr string) {
log.Debug("listening on", addr)

go registerWebHandler(s)

go s.WatcherLoop()
if s.cronSvc != nil {
s.cronSvc.Start()
}
Expand Down Expand Up @@ -172,7 +172,7 @@ func (s *Server) doAddJob(j *Job) {

func (s *Server) doAddAndPersistJob(j *Job) {
// persistent job
log.Debugf("add job %+v", j)
log.Debugf("add job with handle: %v func: %v bg: %v", j.Handle, j.FuncName, j.IsBackGround)
if s.store != nil {
if err := s.store.AddJob(j); err != nil {
log.Warning(err)
Expand Down Expand Up @@ -271,7 +271,7 @@ func (s *Server) wakeupWorker(funcName string) bool {
continue
}

log.Debug("wakeup sessionId", w.SessionId)
log.Debug("wakeup sessionId ", w.SessionId)

w.Send(wakeupReply)
return true
Expand Down Expand Up @@ -320,7 +320,7 @@ func (s *Server) handleCloseSession(e *event) error {
}
}
if c, ok := s.client[sessionId]; ok {
log.Debug("removeClient sessionId", sessionId)
log.Debug("removeClient with sessionId ", sessionId)
delete(s.client, c.SessionId)
}
e.result <- true //notify close finish
Expand All @@ -334,7 +334,7 @@ func (s *Server) handleGetWorker(e *event) (err error) {
e.result <- string(buf)
}()
cando := e.args.t0.(string)
log.Debug("get worker", cando)
log.Debug("get worker ", cando)
if len(cando) == 0 {
workers := make([]*Worker, 0, len(s.worker))
for _, v := range s.worker {
Expand All @@ -348,9 +348,8 @@ func (s *Server) handleGetWorker(e *event) (err error) {
return nil
}

log.Debugf("%+v", s.funcWorker)
if jw, ok := s.funcWorker[cando]; ok {
log.Debug(cando, jw.workers.Len())
log.Debugf("func: %v #workers: %v", cando, jw.workers.Len())
workers := make([]*Worker, 0, jw.workers.Len())
for it := jw.workers.Front(); it != nil; it = it.Next() {
workers = append(workers, it.Value.(*Worker))
Expand All @@ -367,7 +366,7 @@ func (s *Server) handleGetWorker(e *event) (err error) {
}

func (s *Server) handleGetJob(e *event) (err error) {
log.Debug("get jobs", e.handle)
log.Debug("get jobs ", e.handle)
var buf []byte
defer func() {
e.result <- string(buf)
Expand Down Expand Up @@ -399,7 +398,7 @@ func (s *Server) handleGetJob(e *event) (err error) {
}

func (s *Server) handleGetCronJob(e *event) (err error) {
log.Debug("get cronjobs", e.handle)
log.Debug("get cronjobs ", e.handle)
var buf []byte
defer func() {
e.result <- string(buf)
Expand Down Expand Up @@ -501,7 +500,6 @@ func (s *Server) handleCronJob(e *event) {
sj.Handle = allocSchedJobId()
e.result <- sj.Handle
// persistent Cron Job
log.Debugf("add scheduled job %+v", sj)
id := s.doAddCronJob(sj)
sj.CronEntryID = int(id)
scdT, err := NewCronSchedule(sj.Expression)
Expand All @@ -514,7 +512,7 @@ func (s *Server) handleCronJob(e *event) {
log.Errorln(err)
}
}
log.Debugf("Scheduled cron job added with function name `%s`, data '%s' and cron SpecScheduleTime - '%+v'\n", string(sj.JobTemplete.FuncName), string(sj.JobTemplete.Data), sj.Expression)
log.Debugf("add cron job with handle: %v func: %v expr: %v", sj.Handle, sj.JobTemplete.FuncName, sj.Expression)
}

func (s *Server) handleSubmitEpochJob(e *event) {
Expand Down Expand Up @@ -545,13 +543,13 @@ func (s *Server) handleSubmitEpochJob(e *event) {
epoch, _ := s.ExpressionToEpoch(sj.Expression)
sj.Next = time.Unix(epoch, 0)
// persistent Cron Job
log.Debugf("add scheduled epoch job %+v", sj)
s.doAddEpochJob(sj, val)
if s.store != nil {
if err := s.store.AddCronJob(sj); err != nil {
log.Errorln(err)
}
}
log.Debugf("add epoch job with handle: %v func: %v schedule: %v", sj.Handle, sj.JobTemplete.FuncName, time.Unix(epoch, 0).Local())
}

func (s *Server) handleWorkReport(e *event) {
Expand Down Expand Up @@ -666,7 +664,7 @@ func (s *Server) handleProtoEvt(e *event) {
}

w.status = wsSleep
log.Debugf("worker sessionId %d sleep", sessionId)
log.Debugf("worker with sessionId %d sleep", sessionId)
//check if there are any jobs for this worker
for k := range w.canDo {
if s.wakeupWorker(k) {
Expand Down Expand Up @@ -731,22 +729,62 @@ func (s *Server) EvtLoop() {
}
}

func (s *Server) WatcherLoop() {
ticker := time.NewTicker(10 * time.Minute)
for {
select {
case <-ticker.C:
cjs, err := s.store.GetCronJobs()
if err != nil {
log.Error(err)
continue
}
rep := 0
one := 0
for _, cj := range cjs {
if _, isOne := s.ExpressionToEpoch(cj.Expression); isOne {
one++
} else {
rep++
}
}
log.Infof("total cron job: %v #repeated job: %v #onetime job: %v", len(cjs), rep, one)
js, err := s.store.GetJobs()
if err != nil {
log.Error(err)
continue
}
var b, r int = 0, 0
for _, j := range js {
if j.IsBackGround {
b++
}
if j.Running {
r++
}
}
log.Infof("total job: %v #background: %v #running: %v", len(js), b, r)
}
}

}

func (s *Server) allocSessionId() int64 {
return atomic.AddInt64(&s.startSessionId, 1)
}

func (s *Server) DeleteCronJob(cj *CronJob) error {
sj, err := s.store.DeleteCronJob(cj)
if err == lberror.ErrNotFound {
log.Errorf("handle `%v` not found\n", cj.Handle)
log.Errorf("handle `%v` not found", cj.Handle)
return fmt.Errorf("handle `%v` not found", cj.Handle)
}
if err != nil {
log.Errorln(err)
return err
}
s.cronSvc.Remove(cron.EntryID(sj.CronEntryID))
log.Debugf("job `%v` successfully cancelled.\n", cj.Handle)
log.Debugf("job `%v` successfully cancelled.", cj.Handle)
return nil
}

Expand Down
11 changes: 5 additions & 6 deletions pkg/server/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,7 @@ func (se *session) handleConnection(s *Server, conn net.Conn) {
close(inbox) //notify writer to quit
}
}()

log.Debug("new sessionId", sessionId, "address:", conn.RemoteAddr())
log.Debugf("new session with sessionId %v and address: %v", sessionId, conn.RemoteAddr())

go queueingWriter(inbox, out)
go writer(conn, out)
Expand All @@ -70,16 +69,16 @@ func (se *session) handleBinaryConnection(s *Server, conn net.Conn, r *bufio.Rea
for {
tp, buf, err := ReadMessage(r)
if err != nil {
log.Debug(err, "sessionId", sessionId)
log.Debugf("%v with sessionId %v", err, sessionId)
return
}
args, ok := decodeArgs(tp, buf)
if !ok {
log.Debug("tp:", tp.String(), "argc not match", "details:", string(buf))
log.Debugf("pt: %v argc not match details: %v", tp.String(), string(buf))
return
}

log.Debug("sessionId", sessionId, "tp:", tp.String(), "len(args):", len(args), "details:", string(buf))
log.Debugf("sessionId: %v pt: %v len(args): %v details: %v", sessionId, tp.String(), len(args), string(buf))

switch tp {
case PT_CanDo, PT_CanDoTimeout: //todo: CAN_DO_TIMEOUT timeout support
Expand Down Expand Up @@ -107,7 +106,7 @@ func (se *session) handleBinaryConnection(s *Server, conn net.Conn, r *bufio.Rea
s.protoEvtCh <- e
job := (<-e.result).(*Job)
if job == nil {
log.Debug("sessionId", sessionId, "no job")
log.Debugf("sessionId %v has no job", sessionId)
sendReplyResult(inbox, nojobReply)
break
}
Expand Down

0 comments on commit 9b1e646

Please sign in to comment.