Skip to content

Commit

Permalink
Move Velox batch reader and writer to OSS nimble repo (#62)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: #62

Just move files around, no functionality change.

Differential Revision: D58109796
  • Loading branch information
Yuhta authored and facebook-github-bot committed Jun 3, 2024
1 parent acad60d commit 23acbac
Show file tree
Hide file tree
Showing 12 changed files with 1,397 additions and 0 deletions.
162 changes: 162 additions & 0 deletions dwio/nimble/velox/NimbleConfig.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
/*
* Copyright (c) Meta Platforms, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "dwio/nimble/velox/NimbleConfig.h"

#include "dwio/nimble/encodings/EncodingSelectionPolicy.h"

#include <folly/json/dynamic.h>
#include <folly/json/json.h>

// Command-line flags whose values are passed to some of the Config entry
// definitions further down in this file.
//
// NOTE(review): the FLAGS_* variables are read inside namespace-scope
// initializers below, i.e. during static initialization. Command-line flag
// parsing normally happens later (in main), so confirm whether overriding
// these flags on the command line is expected to affect the entries.

// Per-encoding read factors as a ';'-separated list of
// <EncodingName>=<float> pairs. Parsed with
// ManualEncodingSelectionPolicyFactory::parseReadFactors and passed to
// Config::MANUAL_ENCODING_SELECTION_READ_FACTORS.
DEFINE_string(
nimble_selection_read_factors,
"Constant=1.0;Trivial=0.5;FixedBitWidth=0.9;MainlyConstant=1.0;SparseBool=1.0;Dictionary=1.0;RLE=1.0;Varint=1.0",
"Encoding selection read factors, in the format: "
"<EncodingName>=<FactorFloatValue>;<EncodingName>=<FactorFloatValue>;...");

// Passed to Config::ENCODING_SELECTION_COMPRESSION_ACCEPT_RATIO.
DEFINE_double(
nimble_selection_compression_accept_ratio,
0.97,
"Encoding selection compression accept ratio.");

// Passed to Config::ENABLE_ZSTRONG_VARIABLE_BITWIDTH_COMPRESSOR.
DEFINE_bool(
nimble_zstrong_enable_variable_bit_width_compressor,
false,
"Enable zstrong variable bit width compressor at write time. Transparent at read time.");

// Growth configuration passed to Config::INPUT_BUFFER_DEFAULT_GROWTH_CONFIGS.
// The value is a JSON object whose keys are range starts (written as strings)
// and whose values are growth factors, e.g. {"32":4.0}; it is parsed by
// parseGrowthConfigMap() below.
DEFINE_string(
nimble_writer_input_buffer_default_growth_config,
"{\"32\":4.0,\"512\":1.414,\"4096\":1.189}",
"Default growth config for writer input buffers, each entry in the format of {range_start,growth_factor}");

namespace facebook::nimble {
namespace {
// Parses a comma-separated list into a vector of T.
//
// The input is split on ',' (empty pieces are dropped by the split itself),
// each piece is whitespace-trimmed, pieces that are empty after trimming are
// skipped, and the remaining ones are converted with folly::to<T>.
// An empty input string yields an empty vector.
template <typename T>
std::vector<T> parseVector(const std::string& str) {
  std::vector<T> values;
  if (str.empty()) {
    return values;
  }

  std::vector<folly::StringPiece> tokens;
  folly::split(',', str, tokens, true);
  for (auto& token : tokens) {
    const auto& trimmed = folly::trimWhitespace(token);
    if (trimmed.empty()) {
      // Whitespace-only piece, e.g. "1, ,2": nothing to convert.
      continue;
    }
    values.push_back(folly::to<T>(trimmed));
  }
  return values;
}

std::map<uint64_t, float> parseGrowthConfigMap(const std::string& str) {
std::map<uint64_t, float> ret;
NIMBLE_CHECK(!str.empty(), "Can't supply an empty growth config.");
folly::dynamic json = folly::parseJson(str);
for (const auto& pair : json.items()) {
auto [_, inserted] = ret.emplace(
folly::to<uint64_t>(pair.first.asString()), pair.second.asDouble());
NIMBLE_CHECK(
inserted, fmt::format("Duplicate key: {}.", pair.first.asString()));
}
return ret;
}
} // namespace

// Definitions of the static Config entries declared in NimbleConfig.h.
// Each entry is constructed from its config key followed by a second argument
// (presumably the default value; see velox::common::ConfigBase::Entry to
// confirm). Entries with non-scalar value types additionally pass a
// value-to-string lambda and a (key, string)-to-value lambda.

// Boolean entry under key "orc.flatten.map"; second argument is false.
/* static */ Config::Entry<bool> Config::FLATTEN_MAP("orc.flatten.map", false);

// List of uint32_t under key "orc.map.flat.cols"; second argument is an empty
// list. Serialized as a ','-joined string and parsed with
// parseVector<uint32_t>.
/* static */ Config::Entry<const std::vector<uint32_t>> Config::MAP_FLAT_COLS(
"orc.map.flat.cols",
{},
[](const std::vector<uint32_t>& val) { return folly::join(",", val); },
[](const std::string& /* key */, const std::string& val) {
return parseVector<uint32_t>(val);
});

// List of uint32_t under key "alpha.map.deduplicated.cols"; same
// serialization and parsing as MAP_FLAT_COLS.
/* static */ Config::Entry<const std::vector<uint32_t>>
Config::DEDUPLICATED_COLS(
"alpha.map.deduplicated.cols",
{},
[](const std::vector<uint32_t>& val) { return folly::join(",", val); },
[](const std::string& /* key */, const std::string& val) {
return parseVector<uint32_t>(val);
});

// List of uint32_t under key "alpha.dictionaryarray.cols"; same serialization
// and parsing as MAP_FLAT_COLS.
// NOTE(review): the key says "dictionaryarray" while the entry is named
// BATCH_REUSE_COLS; keep both names in mind when searching for this setting.
/* static */ Config::Entry<const std::vector<uint32_t>>
Config::BATCH_REUSE_COLS(
"alpha.dictionaryarray.cols",
{},
[](const std::vector<uint32_t>& val) { return folly::join(",", val); },
[](const std::string& /* key */, const std::string& val) {
return parseVector<uint32_t>(val);
});

// uint64_t entry under key "alpha.raw.stripe.size"; second argument is
// 512 * 1024 * 1024 = 536870912 (presumably a size in bytes, i.e. 512 MiB).
/* static */ Config::Entry<uint64_t> Config::RAW_STRIPE_SIZE(
"alpha.raw.stripe.size",
512L * 1024L * 1024L);

// Entry under key "alpha.encodingselection.read.factors" holding a list of
// (EncodingType, float) read factors.
//
// The second argument is the result of parsing
// FLAGS_nimble_selection_read_factors. Values are serialized as
// "<encoding>=<factor>" items joined with ';' and parsed back with
// ManualEncodingSelectionPolicyFactory::parseReadFactors.
//
// NOTE(review): the flag is read here during static initialization; confirm
// whether command-line overrides (normally parsed later) are meant to apply.
/* static */ Config::Entry<const std::vector<std::pair<EncodingType, float>>>
    Config::MANUAL_ENCODING_SELECTION_READ_FACTORS(
        "alpha.encodingselection.read.factors",
        ManualEncodingSelectionPolicyFactory::parseReadFactors(
            FLAGS_nimble_selection_read_factors),
        [](const std::vector<std::pair<EncodingType, float>>& val) {
          // Render every pair as "<encoding>=<factor>", then join with ';'.
          std::vector<std::string> renderedFactors;
          for (const auto& [encoding, factor] : val) {
            renderedFactors.push_back(
                fmt::format("{}={}", toString(encoding), factor));
          }
          return folly::join(";", renderedFactors);
        },
        [](const std::string& /* key */, const std::string& val) {
          return ManualEncodingSelectionPolicyFactory::parseReadFactors(val);
        });

// float entry under key "alpha.encodingselection.compression.accept.ratio";
// second argument is FLAGS_nimble_selection_compression_accept_ratio (a
// double flag, converted to float here).
// NOTE(review): the flag is read during static initialization; confirm
// whether command-line overrides (normally parsed later) are meant to apply.
/* static */ Config::Entry<float>
Config::ENCODING_SELECTION_COMPRESSION_ACCEPT_RATIO(
"alpha.encodingselection.compression.accept.ratio",
FLAGS_nimble_selection_compression_accept_ratio);

// uint32_t entry under key "alpha.zstrong.compression.level"; second argument
// is 4.
/* static */ Config::Entry<uint32_t> Config::ZSTRONG_COMPRESSION_LEVEL(
"alpha.zstrong.compression.level",
4);

// uint32_t entry under key "alpha.zstrong.decompression.level"; second
// argument is 2.
/* static */ Config::Entry<uint32_t> Config::ZSTRONG_DECOMPRESSION_LEVEL(
"alpha.zstrong.decompression.level",
2);

// bool entry under key "alpha.zstrong.enable.variable.bit.width.compressor";
// second argument is FLAGS_nimble_zstrong_enable_variable_bit_width_compressor
// (same static-initialization caveat as the accept ratio above).
/* static */ Config::Entry<bool>
Config::ENABLE_ZSTRONG_VARIABLE_BITWIDTH_COMPRESSOR(
"alpha.zstrong.enable.variable.bit.width.compressor",
FLAGS_nimble_zstrong_enable_variable_bit_width_compressor);

// Entry under key "alpha.writer.input.buffer.default.growth.configs" holding
// a map from range start (uint64_t) to growth factor (float).
//
// The second argument is the result of parsing
// FLAGS_nimble_writer_input_buffer_default_growth_config with
// parseGrowthConfigMap. Values are serialized as a JSON object and parsed
// back with parseGrowthConfigMap.
// NOTE(review): the flag is read during static initialization; confirm
// whether command-line overrides (normally parsed later) are meant to apply.
/* static */ Config::Entry<const std::map<uint64_t, float>>
Config::INPUT_BUFFER_DEFAULT_GROWTH_CONFIGS(
"alpha.writer.input.buffer.default.growth.configs",
parseGrowthConfigMap(
FLAGS_nimble_writer_input_buffer_default_growth_config),
[](const std::map<uint64_t, float>& val) {
// Build a JSON object keyed by the decimal string form of each range start.
folly::dynamic obj = folly::dynamic::object;
for (const auto& [rangeStart, growthFactor] : val) {
obj[folly::to<std::string>(rangeStart)] = growthFactor;
}
return folly::toJson(obj);
},
[](const std::string& /* key */, const std::string& val) {
return parseGrowthConfigMap(val);
});
} // namespace facebook::nimble
51 changes: 51 additions & 0 deletions dwio/nimble/velox/NimbleConfig.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright (c) Meta Platforms, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include "dwio/nimble/common/Types.h"

