diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index d162392b8..cd12d486c 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -458,6 +458,7 @@ if (BUILD_TESTS) add_test(test_chunk_info_reader SRCS test/test_chunk_info_reader.cc) add_test(test_arrow_chunk_reader SRCS test/test_arrow_chunk_reader.cc) add_test(test_graph SRCS test/test_graph.cc) + add_test(test_multi_label SRCS test/test_multi_label.cc) # enable_testing() endif() diff --git a/cpp/examples/bgl_example.cc b/cpp/examples/bgl_example.cc index d8231c6f4..6cbb574d0 100644 --- a/cpp/examples/bgl_example.cc +++ b/cpp/examples/bgl_example.cc @@ -112,7 +112,7 @@ int main(int argc, char* argv[]) { int chunk_size = 100; auto version = graphar::InfoVersion::Parse("gar/v1").value(); auto new_info = graphar::CreateVertexInfo(vertex_type, chunk_size, {group}, - vertex_prefix, version); + {}, vertex_prefix, version); // dump new vertex info ASSERT(new_info->IsValidated()); ASSERT(new_info->Dump().status().ok()); diff --git a/cpp/examples/construct_info_example.cc b/cpp/examples/construct_info_example.cc index d5b390b7d..b9c0a2761 100644 --- a/cpp/examples/construct_info_example.cc +++ b/cpp/examples/construct_info_example.cc @@ -43,7 +43,7 @@ int main(int argc, char* argv[]) { graphar::CreatePropertyGroup(property_vector_2, graphar::FileType::ORC); // create vertex info - auto vertex_info = graphar::CreateVertexInfo(type, chunk_size, {group1}, + auto vertex_info = graphar::CreateVertexInfo(type, chunk_size, {group1}, {}, vertex_prefix, version); ASSERT(vertex_info != nullptr); @@ -150,7 +150,7 @@ int main(int argc, char* argv[]) { // create graph info auto graph_info = graphar::CreateGraphInfo(name, {vertex_info}, {edge_info}, - prefix, version); + {}, prefix, version); ASSERT(graph_info->GetName() == name); ASSERT(graph_info->GetPrefix() == prefix); ASSERT(graph_info->GetVertexInfos().size() == 1); diff --git a/cpp/examples/snap_dataset_to_graphar.cc b/cpp/examples/snap_dataset_to_graphar.cc index 29bc442a1..ad74cb115 100644 --- a/cpp/examples/snap_dataset_to_graphar.cc +++ b/cpp/examples/snap_dataset_to_graphar.cc @@ -49,7 +49,7 @@ int main(int argc, char* argv[]) { std::string type = "node", vertex_prefix = "vertex/node/"; // create vertex info - auto vertex_info = graphar::CreateVertexInfo(type, VERTEX_CHUNK_SIZE, {}, + auto vertex_info = graphar::CreateVertexInfo(type, VERTEX_CHUNK_SIZE, {}, {}, vertex_prefix, version); // save & dump @@ -75,8 +75,8 @@ int main(int argc, char* argv[]) { /*------------------construct graph info------------------*/ // create graph info - auto graph_info = graphar::CreateGraphInfo(graph_name, {vertex_info}, - {edge_info}, save_path, version); + auto graph_info = graphar::CreateGraphInfo( + graph_name, {vertex_info}, {edge_info}, {}, save_path, version); // save & dump ASSERT(!graph_info->Dump().has_error()); ASSERT(graph_info->Save(save_path + graph_name + ".graph.yml").ok()); diff --git a/cpp/src/graphar/arrow/chunk_reader.cc b/cpp/src/graphar/arrow/chunk_reader.cc index f4add8a40..e38b324c7 100644 --- a/cpp/src/graphar/arrow/chunk_reader.cc +++ b/cpp/src/graphar/arrow/chunk_reader.cc @@ -51,6 +51,18 @@ Result> PropertyGroupToSchema( return arrow::schema(fields); } +Result> LabelToSchema( + std::vector labels, bool contain_index_column = false) { + std::vector> fields; + if (contain_index_column) { + fields.push_back(std::make_shared( + GeneralParams::kVertexIndexCol, arrow::int64())); + } + for (const auto& lab : labels) { + fields.push_back(std::make_shared(lab, arrow::boolean())); + } + return arrow::schema(fields); +} Status GeneralCast(const std::shared_ptr& in, const std::shared_ptr& to_type, std::shared_ptr* out) { @@ -148,6 +160,29 @@ VertexPropertyArrowChunkReader::VertexPropertyArrowChunkReader( PropertyGroupToSchema(property_group_, true)); } +// initialize for labels +VertexPropertyArrowChunkReader::VertexPropertyArrowChunkReader( + const std::shared_ptr& vertex_info, + const std::vector& labels, const std::string& prefix, + const util::FilterOptions& options) + : vertex_info_(std::move(vertex_info)), + labels_(labels), + chunk_index_(0), + seek_id_(0), + schema_(nullptr), + chunk_table_(nullptr), + filter_options_(options) { + GAR_ASSIGN_OR_RAISE_ERROR(fs_, FileSystemFromUriOrPath(prefix, &prefix_)); + + std::string base_dir = prefix_ + vertex_info_->GetPrefix() + "labels/chunk" + + std::to_string(chunk_index_); + GAR_ASSIGN_OR_RAISE_ERROR(chunk_num_, + util::GetVertexChunkNum(prefix_, vertex_info)); + GAR_ASSIGN_OR_RAISE_ERROR(vertex_num_, + util::GetVertexNum(prefix_, vertex_info_)); + GAR_ASSIGN_OR_RAISE_ERROR(schema_, LabelToSchema(labels)); +} + Status VertexPropertyArrowChunkReader::seek(IdType id) { seek_id_ = id; IdType pre_chunk_index = chunk_index_; @@ -186,6 +221,24 @@ VertexPropertyArrowChunkReader::GetChunk() { return chunk_table_->Slice(row_offset); } +Result> +VertexPropertyArrowChunkReader::GetLabelChunk() { + FileType filetype = FileType::PARQUET; + if (chunk_table_ == nullptr) { + std::string path = prefix_ + vertex_info_->GetPrefix() + "labels/chunk" + + std::to_string(chunk_index_); + GAR_ASSIGN_OR_RAISE(chunk_table_, + fs_->ReadFileToTable(path, filetype, filter_options_)); + // TODO(acezen): filter pushdown doesn't support cast schema now + // if (schema_ != nullptr && filter_options_.filter == nullptr) { + // GAR_RETURN_NOT_OK( + // CastTableWithSchema(chunk_table_, schema_, &chunk_table_)); + // } + } + IdType row_offset = seek_id_ - chunk_index_ * vertex_info_->GetChunkSize(); + return chunk_table_->Slice(row_offset); +} + Status VertexPropertyArrowChunkReader::next_chunk() { if (++chunk_index_ >= chunk_num_) { return Status::IndexError( @@ -215,6 +268,16 @@ VertexPropertyArrowChunkReader::Make( vertex_info, property_group, prefix, options); } +// Make for labels +Result> +VertexPropertyArrowChunkReader::Make( + const std::shared_ptr& vertex_info, + const std::vector& labels, const std::string& prefix, + const util::FilterOptions& options) { + return std::make_shared(vertex_info, labels, + prefix, options); +} + Result> VertexPropertyArrowChunkReader::Make( const std::shared_ptr& graph_info, const std::string& type, diff --git a/cpp/src/graphar/arrow/chunk_reader.h b/cpp/src/graphar/arrow/chunk_reader.h index b204753bd..f39f7b314 100644 --- a/cpp/src/graphar/arrow/chunk_reader.h +++ b/cpp/src/graphar/arrow/chunk_reader.h @@ -53,6 +53,19 @@ class VertexPropertyArrowChunkReader { const std::shared_ptr& property_group, const std::string& prefix, const util::FilterOptions& options = {}); + VertexPropertyArrowChunkReader() : vertex_info_(nullptr), prefix_("") {} + + /** + * @brief Initialize the VertexPropertyArrowChunkReader. + * + * @param vertex_info The vertex info that describes the vertex type. + * @param labels The labels of the vertex type. + * @param prefix The absolute prefix. + */ + VertexPropertyArrowChunkReader(const std::shared_ptr& vertex_info, + const std::vector& labels, + const std::string& prefix, + const util::FilterOptions& options = {}); /** * @brief Sets chunk position indicator for reader by internal vertex id. * If internal vertex id is not found, will return Status::IndexError @@ -67,7 +80,11 @@ class VertexPropertyArrowChunkReader { * @brief Return the current arrow chunk table of chunk position indicator. */ Result> GetChunk(); - + /** + * @brief Return the current arrow label chunk table of chunk position + * indicator. + */ + Result> GetLabelChunk(); /** * @brief Sets chunk position indicator to next chunk. * @@ -109,6 +126,19 @@ class VertexPropertyArrowChunkReader { const std::shared_ptr& property_group, const std::string& prefix, const util::FilterOptions& options = {}); + /** + * @brief Create a VertexPropertyArrowChunkReader instance from vertex info + * for labels. + * + * @param vertex_info The vertex info. + * @param prefix The absolute prefix of the graph. + * @param options The filter options, default is empty. + */ + static Result> Make( + const std::shared_ptr& vertex_info, + const std::vector& labels, const std::string& prefix, + const util::FilterOptions& options); + /** * @brief Create a VertexPropertyArrowChunkReader instance from graph info and * property group. @@ -142,6 +172,7 @@ class VertexPropertyArrowChunkReader { std::shared_ptr vertex_info_; std::shared_ptr property_group_; std::string prefix_; + std::vector labels_; IdType chunk_index_; IdType seek_id_; IdType chunk_num_; diff --git a/cpp/src/graphar/arrow/chunk_writer.cc b/cpp/src/graphar/arrow/chunk_writer.cc index 2e3e73539..c58c98ef1 100644 --- a/cpp/src/graphar/arrow/chunk_writer.cc +++ b/cpp/src/graphar/arrow/chunk_writer.cc @@ -17,7 +17,7 @@ * under the License. */ -#include +#include #include #include "arrow/api.h" @@ -251,6 +251,23 @@ Status VertexPropertyWriter::WriteChunk( return Status::OK(); } +Status VertexPropertyWriter::WriteLabelChunk( + const std::shared_ptr& input_table, IdType chunk_index, + FileType file_type, ValidateLevel validate_level) const { + auto schema = input_table->schema(); + std::vector indices; + for (int i = 0; i < schema->num_fields(); i++) { + indices.push_back(i); + } + + GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto in_table, + input_table->SelectColumns(indices)); + std::string suffix = + vertex_info_->GetPrefix() + "labels/chunk" + std::to_string(chunk_index); + std::string path = prefix_ + suffix; + return fs_->WriteTableToFile(input_table, file_type, path); +} + Status VertexPropertyWriter::WriteTable( const std::shared_ptr& input_table, const std::shared_ptr& property_group, @@ -287,9 +304,116 @@ Status VertexPropertyWriter::WriteTable( GAR_RETURN_NOT_OK(WriteTable(table_with_index, property_group, start_chunk_index, validate_level)); } + auto labels = vertex_info_->GetLabels(); + if (!labels.empty()) { + GAR_ASSIGN_OR_RAISE(auto label_table, GetLabelTable(input_table, labels)) + // WARNING!!! WARNING!!! WARNING!!! This is using for experiments + // GAR_ASSIGN_OR_RAISE(auto label_table, GetLabelTableAndRandomlyAddLabels + // (input_table, labels)) + GAR_RETURN_NOT_OK(WriteLabelTable(label_table, start_chunk_index, + FileType::PARQUET, validate_level)); + } + + return Status::OK(); +} + +// Helper function to split a string by a delimiter +std::vector SplitString(const std::string& str, char delimiter) { + std::vector tokens; + std::string token; + std::istringstream tokenStream(str); + while (std::getline(tokenStream, token, delimiter)) { + tokens.push_back(token); + } + return tokens; +} + +Status VertexPropertyWriter::WriteLabelTable( + const std::shared_ptr& input_table, IdType start_chunk_index, + FileType file_type, ValidateLevel validate_level) const { + auto schema = input_table->schema(); + int indice = schema->GetFieldIndex(GeneralParams::kVertexIndexCol); + IdType chunk_size = vertex_info_->GetChunkSize(); + int64_t length = input_table->num_rows(); + IdType chunk_index = start_chunk_index; + for (int64_t offset = 0; offset < length; + offset += chunk_size, chunk_index++) { + auto in_chunk = input_table->Slice(offset, chunk_size); + GAR_RETURN_NOT_OK( + WriteLabelChunk(in_chunk, chunk_index, file_type, validate_level)); + } return Status::OK(); } +Result> VertexPropertyWriter::GetLabelTable( + const std::shared_ptr& input_table, + const std::vector& labels) const { + // Find the label column index + auto label_col_idx = + input_table->schema()->GetFieldIndex(GeneralParams::kLabelCol); + if (label_col_idx == -1) { + return Status::KeyError("label column not found in the input table."); + } + + // Create a matrix of booleans with dimensions [number of rows, number of + // labels] + std::vector> bool_matrix( + input_table->num_rows(), std::vector(labels.size(), false)); + + // Create a map for labels to column indices + std::unordered_map label_to_index; + for (size_t i = 0; i < labels.size(); ++i) { + label_to_index[labels[i]] = i; + } + + int row_offset = 0; // Offset for where to fill the bool_matrix + // Iterate through each chunk of the :LABEL column + for (int64_t chunk_idx = 0; + chunk_idx < input_table->column(label_col_idx)->num_chunks(); + ++chunk_idx) { + auto chunk = input_table->column(label_col_idx)->chunk(chunk_idx); + auto label_column = std::static_pointer_cast(chunk); + + // Populate the matrix based on :LABEL column values + for (int64_t row = 0; row < label_column->length(); ++row) { + if (label_column->IsValid(row)) { + std::string labels_string = label_column->GetString(row); + auto row_labels = SplitString(labels_string, ';'); + for (const auto& lbl : row_labels) { + if (label_to_index.find(lbl) != label_to_index.end()) { + bool_matrix[row_offset + row][label_to_index[lbl]] = true; + } + } + } + } + + row_offset += + label_column->length(); // Update the row offset for the next chunk + } + + // Create Arrow arrays for each label column + arrow::FieldVector fields; + arrow::ArrayVector arrays; + + for (const auto& label : labels) { + arrow::BooleanBuilder builder; + for (const auto& row : bool_matrix) { + builder.Append(row[label_to_index[label]]); + } + + std::shared_ptr array; + builder.Finish(&array); + fields.push_back(arrow::field(label, arrow::boolean())); + arrays.push_back(array); + } + + // Create the Arrow Table with the boolean columns + auto schema = std::make_shared(fields); + auto result_table = arrow::Table::Make(schema, arrays); + + return result_table; +} + Result> VertexPropertyWriter::Make( const std::shared_ptr& vertex_info, const std::string& prefix, const ValidateLevel& validate_level) { diff --git a/cpp/src/graphar/arrow/chunk_writer.h b/cpp/src/graphar/arrow/chunk_writer.h index 65a8813ef..23c7f415f 100644 --- a/cpp/src/graphar/arrow/chunk_writer.h +++ b/cpp/src/graphar/arrow/chunk_writer.h @@ -117,6 +117,21 @@ class VertexPropertyWriter { const std::shared_ptr& property_group, IdType chunk_index, ValidateLevel validate_level = ValidateLevel::default_validate) const; + /** + * @brief Write all labels of a single vertex chunk + * to corresponding files. + * + * @param input_table The table containing data. + * @param chunk_index The index of the vertex chunk. + * @param validate_level The validate level for this operation, + * which is the writer's validate level by default. + * @return Status: ok or error. + */ + Status WriteLabelChunk( + const std::shared_ptr& input_table, IdType chunk_index, + FileType file_type, + ValidateLevel validate_level = ValidateLevel::default_validate) const; + /** * @brief Write all property groups of a single vertex chunk * to corresponding files. @@ -163,6 +178,35 @@ class VertexPropertyWriter { IdType start_chunk_index, ValidateLevel validate_level = ValidateLevel::default_validate) const; + /** + * @brief Write all labels for multiple vertex chunks + * to corresponding files. + * + * @param input_table The table containing data. + * @param start_chunk_index The start index of the vertex chunks. + * @param validate_level The validate level for this operation, + * which is the writer's validate level by default. + * @return Status: ok or error. + */ + Status WriteLabelTable( + const std::shared_ptr& input_table, + IdType start_chunk_index, FileType file_type, + ValidateLevel validate_level = ValidateLevel::default_validate) const; + + /** + * @brief Get label column from table to formulate label table + * @param input_table The table containing data. + * @param labels The labels. + * @return The table only containing label columns + * */ + Result> GetLabelTable( + const std::shared_ptr& input_table, + const std::vector& labels) const; + + Result> GetLabelTableAndRandomlyAddLabels( + const std::shared_ptr& input_table, + const std::vector& labels) const; + /** * @brief Construct a VertexPropertyWriter from vertex info. * diff --git a/cpp/src/graphar/filesystem.cc b/cpp/src/graphar/filesystem.cc index 62b9ab2e8..d83d4855d 100644 --- a/cpp/src/graphar/filesystem.cc +++ b/cpp/src/graphar/filesystem.cc @@ -233,8 +233,13 @@ Status FileSystem::WriteTableToFile(const std::shared_ptr& table, break; } case FileType::PARQUET: { + auto schema = table->schema(); + auto column_num = schema->num_fields(); parquet::WriterProperties::Builder builder; builder.compression(arrow::Compression::type::ZSTD); // enable compression + for (int i = 0; i < column_num; ++i) { + builder.encoding(schema->field(i)->name(), parquet::Encoding::RLE); + } RETURN_NOT_ARROW_OK(parquet::arrow::WriteTable( *table, arrow::default_memory_pool(), output_stream, 64 * 1024 * 1024, builder.build(), parquet::default_arrow_writer_properties())); diff --git a/cpp/src/graphar/fwd.h b/cpp/src/graphar/fwd.h index 0c0190399..236f07986 100644 --- a/cpp/src/graphar/fwd.h +++ b/cpp/src/graphar/fwd.h @@ -133,7 +133,8 @@ std::shared_ptr CreateAdjacentList( */ std::shared_ptr CreateVertexInfo( const std::string& type, IdType chunk_size, - const PropertyGroupVector& property_groups, const std::string& prefix = "", + const PropertyGroupVector& property_groups, + const std::vector& labels = {}, const std::string& prefix = "", std::shared_ptr version = nullptr); /** @@ -167,6 +168,7 @@ std::shared_ptr CreateEdgeInfo( * @param name The name of the graph * @param vertex_infos The vertex info vector of the graph * @param edge_infos The edge info vector of the graph + * @param labels The vertex labels of the graph. * @param prefix The absolute path prefix to store chunk files of the graph. * Defaults to "./" * @param version The version of the graph info @@ -175,7 +177,8 @@ std::shared_ptr CreateEdgeInfo( */ std::shared_ptr CreateGraphInfo( const std::string& name, const VertexInfoVector& vertex_infos, - const EdgeInfoVector& edge_infos, const std::string& prefix, + const EdgeInfoVector& edge_infos, const std::vector& labels, + const std::string& prefix, std::shared_ptr version = nullptr, const std::unordered_map& extra_info = {}); diff --git a/cpp/src/graphar/general_params.h b/cpp/src/graphar/general_params.h index 55db90fb9..da9ed7acf 100644 --- a/cpp/src/graphar/general_params.h +++ b/cpp/src/graphar/general_params.h @@ -27,6 +27,7 @@ struct GeneralParams { static constexpr const char* kDstIndexCol = "_graphArDstIndex"; static constexpr const char* kOffsetCol = "_graphArOffset"; static constexpr const char* kPrimaryCol = "_graphArPrimary"; + static constexpr const char* kLabelCol = ":LABEL"; }; } // namespace graphar diff --git a/cpp/src/graphar/graph_info.cc b/cpp/src/graphar/graph_info.cc index 8c8aa3c20..81d1bba70 100644 --- a/cpp/src/graphar/graph_info.cc +++ b/cpp/src/graphar/graph_info.cc @@ -191,10 +191,12 @@ class VertexInfo::Impl { public: Impl(const std::string& type, IdType chunk_size, const std::string& prefix, const PropertyGroupVector& property_groups, + const std::vector& labels, std::shared_ptr version) : type_(type), chunk_size_(chunk_size), property_groups_(std::move(property_groups)), + labels_(labels), prefix_(prefix), version_(std::move(version)) { if (prefix_.empty()) { @@ -241,6 +243,7 @@ class VertexInfo::Impl { std::string type_; IdType chunk_size_; PropertyGroupVector property_groups_; + std::vector labels_; std::string prefix_; std::shared_ptr version_; std::unordered_map property_name_to_index_; @@ -252,9 +255,11 @@ class VertexInfo::Impl { VertexInfo::VertexInfo(const std::string& type, IdType chunk_size, const PropertyGroupVector& property_groups, + const std::vector& labels, const std::string& prefix, std::shared_ptr version) - : impl_(new Impl(type, chunk_size, prefix, property_groups, version)) {} + : impl_(new Impl(type, chunk_size, prefix, property_groups, labels, + version)) {} VertexInfo::~VertexInfo() = default; @@ -264,6 +269,10 @@ IdType VertexInfo::GetChunkSize() const { return impl_->chunk_size_; } const std::string& VertexInfo::GetPrefix() const { return impl_->prefix_; } +const std::vector& VertexInfo::GetLabels() const { + return impl_->labels_; +} + const std::shared_ptr& VertexInfo::version() const { return impl_->version_; } @@ -367,21 +376,22 @@ Result> VertexInfo::AddPropertyGroup( } return std::make_shared( impl_->type_, impl_->chunk_size_, - AddVectorElement(impl_->property_groups_, property_group), impl_->prefix_, - impl_->version_); + AddVectorElement(impl_->property_groups_, property_group), impl_->labels_, + impl_->prefix_, impl_->version_); } bool VertexInfo::IsValidated() const { return impl_->is_validated(); } std::shared_ptr CreateVertexInfo( const std::string& type, IdType chunk_size, - const PropertyGroupVector& property_groups, const std::string& prefix, + const PropertyGroupVector& property_groups, + const std::vector& labels, const std::string& prefix, std::shared_ptr version) { if (type.empty() || chunk_size <= 0) { return nullptr; } - return std::make_shared(type, chunk_size, property_groups, prefix, - version); + return std::make_shared(type, chunk_size, property_groups, labels, + prefix, version); } Result> VertexInfo::Load( @@ -396,6 +406,13 @@ Result> VertexInfo::Load( if (!yaml->operator[]("prefix").IsNone()) { prefix = yaml->operator[]("prefix").As(); } + std::vector labels; + const auto& labels_node = yaml->operator[]("labels"); + if (labels_node.IsSequence()) { + for (auto it = labels_node.Begin(); it != labels_node.End(); it++) { + labels.push_back((*it).second.As()); + } + } std::shared_ptr version = nullptr; if (!yaml->operator[]("version").IsNone()) { GAR_ASSIGN_OR_RAISE( @@ -430,8 +447,8 @@ Result> VertexInfo::Load( std::make_shared(property_vec, file_type, pg_prefix)); } } - return std::make_shared(type, chunk_size, property_groups, prefix, - version); + return std::make_shared(type, chunk_size, property_groups, labels, + prefix, version); } Result> VertexInfo::Load(const std::string& input) { @@ -449,6 +466,13 @@ Result VertexInfo::Dump() const noexcept { node["type"] = impl_->type_; node["chunk_size"] = std::to_string(impl_->chunk_size_); node["prefix"] = impl_->prefix_; + if (impl_->labels_.size() > 0) { + node["labels"]; + for (const auto& label : impl_->labels_) { + node["labels"].PushBack(); + node["labels"][node["labels"].Size() - 1] = label; + } + } for (const auto& pg : impl_->property_groups_) { ::Yaml::Node pg_node; if (!pg->GetPrefix().empty()) { @@ -1042,8 +1066,18 @@ static Result> ConstructGraphInfo( edge_infos.push_back(edge_info); } } - return std::make_shared(name, vertex_infos, edge_infos, prefix, - version, extra_info); + + std::vector labels; + if (!graph_meta->operator[]("labels").IsNone()) { + const auto& labels_node = graph_meta->operator[]("labels"); + if (labels_node.IsSequence()) { + for (auto it = labels_node.Begin(); it != labels_node.End(); it++) { + labels.push_back((*it).second.As()); + } + } + } + return std::make_shared(name, vertex_infos, edge_infos, labels, + prefix, version, extra_info); } } // namespace @@ -1051,12 +1085,13 @@ static Result> ConstructGraphInfo( class GraphInfo::Impl { public: Impl(const std::string& graph_name, VertexInfoVector vertex_infos, - EdgeInfoVector edge_infos, const std::string& prefix, - std::shared_ptr version, + EdgeInfoVector edge_infos, const std::vector& labels, + const std::string& prefix, std::shared_ptr version, const std::unordered_map& extra_info) : name_(graph_name), vertex_infos_(std::move(vertex_infos)), edge_infos_(std::move(edge_infos)), + labels_(labels), prefix_(prefix), version_(std::move(version)), extra_info_(extra_info) { @@ -1099,6 +1134,7 @@ class GraphInfo::Impl { std::string name_; VertexInfoVector vertex_infos_; EdgeInfoVector edge_infos_; + std::vector labels_; std::string prefix_; std::shared_ptr version_; std::unordered_map extra_info_; @@ -1108,16 +1144,20 @@ class GraphInfo::Impl { GraphInfo::GraphInfo( const std::string& graph_name, VertexInfoVector vertex_infos, - EdgeInfoVector edge_infos, const std::string& prefix, - std::shared_ptr version, + EdgeInfoVector edge_infos, const std::vector& labels, + const std::string& prefix, std::shared_ptr version, const std::unordered_map& extra_info) : impl_(new Impl(graph_name, std::move(vertex_infos), std::move(edge_infos), - prefix, version, extra_info)) {} + labels, prefix, version, extra_info)) {} GraphInfo::~GraphInfo() = default; const std::string& GraphInfo::GetName() const { return impl_->name_; } +const std::vector& GraphInfo::GetLabels() const { + return impl_->labels_; +} + const std::string& GraphInfo::GetPrefix() const { return impl_->prefix_; } const std::shared_ptr& GraphInfo::version() const { @@ -1196,7 +1236,7 @@ Result> GraphInfo::AddVertex( } return std::make_shared( impl_->name_, AddVectorElement(impl_->vertex_infos_, vertex_info), - impl_->edge_infos_, impl_->prefix_, impl_->version_); + impl_->edge_infos_, impl_->labels_, impl_->prefix_, impl_->version_); } Result> GraphInfo::AddEdge( @@ -1210,20 +1250,20 @@ Result> GraphInfo::AddEdge( } return std::make_shared( impl_->name_, impl_->vertex_infos_, - AddVectorElement(impl_->edge_infos_, edge_info), impl_->prefix_, - impl_->version_); + AddVectorElement(impl_->edge_infos_, edge_info), impl_->labels_, + impl_->prefix_, impl_->version_); } std::shared_ptr CreateGraphInfo( const std::string& name, const VertexInfoVector& vertex_infos, - const EdgeInfoVector& edge_infos, const std::string& prefix, - std::shared_ptr version, + const EdgeInfoVector& edge_infos, const std::vector& labels, + const std::string& prefix, std::shared_ptr version, const std::unordered_map& extra_info) { if (name.empty()) { return nullptr; } - return std::make_shared(name, vertex_infos, edge_infos, prefix, - version, extra_info); + return std::make_shared(name, vertex_infos, edge_infos, labels, + prefix, version, extra_info); } Result> GraphInfo::Load(const std::string& path) { @@ -1275,6 +1315,13 @@ Result GraphInfo::Dump() const { edge->GetDstType()) + ".edge.yaml"; } + if (impl_->labels_.size() > 0) { + node["labels"]; + for (const auto& label : impl_->labels_) { + node["labels"].PushBack(); + node["labels"][node["labels"].Size() - 1] = label; + } + } if (impl_->version_ != nullptr) { node["version"] = impl_->version_->ToString(); } diff --git a/cpp/src/graphar/graph_info.h b/cpp/src/graphar/graph_info.h index f6d086966..b2b372b0c 100644 --- a/cpp/src/graphar/graph_info.h +++ b/cpp/src/graphar/graph_info.h @@ -180,12 +180,14 @@ class VertexInfo { * @param type The type of the vertex. * @param chunk_size The number of vertices in each vertex chunk. * @param property_groups The property group vector of the vertex. + * @param labels The labels of the vertex. * @param prefix The prefix of the vertex info. If left empty, the default * prefix will be set to the type of the vertex. * @param version The format version of the vertex info. */ explicit VertexInfo(const std::string& type, IdType chunk_size, const PropertyGroupVector& property_groups, + const std::vector& labels = {}, const std::string& prefix = "", std::shared_ptr version = nullptr); @@ -227,6 +229,12 @@ class VertexInfo { */ const std::shared_ptr& version() const; + /** + * Get the labels of the vertex. + * @return The labels of the vertex. + */ + const std::vector& GetLabels() const; + /** * Get the number of property groups of the vertex. * @@ -694,6 +702,7 @@ class GraphInfo { * @param graph_name The name of the graph. * @param vertex_infos The vertex info vector of the graph. * @param edge_infos The edge info vector of the graph. + * @param labels The vertex labels of the graph. * @param prefix The absolute path prefix to store chunk files of the graph. * Defaults to "./". * @param version The version of the graph info. @@ -701,7 +710,8 @@ class GraphInfo { */ explicit GraphInfo( const std::string& graph_name, VertexInfoVector vertex_infos, - EdgeInfoVector edge_infos, const std::string& prefix = "./", + EdgeInfoVector edge_infos, const std::vector& labels = {}, + const std::string& prefix = "./", std::shared_ptr version = nullptr, const std::unordered_map& extra_info = {}); @@ -753,6 +763,12 @@ class GraphInfo { */ const std::string& GetName() const; + /** + * @brief Get the vertex labels of the graph. + * @return The vertex labels of the graph. + */ + const std::vector& GetLabels() const; + /** * @brief Get the absolute path prefix of the chunk files. * @return The absolute path prefix of the chunk files. diff --git a/cpp/src/graphar/util.cc b/cpp/src/graphar/util.cc index e477188d6..7a3b1a829 100644 --- a/cpp/src/graphar/util.cc +++ b/cpp/src/graphar/util.cc @@ -86,6 +86,9 @@ Result GetArrowArrayData( } else if (array->type()->Equals(arrow::null())) { return reinterpret_cast( std::dynamic_pointer_cast(array).get()); + } else if (array->type()->Equals(arrow::boolean())) { + return reinterpret_cast( + std::dynamic_pointer_cast(array).get()); } else { return Status::TypeError("Array type - ", array->type()->ToString(), " is not supported yet..."); diff --git a/cpp/test/test_info.cc b/cpp/test/test_info.cc index c0349ee03..82d839e3a 100644 --- a/cpp/test/test_info.cc +++ b/cpp/test/test_info.cc @@ -183,7 +183,7 @@ TEST_CASE_METHOD(GlobalFixture, "VertexInfo") { {Property("p0", int32(), true), Property("p1", string(), false)}, FileType::CSV, "p0_p1/"); auto vertex_info = - CreateVertexInfo(type, chunk_size, {pg}, "test_vertex", version); + CreateVertexInfo(type, chunk_size, {pg}, {}, "test_vertex", version); SECTION("Basics") { REQUIRE(vertex_info->GetType() == type); @@ -224,24 +224,25 @@ TEST_CASE_METHOD(GlobalFixture, "VertexInfo") { auto invalid_pg = CreatePropertyGroup({Property("p0", nullptr, true)}, FileType::CSV); auto invalid_vertex_info0 = CreateVertexInfo(type, chunk_size, {invalid_pg}, - "test_vertex/", version); + {}, "test_vertex/", version); REQUIRE(invalid_vertex_info0->IsValidated() == false); - VertexInfo invalid_vertex_info1("", chunk_size, {pg}, "test_vertex/", + VertexInfo invalid_vertex_info1("", chunk_size, {pg}, {}, "test_vertex/", version); REQUIRE(invalid_vertex_info1.IsValidated() == false); - VertexInfo invalid_vertex_info2(type, 0, {pg}, "test_vertex/", version); + VertexInfo invalid_vertex_info2(type, 0, {pg}, {}, "test_vertex/", version); REQUIRE(invalid_vertex_info2.IsValidated() == false); // check if prefix empty auto vertex_info_empty_prefix = - CreateVertexInfo(type, chunk_size, {pg}, "", version); + CreateVertexInfo(type, chunk_size, {pg}, {}, "", version); REQUIRE(vertex_info_empty_prefix->IsValidated() == true); } SECTION("CreateVertexInfo") { - auto vertex_info3 = CreateVertexInfo("", chunk_size, {pg}, "test_vertex/"); + auto vertex_info3 = + CreateVertexInfo("", chunk_size, {pg}, {}, "test_vertex/"); REQUIRE(vertex_info3 == nullptr); - auto vertex_info4 = CreateVertexInfo(type, 0, {pg}, "test_vertex/"); + auto vertex_info4 = CreateVertexInfo(type, 0, {pg}, {}, "test_vertex/"); REQUIRE(vertex_info4 == nullptr); } @@ -267,7 +268,7 @@ version: gar/v1 )"; REQUIRE(dump_result.value() == expected); auto vertex_info_empty_version = - CreateVertexInfo(type, chunk_size, {pg}, "test_vertex/"); + CreateVertexInfo(type, chunk_size, {pg}, {}, "test_vertex/"); REQUIRE(vertex_info_empty_version->Dump().status().ok()); } @@ -521,7 +522,7 @@ TEST_CASE_METHOD(GlobalFixture, "GraphInfo") { {Property("p0", int32(), true), Property("p1", string(), false)}, FileType::CSV, "p0_p1/"); auto vertex_info = - CreateVertexInfo("test_vertex", 100, {pg}, "test_vertex/", version); + CreateVertexInfo("test_vertex", 100, {pg}, {}, "test_vertex/", version); std::unordered_map extra_info = { {"category", "test graph"}}; auto edge_info = @@ -529,7 +530,7 @@ TEST_CASE_METHOD(GlobalFixture, "GraphInfo") { {CreateAdjacentList(AdjListType::ordered_by_source, FileType::CSV, "adj_list/")}, {pg}, "test_edge/", version); - auto graph_info = CreateGraphInfo(name, {vertex_info}, {edge_info}, + auto graph_info = CreateGraphInfo(name, {vertex_info}, {edge_info}, {}, "test_graph/", version, extra_info); SECTION("Basics") { @@ -544,7 +545,7 @@ TEST_CASE_METHOD(GlobalFixture, "GraphInfo") { SECTION("ExtraInfo") { auto graph_info_with_extra_info = - CreateGraphInfo(name, {vertex_info}, {edge_info}, "test_graph/", + CreateGraphInfo(name, {vertex_info}, {edge_info}, {}, "test_graph/", version, {{"key1", "value1"}, {"key2", "value2"}}); const auto& extra_info = graph_info_with_extra_info->GetExtraInfo(); REQUIRE(extra_info.size() == 2); @@ -580,9 +581,9 @@ TEST_CASE_METHOD(GlobalFixture, "GraphInfo") { SECTION("IsValidated") { REQUIRE(graph_info->IsValidated() == true); auto invalid_vertex_info = - CreateVertexInfo("", 100, {pg}, "test_vertex/", version); + CreateVertexInfo("", 100, {pg}, {}, "test_vertex/", version); auto invalid_graph_info0 = CreateGraphInfo( - name, {invalid_vertex_info}, {edge_info}, "test_graph/", version); + name, {invalid_vertex_info}, {edge_info}, {}, "test_graph/", version); REQUIRE(invalid_graph_info0->IsValidated() == false); auto invalid_edge_info = CreateEdgeInfo("", "knows", "person", 1024, 100, 100, true, @@ -590,23 +591,23 @@ TEST_CASE_METHOD(GlobalFixture, "GraphInfo") { FileType::CSV, "adj_list/")}, {pg}, "test_edge/", version); auto invalid_graph_info1 = CreateGraphInfo( - name, {vertex_info}, {invalid_edge_info}, "test_graph/", version); + name, {vertex_info}, {invalid_edge_info}, {}, "test_graph/", version); REQUIRE(invalid_graph_info1->IsValidated() == false); - GraphInfo invalid_graph_info2("", {vertex_info}, {edge_info}, "test_graph/", - version); + GraphInfo invalid_graph_info2("", {vertex_info}, {edge_info}, {}, + "test_graph/", version); REQUIRE(invalid_graph_info2.IsValidated() == false); - GraphInfo invalid_graph_info3(name, {vertex_info}, {edge_info}, "", + GraphInfo invalid_graph_info3(name, {vertex_info}, {edge_info}, {}, "", version); REQUIRE(invalid_graph_info3.IsValidated() == false); // check if prefix empty, graph_info with empty prefix is invalid auto graph_info_with_empty_prefix = - CreateGraphInfo(name, {vertex_info}, {edge_info}, "", version); + CreateGraphInfo(name, {vertex_info}, {edge_info}, {}, "", version); REQUIRE(graph_info_with_empty_prefix->IsValidated() == false); } SECTION("CreateGraphInfo") { auto graph_info_empty_name = - CreateGraphInfo("", {vertex_info}, {edge_info}, "test_graph/"); + CreateGraphInfo("", {vertex_info}, {edge_info}, {}, "test_graph/"); REQUIRE(graph_info_empty_name == nullptr); } @@ -626,7 +627,7 @@ version: gar/v1 )"; REQUIRE(dump_result.value() == expected); auto graph_info_empty_version = - CreateGraphInfo(name, {vertex_info}, {edge_info}, "test_graph/"); + CreateGraphInfo(name, {vertex_info}, {edge_info}, {}, "test_graph/"); REQUIRE(graph_info_empty_version->Dump().status().ok()); } @@ -638,8 +639,8 @@ version: gar/v1 } SECTION("AddVertex") { - auto vertex_info2 = - CreateVertexInfo("test_vertex2", 100, {pg}, "test_vertex2/", version); + auto vertex_info2 = CreateVertexInfo("test_vertex2", 100, {pg}, {}, + "test_vertex2/", version); auto maybe_extend_info = graph_info->AddVertex(vertex_info2); REQUIRE(maybe_extend_info.status().ok()); auto extend_info = maybe_extend_info.value(); @@ -778,25 +779,25 @@ version: gar/v1 /* TODO(acezen): need to mock S3 server to test this case, this private service is not available for public access. - -TEST_CASE_METHOD(GlobalFixture, "LoadFromS3") { - // explicitly call InitS3 to initialize S3 APIs before using - // S3 file system. - InitializeS3(); - std::string path = - "s3://graphscope/graphar/ldbc/ldbc.graph.yml" - "?endpoint_override=graphscope.oss-cn-beijing.aliyuncs.com"; - auto graph_info_result = GraphInfo::Load(path); - std::cout << graph_info_result.status().message() << std::endl; - REQUIRE(!graph_info_result.has_error()); - auto graph_info = graph_info_result.value(); - REQUIRE(graph_info->GetName() == "ldbc"); - const auto& vertex_infos = graph_info->GetVertexInfos(); - const auto& edge_infos = graph_info->GetEdgeInfos(); - REQUIRE(vertex_infos.size() == 8); - REQUIRE(edge_infos.size() == 23); - // explicitly call FinalizeS3 to avoid memory leak - FinalizeS3(); -} */ +// TEST_CASE_METHOD(GlobalFixture, "LoadFromS3") { +// // explicitly call InitS3 to initialize S3 APIs before using +// // S3 file system. +// InitializeS3(); +// std::string path = +// "s3://graphar/ldbc/ldbc.graph.yml" +// "?endpoint_override=graphscope.oss-cn-beijing.aliyuncs.com"; +// auto graph_info_result = GraphInfo::Load(path); +// std::cout << graph_info_result.status().message() << std::endl; +// REQUIRE(!graph_info_result.has_error()); +// auto graph_info = graph_info_result.value(); +// REQUIRE(graph_info->GetName() == "ldbc"); +// const auto& vertex_infos = graph_info->GetVertexInfos(); +// const auto& edge_infos = graph_info->GetEdgeInfos(); +// REQUIRE(vertex_infos.size() == 8); +// REQUIRE(edge_infos.size() == 23); +// // explicitly call FinalizeS3 to avoid memory leak +// FinalizeS3(); +// } + } // namespace graphar diff --git a/cpp/test/test_multi_label.cc b/cpp/test/test_multi_label.cc new file mode 100644 index 000000000..111b3620d --- /dev/null +++ b/cpp/test/test_multi_label.cc @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "arrow/api.h" +#include "arrow/csv/api.h" +#include "arrow/filesystem/api.h" +#include "arrow/io/api.h" +#include "arrow/stl.h" +#include "arrow/util/uri.h" +#include "parquet/arrow/writer.h" + +#include "./util.h" +#include "graphar/api/high_level_writer.h" + +#include + +std::shared_ptr read_csv_to_table(const std::string& filename) { + arrow::csv::ReadOptions read_options{}; + arrow::csv::ParseOptions parse_options{}; + arrow::csv::ConvertOptions convert_options{}; + + parse_options.delimiter = '|'; + + auto input = + arrow::io::ReadableFile::Open(filename, arrow::default_memory_pool()) + .ValueOrDie(); + + auto reader = arrow::csv::TableReader::Make(arrow::io::default_io_context(), + input, read_options, + parse_options, convert_options) + .ValueOrDie(); + + std::shared_ptr table; + table = reader->Read().ValueOrDie(); + + return table; +} + +namespace graphar { +TEST_CASE_METHOD(GlobalFixture, "test_multi_label_builder") { + std::cout << "Test multi label builder" << std::endl; + + // construct graph information from file + std::string path = test_data_dir + "/ldbc/parquet/" + "ldbc.graph.yml"; + auto graph_info = graphar::GraphInfo::Load(path).value(); + auto vertex_info = graph_info->GetVertexInfo("organisation"); + + auto labels = vertex_info->GetLabels(); + + std::unordered_map code; + + std::vector> label_column_data; + + // read labels csv file as arrow table + auto table = read_csv_to_table(test_data_dir + + "/ldbc/organisation_0_0.csv"); + std::string table_message = table->ToString(); + + auto schema = table->schema(); + std::cout << schema->ToString() << std::endl; + + // write arrow table as chunk parquet + auto maybe_writer = + VertexPropertyWriter::Make(vertex_info, test_data_dir + "/ldbc/parquet/"); + REQUIRE(!maybe_writer.has_error()); + auto writer = maybe_writer.value(); + REQUIRE(writer->WriteTable(table, 0).ok()); + REQUIRE(writer->WriteVerticesNum(table->num_rows()).ok()); +} +} // namespace graphar