Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pipe: strict check for synonym pipe parameters to avoid ambiguity #14694

Merged
merged 6 commits into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,6 @@ private void doTestUseNodeUrls(String connectorName) throws Exception {
extractorAttributes.put("database-name", "test.*");
extractorAttributes.put("table-name", "test.*");
extractorAttributes.put("inclusion", "data.insert");
extractorAttributes.put("mode.streaming", "true");
extractorAttributes.put("mode.snapshot", "false");
extractorAttributes.put("mode.strict", "true");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class PipeParameterValidator {

Expand All @@ -36,6 +40,40 @@ public PipeParameters getParameters() {
return parameters;
}

/**
* Validates whether the attributes entered by the user contain at least one attribute from
* lhsAttributes or rhsAttributes (if required), but not both.
*
* @param lhsAttributes list of left-hand side synonym attributes
* @param rhsAttributes list of right-hand side synonym attributes
* @param isRequired specifies whether at least one attribute from lhsAttributes or rhsAttributes
* must be provided
* @throws PipeParameterNotValidException if both lhsAttributes and rhsAttributes are provided
* @throws PipeAttributeNotProvidedException if isRequired is true and neither lhsAttributes nor
* rhsAttributes are provided
* @return the instance of PipeParameterValidator for method chaining
*/
public PipeParameterValidator validateSynonymAttributes(
final List<String> lhsAttributes,
final List<String> rhsAttributes,
final boolean isRequired) {
final boolean lhsExistence = lhsAttributes.stream().anyMatch(parameters::hasAttribute);
final boolean rhsExistence = rhsAttributes.stream().anyMatch(parameters::hasAttribute);
if (lhsExistence && rhsExistence) {
throw new PipeParameterNotValidException(
String.format(
"Cannot specify both %s and %s at the same time", lhsAttributes, rhsAttributes));
}
if (isRequired && !lhsExistence && !rhsExistence) {
throw new PipeAttributeNotProvidedException(
Stream.concat(lhsAttributes.stream(), rhsAttributes.stream())
.collect(
Collectors.collectingAndThen(Collectors.toList(), Collections::unmodifiableList))
.toString());
}
return this;
}

/**
* Validates whether the attributes entered by the user contain an attribute whose key is
* attributeKey.
Expand Down Expand Up @@ -83,7 +121,7 @@ public PipeParameterValidator validateAttributeValueRange(
* @throws PipeParameterNotValidException if the given argument is not valid
*/
public PipeParameterValidator validate(
final PipeParameterValidator.SingleObjectValidationRule validationRule,
final SingleObjectValidationRule validationRule,
final String messageToThrow,
final Object argument)
throws PipeParameterNotValidException {
Expand All @@ -107,7 +145,7 @@ public interface SingleObjectValidationRule {
* @throws PipeParameterNotValidException if the given arguments are not valid
*/
public PipeParameterValidator validate(
final PipeParameterValidator.MultipleObjectsValidationRule validationRule,
final MultipleObjectsValidationRule validationRule,
final String messageToThrow,
final Object... arguments)
throws PipeParameterNotValidException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;

import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_DATABASE_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_DATABASE_NAME_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_END_TIME_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_KEY;
Expand All @@ -77,9 +79,11 @@
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_STRICT_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_ENABLE_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATH_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_FORMAT_IOTDB_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_FORMAT_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_FORMAT_PREFIX_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_LOOSE_RANGE_KEY;
Expand All @@ -91,8 +95,12 @@
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_LOG_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_STREAM_MODE_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_START_TIME_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_TABLE_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_TABLE_NAME_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_WATERMARK_INTERVAL_DEFAULT_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_WATERMARK_INTERVAL_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_DATABASE_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_DATABASE_NAME_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_END_TIME_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_ENABLE_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_END_TIME_KEY;
Expand All @@ -104,11 +112,15 @@
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_MODE_STRICT_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_MODS_ENABLE_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_MODS_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_PATH_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_PATTERN_FORMAT_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_PATTERN_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_REALTIME_ENABLE_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_REALTIME_LOOSE_RANGE_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_REALTIME_MODE_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_START_TIME_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_TABLE_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_TABLE_NAME_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_WATERMARK_INTERVAL_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant._EXTRACTOR_WATERMARK_INTERVAL_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant._SOURCE_WATERMARK_INTERVAL_KEY;
Expand Down Expand Up @@ -174,25 +186,22 @@ public void validate(final PipeParameterValidator validator) throws Exception {
&& validator
.getParameters()
.hasAnyAttributes(
PipeExtractorConstant.EXTRACTOR_PATH_KEY,
PipeExtractorConstant.SOURCE_PATH_KEY,
PipeExtractorConstant.EXTRACTOR_PATTERN_KEY,
PipeExtractorConstant.SOURCE_PATTERN_KEY)) {
EXTRACTOR_PATH_KEY, SOURCE_PATH_KEY, EXTRACTOR_PATTERN_KEY, SOURCE_PATTERN_KEY)) {
throw new PipeException(
"The pipe cannot extract tree model data when sql dialect is set to table.");
}
if (!isTableModelDataAllowedToBeCaptured
&& validator
.getParameters()
.hasAnyAttributes(
PipeExtractorConstant.EXTRACTOR_DATABASE_NAME_KEY,
PipeExtractorConstant.SOURCE_DATABASE_NAME_KEY,
PipeExtractorConstant.EXTRACTOR_TABLE_NAME_KEY,
PipeExtractorConstant.SOURCE_TABLE_NAME_KEY,
PipeExtractorConstant.EXTRACTOR_DATABASE_KEY,
PipeExtractorConstant.SOURCE_DATABASE_KEY,
PipeExtractorConstant.EXTRACTOR_TABLE_KEY,
PipeExtractorConstant.SOURCE_TABLE_KEY)) {
EXTRACTOR_DATABASE_NAME_KEY,
SOURCE_DATABASE_NAME_KEY,
EXTRACTOR_TABLE_NAME_KEY,
SOURCE_TABLE_NAME_KEY,
EXTRACTOR_DATABASE_KEY,
SOURCE_DATABASE_KEY,
EXTRACTOR_TABLE_KEY,
SOURCE_TABLE_KEY)) {
throw new PipeException(
"The pipe cannot extract table model data when sql dialect is set to tree.");
}
Expand Down Expand Up @@ -287,7 +296,7 @@ public void validate(final PipeParameterValidator validator) throws Exception {
EXTRACTOR_REALTIME_MODE_BATCH_MODE_VALUE);
}

checkInvalidParameters(validator.getParameters());
checkInvalidParameters(validator);

constructHistoricalExtractor();
constructRealtimeExtractor(validator.getParameters());
Expand Down Expand Up @@ -319,7 +328,9 @@ private void validatePattern(final TreePattern treePattern, final TablePattern t
}
}

