Skip to content

Commit

Permalink
add distributed locker for v2
Browse files Browse the repository at this point in the history
  • Loading branch information
JohnRoesler committed Nov 14, 2023
1 parent 3e2df30 commit 211e4ec
Show file tree
Hide file tree
Showing 8 changed files with 127 additions and 21 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ func main() {
- [**Elector**](https://pkg.go.dev/github.com/go-co-op/gocron/v2#WithDistributedElector):
An elector can be used to elect a single instance of gocron to run as the primary with the
other instances checking to see if a new leader needs to be elected.
- Implementations: [go-co-op electors](https://github.com/go-co-op?q=-elector&type=all&language=&sort=)
- [**Locker**](https://pkg.go.dev/github.com/go-co-op/gocron/v2#WithDistributedLocker):
A locker can be used to lock each run of a job to a single instance of gocron.
- Implementations: [go-co-op lockers](https://github.com/go-co-op?q=-lock&type=all&language=&sort=)
- **Events**: Job events can trigger actions.
- [**Listeners**](https://pkg.go.dev/github.com/go-co-op/gocron/v2#WithEventListeners):
[Event listeners](https://pkg.go.dev/github.com/go-co-op/gocron/v2#EventListener)
Expand Down
20 changes: 19 additions & 1 deletion distributed.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
//go:generate mockgen -source=distributed.go -destination=mocks/distributed.go -package=gocronmocks
package gocron

import "context"
import (
"context"
)

// Elector determines the leader from instances asking to be the leader. Only
// the leader runs jobs. If the leader goes down, a new leader will be elected.
Expand All @@ -10,3 +12,19 @@ type Elector interface {
// making the request and an error if the job should not be scheduled.
IsLeader(context.Context) error
}

// Locker represents the required interface to lock jobs when running multiple schedulers.
// The lock is held for the duration of the job's run, and it is expected that the
// locker implementation handles time splay between schedulers.
// The lock key passed is the job's name - which, if not set, defaults to the
// go function's name, e.g. "pkg.myJob" for func myJob() {} in pkg
type Locker interface {
// Lock if an error is returned by lock, the job will not be scheduled.
Lock(ctx context.Context, key string) (Lock, error)
}

// Lock represents an obtained lock. The lock is released after the execution of the job
// by the scheduler.
type Lock interface {
Unlock(ctx context.Context) error
}
1 change: 1 addition & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ var (
ErrWeeklyJobMinutesSeconds = fmt.Errorf("gocron: WeeklyJob: atTimes minutes and seconds must be between 0 and 59 inclusive")
ErrWithClockNil = fmt.Errorf("gocron: WithClock: clock must not be nil")
ErrWithDistributedElectorNil = fmt.Errorf("gocron: WithDistributedElector: elector must not be nil")
ErrWithDistributedLockerNil = fmt.Errorf("gocron: WithDistributedLocker: locker must not be nil")
ErrWithLimitConcurrentJobsZero = fmt.Errorf("gocron: WithLimitConcurrentJobs: limit must be greater than 0")
ErrWithLocationNil = fmt.Errorf("gocron: WithLocation: location must not be nil")
ErrWithLoggerNil = fmt.Errorf("gocron: WithLogger: logger must not be nil")
Expand Down
25 changes: 25 additions & 0 deletions example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,31 @@ func ExampleWithDistributedElector() {
//)
}

func ExampleWithDistributedLocker() {
//var _ Locker = (*myLocker)(nil)
//
//type myLocker struct{}
//
//func (m myLocker) Lock(ctx context.Context, key string) (Lock, error) {
// return &testLock, nil
//}
//
//var _ Lock = (*testLock)(nil)
//
//type testLock struct {
//}
//
//func (t testLock) Unlock(_ context.Context) error {
// return nil
//}
//
//locker := myLocker{}
//
//_, _ = NewScheduler(
// WithDistributedLocker(locker),
//)
}

func ExampleWithEventListeners() {
s, _ := NewScheduler()
defer func() { _ = s.Shutdown() }()
Expand Down
7 changes: 7 additions & 0 deletions executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type executor struct {
singletonRunners map[uuid.UUID]singletonRunner
limitMode *limitModeConfig
elector Elector
locker Locker
}

type singletonRunner struct {
Expand Down Expand Up @@ -340,6 +341,12 @@ func (e *executor) runJob(j internalJob) {
if err := e.elector.IsLeader(j.ctx); err != nil {
return
}
} else if e.locker != nil {
lock, err := e.locker.Lock(j.ctx, j.id.String())
if err != nil {
return
}
defer func() { _ = lock.Unlock(j.ctx) }()
}
_ = callJobFuncWithParams(j.beforeJobRuns, j.id)

Expand Down
36 changes: 18 additions & 18 deletions logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
package gocron

import (
"fmt"
"log"
"strings"
)

// Logger is the interface that wraps the basic logging methods
Expand Down Expand Up @@ -53,42 +55,40 @@ func (l *logger) Debug(msg string, args ...interface{}) {
if l.level < LogLevelDebug {
return
}
if len(args) == 0 {
log.Printf("DEBUG: %s\n", msg)
return
}
log.Printf("DEBUG: %s, %v\n", msg, args)
log.Printf("DEBUG: %s%s\n", msg, logFormatArgs(args...))
}

func (l *logger) Error(msg string, args ...interface{}) {
if l.level < LogLevelError {
return
}
if len(args) == 0 {
log.Printf("ERROR: %s\n", msg)
return
}
log.Printf("ERROR: %s, %v\n", msg, args)
log.Printf("ERROR: %s%s\n", msg, logFormatArgs(args...))
}

func (l *logger) Info(msg string, args ...interface{}) {
if l.level < LogLevelInfo {
return
}
if len(args) == 0 {
log.Printf("INFO: %s\n", msg)
return
}
log.Printf("INFO: %s, %v\n", msg, args)
log.Printf("INFO: %s%s\n", msg, logFormatArgs(args...))
}

func (l *logger) Warn(msg string, args ...interface{}) {
if l.level < LogLevelWarn {
return
}
log.Printf("WARN: %s%s\n", msg, logFormatArgs(args...))
}

func logFormatArgs(args ...interface{}) string {
if len(args) == 0 {
log.Printf("WARN: %s\n", msg)
return
return ""
}
if len(args)%2 != 0 {
return ", " + fmt.Sprint(args...)
}
var pairs []string
for i := 0; i < len(args); i += 2 {
pairs = append(pairs, fmt.Sprintf("%s=%v", args[i], args[i+1]))
}
log.Printf("WARN: %s, %v\n", msg, args)
return ", " + strings.Join(pairs, ", ")
}
15 changes: 15 additions & 0 deletions scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package gocron
import (
"context"
"reflect"
"runtime"
"time"

"github.com/google/uuid"
Expand Down Expand Up @@ -395,6 +396,7 @@ func (s *scheduler) addOrUpdateJob(id uuid.UUID, definition JobDefinition, taskW
return nil, ErrNewJobTaskNotFunc
}

j.name = runtime.FuncForPC(taskFunc.Pointer()).Name()
j.function = tsk.function
j.parameters = tsk.parameters

Expand Down Expand Up @@ -533,6 +535,19 @@ func WithDistributedElector(elector Elector) SchedulerOption {
}
}

// WithDistributedLocker sets the locker to be used by multiple
// Scheduler instances to ensure that only one instance of each
// job is run.
func WithDistributedLocker(locker Locker) SchedulerOption {
return func(s *scheduler) error {
if locker == nil {
return ErrWithDistributedLockerNil
}
s.exec.locker = locker
return nil
}
}

// WithGlobalJobOptions sets JobOption's that will be applied to
// all jobs added to the scheduler. JobOption's set on the job
// itself will override if the same JobOption is set globally.
Expand Down
40 changes: 38 additions & 2 deletions scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -811,6 +811,11 @@ func TestScheduler_WithOptionsErrors(t *testing.T) {
WithDistributedElector(nil),
ErrWithDistributedElectorNil,
},
{
"WithDistributedLocker nil",
WithDistributedLocker(nil),
ErrWithDistributedLockerNil,
},
{
"WithLimitConcurrentJobs limit 0",
WithLimitConcurrentJobs(0, LimitModeWait),
Expand Down Expand Up @@ -1006,15 +1011,46 @@ func (t *testElector) IsLeader(ctx context.Context) error {
return nil
}

func TestScheduler_WithDistributedElector(t *testing.T) {
var _ Locker = (*testLocker)(nil)

type testLocker struct {
mu sync.Mutex
jobLocked bool
}

func (t *testLocker) Lock(_ context.Context, _ string) (Lock, error) {
t.mu.Lock()
defer t.mu.Unlock()
if t.jobLocked {
return nil, fmt.Errorf("job already locked")
}
return &testLock{}, nil
}

var _ Lock = (*testLock)(nil)

type testLock struct{}

func (t testLock) Unlock(_ context.Context) error {
return nil
}

func TestScheduler_WithDistributed(t *testing.T) {
goleak.VerifyNone(t)
tests := []struct {
name string
count int
opt SchedulerOption
}{
{
"3 schedulers",
"3 schedulers with elector",
3,
WithDistributedElector(&testElector{}),
},
{
"3 schedulers with locker",
3,
WithDistributedLocker(&testLocker{}),
},
}

Expand Down

0 comments on commit 211e4ec

Please sign in to comment.