Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
justlorain committed Mar 31, 2024
1 parent aecbfc8 commit 21ef822
Showing 1 changed file with 12 additions and 48 deletions.
60 changes: 12 additions & 48 deletions cron/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
// TODO: record execute time of each task
// TODO: add progress bar
// TODO: data cleaning e.g. ByteDance, bytedance, Bytedance => bytedance
// TODO: add retry count option

var ErrReachedRetryTimes = errors.New("error reached retry times")

Expand All @@ -31,46 +30,7 @@ func Start(ctx context.Context) {
// if init failed, stop service
errC <- InitTask(ctx, storage.DB)

c := cron.New()
if _, err := c.AddFunc(config.GlobalConfig.Backend.Cron, func() {
i := 0
for {
if i == config.GlobalConfig.Backend.Retry {
errC <- ErrReachedRetryTimes
}
tx := storage.DB.Begin()
err := UpdateTask(ctx, tx)
if err == nil {
tx.Commit()
j := 0
for {
if j == config.GlobalConfig.Backend.Retry {
errC <- ErrReachedRetryTimes
}
stx := storage.DB.Begin()
err := UpdateContributorCount(ctx, stx)
if err == nil {
stx.Commit()
break
}
slog.Error("error update contributor count", "err", err.Error())
stx.Rollback()
slog.Info("transaction rollback and retry")
j++
}
break
}
slog.Error("error doing update task", "err", err.Error())
tx.Rollback()
slog.Info("transaction rollback and retry")
i++
}
}); err != nil {
slog.Error("error doing cron", "err", err)
errC <- err
}
c.Start()
defer c.Stop()
StartCron(ctx, errC)

if err := util.WaitSignal(errC); err != nil {
slog.Error("receive close signal error", "signal", err.Error())
Expand All @@ -85,6 +45,17 @@ func Restart(ctx context.Context) {

errC := make(chan error)

StartCron(ctx, errC)

if err := util.WaitSignal(errC); err != nil {
slog.Error("receive close signal error", "signal", err.Error())
return
}

slog.Info("openalysis service stopped")
}

func StartCron(ctx context.Context, errC chan error) {
c := cron.New()
if _, err := c.AddFunc(config.GlobalConfig.Backend.Cron, func() {
i := 0
Expand Down Expand Up @@ -125,13 +96,6 @@ func Restart(ctx context.Context) {
}
c.Start()
defer c.Stop()

if err := util.WaitSignal(errC); err != nil {
slog.Error("receive close signal error", "signal", err.Error())
return
}

slog.Info("openalysis service stopped")
}

// map[orgNodeID][]repoNameWithOwner
Expand Down

0 comments on commit 21ef822

Please sign in to comment.