Skip to content

Commit

Permalink
Merge pull request #11 from aws-samples/config_refactoring_and_improv…
Browse files Browse the repository at this point in the history
…ements

Config refactoring and improvements
  • Loading branch information
bdesert authored Feb 2, 2023
2 parents 52f397e + 43e9352 commit 50c6dd1
Show file tree
Hide file tree
Showing 12 changed files with 454 additions and 238 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,35 +21,28 @@
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;

public class S3ImportConfigDef extends AbstractConfig{
import com.amazonaws.kafka.config.providers.common.CommonConfigUtils;

public class S3ImportConfig extends AbstractConfig{

public static final String LOCAL_DIR = "local_dir";
private static final String LOCAL_DIR_DOC =
"Local directory to store imported from S3 files. " +
"If not provided, temporary directory defined in OS will be used.)";
public static final String REGION = "region";
private static final String REGION_DOC = "Specify region if needed. Default region is the same where connector is running";

public S3ImportConfigDef(Map<?, ?> originals) {
public S3ImportConfig(Map<?, ?> originals) {
super(config(), originals);
}

private static ConfigDef config() {
return new ConfigDef()
return new ConfigDef(CommonConfigUtils.COMMON_CONFIG)
.define(
LOCAL_DIR,
ConfigDef.Type.STRING,
null,
ConfigDef.Importance.HIGH,
LOCAL_DIR_DOC
)
.define(
REGION,
ConfigDef.Type.STRING,
ConfigDef.NO_DEFAULT_VALUE,
ConfigDef.Importance.LOW,
REGION_DOC
)
;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@

import org.apache.kafka.common.config.ConfigChangeCallback;
import org.apache.kafka.common.config.ConfigData;
import org.apache.kafka.common.config.provider.ConfigProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.amazonaws.kafka.config.providers.common.AwsServiceConfigProvider;

import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3ClientBuilder;
Expand Down Expand Up @@ -63,25 +64,25 @@
* <code>property_name=${s3import:<REGION>:<OBJECT_KEY>}</code>
*
*/
public class S3ImportConfigProvider implements ConfigProvider {
public class S3ImportConfigProvider extends AwsServiceConfigProvider {

private final Logger log = LoggerFactory.getLogger(getClass());


private S3ImportConfigDef config;
private S3ImportConfig config;

private String localDir;
private String defaultRegion;


@Override
public void configure(Map<String, ?> configs) {
this.config = new S3ImportConfigDef(configs);
this.localDir = this.config.getString(S3ImportConfigDef.LOCAL_DIR);
this.config = new S3ImportConfig(configs);
setCommonConfig(config);

this.localDir = this.config.getString(S3ImportConfig.LOCAL_DIR);
if (this.localDir == null || this.localDir.isBlank()) {
// if not defined, use temp dir defined in OS
this.localDir = System.getProperty("java.io.tmpdir");
}
// default region from configuration. It can be null, empty or blank.
this.defaultRegion = this.config.getString(S3ImportConfigDef.REGION);
}

/**
Expand All @@ -90,27 +91,27 @@ public void configure(Map<String, ?> configs) {
* @param path the path in Parameters Store
* @return the configuration data
*/
public ConfigData get(String path) {
@Override
public ConfigData get(String path) {
return get(path, Collections.emptySet());
}


/**
* Retrieves all parameters at the given path in SSM Parameters Store with given key.
* Copies a file from S3 to a local (to a process) file system.
*
* @param path the path in Parameters Store
* @return the configuration data
* @param path (optional) a region where an S3 object is located. If null,
* a default region from config provider's configuration will be used.
* @return the configuration data with resolved variables.
*/
@Override
public ConfigData get(String path, Set<String> keys) {
Map<String, String> data = new HashMap<>();
if ( (path == null || path.isEmpty())
&& (keys== null || keys.isEmpty()) ) {
return new ConfigData(data);
}
// regionDef still can be null!
String regionDef = path != null && !path.isBlank() ? path : this.defaultRegion;

S3Client s3 = getS3Client(regionDef);
S3Client s3 = checkOrInitS3Client(path);

for (String key: keys) {
try {
Expand Down Expand Up @@ -160,16 +161,22 @@ public void subscribe(String path, Set<String> keys, ConfigChangeCallback callba
log.info("Subscription is not implemented and will be ignored");
}


@Override
public void close() throws IOException {
}

protected static S3Client getS3Client(String regionStr) {
protected S3Client checkOrInitS3Client(String regionStr) {
S3ClientBuilder s3cb = S3Client.builder();
if (regionStr != null && !regionStr.isBlank()) s3cb.region(Region.of(regionStr));
S3Client s3Client = s3cb.build();

setClientCommonConfig(s3cb);

// If region is not provided as path, then Common Config sets default region.
// No need to override.
if (regionStr != null && !regionStr.isBlank()) {
s3cb.region(Region.of(regionStr));
}

return s3Client;
return s3cb.build();
}


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright 2022 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of this
* software and associated documentation files (the "Software"), to deal in the Software
* without restriction, including without limitation the rights to use, copy, modify,
* merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
* INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
* PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
* OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package com.amazonaws.kafka.config.providers;

import java.util.Map;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.ValidString;

import com.amazonaws.kafka.config.providers.common.CommonConfigUtils;

public class SecretsManagerConfig extends AbstractConfig{

public static final String NOT_FOUND_STRATEGY = "NotFoundStrategy";
public static final String NOT_FOUND_FAIL = "fail";
public static final String NOT_FOUND_IGNORE = "ignore";

private static final String NOT_FOUND_STRATEGY_DOC =
"An action to take in case a secret cannot be found. "
+ "Possible actions are: `ignore` and `fail`. <br>"
+ "If `ignore` is selected and a secret cannot be found, the empty string will be assigned to a parameter.<br>"
+ "If `fail` is selected, the config provider will throw an exception to signal the issue.<br>"
+ "If there is a connectivity or access issue with AWS Secrets Manager service, an exception will be thrown.";

public SecretsManagerConfig(Map<?, ?> originals) {
super(config(), originals);
}

private static ConfigDef config() {
return new ConfigDef(CommonConfigUtils.COMMON_CONFIG)
.define(
NOT_FOUND_STRATEGY,
ConfigDef.Type.STRING,
NOT_FOUND_IGNORE,
ValidString.in(NOT_FOUND_FAIL, NOT_FOUND_IGNORE),
ConfigDef.Importance.LOW,
NOT_FOUND_STRATEGY_DOC
)
;
}
}
Loading

0 comments on commit 50c6dd1

Please sign in to comment.