diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala
index 22ca020d1ac2..be1bc64e21b8 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala
@@ -20,7 +20,7 @@ import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.columnarbatch.ColumnarBatches
import org.apache.gluten.exception.GlutenException
import org.apache.gluten.exec.Runtimes
-import org.apache.gluten.memory.arrowalloc.ArrowBufferAllocators
+import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
import org.apache.gluten.memory.nmm.NativeMemoryManagers
import org.apache.gluten.utils.{ArrowAbiUtil, Iterators}
import org.apache.gluten.vectorized._
diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/python/ColumnarArrowEvalPythonExec.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/python/ColumnarArrowEvalPythonExec.scala
index f2beef6ca6fb..77ef1c6422b2 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/execution/python/ColumnarArrowEvalPythonExec.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/execution/python/ColumnarArrowEvalPythonExec.scala
@@ -18,7 +18,7 @@ package org.apache.spark.api.python
import org.apache.gluten.columnarbatch.ColumnarBatches
import org.apache.gluten.extension.GlutenPlan
-import org.apache.gluten.memory.arrowalloc.ArrowBufferAllocators
+import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
import org.apache.gluten.utils.Iterators
import org.apache.gluten.vectorized.ArrowWritableColumnVector
diff --git a/backends-velox/src/main/scala/org/apache/gluten/utils/DatasourceUtil.scala b/backends-velox/src/main/scala/org/apache/gluten/utils/DatasourceUtil.scala
index f61cdb8e9986..6150507b4baa 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/utils/DatasourceUtil.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/utils/DatasourceUtil.scala
@@ -17,7 +17,7 @@
package org.apache.gluten.utils
import org.apache.gluten.datasource.DatasourceJniWrapper
-import org.apache.gluten.memory.arrowalloc.ArrowBufferAllocators
+import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
import org.apache.gluten.memory.nmm.NativeMemoryManagers
import org.apache.spark.sql.types.StructType
diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala
index 88678cb5e5c3..7385c53d61b3 100644
--- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala
+++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala
@@ -21,7 +21,7 @@ import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.columnarbatch.ColumnarBatches
import org.apache.gluten.exec.Runtimes
import org.apache.gluten.execution.{RowToVeloxColumnarExec, VeloxColumnarToRowExec}
-import org.apache.gluten.memory.arrowalloc.ArrowBufferAllocators
+import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
import org.apache.gluten.memory.nmm.NativeMemoryManagers
import org.apache.gluten.utils.{ArrowAbiUtil, Iterators}
import org.apache.gluten.vectorized.ColumnarBatchSerializerJniWrapper
diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
index 26d249f9056b..23dff990c464 100644
--- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
+++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
@@ -20,7 +20,7 @@ import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.columnarbatch.ColumnarBatches
import org.apache.gluten.exception.GlutenException
import org.apache.gluten.extension.GlutenPlan
-import org.apache.gluten.memory.arrowalloc.ArrowBufferAllocators
+import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.spark.{Partition, SparkException, TaskContext, TaskOutputFileAlreadyExistException}
diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxFormatWriterInjects.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxFormatWriterInjects.scala
index 97ebb932ac05..c358d6372c36 100644
--- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxFormatWriterInjects.scala
+++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxFormatWriterInjects.scala
@@ -20,7 +20,7 @@ import org.apache.gluten.columnarbatch.ColumnarBatches
import org.apache.gluten.datasource.DatasourceJniWrapper
import org.apache.gluten.exception.GlutenException
import org.apache.gluten.execution.datasource.GlutenRowSplitter
-import org.apache.gluten.memory.arrowalloc.ArrowBufferAllocators
+import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
import org.apache.gluten.memory.nmm.NativeMemoryManagers
import org.apache.gluten.utils.{ArrowAbiUtil, DatasourceUtil}
diff --git a/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala b/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala
index c5bd8874853a..d72977f59714 100644
--- a/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala
+++ b/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala
@@ -18,7 +18,7 @@ package org.apache.spark.shuffle
import org.apache.gluten.GlutenConfig
import org.apache.gluten.exec.Runtimes
-import org.apache.gluten.memory.arrowalloc.ArrowBufferAllocators
+import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
import org.apache.gluten.memory.nmm.NativeMemoryManagers
import org.apache.gluten.utils.ArrowAbiUtil
import org.apache.gluten.vectorized._
diff --git a/gluten-data/pom.xml b/gluten-data/pom.xml
index 951f53ee83d3..db617112f652 100644
--- a/gluten-data/pom.xml
+++ b/gluten-data/pom.xml
@@ -137,6 +137,34 @@
compile
+
+ org.apache.arrow
+ arrow-dataset
+ ${arrow.version}
+
+
+ io.netty
+ netty-common
+
+
+ io.netty
+ netty-buffer
+
+
+ com.fasterxml.jackson.core
+ jackson-core
+
+
+ com.fasterxml.jackson.core
+ jackson-annotations
+
+
+ protobuf-java
+ com.google.protobuf
+
+
+ compile
+
diff --git a/gluten-data/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java b/gluten-data/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java
index 85ef875d69a2..624428dcba19 100644
--- a/gluten-data/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java
+++ b/gluten-data/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java
@@ -19,7 +19,7 @@
import org.apache.gluten.exception.GlutenException;
import org.apache.gluten.exec.Runtime;
import org.apache.gluten.exec.Runtimes;
-import org.apache.gluten.memory.arrowalloc.ArrowBufferAllocators;
+import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators;
import org.apache.gluten.memory.nmm.NativeMemoryManager;
import org.apache.gluten.utils.ArrowAbiUtil;
import org.apache.gluten.utils.ArrowUtil;
diff --git a/gluten-data/src/main/java/org/apache/gluten/memory/arrowalloc/ArrowBufferAllocators.java b/gluten-data/src/main/java/org/apache/gluten/memory/arrow/alloc/ArrowBufferAllocators.java
similarity index 98%
rename from gluten-data/src/main/java/org/apache/gluten/memory/arrowalloc/ArrowBufferAllocators.java
rename to gluten-data/src/main/java/org/apache/gluten/memory/arrow/alloc/ArrowBufferAllocators.java
index fcac26d6ebb1..efee20e48b83 100644
--- a/gluten-data/src/main/java/org/apache/gluten/memory/arrowalloc/ArrowBufferAllocators.java
+++ b/gluten-data/src/main/java/org/apache/gluten/memory/arrow/alloc/ArrowBufferAllocators.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.gluten.memory.arrowalloc;
+package org.apache.gluten.memory.arrow.alloc;
import org.apache.gluten.memory.memtarget.MemoryTargets;
diff --git a/gluten-data/src/main/java/org/apache/gluten/memory/arrowalloc/ManagedAllocationListener.java b/gluten-data/src/main/java/org/apache/gluten/memory/arrow/alloc/ManagedAllocationListener.java
similarity index 98%
rename from gluten-data/src/main/java/org/apache/gluten/memory/arrowalloc/ManagedAllocationListener.java
rename to gluten-data/src/main/java/org/apache/gluten/memory/arrow/alloc/ManagedAllocationListener.java
index ded2af60bd3a..a76c0aabee3b 100644
--- a/gluten-data/src/main/java/org/apache/gluten/memory/arrowalloc/ManagedAllocationListener.java
+++ b/gluten-data/src/main/java/org/apache/gluten/memory/arrow/alloc/ManagedAllocationListener.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.gluten.memory.arrowalloc;
+package org.apache.gluten.memory.arrow.alloc;
import org.apache.gluten.GlutenConfig;
import org.apache.gluten.memory.SimpleMemoryUsageRecorder;
diff --git a/gluten-data/src/main/java/org/apache/gluten/memory/arrow/pool/ArrowNativeMemoryPool.java b/gluten-data/src/main/java/org/apache/gluten/memory/arrow/pool/ArrowNativeMemoryPool.java
new file mode 100644
index 000000000000..04a6e0002ade
--- /dev/null
+++ b/gluten-data/src/main/java/org/apache/gluten/memory/arrow/pool/ArrowNativeMemoryPool.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.memory.arrow.pool;
+
+import org.apache.arrow.dataset.jni.NativeMemoryPool;
+import org.apache.spark.util.TaskResource;
+import org.apache.spark.util.TaskResources;
+import org.apache.spark.util.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ArrowNativeMemoryPool implements TaskResource {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ArrowNativeMemoryPool.class);
+
+ private final NativeMemoryPool arrowPool;
+ private final ArrowReservationListener listener;
+
+ public ArrowNativeMemoryPool() {
+ listener = new ArrowReservationListener(TaskResources.getSharedUsage());
+ arrowPool = NativeMemoryPool.createListenable(listener);
+ }
+
+ public static NativeMemoryPool arrowPool(String name) {
+ if (!TaskResources.inSparkTask()) {
+ throw new IllegalStateException("This method must be called in a Spark task.");
+ }
+ String id = "ArrowNativeMemoryPool:" + name;
+ return TaskResources.addResourceIfNotRegistered(id, () -> createArrowNativeMemoryPool(name))
+ .getArrowPool();
+ }
+
+ private static ArrowNativeMemoryPool createArrowNativeMemoryPool(String name) {
+ return new ArrowNativeMemoryPool();
+ }
+
+ @Override
+ public void release() throws Exception {
+ if (arrowPool.getBytesAllocated() != 0) {
+ LOGGER.warn(
+ String.format(
+ "Arrow pool still reserved non-zero bytes, "
+ + "which may cause memory leak, size: %s. ",
+ Utils.bytesToString(arrowPool.getBytesAllocated())));
+ }
+ arrowPool.close();
+ }
+
+ @Override
+ public int priority() {
+ return 0;
+ }
+
+ @Override
+ public String resourceName() {
+ return "arrow_mem";
+ }
+
+ public NativeMemoryPool getArrowPool() {
+ return arrowPool;
+ }
+}
diff --git a/gluten-data/src/main/java/org/apache/gluten/memory/arrow/pool/ArrowReservationListener.java b/gluten-data/src/main/java/org/apache/gluten/memory/arrow/pool/ArrowReservationListener.java
new file mode 100644
index 000000000000..2e1a254453f2
--- /dev/null
+++ b/gluten-data/src/main/java/org/apache/gluten/memory/arrow/pool/ArrowReservationListener.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.memory.arrow.pool;
+
+import org.apache.gluten.memory.SimpleMemoryUsageRecorder;
+
+public class ArrowReservationListener implements org.apache.arrow.dataset.jni.ReservationListener {
+ private final SimpleMemoryUsageRecorder sharedUsage; // shared task metrics
+
+ public ArrowReservationListener(SimpleMemoryUsageRecorder recorder) {
+ this.sharedUsage = recorder;
+ }
+
+ @Override
+ public void reserve(long size) {
+ synchronized (this) {
+ sharedUsage.inc(size);
+ }
+ }
+
+ @Override
+ public void unreserve(long size) {
+ synchronized (this) {
+ sharedUsage.inc(-size);
+ }
+ }
+}
diff --git a/gluten-data/src/main/java/org/apache/gluten/vectorized/ArrowWritableColumnVector.java b/gluten-data/src/main/java/org/apache/gluten/vectorized/ArrowWritableColumnVector.java
index 4603cbc4187d..dfd570debc0a 100644
--- a/gluten-data/src/main/java/org/apache/gluten/vectorized/ArrowWritableColumnVector.java
+++ b/gluten-data/src/main/java/org/apache/gluten/vectorized/ArrowWritableColumnVector.java
@@ -16,7 +16,7 @@
*/
package org.apache.gluten.vectorized;
-import org.apache.gluten.memory.arrowalloc.ArrowBufferAllocators;
+import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.BigIntVector;
diff --git a/gluten-data/src/main/java/org/apache/gluten/vectorized/ColumnarBatchInIterator.java b/gluten-data/src/main/java/org/apache/gluten/vectorized/ColumnarBatchInIterator.java
index 85b24166ebf5..bd89f62a1806 100644
--- a/gluten-data/src/main/java/org/apache/gluten/vectorized/ColumnarBatchInIterator.java
+++ b/gluten-data/src/main/java/org/apache/gluten/vectorized/ColumnarBatchInIterator.java
@@ -18,7 +18,7 @@
import org.apache.gluten.columnarbatch.ColumnarBatchJniWrapper;
import org.apache.gluten.columnarbatch.ColumnarBatches;
-import org.apache.gluten.memory.arrowalloc.ArrowBufferAllocators;
+import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators;
import org.apache.spark.sql.vectorized.ColumnarBatch;
diff --git a/gluten-data/src/main/scala/org/apache/gluten/utils/ImplicitClass.scala b/gluten-data/src/main/scala/org/apache/gluten/utils/ImplicitClass.scala
index 1a5ae42777ff..4ffb3ab5cdcc 100644
--- a/gluten-data/src/main/scala/org/apache/gluten/utils/ImplicitClass.scala
+++ b/gluten-data/src/main/scala/org/apache/gluten/utils/ImplicitClass.scala
@@ -17,7 +17,7 @@
package org.apache.gluten.utils
import org.apache.gluten.columnarbatch.ColumnarBatches
-import org.apache.gluten.memory.arrowalloc.ArrowBufferAllocators
+import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
import org.apache.gluten.vectorized.ArrowWritableColumnVector
import org.apache.spark.sql.vectorized.ColumnarBatch
diff --git a/gluten-data/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala b/gluten-data/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala
index 9fbdb36377e3..e632700e3743 100644
--- a/gluten-data/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala
+++ b/gluten-data/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala
@@ -18,7 +18,7 @@ package org.apache.gluten.vectorized
import org.apache.gluten.GlutenConfig
import org.apache.gluten.exec.Runtimes
-import org.apache.gluten.memory.arrowalloc.ArrowBufferAllocators
+import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
import org.apache.gluten.memory.nmm.NativeMemoryManagers
import org.apache.gluten.utils.ArrowAbiUtil
diff --git a/gluten-data/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala b/gluten-data/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala
index ac94bfa89d67..9d9f5ab1765c 100644
--- a/gluten-data/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala
+++ b/gluten-data/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql.execution
import org.apache.gluten.columnarbatch.ColumnarBatches
import org.apache.gluten.exec.Runtimes
-import org.apache.gluten.memory.arrowalloc.ArrowBufferAllocators
+import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
import org.apache.gluten.memory.nmm.NativeMemoryManagers
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.utils.{ArrowAbiUtil, Iterators}
diff --git a/gluten-data/src/main/scala/org/apache/spark/sql/execution/utils/ExecUtil.scala b/gluten-data/src/main/scala/org/apache/spark/sql/execution/utils/ExecUtil.scala
index fd0cf45c8a2a..d3e7b409686a 100644
--- a/gluten-data/src/main/scala/org/apache/spark/sql/execution/utils/ExecUtil.scala
+++ b/gluten-data/src/main/scala/org/apache/spark/sql/execution/utils/ExecUtil.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.execution.utils
import org.apache.gluten.columnarbatch.ColumnarBatches
-import org.apache.gluten.memory.arrowalloc.ArrowBufferAllocators
+import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
import org.apache.gluten.memory.nmm.NativeMemoryManagers
import org.apache.gluten.utils.Iterators
import org.apache.gluten.vectorized.{ArrowWritableColumnVector, NativeColumnarToRowInfo, NativeColumnarToRowJniWrapper, NativePartitioning}
diff --git a/gluten-data/src/main/scala/org/apache/spark/sql/utils/SparkVectorUtil.scala b/gluten-data/src/main/scala/org/apache/spark/sql/utils/SparkVectorUtil.scala
index 208afd294920..3e86be79ac3f 100644
--- a/gluten-data/src/main/scala/org/apache/spark/sql/utils/SparkVectorUtil.scala
+++ b/gluten-data/src/main/scala/org/apache/spark/sql/utils/SparkVectorUtil.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.utils
import org.apache.gluten.columnarbatch.ColumnarBatches
-import org.apache.gluten.memory.arrowalloc.ArrowBufferAllocators
+import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
import org.apache.gluten.vectorized.ArrowWritableColumnVector
import org.apache.spark.sql.vectorized.ColumnarBatch