Skip to content

Commit

Permalink
major refactoring and producing
Browse files Browse the repository at this point in the history
  • Loading branch information
d-rk committed Dec 11, 2018
1 parent da19c6b commit 70770af
Show file tree
Hide file tree
Showing 27 changed files with 662 additions and 717 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,6 @@
*.iws
*.iml
*.ipr

### Binary
kafkactl
37 changes: 35 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ command-line interface for interaction with kafka
### from source

```bash
go get -u github.com/random-dwi/kafkactl
go get -u github.com/deviceinsight/kafkactl
```

**NOTE:** make sure that `kafkactl` is on PATH otherwise auto-completion won't work.
Expand Down Expand Up @@ -48,4 +48,37 @@ source <(kafkactl completion bash)
source <(kafkactl completion zsh)
```

- `fish` is currently not supported. see: https://github.com/spf13/cobra/issues/350
- `fish` is currently not supported. see: https://github.com/spf13/cobra/issues/350

## examples

### producing messages

Producing messages can be done in multiple ways. If we want to produce a message with `key='my-key'`,
`value='my-value'` to the topic `my-topic` this can be achieved with one of the following commands:

```bash
echo "my-key#my-value" | kafkactl produce my-topic --separator=#
echo "my-value" | kafkactl produce my-topic --key=my-key
kafkactl produce my-topic --key=my-key --value=my-value
```

It is also possible to specify the partition to insert the message:
```bash
kafkactl produce my-topic --key=my-key --value=my-value --partition=2
```

Additionally a different partitioning scheme can be used. when a `key` is provided the default partitioner
uses the `hash` of the `key` to assign a partition. so the same `key` will end up in the same partition:
```bash
# the following 3 messages will all be inserted to the same partition
kafkactl produce my-topic --key=my-key --value=my-value
kafkactl produce my-topic --key=my-key --value=my-value
kafkactl produce my-topic --key=my-key --value=my-value
# the following 3 messages will probably be inserted to different partitions
kafkactl produce my-topic --key=my-key --value=my-value --partitioner=random
kafkactl produce my-topic --key=my-key --value=my-value --partitioner=random
kafkactl produce my-topic --key=my-key --value=my-value --partitioner=random
```

29 changes: 9 additions & 20 deletions cmd/completion.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,7 @@
// Copyright © 2018 NAME HERE <EMAIL ADDRESS>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cmd

import (
"github.com/deviceinsight/kafkactl/output"
"github.com/spf13/cobra"
"os"
)
Expand All @@ -31,12 +18,12 @@ __kafkactl_get_topics()
__kafkactl_get_contexts()
{
local kafkactl_contexts_output out
if kafkactl_contexts_output=$(kafkactl config get-contexts 2>/dev/null); then
if kafkactl_contexts_output=$(kafkactl config get-contexts -o compact 2>/dev/null); then
COMPREPLY=( $( compgen -W "${kafkactl_contexts_output[*]}" -- "$cur" ) )
fi
}
__custom_func() {
__kafkactl_custom_func() {
case ${last_command} in
kafkactl_consume | kafkactl_produce | kafkactl_delete_topic | kafkactl_describe_topic)
__kafkactl_get_topics
Expand All @@ -52,15 +39,17 @@ __custom_func() {
}
`

