Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement 'produce' aliases in the Producer trait, making producer co… #1048

Closed
wants to merge 4 commits into from

Conversation

soujiro32167
Copy link

@soujiro32167 soujiro32167 commented Sep 12, 2023

Implement 'produce' aliases in the Producer trait, making producer composition easier

@CLAassistant
Copy link

CLAassistant commented Sep 12, 2023

CLA assistant check
All committers have signed the CLA.

Copy link
Member

@guizmaii guizmaii left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does this make composition easier?

@soujiro32167
Copy link
Author

Consider a simple producer:

trait Producer:
  def produce[K, V](record: ProducerRecord[K, V]): UIO[UIO[Unit]]

I want to create a producer that, for each record, applies a function that modifies the record, and may tack on an effect, like logging each send call:

trait Producer:
  def produce[K, V](record: ProducerRecord[K, V]): UIO[UIO[Unit]]
      
     
def modifyHeadersAndLoggingProducer(producer: Producer): Producer = 
   new Producer:
      def produce[K, V](record: ProducerRecord[K, V]): UIO[UIO[Unit]] = 
           producer.produce(record.addHeader("foo", "bar"))
                .map(_ *> ZIO.logDebug("ack received")) *> ZIO.logDebug("record sent")
            )

Without this MR, to compose producers, you'd have to implement all the produce/async/serialize methods all over again.

Even without this use case, all the methods other than produceChunkAsyncWithFailures can be implemented in terms of it, so why leave them unimplemented?

@soujiro32167
Copy link
Author

@soujiro32167
Copy link
Author

@yisraelU fyi

Copy link
Collaborator

@erikvanoosten erikvanoosten left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like it. Thanks!

@guizmaii
Copy link
Member

guizmaii commented Sep 14, 2023

I don't think that's how you should do that

Here's how I'd do it:

package my.example

import zio.kafka.Producer as ZioProducer

trait MyProducer {
  def produce[K, V](record: ProducerRecord[K, V]): UIO[UIO[Unit]]
}

object MyProducer {
  val live: URLayer[ZioProducer, MyProducer] = 
    ZLayer {
      for {
        zioProducer <- ZIO.service[ZioProducer]
      } yield new MyProducerLive(zioProducer)
    } 
}

final class MyProducerLive(zioProducer: ZioProducer) extends MyProducer {
  override def produce[K, V](record: ProducerRecord[K, V]): UIO[UIO[Unit]] = {
    zioProducer
      .produce(record.addHeader("foo", "bar"))
      .map(_ *> ZIO.logDebug("ack received")) *> ZIO.logDebug("record sent"))
  }
}

You should prefer composition and should not depend on the interface that we're exposing.

@soujiro32167
Copy link
Author

soujiro32167 commented Sep 14, 2023

@guizmaii this way of composition makes sense, but has some shortcomings:

  1. What I'm after is a middleware, similar to what zio-http has.
  2. The end interface I'm after is the same interface as Producer. It may need produceAsync or produceChunkAsync and all their variations. That is because my end user doesn't care whether the producer is instrumented or not - they just need a Kafka producer
  3. Imagine integrating an instrumentation library into your code base. The consumer of a Producer would have to explicitly ask for MyProducer if they want an instrumented version, instead of handling instrumentation at the layer level

@guizmaii
Copy link
Member

guizmaii commented Sep 15, 2023

  1. About your point 2, here's how I'd do:
package my example

import zio.kafka.Producer as ZioProducer

// Because you extends the zio-kafka Producer, your implementation will have to implement all the zio-kafka Producer interface
trait MyProducer extends ZioProducer

object MyProducer {
  val live: URLayer[ZioProducer, ZioProducer] = 
    ZLayer {
      for {
        zioProducer <- ZIO.service[ZioProducer]
      } yield new MyProducerLive(zioProducer)
    } 
}

final class MyProducerLive(zioProducer: ZioProducer) extends MyProducer {
 
 // example of methods where you do something around the producing
  override def produce[K, V](record: ProducerRecord[K, V]): UIO[UIO[Unit]] =
    zioProducer
      .produce(record.addHeader("foo", "bar"))
      .map(_ *> ZIO.logDebug("ack received")) *> ZIO.logDebug("record sent"))
  
  // example of methods where you do nothing around the producing, just call the zioProducer method
  override def produceAsync[K, V](record: ProducerRecord[K, V]): UIO[UIO[Unit]] =
    zioProducer.produceAsync(record)
    
  ... // all the other methods of the interface 
}

