Skip to content

Commit

Permalink
[GLUTEN-5884][VL] support native data cache spill in Gluten
Browse files Browse the repository at this point in the history
  • Loading branch information
yma11 committed Jun 26, 2024
1 parent 0800596 commit cefd2e5
Show file tree
Hide file tree
Showing 15 changed files with 468 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import org.apache.gluten.exception.GlutenException
import org.apache.gluten.execution.datasource.{GlutenOrcWriterInjects, GlutenParquetWriterInjects, GlutenRowSplitter}
import org.apache.gluten.expression.UDFMappings
import org.apache.gluten.init.NativeBackendInitializer
import org.apache.gluten.storage.memory.NativeDataCache
import org.apache.gluten.utils._
import org.apache.gluten.vectorized.{JniLibLoader, JniWorkspace}

Expand All @@ -30,6 +31,7 @@ import org.apache.spark.api.plugin.PluginContext
import org.apache.spark.sql.execution.datasources.velox.{VeloxOrcWriterInjects, VeloxParquetWriterInjects, VeloxRowSplitter}
import org.apache.spark.sql.expression.UDFResolver
import org.apache.spark.sql.internal.{GlutenConfigUtil, StaticSQLConf}
import org.apache.spark.storage.memory.{GlutenExtMemStoreInjects, GlutenMemStoreInjects}
import org.apache.spark.util.SparkDirectoryUtil

