diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala index 4b9ec739028f..ac9c633bcdd2 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala @@ -122,7 +122,8 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil { partition: InputPartition, partitionSchema: StructType, fileFormat: ReadFileFormat, - metadataColumnNames: Seq[String]): SplitInfo = { + metadataColumnNames: Seq[String], + properties: Map[String, String]): SplitInfo = { partition match { case p: GlutenMergeTreePartition => val partLists = new JArrayList[String]() @@ -183,7 +184,8 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil { partitionColumns, new JArrayList[JMap[String, String]](), fileFormat, - preferredLocations.toList.asJava + preferredLocations.toList.asJava, + mapAsJavaMap(properties) ) case _ => throw new UnsupportedOperationException(s"Unsupported input partition: $partition.") @@ -209,7 +211,6 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil { split match { case filesNode: LocalFilesNode => setFileSchemaForLocalFiles(filesNode, scans(i)) - filesNode.setFileReadProperties(mapAsJavaMap(scans(i).getProperties)) filesNode.getPaths.forEach(f => files += f) filesNode.toProtobuf.toByteArray case extensionTableNode: ExtensionTableNode => diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala index 613e539456ec..cc3cabb7c5e8 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala @@ -54,7 +54,8 @@ class VeloxIteratorApi extends IteratorApi with Logging { partition: InputPartition, partitionSchema: StructType, fileFormat: ReadFileFormat, - metadataColumnNames: Seq[String]): SplitInfo = { + metadataColumnNames: Seq[String], + properties: Map[String, String]): SplitInfo = { partition match { case f: FilePartition => val ( @@ -78,7 +79,9 @@ class VeloxIteratorApi extends IteratorApi with Logging { partitionColumns, metadataColumns, fileFormat, - preferredLocations.toList.asJava) + preferredLocations.toList.asJava, + mapAsJavaMap(properties) + ) case _ => throw new UnsupportedOperationException(s"Unsupported input partition.") } diff --git a/cpp/core/jni/JniCommon.h b/cpp/core/jni/JniCommon.h index d5c9f2b3b18b..1d784f3a5eda 100644 --- a/cpp/core/jni/JniCommon.h +++ b/cpp/core/jni/JniCommon.h @@ -362,6 +362,10 @@ static inline gluten::CompressionMode getCompressionMode(JNIEnv* env, jstring co } } +/* +NOTE: the class must be thread safe + */ + class SparkAllocationListener final : public gluten::AllocationListener { public: SparkAllocationListener(JavaVM* vm, jobject jListenerLocalRef, jmethodID jReserveMethod, jmethodID jUnreserveMethod) @@ -399,25 +403,34 @@ class SparkAllocationListener final : public gluten::AllocationListener { env->CallLongMethod(jListenerGlobalRef_, jReserveMethod_, size); checkException(env); } - bytesReserved_ += size; - maxBytesReserved_ = std::max(bytesReserved_, maxBytesReserved_); + usedBytes_ += size; + while (true) { + int64_t savedPeakBytes = peakBytes_; + if (usedBytes_ <= savedPeakBytes) { + break; + } + // usedBytes_ > savedPeakBytes, update peak + if (peakBytes_.compare_exchange_weak(savedPeakBytes, usedBytes_)) { + break; + } + } } int64_t currentBytes() override { - return bytesReserved_; + return usedBytes_; } int64_t peakBytes() override { - return maxBytesReserved_; + return peakBytes_; } private: JavaVM* vm_; jobject jListenerGlobalRef_; - jmethodID jReserveMethod_; - jmethodID jUnreserveMethod_; - int64_t bytesReserved_ = 0L; - int64_t maxBytesReserved_ = 0L; + const jmethodID jReserveMethod_; + const jmethodID jUnreserveMethod_; + std::atomic_int64_t usedBytes_{0L}; + std::atomic_int64_t peakBytes_{0L}; }; class BacktraceAllocationListener final : public gluten::AllocationListener { diff --git a/cpp/core/memory/AllocationListener.h b/cpp/core/memory/AllocationListener.h index d43a621de9a0..41797641fe14 100644 --- a/cpp/core/memory/AllocationListener.h +++ b/cpp/core/memory/AllocationListener.h @@ -19,6 +19,7 @@ #include #include +#include namespace gluten { @@ -46,29 +47,21 @@ class AllocationListener { }; /// Memory changes will be round to specified block size which aim to decrease delegated listener calls. +// The class must be thread safe class BlockAllocationListener final : public AllocationListener { public: - BlockAllocationListener(AllocationListener* delegated, uint64_t blockSize) + BlockAllocationListener(AllocationListener* delegated, int64_t blockSize) : delegated_(delegated), blockSize_(blockSize) {} void allocationChanged(int64_t diff) override { if (diff == 0) { return; } - if (diff > 0) { - if (reservationBytes_ - usedBytes_ < diff) { - auto roundSize = (diff + (blockSize_ - 1)) / blockSize_ * blockSize_; - delegated_->allocationChanged(roundSize); - reservationBytes_ += roundSize; - peakBytes_ = std::max(peakBytes_, reservationBytes_); - } - usedBytes_ += diff; - } else { - usedBytes_ += diff; - auto unreservedSize = (reservationBytes_ - usedBytes_) / blockSize_ * blockSize_; - delegated_->allocationChanged(-unreservedSize); - reservationBytes_ -= unreservedSize; + int64_t granted = reserve(diff); + if (granted == 0) { + return; } + delegated_->allocationChanged(granted); } int64_t currentBytes() override { @@ -80,11 +73,30 @@ class BlockAllocationListener final : public AllocationListener { } private: - AllocationListener* delegated_; - uint64_t blockSize_{0L}; - uint64_t usedBytes_{0L}; - uint64_t peakBytes_{0L}; - uint64_t reservationBytes_{0L}; + inline int64_t reserve(int64_t diff) { + std::lock_guard lock(mutex_); + usedBytes_ += diff; + int64_t newBlockCount; + if (usedBytes_ == 0) { + newBlockCount = 0; + } else { + // ceil to get the required block number + newBlockCount = (usedBytes_ - 1) / blockSize_ + 1; + } + int64_t bytesGranted = (newBlockCount - blocksReserved_) * blockSize_; + blocksReserved_ = newBlockCount; + peakBytes_ = std::max(peakBytes_, usedBytes_); + return bytesGranted; + } + + AllocationListener* const delegated_; + const uint64_t blockSize_; + int64_t blocksReserved_{0L}; + int64_t usedBytes_{0L}; + int64_t peakBytes_{0L}; + int64_t reservationBytes_{0L}; + + mutable std::mutex mutex_; }; } // namespace gluten diff --git a/cpp/core/memory/MemoryAllocator.cc b/cpp/core/memory/MemoryAllocator.cc index ac869219d5c1..c637c6a9c13d 100644 --- a/cpp/core/memory/MemoryAllocator.cc +++ b/cpp/core/memory/MemoryAllocator.cc @@ -93,7 +93,16 @@ int64_t ListenableMemoryAllocator::peakBytes() const { void ListenableMemoryAllocator::updateUsage(int64_t size) { listener_->allocationChanged(size); usedBytes_ += size; - peakBytes_ = std::max(peakBytes_, usedBytes_); + while (true) { + int64_t savedPeakBytes = peakBytes_; + if (usedBytes_ <= savedPeakBytes) { + break; + } + // usedBytes_ > savedPeakBytes, update peak + if (peakBytes_.compare_exchange_weak(savedPeakBytes, usedBytes_)) { + break; + } + } } bool StdMemoryAllocator::allocate(int64_t size, void** out) { diff --git a/cpp/core/memory/MemoryAllocator.h b/cpp/core/memory/MemoryAllocator.h index bc8f9de1815e..01271cc94673 100644 --- a/cpp/core/memory/MemoryAllocator.h +++ b/cpp/core/memory/MemoryAllocator.h @@ -45,6 +45,7 @@ class MemoryAllocator { virtual int64_t peakBytes() const = 0; }; +// The class must be thread safe class ListenableMemoryAllocator final : public MemoryAllocator { public: explicit ListenableMemoryAllocator(MemoryAllocator* delegated, AllocationListener* listener) @@ -69,10 +70,10 @@ class ListenableMemoryAllocator final : public MemoryAllocator { private: void updateUsage(int64_t size); - MemoryAllocator* delegated_; - AllocationListener* listener_; - uint64_t usedBytes_{0L}; - uint64_t peakBytes_{0L}; + MemoryAllocator* const delegated_; + AllocationListener* const listener_; + std::atomic_int64_t usedBytes_{0L}; + std::atomic_int64_t peakBytes_{0L}; }; class StdMemoryAllocator final : public MemoryAllocator { diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc index 8439545ca382..c417202b9da1 100644 --- a/cpp/velox/compute/WholeStageResultIterator.cc +++ b/cpp/velox/compute/WholeStageResultIterator.cc @@ -210,28 +210,25 @@ std::shared_ptr WholeStageResultIterator::next() { } namespace { -class ConditionalSuspendedSection { +class SuspendedSection { public: - ConditionalSuspendedSection(velox::exec::Driver* driver, bool condition) { - if (condition) { - section_ = new velox::exec::SuspendedSection(driver); - } + SuspendedSection() { + reclaimer_->enterArbitration(); } - virtual ~ConditionalSuspendedSection() { - if (section_) { - delete section_; - } + virtual ~SuspendedSection() { + reclaimer_->leaveArbitration(); } // singleton - ConditionalSuspendedSection(const ConditionalSuspendedSection&) = delete; - ConditionalSuspendedSection(ConditionalSuspendedSection&&) = delete; - ConditionalSuspendedSection& operator=(const ConditionalSuspendedSection&) = delete; - ConditionalSuspendedSection& operator=(ConditionalSuspendedSection&&) = delete; + SuspendedSection(const SuspendedSection&) = delete; + SuspendedSection(SuspendedSection&&) = delete; + SuspendedSection& operator=(const SuspendedSection&) = delete; + SuspendedSection& operator=(SuspendedSection&&) = delete; private: - velox::exec::SuspendedSection* section_ = nullptr; + // We only use suspension APIs in exec::MemoryReclaimer. + std::unique_ptr reclaimer_{velox::exec::MemoryReclaimer::create()}; }; } // namespace @@ -244,15 +241,8 @@ int64_t WholeStageResultIterator::spillFixedSize(int64_t size) { if (spillStrategy_ == "auto") { int64_t remaining = size - shrunken; LOG(INFO) << logPrefix << "Trying to request spilling for " << remaining << " bytes..."; - // if we are on one of the driver of the spilled task, suspend it - velox::exec::Driver* thisDriver = nullptr; - task_->testingVisitDrivers([&](velox::exec::Driver* driver) { - if (driver->isOnThread()) { - thisDriver = driver; - } - }); // suspend the driver when we are on it - ConditionalSuspendedSection noCancel(thisDriver, thisDriver != nullptr); + SuspendedSection suspender; velox::exec::MemoryReclaimer::Stats status; auto* mm = memoryManager_->getMemoryManager(); uint64_t spilledOut = mm->arbitrator()->shrinkCapacity({pool}, remaining); // this conducts spilling diff --git a/cpp/velox/memory/VeloxMemoryManager.h b/cpp/velox/memory/VeloxMemoryManager.h index 3607ca793f3e..7a96b87e16be 100644 --- a/cpp/velox/memory/VeloxMemoryManager.h +++ b/cpp/velox/memory/VeloxMemoryManager.h @@ -25,6 +25,7 @@ namespace gluten { +// Make sure the class is thread safe class VeloxMemoryManager final : public MemoryManager { public: VeloxMemoryManager(std::unique_ptr listener); diff --git a/gluten-core/src/main/java/org/apache/gluten/substrait/rel/LocalFilesBuilder.java b/gluten-core/src/main/java/org/apache/gluten/substrait/rel/LocalFilesBuilder.java index 7e085f81f4e6..a58f5e043503 100644 --- a/gluten-core/src/main/java/org/apache/gluten/substrait/rel/LocalFilesBuilder.java +++ b/gluten-core/src/main/java/org/apache/gluten/substrait/rel/LocalFilesBuilder.java @@ -32,7 +32,8 @@ public static LocalFilesNode makeLocalFiles( List> partitionColumns, List> metadataColumns, LocalFilesNode.ReadFileFormat fileFormat, - List preferredLocations) { + List preferredLocations, + Map properties) { return new LocalFilesNode( index, paths, @@ -43,7 +44,8 @@ public static LocalFilesNode makeLocalFiles( partitionColumns, metadataColumns, fileFormat, - preferredLocations); + preferredLocations, + properties); } public static LocalFilesNode makeLocalFiles(String iterPath) { diff --git a/gluten-core/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java b/gluten-core/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java index fa9f3d51612b..172a6e8cca69 100644 --- a/gluten-core/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java +++ b/gluten-core/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java @@ -67,7 +67,8 @@ public enum ReadFileFormat { List> partitionColumns, List> metadataColumns, ReadFileFormat fileFormat, - List preferredLocations) { + List preferredLocations, + Map properties) { this.index = index; this.paths.addAll(paths); this.starts.addAll(starts); @@ -78,6 +79,7 @@ public enum ReadFileFormat { this.partitionColumns.addAll(partitionColumns); this.metadataColumns.addAll(metadataColumns); this.preferredLocations.addAll(preferredLocations); + this.fileReadProperties = properties; } LocalFilesNode(String iterPath) { @@ -109,10 +111,6 @@ private NamedStruct buildNamedStruct() { return namedStructBuilder.build(); } - public void setFileReadProperties(Map fileReadProperties) { - this.fileReadProperties = fileReadProperties; - } - @Override public List preferredLocations() { return this.preferredLocations; diff --git a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala index 53dc8f47861f..ee89d3d4616f 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala @@ -36,7 +36,8 @@ trait IteratorApi { partition: InputPartition, partitionSchema: StructType, fileFormat: ReadFileFormat, - metadataColumnNames: Seq[String]): SplitInfo + metadataColumnNames: Seq[String], + properties: Map[String, String]): SplitInfo /** Generate native row partition. */ def genPartitions( diff --git a/gluten-core/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala b/gluten-core/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala index 64071fb14c0c..d7ffe237e4a1 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala @@ -70,7 +70,12 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource def getSplitInfosFromPartitions(partitions: Seq[InputPartition]): Seq[SplitInfo] = { partitions.map( BackendsApiManager.getIteratorApiInstance - .genSplitInfo(_, getPartitionSchema, fileFormat, getMetadataColumns.map(_.name))) + .genSplitInfo( + _, + getPartitionSchema, + fileFormat, + getMetadataColumns.map(_.name), + getProperties)) } override protected def doValidateInternal(): ValidationResult = { diff --git a/gluten-iceberg/src/main/java/org/apache/gluten/substrait/rel/IcebergLocalFilesNode.java b/gluten-iceberg/src/main/java/org/apache/gluten/substrait/rel/IcebergLocalFilesNode.java index ba6b0ac4a029..398bdbdb5f03 100644 --- a/gluten-iceberg/src/main/java/org/apache/gluten/substrait/rel/IcebergLocalFilesNode.java +++ b/gluten-iceberg/src/main/java/org/apache/gluten/substrait/rel/IcebergLocalFilesNode.java @@ -22,6 +22,7 @@ import org.apache.iceberg.DeleteFile; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -47,7 +48,8 @@ public class IcebergLocalFilesNode extends LocalFilesNode { partitionColumns, new ArrayList<>(), fileFormat, - preferredLocations); + preferredLocations, + new HashMap<>()); this.deleteFilesList = deleteFilesList; }