Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Construct RelationalSchema with IDatasetStream #346

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
4 changes: 2 additions & 2 deletions src/core/algorithms/cfd/cfd_discovery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ void CFDDiscovery::RegisterOptions() {
DESBORDANTE_OPTION_USING;

RegisterOption(config::kTableOpt(&input_table_));
RegisterOption(Option{&columns_number_, kCfdColumnsNumber, kDCfdColumnsNumber, 0u});
RegisterOption(Option{&tuples_number_, kCfdTuplesNumber, kDCfdTuplesNumber, 0u});
RegisterOption(Option{&columns_number_, kCfdColumnsNumber, kDCfdColumnsNumber, 0ul});
RegisterOption(Option{&tuples_number_, kCfdTuplesNumber, kDCfdTuplesNumber, 0ul});
}

int CFDDiscovery::NrCfds() const {
Expand Down
4 changes: 2 additions & 2 deletions src/core/algorithms/cfd/cfd_discovery.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ class CFDDiscovery : public Algorithm {
protected:
config::InputTable input_table_;

unsigned columns_number_;
unsigned tuples_number_;
size_t columns_number_;
size_t tuples_number_;
BUYT-1 marked this conversation as resolved.
Show resolved Hide resolved
ItemsetCFDList cfd_list_;
std::shared_ptr<CFDRelationData> relation_;

Expand Down
33 changes: 16 additions & 17 deletions src/core/algorithms/cfd/model/cfd_relation_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ void CFDRelationData::AddNewItemsInFullTable(ItemDictionary& item_dictionary,
std::vector<std::string> const& string_row,
std::vector<int>& int_row,
std::vector<Transaction>& data_rows,
int& unique_elems_number, unsigned num_columns) {
int& unique_elems_number, size_t num_columns) {
int it;
for (size_t i = 0; i < num_columns; i++) {
auto ptr = item_dictionary.find(std::make_pair(i, string_row[i]));
Expand All @@ -42,24 +42,29 @@ void CFDRelationData::AddNewItemsInFullTable(ItemDictionary& item_dictionary,
}

std::unique_ptr<CFDRelationData> CFDRelationData::CreateFrom(model::IDatasetStream& parser,
unsigned columns_number,
unsigned tuples_number,
double c_sample, double r_sample) {
size_t columns_number,
size_t tuples_number, double c_sample,
double r_sample) {
if (columns_number == 0 || tuples_number == 0) {
return CFDRelationData::CreateFrom(parser, c_sample, r_sample);
}

size_t const num_columns = std::min(parser.GetNumberOfColumns(), columns_number);
std::vector<std::string> column_names;
column_names.reserve(num_columns);
for (AttributeIndex i = 0; static_cast<size_t>(i) < num_columns; ++i) {
column_names.push_back(parser.GetColumnName(i));
}
// Fields of CFDRelationData class
auto schema = std::make_unique<RelationalSchema>(parser.GetRelationName());
auto schema =
std::make_unique<RelationalSchema>(parser.GetRelationName(), std::move(column_names));
std::vector<Transaction> data_rows;
ItemDictionary item_dictionary;
std::vector<ItemInfo> items;
ColumnesValuesDict columns_values_dict;
int unique_elems_number = 1;

unsigned num_columns = parser.GetNumberOfColumns();
std::vector<std::string> line;
num_columns = std::min(num_columns, columns_number);
std::vector<std::string> string_row(num_columns);
while (parser.HasNextRow() && data_rows.size() < tuples_number) {
line = parser.GetNextRow();
Expand All @@ -74,11 +79,8 @@ std::unique_ptr<CFDRelationData> CFDRelationData::CreateFrom(model::IDatasetStre

std::vector<CFDColumnData> column_data;
for (AttributeIndex i = 0; static_cast<size_t>(i) < num_columns; ++i) {
auto column = Column(schema.get(), parser.GetColumnName(i), i);
schema->AppendColumn(std::move(column));
column_data.emplace_back(schema->GetColumn(i), columns_values_dict[i]);
column_data.emplace_back(&schema->GetColumn(i), columns_values_dict[i]);
}
schema->Init();

return std::make_unique<CFDRelationData>(std::move(schema), std::move(column_data),
std::move(data_rows), std::move(item_dictionary),
Expand Down Expand Up @@ -120,7 +122,7 @@ void CFDRelationData::AddNewItemsInPartialTable(ItemDictionary& item_dictionary,
std::unique_ptr<CFDRelationData> CFDRelationData::CreateFrom(model::IDatasetStream& file_input,
double c_sample, double r_sample) {
// Fields of CFDRelationData class
auto schema = std::make_unique<RelationalSchema>(file_input.GetRelationName());
auto schema = RelationalSchema::CreateFrom(file_input);
std::vector<Transaction> data_rows;
ItemDictionary item_dictionary;
std::vector<ItemInfo> items;
Expand Down Expand Up @@ -149,11 +151,8 @@ std::unique_ptr<CFDRelationData> CFDRelationData::CreateFrom(model::IDatasetStre

std::vector<CFDColumnData> column_data;
for (AttributeIndex i = 0; i < num_columns; ++i) {
auto column = Column(schema.get(), file_input.GetColumnName(i), i);
schema->AppendColumn(std::move(column));
column_data.emplace_back(schema->GetColumn(i), columns_values_dict[i]);
column_data.emplace_back(&schema->GetColumn(i), columns_values_dict[i]);
}
schema->Init();
return std::make_unique<CFDRelationData>(std::move(schema), std::move(column_data),
std::move(data_rows), std::move(item_dictionary),
std::move(items));
Expand Down Expand Up @@ -194,7 +193,7 @@ std::vector<int> const& CFDRelationData::GetDomain(unsigned attr) const {
}

std::string CFDRelationData::GetAttrName(int index) const {
return GetSchema()->GetColumn(index)->GetName();
return GetSchema()->GetColumn(index).GetName();
}

int CFDRelationData::GetAttr(std::string const& s) const {
Expand Down
7 changes: 3 additions & 4 deletions src/core/algorithms/cfd/model/cfd_relation_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class CFDRelationData : public AbstractRelationData<CFDColumnData> {
static void AddNewItemsInFullTable(ItemDictionary &, ColumnesValuesDict &,
std::vector<ItemInfo> &, std::vector<std::string> const &,
std::vector<int> &, std::vector<Transaction> &, int &,
unsigned);
size_t);

static void AddNewItemsInPartialTable(ItemDictionary &, ColumnesValuesDict &,
std::vector<ItemInfo> &, std::vector<std::string> const &,
Expand Down Expand Up @@ -74,9 +74,8 @@ class CFDRelationData : public AbstractRelationData<CFDColumnData> {
static std::unique_ptr<CFDRelationData> CreateFrom(model::IDatasetStream &file_input,
double c_sample = 1, double r_sample = 1);
static std::unique_ptr<CFDRelationData> CreateFrom(model::IDatasetStream &file_input,
unsigned columns_number,
unsigned tuples_number, double c_sample = 1,
double r_sample = 1);
size_t columns_number, size_t tuples_number,
double c_sample = 1, double r_sample = 1);

CFDRelationData(std::unique_ptr<RelationalSchema> schema,
std::vector<CFDColumnData> column_data, std::vector<Transaction> data,
Expand Down
11 changes: 3 additions & 8 deletions src/core/algorithms/fd/aidfd/aid.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,7 @@ void Aid::LoadDataInternal() {
throw std::runtime_error("Unable to work on an empty dataset.");
}

schema_ = std::make_unique<RelationalSchema>(input_table_->GetRelationName());

for (size_t i = 0; i < number_of_attributes_; ++i) {
std::string const& column_name = input_table_->GetColumnName(static_cast<int>(i));
schema_->AppendColumn(column_name);
}
schema_ = RelationalSchema::CreateFrom(*input_table_);

while (input_table_->HasNextRow()) {
std::vector<std::string> const& next_line = input_table_->GetNextRow();
Expand Down Expand Up @@ -160,7 +155,7 @@ void Aid::HandleConstantColumns(boost::dynamic_bitset<>& attributes) {
attr_num != boost::dynamic_bitset<>::npos;
attr_num = constant_columns_.find_next(attr_num)) {
attributes[attr_num] = false;
Column rhs = *schema_->GetColumn(attr_num);
Column rhs = schema_->GetColumn(attr_num);
RegisterFd(lhs, rhs);
}
}
Expand Down Expand Up @@ -243,7 +238,7 @@ void Aid::InvertNegativeCover() {

void Aid::RegisterFDs(size_t rhs_attribute,
std::vector<boost::dynamic_bitset<>> const& list_of_lhs_attributes) {
Column rhs = *schema_->GetColumn(rhs_attribute);
Column rhs = schema_->GetColumn(rhs_attribute);
for (auto const& lhs_attributes : list_of_lhs_attributes) {
Vertical lhs = schema_->GetVertical(lhs_attributes);
RegisterFd(lhs, rhs);
Expand Down
17 changes: 8 additions & 9 deletions src/core/algorithms/fd/depminer/depminer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,11 @@ std::vector<CMAXSet> Depminer::GenerateCmaxSets(std::unordered_set<Vertical> con
std::vector<CMAXSet> c_max_cets;

for (auto const& column : this->schema_->GetColumns()) {
CMAXSet result(*column);
CMAXSet result(column);

// find all agree sets that do not contain this column
for (auto const& ag : agree_sets) {
if (!ag.Contains(*column)) {
if (!ag.Contains(column)) {
result.AddCombination(ag);
}
}
Expand Down Expand Up @@ -110,16 +110,15 @@ std::vector<CMAXSet> Depminer::GenerateCmaxSets(std::unordered_set<Vertical> con
return c_max_cets;
}

void Depminer::LhsForColumn(std::unique_ptr<Column> const& column,
std::vector<CMAXSet> const& c_max_cets) {
void Depminer::LhsForColumn(Column const& column, std::vector<CMAXSet> const& c_max_cets) {
std::unordered_set<Vertical> level;
// 3
CMAXSet correct = GenFirstLevel(c_max_cets, *column, level);
CMAXSet correct = GenFirstLevel(c_max_cets, column, level);

auto const pli = relation_->GetColumnData(column->GetIndex()).GetPositionListIndex();
auto const pli = relation_->GetColumnData(column.GetIndex()).GetPositionListIndex();
bool column_contains_only_equal_values = pli->IsConstant();
if (column_contains_only_equal_values) {
RegisterFd(Vertical(), *column);
RegisterFd(Vertical(), column);
return;
}

Expand All @@ -137,8 +136,8 @@ void Depminer::LhsForColumn(std::unique_ptr<Column> const& column,
}
// 6
if (is_fd) {
if (!l.Contains(*column)) {
this->RegisterFd(l, *column);
if (!l.Contains(column)) {
this->RegisterFd(l, column);
}
level_copy.erase(l);
}
Expand Down
2 changes: 1 addition & 1 deletion src/core/algorithms/fd/depminer/depminer.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ class Depminer : public PliBasedFDAlgorithm {
std::unordered_set<Vertical> const& prev_level);
static bool CheckJoin(Vertical const& _p, Vertical const& _q);

void LhsForColumn(std::unique_ptr<Column> const& column, std::vector<CMAXSet> const& cmax_sets);
void LhsForColumn(Column const& column, std::vector<CMAXSet> const& cmax_sets);
std::vector<CMAXSet> GenerateCmaxSets(std::unordered_set<Vertical> const& agree_sets);

double progress_step_ = 0;
Expand Down
16 changes: 8 additions & 8 deletions src/core/algorithms/fd/dfd/dfd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ unsigned long long DFD::ExecuteInternal() {

// search for unique columns
for (auto const& column : schema->GetColumns()) {
ColumnData& column_data = relation_->GetColumnData(column->GetIndex());
ColumnData& column_data = relation_->GetColumnData(column.GetIndex());
model::PositionListIndex const* const column_pli = column_data.GetPositionListIndex();

if (column_pli->AllValuesAreUnique()) {
Vertical const lhs = Vertical(*column);
Vertical const lhs = Vertical(column);
unique_columns_.push_back(lhs);
// we do not register an FD at once, because we check for FDs with empty LHS later
}
Expand All @@ -51,28 +51,28 @@ unsigned long long DFD::ExecuteInternal() {
double progress_step = 100.0 / schema->GetNumColumns();
boost::asio::thread_pool search_space_pool(number_of_threads_);

for (auto& rhs : schema->GetColumns()) {
for (auto const& rhs : schema->GetColumns()) {
boost::asio::post(
search_space_pool, [this, &rhs, schema, progress_step, &partition_storage]() {
ColumnData const& rhs_data = relation_->GetColumnData(rhs->GetIndex());
ColumnData const& rhs_data = relation_->GetColumnData(rhs.GetIndex());
model::PositionListIndex const* const rhs_pli = rhs_data.GetPositionListIndex();

/* if all the rows have the same value, then we register FD with empty LHS
* if we have minimal FD like []->RHS, it is impossible to find smaller FD with
* this RHS, so we register it and move to the next RHS
* */
if (rhs_pli->GetNepAsLong() == relation_->GetNumTuplePairs()) {
RegisterFd(*(schema->empty_vertical_), *rhs);
RegisterFd(*(schema->empty_vertical_), rhs);
AddProgress(progress_step);
return;
}

auto search_space = LatticeTraversal(rhs.get(), relation_.get(),
unique_columns_, partition_storage.get());
auto search_space = LatticeTraversal(&rhs, relation_.get(), unique_columns_,
partition_storage.get());
auto const minimal_deps = search_space.FindLHSs();

for (auto const& minimal_dependency_lhs : minimal_deps) {
RegisterFd(minimal_dependency_lhs, *rhs);
RegisterFd(minimal_dependency_lhs, rhs);
}
AddProgress(progress_step);
LOG(INFO) << static_cast<int>(GetProgress().second);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ NodeCategory LatticeObservations::UpdateNonDependencyCategory(Vertical const& no

for (size_t index = column_indices.find_first(); index < column_indices.size();
index = column_indices.find_next(index)) {
auto const superset_node_iter = this->find(node.Union(*node.GetSchema()->GetColumn(index)));
auto const superset_node_iter = this->find(node.Union(node.GetSchema()->GetColumn(index)));

if (superset_node_iter == this->end()) {
// if we found unchecked superset of this node
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ std::unordered_set<Vertical> LatticeTraversal::FindLHSs() {
for (unsigned partition_index :
column_order_.GetOrderHighDistinctCount(Vertical(*rhs_).Invert())) {
if (partition_index != rhs_->GetIndex()) {
seeds.push(Vertical(*schema->GetColumn(partition_index)));
seeds.push(Vertical(schema->GetColumn(partition_index)));
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
#pragma once

#include <list>
#include <random>
#include <stack>
#include <vector>

#include "../column_order/column_order.h"
#include "../lattice_observations/lattice_observations.h"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ PartitionStorage::PartitionStorage(ColumnLayoutRelationData* relation_data,
relation_data->GetSchema())),
caching_method_(caching_method),
eviction_method_(eviction_method) {
for (auto& column_ptr : relation_data->GetSchema()->GetColumns()) {
index_->Put(static_cast<Vertical>(*column_ptr),
relation_data->GetColumnData(column_ptr->GetIndex()).GetPliOwnership());
for (auto const& column : relation_data->GetSchema()->GetColumns()) {
index_->Put(static_cast<Vertical>(column),
relation_data->GetColumnData(column.GetIndex()).GetPliOwnership());
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/core/algorithms/fd/dfd/pruning_maps/pruning_map.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

PruningMap::PruningMap(RelationalSchema const* schema) {
for (auto const& column : schema->GetColumns()) {
this->insert(std::make_pair(Vertical(*column), std::unordered_set<Vertical>()));
this->insert(std::make_pair(Vertical(column), std::unordered_set<Vertical>()));
}
}

Expand Down Expand Up @@ -32,7 +32,7 @@ void PruningMap::RebalanceGroup(Vertical const& key) {
for (size_t column_index = inverted_columns.find_first();
column_index < inverted_columns.size();
column_index = inverted_columns.find_next(column_index)) {
Vertical new_key = key.Union(*key.GetSchema()->GetColumn(column_index));
Vertical new_key = key.Union(key.GetSchema()->GetColumn(column_index));
std::unordered_set<Vertical> new_group;

for (auto const& dep_of_group : deps_of_group) {
Expand Down
22 changes: 11 additions & 11 deletions src/core/algorithms/fd/fastfds/fastfds.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,19 +56,19 @@ unsigned long long FastFDs::ExecuteInternal() {
return elapsed_milliseconds.count();
}

auto task = [this](std::unique_ptr<Column> const& column) {
if (ColumnContainsOnlyEqualValues(*column)) {
auto task = [this](Column const& column) {
if (ColumnContainsOnlyEqualValues(column)) {
LOG(DEBUG) << "Registered FD: " << schema_->empty_vertical_->ToString() << "->"
<< column->ToString();
RegisterFd(Vertical(), *column);
<< column.ToString();
RegisterFd(Vertical(), column);
return;
}

vector<DiffSet> diff_sets_mod = GetDiffSetsMod(*column);
vector<DiffSet> diff_sets_mod = GetDiffSetsMod(column);
assert(!diff_sets_mod.empty());
if (!(diff_sets_mod.size() == 1 && diff_sets_mod.back() == *schema_->empty_vertical_)) {
set<Column, OrderingComparator> init_ordering = GetInitOrdering(diff_sets_mod, *column);
FindCovers(*column, diff_sets_mod, diff_sets_mod, *schema_->empty_vertical_,
set<Column, OrderingComparator> init_ordering = GetInitOrdering(diff_sets_mod, column);
FindCovers(column, diff_sets_mod, diff_sets_mod, *schema_->empty_vertical_,
init_ordering);
} else {
AddProgress(percent_per_col_);
Expand All @@ -78,13 +78,13 @@ unsigned long long FastFDs::ExecuteInternal() {
if (threads_num_ > 1) {
boost::asio::thread_pool pool(threads_num_);

for (std::unique_ptr<Column> const& column : schema_->GetColumns()) {
for (Column const& column : schema_->GetColumns()) {
boost::asio::post(pool, [&column, task]() { return task(column); });
}

pool.join();
} else {
for (std::unique_ptr<Column> const& column : schema_->GetColumns()) {
for (Column const& column : schema_->GetColumns()) {
task(column);
}
}
Expand Down Expand Up @@ -195,8 +195,8 @@ set<Column, FastFDs::OrderingComparator> FastFDs::GetInitOrdering(vector<DiffSet
set<Column, OrderingComparator> ordering(ordering_comp);

for (auto const& col : schema_->GetColumns()) {
if (*col != attribute) {
ordering.insert(*col);
if (col != attribute) {
ordering.insert(col);
}
}

Expand Down
1 change: 1 addition & 0 deletions src/core/algorithms/fd/fd_algorithm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <vector>

#include "config/max_lhs/option.h"
#include "model/table/relational_schema.h"

namespace algos {

Expand Down
4 changes: 2 additions & 2 deletions src/core/algorithms/fd/fd_mine/fd_mine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -276,8 +276,8 @@ void FdMine::Display() {
}
Vertical lhs_vertical(schema_, lhs);
LOG(DEBUG) << "Discovered FD: " << lhs_vertical.ToString() << " -> "
<< schema_->GetColumn(j)->GetName();
RegisterFd(std::move(lhs_vertical), *schema_->GetColumn(j));
<< schema_->GetColumn(j).GetName();
RegisterFd(std::move(lhs_vertical), schema_->GetColumn(j));
fd_counter++;
}
}
Expand Down
Loading
Loading