Skip to content

Commit

Permalink
bulker topic_manager: support bulker sharding
Browse files Browse the repository at this point in the history
  • Loading branch information
absorbb committed Mar 5, 2024
1 parent 25a661a commit 777a53c
Showing 1 changed file with 85 additions and 85 deletions.
170 changes: 85 additions & 85 deletions bulkerapp/app/topic_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ type TopicManager struct {

repository *Repository
cron *Cron
// consumedTopics by destinationId. Consumed topics are topics that have consumer started
consumedTopics map[string]utils.Set[string]
// destinationTopics by destinationId.
destinationTopics map[string]utils.Set[string]
// topicLastActiveDate last message timestamp found in topic
topicLastActiveDate map[string]*time.Time
abandonedTopics utils.Set[string]
Expand Down Expand Up @@ -83,7 +83,7 @@ func NewTopicManager(appContext *Context) (*TopicManager, error) {
cron: appContext.cron,
kaftaAdminClient: admin,
kafkaBootstrapServer: appContext.config.KafkaBootstrapServers,
consumedTopics: make(map[string]utils.Set[string]),
destinationTopics: make(map[string]utils.Set[string]),
topicLastActiveDate: make(map[string]*time.Time),
batchProducer: appContext.batchProducer,
streamProducer: appContext.streamProducer,
Expand Down Expand Up @@ -188,11 +188,6 @@ func (tm *TopicManager) processMetadata(metadata *kafka.Metadata, nonEmptyTopics

for topic, topicMetadata := range metadata.Topics {
allTopics.Put(topic)
hash := utils.HashStringInt(topic)
topicShardNum := hash % uint32(tm.config.ShardsCount)
if int(topicShardNum) != tm.shardNumber {
continue
}
if tm.abandonedTopics.Contains(topic) {
abandonedTopicsCount++
continue
Expand All @@ -209,94 +204,99 @@ func (tm *TopicManager) processMetadata(metadata *kafka.Metadata, nonEmptyTopics
continue
}
var dstTopics utils.Set[string]
if dstTopics, ok = tm.consumedTopics[destinationId]; !ok {
if dstTopics, ok = tm.destinationTopics[destinationId]; !ok {
dstTopics = utils.NewSet[string]()
tm.consumedTopics[destinationId] = dstTopics
tm.destinationTopics[destinationId] = dstTopics
}
if !dstTopics.Contains(topic) {
tm.Debugf("Found topic %s for destination %s and table %s", topic, destinationId, tableName)
destination := tm.repository.GetDestination(destinationId)
if destination == nil {
tm.Debugf("No destination found for topic: %s", topic)
tm.abandonedTopics.Put(topic)
continue
}
switch mode {
case "stream":
streamConsumer, err := NewStreamConsumer(tm.repository, destination, topic, tm.config, tm.kafkaConfig, tm.streamProducer, tm.eventsLogService)
if err != nil {
topicsErrorsByMode[mode]++
tm.SystemErrorf("Failed to create consumer for destination topic: %s: %v", topic, err)
continue
} else {
tm.Infof("Stream consumer for destination topic %s was started.", topic)
}
tm.streamConsumers[destinationId] = append(tm.streamConsumers[destinationId], streamConsumer)
case "batch":
batchPeriodSec := utils.Nvl(int(bulker.BatchFrequencyOption.Get(destination.streamOptions)*60), tm.config.BatchRunnerPeriodSec)
// check topic partitions count
var err error
if len(topicMetadata.Partitions) > 1 {
metrics.ConsumerErrors(topic, mode, destinationId, tableName, "invalid_partitions_count").Inc()
err = fmt.Errorf("Topic has more than 1 partition. Batch Consumer supports only topics with a single partition")
}
var batchConsumer *BatchConsumerImpl
if err == nil {
batchConsumer, err = NewBatchConsumer(tm.repository, destinationId, batchPeriodSec, topic, tm.config, tm.kafkaConfig, tm.batchProducer, tm.eventsLogService)
}
if err != nil {
topicsErrorsByMode[mode]++
tm.Errorf("Failed to create batch consumer for destination topic: %s: %v", topic, err)
continue
}
tm.batchConsumers[destinationId] = append(tm.batchConsumers[destinationId], batchConsumer)
_, err = tm.cron.AddBatchConsumer(batchConsumer)
if err != nil {
topicsErrorsByMode[mode]++
batchConsumer.Retire()
tm.Errorf("Failed to schedule consumer for destination topic: %s: %v", topic, err)
continue
} else {
tm.Infof("Consumer for destination topic %s was scheduled with batch period %ds.", topic, batchConsumer.BatchPeriodSec())
}
case retryTopicMode:
retryPeriodSec := utils.Nvl(int(bulker.RetryFrequencyOption.Get(destination.streamOptions)*60), tm.config.BatchRunnerRetryPeriodSec)
var err error
if len(topicMetadata.Partitions) > 1 {
metrics.ConsumerErrors(topic, mode, destinationId, tableName, "invalid_partitions_count").Inc()
err = fmt.Errorf("Topic has more than 1 partition. Retry Consumer supports only topics with a single partition")
}
var retryConsumer *RetryConsumer
if err == nil {
retryConsumer, err = NewRetryConsumer(tm.repository, destinationId, retryPeriodSec, topic, tm.config, tm.kafkaConfig, tm.batchProducer)
}
if err != nil {
topicsErrorsByMode[mode]++
tm.Errorf("Failed to create retry consumer for destination topic: %s: %v", topic, err)
hash := utils.HashStringInt(topic)
topicShardNum := hash % uint32(tm.config.ShardsCount)
startConsumer := int(topicShardNum) == tm.shardNumber
if startConsumer {
tm.Debugf("Found topic %s for destination %s and table %s", topic, destinationId, tableName)
destination := tm.repository.GetDestination(destinationId)
if destination == nil {
tm.Debugf("No destination found for topic: %s", topic)
tm.abandonedTopics.Put(topic)
continue
}
tm.retryConsumers[destinationId] = append(tm.retryConsumers[destinationId], retryConsumer)
_, err = tm.cron.AddBatchConsumer(retryConsumer)
if err != nil {
switch mode {
case "stream":
streamConsumer, err := NewStreamConsumer(tm.repository, destination, topic, tm.config, tm.kafkaConfig, tm.streamProducer, tm.eventsLogService)
if err != nil {
topicsErrorsByMode[mode]++
tm.SystemErrorf("Failed to create consumer for destination topic: %s: %v", topic, err)
continue
} else {
tm.Infof("Stream consumer for destination topic %s was started.", topic)
}
tm.streamConsumers[destinationId] = append(tm.streamConsumers[destinationId], streamConsumer)
case "batch":
batchPeriodSec := utils.Nvl(int(bulker.BatchFrequencyOption.Get(destination.streamOptions)*60), tm.config.BatchRunnerPeriodSec)
// check topic partitions count
var err error
if len(topicMetadata.Partitions) > 1 {
metrics.ConsumerErrors(topic, mode, destinationId, tableName, "invalid_partitions_count").Inc()
err = fmt.Errorf("Topic has more than 1 partition. Batch Consumer supports only topics with a single partition")
}
var batchConsumer *BatchConsumerImpl
if err == nil {
batchConsumer, err = NewBatchConsumer(tm.repository, destinationId, batchPeriodSec, topic, tm.config, tm.kafkaConfig, tm.batchProducer, tm.eventsLogService)
}
if err != nil {
topicsErrorsByMode[mode]++
tm.Errorf("Failed to create batch consumer for destination topic: %s: %v", topic, err)
continue
}
tm.batchConsumers[destinationId] = append(tm.batchConsumers[destinationId], batchConsumer)
_, err = tm.cron.AddBatchConsumer(batchConsumer)
if err != nil {
topicsErrorsByMode[mode]++
batchConsumer.Retire()
tm.Errorf("Failed to schedule consumer for destination topic: %s: %v", topic, err)
continue
} else {
tm.Infof("Consumer for destination topic %s was scheduled with batch period %ds.", topic, batchConsumer.BatchPeriodSec())
}
case retryTopicMode:
retryPeriodSec := utils.Nvl(int(bulker.RetryFrequencyOption.Get(destination.streamOptions)*60), tm.config.BatchRunnerRetryPeriodSec)
var err error
if len(topicMetadata.Partitions) > 1 {
metrics.ConsumerErrors(topic, mode, destinationId, tableName, "invalid_partitions_count").Inc()
err = fmt.Errorf("Topic has more than 1 partition. Retry Consumer supports only topics with a single partition")
}
var retryConsumer *RetryConsumer
if err == nil {
retryConsumer, err = NewRetryConsumer(tm.repository, destinationId, retryPeriodSec, topic, tm.config, tm.kafkaConfig, tm.batchProducer)
}
if err != nil {
topicsErrorsByMode[mode]++
tm.Errorf("Failed to create retry consumer for destination topic: %s: %v", topic, err)
continue
}
tm.retryConsumers[destinationId] = append(tm.retryConsumers[destinationId], retryConsumer)
_, err = tm.cron.AddBatchConsumer(retryConsumer)
if err != nil {
topicsErrorsByMode[mode]++
retryConsumer.Retire()
tm.Errorf("Failed to schedule retry consumer for destination topic: %s: %v", topic, err)
continue
} else {
tm.Infof("Retry consumer for destination topic %s was scheduled with batch period %ds", topic, retryConsumer.BatchPeriodSec())
}
case deadTopicMode:
tm.Debugf("Found topic %s for 'dead' events", topic)
default:
topicsErrorsByMode[mode]++
retryConsumer.Retire()
tm.Errorf("Failed to schedule retry consumer for destination topic: %s: %v", topic, err)
continue
} else {
tm.Infof("Retry consumer for destination topic %s was scheduled with batch period %ds", topic, retryConsumer.BatchPeriodSec())
tm.Errorf("Unknown stream mode: %s for topic: %s", mode, topic)
}
case deadTopicMode:
tm.Debugf("Found topic %s for 'dead' events", topic)
default:
topicsErrorsByMode[mode]++
tm.Errorf("Unknown stream mode: %s for topic: %s", mode, topic)
topicsCountByMode[mode]++
}
topicsCountByMode[mode]++
dstTopics.Put(topic)
}
}
for _, destination := range tm.repository.GetDestinations() {
dstTopics, hasTopics := tm.consumedTopics[destination.Id()]
dstTopics, hasTopics := tm.destinationTopics[destination.Id()]
for mode, config := range tm.requiredDestinationTopics {
topicId, _ := MakeTopicId(destination.Id(), mode, allTablesToken, false)
if (!hasTopics || !dstTopics.Contains(topicId)) && !staleTopics.Contains(topicId) {
Expand Down Expand Up @@ -480,7 +480,7 @@ func (tm *TopicManager) changeListener(changes RepositoryChange) {
consumer.Retire()
delete(tm.streamConsumers, deletedDstId)
}
delete(tm.consumedTopics, deletedDstId)
delete(tm.destinationTopics, deletedDstId)
tm.Unlock()
}
if len(changes.AddedDestinations) > 0 {
Expand Down

0 comments on commit 777a53c

Please sign in to comment.