Skip to content

Commit

Permalink
Reliable storage lock
Browse files Browse the repository at this point in the history
Introduces a new `ReliableStorageLock` and `ReliableStorageLockGuard` to
be used as a slower but more reliable alternative to the existing
`StorageLock`.

It uses the new If-None-Match atomic put operations in S3.

This commit:
- Upgrades the aws-sdk-cpp in vcpkg (which needed a few additions
  because of some problematic dependencies)
- Adds `write_if_none_match` capability to `AsyncStore`'s S3 and to
  `InMemoryStore`
- Logic for `ReliableStorageLock`
- C++ tests using the `InMemoryStore`

Follow up commit will introduce a python integration test with real aws
s3.
  • Loading branch information
IvoDD committed Nov 25, 2024
1 parent c20c189 commit d4a8942
Show file tree
Hide file tree
Showing 34 changed files with 598 additions and 18 deletions.
7 changes: 5 additions & 2 deletions cpp/arcticdb/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,7 @@ set(arcticdb_srcs
util/slab_allocator.hpp
util/sparse_utils.hpp
util/storage_lock.hpp
util/reliable_storage_lock.hpp
util/string_utils.hpp
util/thread_cached_int.hpp
util/timeouts.hpp
Expand Down Expand Up @@ -511,7 +512,8 @@ set(arcticdb_srcs
version/symbol_list.cpp
version/version_map_batch_methods.cpp
storage/s3/ec2_utils.cpp
util/buffer_holder.cpp)
util/buffer_holder.cpp
util/reliable_storage_lock.tpp)

add_library(arcticdb_core_object OBJECT ${arcticdb_srcs})

Expand Down Expand Up @@ -957,7 +959,8 @@ if(${TEST})
version/test/version_map_model.hpp
python/python_handlers.cpp
storage/test/common.hpp
version/test/test_sort_index.cpp)
version/test/test_sort_index.cpp
util/test/test_reliable_storage_lock.cpp)

set(EXECUTABLE_PERMS OWNER_WRITE OWNER_READ OWNER_EXECUTE GROUP_READ GROUP_EXECUTE WORLD_READ WORLD_EXECUTE) # 755

Expand Down
9 changes: 9 additions & 0 deletions cpp/arcticdb/async/async_store.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,15 @@ entity::VariantKey write_sync(
return WriteSegmentTask{library_}(std::move(encoded));
}

entity::VariantKey write_if_none_sync(
KeyType key_type,
const StreamId &stream_id,
SegmentInMemory &&segment) override {
util::check(is_ref_key_class(key_type), "Expected ref key type got {}", key_type);
auto encoded = EncodeRefTask{key_type, stream_id, std::move(segment), codec_, encoding_version_}();
return WriteIfNoneTask{library_}(std::move(encoded));
}

bool is_path_valid(const std::string_view path) const override {
return library_->is_path_valid(path);
}
Expand Down
17 changes: 17 additions & 0 deletions cpp/arcticdb/async/tasks.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,23 @@ struct WriteSegmentTask : BaseTask {
}
};

struct WriteIfNoneTask : BaseTask {
std::shared_ptr<storage::Library> lib_;

explicit WriteIfNoneTask(std::shared_ptr<storage::Library> lib) :
lib_(std::move(lib)) {
}

ARCTICDB_MOVE_ONLY_DEFAULT(WriteIfNoneTask)

VariantKey operator()(storage::KeySegmentPair &&key_seg) const {
ARCTICDB_SAMPLE(WriteSegmentTask, 0)
auto k = key_seg.variant_key();
lib_->write_if_none(std::move(key_seg));
return k;
}
};

