Skip to content

Commit

Permalink
defaults -topic & -brokers to env vars KT_TOPIC & KT_BROKERS (fix #14).
Browse files Browse the repository at this point in the history
  • Loading branch information
fgeller committed Apr 4, 2016
1 parent c7e50d1 commit 658df27
Show file tree
Hide file tree
Showing 6 changed files with 282 additions and 28 deletions.
19 changes: 17 additions & 2 deletions consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,11 +152,24 @@ func failStartup(msg string) {

func consumeParseArgs() {
var err error
envTopic := os.Getenv("KT_TOPIC")
if config.consume.args.topic == "" {
failStartup("Topic name is required.")
if envTopic == "" {
failStartup("Topic name is required.")
} else {
config.consume.args.topic = envTopic
}
}
config.consume.topic = config.consume.args.topic

envBrokers := os.Getenv("KT_BROKERS")
if config.consume.args.brokers == "" {
if envBrokers != "" {
config.consume.args.brokers = envBrokers
} else {
config.consume.args.brokers = "localhost:9092"
}
}
config.consume.brokers = strings.Split(config.consume.args.brokers, ",")
for i, b := range config.consume.brokers {
if !strings.Contains(b, ":") {
Expand All @@ -173,14 +186,16 @@ func consumeParseArgs() {
func consumeFlags() *flag.FlagSet {
flags := flag.NewFlagSet("consume", flag.ExitOnError)
flags.StringVar(&config.consume.args.topic, "topic", "", "Topic to consume (required).")
flags.StringVar(&config.consume.args.brokers, "brokers", "localhost:9092", "Comma separated list of brokers. Port defaults to 9092 when omitted.")
flags.StringVar(&config.consume.args.brokers, "brokers", "", "Comma separated list of brokers. Port defaults to 9092 when omitted (defaults to localhost:9092).")
flags.StringVar(&config.consume.args.offsets, "offsets", "", "Specifies what messages to read by partition and offset range (defaults to all).")
flags.DurationVar(&config.consume.timeout, "timeout", time.Duration(0), "Timeout after not reading messages (default 0 to disable).")

flags.Usage = func() {
fmt.Fprintln(os.Stderr, "Usage of consume:")
flags.PrintDefaults()
fmt.Fprintln(os.Stderr, `
The values for -topic and -brokers can also be set via environment variables KT_TOPIC and KT_BROKERS respectively.
The values supplied on the command line win over environment variable values.
Offsets can be specified as a comma-separated list of intervals:
Expand Down
70 changes: 70 additions & 0 deletions consume_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"os"
"reflect"
"testing"
"time"
Expand Down Expand Up @@ -295,3 +296,72 @@ func (c tConsumer) ConsumePartition(topic string, partition int32, offset int64)
func (c tConsumer) Close() error {
return c.closeErr
}

func TestConsumeParseArgs(t *testing.T) {
configBefore := config
defer func() {
config = configBefore
}()

expectedTopic := "test-topic"
givenBroker := "hans:9092"
expectedBrokers := []string{givenBroker}

config.consume.args.topic = ""
config.consume.args.brokers = ""
os.Setenv("KT_TOPIC", expectedTopic)
os.Setenv("KT_BROKERS", givenBroker)

consumeParseArgs()
if config.consume.topic != expectedTopic ||
!reflect.DeepEqual(config.consume.brokers, expectedBrokers) {
t.Errorf(
"Expected topic %v and brokers %v from env vars, got topic %v and brokers %v.",
expectedTopic,
expectedBrokers,
config.consume.topic,
config.consume.brokers,
)
return
}

// default brokers to localhost:9092
os.Setenv("KT_TOPIC", "")
os.Setenv("KT_BROKERS", "")
config.consume.args.topic = expectedTopic
config.consume.args.brokers = ""
expectedBrokers = []string{"localhost:9092"}

consumeParseArgs()
if config.consume.topic != expectedTopic ||
!reflect.DeepEqual(config.consume.brokers, expectedBrokers) {
t.Errorf(
"Expected topic %v and brokers %v from env vars, got topic %v and brokers %v.",
expectedTopic,
expectedBrokers,
config.consume.topic,
config.consume.brokers,
)
return
}

// command line arg wins
os.Setenv("KT_TOPIC", "BLUBB")
os.Setenv("KT_BROKERS", "BLABB")
config.consume.args.topic = expectedTopic
config.consume.args.brokers = givenBroker
expectedBrokers = []string{givenBroker}

consumeParseArgs()
if config.consume.topic != expectedTopic ||
!reflect.DeepEqual(config.consume.brokers, expectedBrokers) {
t.Errorf(
"Expected topic %v and brokers %v from env vars, got topic %v and brokers %v.",
expectedTopic,
expectedBrokers,
config.consume.topic,
config.consume.brokers,
)
return
}
}
71 changes: 46 additions & 25 deletions produce.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,19 @@ type message struct {
Partition int32 `json:"partition"`
}

func produceCommand() command {
produce := flag.NewFlagSet("produce", flag.ExitOnError)
produce.StringVar(&config.produce.args.topic, "topic", "", "Topic to produce to (required).")
produce.StringVar(&config.produce.args.brokers, "brokers", "localhost:9092", "Comma separated list of brokers. Port defaults to 9092 when omitted.")
func produceFlags() *flag.FlagSet {
flags := flag.NewFlagSet("produce", flag.ExitOnError)
flags.StringVar(&config.produce.args.topic, "topic", "", "Topic to produce to (required).")
flags.StringVar(&config.produce.args.brokers, "brokers", "", "Comma separated list of brokers. Port defaults to 9092 when omitted (defaults to localhost:9092).")

produce.Usage = func() {
flags.Usage = func() {
fmt.Fprintln(os.Stderr, "Usage of produce:")
produce.PrintDefaults()
flags.PrintDefaults()

fmt.Fprintln(os.Stderr, `
The values for -topic and -brokers can also be set via environment variables KT_TOPIC and KT_BROKERS respectively.
The values supplied on the command line win over environment variable values.
Input is read from stdin and separated by newlines.
To specify the key, value and partition individually pass it as a JSON object
Expand Down Expand Up @@ -71,29 +74,47 @@ Keep reading input from stdin until interrupted (via ^C).
os.Exit(2)
}

return command{
flags: produce,
parseArgs: func() {
return flags
}

failStartup := func(msg string) {
fmt.Fprintln(os.Stderr, msg)
fmt.Fprintln(os.Stderr, "Use \"kt produce -help\" for more information.")
os.Exit(1)
}
func produceParseArgs() {
failStartup := func(msg string) {
fmt.Fprintln(os.Stderr, msg)
fmt.Fprintln(os.Stderr, "Use \"kt produce -help\" for more information.")
os.Exit(1)
}

if config.produce.args.topic == "" {
failStartup("Topic name is required.")
}
config.produce.topic = config.produce.args.topic
envTopic := os.Getenv("KT_TOPIC")
if config.produce.args.topic == "" {
if envTopic == "" {
failStartup("Topic name is required.")
} else {
config.produce.args.topic = envTopic
}
}
config.produce.topic = config.produce.args.topic

envBrokers := os.Getenv("KT_BROKERS")
if config.produce.args.brokers == "" {
if envBrokers != "" {
config.produce.args.brokers = envBrokers
} else {
config.produce.args.brokers = "localhost:9092"
}
}
config.produce.brokers = strings.Split(config.produce.args.brokers, ",")
for i, b := range config.produce.brokers {
if !strings.Contains(b, ":") {
config.produce.brokers[i] = b + ":9092"
}
}

config.produce.brokers = strings.Split(config.produce.args.brokers, ",")
for i, b := range config.produce.brokers {
if !strings.Contains(b, ":") {
config.produce.brokers[i] = b + ":9092"
}
}
},
}

func produceCommand() command {
return command{
flags: produceFlags(),
parseArgs: produceParseArgs,
run: func(closer chan struct{}) {

broker := sarama.NewBroker(config.produce.brokers[0])
Expand Down
76 changes: 76 additions & 0 deletions produce_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package main

import (
"os"
"reflect"
"testing"
)

func TestProduceParseArgs(t *testing.T) {
configBefore := config
defer func() {
config = configBefore
}()

expectedTopic := "test-topic"
givenBroker := "hans:9092"
expectedBrokers := []string{givenBroker}

config.produce.args.topic = ""
config.produce.args.brokers = ""
os.Setenv("KT_TOPIC", expectedTopic)
os.Setenv("KT_BROKERS", givenBroker)

produceParseArgs()
if config.produce.topic != expectedTopic ||
!reflect.DeepEqual(config.produce.brokers, expectedBrokers) {
t.Errorf(
"Expected topic %v and brokers %v from env vars, got topic %v and brokers %v.",
expectedTopic,
expectedBrokers,
config.produce.topic,
config.produce.brokers,
)
return
}

// default brokers to localhost:9092
os.Setenv("KT_TOPIC", "")
os.Setenv("KT_BROKERS", "")
config.produce.args.topic = expectedTopic
config.produce.args.brokers = ""
expectedBrokers = []string{"localhost:9092"}

produceParseArgs()
if config.produce.topic != expectedTopic ||
!reflect.DeepEqual(config.produce.brokers, expectedBrokers) {
t.Errorf(
"Expected topic %v and brokers %v from env vars, got topic %v and brokers %v.",
expectedTopic,
expectedBrokers,
config.produce.topic,
config.produce.brokers,
)
return
}

// command line arg wins
os.Setenv("KT_TOPIC", "BLUBB")
os.Setenv("KT_BROKERS", "BLABB")
config.produce.args.topic = expectedTopic
config.produce.args.brokers = givenBroker
expectedBrokers = []string{givenBroker}

produceParseArgs()
if config.produce.topic != expectedTopic ||
!reflect.DeepEqual(config.produce.brokers, expectedBrokers) {
t.Errorf(
"Expected topic %v and brokers %v from env vars, got topic %v and brokers %v.",
expectedTopic,
expectedBrokers,
config.produce.topic,
config.produce.brokers,
)
return
}
}
14 changes: 13 additions & 1 deletion topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func topicCommand() command {

func topicFlags() *flag.FlagSet {
topic := flag.NewFlagSet("topic", flag.ExitOnError)
topic.StringVar(&config.topic.args.brokers, "brokers", "localhost:9092", "Comma separated list of brokers. Port defaults to 9092 when omitted.")
topic.StringVar(&config.topic.args.brokers, "brokers", "", "Comma separated list of brokers. Port defaults to 9092 when omitted.")
topic.BoolVar(&config.topic.args.partitions, "partitions", false, "Include information per partition.")
topic.BoolVar(&config.topic.args.leaders, "leaders", false, "Include leader information per partition.")
topic.BoolVar(&config.topic.args.replicas, "replicas", false, "Include replica ids per partition.")
Expand All @@ -60,13 +60,25 @@ func topicFlags() *flag.FlagSet {
topic.Usage = func() {
fmt.Fprintln(os.Stderr, "Usage of topic:")
topic.PrintDefaults()
fmt.Fprintln(os.Stderr, `
The values for -brokers can also be set via the environment variable KT_BROKERS respectively.
The values supplied on the command line win over environment variable values.
`)
os.Exit(2)
}

return topic
}

func topicParseArgs() {
envBrokers := os.Getenv("KT_BROKERS")
if config.topic.args.brokers == "" {
if envBrokers != "" {
config.topic.args.brokers = envBrokers
} else {
config.topic.args.brokers = "localhost:9092"
}
}
config.topic.brokers = strings.Split(config.topic.args.brokers, ",")
for i, b := range config.topic.brokers {
if !strings.Contains(b, ":") {
Expand Down
60 changes: 60 additions & 0 deletions topic_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package main

import (
"os"
"reflect"
"testing"
)

func TestTopicParseArgs(t *testing.T) {
configBefore := config
defer func() {
config = configBefore
}()

givenBroker := "hans:9092"
expectedBrokers := []string{givenBroker}

config.topic.args.brokers = ""
os.Setenv("KT_BROKERS", givenBroker)

topicParseArgs()
if !reflect.DeepEqual(config.topic.brokers, expectedBrokers) {
t.Errorf(
"Expected brokers %v from env vars, got brokers %v.",
expectedBrokers,
config.topic.brokers,
)
return
}

// default brokers to localhost:9092
os.Setenv("KT_BROKERS", "")
config.topic.args.brokers = ""
expectedBrokers = []string{"localhost:9092"}

topicParseArgs()
if !reflect.DeepEqual(config.topic.brokers, expectedBrokers) {
t.Errorf(
"Expected brokers %v from env vars, got brokers %v.",
expectedBrokers,
config.topic.brokers,
)
return
}

// command line arg wins
os.Setenv("KT_BROKERS", "BLABB")
config.topic.args.brokers = givenBroker
expectedBrokers = []string{givenBroker}

topicParseArgs()
if !reflect.DeepEqual(config.topic.brokers, expectedBrokers) {
t.Errorf(
"Expected brokers %v from env vars, got brokers %v.",
expectedBrokers,
config.topic.brokers,
)
return
}
}

0 comments on commit 658df27

Please sign in to comment.