diff --git a/cpp/core/compute/Runtime.h b/cpp/core/compute/Runtime.h index 55b0a7797780e..fb501dc9acca8 100644 --- a/cpp/core/compute/Runtime.h +++ b/cpp/core/compute/Runtime.h @@ -64,7 +64,9 @@ class Runtime : public std::enable_shared_from_this { const std::unordered_map& sessionConf = {}); static void release(Runtime*); - Runtime(const std::unordered_map& confMap) : confMap_(confMap) {} + Runtime(std::shared_ptr memoryManager, const std::unordered_map& confMap) + : memoryManager_(memoryManager), confMap_(confMap) {} + virtual ~Runtime() = default; virtual void parsePlan(const uint8_t* data, int32_t size, std::optional dumpFile) = 0; @@ -89,7 +91,9 @@ class Runtime : public std::enable_shared_from_this { virtual std::shared_ptr select(std::shared_ptr, std::vector) = 0; - virtual MemoryManager* memoryManager() = 0; + virtual MemoryManager* memoryManager() { + return memoryManager_.get(); + }; /// This function is used to create certain converter from the format used by /// the backend to Spark unsafe row. @@ -129,6 +133,7 @@ class Runtime : public std::enable_shared_from_this { } protected: + std::shared_ptr memoryManager_; std::unique_ptr objStore_ = ObjectStore::create(); std::unordered_map confMap_; // Session conf map diff --git a/cpp/velox/compute/VeloxRuntime.cc b/cpp/velox/compute/VeloxRuntime.cc index bfc87985cbdcf..b560b0770079d 100644 --- a/cpp/velox/compute/VeloxRuntime.cc +++ b/cpp/velox/compute/VeloxRuntime.cc @@ -59,9 +59,9 @@ namespace gluten { VeloxRuntime::VeloxRuntime( std::unique_ptr listener, const std::unordered_map& confMap) - : Runtime(confMap), listener_(std::move(listener)) { + : Runtime(std::make_shared(std::move(listener)), confMap) { // Refresh session config. 
- memoryManager_ = std::make_shared(std::move(listener_)); + vmm_ = dynamic_cast(memoryManager_.get()); veloxCfg_ = std::make_shared(confMap_); debugModeEnabled_ = veloxCfg_->get(kDebugModeEnabled, false); FLAGS_minloglevel = veloxCfg_->get(kGlogSeverityLevel, FLAGS_minloglevel); @@ -132,7 +132,7 @@ void VeloxRuntime::injectWriteFilesTempPath(const std::string& path) { } VeloxMemoryManager* VeloxRuntime::memoryManager() { - return memoryManager_.get(); + return vmm_; } std::shared_ptr VeloxRuntime::createResultIterator( @@ -141,8 +141,7 @@ std::shared_ptr VeloxRuntime::createResultIterator( const std::unordered_map& sessionConf) { LOG_IF(INFO, debugModeEnabled_) << "VeloxRuntime session config:" << printConfig(confMap_); - VeloxPlanConverter veloxPlanConverter( - inputs, memoryManager_->getLeafMemoryPool().get(), sessionConf, writeFilesTempPath_); + VeloxPlanConverter veloxPlanConverter(inputs, vmm_->getLeafMemoryPool().get(), sessionConf, writeFilesTempPath_); veloxPlan_ = veloxPlanConverter.toVeloxPlan(substraitPlan_, std::move(localFiles_)); // Scan node can be required. 
@@ -154,12 +153,12 @@ std::shared_ptr VeloxRuntime::createResultIterator( getInfoAndIds(veloxPlanConverter.splitInfos(), veloxPlan_->leafPlanNodeIds(), scanInfos, scanIds, streamIds); auto wholestageIter = std::make_unique( - memoryManager_.get(), veloxPlan_, scanIds, scanInfos, streamIds, spillDir, sessionConf, taskInfo_); + vmm_, veloxPlan_, scanIds, scanInfos, streamIds, spillDir, sessionConf, taskInfo_); return std::make_shared(std::move(wholestageIter), this); } std::shared_ptr VeloxRuntime::createColumnar2RowConverter() { - auto veloxPool = memoryManager_->getLeafMemoryPool(); + auto veloxPool = vmm_->getLeafMemoryPool(); return std::make_shared(veloxPool); } @@ -175,14 +174,14 @@ std::shared_ptr VeloxRuntime::createOrGetEmptySchemaBatch(int32_t std::shared_ptr VeloxRuntime::select( std::shared_ptr batch, std::vector columnIndices) { - auto veloxPool = memoryManager_->getLeafMemoryPool(); + auto veloxPool = vmm_->getLeafMemoryPool(); auto veloxBatch = gluten::VeloxColumnarBatch::from(veloxPool.get(), batch); auto outputBatch = veloxBatch->select(veloxPool.get(), std::move(columnIndices)); return outputBatch; } std::shared_ptr VeloxRuntime::createRow2ColumnarConverter(struct ArrowSchema* cSchema) { - auto veloxPool = memoryManager_->getLeafMemoryPool(); + auto veloxPool = vmm_->getLeafMemoryPool(); return std::make_shared(cSchema, veloxPool); } @@ -190,8 +189,8 @@ std::shared_ptr VeloxRuntime::createShuffleWriter( int numPartitions, std::unique_ptr partitionWriter, ShuffleWriterOptions options) { - auto veloxPool = memoryManager_->getLeafMemoryPool(); - auto arrowPool = memoryManager_->getArrowMemoryPool(); + auto veloxPool = vmm_->getLeafMemoryPool(); + auto arrowPool = vmm_->getArrowMemoryPool(); GLUTEN_ASSIGN_OR_THROW( std::shared_ptr shuffleWriter, VeloxShuffleWriter::create( @@ -208,10 +207,10 @@ std::shared_ptr VeloxRuntime::createDatasource( const std::string& filePath, std::shared_ptr schema) { static std::atomic_uint32_t id{0UL}; - auto 
veloxPool = memoryManager_->getAggregateMemoryPool()->addAggregateChild("datasource." + std::to_string(id++)); + auto veloxPool = vmm_->getAggregateMemoryPool()->addAggregateChild("datasource." + std::to_string(id++)); // Pass a dedicate pool for S3 and GCS sinks as can't share veloxPool // with parquet writer. - auto sinkPool = memoryManager_->getLeafMemoryPool(); + auto sinkPool = vmm_->getLeafMemoryPool(); if (isSupportedHDFSPath(filePath)) { #ifdef ENABLE_HDFS return std::make_shared(filePath, veloxPool, sinkPool, schema); @@ -249,7 +248,7 @@ std::shared_ptr VeloxRuntime::createShuffleReader( ShuffleReaderOptions options) { auto rowType = facebook::velox::asRowType(gluten::fromArrowSchema(schema)); auto codec = gluten::createArrowIpcCodec(options.compressionType, options.codecBackend); - auto ctxVeloxPool = memoryManager_->getLeafMemoryPool(); + auto ctxVeloxPool = vmm_->getLeafMemoryPool(); auto veloxCompressionType = facebook::velox::common::stringToCompressionKind(options.compressionTypeStr); auto deserializerFactory = std::make_unique( schema, @@ -257,7 +256,7 @@ std::shared_ptr VeloxRuntime::createShuffleReader( veloxCompressionType, rowType, options.batchSize, - memoryManager_->getArrowMemoryPool(), + vmm_->getArrowMemoryPool(), ctxVeloxPool, options.shuffleWriterType); auto reader = std::make_shared(std::move(deserializerFactory)); @@ -265,8 +264,8 @@ std::shared_ptr VeloxRuntime::createShuffleReader( } std::unique_ptr VeloxRuntime::createColumnarBatchSerializer(struct ArrowSchema* cSchema) { - auto arrowPool = memoryManager_->getArrowMemoryPool(); - auto veloxPool = memoryManager_->getLeafMemoryPool(); + auto arrowPool = vmm_->getArrowMemoryPool(); + auto veloxPool = vmm_->getLeafMemoryPool(); return std::make_unique(arrowPool, veloxPool, cSchema); } diff --git a/cpp/velox/compute/VeloxRuntime.h b/cpp/velox/compute/VeloxRuntime.h index ebcf4e5e0e84f..096ecb6fbf13b 100644 --- a/cpp/velox/compute/VeloxRuntime.h +++ b/cpp/velox/compute/VeloxRuntime.h @@ 
-99,9 +99,7 @@ class VeloxRuntime final : public Runtime { std::vector& streamIds); private: - std::unique_ptr listener_; - std::shared_ptr memoryManager_; - + VeloxMemoryManager* vmm_{nullptr}; std::shared_ptr veloxPlan_; std::shared_ptr veloxCfg_; bool debugModeEnabled_{false};