Skip to content

Commit

Permalink
Astro CLI does not add variables and connections declared in airflow_…
Browse files Browse the repository at this point in the history
…settings.yaml to the database fir Airflow 2.0.0 (#398)

* Astro CLI does not add variables and connections declared in airflow_settings.yaml to the database fir Airflow 2.0.0

* remove debug info

* Update settings/settings.go

Co-authored-by: Adam Vandover <[email protected]>

Co-authored-by: Adam Vandover <[email protected]>
  • Loading branch information
andriisoldatenko and Adam Vandover committed Feb 2, 2021
1 parent 748ca2e commit a2efe47
Showing 1 changed file with 64 additions and 17 deletions.
81 changes: 64 additions & 17 deletions settings/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ var (
// ConfigSettings is the main builder of the settings package
func ConfigSettings(id string, version uint64) {
InitSettings()
AddConnections(id)
AddPools(id)
AddConnections(id, version)
AddPools(id, version)
AddVariables(id, version)
}

Expand Down Expand Up @@ -98,9 +98,45 @@ func AddVariables(id string, version uint64) {
}

// AddConnections is a function to add Connections from settings.yaml
func AddConnections(id string) {
func AddConnections(id string, airflowVersion uint64) {
connections := settings.Airflow.Connections
airflowCommand := fmt.Sprintf("airflow connections -l")
baseCmd := "airflow connections "
var (
baseAddCmd, baseRmCmd, baseListCmd, connIdArg, connTypeArg, connUriArg, connExtraArg, connHostArg, connLoginArg, connPasswordArg, connSchemaArg, connPortArg string
)
if airflowVersion >= AirflowVersionTwo {
// Airflow 2.0.0 command
// based on https://airflow.apache.org/docs/apache-airflow/2.0.0/cli-and-env-variables-ref.html
baseAddCmd = baseCmd + "add "
baseRmCmd = baseCmd + "delete "
baseListCmd = baseCmd + "list "
connIdArg = ""
connTypeArg = "--conn-type"
connUriArg = "--conn-uri"
connExtraArg = "--conn-extra"
connHostArg = "--conn-host"
connLoginArg = "--conn-login"
connPasswordArg = "--conn-password"
connSchemaArg = "--conn-schema"
connPortArg = "--conn-port"
} else {
// Airflow 1.0.0 command based on
// https://airflow.readthedocs.io/en/1.10.12/cli-ref.html#connections
baseAddCmd = baseCmd + "-a "
baseRmCmd = baseCmd + "-d "
baseListCmd = baseCmd + "-l "
connIdArg = "--conn_id"
connTypeArg = "--conn_type"
connUriArg = "--conn_uri"
connExtraArg = "--conn_extra"
connHostArg = "--conn_host"
connLoginArg = "--conn_login"
connPasswordArg = "--conn_password"
connSchemaArg = "--conn_schema"
connPortArg = "--conn_port"
}

airflowCommand := fmt.Sprintf("%s", baseListCmd)
out := docker.AirflowCommand(id, airflowCommand)

for _, conn := range connections {
Expand All @@ -109,39 +145,38 @@ func AddConnections(id string) {

if strings.Contains(out, quotedConnID) {
fmt.Printf("Found Connection: \"%s\"...replacing...\n", conn.ConnID)
airflowCommand = fmt.Sprintf("airflow connections -d --conn_id \"%s\"", conn.ConnID)
airflowCommand = fmt.Sprintf("%s %s \"%s\"", baseRmCmd, connIdArg, conn.ConnID)
docker.AirflowCommand(id, airflowCommand)
}

if !objectValidator(1, conn.ConnType, conn.ConnURI) {
fmt.Printf("Skipping %s: conn_type or conn_uri must be specified.\n", conn.ConnID)
} else {
airflowCommand = fmt.Sprintf("airflow connections -a --conn_id \"%s\" ", conn.ConnID)
airflowCommand = fmt.Sprintf("%s %s \"%s\" ", baseAddCmd, connIdArg, conn.ConnID)
if objectValidator(0, conn.ConnType) {
airflowCommand += fmt.Sprintf("--conn_type \"%s\" ", conn.ConnType)
airflowCommand += fmt.Sprintf("%s \"%s\" ", connTypeArg, conn.ConnType)
}
if objectValidator(0, conn.ConnURI) {
airflowCommand += fmt.Sprintf("--conn_uri '%s' ", conn.ConnURI)
airflowCommand += fmt.Sprintf("%s '%s' ", connUriArg, conn.ConnURI)
}
if objectValidator(0, conn.ConnExtra) {
airflowCommand += fmt.Sprintf("--conn_extra '%s' ", conn.ConnExtra)
airflowCommand += fmt.Sprintf("%s '%s' ", connExtraArg, conn.ConnExtra)
}
if objectValidator(0, conn.ConnHost) {
airflowCommand += fmt.Sprintf("--conn_host '%s' ", conn.ConnHost)
airflowCommand += fmt.Sprintf("%s '%s' ", connHostArg, conn.ConnHost)
}
if objectValidator(0, conn.ConnLogin) {
airflowCommand += fmt.Sprintf("--conn_login '%s' ", conn.ConnLogin)
airflowCommand += fmt.Sprintf("%s '%s' ", connLoginArg, conn.ConnLogin)
}
if objectValidator(0, conn.ConnPassword) {
airflowCommand += fmt.Sprintf("--conn_password '%s' ", conn.ConnPassword)
airflowCommand += fmt.Sprintf("%s '%s' ", connPasswordArg, conn.ConnPassword)
}
if objectValidator(0, conn.ConnSchema) {
airflowCommand += fmt.Sprintf("--conn_schema '%s' ", conn.ConnSchema)
airflowCommand += fmt.Sprintf("%s '%s' ", connSchemaArg, conn.ConnSchema)
}
if conn.ConnPort != 0 {
airflowCommand += fmt.Sprintf("--conn_port %v", conn.ConnPort)
airflowCommand += fmt.Sprintf("%s %v", connPortArg, conn.ConnPort)
}

docker.AirflowCommand(id, airflowCommand)
fmt.Printf("Added Connection: %s\n", conn.ConnID)
}
Expand All @@ -150,11 +185,23 @@ func AddConnections(id string) {
}

// AddPools is a function to add Pools from settings.yaml
func AddPools(id string) {
func AddPools(id string, airflowVersion uint64) {
pools := settings.Airflow.Pools
baseCmd := "airflow "
if airflowVersion >= AirflowVersionTwo {
// Airflow 2.0.0 command

// based on https://airflow.apache.org/docs/apache-airflow/2.0.0/cli-and-env-variables-ref.html
baseCmd += "pools set "
} else {
// Airflow 1.0.0 command
// based on https://airflow.apache.org/docs/apache-airflow/1.10.12/usage-cli.html
baseCmd += "pool -s "
}

for _, pool := range pools {
if objectValidator(0, pool.PoolName) {
airflowCommand := fmt.Sprintf("airflow pool -s %s ", pool.PoolName)
airflowCommand := fmt.Sprintf("%s %s ", baseCmd, pool.PoolName)
if pool.PoolSlot != 0 {
airflowCommand += fmt.Sprintf("%v ", pool.PoolSlot)
if objectValidator(0, pool.PoolDescription) {
Expand Down

0 comments on commit a2efe47

Please sign in to comment.