diff --git a/os/gcron/gcron.go b/os/gcron/gcron.go index 9cccc9f42f8..0e375185c74 100644 --- a/os/gcron/gcron.go +++ b/os/gcron/gcron.go @@ -120,3 +120,8 @@ func Start(name ...string) { func Stop(name ...string) { defaultCron.Stop(name...) } + +// StopGracefully Blocks and waits all current running jobs done. +func StopGracefully() { + defaultCron.StopGracefully() +} diff --git a/os/gcron/gcron_cron.go b/os/gcron/gcron_cron.go index 1c2d931578c..2588de4c071 100644 --- a/os/gcron/gcron_cron.go +++ b/os/gcron/gcron_cron.go @@ -8,6 +8,7 @@ package gcron import ( "context" + "sync" "time" "github.com/gogf/gf/v2/container/garray" @@ -19,10 +20,11 @@ import ( // Cron stores all the cron job entries. type Cron struct { - idGen *gtype.Int64 // Used for unique name generation. - status *gtype.Int // Timed task status(0: Not Start; 1: Running; 2: Stopped; -1: Closed) - entries *gmap.StrAnyMap // All timed task entries. - logger glog.ILogger // Logger, it is nil in default. + idGen *gtype.Int64 // Used for unique name generation. + status *gtype.Int // Timed task status(0: Not Start; 1: Running; 2: Stopped; -1: Closed) + entries *gmap.StrAnyMap // All timed task entries. + logger glog.ILogger // Logger, it is nil in default. + jobWaiter sync.WaitGroup // Graceful shutdown when cron jobs are stopped. } // New returns a new Cron object with default settings. @@ -187,6 +189,12 @@ func (c *Cron) Stop(name ...string) { } } +// StopGracefully Blocks and waits all current running jobs done. +func (c *Cron) StopGracefully() { + c.status.Set(StatusStopped) + c.jobWaiter.Wait() +} + // Remove deletes scheduled task which named `name`. func (c *Cron) Remove(name string) { if v := c.entries.Get(name); v != nil { diff --git a/os/gcron/gcron_entry.go b/os/gcron/gcron_entry.go index 9d9a73448cb..cbededcce8f 100644 --- a/os/gcron/gcron_entry.go +++ b/os/gcron/gcron_entry.go @@ -152,7 +152,9 @@ func (e *Entry) checkAndRun(ctx context.Context) { e.Close() case StatusReady, StatusRunning: + e.cron.jobWaiter.Add(1) defer func() { + e.cron.jobWaiter.Done() if exception := recover(); exception != nil { // Exception caught, it logs the error content to logger in default behavior. e.logErrorf(ctx, diff --git a/os/gcron/gcron_z_example_1_test.go b/os/gcron/gcron_z_example_1_test.go index 2e3eb9f4bd0..40027cda826 100644 --- a/os/gcron/gcron_z_example_1_test.go +++ b/os/gcron/gcron_z_example_1_test.go @@ -8,8 +8,12 @@ package gcron_test import ( "context" + "os" + "os/signal" + "syscall" "time" + "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/os/gcron" "github.com/gogf/gf/v2/os/glog" ) @@ -21,3 +25,24 @@ func ExampleCronAddSingleton() { }) select {} } + +func ExampleCronGracefulShutdown() { + _, err := gcron.Add(ctx, "*/2 * * * * *", func(ctx context.Context) { + g.Log().Debug(ctx, "Every 2s job start") + time.Sleep(5 * time.Second) + g.Log().Debug(ctx, "Every 2s job after 5 second end") + }, "MyCronJob") + if err != nil { + panic(err) + } + + quit := make(chan os.Signal, 1) + signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) + + sig := <-quit + glog.Printf(ctx, "Signal received: %s, stopping cron", sig) + + glog.Print(ctx, "Waiting for all cron jobs to complete...") + gcron.StopGracefully() + glog.Print(ctx, "All cron jobs completed") +} diff --git a/os/gcron/gcron_z_unit_test.go b/os/gcron/gcron_z_unit_test.go index 8c78a48947d..1f92a3d8e5f 100644 --- a/os/gcron/gcron_z_unit_test.go +++ b/os/gcron/gcron_z_unit_test.go @@ -9,12 +9,16 @@ package gcron_test import ( "context" "fmt" + "os" + "os/signal" + "syscall" "testing" "time" "github.com/gogf/gf/v2/container/garray" "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/os/gcron" + "github.com/gogf/gf/v2/os/glog" "github.com/gogf/gf/v2/test/gtest" ) @@ -277,3 +281,41 @@ func TestCron_DelayAddTimes(t *testing.T) { t.Assert(cron.Size(), 0) }) } + +func TestCron_JobWaiter(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + var err error + s1 := garray.New(true) + s2 := garray.New(true) + _, err = gcron.Add(ctx, "* * * * * *", func(ctx context.Context) { + g.Log().Debug(ctx, "Every second") + s1.Append(struct{}{}) + }, "MyFirstCronJob") + t.Assert(err, nil) + _, err = gcron.Add(ctx, "*/2 * * * * *", func(ctx context.Context) { + g.Log().Debug(ctx, "Every 2s job start") + time.Sleep(3 * time.Second) + s2.Append(struct{}{}) + g.Log().Debug(ctx, "Every 2s job after 3 second end") + }, "MySecondCronJob") + t.Assert(err, nil) + + quit := make(chan os.Signal, 1) + signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) + + go func() { + time.Sleep(4 * time.Second) // Ensure that the job is triggered twice + glog.Print(ctx, "Sending SIGINT") + quit <- syscall.SIGINT // Send SIGINT + }() + + sig := <-quit + glog.Printf(ctx, "Signal received: %s, stopping cron", sig) + + glog.Print(ctx, "Waiting for all cron jobs to complete...") + gcron.StopGracefully() + glog.Print(ctx, "All cron jobs completed") + t.Assert(s1.Len(), 4) + t.Assert(s2.Len(), 2) + }) +}