Memory leaks while using fs2-kafka consumer #687
After some more tests I have found out that the problem started with the bump to zio versions greater than 2.0.10. Any version above that affects the code (the Consumer, in my case) so that memory leaks: the number of retained objects keeps growing.
After even more tests I have found the commit/PR and the exact line of code that broke cats-interop back then, and my example as well. It was one of the PRs in the 2.0.11 release.
If I revert this line (together with the two dependent lines, 2575 and 5044, otherwise compilation fails), my example works just fine. But adding just the problematic line back to a working example (without touching lines 2575 and 5044) breaks it again, so the problem is clearly connected to line 2589. Maybe the code that actually has to be fixed is in cats-interop itself, but I was not able to track it down; the only thing I could find is the exact change that made my example fail.
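For anyone who wants to repeat this kind of bisection, here is a sketch of the workflow I would assume was used (standard sbt commands, not stated in the thread): publish a locally patched ZIO under a recognizable version and point the reproducer at it.

```scala
// In the patched zio checkout, publish to the local ivy repository:
//   sbt "set ThisBuild / version := \"2.0.11-fix-SNAPSHOT\"" publishLocal
// Then, in the reproducer's build.sbt, depend on that local build:
libraryDependencies += "dev.zio" %% "zio" % "2.0.11-fix-SNAPSHOT"
```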
@adamgfraser, it was confusing to me how changing the type could matter. TL;DR: I came to the wrong conclusion; using a more specific type does not help us. That said, the memory leak is still there. I am testing older versions of ZIO and the interop to find a 100% working combination. I will post any new information here.
Hey @adamgfraser, just to echo @vkorchik's findings here.
Sorry, it doesn't ring a bell for me.
/bounty $500
💎 $500 bounty created by ZIO. 🙋 If you start working on this, comment to let others know.
It seems to me that the issue first appeared when the interop switched from the parent scope to the global one. I have switched back from the global scope to the parent scope, and the heap usage is stable for me. I am not a pro, and it is hard for me to explain why the parent scope vs. the global scope makes such a difference (a rough sketch of the idea is below). Anyway, I will create a PR soon so everyone can check it out and share thoughts, and we can compare the results for both zio 2.0.x and 2.1.x.

PS: should I use the /attempt #687 tag?
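To illustrate the mechanism, here is a minimal sketch of my own, assuming only standard ZIO `Scope` semantics; it is not the interop's actual code, and the names and buffer size are made up. Finalizers added to one long-lived global scope only run when that scope closes, so every acquisition is retained for the lifetime of the app, whereas a short-lived parent scope releases each acquisition as soon as it closes.

```scala
import zio.*

object ScopeLeakSketch extends ZIOAppDefault:

  // A dummy resource: a 1 MiB buffer with a trivial finalizer.
  val resource: ZIO[Scope, Nothing, Array[Byte]] =
    ZIO.acquireRelease(ZIO.succeed(new Array[Byte](1024 * 1024)))(_ => ZIO.unit)

  // "Global scope" shape: every acquisition extends the same outer scope,
  // so finalizers (and the buffers they reference) accumulate until the
  // app exits -- heap usage keeps growing.
  val leaky: ZIO[Scope, Nothing, Nothing] =
    resource.forever

  // "Parent scope" shape: each acquisition runs in its own child scope,
  // which closes (running the finalizer) right after the effect finishes.
  val bounded: UIO[Nothing] =
    ZIO.scoped(resource).forever

  override val run: UIO[Nothing] =
    ZIO.scoped(leaky) // swap in `bounded` and the heap stays flat
```

Whether this is precisely what the interop change did is for the maintainers to confirm; it is only a model of why the two scoping choices behave so differently under a long-running consumer.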
Thanks for investigating this!
@neko-kai Also, I am not sure I can tackle this issue on my own. I am really a novice here, so if you know how to fix it, please go ahead 💪
💎 kaizen-solutions added a $500 bounty to the prize pool!
I have added the explicit change. PS: results are here.
💡 @vkorchik submitted a pull request that claims the bounty. You can visit your bounty board to reward it.
Hello again, I think it's safe to close this unless you all say otherwise (i.e. if there's anything more to be done in ZIO core?). Here is my reproducer with interop 23.1.0.1 (which has @vkorchik's changes) and zio 2.0.21:

```scala
import fs2.Stream
import fs2.kafka.*
import zio.{durationInt as _, *}
import zio.interop.catz.*
import zio.logging.backend.SLF4J

import scala.concurrent.duration.*

// Leak
object ReproducerFs2KafkaZIO extends ZIOAppDefault:

  override val bootstrap: ZLayer[ZIOAppArgs, Any, Any] =
    Runtime.removeDefaultLoggers ++ SLF4J.slf4j

  val topic = "example-topic"

  val consumer: Stream[Task, Unit] =
    val settings =
      ConsumerSettings[Task, String, String]
        .withBootstrapServers("localhost:9092")
        .withGroupId("test-consumer-group-id-2")
        .withAutoOffsetReset(AutoOffsetReset.Earliest)
    KafkaConsumer
      .stream[Task, String, String](settings)
      .evalTap(_.subscribeTo(topic))
      .stream
      .mapChunks(_.map(_.offset))
      .through(commitBatchWithin[Task](2048, 10.seconds))

  val producer: Stream[Task, ProducerResult[String, String]] =
    val settings =
      ProducerSettings[Task, String, String]
        .withBootstrapServers("localhost:9092")
        .withBatchSize(128)
        .withAcks(Acks.One)
        .withEnableIdempotence(false)
        .withRetries(128)
    val producerPipe = KafkaProducer.pipe[Task, String, String](settings)
    Stream
      .iterate[Task, Long](0L)(_ + 1L)
      .map(n => ProducerRecord(topic = topic, key = s"key: $n", value = s"value: $n"))
      .chunkN(n = 128, allowFewer = true)
      .map(ProducerRecords[fs2.Chunk, String, String])
      .through(producerPipe)

  override val run: Task[Unit] =
    Stream(producer, consumer).parJoinUnbounded.compile.drain
```

As you can see below, nice and flat, stable heap usage:

[screenshot: flat, stable heap usage]

Prior to this change, I was seeing the following:

[screenshot: steadily growing heap usage]

So this definitely fixed it.
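In case anyone wants to run the reproducer locally, here is a sketch of the sbt dependencies it assumes. The zio and zio-interop-cats versions are the ones named in this comment; the fs2-kafka and zio-logging coordinates and version numbers are my assumptions, so adjust as needed:

```scala
// build.sbt (sketch; only the zio and zio-interop-cats versions come from the thread)
libraryDependencies ++= Seq(
  "dev.zio"         %% "zio"               % "2.0.21",
  "dev.zio"         %% "zio-interop-cats"  % "23.1.0.1", // includes @vkorchik's fix
  "dev.zio"         %% "zio-logging-slf4j" % "2.1.16",   // assumed version, for SLF4J.slf4j
  "com.github.fd4s" %% "fs2-kafka"         % "3.2.0"     // assumed 3.x version
)
```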
🎉🎈 @vkorchik has been awarded $500 by kaizen-solutions.io! 🎈🎊 |
🎉🎈 @vkorchik has been awarded $500 by ZIO! 🎈🎊 |
I have a ZIO-based app, but for Kafka I am still using fs2-kafka. As for the interop, I was using `zio-interop-cats:23.0.0.0`, and everything was fine. After bumping all the libs to the newest versions I got a memory leak. After some investigation I thought the leak could be due to an inconsistency of `cats-effect` versions: the newest one is 3.5.3, but `zio-interop-cats` is compiled against 3.4.8. Starting from 3.5.x there are some crucial changes in runtime semantics, so I rolled versions back a bit so that they are the same throughout the whole stack, but the memory leak still appears. I have examined different versions of cats/interop/zio, but with no success; one way to keep the whole stack on a single cats-effect version is sketched below.
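A sketch of that pinning in sbt, using `dependencyOverrides` (a standard sbt setting): the 3.4.8 version is taken from this report, and the kernel/std split reflects how cats-effect is published.

```scala
// Pin the whole dependency graph to one cats-effect version so fs2-kafka,
// zio-interop-cats and transitive dependencies share a single runtime.
dependencyOverrides ++= Seq(
  "org.typelevel" %% "cats-effect"        % "3.4.8",
  "org.typelevel" %% "cats-effect-kernel" % "3.4.8",
  "org.typelevel" %% "cats-effect-std"    % "3.4.8"
)
```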
After even more debugging I see that the consumer side is leaking: once the consumer consumes events, something goes wrong. A lot of objects of `zio.ZIO$EvaluationStep$UpdateRuntimeFlags$$anon$28` are created but never released, and their number just keeps growing.

Versions that are working fine:

Versions that I am testing against and getting leaks:
Versions that are leaking are many, but basically all of them are based on `cats-effect:3.4.x`.
And also picture of heap usage. Once heap is close to the current max size of heap it just breaks through it and this never ends until all heap is used.
Appreciate any help regarding this issue.