struct UpdateSegmentTask : BaseTask {
std::shared_ptr<storage::Library> lib_;
storage::UpdateOpts opts_;
Expand Down
1 change: 1 addition & 0 deletions cpp/arcticdb/entity/key.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ KeyData get_key_data(KeyType key_type) {
STRING_REF(KeyType::APPEND_REF, aref, 'a')
STRING_KEY(KeyType::MULTI_KEY, mref, 'm')
STRING_REF(KeyType::LOCK, lref, 'x')
STRING_REF(KeyType::SLOW_LOCK, lref, 'x')
STRING_REF(KeyType::SNAPSHOT_TOMBSTONE, ttomb, 'X')
STRING_KEY(KeyType::APPEND_DATA, app, 'b')
// Unused
Expand Down
4 changes: 4 additions & 0 deletions cpp/arcticdb/entity/key.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,10 @@ enum class KeyType : int {
* Used for storing the ids of storages that failed to sync
*/
REPLICATION_FAIL_INFO = 26,
/*
* Used for a list based reliable storage lock
*/
SLOW_LOCK = 27,
UNDEFINED
};

Expand Down
4 changes: 4 additions & 0 deletions cpp/arcticdb/storage/azure/azure_storage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ class AzureStorage final : public Storage {
protected:
void do_write(Composite<KeySegmentPair>&& kvs) final;

void do_write_if_none(KeySegmentPair&& kv [[maybe_unused]]) final {
storage::raise<ErrorCode::E_UNSUPPORTED_ATOMIC_OPERATION>("Atomic operations are only supported for s3 backend");
};

void do_update(Composite<KeySegmentPair>&& kvs, UpdateOpts opts) final;

void do_read(Composite<VariantKey>&& ks, const ReadVisitor& visitor, ReadKeyOpts opts) final;
Expand Down
4 changes: 4 additions & 0 deletions cpp/arcticdb/storage/file/mapped_file_storage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ class MappedFileStorage final : public SingleFileStorage {

void do_write(Composite<KeySegmentPair>&& kvs) override;

void do_write_if_none(KeySegmentPair&& kv [[maybe_unused]]) final {
storage::raise<ErrorCode::E_UNSUPPORTED_ATOMIC_OPERATION>("Atomic operations are only supported for s3 backend");
};

void do_update(Composite<KeySegmentPair>&& kvs, UpdateOpts opts) override;

void do_read(Composite<VariantKey>&& ks, const ReadVisitor& visitor, storage::ReadKeyOpts opts) override;
Expand Down
8 changes: 8 additions & 0 deletions cpp/arcticdb/storage/library.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,14 @@ class Library {
storages_->write(std::move(kvs));
}

void write_if_none(KeySegmentPair&& kv) {
if (open_mode() < OpenMode::WRITE) {
throw LibraryPermissionException(library_path_, open_mode(), "write");
}

storages_->write_if_none(std::move(kv));
}

void update(Composite<KeySegmentPair>&& kvs, storage::UpdateOpts opts) {
ARCTICDB_SAMPLE(LibraryUpdate, 0)
if (open_mode() < OpenMode::WRITE)
Expand Down
4 changes: 4 additions & 0 deletions cpp/arcticdb/storage/lmdb/lmdb_storage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ class LmdbStorage final : public Storage {
private:
void do_write(Composite<KeySegmentPair>&& kvs) final;

void do_write_if_none(KeySegmentPair&& kv [[maybe_unused]]) final {
storage::raise<ErrorCode::E_UNSUPPORTED_ATOMIC_OPERATION>("Atomic operations are only supported for s3 backend");
};

void do_update(Composite<KeySegmentPair>&& kvs, UpdateOpts opts) final;

void do_read(Composite<VariantKey>&& ks, const ReadVisitor& visitor, storage::ReadKeyOpts opts) final;
Expand Down
4 changes: 4 additions & 0 deletions cpp/arcticdb/storage/memory/memory_storage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ namespace arcticdb::storage::memory {
private:
void do_write(Composite<KeySegmentPair>&& kvs) final;

void do_write_if_none(KeySegmentPair&& kv [[maybe_unused]]) final {
storage::raise<ErrorCode::E_UNSUPPORTED_ATOMIC_OPERATION>("Atomic operations are only supported for s3 backend");
};

void do_update(Composite<KeySegmentPair>&& kvs, UpdateOpts opts) final;

void do_read(Composite<VariantKey>&& ks, const ReadVisitor& visitor, ReadKeyOpts opts) final;
Expand Down
4 changes: 4 additions & 0 deletions cpp/arcticdb/storage/mongo/mongo_storage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ class MongoStorage final : public Storage {
private:
void do_write(Composite<KeySegmentPair>&& kvs) final;

void do_write_if_none(KeySegmentPair&& kv [[maybe_unused]]) final {
storage::raise<ErrorCode::E_UNSUPPORTED_ATOMIC_OPERATION>("Atomic operations are only supported for s3 backend");
};

void do_update(Composite<KeySegmentPair>&& kvs, UpdateOpts opts) final;

void do_read(Composite<VariantKey>&& ks, const ReadVisitor& visitor, ReadKeyOpts opts) final;
Expand Down
21 changes: 21 additions & 0 deletions cpp/arcticdb/storage/s3/detail-inl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,27 @@ namespace s3 {
});
}

template<class KeyBucketizer>
void do_write_if_none_impl(
KeySegmentPair &&kv,
const std::string &root_folder,
const std::string &bucket_name,
S3ClientWrapper &s3_client,
KeyBucketizer &&bucketizer) {
ARCTICDB_SAMPLE(S3StorageWriteIfNone, 0)
auto key_type_dir = key_type_folder(root_folder, kv.key_type());
auto &k = kv.variant_key();
auto s3_object_name = object_path(bucketizer.bucketize(key_type_dir, k), k);
auto &seg = kv.segment();

auto put_object_result = s3_client.put_object(s3_object_name, std::move(seg), bucket_name, true);

if (!put_object_result.is_success()) {
auto& error = put_object_result.get_error();
raise_s3_exception(error, s3_object_name);
}
}

template<class KeyBucketizer>
void do_update_impl(
Composite<KeySegmentPair> &&kvs,
Expand Down
4 changes: 4 additions & 0 deletions cpp/arcticdb/storage/s3/nfs_backed_storage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ class NfsBackedStorage final : public Storage {
private:
void do_write(Composite<KeySegmentPair>&& kvs) final;

void do_write_if_none(KeySegmentPair&& kv [[maybe_unused]]) final {
storage::raise<ErrorCode::E_UNSUPPORTED_ATOMIC_OPERATION>("Atomic operations are only supported for s3 backend");
};

void do_update(Composite<KeySegmentPair>&& kvs, UpdateOpts opts) final;

void do_read(Composite<VariantKey>&& ks, const ReadVisitor& visitor, ReadKeyOpts opts) final;
Expand Down
3 changes: 2 additions & 1 deletion cpp/arcticdb/storage/s3/s3_client_wrapper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ class S3ClientWrapper {
virtual S3Result<std::monostate> put_object(
const std::string& s3_object_name,
Segment&& segment,
const std::string& bucket_name) = 0;
const std::string& bucket_name,
bool if_none_match = false) = 0;

virtual S3Result<DeleteOutput> delete_objects(
const std::vector<std::string>& s3_object_names,
Expand Down
3 changes: 2 additions & 1 deletion cpp/arcticdb/storage/s3/s3_mock_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ S3Result<Segment> MockS3Client::get_object(
S3Result<std::monostate> MockS3Client::put_object(
const std::string &s3_object_name,
Segment &&segment,
const std::string &bucket_name) {
const std::string &bucket_name,
bool if_none_match[[maybe_unused]]) {
auto maybe_error = has_failure_trigger(s3_object_name, StorageOperation::WRITE);
if (maybe_error.has_value()) {
return {*maybe_error};
Expand Down
3 changes: 2 additions & 1 deletion cpp/arcticdb/storage/s3/s3_mock_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ class MockS3Client : public S3ClientWrapper {
S3Result<std::monostate> put_object(
const std::string& s3_object_name,
Segment&& segment,
const std::string& bucket_name) override;
const std::string& bucket_name,
bool if_none_match = false) override;

S3Result<DeleteOutput> delete_objects(
const std::vector<std::string>& s3_object_names,
Expand Down
6 changes: 5 additions & 1 deletion cpp/arcticdb/storage/s3/s3_real_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,12 +133,16 @@ S3Result<Segment> RealS3Client::get_object(
S3Result<std::monostate> RealS3Client::put_object(
const std::string &s3_object_name,
Segment &&segment,
const std::string &bucket_name) {
const std::string &bucket_name,
bool if_none_match) {

ARCTICDB_SUBSAMPLE(S3StorageWritePreamble, 0)
Aws::S3::Model::PutObjectRequest request;
request.SetBucket(bucket_name.c_str());
request.SetKey(s3_object_name.c_str());
if (if_none_match) {
request.SetIfNoneMatch("*");
}
ARCTICDB_RUNTIME_DEBUG(log::storage(), "Set s3 key {}", request.GetKey().c_str());

auto [dst, write_size, buffer] = segment.serialize_header();
Expand Down
3 changes: 2 additions & 1 deletion cpp/arcticdb/storage/s3/s3_real_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ class RealS3Client : public S3ClientWrapper {
S3Result<std::monostate> put_object(
const std::string& s3_object_name,
Segment&& segment,
const std::string& bucket_name) override;
const std::string& bucket_name,
bool if_none_match = false) override;

S3Result<DeleteOutput> delete_objects(
const std::vector<std::string>& s3_object_names,
Expand Down
4 changes: 4 additions & 0 deletions cpp/arcticdb/storage/s3/s3_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ void S3Storage::do_write(Composite<KeySegmentPair>&& kvs) {
detail::do_write_impl(std::move(kvs), root_folder_, bucket_name_, *s3_client_, FlatBucketizer{});
}

void S3Storage::do_write_if_none(KeySegmentPair&& kv) {
detail::do_write_if_none_impl(std::move(kv), root_folder_, bucket_name_, *s3_client_, FlatBucketizer{});
}

void S3Storage::do_update(Composite<KeySegmentPair>&& kvs, UpdateOpts) {
detail::do_update_impl(std::move(kvs), root_folder_, bucket_name_, *s3_client_, FlatBucketizer{});
}
Expand Down
2 changes: 2 additions & 0 deletions cpp/arcticdb/storage/s3/s3_storage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ class S3Storage final : public Storage {
private:
void do_write(Composite<KeySegmentPair>&& kvs) final;

void do_write_if_none(KeySegmentPair&& kv) final;

void do_update(Composite<KeySegmentPair>&& kvs, UpdateOpts opts) final;

void do_read(Composite<VariantKey>&& ks, const ReadVisitor& visitor, ReadKeyOpts opts) final;
Expand Down
6 changes: 6 additions & 0 deletions cpp/arcticdb/storage/storage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ class Storage {
return write(Composite<KeySegmentPair>{std::move(kv)});
}

void write_if_none(KeySegmentPair&& kv) {
return do_write_if_none(std::move(kv));
}

void update(Composite<KeySegmentPair> &&kvs, UpdateOpts opts) {
ARCTICDB_SAMPLE(StorageUpdate, 0)
return do_update(std::move(kvs), opts);
Expand Down Expand Up @@ -186,6 +190,8 @@ class Storage {
private:
virtual void do_write(Composite<KeySegmentPair>&& kvs) = 0;

virtual void do_write_if_none(KeySegmentPair&& kv) = 0;

virtual void do_update(Composite<KeySegmentPair>&& kvs, UpdateOpts opts) = 0;

virtual void do_read(Composite<VariantKey>&& ks, const ReadVisitor& visitor, ReadKeyOpts opts) = 0;
Expand Down
4 changes: 4 additions & 0 deletions cpp/arcticdb/storage/storages.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ class Storages {
primary().write(std::move(kvs));
}

void write_if_none(KeySegmentPair&& kv) {
primary().write_if_none(std::move(kv));
}

void update(Composite<KeySegmentPair>&& kvs, storage::UpdateOpts opts) {
ARCTICDB_SAMPLE(StoragesUpdate, 0)
primary().update(std::move(kvs), opts);
Expand Down
16 changes: 15 additions & 1 deletion cpp/arcticdb/storage/test/in_memory_store.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,15 @@ namespace arcticdb {
return write(key_type, stream_id, std::move(segment)).get();
}

entity::VariantKey write_if_none_sync(
KeyType key_type,
const StreamId& stream_id,
SegmentInMemory &&segment) override {
auto key = entity::RefKey{stream_id, key_type};
add_segment(key, std::move(segment), true);
return key;
}

bool is_path_valid(const std::string_view) const override {
return true;
}
Expand Down Expand Up @@ -465,10 +474,15 @@ namespace arcticdb {
seg_by_atom_key_[key] = std::make_unique<SegmentInMemory>(std::move(seg));
}

void add_segment(const RefKey &key, SegmentInMemory &&seg) {
void add_segment(const RefKey &key, SegmentInMemory &&seg, bool if_none_match = false) {
StorageFailureSimulator::instance()->go(FailureType::WRITE);
std::lock_guard lock{mutex_};
ARCTICDB_DEBUG(log::storage(), "Adding segment with key {}", key);
if (if_none_match) {
if (seg_by_ref_key_.find(key) != seg_by_ref_key_.end()) {
storage::raise<ErrorCode::E_UNEXPECTED_S3_ERROR>("Precondition failed. Object is already present.");
}
}
seg_by_ref_key_[key] = std::make_unique<SegmentInMemory>(std::move(seg));
}

Expand Down
1 change: 1 addition & 0 deletions cpp/arcticdb/stream/piloted_clock.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@

namespace arcticdb {
std::atomic<entity::timestamp> PilotedClock::time_;
std::atomic<entity::timestamp> PilotedClockNoAutoIncrement::time_;
}
9 changes: 9 additions & 0 deletions cpp/arcticdb/stream/piloted_clock.hpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#pragma once

#include <arcticdb/entity/types.hpp>
#include <arcticdb/util/preprocess.hpp>

Expand All @@ -14,4 +16,11 @@ struct PilotedClock {
}
};

struct PilotedClockNoAutoIncrement {
static std::atomic<entity::timestamp> time_;
static entity::timestamp nanos_since_epoch() {
return PilotedClockNoAutoIncrement::time_;
}
};

} //namespace arcticdb
5 changes: 5 additions & 0 deletions cpp/arcticdb/stream/stream_sink.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ struct StreamSink {
const StreamId &stream_id,
SegmentInMemory &&segment) = 0;

virtual entity::VariantKey write_if_none_sync(
KeyType key_type,
const StreamId &stream_id,
SegmentInMemory &&segment) = 0;

[[nodiscard]] virtual folly::Future<folly::Unit> write_compressed(storage::KeySegmentPair ks) = 0;

virtual void write_compressed_sync(storage::KeySegmentPair ks) = 0;
Expand Down
2 changes: 0 additions & 2 deletions cpp/arcticdb/stream/test/stream_test_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,4 @@

namespace arcticdb {

std::atomic<timestamp> PilotedClock::time_{0};

} //namespace arcticdb
1 change: 1 addition & 0 deletions cpp/arcticdb/util/error_code.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ inline std::unordered_map<ErrorCategory, const char*> get_error_category_names()
ERROR_CODE(5002, E_SYMBOL_NOT_FOUND) \
ERROR_CODE(5003, E_PERMISSION) \
ERROR_CODE(5004, E_RESOURCE_NOT_FOUND) \
ERROR_CODE(5005, E_UNSUPPORTED_ATOMIC_OPERATION) \
ERROR_CODE(5010, E_LMDB_MAP_FULL) \
ERROR_CODE(5011, E_UNEXPECTED_LMDB_ERROR) \
ERROR_CODE(5020, E_UNEXPECTED_S3_ERROR) \
Expand Down
Loading

0 comments on commit d4a8942

Please sign in to comment.