-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
consume: add Avro support #25
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,2 +1,3 @@ | ||
/kt | ||
/quickfix | ||
hkt |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,6 +3,8 @@ package main | |
import ( | ||
"bytes" | ||
"context" | ||
"encoding/base64" | ||
"encoding/hex" | ||
"encoding/json" | ||
"flag" | ||
"fmt" | ||
|
@@ -13,12 +15,14 @@ import ( | |
"time" | ||
|
||
"github.com/Shopify/sarama" | ||
"github.com/heetch/avro/avroregistry" | ||
"github.com/linkedin/goavro/v2" | ||
) | ||
|
||
type consumeCmd struct { | ||
commonFlags | ||
|
||
topic string | ||
offsets string | ||
timeout time.Duration | ||
valueCodecType string | ||
keyCodecType string | ||
|
@@ -33,6 +37,9 @@ type consumeCmd struct { | |
encodeKey func([]byte) (json.RawMessage, error) | ||
client sarama.Client | ||
consumer sarama.Consumer | ||
|
||
registryURL string | ||
registry *avroregistry.Registry | ||
} | ||
|
||
// consumedMessage defines the format that's used to | ||
|
@@ -48,13 +55,15 @@ type consumedMessage struct { | |
func (cmd *consumeCmd) addFlags(flags *flag.FlagSet) { | ||
cmd.commonFlags.addFlags(flags) | ||
cmd.partitioners = []string{"sarama"} | ||
|
||
flags.Var(listFlag{&cmd.partitioners}, "partitioners", "Comma-separated list of partitioners to consider when using the key flag. See below for details") | ||
flags.DurationVar(&cmd.timeout, "timeout", time.Duration(0), "Timeout after not reading messages (default 0 to disable).") | ||
flags.StringVar(&cmd.keyStr, "key", "", "Print only messages with this key. Note: this relies on the producer using one of the partitioning algorithms specified with the -partitioners argument") | ||
flags.BoolVar(&cmd.pretty, "pretty", true, "Control output pretty printing.") | ||
flags.BoolVar(&cmd.follow, "f", false, "Follow topic by waiting new messages (default is to stop at end of topic)") | ||
flags.StringVar(&cmd.valueCodecType, "valuecodec", "json", "Present message value as (json|string|hex|base64), defaults to json.") | ||
flags.StringVar(&cmd.valueCodecType, "valuecodec", "json", "Present message value as (json|string|hex|base64|avro), defaults to json.") | ||
flags.StringVar(&cmd.keyCodecType, "keycodec", "string", "Present message key as (string|hex|base64), defaults to string.") | ||
flags.StringVar(&cmd.registryURL, "registry", "", "The Avro schema registry server URL.") | ||
|
||
flags.Usage = func() { | ||
fmt.Fprintln(os.Stderr, "Usage: hkt consume [flags] TOPIC [OFFSETS]") | ||
|
@@ -65,7 +74,8 @@ func (cmd *consumeCmd) addFlags(flags *flag.FlagSet) { | |
|
||
func (cmd *consumeCmd) environFlags() map[string]string { | ||
return map[string]string{ | ||
"brokers": "KT_BROKERS", | ||
"brokers": "KT_BROKERS", | ||
"registry": "KT_REGISTRY", | ||
} | ||
} | ||
|
||
|
@@ -88,18 +98,28 @@ func (cmd *consumeCmd) run(args []string) error { | |
sarama.Logger = log.New(os.Stderr, "", log.LstdFlags) | ||
} | ||
var err error | ||
cmd.encodeValue, err = encoderForType(cmd.valueCodecType) | ||
if cmd.valueCodecType == "avro" { | ||
if cmd.registryURL == "" { | ||
return fmt.Errorf("-registry or $KT_REGISTRY is required for avro codec type") | ||
} | ||
cmd.registry, err = avroregistry.New(avroregistry.Params{ServerURL: cmd.registryURL}) | ||
if err != nil { | ||
return fmt.Errorf("cannot make Avro registry client: %v", err) | ||
} | ||
} | ||
cmd.encodeValue, err = cmd.encoderForType(cmd.valueCodecType) | ||
if err != nil { | ||
return fmt.Errorf("bad -valuecodec argument: %v", err) | ||
} | ||
if cmd.keyCodecType == "json" { | ||
// JSON for keys is not a good idea. | ||
return fmt.Errorf("JSON key codec not supported") | ||
} | ||
cmd.encodeKey, err = encoderForType(cmd.keyCodecType) | ||
cmd.encodeKey, err = cmd.encoderForType(cmd.keyCodecType) | ||
if err != nil { | ||
return fmt.Errorf("bad -keycodec argument: %v", err) | ||
} | ||
|
||
offsets, err := parseOffsets(offsetsStr, time.Now()) | ||
if err != nil { | ||
return err | ||
|
@@ -108,16 +128,14 @@ func (cmd *consumeCmd) run(args []string) error { | |
if err != nil { | ||
return fmt.Errorf("bad -partitioners argument: %v", err) | ||
} | ||
c, err := cmd.newClient() | ||
cmd.client, err = cmd.newClient() | ||
if err != nil { | ||
return err | ||
} | ||
cmd.client = c | ||
consumer, err := sarama.NewConsumerFromClient(cmd.client) | ||
cmd.consumer, err = sarama.NewConsumerFromClient(cmd.client) | ||
if err != nil { | ||
return fmt.Errorf("cannot create kafka consumer: %v", err) | ||
} | ||
cmd.consumer = consumer | ||
defer logClose("consumer", cmd.consumer) | ||
cmd.allPartitions, err = cmd.consumer.Partitions(cmd.topic) | ||
if err != nil { | ||
|
@@ -145,10 +163,7 @@ func (cmd *consumeCmd) run(args []string) error { | |
if err != nil { | ||
return fmt.Errorf("cannot resolve offsets: %v", err) | ||
} | ||
if err := cmd.consume(resolvedOffsets, limits); err != nil { | ||
return err | ||
} | ||
return nil | ||
return cmd.consume(resolvedOffsets, limits) | ||
} | ||
|
||
func (cmd *consumeCmd) newClient() (sarama.Client, error) { | ||
|
@@ -291,6 +306,68 @@ func (cmd *consumeCmd) newConsumedMessage(m *sarama.ConsumerMessage) (consumedMe | |
return result, nil | ||
} | ||
|
||
func (cmd *consumeCmd) encoderForType(typ string) (func([]byte) (json.RawMessage, error), error) { | ||
var enc func([]byte) string | ||
switch typ { | ||
case "json": | ||
return func(data []byte) (json.RawMessage, error) { | ||
if err := json.Unmarshal(data, new(json.RawMessage)); err != nil { | ||
return nil, fmt.Errorf("invalid JSON value %q: %v", data, err) | ||
} | ||
return json.RawMessage(data), nil | ||
}, nil | ||
case "hex": | ||
enc = hex.EncodeToString | ||
case "base64": | ||
enc = base64.StdEncoding.EncodeToString | ||
case "string": | ||
enc = func(data []byte) string { | ||
return string(data) | ||
} | ||
case "avro": | ||
return cmd.encodeAvro, nil | ||
default: | ||
return nil, fmt.Errorf(`unsupported decoder %#v, only json, string, hex, base64 and avro are supported`, typ) | ||
} | ||
return func(data []byte) (json.RawMessage, error) { | ||
if data == nil { | ||
return nullJSON, nil | ||
} | ||
data1, err := json.Marshal(enc(data)) | ||
if err != nil { | ||
// marshaling a string cannot fail but be defensive. | ||
return nil, err | ||
} | ||
return json.RawMessage(data1), nil | ||
}, nil | ||
} | ||
|
||
func (cmd *consumeCmd) encodeAvro(data []byte) (json.RawMessage, error) { | ||
dec := cmd.registry.Decoder() | ||
id, body := dec.DecodeSchemaID(data) | ||
if body == nil { | ||
return nil, fmt.Errorf("cannot decode schema id") | ||
} | ||
// TODO: cache the schema | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. TODO's are great, but I like them to be converted into issues, so that they can be scheduled and not forgotten when we don't regularly visit the code. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 Done (#26). |
||
schema, err := dec.SchemaForID(context.Background(), id) | ||
if err != nil { | ||
return nil, fmt.Errorf("cannot get schema for id %d: %v", id, err) | ||
} | ||
codec, err := goavro.NewCodec(schema.String()) | ||
if err != nil { | ||
return nil, fmt.Errorf("cannot create codec from schema %s", schema.String()) | ||
} | ||
native, _, err := codec.NativeFromBinary(body) | ||
if err != nil { | ||
return nil, fmt.Errorf("cannot convert native from binary: %v", err) | ||
} | ||
textual, err := codec.TextualFromNative(nil, native) | ||
if err != nil { | ||
return nil, fmt.Errorf("cannot convert textual from native: %v", err) | ||
} | ||
return json.RawMessage(textual), nil | ||
} | ||
|
||
// mergeConsumers merges all the given channels in timestamp order | ||
// until all existing messages have been received; it then produces | ||
// messages as soon as they're received. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At this point I'd probably consider pushing the generation of this function (and the one like it below) out to named functions of their own, just because the switch statement is getting a bit long.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
True. I will do it in another PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried to "simplify" the switch using a map with dedicated method per type: 0e4a242 If you think it's okay, I can propose the PR (it's based on this branch).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Personally I don't think it's too unwieldy currently - not sure that splitting it up will help readability.