Skip to content

Commit

Permalink
C++ Client: a few more date tests; arrow deprecation; more style guide
Browse files Browse the repository at this point in the history
  • Loading branch information
kosak committed Nov 29, 2023
1 parent 7df6e46 commit 3001714
Show file tree
Hide file tree
Showing 7 changed files with 98 additions and 52 deletions.
3 changes: 2 additions & 1 deletion cpp-client/deephaven/dhclient/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,12 @@ set(ALL_FILES
include/private/deephaven/client/subscription/subscribe_thread.h
include/private/deephaven/client/subscription/subscription_handle.h

src/utility/arrow_util.cc
src/utility/executor.cc
include/private/deephaven/client/utility/executor.h

src/utility/arrow_util.cc
src/utility/table_maker.cc

include/public/deephaven/client/utility/arrow_util.h
include/public/deephaven/client/utility/misc_types.h
include/public/deephaven/client/utility/table_maker.h
Expand Down
80 changes: 39 additions & 41 deletions cpp-client/deephaven/dhclient/src/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,33 +69,33 @@ const char *const Server::kAuthorizationKey = "authorization";

namespace {
std::optional<std::chrono::milliseconds> ExtractExpirationInterval(
const ConfigurationConstantsResponse &cc_Resp);
const ConfigurationConstantsResponse &cc_resp);

const char *timeoutKey = "http.session.durationMs";
constexpr const char *kTimeoutKey = "http.session.durationMs";

// A handshake resend interval to use as a default if our normal interval calculation
// fails, e.g. due to GRPC errors.
constexpr const auto kHandshakeResendInterval = std::chrono::seconds(5);
} // namespace