import org.apache.commons.lang3.StringUtils
Expand All @@ -54,6 +56,14 @@ class VeloxListenerApi extends ListenerApi {

override def onExecutorStart(pc: PluginContext): Unit = {
initialize(pc.conf(), isDriver = false)
if (GlutenMemStoreInjects.getReservationListener() != null) {
NativeDataCache.setAsyncDataCache(
GlutenMemStoreInjects.getMemStoreSize(),
GlutenMemStoreInjects.getReservationListener())
// scalastyle:off println
println("!!!!Data cache is initialized.")
}
// scalastyle:on println
}

override def onExecutorShutdown(): Unit = shutdown()
Expand Down Expand Up @@ -198,6 +208,8 @@ class VeloxListenerApi extends ListenerApi {
GlutenParquetWriterInjects.setInstance(new VeloxParquetWriterInjects())
GlutenOrcWriterInjects.setInstance(new VeloxOrcWriterInjects())
GlutenRowSplitter.setInstance(new VeloxRowSplitter())
// for memStore evict
GlutenMemStoreInjects.setInstance(new GlutenExtMemStoreInjects())
}

private def shutdown(): Unit = {
Expand Down
4 changes: 4 additions & 0 deletions cpp/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,10 @@ set(SPARK_COLUMNAR_PLUGIN_SRCS
file(MAKE_DIRECTORY ${root_directory}/releases)
add_library(gluten SHARED ${SPARK_COLUMNAR_PLUGIN_SRCS})
add_dependencies(gluten jni_proto)
target_include_directories(gluten PUBLIC
${CMAKE_CURRENT_SOURCE_DIR}
${VELOX_HOME}/
)

if(ENABLE_GLUTEN_VCPKG)
# Hide symbols of some static dependencies. Otherwise, if such dependencies
Expand Down
2 changes: 2 additions & 0 deletions cpp/velox/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,7 @@ set(VELOX_SRCS
jni/VeloxJniWrapper.cc
jni/JniFileSystem.cc
jni/JniUdf.cc
cache/VeloxListenableAsyncDataCache.cc
memory/BufferOutputStream.cc
memory/VeloxColumnarBatch.cc
memory/VeloxMemoryManager.cc
Expand Down Expand Up @@ -560,6 +561,7 @@ target_include_directories(
PUBLIC ${CMAKE_SYSTEM_INCLUDE_PATH}
${JNI_INCLUDE_DIRS}
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/../
${VELOX_HOME}/
${VELOX_BUILD_PATH}/
${VELOX_BUILD_PATH}/_deps/xsimd-src/include/
Expand Down
58 changes: 58 additions & 0 deletions cpp/velox/cache/VeloxListenableAsyncDataCache.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "VeloxListenableAsyncDataCache.h"
#include <iostream>

namespace gluten {

std::shared_ptr<ListenableAsyncDataCache> ListenableAsyncDataCache::create(
facebook::velox::memory::MemoryAllocator* allocator,
gluten::AllocationListener* listener,
std::unique_ptr<facebook::velox::cache::SsdCache> ssdCache) {
auto cache = std::make_shared<ListenableAsyncDataCache>(allocator, listener, std::move(ssdCache));
allocator->registerCache(cache);
return cache;
}

uint64_t ListenableAsyncDataCache::shrink(uint64_t targetBytes) {
std::lock_guard<std::recursive_mutex> l(mutex_);
uint64_t shrinkedSize = facebook::velox::cache::AsyncDataCache::shrink(targetBytes);
listener_->allocationChanged(-shrinkedSize);
return shrinkedSize;
}
facebook::velox::cache::CachePin ListenableAsyncDataCache::findOrCreate(
facebook::velox::cache::RawFileCacheKey key,
uint64_t size,
folly::SemiFuture<bool>* waitFuture) {
bool exists = facebook::velox::cache::AsyncDataCache::exists(key);
if (exists) {
return facebook::velox::cache::AsyncDataCache::findOrCreate(key, size, waitFuture);
} else {
std::lock_guard<std::recursive_mutex> l(mutex_);
listener_->allocationChanged(size);
facebook::velox::cache::CacheStats preStats = facebook::velox::cache::AsyncDataCache::refreshStats();
uint64_t preCacheSize = preStats.tinySize + preStats.largeSize + preStats.tinyPadding + preStats.largePadding;
auto pin = facebook::velox::cache::AsyncDataCache::findOrCreate(key, size, waitFuture);
facebook::velox::cache::CacheStats posStats = facebook::velox::cache::AsyncDataCache::refreshStats();
uint64_t posCacheSize = posStats.tinySize + posStats.largeSize + posStats.tinyPadding + posStats.largePadding;
listener_->allocationChanged(posCacheSize - preCacheSize - size);
return pin;
}
}

} // namespace gluten
48 changes: 48 additions & 0 deletions cpp/velox/cache/VeloxListenableAsyncDataCache.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "memory/AllocationListener.h"
#include "velox/common/caching/AsyncDataCache.h"
#include "velox/common/caching/SsdCache.h"

namespace gluten {
class ListenableAsyncDataCache final : public facebook::velox::cache::AsyncDataCache {
public:
ListenableAsyncDataCache(
facebook::velox::memory::MemoryAllocator* allocator,
AllocationListener* listener,
std::unique_ptr<facebook::velox::cache::SsdCache> ssdCache = nullptr)
: facebook::velox::cache::AsyncDataCache(allocator, std::move(ssdCache)) {
listener_ = listener;
}

static std::shared_ptr<ListenableAsyncDataCache> create(
facebook::velox::memory::MemoryAllocator* allocator,
gluten::AllocationListener* listener,
std::unique_ptr<facebook::velox::cache::SsdCache> ssdCache = nullptr);
uint64_t shrink(uint64_t targetBytes) override;
facebook::velox::cache::CachePin
findOrCreate(facebook::velox::cache::RawFileCacheKey key, uint64_t size, folly::SemiFuture<bool>* waitFuture = nullptr) override;

private:
AllocationListener* listener_;
std::recursive_mutex mutex_;
};

} // namespace gluten
53 changes: 30 additions & 23 deletions cpp/velox/compute/VeloxBackend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
* limitations under the License.
*/
#include <filesystem>
#include <iostream>

#include "VeloxBackend.h"

Expand Down Expand Up @@ -108,7 +109,7 @@ void VeloxBackend::init(const std::unordered_map<std::string, std::string>& conf
// Setup and register.
velox::filesystems::registerLocalFileSystem();
initJolFilesystem();
initCache();
// initCache();
initConnector();

// Register Velox functions
Expand All @@ -126,27 +127,20 @@ void VeloxBackend::init(const std::unordered_map<std::string, std::string>& conf
facebook::velox::memory::MemoryManager::initialize({});
}

facebook::velox::cache::AsyncDataCache* VeloxBackend::getAsyncDataCache() const {
return asyncDataCache_.get();
uint64_t VeloxBackend::shrinkAsyncDataCache(uint64_t size) const {
return asyncDataCache_->shrink(size);
}

// JNI-or-local filesystem, for spilling-to-heap if we have extra JVM heap spaces
void VeloxBackend::initJolFilesystem() {
int64_t maxSpillFileSize = backendConf_->get<int64_t>(kMaxSpillFileSize, kMaxSpillFileSizeDefault);

// FIXME It's known that if spill compression is disabled, the actual spill file size may
// in crease beyond this limit a little (maximum 64 rows which is by default
// one compression page)
gluten::registerJolFileSystem(maxSpillFileSize);
}

void VeloxBackend::initCache() {
if (backendConf_->get<bool>(kVeloxCacheEnabled, false)) {
void VeloxBackend::setAsyncDataCache(
int64_t memCacheSize,
facebook::velox::memory::MmapAllocator* allocator,
gluten::AllocationListener* listener) {
if (true) {
FLAGS_ssd_odirect = true;

FLAGS_ssd_odirect = backendConf_->get<bool>(kVeloxSsdODirectEnabled, false);

uint64_t memCacheSize = backendConf_->get<uint64_t>(kVeloxMemCacheSize, kVeloxMemCacheSizeDefault);
// uint64_t memCacheSize = backendConf_->get<uint64_t>(kVeloxMemCacheSize, kVeloxMemCacheSizeDefault);
uint64_t ssdCacheSize = backendConf_->get<uint64_t>(kVeloxSsdCacheSize, kVeloxSsdCacheSizeDefault);
int32_t ssdCacheShards = backendConf_->get<int32_t>(kVeloxSsdCacheShards, kVeloxSsdCacheShardsDefault);
int32_t ssdCacheIOThreads = backendConf_->get<int32_t>(kVeloxSsdCacheIOThreads, kVeloxSsdCacheIOThreadsDefault);
Expand All @@ -167,25 +161,38 @@ void VeloxBackend::initCache() {
"free space: " + std::to_string(si.available))
}

velox::memory::MmapAllocator::Options options;
options.capacity = memCacheSize;
cacheAllocator_ = std::make_shared<velox::memory::MmapAllocator>(options);
if (ssdCacheSize == 0) {
LOG(INFO) << "AsyncDataCache will do memory caching only as ssd cache size is 0";
// TODO: this is not tracked by Spark.
asyncDataCache_ = velox::cache::AsyncDataCache::create(cacheAllocator_.get());
asyncDataCache_ = gluten::ListenableAsyncDataCache::create(allocator, listener, nullptr);
} else {
// TODO: this is not tracked by Spark.
asyncDataCache_ = velox::cache::AsyncDataCache::create(cacheAllocator_.get(), std::move(ssd));
asyncDataCache_ = gluten::ListenableAsyncDataCache::create(allocator, listener, std::move(ssd));
// asyncDataCache_ = velox::cache::AsyncDataCache::create(allocator);
}

VELOX_CHECK_NOT_NULL(dynamic_cast<velox::cache::AsyncDataCache*>(asyncDataCache_.get()))
LOG(INFO) << "STARTUP: Using AsyncDataCache memory cache size: " << memCacheSize
std::cout << "STARTUP: Using AsyncDataCache memory cache size: " << memCacheSize
<< ", ssdCache prefix: " << ssdCachePath << ", ssdCache size: " << ssdCacheSize
<< ", ssdCache shards: " << ssdCacheShards << ", ssdCache IO threads: " << ssdCacheIOThreads;
<< ", ssdCache shards: " << ssdCacheShards << ", ssdCache IO threads: " << ssdCacheIOThreads
<< std::endl;
}
}

facebook::velox::cache::AsyncDataCache* VeloxBackend::getAsyncDataCache() const {
return asyncDataCache_.get();
}

// JNI-or-local filesystem, for spilling-to-heap if we have extra JVM heap spaces
void VeloxBackend::initJolFilesystem() {
int64_t maxSpillFileSize = backendConf_->get<int64_t>(kMaxSpillFileSize, kMaxSpillFileSizeDefault);

// FIXME It's known that if spill compression is disabled, the actual spill file size may
// in crease beyond this limit a little (maximum 64 rows which is by default
// one compression page)
gluten::registerJolFileSystem(maxSpillFileSize);
}

void VeloxBackend::initConnector() {
// The configs below are used at process level.
std::unordered_map<std::string, std::string> connectorConfMap = backendConf_->values();
Expand Down
14 changes: 10 additions & 4 deletions cpp/velox/compute/VeloxBackend.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,15 @@
#include "velox/common/memory/MemoryPool.h"
#include "velox/common/memory/MmapAllocator.h"
#include "velox/core/Config.h"
#include "velox/cache/VeloxListenableAsyncDataCache.h"

namespace gluten {
/// As a static instance in per executor, initialized at executor startup.
/// Should not put heavily work here.
class VeloxBackend {
public:
~VeloxBackend() {
if (dynamic_cast<facebook::velox::cache::AsyncDataCache*>(asyncDataCache_.get())) {
if (dynamic_cast<gluten::ListenableAsyncDataCache*>(asyncDataCache_.get())) {
LOG(INFO) << asyncDataCache_->toString();
for (const auto& entry : std::filesystem::directory_iterator(cachePathPrefix_)) {
if (entry.path().filename().string().find(cacheFilePrefix_) != std::string::npos) {
Expand All @@ -51,8 +52,12 @@ class VeloxBackend {

static VeloxBackend* get();

void setAsyncDataCache(int64_t memCacheSize, facebook::velox::memory::MmapAllocator* allocator, gluten::AllocationListener* listener);

facebook::velox::cache::AsyncDataCache* getAsyncDataCache() const;

uint64_t shrinkAsyncDataCache(uint64_t size) const;

std::shared_ptr<facebook::velox::Config> getBackendConf() const {
return backendConf_;
}
Expand All @@ -70,7 +75,8 @@ class VeloxBackend {
}

void init(const std::unordered_map<std::string, std::string>& conf);
void initCache();
// we create cache late in memory manager initialization
// void initCache();
void initConnector();
void initUdf();

Expand All @@ -83,11 +89,11 @@ class VeloxBackend {
static std::unique_ptr<VeloxBackend> instance_;

// Instance of AsyncDataCache used for all large allocations.
std::shared_ptr<facebook::velox::cache::AsyncDataCache> asyncDataCache_;
std::shared_ptr<gluten::ListenableAsyncDataCache> asyncDataCache_;

std::unique_ptr<folly::IOThreadPoolExecutor> ssdCacheExecutor_;
std::unique_ptr<folly::IOThreadPoolExecutor> ioExecutor_;
std::shared_ptr<facebook::velox::memory::MmapAllocator> cacheAllocator_;
// std::shared_ptr<facebook::velox::memory::MmapAllocator> cacheAllocator_;

std::string cachePathPrefix_;
std::string cacheFilePrefix_;
Expand Down
Loading

0 comments on commit cefd2e5

Please sign in to comment.