diff --git a/backend/runner/pubsub/consumer.go b/backend/runner/pubsub/consumer.go index fa4454c9f2..9c68d24b4e 100644 --- a/backend/runner/pubsub/consumer.go +++ b/backend/runner/pubsub/consumer.go @@ -75,16 +75,17 @@ func (c *consumer) Begin(ctx context.Context) error { // set up config config := sarama.NewConfig() config.Consumer.Return.Errors = true + config.Consumer.Offsets.AutoCommit.Enable = true switch c.subscriber.FromOffset { case schema.FromOffsetBeginning, schema.FromOffsetUnspecified: config.Consumer.Offsets.Initial = sarama.OffsetOldest case schema.FromOffsetLatest: config.Consumer.Offsets.Initial = sarama.OffsetNewest } - config.Consumer.Offsets.Initial = sarama.OffsetOldest - config.Consumer.Offsets.AutoCommit.Enable = true + groupID := kafkaConsumerGroupID(c.moduleName, c.verb) + log.FromContext(ctx).Infof("Subscribing to topic %s for %s with offset %v", c.kafkaTopicID(), groupID, config.Consumer.Offsets.Initial) - group, err := sarama.NewConsumerGroup(c.verb.Runtime.Subscription.KafkaBrokers, kafkaConsumerGroupID(c.moduleName, c.verb), config) + group, err := sarama.NewConsumerGroup(c.verb.Runtime.Subscription.KafkaBrokers, groupID, config) if err != nil { return fmt.Errorf("failed to create consumer group for subscription %s: %w", c.verb.Name, err) } diff --git a/backend/runner/pubsub/integration_test.go b/backend/runner/pubsub/integration_test.go index 511bafa2f0..a7bfdaf954 100644 --- a/backend/runner/pubsub/integration_test.go +++ b/backend/runner/pubsub/integration_test.go @@ -25,15 +25,20 @@ func TestPubSub(t *testing.T) { in.CopyModule("publisher"), in.CopyModule("subscriber"), in.Deploy("publisher"), + + // publish half the events before subscriber is deployed + in.Repeat(calls/2, in.Call("publisher", "publishTen", in.Obj{}, func(t testing.TB, resp in.Obj) {})), + in.Deploy("subscriber"), - // publish events - in.Repeat(calls, in.Call("publisher", "publishTen", in.Obj{}, func(t testing.TB, resp in.Obj) {})), + // publish the other half of the events after subscriber is deployed + in.Repeat(calls/2, in.Call("publisher", "publishTen", in.Obj{}, func(t testing.TB, resp in.Obj) {})), in.Sleep(time.Second*4), - // check that there are the right amount of successful async calls + // check that there are the right amount of consumed events, depending on "from" offset option checkConsumed("subscriber", "consume", true, events, optional.None[string]()), + checkConsumed("subscriber", "consumeFromLatest", true, events/2, optional.None[string]()), ) } @@ -75,9 +80,9 @@ func TestExternalPublishRuntimeCheck(t *testing.T) { func checkConsumed(module, verb string, success bool, count int, needle optional.Option[string]) in.Action { return func(t testing.TB, ic in.TestContext) { if needle, ok := needle.Get(); ok { - in.Infof("Checking for %v call(s) with needle %v", count, needle) + in.Infof("Checking for %v call(s) to %s.%s with needle %v", count, module, verb, needle) } else { - in.Infof("Checking for %v call(s)", count) + in.Infof("Checking for %v call(s) to %s.%s", count, module, verb) } resp, err := ic.Timeline.GetTimeline(ic.Context, connect.NewRequest(&timelinepb.GetTimelineRequest{ Limit: 100000, diff --git a/backend/runner/pubsub/testdata/go/publisher/publisher.go b/backend/runner/pubsub/testdata/go/publisher/publisher.go index ceb90f14c1..1512ad590d 100644 --- a/backend/runner/pubsub/testdata/go/publisher/publisher.go +++ b/backend/runner/pubsub/testdata/go/publisher/publisher.go @@ -8,8 +8,16 @@ import ( // Import the FTL SDK. ) +type PartitionMapper struct{} + +var _ ftl.TopicPartitionMap[PubSubEvent] = PartitionMapper{} + +func (PartitionMapper) PartitionKey(event PubSubEvent) string { + return event.Time.String() +} + //ftl:export -type TestTopic = ftl.TopicHandle[PubSubEvent, ftl.SinglePartitionMap[PubSubEvent]] +type TestTopic = ftl.TopicHandle[PubSubEvent, PartitionMapper] type PubSubEvent struct { Time time.Time diff --git a/backend/runner/pubsub/testdata/go/subscriber/subscriber.go b/backend/runner/pubsub/testdata/go/subscriber/subscriber.go index 59e29ebbb3..208acf47d7 100644 --- a/backend/runner/pubsub/testdata/go/subscriber/subscriber.go +++ b/backend/runner/pubsub/testdata/go/subscriber/subscriber.go @@ -14,7 +14,14 @@ import ( //ftl:verb //ftl:subscribe publisher.testTopic from=beginning func Consume(ctx context.Context, req publisher.PubSubEvent) error { - ftl.LoggerFromContext(ctx).Infof("Subscriber is consuming %v", req.Time) + ftl.LoggerFromContext(ctx).Infof("Consume: %v", req.Time) + return nil +} + +//ftl:verb +//ftl:subscribe publisher.testTopic from=latest +func ConsumeFromLatest(ctx context.Context, req publisher.PubSubEvent) error { + ftl.LoggerFromContext(ctx).Infof("ConsumeFromLatest: %v", req.Time) return nil } diff --git a/backend/runner/pubsub/testdata/go/subscriber/types.ftl.go b/backend/runner/pubsub/testdata/go/subscriber/types.ftl.go index 1959cc72b5..47a3957bbf 100644 --- a/backend/runner/pubsub/testdata/go/subscriber/types.ftl.go +++ b/backend/runner/pubsub/testdata/go/subscriber/types.ftl.go @@ -11,6 +11,8 @@ type ConsumeClient func(context.Context, ftlpublisher.PubSubEvent) error type ConsumeButFailAndRetryClient func(context.Context, ftlpublisher.PubSubEvent) error +type ConsumeFromLatestClient func(context.Context, ftlpublisher.PubSubEvent) error + type PublishToExternalModuleClient func(context.Context) error func init() { @@ -21,6 +23,9 @@ func init() { reflection.ProvideResourcesForVerb( ConsumeButFailAndRetry, ), + reflection.ProvideResourcesForVerb( + ConsumeFromLatest, + ), reflection.ProvideResourcesForVerb( PublishToExternalModule, ), diff --git a/backend/runner/pubsub/testdata/java/subscriber/src/main/java/xyz/block/ftl/java/test/subscriber/Subscriber.java b/backend/runner/pubsub/testdata/java/subscriber/src/main/java/xyz/block/ftl/java/test/subscriber/Subscriber.java index c7f35df6f7..8eea7135e1 100644 --- a/backend/runner/pubsub/testdata/java/subscriber/src/main/java/xyz/block/ftl/java/test/subscriber/Subscriber.java +++ b/backend/runner/pubsub/testdata/java/subscriber/src/main/java/xyz/block/ftl/java/test/subscriber/Subscriber.java @@ -12,14 +12,16 @@ public class Subscriber { - private static final AtomicInteger catchCount = new AtomicInteger(); - @Subscription(topic = TestTopicTopic.class, from = FromOffset.BEGINNING) void consume(PubSubEvent event) throws Exception { Log.infof("Subscriber is consuming %s", event.getTime()); } - @Subscription(topic = Topic2Topic.class, from = FromOffset.BEGINNING) + @Subscription(topic = TestTopicTopic.class, from = FromOffset.LATEST) + void consumeFromLatest(PubSubEvent event) throws Exception { + } + + ConsumeFromLatest @Retry(count = 2, minBackoff = "1s", maxBackoff = "1s") public void consumeButFailAndRetry(PubSubEvent event) { throw new RuntimeException("always error: event " + event.getTime()); diff --git a/backend/runner/pubsub/testdata/kotlin/subscriber/src/main/kotlin/xyz/block/ftl/java/test/subscriber/Subscriber.kt b/backend/runner/pubsub/testdata/kotlin/subscriber/src/main/kotlin/xyz/block/ftl/java/test/subscriber/Subscriber.kt index 09aa03fe31..b10f8fbaef 100644 --- a/backend/runner/pubsub/testdata/kotlin/subscriber/src/main/kotlin/xyz/block/ftl/java/test/subscriber/Subscriber.kt +++ b/backend/runner/pubsub/testdata/kotlin/subscriber/src/main/kotlin/xyz/block/ftl/java/test/subscriber/Subscriber.kt @@ -17,6 +17,11 @@ class Subscriber { Log.infof("Subscriber is consuming %s", event.time) } + @Subscription(topic = TestTopicTopic::class, from = FromOffset.LATEST) + fun consumeFromLatest(event: PubSubEvent) { + Log.infof("Subscriber is consuming %s", event.time) + } + @Subscription(topic = Topic2Topic::class, from = FromOffset.BEGINNING) @Retry(count = 2, minBackoff = "1s", maxBackoff = "1s", catchVerb = "catch") fun consumeButFailAndRetry(event: PubSubEvent) {