From cee1f3bd869340c156feb0cdaf0a867f32343d54 Mon Sep 17 00:00:00 2001 From: Jin Chengcheng Date: Wed, 8 May 2024 11:35:25 +0800 Subject: [PATCH] [GLUTEN-5414] [VL] Support Arrow native memory pool usage track (#5550) --- .../execution/RowToVeloxColumnarExec.scala | 2 +- .../python/ColumnarArrowEvalPythonExec.scala | 2 +- .../apache/gluten/utils/DatasourceUtil.scala | 2 +- .../ColumnarCachedBatchSerializer.scala | 2 +- .../VeloxColumnarWriteFilesExec.scala | 2 +- .../velox/VeloxFormatWriterInjects.scala | 2 +- ...VeloxCelebornColumnarBatchSerializer.scala | 2 +- gluten-data/pom.xml | 28 +++++++ .../gluten/columnarbatch/ColumnarBatches.java | 2 +- .../alloc}/ArrowBufferAllocators.java | 2 +- .../alloc}/ManagedAllocationListener.java | 2 +- .../arrow/pool/ArrowNativeMemoryPool.java | 75 +++++++++++++++++++ .../arrow/pool/ArrowReservationListener.java | 41 ++++++++++ .../vectorized/ArrowWritableColumnVector.java | 2 +- .../vectorized/ColumnarBatchInIterator.java | 2 +- .../apache/gluten/utils/ImplicitClass.scala | 2 +- .../vectorized/ColumnarBatchSerializer.scala | 2 +- .../execution/ColumnarBuildSideRelation.scala | 2 +- .../spark/sql/execution/utils/ExecUtil.scala | 2 +- .../spark/sql/utils/SparkVectorUtil.scala | 2 +- 20 files changed, 161 insertions(+), 17 deletions(-) rename gluten-data/src/main/java/org/apache/gluten/memory/{arrowalloc => arrow/alloc}/ArrowBufferAllocators.java (98%) rename gluten-data/src/main/java/org/apache/gluten/memory/{arrowalloc => arrow/alloc}/ManagedAllocationListener.java (98%) create mode 100644 gluten-data/src/main/java/org/apache/gluten/memory/arrow/pool/ArrowNativeMemoryPool.java create mode 100644 gluten-data/src/main/java/org/apache/gluten/memory/arrow/pool/ArrowReservationListener.java diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala index 22ca020d1ac2..be1bc64e21b8 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala @@ -20,7 +20,7 @@ import org.apache.gluten.backendsapi.BackendsApiManager import org.apache.gluten.columnarbatch.ColumnarBatches import org.apache.gluten.exception.GlutenException import org.apache.gluten.exec.Runtimes -import org.apache.gluten.memory.arrowalloc.ArrowBufferAllocators +import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators import org.apache.gluten.memory.nmm.NativeMemoryManagers import org.apache.gluten.utils.{ArrowAbiUtil, Iterators} import org.apache.gluten.vectorized._ diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/python/ColumnarArrowEvalPythonExec.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/python/ColumnarArrowEvalPythonExec.scala index f2beef6ca6fb..77ef1c6422b2 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/execution/python/ColumnarArrowEvalPythonExec.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/execution/python/ColumnarArrowEvalPythonExec.scala @@ -18,7 +18,7 @@ package org.apache.spark.api.python import org.apache.gluten.columnarbatch.ColumnarBatches import org.apache.gluten.extension.GlutenPlan -import org.apache.gluten.memory.arrowalloc.ArrowBufferAllocators +import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators import org.apache.gluten.utils.Iterators import org.apache.gluten.vectorized.ArrowWritableColumnVector diff --git a/backends-velox/src/main/scala/org/apache/gluten/utils/DatasourceUtil.scala b/backends-velox/src/main/scala/org/apache/gluten/utils/DatasourceUtil.scala index f61cdb8e9986..6150507b4baa 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/utils/DatasourceUtil.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/utils/DatasourceUtil.scala @@ -17,7 +17,7 @@ package org.apache.gluten.utils import org.apache.gluten.datasource.DatasourceJniWrapper -import org.apache.gluten.memory.arrowalloc.ArrowBufferAllocators +import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators import org.apache.gluten.memory.nmm.NativeMemoryManagers import org.apache.spark.sql.types.StructType diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala index 88678cb5e5c3..7385c53d61b3 100644 --- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala +++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala @@ -21,7 +21,7 @@ import org.apache.gluten.backendsapi.BackendsApiManager import org.apache.gluten.columnarbatch.ColumnarBatches import org.apache.gluten.exec.Runtimes import org.apache.gluten.execution.{RowToVeloxColumnarExec, VeloxColumnarToRowExec} -import org.apache.gluten.memory.arrowalloc.ArrowBufferAllocators +import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators import org.apache.gluten.memory.nmm.NativeMemoryManagers import org.apache.gluten.utils.{ArrowAbiUtil, Iterators} import org.apache.gluten.vectorized.ColumnarBatchSerializerJniWrapper diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala index 26d249f9056b..23dff990c464 100644 --- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala +++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala @@ -20,7 +20,7 @@ import org.apache.gluten.backendsapi.BackendsApiManager import org.apache.gluten.columnarbatch.ColumnarBatches import org.apache.gluten.exception.GlutenException import org.apache.gluten.extension.GlutenPlan -import org.apache.gluten.memory.arrowalloc.ArrowBufferAllocators +import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators import org.apache.gluten.sql.shims.SparkShimLoader import org.apache.spark.{Partition, SparkException, TaskContext, TaskOutputFileAlreadyExistException} diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxFormatWriterInjects.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxFormatWriterInjects.scala index 97ebb932ac05..c358d6372c36 100644 --- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxFormatWriterInjects.scala +++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxFormatWriterInjects.scala @@ -20,7 +20,7 @@ import org.apache.gluten.columnarbatch.ColumnarBatches import org.apache.gluten.datasource.DatasourceJniWrapper import org.apache.gluten.exception.GlutenException import org.apache.gluten.execution.datasource.GlutenRowSplitter -import org.apache.gluten.memory.arrowalloc.ArrowBufferAllocators +import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators import org.apache.gluten.memory.nmm.NativeMemoryManagers import org.apache.gluten.utils.{ArrowAbiUtil, DatasourceUtil} diff --git a/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala b/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala index c5bd8874853a..d72977f59714 100644 --- a/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala +++ b/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala @@ -18,7 +18,7 @@ package org.apache.spark.shuffle import org.apache.gluten.GlutenConfig import org.apache.gluten.exec.Runtimes -import org.apache.gluten.memory.arrowalloc.ArrowBufferAllocators +import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators import org.apache.gluten.memory.nmm.NativeMemoryManagers import org.apache.gluten.utils.ArrowAbiUtil import org.apache.gluten.vectorized._ diff --git a/gluten-data/pom.xml b/gluten-data/pom.xml index 951f53ee83d3..db617112f652 100644 --- a/gluten-data/pom.xml +++ b/gluten-data/pom.xml @@ -137,6 +137,34 @@ compile + + org.apache.arrow + arrow-dataset + ${arrow.version} + + + io.netty + netty-common + + + io.netty + netty-buffer + + + com.fasterxml.jackson.core + jackson-core + + + com.fasterxml.jackson.core + jackson-annotations + + + protobuf-java + com.google.protobuf + + + compile + diff --git a/gluten-data/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java b/gluten-data/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java index 85ef875d69a2..624428dcba19 100644 --- a/gluten-data/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java +++ b/gluten-data/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java @@ -19,7 +19,7 @@ import org.apache.gluten.exception.GlutenException; import org.apache.gluten.exec.Runtime; import org.apache.gluten.exec.Runtimes; -import org.apache.gluten.memory.arrowalloc.ArrowBufferAllocators; +import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators; import org.apache.gluten.memory.nmm.NativeMemoryManager; import org.apache.gluten.utils.ArrowAbiUtil; import org.apache.gluten.utils.ArrowUtil; diff --git a/gluten-data/src/main/java/org/apache/gluten/memory/arrowalloc/ArrowBufferAllocators.java b/gluten-data/src/main/java/org/apache/gluten/memory/arrow/alloc/ArrowBufferAllocators.java similarity index 98% rename from gluten-data/src/main/java/org/apache/gluten/memory/arrowalloc/ArrowBufferAllocators.java rename to gluten-data/src/main/java/org/apache/gluten/memory/arrow/alloc/ArrowBufferAllocators.java index fcac26d6ebb1..efee20e48b83 100644 --- a/gluten-data/src/main/java/org/apache/gluten/memory/arrowalloc/ArrowBufferAllocators.java +++ b/gluten-data/src/main/java/org/apache/gluten/memory/arrow/alloc/ArrowBufferAllocators.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.gluten.memory.arrowalloc; +package org.apache.gluten.memory.arrow.alloc; import org.apache.gluten.memory.memtarget.MemoryTargets; diff --git a/gluten-data/src/main/java/org/apache/gluten/memory/arrowalloc/ManagedAllocationListener.java b/gluten-data/src/main/java/org/apache/gluten/memory/arrow/alloc/ManagedAllocationListener.java similarity index 98% rename from gluten-data/src/main/java/org/apache/gluten/memory/arrowalloc/ManagedAllocationListener.java rename to gluten-data/src/main/java/org/apache/gluten/memory/arrow/alloc/ManagedAllocationListener.java index ded2af60bd3a..a76c0aabee3b 100644 --- a/gluten-data/src/main/java/org/apache/gluten/memory/arrowalloc/ManagedAllocationListener.java +++ b/gluten-data/src/main/java/org/apache/gluten/memory/arrow/alloc/ManagedAllocationListener.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.gluten.memory.arrowalloc; +package org.apache.gluten.memory.arrow.alloc; import org.apache.gluten.GlutenConfig; import org.apache.gluten.memory.SimpleMemoryUsageRecorder; diff --git a/gluten-data/src/main/java/org/apache/gluten/memory/arrow/pool/ArrowNativeMemoryPool.java b/gluten-data/src/main/java/org/apache/gluten/memory/arrow/pool/ArrowNativeMemoryPool.java new file mode 100644 index 000000000000..04a6e0002ade --- /dev/null +++ b/gluten-data/src/main/java/org/apache/gluten/memory/arrow/pool/ArrowNativeMemoryPool.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.memory.arrow.pool; + +import org.apache.arrow.dataset.jni.NativeMemoryPool; +import org.apache.spark.util.TaskResource; +import org.apache.spark.util.TaskResources; +import org.apache.spark.util.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ArrowNativeMemoryPool implements TaskResource { + private static final Logger LOGGER = LoggerFactory.getLogger(ArrowNativeMemoryPool.class); + + private final NativeMemoryPool arrowPool; + private final ArrowReservationListener listener; + + public ArrowNativeMemoryPool() { + listener = new ArrowReservationListener(TaskResources.getSharedUsage()); + arrowPool = NativeMemoryPool.createListenable(listener); + } + + public static NativeMemoryPool arrowPool(String name) { + if (!TaskResources.inSparkTask()) { + throw new IllegalStateException("This method must be called in a Spark task."); + } + String id = "ArrowNativeMemoryPool:" + name; + return TaskResources.addResourceIfNotRegistered(id, () -> createArrowNativeMemoryPool(name)) + .getArrowPool(); + } + + private static ArrowNativeMemoryPool createArrowNativeMemoryPool(String name) { + return new ArrowNativeMemoryPool(); + } + + @Override + public void release() throws Exception { + if (arrowPool.getBytesAllocated() != 0) { + LOGGER.warn( + String.format( + "Arrow pool still reserved non-zero bytes, " + + "which may cause memory leak, size: %s. ", + Utils.bytesToString(arrowPool.getBytesAllocated()))); + } + arrowPool.close(); + } + + @Override + public int priority() { + return 0; + } + + @Override + public String resourceName() { + return "arrow_mem"; + } + + public NativeMemoryPool getArrowPool() { + return arrowPool; + } +} diff --git a/gluten-data/src/main/java/org/apache/gluten/memory/arrow/pool/ArrowReservationListener.java b/gluten-data/src/main/java/org/apache/gluten/memory/arrow/pool/ArrowReservationListener.java new file mode 100644 index 000000000000..2e1a254453f2 --- /dev/null +++ b/gluten-data/src/main/java/org/apache/gluten/memory/arrow/pool/ArrowReservationListener.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.memory.arrow.pool; + +import org.apache.gluten.memory.SimpleMemoryUsageRecorder; + +public class ArrowReservationListener implements org.apache.arrow.dataset.jni.ReservationListener { + private final SimpleMemoryUsageRecorder sharedUsage; // shared task metrics + + public ArrowReservationListener(SimpleMemoryUsageRecorder recorder) { + this.sharedUsage = recorder; + } + + @Override + public void reserve(long size) { + synchronized (this) { + sharedUsage.inc(size); + } + } + + @Override + public void unreserve(long size) { + synchronized (this) { + sharedUsage.inc(-size); + } + } +} diff --git a/gluten-data/src/main/java/org/apache/gluten/vectorized/ArrowWritableColumnVector.java b/gluten-data/src/main/java/org/apache/gluten/vectorized/ArrowWritableColumnVector.java index 4603cbc4187d..dfd570debc0a 100644 --- a/gluten-data/src/main/java/org/apache/gluten/vectorized/ArrowWritableColumnVector.java +++ b/gluten-data/src/main/java/org/apache/gluten/vectorized/ArrowWritableColumnVector.java @@ -16,7 +16,7 @@ */ package org.apache.gluten.vectorized; -import org.apache.gluten.memory.arrowalloc.ArrowBufferAllocators; +import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.vector.BigIntVector; diff --git a/gluten-data/src/main/java/org/apache/gluten/vectorized/ColumnarBatchInIterator.java b/gluten-data/src/main/java/org/apache/gluten/vectorized/ColumnarBatchInIterator.java index 85b24166ebf5..bd89f62a1806 100644 --- a/gluten-data/src/main/java/org/apache/gluten/vectorized/ColumnarBatchInIterator.java +++ b/gluten-data/src/main/java/org/apache/gluten/vectorized/ColumnarBatchInIterator.java @@ -18,7 +18,7 @@ import org.apache.gluten.columnarbatch.ColumnarBatchJniWrapper; import org.apache.gluten.columnarbatch.ColumnarBatches; -import org.apache.gluten.memory.arrowalloc.ArrowBufferAllocators; +import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators; import org.apache.spark.sql.vectorized.ColumnarBatch; diff --git a/gluten-data/src/main/scala/org/apache/gluten/utils/ImplicitClass.scala b/gluten-data/src/main/scala/org/apache/gluten/utils/ImplicitClass.scala index 1a5ae42777ff..4ffb3ab5cdcc 100644 --- a/gluten-data/src/main/scala/org/apache/gluten/utils/ImplicitClass.scala +++ b/gluten-data/src/main/scala/org/apache/gluten/utils/ImplicitClass.scala @@ -17,7 +17,7 @@ package org.apache.gluten.utils import org.apache.gluten.columnarbatch.ColumnarBatches -import org.apache.gluten.memory.arrowalloc.ArrowBufferAllocators +import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators import org.apache.gluten.vectorized.ArrowWritableColumnVector import org.apache.spark.sql.vectorized.ColumnarBatch diff --git a/gluten-data/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala b/gluten-data/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala index 9fbdb36377e3..e632700e3743 100644 --- a/gluten-data/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala +++ b/gluten-data/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala @@ -18,7 +18,7 @@ package org.apache.gluten.vectorized import org.apache.gluten.GlutenConfig import org.apache.gluten.exec.Runtimes -import org.apache.gluten.memory.arrowalloc.ArrowBufferAllocators +import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators import org.apache.gluten.memory.nmm.NativeMemoryManagers import org.apache.gluten.utils.ArrowAbiUtil diff --git a/gluten-data/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala b/gluten-data/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala index ac94bfa89d67..9d9f5ab1765c 100644 --- a/gluten-data/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala +++ b/gluten-data/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.execution import org.apache.gluten.columnarbatch.ColumnarBatches import org.apache.gluten.exec.Runtimes -import org.apache.gluten.memory.arrowalloc.ArrowBufferAllocators +import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators import org.apache.gluten.memory.nmm.NativeMemoryManagers import org.apache.gluten.sql.shims.SparkShimLoader import org.apache.gluten.utils.{ArrowAbiUtil, Iterators} diff --git a/gluten-data/src/main/scala/org/apache/spark/sql/execution/utils/ExecUtil.scala b/gluten-data/src/main/scala/org/apache/spark/sql/execution/utils/ExecUtil.scala index fd0cf45c8a2a..d3e7b409686a 100644 --- a/gluten-data/src/main/scala/org/apache/spark/sql/execution/utils/ExecUtil.scala +++ b/gluten-data/src/main/scala/org/apache/spark/sql/execution/utils/ExecUtil.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.execution.utils import org.apache.gluten.columnarbatch.ColumnarBatches -import org.apache.gluten.memory.arrowalloc.ArrowBufferAllocators +import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators import org.apache.gluten.memory.nmm.NativeMemoryManagers import org.apache.gluten.utils.Iterators import org.apache.gluten.vectorized.{ArrowWritableColumnVector, NativeColumnarToRowInfo, NativeColumnarToRowJniWrapper, NativePartitioning} diff --git a/gluten-data/src/main/scala/org/apache/spark/sql/utils/SparkVectorUtil.scala b/gluten-data/src/main/scala/org/apache/spark/sql/utils/SparkVectorUtil.scala index 208afd294920..3e86be79ac3f 100644 --- a/gluten-data/src/main/scala/org/apache/spark/sql/utils/SparkVectorUtil.scala +++ b/gluten-data/src/main/scala/org/apache/spark/sql/utils/SparkVectorUtil.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.utils import org.apache.gluten.columnarbatch.ColumnarBatches -import org.apache.gluten.memory.arrowalloc.ArrowBufferAllocators +import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators import org.apache.gluten.vectorized.ArrowWritableColumnVector import org.apache.spark.sql.vectorized.ColumnarBatch