diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
index 81f06478cbb6..061b0bb974f7 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
@@ -22,6 +22,7 @@ import org.apache.gluten.exception.GlutenException
 import org.apache.gluten.execution.datasource.{GlutenOrcWriterInjects, GlutenParquetWriterInjects, GlutenRowSplitter}
 import org.apache.gluten.expression.UDFMappings
 import org.apache.gluten.init.NativeBackendInitializer
+import org.apache.gluten.storage.memory.NativeDataCache
 import org.apache.gluten.utils._
 import org.apache.gluten.vectorized.{JniLibLoader, JniWorkspace}
 
@@ -30,6 +31,7 @@ import org.apache.spark.api.plugin.PluginContext
 import org.apache.spark.sql.execution.datasources.velox.{VeloxOrcWriterInjects, VeloxParquetWriterInjects, VeloxRowSplitter}
 import org.apache.spark.sql.expression.UDFResolver
 import org.apache.spark.sql.internal.{GlutenConfigUtil, StaticSQLConf}
+import org.apache.spark.storage.memory.{GlutenExtMemStoreInjects, GlutenMemStoreInjects}
 import org.apache.spark.util.SparkDirectoryUtil
 
 import org.apache.commons.lang3.StringUtils
@@ -54,6 +56,14 @@ class VeloxListenerApi extends ListenerApi {
 
   override def onExecutorStart(pc: PluginContext): Unit = {
     initialize(pc.conf(), isDriver = false)
+    if (GlutenMemStoreInjects.getReservationListener() != null) {
+      NativeDataCache.setAsyncDataCache(
+        GlutenMemStoreInjects.getMemStoreSize(),
+        GlutenMemStoreInjects.getReservationListener())
+      // scalastyle:off println
+      println("!!!!Data cache is initialized.")
+      // scalastyle:on println
+    }
   }
 
   override def onExecutorShutdown(): Unit = shutdown()
@@ -198,6 +208,8 @@ class VeloxListenerApi extends ListenerApi {
     GlutenParquetWriterInjects.setInstance(new VeloxParquetWriterInjects())
     GlutenOrcWriterInjects.setInstance(new VeloxOrcWriterInjects())
     GlutenRowSplitter.setInstance(new VeloxRowSplitter())
+    // For memStore eviction.
+    GlutenMemStoreInjects.setInstance(new GlutenExtMemStoreInjects())
   }
 
   private def shutdown(): Unit = {
diff --git a/cpp/core/CMakeLists.txt b/cpp/core/CMakeLists.txt
index 4d7c30402985..4ec184bd732f 100644
--- a/cpp/core/CMakeLists.txt
+++ b/cpp/core/CMakeLists.txt
@@ -212,6 +212,10 @@ set(SPARK_COLUMNAR_PLUGIN_SRCS
 file(MAKE_DIRECTORY ${root_directory}/releases)
 add_library(gluten SHARED ${SPARK_COLUMNAR_PLUGIN_SRCS})
 add_dependencies(gluten jni_proto)
+target_include_directories(gluten PUBLIC
+  ${CMAKE_CURRENT_SOURCE_DIR}
+  ${VELOX_HOME}/
+)
 
 if(ENABLE_GLUTEN_VCPKG)
   # Hide symbols of some static dependencies. Otherwise, if such dependencies
diff --git a/cpp/velox/CMakeLists.txt b/cpp/velox/CMakeLists.txt
index c2d690a7e055..6ba59df0be1f 100644
--- a/cpp/velox/CMakeLists.txt
+++ b/cpp/velox/CMakeLists.txt
@@ -505,6 +505,7 @@ set(VELOX_SRCS
     jni/VeloxJniWrapper.cc
     jni/JniFileSystem.cc
     jni/JniUdf.cc
+    cache/VeloxListenableAsyncDataCache.cc
     memory/BufferOutputStream.cc
     memory/VeloxColumnarBatch.cc
     memory/VeloxMemoryManager.cc
@@ -560,6 +561,7 @@ target_include_directories(
   PUBLIC ${CMAKE_SYSTEM_INCLUDE_PATH}
          ${JNI_INCLUDE_DIRS}
          ${CMAKE_CURRENT_SOURCE_DIR}
+         ${CMAKE_CURRENT_SOURCE_DIR}/../
          ${VELOX_HOME}/
          ${VELOX_BUILD_PATH}/
         ${VELOX_BUILD_PATH}/_deps/xsimd-src/include/
diff --git a/cpp/velox/cache/VeloxListenableAsyncDataCache.cc b/cpp/velox/cache/VeloxListenableAsyncDataCache.cc
new file mode 100644
index 000000000000..f1fee620b10b
--- /dev/null
+++ b/cpp/velox/cache/VeloxListenableAsyncDataCache.cc
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "VeloxListenableAsyncDataCache.h"
+#include <mutex>
+
+namespace gluten {
+
+std::shared_ptr<ListenableAsyncDataCache> ListenableAsyncDataCache::create(
+    facebook::velox::memory::MemoryAllocator* allocator,
+    gluten::AllocationListener* listener,
+    std::unique_ptr<facebook::velox::cache::SsdCache> ssdCache) {
+  auto cache = std::make_shared<ListenableAsyncDataCache>(allocator, listener, std::move(ssdCache));
+  allocator->registerCache(cache);
+  return cache;
+}
+
+uint64_t ListenableAsyncDataCache::shrink(uint64_t targetBytes) {
+  std::lock_guard<std::recursive_mutex> l(mutex_);
+  uint64_t shrinkedSize = facebook::velox::cache::AsyncDataCache::shrink(targetBytes);
+  listener_->allocationChanged(-static_cast<int64_t>(shrinkedSize));
+  return shrinkedSize;
+}
+
+facebook::velox::cache::CachePin ListenableAsyncDataCache::findOrCreate(
+    facebook::velox::cache::RawFileCacheKey key,
+    uint64_t size,
+    folly::SemiFuture<bool>* waitFuture) {
+  bool exists = facebook::velox::cache::AsyncDataCache::exists(key);
+  if (exists) {
+    return facebook::velox::cache::AsyncDataCache::findOrCreate(key, size, waitFuture);
+  } else {
+    std::lock_guard<std::recursive_mutex> l(mutex_);
+    listener_->allocationChanged(size);
+    facebook::velox::cache::CacheStats preStats = facebook::velox::cache::AsyncDataCache::refreshStats();
+    uint64_t preCacheSize = preStats.tinySize + preStats.largeSize + preStats.tinyPadding + preStats.largePadding;
+    auto pin = facebook::velox::cache::AsyncDataCache::findOrCreate(key, size, waitFuture);
+    facebook::velox::cache::CacheStats posStats = facebook::velox::cache::AsyncDataCache::refreshStats();
+    uint64_t posCacheSize = posStats.tinySize + posStats.largeSize + posStats.tinyPadding + posStats.largePadding;
+    listener_->allocationChanged(
+        static_cast<int64_t>(posCacheSize) - static_cast<int64_t>(preCacheSize) - static_cast<int64_t>(size));
+    return pin;
+  }
+}
+
+} // namespace gluten
diff --git a/cpp/velox/cache/VeloxListenableAsyncDataCache.h b/cpp/velox/cache/VeloxListenableAsyncDataCache.h
new file mode 100644
index 000000000000..40ea8683e247
--- /dev/null
+++ b/cpp/velox/cache/VeloxListenableAsyncDataCache.h
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "memory/AllocationListener.h"
+#include "velox/common/caching/AsyncDataCache.h"
+#include "velox/common/caching/SsdCache.h"
+
+namespace gluten {
+
+class ListenableAsyncDataCache final : public facebook::velox::cache::AsyncDataCache {
+ public:
+  ListenableAsyncDataCache(
+      facebook::velox::memory::MemoryAllocator* allocator,
+      AllocationListener* listener,
+      std::unique_ptr<facebook::velox::cache::SsdCache> ssdCache = nullptr)
+      : facebook::velox::cache::AsyncDataCache(allocator, std::move(ssdCache)) {
+    listener_ = listener;
+  }
+
+  static std::shared_ptr<ListenableAsyncDataCache> create(
+      facebook::velox::memory::MemoryAllocator* allocator,
+      gluten::AllocationListener* listener,
+      std::unique_ptr<facebook::velox::cache::SsdCache> ssdCache = nullptr);
+
+  uint64_t shrink(uint64_t targetBytes) override;
+
+  facebook::velox::cache::CachePin findOrCreate(
+      facebook::velox::cache::RawFileCacheKey key,
+      uint64_t size,
+      folly::SemiFuture<bool>* waitFuture = nullptr) override;
+
+ private:
+  AllocationListener* listener_;
+  std::recursive_mutex mutex_;
+};
+
+} // namespace gluten
diff --git a/cpp/velox/compute/VeloxBackend.cc b/cpp/velox/compute/VeloxBackend.cc
index 1ec5879966d6..8b8101c9056c 100644
--- a/cpp/velox/compute/VeloxBackend.cc
+++ b/cpp/velox/compute/VeloxBackend.cc
@@ -15,6 +15,7 @@
  * limitations under the License.
  */
 #include <filesystem>
+#include <iostream>
 
 #include "VeloxBackend.h"
 
@@ -108,7 +109,7 @@ void VeloxBackend::init(const std::unordered_map<std::string, std::string>& conf
   // Setup and register.
   velox::filesystems::registerLocalFileSystem();
   initJolFilesystem();
-  initCache();
+  // initCache();
   initConnector();
 
   // Register Velox functions
@@ -126,27 +127,20 @@ void VeloxBackend::init(const std::unordered_map<std::string, std::string>& conf
   facebook::velox::memory::MemoryManager::initialize({});
 }
 
-facebook::velox::cache::AsyncDataCache* VeloxBackend::getAsyncDataCache() const {
-  return asyncDataCache_.get();
+uint64_t VeloxBackend::shrinkAsyncDataCache(uint64_t size) const {
+  return asyncDataCache_->shrink(size);
 }
 
-// JNI-or-local filesystem, for spilling-to-heap if we have extra JVM heap spaces
-void VeloxBackend::initJolFilesystem() {
-  int64_t maxSpillFileSize = backendConf_->get<int64_t>(kMaxSpillFileSize, kMaxSpillFileSizeDefault);
-
-  // FIXME It's known that if spill compression is disabled, the actual spill file size may
-  // in crease beyond this limit a little (maximum 64 rows which is by default
-  // one compression page)
-  gluten::registerJolFileSystem(maxSpillFileSize);
-}
-
-void VeloxBackend::initCache() {
-  if (backendConf_->get<bool>(kVeloxCacheEnabled, false)) {
+void VeloxBackend::setAsyncDataCache(
+    int64_t memCacheSize,
+    facebook::velox::memory::MmapAllocator* allocator,
+    gluten::AllocationListener* listener) {
+  if (true) {
     FLAGS_ssd_odirect = true;
     FLAGS_ssd_odirect = backendConf_->get<bool>(kVeloxSsdODirectEnabled, false);
-    uint64_t memCacheSize = backendConf_->get<uint64_t>(kVeloxMemCacheSize, kVeloxMemCacheSizeDefault);
+    // uint64_t memCacheSize = backendConf_->get<uint64_t>(kVeloxMemCacheSize, kVeloxMemCacheSizeDefault);
     uint64_t ssdCacheSize = backendConf_->get<uint64_t>(kVeloxSsdCacheSize, kVeloxSsdCacheSizeDefault);
     int32_t ssdCacheShards = backendConf_->get<int32_t>(kVeloxSsdCacheShards, kVeloxSsdCacheShardsDefault);
     int32_t ssdCacheIOThreads = backendConf_->get<int32_t>(kVeloxSsdCacheIOThreads, kVeloxSsdCacheIOThreadsDefault);
@@ -167,25 +161,38 @@ void VeloxBackend::initCache() {
           "free space: " + std::to_string(si.available))
     }
 
-    velox::memory::MmapAllocator::Options options;
-    options.capacity = memCacheSize;
-    cacheAllocator_ = std::make_shared<velox::memory::MmapAllocator>(options);
     if (ssdCacheSize == 0) {
       LOG(INFO) << "AsyncDataCache will do memory caching only as ssd cache size is 0";
       // TODO: this is not tracked by Spark.
-      asyncDataCache_ = velox::cache::AsyncDataCache::create(cacheAllocator_.get());
+      asyncDataCache_ = gluten::ListenableAsyncDataCache::create(allocator, listener, nullptr);
     } else {
       // TODO: this is not tracked by Spark.
-      asyncDataCache_ = velox::cache::AsyncDataCache::create(cacheAllocator_.get(), std::move(ssd));
+      asyncDataCache_ = gluten::ListenableAsyncDataCache::create(allocator, listener, std::move(ssd));
+      // asyncDataCache_ = velox::cache::AsyncDataCache::create(allocator);
     }
 
     VELOX_CHECK_NOT_NULL(dynamic_cast<facebook::velox::cache::AsyncDataCache*>(asyncDataCache_.get()))
-    LOG(INFO) << "STARTUP: Using AsyncDataCache memory cache size: " << memCacheSize
+    std::cout << "STARTUP: Using AsyncDataCache memory cache size: " << memCacheSize
               << ", ssdCache prefix: " << ssdCachePath << ", ssdCache size: " << ssdCacheSize
-              << ", ssdCache shards: " << ssdCacheShards << ", ssdCache IO threads: " << ssdCacheIOThreads;
+              << ", ssdCache shards: " << ssdCacheShards << ", ssdCache IO threads: " << ssdCacheIOThreads
+              << std::endl;
   }
 }
 
+facebook::velox::cache::AsyncDataCache* VeloxBackend::getAsyncDataCache() const {
+  return asyncDataCache_.get();
+}
+
+// JNI-or-local filesystem, for spilling-to-heap if we have extra JVM heap spaces
+void VeloxBackend::initJolFilesystem() {
+  int64_t maxSpillFileSize = backendConf_->get<int64_t>(kMaxSpillFileSize, kMaxSpillFileSizeDefault);
+
+  // FIXME It's known that if spill compression is disabled, the actual spill file size may
+  // in crease beyond this limit a little (maximum 64 rows which is by default
+  // one compression page)
+  gluten::registerJolFileSystem(maxSpillFileSize);
+}
+
 void VeloxBackend::initConnector() {
   // The configs below are used at process level.
   std::unordered_map<std::string, std::string> connectorConfMap = backendConf_->values();
diff --git a/cpp/velox/compute/VeloxBackend.h b/cpp/velox/compute/VeloxBackend.h
index e8298eeed192..6e836d5b697b 100644
--- a/cpp/velox/compute/VeloxBackend.h
+++ b/cpp/velox/compute/VeloxBackend.h
@@ -28,6 +28,7 @@
 #include "velox/common/memory/MemoryPool.h"
 #include "velox/common/memory/MmapAllocator.h"
 #include "velox/core/Config.h"
+#include "velox/cache/VeloxListenableAsyncDataCache.h"
 
 namespace gluten {
 /// As a static instance in per executor, initialized at executor startup.
@@ -35,7 +36,7 @@ namespace gluten {
 class VeloxBackend {
  public:
   ~VeloxBackend() {
-    if (dynamic_cast<facebook::velox::cache::AsyncDataCache*>(asyncDataCache_.get())) {
+    if (dynamic_cast<gluten::ListenableAsyncDataCache*>(asyncDataCache_.get())) {
       LOG(INFO) << asyncDataCache_->toString();
       for (const auto& entry : std::filesystem::directory_iterator(cachePathPrefix_)) {
         if (entry.path().filename().string().find(cacheFilePrefix_) != std::string::npos) {
@@ -51,8 +52,12 @@ class VeloxBackend {
 
   static VeloxBackend* get();
 
+  void setAsyncDataCache(int64_t memCacheSize, facebook::velox::memory::MmapAllocator* allocator, gluten::AllocationListener* listener);
+
   facebook::velox::cache::AsyncDataCache* getAsyncDataCache() const;
 
+  uint64_t shrinkAsyncDataCache(uint64_t size) const;
+
   std::shared_ptr<facebook::velox::Config> getBackendConf() const {
     return backendConf_;
   }
@@ -70,7 +75,8 @@ class VeloxBackend {
   }
 
   void init(const std::unordered_map<std::string, std::string>& conf);
-  void initCache();
+  // The cache is now created later, when the memory manager is initialized.
+  // void initCache();
   void initConnector();
   void initUdf();
 
@@ -83,11 +89,11 @@ class VeloxBackend {
   static std::unique_ptr<VeloxBackend> instance_;
 
   // Instance of AsyncDataCache used for all large allocations.
-  std::shared_ptr<facebook::velox::cache::AsyncDataCache> asyncDataCache_;
+  std::shared_ptr<gluten::ListenableAsyncDataCache> asyncDataCache_;
 
   std::unique_ptr<folly::IOThreadPoolExecutor> ssdCacheExecutor_;
   std::unique_ptr<folly::IOThreadPoolExecutor> ioExecutor_;
-  std::shared_ptr<facebook::velox::memory::MmapAllocator> cacheAllocator_;
+  // std::shared_ptr<facebook::velox::memory::MmapAllocator> cacheAllocator_;
 
   std::string cachePathPrefix_;
   std::string cacheFilePrefix_;
diff --git a/cpp/velox/jni/VeloxJniWrapper.cc b/cpp/velox/jni/VeloxJniWrapper.cc
index 3b52eaa86b2f..285cc99343ba 100644
--- a/cpp/velox/jni/VeloxJniWrapper.cc
+++ b/cpp/velox/jni/VeloxJniWrapper.cc
@@ -16,6 +16,7 @@
  */
 #include <filesystem>
+#include <mutex>
 #include <jni.h>
 
@@ -37,6 +38,11 @@
 
 using namespace facebook;
 
+static jclass javaReservationListenerClass;
+
+static jmethodID reserveMemoryMethod;
+static jmethodID unreserveMemoryMethod;
+
 #ifdef __cplusplus
 extern "C" {
 #endif
@@ -46,7 +52,13 @@ jint JNI_OnLoad(JavaVM* vm, void*) {
   if (vm->GetEnv(reinterpret_cast<void**>(&env), jniVersion) != JNI_OK) {
     return JNI_ERR;
   }
-
+  javaReservationListenerClass = createGlobalClassReference(
+      env,
+      "Lorg/apache/spark/storage/memory/"
+      "ExtMemStoreReservationListener;");
+  reserveMemoryMethod = getMethodIdOrError(env, javaReservationListenerClass, "reserve", "(J)J");
+  unreserveMemoryMethod = getMethodIdOrError(env, javaReservationListenerClass, "unreserve", "(J)J");
+
   gluten::getJniCommonState()->ensureInitialized(env);
   gluten::getJniErrorState()->ensureInitialized(env);
   gluten::initVeloxJniFileSystem(env);
@@ -86,6 +98,38 @@ JNIEXPORT void JNICALL Java_org_apache_gluten_init_NativeBackendInitializer_shut
   JNI_METHOD_END()
 }
 
+JNIEXPORT void JNICALL Java_org_apache_gluten_storage_memory_NativeDataCache_setAsyncDataCache( // NOLINT
+    JNIEnv* env,
+    jclass,
+    jlong size,
+    jobject jlistener) {
+  JNI_METHOD_START
+  JavaVM* vm;
+  if (env->GetJavaVM(&vm) != JNI_OK) {
+    throw gluten::GlutenException("Unable to get JavaVM instance");
+  }
+  gluten::AllocationListener* listener =
+      new SparkAllocationListener(vm, jlistener, reserveMemoryMethod, unreserveMemoryMethod);
+  velox::memory::MmapAllocator::Options options;
+  options.capacity = size;
+  facebook::velox::memory::MmapAllocator* cacheAllocator = new facebook::velox::memory::MmapAllocator(options);
+  gluten::VeloxBackend::get()->setAsyncDataCache(size, cacheAllocator, listener);
+  JNI_METHOD_END()
+}
+
+// TODO: move to a dedicated class
+JNIEXPORT jlong JNICALL Java_org_apache_gluten_storage_memory_NativeDataCache_shrinkAsyncDataCache( // NOLINT
+    JNIEnv* env,
+    jclass,
+    jlong size) {
+  JNI_METHOD_START
+  static std::mutex mtx;
+  std::lock_guard<std::mutex> lock{mtx};
+  uint64_t shrinkedSize = gluten::VeloxBackend::get()->shrinkAsyncDataCache(static_cast<uint64_t>(size));
+  return static_cast<jlong>(shrinkedSize);
+  JNI_METHOD_END(gluten::kInvalidResourceHandle)
+}
+
 JNIEXPORT void JNICALL Java_org_apache_gluten_udf_UdfJniWrapper_getFunctionSignatures( // NOLINT
     JNIEnv* env,
     jclass) {
diff --git a/package/pom.xml b/package/pom.xml
index ab87e14805ff..62d1c111987d 100644
--- a/package/pom.xml
+++ b/package/pom.xml
@@ -275,6 +275,7 @@
             org.apache.spark.sql.execution.columnar.ByteBufferHelper$
             org.apache.spark.rdd.EmptyRDD
+            org.apache.spark.storage.memory.ExternalMemoryStore
             org.apache.spark.sql.hive.execution.HiveFileFormat
             org.apache.spark.sql.hive.execution.HiveFileFormat$$$$anon$1
             org.apache.spark.sql.hive.execution.HiveOutputWriter
diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
index 89933cc58a4d..7b900b99d655 100644
--- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -1306,12 +1306,12 @@ object GlutenConfig {
       .bytesConf(ByteUnit.BYTE)
       .createWithDefaultString("1GB")
 
-  val COLUMNAR_VELOX_MEM_INIT_CAPACITY =
-    buildConf("spark.gluten.sql.columnar.backend.velox.memInitCapacity")
-      .internal()
-      .doc("The initial memory capacity to reserve for a newly created Velox query memory pool.")
-      .bytesConf(ByteUnit.BYTE)
-      .createWithDefaultString("8MB")
+//  val COLUMNAR_VELOX_MEM_INIT_CAPACITY =
+//    buildConf("spark.gluten.sql.columnar.backend.velox.memInitCapacity")
+//      .internal()
+//      .doc("The initial memory capacity to reserve for a newly created Velox query memory pool.")
+//      .bytesConf(ByteUnit.BYTE)
+//      .createWithDefaultString("8MB")
 
   val COLUMNAR_VELOX_SSD_CACHE_PATH =
     buildStaticConf("spark.gluten.sql.columnar.backend.velox.ssdCachePath")
diff --git a/shims/common/src/main/scala/org/apache/gluten/storage/memory/NativeDataCache.java b/shims/common/src/main/scala/org/apache/gluten/storage/memory/NativeDataCache.java
new file mode 100644
index 000000000000..c8571a36e409
--- /dev/null
+++ b/shims/common/src/main/scala/org/apache/gluten/storage/memory/NativeDataCache.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.storage.memory;
+
+import org.apache.spark.storage.memory.ExtMemStoreReservationListener;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class NativeDataCache {
+  private static final Logger LOG = LoggerFactory.getLogger(NativeDataCache.class);
+
+  public static native void setAsyncDataCache(
+      long size, ExtMemStoreReservationListener listener);
+
+  public static native long shrinkAsyncDataCache(long size);
+}
diff --git a/shims/common/src/main/scala/org/apache/spark/storage/memory/ExtMemStoreReservationListener.java b/shims/common/src/main/scala/org/apache/spark/storage/memory/ExtMemStoreReservationListener.java
new file mode 100644
index 000000000000..1591f97f1c8b
--- /dev/null
+++ b/shims/common/src/main/scala/org/apache/spark/storage/memory/ExtMemStoreReservationListener.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.storage.memory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Reserve Spark managed memory. */
+public class ExtMemStoreReservationListener {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ExtMemStoreReservationListener.class);
+
+  private final ExternalMemoryStore extMemoryStore;
+
+  public ExtMemStoreReservationListener(ExternalMemoryStore extMemoryStore) {
+    this.extMemoryStore = extMemoryStore;
+  }
+
+  public long reserve(long size) {
+    synchronized (this) {
+      try {
+        System.out.println("*******Data cache tries to acquire " + size + " bytes memory.");
+        // throw new RuntimeException("Failed to acquire enough memory from storage memory pool");
+        boolean success = extMemoryStore.acquireStorageMemory(size);
+        if (success) {
+          return size;
+        } else {
+          throw new RuntimeException("Failed to acquire enough memory from storage memory pool");
+        }
+      } catch (Exception e) {
+        LOG.error("Error reserving memory from native memory store", e);
+        throw e;
+      }
+    }
+  }
+
+  public long unreserve(long size) {
+    synchronized (this) {
+      System.out.println("*******Data cache tries to release " + size + " bytes memory.");
+      extMemoryStore.releaseStorageMemory(size);
+      return size;
+    }
+  }
+}
diff --git a/shims/common/src/main/scala/org/apache/spark/storage/memory/GlutenExtMemStoreInjects.scala b/shims/common/src/main/scala/org/apache/spark/storage/memory/GlutenExtMemStoreInjects.scala
new file mode 100644
index 000000000000..3a4e36a87cf6
--- /dev/null
+++ b/shims/common/src/main/scala/org/apache/spark/storage/memory/GlutenExtMemStoreInjects.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.storage.memory
+
+import org.apache.gluten.storage.memory.NativeDataCache
+
+class GlutenExtMemStoreInjects {
+  def initExternalCache(size: Long, memoryStore: ExternalMemoryStore): Unit = {
+    // scalastyle:off println
+    println("*******initializing data cache through injects.")
+    // scalastyle:on println
+    val reservationListener = new ExtMemStoreReservationListener(memoryStore)
+    NativeDataCache.setAsyncDataCache(size, reservationListener)
+  }
+
+  def evictEntriesToFreeSpace(spaceToFree: Long): Long = {
+    // scalastyle:off println
+    println("*******Will shrink data cache with target size " + spaceToFree + " bytes.")
+    // scalastyle:on println
+    val shrinkedSize = NativeDataCache.shrinkAsyncDataCache(spaceToFree)
+    // scalastyle:off println
+    println("*******Shrinked size: " + shrinkedSize + " bytes.")
+    // scalastyle:on println
+    shrinkedSize
+  }
+}
diff --git a/shims/common/src/main/scala/org/apache/spark/storage/memory/GlutenMemStoreInjects.scala b/shims/common/src/main/scala/org/apache/spark/storage/memory/GlutenMemStoreInjects.scala
new file mode 100644
index 000000000000..c65038c0ce9a
--- /dev/null
+++ b/shims/common/src/main/scala/org/apache/spark/storage/memory/GlutenMemStoreInjects.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.storage.memory
+
+object GlutenMemStoreInjects {
+  private var INSTANCE: GlutenExtMemStoreInjects = _
+  private var memStoreSize: Long = _
+  private var reservationListener: ExtMemStoreReservationListener = _
+
+  def setInstance(instance: GlutenExtMemStoreInjects): Unit = {
+    INSTANCE = instance
+  }
+
+  def getInstance(): GlutenExtMemStoreInjects = {
+    if (INSTANCE == null) {
+      throw new IllegalStateException("GlutenExtMemStoreInjects is not initialized")
+//      INSTANCE = new GlutenExtMemStoreInjects()
+    }
+    INSTANCE
+  }
+
+  def setMemStoreSize(size: Long): Unit = {
+    memStoreSize = size
+  }
+
+  def getMemStoreSize(): Long = {
+    memStoreSize
+  }
+
+  def setReservationListener(listener: ExtMemStoreReservationListener): Unit = {
+    reservationListener = listener
+  }
+
+  def getReservationListener(): ExtMemStoreReservationListener = {
+    reservationListener
+  }
+}
diff --git a/shims/spark33/src/main/scala/org/apache/spark/storage/memory/ExternalMemoryStore.scala b/shims/spark33/src/main/scala/org/apache/spark/storage/memory/ExternalMemoryStore.scala
new file mode 100644
index 000000000000..a00b5fec02be
--- /dev/null
+++ b/shims/spark33/src/main/scala/org/apache/spark/storage/memory/ExternalMemoryStore.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.storage.memory
+
+import org.apache.gluten.GlutenConfig
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{MEMORY_OFFHEAP_SIZE, MEMORY_STORAGE_FRACTION, STORAGE_MEMORY_EVICT_PREFERENCE}
+import org.apache.spark.memory.{MemoryManager, MemoryMode}
+import org.apache.spark.storage.BlockId
+
+private[spark] class ExternalMemoryStore(conf: SparkConf, memoryManager: MemoryManager)
+  extends Logging {
+
+  // Note: all changes to memory allocations, notably evicting entries and
+  // acquiring memory, must be synchronized on `memoryManager`!
+
+  private[spark] def initExternalCache(conf: SparkConf): Unit = {
+    val reservationListener = new ExtMemStoreReservationListener(this)
+    if (GlutenConfig.getConf.enableVeloxCache) {
+      // scalastyle:off println
+      println("*******Will initialize the data cache later in the Velox backend.")
+      // scalastyle:on println
+      val size = (conf.get(MEMORY_OFFHEAP_SIZE) * conf.get(MEMORY_STORAGE_FRACTION) * 1.1).toLong
+      GlutenMemStoreInjects.setMemStoreSize(size)
+      GlutenMemStoreInjects.setReservationListener(reservationListener)
+    }
+  }
+
+  // This method calls memoryManager.acquireStorageMemory().
+  // NativeStorageMemoryListener will connect it with the Velox memory allocator's allocate().
+  private[spark] def acquireStorageMemory(numBytes: Long): Boolean = memoryManager.synchronized {
+    // scalastyle:off println
+    println("*******acquireStorageMemory in NativeMemoryStore")
+    // scalastyle:on println
+    memoryManager.acquireStorageMemory(BlockId("test_file_cache"), numBytes, MemoryMode.OFF_HEAP)
+  }
+
+  // We don't need to grow the storage memory pool by spaceToFree in this method,
+  // as releaseStorageMemory() will be triggered automatically during eviction.
+  private[spark] def evictEntriesToFreeSpace(spaceToFree: Long): Long = memoryManager.synchronized {
+    if (GlutenConfig.getConf.enableVeloxCache) {
+      // scalastyle:off println
+      println("*******trying to evict data cache: " + spaceToFree)
+      // scalastyle:on println
+      GlutenMemStoreInjects.getInstance().evictEntriesToFreeSpace(spaceToFree)
+    } else {
+      0L
+    }
+  }
+
+  // This method calls memoryManager.releaseStorageMemory().
+  // NativeStorageMemoryListener will connect it with the Velox memory allocator's free().
+  private[spark] def releaseStorageMemory(numBytes: Long): Unit = memoryManager.synchronized {
+    // scalastyle:off println
+    println("*******releaseStorageMemory in NativeMemoryStore")
+    // scalastyle:on println
+    memoryManager.releaseStorageMemory(numBytes, MemoryMode.OFF_HEAP)
+  }
+}
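How the pieces fit together, as a minimal sketch. The `ExternalCacheWiringSketch` object, the explicit `new ExternalMemoryStore(...)` construction, and the final eviction call are hypothetical illustrations: this diff does not show where the store is created or what triggers eviction from Spark's storage pool, so the sketch only assumes a plausible call order.

```scala
// Hypothetical wiring sketch (not part of the patch). Placed in the Spark package so the
// private[spark] members introduced by this diff are accessible.
package org.apache.spark.storage.memory

import org.apache.gluten.storage.memory.NativeDataCache

import org.apache.spark.SparkEnv

object ExternalCacheWiringSketch {
  def wireUp(): Unit = {
    val env = SparkEnv.get

    // 1. Record the target cache size and the reservation listener that bridges
    //    into Spark's off-heap storage pool (ExternalMemoryStore.initExternalCache).
    val extStore = new ExternalMemoryStore(env.conf, env.memoryManager)
    extStore.initExternalCache(env.conf)

    // 2. Register the evictor implementation (done in VeloxListenerApi.initialize).
    GlutenMemStoreInjects.setInstance(new GlutenExtMemStoreInjects())

    // 3. On executor start, create the native ListenableAsyncDataCache over JNI,
    //    sized from the value recorded in step 1 (VeloxListenerApi.onExecutorStart).
    if (GlutenMemStoreInjects.getReservationListener() != null) {
      NativeDataCache.setAsyncDataCache(
        GlutenMemStoreInjects.getMemStoreSize(),
        GlutenMemStoreInjects.getReservationListener())
    }

    // 4. Under storage-memory pressure, a caller (not in this diff) frees space by
    //    shrinking the native cache; the freed bytes flow back to the storage pool
    //    through ExtMemStoreReservationListener.unreserve().
    val freedBytes = extStore.evictEntriesToFreeSpace(64L * 1024 * 1024)
    require(freedBytes >= 0)
  }
}
```

The reservation listener is what keeps the two accounting systems consistent: every byte the native ListenableAsyncDataCache allocates is first acquired from Spark's off-heap storage pool, and every byte it shrinks is released back.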