Skip to content

Commit

Permalink
feat(c++): write label chunks
Browse files Browse the repository at this point in the history
  • Loading branch information
Elssky committed Sep 25, 2024
1 parent e265e0e commit 3f56896
Show file tree
Hide file tree
Showing 16 changed files with 508 additions and 75 deletions.
1 change: 1 addition & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,7 @@ if (BUILD_TESTS)
add_test(test_chunk_info_reader SRCS test/test_chunk_info_reader.cc)
add_test(test_arrow_chunk_reader SRCS test/test_arrow_chunk_reader.cc)
add_test(test_graph SRCS test/test_graph.cc)
add_test(test_multi_label SRCS test/test_multi_label.cc)

# enable_testing()
endif()
Expand Down
2 changes: 1 addition & 1 deletion cpp/examples/bgl_example.cc
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ int main(int argc, char* argv[]) {
int chunk_size = 100;
auto version = graphar::InfoVersion::Parse("gar/v1").value();
auto new_info = graphar::CreateVertexInfo(vertex_type, chunk_size, {group},
vertex_prefix, version);
{}, vertex_prefix, version);
// dump new vertex info
ASSERT(new_info->IsValidated());
ASSERT(new_info->Dump().status().ok());
Expand Down
4 changes: 2 additions & 2 deletions cpp/examples/construct_info_example.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ int main(int argc, char* argv[]) {
graphar::CreatePropertyGroup(property_vector_2, graphar::FileType::ORC);

// create vertex info
auto vertex_info = graphar::CreateVertexInfo(type, chunk_size, {group1},
auto vertex_info = graphar::CreateVertexInfo(type, chunk_size, {group1}, {},
vertex_prefix, version);

ASSERT(vertex_info != nullptr);
Expand Down Expand Up @@ -150,7 +150,7 @@ int main(int argc, char* argv[]) {

// create graph info
auto graph_info = graphar::CreateGraphInfo(name, {vertex_info}, {edge_info},
prefix, version);
{}, prefix, version);
ASSERT(graph_info->GetName() == name);
ASSERT(graph_info->GetPrefix() == prefix);
ASSERT(graph_info->GetVertexInfos().size() == 1);
Expand Down
6 changes: 3 additions & 3 deletions cpp/examples/snap_dataset_to_graphar.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ int main(int argc, char* argv[]) {
std::string type = "node", vertex_prefix = "vertex/node/";

// create vertex info
auto vertex_info = graphar::CreateVertexInfo(type, VERTEX_CHUNK_SIZE, {},
auto vertex_info = graphar::CreateVertexInfo(type, VERTEX_CHUNK_SIZE, {}, {},
vertex_prefix, version);

// save & dump
Expand All @@ -75,8 +75,8 @@ int main(int argc, char* argv[]) {

/*------------------construct graph info------------------*/
// create graph info
auto graph_info = graphar::CreateGraphInfo(graph_name, {vertex_info},
{edge_info}, save_path, version);
auto graph_info = graphar::CreateGraphInfo(
graph_name, {vertex_info}, {edge_info}, {}, save_path, version);
// save & dump
ASSERT(!graph_info->Dump().has_error());
ASSERT(graph_info->Save(save_path + graph_name + ".graph.yml").ok());
Expand Down
63 changes: 63 additions & 0 deletions cpp/src/graphar/arrow/chunk_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,18 @@ Result<std::shared_ptr<arrow::Schema>> PropertyGroupToSchema(
return arrow::schema(fields);
}

Result<std::shared_ptr<arrow::Schema>> LabelToSchema(
std::vector<std::string> labels, bool contain_index_column = false) {
std::vector<std::shared_ptr<arrow::Field>> fields;
if (contain_index_column) {
fields.push_back(std::make_shared<arrow::Field>(
GeneralParams::kVertexIndexCol, arrow::int64()));
}
for (const auto& lab : labels) {
fields.push_back(std::make_shared<arrow::Field>(lab, arrow::boolean()));
}
return arrow::schema(fields);
}
Status GeneralCast(const std::shared_ptr<arrow::Array>& in,
const std::shared_ptr<arrow::DataType>& to_type,
std::shared_ptr<arrow::Array>* out) {
Expand Down Expand Up @@ -148,6 +160,29 @@ VertexPropertyArrowChunkReader::VertexPropertyArrowChunkReader(
PropertyGroupToSchema(property_group_, true));
}

// initialize for labels
VertexPropertyArrowChunkReader::VertexPropertyArrowChunkReader(
const std::shared_ptr<VertexInfo>& vertex_info,
const std::vector<std::string>& labels, const std::string& prefix,
const util::FilterOptions& options)
: vertex_info_(std::move(vertex_info)),
labels_(labels),
chunk_index_(0),
seek_id_(0),
schema_(nullptr),
chunk_table_(nullptr),
filter_options_(options) {
GAR_ASSIGN_OR_RAISE_ERROR(fs_, FileSystemFromUriOrPath(prefix, &prefix_));

std::string base_dir = prefix_ + vertex_info_->GetPrefix() + "labels/chunk" +
std::to_string(chunk_index_);
GAR_ASSIGN_OR_RAISE_ERROR(chunk_num_,
util::GetVertexChunkNum(prefix_, vertex_info));
GAR_ASSIGN_OR_RAISE_ERROR(vertex_num_,
util::GetVertexNum(prefix_, vertex_info_));
GAR_ASSIGN_OR_RAISE_ERROR(schema_, LabelToSchema(labels));
}

Status VertexPropertyArrowChunkReader::seek(IdType id) {
seek_id_ = id;
IdType pre_chunk_index = chunk_index_;
Expand Down Expand Up @@ -186,6 +221,24 @@ VertexPropertyArrowChunkReader::GetChunk() {
return chunk_table_->Slice(row_offset);
}

Result<std::shared_ptr<arrow::Table>>
VertexPropertyArrowChunkReader::GetLabelChunk() {
FileType filetype = FileType::PARQUET;
if (chunk_table_ == nullptr) {
std::string path = prefix_ + vertex_info_->GetPrefix() + "labels/chunk" +
std::to_string(chunk_index_);
GAR_ASSIGN_OR_RAISE(chunk_table_,
fs_->ReadFileToTable(path, filetype, filter_options_));
// TODO(acezen): filter pushdown doesn't support cast schema now
// if (schema_ != nullptr && filter_options_.filter == nullptr) {
// GAR_RETURN_NOT_OK(
// CastTableWithSchema(chunk_table_, schema_, &chunk_table_));
// }
}
IdType row_offset = seek_id_ - chunk_index_ * vertex_info_->GetChunkSize();
return chunk_table_->Slice(row_offset);
}

Status VertexPropertyArrowChunkReader::next_chunk() {
if (++chunk_index_ >= chunk_num_) {
return Status::IndexError(
Expand Down Expand Up @@ -215,6 +268,16 @@ VertexPropertyArrowChunkReader::Make(
vertex_info, property_group, prefix, options);
}

// Make for labels
Result<std::shared_ptr<VertexPropertyArrowChunkReader>>
VertexPropertyArrowChunkReader::Make(
const std::shared_ptr<VertexInfo>& vertex_info,
const std::vector<std::string>& labels, const std::string& prefix,
const util::FilterOptions& options) {
return std::make_shared<VertexPropertyArrowChunkReader>(vertex_info, labels,
prefix, options);
}

Result<std::shared_ptr<VertexPropertyArrowChunkReader>>
VertexPropertyArrowChunkReader::Make(
const std::shared_ptr<GraphInfo>& graph_info, const std::string& type,
Expand Down
33 changes: 32 additions & 1 deletion cpp/src/graphar/arrow/chunk_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,19 @@ class VertexPropertyArrowChunkReader {
const std::shared_ptr<PropertyGroup>& property_group,
const std::string& prefix, const util::FilterOptions& options = {});

VertexPropertyArrowChunkReader() : vertex_info_(nullptr), prefix_("") {}

/**
* @brief Initialize the VertexPropertyArrowChunkReader.
*
* @param vertex_info The vertex info that describes the vertex type.
* @param labels The labels of the vertex type.
* @param prefix The absolute prefix.
*/
VertexPropertyArrowChunkReader(const std::shared_ptr<VertexInfo>& vertex_info,
const std::vector<std::string>& labels,
const std::string& prefix,
const util::FilterOptions& options = {});
/**
* @brief Sets chunk position indicator for reader by internal vertex id.
* If internal vertex id is not found, will return Status::IndexError
Expand All @@ -67,7 +80,11 @@ class VertexPropertyArrowChunkReader {
* @brief Return the current arrow chunk table of chunk position indicator.
*/
Result<std::shared_ptr<arrow::Table>> GetChunk();

/**
* @brief Return the current arrow label chunk table of chunk position
* indicator.
*/
Result<std::shared_ptr<arrow::Table>> GetLabelChunk();
/**
* @brief Sets chunk position indicator to next chunk.
*
Expand Down Expand Up @@ -109,6 +126,19 @@ class VertexPropertyArrowChunkReader {
const std::shared_ptr<PropertyGroup>& property_group,
const std::string& prefix, const util::FilterOptions& options = {});

/**
* @brief Create a VertexPropertyArrowChunkReader instance from vertex info
* for labels.
*
* @param vertex_info The vertex info.
* @param prefix The absolute prefix of the graph.
* @param options The filter options, default is empty.
*/
static Result<std::shared_ptr<VertexPropertyArrowChunkReader>> Make(
const std::shared_ptr<VertexInfo>& vertex_info,
const std::vector<std::string>& labels, const std::string& prefix,
const util::FilterOptions& options);

/**
* @brief Create a VertexPropertyArrowChunkReader instance from graph info and
* property group.
Expand Down Expand Up @@ -142,6 +172,7 @@ class VertexPropertyArrowChunkReader {
std::shared_ptr<VertexInfo> vertex_info_;
std::shared_ptr<PropertyGroup> property_group_;
std::string prefix_;
std::vector<std::string> labels_;
IdType chunk_index_;
IdType seek_id_;
IdType chunk_num_;
Expand Down
126 changes: 125 additions & 1 deletion cpp/src/graphar/arrow/chunk_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

#include <iostream>
#include <unordered_map>
#include <utility>

#include "arrow/api.h"
Expand Down Expand Up @@ -251,6 +251,23 @@ Status VertexPropertyWriter::WriteChunk(
return Status::OK();
}

Status VertexPropertyWriter::WriteLabelChunk(
const std::shared_ptr<arrow::Table>& input_table, IdType chunk_index,
FileType file_type, ValidateLevel validate_level) const {
auto schema = input_table->schema();
std::vector<int> indices;
for (int i = 0; i < schema->num_fields(); i++) {
indices.push_back(i);
}

GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto in_table,
input_table->SelectColumns(indices));
std::string suffix =
vertex_info_->GetPrefix() + "labels/chunk" + std::to_string(chunk_index);
std::string path = prefix_ + suffix;
return fs_->WriteTableToFile(input_table, file_type, path);
}

Status VertexPropertyWriter::WriteTable(
const std::shared_ptr<arrow::Table>& input_table,
const std::shared_ptr<PropertyGroup>& property_group,
Expand Down Expand Up @@ -287,9 +304,116 @@ Status VertexPropertyWriter::WriteTable(
GAR_RETURN_NOT_OK(WriteTable(table_with_index, property_group,
start_chunk_index, validate_level));
}
auto labels = vertex_info_->GetLabels();
if (!labels.empty()) {
GAR_ASSIGN_OR_RAISE(auto label_table, GetLabelTable(input_table, labels))
// WARNING!!! WARNING!!! WARNING!!! This is using for experiments
// GAR_ASSIGN_OR_RAISE(auto label_table, GetLabelTableAndRandomlyAddLabels
// (input_table, labels))
GAR_RETURN_NOT_OK(WriteLabelTable(label_table, start_chunk_index,
FileType::PARQUET, validate_level));
}

return Status::OK();
}

// Helper function to split a string by a delimiter
std::vector<std::string> SplitString(const std::string& str, char delimiter) {
std::vector<std::string> tokens;
std::string token;
std::istringstream tokenStream(str);
while (std::getline(tokenStream, token, delimiter)) {
tokens.push_back(token);
}
return tokens;
}

Status VertexPropertyWriter::WriteLabelTable(
const std::shared_ptr<arrow::Table>& input_table, IdType start_chunk_index,
FileType file_type, ValidateLevel validate_level) const {
auto schema = input_table->schema();
int indice = schema->GetFieldIndex(GeneralParams::kVertexIndexCol);
IdType chunk_size = vertex_info_->GetChunkSize();
int64_t length = input_table->num_rows();
IdType chunk_index = start_chunk_index;
for (int64_t offset = 0; offset < length;
offset += chunk_size, chunk_index++) {
auto in_chunk = input_table->Slice(offset, chunk_size);
GAR_RETURN_NOT_OK(
WriteLabelChunk(in_chunk, chunk_index, file_type, validate_level));
}
return Status::OK();
}

Result<std::shared_ptr<arrow::Table>> VertexPropertyWriter::GetLabelTable(
const std::shared_ptr<arrow::Table>& input_table,
const std::vector<std::string>& labels) const {
// Find the label column index
auto label_col_idx =
input_table->schema()->GetFieldIndex(GeneralParams::kLabelCol);
if (label_col_idx == -1) {
return Status::KeyError("label column not found in the input table.");
}

// Create a matrix of booleans with dimensions [number of rows, number of
// labels]
std::vector<std::vector<bool>> bool_matrix(
input_table->num_rows(), std::vector<bool>(labels.size(), false));

// Create a map for labels to column indices
std::unordered_map<std::string, int> label_to_index;
for (size_t i = 0; i < labels.size(); ++i) {
label_to_index[labels[i]] = i;
}

int row_offset = 0; // Offset for where to fill the bool_matrix
// Iterate through each chunk of the :LABEL column
for (int64_t chunk_idx = 0;
chunk_idx < input_table->column(label_col_idx)->num_chunks();
++chunk_idx) {
auto chunk = input_table->column(label_col_idx)->chunk(chunk_idx);
auto label_column = std::static_pointer_cast<arrow::StringArray>(chunk);

// Populate the matrix based on :LABEL column values
for (int64_t row = 0; row < label_column->length(); ++row) {
if (label_column->IsValid(row)) {
std::string labels_string = label_column->GetString(row);
auto row_labels = SplitString(labels_string, ';');
for (const auto& lbl : row_labels) {
if (label_to_index.find(lbl) != label_to_index.end()) {
bool_matrix[row_offset + row][label_to_index[lbl]] = true;
}
}
}
}

row_offset +=
label_column->length(); // Update the row offset for the next chunk
}

// Create Arrow arrays for each label column
arrow::FieldVector fields;
arrow::ArrayVector arrays;

for (const auto& label : labels) {
arrow::BooleanBuilder builder;
for (const auto& row : bool_matrix) {
builder.Append(row[label_to_index[label]]);
}

std::shared_ptr<arrow::Array> array;
builder.Finish(&array);
fields.push_back(arrow::field(label, arrow::boolean()));
arrays.push_back(array);
}

// Create the Arrow Table with the boolean columns
auto schema = std::make_shared<arrow::Schema>(fields);
auto result_table = arrow::Table::Make(schema, arrays);

return result_table;
}

Result<std::shared_ptr<VertexPropertyWriter>> VertexPropertyWriter::Make(
const std::shared_ptr<VertexInfo>& vertex_info, const std::string& prefix,
const ValidateLevel& validate_level) {
Expand Down
Loading

0 comments on commit 3f56896

Please sign in to comment.