Skip to content

Commit

Permalink
C++ Client: a few more date tests; arrow deprecation; more style guide
Browse files Browse the repository at this point in the history
  • Loading branch information
kosak committed Nov 29, 2023
1 parent 580e220 commit 0c85e1c
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 48 deletions.
3 changes: 2 additions & 1 deletion cpp-client/deephaven/dhclient/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,12 @@ set(ALL_FILES
include/private/deephaven/client/subscription/subscribe_thread.h
include/private/deephaven/client/subscription/subscription_handle.h

src/utility/arrow_util.cc
src/utility/executor.cc
include/private/deephaven/client/utility/executor.h

src/utility/arrow_util.cc
src/utility/table_maker.cc

include/public/deephaven/client/utility/arrow_util.h
include/public/deephaven/client/utility/misc_types.h
include/public/deephaven/client/utility/table_maker.h
Expand Down
80 changes: 39 additions & 41 deletions cpp-client/deephaven/dhclient/src/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,33 +69,33 @@ const char *const Server::kAuthorizationKey = "authorization";

namespace {
std::optional<std::chrono::milliseconds> ExtractExpirationInterval(
const ConfigurationConstantsResponse &cc_Resp);
const ConfigurationConstantsResponse &cc_resp);

const char *timeoutKey = "http.session.durationMs";
constexpr const char *kTimeoutKey = "http.session.durationMs";

// A handshake resend interval to use as a default if our normal interval calculation
// fails, e.g. due to GRPC errors.
constexpr const auto kHandshakeResendInterval = std::chrono::seconds(5);
} // namespace

