WIP Add some lightweight encodings #1980

Draft: wants to merge 5 commits into base: master
41 changes: 35 additions & 6 deletions cpp/arcticdb/CMakeLists.txt
@@ -184,6 +184,14 @@ set(arcticdb_srcs
codec/magic_words.hpp
codec/passthrough.hpp
codec/protobuf_mappings.hpp
codec/compression/frequency.hpp
codec/compression/constant.hpp
codec/calculate_stats.hpp
codec/compression/fastlanes_common.hpp
codec/compression/delta.hpp
codec/compression/ffor.hpp
codec/compression/bitpack_fused.hpp
codec/adaptive.hpp
codec/slice_data_sink.hpp
codec/segment_header.hpp
codec/segment_identifier.hpp
@@ -405,6 +413,15 @@ set(arcticdb_srcs
async/task_scheduler.cpp
async/tasks.cpp
codec/codec.cpp
codec/compression/frequency.hpp
codec/compression/constant.hpp
codec/calculate_stats.hpp
codec/compression/fastlanes_common.hpp
codec/compression/delta.hpp
codec/compression/ffor.hpp
codec/compression/bitpack_fused_old.hpp
codec/adaptive.hpp
codec/scanner.hpp
codec/encode_v1.cpp
codec/encode_v2.cpp
codec/encoded_field.cpp
@@ -526,7 +543,10 @@ set(arcticdb_srcs
version/version_core.cpp
version/version_store_api.cpp
version/version_utils.cpp
version/version_map_batch_methods.cpp)
version/symbol_list.cpp
version/version_map_batch_methods.cpp
storage/s3/ec2_utils.cpp
util/buffer_holder.cpp
codec/compression/encoders.hpp
codec/compression/estimators.hpp
codec/compression/bitpack_fused.hpp
codec/compression/rle.hpp)

add_library(arcticdb_core_object OBJECT ${arcticdb_srcs})

@@ -668,7 +688,7 @@ set (arcticdb_core_libraries
arcticdb_proto
xxHash::xxHash
prometheus-cpp::push
prometheus-cpp::pull
#prometheus-cpp::pull
unordered_dense::unordered_dense
${standard_libraries}
fmt::fmt
@@ -753,8 +773,8 @@ if (SSL_LINK)
find_package(OpenSSL REQUIRED)
list(APPEND arcticdb_core_libraries OpenSSL::SSL)
if (NOT WIN32)
list(APPEND arcticdb_core_libraries ${KERBEROS_LIBRARY})
list(APPEND arcticdb_core_includes ${KERBEROS_INCLUDE_DIR})
#list(APPEND arcticdb_core_libraries ${KERBEROS_LIBRARY})
#list(APPEND arcticdb_core_includes ${KERBEROS_INCLUDE_DIR})
endif()
endif ()
target_link_libraries(arcticdb_core_object PUBLIC ${arcticdb_core_libraries})
@@ -893,6 +913,7 @@ else()
${CMAKE_COMMAND} -E copy $<TARGET_FILE:arcticdb_ext> ${CMAKE_INSTALL_PREFIX})
endif()


## Unit Tests ##
if(${TEST})
unset(Python_USE_STATIC_LIBS)
@@ -909,6 +930,12 @@ if(${TEST})
codec/test/test_encode_field_collection.cpp
codec/test/test_segment_header.cpp
codec/test/test_encoded_field.cpp
codec/test/test_frequency_encoding.cpp
codec/test/test_constant_encoding.cpp
codec/test/encoding_test_common.hpp
codec/test/test_stats.cpp
codec/test/test_ffor.cpp
codec/test/test_fused_bitpack.cpp
column_store/test/ingestion_stress_test.cpp
column_store/test/test_column.cpp
column_store/test/test_column_data_random_accessor.cpp
@@ -983,12 +1010,13 @@ if(${TEST})
version/test/test_version_map_batch.cpp
version/test/test_version_store.cpp
version/test/version_map_model.hpp
python/python_handlers.cpp)
python/python_handlers.cpp
storage/test/common.hpp
version/test/test_sort_index.cpp
codec/test/test_delta.cpp
codec/test/test_rle.cpp
codec/test/test_estimators.cpp)

set(EXECUTABLE_PERMS OWNER_WRITE OWNER_READ OWNER_EXECUTE GROUP_READ GROUP_EXECUTE WORLD_READ WORLD_EXECUTE) # 755

add_executable(test_unit_arcticdb ${unit_test_srcs})

install(TARGETS test_unit_arcticdb RUNTIME
DESTINATION .
PERMISSIONS ${EXECUTABLE_PERMS}
@@ -1074,6 +1102,7 @@ if(${TEST})
)

set(rapidcheck_srcs
codec/test/rapidcheck_frequency_encoding.cpp
column_store/test/rapidcheck_column_store.cpp
column_store/test/rapidcheck_chunked_buffer.cpp
column_store/test/rapidcheck_column.cpp
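
The new codec/compression headers (constant, frequency, delta, ffor, bitpack_fused, rle) point at classic lightweight columnar encodings. As a rough illustration of the kind of transform these names imply, here is a minimal delta-from-reference (FFOR) sketch in C++; the struct and function names are illustrative assumptions, not this PR's actual API.

#include <algorithm>
#include <cstdint>
#include <vector>

// Hypothetical FFOR block: store the block minimum once, then only the
// (small, bit-packable) offsets from it.
struct FforBlock {
    int64_t reference = 0;         // block minimum
    std::vector<uint64_t> offsets; // value - reference for each element
};

inline FforBlock ffor_encode(const std::vector<int64_t>& values) {
    FforBlock block;
    if (values.empty())
        return block;
    block.reference = *std::min_element(values.begin(), values.end());
    block.offsets.reserve(values.size());
    for (int64_t v : values)
        block.offsets.push_back(static_cast<uint64_t>(v - block.reference));
    return block;
}

inline std::vector<int64_t> ffor_decode(const FforBlock& block) {
    std::vector<int64_t> out;
    out.reserve(block.offsets.size());
    for (uint64_t d : block.offsets)
        out.push_back(block.reference + static_cast<int64_t>(d));
    return out;
}

In a real implementation the offsets would be bit-packed to the width of the largest offset (presumably what a fused bit-packing kernel like the one suggested by bitpack_fused.hpp does), but the round-trip above captures the core idea.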
8 changes: 4 additions & 4 deletions cpp/arcticdb/async/async_store.hpp
@@ -39,7 +39,7 @@ auto read_and_continue(const VariantKey& key, std::shared_ptr<storage::Library>
}

/*
* AsyncStore is a wrapper around a Store that provides async methods for writing and reading data.
* AsyncStore is an implementation of the Store interface that provides async methods for writing and reading data.
* It is used by the VersionStore to write data to the Store asynchronously.
* It can also be used to write data synchronously (using the `*_sync` methods) and
* to write batches of data (using the `batch_*` methods).
@@ -50,11 +50,11 @@ class AsyncStore : public Store {
public:
AsyncStore(
std::shared_ptr<storage::Library> library,
const proto::encoding::VariantCodec &codec,
const BlockCodecImpl& codec,
EncodingVersion encoding_version
) :
library_(std::move(library)),
codec_(std::make_shared<proto::encoding::VariantCodec>(codec)),
codec_(std::make_shared<BlockCodecImpl>(codec)),
encoding_version_(encoding_version) {
}

@@ -399,7 +399,7 @@ folly::Future<SliceAndKey> async_write(
private:
friend class arcticdb::toolbox::apy::LibraryTool;
std::shared_ptr<storage::Library> library_;
std::shared_ptr<arcticdb::proto::encoding::VariantCodec> codec_;
std::shared_ptr<BlockCodecImpl> codec_;
const EncodingVersion encoding_version_;
};

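
Since the constructor now takes a BlockCodecImpl instead of the protobuf VariantCodec, here is a condensed usage sketch, assuming the helpers that appear in the tests below (codec::default_lz4_codec(), encode_dispatch) behave as shown there:

// Minimal sketch, not a complete program: library and key setup are elided.
auto lib = library_index.get_library(library_path, as::OpenMode::WRITE, user_auth, as::NativeVariantStorage());
aa::AsyncStore store(lib, codec::default_lz4_codec(), EncodingVersion::V2);

// Synchronous write of a pre-encoded segment; the plain methods return
// folly::Futures instead of blocking, and batch_* variants take collections.
auto segment = encode_dispatch(SegmentInMemory{}, codec::default_lz4_codec(), EncodingVersion::V2);
store.write_compressed_sync(as::KeySegmentPair{key, std::move(segment)});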
13 changes: 4 additions & 9 deletions cpp/arcticdb/async/task_scheduler.hpp
@@ -165,15 +165,10 @@ inline auto get_default_num_cpus([[maybe_unused]] const std::string& cgroup_folder
#endif
}

/*
* Possible areas of improvement in the future:
* 1/ Task/op decoupling: push task and then use strategy to implement smart batching to
* amortize costs wherever possible
* 2/ Worker thread Affinity - would better locality improve throughput by keeping hot structure in
* hot cachelines and not jumping from one thread to the next (assuming thread/core affinity in hw too) ?
* 3/ Priority: How to assign priorities to task in order to treat the most pressing first.
* 4/ Throttling: (similar to priority) how to absorb work spikes and apply memory backpressure
*/
inline int64_t default_io_thread_count(uint64_t cpu_count) {
return std::min(int64_t(100L), static_cast<int64_t>(static_cast<double>(cpu_count) * 1.5));
}

class TaskScheduler {
public:
using CPUSchedulerType = folly::FutureExecutor<folly::CPUThreadPoolExecutor>;
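
The speculative to-do comment is replaced by a concrete sizing rule: IO threads = min(100, 1.5 * CPU count). Worked through the new helper:

// default_io_thread_count(8)   -> 12   (1.5 * 8, below the cap)
// default_io_thread_count(64)  -> 96
// default_io_thread_count(128) -> 100  (clamped to the 100-thread ceiling)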
16 changes: 8 additions & 8 deletions cpp/arcticdb/async/tasks.hpp
@@ -41,14 +41,14 @@ struct EncodeAtomTask : BaseTask {
PartialKey partial_key_;
timestamp creation_ts_;
SegmentInMemory segment_;
std::shared_ptr<arcticdb::proto::encoding::VariantCodec> codec_meta_;
std::shared_ptr<BlockCodecImpl> codec_meta_;
EncodingVersion encoding_version_;

EncodeAtomTask(
PartialKey &&pk,
timestamp creation_ts,
SegmentInMemory &&segment,
std::shared_ptr<arcticdb::proto::encoding::VariantCodec> codec_meta,
std::shared_ptr<BlockCodecImpl> codec_meta,
EncodingVersion encoding_version) :
partial_key_(std::move(pk)),
creation_ts_(creation_ts),
@@ -60,7 +60,7 @@
EncodeAtomTask(
std::pair<PartialKey, SegmentInMemory>&& pk_seg,
timestamp creation_ts,
std::shared_ptr<arcticdb::proto::encoding::VariantCodec> codec_meta,
std::shared_ptr<BlockCodecImpl> codec_meta,
EncodingVersion encoding_version) :
partial_key_(std::move(pk_seg.first)),
creation_ts_(creation_ts),
@@ -77,7 +77,7 @@
IndexValue end_index,
timestamp creation_ts,
SegmentInMemory &&segment,
const std::shared_ptr<arcticdb::proto::encoding::VariantCodec> &codec_meta,
const std::shared_ptr<BlockCodecImpl> &codec_meta,
EncodingVersion encoding_version) :
EncodeAtomTask(
PartialKey{key_type, gen_id, std::move(stream_id), std::move(start_index), std::move(end_index)},
@@ -108,12 +108,12 @@
struct EncodeSegmentTask : BaseTask {
VariantKey key_;
SegmentInMemory segment_;
std::shared_ptr<arcticdb::proto::encoding::VariantCodec> codec_meta_;
std::shared_ptr<BlockCodecImpl> codec_meta_;
EncodingVersion encoding_version_;

EncodeSegmentTask(entity::VariantKey key,
SegmentInMemory &&segment,
std::shared_ptr<arcticdb::proto::encoding::VariantCodec> codec_meta,
std::shared_ptr<BlockCodecImpl> codec_meta,
EncodingVersion encoding_version)
: key_(std::move(key)),
segment_(std::move(segment)),
@@ -139,14 +139,14 @@ struct EncodeRefTask : BaseTask {
KeyType key_type_;
StreamId id_;
SegmentInMemory segment_;
std::shared_ptr<arcticdb::proto::encoding::VariantCodec> codec_meta_;
std::shared_ptr<BlockCodecImpl> codec_meta_;
EncodingVersion encoding_version_;

EncodeRefTask(
KeyType key_type,
StreamId stream_id,
SegmentInMemory &&segment,
std::shared_ptr<arcticdb::proto::encoding::VariantCodec> codec_meta,
std::shared_ptr<BlockCodecImpl> codec_meta,
EncodingVersion encoding_version
)
: key_type_(key_type),
46 changes: 26 additions & 20 deletions cpp/arcticdb/async/test/test_async.cpp
@@ -44,12 +44,20 @@ TEST(Async, SinkBasic) {

as::UserAuth au{"abc"};
auto lib = library_index.get_library(library_path, as::OpenMode::WRITE, au, as::NativeVariantStorage());
auto codec_opt = std::make_shared<arcticdb::proto::encoding::VariantCodec>();
auto block_codec = std::make_shared<BlockCodecImpl>();
aa::TaskScheduler sched{1};

auto seg = SegmentInMemory();
aa::EncodeAtomTask enc{
entity::KeyType::GENERATION, entity::VersionId{6}, NumericId{123}, NumericId{456}, timestamp{457}, entity::NumericIndex{999}, std::move(seg), codec_opt, EncodingVersion::V2
entity::KeyType::GENERATION,
entity::VersionId{6},
NumericId{123},
NumericId{456},
timestamp{457},
entity::NumericIndex{999},
std::move(seg),
block_codec,
EncodingVersion::V2
};

auto v = sched.submit_cpu_task(std::move(enc)).via(&aa::io_executor()).thenValue(aa::WriteSegmentTask{lib}).get();
@@ -75,8 +83,8 @@ TEST(Async, DeDupTest) {

as::UserAuth au{"abc"};
auto lib = library_index.get_library(library_path, as::OpenMode::WRITE, au, storage::NativeVariantStorage());
auto codec_opt = std::make_shared<arcticdb::proto::encoding::VariantCodec>();
aa::AsyncStore store(lib, *codec_opt, EncodingVersion::V2);
auto codec_opt = std::make_shared<BlockCodecImpl>();
aa::AsyncStore store(lib, codec::default_lz4_codec(), EncodingVersion::V2);
auto seg = SegmentInMemory();

std::vector<std::pair<ast::StreamSink::PartialKey, SegmentInMemory>> key_segments;
@@ -103,7 +111,7 @@
for(const auto& slice_key : slice_keys)
keys.emplace_back(slice_key.key());

//The first key will be de-duped, second key will be fresh because indexes dont match
//The first key will be de-duped, second key will be fresh because indexes don't match
ASSERT_EQ(2ULL, keys.size());
ASSERT_EQ(k, to_atom(keys[0]));
ASSERT_NE(k, to_atom(keys[1]));
@@ -301,9 +309,9 @@
std::shared_ptr<arcticdb::Store> create_store(const storage::LibraryPath &library_path,
as::LibraryIndex &library_index,
const storage::UserAuth &user_auth,
std::shared_ptr<proto::encoding::VariantCodec> &codec_opt) {
BlockCodecImpl&& codec_opt) {
auto lib = library_index.get_library(library_path, as::OpenMode::WRITE, user_auth, storage::NativeVariantStorage());
auto store = aa::AsyncStore(lib, *codec_opt, EncodingVersion::V1);
auto store = aa::AsyncStore(lib, std::move(codec_opt), EncodingVersion::V1);
return std::make_shared<aa::AsyncStore<>>(std::move(store));
}

@@ -326,23 +334,23 @@ TEST(Async, CopyCompressedInterStore) {
as::LibraryIndex library_index{environment_name, config_resolver};

as::UserAuth user_auth{"abc"};
auto codec_opt = std::make_shared<arcticdb::proto::encoding::VariantCodec>();

auto source_store = create_store(library_path, library_index, user_auth, codec_opt);
auto source_store = create_store(library_path, library_index, user_auth, codec::default_passthrough_codec());

// When - we write a key to the source and copy it
const arcticdb::entity::RefKey& key = arcticdb::entity::RefKey{"abc", KeyType::VERSION_REF};
auto segment_in_memory = get_test_frame<arcticdb::stream::TimeseriesIndex>("symbol", {}, 10, 0).segment_;
auto row_count = segment_in_memory.row_count();
ASSERT_GT(row_count, 0);
auto segment = encode_dispatch(std::move(segment_in_memory), *codec_opt, arcticdb::EncodingVersion::V1);
auto codec_opt = codec::default_passthrough_codec();
auto segment = encode_dispatch(std::move(segment_in_memory), codec_opt, arcticdb::EncodingVersion::V1);
(void)segment.calculate_size();
source_store->write_compressed_sync(as::KeySegmentPair{key, std::move(segment)});

auto targets = std::vector<std::shared_ptr<arcticdb::Store>>{
create_store(library_path, library_index, user_auth, codec_opt),
create_store(library_path, library_index, user_auth, codec_opt),
create_store(library_path, library_index, user_auth, codec_opt)
create_store(library_path, library_index, user_auth, codec::default_passthrough_codec()),
create_store(library_path, library_index, user_auth, codec::default_passthrough_codec()),
create_store(library_path, library_index, user_auth, codec::default_passthrough_codec())
};

CopyCompressedInterStoreTask task{
@@ -393,25 +401,23 @@ TEST(Async, CopyCompressedInterStoreNoSuchKeyOnWrite) {
as::LibraryIndex failed_library_index{environment_name, failed_config_resolver};

as::UserAuth user_auth{"abc"};
auto codec_opt = std::make_shared<arcticdb::proto::encoding::VariantCodec>();

auto source_store = create_store(library_path, library_index, user_auth, codec_opt);
auto source_store = create_store(library_path, library_index, user_auth, codec::default_passthrough_codec());

std::string failureSymbol = storage::s3::MockS3Client::get_failure_trigger("sym", storage::StorageOperation::WRITE, Aws::S3::S3Errors::NO_SUCH_KEY);

// Prepare 2 targets to fail and 1 to succeed
auto targets = std::vector<std::shared_ptr<arcticdb::Store>>{
create_store(library_path, library_index, user_auth, codec_opt),
create_store(library_path, failed_library_index, user_auth, codec_opt),
create_store(library_path, library_index, user_auth, codec_opt)
create_store(library_path, library_index, user_auth, codec::default_passthrough_codec()),
create_store(library_path, failed_library_index, user_auth, codec::default_passthrough_codec()),
create_store(library_path, library_index, user_auth, codec::default_passthrough_codec())
};

// When - we write a key to the source
const arcticdb::entity::RefKey& key = arcticdb::entity::RefKey{failureSymbol, KeyType::VERSION_REF};
auto segment_in_memory = get_test_frame<arcticdb::stream::TimeseriesIndex>("symbol", {}, 10, 0).segment_;
auto row_count = segment_in_memory.row_count();
ASSERT_GT(row_count, 0);
auto segment = encode_dispatch(std::move(segment_in_memory), *codec_opt, arcticdb::EncodingVersion::V1);
auto segment = encode_dispatch(std::move(segment_in_memory), codec::default_passthrough_codec(), arcticdb::EncodingVersion::V1);
(void)segment.calculate_size();
source_store->write_compressed_sync(as::KeySegmentPair{key, std::move(segment)});
