Skip to content

Commit

Permalink
bulker: disable stale topic logic
Browse files Browse the repository at this point in the history
  • Loading branch information
absorbb committed Jan 27, 2024
1 parent fd9a1af commit 260be8e
Showing 1 changed file with 3 additions and 3 deletions.
6 changes: 3 additions & 3 deletions bulkerapp/app/topic_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,9 +190,9 @@ func (tm *TopicManager) processMetadata(metadata *kafka.Metadata, lastMessageDat
}
lastMessageDate, ok := tm.topicsLastMessageDates[topic]
if ok && (lastMessageDate.IsZero() || lastMessageDate.Before(staleTopicsCutOff)) {
staleTopics.Put(topic)
//staleTopics.Put(topic)
tm.Debugf("Topic %s is stale. Last message date: %v", topic, lastMessageDate)
continue
//continue
}
destinationId, mode, tableName, err := ParseTopicId(topic)
if err != nil {
Expand Down Expand Up @@ -301,7 +301,7 @@ func (tm *TopicManager) processMetadata(metadata *kafka.Metadata, lastMessageDat
for topic := range dstTopics {
if staleTopics.Contains(topic) {
destinationId, mode, _, _ := ParseTopicId(topic)
tm.Infof("Removing consumer for stale topic: %s", topic, destinationId)
tm.Infof("Removing consumer for stale topic: %s", topic)
switch mode {
case "stream":
tm.streamConsumers[destinationId] = ExcludeConsumerForTopic(tm.streamConsumers[destinationId], topic, tm.cron)
Expand Down

0 comments on commit 260be8e

Please sign in to comment.