diff --git a/tools/CMakeLists.txt b/tools/CMakeLists.txt index 4faadfc6..4eaf86e6 100644 --- a/tools/CMakeLists.txt +++ b/tools/CMakeLists.txt @@ -9,4 +9,5 @@ if (Arrow_FOUND AND Parquet_FOUND) target_include_directories(parquet_tools PUBLIC ${Boost_INCLUDE_DIRS}) link_arrow_parquet(parquet_tools) endif () -endif () \ No newline at end of file +endif () +file(COPY ./parquet_tools_subcmd.json DESTINATION ${CMAKE_CURRENT_BINARY_DIR}) \ No newline at end of file diff --git a/tools/parquet_tools.cpp b/tools/parquet_tools.cpp index bfc1c206..b70f0c54 100644 --- a/tools/parquet_tools.cpp +++ b/tools/parquet_tools.cpp @@ -11,14 +11,16 @@ #include #include +#include +#include + #include +#include #include #include #include #include -// #define NDEBUG - namespace stdfs = std::filesystem; struct options_t { @@ -27,14 +29,20 @@ struct options_t { bool variant = false; bool json = false; bool read_lines = false; - std::string out_prefix{"output"}; + std::string output_file_prefix{"output"}; }; +static constexpr char const* const ROWCOUNT = "rowcount"; +static constexpr char const* const SCHEMA = "schema"; +static constexpr char const* const DUMP = "dump"; +static constexpr char const* const CONVERT = "convert"; + bool parse_arguments(int argc, char** argv, options_t&, bool&); template -void show_usage(os_t&); +void show_usage(char** argv, os_t&); void count_rows(const options_t&, ygm::comm&); void dump(const options_t&, ygm::comm&); +void convert(const options_t&, ygm::comm&); int main(int argc, char** argv) { ygm::comm world(&argc, &argv); @@ -43,21 +51,23 @@ int main(int argc, char** argv) { bool show_help = false; if (!parse_arguments(argc, argv, opt, show_help)) { world.cerr0() << "Invalid arguments." << std::endl; - show_usage(world.cerr0()); + if (world.rank0()) show_usage(argv, std::cerr); } if (show_help) { - show_usage(world.cout0()); + if (world.rank0()) show_usage(argv, std::cout); return 0; } - if (opt.subcommand == "rowcount") { + if (opt.subcommand == ROWCOUNT) { count_rows(opt, world); - } else if (opt.subcommand == "schema") { + } else if (opt.subcommand == SCHEMA) { world.cout0() << "Schema" << std::endl; ygm::io::parquet_parser parquetp(world, {opt.input_path.c_str()}); world.cout0() << parquetp.schema_to_string() << std::endl; - } else if (opt.subcommand == "dump") { + } else if (opt.subcommand == DUMP) { dump(opt, world); + } else if (opt.subcommand == CONVERT) { + convert(opt, world); } else { world.cerr0() << "Unknown subcommand: " << opt.subcommand << std::endl; } @@ -70,14 +80,17 @@ int main(int argc, char** argv) { bool parse_arguments(int argc, char** argv, options_t& options, bool& show_help) { int opt; - while ((opt = getopt(argc, argv, "c:p:vjlo:h")) != -1) { + while ((opt = getopt(argc, argv, "c:i:vjro:h")) != -1) { switch (opt) { case 'c': options.subcommand = optarg; break; - case 'p': + case 'i': options.input_path = optarg; break; + case 'r': + options.read_lines = true; + break; case 'v': options.read_lines = true; options.variant = true; @@ -86,11 +99,8 @@ bool parse_arguments(int argc, char** argv, options_t& options, options.read_lines = true; options.json = true; break; - case 'l': - options.read_lines = true; - break; case 'o': - options.out_prefix = optarg; + options.output_file_prefix = optarg; break; case 'h': show_help = true; @@ -102,42 +112,78 @@ bool parse_arguments(int argc, char** argv, options_t& options, return true; } +// Only for rank 0 template -void show_usage(os_t& os) { - os << "Usage" << std::endl; +void show_usage(char** argv, os_t& os) { + os << "[Usage]" << std::endl; os << "mpirun -np <#of ranks> ./parquet-tools [options]" << std::endl; os << std::endl; - os << "Options" << std::endl; - os << "-c " << std::endl; - os << " rowcount" << std::endl; - os << " Return the number of rows in parquet files. If no subcommand " - "option was specified, return the value stored in the metadata without " - "actually reading the whole data and counting the number of lines." - << std::endl; - os << " schema" << std::endl; - os << " Show the schemas of parquet files." << std::endl; - os << " dump" << std::endl; - os << " Dump data to files. One output file per rank." << std::endl; - os << "-p " << std::endl; - os << " Parquet file path or a directory path that contains parquet files. " - "All parquet files must have the same schema." - << std::endl; - os << "-h Show this help message." << std::endl; + os << "[Options]" << std::endl; + os << " -c " << std::endl; + os << " Subcommand name followed by its options." << std::endl; + os << " -h Show this help message." << std::endl; + os << std::endl; + os << std::endl; + os << "[Subcommand Options]" << std::endl; - os << "Subcommand Usage" << std::endl; - os << "rowcount [options]" << std::endl; - os << " Options" << std::endl; - os << " -l Read rows w/o converting." << std::endl; - os << " -v Read rows converting to arrays of std::variant." << std::endl; - os << " -j Read rows converting to arrays of JSON objects." << std::endl; - os << "dump -o [options]" << std::endl; - os << " -o Prefix of output files." << std::endl; - os << " Options" << std::endl; - os << " -v Dump rows converting to arrays of std::variant (default)." - << std::endl; - os << " -j Dump rows converting to arrays of JSON objects." << std::endl; + std::filesystem::path subcommand_file = + std::filesystem::path(argv[0]).parent_path() / + "parquet_tools_subcmd.json"; + std::ifstream ifs(subcommand_file); + std::string content; + std::string line; + while (getline(ifs, line)) { + content += line + "\n"; + } + boost::json::string_view sv(content.c_str()); + boost::json::value v = boost::json::parse(sv); + + // JSON strings are double quoted by Boost.JSON. + // Remove leading and trailing quotes + auto format = [](boost::json::string bs) { + std::string s = bs.c_str(); + // Remove leading and trailing whitespace and double quotes + s.erase(s.begin(), std::find_if(s.begin(), s.end(), + [](int ch) { return !std::isspace(ch); })); + return s; + }; + + for (const auto& entry : v.as_array()) { + const auto& entry_obj = entry.as_object(); + os << format(entry_obj.at("cmd").as_string()); + os << ": " << format(entry_obj.at("desc").as_string()) << std::endl; + + // Required arguments + if (entry_obj.contains("req")) { + os << " Required arguments" << std::endl; + for (const auto& req : entry_obj.at("req").as_array()) { + auto& req_obj = req.as_object(); + os << " -" << format(req_obj.at("key").as_string()) << " "; + if (req_obj.contains("value")) { + os << " <" << format(req_obj.at("value").as_string()) << "> "; + } + os << format(req_obj.at("desc").as_string()) << std::endl; + } + } + + if (entry_obj.contains("opt")) { + os << " Optional arguments" << std::endl; + for (const auto& op : entry_obj.at("opt").as_array()) { + auto& op_obj = op.as_object(); + assert(op_obj.contains("key")); + os << " -" << format(op_obj.at("key").as_string()) << " "; + if (op_obj.contains("value")) { + assert(op_obj.contains("value")); + os << " <" << format(op_obj.at("value").as_string()) << "> "; + } + assert(op_obj.contains("desc")); + os << format(op_obj.at("desc").as_string()) << std::endl; + } + } + os << std::endl; + } } void count_rows(const options_t& opt, ygm::comm& world) { @@ -204,9 +250,8 @@ void dump(const options_t& opt, ygm::comm& world) { std::size_t num_rows = 0; std::size_t num_error_lines = 0; - std::string output_file_prefix = opt.out_prefix; std::filesystem::path output_path = - output_file_prefix + "-" + std::to_string(world.rank()); + std::string(opt.output_file_prefix) + "-" + std::to_string(world.rank()); std::ofstream ofs(output_path); if (!ofs) { world.cerr0() << "Failed to open the output file: " << output_path @@ -259,4 +304,78 @@ void dump(const options_t& opt, ygm::comm& world) { world.cout0() << "#of conversion error lines = " << world.all_reduce_sum(num_error_lines) << std::endl; } +} + +void convert(const options_t& opt, ygm::comm& world) { + std::string output_path = + std::string(opt.output_file_prefix) + "-" + std::to_string(world.rank()); + std::cout << "Output path: " << output_path << std::endl; + + std::filesystem::remove(output_path); + std::shared_ptr outfile; + PARQUET_ASSIGN_OR_THROW(outfile, + arrow::io::FileOutputStream::Open(output_path)); + + parquet::WriterProperties::Builder builder; + auto properties = builder.build(); + parquet::schema::NodeVector fields; + std::shared_ptr schema = nullptr; + + parquet::StreamWriter parquet_writer; + ygm::io::csv_parser csvp(world, std::vector{opt.input_path}); + bool schema_defined = false; + csvp.for_all([&parquet_writer, &schema_defined, &fields, &schema, &properties, + &outfile](const auto& vfields) { + // Define schema once + if (!schema_defined) { + std::size_t col_no = 0; + for (const auto& f : vfields) { + std::string col_name = "col-" + std::to_string(col_no); + parquet::Type::type type(parquet::Type::type::UNDEFINED); + parquet::ConvertedType::type converted_type( + parquet::ConvertedType::NONE); + if (f.is_integer()) { + type = parquet::Type::type::INT64; + converted_type = parquet::ConvertedType::INT_64; + } else if (f.is_unsigned_integer()) { + const uint64_t v = f.as_unsigned_integer(); + type = parquet::Type::type::INT64; + converted_type = parquet::ConvertedType::UINT_64; + } else if (f.is_double()) { + type = parquet::Type::type::DOUBLE; + converted_type = parquet::ConvertedType::NONE; + } else { + type = parquet::Type::type::BYTE_ARRAY; + converted_type = parquet::ConvertedType::UTF8; + } + fields.push_back(parquet::schema::PrimitiveNode::Make( + col_name, parquet::Repetition::REQUIRED, type, converted_type)); + ++col_no; + } + schema = std::static_pointer_cast( + parquet::schema::GroupNode::Make( + "schema", parquet::Repetition::REQUIRED, fields)); + parquet_writer = parquet::StreamWriter{ + parquet::ParquetFileWriter::Open(outfile, schema, properties)}; + schema_defined = true; + } + + for (auto f : vfields) { + if (f.is_integer()) { + const int64_t v = f.as_integer(); + parquet_writer << v; + } else if (f.is_unsigned_integer()) { + const int64_t v = f.as_unsigned_integer(); + parquet_writer << v; + } else if (f.is_double()) { + parquet_writer << f.as_double(); + } else { + parquet_writer << f.as_string(); + } + } + parquet_writer << parquet::EndRow; + }); + if (schema_defined) { + parquet_writer << parquet::EndRowGroup; + } } \ No newline at end of file diff --git a/tools/parquet_tools_subcmd.json b/tools/parquet_tools_subcmd.json new file mode 100644 index 00000000..d60668b4 --- /dev/null +++ b/tools/parquet_tools_subcmd.json @@ -0,0 +1,80 @@ +[ + { + "cmd": "rowcount", + "desc": "Return the number of rows in parquet files. If no subcommand option was specified, return the value stored in the metadata without reading the whole data or counting the number of lines.", + "req": [ + { + "key": "i", + "value": "path", + "desc": "Parquet file path or a directory path that contains parquet files. All parquet files must have the same schema." + } + ], + "opt": [ + { + "key": "r", + "desc": "Read rows w/o converting." + }, + { + "key": "v", + "desc": "Read rows converting to arrays of std::variant." + }, + { + "key": "j", + "desc": "Read rows converting to arrays of JSON objects." + } + ] + }, + { + "cmd": "schema", + "desc": "Show the schemas of parquet files.", + "req": [ + { + "key": "i", + "value": "path", + "desc": "Parquet file path or a directory path that contains parquet files. All parquet files must have the same schema." + } + ] + }, + { + "cmd": "dump", + "desc": "Dump data to files. One output file per rank.", + "req": [ + { + "key": "i", + "value": "path", + "desc": "Parquet file path or a directory path that contains parquet files. All parquet files must have the same schema." + }, + { + "key": "o", + "value": "path", + "desc": "Prefix of output files." + } + ], + "opt": [ + { + "key": "v", + "desc": "Dump rows converting to arrays of std::variant (default)" + }, + { + "key": "j", + "desc": "Dump rows converting to arrays of JSON objects." + } + ] + }, + { + "cmd": "convert", + "desc": "Convert files to parquet files. Currently, only CSV is supported.", + "req": [ + { + "key": "i", + "value": "path", + "desc": "Path to an input non-parquet file or to a directory that contains non-parquet files. All CSV files must have the same column types." + }, + { + "key": "o", + "value": "path", + "desc": "Prefix of output parquet files." + } + ] + } +]