From 46976185e79b3a7f1dc121f6569414c0c90269b3 Mon Sep 17 00:00:00 2001 From: Marcin Ziolo Date: Tue, 1 Oct 2019 16:17:54 +0200 Subject: [PATCH] Provide credentials via configuration --- README.md | 8 ++++++-- .../kafka/AmazonKinesisSinkConnector.java | 13 +++++++++++++ .../kinesis/kafka/AmazonKinesisSinkTask.java | 18 +++++++++++++++++- 3 files changed, 36 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 194c5e3..cdf26e5 100644 --- a/README.md +++ b/README.md @@ -16,7 +16,7 @@ You can build the project by running "maven package" and it will build amazon-ki 1. Make sure you create a delivery stream in AWS Console/CLI/SDK – See more details [here](http://docs.aws.amazon.com/firehose/latest/dev/basic-create.html) and configure destination. -2. Connector uses [DefaultAWSCredentialsProviderChain](http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html) for authenitication. It looks for credentials in following order - environment variable, java system properties, credentials profile file at default location ( (~/.aws/credentials), credentials delievered through Amazon EC2 container service, and instance profile credentails delivered through Amazon EC2 metadata service. Make sure user has at least permission to list streams/delivery stream, describe streams/delivery stream and put records for stream/delivery stream. +2. If you don't specify aws user key nor aws secret key then connector will use [DefaultAWSCredentialsProviderChain](http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html) for authenitication. It looks for credentials in following order - environment variable, java system properties, credentials profile file at default location ( (~/.aws/credentials), credentials delievered through Amazon EC2 container service, and instance profile credentails delivered through Amazon EC2 metadata service. Make sure user has at least permission to list streams/delivery stream, describe streams/delivery stream and put records for stream/delivery stream. ### Running a Connector @@ -39,6 +39,8 @@ You can build the project by running "maven package" and it will build amazon-ki | connector.class | Class for Amazon Kinesis Firehose Connector | com.amazon.kinesis.kafka.FirehoseSinkConnector | | topics | Kafka topics from where you want to consume messages. It can be single topic or comma separated list of topics | - | | region| Specify region of your Kinesis Firehose | - | +| awsKey | AWS user key.| - | +| awsSecret | AWS user secret key.| - | | batch | Connector batches messages before sending to Kinesis Firehose (true/false) | true | | batchSize | Number of messages to be batched together. Firehose accepts at max 500 messages in one batch. | 500 | | batchSizeInBytes | Message size in bytes when batched together. Firehose accepts at max 4MB in one batch. | 3670016 | @@ -49,7 +51,7 @@ You can build the project by running "maven package" and it will build amazon-ki 1. Make sure you create Kinesis stream in AWS Console/CLI/SDK – See more details [here](http://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html). -2. Connector uses [DefaultAWSCredentialsProviderChain](http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html) for authenitication. It looks for credentials in following order - environment variable, java system properties, credentials profile file at default location ( (~/.aws/credentials), credentials delievered through Amazon EC2 container service, and instance profile credentails delivered through Amazon EC2 metadata service. Make sure user has at least permission to list streams/delivery stream, describe streams/delivery stream and put records for stream/delivery stream. +2. If you don't specify aws user key nor aws secret key then connector will use [DefaultAWSCredentialsProviderChain](http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html) for authenitication. It looks for credentials in following order - environment variable, java system properties, credentials profile file at default location ( (~/.aws/credentials), credentials delievered through Amazon EC2 container service, and instance profile credentails delivered through Amazon EC2 metadata service. Make sure user has at least permission to list streams/delivery stream, describe streams/delivery stream and put records for stream/delivery stream. ### Running a Connector @@ -73,6 +75,8 @@ You can build the project by running "maven package" and it will build amazon-ki | topics | Kafka topics from where you want to consume messages. It can be single topic or comma separated list of topics | - | | region| Specify region of your Kinesis Firehose | - | | streamName | Kinesis Stream Name.| - | +| awsKey | AWS user key.| - | +| awsSecret | AWS user secret key.| - | | usePartitionAsHashKey | Using Kafka partition key as hash key for Kinesis streams. | false | | maxBufferedTime | Maximum amount of time (milliseconds) a record may spend being buffered before it gets sent. Records may be sent sooner than this depending on the other buffering limits. Range: [100..... 9223372036854775807] | 15000 | | maxConnections | Maximum number of connections to open to the backend. HTTP requests are sent in parallel over multiple connections. Range: [1...256]. | 24 | diff --git a/src/main/java/com/amazon/kinesis/kafka/AmazonKinesisSinkConnector.java b/src/main/java/com/amazon/kinesis/kafka/AmazonKinesisSinkConnector.java index 7adcdd5..462ff9e 100644 --- a/src/main/java/com/amazon/kinesis/kafka/AmazonKinesisSinkConnector.java +++ b/src/main/java/com/amazon/kinesis/kafka/AmazonKinesisSinkConnector.java @@ -14,6 +14,10 @@ public class AmazonKinesisSinkConnector extends SinkConnector { public static final String REGION = "region"; + public static final String AWS_KEY = "awsKey"; + + public static final String AWS_SECRET = "awsSecret"; + public static final String STREAM_NAME = "streamName"; public static final String MAX_BUFFERED_TIME = "maxBufferedTime"; @@ -80,6 +84,10 @@ public class AmazonKinesisSinkConnector extends SinkConnector { private String sleepCycles; + private String awsKey; + + private String awsSecret; + @Override public void start(Map props) { region = props.get(REGION); @@ -99,6 +107,8 @@ public void start(Map props) { outstandingRecordsThreshold = props.get(OUTSTANDING_RECORDS_THRESHOLD); sleepPeriod = props.get(SLEEP_PERIOD); sleepCycles = props.get(SLEEP_CYCLES); + awsKey = props.get(AWS_KEY); + awsSecret = props.get(AWS_SECRET); } @Override @@ -198,6 +208,9 @@ public List> taskConfigs(int maxTasks) { config.put(SLEEP_CYCLES, sleepCycles); else config.put(SLEEP_CYCLES, "10"); + + config.put(AWS_KEY, awsKey); + config.put(AWS_SECRET, awsSecret); configs.add(config); diff --git a/src/main/java/com/amazon/kinesis/kafka/AmazonKinesisSinkTask.java b/src/main/java/com/amazon/kinesis/kafka/AmazonKinesisSinkTask.java index c49d13e..920de65 100644 --- a/src/main/java/com/amazon/kinesis/kafka/AmazonKinesisSinkTask.java +++ b/src/main/java/com/amazon/kinesis/kafka/AmazonKinesisSinkTask.java @@ -4,6 +4,9 @@ import java.util.HashMap; import java.util.Map; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.connect.errors.DataException; @@ -28,6 +31,10 @@ public class AmazonKinesisSinkTask extends SinkTask { private String regionName; + private String awsKey; + + private String awsSecret; + private int maxConnections; private int rateLimit; @@ -264,6 +271,10 @@ public void start(Map props) { sleepCycles = Integer.parseInt(props.get(AmazonKinesisSinkConnector.SLEEP_CYCLES)); + awsKey = props.get(AmazonKinesisSinkConnector.AWS_KEY); + + awsSecret = props.get(AmazonKinesisSinkConnector.AWS_SECRET); + if (!singleKinesisProducerPerPartition) kinesisProducer = getKinesisProducer(); @@ -303,7 +314,7 @@ public void stop() { private KinesisProducer getKinesisProducer() { KinesisProducerConfiguration config = new KinesisProducerConfiguration(); config.setRegion(regionName); - config.setCredentialsProvider(new DefaultAWSCredentialsProviderChain()); + config.setCredentialsProvider(getCredentialProvider(awsKey, awsSecret)); config.setMaxConnections(maxConnections); config.setAggregationEnabled(aggregration); @@ -340,4 +351,9 @@ private KinesisProducer getKinesisProducer() { } + private AWSCredentialsProvider getCredentialProvider(String key, String secret) { + if(key != null && secret != null) + return new AWSStaticCredentialsProvider(new BasicAWSCredentials(key, secret)); + else return new DefaultAWSCredentialsProviderChain(); + } }