Skip to content

Commit

Permalink
C++ Client: more avoid deprecated Arrow, more conform to style guide (d…
Browse files Browse the repository at this point in the history
…eephaven#4858)

* C++ Client: more conform to deprecated Arrow, more conform coding style

* arrow stuff to include elsewhere
  • Loading branch information
kosak authored Nov 20, 2023
1 parent 89357e4 commit fe4b551
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 41 deletions.
10 changes: 4 additions & 6 deletions cpp-client/deephaven/dhclient/src/impl/table_handle_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -737,15 +737,13 @@ std::shared_ptr<Schema> TableHandleImpl::Schema() {
);

auto fd = ConvertTicketToFlightDescriptor(ticket_.ticket());
std::unique_ptr<arrow::flight::SchemaResult> schema_result;
auto gs_result = server->FlightClient()->GetSchema(options, fd, &schema_result);
auto gs_result = server->FlightClient()->GetSchema(options, fd);
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(gs_result));

std::shared_ptr<arrow::Schema> arrow_schema;
auto schema_res = schema_result->GetSchema(nullptr, &arrow_schema);
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(schema_res));
auto schema_result = (*gs_result)->GetSchema(nullptr);
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(schema_result));

const auto &fields = arrow_schema->fields();
const auto &fields = (*schema_result)->fields();
auto names = MakeReservedVector<std::string>(fields.size());
auto types = MakeReservedVector<ElementTypeId::Enum>(fields.size());
for (const auto &f: fields) {
Expand Down
37 changes: 18 additions & 19 deletions cpp-client/deephaven/dhclient/src/subscription/subscribe_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class SubscribeState final {
class UpdateProcessor final : public SubscriptionHandle {
public:
[[nodiscard]]
static std::shared_ptr<UpdateProcessor> startThread(std::unique_ptr<FlightStreamReader> fsr,
static std::shared_ptr<UpdateProcessor> StartThread(std::unique_ptr<FlightStreamReader> fsr,
std::unique_ptr<FlightStreamWriter> fsw, std::shared_ptr<Schema> schema,
std::shared_ptr<TickingCallback> callback);

Expand Down Expand Up @@ -158,22 +158,21 @@ std::shared_ptr<SubscriptionHandle> SubscribeState::InvokeHelper() {

descriptor.type = arrow::flight::FlightDescriptor::DescriptorType::CMD;
descriptor.cmd = std::string(magic_data, 4);
std::unique_ptr<FlightStreamWriter> fsw;
std::unique_ptr<FlightStreamReader> fsr;
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(client->DoExchange(fco, descriptor, &fsw, &fsr)));
auto res = client->DoExchange(fco, descriptor);
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(res));

auto sub_req_raw = BarrageProcessor::CreateSubscriptionRequest(ticketBytes_.data(),
ticketBytes_.size());
auto buffer = std::make_shared<OwningBuffer>(std::move(sub_req_raw));
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(fsw->WriteMetadata(std::move(buffer))));
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(res->writer->WriteMetadata(std::move(buffer))));

// Run forever (until error or cancellation)
auto processor = UpdateProcessor::startThread(std::move(fsr), std::move(fsw), std::move(schema_),
std::move(callback_));
auto processor = UpdateProcessor::StartThread(std::move(res->reader), std::move(res->writer),
std::move(schema_), std::move(callback_));
return processor;
}

