diff --git a/README.md b/README.md index 0553990a..e22b01a0 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,7 @@ Refers to [design](./design.md) for how it works. - Support multiple Kafka client: kafka-go(recommended), sarama. - Support multiple Kafka security mechanisms: SSL, SASL/PLAIN, SASL/SCRAM, SASL/GSSAPI and combinations of them. - Support multiple sinker tasks, each runs on parallel. -- Support multiply kafka and ClickHouse clusters. +- Support multiple kafka and ClickHouse clusters. - Bulk insert (by config `bufferSize` and `flushInterval`). - Parse messages concurrently. - Write batches concurrently. @@ -240,7 +240,7 @@ Kerberos setup is complex. Please ensure [`kafka-console-consumer.sh`](https://d Every message is routed to a determined ClickHouse shard. -By default, the node number is caculated by `(kafka_offset/roundup(batch_size))%clickhouse_shards`, where `roundup()` round upward an unsigned integer to the the nearest 2^n. +By default, the shard number is calculated by `(kafka_offset/roundup(batch_size))%clickhouse_shards`, where `roundup()` rounds an unsigned integer up to the nearest 2^n. This above expression can be customized with `shardingKey` and `shardingPolicy`. `shardingKey` value is a column name. `shardingPolicy` value could be: diff --git a/cmd/clickhouse_sinker/main.go b/cmd/clickhouse_sinker/main.go index f19474ea..073d13cd 100644 --- a/cmd/clickhouse_sinker/main.go +++ b/cmd/clickhouse_sinker/main.go @@ -413,7 +413,7 @@ func (s *Sinker) applyAnotherConfig(newCfg *config.Config) (err error) { } } } - // 2. Stop all tasks in parallel found at the step 2. + // 2. Stop all tasks in parallel found at the previous step. for _, taskName := range tasksToStop { if task, ok := s.tasks[taskName]; ok { task.NotifyStop() @@ -427,7 +427,7 @@ func (s *Sinker) applyAnotherConfig(newCfg *config.Config) (err error) { log.Warnf("Failed to stop task %s. It's disappeared.", taskName) } } - // 3. 
Initailize all tasks which is new or its config differ. + // 3. Initialize all tasks which are new or whose config differs. var newTasks []*task.Service if taskNames, ok := newCfg.Assignment[selfAddr]; ok { for _, taskName := range taskNames { @@ -450,7 +450,7 @@ func (s *Sinker) applyAnotherConfig(newCfg *config.Config) (err error) { totalConn := pool.GetTotalConn() util.GlobalWritingPool.Resize(totalConn) - // 5. Start new tasks. We don't do it at step 4 in order to avoid goroutine leak due to errors raised by later steps. + // 5. Start new tasks. We don't do it at step 3 in order to avoid goroutine leak due to errors raised by later steps. for _, t := range newTasks { go t.Run(s.ctx) } diff --git a/util/common.go b/util/common.go index 02963de4..16be5669 100644 --- a/util/common.go +++ b/util/common.go @@ -50,7 +50,11 @@ func InitGlobalParsingPool(maxWorkers int) { // InitGlobalWritingPool initialize GlobalWritingPool func InitGlobalWritingPool(maxWorkers int) { - GlobalWritingPool = NewWorkerPool(maxWorkers, runtime.NumCPU()/4) + queueSize := runtime.NumCPU() / 4 + if queueSize < 3 { + queueSize = 3 + } + GlobalWritingPool = NewWorkerPool(maxWorkers, queueSize) } // StringContains check if contains string in array