Skip to content

Commit

Permalink
[BugFix] Fix online optimize conflict with expression partition (StarRocks#52074)
Browse files Browse the repository at this point in the history

Signed-off-by: meegoo <[email protected]>
  • Loading branch information
meegoo authored Oct 22, 2024
1 parent 9fe4708 commit 6f54f6a
Show file tree
Hide file tree
Showing 16 changed files with 300 additions and 25 deletions.
4 changes: 2 additions & 2 deletions be/src/exec/tablet_sink_index_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -791,7 +791,7 @@ Status NodeChannel::_wait_request(ReusableClosure<PTabletWriterAddBatchResult>*
}
}

std::vector<int64_t> tablet_ids;
std::set<int64_t> tablet_ids;
for (auto& tablet : closure->result.tablet_vec()) {
TTabletCommitInfo commit_info;
commit_info.tabletId = tablet.tablet_id();
Expand All @@ -818,7 +818,7 @@ Status NodeChannel::_wait_request(ReusableClosure<PTabletWriterAddBatchResult>*
_tablet_commit_infos.emplace_back(std::move(commit_info));

if (tablet_ids.size() < 128) {
tablet_ids.emplace_back(commit_info.tabletId);
tablet_ids.insert(commit_info.tabletId);
}
}
for (auto& log : *(closure->result.mutable_lake_tablet_data()->mutable_txn_logs())) {
Expand Down
23 changes: 18 additions & 5 deletions be/src/runtime/local_tablets_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,7 @@ void LocalTabletsChannel::_abort_replica_tablets(
cancel_request.set_txn_id(_txn_id);
cancel_request.set_index_id(_index_id);
cancel_request.set_reason(abort_reason);
cancel_request.set_sink_id(request.sink_id());

auto closure = new ReusableClosure<PTabletWriterCancelResult>();

Expand Down Expand Up @@ -589,8 +590,9 @@ void LocalTabletsChannel::_commit_tablets(const PTabletWriterAddChunkRequest& re
}
string commit_tablet_id_list_str;
JoinInts(commit_tablet_ids, ",", &commit_tablet_id_list_str);
LOG(INFO) << "LocalTabletsChannel txn_id: " << _txn_id << " load_id: " << print_id(request.id()) << " commit "
<< commit_tablet_ids.size() << " tablets: " << commit_tablet_id_list_str;
LOG(INFO) << "LocalTabletsChannel txn_id: " << _txn_id << " load_id: " << print_id(request.id())
<< " sink_id: " << request.sink_id() << " commit " << commit_tablet_ids.size()
<< " tablets: " << commit_tablet_id_list_str;

// abort secondary replicas located on other nodes which have no data
_abort_replica_tablets(request, "", node_id_to_abort_tablets);
Expand Down Expand Up @@ -713,8 +715,16 @@ Status LocalTabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& pa
}
if (_is_replicated_storage) {
std::stringstream ss;
ss << "LocalTabletsChannel txn_id: " << _txn_id << " load_id: " << print_id(params.id()) << " open "
<< _delta_writers.size() << " delta writers, " << failed_tablet_ids.size() << " failed_tablets: ";
ss << "LocalTabletsChannel txn_id: " << _txn_id << " load_id: " << print_id(params.id())
<< " sink_id: " << params.sink_id() << " open " << _delta_writers.size() << " delta writer: ";
int i = 0;
for (auto& [tablet_id, delta_writer] : _delta_writers) {
ss << "[" << tablet_id << ":" << delta_writer->replica_state() << "]";
if (++i > 128) {
break;
}
}
ss << failed_tablet_ids.size() << " failed_tablets: ";
for (auto& tablet_id : failed_tablet_ids) {
ss << tablet_id << ",";
}
Expand Down Expand Up @@ -759,6 +769,9 @@ void LocalTabletsChannel::abort(const std::vector<int64_t>& tablet_ids, const st
if (it != _delta_writers.end()) {
it->second->cancel(Status::Cancelled(reason));
it->second->abort(abort_with_exception);
} else {
LOG(WARNING) << "tablet_id: " << tablet_id << " not found in LocalTabletsChannel txn_id: " << _txn_id
<< " load_id: " << _key.id << " index_id: " << _key.index_id;
}
}
string tablet_id_list_str;
Expand Down Expand Up @@ -842,7 +855,7 @@ Status LocalTabletsChannel::incremental_open(const PTabletWriterOpenRequest& par
size_t incremental_tablet_num = 0;
std::stringstream ss;
ss << "LocalTabletsChannel txn_id: " << _txn_id << " load_id: " << print_id(params.id())
<< " incremental open delta writer: ";
<< " sink_id: " << params.sink_id() << " incremental open delta writer: ";

for (const PTabletWithPartition& tablet : params.tablets()) {
if (_delta_writers.count(tablet.tablet_id()) != 0) {
Expand Down
8 changes: 7 additions & 1 deletion fe/fe-core/src/main/java/com/starrocks/alter/AlterJobV2.java
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,13 @@ protected boolean checkTableStable(Database db) throws AlterCancelException {
} else {
// table is stable, set its state according to the job type and begin altering.
LOG.info("table {} is stable, start job{}, type {}", tableId, jobId, type);
tbl.setState(type == JobType.ROLLUP ? OlapTableState.ROLLUP : OlapTableState.SCHEMA_CHANGE);
if (type == JobType.ROLLUP) {
tbl.setState(OlapTableState.ROLLUP);
} else if (type == JobType.OPTIMIZE) {
tbl.setState(OlapTableState.OPTIMIZE);
} else {
tbl.setState(OlapTableState.SCHEMA_CHANGE);
}
return true;
}
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public enum AlterOpType {
}

public boolean needCheckCapacity() {
return this == ADD_ROLLUP || this == SCHEMA_CHANGE || this == ADD_PARTITION;
return this == ADD_ROLLUP || this == SCHEMA_CHANGE || this == ADD_PARTITION || this == OPTIMIZE;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ private void enableDoubleWritePartition(Database db, OlapTable tbl, String sourc
Locker locker = new Locker();
locker.lockDatabase(db.getId(), LockType.WRITE);
try {
Preconditions.checkState(tbl.getState() == OlapTableState.SCHEMA_CHANGE);
Preconditions.checkState(tbl.getState() == OlapTableState.OPTIMIZE);
tbl.addDoubleWritePartition(sourcePartitionName, tmpPartitionName);
LOG.info("job {} add double write partition {} to {}", jobId, tmpPartitionName, sourcePartitionName);
} finally {
Expand All @@ -306,7 +306,6 @@ private void enableDoubleWritePartition(Database db, OlapTable tbl, String sourc
private void disableDoubleWritePartition(Database db, OlapTable tbl) {
try (AutoCloseableLock ignored =
new AutoCloseableLock(new Locker(), db.getId(), Lists.newArrayList(tbl.getId()), LockType.WRITE)) {
Preconditions.checkState(tbl.getState() == OlapTableState.SCHEMA_CHANGE);
tbl.clearDoubleWritePartition();
LOG.info("job {} clear double write partitions", jobId);
}
Expand Down Expand Up @@ -596,7 +595,7 @@ private void replayPending(OnlineOptimizeJobV2 replayedJob) {
return;
}
// set table state
tbl.setState(OlapTableState.SCHEMA_CHANGE);
tbl.setState(OlapTableState.OPTIMIZE);
} finally {
locker.unLockDatabase(db.getId(), LockType.WRITE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,7 @@ private void replayPending(OptimizeJobV2 replayedJob) {
try (AutoCloseableLock ignore =
new AutoCloseableLock(new Locker(), db.getId(), Lists.newArrayList(tbl.getId()), LockType.WRITE)) {
// set table state
tbl.setState(OlapTableState.SCHEMA_CHANGE);
tbl.setState(OlapTableState.OPTIMIZE);
}

this.jobState = JobState.PENDING;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public AlterJobV2 build() throws UserException {
OptimizeJobV2 optimizeJob = new OptimizeJobV2(jobId, dbId, tableId, table.getName(), timeoutMs, optimizeClause);
return optimizeJob;
} else {
LOG.info("Online optimize job is created, table: {}", table.getName());
LOG.info("Online optimize job {} is created, table: {}", jobId, table.getName());
OnlineOptimizeJobV2 onlineOptimizeJob = new OnlineOptimizeJobV2(
jobId, dbId, tableId, table.getName(), timeoutMs, optimizeClause);
return onlineOptimizeJob;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1985,7 +1985,11 @@ public ShowResultSet process(List<AlterClause> alterClauses, Database db, OlapTa
}

// set table state
olapTable.setState(OlapTableState.SCHEMA_CHANGE);
if (schemaChangeJob.getType() == AlterJobV2.JobType.OPTIMIZE) {
olapTable.setState(OlapTableState.OPTIMIZE);
} else {
olapTable.setState(OlapTableState.SCHEMA_CHANGE);
}

// 2. add schemaChangeJob
addAlterJobV2(schemaChangeJob);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ public static void checkNativeTable(Database db, Table table) throws DdlExceptio

// check table state
public static void checkTableState(OlapTable olapTable, String tableName) throws DdlException {
if (olapTable.getState() != OlapTable.OlapTableState.NORMAL) {
if (olapTable.getState() != OlapTable.OlapTableState.NORMAL
&& olapTable.getState() != OlapTable.OlapTableState.OPTIMIZE) {
throw InvalidOlapTableStateException.of(olapTable.getState(), tableName);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,8 @@ public enum OlapTableState {
* The query plan which is generated during this state is invalid because the meta
* during the creation of the logical plan and the physical plan might be inconsistent.
*/
UPDATING_META
UPDATING_META,
OPTIMIZE
}

@SerializedName(value = "state")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ public void complete() throws UserException {
tSink2.unsetPartition();
tSink2.unsetLocation();
TOlapTablePartitionParam partitionParam2 = createPartition(tSink2.getDb_id(), dstTable, tupleDescriptor,
enableAutomaticPartition, automaticBucketSize, doubleWritePartitionIds);
false, automaticBucketSize, doubleWritePartitionIds);
tSink2.setPartition(partitionParam2);
tSink2.setLocation(createLocation(dstTable, partitionParam2, enableReplicatedStorage, warehouseId));
tSink2.setIgnore_out_of_partition(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4774,11 +4774,10 @@ public OlapTable getCopiedTable(Database db, OlapTable olapTable, List<Long> sou
Locker locker = new Locker();
locker.lockDatabase(db.getId(), LockType.READ);
try {
if (olapTable.getState() != OlapTable.OlapTableState.NORMAL) {
if (!isOptimize || olapTable.getState() != OlapTable.OlapTableState.SCHEMA_CHANGE) {
throw new RuntimeException("Table' state is not NORMAL: " + olapTable.getState()
+ ", tableId:" + olapTable.getId() + ", tabletName:" + olapTable.getName());
}
if (olapTable.getState() != OlapTable.OlapTableState.NORMAL
&& olapTable.getState() != OlapTable.OlapTableState.OPTIMIZE) {
throw new RuntimeException("Table' state is not NORMAL: " + olapTable.getState()
+ ", tableId:" + olapTable.getId() + ", tabletName:" + olapTable.getName());
}
for (Long id : sourcePartitionIds) {
origPartitions.put(id, olapTable.getPartition(id).getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public void testOptimizeTable() throws Exception {
schemaChangeHandler.process(alterTableStmt.getAlterClauseList(), db, olapTable);
Map<Long, AlterJobV2> alterJobsV2 = schemaChangeHandler.getAlterJobsV2();
Assert.assertEquals(1, alterJobsV2.size());
Assert.assertEquals(OlapTableState.SCHEMA_CHANGE, olapTable.getState());
Assert.assertEquals(OlapTableState.OPTIMIZE, olapTable.getState());
}

// start a schema change, then finished
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ public void testOptimizeTable() throws Exception {
schemaChangeHandler.process(alterTableStmt.getAlterClauseList(), db, olapTable);
Map<Long, AlterJobV2> alterJobsV2 = schemaChangeHandler.getAlterJobsV2();
Assert.assertEquals(1, alterJobsV2.size());
Assert.assertEquals(OlapTableState.SCHEMA_CHANGE, olapTable.getState());
Assert.assertEquals(OlapTableState.OPTIMIZE, olapTable.getState());
}

// start a schema change, then finished
Expand Down
Loading

0 comments on commit 6f54f6a

Please sign in to comment.