Skip to content

Commit

Permalink
fix: make sure file transfer flushes records when possible (#14481)
Browse files Browse the repository at this point in the history
  • Loading branch information
benmoriceau committed Oct 29, 2024
1 parent 000a748 commit ef9c40d
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,8 @@ public BufferedReplicationWorker create(final ReplicationInput replicationInput,
new MessageMetricsTracker(metricClient),
messageWriterFactory,
destinationTimeout,
ContainerIOHandle.dest());
ContainerIOHandle.dest(),
replicationInput.getUseFileTransfer());

final WorkerMetricReporter metricReporter = new WorkerMetricReporter(metricClient, sourceLauncherConfig.getDockerImage());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class LocalContainerAirbyteDestination(
private val messageWriterFactory: AirbyteMessageBufferedWriterFactory,
private val destinationTimeoutMonitor: DestinationTimeoutMonitor,
private val containerIOHandle: ContainerIOHandle,
private val flushImmediately: Boolean = false,
) : AirbyteDestination {
private val inputHasEnded = AtomicBoolean(false)
private lateinit var messageIterator: Iterator<AirbyteMessage>
Expand Down Expand Up @@ -128,11 +129,14 @@ class LocalContainerAirbyteDestination(
}

@Throws(IOException::class)
private fun acceptWithNoTimeoutMonitor(message: AirbyteMessage) {
fun acceptWithNoTimeoutMonitor(message: AirbyteMessage) {
// TODO also check if stdout file exists? or check if some other startup file exists?
check(!inputHasEnded.get())

writer.write(message)
if (flushImmediately) {
writer.flush()
}
}

@Throws(IOException::class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import io.airbyte.workers.internal.ContainerIOHandle.Companion.EXIT_CODE_CHECK_E
import io.mockk.every
import io.mockk.mockk
import io.mockk.verify
import io.mockk.verifyOrder
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertThrows
Expand Down Expand Up @@ -426,4 +427,35 @@ internal class LocalContainerAirbyteDestinationTest {
assertEquals("Destination has not terminated. This warning is normal if the job was cancelled.", error.message)
verify(exactly = 1) { mockedContainerIOHandle.terminate() }
}

@Test
fun testFlushAfterWrite() {
val writer = mockk<AirbyteMessageBufferedWriter>(relaxed = true)
every { messageWriterFactory.createWriter(any()) } returns writer

val localContainerAirbyteDestinationWithForcePush =
LocalContainerAirbyteDestination(
mockk(relaxed = true),
mockk(relaxed = true),
messageWriterFactory,
mockk(relaxed = true),
mockk(relaxed = true),
true,
)

val message =
AirbyteMessage()
.withAdditionalProperty("test", "message")
val workerDestinationConfig =
WorkerDestinationConfig()
.withDestinationId(UUID.randomUUID())

localContainerAirbyteDestinationWithForcePush.start(workerDestinationConfig, mockk())
localContainerAirbyteDestinationWithForcePush.acceptWithNoTimeoutMonitor(message)

verifyOrder {
writer.write(message)
writer.flush()
}
}
}

0 comments on commit ef9c40d

Please sign in to comment.