Skip to content

Commit

Permalink
Merge branch 'main' into 30509-add-minio-resources
Browse files Browse the repository at this point in the history
  • Loading branch information
machariamuguku authored Mar 11, 2024
2 parents 3f3e1c3 + a245f5b commit 190c598
Show file tree
Hide file tree
Showing 133 changed files with 3,273 additions and 1,936 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class BlockingShutdownAnalyticsPluginTest {
val bodyJson = "{}"
val client: Client = mockk()
val response: Response = mockk()
val flushInterval = 120L
val flushInterval = 3L
val writeKey = "write-key"
val plugin = BlockingShutdownAnalyticsPlugin(flushInterval)

Expand All @@ -54,12 +54,13 @@ class BlockingShutdownAnalyticsPluginTest {
.builder(writeKey)
.client(client)
.flushInterval(flushInterval, TimeUnit.SECONDS)
.flushQueueSize(5001)
.plugin(plugin)
.build()

assertDoesNotThrow {
CompletableFuture.supplyAsync {
for (i in 0..50000) {
for (i in 0..5000) {
val builder = TrackMessage.builder("track").userId("user-id").properties(mapOf("property" to "value"))
analytics.enqueue(builder)
}
Expand Down
6 changes: 3 additions & 3 deletions airbyte-api-server/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ airbyte:
persistence: ${SECRET_PERSISTENCE:TESTING_CONFIG_DB_TABLE}
store:
aws:
access-key: ${AWS_ACCESS_KEY:}
secret-key: ${AWS_SECRET_ACCESS_KEY:}
region: ${AWS_REGION:}
access-key: ${AWS_SECRET_MANAGER_ACCESS_KEY_ID:}
secret-key: ${AWS_SECRET_MANAGER_SECRET_ACCESS_KEY:}
region: ${AWS_SECRET_MANAGER_REGION:}
gcp:
credentials: ${SECRET_STORE_GCP_CREDENTIALS:}
project-id: ${SECRET_STORE_GCP_PROJECT_ID:}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.airbyte.config.secrets.SecretsRepositoryWriter;
import io.airbyte.config.specs.DefinitionsProvider;
import io.airbyte.config.specs.LocalDefinitionsProvider;
import io.airbyte.data.helpers.ActorDefinitionVersionUpdater;
import io.airbyte.data.services.SecretPersistenceConfigService;
import io.airbyte.data.services.impls.jooq.ActorDefinitionServiceJooqImpl;
import io.airbyte.data.services.impls.jooq.CatalogServiceJooqImpl;
Expand Down Expand Up @@ -95,7 +96,7 @@ class BootloaderTest {

// ⚠️ This line should change with every new migration to show that you meant to make a new
// migration to the prod database
private static final String CURRENT_CONFIGS_MIGRATION_VERSION = "0.50.41.007";
private static final String CURRENT_CONFIGS_MIGRATION_VERSION = "0.50.41.008";
private static final String CURRENT_JOBS_MIGRATION_VERSION = "0.50.4.001";
private static final String CDK_VERSION = "1.2.3";

Expand Down Expand Up @@ -145,18 +146,22 @@ void testBootloaderAppBlankDb() throws Exception {
final SecretsRepositoryWriter secretsRepositoryWriter = mock(SecretsRepositoryWriter.class);
final SecretPersistenceConfigService secretPersistenceConfigService = mock(SecretPersistenceConfigService.class);
val connectionService = new ConnectionServiceJooqImpl(configDatabase);
val actorDefinitionService = new ActorDefinitionServiceJooqImpl(configDatabase);
val actorDefinitionVersionUpdater = new ActorDefinitionVersionUpdater(featureFlagClient, connectionService, actorDefinitionService);
val destinationService = new DestinationServiceJooqImpl(configDatabase,
featureFlagClient,
secretsRepositoryReader,
secretsRepositoryWriter,
secretPersistenceConfigService,
connectionService);
connectionService,
actorDefinitionVersionUpdater);
val sourceService = new SourceServiceJooqImpl(configDatabase,
featureFlagClient,
secretsRepositoryReader,
secretsRepositoryWriter,
secretPersistenceConfigService,
connectionService);
connectionService,
actorDefinitionVersionUpdater);
val configRepository = new ConfigRepository(
new ActorDefinitionServiceJooqImpl(configDatabase),
new CatalogServiceJooqImpl(configDatabase),
Expand Down Expand Up @@ -190,7 +195,6 @@ void testBootloaderAppBlankDb() throws Exception {
val actorDefinitionVersionHelper =
new ActorDefinitionVersionHelper(configRepository, new NoOpDefinitionVersionOverrideProvider(), new NoOpDefinitionVersionOverrideProvider(),
featureFlagClient);
val actorDefinitionService = new ActorDefinitionServiceJooqImpl(configDatabase);
val supportStateUpdater =
new SupportStateUpdater(actorDefinitionService, sourceService, destinationService, DeploymentMode.OSS,
actorDefinitionVersionHelper, breakingChangeNotificationHelper,
Expand Down Expand Up @@ -244,6 +248,8 @@ void testRequiredVersionUpgradePredicate() throws Exception {
val configDatabase = new ConfigsDatabaseTestProvider(configsDslContext, configsFlyway).create(false);
val jobDatabase = new JobsDatabaseTestProvider(jobsDslContext, jobsFlyway).create(false);
val connectionService = new ConnectionServiceJooqImpl(configDatabase);
val actorDefinitionService = new ActorDefinitionServiceJooqImpl(configDatabase);
val actorDefinitionVersionUpdater = new ActorDefinitionVersionUpdater(featureFlagClient, connectionService, actorDefinitionService);
val configRepository = new ConfigRepository(
new ActorDefinitionServiceJooqImpl(configDatabase),
new CatalogServiceJooqImpl(configDatabase),
Expand All @@ -254,7 +260,8 @@ void testRequiredVersionUpgradePredicate() throws Exception {
mock(SecretsRepositoryReader.class),
mock(SecretsRepositoryWriter.class),
mock(SecretPersistenceConfigService.class),
connectionService),
connectionService,
actorDefinitionVersionUpdater),
new OAuthServiceJooqImpl(configDatabase,
featureFlagClient,
mock(SecretsRepositoryReader.class),
Expand All @@ -265,7 +272,8 @@ void testRequiredVersionUpgradePredicate() throws Exception {
mock(SecretsRepositoryReader.class),
mock(SecretsRepositoryWriter.class),
mock(SecretPersistenceConfigService.class),
connectionService),
connectionService,
actorDefinitionVersionUpdater),
new WorkspaceServiceJooqImpl(configDatabase,
featureFlagClient,
mock(SecretsRepositoryReader.class),
Expand All @@ -286,19 +294,20 @@ void testRequiredVersionUpgradePredicate() throws Exception {
val actorDefinitionVersionHelper =
new ActorDefinitionVersionHelper(configRepository, new NoOpDefinitionVersionOverrideProvider(), new NoOpDefinitionVersionOverrideProvider(),
featureFlagClient);
val actorDefinitionService = new ActorDefinitionServiceJooqImpl(configDatabase);
val sourceService = new SourceServiceJooqImpl(configDatabase,
featureFlagClient,
mock(SecretsRepositoryReader.class),
mock(SecretsRepositoryWriter.class),
mock(SecretPersistenceConfigService.class),
connectionService);
connectionService,
actorDefinitionVersionUpdater);
val destinationService = new DestinationServiceJooqImpl(configDatabase,
featureFlagClient,
mock(SecretsRepositoryReader.class),
mock(SecretsRepositoryWriter.class),
mock(SecretPersistenceConfigService.class),
connectionService);
connectionService,
actorDefinitionVersionUpdater);
val supportStateUpdater =
new SupportStateUpdater(actorDefinitionService, sourceService, destinationService, DeploymentMode.OSS, actorDefinitionVersionHelper,
breakingChangeNotificationHelper, featureFlagClient);
Expand Down Expand Up @@ -381,6 +390,8 @@ void testPostLoadExecutionExecutes() throws Exception {
val configDatabase = new ConfigsDatabaseTestProvider(configsDslContext, configsFlyway).create(false);
val jobDatabase = new JobsDatabaseTestProvider(jobsDslContext, jobsFlyway).create(false);
val connectionService = new ConnectionServiceJooqImpl(configDatabase);
val actorDefinitionService = new ActorDefinitionServiceJooqImpl(configDatabase);
val actorDefinitionVersionUpdater = new ActorDefinitionVersionUpdater(featureFlagClient, connectionService, actorDefinitionService);
val configRepository = new ConfigRepository(
new ActorDefinitionServiceJooqImpl(configDatabase),
new CatalogServiceJooqImpl(configDatabase),
Expand All @@ -391,7 +402,8 @@ void testPostLoadExecutionExecutes() throws Exception {
mock(SecretsRepositoryReader.class),
mock(SecretsRepositoryWriter.class),
mock(SecretPersistenceConfigService.class),
connectionService),
connectionService,
actorDefinitionVersionUpdater),
new OAuthServiceJooqImpl(configDatabase,
featureFlagClient,
mock(SecretsRepositoryReader.class),
Expand All @@ -402,7 +414,8 @@ void testPostLoadExecutionExecutes() throws Exception {
mock(SecretsRepositoryReader.class),
mock(SecretsRepositoryWriter.class),
mock(SecretPersistenceConfigService.class),
connectionService),
connectionService,
actorDefinitionVersionUpdater),
new WorkspaceServiceJooqImpl(configDatabase,
featureFlagClient,
mock(SecretsRepositoryReader.class),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Copyright (c) 2020-2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.server.pro;
package io.airbyte.commons.auth;

import java.util.Set;

Expand All @@ -17,18 +17,18 @@ public class AirbyteAuthConstants {
* Header used for internal service authentication. This header is dropped by the webapp proxy so
* that external requests cannot use it to authenticate as an internal service.
*/
public static final String AIRBYTE_AUTH_HEADER = "X-Airbyte-Auth";
public static final String X_AIRBYTE_AUTH_HEADER = "X-Airbyte-Auth";

/**
* Header prefix used to identify internal service authentication. For now, this is the only use
* case for the X-Airbyte-Auth header, but in the future, we may add new prefixes for other use
* cases.
*/
public static final String AIRBYTE_AUTH_HEADER_INTERNAL_PREFIX = "Internal";
public static final String X_AIRBYTE_AUTH_HEADER_INTERNAL_PREFIX = "Internal";

/**
* Set of valid internal service names that are able to use the X-Airbyte-Auth: Internal header.
*/
public static final Set<String> VALID_INTERNAL_SERVICE_NAMES = Set.of("worker");
public static final Set<String> VALID_INTERNAL_SERVICE_NAMES = Set.of("worker", "test-client");

}
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ public void applySchemaChangeForSource(final SourceAutoPropagateChange sourceAut
&& !result.appliedDiff().getTransforms().isEmpty()
&& (Boolean.TRUE == connectionRead.getNotifySchemaChanges())) {
notifySchemaPropagated(notificationSettings, diff, workspace, connectionRead, source,
workspace.getEmail(), result);
workspace.getEmail());
}
} else {
LOGGER.info("Not propagating changes for connectionId: '{}', new catalogId '{}'",
Expand All @@ -474,8 +474,7 @@ public void notifySchemaPropagated(final NotificationSettings notificationSettin
final StandardWorkspace workspace,
final ConnectionRead connection,
final SourceConnection source,
final String email,
final UpdateSchemaResult result)
final String email)
throws IOException {
final NotificationItem item;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package io.airbyte.commons.server.validation;

import static io.airbyte.commons.auth.AuthRoleConstants.ADMIN;

import io.airbyte.commons.license.annotation.RequiresAirbyteProEnabled;
import io.airbyte.commons.server.errors.ApplicationErrorKnownException;
import io.airbyte.config.Permission.PermissionType;
Expand All @@ -13,6 +15,7 @@
import io.micronaut.security.utils.SecurityService;
import jakarta.inject.Singleton;
import java.util.UUID;
import lombok.extern.slf4j.Slf4j;

/**
* Enterprise edition implementation of {@link ActorDefinitionAccessValidator}. Allows any
Expand All @@ -24,6 +27,7 @@
*/
@Singleton
@RequiresAirbyteProEnabled
@Slf4j
@Replaces(CommunityActorDefinitionAccessValidator.class)
public class EnterpriseActorDefinitionAccessValidator implements ActorDefinitionAccessValidator {

Expand All @@ -42,7 +46,7 @@ public void validateWriteAccess(final UUID actorDefinitionId) throws Application
final String authId = securityService.username().orElse(null);

// instance admin always has write access
if (permissionPersistence.isAuthUserInstanceAdmin(authId)) {
if (securityService.hasRole(ADMIN)) {
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1887,7 +1887,7 @@ void testSchemaPropagatedEmptyDiff() throws IOException, JsonValidationException
.catalog(newCatalog);
spySchedulerHandler.applySchemaChangeForSource(request);
verify(connectionsHandler).updateConnection(any());
verify(spySchedulerHandler, never()).notifySchemaPropagated(any(), any(), any(), any(), any(), any(), any());
verify(spySchedulerHandler, never()).notifySchemaPropagated(any(), any(), any(), any(), any(), any());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.commons.server.validation;

import static io.airbyte.commons.auth.AuthRoleConstants.ADMIN;
import static org.mockito.Mockito.when;

import io.airbyte.commons.server.errors.ApplicationErrorKnownException;
Expand Down Expand Up @@ -42,9 +43,8 @@ void setup() {
class ValidateWriteAccess {

@Test
void instanceAdminAllowed() throws IOException {
when(mSecurityService.username()).thenReturn(java.util.Optional.of(USERNAME));
when(mPermissionPersistence.isAuthUserInstanceAdmin(USERNAME)).thenReturn(true);
void instanceAdminAllowed() {
when(mSecurityService.hasRole(ADMIN)).thenReturn(true);

// any actor definition ID passes this check for an instance admin.
Assertions.assertDoesNotThrow(() -> enterpriseActorDefinitionAccessValidator.validateWriteAccess(UUID.randomUUID()));
Expand All @@ -53,7 +53,6 @@ void instanceAdminAllowed() throws IOException {
@Test
void defaultOrgAdminAllowed() throws IOException {
when(mSecurityService.username()).thenReturn(java.util.Optional.of(USERNAME));
when(mPermissionPersistence.isAuthUserInstanceAdmin(USERNAME)).thenReturn(false);
when(mPermissionPersistence.findPermissionTypeForUserAndOrganization(OrganizationPersistence.DEFAULT_ORGANIZATION_ID, USERNAME))
.thenReturn(PermissionType.ORGANIZATION_ADMIN);

Expand All @@ -66,7 +65,7 @@ void defaultOrgAdminAllowed() throws IOException {
@Test
void otherwiseThrows() throws IOException {
when(mSecurityService.username()).thenReturn(java.util.Optional.of(USERNAME));
when(mPermissionPersistence.isAuthUserInstanceAdmin(USERNAME)).thenReturn(false);
when(mSecurityService.hasRole(ADMIN)).thenReturn(false);
when(mPermissionPersistence.findPermissionTypeForUserAndOrganization(OrganizationPersistence.DEFAULT_ORGANIZATION_ID, USERNAME))
.thenReturn(PermissionType.ORGANIZATION_EDITOR);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.commons.workers.config;

import com.google.common.annotations.VisibleForTesting;
import io.airbyte.commons.envvar.EnvVar;
import io.airbyte.config.Configs;
import io.airbyte.config.Configs.WorkerEnvironment;
import io.airbyte.config.ResourceRequirements;
Expand All @@ -18,6 +19,9 @@
*/
public class WorkerConfigs {

public static final String DEFAULT_JOB_KUBE_BUSYBOX_IMAGE = "busybox:1.35";
private static final String DEFAULT_JOB_KUBE_CURL_IMAGE = "curlimages/curl:7.87.0";
private static final String DEFAULT_JOB_KUBE_SOCAT_IMAGE = "alpine/socat:1.7.4.4-r0";
private final Configs.WorkerEnvironment workerEnvironment;
private final ResourceRequirements resourceRequirements;
private final List<TolerationPOJO> workerKubeTolerations;
Expand Down Expand Up @@ -80,9 +84,9 @@ public WorkerConfigs(final Configs configs) {
configs.getJobKubeMainContainerImagePullSecrets(),
configs.getJobKubeMainContainerImagePullPolicy(),
configs.getJobKubeSidecarContainerImagePullPolicy(),
configs.getJobKubeSocatImage(),
configs.getJobKubeBusyboxImage(),
configs.getJobKubeCurlImage(),
EnvVar.JOB_KUBE_SOCAT_IMAGE.fetch(DEFAULT_JOB_KUBE_SOCAT_IMAGE),
EnvVar.JOB_KUBE_BUSYBOX_IMAGE.fetch(DEFAULT_JOB_KUBE_BUSYBOX_IMAGE),
EnvVar.JOB_KUBE_CURL_IMAGE.fetch(DEFAULT_JOB_KUBE_CURL_IMAGE),
configs.getJobDefaultEnvMap());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,11 +180,18 @@ public static FailureReason sourceFailure(final AirbyteTraceMessage m, final Lon
* @param attemptNumber attempt number
* @return failure reason
*/
public static FailureReason sourceHeartbeatFailure(final Throwable t, final Long jobId, final Integer attemptNumber) {
public static FailureReason sourceHeartbeatFailure(final Throwable t,
final Long jobId,
final Integer attemptNumber,
final String humanReadableThreshold,
final String timeBetweenLastRecord) {
final var errorMessage = String.format(
"Airbyte detected that the Source didn't send any records in the last %s, exceeding the configured %s threshold. Airbyte will try reading again on the next sync. Please see https://docs.airbyte.com/understanding-airbyte/heartbeats for more info.",
timeBetweenLastRecord, humanReadableThreshold);
return connectorCommandFailure(t, jobId, attemptNumber, ConnectorCommand.READ)
.withFailureOrigin(FailureOrigin.SOURCE)
.withFailureType(FailureType.HEARTBEAT_TIMEOUT)
.withExternalMessage("The source is unresponsive");
.withExternalMessage(errorMessage);
}

/**
Expand All @@ -195,11 +202,18 @@ public static FailureReason sourceHeartbeatFailure(final Throwable t, final Long
* @param attemptNumber attempt number
* @return failure reason
*/
public static FailureReason destinationTimeoutFailure(final Throwable t, final Long jobId, final Integer attemptNumber) {
public static FailureReason destinationTimeoutFailure(final Throwable t,
final Long jobId,
final Integer attemptNumber,
final String humanReadableThreshold,
final String timeBetweenLastAction) {
final var errorMessage = String.format(
"Airbyte detected that the Destination didn't make progress in the last %s, exceeding the configured %s threshold. Airbyte will try reading again on the next sync. Please see https://docs.airbyte.com/understanding-airbyte/heartbeats for more info.",
timeBetweenLastAction, humanReadableThreshold);
return connectorCommandFailure(t, jobId, attemptNumber, ConnectorCommand.WRITE)
.withFailureOrigin(FailureOrigin.DESTINATION)
.withFailureType(FailureType.DESTINATION_TIMEOUT)
.withExternalMessage("Something went wrong when calling the destination. The destination seems stuck");
.withExternalMessage(errorMessage);
}

/**
Expand Down
Loading

0 comments on commit 190c598

Please sign in to comment.