Skip to content

Commit

Permalink
Added read_ipc_stream method to Polars
Browse files Browse the repository at this point in the history
  • Loading branch information
ankane committed May 22, 2024
1 parent 5322ad8 commit fac3fb9
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

- Updated Polars to 0.40.0
- Added `date_ranges` method to `Polars`
- Added `read_ipc_stream` method to `Polars`
- Added `write_ipc_stream` method to `DataFrame`
- Added `flags` method to `DataFrame`
- Added support for keyword arguments to `agg` methods
Expand Down
25 changes: 25 additions & 0 deletions ext/polars/src/dataframe/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,31 @@ impl RbDataFrame {
Ok(RbDataFrame::new(df))
}

/// Reads an Arrow IPC record batch stream from `rb_f` into a new `RbDataFrame`.
///
/// `projection` and `columns` are forwarded to the reader's index-based and
/// name-based column selection, `n_rows` to its row limit, and `rechunk` to
/// its rechunk setting. When `row_index` is `Some((name, offset))` it is
/// converted into a `RowIndex` and handed to the reader as well.
///
/// # Errors
///
/// Returns an error if a byte reader cannot be obtained from `rb_f`, or if the
/// IPC stream reader fails (the latter is converted through `RbPolarsErr`).
pub fn read_ipc_stream(
    rb_f: Value,
    columns: Option<Vec<String>>,
    projection: Option<Vec<usize>>,
    n_rows: Option<usize>,
    row_index: Option<(String, IdxSize)>,
    rechunk: bool,
) -> RbResult<Self> {
    let row_index = row_index.map(|(name, offset)| {
        let name = Arc::from(name.as_str());
        RowIndex { name, offset }
    });

    let source = get_mmap_bytes_reader(rb_f)?;
    let reader = IpcStreamReader::new(source)
        .with_projection(projection)
        .with_columns(columns)
        .with_n_rows(n_rows)
        .with_row_index(row_index)
        .set_rechunk(rechunk);

    let frame = reader.finish().map_err(RbPolarsErr::from)?;
    Ok(RbDataFrame::new(frame))
}

pub fn read_avro(
rb_f: Value,
columns: Option<Vec<String>>,
Expand Down
1 change: 1 addition & 0 deletions ext/polars/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ fn init(ruby: &Ruby) -> RbResult<()> {
class.define_singleton_method("read_csv", function!(RbDataFrame::read_csv, -1))?;
class.define_singleton_method("read_parquet", function!(RbDataFrame::read_parquet, 9))?;
class.define_singleton_method("read_ipc", function!(RbDataFrame::read_ipc, 6))?;
class.define_singleton_method("read_ipc_stream", function!(RbDataFrame::read_ipc_stream, 6))?;
class.define_singleton_method("read_avro", function!(RbDataFrame::read_avro, 4))?;
class.define_singleton_method("from_rows", function!(RbDataFrame::from_rows, 3))?;
class.define_singleton_method("from_hashes", function!(RbDataFrame::from_hashes, 5))?;
Expand Down
76 changes: 76 additions & 0 deletions lib/polars/io/ipc.rb
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,82 @@ def _read_ipc_impl(
Utils.wrap_df(rbdf)
end

# Read an Arrow IPC record batch stream into a DataFrame.
#
# See "Streaming format" on https://arrow.apache.org/docs/python/ipc.html.
#
# @param source [Object]
#   Path to a file, or a file-like object (an object that has a `read`
#   method, such as a `File` or a `StringIO`).
# @param columns [Array]
#   Columns to select. Accepts an array of column indices (starting at zero)
#   or an array of column names.
# @param n_rows [Integer]
#   Stop reading from the IPC stream after reading `n_rows`.
# @param storage_options [Hash]
#   Extra options that make sense for a particular storage connection.
#   `nil` is treated as an empty hash.
# @param row_index_name [String]
#   Insert a row index column with the given name into the DataFrame as the
#   first column. If set to `nil` (default), no row index column is created.
# @param row_index_offset [Integer]
#   Start the row index at this offset. Cannot be negative.
#   Only used if `row_index_name` is set.
# @param rechunk [Boolean]
#   Make sure that all data is contiguous.
#
# @return [DataFrame]
def read_ipc_stream(
  source,
  columns: nil,
  n_rows: nil,
  storage_options: nil,
  row_index_name: nil,
  row_index_offset: 0,
  rechunk: true
)
  # Everything except the storage options is handed to the reader untouched.
  reader_options = {
    columns: columns,
    n_rows: n_rows,
    row_index_name: row_index_name,
    row_index_offset: row_index_offset,
    rechunk: rechunk
  }

  _prepare_file_arg(source, **(storage_options || {})) do |data|
    _read_ipc_stream_impl(data, **reader_options)
  end
end

# @private
#
# Normalizes the arguments and calls the native `RbDataFrame.read_ipc_stream`,
# wrapping the result with `Utils.wrap_df`.
def _read_ipc_stream_impl(
  source,
  columns: nil,
  n_rows: nil,
  row_index_name: nil,
  row_index_offset: 0,
  rechunk: true
)
  source = Utils.normalize_filepath(source) if Utils.pathlike?(source)
  # A single column name is accepted as shorthand for a one-element array.
  columns = [columns] if columns.is_a?(String)

  projection, columns = Utils.handle_projection_columns(columns)
  row_index = Utils._prepare_row_count_args(row_index_name, row_index_offset)

  rbdf = RbDataFrame.read_ipc_stream(source, columns, projection, n_rows, row_index, rechunk)
  Utils.wrap_df(rbdf)
end

# Get a schema of the IPC file without reading data.
#
# @param source [Object]
Expand Down
1 change: 1 addition & 0 deletions test/ipc_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ def test_write_ipc_stream
df = Polars::DataFrame.new({"a" => [1, 2, 3], "b" => ["one", "two", "three"]})
output = df.write_ipc_stream(nil)
assert_equal Encoding::BINARY, output.encoding
assert_equal df, Polars.read_ipc_stream(StringIO.new(output))
end

def test_sink_ipc
Expand Down

0 comments on commit fac3fb9

Please sign in to comment.