diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/ArrowToTableConverter.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/ArrowToTableConverter.java index 072e149f41d..b655e4fc986 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/ArrowToTableConverter.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/ArrowToTableConverter.java @@ -48,10 +48,9 @@ public class ArrowToTableConverter { private volatile boolean completed = false; - private static BarrageProtoUtil.MessageInfo parseArrowIpcMessage(final byte[] ipcMessage) throws IOException { + private static BarrageProtoUtil.MessageInfo parseArrowIpcMessage(final ByteBuffer bb) throws IOException { final BarrageProtoUtil.MessageInfo mi = new BarrageProtoUtil.MessageInfo(); - final ByteBuffer bb = ByteBuffer.wrap(ipcMessage); bb.order(ByteOrder.LITTLE_ENDIAN); final int continuation = bb.getInt(); final int metadata_size = bb.getInt(); @@ -70,7 +69,10 @@ private static BarrageProtoUtil.MessageInfo parseArrowIpcMessage(final byte[] ip } @ScriptApi - public synchronized void setSchema(final byte[] ipcMessage) { + public synchronized void setSchema(final ByteBuffer ipcMessage) { + // The input ByteBuffer instance (especially one originating in Python) can't be assumed to be valid after the + // return of this method. Until https://github.com/jpy-consortium/jpy/issues/126 is resolved, we need to copy + // the data out of the input ByteBuffer to use after the return of this method. if (completed) { throw new IllegalStateException("Conversion is complete; cannot process additional messages"); } @@ -82,7 +84,20 @@ public synchronized void setSchema(final byte[] ipcMessage) { } @ScriptApi - public synchronized void addRecordBatch(final byte[] ipcMessage) { + public synchronized void addRecordBatches(final ByteBuffer... 
ipcMessages) { + // The input ByteBuffer instance (especially one originating in Python) can't be assumed to be valid after the + // return of this method. Until https://github.com/jpy-consortium/jpy/issues/126 is resolved, we need to copy + // the data out of the input ByteBuffer to use after the return of this method. + for (final ByteBuffer ipcMessage : ipcMessages) { + addRecordBatch(ipcMessage); + } + } + + @ScriptApi + public synchronized void addRecordBatch(final ByteBuffer ipcMessage) { + // The input ByteBuffer instance (especially one originating in Python) can't be assumed to be valid after the + // return of this method. Until https://github.com/jpy-consortium/jpy/issues/126 is resolved, we need to copy + // the data out of the input ByteBuffer to use after the return of this method. if (completed) { throw new IllegalStateException("Conversion is complete; cannot process additional messages"); } @@ -121,6 +136,9 @@ public synchronized void onCompleted() throws InterruptedException { } protected void parseSchema(final Schema header) { + // The Schema instance (especially one originating in Python) can't be assumed to be valid after the return + // of this method. Until https://github.com/jpy-consortium/jpy/issues/126 is resolved, we need to make a copy of + // the header to use after the return of this method. if (resultTable != null) { throw Exceptions.statusRuntimeException(Code.INVALID_ARGUMENT, "Schema evolution not supported"); } @@ -139,6 +157,9 @@ protected void parseSchema(final Schema header) { } protected BarrageMessage createBarrageMessage(BarrageProtoUtil.MessageInfo mi, int numColumns) { + // The BarrageProtoUtil.MessageInfo instance (especially one originating in Python) can't be assumed to be valid + // after the return of this method. Until https://github.com/jpy-consortium/jpy/issues/126 is resolved, we need + // to make a copy of it to use after the return of this method. 
final BarrageMessage msg = new BarrageMessage(); final RecordBatch batch = (RecordBatch) mi.header.header(new RecordBatch()); @@ -192,7 +213,7 @@ protected BarrageMessage createBarrageMessage(BarrageProtoUtil.MessageInfo mi, i return msg; } - private BarrageProtoUtil.MessageInfo getMessageInfo(byte[] ipcMessage) { + private BarrageProtoUtil.MessageInfo getMessageInfo(ByteBuffer ipcMessage) { final BarrageProtoUtil.MessageInfo mi; try { mi = parseArrowIpcMessage(ipcMessage); @@ -201,4 +222,6 @@ private BarrageProtoUtil.MessageInfo getMessageInfo(byte[] ipcMessage) { } return mi; } + + } diff --git a/py/server/deephaven/arrow.py b/py/server/deephaven/arrow.py index 9cefb97e03b..09fb8d73808 100644 --- a/py/server/deephaven/arrow.py +++ b/py/server/deephaven/arrow.py @@ -100,13 +100,10 @@ def to_table(pa_table: pa.Table, cols: List[str] = None) -> Table: dh_schema = pa.schema(dh_fields) try: - pa_buffer = dh_schema.serialize() - j_barrage_table_builder.setSchema(dtypes.array(dtypes.byte, pa_buffer)) + j_barrage_table_builder.setSchema(jpy.byte_buffer(dh_schema.serialize())) record_batches = pa_table.to_batches() - for rb in record_batches: - pa_buffer = rb.serialize() - j_barrage_table_builder.addRecordBatch(dtypes.array(dtypes.byte, pa_buffer)) + j_barrage_table_builder.addRecordBatches([jpy.byte_buffer(rb.serialize()) for rb in record_batches]) j_barrage_table_builder.onCompleted() return Table(j_table=j_barrage_table_builder.getResultTable()) diff --git a/py/server/tests/test_arrow.py b/py/server/tests/test_arrow.py index 411128b2f4f..b56418a2d69 100644 --- a/py/server/tests/test_arrow.py +++ b/py/server/tests/test_arrow.py @@ -18,6 +18,7 @@ from deephaven.table import Table from tests.testbase import BaseTestCase + class ArrowTestCase(BaseTestCase): test_table: Table