Skip to content

Commit

Permalink
Pipe: strict check for synonym pipe parameters to avoid ambiguity (ap…
Browse files Browse the repository at this point in the history
  • Loading branch information
VGalaxies committed Jan 14, 2025
1 parent b24fb5b commit 972fa0a
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class PipeParameterValidator {

Expand All @@ -36,6 +40,40 @@ public PipeParameters getParameters() {
return parameters;
}

/**
* Validates whether the attributes entered by the user contain at least one attribute from
* lhsAttributes or rhsAttributes (if required), but not both.
*
* @param lhsAttributes list of left-hand side synonym attributes
* @param rhsAttributes list of right-hand side synonym attributes
* @param isRequired specifies whether at least one attribute from lhsAttributes or rhsAttributes
* must be provided
* @throws PipeParameterNotValidException if both lhsAttributes and rhsAttributes are provided
* @throws PipeAttributeNotProvidedException if isRequired is true and neither lhsAttributes nor
* rhsAttributes are provided
* @return the instance of PipeParameterValidator for method chaining
*/
public PipeParameterValidator validateSynonymAttributes(
final List<String> lhsAttributes,
final List<String> rhsAttributes,
final boolean isRequired) {
final boolean lhsExistence = lhsAttributes.stream().anyMatch(parameters::hasAttribute);
final boolean rhsExistence = rhsAttributes.stream().anyMatch(parameters::hasAttribute);
if (lhsExistence && rhsExistence) {
throw new PipeParameterNotValidException(
String.format(
"Cannot specify both %s and %s at the same time", lhsAttributes, rhsAttributes));
}
if (isRequired && !lhsExistence && !rhsExistence) {
throw new PipeAttributeNotProvidedException(
Stream.concat(lhsAttributes.stream(), rhsAttributes.stream())
.collect(
Collectors.collectingAndThen(Collectors.toList(), Collections::unmodifiableList))
.toString());
}
return this;
}

/**
* Validates whether the attributes entered by the user contain an attribute whose key is
* attributeKey.
Expand Down Expand Up @@ -83,7 +121,7 @@ public PipeParameterValidator validateAttributeValueRange(
* @throws PipeParameterNotValidException if the given argument is not valid
*/
public PipeParameterValidator validate(
final PipeParameterValidator.SingleObjectValidationRule validationRule,
final SingleObjectValidationRule validationRule,
final String messageToThrow,
final Object argument)
throws PipeParameterNotValidException {
Expand All @@ -107,7 +145,7 @@ public interface SingleObjectValidationRule {
* @throws PipeParameterNotValidException if the given arguments are not valid
*/
public PipeParameterValidator validate(
final PipeParameterValidator.MultipleObjectsValidationRule validationRule,
final MultipleObjectsValidationRule validationRule,
final String messageToThrow,
final Object... arguments)
throws PipeParameterNotValidException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,12 @@ public void validate(final PipeParameterValidator validator) throws Exception {
Arrays.asList(CONNECTOR_IOTDB_BATCH_SIZE_KEY, SINK_IOTDB_BATCH_SIZE_KEY),
CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE));

// Check coexistence of user and username
validator.validateSynonymAttributes(
Arrays.asList(CONNECTOR_IOTDB_USER_KEY, SINK_IOTDB_USER_KEY),
Arrays.asList(CONNECTOR_IOTDB_USERNAME_KEY, SINK_IOTDB_USERNAME_KEY),
false);

username =
parameters.getStringOrDefault(
Arrays.asList(
Expand Down

0 comments on commit 972fa0a

Please sign in to comment.