From 99cb442a918358e79c4eb087d988b24377c1590d Mon Sep 17 00:00:00 2001 From: jianfengmao Date: Mon, 11 Dec 2023 15:30:23 -0700 Subject: [PATCH 1/7] Use ByteBuffer to receive data from Python --- .../barrage/util/ArrowToTableConverter.java | 11 ++++++----- py/server/deephaven/arrow.py | 4 ++-- py/server/tests/test_arrow.py | 1 + 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/ArrowToTableConverter.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/ArrowToTableConverter.java index 072e149f41d..12063e81b6e 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/ArrowToTableConverter.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/ArrowToTableConverter.java @@ -48,10 +48,9 @@ public class ArrowToTableConverter { private volatile boolean completed = false; - private static BarrageProtoUtil.MessageInfo parseArrowIpcMessage(final byte[] ipcMessage) throws IOException { + private BarrageProtoUtil.MessageInfo parseArrowIpcMessage(ByteBuffer bb) throws IOException { final BarrageProtoUtil.MessageInfo mi = new BarrageProtoUtil.MessageInfo(); - final ByteBuffer bb = ByteBuffer.wrap(ipcMessage); bb.order(ByteOrder.LITTLE_ENDIAN); final int continuation = bb.getInt(); final int metadata_size = bb.getInt(); @@ -70,7 +69,7 @@ private static BarrageProtoUtil.MessageInfo parseArrowIpcMessage(final byte[] ip } @ScriptApi - public synchronized void setSchema(final byte[] ipcMessage) { + public synchronized void setSchema(final ByteBuffer ipcMessage) { if (completed) { throw new IllegalStateException("Conversion is complete; cannot process additional messages"); } @@ -82,7 +81,7 @@ public synchronized void setSchema(final byte[] ipcMessage) { } @ScriptApi - public synchronized void addRecordBatch(final byte[] ipcMessage) { + public synchronized void addRecordBatch(final ByteBuffer ipcMessage) { if (completed) { throw new IllegalStateException("Conversion is complete; cannot process additional messages"); } @@ -192,7 +191,7 @@ protected BarrageMessage createBarrageMessage(BarrageProtoUtil.MessageInfo mi, i return msg; } - private BarrageProtoUtil.MessageInfo getMessageInfo(byte[] ipcMessage) { + private BarrageProtoUtil.MessageInfo getMessageInfo(ByteBuffer ipcMessage) { final BarrageProtoUtil.MessageInfo mi; try { mi = parseArrowIpcMessage(ipcMessage); @@ -201,4 +200,6 @@ private BarrageProtoUtil.MessageInfo getMessageInfo(byte[] ipcMessage) { } return mi; } + + } diff --git a/py/server/deephaven/arrow.py b/py/server/deephaven/arrow.py index 9cefb97e03b..4b2cb6b90b7 100644 --- a/py/server/deephaven/arrow.py +++ b/py/server/deephaven/arrow.py @@ -101,12 +101,12 @@ def to_table(pa_table: pa.Table, cols: List[str] = None) -> Table: try: pa_buffer = dh_schema.serialize() - j_barrage_table_builder.setSchema(dtypes.array(dtypes.byte, pa_buffer)) + j_barrage_table_builder.setSchema(pa_buffer) record_batches = pa_table.to_batches() for rb in record_batches: pa_buffer = rb.serialize() - j_barrage_table_builder.addRecordBatch(dtypes.array(dtypes.byte, pa_buffer)) + j_barrage_table_builder.addRecordBatch(pa_buffer) j_barrage_table_builder.onCompleted() return Table(j_table=j_barrage_table_builder.getResultTable()) diff --git a/py/server/tests/test_arrow.py b/py/server/tests/test_arrow.py index 411128b2f4f..b56418a2d69 100644 --- a/py/server/tests/test_arrow.py +++ b/py/server/tests/test_arrow.py @@ -18,6 +18,7 @@ from deephaven.table import Table from tests.testbase import BaseTestCase + class ArrowTestCase(BaseTestCase): test_table: Table From 92180d6a6e2e441f57c31b1c8350ce526626b58f Mon Sep 17 00:00:00 2001 From: jianfengmao Date: Thu, 14 Dec 2023 09:50:03 -0700 Subject: [PATCH 2/7] Batch load arrow record batches --- .../extensions/barrage/util/ArrowToTableConverter.java | 7 +++++++ py/server/deephaven/arrow.py | 4 +--- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/ArrowToTableConverter.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/ArrowToTableConverter.java index 12063e81b6e..8d23a4dc7ef 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/ArrowToTableConverter.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/ArrowToTableConverter.java @@ -80,6 +80,13 @@ public synchronized void setSchema(final ByteBuffer ipcMessage) { parseSchema((Schema) mi.header.header(new Schema())); } + @ScriptApi + public synchronized void addRecordBatches(final ByteBuffer... ipcMessages) { + for (final ByteBuffer ipcMessage : ipcMessages) { + addRecordBatch(ipcMessage); + } + } + @ScriptApi public synchronized void addRecordBatch(final ByteBuffer ipcMessage) { if (completed) { diff --git a/py/server/deephaven/arrow.py b/py/server/deephaven/arrow.py index 4b2cb6b90b7..b61e62debb1 100644 --- a/py/server/deephaven/arrow.py +++ b/py/server/deephaven/arrow.py @@ -104,9 +104,7 @@ def to_table(pa_table: pa.Table, cols: List[str] = None) -> Table: j_barrage_table_builder.setSchema(pa_buffer) record_batches = pa_table.to_batches() - for rb in record_batches: - pa_buffer = rb.serialize() - j_barrage_table_builder.addRecordBatch(pa_buffer) + j_barrage_table_builder.addRecordBatches([jpy.byte_buffer(rb.serialize()) for rb in record_batches]) j_barrage_table_builder.onCompleted() return Table(j_table=j_barrage_table_builder.getResultTable()) From 9d99c2b1cc8b2e74fbc7562957c33246f187eaaf Mon Sep 17 00:00:00 2001 From: jianfengmao Date: Wed, 20 Dec 2023 12:50:10 -0700 Subject: [PATCH 3/7] Sync with changes to JPY --- py/server/deephaven/arrow.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/py/server/deephaven/arrow.py b/py/server/deephaven/arrow.py index b61e62debb1..09fb8d73808 100644 --- a/py/server/deephaven/arrow.py +++ b/py/server/deephaven/arrow.py @@ -100,8 +100,7 @@ def to_table(pa_table: pa.Table, cols: List[str] = None) -> Table: dh_schema = pa.schema(dh_fields) try: - pa_buffer = dh_schema.serialize() - j_barrage_table_builder.setSchema(pa_buffer) + j_barrage_table_builder.setSchema(jpy.byte_buffer(dh_schema.serialize())) record_batches = pa_table.to_batches() j_barrage_table_builder.addRecordBatches([jpy.byte_buffer(rb.serialize()) for rb in record_batches]) From 5a39c62dd29a88c7b29425d639415d21470a316b Mon Sep 17 00:00:00 2001 From: jianfengmao Date: Wed, 10 Jan 2024 12:42:12 -0700 Subject: [PATCH 4/7] Add final modifier --- .../extensions/barrage/util/ArrowToTableConverter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/ArrowToTableConverter.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/ArrowToTableConverter.java index 8d23a4dc7ef..3f9b22248bb 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/ArrowToTableConverter.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/ArrowToTableConverter.java @@ -48,7 +48,7 @@ public class ArrowToTableConverter { private volatile boolean completed = false; - private BarrageProtoUtil.MessageInfo parseArrowIpcMessage(ByteBuffer bb) throws IOException { + private BarrageProtoUtil.MessageInfo parseArrowIpcMessage(final ByteBuffer bb) throws IOException { final BarrageProtoUtil.MessageInfo mi = new BarrageProtoUtil.MessageInfo(); bb.order(ByteOrder.LITTLE_ENDIAN); From 7a4bb1e98b307013a049b668b1cd95ad73f1970d Mon Sep 17 00:00:00 2001 From: jianfengmao Date: Wed, 10 Jan 2024 13:42:08 -0700 Subject: [PATCH 5/7] Add warning comments for memory safety risk --- .../barrage/util/ArrowToTableConverter.java | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/ArrowToTableConverter.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/ArrowToTableConverter.java index 3f9b22248bb..aa8bdf4b73e 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/ArrowToTableConverter.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/ArrowToTableConverter.java @@ -48,7 +48,7 @@ public class ArrowToTableConverter { private volatile boolean completed = false; - private BarrageProtoUtil.MessageInfo parseArrowIpcMessage(final ByteBuffer bb) throws IOException { + private static BarrageProtoUtil.MessageInfo parseArrowIpcMessage(final ByteBuffer bb) throws IOException { final BarrageProtoUtil.MessageInfo mi = new BarrageProtoUtil.MessageInfo(); bb.order(ByteOrder.LITTLE_ENDIAN); @@ -68,6 +68,11 @@ private BarrageProtoUtil.MessageInfo parseArrowIpcMessage(final ByteBuffer bb) t return mi; } + /** + * The input ByteBuffer instance (especially originated from Python) can't be assumed to be valid after the return + * of this method. Until jpy-consortium/jpy#126 is resolved, we need to copy the input ByteBuffer to a new + * ByteBuffer instance that is safe to use after the return of this method. + */ @ScriptApi public synchronized void setSchema(final ByteBuffer ipcMessage) { if (completed) { @@ -80,6 +85,11 @@ public synchronized void setSchema(final ByteBuffer ipcMessage) { parseSchema((Schema) mi.header.header(new Schema())); } + /** + * The input ByteBuffer instance (especially originated from Python) can't be assumed to be valid after the return + * of this method. Until jpy-consortium/jpy#126 is resolved, we need to copy the input ByteBuffer to a new + * ByteBuffer instance that is safe to use after the return of this method. + */ @ScriptApi public synchronized void addRecordBatches(final ByteBuffer... ipcMessages) { for (final ByteBuffer ipcMessage : ipcMessages) { @@ -87,6 +97,11 @@ public synchronized void addRecordBatches(final ByteBuffer... ipcMessages) { } } + /** + * The input ByteBuffer instance (especially originated from Python) can't be assumed to be valid after the return + * of this method. Until jpy-consortium/jpy#126 is resolved, we need to copy the input ByteBuffer to a new + * ByteBuffer instance that is safe to use after the return of this method. + */ @ScriptApi public synchronized void addRecordBatch(final ByteBuffer ipcMessage) { if (completed) { From 5d7fe30bfcde2ebb7d158351c845758538dc54b5 Mon Sep 17 00:00:00 2001 From: jianfengmao Date: Wed, 10 Jan 2024 14:50:46 -0700 Subject: [PATCH 6/7] Improve warning comments --- .../barrage/util/ArrowToTableConverter.java | 36 +++++++++++-------- 1 file changed, 21 insertions(+), 15 deletions(-) diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/ArrowToTableConverter.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/ArrowToTableConverter.java index aa8bdf4b73e..15c17478aab 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/ArrowToTableConverter.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/ArrowToTableConverter.java @@ -68,13 +68,13 @@ private static BarrageProtoUtil.MessageInfo parseArrowIpcMessage(final ByteBuffe return mi; } - /** - * The input ByteBuffer instance (especially originated from Python) can't be assumed to be valid after the return - * of this method. Until jpy-consortium/jpy#126 is resolved, we need to copy the input ByteBuffer to a new - * ByteBuffer instance that is safe to use after the return of this method. - */ @ScriptApi public synchronized void setSchema(final ByteBuffer ipcMessage) { + // The input ByteBuffer instance (especially originated from Python) can't be assumed to be valid after the + // return + // of this method. Until https://github.com/jpy-consortium/jpy/issues/126 is resolved, we need to copy the data + // out of + // the input ByteBuffer to use after the return of this method. if (completed) { throw new IllegalStateException("Conversion is complete; cannot process additional messages"); } @@ -85,25 +85,25 @@ public synchronized void setSchema(final ByteBuffer ipcMessage) { parseSchema((Schema) mi.header.header(new Schema())); } - /** - * The input ByteBuffer instance (especially originated from Python) can't be assumed to be valid after the return - * of this method. Until jpy-consortium/jpy#126 is resolved, we need to copy the input ByteBuffer to a new - * ByteBuffer instance that is safe to use after the return of this method. - */ @ScriptApi public synchronized void addRecordBatches(final ByteBuffer... ipcMessages) { + // The input ByteBuffer instance (especially originated from Python) can't be assumed to be valid after the + // return + // of this method. Until https://github.com/jpy-consortium/jpy/issues/126 is resolved, we need to copy the data + // out of + // the input ByteBuffer to use after the return of this method. for (final ByteBuffer ipcMessage : ipcMessages) { addRecordBatch(ipcMessage); } } - /** - * The input ByteBuffer instance (especially originated from Python) can't be assumed to be valid after the return - * of this method. Until jpy-consortium/jpy#126 is resolved, we need to copy the input ByteBuffer to a new - * ByteBuffer instance that is safe to use after the return of this method. - */ @ScriptApi public synchronized void addRecordBatch(final ByteBuffer ipcMessage) { + // The input ByteBuffer instance (especially originated from Python) can't be assumed to be valid after the + // return + // of this method. Until https://github.com/jpy-consortium/jpy/issues/126 is resolved, we need to copy the data + // out of + // the input ByteBuffer to use after the return of this method. if (completed) { throw new IllegalStateException("Conversion is complete; cannot process additional messages"); } @@ -142,6 +142,9 @@ public synchronized void onCompleted() throws InterruptedException { } protected void parseSchema(final Schema header) { + // The Schema instance (especially originated from Python) can't be assumed to be valid after the return + // of this method. Until https://github.com/jpy-consortium/jpy/issues/126 is resolved, we need to make a copy of + // the header to use after the return of this method. if (resultTable != null) { throw Exceptions.statusRuntimeException(Code.INVALID_ARGUMENT, "Schema evolution not supported"); } @@ -160,6 +163,9 @@ protected void parseSchema(final Schema header) { } protected BarrageMessage createBarrageMessage(BarrageProtoUtil.MessageInfo mi, int numColumns) { + // The BarrageProtoUtil.MessageInfo instance (especially originated from Python) can't be assumed to be valid + // after the return of this method. Until https://github.com/jpy-consortium/jpy/issues/126 is resolved, we need + // to make a copy of it to use after the return of this method. final BarrageMessage msg = new BarrageMessage(); final RecordBatch batch = (RecordBatch) mi.header.header(new RecordBatch()); From fed587529f0ed9f95e2c2d6a4ccd2e7a866d5f6c Mon Sep 17 00:00:00 2001 From: jianfengmao Date: Wed, 10 Jan 2024 14:55:23 -0700 Subject: [PATCH 7/7] Fix the format problems of the comments --- .../barrage/util/ArrowToTableConverter.java | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/ArrowToTableConverter.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/ArrowToTableConverter.java index 15c17478aab..b655e4fc986 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/ArrowToTableConverter.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/ArrowToTableConverter.java @@ -71,10 +71,8 @@ private static BarrageProtoUtil.MessageInfo parseArrowIpcMessage(final ByteBuffe @ScriptApi public synchronized void setSchema(final ByteBuffer ipcMessage) { // The input ByteBuffer instance (especially originated from Python) can't be assumed to be valid after the - // return - // of this method. Until https://github.com/jpy-consortium/jpy/issues/126 is resolved, we need to copy the data - // out of - // the input ByteBuffer to use after the return of this method. + // return of this method. Until https://github.com/jpy-consortium/jpy/issues/126 is resolved, we need to copy + // the data out of the input ByteBuffer to use after the return of this method. if (completed) { throw new IllegalStateException("Conversion is complete; cannot process additional messages"); } @@ -88,10 +86,8 @@ public synchronized void setSchema(final ByteBuffer ipcMessage) { @ScriptApi public synchronized void addRecordBatches(final ByteBuffer... ipcMessages) { // The input ByteBuffer instance (especially originated from Python) can't be assumed to be valid after the - // return - // of this method. Until https://github.com/jpy-consortium/jpy/issues/126 is resolved, we need to copy the data - // out of - // the input ByteBuffer to use after the return of this method. + // return of this method. Until https://github.com/jpy-consortium/jpy/issues/126 is resolved, we need to copy + // the data out of the input ByteBuffer to use after the return of this method. for (final ByteBuffer ipcMessage : ipcMessages) { addRecordBatch(ipcMessage); } @@ -100,10 +96,8 @@ public synchronized void addRecordBatches(final ByteBuffer... ipcMessages) { @ScriptApi public synchronized void addRecordBatch(final ByteBuffer ipcMessage) { // The input ByteBuffer instance (especially originated from Python) can't be assumed to be valid after the - // return - // of this method. Until https://github.com/jpy-consortium/jpy/issues/126 is resolved, we need to copy the data - // out of - // the input ByteBuffer to use after the return of this method. + // return of this method. Until https://github.com/jpy-consortium/jpy/issues/126 is resolved, we need to copy + // the data out of the input ByteBuffer to use after the return of this method. if (completed) { throw new IllegalStateException("Conversion is complete; cannot process additional messages"); }