diff --git a/cpp-client/deephaven/dhclient/src/subscription/subscribe_thread.cc b/cpp-client/deephaven/dhclient/src/subscription/subscribe_thread.cc index f38b0d217b9..4ca590f6204 100644 --- a/cpp-client/deephaven/dhclient/src/subscription/subscribe_thread.cc +++ b/cpp-client/deephaven/dhclient/src/subscription/subscribe_thread.cc @@ -69,9 +69,10 @@ class UpdateProcessor final : public SubscriptionHandle { public: [[nodiscard]] static std::shared_ptr startThread(std::unique_ptr fsr, - std::shared_ptr schema, std::shared_ptr callback); + std::unique_ptr fsw, std::shared_ptr schema, + std::shared_ptr callback); - UpdateProcessor(std::unique_ptr fsr, + UpdateProcessor(std::unique_ptr fsr, std::unique_ptr fsw, std::shared_ptr schema, std::shared_ptr callback); ~UpdateProcessor() final; @@ -82,6 +83,10 @@ class UpdateProcessor final : public SubscriptionHandle { private: std::unique_ptr fsr_; + // The FlightStreamWriter is not used inside the thread, but arrow Flight >= 8.0.0 seems to + // require that it stay alive (along with the FlightStreamReader) for the duration of the + // DoExchange session. + std::unique_ptr fsw_; std::shared_ptr schema_; std::shared_ptr callback_; @@ -163,24 +168,27 @@ std::shared_ptr SubscribeState::InvokeHelper() { OkOrThrow(DEEPHAVEN_LOCATION_EXPR(fsw->WriteMetadata(std::move(buffer)))); // Run forever (until error or cancellation) - auto processor = UpdateProcessor::startThread(std::move(fsr), std::move(schema_), + auto processor = UpdateProcessor::startThread(std::move(fsr), std::move(fsw), std::move(schema_), std::move(callback_)); return processor; } std::shared_ptr UpdateProcessor::startThread( - std::unique_ptr fsr, std::shared_ptr schema, + std::unique_ptr fsr, + std::unique_ptr fsw, + std::shared_ptr schema, std::shared_ptr callback) { - auto result = std::make_shared(std::move(fsr), std::move(schema), - std::move(callback)); + auto result = std::make_shared(std::move(fsr), std::move(fsw), + std::move(schema), std::move(callback)); result->thread_ = std::thread(&RunUntilCancelled, result); return result; } UpdateProcessor::UpdateProcessor(std::unique_ptr fsr, + std::unique_ptr fsw, std::shared_ptr schema, std::shared_ptr callback) : - fsr_(std::move(fsr)), schema_(std::move(schema)), callback_(std::move(callback)), - cancelled_(false) {} + fsr_(std::move(fsr)), fsw_(std::move(fsw)), schema_(std::move(schema)), + callback_(std::move(callback)), cancelled_(false) {} UpdateProcessor::~UpdateProcessor() { Cancel();