Skip to content

Commit

Permalink
apacheGH-42024: Allow specifying maximum chunk size to RecordBatchStream
Browse files Browse the repository at this point in the history
  • Loading branch information
F516B7AB244C4EDC9C46C82E519C790A committed Jun 7, 2024
1 parent 9ee6ea7 commit 9f61e12
Showing 1 changed file with 13 additions and 2 deletions.
15 changes: 13 additions & 2 deletions python/pyarrow/_flight.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -1743,8 +1743,9 @@ cdef class RecordBatchStream(FlightDataStream):
cdef:
object data_source
CIpcWriteOptions write_options
int64_t max_chunksize

def __init__(self, data_source, options=None):
def __init__(self, data_source, options=None, max_chunksize=None):
"""Create a RecordBatchStream from a data source.
Parameters
Expand All @@ -1753,13 +1754,20 @@ cdef class RecordBatchStream(FlightDataStream):
The data to stream to the client.
options : pyarrow.ipc.IpcWriteOptions, optional
Optional IPC options to control how to write the data.
max_chunksize : int, default None
Optional maximum number of rows for each chunk.
Only applicable if the data source is a Table.
"""
if (not isinstance(data_source, RecordBatchReader) and
not isinstance(data_source, lib.Table)):
raise TypeError("Expected RecordBatchReader or Table, "
"but got: {}".format(type(data_source)))
self.data_source = data_source
self.write_options = _get_options(options).c_options
if max_chunksize is not None:
self.max_chunksize = max_chunksize
else:
self.max_chunksize = 0

cdef CFlightDataStream* to_stream(self) except *:
cdef:
Expand All @@ -1768,7 +1776,10 @@ cdef class RecordBatchStream(FlightDataStream):
reader = (<RecordBatchReader> self.data_source).reader
elif isinstance(self.data_source, lib.Table):
table = (<Table> self.data_source).table
reader.reset(new TableBatchReader(deref(table)))
batch_reader = new TableBatchReader(deref(table))
if self.max_chunksize > 0:
batch_reader.set_chunksize(self.max_chunksize)
reader.reset(batch_reader)
else:
raise RuntimeError("Can't construct RecordBatchStream "
"from type {}".format(type(self.data_source)))
Expand Down

0 comments on commit 9f61e12

Please sign in to comment.