Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(C++): filter property and return VerticesCollection #658

Merged
merged 1 commit into from
Nov 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,63 @@ void vertices_collection(
std::cout << property << " ";
std::cout << std::endl;
}
}
std::cout << std::endl;

std::cout << "Test vertices with property in a filtered vertices set"
<< std::endl;
std::cout << "--------------------------------------" << std::endl;
auto filter = graphar::_Equal(graphar::_Property("name"),
graphar::_Literal("Safi_Airways"));
auto maybe_filter_vertices_collection_4 =
graphar::VerticesCollection::verticesWithProperty(
std::string("name"), filter, graph_info, type);
ASSERT(!maybe_filter_vertices_collection_4.has_error());
auto filter_vertices_4 = maybe_filter_vertices_collection_4.value();
std::cout << "valid vertices num: " << filter_vertices_4->size() << std::endl;

for (auto it = filter_vertices_4->begin(); it != filter_vertices_4->end();
++it) {
// get a node's all labels
auto label_result = it.label();
std::cout << "id: " << it.id() << " ";
if (!label_result.has_error()) {
for (auto label : label_result.value()) {
std::cout << label << " ";
}
}
std::cout << "name: ";
auto property = it.property<std::string>("name").value();
std::cout << property << " ";
std::cout << std::endl;
}

std::cout << "Test vertices with property" << std::endl;
std::cout << "--------------------------------------" << std::endl;
auto filter_2 =
graphar::_Equal(graphar::_Property("name"), graphar::_Literal("Kam_Air"));
auto maybe_filter_vertices_collection_5 =
graphar::VerticesCollection::verticesWithProperty(
std::string("name"), filter_2, filter_vertices_3);
ASSERT(!maybe_filter_vertices_collection_5.has_error());
auto filter_vertices_5 = maybe_filter_vertices_collection_5.value();
std::cout << "valid vertices num: " << filter_vertices_5->size() << std::endl;

