diff --git a/docker/docker.go b/docker/docker.go index 04e565ffc..07f80f210 100644 --- a/docker/docker.go +++ b/docker/docker.go @@ -6,6 +6,7 @@ import ( "os" "os/exec" + "github.com/astronomer/astro-cli/airflow/runtimes" "github.com/docker/docker/api/types" "github.com/docker/docker/pkg/stdcopy" "github.com/pkg/errors" @@ -17,8 +18,12 @@ const ( ) // AirflowCommand is the main method of interaction with Airflow -func AirflowCommand(id, airflowCommand string) string { - cmd := exec.Command("docker", "exec", "-it", id, "bash", "-c", airflowCommand) +func AirflowCommand(id, airflowCommand string) (string, error) { + containerRuntime, err := runtimes.GetContainerRuntimeBinary() + if err != nil { + return "", err + } + cmd := exec.Command(containerRuntime, "exec", "-it", id, "bash", "-c", airflowCommand) cmd.Stdin = os.Stdin cmd.Stderr = os.Stderr @@ -28,7 +33,7 @@ func AirflowCommand(id, airflowCommand string) string { } stringOut := string(out) - return stringOut + return stringOut, nil } // ExecPipe does pipe stream into stdout/stdin and stderr diff --git a/docker/docker_test.go b/docker/docker_test.go index 24c929507..dc5fea5d7 100644 --- a/docker/docker_test.go +++ b/docker/docker_test.go @@ -4,9 +4,13 @@ import ( "bufio" "bytes" "fmt" + "os" "strings" "testing" + "github.com/astronomer/astro-cli/config" + testUtil "github.com/astronomer/astro-cli/pkg/testing" + "github.com/docker/docker/api/types" "github.com/stretchr/testify/suite" ) @@ -29,8 +33,22 @@ func (s *Suite) TestExecPipe() { } func (s *Suite) TestAirflowCommand() { + testUtil.InitTestConfig(testUtil.LocalPlatform) s.Run("success", func() { - out := AirflowCommand("test-id", "-f docker_image_test.go") + err := config.CFG.DockerCommand.SetHomeString("docker") + s.NoError(err) + out, err := AirflowCommand("test-id", "-f docker_image_test.go") + s.NoError(err) + s.Empty(out) + }) + + s.Run("error", func() { + err := config.CFG.DockerCommand.SetHomeString("") + s.NoError(err) + err = os.Setenv("PATH", "") // set PATH to empty string to force error on container runtime check + s.NoError(err) + out, err := AirflowCommand("test-id", "-f docker_image_test.go") + s.Error(err) s.Empty(out) }) } diff --git a/settings/settings.go b/settings/settings.go index ae6e0d88a..34a55c8ae 100644 --- a/settings/settings.go +++ b/settings/settings.go @@ -67,13 +67,19 @@ func ConfigSettings(id, settingsFile string, envConns map[string]astrocore.Envir return err } if pools { - AddPools(id, version) + if err := AddPools(id, version); err != nil { + return fmt.Errorf("error adding pools: %w", err) + } } if variables { - AddVariables(id, version) + if err := AddVariables(id, version); err != nil { + return fmt.Errorf("error adding variables: %w", err) + } } if connections { - AddConnections(id, version, envConns) + if err := AddConnections(id, version, envConns); err != nil { + return fmt.Errorf("error adding connections: %w", err) + } } return nil } @@ -105,7 +111,7 @@ func InitSettings(settingsFile string) error { } // AddVariables is a function to add Variables from settings.yaml -func AddVariables(id string, version uint64) { +func AddVariables(id string, version uint64) error { variables := settings.Airflow.Variables for _, variable := range variables { if !objectValidator(0, variable.VariableName) { @@ -123,15 +129,19 @@ func AddVariables(id string, version uint64) { airflowCommand := fmt.Sprintf(baseCmd, variable.VariableName) airflowCommand += fmt.Sprintf("'%s'", variable.VariableValue) - out := execAirflowCommand(id, airflowCommand) + out, err := execAirflowCommand(id, airflowCommand) + if err != nil { + return fmt.Errorf("Error adding variable %s: %w", variable.VariableName, err) + } logrus.Debugf("Adding variable logs:\n" + out) fmt.Printf("Added Variable: %s\n", variable.VariableName) } } + return nil } // AddConnections is a function to add Connections from settings.yaml -func AddConnections(id string, version uint64, envConns map[string]astrocore.EnvironmentObjectConnection) { +func AddConnections(id string, version uint64, envConns map[string]astrocore.EnvironmentObjectConnection) error { connections := settings.Airflow.Connections connections = AppendEnvironmentConnections(connections, envConns) @@ -169,7 +179,10 @@ func AddConnections(id string, version uint64, envConns map[string]astrocore.Env connPortArg = "--conn_port" } airflowCommand := baseListCmd - out := execAirflowCommand(id, airflowCommand) + out, err := execAirflowCommand(id, airflowCommand) + if err != nil { + return fmt.Errorf("error listing connections: %w", err) + } for i := range connections { var j int @@ -185,7 +198,10 @@ func AddConnections(id string, version uint64, envConns map[string]astrocore.Env if strings.Contains(out, quotedConnID) || strings.Contains(out, conn.ConnID) { fmt.Printf("Updating Connection %q...\n", conn.ConnID) airflowCommand = fmt.Sprintf("%s %s %q", baseRmCmd, connIDArg, conn.ConnID) - execAirflowCommand(id, airflowCommand) + _, err = execAirflowCommand(id, airflowCommand) + if err != nil { + return fmt.Errorf("error removing connection %s: %w", conn.ConnID, err) + } } if !objectValidator(1, conn.ConnType, conn.ConnURI) { @@ -225,10 +241,14 @@ func AddConnections(id string, version uint64, envConns map[string]astrocore.Env airflowCommand += fmt.Sprintf("%s '%s' ", connURIArg, conn.ConnURI) } - out := execAirflowCommand(id, airflowCommand) + out, err := execAirflowCommand(id, airflowCommand) + if err != nil { + return fmt.Errorf("error adding connection %s: %w", conn.ConnID, err) + } logrus.Debugf("Adding Connection logs:\n\n" + out) fmt.Printf("Added Connection: %s\n", conn.ConnID) } + return nil } func AppendEnvironmentConnections(connections Connections, envConnections map[string]astrocore.EnvironmentObjectConnection) Connections { @@ -271,7 +291,7 @@ func AppendEnvironmentConnections(connections Connections, envConnections map[st } // AddPools is a function to add Pools from settings.yaml -func AddPools(id string, version uint64) { +func AddPools(id string, version uint64) error { pools := settings.Airflow.Pools baseCmd := "airflow " @@ -294,7 +314,10 @@ func AddPools(id string, version uint64) { airflowCommand += "''" } fmt.Println(airflowCommand) - out := execAirflowCommand(id, airflowCommand) + out, err := execAirflowCommand(id, airflowCommand) + if err != nil { + return fmt.Errorf("error adding pool %s: %w", pool.PoolName, err) + } logrus.Debugf("Adding pool logs:\n" + out) fmt.Printf("Added Pool: %s\n", pool.PoolName) } else { @@ -302,6 +325,7 @@ func AddPools(id string, version uint64) { } } } + return nil } func objectValidator(bound int, args ...string) bool { @@ -347,12 +371,18 @@ func EnvExport(id, envFile string, version uint64, connections, variables bool) func EnvExportVariables(id, envFile string) error { // setup airflow command to export variables - out := execAirflowCommand(id, airflowVarExport) + out, err := execAirflowCommand(id, airflowVarExport) + if err != nil { + return fmt.Errorf("error exporting variables: %w", err) + } logrus.Debugf("Env Export Variables logs:\n\n" + out) if strings.Contains(out, "successfully") { // get variables from file created by airflow command - out = execAirflowCommand(id, catVarFile) + out, err = execAirflowCommand(id, catVarFile) + if err != nil { + return fmt.Errorf("error reading variables file: %w", err) + } m := map[string]string{} err := json.Unmarshal([]byte(out), &m) @@ -375,7 +405,10 @@ func EnvExportVariables(id, envFile string) error { } } fmt.Println("Aiflow variables successfully export to the file " + envFile + "\n") - _ = execAirflowCommand(id, rmVarFile) + _, err = execAirflowCommand(id, rmVarFile) + if err != nil { + return fmt.Errorf("error removing variables file: %w", err) + } return nil } return errors.New("variable export unsuccessful") @@ -383,12 +416,18 @@ func EnvExportVariables(id, envFile string) error { func EnvExportConnections(id, envFile string) error { // Airflow command to export connections to env uris - out := execAirflowCommand(id, airflowConnExport) + out, err := execAirflowCommand(id, airflowConnExport) + if err != nil { + return fmt.Errorf("error exporting connections: %w", err) + } logrus.Debugf("Env Export Connections logs:\n" + out) if strings.Contains(out, "successfully") { // get connections from file craeted by airflow command - out = execAirflowCommand(id, catConnFile) + out, err = execAirflowCommand(id, catConnFile) + if err != nil { + return fmt.Errorf("error reading connections file: %w", err) + } vars := strings.Split(out, "\n") // add connections to the env file @@ -411,7 +450,10 @@ func EnvExportConnections(id, envFile string) error { } fmt.Println("Aiflow connections successfully export to the file " + envFile + "\n") rmCmd := "rm tmp.connection" - _ = execAirflowCommand(id, rmCmd) + _, err = execAirflowCommand(id, rmCmd) + if err != nil { + return fmt.Errorf("error removing connections file: %w", err) + } return nil } return errors.New("connection export unsuccessful") @@ -460,7 +502,10 @@ func Export(id, settingsFile string, version uint64, connections, variables, poo func ExportConnections(id string) error { // Setup airflow command to export connections - out := execAirflowCommand(id, airflowConnectionList) + out, err := execAirflowCommand(id, airflowConnectionList) + if err != nil { + return fmt.Errorf("error listing connections: %w", err) + } logrus.Debugf("Export Connections logs:\n" + out) // remove all color from output of the airflow command plainOut := re.ReplaceAllString(out, "") @@ -469,7 +514,7 @@ func ExportConnections(id string) error { var connections AirflowConnections - err := yaml.Unmarshal([]byte(yamlCons), &connections) + err = yaml.Unmarshal([]byte(yamlCons), &connections) if err != nil { return err } @@ -517,12 +562,18 @@ func ExportConnections(id string) error { func ExportVariables(id string) error { // setup files - out := execAirflowCommand(id, airflowVarExport) + out, err := execAirflowCommand(id, airflowVarExport) + if err != nil { + return fmt.Errorf("error exporting variables: %w", err) + } logrus.Debugf("Export Variables logs:\n" + out) if strings.Contains(out, "successfully") { // get variables created by the airflow command - out = execAirflowCommand(id, catVarFile) + out, err = execAirflowCommand(id, catVarFile) + if err != nil { + return fmt.Errorf("error reading variables file: %w", err) + } m := map[string]string{} err := json.Unmarshal([]byte(out), &m) @@ -550,7 +601,10 @@ func ExportVariables(id string) error { if err != nil { return err } - _ = execAirflowCommand(id, rmVarFile) + _, err = execAirflowCommand(id, rmVarFile) + if err != nil { + return fmt.Errorf("error removing variables file: %w", err) + } fmt.Printf("successfully exported variables\n\n") return nil } @@ -560,7 +614,10 @@ func ExportVariables(id string) error { func ExportPools(id string) error { // Setup airflow command to export pools airflowCommand := ariflowPoolsList - out := execAirflowCommand(id, airflowCommand) + out, err := execAirflowCommand(id, airflowCommand) + if err != nil { + return fmt.Errorf("error listing pools: %w", err) + } logrus.Debugf("Export Pools logs:\n" + out) // remove all color from output of the airflow command @@ -570,7 +627,7 @@ func ExportPools(id string) error { // remove warnings and extra text from the the output yamlpools := "- description:" + strings.SplitN(plainOut, "- description:", 2)[1] //nolint:gomnd - err := yaml.Unmarshal([]byte(yamlpools), &pools) + err = yaml.Unmarshal([]byte(yamlpools), &pools) if err != nil { return err } diff --git a/settings/settings_test.go b/settings/settings_test.go index dcbef3828..9e1602e67 100644 --- a/settings/settings_test.go +++ b/settings/settings_test.go @@ -2,6 +2,7 @@ package settings import ( "encoding/json" + "fmt" "os" "testing" @@ -54,11 +55,12 @@ func (s *Suite) TestAddConnectionsAirflowOne() { expectedAddCmd := "airflow connections -a --conn_id 'test-id' --conn_type 'test-type' --conn_host 'test-host' --conn_login 'test-login' --conn_password 'test-password' --conn_schema 'test-schema' --conn_port 1" expectedListCmd := "airflow connections -l " - execAirflowCommand = func(id, airflowCommand string) string { + execAirflowCommand = func(id, airflowCommand string) (string, error) { s.Contains([]string{expectedAddCmd, expectedListCmd}, airflowCommand) - return "" + return "", nil } - AddConnections("test-conn-id", 1, nil) + err := AddConnections("test-conn-id", 1, nil) + s.NoError(err) } func (s *Suite) TestAddConnectionsAirflowTwo() { @@ -80,14 +82,15 @@ func (s *Suite) TestAddConnectionsAirflowTwo() { expectedAddCmd := "airflow connections add 'test-id' --conn-type 'test-type' --conn-host 'test-host' --conn-login 'test-login' --conn-password 'test-password' --conn-schema 'test-schema' --conn-port 1" expectedDelCmd := "airflow connections delete \"test-id\"" expectedListCmd := "airflow connections list -o plain" - execAirflowCommand = func(id, airflowCommand string) string { + execAirflowCommand = func(id, airflowCommand string) (string, error) { s.Contains([]string{expectedAddCmd, expectedListCmd, expectedDelCmd}, airflowCommand) if airflowCommand == expectedListCmd { - return "'test-id' 'test-type' 'test-host' 'test-uri'" + return "'test-id' 'test-type' 'test-host' 'test-uri'", nil } - return "" + return "", nil } - AddConnections("test-conn-id", 2, nil) + err := AddConnections("test-conn-id", 2, nil) + s.NoError(err) } func ptr[T any](t T) *T { @@ -130,14 +133,15 @@ func (s *Suite) TestAddConnectionsAirflowTwoWithEnvConns() { expectedEnvAddCmd := "airflow connections add 'test-env-id' --conn-type 'test-env-type' --conn-extra '{\"test-extra-key\":\"test-extra-value\"}' --conn-host 'test-env-host' --conn-login 'test-env-login' --conn-password 'test-env-password' --conn-schema 'test-env-schema' --conn-port 2" - execAirflowCommand = func(id, airflowCommand string) string { + execAirflowCommand = func(id, airflowCommand string) (string, error) { s.Contains([]string{expectedAddCmd, expectedEnvAddCmd, expectedListCmd, expectedDelCmd}, airflowCommand) if airflowCommand == expectedListCmd { - return "'test-id' 'test-type' 'test-host' 'test-uri'" + return "'test-id' 'test-type' 'test-host' 'test-uri'", nil } - return "" + return "", nil } - AddConnections("test-conn-id", 2, envConns) + err := AddConnections("test-conn-id", 2, envConns) + s.NoError(err) } func (s *Suite) TestAddConnectionsAirflowTwoURI() { @@ -149,14 +153,41 @@ func (s *Suite) TestAddConnectionsAirflowTwoURI() { expectedAddCmd := "airflow connections add 'test-id' --conn-uri 'test-uri'" expectedDelCmd := "airflow connections delete \"test-id\"" expectedListCmd := "airflow connections list -o plain" - execAirflowCommand = func(id, airflowCommand string) string { + execAirflowCommand = func(id, airflowCommand string) (string, error) { s.Contains([]string{expectedAddCmd, expectedListCmd, expectedDelCmd}, airflowCommand) if airflowCommand == expectedListCmd { - return "'test-id' 'test-type' 'test-host' 'test-uri'" + return "'test-id' 'test-type' 'test-host' 'test-uri'", nil } - return "" + return "", nil } - AddConnections("test-conn-id", 2, nil) + err := AddConnections("test-conn-id", 2, nil) + s.NoError(err) +} + +func (s *Suite) TestAddConnectionsFailure() { + var testExtra map[string]string + + testConn := Connection{ + ConnID: "test-id", + ConnType: "test-type", + ConnHost: "test-host", + ConnSchema: "test-schema", + ConnLogin: "test-login", + ConnPassword: "test-password", + ConnPort: 1, + ConnURI: "test-uri", + ConnExtra: testExtra, + } + settings.Airflow.Connections = []Connection{testConn} + + expectedAddCmd := "airflow connections -a --conn_id 'test-id' --conn_type 'test-type' --conn_host 'test-host' --conn_login 'test-login' --conn_password 'test-password' --conn_schema 'test-schema' --conn_port 1" + expectedListCmd := "airflow connections -l " + execAirflowCommand = func(id, airflowCommand string) (string, error) { + s.Contains([]string{expectedAddCmd, expectedListCmd}, airflowCommand) + return "", fmt.Errorf("mock error") + } + err := AddConnections("test-conn-id", 1, nil) + s.Contains(err.Error(), "mock error") } func (s *Suite) TestAddVariableAirflowOne() { @@ -168,11 +199,12 @@ func (s *Suite) TestAddVariableAirflowOne() { } expectedAddCmd := "airflow variables -s test-var-name'test-var-val'" - execAirflowCommand = func(id, airflowCommand string) string { + execAirflowCommand = func(id, airflowCommand string) (string, error) { s.Equal(expectedAddCmd, airflowCommand) - return "" + return "", nil } - AddVariables("test-conn-id", 1) + err := AddVariables("test-conn-id", 1) + s.NoError(err) } func (s *Suite) TestAddVariableAirflowTwo() { @@ -184,11 +216,12 @@ func (s *Suite) TestAddVariableAirflowTwo() { } expectedAddCmd := "airflow variables set test-var-name 'test-var-val'" - execAirflowCommand = func(id, airflowCommand string) string { + execAirflowCommand = func(id, airflowCommand string) (string, error) { s.Equal(expectedAddCmd, airflowCommand) - return "" + return "", nil } - AddVariables("test-conn-id", 2) + err := AddVariables("test-conn-id", 2) + s.NoError(err) } func (s *Suite) TestAddPoolsAirflowOne() { @@ -201,11 +234,12 @@ func (s *Suite) TestAddPoolsAirflowOne() { } expectedAddCmd := "airflow pool -s test-pool-name 1 'test-pool-description' " - execAirflowCommand = func(id, airflowCommand string) string { + execAirflowCommand = func(id, airflowCommand string) (string, error) { s.Equal(expectedAddCmd, airflowCommand) - return "" + return "", nil } - AddPools("test-conn-id", 1) + err := AddPools("test-conn-id", 1) + s.NoError(err) } func (s *Suite) TestAddPoolsAirflowTwo() { @@ -218,11 +252,29 @@ func (s *Suite) TestAddPoolsAirflowTwo() { } expectedAddCmd := "airflow pools set test-pool-name 1 'test-pool-description' " - execAirflowCommand = func(id, airflowCommand string) string { + execAirflowCommand = func(id, airflowCommand string) (string, error) { + s.Equal(expectedAddCmd, airflowCommand) + return "", nil + } + err := AddPools("test-conn-id", 2) + s.NoError(err) +} + +func (s *Suite) TestAddVariableFailure() { + settings.Airflow.Variables = Variables{ + { + VariableName: "test-var-name", + VariableValue: "test-var-val", + }, + } + + expectedAddCmd := "airflow variables -s test-var-name'test-var-val'" + execAirflowCommand = func(id, airflowCommand string) (string, error) { s.Equal(expectedAddCmd, airflowCommand) - return "" + return "", fmt.Errorf("mock error") } - AddPools("test-conn-id", 2) + err := AddVariables("test-conn-id", 1) + s.Contains(err.Error(), "mock error") } func (s *Suite) TestInitSettingsSuccess() { @@ -242,20 +294,20 @@ func (s *Suite) TestInitSettingsFailure() { func (s *Suite) TestEnvExport() { s.Run("success", func() { - execAirflowCommand = func(id, airflowCommand string) string { + execAirflowCommand = func(id, airflowCommand string) (string, error) { switch airflowCommand { case airflowVarExport: - return "1 variables successfully exported to tmp.var" + return "1 variables successfully exported to tmp.var", nil case catVarFile: return `{ "myvar": "myval" - }` + }`, nil case airflowConnExport: - return "Connections successfully exported to tmp.json" + return "Connections successfully exported to tmp.json", nil case catConnFile: - return "local_postgres=postgres://username:password@example.db.example.com:5432/schema" + return "local_postgres=postgres://username:password@example.db.example.com:5432/schema", nil default: - return "" + return "", nil } } @@ -269,12 +321,12 @@ func (s *Suite) TestEnvExport() { }) s.Run("variable failure", func() { - execAirflowCommand = func(id, airflowCommand string) string { + execAirflowCommand = func(id, airflowCommand string) (string, error) { switch airflowCommand { case airflowVarExport: - return "" + return "", nil default: - return "" + return "", nil } } @@ -284,12 +336,12 @@ func (s *Suite) TestEnvExport() { }) s.Run("connection failure", func() { - execAirflowCommand = func(id, airflowCommand string) string { + execAirflowCommand = func(id, airflowCommand string) (string, error) { switch airflowCommand { case airflowConnExport: - return "" + return "", nil default: - return "" + return "", nil } } @@ -306,7 +358,7 @@ func (s *Suite) TestEnvExport() { func (s *Suite) TestExport() { s.Run("success", func() { - execAirflowCommand = func(id, airflowCommand string) string { + execAirflowCommand = func(id, airflowCommand string) (string, error) { switch airflowCommand { case airflowConnectionList: return ` @@ -322,13 +374,13 @@ func (s *Suite) TestExport() { login: username password: password port: '5432' - schema: schema` + schema: schema`, nil case airflowVarExport: - return "1 variables successfully exported to tmp.var" + return "1 variables successfully exported to tmp.var", nil case catVarFile: return `{ "myvar": "myval" - }` + }`, nil case ariflowPoolsList: return ` - description: Default pool @@ -336,9 +388,9 @@ func (s *Suite) TestExport() { slots: '128' - description: '' pool: subdag_limit - slots: '3'` + slots: '3'`, nil default: - return "" + return "", nil } } @@ -347,12 +399,12 @@ func (s *Suite) TestExport() { }) s.Run("variable failure", func() { - execAirflowCommand = func(id, airflowCommand string) string { + execAirflowCommand = func(id, airflowCommand string) (string, error) { switch airflowCommand { case airflowVarExport: - return "" + return "", nil default: - return "" + return "", nil } }