Skip to content

Commit

Permalink
C++ Client: use absl for dates; arrow deprecation; more style guide
Browse files Browse the repository at this point in the history
  • Loading branch information
kosak committed Nov 22, 2023
1 parent eab615f commit 91ca8c8
Show file tree
Hide file tree
Showing 8 changed files with 135 additions and 141 deletions.
84 changes: 41 additions & 43 deletions cpp-client/deephaven/dhclient/src/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,33 +69,33 @@ const char *const Server::kAuthorizationKey = "authorization";

namespace {
std::optional<std::chrono::milliseconds> ExtractExpirationInterval(
const ConfigurationConstantsResponse &cc_Resp);
const ConfigurationConstantsResponse &cc_resp);

const char *timeoutKey = "http.session.durationMs";
constexpr const char *kTimeoutKey = "http.session.durationMs";

// A handshake resend interval to use as a default if our normal interval calculation
// fails, e.g. due to GRPC errors.
constexpr const auto kHandshakeResendInterval = std::chrono::seconds(5);
} // namespace

namespace {
std::shared_ptr<grpc::ChannelCredentials> getCredentials(
const bool useTls,
const std::string &tlsRootCerts,
const std::string &clientCertChain,
const std::string &clientPrivateKey) {
if (!useTls) {
std::shared_ptr<grpc::ChannelCredentials> GetCredentials(
const bool use_tls,
const std::string &tls_root_certs,
const std::string &client_root_chain,
const std::string &client_private_key) {
if (!use_tls) {
return grpc::InsecureChannelCredentials();
}
grpc::SslCredentialsOptions options;
if (!tlsRootCerts.empty()) {
options.pem_root_certs = tlsRootCerts;
if (!tls_root_certs.empty()) {
options.pem_root_certs = tls_root_certs;
}
if (!clientCertChain.empty()) {
options.pem_cert_chain = clientCertChain;
if (!client_root_chain.empty()) {
options.pem_cert_chain = client_root_chain;
}
if (!clientPrivateKey.empty()) {
options.pem_private_key = clientPrivateKey;
if (!client_private_key.empty()) {
options.pem_private_key = client_private_key;
}
return grpc::SslCredentials(options);
}
Expand All @@ -120,7 +120,7 @@ std::shared_ptr<Server> Server::CreateFromTarget(
options.generic_options.emplace_back(opt.first, opt.second);
}

auto credentials = getCredentials(
auto credentials = GetCredentials(
client_options.UseTls(),
client_options.TlsRootCerts(),
client_options.ClientCertChain(),
Expand All @@ -145,13 +145,12 @@ std::shared_ptr<Server> Server::CreateFromTarget(
auto its = InputTableService::NewStub(channel);

// TODO(kosak): Warn about this string conversion or do something more general.
auto flightTarget = ((client_options.UseTls()) ? "grpc+tls://" : "grpc://") + target;
arrow::flight::Location location;
auto flight_target = ((client_options.UseTls()) ? "grpc+tls://" : "grpc://") + target;

auto rc1 = arrow::flight::Location::Parse(flightTarget, &location);
if (!rc1.ok()) {
auto location_res = arrow::flight::Location::Parse(flight_target);
if (!location_res.ok()) {
auto message = Stringf("Location::Parse(%o) failed, error = %o",
flightTarget, rc1.ToString());
flight_target, location_res.status());
throw std::runtime_error(DEEPHAVEN_LOCATION_STR(message));
}

Expand All @@ -165,33 +164,32 @@ std::shared_ptr<Server> Server::CreateFromTarget(
options.private_key = client_options.ClientPrivateKey();
}

std::unique_ptr<arrow::flight::FlightClient> fc;
auto rc2 = arrow::flight::FlightClient::Connect(location, options, &fc);
if (!rc2.ok()) {
auto message = Stringf("FlightClient::Connect() failed, error = %o", rc2.ToString());
auto client_res = arrow::flight::FlightClient::Connect(*location_res, options);
if (!client_res.ok()) {
auto message = Stringf("FlightClient::Connect() failed, error = %o", client_res.status());
throw std::runtime_error(message);
}
gpr_log(GPR_DEBUG,
"%s: "
"FlightClient(%p) created, "
"target=%s",
"Server::CreateFromTarget",
static_cast<void*>(fc.get()),
static_cast<void*>(client_res->get()),
target.c_str());

std::string sessionToken;
std::chrono::milliseconds expirationInterval;
auto sendTime = std::chrono::system_clock::now();
std::string session_token;
std::chrono::milliseconds expiration_interval;
auto send_time = std::chrono::system_clock::now();
{
ConfigurationConstantsRequest ccReq;
ConfigurationConstantsResponse ccResp;
ConfigurationConstantsRequest cc_req;
ConfigurationConstantsResponse cc_resp;
grpc::ClientContext ctx;
ctx.AddMetadata(kAuthorizationKey, client_options.AuthorizationValue());
for (const auto &header : client_options.ExtraHeaders()) {
ctx.AddMetadata(header.first, header.second);
}

auto result = cfs->GetConfigurationConstants(&ctx, ccReq, &ccResp);
auto result = cfs->GetConfigurationConstants(&ctx, cc_req, &cc_resp);

if (!result.ok()) {
auto message = Stringf("Can't get configuration constants. Error %o: %o",
Expand All @@ -205,29 +203,29 @@ std::shared_ptr<Server> Server::CreateFromTarget(
throw std::runtime_error(
DEEPHAVEN_LOCATION_STR("Configuration response didn't contain authorization token"));
}
sessionToken.assign(ip->second.begin(), ip->second.end());
session_token.assign(ip->second.begin(), ip->second.end());

// Get expiration interval.
auto expInt = ExtractExpirationInterval(ccResp);
if (expInt.has_value()) {
expirationInterval = *expInt;
auto exp_int = ExtractExpirationInterval(cc_resp);
if (exp_int.has_value()) {
expiration_interval = *exp_int;
} else {
expirationInterval = std::chrono::seconds(10);
expiration_interval = std::chrono::seconds(10);
}
}

auto nextHandshakeTime = sendTime + expirationInterval;
auto next_handshake_time = send_time + expiration_interval;

auto result = std::make_shared<Server>(Private(), std::move(as), std::move(cs),
std::move(ss), std::move(ts), std::move(cfs), std::move(its), std::move(fc),
client_options.ExtraHeaders(), std::move(sessionToken), expirationInterval, nextHandshakeTime);
std::move(ss), std::move(ts), std::move(cfs), std::move(its), std::move(*client_res),
client_options.ExtraHeaders(), std::move(session_token), expiration_interval, next_handshake_time);
result->keepAliveThread_ = std::thread(&SendKeepaliveMessages, result);
gpr_log(GPR_DEBUG,
"%s: "
"Server(%p) created, "
"target=%s",
"Server::CreateFromTarget",
(void*) result.get(),
static_cast<void*>(result.get()),
target.c_str());
return result;
}
Expand Down Expand Up @@ -342,14 +340,14 @@ void Server::ReleaseUnchecked(Ticket ticket) {
// we can release outstanding TableHandles even after the cancelled_ flag is set.
void Server::SendRpc(const std::function<grpc::Status(grpc::ClientContext *)> &callback,
bool disregard_cancellation_state) {
using deephaven::dhcore::utility::TimePointToStr;
using deephaven::dhcore::utility::TimePointToLocalTimeStr;
auto now = std::chrono::system_clock::now();
gpr_log(GPR_DEBUG,
"Server(%p): "
"Sending RPC "
"at time %s.",
static_cast<void *>(this),
TimePointToStr(now).c_str());
TimePointToLocalTimeStr(now).c_str());

grpc::ClientContext ctx;
ForEachHeaderNameAndValue([&ctx](const std::string &name, const std::string &value) {
Expand Down Expand Up @@ -461,7 +459,7 @@ void Server::ForEachHeaderNameAndValue(
namespace {
std::optional<std::chrono::milliseconds> ExtractExpirationInterval(
const ConfigurationConstantsResponse &cc_resp) {
auto ip2 = cc_resp.config_values().find(timeoutKey);
auto ip2 = cc_resp.config_values().find(kTimeoutKey);
if (ip2 == cc_resp.config_values().end() || !ip2->second.has_string_value()) {
return {};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <thread>
#include <typeinfo>
#include <vector>
#include <absl/time/time.h>

namespace deephaven::dhcore::utility {
template<typename Dest, typename Src>
Expand Down Expand Up @@ -225,15 +226,25 @@ inline void TrueOrThrow(const DebugInfo &debugInfo, bool value) {
}

[[nodiscard]] std::string
EpochMillisToStr(std::chrono::milliseconds::rep epoch_millis);
EpochMillisToStr(std::chrono::milliseconds::rep epoch_millis, absl::TimeZone tz);

[[nodiscard]] inline std::string
EpochMillisToLocalTimeStr(std::chrono::milliseconds::rep epoch_millis) {
return EpochMillisToStr(epoch_millis, absl::LocalTimeZone());
}

[[nodiscard]] inline std::string
EpochMillisToUTCStr(std::chrono::milliseconds::rep epoch_millis) {
return EpochMillisToStr(epoch_millis, absl::UTCTimeZone());
}

[[nodiscard]] std::int64_t
TimePointToEpochMillis(
const std::chrono::time_point<std::chrono::system_clock> time_point);
std::chrono::time_point<std::chrono::system_clock> time_point);

[[nodiscard]] std::string
TimePointToStr(
const std::chrono::time_point<std::chrono::system_clock> time_point);
TimePointToLocalTimeStr(
std::chrono::time_point<std::chrono::system_clock> time_point);

template <class T> [[nodiscard]] std::string
TypeName(const T& t) {
Expand Down
40 changes: 12 additions & 28 deletions cpp-client/deephaven/dhcore/src/types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
*/
#include "deephaven/dhcore/types.h"

#include <absl/time/time.h>
#include <limits>

#include "deephaven/dhcore/utility/utility.h"
Expand Down Expand Up @@ -52,26 +53,16 @@ constexpr const int64_t DeephavenConstants::kMinLong;
constexpr const int64_t DeephavenConstants::kMaxLong;

DateTime DateTime::Parse(std::string_view iso_8601_timestamp) {
constexpr const char *kFormatToUse = "%Y-%m-%dT%H:%M:%S%z";
constexpr const int64_t kOneBillion = 1'000'000'000;

struct tm tm;
memset(&tm, 0, sizeof(struct tm));
const char *result = strptime(iso_8601_timestamp.data(), "%Y-%m-%dT%H:%M:%S%z", &tm);
if (result == nullptr) {
auto message = Stringf(R"x(Can't parse "%o" as ISO 8601 timestamp (using format string "%o"))x",
iso_8601_timestamp, kFormatToUse);
constexpr const char *kFormatToUse = "%Y-%m-%dT%H:%M:%E*S%z";
absl::Time result;
std::string error_string;
if (!absl::ParseTime(kFormatToUse, std::string(iso_8601_timestamp), &result, &error_string)) {
auto message = Stringf(R"x(Can't parse "%o" as ISO 8601 timestamp (using format string "%o"). Error is: %o)x",
iso_8601_timestamp, kFormatToUse, error_string);
throw std::runtime_error(message);
}
if (result != iso_8601_timestamp.end()) {
auto message = Stringf(R"x(Input string "%o" had extra trailing characters "%o" (using format string "%o"))x",
iso_8601_timestamp, result, kFormatToUse);
throw std::runtime_error(message);
}

auto tz_offset_secs = tm.tm_gmtoff;
auto time_secs = timegm(&tm) - tz_offset_secs;
return DateTime::FromNanos(time_secs * kOneBillion);
auto nanos = absl::ToUnixNanos(result);
return DateTime::FromNanos(nanos);
}

DateTime::DateTime(int year, int month, int day) : DateTime(year, month, day, 0, 0, 0, 0) {}
Expand All @@ -94,16 +85,9 @@ DateTime::DateTime(int year, int month, int day, int hour, int minute, int secon
}

std::ostream &operator<<(std::ostream &s, const DateTime &o) {
size_t one_billions = 1'000'000'000;
time_t time_secs = o.nanos_ / one_billions;
auto nanos = o.nanos_ % one_billions;
struct tm tm = {};
gmtime_r(&time_secs, &tm);
char date_buffer[32]; // ample
char nanos_buffer[32]; // ample
strftime(date_buffer, sizeof(date_buffer), "%FT%T", &tm);
snprintf(nanos_buffer, sizeof(nanos_buffer), "%09zd", nanos);
s << date_buffer << '.' << nanos_buffer << " UTC";
auto time = absl::FromUnixNanos(o.nanos_);
constexpr const char *kFormatToUse = "%Y-%m-%dT%H:%M:%E9S";
s << absl::FormatTime(kFormatToUse, time, absl::UTCTimeZone()) << 'Z';
return s;
}
} // namespace deephaven::client
29 changes: 9 additions & 20 deletions cpp-client/deephaven/dhcore/src/utility/utility.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
#include <ostream>
#include <string>
#include <vector>
#include <absl/strings/str_format.h>
#include <absl/time/time.h>

#ifdef __GNUG__
#include <cstdlib>
Expand Down Expand Up @@ -195,21 +197,10 @@ void dumpTillPercentOrEnd(std::ostream &result, const char **fmt) {
}
} // namespace

std::string EpochMillisToStr(int64_t epoch_millis) {
time_t time_secs = epoch_millis / 1000;
auto millis = epoch_millis % 1000;
struct tm tm = {};
localtime_r(&time_secs, &tm);
char date_buffer[32]; // ample
char millis_buffer[32]; // ample
char tz_buffer[32]; // ample
strftime(date_buffer, sizeof(date_buffer), "%FT%T", &tm);
snprintf(millis_buffer, sizeof(millis_buffer), ".%03zd", millis);
strftime(tz_buffer, sizeof(tz_buffer), "%z", &tm);

SimpleOstringstream s;
s << date_buffer << millis_buffer << tz_buffer;
return std::move(s.str());
std::string EpochMillisToStr(int64_t epoch_millis, absl::TimeZone time_zone) {
constexpr const char *kFormatToUse = "%Y-%m-%dT%H:%M:%S.%E3f%z";
absl::Time time = absl::FromUnixMillis(epoch_millis);
return absl::FormatTime(kFormatToUse, time, time_zone);
}

std::int64_t
Expand All @@ -221,9 +212,9 @@ TimePointToEpochMillis(
}

std::string
TimePointToStr(
TimePointToLocalTimeStr(
const std::chrono::time_point<std::chrono::system_clock> time_point) {
return EpochMillisToStr(TimePointToEpochMillis(time_point));
return EpochMillisToLocalTimeStr(TimePointToEpochMillis(time_point));
}

#ifdef __GNUG__
Expand All @@ -242,9 +233,7 @@ std::string demangle(const char* name) {
#endif

std::string ObjectId(const std::string &class_short_name, void *this_ptr) {
SimpleOstringstream s;
s << class_short_name << '(' << this_ptr << ')';
return std::move(s.str());
return absl::StrFormat("%s(%p)", class_short_name, this_ptr);
}

std::string
Expand Down
5 changes: 4 additions & 1 deletion cpp-client/deephaven/tests/cython_support_test.cc
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
/*
* Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending
*/
#include <array>
#include <type_traits>
#include "deephaven/dhcore/chunk/chunk.h"
#include "deephaven/dhcore/column/array_column_source.h"
Expand Down Expand Up @@ -28,7 +29,9 @@ TEST_CASE("CreateStringColumnsSource", "[cython]") {
static_assert((kNumElements + 7) / 8 == kValidity.size());

auto result = CythonSupport::CreateStringColumnSource(kText, kText + kTextSize,
kOffsets.begin(), kOffsets.end(), kValidity.begin(), kValidity.end(), kNumElements);
kOffsets.data(), kOffsets.data() + kOffsets.size(),
kValidity.data(), kValidity.data() + kValidity.size(),
kNumElements);

auto rs = RowSequence::CreateSequential(0, kNumElements);
auto data = dhcore::chunk::StringChunk::Create(kNumElements);
Expand Down
Loading

0 comments on commit 91ca8c8

Please sign in to comment.