-
Notifications
You must be signed in to change notification settings - Fork 1
/
main_consumer.go
60 lines (51 loc) · 1.63 KB
/
main_consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package main
import (
"time"
"github.com/urfave/cli/v2"
)
// mainStore is the CLI action for the store worker. It builds the general
// configuration and the ClickHouse connection settings from the command-line
// flags, then calls workerRun in an endless loop, logging the outcome of each
// pass and sleeping for "check-interval" seconds between passes.
//
// The loop has no exit condition, so despite the error return type this
// function does not return during normal operation.
func mainStore(c *cli.Context) error {
	conf := newConfig(
		c.String("log-level"),
		c.Uint("static-cache-ttl"),
		c.String("collector-url"),
	)

	// ClickHouse settings are read from the flags once and shared by every pass.
	chConf := &clickhouseConfig{
		servers:            c.String("clickhouse-servers"),
		database:           c.String("clickhouse-database"),
		username:           c.String("clickhouse-username"),
		password:           c.String("clickhouse-password"),
		maxExecutionTime:   c.Int("clickhouse-max-execution-time"),
		dialTimeout:        c.Int("clickhouse-dial-timeout"),
		debug:              c.Bool("test-mode"), // debug follows the test-mode flag
		compressionLZ4:     c.Bool("clickhouse-compression-lz4"),
		maxIdleConns:       c.Int("clickhouse-max-idle-conns"),
		maxOpenConns:       c.Int("clickhouse-max-open-conns"),
		connMaxLifetime:    c.Int("clickhouse-conn-max-lifetime"),
		maxBlockSize:       c.Int("clickhouse-max-block-size"),
		maxInsertBlockSize: c.Int("clickhouse-max-insert-block-size"),
		rootCAPath:         c.String("clickhouse-root-ca"),
		clientCertPath:     c.String("clickhouse-client-cert"),
		clientKeyPath:      c.String("clickhouse-client-key"),
	}

	redisURI := c.String("redis-uri")
	// The "check-interval" flag is interpreted as a number of seconds.
	pause := time.Duration(c.Int("check-interval")) * time.Second

	for {
		outcome := workerRun(chConf, conf, redisURI)
		logger := conf.getLogger()

		if outcome.e == nil {
			// Successful pass: report the counters and timing from the result.
			logger.
				Info().
				Int64("records", outcome.records).
				Int64("clientErrors", outcome.clientErrors).
				Float64("timeTaken", outcome.timeTaken).
				Send()
		} else {
			// Failed pass: log the error together with the state reported by
			// the worker, then carry on with the next pass after the pause.
			logger.
				Error().
				Str("state", outcome.errorState).
				Str("error", outcome.e.Error()).
				Send()
		}

		time.Sleep(pause)
	}
}