diff --git a/clientlibrary/config/config.go b/clientlibrary/config/config.go index 2170d44..f8102eb 100644 --- a/clientlibrary/config/config.go +++ b/clientlibrary/config/config.go @@ -175,7 +175,7 @@ type ( // Either consumer name or consumer ARN must be specified when Enhanced Fan-Out is enabled. EnableEnhancedFanOutConsumer bool - // EnhancedFanOutConsumerName is the name of the enhanced fan-out consumer to create. + // EnhancedFanOutConsumerName is the name of the enhanced fan-out consumer to create. If this isn't set the ApplicationName will be used. EnhancedFanOutConsumerName string // EnhancedFanOutConsumerARN is the ARN of an already created enhanced fan-out consumer, if this is set no automatic consumer creation will be attempted diff --git a/clientlibrary/config/config_test.go b/clientlibrary/config/config_test.go index 576042c..c02dfab 100644 --- a/clientlibrary/config/config_test.go +++ b/clientlibrary/config/config_test.go @@ -34,7 +34,7 @@ func TestConfig(t *testing.T) { WithIdleTimeBetweenReadsInMillis(20). WithCallProcessRecordsEvenForEmptyRecordList(true). WithTaskBackoffTimeMillis(10). - WithEnhancedFanOutConsumer("fan-out-consumer") + WithEnhancedFanOutConsumerName("fan-out-consumer") assert.Equal(t, "appName", kclConfig.ApplicationName) assert.Equal(t, 500, kclConfig.FailoverTimeMillis) @@ -47,9 +47,17 @@ func TestConfig(t *testing.T) { contextLogger.Infof("Default logger is awesome") } +func TestConfigDefaultEnhancedFanOutConsumerName(t *testing.T) { + kclConfig := NewKinesisClientLibConfig("appName", "StreamName", "us-west-2", "workerId") + + assert.Equal(t, "appName", kclConfig.ApplicationName) + assert.False(t, kclConfig.EnableEnhancedFanOutConsumer) + assert.Equal(t, "appName", kclConfig.EnhancedFanOutConsumerName) +} + func TestEmptyEnhancedFanOutConsumerName(t *testing.T) { assert.PanicsWithValue(t, "Non-empty value expected for EnhancedFanOutConsumerName, actual: ", func() { - NewKinesisClientLibConfig("app", "stream", "us-west-2", "worker").WithEnhancedFanOutConsumer("") + NewKinesisClientLibConfig("app", "stream", "us-west-2", "worker").WithEnhancedFanOutConsumerName("") }) } diff --git a/clientlibrary/config/kcl-config.go b/clientlibrary/config/kcl-config.go index 810e4c9..91f39b7 100644 --- a/clientlibrary/config/kcl-config.go +++ b/clientlibrary/config/kcl-config.go @@ -73,6 +73,7 @@ func NewKinesisClientLibConfigWithCredentials(applicationName, streamName, regio KinesisCredentials: kiniesisCreds, DynamoDBCredentials: dynamodbCreds, TableName: applicationName, + EnhancedFanOutConsumerName: applicationName, StreamName: streamName, RegionName: regionName, WorkerID: workerID, @@ -213,10 +214,18 @@ func (c *KinesisClientLibConfiguration) WithMonitoringService(mService metrics.M return c } -// WithEnhancedFanOutConsumer enables enhanced fan-out consumer with the specified name +// WithEnhancedFanOutConsumer sets EnableEnhancedFanOutConsumer. If enhanced fan-out is enabled and ConsumerName is not specified ApplicationName is used as ConsumerName. // For more info see: https://docs.aws.amazon.com/streams/latest/dev/enhanced-consumers.html // Note: You can register up to twenty consumers per stream to use enhanced fan-out. -func (c *KinesisClientLibConfiguration) WithEnhancedFanOutConsumer(consumerName string) *KinesisClientLibConfiguration { +func (c *KinesisClientLibConfiguration) WithEnhancedFanOutConsumer(enable bool) *KinesisClientLibConfiguration { + c.EnableEnhancedFanOutConsumer = enable + return c +} + +// WithEnhancedFanOutConsumerName enables enhanced fan-out consumer with the specified name +// For more info see: https://docs.aws.amazon.com/streams/latest/dev/enhanced-consumers.html +// Note: You can register up to twenty consumers per stream to use enhanced fan-out. +func (c *KinesisClientLibConfiguration) WithEnhancedFanOutConsumerName(consumerName string) *KinesisClientLibConfiguration { checkIsValueNotEmpty("EnhancedFanOutConsumerName", consumerName) c.EnhancedFanOutConsumerName = consumerName c.EnableEnhancedFanOutConsumer = true diff --git a/clientlibrary/worker/worker.go b/clientlibrary/worker/worker.go index 4d4bc06..0ab4d17 100644 --- a/clientlibrary/worker/worker.go +++ b/clientlibrary/worker/worker.go @@ -184,19 +184,14 @@ func (w *Worker) initialize() error { if w.kclConfig.EnableEnhancedFanOutConsumer { log.Debugf("Enhanced fan-out is enabled") - switch { - case w.kclConfig.EnhancedFanOutConsumerARN != "": - w.consumerARN = w.kclConfig.EnhancedFanOutConsumerARN - case w.kclConfig.EnhancedFanOutConsumerName != "": + w.consumerARN = w.kclConfig.EnhancedFanOutConsumerARN + if w.consumerARN == "" { var err error w.consumerARN, err = w.fetchConsumerARNWithRetry() if err != nil { log.Errorf("Failed to fetch consumer ARN for: %s, %v", w.kclConfig.EnhancedFanOutConsumerName, err) return err } - default: - log.Errorf("Consumer Name or ARN were not specified with enhanced fan-out enabled") - return errors.New("Consumer Name or ARN must be specified when enhanced fan-out is enabled") } } diff --git a/test/worker_test.go b/test/worker_test.go index 2e2e784..b9f9a32 100644 --- a/test/worker_test.go +++ b/test/worker_test.go @@ -191,7 +191,36 @@ func TestEnhancedFanOutConsumer(t *testing.T) { kclConfig := cfg.NewKinesisClientLibConfig(appName, streamName, regionName, workerID). WithInitialPositionInStream(cfg.LATEST). - WithEnhancedFanOutConsumer(consumerName). + WithEnhancedFanOutConsumerName(consumerName). + WithMaxRecords(10). + WithMaxLeasesForWorker(1). + WithShardSyncIntervalMillis(5000). + WithFailoverTimeMillis(300000). + WithLogger(log) + + runTest(kclConfig, false, t) +} + +func TestEnhancedFanOutConsumerDefaultConsumerName(t *testing.T) { + // At miminal, use standard logrus logger + // log := logger.NewLogrusLogger(logrus.StandardLogger()) + // + // In order to have precise control over logging. Use logger with config + config := logger.Configuration{ + EnableConsole: true, + ConsoleLevel: logger.Debug, + ConsoleJSONFormat: false, + EnableFile: true, + FileLevel: logger.Info, + FileJSONFormat: true, + Filename: "log.log", + } + // Use logrus logger + log := logger.NewLogrusLoggerWithConfig(config) + + kclConfig := cfg.NewKinesisClientLibConfig(appName, streamName, regionName, workerID). + WithInitialPositionInStream(cfg.LATEST). + WithEnhancedFanOutConsumer(true). WithMaxRecords(10). WithMaxLeasesForWorker(1). WithShardSyncIntervalMillis(5000).