Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable the consumer to commit an offset with metadata #1067

Merged
merged 7 commits into from
Oct 27, 2023

Conversation

flavienbert
Copy link
Contributor

@flavienbert flavienbert commented Oct 4, 2023

Fixes #1066

@erikvanoosten
Copy link
Collaborator

@flavienbert I have in my plans to merge commits into a single commit. Is that still possible when there is metadata attached to the commit?

@flavienbert
Copy link
Contributor Author

@erikvanoosten yes I think. The apache kafka consumer client take an OffsetAndMetadata so it should be OK.

@erikvanoosten
Copy link
Collaborator

erikvanoosten commented Oct 4, 2023

@erikvanoosten yes I think. The apache kafka consumer client take an OffsetAndMetadata so it should be OK.

Ah I see. When we merge commits, per partition we keep the OffsetAndMetadata with the highest offset. It means we'll loose the metadata of earlier commits, but that is actually okay; those commits are no longer needed because they have been superseded by a newer one.

I'll study this PR a bit more (especially for breaking changes) but so far it looks good to me.

@flavienbert
Copy link
Contributor Author

flavienbert commented Oct 4, 2023

One point not handle by this PR: the metadata from the CommittableRecord.offset will be always empty:

This PR only able to publish metadata from consumer stream, not retrieve it when you consume a stream. Is it something you want to add too @erikvanoosten?

I don't really know how to handle it because the ConsumerRecord from org.apache.kafka.clients.consumer doesn't expose the metadata :

Creates a record to be received from a specified topic and partition (provided for compatibility with Kafka 0.9 before the message format supported timestamps and before serialized metadata were exposed).

I believe this feature is not possible. It seems we can only get metadata with the Admin client

@erikvanoosten
Copy link
Collaborator

erikvanoosten commented Oct 4, 2023

This PR only able to publish metadata from consumer stream, not retrieve it when you consume a stream. Is it something you want to add too @erikvanoosten?

We prefer smaller PRs. So another issue/pr can be created for this.

@flavienbert
Copy link
Contributor Author

flavienbert commented Oct 4, 2023

As I mentioned in the previous comment, I think it's not possible to retrieve the metadata from a ZStream with the Consumer because the API ConsumerRecord doesn't expose the metadata.

@erikvanoosten
Copy link
Collaborator

As I mentioned in the previous comment, I think it's not possible to retrieve the metadata from a ZStream with the Consumer because the API ConsumerRecord doesn't expose the metadata.

Makes sense, the existence of records and commits are unrelated.

@flavienbert
Copy link
Contributor Author

flavienbert commented Oct 4, 2023

Maybe we should put this method private:
private[consumer] def metadata: Option[String] in the Offset trait ?
To make sure a user doesn't try to get the metadata of an offset from the ConsumerRecord. Because It will always return None anyway otherwise.

@erikvanoosten
Copy link
Collaborator

Maybe we should put this method private: private[consumer] def metadata: Option[String] in the Offset trait ? To make sure a user doesn't try to get the metadata of an offset from the ConsumerRecord. Because It will always return None anyway otherwise.

Are you referring to a CommittableRecord?

I don't like that because it will mean that you can set the metadata but you can't read it back.

Copy link
Collaborator

@erikvanoosten erikvanoosten left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not done yet, but I wanted to submit these comments already.

