Skip to content

Commit

Permalink
Add a supports_atomic_writes predicate to storages.
Browse files Browse the repository at this point in the history
Currently all backends are unsupported apart from S3 (for which only some providers like AWS support it).

Unfotrunately it's impossible to differentiate between aws and e.g. vast
backends apart from looking at the endpoint which can be subject to
rerouting etc.

This commit also reworks the Guard to work only with aquired locks.
  • Loading branch information
IvoDD committed Nov 27, 2024
1 parent e140bca commit 9853ee8
Show file tree
Hide file tree
Showing 17 changed files with 65 additions and 8 deletions.
4 changes: 4 additions & 0 deletions cpp/arcticdb/async/async_store.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,10 @@ bool supports_prefix_matching() const override {
return library_->supports_prefix_matching();
}

bool supports_atomic_writes() const override {
return library_->supports_atomic_writes();
}

std::string key_path(const VariantKey& key) const {
return library_->key_path(key);
}
Expand Down
4 changes: 4 additions & 0 deletions cpp/arcticdb/storage/azure/azure_storage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ class AzureStorage final : public Storage {
return true;
}

bool do_supports_atomic_writes() const final {
return false;
}

bool do_fast_delete() final {
return false;
}
Expand Down
4 changes: 4 additions & 0 deletions cpp/arcticdb/storage/file/mapped_file_storage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ class MappedFileStorage final : public SingleFileStorage {
return false;
};

bool do_supports_atomic_writes() const final {
return false;
}

std::string do_key_path(const VariantKey&) const override { return {}; }

