Skip to content

Commit

Permalink
[Refactor] Refactor Starrocks LOG to reduce the log file size(part3) (StarRocks#52128)
Browse files Browse the repository at this point in the history

Signed-off-by: sevev <[email protected]>
  • Loading branch information
sevev authored Oct 22, 2024
1 parent 41320fe commit 9fe4708
Show file tree
Hide file tree
Showing 12 changed files with 65 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ private void updateClusterLoadStatistic() {
GlobalStateMgr.getCurrentState().getTabletInvertedIndex());
clusterLoadStatistic.init();
if (System.currentTimeMillis() - lastClusterLoadLoggingTime > CLUSTER_LOAD_STATISTICS_LOGGING_INTERVAL_MS) {
LOG.info("update cluster load statistic:\n{}", clusterLoadStatistic.getBrief());
LOG.debug("update cluster load statistic:\n{}", clusterLoadStatistic.getBrief());
lastClusterLoadLoggingTime = System.currentTimeMillis();
}
this.loadStatistic = clusterLoadStatistic;
Expand Down Expand Up @@ -647,7 +647,7 @@ private void schedulePendingTablets() {
if (AgentTaskQueue.addTask(task)) {
stat.counterCloneTask.incrementAndGet();
}
LOG.info("add task to agent task queue: {}", task);
LOG.debug("add task to agent task queue: {}", task);
}

// send task immediately
Expand Down Expand Up @@ -1774,7 +1774,7 @@ private synchronized void removeTabletCtx(TabletSchedCtx tabletCtx, String reaso
runningTablets.remove(tabletCtx.getTabletId());
allTabletIds.remove(tabletCtx.getTabletId());
schedHistory.add(tabletCtx);
LOG.info("remove the tablet {}. because: {}", tabletCtx.getTabletId(), reason);
LOG.debug("remove the tablet {}. because: {}", tabletCtx.getTabletId(), reason);
}

@VisibleForTesting
Expand Down
2 changes: 1 addition & 1 deletion fe/fe-core/src/main/java/com/starrocks/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ public class Config extends ConfigBase {
* In such cases, it is possible to disable this switch.
*/
@ConfField(mutable = true)
public static boolean log_register_and_unregister_query_id = true;
public static boolean log_register_and_unregister_query_id = false;

/**
* Used to limit the maximum number of partitions that can be created when creating a dynamic partition table,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -817,7 +817,7 @@ public static RuntimeProfile mergeIsomorphicProfiles(List<RuntimeProfile> profil
RuntimeProfile child = profile.getChild(childName);
if (child == null) {
identical = false;
LOG.info("find non-isomorphic children, profileName={}, requiredChildName={}",
LOG.debug("find non-isomorphic children, profileName={}, requiredChildName={}",
profile.name, childName);
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,9 +308,9 @@ public TExecPlanFragmentParams plan(TUniqueId loadId) throws UserException {
TNetworkAddress coordAddress = new TNetworkAddress(FrontendOptions.getLocalHostAddress(), Config.rpc_port);
params.setCoord(coordAddress);

LOG.info("load job id: {}, txn id: {}, parallel: {}, compress: {}, replicated: {}, quorum: {}",
DebugUtil.printId(loadId), streamLoadInfo.getTxnId(), queryOptions.getLoad_dop(),
queryOptions.getLoad_transmission_compression_type(), destTable.enableReplicatedStorage(), writeQuorum);
LOG.debug("load job id: {}, txn id: {}, parallel: {}, compress: {}, replicated: {}, quorum: {}",
DebugUtil.printId(loadId), streamLoadInfo.getTxnId(), queryOptions.getLoad_dop(),
queryOptions.getLoad_transmission_compression_type(), destTable.enableReplicatedStorage(), writeQuorum);
this.execPlanFragmentParams = params;
return params;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,8 @@ public TReportExecStatusResult reportExecStatus(TReportExecStatusParams params,
final TReportExecStatusResult result = new TReportExecStatusResult();
final QueryInfo info = coordinatorMap.get(params.query_id);
if (info == null) {
LOG.info("ReportExecStatus() failed, query does not exist, fragment_instance_id={}, query_id={},",
// query is already removed which is acceptable
LOG.debug("ReportExecStatus() failed, query does not exist, fragment_instance_id={}, query_id={},",
DebugUtil.printId(params.fragment_instance_id), DebugUtil.printId(params.query_id));
result.setStatus(new TStatus(TStatusCode.NOT_FOUND));
result.status.addToError_msgs("query id " + DebugUtil.printId(params.query_id) + " not found");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,10 @@ public void finishInstance(TUniqueId instanceId) {
/**
 * When {@code profileDoneSignal} is set, counts it down to zero with the given status and
 * logs the ids returned by {@code getUnfinishedInstanceIds()}. Does nothing when the signal
 * is {@code null}.
 *
 * @param status status handed to {@code profileDoneSignal.countDownToZero}
 */
public void finishAllInstances(Status status) {
if (profileDoneSignal != null) {
profileDoneSignal.countDownToZero(status);
// NOTE(review): this unconditional log and the guarded log below look like the before/after
// lines of a single diff hunk rendered together — confirm which one the file really contains.
LOG.info("unfinished instances: {}", getUnfinishedInstanceIds());
List<String> unFinishedInstanceIds = getUnfinishedInstanceIds();
// Skip the log line entirely when the returned list is empty.
if (!unFinishedInstanceIds.isEmpty()) {
LOG.info("unfinished instances: {}", unFinishedInstanceIds);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1984,7 +1984,7 @@ protected boolean replayJournalInner(JournalCursor cursor, boolean flowControl)

}
if (replayedJournalId.get() - startReplayId > 0) {
LOG.info("replayed journal from {} - {}", startReplayId, replayedJournalId);
LOG.debug("replayed journal from {} - {}", startReplayId, replayedJournalId);
return true;
}
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1257,7 +1257,7 @@ private long loadTxnBeginImpl(TLoadTxnBeginRequest request, String clientIp) thr
@Override
public TLoadTxnCommitResult loadTxnCommit(TLoadTxnCommitRequest request) throws TException {
String clientAddr = getClientAddrAsString();
LOG.info("receive txn commit request. db: {}, tbl: {}, txn_id: {}, backend: {}",
LOG.debug("receive txn commit request. db: {}, tbl: {}, txn_id: {}, backend: {}",
request.getDb(), request.getTbl(), request.getTxnId(), clientAddr);
LOG.debug("txn commit request: {}", request);

Expand Down Expand Up @@ -1390,7 +1390,7 @@ void loadTxnCommitImpl(TLoadTxnCommitRequest request, TStatus status) throws Use
@Override
public TGetLoadTxnStatusResult getLoadTxnStatus(TGetLoadTxnStatusRequest request) throws TException {
String clientAddr = getClientAddrAsString();
LOG.info("receive get txn status request. db: {}, tbl: {}, txn_id: {}, backend: {}",
LOG.debug("receive get txn status request. db: {}, tbl: {}, txn_id: {}, backend: {}",
request.getDb(), request.getTbl(), request.getTxnId(), clientAddr);
LOG.debug("get txn status request: {}", request);

Expand Down Expand Up @@ -1427,7 +1427,7 @@ public TGetLoadTxnStatusResult getLoadTxnStatus(TGetLoadTxnStatusRequest request
@Override
public TLoadTxnCommitResult loadTxnPrepare(TLoadTxnCommitRequest request) throws TException {
String clientAddr = getClientAddrAsString();
LOG.info("receive txn prepare request. db: {}, tbl: {}, txn_id: {}, backend: {}",
LOG.debug("receive txn prepare request. db: {}, tbl: {}, txn_id: {}, backend: {}",
request.getDb(), request.getTbl(), request.getTxnId(), clientAddr);
LOG.debug("txn prepare request: {}", request);

Expand Down Expand Up @@ -1485,7 +1485,7 @@ private void loadTxnPrepareImpl(TLoadTxnCommitRequest request) throws UserExcept
@Override
public TLoadTxnRollbackResult loadTxnRollback(TLoadTxnRollbackRequest request) throws TException {
String clientAddr = getClientAddrAsString();
LOG.info("receive txn rollback request. db: {}, tbl: {}, txn_id: {}, reason: {}, backend: {}",
LOG.debug("receive txn rollback request. db: {}, tbl: {}, txn_id: {}, reason: {}, backend: {}",
request.getDb(), request.getTbl(), request.getTxnId(), request.getReason(), clientAddr);
LOG.debug("txn rollback request: {}", request);

Expand Down Expand Up @@ -1575,7 +1575,7 @@ private void loadTxnRollbackImpl(TLoadTxnRollbackRequest request) throws UserExc
@Override
public TStreamLoadPutResult streamLoadPut(TStreamLoadPutRequest request) {
String clientAddr = getClientAddrAsString();
LOG.info("receive stream load put request. db:{}, tbl: {}, txn_id: {}, load id: {}, backend: {}",
LOG.debug("receive stream load put request. db:{}, tbl: {}, txn_id: {}, load id: {}, backend: {}",
request.getDb(), request.getTbl(), request.getTxnId(), DebugUtil.printId(request.getLoadId()),
clientAddr);
LOG.debug("stream load put request: {}", request);
Expand Down Expand Up @@ -1873,7 +1873,7 @@ public TAllocateAutoIncrementIdResult allocAutoIncrementId(TAllocateAutoIncremen

@Override
public TImmutablePartitionResult updateImmutablePartition(TImmutablePartitionRequest request) throws TException {
LOG.info("Receive update immutable partition: {}", request);
LOG.debug("Receive update immutable partition: {}", request);

TImmutablePartitionResult result;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ public void prepareTransaction(long transactionId, List<TabletCommitInfo> tablet
persistTxnStateInTxnLevelLock(transactionState);
}

LOG.info("transaction:[{}] successfully prepare", transactionState);
LOG.debug("transaction:[{}] successfully prepare", transactionState);
} finally {
transactionState.writeUnlock();
}
Expand Down Expand Up @@ -1766,25 +1766,35 @@ public void abortTimeoutTxns(long currentMillis) {
}

public void replayUpsertTransactionState(TransactionState transactionState) {
boolean isCheckpoint = GlobalStateMgr.isCheckpointThread();
writeLock();
try {
if (transactionState.getTransactionStatus() == TransactionStatus.UNKNOWN) {
LOG.info("remove unknown transaction: {}", transactionState);
LOG.debug("remove unknown transaction: {}", transactionState);
return;
}
// set transaction status will call txn state change listener
transactionState.replaySetTransactionStatus();
Database db = globalStateMgr.getLocalMetastore().getDb(transactionState.getDbId());
if (transactionState.getTransactionStatus() == TransactionStatus.COMMITTED) {
LOG.info("replay a committed transaction {}", transactionState);
if (!isCheckpoint) {
LOG.info("replay a committed transaction {}", transactionState.getBrief());
}
LOG.debug("replay a committed transaction {}", transactionState);
updateCatalogAfterCommitted(transactionState, db);
} else if (transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) {
LOG.info("replay a visible transaction {}", transactionState);
if (!isCheckpoint) {
LOG.info("replay a visible transaction {}", transactionState.getBrief());
}
LOG.debug("replay a visible transaction {}", transactionState);
updateCatalogAfterVisible(transactionState, db);
}
unprotectUpsertTransactionState(transactionState, true);
if (transactionState.isExpired(System.currentTimeMillis())) {
LOG.info("remove expired transaction: {}", transactionState);
if (!isCheckpoint) {
LOG.info("remove expired transaction: {}", transactionState.getBrief());
}
LOG.debug("remove expired transaction: {}", transactionState);
deleteTransaction(transactionState);
}
} finally {
Expand All @@ -1795,7 +1805,7 @@ public void replayUpsertTransactionState(TransactionState transactionState) {
public void replayUpsertTransactionStateBatch(TransactionStateBatch transactionStateBatch) {
writeLock();
try {
LOG.info("replay a transaction state batch{}", transactionStateBatch);
LOG.debug("replay a transaction state batch{}", transactionStateBatch);
Database db = globalStateMgr.getLocalMetastore().getDb(transactionStateBatch.getDbId());
updateCatalogAfterVisibleBatch(transactionStateBatch, db);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ public void applyVisibleLog(TransactionState txnState, TableCommitInfo commitInf
long version = partitionCommitInfo.getVersion();
List<MaterializedIndex> allIndices =
partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL);
List<String> skipUpdateReplicas = Lists.newArrayList();
for (MaterializedIndex index : allIndices) {
for (Tablet tablet : index.getTablets()) {
boolean hasFailedVersion = false;
Expand Down Expand Up @@ -139,7 +140,8 @@ public void applyVisibleLog(TransactionState txnState, TableCommitInfo commitInf
// not update their last failed version.
// if B is published successfully in next turn, then B is normal and C will be set
// abnormal so that quorum is maintained and loading will go on.
LOG.warn("skip update replica[{}.{}] to visible version", tablet.getId(), replica.getBackendId());
String combinedId = String.format("%d_%d", tablet.getId(), replica.getBackendId());
skipUpdateReplicas.add(combinedId);
newVersion = replica.getVersion();
if (version > lastFailedVersion) {
lastFailedVersion = version;
Expand All @@ -165,6 +167,9 @@ public void applyVisibleLog(TransactionState txnState, TableCommitInfo commitInf
}
replica.updateVersionInfo(newVersion, lastFailedVersion, lastSucessVersion);
} // end for replicas
if (!skipUpdateReplicas.isEmpty()) {
LOG.warn("skip update replicas to visible version(tabletId_BackendId): {}", skipUpdateReplicas);
}

if (hasFailedVersion && replicationNum == 1) {
TabletScheduler.resetDecommStatForSingleReplicaTabletUnlocked(tablet.getId(), replicas);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ private void publishVersionForOlapTable(List<TransactionState> readyTransactionS

// every backend-transaction identified a single task
AgentBatchTask batchTask = new AgentBatchTask();
List<Long> transactionIds = new ArrayList<>();
// traverse all ready transactions and dispatch the version publish task to all backends
for (TransactionState transactionState : readyTransactionStates) {
List<PublishVersionTask> tasks = transactionState.createPublishVersionTask();
Expand All @@ -280,9 +281,10 @@ private void publishVersionForOlapTable(List<TransactionState> readyTransactionS
}
if (!tasks.isEmpty()) {
transactionState.setHasSendTask(true);
LOG.info("send publish tasks for txn_id: {}", transactionState.getTransactionId());
transactionIds.add(transactionState.getTransactionId());
}
}
LOG.debug("send publish tasks for transactions: {}", transactionIds);
if (!batchTask.getAllTasks().isEmpty()) {
AgentTaskExecutor.submit(batchTask);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -809,6 +809,25 @@ public String toString() {
return sb.toString();
}

/**
 * Builds a short, single-line summary of this transaction state.
 *
 * <p>The summary always contains the transaction id, the db id, the comma-joined
 * {@code tableIdList}, the size of {@code errorReplicas} and at most the first five
 * entries of {@code errorReplicas}. Up to three elapsed-time figures (each suffixed with
 * "ms") follow, and each one is emitted only when its timestamps are strictly ordered.
 *
 * @return the summary text
 */
public String getBrief() {
    StringBuilder brief = new StringBuilder("TransactionState. ")
            .append("txn_id: ").append(transactionId)
            .append(", db id: ").append(dbId)
            .append(", table id list: ").append(StringUtils.join(tableIdList, ","))
            .append(", error replicas num: ").append(errorReplicas.size())
            // Cap the listed entries at five so the summary stays short.
            .append(", replica ids: ")
            .append(Joiner.on(",").join(errorReplicas.stream().limit(5).toArray()));

    // prepare -> commit
    if (prepareTime < commitTime) {
        brief.append(", write cost: ").append(commitTime - prepareTime).append("ms");
    }
    // commit -> finish; skipped while commitTime is not positive
    if (commitTime > 0 && commitTime < finishTime) {
        brief.append(", publish total cost: ").append(finishTime - commitTime).append("ms");
    }
    // prepare -> finish
    if (prepareTime < finishTime) {
        brief.append(", total cost: ").append(finishTime - prepareTime).append("ms");
    }
    return brief.toString();
}

/**
 * Returns the value of the {@code sourceType} field.
 *
 * @return the current {@code sourceType}
 */
public LoadJobSourceType getSourceType() {
    return this.sourceType;
}
Expand Down

0 comments on commit 9fe4708

Please sign in to comment.