Skip to content

Commit

Permalink
Add airflow 2.0 min requirements (#395)
Browse files Browse the repository at this point in the history
* Add airflow 2.0 min requirements

* Fix AirflowVersionTwo variable name
  • Loading branch information
Adam Vandover committed Dec 9, 2020
1 parent db885e9 commit 70ef968
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 11 deletions.
55 changes: 48 additions & 7 deletions deployment/deployment.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package deployment

import (
"errors"
"fmt"
"io"
"sort"
Expand All @@ -11,6 +12,7 @@ import (
"github.com/astronomer/astro-cli/houston"
"github.com/astronomer/astro-cli/pkg/input"
"github.com/astronomer/astro-cli/pkg/printutil"
"github.com/astronomer/astro-cli/settings"
"github.com/fatih/camelcase"
)

Expand Down Expand Up @@ -199,14 +201,24 @@ func Update(id, cloudRole string, args map[string]string, client *houston.Client

// Upgrade airflow deployment
func AirflowUpgrade(id, desiredAirflowVersion string, client *houston.Client, out io.Writer) error {
deployment, err := getDeployment(id, client)
if err != nil {
return err
}

if desiredAirflowVersion == "" {
selectedVersion, err := getAirflowVersionSelection(id, client, out)
selectedVersion, err := getAirflowVersionSelection(deployment.AirflowVersion, client, out)
if err != nil {
return err
}
desiredAirflowVersion = selectedVersion
}

err = meetsAirflowUpgradeReqs(deployment.AirflowVersion, desiredAirflowVersion)
if err != nil {
return err
}

vars := map[string]interface{}{"deploymentId": id, "desiredAirflowVersion": desiredAirflowVersion}

req := houston.Request{
Expand Down Expand Up @@ -266,12 +278,8 @@ func AirflowUpgradeCancel(id string, client *houston.Client, out io.Writer) erro
return nil
}

func getAirflowVersionSelection(deploymentId string, client *houston.Client, out io.Writer) (string, error) {
deployment, err := getDeployment(deploymentId, client)
if err != nil {
return "", err
}
currentAirflowVersion, err := semver.NewVersion(deployment.AirflowVersion)
func getAirflowVersionSelection(airflowVersion string, client *houston.Client, out io.Writer) (string, error) {
currentAirflowVersion, err := semver.NewVersion(airflowVersion)
if err != nil {
return "", err
}
Expand Down Expand Up @@ -331,3 +339,36 @@ func getDeployment(deploymentId string, client *houston.Client) (*houston.Deploy

return &r.Data.GetDeployment, nil
}

func meetsAirflowUpgradeReqs(airflowVersion string, desiredAirflowVersion string) error {
upgradeVersion := strconv.FormatUint(settings.AirflowVersionTwo, 10)
minRequiredVersion := "1.10.14"
airflowUpgradeVersion, err := semver.NewVersion(upgradeVersion)
if err != nil {
return err
}

desiredVersion, err := semver.NewVersion(desiredAirflowVersion)
if err != nil {
return err
}

if airflowUpgradeVersion.Compare(desiredVersion) < 1 {
minUpgrade, err := semver.NewVersion(minRequiredVersion)
if err != nil {
return err
}

currentVersion, err := semver.NewVersion(airflowVersion)
if err != nil {
return err
}

if currentVersion.Compare(minUpgrade) < 0 {
errorMessage := fmt.Sprintf("Airflow 2.0 has breaking changes. To upgrade to Airflow 2.0, upgrade to %s first and make sure your DAGs and configs are 2.0 compatible", minRequiredVersion)
return errors.New(errorMessage)
}
}

return nil
}
19 changes: 17 additions & 2 deletions deployment/deployment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,6 @@ func Test_getDeploymentError(t *testing.T) {
}

func Test_getAirflowVersionSelection(t *testing.T) {
deploymentId := "ckggzqj5f4157qtc9lescmehm"
testUtil.InitTestConfig()
okResponse := `{"data": {
"deployment": {
Expand Down Expand Up @@ -521,7 +520,7 @@ func Test_getAirflowVersionSelection(t *testing.T) {
defer func() { os.Stdin = stdin }()
os.Stdin = r

airflowVersion, err := getAirflowVersionSelection(deploymentId, api, buf)
airflowVersion, err := getAirflowVersionSelection("1.10.7", api, buf)
assert.NoError(t, err)
assert.Equal(t, airflowVersion, "1.10.12")
}
Expand All @@ -542,3 +541,19 @@ func Test_getAirflowVersionSelectionError(t *testing.T) {
assert.Error(t, err, "API error (500):")
assert.Equal(t, airflowVersion, "")
}

func Test_meetsAirflowUpgradeReqs(t *testing.T) {
airflowVersion := "1.10.12"
desiredAirflowVersion := "2.0.0"
err := meetsAirflowUpgradeReqs(airflowVersion, desiredAirflowVersion)
assert.Error(t, err)

airflowVersion = "1.10.14"
err = meetsAirflowUpgradeReqs(airflowVersion, desiredAirflowVersion)
assert.NoError(t, err)

airflowVersion = "1.10.7"
desiredAirflowVersion = "1.10.10"
err = meetsAirflowUpgradeReqs(airflowVersion, desiredAirflowVersion)
assert.NoError(t, err)
}
4 changes: 2 additions & 2 deletions settings/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ var (
settings Config

// Version 2.0.0
newAirflowVersion uint64 = 2
AirflowVersionTwo uint64 = 2
)

// ConfigSettings is the main builder of the settings package
Expand Down Expand Up @@ -80,7 +80,7 @@ func AddVariables(id string, version uint64) {
if objectValidator(0, variable.VariableValue) {

baseCmd := "airflow variables "
if version >= newAirflowVersion {
if version >= AirflowVersionTwo {
baseCmd += "set %s " // Airflow 2.0.0 command
} else {
baseCmd += "-s %s " // Airflow 1.0.0 command
Expand Down

0 comments on commit 70ef968

Please sign in to comment.