std::shared_ptr<UpdateProcessor> UpdateProcessor::startThread(
std::shared_ptr<UpdateProcessor> UpdateProcessor::StartThread(
std::unique_ptr<FlightStreamReader> fsr,
std::unique_ptr<FlightStreamWriter> fsw,
std::shared_ptr<Schema> schema,
Expand All @@ -195,12 +194,12 @@ UpdateProcessor::~UpdateProcessor() {
}

void UpdateProcessor::Cancel() {
static const char *const me = "UpdateProcessor::Cancel";
gpr_log(GPR_INFO, "%s: Subscription Shutdown requested.", me);
constexpr const char *const kMe = "UpdateProcessor::Cancel";
gpr_log(GPR_INFO, "%s: Subscription Shutdown requested.", kMe);
std::unique_lock guard(mutex_);
if (cancelled_) {
guard.unlock(); // to be nice
gpr_log(GPR_ERROR, "%s: Already cancelled.", me);
gpr_log(GPR_ERROR, "%s: Already cancelled.", kMe);
return;
}
cancelled_ = true;
Expand All @@ -223,12 +222,12 @@ void UpdateProcessor::RunUntilCancelled(std::shared_ptr<UpdateProcessor> self) {

void UpdateProcessor::RunForeverHelper() {
// Reuse the chunk for efficiency.
arrow::flight::FlightStreamChunk flight_stream_chunk;
BarrageProcessor bp(schema_);
// Process Arrow Flight messages until error or cancellation.
while (true) {
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(fsr_->Next(&flight_stream_chunk)));
const auto &cols = flight_stream_chunk.data->columns();
auto chunk = fsr_->Next();
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(chunk));
const auto &cols = chunk->data->columns();
auto column_sources = MakeReservedVector<std::shared_ptr<ColumnSource>>(cols.size());
auto sizes = MakeReservedVector<size_t>(cols.size());
for (const auto &col : cols) {
Expand All @@ -239,9 +238,9 @@ void UpdateProcessor::RunForeverHelper() {

const void *metadata = nullptr;
size_t metadata_size = 0;
if (flight_stream_chunk.app_metadata != nullptr) {
metadata = flight_stream_chunk.app_metadata->data();
metadata_size = flight_stream_chunk.app_metadata->size();
if (chunk->app_metadata != nullptr) {
metadata = chunk->app_metadata->data();
metadata_size = chunk->app_metadata->size();
}
auto result = bp.ProcessNextChunk(column_sources, sizes, metadata, metadata_size);

Expand Down Expand Up @@ -307,9 +306,9 @@ ColumnSourceAndSize ArrayToColumnSource(const arrow::Array &array) {
throw std::runtime_error(DEEPHAVEN_LOCATION_STR(message));
}

const auto listElement = list_array->GetScalar(0).ValueOrDie();
const auto list_element = list_array->GetScalar(0).ValueOrDie();
const auto *list_scalar = VerboseCast<const arrow::ListScalar *>(
DEEPHAVEN_LOCATION_EXPR(listElement.get()));
DEEPHAVEN_LOCATION_EXPR(list_element.get()));
const auto &list_scalar_value = list_scalar->value;

ArrayToColumnSourceVisitor v(list_scalar_value);
Expand Down
29 changes: 13 additions & 16 deletions cpp-client/deephaven/dhclient/src/utility/table_maker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
* Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending
*/
#include "deephaven/client/flight.h"
#include "deephaven/client/flight.h"
#include "deephaven/client/utility/table_maker.h"
#include "deephaven/client/utility/arrow_util.h"
#include "deephaven/dhcore/utility/utility.h"
Expand All @@ -19,11 +18,11 @@ TableMaker::TableMaker() = default;
TableMaker::~TableMaker() = default;

void TableMaker::FinishAddColumn(std::string name, internal::TypeConverter info) {
auto kvMetadata = std::make_shared<arrow::KeyValueMetadata>();
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(kvMetadata->Set("deephaven:type", info.DeephavenType())));
auto kv_metadata = std::make_shared<arrow::KeyValueMetadata>();
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(kv_metadata->Set("deephaven:type", info.DeephavenType())));

auto field = std::make_shared<arrow::Field>(std::move(name), std::move(info.DataType()), true,
std::move(kvMetadata));
std::move(kv_metadata));
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(schemaBuilder_.AddField(field)));

if (columns_.empty()) {
Expand All @@ -42,30 +41,28 @@ TableHandle TableMaker::MakeTable(const TableHandleManager &manager) {

auto wrapper = manager.CreateFlightWrapper();
auto ticket = manager.NewTicket();
auto flightDescriptor = ConvertTicketToFlightDescriptor(ticket);
auto flight_descriptor = ConvertTicketToFlightDescriptor(ticket);

arrow::flight::FlightCallOptions options;
wrapper.AddHeaders(&options);

std::unique_ptr<arrow::flight::FlightStreamWriter> fsw;
std::unique_ptr<arrow::flight::FlightMetadataReader> fmr;
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(wrapper.FlightClient()->DoPut(options, flightDescriptor,
schema, &fsw, &fmr)));
auto res = wrapper.FlightClient()->DoPut(options, flight_descriptor, schema);
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(res));
auto batch = arrow::RecordBatch::Make(schema, numRows_, std::move(columns_));

OkOrThrow(DEEPHAVEN_LOCATION_EXPR(fsw->WriteRecordBatch(*batch)));
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(fsw->DoneWriting()));
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(res->writer->WriteRecordBatch(*batch)));
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(res->writer->DoneWriting()));

std::shared_ptr<arrow::Buffer> buf;
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(fmr->ReadMetadata(&buf)));
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(fsw->Close()));
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(res->reader->ReadMetadata(&buf)));
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(res->writer->Close()));
return manager.MakeTableHandleFromTicket(std::move(ticket));
}

namespace internal {
TypeConverter::TypeConverter(std::shared_ptr<arrow::DataType> dataType,
std::string deephavenType, std::shared_ptr<arrow::Array> column) :
dataType_(std::move(dataType)), deephavenType_(std::move(deephavenType)),
TypeConverter::TypeConverter(std::shared_ptr<arrow::DataType> data_type,
std::string deephaven_type, std::shared_ptr<arrow::Array> column) :
dataType_(std::move(data_type)), deephavenType_(std::move(deephaven_type)),
column_(std::move(column)) {}
TypeConverter::~TypeConverter() = default;

Expand Down

0 comments on commit fe4b551

Please sign in to comment.