-
Notifications
You must be signed in to change notification settings - Fork 1
/
to_kafka.go
181 lines (140 loc) · 4.69 KB
/
to_kafka.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
/*
to_kafka reads delimited data from stdin and writes it to Kafka.
Usage
Supported command line options are
to_kafka [OPTIONS] topic
Application Options:
-b, --brokers= Broker addresses (can be given multiple times) (localhost:9092)
-d, --delimiter= Delimiter byte to use when reading stdin. Remember to either use single quotes around the option or quote the
backslash (\\n instead of \n) if using escape sequences (\n)
-v, --verbose Print extra debugging cruft to stderr
Help Options:
-h, --help Show this help message
Arguments:
topic: Topic to write to
To read newline-delimited data from stdin and write it to topic 'sometopic':
to_kafka sometopic
Piping a file to Kafka:
cat somefile|to_kafka sometopic
Use comma as a delimiter:
echo -n 'herp,derp,durr'|to_kafka -d, sometopic
*/
package main
import (
"bufio"
"fmt"
"io"
"io/ioutil"
"log"
"os"
"os/signal"
"strconv"
"strings"
"syscall"
"github.com/ORBAT/krater"
"gopkg.in/Shopify/sarama.v1"
f "github.com/jessevdk/go-flags"
)
// Char represents a single-byte character given via the command line. It can marshal itself to either a character like ',' (sans quotes) or an escape
// sequence like '\t', and it can unmarshal either escape sequences or plain ol' characters.
type Char byte
// MarshalFlag renders the Char as the text a user would type on the command line: the character itself
// when it is printable ASCII (e.g. ","), otherwise a backslash escape sequence (e.g. "\n", "\x00", "\xe9").
// A double quote is rendered as \" and a backslash as \\.
//
// The result is the Go double-quoted form of the byte with the surrounding quotes removed, so
// UnmarshalFlag can always parse it back into the same byte. The returned error is always nil.
func (c *Char) MarshalFlag() (s string, err error) {
	// Quote a one-byte string rather than formatting the byte as a rune: bytes >= 0x80 must come out
	// as \xNN (a single byte), not as \u00NN, which would decode to a two-byte UTF-8 sequence.
	quoted := strconv.QuoteToASCII(string([]byte{byte(*c)}))
	// QuoteToASCII always wraps its output in double quotes; strip them.
	return quoted[1 : len(quoted)-1], nil
}
// UnmarshalFlag parses value into the Char. value must be either
//
//   - exactly one byte, which is taken literally (so ',', '"' and a lone '\' are all accepted), or
//   - a backslash escape sequence that decodes to a single byte, such as \n, \t or \x00.
//
// An error is returned when value is empty, when it is longer than one byte without starting with a
// backslash, when it is a malformed escape sequence, or when the escape sequence decodes to anything
// other than exactly one byte (for example \u00e9 or \n\n). The Char is left unchanged on error.
func (c *Char) UnmarshalFlag(value string) (err error) {
	switch {
	case len(value) == 0:
		return fmt.Errorf("Delimiter must not be empty")
	case len(value) == 1:
		// A lone byte needs no decoding. Taking it literally also admits '"' and '\', which
		// strconv.Unquote rejects as malformed once they are wrapped in double quotes below.
		*c = Char(value[0])
		return nil
	case value[0] != '\\':
		return fmt.Errorf("Delimiter (%s) must be either 1 character or an escape sequence like \\n or \\x00", value)
	}

	// Decode the escape sequence by treating it as the body of a Go double-quoted string literal.
	uq, err := strconv.Unquote(`"` + value + `"`)
	if err != nil {
		return err
	}
	if len(uq) != 1 {
		return fmt.Errorf("Delimiter ('%s') produced a multibyte result", value)
	}
	*c = Char(uq[0])
	return nil
}
// opts holds the parsed command line. main fills it in by passing it to f.Parse; the struct tags
// describe each option (short/long names, defaults, help text) to the go-flags parser.
var opts struct {
// Brokers is the list of Kafka broker addresses handed to sarama.NewAsyncProducer in main.
Brokers []string `short:"b" long:"brokers" description:"Broker addresses (can be given multiple times)" default:"localhost:9092"`
// Delimiter is the byte that separates records on stdin; it is parsed by Char.UnmarshalFlag.
Delimiter Char `short:"d" long:"delimiter" default:"\\n" description:"Delimiter byte to use when reading stdin. Remember to either use single quotes around the option or quote the backslash (\\\\n instead of \\n) if using escape sequences"`
// Verbose routes log output (including the Sarama and krater loggers) to stderr; when unset, main discards log output.
Verbose bool `short:"v" long:"verbose" description:"Print extra debugging cruft to stderr"`
// Args holds the positional arguments.
Args struct {
// Topic is the Kafka topic name passed to krater.NewAckingWriter in main.
Topic string `name:"topic" description:"Topic to write to"`
} `positional-args:"true" required:"true"`
}
// main parses the command line into opts, sets up logging, creates a Sarama async producer wrapped
// in a krater acking writer, and then publishes delimiter-separated records from stdin to the
// requested topic. It returns (closing the writer on the way out) when stdin is exhausted, when a
// read or write error occurs, or when SIGINT/SIGTERM is received.
//
// Exit status is 1 when option parsing fails and 2 when the producer cannot be created.
func main() {
	_, parseErr := f.Parse(&opts)
	if parseErr != nil {
		os.Exit(1)
	}

	// Log output is only kept in verbose mode; otherwise everything sent to the log package is discarded.
	if !opts.Verbose {
		log.SetOutput(ioutil.Discard)
	} else {
		logFlags := log.Ldate | log.Lmicroseconds | log.Lshortfile
		log.SetFlags(logFlags)
		log.SetOutput(os.Stderr)
		sarama.Logger = log.New(os.Stderr, "[Sarama] ", logFlags)
		krater.LogTo(os.Stderr)
	}

	delim := byte(opts.Delimiter)
	// The marshalling error is deliberately ignored: the text is only used for this debug line.
	delimText, _ := opts.Delimiter.MarshalFlag()
	log.Printf("Delimiter: '%s' (%#x), brokers: %s\ntopic: %s\n", delimText, delim, strings.Join(opts.Brokers, ", "), opts.Args.Topic)

	// Ask the producer to report both successes and errors, and to wait for the local broker's ack.
	cfg := sarama.NewConfig()
	cfg.Producer.RequiredAcks = sarama.WaitForLocal
	cfg.Producer.Return.Successes = true
	cfg.Producer.Return.Errors = true

	producer, err := sarama.NewAsyncProducer(opts.Brokers, cfg)
	if err != nil {
		fmt.Println("Error creating producer:", err.Error())
		os.Exit(2)
	}

	// NOTE(review): the meaning of the trailing 1 is defined by krater.NewAckingWriter — confirm against its docs.
	writer := krater.NewAckingWriter(opts.Args.Topic, producer, 1)
	defer func() {
		if closeErr := writer.Close(); closeErr != nil {
			panic(closeErr)
		}
	}()

	signals := make(chan os.Signal, 1)
	signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)

	done := make(chan struct{})
	stdin := bufio.NewReader(os.Stdin)

	// Pump records from stdin to Kafka in the background so that main can also react to signals.
	go func() {
		var written int64
		for {
			n, pubErr := readAndPub(stdin, delim, writer)
			written += int64(n)
			if pubErr == nil {
				continue
			}
			// io.EOF is the normal end of input and is not reported as an error.
			if pubErr != io.EOF {
				fmt.Printf("Error after %d bytes: %s\n", written, pubErr.Error())
			}
			fmt.Printf("Wrote %d bytes to Kafka\n", written)
			close(done)
			return
		}
	}()

	// Block until either the pump finishes or a termination signal arrives.
	select {
	case <-done:
	case sig := <-signals:
		fmt.Println("\ngot signal", sig.String())
	}
}
// TODO(ORBAT): use bufio.Scanner instead of bufio.Reader

// readAndPub reads one delim-terminated record from r, strips the trailing delimiter if present, and
// writes the record to p.
//
// It returns the byte count and error from p.Write when a record was written. Nothing is written, and
// n is 0, in two cases: the record is empty (err is then whatever the read returned, possibly nil or
// io.EOF), or the read failed with an error other than io.EOF. A final record that ends at io.EOF
// without a delimiter is still written; the io.EOF itself surfaces on the following call.
func readAndPub(r *bufio.Reader, delim byte, p io.Writer) (n int, err error) {
	var record []byte
	record, err = r.ReadBytes(delim)

	// Drop the delimiter; it is absent only when the read stopped early (EOF or error).
	if last := len(record) - 1; last >= 0 && record[last] == delim {
		record = record[:last]
	}

	nothingToSend := len(record) == 0
	readFailed := err != nil && err != io.EOF
	if nothingToSend || readFailed {
		return 0, err
	}

	return p.Write(record)
}