Skip to content

Commit

Permalink
fix: start rollout via workflow instead of update for automated rollo…
Browse files Browse the repository at this point in the history
…uts (#14905)
  • Loading branch information
clnoll committed Jan 6, 2025
1 parent acdace8 commit 30d94ad
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -992,7 +992,7 @@ internal class ConnectorRolloutHandlerTest {
val connectorRollout = createMockConnectorRollout(rolloutId, rolloutStrategy = null)

every { connectorRolloutService.listConnectorRollouts(any(), any()) } returns listOf(connectorRollout)
every { connectorRolloutClient.startRollout(any()) } returns ConnectorRolloutOutput(state = ConnectorEnumRolloutState.WORKFLOW_STARTED)
every { connectorRolloutClient.startRollout(any()) } just Runs
every { connectorRolloutService.getConnectorRollout(rolloutId) } returns connectorRollout
every { actorDefinitionService.getActorDefinitionVersion(any()) } returns createMockActorDefinitionVersion()
every {
Expand Down Expand Up @@ -1099,7 +1099,7 @@ internal class ConnectorRolloutHandlerTest {
// Rollout has been initialized, but workflow hasn't been started
connectorRollout.apply { this.state = ConnectorEnumRolloutState.INITIALIZED }

every { connectorRolloutClient.startRollout(any()) } returns ConnectorRolloutOutput(state = ConnectorEnumRolloutState.WORKFLOW_STARTED)
every { connectorRolloutClient.startRollout(any()) } just Runs
every { connectorRolloutClient.doRollout(any()) } returns ConnectorRolloutOutput(state = ConnectorEnumRolloutState.IN_PROGRESS)
every { connectorRolloutService.getConnectorRollout(rolloutId) } returns connectorRollout
every { actorDefinitionService.getActorDefinitionVersion(any()) } returns createMockActorDefinitionVersion()
Expand Down Expand Up @@ -1138,7 +1138,7 @@ internal class ConnectorRolloutHandlerTest {
every { connectorRolloutService.getConnectorRollout(any()) } returns rollout
every { actorDefinitionService.getActorDefinitionVersion(any()) } returns createMockActorDefinitionVersion()
if (initialState == ConnectorEnumRolloutState.INITIALIZED) {
every { connectorRolloutClient.startRollout(any()) } returns ConnectorRolloutOutput(state = ConnectorEnumRolloutState.WORKFLOW_STARTED)
every { connectorRolloutClient.startRollout(any()) } just Runs
every { rolloutActorFinder.getActorSelectionInfo(any(), any()) } returns ActorSelectionInfo(listOf(), 0, 0, 0, 0)
every { rolloutActorFinder.getSyncInfoForPinnedActors(any()) } returns emptyMap()
}
Expand Down Expand Up @@ -1178,7 +1178,7 @@ internal class ConnectorRolloutHandlerTest {
every { connectorRolloutService.getConnectorRollout(any()) } returns rollout
every { actorDefinitionService.getActorDefinitionVersion(any()) } returns createMockActorDefinitionVersion()
if (initialState == ConnectorEnumRolloutState.INITIALIZED) {
every { connectorRolloutClient.startRollout(any()) } returns ConnectorRolloutOutput(state = ConnectorEnumRolloutState.WORKFLOW_STARTED)
every { connectorRolloutClient.startRollout(any()) } just Runs
every { rolloutActorFinder.getActorSelectionInfo(any(), any()) } returns ActorSelectionInfo(listOf(), 0, 0, 0, 0)
every { rolloutActorFinder.getSyncInfoForPinnedActors(any()) } returns emptyMap()
}
Expand Down Expand Up @@ -1215,7 +1215,7 @@ internal class ConnectorRolloutHandlerTest {
// Rollout has been started
connectorRollout.apply { this.state = state }

every { connectorRolloutClient.startRollout(any()) } returns ConnectorRolloutOutput(state = ConnectorEnumRolloutState.WORKFLOW_STARTED)
every { connectorRolloutClient.startRollout(any()) } just Runs
every { connectorRolloutClient.pauseRollout(any()) } returns ConnectorRolloutOutput(state = ConnectorEnumRolloutState.PAUSED)
every { connectorRolloutService.getConnectorRollout(rolloutId) } returns connectorRollout
every { actorDefinitionService.getActorDefinitionVersion(any()) } returns createMockActorDefinitionVersion()
Expand Down Expand Up @@ -1251,7 +1251,7 @@ internal class ConnectorRolloutHandlerTest {
// Rollout has been initialized, but workflow hasn't been started
connectorRollout.apply { this.state = ConnectorEnumRolloutState.INITIALIZED }

every { connectorRolloutClient.startRollout(any()) } returns ConnectorRolloutOutput(state = ConnectorEnumRolloutState.WORKFLOW_STARTED)
every { connectorRolloutClient.startRollout(any()) } just Runs
every { connectorRolloutClient.pauseRollout(any()) } returns ConnectorRolloutOutput(state = ConnectorEnumRolloutState.PAUSED)
every { connectorRolloutService.getConnectorRollout(rolloutId) } returns connectorRollout
every { actorDefinitionService.getActorDefinitionVersion(any()) } returns createMockActorDefinitionVersion()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.connector.rollout.client

import io.airbyte.config.ConnectorEnumRolloutStrategy
import io.airbyte.connector.rollout.shared.Constants
import io.airbyte.connector.rollout.shared.models.ConnectorRolloutActivityInputFinalize
import io.airbyte.connector.rollout.shared.models.ConnectorRolloutActivityInputPause
Expand Down Expand Up @@ -58,7 +59,7 @@ class ConnectorRolloutClient
}
}

fun startRollout(input: ConnectorRolloutWorkflowInput): ConnectorRolloutOutput {
fun startRollout(input: ConnectorRolloutWorkflowInput) {
logger.info { "ConnectorRolloutService.startWorkflow with input: id=${input.rolloutId} rolloutStrategy=${input.rolloutStrategy}" }
if (input.rolloutId == null) {
throw RuntimeException("Rollout ID is required to start a rollout workflow")
Expand All @@ -75,23 +76,25 @@ class ConnectorRolloutClient
.build(),
)

val connectorRolloutActivityInputStart =
ConnectorRolloutActivityInputStart(
input.dockerRepository,
input.dockerImageTag,
input.actorDefinitionId,
input.rolloutId,
input.updatedBy,
input.rolloutStrategy,
input.migratePins,
)

logger.info { "Starting workflow $workflowId" }
val workflowExecution = WorkflowClient.start(workflowStub::run, input)
logger.info { "Workflow $workflowId initialized with ID: ${workflowExecution.workflowId}" }
val startOutput = executeUpdate(connectorRolloutActivityInputStart, workflowId) { stub, i -> stub.startRollout(i) }
logger.info { "Rollout $workflowId started with ID: ${workflowExecution.workflowId}" }
return startOutput

if (input.rolloutStrategy == ConnectorEnumRolloutStrategy.MANUAL) {
val connectorRolloutActivityInputStart =
ConnectorRolloutActivityInputStart(
input.dockerRepository,
input.dockerImageTag,
input.actorDefinitionId,
input.rolloutId,
input.updatedBy,
input.rolloutStrategy,
input.migratePins,
)

executeUpdate(connectorRolloutActivityInputStart, workflowId) { stub, i -> stub.startRollout(i) }
logger.info { "Rollout $workflowId started with ID: ${workflowExecution.workflowId}" }
}
}

fun doRollout(input: ConnectorRolloutActivityInputRollout): ConnectorRolloutOutput {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,18 @@ class ConnectorRolloutWorkflowImpl : ConnectorRolloutWorkflow {
val waitBetweenResultPollsSeconds = input.waitBetweenSyncResultsQueriesSeconds
val stepSizePercentage = rollout.initialRolloutPct ?: Constants.DEFAULT_INITIAL_ROLLOUT_PERCENTAGE

startRollout(
ConnectorRolloutActivityInputStart(
input.dockerRepository,
input.dockerImageTag,
input.actorDefinitionId,
input.rolloutId,
input.updatedBy,
input.rolloutStrategy,
input.migratePins,
),
)

// Continuously manage the rollout until we reach a terminal state, the workflow is paused, or the rollout expires.
// The loop performs the following steps:
// 1. If it's time to advance the rollout (based on `nextRolloutTime`), increase the rollout percentage and attempt
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,9 @@ class ConnectorRolloutWorkflowImplTest {
.numActorsEligibleOrAlreadyPinned(0),
)

`when`(
startRolloutActivity.startRollout(MockitoHelper.anyObject(), MockitoHelper.anyObject()),
).thenReturn(getMockOutput(ConnectorEnumRolloutState.WORKFLOW_STARTED))
`when`(getRolloutActivity.getRollout(MockitoHelper.anyObject())).thenReturn(insufficientDataConnectorRolloutOutput)

// Run workflow
Expand All @@ -223,6 +226,7 @@ class ConnectorRolloutWorkflowImplTest {
}
assertNotNull(failure)

verify(startRolloutActivity).startRollout(MockitoHelper.anyObject(), MockitoHelper.anyObject())
verify(getRolloutActivity).getRollout(MockitoHelper.anyObject())
verify(verifyDefaultVersionActivity, Mockito.never()).getAndVerifyDefaultVersion(MockitoHelper.anyObject())
verify(promoteOrRollbackActivity, Mockito.never()).promoteOrRollback(MockitoHelper.anyObject())
Expand Down Expand Up @@ -268,6 +272,9 @@ class ConnectorRolloutWorkflowImplTest {
actorSelectionInfo = successActorSelectionInfo,
)

`when`(
startRolloutActivity.startRollout(MockitoHelper.anyObject(), MockitoHelper.anyObject()),
).thenReturn(getMockOutput(ConnectorEnumRolloutState.WORKFLOW_STARTED))
`when`(getRolloutActivity.getRollout(MockitoHelper.anyObject())).thenReturn(successConnectorRolloutOutput)
`when`(verifyDefaultVersionActivity.getAndVerifyDefaultVersion(MockitoHelper.anyObject()))
.thenReturn(ConnectorRolloutActivityOutputVerifyDefaultVersion(true))
Expand All @@ -282,6 +289,7 @@ class ConnectorRolloutWorkflowImplTest {
val result = workflowById.getResult(String::class.java)
assertEquals(ConnectorEnumRolloutState.SUCCEEDED.toString(), result)

verify(startRolloutActivity).startRollout(MockitoHelper.anyObject(), MockitoHelper.anyObject())
verify(getRolloutActivity).getRollout(MockitoHelper.anyObject())
verify(verifyDefaultVersionActivity).getAndVerifyDefaultVersion(MockitoHelper.anyObject())
verify(promoteOrRollbackActivity).promoteOrRollback(MockitoHelper.anyObject())
Expand Down Expand Up @@ -344,6 +352,9 @@ class ConnectorRolloutWorkflowImplTest {
.numActorsEligibleOrAlreadyPinned(1),
)

`when`(
startRolloutActivity.startRollout(MockitoHelper.anyObject(), MockitoHelper.anyObject()),
).thenReturn(getMockOutput(ConnectorEnumRolloutState.WORKFLOW_STARTED))
`when`(getRolloutActivity.getRollout(MockitoHelper.anyObject())).thenReturn(failureConnectorRolloutOutput)
`when`(pauseRolloutActivity.pauseRollout(MockitoHelper.anyObject())).thenReturn(pausedConnectorRolloutOutput)

Expand All @@ -363,6 +374,7 @@ class ConnectorRolloutWorkflowImplTest {
}
assertNotNull(failure)

verify(startRolloutActivity).startRollout(MockitoHelper.anyObject(), MockitoHelper.anyObject())
verify(getRolloutActivity).getRollout(MockitoHelper.anyObject())
verify(pauseRolloutActivity).pauseRollout(MockitoHelper.anyObject())
verify(verifyDefaultVersionActivity, Mockito.never()).getAndVerifyDefaultVersion(MockitoHelper.anyObject())
Expand Down Expand Up @@ -412,6 +424,9 @@ class ConnectorRolloutWorkflowImplTest {
.numActorsEligibleOrAlreadyPinned(1),
)

`when`(
startRolloutActivity.startRollout(MockitoHelper.anyObject(), MockitoHelper.anyObject()),
).thenReturn(getMockOutput(ConnectorEnumRolloutState.WORKFLOW_STARTED))
if (exceptionType == ApplicationFailure::class.java) {
`when`(doRolloutActivity.doRollout(MockitoHelper.anyObject()))
.thenThrow(ApplicationFailure.newFailure("Simulated ApplicationFailure", "TestFailure"))
Expand All @@ -437,6 +452,7 @@ class ConnectorRolloutWorkflowImplTest {
}
assertNotNull(failure)

verify(startRolloutActivity).startRollout(MockitoHelper.anyObject(), MockitoHelper.anyObject())
verify(pauseRolloutActivity).pauseRollout(MockitoHelper.anyObject())
verify(verifyDefaultVersionActivity, Mockito.never()).getAndVerifyDefaultVersion(MockitoHelper.anyObject())
verify(promoteOrRollbackActivity, Mockito.never()).promoteOrRollback(MockitoHelper.anyObject())
Expand Down Expand Up @@ -467,8 +483,9 @@ class ConnectorRolloutWorkflowImplTest {
),
)

`when`(startRolloutActivity.startRollout(Mockito.anyString(), MockitoHelper.anyObject()))
.thenReturn(getMockOutput(ConnectorEnumRolloutState.WORKFLOW_STARTED))
`when`(
startRolloutActivity.startRollout(MockitoHelper.anyObject(), MockitoHelper.anyObject()),
).thenReturn(getMockOutput(ConnectorEnumRolloutState.WORKFLOW_STARTED))
if (finalState != ConnectorRolloutFinalState.CANCELED) {
`when`(
promoteOrRollbackActivity.promoteOrRollback(MockitoHelper.anyObject()),
Expand Down Expand Up @@ -511,7 +528,7 @@ class ConnectorRolloutWorkflowImplTest {
val result = workflowById.getResult(String::class.java)
assertEquals(finalState.toString(), result)

verify(startRolloutActivity).startRollout(Mockito.anyString(), MockitoHelper.anyObject())
verify(startRolloutActivity).startRollout(MockitoHelper.anyObject(), MockitoHelper.anyObject())
if (finalState != ConnectorRolloutFinalState.CANCELED) {
verify(promoteOrRollbackActivity).promoteOrRollback(MockitoHelper.anyObject())
}
Expand Down

0 comments on commit 30d94ad

Please sign in to comment.