If we merge your PR, you'll not be able to do this anymore or you'll have to copy the code you moved from the ProducerLive to the Producer trait into your code, which is not what you want/what you should have to care about as a zio-kafka user.
So the code will end up being less composable than it is today.

  1. About your point 3, here is how I implement manual instrumentation of my services:
package my.example

import zio.telemetry.opentelemetry.Tracing

trait MyService {
  def myUsefulMethod(...): Task[Unit]
}

object MyService {
  val live: URLayer[... & Tracing, MyService] =
    for {
      ...     <- ...
      tracing <- ZIO.environment[Tracing]
      live    = new MyServiceLive(...)
      traced  = new MyServiceTraced(tracing)(live)
    } yield traced
}

final class MyServiceLive(...) extends MyService {

  override def myUsefulMethod(...): Task[Unit] = ...

}

// Copilot is very good at writing this boring code automatically BTW
final class MyServiceTraced(tracing: ZEnvironment[Tracing])(delegator: MyService) extends MyService {
  import zio.telemetry.opentelemetry.TracingSyntax.*

  override def myUsefulMethod(...): Task[Unit] = 
    delegator
      .myUsefulMethod(...)
      .span("MyService::myUsefulMethod")
      .provideEnv(tracing)
      
}

With your Producer needs, that'd give:

package my.example

import zio.kafka.Producer as ZioProducer
import zio.telemetry.opentelemetry.Tracing

object MyTunedProducer {
  val live: URLayer[Tracing & ZioProducer, ZioProducer] = 
    ZLayer {
      for {
        zioProducer <- ZIO.service[ZioProducer]
        tracing     <- ZIO.environment[Tracing]
        headers     = new HeaderAddingProducer(producer)
        logged      = new LoggedProducer(headers)
        traced      = new MyTracedProducerLive(tracing)(logged)
      } yield traced
    }
}

final class HeaderAddingProducer(delegator: ZioProducer) extends ZioProducer {

  private def addHeaders([re](record: ProducerRecord[K, V])) = record.addHeader("foo", "bar")

  override def produce[K, V](record: ProducerRecord[K, V]): UIO[UIO[Unit]] =
    delegator
      .produce(addHeaders(record))

  
  override def produceAsync[K, V](record: ProducerRecord[K, V]): UIO[UIO[Unit]] =
    delegator
      .produceAsync(addHeaders(record))

  ...
}

final class LoggedProducer(delegator: ZioProducer) extends ZioProducer {

  override def produce[K, V](record: ProducerRecord[K, V]): UIO[UIO[Unit]] =
    delegator
     .produce(record)
     .map(_ *> ZIO.logDebug("ack received")) *> ZIO.logDebug("record sent"))

  // not logged
  override def produceAsync[K, V](record: ProducerRecord[K, V]): UIO[UIO[Unit]] =
    delegator.produceAsync(record)

  ...
}

final class MyTracedProducerLive(tracing: ZEnvironment[Tracing])(delegator: ZioProducer) extends ZioProducer {
  import zio.telemetry.opentelemetry.TracingSyntax.*
  
  override def produce[K, V](record: ProducerRecord[K, V]): UIO[UIO[Unit]] =
    delegator
      .produce(record)
      .span("Producer::produce")
      .provideEnv(produce)
  
  override def produceAsync[K, V](record: ProducerRecord[K, V]): UIO[UIO[Unit]] =
    delegator
      .produceAsync(record)
      .span("Producer::produceAsync")
      .provideEnv(produce)
    
  ... 
}

@soujiro32167
Copy link
Author

final class MyServiceTraced is exactly what I'm trying to avoid: my service should not care if its Kafka producer is traced or not.

Here is how my current code looks like:

object MyTunedProducer {
  val live: URLayer[Tracing & ZioProducer, ZioProducer] = 
    ZLayer {
      for {
        zioProducer <- ZIO.service[ZioProducer]
        tracing     <- ZIO.environment[Tracing]
        headers     = new HeaderAddingProducer(producer)
        logged      = new LoggedProducer(headers)
        traced      = new MyTracedProducerLive(tracing)(logged)
      } yield traced
    }
}

trait MyService:
  def foo: UIO[Unit]


case class MyServiceLive(producer: Producer) extends MyService:
  def foo: UIO[Unit] = producer.produce(...)

object MyServiceLive:
  val live: ZLayer[Producer, MyService] = ZLayer.fromFunction(MyServiceLive(_))

Only in Main does the tracing come out:

object Main extends ZIOAppDefault:
   def run = app.provide(
      MyTunedProducer.live,
      MyServiceLive.live
   )

