Skip to content

Commit

Permalink
Merge pull request #10 from Hinge/pull-from-upstream
Browse files Browse the repository at this point in the history
Pull from upstream
  • Loading branch information
zrl authored Nov 3, 2023
2 parents 7754830 + 9ed18ee commit d32a2ec
Show file tree
Hide file tree
Showing 11 changed files with 425 additions and 542 deletions.
12 changes: 6 additions & 6 deletions .goreleaser.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ builds:
- darwin
- windows
archives:
- replacements:
darwin: Darwin
linux: Linux
windows: Windows
386: i386
amd64: x86_64
- name_template: >-
{{ .ProjectName }}_{{ .Version }}_
{{- title .Os }}_
{{- if eq .Arch "amd64" }}x86_64
{{- else if eq .Arch "386" }}i386
{{- else }}{{ .Arch }}{{ end }}
checksum:
name_template: 'checksums.txt'
snapshot:
Expand Down
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ Describe a given topic called _mqtt.messages.incoming_

`kaf topic describe mqtt.messages.incoming`

### Group Inspection

List consumer groups

`kaf groups`
Expand All @@ -111,10 +113,20 @@ Write message into given topic from stdin

`echo test | kaf produce mqtt.messages.incoming`

### Offset Reset

Set offset for consumer group _dispatcher_ consuming from topic _mqtt.messages.incoming_ to latest for all partitions

`kaf group commit dispatcher -t mqtt.messages.incoming --offset latest --all-partitions`

Set offset to oldest

`kaf group commit dispatcher -t mqtt.messages.incoming --offset oldest --all-partitions`

Set offset to 1001 for partition 0

`kaf group commit dispatcher -t mqtt.messages.incoming --offset 1001 --partition 0`

## Configuration

