From d7470c5785c9d1f44ce67af98672541474938bbf Mon Sep 17 00:00:00 2001
From: Felix Geller
Date: Sun, 10 Apr 2016 13:53:26 +1200
Subject: [PATCH] produce: adds batching (fix #18).

- produce now waits for either batch size or timeout limit to be hit before sending out a message.
- produce now prints JSON object with result info in case of success.
---
 produce.go | 170 +++++++++++++++++++++++++++++++++++++++++++----------
 1 file changed, 140 insertions(+), 30 deletions(-)

diff --git a/produce.go b/produce.go
index a229d5c..310614e 100644
--- a/produce.go
+++ b/produce.go
@@ -7,6 +7,7 @@ import (
 	"fmt"
 	"os"
 	"strings"
+	"time"
 
 	"github.com/Shopify/sarama"
 )
@@ -14,9 +15,15 @@ import (
 type produceConfig struct {
 	topic   string
 	brokers []string
+	batch   int
+	timeout time.Duration
+	verbose bool
 	args    struct {
 		topic   string
 		brokers string
+		batch   int
+		timeout time.Duration
+		verbose bool
 	}
 }
 
@@ -28,8 +35,36 @@ type message struct {
 
 func produceFlags() *flag.FlagSet {
 	flags := flag.NewFlagSet("produce", flag.ExitOnError)
-	flags.StringVar(&config.produce.args.topic, "topic", "", "Topic to produce to (required).")
-	flags.StringVar(&config.produce.args.brokers, "brokers", "", "Comma separated list of brokers. Port defaults to 9092 when omitted (defaults to localhost:9092).")
+	flags.StringVar(
+		&config.produce.args.topic,
+		"topic",
+		"",
+		"Topic to produce to (required).",
+	)
+	flags.StringVar(
+		&config.produce.args.brokers,
+		"brokers",
+		"",
+		"Comma separated list of brokers. Port defaults to 9092 when omitted (defaults to localhost:9092).",
+	)
+	flags.IntVar(
+		&config.produce.args.batch,
+		"batch",
+		1,
+		"Max size of a batch before sending it off",
+	)
+	flags.DurationVar(
+		&config.produce.args.timeout,
+		"timeout",
+		50*time.Millisecond,
+		"Duration to wait for batch to be filled before sending it off",
+	)
+	flags.BoolVar(
+		&config.produce.args.verbose,
+		"v",
+		false,
+		"Verbose output",
+	)
 
 	flags.Usage = func() {
 		fmt.Fprintln(os.Stderr, "Usage of produce:")
@@ -109,6 +144,8 @@ func produceParseArgs() {
 		}
 	}
 
+	config.produce.batch = config.produce.args.batch
+	config.produce.timeout = config.produce.args.timeout
 }
 
 func produceCommand() command {
@@ -137,54 +174,127 @@ func produceCommand() command {
 			}()
 
 			stdinLines := make(chan string)
-			go readStdinLines(stdinLines, closer)
+			messages := make(chan message)
+			batchedMessages := make(chan []message)
+			go readStdinLines(closer, stdinLines)
+			go batchRecords(closer, messages, batchedMessages)
+			go produce(closer, broker, batchedMessages)
 
 			for {
 				select {
-				case <-closer:
-					return
+				case _, ok := <-closer:
+					if !ok {
+						return
+					}
 				case l := <-stdinLines:
 					var in message
 					err := json.Unmarshal([]byte(l), &in)
 					if err != nil {
+						if config.produce.verbose {
+							fmt.Printf("Failed to unmarshal input, falling back to defaults. err=%v\n", err)
+						}
 						in = message{Key: l, Value: l, Partition: 0}
 					}
+					messages <- in
+				}
+			}
+		},
+	}
+}
 
-					req := &sarama.ProduceRequest{
-						RequiredAcks: sarama.WaitForAll,
-						Timeout:      1000,
-					}
-					msg := sarama.Message{
-						Codec: sarama.CompressionNone,
-						Key:   []byte(in.Key),
-						Value: []byte(in.Value),
-						Set:   nil,
-					}
-					req.AddMessage(config.produce.topic, in.Partition, &msg)
+func batchRecords(closer chan struct{}, in chan message, out chan []message) {
+	messages := []message{}
+	send := func() {
+		out <- messages
+		messages = []message{}
+	}
 
-					resp, err := broker.Produce(req)
-					if err != nil {
-						fmt.Fprintf(os.Stderr, "Failed to send message, quitting. err=%s\n", err)
-						return
-					}
+	for {
+		select {
+		case _, ok := <-closer:
+			if !ok {
+				fmt.Fprintf(os.Stderr, "Stopping, discarding %v messages.\n", len(messages))
+				return
+			}
+		case m := <-in:
+			messages = append(messages, m)
+			if len(messages) > 0 && len(messages) >= config.produce.batch {
+				send()
+			}
+		case <-time.After(config.produce.timeout):
+			if len(messages) > 0 {
+				send()
+			}
+		}
+	}
+}
 
-					block := resp.GetBlock(config.produce.topic, in.Partition)
-					if block.Err != sarama.ErrNoError {
-						fmt.Fprintf(os.Stderr, "Failed to send message, quitting. err=%s\n", block.Err.Error())
-						return
-					}
-					fmt.Fprintf(os.Stderr, "Sent to partition %d at offset %d with key %s.\n", in.Partition, block.Offset, in.Key)
+type partitionProduceResult struct {
+	start int64
+	count int64
+}
+
+func produce(closer chan struct{}, broker *sarama.Broker, in chan []message) {
+	for {
+		select {
+		case _, ok := <-closer:
+			if !ok {
+				return
+			}
+		case batch := <-in:
+			req := &sarama.ProduceRequest{
+				RequiredAcks: sarama.WaitForAll,
+				Timeout:      1000,
+			}
+			for _, m := range batch {
+				msg := sarama.Message{
+					Codec: sarama.CompressionNone,
+					Key:   []byte(m.Key),
+					Value: []byte(m.Value),
+					Set:   nil,
 				}
+				req.AddMessage(config.produce.topic, m.Partition, &msg)
 			}
-		},
+			resp, err := broker.Produce(req)
+			if err != nil {
+				fmt.Fprintf(os.Stderr, "Failed to send message, quitting. err=%s\n", err)
+				return
+			}
+
+			offsets := map[int32]partitionProduceResult{}
+			for _, m := range batch {
+				block := resp.GetBlock(config.produce.topic, m.Partition)
+				if block.Err != sarama.ErrNoError {
+					fmt.Fprintf(os.Stderr, "Failed to send message, quitting. err=%s\n", block.Err.Error())
+					return
+				}
+
+				if r, ok := offsets[m.Partition]; ok {
+					offsets[m.Partition] = partitionProduceResult{start: block.Offset, count: r.count + 1}
+				} else {
+					offsets[m.Partition] = partitionProduceResult{start: block.Offset, count: 1}
+				}
+			}
+
+			for p, o := range offsets {
+				fmt.Fprintf(
+					os.Stdout,
+					`{"partition": %v, "startOffset": %v, "count": %v}
+`,
+					p,
+					o.start,
+					o.count,
+				)
+			}
+		}
 	}
 }
 
-func readStdinLines(out chan string, stop chan struct{}) {
+func readStdinLines(stop chan struct{}, out chan string) {
 	scanner := bufio.NewScanner(os.Stdin)
 	for scanner.Scan() {
 		line := scanner.Text()
 		out <- line
 	}
-	stop <- struct{}{}
+	close(stop)
 }
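
Note on the batching approach (this note and the sketch below are not part of the patch): batchRecords buffers decoded messages and flushes the buffer to the produce goroutine either when the -batch limit is reached or when no new input has arrived for -timeout while a partial batch is waiting. The following standalone Go program is a minimal sketch of that select-based batch-or-timeout pattern; batcher, batchSize and flushTimeout are illustrative names for this sketch only, not identifiers from produce.go. Unlike batchRecords, which discards any pending messages when closer is closed, the sketch flushes the remainder before exiting.

package main

import (
	"fmt"
	"time"
)

// batcher mirrors the shape of batchRecords: collect items until either the
// size limit is reached or no new item arrives within flushTimeout, then emit
// the buffered slice on out.
func batcher(in <-chan string, out chan<- []string, batchSize int, flushTimeout time.Duration) {
	buf := []string{}
	flush := func() {
		if len(buf) > 0 {
			out <- buf
			buf = []string{}
		}
	}
	for {
		select {
		case s, ok := <-in:
			if !ok { // input closed: emit what is left and stop
				flush()
				close(out)
				return
			}
			buf = append(buf, s)
			if len(buf) >= batchSize { // size limit hit
				flush()
			}
		case <-time.After(flushTimeout): // idle for too long: emit the partial batch
			flush()
		}
	}
}

func main() {
	in := make(chan string)
	out := make(chan []string)
	go batcher(in, out, 3, 50*time.Millisecond)
	go func() {
		for _, s := range []string{"a", "b", "c", "d"} {
			in <- s
		}
		close(in)
	}()
	for batch := range out {
		fmt.Println(batch) // prints [a b c], then [d]
	}
}

As in batchRecords, time.After is re-armed on every pass through the loop, so a partial batch is flushed once the input has been idle for the timeout duration. On success the patched produce then writes one JSON object per partition to stdout in the {"partition": ..., "startOffset": ..., "count": ...} form built in produce(). One caveat visible in the diff: produceParseArgs copies batch and timeout from the args struct but apparently not verbose, so the -v flag looks inert as written.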