bool do_fast_delete() override;
Expand Down
2 changes: 2 additions & 0 deletions cpp/arcticdb/storage/library.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ class Library {

bool supports_prefix_matching() const { return storages_->supports_prefix_matching(); }

bool supports_atomic_writes() const { return storages_->supports_atomic_writes(); }

const LibraryPath &library_path() const { return library_path_; }

OpenMode open_mode() const { return storages_->open_mode(); }
Expand Down
4 changes: 4 additions & 0 deletions cpp/arcticdb/storage/lmdb/lmdb_storage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ class LmdbStorage final : public Storage {
return false;
};

bool do_supports_atomic_writes() const final {
return false;
}

inline bool do_fast_delete() final;

void cleanup() override;
Expand Down
4 changes: 4 additions & 0 deletions cpp/arcticdb/storage/memory/memory_storage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ namespace arcticdb::storage::memory {
return false;
}

bool do_supports_atomic_writes() const final {
return false;
}

inline bool do_fast_delete() final;

bool do_iterate_type_until_match(KeyType key_type, const IterateTypePredicate& visitor, const std::string & prefix) final;
Expand Down
4 changes: 4 additions & 0 deletions cpp/arcticdb/storage/mongo/mongo_storage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ class MongoStorage final : public Storage {
return false;
}

bool do_supports_atomic_writes() const final {
return false;
}

inline bool do_fast_delete() final;

bool do_iterate_type_until_match(KeyType key_type, const IterateTypePredicate& visitor, const std::string &prefix) final;
Expand Down
1 change: 1 addition & 0 deletions cpp/arcticdb/storage/python_bindings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ void register_bindings(py::module& storage, py::exception<arcticdb::ArcticExcept
.value("STORAGE_INFO", KeyType::STORAGE_INFO)
.value("APPEND_REF", KeyType::APPEND_REF)
.value("LOCK", KeyType::LOCK)
.value("SLOW_LOCK", KeyType::SLOW_LOCK)
.value("SNAPSHOT_REF", KeyType::SNAPSHOT_REF)
.value("TOMBSTONE", KeyType::TOMBSTONE)
.value("APPEND_DATA", KeyType::APPEND_DATA)
Expand Down
4 changes: 4 additions & 0 deletions cpp/arcticdb/storage/s3/nfs_backed_storage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ class NfsBackedStorage final : public Storage {
return true;
}

bool do_supports_atomic_writes() const final {
return false;
}

bool do_fast_delete() final {
return false;
}
Expand Down
7 changes: 7 additions & 0 deletions cpp/arcticdb/storage/s3/s3_storage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,13 @@ class S3Storage final : public Storage {
return true;
}

bool do_supports_atomic_writes() const final {
// There is no way to differentiate whether an s3 backed supports atomic writes. As of Nov 2024 S3 and MinIO
// support atomic If-None-Match and If-Match put operations. Unfortunately if we're running on VAST or PURE
// these would just work like regular PUTs with no way to know.
return true;
};

bool do_fast_delete() final {
return false;
}
Expand Down
6 changes: 6 additions & 0 deletions cpp/arcticdb/storage/storage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,10 @@ class Storage {
return do_supports_prefix_matching();
}

bool supports_atomic_writes() const {
return do_supports_atomic_writes();
}

bool fast_delete() {
return do_fast_delete();
}
Expand Down Expand Up @@ -202,6 +206,8 @@ class Storage {

virtual bool do_supports_prefix_matching() const = 0;

virtual bool do_supports_atomic_writes() const = 0;

virtual bool do_fast_delete() = 0;

// Stop iteration and return true upon the first key k for which visitor(k) is true, return false if no key matches
Expand Down
4 changes: 4 additions & 0 deletions cpp/arcticdb/storage/storages.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ class Storages {
return primary().supports_prefix_matching();
}

bool supports_atomic_writes() const {
return primary().supports_atomic_writes();
}

bool fast_delete() {
return primary().fast_delete();
}
Expand Down
4 changes: 4 additions & 0 deletions cpp/arcticdb/storage/test/in_memory_store.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ namespace arcticdb {
return false;
}

bool supports_atomic_writes() const override {
return true;
}

bool fast_delete() override {
return false;
}
Expand Down
2 changes: 2 additions & 0 deletions cpp/arcticdb/stream/stream_sink.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ struct StreamSink {
const StreamId &stream_id,
SegmentInMemory &&segment) = 0;

virtual bool supports_atomic_writes() const = 0;

virtual entity::VariantKey write_if_none_sync(
KeyType key_type,
const StreamId &stream_id,
Expand Down
7 changes: 4 additions & 3 deletions cpp/arcticdb/util/reliable_storage_lock.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,12 @@ class ReliableStorageLock {
timestamp timeout_;
};

// The ReliableStorageLockGuard aquires a ReliableStorageLock on construction and frees it on destruction. While the lock
// is held it periodically extends its timeout in a heartbeating thread.
// The ReliableStorageLockGuard protects an aquired ReliableStorageLock::Epoch and frees it on destruction. While the lock
// is held it periodically extends its timeout in a heartbeating thread. If for some reason the lock is lost we get notified
// via the on_lock_lost.
class ReliableStorageLockGuard {
public:
ReliableStorageLockGuard(const ReliableStorageLock<> &lock, folly::Func&& on_lost_lock);
ReliableStorageLockGuard(const ReliableStorageLock<> &lock, Epoch aquired_epoch, folly::Func&& on_lost_lock);

~ReliableStorageLockGuard();
private:
Expand Down
9 changes: 5 additions & 4 deletions cpp/arcticdb/util/reliable_storage_lock.tpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ SegmentInMemory lock_segment(const StreamId &name, timestamp expiration) {
template <class ClockType>
ReliableStorageLock<ClockType>::ReliableStorageLock(const std::string &base_name, const std::shared_ptr<Store> store, timestamp timeout) :
base_name_(base_name), store_(store), timeout_(timeout) {
storage::check<ErrorCode::E_UNSUPPORTED_ATOMIC_OPERATION>(store_->supports_atomic_writes(), "Storage does not support atomic writes, so we can't create a lock");
auto s3_timeout = ConfigsMap::instance()->get_int("S3Storage.RequestTimeoutMs", 200000) * ONE_MILLISECOND;
if (2 * s3_timeout > timeout) {
log::lock().warn(
Expand Down Expand Up @@ -195,10 +196,9 @@ std::optional<Epoch> ReliableStorageLock<ClockType>::try_take_next_epoch(const s
return epoch;
}

ReliableStorageLockGuard::ReliableStorageLockGuard(const ReliableStorageLock<> &lock, folly::Func&& on_lost_lock) :
ReliableStorageLockGuard::ReliableStorageLockGuard(const ReliableStorageLock<> &lock, Epoch aquired_epoch, folly::Func&& on_lost_lock) :
lock_(lock), aquired_epoch_(std::nullopt), on_lost_lock_(std::move(on_lost_lock)) {
aquired_epoch_ = lock_.retry_until_take_lock();
util::check(aquired_epoch_.has_value(), "We should have waited until we surely aquire a lock");
aquired_epoch_ = aquired_epoch;
// We heartbeat 5 times per lock timeout to extend the lock.
auto hearbeat_frequency = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::nanoseconds(lock_.timeout() / EXTENDS_PER_TIMEOUT));
Expand Down Expand Up @@ -229,7 +229,8 @@ ReliableStorageLockGuard::~ReliableStorageLockGuard() {
}

void ReliableStorageLockManager::take_lock_guard(const ReliableStorageLock<> &lock) {
guard = std::make_shared<ReliableStorageLockGuard>(lock, [](){
auto aquired = lock.retry_until_take_lock();
guard = std::make_shared<ReliableStorageLockGuard>(lock, aquired, [](){
throw LostReliableLock();
});
}
Expand Down
3 changes: 2 additions & 1 deletion cpp/arcticdb/util/test/test_reliable_storage_lock.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ struct SlowIncrementTask : async::BaseTask {
cnt_(cnt), lock_(lock), sleep_time_(sleep_time) {}

void operator()() {
auto guard = ReliableStorageLockGuard(lock_, [that = this](){
auto aquired = lock_.retry_until_take_lock();
auto guard = ReliableStorageLockGuard(lock_, aquired, [that = this](){
that->lock_lost_ = true;
});
auto value_before_sleep = cnt_;
Expand Down

0 comments on commit 9853ee8

Please sign in to comment.