Skip to content

Commit

Permalink
Block Graceful Shutdown: stop input bindings and subscriptions (dapr#…
Browse files Browse the repository at this point in the history
…7474)

Whilst Dapr should keep available outgoing API requests whilst blocking
graceful shutdown, we want to prevent incoming input bindings and
subscriptions from receiving new messages.

PR updates the block graceful shutdown to forever stop input bindings
and subscriptions from being read.

Signed-off-by: joshvanl <[email protected]>
  • Loading branch information
JoshVanL authored Feb 1, 2024
1 parent c917506 commit 9c69f83
Show file tree
Hide file tree
Showing 9 changed files with 176 additions and 23 deletions.
1 change: 1 addition & 0 deletions pkg/runtime/processor/binding/binding.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ type binding struct {

lock sync.Mutex
readingBindings bool
stopForever bool

subscribeBindingList []string
inputCancels map[string]context.CancelFunc
Expand Down
10 changes: 9 additions & 1 deletion pkg/runtime/processor/binding/send.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ func (b *binding) StartReadingFromBindings(ctx context.Context) error {
b.lock.Lock()
defer b.lock.Unlock()

if b.stopForever {
return nil
}

b.readingBindings = true

if b.channels.AppChannel() == nil {
Expand Down Expand Up @@ -110,11 +114,15 @@ func (b *binding) startInputBinding(comp componentsV1alpha1.Component, binding b
return nil
}

func (b *binding) StopReadingFromBindings() {
func (b *binding) StopReadingFromBindings(forever bool) {
b.lock.Lock()
defer b.lock.Unlock()
defer b.wg.Wait()

if forever {
b.stopForever = true
}

b.readingBindings = false

for _, cancel := range b.inputCancels {
Expand Down
4 changes: 2 additions & 2 deletions pkg/runtime/processor/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type PubsubManager interface {
BulkPublish(context.Context, *contribpubsub.BulkPublishRequest) (contribpubsub.BulkPublishResponse, error)

StartSubscriptions(context.Context) error
StopSubscriptions()
StopSubscriptions(forever bool)
Outbox() outbox.Outbox
manager
}
Expand All @@ -56,7 +56,7 @@ type BindingManager interface {
SendToOutputBinding(context.Context, string, *bindings.InvokeRequest) (*bindings.InvokeResponse, error)

StartReadingFromBindings(context.Context) error
StopReadingFromBindings()
StopReadingFromBindings(forever bool)
manager
}

Expand Down
1 change: 1 addition & 0 deletions pkg/runtime/processor/pubsub/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ type pubsub struct {

lock sync.RWMutex
subscribing bool
stopForever bool

topicCancels map[string]context.CancelFunc
outbox outbox.Outbox
Expand Down
10 changes: 5 additions & 5 deletions pkg/runtime/processor/pubsub/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func TestInitPubSub(t *testing.T) {

mockAppChannel := new(channelt.MockAppChannel)
ps.channels = new(channels.Channels).WithAppChannel(mockAppChannel)
ps.StopSubscriptions()
ps.StopSubscriptions(false)
ps.compStore.SetTopicRoutes(nil)
ps.compStore.SetSubscriptions(nil)
for name := range ps.compStore.ListPubSubs() {
Expand Down Expand Up @@ -227,7 +227,7 @@ func TestInitPubSub(t *testing.T) {
mockAppChannel.On("InvokeMethod", mock.MatchedBy(matchContextInterface), matchDaprRequestMethod("dapr/subscribe")).Return(fakeResp, nil)

require.NoError(t, ps.StartSubscriptions(context.Background()))
ps.StopSubscriptions()
ps.StopSubscriptions(false)

// act
for _, comp := range pubsubComponents {
Expand Down Expand Up @@ -327,7 +327,7 @@ func TestInitPubSub(t *testing.T) {
mockAppChannel.On("InvokeMethod", mock.MatchedBy(matchContextInterface), matchDaprRequestMethod("dapr/subscribe")).Return(fakeResp, nil)

require.NoError(t, ps.StartSubscriptions(context.Background()))
ps.StopSubscriptions()
ps.StopSubscriptions(false)

// act
for _, comp := range pubsubComponents {
Expand Down Expand Up @@ -1927,7 +1927,7 @@ func TestPubsubLifecycle(t *testing.T) {
comp3 := getPubSub("mockPubSub3")
comp3.On("unsubscribed", "topic4").Return(nil).Once()

ps.StopSubscriptions()
ps.StopSubscriptions(false)

sendMessages(t, 0)

Expand All @@ -1947,7 +1947,7 @@ func TestPubsubLifecycle(t *testing.T) {
comp2.On("unsubscribed", "topic2").Return(nil).Once()
comp2.On("unsubscribed", "topic3").Return(nil).Once()

ps.StopSubscriptions()
ps.StopSubscriptions(false)
time.Sleep(time.Second / 2)

comp1.AssertCalled(t, "unsubscribed", "topic1")
Expand Down
14 changes: 12 additions & 2 deletions pkg/runtime/processor/pubsub/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,16 @@ import (
// StartSubscriptions starts the pubsub subscriptions
func (p *pubsub) StartSubscriptions(ctx context.Context) error {
// Clean any previous state
p.StopSubscriptions()
p.StopSubscriptions(false)

p.lock.Lock()
defer p.lock.Unlock()

// If Dapr has stopped subscribing forever, return early.
if p.stopForever {
return nil
}

p.subscribing = true

var errs []error
Expand All @@ -47,10 +52,15 @@ func (p *pubsub) StartSubscriptions(ctx context.Context) error {
}

// StopSubscriptions to all topics and cleans the cached topics
func (p *pubsub) StopSubscriptions() {
func (p *pubsub) StopSubscriptions(forever bool) {
p.lock.Lock()
defer p.lock.Unlock()

if forever {
// Mark if Dapr has stopped subscribing forever.
p.stopForever = true
}

p.subscribing = false

for subKey := range p.topicCancels {
Expand Down
19 changes: 15 additions & 4 deletions pkg/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,13 @@ func (a *DaprRuntime) Run(parentCtx context.Context) error {
}

log.Infof("Blocking graceful shutdown for %s or until app reports unhealthy...", *a.runtimeConfig.blockShutdownDuration)

// Stop reading from subscriptions and input bindings forever while
// blocking graceful shutdown. This will prevent incoming messages from
// being processed, but allow outgoing APIs to be processed.
a.processor.PubSub().StopSubscriptions(true)
a.processor.Binding().StopReadingFromBindings(true)

select {
case <-a.clock.After(*a.runtimeConfig.blockShutdownDuration):
log.Info("Block shutdown period expired, entering shutdown...")
Expand Down Expand Up @@ -715,8 +722,8 @@ func (a *DaprRuntime) appHealthChanged(ctx context.Context, status uint8) {
}

// Stop topic subscriptions and input bindings
a.processor.PubSub().StopSubscriptions()
a.processor.Binding().StopReadingFromBindings()
a.processor.PubSub().StopSubscriptions(false)
a.processor.Binding().StopReadingFromBindings(false)
}
}

Expand Down Expand Up @@ -803,10 +810,14 @@ func (a *DaprRuntime) startHTTPServer(port int, publicPort *int, profilePort int
return err
}

if err := a.runnerCloser.AddCloser(a.processor.PubSub().StopSubscriptions); err != nil {
if err := a.runnerCloser.AddCloser(func() {
a.processor.PubSub().StopSubscriptions(true)
}); err != nil {
return err
}
if err := a.runnerCloser.AddCloser(a.processor.Binding().StopReadingFromBindings); err != nil {
if err := a.runnerCloser.AddCloser(func() {
a.processor.Binding().StopReadingFromBindings(true)
}); err != nil {
return err
}

Expand Down
27 changes: 25 additions & 2 deletions tests/integration/suite/daprd/shutdown/block/app/healthy.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

commonv1 "github.com/dapr/dapr/pkg/proto/common/v1"
rtv1 "github.com/dapr/dapr/pkg/proto/runtime/v1"
"github.com/dapr/dapr/tests/integration/framework"
"github.com/dapr/dapr/tests/integration/framework/process/daprd"
Expand Down Expand Up @@ -102,6 +103,14 @@ metadata:
spec:
type: pubsub.in-memory
version: v1
---
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: mystore
spec:
type: state.in-memory
version: v1
`))

return []framework.Option{
Expand Down Expand Up @@ -149,9 +158,16 @@ func (h *healthy) Run(t *testing.T, ctx context.Context) {
require.NoError(t, err)
select {
case <-h.routeCh:
case <-ctx.Done():
assert.Fail(t, "pubsub did not send message to subscriber")
assert.Fail(t, "pubsub should not have sent message to subscriber")
case <-time.After(time.Second):
}
_, err = client.SaveState(ctx, &rtv1.SaveStateRequest{
StoreName: "mystore",
States: []*commonv1.StateItem{
{Key: "key", Value: []byte("value")},
},
})
require.NoError(t, err)

healthzCalled = h.healthzCalled.Load()
h.appHealth.Store(false)
Expand All @@ -170,6 +186,13 @@ func (h *healthy) Run(t *testing.T, ctx context.Context) {
//nolint:testifylint
assert.Error(c, err)
}, time.Second*5, time.Millisecond*100)
_, err = client.SaveState(ctx, &rtv1.SaveStateRequest{
StoreName: "mystore",
States: []*commonv1.StateItem{
{Key: "key", Value: []byte("value2")},
},
})
require.Error(t, err)

select {
case <-daprdStopped:
Expand Down
Loading

0 comments on commit 9c69f83

Please sign in to comment.