Skip to content

Commit

Permalink
fix worker-queue delete/update error (#1546)
Browse files Browse the repository at this point in the history
* fix worker-queue delete/update error

* fix update command

* remove change

* remove print statment

* fix lint
  • Loading branch information
sunkickr authored and kushalmalani committed Feb 13, 2024
1 parent 94d2f64 commit ff7085a
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 5 deletions.
6 changes: 3 additions & 3 deletions cloud/deployment/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,11 +368,9 @@ func Create(name, workspaceID, description, clusterID, runtimeVersion, dagDeploy
var requestedExecutor astroplatformcore.CreateDedicatedDeploymentRequestExecutor
if strings.EqualFold(executor, CeleryExecutor) || strings.EqualFold(executor, CELERY) {
requestedExecutor = astroplatformcore.CreateDedicatedDeploymentRequestExecutorCELERY
fmt.Println(requestedExecutor)
}
if strings.EqualFold(executor, KubeExecutor) || strings.EqualFold(executor, KUBERNETES) {
requestedExecutor = astroplatformcore.CreateDedicatedDeploymentRequestExecutorKUBERNETES
fmt.Println(requestedExecutor)
}
dedicatedDeploymentRequest := astroplatformcore.CreateDedicatedDeploymentRequest{
AstroRuntimeVersion: runtimeVersion,
Expand All @@ -393,7 +391,6 @@ func Create(name, workspaceID, description, clusterID, runtimeVersion, dagDeploy
if strings.EqualFold(executor, CeleryExecutor) || strings.EqualFold(executor, CELERY) {
dedicatedDeploymentRequest.WorkerQueues = &defautWorkerQueue
}
fmt.Println(dedicatedDeploymentRequest.Executor)
switch schedulerSize {
case SmallScheduler:
dedicatedDeploymentRequest.SchedulerSize = astroplatformcore.CreateDedicatedDeploymentRequestSchedulerSizeSMALL
Expand Down Expand Up @@ -799,6 +796,9 @@ func Update(deploymentID, name, ws, description, deploymentName, dagDeploy, exec
queueCreateUpdate = true
deploymentEnvironmentVariablesRequest = newEnvironmentVariables
}
if deploymentEnvironmentVariablesRequest == nil {
deploymentEnvironmentVariablesRequest = []astroplatformcore.DeploymentEnvironmentVariableRequest{}
}
if IsDeploymentStandard(*currentDeployment.Type) || IsDeploymentDedicated(*currentDeployment.Type) {
var workerQueuesRequest []astroplatformcore.WorkerQueueRequest
if currentDeployment.WorkerQueues != nil {
Expand Down
9 changes: 7 additions & 2 deletions cloud/deployment/workerqueue/workerqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ var (
errCannotDeleteDefaultQueue = errors.New("default queue can not be deleted")
ErrNotSupported = errors.New("does not support")
errNoUseWorkerQueues = errors.New("don't use 'worker_queues' to update default queue with KubernetesExecutor, use 'default_task_pod_cpu' and 'default_task_pod_memory' instead")
errNoWorkerQueues = errors.New("no worker queues found for this deployment")
)

// CreateOrUpdate creates a new worker queue or updates an existing worker queue for a deployment.
Expand Down Expand Up @@ -612,9 +613,13 @@ func selectQueue(queueListIndex *[]astroplatformcore.WorkerQueue, out io.Writer)
errToReturn error
queueName, message string
queueToDelete astroplatformcore.WorkerQueue
queueList []astroplatformcore.WorkerQueue
)

queueList := *queueListIndex
if queueListIndex != nil {
queueList = *queueListIndex
} else {
return "", errNoWorkerQueues
}

tab := printutil.Table{
Padding: []int{5, 30, 20, 50},
Expand Down
5 changes: 5 additions & 0 deletions cloud/deployment/workerqueue/workerqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1276,6 +1276,11 @@ func TestSelectQueue(t *testing.T) {
assert.ErrorIs(t, err, errInvalidQueue)
assert.Equal(t, "", queueToDelete)
})
t.Run("errors if there are no worker queues", func(t *testing.T) {
queueToDelete, err = selectQueue(nil, out)
assert.ErrorIs(t, err, errNoWorkerQueues)
assert.Equal(t, "", queueToDelete)
})
}

func TestUpdateQueueList(t *testing.T) {
Expand Down

0 comments on commit ff7085a

Please sign in to comment.