From c7e50d1dc29336fe9551f127b4c19ad2bd87db88 Mon Sep 17 00:00:00 2001 From: Felix Geller Date: Mon, 4 Apr 2016 16:25:39 +1200 Subject: [PATCH] refactors flag handling across commands. --- consume.go | 65 ++++++++++++++++++++++++------------------------------ main.go | 5 +++-- produce.go | 3 +-- topic.go | 9 ++++---- 4 files changed, 37 insertions(+), 45 deletions(-) diff --git a/consume.go b/consume.go index aa0260c..58767ad 100644 --- a/consume.go +++ b/consume.go @@ -150,40 +150,38 @@ func failStartup(msg string) { os.Exit(1) } -func consumeParseArgs(f *flag.FlagSet) func([]string) { - return func(args []string) { - var err error - - if len(args) == 0 { - f.Usage() - } - - f.Parse(args) - - if config.consume.args.topic == "" { - failStartup("Topic name is required.") - } - config.consume.topic = config.consume.args.topic +func consumeParseArgs() { + var err error + if config.consume.args.topic == "" { + failStartup("Topic name is required.") + } + config.consume.topic = config.consume.args.topic - config.consume.brokers = strings.Split(config.consume.args.brokers, ",") - for i, b := range config.consume.brokers { - if !strings.Contains(b, ":") { - config.consume.brokers[i] = b + ":9092" - } + config.consume.brokers = strings.Split(config.consume.args.brokers, ",") + for i, b := range config.consume.brokers { + if !strings.Contains(b, ":") { + config.consume.brokers[i] = b + ":9092" } + } - config.consume.offsets, err = parseOffsets(config.consume.args.offsets) - if err != nil { - failStartup(fmt.Sprintf("%s", err)) - } + config.consume.offsets, err = parseOffsets(config.consume.args.offsets) + if err != nil { + failStartup(fmt.Sprintf("%s", err)) } } -func consumeUsage(f *flag.FlagSet) func() { - return func() { +func consumeFlags() *flag.FlagSet { + flags := flag.NewFlagSet("consume", flag.ExitOnError) + flags.StringVar(&config.consume.args.topic, "topic", "", "Topic to consume (required).") + flags.StringVar(&config.consume.args.brokers, "brokers", "localhost:9092", "Comma separated list of brokers. Port defaults to 9092 when omitted.") + flags.StringVar(&config.consume.args.offsets, "offsets", "", "Specifies what messages to read by partition and offset range (defaults to all).") + flags.DurationVar(&config.consume.timeout, "timeout", time.Duration(0), "Timeout after not reading messages (default 0 to disable).") + + flags.Usage = func() { fmt.Fprintln(os.Stderr, "Usage of consume:") - f.PrintDefaults() + flags.PrintDefaults() fmt.Fprintln(os.Stderr, ` + Offsets can be specified as a comma-separated list of intervals: partition1:start-end,partition2:start-end @@ -212,23 +210,18 @@ This would consume messages from three partitions: - Messages between offsets 1 and 10 from partition 2. - Anything from partition 6. `) + os.Exit(2) } + + return flags } func consumeCommand() command { - flags := flag.NewFlagSet("consume", flag.ExitOnError) - flags.StringVar(&config.consume.args.topic, "topic", "", "Topic to consume (required).") - flags.StringVar(&config.consume.args.brokers, "brokers", "localhost:9092", "Comma separated list of brokers. Port defaults to 9092 when omitted.") - flags.StringVar(&config.consume.args.offsets, "offsets", "", "Specifies what messages to read by partition and offset range (defaults to all).") - flags.DurationVar(&config.consume.timeout, "timeout", time.Duration(0), "Timeout after not reading messages (default 0 to disable).") - - flags.Usage = consumeUsage(flags) return command{ - flags: flags, - parseArgs: consumeParseArgs(flags), - + flags: consumeFlags(), + parseArgs: consumeParseArgs, run: func(closer chan struct{}) { consumer, err := sarama.NewConsumer(config.consume.brokers, nil) diff --git a/main.go b/main.go index fadfa82..3a89eb3 100644 --- a/main.go +++ b/main.go @@ -29,7 +29,7 @@ func listenForInterrupt() chan struct{} { type command struct { flags *flag.FlagSet - parseArgs func([]string) + parseArgs func() run func(chan struct{}) } @@ -70,7 +70,8 @@ func parseArgs() command { usage() } - cmd.parseArgs(os.Args[2:]) + cmd.flags.Parse(os.Args[2:]) + cmd.parseArgs() return cmd } diff --git a/produce.go b/produce.go index e73781e..861ed19 100644 --- a/produce.go +++ b/produce.go @@ -73,8 +73,7 @@ Keep reading input from stdin until interrupted (via ^C). return command{ flags: produce, - parseArgs: func(args []string) { - produce.Parse(args) + parseArgs: func() { failStartup := func(msg string) { fmt.Fprintln(os.Stderr, msg) diff --git a/topic.go b/topic.go index 2cf8767..e5538ef 100644 --- a/topic.go +++ b/topic.go @@ -43,12 +43,13 @@ type partition struct { func topicCommand() command { return command{ + flags: topicFlags(), parseArgs: topicParseArgs, run: topicRun, } } -func init() { +func topicFlags() *flag.FlagSet { topic := flag.NewFlagSet("topic", flag.ExitOnError) topic.StringVar(&config.topic.args.brokers, "brokers", "localhost:9092", "Comma separated list of brokers. Port defaults to 9092 when omitted.") topic.BoolVar(&config.topic.args.partitions, "partitions", false, "Include information per partition.") @@ -62,12 +63,10 @@ func init() { os.Exit(2) } - config.topic.flags = topic + return topic } -func topicParseArgs(args []string) { - config.topic.flags.Parse(args) - +func topicParseArgs() { config.topic.brokers = strings.Split(config.topic.args.brokers, ",") for i, b := range config.topic.brokers { if !strings.Contains(b, ":") {