Bugfix 406 and 1245: Support sorting checks with parallel writes/appends, and maintain sortedness information in compact_incomplete
alexowens90 committed Jan 19, 2024
1 parent 53a6050 commit 94670ee
Showing 11 changed files with 209 additions and 44 deletions.
4 changes: 4 additions & 0 deletions cpp/arcticdb/pipeline/frame_utils.cpp
@@ -148,4 +148,8 @@ std::pair<size_t, size_t> offset_and_row_count(const std::shared_ptr<pipelines::
return std::make_pair(offset, row_count);
}

bool index_is_not_timeseries_or_is_sorted_ascending(const std::shared_ptr<pipelines::InputTensorFrame>& frame) {
return !std::holds_alternative<stream::TimeseriesIndex>(frame->index) || frame->desc.get_sorted() == SortedValue::ASCENDING;
}

}
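
The new helper reads as a double negative at its call sites (if (!index_is_not_timeseries_or_is_sorted_ascending(frame)) ...), so it may help to spell the predicate out: a frame passes if its index is not a timeseries index at all, or if its descriptor is already flagged ASCENDING. A minimal Python sketch of the equivalent truth table (names here are illustrative, not part of the codebase):

def ok_to_proceed(is_timeseries_index, sorted_flag):
    # Non-timeseries frames always pass; timeseries frames pass only when
    # their descriptor is already flagged as sorted ascending.
    return not is_timeseries_index or sorted_flag == "ASCENDING"

assert ok_to_proceed(False, "UNSORTED")     # not a timeseries index: pass
assert ok_to_proceed(True, "ASCENDING")     # sorted timeseries: pass
assert not ok_to_proceed(True, "UNSORTED")  # callers raise E_UNSORTED_DATA
assert not ok_to_proceed(True, "UNKNOWN")   # unknown sortedness also fails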
2 changes: 2 additions & 0 deletions cpp/arcticdb/pipeline/frame_utils.hpp
@@ -314,4 +314,6 @@ size_t get_slice_rowcounts(
std::pair<size_t, size_t> offset_and_row_count(
const std::shared_ptr<pipelines::PipelineContext>& context);

bool index_is_not_timeseries_or_is_sorted_ascending(const std::shared_ptr<pipelines::InputTensorFrame>& frame);

} //namespace arcticdb
5 changes: 4 additions & 1 deletion cpp/arcticdb/stream/append_map.cpp
@@ -8,7 +8,6 @@
#include <arcticdb/stream/append_map.hpp>
#include <arcticdb/entity/type_utils.hpp>
#include <arcticdb/entity/protobuf_mappings.hpp>
#include <arcticdb/stream/protobuf_mappings.hpp>
#include <arcticdb/stream/stream_source.hpp>
#include <arcticdb/stream/index.hpp>
#include <arcticdb/entity/protobufs.hpp>
@@ -215,6 +214,10 @@ folly::Future<arcticdb::entity::VariantKey> write_incomplete_frame(
std::optional<AtomKey>&& next_key) {
using namespace arcticdb::pipelines;

if (!index_is_not_timeseries_or_is_sorted_ascending(frame)) {
sorting::raise<ErrorCode::E_UNSORTED_DATA>("When writing/appending staged data in parallel, input data must be sorted.");
}

auto index_range = frame->index_range;
auto segment = incomplete_segment_from_frame(frame, 0, std::move(next_key), false);
return store->write(
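
For users, this guards the staged-write path: each chunk written or appended in parallel must itself be sorted, and unsorted timeseries input is now rejected up front rather than surfacing at finalization. A sketch of the resulting behaviour from Python, assuming a NativeVersionStore-style lib fixture; the parallel=True flag and SortingException come from the existing test suite, not from this diff:

import pandas as pd
from arcticdb.exceptions import SortingException

unsorted_df = pd.DataFrame(
    {"a": [1, 2, 3]},
    index=pd.to_datetime(["2024-01-03", "2024-01-01", "2024-01-02"]),
)
try:
    lib.write("sym", unsorted_df, parallel=True)  # staged/incomplete write
except SortingException:
    print("staged timeseries data must be sorted ascending")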
47 changes: 28 additions & 19 deletions cpp/arcticdb/version/version_core.cpp
@@ -102,8 +102,9 @@ folly::Future<entity::AtomKey> async_write_dataframe_impl(
frame->set_bucketize_dynamic(options.bucketize_dynamic);
auto slicing_arg = get_slicing_policy(options, *frame);
auto partial_key = IndexPartialKey{frame->desc.id(), version_id};
sorting::check<ErrorCode::E_UNSORTED_DATA>(!validate_index || frame->desc.get_sorted() == SortedValue::ASCENDING || !std::holds_alternative<stream::TimeseriesIndex>(frame->index),
"When calling write with validate_index enabled, input data must be sorted.");
if (validate_index && !index_is_not_timeseries_or_is_sorted_ascending(frame)) {
sorting::raise<ErrorCode::E_UNSORTED_DATA>("When calling write with validate_index enabled, input data must be sorted");
}
return write_frame(std::move(partial_key), frame, slicing_arg, store, de_dup_map, sparsify_floats);
}

@@ -120,17 +121,14 @@ IndexDescriptor::Proto check_index_match(const arcticdb::stream::Index& index, c
}
}

void sorted_data_check_append(InputTensorFrame& frame, index::IndexSegmentReader& index_segment_reader, bool validate_index){
bool is_time_series = std::holds_alternative<stream::TimeseriesIndex>(frame.index);
bool input_data_is_sorted = frame.desc.get_sorted() == SortedValue::ASCENDING;
sorting::check<ErrorCode::E_UNSORTED_DATA>(
input_data_is_sorted || !validate_index || !is_time_series,
"validate_index set but input data index is not sorted");

bool existing_data_is_sorted = index_segment_reader.mutable_tsd().mutable_proto().stream_descriptor().sorted() == arcticdb::proto::descriptors::SortedValue::ASCENDING;
void sorted_data_check_append(const std::shared_ptr<InputTensorFrame>& frame, index::IndexSegmentReader& index_segment_reader){
if (!index_is_not_timeseries_or_is_sorted_ascending(frame)) {
sorting::raise<ErrorCode::E_UNSORTED_DATA>("When calling append with validate_index enabled, input data must be sorted");
}
sorting::check<ErrorCode::E_UNSORTED_DATA>(
existing_data_is_sorted || !validate_index || !is_time_series,
"validate_index set but existing data index is not sorted");
!std::holds_alternative<stream::TimeseriesIndex>(frame->index) ||
index_segment_reader.mutable_tsd().mutable_proto().stream_descriptor().sorted() == arcticdb::proto::descriptors::SortedValue::ASCENDING,
"When calling append with validate_index enabled, the existing data must be sorted");
}

folly::Future<AtomKey> async_append_impl(
@@ -147,7 +145,9 @@ folly::Future<AtomKey> async_append_impl(
bool bucketize_dynamic = index_segment_reader.bucketize_dynamic();
auto row_offset = index_segment_reader.tsd().proto().total_rows();
util::check_rte(!index_segment_reader.is_pickled(), "Cannot append to pickled data");
sorted_data_check_append(*frame, index_segment_reader, validate_index);
if (validate_index) {
sorted_data_check_append(frame, index_segment_reader);
}
frame->set_offset(static_cast<ssize_t>(row_offset));
fix_descriptor_mismatch_or_throw(APPEND, options.dynamic_schema, index_segment_reader, *frame);
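
Note the restructuring here: the sortedness check on append is now gated on validate_index at the call site, and sorted_data_check_append validates both the incoming frame and the existing on-disk data. From Python this looks roughly as follows (again assuming a lib fixture; validate_index is the existing keyword on append):

import pandas as pd
from arcticdb.exceptions import SortingException

lib.write("sym", pd.DataFrame({"a": [1.0]}, index=pd.to_datetime(["2024-01-01"])))
unsorted = pd.DataFrame(
    {"a": [2.0, 3.0]},
    index=pd.to_datetime(["2024-01-05", "2024-01-03"]),
)
try:
    lib.append("sym", unsorted, validate_index=True)
except SortingException:
    pass  # input data must be sorted when validate_index is enabled
# Without validate_index, per the gate above, neither check runs.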

@@ -1163,8 +1163,11 @@ VersionedItem sort_merge_impl(
pipeline_context->version_id_ = update_info.next_version_id_;
ReadQuery read_query;

if(append && update_info.previous_index_key_.has_value())
std::optional<SortedValue> previous_sorted_value;
if(append && update_info.previous_index_key_.has_value()) {
read_indexed_keys_to_pipeline(store, pipeline_context, *(update_info.previous_index_key_), read_query, ReadOptions{});
previous_sorted_value.emplace(pipeline_context->desc_->get_sorted());
}

auto num_versioned_rows = pipeline_context->total_rows_;

@@ -1215,6 +1218,7 @@ VersionedItem sort_merge_impl(
sk->unset_segment();
}
aggregator.commit();
pipeline_context->desc_->set_sorted(deduce_sorted(previous_sorted_value.value_or(SortedValue::ASCENDING), SortedValue::ASCENDING));
},
[&](const auto &) {
util::raise_rte("Sort merge only supports datetime indexed data. Your data does not have a datetime index.");
@@ -1253,8 +1257,11 @@ VersionedItem compact_incomplete_impl(
read_options.set_dynamic_schema(true);

std::optional<SegmentInMemory> last_indexed;
if(append && update_info.previous_index_key_.has_value())
std::optional<SortedValue> previous_sorted_value;
if(append && update_info.previous_index_key_.has_value()) {
read_indexed_keys_to_pipeline(store, pipeline_context, *(update_info.previous_index_key_), read_query, read_options);
previous_sorted_value.emplace(pipeline_context->desc_->get_sorted());
}

auto prev_size = pipeline_context->slice_and_keys_.size();
read_incompletes_to_pipeline(store, pipeline_context, ReadQuery{}, ReadOptions{}, convert_int_to_float, via_iteration, sparsify);
@@ -1273,12 +1280,12 @@ VersionedItem compact_incomplete_impl(
std::vector<FrameSlice> slices;
bool dynamic_schema = write_options.dynamic_schema;
auto index = index_type_from_descriptor(first_seg.descriptor());
auto policies = std::make_tuple(index,
auto policies = std::make_tuple(index,
dynamic_schema ? VariantSchema{DynamicSchema::default_schema(index)} : VariantSchema{FixedSchema::default_schema(index)},
sparsify ? VariantColumnPolicy{SparseColumnPolicy{}} : VariantColumnPolicy{DenseColumnPolicy{}}
);
util::variant_match(std::move(policies), [
&fut_vec, &slices, pipeline_context=pipeline_context, &store, convert_int_to_float] (auto &&idx, auto &&schema, auto &&column_policy) {
&fut_vec, &slices, pipeline_context=pipeline_context, &store, convert_int_to_float, &previous_sorted_value] (auto &&idx, auto &&schema, auto &&column_policy) {
using IndexType = std::remove_reference_t<decltype(idx)>;
using SchemaType = std::remove_reference_t<decltype(schema)>;
using ColumnPolicyType = std::remove_reference_t<decltype(column_policy)>;
@@ -1291,6 +1298,9 @@
store,
convert_int_to_float,
std::nullopt);
if constexpr(std::is_same_v<IndexType, TimeseriesIndex>) {
pipeline_context->desc_->set_sorted(deduce_sorted(previous_sorted_value.value_or(SortedValue::ASCENDING), SortedValue::ASCENDING));
}
});

auto keys = folly::collect(fut_vec).get();
Expand All @@ -1300,8 +1310,7 @@ VersionedItem compact_incomplete_impl(
slices,
keys,
pipeline_context->incompletes_after(),
user_meta
);
user_meta);


store->remove_keys(delete_keys).get();
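
The other half of the fix: sort_merge_impl and compact_incomplete_impl now capture the previous version's sortedness before reading incompletes and, after compaction, combine it with ASCENDING (the sortedness of freshly compacted output) via deduce_sorted; value_or(SortedValue::ASCENDING) covers the case where no previous version exists. The upshot is that finalized staged data keeps a meaningful sorted flag. A sketch of the end-to-end effect; the compact_incomplete positional arguments (symbol, append, convert_int_to_float) follow the internal API's test usage rather than anything shown in this diff:

# df1, df2: datetime-indexed DataFrames, each individually sorted ascending
lib.write("sym", df1, parallel=True)
lib.write("sym", df2, parallel=True)
lib.compact_incomplete("sym", False, False)   # finalize the staged chunks
assert lib.get_info("sym")["sorted"] == "ASCENDING"  # no longer UNKNOWN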
4 changes: 4 additions & 0 deletions python/arcticdb/version_store/_common.py
@@ -121,6 +121,10 @@ def __getitem__(self, item):
def tsloc(self):
return TimeFrame._TsLocProxy(self)

@property
def issorted(self):
return np.all(self.times[:-1] <= self.times[1:])

def __eq__(self, other):
if other is None:
return False
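
The new issorted property gives TimeFrame the same cheap monotonicity probe that pandas indexes expose via is_monotonic_increasing: one vectorised pairwise comparison over times. Because the comparison is <=, repeated timestamps still count as sorted. A small usage sketch (constructor usage mirrors the existing tests; the values are illustrative):

import numpy as np
from arcticdb.version_store._common import TimeFrame

times = np.arange("2024-01-01", "2024-01-04", dtype="datetime64[D]").astype("datetime64[ns]")
tf = TimeFrame(times=times, columns_names=["a"], columns_values=[np.arange(3)])
assert tf.issorted                # non-decreasing times

reversed_tf = TimeFrame(times=times[::-1], columns_names=["a"],
                        columns_values=[np.arange(3)])
assert not reversed_tf.issorted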
2 changes: 1 addition & 1 deletion python/arcticdb/version_store/_normalization.py
@@ -1125,7 +1125,7 @@ def normalize(self, item, string_max_len=None, dynamic_strings=False, coerce_col
index_values=ix_vals,
column_names=columns_names,
columns_values=columns_vals,
sorted=_SortedValue.UNKNOWN,
sorted=_SortedValue.ASCENDING if item.issorted else _SortedValue.UNSORTED,
),
metadata=norm_meta,
)
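
Combined with the issorted property above, the normalizer now records real sortedness for the custom TimeFrame path instead of UNKNOWN, so it becomes visible through get_info, which is what the test changes below assert. A quick sketch (lib fixture assumed as before):

import numpy as np
from arcticdb.version_store._common import TimeFrame

times = np.array(["2024-01-02", "2024-01-01"], dtype="datetime64[ns]")
tf = TimeFrame(times=times, columns_names=["a"], columns_values=[np.array([1, 2])])
lib.write("tf_sym", tf)
assert lib.get_info("tf_sym")["sorted"] == "UNSORTED"  # previously reported "UNKNOWN"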
2 changes: 2 additions & 0 deletions python/tests/hypothesis/arcticdb/test_append.py
@@ -198,7 +198,9 @@ def run_test(
segment_details, merged_segment_row_size
) != get_no_of_column_merged_segments(segment_details):
seg_details_before_compaction = lib.read_index(sym)
assert lib.get_info(sym)["sorted"] == "ASCENDING"
lib.defragment_symbol_data(sym, None)
assert lib.get_info(sym)["sorted"] == "ASCENDING"
res = lib.read(sym).data
res = res.reindex(sorted(list(res.columns)), axis=1)
res = res.replace("", 0.0)
@@ -116,7 +116,7 @@ def test_update_date_range_non_pandas_dataframe(basic_store_custom_norm, with_ti
df = pd.DataFrame(index=dtidx, data={"a": [1, 2, 3, 4, 5]})
version_store.write("sym_1", CustomTimeseries(df, with_timezone_attr=with_timezone_attr, timezone_=timezone_))
info = version_store.get_info("sym_1")
assert info["sorted"] == "UNKNOWN"
assert info["sorted"] == "ASCENDING"

dtidx = pd.date_range("2022-05-01", "2022-06-10")
a = np.arange(dtidx.shape[0]).astype(np.int64)
@@ -129,7 +129,7 @@ def test_update_date_range_non_pandas_dataframe(basic_store_custom_norm, with_ti
date_range=(datetime(2022, 6, 2), datetime(2022, 6, 4)),
)
info = version_store.get_info("sym_1")
assert info["sorted"] == "UNKNOWN"
assert info["sorted"] == "ASCENDING"

# then
result = version_store.read("sym_1").data
@@ -149,7 +149,7 @@ def test_append_date_range_non_pandas_dataframe(basic_store_custom_norm, with_ti
df = pd.DataFrame(index=dtidx, data={"a": [1, 2, 3, 4, 5]})
version_store.write("sym_1", CustomTimeseries(df, with_timezone_attr=with_timezone_attr, timezone_=timezone_))
info = version_store.get_info("sym_1")
assert info["sorted"] == "UNKNOWN"
assert info["sorted"] == "ASCENDING"

dtidx = pd.date_range("2022-06-05", "2022-06-10")
a = np.arange(dtidx.shape[0]).astype(np.int64)
@@ -162,7 +162,7 @@ def test_append_date_range_non_pandas_dataframe(basic_store_custom_norm, with_ti
date_range=(datetime(2022, 6, 2), datetime(2022, 6, 4)),
)
info = version_store.get_info("sym_1")
assert info["sorted"] == "UNKNOWN"
assert info["sorted"] == "ASCENDING"

# then
result = version_store.read("sym_1").data
4 changes: 4 additions & 0 deletions python/tests/unit/arcticdb/version_store/test_append.py
@@ -428,7 +428,9 @@ def test_append_with_cont_mem_problem(sym, lmdb_version_store_tiny_segment_dynam
lib.append(sym, df1).version
lib.append(sym, df2).version
lib.append(sym, df3).version
assert lib.get_info(sym)["sorted"] == "ASCENDING"
lib.version_store.defragment_symbol_data(sym, None)
assert lib.get_info(sym)["sorted"] == "ASCENDING"
res = lib.read(sym).data
assert_frame_equal(df, res)

@@ -530,7 +532,9 @@ def test_defragment_read_prev_versions(sym, lmdb_version_store):
assert_frame_equal(lmdb_version_store.read(sym, as_of=version_id).data, expected_df)

assert lmdb_version_store.is_symbol_fragmented(sym)
assert lmdb_version_store.get_info(sym)["sorted"] == "ASCENDING"
versioned_item = lmdb_version_store.defragment_symbol_data(sym)
assert lmdb_version_store.get_info(sym)["sorted"] == "ASCENDING"
assert versioned_item.version == 101
assert len(lmdb_version_store.list_versions(sym)) == 102

19 changes: 0 additions & 19 deletions python/tests/unit/arcticdb/version_store/test_compaction.py

This file was deleted.
