Skip to content

Commit

Permalink
Merge pull request #25 from heetch/consume-avro-support
Browse files Browse the repository at this point in the history
consume: add Avro support
  • Loading branch information
Gilles Fabio authored Mar 31, 2020
2 parents 3c1c580 + 73dc2ee commit 71a86be
Show file tree
Hide file tree
Showing 11 changed files with 284 additions and 116 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
/kt
/quickfix
hkt
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,19 @@ $ hkt topic -filter news -partitions
<details><summary>Produce messages</summary>

```sh
$ echo 'Alice wins Oscar' | kt produce -topic actor-news -literal
$ echo 'Alice wins Oscar' | hkt produce -topic actor-news -literal
{
"count": 1,
"partition": 0,
"startOffset": 0
}
$ echo 'Bob wins Oscar' | kt produce -tlsca myca.pem -tlscert myclientcert.pem -tlscertkey mycertkey.pem -topic actor-news -literal
$ echo 'Bob wins Oscar' | hkt produce -tlsca myca.pem -tlscert myclientcert.pem -tlscertkey mycertkey.pem -topic actor-news -literal
{
"count": 1,
"partition": 0,
"startOffset": 0
}
$ for i in {6..9} ; do echo Bourne sequel $i in production. | kt produce -topic actor-news -literal ;done
$ for i in {6..9} ; do echo Bourne sequel $i in production. | hkt produce -topic actor-news -literal ;done
{
"count": 1,
"partition": 0,
Expand Down
34 changes: 0 additions & 34 deletions common.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,40 +290,6 @@ func decoderForType(typ string) (func(m json.RawMessage) ([]byte, error), error)

var nullJSON = json.RawMessage("null")

func encoderForType(typ string) (func([]byte) (json.RawMessage, error), error) {
var enc func([]byte) string
switch typ {
case "json":
return func(data []byte) (json.RawMessage, error) {
if err := json.Unmarshal(data, new(json.RawMessage)); err != nil {
return nil, fmt.Errorf("invalid JSON value %q: %v", data, err)
}
return json.RawMessage(data), nil
}, nil
case "hex":
enc = hex.EncodeToString
case "base64":
enc = base64.StdEncoding.EncodeToString
case "string":
enc = func(data []byte) string {
return string(data)
}
default:
return nil, fmt.Errorf(`unsupported decoder %#v, only json, string, hex and base64 are supported`, typ)
}
return func(data []byte) (json.RawMessage, error) {
if data == nil {
return nullJSON, nil
}
data1, err := json.Marshal(enc(data))
if err != nil {
// marshaling a string cannot fail but be defensive.
return nil, err
}
return json.RawMessage(data1), nil
}, nil
}

