Skip to content

Commit

Permalink
VER: Release 0.18.1
Browse files Browse the repository at this point in the history
  • Loading branch information
threecgreen authored May 21, 2024
2 parents ce8eb73 + 246e14d commit 6ced9b0
Show file tree
Hide file tree
Showing 16 changed files with 251 additions and 42 deletions.
13 changes: 13 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,18 @@
# Changelog

## 0.18.1 - 2024-05-22

### Enhancements
- Added live `Subscribe` function overload with `use_snapshot` parameter
- Added `GetIf` method to `Record` that allows `if` chaining for handling multiple
record types
- Added record type checking to `Record::Get` method to catch programming errors
and prevent reading invalid data

### Bug fixes
- Added missing symbol chunking for live `Subscribe` overloads with `const std::string&`
`start` parameter

## 0.18.0 - 2024-05-14

### Breaking changes
Expand Down
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ cmake_minimum_required(VERSION 3.14)
# Project details
#

project("databento" VERSION 0.18.0 LANGUAGES CXX)
project("databento" VERSION 0.18.1 LANGUAGES CXX)
string(TOUPPER ${PROJECT_NAME} PROJECT_NAME_UPPERCASE)

#
Expand Down
5 changes: 2 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,9 @@ int main() {

auto handler = [&symbol_mappings](const Record& rec) {
symbol_mappings.OnRecord(rec);
if (rec.Holds<TradeMsg>()) {
auto trade = rec.Get<TradeMsg>();
if (const auto* trade = rec.GetIf<TradeMsg>()) {
std::cout << "Received trade for "
<< symbol_mappings[trade.hd.instrument_id] << ':' << trade
<< symbol_mappings[trade->hd.instrument_id] << ':' << *trade
<< '\n';
}
return KeepGoing::Continue;
Expand Down
5 changes: 2 additions & 3 deletions example/live/readme.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@ int main() {

auto handler = [&symbol_mappings](const Record& rec) {
symbol_mappings.OnRecord(rec);
if (rec.Holds<TradeMsg>()) {
auto trade = rec.Get<TradeMsg>();
if (const auto* trade = rec.GetIf<TradeMsg>()) {
std::cout << "Received trade for "
<< symbol_mappings[trade.hd.instrument_id] << ':' << trade
<< symbol_mappings[trade->hd.instrument_id] << ':' << *trade
<< '\n';
}
return KeepGoing::Continue;
Expand Down
4 changes: 4 additions & 0 deletions include/databento/live_blocking.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ class LiveBlocking {
SType stype_in, UnixNanos start);
void Subscribe(const std::vector<std::string>& symbols, Schema schema,
SType stype_in, const std::string& start);
void Subscribe(const std::vector<std::string>& symbols, Schema schema,
SType stype_in, bool use_snapshot);
// Notifies the gateway to start sending messages for all subscriptions.
//
// This method should only be called once per instance.
Expand Down Expand Up @@ -78,6 +80,8 @@ class LiveBlocking {
std::string GenerateCramReply(const std::string& challenge_key);
std::string EncodeAuthReq(const std::string& auth);
std::uint64_t DecodeAuthResp();
void Subscribe(const std::string& sub_msg,
const std::vector<std::string>& symbols, bool use_snapshot);
detail::TcpClient::Result FillBuffer(std::chrono::milliseconds timeout);
RecordHeader* BufferRecordHeader();

Expand Down
2 changes: 2 additions & 0 deletions include/databento/live_threaded.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ class LiveThreaded {
SType stype_in, UnixNanos start);
void Subscribe(const std::vector<std::string>& symbols, Schema schema,
SType stype_in, const std::string& start);
void Subscribe(const std::vector<std::string>& symbols, Schema schema,
SType stype_in, bool use_snapshot);
// Notifies the gateway to start sending messages for all subscriptions.
// `metadata_callback` will be called exactly once, before any calls to
// `record_callback`. `record_callback` will be called for records from all
Expand Down
29 changes: 27 additions & 2 deletions include/databento/record.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "databento/constants.hpp" // kSymbolCstrLen
#include "databento/datetime.hpp" // UnixNanos
#include "databento/enums.hpp"
#include "databento/exceptions.hpp" // InvalidArgumentError
#include "databento/flag_set.hpp" // FlagSet
#include "databento/publishers.hpp" // Publisher
#include "databento/with_ts_out.hpp"
Expand Down Expand Up @@ -54,11 +55,33 @@ class Record {

template <typename T>
const T& Get() const {
return *reinterpret_cast<const T*>(record_);
if (const auto* r = GetIf<T>()) {
return *r;
}
throw InvalidArgumentError{
"Get", "T", std::string{"rtype mismatch, found "} + ToString(RType())};
}
template <typename T>
T& Get() {
return *reinterpret_cast<T*>(record_);
if (auto* r = GetIf<T>()) {
return *r;
}
throw InvalidArgumentError{
"Get", "T", std::string{"rtype mismatch, found "} + ToString(RType())};
}
template <typename T>
const T* GetIf() const {
if (!Holds<T>()) {
return nullptr;
}
return reinterpret_cast<const T*>(record_);
}
template <typename T>
T* GetIf() {
if (!Holds<T>()) {
return nullptr;
}
return reinterpret_cast<T*>(record_);
}

std::size_t Size() const;
Expand Down Expand Up @@ -629,6 +652,8 @@ inline bool operator!=(const SymbolMappingMsg& lhs,

std::string ToString(const RecordHeader& header);
std::ostream& operator<<(std::ostream& stream, const RecordHeader& header);
std::string ToString(const Record& header);
std::ostream& operator<<(std::ostream& stream, const Record& header);
std::string ToString(const MboMsg& mbo_msg);
std::ostream& operator<<(std::ostream& stream, const MboMsg& mbo_msg);
std::string ToString(const BidAskPair& ba_pair);
Expand Down
2 changes: 1 addition & 1 deletion pkg/PKGBUILD
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Maintainer: Databento <[email protected]>
_pkgname=databento-cpp
pkgname=databento-cpp-git
pkgver=0.18.0
pkgver=0.18.1
pkgrel=1
pkgdesc="Official C++ client for Databento"
arch=('any')
Expand Down
65 changes: 40 additions & 25 deletions src/live_blocking.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,42 @@ void LiveBlocking::Subscribe(const std::vector<std::string>& symbols,

void LiveBlocking::Subscribe(const std::vector<std::string>& symbols,
Schema schema, SType stype_in, UnixNanos start) {
std::ostringstream sub_msg;
sub_msg << "schema=" << ToString(schema)
<< "|stype_in=" << ToString(stype_in);
if (start.time_since_epoch().count()) {
sub_msg << "|start=" << start.time_since_epoch().count();
}
Subscribe(sub_msg.str(), symbols, false);
}

void LiveBlocking::Subscribe(const std::vector<std::string>& symbols,
Schema schema, SType stype_in,
const std::string& start) {
std::ostringstream sub_msg;
sub_msg << "schema=" << ToString(schema)
<< "|stype_in=" << ToString(stype_in);
if (!start.empty()) {
sub_msg << "|start=" << start;
}
Subscribe(sub_msg.str(), symbols, false);
}

void LiveBlocking::Subscribe(const std::vector<std::string>& symbols,
Schema schema, SType stype_in, bool use_snapshot) {
std::ostringstream sub_msg;
sub_msg << "schema=" << ToString(schema)
<< "|stype_in=" << ToString(stype_in);

Subscribe(sub_msg.str(), symbols, use_snapshot);
}

void LiveBlocking::Subscribe(const std::string& sub_msg,
const std::vector<std::string>& symbols,
bool use_snapshot) {
static constexpr auto kMethodName = "Live::Subscribe";
constexpr std::ptrdiff_t kSymbolMaxChunkSize = 128;

if (symbols.empty()) {
throw InvalidArgumentError{kMethodName, "symbols",
"must contain at least one symbol"};
Expand All @@ -70,36 +104,17 @@ void LiveBlocking::Subscribe(const std::vector<std::string>& symbols,
const auto chunk_size =
std::min(kSymbolMaxChunkSize, std::distance(symbols_it, symbols.end()));

std::ostringstream sub_msg;
sub_msg << "schema=" << ToString(schema)
<< "|stype_in=" << ToString(stype_in) << "|symbols="
<< JoinSymbolStrings(kMethodName, symbols_it,
symbols_it + chunk_size);
if (start.time_since_epoch().count()) {
sub_msg << "|start=" << start.time_since_epoch().count();
}
sub_msg << '\n';
client_.WriteAll(sub_msg.str());
std::ostringstream chunked_sub_msg;
chunked_sub_msg << sub_msg << "|symbols="
<< JoinSymbolStrings(kMethodName, symbols_it,
symbols_it + chunk_size)
<< "|snapshot=" << use_snapshot << '\n';
client_.WriteAll(chunked_sub_msg.str());

symbols_it += chunk_size;
}
}

void LiveBlocking::Subscribe(const std::vector<std::string>& symbols,
Schema schema, SType stype_in,
const std::string& start) {
std::ostringstream sub_msg;
sub_msg << "schema=" << ToString(schema) << "|stype_in=" << ToString(stype_in)
<< "|symbols="
<< JoinSymbolStrings("LiveBlocking::Subscribe", symbols);
if (!start.empty()) {
sub_msg << "|start=" << start;
}
sub_msg << '\n';

client_.WriteAll(sub_msg.str());
}

databento::Metadata LiveBlocking::Start() {
constexpr auto kMetadataPreludeSize = 8;
client_.WriteAll("start_session\n");
Expand Down
5 changes: 5 additions & 0 deletions src/live_threaded.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ void LiveThreaded::Subscribe(const std::vector<std::string>& symbols,
impl_->blocking.Subscribe(symbols, schema, stype_in, start);
}

void LiveThreaded::Subscribe(const std::vector<std::string>& symbols,
Schema schema, SType stype_in, bool use_snapshot) {
impl_->blocking.Subscribe(symbols, schema, stype_in, use_snapshot);
}

void LiveThreaded::Start(RecordCallback callback) {
Start({}, std::move(callback), {});
}
Expand Down
10 changes: 9 additions & 1 deletion src/record.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,15 @@ bool databento::operator==(const ImbalanceMsg& lhs, const ImbalanceMsg& rhs) {
}

namespace databento {

std::string ToString(const Record& record) { return MakeString(record); }
std::ostream& operator<<(std::ostream& stream, const Record& record) {
return StreamOpBuilder{stream}
.SetSpacer(" ")
.SetTypeName("Record")
.Build()
.AddField("ptr", record.Header())
.Finish();
}
std::string ToString(const RecordHeader& header) { return MakeString(header); }
std::ostream& operator<<(std::ostream& stream, const RecordHeader& header) {
return StreamOpBuilder{stream}
Expand Down
5 changes: 4 additions & 1 deletion test/include/mock/mock_lsg_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ class MockLsgServer {
void Accept();
void Authenticate();
void Subscribe(const std::vector<std::string>& symbols, Schema schema,
SType stype);
SType stype, bool use_snapshot = false);
void Subscribe(const std::vector<std::string>& symbols, Schema schema,
SType stype, const std::string& start,
bool use_snapshot = false);
void Start();
std::size_t Send(const std::string& msg);
::ssize_t UncheckedSend(const std::string& msg);
Expand Down
99 changes: 98 additions & 1 deletion test/src/live_blocking_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ TEST_F(LiveBlockingTests, TestSubscribe) {
target.Subscribe(kSymbols, kSchema, kSType);
}

TEST_F(LiveBlockingTests, TestSubscriptionChunking) {
TEST_F(LiveBlockingTests, TestSubscriptionChunkingUnixNanos) {
constexpr auto kTsOut = false;
constexpr auto kDataset = dataset::kXnasItch;
const auto kSymbol = "TEST";
Expand Down Expand Up @@ -126,6 +126,103 @@ TEST_F(LiveBlockingTests, TestSubscriptionChunking) {
target.Subscribe(kSymbols, kSchema, kSType);
}

TEST_F(LiveBlockingTests, TestSubscriptionChunkingStringStart) {
constexpr auto kTsOut = false;
constexpr auto kDataset = dataset::kXnasItch;
const auto kSymbol = "TEST";
const std::size_t kSymbolCount = 1000;
const auto kSchema = Schema::Ohlcv1M;
const auto kSType = SType::RawSymbol;
const auto kStart = "2020-01-01T00:00:00";

const mock::MockLsgServer mock_server{
kDataset, kTsOut,
[kSymbol, kSymbolCount, kSchema, kSType,
kStart](mock::MockLsgServer& self) {
self.Accept();
self.Authenticate();
std::size_t i{};
while (i < 1000) {
const auto chunk_size =
std::min(static_cast<std::size_t>(128), kSymbolCount - i);
const std::vector<std::string> symbols_chunk(chunk_size, kSymbol);
self.Subscribe(symbols_chunk, kSchema, kSType, kStart);
i += chunk_size;
}
}};

LiveBlocking target{logger_.get(),
kKey,
kDataset,
kLocalhost,
mock_server.Port(),
kTsOut,
VersionUpgradePolicy{}};
const std::vector<std::string> kSymbols(kSymbolCount, kSymbol);
target.Subscribe(kSymbols, kSchema, kSType, kStart);
}

TEST_F(LiveBlockingTests, TestSubscribeSnapshot) {
constexpr auto kTsOut = false;
constexpr auto kDataset = dataset::kXnasItch;
const auto kSymbol = "TEST";
const std::size_t kSymbolCount = 1000;
const auto kSchema = Schema::Ohlcv1M;
const auto kSType = SType::RawSymbol;
const auto kUseSnapshot = true;

const mock::MockLsgServer mock_server{
kDataset, kTsOut,
[kSymbol, kSymbolCount, kSchema, kSType,
kUseSnapshot](mock::MockLsgServer& self) {
self.Accept();
self.Authenticate();
std::size_t i{};
while (i < 1000) {
const auto chunk_size =
std::min(static_cast<std::size_t>(128), kSymbolCount - i);
const std::vector<std::string> symbols_chunk(chunk_size, kSymbol);
self.Subscribe(symbols_chunk, kSchema, kSType, kUseSnapshot);
i += chunk_size;
}
}};

LiveBlocking target{logger_.get(),
kKey,
kDataset,
kLocalhost,
mock_server.Port(),
kTsOut,
VersionUpgradePolicy{}};
const std::vector<std::string> kSymbols(kSymbolCount, kSymbol);
target.Subscribe(kSymbols, kSchema, kSType, kUseSnapshot);
}

TEST_F(LiveBlockingTests, TestInvalidSubscription) {
constexpr auto kTsOut = false;
constexpr auto kDataset = dataset::kXnasItch;
const std::vector<std::string> noSymbols{};
const auto kSchema = Schema::Ohlcv1M;
const auto kSType = SType::RawSymbol;

const mock::MockLsgServer mock_server{kDataset, kTsOut,
[](mock::MockLsgServer& self) {
self.Accept();
self.Authenticate();
}};

LiveBlocking target{logger_.get(),
kKey,
kDataset,
kLocalhost,
mock_server.Port(),
kTsOut,
VersionUpgradePolicy{}};

ASSERT_THROW(target.Subscribe(noSymbols, kSchema, kSType),
databento::InvalidArgumentError);
}

TEST_F(LiveBlockingTests, TestNextRecord) {
constexpr auto kTsOut = false;
const auto kRecCount = 12;
Expand Down
Loading

0 comments on commit 6ced9b0

Please sign in to comment.