diff --git a/cpp/arcticdb/CMakeLists.txt b/cpp/arcticdb/CMakeLists.txt
index e79c148f8f..dc4585997f 100644
--- a/cpp/arcticdb/CMakeLists.txt
+++ b/cpp/arcticdb/CMakeLists.txt
@@ -945,6 +945,7 @@ if(${TEST})
         util/test/test_storage_lock.cpp
         util/test/test_string_pool.cpp
         util/test/test_string_utils.cpp
+        util/test/random_throw.hpp
         util/test/test_tracing_allocator.cpp
         version/test/test_append.cpp
         version/test/test_sparse.cpp
@@ -957,7 +958,8 @@ if(${TEST})
         version/test/version_map_model.hpp
         python/python_handlers.cpp
         storage/test/common.hpp
-        version/test/test_sort_index.cpp)
+        version/test/test_sort_index.cpp
+        )

    set(EXECUTABLE_PERMS OWNER_WRITE OWNER_READ OWNER_EXECUTE GROUP_READ GROUP_EXECUTE WORLD_READ WORLD_EXECUTE) # 755

diff --git a/cpp/arcticdb/async/tasks.hpp b/cpp/arcticdb/async/tasks.hpp
index a24ab7f23f..0663e58baf 100644
--- a/cpp/arcticdb/async/tasks.hpp
+++ b/cpp/arcticdb/async/tasks.hpp
@@ -25,6 +25,7 @@
 #include
 #include
 #include
+#include <arcticdb/util/test/random_throw.hpp>
 #include
 #include

@@ -90,6 +91,7 @@ struct EncodeAtomTask : BaseTask {

     storage::KeySegmentPair encode() {
         ARCTICDB_DEBUG(log::codec(), "Encoding object with partial key {}", partial_key_);
+        ARCTICDB_DEBUG_THROW(5)
         auto enc_seg = ::arcticdb::encode_dispatch(std::move(segment_), *codec_meta_, encoding_version_);
         auto content_hash = get_segment_hash(enc_seg);

@@ -128,6 +130,7 @@ struct EncodeSegmentTask : BaseTask {

     storage::KeySegmentPair operator()() {
         ARCTICDB_SAMPLE(EncodeSegmentTask, 0)
+        ARCTICDB_DEBUG_THROW(5)
         return encode();
     }
 };

@@ -484,6 +487,7 @@ struct MemSegmentProcessingTask : BaseTask {
     ARCTICDB_MOVE_ONLY_DEFAULT(MemSegmentProcessingTask)

     std::vector<EntityId> operator()() {
+        ARCTICDB_DEBUG_THROW(5)
         for (auto it = clauses_.cbegin(); it != clauses_.cend(); ++it) {
             entity_ids_ = (*it)->process(std::move(entity_ids_));

@@ -496,22 +500,6 @@ struct MemSegmentProcessingTask : BaseTask {

 };

-struct MemSegmentFunctionTask : BaseTask {
-    stream::StreamSource::DecodeContinuation func_;
-
-    explicit MemSegmentFunctionTask(
-        stream::StreamSource::DecodeContinuation&& func) :
-        func_(std::move(func)) {
-    }
-
-    ARCTICDB_MOVE_ONLY_DEFAULT(MemSegmentFunctionTask)
-
-    folly::Unit operator()(std::pair<VariantKey, SegmentInMemory> &&seg_pair) {
-        func_(std::move(seg_pair.second));
-        return folly::Unit{};
-    }
-};
-
 struct DecodeMetadataTask : BaseTask {
     ARCTICDB_MOVE_ONLY_DEFAULT(DecodeMetadataTask)

@@ -558,6 +546,7 @@ struct DecodeMetadataAndDescriptorTask : BaseTask {

     std::tuple<VariantKey, std::optional<google::protobuf::Any>, StreamDescriptor> operator()(storage::KeySegmentPair &&ks) const {
         ARCTICDB_SAMPLE(ReadMetadataAndDescriptorTask, 0)
+        ARCTICDB_DEBUG_THROW(5)
         auto key_seg = std::move(ks);
         ARCTICDB_DEBUG(log::storage(), "DecodeMetadataAndDescriptorTask decoding segment with key {}", variant_key_view(key_seg.variant_key()));
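Note: the ARCTICDB_DEBUG_THROW(5) statements added above (and throughout this patch) are fault-injection hooks. They compile to nothing unless GENERATE_RANDOM_EXCEPTIONS is defined; the macro itself is introduced later in this patch in util/test/random_throw.hpp. When enabled, each call site effectively expands to the following (illustrative expansion only):

    // What ARCTICDB_DEBUG_THROW(5) expands to when GENERATE_RANDOM_EXCEPTIONS
    // is defined; with it undefined the macro expands to nothing.
    do {
        // std::rand()/RAND_MAX is uniform in [0, 1], so this fires ~5% of the time
        if (static_cast<double>(std::rand()) / RAND_MAX * 100 < 5) {
            throw std::runtime_error("Exception intentionally thrown");
        }
    } while(0);

Placing the hook at the top of encode/decode/process task bodies exercises the error paths of the async pipeline at the points where real storage and codec failures would surface.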
diff --git a/cpp/arcticdb/processing/clause.cpp b/cpp/arcticdb/processing/clause.cpp
index 693409e08e..82ce052969 100644
--- a/cpp/arcticdb/processing/clause.cpp
+++ b/cpp/arcticdb/processing/clause.cpp
@@ -17,9 +17,9 @@
 #include
 #include
-#include
 #include
 #include
+#include <arcticdb/util/test/random_throw.hpp>
 #include

 namespace arcticdb {
@@ -56,6 +56,7 @@ class GroupingMap {

     template<typename T>
     std::shared_ptr<std::unordered_map<T, size_t>> get() {
+        ARCTICDB_DEBUG_THROW(5)
         return util::variant_match(map_,
             [that = this](const std::monostate &) {
                 that->map_ = std::make_shared<std::unordered_map<T, size_t>>();
@@ -133,6 +134,7 @@ std::string FilterClause::to_string() const {
 }

 std::vector<EntityId> ProjectClause::process(std::vector<EntityId>&& entity_ids) const {
+    ARCTICDB_DEBUG_THROW(5)
     if (entity_ids.empty()) {
         return {};
     }
@@ -171,6 +173,8 @@ std::vector<EntityId> ProjectClause::process(std::vector<EntityId>&& entity_ids)

 AggregationClause::AggregationClause(const std::string& grouping_column, const std::vector<NamedAggregator>& named_aggregators):
         grouping_column_(grouping_column) {
+    ARCTICDB_DEBUG_THROW(5)
+
     clause_info_.input_structure_ = ProcessingStructure::HASH_BUCKETED;
     clause_info_.can_combine_with_column_selection_ = false;
     clause_info_.index_ = NewIndex(grouping_column_);
@@ -219,6 +223,7 @@ std::vector<std::vector<EntityId>> AggregationClause::structure_for_processing(s
     for (auto& res_element: res) {
         res_element.reserve(entity_ids_vec.size());
     }
+    ARCTICDB_DEBUG_THROW(5)
     // Experimentation shows flattening the entities into a single vector and a single call to
     // component_manager_->get is faster than not flattening and making multiple calls
     auto entity_ids = flatten_entities(std::move(entity_ids_vec));
@@ -232,6 +237,7 @@
 }

 std::vector<EntityId> AggregationClause::process(std::vector<EntityId>&& entity_ids) const {
+    ARCTICDB_DEBUG_THROW(5)
     if (entity_ids.empty()) {
         return {};
     }
@@ -590,6 +596,8 @@ std::vector<EntityId> ResampleClause<closed_boundary>::process(std::vector<EntityId>&& entity_ids) const {
+    ARCTICDB_DEBUG_THROW(5)
+
     std::vector<std::optional<std::vector<std::shared_ptr<Column>>>> input_agg_columns;
     input_agg_columns.reserve(row_slices.size());

diff --git a/cpp/arcticdb/storage/lmdb/lmdb_storage.cpp b/cpp/arcticdb/storage/lmdb/lmdb_storage.cpp
index 48c9bac02a..fbd40a7503 100644
--- a/cpp/arcticdb/storage/lmdb/lmdb_storage.cpp
+++ b/cpp/arcticdb/storage/lmdb/lmdb_storage.cpp
@@ -20,6 +20,7 @@
 #include
 #include
 #include
+#include <arcticdb/util/test/random_throw.hpp>

 #include

@@ -181,7 +182,7 @@ bool LmdbStorage::do_key_exists(const VariantKey& key) {
     ARCTICDB_SAMPLE(LmdbStorageKeyExists, 0)
     auto txn = ::lmdb::txn::begin(env(), nullptr, MDB_RDONLY);
     ARCTICDB_SUBSAMPLE(LmdbStorageInTransaction, 0)
-
+    ARCTICDB_DEBUG_THROW(5)
     auto db_name = fmt::format("{}", variant_key_type(key));
     ARCTICDB_SUBSAMPLE(LmdbStorageOpenDb, 0)
     auto stored_key = to_serialized_key(key);
@@ -201,6 +202,7 @@ std::vector<VariantKey> LmdbStorage::do_remove_internal(Composite<VariantKey>&&
     auto fmt_db = [](auto &&k) { return variant_key_type(k); };
     std::vector<VariantKey> failed_deletes;
+    ARCTICDB_DEBUG_THROW(5)
     (fg::from(ks.as_range()) | fg::move | fg::groupBy(fmt_db)).foreach([&](auto &&group) {
         auto db_name = fmt::format("{}", group.key());
         ARCTICDB_SUBSAMPLE(LmdbStorageOpenDb, 0)
diff --git a/cpp/arcticdb/util/test/random_throw.hpp b/cpp/arcticdb/util/test/random_throw.hpp
new file mode 100644
index 0000000000..69a2749405
--- /dev/null
+++ b/cpp/arcticdb/util/test/random_throw.hpp
@@ -0,0 +1,22 @@
+/* Copyright 2024 Man Group Operations Limited
+ *
+ * Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt.
+ *
+ * As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0.
+ */
+#pragma once
+
+#include <cstdlib>
+#include <stdexcept>
+
+#ifdef GENERATE_RANDOM_EXCEPTIONS
+#define ARCTICDB_DEBUG_THROW(percentage) \
+    do { \
+        if (static_cast<double>(std::rand()) / RAND_MAX * 100 < percentage) { \
+            throw std::runtime_error("Exception intentionally thrown"); \
+        } \
+    } while(0);
+#else
+#define ARCTICDB_DEBUG_THROW(percentage)
+#endif
+
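Note: a minimal standalone harness, not part of this patch, showing how the macro behaves once GENERATE_RANDOM_EXCEPTIONS is defined (the file name and seed are illustrative):

    // throw_rate_demo.cpp -- hypothetical demo, not part of the ArcticDB tree.
    #define GENERATE_RANDOM_EXCEPTIONS
    #include <arcticdb/util/test/random_throw.hpp>

    #include <cstdlib>
    #include <iostream>
    #include <stdexcept>

    int main() {
        std::srand(42);  // fixed seed => reproducible failure sequence across runs
        int throws = 0;
        for (int i = 0; i < 10000; ++i) {
            try {
                ARCTICDB_DEBUG_THROW(5)  // throws on roughly 5% of calls
            } catch (const std::runtime_error&) {
                ++throws;
            }
        }
        std::cout << throws << " throws out of 10000\n";  // expect ~500
    }

Because std::rand() is seedable, a chaos-test run that trips a bug can be replayed deterministically with the same seed.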
diff --git a/cpp/arcticdb/version/local_versioned_engine.cpp b/cpp/arcticdb/version/local_versioned_engine.cpp
index da52561a14..cc6764fa5b 100644
--- a/cpp/arcticdb/version/local_versioned_engine.cpp
+++ b/cpp/arcticdb/version/local_versioned_engine.cpp
@@ -1012,8 +1012,19 @@ VersionedItem LocalVersionedEngine::compact_incomplete_dynamic(
     log::version().debug("Compacting incomplete symbol {}", stream_id);

     auto update_info = get_latest_undeleted_version_and_next_version_id(store(), version_map(), stream_id);
-    auto versioned_item = compact_incomplete_impl(store_, stream_id, user_meta, update_info, options, get_write_options());
+    auto pipeline_context = std::make_shared<PipelineContext>();
+    pipeline_context->stream_id_ = stream_id;
+    pipeline_context->version_id_ = update_info.next_version_id_;
+
+    auto delete_keys_on_failure = get_delete_keys_on_failure(pipeline_context, store(), options);
+
+    auto versioned_item = compact_incomplete_impl(store_, stream_id, user_meta, update_info, options, get_write_options(), pipeline_context);
+
+    delete_incomplete_keys(*pipeline_context, *store());
+    if(delete_keys_on_failure)
+        delete_keys_on_failure->release();
+
     write_version_and_prune_previous(options.prune_previous_versions_, versioned_item.key_, update_info.previous_index_key_);
     add_to_symbol_list_on_compaction(stream_id, options, update_info);

diff --git a/cpp/arcticdb/version/version_core.cpp b/cpp/arcticdb/version/version_core.cpp
index 0be39e11b1..5c03aa2bec 100644
--- a/cpp/arcticdb/version/version_core.cpp
+++ b/cpp/arcticdb/version/version_core.cpp
@@ -1386,10 +1386,10 @@ VersionedItem collate_and_write(
         });
 }

-void delete_incomplete_keys(PipelineContext* pipeline_context, Store* store) {
+void delete_incomplete_keys(PipelineContext& pipeline_context, Store& store) {
     std::vector<entity::VariantKey> keys_to_delete;
-    keys_to_delete.reserve(pipeline_context->slice_and_keys_.size() - pipeline_context->incompletes_after());
-    for (auto sk = pipeline_context->incompletes_begin(); sk != pipeline_context->end(); ++sk) {
+    keys_to_delete.reserve(pipeline_context.slice_and_keys_.size() - pipeline_context.incompletes_after());
+    for (auto sk = pipeline_context.incompletes_begin(); sk != pipeline_context.end(); ++sk) {
         const auto& slice_and_key = sk->slice_and_key();
         if (ARCTICDB_LIKELY(slice_and_key.key().type() == KeyType::APPEND_DATA)) {
             keys_to_delete.emplace_back(slice_and_key.key());
@@ -1401,41 +1401,45 @@ void delete_incomplete_keys(PipelineContext* pipeline_context, Store* store) {
             );
         }
     }
-    store->remove_keys(keys_to_delete).get();
+    store.remove_keys(keys_to_delete).get();
 }

-class IncompleteKeysRAII {
-public:
-    IncompleteKeysRAII() = default;
-    IncompleteKeysRAII(
+DeleteIncompleteKeysOnExit::DeleteIncompleteKeysOnExit(
         std::shared_ptr<PipelineContext> pipeline_context,
         std::shared_ptr<Store> store,
-        const CompactIncompleteOptions* options)
+        bool via_iteration)
         : context_(pipeline_context),
           store_(store),
-          options_(options) {
+          via_iteration_(via_iteration) {
     }
-    ARCTICDB_MOVE_ONLY_DEFAULT(IncompleteKeysRAII)
-
-    ~IncompleteKeysRAII() {
-        if (context_ && store_) {
-            if (context_->incompletes_after_) {
-                delete_incomplete_keys(context_.get(), store_.get());
-            } else {
-                // If an exception is thrown before read_incompletes_to_pipeline the keys won't be placed inside the
-                // context thus they must be read manually.
-                const std::vector<entity::VariantKey> entries =
-                    read_incomplete_keys_for_symbol(store_, context_->stream_id_, options_->via_iteration_);
-                store_->remove_keys(entries).get();
-            }
+
+DeleteIncompleteKeysOnExit::~DeleteIncompleteKeysOnExit() {
+    if(released_)
+        return;
+
+    try {
+        if (context_->incompletes_after_) {
+            delete_incomplete_keys(*context_, *store_);
+        } else {
+            // If an exception is thrown before read_incompletes_to_pipeline the keys won't be placed inside the
+            // context thus they must be read manually.
+            auto entries = read_incomplete_keys_for_symbol(store_, context_->stream_id_, via_iteration_);
+            store_->remove_keys(entries).get();
         }
+    } catch (...) {
+        // Don't emit exceptions from destructor
     }
+}

-private:
-    std::shared_ptr<PipelineContext> context_ = nullptr;
-    std::shared_ptr<Store> store_ = nullptr;
-    const CompactIncompleteOptions* options_ = nullptr;
-};
+std::optional<DeleteIncompleteKeysOnExit> get_delete_keys_on_failure(
+    const std::shared_ptr<PipelineContext>& pipeline_context,
+    const std::shared_ptr<Store>& store,
+    const CompactIncompleteOptions& options) {
+    if(options.delete_staged_data_on_failure_)
+        return std::make_optional<DeleteIncompleteKeysOnExit>(pipeline_context, store, options.via_iteration_);
+    else
+        return std::nullopt;
+}

 VersionedItem sort_merge_impl(
     const std::shared_ptr<Store>& store,
@@ -1450,9 +1454,7 @@ VersionedItem sort_merge_impl(
     auto read_query = std::make_shared<ReadQuery>();
     std::optional<SortedValue> previous_sorted_value;
-    const IncompleteKeysRAII incomplete_keys_raii = options.delete_staged_data_on_failure_
-        ? IncompleteKeysRAII{pipeline_context, store, &options}
-        : IncompleteKeysRAII{};
+    auto delete_keys_on_failure = get_delete_keys_on_failure(pipeline_context, store, options);
     if(options.append_ && update_info.previous_index_key_.has_value()) {
         read_indexed_keys_to_pipeline(store, pipeline_context, *(update_info.previous_index_key_), *read_query, ReadOptions{});
         if (!write_options.dynamic_schema) {
@@ -1553,9 +1555,11 @@ VersionedItem sort_merge_impl(
         keys,
         pipeline_context->incompletes_after(),
         norm_meta);
-    if (!options.delete_staged_data_on_failure_) {
-        delete_incomplete_keys(pipeline_context.get(), store.get());
-    }
+
+    delete_incomplete_keys(*pipeline_context, *store);
+    if(delete_keys_on_failure)
+        delete_keys_on_failure->release();
+
     return vit;
 }

@@ -1565,20 +1569,15 @@ VersionedItem compact_incomplete_impl(
     const std::optional<arcticdb::proto::descriptors::UserDefinedMetadata>& user_meta,
     const UpdateInfo& update_info,
     const CompactIncompleteOptions& options,
-    const WriteOptions& write_options) {
+    const WriteOptions& write_options,
+    std::shared_ptr<PipelineContext>& pipeline_context) {

-    auto pipeline_context = std::make_shared<PipelineContext>();
-    pipeline_context->stream_id_ = stream_id;
-    pipeline_context->version_id_ = update_info.next_version_id_;
     ReadQuery read_query;
     ReadOptions read_options;
     read_options.set_dynamic_schema(true);

-    std::optional<SegmentInMemory> last_indexed;
     std::optional<SortedValue> previous_sorted_value;
-    const IncompleteKeysRAII incomplete_keys_raii = options.delete_staged_data_on_failure_
-        ? IncompleteKeysRAII{pipeline_context, store, &options}
-        : IncompleteKeysRAII{};
+
     if(options.append_ && update_info.previous_index_key_.has_value()) {
         read_indexed_keys_to_pipeline(store, pipeline_context, *(update_info.previous_index_key_), read_query, read_options);
         if (!write_options.dynamic_schema) {
@@ -1648,9 +1647,10 @@ VersionedItem compact_incomplete_impl(
         keys,
         pipeline_context->incompletes_after(),
         user_meta);
-    if (!options.delete_staged_data_on_failure_) {
-        delete_incomplete_keys(pipeline_context.get(), store.get());
-    }
+
+
+
+
     return vit;
 }
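Note: DeleteIncompleteKeysOnExit replaces the old IncompleteKeysRAII with an explicit arm/disarm pattern: the guard is created before any work that can throw, and release() is called once the staged keys have been deleted on the success path, so the destructor only cleans up when the scope unwinds early. A minimal sketch of the same idiom (illustrative; CleanupOnExit is a hypothetical name, not the class in this patch):

    #include <functional>
    #include <utility>

    class CleanupOnExit {
    public:
        explicit CleanupOnExit(std::function<void()> cleanup) : cleanup_(std::move(cleanup)) {}
        CleanupOnExit(const CleanupOnExit&) = delete;
        CleanupOnExit& operator=(const CleanupOnExit&) = delete;

        void release() { released_ = true; }  // disarm on the success path

        ~CleanupOnExit() {
            if (released_)
                return;
            try {
                cleanup_();  // e.g. remove staged APPEND_DATA keys
            } catch (...) {
                // swallow: a destructor must not emit exceptions
            }
        }

    private:
        std::function<void()> cleanup_;
        bool released_ = false;
    };

Swallowing in the destructor matters here: the guard most often runs during stack unwinding triggered by another exception, where a second throw would terminate the process.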
diff --git a/cpp/arcticdb/version/version_core.hpp b/cpp/arcticdb/version/version_core.hpp
index 7e04f29e7f..f2bdcd4091 100644
--- a/cpp/arcticdb/version/version_core.hpp
+++ b/cpp/arcticdb/version/version_core.hpp
@@ -150,7 +150,8 @@ VersionedItem compact_incomplete_impl(
     const std::optional<arcticdb::proto::descriptors::UserDefinedMetadata>& user_meta,
     const UpdateInfo& update_info,
     const CompactIncompleteOptions& options,
-    const WriteOptions& write_options);
+    const WriteOptions& write_options,
+    std::shared_ptr<PipelineContext>& pipeline_context);

 struct PredefragmentationInfo{
     std::shared_ptr<PipelineContext> pipeline_context;
@@ -207,6 +208,34 @@ folly::Future<ReadVersionOutput> read_frame_for_version(
     std::any& handler_data
 );

+class DeleteIncompleteKeysOnExit {
+public:
+    DeleteIncompleteKeysOnExit(
+        std::shared_ptr<PipelineContext> pipeline_context,
+        std::shared_ptr<Store> store,
+        bool via_iteration);
+
+    ARCTICDB_NO_MOVE_OR_COPY(DeleteIncompleteKeysOnExit)
+
+    ~DeleteIncompleteKeysOnExit();
+
+    void release() {
+        released_ = true;
+    }
+
+private:
+    std::shared_ptr<PipelineContext> context_;
+    std::shared_ptr<Store> store_;
+    bool via_iteration_;
+    bool released_ = false;
+};
+void delete_incomplete_keys(PipelineContext& pipeline_context, Store& store);
+
+std::optional<DeleteIncompleteKeysOnExit> get_delete_keys_on_failure(
+    const std::shared_ptr<PipelineContext>& pipeline_context,
+    const std::shared_ptr<Store>& store,
+    const CompactIncompleteOptions& options);
+
 } //namespace arcticdb::version_store

 #define ARCTICDB_VERSION_CORE_H_
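Note: taken together, the caller-side protocol at both call sites (compact_incomplete_dynamic and sort_merge_impl) looks like this sketch, using names from this patch:

    auto delete_keys_on_failure = get_delete_keys_on_failure(pipeline_context, store, options);
    // ... compaction work that may throw; on unwind the guard's destructor
    // removes the staged incomplete keys (only when delete_staged_data_on_failure_ is set).
    delete_incomplete_keys(*pipeline_context, *store);  // success path: always delete staged keys
    if (delete_keys_on_failure)
        delete_keys_on_failure->release();              // disarm so the destructor does not delete twice

Returning std::optional<DeleteIncompleteKeysOnExit> lets the failure-time behaviour stay configurable without the empty-guard default constructor the old IncompleteKeysRAII needed.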