Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix close blocked #1308

Merged
merged 1 commit into from
Dec 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,8 @@ type partitionConsumer struct {

dispatcherSeekingControlCh chan struct{}
isSeeking atomic.Bool
ctx context.Context
cancelFunc context.CancelFunc
}

// pauseDispatchMessage used to discard the message in the dispatcher goroutine.
Expand Down Expand Up @@ -344,6 +346,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
boFunc = backoff.NewDefaultBackoff
}

ctx, cancelFunc := context.WithCancel(context.Background())
pc := &partitionConsumer{
parentConsumer: parent,
client: client,
Expand All @@ -367,6 +370,8 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
schemaInfoCache: newSchemaInfoCache(client, options.topic),
backoffPolicyFunc: boFunc,
dispatcherSeekingControlCh: make(chan struct{}),
ctx: ctx,
cancelFunc: cancelFunc,
}
if pc.options.autoReceiverQueueSize {
pc.currentQueueSize.Store(initialReceiverQueueSize)
Expand Down Expand Up @@ -938,6 +943,8 @@ func (pc *partitionConsumer) Close() {
return
}

pc.cancelFunc()

// flush all pending ACK requests and terminate the timer goroutine
pc.ackGroupingTracker.close()

Expand Down Expand Up @@ -1866,7 +1873,7 @@ func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClose

return struct{}{}, err
}
_, _ = internal.Retry(context.Background(), opFn, func(_ error) time.Duration {
_, _ = internal.Retry(pc.ctx, opFn, func(_ error) time.Duration {
delayReconnectTime := bo.Next()
pc.log.WithFields(log.Fields{
"assignedBrokerURL": assignedBrokerURL,
Expand Down
43 changes: 43 additions & 0 deletions pulsar/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ import (
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/wait"

"github.com/apache/pulsar-client-go/pulsar/backoff"

"github.com/apache/pulsar-client-go/pulsaradmin"
Expand Down Expand Up @@ -4940,3 +4944,42 @@ func TestAckResponseNotBlocked(t *testing.T) {
}
}
}

func TestConsumerKeepReconnectingAndThenCallClose(t *testing.T) {
req := testcontainers.ContainerRequest{
Image: getPulsarTestImage(),
ExposedPorts: []string{"6650/tcp", "8080/tcp"},
WaitingFor: wait.ForExposedPort(),
Cmd: []string{"bin/pulsar", "standalone", "-nfw"},
}
c, err := testcontainers.GenericContainer(context.Background(), testcontainers.GenericContainerRequest{
ContainerRequest: req,
Started: true,
})
require.NoError(t, err, "Failed to start the pulsar container")
endpoint, err := c.PortEndpoint(context.Background(), "6650", "pulsar")
require.NoError(t, err, "Failed to get the pulsar endpoint")

client, err := NewClient(ClientOptions{
URL: endpoint,
ConnectionTimeout: 5 * time.Second,
OperationTimeout: 5 * time.Second,
})
require.NoError(t, err)
defer client.Close()

var testConsumer Consumer
require.Eventually(t, func() bool {
testConsumer, err = client.Subscribe(ConsumerOptions{
Topic: newTopicName(),
Schema: NewBytesSchema(nil),
SubscriptionName: "test-sub",
})
return err == nil
}, 30*time.Second, 1*time.Second)
_ = c.Terminate(context.Background())
require.Eventually(t, func() bool {
testConsumer.Close()
return true
}, 30*time.Second, 1*time.Second)
}
8 changes: 1 addition & 7 deletions pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,12 +491,6 @@ func (p *partitionProducer) reconnectToBroker(connectionClosed *connectionClosed
return struct{}{}, nil
}

select {
case <-p.ctx.Done():
return struct{}{}, nil
default:
}

if p.getProducerState() != producerReady {
// Producer is already closing
p.log.Info("producer state not ready, exit reconnect")
Expand Down Expand Up @@ -552,7 +546,7 @@ func (p *partitionProducer) reconnectToBroker(connectionClosed *connectionClosed

return struct{}{}, err
}
_, _ = internal.Retry(context.Background(), opFn, func(_ error) time.Duration {
_, _ = internal.Retry(p.ctx, opFn, func(_ error) time.Duration {
delayReconnectTime := bo.Next()
p.log.WithFields(log.Fields{
"assignedBrokerURL": assignedBrokerURL,
Expand Down
Loading