Skip to content

Commit

Permalink
[BugFix] Fix physical partition for rebalance (StarRocks#46402)
Browse files Browse the repository at this point in the history
Signed-off-by: meegoo <[email protected]>
  • Loading branch information
meegoo authored Oct 22, 2024
1 parent 6f54f6a commit 609614d
Show file tree
Hide file tree
Showing 10 changed files with 181 additions and 137 deletions.
12 changes: 8 additions & 4 deletions fe/fe-core/src/main/java/com/starrocks/catalog/OlapTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -1828,7 +1828,9 @@ public TTableDescriptor toThrift(List<ReferencedPartitionInfo> partitions) {
public long getRowCount() {
long rowCount = 0;
for (Map.Entry<Long, Partition> entry : idToPartition.entrySet()) {
rowCount += entry.getValue().getBaseIndex().getRowCount();
for (PhysicalPartition partition : entry.getValue().getSubPartitions()) {
rowCount += partition.getBaseIndex().getRowCount();
}
}
return rowCount;
}
Expand Down Expand Up @@ -3162,9 +3164,11 @@ public void removeTabletsFromInvertedIndex() {
TabletInvertedIndex invertedIndex = GlobalStateMgr.getCurrentState().getTabletInvertedIndex();
Collection<Partition> allPartitions = getAllPartitions();
for (Partition partition : allPartitions) {
for (MaterializedIndex index : partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)) {
for (Tablet tablet : index.getTablets()) {
invertedIndex.deleteTablet(tablet.getId());
for (PhysicalPartition subPartition : partition.getSubPartitions()) {
for (MaterializedIndex index : subPartition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)) {
for (Tablet tablet : index.getTablets()) {
invertedIndex.deleteTablet(tablet.getId());
}
}
}
}
Expand Down

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions fe/fe-core/src/main/java/com/starrocks/load/ExportJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
import com.starrocks.catalog.Database;
import com.starrocks.catalog.MaterializedIndex;
import com.starrocks.catalog.MysqlTable;
import com.starrocks.catalog.Partition;
import com.starrocks.catalog.PhysicalPartition;
import com.starrocks.catalog.PrimitiveType;
import com.starrocks.catalog.Replica;
import com.starrocks.catalog.Table;
Expand Down Expand Up @@ -362,7 +362,7 @@ private void genTaskFragments(List<PlanFragment> fragments, List<ScanNode> scanN
TabletMeta tabletMeta = invertedIndex.getTabletMeta(tabletId);
long dataSize = 0L;
if (tabletMeta.isLakeTablet()) {
Partition partition = exportTable.getPartition(tabletMeta.getPartitionId());
PhysicalPartition partition = exportTable.getPhysicalPartition(tabletMeta.getPhysicalPartitionId());
if (partition != null) {
MaterializedIndex index = partition.getIndex(tabletMeta.getIndexId());
if (index != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.starrocks.catalog.Partition;
import com.starrocks.catalog.PartitionInfo;
import com.starrocks.catalog.PartitionType;
import com.starrocks.catalog.PhysicalPartition;
import com.starrocks.catalog.SinglePartitionInfo;
import com.starrocks.catalog.Table;
import com.starrocks.catalog.Tablet;
Expand Down Expand Up @@ -408,9 +409,11 @@ private void gc(boolean isReplay) {

Partition partition = targetTable.getPartition(pid);
if (partition != null) {
for (MaterializedIndex index : partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)) {
// hash set is able to deduplicate the elements
sourceTablets.addAll(index.getTablets());
for (PhysicalPartition subPartition : partition.getSubPartitions()) {
for (MaterializedIndex index : subPartition.getMaterializedIndices(
MaterializedIndex.IndexExtState.ALL)) {
sourceTablets.addAll(index.getTablets());
}
}
targetTable.dropTempPartition(partition.getName(), true);
} else {
Expand Down Expand Up @@ -470,8 +473,10 @@ private void doCommit(boolean isReplay) {
Set<Tablet> sourceTablets = Sets.newHashSet();
sourcePartitionNames.forEach(name -> {
Partition partition = targetTable.getPartition(name);
for (MaterializedIndex index : partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)) {
sourceTablets.addAll(index.getTablets());
for (PhysicalPartition subPartition : partition.getSubPartitions()) {
for (MaterializedIndex index : subPartition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)) {
sourceTablets.addAll(index.getTablets());
}
}
});
long sumSourceRows = job.getSourcePartitionIds().stream()
Expand Down
10 changes: 7 additions & 3 deletions fe/fe-core/src/main/java/com/starrocks/load/PartitionUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.starrocks.catalog.Partition;
import com.starrocks.catalog.PartitionInfo;
import com.starrocks.catalog.PartitionKey;
import com.starrocks.catalog.PhysicalPartition;
import com.starrocks.catalog.RangePartitionInfo;
import com.starrocks.catalog.Table;
import com.starrocks.catalog.Tablet;
Expand Down Expand Up @@ -153,9 +154,12 @@ public static void createAndAddTempPartitionsForTable(Database db, OlapTable tar
public static void clearTabletsFromInvertedIndex(List<Partition> partitions) {
TabletInvertedIndex invertedIndex = GlobalStateMgr.getCurrentState().getTabletInvertedIndex();
for (Partition partition : partitions) {
for (MaterializedIndex materializedIndex : partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)) {
for (Tablet tablet : materializedIndex.getTablets()) {
invertedIndex.deleteTablet(tablet.getId());
for (PhysicalPartition subPartition : partition.getSubPartitions()) {
for (MaterializedIndex materializedIndex : subPartition.getMaterializedIndices(
MaterializedIndex.IndexExtState.ALL)) {
for (Tablet tablet : materializedIndex.getTablets()) {
invertedIndex.deleteTablet(tablet.getId());
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import com.starrocks.catalog.LocalTablet;
import com.starrocks.catalog.MaterializedIndex;
import com.starrocks.catalog.OlapTable;
import com.starrocks.catalog.Partition;
import com.starrocks.catalog.PhysicalPartition;
import com.starrocks.catalog.Replica;
import com.starrocks.catalog.Tablet;
import com.starrocks.lake.LakeTablet;
Expand Down Expand Up @@ -65,8 +65,8 @@ public MetaScanNode(PlanNodeId id, TupleDescriptor desc, OlapTable olapTable,
}

public void computeRangeLocations() {
Collection<Partition> partitions = olapTable.getPartitions();
for (Partition partition : partitions) {
Collection<PhysicalPartition> partitions = olapTable.getPhysicalPartitions();
for (PhysicalPartition partition : partitions) {
MaterializedIndex index = partition.getBaseIndex();
int schemaHash = olapTable.getSchemaHashByIndexId(index.getId());
List<Tablet> tablets = index.getTablets();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4747,10 +4747,12 @@ public void onEraseDatabase(long dbId) {
public void onErasePartition(Partition partition) {
// remove tablet in inverted index
TabletInvertedIndex invertedIndex = GlobalStateMgr.getCurrentState().getTabletInvertedIndex();
for (MaterializedIndex index : partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)) {
for (Tablet tablet : index.getTablets()) {
long tabletId = tablet.getId();
invertedIndex.deleteTablet(tabletId);
for (PhysicalPartition subPartition : partition.getSubPartitions()) {
for (MaterializedIndex index : subPartition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)) {
for (Tablet tablet : index.getTablets()) {
long tabletId = tablet.getId();
invertedIndex.deleteTablet(tabletId);
}
}
}
}
Expand Down
7 changes: 5 additions & 2 deletions fe/fe-core/src/main/java/com/starrocks/sql/Explain.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.starrocks.catalog.MaterializedIndex;
import com.starrocks.catalog.OlapTable;
import com.starrocks.catalog.Partition;
import com.starrocks.catalog.PhysicalPartition;
import com.starrocks.sql.common.ErrorType;
import com.starrocks.sql.common.StarRocksPlannerException;
import com.starrocks.sql.optimizer.ExpressionContext;
Expand Down Expand Up @@ -204,8 +205,10 @@ public OperatorStr visitPhysicalOlapScan(OptExpression optExpression, OperatorPr
int totalTabletsNum = 0;
for (Long partitionId : scan.getSelectedPartitionId()) {
final Partition partition = ((OlapTable) scan.getTable()).getPartition(partitionId);
final MaterializedIndex selectedTable = partition.getIndex(scan.getSelectedIndexId());
totalTabletsNum += selectedTable.getTablets().size();
for (PhysicalPartition subPartition : partition.getSubPartitions()) {
final MaterializedIndex selectedTable = subPartition.getIndex(scan.getSelectedIndexId());
totalTabletsNum += selectedTable.getTablets().size();
}
}
String partitionAndBucketInfo = "partitionRatio: " +
scan.getSelectedPartitionId().size() +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,8 @@ public void testBalance(@Mocked GlobalStateMgr globalStateMgr) {
result = Lists.newArrayList(table);
minTimes = 0;

GlobalStateMgr.getCurrentState().getLocalMetastore().getPartitionIncludeRecycleBin((OlapTable) any, anyLong);
GlobalStateMgr.getCurrentState().getLocalMetastore()
.getPhysicalPartitionIncludeRecycleBin((OlapTable) any, anyLong);
result = partition;
minTimes = 0;

Expand Down Expand Up @@ -312,7 +313,8 @@ public void testBalanceWithSameHost(@Mocked GlobalStateMgr globalStateMgr) {
result = Lists.newArrayList(table);
minTimes = 0;

GlobalStateMgr.getCurrentState().getLocalMetastore().getPartitionIncludeRecycleBin((OlapTable) any, anyLong);
GlobalStateMgr.getCurrentState().getLocalMetastore()
.getPhysicalPartitionIncludeRecycleBin((OlapTable) any, anyLong);
result = partition;
minTimes = 0;

Expand Down Expand Up @@ -494,11 +496,13 @@ public void testBalanceBackendTablet(@Mocked GlobalStateMgr globalStateMgr) {
result = Lists.newArrayList(table);
minTimes = 0;

GlobalStateMgr.getCurrentState().getLocalMetastore().getPartitionIncludeRecycleBin((OlapTable) any, partitionId1);
GlobalStateMgr.getCurrentState().getLocalMetastore()
.getPhysicalPartitionIncludeRecycleBin((OlapTable) any, partitionId1);
result = partition1;
minTimes = 0;

GlobalStateMgr.getCurrentState().getLocalMetastore().getPartitionIncludeRecycleBin((OlapTable) any, partitionId2);
GlobalStateMgr.getCurrentState().getLocalMetastore()
.getPhysicalPartitionIncludeRecycleBin((OlapTable) any, partitionId2);
result = partition2;
minTimes = 0;

Expand Down Expand Up @@ -685,7 +689,8 @@ public void testBalanceParallel(@Mocked GlobalStateMgr globalStateMgr) {
result = Lists.newArrayList(table);
minTimes = 0;

GlobalStateMgr.getCurrentState().getLocalMetastore().getPartitionIncludeRecycleBin((OlapTable) any, anyLong);
GlobalStateMgr.getCurrentState().getLocalMetastore()
.getPhysicalPartitionIncludeRecycleBin((OlapTable) any, anyLong);
result = partition;
minTimes = 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,18 @@

package com.starrocks.load;

import com.google.common.collect.Lists;
import com.starrocks.analysis.DateLiteral;
import com.starrocks.catalog.Column;
import com.starrocks.catalog.HashDistributionInfo;
import com.starrocks.catalog.MaterializedIndex;
import com.starrocks.catalog.Partition;
import com.starrocks.catalog.ScalarType;
import com.starrocks.catalog.Type;
import org.junit.Test;

import java.util.List;

import static org.junit.Assert.assertEquals;

public class PartitionUtilsTest {
Expand All @@ -33,4 +41,15 @@ public void testConvertDateLiteralToDouble() throws Exception {
assertEquals(20150301120000L, result);
}

@Test
public void testClearTabletsFromInvertedIndex() throws Exception {
List<Partition> partitions = Lists.newArrayList();
MaterializedIndex materializedIndex = new MaterializedIndex();
HashDistributionInfo distributionInfo =
new HashDistributionInfo(1, Lists.newArrayList(new Column("id", Type.BIGINT)));

Partition p1 = new Partition(10001L, "p1", materializedIndex, distributionInfo);
partitions.add(p1);
PartitionUtils.clearTabletsFromInvertedIndex(partitions);
}
}

0 comments on commit 609614d

Please sign in to comment.