Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Control astro dev Output with --verbosity Flag #1770

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions airflow/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,8 @@ func generateConfig(projectName, airflowHome, envFile, buildImage, settingsFile

if envFile != "" {
if !envExists {
fmt.Printf(envNotFoundMsg, envFile)
envFile = ""
} else {
fmt.Printf(envFoundMsg, envFile)
envFile = fmt.Sprintf("env_file: %s", envFile)
}
}
Expand Down
114 changes: 74 additions & 40 deletions airflow/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bufio"
"context"
"fmt"
"io"
"io/fs"
"os"
"path/filepath"
Expand All @@ -23,6 +24,7 @@ import (
"github.com/astronomer/astro-cli/pkg/ansi"
"github.com/astronomer/astro-cli/pkg/fileutil"
"github.com/astronomer/astro-cli/pkg/logger"
"github.com/astronomer/astro-cli/pkg/spinner"
"github.com/astronomer/astro-cli/pkg/util"
"github.com/astronomer/astro-cli/settings"
"github.com/compose-spec/compose-go/v2/interpolation"
Expand Down Expand Up @@ -68,9 +70,7 @@ const (
composeUserPasswordMsg = "The default Airflow UI credentials are: %s"
postgresUserPasswordMsg = "The default Postgres DB credentials are: %s"

envPathMsg = "Error looking for \"%s\""
envFoundMsg = "Env file \"%s\" found. Loading...\n"
envNotFoundMsg = "Env file \"%s\" not found. Skipping...\n"
envPathMsg = "Error looking for \"%s\""
)

var (
Expand Down Expand Up @@ -166,7 +166,20 @@ func DockerComposeInit(airflowHome, envFile, dockerfile, imageName string) (*Doc
imageHandler := DockerImageInit(ImageName(imageName, "latest"))
composeFile := Composeyml

dockerCli, err := command.NewDockerCli()
// Determine if we should output to terminal or buffer.
output := logger.GetLevel() >= logrus.DebugLevel

// Route output streams according to verbosity.
var stdout, stderr io.Writer
if output {
stdout = os.Stdout
stderr = os.Stderr
} else {
stdout = io.Discard
stderr = io.Discard
}

dockerCli, err := command.NewDockerCli(command.WithOutputStream(stdout), command.WithErrorStream(stderr))
if err != nil {
logger.Fatalf("error creating compose client %s", err)
}
Expand Down Expand Up @@ -194,35 +207,19 @@ func DockerComposeInit(airflowHome, envFile, dockerfile, imageName string) (*Doc
//
//nolint:gocognit
func (d *DockerCompose) Start(imageName, settingsFile, composeFile, buildSecretString string, noCache, noBrowser bool, waitTime time.Duration, envConns map[string]astrocore.EnvironmentObjectConnection) error {
// Get project containers
psInfo, err := d.composeService.Ps(context.Background(), d.projectName, api.PsOptions{
All: true,
})
if err != nil {
return errors.Wrap(err, composeCreateErrMsg)
}
if len(psInfo) > 0 {
// Ensure project is not already running
for i := range psInfo {
if checkServiceState(psInfo[i].State, dockerStateUp) {
return errors.New("cannot start, project already running")
}
}
}

Comment on lines -197 to -212
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't seem necessary to throw an error if the project is already started. Let's just make sure the image is updated and started, then re-print the status info.

// Build this project image
if imageName == "" {
if !config.CFG.DisableAstroRun.GetBool() {
// add astro-run-dag package
err = fileutil.AddLineToFile("./requirements.txt", "astro-run-dag", "# This package is needed for the astro run command. It will be removed before a deploy")
err := fileutil.AddLineToFile("./requirements.txt", "astro-run-dag", "# This package is needed for the astro run command. It will be removed before a deploy")
if err != nil {
fmt.Printf("Adding 'astro-run-dag' package to requirements.txt unsuccessful: %s\nManually add package to requirements.txt", err.Error())
}
}
imageBuildErr := d.imageHandler.Build(d.dockerfile, buildSecretString, airflowTypes.ImageBuildConfig{Path: d.airflowHome, Output: true, NoCache: noCache})
imageBuildErr := d.imageHandler.Build(d.dockerfile, buildSecretString, airflowTypes.ImageBuildConfig{Path: d.airflowHome, NoCache: noCache})
if !config.CFG.DisableAstroRun.GetBool() {
// remove astro-run-dag from requirments.txt
err = fileutil.RemoveLineFromFile("./requirements.txt", "astro-run-dag", " # This package is needed for the astro run command. It will be removed before a deploy")
err := fileutil.RemoveLineFromFile("./requirements.txt", "astro-run-dag", " # This package is needed for the astro run command. It will be removed before a deploy")
if err != nil {
fmt.Printf("Removing line 'astro-run-dag' package from requirements.txt unsuccessful: %s\n", err.Error())
}
Expand All @@ -243,6 +240,15 @@ func (d *DockerCompose) Start(imageName, settingsFile, composeFile, buildSecretS
return err
}

// Determine if we should output to terminal or buffer.
output := logger.GetLevel() >= logrus.DebugLevel

s := spinner.NewSpinner("Project is starting up…")
if !output {
s.Start()
defer s.Stop()
}

// Create a compose project
project, err := createDockerProject(d.projectName, d.airflowHome, d.envFile, "", settingsFile, composeFile, imageLabels)
if err != nil {
Expand All @@ -251,7 +257,9 @@ func (d *DockerCompose) Start(imageName, settingsFile, composeFile, buildSecretS

// Start up our project
err = d.composeService.Up(context.Background(), project, api.UpOptions{
Create: api.CreateOptions{},
Create: api.CreateOptions{
QuietPull: output,
},
Start: api.StartOptions{
Project: project,
},
Expand All @@ -260,13 +268,15 @@ func (d *DockerCompose) Start(imageName, settingsFile, composeFile, buildSecretS
return errors.Wrap(err, composeRecreateErrMsg)
}

fmt.Println("\n\nAirflow is starting up!")

airflowDockerVersion, err := d.checkAiflowVersion()
if err != nil {
return err
}

// If we're logging the output, only start the spinner for waiting on the webserver healthcheck.
s.Start()
defer s.Stop()

// Airflow webserver should be hosted at localhost
// from the perspective of the CLI running on the host machine.
webserverPort := config.CFG.WebserverPort.GetString()
Expand All @@ -279,6 +289,8 @@ func (d *DockerCompose) Start(imageName, settingsFile, composeFile, buildSecretS
return err
}

spinner.StopWithCheckmark(s, "Project has been started")

// If we've successfully gotten a healthcheck response, print the status.
err = printStatus(settingsFile, envConns, project, d.composeService, airflowDockerVersion, noBrowser)
if err != nil {
Expand Down Expand Up @@ -321,6 +333,15 @@ func (d *DockerCompose) ComposeExport(settingsFile, composeFile string) error {

// Stop a running docker project
func (d *DockerCompose) Stop(waitForExit bool) error {
// Determine if we should output to terminal or buffer.
output := logger.GetLevel() >= logrus.DebugLevel

s := spinner.NewSpinner("Stopping project…")
if !output {
s.Start()
defer s.Stop()
}

imageLabels, err := d.imageHandler.ListLabels()
if err != nil {
return err
Expand All @@ -339,6 +360,7 @@ func (d *DockerCompose) Stop(waitForExit bool) error {
}

if !waitForExit {
spinner.StopWithCheckmark(s, "Project has been stopped")
return nil
}

Expand All @@ -362,6 +384,7 @@ func (d *DockerCompose) Stop(waitForExit bool) error {
if strings.Contains(psInfo[i].Name, PostgresDockerContainerName) {
if psInfo[i].State == dockerExitState {
logger.Debug("postgres container reached exited state")
spinner.StopWithCheckmark(s, "Project has been stopped")
return nil
}
logger.Debugf("postgres container is still in %s state, waiting for it to be in exited state", psInfo[i].State)
Expand Down Expand Up @@ -403,18 +426,29 @@ func (d *DockerCompose) PS() error {

// Kill stops a local airflow development cluster
func (d *DockerCompose) Kill() error {
// Determine if we should output to terminal or buffer.
output := logger.GetLevel() >= logrus.DebugLevel

s := spinner.NewSpinner("Killing project…")
if !output {
s.Start()
defer s.Stop()
}

// Killing an already killed project produces an unsightly warning,
// so we briefly switch to a higher level before running the kill command.
// We then swap back to the original level.
originalLevel := logrus.GetLevel()
logrus.SetLevel(logrus.ErrorLevel)
defer logrus.SetLevel(originalLevel)

// Shut down our project
err := d.composeService.Down(context.Background(), d.projectName, api.DownOptions{Volumes: true, RemoveOrphans: true})
if err != nil {
return errors.Wrap(err, composeStopErrMsg)
}
logrus.SetLevel(originalLevel)

spinner.StopWithCheckmark(s, "Project has been killed")

return nil
}
Expand Down Expand Up @@ -488,7 +522,7 @@ func (d *DockerCompose) Pytest(pytestFile, customImageName, deployImageName, pyt
if deployImageName == "" {
// build image
if customImageName == "" {
err := d.imageHandler.Build(d.dockerfile, buildSecretString, airflowTypes.ImageBuildConfig{Path: d.airflowHome, Output: true})
err := d.imageHandler.Build(d.dockerfile, buildSecretString, airflowTypes.ImageBuildConfig{Path: d.airflowHome})
if err != nil {
return "", err
}
Expand All @@ -514,7 +548,7 @@ func (d *DockerCompose) Pytest(pytestFile, customImageName, deployImageName, pyt
}

// run pytests
exitCode, err := d.imageHandler.Pytest(pytestFile, d.airflowHome, d.envFile, "", pytestArgs, false, airflowTypes.ImageBuildConfig{Path: d.airflowHome, Output: true})
exitCode, err := d.imageHandler.Pytest(pytestFile, d.airflowHome, d.envFile, "", pytestArgs, false, airflowTypes.ImageBuildConfig{Path: d.airflowHome})
if err != nil {
return exitCode, err
}
Expand Down Expand Up @@ -548,7 +582,7 @@ func (d *DockerCompose) UpgradeTest(newAirflowVersion, deploymentID, newImageNam
} else {
// build image for current Airflow version to get current Airflow version
fmt.Println("\nBuilding image for current Airflow version")
imageBuildErr := d.imageHandler.Build(d.dockerfile, buildSecretString, airflowTypes.ImageBuildConfig{Path: d.airflowHome, Output: true})
imageBuildErr := d.imageHandler.Build(d.dockerfile, buildSecretString, airflowTypes.ImageBuildConfig{Path: d.airflowHome})
if imageBuildErr != nil {
return imageBuildErr
}
Expand Down Expand Up @@ -643,7 +677,7 @@ func (d *DockerCompose) conflictTest(testHomeDirectory, newImageName, newAirflow
return err
}

exitCode, conflictErr := d.imageHandler.ConflictTest(d.airflowHome, testHomeDirectory, airflowTypes.ImageBuildConfig{Path: d.airflowHome, Output: true})
exitCode, conflictErr := d.imageHandler.ConflictTest(d.airflowHome, testHomeDirectory, airflowTypes.ImageBuildConfig{Path: d.airflowHome})
if conflictErr != nil {
return conflictErr
}
Expand Down Expand Up @@ -673,7 +707,7 @@ func (d *DockerCompose) versionTest(testHomeDirectory, currentAirflowVersion, de
return err
}
fmt.Println("\nBuilding image for new Airflow version")
imageBuildErr := d.imageHandler.Build(newDockerFile, buildSecretString, airflowTypes.ImageBuildConfig{Path: d.airflowHome, Output: true})
imageBuildErr := d.imageHandler.Build(newDockerFile, buildSecretString, airflowTypes.ImageBuildConfig{Path: d.airflowHome})
if imageBuildErr != nil {
return imageBuildErr
}
Expand Down Expand Up @@ -711,7 +745,7 @@ func (d *DockerCompose) dagTest(testHomeDirectory, newAirflowVersion, newDockerF
fmt.Printf("Adding 'pytest-html' package to requirements.txt unsuccessful: %s\nManually add package to requirements.txt", err.Error())
}
fmt.Println("\nBuilding image for new Airflow version")
imageBuildErr := d.imageHandler.Build(newDockerFile, buildSecretString, airflowTypes.ImageBuildConfig{Path: d.airflowHome, Output: true})
imageBuildErr := d.imageHandler.Build(newDockerFile, buildSecretString, airflowTypes.ImageBuildConfig{Path: d.airflowHome})

// remove pytest-html to the requirements
err = fileutil.RemoveLineFromFile(reqFile, "pytest-html", " # This package is needed for the upgrade dag test. It will be removed once the test is over")
Expand All @@ -738,7 +772,7 @@ func (d *DockerCompose) dagTest(testHomeDirectory, newAirflowVersion, newDockerF
htmlReportArgs := "--html=dag-test-report.html --self-contained-html"
// compare pip freeze files
fmt.Println("\nRunning DAG parse test with the new Airflow version")
exitCode, err := d.imageHandler.Pytest(pytestFile, d.airflowHome, d.envFile, testHomeDirectory, strings.Fields(htmlReportArgs), true, airflowTypes.ImageBuildConfig{Path: d.airflowHome, Output: true})
exitCode, err := d.imageHandler.Pytest(pytestFile, d.airflowHome, d.envFile, testHomeDirectory, strings.Fields(htmlReportArgs), true, airflowTypes.ImageBuildConfig{Path: d.airflowHome})
if err != nil {
if strings.Contains(exitCode, "1") { // exit code is 1 meaning tests failed
fmt.Println("See above for errors detected in your DAGs")
Expand Down Expand Up @@ -1256,7 +1290,7 @@ func (d *DockerCompose) RunDAG(dagID, settingsFile, dagFile, executionDate strin
fmt.Printf("Removing line 'astro-run-dag' package from requirements.txt unsuccessful: %s\n", err.Error())
}
}()
err = d.imageHandler.Build(d.dockerfile, "", airflowTypes.ImageBuildConfig{Path: d.airflowHome, Output: true, NoCache: noCache})
err = d.imageHandler.Build(d.dockerfile, "", airflowTypes.ImageBuildConfig{Path: d.airflowHome, NoCache: noCache})
if err != nil {
return err
}
Expand Down Expand Up @@ -1378,13 +1412,13 @@ func printStatus(settingsFile string, envConns map[string]astrocore.EnvironmentO
}
}
}
fmt.Println("\nProject is running! All components are now available.")
parts := strings.Split(config.CFG.WebserverPort.GetString(), ":")
webserverURL := "http://localhost:" + parts[len(parts)-1]
fmt.Printf("\n"+composeLinkWebserverMsg+"\n", ansi.Bold(webserverURL))
fmt.Printf(composeLinkPostgresMsg+"\n", ansi.Bold("localhost:"+config.CFG.PostgresPort.GetString()+"/postgres"))
fmt.Printf(composeUserPasswordMsg+"\n", ansi.Bold("admin:admin"))
fmt.Printf(postgresUserPasswordMsg+"\n", ansi.Bold("postgres:postgres"))
bullet := ansi.Cyan("\u27A4") + " "
fmt.Printf(bullet+composeLinkWebserverMsg+"\n", ansi.Bold(webserverURL))
fmt.Printf(bullet+composeLinkPostgresMsg+"\n", ansi.Bold("postgresql://localhost:"+config.CFG.PostgresPort.GetString()+"/postgres"))
fmt.Printf(bullet+composeUserPasswordMsg+"\n", ansi.Bold("admin:admin"))
fmt.Printf(bullet+postgresUserPasswordMsg+"\n", ansi.Bold("postgres:postgres"))
if !(noBrowser || util.CheckEnvBool(os.Getenv("ASTRONOMER_NO_BROWSER"))) {
err = openURL(webserverURL)
if err != nil {
Expand Down
40 changes: 32 additions & 8 deletions airflow/docker_image.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,17 @@ import (
"github.com/astronomer/astro-cli/airflow/runtimes"
airflowTypes "github.com/astronomer/astro-cli/airflow/types"
"github.com/astronomer/astro-cli/config"
"github.com/astronomer/astro-cli/pkg/ansi"
"github.com/astronomer/astro-cli/pkg/logger"
"github.com/astronomer/astro-cli/pkg/spinner"
"github.com/astronomer/astro-cli/pkg/util"
cliConfig "github.com/docker/cli/cli/config"
cliTypes "github.com/docker/cli/cli/config/types"
"github.com/docker/cli/cli/streams"
"github.com/docker/docker/api/types/image"
"github.com/docker/docker/client"
"github.com/docker/docker/pkg/jsonmessage"
"github.com/sirupsen/logrus"
)

const (
Expand Down Expand Up @@ -71,6 +74,17 @@ func shouldAddPullFlag(dockerfilePath string) (bool, error) {
}

func (d *DockerImage) Build(dockerfilePath, buildSecretString string, buildConfig airflowTypes.ImageBuildConfig) error {
// Determine if we should output to terminal or buffer.
output := logger.GetLevel() >= logrus.DebugLevel

// Start the spinner.
s := spinner.NewSpinner("Building project image…")
s.FinalMSG = ansi.Green("\u2714") + " Project image has been updated\n"
if !output {
s.Start()
defer s.Stop()
}

containerRuntime, err := runtimes.GetContainerRuntimeBinary()
if err != nil {
return err
Expand Down Expand Up @@ -118,24 +132,34 @@ func (d *DockerImage) Build(dockerfilePath, buildSecretString string, buildConfi
}
args = append(args, buildSecretArgs...)
}
// Build image

// Route output streams according to verbosity.
var stdout, stderr io.Writer
if buildConfig.Output {
var outBuff bytes.Buffer
if output {
stdout = os.Stdout
stderr = os.Stderr
} else {
stdout = nil
stderr = nil
stdout = &outBuff
stderr = &outBuff
}
fmt.Println(args)

// Build the image
err = cmdExec(containerRuntime, stdout, stderr, args...)
if err != nil {
return fmt.Errorf("command '%s build -t %s failed: %w", containerRuntime, d.imageName, err)
s.FinalMSG = ""
s.Stop()
fmt.Println(strings.TrimSpace(outBuff.String()) + "\n")
return errors.New("an error was encountered while building the image, see the build logs for details")
}
return err

return nil
}

func (d *DockerImage) Pytest(pytestFile, airflowHome, envFile, testHomeDirectory string, pytestArgs []string, htmlReport bool, buildConfig airflowTypes.ImageBuildConfig) (string, error) {
// Determine if we should output to terminal or buffer.
output := logger.GetLevel() >= logrus.DebugLevel

// delete container
containerRuntime, err := runtimes.GetContainerRuntimeBinary()
if err != nil {
Expand Down Expand Up @@ -167,7 +191,7 @@ func (d *DockerImage) Pytest(pytestFile, airflowHome, envFile, testHomeDirectory
args = append(args, pytestArgs...)
// run pytest image
var stdout, stderr io.Writer
if buildConfig.Output {
if output {
stdout = os.Stdout
stderr = os.Stderr
} else {
Expand Down
Loading