Skip to content

Commit

Permalink
Feature/parquet tools (#213)
Browse files Browse the repository at this point in the history
* Add parquet-tools


---------

Co-authored-by: Keita Iwabuchi <[email protected]>
  • Loading branch information
KIwabuchi and Keita Iwabuchi authored Jun 21, 2024
1 parent 3c16820 commit a9371e2
Show file tree
Hide file tree
Showing 3 changed files with 248 additions and 48 deletions.
3 changes: 2 additions & 1 deletion tools/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@ if (Arrow_FOUND AND Parquet_FOUND)
target_include_directories(parquet_tools PUBLIC ${Boost_INCLUDE_DIRS})
link_arrow_parquet(parquet_tools)
endif ()
endif ()
endif ()
file(COPY ./parquet_tools_subcmd.json DESTINATION ${CMAKE_CURRENT_BINARY_DIR})
213 changes: 166 additions & 47 deletions tools/parquet_tools.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,16 @@
#include <string>
#include <vector>

#include <arrow/io/file.h>
#include <parquet/stream_writer.h>

#include <ygm/comm.hpp>
#include <ygm/io/csv_parser.hpp>
#include <ygm/io/detail/parquet2json.hpp>
#include <ygm/io/detail/parquet2variant.hpp>
#include <ygm/io/parquet_parser.hpp>
#include <ygm/utility.hpp>

// #define NDEBUG

namespace stdfs = std::filesystem;

struct options_t {
Expand All @@ -27,14 +29,20 @@ struct options_t {
bool variant = false;
bool json = false;
bool read_lines = false;
std::string out_prefix{"output"};
std::string output_file_prefix{"output"};
};

// Subcommand names that main() compares against the value given to the -c
// option when choosing which operation to run.
static constexpr char const* const ROWCOUNT = "rowcount";
static constexpr char const* const SCHEMA = "schema";
static constexpr char const* const DUMP = "dump";
static constexpr char const* const CONVERT = "convert";

bool parse_arguments(int argc, char** argv, options_t&, bool&);
template <typename os_t>
void show_usage(os_t&);
void show_usage(char** argv, os_t&);
void count_rows(const options_t&, ygm::comm&);
void dump(const options_t&, ygm::comm&);
void convert(const options_t&, ygm::comm&);

int main(int argc, char** argv) {
ygm::comm world(&argc, &argv);
Expand All @@ -43,21 +51,23 @@ int main(int argc, char** argv) {
bool show_help = false;
if (!parse_arguments(argc, argv, opt, show_help)) {
world.cerr0() << "Invalid arguments." << std::endl;
show_usage(world.cerr0());
if (world.rank0()) show_usage(argv, std::cerr);
}
if (show_help) {
show_usage(world.cout0());
if (world.rank0()) show_usage(argv, std::cout);
return 0;
}

if (opt.subcommand == "rowcount") {
if (opt.subcommand == ROWCOUNT) {
count_rows(opt, world);
} else if (opt.subcommand == "schema") {
} else if (opt.subcommand == SCHEMA) {
world.cout0() << "Schema" << std::endl;
ygm::io::parquet_parser parquetp(world, {opt.input_path.c_str()});
world.cout0() << parquetp.schema_to_string() << std::endl;
} else if (opt.subcommand == "dump") {
} else if (opt.subcommand == DUMP) {
dump(opt, world);
} else if (opt.subcommand == CONVERT) {
convert(opt, world);
} else {
world.cerr0() << "Unknown subcommand: " << opt.subcommand << std::endl;
}
Expand All @@ -70,14 +80,17 @@ int main(int argc, char** argv) {
bool parse_arguments(int argc, char** argv, options_t& options,
bool& show_help) {
int opt;
while ((opt = getopt(argc, argv, "c:p:vjlo:h")) != -1) {
while ((opt = getopt(argc, argv, "c:i:vjro:h")) != -1) {
switch (opt) {
case 'c':
options.subcommand = optarg;
break;
case 'p':
case 'i':
options.input_path = optarg;
break;
case 'r':
options.read_lines = true;
break;
case 'v':
options.read_lines = true;
options.variant = true;
Expand All @@ -86,11 +99,8 @@ bool parse_arguments(int argc, char** argv, options_t& options,
options.read_lines = true;
options.json = true;
break;
case 'l':
options.read_lines = true;
break;
case 'o':
options.out_prefix = optarg;
options.output_file_prefix = optarg;
break;
case 'h':
show_help = true;
Expand All @@ -102,42 +112,78 @@ bool parse_arguments(int argc, char** argv, options_t& options,
return true;
}

// Only for rank 0
template <typename os_t>
void show_usage(os_t& os) {
os << "Usage" << std::endl;
void show_usage(char** argv, os_t& os) {
os << "[Usage]" << std::endl;
os << "mpirun -np <#of ranks> ./parquet-tools [options]" << std::endl;
os << std::endl;

os << "Options" << std::endl;
os << "-c <subcommand>" << std::endl;
os << " rowcount" << std::endl;
os << " Return the number of rows in parquet files. If no subcommand "
"option was specified, return the value stored in the metadata without "
"actually reading the whole data and counting the number of lines."
<< std::endl;
os << " schema" << std::endl;
os << " Show the schemas of parquet files." << std::endl;
os << " dump" << std::endl;
os << " Dump data to files. One output file per rank." << std::endl;
os << "-p <path>" << std::endl;
os << " Parquet file path or a directory path that contains parquet files. "
"All parquet files must have the same schema."
<< std::endl;
os << "-h Show this help message." << std::endl;
os << "[Options]" << std::endl;
os << " -c <subcommand>" << std::endl;
os << " Subcommand name followed by its options." << std::endl;
os << " -h Show this help message." << std::endl;
os << std::endl;

os << std::endl;
os << "[Subcommand Options]" << std::endl;

os << "Subcommand Usage" << std::endl;
os << "rowcount [options]" << std::endl;
os << " Options" << std::endl;
os << " -l Read rows w/o converting." << std::endl;
os << " -v Read rows converting to arrays of std::variant." << std::endl;
os << " -j Read rows converting to arrays of JSON objects." << std::endl;
os << "dump -o <output file prefix> [options]" << std::endl;
os << " -o <path> Prefix of output files." << std::endl;
os << " Options" << std::endl;
os << " -v Dump rows converting to arrays of std::variant (default)."
<< std::endl;
os << " -j Dump rows converting to arrays of JSON objects." << std::endl;
std::filesystem::path subcommand_file =
std::filesystem::path(argv[0]).parent_path() /
"parquet_tools_subcmd.json";
std::ifstream ifs(subcommand_file);
std::string content;
std::string line;
while (getline(ifs, line)) {
content += line + "\n";
}
boost::json::string_view sv(content.c_str());
boost::json::value v = boost::json::parse(sv);

// JSON strings are double quoted by Boost.JSON.
// Remove leading and trailing quotes
auto format = [](boost::json::string bs) {
std::string s = bs.c_str();
// Remove leading and trailing whitespace and double quotes
s.erase(s.begin(), std::find_if(s.begin(), s.end(),
[](int ch) { return !std::isspace(ch); }));
return s;
};

for (const auto& entry : v.as_array()) {
const auto& entry_obj = entry.as_object();
os << format(entry_obj.at("cmd").as_string());
os << ": " << format(entry_obj.at("desc").as_string()) << std::endl;

// Required arguments
if (entry_obj.contains("req")) {
os << " Required arguments" << std::endl;
for (const auto& req : entry_obj.at("req").as_array()) {
auto& req_obj = req.as_object();
os << " -" << format(req_obj.at("key").as_string()) << " ";
if (req_obj.contains("value")) {
os << " <" << format(req_obj.at("value").as_string()) << "> ";
}
os << format(req_obj.at("desc").as_string()) << std::endl;
}
}

if (entry_obj.contains("opt")) {
os << " Optional arguments" << std::endl;
for (const auto& op : entry_obj.at("opt").as_array()) {
auto& op_obj = op.as_object();
assert(op_obj.contains("key"));
os << " -" << format(op_obj.at("key").as_string()) << " ";
if (op_obj.contains("value")) {
assert(op_obj.contains("value"));
os << " <" << format(op_obj.at("value").as_string()) << "> ";
}
assert(op_obj.contains("desc"));
os << format(op_obj.at("desc").as_string()) << std::endl;
}
}
os << std::endl;
}
}

void count_rows(const options_t& opt, ygm::comm& world) {
Expand Down Expand Up @@ -204,9 +250,8 @@ void dump(const options_t& opt, ygm::comm& world) {
std::size_t num_rows = 0;
std::size_t num_error_lines = 0;

std::string output_file_prefix = opt.out_prefix;
std::filesystem::path output_path =
output_file_prefix + "-" + std::to_string(world.rank());
std::string(opt.output_file_prefix) + "-" + std::to_string(world.rank());
std::ofstream ofs(output_path);
if (!ofs) {
world.cerr0() << "Failed to open the output file: " << output_path
Expand Down Expand Up @@ -259,4 +304,78 @@ void dump(const options_t& opt, ygm::comm& world) {
world.cout0() << "#of conversion error lines = "
<< world.all_reduce_sum(num_error_lines) << std::endl;
}
}

/// Converts CSV input (opt.input_path) to Parquet.
///
/// Every rank writes the rows handed to its csv_parser callback into its own
/// file named "<opt.output_file_prefix>-<rank>". Any existing file with that
/// name is removed first.
///
/// The Parquet schema is inferred from the first row the rank receives:
/// columns are named "col-<index>" and typed by probing the field in the order
/// signed integer -> unsigned integer -> double -> string. All columns are
/// REQUIRED. Later rows are written using the same per-field probing, so they
/// are expected to match the types of that first row.
///
/// NOTE(review): a rank that receives no rows never opens the Parquet writer,
/// so its output file presumably stays empty (not a valid Parquet file) --
/// confirm whether that is acceptable.
///
/// @param opt    Parsed command-line options; input_path and
///               output_file_prefix are used.
/// @param world  YGM communicator; supplies the rank and drives the parser.
void convert(const options_t& opt, ygm::comm& world) {
  const std::string output_path =
      opt.output_file_prefix + "-" + std::to_string(world.rank());
  std::cout << "Output path: " << output_path << std::endl;

  std::filesystem::remove(output_path);
  std::shared_ptr<arrow::io::FileOutputStream> outfile;
  PARQUET_ASSIGN_OR_THROW(outfile,
                          arrow::io::FileOutputStream::Open(output_path));

  parquet::WriterProperties::Builder builder;
  auto                               properties = builder.build();
  parquet::schema::NodeVector        fields;
  std::shared_ptr<parquet::schema::GroupNode> schema = nullptr;

  parquet::StreamWriter parquet_writer;
  ygm::io::csv_parser   csvp(world, std::vector<std::string>{opt.input_path});
  bool                  schema_defined = false;
  csvp.for_all([&parquet_writer, &schema_defined, &fields, &schema, &properties,
                &outfile](const auto& vfields) {
    // Define the schema once, from the first row seen on this rank.
    if (!schema_defined) {
      std::size_t col_no = 0;
      for (const auto& f : vfields) {
        const std::string col_name = "col-" + std::to_string(col_no);
        parquet::Type::type type(parquet::Type::type::UNDEFINED);
        parquet::ConvertedType::type converted_type(
            parquet::ConvertedType::NONE);
        if (f.is_integer()) {
          type           = parquet::Type::type::INT64;
          converted_type = parquet::ConvertedType::INT_64;
        } else if (f.is_unsigned_integer()) {
          type           = parquet::Type::type::INT64;
          converted_type = parquet::ConvertedType::UINT_64;
        } else if (f.is_double()) {
          type           = parquet::Type::type::DOUBLE;
          converted_type = parquet::ConvertedType::NONE;
        } else {
          type           = parquet::Type::type::BYTE_ARRAY;
          converted_type = parquet::ConvertedType::UTF8;
        }
        fields.push_back(parquet::schema::PrimitiveNode::Make(
            col_name, parquet::Repetition::REQUIRED, type, converted_type));
        ++col_no;
      }
      schema = std::static_pointer_cast<parquet::schema::GroupNode>(
          parquet::schema::GroupNode::Make(
              "schema", parquet::Repetition::REQUIRED, fields));
      parquet_writer = parquet::StreamWriter{
          parquet::ParquetFileWriter::Open(outfile, schema, properties)};
      schema_defined = true;
    }

    // Write one row. The C++ type streamed into the writer must match the
    // column's declared converted type, so each branch mirrors the schema
    // inference above.
    for (const auto& f : vfields) {
      if (f.is_integer()) {
        const int64_t v = f.as_integer();
        parquet_writer << v;
      } else if (f.is_unsigned_integer()) {
        // Stream as uint64_t: the column was declared UINT_64, and streaming
        // an int64_t would select the INT_64 overload instead.
        const uint64_t v = f.as_unsigned_integer();
        parquet_writer << v;
      } else if (f.is_double()) {
        parquet_writer << f.as_double();
      } else {
        parquet_writer << f.as_string();
      }
    }
    parquet_writer << parquet::EndRow;
  });

  // Only close the row group if a writer was actually opened.
  if (schema_defined) {
    parquet_writer << parquet::EndRowGroup;
  }
}
80 changes: 80 additions & 0 deletions tools/parquet_tools_subcmd.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
[
{
"cmd": "rowcount",
"desc": "Return the number of rows in parquet files. If no subcommand option was specified, return the value stored in the metadata without reading the whole data or counting the number of lines.",
"req": [
{
"key": "i",
"value": "path",
"desc": "Parquet file path or a directory path that contains parquet files. All parquet files must have the same schema."
}
],
"opt": [
{
"key": "r",
"desc": "Read rows w/o converting."
},
{
"key": "v",
"desc": "Read rows converting to arrays of std::variant."
},
{
"key": "j",
"desc": "Read rows converting to arrays of JSON objects."
}
]
},
{
"cmd": "schema",
"desc": "Show the schemas of parquet files.",
"req": [
{
"key": "i",
"value": "path",
"desc": "Parquet file path or a directory path that contains parquet files. All parquet files must have the same schema."
}
]
},
{
"cmd": "dump",
"desc": "Dump data to files. One output file per rank.",
"req": [
{
"key": "i",
"value": "path",
"desc": "Parquet file path or a directory path that contains parquet files. All parquet files must have the same schema."
},
{
"key": "o",
"value": "path",
"desc": "Prefix of output files."
}
],
"opt": [
{
"key": "v",
"desc": "Dump rows converting to arrays of std::variant (default)."
},
{
"key": "j",
"desc": "Dump rows converting to arrays of JSON objects."
}
]
},
{
"cmd": "convert",
"desc": "Convert files to parquet files. Currently, only CSV is supported.",
"req": [
{
"key": "i",
"value": "path",
"desc": "Path to an input non-parquet file or to a directory that contains non-parquet files. All CSV files must have the same column types."
},
{
"key": "o",
"value": "path",
"desc": "Prefix of output parquet files."
}
]
}
]

0 comments on commit a9371e2

Please sign in to comment.