Lib tool upgrades 44x #1934

Draft · wants to merge 4 commits into base: 4.4.x
5 changes: 5 additions & 0 deletions cpp/arcticdb/async/async_store.hpp
@@ -18,6 +18,10 @@
#include <arcticdb/processing/clause.hpp>
#include <arcticdb/storage/key_segment_pair.hpp>

namespace arcticdb::toolbox::apy{
class LibraryTool;
}

namespace arcticdb::async {

std::pair<VariantKey, std::optional<Segment>> lookup_match_in_dedup_map(
@@ -360,6 +364,7 @@ std::vector<folly::Future<bool>> batch_key_exists(
}

private:
friend class arcticdb::toolbox::apy::LibraryTool;
std::shared_ptr<storage::Library> library_;
std::shared_ptr<arcticdb::proto::encoding::VariantCodec> codec_;
const EncodingVersion encoding_version_;
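The friend declaration added above is what grants LibraryTool access to AsyncStore's private codec_ and encoding_version_; the line later in this PR (library_tool.cpp) that relies on it is:

    auto segment = encode_dispatch(std::move(segment_in_memory), *(async_store().codec_), async_store().encoding_version_);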
6 changes: 6 additions & 0 deletions cpp/arcticdb/stream/append_map.hpp
@@ -72,6 +72,12 @@ void append_incomplete(
const std::shared_ptr<pipelines::InputTensorFrame>& frame,
bool validate_index);

SegmentInMemory incomplete_segment_from_frame(
const std::shared_ptr<pipelines::InputTensorFrame>& frame,
size_t existing_rows,
std::optional<entity::AtomKey>&& prev_key,
bool allow_sparse);

std::optional<int64_t> latest_incomplete_timestamp(
const std::shared_ptr<Store>& store,
const StreamId& stream_id
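The newly exposed incomplete_segment_from_frame is consumed by the library tool later in this diff; the call there reads:

    // prev_key carries forward the incomplete-segment chain; allow_sparse comes from the library's write options
    auto segment_in_memory = incomplete_segment_from_frame(frame, 0, std::move(next_key), engine_.cfg().write_options().allow_sparse());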
24 changes: 16 additions & 8 deletions cpp/arcticdb/stream/python_bindings.cpp
@@ -18,6 +18,7 @@
#include <arcticdb/stream/stream_reader.hpp>
#include <arcticdb/stream/stream_writer.hpp>
#include <arcticdb/entity/protobufs.hpp>
#include <arcticdb/entity/protobuf_mappings.hpp>

namespace py = pybind11;

@@ -48,9 +49,8 @@ void register_types(py::module &m) {
DATA_TYPE(NANOSECONDS_UTC64)
DATA_TYPE(ASCII_FIXED64)
DATA_TYPE(ASCII_DYNAMIC64)
-//DATA_TYPE(UTF8_STRING)
-// DATA_TYPE(BYTES)
-// DATA_TYPE(PICKLE)
+DATA_TYPE(UTF_FIXED64)
+DATA_TYPE(UTF_DYNAMIC64)
#undef DATA_TYPE
;

@@ -97,11 +97,19 @@ void register_types(py::module &m) {
})
);

-python_util::add_repr(py::class_<TimeseriesDescriptor>(m, "TimeseriesDescriptor")
-    .def("fields", [](const TimeseriesDescriptor& desc){
-        return field_collection_to_ref_vector(desc.fields());
-    })
-);
+py::class_<TimeseriesDescriptor>(m, "TimeseriesDescriptor")
+    .def_property_readonly("fields", [](const TimeseriesDescriptor& desc){
+        return field_collection_to_ref_vector(desc.fields());
+    }).def_property_readonly("normalization", [](const TimeseriesDescriptor& self) {
+        return python_util::pb_to_python(self.proto().normalization());
+    }).def_property_readonly("total_rows", [](const TimeseriesDescriptor& self) {
+        return self.proto().total_rows();
+    }).def_property_readonly("next_key", [](const TimeseriesDescriptor& self) -> std::optional<AtomKey> {
+        if (self.proto().has_next_key()){
+            return decode_key(self.proto().next_key());
+        }
+        return std::nullopt;
+    });

py::class_<PyTimestampRange>(m, "TimestampRange")
.def(py::init<const py::object &, const py::object &>())
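With this change TimeseriesDescriptor is exposed with read-only properties rather than only a repr helper. A minimal usage sketch from Python, assuming lib_tool is a LibraryTool and index_key is an index key read from it:

    tsd = lib_tool.read_timeseries_descriptor(index_key)
    tsd.fields         # FieldRefs for all columns
    tsd.total_rows     # row count recorded in the descriptor
    tsd.normalization  # normalization metadata as a Python protobuf message
    tsd.next_key       # chained AtomKey, or None when the proto has no next_key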
77 changes: 62 additions & 15 deletions cpp/arcticdb/toolbox/library_tool.cpp
@@ -11,67 +11,114 @@
#include <arcticdb/codec/default_codecs.hpp>
#include <arcticdb/entity/atom_key.hpp>
#include <arcticdb/entity/protobufs.hpp>
#include <arcticdb/entity/protobuf_mappings.hpp>
#include <arcticdb/entity/types.hpp>
#include <arcticdb/pipeline/pipeline_utils.hpp>
#include <arcticdb/storage/library.hpp>
#include <arcticdb/util/clock.hpp>
#include <arcticdb/util/key_utils.hpp>
#include <arcticdb/util/variant.hpp>
#include <arcticdb/version/version_utils.hpp>
#include <arcticdb/stream/append_map.hpp>
#include <cstdlib>

namespace arcticdb::toolbox::apy {

using namespace arcticdb::entity;

-LibraryTool::LibraryTool(std::shared_ptr<storage::Library> lib) {
-    store_ = std::make_shared<async::AsyncStore<util::SysClock>>(lib, codec::default_lz4_codec(), encoding_version(lib->config()));
+LibraryTool::LibraryTool(std::shared_ptr<storage::Library> lib): engine_(lib, util::SysClock()) {}

std::shared_ptr<Store> LibraryTool::store() {
return engine_._test_get_store();
}

async::AsyncStore<>& LibraryTool::async_store() {
return dynamic_cast<async::AsyncStore<>&>(*store());
}

ReadResult LibraryTool::read(const VariantKey& key) {
auto segment = read_to_segment(key);
auto segment_in_memory = decode_segment(std::move(segment));
auto frame_and_descriptor = frame_and_descriptor_from_segment(std::move(segment_in_memory));
-    return pipelines::make_read_result_from_frame(frame_and_descriptor, to_atom(key));
+    auto atom_key = util::variant_match(
+        key,
+        [](const AtomKey& key){return key;},
+        // We construct a dummy atom key in case of a RefKey to be able to build the read_result
+        [](const RefKey& key){return AtomKeyBuilder().build<KeyType::VERSION_REF>(key.id());},
+        [](const auto&){});
+    return pipelines::make_read_result_from_frame(frame_and_descriptor, atom_key);
}

Segment LibraryTool::read_to_segment(const VariantKey& key) {
-    auto kv = store_->read_compressed_sync(key, storage::ReadKeyOpts{});
+    auto kv = store()->read_compressed_sync(key, storage::ReadKeyOpts{});
util::check(kv.has_segment(), "Failed to read key: {}", key);
kv.segment().force_own_buffer();
return kv.segment();
}

std::optional<google::protobuf::Any> LibraryTool::read_metadata(const VariantKey& key){
-    return store_->read_metadata(key, storage::ReadKeyOpts{}).get().second;
+    return store()->read_metadata(key, storage::ReadKeyOpts{}).get().second;
}

StreamDescriptor LibraryTool::read_descriptor(const VariantKey& key){
-    auto metadata_and_descriptor = store_->read_metadata_and_descriptor(key, storage::ReadKeyOpts{}).get();
+    auto metadata_and_descriptor = store()->read_metadata_and_descriptor(key, storage::ReadKeyOpts{}).get();
return std::get<StreamDescriptor>(metadata_and_descriptor);
}

TimeseriesDescriptor LibraryTool::read_timeseries_descriptor(const VariantKey& key){
-    return store_->read_timeseries_descriptor(key).get().second;
+    return store()->read_timeseries_descriptor(key).get().second;
}

void LibraryTool::write(VariantKey key, Segment segment) {
storage::KeySegmentPair kv{std::move(key), std::move(segment)};
-    store_->write_compressed_sync(std::move(kv));
+    store()->write_compressed_sync(std::move(kv));
}

void LibraryTool::overwrite_segment_in_memory(VariantKey key, SegmentInMemory& segment_in_memory) {
auto segment = encode_dispatch(std::move(segment_in_memory), *(async_store().codec_), async_store().encoding_version_);
remove(key);
write(key, segment);
}

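// Replaces the contents of an APPEND_DATA key with a freshly normalized frame,
// preserving the next_key chain from the old segment; returns the old segment
// so the caller can restore it via overwrite_segment_in_memory if needed.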
SegmentInMemory LibraryTool::overwrite_append_data(
VariantKey key,
const py::tuple &item,
const py::object &norm,
const py::object & user_meta) {
user_input::check<ErrorCode::E_INVALID_USER_ARGUMENT>(
std::holds_alternative<AtomKey>(key) && std::get<AtomKey>(key).type() == KeyType::APPEND_DATA,
"Can only override APPEND_DATA keys. Received: {}", key);
auto old_segment = read_to_segment(key);
auto old_segment_in_memory = decode_segment(std::move(old_segment));
const auto& tsd = old_segment_in_memory.index_descriptor();
std::optional<AtomKey> next_key = std::nullopt;
if (tsd.proto().has_next_key()){
next_key = decode_key(tsd.proto().next_key());
}

auto stream_id = util::variant_match(key, [](const auto& key){return key.id();});
auto frame = convert::py_ndf_to_frame(stream_id, item, norm, user_meta, engine_.cfg().write_options().empty_types());
auto segment_in_memory = incomplete_segment_from_frame(frame, 0, std::move(next_key), engine_.cfg().write_options().allow_sparse());
overwrite_segment_in_memory(key, segment_in_memory);
return old_segment_in_memory;
}

bool LibraryTool::key_exists(const VariantKey& key) {
return store()->key_exists_sync(key);
}

void LibraryTool::remove(VariantKey key) {
-    store_->remove_key_sync(std::move(key), storage::RemoveOpts{});
+    store()->remove_key_sync(std::move(key), storage::RemoveOpts{});
}

void LibraryTool::clear_ref_keys() {
-    delete_all_keys_of_type(KeyType::SNAPSHOT_REF, store_, false);
+    delete_all_keys_of_type(KeyType::SNAPSHOT_REF, store(), false);
}

std::vector<VariantKey> LibraryTool::find_keys(entity::KeyType kt) {
std::vector<VariantKey> res;

-    store_->iterate_type(kt, [&](VariantKey &&found_key) {
+    store()->iterate_type(kt, [&](VariantKey &&found_key) {
res.emplace_back(found_key);
}, "");
return res;
@@ -84,12 +131,12 @@ int LibraryTool::count_keys(entity::KeyType kt) {
count++;
};

store_->iterate_type(kt, visitor, "");
store()->iterate_type(kt, visitor, "");
return count;
}

std::vector<bool> LibraryTool::batch_key_exists(const std::vector<VariantKey>& keys) {
-    auto key_exists_fut = store_->batch_key_exists(keys);
+    auto key_exists_fut = store()->batch_key_exists(keys);
return folly::collect(key_exists_fut).get();
}

@@ -106,12 +153,12 @@ std::vector<VariantKey> LibraryTool::find_keys_for_id(entity::KeyType kt, const
}
};

-    store_->iterate_type(kt, visitor, string_id);
+    store()->iterate_type(kt, visitor, string_id);
return res;
}

std::string LibraryTool::get_key_path(const VariantKey& key) {
-    return store_->key_path(key);
+    return async_store().key_path(key);
}

std::optional<std::string> LibraryTool::inspect_env_variable(std::string name){
14 changes: 10 additions & 4 deletions cpp/arcticdb/toolbox/library_tool.hpp
@@ -15,6 +15,7 @@
#include <arcticdb/storage/storage.hpp>
#include <arcticdb/async/async_store.hpp>
#include <arcticdb/entity/read_result.hpp>
#include <arcticdb/version/local_versioned_engine.hpp>

#include <memory>

@@ -45,10 +46,16 @@ class LibraryTool {

void write(VariantKey key, Segment segment);

void overwrite_segment_in_memory(VariantKey key, SegmentInMemory& segment_in_memory);

SegmentInMemory overwrite_append_data(VariantKey key, const py::tuple &item, const py::object &norm, const py::object & user_meta);

void remove(VariantKey key);

std::vector<VariantKey> find_keys(arcticdb::entity::KeyType);

bool key_exists(const VariantKey& key);

std::vector<bool> batch_key_exists(const std::vector<VariantKey>& keys);

std::string get_key_path(const VariantKey& key);
@@ -62,10 +69,9 @@
std::optional<std::string> inspect_env_variable(std::string name);

private:
-    // TODO: Remove the shared_ptr and just keep the store.
-    // The only reason we use a shared_ptr for the store is to be able to pass it to delete_all_keys_of_type.
-    // We can remove the shared_ptr when delete_all_keys_of_type takes a const ref instead of a shared pointer.
-    std::shared_ptr<arcticdb::async::AsyncStore<util::SysClock>> store_;
+    std::shared_ptr<Store> store();
+    async::AsyncStore<>& async_store();
+    version_store::LocalVersionedEngine engine_;
};

} //namespace arcticdb::toolbox::apy
3 changes: 3 additions & 0 deletions cpp/arcticdb/toolbox/python_bindings.cpp
@@ -36,6 +36,7 @@ void register_bindings(py::module &m) {
}))
.def("read_to_segment", &LibraryTool::read_to_segment)
.def("read_metadata", &LibraryTool::read_metadata)
.def("key_exists", &LibraryTool::key_exists)
.def("read_descriptor", &LibraryTool::read_descriptor, R"pbdoc(
Gives the <StreamDescriptor> for a Variant key. The Stream Descriptor contains the <FieldRef>s for all fields in
the value written for that key.
@@ -49,6 +50,8 @@
E.g. an Index key for a symbol which has columns "index" and "col" will have <FieldRef>s for those columns.
)pbdoc")
.def("write", &LibraryTool::write)
.def("overwrite_segment_in_memory", &LibraryTool::overwrite_segment_in_memory)
.def("overwrite_append_data", &LibraryTool::overwrite_append_data)
.def("remove", &LibraryTool::remove)
.def("find_keys", &LibraryTool::find_keys)
.def("count_keys", &LibraryTool::count_keys)
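A quick hedged sketch of the new key_exists binding from Python (lib_tool is assumed to come from NativeVersionStore.library_tool()):

    keys = lib_tool.find_keys(KeyType.VERSION)
    assert lib_tool.key_exists(keys[0])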
3 changes: 2 additions & 1 deletion cpp/arcticdb/version/local_versioned_engine.hpp
@@ -406,6 +406,8 @@ class LocalVersionedEngine : public VersionedEngine {
return store()->current_timestamp();
}

+    const arcticdb::proto::storage::VersionStoreConfig& cfg() const override { return cfg_; }

protected:
VersionedItem compact_incomplete_dynamic(
const StreamId& stream_id,
@@ -429,7 +431,6 @@
);

std::shared_ptr<Store>& store() override { return store_; }
-    const arcticdb::proto::storage::VersionStoreConfig& cfg() const override { return cfg_; }
std::shared_ptr<VersionMap>& version_map() override { return version_map_; }
SymbolList& symbol_list() override { return *symbol_list_; }
std::shared_ptr<SymbolList> symbol_list_ptr() { return symbol_list_; }
58 changes: 55 additions & 3 deletions python/arcticdb/toolbox/library_tool.py
@@ -7,14 +7,16 @@
import pandas as pd

from arcticdb.version_store._normalization import FrameData
from arcticdb.supported_types import ExplicitlySupportedDates
from arcticdb_ext.codec import decode_segment
from arcticdb_ext.storage import KeyType
from arcticdb_ext.stream import SegmentInMemory
from arcticdb_ext.tools import LibraryTool as LibraryToolImpl
from arcticdb_ext.version_store import AtomKey, PythonOutputFrame, RefKey
-from arcticdb.version_store._normalization import denormalize_dataframe
+from arcticdb.version_store._normalization import denormalize_dataframe, normalize_dataframe

VariantKey = Union[AtomKey, RefKey]
VersionQueryInput = Union[int, str, ExplicitlySupportedDates, None]

_KEY_PROPERTIES = {
key_type: {k: v for k, v in vars(key_type).items() if isinstance(v, property)} for key_type in (AtomKey, RefKey)
@@ -31,6 +33,10 @@ def props_dict_to_atom_key(d: Dict[str, Any]) -> AtomKey:


class LibraryTool(LibraryToolImpl):
def __init__(self, library, nvs):
super().__init__(library)
self._nvs = nvs

@staticmethod
def key_types() -> List[KeyType]:
return list(KeyType.__members__.values())
@@ -49,7 +55,7 @@ def dataframe_to_keys(
int(row.version_id),
int(row.creation_ts),
int(row.content_hash),
-int(index.timestamp()),
+index.value,
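# index.value is the raw epoch-nanosecond integer; the old int(index.timestamp())
# went through POSIX seconds as a float and truncated sub-second precision.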
row.end_index.value,
key_type,
)
@@ -135,4 +141,50 @@ def read_to_keys(
8243267225673136445
"""
df = self.read_to_dataframe(key)
return self.dataframe_to_keys(df, id if id is not None else key.id, filter_key_type)

def read_index(self, symbol: str, as_of: Optional[VersionQueryInput] = None, **kwargs) -> pd.DataFrame:
"""
Read the index key for the named symbol.

Parameters
----------
symbol : `str`
Symbol name.
as_of : `Optional[VersionQueryInput]`, default=None
See documentation of `read` method for more details.

Returns
-------
Pandas DataFrame representing the index key in a human-readable format.
"""
return self._nvs.read_index(symbol, as_of, **kwargs)

def normalize_dataframe_with_nvs_defaults(self, df : pd.DataFrame):
# TODO: Have a unified place where we resolve all the normalization parameters and use that here.
# Currently these parameters are resolved in various places throughout _store.py, which can result in
# different defaults for different operations, which is undesirable.
write_options = self._nvs._lib_cfg.lib_desc.version.write_options
dynamic_schema = self._nvs.resolve_defaults("dynamic_schema", write_options, False)
empty_types = self._nvs.resolve_defaults("empty_types", write_options, False)
dynamic_strings = self._nvs._resolve_dynamic_strings({})
return normalize_dataframe(df, dynamic_schema=dynamic_schema, empty_types=empty_types, dynamic_strings=dynamic_strings)

def overwrite_append_data_with_dataframe(self, key : VariantKey, df : pd.DataFrame) -> SegmentInMemory:
"""
Overwrites the APPEND_DATA key with the provided dataframe. Use with extreme caution: overwriting with
inappropriate data can render the symbol unreadable.

Returns
-------
SegmentInMemory backup of what was stored in the key before it was overwritten. Can be passed to
lib_tool.overwrite_segment_in_memory to roll back this change if the new data turns out to be corrupt.
"""
item, norm_meta = self.normalize_dataframe_with_nvs_defaults(df)
return self.overwrite_append_data(key, item, norm_meta, None)

def update_append_data_column_type(self, key : VariantKey, column : str, to_type : type) -> SegmentInMemory:
old_df = self.read_to_dataframe(key)
assert column in old_df.columns
new_df = old_df.astype({column: to_type})
return self.overwrite_append_data_with_dataframe(key, new_df)
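A hedged end-to-end sketch of the new overwrite helpers (the symbol and column names are made up; nvs is assumed to be a NativeVersionStore):

    from arcticdb_ext.storage import KeyType

    lib_tool = nvs.library_tool()
    keys = lib_tool.find_keys_for_id(KeyType.APPEND_DATA, "sym")
    # Rewrite one column's dtype in place, keeping a backup of the old segment
    backup = lib_tool.update_append_data_column_type(keys[0], "col", float)
    # If the rewritten data is wrong, restore the original segment
    lib_tool.overwrite_segment_in_memory(keys[0], backup)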