Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(api): Idempotent Resource Creation #298

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ object ConnectionCreateMapper {
connectionCreateOss.sourceId = connectionCreateRequest.sourceId
connectionCreateOss.destinationId = connectionCreateRequest.destinationId
connectionCreateOss.name = connectionCreateRequest.name
connectionCreateOss.idempotencyKey = connectionCreateRequest.idempotencyKey
connectionCreateOss.nonBreakingChangesPreference =
ConnectionHelper.convertNonBreakingSchemaUpdatesBehaviorEnum(
connectionCreateRequest.nonBreakingSchemaUpdatesBehavior,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ class DestinationServiceImpl(private val configApiClient: ConfigApiClient, priva
destinationCreateOss.destinationDefinitionId = destinationDefinitionId
destinationCreateOss.workspaceId = destinationCreateRequest.workspaceId
destinationCreateOss.connectionConfiguration = destinationCreateRequest.configuration
destinationCreateOss.idempotencyKey = destinationCreateRequest.idempotencyKey

val response =
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ open class SourceServiceImpl(
sourceCreateOss.workspaceId = sourceCreateRequest.workspaceId
sourceCreateOss.connectionConfiguration = sourceCreateRequest.configuration
sourceCreateOss.secretId = sourceCreateRequest.secretId
sourceCreateOss.idempotencyKey = sourceCreateRequest.idempotencyKey

val response =
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ open class WorkspaceServiceImpl(
authorization: String?,
userInfo: String?,
): WorkspaceResponse {
val workspaceCreate = WorkspaceCreate().name(workspaceCreateRequest.name)
val workspaceCreate = WorkspaceCreate().name(workspaceCreateRequest.name).idempotencyKey(workspaceCreateRequest.idempotencyKey)
val workspaceReadHttpResponse =
try {
configApiClient.createWorkspace(workspaceCreate, authorization, userInfo)
Expand Down
13 changes: 13 additions & 0 deletions airbyte-api/src/main/openapi/api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1221,6 +1221,8 @@ components:
$ref: "#/components/schemas/NonBreakingSchemaUpdatesBehaviorEnum"
status:
$ref: "#/components/schemas/ConnectionStatusEnum"
idempotencyKey:
$ref: "#/components/schemas/IdempotencyKey"
ConnectionPatchRequest:
type: object
properties:
Expand Down Expand Up @@ -1299,6 +1301,8 @@ components:
secretId:
description: Optional secretID obtained through the public API OAuth redirect flow.
type: string
idempotencyKey:
$ref: "#/components/schemas/IdempotencyKey"
x-implements: io.airbyte.api.common.ConfigurableActor
SourcePutRequest:
required:
Expand Down Expand Up @@ -1586,6 +1590,8 @@ components:
type: string
configuration:
$ref: "#/components/schemas/DestinationConfiguration"
idempotencyKey:
$ref: "#/components/schemas/IdempotencyKey"
x-implements: io.airbyte.api.common.ConfigurableActor
DestinationPatchRequest:
type: object
Expand Down Expand Up @@ -1614,6 +1620,8 @@ components:
name:
description: Name of the workspace
type: string
idempotencyKey:
$ref: "#/components/schemas/IdempotencyKey"
WorkspaceUpdateRequest:
required:
- name
Expand Down Expand Up @@ -1811,3 +1819,8 @@ components:
enum:
- source
- destination
IdempotencyKey:
type: string
format: uuid
description: |
An optional UUID that can be provided in a create request to prevent creating duplicates of the same resource
31 changes: 29 additions & 2 deletions airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4428,6 +4428,8 @@ components:
$ref: "#/components/schemas/WebhookConfigWrite"
organizationId:
$ref: "#/components/schemas/OrganizationId"
idempotencyKey:
$ref: "#/components/schemas/IdempotencyKey"
WorkspaceCreateWithId:
type: object
required:
Expand Down Expand Up @@ -4464,6 +4466,8 @@ components:
$ref: "#/components/schemas/WebhookConfigWrite"
organizationId:
$ref: "#/components/schemas/OrganizationId"
idempotencyKey:
$ref: "#/components/schemas/IdempotencyKey"
WebhookConfigWrite:
type: object
properties:
Expand Down Expand Up @@ -4997,6 +5001,8 @@ components:
type: string
resourceRequirements:
$ref: "#/components/schemas/ActorDefinitionResourceRequirements"
idempotencyKey:
$ref: "#/components/schemas/IdempotencyKey"
SourceDefinitionUpdate:
type: object
description: Update the SourceDefinition. Currently, the only allowed attribute to update is the default docker image version.
Expand Down Expand Up @@ -5090,6 +5096,8 @@ components:
$ref: "#/components/schemas/ScopeType"
sourceDefinition:
$ref: "#/components/schemas/SourceDefinitionCreate"
idempotencyKey:
$ref: "#/components/schemas/IdempotencyKey"
SourceDefinitionIdWithWorkspaceId:
type: object
required:
Expand Down Expand Up @@ -5280,6 +5288,8 @@ components:
secretId:
example: "airbyte_oauth_workspace_0509f049-d671-48cb-8105-0a23d47e6db6_secret_e0d38206-034e-4d75-9d21-da5a99b02826_v1"
type: string
idempotencyKey:
$ref: "#/components/schemas/IdempotencyKey"
SourceDiscoverSchemaRequestBody:
type: object
required:
Expand Down Expand Up @@ -5478,6 +5488,8 @@ components:
type: string
resourceRequirements:
$ref: "#/components/schemas/ActorDefinitionResourceRequirements"
idempotencyKey:
$ref: "#/components/schemas/IdempotencyKey"
DestinationDefinitionUpdate:
type: object
required:
Expand Down Expand Up @@ -5558,6 +5570,8 @@ components:
$ref: "#/components/schemas/ScopeId"
scopeType:
$ref: "#/components/schemas/ScopeType"
idempotencyKey:
$ref: "#/components/schemas/IdempotencyKey"
DestinationDefinitionIdWithWorkspaceId:
type: object
required:
Expand Down Expand Up @@ -5657,6 +5671,8 @@ components:
$ref: "#/components/schemas/DestinationDefinitionId"
connectionConfiguration:
$ref: "#/components/schemas/DestinationConfiguration"
idempotencyKey:
$ref: "#/components/schemas/IdempotencyKey"
DestinationUpdate:
type: object
required:
Expand Down Expand Up @@ -5969,6 +5985,8 @@ components:
type: boolean
nonBreakingChangesPreference:
$ref: "#/components/schemas/NonBreakingChangesPreference"
idempotencyKey:
$ref: "#/components/schemas/IdempotencyKey"
WebBackendConnectionCreate:
type: object
required:
Expand Down Expand Up @@ -6020,6 +6038,8 @@ components:
$ref: "#/components/schemas/Geography"
nonBreakingChangesPreference:
$ref: "#/components/schemas/NonBreakingChangesPreference"
idempotencyKey:
$ref: "#/components/schemas/IdempotencyKey"
ConnectionStateCreateOrUpdate:
type: object
required:
Expand Down Expand Up @@ -6687,6 +6707,8 @@ components:
type: boolean
orgLevelBilling:
type: boolean
idempotencyKey:
$ref: "#/components/schemas/IdempotencyKey"
OrganizationRead:
type: object
required:
Expand Down Expand Up @@ -7038,7 +7060,8 @@ components:
news:
type: boolean
default: false

idempotencyKey:
$ref: "#/components/schemas/IdempotencyKey"
UserGetOrCreateByAuthIdResponse:
type: object
required:
Expand Down Expand Up @@ -9159,7 +9182,11 @@ components:
properties:
access_token:
type: string

IdempotencyKey:
type: string
format: uuid
description: |
An optional UUID that can be provided in a create request to prevent creating duplicates of the same resource
responses:
NotFoundResponse:
description: Object with given id was not found.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ class BootloaderTest {

// ⚠️ This line should change with every new migration to show that you meant to make a new
// migration to the prod database
private static final String CURRENT_CONFIGS_MIGRATION_VERSION = "0.50.33.015";
private static final String CURRENT_CONFIGS_MIGRATION_VERSION = "0.50.33.016";
private static final String CURRENT_JOBS_MIGRATION_VERSION = "0.50.4.001";
private static final String CDK_VERSION = "1.2.3";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
import io.airbyte.config.persistence.ActorDefinitionVersionHelper;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.data.services.ConnectionService;
import io.airbyte.featureflag.CheckWithCatalog;
import io.airbyte.featureflag.FeatureFlagClient;
import io.airbyte.featureflag.Workspace;
Expand Down Expand Up @@ -151,6 +152,7 @@ public class ConnectionsHandler {
private final ActorDefinitionVersionHelper actorDefinitionVersionHelper;
private final ConnectorDefinitionSpecificationHandler connectorSpecHandler;
private final JobNotifier jobNotifier;
private final ConnectionService connectionService;
private final Integer maxDaysOfOnlyFailedJobsBeforeConnectionDisable;
private final Integer maxFailedJobsInARowBeforeConnectionDisable;
private final int maxJobLookback = 10;
Expand All @@ -168,6 +170,7 @@ public ConnectionsHandler(
final ActorDefinitionVersionHelper actorDefinitionVersionHelper,
final ConnectorDefinitionSpecificationHandler connectorSpecHandler,
final JobNotifier jobNotifier,
final ConnectionService connectionService,
@Value("${airbyte.server.connection.disable.max-days}") final Integer maxDaysOfOnlyFailedJobsBeforeConnectionDisable,
@Value("${airbyte.server.connection.disable.max-jobs}") final Integer maxFailedJobsInARowBeforeConnectionDisable) {
this.jobPersistence = jobPersistence;
Expand All @@ -181,6 +184,7 @@ public ConnectionsHandler(
this.actorDefinitionVersionHelper = actorDefinitionVersionHelper;
this.connectorSpecHandler = connectorSpecHandler;
this.jobNotifier = jobNotifier;
this.connectionService = connectionService;
this.maxDaysOfOnlyFailedJobsBeforeConnectionDisable = maxDaysOfOnlyFailedJobsBeforeConnectionDisable;
this.maxFailedJobsInARowBeforeConnectionDisable = maxFailedJobsInARowBeforeConnectionDisable;
}
Expand Down Expand Up @@ -432,7 +436,13 @@ private boolean warningPreviouslySentForMaxDays(final int numFailures,

public ConnectionRead createConnection(final ConnectionCreate connectionCreate)
throws JsonValidationException, IOException, ConfigNotFoundException {

final UUID idempotencyKey = connectionCreate.getIdempotencyKey();
if (idempotencyKey != null) {
final Optional<StandardSync> found = connectionService.getStandardSyncByIdempotencyKey(idempotencyKey);
if (found.isPresent()) {
return buildConnectionRead(found.get().getConnectionId());
}
}
// Validate source and destination
final SourceConnection sourceConnection = configRepository.getSourceConnection(connectionCreate.getSourceId());
final DestinationConnection destinationConnection = configRepository.getDestinationConnection(connectionCreate.getDestinationId());
Expand Down Expand Up @@ -469,6 +479,7 @@ public ConnectionRead createConnection(final ConnectionCreate connectionCreate)
.withSourceCatalogId(connectionCreate.getSourceCatalogId())
.withGeography(getGeographyFromConnectionCreateOrWorkspace(connectionCreate))
.withBreakingChange(false)
.withIdempotencyKey(idempotencyKey)
.withNonBreakingChangesPreference(
ApiPojoConverters.toPersistenceNonBreakingChangesPreference(connectionCreate.getNonBreakingChangesPreference()));
if (connectionCreate.getResourceRequirements() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.config.specs.RemoteDefinitionsProvider;
import io.airbyte.data.services.DestinationService;
import io.airbyte.featureflag.DestinationDefinition;
import io.airbyte.featureflag.FeatureFlagClient;
import io.airbyte.featureflag.HideActorDefinitionFromList;
Expand All @@ -54,6 +55,7 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Supplier;
import java.util.stream.Collectors;
Expand All @@ -68,6 +70,7 @@
public class DestinationDefinitionsHandler {

private final ConfigRepository configRepository;
private final DestinationService destinationService;
private final Supplier<UUID> uuidSupplier;
private final ActorDefinitionHandlerHelper actorDefinitionHandlerHelper;
private final RemoteDefinitionsProvider remoteDefinitionsProvider;
Expand All @@ -82,14 +85,16 @@ public DestinationDefinitionsHandler(final ConfigRepository configRepository,
final RemoteDefinitionsProvider remoteDefinitionsProvider,
final DestinationHandler destinationHandler,
final SupportStateUpdater supportStateUpdater,
final FeatureFlagClient featureFlagClient) {
final FeatureFlagClient featureFlagClient,
final DestinationService destinationService) {
this.configRepository = configRepository;
this.uuidSupplier = uuidSupplier;
this.actorDefinitionHandlerHelper = actorDefinitionHandlerHelper;
this.remoteDefinitionsProvider = remoteDefinitionsProvider;
this.destinationHandler = destinationHandler;
this.supportStateUpdater = supportStateUpdater;
this.featureFlagClient = featureFlagClient;
this.destinationService = destinationService;
}

@VisibleForTesting
Expand Down Expand Up @@ -233,6 +238,20 @@ public DestinationDefinitionRead getDestinationDefinitionForScope(final ActorDef

public DestinationDefinitionRead createCustomDestinationDefinition(final CustomDestinationDefinitionCreate customDestinationDefinitionCreate)
throws IOException {
final UUID idempotencyKey = customDestinationDefinitionCreate.getIdempotencyKey();
if (idempotencyKey != null) {
final Optional<StandardDestinationDefinition> found = destinationService.getStandardDestinationDefinitionByIdempotencyKey(idempotencyKey);
if (found.isPresent()) {
final StandardDestinationDefinition destinationDefinition = found.get();
final ActorDefinitionVersion actorDefinitionVersion;
try {
actorDefinitionVersion = configRepository.getActorDefinitionVersion(destinationDefinition.getDefaultVersionId());
return buildDestinationDefinitionRead(destinationDefinition, actorDefinitionVersion);
} catch (ConfigNotFoundException e) {
// not found, so must create
}
Comment on lines +250 to +252
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not too happy with this. I suspect this is not a situation that you can get in, since the existence of the destdef implies it has at least one version (i would think). But i have to catch this due to buildDestinationDefinitionRead throwing it.

}
}
final UUID id = uuidSupplier.get();
final DestinationDefinitionCreate destinationDefCreate = customDestinationDefinitionCreate.getDestinationDefinition();
final ActorDefinitionVersion actorDefinitionVersion =
Expand Down
Loading
Loading