From 7fea9871373f57ba9e66edf82fd667ab4443af76 Mon Sep 17 00:00:00 2001 From: John Roesler Date: Tue, 14 Nov 2023 09:56:05 -0600 Subject: [PATCH] add distributed locker for v2 (#614) * add distributed locker for v2 * fix logger test * enhance logger test --- README.md | 4 +++ distributed.go | 20 ++++++++++++- errors.go | 1 + example_test.go | 25 ++++++++++++++++ executor.go | 7 +++++ logger.go | 60 ++++++++++++++++++------------------- logger_test.go | 75 ++++++++++++++++++++++++++++++++--------------- scheduler.go | 15 ++++++++++ scheduler_test.go | 40 +++++++++++++++++++++++-- 9 files changed, 191 insertions(+), 56 deletions(-) diff --git a/README.md b/README.md index 3056c656..8c4dc6c1 100644 --- a/README.md +++ b/README.md @@ -97,6 +97,10 @@ func main() { - [**Elector**](https://pkg.go.dev/github.com/go-co-op/gocron/v2#WithDistributedElector): An elector can be used to elect a single instance of gocron to run as the primary with the other instances checking to see if a new leader needs to be elected. + - Implementations: [go-co-op electors](https://github.com/go-co-op?q=-elector&type=all&language=&sort=) + - [**Locker**](https://pkg.go.dev/github.com/go-co-op/gocron/v2#WithDistributedLocker): + A locker can be used to lock each run of a job to a single instance of gocron. + - Implementations: [go-co-op lockers](https://github.com/go-co-op?q=-lock&type=all&language=&sort=) - **Events**: Job events can trigger actions. - [**Listeners**](https://pkg.go.dev/github.com/go-co-op/gocron/v2#WithEventListeners): [Event listeners](https://pkg.go.dev/github.com/go-co-op/gocron/v2#EventListener) diff --git a/distributed.go b/distributed.go index 7ca9f092..60d1b84a 100644 --- a/distributed.go +++ b/distributed.go @@ -1,7 +1,9 @@ //go:generate mockgen -source=distributed.go -destination=mocks/distributed.go -package=gocronmocks package gocron -import "context" +import ( + "context" +) // Elector determines the leader from instances asking to be the leader. Only // the leader runs jobs. If the leader goes down, a new leader will be elected. @@ -10,3 +12,19 @@ type Elector interface { // making the request and an error if the job should not be scheduled. IsLeader(context.Context) error } + +// Locker represents the required interface to lock jobs when running multiple schedulers. +// The lock is held for the duration of the job's run, and it is expected that the +// locker implementation handles time splay between schedulers. +// The lock key passed is the job's name - which, if not set, defaults to the +// go function's name, e.g. "pkg.myJob" for func myJob() {} in pkg +type Locker interface { + // Lock if an error is returned by lock, the job will not be scheduled. + Lock(ctx context.Context, key string) (Lock, error) +} + +// Lock represents an obtained lock. The lock is released after the execution of the job +// by the scheduler. +type Lock interface { + Unlock(ctx context.Context) error +} diff --git a/errors.go b/errors.go index b9e46694..f5ebef4d 100644 --- a/errors.go +++ b/errors.go @@ -30,6 +30,7 @@ var ( ErrWeeklyJobMinutesSeconds = fmt.Errorf("gocron: WeeklyJob: atTimes minutes and seconds must be between 0 and 59 inclusive") ErrWithClockNil = fmt.Errorf("gocron: WithClock: clock must not be nil") ErrWithDistributedElectorNil = fmt.Errorf("gocron: WithDistributedElector: elector must not be nil") + ErrWithDistributedLockerNil = fmt.Errorf("gocron: WithDistributedLocker: locker must not be nil") ErrWithLimitConcurrentJobsZero = fmt.Errorf("gocron: WithLimitConcurrentJobs: limit must be greater than 0") ErrWithLocationNil = fmt.Errorf("gocron: WithLocation: location must not be nil") ErrWithLoggerNil = fmt.Errorf("gocron: WithLogger: logger must not be nil") diff --git a/example_test.go b/example_test.go index 73986251..8deef9b0 100644 --- a/example_test.go +++ b/example_test.go @@ -426,6 +426,31 @@ func ExampleWithDistributedElector() { //) } +func ExampleWithDistributedLocker() { + //var _ Locker = (*myLocker)(nil) + // + //type myLocker struct{} + // + //func (m myLocker) Lock(ctx context.Context, key string) (Lock, error) { + // return &testLock, nil + //} + // + //var _ Lock = (*testLock)(nil) + // + //type testLock struct { + //} + // + //func (t testLock) Unlock(_ context.Context) error { + // return nil + //} + // + //locker := myLocker{} + // + //_, _ = NewScheduler( + // WithDistributedLocker(locker), + //) +} + func ExampleWithEventListeners() { s, _ := NewScheduler() defer func() { _ = s.Shutdown() }() diff --git a/executor.go b/executor.go index 2bfe5eb1..6ead0b9f 100644 --- a/executor.go +++ b/executor.go @@ -22,6 +22,7 @@ type executor struct { singletonRunners map[uuid.UUID]singletonRunner limitMode *limitModeConfig elector Elector + locker Locker } type singletonRunner struct { @@ -340,6 +341,12 @@ func (e *executor) runJob(j internalJob) { if err := e.elector.IsLeader(j.ctx); err != nil { return } + } else if e.locker != nil { + lock, err := e.locker.Lock(j.ctx, j.id.String()) + if err != nil { + return + } + defer func() { _ = lock.Unlock(j.ctx) }() } _ = callJobFuncWithParams(j.beforeJobRuns, j.id) diff --git a/logger.go b/logger.go index ae7c8bbf..7a2271d8 100644 --- a/logger.go +++ b/logger.go @@ -2,7 +2,9 @@ package gocron import ( + "fmt" "log" + "strings" ) // Logger is the interface that wraps the basic logging methods @@ -12,20 +14,20 @@ import ( // or implement your own Logger. The actual level of Log that is logged // is handled by the implementation. type Logger interface { - Debug(msg string, args ...interface{}) - Error(msg string, args ...interface{}) - Info(msg string, args ...interface{}) - Warn(msg string, args ...interface{}) + Debug(msg string, args ...any) + Error(msg string, args ...any) + Info(msg string, args ...any) + Warn(msg string, args ...any) } var _ Logger = (*noOpLogger)(nil) type noOpLogger struct{} -func (l noOpLogger) Debug(_ string, _ ...interface{}) {} -func (l noOpLogger) Error(_ string, _ ...interface{}) {} -func (l noOpLogger) Info(_ string, _ ...interface{}) {} -func (l noOpLogger) Warn(_ string, _ ...interface{}) {} +func (l noOpLogger) Debug(_ string, _ ...any) {} +func (l noOpLogger) Error(_ string, _ ...any) {} +func (l noOpLogger) Info(_ string, _ ...any) {} +func (l noOpLogger) Warn(_ string, _ ...any) {} var _ Logger = (*logger)(nil) @@ -49,46 +51,44 @@ func NewLogger(level LogLevel) Logger { return &logger{level: level} } -func (l *logger) Debug(msg string, args ...interface{}) { +func (l *logger) Debug(msg string, args ...any) { if l.level < LogLevelDebug { return } - if len(args) == 0 { - log.Printf("DEBUG: %s\n", msg) - return - } - log.Printf("DEBUG: %s, %v\n", msg, args) + log.Printf("DEBUG: %s%s\n", msg, logFormatArgs(args...)) } -func (l *logger) Error(msg string, args ...interface{}) { +func (l *logger) Error(msg string, args ...any) { if l.level < LogLevelError { return } - if len(args) == 0 { - log.Printf("ERROR: %s\n", msg) - return - } - log.Printf("ERROR: %s, %v\n", msg, args) + log.Printf("ERROR: %s%s\n", msg, logFormatArgs(args...)) } -func (l *logger) Info(msg string, args ...interface{}) { +func (l *logger) Info(msg string, args ...any) { if l.level < LogLevelInfo { return } - if len(args) == 0 { - log.Printf("INFO: %s\n", msg) - return - } - log.Printf("INFO: %s, %v\n", msg, args) + log.Printf("INFO: %s%s\n", msg, logFormatArgs(args...)) } -func (l *logger) Warn(msg string, args ...interface{}) { +func (l *logger) Warn(msg string, args ...any) { if l.level < LogLevelWarn { return } + log.Printf("WARN: %s%s\n", msg, logFormatArgs(args...)) +} + +func logFormatArgs(args ...any) string { if len(args) == 0 { - log.Printf("WARN: %s\n", msg) - return + return "" + } + if len(args)%2 != 0 { + return ", " + fmt.Sprint(args...) + } + var pairs []string + for i := 0; i < len(args); i += 2 { + pairs = append(pairs, fmt.Sprintf("%s=%v", args[i], args[i+1])) } - log.Printf("WARN: %s, %v\n", msg, args) + return ", " + strings.Join(pairs, ", ") } diff --git a/logger_test.go b/logger_test.go index d4bdb860..b2691f04 100644 --- a/logger_test.go +++ b/logger_test.go @@ -3,6 +3,7 @@ package gocron import ( "bytes" "log" + "strings" "testing" "github.com/stretchr/testify/assert" @@ -45,32 +46,60 @@ func TestNewLogger(t *testing.T) { log.SetOutput(&results) l := NewLogger(tt.level) - l.Debug("debug", "arg1", "arg2") - if tt.level >= LogLevelDebug { - assert.Contains(t, results.String(), "DEBUG: debug, [arg1 arg2]\n") - } else { - assert.Empty(t, results.String()) - } + var noArgs []any + oneArg := []any{"arg1"} + twoArgs := []any{"arg1", "arg2"} + var noArgsStr []string + oneArgStr := []string{"arg1"} + twoArgsStr := []string{"arg1", "arg2"} - l.Info("info", "arg1", "arg2") - if tt.level >= LogLevelInfo { - assert.Contains(t, results.String(), "INFO: info, [arg1 arg2]\n") - } else { - assert.Empty(t, results.String()) - } + for _, args := range []struct { + argsAny []any + argsStr []string + }{ + {noArgs, noArgsStr}, + {oneArg, oneArgStr}, + {twoArgs, twoArgsStr}, + } { + l.Debug("debug", args.argsAny...) + if tt.level >= LogLevelDebug { + r := results.String() + assert.Contains(t, r, "DEBUG: debug") + assert.Contains(t, r, strings.Join(args.argsStr, "=")) + } else { + assert.Empty(t, results.String()) + } + results.Reset() - l.Warn("warn", "arg1", "arg2") - if tt.level >= LogLevelWarn { - assert.Contains(t, results.String(), "WARN: warn, [arg1 arg2]\n") - } else { - assert.Empty(t, results.String()) - } + l.Info("info", args.argsAny...) + if tt.level >= LogLevelInfo { + r := results.String() + assert.Contains(t, r, "INFO: info") + assert.Contains(t, r, strings.Join(args.argsStr, "=")) + } else { + assert.Empty(t, results.String()) + } + results.Reset() + + l.Warn("warn", args.argsAny...) + if tt.level >= LogLevelWarn { + r := results.String() + assert.Contains(t, r, "WARN: warn") + assert.Contains(t, r, strings.Join(args.argsStr, "=")) + } else { + assert.Empty(t, results.String()) + } + results.Reset() - l.Error("error", "arg1", "arg2") - if tt.level >= LogLevelError { - assert.Contains(t, results.String(), "ERROR: error, [arg1 arg2]\n") - } else { - assert.Empty(t, results.String()) + l.Error("error", args.argsAny...) + if tt.level >= LogLevelError { + r := results.String() + assert.Contains(t, r, "ERROR: error") + assert.Contains(t, r, strings.Join(args.argsStr, "=")) + } else { + assert.Empty(t, results.String()) + } + results.Reset() } }) } diff --git a/scheduler.go b/scheduler.go index 8bac3922..1ab56b21 100644 --- a/scheduler.go +++ b/scheduler.go @@ -4,6 +4,7 @@ package gocron import ( "context" "reflect" + "runtime" "time" "github.com/google/uuid" @@ -395,6 +396,7 @@ func (s *scheduler) addOrUpdateJob(id uuid.UUID, definition JobDefinition, taskW return nil, ErrNewJobTaskNotFunc } + j.name = runtime.FuncForPC(taskFunc.Pointer()).Name() j.function = tsk.function j.parameters = tsk.parameters @@ -533,6 +535,19 @@ func WithDistributedElector(elector Elector) SchedulerOption { } } +// WithDistributedLocker sets the locker to be used by multiple +// Scheduler instances to ensure that only one instance of each +// job is run. +func WithDistributedLocker(locker Locker) SchedulerOption { + return func(s *scheduler) error { + if locker == nil { + return ErrWithDistributedLockerNil + } + s.exec.locker = locker + return nil + } +} + // WithGlobalJobOptions sets JobOption's that will be applied to // all jobs added to the scheduler. JobOption's set on the job // itself will override if the same JobOption is set globally. diff --git a/scheduler_test.go b/scheduler_test.go index ee8f7588..2596a6bb 100644 --- a/scheduler_test.go +++ b/scheduler_test.go @@ -811,6 +811,11 @@ func TestScheduler_WithOptionsErrors(t *testing.T) { WithDistributedElector(nil), ErrWithDistributedElectorNil, }, + { + "WithDistributedLocker nil", + WithDistributedLocker(nil), + ErrWithDistributedLockerNil, + }, { "WithLimitConcurrentJobs limit 0", WithLimitConcurrentJobs(0, LimitModeWait), @@ -1006,15 +1011,46 @@ func (t *testElector) IsLeader(ctx context.Context) error { return nil } -func TestScheduler_WithDistributedElector(t *testing.T) { +var _ Locker = (*testLocker)(nil) + +type testLocker struct { + mu sync.Mutex + jobLocked bool +} + +func (t *testLocker) Lock(_ context.Context, _ string) (Lock, error) { + t.mu.Lock() + defer t.mu.Unlock() + if t.jobLocked { + return nil, fmt.Errorf("job already locked") + } + return &testLock{}, nil +} + +var _ Lock = (*testLock)(nil) + +type testLock struct{} + +func (t testLock) Unlock(_ context.Context) error { + return nil +} + +func TestScheduler_WithDistributed(t *testing.T) { goleak.VerifyNone(t) tests := []struct { name string count int + opt SchedulerOption }{ { - "3 schedulers", + "3 schedulers with elector", + 3, + WithDistributedElector(&testElector{}), + }, + { + "3 schedulers with locker", 3, + WithDistributedLocker(&testLocker{}), }, }