Skip to content

Commit

Permalink
Refactor WriterOptions processConfigs function (#10762)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: #10762

Merge processSessionConfigs and processHiveConnectorConfigs
as one function processConfigs.
In processConfigs(), it prioritize getting config from session property,
if not found, use config from connector config.

Reviewed By: xiaoxmeng

Differential Revision: D61304850

fbshipit-source-id: 19b2a66958559cb54f96e4dcc73480ccd81d57c8
  • Loading branch information
Ke Wang authored and facebook-github-bot committed Aug 17, 2024
1 parent 2b7f559 commit 54b16e0
Show file tree
Hide file tree
Showing 7 changed files with 30 additions and 24 deletions.
3 changes: 1 addition & 2 deletions velox/connectors/hive/HiveDataSink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -693,8 +693,7 @@ uint32_t HiveDataSink::appendWriter(const HiveWriterId& id) {
// through insertTableHandle)
// 2. Otherwise, acquire user defined session properties.
// 3. Lastly, acquire general hive connector configs.
options->processSessionConfigs(*connectorSessionProperties);
options->processHiveConnectorConfigs(*hiveConfig_->config());
options->processConfigs(*hiveConfig_->config(), *connectorSessionProperties);

// Only overwrite options in case they were not already provided.
if (options->schema == nullptr) {
Expand Down
5 changes: 3 additions & 2 deletions velox/dwio/common/Options.h
Original file line number Diff line number Diff line change
Expand Up @@ -620,8 +620,9 @@ struct WriterOptions {

// WriterOption implementations should provide this function to specify how to
// process format-specific session and connector configs.
virtual void processSessionConfigs(const config::ConfigBase&) {}
virtual void processHiveConnectorConfigs(const config::ConfigBase&) {}
virtual void processConfigs(
const config::ConfigBase& connectorConfig,
const config::ConfigBase& session) {};

virtual ~WriterOptions() = default;
};
Expand Down
5 changes: 5 additions & 0 deletions velox/dwio/dwrf/writer/Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@
using facebook::velox::common::testutil::TestValue;

namespace facebook::velox::dwrf {

void WriterOptions::processConfigs(
const config::ConfigBase& connectorConfig,
const config::ConfigBase& session) {};

namespace {

dwio::common::StripeProgress getStripeProgress(const WriterContext& context) {
Expand Down
4 changes: 4 additions & 0 deletions velox/dwio/dwrf/writer/Writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ struct WriterOptions : public dwio::common::WriterOptions {
WriterContext& context,
const velox::dwio::common::TypeWithId& type)>
columnWriterFactory;

void processConfigs(
const config::ConfigBase& connectorConfig,
const config::ConfigBase& session) override;
};

class Writer : public dwio::common::Writer {
Expand Down
28 changes: 11 additions & 17 deletions velox/dwio/parquet/writer/Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -435,28 +435,22 @@ std::optional<std::string> getTimestampTimeZone(

} // namespace

void WriterOptions::processSessionConfigs(const config::ConfigBase& config) {
void WriterOptions::processConfigs(
const config::ConfigBase& connectorConfig,
const config::ConfigBase& session) {
if (!parquetWriteTimestampUnit) {
parquetWriteTimestampUnit =
getTimestampUnit(config, kParquetSessionWriteTimestampUnit);
getTimestampUnit(session, kParquetSessionWriteTimestampUnit).has_value()
? getTimestampUnit(session, kParquetSessionWriteTimestampUnit)
: getTimestampUnit(connectorConfig, kParquetSessionWriteTimestampUnit);
}

if (!parquetWriteTimestampTimeZone) {
parquetWriteTimestampTimeZone =
getTimestampTimeZone(config, core::QueryConfig::kSessionTimezone);
}
}

void WriterOptions::processHiveConnectorConfigs(
const config::ConfigBase& config) {
if (!parquetWriteTimestampUnit) {
parquetWriteTimestampUnit =
getTimestampUnit(config, kParquetHiveConnectorWriteTimestampUnit);
}

if (!parquetWriteTimestampTimeZone) {
parquetWriteTimestampTimeZone =
getTimestampTimeZone(config, core::QueryConfig::kSessionTimezone);
getTimestampTimeZone(session, core::QueryConfig::kSessionTimezone)
.has_value()
? getTimestampTimeZone(session, core::QueryConfig::kSessionTimezone)
: getTimestampTimeZone(
connectorConfig, core::QueryConfig::kSessionTimezone);
}
}

Expand Down
5 changes: 3 additions & 2 deletions velox/dwio/parquet/writer/Writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,9 @@ struct WriterOptions : public dwio::common::WriterOptions {
"hive.parquet.writer.timestamp-unit";

// Process hive connector and session configs.
void processSessionConfigs(const config::ConfigBase& config) override;
void processHiveConnectorConfigs(const config::ConfigBase& config) override;
void processConfigs(
const config::ConfigBase& connectorConfig,
const config::ConfigBase& session) override;
};

// Writes Velox vectors into a DataSink using Arrow Parquet writer.
Expand Down
4 changes: 3 additions & 1 deletion velox/exec/fuzzer/PrestoQueryRunner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "velox/exec/fuzzer/PrestoQueryRunner.h"
#include <cpr/cpr.h> // @manual
#include <folly/json.h>
#include <velox/dwio/dwrf/writer/Writer.h>
#include <iostream>
#include "velox/common/base/Fs.h"
#include "velox/common/encode/Base64.h"
Expand Down Expand Up @@ -44,7 +45,8 @@ void writeToFile(
memory::MemoryPool* pool) {
VELOX_CHECK_GT(data.size(), 0);

auto options = std::make_shared<dwio::common::WriterOptions>();
std::shared_ptr<dwio::common::WriterOptions> options =
std::make_shared<dwrf::WriterOptions>();
options->schema = data[0]->type();
options->memoryPool = pool;

Expand Down

0 comments on commit 54b16e0

Please sign in to comment.