Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Destination S3: inject assume role creds via micronaut rather than system.getenv #50951

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
package io.airbyte.cdk

import io.airbyte.cdk.command.ConnectorCommandLinePropertySource
import io.airbyte.cdk.command.FeatureFlag
import io.airbyte.cdk.command.MetadataYamlPropertySource
import io.micronaut.configuration.picocli.MicronautFactory
import io.micronaut.context.ApplicationContext
import io.micronaut.context.RuntimeBeanDefinition
import io.micronaut.context.env.CommandLinePropertySource
import io.micronaut.context.env.Environment
import io.micronaut.context.env.MapPropertySource
import io.micronaut.core.cli.CommandLine as MicronautCommandLine
import java.nio.file.Path
import kotlin.system.exitProcess
Expand All @@ -17,15 +17,20 @@ import picocli.CommandLine.Model.ArgGroupSpec
import picocli.CommandLine.Model.OptionSpec
import picocli.CommandLine.Model.UsageMessageSpec

// A pair of micronaut environment names. Intended for use by connectors,
// which can create `src/main/resources/application-override.yaml` and
// `src/test-integration/resources/application-test-override.yaml` to override
// CDK property declarations.
const val OVERRIDE_ENV = "override"
const val TEST_OVERRIDE_ENV = "test-override"

