Skip to content

Commit

Permalink
Merge Lock Renewal feature to V2 (#107)
Browse files Browse the repository at this point in the history
Co-authored-by: starryrbs <[email protected]>
  • Loading branch information
mfnd and starryrbs authored Mar 8, 2025
1 parent 77591ae commit fdde917
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 4 deletions.
22 changes: 22 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package redislock

import (
"time"

"github.com/go-redsync/redsync/v4"
)

type LockerOption func(*redisLocker)

func WithAutoExtendDuration(duration time.Duration) LockerOption {
return func(locker *redisLocker) {
locker.autoExtendDuration = duration
}
}

// WithRedsyncOptions sets the redsync options.
func WithRedsyncOptions(options ...redsync.Option) LockerOption {
return func(locker *redisLocker) {
locker.options = options
}
}
52 changes: 48 additions & 4 deletions redislock.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"time"

"github.com/go-co-op/gocron/v2"
"github.com/go-redsync/redsync/v4"
Expand Down Expand Up @@ -42,17 +43,36 @@ func NewRedisLockerAlways(r redis.UniversalClient, options ...redsync.Option) (g
return newLocker(r, options...), r.Ping(context.Background()).Err()
}

func NewRedisLockerWithOptions(r redis.UniversalClient, options ...LockerOption) (gocron.Locker, error) {
if err := r.Ping(context.Background()).Err(); err != nil {
return nil, fmt.Errorf("%s: %w", ErrFailedToConnectToRedis, err)
}
return newLockerWithOptions(r, options...), nil
}

func newLocker(r redis.UniversalClient, options ...redsync.Option) gocron.Locker {
pool := goredis.NewPool(r)
rs := redsync.New(pool)
return &redisLocker{rs: rs, options: options}
}

func newLockerWithOptions(r redis.UniversalClient, options ...LockerOption) gocron.Locker {
pool := goredis.NewPool(r)
rs := redsync.New(pool)
l := &redisLocker{rs: rs}
for _, option := range options {
option(l)
}

return l
}

var _ gocron.Locker = (*redisLocker)(nil)

type redisLocker struct {
rs *redsync.Redsync
options []redsync.Option
rs *redsync.Redsync
options []redsync.Option
autoExtendDuration time.Duration
}

func (r *redisLocker) Lock(ctx context.Context, key string) (gocron.Lock, error) {
Expand All @@ -62,18 +82,27 @@ func (r *redisLocker) Lock(ctx context.Context, key string) (gocron.Lock, error)
return nil, ErrFailedToObtainLock
}
rl := &redisLock{
mu: mu,
mu: mu,
autoExtendDuration: r.autoExtendDuration,
done: make(chan struct{}),
}

if r.autoExtendDuration > 0 {
go func() { rl.doExtend() }()
}
return rl, nil
}

var _ gocron.Lock = (*redisLock)(nil)

type redisLock struct {
mu *redsync.Mutex
mu *redsync.Mutex
done chan struct{}
autoExtendDuration time.Duration
}

func (r *redisLock) Unlock(ctx context.Context) error {
close(r.done)
unlocked, err := r.mu.UnlockContext(ctx)
if err != nil {
return ErrFailedToReleaseLock
Expand All @@ -84,3 +113,18 @@ func (r *redisLock) Unlock(ctx context.Context) error {

return nil
}

func (r *redisLock) doExtend() {
ticker := time.NewTicker(r.autoExtendDuration)
for {
select {
case <-r.done:
return
case <-ticker.C:
_, err := r.mu.Extend()
if err != nil {
return
}
}
}
}
44 changes: 44 additions & 0 deletions redislock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,47 @@ func TestEnableDistributedLocking(t *testing.T) {
}
assert.Len(t, results, 4)
}

func TestAutoExtend(t *testing.T) {
ctx := context.Background()
redisContainer, err := testcontainersredis.RunContainer(ctx)
require.NoError(t, err)
t.Cleanup(func() {
if err := redisContainer.Terminate(ctx); err != nil {
t.Fatalf("failed to terminate container: %s", err)
}
})

uri, err := redisContainer.ConnectionString(ctx)
require.NoError(t, err)

redisClient := redis.NewClient(&redis.Options{Addr: strings.TrimPrefix(uri, "redis://")})
// create lock not auto extend
l1, err := NewRedisLockerWithOptions(redisClient, WithRedsyncOptions(WithTries(1)))
require.NoError(t, err)
_, err = l1.Lock(ctx, "test1")
require.NoError(t, err)

t.Logf("waiting 9 seconds for lock to expire")
// wait for the lock to expire
time.Sleep(9 * time.Second)

_, err = l1.Lock(ctx, "test1")
require.NoError(t, err)

// create auto extend lock
l2, err := NewRedisLockerWithOptions(redisClient, WithAutoExtendDuration(time.Second*2), WithRedsyncOptions(WithTries(1)))
require.NoError(t, err)
unlocker, err := l2.Lock(ctx, "test2")
require.NoError(t, err)

t.Log("waiting 9 seconds for lock to expire")
// wait for the lock to expire
time.Sleep(9 * time.Second)

_, err = l2.Lock(ctx, "test2")
require.Equal(t, ErrFailedToObtainLock, err)

err = unlocker.Unlock(ctx)
require.NoError(t, err)
}

0 comments on commit fdde917

Please sign in to comment.