See the [examples](examples) folder
Expand Down
128 changes: 107 additions & 21 deletions cmd/kaf/consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@ import (
"encoding/binary"
"encoding/json"
"fmt"
"strconv"
"sync"
"text/tabwriter"

"strconv"

"github.com/Shopify/sarama"
"github.com/golang/protobuf/jsonpb"
prettyjson "github.com/hokaccha/go-prettyjson"
Expand All @@ -23,16 +22,18 @@ import (
)

var (
offsetFlag string
groupFlag string
groupCommitFlag bool
raw bool
follow bool
offsetFlag string
groupFlag string
groupCommitFlag bool
outputFormat = OutputFormatDefault
// Deprecated: Use outputFormat instead.
raw bool
follow bool
trimKeyHeaderBytes uint32
trimMessageHeaderBytes uint32
tail int32
schemaCache *avro.SchemaCache
keyfmt *prettyjson.Formatter
tail int32
schemaCache *avro.SchemaCache
keyfmt *prettyjson.Formatter

protoType string
keyProtoType string
Expand All @@ -52,6 +53,7 @@ func init() {
rootCmd.AddCommand(consumeCmd)
consumeCmd.Flags().StringVar(&offsetFlag, "offset", "oldest", "Offset to start consuming. Possible values: oldest, newest, or integer.")
consumeCmd.Flags().BoolVar(&raw, "raw", false, "Print raw output of messages, without key or prettified JSON")
consumeCmd.Flags().Var(&outputFormat, "output", "Set output format messages: default, raw (without key or prettified JSON), json")
consumeCmd.Flags().BoolVarP(&follow, "follow", "f", false, "Continue to consume messages until program execution is interrupted/terminated")
consumeCmd.Flags().Int32VarP(&tail, "tail", "n", 0, "Print last n messages per partition")
consumeCmd.Flags().StringSliceVar(&protoFiles, "proto-include", []string{}, "Path to proto files")
Expand All @@ -68,6 +70,14 @@ func init() {
consumeCmd.Flags().StringVar(&keyFilter, "key-filter", "", "jq path expression to filter on message keys.")
consumeCmd.Flags().StringVar(&valueFilter, "value-filter", "", "jq path expression to filter on message values.")

if err := consumeCmd.RegisterFlagCompletionFunc("output", completeOutputFormat); err != nil {
errorExit("Failed to register flag completion: %v", err)
}

if err := consumeCmd.Flags().MarkDeprecated("raw", "use --output raw instead"); err != nil {
errorExit("Failed to mark flag as deprecated: %v", err)
}

keyfmt = prettyjson.NewFormatter()
keyfmt.Newline = " " // Replace newline with space to avoid condensed output.
keyfmt.Indent = 0
Expand Down Expand Up @@ -107,6 +117,11 @@ var consumeCmd = &cobra.Command{
topic := args[0]
client := getClientFromConfig(cfg)

// Allow deprecated flag to override when outputFormat is not specified, or default.
if outputFormat == OutputFormatDefault && raw {
outputFormat = OutputFormatRaw
}

switch offsetFlag {
case "oldest":
offset = sarama.OffsetOldest
Expand Down Expand Up @@ -181,6 +196,8 @@ func withConsumerGroup(ctx context.Context, client sarama.Client, topic, group s
errorExit("Failed to create consumer group: %v", err)
}

schemaCache = getSchemaCache()

err = cg.Consume(ctx, []string{topic}, &g{})
if err != nil {
errorExit("Error on consume: %v", err)
Expand Down Expand Up @@ -310,16 +327,51 @@ func handleMessage(msg *sarama.ConsumerMessage, mu *sync.Mutex) {
return
}

if !raw {
if isJSON(dataToDisplay) {
dataToDisplay = formatValue(dataToDisplay)
dataToDisplay = formatMessage(msg, dataToDisplay, keyToDisplay, &stderr)

mu.Lock()
stderr.WriteTo(errWriter)
_, _ = colorableOut.Write(dataToDisplay)
fmt.Fprintln(outWriter)
mu.Unlock()
}

func formatMessage(msg *sarama.ConsumerMessage, rawMessage []byte, keyToDisplay []byte, stderr *bytes.Buffer) []byte {
switch outputFormat {
case OutputFormatRaw:
return rawMessage
case OutputFormatJSON:
jsonMessage := make(map[string]interface{})

jsonMessage["partition"] = msg.Partition
jsonMessage["offset"] = msg.Offset
jsonMessage["timestamp"] = msg.Timestamp

if len(msg.Headers) > 0 {
jsonMessage["headers"] = msg.Headers
}

jsonMessage["key"] = formatJSON(keyToDisplay)
jsonMessage["payload"] = formatJSON(rawMessage)

jsonToDisplay, err := json.Marshal(jsonMessage)
if err != nil {
fmt.Fprintf(stderr, "could not decode JSON data: %v", err)
}

return jsonToDisplay
case OutputFormatDefault:
fallthrough
default:
if isJSON(rawMessage) {
rawMessage = formatValue(rawMessage)
}

if isJSON(keyToDisplay) {
keyToDisplay = formatKey(keyToDisplay)
}

w := tabwriter.NewWriter(&stderr, tabwriterMinWidth, tabwriterWidth, tabwriterPadding, tabwriterPadChar, tabwriterFlags)
w := tabwriter.NewWriter(stderr, tabwriterMinWidth, tabwriterWidth, tabwriterPadding, tabwriterPadChar, tabwriterFlags)

if len(msg.Headers) > 0 {
fmt.Fprintf(w, "Headers:\n")
Expand Down Expand Up @@ -348,14 +400,9 @@ func handleMessage(msg *sarama.ConsumerMessage, mu *sync.Mutex) {
}
fmt.Fprintf(w, "Partition:\t%v\nOffset:\t%v\nTimestamp:\t%v\n", msg.Partition, msg.Offset, msg.Timestamp)
w.Flush()
}

mu.Lock()
stderr.WriteTo(errWriter)
_, _ = colorableOut.Write(dataToDisplay)
fmt.Fprintln(outWriter)
mu.Unlock()

return rawMessage
}
}

// proto to JSON
Expand Down Expand Up @@ -403,6 +450,15 @@ func formatValue(key []byte) []byte {
return key
}

// formatJSON decodes data as JSON and returns the decoded value
// (map, slice, string, number, bool, or nil). If the bytes are not
// valid JSON, the raw payload is returned unchanged as a string so
// it can still be embedded in the JSON output envelope.
func formatJSON(data []byte) interface{} {
	var decoded interface{}
	err := json.Unmarshal(data, &decoded)
	if err == nil {
		return decoded
	}
	return string(data)
}

func isJSON(data []byte) bool {
var i interface{}
if err := json.Unmarshal(data, &i); err == nil {
Expand All @@ -428,3 +484,33 @@ func matchesFilter(data []byte, filter *gojq.Code) bool {
b, ok := v.(bool)
return ok && b
}

// OutputFormat selects how consumed messages are rendered.
// It implements the pflag.Value interface so it can back a --output flag.
type OutputFormat string

const (
	// OutputFormatDefault prints key, headers and a prettified payload.
	OutputFormatDefault OutputFormat = "default"
	// OutputFormatRaw prints only the raw message payload.
	OutputFormatRaw OutputFormat = "raw"
	// OutputFormatJSON prints each message as a single JSON object.
	OutputFormatJSON OutputFormat = "json"
)

// String returns the current format value; part of pflag.Value.
func (e *OutputFormat) String() string {
	return string(*e)
}

// Set validates v and stores it; part of pflag.Value.
// Only "default", "raw" and "json" are accepted.
func (e *OutputFormat) Set(v string) error {
	switch OutputFormat(v) {
	case OutputFormatDefault, OutputFormatRaw, OutputFormatJSON:
		*e = OutputFormat(v)
		return nil
	}
	return fmt.Errorf("must be one of: default, raw, json")
}

// Type names the flag's value type in help output; part of pflag.Value.
func (e *OutputFormat) Type() string {
	return "OutputFormat"
}

// completeOutputFormat supplies shell-completion candidates for the
// --output flag: the three accepted format names, with file-name
// completion suppressed.
func completeOutputFormat(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) {
	candidates := []string{"default", "raw", "json"}
	return candidates, cobra.ShellCompDirectiveNoFileComp
}
Loading

0 comments on commit d32a2ec

Please sign in to comment.