namespace {
std::shared_ptr<grpc::ChannelCredentials> getCredentials(
const bool useTls,
const std::string &tlsRootCerts,
const std::string &clientCertChain,
const std::string &clientPrivateKey) {
if (!useTls) {
std::shared_ptr<grpc::ChannelCredentials> GetCredentials(
const bool use_tls,
const std::string &tls_root_certs,
const std::string &client_root_chain,
const std::string &client_private_key) {
if (!use_tls) {
return grpc::InsecureChannelCredentials();
}
grpc::SslCredentialsOptions options;
if (!tlsRootCerts.empty()) {
options.pem_root_certs = tlsRootCerts;
if (!tls_root_certs.empty()) {
options.pem_root_certs = tls_root_certs;
}
if (!clientCertChain.empty()) {
options.pem_cert_chain = clientCertChain;
if (!client_root_chain.empty()) {
options.pem_cert_chain = client_root_chain;
}
if (!clientPrivateKey.empty()) {
options.pem_private_key = clientPrivateKey;
if (!client_private_key.empty()) {
options.pem_private_key = client_private_key;
}
return grpc::SslCredentials(options);
}
Expand All @@ -120,7 +120,7 @@ std::shared_ptr<Server> Server::CreateFromTarget(
options.generic_options.emplace_back(opt.first, opt.second);
}

auto credentials = getCredentials(
auto credentials = GetCredentials(
client_options.UseTls(),
client_options.TlsRootCerts(),
client_options.ClientCertChain(),
Expand All @@ -145,13 +145,12 @@ std::shared_ptr<Server> Server::CreateFromTarget(
auto its = InputTableService::NewStub(channel);

// TODO(kosak): Warn about this string conversion or do something more general.
auto flightTarget = ((client_options.UseTls()) ? "grpc+tls://" : "grpc://") + target;
arrow::flight::Location location;
auto flight_target = ((client_options.UseTls()) ? "grpc+tls://" : "grpc://") + target;

auto rc1 = arrow::flight::Location::Parse(flightTarget, &location);
if (!rc1.ok()) {
auto location_res = arrow::flight::Location::Parse(flight_target);
if (!location_res.ok()) {
auto message = Stringf("Location::Parse(%o) failed, error = %o",
flightTarget, rc1.ToString());
flight_target, location_res.status());
throw std::runtime_error(DEEPHAVEN_LOCATION_STR(message));
}

Expand All @@ -165,33 +164,32 @@ std::shared_ptr<Server> Server::CreateFromTarget(
options.private_key = client_options.ClientPrivateKey();
}

std::unique_ptr<arrow::flight::FlightClient> fc;
auto rc2 = arrow::flight::FlightClient::Connect(location, options, &fc);
if (!rc2.ok()) {
auto message = Stringf("FlightClient::Connect() failed, error = %o", rc2.ToString());
auto client_res = arrow::flight::FlightClient::Connect(*location_res, options);
if (!client_res.ok()) {
auto message = Stringf("FlightClient::Connect() failed, error = %o", client_res.status());
throw std::runtime_error(message);
}
gpr_log(GPR_DEBUG,
"%s: "
"FlightClient(%p) created, "
"target=%s",
"Server::CreateFromTarget",
static_cast<void*>(fc.get()),
static_cast<void*>(client_res->get()),
target.c_str());

std::string sessionToken;
std::chrono::milliseconds expirationInterval;
auto sendTime = std::chrono::system_clock::now();
std::string session_token;
std::chrono::milliseconds expiration_interval;
auto send_time = std::chrono::system_clock::now();
{
ConfigurationConstantsRequest ccReq;
ConfigurationConstantsResponse ccResp;
ConfigurationConstantsRequest cc_req;
ConfigurationConstantsResponse cc_resp;
grpc::ClientContext ctx;
ctx.AddMetadata(kAuthorizationKey, client_options.AuthorizationValue());
for (const auto &header : client_options.ExtraHeaders()) {
ctx.AddMetadata(header.first, header.second);
}

auto result = cfs->GetConfigurationConstants(&ctx, ccReq, &ccResp);
auto result = cfs->GetConfigurationConstants(&ctx, cc_req, &cc_resp);

if (!result.ok()) {
auto message = Stringf("Can't get configuration constants. Error %o: %o",
Expand All @@ -205,29 +203,29 @@ std::shared_ptr<Server> Server::CreateFromTarget(
throw std::runtime_error(
DEEPHAVEN_LOCATION_STR("Configuration response didn't contain authorization token"));
}
sessionToken.assign(ip->second.begin(), ip->second.end());
session_token.assign(ip->second.begin(), ip->second.end());

// Get expiration interval.
auto expInt = ExtractExpirationInterval(ccResp);
if (expInt.has_value()) {
expirationInterval = *expInt;
auto exp_int = ExtractExpirationInterval(cc_resp);
if (exp_int.has_value()) {
expiration_interval = *exp_int;
} else {
expirationInterval = std::chrono::seconds(10);
expiration_interval = std::chrono::seconds(10);
}
}

auto nextHandshakeTime = sendTime + expirationInterval;
auto next_handshake_time = send_time + expiration_interval;

auto result = std::make_shared<Server>(Private(), std::move(as), std::move(cs),
std::move(ss), std::move(ts), std::move(cfs), std::move(its), std::move(fc),
client_options.ExtraHeaders(), std::move(sessionToken), expirationInterval, nextHandshakeTime);
std::move(ss), std::move(ts), std::move(cfs), std::move(its), std::move(*client_res),
client_options.ExtraHeaders(), std::move(session_token), expiration_interval, next_handshake_time);
result->keepAliveThread_ = std::thread(&SendKeepaliveMessages, result);
gpr_log(GPR_DEBUG,
"%s: "
"Server(%p) created, "
"target=%s",
"Server::CreateFromTarget",
(void*) result.get(),
static_cast<void*>(result.get()),
target.c_str());
return result;
}
Expand Down Expand Up @@ -461,7 +459,7 @@ void Server::ForEachHeaderNameAndValue(
namespace {
std::optional<std::chrono::milliseconds> ExtractExpirationInterval(
const ConfigurationConstantsResponse &cc_resp) {
auto ip2 = cc_resp.config_values().find(timeoutKey);
auto ip2 = cc_resp.config_values().find(kTimeoutKey);
if (ip2 == cc_resp.config_values().end() || !ip2->second.has_string_value()) {
return {};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <cstdint>
#include <ostream>
#include "deephaven/dhcore/utility/utility.h"
#include "deephaven/third_party/fmt/ostream.h"

namespace deephaven::dhcore {
struct ElementTypeId {
Expand Down Expand Up @@ -392,3 +393,5 @@ class DateTime {
friend std::ostream &operator<<(std::ostream &s, const DateTime &o);
};
} // namespace deephaven::dhcore

template<> struct fmt::formatter<deephaven::dhcore::DateTime> : ostream_formatter {};
12 changes: 8 additions & 4 deletions cpp-client/deephaven/dhcore/src/types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
#include "deephaven/third_party/fmt/format.h"
#include "deephaven/third_party/fmt/ostream.h"

static_assert(FMT_VERSION >= 100000);

namespace deephaven::dhcore {
constexpr const char16_t DeephavenConstants::kNullChar;

Expand Down Expand Up @@ -54,20 +56,22 @@ constexpr const int64_t DeephavenConstants::kMinLong;
constexpr const int64_t DeephavenConstants::kMaxLong;

DateTime DateTime::Parse(std::string_view iso_8601_timestamp) {
constexpr const char *kFormatToUse = "%FT%T%z";
// Special handling for "Z" timezone
const char *format_to_use = !iso_8601_timestamp.empty() && iso_8601_timestamp.back() == 'Z' ?
"%FT%TZ" : "%FT%T%z";
std::istringstream istream((std::string(iso_8601_timestamp)));
std::chrono::time_point<std::chrono::system_clock, std::chrono::nanoseconds> tp;
istream >> date::parse(kFormatToUse, tp);
istream >> date::parse(format_to_use, tp);
if (istream.fail()) {
auto message = fmt::format(R"x(Can't parse "{}" as ISO 8601 timestamp (using format string "{}"))x",
iso_8601_timestamp, kFormatToUse);
iso_8601_timestamp, format_to_use);
throw std::runtime_error(message);
}

auto probe = istream.peek();
if (probe != std::istringstream::traits_type::eof()) {
auto message = fmt::format(R"x(Input string "{}" had extra trailing characters (using format string "{}"))x",
iso_8601_timestamp, kFormatToUse);
iso_8601_timestamp, format_to_use);
throw std::runtime_error(message);
}

Expand Down
5 changes: 4 additions & 1 deletion cpp-client/deephaven/tests/cython_support_test.cc
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
/*
* Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending
*/
#include <array>
#include <type_traits>
#include "deephaven/dhcore/chunk/chunk.h"
#include "deephaven/dhcore/column/array_column_source.h"
Expand Down Expand Up @@ -28,7 +29,9 @@ TEST_CASE("CreateStringColumnsSource", "[cython]") {
static_assert((kNumElements + 7) / 8 == kValidity.size());

auto result = CythonSupport::CreateStringColumnSource(kText, kText + kTextSize,
kOffsets.begin(), kOffsets.end(), kValidity.begin(), kValidity.end(), kNumElements);
kOffsets.data(), kOffsets.data() + kOffsets.size(),
kValidity.data(), kValidity.data() + kValidity.size(),
kNumElements);

auto rs = RowSequence::CreateSequential(0, kNumElements);
auto data = dhcore::chunk::StringChunk::Create(kNumElements);
Expand Down
36 changes: 36 additions & 0 deletions cpp-client/deephaven/tests/date_time_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
#include "tests/third_party/catch.hpp"
#include "deephaven/dhcore/types.h"
#include "deephaven/dhcore/utility/utility.h"
#include "deephaven/third_party/fmt/core.h"
#include "deephaven/third_party/fmt/format.h"

using deephaven::dhcore::DateTime;
using deephaven::dhcore::utility::Base64Encode;
Expand All @@ -29,6 +31,40 @@ TEST_CASE("DateTime parse ISO8601", "[datetime]") {

auto dt2 = DateTime::Parse("2001-03-01T12:34:56-0400");
CHECK(dt2.Nanos() == 983464496 * kOneBillion);

auto dt3 = DateTime::Parse("2001-03-01T12:34:56Z");
CHECK(dt3.Nanos() == 983450096 * kOneBillion);

auto dt4 = DateTime::Parse("2001-03-01T12:34:56.987-0500");
CHECK(dt4.Nanos() == 983468096987000000);

auto dt5 = DateTime::Parse("2001-03-01T12:34:56.987654-0500");
CHECK(dt5.Nanos() == 983468096987654000);

auto dt6 = DateTime::Parse("2001-03-01T12:34:56.987654321-0500");
CHECK(dt6.Nanos() == 983468096987654321);
}

TEST_CASE("DateTime format ISO8601", "[datetime]") {
constexpr const uint64_t kOneBillion = 1'000'000'000;

DateTime dt1(983468096 * kOneBillion);
CHECK(fmt::to_string(dt1) == "2001-03-01T17:34:56.000000000Z");

DateTime dt2(983464496 * kOneBillion);
CHECK(fmt::to_string(dt2) == "2001-03-01T16:34:56.000000000Z");

DateTime dt3(983450096 * kOneBillion);
CHECK(fmt::to_string(dt3) == "2001-03-01T12:34:56.000000000Z");

DateTime dt4(983468096987000000);
CHECK(fmt::to_string(dt4) == "2001-03-01T17:34:56.987000000Z");

DateTime dt5(983468096987654000);
CHECK(fmt::to_string(dt5) == "2001-03-01T17:34:56.987654000Z");

DateTime dt6(983468096987654321);
CHECK(fmt::to_string(dt6) == "2001-03-01T17:34:56.987654321Z");
}

TEST_CASE("DateTime parse fails", "[datetime]") {
Expand Down
11 changes: 6 additions & 5 deletions cpp-client/deephaven/tests/test_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,10 @@ Client TableMakerForTests::CreateClient(const ClientOptions &options) {
}

TableMakerForTests::TableMakerForTests(TableMakerForTests::ClientType &&client,
TableHandle &&test_table, ColumnNamesForTests &&column_names, ColumnDataForTests &&columnData) :
TableHandle &&test_table, ColumnNamesForTests &&column_names, ColumnDataForTests &&column_data) :
client_(std::move(client)),
testTable_(std::move(test_table)), columnNames_(std::move(column_names)),
columnData_(std::move(columnData)) {}
columnData_(std::move(column_data)) {}

TableMakerForTests::TableMakerForTests(TableMakerForTests &&) noexcept = default;
TableMakerForTests &TableMakerForTests::operator=(TableMakerForTests &&) noexcept = default;
Expand Down Expand Up @@ -208,16 +208,17 @@ void CompareTableHelper(int depth, const std::shared_ptr<arrow::Table> &table,

std::shared_ptr<arrow::Table> BasicValidate(const deephaven::client::TableHandle &table, int expected_columns) {
auto fsr = table.GetFlightStreamReader();
std::shared_ptr<arrow::Table> arrow_table;
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(fsr->ReadAll(&arrow_table)));
auto table_res = fsr->ToTable();
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(table_res));

auto &arrow_table = *table_res;
if (expected_columns != arrow_table->num_columns()) {
auto message = Stringf("Expected %o columns, but Table actually has %o columns",
expected_columns, arrow_table->num_columns());
throw std::runtime_error(DEEPHAVEN_LOCATION_STR(message));
}

return arrow_table;
return std::move(arrow_table);
}
} // namespace internal
} // namespace deephaven::client::tests

0 comments on commit 3001714

Please sign in to comment.