Skip to content

Commit

Permalink
[GLUTEN-3378][VL] Feat: Support read iceberg mor table for Velox back…
Browse files Browse the repository at this point in the history
…end (apache#4779)

Velox add iceberg mor table read support in facebookincubator/velox#7847. This PR supports read iceberg mor table for Velox backend.
  • Loading branch information
liujiayi771 authored and taiyang-li committed Oct 8, 2024
1 parent 50f30d4 commit 7624212
Show file tree
Hide file tree
Showing 13 changed files with 444 additions and 70 deletions.
1 change: 1 addition & 0 deletions cpp/velox/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ set(VELOX_SRCS
compute/VeloxRuntime.cc
compute/WholeStageResultIterator.cc
compute/VeloxPlanConverter.cc
compute/iceberg/IcebergPlanConverter.cc
jni/VeloxJniWrapper.cc
jni/JniFileSystem.cc
jni/JniUdf.cc
Expand Down
4 changes: 4 additions & 0 deletions cpp/velox/compute/VeloxPlanConverter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#include "compute/ResultIterator.h"
#include "config/GlutenConfig.h"
#include "iceberg/IcebergPlanConverter.h"
#include "operators/plannodes/RowVectorStream.h"
#include "velox/common/file/FileSystems.h"

Expand Down Expand Up @@ -93,6 +94,9 @@ std::shared_ptr<SplitInfo> parseScanSplitInfo(
case SubstraitFileFormatCase::kText:
splitInfo->format = dwio::common::FileFormat::TEXT;
break;
case SubstraitFileFormatCase::kIceberg:
splitInfo = IcebergPlanConverter::parseIcebergSplitInfo(file, std::move(splitInfo));
break;
default:
splitInfo->format = dwio::common::FileFormat::UNKNOWN;
break;
Expand Down
44 changes: 31 additions & 13 deletions cpp/velox/compute/WholeStageResultIterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -154,19 +154,37 @@ WholeStageResultIterator::WholeStageResultIterator(
auto metadataColumn = metadataColumns[idx];
std::unordered_map<std::string, std::optional<std::string>> partitionKeys;
constructPartitionColumns(partitionKeys, partitionColumn);
auto split = std::make_shared<velox::connector::hive::HiveConnectorSplit>(
kHiveConnectorId,
paths[idx],
format,
starts[idx],
lengths[idx],
partitionKeys,
std::nullopt,
std::unordered_map<std::string, std::string>(),
nullptr,
std::unordered_map<std::string, std::string>(),
0,
metadataColumn);
std::shared_ptr<velox::connector::ConnectorSplit> split;
if (auto icebergSplitInfo = std::dynamic_pointer_cast<IcebergSplitInfo>(scanInfo)) {
// Set Iceberg split.
std::unordered_map<std::string, std::string> customSplitInfo{{"table_format", "hive-iceberg"}};
auto deleteFiles = icebergSplitInfo->deleteFilesVec[idx];
split = std::make_shared<velox::connector::hive::iceberg::HiveIcebergSplit>(
kHiveConnectorId,
paths[idx],
format,
starts[idx],
lengths[idx],
partitionKeys,
std::nullopt,
customSplitInfo,
nullptr,
deleteFiles);
} else {
split = std::make_shared<velox::connector::hive::HiveConnectorSplit>(
kHiveConnectorId,
paths[idx],
format,
starts[idx],
lengths[idx],
partitionKeys,
std::nullopt,
std::unordered_map<std::string, std::string>(),
nullptr,
std::unordered_map<std::string, std::string>(),
0,
metadataColumn);
}
connectorSplits.emplace_back(split);
}

Expand Down
2 changes: 2 additions & 0 deletions cpp/velox/compute/WholeStageResultIterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@
#pragma once

#include "compute/Runtime.h"
#include "iceberg/IcebergPlanConverter.h"
#include "memory/ColumnarBatchIterator.h"
#include "memory/VeloxColumnarBatch.h"
#include "substrait/SubstraitToVeloxPlan.h"
#include "substrait/plan.pb.h"
#include "utils/metrics.h"
#include "velox/connectors/hive/iceberg/IcebergSplit.h"
#include "velox/core/Config.h"
#include "velox/core/PlanNode.h"
#include "velox/exec/Task.h"
Expand Down
84 changes: 84 additions & 0 deletions cpp/velox/compute/iceberg/IcebergPlanConverter.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "IcebergPlanConverter.h"

namespace gluten {

std::shared_ptr<IcebergSplitInfo> IcebergPlanConverter::parseIcebergSplitInfo(
substrait::ReadRel_LocalFiles_FileOrFiles file,
std::shared_ptr<SplitInfo> splitInfo) {
using SubstraitFileFormatCase = ::substrait::ReadRel_LocalFiles_FileOrFiles::IcebergReadOptions::FileFormatCase;
using SubstraitDeleteFileFormatCase =
::substrait::ReadRel_LocalFiles_FileOrFiles::IcebergReadOptions::DeleteFile::FileFormatCase;
auto icebergSplitInfo = std::dynamic_pointer_cast<IcebergSplitInfo>(splitInfo)
? std::dynamic_pointer_cast<IcebergSplitInfo>(splitInfo)
: std::make_shared<IcebergSplitInfo>(*splitInfo);
auto icebergReadOption = file.iceberg();
switch (icebergReadOption.file_format_case()) {
case SubstraitFileFormatCase::kParquet:
icebergSplitInfo->format = dwio::common::FileFormat::PARQUET;
break;
case SubstraitFileFormatCase::kOrc:
icebergSplitInfo->format = dwio::common::FileFormat::ORC;
break;
default:
icebergSplitInfo->format = dwio::common::FileFormat::UNKNOWN;
break;
}
if (icebergReadOption.delete_files_size() > 0) {
auto deleteFiles = icebergReadOption.delete_files();
std::vector<IcebergDeleteFile> deletes;
deletes.reserve(icebergReadOption.delete_files_size());
for (auto i = 0; i < icebergReadOption.delete_files_size(); i++) {
auto deleteFile = icebergReadOption.delete_files().Get(i);
dwio::common::FileFormat format;
FileContent fileContent;
switch (deleteFile.file_format_case()) {
case SubstraitDeleteFileFormatCase::kParquet:
format = dwio::common::FileFormat::PARQUET;
break;
case SubstraitDeleteFileFormatCase::kOrc:
format = dwio::common::FileFormat::ORC;
break;
default:
format = dwio::common::FileFormat::UNKNOWN;
}
switch (deleteFile.filecontent()) {
case ::substrait::ReadRel_LocalFiles_FileOrFiles_IcebergReadOptions_FileContent_POSITION_DELETES:
fileContent = FileContent::kPositionalDeletes;
break;
case ::substrait::ReadRel_LocalFiles_FileOrFiles_IcebergReadOptions_FileContent_EQUALITY_DELETES:
fileContent = FileContent::kEqualityDeletes;
break;
default:
fileContent = FileContent::kData;
break;
}
deletes.emplace_back(IcebergDeleteFile(
fileContent, deleteFile.filepath(), format, deleteFile.recordcount(), deleteFile.filesize()));
}
icebergSplitInfo->deleteFilesVec.emplace_back(deletes);
} else {
// Add an empty delete files vector to indicate that this data file has no delete file.
icebergSplitInfo->deleteFilesVec.emplace_back(std::vector<IcebergDeleteFile>{});
}

return icebergSplitInfo;
}

} // namespace gluten
42 changes: 42 additions & 0 deletions cpp/velox/compute/iceberg/IcebergPlanConverter.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "substrait/SubstraitToVeloxPlan.h"
#include "velox/connectors/hive/iceberg/IcebergDeleteFile.h"

using namespace facebook::velox::connector::hive::iceberg;

namespace gluten {
struct IcebergSplitInfo : SplitInfo {
std::vector<std::vector<IcebergDeleteFile>> deleteFilesVec;

IcebergSplitInfo(const SplitInfo& splitInfo) : SplitInfo(splitInfo) {
// Reserve the actual size of the deleteFilesVec.
deleteFilesVec.reserve(splitInfo.paths.capacity());
}
};

class IcebergPlanConverter {
public:
static std::shared_ptr<IcebergSplitInfo> parseIcebergSplitInfo(
substrait::ReadRel_LocalFiles_FileOrFiles file,
std::shared_ptr<SplitInfo> splitInfo);
};

} // namespace gluten
3 changes: 3 additions & 0 deletions cpp/velox/substrait/SubstraitToVeloxPlan.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ struct SplitInfo {

/// The file format of the files to be scanned.
dwio::common::FileFormat format;

/// Make SplitInfo polymorphic
virtual ~SplitInfo() = default;
};

/// This class is used to convert the Substrait plan into Velox plan.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public enum ReadFileFormat {
UnknownFormat()
}

private ReadFileFormat fileFormat = ReadFileFormat.UnknownFormat;
protected ReadFileFormat fileFormat = ReadFileFormat.UnknownFormat;
private Boolean iterAsInput = false;
private StructType fileSchema;
private Map<String, String> fileReadProperties;
Expand Down Expand Up @@ -112,6 +112,13 @@ public List<String> preferredLocations() {
return this.preferredLocations;
}

/**
* Data Lake formats require some additional processing to be done on the FileBuilder, such as
* inserting delete files information. Different lake formats should override this method to
* implement their corresponding logic.
*/
protected void processFileBuilder(ReadRel.LocalFiles.FileOrFiles.Builder fileBuilder) {}

public ReadRel.LocalFiles toProtobuf() {
ReadRel.LocalFiles.Builder localFilesBuilder = ReadRel.LocalFiles.newBuilder();
// The input is iterator, and the path is in the format of: Iterator:index.
Expand Down Expand Up @@ -210,6 +217,7 @@ public ReadRel.LocalFiles toProtobuf() {
default:
break;
}
processFileBuilder(fileBuilder);
localFilesBuilder.addItems(fileBuilder.build());
}
return localFilesBuilder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,28 @@ message ReadRel {
uint64 max_block_size = 1;
NamedStruct schema = 2 [deprecated=true];
}
message IcebergReadOptions {
enum FileContent {
DATA = 0;
POSITION_DELETES = 1;
EQUALITY_DELETES = 2;
}
message DeleteFile {
FileContent fileContent = 1;
string filePath = 2;
uint64 fileSize = 3;
uint64 recordCount = 4;
oneof file_format {
ParquetReadOptions parquet = 5;
OrcReadOptions orc = 6;
}
}
oneof file_format {
ParquetReadOptions parquet = 1;
OrcReadOptions orc = 2;
}
repeated DeleteFile delete_files = 3;
}

// File reading options
oneof file_format {
Expand All @@ -159,22 +181,23 @@ message ReadRel {
DwrfReadOptions dwrf = 13;
TextReadOptions text = 14;
JsonReadOptions json = 15;
IcebergReadOptions iceberg = 16;
}

message partitionColumn {
string key = 1;
string value = 2;
}
repeated partitionColumn partition_columns = 16;
repeated partitionColumn partition_columns = 17;

/// File schema
NamedStruct schema = 17;
NamedStruct schema = 18;

message metadataColumn {
string key = 1;
string value = 2;
}
repeated metadataColumn metadata_columns = 18;
repeated metadataColumn metadata_columns = 19;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,29 @@
*/
package io.glutenproject.substrait.rel;

import org.apache.iceberg.DeleteFile;

import java.util.List;
import java.util.Map;

public class IcebergLocalFilesBuilder {

// TODO: Add makeIcebergLocalFiles for MOR iceberg table

public static IcebergLocalFilesNode makeIcebergLocalFiles(
Integer index,
List<String> paths,
List<Long> starts,
List<Long> lengths,
List<Map<String, String>> partitionColumns,
LocalFilesNode.ReadFileFormat fileFormat,
List<String> preferredLocations) {
List<String> preferredLocations,
Map<String, List<DeleteFile>> deleteFilesMap) {
return new IcebergLocalFilesNode(
index, paths, starts, lengths, partitionColumns, fileFormat, preferredLocations);
index,
paths,
starts,
lengths,
partitionColumns,
fileFormat,
preferredLocations,
deleteFilesMap);
}
}
Loading

0 comments on commit 7624212

Please sign in to comment.