diff --git a/src/main/java/io/confluent/connect/hdfs/HdfsSinkConnectorConfig.java b/src/main/java/io/confluent/connect/hdfs/HdfsSinkConnectorConfig.java
index 5e3fe5ea9..d9d9b386a 100644
--- a/src/main/java/io/confluent/connect/hdfs/HdfsSinkConnectorConfig.java
+++ b/src/main/java/io/confluent/connect/hdfs/HdfsSinkConnectorConfig.java
@@ -187,6 +187,11 @@ public class HdfsSinkConnectorConfig extends StorageSinkConnectorConfig {
       = new ParentValueRecommender(FORMAT_CLASS_CONFIG, AvroFormat.class, AVRO_SUPPORTED_CODECS);
   private static final ParquetCodecRecommender PARQUET_COMPRESSION_RECOMMENDER
       = new ParquetCodecRecommender();
+  public static final String WRITE_FAILURE_TOLERANCE_CONFIG = "write.failure.tolerance";
+  private static final Integer WRITE_FAILURE_TOLERANCE_DEFAULT = 10;
+  private static final String WRITE_FAILURE_TOLERANCE_DOC = "The maximum number of consecutive "
+      + "write exceptions the task will tolerate before failing.";
+  private static final String WRITE_FAILURE_TOLERANCE_DISPLAY = "Failure Tolerance";
 
   static {
     STORAGE_CLASS_RECOMMENDER.addValidValues(
@@ -307,6 +312,17 @@ public static ConfigDef newConfigDef() {
           ROTATE_MAX_FILE_SIZE_BYTES_DISPLAY
       );
 
+      configDef.define(
+          WRITE_FAILURE_TOLERANCE_CONFIG,
+          Type.INT,
+          WRITE_FAILURE_TOLERANCE_DEFAULT,
+          Importance.MEDIUM,
+          WRITE_FAILURE_TOLERANCE_DOC,
+          group,
+          ++orderInGroup,
+          Width.MEDIUM,
+          WRITE_FAILURE_TOLERANCE_DISPLAY
+      );
     }
 
     {
diff --git a/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java b/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java
index 249466d6b..bfd34124e 100644
--- a/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java
+++ b/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java
@@ -108,6 +108,8 @@ public class TopicPartitionWriter {
   private final Map endOffsets;
   private final long timeoutMs;
   private long failureTime;
+  private int failureCount;
+  private int failureTolerance;
   private final StorageSchemaCompatibility compatibility;
   private Schema currentSchema;
   private final String extension;
@@ -205,6 +207,7 @@ public TopicPartitionWriter(
     maxFileSizeRotationBytes = config.getLong(HdfsSinkConnectorConfig
         .ROTATE_MAX_FILE_SIZE_BYTES_CONFIG);
     timeoutMs = config.getLong(HdfsSinkConnectorConfig.RETRY_BACKOFF_CONFIG);
+    failureTolerance = config.getInt(HdfsSinkConnectorConfig.WRITE_FAILURE_TOLERANCE_CONFIG);
     compatibility = StorageSchemaCompatibility.getCompatibility(
         config.getString(StorageSinkConnectorConfig.SCHEMA_COMPATIBILITY_CONFIG));
 
@@ -222,6 +225,7 @@ public TopicPartitionWriter(
     endOffsets = new HashMap<>();
     state = State.RECOVERY_STARTED;
     failureTime = -1L;
+    failureCount = 0;
     // The next offset to consume after the last commit (one more than last offset written to HDFS)
     offset = -1L;
     if (writerProvider != null) {
@@ -339,6 +343,11 @@ public void write() {
     if (failureTime > 0 && now - failureTime < timeoutMs) {
       return;
     }
+    if (failureCount > failureTolerance) {
+      log.error("The writer has failed {} times consecutively.", failureCount);
+      // Fail the task since it has exceeded the configured failure tolerance
+      throw new ConnectException("The task has exceeded the failure tolerance.");
+    }
     if (state.compareTo(State.WRITE_STARTED) < 0) {
       boolean success = recover();
       if (!success) {
@@ -424,8 +433,9 @@ public void write() {
           } catch (ConnectException e) {
             log.error("Exception on topic partition {}: ", tp, e);
             failureTime = time.milliseconds();
+            failureCount += 1;
             setRetryTimeout(timeoutMs);
-            break;
+            return;
           }
         }
         if (buffer.isEmpty()) {
@@ -464,6 +474,7 @@ public void write() {
       } catch (ConnectException e) {
         log.error("Exception on topic partition {}: ", tp, e);
         failureTime = time.milliseconds();
+        failureCount += 1;
         setRetryTimeout(timeoutMs);
         return;
       }
@@ -471,6 +482,7 @@ public void write() {
       resume();
       state = State.WRITE_STARTED;
     }
+    this.failureCount = 0;
   }
 
   public void close() throws ConnectException {
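For context, the sketch below is a minimal, self-contained illustration of the consecutive-failure tolerance pattern this patch adds to TopicPartitionWriter.write(); it is not the connector code itself, and the class name and attemptWrite() are hypothetical stand-ins for the real HDFS write path.

public class FailureToleranceSketch {

  // Mirrors the new write.failure.tolerance setting (default 10 in this patch).
  private final int failureTolerance;
  private int failureCount = 0;

  public FailureToleranceSketch(int failureTolerance) {
    this.failureTolerance = failureTolerance;
  }

  // One periodic write pass, invoked repeatedly by the task.
  public void write() {
    if (failureCount > failureTolerance) {
      // Too many consecutive failures: surface the error so the framework fails the task.
      throw new RuntimeException("The task has exceeded the failure tolerance.");
    }
    try {
      attemptWrite();
    } catch (RuntimeException e) {
      // Count the consecutive failure and give up on this pass; the next pass retries.
      failureCount += 1;
      return;
    }
    // A pass that completes without an exception resets the consecutive-failure counter.
    failureCount = 0;
  }

  // Hypothetical stand-in for writing buffered records to HDFS.
  private void attemptWrite() {
    // ...
  }
}

As in the patch, the check uses a strict comparison and runs before any write attempt, so a tolerance of N allows N + 1 consecutive failed passes (each separated by the retry backoff) before the ConnectException is thrown, and the counter resets only after a pass finishes without an exception. Users opt into this behavior through the new write.failure.tolerance property, an INT that defaults to 10.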