Skip to content

Commit

Permalink
Added write_ipc_stream to DataFrame
Browse files Browse the repository at this point in the history
  • Loading branch information
ankane committed May 22, 2024
1 parent 748478c commit 5322ad8
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

- Updated Polars to 0.40.0
- Added `date_ranges` method to `Polars`
- Added `write_ipc_stream` method to `DataFrame`
- Added `flags` method to `DataFrame`
- Added support for keyword arguments to `agg` methods
- Aliased `apply` to `map_rows` for `DataFrame`
Expand Down
1 change: 1 addition & 0 deletions ext/polars/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ features = [
"fmt",
"interpolate",
"ipc",
"ipc_streaming",
"is_first_distinct",
"is_in",
"is_last_distinct",
Expand Down
22 changes: 22 additions & 0 deletions ext/polars/src/dataframe/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,28 @@ impl RbDataFrame {
Ok(())
}

/// Write the DataFrame as an Arrow IPC record batch stream.
///
/// `rb_f` is first tried as a Ruby `String`; when that conversion succeeds it
/// is used as the path of a file to create and write into. Otherwise it is
/// passed to `get_file_like` and the stream is written to the returned
/// writer. `compression` is forwarded to the writer unchanged.
///
/// # Errors
///
/// Returns an error if the destination file cannot be created, if
/// `get_file_like` fails, or if the IPC stream writer fails.
pub fn write_ipc_stream(
    &self,
    rb_f: Value,
    compression: Wrap<Option<IpcCompression>>,
) -> RbResult<()> {
    if let Ok(s) = String::try_convert(rb_f) {
        // Propagate I/O failures (missing directory, permissions, ...) through
        // `RbResult` instead of panicking inside the extension.
        // NOTE(review): relies on `std::io::Error` converting into the error
        // type accepted by `RbPolarsErr::from` — confirm against error.rs.
        let f = std::fs::File::create(s).map_err(|e| RbPolarsErr::from(e.into()))?;
        IpcStreamWriter::new(f)
            .with_compression(compression.0)
            .finish(&mut self.df.borrow_mut())
            .map_err(RbPolarsErr::from)?;
    } else {
        let mut buf = get_file_like(rb_f, true)?;

        IpcStreamWriter::new(&mut buf)
            .with_compression(compression.0)
            .finish(&mut self.df.borrow_mut())
            .map_err(RbPolarsErr::from)?;
    }
    Ok(())
}

pub fn write_avro(
&self,
rb_f: Value,
Expand Down
4 changes: 4 additions & 0 deletions ext/polars/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ fn init(ruby: &Ruby) -> RbResult<()> {
class.define_method("write_ndjson", method!(RbDataFrame::write_ndjson, 1))?;
class.define_method("write_csv", method!(RbDataFrame::write_csv, 10))?;
class.define_method("write_ipc", method!(RbDataFrame::write_ipc, 2))?;
class.define_method(
"write_ipc_stream",
method!(RbDataFrame::write_ipc_stream, 2),
)?;
class.define_method("row_tuple", method!(RbDataFrame::row_tuple, 1))?;
class.define_method("row_tuples", method!(RbDataFrame::row_tuples, 0))?;
class.define_method("to_numo", method!(RbDataFrame::to_numo, 0))?;
Expand Down
41 changes: 41 additions & 0 deletions lib/polars/data_frame.rb
Original file line number Diff line number Diff line change
Expand Up @@ -803,6 +803,47 @@ def write_ipc(file, compression: "uncompressed")
return_bytes ? file.string : nil
end

# Write to Arrow IPC record batch stream.
#
# See "Streaming format" in https://arrow.apache.org/docs/python/ipc.html.
#
# @param file [Object]
#   Path or writable file-like object to which the IPC record batch data will
#   be written. If `nil`, the data is written to an in-memory binary buffer
#   and returned as a string.
# @param compression ['uncompressed', 'lz4', 'zstd']
#   Compression method. Defaults to "uncompressed"; `nil` is treated the same.
#
# @return [String, nil]
#   The binary-encoded stream contents when `file` is `nil`, otherwise `nil`.
#
# @example
#   df = Polars::DataFrame.new(
#     {
#       "foo" => [1, 2, 3, 4, 5],
#       "bar" => [6, 7, 8, 9, 10],
#       "ham" => ["a", "b", "c", "d", "e"]
#     }
#   )
#   df.write_ipc_stream("new_file.arrow")
def write_ipc_stream(file, compression: "uncompressed")
  in_memory = file.nil?

  # Pick the write target: a fresh binary buffer, a normalized path, or the
  # caller's object as given.
  target =
    if in_memory
      StringIO.new.tap { |io| io.set_encoding(Encoding::BINARY) }
    elsif Utils.pathlike?(file)
      Utils.normalize_filepath(file)
    else
      file
    end

  codec = compression.nil? ? "uncompressed" : compression

  _df.write_ipc_stream(target, codec)
  target.string if in_memory
end

# Write to Apache Parquet file.
#
# @param file [String, Pathname, StringIO]
Expand Down
6 changes: 6 additions & 0 deletions test/ipc_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ def test_write_ipc_to_string
assert_equal Encoding::BINARY, output.encoding
end

# Writing an IPC stream with a nil target returns the data as a
# binary-encoded string.
def test_write_ipc_stream
  data = {"a" => [1, 2, 3], "b" => ["one", "two", "three"]}
  output = Polars::DataFrame.new(data).write_ipc_stream(nil)
  assert_equal Encoding::BINARY, output.encoding
end

def test_sink_ipc
df = Polars::DataFrame.new({"a" => [1, 2, 3], "b" => ["one", "two", "three"]})
path = temp_path
Expand Down

0 comments on commit 5322ad8

Please sign in to comment.