/** Source connector entry point. */
class AirbyteSourceRunner(
/** CLI args. */
args: Array<out String>,
/** Environment variables. */
systemEnv: Map<String, String> = System.getenv(),
/** Micronaut bean definition overrides, used only for tests. */
vararg testBeanDefinitions: RuntimeBeanDefinition<*>,
) : AirbyteConnectorRunner("source", args, systemEnv, testBeanDefinitions) {
) : AirbyteConnectorRunner("source", args, testBeanDefinitions) {
companion object {
@JvmStatic
fun run(vararg args: String) {
Expand All @@ -38,15 +43,31 @@ class AirbyteSourceRunner(
class AirbyteDestinationRunner(
/** CLI args. */
args: Array<out String>,
/** Environment variables. */
systemEnv: Map<String, String> = System.getenv(),
/** Micronaut bean definition overrides, used only for tests. */
vararg testBeanDefinitions: RuntimeBeanDefinition<*>,
) : AirbyteConnectorRunner("destination", args, systemEnv, testBeanDefinitions) {
additionalEnvironments: Array<out String> = emptyArray(),
additionalProperties: Map<String, String> = emptyMap(),
) :
AirbyteConnectorRunner(
"destination",
args,
testBeanDefinitions,
additionalEnvironments,
additionalProperties,
) {
companion object {
@JvmStatic
fun run(vararg args: String) {
AirbyteDestinationRunner(args).run<AirbyteConnectorRunnable>()
fun run(
vararg args: String,
additionalEnvironments: Array<out String> = emptyArray(),
additionalProperties: Map<String, String> = emptyMap()
) {
AirbyteDestinationRunner(
args,
additionalEnvironments = additionalEnvironments,
additionalProperties = additionalProperties
)
.run<AirbyteConnectorRunnable>()
}
}
}
Expand All @@ -58,13 +79,12 @@ class AirbyteDestinationRunner(
sealed class AirbyteConnectorRunner(
val connectorType: String,
val args: Array<out String>,
systemEnv: Map<String, String>,
val testBeanDefinitions: Array<out RuntimeBeanDefinition<*>>,
val additionalEnvironments: Array<out String> = emptyArray(),
val additionalProperties: Map<String, String> = emptyMap(),
) {
val envs: Array<String> =
arrayOf(Environment.CLI, connectorType) +
// Set feature flag environments.
FeatureFlag.active(systemEnv).map { it.micronautEnvironmentName } +
// Micronaut's TEST env detection relies on inspecting the stacktrace and checking for
// any junit calls. This doesn't work if we launch the connector from a different
// thread, e.g. `Dispatchers.IO`. Force the test env if needed. Some tests launch the
Expand All @@ -80,13 +100,21 @@ sealed class AirbyteConnectorRunner(
picocliCommandLineFactory.commands.options().map { it.longestName() },
)
val commandLinePropertySource = CommandLinePropertySource(micronautCommandLine)
val additionalPropertiesSource =
MapPropertySource("additional_properties", additionalProperties)
val ctx: ApplicationContext =
ApplicationContext.builder(R::class.java, *envs)
// Note: the override envs are applied last, with test-override coming after override.
// If the same property appears in multiple application-XYZ.yaml files,
// the last-declared env wins, so this allows connectors to override CDK declarations,
// and connector tests to override connector runtime declarations..
// (sidenote: application-XYZ.yaml wins over application.yaml)
ApplicationContext.builder(R::class.java, *envs, *additionalEnvironments, OVERRIDE_ENV, TEST_OVERRIDE_ENV)
.propertySources(
*listOfNotNull(
airbytePropertySource,
commandLinePropertySource,
MetadataYamlPropertySource(),
additionalPropertiesSource,
)
.toTypedArray(),
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
package io.airbyte.cdk.command

import io.micronaut.context.annotation.Factory
import io.micronaut.context.annotation.Value
import io.micronaut.context.condition.Condition
import io.micronaut.context.condition.ConditionContext
import io.micronaut.context.env.Environment
import jakarta.inject.Singleton
import java.util.EnumSet
Expand Down Expand Up @@ -62,3 +65,31 @@ enum class FeatureFlag(
}

const val AIRBYTE_CLOUD_ENV = "airbyte-cloud"

const val AIRBYTE_DEPLOYMENT_MODE_PROPERTY = "airbyte.core.deployment.mode"
const val DEPLOYMENT_MODE_CLOUD = "CLOUD"

enum class DeploymentMode { OSS, CLOUD }
@Factory
class DeploymentModeFactory(@Value("\${airbyte.core.deployment.mode}") val deploymentMode: String?) {
@Singleton
fun getDeploymentMode(): DeploymentMode {
return when (deploymentMode) {
"CLOUD" -> DeploymentMode.CLOUD
"OSS" -> DeploymentMode.OSS
// if deployment mode is unset, or some weird value, assume OSS
else -> DeploymentMode.OSS
}
}
}
class DeploymentModeCloudCondition: Condition {
override fun matches(context: ConditionContext<*>): Boolean {
val deploymentMode = context.getBean(DeploymentMode::class.java)
return deploymentMode == DeploymentMode.CLOUD
}
}
class DeploymentModeOssCondition: Condition {
override fun matches(context: ConditionContext<*>): Boolean {
return !DeploymentModeCloudCondition().matches(context)
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
airbyte:
core:
deployment:
mode: ${DEPLOYMENT_MODE:OSS}
file-transfer:
enabled: ${USE_FILE_TRANSFER:false}
staging-path: ${AIRBYTE_STAGING_DIRECTORY:/staging/files}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,11 @@ data object CliRunner {
config: ConfigurationSpecification? = null,
catalog: ConfiguredAirbyteCatalog? = null,
state: List<AirbyteStateMessage>? = null,
vararg featureFlags: FeatureFlag,
): CliRunnable {
val out = CliRunnerOutputStream()
val runnable: Runnable =
makeRunnable(op, config, catalog, state) { args: Array<String> ->
AirbyteSourceRunner(args, featureFlags.systemEnv, out.beanDefinition)
AirbyteSourceRunner(args, out.beanDefinition)
}
return CliRunnable(runnable, out.results)
}
Expand All @@ -53,7 +52,8 @@ data object CliRunner {
catalog: ConfiguredAirbyteCatalog? = null,
state: List<AirbyteStateMessage>? = null,
inputStream: InputStream,
vararg featureFlags: FeatureFlag,
additionalEnvironments: Array<out String>,
additionalProperties: Map<String, String>,
): CliRunnable {
val inputBeanDefinition: RuntimeBeanDefinition<InputStream> =
RuntimeBeanDefinition.builder(InputStream::class.java) { inputStream }
Expand All @@ -65,9 +65,10 @@ data object CliRunner {
makeRunnable(op, configPath, catalog, state) { args: Array<String> ->
AirbyteDestinationRunner(
args,
featureFlags.systemEnv,
inputBeanDefinition,
out.beanDefinition,
additionalEnvironments = additionalEnvironments,
additionalProperties = additionalProperties,
)
}
return CliRunnable(runnable, out.results)
Expand All @@ -79,7 +80,8 @@ data object CliRunner {
configContents: String? = null,
catalog: ConfiguredAirbyteCatalog? = null,
state: List<AirbyteStateMessage>? = null,
featureFlags: Set<FeatureFlag> = setOf(),
additionalEnvironments: Array<out String>,
additionalProperties: Map<String, String>,
vararg input: AirbyteMessage,
): CliRunnable {
val inputJsonBytes: ByteArray =
Expand All @@ -97,7 +99,8 @@ data object CliRunner {
catalog,
state,
inputStream,
*featureFlags.toTypedArray()
additionalEnvironments = additionalEnvironments,
additionalProperties = additionalProperties,
)
}

Expand Down Expand Up @@ -140,9 +143,6 @@ data object CliRunner {
}
}

private val Array<out FeatureFlag>.systemEnv: Map<String, String>
get() = toSet().map { it.envVar.name to it.requiredEnvVarValue }.toMap()

private fun inputFile(contents: Any?): Path? =
contents?.let { inputFileFromString(Jsons.writeValueAsString(contents)) }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import io.airbyte.cdk.load.test.util.FakeDataDumper
import io.airbyte.cdk.load.test.util.IntegrationTest
import io.airbyte.cdk.load.test.util.NoopDestinationCleaner
import io.airbyte.cdk.load.test.util.NoopExpectedRecordMapper
import io.airbyte.cdk.load.test.util.destination_process.Property
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus
import io.airbyte.protocol.models.v0.AirbyteMessage
import java.nio.charset.StandardCharsets
Expand Down Expand Up @@ -46,6 +47,7 @@ open class CheckIntegrationTest<T : ConfigurationSpecification>(
"check",
configContents = config,
featureFlags = featureFlags.toTypedArray(),
micronautProperties = mapOf(Property("airbyte.core.deployment.mode", "DEPLOYMENT_MODE") to "CLOUD"),
)
runBlocking { process.run() }
val messages = process.readMessages()
Expand Down Expand Up @@ -73,6 +75,7 @@ open class CheckIntegrationTest<T : ConfigurationSpecification>(
destinationProcessFactory.createDestinationProcess(
"check",
configContents = config,
micronautProperties = mapOf(Property("airbyte.core.deployment.mode", "DEPLOYMENT_MODE") to "CLOUD"),
featureFlags = featureFlags.toTypedArray(),
)
runBlocking { process.run() }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import io.airbyte.cdk.load.message.StreamCheckpoint
import io.airbyte.cdk.load.test.util.destination_process.DestinationProcessFactory
import io.airbyte.cdk.load.test.util.destination_process.DestinationUncleanExitException
import io.airbyte.cdk.load.test.util.destination_process.NonDockerizedDestination
import io.airbyte.cdk.load.test.util.destination_process.Property
import io.airbyte.protocol.models.v0.AirbyteMessage
import io.airbyte.protocol.models.v0.AirbyteStateMessage
import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage.AirbyteStreamStatus
Expand Down Expand Up @@ -57,11 +58,13 @@ abstract class IntegrationTest(
val nullEqualsUnset: Boolean = false,
val configUpdater: ConfigurationUpdater = FakeConfigurationUpdater,
val envVars: Map<String, String> = emptyMap(),
additionalEnvironments: Array<out String> = emptyArray(),
val additionalMicronautProperties: Map<Property, String> = emptyMap(),
) {
// Intentionally don't inject the actual destination process - we need a full factory
// because some tests want to run multiple syncs, so we need to run the destination
// multiple times.
val destinationProcessFactory = DestinationProcessFactory.get()
val destinationProcessFactory = DestinationProcessFactory.get(additionalEnvironments)

@Suppress("DEPRECATION") private val randomSuffix = RandomStringUtils.randomAlphabetic(4)
private val timestampString =
Expand Down Expand Up @@ -131,13 +134,17 @@ abstract class IntegrationTest(
messages: List<InputMessage>,
streamStatus: AirbyteStreamStatus? = AirbyteStreamStatus.COMPLETE,
useFileTransfer: Boolean = false,
envVars: Map<String, String> = emptyMap(),
micronautProperties: Map<Property, String> = emptyMap(),
): List<AirbyteMessage> =
runSync(
configContents,
DestinationCatalog(listOf(stream)),
messages,
streamStatus,
useFileTransfer,
envVars,
micronautProperties,
)

/**
Expand Down Expand Up @@ -172,6 +179,8 @@ abstract class IntegrationTest(
*/
streamStatus: AirbyteStreamStatus? = AirbyteStreamStatus.COMPLETE,
useFileTransfer: Boolean = false,
envVars: Map<String, String> = emptyMap(),
micronautProperties: Map<Property, String> = emptyMap(),
): List<AirbyteMessage> {
val destination =
destinationProcessFactory.createDestinationProcess(
Expand All @@ -180,6 +189,7 @@ abstract class IntegrationTest(
catalog.asProtocolObject(),
useFileTransfer = useFileTransfer,
envVars = envVars,
micronautProperties = micronautProperties + additionalMicronautProperties,
)
return runBlocking(Dispatchers.IO) {
launch { destination.run() }
Expand Down Expand Up @@ -215,14 +225,17 @@ abstract class IntegrationTest(
inputStateMessage: StreamCheckpoint,
allowGracefulShutdown: Boolean,
useFileTransfer: Boolean = false,
envVars: Map<String, String> = emptyMap(),
micronautProperties: Map<Property, String> = emptyMap(),
): AirbyteStateMessage {
val destination =
destinationProcessFactory.createDestinationProcess(
"write",
configContents,
DestinationCatalog(listOf(stream)).asProtocolObject(),
useFileTransfer,
envVars
envVars,
micronautProperties = micronautProperties + additionalMicronautProperties,
)
return runBlocking(Dispatchers.IO) {
launch {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,19 @@ abstract class DestinationProcessFactory {
configContents: String? = null,
catalog: ConfiguredAirbyteCatalog? = null,
useFileTransfer: Boolean = false,
envVars: Map<String, String> = emptyMap(),
vararg featureFlags: FeatureFlag,
micronautProperties: Map<Property, String> = emptyMap(),
): DestinationProcess

companion object {
fun get(): DestinationProcessFactory =
/**
* @param nonDockerDefaultEnvironments The environments to launch the connector with, in
* non-docker mode. (in Docker mode, the connector is expected to launch itself with these
* environments)
*/
fun get(nonDockerDefaultEnvironments: Array<out String>): DestinationProcessFactory =
when (val runner = System.getenv("AIRBYTE_CONNECTOR_INTEGRATION_TEST_RUNNER")) {
null,
"non-docker" -> NonDockerizedDestinationFactory()
"non-docker" -> NonDockerizedDestinationFactory(nonDockerDefaultEnvironments)
"docker" -> {
val rawProperties: Map<String, Any?> =
YamlPropertySourceLoader()
Expand All @@ -91,3 +95,19 @@ abstract class DestinationProcessFactory {
}
}
}

/**
* Represents a micronaut property, which has a corresponding entry in micronaut's `application.yml`
* file, which is populated by an environment variable. Just a pair of the micronaut property name,
* and that corresponding env var name.
*
* For example, this application.yaml:
* ```yaml
* airbyte:
* destination:
* foo-bar: ${FOO_BAR}
* ```
*
* Would be represented as `Property("airbyte.destination.foo-bar", "FOO_BAR")`.
*/
data class Property(val micronautProperty: String, val environmentVariable: String)
Loading
Loading