diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeProtocolIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeProtocolIT.java index d0bbb0d967fc..046c5f68fece 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeProtocolIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeProtocolIT.java @@ -435,7 +435,6 @@ private void doTestUseNodeUrls(String connectorName) throws Exception { extractorAttributes.put("database-name", "test.*"); extractorAttributes.put("table-name", "test.*"); extractorAttributes.put("inclusion", "data.insert"); - extractorAttributes.put("mode.streaming", "true"); extractorAttributes.put("mode.snapshot", "false"); extractorAttributes.put("mode.strict", "true"); diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameterValidator.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameterValidator.java index 1fa0046ccdab..99d547a23c6d 100644 --- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameterValidator.java +++ b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameterValidator.java @@ -23,6 +23,10 @@ import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException; import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; public class PipeParameterValidator { @@ -36,6 +40,40 @@ public PipeParameters getParameters() { return parameters; } + /** + * Validates whether the attributes entered by the user contain at least one attribute from + * lhsAttributes or rhsAttributes (if required), but not both. + * + * @param lhsAttributes list of left-hand side synonym attributes + * @param rhsAttributes list of right-hand side synonym attributes + * @param isRequired specifies whether at least one attribute from lhsAttributes or rhsAttributes + * must be provided + * @throws PipeParameterNotValidException if both lhsAttributes and rhsAttributes are provided + * @throws PipeAttributeNotProvidedException if isRequired is true and neither lhsAttributes nor + * rhsAttributes are provided + * @return the instance of PipeParameterValidator for method chaining + */ + public PipeParameterValidator validateSynonymAttributes( + final List lhsAttributes, + final List rhsAttributes, + final boolean isRequired) { + final boolean lhsExistence = lhsAttributes.stream().anyMatch(parameters::hasAttribute); + final boolean rhsExistence = rhsAttributes.stream().anyMatch(parameters::hasAttribute); + if (lhsExistence && rhsExistence) { + throw new PipeParameterNotValidException( + String.format( + "Cannot specify both %s and %s at the same time", lhsAttributes, rhsAttributes)); + } + if (isRequired && !lhsExistence && !rhsExistence) { + throw new PipeAttributeNotProvidedException( + Stream.concat(lhsAttributes.stream(), rhsAttributes.stream()) + .collect( + Collectors.collectingAndThen(Collectors.toList(), Collections::unmodifiableList)) + .toString()); + } + return this; + } + /** * Validates whether the attributes entered by the user contain an attribute whose key is * attributeKey. @@ -83,7 +121,7 @@ public PipeParameterValidator validateAttributeValueRange( * @throws PipeParameterNotValidException if the given argument is not valid */ public PipeParameterValidator validate( - final PipeParameterValidator.SingleObjectValidationRule validationRule, + final SingleObjectValidationRule validationRule, final String messageToThrow, final Object argument) throws PipeParameterNotValidException { @@ -107,7 +145,7 @@ public interface SingleObjectValidationRule { * @throws PipeParameterNotValidException if the given arguments are not valid */ public PipeParameterValidator validate( - final PipeParameterValidator.MultipleObjectsValidationRule validationRule, + final MultipleObjectsValidationRule validationRule, final String messageToThrow, final Object... arguments) throws PipeParameterNotValidException { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java index d4ce3e7663dc..97f8021a32a3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java @@ -60,6 +60,8 @@ import java.util.Objects; import java.util.concurrent.atomic.AtomicReference; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_DATABASE_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_DATABASE_NAME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_END_TIME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_KEY; @@ -77,9 +79,11 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_STRICT_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_ENABLE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATH_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_FORMAT_IOTDB_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_FORMAT_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_FORMAT_PREFIX_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_LOOSE_RANGE_KEY; @@ -91,8 +95,12 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_LOG_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_STREAM_MODE_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_START_TIME_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_TABLE_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_TABLE_NAME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_WATERMARK_INTERVAL_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_WATERMARK_INTERVAL_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_DATABASE_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_DATABASE_NAME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_END_TIME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_ENABLE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_END_TIME_KEY; @@ -104,11 +112,15 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_MODE_STRICT_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_MODS_ENABLE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_MODS_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_PATH_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_PATTERN_FORMAT_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_PATTERN_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_REALTIME_ENABLE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_REALTIME_LOOSE_RANGE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_REALTIME_MODE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_START_TIME_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_TABLE_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_TABLE_NAME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_WATERMARK_INTERVAL_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant._EXTRACTOR_WATERMARK_INTERVAL_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant._SOURCE_WATERMARK_INTERVAL_KEY; @@ -174,10 +186,7 @@ public void validate(final PipeParameterValidator validator) throws Exception { && validator .getParameters() .hasAnyAttributes( - PipeExtractorConstant.EXTRACTOR_PATH_KEY, - PipeExtractorConstant.SOURCE_PATH_KEY, - PipeExtractorConstant.EXTRACTOR_PATTERN_KEY, - PipeExtractorConstant.SOURCE_PATTERN_KEY)) { + EXTRACTOR_PATH_KEY, SOURCE_PATH_KEY, EXTRACTOR_PATTERN_KEY, SOURCE_PATTERN_KEY)) { throw new PipeException( "The pipe cannot extract tree model data when sql dialect is set to table."); } @@ -185,14 +194,14 @@ public void validate(final PipeParameterValidator validator) throws Exception { && validator .getParameters() .hasAnyAttributes( - PipeExtractorConstant.EXTRACTOR_DATABASE_NAME_KEY, - PipeExtractorConstant.SOURCE_DATABASE_NAME_KEY, - PipeExtractorConstant.EXTRACTOR_TABLE_NAME_KEY, - PipeExtractorConstant.SOURCE_TABLE_NAME_KEY, - PipeExtractorConstant.EXTRACTOR_DATABASE_KEY, - PipeExtractorConstant.SOURCE_DATABASE_KEY, - PipeExtractorConstant.EXTRACTOR_TABLE_KEY, - PipeExtractorConstant.SOURCE_TABLE_KEY)) { + EXTRACTOR_DATABASE_NAME_KEY, + SOURCE_DATABASE_NAME_KEY, + EXTRACTOR_TABLE_NAME_KEY, + SOURCE_TABLE_NAME_KEY, + EXTRACTOR_DATABASE_KEY, + SOURCE_DATABASE_KEY, + EXTRACTOR_TABLE_KEY, + SOURCE_TABLE_KEY)) { throw new PipeException( "The pipe cannot extract table model data when sql dialect is set to tree."); } @@ -287,7 +296,7 @@ public void validate(final PipeParameterValidator validator) throws Exception { EXTRACTOR_REALTIME_MODE_BATCH_MODE_VALUE); } - checkInvalidParameters(validator.getParameters()); + checkInvalidParameters(validator); constructHistoricalExtractor(); constructRealtimeExtractor(validator.getParameters()); @@ -319,7 +328,9 @@ private void validatePattern(final TreePattern treePattern, final TablePattern t } } - private void checkInvalidParameters(final PipeParameters parameters) { + private void checkInvalidParameters(final PipeParameterValidator validator) { + final PipeParameters parameters = validator.getParameters(); + // Enable history and realtime if specifying start-time or end-time if (parameters.hasAnyAttributes( SOURCE_START_TIME_KEY, @@ -343,79 +354,58 @@ private void checkInvalidParameters(final PipeParameters parameters) { EXTRACTOR_HISTORY_END_TIME_KEY); } + // Check coexistence of database-name and database + validator.validateSynonymAttributes( + Arrays.asList(EXTRACTOR_DATABASE_NAME_KEY, SOURCE_DATABASE_NAME_KEY), + Arrays.asList(EXTRACTOR_DATABASE_KEY, SOURCE_DATABASE_KEY), + false); + + // Check coexistence of table-name and table + validator.validateSynonymAttributes( + Arrays.asList(EXTRACTOR_TABLE_NAME_KEY, SOURCE_TABLE_NAME_KEY), + Arrays.asList(EXTRACTOR_TABLE_KEY, SOURCE_TABLE_KEY), + false); + // Check coexistence of mode.snapshot and mode - if (parameters.hasAnyAttributes(EXTRACTOR_MODE_SNAPSHOT_KEY, SOURCE_MODE_SNAPSHOT_KEY) - && parameters.hasAnyAttributes(EXTRACTOR_MODE_KEY, SOURCE_MODE_KEY)) { - LOGGER.warn( - "When {} or {} is specified, specifying {} and {} is invalid.", - EXTRACTOR_MODE_SNAPSHOT_KEY, - SOURCE_MODE_SNAPSHOT_KEY, - EXTRACTOR_MODE_KEY, - SOURCE_MODE_KEY); - } + validator.validateSynonymAttributes( + Arrays.asList(EXTRACTOR_MODE_SNAPSHOT_KEY, SOURCE_MODE_SNAPSHOT_KEY), + Arrays.asList(EXTRACTOR_MODE_KEY, SOURCE_MODE_KEY), + false); // Check coexistence of mode.streaming and realtime.mode - if (parameters.hasAnyAttributes(EXTRACTOR_MODE_STREAMING_KEY, SOURCE_MODE_STREAMING_KEY) - && parameters.hasAnyAttributes(EXTRACTOR_REALTIME_MODE_KEY, SOURCE_REALTIME_MODE_KEY)) { - LOGGER.warn( - "When {} or {} is specified, specifying {} and {} is invalid.", - EXTRACTOR_MODE_STREAMING_KEY, - SOURCE_MODE_STREAMING_KEY, - EXTRACTOR_REALTIME_MODE_KEY, - SOURCE_REALTIME_MODE_KEY); - } + validator.validateSynonymAttributes( + Arrays.asList(EXTRACTOR_MODE_STREAMING_KEY, SOURCE_MODE_STREAMING_KEY), + Arrays.asList(EXTRACTOR_REALTIME_MODE_KEY, SOURCE_REALTIME_MODE_KEY), + false); // Check coexistence of mode.strict, history.loose-range and realtime.loose-range - if (parameters.hasAnyAttributes(EXTRACTOR_MODE_STRICT_KEY, SOURCE_MODE_STRICT_KEY)) { - if (parameters.hasAnyAttributes( - EXTRACTOR_HISTORY_LOOSE_RANGE_KEY, SOURCE_HISTORY_LOOSE_RANGE_KEY)) { - LOGGER.warn( - "When {} or {} is specified, specifying {} and {} is invalid.", - EXTRACTOR_MODE_STRICT_KEY, - SOURCE_MODE_STRICT_KEY, + validator.validateSynonymAttributes( + Arrays.asList(EXTRACTOR_MODE_STRICT_KEY, SOURCE_MODE_STRICT_KEY), + Arrays.asList( EXTRACTOR_HISTORY_LOOSE_RANGE_KEY, - SOURCE_HISTORY_LOOSE_RANGE_KEY); - } - if (parameters.hasAnyAttributes( - EXTRACTOR_REALTIME_LOOSE_RANGE_KEY, SOURCE_REALTIME_LOOSE_RANGE_KEY)) { - LOGGER.warn( - "When {} or {} is specified, specifying {} and {} is invalid.", - EXTRACTOR_MODE_STRICT_KEY, - SOURCE_MODE_STRICT_KEY, + SOURCE_HISTORY_LOOSE_RANGE_KEY, EXTRACTOR_REALTIME_LOOSE_RANGE_KEY, - SOURCE_REALTIME_LOOSE_RANGE_KEY); - } - } + SOURCE_REALTIME_LOOSE_RANGE_KEY), + false); // Check coexistence of mods and mods.enable - if (parameters.hasAnyAttributes(EXTRACTOR_MODS_ENABLE_KEY, SOURCE_MODS_ENABLE_KEY) - && parameters.hasAnyAttributes(EXTRACTOR_MODS_KEY, SOURCE_MODS_KEY)) { - LOGGER.warn( - "When {} or {} is specified, specifying {} and {} is invalid.", - EXTRACTOR_MODS_KEY, - SOURCE_MODS_KEY, - EXTRACTOR_MODS_ENABLE_KEY, - SOURCE_MODS_ENABLE_KEY); - } + validator.validateSynonymAttributes( + Arrays.asList(EXTRACTOR_MODS_ENABLE_KEY, SOURCE_MODS_ENABLE_KEY), + Arrays.asList(EXTRACTOR_MODS_KEY, SOURCE_MODS_KEY), + false); // Check coexistence of watermark.interval-ms and watermark-interval-ms - if (parameters.hasAnyAttributes(EXTRACTOR_WATERMARK_INTERVAL_KEY, SOURCE_WATERMARK_INTERVAL_KEY) - && parameters.hasAnyAttributes( - _EXTRACTOR_WATERMARK_INTERVAL_KEY, _SOURCE_WATERMARK_INTERVAL_KEY)) { - LOGGER.warn( - "When {} or {} is specified, specifying {} and {} is invalid.", - EXTRACTOR_WATERMARK_INTERVAL_KEY, - SOURCE_WATERMARK_INTERVAL_KEY, - _EXTRACTOR_WATERMARK_INTERVAL_KEY, - _SOURCE_WATERMARK_INTERVAL_KEY); - } + validator.validateSynonymAttributes( + Arrays.asList(EXTRACTOR_WATERMARK_INTERVAL_KEY, SOURCE_WATERMARK_INTERVAL_KEY), + Arrays.asList(_EXTRACTOR_WATERMARK_INTERVAL_KEY, _SOURCE_WATERMARK_INTERVAL_KEY), + false); // Check if specifying mode.snapshot or mode.streaming when disable realtime extractor if (!parameters.getBooleanOrDefault( Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY, SOURCE_REALTIME_ENABLE_KEY), EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE)) { if (parameters.hasAnyAttributes(EXTRACTOR_MODE_SNAPSHOT_KEY, SOURCE_MODE_SNAPSHOT_KEY)) { - LOGGER.info( + LOGGER.warn( "When '{}' ('{}') is set to false, specifying {} and {} is invalid.", EXTRACTOR_REALTIME_ENABLE_KEY, SOURCE_REALTIME_ENABLE_KEY, @@ -423,7 +413,7 @@ private void checkInvalidParameters(final PipeParameters parameters) { SOURCE_MODE_SNAPSHOT_KEY); } if (parameters.hasAnyAttributes(EXTRACTOR_MODE_STREAMING_KEY, SOURCE_MODE_STREAMING_KEY)) { - LOGGER.info( + LOGGER.warn( "When '{}' ('{}') is set to false, specifying {} and {} is invalid.", EXTRACTOR_REALTIME_ENABLE_KEY, SOURCE_REALTIME_ENABLE_KEY, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/plugin/TwoStageCountProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/plugin/TwoStageCountProcessor.java index d47a270ac0cc..0902cc3a7d9e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/plugin/TwoStageCountProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/plugin/TwoStageCountProcessor.java @@ -22,7 +22,6 @@ import org.apache.iotdb.commons.consensus.index.ProgressIndex; import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex; import org.apache.iotdb.commons.consensus.index.impl.StateProgressIndex; -import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskProcessorRuntimeEnvironment; @@ -50,6 +49,7 @@ import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; import org.apache.iotdb.pipe.api.exception.PipeException; +import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp; @@ -102,32 +102,25 @@ public class TwoStageCountProcessor implements PipeProcessor { @Override public void validate(PipeParameterValidator validator) throws Exception { - checkInvalidParameters(validator.getParameters()); - - final String rawOutputSeries; - if (!validator.getParameters().hasAttribute(PROCESSOR_OUTPUT_SERIES_KEY)) { - validator.validateRequiredAttribute(_PROCESSOR_OUTPUT_SERIES_KEY); - rawOutputSeries = validator.getParameters().getString(_PROCESSOR_OUTPUT_SERIES_KEY); - } else { - rawOutputSeries = validator.getParameters().getString(PROCESSOR_OUTPUT_SERIES_KEY); - } + checkInvalidParameters(validator); + final String rawOutputSeries = + validator + .getParameters() + .getStringByKeys(PROCESSOR_OUTPUT_SERIES_KEY, _PROCESSOR_OUTPUT_SERIES_KEY); try { - PathUtils.isLegalPath(rawOutputSeries); - } catch (IllegalPathException e) { - throw new IllegalArgumentException("Illegal output series path: " + rawOutputSeries); + PathUtils.isLegalPath(Objects.requireNonNull(rawOutputSeries)); + } catch (Exception e) { + throw new PipeParameterNotValidException("Illegal output series path: " + rawOutputSeries); } } - private void checkInvalidParameters(final PipeParameters parameters) { + private void checkInvalidParameters(final PipeParameterValidator validator) { // Check coexistence of output.series and output-series - if (parameters.hasAttribute(PROCESSOR_OUTPUT_SERIES_KEY) - && parameters.hasAttribute(_PROCESSOR_OUTPUT_SERIES_KEY)) { - LOGGER.warn( - "When {} is specified, specifying {} is invalid.", - PROCESSOR_OUTPUT_SERIES_KEY, - _PROCESSOR_OUTPUT_SERIES_KEY); - } + validator.validateSynonymAttributes( + Collections.singletonList(PROCESSOR_OUTPUT_SERIES_KEY), + Collections.singletonList(_PROCESSOR_OUTPUT_SERIES_KEY), + true); } @Override diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java index 3539d25ddfbe..3d173a49a8ae 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java @@ -190,6 +190,12 @@ public void validate(final PipeParameterValidator validator) throws Exception { Arrays.asList(CONNECTOR_IOTDB_BATCH_SIZE_KEY, SINK_IOTDB_BATCH_SIZE_KEY), CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE)); + // Check coexistence of user and username + validator.validateSynonymAttributes( + Arrays.asList(CONNECTOR_IOTDB_USER_KEY, SINK_IOTDB_USER_KEY), + Arrays.asList(CONNECTOR_IOTDB_USERNAME_KEY, SINK_IOTDB_USERNAME_KEY), + false); + username = parameters.getStringOrDefault( Arrays.asList(