Skip to content

Commit

Permalink
Destination S3V2: Fix: Parquet never writes empty files (#49982)
Browse files Browse the repository at this point in the history
  • Loading branch information
johnny-schmidt authored Dec 23, 2024
1 parent 668409a commit ab8e834
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import jakarta.inject.Singleton
import java.io.ByteArrayOutputStream
import java.io.Closeable
import java.io.OutputStream
import java.util.concurrent.atomic.AtomicLong
import org.apache.avro.Schema

interface ObjectStorageFormattingWriter : Closeable {
Expand Down Expand Up @@ -207,16 +208,25 @@ class BufferedFormattingWriter<T : OutputStream>(
private val streamProcessor: StreamProcessor<T>,
private val wrappingBuffer: T
) : ObjectStorageFormattingWriter {
// An empty buffer is not a guarantee of a non-empty
// file, some writers (parquet) start with a
// header. Avoid writing empty files by requiring
// both 0 bytes AND 0 rows.
private val rowsAdded = AtomicLong(0)
val bufferSize: Int
get() = buffer.size()
get() =
if (rowsAdded.get() == 0L) {
0
} else buffer.size()

override fun accept(record: DestinationRecordAirbyteValue) {
writer.accept(record)
rowsAdded.incrementAndGet()
}

fun takeBytes(): ByteArray? {
wrappingBuffer.flush()
if (buffer.size() == 0) {
if (bufferSize == 0) {
return null
}

Expand All @@ -229,7 +239,7 @@ class BufferedFormattingWriter<T : OutputStream>(
writer.flush()
writer.close()
streamProcessor.partFinisher.invoke(wrappingBuffer)
return if (buffer.size() > 0) {
return if (bufferSize > 0) {
buffer.toByteArray()
} else {
null
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.write.object_storage

import io.airbyte.cdk.load.file.NoopProcessor
import io.airbyte.cdk.load.file.object_storage.BufferedFormattingWriter
import io.airbyte.cdk.load.file.object_storage.ObjectStorageFormattingWriter
import io.mockk.coEvery
import io.mockk.impl.annotations.MockK
import io.mockk.mockk
import java.io.ByteArrayOutputStream
import org.junit.jupiter.api.Test

class ObjectStorageFormattingWriterTest {
@MockK(relaxed = true) lateinit var underlyingWriter: ObjectStorageFormattingWriter

@Test
fun `buffered formatting writer never produces empty parts`() {
val outputStream = ByteArrayOutputStream()
outputStream.write("i am a header".toByteArray())
val bufferedWriter =
BufferedFormattingWriter(
underlyingWriter,
outputStream,
NoopProcessor,
NoopProcessor.wrapper(outputStream),
)

assert(bufferedWriter.bufferSize == 0) { "buffer appears empty despite header" }
assert(bufferedWriter.takeBytes() == null) { "buffer yields no data despite header" }
assert(bufferedWriter.finish() == null) { "buffer yields no data despite header" }
}

@Test
fun `buffered formatting writer yields entire buffer once any data has been added`() {
val outputStream = ByteArrayOutputStream()
outputStream.write("i am a header".toByteArray())
val bufferedWriter =
BufferedFormattingWriter(
underlyingWriter,
outputStream,
NoopProcessor,
NoopProcessor.wrapper(outputStream),
)

assert(bufferedWriter.takeBytes() == null)
coEvery { bufferedWriter.accept(any()) } coAnswers { outputStream.write("!".toByteArray()) }
bufferedWriter.accept(mockk())
val bytes = bufferedWriter.takeBytes()
assert(bytes != null) { "buffer yields data now that we've written to it" }
assert(bytes.contentEquals("i am a header!".toByteArray())) {
"buffer yields all data written to it"
}
}
}

0 comments on commit ab8e834

Please sign in to comment.