Skip to content

Commit

Permalink
Fix/airflow settings update (#388)
Browse files Browse the repository at this point in the history
* Check for airflow version to change command

* WIP Test case

* Update tests

* Add EOF line
  • Loading branch information
Adam Vandover authored Nov 23, 2020
1 parent fa4e5d6 commit 9fd909d
Show file tree
Hide file tree
Showing 7 changed files with 79 additions and 8 deletions.
28 changes: 26 additions & 2 deletions airflow/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"strings"
"text/tabwriter"

"github.com/Masterminds/semver"
"github.com/docker/docker/api/types"
"github.com/docker/docker/client"

Expand Down Expand Up @@ -201,7 +202,7 @@ func deploymentNameExists(name string, deployments []houston.Deployment) bool {
}

// Start starts a local airflow development cluster
func Start(airflowHome string, envFile string) error {
func Start(airflowHome string, dockerfile string, envFile string) error {
// Get project name from config
projectName, err := projectNameUnique()
replacer := strings.NewReplacer("_", "", "-", "")
Expand All @@ -211,6 +212,11 @@ func Start(airflowHome string, envFile string) error {
return errors.Wrap(err, "error retrieving working directory")
}

airflowDockerVersion, err := airflowVersionFromDockerFile(airflowHome, dockerfile)
if err != nil {
return errors.Wrap(err, "error parsing airflow version from dockerfile")
}

// Create a libcompose project
project, err := createProject(projectName, airflowHome, envFile)
if err != nil {
Expand Down Expand Up @@ -270,7 +276,7 @@ func Start(airflowHome string, envFile string) error {
for _, info := range psInfo {
if strings.Contains(info["Name"], strippedProjectName) &&
strings.Contains(info["Name"], "webserver") {
settings.ConfigSettings(info["Id"])
settings.ConfigSettings(info["Id"], airflowDockerVersion)
}
}
}
Expand Down Expand Up @@ -666,3 +672,21 @@ func validImageRepo(image string) bool {
}
return result
}

func airflowVersionFromDockerFile(airflowHome string, dockerfile string) (uint64, error) {
// parse dockerfile
cmd, err := docker.ParseFile(filepath.Join(airflowHome, dockerfile))
if err != nil {
return 0, errors.Wrapf(err, "failed to parse dockerfile: %s", filepath.Join(airflowHome, dockerfile))
}

_, airflowTag := docker.GetImageTagFromParsedFile(cmd)

semVer, err := semver.NewVersion(airflowTag)

if err != nil {
return 0, errors.Wrapf(err, "failed to parse dockerfile Airflow tag: %s", airflowTag)
}

return semVer.Major(), nil
}
35 changes: 34 additions & 1 deletion airflow/docker_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package airflow

import (
"testing"

"github.com/astronomer/astro-cli/config"
testUtils "github.com/astronomer/astro-cli/pkg/testing"
"github.com/spf13/afero"
"github.com/stretchr/testify/assert"
"testing"

"github.com/astronomer/astro-cli/houston"
)
Expand Down Expand Up @@ -157,3 +158,35 @@ func Test_validImageRepo(t *testing.T) {
assert.True(t, validImageRepo("astronomerinc/ap-airflow"))
assert.False(t, validImageRepo("personal-repo/ap-airflow"))
}

func Test_airflowVersionFromDockerFile(t *testing.T) {
airflowHome := config.WorkingPath + "/testfiles"

// Version 1
expected := uint64(0x1)
dockerfile := "Dockerfile.Airflow1.ok"
version, err := airflowVersionFromDockerFile(airflowHome, dockerfile)

assert.NoError(t, err)
assert.Equal(t, expected, version)

// Version 2
expected = uint64(0x2)
dockerfile = "Dockerfile.Airflow2.ok"
version, err = airflowVersionFromDockerFile(airflowHome, dockerfile)

assert.NoError(t, err)
assert.Equal(t, expected, version)

// Invalid Dockerfile
dockerfile = "Dockerfile.not.real"
version, err = airflowVersionFromDockerFile(airflowHome, dockerfile)

assert.Error(t, err)

// Invalid Airflow Tag
dockerfile = "Dockerfile.tag.invalid"
version, err = airflowVersionFromDockerFile(airflowHome, dockerfile)

assert.Error(t, err)
}
1 change: 1 addition & 0 deletions airflow/testfiles/Dockerfile.Airflow1.ok
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
FROM quay.io/astronomer/ap-airflow-dev:1.10.12-alpine3.10-onbuild-23620
1 change: 1 addition & 0 deletions airflow/testfiles/Dockerfile.Airflow2.ok
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
FROM quay.io/astronomer/ap-airflow-dev:2.0.0-1.beta3-buster-onbuild
1 change: 1 addition & 0 deletions airflow/testfiles/Dockerfile.tag.invalid
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
FROM quay.io/astronomer/ap-airflow-dev:1.10.12-THIS_IS_INVALID
3 changes: 2 additions & 1 deletion cmd/airflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ func airflowInit(cmd *cobra.Command, args []string, client *houston.Client, out

// Start an airflow cluster
func airflowStart(cmd *cobra.Command, args []string) error {
dockerfile := "Dockerfile"
// Silence Usage as we have now validated command input
cmd.SilenceUsage = true

Expand All @@ -321,7 +322,7 @@ func airflowStart(cmd *cobra.Command, args []string) error {
envFile = args[0]
}

return airflow.Start(config.WorkingPath, envFile)
return airflow.Start(config.WorkingPath, dockerfile, envFile)
}

// Kill an airflow cluster
Expand Down
18 changes: 14 additions & 4 deletions settings/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,17 @@ var (
viperSettings *viper.Viper

settings Config

// Version 2.0.0
newAirflowVersion uint64 = 2
)

// ConfigSettings is the main builder of the settings package
func ConfigSettings(id string) {
func ConfigSettings(id string, version uint64) {
InitSettings()
AddConnections(id)
AddPools(id)
AddVariables(id)
AddVariables(id, version)
}

// InitSettings initializes settings file
Expand Down Expand Up @@ -65,7 +68,7 @@ func InitSettings() {
}

// AddVariables is a function to add Variables from settings.yaml
func AddVariables(id string) {
func AddVariables(id string, version uint64) {
variables := settings.Airflow.Variables

for _, variable := range variables {
Expand All @@ -76,7 +79,14 @@ func AddVariables(id string) {
} else {
if objectValidator(0, variable.VariableValue) {

airflowCommand := fmt.Sprintf("airflow variables -s %s ", variable.VariableName)
baseCmd := "airflow variables "
if version >= newAirflowVersion {
baseCmd += "set %s " // Airflow 2.0.0 command
} else {
baseCmd += "-s %s " // Airflow 1.0.0 command
}

airflowCommand := fmt.Sprintf(baseCmd, variable.VariableName)

airflowCommand += fmt.Sprintf("'%s'", variable.VariableValue)

Expand Down

0 comments on commit 9fd909d

Please sign in to comment.