Skip to content

Commit

Permalink
filter keys and values with jq
Browse files Browse the repository at this point in the history
  • Loading branch information
zrl committed Jun 14, 2022
1 parent abfa5cd commit ab6371d
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 6 deletions.
60 changes: 58 additions & 2 deletions cmd/kaf/consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@ import (
"strconv"

"github.com/Shopify/sarama"
"github.com/birdayz/kaf/pkg/avro"
"github.com/birdayz/kaf/pkg/proto"
"github.com/golang/protobuf/jsonpb"
prettyjson "github.com/hokaccha/go-prettyjson"
"github.com/itchyny/gojq"
"github.com/spf13/cobra"
"github.com/vmihailenco/msgpack/v5"

"github.com/birdayz/kaf/pkg/avro"
"github.com/birdayz/kaf/pkg/proto"
)

var (
Expand All @@ -32,6 +34,10 @@ var (

protoType string
keyProtoType string
keyFilter string
valueFilter string
keyJQCode *gojq.Code
valueJQCode *gojq.Code

flagPartitions []int32

Expand All @@ -55,6 +61,8 @@ func init() {
consumeCmd.Flags().Int64VarP(&limitMessagesFlag, "limit-messages", "l", 0, "Limit messages per partition")
consumeCmd.Flags().StringVarP(&groupFlag, "group", "g", "", "Consumer Group to use for consume")
consumeCmd.Flags().BoolVar(&groupCommitFlag, "commit", false, "Commit Group offset after receiving messages. Works only if consuming as Consumer Group")
consumeCmd.Flags().StringVar(&keyFilter, "key-filter", "", "jq path expression to filter on message keys.")
consumeCmd.Flags().StringVar(&valueFilter, "value-filter", "", "jq path expression to filter on message values.")

keyfmt = prettyjson.NewFormatter()
keyfmt.Newline = " " // Replace newline with space to avoid condensed output.
Expand Down Expand Up @@ -110,6 +118,28 @@ var consumeCmd = &cobra.Command{
offset = o
}

if keyFilter != "" {
keyJQuery, err := gojq.Parse(keyFilter)
if err != nil {
errorExit("Could not parse jq key filter '%s': %w", keyFilter, err)
}
keyJQCode, err = gojq.Compile(keyJQuery)
if err != nil {
errorExit("Could not compile jq key filter '%s': %w", keyFilter, err)
}
}

if valueFilter != "" {
valueJQuery, err := gojq.Parse(valueFilter)
if err != nil {
errorExit("Could not parse jq value filter '%s': %w", valueFilter, err)
}
valueJQCode, err = gojq.Compile(valueJQuery)
if err != nil {
errorExit("Could not compile jq value filter '%s': %w", valueFilter, err)
}
}

if groupFlag != "" {
withConsumerGroup(cmd.Context(), client, topic, groupFlag)
} else {
Expand Down Expand Up @@ -253,6 +283,10 @@ func handleMessage(msg *sarama.ConsumerMessage, mu *sync.Mutex) {
}
}

if !matchesFilter(keyToDisplay, keyJQCode) {
return
}

if decodeMsgPack {
var obj interface{}
err = msgpack.Unmarshal(msg.Value, &obj)
Expand All @@ -266,6 +300,10 @@ func handleMessage(msg *sarama.ConsumerMessage, mu *sync.Mutex) {
}
}

if !matchesFilter(dataToDisplay, valueJQCode) {
return
}

if !raw {
if isJSON(dataToDisplay) {
dataToDisplay = formatValue(dataToDisplay)
Expand Down Expand Up @@ -366,3 +404,21 @@ func isJSON(data []byte) bool {
}
return false
}

func matchesFilter(data []byte, filter *gojq.Code) bool {
if filter == nil {
return true
}
var in interface{}
err := json.Unmarshal(data, &in)
if err != nil {
return true
}
iter := filter.Run(in)
v, ok := iter.Next()
if !ok {
return false
}
b, ok := v.(bool)
return ok && b
}
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/Shopify/sarama v1.34.1
github.com/golang/protobuf v1.5.2
github.com/hokaccha/go-prettyjson v0.0.0-20190818114111-108c894c2c0e
github.com/itchyny/gojq v0.12.8
github.com/jhump/protoreflect v1.12.0
github.com/linkedin/goavro/v2 v2.11.1
github.com/magiconair/properties v1.8.6
Expand Down Expand Up @@ -48,6 +49,7 @@ require (
github.com/huandu/xstrings v1.3.2 // indirect
github.com/imdario/mergo v0.3.12 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/itchyny/timefmt-go v0.1.3 // indirect
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
github.com/jcmturner/gofork v1.0.0 // indirect
Expand All @@ -74,7 +76,7 @@ require (
golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4 // indirect
golang.org/x/net v0.0.0-20220520000938-2e3eb7b945c2 // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
golang.org/x/sys v0.0.0-20220412211240-33da011f77ad // indirect
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect
golang.org/x/text v0.3.7 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20220407144326-9054f6ed7bac // indirect
Expand Down
15 changes: 12 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,11 @@ github.com/imdario/mergo v0.3.12 h1:b6R2BslTbIEToALKP7LxUvijTsNI9TAe80pLWN2g/HU=
github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=
github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/itchyny/gojq v0.12.8 h1:Zxcwq8w4IeR8JJYEtoG2MWJZUv0RGY6QqJcO1cqV8+A=
github.com/itchyny/gojq v0.12.8/go.mod h1:gE2kZ9fVRU0+JAksaTzjIlgnCa2akU+a1V0WXgJQN5c=
github.com/itchyny/timefmt-go v0.1.3 h1:7M3LGVDsqcd0VZH2U+x393obrzZisp7C0uEe921iRkU=
github.com/itchyny/timefmt-go v0.1.3/go.mod h1:0osSSCQSASBJMsIZnhAaF1C2fCBTJZXrnj37mG8/c+A=
github.com/j-keck/arping v0.0.0-20160618110441-2cf9dc699c56/go.mod h1:ymszkNOg6tORTn+6F6j+Jc8TOr5osrynvN6ivFWZ2GA=
github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8=
github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs=
github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo=
Expand Down Expand Up @@ -260,6 +265,9 @@ github.com/mattn/go-colorable v0.1.12/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/mattn/go-isatty v0.0.14 h1:yVuAays6BHfxijgZPzw+3Zlu5yQgKGP2/hcQbHb7S9Y=
github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=
github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
github.com/mattn/go-runewidth v0.0.13/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
github.com/mattn/go-shellwords v1.0.3/go.mod h1:3xCvwCdWdlDJUrvuMn7Wuy9eWs4pE8vqg+NOMyg4B2o=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa15WveJJGw=
github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s=
Expand Down Expand Up @@ -310,6 +318,8 @@ github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1
github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k=
Expand Down Expand Up @@ -514,9 +524,8 @@ golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220412211240-33da011f77ad h1:ntjMns5wyP/fN65tdBD4g8J5w8n015+iIIs9rtjXkY0=
golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a h1:dGzPydgVsqGcTRVwiLJ1jVbufYwmzD3LfVPLKsKg+0k=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand Down

0 comments on commit ab6371d

Please sign in to comment.