Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

C++ Client: update examples, demos, and docs with style guide, Arrow deprecation #4860

Merged
merged 1 commit into from
Nov 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "deephaven/client/client.h"
#include "deephaven/client/flight.h"
#include "deephaven/client/utility/table_maker.h"
#include "deephaven/dhcore/utility/utility.h"

using deephaven::client::Client;
using deephaven::client::TableHandle;
Expand All @@ -13,6 +14,7 @@ using deephaven::client::utility::ConvertTicketToFlightDescriptor;
using deephaven::client::utility::OkOrThrow;
using deephaven::client::utility::TableMaker;
using deephaven::client::utility::ValueOrThrow;
using deephaven::dhcore::utility::GetWhat;

namespace {
void Doit(const TableHandleManager &manager);
Expand All @@ -23,7 +25,7 @@ int main(int argc, char *argv[]) {
const char *server = "localhost:10000";
if (argc > 1) {
if (argc != 2 || std::strcmp("-h", argv[1]) == 0) {
std::cerr << "Usage: " << argv[0] << " [host:port]" << std::endl;
std::cerr << "Usage: " << argv[0] << " [host:port]\n";
std::exit(1);
}
server = argv[1];
Expand All @@ -33,72 +35,72 @@ int main(int argc, char *argv[]) {
auto client = Client::Connect(server);
auto manager = client.GetManager();
Doit(manager);
} catch (const std::exception &e) {
std::cerr << "Caught exception: " << e.what() << '\n';
} catch (...) {
std::cerr << "Caught exception: " << GetWhat(std::current_exception()) << '\n';
}
}

namespace {
void Doit(const TableHandleManager &manager) {
// 1. Build schema
arrow::SchemaBuilder schemaBuilder;
arrow::SchemaBuilder schema_builder;

// 2. Add "Symbol" column (type: string) to schema
{
auto symbolMetadata = std::make_shared<arrow::KeyValueMetadata>();
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(symbolMetadata->Set("deephaven:type", "java.lang.String")));
auto symbolField = std::make_shared<arrow::Field>("Symbol",
std::make_shared<arrow::StringType>(), true, std::move(symbolMetadata));
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(schemaBuilder.AddField(symbolField)));
auto symbol_metadata = std::make_shared<arrow::KeyValueMetadata>();
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(symbol_metadata->Set("deephaven:type", "java.lang.String")));
auto symbol_field = std::make_shared<arrow::Field>("Symbol",
std::make_shared<arrow::StringType>(), true, std::move(symbol_metadata));
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(schema_builder.AddField(symbol_field)));
}

// 3. Add "Price" column (type: double) to schema
{
auto priceMetadata = std::make_shared<arrow::KeyValueMetadata>();
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(priceMetadata->Set("deephaven:type", "double")));
auto priceField = std::make_shared<arrow::Field>("Price",
std::make_shared<arrow::DoubleType>(), true, std::move(priceMetadata));
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(schemaBuilder.AddField(priceField)));
auto price_metadata = std::make_shared<arrow::KeyValueMetadata>();
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(price_metadata->Set("deephaven:type", "double")));
auto price_field = std::make_shared<arrow::Field>("Price",
std::make_shared<arrow::DoubleType>(), true, std::move(price_metadata));
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(schema_builder.AddField(price_field)));
}

// 4. Add "Volume" column (type: int32) to schema
{
auto volumeMetadata = std::make_shared<arrow::KeyValueMetadata>();
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(volumeMetadata->Set("deephaven:type", "int")));
auto volumeField = std::make_shared<arrow::Field>("Volume",
std::make_shared<arrow::Int32Type>(), true, std::move(volumeMetadata));
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(schemaBuilder.AddField(volumeField)));
auto volume_metadata = std::make_shared<arrow::KeyValueMetadata>();
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(volume_metadata->Set("deephaven:type", "int")));
auto volume_field = std::make_shared<arrow::Field>("Volume",
std::make_shared<arrow::Int32Type>(), true, std::move(volume_metadata));
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(schema_builder.AddField(volume_field)));
}

// 4. Schema is done
auto schema = ValueOrThrow(DEEPHAVEN_LOCATION_EXPR(schemaBuilder.Finish()));
auto schema = ValueOrThrow(DEEPHAVEN_LOCATION_EXPR(schema_builder.Finish()));