func min(x, y int64) int64 {
if x < y {
return x
Expand Down
103 changes: 90 additions & 13 deletions consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package main
import (
"bytes"
"context"
"encoding/base64"
"encoding/hex"
"encoding/json"
"flag"
"fmt"
Expand All @@ -13,12 +15,14 @@ import (
"time"

"github.com/Shopify/sarama"
"github.com/heetch/avro/avroregistry"
"github.com/linkedin/goavro/v2"
)

type consumeCmd struct {
commonFlags

topic string
offsets string
timeout time.Duration
valueCodecType string
keyCodecType string
Expand All @@ -33,6 +37,9 @@ type consumeCmd struct {
encodeKey func([]byte) (json.RawMessage, error)
client sarama.Client
consumer sarama.Consumer

registryURL string
registry *avroregistry.Registry
}

// consumedMessage defines the format that's used to
Expand All @@ -48,13 +55,15 @@ type consumedMessage struct {
func (cmd *consumeCmd) addFlags(flags *flag.FlagSet) {
cmd.commonFlags.addFlags(flags)
cmd.partitioners = []string{"sarama"}

flags.Var(listFlag{&cmd.partitioners}, "partitioners", "Comma-separated list of partitioners to consider when using the key flag. See below for details")
flags.DurationVar(&cmd.timeout, "timeout", time.Duration(0), "Timeout after not reading messages (default 0 to disable).")
flags.StringVar(&cmd.keyStr, "key", "", "Print only messages with this key. Note: this relies on the producer using one of the partitioning algorithms specified with the -partitioners argument")
flags.BoolVar(&cmd.pretty, "pretty", true, "Control output pretty printing.")
flags.BoolVar(&cmd.follow, "f", false, "Follow topic by waiting new messages (default is to stop at end of topic)")
flags.StringVar(&cmd.valueCodecType, "valuecodec", "json", "Present message value as (json|string|hex|base64), defaults to json.")
flags.StringVar(&cmd.valueCodecType, "valuecodec", "json", "Present message value as (json|string|hex|base64|avro), defaults to json.")
flags.StringVar(&cmd.keyCodecType, "keycodec", "string", "Present message key as (string|hex|base64), defaults to string.")
flags.StringVar(&cmd.registryURL, "registry", "", "The Avro schema registry server URL.")

flags.Usage = func() {
fmt.Fprintln(os.Stderr, "Usage: hkt consume [flags] TOPIC [OFFSETS]")
Expand All @@ -65,7 +74,8 @@ func (cmd *consumeCmd) addFlags(flags *flag.FlagSet) {

func (cmd *consumeCmd) environFlags() map[string]string {
return map[string]string{
"brokers": "KT_BROKERS",
"brokers": "KT_BROKERS",
"registry": "KT_REGISTRY",
}
}

Expand All @@ -88,18 +98,28 @@ func (cmd *consumeCmd) run(args []string) error {
sarama.Logger = log.New(os.Stderr, "", log.LstdFlags)
}
var err error
cmd.encodeValue, err = encoderForType(cmd.valueCodecType)
if cmd.valueCodecType == "avro" {
if cmd.registryURL == "" {
return fmt.Errorf("-registry or $KT_REGISTRY is required for avro codec type")
}
cmd.registry, err = avroregistry.New(avroregistry.Params{ServerURL: cmd.registryURL})
if err != nil {
return fmt.Errorf("cannot make Avro registry client: %v", err)
}
}
cmd.encodeValue, err = cmd.encoderForType(cmd.valueCodecType)
if err != nil {
return fmt.Errorf("bad -valuecodec argument: %v", err)
}
if cmd.keyCodecType == "json" {
// JSON for keys is not a good idea.
return fmt.Errorf("JSON key codec not supported")
}
cmd.encodeKey, err = encoderForType(cmd.keyCodecType)
cmd.encodeKey, err = cmd.encoderForType(cmd.keyCodecType)
if err != nil {
return fmt.Errorf("bad -keycodec argument: %v", err)
}

offsets, err := parseOffsets(offsetsStr, time.Now())
if err != nil {
return err
Expand All @@ -108,16 +128,14 @@ func (cmd *consumeCmd) run(args []string) error {
if err != nil {
return fmt.Errorf("bad -partitioners argument: %v", err)
}
c, err := cmd.newClient()
cmd.client, err = cmd.newClient()
if err != nil {
return err
}
cmd.client = c
consumer, err := sarama.NewConsumerFromClient(cmd.client)
cmd.consumer, err = sarama.NewConsumerFromClient(cmd.client)
if err != nil {
return fmt.Errorf("cannot create kafka consumer: %v", err)
}
cmd.consumer = consumer
defer logClose("consumer", cmd.consumer)
cmd.allPartitions, err = cmd.consumer.Partitions(cmd.topic)
if err != nil {
Expand Down Expand Up @@ -145,10 +163,7 @@ func (cmd *consumeCmd) run(args []string) error {
if err != nil {
return fmt.Errorf("cannot resolve offsets: %v", err)
}
if err := cmd.consume(resolvedOffsets, limits); err != nil {
return err
}
return nil
return cmd.consume(resolvedOffsets, limits)
}

func (cmd *consumeCmd) newClient() (sarama.Client, error) {
Expand Down Expand Up @@ -291,6 +306,68 @@ func (cmd *consumeCmd) newConsumedMessage(m *sarama.ConsumerMessage) (consumedMe
return result, nil
}

func (cmd *consumeCmd) encoderForType(typ string) (func([]byte) (json.RawMessage, error), error) {
var enc func([]byte) string
switch typ {
case "json":
return func(data []byte) (json.RawMessage, error) {
if err := json.Unmarshal(data, new(json.RawMessage)); err != nil {
return nil, fmt.Errorf("invalid JSON value %q: %v", data, err)
}
return json.RawMessage(data), nil
}, nil
case "hex":
enc = hex.EncodeToString
case "base64":
enc = base64.StdEncoding.EncodeToString
case "string":
enc = func(data []byte) string {
return string(data)
}
case "avro":
return cmd.encodeAvro, nil
default:
return nil, fmt.Errorf(`unsupported decoder %#v, only json, string, hex, base64 and avro are supported`, typ)
}
return func(data []byte) (json.RawMessage, error) {
if data == nil {
return nullJSON, nil
}
data1, err := json.Marshal(enc(data))
if err != nil {
// marshaling a string cannot fail but be defensive.
return nil, err
}
return json.RawMessage(data1), nil
}, nil
}

func (cmd *consumeCmd) encodeAvro(data []byte) (json.RawMessage, error) {
dec := cmd.registry.Decoder()
id, body := dec.DecodeSchemaID(data)
if body == nil {
return nil, fmt.Errorf("cannot decode schema id")
}
// TODO: cache the schema
schema, err := dec.SchemaForID(context.Background(), id)
if err != nil {
return nil, fmt.Errorf("cannot get schema for id %d: %v", id, err)
}
codec, err := goavro.NewCodec(schema.String())
if err != nil {
return nil, fmt.Errorf("cannot create codec from schema %s", schema.String())
}
native, _, err := codec.NativeFromBinary(body)
if err != nil {
return nil, fmt.Errorf("cannot convert native from binary: %v", err)
}
textual, err := codec.TextualFromNative(nil, native)
if err != nil {
return nil, fmt.Errorf("cannot convert textual from native: %v", err)
}
return json.RawMessage(textual), nil
}

// mergeConsumers merges all the given channels in timestamp order
// until all existing messages have been received; it then produces
// messages as soon as they're received.
Expand Down
Loading

0 comments on commit 71a86be

Please sign in to comment.