#include "velox/common/config/Config.h"

namespace facebook::nimble {

// Set of typed configuration entries built on velox::common::ConfigBase.
// The static Entry members are defined in NimbleConfig.cpp, where each one is
// bound to its string config key.
class Config : public velox::common::ConfigBase<Config> {
public:
// Shorthand for the entry type provided by the base class.
template <typename T>
using Entry = velox::common::ConfigBase<Config>::Entry<T>;

// Key "orc.flatten.map".
static Entry<bool> FLATTEN_MAP;
// Key "orc.map.flat.cols"; comma-separated list of uint32_t.
static Entry<const std::vector<uint32_t>> MAP_FLAT_COLS;
// Key "alpha.dictionaryarray.cols"; comma-separated list of uint32_t.
static Entry<const std::vector<uint32_t>> BATCH_REUSE_COLS;
// Key "alpha.map.deduplicated.cols"; comma-separated list of uint32_t.
static Entry<const std::vector<uint32_t>> DEDUPLICATED_COLS;
// Key "alpha.raw.stripe.size".
static Entry<uint64_t> RAW_STRIPE_SIZE;
// Key "alpha.encodingselection.read.factors"; ';'-separated
// <EncodingName>=<factor> pairs.
static Entry<const std::vector<std::pair<EncodingType, float>>>
MANUAL_ENCODING_SELECTION_READ_FACTORS;
// Key "alpha.encodingselection.compression.accept.ratio".
static Entry<float> ENCODING_SELECTION_COMPRESSION_ACCEPT_RATIO;
// Key "alpha.zstrong.compression.level".
static Entry<uint32_t> ZSTRONG_COMPRESSION_LEVEL;
// Key "alpha.zstrong.decompression.level".
static Entry<uint32_t> ZSTRONG_DECOMPRESSION_LEVEL;
// Key "alpha.zstrong.enable.variable.bit.width.compressor".
static Entry<bool> ENABLE_ZSTRONG_VARIABLE_BITWIDTH_COMPRESSOR;
// Key "alpha.writer.input.buffer.default.growth.configs"; JSON object mapping
// range start to growth factor.
static Entry<const std::map<uint64_t, float>>
INPUT_BUFFER_DEFAULT_GROWTH_CONFIGS;

// Creates a new Config and inserts every key/value pair of `map` into its
// configs_ member (inherited; presumably the raw string storage of
// ConfigBase). No validation of keys or values is done here.
static std::shared_ptr<Config> fromMap(
const std::map<std::string, std::string>& map) {
auto ret = std::make_shared<Config>();
ret->configs_.insert(map.cbegin(), map.cend());
return ret;
}
};

} // namespace facebook::nimble
178 changes: 178 additions & 0 deletions dwio/nimble/velox/NimbleReader.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
/*
* Copyright (c) Meta Platforms, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "dwio/nimble/velox/NimbleReader.h"

#include "dwio/nimble/tablet/Constants.h"
#include "dwio/nimble/velox/VeloxUtil.h"

namespace facebook::velox::nimble {

namespace {

// Optional section names handed to every TabletReader constructed by
// NimbleReader below (presumably sections to preload when the tablet is
// opened). Currently contains only the schema section.
const std::vector<std::string> kPreloadOptionalSections = {
std::string(facebook::nimble::kSchemaSection)};

class NimbleRowReader : public dwio::common::RowReader {
public:
NimbleRowReader(
std::unique_ptr<facebook::nimble::VeloxReader> reader,
const std::shared_ptr<common::ScanSpec>& scanSpec)
: reader_(std::move(reader)), scanSpec_(scanSpec) {
reader_->loadStripeIfAny();
}

int64_t nextRowNumber() override {
VELOX_NYI();
}

int64_t nextReadSize(uint64_t /*size*/) override {
VELOX_NYI();
}