// 5. Prepare symbol, price, and volume data cells
std::vector<std::string> symbols{"FB", "AAPL", "NFLX", "GOOG"};
std::vector<double> prices{101.1, 102.2, 103.3, 104.4};
std::vector<int32_t> volumes{1000, 2000, 3000, 4000};
auto numRows = symbols.size();
if (numRows != prices.size() || numRows != volumes.size()) {
throw DEEPHAVEN_LOCATION_EXPR(std::runtime_error(DEEPHAVEN_LOCATION_STR("sizes don't match")));
auto num_rows = symbols.size();
if (num_rows != prices.size() || num_rows != volumes.size()) {
throw std::runtime_error(DEEPHAVEN_LOCATION_STR("sizes don't match"));
}

// 6. Move data to Arrow column builders
arrow::StringBuilder symbolBuilder;
arrow::DoubleBuilder priceBuilder;
arrow::Int32Builder volumeBuilder;
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(symbolBuilder.AppendValues(symbols)));
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(priceBuilder.AppendValues(prices)));
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(volumeBuilder.AppendValues(volumes)));
arrow::StringBuilder symbol_builder;
arrow::DoubleBuilder price_builder;
arrow::Int32Builder volume_builder;
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(symbol_builder.AppendValues(symbols)));
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(price_builder.AppendValues(prices)));
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(volume_builder.AppendValues(volumes)));

auto symbolArray = ValueOrThrow(DEEPHAVEN_LOCATION_EXPR(symbolBuilder.Finish()));
auto priceArray = ValueOrThrow(DEEPHAVEN_LOCATION_EXPR(priceBuilder.Finish()));
auto volumeArray = ValueOrThrow(DEEPHAVEN_LOCATION_EXPR(volumeBuilder.Finish()));
auto symbol_array = ValueOrThrow(DEEPHAVEN_LOCATION_EXPR(symbol_builder.Finish()));
auto price_array = ValueOrThrow(DEEPHAVEN_LOCATION_EXPR(price_builder.Finish()));
auto volume_array = ValueOrThrow(DEEPHAVEN_LOCATION_EXPR(volume_builder.Finish()));

// 7. Get Arrow columns from builders
std::vector<std::shared_ptr<arrow::Array>> columns = {
std::move(symbolArray),
std::move(priceArray),
std::move(volumeArray)
std::move(symbol_array),
std::move(price_array),
std::move(volume_array)
};

// 8. Get a Deephaven "FlightWrapper" object to access Arrow Flight
Expand All @@ -116,24 +118,23 @@ void Doit(const TableHandleManager &manager) {
auto fd = deephaven::client::utility::ConvertTicketToFlightDescriptor(ticket);

// 12. Perform the doPut
std::unique_ptr<arrow::flight::FlightStreamWriter> fsw;
std::unique_ptr<arrow::flight::FlightMetadataReader> fmr;
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(wrapper.FlightClient()->DoPut(options, fd, schema, &fsw, &fmr)));
auto res = wrapper.FlightClient()->DoPut(options, fd, schema);
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(res));

// 13. Make a RecordBatch containing both the schema and the data
auto batch = arrow::RecordBatch::Make(schema, static_cast<std::int64_t>(numRows), std::move(columns));
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(fsw->WriteRecordBatch(*batch)));
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(fsw->DoneWriting()));
auto batch = arrow::RecordBatch::Make(schema, static_cast<std::int64_t>(num_rows), std::move(columns));
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(res->writer->WriteRecordBatch(*batch)));
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(res->writer->DoneWriting()));

// 14. Read back a metadata message (ignored), then close the Writer
std::shared_ptr<arrow::Buffer> buf;
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(fmr->ReadMetadata(&buf)));
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(fsw->Close()));
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(res->reader->ReadMetadata(&buf)));
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(res->writer->Close()));

// 15. Now that the table is ready, bind the ticket to a TableHandle.
auto table = manager.MakeTableHandleFromTicket(ticket);

