Skip to content

Commit

Permalink
Fix Upgrade Test (#1385)
Browse files Browse the repository at this point in the history
* fix runtime

* update parse test

* update mocks

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* fix lint

* fix test

* add tests

* add tests

* fix lint

* add test

* fix test

* fix test

* add comment

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
2 people authored and kushalmalani committed Sep 12, 2023
1 parent 9a02d31 commit ff31806
Show file tree
Hide file tree
Showing 21 changed files with 395 additions and 92 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,5 @@ requirements.txt
sagemaker-batch-inference.py
testbin/
testbin/golangci-lint
upgrade-test*
vendor/github.com/theupdateframework/notary/Dockerfile
11 changes: 5 additions & 6 deletions airflow-client/mocks/Client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions airflow/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type ImageHandler interface {
Push(registry, username, token, remoteImage string) error
Pull(registry, username, token, remoteImage string) error
GetLabel(altImageName, labelName string) (string, error)
DoesImageExist(image string) error
ListLabels() (map[string]string, error)
TagLocalImage(localImage string) error
Run(dagID, envFile, settingsFile, containerName, dagFile, executionDate string, taskLogs bool) error
Expand Down
54 changes: 48 additions & 6 deletions airflow/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,10 @@ const (
)

var (
errNoFile = errors.New("file specified does not exist")
errSettingsPath = "error looking for settings.yaml"
errComposeProjectRunning = errors.New("project is up and running")
errNoFile = errors.New("file specified does not exist")
errSettingsPath = "error looking for settings.yaml"
errComposeProjectRunning = errors.New("project is up and running")
errCustomImageDoesNotExist = errors.New("The custom image provided either does not exist or Docker is unable to connect to the repository")

initSettings = settings.ConfigSettings
exportSettings = settings.Export
Expand Down Expand Up @@ -540,8 +541,16 @@ func (d *DockerCompose) UpgradeTest(newAirflowVersion, deploymentID, newImageNam
versionTest = true
dagTest = true
}
// if user supplies deployment id pull down current image
var deploymentImage string
// if custom image is used get new Airflow version
if customImage != "" {
err := d.imageHandler.DoesImageExist(customImage)
if err != nil {
return errCustomImageDoesNotExist
}
newAirflowVersion = strings.SplitN(customImage, ":", partsNum)[1]
}
// if user supplies deployment id pull down current image
if deploymentID != "" {
err := d.pullImageFromDeployment(deploymentID, client)
if err != nil {
Expand Down Expand Up @@ -747,7 +756,7 @@ func (d *DockerCompose) dagTest(testHomeDirectory, newAirflowVersion, newDockerF
// create html report
htmlReportArgs := "--html=dag-test-report.html --self-contained-html"
// compare pip freeze files
fmt.Println("\nRunning parse test")
fmt.Println("\nRunning DAG parse test with the new Airflow version")
exitCode, err := d.imageHandler.Pytest(pytestFile, d.airflowHome, d.envFile, testHomeDirectory, strings.Fields(htmlReportArgs), true, airflowTypes.ImageBuildConfig{Path: d.airflowHome, Output: true})
if err != nil {
if strings.Contains(exitCode, "1") { // exit code is 1 meaning tests failed
Expand All @@ -771,7 +780,7 @@ func GetRegistryURL(domain string) string {
return registry
}

func upgradeDockerfile(oldDockerfilePath, newDockerfilePath, newTag, newImage string) error {
func upgradeDockerfile(oldDockerfilePath, newDockerfilePath, newTag, newImage string) error { //nolint:gocognit
// Read the content of the old Dockerfile
content, err := os.ReadFile(oldDockerfilePath)
if err != nil {
Expand All @@ -789,6 +798,25 @@ func upgradeDockerfile(oldDockerfilePath, newDockerfilePath, newTag, newImage st
line = parts[0] + ":" + newTag
}
}
if strings.HasPrefix(strings.TrimSpace(line), "FROM quay.io/astronomer/ap-airflow:") {
isRuntime, err := isRuntimeVersion(newTag)
if err != nil {
logrus.Debug(err)
}
if isRuntime {
// Replace the tag on the matching line
parts := strings.SplitN(line, "/", partsNum)
if len(parts) >= partsNum {
line = parts[0] + "/astronomer/astro-runtime:" + newTag
}
} else {
// Replace the tag on the matching line
parts := strings.SplitN(line, ":", partsNum)
if len(parts) == partsNum {
line = parts[0] + ":" + newTag
}
}
}
newContent.WriteString(line + "\n") // Add a newline after each line
}
} else {
Expand All @@ -813,6 +841,20 @@ func upgradeDockerfile(oldDockerfilePath, newDockerfilePath, newTag, newImage st
return nil
}

func isRuntimeVersion(versionStr string) (bool, error) {
// Parse the version string
v, err := semver.NewVersion(versionStr)
if err != nil {
return false, err
}

// Runtime versions start 4.0.0 to not get confused with Airflow versions that are currently in 2.X.X
referenceVersion := semver.MustParse("4.0.0")

// Compare the parsed version with the reference version
return v.Compare(referenceVersion) > 0, nil
}

func CreateVersionTestFile(beforeFile, afterFile, outputFile string) error {
// Open the before file for reading
before, err := os.Open(beforeFile)
Expand Down
12 changes: 12 additions & 0 deletions airflow/docker_image.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,18 @@ func (d *DockerImage) GetLabel(altImageName, labelName string) (string, error) {
return label, nil
}

func (d *DockerImage) DoesImageExist(image string) error {
dockerCommand := config.CFG.DockerCommand.GetString()
stdout := new(bytes.Buffer)
stderr := new(bytes.Buffer)

err := cmdExec(dockerCommand, stdout, stderr, "manifest", "inspect", image)
if err != nil {
return err
}
return nil
}

func (d *DockerImage) ListLabels() (map[string]string, error) {
dockerCommand := config.CFG.DockerCommand.GetString()

Expand Down
30 changes: 30 additions & 0 deletions airflow/docker_image_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,36 @@ func TestDockerImageListLabel(t *testing.T) {
})
}

func TestDoesImageExist(t *testing.T) {
handler := DockerImage{
imageName: "testing",
}
testImage := "image"

previousCmdExec := cmdExec
defer func() { cmdExec = previousCmdExec }()

t.Run("success", func(t *testing.T) {
cmdExec = func(cmd string, stdout, stderr io.Writer, args ...string) error {
assert.Contains(t, args, "inspect")
return nil
}

err := handler.DoesImageExist(testImage)
assert.NoError(t, err)
})

t.Run("cmdExec error", func(t *testing.T) {
cmdExec = func(cmd string, stdout, stderr io.Writer, args ...string) error {
assert.Contains(t, args, "inspect")
return errMockDocker
}

err := handler.DoesImageExist(testImage)
assert.ErrorIs(t, err, errMockDocker)
})
}

func TestDockerTagLocalImage(t *testing.T) {
handler := DockerImage{
imageName: "testing",
Expand Down
117 changes: 117 additions & 0 deletions airflow/docker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1979,3 +1979,120 @@ func TestCreateDockerProject(t *testing.T) {
assert.Equal(t, 5433, int(postgresService.Ports[len(prj.Services[0].Ports)-1].Published))
})
}

func TestUpgradeDockerfile(t *testing.T) {
t.Run("update Dockerfile with new tag", func(t *testing.T) {
// Create a temporary old Dockerfile
oldDockerfilePath := "test_old_Dockerfile"
oldContent := "FROM quay.io/astronomer/astro-runtime:old-tag\n"
err := os.WriteFile(oldDockerfilePath, []byte(oldContent), 0o644)
assert.NoError(t, err)
defer os.Remove(oldDockerfilePath)

// Define test data
newDockerfilePath := "test_new_Dockerfile"
newTag := "new-tag"

// Call the function
err = upgradeDockerfile(oldDockerfilePath, newDockerfilePath, newTag, "")
defer os.Remove(newDockerfilePath)

// Check for errors
assert.NoError(t, err)

// Read the new Dockerfile and check its content
newContent, err := os.ReadFile(newDockerfilePath)
assert.NoError(t, err)
assert.Contains(t, string(newContent), "FROM quay.io/astronomer/astro-runtime:new-tag")
})

t.Run("update Dockerfile with new image", func(t *testing.T) {
// Create a temporary old Dockerfile
oldDockerfilePath := "test_old_Dockerfile"
oldContent := "FROM quay.io/astronomer/astro-runtime:old-tag\n"
err := os.WriteFile(oldDockerfilePath, []byte(oldContent), 0o644)
assert.NoError(t, err)
defer os.Remove(oldDockerfilePath)

// Define test data
newDockerfilePath := "test_new_Dockerfile"
newImage := "new-image"

// Call the function
err = upgradeDockerfile(oldDockerfilePath, newDockerfilePath, "", newImage)
defer os.Remove(newDockerfilePath)

// Check for errors
assert.NoError(t, err)

// Read the new Dockerfile and check its content
newContent, err := os.ReadFile(newDockerfilePath)
assert.NoError(t, err)
assert.Contains(t, string(newContent), "FROM new-image")
})

t.Run("update Dockerfile for ap-airflow with runtime version", func(t *testing.T) {
// Create a temporary old Dockerfile with a line matching the pattern
oldDockerfilePath := "test_old_Dockerfile"
oldContent := "FROM quay.io/astronomer/ap-airflow:old-tag"
err := os.WriteFile(oldDockerfilePath, []byte(oldContent), 0o644)
assert.NoError(t, err)
defer os.Remove(oldDockerfilePath)

// Define test data
newDockerfilePath := "test_new_Dockerfile"
newTag := "5.0.0"

// Call the function
err = upgradeDockerfile(oldDockerfilePath, newDockerfilePath, newTag, "")
defer os.Remove(newDockerfilePath)

// Check for errors
assert.NoError(t, err)

// Read the new Dockerfile and check its content
newContent, err := os.ReadFile(newDockerfilePath)
assert.NoError(t, err)
assert.Contains(t, string(newContent), "FROM quay.io/astronomer/astro-runtime:5.0.0\n")
})

t.Run("update Dockerfile for ap-airflow with non-runtime version", func(t *testing.T) {
// Create a temporary old Dockerfile with a line matching the pattern
oldDockerfilePath := "test_old_Dockerfile"
oldContent := "FROM quay.io/astronomer/ap-airflow:old-tag\n"
err := os.WriteFile(oldDockerfilePath, []byte(oldContent), 0o644)
assert.NoError(t, err)
defer os.Remove(oldDockerfilePath)

// Define test data
newDockerfilePath := "test_new_Dockerfile"
newTag := "new-tag"

// Call the function
err = upgradeDockerfile(oldDockerfilePath, newDockerfilePath, newTag, "")
defer os.Remove(newDockerfilePath)

// Check for errors
assert.NoError(t, err)

// Read the new Dockerfile and check its content
newContent, err := os.ReadFile(newDockerfilePath)
assert.NoError(t, err)
assert.Contains(t, string(newContent), "FROM quay.io/astronomer/ap-airflow:new-tag")
})

t.Run("error reading old Dockerfile", func(t *testing.T) {
// Define test data with an invalid path
oldDockerfilePath := "non_existent_Dockerfile"
newDockerfilePath := "test_new_Dockerfile"
newTag := "new-tag"

// Call the function
err := upgradeDockerfile(oldDockerfilePath, newDockerfilePath, newTag, "")
defer os.Remove(newDockerfilePath)

// Check for errors
assert.Error(t, err)
assert.Contains(t, err.Error(), "no such file or directory")
})
}
28 changes: 21 additions & 7 deletions airflow/include/dagintegritytestdefault.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,24 +99,38 @@ def suppress_logging(namespace):

def get_import_errors():
"""
Generate a tuple for import errors in the dag bag
Generate a tuple for import errors in the dag bag, and include DAGs without errors.
"""
with suppress_logging("airflow"):
dag_bag = DagBag(include_examples=False)

def strip_path_prefix(path):
return os.path.relpath(path, os.environ.get("AIRFLOW_HOME"))

# prepend "(None,None)" to ensure that a test object is always created even if it's a no op.
return [(None, None)] + [
(strip_path_prefix(k), v.strip()) for k, v in dag_bag.import_errors.items()
]
# Initialize an empty list to store the tuples
result = []

# Iterate over the items in import_errors
for k, v in dag_bag.import_errors.items():
result.append((strip_path_prefix(k), v.strip()))

# Check if there are DAGs without errors
for file_path in dag_bag.dags:
# Check if the file_path is not in import_errors, meaning no errors
if file_path not in dag_bag.import_errors:
result.append((strip_path_prefix(file_path), "No import errors"))

return result


@pytest.mark.parametrize(
"rel_path,rv", get_import_errors(), ids=[x[0] for x in get_import_errors()]
"rel_path, rv", get_import_errors(), ids=[x[0] for x in get_import_errors()]
)
def test_file_imports(rel_path, rv):
"""Test for import errors on a file"""
if rel_path and rv: # Make sure our no op test doesn't raise an error
if rv != "No import errors":
# If rv is not "No import errors," consider it a failed test
raise Exception(f"{rel_path} failed to import with message \n {rv}")
else:
# If rv is "No import errors," consider it a passed test
print(f"{rel_path} passed the import test")
11 changes: 5 additions & 6 deletions airflow/mocks/ContainerHandler.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit ff31806

Please sign in to comment.