From 92908633b7dd2829a6dfa87d09824323743d1d5d Mon Sep 17 00:00:00 2001 From: Kristian Andersen Date: Wed, 12 Jun 2024 12:38:41 +0200 Subject: [PATCH] Handle disconnect -> close properly in ChoosePartions --- CHANGELOG.md | 7 +++++++ src/DotPulsar/Internal/Producer.cs | 5 ++++- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0c0a1d855..957ba44c0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,13 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/) and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [Unreleased] + +### Fixed + +- Fixed race condition in `Producer` between `Send(...)` and `DisposeAsync()` dispose causing an unintended + `DivideByZeroException`. It now correctly throws a `ProducerClosedException` + ## [3.3.0] - 2024-06-10 ### Added diff --git a/src/DotPulsar/Internal/Producer.cs b/src/DotPulsar/Internal/Producer.cs index 45db4b5f0..e9e2ad917 100644 --- a/src/DotPulsar/Internal/Producer.cs +++ b/src/DotPulsar/Internal/Producer.cs @@ -170,6 +170,7 @@ private SubProducer CreateSubProducer(string topic, int partition) process.Start(); return producer; } + public bool IsFinalState() => _state.IsFinalState(); @@ -202,9 +203,11 @@ private async ValueTask ChoosePartitions(MessageMetadata metadata, Cancella { if (_producerCount == 0) { - _ = await _state.StateChangedFrom(ProducerState.Disconnected, cancellationToken).ConfigureAwait(false); + var newState = await _state.StateChangedFrom(ProducerState.Disconnected, cancellationToken).ConfigureAwait(false); if (_faultException is not null) throw new ProducerFaultedException(_faultException); + if (newState == ProducerState.Closed) + throw new ProducerClosedException(); } if (_producerCount == 1)