Skip to content

Commit

Permalink
PipeConsensus: Avoid pipe task being restarted frequently by Pipe fra…
Browse files Browse the repository at this point in the history
…mework (#12931)
  • Loading branch information
Pengzna authored Jul 15, 2024
1 parent ac110bc commit ed75066
Showing 1 changed file with 5 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.iotdb.commons.client.container.PipeConsensusClientMgrContainer;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeConnectorRetryTimesConfigurableException;
import org.apache.iotdb.commons.pipe.connector.protocol.IoTDBConnector;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.progress.PipeEventCommitManager;
Expand Down Expand Up @@ -56,7 +57,6 @@
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -233,7 +233,8 @@ public void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception

boolean enqueueResult = addEvent2Buffer((EnrichedEvent) tabletInsertionEvent);
if (!enqueueResult) {
throw new PipeException(ENQUEUE_EXCEPTION_MSG);
throw new PipeRuntimeConnectorRetryTimesConfigurableException(
ENQUEUE_EXCEPTION_MSG, Integer.MAX_VALUE);
}
// batch transfer tablets.
if (isTabletBatchModeEnabled) {
Expand Down Expand Up @@ -328,7 +329,8 @@ public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws Exception

boolean enqueueResult = addEvent2Buffer((EnrichedEvent) tsFileInsertionEvent);
if (!enqueueResult) {
throw new PipeException(ENQUEUE_EXCEPTION_MSG);
throw new PipeRuntimeConnectorRetryTimesConfigurableException(
ENQUEUE_EXCEPTION_MSG, Integer.MAX_VALUE);
}
final PipeTsFileInsertionEvent pipeTsFileInsertionEvent =
(PipeTsFileInsertionEvent) tsFileInsertionEvent;
Expand Down

0 comments on commit ed75066

Please sign in to comment.