Skip to content

Commit

Permalink
produce: adds batching (fix #18).
Browse files Browse the repository at this point in the history
- produce now waits for either batch size or timeout limit to be hit
  before sending out a message.

- produce now prints JSON object with result info in case of success.
  • Loading branch information
fgeller committed Apr 10, 2016
1 parent e81ffbe commit d7470c5
Showing 1 changed file with 140 additions and 30 deletions.
170 changes: 140 additions & 30 deletions produce.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,23 @@ import (
"fmt"
"os"
"strings"
"time"

"github.com/Shopify/sarama"
)

// produceConfig holds the settings of the produce command. The top-level
// fields are the values the command works with; args holds the raw values
// the command line flags are bound to.
type produceConfig struct {
// topic is the topic to produce to.
topic string
// brokers is the list of brokers to connect to.
brokers []string
// batch is the max size of a batch before it is sent off.
batch int
// timeout is the duration to wait for a batch to be filled before it is sent off.
timeout time.Duration
// verbose enables additional diagnostic output.
verbose bool
// args receives the flag values exactly as given on the command line.
args struct {
topic string
// brokers is the comma separated broker list as a single string.
brokers string
batch int
timeout time.Duration
verbose bool
}
}

Expand All @@ -28,8 +35,36 @@ type message struct {

func produceFlags() *flag.FlagSet {
flags := flag.NewFlagSet("produce", flag.ExitOnError)
flags.StringVar(&config.produce.args.topic, "topic", "", "Topic to produce to (required).")
flags.StringVar(&config.produce.args.brokers, "brokers", "", "Comma separated list of brokers. Port defaults to 9092 when omitted (defaults to localhost:9092).")
flags.StringVar(
&config.produce.args.topic,
"topic",
"",
"Topic to produce to (required).",
)
flags.StringVar(
&config.produce.args.brokers,
"brokers",
"",
"Comma separated list of brokers. Port defaults to 9092 when omitted (defaults to localhost:9092).",
)
flags.IntVar(
&config.produce.args.batch,
"batch",
1,
"Max size of a batch before sending it off",
)
flags.DurationVar(
&config.produce.args.timeout,
"timeout",
50*time.Millisecond,
"Duration to wait for batch to be filled before sending it off",
)
flags.BoolVar(
&config.produce.args.verbose,
"v",
false,
"Verbose output",
)

flags.Usage = func() {
fmt.Fprintln(os.Stderr, "Usage of produce:")
Expand Down Expand Up @@ -109,6 +144,8 @@ func produceParseArgs() {
}
}

config.produce.batch = config.produce.args.batch
config.produce.timeout = config.produce.args.timeout
}

func produceCommand() command {
Expand Down Expand Up @@ -137,54 +174,127 @@ func produceCommand() command {
}()

stdinLines := make(chan string)
go readStdinLines(stdinLines, closer)
messages := make(chan message)
batchedMessages := make(chan []message)
go readStdinLines(closer, stdinLines)
go batchRecords(closer, messages, batchedMessages)
go produce(closer, broker, batchedMessages)

for {
select {
case <-closer:
return
case _, ok := <-closer:
if !ok {
return
}
case l := <-stdinLines:
var in message
err := json.Unmarshal([]byte(l), &in)
if err != nil {
if config.produce.verbose {
fmt.Printf("Failed to unmarshal input, falling back to defaults. err=%v\n", err)
}
in = message{Key: l, Value: l, Partition: 0}
}
messages <- in
}
}
},
}
}

