Skip to content

Commit

Permalink
Merge pull request #257 from Chuckame/fix-object-container
Browse files Browse the repository at this point in the history
feat: Allow writing on-demand object-container values instead of only a Sequence
  • Loading branch information
Chuckame authored Sep 12, 2024
2 parents 6e1d82a + fab7125 commit 712292d
Show file tree
Hide file tree
Showing 9 changed files with 91 additions and 74 deletions.
7 changes: 5 additions & 2 deletions Migrating-from-v1.md
Original file line number Diff line number Diff line change
Expand Up @@ -156,13 +156,16 @@ val dataSequence = sequenceOf(
TheDataClass(...),
)
Files.newOutputStream(Path("/your/file.avro")).use { outputStream ->
AvroObjectContainer { fieldNamingStrategy = FieldNamingStrategy.SnakeCase }
.encodeToStream(dataSequence, outputStream) {
val writer = AvroObjectContainer { fieldNamingStrategy = FieldNamingStrategy.SnakeCase }
.openWriter(outputStream) {
codec(CodecFactory.snappyCodec())
// you can also add your metadata !
metadata("myProp", 1234L)
metadata("a string metadata", "hello")
}
writer.use {
        dataSequence.forEach { writer.writeValue(it) }
}
}
```

Expand Down
11 changes: 5 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,7 @@ fun main() {
## Object container

Avro4k provides a way to encode and decode object container — also known as data file — with `AvroObjectContainer` class. This encoding will prefix the binary data with the
full schema to
allow knowing the writer schema when reading the data. This format is perfect for storing many long-term objects in a single file.

The main difference with the `AvroObjectContainer` is that you will encode and decode `Sequence` of objects instead of single objects.
full schema to allow knowing the writer schema when reading the data. This format is perfect for storing many long-term objects in a single file.

Be aware that consuming the decoded `Sequence` needs to be done **before** closing the stream, or you will get an exception, as a sequence is evaluated lazily:
even if there are millions of objects in the file, each object is extracted one-by-one only when requested. If you take only the first 10 objects and close the stream,
Expand All @@ -126,12 +123,14 @@ fun main() {
Project("avro4k", "Kotlin"),
)
Files.newOutputStream(Path("your-file.bin")).use { fileStream ->
AvroObjectContainer.encodeToStream(valuesToEncode, fileStream, builder)
AvroObjectContainer.openWriter(fileStream).use { writer ->
        valuesToEncode.forEach { writer.writeValue(it) }
}
}

// Deserializing objects
Files.newInputStream(Path("your-file.bin")).use { fileStream ->
AvroObjectContainer.decodeFromStream<Project>(valuesToEncode, fileStream, builder).forEach {
    AvroObjectContainer.decodeFromStream<Project>(fileStream).forEach {
println(it) // Project(name=kotlinx.serialization, language=Kotlin) ...
}
}
Expand Down
12 changes: 8 additions & 4 deletions api/avro4k-core.api
Original file line number Diff line number Diff line change
Expand Up @@ -169,20 +169,20 @@ public abstract class com/github/avrokotlin/avro4k/AvroObjectContainer {
public synthetic fun <init> (Lcom/github/avrokotlin/avro4k/Avro;Lkotlin/jvm/internal/DefaultConstructorMarker;)V
public final fun decodeFromStream (Lkotlinx/serialization/DeserializationStrategy;Ljava/io/InputStream;Lkotlin/jvm/functions/Function1;)Lkotlin/sequences/Sequence;
public static synthetic fun decodeFromStream$default (Lcom/github/avrokotlin/avro4k/AvroObjectContainer;Lkotlinx/serialization/DeserializationStrategy;Ljava/io/InputStream;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlin/sequences/Sequence;
public final fun encodeToStream (Lorg/apache/avro/Schema;Lkotlinx/serialization/SerializationStrategy;Lkotlin/sequences/Sequence;Ljava/io/OutputStream;Lkotlin/jvm/functions/Function1;)V
public static synthetic fun encodeToStream$default (Lcom/github/avrokotlin/avro4k/AvroObjectContainer;Lorg/apache/avro/Schema;Lkotlinx/serialization/SerializationStrategy;Lkotlin/sequences/Sequence;Ljava/io/OutputStream;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)V
public final fun getAvro ()Lcom/github/avrokotlin/avro4k/Avro;
public final fun openWriter (Lorg/apache/avro/Schema;Lkotlinx/serialization/SerializationStrategy;Ljava/io/OutputStream;Lkotlin/jvm/functions/Function1;)Lcom/github/avrokotlin/avro4k/AvroObjectContainerWriter;
public static synthetic fun openWriter$default (Lcom/github/avrokotlin/avro4k/AvroObjectContainer;Lorg/apache/avro/Schema;Lkotlinx/serialization/SerializationStrategy;Ljava/io/OutputStream;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lcom/github/avrokotlin/avro4k/AvroObjectContainerWriter;
}

public final class com/github/avrokotlin/avro4k/AvroObjectContainer$Default : com/github/avrokotlin/avro4k/AvroObjectContainer {
}

public final class com/github/avrokotlin/avro4k/AvroObjectContainerBuilder {
public fun <init> (Lorg/apache/avro/file/DataFileWriter;)V
public final fun codec (Lorg/apache/avro/file/CodecFactory;)V
public final fun metadata (Ljava/lang/String;J)V
public final fun metadata (Ljava/lang/String;Ljava/lang/String;)V
public final fun metadata (Ljava/lang/String;[B)V
public final fun syncInterval (I)V
}

public final class com/github/avrokotlin/avro4k/AvroObjectContainerKt {
Expand All @@ -191,7 +191,6 @@ public final class com/github/avrokotlin/avro4k/AvroObjectContainerKt {
}

public final class com/github/avrokotlin/avro4k/AvroObjectContainerMetadataDumper {
public fun <init> (Lorg/apache/avro/file/DataFileStream;)V
public final fun metadata (Ljava/lang/String;)Lcom/github/avrokotlin/avro4k/AvroObjectContainerMetadataDumper$MetadataAccessor;
}

Expand All @@ -202,6 +201,11 @@ public final class com/github/avrokotlin/avro4k/AvroObjectContainerMetadataDumpe
public final fun asString ()Ljava/lang/String;
}

public final class com/github/avrokotlin/avro4k/AvroObjectContainerWriter : java/io/Closeable {
public fun close ()V
public final fun writeValue (Ljava/lang/Object;)V
}

public final class com/github/avrokotlin/avro4k/AvroOkioExtensionsKt {
public static final fun decodeFromSource (Lcom/github/avrokotlin/avro4k/Avro;Lorg/apache/avro/Schema;Lkotlinx/serialization/DeserializationStrategy;Lokio/BufferedSource;)Ljava/lang/Object;
public static final fun encodeToSink (Lcom/github/avrokotlin/avro4k/Avro;Lorg/apache/avro/Schema;Lkotlinx/serialization/SerializationStrategy;Ljava/lang/Object;Lokio/BufferedSink;)V
Expand Down
4 changes: 4 additions & 0 deletions benchmark/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,8 @@ dependencies {
implementation("com.fasterxml.jackson.dataformat:jackson-dataformat-avro:$jacksonVersion")

implementation(project(":"))
}

repositories {
mavenCentral()
}
4 changes: 4 additions & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -159,4 +159,8 @@ spotless {
kotlinGradle {
ktlint()
}
}

repositories {
mavenCentral()
}
26 changes: 26 additions & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
[versions]
kotlin = "2.0.0"
jvm = "21"
kotlinxSerialization = "1.7.0"
kotestVersion = "5.9.1"
okio = "3.9.0"
apache-avro = "1.11.3"

[libraries]
apache-avro = { group = "org.apache.avro", name = "avro", version.ref = "apache-avro" }
okio = { group = "com.squareup.okio", name = "okio", version.ref = "okio" }
kotlinx-serialization-core = { group = "org.jetbrains.kotlinx", name = "kotlinx-serialization-core", version.ref = "kotlinxSerialization" }
kotlinx-serialization-json = { group = "org.jetbrains.kotlinx", name = "kotlinx-serialization-json", version.ref = "kotlinxSerialization" }
kotest-core = { group = "io.kotest", name = "kotest-assertions-core", version.ref = "kotestVersion" }
kotest-json = { group = "io.kotest", name = "kotest-assertions-json", version.ref = "kotestVersion" }
kotest-junit5 = { group = "io.kotest", name = "kotest-runner-junit5", version.ref = "kotestVersion" }
kotest-property = { group = "io.kotest", name = "kotest-property", version.ref = "kotestVersion" }

[plugins]
dokka = { id = "org.jetbrains.dokka", version = "1.9.20" }
kotest = { id = "io.kotest", version = "0.4.11" }
github-versions = { id = "com.github.ben-manes.versions", version = "0.51.0" }
nexus-publish = { id = "io.github.gradle-nexus.publish-plugin", version = "2.0.0" }
spotless = { id = "com.diffplug.spotless", version = "6.25.0" }
kover = { id = "org.jetbrains.kotlinx.kover", version = "0.8.1" }
binary-compatibility-validator = { id = "org.jetbrains.kotlinx.binary-compatibility-validator", version = "0.14.0" }
34 changes: 0 additions & 34 deletions settings.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -21,38 +21,4 @@ gradleEnterprise {
termsOfServiceAgree = "yes"
}
}
}

dependencyResolutionManagement {
versionCatalogs {
create("libs") {
version("kotlin", "2.0.0")
version("jvm", "21")

library("apache-avro", "org.apache.avro", "avro").version("1.11.3")
library("okio", "com.squareup.okio", "okio").version("3.9.0")

val kotlinxSerialization = "1.7.0"
library("kotlinx-serialization-core", "org.jetbrains.kotlinx", "kotlinx-serialization-core").version(kotlinxSerialization)
library("kotlinx-serialization-json", "org.jetbrains.kotlinx", "kotlinx-serialization-json").version(kotlinxSerialization)

val kotestVersion = "5.9.1"
library("kotest-core", "io.kotest", "kotest-assertions-core").version(kotestVersion)
library("kotest-json", "io.kotest", "kotest-assertions-json").version(kotestVersion)
library("kotest-junit5", "io.kotest", "kotest-runner-junit5").version(kotestVersion)
library("kotest-property", "io.kotest", "kotest-property").version(kotestVersion)

plugin("dokka", "org.jetbrains.dokka").version("1.9.20")
plugin("kotest", "io.kotest").version("0.4.11")
plugin("github-versions", "com.github.ben-manes.versions").version("0.51.0")
plugin("nexus-publish", "io.github.gradle-nexus.publish-plugin").version("2.0.0")
plugin("spotless", "com.diffplug.spotless").version("6.25.0")
plugin("kover", "org.jetbrains.kotlinx.kover").version("0.8.1")
plugin("binary-compatibility-validator", "org.jetbrains.kotlinx.binary-compatibility-validator").version("0.14.0")
}
}
@Suppress("UnstableApiUsage")
repositories {
mavenCentral()
}
}
47 changes: 28 additions & 19 deletions src/main/kotlin/com/github/avrokotlin/avro4k/AvroObjectContainer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import org.apache.avro.file.DataFileStream
import org.apache.avro.file.DataFileWriter
import org.apache.avro.io.DatumReader
import org.apache.avro.io.DatumWriter
import java.io.Closeable
import java.io.InputStream
import java.io.OutputStream

Expand All @@ -28,28 +29,21 @@ public sealed class AvroObjectContainer(
public companion object Default : AvroObjectContainer(Avro)

/**
* Encodes the given sequence to the given output stream.
* Opens a writer to allow encoding values to avro and writing them to the output stream.
*
* Note that the output stream is not closed after the operation, which means you need to handle it to avoid resource leaks.
*/
public fun <T> encodeToStream(
public fun <T> openWriter(
schema: Schema,
serializer: SerializationStrategy<T>,
values: Sequence<T>,
outputStream: OutputStream,
builder: AvroObjectContainerBuilder.() -> Unit = {},
) {
): AvroObjectContainerWriter<T> {
val datumWriter: DatumWriter<T> = KotlinxSerializationDatumWriter(serializer, avro)
val dataFileWriter = DataFileWriter(datumWriter)
try {
builder(AvroObjectContainerBuilder(dataFileWriter))
dataFileWriter.create(schema, outputStream)
values.forEach {
dataFileWriter.append(it)
}
} finally {
dataFileWriter.flush()
}
builder(AvroObjectContainerBuilder(dataFileWriter))
dataFileWriter.create(schema, outputStream)
return AvroObjectContainerWriter(dataFileWriter)
}

public fun <T> decodeFromStream(
Expand All @@ -66,6 +60,18 @@ public sealed class AvroObjectContainer(
}
}

/**
 * Writes avro values one at a time into an object-container (data file) stream.
 *
 * Obtained from `AvroObjectContainer.openWriter`. Closing this writer only flushes
 * the buffered data; it intentionally does NOT close the underlying output stream,
 * which remains the caller's responsibility.
 */
public class AvroObjectContainerWriter<T> internal constructor(
    private val delegate: DataFileWriter<T>,
) : Closeable {
    /** Encodes [value] and appends it to the container. */
    public fun writeValue(value: T): Unit = delegate.append(value)

    /** Flushes pending data without closing the underlying stream. */
    override fun close(): Unit = delegate.flush()
}

// Sole concrete subclass of the sealed AvroObjectContainer hierarchy; presumably
// instantiated by the public AvroObjectContainer(...) factory function below — it only
// forwards the configured Avro instance to the sealed base class.
private class AvroObjectContainerImpl(avro: Avro) : AvroObjectContainer(avro)

public fun AvroObjectContainer(
Expand All @@ -76,13 +82,12 @@ public fun AvroObjectContainer(
}

@ExperimentalSerializationApi
public inline fun <reified T> AvroObjectContainer.encodeToStream(
values: Sequence<T>,
public inline fun <reified T> AvroObjectContainer.openWriter(
outputStream: OutputStream,
noinline builder: AvroObjectContainerBuilder.() -> Unit = {},
) {
): AvroObjectContainerWriter<T> {
val serializer = avro.serializersModule.serializer<T>()
encodeToStream(avro.schema(serializer), serializer, values, outputStream, builder)
return openWriter(avro.schema(serializer), serializer, outputStream, builder)
}

@ExperimentalSerializationApi
Expand All @@ -94,7 +99,7 @@ public inline fun <reified T> AvroObjectContainer.decodeFromStream(
return decodeFromStream(serializer, inputStream, metadataDumper)
}

public class AvroObjectContainerBuilder(private val fileWriter: DataFileWriter<*>) {
public class AvroObjectContainerBuilder internal constructor(private val fileWriter: DataFileWriter<*>) {
public fun metadata(
key: String,
value: ByteArray,
Expand All @@ -116,12 +121,16 @@ public class AvroObjectContainerBuilder(private val fileWriter: DataFileWriter<*
fileWriter.setMeta(key, value)
}

public fun syncInterval(value: Int) {
fileWriter.setSyncInterval(value)
}

public fun codec(codec: CodecFactory) {
fileWriter.setCodec(codec)
}
}

public class AvroObjectContainerMetadataDumper(private val fileStream: DataFileStream<*>) {
public class AvroObjectContainerMetadataDumper internal constructor(private val fileStream: DataFileStream<*>) {
public fun metadata(key: String): MetadataAccessor? {
return fileStream.getMeta(key)?.let { MetadataAccessor(it) }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,15 @@ internal class AvroObjectContainerTest : StringSpec({
// write with avro4k
val bytes =
ByteArrayOutputStream().use {
AvroObjectContainer.encodeToStream(sequenceOf(firstProfile, secondProfile), it) {
metadata("meta-string", "awesome string")
metadata("meta-long", 42)
metadata("bytes", byteArrayOf(1, 3, 2, 42))
}
val writer =
AvroObjectContainer.openWriter<UserProfile>(it) {
metadata("meta-string", "awesome string")
metadata("meta-long", 42)
metadata("bytes", byteArrayOf(1, 3, 2, 42))
}
writer.writeValue(firstProfile)
writer.writeValue(secondProfile)
writer.close()
it.toByteArray()
}
// read with apache avro lib
Expand Down Expand Up @@ -105,7 +109,6 @@ internal class AvroObjectContainerTest : StringSpec({
var closed = false

override fun write(b: Int) {
throw UnsupportedOperationException()
}

override fun close() {
Expand All @@ -114,9 +117,8 @@ internal class AvroObjectContainerTest : StringSpec({
}

val os = SimpleOutputStream()
shouldThrow<UnsupportedOperationException> {
AvroObjectContainer.encodeToStream<UserId>(sequence {}, os)
}
val writer = AvroObjectContainer.openWriter<UserId>(os)
writer.close()
os.closed shouldBe false
}
"decoding error is not closing the stream" {
Expand Down

0 comments on commit 712292d

Please sign in to comment.