// 16. Use Deephaven high level operations to fetch the table and print it
std::cout << "table is:\n" << table.Stream(true) << std::endl;
std::cout << "table is:\n" << table.Stream(true) << '\n';
}
} // namespace
10 changes: 5 additions & 5 deletions cpp-client/deephaven/examples/demos/chapter1.cc
Original file line number Diff line number Diff line change
Expand Up @@ -135,15 +135,15 @@ void PrintTable(const TableHandle &table, bool null_aware) {
auto fsr = table.GetFlightStreamReader();

while (true) {
arrow::flight::FlightStreamChunk chunk;
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(fsr->Next(&chunk)));
if (chunk.data == nullptr) {
auto chunk = fsr->Next();
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(chunk));
if (chunk->data == nullptr) {
break;
}

auto int64_data = chunk.data->GetColumnByName("Int64Value");
auto int64_data = chunk->data->GetColumnByName("Int64Value");
CheckNotNull(int64_data.get(), DEEPHAVEN_LOCATION_STR("Int64Value column not found"));
auto double_data = chunk.data->GetColumnByName("DoubleValue");
auto double_data = chunk->data->GetColumnByName("DoubleValue");
CheckNotNull(double_data.get(), DEEPHAVEN_LOCATION_STR("DoubleValue column not found"));

auto int64_array = std::dynamic_pointer_cast<arrow::Int64Array>(int64_data);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ Consider the following program from ``cpp-examples/make_table``:
std::vector<double> prices{101.1, 102.2, 103.3, 104.4};
tm.addColumn("Symbol", symbols);
tm.addColumn("Price", prices);
auto table = tm.makeTable(manager, "myTable");
auto table = tm.MakeTable(manager, "myTable");

std::cout << "table is:\n" << table.stream(true) << std::endl;
}
Expand Down
2 changes: 1 addition & 1 deletion cpp-client/deephaven/examples/doc/fluent.rst
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ are created as the result of relational operators on other expressions. For exam
TableMaker tm;
tm.addColumn("A", aValues);
tm.addColumn("S", sValues);
auto temp = tm.makeTable(manager);
auto temp = tm.MakeTable(manager);
auto a = temp.getNumCol("A");
auto result = temp.where(a > 15);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ Consider the following program from ``cpp-examples/read_table_with_arrow_flight`
auto manager = client.getManager();

try {
auto table = makeTable(manager);
dumpSymbolColumn(table);
auto table = MakeTable(manager);
DumpSymbolColumn(table);
} catch (const std::runtime_error &e) {
std::cerr << "Caught exception: " << e.what() << '\n';
}
Expand Down
36 changes: 17 additions & 19 deletions cpp-client/deephaven/examples/read_csv/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ arrow::Status Doit(const TableHandleManager &manager, const std::string &csvfn);
int main(int argc, char* argv[]) {
const char *server = "localhost:10000";
if (argc != 2 && argc != 3) {
std::cerr << "Usage: " << argv[0] << " [host:port] filename" << std::endl;
std::cerr << "Usage: " << argv[0] << " [host:port] filename\n";
std::exit(1);
}
int c = 1;
Expand All @@ -41,7 +41,7 @@ int main(int argc, char* argv[]) {
auto manager = client.GetManager();
auto st = Doit(manager, filename);
if (!st.ok()) {
std::cerr << "Failed with status " << st << std::endl;
std::cerr << "Failed with status " << st << '\n';
}
} catch (const std::exception &e) {
std::cerr << "Caught exception: " << e.what() << '\n';
Expand Down Expand Up @@ -74,31 +74,29 @@ arrow::Status Doit(const TableHandleManager &manager, const std::string &csvfn)
wrapper.AddHeaders(&options);

auto fd = ConvertTicketToFlightDescriptor(ticket);
std::unique_ptr<arrow::flight::FlightStreamWriter> fsw;
std::unique_ptr<arrow::flight::FlightMetadataReader> fmr;
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(
wrapper.FlightClient()->DoPut(options, fd, arrow_table->schema(), &fsw, &fmr)));
auto res = wrapper.FlightClient()->DoPut(options, fd, arrow_table->schema());
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(res));

const auto &srcColumns = arrow_table->columns();
const size_t ncols = srcColumns.size();
const size_t nchunks = srcColumns[0]->num_chunks();
std::vector<std::shared_ptr<arrow::Array>> destColumns(ncols);
for (size_t chunkIndex = 0; chunkIndex < nchunks; ++chunkIndex) {
for (size_t colIndex = 0; colIndex < ncols; ++colIndex) {
destColumns[colIndex] = srcColumns[colIndex]->chunk(chunkIndex);
const auto &src_columns = arrow_table->columns();
const size_t ncols = src_columns.size();
const size_t nchunks = src_columns[0]->num_chunks();
std::vector<std::shared_ptr<arrow::Array>> dest_columns(ncols);
for (size_t chunk_index = 0; chunk_index < nchunks; ++chunk_index) {
for (size_t col_index = 0; col_index < ncols; ++col_index) {
dest_columns[col_index] = src_columns[col_index]->chunk(chunk_index);
}
auto batch = arrow::RecordBatch::Make(arrow_table->schema(), destColumns[0]->length(), destColumns);
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(fsw->WriteRecordBatch(*batch)));
auto batch = arrow::RecordBatch::Make(arrow_table->schema(), dest_columns[0]->length(), dest_columns);
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(res->writer->WriteRecordBatch(*batch)));
}

OkOrThrow(DEEPHAVEN_LOCATION_EXPR(fsw->DoneWriting()));
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(res->writer->DoneWriting()));

std::shared_ptr<arrow::Buffer> buf;
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(fmr->ReadMetadata(&buf)));
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(fsw->Close()));
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(res->reader->ReadMetadata(&buf)));
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(res->writer->Close()));

auto table_handle = manager.MakeTableHandleFromTicket(ticket);
std::cout << "table is:\n" << table_handle.Stream(true) << std::endl;
std::cout << "table is:\n" << table_handle.Stream(true) << '\n';
table_handle.BindToVariable("showme");
return arrow::Status::OK();
}
Expand Down
46 changes: 23 additions & 23 deletions cpp-client/deephaven/examples/read_table_with_arrow_flight/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@ using deephaven::client::utility::OkOrThrow;
using deephaven::client::utility::TableMaker;

namespace {
TableHandle makeTable(const TableHandleManager &manager);
void dumpSymbolColumn(const TableHandle &tableHandle);
TableHandle MakeTable(const TableHandleManager &manager);
void DumpSymbolColumn(const TableHandle &table_handle);
} // namespace

int main(int argc, char *argv[]) {
const char *server = "localhost:10000";
if (argc > 1) {
if (argc != 2 || std::strcmp("-h", argv[1]) == 0) {
std::cerr << "Usage: " << argv[0] << " [host:port]" << std::endl;
std::cerr << "Usage: " << argv[0] << " [host:port]\n";
std::exit(1);
}
server = argv[1];
Expand All @@ -29,15 +29,15 @@ int main(int argc, char *argv[]) {
try {
auto client = Client::Connect(server);
auto manager = client.GetManager();
auto table = makeTable(manager);
dumpSymbolColumn(table);
auto table = MakeTable(manager);
DumpSymbolColumn(table);
} catch (const std::exception &e) {
std::cerr << "Caught exception: " << e.what() << '\n';
}
}

namespace {
TableHandle makeTable(const TableHandleManager &manager) {
TableHandle MakeTable(const TableHandleManager &manager) {
TableMaker tm;
std::vector<std::string> symbols{"FB", "AAPL", "NFLX", "GOOG"};
std::vector<double> prices{101.1, 102.2, 103.3, 104.4};
Expand All @@ -46,40 +46,40 @@ TableHandle makeTable(const TableHandleManager &manager) {
return tm.MakeTable(manager);
}

void dumpSymbolColumn(const TableHandle &tableHandle) {
auto fsr = tableHandle.GetFlightStreamReader();
void DumpSymbolColumn(const TableHandle &table_handle) {
auto fsr = table_handle.GetFlightStreamReader();
while (true) {
arrow::flight::FlightStreamChunk chunk;
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(fsr->Next(&chunk)));
if (chunk.data == nullptr) {
auto res = fsr->Next();
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(res));
if (res->data == nullptr) {
break;
}

auto symbolChunk = chunk.data->GetColumnByName("Symbol");
if (symbolChunk == nullptr) {
auto symbol_chunk = res->data->GetColumnByName("Symbol");
if (symbol_chunk == nullptr) {
throw std::runtime_error(DEEPHAVEN_LOCATION_STR("Symbol column not found"));
}
auto priceChunk = chunk.data->GetColumnByName("Price");
if (priceChunk == nullptr) {
auto price_chunk = res->data->GetColumnByName("Price");
if (price_chunk == nullptr) {
throw std::runtime_error(DEEPHAVEN_LOCATION_STR("Price column not found"));
}

auto symbolAsStringArray = std::dynamic_pointer_cast<arrow::StringArray>(symbolChunk);
auto priceAsDoubleArray = std::dynamic_pointer_cast<arrow::DoubleArray>(priceChunk);
if (symbolAsStringArray == nullptr) {
auto symbol_as_string_array = std::dynamic_pointer_cast<arrow::StringArray>(symbol_chunk);
auto price_as_double_array = std::dynamic_pointer_cast<arrow::DoubleArray>(price_chunk);
if (symbol_as_string_array == nullptr) {
throw std::runtime_error(DEEPHAVEN_LOCATION_STR("symbolChunk was not an arrow::StringArray"));
}
if (priceAsDoubleArray == nullptr) {
if (price_as_double_array == nullptr) {
throw std::runtime_error(DEEPHAVEN_LOCATION_STR("priceChunk was not an arrow::DoubleArray"));
}

if (symbolAsStringArray->length() != priceAsDoubleArray->length()) {
if (symbol_as_string_array->length() != price_as_double_array->length()) {
throw std::runtime_error(DEEPHAVEN_LOCATION_STR("Lengths differ"));
}

for (int64_t i = 0; i < symbolAsStringArray->length(); ++i) {
auto symbol = symbolAsStringArray->GetView(i);
auto price = priceAsDoubleArray->Value(i);
for (int64_t i = 0; i < symbol_as_string_array->length(); ++i) {
auto symbol = symbol_as_string_array->GetView(i);
auto price = price_as_double_array->Value(i);
std::cout << symbol << ' ' << price << '\n';
}
}
Expand Down
Loading