req := &sarama.ProduceRequest{
RequiredAcks: sarama.WaitForAll,
Timeout: 1000,
}
msg := sarama.Message{
Codec: sarama.CompressionNone,
Key: []byte(in.Key),
Value: []byte(in.Value),
Set: nil,
}
req.AddMessage(config.produce.topic, in.Partition, &msg)
// batchRecords groups messages received on in into batches and forwards them
// on out.
//
// A batch is sent as soon as it holds config.produce.batch messages, or once
// config.produce.timeout has elapsed since the first message was added to it,
// whichever happens first. The function returns when closer is closed;
// messages still pending at that point are discarded and their number is
// reported on stderr.
func batchRecords(closer chan struct{}, in chan message, out chan []message) {
	messages := []message{}

	// The timer is armed when the first message enters an empty batch and
	// disarmed whenever the batch is sent. While deadline is nil its select
	// case can never fire, so an empty batch never triggers a send.
	var (
		timer    *time.Timer
		deadline <-chan time.Time
	)
	disarm := func() {
		if timer != nil {
			timer.Stop()
			timer = nil
		}
		deadline = nil
	}
	send := func() {
		disarm()
		out <- messages
		// The receiver now owns the slice that was sent; start a fresh one.
		messages = []message{}
	}

	for {
		select {
		case _, ok := <-closer:
			if !ok {
				disarm()
				fmt.Fprintf(os.Stderr, "Stopping, discarding %v messages.\n", len(messages))
				return
			}
		case m := <-in:
			messages = append(messages, m)
			if len(messages) >= config.produce.batch {
				send()
				continue
			}
			if timer == nil {
				// Measure the timeout from the first message of the batch so
				// that later arrivals do not keep postponing the send.
				timer = time.NewTimer(config.produce.timeout)
				deadline = timer.C
			}
		case <-deadline:
			// Only reachable while a timer is armed, i.e. the batch is non-empty.
			send()
		}
	}
}

block := resp.GetBlock(config.produce.topic, in.Partition)
if block.Err != sarama.ErrNoError {
fmt.Fprintf(os.Stderr, "Failed to send message, quitting. err=%s\n", block.Err.Error())
return
}
fmt.Fprintf(os.Stderr, "Sent to partition %d at offset %d with key %s.\n", in.Partition, block.Offset, in.Key)
// partitionProduceResult summarizes what a single produce request wrote to
// one partition.
type partitionProduceResult struct {
// start is the offset taken from the partition's response block; it is
// printed as "startOffset".
start int64
// count is the number of messages of the batch that targeted the partition.
count int64
}

func produce(closer chan struct{}, broker *sarama.Broker, in chan []message) {
for {
select {
case _, ok := <-closer:
if !ok {
return
}
case batch := <-in:
req := &sarama.ProduceRequest{
RequiredAcks: sarama.WaitForAll,
Timeout: 1000,
}
for _, m := range batch {
msg := sarama.Message{
Codec: sarama.CompressionNone,
Key: []byte(m.Key),
Value: []byte(m.Value),
Set: nil,
}
req.AddMessage(config.produce.topic, m.Partition, &msg)
}
},
resp, err := broker.Produce(req)
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to send message, quitting. err=%s\n", err)
return
}

offsets := map[int32]partitionProduceResult{}
for _, m := range batch {
block := resp.GetBlock(config.produce.topic, m.Partition)
if block.Err != sarama.ErrNoError {
fmt.Fprintf(os.Stderr, "Failed to send message, quitting. err=%s\n", block.Err.Error())
return
}

if r, ok := offsets[m.Partition]; ok {
offsets[m.Partition] = partitionProduceResult{start: block.Offset, count: r.count + 1}
} else {
offsets[m.Partition] = partitionProduceResult{start: block.Offset, count: 1}
}
}

for p, o := range offsets {
fmt.Fprintf(
os.Stdout,
`{"partition": %v, "startOffset": %v, "count": %v}
`,
p,
o.start,
o.count,
)
}
}
}
}

// readStdinLines sends every line read from standard input on out, without
// the line terminator, and closes stop once the input is exhausted.
//
// If scanning ends because of an error rather than end of input (for example
// a read failure or a line exceeding the scanner's buffer), the error is
// reported on stderr before stop is closed.
func readStdinLines(stop chan struct{}, out chan string) {
	// Closing (rather than sending on) stop releases every goroutine that
	// selects on it, not just one.
	defer close(stop)

	scanner := bufio.NewScanner(os.Stdin)
	for scanner.Scan() {
		out <- scanner.Text()
	}
	if err := scanner.Err(); err != nil {
		fmt.Fprintf(os.Stderr, "Failed to read from stdin. err=%v\n", err)
	}
}

0 comments on commit d7470c5

Please sign in to comment.