namespace {
std::shared_ptr<grpc::ChannelCredentials> getCredentials(
const bool useTls,
const std::string &tlsRootCerts,
const std::string &clientCertChain,
const std::string &clientPrivateKey) {
if (!useTls) {
std::shared_ptr<grpc::ChannelCredentials> GetCredentials(
const bool use_tls,
const std::string &tls_root_certs,
const std::string &client_root_chain,
const std::string &client_private_key) {
if (!use_tls) {
return grpc::InsecureChannelCredentials();
}
grpc::SslCredentialsOptions options;
if (!tlsRootCerts.empty()) {
options.pem_root_certs = tlsRootCerts;
if (!tls_root_certs.empty()) {
options.pem_root_certs = tls_root_certs;
}
if (!clientCertChain.empty()) {
options.pem_cert_chain = clientCertChain;
if (!client_root_chain.empty()) {
options.pem_cert_chain = client_root_chain;
}
if (!clientPrivateKey.empty()) {
options.pem_private_key = clientPrivateKey;
if (!client_private_key.empty()) {
options.pem_private_key = client_private_key;
}
return grpc::SslCredentials(options);
}
Expand All @@ -120,7 +120,7 @@ std::shared_ptr<Server> Server::CreateFromTarget(
options.generic_options.emplace_back(opt.first, opt.second);
}

auto credentials = getCredentials(
auto credentials = GetCredentials(
client_options.UseTls(),
client_options.TlsRootCerts(),
client_options.ClientCertChain(),
Expand All @@ -145,13 +145,12 @@ std::shared_ptr<Server> Server::CreateFromTarget(
auto its = InputTableService::NewStub(channel);

// TODO(kosak): Warn about this string conversion or do something more general.
auto flightTarget = ((client_options.UseTls()) ? "grpc+tls://" : "grpc://") + target;
arrow::flight::Location location;
auto flight_target = ((client_options.UseTls()) ? "grpc+tls://" : "grpc://") + target;

auto rc1 = arrow::flight::Location::Parse(flightTarget, &location);
if (!rc1.ok()) {
auto location_res = arrow::flight::Location::Parse(flight_target);
if (!location_res.ok()) {
auto message = Stringf("Location::Parse(%o) failed, error = %o",
flightTarget, rc1.ToString());
flight_target, location_res.status());
throw std::runtime_error(DEEPHAVEN_LOCATION_STR(message));
}

Expand All @@ -165,33 +164,32 @@ std::shared_ptr<Server> Server::CreateFromTarget(
options.private_key = client_options.ClientPrivateKey();
}

std::unique_ptr<arrow::flight::FlightClient> fc;
auto rc2 = arrow::flight::FlightClient::Connect(location, options, &fc);
if (!rc2.ok()) {
auto message = Stringf("FlightClient::Connect() failed, error = %o", rc2.ToString());
auto client_res = arrow::flight::FlightClient::Connect(*location_res, options);
if (!client_res.ok()) {
auto message = Stringf("FlightClient::Connect() failed, error = %o", client_res.status());
throw std::runtime_error(message);
}
gpr_log(GPR_DEBUG,
"%s: "
"FlightClient(%p) created, "
"target=%s",
"Server::CreateFromTarget",
static_cast<void*>(fc.get()),
static_cast<void*>(client_res->get()),
target.c_str());

std::string sessionToken;
std::chrono::milliseconds expirationInterval;
auto sendTime = std::chrono::system_clock::now();
std::string session_token;
std::chrono::milliseconds expiration_interval;
auto send_time = std::chrono::system_clock::now();
{
ConfigurationConstantsRequest ccReq;
ConfigurationConstantsResponse ccResp;
ConfigurationConstantsRequest cc_req;
ConfigurationConstantsResponse cc_resp;
grpc::ClientContext ctx;
ctx.AddMetadata(kAuthorizationKey, client_options.AuthorizationValue());
for (const auto &header : client_options.ExtraHeaders()) {
ctx.AddMetadata(header.first, header.second);
}

auto result = cfs->GetConfigurationConstants(&ctx, ccReq, &ccResp);
auto result = cfs->GetConfigurationConstants(&ctx, cc_req, &cc_resp);

if (!result.ok()) {
auto message = Stringf("Can't get configuration constants. Error %o: %o",
Expand All @@ -205,29 +203,29 @@ std::shared_ptr<Server> Server::CreateFromTarget(
throw std::runtime_error(
DEEPHAVEN_LOCATION_STR("Configuration response didn't contain authorization token"));
}
sessionToken.assign(ip->second.begin(), ip->second.end());
session_token.assign(ip->second.begin(), ip->second.end());

// Get expiration interval.
auto expInt = ExtractExpirationInterval(ccResp);
if (expInt.has_value()) {
expirationInterval = *expInt;
auto exp_int = ExtractExpirationInterval(cc_resp);
if (exp_int.has_value()) {
expiration_interval = *exp_int;
} else {
expirationInterval = std::chrono::seconds(10);
expiration_interval = std::chrono::seconds(10);
}
}

auto nextHandshakeTime = sendTime + expirationInterval;
auto next_handshake_time = send_time + expiration_interval;

auto result = std::make_shared<Server>(Private(), std::move(as), std::move(cs),
std::move(ss), std::move(ts), std::move(cfs), std::move(its), std::move(fc),
client_options.ExtraHeaders(), std::move(sessionToken), expirationInterval, nextHandshakeTime);
std::move(ss), std::move(ts), std::move(cfs), std::move(its), std::move(*client_res),
client_options.ExtraHeaders(), std::move(session_token), expiration_interval, next_handshake_time);
result->keepAliveThread_ = std::thread(&SendKeepaliveMessages, result);
gpr_log(GPR_DEBUG,
"%s: "
"Server(%p) created, "
"target=%s",
"Server::CreateFromTarget",
(void*) result.get(),
static_cast<void*>(result.get()),
target.c_str());
return result;
}
Expand Down Expand Up @@ -461,7 +459,7 @@ void Server::ForEachHeaderNameAndValue(
namespace {
std::optional<std::chrono::milliseconds> ExtractExpirationInterval(
const ConfigurationConstantsResponse &cc_resp) {
auto ip2 = cc_resp.config_values().find(timeoutKey);
auto ip2 = cc_resp.config_values().find(kTimeoutKey);
if (ip2 == cc_resp.config_values().end() || !ip2->second.has_string_value()) {
return {};
}
Expand Down
5 changes: 4 additions & 1 deletion cpp-client/deephaven/tests/cython_support_test.cc
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
/*
* Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending
*/
#include <array>
#include <type_traits>
#include "deephaven/dhcore/chunk/chunk.h"
#include "deephaven/dhcore/column/array_column_source.h"
Expand Down Expand Up @@ -28,7 +29,9 @@ TEST_CASE("CreateStringColumnsSource", "[cython]") {
static_assert((kNumElements + 7) / 8 == kValidity.size());

auto result = CythonSupport::CreateStringColumnSource(kText, kText + kTextSize,
kOffsets.begin(), kOffsets.end(), kValidity.begin(), kValidity.end(), kNumElements);
kOffsets.data(), kOffsets.data() + kOffsets.size(),
kValidity.data(), kValidity.data() + kValidity.size(),
kNumElements);

auto rs = RowSequence::CreateSequential(0, kNumElements);
auto data = dhcore::chunk::StringChunk::Create(kNumElements);
Expand Down
11 changes: 6 additions & 5 deletions cpp-client/deephaven/tests/test_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,10 @@ Client TableMakerForTests::CreateClient(const ClientOptions &options) {
}

TableMakerForTests::TableMakerForTests(TableMakerForTests::ClientType &&client,
TableHandle &&test_table, ColumnNamesForTests &&column_names, ColumnDataForTests &&columnData) :
TableHandle &&test_table, ColumnNamesForTests &&column_names, ColumnDataForTests &&column_data) :
client_(std::move(client)),
testTable_(std::move(test_table)), columnNames_(std::move(column_names)),
columnData_(std::move(columnData)) {}
columnData_(std::move(column_data)) {}

TableMakerForTests::TableMakerForTests(TableMakerForTests &&) noexcept = default;
TableMakerForTests &TableMakerForTests::operator=(TableMakerForTests &&) noexcept = default;
Expand Down Expand Up @@ -208,16 +208,17 @@ void CompareTableHelper(int depth, const std::shared_ptr<arrow::Table> &table,

std::shared_ptr<arrow::Table> BasicValidate(const deephaven::client::TableHandle &table, int expected_columns) {
auto fsr = table.GetFlightStreamReader();
std::shared_ptr<arrow::Table> arrow_table;
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(fsr->ReadAll(&arrow_table)));
auto table_res = fsr->ToTable();
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(table_res));

auto &arrow_table = *table_res;
if (expected_columns != arrow_table->num_columns()) {
auto message = Stringf("Expected %o columns, but Table actually has %o columns",
expected_columns, arrow_table->num_columns());
throw std::runtime_error(DEEPHAVEN_LOCATION_STR(message));
}

return arrow_table;
return std::move(arrow_table);
}
} // namespace internal
} // namespace deephaven::client::tests

0 comments on commit 0c85e1c

Please sign in to comment.