Add schema to parquet writer options (facebookincubator#6074)

Summary:
Here is the schema of the TPC-H `supplier` table:
```
root
 |-- s_suppkey: long (nullable = true)
 |-- s_name: string (nullable = true)
 |-- s_address: string (nullable = true)
 |-- s_nationkey: long (nullable = true)
 |-- s_phone: string (nullable = true)
 |-- s_acctbal: double (nullable = true)
 |-- s_comment: string (nullable = true)
```

When we write the `supplier` table to a Parquet file using the Velox Parquet writer in Gluten, the resulting file has an incorrect schema:

```
root
 |-- n0_0: long (nullable = true)
 |-- n0_1: string (nullable = true)
 |-- n0_2: string (nullable = true)
 |-- n0_3: long (nullable = true)
 |-- n0_4: string (nullable = true)
 |-- n0_5: double (nullable = true)
 |-- n0_6: string (nullable = true)
```

The reason is that the writer infers the schema from the record batch, which produces the generated column names shown above. To address this, this PR adds a schema to the Parquet `WriterOptions`. This allows us to pass the appropriate schema to the Parquet writer.
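
With this change, the file schema is supplied explicitly when constructing the writer. Below is a minimal usage sketch (the `sink`, `rootPool`, and `batch` objects are assumed to be set up elsewhere, as in the tests in this PR):

```cpp
// Declare the file schema explicitly; without it, the writer would infer
// placeholder names (n0_0, n0_1, ...) from the first record batch.
auto rowType = ROW(
    {"s_suppkey", "s_name", "s_address", "s_nationkey", "s_phone",
     "s_acctbal", "s_comment"},
    {BIGINT(), VARCHAR(), VARCHAR(), BIGINT(), VARCHAR(), DOUBLE(),
     VARCHAR()});

facebook::velox::parquet::WriterOptions options;
options.memoryPool = rootPool.get();

// The schema is now passed alongside the options.
auto writer = std::make_unique<facebook::velox::parquet::Writer>(
    std::move(sink), options, rowType);
writer->write(batch); // The batch type must be equivalent to rowType.
writer->close();
```

The writer validates that the supplied schema has non-empty, unique field names, and that the type of each written batch is equivalent to it.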

Pull Request resolved: facebookincubator#6074

Reviewed By: xiaoxmeng

Differential Revision: D52153693

Pulled By: mbasmanova

fbshipit-source-id: d4bb340ebe5b1ad0f6460ccc1b14b5d8e21ab885
JkSelf authored and facebook-github-bot committed Dec 19, 2023
1 parent 050c16c commit 2ada2ec
Showing 6 changed files with 190 additions and 34 deletions.
3 changes: 2 additions & 1 deletion velox/dwio/parquet/tests/ParquetTestBase.h
```diff
@@ -162,14 +162,15 @@ class ParquetTestBase : public testing::Test, public test::VectorTestBase {
       std::function<
           std::unique_ptr<facebook::velox::parquet::DefaultFlushPolicy>()>
           flushPolicy,
+      const RowTypePtr& rowType,
       facebook::velox::common::CompressionKind compressionKind =
           facebook::velox::common::CompressionKind_NONE) {
     facebook::velox::parquet::WriterOptions options;
     options.memoryPool = rootPool_.get();
     options.flushPolicyFactory = flushPolicy;
     options.compression = compressionKind;
     return std::make_unique<facebook::velox::parquet::Writer>(
-        std::move(sink), options);
+        std::move(sink), options, rowType);
   }
 
   std::vector<RowVectorPtr> createBatches(
```
63 changes: 56 additions & 7 deletions velox/dwio/parquet/tests/reader/E2EFilterTest.cpp
```diff
@@ -17,6 +17,7 @@
 #include "velox/dwio/common/tests/E2EFilterTestBase.h"
 #include "velox/dwio/parquet/reader/ParquetReader.h"
 #include "velox/dwio/parquet/writer/Writer.h"
+#include "velox/vector/tests/utils/VectorTestBase.h"
 
 #include <folly/init/Init.h>
 
@@ -27,7 +28,7 @@ using namespace facebook::velox::parquet;
 
 using dwio::common::MemorySink;
 
-class E2EFilterTest : public E2EFilterTestBase {
+class E2EFilterTest : public E2EFilterTestBase, public test::VectorTestBase {
  protected:
   void SetUp() override {
     E2EFilterTestBase::SetUp();
@@ -53,13 +54,13 @@ class E2EFilterTest : public E2EFilterTestBase {
   }
 
   void writeToMemory(
-      const TypePtr&,
+      const TypePtr& type,
       const std::vector<RowVectorPtr>& batches,
       bool forRowGroupSkip = false) override {
     auto sink = std::make_unique<MemorySink>(
         200 * 1024 * 1024, FileSink::Options{.pool = leafPool_.get()});
     sinkPtr_ = sink.get();
-    options_.memoryPool = rootPool_.get();
+    options_.memoryPool = E2EFilterTestBase::rootPool_.get();
     int32_t flushCounter = 0;
     options_.flushPolicyFactory = [&]() {
       return std::make_unique<LambdaFlushPolicy>(
@@ -71,7 +72,7 @@ class E2EFilterTest : public E2EFilterTestBase {
     };
 
     writer_ = std::make_unique<facebook::velox::parquet::Writer>(
-        std::move(sink), options_);
+        std::move(sink), options_, asRowType(type));
     for (auto& batch : batches) {
       writer_->write(batch);
     }
@@ -92,7 +93,7 @@ class E2EFilterTest : public E2EFilterTestBase {
 };
 
 TEST_F(E2EFilterTest, writerMagic) {
-  rowType_ = ROW({INTEGER()});
+  rowType_ = ROW({"c0"}, {INTEGER()});
   std::vector<RowVectorPtr> batches;
   batches.push_back(std::static_pointer_cast<RowVector>(
       test::BatchMaker::createBatch(rowType_, 20000, *leafPool_, nullptr, 0)));
@@ -561,7 +562,7 @@ TEST_F(E2EFilterTest, varbinaryDictionary) {
 TEST_F(E2EFilterTest, largeMetadata) {
   rowsInRowGroup_ = 1;
 
-  rowType_ = ROW({INTEGER()});
+  rowType_ = ROW({"c0"}, {INTEGER()});
   std::vector<RowVectorPtr> batches;
   batches.push_back(std::static_pointer_cast<RowVector>(
       test::BatchMaker::createBatch(rowType_, 1000, *leafPool_, nullptr, 0)));
@@ -598,7 +599,7 @@ TEST_F(E2EFilterTest, date) {
 
 TEST_F(E2EFilterTest, combineRowGroup) {
   rowsInRowGroup_ = 5;
-  rowType_ = ROW({INTEGER()});
+  rowType_ = ROW({"c0"}, {INTEGER()});
   std::vector<RowVectorPtr> batches;
   for (int i = 0; i < 5; i++) {
     batches.push_back(std::static_pointer_cast<RowVector>(
@@ -615,6 +616,54 @@ TEST_F(E2EFilterTest, combineRowGroup) {
   EXPECT_EQ(parquetReader.numberOfRows(), 5);
 }
 
+TEST_F(E2EFilterTest, configurableWriteSchema) {
+  auto test = [&](auto& type, auto& newType) {
+    std::vector<RowVectorPtr> batches;
+    for (auto i = 0; i < 5; i++) {
+      auto vector = BaseVector::create(type, 100, pool());
+      auto rowVector = std::dynamic_pointer_cast<RowVector>(vector);
+      batches.push_back(rowVector);
+    }
+
+    writeToMemory(newType, batches, false);
+    std::string_view data(sinkPtr_->data(), sinkPtr_->size());
+    dwio::common::ReaderOptions readerOpts{leafPool_.get()};
+    auto input = std::make_unique<BufferedInput>(
+        std::make_shared<InMemoryReadFile>(data), readerOpts.getMemoryPool());
+    auto reader = makeReader(readerOpts, std::move(input));
+    auto parquetReader = dynamic_cast<ParquetReader&>(*reader.get());
+
+    EXPECT_EQ(parquetReader.rowType()->toString(), newType->toString());
+  };
+
+  // ROW(ROW(ROW))
+  auto type =
+      ROW({"a", "b"}, {INTEGER(), ROW({"c"}, {ROW({"d"}, {INTEGER()})})});
+  auto newType =
+      ROW({"aa", "bb"}, {INTEGER(), ROW({"cc"}, {ROW({"dd"}, {INTEGER()})})});
+  test(type, newType);
+
+  // ARRAY(ROW)
+  type =
+      ROW({"a", "b"}, {ARRAY(ROW({"c", "d"}, {BIGINT(), BIGINT()})), BIGINT()});
+  newType = ROW(
+      {"aa", "bb"}, {ARRAY(ROW({"cc", "dd"}, {BIGINT(), BIGINT()})), BIGINT()});
+  test(type, newType);
+
+  // MAP(ROW)
+  type =
+      ROW({"a", "b"},
+          {MAP(ROW({"c", "d"}, {BIGINT(), BIGINT()}),
+               ROW({"e", "f"}, {BIGINT(), BIGINT()})),
+           BIGINT()});
+  newType =
+      ROW({"aa", "bb"},
+          {MAP(ROW({"cc", "dd"}, {BIGINT(), BIGINT()}),
+               ROW({"ee", "ff"}, {BIGINT(), BIGINT()})),
+           BIGINT()});
+  test(type, newType);
+}
+
 // Define main so that gflags get processed.
 int main(int argc, char** argv) {
   testing::InitGoogleTest(&argc, argv);
```
8 changes: 5 additions & 3 deletions velox/dwio/parquet/tests/reader/ParquetReaderBenchmark.cpp
```diff
@@ -39,7 +39,9 @@ const double kFilterErrorMargin = 0.2;
 
 class ParquetReaderBenchmark {
  public:
-  explicit ParquetReaderBenchmark(bool disableDictionary)
+  explicit ParquetReaderBenchmark(
+      bool disableDictionary,
+      const RowTypePtr& rowType)
       : disableDictionary_(disableDictionary) {
     rootPool_ = memory::MemoryManager::getInstance()->addRootPool(
         "ParquetReaderBenchmark");
@@ -56,7 +58,7 @@ class ParquetReaderBenchmark {
     }
     options.memoryPool = rootPool_.get();
     writer_ = std::make_unique<facebook::velox::parquet::Writer>(
-        std::move(sink), options);
+        std::move(sink), options, rowType);
   }
 
   ~ParquetReaderBenchmark() {}
@@ -272,7 +274,7 @@ void run(
     uint8_t nullsRateX100,
     uint32_t nextSize,
     bool disableDictionary) {
-  ParquetReaderBenchmark benchmark(disableDictionary);
+  ParquetReaderBenchmark benchmark(disableDictionary, asRowType(type));
   BIGINT()->toString();
   benchmark.readSingleColumn(
       columnName, type, 0, filterRateX100, nullsRateX100, nextSize);
```
28 changes: 18 additions & 10 deletions velox/dwio/parquet/tests/writer/SinkTests.cpp
```diff
@@ -23,14 +23,18 @@ using namespace facebook::velox::parquet;
 class SinkTest : public ParquetTestBase {};
 
 TEST_F(SinkTest, close) {
-  auto batches = createBatches(ROW({INTEGER(), VARCHAR()}), 2, 3);
+  auto rowType = ROW({"c0", "c1"}, {INTEGER(), VARCHAR()});
+  auto batches = createBatches(rowType, 2, 3);
   auto filePath = fs::path(fmt::format("{}/test_close.txt", tempPath_->path));
   auto sink = createSink(filePath.string());
   auto sinkPtr = sink.get();
-  auto writer = createWriter(std::move(sink), [&]() {
-    return std::make_unique<LambdaFlushPolicy>(
-        kRowsInRowGroup, kBytesInRowGroup, [&]() { return false; });
-  });
+  auto writer = createWriter(
+      std::move(sink),
+      [&]() {
+        return std::make_unique<LambdaFlushPolicy>(
+            kRowsInRowGroup, kBytesInRowGroup, [&]() { return false; });
+      },
+      rowType);
 
   for (auto& batch : batches) {
     writer->write(batch);
@@ -49,14 +53,18 @@ TEST_F(SinkTest, close) {
 }
 
 TEST_F(SinkTest, abort) {
-  auto batches = createBatches(ROW({INTEGER(), VARCHAR()}), 2, 3);
+  auto rowType = ROW({"c0", "c1"}, {INTEGER(), VARCHAR()});
+  auto batches = createBatches(rowType, 2, 3);
   auto filePath = fs::path(fmt::format("{}/test_abort.txt", tempPath_->path));
   auto sink = createSink(filePath.string());
   auto sinkPtr = sink.get();
-  auto writer = createWriter(std::move(sink), [&]() {
-    return std::make_unique<LambdaFlushPolicy>(
-        kRowsInRowGroup, kBytesInRowGroup, [&]() { return false; });
-  });
+  auto writer = createWriter(
+      std::move(sink),
+      [&]() {
+        return std::make_unique<LambdaFlushPolicy>(
+            kRowsInRowGroup, kBytesInRowGroup, [&]() { return false; });
+      },
+      rowType);
 
   for (auto& batch : batches) {
     writer->write(batch);
```
111 changes: 101 additions & 10 deletions velox/dwio/parquet/writer/Writer.cpp
```diff
@@ -14,15 +14,13 @@
  * limitations under the License.
  */
 
-#include "velox/vector/arrow/Bridge.h"
-
+#include "velox/dwio/parquet/writer/Writer.h"
 #include <arrow/c/bridge.h>
 #include <arrow/io/interfaces.h>
 #include <arrow/table.h>
-
-#include "velox/dwio/parquet/writer/Writer.h"
 #include "velox/dwio/parquet/writer/arrow/Properties.h"
 #include "velox/dwio/parquet/writer/arrow/Writer.h"
+#include "velox/vector/arrow/Bridge.h"
 
 namespace facebook::velox::parquet {
 
@@ -122,6 +120,8 @@ Compression::type getArrowParquetCompression(
   }
 }
 
+namespace {
+
 std::shared_ptr<WriterProperties> getArrowParquetWriterOptions(
     const parquet::WriterOptions& options,
     const std::unique_ptr<DefaultFlushPolicy>& flushPolicy) {
@@ -139,17 +139,89 @@
   return properties->build();
 }
 
+void validateSchemaRecursive(const RowTypePtr& schema) {
+  // Check that the schema's field names are not empty and are unique.
+  VELOX_USER_CHECK_NOT_NULL(schema, "Field schema must not be empty.");
+  const auto& fieldNames = schema->names();
+
+  folly::F14FastSet<std::string> uniqueNames;
+  for (const auto& name : fieldNames) {
+    VELOX_USER_CHECK(!name.empty(), "Field name must not be empty.");
+    auto result = uniqueNames.insert(name);
+    VELOX_USER_CHECK(
+        result.second,
+        "File schema should not have duplicate columns: {}",
+        name);
+  }
+
+  for (auto i = 0; i < schema->size(); ++i) {
+    if (auto childSchema =
+            std::dynamic_pointer_cast<const RowType>(schema->childAt(i))) {
+      validateSchemaRecursive(childSchema);
+    }
+  }
+}
+
+std::shared_ptr<::arrow::Field> updateFieldNameRecursive(
+    const std::shared_ptr<::arrow::Field>& field,
+    const Type& type,
+    const std::string& name = "") {
+  if (type.isRow()) {
+    auto rowType = type.asRow();
+    auto newField = field->WithName(name);
+    auto structType =
+        std::dynamic_pointer_cast<::arrow::StructType>(newField->type());
+    auto childrenSize = rowType.size();
+    std::vector<std::shared_ptr<::arrow::Field>> newFields;
+    newFields.reserve(childrenSize);
+    for (auto i = 0; i < childrenSize; i++) {
+      newFields.push_back(updateFieldNameRecursive(
+          structType->fields()[i], *rowType.childAt(i), rowType.nameOf(i)));
+    }
+    return newField->WithType(::arrow::struct_(newFields));
+  } else if (type.isArray()) {
+    auto newField = field->WithName(name);
+    auto listType =
+        std::dynamic_pointer_cast<::arrow::BaseListType>(newField->type());
+    auto elementType = type.asArray().elementType();
+    auto elementField = listType->value_field();
+    return newField->WithType(
+        ::arrow::list(updateFieldNameRecursive(elementField, *elementType)));
+  } else if (type.isMap()) {
+    auto mapType = type.asMap();
+    auto newField = field->WithName(name);
+    auto arrowMapType =
+        std::dynamic_pointer_cast<::arrow::MapType>(newField->type());
+    auto newKeyField =
+        updateFieldNameRecursive(arrowMapType->key_field(), *mapType.keyType());
+    auto newValueField = updateFieldNameRecursive(
+        arrowMapType->item_field(), *mapType.valueType());
+    return newField->WithType(
+        ::arrow::map(newKeyField->type(), newValueField->type()));
+  } else if (name != "") {
+    return field->WithName(name);
+  } else {
+    return field;
+  }
+}
+
+} // namespace
+
 Writer::Writer(
     std::unique_ptr<dwio::common::FileSink> sink,
     const WriterOptions& options,
-    std::shared_ptr<memory::MemoryPool> pool)
+    std::shared_ptr<memory::MemoryPool> pool,
+    RowTypePtr schema)
     : pool_(std::move(pool)),
       generalPool_{pool_->addLeafChild(".general")},
       stream_(std::make_shared<ArrowDataBufferSink>(
           std::move(sink),
           *generalPool_,
           options.bufferGrowRatio)),
-      arrowContext_(std::make_shared<ArrowContext>()) {
+      arrowContext_(std::make_shared<ArrowContext>()),
+      schema_(std::move(schema)) {
+  validateSchemaRecursive(schema_);
+
   if (options.flushPolicyFactory) {
     flushPolicy_ = options.flushPolicyFactory();
   } else {
@@ -161,13 +233,15 @@
 
 Writer::Writer(
     std::unique_ptr<dwio::common::FileSink> sink,
-    const WriterOptions& options)
+    const WriterOptions& options,
+    RowTypePtr schema)
     : Writer{
           std::move(sink),
           options,
           options.memoryPool->addAggregateChild(fmt::format(
               "writer_node_{}",
-              folly::to<std::string>(folly::Random::rand64())))} {}
+              folly::to<std::string>(folly::Random::rand64()))),
+          std::move(schema)} {}
 
 void Writer::flush() {
   if (arrowContext_->stagingRows > 0) {
@@ -224,13 +298,29 @@ dwio::common::StripeProgress getStripeProgress(
  * This method assumes each input `ColumnarBatch` have same schema.
  */
 void Writer::write(const VectorPtr& data) {
+  VELOX_USER_CHECK(
+      data->type()->equivalent(*schema_),
+      "The file schema type should be equal with the input rowvector type.");
+
   ArrowOptions options{.flattenDictionary = true, .flattenConstant = true};
   ArrowArray array;
   ArrowSchema schema;
   exportToArrow(data, array, generalPool_.get(), options);
   exportToArrow(data, schema, options);
+
+  // Convert the Arrow schema and then update the column names based on
+  // schema_.
+  auto arrowSchema = ::arrow::ImportSchema(&schema).ValueOrDie();
+  std::vector<std::shared_ptr<::arrow::Field>> newFields;
+  auto childSize = schema_->size();
+  for (auto i = 0; i < childSize; i++) {
+    newFields.push_back(updateFieldNameRecursive(
+        arrowSchema->fields()[i], *schema_->childAt(i), schema_->nameOf(i)));
+  }
+
   PARQUET_ASSIGN_OR_THROW(
-      auto recordBatch, ::arrow::ImportRecordBatch(&array, &schema));
+      auto recordBatch,
+      ::arrow::ImportRecordBatch(&array, ::arrow::schema(newFields)));
   if (!arrowContext_->schema) {
     arrowContext_->schema = recordBatch->schema();
     for (int colIdx = 0; colIdx < arrowContext_->schema->num_fields();
@@ -295,7 +385,8 @@ std::unique_ptr<dwio::common::Writer> ParquetWriterFactory::createWriter(
     std::unique_ptr<dwio::common::FileSink> sink,
     const dwio::common::WriterOptions& options) {
   auto parquetOptions = getParquetOptions(options);
-  return std::make_unique<Writer>(std::move(sink), parquetOptions);
+  return std::make_unique<Writer>(
+      std::move(sink), parquetOptions, asRowType(options.schema));
 }
 
 } // namespace facebook::velox::parquet
```