Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added WithDisableCircuitBreaker and WithDisableBusyLoopBreaker options #16

Merged
merged 1 commit into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file.

This project adheres to Semantic Versioning.

## 1.3.0 (Sep 25, 2024)

1. Added `WithDisableCircuitBreaker` and `WithDisableBusyLoopBreaker` options. These are variants of the now deprecated `DisableCircuitBreaker`
and `DisableBusyLoopBreaker` options. They provide a booling parameter which is more convenient for usage with
code generation and for shimming with configuration.

## 1.2.0 (Sep 23, 2024)

1. Update to allow subject name specification (not just TopicNameStrategy)
Expand Down
10 changes: 5 additions & 5 deletions test/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ func TestWork_Run_CircuitBreaksOnProcessError(t *testing.T) {
zkafka.ConsumerTopicConfig{Topic: topicName},
kproc,
zkafka.CircuitBreakAfter(1), // Circuit breaks after 1 error.
zkafka.WithDisableCircuitBreaker(false),
zkafka.CircuitBreakFor(50*time.Millisecond),
zkafka.WithOnDone(func(ctx context.Context, message *zkafka.Message, err error) {
cnt.Add(1)
Expand Down Expand Up @@ -443,7 +444,7 @@ func TestWork_Run_DisabledCircuitBreakerContinueReadError(t *testing.T) {
w := kwf.Create(
zkafka.ConsumerTopicConfig{Topic: topicName},
&fakeProcessor{},
zkafka.DisableCircuitBreaker(),
zkafka.WithDisableCircuitBreaker(true),
zkafka.WithLifecycleHooks(zkafka.LifecycleHooks{PostFanout: func(ctx context.Context) {
cnt.Add(1)
}}),
Expand Down Expand Up @@ -966,7 +967,7 @@ func TestWork_WithDeadLetterTopic_FailedToGetWriterDoesntPauseProcessing(t *test
},
},
&processor,
zkafka.DisableCircuitBreaker(),
zkafka.WithDisableCircuitBreaker(true),
)

ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -1564,7 +1565,6 @@ func TestWork_CircuitBreaker_WithoutBusyLoopBreaker_DoesNotWaitsForCircuitToOpen
kcp := zkafka_mocks.NewMockClientProvider(ctrl)
kcp.EXPECT().Reader(gomock.Any(), gomock.Any()).Return(r, nil).AnyTimes()

//l := zkafka.NoopLogger{}
l := stdLogger{includeDebug: true}
kwf := zkafka.NewWorkFactory(kcp, zkafka.WithLogger(l))

Expand All @@ -1578,7 +1578,7 @@ func TestWork_CircuitBreaker_WithoutBusyLoopBreaker_DoesNotWaitsForCircuitToOpen
return errors.New("an error occurred during processing")
},
},
zkafka.DisableBusyLoopBreaker(),
zkafka.WithDisableBusyLoopBreaker(true),
zkafka.WithLifecycleHooks(zkafka.LifecycleHooks{PostFanout: func(ctx context.Context) {
fanOutCount.Add(1)
}}),
Expand Down Expand Up @@ -2132,7 +2132,7 @@ func BenchmarkWork_Run_CircuitBreaker_DisableBusyLoopBreaker(b *testing.B) {
zkafka.Speedup(10),
zkafka.CircuitBreakAfter(100),
zkafka.CircuitBreakFor(30*time.Millisecond),
zkafka.DisableBusyLoopBreaker(),
zkafka.WithDisableCircuitBreaker(true),
)

ctx, cancel := context.WithTimeout(ctx, time.Second)
Expand Down
2 changes: 1 addition & 1 deletion work.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ type Work struct {
// Duration for which a circuit is open. Use CircuitBreakFor to control
cbFor *time.Duration

// Disable circuit breaking. Use DisableCircuitBreaker to control
// Disable circuit breaking. Use WithDisableCircuitBreaker to control
disableCb bool

// Busy loop breaker. When circuit breaker circuit is open, instead of consuming cpu in a busy loop
Expand Down
27 changes: 21 additions & 6 deletions workoption.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,26 @@ func CircuitBreakFor(duration time.Duration) WorkOption {
return circuitBreakForOption{duration: duration}
}

// DisableCircuitBreaker disables the circuit breaker so that it never breaks
// Deprecated: DisableCircuitBreaker disables the circuit breaker so that it never breaks
func DisableCircuitBreaker() WorkOption {
return disableCbOption{}
return WithDisableCircuitBreaker(true)
}

// DisableBusyLoopBreaker disables the busy loop breaker which would block subsequent read calls till the circuit re-closes.
// WithDisableCircuitBreaker allows the user to control whether circuit breaker is disabled or not
func WithDisableCircuitBreaker(isDisabled bool) WorkOption {
return disableCbOption{disabled: isDisabled}
}

// Deprecated: DisableBusyLoopBreaker disables the busy loop breaker which would block subsequent read calls till the circuit re-closes.
// Without blb we see increased cpu usage when circuit is open
func DisableBusyLoopBreaker() WorkOption {
return disableBlbOption{}
return WithDisableBusyLoopBreaker(true)
}

// WithDisableBusyLoopBreaker disables the busy loop breaker which would block subsequent read calls till the circuit re-closes.
// Without blb we see increased cpu usage when circuit is open
func WithDisableBusyLoopBreaker(isDisabled bool) WorkOption {
return disableBlbOption{disabled: isDisabled}
}

// WithOnDone allows you to specify a callback function executed after processing of a kafka message
Expand Down Expand Up @@ -75,7 +86,9 @@ func (c circuitBreakForOption) apply(w *Work) {
}
}

type disableCbOption struct{}
type disableCbOption struct {
disabled bool
}

func (d disableCbOption) apply(w *Work) {
w.disableCb = true
Expand All @@ -99,7 +112,9 @@ func (o lifeCycleOption) apply(w *Work) {
w.lifecycle = o.lh
}

type disableBlbOption struct{}
type disableBlbOption struct {
disabled bool
}

func (d disableBlbOption) apply(w *Work) {
w.blb.disabled = true
Expand Down