[Refactor] refactor deltalake getRemoteFiles (StarRocks#51655)
Signed-off-by: yanz <[email protected]>
dirtysalt authored Oct 10, 2024
1 parent 48bedcc commit fe227d6
Showing 6 changed files with 155 additions and 70 deletions.
33 changes: 17 additions & 16 deletions fe/fe-core/src/main/java/com/starrocks/catalog/DeltaLakeTable.java
@@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.


package com.starrocks.catalog;

import com.google.common.base.Preconditions;
@@ -47,10 +46,8 @@ public class DeltaLakeTable extends Table {
private String tableLocation;
private Engine deltaEngine;


public static final String PARTITION_NULL_VALUE = "null";


public DeltaLakeTable() {
super(TableType.DELTALAKE);
}
@@ -126,10 +123,25 @@ public boolean isUnPartitioned() {
return partColumnNames.size() == 0;
}

public THdfsPartition toHdfsPartition(DescriptorTable.ReferencedPartitionInfo info) {
Metadata deltaMetadata = getDeltaMetadata();
PartitionKey key = info.getKey();
THdfsPartition tPartition = new THdfsPartition();
tPartition.setFile_format(DeltaUtils.getRemoteFileFormat(deltaMetadata.getFormat().getProvider()).toThrift());

List<LiteralExpr> keys = key.getKeys();
tPartition.setPartition_key_exprs(keys.stream().map(Expr::treeToThrift).collect(Collectors.toList()));

THdfsPartitionLocation tPartitionLocation = new THdfsPartitionLocation();
tPartitionLocation.setPrefix_index(-1);
tPartitionLocation.setSuffix(info.getPath());
tPartition.setLocation(tPartitionLocation);
return tPartition;
}

@Override
public TTableDescriptor toThrift(List<DescriptorTable.ReferencedPartitionInfo> partitions) {
Preconditions.checkNotNull(partitions);
Metadata deltaMetadata = getDeltaMetadata();

TDeltaLakeTable tDeltaLakeTable = new TDeltaLakeTable();
tDeltaLakeTable.setLocation(getTableLocation());
@@ -155,19 +167,8 @@ public TTableDescriptor toThrift(List<DescriptorTable.ReferencedPartitionInfo> p
}

for (DescriptorTable.ReferencedPartitionInfo info : partitions) {
PartitionKey key = info.getKey();
long partitionId = info.getId();

THdfsPartition tPartition = new THdfsPartition();
tPartition.setFile_format(DeltaUtils.getRemoteFileFormat(deltaMetadata.getFormat().getProvider()).toThrift());

List<LiteralExpr> keys = key.getKeys();
tPartition.setPartition_key_exprs(keys.stream().map(Expr::treeToThrift).collect(Collectors.toList()));

THdfsPartitionLocation tPartitionLocation = new THdfsPartitionLocation();
tPartitionLocation.setPrefix_index(-1);
tPartitionLocation.setSuffix(info.getPath());
tPartition.setLocation(tPartitionLocation);
THdfsPartition tPartition = toHdfsPartition(info);
tDeltaLakeTable.putToPartitions(partitionId, tPartition);
}

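For context, the new toHdfsPartition helper makes the per-partition Thrift conversion reusable outside of toThrift. A minimal sketch of a hypothetical caller — the variable names and the surrounding map are illustrative assumptions, not code from this commit:

Map<Long, THdfsPartition> thriftPartitions = new HashMap<>();
for (DescriptorTable.ReferencedPartitionInfo info : referencedPartitions) {
    // Convert each referenced partition independently via the extracted helper.
    thriftPartitions.put(info.getId(), deltaLakeTable.toHdfsPartition(info));
}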
@@ -21,7 +21,7 @@

public class RemoteFileInfo {
private RemoteFileInputFormat format;
private List<RemoteFileDesc> files = new ArrayList<>();
private List<RemoteFileDesc> files;
private String fullPath;

private Object attachment;
@@ -33,6 +33,7 @@ public RemoteFileInfo(RemoteFileInputFormat format, List<RemoteFileDesc> files,
}

public RemoteFileInfo() {
this.files = new ArrayList<>();
}

public RemoteFileInputFormat getFormat() {
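With the allocation moved out of the field initializer, only the no-arg constructor creates an empty list, and the three-argument constructor keeps the caller's list instead of first building an ArrayList it immediately discards. A small sketch of both paths — the PARQUET constant and the assumption that the third constructor argument is the full path are illustrative guesses, not taken from this diff:

// No-arg path: files starts as an empty, mutable ArrayList.
RemoteFileInfo empty = new RemoteFileInfo();

// Three-arg path: the provided list is stored as-is, with no throwaway allocation.
List<RemoteFileDesc> descs = new ArrayList<>();
RemoteFileInfo populated =
        new RemoteFileInfo(RemoteFileInputFormat.PARQUET, descs, "s3://bucket/warehouse/table");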
@@ -20,8 +20,6 @@
import com.starrocks.catalog.DeltaLakeTable;
import com.starrocks.catalog.PartitionKey;
import com.starrocks.catalog.Table;
import com.starrocks.common.ErrorCode;
import com.starrocks.common.ErrorReport;
import com.starrocks.common.Pair;
import com.starrocks.common.profile.Timer;
import com.starrocks.common.profile.Tracers;
@@ -32,11 +30,11 @@
import com.starrocks.connector.PredicateSearchKey;
import com.starrocks.connector.RemoteFileDesc;
import com.starrocks.connector.RemoteFileInfo;
import com.starrocks.connector.RemoteFileInfoSource;
import com.starrocks.connector.TableVersionRange;
import com.starrocks.connector.exception.StarRocksConnectorException;
import com.starrocks.credential.CloudConfiguration;
import com.starrocks.qe.ConnectContext;
import com.starrocks.sql.common.ErrorType;
import com.starrocks.sql.optimizer.OptimizerContext;
import com.starrocks.sql.optimizer.Utils;
import com.starrocks.sql.optimizer.operator.scalar.ColumnRefOperator;
@@ -46,11 +44,9 @@
import io.delta.kernel.data.Row;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.expressions.Predicate;
import io.delta.kernel.internal.InternalScanFileUtils;
import io.delta.kernel.internal.ScanBuilderImpl;
import io.delta.kernel.internal.ScanImpl;
import io.delta.kernel.internal.SnapshotImpl;
import io.delta.kernel.internal.actions.DeletionVectorDescriptor;
import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.types.StructType;
import io.delta.kernel.utils.CloseableIterator;
@@ -59,6 +55,7 @@

import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -127,7 +124,7 @@ public List<RemoteFileInfo> getRemoteFiles(Table table, GetRemoteFilesParams par

List<FileScanTask> scanTasks = splitTasks.get(key);
if (scanTasks == null) {
throw new StarRocksConnectorException("Missing iceberg split task for table:[{}.{}]. predicate:[{}]",
throw new StarRocksConnectorException("Missing deltalake split task for table:[{}.{}]. predicate:[{}]",
dbName, tableName, params.getPredicate());
}

@@ -137,6 +134,11 @@ public List<RemoteFileInfo> getRemoteFiles(Table table, GetRemoteFilesParams par
return Lists.newArrayList(remoteFileInfo);
}

@Override
public RemoteFileInfoSource getRemoteFilesAsync(Table table, GetRemoteFilesParams params) {
return buildRemoteInfoSource(table, params.getPredicate(), false);
}

@Override
public Statistics getTableStatistics(OptimizerContext session, Table table, Map<ColumnRefOperator, Column> columns,
List<PartitionKey> partitionKeys, ScalarOperator predicate, long limit,
@@ -190,8 +192,8 @@ private void triggerDeltaLakePlanFilesIfNeeded(PredicateSearchKey key, Table tab
}
}

private void collectDeltaLakePlanFiles(PredicateSearchKey key, Table table, ScalarOperator operator,
ConnectContext connectContext, List<String> fieldNames) {
private Iterator<Pair<FileScanTask, DeltaLakeAddFileStatsSerDe>> buildFileScanTaskIterator(
Table table, ScalarOperator operator, boolean enableCollectColumnStats) {
DeltaLakeTable deltaLakeTable = (DeltaLakeTable) table;
Metadata metadata = deltaLakeTable.getDeltaMetadata();
Engine engine = deltaLakeTable.getDeltaEngine();
@@ -207,6 +209,77 @@

ScanBuilderImpl scanBuilder = (ScanBuilderImpl) snapshot.getScanBuilder(engine);
ScanImpl scan = (ScanImpl) scanBuilder.withFilter(engine, deltaLakePredicate).build();
long estimateRowSize = table.getColumns().stream().mapToInt(column -> column.getType().getTypeSize()).sum();
return new Iterator<>() {
CloseableIterator<FilteredColumnarBatch> scanFilesAsBatches;
CloseableIterator<Row> scanFileRows;
boolean hasMore = true;

@Override
public boolean hasNext() {
if (hasMore) {
ensureOpen();
}
return hasMore;
}

@Override
public Pair<FileScanTask, DeltaLakeAddFileStatsSerDe> next() {
ensureOpen();
Row scanFileRow = scanFileRows.next();
Pair<FileScanTask, DeltaLakeAddFileStatsSerDe> pair =
ScanFileUtils.convertFromRowToFileScanTask(enableCollectColumnStats, scanFileRow, estimateRowSize);
return pair;
}

private void ensureOpen() {
try {
if (scanFilesAsBatches == null) {
scanFilesAsBatches = scan.getScanFiles(engine, true);
}
while (scanFileRows == null || !scanFileRows.hasNext()) {
if (!scanFilesAsBatches.hasNext()) {
scanFilesAsBatches.close();
hasMore = false;
break;
}
if (scanFileRows != null) {
scanFileRows.close();
}
FilteredColumnarBatch scanFileBatch = scanFilesAsBatches.next();
scanFileRows = scanFileBatch.getRows();
}
} catch (IOException e) {
LOG.error("Failed to get delta lake scan files", e);
throw new StarRocksConnectorException("Failed to get delta lake scan files", e);
}
}
};
}
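The anonymous Iterator above lazily flattens the kernel's scan-file batches into individual rows, opening the next batch only when the current one is exhausted and closing spent row iterators along the way. The same flattening pattern in isolation, as a self-contained sketch over plain collections (nothing below is from the commit; it only illustrates the control flow):

import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

final class FlatteningIterator<T> implements Iterator<T> {
    private final Iterator<List<T>> batches;
    private Iterator<T> current;

    FlatteningIterator(Iterator<List<T>> batches) {
        this.batches = batches;
    }

    @Override
    public boolean hasNext() {
        // Advance to the next non-empty batch before answering, mirroring ensureOpen() above.
        while ((current == null || !current.hasNext()) && batches.hasNext()) {
            current = batches.next().iterator();
        }
        return current != null && current.hasNext();
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return current.next();
    }
}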

private RemoteFileInfoSource buildRemoteInfoSource(Table table, ScalarOperator operator, boolean enableCollectColumnStats) {
Iterator<Pair<FileScanTask, DeltaLakeAddFileStatsSerDe>> iterator =
buildFileScanTaskIterator(table, operator, enableCollectColumnStats);
return new RemoteFileInfoSource() {
@Override
public RemoteFileInfo getOutput() {
Pair<FileScanTask, DeltaLakeAddFileStatsSerDe> pair = iterator.next();
return new DeltaRemoteFileInfo(pair.first);
}

@Override
public boolean hasMoreOutput() {
return iterator.hasNext();
}
};
}

private void collectDeltaLakePlanFiles(PredicateSearchKey key, Table table, ScalarOperator operator,
ConnectContext connectContext, List<String> fieldNames) {
DeltaLakeTable deltaLakeTable = (DeltaLakeTable) table;
Metadata metadata = deltaLakeTable.getDeltaMetadata();
StructType schema = metadata.getSchema();
Set<String> partitionColumns = metadata.getPartitionColNames();

Set<String> nonPartitionPrimitiveColumns;
Set<String> partitionPrimitiveColumns;
@@ -229,48 +302,25 @@ private void collectDeltaLakePlanFiles(PredicateSearchKey key, Table table, Scal
}

List<FileScanTask> files = Lists.newArrayList();

long estimateRowSize = table.getColumns().stream().mapToInt(column -> column.getType().getTypeSize()).sum();

try (CloseableIterator<FilteredColumnarBatch> scanFilesAsBatches = scan.getScanFiles(engine, true)) {
while (scanFilesAsBatches.hasNext()) {
FilteredColumnarBatch scanFileBatch = scanFilesAsBatches.next();
try (CloseableIterator<Row> scanFileRows = scanFileBatch.getRows()) {
while (scanFileRows.hasNext()) {
Row scanFileRow = scanFileRows.next();
DeletionVectorDescriptor dv = InternalScanFileUtils.getDeletionVectorDescriptorFromRow(scanFileRow);
if (dv != null) {
ErrorReport.reportValidateException(ErrorCode.ERR_BAD_TABLE_ERROR, ErrorType.UNSUPPORTED,
"Delta table feature [deletion vectors] is not supported");
}

if (enableCollectColumnStatistics(connectContext)) {
Pair<FileScanTask, DeltaLakeAddFileStatsSerDe> pair =
ScanFileUtils.convertFromRowToFileScanTask(true, scanFileRow, estimateRowSize);
files.add(pair.first);

try (Timer ignored = Tracers.watchScope(EXTERNAL, "DELTA_LAKE.updateDeltaLakeFileStats")) {
statisticProvider.updateFileStats(deltaLakeTable, key, pair.first, pair.second,
nonPartitionPrimitiveColumns, partitionPrimitiveColumns);
}
} else {
Pair<FileScanTask, DeltaLakeAddFileStatsSerDe> pair =
ScanFileUtils.convertFromRowToFileScanTask(false, scanFileRow, estimateRowSize);
files.add(pair.first);

try (Timer ignored = Tracers.watchScope(EXTERNAL, "DELTA_LAKE.updateDeltaLakeCardinality")) {
statisticProvider.updateFileStats(deltaLakeTable, key, pair.first, pair.second,
nonPartitionPrimitiveColumns, partitionPrimitiveColumns);
}
}
}
boolean enableCollectColumnStats = enableCollectColumnStatistics(connectContext);
Iterator<Pair<FileScanTask, DeltaLakeAddFileStatsSerDe>> iterator =
buildFileScanTaskIterator(table, operator, enableCollectColumnStats);
while (iterator.hasNext()) {
Pair<FileScanTask, DeltaLakeAddFileStatsSerDe> pair = iterator.next();
files.add(pair.first);

if (enableCollectColumnStats) {
try (Timer ignored = Tracers.watchScope(EXTERNAL, "DELTA_LAKE.updateDeltaLakeFileStats")) {
statisticProvider.updateFileStats(deltaLakeTable, key, pair.first, pair.second,
nonPartitionPrimitiveColumns, partitionPrimitiveColumns);
}
} else {
try (Timer ignored = Tracers.watchScope(EXTERNAL, "DELTA_LAKE.updateDeltaLakeCardinality")) {
statisticProvider.updateFileStats(deltaLakeTable, key, pair.first, pair.second,
nonPartitionPrimitiveColumns, partitionPrimitiveColumns);
}
}
} catch (IOException e) {
LOG.error("Failed to get delta lake scan files", e);
throw new StarRocksConnectorException("Failed to get delta lake scan files", e);
}

splitTasks.put(key, files);
scannedTables.add(key);
}
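A minimal sketch of how a caller might drain the RemoteFileInfoSource returned by the new getRemoteFilesAsync. The driving loop and the downcast are assumptions for illustration; since getFileScanTask() is package-private, such a caller would have to live in com.starrocks.connector.delta:

RemoteFileInfoSource source = deltaLakeMetadata.getRemoteFilesAsync(table, params);
while (source.hasMoreOutput()) {
    RemoteFileInfo info = source.getOutput();
    // Each output wraps exactly one Delta scan task (see DeltaRemoteFileInfo below).
    FileScanTask task = ((DeltaRemoteFileInfo) info).getFileScanTask();
    // ... hand the task to scan-range construction
}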
@@ -0,0 +1,29 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.starrocks.connector.delta;

import com.starrocks.connector.RemoteFileInfo;

public class DeltaRemoteFileInfo extends RemoteFileInfo {
private FileScanTask fileScanTask;

public DeltaRemoteFileInfo(FileScanTask fileScanTask) {
this.fileScanTask = fileScanTask;
}

FileScanTask getFileScanTask() {
return fileScanTask;
}
}
@@ -61,8 +61,6 @@ public class HdfsScanNode extends ScanNode {
private CloudConfiguration cloudConfiguration = null;
private final HDFSScanNodePredicates scanNodePredicates = new HDFSScanNodePredicates();

private DescriptorTable descTbl;

public HdfsScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName) {
super(id, desc, planNodeName);
hiveTable = (HiveTable) desc.getTable();
@@ -86,7 +84,6 @@ protected String debugString() {
}

public void setupScanRangeLocations(DescriptorTable descTbl) {
this.descTbl = descTbl;
this.scanRangeSource = new HiveConnectorScanRangeSource(descTbl, hiveTable, scanNodePredicates);
this.scanRangeSource.setup();
}
@@ -43,7 +43,7 @@ public void testCreateDeltaLakeConnector() {
@Test
public void testCreateDeltaLakeConnectorWithException1() {
Map<String, String> properties = ImmutableMap.of("type", "deltalake",
"hive.metastore.TYPE", "glue", "aws.glue.access_key", "xxxxx",
"hive.metastore.TYPE", "glue", "aws.glue.access_key", "xxxxx",
"aws.glue.secret_key", "xxxx",
"aws.glue.region", "us-west-2");
try {
@@ -60,7 +60,7 @@ public void testCreateDeltaLakeConnectorWithException1() {
@Test
public void testCreateDeltaLakeConnectorWithException2() {
Map<String, String> properties = ImmutableMap.of("type", "deltalake",
"hive.metastore.type", "error_metastore", "aws.glue.access_key", "xxxxx",
"hive.metastore.type", "error_metastore", "aws.glue.access_key", "xxxxx",
"aws.glue.secret_key", "xxxx",
"aws.glue.region", "us-west-2");
try {
Expand All @@ -70,7 +70,7 @@ public void testCreateDeltaLakeConnectorWithException2() {
Assert.assertTrue(e instanceof StarRocksConnectorException);
Assert.assertEquals("Failed to init connector [type: deltalake, name: delta0]. " +
"msg: Getting analyzing error. Detail message: hive metastore type [error_metastore] " +
"is not supported.", e.getMessage());
"is not supported.", e.getMessage());
}
}

@@ -84,4 +84,11 @@ public void testDeltaLakeConnectorMemUsage() {
Assert.assertEquals(0, catalogConnector.estimateSize());
Assert.assertEquals(4, catalogConnector.estimateCount().size());
}

@Test
public void testDeltaLakeRemoteFileInfo() {
FileScanTask fileScanTask = null;
DeltaRemoteFileInfo deltaRemoteFileInfo = new DeltaRemoteFileInfo(fileScanTask);
Assert.assertNull(deltaRemoteFileInfo.getFileScanTask());
}
}
