diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 6d37a07..5f60c34 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -19,7 +19,7 @@ jobs: - name: Set up Go uses: actions/setup-go@v2 with: - go-version: 1.16 + go-version: 1.18 - name: Install cover run: go get golang.org/x/tools/cmd/cover @@ -30,7 +30,8 @@ jobs: - name: Test run: go test --race -v -covermode=atomic -coverprofile=coverage.out ./... - - name: Update Coveralls - env: - COVERALLS_TOKEN: ${{ secrets.GITHUB_TOKEN }} - run: goveralls -coverprofile=coverage.out -service=github + - name: Upload coverage reports to Codecov + uses: codecov/codecov-action@v4.0.1 + with: + token: ${{ secrets.CODECOV_TOKEN }} + slug: madflojo/tasks \ No newline at end of file diff --git a/README.md b/README.md index c4fd8f8..0a00c78 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # Tasks -[![Coverage Status](https://coveralls.io/repos/github/madflojo/tasks/badge.svg?branch=main)](https://coveralls.io/github/madflojo/tasks?branch=main) +[![codecov](https://codecov.io/gh/madflojo/tasks/graph/badge.svg?token=882QTXA7PX)](https://codecov.io/gh/madflojo/tasks) [![Go Report Card](https://goreportcard.com/badge/github.com/madflojo/tasks)](https://goreportcard.com/report/github.com/madflojo/tasks) [![PkgGoDev](https://pkg.go.dev/badge/github.com/madflojo/tasks)](https://pkg.go.dev/github.com/madflojo/tasks) diff --git a/go.mod b/go.mod index c8536a1..cd41374 100644 --- a/go.mod +++ b/go.mod @@ -1,5 +1,5 @@ module github.com/madflojo/tasks -go 1.17 +go 1.18 require github.com/rs/xid v1.5.0 diff --git a/tasks.go b/tasks.go index 3bf5299..42f8ada 100644 --- a/tasks.go +++ b/tasks.go @@ -91,10 +91,7 @@ import ( // Task contains the scheduled task details and control mechanisms. This struct is used during the creation of tasks. // It allows users to control how and when tasks are executed. type Task struct { - sync.Mutex - - // id is the Unique ID created for each task. This ID is generated by the Add() function. - id string + sync.RWMutex // TaskContext allows for user-defined context that is passed to task functions. TaskContext TaskContext @@ -120,6 +117,14 @@ type Task struct { // the task self deleting. RunOnce bool + // RunSingleInstance is used to set a task as a single instance task. By default, tasks will continue executing at + // the interval specified until deleted. With RunSingleInstance enabled a subsequent task execution will be skipped + // if the previous task execution is still running. + // + // This is useful for tasks that may take longer than the interval to execute. This will prevent multiple instances + // of the same task from running concurrently. + RunSingleInstance bool + // StartAfter is used to specify a start time for the scheduler. When set, tasks will wait for the specified // time to start the schedule timer. StartAfter time.Time @@ -148,6 +153,12 @@ type Task struct { // Either ErrFunc or ErrFuncWithTaskContext must be defined. If both are defined, ErrFuncWithTaskContext will be used. ErrFuncWithTaskContext func(TaskContext, error) + // id is the Unique ID created for each task. This ID is generated by the Add() function. + id string + + // running is used for RunSingleInstance tasks to track whether a previous invocation is still running. + running sync.Mutex + // timer is the internal task timer. This is stored here to provide control via main scheduler functions. timer *time.Timer @@ -360,6 +371,15 @@ func (schd *Scheduler) scheduleTask(t *Task) { // execTask is the underlying scheduler, it is used to trigger and execute tasks. func (schd *Scheduler) execTask(t *Task) { go func() { + if t.RunSingleInstance { + if !t.running.TryLock() { + // Skip execution if task is already running + return + } + defer t.running.Unlock() + } + + // Execute task var err error if t.FuncWithTaskContext != nil { err = t.FuncWithTaskContext(t.TaskContext) @@ -373,10 +393,14 @@ func (schd *Scheduler) execTask(t *Task) { go t.ErrFunc(err) } } + + // If RunOnce is set, delete the task after execution if t.RunOnce { defer schd.Del(t.id) } }() + + // Reschedule task for next execution if !t.RunOnce { t.safeOps(func() { t.timer.Reset(t.Interval) @@ -402,6 +426,7 @@ func (t *Task) Clone() *Task { task.Interval = t.Interval task.StartAfter = t.StartAfter task.RunOnce = t.RunOnce + task.RunSingleInstance = t.RunSingleInstance task.id = t.id task.ctx = t.ctx task.cancel = t.cancel diff --git a/tasks_benchmarks_test.go b/tasks_benchmarks_test.go index e2c28d8..82deb00 100644 --- a/tasks_benchmarks_test.go +++ b/tasks_benchmarks_test.go @@ -14,7 +14,7 @@ func BenchmarkTasks(b *testing.B) { taskID, err := scheduler.Add(&Task{ Interval: time.Duration(1 * time.Minute), TaskFunc: func() error { return nil }, - ErrFunc: func(e error) {}, + ErrFunc: func(_ error) {}, }) if err != nil { b.Fatalf("Unable to schedule example task - %s", err) diff --git a/tasks_test.go b/tasks_test.go index 1a5e65a..8a9288f 100644 --- a/tasks_test.go +++ b/tasks_test.go @@ -3,6 +3,7 @@ package tasks import ( "context" "fmt" + "sync" "testing" "time" @@ -25,6 +26,33 @@ type ExecutionTestCase struct { callsFunc bool } +type Counter struct { + sync.RWMutex + val int +} + +func NewCounter() *Counter { + return &Counter{} +} + +func (c *Counter) Inc() { + c.Lock() + defer c.Unlock() + c.val++ +} + +func (c *Counter) Dec() { + c.Lock() + defer c.Unlock() + c.val-- +} + +func (c *Counter) Val() int { + c.RLock() + defer c.RUnlock() + return c.val +} + func TestTasksInterface(t *testing.T) { var tt []InterfaceTestCase @@ -50,7 +78,7 @@ func TestTasksInterface(t *testing.T) { task: &Task{ Interval: time.Duration(1 * time.Second), TaskFunc: func() error { return nil }, - ErrFunc: func(e error) {}, + ErrFunc: func(_ error) {}, }, }) @@ -59,7 +87,7 @@ func TestTasksInterface(t *testing.T) { task: &Task{ Interval: time.Duration(1 * time.Second), TaskFunc: func() error { return nil }, - ErrFunc: func(e error) {}, + ErrFunc: func(_ error) {}, TaskContext: TaskContext{Context: context.Background()}, }, }) @@ -69,7 +97,7 @@ func TestTasksInterface(t *testing.T) { task: &Task{ Interval: time.Duration(1 * time.Second), FuncWithTaskContext: func(_ TaskContext) error { return nil }, - ErrFuncWithTaskContext: func(_ TaskContext, e error) {}, + ErrFuncWithTaskContext: func(_ TaskContext, _ error) {}, TaskContext: TaskContext{Context: context.Background()}, }, }) @@ -79,7 +107,7 @@ func TestTasksInterface(t *testing.T) { task: &Task{ Interval: time.Duration(1 * time.Second), FuncWithTaskContext: func(_ TaskContext) error { return nil }, - ErrFuncWithTaskContext: func(_ TaskContext, e error) {}, + ErrFuncWithTaskContext: func(_ TaskContext, _ error) {}, }, }) @@ -268,7 +296,7 @@ func TestTaskExecution(t *testing.T) { tc4.cancel() return nil }, - ErrFunc: func(e error) { + ErrFunc: func(error) { t.Errorf("ErrFunc should not be called") }, } @@ -283,11 +311,11 @@ func TestTaskExecution(t *testing.T) { tc5.task = &Task{ Interval: time.Duration(1 * time.Second), TaskContext: TaskContext{Context: tc5.ctx}, - FuncWithTaskContext: func(taskCtx TaskContext) error { + FuncWithTaskContext: func(_ TaskContext) error { tc5.cancel() return nil }, - ErrFuncWithTaskContext: func(taskCtx TaskContext, e error) { + ErrFuncWithTaskContext: func(_ TaskContext, _ error) { t.Errorf("ErrFuncWithTaskContext should not be called") }, } @@ -324,7 +352,7 @@ func TestTaskExecution(t *testing.T) { Interval: time.Duration(1 * time.Second), StartAfter: tc7StartAfter, TaskContext: TaskContext{Context: tc7.ctx}, - FuncWithTaskContext: func(taskCtx TaskContext) error { + FuncWithTaskContext: func(_ TaskContext) error { if time.Now().Before(tc7StartAfter) { t.Errorf("Task should not have been called before StartAfter time") return nil @@ -379,7 +407,7 @@ func TestAdd(t *testing.T) { id, err := scheduler.Add(&Task{ Interval: time.Duration(1 * time.Minute), TaskFunc: func() error { return nil }, - ErrFunc: func(e error) {}, + ErrFunc: func(_ error) {}, }) if err != nil { t.Errorf("Unexpected errors when scheduling a valid task - %s", err) @@ -402,7 +430,7 @@ func TestAdd(t *testing.T) { err := scheduler.AddWithID(id.String(), &Task{ Interval: time.Duration(1 * time.Minute), TaskFunc: func() error { return nil }, - ErrFunc: func(e error) {}, + ErrFunc: func(_ error) {}, }) if err != nil { t.Errorf("Unexpected errors when scheduling a valid task - %s", err) @@ -431,7 +459,7 @@ func TestAdd(t *testing.T) { doneCh <- struct{}{} return nil }, - ErrFunc: func(e error) {}, + ErrFunc: func(_ error) {}, }) if err != nil { t.Errorf("Unexpected errors when scheduling a valid task - %s", err) @@ -440,7 +468,7 @@ func TestAdd(t *testing.T) { err = scheduler.AddWithID(id, &Task{ Interval: time.Duration(1 * time.Minute), TaskFunc: func() error { return nil }, - ErrFunc: func(e error) {}, + ErrFunc: func(_ error) {}, }) if err != ErrIDInUse { t.Errorf("Expected error for task with existing id") @@ -455,7 +483,7 @@ func TestAdd(t *testing.T) { t.Run("Check for nil callback", func(t *testing.T) { _, err := scheduler.Add(&Task{ Interval: time.Duration(1 * time.Minute), - ErrFunc: func(e error) {}, + ErrFunc: func(_ error) {}, }) if err == nil { t.Errorf("Unexpected success when scheduling an invalid task - %s", err) @@ -465,7 +493,7 @@ func TestAdd(t *testing.T) { t.Run("Check for nil interval", func(t *testing.T) { _, err := scheduler.Add(&Task{ TaskFunc: func() error { return nil }, - ErrFunc: func(e error) {}, + ErrFunc: func(_ error) {}, }) if err == nil { t.Errorf("Unexpected success when scheduling an invalid task - %s", err) @@ -476,6 +504,7 @@ func TestAdd(t *testing.T) { func TestScheduler(t *testing.T) { // Create a base scheduler to use scheduler := New() + defer scheduler.Stop() t.Run("Verify Tasks Run when Added", func(t *testing.T) { // Channel for orchestrating when the task ran @@ -488,7 +517,7 @@ func TestScheduler(t *testing.T) { doneCh <- struct{}{} return nil }, - ErrFunc: func(e error) {}, + ErrFunc: func(_ error) {}, }) if err != nil { t.Errorf("Unexpected errors when scheduling a valid task - %s", err) @@ -521,7 +550,7 @@ func TestScheduler(t *testing.T) { cancel() return fmt.Errorf("Fake Error") }, - ErrFuncWithTaskContext: func(ctx TaskContext, e error) { + ErrFuncWithTaskContext: func(ctx TaskContext, _ error) { if ctx.Context != nil && ctx.Context.Err() == context.Canceled { doneCh <- struct{}{} } @@ -558,7 +587,7 @@ func TestScheduler(t *testing.T) { doneCh <- struct{}{} return nil }, - ErrFunc: func(e error) {}, + ErrFunc: func(_ error) {}, }) if err != nil { t.Errorf("Unexpected errors when scheduling a valid task - %s", err) @@ -581,6 +610,7 @@ func TestScheduler(t *testing.T) { func TestSchedulerDoesntRun(t *testing.T) { // Create a base scheduler to use scheduler := New() + defer scheduler.Stop() t.Run("Verify Cancelling a StartAfter works as expected", func(t *testing.T) { // Channel for orchestrating when the task ran @@ -597,7 +627,7 @@ func TestSchedulerDoesntRun(t *testing.T) { doneCh <- struct{}{} return nil }, - ErrFunc: func(e error) {}, + ErrFunc: func(_ error) {}, }) if err != nil { t.Errorf("Unexpected errors when scheduling a valid task - %s", err) @@ -627,7 +657,7 @@ func TestSchedulerDoesntRun(t *testing.T) { doneCh <- struct{}{} return nil }, - ErrFunc: func(e error) {}, + ErrFunc: func(_ error) {}, }) if err != nil { t.Errorf("Unexpected errors when scheduling a valid task - %s", err) @@ -658,6 +688,7 @@ func TestSchedulerDoesntRun(t *testing.T) { func TestSchedulerExtras(t *testing.T) { // Create a base scheduler to use scheduler := New() + defer scheduler.Stop() t.Run("Verify RunOnce works as expected", func(t *testing.T) { // Channel for orchestrating when the task ran @@ -671,7 +702,7 @@ func TestSchedulerExtras(t *testing.T) { doneCh <- struct{}{} return nil }, - ErrFunc: func(e error) {}, + ErrFunc: func(_ error) {}, }) if err != nil { t.Errorf("Unexpected errors when scheduling a valid task - %s", err) @@ -703,7 +734,7 @@ func TestSchedulerExtras(t *testing.T) { _, err := scheduler.Add(&Task{ Interval: time.Duration(1 * time.Second), TaskFunc: func() error { return fmt.Errorf("Errors are bad") }, - ErrFunc: func(e error) { doneCh <- struct{}{} }, + ErrFunc: func(_ error) { doneCh <- struct{}{} }, }) if err != nil { t.Errorf("Unexpected errors when scheduling a valid task - %s", err) @@ -718,3 +749,56 @@ func TestSchedulerExtras(t *testing.T) { } }) } + +func TestSingleInstance(t *testing.T) { + // Create a base scheduler to use + scheduler := New() + defer scheduler.Stop() + + // Create a counter to track how many times the task is called + counter := NewCounter() + + // Create a second counter to track number of executions + counter2 := NewCounter() + + // Create an error channel to signal failure + errCh := make(chan error) + + // Add a task that will increment the counter + _, err := scheduler.Add(&Task{ + Interval: time.Duration(500 * time.Millisecond), + RunSingleInstance: true, + TaskFunc: func() error { + // Increment Concurrent Counter + counter.Inc() + if counter.Val() > 1 { + return fmt.Errorf("Task ran more than once - count %d", counter.Val()) + } + // Increment Execution Counter + counter2.Inc() + + // Wait for 10 seconds + <-time.After(5 * time.Second) + + // Decrement Concurrent Counter + counter.Dec() + return nil + }, + ErrFunc: func(e error) { + errCh <- e + }, + }) + if err != nil { + t.Fatalf("Unexpected errors when scheduling task - %s", err) + } + + // Wait for tasks to run and if no error, then we are good + select { + case <-time.After(30 * time.Second): + if counter2.Val() < 4 { + t.Fatalf("Task was not called more than once successfully - count %d", counter2.Val()) + } + case e := <-errCh: + t.Fatalf("Error function was called - %s", e) + } +}