Skip to content

Commit

Permalink
topic: Optionally includes leader information per partition.
Browse files Browse the repository at this point in the history
  • Loading branch information
fgeller committed Mar 2, 2016
1 parent a2d898d commit 16d0e7a
Showing 1 changed file with 22 additions and 5 deletions.
27 changes: 22 additions & 5 deletions topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ type topicConfig struct {
brokers []string
filter *regexp.Regexp
partitions bool
leaders bool
args struct {
brokers string
filter string
partitions bool
leaders bool
}
}

Expand All @@ -29,9 +31,10 @@ type topic struct {
}

type partition struct {
Id int32 `json:"id"`
OldestOffset int64 `json:"oldestOffset"`
NewestOffset int64 `json:"newestOffset"`
Id int32 `json:"id"`
OldestOffset int64 `json:"oldestOffset"`
NewestOffset int64 `json:"newestOffset"`
Leader string `json:"leader,omitempty"`
}

func topicCommand() command {
Expand All @@ -44,7 +47,8 @@ func topicCommand() command {
func init() {
topic := flag.NewFlagSet("topic", flag.ExitOnError)
topic.StringVar(&config.topic.args.brokers, "brokers", "localhost:9092", "Comma separated list of brokers. Port defaults to 9092 when omitted.")
topic.BoolVar(&config.topic.args.partitions, "partitions", false, "Include detailed partition information.")
topic.BoolVar(&config.topic.args.partitions, "partitions", false, "Include information per partition.")
topic.BoolVar(&config.topic.args.leaders, "leaders", false, "Include leader information per partition.")
topic.StringVar(&config.topic.args.filter, "filter", "", "Regex to filter topics by name.")

topic.Usage = func() {
Expand Down Expand Up @@ -74,6 +78,7 @@ func topicParseArgs(args []string) {

config.topic.filter = re
config.topic.partitions = config.topic.args.partitions
config.topic.leaders = config.topic.args.leaders
}

func topicRun(closer chan struct{}) {
Expand Down Expand Up @@ -124,17 +129,29 @@ func readTopic(client sarama.Client, name string) (topic, error) {
}

for _, p := range ps {
np := partition{Id: p}

oldest, err := client.GetOffset(name, p, sarama.OffsetOldest)
if err != nil {
return t, err
}
np.OldestOffset = oldest

newest, err := client.GetOffset(name, p, sarama.OffsetNewest)
if err != nil {
return t, err
}
np.NewestOffset = newest

if config.topic.leaders {
b, err := client.Leader(name, p)
if err != nil {
return t, err
}
np.Leader = b.Addr()
}

t.Partitions = append(t.Partitions, partition{Id: p, OldestOffset: oldest, NewestOffset: newest})
t.Partitions = append(t.Partitions, np)
}
}

Expand Down

0 comments on commit 16d0e7a

Please sign in to comment.