Skip to content

Commit

Permalink
Merge pull request #25 from Clever/better-default-env
Browse files Browse the repository at this point in the history
require `_EXECUTION_NAME` in payload
  • Loading branch information
natebrennand authored Jul 10, 2018
2 parents 2a9fab9 + cc47519 commit 0f2d666
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 24 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ vendor/
bin/
release/
gen-go/
cmd/sfncli/sfncli
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ sfncli -activityname sleep-100 -region us-west-2 --cloudwatchregion us-west-1 -w
- Get a task. Take the JSON input for the task and
- if it's a JSON object, use this as the last arg to the `cmd` passed to `sfncli`.
- if it's anything else (e.g. JSON array), an error is thrown.
- if the task input an `_EXECUTION_NAME` key, it is added to the environment of the `cmd` as `_EXECUTION_NAME`.
- if `_EXECUTION_NAME` is missing from the payload, an error is thrown
- the `_EXECUTION_NAME` payload attribute value is added to the environment of the `cmd` as `_EXECUTION_NAME`.
- if workdirectory is set, create a sub-directory and add it to the environment of the `cmd` as `WORK_DIR`.
- Start [`SendTaskHeartbeat`](http://docs.aws.amazon.com/step-functions/latest/apireference/API_SendTaskHeartbeat.html) loop.
- When the command exits:
Expand All @@ -51,6 +52,7 @@ sfncli -activityname sleep-100 -region us-west-2 --cloudwatchregion us-west-1 -w
`sfncli` will report the following error names if it encounters errors it can identify:

- `sfncli.TaskInputNotJSON`: input to the task was not JSON
- `sfncli.TaskFailureTaskInputMissingExecutionName`: input is missing `_EXECUTION_NAME` attribute
- `sfncli.CommandNotFound`: the command passed to `sfncli` was not found
- `sfncli.CommandKilled`: the command process received SIGKILL
- `sfncli.CommandExitedNonzero`: the command process exited with a nonzero exit code
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.2.16
0.3.0
13 changes: 13 additions & 0 deletions cmd/sfncli/error_names.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,19 @@ func (t TaskFailureTaskInputNotJSON) ErrorCause() string {
}
func (t TaskFailureTaskInputNotJSON) Error() string { return t.ErrorCause() }

// TaskFailureTaskInputMissingExecutionName is used when the input to the task is not a JSON object.
type TaskFailureTaskInputMissingExecutionName struct {
input string
}

func (t TaskFailureTaskInputMissingExecutionName) ErrorName() string {
return "sfncli.TaskInputMissingExecutionName"
}
func (t TaskFailureTaskInputMissingExecutionName) ErrorCause() string {
return fmt.Sprintf("task input missing _EXECUTION_NAME attribute: '%s'", t.input)
}
func (t TaskFailureTaskInputMissingExecutionName) Error() string { return t.ErrorCause() }

// TaskFailureCommandNotFound is used when the command passed to sfncli is not found.
type TaskFailureCommandNotFound struct {
path string
Expand Down
27 changes: 14 additions & 13 deletions cmd/sfncli/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ import (
)

// stay within documented limits of SFN APIs
const maxTaskOutputLength = 32768
const maxTaskFailureCauseLength = 32768
const (
maxTaskOutputLength = 32768
maxTaskFailureCauseLength = 32768
)

// TaskRunner manages resources for executing a task
type TaskRunner struct {
Expand Down Expand Up @@ -65,12 +67,12 @@ func (t *TaskRunner) Process(ctx context.Context, args []string, input string) e
return t.sendTaskFailure(TaskFailureTaskInputNotJSON{input: input})
}

// convention: if the input contains _EXECUTION_NAME, pass it to the environment of the command
var executionName *string
if e, ok := taskInput["_EXECUTION_NAME"].(string); ok {
executionName = &e
t.logger.AddContext("execution_name", *executionName)
// _EXECUTION_NAME is a required payload parameter that we inject into the environment
executionName, ok := taskInput["_EXECUTION_NAME"].(string)
if !ok {
return t.sendTaskFailure(TaskFailureTaskInputMissingExecutionName{input: input})
}
t.logger.AddContext("execution_name", executionName)

marshaledInput, err := json.Marshal(taskInput)
if err != nil {
Expand All @@ -80,9 +82,8 @@ func (t *TaskRunner) Process(ctx context.Context, args []string, input string) e
args = append(args, string(marshaledInput))

t.execCmd = exec.CommandContext(ctx, t.cmd, args...)
if executionName != nil {
t.execCmd.Env = append(os.Environ(), "_EXECUTION_NAME="+*executionName)
}
t.execCmd.Env = append(os.Environ(), "_EXECUTION_NAME="+executionName)

tmpDir := ""
if t.workDirectory != "" {
// make a new tmpDir for every run
Expand Down Expand Up @@ -146,9 +147,9 @@ func (t *TaskRunner) Process(ctx context.Context, args []string, input string) e
} else if err := json.Unmarshal([]byte(taskOutput), &taskOutputMap); err != nil {
return t.sendTaskFailure(TaskFailureTaskOutputNotJSON{output: taskOutput})
}
if executionName != nil {
taskOutputMap["_EXECUTION_NAME"] = *executionName
}
// Add _EXECUTION_NAME back into the payload in case the executing worker omits the value
// in the output.
taskOutputMap["_EXECUTION_NAME"] = executionName

finalTaskOutput, err := json.Marshal(taskOutputMap)
if err != nil {
Expand Down
18 changes: 9 additions & 9 deletions cmd/sfncli/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (

const (
mockTaskToken = "taskToken"
emptyTaskInput = "{}"
emptyTaskInput = `{"_EXECUTION_NAME":"fake-WFM-uuid"}`
testScriptsDir = "./test_scripts"
)

Expand Down Expand Up @@ -84,14 +84,14 @@ func TestTaskOutputEmptyStringAsJSON(t *testing.T) {
defer testCtxCancel()
cmd := "stdout_empty_output.sh"
cmdArgs := []string{}
taskInput := "{}"
taskInput := `{"_EXECUTION_NAME":"fake-WFM-uuid"}`

controller := gomock.NewController(t)
defer controller.Finish()
mockSFN := mocksfn.NewMockSFNAPI(controller)
mockSFN.EXPECT().SendTaskSuccessWithContext(gomock.Any(), &sfn.SendTaskSuccessInput{
TaskToken: aws.String(mockTaskToken),
Output: aws.String("{}"),
Output: aws.String(`{"_EXECUTION_NAME":"fake-WFM-uuid"}`),
})
taskRunner := NewTaskRunner(path.Join(testScriptsDir, cmd), mockSFN, mockTaskToken, "")
err := taskRunner.Process(testCtx, cmdArgs, taskInput)
Expand Down Expand Up @@ -296,7 +296,7 @@ func TestTaskSuccessSignalForwarded(t *testing.T) {
controller := gomock.NewController(t)
mockSFN := mocksfn.NewMockSFNAPI(controller)
mockSFN.EXPECT().SendTaskSuccessWithContext(gomock.Any(), &sfn.SendTaskSuccessInput{
Output: aws.String(`{"signal":"1"}`),
Output: aws.String(`{"_EXECUTION_NAME":"fake-WFM-uuid","signal":"1"}`),
TaskToken: aws.String(mockTaskToken),
})
defer controller.Finish()
Expand All @@ -318,7 +318,7 @@ func TestTaskSuccessOutputIsLastLineOfStdout(t *testing.T) {
controller := gomock.NewController(t)
mockSFN := mocksfn.NewMockSFNAPI(controller)
mockSFN.EXPECT().SendTaskSuccessWithContext(gomock.Any(), &sfn.SendTaskSuccessInput{
Output: aws.String(`{"task":"output"}`),
Output: aws.String(`{"_EXECUTION_NAME":"fake-WFM-uuid","task":"output"}`),
TaskToken: aws.String(mockTaskToken),
})
defer controller.Finish()
Expand All @@ -332,7 +332,7 @@ func TestTaskWorkDirectorySetup(t *testing.T) {
defer testCtxCancel()
cmd := "echo_workdir.sh"
cmdArgs := []string{}
taskInput := "{}"
taskInput := `{"_EXECUTION_NAME":"fake-WFM-uuid"}`

controller := gomock.NewController(t)
defer controller.Finish()
Expand All @@ -352,14 +352,14 @@ func TestTaskWorkDirectoryUnsetByDefault(t *testing.T) {
defer testCtxCancel()
cmd := "echo_workdir.sh"
cmdArgs := []string{}
taskInput := "{}" // output a env var using the key
taskInput := `{"_EXECUTION_NAME":"fake-WFM-uuid"}` // output a env var using the key

controller := gomock.NewController(t)
defer controller.Finish()
mockSFN := mocksfn.NewMockSFNAPI(controller)
mockSFN.EXPECT().SendTaskSuccessWithContext(gomock.Any(), &sfn.SendTaskSuccessInput{
TaskToken: aws.String(mockTaskToken),
Output: aws.String("{\"work_dir\":\"\"}"), // returns the result of WORK_DIR
Output: aws.String(`{"_EXECUTION_NAME":"fake-WFM-uuid","work_dir":""}`), // returns the result of WORK_DIR
})
taskRunner := NewTaskRunner(path.Join(testScriptsDir, cmd), mockSFN, mockTaskToken, "")
err := taskRunner.Process(testCtx, cmdArgs, taskInput)
Expand All @@ -372,7 +372,7 @@ func TestTaskWorkDirectoryCleaned(t *testing.T) {
defer testCtxCancel()
cmd := "create_file.sh"
cmdArgs := []string{}
taskInput := "{}"
taskInput := `{"_EXECUTION_NAME":"fake-WFM-uuid"}`

controller := gomock.NewController(t)
defer controller.Finish()
Expand Down

0 comments on commit 0f2d666

Please sign in to comment.