diff --git a/cpp/arcticdb/async/async_store.hpp b/cpp/arcticdb/async/async_store.hpp index a292f0aad81..fed575a8178 100644 --- a/cpp/arcticdb/async/async_store.hpp +++ b/cpp/arcticdb/async/async_store.hpp @@ -280,6 +280,10 @@ bool supports_prefix_matching() const override { return library_->supports_prefix_matching(); } +bool supports_atomic_writes() const override { + return library_->supports_atomic_writes(); +} + std::string key_path(const VariantKey& key) const { return library_->key_path(key); } diff --git a/cpp/arcticdb/storage/azure/azure_storage.hpp b/cpp/arcticdb/storage/azure/azure_storage.hpp index 2b926e5cbe5..1c679cfab67 100644 --- a/cpp/arcticdb/storage/azure/azure_storage.hpp +++ b/cpp/arcticdb/storage/azure/azure_storage.hpp @@ -55,6 +55,10 @@ class AzureStorage final : public Storage { return true; } + bool do_supports_atomic_writes() const final { + return false; + } + bool do_fast_delete() final { return false; } diff --git a/cpp/arcticdb/storage/file/mapped_file_storage.hpp b/cpp/arcticdb/storage/file/mapped_file_storage.hpp index 44d9fd365ac..ee4b3954622 100644 --- a/cpp/arcticdb/storage/file/mapped_file_storage.hpp +++ b/cpp/arcticdb/storage/file/mapped_file_storage.hpp @@ -50,6 +50,10 @@ class MappedFileStorage final : public SingleFileStorage { return false; }; + bool do_supports_atomic_writes() const final { + return false; + } + std::string do_key_path(const VariantKey&) const override { return {}; } bool do_fast_delete() override; diff --git a/cpp/arcticdb/storage/library.hpp b/cpp/arcticdb/storage/library.hpp index feff46053c1..2f0d3969bc8 100644 --- a/cpp/arcticdb/storage/library.hpp +++ b/cpp/arcticdb/storage/library.hpp @@ -163,6 +163,8 @@ class Library { bool supports_prefix_matching() const { return storages_->supports_prefix_matching(); } + bool supports_atomic_writes() const { return storages_->supports_atomic_writes(); } + const LibraryPath &library_path() const { return library_path_; } OpenMode open_mode() const { return storages_->open_mode(); } diff --git a/cpp/arcticdb/storage/lmdb/lmdb_storage.hpp b/cpp/arcticdb/storage/lmdb/lmdb_storage.hpp index 9c6fa838ccd..9c8bf9cbdcd 100644 --- a/cpp/arcticdb/storage/lmdb/lmdb_storage.hpp +++ b/cpp/arcticdb/storage/lmdb/lmdb_storage.hpp @@ -52,6 +52,10 @@ class LmdbStorage final : public Storage { return false; }; + bool do_supports_atomic_writes() const final { + return false; + } + inline bool do_fast_delete() final; void cleanup() override; diff --git a/cpp/arcticdb/storage/memory/memory_storage.hpp b/cpp/arcticdb/storage/memory/memory_storage.hpp index 34eda07a673..6cec3994945 100644 --- a/cpp/arcticdb/storage/memory/memory_storage.hpp +++ b/cpp/arcticdb/storage/memory/memory_storage.hpp @@ -45,6 +45,10 @@ namespace arcticdb::storage::memory { return false; } + bool do_supports_atomic_writes() const final { + return false; + } + inline bool do_fast_delete() final; bool do_iterate_type_until_match(KeyType key_type, const IterateTypePredicate& visitor, const std::string & prefix) final; diff --git a/cpp/arcticdb/storage/mongo/mongo_storage.hpp b/cpp/arcticdb/storage/mongo/mongo_storage.hpp index bf37f5d5daf..fe505b469c0 100644 --- a/cpp/arcticdb/storage/mongo/mongo_storage.hpp +++ b/cpp/arcticdb/storage/mongo/mongo_storage.hpp @@ -47,6 +47,10 @@ class MongoStorage final : public Storage { return false; } + bool do_supports_atomic_writes() const final { + return false; + } + inline bool do_fast_delete() final; bool do_iterate_type_until_match(KeyType key_type, const IterateTypePredicate& visitor, const std::string &prefix) final; diff --git a/cpp/arcticdb/storage/s3/s3_storage.hpp b/cpp/arcticdb/storage/s3/s3_storage.hpp index 9310f2e7790..015fff9bc20 100644 --- a/cpp/arcticdb/storage/s3/s3_storage.hpp +++ b/cpp/arcticdb/storage/s3/s3_storage.hpp @@ -60,6 +60,13 @@ class S3Storage final : public Storage { return true; } + bool do_supports_atomic_writes() const final { + // There is no way to differentiate whether an s3 backed supports atomic writes. As of Nov 2024 S3 and MinIO + // support atomic If-None-Match and If-Match put operations. Unfortunately if we're running on VAST or PURE + // these would just work like regular PUTs with no way to know. + return true; + }; + bool do_fast_delete() final { return false; } diff --git a/cpp/arcticdb/storage/storage.hpp b/cpp/arcticdb/storage/storage.hpp index cca4f3902b0..27abfb5b238 100644 --- a/cpp/arcticdb/storage/storage.hpp +++ b/cpp/arcticdb/storage/storage.hpp @@ -152,6 +152,10 @@ class Storage { return do_supports_prefix_matching(); } + bool supports_atomic_writes() const { + return do_supports_atomic_writes(); + } + bool fast_delete() { return do_fast_delete(); } @@ -202,6 +206,8 @@ class Storage { virtual bool do_supports_prefix_matching() const = 0; + virtual bool do_supports_atomic_writes() const = 0; + virtual bool do_fast_delete() = 0; // Stop iteration and return true upon the first key k for which visitor(k) is true, return false if no key matches diff --git a/cpp/arcticdb/storage/storages.hpp b/cpp/arcticdb/storage/storages.hpp index e046c4a00d9..5d5883d9637 100644 --- a/cpp/arcticdb/storage/storages.hpp +++ b/cpp/arcticdb/storage/storages.hpp @@ -59,6 +59,10 @@ class Storages { return primary().supports_prefix_matching(); } + bool supports_atomic_writes() const { + return primary().supports_atomic_writes(); + } + bool fast_delete() { return primary().fast_delete(); } diff --git a/cpp/arcticdb/storage/test/in_memory_store.hpp b/cpp/arcticdb/storage/test/in_memory_store.hpp index e2927fa0204..255fb4b5eb3 100644 --- a/cpp/arcticdb/storage/test/in_memory_store.hpp +++ b/cpp/arcticdb/storage/test/in_memory_store.hpp @@ -39,6 +39,10 @@ namespace arcticdb { return false; } + bool supports_atomic_writes() const override { + return true; + } + bool fast_delete() override { return false; } diff --git a/cpp/arcticdb/stream/stream_sink.hpp b/cpp/arcticdb/stream/stream_sink.hpp index 14d1e0e64a7..fe90e26705f 100644 --- a/cpp/arcticdb/stream/stream_sink.hpp +++ b/cpp/arcticdb/stream/stream_sink.hpp @@ -99,6 +99,8 @@ struct StreamSink { const StreamId &stream_id, SegmentInMemory &&segment) = 0; + virtual bool supports_atomic_writes() const = 0; + virtual entity::VariantKey write_if_none_sync( KeyType key_type, const StreamId &stream_id, diff --git a/cpp/arcticdb/util/reliable_storage_lock.hpp b/cpp/arcticdb/util/reliable_storage_lock.hpp index a027ca8b290..56b9f5851bb 100644 --- a/cpp/arcticdb/util/reliable_storage_lock.hpp +++ b/cpp/arcticdb/util/reliable_storage_lock.hpp @@ -47,11 +47,12 @@ class ReliableStorageLock { timestamp timeout_; }; -// The ReliableStorageLockGuard aquires a ReliableStorageLock on construction and frees it on destruction. While the lock -// is held it periodically extends its timeout in a heartbeating thread. +// The ReliableStorageLockGuard protects an aquired ReliableStorageLock::Epoch and frees it on destruction. While the lock +// is held it periodically extends its timeout in a heartbeating thread. If for some reason the lock is lost we get notified +// via the on_lock_lost. class ReliableStorageLockGuard { public: - ReliableStorageLockGuard(const ReliableStorageLock<> &lock, folly::Func&& on_lost_lock); + ReliableStorageLockGuard(const ReliableStorageLock<> &lock, Epoch aquired_epoch, folly::Func&& on_lost_lock); ~ReliableStorageLockGuard(); private: diff --git a/cpp/arcticdb/util/reliable_storage_lock.tpp b/cpp/arcticdb/util/reliable_storage_lock.tpp index 27bb7400e08..a41b932f161 100644 --- a/cpp/arcticdb/util/reliable_storage_lock.tpp +++ b/cpp/arcticdb/util/reliable_storage_lock.tpp @@ -40,6 +40,7 @@ SegmentInMemory lock_segment(const StreamId &name, timestamp expiration) { template ReliableStorageLock::ReliableStorageLock(const std::string &base_name, const std::shared_ptr store, timestamp timeout) : base_name_(base_name), store_(store), timeout_(timeout) { + storage::check(store_.supports_atomic_writes()); auto s3_timeout = ConfigsMap::instance()->get_int("S3Storage.RequestTimeoutMs", 200000) * ONE_MILLISECOND; if (2 * s3_timeout > timeout) { log::lock().warn( @@ -195,10 +196,9 @@ std::optional ReliableStorageLock::try_take_next_epoch(const s return epoch; } -ReliableStorageLockGuard::ReliableStorageLockGuard(const ReliableStorageLock<> &lock, folly::Func&& on_lost_lock) : +ReliableStorageLockGuard::ReliableStorageLockGuard(const ReliableStorageLock<> &lock, Epoch aquired_epoch, folly::Func&& on_lost_lock) : lock_(lock), aquired_epoch_(std::nullopt), on_lost_lock_(std::move(on_lost_lock)) { - aquired_epoch_ = lock_.retry_until_take_lock(); - util::check(aquired_epoch_.has_value(), "We should have waited until we surely aquire a lock"); + aquired_epoch_ = aquired_epoch; // We heartbeat 5 times per lock timeout to extend the lock. auto hearbeat_frequency = std::chrono::duration_cast( std::chrono::nanoseconds(lock_.timeout() / EXTENDS_PER_TIMEOUT)); @@ -229,7 +229,8 @@ ReliableStorageLockGuard::~ReliableStorageLockGuard() { } void ReliableStorageLockManager::take_lock_guard(const ReliableStorageLock<> &lock) { - guard = std::make_shared(lock, [](){ + auto aquired = lock.retry_until_take_lock(); + guard = std::make_shared(lock, aquired, [](){ throw LostReliableLock(); }); } diff --git a/cpp/arcticdb/util/test/test_reliable_storage_lock.cpp b/cpp/arcticdb/util/test/test_reliable_storage_lock.cpp index 033081c539e..d500c010e0d 100644 --- a/cpp/arcticdb/util/test/test_reliable_storage_lock.cpp +++ b/cpp/arcticdb/util/test/test_reliable_storage_lock.cpp @@ -82,7 +82,8 @@ struct SlowIncrementTask : async::BaseTask { cnt_(cnt), lock_(lock), sleep_time_(sleep_time) {} void operator()() { - auto guard = ReliableStorageLockGuard(lock_, [that = this](){ + auto aquired = lock_.retry_until_take_lock(); + auto guard = ReliableStorageLockGuard(lock_, aquired, [that = this](){ that->lock_lost_ = true; }); auto value_before_sleep = cnt_;