uint64_t next(
uint64_t size,
VectorPtr& result,
const dwio::common::Mutation* mutation) override {
TypePtr resultType;
VectorPtr rawResult;
if (result) {
resultType = result->type();
rawResult = std::move(rawVectorForBatchReader(*result));
result.reset();
}
if (!reader_->next(size, rawResult)) {
if (rawResult) {
result = BaseVector::create(resultType, 0, &reader_->memoryPool());
rawVectorForBatchReader(*result) = std::move(rawResult);
}
return 0;
}
auto scanned = rawResult->size();
result = projectColumns(rawResult, *scanSpec_, mutation);
rawVectorForBatchReader(*result) = std::move(rawResult);
return scanned;
}

void updateRuntimeStats(
dwio::common::RuntimeStatistics& /*stats*/) const override {
// No-op for non-selective reader.
}

void resetFilterCaches() override {
// No-op for non-selective reader.
}

std::optional<size_t> estimatedRowSize() const override {
return std::optional(reader_->estimatedRowSize());
}

private:
std::unique_ptr<facebook::nimble::VeloxReader> reader_;
std::shared_ptr<common::ScanSpec> scanSpec_;

static VectorPtr& rawVectorForBatchReader(BaseVector& vector) {
auto* rowVector = vector.as<RowVector>();
VELOX_CHECK_NOT_NULL(rowVector);
return rowVector->rawVectorForBatchReader();
}
};

