collection) {
+ return (collection != null) && (!collection.isEmpty());
+ }
+
+ boolean hasExtendedKeyUsage() {
+ return keyUsages.stream().anyMatch(PublicKeyUsage::isExtendedUsage);
+ }
+
+ ExtendedKeyUsage getExtendedKeyUsage() {
+ KeyPurposeId[] usages = keyUsages.stream()
+ .filter(PublicKeyUsage::isExtendedUsage)
+ .map(PublicKeyUsage::getKeyPurposeId)
+ .toArray(KeyPurposeId[]::new);
+ return new ExtendedKeyUsage(usages);
+ }
+}
+// CS-ENFORCE-SINGLE
diff --git a/src/integrationTest/java/org/opensearch/test/framework/certificate/CertificatesIssuer.java b/src/integrationTest/java/org/opensearch/test/framework/certificate/CertificatesIssuer.java
new file mode 100644
index 0000000000..6facf5f2ac
--- /dev/null
+++ b/src/integrationTest/java/org/opensearch/test/framework/certificate/CertificatesIssuer.java
@@ -0,0 +1,241 @@
+/*
+* Copyright OpenSearch Contributors
+* SPDX-License-Identifier: Apache-2.0
+*
+* The OpenSearch Contributors require contributions made to
+* this file be licensed under the Apache-2.0 license or a
+* compatible open source license.
+*
+*/
+/*
+* Copyright 2021 floragunn GmbH
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*
+*/
+
+package org.opensearch.test.framework.certificate;
+
+// CS-SUPPRESS-SINGLE: RegexpSingleline Extension is used to refer to certificate extensions, keeping this rule disabled for the whole file
+import java.math.BigInteger;
+import java.security.KeyPair;
+import java.security.NoSuchAlgorithmException;
+import java.security.PrivateKey;
+import java.security.Provider;
+import java.security.PublicKey;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.base.Strings;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.bouncycastle.asn1.DERSequence;
+import org.bouncycastle.asn1.x500.X500Name;
+import org.bouncycastle.asn1.x500.style.RFC4519Style;
+import org.bouncycastle.asn1.x509.BasicConstraints;
+import org.bouncycastle.asn1.x509.Extension;
+import org.bouncycastle.asn1.x509.SubjectPublicKeyInfo;
+import org.bouncycastle.cert.CertIOException;
+import org.bouncycastle.cert.X509CertificateHolder;
+import org.bouncycastle.cert.X509v3CertificateBuilder;
+import org.bouncycastle.cert.jcajce.JcaX509ExtensionUtils;
+import org.bouncycastle.operator.ContentSigner;
+import org.bouncycastle.operator.OperatorCreationException;
+import org.bouncycastle.operator.jcajce.JcaContentSignerBuilder;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+*
+* The class is used to generate public key certificates. It hides the low-level details related to certificate creation and the
+* usage of the underlying Bouncy Castle library.
+*
+*
+* As its name suggests, a public key certificate contains a public key and some metadata. The metadata describes an entity (human,
+* company, web server, IoT device, etc.) which owns the private key associated with the certificate (the private key is not included
+* in the certificate and is kept secret). The responsibility of the class is to issue certificates. To issue a certificate it is
+* necessary to provide metadata which is embedded in the certificate. The metadata is represented by the class
+* {@link CertificateMetadata}. Furthermore, the class needs a public key which must also be embedded in the certificate. To obtain the
+* public and private key pair the class uses {@link AlgorithmKit}. The result of creating a certificate is the data structure
+* {@link CertificateData}, which contains all information necessary for the certificate owner to use it, that is: the certificate
+* and the private key.
+*
+*
+*
+* The class is able to create self-signed certificates or certificates signed by some other entity. To create a self-signed certificate
+* the method {@link #issueSelfSignedCertificate(CertificateMetadata)} is used, whereas to create signed certificates
+* the method {@link #issueSignedCertificate(CertificateMetadata, CertificateData)} is employed.
+*
+*
+* An instance of the class can be obtained by invoking one of the static methods defined in {@link CertificatesIssuerFactory}.
+*
+*/
+class CertificatesIssuer {
+
+ private static final Logger log = LogManager.getLogger(CertificatesIssuer.class);
+
+ private static final AtomicLong ID_COUNTER = new AtomicLong(System.currentTimeMillis());
+
+ private final Provider securityProvider;
+ private final AlgorithmKit algorithmKit;
+ private final JcaX509ExtensionUtils extUtils;
+
+ CertificatesIssuer(Provider securityProvider, AlgorithmKit algorithmKit) {
+ this.securityProvider = securityProvider;
+ this.algorithmKit = algorithmKit;
+ this.extUtils = getExtUtils();
+ }
+
+ /**
+ * The method creates a certificate with the provided metadata and a public key obtained from {@link #algorithmKit}. The result of the
+ * invocation contains the data required to use the certificate by its owner.
+ *
+ * @param certificateMetadata metadata which should be embedded into the created certificate
+ * @return {@link CertificateData} which contains the certificate and the private key associated with it.
+ */
+ public CertificateData issueSelfSignedCertificate(CertificateMetadata certificateMetadata) {
+ try {
+ KeyPair publicAndPrivateKey = algorithmKit.generateKeyPair();
+ X500Name issuerName = stringToX500Name(requireNonNull(certificateMetadata.getSubject(), "Certificate metadata are required."));
+ X509CertificateHolder x509CertificateHolder = buildCertificateHolder(
+ certificateMetadata,
+ issuerName,
+ publicAndPrivateKey.getPublic(),
+ publicAndPrivateKey
+ );
+ return new CertificateData(x509CertificateHolder, publicAndPrivateKey);
+ } catch (OperatorCreationException | CertIOException e) {
+ log.error("Error while generating certificate", e);
+ throw new RuntimeException("Error while generating self signed certificate", e);
+ }
+ }
+
+ /**
+ * The method is similar to {@link #issueSelfSignedCertificate(CertificateMetadata)} but additionally it signs the created
+ * certificate using data from parentCertificateData.
+ *
+ * @param metadata metadata which should be embedded into the created certificate
+ * @param parentCertificateData data required to sign a newly issued certificate (among other things, the private key).
+ * @return {@link CertificateData} which contains the certificate and the private key associated with it.
+ */
+ public CertificateData issueSignedCertificate(CertificateMetadata metadata, CertificateData parentCertificateData) {
+ try {
+ KeyPair publicAndPrivateKey = algorithmKit.generateKeyPair();
+ KeyPair parentKeyPair = requireNonNull(parentCertificateData, "Issuer certificate data are required").getKeyPair();
+ X500Name issuerName = parentCertificateData.getCertificateSubject();
+ var x509CertificateHolder = buildCertificateHolder(
+ requireNonNull(metadata, "Certificate metadata are required"),
+ issuerName,
+ publicAndPrivateKey.getPublic(),
+ parentKeyPair
+ );
+ return new CertificateData(x509CertificateHolder, publicAndPrivateKey);
+ } catch (OperatorCreationException | CertIOException e) {
+ log.error("Error while generating signed certificate", e);
+ throw new RuntimeException("Error while generating signed certificate", e);
+ }
+ }
+
+ private X509CertificateHolder buildCertificateHolder(
+ CertificateMetadata certificateMetadata,
+ X500Name issuerName,
+ PublicKey certificatePublicKey,
+ KeyPair parentKeyPair
+ ) throws CertIOException, OperatorCreationException {
+ X509v3CertificateBuilder builder = builderWithBasicExtensions(
+ certificateMetadata,
+ issuerName,
+ certificatePublicKey,
+ parentKeyPair.getPublic()
+ );
+ addSubjectAlternativeNameExtension(builder, certificateMetadata);
+ addExtendedKeyUsageExtension(builder, certificateMetadata);
+ return builder.build(createContentSigner(parentKeyPair.getPrivate()));
+ }
+
+ private ContentSigner createContentSigner(PrivateKey privateKey) throws OperatorCreationException {
+ return new JcaContentSignerBuilder(algorithmKit.getSignatureAlgorithmName()).setProvider(securityProvider).build(privateKey);
+ }
+
+ private void addExtendedKeyUsageExtension(X509v3CertificateBuilder builder, CertificateMetadata certificateMetadata)
+ throws CertIOException {
+ if (certificateMetadata.hasExtendedKeyUsage()) {
+ builder.addExtension(Extension.extendedKeyUsage, true, certificateMetadata.getExtendedKeyUsage());
+ }
+ }
+
+ private X509v3CertificateBuilder builderWithBasicExtensions(
+ CertificateMetadata certificateMetadata,
+ X500Name issuerName,
+ PublicKey certificatePublicKey,
+ PublicKey parentPublicKey
+ ) throws CertIOException {
+ X500Name subjectName = stringToX500Name(certificateMetadata.getSubject());
+ Date validityStartDate = new Date(System.currentTimeMillis() - (24 * 3600 * 1000));
+ Date validityEndDate = getEndDate(validityStartDate, certificateMetadata.getValidityDays());
+
+ BigInteger certificateSerialNumber = generateNextCertificateSerialNumber();
+ return new X509v3CertificateBuilder(
+ issuerName,
+ certificateSerialNumber,
+ validityStartDate,
+ validityEndDate,
+ subjectName,
+ SubjectPublicKeyInfo.getInstance(certificatePublicKey.getEncoded())
+ ).addExtension(Extension.basicConstraints, true, new BasicConstraints(certificateMetadata.isBasicConstrainIsCa()))
+ .addExtension(Extension.authorityKeyIdentifier, false, extUtils.createAuthorityKeyIdentifier(parentPublicKey))
+ .addExtension(Extension.subjectKeyIdentifier, false, extUtils.createSubjectKeyIdentifier(certificatePublicKey))
+ .addExtension(Extension.keyUsage, true, certificateMetadata.asKeyUsage());
+ }
+
+ private void addSubjectAlternativeNameExtension(X509v3CertificateBuilder builder, CertificateMetadata metadata) throws CertIOException {
+ if (metadata.hasSubjectAlternativeNameExtension()) {
+ DERSequence subjectAlternativeNames = metadata.createSubjectAlternativeNames();
+ builder.addExtension(Extension.subjectAlternativeName, false, subjectAlternativeNames);
+ }
+ }
+
+ private Date getEndDate(Date startDate, int validityDays) {
+ Calendar calendar = Calendar.getInstance();
+ calendar.setTime(startDate);
+ calendar.add(Calendar.DATE, validityDays);
+ return calendar.getTime();
+ }
+
+ private static JcaX509ExtensionUtils getExtUtils() {
+ try {
+ return new JcaX509ExtensionUtils();
+ } catch (NoSuchAlgorithmException e) {
+ log.error("Getting certificate extension utils failed", e);
+ throw new RuntimeException("Getting certificate extension utils failed", e);
+ }
+ }
+
+ private X500Name stringToX500Name(String distinguishedName) {
+ if (Strings.isNullOrEmpty(distinguishedName)) {
+ throw new RuntimeException("No DN (distinguished name) must not be null or empty");
+ }
+ try {
+ return new X500Name(RFC4519Style.INSTANCE, distinguishedName);
+ } catch (IllegalArgumentException e) {
+ String message = String.format("Invalid DN (distinguished name) specified for %s certificate.", distinguishedName);
+ throw new RuntimeException(message, e);
+ }
+ }
+
+ private BigInteger generateNextCertificateSerialNumber() {
+ return BigInteger.valueOf(ID_COUNTER.incrementAndGet());
+ }
+}
+// CS-ENFORCE-SINGLE
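
For readers unfamiliar with the API introduced above, here is a hedged usage sketch (not part of the change): a self-signed CA certificate is issued first, and an end-entity certificate is then signed with the CA's key material. The subject strings, validity period and key usages are illustrative only; the calls mirror those made later in TestCertificates in this same diff.

```java
package org.opensearch.test.framework.certificate;

import static org.opensearch.test.framework.certificate.PublicKeyUsage.CLIENT_AUTH;
import static org.opensearch.test.framework.certificate.PublicKeyUsage.DIGITAL_SIGNATURE;
import static org.opensearch.test.framework.certificate.PublicKeyUsage.KEY_CERT_SIGN;

// Illustrative only: shows the intended call sequence of CertificatesIssuer.
class CertificatesIssuerExample {

    static void issueExampleCertificates() {
        CertificatesIssuer issuer = CertificatesIssuerFactory.rsaBaseCertificateIssuer();

        // 1. Self-signed CA certificate: issuer and subject are the same entity.
        CertificateMetadata caMetadata = CertificateMetadata.basicMetadata("DC=com,CN=Example CA", 365)
            .withKeyUsage(true, DIGITAL_SIGNATURE, KEY_CERT_SIGN);
        CertificateData ca = issuer.issueSelfSignedCertificate(caMetadata);

        // 2. End-entity certificate signed with the CA's private key.
        CertificateMetadata clientMetadata = CertificateMetadata.basicMetadata("CN=client", 365)
            .withKeyUsage(false, DIGITAL_SIGNATURE, CLIENT_AUTH);
        CertificateData client = issuer.issueSignedCertificate(clientMetadata, ca);

        // Both results bundle the X.509 certificate holder with its key pair.
        System.out.println(client.certificateInPemFormat());
    }
}
```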
diff --git a/src/integrationTest/java/org/opensearch/test/framework/certificate/CertificatesIssuerFactory.java b/src/integrationTest/java/org/opensearch/test/framework/certificate/CertificatesIssuerFactory.java
new file mode 100644
index 0000000000..f68ccf6022
--- /dev/null
+++ b/src/integrationTest/java/org/opensearch/test/framework/certificate/CertificatesIssuerFactory.java
@@ -0,0 +1,68 @@
+/*
+* Copyright OpenSearch Contributors
+* SPDX-License-Identifier: Apache-2.0
+*
+* The OpenSearch Contributors require contributions made to
+* this file be licensed under the Apache-2.0 license or a
+* compatible open source license.
+*
+*/
+package org.opensearch.test.framework.certificate;
+
+import java.security.Provider;
+import java.util.Optional;
+
+import org.bouncycastle.jce.provider.BouncyCastleProvider;
+
+import static org.opensearch.test.framework.certificate.AlgorithmKit.ecdsaSha256withEcdsa;
+import static org.opensearch.test.framework.certificate.AlgorithmKit.rsaSha256withRsa;
+
+/**
+* The class defines static factory methods for class {@link CertificatesIssuer}. Objects of class {@link CertificatesIssuer} created by
+* the various factory methods differ in terms of the cryptographic algorithms used for certificate creation.
+*
+*/
+class CertificatesIssuerFactory {
+
+ private static final int KEY_SIZE = 2048;
+
+ private CertificatesIssuerFactory() {
+
+ }
+
+ private static final Provider DEFAULT_SECURITY_PROVIDER = new BouncyCastleProvider();
+
+ /**
+ * @see {@link #rsaBaseCertificateIssuer(Provider)}
+ */
+ public static CertificatesIssuer rsaBaseCertificateIssuer() {
+ return rsaBaseCertificateIssuer(null);
+ }
+
+ /**
+ * The method creates {@link CertificatesIssuer} which uses RSA algorithm for certificate creation.
+ * @param securityProvider determines the cryptographic algorithm implementation, can be null.
+ * @return new instance of {@link CertificatesIssuer}
+ */
+ public static CertificatesIssuer rsaBaseCertificateIssuer(Provider securityProvider) {
+ Provider provider = Optional.ofNullable(securityProvider).orElse(DEFAULT_SECURITY_PROVIDER);
+ return new CertificatesIssuer(provider, rsaSha256withRsa(provider, KEY_SIZE));
+ }
+
+ /**
+ * @see {@link #ecdsaBaseCertificatesIssuer(Provider)}
+ */
+ public static CertificatesIssuer ecdsaBaseCertificatesIssuer() {
+ return ecdsaBaseCertificatesIssuer(null);
+ }
+
+ /**
+ * It creates a {@link CertificatesIssuer} which uses an asymmetric cryptography algorithm that relies on elliptic curves.
+ * @param securityProvider determines the cryptographic algorithm implementation, can be null.
+ * @return new instance of {@link CertificatesIssuer}
+ */
+ public static CertificatesIssuer ecdsaBaseCertificatesIssuer(Provider securityProvider) {
+ Provider provider = Optional.ofNullable(securityProvider).orElse(DEFAULT_SECURITY_PROVIDER);
+ return new CertificatesIssuer(provider, ecdsaSha256withEcdsa(provider, "P-384"));
+ }
+}
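
A brief usage sketch of the factory (illustrative, not part of the change): the no-argument variants fall back to the default Bouncy Castle provider, while the overloads accept an explicit java.security.Provider.

```java
package org.opensearch.test.framework.certificate;

import java.security.Provider;

import org.bouncycastle.jce.provider.BouncyCastleProvider;

// Illustrative only: selecting an issuer via the factory methods.
class CertificatesIssuerFactoryExample {

    static void createIssuers() {
        // RSA-based issuer backed by the default Bouncy Castle provider.
        CertificatesIssuer rsaIssuer = CertificatesIssuerFactory.rsaBaseCertificateIssuer();

        // ECDSA-based issuer with an explicitly supplied security provider.
        Provider provider = new BouncyCastleProvider();
        CertificatesIssuer ecdsaIssuer = CertificatesIssuerFactory.ecdsaBaseCertificatesIssuer(provider);

        System.out.println(rsaIssuer + " / " + ecdsaIssuer);
    }
}
```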
diff --git a/src/integrationTest/java/org/opensearch/test/framework/certificate/PemConverter.java b/src/integrationTest/java/org/opensearch/test/framework/certificate/PemConverter.java
new file mode 100644
index 0000000000..749ab232bc
--- /dev/null
+++ b/src/integrationTest/java/org/opensearch/test/framework/certificate/PemConverter.java
@@ -0,0 +1,119 @@
+/*
+* Copyright OpenSearch Contributors
+* SPDX-License-Identifier: Apache-2.0
+*
+* The OpenSearch Contributors require contributions made to
+* this file be licensed under the Apache-2.0 license or a
+* compatible open source license.
+*
+*/
+/*
+* Copyright 2021 floragunn GmbH
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*
+*/
+
+package org.opensearch.test.framework.certificate;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.io.Writer;
+import java.security.PrivateKey;
+import java.security.SecureRandom;
+
+import com.google.common.base.Strings;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.bouncycastle.asn1.pkcs.PrivateKeyInfo;
+import org.bouncycastle.cert.X509CertificateHolder;
+import org.bouncycastle.openssl.PKCS8Generator;
+import org.bouncycastle.openssl.jcajce.JcaPEMWriter;
+import org.bouncycastle.openssl.jcajce.JceOpenSSLPKCS8EncryptorBuilder;
+import org.bouncycastle.operator.OperatorCreationException;
+import org.bouncycastle.operator.OutputEncryptor;
+import org.bouncycastle.util.io.pem.PemGenerationException;
+import org.bouncycastle.util.io.pem.PemObject;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+* The class provides methods useful for converting a certificate and a private key into PEM format
+* @see RFC 1421
+*/
+class PemConverter {
+
+ private PemConverter() {}
+
+ private static final Logger log = LogManager.getLogger(PemConverter.class);
+ private static final SecureRandom secureRandom = new SecureRandom();
+
+ /**
+ * It converts a certificate represented by an {@link X509CertificateHolder} object to PEM format
+ * @param certificate is a certificate to convert
+ * @return {@link String} which contains the PEM encoded certificate
+ */
+ public static String toPem(X509CertificateHolder certificate) {
+ StringWriter stringWriter = new StringWriter();
+ try (JcaPEMWriter writer = new JcaPEMWriter(stringWriter)) {
+ writer.writeObject(requireNonNull(certificate, "Certificate is required."));
+ } catch (Exception e) {
+ throw new RuntimeException("Cannot write certificate in PEM format", e);
+ }
+ return stringWriter.toString();
+ }
+
+ /**
+ * It converts a private key represented by the class {@link PrivateKey} to PEM format.
+ * @param privateKey is a private key, cannot be null
+ * @param privateKeyPassword is a password used to encrypt the private key, null for an unencrypted private key
+ * @return {@link String} which contains PEM encoded private key
+ */
+ public static String toPem(PrivateKey privateKey, String privateKeyPassword) {
+ try (StringWriter stringWriter = new StringWriter()) {
+ savePrivateKey(stringWriter, requireNonNull(privateKey, "Private key is required."), privateKeyPassword);
+ return stringWriter.toString();
+ } catch (IOException e) {
+ throw new RuntimeException("Cannot convert private key into PEM format.", e);
+ }
+ }
+
+ private static void savePrivateKey(Writer out, PrivateKey privateKey, String privateKeyPassword) {
+ try (JcaPEMWriter writer = new JcaPEMWriter(out)) {
+ writer.writeObject(createPkcs8PrivateKeyPem(privateKey, privateKeyPassword));
+ } catch (Exception e) {
+ log.error("Error while writing private key.", e);
+ throw new RuntimeException("Error while writing private key ", e);
+ }
+ }
+
+ private static PemObject createPkcs8PrivateKeyPem(PrivateKey privateKey, String password) {
+ try {
+ OutputEncryptor outputEncryptor = password == null ? null : getPasswordEncryptor(password);
+ return new PKCS8Generator(PrivateKeyInfo.getInstance(privateKey.getEncoded()), outputEncryptor).generate();
+ } catch (PemGenerationException | OperatorCreationException e) {
+ log.error("Creating PKCS8 private key failed", e);
+ throw new RuntimeException("Creating PKCS8 private key failed", e);
+ }
+ }
+
+ private static OutputEncryptor getPasswordEncryptor(String password) throws OperatorCreationException {
+ if (!Strings.isNullOrEmpty(password)) {
+ JceOpenSSLPKCS8EncryptorBuilder encryptorBuilder = new JceOpenSSLPKCS8EncryptorBuilder(PKCS8Generator.PBE_SHA1_3DES);
+ encryptorBuilder.setRandom(secureRandom);
+ encryptorBuilder.setPassword(password.toCharArray());
+ return encryptorBuilder.build();
+ }
+ return null;
+ }
+}
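
As a quick illustration of the converter (a sketch, not part of the change): a private key generated with the standard JCA API is rendered as PKCS#8 PEM, once unencrypted and once protected with a password.

```java
package org.opensearch.test.framework.certificate;

import java.security.KeyPair;
import java.security.KeyPairGenerator;
import java.security.PrivateKey;

// Illustrative only: converting a freshly generated key to (optionally encrypted) PEM.
class PemConverterExample {

    static void printPemKey() throws Exception {
        KeyPair keyPair = KeyPairGenerator.getInstance("RSA").generateKeyPair();
        PrivateKey privateKey = keyPair.getPrivate();

        // Unencrypted PKCS#8 PEM.
        String plainPem = PemConverter.toPem(privateKey, null);

        // PEM encrypted with a password (PBE-SHA1-3DES, as configured in PemConverter).
        String encryptedPem = PemConverter.toPem(privateKey, "changeit");

        System.out.println(plainPem);
        System.out.println(encryptedPem);
    }
}
```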
diff --git a/src/integrationTest/java/org/opensearch/test/framework/certificate/PublicKeyUsage.java b/src/integrationTest/java/org/opensearch/test/framework/certificate/PublicKeyUsage.java
new file mode 100644
index 0000000000..af37c66001
--- /dev/null
+++ b/src/integrationTest/java/org/opensearch/test/framework/certificate/PublicKeyUsage.java
@@ -0,0 +1,75 @@
+/*
+* Copyright OpenSearch Contributors
+* SPDX-License-Identifier: Apache-2.0
+*
+* The OpenSearch Contributors require contributions made to
+* this file be licensed under the Apache-2.0 license or a
+* compatible open source license.
+*
+*/
+package org.opensearch.test.framework.certificate;
+
+import java.util.Objects;
+
+import org.bouncycastle.asn1.x509.KeyPurposeId;
+import org.bouncycastle.asn1.x509.KeyUsage;
+
+// CS-SUPPRESS-SINGLE: RegexpSingleline Extension is used to refer to certificate extensions
+/**
+* The class is associated with certificate extensions related to key usages. These extensions are defined by
+* RFC 5280 and describe the allowed usage of the public key which is embedded in the
+* certificate. The class is related to the following extensions:
+*
+* - Key Usage, defined in section 4.2.1.3
+* - Extended Key Usage, defined in section 4.2.1.12
+*
+*
+* @see RFC 5280
+*/
+// CS-ENFORCE-SINGLE
+enum PublicKeyUsage {
+ DIGITAL_SIGNATURE(KeyUsage.digitalSignature),
+ KEY_CERT_SIGN(KeyUsage.keyCertSign),
+ CRL_SIGN(KeyUsage.cRLSign),
+ NON_REPUDIATION(KeyUsage.nonRepudiation),
+ KEY_ENCIPHERMENT(KeyUsage.keyEncipherment),
+
+ SERVER_AUTH(KeyPurposeId.id_kp_serverAuth),
+
+ CLIENT_AUTH(KeyPurposeId.id_kp_clientAuth);
+
+ private final int keyUsage;
+ private final KeyPurposeId id;
+
+ PublicKeyUsage(int keyUsage) {
+ this.keyUsage = keyUsage;
+ this.id = null;
+ }
+
+ PublicKeyUsage(KeyPurposeId id) {
+ this.id = Objects.requireNonNull(id, "Key purpose id is required.");
+ this.keyUsage = 0;
+ }
+
+ boolean isExtendedUsage() {
+ return this.id != null;
+ }
+
+ boolean isNotExtendedUsage() {
+ return this.id == null;
+ }
+
+ int asInt() {
+ if (isExtendedUsage()) {
+ throw new RuntimeException("Integer value is not available for extended key usage");
+ }
+ return keyUsage;
+ }
+
+ KeyPurposeId getKeyPurposeId() {
+ if (isExtendedUsage() == false) {
+ throw new RuntimeException("Key purpose id is not available.");
+ }
+ return id;
+ }
+}
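
To clarify how the two usage families are consumed, here is a sketch in the same package: the plain usages carry KeyUsage bit masks that can be OR-ed together (this presumably mirrors what CertificateMetadata.asKeyUsage(), referenced by the issuer above, does), while extended usages are backed by KeyPurposeId values and end up in the Extended Key Usage extension.

```java
package org.opensearch.test.framework.certificate;

import java.util.List;

import org.bouncycastle.asn1.x509.KeyUsage;

// Illustrative only: how the two usage families can be split and combined.
class PublicKeyUsageExample {

    static KeyUsage combineBasicUsages(List<PublicKeyUsage> usages) {
        // Basic usages carry an int bit mask; OR-ing them yields the Key Usage extension value.
        int bits = usages.stream()
            .filter(PublicKeyUsage::isNotExtendedUsage)
            .mapToInt(PublicKeyUsage::asInt)
            .reduce(0, (a, b) -> a | b);
        return new KeyUsage(bits);
    }

    static long countExtendedUsages(List<PublicKeyUsage> usages) {
        // Extended usages (e.g. SERVER_AUTH, CLIENT_AUTH) are backed by KeyPurposeId instead.
        return usages.stream().filter(PublicKeyUsage::isExtendedUsage).count();
    }
}
```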
diff --git a/src/integrationTest/java/org/opensearch/test/framework/certificate/TestCertificates.java b/src/integrationTest/java/org/opensearch/test/framework/certificate/TestCertificates.java
new file mode 100644
index 0000000000..2dd1dd5eea
--- /dev/null
+++ b/src/integrationTest/java/org/opensearch/test/framework/certificate/TestCertificates.java
@@ -0,0 +1,214 @@
+/*
+* Copyright 2021 floragunn GmbH
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*
+*/
+
+/*
+* SPDX-License-Identifier: Apache-2.0
+*
+* The OpenSearch Contributors require contributions made to
+* this file be licensed under the Apache-2.0 license or a
+* compatible open source license.
+*
+* Modifications Copyright OpenSearch Contributors. See
+* GitHub history for details.
+*/
+
+package org.opensearch.test.framework.certificate;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import static org.opensearch.test.framework.certificate.PublicKeyUsage.CLIENT_AUTH;
+import static org.opensearch.test.framework.certificate.PublicKeyUsage.CRL_SIGN;
+import static org.opensearch.test.framework.certificate.PublicKeyUsage.DIGITAL_SIGNATURE;
+import static org.opensearch.test.framework.certificate.PublicKeyUsage.KEY_CERT_SIGN;
+import static org.opensearch.test.framework.certificate.PublicKeyUsage.KEY_ENCIPHERMENT;
+import static org.opensearch.test.framework.certificate.PublicKeyUsage.NON_REPUDIATION;
+import static org.opensearch.test.framework.certificate.PublicKeyUsage.SERVER_AUTH;
+
+/**
+* It provides TLS certificates required in test cases. The certificates are generated during the creation of objects of this class.
+* The class exposes methods which can be used to write certificates and private keys to temporary files.
+*/
+public class TestCertificates {
+
+ private static final Logger log = LogManager.getLogger(TestCertificates.class);
+
+ public static final Integer MAX_NUMBER_OF_NODE_CERTIFICATES = 3;
+
+ private static final String CA_SUBJECT = "DC=com,DC=example,O=Example Com Inc.,OU=Example Com Inc. Root CA,CN=Example Com Inc. Root CA";
+ private static final String ADMIN_DN = "CN=kirk,OU=client,O=client,L=test,C=de";
+ private static final int CERTIFICATE_VALIDITY_DAYS = 365;
+ private static final String CERTIFICATE_FILE_EXT = ".cert";
+ private static final String KEY_FILE_EXT = ".key";
+ private final CertificateData caCertificate;
+ private final CertificateData adminCertificate;
+ private final List<CertificateData> nodeCertificates;
+
+ private final CertificateData ldapCertificate;
+
+ public TestCertificates() {
+ this.caCertificate = createCaCertificate();
+ this.nodeCertificates = IntStream.range(0, MAX_NUMBER_OF_NODE_CERTIFICATES)
+ .mapToObj(this::createNodeCertificate)
+ .collect(Collectors.toList());
+ this.ldapCertificate = createLdapCertificate();
+ this.adminCertificate = createAdminCertificate(ADMIN_DN);
+ log.info("Test certificates successfully generated");
+ }
+
+ private CertificateData createCaCertificate() {
+ CertificateMetadata metadata = CertificateMetadata.basicMetadata(CA_SUBJECT, CERTIFICATE_VALIDITY_DAYS)
+ .withKeyUsage(true, DIGITAL_SIGNATURE, KEY_CERT_SIGN, CRL_SIGN);
+ return CertificatesIssuerFactory.rsaBaseCertificateIssuer().issueSelfSignedCertificate(metadata);
+ }
+
+ public CertificateData createAdminCertificate(String adminDn) {
+ CertificateMetadata metadata = CertificateMetadata.basicMetadata(adminDn, CERTIFICATE_VALIDITY_DAYS)
+ .withKeyUsage(false, DIGITAL_SIGNATURE, NON_REPUDIATION, KEY_ENCIPHERMENT, CLIENT_AUTH);
+ return CertificatesIssuerFactory.rsaBaseCertificateIssuer().issueSignedCertificate(metadata, caCertificate);
+ }
+
+ public CertificateData createSelfSignedCertificate(String distinguishedName) {
+ CertificateMetadata metadata = CertificateMetadata.basicMetadata(distinguishedName, CERTIFICATE_VALIDITY_DAYS);
+ return CertificatesIssuerFactory.rsaBaseCertificateIssuer().issueSelfSignedCertificate(metadata);
+ }
+
+ /**
+ * It returns the most trusted certificate. Certificates for nodes and users are derived from this certificate.
+ * @return file which contains certificate in PEM format, defined by RFC 1421
+ */
+ public File getRootCertificate() {
+ return createTempFile("root", CERTIFICATE_FILE_EXT, caCertificate.certificateInPemFormat());
+ }
+
+ public CertificateData getRootCertificateData() {
+ return caCertificate;
+ }
+
+ /**
+ * Certificate for an OpenSearch node. The certificate is derived from the root certificate returned by method {@link #getRootCertificate()}
+ * @param node is a node index. It has to be less than {@link #MAX_NUMBER_OF_NODE_CERTIFICATES}
+ * @return file which contains certificate in PEM format, defined by RFC 1421
+ */
+ public File getNodeCertificate(int node) {
+ CertificateData certificateData = getNodeCertificateData(node);
+ return createTempFile("node-" + node, CERTIFICATE_FILE_EXT, certificateData.certificateInPemFormat());
+ }
+
+ public CertificateData getNodeCertificateData(int node) {
+ isCorrectNodeNumber(node);
+ return nodeCertificates.get(node);
+ }
+
+ private void isCorrectNodeNumber(int node) {
+ if (node >= MAX_NUMBER_OF_NODE_CERTIFICATES) {
+ String message = String.format(
+ "Cannot get certificate for node %d, number of created certificates for nodes is %d",
+ node,
+ MAX_NUMBER_OF_NODE_CERTIFICATES
+ );
+ throw new RuntimeException(message);
+ }
+ }
+
+ private CertificateData createNodeCertificate(Integer node) {
+ String subject = String.format("DC=de,L=test,O=node,OU=node,CN=node-%d.example.com", node);
+ String domain = String.format("node-%d.example.com", node);
+ CertificateMetadata metadata = CertificateMetadata.basicMetadata(subject, CERTIFICATE_VALIDITY_DAYS)
+ .withKeyUsage(false, DIGITAL_SIGNATURE, NON_REPUDIATION, KEY_ENCIPHERMENT, CLIENT_AUTH, SERVER_AUTH)
+ .withSubjectAlternativeName("1.2.3.4.5.5", List.of(domain, "localhost"), "127.0.0.1");
+ return CertificatesIssuerFactory.rsaBaseCertificateIssuer().issueSignedCertificate(metadata, caCertificate);
+ }
+
+ public CertificateData issueUserCertificate(String organizationUnit, String username) {
+ String subject = String.format("DC=de,L=test,O=users,OU=%s,CN=%s", organizationUnit, username);
+ CertificateMetadata metadata = CertificateMetadata.basicMetadata(subject, CERTIFICATE_VALIDITY_DAYS)
+ .withKeyUsage(false, DIGITAL_SIGNATURE, NON_REPUDIATION, KEY_ENCIPHERMENT, CLIENT_AUTH, SERVER_AUTH);
+ return CertificatesIssuerFactory.rsaBaseCertificateIssuer().issueSignedCertificate(metadata, caCertificate);
+ }
+
+ private CertificateData createLdapCertificate() {
+ String subject = "DC=de,L=test,O=node,OU=node,CN=ldap.example.com";
+ CertificateMetadata metadata = CertificateMetadata.basicMetadata(subject, CERTIFICATE_VALIDITY_DAYS)
+ .withKeyUsage(false, DIGITAL_SIGNATURE, NON_REPUDIATION, KEY_ENCIPHERMENT, CLIENT_AUTH, SERVER_AUTH)
+ .withSubjectAlternativeName(null, List.of("localhost"), "127.0.0.1");
+ return CertificatesIssuerFactory.rsaBaseCertificateIssuer().issueSignedCertificate(metadata, caCertificate);
+ }
+
+ public CertificateData getLdapCertificateData() {
+ return ldapCertificate;
+ }
+
+ /**
+ * It returns private key associated with node certificate returned by method {@link #getNodeCertificate(int)}
+ *
+ * @param node is a node index. It has to be less than {@link #MAX_NUMBER_OF_NODE_CERTIFICATES}
+ * @param privateKeyPassword is a password used to encrypt the private key, can be null to retrieve an unencrypted key.
+ * @return file which contains private key encoded in PEM format, defined
+ * by RFC 1421
+ */
+ public File getNodeKey(int node, String privateKeyPassword) {
+ CertificateData certificateData = nodeCertificates.get(node);
+ return createTempFile("node-" + node, KEY_FILE_EXT, certificateData.privateKeyInPemFormat(privateKeyPassword));
+ }
+
+ /**
+ * Certificate which proves the admin user identity. The certificate is derived from the root certificate returned by
+ * method {@link #getRootCertificate()}
+ * @return file which contains certificate in PEM format, defined by RFC 1421
+ */
+ public File getAdminCertificate() {
+ return createTempFile("admin", CERTIFICATE_FILE_EXT, adminCertificate.certificateInPemFormat());
+ }
+
+ public CertificateData getAdminCertificateData() {
+ return adminCertificate;
+ }
+
+ /**
+ * It returns private key associated with admin certificate returned by {@link #getAdminCertificate()}.
+ *
+ * @param privateKeyPassword is a password used to encrypt the private key, can be null to retrieve an unencrypted key.
+ * @return file which contains private key encoded in PEM format, defined
+ * by RFC 1421
+ */
+ public File getAdminKey(String privateKeyPassword) {
+ return createTempFile("admin", KEY_FILE_EXT, adminCertificate.privateKeyInPemFormat(privateKeyPassword));
+ }
+
+ public String[] getAdminDNs() {
+ return new String[] { ADMIN_DN };
+ }
+
+ private File createTempFile(String name, String suffix, String contents) {
+ try {
+ Path path = Files.createTempFile(name, suffix);
+ Files.writeString(path, contents);
+ return path.toFile();
+ } catch (IOException ex) {
+ throw new RuntimeException("Cannot create temp file with name " + name + " and suffix " + suffix);
+ }
+ }
+}
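
A typical consumption pattern for the class above, sketched for illustration: certificates and keys are materialized as temporary PEM files that can be passed straight into node or client TLS settings.

```java
import java.io.File;

import org.opensearch.test.framework.certificate.TestCertificates;

// Illustrative only: typical consumption of the generated test certificates.
class TestCertificatesExample {

    static void writeCertificateFiles() {
        TestCertificates certificates = new TestCertificates();

        File rootCa = certificates.getRootCertificate();      // CA certificate, PEM
        File nodeCert = certificates.getNodeCertificate(0);   // certificate of node 0, PEM
        File nodeKey = certificates.getNodeKey(0, null);      // unencrypted private key of node 0
        File adminCert = certificates.getAdminCertificate();  // admin ("kirk") client certificate

        System.out.printf("CA: %s, node: %s, key: %s, admin: %s%n", rootCa, nodeCert, nodeKey, adminCert);
    }
}
```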
diff --git a/src/integrationTest/java/org/opensearch/test/framework/cluster/CloseableHttpClientFactory.java b/src/integrationTest/java/org/opensearch/test/framework/cluster/CloseableHttpClientFactory.java
new file mode 100644
index 0000000000..a6a0324b27
--- /dev/null
+++ b/src/integrationTest/java/org/opensearch/test/framework/cluster/CloseableHttpClientFactory.java
@@ -0,0 +1,81 @@
+/*
+* Copyright OpenSearch Contributors
+* SPDX-License-Identifier: Apache-2.0
+*
+* The OpenSearch Contributors require contributions made to
+* this file be licensed under the Apache-2.0 license or a
+* compatible open source license.
+*
+*/
+package org.opensearch.test.framework.cluster;
+
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+import javax.net.ssl.SSLContext;
+
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.config.RegistryBuilder;
+import org.apache.http.conn.HttpClientConnectionManager;
+import org.apache.http.conn.routing.HttpRoutePlanner;
+import org.apache.http.conn.socket.ConnectionSocketFactory;
+import org.apache.http.conn.ssl.NoopHostnameVerifier;
+import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
+
+class CloseableHttpClientFactory {
+
+ private final SSLContext sslContext;
+
+ private final RequestConfig requestConfig;
+
+ private final HttpRoutePlanner routePlanner;
+
+ private final String[] supportedCipherSuites;
+
+ public CloseableHttpClientFactory(
+ SSLContext sslContext,
+ RequestConfig requestConfig,
+ HttpRoutePlanner routePlanner,
+ String[] supportedCipherSuites
+ ) {
+ this.sslContext = Objects.requireNonNull(sslContext, "SSL context is required.");
+ this.requestConfig = requestConfig;
+ this.routePlanner = routePlanner;
+ this.supportedCipherSuites = supportedCipherSuites;
+ }
+
+ public CloseableHttpClient getHTTPClient() {
+
+ final HttpClientBuilder hcb = HttpClients.custom();
+
+ final SSLConnectionSocketFactory sslsf = new SSLConnectionSocketFactory(
+ this.sslContext,
+ /* Uses default supported protocols */ null,
+ supportedCipherSuites,
+ NoopHostnameVerifier.INSTANCE
+ );
+
+ final HttpClientConnectionManager cm = new PoolingHttpClientConnectionManager(
+ RegistryBuilder.<ConnectionSocketFactory>create().register("https", sslsf).build(),
+ /* Uses default connection factory */ null,
+ /* Uses default scheme port resolver */ null,
+ /* Uses default dns resolver */ null,
+ 60,
+ TimeUnit.SECONDS
+ );
+ hcb.setConnectionManager(cm);
+ if (routePlanner != null) {
+ hcb.setRoutePlanner(routePlanner);
+ }
+
+ if (requestConfig != null) {
+ hcb.setDefaultRequestConfig(requestConfig);
+ }
+
+ return hcb.build();
+ }
+}
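
A hedged usage sketch for the factory above; the SSLContext would normally be built from the test certificates (not shown here), and the request config values are arbitrary. Passing null for the route planner and cipher suites keeps the defaults.

```java
package org.opensearch.test.framework.cluster;

import javax.net.ssl.SSLContext;

import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.client.CloseableHttpClient;

// Illustrative only: the SSLContext is assumed to come from the test certificates.
class CloseableHttpClientFactoryExample {

    static CloseableHttpClient createClient(SSLContext sslContext) {
        RequestConfig requestConfig = RequestConfig.custom()
            .setConnectTimeout(10_000)
            .setSocketTimeout(10_000)
            .build();

        // null route planner -> default routing; null cipher suites -> JVM defaults.
        CloseableHttpClientFactory factory = new CloseableHttpClientFactory(sslContext, requestConfig, null, null);
        return factory.getHTTPClient();
    }
}
```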
diff --git a/src/integrationTest/java/org/opensearch/test/framework/cluster/ClusterManager.java b/src/integrationTest/java/org/opensearch/test/framework/cluster/ClusterManager.java
new file mode 100644
index 0000000000..db786a65e9
--- /dev/null
+++ b/src/integrationTest/java/org/opensearch/test/framework/cluster/ClusterManager.java
@@ -0,0 +1,172 @@
+/*
+* Copyright 2015-2017 floragunn GmbH
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*
+*/
+
+/*
+* SPDX-License-Identifier: Apache-2.0
+*
+* The OpenSearch Contributors require contributions made to
+* this file be licensed under the Apache-2.0 license or a
+* compatible open source license.
+*
+* Modifications Copyright OpenSearch Contributors. See
+* GitHub history for details.
+*/
+
+package org.opensearch.test.framework.cluster;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.opensearch.index.reindex.ReindexPlugin;
+import org.opensearch.join.ParentJoinPlugin;
+import org.opensearch.percolator.PercolatorPlugin;
+import org.opensearch.plugins.Plugin;
+import org.opensearch.search.aggregations.matrix.MatrixAggregationPlugin;
+import org.opensearch.security.OpenSearchSecurityPlugin;
+import org.opensearch.transport.Netty4Plugin;
+
+import static java.util.Collections.unmodifiableList;
+import static org.opensearch.test.framework.cluster.NodeType.CLIENT;
+import static org.opensearch.test.framework.cluster.NodeType.CLUSTER_MANAGER;
+import static org.opensearch.test.framework.cluster.NodeType.DATA;
+
+public enum ClusterManager {
+ // 3 nodes (1m, 2d)
+ DEFAULT(new NodeSettings(NodeRole.CLUSTER_MANAGER), new NodeSettings(NodeRole.DATA), new NodeSettings(NodeRole.DATA)),
+
+ // 1 node (1md)
+ SINGLENODE(new NodeSettings(NodeRole.CLUSTER_MANAGER, NodeRole.DATA)),
+
+ SINGLE_REMOTE_CLIENT(new NodeSettings(NodeRole.CLUSTER_MANAGER, NodeRole.DATA, NodeRole.REMOTE_CLUSTER_CLIENT)),
+
+ // 4 node (1m, 2d, 1c)
+ CLIENTNODE(
+ new NodeSettings(NodeRole.CLUSTER_MANAGER),
+ new NodeSettings(NodeRole.DATA),
+ new NodeSettings(NodeRole.DATA),
+ new NodeSettings()
+ ),
+
+ THREE_CLUSTER_MANAGERS(
+ new NodeSettings(NodeRole.CLUSTER_MANAGER),
+ new NodeSettings(NodeRole.CLUSTER_MANAGER),
+ new NodeSettings(NodeRole.CLUSTER_MANAGER),
+ new NodeSettings(NodeRole.DATA),
+ new NodeSettings(NodeRole.DATA)
+ );
+
+ private List<NodeSettings> nodeSettings = new LinkedList<>();
+
+ private ClusterManager(NodeSettings... settings) {
+ nodeSettings.addAll(Arrays.asList(settings));
+ }
+
+ public List<NodeSettings> getNodeSettings() {
+ return unmodifiableList(nodeSettings);
+ }
+
+ public List<NodeSettings> getClusterManagerNodeSettings() {
+ return unmodifiableList(nodeSettings.stream().filter(a -> a.containRole(NodeRole.CLUSTER_MANAGER)).collect(Collectors.toList()));
+ }
+
+ public List<NodeSettings> getNonClusterManagerNodeSettings() {
+ return unmodifiableList(nodeSettings.stream().filter(a -> !a.containRole(NodeRole.CLUSTER_MANAGER)).collect(Collectors.toList()));
+ }
+
+ public int getNodes() {
+ return nodeSettings.size();
+ }
+
+ public int getClusterManagerNodes() {
+ return (int) nodeSettings.stream().filter(a -> a.containRole(NodeRole.CLUSTER_MANAGER)).count();
+ }
+
+ public int getDataNodes() {
+ return (int) nodeSettings.stream().filter(a -> a.containRole(NodeRole.DATA)).count();
+ }
+
+ public int getClientNodes() {
+ return (int) nodeSettings.stream().filter(a -> a.isClientNode()).count();
+ }
+
+ public static class NodeSettings {
+
+ private final static List<Class<? extends Plugin>> DEFAULT_PLUGINS = List.of(
+ Netty4Plugin.class,
+ OpenSearchSecurityPlugin.class,
+ MatrixAggregationPlugin.class,
+ ParentJoinPlugin.class,
+ PercolatorPlugin.class,
+ ReindexPlugin.class
+ );
+
+ private final Set<NodeRole> roles;
+ public final List<Class<? extends Plugin>> plugins;
+
+ public NodeSettings(NodeRole... roles) {
+ this(roles.length == 0 ? Collections.emptySet() : EnumSet.copyOf(Arrays.asList(roles)), Collections.emptyList());
+ }
+
+ public NodeSettings(Set<NodeRole> roles, List<Class<? extends Plugin>> additionalPlugins) {
+ super();
+ this.roles = Objects.requireNonNull(roles, "Node roles set must not be null");
+ this.plugins = mergePlugins(additionalPlugins, DEFAULT_PLUGINS);
+ }
+
+ public boolean containRole(NodeRole nodeRole) {
+ return roles.contains(nodeRole);
+ }
+
+ public boolean isClientNode() {
+ return (roles.contains(NodeRole.DATA) == false) && (roles.contains(NodeRole.CLUSTER_MANAGER) == false);
+ }
+
+ NodeType recognizeNodeType() {
+ if (roles.contains(NodeRole.CLUSTER_MANAGER)) {
+ return CLUSTER_MANAGER;
+ } else if (roles.contains(NodeRole.DATA)) {
+ return DATA;
+ } else {
+ return CLIENT;
+ }
+ }
+
+ private List<Class<? extends Plugin>> mergePlugins(Collection<Class<? extends Plugin>>... plugins) {
+ List<Class<? extends Plugin>> mergedPlugins = Arrays.stream(plugins)
+ .filter(Objects::nonNull)
+ .flatMap(Collection::stream)
+ .collect(Collectors.toList());
+ return unmodifiableList(mergedPlugins);
+ }
+
+ @SuppressWarnings("unchecked")
+ public Class<? extends Plugin>[] getPlugins() {
+ return plugins.toArray(new Class[0]);
+ }
+
+ public Class<? extends Plugin>[] pluginsWithAddition(List<Class<? extends Plugin>> additionalPlugins) {
+ return mergePlugins(plugins, additionalPlugins).toArray(Class[]::new);
+ }
+ }
+}
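
For orientation, a small sketch showing how a predefined topology can be inspected; the printed counts follow directly from the enum constants above.

```java
package org.opensearch.test.framework.cluster;

// Illustrative only: inspecting a predefined cluster topology.
class ClusterManagerExample {

    static void describeDefaultTopology() {
        ClusterManager topology = ClusterManager.DEFAULT; // 1 cluster manager + 2 data nodes

        System.out.println("total nodes: " + topology.getNodes());                          // 3
        System.out.println("cluster manager nodes: " + topology.getClusterManagerNodes());  // 1
        System.out.println("data nodes: " + topology.getDataNodes());                       // 2

        for (ClusterManager.NodeSettings settings : topology.getNodeSettings()) {
            System.out.println(settings.recognizeNodeType() + " -> " + settings.plugins);
        }
    }
}
```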
diff --git a/src/integrationTest/java/org/opensearch/test/framework/cluster/ContextHeaderDecoratorClient.java b/src/integrationTest/java/org/opensearch/test/framework/cluster/ContextHeaderDecoratorClient.java
new file mode 100644
index 0000000000..c6ddf3281a
--- /dev/null
+++ b/src/integrationTest/java/org/opensearch/test/framework/cluster/ContextHeaderDecoratorClient.java
@@ -0,0 +1,55 @@
+/*
+* Copyright OpenSearch Contributors
+* SPDX-License-Identifier: Apache-2.0
+*
+* The OpenSearch Contributors require contributions made to
+* this file be licensed under the Apache-2.0 license or a
+* compatible open source license.
+*
+*/
+package org.opensearch.test.framework.cluster;
+
+import java.util.Collections;
+import java.util.Map;
+
+import org.opensearch.core.action.ActionListener;
+import org.opensearch.action.ActionRequest;
+import org.opensearch.core.action.ActionResponse;
+import org.opensearch.action.ActionType;
+import org.opensearch.action.support.ContextPreservingActionListener;
+import org.opensearch.client.Client;
+import org.opensearch.client.FilterClient;
+import org.opensearch.common.util.concurrent.ThreadContext;
+import org.opensearch.common.util.concurrent.ThreadContext.StoredContext;
+
+/**
+* The class adds provided headers into context before sending request via wrapped {@link Client}
+*/
+public class ContextHeaderDecoratorClient extends FilterClient {
+
+ private Map<String, String> headers;
+
+ public ContextHeaderDecoratorClient(Client in, Map<String, String> headers) {
+ super(in);
+ this.headers = headers != null ? headers : Collections.emptyMap();
+ }
+
+ @Override
+ protected <Request extends ActionRequest, Response extends ActionResponse> void doExecute(
+ ActionType<Response> action,
+ Request request,
+ ActionListener<Response> listener
+ ) {
+
+ ThreadContext threadContext = threadPool().getThreadContext();
+ ContextPreservingActionListener<Response> wrappedListener = new ContextPreservingActionListener<>(
+ threadContext.newRestorableContext(true),
+ listener
+ );
+
+ try (StoredContext ctx = threadContext.stashContext()) {
+ threadContext.putHeader(this.headers);
+ super.doExecute(action, request, wrappedListener);
+ }
+ }
+}
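
A minimal sketch of how the wrapper is used (the same pattern appears later in LocalCluster in this change): an internal node client is decorated so that every request carries the security configuration header in its thread context. The nodeClient parameter is assumed to be obtained elsewhere.

```java
import java.util.Map;

import org.opensearch.client.Client;
import org.opensearch.security.support.ConfigConstants;
import org.opensearch.test.framework.cluster.ContextHeaderDecoratorClient;

// Illustrative only: 'nodeClient' is assumed to be an internal node client obtained elsewhere.
class ContextHeaderDecoratorClientExample {

    static Client markRequestsAsConfigRequests(Client nodeClient) {
        // Every request sent through the wrapper carries this header in its thread context.
        return new ContextHeaderDecoratorClient(
            nodeClient,
            Map.of(ConfigConstants.OPENDISTRO_SECURITY_CONF_REQUEST_HEADER, "true")
        );
    }
}
```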
diff --git a/src/integrationTest/java/org/opensearch/test/framework/cluster/LocalAddressRoutePlanner.java b/src/integrationTest/java/org/opensearch/test/framework/cluster/LocalAddressRoutePlanner.java
new file mode 100644
index 0000000000..09d8b2b6de
--- /dev/null
+++ b/src/integrationTest/java/org/opensearch/test/framework/cluster/LocalAddressRoutePlanner.java
@@ -0,0 +1,56 @@
+/*
+* Copyright OpenSearch Contributors
+* SPDX-License-Identifier: Apache-2.0
+*
+* The OpenSearch Contributors require contributions made to
+* this file be licensed under the Apache-2.0 license or a
+* compatible open source license.
+*
+*/
+package org.opensearch.test.framework.cluster;
+
+import java.net.InetAddress;
+import java.util.Objects;
+
+import org.apache.http.HttpException;
+import org.apache.http.HttpHost;
+import org.apache.http.HttpRequest;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.protocol.HttpClientContext;
+import org.apache.http.conn.routing.HttpRoute;
+import org.apache.http.impl.conn.DefaultRoutePlanner;
+import org.apache.http.impl.conn.DefaultSchemePortResolver;
+import org.apache.http.protocol.HttpContext;
+
+/**
+* Class which can be used to bind Apache HTTP client to a particular network interface or its IP address so that the IP address of
+* network interface is used as a source IP address of HTTP request.
+*/
+class LocalAddressRoutePlanner extends DefaultRoutePlanner {
+
+ /**
+ * IP address of one of the local network interfaces.
+ */
+ private final InetAddress localAddress;
+
+ /**
+ * Creates {@link LocalAddressRoutePlanner}
+ * @param localAddress IP address of one of the local network interfaces. The client socket used by the Apache HTTP client will be bound
+ * to the address from this parameter. The parameter must not be null.
+ */
+ public LocalAddressRoutePlanner(InetAddress localAddress) {
+ super(DefaultSchemePortResolver.INSTANCE);
+ this.localAddress = Objects.requireNonNull(localAddress);
+ }
+
+ @Override
+ public HttpRoute determineRoute(final HttpHost host, final HttpRequest request, final HttpContext context) throws HttpException {
+ final HttpClientContext clientContext = HttpClientContext.adapt(context);
+ final RequestConfig localRequestConfig = RequestConfig.copy(clientContext.getRequestConfig())
+ .setLocalAddress(this.localAddress)
+ .build();
+ clientContext.setRequestConfig(localRequestConfig);
+
+ return super.determineRoute(host, request, clientContext);
+ }
+}
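
A short sketch of wiring the route planner into an Apache HTTP client; binding to 127.0.0.1 is just an example address. The same planner instance can alternatively be passed to CloseableHttpClientFactory from this change as its routePlanner argument.

```java
package org.opensearch.test.framework.cluster;

import java.net.InetAddress;
import java.net.UnknownHostException;

import org.apache.http.conn.routing.HttpRoutePlanner;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;

// Illustrative only: pins outgoing requests to the loopback interface.
class LocalAddressRoutePlannerExample {

    static CloseableHttpClient clientBoundToLoopback() throws UnknownHostException {
        HttpRoutePlanner routePlanner = new LocalAddressRoutePlanner(InetAddress.getByName("127.0.0.1"));

        // Requests sent through this client use 127.0.0.1 as their source address.
        return HttpClients.custom().setRoutePlanner(routePlanner).build();
    }
}
```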
diff --git a/src/integrationTest/java/org/opensearch/test/framework/cluster/LocalCluster.java b/src/integrationTest/java/org/opensearch/test/framework/cluster/LocalCluster.java
new file mode 100644
index 0000000000..539e15fb57
--- /dev/null
+++ b/src/integrationTest/java/org/opensearch/test/framework/cluster/LocalCluster.java
@@ -0,0 +1,528 @@
+/*
+* Copyright 2015-2021 floragunn GmbH
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*
+*/
+
+/*
+* SPDX-License-Identifier: Apache-2.0
+*
+* The OpenSearch Contributors require contributions made to
+* this file be licensed under the Apache-2.0 license or a
+* compatible open source license.
+*
+* Modifications Copyright OpenSearch Contributors. See
+* GitHub history for details.
+*/
+
+package org.opensearch.test.framework.cluster;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.junit.rules.ExternalResource;
+
+import org.opensearch.client.Client;
+import org.opensearch.common.settings.Settings;
+import org.opensearch.node.PluginAwareNode;
+import org.opensearch.plugins.Plugin;
+import org.opensearch.security.action.configupdate.ConfigUpdateAction;
+import org.opensearch.security.action.configupdate.ConfigUpdateRequest;
+import org.opensearch.security.action.configupdate.ConfigUpdateResponse;
+import org.opensearch.security.securityconf.impl.CType;
+import org.opensearch.security.support.ConfigConstants;
+import org.opensearch.test.framework.AuditConfiguration;
+import org.opensearch.test.framework.AuthFailureListeners;
+import org.opensearch.test.framework.AuthzDomain;
+import org.opensearch.test.framework.RolesMapping;
+import org.opensearch.test.framework.TestIndex;
+import org.opensearch.test.framework.TestSecurityConfig;
+import org.opensearch.test.framework.TestSecurityConfig.Role;
+import org.opensearch.test.framework.XffConfig;
+import org.opensearch.test.framework.audit.TestRuleAuditLogSink;
+import org.opensearch.test.framework.certificate.CertificateData;
+import org.opensearch.test.framework.certificate.TestCertificates;
+
+/**
+* This class allows you to start and manage a local cluster in an integration test. In contrast to the
+* OpenSearchIntegTestCase class, this class can be used in a composite way and allows the specification
+* of the security plugin configuration.
+*
+* This class can be used both as a JUnit @ClassRule (preferred) and in a try-with-resources block. The latter should
+* be only sparingly used, as starting a cluster is not a particularly fast operation.
+*/
+public class LocalCluster extends ExternalResource implements AutoCloseable, OpenSearchClientProvider {
+
+ private static final Logger log = LogManager.getLogger(LocalCluster.class);
+
+ public static final String INIT_CONFIGURATION_DIR = "security.default_init.dir";
+
+ protected static final AtomicLong num = new AtomicLong();
+
+ private boolean sslOnly;
+
+ private final List<Class<? extends Plugin>> plugins;
+ private final ClusterManager clusterManager;
+ private final TestSecurityConfig testSecurityConfig;
+ private Settings nodeOverride;
+ private final String clusterName;
+ private final MinimumSecuritySettingsSupplierFactory minimumOpenSearchSettingsSupplierFactory;
+ private final TestCertificates testCertificates;
+ private final List<LocalCluster> clusterDependencies;
+ private final Map<String, LocalCluster> remotes;
+ private volatile LocalOpenSearchCluster localOpenSearchCluster;
+ private final List<TestIndex> testIndices;
+
+ private boolean loadConfigurationIntoIndex;
+
+ private LocalCluster(
+ String clusterName,
+ TestSecurityConfig testSgConfig,
+ boolean sslOnly,
+ Settings nodeOverride,
+ ClusterManager clusterManager,
+ List<Class<? extends Plugin>> plugins,
+ TestCertificates testCertificates,
+ List<LocalCluster> clusterDependencies,
+ Map<String, LocalCluster> remotes,
+ List<TestIndex> testIndices,
+ boolean loadConfigurationIntoIndex,
+ String defaultConfigurationInitDirectory
+ ) {
+ this.plugins = plugins;
+ this.testCertificates = testCertificates;
+ this.clusterManager = clusterManager;
+ this.testSecurityConfig = testSgConfig;
+ this.sslOnly = sslOnly;
+ this.nodeOverride = nodeOverride;
+ this.clusterName = clusterName;
+ this.minimumOpenSearchSettingsSupplierFactory = new MinimumSecuritySettingsSupplierFactory(testCertificates);
+ this.remotes = remotes;
+ this.clusterDependencies = clusterDependencies;
+ this.testIndices = testIndices;
+ this.loadConfigurationIntoIndex = loadConfigurationIntoIndex;
+ if (StringUtils.isNoneBlank(defaultConfigurationInitDirectory)) {
+ System.setProperty(INIT_CONFIGURATION_DIR, defaultConfigurationInitDirectory);
+ }
+ }
+
+ public String getSnapshotDirPath() {
+ return localOpenSearchCluster.getSnapshotDirPath();
+ }
+
+ @Override
+ public void before() throws Throwable {
+ if (localOpenSearchCluster == null) {
+ for (LocalCluster dependency : clusterDependencies) {
+ if (!dependency.isStarted()) {
+ dependency.before();
+ }
+ }
+
+ for (Map.Entry<String, LocalCluster> entry : remotes.entrySet()) {
+ @SuppressWarnings("resource")
+ InetSocketAddress transportAddress = entry.getValue().localOpenSearchCluster.clusterManagerNode().getTransportAddress();
+ String key = "cluster.remote." + entry.getKey() + ".seeds";
+ String value = transportAddress.getHostString() + ":" + transportAddress.getPort();
+ log.info("Remote cluster '{}' added to configuration with the following seed '{}'", key, value);
+ nodeOverride = Settings.builder().put(nodeOverride).putList(key, value).build();
+ }
+ start();
+ }
+ }
+
+ @Override
+ protected void after() {
+ System.clearProperty(INIT_CONFIGURATION_DIR);
+ close();
+ }
+
+ @Override
+ public void close() {
+ if (localOpenSearchCluster != null && localOpenSearchCluster.isStarted()) {
+ try {
+ localOpenSearchCluster.destroy();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ } finally {
+ localOpenSearchCluster = null;
+ }
+ }
+ }
+
+ @Override
+ public String getClusterName() {
+ return clusterName;
+ }
+
+ @Override
+ public InetSocketAddress getHttpAddress() {
+ return localOpenSearchCluster.clientNode().getHttpAddress();
+ }
+
+ public int getHttpPort() {
+ return getHttpAddress().getPort();
+ }
+
+ @Override
+ public InetSocketAddress getTransportAddress() {
+ return localOpenSearchCluster.clientNode().getTransportAddress();
+ }
+
+ /**
+ * Returns a Client object that performs cluster-internal requests. As these requests are regarded as cluster-internal,
+ * no authentication is performed and no user-information is attached to these requests. Thus, this client should
+ * be only used for preparing test environments, but not as a test subject.
+ */
+ public Client getInternalNodeClient() {
+ return localOpenSearchCluster.clientNode().getInternalNodeClient();
+ }
+
+ /**
+ * Returns a random node of this cluster.
+ */
+ public PluginAwareNode node() {
+ return this.localOpenSearchCluster.clusterManagerNode().esNode();
+ }
+
+ /**
+ * Returns all nodes of this cluster.
+ */
+ public List<LocalOpenSearchCluster.Node> nodes() {
+ return this.localOpenSearchCluster.getNodes();
+ }
+
+ public LocalOpenSearchCluster.Node getNodeByName(String name) {
+ return this.localOpenSearchCluster.getNodeByName(name);
+ }
+
+ public boolean isStarted() {
+ return localOpenSearchCluster != null;
+ }
+
+ public List<TestSecurityConfig.User> getConfiguredUsers() {
+ return testSecurityConfig.getUsers();
+ }
+
+ public Random getRandom() {
+ return localOpenSearchCluster.getRandom();
+ }
+
+ private void start() {
+ try {
+ NodeSettingsSupplier nodeSettingsSupplier = minimumOpenSearchSettingsSupplierFactory.minimumOpenSearchSettings(
+ sslOnly,
+ nodeOverride
+ );
+ localOpenSearchCluster = new LocalOpenSearchCluster(
+ clusterName,
+ clusterManager,
+ nodeSettingsSupplier,
+ plugins,
+ testCertificates
+ );
+
+ localOpenSearchCluster.start();
+
+ if (loadConfigurationIntoIndex) {
+ initSecurityIndex(testSecurityConfig);
+ }
+
+ try (Client client = getInternalNodeClient()) {
+ for (TestIndex index : this.testIndices) {
+ index.create(client);
+ }
+ }
+
+ } catch (Exception e) {
+ log.error("Local ES cluster start failed", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void initSecurityIndex(TestSecurityConfig testSecurityConfig) {
+ log.info("Initializing OpenSearch Security index");
+ try (
+ Client client = new ContextHeaderDecoratorClient(
+ this.getInternalNodeClient(),
+ Map.of(ConfigConstants.OPENDISTRO_SECURITY_CONF_REQUEST_HEADER, "true")
+ )
+ ) {
+ testSecurityConfig.initIndex(client);
+ triggerConfigurationReload(client);
+ }
+ }
+
+ public void updateUserConfiguration(List<TestSecurityConfig.User> users) {
+ try (
+ Client client = new ContextHeaderDecoratorClient(
+ this.getInternalNodeClient(),
+ Map.of(ConfigConstants.OPENDISTRO_SECURITY_CONF_REQUEST_HEADER, "true")
+ )
+ ) {
+ testSecurityConfig.updateInternalUsersConfiguration(client, users);
+ triggerConfigurationReload(client);
+ }
+ }
+
+ private static void triggerConfigurationReload(Client client) {
+ ConfigUpdateResponse configUpdateResponse = client.execute(
+ ConfigUpdateAction.INSTANCE,
+ new ConfigUpdateRequest(CType.lcStringValues().toArray(new String[0]))
+ ).actionGet();
+ if (configUpdateResponse.hasFailures()) {
+ throw new RuntimeException("ConfigUpdateResponse produced failures: " + configUpdateResponse.failures());
+ }
+ }
+
+ public CertificateData getAdminCertificate() {
+ return testCertificates.getAdminCertificateData();
+ }
+
+ public static class Builder {
+
+ private final Settings.Builder nodeOverrideSettingsBuilder = Settings.builder();
+
+ private boolean sslOnly = false;
+ private final List<Class<? extends Plugin>> plugins = new ArrayList<>();
+ private Map<String, LocalCluster> remoteClusters = new HashMap<>();
+ private List<LocalCluster> clusterDependencies = new ArrayList<>();
+ private List<TestIndex> testIndices = new ArrayList<>();
+ private ClusterManager clusterManager = ClusterManager.DEFAULT;
+ private TestSecurityConfig testSecurityConfig = new TestSecurityConfig();
+ private String clusterName = "local_cluster";
+ private TestCertificates testCertificates;
+
+ private boolean loadConfigurationIntoIndex = true;
+
+ private String defaultConfigurationInitDirectory = null;
+
+ public Builder() {}
+
+ public Builder dependsOn(Object object) {
+ // We just want to make sure that the object is already done
+ if (object == null) {
+ throw new IllegalStateException("Dependency not fulfilled");
+ }
+ return this;
+ }
+
+ public Builder clusterManager(ClusterManager clusterManager) {
+ this.clusterManager = clusterManager;
+ return this;
+ }
+
+ /**
+ * Starts a cluster with only one node and thus saves some resources during startup. This shall be only used
+ * for tests where the node interactions are not relevant to the test. An example for this would be
+ * authentication tests, as authentication is always done on the directly connected node.
+ */
+ public Builder singleNode() {
+ this.clusterManager = ClusterManager.SINGLENODE;
+ return this;
+ }
+
+ /**
+ * Specifies the configuration of the security plugin that shall be used by this cluster.
+ */
+ public Builder config(TestSecurityConfig testSecurityConfig) {
+ this.testSecurityConfig = testSecurityConfig;
+ return this;
+ }
+
+ public Builder sslOnly(boolean sslOnly) {
+ this.sslOnly = sslOnly;
+ return this;
+ }
+
+ public Builder nodeSettings(Map<String, Object> settings) {
+ settings.forEach((key, value) -> {
+ if (value instanceof List) {
+ List<String> values = ((List<?>) value).stream().map(String::valueOf).collect(Collectors.toList());
+ nodeOverrideSettingsBuilder.putList(key, values);
+ } else {
+ nodeOverrideSettingsBuilder.put(key, String.valueOf(value));
+ }
+ });
+
+ return this;
+ }
+
+ /**
+ * Adds additional plugins to the cluster
+ */
+ public Builder plugin(Class<? extends Plugin> plugin) {
+ this.plugins.add(plugin);
+
+ return this;
+ }
+
+ public Builder authFailureListeners(AuthFailureListeners listener) {
+ testSecurityConfig.authFailureListeners(listener);
+ return this;
+ }
+
+ /**
+ * Specifies a remote cluster and its name. The remote cluster can be then used in Cross Cluster Search
+ * operations with the specified name.
+ */
+ public Builder remote(String name, LocalCluster anotherCluster) {
+ remoteClusters.put(name, anotherCluster);
+
+ clusterDependencies.add(anotherCluster);
+
+ return this;
+ }
+
+ /**
+ * Specifies test indices that shall be created upon startup of the cluster.
+ */
+ public Builder indices(TestIndex... indices) {
+ this.testIndices.addAll(Arrays.asList(indices));
+ return this;
+ }
+
+ public Builder users(TestSecurityConfig.User... users) {
+ for (TestSecurityConfig.User user : users) {
+ testSecurityConfig.user(user);
+ }
+ return this;
+ }
+
+ public Builder audit(AuditConfiguration auditConfiguration) {
+ if (auditConfiguration != null) {
+ testSecurityConfig.audit(auditConfiguration);
+ }
+ if ((auditConfiguration != null) && auditConfiguration.isEnabled()) {
+ nodeOverrideSettingsBuilder.put("plugins.security.audit.type", TestRuleAuditLogSink.class.getName());
+ } else {
+ nodeOverrideSettingsBuilder.put("plugins.security.audit.type", "noop");
+ }
+ return this;
+ }
+
+ public List<TestSecurityConfig.User> getUsers() {
+ return testSecurityConfig.getUsers();
+ }
+
+ public Builder roles(Role... roles) {
+ testSecurityConfig.roles(roles);
+ return this;
+ }
+
+ public Builder rolesMapping(RolesMapping... mappings) {
+ testSecurityConfig.rolesMapping(mappings);
+ return this;
+ }
+
+ public Builder authc(TestSecurityConfig.AuthcDomain authc) {
+ testSecurityConfig.authc(authc);
+ return this;
+ }
+
+ public Builder authz(AuthzDomain authzDomain) {
+ testSecurityConfig.authz(authzDomain);
+ return this;
+ }
+
+ public Builder clusterName(String clusterName) {
+ this.clusterName = clusterName;
+ return this;
+ }
+
+ public Builder configIndexName(String configIndexName) {
+ testSecurityConfig.configIndexName(configIndexName);
+ return this;
+ }
+
+ public Builder testCertificates(TestCertificates certificates) {
+ this.testCertificates = certificates;
+ return this;
+ }
+
+ public Builder anonymousAuth(boolean anonAuthEnabled) {
+ testSecurityConfig.anonymousAuth(anonAuthEnabled);
+ return this;
+ }
+
+ public Builder xff(XffConfig xffConfig) {
+ testSecurityConfig.xff(xffConfig);
+ return this;
+ }
+
+ public Builder loadConfigurationIntoIndex(boolean loadConfigurationIntoIndex) {
+ this.loadConfigurationIntoIndex = loadConfigurationIntoIndex;
+ return this;
+ }
+
+ public Builder certificates(TestCertificates certificates) {
+ this.testCertificates = certificates;
+ return this;
+ }
+
+ public Builder doNotFailOnForbidden(boolean doNotFailOnForbidden) {
+ testSecurityConfig.doNotFailOnForbidden(doNotFailOnForbidden);
+ return this;
+ }
+
+ public Builder defaultConfigurationInitDirectory(String defaultConfigurationInitDirectory) {
+ this.defaultConfigurationInitDirectory = defaultConfigurationInitDirectory;
+ return this;
+ }
+
+ public LocalCluster build() {
+ try {
+ if (testCertificates == null) {
+ testCertificates = new TestCertificates();
+ }
+ clusterName += "_" + num.incrementAndGet();
+ Settings settings = nodeOverrideSettingsBuilder.build();
+ return new LocalCluster(
+ clusterName,
+ testSecurityConfig,
+ sslOnly,
+ settings,
+ clusterManager,
+ plugins,
+ testCertificates,
+ clusterDependencies,
+ remoteClusters,
+ testIndices,
+ loadConfigurationIntoIndex,
+ defaultConfigurationInitDirectory
+ );
+ } catch (Exception e) {
+ log.error("Failed to build LocalCluster", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ }
+
+ @Override
+ public TestCertificates getTestCertificates() {
+ return testCertificates;
+ }
+
+}
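For orientation, here is a minimal sketch of how a test class could use the Builder above. It assumes, as in the rest of this framework, that TestSecurityConfig.User implements UserCredentialsHolder, that TestRestClient is AutoCloseable and exposes a get() helper whose response has getStatusCode(), that LocalCluster can be used as a JUnit @ClassRule, and that the package names follow the paths in this patch. The user name and endpoint are placeholders; a real test would also configure an authc domain via config()/authc().

    import org.junit.ClassRule;
    import org.junit.Test;

    import org.opensearch.test.framework.TestSecurityConfig;
    import org.opensearch.test.framework.cluster.ClusterManager;
    import org.opensearch.test.framework.cluster.LocalCluster;
    import org.opensearch.test.framework.cluster.TestRestClient;

    import static org.hamcrest.MatcherAssert.assertThat;
    import static org.hamcrest.Matchers.equalTo;

    public class ExampleAuthenticationIT {

        // Placeholder user; real tests would also assign roles.
        private static final TestSecurityConfig.User EXAMPLE_USER = new TestSecurityConfig.User("example_user");

        @ClassRule
        public static final LocalCluster cluster = new LocalCluster.Builder()
            .clusterManager(ClusterManager.SINGLENODE)
            .anonymousAuth(false)
            .users(EXAMPLE_USER)
            .build();

        @Test
        public void shouldAuthenticateWithBasicAuth() {
            try (TestRestClient client = cluster.getRestClient(EXAMPLE_USER)) {
                // A 200 response means the security plugin accepted the basic-auth credentials.
                assertThat(client.get("_plugins/_security/authinfo").getStatusCode(), equalTo(200));
            }
        }
    }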
diff --git a/src/integrationTest/java/org/opensearch/test/framework/cluster/LocalOpenSearchCluster.java b/src/integrationTest/java/org/opensearch/test/framework/cluster/LocalOpenSearchCluster.java
new file mode 100644
index 0000000000..c09127e592
--- /dev/null
+++ b/src/integrationTest/java/org/opensearch/test/framework/cluster/LocalOpenSearchCluster.java
@@ -0,0 +1,573 @@
+/*
+* Copyright 2015-2021 floragunn GmbH
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*
+*/
+
+/*
+* SPDX-License-Identifier: Apache-2.0
+*
+* The OpenSearch Contributors require contributions made to
+* this file be licensed under the Apache-2.0 license or a
+* compatible open source license.
+*
+* Modifications Copyright OpenSearch Contributors. See
+* GitHub history for details.
+*/
+
+package org.opensearch.test.framework.cluster;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+import java.util.Objects;
+import java.util.Random;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.net.InetAddresses;
+import org.apache.commons.io.FileUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
+import org.opensearch.client.AdminClient;
+import org.opensearch.client.Client;
+import org.opensearch.cluster.health.ClusterHealthStatus;
+import org.opensearch.common.settings.Settings;
+import org.opensearch.common.unit.TimeValue;
+import org.opensearch.common.xcontent.XContentType;
+import org.opensearch.core.common.Strings;
+import org.opensearch.http.BindHttpException;
+import org.opensearch.node.PluginAwareNode;
+import org.opensearch.plugins.Plugin;
+import org.opensearch.test.framework.certificate.TestCertificates;
+import org.opensearch.test.framework.cluster.ClusterManager.NodeSettings;
+import org.opensearch.transport.BindTransportException;
+
+import static java.util.Objects.requireNonNull;
+import static org.junit.Assert.assertEquals;
+import static org.opensearch.test.framework.cluster.NodeType.CLIENT;
+import static org.opensearch.test.framework.cluster.NodeType.CLUSTER_MANAGER;
+import static org.opensearch.test.framework.cluster.NodeType.DATA;
+import static org.opensearch.test.framework.cluster.PortAllocator.TCP;
+
+/**
+* Encapsulates all the logic to start a local OpenSearch cluster - without any configuration of the security plugin.
+*
+* The security plugin configuration is the job of LocalCluster, which uses this class under the hood. Thus, test code
+* for the security plugin should always use LocalCluster.
+*/
+public class LocalOpenSearchCluster {
+
+ static {
+ System.setProperty("opensearch.enforce.bootstrap.checks", "true");
+ }
+
+ private static final Logger log = LogManager.getLogger(LocalOpenSearchCluster.class);
+
+ private final String clusterName;
+ private final ClusterManager clusterManager;
+ private final NodeSettingsSupplier nodeSettingsSupplier;
+ private final List<Class<? extends Plugin>> additionalPlugins;
+ private final List<Node> nodes = new ArrayList<>();
+ private final TestCertificates testCertificates;
+
+ private File clusterHomeDir;
+ private List<String> seedHosts;
+ private List<String> initialClusterManagerHosts;
+ private int retry = 0;
+ private boolean started;
+ private Random random = new Random();
+
+ private File snapshotDir;
+
+ public LocalOpenSearchCluster(
+ String clusterName,
+ ClusterManager clusterManager,
+ NodeSettingsSupplier nodeSettingsSupplier,
+ List<Class<? extends Plugin>> additionalPlugins,
+ TestCertificates testCertificates
+ ) {
+ this.clusterName = clusterName;
+ this.clusterManager = clusterManager;
+ this.nodeSettingsSupplier = nodeSettingsSupplier;
+ this.additionalPlugins = additionalPlugins;
+ this.testCertificates = testCertificates;
+ try {
+ createClusterDirectory(clusterName);
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ public String getSnapshotDirPath() {
+ return snapshotDir.getAbsolutePath();
+ }
+
+ private void createClusterDirectory(String clusterName) throws IOException {
+ this.clusterHomeDir = Files.createTempDirectory("local_cluster_" + clusterName).toFile();
+ log.debug("Cluster home directory '{}'.", clusterHomeDir.getAbsolutePath());
+ this.snapshotDir = new File(this.clusterHomeDir, "snapshots");
+ this.snapshotDir.mkdir();
+ }
+
+ private List<Node> getNodesByType(NodeType nodeType) {
+ return nodes.stream().filter(currentNode -> currentNode.hasAssignedType(nodeType)).collect(Collectors.toList());
+ }
+
+ private long countNodesByType(NodeType nodeType) {
+ return getNodesByType(nodeType).stream().count();
+ }
+
+ public void start() throws Exception {
+ log.info("Starting {}", clusterName);
+
+ int clusterManagerNodeCount = clusterManager.getClusterManagerNodes();
+ int nonClusterManagerNodeCount = clusterManager.getDataNodes() + clusterManager.getClientNodes();
+
+ SortedSet<Integer> clusterManagerNodeTransportPorts = TCP.allocate(
+ clusterName,
+ Math.max(clusterManagerNodeCount, 4),
+ 5000 + 42 * 1000 + 300
+ );
+ SortedSet<Integer> clusterManagerNodeHttpPorts = TCP.allocate(clusterName, clusterManagerNodeCount, 5000 + 42 * 1000 + 200);
+
+ this.seedHosts = toHostList(clusterManagerNodeTransportPorts);
+ Set<Integer> clusterManagerPorts = clusterManagerNodeTransportPorts.stream()
+ .limit(clusterManagerNodeCount)
+ .collect(Collectors.toSet());
+ this.initialClusterManagerHosts = toHostList(clusterManagerPorts);
+
+ started = true;
+
+ CompletableFuture<Void> clusterManagerNodeFuture = startNodes(
+ clusterManager.getClusterManagerNodeSettings(),
+ clusterManagerNodeTransportPorts,
+ clusterManagerNodeHttpPorts
+ );
+
+ SortedSet<Integer> nonClusterManagerNodeTransportPorts = TCP.allocate(
+ clusterName,
+ nonClusterManagerNodeCount,
+ 5000 + 42 * 1000 + 310
+ );
+ SortedSet<Integer> nonClusterManagerNodeHttpPorts = TCP.allocate(clusterName, nonClusterManagerNodeCount, 5000 + 42 * 1000 + 210);
+
+ CompletableFuture<Void> nonClusterManagerNodeFuture = startNodes(
+ clusterManager.getNonClusterManagerNodeSettings(),
+ nonClusterManagerNodeTransportPorts,
+ nonClusterManagerNodeHttpPorts
+ );
+
+ CompletableFuture.allOf(clusterManagerNodeFuture, nonClusterManagerNodeFuture).join();
+
+ if (isNodeFailedWithPortCollision()) {
+ log.info("Detected port collision for cluster manager node. Retrying.");
+
+ retry();
+ return;
+ }
+
+ log.info("Startup finished. Waiting for GREEN");
+
+ waitForCluster(ClusterHealthStatus.GREEN, TimeValue.timeValueSeconds(10), nodes.size());
+
+ log.info("Started: {}", this);
+
+ }
+
+ public String getClusterName() {
+ return clusterName;
+ }
+
+ public boolean isStarted() {
+ return started;
+ }
+
+ public void stop() {
+ List<CompletableFuture<Boolean>> stopFutures = new ArrayList<>();
+ for (Node node : nodes) {
+ stopFutures.add(node.stop(2, TimeUnit.SECONDS));
+ }
+ CompletableFuture.allOf(stopFutures.toArray(size -> new CompletableFuture[size])).join();
+ }
+
+ public void destroy() {
+ stop();
+ nodes.clear();
+
+ try {
+ FileUtils.deleteDirectory(clusterHomeDir);
+ } catch (IOException e) {
+ log.warn("Error while deleting " + clusterHomeDir, e);
+ }
+ }
+
+ public Node clientNode() {
+ return findRunningNode(getNodesByType(CLIENT), getNodesByType(DATA), getNodesByType(CLUSTER_MANAGER));
+ }
+
+ public Node clusterManagerNode() {
+ return findRunningNode(getNodesByType(CLUSTER_MANAGER));
+ }
+
+ public List<Node> getNodes() {
+ return Collections.unmodifiableList(nodes);
+ }
+
+ public Node getNodeByName(String name) {
+ return nodes.stream()
+ .filter(node -> node.getNodeName().equals(name))
+ .findAny()
+ .orElseThrow(
+ () -> new RuntimeException(
+ "No such node with name: " + name + "; available: " + nodes.stream().map(Node::getNodeName).collect(Collectors.toList())
+ )
+ );
+ }
+
+ private boolean isNodeFailedWithPortCollision() {
+ return nodes.stream().anyMatch(Node::isPortCollision);
+ }
+
+ private void retry() throws Exception {
+ retry++;
+
+ if (retry > 10) {
+ throw new RuntimeException("Detected port collisions for cluster manager node. Giving up.");
+ }
+
+ stop();
+
+ this.nodes.clear();
+ this.seedHosts = null;
+ this.initialClusterManagerHosts = null;
+ createClusterDirectory("local_cluster_" + clusterName + "_retry_" + retry);
+ start();
+ }
+
+ @SafeVarargs
+ private final Node findRunningNode(List<Node> nodes, List<Node>... moreNodes) {
+ for (Node node : nodes) {
+ if (node.isRunning()) {
+ return node;
+ }
+ }
+
+ if (moreNodes != null && moreNodes.length > 0) {
+ for (List nodesList : moreNodes) {
+ for (Node node : nodesList) {
+ if (node.isRunning()) {
+ return node;
+ }
+ }
+ }
+ }
+
+ return null;
+ }
+
+ private CompletableFuture<Void> startNodes(
+ List<NodeSettings> nodeSettingList,
+ SortedSet<Integer> transportPorts,
+ SortedSet<Integer> httpPorts
+ ) {
+ Iterator<Integer> transportPortIterator = transportPorts.iterator();
+ Iterator<Integer> httpPortIterator = httpPorts.iterator();
+ List<CompletableFuture<StartStage>> futures = new ArrayList<>();
+
+ for (NodeSettings nodeSettings : nodeSettingList) {
+ Node node = new Node(nodeSettings, transportPortIterator.next(), httpPortIterator.next());
+ futures.add(node.start());
+ }
+
+ return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
+ }
+
+ public void waitForCluster(ClusterHealthStatus status, TimeValue timeout, int expectedNodeCount) throws IOException {
+ Client client = clientNode().getInternalNodeClient();
+
+ log.debug("waiting for cluster state {} and {} nodes", status.name(), expectedNodeCount);
+ AdminClient adminClient = client.admin();
+
+ final ClusterHealthResponse healthResponse = adminClient.cluster()
+ .prepareHealth()
+ .setWaitForStatus(status)
+ .setTimeout(timeout)
+ .setClusterManagerNodeTimeout(timeout)
+ .setWaitForNodes("" + expectedNodeCount)
+ .execute()
+ .actionGet();
+
+ if (log.isDebugEnabled()) {
+ log.debug("Current ClusterState:\n{}", Strings.toString(XContentType.JSON, healthResponse));
+ }
+
+ if (healthResponse.isTimedOut()) {
+ throw new IOException(
+ "cluster state is " + healthResponse.getStatus().name() + " with " + healthResponse.getNumberOfNodes() + " nodes"
+ );
+ } else {
+ log.debug("... cluster state ok {} with {} nodes", healthResponse.getStatus().name(), healthResponse.getNumberOfNodes());
+ }
+
+ assertEquals(expectedNodeCount, healthResponse.getNumberOfNodes());
+
+ }
+
+ @Override
+ public String toString() {
+ String clusterManagerNodes = nodeByTypeToString(CLUSTER_MANAGER);
+ String dataNodes = nodeByTypeToString(DATA);
+ String clientNodes = nodeByTypeToString(CLIENT);
+ return "\nES Cluster "
+ + clusterName
+ + "\ncluster manager nodes: "
+ + clusterManagerNodes
+ + "\n data nodes: "
+ + dataNodes
+ + "\nclient nodes: "
+ + clientNodes
+ + "\n";
+ }
+
+ private String nodeByTypeToString(NodeType type) {
+ return getNodesByType(type).stream().map(Objects::toString).collect(Collectors.joining(", "));
+ }
+
+ private static List<String> toHostList(Collection<Integer> ports) {
+ return ports.stream().map(port -> "127.0.0.1:" + port).collect(Collectors.toList());
+ }
+
+ private String createNextNodeName(NodeSettings nodeSettings) {
+ NodeType type = nodeSettings.recognizeNodeType();
+ long nodeTypeCount = countNodesByType(type);
+ String nodeType = type.name().toLowerCase(Locale.ROOT);
+ return nodeType + "_" + nodeTypeCount;
+ }
+
+ public class Node implements OpenSearchClientProvider {
+ private final NodeType nodeType;
+ private final String nodeName;
+ private final NodeSettings nodeSettings;
+ private final File nodeHomeDir;
+ private final File dataDir;
+ private final File logsDir;
+ private final int transportPort;
+ private final int httpPort;
+ private final InetSocketAddress httpAddress;
+ private final InetSocketAddress transportAddress;
+ private PluginAwareNode node;
+ private boolean running = false;
+ private boolean portCollision = false;
+
+ Node(NodeSettings nodeSettings, int transportPort, int httpPort) {
+ this.nodeName = createNextNodeName(requireNonNull(nodeSettings, "Node settings are required."));
+ this.nodeSettings = nodeSettings;
+ this.nodeHomeDir = new File(clusterHomeDir, nodeName);
+ this.dataDir = new File(this.nodeHomeDir, "data");
+ this.logsDir = new File(this.nodeHomeDir, "logs");
+ this.transportPort = transportPort;
+ this.httpPort = httpPort;
+ InetAddress hostAddress = InetAddresses.forString("127.0.0.1");
+ this.httpAddress = new InetSocketAddress(hostAddress, httpPort);
+ this.transportAddress = new InetSocketAddress(hostAddress, transportPort);
+
+ this.nodeType = nodeSettings.recognizeNodeType();
+ nodes.add(this);
+ }
+
+ boolean hasAssignedType(NodeType type) {
+ return requireNonNull(type, "Node type is required.").equals(this.nodeType);
+ }
+
+ CompletableFuture<StartStage> start() {
+ CompletableFuture<StartStage> completableFuture = new CompletableFuture<>();
+ Class<? extends Plugin>[] mergedPlugins = nodeSettings.pluginsWithAddition(additionalPlugins);
+ this.node = new PluginAwareNode(nodeSettings.containRole(NodeRole.CLUSTER_MANAGER), getOpenSearchSettings(), mergedPlugins);
+
+ new Thread(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ node.start();
+ running = true;
+ completableFuture.complete(StartStage.INITIALIZED);
+ } catch (BindTransportException | BindHttpException e) {
+ log.warn("Port collision detected for {}", this, e);
+ portCollision = true;
+ try {
+ node.close();
+ } catch (IOException e1) {
+ log.error(e1);
+ }
+
+ node = null;
+ TCP.reserve(transportPort, httpPort);
+
+ completableFuture.complete(StartStage.RETRY);
+
+ } catch (Throwable e) {
+ log.error("Unable to start {}", this, e);
+ node = null;
+ completableFuture.completeExceptionally(e);
+ }
+ }
+ }).start();
+
+ return completableFuture;
+ }
+
+ public Client getInternalNodeClient() {
+ return node.client();
+ }
+
+ public PluginAwareNode esNode() {
+ return node;
+ }
+
+ public boolean isRunning() {
+ return running;
+ }
+
+ public <X> X getInjectable(Class<X> clazz) {
+ return node.injector().getInstance(clazz);
+ }
+
+ public CompletableFuture<Boolean> stop(long timeout, TimeUnit timeUnit) {
+ return CompletableFuture.supplyAsync(() -> {
+ try {
+ log.info("Stopping {}", this);
+
+ running = false;
+
+ if (node != null) {
+ node.close();
+ boolean stopped = node.awaitClose(timeout, timeUnit);
+ node = null;
+ return stopped;
+ } else {
+ return false;
+ }
+ } catch (Throwable e) {
+ String message = "Error while stopping " + this;
+ log.warn(message, e);
+ throw new RuntimeException(message, e);
+ }
+ });
+ }
+
+ @Override
+ public String toString() {
+ String state = running ? "RUNNING" : node != null ? "INITIALIZING" : "STOPPED";
+
+ return nodeName + " " + state + " [" + transportPort + ", " + httpPort + "]";
+ }
+
+ public boolean isPortCollision() {
+ return portCollision;
+ }
+
+ public String getNodeName() {
+ return nodeName;
+ }
+
+ @Override
+ public InetSocketAddress getHttpAddress() {
+ return httpAddress;
+ }
+
+ @Override
+ public InetSocketAddress getTransportAddress() {
+ return transportAddress;
+ }
+
+ private Settings getOpenSearchSettings() {
+ Settings settings = Settings.builder()
+ .put(getMinimalOpenSearchSettings())
+ .putList("path.repo", List.of(getSnapshotDirPath()))
+ .build();
+
+ if (nodeSettingsSupplier != null) {
+ // TODO node number
+ return Settings.builder().put(settings).put(nodeSettingsSupplier.get(0)).build();
+ }
+ return settings;
+ }
+
+ private Settings getMinimalOpenSearchSettings() {
+ return Settings.builder()
+ .put("node.name", nodeName)
+ .putList("node.roles", createNodeRolesSettings())
+ .put("cluster.name", clusterName)
+ .put("path.home", nodeHomeDir.toPath())
+ .put("path.data", dataDir.toPath())
+ .put("path.logs", logsDir.toPath())
+ .putList("cluster.initial_cluster_manager_nodes", initialClusterManagerHosts)
+ .put("discovery.initial_state_timeout", "8s")
+ .putList("discovery.seed_hosts", seedHosts)
+ .put("transport.tcp.port", transportPort)
+ .put("http.port", httpPort)
+ .put("cluster.routing.allocation.disk.threshold_enabled", false)
+ .put("discovery.probe.connect_timeout", "10s")
+ .put("discovery.probe.handshake_timeout", "10s")
+ .put("http.cors.enabled", true)
+ .put("gateway.auto_import_dangling_indices", "true")
+ .build();
+ }
+
+ private List<String> createNodeRolesSettings() {
+ final ImmutableList.Builder<String> nodeRolesBuilder = ImmutableList.builder();
+ if (nodeSettings.containRole(NodeRole.DATA)) {
+ nodeRolesBuilder.add("data");
+ }
+ if (nodeSettings.containRole(NodeRole.CLUSTER_MANAGER)) {
+ nodeRolesBuilder.add("cluster_manager");
+ }
+ if (nodeSettings.containRole(NodeRole.REMOTE_CLUSTER_CLIENT)) {
+ nodeRolesBuilder.add("remote_cluster_client");
+ }
+ return nodeRolesBuilder.build();
+ }
+
+ @Override
+ public String getClusterName() {
+ return clusterName;
+ }
+
+ @Override
+ public TestCertificates getTestCertificates() {
+ return testCertificates;
+ }
+ }
+
+ public Random getRandom() {
+ return random;
+ }
+
+}
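LocalOpenSearchCluster is normally driven by LocalCluster, but the lifecycle can also be exercised directly. A rough sketch (cluster name is arbitrary, the settings come from the MinimumSecuritySettingsSupplierFactory shown below, and error handling plus the surrounding throws clause are omitted):

    // Sketch: wiring LocalOpenSearchCluster by hand, roughly the way LocalCluster does internally.
    TestCertificates certificates = new TestCertificates();
    NodeSettingsSupplier nodeSettings = new MinimumSecuritySettingsSupplierFactory(certificates)
        .minimumOpenSearchSettings(false, Settings.EMPTY);

    LocalOpenSearchCluster cluster = new LocalOpenSearchCluster(
        "example_cluster",          // placeholder cluster name
        ClusterManager.SINGLENODE,  // one node is enough for a sketch
        nodeSettings,
        List.of(),                  // no additional plugins
        certificates
    );
    try {
        cluster.start();            // allocates ports, starts the node(s) and waits for GREEN
        Client client = cluster.clientNode().getInternalNodeClient();
        // ... issue requests through the internal node client ...
    } finally {
        cluster.destroy();          // stops all nodes and deletes the temporary home directory
    }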
diff --git a/src/integrationTest/java/org/opensearch/test/framework/cluster/MinimumSecuritySettingsSupplierFactory.java b/src/integrationTest/java/org/opensearch/test/framework/cluster/MinimumSecuritySettingsSupplierFactory.java
new file mode 100644
index 0000000000..4ad5f8420e
--- /dev/null
+++ b/src/integrationTest/java/org/opensearch/test/framework/cluster/MinimumSecuritySettingsSupplierFactory.java
@@ -0,0 +1,84 @@
+/*
+* Copyright 2021 floragunn GmbH
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*
+*/
+
+/*
+* SPDX-License-Identifier: Apache-2.0
+*
+* The OpenSearch Contributors require contributions made to
+* this file be licensed under the Apache-2.0 license or a
+* compatible open source license.
+*
+* Modifications Copyright OpenSearch Contributors. See
+* GitHub history for details.
+*/
+
+package org.opensearch.test.framework.cluster;
+
+import org.opensearch.common.settings.Settings;
+import org.opensearch.security.support.ConfigConstants;
+import org.opensearch.test.framework.certificate.TestCertificates;
+
+public class MinimumSecuritySettingsSupplierFactory {
+
+ private final String PRIVATE_KEY_HTTP_PASSWORD = "aWVV63OJ4qzZyPrBwl2MFny4ZV8lQRZchjL";
+ private final String PRIVATE_KEY_TRANSPORT_PASSWORD = "iWbUv9w79sbd5tcxvSJNfHXS9GhcPCvdw9x";
+
+ private TestCertificates testCertificates;
+
+ public MinimumSecuritySettingsSupplierFactory(TestCertificates testCertificates) {
+ if (testCertificates == null) {
+ throw new IllegalArgumentException("certificates must not be null");
+ }
+ this.testCertificates = testCertificates;
+
+ }
+
+ public NodeSettingsSupplier minimumOpenSearchSettings(boolean sslOnly, Settings other) {
+ return i -> minimumOpenSearchSettingsBuilder(i, sslOnly).put(other).build();
+ }
+
+ private Settings.Builder minimumOpenSearchSettingsBuilder(int node, boolean sslOnly) {
+
+ Settings.Builder builder = Settings.builder();
+
+ builder.put("plugins.security.ssl.transport.pemtrustedcas_filepath", testCertificates.getRootCertificate().getAbsolutePath());
+ builder.put("plugins.security.ssl.transport.pemcert_filepath", testCertificates.getNodeCertificate(node).getAbsolutePath());
+ builder.put(
+ "plugins.security.ssl.transport.pemkey_filepath",
+ testCertificates.getNodeKey(node, PRIVATE_KEY_TRANSPORT_PASSWORD).getAbsolutePath()
+ );
+ builder.put("plugins.security.ssl.transport.pemkey_password", PRIVATE_KEY_TRANSPORT_PASSWORD);
+
+ builder.put("plugins.security.ssl.http.enabled", true);
+ builder.put("plugins.security.ssl.http.pemtrustedcas_filepath", testCertificates.getRootCertificate().getAbsolutePath());
+ builder.put("plugins.security.ssl.http.pemcert_filepath", testCertificates.getNodeCertificate(node).getAbsolutePath());
+ builder.put(
+ "plugins.security.ssl.http.pemkey_filepath",
+ testCertificates.getNodeKey(node, PRIVATE_KEY_HTTP_PASSWORD).getAbsolutePath()
+ );
+ builder.put("plugins.security.ssl.http.pemkey_password", PRIVATE_KEY_HTTP_PASSWORD);
+ if (sslOnly == false) {
+ builder.put(ConfigConstants.SECURITY_BACKGROUND_INIT_IF_SECURITYINDEX_NOT_EXIST, false);
+ builder.putList("plugins.security.authcz.admin_dn", testCertificates.getAdminDNs());
+ builder.put("plugins.security.compliance.salt", "1234567890123456");
+ builder.put("plugins.security.audit.type", "noop");
+ builder.put("plugins.security.background_init_if_securityindex_not_exist", "false");
+ }
+ return builder;
+
+ }
+}
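Because the factory returns a NodeSettingsSupplier, the generated per-node settings can be inspected directly; a small sketch (node index 0 chosen arbitrarily):

    // Sketch: each node index receives its own certificate and key paths from TestCertificates.
    MinimumSecuritySettingsSupplierFactory factory = new MinimumSecuritySettingsSupplierFactory(new TestCertificates());
    Settings nodeZeroSettings = factory.minimumOpenSearchSettings(false, Settings.EMPTY).get(0);
    String transportCertPath = nodeZeroSettings.get("plugins.security.ssl.transport.pemcert_filepath");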
diff --git a/src/integrationTest/java/org/opensearch/test/framework/cluster/NodeRole.java b/src/integrationTest/java/org/opensearch/test/framework/cluster/NodeRole.java
new file mode 100644
index 0000000000..0d465fa119
--- /dev/null
+++ b/src/integrationTest/java/org/opensearch/test/framework/cluster/NodeRole.java
@@ -0,0 +1,16 @@
+/*
+* Copyright OpenSearch Contributors
+* SPDX-License-Identifier: Apache-2.0
+*
+* The OpenSearch Contributors require contributions made to
+* this file be licensed under the Apache-2.0 license or a
+* compatible open source license.
+*
+*/
+package org.opensearch.test.framework.cluster;
+
+enum NodeRole {
+ DATA,
+ CLUSTER_MANAGER,
+ REMOTE_CLUSTER_CLIENT
+}
diff --git a/src/integrationTest/java/org/opensearch/test/framework/cluster/NodeSettingsSupplier.java b/src/integrationTest/java/org/opensearch/test/framework/cluster/NodeSettingsSupplier.java
new file mode 100644
index 0000000000..cab3a760ca
--- /dev/null
+++ b/src/integrationTest/java/org/opensearch/test/framework/cluster/NodeSettingsSupplier.java
@@ -0,0 +1,34 @@
+/*
+* Copyright 2015-2018 _floragunn_ GmbH
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+/*
+* SPDX-License-Identifier: Apache-2.0
+*
+* The OpenSearch Contributors require contributions made to
+* this file be licensed under the Apache-2.0 license or a
+* compatible open source license.
+*
+* Modifications Copyright OpenSearch Contributors. See
+* GitHub history for details.
+*/
+
+package org.opensearch.test.framework.cluster;
+
+import org.opensearch.common.settings.Settings;
+
+@FunctionalInterface
+public interface NodeSettingsSupplier {
+ Settings get(int i);
+}
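Since the interface is functional, per-node overrides are typically written as lambdas, for example (the node attribute below is purely illustrative):

    // Sketch: a supplier that tags even and odd node indices with different, made-up zone attributes.
    NodeSettingsSupplier zoneAwareSettings = nodeIndex -> Settings.builder()
        .put("node.attr.example_zone", nodeIndex % 2 == 0 ? "zone_a" : "zone_b")
        .build();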
diff --git a/src/integrationTest/java/org/opensearch/test/framework/cluster/NodeType.java b/src/integrationTest/java/org/opensearch/test/framework/cluster/NodeType.java
new file mode 100644
index 0000000000..8ae8941e8d
--- /dev/null
+++ b/src/integrationTest/java/org/opensearch/test/framework/cluster/NodeType.java
@@ -0,0 +1,17 @@
+/*
+* SPDX-License-Identifier: Apache-2.0
+*
+* The OpenSearch Contributors require contributions made to
+* this file be licensed under the Apache-2.0 license or a
+* compatible open source license.
+*
+* Modifications Copyright OpenSearch Contributors. See
+* GitHub history for details.
+*/
+package org.opensearch.test.framework.cluster;
+
+enum NodeType {
+ CLIENT,
+ DATA,
+ CLUSTER_MANAGER
+}
diff --git a/src/integrationTest/java/org/opensearch/test/framework/cluster/OpenSearchClientProvider.java b/src/integrationTest/java/org/opensearch/test/framework/cluster/OpenSearchClientProvider.java
new file mode 100644
index 0000000000..5e4ac59b92
--- /dev/null
+++ b/src/integrationTest/java/org/opensearch/test/framework/cluster/OpenSearchClientProvider.java
@@ -0,0 +1,261 @@
+/*
+* Copyright 2020 floragunn GmbH
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*
+*/
+
+/*
+* SPDX-License-Identifier: Apache-2.0
+*
+* The OpenSearch Contributors require contributions made to
+* this file be licensed under the Apache-2.0 license or a
+* compatible open source license.
+*
+* Modifications Copyright OpenSearch Contributors. See
+* GitHub history for details.
+*/
+
+package org.opensearch.test.framework.cluster;
+
+import static org.opensearch.test.framework.cluster.TestRestClientConfiguration.getBasicAuthHeader;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.security.KeyStore;
+import java.security.cert.Certificate;
+import java.security.cert.X509Certificate;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import javax.net.ssl.KeyManager;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManagerFactory;
+
+import org.apache.http.Header;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.conn.ssl.NoopHostnameVerifier;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
+import org.opensearch.client.RestClient;
+import org.opensearch.client.RestClientBuilder;
+import org.opensearch.client.RestHighLevelClient;
+import org.opensearch.security.support.PemKeyReader;
+import org.opensearch.test.framework.certificate.CertificateData;
+import org.opensearch.test.framework.certificate.TestCertificates;
+
+/**
+* OpenSearchClientProvider provides methods to get a REST client for an underlying cluster or node.
+*
+* This interface is implemented by both LocalCluster and LocalOpenSearchCluster.Node. Thus, it is possible to get a
+* REST client for a whole cluster (without choosing the node it is operating on) or to get a REST client for a specific
+* node.
+*/
+public interface OpenSearchClientProvider {
+
+ String getClusterName();
+
+ TestCertificates getTestCertificates();
+
+ InetSocketAddress getHttpAddress();
+
+ InetSocketAddress getTransportAddress();
+
+ default URI getHttpAddressAsURI() {
+ InetSocketAddress address = getHttpAddress();
+ return URI.create("https://" + address.getHostString() + ":" + address.getPort());
+ }
+
+ /**
+ * Returns a REST client that sends requests with basic authentication for the specified User object. Optionally,
+ * additional HTTP headers can be specified which will be sent with each request.
+ *
+ * This method should usually be preferred. The other getRestClient() methods should only be used in specific
+ * situations.
+ */
+ default TestRestClient getRestClient(UserCredentialsHolder user, CertificateData useCertificateData, Header... headers) {
+ return getRestClient(user.getName(), user.getPassword(), useCertificateData, headers);
+ }
+
+ default TestRestClient getRestClient(UserCredentialsHolder user, Header... headers) {
+ return getRestClient(user.getName(), user.getPassword(), null, headers);
+ }
+
+ default RestHighLevelClient getRestHighLevelClient(String username, String password, Header... headers) {
+ return getRestHighLevelClient(new UserCredentialsHolder() {
+ @Override
+ public String getName() {
+ return username;
+ }
+
+ @Override
+ public String getPassword() {
+ return password;
+ }
+ }, Arrays.asList(headers));
+ }
+
+ default RestHighLevelClient getRestHighLevelClient(UserCredentialsHolder user) {
+ return getRestHighLevelClient(user, Collections.emptySet());
+ }
+
+ default RestHighLevelClient getRestHighLevelClient(UserCredentialsHolder user, Collection<? extends Header> defaultHeaders) {
+
+ BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+ credentialsProvider.setCredentials(new AuthScope(null, -1), new UsernamePasswordCredentials(user.getName(), user.getPassword()));
+
+ return getRestHighLevelClient(credentialsProvider, defaultHeaders);
+ }
+
+ default RestHighLevelClient getRestHighLevelClient(Collection<? extends Header> defaultHeaders) {
+ return getRestHighLevelClient((BasicCredentialsProvider) null, defaultHeaders);
+ }
+
+ default RestHighLevelClient getRestHighLevelClient(
+ BasicCredentialsProvider credentialsProvider,
+ Collection<? extends Header> defaultHeaders
+ ) {
+ RestClientBuilder.HttpClientConfigCallback configCallback = httpClientBuilder -> {
+ Optional.ofNullable(credentialsProvider).ifPresent(httpClientBuilder::setDefaultCredentialsProvider);
+ httpClientBuilder.setSSLStrategy(
+ new SSLIOSessionStrategy(
+ getSSLContext(),
+ /* Use default supported protocols */ null,
+ /* Use default supported cipher suites */ null,
+ NoopHostnameVerifier.INSTANCE
+ )
+ );
+ httpClientBuilder.setDefaultHeaders(defaultHeaders);
+ return httpClientBuilder;
+ };
+
+ InetSocketAddress httpAddress = getHttpAddress();
+ RestClientBuilder builder = RestClient.builder(new HttpHost(httpAddress.getHostString(), httpAddress.getPort(), "https"))
+ .setHttpClientConfigCallback(configCallback);
+
+ return new RestHighLevelClient(builder);
+ }
+
+ default org.apache.http.impl.client.CloseableHttpClient getClosableHttpClient(String[] supportedCipherSuit) {
+ CloseableHttpClientFactory factory = new CloseableHttpClientFactory(getSSLContext(), null, null, supportedCipherSuit);
+ return factory.getHTTPClient();
+ }
+
+ /**
+ * Returns a REST client that sends requests with basic authentication for the specified user name and password. Optionally,
+ * additional HTTP headers can be specified which will be sent with each request.
+ *
+ * Normally, you should use the method with the User object argument instead. Use this only if you need more
+ * control over username and password - for example, when you want to send a wrong password.
+ */
+ default TestRestClient getRestClient(String user, String password, Header... headers) {
+ return createGenericClientRestClient(new TestRestClientConfiguration().username(user).password(password).headers(headers));
+ }
+
+ default TestRestClient getRestClient(String user, String password, CertificateData useCertificateData, Header... headers) {
+ Header basicAuthHeader = getBasicAuthHeader(user, password);
+ if (headers != null && headers.length > 0) {
+ List<Header> concatenatedHeaders = Stream.concat(Stream.of(basicAuthHeader), Stream.of(headers)).collect(Collectors.toList());
+ return getRestClient(concatenatedHeaders, useCertificateData);
+ }
+ return getRestClient(useCertificateData, basicAuthHeader);
+ }
+
+ /**
+ * Returns a REST client. You can specify additional HTTP headers that will be sent with each request. Use this
+ * method to test non-basic authentication, such as JWT bearer authentication.
+ */
+ default TestRestClient getRestClient(CertificateData useCertificateData, Header... headers) {
+ return getRestClient(Arrays.asList(headers), useCertificateData);
+ }
+
+ default TestRestClient getRestClient(Header... headers) {
+ return getRestClient((CertificateData) null, headers);
+ }
+
+ default TestRestClient getRestClient(List<Header> headers) {
+ return createGenericClientRestClient(new TestRestClientConfiguration().headers(headers));
+
+ }
+
+ default TestRestClient getRestClient(List<Header> headers, CertificateData useCertificateData) {
+ return createGenericClientRestClient(headers, useCertificateData, null);
+ }
+
+ default TestRestClient createGenericClientRestClient(
+ List<Header> headers,
+ CertificateData useCertificateData,
+ InetAddress sourceInetAddress
+ ) {
+ return new TestRestClient(getHttpAddress(), headers, getSSLContext(useCertificateData), sourceInetAddress);
+ }
+
+ default TestRestClient createGenericClientRestClient(TestRestClientConfiguration configuration) {
+ return new TestRestClient(getHttpAddress(), configuration.getHeaders(), getSSLContext(), configuration.getSourceInetAddress());
+ }
+
+ private SSLContext getSSLContext() {
+ return getSSLContext(null);
+ }
+
+ private SSLContext getSSLContext(CertificateData useCertificateData) {
+ X509Certificate[] trustCertificates;
+
+ try {
+ trustCertificates = PemKeyReader.loadCertificatesFromFile(getTestCertificates().getRootCertificate().getAbsolutePath());
+
+ TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
+ KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
+
+ ks.load(null);
+
+ for (int i = 0; i < trustCertificates.length; i++) {
+ ks.setCertificateEntry("caCert-" + i, trustCertificates[i]);
+ }
+ KeyManager[] keyManagers = null;
+ if (useCertificateData != null) {
+ Certificate[] chainOfTrust = { useCertificateData.certificate() };
+ ks.setKeyEntry("admin-certificate", useCertificateData.getKey(), null, chainOfTrust);
+ KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+ keyManagerFactory.init(ks, null);
+ keyManagers = keyManagerFactory.getKeyManagers();
+ }
+
+ tmf.init(ks);
+
+ SSLContext sslContext = SSLContext.getInstance("TLS");
+
+ sslContext.init(keyManagers, tmf.getTrustManagers(), null);
+ return sslContext;
+
+ } catch (Exception e) {
+ throw new RuntimeException("Error loading root CA ", e);
+ }
+ }
+
+ public interface UserCredentialsHolder {
+ String getName();
+
+ String getPassword();
+ }
+
+}
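As a usage sketch, any OpenSearchClientProvider (a LocalCluster or a single Node) can hand out a RestHighLevelClient with basic-auth credentials; the credentials and index name below are placeholders, the request helper comes from the SearchRequestFactory further down in this patch, and checked IOException handling is omitted:

    // Sketch: provider is any OpenSearchClientProvider instance.
    try (RestHighLevelClient client = provider.getRestHighLevelClient("example_user", "example_password")) {
        SearchResponse response = client.search(
            SearchRequestFactory.queryStringQueryRequest("example-index", "title:test"),
            RequestOptions.DEFAULT
        );
    }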
diff --git a/src/integrationTest/java/org/opensearch/test/framework/cluster/PortAllocator.java b/src/integrationTest/java/org/opensearch/test/framework/cluster/PortAllocator.java
new file mode 100644
index 0000000000..139378fd22
--- /dev/null
+++ b/src/integrationTest/java/org/opensearch/test/framework/cluster/PortAllocator.java
@@ -0,0 +1,165 @@
+/*
+* Copyright 2021 floragunn GmbH
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*
+*/
+
+/*
+* SPDX-License-Identifier: Apache-2.0
+*
+* The OpenSearch Contributors require contributions made to
+* this file be licensed under the Apache-2.0 license or a
+* compatible open source license.
+*
+* Modifications Copyright OpenSearch Contributors. See
+* GitHub history for details.
+*/
+
+package org.opensearch.test.framework.cluster;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.opensearch.test.framework.cluster.SocketUtils.SocketType;
+
+/**
+* Helper class that allows you to allocate ports. This helps with avoiding port conflicts when running tests.
+*
+* NOTE: This class is only a heuristic: ports allocated by it are likely, but not guaranteed, to be unused.
+* Thus, you still need to be prepared for port conflicts and retry the procedure in such a case. If you notice
+* a port conflict, you can use the method reserve() to mark the port as used.
+*/
+public class PortAllocator {
+
+ public static final PortAllocator TCP = new PortAllocator(SocketType.TCP, Duration.ofSeconds(100));
+ public static final PortAllocator UDP = new PortAllocator(SocketType.UDP, Duration.ofSeconds(100));
+
+ private final SocketType socketType;
+ private final Duration timeoutDuration;
+ private final Map<Integer, AllocatedPort> allocatedPorts = new HashMap<>();
+
+ PortAllocator(SocketType socketType, Duration timeoutDuration) {
+ this.socketType = socketType;
+ this.timeoutDuration = timeoutDuration;
+ }
+
+ public SortedSet<Integer> allocate(String clientName, int numRequested, int minPort) {
+
+ int startPort = minPort;
+
+ while (!isAvailable(startPort)) {
+ startPort += 10;
+ }
+
+ SortedSet<Integer> foundPorts = new TreeSet<>();
+
+ for (int currentPort = startPort; foundPorts.size() < numRequested
+ && currentPort < SocketUtils.PORT_RANGE_MAX
+ && (currentPort - startPort) < 10000; currentPort++) {
+ if (allocate(clientName, currentPort)) {
+ foundPorts.add(currentPort);
+ }
+ }
+
+ if (foundPorts.size() < numRequested) {
+ throw new IllegalStateException("Could not find " + numRequested + " free ports starting at " + minPort + " for " + clientName);
+ }
+
+ return foundPorts;
+ }
+
+ public int allocateSingle(String clientName, int minPort) {
+
+ int startPort = minPort;
+
+ for (int currentPort = startPort; currentPort < SocketUtils.PORT_RANGE_MAX && (currentPort - startPort) < 10000; currentPort++) {
+ if (allocate(clientName, currentPort)) {
+ return currentPort;
+ }
+ }
+
+ throw new IllegalStateException("Could not find free port starting at " + minPort + " for " + clientName);
+
+ }
+
+ public void reserve(int... ports) {
+
+ for (int port : ports) {
+ allocate("reserved", port);
+ }
+ }
+
+ private boolean isInUse(int port) {
+ boolean result = !this.socketType.isPortAvailable(port);
+
+ if (result) {
+ synchronized (this) {
+ allocatedPorts.put(port, new AllocatedPort("external"));
+ }
+ }
+
+ return result;
+ }
+
+ private boolean isAvailable(int port) {
+ return !isAllocated(port) && !isInUse(port);
+ }
+
+ private synchronized boolean isAllocated(int port) {
+ AllocatedPort allocatedPort = this.allocatedPorts.get(port);
+
+ return allocatedPort != null && !allocatedPort.isTimedOut();
+ }
+
+ private synchronized boolean allocate(String clientName, int port) {
+
+ AllocatedPort allocatedPort = allocatedPorts.get(port);
+
+ if (allocatedPort != null && allocatedPort.isTimedOut()) {
+ allocatedPort = null;
+ allocatedPorts.remove(port);
+ }
+
+ if (allocatedPort == null && !isInUse(port)) {
+ allocatedPorts.put(port, new AllocatedPort(clientName));
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ private class AllocatedPort {
+ final String client;
+ final Instant allocatedAt;
+
+ AllocatedPort(String client) {
+ this.client = client;
+ this.allocatedAt = Instant.now();
+ }
+
+ boolean isTimedOut() {
+ return allocatedAt.plus(timeoutDuration).isBefore(Instant.now());
+ }
+
+ @Override
+ public String toString() {
+ return "AllocatedPort [client=" + client + ", allocatedAt=" + allocatedAt + "]";
+ }
+ }
+}
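A short sketch of the intended call pattern (the client name and base port are arbitrary):

    // Sketch: ask for three likely-free TCP ports above 47000 for a test fixture.
    SortedSet<Integer> ports = PortAllocator.TCP.allocate("example_fixture", 3, 47000);
    int firstPort = ports.first();
    // If binding still fails because another process grabbed the port in the meantime,
    // mark it as used so that subsequent allocations skip it.
    PortAllocator.TCP.reserve(firstPort);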
diff --git a/src/integrationTest/java/org/opensearch/test/framework/cluster/RestClientException.java b/src/integrationTest/java/org/opensearch/test/framework/cluster/RestClientException.java
new file mode 100644
index 0000000000..0023d65e98
--- /dev/null
+++ b/src/integrationTest/java/org/opensearch/test/framework/cluster/RestClientException.java
@@ -0,0 +1,16 @@
+/*
+* Copyright OpenSearch Contributors
+* SPDX-License-Identifier: Apache-2.0
+*
+* The OpenSearch Contributors require contributions made to
+* this file be licensed under the Apache-2.0 license or a
+* compatible open source license.
+*
+*/
+package org.opensearch.test.framework.cluster;
+
+public class RestClientException extends RuntimeException {
+ RestClientException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
diff --git a/src/integrationTest/java/org/opensearch/test/framework/cluster/SearchRequestFactory.java b/src/integrationTest/java/org/opensearch/test/framework/cluster/SearchRequestFactory.java
new file mode 100644
index 0000000000..b40aa9cfcb
--- /dev/null
+++ b/src/integrationTest/java/org/opensearch/test/framework/cluster/SearchRequestFactory.java
@@ -0,0 +1,104 @@
+/*
+* Copyright OpenSearch Contributors
+* SPDX-License-Identifier: Apache-2.0
+*
+* The OpenSearch Contributors require contributions made to
+* this file be licensed under the Apache-2.0 license or a
+* compatible open source license.
+*
+*/
+package org.opensearch.test.framework.cluster;
+
+import org.opensearch.action.search.SearchRequest;
+import org.opensearch.action.search.SearchResponse;
+import org.opensearch.action.search.SearchScrollRequest;
+import org.opensearch.common.unit.TimeValue;
+import org.opensearch.index.query.QueryBuilders;
+import org.opensearch.search.aggregations.AggregationBuilders;
+import org.opensearch.search.builder.SearchSourceBuilder;
+import org.opensearch.search.sort.FieldSortBuilder;
+import org.opensearch.search.sort.SortOrder;
+
+import static java.util.concurrent.TimeUnit.MINUTES;
+
+public final class SearchRequestFactory {
+
+ private SearchRequestFactory() {
+
+ }
+
+ public static SearchRequest queryByIdsRequest(String indexName, String... ids) {
+ SearchRequest searchRequest = new SearchRequest(indexName);
+ SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
+ searchSourceBuilder.query(QueryBuilders.idsQuery().addIds(ids));
+ searchRequest.source(searchSourceBuilder);
+ return searchRequest;
+ }
+
+ public static SearchRequest queryStringQueryRequest(String indexName, String queryString) {
+ SearchRequest searchRequest = new SearchRequest(indexName);
+ SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
+ searchSourceBuilder.query(QueryBuilders.queryStringQuery(queryString));
+ searchRequest.source(searchSourceBuilder);
+ return searchRequest;
+ }
+
+ public static SearchRequest queryStringQueryRequest(String[] indicesNames, String queryString) {
+ SearchRequest searchRequest = new SearchRequest(indicesNames);
+ SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
+ searchSourceBuilder.query(QueryBuilders.queryStringQuery(queryString));
+ searchRequest.source(searchSourceBuilder);
+ return searchRequest;
+ }
+
+ public static SearchRequest queryStringQueryRequest(String queryString) {
+ SearchRequest searchRequest = new SearchRequest();
+ SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
+ searchSourceBuilder.query(QueryBuilders.queryStringQuery(queryString));
+ searchRequest.source(searchSourceBuilder);
+ return searchRequest;
+ }
+
+ public static SearchRequest searchRequestWithScroll(String indexName, int pageSize) {
+ SearchRequest searchRequest = new SearchRequest(indexName);
+ searchRequest.scroll(new TimeValue(1, MINUTES));
+ SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
+ searchSourceBuilder.query(QueryBuilders.matchAllQuery());
+ searchSourceBuilder.sort(new FieldSortBuilder("_id").order(SortOrder.ASC));
+ searchSourceBuilder.size(pageSize);
+ searchRequest.source(searchSourceBuilder);
+ return searchRequest;
+ }
+
+ public static SearchRequest searchAll(String... indexNames) {
+ SearchRequest searchRequest = new SearchRequest(indexNames);
+ SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
+ searchSourceBuilder.query(QueryBuilders.matchAllQuery());
+ searchRequest.source(searchSourceBuilder);
+ return searchRequest;
+ }
+
+ public static SearchScrollRequest getSearchScrollRequest(SearchResponse searchResponse) {
+ SearchScrollRequest scrollRequest = new SearchScrollRequest(searchResponse.getScrollId());
+ scrollRequest.scroll(new TimeValue(1, MINUTES));
+ return scrollRequest;
+ }
+
+ public static SearchRequest averageAggregationRequest(String indexName, String aggregationName, String fieldName) {
+ SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
+ searchSourceBuilder.aggregation(AggregationBuilders.avg(aggregationName).field(fieldName));
+ searchSourceBuilder.size(0);
+ SearchRequest searchRequest = new SearchRequest(indexName);
+ searchRequest.source(searchSourceBuilder);
+ return searchRequest;
+ }
+
+ public static SearchRequest statsAggregationRequest(String indexName, String aggregationName, String fieldName) {
+ SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
+ searchSourceBuilder.aggregation(AggregationBuilders.stats(aggregationName).field(fieldName));
+ searchSourceBuilder.size(0);
+ SearchRequest searchRequest = new SearchRequest(indexName);
+ searchRequest.source(searchSourceBuilder);
+ return searchRequest;
+ }
+}
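The scroll helpers are meant to be combined into a paging loop; a sketch, assuming a RestHighLevelClient named client obtained from the test cluster and omitting IOException handling:

    // Sketch: page through "example-index" 100 documents at a time using the helpers above.
    SearchResponse page = client.search(searchRequestWithScroll("example-index", 100), RequestOptions.DEFAULT);
    while (page.getHits().getHits().length > 0) {
        // ... consume page.getHits() ...
        page = client.scroll(getSearchScrollRequest(page), RequestOptions.DEFAULT);
    }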
diff --git a/src/integrationTest/java/org/opensearch/test/framework/cluster/SocketUtils.java b/src/integrationTest/java/org/opensearch/test/framework/cluster/SocketUtils.java
new file mode 100644
index 0000000000..5895829243
--- /dev/null
+++ b/src/integrationTest/java/org/opensearch/test/framework/cluster/SocketUtils.java
@@ -0,0 +1,311 @@
+/*
+* Copyright 2002-2017 the original author or authors.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+/*
+* SPDX-License-Identifier: Apache-2.0
+*
+* The OpenSearch Contributors require contributions made to
+* this file be licensed under the Apache-2.0 license or a
+* compatible open source license.
+*
+* Modifications Copyright OpenSearch Contributors. See
+* GitHub history for details.
+*/
+
+package org.opensearch.test.framework.cluster;
+
+import java.net.DatagramSocket;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.util.Random;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import javax.net.ServerSocketFactory;
+
+/**
+* Simple utility methods for working with network sockets — for example,
+* for finding available ports on {@code localhost}.
+*
+* Within this class, a TCP port refers to a port for a {@link ServerSocket};
+* whereas, a UDP port refers to a port for a {@link DatagramSocket}.
+*
+* @author Sam Brannen
+* @author Ben Hale
+* @author Arjen Poutsma
+* @author Gunnar Hillert
+* @author Gary Russell
+* @since 4.0
+*/
+public class SocketUtils {
+
+ /**
+ * The default minimum value for port ranges used when finding an available
+ * socket port.
+ */
+ public static final int PORT_RANGE_MIN = 1024;
+
+ /**
+ * The default maximum value for port ranges used when finding an available
+ * socket port.
+ */
+ public static final int PORT_RANGE_MAX = 65535;
+
+ private static final Random random = new Random(System.currentTimeMillis());
+
+ /**
+ * Although {@code SocketUtils} consists solely of static utility methods,
+ * this constructor is intentionally {@code public}.
+ *
+ * Rationale:
+ * Static methods from this class may be invoked from within XML
+ * configuration files using the Spring Expression Language (SpEL) and the
+ * following syntax:
+ * <bean id="bean1" ... p:port="#{T(org.springframework.util.SocketUtils).findAvailableTcpPort(12000)}" />
+ * If this constructor were {@code private}, you would be required to supply
+ * the fully qualified class name to SpEL's {@code T()} function for each usage.
+ * Thus, the fact that this constructor is {@code public} allows you to reduce
+ * boilerplate configuration with SpEL as can be seen in the following example.
+ * <bean id="socketUtils" class="org.springframework.util.SocketUtils" />
+ * <bean id="bean1" ... p:port="#{socketUtils.findAvailableTcpPort(12000)}" />
+ * <bean id="bean2" ... p:port="#{socketUtils.findAvailableTcpPort(30000)}" />
+ */
+ public SocketUtils() {
+ /* no-op */
+ }
+
+ /**
+ * Find an available TCP port randomly selected from the range
+ * [{@value #PORT_RANGE_MIN}, {@value #PORT_RANGE_MAX}].
+ * @return an available TCP port number
+ * @throws IllegalStateException if no available port could be found
+ */
+ public static int findAvailableTcpPort() {
+ return findAvailableTcpPort(PORT_RANGE_MIN);
+ }
+
+ /**
+ * Find an available TCP port randomly selected from the range
+ * [{@code minPort}, {@value #PORT_RANGE_MAX}].
+ * @param minPort the minimum port number
+ * @return an available TCP port number
+ * @throws IllegalStateException if no available port could be found
+ */
+ public static int findAvailableTcpPort(int minPort) {
+ return findAvailableTcpPort(minPort, PORT_RANGE_MAX);
+ }
+
+ /**
+ * Find an available TCP port randomly selected from the range
+ * [{@code minPort}, {@code maxPort}].
+ * @param minPort the minimum port number
+ * @param maxPort the maximum port number
+ * @return an available TCP port number
+ * @throws IllegalStateException if no available port could be found
+ */
+ public static int findAvailableTcpPort(int minPort, int maxPort) {
+ return SocketType.TCP.findAvailablePort(minPort, maxPort);
+ }
+
+ /**
+ * Find the requested number of available TCP ports, each randomly selected
+ * from the range [{@value #PORT_RANGE_MIN}, {@value #PORT_RANGE_MAX}].
+ * @param numRequested the number of available ports to find
+ * @return a sorted set of available TCP port numbers
+ * @throws IllegalStateException if the requested number of available ports could not be found
+ */
+ public static SortedSet<Integer> findAvailableTcpPorts(int numRequested) {
+ return findAvailableTcpPorts(numRequested, PORT_RANGE_MIN, PORT_RANGE_MAX);
+ }
+
+ /**
+ * Find the requested number of available TCP ports, each randomly selected
+ * from the range [{@code minPort}, {@code maxPort}].
+ * @param numRequested the number of available ports to find
+ * @param minPort the minimum port number
+ * @param maxPort the maximum port number
+ * @return a sorted set of available TCP port numbers
+ * @throws IllegalStateException if the requested number of available ports could not be found
+ */
+ public static SortedSet<Integer> findAvailableTcpPorts(int numRequested, int minPort, int maxPort) {
+ return SocketType.TCP.findAvailablePorts(numRequested, minPort, maxPort);
+ }
+
+ /**
+ * Find an available UDP port randomly selected from the range
+ * [{@value #PORT_RANGE_MIN}, {@value #PORT_RANGE_MAX}].
+ * @return an available UDP port number
+ * @throws IllegalStateException if no available port could be found
+ */
+ public static int findAvailableUdpPort() {
+ return findAvailableUdpPort(PORT_RANGE_MIN);
+ }
+
+ /**
+ * Find an available UDP port randomly selected from the range
+ * [{@code minPort}, {@value #PORT_RANGE_MAX}].
+ * @param minPort the minimum port number
+ * @return an available UDP port number
+ * @throws IllegalStateException if no available port could be found
+ */
+ public static int findAvailableUdpPort(int minPort) {
+ return findAvailableUdpPort(minPort, PORT_RANGE_MAX);
+ }
+
+ /**
+ * Find an available UDP port randomly selected from the range
+ * [{@code minPort}, {@code maxPort}].
+ * @param minPort the minimum port number
+ * @param maxPort the maximum port number
+ * @return an available UDP port number
+ * @throws IllegalStateException if no available port could be found
+ */
+ public static int findAvailableUdpPort(int minPort, int maxPort) {
+ return SocketType.UDP.findAvailablePort(minPort, maxPort);
+ }
+
+ /**
+ * Find the requested number of available UDP ports, each randomly selected
+ * from the range [{@value #PORT_RANGE_MIN}, {@value #PORT_RANGE_MAX}].
+ * @param numRequested the number of available ports to find
+ * @return a sorted set of available UDP port numbers
+ * @throws IllegalStateException if the requested number of available ports could not be found
+ */
+ public static SortedSet<Integer> findAvailableUdpPorts(int numRequested) {
+ return findAvailableUdpPorts(numRequested, PORT_RANGE_MIN, PORT_RANGE_MAX);
+ }
+
+ /**
+ * Find the requested number of available UDP ports, each randomly selected
+ * from the range [{@code minPort}, {@code maxPort}].
+ * @param numRequested the number of available ports to find
+ * @param minPort the minimum port number
+ * @param maxPort the maximum port number
+ * @return a sorted set of available UDP port numbers
+ * @throws IllegalStateException if the requested number of available ports could not be found
+ */
+ public static SortedSet<Integer> findAvailableUdpPorts(int numRequested, int minPort, int maxPort) {
+ return SocketType.UDP.findAvailablePorts(numRequested, minPort, maxPort);
+ }
+
+ public enum SocketType {
+
+ TCP {
+ @Override
+ protected boolean isPortAvailable(int port) {
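+ // Probe availability by binding a TCP server socket to the port on the loopback interface; a successful bind (immediately closed) means the port is treated as free.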
+ try {
+ ServerSocket serverSocket = ServerSocketFactory.getDefault()
+ .createServerSocket(port, 1, InetAddress.getByName("localhost"));
+ serverSocket.close();
+ return true;
+ } catch (Exception ex) {
+ return false;
+ }
+ }
+ },
+
+ UDP {
+ @Override
+ protected boolean isPortAvailable(int port) {
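+ // Probe availability by binding a DatagramSocket to the port on the loopback interface; any failure to bind marks the port as in use.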
+ try {
+ DatagramSocket socket = new DatagramSocket(port, InetAddress.getByName("localhost"));
+ socket.close();
+ return true;
+ } catch (Exception ex) {
+ return false;
+ }
+ }
+ };
+
+ /**
+ * Determine if the specified port for this {@code SocketType} is
+ * currently available on {@code localhost}.
+ */
+ protected abstract boolean isPortAvailable(int port);
+
+ /**
+ * Find a pseudo-random port number within the range
+ * [{@code minPort}, {@code maxPort}].
+ * @param minPort the minimum port number
+ * @param maxPort the maximum port number
+ * @return a random port number within the specified range
+ */
+ private int findRandomPort(int minPort, int maxPort) {
+ int portRange = maxPort - minPort;
+ return minPort + random.nextInt(portRange + 1);
+ }
+
+ /**
+ * Find an available port for this {@code SocketType}, randomly selected
+ * from the range [{@code minPort}, {@code maxPort}].
+ * @param minPort the minimum port number
+ * @param maxPort the maximum port number
+ * @return an available port number for this socket type
+ * @throws IllegalStateException if no available port could be found
+ */
+ int findAvailablePort(int minPort, int maxPort) {
+ // Assert.isTrue(minPort > 0, "'minPort' must be greater than 0");
+ // Assert.isTrue(maxPort >= minPort, "'maxPort' must be greater than or equal to 'minPort'");
+ // Assert.isTrue(maxPort <= PORT_RANGE_MAX, "'maxPort' must be less than or equal to " + PORT_RANGE_MAX);
+
+ int portRange = maxPort - minPort;
+ int candidatePort;
+ int searchCounter = 0;
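+ // Pick random candidates until a free port turns up; give up after as many failed attempts as there are ports in the range.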
+ do {
+ if (searchCounter > portRange) {
+ throw new IllegalStateException(
+ String.format(
+ "Could not find an available %s port in the range [%d, %d] after %d attempts",
+ name(),
+ minPort,
+ maxPort,
+ searchCounter
+ )
+ );
+ }
+ candidatePort = findRandomPort(minPort, maxPort);
+ searchCounter++;
+ } while (!isPortAvailable(candidatePort));
+
+ return candidatePort;
+ }
+
+ /**
+ * Find the requested number of available ports for this {@code SocketType},
+ * each randomly selected from the range [{@code minPort}, {@code maxPort}].
+ * @param numRequested the number of available ports to find
+ * @param minPort the minimum port number
+ * @param maxPort the maximum port number
+ * @return a sorted set of available port numbers for this socket type
+ * @throws IllegalStateException if the requested number of available ports could not be found
+ */
+ SortedSet<Integer> findAvailablePorts(int numRequested, int minPort, int maxPort) {
+ SortedSet<Integer> availablePorts = new TreeSet<>();
+ int attemptCount = 0;
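+ // Collect distinct free ports; the attempt budget (numRequested + 100) prevents an endless loop when the range cannot supply enough ports.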
+ while ((++attemptCount <= numRequested + 100) && availablePorts.size() < numRequested) {
+ availablePorts.add(findAvailablePort(minPort, maxPort));
+ }
+
+ if (availablePorts.size() != numRequested) {
+ throw new IllegalStateException(
+ String.format("Could not find %d available %s ports in the range [%d, %d]", numRequested, name(), minPort, maxPort)
+ );
+ }
+
+ return availablePorts;
+ }
+ }
+
+}
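For reviewers, a minimal sketch of how a test might reserve ports with these helpers; the class name and the three-node scenario are illustrative placeholders, not part of this change:

```java
import java.util.SortedSet;

import org.opensearch.test.framework.cluster.SocketUtils;

// Hypothetical example; only the SocketUtils calls shown here are introduced by this diff.
public class PortAllocationSketch {
    public static void main(String[] args) {
        // One free TCP port anywhere in the default range, e.g. for an HTTP endpoint.
        int httpPort = SocketUtils.findAvailableTcpPort();

        // Three free TCP ports in a narrower range, e.g. for a three-node transport layer.
        SortedSet<Integer> transportPorts = SocketUtils.findAvailableTcpPorts(3, 20000, 25000);

        System.out.println("http=" + httpPort + ", transport=" + transportPorts);
    }
}
```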
diff --git a/src/integrationTest/java/org/opensearch/test/framework/cluster/SocketUtilsTests.java b/src/integrationTest/java/org/opensearch/test/framework/cluster/SocketUtilsTests.java
new file mode 100644
index 0000000000..fb298c5283
--- /dev/null
+++ b/src/integrationTest/java/org/opensearch/test/framework/cluster/SocketUtilsTests.java
@@ -0,0 +1,207 @@
+/*
+* Copyright 2002-2020 the original author or authors.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* https://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+/*
+* SPDX-License-Identifier: Apache-2.0
+*
+* The OpenSearch Contributors require contributions made to
+* this file be licensed under the Apache-2.0 license or a
+* compatible open source license.
+*
+* Modifications Copyright OpenSearch Contributors. See
+* GitHub history for details.
+*/
+
+package org.opensearch.test.framework.cluster;
+
+import java.net.DatagramSocket;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.util.SortedSet;
+
+import javax.net.ServerSocketFactory;
+
+import org.junit.Test;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.endsWith;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.startsWith;
+import static org.junit.Assert.assertThrows;
+import static org.opensearch.test.framework.cluster.SocketUtils.PORT_RANGE_MAX;
+import static org.opensearch.test.framework.cluster.SocketUtils.PORT_RANGE_MIN;
+
+/**
+* Unit tests for {@link SocketUtils}.
+*
+* @author Sam Brannen
+* @author Gary Russell
+*/
+public class SocketUtilsTests {
+
+ // TCP
+
+ @Test
+ public void findAvailableTcpPort() {
+ int port = SocketUtils.findAvailableTcpPort();
+ assertPortInRange(port, PORT_RANGE_MIN, PORT_RANGE_MAX);
+ }
+
+ @Test
+ public void findAvailableTcpPortWithMinPortEqualToMaxPort() {
+ int minMaxPort = SocketUtils.findAvailableTcpPort();
+ int port = SocketUtils.findAvailableTcpPort(minMaxPort, minMaxPort);
+ assertThat(port, equalTo(minMaxPort));
+ }
+
+ @Test
+ public void findAvailableTcpPortWhenPortOnLoopbackInterfaceIsNotAvailable() throws Exception {
+ int port = SocketUtils.findAvailableTcpPort();
+ try (ServerSocket socket = ServerSocketFactory.getDefault().createServerSocket(port, 1, InetAddress.getByName("localhost"))) {
+ assertThat(socket, notNullValue());
+ // will only look for the exact port
+ IllegalStateException exception = assertThrows(IllegalStateException.class, () -> SocketUtils.findAvailableTcpPort(port, port));
+ assertThat(exception.getMessage(), startsWith("Could not find an available TCP port"));
+ assertThat(exception.getMessage(), endsWith("after 1 attempts"));
+ }
+ }
+
+ @Test
+ public void findAvailableTcpPortWithMin() {
+ int port = SocketUtils.findAvailableTcpPort(50000);
+ assertPortInRange(port, 50000, PORT_RANGE_MAX);
+ }
+
+ @Test
+ public void findAvailableTcpPortInRange() {
+ int minPort = 20000;
+ int maxPort = minPort + 1000;
+ int port = SocketUtils.findAvailableTcpPort(minPort, maxPort);
+ assertPortInRange(port, minPort, maxPort);
+ }
+
+ @Test
+ public void find4AvailableTcpPorts() {
+ findAvailableTcpPorts(4);
+ }
+
+ @Test
+ public void find50AvailableTcpPorts() {
+ findAvailableTcpPorts(50);
+ }
+
+ @Test
+ public void find4AvailableTcpPortsInRange() {
+ findAvailableTcpPorts(4, 30000, 35000);
+ }
+
+ @Test
+ public void find50AvailableTcpPortsInRange() {
+ findAvailableTcpPorts(50, 40000, 45000);
+ }
+
+ // UDP
+
+ @Test
+ public void findAvailableUdpPort() {
+ int port = SocketUtils.findAvailableUdpPort();
+ assertPortInRange(port, PORT_RANGE_MIN, PORT_RANGE_MAX);
+ }
+
+ @Test
+ public void findAvailableUdpPortWhenPortOnLoopbackInterfaceIsNotAvailable() throws Exception {
+ int port = SocketUtils.findAvailableUdpPort();
+ try (DatagramSocket socket = new DatagramSocket(port, InetAddress.getByName("localhost"))) {
+ assertThat(socket, notNullValue());
+ // will only look for the exact port
+ IllegalStateException exception = assertThrows(IllegalStateException.class, () -> SocketUtils.findAvailableUdpPort(port, port));
+ assertThat(exception.getMessage(), startsWith("Could not find an available UDP port"));
+ assertThat(exception.getMessage(), endsWith("after 1 attempts"));
+ }
+ }
+
+ @Test
+ public void findAvailableUdpPortWithMin() {
+ int port = SocketUtils.findAvailableUdpPort(50000);
+ assertPortInRange(port, 50000, PORT_RANGE_MAX);
+ }
+
+ @Test
+ public void findAvailableUdpPortInRange() {
+ int minPort = 20000;
+ int maxPort = minPort + 1000;
+ int port = SocketUtils.findAvailableUdpPort(minPort, maxPort);
+ assertPortInRange(port, minPort, maxPort);
+ }
+
+ @Test
+ public void find4AvailableUdpPorts() {
+ findAvailableUdpPorts(4);
+ }
+
+ @Test
+ public void find50AvailableUdpPorts() {
+ findAvailableUdpPorts(50);
+ }
+
+ @Test
+ public void find4AvailableUdpPortsInRange() {
+ findAvailableUdpPorts(4, 30000, 35000);
+ }
+
+ @Test
+ public void find50AvailableUdpPortsInRange() {
+ findAvailableUdpPorts(50, 40000, 45000);
+ }
+
+ // Helpers
+
+ private void findAvailableTcpPorts(int numRequested) {
+ SortedSet<Integer> ports = SocketUtils.findAvailableTcpPorts(numRequested);
+ assertAvailablePorts(ports, numRequested, PORT_RANGE_MIN, PORT_RANGE_MAX);
+ }
+
+ private void findAvailableTcpPorts(int numRequested, int minPort, int maxPort) {
+ SortedSet<Integer> ports = SocketUtils.findAvailableTcpPorts(numRequested, minPort, maxPort);
+ assertAvailablePorts(ports, numRequested, minPort, maxPort);
+ }
+
+ private void findAvailableUdpPorts(int numRequested) {
+ SortedSet<Integer> ports = SocketUtils.findAvailableUdpPorts(numRequested);
+ assertAvailablePorts(ports, numRequested, PORT_RANGE_MIN, PORT_RANGE_MAX);
+ }
+
+ private void findAvailableUdpPorts(int numRequested, int minPort, int maxPort) {
+ SortedSet<Integer> ports = SocketUtils.findAvailableUdpPorts(numRequested, minPort, maxPort);
+ assertAvailablePorts(ports, numRequested, minPort, maxPort);
+ }
+
+ private void assertPortInRange(int port, int minPort, int maxPort) {
+ assertThat("port [" + port + "] >= " + minPort, port, greaterThanOrEqualTo(minPort));
+ assertThat("port [" + port + "] <= " + maxPort, port, lessThanOrEqualTo(maxPort));
+ }
+
+ private void assertAvailablePorts(SortedSet<Integer> ports, int numRequested, int minPort, int maxPort) {
+ assertThat("number of ports requested", ports.size(), equalTo(numRequested));
+ for (int port : ports) {
+ assertPortInRange(port, minPort, maxPort);
+ }
+ }
+
+}
diff --git a/src/integrationTest/java/org/opensearch/test/framework/cluster/StartStage.java b/src/integrationTest/java/org/opensearch/test/framework/cluster/StartStage.java
new file mode 100644
index 0000000000..d5dce0056a
--- /dev/null
+++ b/src/integrationTest/java/org/opensearch/test/framework/cluster/StartStage.java
@@ -0,0 +1,15 @@
+/*
+* Copyright OpenSearch Contributors
+* SPDX-License-Identifier: Apache-2.0
+*
+* The OpenSearch Contributors require contributions made to
+* this file be licensed under the Apache-2.0 license or a
+* compatible open source license.
+*
+*/
+package org.opensearch.test.framework.cluster;
+
+enum StartStage {
+ INITIALIZED,
+ RETRY
+}
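StartStage itself carries no documentation in this change; the snippet below is a hypothetical illustration of the start/retry bookkeeping its two values suggest, with startOnce() invented for the sketch:

```java
// Hypothetical consumer of StartStage; mirrors the package-private enum so the sketch is self-contained.
enum StartStage {
    INITIALIZED,
    RETRY
}

class StartLoopSketch {
    // Invented helper: pretend the first attempt fails and has to be retried.
    StartStage startOnce(int attempt) {
        return attempt == 0 ? StartStage.RETRY : StartStage.INITIALIZED;
    }

    void startWithRetries(int maxAttempts) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            if (startOnce(attempt) == StartStage.INITIALIZED) {
                return; // started successfully
            }
            // RETRY: loop around and try again
        }
        throw new IllegalStateException("Did not start after " + maxAttempts + " attempts");
    }
}
```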
diff --git a/src/integrationTest/java/org/opensearch/test/framework/cluster/TestRestClient.java b/src/integrationTest/java/org/opensearch/test/framework/cluster/TestRestClient.java
new file mode 100644
index 0000000000..59d2f57f4a
--- /dev/null
+++ b/src/integrationTest/java/org/opensearch/test/framework/cluster/TestRestClient.java
@@ -0,0 +1,469 @@
+/*
+* Copyright 2021 floragunn GmbH
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*
+*/
+
+/*
+* SPDX-License-Identifier: Apache-2.0
+*
+* The OpenSearch Contributors require contributions made to
+* this file be licensed under the Apache-2.0 license or a
+* compatible open source license.
+*
+* Modifications Copyright OpenSearch Contributors. See
+* GitHub history for details.
+*/
+
+package org.opensearch.test.framework.cluster;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import javax.net.ssl.SSLContext;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.commons.io.IOUtils;
+import org.apache.http.Header;
+import org.apache.http.HttpEntity;
+import org.apache.http.NameValuePair;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpHead;
+import org.apache.http.client.methods.HttpOptions;
+import org.apache.http.client.methods.HttpPatch;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.conn.routing.HttpRoutePlanner;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.message.BasicHeader;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import org.opensearch.common.xcontent.XContentType;
+import org.opensearch.core.common.Strings;
+import org.opensearch.core.xcontent.ToXContentObject;
+import org.opensearch.security.DefaultObjectMapper;
+
+import static java.lang.String.format;
+import static java.util.Objects.requireNonNull;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.notNullValue;
+
+/**
+* An OpenSearch REST client tailored for use in integration tests. Instances of this class can be
+* obtained via the OpenSearchClientProvider interface, which is implemented by LocalCluster and Node.
+*
+* Usually, an instance of this class sends constant authentication headers which are defined when obtaining the
+* instance from OpenSearchClientProvider.
+*/
+public class TestRestClient implements AutoCloseable {
+
+ private static final Logger log = LogManager.getLogger(TestRestClient.class);
+
+ private boolean enableHTTPClientSSL = true;
+ private boolean sendHTTPClientCertificate = false;
+ private InetSocketAddress nodeHttpAddress;
+ private RequestConfig requestConfig;
+ private List