Skip to content

Commit

Permalink
Merge branch 'master' into tope/klaviyo/upgrade-2024-10-15
Browse files Browse the repository at this point in the history
  • Loading branch information
DanyloGL authored Jan 22, 2025
2 parents c481e39 + 056d4f2 commit 45dd3b6
Show file tree
Hide file tree
Showing 45 changed files with 3,423 additions and 1,139 deletions.
29 changes: 29 additions & 0 deletions .github/actions/install-java-environment/action.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
name: Install Java Environment
description: "Installs the Java environment"
inputs:
java_version:
description: "Java version"
required: false
default: "21"
type: string
gradle_cache_read_only:
description: "Whether to use a read-only Gradle cache"
required: false
default: false
type: boolean
gradle_cache_write_only:
description: "Whether to use a write-only Gradle cache"
required: false
default: false
type: boolean
runs:
using: "composite"
steps:
- uses: actions/setup-java@v4
with:
distribution: corretto
java-version: ${{ inputs.java_version }}
- uses: gradle/actions/setup-gradle@v3
with:
cache-read-only: ${{ inputs.gradle_cache_read_only }}
cache-write-only: ${{ inputs.gradle_cache_write_only }}
4 changes: 3 additions & 1 deletion .github/actions/run-airbyte-ci/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,9 @@ runs:
ls -la
ls -la airbyte-python-cdk || echo "No airbyte-python-cdk directory"
ls -laL ../airbyte-python-cdk || echo "No airbyte-python-cdk symlink"
- name: Install Java Environment
id: install-java-environment
uses: ./.github/actions/install-java-environment
- name: Docker login
id: docker-login
uses: docker/login-action@v3
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ abstract class DestinationConfiguration : Configuration {
*/
open val gracefulCancellationTimeoutMs: Long = 10 * 60 * 1000L // 10 minutes

open val numOpenStreamWorkers: Int = 1
open val numProcessRecordsWorkers: Int = 2
open val numProcessBatchWorkers: Int = 5
open val numProcessBatchWorkersForFileTransfer: Int = 3
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ package io.airbyte.cdk.load.config

import io.airbyte.cdk.load.command.DestinationCatalog
import io.airbyte.cdk.load.command.DestinationConfiguration
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.message.BatchEnvelope
import io.airbyte.cdk.load.message.ChannelMessageQueue
import io.airbyte.cdk.load.message.MultiProducerChannel
import io.airbyte.cdk.load.state.ReservationManager
import io.airbyte.cdk.load.task.implementor.FileAggregateMessage
Expand Down Expand Up @@ -89,4 +91,8 @@ class SyncBeanFactory {
val channel = Channel<FileTransferQueueMessage>(config.batchQueueDepth)
return MultiProducerChannel(1, channel, "fileMessageQueue")
}

@Singleton
@Named("openStreamQueue")
class OpenStreamQueue : ChannelMessageQueue<DestinationStream>()
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,6 @@ import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

interface DestinationTaskLauncher : TaskLauncher {
suspend fun handleSetupComplete()
suspend fun handleStreamStarted(stream: DestinationStream.Descriptor)
suspend fun handleNewBatch(stream: DestinationStream.Descriptor, wrapped: BatchEnvelope<*>)
suspend fun handleStreamClosed(stream: DestinationStream.Descriptor)
suspend fun handleTeardownComplete(success: Boolean = true)
Expand Down Expand Up @@ -129,7 +127,9 @@ class DefaultDestinationTaskLauncher(
private val recordQueueSupplier:
MessageQueueSupplier<DestinationStream.Descriptor, Reserved<DestinationStreamEvent>>,
private val checkpointQueue: QueueWriter<Reserved<CheckpointMessageWrapped>>,
@Named("fileMessageQueue") private val fileTransferQueue: MessageQueue<FileTransferQueueMessage>
@Named("fileMessageQueue")
private val fileTransferQueue: MessageQueue<FileTransferQueueMessage>,
@Named("openStreamQueue") private val openStreamQueue: MessageQueue<DestinationStream>
) : DestinationTaskLauncher {
private val log = KotlinLogging.logger {}

Expand Down Expand Up @@ -184,9 +184,17 @@ class DefaultDestinationTaskLauncher(

// Launch the client interface setup task
log.info { "Starting startup task" }
val setupTask = setupTaskFactory.make(this)
val setupTask = setupTaskFactory.make()
launch(setupTask)

log.info { "Enqueueing open stream tasks" }
catalog.streams.forEach { openStreamQueue.publish(it) }
openStreamQueue.close()
repeat(config.numOpenStreamWorkers) {
log.info { "Launching open stream task $it" }
launch(openStreamTaskFactory.make())
}

// TODO: pluggable file transfer
if (!fileTransferEnabled) {
// Start a spill-to-disk task for each record stream
Expand Down Expand Up @@ -239,21 +247,6 @@ class DefaultDestinationTaskLauncher(
}
}

/** Called when the initial destination setup completes. */
override suspend fun handleSetupComplete() {
catalog.streams.forEach {
log.info { "Starting open stream task for $it" }
val task = openStreamTaskFactory.make(this, it)
launch(task)
}
}

/** Called when a stream is ready for loading. */
override suspend fun handleStreamStarted(stream: DestinationStream.Descriptor) {
// Nothing to do because the SpillToDiskTask will trigger the next calls
log.info { "Stream $stream successfully opened for writing." }
}

/**
* Called for each new batch. Enqueues processing for any incomplete batch, and enqueues closing
* the stream if all batches are complete.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,18 @@
package io.airbyte.cdk.load.task.implementor

import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.message.MessageQueue
import io.airbyte.cdk.load.state.SyncManager
import io.airbyte.cdk.load.task.DestinationTaskLauncher
import io.airbyte.cdk.load.task.SelfTerminating
import io.airbyte.cdk.load.task.Task
import io.airbyte.cdk.load.task.TerminalCondition
import io.airbyte.cdk.load.write.DestinationWriter
import io.airbyte.cdk.load.write.StreamLoader
import io.micronaut.context.annotation.Secondary
import jakarta.inject.Named
import jakarta.inject.Singleton
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList

interface OpenStreamTask : Task

Expand All @@ -25,44 +28,43 @@ interface OpenStreamTask : Task
class DefaultOpenStreamTask(
private val destinationWriter: DestinationWriter,
private val syncManager: SyncManager,
val streamDescriptor: DestinationStream.Descriptor,
private val taskLauncher: DestinationTaskLauncher,
private val stream: DestinationStream,
private val openStreamQueue: MessageQueue<DestinationStream>
) : OpenStreamTask {
override val terminalCondition: TerminalCondition = SelfTerminating

override suspend fun execute() {
val streamLoader = destinationWriter.createStreamLoader(stream)
val result = runCatching {
streamLoader.start()
streamLoader
}
syncManager.registerStartedStreamLoader(stream.descriptor, result)
result.getOrThrow() // throw after registering the failure
taskLauncher.handleStreamStarted(streamDescriptor)
val results =
openStreamQueue
.consume()
.map { stream ->
val streamLoader = destinationWriter.createStreamLoader(stream)
val result = runCatching {
streamLoader.start()
streamLoader
}
syncManager.registerStartedStreamLoader(
stream.descriptor,
result
) // throw after registering the failure
result
}
.toList()
results.forEach { it.getOrThrow() }
}
}

interface OpenStreamTaskFactory {
fun make(taskLauncher: DestinationTaskLauncher, stream: DestinationStream): OpenStreamTask
fun make(): OpenStreamTask
}

@Singleton
@Secondary
class DefaultOpenStreamTaskFactory(
private val destinationWriter: DestinationWriter,
private val syncManager: SyncManager
private val syncManager: SyncManager,
@Named("openStreamQueue") private val openStreamQueue: MessageQueue<DestinationStream>
) : OpenStreamTaskFactory {
override fun make(
taskLauncher: DestinationTaskLauncher,
stream: DestinationStream
): OpenStreamTask {
return DefaultOpenStreamTask(
destinationWriter,
syncManager,
stream.descriptor,
taskLauncher,
stream
)
override fun make(): OpenStreamTask {
return DefaultOpenStreamTask(destinationWriter, syncManager, openStreamQueue)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

package io.airbyte.cdk.load.task.implementor

import io.airbyte.cdk.load.task.DestinationTaskLauncher
import io.airbyte.cdk.load.task.SelfTerminating
import io.airbyte.cdk.load.task.Task
import io.airbyte.cdk.load.task.TerminalCondition
Expand All @@ -14,34 +13,27 @@ import jakarta.inject.Singleton

interface SetupTask : Task

/**
* Wraps @[DestinationWriter.setup] and starts the open stream tasks.
*
* TODO: This should call something like "TaskLauncher.setupComplete" and let it decide what to do
* next.
*/
/** Wraps @[DestinationWriter.setup] and starts the open stream tasks. */
class DefaultSetupTask(
private val destination: DestinationWriter,
private val taskLauncher: DestinationTaskLauncher
) : SetupTask {
override val terminalCondition: TerminalCondition = SelfTerminating

override suspend fun execute() {
destination.setup()
taskLauncher.handleSetupComplete()
}
}

interface SetupTaskFactory {
fun make(taskLauncher: DestinationTaskLauncher): SetupTask
fun make(): SetupTask
}

@Singleton
@Secondary
class DefaultSetupTaskFactory(
private val destination: DestinationWriter,
) : SetupTaskFactory {
override fun make(taskLauncher: DestinationTaskLauncher): SetupTask {
return DefaultSetupTask(destination, taskLauncher)
override fun make(): SetupTask {
return DefaultSetupTask(destination)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ class DestinationTaskLauncherTest {
@Inject lateinit var mockInputConsumerTask: MockInputConsumerTaskFactory
@Inject lateinit var mockSetupTaskFactory: MockSetupTaskFactory
@Inject lateinit var mockSpillToDiskTaskFactory: MockSpillToDiskTaskFactory
@Inject lateinit var mockOpenStreamTaskFactory: MockOpenStreamTaskFactory
@Inject lateinit var processRecordsTaskFactory: ProcessRecordsTaskFactory
@Inject lateinit var processBatchTaskFactory: ProcessBatchTaskFactory
@Inject lateinit var closeStreamTaskFactory: MockCloseStreamTaskFactory
Expand Down Expand Up @@ -172,9 +171,7 @@ class DestinationTaskLauncherTest {
class MockSetupTaskFactory : SetupTaskFactory {
val hasRun: Channel<Unit> = Channel(Channel.UNLIMITED)

override fun make(
taskLauncher: DestinationTaskLauncher,
): SetupTask {
override fun make(): SetupTask {
return object : SetupTask {
override val terminalCondition: TerminalCondition = SelfTerminating

Expand Down Expand Up @@ -216,22 +213,13 @@ class DestinationTaskLauncherTest {
@Singleton
@Replaces(DefaultOpenStreamTaskFactory::class)
@Requires(env = ["DestinationTaskLauncherTest"])
class MockOpenStreamTaskFactory(catalog: DestinationCatalog) : OpenStreamTaskFactory {
val streamHasRun = mutableMapOf<DestinationStream, Channel<Unit>>()

init {
catalog.streams.forEach { streamHasRun[it] = Channel(Channel.UNLIMITED) }
}

override fun make(
taskLauncher: DestinationTaskLauncher,
stream: DestinationStream
): OpenStreamTask {
class MockOpenStreamTaskFactory : OpenStreamTaskFactory {
override fun make(): OpenStreamTask {
return object : OpenStreamTask {
override val terminalCondition: TerminalCondition = SelfTerminating

override suspend fun execute() {
streamHasRun[stream]?.send(Unit)
// Do nothing
}
}
}
Expand Down Expand Up @@ -389,13 +377,6 @@ class DestinationTaskLauncherTest {
job.cancel()
}

@Test
fun testHandleSetupComplete() = runTest {
// Verify that open stream ran for each stream
taskLauncher.handleSetupComplete()
mockOpenStreamTaskFactory.streamHasRun.values.forEach { it.receive() }
}

@Test
fun testHandleNewBatch() = runTest {
val range = TreeRangeSet.create(listOf(Range.closed(0L, 100L)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ class DestinationTaskLauncherUTest {
private val checkpointQueue: QueueWriter<Reserved<CheckpointMessageWrapped>> =
mockk(relaxed = true)
private val fileTransferQueue: MessageQueue<FileTransferQueueMessage> = mockk(relaxed = true)
private val openStreamQueue: MessageQueue<DestinationStream> = mockk(relaxed = true)

private fun getDefaultDestinationTaskLauncher(
useFileTranfer: Boolean
): DefaultDestinationTaskLauncher {
Expand Down Expand Up @@ -112,6 +114,7 @@ class DestinationTaskLauncherUTest {
recordQueueSupplier,
checkpointQueue,
fileTransferQueue,
openStreamQueue,
)
}

Expand Down Expand Up @@ -226,4 +229,18 @@ class DestinationTaskLauncherUTest {
)
}
}

@Test
fun `test numOpenStreamWorkers open stream tasks are launched`() = runTest {
val numOpenStreamWorkers = 3
val destinationTaskLauncher = getDefaultDestinationTaskLauncher(false)

coEvery { config.numOpenStreamWorkers } returns numOpenStreamWorkers

val job = launch { destinationTaskLauncher.run() }
destinationTaskLauncher.handleTeardownComplete()
job.join()

coVerify(exactly = numOpenStreamWorkers) { openStreamTaskFactory.make() }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,6 @@ import jakarta.inject.Singleton
class MockTaskLauncher : DestinationTaskLauncher {
val batchEnvelopes = mutableListOf<BatchEnvelope<*>>()

override suspend fun handleSetupComplete() {
throw NotImplementedError()
}

override suspend fun handleStreamStarted(stream: DestinationStream.Descriptor) {
throw NotImplementedError()
}

override suspend fun handleNewBatch(
stream: DestinationStream.Descriptor,
wrapped: BatchEnvelope<*>
Expand Down
Loading

0 comments on commit 45dd3b6

Please sign in to comment.