for (auto it = filter_vertices_5->begin(); it != filter_vertices_5->end();
++it) {
// get a node's all labels
auto label_result = it.label();
std::cout << "id: " << it.id() << " ";
if (!label_result.has_error()) {
for (auto label : label_result.value()) {
std::cout << label << " ";
}
}
std::cout << "name: ";
auto property = it.property<std::string>("name").value();
std::cout << property << " ";
std::cout << std::endl;
}
}
int main(int argc, char* argv[]) {
// read file and construct graph info
std::string path = GetTestingResourceRoot() + "/ldbc/parquet/ldbc.graph.yml";
Expand Down
108 changes: 106 additions & 2 deletions cpp/src/graphar/high-level/graph_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,12 @@
* under the License.
*/

#include "graphar/high-level/graph_reader.h"
#include <algorithm>
#include <unordered_set>

#include "arrow/array.h"
#include "graphar/api/arrow_reader.h"
#include "graphar/convert_to_arrow_type.h"
#include "graphar/high-level/graph_reader.h"
#include "graphar/label.h"
#include "graphar/types.h"

Expand Down Expand Up @@ -264,6 +263,69 @@ Result<std::vector<IdType>> VerticesCollection::filter_by_acero(
return indices64;
}

Result<std::vector<IdType>> VerticesCollection::filter(
std::string property_name, std::shared_ptr<Expression> filter_expression,
std::vector<IdType>* new_valid_chunk) {
std::vector<int> indices;
const int TOT_ROWS_NUM = vertex_num_;
const int CHUNK_SIZE = vertex_info_->GetChunkSize();
int total_count = 0;
auto property_group = vertex_info_->GetPropertyGroup(property_name);
auto maybe_filter_reader = graphar::VertexPropertyArrowChunkReader::Make(
vertex_info_, property_group, prefix_, {});
auto filter_reader = maybe_filter_reader.value();
filter_reader->Filter(filter_expression);
std::vector<int64_t> indices64;
if (is_filtered_) {
for (int chunk_idx : valid_chunk_) {
// how to itetate valid_chunk_?
filter_reader->seek(chunk_idx * CHUNK_SIZE);
auto filter_result = filter_reader->GetChunk();
auto filter_table = filter_result.value();
int count = filter_table->num_rows();
if (count != 0 && new_valid_chunk != nullptr) {
new_valid_chunk->emplace_back(static_cast<IdType>(chunk_idx));
// TODO(elssky): record indices
int kVertexIndexCol = filter_table->schema()->GetFieldIndex(
GeneralParams::kVertexIndexCol);
auto column_array = filter_table->column(kVertexIndexCol)->chunk(0);
auto int64_array =
std::static_pointer_cast<arrow::Int64Array>(column_array);
for (int64_t i = 0; i < int64_array->length(); ++i) {
if (!int64_array->IsNull(i)) {
indices64.push_back(int64_array->Value(i));
}
}
}
}
} else {
for (int chunk_idx = 0; chunk_idx * CHUNK_SIZE < TOT_ROWS_NUM;
++chunk_idx) {
auto filter_result = filter_reader->GetChunk();
auto filter_table = filter_result.value();
int count = filter_table->num_rows();
filter_reader->next_chunk();
total_count += count;
if (count != 0) {
valid_chunk_.emplace_back(static_cast<IdType>(chunk_idx));
// TODO(elssky): record indices
int kVertexIndexCol = filter_table->schema()->GetFieldIndex(
GeneralParams::kVertexIndexCol);
auto column_array = filter_table->column(kVertexIndexCol)->chunk(0);
auto int64_array =
std::static_pointer_cast<arrow::Int64Array>(column_array);
for (int64_t i = 0; i < int64_array->length(); ++i) {
if (!int64_array->IsNull(i)) {
indices64.push_back(int64_array->Value(i));
}
}
}
}
}
// std::cout << "Total valid count: " << total_count << std::endl;
return indices64;
}

Result<std::shared_ptr<VerticesCollection>>
VerticesCollection::verticesWithLabel(
const std::string& filter_label,
Expand Down Expand Up @@ -384,6 +446,48 @@ VerticesCollection::verticesWithMultipleLabels(
return new_vertices_collection;
}

Result<std::shared_ptr<VerticesCollection>>
VerticesCollection::verticesWithProperty(
const std::string property_name, const graphar::util::Filter filter,
const std::shared_ptr<GraphInfo>& graph_info, const std::string& type) {
auto prefix = graph_info->GetPrefix();
auto vertex_info = graph_info->GetVertexInfo(type);
auto vertices_collection =
std::make_shared<VerticesCollection>(vertex_info, prefix);
vertices_collection->filtered_ids_ =
vertices_collection->filter(property_name, filter).value();
vertices_collection->is_filtered_ = true;
return vertices_collection;
}

Result<std::shared_ptr<VerticesCollection>>
VerticesCollection::verticesWithProperty(
const std::string property_name, const graphar::util::Filter filter,
const std::shared_ptr<VerticesCollection>& vertices_collection) {
auto new_vertices_collection = std::make_shared<VerticesCollection>(
vertices_collection->vertex_info_, vertices_collection->prefix_);
auto filtered_ids = vertices_collection
->filter(property_name, filter,
&new_vertices_collection->valid_chunk_)
.value();
if (vertices_collection->is_filtered_) {
std::unordered_set<IdType> origin_set(
vertices_collection->filtered_ids_.begin(),
vertices_collection->filtered_ids_.end());
std::unordered_set<int> intersection;
for (int num : filtered_ids) {
if (origin_set.count(num)) {
intersection.insert(num);
}
}
filtered_ids =
std::vector<IdType>(intersection.begin(), intersection.end());
new_vertices_collection->is_filtered_ = true;
}
new_vertices_collection->filtered_ids_ = filtered_ids;
return new_vertices_collection;
}

template <typename T>
Result<T> Vertex::property(const std::string& property) const {
if constexpr (std::is_final<T>::value) {
Expand Down
12 changes: 12 additions & 0 deletions cpp/src/graphar/high-level/graph_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,10 @@ class VerticesCollection {
Result<std::vector<IdType>> filter_by_acero(
std::vector<std::string> filter_labels) const;

Result<std::vector<IdType>> filter(
std::string property_name, std::shared_ptr<Expression> filter_expression,
std::vector<IdType>* new_valid_chunk = nullptr);

/**
* @brief Query vertices with a specific label
*
Expand Down Expand Up @@ -431,6 +435,14 @@ class VerticesCollection {
const std::vector<std::string>& filter_labels,
const std::shared_ptr<GraphInfo>& graph_info, const std::string& type);

static Result<std::shared_ptr<VerticesCollection>> verticesWithProperty(
const std::string property_name, const graphar::util::Filter filter,
const std::shared_ptr<GraphInfo>& graph_info, const std::string& type);

static Result<std::shared_ptr<VerticesCollection>> verticesWithProperty(
const std::string property_name, const graphar::util::Filter filter,
const std::shared_ptr<VerticesCollection>& vertices_collection);

/**
* @brief Query vertices with multiple labels within a given collection
*
Expand Down
Loading