Commit d341f99
minor improvement of systest config
yuzhichang committed Dec 8, 2020
1 parent 7c67299 commit d341f99
Showing 6 changed files with 59 additions and 67 deletions.
1 change: 1 addition & 0 deletions docker/conf/config.json
@@ -38,6 +38,7 @@
         "bufferSize": 90000,
         "minBufferSize": 2000,
         "flushInterval": 5,
+        "layoutDateTime": "2006-01-02 15:04:05.999999999Z07:00",
         "logLevel": "debug"
     }
 }
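Note: the added layoutDateTime is a Go reference-time layout for RFC 3339-style timestamps with a space separator, optional nanoseconds and a numeric zone offset, the same shape go.test.sh now emits via date --rfc-3339=ns. A minimal sketch of parsing such a value with the standard time package (the sample timestamp is made up):

    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        // Layout added to config.json above: Go's reference time written with a
        // space separator, up to nanosecond precision and a numeric zone offset.
        const layoutDateTime = "2006-01-02 15:04:05.999999999Z07:00"

        // Hypothetical sample in the shape produced by `date --rfc-3339=ns`.
        s := "2020-12-08 12:34:56.789012345+08:00"

        t, err := time.Parse(layoutDateTime, s)
        if err != nil {
            panic(err)
        }
        fmt.Println(t.UTC()) // 2020-12-08 04:34:56.789012345 +0000 UTC
    }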
42 changes: 21 additions & 21 deletions docker/conf/tasks/test1.json
@@ -1,25 +1,25 @@
 {
-
-    "name" : "test1",
-
+    "name": "test1",
     "kafka": "kfk1",
     "topic": "topic1",
-    "consumerGroup" : "test_sinker",
-    "earliest" : true,
-    "parser" : "json",
-    "clickhouse" : "ch1",
-
-    "tableName" : "test1",
-
-    "dims" : [
-        {"name" : "timestamp" , "type" : "UInt64"},
-        {"name" : "name" , "type" : "String"}
+    "consumerGroup": "test_sinker",
+    "earliest": true,
+    "parser": "json",
+    "clickhouse": "ch1",
+    "tableName": "test1",
+    "dims": [
+        {
+            "name": "time",
+            "type": "DateTime"
+        },
+        {
+            "name": "name",
+            "type": "String"
+        },
+        {
+            "name": "value",
+            "type": "Float32"
+        }
     ],
-
-    "metrics" : [
-        {"name" : "value" , "type" : "Float32"}
-    ],
-
-    "bufferSize" : 50000
-}
-
+    "bufferSize": 50000
+}
27 changes: 12 additions & 15 deletions docker/conf/tasks/test_auto_schema.json
@@ -1,18 +1,15 @@
 {
-
-    "name" : "test_auto_schema",
-
+    "name": "test_auto_schema",
     "kafka": "kfk1",
     "topic": "topic1",
-    "consumerGroup" : "test_auto_schema",
-    "earliest" : true,
-    "parser" : "json",
-    "clickhouse" : "ch1",
-
-    "autoSchema" : true,
-    "tableName" : "test_auto_schema",
-    "excludeColumns" : ["day", "time"],
-
-    "bufferSize" : 50000
-}
-
+    "consumerGroup": "test_auto_schema",
+    "earliest": true,
+    "parser": "json",
+    "clickhouse": "ch1",
+    "autoSchema": true,
+    "tableName": "test_auto_schema",
+    "excludeColumns": [
+        "day"
+    ],
+    "bufferSize": 50000
+}
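With time now carried by the messages themselves, only the DEFAULT column day still needs to be excluded from the auto-detected schema. A hedged sketch of what excludeColumns amounts to when building the insert column list (the helper name and signature are hypothetical, not the sinker's actual API):

    package main

    import "fmt"

    // excludeFromSchema drops the configured excludeColumns from an auto-detected
    // column list, e.g. DEFAULT columns such as "day" that ClickHouse derives from
    // "time" and that the incoming messages therefore do not carry.
    func excludeFromSchema(columns, excluded []string) []string {
        skip := make(map[string]struct{}, len(excluded))
        for _, c := range excluded {
            skip[c] = struct{}{}
        }
        kept := make([]string, 0, len(columns))
        for _, c := range columns {
            if _, ok := skip[c]; !ok {
                kept = append(kept, c)
            }
        }
        return kept
    }

    func main() {
        all := []string{"day", "time", "name", "value"}
        fmt.Println(excludeFromSchema(all, []string{"day"})) // [time name value]
    }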
22 changes: 9 additions & 13 deletions go.test.sh
@@ -5,36 +5,33 @@ curl "localhost:8123" -d 'DROP TABLE IF EXISTS test1'
 curl "localhost:8123" -d 'CREATE TABLE IF NOT EXISTS test1
 (
     `day` Date DEFAULT toDate(time),
-    `time` DateTime DEFAULT toDateTime(timestamp / 1000),
-    `timestamp` UInt64,
+    `time` DateTime,
     `name` String,
     `value` Float64
 )
 ENGINE = MergeTree
 PARTITION BY day
-ORDER BY time'
+ORDER BY (time, name)'
 
 curl "localhost:8123" -d 'DROP TABLE IF EXISTS test_auto_schema'
 curl "localhost:8123" -d 'CREATE TABLE IF NOT EXISTS test_auto_schema
 (
     `day` Date DEFAULT toDate(time),
-    `time` DateTime DEFAULT toDateTime(timestamp / 1000),
-    `timestamp` UInt64,
+    `time` DateTime,
     `name` String,
     `value` Float64
 )
 ENGINE = MergeTree
 PARTITION BY day
-ORDER BY time'
-
+ORDER BY (time, name)'
 
 ## send the messages to kafka
-current_timestamp=`date +%s`000
+now=`date --rfc-3339=ns`
 for i in `seq 1 100000`;do
-    echo "{\"timestamp\" : \"${current_timestamp}\", \"name\" : \"sundy-li\", \"value\" : \"$i\" }"
+    echo "{\"time\" : \"${now}\", \"name\" : \"name$i\", \"value\" : \"$i\" }"
 done > a.json
+echo "generated a.json"
 echo "cat /tmp/a.json | kafka-console-producer --topic topic1 --broker-list localhost:9092" > send.sh
-
 sudo docker cp a.json kafka:/tmp/
 sudo docker cp send.sh kafka:/tmp/
 sudo docker exec kafka sh /tmp/send.sh
@@ -53,9 +50,8 @@ echo "Got test_auto_schema count => $count"
 
 
 ## reset kafka consumer-group offsets
-echo "kafka-consumer-groups --bootstrap-server localhost:9093 --execute --reset-offsets --group test_sinker --all-topics --to-earliest; kafka-consumer-groups --bootstrap-server localhost:9093 --execute --reset-offsets --group test_auto_schema --all-topics --to-earliest" > reset-offsets.sh
-sudo docker cp reset-offsets.sh kafka:/tmp/
-sudo docker exec kafka sh /tmp/reset-offsets.sh
+sudo docker exec kafka kafka-consumer-groups --bootstrap-server localhost:9093 --execute --reset-offsets --group test_sinker --all-topics --to-earliest
+sudo docker exec kafka kafka-consumer-groups --bootstrap-server localhost:9093 --execute --reset-offsets --group test_auto_schema --all-topics --to-earliest
 
 ## truncate tables
 curl "localhost:8123" -d 'TRUNCATE TABLE test1'
22 changes: 10 additions & 12 deletions output/clickhouse.go
@@ -139,7 +139,7 @@ func shouldReconnect(err error) bool {
 // LoopWrite will dead loop to write the records
 func (c *ClickHouse) loopWrite(batch *model.Batch, callback func(batch *model.Batch) error) {
     var err error
-    times := c.chCfg.RetryTimes
+    var times int
     for {
         if err = c.write(batch); err == nil {
             for {
@@ -150,13 +150,12 @@ func (c *ClickHouse) loopWrite(batch *model.Batch, callback func(batch *model.Ba
                     log.Infof("%s: ClickHouse.loopWrite quit due to the context has been cancelled", c.taskCfg.Name)
                     return
                 }
-                log.Errorf("%s: committing offset(try #%d) failed with error %+v", c.taskCfg.Name, c.chCfg.RetryTimes-times, err)
-                if c.chCfg.RetryTimes > 0 {
-                    times--
-                    if times <= 0 {
-                        os.Exit(-1)
-                    }
+                log.Errorf("%s: committing offset(try #%d) failed with error %+v", c.taskCfg.Name, times, err)
+                times++
+                if c.chCfg.RetryTimes <= 0 || times < c.chCfg.RetryTimes {
                     time.Sleep(10 * time.Second)
+                } else {
+                    os.Exit(-1)
                 }
             }
         }
@@ -166,12 +165,11 @@ func (c *ClickHouse) loopWrite(batch *model.Batch, callback func(batch *model.Ba
         }
         log.Errorf("%s: flush batch(try #%d) failed with error %+v", c.taskCfg.Name, c.chCfg.RetryTimes-times, err)
         statistics.FlushMsgsErrorTotal.WithLabelValues(c.taskCfg.Name).Add(float64(batch.RealSize))
-        if c.chCfg.RetryTimes > 0 {
-            times--
-            if times <= 0 {
-                os.Exit(-1)
-            }
+        times++
+        if c.chCfg.RetryTimes <= 0 || times < c.chCfg.RetryTimes {
             time.Sleep(10 * time.Second)
+        } else {
+            os.Exit(-1)
         }
     }
 }
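The reworked retry logic counts attempts up from zero: a non-positive chCfg.RetryTimes means retry forever, otherwise the process gives up after RetryTimes failed attempts. A standalone sketch of that policy under those assumptions (function and variable names are illustrative, not the sinker's API):

    package main

    import (
        "errors"
        "fmt"
        "os"
        "time"
    )

    // retry mirrors the policy loopWrite uses after this commit: count attempts up
    // from zero, keep retrying forever when retryTimes <= 0, otherwise exit once
    // retryTimes attempts have failed.
    func retry(retryTimes int, op func() error) {
        var times int
        for {
            err := op()
            if err == nil {
                return
            }
            fmt.Printf("try #%d failed: %v\n", times, err)
            times++
            if retryTimes <= 0 || times < retryTimes {
                time.Sleep(10 * time.Millisecond) // the real code sleeps 10s between attempts
            } else {
                os.Exit(-1)
            }
        }
    }

    func main() {
        attempts := 0
        retry(3, func() error {
            attempts++
            if attempts < 3 {
                return errors.New("transient failure")
            }
            return nil
        })
        fmt.Println("succeeded on attempt", attempts)
    }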
12 changes: 6 additions & 6 deletions pool/conn.go
@@ -134,13 +134,13 @@ func FreeConn(name string) {
     defer lock.Unlock()
     if cc, ok := poolMaps[name]; ok {
         cc.ref--
-        if cc.ref <= 0 {
+        if cc.ref == 0 {
             delete(poolMaps, name)
-        }
-        for _, conn := range cc.connections {
-            if err := health.Health.RemoveReadinessCheck(conn.dsn); err != nil {
-                err = errors.Wrapf(err, "")
-                log.Errorf("got error: %+v", err)
+            for _, conn := range cc.connections {
+                if err := health.Health.RemoveReadinessCheck(conn.dsn); err != nil {
+                    err = errors.Wrapf(err, conn.dsn)
+                    log.Errorf("got error: %+v", err)
+                }
             }
         }
     }
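FreeConn now tears a pool entry down, including removing its readiness checks, only when the last reference is released (cc.ref == 0) instead of on every call. A minimal sketch of that reference-counting pattern (the types and the cleanup callback are hypothetical stand-ins for the pool's internals):

    package main

    import (
        "fmt"
        "sync"
    )

    // pooled is a stand-in for the pool's per-name connection set.
    type pooled struct {
        ref     int
        cleanup func() // e.g. removing the readiness check of each connection
    }

    var (
        lock  sync.Mutex
        pools = map[string]*pooled{}
    )

    // freeConn mirrors the post-commit behaviour: decrement the reference count
    // and only delete the entry and run cleanup when the count reaches zero.
    func freeConn(name string) {
        lock.Lock()
        defer lock.Unlock()
        if cc, ok := pools[name]; ok {
            cc.ref--
            if cc.ref == 0 {
                delete(pools, name)
                cc.cleanup()
            }
        }
    }

    func main() {
        pools["ch1"] = &pooled{ref: 2, cleanup: func() { fmt.Println("readiness checks removed") }}
        freeConn("ch1") // ref drops to 1, nothing removed yet
        freeConn("ch1") // ref hits 0, entry deleted and cleanup runs
    }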