@soujiro32167
Copy link
Author

Besides composition: Producer has 11 aliases in total to produce. All of them are implemented in terms of produceChunkAsyncWithFailures.

What is the value of letting users composing with Producer re-implement all 11 aliases?

In your example, both HeaderAddingProducer and HeaderAddingProducer would have to re-implement all 11 aliases again, exactly in the same way, taking a risk of deviating from the canonical LiveProducer implementation

@soujiro32167
Copy link
Author

If we merge your PR, you'll not be able to do this anymore or you'll have to copy the code you moved from the ProducerLive to the Producer trait into your code, which is not what you want/what you should have to care about as a zio-kafka user.

Why not? All I'm doing is removing the need to reimplement aliases, which (arguably) have no other useful implementations

  // example of methods where you do nothing around the producing, just call the zioProducer method
  override def produceAsync[K, V](record: ProducerRecord[K, V]): UIO[UIO[Unit]] =
    zioProducer.produceAsync(record)
    
  ... // all the other methods of the interface 

During the implementation of the traced producer, I had to carefully check and make sure if any of the 11 aliases actually did something other than call produceChunkAsyncWithFailures.

If they did, I'd have to instrument them as well. Since they don't, I can now confidently instrument one method and know all other methods are instrumented as well.

@soujiro32167
Copy link
Author

Look how clean the FS2 Producer implementation is.

The interface has 1 produce method, with 4 aliases like produceOne all implemented in terms of it. Thats where I'd like us to be

@guizmaii
Copy link
Member

final class MyServiceTraced is exactly what I'm trying to avoid: my service should not care if its Kafka producer is traced or not.

Your service doesn't know if the passed Producer instance is traced or not:

// my/package/MyService.scala
trait MyService {
  ...
}

object MyService {
  val live: URLayer[ZioProducer, MyService] = ...
}

final class MyServiceLive(producer: ZioProducer) extends MyService {
  ...
}

// my/package/Main.scala
object Main extends zio.App {

  val kafkaProducer: ULayer[ZioProducer] = 
    ZLayer.make[ZioProducer](
      Tracing.live,
      MyTunedProducer.live
    )
  

  def run = 
    ( ... ).provide(kafkaProducer, MyService.live)

}

@soujiro32167
Copy link
Author

Ok, we agree that instrumentation should be transparent 👍

To the matter at hand, and the reason I submitted this PR:

  1. The change from 2.4.0 and 2.4.2 introduced yet another alias to produce

  2. Because the 11 produce aliases remain unimplemented, the instrumentation library I maintain had to be significantly changed to account for the new alias

  3. The change I'm proposing shifts maintenance of aliases to zio-kafka, the only library that needs to know how to implement produce in terms of produceAsyncChunkWithFooOrWhatever

  4. The solution you propose @guizmaii does not solve this issue, since it requires downstream consumers of zio-kafka to know how to implement the aliases.

@erikvanoosten
Copy link
Collaborator

@soujiro32167 I agree with you. I find this solution more elegant.

@soujiro32167
Copy link
Author

@guizmaii i cannot merge this MR with your objection

@svroonland
Copy link
Collaborator

Even with the default implementations of the aliases in Producer like in this PR, they are just defaults, but not guarantees that all implementations of Producer are like that. To deal with that, if you create a wrapper, you'd have to forward all method calls to the delegated Producer anyway. Especially if you start composing wrappers that may have slightly different behavior for each of the produce* methods.

But because of the specifics of our Producer, I'd say that we can safely consider everything but produceChunkAsyncWithFailures to be convenience methods on the interface, not something that may vary between implementations. If we make those methods final defs, we don't have to worry about the above situation.

@erikvanoosten erikvanoosten self-requested a review September 23, 2023 09:01
@erikvanoosten
Copy link
Collaborator

Sorry @soujiro32167, the other maintainers convinced me that this is not a good idea. @svroonland explained it well in the previous comment.

To support your use case, perhaps it is better to introduce something like the Diagnostics trait that is available on consumers.

@soujiro32167
Copy link
Author

they are just defaults, but not guarantees that all implementations of Producer are like that

Absolutely @svroonland, the idea is to create better defaults. If an implementation needs different behaviors for produce*, they are free to re-implement

@soujiro32167
Copy link
Author

soujiro32167 commented Sep 28, 2023

@erikvanoosten could you say more about the Diagnostics trait?

Is that a sub-trait with produce* implemented and produceChunkAsyncWithFailures left unimplemented?
Nvm, found it . Very cool! I'd love something like that for the producer.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants