bulker: disable consumers for stale topics
absorbb committed Jan 15, 2024
1 parent 8e6e92d commit dc7a847
Showing 8 changed files with 127 additions and 34 deletions.
3 changes: 1 addition & 2 deletions bulkerapp/app/abstract_batch_consumer.go
@@ -27,12 +27,11 @@ type BatchFunction func(destination *Destination, batchNum, batchSize, retryBatc
type ShouldConsumeFunction func(committedOffset, highOffset int64) bool

type BatchConsumer interface {
Consumer
RunJob()
ConsumeAll() (consumed BatchCounters, err error)
Retire()
BatchPeriodSec() int
UpdateBatchPeriod(batchPeriodSec int)
TopicId() string
}

type AbstractBatchConsumer struct {
5 changes: 5 additions & 0 deletions bulkerapp/app/abstract_consumer.go
@@ -20,6 +20,11 @@ type AbstractConsumer struct {
repository *Repository
}

type Consumer interface {
Retire()
TopicId() string
}

func NewAbstractConsumer(config *Config, repository *Repository, topicId string, bulkerProducer *Producer) *AbstractConsumer {
return &AbstractConsumer{
Service: appbase.NewServiceBase(topicId),
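
The two hunks above extract a shared Consumer interface (Retire(), TopicId()) that BatchConsumer now embeds and that stream consumers implement as well, so the topic manager can address any consumer kind through one contract. A minimal sketch of the idea, using toy stand-in types rather than bulker's real implementations:

package main

import "fmt"

// Consumer mirrors the interface extracted in this commit.
type Consumer interface {
	Retire()
	TopicId() string
}

// toyConsumer is a hypothetical stand-in for bulker's stream/batch consumers.
type toyConsumer struct{ topic string }

func (c *toyConsumer) Retire()         { fmt.Println("retired:", c.topic) }
func (c *toyConsumer) TopicId() string { return c.topic }

// retireAll works over any slice whose element type satisfies Consumer,
// which is what lets TopicManager treat batch and stream consumers uniformly.
func retireAll[T Consumer](consumers []T) {
	for _, c := range consumers {
		c.Retire()
	}
}

func main() {
	retireAll([]*toyConsumer{{topic: "events-topic-1"}, {topic: "events-topic-2"}})
}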
2 changes: 2 additions & 0 deletions bulkerapp/app/integration_test.go
@@ -108,6 +108,7 @@ func TestGoodAndBadStreams(t *testing.T) {
"BULKER_BATCH_RUNNER_DEFAULT_PERIOD_SEC": "1"})
t.Cleanup(func() {
app.Exit(appbase.SIG_SHUTDOWN_FOR_TESTS)
time.Sleep(5 * time.Second)
if postgresContainer != nil {
_ = postgresContainer.Close()
}
@@ -185,6 +186,7 @@ func TestEventsRetry(t *testing.T) {
"BULKER_BATCH_RUNNER_DEFAULT_PERIOD_SEC": "1"})
t.Cleanup(func() {
app.Exit(appbase.SIG_SHUTDOWN_FOR_TESTS)
time.Sleep(5 * time.Second)
if postgresContainer != nil {
_ = postgresContainer.Close()
}
2 changes: 1 addition & 1 deletion bulkerapp/app/retry_consumer.go
@@ -47,7 +47,7 @@ func (rc *RetryConsumer) shouldConsumeFuncImpl(committedOffset, highOffset int64
message, err := rc.consumer.Load().ReadMessage(rc.waitForMessages)
if err != nil {
kafkaErr := err.(kafka.Error)
if kafkaErr.Code() == kafka.ErrTimedOut && currentOffset == highOffset-1 {
if kafkaErr.Code() == kafka.ErrTimedOut {
rc.Debugf("Timeout. No messages to retry. %d-%d", committedOffset, highOffset)
return false
}
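
The retry consumer previously stopped only when a read timed out with the current offset sitting just below the high watermark; with this change any timeout during the retry pass means there is nothing left to consume. A sketch of that drain-until-timeout loop, assuming the confluent-kafka-go v2 client this file appears to use (the process callback is a hypothetical stand-in for bulker's retry handling):

package retrysketch

import (
	"time"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

// drainPending reads until a poll times out: a timeout alone now means
// "nothing to retry", with no extra comparison against the high watermark.
func drainPending(consumer *kafka.Consumer, wait time.Duration, process func(*kafka.Message)) error {
	for {
		msg, err := consumer.ReadMessage(wait)
		if err != nil {
			if kafkaErr, ok := err.(kafka.Error); ok && kafkaErr.Code() == kafka.ErrTimedOut {
				return nil // queue drained for this pass
			}
			return err // a real error, not a timeout
		}
		process(msg)
	}
}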
32 changes: 23 additions & 9 deletions bulkerapp/app/stream_consumer.go
@@ -21,7 +21,7 @@ import (

const streamConsumerMessageWaitTimeout = 5 * time.Second

type StreamConsumer struct {
type StreamConsumerImpl struct {
*AbstractConsumer
repository *Repository
destination *Destination
@@ -36,7 +36,12 @@ type StreamConsumer struct {
closed chan struct{}
}

func NewStreamConsumer(repository *Repository, destination *Destination, topicId string, config *Config, kafkaConfig *kafka.ConfigMap, bulkerProducer *Producer, eventsLogService eventslog.EventsLogService) (*StreamConsumer, error) {
type StreamConsumer interface {
Consumer
UpdateDestination(destination *Destination) error
}

func NewStreamConsumer(repository *Repository, destination *Destination, topicId string, config *Config, kafkaConfig *kafka.ConfigMap, bulkerProducer *Producer, eventsLogService eventslog.EventsLogService) (*StreamConsumerImpl, error) {
abstract := NewAbstractConsumer(config, repository, topicId, bulkerProducer)
_, _, tableName, err := ParseTopicId(topicId)
if err != nil {
@@ -73,7 +78,7 @@ func NewStreamConsumer(repository *Repository, destination *Destination, topicId
// return nil, fmt.Errorf("[%s] Destination not found", destinationId)
//}

sc := &StreamConsumer{
sc := &StreamConsumerImpl{
AbstractConsumer: abstract,
repository: repository,
destination: destination,
@@ -127,7 +132,7 @@ func (sw *StreamWrapper) Complete(ctx context.Context) (bulker.State, error) {
return sw.stream.Complete(ctx)
}

func (sc *StreamConsumer) restartConsumer() {
func (sc *StreamConsumerImpl) restartConsumer() {
sc.Infof("Restarting consumer")
go func(c *kafka.Consumer) {
err := c.Close()
@@ -162,7 +167,7 @@ func (sc *StreamConsumer) restartConsumer() {
}

// start consuming messages from kafka
func (sc *StreamConsumer) start() {
func (sc *StreamConsumerImpl) start() {
sc.Infof("Starting stream consumer for topic. Ver: %s", sc.destination.config.UpdatedAt)
safego.RunWithRestart(func() {
var err error
@@ -264,17 +269,26 @@ func (sc *StreamConsumer) start() {
})
}

func (sc *StreamConsumerImpl) TopicId() string {
return sc.topicId
}

// Close consumer
func (sc *StreamConsumer) Close() error {
func (sc *StreamConsumerImpl) Retire() {
select {
case <-sc.closed:
return
default:
}
sc.Infof("Closing stream consumer. Ver: %s", sc.destination.config.UpdatedAt)
close(sc.closed)
sc.destination.Release()
//TODO: wait for closing?
return nil
return
}

// UpdateDestination
func (sc *StreamConsumer) UpdateDestination(destination *Destination) error {
func (sc *StreamConsumerImpl) UpdateDestination(destination *Destination) error {
sc.Infof("[Updating stream consumer for topic. Ver: %s", sc.destination.config.UpdatedAt)

//create new stream
@@ -287,7 +301,7 @@ func (sc *StreamConsumer) UpdateDestination(destination *Destination) error {
return nil
}

func (sc *StreamConsumer) postEventsLog(message []byte, representation any, processedObject types.Object, processedErr error) {
func (sc *StreamConsumerImpl) postEventsLog(message []byte, representation any, processedObject types.Object, processedErr error) {
object := map[string]any{
"original": string(message),
"status": "SUCCESS",
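
Retire (replacing Close) guards itself with the closed-channel idiom: selecting on an already-closed channel returns immediately, so a repeated call is a no-op and close(sc.closed) cannot panic on a double close. A minimal sketch of the pattern with a toy type (in bulker the topic manager serializes these calls under its own lock, so the guard does not need to be race-free):

package main

import "fmt"

type worker struct {
	closed chan struct{}
}

// Retire is idempotent: the select falls through only the first time.
func (w *worker) Retire() {
	select {
	case <-w.closed:
		return // already retired
	default:
	}
	close(w.closed) // signals any loop selecting on w.closed to stop
	fmt.Println("retired")
}

func main() {
	w := &worker{closed: make(chan struct{})}
	w.Retire()
	w.Retire() // safe no-op
}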
101 changes: 82 additions & 19 deletions bulkerapp/app/topic_manager.go
@@ -12,6 +12,7 @@ import (
"github.com/jitsucom/bulker/jitsubase/safego"
"github.com/jitsucom/bulker/jitsubase/utils"
"regexp"
"strings"
"sync"
"time"
)
@@ -48,12 +49,13 @@ type TopicManager struct {
// consumedTopics by destinationId. Consumed topics are topics that have consumer started
consumedTopics map[string]utils.Set[string]
abandonedTopics utils.Set[string]
staleTopics utils.Set[string]
allTopics utils.Set[string]

//batch consumers by destinationId
batchConsumers map[string][]BatchConsumer
retryConsumers map[string][]BatchConsumer
streamConsumers map[string][]*StreamConsumer
streamConsumers map[string][]StreamConsumer

batchProducer *Producer
streamProducer *Producer
@@ -83,7 +85,7 @@ func NewTopicManager(appContext *Context) (*TopicManager, error) {
eventsLogService: appContext.eventsLogService,
batchConsumers: make(map[string][]BatchConsumer),
retryConsumers: make(map[string][]BatchConsumer),
streamConsumers: make(map[string][]*StreamConsumer),
streamConsumers: make(map[string][]StreamConsumer),
abandonedTopics: utils.NewSet[string](),
allTopics: utils.NewSet[string](),
closed: make(chan struct{}),
@@ -129,39 +131,71 @@ func (tm *TopicManager) Start() {
}

func (tm *TopicManager) LoadMetadata() {
topicsLastMessageDates := make(map[string]time.Time)
metadata, err := tm.kaftaAdminClient.GetMetadata(nil, true, tm.config.KafkaAdminMetadataTimeoutMs)
topicPartitionOffsets := make(map[kafka.TopicPartition]kafka.OffsetSpec)
for _, topic := range metadata.Topics {
t := topic.Topic
if !strings.HasPrefix(t, "__") {
for _, partition := range topic.Partitions {
topicPartitionOffsets[kafka.TopicPartition{Topic: &t, Partition: partition.ID}] = kafka.MaxTimestampOffsetSpec
}
}
}
start := time.Now()
res, err := tm.kaftaAdminClient.ListOffsets(context.Background(), topicPartitionOffsets)
if err != nil {
tm.Errorf("Error getting topic offsets: %v", err)
} else {
for tp, offset := range res.ResultInfos {
if offset.Offset > 0 {
topicsLastMessageDates[*tp.Topic] = time.UnixMilli(offset.Timestamp)
} else {
topicsLastMessageDates[*tp.Topic] = time.Time{}
}
}
}
tm.Infof("Got topic offsets in %v", time.Since(start))

if err != nil {
metrics.TopicManagerError("load_metadata_error").Inc()
tm.Errorf("Error getting metadata: %v", err)
} else {
tm.processMetadata(metadata)
tm.processMetadata(metadata, topicsLastMessageDates)
}
}
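
LoadMetadata now pairs the topic metadata with a ListOffsets call using MaxTimestampOffsetSpec, which yields the timestamp of the newest message per partition; internal "__"-prefixed topics are skipped. processMetadata then marks a topic stale when its newest message predates the retention window, or when it never held a message at all (the zero time.Time recorded for offsets <= 0). The cutoff rule, factored into a standalone helper for readability (bulker inlines the comparison):

package main

import (
	"fmt"
	"time"
)

// isStale mirrors the staleTopicsCutOff check in processMetadata below.
func isStale(lastMessage time.Time, retentionHours int, now time.Time) bool {
	cutoff := now.Add(-time.Duration(retentionHours) * time.Hour)
	// Zero time = the topic reported no messages at all.
	return lastMessage.IsZero() || lastMessage.Before(cutoff)
}

func main() {
	now := time.Now()
	fmt.Println(isStale(now.Add(-48*time.Hour), 24, now)) // true: older than retention
	fmt.Println(isStale(now.Add(-time.Hour), 24, now))    // false: still fresh
	fmt.Println(isStale(time.Time{}, 24, now))            // true: empty topic
}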

func (tm *TopicManager) processMetadata(metadata *kafka.Metadata) {
func (tm *TopicManager) processMetadata(metadata *kafka.Metadata, lastMessageDates map[string]time.Time) {
tm.Lock()
defer tm.Unlock()
start := time.Now()
staleTopicsCutOff := time.Now().Add(-1 * time.Duration(tm.config.KafkaTopicRetentionHours) * time.Hour)
var abandonedTopicsCount float64
var otherTopicsCount float64
topicsCountByMode := make(map[string]float64)
topicsErrorsByMode := make(map[string]float64)

allTopics := utils.NewSet[string]()
staleTopics := utils.NewSet[string]()

for topic, topicMetadata := range metadata.Topics {
allTopics.Put(topic)
if tm.abandonedTopics.Contains(topic) {
abandonedTopicsCount++
continue
}
lastMessageDate, ok := lastMessageDates[topic]
if ok && (lastMessageDate.IsZero() || lastMessageDate.Before(staleTopicsCutOff)) {
staleTopics.Put(topic)
tm.Debugf("Topic %s is stale. Last message date: %v", topic, lastMessageDate)
continue
}
destinationId, mode, tableName, err := ParseTopicId(topic)
if err != nil {
otherTopicsCount++
continue
}
dstTopics, ok := tm.consumedTopics[destinationId]
if !ok {
dstTopics = utils.NewSet[string]()
tm.consumedTopics[destinationId] = dstTopics
@@ -170,7 +204,7 @@ func (tm *TopicManager) processMetadata(metadata *kafka.Metadata) {
tm.Debugf("Found topic %s for destination %s and table %s", topic, destinationId, tableName)
destination := tm.repository.GetDestination(destinationId)
if destination == nil {
tm.Warnf("No destination found for topic: %s", topic)
tm.Debugf("No destination found for topic: %s", topic)
tm.abandonedTopics.Put(topic)
continue
}
@@ -252,14 +286,28 @@ func (tm *TopicManager) processMetadata(metadata *kafka.Metadata) {
dstTopics, hasTopics := tm.consumedTopics[destination.Id()]
for mode, config := range tm.requiredDestinationTopics {
topicId, _ := MakeTopicId(destination.Id(), mode, allTablesToken, false)
if !hasTopics || !dstTopics.Contains(topicId) {
if (!hasTopics || !dstTopics.Contains(topicId)) && !staleTopics.Contains(topicId) {
//tm.Debugf("Creating topic %s for destination %s", topicId, destination.Id())
err := tm.createDestinationTopic(topicId, config)
if err != nil {
tm.Errorf("Failed to create topic %s for destination %s: %v", topicId, destination.Id(), err)
}
}
}
for topic := range dstTopics {
if staleTopics.Contains(topic) {
destinationId, mode, _, _ := ParseTopicId(topic)
tm.Infof("Removing consumer for stale topic: %s", topic, destinationId)
switch mode {
case "stream":
tm.streamConsumers[destinationId] = ExcludeConsumerForTopic(tm.streamConsumers[destinationId], topic, tm.cron)
case "batch":
tm.batchConsumers[destinationId] = ExcludeConsumerForTopic(tm.batchConsumers[destinationId], topic, tm.cron)
case retryTopicMode:
tm.retryConsumers[destinationId] = ExcludeConsumerForTopic(tm.retryConsumers[destinationId], topic, tm.cron)
}
}
}
if destination.config.Special == "backup" || destination.config.Special == "metrics" {
// create predefined tables for special kind of destinations: backup and metrics
tables := []string{destination.config.Special}
@@ -279,6 +327,7 @@ func (tm *TopicManager) processMetadata(metadata *kafka.Metadata) {
}
}
tm.allTopics = allTopics
tm.staleTopics = staleTopics
err := tm.ensureTopic(tm.config.KafkaDestinationsTopicName, tm.config.KafkaDestinationsTopicPartitions,
map[string]string{
"retention.ms": fmt.Sprint(tm.config.KafkaTopicRetentionHours * 60 * 60 * 1000),
@@ -336,11 +385,30 @@ func (tm *TopicManager) processMetadata(metadata *kafka.Metadata) {
metrics.TopicManagerDestinations(mode, "error").Set(count)
}
metrics.TopicManagerAbandonedTopics.Set(abandonedTopicsCount)
metrics.TopicManagerOtherTopics.Set(otherTopicsCount)
metrics.TopicManagerAllTopics.Set(float64(allTopics.Size()))
metrics.TopicManagerStaleTopics.Set(float64(staleTopics.Size()))
tm.Debugf("[topic-manager] Refreshed metadata in %v", time.Since(start))
tm.ready = true
}

func ExcludeConsumerForTopic[T Consumer](consumers []T, topicId string, cron *Cron) []T {
newConsumers := make([]T, 0, len(consumers))
for _, consumer := range consumers {
if consumer.TopicId() != topicId {
newConsumers = append(newConsumers, consumer)
} else {
// if the consumer is also a BatchConsumer, remove it from the cron scheduler before retiring
batchConsumer, ok := any(consumer).(BatchConsumer)
if ok {
cron.RemoveBatchConsumer(batchConsumer)
}
consumer.Retire()
}
}
return newConsumers
}
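
ExcludeConsumerForTopic is generic over the Consumer constraint, so the one helper serves both []BatchConsumer and []StreamConsumer slices. Because a value of a type parameter cannot be type-asserted directly, it is first converted to any; the assertion then detects batch consumers that must also be deregistered from cron. A toy illustration of that conversion trick (the interfaces and type here are stand-ins, not bulker's):

package main

import "fmt"

type Consumer interface{ Retire() }

type BatchConsumer interface {
	Consumer
	RunJob()
}

type batch struct{}

func (batch) Retire() {}
func (batch) RunJob() {}

// describe asserts through any(...) because c's static type is just T.
func describe[T Consumer](c T) {
	if _, ok := any(c).(BatchConsumer); ok {
		fmt.Println("batch consumer: deregister from cron, then retire")
	} else {
		fmt.Println("plain consumer: just retire")
	}
}

func main() {
	describe(batch{}) // prints the batch branch
}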

func (tm *TopicManager) changeListener(changes RepositoryChange) {
for _, changedDst := range changes.ChangedDestinations {
tm.Lock()
@@ -376,34 +444,30 @@ func (tm *TopicManager) changeListener(changes RepositoryChange) {
err := consumer.UpdateDestination(changedDst)
if err != nil {
metrics.TopicManagerError("update_stream_consumer_error").Inc()
tm.SystemErrorf("Failed to re-create consumer for destination topic: %s: %v", consumer.topicId, err)
tm.SystemErrorf("Failed to re-create consumer for destination topic: %s: %v", consumer.TopicId(), err)
continue
}
}
tm.Unlock()
}
for _, deletedDstId := range changes.RemovedDestinationIds {
tm.Lock()
for _, consumer := range tm.batchConsumers[deletedDstId] {
tm.Lock()
tm.cron.RemoveBatchConsumer(consumer)
consumer.Retire()
delete(tm.batchConsumers, deletedDstId)
tm.Unlock()
}
for _, consumer := range tm.retryConsumers[deletedDstId] {
tm.Lock()
tm.cron.RemoveBatchConsumer(consumer)
consumer.Retire()
delete(tm.retryConsumers, deletedDstId)
tm.Unlock()
}
for _, consumer := range tm.streamConsumers[deletedDstId] {
tm.Lock()
_ = consumer.Close()
consumer.Retire()
delete(tm.streamConsumers, deletedDstId)
tm.Unlock()
}
delete(tm.consumedTopics, deletedDstId)
tm.Unlock()
}
if len(changes.AddedDestinations) > 0 {
tm.Lock()
@@ -443,8 +507,7 @@ func (tm *TopicManager) IsReady() bool {
func (tm *TopicManager) EnsureDestinationTopic(destination *Destination, topicId string) error {
tm.Lock()
defer tm.Unlock()
set := tm.consumedTopics[destination.Id()]
if !set.Contains(topicId) {
if !tm.allTopics.Contains(topicId) {
return tm.createDestinationTopic(topicId, nil)
}
return nil
@@ -600,7 +663,7 @@ func (tm *TopicManager) Close() error {
}
for _, consumers := range tm.streamConsumers {
for _, consumer := range consumers {
consumer.Close()
consumer.Retire()
}
}
return nil
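
Alongside the stale-topic work, changeListener's teardown for removed destinations was tightened: the lock is now taken once per removed destination around retiring all of its consumers, with the map entries deleted once at the end, rather than re-acquiring the lock and re-deleting the entry on every loop iteration. A sketch of the corrected shape with stand-in types:

package sketch

import "sync"

type consumer interface{ Retire() }

type manager struct {
	mu              sync.Mutex
	batchConsumers  map[string][]consumer
	streamConsumers map[string][]consumer
	consumedTopics  map[string]map[string]struct{}
}

// removeDestination holds the lock once, retires everything, then
// deletes the bookkeeping entries exactly once.
func (m *manager) removeDestination(id string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	for _, c := range m.batchConsumers[id] {
		c.Retire() // in bulker, batch consumers are also removed from cron here
	}
	for _, c := range m.streamConsumers[id] {
		c.Retire()
	}
	delete(m.batchConsumers, id)
	delete(m.streamConsumers, id)
	delete(m.consumedTopics, id)
}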