Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce distributed mutex #84

Merged
merged 9 commits into from
Dec 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 83 additions & 15 deletions v2/distributed_gobreaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ type SharedState struct {

// SharedDataStore stores the shared state of DistributedCircuitBreaker.
type SharedDataStore interface {
Lock(name string) error
Unlock(name string) error
GetData(name string) ([]byte, error)
SetData(name string, data []byte) error
}
Expand All @@ -34,17 +36,28 @@ type DistributedCircuitBreaker[T any] struct {
}

// NewDistributedCircuitBreaker returns a new DistributedCircuitBreaker.
func NewDistributedCircuitBreaker[T any](store SharedDataStore, settings Settings) (*DistributedCircuitBreaker[T], error) {
func NewDistributedCircuitBreaker[T any](store SharedDataStore, settings Settings) (dcb *DistributedCircuitBreaker[T], err error) {
if store == nil {
return nil, ErrNoSharedStore
}

dcb := &DistributedCircuitBreaker[T]{
dcb = &DistributedCircuitBreaker[T]{
CircuitBreaker: NewCircuitBreaker[T](settings),
store: store,
}

_, err := dcb.getSharedState()
err = dcb.lock()
if err != nil {
return nil, err
}
defer func() {
e := dcb.unlock()
if err == nil {
err = e
}
}()

_, err = dcb.getSharedState()
if err == ErrNoSharedState {
err = dcb.setSharedState(dcb.extract())
}
Expand All @@ -55,8 +68,43 @@ func NewDistributedCircuitBreaker[T any](store SharedDataStore, settings Setting
return dcb, nil
}

const (
mutexTimeout = 5 * time.Second
mutexWaitTime = 500 * time.Millisecond
)

func (dcb *DistributedCircuitBreaker[T]) mutexKey() string {
return "gobreaker:mutex:" + dcb.name
}

func (dcb *DistributedCircuitBreaker[T]) lock() error {
if dcb.store == nil {
return ErrNoSharedStore
}

var err error
expiry := time.Now().Add(mutexTimeout)
for time.Now().Before(expiry) {
err = dcb.store.Lock(dcb.mutexKey())
if err == nil {
return nil
}

time.Sleep(mutexWaitTime)
}
return err
}

func (dcb *DistributedCircuitBreaker[T]) unlock() error {
if dcb.store == nil {
return ErrNoSharedStore
}

return dcb.store.Unlock(dcb.mutexKey())
}

func (dcb *DistributedCircuitBreaker[T]) sharedStateKey() string {
return "gobreaker:" + dcb.name
return "gobreaker:state:" + dcb.name
}

func (dcb *DistributedCircuitBreaker[T]) getSharedState() (SharedState, error) {
Expand Down Expand Up @@ -112,37 +160,57 @@ func (dcb *DistributedCircuitBreaker[T]) extract() SharedState {
}

// State returns the State of DistributedCircuitBreaker.
func (dcb *DistributedCircuitBreaker[T]) State() (State, error) {
func (dcb *DistributedCircuitBreaker[T]) State() (state State, err error) {
shared, err := dcb.getSharedState()
if err != nil {
return shared.State, err
}

err = dcb.lock()
if err != nil {
return state, err
}
defer func() {
e := dcb.unlock()
if err == nil {
err = e
}
}()

dcb.inject(shared)
state := dcb.CircuitBreaker.State()
state = dcb.CircuitBreaker.State()
shared = dcb.extract()

err = dcb.setSharedState(shared)
return state, err
}

// Execute runs the given request if the DistributedCircuitBreaker accepts it.
func (dcb *DistributedCircuitBreaker[T]) Execute(req func() (T, error)) (T, error) {
func (dcb *DistributedCircuitBreaker[T]) Execute(req func() (T, error)) (t T, err error) {
shared, err := dcb.getSharedState()
if err != nil {
var defaultValue T
return defaultValue, err
return t, err
}

err = dcb.lock()
if err != nil {
return t, err
}
defer func() {
e := dcb.unlock()
if err == nil {
err = e
}
}()

dcb.inject(shared)
t, e := dcb.CircuitBreaker.Execute(req)
t, err = dcb.CircuitBreaker.Execute(req)
shared = dcb.extract()

err = dcb.setSharedState(shared)
if err != nil {
var defaultValue T
return defaultValue, err
e := dcb.setSharedState(shared)
if e != nil {
return t, e
}

return t, e
return t, err
}
59 changes: 43 additions & 16 deletions v2/distributed_gobreaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,57 @@ import (
"time"

"github.com/alicebob/miniredis/v2"
"github.com/go-redsync/redsync/v4"
"github.com/go-redsync/redsync/v4/redis/goredis/v9"
"github.com/redis/go-redis/v9"
"github.com/stretchr/testify/assert"
)

type storeAdapter struct {
ctx context.Context
client *redis.Client
rs *redsync.Redsync
mutex map[string]*redsync.Mutex
}

func (sa *storeAdapter) GetData(key string) ([]byte, error) {
return sa.client.Get(sa.ctx, key).Bytes()
func newStoreAdapter(client *redis.Client) *storeAdapter {
return &storeAdapter{
ctx: context.Background(),
client: client,
rs: redsync.New(goredis.NewPool(client)),
mutex: map[string]*redsync.Mutex{},
}
}

func (sa *storeAdapter) Lock(name string) error {
mutex, ok := sa.mutex[name]
if ok {
return mutex.Lock()
}

mutex = sa.rs.NewMutex(name, redsync.WithExpiry(mutexTimeout))
sa.mutex[name] = mutex
return mutex.Lock()
}

func (sa *storeAdapter) Unlock(name string) error {
mutex, ok := sa.mutex[name]
if ok {
var err error
ok, err = mutex.Unlock()
if ok && err == nil {
return nil
}
}
return errors.New("unlock failed")
}

func (sa *storeAdapter) SetData(key string, value []byte) error {
return sa.client.Set(sa.ctx, key, value, 0).Err()
func (sa *storeAdapter) GetData(name string) ([]byte, error) {
return sa.client.Get(sa.ctx, name).Bytes()
}

func (sa *storeAdapter) SetData(name string, data []byte) error {
return sa.client.Set(sa.ctx, name, data, 0).Err()
}

var redisServer *miniredis.Miniredis
Expand All @@ -37,10 +73,7 @@ func setUpDCB() *DistributedCircuitBreaker[any] {
Addr: redisServer.Addr(),
})

store := &storeAdapter{
ctx: context.Background(),
client: client,
}
store := newStoreAdapter(client)

dcb, err := NewDistributedCircuitBreaker[any](store, Settings{
Name: "TestBreaker",
Expand Down Expand Up @@ -205,10 +238,7 @@ func TestCustomDistributedCircuitBreaker(t *testing.T) {
Addr: mr.Addr(),
})

store := &storeAdapter{
ctx: context.Background(),
client: client,
}
store := newStoreAdapter(client)

customDCB, err = NewDistributedCircuitBreaker[any](store, Settings{
Name: "CustomBreaker",
Expand Down Expand Up @@ -301,10 +331,7 @@ func TestCustomDistributedCircuitBreakerStateTransitions(t *testing.T) {
Addr: mr.Addr(),
})

store := &storeAdapter{
ctx: context.Background(),
client: client,
}
store := newStoreAdapter(client)

dcb, err := NewDistributedCircuitBreaker[any](store, customSt)
assert.NoError(t, err)
Expand Down
7 changes: 6 additions & 1 deletion v2/go.mod
Original file line number Diff line number Diff line change
@@ -1,19 +1,24 @@
module github.com/sony/gobreaker/v2

go 1.21
go 1.22

toolchain go1.22.10

require github.com/stretchr/testify v1.8.4

require (
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/yuin/gopher-lua v1.1.1 // indirect
)

require (
github.com/alicebob/miniredis/v2 v2.33.0
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-redsync/redsync/v4 v4.13.0
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/redis/go-redis/v9 v9.7.0
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
25 changes: 25 additions & 0 deletions v2/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,45 @@ github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a h1:HbKu58rmZp
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc=
github.com/alicebob/miniredis/v2 v2.33.0 h1:uvTF0EDeu9RLnUEG27Db5I68ESoIxTiXbNUiji6lZrA=
github.com/alicebob/miniredis/v2 v2.33.0/go.mod h1:MhP4a3EU7aENRi9aO+tHfTBZicLqQevyi/DJpoj6mi0=
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/go-redis/redis v6.15.9+incompatible h1:K0pv1D7EQUjfyoMql+r/jZqCLizCGKFlFgcHWWmHQjg=
github.com/go-redis/redis v6.15.9+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
github.com/go-redis/redis/v7 v7.4.1 h1:PASvf36gyUpr2zdOUS/9Zqc80GbM+9BDyiJSJDDOrTI=
github.com/go-redis/redis/v7 v7.4.1/go.mod h1:JDNMw23GTyLNC4GZu9njt15ctBQVn7xjRfnwdHj/Dcg=
github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI=
github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo=
github.com/go-redsync/redsync/v4 v4.13.0 h1:49X6GJfnbLGaIpBBREM/zA4uIMDXKAh1NDkvQ1EkZKA=
github.com/go-redsync/redsync/v4 v4.13.0/go.mod h1:HMW4Q224GZQz6x1Xc7040Yfgacukdzu7ifTDAKiyErQ=
github.com/gomodule/redigo v1.8.9 h1:Sl3u+2BI/kk+VEatbj0scLdrFhjPmbxOc1myhDP41ws=
github.com/gomodule/redigo v1.8.9/go.mod h1:7ArFNvsTjH8GMMzB4uy1snslv2BwmginuMs06a1uzZE=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/redis/go-redis/v9 v9.7.0 h1:HhLSs+B6O021gwzl+locl0zEDnyNkxMtf/Z3NNBMa9E=
github.com/redis/go-redis/v9 v9.7.0/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw=
github.com/redis/rueidis v1.0.19 h1:s65oWtotzlIFN8eMPhyYwxlwLR1lUdhza2KtWprKYSo=
github.com/redis/rueidis v1.0.19/go.mod h1:8B+r5wdnjwK3lTFml5VtxjzGOQAC+5UmujoD12pDrEo=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stvp/tempredis v0.0.0-20181119212430-b82af8480203 h1:QVqDTf3h2WHt08YuiTGPZLls0Wq99X9bWd0Q5ZSBesM=
github.com/stvp/tempredis v0.0.0-20181119212430-b82af8480203/go.mod h1:oqN97ltKNihBbwlX8dLpwxCl3+HnXKV/R0e+sRLd9C8=
github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M=
github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw=
golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E=
golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
Expand Down
Loading