add distributed locker for v2 (#614)
* add distributed locker for v2

* fix logger test

* enhance logger test
JohnRoesler authored Nov 14, 2023
1 parent 3e2df30 commit 7fea987
Showing 9 changed files with 191 additions and 56 deletions.
4 changes: 4 additions & 0 deletions README.md
@@ -97,6 +97,10 @@ func main() {
- [**Elector**](https://pkg.go.dev/github.com/go-co-op/gocron/v2#WithDistributedElector):
An elector can be used to elect a single instance of gocron to run as the primary, with the
other instances checking whether a new leader needs to be elected.
- Implementations: [go-co-op electors](https://github.com/go-co-op?q=-elector&type=all&language=&sort=)
- [**Locker**](https://pkg.go.dev/github.com/go-co-op/gocron/v2#WithDistributedLocker):
A locker can be used to lock each run of a job to a single instance of gocron.
- Implementations: [go-co-op lockers](https://github.com/go-co-op?q=-lock&type=all&language=&sort=)
- **Events**: Job events can trigger actions.
- [**Listeners**](https://pkg.go.dev/github.com/go-co-op/gocron/v2#WithEventListeners):
[Event listeners](https://pkg.go.dev/github.com/go-co-op/gocron/v2#EventListener)
20 changes: 19 additions & 1 deletion distributed.go
@@ -1,7 +1,9 @@
//go:generate mockgen -source=distributed.go -destination=mocks/distributed.go -package=gocronmocks
package gocron

import "context"
import (
"context"
)

// Elector determines the leader from instances asking to be the leader. Only
// the leader runs jobs. If the leader goes down, a new leader will be elected.
@@ -10,3 +12,19 @@ type Elector interface {
// making the request and an error if the job should not be scheduled.
IsLeader(context.Context) error
}

// Locker represents the required interface to lock jobs when running multiple schedulers.
// The lock is held for the duration of the job's run, and it is expected that the
// locker implementation handles time splay between schedulers.
// The lock key passed is the job's name - which, if not set, defaults to the
// Go function's name, e.g. "pkg.myJob" for func myJob() {} in package pkg.
type Locker interface {
	// Lock attempts to acquire the lock for the given key. If an error is returned, the job will not be scheduled.
Lock(ctx context.Context, key string) (Lock, error)
}

// Lock represents an obtained lock. The lock is released after the execution of the job
// by the scheduler.
type Lock interface {
Unlock(ctx context.Context) error
}
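
For orientation, here is a minimal sketch, not part of the commit, of what implementing these two interfaces and handing the locker to a scheduler can look like. The memoryLocker and memoryLock types are hypothetical single-process stand-ins; a real deployment would coordinate through shared storage, e.g. one of the go-co-op lockers linked in the README.

package main

import (
	"context"
	"fmt"
	"sync"

	"github.com/go-co-op/gocron/v2"
)

var _ gocron.Locker = (*memoryLocker)(nil)

// memoryLocker grants a lock only if the key is not already held.
// A production Locker would also handle lock expiry so a crashed
// instance cannot hold a key forever.
type memoryLocker struct {
	mu   sync.Mutex
	held map[string]struct{}
}

func (m *memoryLocker) Lock(_ context.Context, key string) (gocron.Lock, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if _, ok := m.held[key]; ok {
		return nil, fmt.Errorf("lock already held for %q", key)
	}
	m.held[key] = struct{}{}
	return &memoryLock{locker: m, key: key}, nil
}

var _ gocron.Lock = (*memoryLock)(nil)

// memoryLock releases its key when the scheduler finishes the job run.
type memoryLock struct {
	locker *memoryLocker
	key    string
}

func (l *memoryLock) Unlock(_ context.Context) error {
	l.locker.mu.Lock()
	defer l.locker.mu.Unlock()
	delete(l.locker.held, l.key)
	return nil
}

func main() {
	locker := &memoryLocker{held: map[string]struct{}{}}
	s, _ := gocron.NewScheduler(gocron.WithDistributedLocker(locker))
	defer func() { _ = s.Shutdown() }()
}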
1 change: 1 addition & 0 deletions errors.go
@@ -30,6 +30,7 @@ var (
ErrWeeklyJobMinutesSeconds = fmt.Errorf("gocron: WeeklyJob: atTimes minutes and seconds must be between 0 and 59 inclusive")
ErrWithClockNil = fmt.Errorf("gocron: WithClock: clock must not be nil")
ErrWithDistributedElectorNil = fmt.Errorf("gocron: WithDistributedElector: elector must not be nil")
ErrWithDistributedLockerNil = fmt.Errorf("gocron: WithDistributedLocker: locker must not be nil")
ErrWithLimitConcurrentJobsZero = fmt.Errorf("gocron: WithLimitConcurrentJobs: limit must be greater than 0")
ErrWithLocationNil = fmt.Errorf("gocron: WithLocation: location must not be nil")
ErrWithLoggerNil = fmt.Errorf("gocron: WithLogger: logger must not be nil")
25 changes: 25 additions & 0 deletions example_test.go
@@ -426,6 +426,31 @@ func ExampleWithDistributedElector() {
//)
}

func ExampleWithDistributedLocker() {
//var _ Locker = (*myLocker)(nil)
//
//type myLocker struct{}
//
//func (m myLocker) Lock(ctx context.Context, key string) (Lock, error) {
// return &testLock{}, nil
//}
//
//var _ Lock = (*testLock)(nil)
//
//type testLock struct{}
//
//func (t testLock) Unlock(_ context.Context) error {
// return nil
//}
//
//locker := myLocker{}
//
//_, _ = NewScheduler(
// WithDistributedLocker(locker),
//)
}

func ExampleWithEventListeners() {
s, _ := NewScheduler()
defer func() { _ = s.Shutdown() }()
7 changes: 7 additions & 0 deletions executor.go
@@ -22,6 +22,7 @@ type executor struct {
singletonRunners map[uuid.UUID]singletonRunner
limitMode *limitModeConfig
elector Elector
locker Locker
}

type singletonRunner struct {
@@ -340,6 +341,12 @@ func (e *executor) runJob(j internalJob) {
if err := e.elector.IsLeader(j.ctx); err != nil {
return
}
} else if e.locker != nil {
lock, err := e.locker.Lock(j.ctx, j.id.String())

[Inline review thread on the line above]

okoyfoeciov (Contributor) commented on Nov 16, 2023:

Hi @JohnRoesler, do you mind if I ask a question about this line? I see that the job ID is used as the lock key, but in a distributed environment each instance creates its own copy of the same job, which results in different UUIDs for the same task. The locker therefore has no effect, because every instance tries to acquire a different lock. In v1, the job name (which is also the function name) was used as the lock key. Is this an intentional design?

JohnRoesler (Author) replied on Nov 16, 2023:

Nope - just a miss. Good call out 👍

JohnRoesler (Author) replied on Nov 16, 2023:

Gave you credit 😄 #623

if err != nil {
return
}
defer func() { _ = lock.Unlock(j.ctx) }()
}
_ = callJobFuncWithParams(j.beforeJobRuns, j.id)

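To make the thread above concrete, here is a standalone sketch, not from the commit, of why keying locks on the job ID defeats deduplication across instances, while keying on the job name (v1's behavior, restored in #623) does not. The trackingLocker type and the key strings are illustrative only.

package main

import (
	"context"
	"fmt"
	"sync"
)

// trackingLocker grants a lock only when the key is not already held,
// which is the guarantee any distributed locker provides.
type trackingLocker struct {
	mu   sync.Mutex
	held map[string]bool
}

func (l *trackingLocker) tryLock(_ context.Context, key string) bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.held[key] {
		return false
	}
	l.held[key] = true
	return true
}

func main() {
	l := &trackingLocker{held: map[string]bool{}}
	ctx := context.Background()

	// Keyed by job ID: every scheduler instance generates its own UUID for
	// the same job, so the keys never collide and each instance runs the job.
	fmt.Println(l.tryLock(ctx, "uuid-from-instance-a")) // true
	fmt.Println(l.tryLock(ctx, "uuid-from-instance-b")) // true: job runs twice

	// Keyed by job name: all instances present the same key, so only the
	// first acquisition succeeds and the job runs once.
	fmt.Println(l.tryLock(ctx, "pkg.myJob")) // true
	fmt.Println(l.tryLock(ctx, "pkg.myJob")) // false: duplicate run skipped
}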
60 changes: 30 additions & 30 deletions logger.go
@@ -2,7 +2,9 @@
package gocron

import (
"fmt"
"log"
"strings"
)

// Logger is the interface that wraps the basic logging methods
@@ -12,20 +14,20 @@ import (
// or implement your own Logger. The actual level of Log that is logged
// is handled by the implementation.
type Logger interface {
-	Debug(msg string, args ...interface{})
-	Error(msg string, args ...interface{})
-	Info(msg string, args ...interface{})
-	Warn(msg string, args ...interface{})
+	Debug(msg string, args ...any)
+	Error(msg string, args ...any)
+	Info(msg string, args ...any)
+	Warn(msg string, args ...any)
}

var _ Logger = (*noOpLogger)(nil)

type noOpLogger struct{}

-func (l noOpLogger) Debug(_ string, _ ...interface{}) {}
-func (l noOpLogger) Error(_ string, _ ...interface{}) {}
-func (l noOpLogger) Info(_ string, _ ...interface{}) {}
-func (l noOpLogger) Warn(_ string, _ ...interface{}) {}
+func (l noOpLogger) Debug(_ string, _ ...any) {}
+func (l noOpLogger) Error(_ string, _ ...any) {}
+func (l noOpLogger) Info(_ string, _ ...any)  {}
+func (l noOpLogger) Warn(_ string, _ ...any)  {}

var _ Logger = (*logger)(nil)

@@ -49,46 +51,44 @@ func NewLogger(level LogLevel) Logger {
return &logger{level: level}
}

-func (l *logger) Debug(msg string, args ...interface{}) {
+func (l *logger) Debug(msg string, args ...any) {
	if l.level < LogLevelDebug {
		return
	}
-	if len(args) == 0 {
-		log.Printf("DEBUG: %s\n", msg)
-		return
-	}
-	log.Printf("DEBUG: %s, %v\n", msg, args)
+	log.Printf("DEBUG: %s%s\n", msg, logFormatArgs(args...))
}

-func (l *logger) Error(msg string, args ...interface{}) {
+func (l *logger) Error(msg string, args ...any) {
	if l.level < LogLevelError {
		return
	}
-	if len(args) == 0 {
-		log.Printf("ERROR: %s\n", msg)
-		return
-	}
-	log.Printf("ERROR: %s, %v\n", msg, args)
+	log.Printf("ERROR: %s%s\n", msg, logFormatArgs(args...))
}

-func (l *logger) Info(msg string, args ...interface{}) {
+func (l *logger) Info(msg string, args ...any) {
	if l.level < LogLevelInfo {
		return
	}
-	if len(args) == 0 {
-		log.Printf("INFO: %s\n", msg)
-		return
-	}
-	log.Printf("INFO: %s, %v\n", msg, args)
+	log.Printf("INFO: %s%s\n", msg, logFormatArgs(args...))
}

-func (l *logger) Warn(msg string, args ...interface{}) {
+func (l *logger) Warn(msg string, args ...any) {
	if l.level < LogLevelWarn {
		return
	}
-	if len(args) == 0 {
-		log.Printf("WARN: %s\n", msg)
-		return
-	}
-	log.Printf("WARN: %s, %v\n", msg, args)
+	log.Printf("WARN: %s%s\n", msg, logFormatArgs(args...))
}
+
+func logFormatArgs(args ...any) string {
+	if len(args) == 0 {
+		return ""
+	}
+	if len(args)%2 != 0 {
+		return ", " + fmt.Sprint(args...)
+	}
+	var pairs []string
+	for i := 0; i < len(args); i += 2 {
+		pairs = append(pairs, fmt.Sprintf("%s=%v", args[i], args[i+1]))
+	}
+	return ", " + strings.Join(pairs, ", ")
+}
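
With this change, an even number of args renders as key=value pairs, an odd number falls back to fmt.Sprint, and no args appends nothing. A small usage sketch; the output comments omit the timestamp prefix that the standard log package adds:

package main

import "github.com/go-co-op/gocron/v2"

func main() {
	l := gocron.NewLogger(gocron.LogLevelDebug)
	l.Debug("job starting", "id", "1234") // DEBUG: job starting, id=1234
	l.Debug("odd arg count", "detail")    // DEBUG: odd arg count, detail
	l.Debug("no args")                    // DEBUG: no args
}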
75 changes: 52 additions & 23 deletions logger_test.go
@@ -3,6 +3,7 @@ package gocron
import (
"bytes"
"log"
"strings"
"testing"

"github.com/stretchr/testify/assert"
@@ -45,32 +46,60 @@ func TestNewLogger(t *testing.T) {
log.SetOutput(&results)
l := NewLogger(tt.level)

l.Debug("debug", "arg1", "arg2")
if tt.level >= LogLevelDebug {
assert.Contains(t, results.String(), "DEBUG: debug, [arg1 arg2]\n")
} else {
assert.Empty(t, results.String())
}
var noArgs []any
oneArg := []any{"arg1"}
twoArgs := []any{"arg1", "arg2"}
var noArgsStr []string
oneArgStr := []string{"arg1"}
twoArgsStr := []string{"arg1", "arg2"}

l.Info("info", "arg1", "arg2")
if tt.level >= LogLevelInfo {
assert.Contains(t, results.String(), "INFO: info, [arg1 arg2]\n")
} else {
assert.Empty(t, results.String())
}
for _, args := range []struct {
argsAny []any
argsStr []string
}{
{noArgs, noArgsStr},
{oneArg, oneArgStr},
{twoArgs, twoArgsStr},
} {
l.Debug("debug", args.argsAny...)
if tt.level >= LogLevelDebug {
r := results.String()
assert.Contains(t, r, "DEBUG: debug")
assert.Contains(t, r, strings.Join(args.argsStr, "="))
} else {
assert.Empty(t, results.String())
}
results.Reset()

l.Warn("warn", "arg1", "arg2")
if tt.level >= LogLevelWarn {
assert.Contains(t, results.String(), "WARN: warn, [arg1 arg2]\n")
} else {
assert.Empty(t, results.String())
}
l.Info("info", args.argsAny...)
if tt.level >= LogLevelInfo {
r := results.String()
assert.Contains(t, r, "INFO: info")
assert.Contains(t, r, strings.Join(args.argsStr, "="))
} else {
assert.Empty(t, results.String())
}
results.Reset()

l.Warn("warn", args.argsAny...)
if tt.level >= LogLevelWarn {
r := results.String()
assert.Contains(t, r, "WARN: warn")
assert.Contains(t, r, strings.Join(args.argsStr, "="))
} else {
assert.Empty(t, results.String())
}
results.Reset()

l.Error("error", "arg1", "arg2")
if tt.level >= LogLevelError {
assert.Contains(t, results.String(), "ERROR: error, [arg1 arg2]\n")
} else {
assert.Empty(t, results.String())
l.Error("error", args.argsAny...)
if tt.level >= LogLevelError {
r := results.String()
assert.Contains(t, r, "ERROR: error")
assert.Contains(t, r, strings.Join(args.argsStr, "="))
} else {
assert.Empty(t, results.String())
}
results.Reset()
}
})
}
15 changes: 15 additions & 0 deletions scheduler.go
@@ -4,6 +4,7 @@ package gocron
import (
"context"
"reflect"
"runtime"
"time"

"github.com/google/uuid"
@@ -395,6 +396,7 @@ func (s *scheduler) addOrUpdateJob(id uuid.UUID, definition JobDefinition, taskW
return nil, ErrNewJobTaskNotFunc
}

j.name = runtime.FuncForPC(taskFunc.Pointer()).Name()
j.function = tsk.function
j.parameters = tsk.parameters

@@ -533,6 +535,19 @@ func WithDistributedElector(elector Elector) SchedulerOption {
}
}

// WithDistributedLocker sets the locker to be used by multiple
// Scheduler instances to ensure that only one instance of each
// job is run.
func WithDistributedLocker(locker Locker) SchedulerOption {
return func(s *scheduler) error {
if locker == nil {
return ErrWithDistributedLockerNil
}
s.exec.locker = locker
return nil
}
}

// WithGlobalJobOptions sets JobOption's that will be applied to
// all jobs added to the scheduler. JobOption's set on the job
// itself will override if the same JobOption is set globally.
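The new j.name assignment above is what gives each job its default name, the value the Locker documentation describes as the lock key. A standalone illustration, not from the commit, of what runtime.FuncForPC yields for a task function:

package main

import (
	"fmt"
	"reflect"
	"runtime"
)

func myJob() {}

func main() {
	taskFunc := reflect.ValueOf(myJob)
	// Prints the package-qualified function name, e.g. "main.myJob".
	fmt.Println(runtime.FuncForPC(taskFunc.Pointer()).Name())
}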
40 changes: 38 additions & 2 deletions scheduler_test.go
Expand Up @@ -811,6 +811,11 @@ func TestScheduler_WithOptionsErrors(t *testing.T) {
WithDistributedElector(nil),
ErrWithDistributedElectorNil,
},
{
"WithDistributedLocker nil",
WithDistributedLocker(nil),
ErrWithDistributedLockerNil,
},
{
"WithLimitConcurrentJobs limit 0",
WithLimitConcurrentJobs(0, LimitModeWait),
@@ -1006,15 +1011,46 @@ func (t *testElector) IsLeader(ctx context.Context) error {
return nil
}

-func TestScheduler_WithDistributedElector(t *testing.T) {
+var _ Locker = (*testLocker)(nil)
+
+type testLocker struct {
+	mu        sync.Mutex
+	jobLocked bool
+}
+
+func (t *testLocker) Lock(_ context.Context, _ string) (Lock, error) {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+	if t.jobLocked {
+		return nil, fmt.Errorf("job already locked")
+	}
+	return &testLock{}, nil
+}
+
+var _ Lock = (*testLock)(nil)
+
+type testLock struct{}
+
+func (t testLock) Unlock(_ context.Context) error {
+	return nil
+}
+
+func TestScheduler_WithDistributed(t *testing.T) {
	goleak.VerifyNone(t)
	tests := []struct {
		name  string
		count int
+		opt   SchedulerOption
	}{
		{
-			"3 schedulers",
+			"3 schedulers with elector",
			3,
+			WithDistributedElector(&testElector{}),
		},
+		{
+			"3 schedulers with locker",
+			3,
+			WithDistributedLocker(&testLocker{}),
+		},
	}

