Skip to content

Commit

Permalink
Support timestamp units in arrow bridge (7625)
Browse files Browse the repository at this point in the history
  • Loading branch information
rui-mo authored and zhztheplayer committed Dec 29, 2023
1 parent d6ed32e commit cd61634
Show file tree
Hide file tree
Showing 14 changed files with 394 additions and 127 deletions.
5 changes: 5 additions & 0 deletions velox/connectors/hive/HiveConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ std::string HiveConfig::insertExistingPartitionsBehaviorString(
}
}

// static.
// Looks up kArrowBridgeTimestampUnit in 'config', passing 9 (nano) as the
// default value.
uint8_t HiveConfig::arrowBridgeTimestampUnit(const Config* config) {
  constexpr uint8_t kDefaultUnit = 9; // nano
  return config->get<uint8_t>(kArrowBridgeTimestampUnit, kDefaultUnit);
}

HiveConfig::InsertExistingPartitionsBehavior
HiveConfig::insertExistingPartitionsBehavior(const Config* session) const {
if (session->isValueExists(kInsertExistingPartitionsBehaviorSession)) {
Expand Down
8 changes: 8 additions & 0 deletions velox/connectors/hive/HiveConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,10 @@ class HiveConfig {
static constexpr const char* kWriteFileCreateConfig =
"hive.write_file_create_config";

// Timestamp unit used during Velox-Arrow conversion.
// 0: second, 3: milli, 6: micro, 9: nano.
static constexpr const char* kArrowBridgeTimestampUnit =
"arrow_bridge_timestamp_unit";

/// Maximum number of rows for sort writer in one batch of output.
static constexpr const char* kSortWriterMaxOutputRows =
"sort-writer-max-output-rows";
Expand Down Expand Up @@ -190,6 +194,10 @@ class HiveConfig {

std::string gcsCredentials() const;

/// Returns the timestamp unit used in Velox-Arrow conversion.
/// 0: second, 3: milli, 6: micro, 9: nano.
static uint8_t arrowBridgeTimestampUnit(const Config* config);

bool isOrcUseColumnNames(const Config* session) const;

bool isFileColumnNamesReadAsLowerCase(const Config* session) const;
Expand Down
2 changes: 2 additions & 0 deletions velox/connectors/hive/HiveDataSink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,8 @@ uint32_t HiveDataSink::appendWriter(const HiveWriterId& id) {
if (canReclaim()) {
options.spillConfig = spillConfig_;
}
options.arrowBridgeTimestampUnit = HiveConfig::arrowBridgeTimestampUnit(
connectorQueryCtx_->sessionProperties());
options.nonReclaimableSection =
writerInfo_.back()->nonReclaimableSectionHolder.get();
options.maxStripeSize = std::optional(
Expand Down
11 changes: 11 additions & 0 deletions velox/core/QueryConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,10 @@ class QueryConfig {
static constexpr const char* kDriverCpuTimeSliceLimitMs =
"driver_cpu_time_slice_limit_ms";

// Timestamp unit used during Velox-Arrow conversion.
// 0: second, 3: milli, 6: micro, 9: nano.
static constexpr const char* kArrowBridgeTimestampUnit =
"arrow_bridge_timestamp_unit";

uint64_t queryMaxMemoryPerNode() const {
return toCapacity(
get<std::string>(kQueryMaxMemoryPerNode, "0B"), CapacityUnit::BYTE);
Expand Down Expand Up @@ -613,6 +617,13 @@ class QueryConfig {
return get<uint8_t>(kSpillStartPartitionBit, kDefaultStartBit);
}

/// Timestamp unit applied during Velox-Arrow conversion, read from
/// kArrowBridgeTimestampUnit with 9 passed as the default value.
/// 0: second, 3: milli, 6: micro, 9: nano.
uint8_t arrowBridgeTimestampUnit() const {
  return get<uint8_t>(
      kArrowBridgeTimestampUnit, static_cast<uint8_t>(9) /* nano */);
}

/// Returns the number of bits used to calculate the spilling partition
/// number for hash join. The number of spilling partitions will be power of
/// two.
Expand Down
1 change: 1 addition & 0 deletions velox/dwio/common/Options.h
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,7 @@ struct WriterOptions {
std::optional<uint64_t> maxStripeSize{std::nullopt};
std::optional<uint64_t> maxDictionaryMemory{std::nullopt};
std::map<std::string, std::string> serdeParameters;
std::optional<uint8_t> arrowBridgeTimestampUnit;
};

} // namespace facebook::velox::dwio::common
6 changes: 6 additions & 0 deletions velox/dwio/parquet/writer/Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,8 @@ Writer::Writer(
} else {
flushPolicy_ = std::make_unique<DefaultFlushPolicy>();
}
options_.timestampUnit =
static_cast<TimestampUnit>(options.arrowBridgeTimestampUnit);
arrowContext_->properties =
getArrowParquetWriterOptions(options, flushPolicy_);
setMemoryReclaimers();
Expand Down Expand Up @@ -381,6 +383,10 @@ parquet::WriterOptions getParquetOptions(
if (options.compressionKind.has_value()) {
parquetOptions.compression = options.compressionKind.value();
}
if (options.arrowBridgeTimestampUnit.has_value()) {
parquetOptions.arrowBridgeTimestampUnit =
options.arrowBridgeTimestampUnit.value();
}
return parquetOptions;
}

Expand Down
4 changes: 4 additions & 0 deletions velox/dwio/parquet/writer/Writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "velox/dwio/parquet/writer/arrow/Types.h"
#include "velox/dwio/parquet/writer/arrow/util/Compression.h"
#include "velox/vector/ComplexVector.h"
#include "velox/vector/arrow/Bridge.h"

namespace facebook::velox::parquet {

Expand Down Expand Up @@ -100,6 +101,7 @@ struct WriterOptions {
// policy with the configs in its ctor.
std::function<std::unique_ptr<DefaultFlushPolicy>()> flushPolicyFactory;
std::shared_ptr<CodecOptions> codecOptions;
uint8_t arrowBridgeTimestampUnit = static_cast<uint8_t>(TimestampUnit::kNano);
};

// Writes Velox vectors into a DataSink using Arrow Parquet writer.
Expand Down Expand Up @@ -156,6 +158,8 @@ class Writer : public dwio::common::Writer {
std::unique_ptr<DefaultFlushPolicy> flushPolicy_;

const RowTypePtr schema_;

ArrowOptions options_{.flattenDictionary = true, .flattenConstant = true};
};

class ParquetWriterFactory : public dwio::common::WriterFactory {
Expand Down
4 changes: 3 additions & 1 deletion velox/exec/ArrowStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ ArrowStream::ArrowStream(
operatorId,
arrowStreamNode->id(),
"ArrowStream") {
options_.timestampUnit = static_cast<TimestampUnit>(
driverCtx->queryConfig().arrowBridgeTimestampUnit());
arrowStream_ = arrowStreamNode->arrowStream();
}

Expand Down Expand Up @@ -66,7 +68,7 @@ RowVectorPtr ArrowStream::getOutput() {

// Convert Arrow Array into RowVector and return.
return std::dynamic_pointer_cast<RowVector>(
importFromArrowAsOwner(arrowSchema, arrowArray, pool()));
importFromArrowAsOwner(arrowSchema, arrowArray, options_, pool()));
}

bool ArrowStream::isFinished() {
Expand Down
1 change: 1 addition & 0 deletions velox/exec/ArrowStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class ArrowStream : public SourceOperator {

bool finished_ = false;
std::shared_ptr<ArrowArrayStream> arrowStream_;
ArrowOptions options_;
};

} // namespace facebook::velox::exec
5 changes: 3 additions & 2 deletions velox/exec/tests/ArrowStreamTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class ArrowStreamTest : public OperatorTestBase {

int getNext(struct ArrowArray* outArray) {
if (vectorIndex_ < vectors_.size()) {
exportToArrow(vectors_[vectorIndex_], *outArray, pool_.get());
exportToArrow(vectors_[vectorIndex_], *outArray, pool_.get(), options_);
vectorIndex_ += 1;
} else {
// End of stream. Mark the array released.
Expand All @@ -56,12 +56,13 @@ class ArrowStreamTest : public OperatorTestBase {
}

// Exports the schema of a zero-length vector of 'type_' into 'out' using
// 'options_', then returns kGetSchemaFailed when 'failGetSchema_' is set and
// kNoError otherwise. The export happens regardless of the returned code.
int getArrowSchema(ArrowSchema& out) {
  exportToArrow(BaseVector::create(type_, 0, pool_.get()), out, options_);
  if (failGetSchema_) {
    return static_cast<int>(ErrorCode::kGetSchemaFailed);
  }
  return static_cast<int>(ErrorCode::kNoError);
}

private:
ArrowOptions options_;
const std::shared_ptr<memory::MemoryPool> pool_;
const std::vector<RowVectorPtr>& vectors_;
const TypePtr type_;
Expand Down
Loading

0 comments on commit cd61634

Please sign in to comment.