Skip to content

Commit

Permalink
Access schema of current stream (#27)
Browse files Browse the repository at this point in the history
  • Loading branch information
kylebarron authored Jun 26, 2024
1 parent 182e7df commit 70cb725
Showing 1 changed file with 16 additions and 0 deletions.
16 changes: 16 additions & 0 deletions pyo3-arrow/src/record_batch_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::ffi::CString;

use arrow::ffi_stream::FFI_ArrowArrayStream;
use arrow_array::RecordBatchReader;
use arrow_schema::SchemaRef;
use pyo3::exceptions::{PyIOError, PyValueError};
use pyo3::intern;
use pyo3::prelude::*;
Expand All @@ -18,6 +19,10 @@ use crate::PyTable;
pub struct PyRecordBatchReader(pub(crate) Option<Box<dyn RecordBatchReader + Send>>);

impl PyRecordBatchReader {
pub fn new(reader: Box<dyn RecordBatchReader + Send>) -> Self {
Self(Some(reader))
}

/// Returns `true` if this reader has already been consumed.
pub fn closed(&self) -> bool {
self.0.is_none()
Expand Down Expand Up @@ -48,6 +53,17 @@ impl PyRecordBatchReader {
Ok(PyTable::new(schema, batches))
}

/// Access the [SchemaRef] of this RecordBatchReader.
///
/// If the stream has already been consumed, this method will error.
pub fn schema_ref(&self) -> PyArrowResult<SchemaRef> {
let stream = self
.0
.as_ref()
.ok_or(PyIOError::new_err("Stream already closed."))?;
Ok(stream.schema())
}

/// Export this to a Python `arro3.core.RecordBatchReader`.
pub fn to_python(&mut self, py: Python) -> PyArrowResult<PyObject> {
let arro3_mod = py.import_bound(intern!(py, "arro3.core"))?;
Expand Down

0 comments on commit 70cb725

Please sign in to comment.