Skip to content

Commit

Permalink
fix possible zero queueSize
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzhichang committed Dec 4, 2020
1 parent 6e0a8e4 commit 23f3cab
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 6 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ Refers to [design](./design.md) for how it works.
- Support multiple Kafka client: kafka-go(recommended), sarama.
- Support multiple Kafka security mechanisms: SSL, SASL/PLAIN, SASL/SCRAM, SASL/GSSAPI and combinations of them.
- Support multiple sinker tasks, each runs on parallel.
- Support multiply kafka and ClickHouse clusters.
- Support multiple kafka and ClickHouse clusters.
- Bulk insert (by config `bufferSize` and `flushInterval`).
- Parse messages concurrently.
- Write batches concurrently.
Expand Down Expand Up @@ -240,7 +240,7 @@ Kerberos setup is complex. Please ensure [`kafka-console-consumer.sh`](https://d

Every message is routed to a determined ClickHouse shard.

By default, the node number is caculated by `(kafka_offset/roundup(batch_size))%clickhouse_shards`, where `roundup()` round upward an unsigned integer to the the nearest 2^n.
By default, the shard number is caculated by `(kafka_offset/roundup(batch_size))%clickhouse_shards`, where `roundup()` round upward an unsigned integer to the the nearest 2^n.

This above expression can be customized with `shardingKey` and `shardingPolicy`. `shardingKey` value is a column name. `shardingPolicy` value could be:

Expand Down
6 changes: 3 additions & 3 deletions cmd/clickhouse_sinker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ func (s *Sinker) applyAnotherConfig(newCfg *config.Config) (err error) {
}
}
}
// 2. Stop all tasks in parallel found at the step 2.
// 2. Stop all tasks in parallel found at previous step.
for _, taskName := range tasksToStop {
if task, ok := s.tasks[taskName]; ok {
task.NotifyStop()
Expand All @@ -427,7 +427,7 @@ func (s *Sinker) applyAnotherConfig(newCfg *config.Config) (err error) {
log.Warnf("Failed to stop task %s. It's disappeared.", taskName)
}
}
// 3. Initailize all tasks which is new or its config differ.
// 3. Initailize all tasks which are new or their config differ.
var newTasks []*task.Service
if taskNames, ok := newCfg.Assignment[selfAddr]; ok {
for _, taskName := range taskNames {
Expand All @@ -450,7 +450,7 @@ func (s *Sinker) applyAnotherConfig(newCfg *config.Config) (err error) {
totalConn := pool.GetTotalConn()
util.GlobalWritingPool.Resize(totalConn)

// 5. Start new tasks. We don't do it at step 4 in order to avoid goroutine leak due to errors raised by later steps.
// 5. Start new tasks. We don't do it at step 3 in order to avoid goroutine leak due to errors raised by later steps.
for _, t := range newTasks {
go t.Run(s.ctx)
}
Expand Down
6 changes: 5 additions & 1 deletion util/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,11 @@ func InitGlobalParsingPool(maxWorkers int) {

// InitGlobalWritingPool initialize GlobalWritingPool
func InitGlobalWritingPool(maxWorkers int) {
GlobalWritingPool = NewWorkerPool(maxWorkers, runtime.NumCPU()/4)
queueSize := runtime.NumCPU() / 4
if queueSize < 3 {
queueSize = 3
}
GlobalWritingPool = NewWorkerPool(maxWorkers, queueSize)
}

// StringContains check if contains string in array
Expand Down

0 comments on commit 23f3cab

Please sign in to comment.