@@ -349,6 +349,30 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
.provideSomeLayer[Kafka](consumer(client, Some(group)))
} yield assert(offsets.values.map(_.map(_.offset)))(forall(isSome(equalTo(nrMessages.toLong / nrPartitions))))
},
test("commits an offset with metadata") {
for {
// Produce messages on several partitions
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment is not correct. Please remove it.

@@ -125,7 +125,9 @@ private[consumer] final class Runloop private (
} yield ()

private def doCommit(cmd: RunloopCommand.Commit): UIO[Unit] = {
val offsets = cmd.offsets.map { case (tp, offset) => tp -> new OffsetAndMetadata(offset + 1) }
val offsets = cmd.offsets.map { case (tp, offset) =>
tp -> new OffsetAndMetadata(offset.offset + 1, offset.metadata)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be on the safe side, we should retain the leader epoch as well (even though we don't support it at the moment).
Also, I keep thinking that we could hide this increaseOffsetByOne operation to a class extension of OffsetAndMetadata. WDYT?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or perhaps we can move the +1 operation to where the OffsetAndMetadata is created... (I guess inside asJavaOffsetAndMetadata)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I kind of disagree with you @erikvanoosten here. See #1067 (comment)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@guizmaii I have no strong opinion on this yet, I was just exploring possibilities. Sorry for that @flavienbert, this aspect is totally not relevant for this PR. If we want this change (probably not) we can do it in another PR.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@flavienbert Please don't forget what this thread started with:

To be on the safe side, we should retain the leader epoch as well (even though we don't support it at the moment).

@flavienbert
Copy link
Contributor Author

yes I am referring to it:

It's true you can't read it back but you shouldn't need to read it. The purpose is to publish an offset with metadata.

    Consumer
      .plainStream(Subscription.topics("random"), Serde.long, Serde.string)
      .tap(r => Console.printLine(r.value))
      .map(_.offset)
      .aggregateAsync(Consumer.offsetBatches)
      .mapZIO(_.commit)
      .drain

In this code for sample a user can believe he can retrieve the metadata offset of the CommittableRecord using _.offset.metadata but in fact it will always return None. We should only able to use _.offset.withMetadata("metadata") from my point of view

@erikvanoosten
Copy link
Collaborator

Okay, I am changing my mind 🙂 The offset method from CommitableRecord should indeed not have a metadata field because that doesn't make sense. Unfortunately, that also means that the withMetadata method (the basis of this PR) doesn't make sense. (Off topic, there is more in Offset that doesn't make sense. In particular, there shouldn't be a commit method in it. @guizmaii is trying to change this part of zio-kafka for a good reason.)

What about adding methods commitWithMetadata and commitOrRetryWithMetadata?

@flavienbert
Copy link
Contributor Author

Agree with you commit method shouldn't be here. I am not sure commitWithMetadata could work because we usually use commit after .aggregateAsync(Consumer.offsetBatches) this means we call commit on the most recent offset.

But in my case I need to set the metadata for each offset, and then the most recent offset is commit with the metadata set earlier for sample:

     ...
     offsetOpt  <- offsetState match {
                                 case Committable =>
                                    ZIO.some(record.offset)

                                 case CommittableWithMetadata =>
                                   StorageService
                                       .put(state, record.offset.topic, record.offset.partition)
                                       .map(versionId => Some(record.offset.withMetadata(versionId)))

                                 case NotCommittable => ZIO.none
                               }
        } yield offsetOpt
      }
      .map(_.toSeq)
      .flattenIterables
      .aggregateAsync(Consumer.offsetBatches)
      .mapZIO(_.commit)

And what about having this model:

sealed trait Offset {

  type Self <: Offset

  def topic: String
  def partition: Int
  def offset: Long
  def consumerGroupMetadata: Option[ConsumerGroupMetadata]

  /**
   * Converts this instance of Offset to a CommittableOffset (increment the offset)
   */
  def toCommittableOffset: CommittableOffset

  final lazy val topicPartition: TopicPartition = new TopicPartition(topic, partition)
}

sealed trait CommittableOffset extends Offset {

  type Self <: CommittableOffset

  def commit: Task[Unit]
  def batch: OffsetBatch
  def addMetadata(metadata: String): Self
  def metadata: Option[String]

  private[consumer] def asJavaOffsetAndMetadata: OffsetAndMetadata = new OffsetAndMetadata(offset, metadata.orNull)

  /**
   * Attempts to commit and retries according to the given policy when the commit fails with a
   * RetriableCommitFailedException
   */
  final def commitOrRetry[R](policy: Schedule[R, Throwable, Any]): RIO[R, Unit] =
    Offset.commitOrRetry(commit, policy)
}

With this model, we could use Offset trait for the offset get from the CommitableRecord and then if you want to commit or add metadata you should convert this Offset to a CommitableOffset which will be able to commit. And we could introduce an implicit conversion from Offset to CommitableOffset to avoid breaking changes.

@erikvanoosten
Copy link
Collaborator

Okay, I understand your use case,
And I like your thinking. CommittableOffset is indeed a reasonable way to model this. This is a breaking change though, even with an implicit conversion it will not be binary compatible (we don't really have that as a goal so it could still be fine).
Since not many people use metadata for commits, IMHO source compatibility is required.

@guizmaii WDYT?

@flavienbert
Copy link
Contributor Author

I published a version with CommittableOffset. To be honest I like this new version. I think it's much more understandable. There is different models between offsets get from the consumer and offsets that you want to commit.

However, there is one breaking change: you have to explicitly convert ReadOnlyOffset to CommitableOffsetin order to commit offsets: offset.nextCommittableOffset.commit. But I prefer explicit conversion this way users can understand how it works internally.

@erikvanoosten let me know your point of view.

@flavienbert
Copy link
Contributor Author

@erikvanoosten any updates? Could be great to have the metadata commitment feature merged.

@erikvanoosten
Copy link
Collaborator

@erikvanoosten any updates? Could be great to have the metadata commitment feature merged.

I have asked @svroonland and @guizmaii to also take a look at this PR.

Since we want to change this API in near future, and changing this many times is not nice for our users, for me source compatibility would be essential. Perhaps the other committers see this differently...

newOffsets ++= offsets
otherOffsets.offsets.foreach { case (tp, offset) =>
val existing = offsets.getOrElse(tp, -1L)
if (existing < offset)
val existing = offsets.getOrElse(tp, new OffsetAndMetadata(-1L))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We probably don't want to instantiate more than one OffsetAndMetadata(-1L)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator

@erikvanoosten erikvanoosten Oct 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure this is important. Using a constant means less memory locality (making the L2 useless) while creating a new instance prevents inter-cpu coordination.
Instead I would propose this logic is rewritten such that creating a dummy OffsetAndMetadata is not even necessary.

Copy link
Member

@guizmaii guizmaii left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See comments

I think there is still some refinement work needed to make this PR mergeable

@flavienbert flavienbert reopened this Oct 12, 2023
@flavienbert
Copy link
Contributor Author

I rebased my branch, But the CI fail for sbt ciGenerateGithubWorkflow but it seems to be good

.github/workflows/ci.yml Outdated Show resolved Hide resolved
Copy link
Collaborator

@erikvanoosten erikvanoosten left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some small suggestions. Otherwise LGTM.

Copy link
Member

@guizmaii guizmaii left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@guizmaii
Copy link
Member

@erikvanoosten You merge once the CI pass :)

@guizmaii
Copy link
Member

@flavienbert Thanks for your work :)

@erikvanoosten
Copy link
Collaborator

@flavienbert Could you run sbt fmt please?

@erikvanoosten erikvanoosten merged commit 35fdcc9 into zio:master Oct 27, 2023
13 checks passed
@erikvanoosten
Copy link
Collaborator

Thanks for you patience @flavienbert ! This PR is now available in https://github.com/zio/zio-kafka/releases/tag/v2.6.0 🥳

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Consumer should be able to commit offset with metadata
4 participants