diff --git a/cpp/include/cudf/io/csv.hpp b/cpp/include/cudf/io/csv.hpp
index dae056ef157..9b2de7c72ec 100644
--- a/cpp/include/cudf/io/csv.hpp
+++ b/cpp/include/cudf/io/csv.hpp
@@ -1362,7 +1362,7 @@ table_with_metadata read_csv(
  */
 
 /**
- *@brief Builder to build options for `writer_csv()`.
+ *@brief Builder to build options for `write_csv()`.
  */
 class csv_writer_options_builder;
diff --git a/python/cudf/cudf/_lib/csv.pyx b/python/cudf/cudf/_lib/csv.pyx
index c09e06bfc59..59a970263e0 100644
--- a/python/cudf/cudf/_lib/csv.pyx
+++ b/python/cudf/cudf/_lib/csv.pyx
@@ -1,10 +1,6 @@
 # Copyright (c) 2020-2024, NVIDIA CORPORATION.
 
 from libcpp cimport bool
-from libcpp.memory cimport unique_ptr
-from libcpp.string cimport string
-from libcpp.utility cimport move
-from libcpp.vector cimport vector
 
 cimport pylibcudf.libcudf.types as libcudf_types
 
@@ -23,16 +19,7 @@ from cudf.core.buffer import acquire_spill_lock
 
 from libcpp cimport bool
 
-from pylibcudf.libcudf.io.csv cimport (
-    csv_writer_options,
-    write_csv as cpp_write_csv,
-)
-from pylibcudf.libcudf.io.data_sink cimport data_sink
-from pylibcudf.libcudf.io.types cimport sink_info
-from pylibcudf.libcudf.table.table_view cimport table_view
-
-from cudf._lib.io.utils cimport make_sink_info
-from cudf._lib.utils cimport data_from_pylibcudf_io, table_view_from_table
+from cudf._lib.utils cimport data_from_pylibcudf_io
 
 import pylibcudf as plc
 
@@ -318,59 +305,40 @@ def write_csv(
     --------
     cudf.to_csv
     """
-    cdef table_view input_table_view = table_view_from_table(
-        table, not index
-    )
-    cdef bool include_header_c = header
-    cdef char delim_c = ord(sep)
-    cdef string line_term_c = lineterminator.encode()
-    cdef string na_c = na_rep.encode()
-    cdef int rows_per_chunk_c = rows_per_chunk
-    cdef vector[string] col_names
-    cdef string true_value_c = 'True'.encode()
-    cdef string false_value_c = 'False'.encode()
-    cdef unique_ptr[data_sink] data_sink_c
-    cdef sink_info sink_info_c = make_sink_info(path_or_buf, data_sink_c)
-
-    if header is True:
-        all_names = columns_apply_na_rep(table._column_names, na_rep)
-        if index is True:
-            all_names = table._index.names + all_names
-
-        if len(all_names) > 0:
-            col_names.reserve(len(all_names))
-            if len(all_names) == 1:
-                if all_names[0] in (None, ''):
-                    col_names.push_back('""'.encode())
-                else:
-                    col_names.push_back(
-                        str(all_names[0]).encode()
-                    )
-            else:
-                for idx, col_name in enumerate(all_names):
-                    if col_name is None:
-                        col_names.push_back(''.encode())
-                    else:
-                        col_names.push_back(
-                            str(col_name).encode()
-                        )
-
-    cdef csv_writer_options options = move(
-        csv_writer_options.builder(sink_info_c, input_table_view)
-        .names(col_names)
-        .na_rep(na_c)
-        .include_header(include_header_c)
-        .rows_per_chunk(rows_per_chunk_c)
-        .line_terminator(line_term_c)
-        .inter_column_delimiter(delim_c)
-        .true_value(true_value_c)
-        .false_value(false_value_c)
-        .build()
-    )
-
+    index_and_not_empty = index is True and table.index is not None
+    columns = [
+        col.to_pylibcudf(mode="read") for col in table.index._columns
+    ] if index_and_not_empty else []
+    columns.extend(col.to_pylibcudf(mode="read") for col in table._columns)
+    col_names = []
+    if header:
+        all_names = list(table.index.names) if index_and_not_empty else []
+        all_names.extend(
+            na_rep if name is None or pd.isnull(name)
+            else name for name in table._column_names
+        )
+        col_names = [
+            '""' if (name in (None, '') and len(all_names) == 1)
+            else (str(name) if name not in (None, '') else '')
+            for name in all_names
+        ]
     try:
-        with nogil:
-            cpp_write_csv(options)
+        plc.io.csv.write_csv(
+            (
+                plc.io.csv.CsvWriterOptions.builder(
+                    plc.io.SinkInfo([path_or_buf]), plc.Table(columns)
+                )
+                .names(col_names)
+                .na_rep(na_rep)
+                .include_header(header)
+                .rows_per_chunk(rows_per_chunk)
+                .line_terminator(str(lineterminator))
+                .inter_column_delimiter(str(sep))
+                .true_value("True")
+                .false_value("False")
+                .build()
+            )
+        )
     except OverflowError:
         raise OverflowError(
             f"Writing CSV file with chunksize={rows_per_chunk} failed. "
@@ -419,11 +387,3 @@ cdef DataType _get_plc_data_type_from_dtype(object dtype) except *:
         dtype = cudf.dtype(dtype)
 
     return dtype_to_pylibcudf_type(dtype)
-
-
-def columns_apply_na_rep(column_names, na_rep):
-    return tuple(
-        na_rep if pd.isnull(col_name)
-        else col_name
-        for col_name in column_names
-    )
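# Illustrative sketch (not part of this patch): the rewritten helper above is the
# internal path behind cudf.DataFrame.to_csv, so user-facing behaviour stays the
# same. The frame contents below are arbitrary placeholder data.
import io

import cudf

df = cudf.DataFrame({"a": [1, 2, 3], "b": ["x", None, "z"]})
buf = io.StringIO()
df.to_csv(buf, index=False, na_rep="NA")  # funnels into cudf._lib.csv.write_csv
print(buf.getvalue())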
diff --git a/python/pylibcudf/pylibcudf/io/csv.pxd b/python/pylibcudf/pylibcudf/io/csv.pxd
new file mode 100644
index 00000000000..f04edaa316a
--- /dev/null
+++ b/python/pylibcudf/pylibcudf/io/csv.pxd
@@ -0,0 +1,35 @@
+# Copyright (c) 2024, NVIDIA CORPORATION.
+
+from libcpp.vector cimport vector
+from libcpp.string cimport string
+from libcpp cimport bool
+from pylibcudf.libcudf.io.csv cimport (
+    csv_writer_options,
+    csv_writer_options_builder,
+)
+from pylibcudf.libcudf.io.types cimport quote_style
+from pylibcudf.io.types cimport SinkInfo
+from pylibcudf.table cimport Table
+
+cdef class CsvWriterOptions:
+    cdef csv_writer_options c_obj
+    cdef Table table
+    cdef SinkInfo sink
+
+
+cdef class CsvWriterOptionsBuilder:
+    cdef csv_writer_options_builder c_obj
+    cdef Table table
+    cdef SinkInfo sink
+    cpdef CsvWriterOptionsBuilder names(self, list names)
+    cpdef CsvWriterOptionsBuilder na_rep(self, str val)
+    cpdef CsvWriterOptionsBuilder include_header(self, bool val)
+    cpdef CsvWriterOptionsBuilder rows_per_chunk(self, int val)
+    cpdef CsvWriterOptionsBuilder line_terminator(self, str term)
+    cpdef CsvWriterOptionsBuilder inter_column_delimiter(self, str delim)
+    cpdef CsvWriterOptionsBuilder true_value(self, str val)
+    cpdef CsvWriterOptionsBuilder false_value(self, str val)
+    cpdef CsvWriterOptions build(self)
+
+
+cpdef void write_csv(CsvWriterOptions options)
diff --git a/python/pylibcudf/pylibcudf/io/csv.pyi b/python/pylibcudf/pylibcudf/io/csv.pyi
index 356825a927d..583b66bc29c 100644
--- a/python/pylibcudf/pylibcudf/io/csv.pyi
+++ b/python/pylibcudf/pylibcudf/io/csv.pyi
@@ -5,9 +5,11 @@ from collections.abc import Mapping
 from pylibcudf.io.types import (
     CompressionType,
     QuoteStyle,
+    SinkInfo,
     SourceInfo,
     TableWithMetadata,
 )
+from pylibcudf.table import Table
 from pylibcudf.types import DataType
 
 def read_csv(
@@ -52,3 +54,23 @@ def read_csv(
     # detect_whitespace_around_quotes: bool = False,
     # timestamp_type: DataType = DataType(type_id.EMPTY),
 ) -> TableWithMetadata: ...
+def write_csv(options: CsvWriterOptions) -> None: ...
+
+class CsvWriterOptions:
+    def __init__(self): ...
+    @staticmethod
+    def builder(sink: SinkInfo, table: Table) -> CsvWriterOptionsBuilder: ...
+
+class CsvWriterOptionsBuilder:
+    def __init__(self): ...
+    def names(self, names: list) -> CsvWriterOptionsBuilder: ...
+    def na_rep(self, val: str) -> CsvWriterOptionsBuilder: ...
+    def include_header(self, val: bool) -> CsvWriterOptionsBuilder: ...
+    def rows_per_chunk(self, val: int) -> CsvWriterOptionsBuilder: ...
+    def line_terminator(self, term: str) -> CsvWriterOptionsBuilder: ...
+    def inter_column_delimiter(
+        self, delim: str
+    ) -> CsvWriterOptionsBuilder: ...
+    def true_value(self, val: str) -> CsvWriterOptionsBuilder: ...
+    def false_value(self, val: str) -> CsvWriterOptionsBuilder: ...
+    def build(self) -> CsvWriterOptions: ...
diff --git a/python/pylibcudf/pylibcudf/io/csv.pyx b/python/pylibcudf/pylibcudf/io/csv.pyx
index 858e580ab34..8be391de2c2 100644
--- a/python/pylibcudf/pylibcudf/io/csv.pyx
+++ b/python/pylibcudf/pylibcudf/io/csv.pyx
@@ -2,14 +2,18 @@
 
 from libcpp cimport bool
 from libcpp.map cimport map
+
 from libcpp.string cimport string
 from libcpp.utility cimport move
 from libcpp.vector cimport vector
-from pylibcudf.io.types cimport SourceInfo, TableWithMetadata
+from pylibcudf.io.types cimport SourceInfo, SinkInfo, TableWithMetadata
 from pylibcudf.libcudf.io.csv cimport (
     csv_reader_options,
+    csv_writer_options,
     read_csv as cpp_read_csv,
+    write_csv as cpp_write_csv,
 )
+
 from pylibcudf.libcudf.io.types cimport (
     compression_type,
     quote_style,
@@ -17,9 +21,14 @@ from pylibcudf.libcudf.io.types cimport (
 )
 from pylibcudf.libcudf.types cimport data_type, size_type
 from pylibcudf.types cimport DataType
+from pylibcudf.table cimport Table
 
-
-__all__ = ["read_csv"]
+__all__ = [
+    "read_csv",
+    "write_csv",
+    "CsvWriterOptions",
+    "CsvWriterOptionsBuilder",
+]
 
 
 cdef tuple _process_parse_dates_hex(list cols):
     cdef vector[string] str_cols
@@ -82,6 +91,8 @@ def read_csv(
 ):
     """Reads a CSV file into a :py:class:`~.types.TableWithMetadata`.
 
+    For details, see :cpp:func:`read_csv`.
+
     Parameters
     ----------
     source_info : SourceInfo
@@ -263,3 +274,202 @@ def read_csv(
         c_result = move(cpp_read_csv(options))
 
     return TableWithMetadata.from_libcudf(c_result)
+
+
+# TODO: Implement the remaining methods
+cdef class CsvWriterOptions:
+    """The settings to use for ``write_csv``
+
+    For details, see :cpp:class:`cudf::io::csv_writer_options`
+    """
+    @staticmethod
+    def builder(SinkInfo sink, Table table):
+        """Create a CsvWriterOptionsBuilder object
+
+        For details, see :cpp:func:`cudf::io::csv_writer_options::builder`
+
+        Parameters
+        ----------
+        sink : SinkInfo
+            The sink used for writer output
+        table : Table
+            Table to be written to output
+
+        Returns
+        -------
+        CsvWriterOptionsBuilder
+            Builder to build CsvWriterOptions
+        """
+        cdef CsvWriterOptionsBuilder csv_builder = CsvWriterOptionsBuilder.__new__(
+            CsvWriterOptionsBuilder
+        )
+        csv_builder.c_obj = csv_writer_options.builder(sink.c_obj, table.view())
+        csv_builder.table = table
+        csv_builder.sink = sink
+        return csv_builder
+
+
+# TODO: Implement the remaining methods
+cdef class CsvWriterOptionsBuilder:
+    """Builder to build options for ``write_csv``
+
+    For details, see :cpp:class:`cudf::io::csv_writer_options_builder`
+    """
+    cpdef CsvWriterOptionsBuilder names(self, list names):
+        """Sets optional column names.
+
+        Parameters
+        ----------
+        names : list[str]
+            Column names
+
+        Returns
+        -------
+        CsvWriterOptionsBuilder
+            Builder to build CsvWriterOptions
+        """
+        self.c_obj.names([name.encode() for name in names])
+        return self
+
+    cpdef CsvWriterOptionsBuilder na_rep(self, str val):
+        """Sets the string used for null entries.
+
+        Parameters
+        ----------
+        val : str
+            String to represent null value
+
+        Returns
+        -------
+        CsvWriterOptionsBuilder
+            Builder to build CsvWriterOptions
+        """
+        self.c_obj.na_rep(val.encode())
+        return self
+
+    cpdef CsvWriterOptionsBuilder include_header(self, bool val):
+        """Enables/disables writing the header row to the CSV.
+
+        Parameters
+        ----------
+        val : bool
+            Boolean value to enable/disable
+
+        Returns
+        -------
+        CsvWriterOptionsBuilder
+            Builder to build CsvWriterOptions
+        """
+        self.c_obj.include_header(val)
+        return self
+
+    cpdef CsvWriterOptionsBuilder rows_per_chunk(self, int val):
+        """Sets maximum number of rows to process for each file write.
+
+        Parameters
+        ----------
+        val : int
+            Number of rows per chunk
+
+        Returns
+        -------
+        CsvWriterOptionsBuilder
+            Builder to build CsvWriterOptions
+        """
+        self.c_obj.rows_per_chunk(val)
+        return self
+
+    cpdef CsvWriterOptionsBuilder line_terminator(self, str term):
+        """Sets character used for separating lines.
+
+        Parameters
+        ----------
+        term : str
+            Character to represent line termination
+
+        Returns
+        -------
+        CsvWriterOptionsBuilder
+            Builder to build CsvWriterOptions
+        """
+        self.c_obj.line_terminator(term.encode())
+        return self
+
+    cpdef CsvWriterOptionsBuilder inter_column_delimiter(self, str delim):
+        """Sets character used for separating column values.
+
+        Parameters
+        ----------
+        delim : str
+            Character to delimit column values
+
+        Returns
+        -------
+        CsvWriterOptionsBuilder
+            Builder to build CsvWriterOptions
+        """
+        self.c_obj.inter_column_delimiter(ord(delim))
+        return self
+
+    cpdef CsvWriterOptionsBuilder true_value(self, str val):
+        """Sets string used for values != 0
+
+        Parameters
+        ----------
+        val : str
+            String to represent values != 0
+
+        Returns
+        -------
+        CsvWriterOptionsBuilder
+            Builder to build CsvWriterOptions
+        """
+        self.c_obj.true_value(val.encode())
+        return self
+
+    cpdef CsvWriterOptionsBuilder false_value(self, str val):
+        """Sets string used for values == 0
+
+        Parameters
+        ----------
+        val : str
+            String to represent values == 0
+
+        Returns
+        -------
+        CsvWriterOptionsBuilder
+            Builder to build CsvWriterOptions
+        """
+        self.c_obj.false_value(val.encode())
+        return self
+
+    cpdef CsvWriterOptions build(self):
+        """Create a CsvWriterOptions object"""
+        cdef CsvWriterOptions csv_options = CsvWriterOptions.__new__(
+            CsvWriterOptions
+        )
+        csv_options.c_obj = move(self.c_obj.build())
+        csv_options.table = self.table
+        csv_options.sink = self.sink
+        return csv_options
+
+
+cpdef void write_csv(
+    CsvWriterOptions options
+):
+    """
+    Write to CSV format.
+
+    The table to write, output paths, and options are encapsulated
+    by the `options` object.
+
+    For details, see :cpp:func:`write_csv`.
+
+    Parameters
+    ----------
+    options : CsvWriterOptions
+        Settings for controlling writing behavior
+    """
+
+    with nogil:
+        cpp_write_csv(move(options.c_obj))
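# Illustrative sketch (not part of this patch): the intended call pattern for the
# builder defined above, mirroring the tests added later in this diff. The column
# data and option values are arbitrary.
import io

import pyarrow as pa
import pylibcudf as plc

tbl = plc.interop.from_arrow(pa.table({"a": [1, 2, None], "b": ["x", "y", "z"]}))
sink = io.StringIO()

options = (
    plc.io.csv.CsvWriterOptions.builder(plc.io.SinkInfo([sink]), tbl)
    .names(["a", "b"])
    .na_rep("NA")
    .include_header(True)
    .rows_per_chunk(8)
    .line_terminator("\n")
    .inter_column_delimiter(",")
    .true_value("True")
    .false_value("False")
    .build()
)
plc.io.csv.write_csv(options)
print(sink.getvalue())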
diff --git a/python/pylibcudf/pylibcudf/io/types.pyx b/python/pylibcudf/pylibcudf/io/types.pyx
index 7a3f16c4c50..51d5bda75c7 100644
--- a/python/pylibcudf/pylibcudf/io/types.pyx
+++ b/python/pylibcudf/pylibcudf/io/types.pyx
@@ -261,18 +261,24 @@ cdef cppclass iobase_data_sink(data_sink):
 
 
 cdef class SinkInfo:
-    """A class containing details on a source to read from.
+    """
+    A class containing details about destinations (sinks) to write data to.
 
-    For details, see :cpp:class:`cudf::io::sink_info`.
+    For more details, see :cpp:class:`cudf::io::sink_info`.
 
     Parameters
    ----------
-    sinks : list of str, PathLike, BytesIO, StringIO
+    sinks : list of str, PathLike, or io.IOBase instances
+        A list of sinks to write data to. Each sink can be:
 
-        A homogeneous list of sinks (this can be a string filename,
-        bytes, or one of the Python I/O classes) to read from.
+        - A string representing a filename.
+        - A PathLike object.
+        - An instance of a Python I/O class that is a subclass of io.IOBase
+          (e.g., io.BytesIO, io.StringIO).
 
-        Mixing different types of sinks will raise a `ValueError`.
+        The list must be homogeneous in type unless all sinks are instances
+        of subclasses of io.IOBase. Mixing different types of sinks
+        (that are not all io.IOBase instances) will raise a ValueError.
     """
 
     def __init__(self, list sinks):
@@ -280,32 +286,42 @@ cdef class SinkInfo:
         cdef vector[string] paths
 
         if not sinks:
-            raise ValueError("Need to pass at least one sink")
+            raise ValueError("At least one sink must be provided.")
 
         if isinstance(sinks[0], os.PathLike):
             sinks = [os.path.expanduser(s) for s in sinks]
 
         cdef object initial_sink_cls = type(sinks[0])
 
-        if not all(isinstance(s, initial_sink_cls) for s in sinks):
-            raise ValueError("All sinks must be of the same type!")
+        if not all(
+            isinstance(s, initial_sink_cls) or (
+                isinstance(sinks[0], io.IOBase) and isinstance(s, io.IOBase)
+            ) for s in sinks
+        ):
+            raise ValueError(
+                "All sinks must be of the same type unless they are all instances "
+                "of subclasses of io.IOBase."
+            )
 
-        if initial_sink_cls in {io.StringIO, io.BytesIO, io.TextIOBase}:
+        if isinstance(sinks[0], io.IOBase):
             data_sinks.reserve(len(sinks))
-            if isinstance(sinks[0], (io.StringIO, io.BytesIO)):
-                for s in sinks:
+            for s in sinks:
+                if isinstance(s, (io.StringIO, io.BytesIO)):
                     self.sink_storage.push_back(
                         unique_ptr[data_sink](new iobase_data_sink(s))
                     )
-            elif isinstance(sinks[0], io.TextIOBase):
-                for s in sinks:
-                    if codecs.lookup(s).name not in ('utf-8', 'ascii'):
+                elif isinstance(s, io.TextIOBase):
+                    if codecs.lookup(s.encoding).name not in ('utf-8', 'ascii'):
                         raise NotImplementedError(f"Unsupported encoding {s.encoding}")
                     self.sink_storage.push_back(
                         unique_ptr[data_sink](new iobase_data_sink(s.buffer))
                     )
-            data_sinks.push_back(self.sink_storage.back().get())
-        elif initial_sink_cls is str:
+                else:
+                    self.sink_storage.push_back(
+                        unique_ptr[data_sink](new iobase_data_sink(s))
+                    )
+                data_sinks.push_back(self.sink_storage.back().get())
+        elif isinstance(sinks[0], str):
             paths.reserve(len(sinks))
             for s in sinks:
                 paths.push_back(s.encode())
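# Illustrative sketch (not part of this patch): with the relaxed check above, any
# mix of io.IOBase subclasses is accepted in a single SinkInfo, while mixing a
# filename with an in-memory buffer still raises ValueError. "out.csv" below is a
# hypothetical filename used only for the error case.
import io

import pylibcudf as plc

buffers = [io.BytesIO(), io.StringIO()]  # both are io.IOBase subclasses
sink_info = plc.io.SinkInfo(buffers)     # accepted after this change

try:
    plc.io.SinkInfo(["out.csv", io.BytesIO()])  # str mixed with a buffer
except ValueError:
    pass  # still rejected: not all sinks are io.IOBase instances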
diff --git a/python/pylibcudf/pylibcudf/tests/common/utils.py b/python/pylibcudf/pylibcudf/tests/common/utils.py
index d95849ef371..58c94713d09 100644
--- a/python/pylibcudf/pylibcudf/tests/common/utils.py
+++ b/python/pylibcudf/pylibcudf/tests/common/utils.py
@@ -385,12 +385,10 @@ def make_source(path_or_buf, pa_table, format, **kwargs):
     NESTED_STRUCT_TESTING_TYPE,
 ]
 
+NON_NESTED_PA_TYPES = NUMERIC_PA_TYPES + STRING_PA_TYPES + BOOL_PA_TYPES
+
 DEFAULT_PA_TYPES = (
-    NUMERIC_PA_TYPES
-    + STRING_PA_TYPES
-    + BOOL_PA_TYPES
-    + LIST_PA_TYPES
-    + DEFAULT_PA_STRUCT_TESTING_TYPES
+    NON_NESTED_PA_TYPES + LIST_PA_TYPES + DEFAULT_PA_STRUCT_TESTING_TYPES
 )
 
 # Map pylibcudf compression types to pandas ones
diff --git a/python/pylibcudf/pylibcudf/tests/conftest.py b/python/pylibcudf/pylibcudf/tests/conftest.py
index 5265e411c7f..36ab6798d8a 100644
--- a/python/pylibcudf/pylibcudf/tests/conftest.py
+++ b/python/pylibcudf/pylibcudf/tests/conftest.py
@@ -15,7 +15,12 @@
 
 sys.path.insert(0, os.path.join(os.path.dirname(__file__), "common"))
 
-from utils import ALL_PA_TYPES, DEFAULT_PA_TYPES, NUMERIC_PA_TYPES
+from utils import (
+    ALL_PA_TYPES,
+    DEFAULT_PA_TYPES,
+    NON_NESTED_PA_TYPES,
+    NUMERIC_PA_TYPES,
+)
 
 
 def _type_to_str(typ):
@@ -79,29 +84,13 @@ def _get_vals_of_type(pa_type, length, seed):
     )
 
 
-# TODO: Consider adding another fixture/adapting this
-# fixture to consider nullability
-@pytest.fixture(scope="session", params=[0, 100])
-def table_data(request):
-    """
-    Returns (TableWithMetadata, pa_table).
-
-    This is the default fixture you should be using for testing
-    pylibcudf I/O writers.
-
-    Contains one of each category (e.g. int, bool, list, struct)
-    of dtypes.
-    """
-    nrows = request.param
-
+# TODO: Consider adapting this helper function
+# to consider nullability
+def _generate_table_data(types, nrows, seed=42):
     table_dict = {}
-    # Colnames in the format expected by
-    # plc.io.TableWithMetadata
     colnames = []
 
-    seed = 42
-
-    for typ in ALL_PA_TYPES:
+    for typ in types:
         child_colnames = []
 
         def _generate_nested_data(typ):
@@ -151,6 +140,32 @@ def _generate_nested_data(typ):
         ),
         pa_table
 
+@pytest.fixture(scope="session", params=[0, 100])
+def table_data(request):
+    """
+    Returns (TableWithMetadata, pa_table).
+
+    This is the default fixture you should be using for testing
+    pylibcudf I/O writers.
+
+    Contains one of each category (e.g. int, bool, list, struct)
+    of dtypes.
+    """
+    nrows = request.param
+    return _generate_table_data(ALL_PA_TYPES, nrows)
+
+
+@pytest.fixture(scope="session", params=[0, 100])
+def table_data_with_non_nested_pa_types(request):
+    """
+    Returns (TableWithMetadata, pa_table).
+
+    This fixture is for testing with non-nested PyArrow types.
+    """
+    nrows = request.param
+    return _generate_table_data(NON_NESTED_PA_TYPES, nrows)
+
+
 @pytest.fixture(params=[(0, 0), ("half", 0), (-1, "half")])
 def nrows_skiprows(table_data, request):
     """
diff --git a/python/pylibcudf/pylibcudf/tests/io/test_csv.py b/python/pylibcudf/pylibcudf/tests/io/test_csv.py
index 22c83acc47c..90d2d0896a5 100644
--- a/python/pylibcudf/pylibcudf/tests/io/test_csv.py
+++ b/python/pylibcudf/pylibcudf/tests/io/test_csv.py
@@ -10,6 +10,7 @@
     _convert_types,
     assert_table_and_meta_eq,
     make_source,
+    sink_to_str,
     write_source_str,
 )
 
@@ -282,3 +283,87 @@ def test_read_csv_header(csv_table_data, source_or_sink, header):
     # list true_values = None,
     # list false_values = None,
     # bool dayfirst = False,
+
+
+@pytest.mark.parametrize("sep", [",", "*"])
+@pytest.mark.parametrize("lineterminator", ["\n", "\n\n"])
+@pytest.mark.parametrize("header", [True, False])
+@pytest.mark.parametrize("rows_per_chunk", [8, 100])
+def test_write_csv(
+    table_data_with_non_nested_pa_types,
+    source_or_sink,
+    sep,
+    lineterminator,
+    header,
+    rows_per_chunk,
+):
+    plc_tbl_w_meta, pa_table = table_data_with_non_nested_pa_types
+    sink = source_or_sink
+
+    plc.io.csv.write_csv(
+        (
+            plc.io.csv.CsvWriterOptions.builder(
+                plc.io.SinkInfo([sink]), plc_tbl_w_meta.tbl
+            )
+            .names(plc_tbl_w_meta.column_names())
+            .na_rep("")
+            .include_header(header)
+            .rows_per_chunk(rows_per_chunk)
+            .line_terminator(lineterminator)
+            .inter_column_delimiter(sep)
+            .true_value("True")
+            .false_value("False")
+            .build()
+        )
+    )
+
+    # Convert everything to string to make comparisons easier
+    str_result = sink_to_str(sink)
+
+    pd_result = pa_table.to_pandas().to_csv(
+        sep=sep,
+        lineterminator=lineterminator,
+        header=header,
+        index=False,
+    )
+
+    assert str_result == pd_result
+
+
+@pytest.mark.parametrize("na_rep", ["", "NA"])
+def test_write_csv_na_rep(na_rep):
+    names = ["a", "b"]
+    pa_tbl = pa.Table.from_arrays(
+        [pa.array([1.0, 2.0, None]), pa.array([True, None, False])],
+        names=names,
+    )
+    plc_tbl = plc.interop.from_arrow(pa_tbl)
+    plc_tbl_w_meta = plc.io.types.TableWithMetadata(
+        plc_tbl, column_names=[(name, []) for name in names]
+    )
+
+    sink = io.StringIO()
+
+    plc.io.csv.write_csv(
+        (
+            plc.io.csv.CsvWriterOptions.builder(
+                plc.io.SinkInfo([sink]), plc_tbl_w_meta.tbl
+            )
+            .names(plc_tbl_w_meta.column_names())
+            .na_rep(na_rep)
+            .include_header(True)
+            .rows_per_chunk(8)
+            .line_terminator("\n")
+            .inter_column_delimiter(",")
+            .true_value("True")
+            .false_value("False")
+            .build()
+        )
+    )
+
+    # Convert everything to string to make comparisons easier
+    str_result = sink_to_str(sink)
+
+    pd_result = pa_tbl.to_pandas().to_csv(na_rep=na_rep, index=False)
+
+    assert str_result == pd_result
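# Illustrative sketch (not part of this patch): a possible round trip through the
# new writer and the existing pylibcudf reader, mirroring the pandas comparisons
# in the tests above. The table contents are arbitrary placeholder data.
import io

import pyarrow as pa
import pylibcudf as plc

pa_tbl = pa.table({"a": [1.0, 2.0, None], "b": [True, None, False]})
sink = io.StringIO()
plc.io.csv.write_csv(
    plc.io.csv.CsvWriterOptions.builder(
        plc.io.SinkInfo([sink]), plc.interop.from_arrow(pa_tbl)
    )
    .names(["a", "b"])
    .na_rep("NA")
    .include_header(True)
    .rows_per_chunk(8)
    .line_terminator("\n")
    .inter_column_delimiter(",")
    .true_value("True")
    .false_value("False")
    .build()
)

# Read the written text back with the reader covered earlier in this patch.
source = plc.io.SourceInfo([io.BytesIO(sink.getvalue().encode())])
result = plc.io.csv.read_csv(source)
print(result.tbl.num_rows(), result.column_names())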