
Commit

Merge branch 'master' into maxi297/facebook-marketing-update-to-api-version-21
maxi297 authored Jan 20, 2025
2 parents 685f3ad + f3705c9 commit 98d33b4
Showing 886 changed files with 9,785 additions and 8,900 deletions.
@@ -33,11 +33,18 @@ import java.util.function.Consumer

/** Emits the [AirbyteMessage] instances produced by the connector. */
@DefaultImplementation(StdoutOutputConsumer::class)
interface OutputConsumer : Consumer<AirbyteMessage>, AutoCloseable {
val emittedAt: Instant
abstract class OutputConsumer(private val clock: Clock) : Consumer<AirbyteMessage>, AutoCloseable {
/**
* The constant emittedAt timestamp we use for record timestamps.
*
* TODO: use the correct emittedAt time for each record. Ryan: not changing this now as it could
* have performance implications for sources given the delicate serialization logic in place
* here.
*/
val recordEmittedAt: Instant = Instant.ofEpochMilli(clock.millis())

fun accept(record: AirbyteRecordMessage) {
record.emittedAt = emittedAt.toEpochMilli()
open fun accept(record: AirbyteRecordMessage) {
record.emittedAt = recordEmittedAt.toEpochMilli()
accept(AirbyteMessage().withType(AirbyteMessage.Type.RECORD).withRecord(record))
}

@@ -66,7 +73,9 @@ interface OutputConsumer : Consumer<AirbyteMessage>, AutoCloseable {
}

fun accept(trace: AirbyteTraceMessage) {
trace.emittedAt = emittedAt.toEpochMilli().toDouble()
// Use the correct emittedAt timestamp for trace messages. This allows platform and other
// downstream consumers to take emission time into account for error classification.
trace.emittedAt = clock.millis().toDouble()
accept(AirbyteMessage().withType(AirbyteMessage.Type.TRACE).withTrace(trace))
}

@@ -107,7 +116,7 @@ const val CONNECTOR_OUTPUT_PREFIX = "airbyte.connector.output"
@Secondary
private class StdoutOutputConsumer(
val stdout: PrintStream,
clock: Clock,
private val clock: Clock,
/**
* [bufferByteSizeThresholdForFlush] triggers flushing the record buffer to stdout once the
* buffer's size (in bytes) grows past this value.
@@ -132,9 +141,7 @@ private class StdoutOutputConsumer(
*/
@Value("\${$CONNECTOR_OUTPUT_PREFIX.buffer-byte-size-threshold-for-flush:4096}")
val bufferByteSizeThresholdForFlush: Int,
) : OutputConsumer {
override val emittedAt: Instant = Instant.now(clock)

) : OutputConsumer(clock) {
private val buffer = ByteArrayOutputStream() // TODO: replace this with a StringWriter?
private val jsonGenerator: JsonGenerator = Jsons.createGenerator(buffer)
private val sequenceWriter: SequenceWriter = Jsons.writer().writeValues(jsonGenerator)
@@ -233,7 +240,7 @@ private class StdoutOutputConsumer(
namespacedTemplates.getOrPut(namespace) { StreamToTemplateMap() }
}
return streamToTemplateMap.getOrPut(stream) {
RecordTemplate.create(stream, namespace, emittedAt)
RecordTemplate.create(stream, namespace, recordEmittedAt)
}
}

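The net effect of this first file's changes: `OutputConsumer` becomes an abstract class that owns an injected `Clock`, record timestamps stay pinned to the constant `recordEmittedAt` (the documented performance trade-off), and trace messages now read the clock live at emission time. A minimal sketch of a subclass under the new contract follows; the class name, the collected-list design, and the protocol-model import path are illustrative assumptions, not part of this commit.

```kotlin
import io.airbyte.protocol.models.v0.AirbyteMessage
import java.time.Clock

// Sketch: a custom sink under the new abstract-class contract. The base
// class captures recordEmittedAt once from the injected clock, so every
// record in the sync shares one timestamp, while its trace-message accept
// overload reads the clock at emission time.
class CollectingOutputConsumer(clock: Clock) : OutputConsumer(clock) {
    val collected = mutableListOf<AirbyteMessage>()

    // Consumer<AirbyteMessage>.accept: collect instead of writing to stdout.
    override fun accept(message: AirbyteMessage) {
        collected.add(message)
    }

    override fun close() {
        collected.clear()
    }
}
```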
@@ -15,16 +15,14 @@ import io.micronaut.context.annotation.Requires
import io.micronaut.context.env.Environment
import jakarta.inject.Singleton
import java.time.Clock
import java.time.Instant

/** [OutputConsumer] implementation for unit tests. Collects everything into thread-safe buffers. */
@Singleton
@Requires(notEnv = [Environment.CLI])
@Replaces(OutputConsumer::class)
class BufferingOutputConsumer(
clock: Clock,
) : OutputConsumer {
override val emittedAt: Instant = Instant.now(clock)
) : OutputConsumer(clock) {

private val records = mutableListOf<AirbyteRecordMessage>()
private val states = mutableListOf<AirbyteStateMessage>()
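Because the clock now flows into the base-class constructor, tests can pin the record timestamp. A minimal sketch, assuming direct construction rather than Micronaut injection:

```kotlin
import java.time.Clock
import java.time.Instant
import java.time.ZoneOffset

// A fixed clock makes the constant recordEmittedAt deterministic,
// so emitted_at assertions in unit tests cannot flake.
val fixedClock = Clock.fixed(Instant.parse("2025-01-20T00:00:00Z"), ZoneOffset.UTC)
val consumer = BufferingOutputConsumer(fixedClock)
check(consumer.recordEmittedAt == Instant.parse("2025-01-20T00:00:00Z"))
```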
@@ -108,7 +108,7 @@ sealed class FeedBootstrap<T : Feed>(
stream.schema.forEach { recordData.putNull(it.id) }
if (feed is Stream && precedingGlobalFeed != null) {
metaFieldDecorator.decorateRecordData(
timestamp = outputConsumer.emittedAt.atOffset(ZoneOffset.UTC),
timestamp = outputConsumer.recordEmittedAt.atOffset(ZoneOffset.UTC),
globalStateValue = stateQuerier.current(precedingGlobalFeed),
stream,
recordData,
@@ -125,7 +125,7 @@ sealed class FeedBootstrap<T : Feed>(
AirbyteRecordMessage()
.withStream(stream.name)
.withNamespace(stream.namespace)
.withEmittedAt(outputConsumer.emittedAt.toEpochMilli())
.withEmittedAt(outputConsumer.recordEmittedAt.toEpochMilli())
.withData(reusedRecordData)
)

@@ -138,7 +138,7 @@ sealed class FeedBootstrap<T : Feed>(
AirbyteRecordMessage()
.withStream(stream.name)
.withNamespace(stream.namespace)
.withEmittedAt(outputConsumer.emittedAt.toEpochMilli())
.withEmittedAt(outputConsumer.recordEmittedAt.toEpochMilli())
.withData(reusedRecordData)
.withMeta(reusedRecordMeta)
)
@@ -70,7 +70,7 @@ class FeedBootstrapTest {
FeedBootstrap.create(outputConsumer, metaFieldDecorator, stateQuerier, this)

fun expected(vararg data: String): List<String> {
val ts = outputConsumer.emittedAt.toEpochMilli()
val ts = outputConsumer.recordEmittedAt.toEpochMilli()
return data.map { """{"namespace":"ns","stream":"tbl","data":$it,"emitted_at":$ts}""" }
}

@@ -5,6 +5,7 @@
package io.airbyte.cdk.load.state

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
import io.airbyte.cdk.TransientErrorException
import io.airbyte.cdk.load.command.DestinationCatalog
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.write.StreamLoader
@@ -141,7 +142,7 @@ class DefaultSyncManager(
.map { (stream, _) -> stream }
if (incompleteStreams.isNotEmpty()) {
val prettyStreams = incompleteStreams.map { it.toPrettyString() }
throw IllegalStateException(
throw TransientErrorException(
"Input was fully read, but some streams did not receive a terminal stream status message. This likely indicates an error in the source or platform. Streams without a status message: $prettyStreams"
)
}
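Swapping `IllegalStateException` for `TransientErrorException` changes how downstream error classification treats this failure: transient errors read as retryable rather than as an internal bug. A hedged sketch of the caller-side difference; the helper function and the `SyncManager` parameter type are illustrative assumptions:

```kotlin
import io.airbyte.cdk.TransientErrorException

// Sketch: markInputConsumed() now surfaces missing terminal stream statuses
// as a transient (retryable) failure instead of an illegal state.
fun finishReading(syncManager: SyncManager) {
    try {
        syncManager.markInputConsumed()
    } catch (e: TransientErrorException) {
        // Hypothetical handling: log and rethrow so the platform can
        // classify the sync as retryable.
        println("Transient failure, safe to retry: ${e.message}")
        throw e
    }
}
```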
@@ -4,6 +4,7 @@

package io.airbyte.cdk.load.state

import io.airbyte.cdk.TransientErrorException
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.command.MockDestinationCatalogFactory.Companion.stream1
import io.airbyte.cdk.load.command.MockDestinationCatalogFactory.Companion.stream2
@@ -133,7 +134,7 @@ class SyncManagerTest {
val manager1 = syncManager.getStreamManager(stream1.descriptor)
manager1.markEndOfStream(true)
// This should fail, because stream2 was not marked with end of stream
val e = assertThrows<IllegalStateException> { syncManager.markInputConsumed() }
val e = assertThrows<TransientErrorException> { syncManager.markInputConsumed() }
assertEquals(
// stream1 is fine, so the message only includes stream2
"Input was fully read, but some streams did not receive a terminal stream status message. This likely indicates an error in the source or platform. Streams without a status message: [test.stream2]",
2 changes: 2 additions & 0 deletions airbyte-ci/connectors/pipelines/README.md
@@ -854,6 +854,8 @@ airbyte-ci connectors --language=low-code migrate-to-manifest-only

| Version | PR | Description |
| ------- | ---------------------------------------------------------- | ---------------------------------------------------------------------------------------------------------------------------- |
| 4.48.9 | [#51609](https://github.com/airbytehq/airbyte/pull/51609) | Fix ownership of shared cache volume for non root connectors |
| 4.48.8 | [#51582](https://github.com/airbytehq/airbyte/pull/51582) | Fix typo in `migrate-to-inline-schemas` command |
| 4.48.7 | [#51579](https://github.com/airbytehq/airbyte/pull/51579) | Give back the ownership of /tmp to the original user on finalize build |
| 4.48.6 | [#51577](https://github.com/airbytehq/airbyte/pull/51577) | Run finalize build scripts as root |
| 4.48.5 | [#49827](https://github.com/airbytehq/airbyte/pull/49827) | Bypasses CI checks for promoted release candidate PRs. |
@@ -91,6 +91,8 @@ async def _build_from_base_image(self, platform: Platform) -> Container:

connector_container = build_customization.apply_airbyte_entrypoint(base_connector_container, self.context.connector)
customized_connector = await build_customization.post_install_hooks(self.context.connector, connector_container, self.logger)
# Make sure the user has access to /tmp
customized_connector = customized_connector.with_exec(["chown", "-R", f"{user}:{user}", "/tmp"])
return customized_connector.with_user(user)

async def _build_from_dockerfile(self, platform: Platform) -> Container:
@@ -148,7 +148,7 @@ def validate_environment(is_local: bool) -> None:
"migrate-to-base-image": "pipelines.airbyte_ci.connectors.migrate_to_base_image.commands.migrate_to_base_image",
"migrate-to-manifest-only": "pipelines.airbyte_ci.connectors.migrate_to_manifest_only.commands.migrate_to_manifest_only",
"migrate-to-poetry": "pipelines.airbyte_ci.connectors.migrate_to_poetry.commands.migrate_to_poetry",
"migrate-to-inline_schemas": "pipelines.airbyte_ci.connectors.migrate_to_inline_schemas.commands.migrate_to_inline_schemas",
"migrate-to-inline-schemas": "pipelines.airbyte_ci.connectors.migrate_to_inline_schemas.commands.migrate_to_inline_schemas",
"migrate-to-logging-logger": "pipelines.airbyte_ci.connectors.migrate_to_logging_logger.commands.migrate_to_logging_logger",
"generate-erd": "pipelines.airbyte_ci.connectors.generate_erd.commands.generate_erd",
"upgrade-cdk": "pipelines.airbyte_ci.connectors.upgrade_cdk.commands.upgrade_cdk",
@@ -83,7 +83,7 @@ async def _run(self, connector_under_test: Container) -> StepResult:
pytest_command = self.get_pytest_command(test_config_file_name)

if self.bind_to_docker_host:
test_environment = pipelines.dagger.actions.system.docker.with_bound_docker_host(self.context, test_environment)
test_environment = await pipelines.dagger.actions.system.docker.with_bound_docker_host(self.context, test_environment)

test_execution = test_environment.with_exec(pytest_command)

@@ -201,7 +201,7 @@ async def _run(self, *args: Any, **kwargs: Any) -> StepResult:
gradle_container = gradle_container.with_(await secrets.mounted_connector_secrets(self.context, secrets_dir, self.secrets))
if self.bind_to_docker_host:
# If this GradleTask subclass needs docker, then install it and bind it to the existing global docker host container.
gradle_container = pipelines.dagger.actions.system.docker.with_bound_docker_host(self.context, gradle_container)
gradle_container = await pipelines.dagger.actions.system.docker.with_bound_docker_host(self.context, gradle_container)
# This installation should be cheap, as the package has already been downloaded, and its dependencies are already installed.
gradle_container = gradle_container.with_exec(["yum", "install", "-y", "docker"], use_entrypoint=True)

@@ -3,8 +3,10 @@
#

import json
import logging
import platform
import uuid
from typing import Callable, Dict, List, Optional, Union
from typing import Any, Callable, Coroutine, Dict, List, Optional, Union

from dagger import Client, Container, File, Service
from dagger import Secret as DaggerSecret
@@ -56,13 +58,17 @@ def get_base_dockerd_container(dagger_client: Client) -> Container:
)
)
# Expose the docker host port.
.with_exec(["adduser", "-u", "1000", "-S", "-H", "airbyte"])
.with_exposed_port(DOCKER_HOST_PORT)
# We cache /tmp for file sharing between client and daemon.
.with_mounted_cache("/tmp", dagger_client.cache_volume(DOCKER_TMP_VOLUME_NAME))
.with_mounted_cache("/tmp", dagger_client.cache_volume(DOCKER_TMP_VOLUME_NAME), owner="airbyte")
.with_exec(["chmod", "777", "/tmp"])
)

# We cache /var/lib/docker to avoid downloading images and layers multiple times.
base_container = base_container.with_mounted_cache("/var/lib/docker", dagger_client.cache_volume(DOCKER_VAR_LIB_VOLUME_NAME))
base_container = base_container.with_mounted_cache(
"/var/lib/docker", dagger_client.cache_volume(DOCKER_VAR_LIB_VOLUME_NAME), owner="airbyte"
)
return base_container


@@ -75,8 +81,10 @@ def get_daemon_config_json(registry_mirror_url: Optional[str] = None) -> str:
Returns:
str: The json representation of the docker daemon config.
"""
storage_driver = "vfs" if platform.system() == "Darwin" else STORAGE_DRIVER
logging.info(f"Using storage driver: {storage_driver}")
daemon_config: Dict[str, Union[List[str], str]] = {
"storage-driver": STORAGE_DRIVER,
"storage-driver": storage_driver,
}
if registry_mirror_url:
daemon_config["registry-mirrors"] = ["http://" + registry_mirror_url]
@@ -152,7 +160,7 @@ def with_global_dockerd_service(
).as_service()


def with_bound_docker_host(
async def with_bound_docker_host(
context: ConnectorContext,
container: Container,
) -> Container:
@@ -165,21 +173,22 @@ def with_bound_docker_host(
Container: The container bound to the docker host.
"""
assert context.dockerd_service is not None
current_user = (await container.with_exec(["whoami"]).stdout()).strip()
return (
container.with_env_variable("DOCKER_HOST", f"tcp://{DOCKER_HOST_NAME}:{DOCKER_HOST_PORT}")
.with_service_binding(DOCKER_HOST_NAME, context.dockerd_service)
.with_mounted_cache("/tmp", context.dagger_client.cache_volume(DOCKER_TMP_VOLUME_NAME))
.with_mounted_cache("/tmp", context.dagger_client.cache_volume(DOCKER_TMP_VOLUME_NAME), owner=current_user)
)


def bound_docker_host(context: ConnectorContext) -> Callable[[Container], Container]:
def bound_docker_host_inner(container: Container) -> Container:
return with_bound_docker_host(context, container)
def bound_docker_host(context: ConnectorContext) -> Callable[[Container], Coroutine[Any, Any, Container]]:
async def bound_docker_host_inner(container: Container) -> Container:
return await with_bound_docker_host(context, container)

return bound_docker_host_inner


def with_docker_cli(context: ConnectorContext) -> Container:
async def with_docker_cli(context: ConnectorContext) -> Container:
"""Create a container with the docker CLI installed and bound to a persistent docker host.
Args:
@@ -189,7 +198,7 @@ def with_docker_cli(context: ConnectorContext) -> Container:
Container: A docker cli container bound to a docker host.
"""
docker_cli = context.dagger_client.container().from_(consts.DOCKER_CLI_IMAGE)
return with_bound_docker_host(context, docker_cli)
return await with_bound_docker_host(context, docker_cli)


async def load_image_to_docker_host(context: ConnectorContext, tar_file: File, image_tag: str) -> str:
@@ -202,7 +211,7 @@ async def load_image_to_docker_host(context: ConnectorContext, tar_file: File, image_tag: str) -> str:
"""
# Hacky way to make sure the image is always loaded
tar_name = f"{str(uuid.uuid4())}.tar"
docker_cli = with_docker_cli(context).with_mounted_file(tar_name, tar_file)
docker_cli = (await with_docker_cli(context)).with_mounted_file(tar_name, tar_file)

image_load_output = await docker_cli.with_exec(["docker", "load", "--input", tar_name], use_entrypoint=True).stdout()
# Not tagged images only have a sha256 id the load output shares.
2 changes: 1 addition & 1 deletion airbyte-ci/connectors/pipelines/pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"

[tool.poetry]
name = "pipelines"
version = "4.48.7"
version = "4.48.9"
description = "Packaged maintained by the connector operations team to perform CI for connectors' pipelines"
authors = ["Airbyte <[email protected]>"]

@@ -9,7 +9,7 @@ data:
connectorSubtype: vectorstore
connectorType: destination
definitionId: 0b75218b-f702-4a28-85ac-34d3d84c0fc2
dockerImageTag: 0.0.40
dockerImageTag: 0.0.41
dockerRepository: airbyte/destination-chroma
githubIssueLabel: destination-chroma
icon: chroma.svg

