Skip to content

Commit

Permalink
fix: start consuming based on from offset option (#3750)
Browse files Browse the repository at this point in the history
  • Loading branch information
matt2e authored Dec 16, 2024
1 parent 94497d6 commit a57500b
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 13 deletions.
7 changes: 4 additions & 3 deletions backend/runner/pubsub/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,16 +75,17 @@ func (c *consumer) Begin(ctx context.Context) error {
// set up config
config := sarama.NewConfig()
config.Consumer.Return.Errors = true
config.Consumer.Offsets.AutoCommit.Enable = true
switch c.subscriber.FromOffset {
case schema.FromOffsetBeginning, schema.FromOffsetUnspecified:
config.Consumer.Offsets.Initial = sarama.OffsetOldest
case schema.FromOffsetLatest:
config.Consumer.Offsets.Initial = sarama.OffsetNewest
}
config.Consumer.Offsets.Initial = sarama.OffsetOldest
config.Consumer.Offsets.AutoCommit.Enable = true
groupID := kafkaConsumerGroupID(c.moduleName, c.verb)
log.FromContext(ctx).Infof("Subscribing to topic %s for %s with offset %v", c.kafkaTopicID(), groupID, config.Consumer.Offsets.Initial)

group, err := sarama.NewConsumerGroup(c.verb.Runtime.Subscription.KafkaBrokers, kafkaConsumerGroupID(c.moduleName, c.verb), config)
group, err := sarama.NewConsumerGroup(c.verb.Runtime.Subscription.KafkaBrokers, groupID, config)
if err != nil {
return fmt.Errorf("failed to create consumer group for subscription %s: %w", c.verb.Name, err)
}
Expand Down
15 changes: 10 additions & 5 deletions backend/runner/pubsub/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,20 @@ func TestPubSub(t *testing.T) {
in.CopyModule("publisher"),
in.CopyModule("subscriber"),
in.Deploy("publisher"),

// publish half the events before subscriber is deployed
in.Repeat(calls/2, in.Call("publisher", "publishTen", in.Obj{}, func(t testing.TB, resp in.Obj) {})),

in.Deploy("subscriber"),

// publish events
in.Repeat(calls, in.Call("publisher", "publishTen", in.Obj{}, func(t testing.TB, resp in.Obj) {})),
// publish the other half of the events after subscriber is deployed
in.Repeat(calls/2, in.Call("publisher", "publishTen", in.Obj{}, func(t testing.TB, resp in.Obj) {})),

in.Sleep(time.Second*4),

// check that there are the right amount of successful async calls
// check that there are the right amount of consumed events, depending on "from" offset option
checkConsumed("subscriber", "consume", true, events, optional.None[string]()),
checkConsumed("subscriber", "consumeFromLatest", true, events/2, optional.None[string]()),
)
}

Expand Down Expand Up @@ -75,9 +80,9 @@ func TestExternalPublishRuntimeCheck(t *testing.T) {
func checkConsumed(module, verb string, success bool, count int, needle optional.Option[string]) in.Action {
return func(t testing.TB, ic in.TestContext) {
if needle, ok := needle.Get(); ok {
in.Infof("Checking for %v call(s) with needle %v", count, needle)
in.Infof("Checking for %v call(s) to %s.%s with needle %v", count, module, verb, needle)
} else {
in.Infof("Checking for %v call(s)", count)
in.Infof("Checking for %v call(s) to %s.%s", count, module, verb)
}
resp, err := ic.Timeline.GetTimeline(ic.Context, connect.NewRequest(&timelinepb.GetTimelineRequest{
Limit: 100000,
Expand Down
10 changes: 9 additions & 1 deletion backend/runner/pubsub/testdata/go/publisher/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,16 @@ import (
// Import the FTL SDK.
)

type PartitionMapper struct{}

var _ ftl.TopicPartitionMap[PubSubEvent] = PartitionMapper{}

func (PartitionMapper) PartitionKey(event PubSubEvent) string {
return event.Time.String()
}

//ftl:export
type TestTopic = ftl.TopicHandle[PubSubEvent, ftl.SinglePartitionMap[PubSubEvent]]
type TestTopic = ftl.TopicHandle[PubSubEvent, PartitionMapper]

type PubSubEvent struct {
Time time.Time
Expand Down
9 changes: 8 additions & 1 deletion backend/runner/pubsub/testdata/go/subscriber/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,14 @@ import (
//ftl:verb
//ftl:subscribe publisher.testTopic from=beginning
func Consume(ctx context.Context, req publisher.PubSubEvent) error {
ftl.LoggerFromContext(ctx).Infof("Subscriber is consuming %v", req.Time)
ftl.LoggerFromContext(ctx).Infof("Consume: %v", req.Time)
return nil
}

//ftl:verb
//ftl:subscribe publisher.testTopic from=latest
func ConsumeFromLatest(ctx context.Context, req publisher.PubSubEvent) error {
ftl.LoggerFromContext(ctx).Infof("ConsumeFromLatest: %v", req.Time)
return nil
}

Expand Down
5 changes: 5 additions & 0 deletions backend/runner/pubsub/testdata/go/subscriber/types.ftl.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,16 @@

public class Subscriber {

private static final AtomicInteger catchCount = new AtomicInteger();

@Subscription(topic = TestTopicTopic.class, from = FromOffset.BEGINNING)
void consume(PubSubEvent event) throws Exception {
Log.infof("Subscriber is consuming %s", event.getTime());
}

@Subscription(topic = Topic2Topic.class, from = FromOffset.BEGINNING)
@Subscription(topic = TestTopicTopic.class, from = FromOffset.LATEST)
void consumeFromLatest(PubSubEvent event) throws Exception {
}

ConsumeFromLatest
@Retry(count = 2, minBackoff = "1s", maxBackoff = "1s")
public void consumeButFailAndRetry(PubSubEvent event) {
throw new RuntimeException("always error: event " + event.getTime());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ class Subscriber {
Log.infof("Subscriber is consuming %s", event.time)
}

@Subscription(topic = TestTopicTopic::class, from = FromOffset.LATEST)
fun consumeFromLatest(event: PubSubEvent) {
Log.infof("Subscriber is consuming %s", event.time)
}

@Subscription(topic = Topic2Topic::class, from = FromOffset.BEGINNING)
@Retry(count = 2, minBackoff = "1s", maxBackoff = "1s", catchVerb = "catch")
fun consumeButFailAndRetry(event: PubSubEvent) {
Expand Down

0 comments on commit a57500b

Please sign in to comment.