diff --git a/cpp/arcticdb/CMakeLists.txt b/cpp/arcticdb/CMakeLists.txt index 4519233faf..a3c82ddd35 100644 --- a/cpp/arcticdb/CMakeLists.txt +++ b/cpp/arcticdb/CMakeLists.txt @@ -364,6 +364,7 @@ set(arcticdb_srcs util/slab_allocator.hpp util/sparse_utils.hpp util/storage_lock.hpp + util/reliable_storage_lock.hpp util/string_utils.hpp util/thread_cached_int.hpp util/timeouts.hpp @@ -511,7 +512,8 @@ set(arcticdb_srcs version/symbol_list.cpp version/version_map_batch_methods.cpp storage/s3/ec2_utils.cpp - util/buffer_holder.cpp) + util/buffer_holder.cpp + util/reliable_storage_lock.tpp) add_library(arcticdb_core_object OBJECT ${arcticdb_srcs}) @@ -957,7 +959,8 @@ if(${TEST}) version/test/version_map_model.hpp python/python_handlers.cpp storage/test/common.hpp - version/test/test_sort_index.cpp) + version/test/test_sort_index.cpp + util/test/test_reliable_storage_lock.cpp) set(EXECUTABLE_PERMS OWNER_WRITE OWNER_READ OWNER_EXECUTE GROUP_READ GROUP_EXECUTE WORLD_READ WORLD_EXECUTE) # 755 diff --git a/cpp/arcticdb/async/async_store.hpp b/cpp/arcticdb/async/async_store.hpp index bd712b8f06..a292f0aad8 100644 --- a/cpp/arcticdb/async/async_store.hpp +++ b/cpp/arcticdb/async/async_store.hpp @@ -151,6 +151,15 @@ entity::VariantKey write_sync( return WriteSegmentTask{library_}(std::move(encoded)); } +entity::VariantKey write_if_none_sync( + KeyType key_type, + const StreamId &stream_id, + SegmentInMemory &&segment) override { + util::check(is_ref_key_class(key_type), "Expected ref key type got {}", key_type); + auto encoded = EncodeRefTask{key_type, stream_id, std::move(segment), codec_, encoding_version_}(); + return WriteIfNoneTask{library_}(std::move(encoded)); +} + bool is_path_valid(const std::string_view path) const override { return library_->is_path_valid(path); } diff --git a/cpp/arcticdb/async/tasks.hpp b/cpp/arcticdb/async/tasks.hpp index a24ab7f23f..ac7031020d 100644 --- a/cpp/arcticdb/async/tasks.hpp +++ b/cpp/arcticdb/async/tasks.hpp @@ -184,6 +184,23 @@ struct WriteSegmentTask : BaseTask { } }; +struct WriteIfNoneTask : BaseTask { + std::shared_ptr lib_; + + explicit WriteIfNoneTask(std::shared_ptr lib) : + lib_(std::move(lib)) { + } + + ARCTICDB_MOVE_ONLY_DEFAULT(WriteIfNoneTask) + + VariantKey operator()(storage::KeySegmentPair &&key_seg) const { + ARCTICDB_SAMPLE(WriteSegmentTask, 0) + auto k = key_seg.variant_key(); + lib_->write_if_none(std::move(key_seg)); + return k; + } +}; + struct UpdateSegmentTask : BaseTask { std::shared_ptr lib_; storage::UpdateOpts opts_; diff --git a/cpp/arcticdb/entity/key.cpp b/cpp/arcticdb/entity/key.cpp index 91ecb417ab..dfd6c49b07 100644 --- a/cpp/arcticdb/entity/key.cpp +++ b/cpp/arcticdb/entity/key.cpp @@ -60,6 +60,7 @@ KeyData get_key_data(KeyType key_type) { STRING_REF(KeyType::APPEND_REF, aref, 'a') STRING_KEY(KeyType::MULTI_KEY, mref, 'm') STRING_REF(KeyType::LOCK, lref, 'x') + STRING_REF(KeyType::SLOW_LOCK, lref, 'x') STRING_REF(KeyType::SNAPSHOT_TOMBSTONE, ttomb, 'X') STRING_KEY(KeyType::APPEND_DATA, app, 'b') // Unused diff --git a/cpp/arcticdb/entity/key.hpp b/cpp/arcticdb/entity/key.hpp index f68338c364..387319a45f 100644 --- a/cpp/arcticdb/entity/key.hpp +++ b/cpp/arcticdb/entity/key.hpp @@ -182,6 +182,10 @@ enum class KeyType : int { * Used for storing the ids of storages that failed to sync */ REPLICATION_FAIL_INFO = 26, + /* + * Used for a list based reliable storage lock + */ + SLOW_LOCK = 27, UNDEFINED }; diff --git a/cpp/arcticdb/storage/azure/azure_storage.hpp b/cpp/arcticdb/storage/azure/azure_storage.hpp index 2c8168bfc0..2b926e5cbe 100644 --- a/cpp/arcticdb/storage/azure/azure_storage.hpp +++ b/cpp/arcticdb/storage/azure/azure_storage.hpp @@ -37,6 +37,10 @@ class AzureStorage final : public Storage { protected: void do_write(Composite&& kvs) final; + void do_write_if_none(KeySegmentPair&& kv [[maybe_unused]]) final { + storage::raise("Atomic operations are only supported for s3 backend"); + }; + void do_update(Composite&& kvs, UpdateOpts opts) final; void do_read(Composite&& ks, const ReadVisitor& visitor, ReadKeyOpts opts) final; diff --git a/cpp/arcticdb/storage/file/mapped_file_storage.hpp b/cpp/arcticdb/storage/file/mapped_file_storage.hpp index 6371becd26..44d9fd365a 100644 --- a/cpp/arcticdb/storage/file/mapped_file_storage.hpp +++ b/cpp/arcticdb/storage/file/mapped_file_storage.hpp @@ -36,6 +36,10 @@ class MappedFileStorage final : public SingleFileStorage { void do_write(Composite&& kvs) override; + void do_write_if_none(KeySegmentPair&& kv [[maybe_unused]]) final { + storage::raise("Atomic operations are only supported for s3 backend"); + }; + void do_update(Composite&& kvs, UpdateOpts opts) override; void do_read(Composite&& ks, const ReadVisitor& visitor, storage::ReadKeyOpts opts) override; diff --git a/cpp/arcticdb/storage/library.hpp b/cpp/arcticdb/storage/library.hpp index 39542b03d6..feff46053c 100644 --- a/cpp/arcticdb/storage/library.hpp +++ b/cpp/arcticdb/storage/library.hpp @@ -90,6 +90,14 @@ class Library { storages_->write(std::move(kvs)); } + void write_if_none(KeySegmentPair&& kv) { + if (open_mode() < OpenMode::WRITE) { + throw LibraryPermissionException(library_path_, open_mode(), "write"); + } + + storages_->write_if_none(std::move(kv)); + } + void update(Composite&& kvs, storage::UpdateOpts opts) { ARCTICDB_SAMPLE(LibraryUpdate, 0) if (open_mode() < OpenMode::WRITE) diff --git a/cpp/arcticdb/storage/lmdb/lmdb_storage.hpp b/cpp/arcticdb/storage/lmdb/lmdb_storage.hpp index b04af485b6..9c6fa838cc 100644 --- a/cpp/arcticdb/storage/lmdb/lmdb_storage.hpp +++ b/cpp/arcticdb/storage/lmdb/lmdb_storage.hpp @@ -38,6 +38,10 @@ class LmdbStorage final : public Storage { private: void do_write(Composite&& kvs) final; + void do_write_if_none(KeySegmentPair&& kv [[maybe_unused]]) final { + storage::raise("Atomic operations are only supported for s3 backend"); + }; + void do_update(Composite&& kvs, UpdateOpts opts) final; void do_read(Composite&& ks, const ReadVisitor& visitor, storage::ReadKeyOpts opts) final; diff --git a/cpp/arcticdb/storage/memory/memory_storage.hpp b/cpp/arcticdb/storage/memory/memory_storage.hpp index a02dd08269..34eda07a67 100644 --- a/cpp/arcticdb/storage/memory/memory_storage.hpp +++ b/cpp/arcticdb/storage/memory/memory_storage.hpp @@ -29,6 +29,10 @@ namespace arcticdb::storage::memory { private: void do_write(Composite&& kvs) final; + void do_write_if_none(KeySegmentPair&& kv [[maybe_unused]]) final { + storage::raise("Atomic operations are only supported for s3 backend"); + }; + void do_update(Composite&& kvs, UpdateOpts opts) final; void do_read(Composite&& ks, const ReadVisitor& visitor, ReadKeyOpts opts) final; diff --git a/cpp/arcticdb/storage/mongo/mongo_storage.hpp b/cpp/arcticdb/storage/mongo/mongo_storage.hpp index 7f774a6aa9..141edc3c4a 100644 --- a/cpp/arcticdb/storage/mongo/mongo_storage.hpp +++ b/cpp/arcticdb/storage/mongo/mongo_storage.hpp @@ -31,6 +31,10 @@ class MongoStorage final : public Storage { private: void do_write(Composite&& kvs) final; + void do_write_if_none(KeySegmentPair&& kv [[maybe_unused]]) final { + storage::raise("Atomic operations are only supported for s3 backend"); + }; + void do_update(Composite&& kvs, UpdateOpts opts) final; void do_read(Composite&& ks, const ReadVisitor& visitor, ReadKeyOpts opts) final; diff --git a/cpp/arcticdb/storage/s3/detail-inl.hpp b/cpp/arcticdb/storage/s3/detail-inl.hpp index d3503b0a0b..16ee41faa4 100644 --- a/cpp/arcticdb/storage/s3/detail-inl.hpp +++ b/cpp/arcticdb/storage/s3/detail-inl.hpp @@ -129,6 +129,27 @@ namespace s3 { }); } + template + void do_write_if_none_impl( + KeySegmentPair &&kv, + const std::string &root_folder, + const std::string &bucket_name, + S3ClientWrapper &s3_client, + KeyBucketizer &&bucketizer) { + ARCTICDB_SAMPLE(S3StorageWriteIfNone, 0) + auto key_type_dir = key_type_folder(root_folder, kv.key_type()); + auto &k = kv.variant_key(); + auto s3_object_name = object_path(bucketizer.bucketize(key_type_dir, k), k); + auto &seg = kv.segment(); + + auto put_object_result = s3_client.put_object(s3_object_name, std::move(seg), bucket_name, true); + + if (!put_object_result.is_success()) { + auto& error = put_object_result.get_error(); + raise_s3_exception(error, s3_object_name); + } + } + template void do_update_impl( Composite &&kvs, diff --git a/cpp/arcticdb/storage/s3/nfs_backed_storage.hpp b/cpp/arcticdb/storage/s3/nfs_backed_storage.hpp index d7a3dfac5d..de6267cffb 100644 --- a/cpp/arcticdb/storage/s3/nfs_backed_storage.hpp +++ b/cpp/arcticdb/storage/s3/nfs_backed_storage.hpp @@ -35,6 +35,10 @@ class NfsBackedStorage final : public Storage { private: void do_write(Composite&& kvs) final; + void do_write_if_none(KeySegmentPair&& kv [[maybe_unused]]) final { + storage::raise("Atomic operations are only supported for s3 backend"); + }; + void do_update(Composite&& kvs, UpdateOpts opts) final; void do_read(Composite&& ks, const ReadVisitor& visitor, ReadKeyOpts opts) final; diff --git a/cpp/arcticdb/storage/s3/s3_client_wrapper.hpp b/cpp/arcticdb/storage/s3/s3_client_wrapper.hpp index 96c98e13ce..613722934e 100644 --- a/cpp/arcticdb/storage/s3/s3_client_wrapper.hpp +++ b/cpp/arcticdb/storage/s3/s3_client_wrapper.hpp @@ -57,7 +57,8 @@ class S3ClientWrapper { virtual S3Result put_object( const std::string& s3_object_name, Segment&& segment, - const std::string& bucket_name) = 0; + const std::string& bucket_name, + bool if_none_match = false) = 0; virtual S3Result delete_objects( const std::vector& s3_object_names, diff --git a/cpp/arcticdb/storage/s3/s3_mock_client.cpp b/cpp/arcticdb/storage/s3/s3_mock_client.cpp index a6b33570f5..aec87b8b48 100644 --- a/cpp/arcticdb/storage/s3/s3_mock_client.cpp +++ b/cpp/arcticdb/storage/s3/s3_mock_client.cpp @@ -83,7 +83,8 @@ S3Result MockS3Client::get_object( S3Result MockS3Client::put_object( const std::string &s3_object_name, Segment &&segment, - const std::string &bucket_name) { + const std::string &bucket_name, + bool if_none_match[[maybe_unused]]) { auto maybe_error = has_failure_trigger(s3_object_name, StorageOperation::WRITE); if (maybe_error.has_value()) { return {*maybe_error}; diff --git a/cpp/arcticdb/storage/s3/s3_mock_client.hpp b/cpp/arcticdb/storage/s3/s3_mock_client.hpp index 04c6812218..c7c094da23 100644 --- a/cpp/arcticdb/storage/s3/s3_mock_client.hpp +++ b/cpp/arcticdb/storage/s3/s3_mock_client.hpp @@ -61,7 +61,8 @@ class MockS3Client : public S3ClientWrapper { S3Result put_object( const std::string& s3_object_name, Segment&& segment, - const std::string& bucket_name) override; + const std::string& bucket_name, + bool if_none_match = false) override; S3Result delete_objects( const std::vector& s3_object_names, diff --git a/cpp/arcticdb/storage/s3/s3_real_client.cpp b/cpp/arcticdb/storage/s3/s3_real_client.cpp index b2f74dadc8..9fd7b80ec7 100644 --- a/cpp/arcticdb/storage/s3/s3_real_client.cpp +++ b/cpp/arcticdb/storage/s3/s3_real_client.cpp @@ -133,12 +133,16 @@ S3Result RealS3Client::get_object( S3Result RealS3Client::put_object( const std::string &s3_object_name, Segment &&segment, - const std::string &bucket_name) { + const std::string &bucket_name, + bool if_none_match) { ARCTICDB_SUBSAMPLE(S3StorageWritePreamble, 0) Aws::S3::Model::PutObjectRequest request; request.SetBucket(bucket_name.c_str()); request.SetKey(s3_object_name.c_str()); + if (if_none_match) { + request.SetIfNoneMatch("*"); + } ARCTICDB_RUNTIME_DEBUG(log::storage(), "Set s3 key {}", request.GetKey().c_str()); auto [dst, write_size, buffer] = segment.serialize_header(); diff --git a/cpp/arcticdb/storage/s3/s3_real_client.hpp b/cpp/arcticdb/storage/s3/s3_real_client.hpp index 57bfe87b65..b0e7e0caff 100644 --- a/cpp/arcticdb/storage/s3/s3_real_client.hpp +++ b/cpp/arcticdb/storage/s3/s3_real_client.hpp @@ -38,7 +38,8 @@ class RealS3Client : public S3ClientWrapper { S3Result put_object( const std::string& s3_object_name, Segment&& segment, - const std::string& bucket_name) override; + const std::string& bucket_name, + bool if_none_match = false) override; S3Result delete_objects( const std::vector& s3_object_names, diff --git a/cpp/arcticdb/storage/s3/s3_storage.cpp b/cpp/arcticdb/storage/s3/s3_storage.cpp index 71daa55906..e1c49870ff 100644 --- a/cpp/arcticdb/storage/s3/s3_storage.cpp +++ b/cpp/arcticdb/storage/s3/s3_storage.cpp @@ -50,6 +50,10 @@ void S3Storage::do_write(Composite&& kvs) { detail::do_write_impl(std::move(kvs), root_folder_, bucket_name_, *s3_client_, FlatBucketizer{}); } +void S3Storage::do_write_if_none(KeySegmentPair&& kv) { + detail::do_write_if_none_impl(std::move(kv), root_folder_, bucket_name_, *s3_client_, FlatBucketizer{}); +} + void S3Storage::do_update(Composite&& kvs, UpdateOpts) { detail::do_update_impl(std::move(kvs), root_folder_, bucket_name_, *s3_client_, FlatBucketizer{}); } diff --git a/cpp/arcticdb/storage/s3/s3_storage.hpp b/cpp/arcticdb/storage/s3/s3_storage.hpp index 9847113949..9310f2e779 100644 --- a/cpp/arcticdb/storage/s3/s3_storage.hpp +++ b/cpp/arcticdb/storage/s3/s3_storage.hpp @@ -44,6 +44,8 @@ class S3Storage final : public Storage { private: void do_write(Composite&& kvs) final; + void do_write_if_none(KeySegmentPair&& kv) final; + void do_update(Composite&& kvs, UpdateOpts opts) final; void do_read(Composite&& ks, const ReadVisitor& visitor, ReadKeyOpts opts) final; diff --git a/cpp/arcticdb/storage/storage.hpp b/cpp/arcticdb/storage/storage.hpp index 0291f74ec8..cca4f3902b 100644 --- a/cpp/arcticdb/storage/storage.hpp +++ b/cpp/arcticdb/storage/storage.hpp @@ -107,6 +107,10 @@ class Storage { return write(Composite{std::move(kv)}); } + void write_if_none(KeySegmentPair&& kv) { + return do_write_if_none(std::move(kv)); + } + void update(Composite &&kvs, UpdateOpts opts) { ARCTICDB_SAMPLE(StorageUpdate, 0) return do_update(std::move(kvs), opts); @@ -186,6 +190,8 @@ class Storage { private: virtual void do_write(Composite&& kvs) = 0; + virtual void do_write_if_none(KeySegmentPair&& kv) = 0; + virtual void do_update(Composite&& kvs, UpdateOpts opts) = 0; virtual void do_read(Composite&& ks, const ReadVisitor& visitor, ReadKeyOpts opts) = 0; diff --git a/cpp/arcticdb/storage/storages.hpp b/cpp/arcticdb/storage/storages.hpp index ee77a36301..e046c4a00d 100644 --- a/cpp/arcticdb/storage/storages.hpp +++ b/cpp/arcticdb/storage/storages.hpp @@ -46,6 +46,10 @@ class Storages { primary().write(std::move(kvs)); } + void write_if_none(KeySegmentPair&& kv) { + primary().write_if_none(std::move(kv)); + } + void update(Composite&& kvs, storage::UpdateOpts opts) { ARCTICDB_SAMPLE(StoragesUpdate, 0) primary().update(std::move(kvs), opts); diff --git a/cpp/arcticdb/storage/test/in_memory_store.hpp b/cpp/arcticdb/storage/test/in_memory_store.hpp index d351e19905..e2927fa020 100644 --- a/cpp/arcticdb/storage/test/in_memory_store.hpp +++ b/cpp/arcticdb/storage/test/in_memory_store.hpp @@ -144,6 +144,15 @@ namespace arcticdb { return write(key_type, stream_id, std::move(segment)).get(); } + entity::VariantKey write_if_none_sync( + KeyType key_type, + const StreamId& stream_id, + SegmentInMemory &&segment) override { + auto key = entity::RefKey{stream_id, key_type}; + add_segment(key, std::move(segment), true); + return key; + } + bool is_path_valid(const std::string_view) const override { return true; } @@ -465,10 +474,15 @@ namespace arcticdb { seg_by_atom_key_[key] = std::make_unique(std::move(seg)); } - void add_segment(const RefKey &key, SegmentInMemory &&seg) { + void add_segment(const RefKey &key, SegmentInMemory &&seg, bool if_none_match = false) { StorageFailureSimulator::instance()->go(FailureType::WRITE); std::lock_guard lock{mutex_}; ARCTICDB_DEBUG(log::storage(), "Adding segment with key {}", key); + if (if_none_match) { + if (seg_by_ref_key_.find(key) != seg_by_ref_key_.end()) { + storage::raise("Precondition failed. Object is already present."); + } + } seg_by_ref_key_[key] = std::make_unique(std::move(seg)); } diff --git a/cpp/arcticdb/stream/piloted_clock.cpp b/cpp/arcticdb/stream/piloted_clock.cpp index d396cfc6ca..c17530c724 100644 --- a/cpp/arcticdb/stream/piloted_clock.cpp +++ b/cpp/arcticdb/stream/piloted_clock.cpp @@ -2,4 +2,5 @@ namespace arcticdb { std::atomic PilotedClock::time_; +std::atomic PilotedClockNoAutoIncrement::time_; } \ No newline at end of file diff --git a/cpp/arcticdb/stream/piloted_clock.hpp b/cpp/arcticdb/stream/piloted_clock.hpp index f2b79189ac..41da1b5e0f 100644 --- a/cpp/arcticdb/stream/piloted_clock.hpp +++ b/cpp/arcticdb/stream/piloted_clock.hpp @@ -1,3 +1,5 @@ +#pragma once + #include #include @@ -14,4 +16,11 @@ struct PilotedClock { } }; +struct PilotedClockNoAutoIncrement { + static std::atomic time_; + static entity::timestamp nanos_since_epoch() { + return PilotedClockNoAutoIncrement::time_; + } +}; + } //namespace arcticdb \ No newline at end of file diff --git a/cpp/arcticdb/stream/stream_sink.hpp b/cpp/arcticdb/stream/stream_sink.hpp index ed28a611ab..14d1e0e64a 100644 --- a/cpp/arcticdb/stream/stream_sink.hpp +++ b/cpp/arcticdb/stream/stream_sink.hpp @@ -99,6 +99,11 @@ struct StreamSink { const StreamId &stream_id, SegmentInMemory &&segment) = 0; + virtual entity::VariantKey write_if_none_sync( + KeyType key_type, + const StreamId &stream_id, + SegmentInMemory &&segment) = 0; + [[nodiscard]] virtual folly::Future write_compressed(storage::KeySegmentPair ks) = 0; virtual void write_compressed_sync(storage::KeySegmentPair ks) = 0; diff --git a/cpp/arcticdb/stream/test/stream_test_common.cpp b/cpp/arcticdb/stream/test/stream_test_common.cpp index 3a02c8a639..d5ee8a6ae1 100644 --- a/cpp/arcticdb/stream/test/stream_test_common.cpp +++ b/cpp/arcticdb/stream/test/stream_test_common.cpp @@ -9,6 +9,4 @@ namespace arcticdb { -std::atomic PilotedClock::time_{0}; - } //namespace arcticdb \ No newline at end of file diff --git a/cpp/arcticdb/util/error_code.hpp b/cpp/arcticdb/util/error_code.hpp index da53b08ab7..ff56ed1d20 100644 --- a/cpp/arcticdb/util/error_code.hpp +++ b/cpp/arcticdb/util/error_code.hpp @@ -80,6 +80,7 @@ inline std::unordered_map get_error_category_names() ERROR_CODE(5002, E_SYMBOL_NOT_FOUND) \ ERROR_CODE(5003, E_PERMISSION) \ ERROR_CODE(5004, E_RESOURCE_NOT_FOUND) \ + ERROR_CODE(5005, E_UNSUPPORTED_ATOMIC_OPERATION) \ ERROR_CODE(5010, E_LMDB_MAP_FULL) \ ERROR_CODE(5011, E_UNEXPECTED_LMDB_ERROR) \ ERROR_CODE(5020, E_UNEXPECTED_S3_ERROR) \ diff --git a/cpp/arcticdb/util/reliable_storage_lock.hpp b/cpp/arcticdb/util/reliable_storage_lock.hpp new file mode 100644 index 0000000000..57e1018156 --- /dev/null +++ b/cpp/arcticdb/util/reliable_storage_lock.hpp @@ -0,0 +1,71 @@ +/* Copyright 2023 Man Group Operations Limited + * + * Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt. + * + * As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0. + */ + +#pragma once + +#include +#include +#include +#include + +#include +#include + +namespace arcticdb { + +namespace lock { + +using Epoch = uint64_t; + +// The ReliableStorageLock is a storage lock which relies on atomic If-None-Match Put and ListObject operations to +// provide a slower but more reliable lock than the StorageLock. It should be completely consistent unless a process +// holding a lock get's paused for times comparable to the lock timeout. +// It lock follows the algorithm described here: +// https://www.morling.dev/blog/leader-election-with-s3-conditional-writes/ +template +class ReliableStorageLock { +public: + ReliableStorageLock(const std::string& base_name, const std::shared_ptr store, timestamp timeout); + + Epoch retry_until_take_lock() const; + std::optional try_take_lock() const; + std::optional try_extend_lock(Epoch held_lock_epoch) const; + void free_lock(Epoch held_lock_epoch) const; + timestamp timeout() const; +private: + std::optional try_take_next_epoch(const std::vector& existing_locks, std::optional latest) const; + std::pair, std::optional> get_all_locks() const; + void clear_old_locks(Epoch latest, const std::vector& epochs) const; + StreamId get_stream_id(Epoch e) const; + std::string base_name_; + std::shared_ptr store_; + timestamp timeout_; +}; + +// The ReliableStorageLockGuard aquires a ReliableStorageLock on construction and frees it on destruction. While the lock +// is held it periodically extends its timeout in a heartbeating thread. +class ReliableStorageLockGuard { +public: + ReliableStorageLockGuard(const ReliableStorageLock<> &lock, folly::Func&& on_lost_lock); + + ~ReliableStorageLockGuard(); + + // Does the same as the descructor. It is exposed only to be used by python integration tests. + void free_lock_and_cleanup(); +private: + void cleanup_on_lost_lock(); + const ReliableStorageLock<> &lock_; + std::optional aquired_epoch_; + folly::Func on_lost_lock_; + folly::FunctionScheduler extend_lock_heartbeat_; +}; + +} + +} + +#include "arcticdb/util/reliable_storage_lock.tpp" \ No newline at end of file diff --git a/cpp/arcticdb/util/reliable_storage_lock.tpp b/cpp/arcticdb/util/reliable_storage_lock.tpp new file mode 100644 index 0000000000..3d8513c009 --- /dev/null +++ b/cpp/arcticdb/util/reliable_storage_lock.tpp @@ -0,0 +1,222 @@ +/* Copyright 2023 Man Group Operations Limited + * + * Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt. + * + * As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0. + */ + +#include + +#include +#include +#include +#include + +#include +#include + +namespace arcticdb { + +namespace lock { + +const auto SEPARATOR = '*'; +const auto EXTENDS_PER_TIMEOUT = 5u; +const auto KEEP_LAST_N_LOCKS = 5u; + +StreamDescriptor lock_stream_descriptor(const StreamId &stream_id) { + return StreamDescriptor{stream_descriptor( + stream_id, + stream::RowCountIndex(), + {scalar_field(DataType::INT64, "expiration")})}; +} + +SegmentInMemory lock_segment(const StreamId &name, timestamp expiration) { + SegmentInMemory output{lock_stream_descriptor(name)}; + output.set_scalar(0, expiration); + output.end_row(); + return output; +} + +template +ReliableStorageLock::ReliableStorageLock(const std::string &base_name, const std::shared_ptr store, timestamp timeout) : + base_name_(base_name), store_(store), timeout_(timeout) { + auto s3_timeout = ConfigsMap::instance()->get_int("S3Storage.RequestTimeoutMs", 200000) * ONE_MILLISECOND; + if (2 * s3_timeout > timeout) { + log::lock().warn( + "Reliable Lock is created with a timeout < twice the s3 timeout. This is not recommended, as it increases the risk for a faulty lock timeout in s3." + "Lock timeout: {}, S3 timeout: {}", timeout, s3_timeout); + } +} + +template +timestamp ReliableStorageLock::timeout() const { + return timeout_; +} + +Epoch get_next_epoch(std::optional maybe_prev) { + if (maybe_prev.has_value()) { + return maybe_prev.value() + 1; + } + return 0; +} + +template +StreamId ReliableStorageLock::get_stream_id(Epoch e) const { + return fmt::format("{}{}{}", base_name_, SEPARATOR, e); +} + +Epoch extract_epoch_from_stream_id(const StreamId& stream_id) { + auto string_id = std::get(stream_id); + auto epoch_string = string_id.substr(string_id.find(SEPARATOR)+1, string_id.size()); + return std::stoull(epoch_string); +} + + +template +std::pair, std::optional> ReliableStorageLock::get_all_locks() const { + std::vector epochs; + store_->iterate_type( + KeyType::SLOW_LOCK, + [&epochs](VariantKey &&key){ + auto current_epoch = extract_epoch_from_stream_id(variant_key_id(key)); + epochs.push_back(current_epoch); + }, base_name_ + SEPARATOR); + std::optional latest = epochs.size()==0 ? std::nullopt : std::make_optional<>(*std::max_element(epochs.begin(), epochs.end())); + return {epochs, latest}; +} + +template +void ReliableStorageLock::clear_old_locks(arcticdb::lock::Epoch latest, const std::vector& epochs) const { + auto to_delete = std::vector(); + for (auto epoch : epochs) { + if (epoch + KEEP_LAST_N_LOCKS <= latest) { + to_delete.emplace_back(RefKey{get_stream_id(epoch), KeyType::SLOW_LOCK}); + } + } + store_->remove_keys_sync(to_delete); +} + +template +std::optional ReliableStorageLock::try_take_lock() const { + auto [existing_locks, latest] = get_all_locks(); + if (latest.has_value()) { + auto kv = store_->read_sync(RefKey{get_stream_id(latest.value()), KeyType::SLOW_LOCK}, storage::ReadKeyOpts{}); + auto expires = kv.second.template scalar_at(0, 0).value(); + if (expires > ClockType::nanos_since_epoch()) { + // An unexpired lock exists + return std::nullopt; + } + } + return try_take_next_epoch(existing_locks, latest); +} + +template +Epoch ReliableStorageLock::retry_until_take_lock() const { + // We don't use the ExponentialBackoff because we want to be able to wait indefinitely + auto max_wait = std::chrono::duration_cast(std::chrono::nanoseconds(timeout())); + auto min_wait = max_wait / 16; + auto current_wait = min_wait; + std::minstd_rand gen(std::random_device{}()); + std::uniform_real_distribution<> dist(1.0, 1.2); + auto jittered_wait = [&]() { + auto factor = dist(gen); + return current_wait * factor; + }; + + auto aquired_epoch = try_take_lock(); + while (!aquired_epoch.has_value()) { + std::this_thread::sleep_for(jittered_wait()); + current_wait = std::min(current_wait * 2, max_wait); + aquired_epoch = try_take_lock(); + } + return aquired_epoch.value(); +} + +template +std::optional ReliableStorageLock::try_extend_lock(Epoch held_lock_epoch) const { + auto [existing_locks, latest] = get_all_locks(); + util::check(latest.has_value() && latest.value() >= held_lock_epoch, + "We are trying to extend a newer epoch than the existing one in storage. Extend epoch: {}", + held_lock_epoch); + if (latest.value() != held_lock_epoch) { + // We have lost the lock while holding it (most likely due to timeout). + return std::nullopt; + } + return try_take_next_epoch(existing_locks, latest); +} + +template +void ReliableStorageLock::free_lock(Epoch held_lock_epoch) const { + auto [existing_locks, latest_epoch] = get_all_locks(); + util::check(latest_epoch.has_value() && latest_epoch.value() >= held_lock_epoch, + "We are trying to free a newer epoch than the existing one in storage. Free epoch: {}, Existing epoch: {}", + held_lock_epoch, latest_epoch); + if (latest_epoch.value() != held_lock_epoch) { + // Lock is already lost + return; + } + auto lock_stream_id = get_stream_id(held_lock_epoch); + auto expiration = 0; // Write 0 to mark the lock as expired + store_->write_sync(KeyType::SLOW_LOCK, lock_stream_id, lock_segment(lock_stream_id, expiration)); +} + +template +std::optional ReliableStorageLock::try_take_next_epoch(const std::vector& existing_locks, std::optional latest) const { + Epoch epoch = get_next_epoch(latest); + auto lock_stream_id = get_stream_id(epoch); + auto expiration = ClockType::nanos_since_epoch() + timeout_; + try { + store_->write_if_none_sync(KeyType::SLOW_LOCK, lock_stream_id, lock_segment(lock_stream_id, expiration)); + } catch (const StorageException& e) { + // There is no specific Aws::S3::S3Errors for the failed atomic operation, so we catch any StorageException. + // Either way it's safe to assume we have failed to aquire the lock in case of transient S3 error. + // If error persists we'll approprieately raise in the next attempt to LIST/GET the existing lock and propagate + // the transient error. + log::lock().warn("Failed to aquire lock (likely someone aquired it before us): {}", e.what()); + return std::nullopt; + } + // We clear old locks only after aquiring the lock to avoid unnecessary deletes + clear_old_locks(epoch, existing_locks); + return epoch; +} + +ReliableStorageLockGuard::ReliableStorageLockGuard(const ReliableStorageLock<> &lock, folly::Func&& on_lost_lock) : + lock_(lock), aquired_epoch_(std::nullopt), on_lost_lock_(std::move(on_lost_lock)) { + aquired_epoch_ = lock_.retry_until_take_lock(); + util::check(aquired_epoch_.has_value(), "We should have waited until we surely aquire a lock"); + // We heartbeat 5 times per lock timeout to extend the lock. + auto hearbeat_frequency = std::chrono::duration_cast( + std::chrono::nanoseconds(lock_.timeout() / EXTENDS_PER_TIMEOUT)); + extend_lock_heartbeat_.addFunction( + [that=this](){ + if (that->aquired_epoch_.has_value()) { + that->aquired_epoch_ = that->lock_.try_extend_lock(that->aquired_epoch_.value()); + if (!that->aquired_epoch_.has_value()) { + // Clean up if we have lost the lock. + that->cleanup_on_lost_lock(); + } + } + }, hearbeat_frequency, "Extend lock", hearbeat_frequency); + extend_lock_heartbeat_.start(); +} + +void ReliableStorageLockGuard::cleanup_on_lost_lock() { + // We do not use shutdown because we don't want to run it from within a FunctionScheduler thread to avoid a deadlock + extend_lock_heartbeat_.cancelAllFunctions(); + on_lost_lock_(); +} + +void ReliableStorageLockGuard::free_lock_and_cleanup() { + extend_lock_heartbeat_.shutdown(); + if (aquired_epoch_.has_value()) { + lock_.free_lock(aquired_epoch_.value()); + } +} + +ReliableStorageLockGuard::~ReliableStorageLockGuard() { + free_lock_and_cleanup(); +} + +} + +} diff --git a/cpp/arcticdb/util/storage_lock.hpp b/cpp/arcticdb/util/storage_lock.hpp index 7fdd45a443..61963e2e5e 100644 --- a/cpp/arcticdb/util/storage_lock.hpp +++ b/cpp/arcticdb/util/storage_lock.hpp @@ -70,6 +70,8 @@ inline std::thread::id get_thread_id() noexcept { return std::this_thread::get_id(); } +// This StorageLock is inherently unreliable. It does not use atomic operations and it is possible for two processes to aquire if the timing is right. +// If you want a reliable alternative which is slower but uses atomic primitives you can look at the `ReliableStorageLock`. template class StorageLock { // 1 Day diff --git a/cpp/arcticdb/util/test/test_reliable_storage_lock.cpp b/cpp/arcticdb/util/test/test_reliable_storage_lock.cpp new file mode 100644 index 0000000000..6f8e18c507 --- /dev/null +++ b/cpp/arcticdb/util/test/test_reliable_storage_lock.cpp @@ -0,0 +1,111 @@ +/* Copyright 2023 Man Group Operations Limited + * + * Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt. + * + * As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0. + */ + +#include +#include +#include +#include +#include +#include +#include + +using namespace arcticdb; +using namespace lock; + +// These tests test the actual implementation + +TEST(ReliableStorageLock, SingleThreaded) { + auto store = std::make_shared(); + using Clock = PilotedClockNoAutoIncrement; + // We have 2 locks, one with timeout of 20 and another with a timeout of 10 + ReliableStorageLock lock1{StringId{"test_lock"}, store, 20}; + ReliableStorageLock lock2{StringId{"test_lock"}, store, 10}; + + // We take the first lock at 0 and it should not expire until 20 + Clock::time_ = 0; + ASSERT_EQ(lock1.try_take_lock(), std::optional{0}); + Clock::time_ = 5; + ASSERT_EQ(lock2.try_take_lock(), std::nullopt); + Clock::time_ = 10; + ASSERT_EQ(lock2.try_take_lock(), std::nullopt); + Clock::time_ = 19; + ASSERT_EQ(lock1.try_take_lock(), std::nullopt); + + // Once the first lock has expired we can take a new lock with epoch=1 + Clock::time_ = 20; + ASSERT_EQ(lock2.try_take_lock(), std::optional{1}); + + // We can extend the lock timeout at 25 to 35 and get an epoch=2 + Clock::time_ = 25; + ASSERT_EQ(lock1.try_take_lock(), std::nullopt); + ASSERT_EQ(lock2.try_extend_lock(1), std::optional{2}); + Clock::time_ = 34; + ASSERT_EQ(lock1.try_take_lock(), std::nullopt); + + // At time 35 the lock with epoch=2 has expired and we can re-aquire the lock + Clock::time_ = 35; + ASSERT_EQ(lock1.try_take_lock(), std::optional{3}); + ASSERT_EQ(lock2.try_take_lock(), std::nullopt); + + // And we can free the lock immediately to allow re-aquiring without waiting for timeout + lock1.free_lock(3); + ASSERT_EQ(lock2.try_take_lock(), std::optional{4}); +} + +struct SlowIncrementTask : async::BaseTask { + int& cnt_; + const ReliableStorageLock<>& lock_; + std::chrono::milliseconds sleep_time_; + bool lock_lost_ = false; + + SlowIncrementTask(int& cnt, ReliableStorageLock<>& lock, std::chrono::milliseconds sleep_time) : + cnt_(cnt), lock_(lock), sleep_time_(sleep_time) {} + + void operator()() { + auto guard = ReliableStorageLockGuard(lock_, [that = this](){ + that->lock_lost_ = true; + }); + auto value_before_sleep = cnt_; + std::this_thread::sleep_for(sleep_time_); + if (lock_lost_) { + // Return early on a lost lock. We will raise an issue if this happens. + return; + } + cnt_ = value_before_sleep + 1; + } +}; + + +TEST(ReliableStorageLock, StressMultiThreaded) { + // It is hard to use a piloted clock for these tests because the folly::FunctionScheduler we use for the lock + // extensions doesn't support a custom clock. Thus this test will need to run for about half a minute. + auto threads = 100u; + folly::FutureExecutor exec{threads}; + auto store = std::make_shared(); + ReliableStorageLock<> lock{StringId{"test_lock"}, store, ONE_SECOND / 4}; + + int counter = 0; + + std::vector> futures; + for(auto i = 0u; i < threads; ++i) { + // We use both fast and slow tasks to test both fast lock frees and lock extensions + auto sleep_time = std::chrono::milliseconds(i%2 * 500); + futures.emplace_back(exec.addFuture(SlowIncrementTask{counter, lock, sleep_time})); + } + folly::collectAll(futures).get(); + + // If no locks were lost and no races happened each thread will have incremented the counter exactly once + ASSERT_EQ(counter, threads); + + // We should be clearing locks to not allow them to accumulate + auto number_of_lock_keys = 0; + store->iterate_type(KeyType::SLOW_LOCK, [&number_of_lock_keys](VariantKey&& _ [[maybe_unused]]){++number_of_lock_keys;}); + ASSERT_LE(number_of_lock_keys, KEEP_LAST_N_LOCKS); + + // Also the lock should be free by the end (i.e. we can take a new lock) + ASSERT_EQ(lock.try_take_lock().has_value(), true); +} diff --git a/cpp/vcpkg b/cpp/vcpkg index c82f746672..6e1219d080 160000 --- a/cpp/vcpkg +++ b/cpp/vcpkg @@ -1 +1 @@ -Subproject commit c82f74667287d3dc386bce81e44964370c91a289 +Subproject commit 6e1219d080fc0525f55bf8c8ba57d8ab2ce35422 diff --git a/cpp/vcpkg.json b/cpp/vcpkg.json index 796910d8e9..e8f8ee68bf 100644 --- a/cpp/vcpkg.json +++ b/cpp/vcpkg.json @@ -31,16 +31,53 @@ "features": [ "push", "pull" ] }, "mongo-cxx-driver", - { - "name": "aws-c-io", - "$version reason": "To pick up https://github.com/awslabs/aws-c-io/pull/515" - }, { "name": "aws-sdk-cpp", - "$version reason": "Minimum version in the baseline that works with aws-c-io above.", + "version>=": "1.11.405", + "$version reason": "Version which contains atomic put operations", "default-features": false, "features": [ "s3" ] }, + { + "name": "aws-crt-cpp", + "version>=": "0.28.3" + }, + { + "name": "aws-c-s3", + "version>=": "0.6.6" + }, + { + "name": "aws-c-io", + "version>=": "0.14.18" + }, + { + "name": "aws-c-common", + "version>=": "0.9.28" + }, + { + "name": "aws-c-auth", + "version>=": "0.7.31" + }, + { + "name": "aws-c-cal", + "version>=": "0.7.4" + }, + { + "name": "aws-c-http", + "version>=": "0.8.10" + }, + { + "name": "aws-c-sdkutils", + "version>=": "0.1.19" + }, + { + "name": "aws-c-event-stream", + "version>=": "0.4.3" + }, + { + "name": "aws-checksums", + "version>=": "0.1.20" + }, "boost-dynamic-bitset", "boost-interprocess", "boost-callable-traits", @@ -68,9 +105,7 @@ ], "overrides": [ { "name": "openssl", "version-string": "3.3.0" }, - { "name": "aws-sdk-cpp", "version": "1.11.201" }, { "name": "azure-core-cpp", "version": "1.12.0" }, - { "name": "aws-c-s3", "version": "0.3.24" }, { "name": "benchmark", "version": "1.9.0" }, { "name": "bitmagic", "version": "7.12.3" }, { "name": "boost-algorithm", "version": "1.84.0" },