// dwio::common::Reader implementation backed by a nimble TabletReader.
class NimbleReader : public dwio::common::Reader {
 public:
  // Copies `options`, keeps `readFile` alive, and opens a TabletReader on it
  // using the memory pool from `options` and kPreloadOptionalSections.
  // If the options carry no file schema, a temporary VeloxReader is created
  // and its type is stored as the file schema.
  NimbleReader(
      const dwio::common::ReaderOptions& options,
      const std::shared_ptr<ReadFile>& readFile)
      : options_(options),
        readFile_(readFile),
        tabletReader_(std::make_shared<facebook::nimble::TabletReader>(
            options.getMemoryPool(),
            readFile_.get(),
            kPreloadOptionalSections)) {
    if (options_.getFileSchema()) {
      return;
    }
    facebook::nimble::VeloxReader schemaReader(
        options.getMemoryPool(), tabletReader_);
    options_.setFileSchema(schemaReader.type());
  }

  // Row count reported by the tablet reader.
  std::optional<uint64_t> numberOfRows() const override {
    return tabletReader_->tabletRowCount();
  }

  // Column statistics are not available yet; always returns nullptr.
  std::unique_ptr<dwio::common::ColumnStatistics> columnStatistics(
      uint32_t /*index*/) const override {
    // TODO
    return nullptr;
  }