// completionCmd represents the completion command
var completionCmd = &cobra.Command{
var cmdCompletion = &cobra.Command{
Use: "completion",
Short: "generate bash completion",
Run: func(cmd *cobra.Command, args []string) {
rootCmd.GenBashCompletion(os.Stdout)
err := rootCmd.GenBashCompletion(os.Stdout)
if err != nil {
output.Failf("Failed to generate bash completion: %s", err)
}
},
}

func init() {
rootCmd.AddCommand(completionCmd)
rootCmd.AddCommand(cmdCompletion)
}
23 changes: 4 additions & 19 deletions cmd/config/config.go
Original file line number Diff line number Diff line change
@@ -1,31 +1,16 @@
// Copyright © 2018 NAME HERE <EMAIL ADDRESS>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package config

import (
"github.com/spf13/cobra"
)

// ConfigCmd represents the config command
var ConfigCmd = &cobra.Command{
var CmdConfig = &cobra.Command{
Use: "config",
Short: "show and edit configurations",
}

func init() {
ConfigCmd.AddCommand(currentContextCmd)
ConfigCmd.AddCommand(getContextsCmd)
ConfigCmd.AddCommand(useContextCmd)
CmdConfig.AddCommand(cmdCurrentContext)
CmdConfig.AddCommand(cmdGetContexts)
CmdConfig.AddCommand(cmdUseContext)
}
17 changes: 1 addition & 16 deletions cmd/config/currentContext.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,3 @@
// Copyright © 2018 NAME HERE <EMAIL ADDRESS>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package config

import (
Expand All @@ -20,8 +6,7 @@ import (
"github.com/spf13/viper"
)

// currentContextCmd represents the currentContext command
var currentContextCmd = &cobra.Command{
var cmdCurrentContext = &cobra.Command{
Use: "current-context",
Aliases: []string{"currentContext"},
Short: "show current context",
Expand Down
45 changes: 27 additions & 18 deletions cmd/config/getContexts.go
Original file line number Diff line number Diff line change
@@ -1,36 +1,45 @@
// Copyright © 2018 NAME HERE <EMAIL ADDRESS>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package config

import (
"fmt"
"github.com/deviceinsight/kafkactl/output"

"github.com/spf13/cobra"
"github.com/spf13/viper"
)

// getContextsCmd represents the getContexts command
var getContextsCmd = &cobra.Command{
var outputFormat string

var cmdGetContexts = &cobra.Command{
Use: "get-contexts",
Aliases: []string{"getContexts"},
Short: "list configured contexts",
Long: `Output names of all configured contexts`,
Run: func(cmd *cobra.Command, args []string) {
contexts := viper.GetStringMap("contexts")
for name := range contexts {
fmt.Println(name)
currentContext := viper.GetString("current-context")

if outputFormat == "compact" {
for name := range contexts {
fmt.Println(name)
}
} else {
writer := output.CreateTableWriter()

writer.WriteHeader("ACTIVE", "NAME")
for context := range contexts {
if currentContext == context {
writer.Write("*", context)
} else {
writer.Write("", context)
}
}

writer.Flush()
}
},
}

func init() {
cmdGetContexts.Flags().StringVarP(&outputFormat, "output", "o", outputFormat, "Output format. One of: compact")
}
17 changes: 1 addition & 16 deletions cmd/config/useContext.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,3 @@
// Copyright © 2018 NAME HERE <EMAIL ADDRESS>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package config

import (
Expand All @@ -22,8 +8,7 @@ import (
"strings"
)

// useContextCmd represents the useContext command
var useContextCmd = &cobra.Command{
var cmdUseContext = &cobra.Command{
Use: "use-context",
Aliases: []string{"useContext"},
Short: "switch active context",
Expand Down
51 changes: 9 additions & 42 deletions cmd/consume/consume.go
Original file line number Diff line number Diff line change
@@ -1,58 +1,25 @@
// Copyright © 2018 NAME HERE <EMAIL ADDRESS>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package consume

import (
consumerTools "github.com/random-dwi/kafkactl/kafka/consumer"
"github.com/random-dwi/kafkactl/util"
"github.com/random-dwi/kafkactl/util/output"
"github.com/deviceinsight/kafkactl/operations"
"github.com/spf13/cobra"
)

var flags consumerTools.ConsumerFlags
var flags operations.ConsumerFlags

// ConsumeCmd represents the consume command
var ConsumeCmd = &cobra.Command{
var CmdConsume = &cobra.Command{
Use: "consume",
Short: "consume messages from a topic",
Args: cobra.MinimumNArgs(1),
Run: func(cobraCmd *cobra.Command, args []string) {

context := util.CreateClientContext()

var topic = args[0]

consumerContext := consumerTools.CreateConsumerContext(&context, topic, flags)

partitions := consumerContext.FindPartitions()
if len(partitions) == 0 {
output.Failf("Found no partitions to consume")
}

defer consumerContext.Close()

consumerContext.Consume(partitions)
(&operations.ConsumerOperation{}).Consume(args[0], flags)
},
}

func init() {

ConsumeCmd.Flags().BoolVarP(&flags.PrintKeys, "print-keys", "k", false, "print message printKeys")
ConsumeCmd.Flags().BoolVarP(&flags.PrintTimestamps, "print-timestamps", "t", false, "print message printTimestamps")
ConsumeCmd.Flags().StringVarP(&flags.ConsumerGroup, "consumer-group", "g", "", "consumer group to join")
ConsumeCmd.Flags().StringArrayVarP(&flags.Offsets, "offset", "f", flags.Offsets, "define offsets for consumer")

ConsumeCmd.Flags().StringVarP(&flags.OutputFormat, "output", "o", flags.OutputFormat, "Output format. One of: json|yaml")
CmdConsume.Flags().BoolVarP(&flags.PrintKeys, "print-keys", "k", false, "print message printKeys")
CmdConsume.Flags().BoolVarP(&flags.PrintTimestamps, "print-timestamps", "t", false, "print message printTimestamps")
CmdConsume.Flags().StringVarP(&flags.ConsumerGroup, "consumer-group", "g", "", "consumer group to join")
CmdConsume.Flags().StringArrayVarP(&flags.Offsets, "offset", "f", flags.Offsets, "define offsets for consumer")
CmdConsume.Flags().StringVarP(&flags.OutputFormat, "output", "o", flags.OutputFormat, "Output format. One of: json|yaml")
}
Loading

0 comments on commit 70770af

Please sign in to comment.