Skip to content

Commit

Permalink
[Enhancement] List Partition For AMV(Part 4): Support more patterns f…
Browse files Browse the repository at this point in the history
…or list partition materialized views (StarRocks#50844)

Signed-off-by: shuming.li <[email protected]>
  • Loading branch information
LiShuMing authored Sep 13, 2024
1 parent 73b3c70 commit 72f95bf
Show file tree
Hide file tree
Showing 22 changed files with 852 additions and 116 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public MvUpdateInfo getMVTimelinessUpdateInfoInChecked() throws AnalysisExceptio
});

ListPartitionDiffResult result = ListPartitionDiffer.computeListPartitionDiff(mv, refBaseTablePartitionMap,
allBasePartitionItems, tableRefIdxes);
allBasePartitionItems, tableRefIdxes, isQueryRewrite);
if (result == null) {
logMVPrepare(mv, "Partitioned mv compute list diff failed");
return new MvUpdateInfo(MvUpdateInfo.MvToRefreshType.FULL);
Expand Down Expand Up @@ -131,7 +131,7 @@ public MvUpdateInfo getMVTimelinessUpdateInfoInLoose() {
TableProperty.QueryRewriteConsistencyMode.LOOSE);
ListPartitionDiff listPartitionDiff = null;
try {
ListPartitionDiffResult result = ListPartitionDiffer.computeListPartitionDiff(mv);
ListPartitionDiffResult result = ListPartitionDiffer.computeListPartitionDiff(mv, isQueryRewrite);
if (result == null) {
logMVPrepare(mv, "Partitioned mv compute list diff failed");
return new MvUpdateInfo(MvUpdateInfo.MvToRefreshType.FULL);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package com.starrocks.catalog.mv;

import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.common.collect.Range;
import com.google.common.collect.Sets;
import com.starrocks.analysis.Expr;
Expand Down Expand Up @@ -83,9 +82,8 @@ protected MvUpdateInfo getMVTimelinessUpdateInfoInChecked() throws AnalysisExcep
mvTimelinessInfo);

// collect all ref base table's partition range map
Map<Table, Map<String, Set<String>>> extBaseToMVPartitionNameMap = Maps.newHashMap();
Map<Table, Map<String, Range<PartitionKey>>> basePartitionNameToRangeMap =
RangePartitionDiffer.syncBaseTablePartitionInfos(mv, partitionExpr, extBaseToMVPartitionNameMap);
RangePartitionDiffer.syncBaseTablePartitionInfos(mv, partitionExpr);

// If base table is materialized view, add partition name to cell mapping into base table partition mapping,
// otherwise base table(mv) may lose partition names of the real base table changed partitions.
Expand All @@ -100,7 +98,7 @@ protected MvUpdateInfo getMVTimelinessUpdateInfoInChecked() throws AnalysisExcep

// There may be a performance issue here, because it will fetch all partitions of base tables and mv partitions.
RangePartitionDiffResult differ = RangePartitionDiffer.computeRangePartitionDiff(mv, null,
extBaseToMVPartitionNameMap, basePartitionNameToRangeMap, isQueryRewrite);
basePartitionNameToRangeMap, isQueryRewrite);
if (differ == null) {
throw new AnalysisException(String.format("Compute partition difference of mv %s with base table failed.",
mv.getName()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public boolean syncAddOrDropPartitions() throws LockTimeoutException {

ListPartitionDiffResult result;
try {
result = ListPartitionDiffer.computeListPartitionDiff(mv);
result = ListPartitionDiffer.computeListPartitionDiff(mv, false);
if (result == null) {
LOG.warn("compute list partition diff failed: mv: {}", mv.getName());
return false;
Expand Down Expand Up @@ -128,6 +128,7 @@ public boolean syncAddOrDropPartitions() throws LockTimeoutException {
mvContext.setRefBaseTableMVIntersectedPartitions(baseToMvNameRef);
mvContext.setMvRefBaseTableIntersectedPartitions(mvToBaseNameRef);
mvContext.setRefBaseTableListPartitionMap(refBaseTablePartitionMap);
mvContext.setExternalRefBaseTableMVPartitionMap(result.getRefBaseTableMVPartitionMap());
}
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public boolean syncAddOrDropPartitions() throws AnalysisException {
mvContext.setRefBaseTableMVIntersectedPartitions(baseToMvNameRef);
mvContext.setMvRefBaseTableIntersectedPartitions(mvToBaseNameRef);
mvContext.setRefBaseTableRangePartitionMap(result.refBaseTablePartitionMap);
mvContext.setExternalRefBaseTableMVPartitionMap(result.refBaseTableMVPartitionMap);
mvContext.setExternalRefBaseTableMVPartitionMap(result.getRefBaseTableMVPartitionMap());
return true;
}

Expand Down
48 changes: 26 additions & 22 deletions fe/fe-core/src/main/java/com/starrocks/server/LocalMetastore.java
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@
import com.starrocks.catalog.PartitionType;
import com.starrocks.catalog.PhysicalPartition;
import com.starrocks.catalog.PhysicalPartitionImpl;
import com.starrocks.catalog.PrimitiveType;
import com.starrocks.catalog.RandomDistributionInfo;
import com.starrocks.catalog.RangePartitionInfo;
import com.starrocks.catalog.Replica;
Expand Down Expand Up @@ -3007,7 +3006,7 @@ public void createMaterializedView(CreateMaterializedViewStatement stmt)
materializedView.getWarehouseId());
materializedView.addPartition(partition);
} else {
Expr partitionExpr = stmt.getPartitionExpDesc().getExpr();
Expr partitionExpr = stmt.getPartitionByExpr();
Map<Expr, SlotRef> partitionExprMaps = MVPartitionExprResolver.getMVPartitionExprsChecked(partitionExpr,
stmt.getQueryStatement(), stmt.getBaseTableInfos());
LOG.info("Generate mv {} partition exprs: {}", mvName, partitionExprMaps);
Expand Down Expand Up @@ -3051,30 +3050,35 @@ private long getRandomStart(IntervalLiteral interval, long randomizeStart) throw
}

public static PartitionInfo buildPartitionInfo(CreateMaterializedViewStatement stmt) throws DdlException {
ExpressionPartitionDesc expressionPartitionDesc = stmt.getPartitionExpDesc();
if (expressionPartitionDesc != null) {
Expr expr = expressionPartitionDesc.getExpr();
if (expr instanceof SlotRef) {
SlotRef slotRef = (SlotRef) expr;
if (slotRef.getType().getPrimitiveType() == PrimitiveType.VARCHAR) {
return new ListPartitionInfo(PartitionType.LIST,
Collections.singletonList(stmt.getPartitionColumn()));
Expr partitionByExpr = stmt.getPartitionByExpr();
PartitionType partitionType = stmt.getPartitionType();
if (partitionByExpr != null) {
if (partitionType == PartitionType.LIST) {
if (!(partitionByExpr instanceof SlotRef)) {
throw new DdlException("List partition only support partition by slot ref column:"
+ partitionByExpr.toSql());
}
}
if ((expr instanceof FunctionCallExpr)) {
FunctionCallExpr functionCallExpr = (FunctionCallExpr) expr;
if (functionCallExpr.getFnName().getFunction().equalsIgnoreCase(FunctionSet.STR2DATE)) {
Column partitionColumn = new Column(stmt.getPartitionColumn());
partitionColumn.setType(com.starrocks.catalog.Type.DATE);
return expressionPartitionDesc.toPartitionInfo(
Collections.singletonList(partitionColumn),
Maps.newHashMap(), false);
return new ListPartitionInfo(PartitionType.LIST, Collections.singletonList(stmt.getPartitionColumn()));
} else {
ExpressionPartitionDesc expressionPartitionDesc = new ExpressionPartitionDesc(partitionByExpr);
if ((partitionByExpr instanceof FunctionCallExpr)) {
FunctionCallExpr functionCallExpr = (FunctionCallExpr) partitionByExpr;
if (functionCallExpr.getFnName().getFunction().equalsIgnoreCase(FunctionSet.STR2DATE)) {
Column partitionColumn = new Column(stmt.getPartitionColumn());
partitionColumn.setType(com.starrocks.catalog.Type.DATE);
return expressionPartitionDesc.toPartitionInfo(
Collections.singletonList(partitionColumn),
Maps.newHashMap(), false);
}
}
return expressionPartitionDesc.toPartitionInfo(
Collections.singletonList(stmt.getPartitionColumn()),
Maps.newHashMap(), false);
}
return expressionPartitionDesc.toPartitionInfo(
Collections.singletonList(stmt.getPartitionColumn()),
Maps.newHashMap(), false);
} else {
if (partitionType != PartitionType.UNPARTITIONED && partitionType != null) {
throw new DdlException("Partition type is " + stmt.getPartitionType() + ", but partition by expr is null");
}
return new SinglePartitionInfo();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import com.starrocks.catalog.OlapTable;
import com.starrocks.catalog.PaimonTable;
import com.starrocks.catalog.PartitionInfo;
import com.starrocks.catalog.PartitionType;
import com.starrocks.catalog.PrimitiveType;
import com.starrocks.catalog.RangePartitionInfo;
import com.starrocks.catalog.Table;
Expand All @@ -68,7 +69,6 @@
import com.starrocks.sql.ast.CreateMaterializedViewStatement;
import com.starrocks.sql.ast.DistributionDesc;
import com.starrocks.sql.ast.DropMaterializedViewStmt;
import com.starrocks.sql.ast.ExpressionPartitionDesc;
import com.starrocks.sql.ast.HashDistributionDesc;
import com.starrocks.sql.ast.IndexDef;
import com.starrocks.sql.ast.PartitionRangeDesc;
Expand All @@ -93,6 +93,7 @@
import com.starrocks.sql.optimizer.transformer.RelationTransformer;
import com.starrocks.sql.plan.ExecPlan;
import com.starrocks.sql.plan.PlanFragmentBuilder;
import org.apache.commons.collections.map.CaseInsensitiveMap;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
Expand Down Expand Up @@ -336,7 +337,7 @@ public Void visitCreateMaterializedViewStatement(CreateMaterializedViewStatement
columnExprMap.put(pair.first, outputExpressions.get(pair.second));
}
// some check if partition exp exists
if (statement.getPartitionExpDesc() != null) {
if (statement.getPartitionByExpr() != null) {
// check partition expression all in column list and
// write the expr into partitionExpDesc if partition expression exists
checkExpInColumn(statement);
Expand All @@ -346,7 +347,10 @@ public Void visitCreateMaterializedViewStatement(CreateMaterializedViewStatement
checkPartitionExpPatterns(statement);
// check partition column must be base table's partition column
checkPartitionColumnWithBaseTable(statement, aliasTableMap);
// check window function can be used in partitioned mv
checkWindowFunctions(statement, columnExprMap);
// determine mv partition 's type: list or range
determinePartitionType(statement, aliasTableMap);
}
// check and analyze distribution
checkDistribution(statement, aliasTableMap);
Expand All @@ -365,9 +369,8 @@ public Void visitCreateMaterializedViewStatement(CreateMaterializedViewStatement
*/
private Map<TableName, Table> getNormalizedBaseTables(QueryStatement queryStatement, ConnectContext context) {
Map<TableName, Table> aliasTableMap = getAllBaseTables(queryStatement, context);

// do normalization if catalog is null
Map<TableName, Table> result = Maps.newHashMap();
Map<TableName, Table> result = new CaseInsensitiveMap();
for (Map.Entry<TableName, Table> entry : aliasTableMap.entrySet()) {
entry.getKey().normalization(context);
result.put(entry.getKey(), entry.getValue());
Expand Down Expand Up @@ -634,12 +637,12 @@ private List<Index> genMaterializedViewIndexes(CreateMaterializedViewStatement s
}

private void checkExpInColumn(CreateMaterializedViewStatement statement) {
ExpressionPartitionDesc expressionPartitionDesc = statement.getPartitionExpDesc();
Expr partitionByExpr = statement.getPartitionByExpr();
List<Column> columns = statement.getMvColumnItems();
SlotRef slotRef = getSlotRef(expressionPartitionDesc.getExpr());
SlotRef slotRef = getSlotRef(partitionByExpr);
if (slotRef.getTblNameWithoutAnalyzed() != null) {
throw new SemanticException("Materialized view partition exp: "
+ slotRef.toSql() + " must related to column", expressionPartitionDesc.getExpr().getPos());
+ slotRef.toSql() + " must related to column", partitionByExpr.getPos());
}
int columnId = 0;
for (Column column : columns) {
Expand Down Expand Up @@ -674,8 +677,11 @@ private boolean isValidPartitionExpr(Expr partitionExpr) {
private void checkPartitionColumnExprs(CreateMaterializedViewStatement statement,
Map<Column, Expr> columnExprMap,
ConnectContext connectContext) throws SemanticException {
ExpressionPartitionDesc expressionPartitionDesc = statement.getPartitionExpDesc();
Expr partitionByExpr = statement.getPartitionByExpr();
Column partitionColumn = statement.getPartitionColumn();
if (partitionByExpr == null) {
return;
}

// partition column expr from input query
Expr partitionColumnExpr = columnExprMap.get(partitionColumn);
Expand All @@ -684,13 +690,13 @@ private void checkPartitionColumnExprs(CreateMaterializedViewStatement statement
} catch (Exception e) {
LOG.warn("resolve partition column failed", e);
throw new SemanticException("Resolve partition column failed:" + e.getMessage(),
statement.getPartitionExpDesc().getPos());
statement.getPartitionByExpr().getPos());
}

// set partition-ref into statement
if (expressionPartitionDesc.isFunction()) {
if (partitionByExpr instanceof FunctionCallExpr) {
// e.g. partition by date_trunc('month', dt)
FunctionCallExpr functionCallExpr = (FunctionCallExpr) expressionPartitionDesc.getExpr();
FunctionCallExpr functionCallExpr = (FunctionCallExpr) partitionByExpr;
String functionName = functionCallExpr.getFnName().getFunction();
if (!PartitionFunctionChecker.FN_NAME_TO_PATTERN.containsKey(functionName)) {
throw new SemanticException("Materialized view partition function " +
Expand Down Expand Up @@ -718,7 +724,7 @@ private void checkPartitionColumnExprs(CreateMaterializedViewStatement statement
statement.setPartitionRefTableExpr(partitionColumnExpr);
} else {
throw new SemanticException("Materialized view partition function must related with column",
expressionPartitionDesc.getPos());
partitionByExpr.getPos());
}
}
}
Expand Down Expand Up @@ -809,6 +815,61 @@ private void checkPartitionColumnWithBaseTable(CreateMaterializedViewStatement s
replaceTableAlias(slotRef, statement, tableNameTableMap);
}

private void determinePartitionType(CreateMaterializedViewStatement statement,
Map<TableName, Table> tableNameTableMap) {
Expr partitionRefTableExpr = statement.getPartitionRefTableExpr();
if (partitionRefTableExpr == null) {
statement.setPartitionType(PartitionType.UNPARTITIONED);
return;
}

SlotRef slotRef = getSlotRef(partitionRefTableExpr);
Table refBaseTable = tableNameTableMap.get(slotRef.getTblNameWithoutAnalyzed());
if (refBaseTable == null) {
LOG.warn("Materialized view partition expression %s could only ref to base table",
slotRef.toSql());
statement.setPartitionType(PartitionType.UNPARTITIONED);
return;
}

if (refBaseTable.isNativeTableOrMaterializedView()) {
// To olap table, determine mv's partition by its ref base table's partition type.
OlapTable olapTable = (OlapTable) refBaseTable;
PartitionInfo partitionInfo = olapTable.getPartitionInfo();
if (partitionInfo.isRangePartition()) {
// set the partition type
statement.setPartitionType(PartitionType.RANGE);
} else if (partitionInfo.isListPartition()) {
// set the partition type
statement.setPartitionType(PartitionType.LIST);
}
} else {
List<Column> refPartitionCols = refBaseTable.getPartitionColumns();
Optional<Column> refPartitionColOpt = refPartitionCols.stream()
.filter(col -> col.getName().equals(slotRef.getColumnName()))
.findFirst();
if (refPartitionColOpt.isEmpty()) {
throw new SemanticException("Materialized view partition column in partition exp " +
"must be base table partition column", partitionRefTableExpr.getPos());
}
Column refPartitionCol = refPartitionColOpt.get();
Type partitionExprType = refPartitionCol.getType();
Expr partitionByExpr = statement.getPartitionByExpr();
// To olap table, determine mv's partition by its ref base table's partition column type:
// - if the partition column is string type && no use `str2date`, use list partition.
// - otherwise use range partition as before.
// To be compatible with old implementations, if the partition column is not a string type,
// still use range partition.
// TODO: remove this compatibility code in the future, use list partition directly later.
if (partitionExprType.isStringType() && (partitionByExpr instanceof SlotRef) &&
!(partitionRefTableExpr instanceof FunctionCallExpr)) {
statement.setPartitionType(PartitionType.LIST);
} else {
statement.setPartitionType(PartitionType.RANGE);
}
}
}

private void checkPartitionColumnWithBaseOlapTable(SlotRef slotRef, OlapTable table) {
PartitionInfo partitionInfo = table.getPartitionInfo();
if (partitionInfo.isUnPartitioned()) {
Expand Down Expand Up @@ -1007,7 +1068,7 @@ boolean replacePaimonTableAlias(SlotRef slotRef, PaimonTable paimonTable, BaseTa

private void checkPartitionColumnType(Column partitionColumn) {
PrimitiveType type = partitionColumn.getPrimitiveType();
if (!type.isFixedPointType() && !type.isDateType() && type != PrimitiveType.CHAR && type != PrimitiveType.VARCHAR) {
if (!type.isFixedPointType() && !type.isDateType() && !type.isStringType()) {
throw new SemanticException("Materialized view partition exp column:"
+ partitionColumn.getName() + " with type " + type + " not supported");
}
Expand Down
Loading

0 comments on commit 72f95bf

Please sign in to comment.