  // File schema stored in the reader options (set in the constructor when it
  // was not supplied by the caller).
  const RowTypePtr& rowType() const override {
    return options_.getFileSchema();
  }

  // Returns the TypeWithId tree for rowType(), created on first use and
  // cached afterwards.
  // NOTE(review): the cache is a mutable member written without any visible
  // synchronization; confirm this is never called concurrently.
  const std::shared_ptr<const dwio::common::TypeWithId>& typeWithId()
      const override {
    if (typeWithId_) {
      return typeWithId_;
    }
    typeWithId_ = dwio::common::TypeWithId::create(rowType());
    return typeWithId_;
  }

  // Builds a NimbleRowReader over the shared tablet reader.
  //
  // Offset, limit and decoding executor are copied from `options` into the
  // VeloxReadParams. The column selector comes from `options`, or is created
  // over rowType() when none was supplied. populateFeatureSelector() is then
  // applied with the map-column-as-struct setting before the VeloxReader is
  // constructed.
  std::unique_ptr<dwio::common::RowReader> createRowReader(
      const dwio::common::RowReaderOptions& options) const override {
    facebook::nimble::VeloxReadParams readParams;
    readParams.fileRangeStartOffset = options.getOffset();
    readParams.fileRangeEndOffset = options.getLimit();
    readParams.decodingExecutor = options.getDecodingExecutor();

    auto columnSelector = options.getSelector();
    if (!columnSelector) {
      columnSelector =
          std::make_shared<dwio::common::ColumnSelector>(rowType());
    }
    facebook::dwio::api::populateFeatureSelector(
        *columnSelector, options.getMapColumnIdAsStruct(), readParams);

    auto veloxReader = std::make_unique<facebook::nimble::VeloxReader>(
        options_.getMemoryPool(),
        tabletReader_,
        std::move(columnSelector),
        std::move(readParams));
    return std::make_unique<NimbleRowReader>(
        std::move(veloxReader), options.getScanSpec());
  }

 private:
  // Declaration order matters: tabletReader_ is initialized from readFile_.
  dwio::common::ReaderOptions options_;
  std::shared_ptr<ReadFile> readFile_;
  std::shared_ptr<facebook::nimble::TabletReader> tabletReader_;
  mutable std::shared_ptr<const dwio::common::TypeWithId> typeWithId_;
};

} // namespace

// Creates a NimbleReader for the file behind `input`, using `options`.
// Only the read file is taken from `input`; the BufferedInput itself is
// released when this function returns.
std::unique_ptr<dwio::common::Reader> NimbleReaderFactory::createReader(
    std::unique_ptr<dwio::common::BufferedInput> input,
    const dwio::common::ReaderOptions& options) {
  const auto& readFile = input->getReadFile();
  return std::make_unique<NimbleReader>(options, readFile);
}

// Registers a NimbleReaderFactory instance with dwio::common.
void registerNimbleReaderFactory() {
  auto factory = std::make_shared<NimbleReaderFactory>();
  dwio::common::registerReaderFactory(std::move(factory));
}

// Removes the reader factory registered for FileFormat::NIMBLE from
// dwio::common.
void unregisterNimbleReaderFactory() {
dwio::common::unregisterReaderFactory(dwio::common::FileFormat::NIMBLE);
}

} // namespace facebook::velox::nimble
Loading

0 comments on commit 23acbac

Please sign in to comment.