private void checkInvalidParameters(final PipeParameters parameters) {
private void checkInvalidParameters(final PipeParameterValidator validator) {
final PipeParameters parameters = validator.getParameters();

// Enable history and realtime if specifying start-time or end-time
if (parameters.hasAnyAttributes(
SOURCE_START_TIME_KEY,
Expand All @@ -343,87 +354,66 @@ private void checkInvalidParameters(final PipeParameters parameters) {
EXTRACTOR_HISTORY_END_TIME_KEY);
}

// Check coexistence of database-name and database
validator.validateSynonymAttributes(
Arrays.asList(EXTRACTOR_DATABASE_NAME_KEY, SOURCE_DATABASE_NAME_KEY),
Arrays.asList(EXTRACTOR_DATABASE_KEY, SOURCE_DATABASE_KEY),
false);

// Check coexistence of table-name and table
validator.validateSynonymAttributes(
Arrays.asList(EXTRACTOR_TABLE_NAME_KEY, SOURCE_TABLE_NAME_KEY),
Arrays.asList(EXTRACTOR_TABLE_KEY, SOURCE_TABLE_KEY),
false);

// Check coexistence of mode.snapshot and mode
if (parameters.hasAnyAttributes(EXTRACTOR_MODE_SNAPSHOT_KEY, SOURCE_MODE_SNAPSHOT_KEY)
&& parameters.hasAnyAttributes(EXTRACTOR_MODE_KEY, SOURCE_MODE_KEY)) {
LOGGER.warn(
"When {} or {} is specified, specifying {} and {} is invalid.",
EXTRACTOR_MODE_SNAPSHOT_KEY,
SOURCE_MODE_SNAPSHOT_KEY,
EXTRACTOR_MODE_KEY,
SOURCE_MODE_KEY);
}
validator.validateSynonymAttributes(
Arrays.asList(EXTRACTOR_MODE_SNAPSHOT_KEY, SOURCE_MODE_SNAPSHOT_KEY),
Arrays.asList(EXTRACTOR_MODE_KEY, SOURCE_MODE_KEY),
false);

// Check coexistence of mode.streaming and realtime.mode
if (parameters.hasAnyAttributes(EXTRACTOR_MODE_STREAMING_KEY, SOURCE_MODE_STREAMING_KEY)
&& parameters.hasAnyAttributes(EXTRACTOR_REALTIME_MODE_KEY, SOURCE_REALTIME_MODE_KEY)) {
LOGGER.warn(
"When {} or {} is specified, specifying {} and {} is invalid.",
EXTRACTOR_MODE_STREAMING_KEY,
SOURCE_MODE_STREAMING_KEY,
EXTRACTOR_REALTIME_MODE_KEY,
SOURCE_REALTIME_MODE_KEY);
}
validator.validateSynonymAttributes(
Arrays.asList(EXTRACTOR_MODE_STREAMING_KEY, SOURCE_MODE_STREAMING_KEY),
Arrays.asList(EXTRACTOR_REALTIME_MODE_KEY, SOURCE_REALTIME_MODE_KEY),
false);

// Check coexistence of mode.strict, history.loose-range and realtime.loose-range
if (parameters.hasAnyAttributes(EXTRACTOR_MODE_STRICT_KEY, SOURCE_MODE_STRICT_KEY)) {
if (parameters.hasAnyAttributes(
EXTRACTOR_HISTORY_LOOSE_RANGE_KEY, SOURCE_HISTORY_LOOSE_RANGE_KEY)) {
LOGGER.warn(
"When {} or {} is specified, specifying {} and {} is invalid.",
EXTRACTOR_MODE_STRICT_KEY,
SOURCE_MODE_STRICT_KEY,
validator.validateSynonymAttributes(
Arrays.asList(EXTRACTOR_MODE_STRICT_KEY, SOURCE_MODE_STRICT_KEY),
Arrays.asList(
EXTRACTOR_HISTORY_LOOSE_RANGE_KEY,
SOURCE_HISTORY_LOOSE_RANGE_KEY);
}
if (parameters.hasAnyAttributes(
EXTRACTOR_REALTIME_LOOSE_RANGE_KEY, SOURCE_REALTIME_LOOSE_RANGE_KEY)) {
LOGGER.warn(
"When {} or {} is specified, specifying {} and {} is invalid.",
EXTRACTOR_MODE_STRICT_KEY,
SOURCE_MODE_STRICT_KEY,
SOURCE_HISTORY_LOOSE_RANGE_KEY,
EXTRACTOR_REALTIME_LOOSE_RANGE_KEY,
SOURCE_REALTIME_LOOSE_RANGE_KEY);
}
}
SOURCE_REALTIME_LOOSE_RANGE_KEY),
false);

// Check coexistence of mods and mods.enable
if (parameters.hasAnyAttributes(EXTRACTOR_MODS_ENABLE_KEY, SOURCE_MODS_ENABLE_KEY)
&& parameters.hasAnyAttributes(EXTRACTOR_MODS_KEY, SOURCE_MODS_KEY)) {
LOGGER.warn(
"When {} or {} is specified, specifying {} and {} is invalid.",
EXTRACTOR_MODS_KEY,
SOURCE_MODS_KEY,
EXTRACTOR_MODS_ENABLE_KEY,
SOURCE_MODS_ENABLE_KEY);
}
validator.validateSynonymAttributes(
Arrays.asList(EXTRACTOR_MODS_ENABLE_KEY, SOURCE_MODS_ENABLE_KEY),
Arrays.asList(EXTRACTOR_MODS_KEY, SOURCE_MODS_KEY),
false);

// Check coexistence of watermark.interval-ms and watermark-interval-ms
if (parameters.hasAnyAttributes(EXTRACTOR_WATERMARK_INTERVAL_KEY, SOURCE_WATERMARK_INTERVAL_KEY)
&& parameters.hasAnyAttributes(
_EXTRACTOR_WATERMARK_INTERVAL_KEY, _SOURCE_WATERMARK_INTERVAL_KEY)) {
LOGGER.warn(
"When {} or {} is specified, specifying {} and {} is invalid.",
EXTRACTOR_WATERMARK_INTERVAL_KEY,
SOURCE_WATERMARK_INTERVAL_KEY,
_EXTRACTOR_WATERMARK_INTERVAL_KEY,
_SOURCE_WATERMARK_INTERVAL_KEY);
}
validator.validateSynonymAttributes(
Arrays.asList(EXTRACTOR_WATERMARK_INTERVAL_KEY, SOURCE_WATERMARK_INTERVAL_KEY),
Arrays.asList(_EXTRACTOR_WATERMARK_INTERVAL_KEY, _SOURCE_WATERMARK_INTERVAL_KEY),
false);

// Check if specifying mode.snapshot or mode.streaming when disable realtime extractor
if (!parameters.getBooleanOrDefault(
Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY, SOURCE_REALTIME_ENABLE_KEY),
EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE)) {
if (parameters.hasAnyAttributes(EXTRACTOR_MODE_SNAPSHOT_KEY, SOURCE_MODE_SNAPSHOT_KEY)) {
LOGGER.info(
LOGGER.warn(
"When '{}' ('{}') is set to false, specifying {} and {} is invalid.",
EXTRACTOR_REALTIME_ENABLE_KEY,
SOURCE_REALTIME_ENABLE_KEY,
EXTRACTOR_MODE_SNAPSHOT_KEY,
SOURCE_MODE_SNAPSHOT_KEY);
}
if (parameters.hasAnyAttributes(EXTRACTOR_MODE_STREAMING_KEY, SOURCE_MODE_STREAMING_KEY)) {
LOGGER.info(
LOGGER.warn(
"When '{}' ('{}') is set to false, specifying {} and {} is invalid.",
EXTRACTOR_REALTIME_ENABLE_KEY,
SOURCE_REALTIME_ENABLE_KEY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.StateProgressIndex;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskProcessorRuntimeEnvironment;
Expand Down Expand Up @@ -50,6 +49,7 @@
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;

Expand Down Expand Up @@ -102,32 +102,25 @@ public class TwoStageCountProcessor implements PipeProcessor {

@Override
public void validate(PipeParameterValidator validator) throws Exception {
checkInvalidParameters(validator.getParameters());

final String rawOutputSeries;
if (!validator.getParameters().hasAttribute(PROCESSOR_OUTPUT_SERIES_KEY)) {
validator.validateRequiredAttribute(_PROCESSOR_OUTPUT_SERIES_KEY);
rawOutputSeries = validator.getParameters().getString(_PROCESSOR_OUTPUT_SERIES_KEY);
} else {
rawOutputSeries = validator.getParameters().getString(PROCESSOR_OUTPUT_SERIES_KEY);
}
checkInvalidParameters(validator);

final String rawOutputSeries =
validator
.getParameters()
.getStringByKeys(PROCESSOR_OUTPUT_SERIES_KEY, _PROCESSOR_OUTPUT_SERIES_KEY);
try {
PathUtils.isLegalPath(rawOutputSeries);
} catch (IllegalPathException e) {
throw new IllegalArgumentException("Illegal output series path: " + rawOutputSeries);
PathUtils.isLegalPath(Objects.requireNonNull(rawOutputSeries));
} catch (Exception e) {
throw new PipeParameterNotValidException("Illegal output series path: " + rawOutputSeries);
}
}

private void checkInvalidParameters(final PipeParameters parameters) {
private void checkInvalidParameters(final PipeParameterValidator validator) {
// Check coexistence of output.series and output-series
if (parameters.hasAttribute(PROCESSOR_OUTPUT_SERIES_KEY)
&& parameters.hasAttribute(_PROCESSOR_OUTPUT_SERIES_KEY)) {
LOGGER.warn(
"When {} is specified, specifying {} is invalid.",
PROCESSOR_OUTPUT_SERIES_KEY,
_PROCESSOR_OUTPUT_SERIES_KEY);
}
validator.validateSynonymAttributes(
Collections.singletonList(PROCESSOR_OUTPUT_SERIES_KEY),
Collections.singletonList(_PROCESSOR_OUTPUT_SERIES_KEY),
true);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,12 @@ public void validate(final PipeParameterValidator validator) throws Exception {
Arrays.asList(CONNECTOR_IOTDB_BATCH_SIZE_KEY, SINK_IOTDB_BATCH_SIZE_KEY),
CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE));

// Check coexistence of user and username
validator.validateSynonymAttributes(
Arrays.asList(CONNECTOR_IOTDB_USER_KEY, SINK_IOTDB_USER_KEY),
Arrays.asList(CONNECTOR_IOTDB_USERNAME_KEY, SINK_IOTDB_USERNAME_KEY),
false);

username =
parameters.getStringOrDefault(
Arrays.asList(
Expand Down
Loading