Skip to content

Commit

Permalink
bulker: disable consumers for stale topics
Browse files Browse the repository at this point in the history
  • Loading branch information
absorbb committed Jan 15, 2024
1 parent dc7a847 commit 3a409df
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 2 deletions.
4 changes: 3 additions & 1 deletion bulkerapp/app/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ func initApp(t *testing.T, envVars map[string]string) (app *appbase.App[Config],
// Test streams in autocommit and bath mode. Both with good batches and batches with primary key violation error
func TestGoodAndBadStreams(t *testing.T) {
app, kafkaContainer, postgresContainer := initApp(t, map[string]string{"BULKER_MESSAGES_RETRY_COUNT": "0",
"BULKER_BATCH_RUNNER_DEFAULT_PERIOD_SEC": "1"})
"BULKER_TOPIC_MANAGER_REFRESH_PERIOD_SEC": "1",
"BULKER_BATCH_RUNNER_DEFAULT_PERIOD_SEC": "1"})
t.Cleanup(func() {
app.Exit(appbase.SIG_SHUTDOWN_FOR_TESTS)
time.Sleep(5 * time.Second)
Expand Down Expand Up @@ -183,6 +184,7 @@ func TestEventsRetry(t *testing.T) {
"BULKER_BATCH_RUNNER_DEFAULT_RETRY_PERIOD_SEC": "5",
"BULKER_BATCH_RUNNER_DEFAULT_RETRY_BATCH_FRACTION": "1",
"BULKER_MESSAGES_RETRY_BACKOFF_BASE": "0",
"BULKER_TOPIC_MANAGER_REFRESH_PERIOD_SEC": "1",
"BULKER_BATCH_RUNNER_DEFAULT_PERIOD_SEC": "1"})
t.Cleanup(func() {
app.Exit(appbase.SIG_SHUTDOWN_FOR_TESTS)
Expand Down
2 changes: 1 addition & 1 deletion bulkerapp/app/topic_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func (tm *TopicManager) LoadMetadata() {
tm.Errorf("Error getting topic offsets: %v", err)
} else {
for tp, offset := range res.ResultInfos {
if offset.Offset > 0 {
if offset.Offset >= 0 {
topicsLastMessageDates[*tp.Topic] = time.UnixMilli(offset.Timestamp)
} else {
topicsLastMessageDates[*tp.Topic] = time.Time{}
Expand Down

0 comments on commit 3a409df

Please sign in to comment.