Skip to content

Commit

Permalink
add verbose and execute date to astro run (#1330)
Browse files Browse the repository at this point in the history
* add verbose and execute date to astro run

* update mocks after merging the latest main

* update astro client mocks after merging the latest main

* add date formats

---------

Co-authored-by: neel-astro <[email protected]>
  • Loading branch information
sunkickr and neel-astro authored Aug 2, 2023
1 parent abba64e commit beca3f0
Show file tree
Hide file tree
Showing 9 changed files with 48 additions and 35 deletions.
4 changes: 2 additions & 2 deletions airflow/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type ContainerHandler interface {
Logs(follow bool, containerNames ...string) error
Run(args []string, user string) error
Bash(container string) error
RunDAG(dagID, settingsFile, dagFile string, noCache, taskLogs bool) error
RunDAG(dagID, settingsFile, dagFile, executionDate string, noCache, taskLogs bool) error
ImportSettings(settingsFile, envFile string, connections, variables, pools bool) error
ExportSettings(settingsFile, envFile string, connections, variables, pools, envExport bool) error
ComposeExport(settingsFile, composeFile string) error
Expand All @@ -47,7 +47,7 @@ type ImageHandler interface {
GetLabel(labelName string) (string, error)
ListLabels() (map[string]string, error)
TagLocalImage(localImage string) error
Run(dagID, envFile, settingsFile, containerName, dagFile string, taskLogs bool) error
Run(dagID, envFile, settingsFile, containerName, dagFile, executionDate string, taskLogs bool) error
Pytest(pytestFile, airflowHome, envFile string, pytestArgs []string, config types.ImageBuildConfig) (string, error)
}

Expand Down
6 changes: 3 additions & 3 deletions airflow/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -655,7 +655,7 @@ func (d *DockerCompose) getWebServerContainerID() (string, error) {
return "", err
}

func (d *DockerCompose) RunDAG(dagID, settingsFile, dagFile string, noCache, taskLogs bool) error {
func (d *DockerCompose) RunDAG(dagID, settingsFile, dagFile, executionDate string, noCache, taskLogs bool) error {
// Get project containers
psInfo, err := d.composeService.Ps(context.Background(), d.projectName, api.PsOptions{
All: true,
Expand All @@ -668,7 +668,7 @@ func (d *DockerCompose) RunDAG(dagID, settingsFile, dagFile string, noCache, tas
for i := range psInfo {
if checkServiceState(psInfo[i].State, dockerStateUp) {
if strings.Contains(psInfo[i].Name, SchedulerDockerContainerName) {
err = d.imageHandler.Run(dagID, d.envFile, settingsFile, psInfo[i].Name, dagFile, taskLogs)
err = d.imageHandler.Run(dagID, d.envFile, settingsFile, psInfo[i].Name, dagFile, executionDate, taskLogs)
if err != nil {
return err
}
Expand Down Expand Up @@ -706,7 +706,7 @@ func (d *DockerCompose) RunDAG(dagID, settingsFile, dagFile string, noCache, tas
return err
}

err = d.imageHandler.Run(dagID, d.envFile, settingsFile, "", dagFile, taskLogs)
err = d.imageHandler.Run(dagID, d.envFile, settingsFile, "", dagFile, executionDate, taskLogs)
if err != nil {
return err
}
Expand Down
12 changes: 11 additions & 1 deletion airflow/docker_image.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ func (d *DockerImage) TagLocalImage(localImage string) error {
return nil
}

func (d *DockerImage) Run(dagID, envFile, settingsFile, containerName, dagFile string, taskLogs bool) error {
func (d *DockerImage) Run(dagID, envFile, settingsFile, containerName, dagFile, executionDate string, taskLogs bool) error {
dockerCommand := config.CFG.DockerCommand.GetString()

stdout := os.Stdout
Expand Down Expand Up @@ -343,6 +343,16 @@ func (d *DockerImage) Run(dagID, envFile, settingsFile, containerName, dagFile s
}
args = append(args, cmdArgs...)

if executionDate != "" {
cmdArgs = append(cmdArgs, []string{"--execution-date", executionDate}...)
}

if taskLogs {
cmdArgs = append(cmdArgs, []string{"--verbose"}...)
}

args = append(args, cmdArgs...)

fmt.Println("\nStarting a DAG run for " + dagID + "...")
fmt.Println("\nLoading DAGs...")

Expand Down
6 changes: 3 additions & 3 deletions airflow/docker_image_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ func TestDockerImageRun(t *testing.T) {
return nil
}

err = handler.Run("", "./testfiles/airflow_settings.yaml", "", "", "", true)
err = handler.Run("", "./testfiles/airflow_settings.yaml", "", "", "", "", true)
assert.NoError(t, err)
})

Expand All @@ -381,7 +381,7 @@ func TestDockerImageRun(t *testing.T) {
return nil
}

err = handler.Run("", "./testfiles/airflow_settings_invalid.yaml", "", "test-container", "", true)
err = handler.Run("", "./testfiles/airflow_settings_invalid.yaml", "", "test-container", "", "", true)
assert.NoError(t, err)
})

Expand All @@ -390,7 +390,7 @@ func TestDockerImageRun(t *testing.T) {
return errExecMock
}

err = handler.Run("", "./testfiles/airflow_settings.yaml", "", "", "", true)
err = handler.Run("", "./testfiles/airflow_settings.yaml", "", "", "", "", true)
assert.Contains(t, err.Error(), errExecMock.Error())
})

Expand Down
20 changes: 10 additions & 10 deletions airflow/docker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1389,15 +1389,15 @@ func TestDockerComposeRunDAG(t *testing.T) {
t.Run("success with container", func(t *testing.T) {
noCache := false
imageHandler := new(mocks.ImageHandler)
imageHandler.On("Run", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
imageHandler.On("Run", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()

composeMock := new(mocks.DockerComposeAPI)
composeMock.On("Ps", mock.Anything, mockDockerCompose.projectName, api.PsOptions{All: true}).Return([]api.ContainerSummary{{ID: "test-scheduler-id", State: "running", Name: "test-scheduler"}}, nil).Once()

mockDockerCompose.composeService = composeMock
mockDockerCompose.imageHandler = imageHandler

err := mockDockerCompose.RunDAG("", "", "", noCache, false)
err := mockDockerCompose.RunDAG("", "", "", "", noCache, false)
assert.NoError(t, err)

imageHandler.AssertExpectations(t)
Expand All @@ -1407,15 +1407,15 @@ func TestDockerComposeRunDAG(t *testing.T) {
t.Run("error with container", func(t *testing.T) {
noCache := false
imageHandler := new(mocks.ImageHandler)
imageHandler.On("Run", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(errMockDocker).Once()
imageHandler.On("Run", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(errMockDocker).Once()

composeMock := new(mocks.DockerComposeAPI)
composeMock.On("Ps", mock.Anything, mockDockerCompose.projectName, api.PsOptions{All: true}).Return([]api.ContainerSummary{{ID: "test-scheduler-id", State: "running", Name: "test-scheduler"}}, nil).Once()

mockDockerCompose.composeService = composeMock
mockDockerCompose.imageHandler = imageHandler

err := mockDockerCompose.RunDAG("", "", "", noCache, false)
err := mockDockerCompose.RunDAG("", "", "", "", noCache, false)
assert.ErrorIs(t, err, errMockDocker)

imageHandler.AssertExpectations(t)
Expand All @@ -1426,15 +1426,15 @@ func TestDockerComposeRunDAG(t *testing.T) {
noCache := false
imageHandler := new(mocks.ImageHandler)
imageHandler.On("Build", airflowTypes.ImageBuildConfig{Path: mockDockerCompose.airflowHome, Output: true, NoCache: noCache}).Return(nil).Once()
imageHandler.On("Run", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
imageHandler.On("Run", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()

composeMock := new(mocks.DockerComposeAPI)
composeMock.On("Ps", mock.Anything, mockDockerCompose.projectName, api.PsOptions{All: true}).Return([]api.ContainerSummary{}, nil).Once()

mockDockerCompose.composeService = composeMock
mockDockerCompose.imageHandler = imageHandler

err := mockDockerCompose.RunDAG("", "", "", noCache, false)
err := mockDockerCompose.RunDAG("", "", "", "", noCache, false)
assert.NoError(t, err)

imageHandler.AssertExpectations(t)
Expand All @@ -1445,15 +1445,15 @@ func TestDockerComposeRunDAG(t *testing.T) {
noCache := false
imageHandler := new(mocks.ImageHandler)
imageHandler.On("Build", airflowTypes.ImageBuildConfig{Path: mockDockerCompose.airflowHome, Output: true, NoCache: noCache}).Return(nil).Once()
imageHandler.On("Run", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(errMockDocker).Once()
imageHandler.On("Run", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(errMockDocker).Once()

composeMock := new(mocks.DockerComposeAPI)
composeMock.On("Ps", mock.Anything, mockDockerCompose.projectName, api.PsOptions{All: true}).Return([]api.ContainerSummary{}, nil).Once()

mockDockerCompose.composeService = composeMock
mockDockerCompose.imageHandler = imageHandler

err := mockDockerCompose.RunDAG("", "", "", noCache, false)
err := mockDockerCompose.RunDAG("", "", "", "", noCache, false)
assert.ErrorIs(t, err, errMockDocker)

imageHandler.AssertExpectations(t)
Expand All @@ -1471,7 +1471,7 @@ func TestDockerComposeRunDAG(t *testing.T) {
mockDockerCompose.composeService = composeMock
mockDockerCompose.imageHandler = imageHandler

err := mockDockerCompose.RunDAG("", "", "", noCache, false)
err := mockDockerCompose.RunDAG("", "", "", "", noCache, false)
assert.ErrorIs(t, err, errMockDocker)

imageHandler.AssertExpectations(t)
Expand All @@ -1485,7 +1485,7 @@ func TestDockerComposeRunDAG(t *testing.T) {

mockDockerCompose.composeService = composeMock

err := mockDockerCompose.RunDAG("", "", "", noCache, false)
err := mockDockerCompose.RunDAG("", "", "", "", noCache, false)
assert.ErrorIs(t, err, errMockDocker)

composeMock.AssertExpectations(t)
Expand Down
10 changes: 5 additions & 5 deletions airflow/mocks/ContainerHandler.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions airflow/mocks/ImageHandler.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 7 additions & 4 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ import (
)

var (
dagID string
dagFile string
taskLogs bool
dagID string
dagFile string
executionDate string
taskLogs bool
)

func newRunCommand() *cobra.Command {
Expand All @@ -30,6 +31,8 @@ func newRunCommand() *cobra.Command {
cmd.Flags().BoolVarP(&noCache, "no-cache", "", false, "Do not use cache when building container image")
cmd.Flags().StringVarP(&settingsFile, "settings-file", "s", "airflow_settings.yaml", "Settings file for importing Airflow objects")
cmd.Flags().StringVarP(&dagFile, "dag-file", "d", "", "(Optional) The file where your DAG is located. Use this flag to parse only the DAG file that has the DAG you want to run. You may get parsing errors related to other DAGs if you don't specify a DAG file")
cmd.Flags().StringVarP(&executionDate, "execution-date", "", "", "(Optional) Execution date for the dagrun. Defaults to now. Acceptable date formats: %Y-%m-%d, %Y-%m-%dT%H:%M:%S, %Y-%m-%d %H:%M:%S")
cmd.Flags().BoolVarP(&taskLogs, "verbose", "", false, "(Optional) Print out the logs of the dag run")

return cmd
}
Expand All @@ -53,5 +56,5 @@ func run(cmd *cobra.Command, args []string) error {
return err
}

return containerHandler.RunDAG(dagID, settingsFile, dagFile, noCache, taskLogs)
return containerHandler.RunDAG(dagID, settingsFile, dagFile, executionDate, noCache, taskLogs)
}
4 changes: 2 additions & 2 deletions cmd/run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestRun(t *testing.T) {

mockContainerHandler := new(mocks.ContainerHandler)
containerHandlerInit = func(airflowHome, envFile, dockerfile, imageName string) (airflow.ContainerHandler, error) {
mockContainerHandler.On("RunDAG", "test-dag", "airflow_settings.yaml", "", false, false).Return(nil).Once()
mockContainerHandler.On("RunDAG", "test-dag", "airflow_settings.yaml", "", "", false, false).Return(nil).Once()
return mockContainerHandler, nil
}

Expand All @@ -39,7 +39,7 @@ func TestRun(t *testing.T) {

mockContainerHandler := new(mocks.ContainerHandler)
containerHandlerInit = func(airflowHome, envFile, dockerfile, imageName string) (airflow.ContainerHandler, error) {
mockContainerHandler.On("RunDAG", "test-dag", "airflow_settings.yaml", "", false, false).Return(errMock).Once()
mockContainerHandler.On("RunDAG", "test-dag", "airflow_settings.yaml", "", "", false, false).Return(errMock).Once()
return mockContainerHandler, nil
}

Expand Down

0 comments on commit beca3f0

Please sign in to comment.