Skip to content

Commit

Permalink
refactors flag handling across commands.
Browse files Browse the repository at this point in the history
  • Loading branch information
fgeller committed Apr 4, 2016
1 parent a752c6d commit c7e50d1
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 45 deletions.
65 changes: 29 additions & 36 deletions consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,40 +150,38 @@ func failStartup(msg string) {
os.Exit(1)
}

func consumeParseArgs(f *flag.FlagSet) func([]string) {
return func(args []string) {
var err error

if len(args) == 0 {
f.Usage()
}

f.Parse(args)

if config.consume.args.topic == "" {
failStartup("Topic name is required.")
}
config.consume.topic = config.consume.args.topic
func consumeParseArgs() {
var err error
if config.consume.args.topic == "" {
failStartup("Topic name is required.")
}
config.consume.topic = config.consume.args.topic

config.consume.brokers = strings.Split(config.consume.args.brokers, ",")
for i, b := range config.consume.brokers {
if !strings.Contains(b, ":") {
config.consume.brokers[i] = b + ":9092"
}
config.consume.brokers = strings.Split(config.consume.args.brokers, ",")
for i, b := range config.consume.brokers {
if !strings.Contains(b, ":") {
config.consume.brokers[i] = b + ":9092"
}
}

config.consume.offsets, err = parseOffsets(config.consume.args.offsets)
if err != nil {
failStartup(fmt.Sprintf("%s", err))
}
config.consume.offsets, err = parseOffsets(config.consume.args.offsets)
if err != nil {
failStartup(fmt.Sprintf("%s", err))
}
}

func consumeUsage(f *flag.FlagSet) func() {
return func() {
func consumeFlags() *flag.FlagSet {
flags := flag.NewFlagSet("consume", flag.ExitOnError)
flags.StringVar(&config.consume.args.topic, "topic", "", "Topic to consume (required).")
flags.StringVar(&config.consume.args.brokers, "brokers", "localhost:9092", "Comma separated list of brokers. Port defaults to 9092 when omitted.")
flags.StringVar(&config.consume.args.offsets, "offsets", "", "Specifies what messages to read by partition and offset range (defaults to all).")
flags.DurationVar(&config.consume.timeout, "timeout", time.Duration(0), "Timeout after not reading messages (default 0 to disable).")

flags.Usage = func() {
fmt.Fprintln(os.Stderr, "Usage of consume:")
f.PrintDefaults()
flags.PrintDefaults()
fmt.Fprintln(os.Stderr, `
Offsets can be specified as a comma-separated list of intervals:
partition1:start-end,partition2:start-end
Expand Down Expand Up @@ -212,23 +210,18 @@ This would consume messages from three partitions:
- Messages between offsets 1 and 10 from partition 2.
- Anything from partition 6.
`)

os.Exit(2)
}

return flags
}

func consumeCommand() command {
flags := flag.NewFlagSet("consume", flag.ExitOnError)
flags.StringVar(&config.consume.args.topic, "topic", "", "Topic to consume (required).")
flags.StringVar(&config.consume.args.brokers, "brokers", "localhost:9092", "Comma separated list of brokers. Port defaults to 9092 when omitted.")
flags.StringVar(&config.consume.args.offsets, "offsets", "", "Specifies what messages to read by partition and offset range (defaults to all).")
flags.DurationVar(&config.consume.timeout, "timeout", time.Duration(0), "Timeout after not reading messages (default 0 to disable).")

flags.Usage = consumeUsage(flags)

return command{
flags: flags,
parseArgs: consumeParseArgs(flags),

flags: consumeFlags(),
parseArgs: consumeParseArgs,
run: func(closer chan struct{}) {

consumer, err := sarama.NewConsumer(config.consume.brokers, nil)
Expand Down
5 changes: 3 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func listenForInterrupt() chan struct{} {

type command struct {
flags *flag.FlagSet
parseArgs func([]string)
parseArgs func()
run func(chan struct{})
}

Expand Down Expand Up @@ -70,7 +70,8 @@ func parseArgs() command {
usage()
}

cmd.parseArgs(os.Args[2:])
cmd.flags.Parse(os.Args[2:])
cmd.parseArgs()

return cmd
}
Expand Down
3 changes: 1 addition & 2 deletions produce.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,7 @@ Keep reading input from stdin until interrupted (via ^C).

return command{
flags: produce,
parseArgs: func(args []string) {
produce.Parse(args)
parseArgs: func() {

failStartup := func(msg string) {
fmt.Fprintln(os.Stderr, msg)
Expand Down
9 changes: 4 additions & 5 deletions topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,13 @@ type partition struct {

func topicCommand() command {
return command{
flags: topicFlags(),
parseArgs: topicParseArgs,
run: topicRun,
}
}

func init() {
func topicFlags() *flag.FlagSet {
topic := flag.NewFlagSet("topic", flag.ExitOnError)
topic.StringVar(&config.topic.args.brokers, "brokers", "localhost:9092", "Comma separated list of brokers. Port defaults to 9092 when omitted.")
topic.BoolVar(&config.topic.args.partitions, "partitions", false, "Include information per partition.")
Expand All @@ -62,12 +63,10 @@ func init() {
os.Exit(2)
}

config.topic.flags = topic
return topic
}

func topicParseArgs(args []string) {
config.topic.flags.Parse(args)

func topicParseArgs() {
config.topic.brokers = strings.Split(config.topic.args.brokers, ",")
for i, b := range config.topic.brokers {
if !strings.Contains(b, ":") {
Expand Down

0 comments on commit c7e50d1

Please sign in to comment.