diff --git a/cmd/kaf/consume.go b/cmd/kaf/consume.go index 3a37b32..5c2e594 100644 --- a/cmd/kaf/consume.go +++ b/cmd/kaf/consume.go @@ -12,12 +12,14 @@ import ( "strconv" "github.com/Shopify/sarama" - "github.com/birdayz/kaf/pkg/avro" - "github.com/birdayz/kaf/pkg/proto" "github.com/golang/protobuf/jsonpb" prettyjson "github.com/hokaccha/go-prettyjson" + "github.com/itchyny/gojq" "github.com/spf13/cobra" "github.com/vmihailenco/msgpack/v5" + + "github.com/birdayz/kaf/pkg/avro" + "github.com/birdayz/kaf/pkg/proto" ) var ( @@ -32,6 +34,10 @@ var ( protoType string keyProtoType string + keyFilter string + valueFilter string + keyJQCode *gojq.Code + valueJQCode *gojq.Code flagPartitions []int32 @@ -55,6 +61,8 @@ func init() { consumeCmd.Flags().Int64VarP(&limitMessagesFlag, "limit-messages", "l", 0, "Limit messages per partition") consumeCmd.Flags().StringVarP(&groupFlag, "group", "g", "", "Consumer Group to use for consume") consumeCmd.Flags().BoolVar(&groupCommitFlag, "commit", false, "Commit Group offset after receiving messages. Works only if consuming as Consumer Group") + consumeCmd.Flags().StringVar(&keyFilter, "key-filter", "", "jq path expression to filter on message keys.") + consumeCmd.Flags().StringVar(&valueFilter, "value-filter", "", "jq path expression to filter on message values.") keyfmt = prettyjson.NewFormatter() keyfmt.Newline = " " // Replace newline with space to avoid condensed output. @@ -110,6 +118,28 @@ var consumeCmd = &cobra.Command{ offset = o } + if keyFilter != "" { + keyJQuery, err := gojq.Parse(keyFilter) + if err != nil { + errorExit("Could not parse jq key filter '%s': %w", keyFilter, err) + } + keyJQCode, err = gojq.Compile(keyJQuery) + if err != nil { + errorExit("Could not compile jq key filter '%s': %w", keyFilter, err) + } + } + + if valueFilter != "" { + valueJQuery, err := gojq.Parse(valueFilter) + if err != nil { + errorExit("Could not parse jq value filter '%s': %w", valueFilter, err) + } + valueJQCode, err = gojq.Compile(valueJQuery) + if err != nil { + errorExit("Could not compile jq value filter '%s': %w", valueFilter, err) + } + } + if groupFlag != "" { withConsumerGroup(cmd.Context(), client, topic, groupFlag) } else { @@ -253,6 +283,10 @@ func handleMessage(msg *sarama.ConsumerMessage, mu *sync.Mutex) { } } + if !matchesFilter(keyToDisplay, keyJQCode) { + return + } + if decodeMsgPack { var obj interface{} err = msgpack.Unmarshal(msg.Value, &obj) @@ -266,6 +300,10 @@ func handleMessage(msg *sarama.ConsumerMessage, mu *sync.Mutex) { } } + if !matchesFilter(dataToDisplay, valueJQCode) { + return + } + if !raw { if isJSON(dataToDisplay) { dataToDisplay = formatValue(dataToDisplay) @@ -366,3 +404,21 @@ func isJSON(data []byte) bool { } return false } + +func matchesFilter(data []byte, filter *gojq.Code) bool { + if filter == nil { + return true + } + var in interface{} + err := json.Unmarshal(data, &in) + if err != nil { + return true + } + iter := filter.Run(in) + v, ok := iter.Next() + if !ok { + return false + } + b, ok := v.(bool) + return ok && b +} diff --git a/go.mod b/go.mod index e22f5ec..a6ad135 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/Shopify/sarama v1.34.1 github.com/golang/protobuf v1.5.2 github.com/hokaccha/go-prettyjson v0.0.0-20190818114111-108c894c2c0e + github.com/itchyny/gojq v0.12.8 github.com/jhump/protoreflect v1.12.0 github.com/linkedin/goavro/v2 v2.11.1 github.com/magiconair/properties v1.8.6 @@ -48,6 +49,7 @@ require ( github.com/huandu/xstrings v1.3.2 // indirect github.com/imdario/mergo v0.3.12 // indirect github.com/inconshreveable/mousetrap v1.0.0 // indirect + github.com/itchyny/timefmt-go v0.1.3 // indirect github.com/jcmturner/aescts/v2 v2.0.0 // indirect github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect github.com/jcmturner/gofork v1.0.0 // indirect @@ -74,7 +76,7 @@ require ( golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4 // indirect golang.org/x/net v0.0.0-20220520000938-2e3eb7b945c2 // indirect golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect - golang.org/x/sys v0.0.0-20220412211240-33da011f77ad // indirect + golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect golang.org/x/text v0.3.7 // indirect google.golang.org/appengine v1.6.7 // indirect google.golang.org/genproto v0.0.0-20220407144326-9054f6ed7bac // indirect diff --git a/go.sum b/go.sum index c597c65..a4c2381 100644 --- a/go.sum +++ b/go.sum @@ -205,6 +205,11 @@ github.com/imdario/mergo v0.3.12 h1:b6R2BslTbIEToALKP7LxUvijTsNI9TAe80pLWN2g/HU= github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA= github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= +github.com/itchyny/gojq v0.12.8 h1:Zxcwq8w4IeR8JJYEtoG2MWJZUv0RGY6QqJcO1cqV8+A= +github.com/itchyny/gojq v0.12.8/go.mod h1:gE2kZ9fVRU0+JAksaTzjIlgnCa2akU+a1V0WXgJQN5c= +github.com/itchyny/timefmt-go v0.1.3 h1:7M3LGVDsqcd0VZH2U+x393obrzZisp7C0uEe921iRkU= +github.com/itchyny/timefmt-go v0.1.3/go.mod h1:0osSSCQSASBJMsIZnhAaF1C2fCBTJZXrnj37mG8/c+A= +github.com/j-keck/arping v0.0.0-20160618110441-2cf9dc699c56/go.mod h1:ymszkNOg6tORTn+6F6j+Jc8TOr5osrynvN6ivFWZ2GA= github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8= github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs= github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo= @@ -260,6 +265,9 @@ github.com/mattn/go-colorable v0.1.12/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= github.com/mattn/go-isatty v0.0.14 h1:yVuAays6BHfxijgZPzw+3Zlu5yQgKGP2/hcQbHb7S9Y= github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94= +github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= +github.com/mattn/go-runewidth v0.0.13/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= +github.com/mattn/go-shellwords v1.0.3/go.mod h1:3xCvwCdWdlDJUrvuMn7Wuy9eWs4pE8vqg+NOMyg4B2o= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa15WveJJGw= github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s= @@ -310,6 +318,8 @@ github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1 github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= +github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k= @@ -514,9 +524,8 @@ golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220412211240-33da011f77ad h1:ntjMns5wyP/fN65tdBD4g8J5w8n015+iIIs9rtjXkY0= -golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a h1:dGzPydgVsqGcTRVwiLJ1jVbufYwmzD3LfVPLKsKg+0k= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=