diff --git a/cpp-client/deephaven/dhclient/src/impl/table_handle_impl.cc b/cpp-client/deephaven/dhclient/src/impl/table_handle_impl.cc index 81b9f59925c..c7d8affc6a7 100644 --- a/cpp-client/deephaven/dhclient/src/impl/table_handle_impl.cc +++ b/cpp-client/deephaven/dhclient/src/impl/table_handle_impl.cc @@ -737,15 +737,13 @@ std::shared_ptr TableHandleImpl::Schema() { ); auto fd = ConvertTicketToFlightDescriptor(ticket_.ticket()); - std::unique_ptr schema_result; - auto gs_result = server->FlightClient()->GetSchema(options, fd, &schema_result); + auto gs_result = server->FlightClient()->GetSchema(options, fd); OkOrThrow(DEEPHAVEN_LOCATION_EXPR(gs_result)); - std::shared_ptr arrow_schema; - auto schema_res = schema_result->GetSchema(nullptr, &arrow_schema); - OkOrThrow(DEEPHAVEN_LOCATION_EXPR(schema_res)); + auto schema_result = (*gs_result)->GetSchema(nullptr); + OkOrThrow(DEEPHAVEN_LOCATION_EXPR(schema_result)); - const auto &fields = arrow_schema->fields(); + const auto &fields = (*schema_result)->fields(); auto names = MakeReservedVector(fields.size()); auto types = MakeReservedVector(fields.size()); for (const auto &f: fields) { diff --git a/cpp-client/deephaven/dhclient/src/subscription/subscribe_thread.cc b/cpp-client/deephaven/dhclient/src/subscription/subscribe_thread.cc index 4ca590f6204..ad199cfa209 100644 --- a/cpp-client/deephaven/dhclient/src/subscription/subscribe_thread.cc +++ b/cpp-client/deephaven/dhclient/src/subscription/subscribe_thread.cc @@ -68,7 +68,7 @@ class SubscribeState final { class UpdateProcessor final : public SubscriptionHandle { public: [[nodiscard]] - static std::shared_ptr startThread(std::unique_ptr fsr, + static std::shared_ptr StartThread(std::unique_ptr fsr, std::unique_ptr fsw, std::shared_ptr schema, std::shared_ptr callback); @@ -158,22 +158,21 @@ std::shared_ptr SubscribeState::InvokeHelper() { descriptor.type = arrow::flight::FlightDescriptor::DescriptorType::CMD; descriptor.cmd = std::string(magic_data, 4); - std::unique_ptr fsw; - std::unique_ptr fsr; - OkOrThrow(DEEPHAVEN_LOCATION_EXPR(client->DoExchange(fco, descriptor, &fsw, &fsr))); + auto res = client->DoExchange(fco, descriptor); + OkOrThrow(DEEPHAVEN_LOCATION_EXPR(res)); auto sub_req_raw = BarrageProcessor::CreateSubscriptionRequest(ticketBytes_.data(), ticketBytes_.size()); auto buffer = std::make_shared(std::move(sub_req_raw)); - OkOrThrow(DEEPHAVEN_LOCATION_EXPR(fsw->WriteMetadata(std::move(buffer)))); + OkOrThrow(DEEPHAVEN_LOCATION_EXPR(res->writer->WriteMetadata(std::move(buffer)))); // Run forever (until error or cancellation) - auto processor = UpdateProcessor::startThread(std::move(fsr), std::move(fsw), std::move(schema_), - std::move(callback_)); + auto processor = UpdateProcessor::StartThread(std::move(res->reader), std::move(res->writer), + std::move(schema_), std::move(callback_)); return processor; } -std::shared_ptr UpdateProcessor::startThread( +std::shared_ptr UpdateProcessor::StartThread( std::unique_ptr fsr, std::unique_ptr fsw, std::shared_ptr schema, @@ -195,12 +194,12 @@ UpdateProcessor::~UpdateProcessor() { } void UpdateProcessor::Cancel() { - static const char *const me = "UpdateProcessor::Cancel"; - gpr_log(GPR_INFO, "%s: Subscription Shutdown requested.", me); + constexpr const char *const kMe = "UpdateProcessor::Cancel"; + gpr_log(GPR_INFO, "%s: Subscription Shutdown requested.", kMe); std::unique_lock guard(mutex_); if (cancelled_) { guard.unlock(); // to be nice - gpr_log(GPR_ERROR, "%s: Already cancelled.", me); + gpr_log(GPR_ERROR, "%s: Already cancelled.", kMe); return; } cancelled_ = true; @@ -223,12 +222,12 @@ void UpdateProcessor::RunUntilCancelled(std::shared_ptr self) { void UpdateProcessor::RunForeverHelper() { // Reuse the chunk for efficiency. - arrow::flight::FlightStreamChunk flight_stream_chunk; BarrageProcessor bp(schema_); // Process Arrow Flight messages until error or cancellation. while (true) { - OkOrThrow(DEEPHAVEN_LOCATION_EXPR(fsr_->Next(&flight_stream_chunk))); - const auto &cols = flight_stream_chunk.data->columns(); + auto chunk = fsr_->Next(); + OkOrThrow(DEEPHAVEN_LOCATION_EXPR(chunk)); + const auto &cols = chunk->data->columns(); auto column_sources = MakeReservedVector>(cols.size()); auto sizes = MakeReservedVector(cols.size()); for (const auto &col : cols) { @@ -239,9 +238,9 @@ void UpdateProcessor::RunForeverHelper() { const void *metadata = nullptr; size_t metadata_size = 0; - if (flight_stream_chunk.app_metadata != nullptr) { - metadata = flight_stream_chunk.app_metadata->data(); - metadata_size = flight_stream_chunk.app_metadata->size(); + if (chunk->app_metadata != nullptr) { + metadata = chunk->app_metadata->data(); + metadata_size = chunk->app_metadata->size(); } auto result = bp.ProcessNextChunk(column_sources, sizes, metadata, metadata_size); @@ -307,9 +306,9 @@ ColumnSourceAndSize ArrayToColumnSource(const arrow::Array &array) { throw std::runtime_error(DEEPHAVEN_LOCATION_STR(message)); } - const auto listElement = list_array->GetScalar(0).ValueOrDie(); + const auto list_element = list_array->GetScalar(0).ValueOrDie(); const auto *list_scalar = VerboseCast( - DEEPHAVEN_LOCATION_EXPR(listElement.get())); + DEEPHAVEN_LOCATION_EXPR(list_element.get())); const auto &list_scalar_value = list_scalar->value; ArrayToColumnSourceVisitor v(list_scalar_value); diff --git a/cpp-client/deephaven/dhclient/src/utility/table_maker.cc b/cpp-client/deephaven/dhclient/src/utility/table_maker.cc index 68d58301737..b7e4c6515c5 100644 --- a/cpp-client/deephaven/dhclient/src/utility/table_maker.cc +++ b/cpp-client/deephaven/dhclient/src/utility/table_maker.cc @@ -2,7 +2,6 @@ * Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending */ #include "deephaven/client/flight.h" -#include "deephaven/client/flight.h" #include "deephaven/client/utility/table_maker.h" #include "deephaven/client/utility/arrow_util.h" #include "deephaven/dhcore/utility/utility.h" @@ -19,11 +18,11 @@ TableMaker::TableMaker() = default; TableMaker::~TableMaker() = default; void TableMaker::FinishAddColumn(std::string name, internal::TypeConverter info) { - auto kvMetadata = std::make_shared(); - OkOrThrow(DEEPHAVEN_LOCATION_EXPR(kvMetadata->Set("deephaven:type", info.DeephavenType()))); + auto kv_metadata = std::make_shared(); + OkOrThrow(DEEPHAVEN_LOCATION_EXPR(kv_metadata->Set("deephaven:type", info.DeephavenType()))); auto field = std::make_shared(std::move(name), std::move(info.DataType()), true, - std::move(kvMetadata)); + std::move(kv_metadata)); OkOrThrow(DEEPHAVEN_LOCATION_EXPR(schemaBuilder_.AddField(field))); if (columns_.empty()) { @@ -42,30 +41,28 @@ TableHandle TableMaker::MakeTable(const TableHandleManager &manager) { auto wrapper = manager.CreateFlightWrapper(); auto ticket = manager.NewTicket(); - auto flightDescriptor = ConvertTicketToFlightDescriptor(ticket); + auto flight_descriptor = ConvertTicketToFlightDescriptor(ticket); arrow::flight::FlightCallOptions options; wrapper.AddHeaders(&options); - std::unique_ptr fsw; - std::unique_ptr fmr; - OkOrThrow(DEEPHAVEN_LOCATION_EXPR(wrapper.FlightClient()->DoPut(options, flightDescriptor, - schema, &fsw, &fmr))); + auto res = wrapper.FlightClient()->DoPut(options, flight_descriptor, schema); + OkOrThrow(DEEPHAVEN_LOCATION_EXPR(res)); auto batch = arrow::RecordBatch::Make(schema, numRows_, std::move(columns_)); - OkOrThrow(DEEPHAVEN_LOCATION_EXPR(fsw->WriteRecordBatch(*batch))); - OkOrThrow(DEEPHAVEN_LOCATION_EXPR(fsw->DoneWriting())); + OkOrThrow(DEEPHAVEN_LOCATION_EXPR(res->writer->WriteRecordBatch(*batch))); + OkOrThrow(DEEPHAVEN_LOCATION_EXPR(res->writer->DoneWriting())); std::shared_ptr buf; - OkOrThrow(DEEPHAVEN_LOCATION_EXPR(fmr->ReadMetadata(&buf))); - OkOrThrow(DEEPHAVEN_LOCATION_EXPR(fsw->Close())); + OkOrThrow(DEEPHAVEN_LOCATION_EXPR(res->reader->ReadMetadata(&buf))); + OkOrThrow(DEEPHAVEN_LOCATION_EXPR(res->writer->Close())); return manager.MakeTableHandleFromTicket(std::move(ticket)); } namespace internal { -TypeConverter::TypeConverter(std::shared_ptr dataType, - std::string deephavenType, std::shared_ptr column) : - dataType_(std::move(dataType)), deephavenType_(std::move(deephavenType)), +TypeConverter::TypeConverter(std::shared_ptr data_type, + std::string deephaven_type, std::shared_ptr column) : + dataType_(std::move(data_type)), deephavenType_(std::move(deephaven_type)), column_(std::move(column)) {} TypeConverter::~TypeConverter() = default;