Skip to content

Commit

Permalink
fix: use db clock for delaying event consumption (#1727)
Browse files Browse the repository at this point in the history
We have a small delay between when a topic event is published and when
we allow subscriptions to consume it to guarantee no new events will be
inserted in-between.

Previously this check was done by the controller, comparing the db row's
created_at with the controller's clock.
It seems there are cases where the controller thinks that enough time
has passed, but not according to the db clock.
This PR moves these checks to the db layer. This should make our
integration test more reliable as it also relies on comparing two
timestamps generated by the db clock.

Example flaky integration test:
https://github.com/TBD54566975/ftl/actions/runs/9432792904/job/25983007930#step:8:1
  • Loading branch information
matt2e authored Jun 11, 2024
1 parent 4ac5ecc commit ea750d0
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 10 deletions.
4 changes: 2 additions & 2 deletions backend/controller/dal/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,15 @@ func (d *DAL) ProgressSubscriptions(ctx context.Context, eventConsumptionDelay t

successful := 0
for _, subscription := range subs {
nextCursor, err := tx.db.GetNextEventForSubscription(ctx, subscription.Topic, subscription.Cursor)
nextCursor, err := tx.db.GetNextEventForSubscription(ctx, eventConsumptionDelay, subscription.Topic, subscription.Cursor)
if err != nil {
return 0, fmt.Errorf("failed to get next cursor: %w", translatePGError(err))
}
nextCursorKey, ok := nextCursor.Event.Get()
if !ok {
return 0, fmt.Errorf("could not find event to progress subscription: %w", translatePGError(err))
}
if nextCreatedAt, ok := nextCursor.CreatedAt.Get(); ok && nextCreatedAt.Add(eventConsumptionDelay).After(time.Now()) {
if !nextCursor.Ready {
logger.Tracef("Skipping subscription %s because event is too new", subscription.Key)
continue
}
Expand Down
2 changes: 1 addition & 1 deletion backend/controller/sql/querier.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion backend/controller/sql/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -717,7 +717,8 @@ WITH cursor AS (
)
SELECT events."key" as event,
events.payload,
events.created_at
events.created_at,
NOW() - events.created_at >= sqlc.arg('consumption_delay')::interval AS ready
FROM topics
LEFT JOIN topic_events as events ON events.topic_id = topics.id
WHERE topics.key = sqlc.arg('topic')::topic_key
Expand Down
19 changes: 13 additions & 6 deletions backend/controller/sql/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit ea750d0

Please sign in to comment.