Skip to content

Commit

Permalink
Add file-create-config to hive config and propagate to file sink
Browse files Browse the repository at this point in the history
  • Loading branch information
tanjialiang committed Dec 13, 2023
1 parent e57c47f commit f485f11
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 5 deletions.
7 changes: 7 additions & 0 deletions velox/connectors/hive/HiveConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,4 +214,11 @@ uint64_t HiveConfig::sortWriterMaxOutputBytes(const Config* session) const {
return 10UL << 20;
}

// Resolves the free-form file-create config string. A value set under
// kFileCreateConfig in 'session' takes precedence; otherwise the value from
// 'config_' is used, defaulting to an empty string when it is not set there
// either.
std::string HiveConfig::fileCreateConfig(const Config* session) const {
  const bool hasSessionOverride = session->isValueExists(kFileCreateConfig);
  if (!hasSessionOverride) {
    // No session-level override: fall back to the config-level setting.
    return config_->get<std::string>(kFileCreateConfig, "");
  }
  return session->get<std::string>(kFileCreateConfig).value();
}

} // namespace facebook::velox::connector::hive
7 changes: 7 additions & 0 deletions velox/connectors/hive/HiveConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,11 @@ class HiveConfig {
static constexpr const char* kSortWriterMaxOutputBytesSession =
"sort_writer_max_output_bytes";

/// Key of the config used to create sink files. The config value is a
/// free-form string that is provided to the underlying file system; its
/// format is defined by that file system. The same key is looked up in both
/// the session properties and this HiveConfig's config (see
/// fileCreateConfig()).
static constexpr const char* kFileCreateConfig = "file-create-config";

InsertExistingPartitionsBehavior insertExistingPartitionsBehavior(
const Config* session) const;

Expand Down Expand Up @@ -201,6 +206,8 @@ class HiveConfig {

uint64_t sortWriterMaxOutputBytes(const Config* session) const;

/// Returns the config string used to create sink files: the value stored
/// under kFileCreateConfig in 'session' if it exists there, otherwise the
/// value from this HiveConfig's config, or an empty string if it is not set.
std::string fileCreateConfig(const Config* session) const;

HiveConfig(std::shared_ptr<const Config> config) {
VELOX_CHECK_NOT_NULL(
config, "Config is null for HiveConfig initialization");
Expand Down
14 changes: 9 additions & 5 deletions velox/connectors/hive/HiveDataSink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "velox/connectors/hive/HiveDataSink.h"

#include "velox/common/base/Counters.h"
#include "velox/common/base/Fs.h"
#include "velox/common/base/StatsReporter.h"
#include "velox/common/file/FileSystems.h"
#include "velox/common/testutil/TestValue.h"
Expand Down Expand Up @@ -595,6 +596,8 @@ uint32_t HiveDataSink::appendWriter(const HiveWriterId& id) {
setMemoryReclaimers(writerInfo_.back().get());
dwio::common::WriterOptions options;
const auto connectorSessionProperties =
connectorQueryCtx_->sessionProperties();
options.schema = inputType_;
options.memoryPool = writerInfo_.back()->writerPool.get();
options.compressionKind = insertTableHandle_->compressionKind();
Expand All @@ -603,11 +606,10 @@ uint32_t HiveDataSink::appendWriter(const HiveWriterId& id) {
}
options.nonReclaimableSection =
writerInfo_.back()->nonReclaimableSectionHolder.get();
options.maxStripeSize = std::optional(hiveConfig_->getOrcWriterMaxStripeSize(
connectorQueryCtx_->sessionProperties()));
options.maxDictionaryMemory =
std::optional(hiveConfig_->getOrcWriterMaxDictionaryMemory(
connectorQueryCtx_->sessionProperties()));
options.maxStripeSize = std::optional(
hiveConfig_->getOrcWriterMaxStripeSize(connectorSessionProperties));
options.maxDictionaryMemory = std::optional(
hiveConfig_->getOrcWriterMaxDictionaryMemory(connectorSessionProperties));
ioStats_.emplace_back(std::make_shared<io::IoStatistics>());
// Prevents the memory allocation during the writer creation.
Expand All @@ -617,6 +619,8 @@ uint32_t HiveDataSink::appendWriter(const HiveWriterId& id) {
writePath,
{.bufferWrite = false,
.connectorProperties = hiveConfig_->config(),
.fileCreateConfig =
hiveConfig_->fileCreateConfig(connectorSessionProperties),
.pool = writerInfo_.back()->sinkPool.get(),
.metricLogger = dwio::common::MetricsLog::voidLog(),
.stats = ioStats_.back().get()}),
Expand Down
4 changes: 4 additions & 0 deletions velox/dwio/common/FileSink.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ class FileSink : public Closeable {
/// Connector properties are required to create a FileSink on FileSystems
/// such as S3.
const std::shared_ptr<const Config>& connectorProperties{nullptr};
/// Free-form config string used to create sink files. It is provided to the
/// underlying file system, which defines the expected format.
const std::string fileCreateConfig;
memory::MemoryPool* pool{nullptr};
MetricsLogPtr metricLogger{MetricsLog::voidLog()};
IoStatistics* stats{nullptr};
Expand Down

0 comments on commit f485f11

Please sign in to comment.