Skip to content

Commit

Permalink
bulker: fix stale topic logic
Browse files Browse the repository at this point in the history
  • Loading branch information
absorbb committed Jan 27, 2024
1 parent bdb1c05 commit aa2e571
Showing 1 changed file with 20 additions and 16 deletions.
36 changes: 20 additions & 16 deletions bulkerapp/app/topic_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,12 @@ type TopicManager struct {
repository *Repository
cron *Cron
// consumedTopics by destinationId. Consumed topics are topics that have a consumer started
consumedTopics map[string]utils.Set[string]
nonEmptyTopics utils.Set[string]
abandonedTopics utils.Set[string]
staleTopics utils.Set[string]
allTopics utils.Set[string]
consumedTopics map[string]utils.Set[string]
// topicLastActiveDate holds, per topic name, the timestamp of the last message found in that topic
topicLastActiveDate map[string]*time.Time
abandonedTopics utils.Set[string]
staleTopics utils.Set[string]
allTopics utils.Set[string]

// batchConsumers holds batch consumers by destinationId
batchConsumers map[string][]BatchConsumer
Expand Down Expand Up @@ -81,6 +82,7 @@ func NewTopicManager(appContext *Context) (*TopicManager, error) {
kaftaAdminClient: admin,
kafkaBootstrapServer: appContext.config.KafkaBootstrapServers,
consumedTopics: make(map[string]utils.Set[string]),
topicLastActiveDate: make(map[string]*time.Time),
batchProducer: appContext.batchProducer,
streamProducer: appContext.streamProducer,
eventsLogService: appContext.eventsLogService,
Expand Down Expand Up @@ -132,7 +134,7 @@ func (tm *TopicManager) Start() {
}

func (tm *TopicManager) LoadMetadata() {
nonEmptyTopics := utils.NewSet[string]()
topicsLastMessageDates := map[string]*time.Time{}
metadata, err := tm.kaftaAdminClient.GetMetadata(nil, true, tm.config.KafkaAdminMetadataTimeoutMs)
topicPartitionOffsets := make(map[kafka.TopicPartition]kafka.OffsetSpec)
for _, topic := range metadata.Topics {
Expand All @@ -149,28 +151,30 @@ func (tm *TopicManager) LoadMetadata() {
tm.Errorf("Error getting topic offsets: %v", err)
} else {
for tp, offset := range res.ResultInfos {
if offset.Offset >= 0 {
nonEmptyTopics.Put(*tp.Topic)
if offset.Offset >= 0 && offset.Timestamp > 0 {
lastMessageDate := time.UnixMilli(offset.Timestamp)
topicsLastMessageDates[*tp.Topic] = &lastMessageDate
}
}
tm.Debugf("Got topic offsets for %d topics in %v", len(nonEmptyTopics), time.Since(start))
tm.Debugf("Got topic offsets for %d topics in %v", len(topicsLastMessageDates), time.Since(start))
}

if err != nil {
metrics.TopicManagerError("load_metadata_error").Inc()
tm.Errorf("Error getting metadata: %v", err)
} else {
tm.processMetadata(metadata, nonEmptyTopics)
tm.processMetadata(metadata, topicsLastMessageDates)
}
}

func (tm *TopicManager) processMetadata(metadata *kafka.Metadata, nonEmptyTopics utils.Set[string]) {
func (tm *TopicManager) processMetadata(metadata *kafka.Metadata, nonEmptyTopics map[string]*time.Time) {
tm.Lock()
defer tm.Unlock()
start := time.Now()
if nonEmptyTopics.Size() > 0 {
tm.nonEmptyTopics = nonEmptyTopics
for k, v := range nonEmptyTopics {
tm.topicLastActiveDate[k] = v
}
staleTopicsCutOff := time.Now().Add(-1 * time.Duration(tm.config.KafkaTopicRetentionHours) * time.Hour)
var abandonedTopicsCount float64
var otherTopicsCount float64
topicsCountByMode := make(map[string]float64)
Expand All @@ -185,10 +189,10 @@ func (tm *TopicManager) processMetadata(metadata *kafka.Metadata, nonEmptyTopics
abandonedTopicsCount++
continue
}
ok := tm.nonEmptyTopics.Contains(topic)
if !ok {
lastMessageDate, ok := tm.topicLastActiveDate[topic]
if !ok || lastMessageDate.Before(staleTopicsCutOff) {
staleTopics.Put(topic)
tm.Debugf("Topic %s is stale. It doesn't contain any messages", topic)
tm.Debugf("Topic %s is stale. Last message date: %v", topic, lastMessageDate)
continue
}
destinationId, mode, tableName, err := ParseTopicId(topic)
Expand Down

0 comments on commit aa2e571

Please sign in to comment.