Skip to content

Commit

Permalink
fixup
Browse files Browse the repository at this point in the history
  • Loading branch information
zhztheplayer committed Jun 28, 2024
1 parent 3fd00eb commit 59714e7
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 22 deletions.
9 changes: 7 additions & 2 deletions cpp/core/compute/Runtime.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ class Runtime : public std::enable_shared_from_this<Runtime> {
const std::unordered_map<std::string, std::string>& sessionConf = {});
static void release(Runtime*);

Runtime(const std::unordered_map<std::string, std::string>& confMap) : confMap_(confMap) {}
/// Constructs a Runtime that shares ownership of `memoryManager` and stores its own copy of
/// the session configuration `confMap`.
/// `memoryManager` is received by value and moved into the member, so handing in a temporary
/// does not pay for an additional reference-count increment/decrement.
Runtime(std::shared_ptr<MemoryManager> memoryManager, const std::unordered_map<std::string, std::string>& confMap)
    : memoryManager_(std::move(memoryManager)), confMap_(confMap) {}

/// Virtual so that a derived runtime (e.g. VeloxRuntime) is destroyed correctly through a Runtime pointer.
virtual ~Runtime() = default;

/// Pure virtual hook: takes a plan as a raw buffer (`data`, `size` bytes); each backend supplies the implementation.
/// NOTE(review): `dumpFile` is optional; presumably a path the parsed plan is dumped to - confirm with implementations.
virtual void parsePlan(const uint8_t* data, int32_t size, std::optional<std::string> dumpFile) = 0;
Expand All @@ -89,7 +91,9 @@ class Runtime : public std::enable_shared_from_this<Runtime> {

/// Pure virtual column-selection hook: takes a batch and a list of column indices and returns a batch.
/// VeloxRuntime::select is the implementation visible for the Velox backend.
virtual std::shared_ptr<ColumnarBatch> select(std::shared_ptr<ColumnarBatch>, std::vector<int32_t>) = 0;

virtual MemoryManager* memoryManager() = 0;
/// Returns a non-owning pointer to the memory manager held by this runtime
/// (the raw pointer stored in memoryManager_).
/// Derived runtimes may override this to expose a more derived manager type.
virtual MemoryManager* memoryManager() {
  return memoryManager_.get();
}

/// This function is used to create certain converter from the format used by
/// the backend to Spark unsafe row.
Expand Down Expand Up @@ -129,6 +133,7 @@ class Runtime : public std::enable_shared_from_this<Runtime> {
}

protected:
// Memory manager supplied to the constructor; memoryManager() hands out its raw pointer.
std::shared_ptr<MemoryManager> memoryManager_;
// Object store instance, created through ObjectStore::create() as a default member initializer.
std::unique_ptr<ObjectStore> objStore_ = ObjectStore::create();
std::unordered_map<std::string, std::string> confMap_; // Session conf map

Expand Down
33 changes: 16 additions & 17 deletions cpp/velox/compute/VeloxRuntime.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ namespace gluten {
VeloxRuntime::VeloxRuntime(
std::unique_ptr<AllocationListener> listener,
const std::unordered_map<std::string, std::string>& confMap)
: Runtime(confMap), listener_(std::move(listener)) {
: Runtime(std::make_shared<VeloxMemoryManager>(std::move(listener)), confMap) {
// Refresh session config.
memoryManager_ = std::make_shared<VeloxMemoryManager>(std::move(listener_));
vmm_ = dynamic_cast<VeloxMemoryManager*>(memoryManager_.get());
veloxCfg_ = std::make_shared<facebook::velox::core::MemConfig>(confMap_);
debugModeEnabled_ = veloxCfg_->get<bool>(kDebugModeEnabled, false);
FLAGS_minloglevel = veloxCfg_->get<uint32_t>(kGlogSeverityLevel, FLAGS_minloglevel);
Expand Down Expand Up @@ -132,7 +132,7 @@ void VeloxRuntime::injectWriteFilesTempPath(const std::string& path) {
}

// Returns this runtime's memory manager typed as VeloxMemoryManager*.
// NOTE(review): this is a diff rendering - the first return (memoryManager_.get()) looks like the
// pre-commit line and the second (vmm_) like its replacement; confirm against the commit before
// reading both as live code.
VeloxMemoryManager* VeloxRuntime::memoryManager() {
return memoryManager_.get();
return vmm_;
}

std::shared_ptr<ResultIterator> VeloxRuntime::createResultIterator(
Expand All @@ -141,8 +141,7 @@ std::shared_ptr<ResultIterator> VeloxRuntime::createResultIterator(
const std::unordered_map<std::string, std::string>& sessionConf) {
LOG_IF(INFO, debugModeEnabled_) << "VeloxRuntime session config:" << printConfig(confMap_);

VeloxPlanConverter veloxPlanConverter(
inputs, memoryManager_->getLeafMemoryPool().get(), sessionConf, writeFilesTempPath_);
VeloxPlanConverter veloxPlanConverter(inputs, vmm_->getLeafMemoryPool().get(), sessionConf, writeFilesTempPath_);
veloxPlan_ = veloxPlanConverter.toVeloxPlan(substraitPlan_, std::move(localFiles_));

// Scan node can be required.
Expand All @@ -154,12 +153,12 @@ std::shared_ptr<ResultIterator> VeloxRuntime::createResultIterator(
getInfoAndIds(veloxPlanConverter.splitInfos(), veloxPlan_->leafPlanNodeIds(), scanInfos, scanIds, streamIds);

auto wholestageIter = std::make_unique<WholeStageResultIterator>(
memoryManager_.get(), veloxPlan_, scanIds, scanInfos, streamIds, spillDir, sessionConf, taskInfo_);
// vmm_ is a raw VeloxMemoryManager*, so it is passed as-is; calling .get() on it does not compile.
vmm_, veloxPlan_, scanIds, scanInfos, streamIds, spillDir, sessionConf, taskInfo_);
return std::make_shared<ResultIterator>(std::move(wholestageIter), this);
}

// Creates a VeloxColumnarToRowConverter that uses a leaf memory pool obtained from the
// Velox memory manager.
// NOTE(review): the two `veloxPool` lines look like the old (memoryManager_) and new (vmm_)
// sides of the diff rather than two live declarations - confirm against the commit.
std::shared_ptr<ColumnarToRowConverter> VeloxRuntime::createColumnar2RowConverter() {
auto veloxPool = memoryManager_->getLeafMemoryPool();
auto veloxPool = vmm_->getLeafMemoryPool();
return std::make_shared<VeloxColumnarToRowConverter>(veloxPool);
}

Expand All @@ -175,23 +174,23 @@ std::shared_ptr<ColumnarBatch> VeloxRuntime::createOrGetEmptySchemaBatch(int32_t
// Returns a batch produced by selecting the columns listed in `columnIndices` from `batch`.
// The input is first turned into a VeloxColumnarBatch via VeloxColumnarBatch::from; both that
// step and the selection use a leaf memory pool obtained from the Velox memory manager.
// NOTE(review): the two `veloxPool` lines look like the old (memoryManager_) and new (vmm_)
// sides of the diff rather than two live declarations - confirm against the commit.
std::shared_ptr<ColumnarBatch> VeloxRuntime::select(
std::shared_ptr<ColumnarBatch> batch,
std::vector<int32_t> columnIndices) {
auto veloxPool = memoryManager_->getLeafMemoryPool();
auto veloxPool = vmm_->getLeafMemoryPool();
auto veloxBatch = gluten::VeloxColumnarBatch::from(veloxPool.get(), batch);
// columnIndices was taken by value, so it can be moved into the callee.
auto outputBatch = veloxBatch->select(veloxPool.get(), std::move(columnIndices));
return outputBatch;
}

// Creates a VeloxRowToColumnarConverter for the Arrow C schema `cSchema`, backed by a leaf
// memory pool obtained from the Velox memory manager.
// NOTE(review): the two `veloxPool` lines look like the old (memoryManager_) and new (vmm_)
// sides of the diff rather than two live declarations - confirm against the commit.
std::shared_ptr<RowToColumnarConverter> VeloxRuntime::createRow2ColumnarConverter(struct ArrowSchema* cSchema) {
auto veloxPool = memoryManager_->getLeafMemoryPool();
auto veloxPool = vmm_->getLeafMemoryPool();
return std::make_shared<VeloxRowToColumnarConverter>(cSchema, veloxPool);
}

std::shared_ptr<ShuffleWriter> VeloxRuntime::createShuffleWriter(
int numPartitions,
std::unique_ptr<PartitionWriter> partitionWriter,
ShuffleWriterOptions options) {
auto veloxPool = memoryManager_->getLeafMemoryPool();
auto arrowPool = memoryManager_->getArrowMemoryPool();
auto veloxPool = vmm_->getLeafMemoryPool();
auto arrowPool = vmm_->getArrowMemoryPool();
GLUTEN_ASSIGN_OR_THROW(
std::shared_ptr<ShuffleWriter> shuffleWriter,
VeloxShuffleWriter::create(
Expand All @@ -208,10 +207,10 @@ std::shared_ptr<Datasource> VeloxRuntime::createDatasource(
const std::string& filePath,
std::shared_ptr<arrow::Schema> schema) {
static std::atomic_uint32_t id{0UL};
auto veloxPool = memoryManager_->getAggregateMemoryPool()->addAggregateChild("datasource." + std::to_string(id++));
auto veloxPool = vmm_->getAggregateMemoryPool()->addAggregateChild("datasource." + std::to_string(id++));
// Pass a dedicate pool for S3 and GCS sinks as can't share veloxPool
// with parquet writer.
auto sinkPool = memoryManager_->getLeafMemoryPool();
auto sinkPool = vmm_->getLeafMemoryPool();
if (isSupportedHDFSPath(filePath)) {
#ifdef ENABLE_HDFS
return std::make_shared<VeloxParquetDatasourceHDFS>(filePath, veloxPool, sinkPool, schema);
Expand Down Expand Up @@ -249,24 +248,24 @@ std::shared_ptr<ShuffleReader> VeloxRuntime::createShuffleReader(
ShuffleReaderOptions options) {
auto rowType = facebook::velox::asRowType(gluten::fromArrowSchema(schema));
auto codec = gluten::createArrowIpcCodec(options.compressionType, options.codecBackend);
auto ctxVeloxPool = memoryManager_->getLeafMemoryPool();
auto ctxVeloxPool = vmm_->getLeafMemoryPool();
auto veloxCompressionType = facebook::velox::common::stringToCompressionKind(options.compressionTypeStr);
auto deserializerFactory = std::make_unique<gluten::VeloxColumnarBatchDeserializerFactory>(
schema,
std::move(codec),
veloxCompressionType,
rowType,
options.batchSize,
memoryManager_->getArrowMemoryPool(),
vmm_->getArrowMemoryPool(),
ctxVeloxPool,
options.shuffleWriterType);
auto reader = std::make_shared<VeloxShuffleReader>(std::move(deserializerFactory));
return reader;
}

// Creates a VeloxColumnarBatchSerializer for the Arrow C schema `cSchema`, handing it the
// Arrow memory pool and a leaf Velox memory pool obtained from the Velox memory manager.
// NOTE(review): each pool appears twice because the old (memoryManager_) and new (vmm_) sides
// of the diff are interleaved here - confirm against the commit.
std::unique_ptr<ColumnarBatchSerializer> VeloxRuntime::createColumnarBatchSerializer(struct ArrowSchema* cSchema) {
auto arrowPool = memoryManager_->getArrowMemoryPool();
auto veloxPool = memoryManager_->getLeafMemoryPool();
auto arrowPool = vmm_->getArrowMemoryPool();
auto veloxPool = vmm_->getLeafMemoryPool();
return std::make_unique<VeloxColumnarBatchSerializer>(arrowPool, veloxPool, cSchema);
}

Expand Down
4 changes: 1 addition & 3 deletions cpp/velox/compute/VeloxRuntime.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,7 @@ class VeloxRuntime final : public Runtime {
std::vector<facebook::velox::core::PlanNodeId>& streamIds);

private:
std::unique_ptr<AllocationListener> listener_;
std::shared_ptr<VeloxMemoryManager> memoryManager_;

// Non-owning pointer to the memory manager viewed as a VeloxMemoryManager; assigned in the
// constructor from the base class's memoryManager_. Initialised to nullptr so it never holds
// an indeterminate value.
VeloxMemoryManager* vmm_{nullptr};
std::shared_ptr<const facebook::velox::core::PlanNode> veloxPlan_;
std::shared_ptr<facebook::velox::Config> veloxCfg_;
bool debugModeEnabled_{false};
Expand Down

0 comments on commit 59714e7

Please sign in to comment.