Skip to content

Commit

Permalink
Real S3 storage python tests for ReliableStorageLock
Browse files Browse the repository at this point in the history
Adds a real s3 storage test (currently to be run with persistent storage
tests mark) for the lock.
  • Loading branch information
IvoDD committed Nov 25, 2024
1 parent d4a8942 commit f06bd01
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 3 deletions.
2 changes: 1 addition & 1 deletion cpp/arcticdb/python/python_module.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ PYBIND11_MODULE(arcticdb_ext, m) {
arcticdb::storage::apy::register_bindings(storage_submodule, base_exception);

arcticdb::stream::register_bindings(m);
arcticdb::toolbox::apy::register_bindings(m);
arcticdb::toolbox::apy::register_bindings(m, base_exception);

m.def("get_version_string", &arcticdb::get_arcticdb_version_string);

Expand Down
21 changes: 20 additions & 1 deletion cpp/arcticdb/toolbox/python_bindings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@
#include <arcticdb/version/symbol_list.hpp>
#include <arcticdb/util/pybind_mutex.hpp>
#include <arcticdb/util/storage_lock.hpp>
#include <arcticdb/util/reliable_storage_lock.hpp>

namespace arcticdb::toolbox::apy {

void register_bindings(py::module &m) {
void register_bindings(py::module &m, py::exception<arcticdb::ArcticException>& base_exception) {
auto tools = m.def_submodule("tools", "Library management tool hooks");
using namespace arcticdb::toolbox::apy;
using namespace arcticdb::storage;
Expand Down Expand Up @@ -67,6 +68,24 @@ void register_bindings(py::module &m) {
.def("inspect_env_variable", &LibraryTool::inspect_env_variable)
.def_static("read_unaltered_lib_cfg", &LibraryTool::read_unaltered_lib_cfg);

// Reliable storage lock exposed for integration testing. It is intended for use in C++
using namespace arcticdb::lock;

py::register_exception<LostReliableLock>(tools, "LostReliableLock", base_exception.ptr());

py::class_<ReliableStorageLock<>>(tools, "ReliableStorageLock")
.def(py::init<>([](std::string base_name, std::shared_ptr<Library> lib, timestamp timeout){
auto store = version_store::LocalVersionedEngine(lib)._test_get_store();
return ReliableStorageLock<>(base_name, store, timeout);
}));

py::class_<ReliableStorageLockManager>(tools, "ReliableStorageLockManager")
.def(py::init<>([](){
return ReliableStorageLockManager();
}))
.def("take_lock_guard", &ReliableStorageLockManager::take_lock_guard)
.def("free_lock_guard", &ReliableStorageLockManager::free_lock_guard);

// S3 Storage tool
using namespace arcticdb::storage::s3;
py::class_<S3StorageTool, std::shared_ptr<S3StorageTool>>(tools, "S3Tool")
Expand Down
2 changes: 1 addition & 1 deletion cpp/arcticdb/toolbox/python_bindings.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ namespace arcticdb::toolbox::apy {

namespace py = pybind11;

void register_bindings(py::module &m);
void register_bindings(py::module &m, py::exception<arcticdb::ArcticException>& base_exception);

} // namespace arcticdb::toolbox::apy

Expand Down
11 changes: 11 additions & 0 deletions cpp/arcticdb/util/reliable_storage_lock.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,17 @@ class ReliableStorageLockGuard {
folly::FunctionScheduler extend_lock_heartbeat_;
};


// Only used for python tests
struct LostReliableLock : std::exception {};
class ReliableStorageLockManager {
public:
void take_lock_guard(const ReliableStorageLock<>& lock);
void free_lock_guard();
private:
std::optional<std::shared_ptr<ReliableStorageLockGuard>> guard = std::nullopt;
};

}

}
Expand Down
10 changes: 10 additions & 0 deletions cpp/arcticdb/util/reliable_storage_lock.tpp
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,16 @@ ReliableStorageLockGuard::~ReliableStorageLockGuard() {
free_lock_and_cleanup();
}

void ReliableStorageLockManager::take_lock_guard(const ReliableStorageLock<> &lock) {
guard = std::make_shared<ReliableStorageLockGuard>(lock, [](){
throw LostReliableLock();
});
}

void ReliableStorageLockManager::free_lock_guard() {
guard = std::nullopt;
}

}

}
49 changes: 49 additions & 0 deletions python/tests/integration/arcticdb/test_storage_lock.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import pandas as pd
import numpy as np
import pytest

from arcticdb_ext.tools import ReliableStorageLock, ReliableStorageLockManager
from tests.util.mark import PERSISTENT_STORAGE_TESTS_ENABLED, REAL_S3_TESTS_MARK

from multiprocessing import Process, set_start_method
set_start_method("fork") # Okay to fork an S3 lib
import time

from arcticdb.util.test import assert_frame_equal


one_sec = 1_000_000_000


def slow_increment_task(lib, symbol, sleep_time, lock_manager, lock):
lock_manager.take_lock_guard(lock)
df = lib.read(symbol).data
df["col"][0] = df["col"][0] + 1
time.sleep(sleep_time)
lib.write(symbol, df)
lock_manager.free_lock_guard()


@pytest.mark.parametrize("num_processes,max_sleep", [(100, 1), (5, 20)])
@REAL_S3_TESTS_MARK
def test_many_increments(real_s3_version_store, num_processes, max_sleep):
lib = real_s3_version_store
init_df = pd.DataFrame({"col": [0]})
symbol = "counter"
lib.write(symbol, init_df)
lock = ReliableStorageLock("test_lock", lib._library, 10*one_sec)
lock_manager = ReliableStorageLockManager()

processes = [
Process(target=slow_increment_task, args=(lib, symbol, 0 if i%2==0 else max_sleep, lock_manager, lock))
for i in range(num_processes)
]
for p in processes:
p.start()

for p in processes:
p.join()

read_df = lib.read(symbol).data
expected_df = pd.DataFrame({"col": [num_processes]})
assert_frame_equal(read_df, expected_df)

0 comments on commit f06bd01

Please sign in to comment.