Skip to content

Commit

Permalink
Merge pull request #1 from Clever/get-ready-for-prod
Browse files Browse the repository at this point in the history
some cleanup / small feature additions
  • Loading branch information
rgarcia authored Sep 26, 2017
2 parents 79c1c47 + 4e5829c commit 3f72d31
Show file tree
Hide file tree
Showing 7 changed files with 212 additions and 49 deletions.
23 changes: 21 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,26 @@ Utility to create AWS Step Function activities out of command line programs.

## Usage

TODO
```
$ sfncli -h
Usage of sfncli:
-activityname string
The activity name to register with AWS Step Functions. $VAR and ${VAR} env variables are expanded.
-cmd string
The command to run to process activity tasks.
-region string
The AWS region to send Step Function API calls. Defaults to AWS_REGION.
-version
Print the version and exit.
-workername string
The worker name to send to AWS Step Functions when processing a task. Environment variables are expanded. The magic string ECS_TASK_ARN will be expanded to the ECS task ARN via the metadata service.
```

Example:

```
sfncli -activityname sleep-100 -region us-west-2 -workername sleep-worker -cmd sleep 100
```

## High-level logic

Expand All @@ -21,7 +40,7 @@ TODO
Start up a test activity that runs `echo` on the work it receives.

```
go run cmd/sfncli/*.go -region us-west-2 -name test-activity -cmd echo
go run cmd/sfncli/*.go -region us-west-2 -activityname test-activity -cmd echo
```

Create a new state machine that uses this activity for one of its states (this requires you to [create a role for use with Step Functions](http://docs.aws.amazon.com/step-functions/latest/dg/procedure-create-iam-role.html)):
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.0.1
0.1.0
133 changes: 133 additions & 0 deletions cmd/sfncli/ecs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package main

import (
"bufio"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"os"
"path"
"strings"
)

const magicECSTaskARN = "MAGIC_ECS_TASK_ARN"

func getDockerID() (string, error) {
file, err := os.Open("/proc/self/cgroup")
if err != nil {
return "", err
}
defer file.Close()

reader := bufio.NewReader(file)
for {
// Example: "2:cpu:/docker/93c562c426414f53582c9830a30bdb54d85642956e18115dd59bc9f435ae5644"
line, err := reader.ReadString('\n')
if err != nil {
break
}

components := strings.Split(line, ":")
if len(components) == 3 {
return strings.TrimRight(path.Base(components[2]), "\n"), nil
}
}

return "", fmt.Errorf("Failed to find Docker ID in /proc/self/group")
}

type ECSAgentMetadata struct {
Cluster string `json:"Cluster"`
}

type ECSAgentTaskMetadata struct {
Tasks []struct {
ARN string `json:"Arn"`
Containers []struct {
DockerID string `json:"DockerId"`
} `json:"Containers"`
} `json:"Tasks"`
}

// https://github.com/aws/amazon-ecs-agent/issues/258
// https://github.com/aws/amazon-ecs-agent/pull/709
func ecsAgentTaskMetadata() (ECSAgentTaskMetadata, error) {
response, err := http.Get("http://172.17.0.1:51678/v1/tasks")
if err != nil {
return ECSAgentTaskMetadata{}, err
}
defer response.Body.Close()
body, err := ioutil.ReadAll(response.Body)
if err != nil {
return ECSAgentTaskMetadata{}, err
}

metadata := ECSAgentTaskMetadata{}
err = json.Unmarshal(body, &metadata)
if err != nil {
return ECSAgentTaskMetadata{}, err
}

return metadata, nil
}

func ecsAgentMetadata() (ECSAgentMetadata, error) {
response, err := http.Get("http://172.17.0.1:51678/v1/metadata")
if err != nil {
return ECSAgentMetadata{}, err
}
defer response.Body.Close()
body, err := ioutil.ReadAll(response.Body)
if err != nil {
return ECSAgentMetadata{}, err
}

metadata := ECSAgentMetadata{}
err = json.Unmarshal(body, &metadata)
if err != nil {
return ECSAgentMetadata{}, err
}

return metadata, nil
}

func expandECSTaskARN(s string) (string, error) {
if !strings.Contains(s, magicECSTaskARN) {
return s, nil
}

dockerID, err := getDockerID()
if err != nil {
return "", err
}

agentMetadata, err := ecsAgentMetadata()
if err != nil {
return "", err
}

if agentMetadata.Cluster == "" {
return "", fmt.Errorf("Could not find ECS cluster for docker container '%s'", dockerID)
}

agentTaskMetadata, err := ecsAgentTaskMetadata()
if err != nil {
return "", err
}

taskARN := ""
for _, task := range agentTaskMetadata.Tasks {
for _, container := range task.Containers {
if strings.HasPrefix(container.DockerID, dockerID) {
taskARN = task.ARN
break
}
}
}
if taskARN == "" {
return "", fmt.Errorf("Could not find ECS task for docker container '%s' on cluster '%s'", dockerID, agentMetadata.Cluster)
}

return strings.Replace(s, magicECSTaskARN, taskARN, 1), nil
}
54 changes: 26 additions & 28 deletions cmd/sfncli/runner.go
Original file line number Diff line number Diff line change
@@ -1,48 +1,43 @@
package main

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"os"
"os/exec"

"github.com/armon/circbuf"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/sfn"
"github.com/aws/aws-sdk-go/service/sfn/sfniface"
)

// stay within documented limits of SFN APIs
const maxTaskOutputLength = 32768
const maxTaskFailureCauseLength = 32768

type TaskRunner struct {
sfnapi sfniface.SFNAPI
taskToken string
cmd string
inputs []string
args []string
}

func NewTaskRunner(cmd string, args []string, sfnapi sfniface.SFNAPI, taskInput string, taskToken string) TaskRunner {
var params []string
if err := json.Unmarshal([]byte(taskInput), &params); err != nil {
sfnapi.SendTaskFailure(&sfn.SendTaskFailureInput{
Cause: aws.String(fmt.Sprintf("Task input must be array of strings: %s", err.Error())),
TaskToken: &taskToken,
})
return TaskRunner{}
// if the task input is an array of strings, interpret these as an args array
// otherwise pass the raw input as a single arg
var taskInputArgs []string
if err := json.Unmarshal([]byte(taskInput), &taskInputArgs); err != nil {
taskInputArgs = []string{taskInput}
}

// append the input on the cmd passed through the CLI
// example:
// sfncli -cmd echo how now
// input = ["brown", "cow"]
// exec(echo, ["how", "now", "brown", "cow"])
inputs := append(args, params...)

return TaskRunner{
sfnapi: sfnapi,
taskToken: taskToken,
cmd: cmd,
inputs: inputs,
args: append(args, taskInputArgs...),
}
}

Expand All @@ -53,23 +48,23 @@ func (t TaskRunner) Process(ctx context.Context) error {
return nil // if New failed :-/
}
log.InfoD("exec-command", map[string]interface{}{
"inputs": t.inputs,
"cmd": t.cmd,
"args": t.args,
"cmd": t.cmd,
})

cmd := exec.CommandContext(ctx, t.cmd, t.inputs...)
cmd := exec.CommandContext(ctx, t.cmd, t.args...)
cmd.Env = os.Environ()

// Write the stdout and stderr of the process to both this process' stdout and stderr
// and also write to a byte buffer so that we can save it in the ResultsStore
var stderrbuf bytes.Buffer
var stdoutbuf bytes.Buffer
cmd.Stderr = io.MultiWriter(os.Stderr, &stderrbuf)
cmd.Stdout = io.MultiWriter(os.Stdout, &stdoutbuf)
// and also write to a byte buffer so that we can send the result to step functions
stderrbuf, _ := circbuf.NewBuffer(maxTaskFailureCauseLength)
stdoutbuf, _ := circbuf.NewBuffer(maxTaskOutputLength)
cmd.Stderr = io.MultiWriter(os.Stderr, stderrbuf)
cmd.Stdout = io.MultiWriter(os.Stdout, stdoutbuf)

if err := cmd.Run(); err != nil {
if _, e := t.sfnapi.SendTaskFailureWithContext(ctx, &sfn.SendTaskFailureInput{
Cause: aws.String(stderrbuf.String()), // TODO: limits on length?
Cause: aws.String(stderrbuf.String()),
TaskToken: &t.taskToken,
}); e != nil {
return fmt.Errorf("error sending task failure: %s", e)
Expand All @@ -81,15 +76,18 @@ func (t TaskRunner) Process(ctx context.Context) error {
output := stdoutbuf.String()
var test interface{}
if err := json.Unmarshal([]byte(output), &test); err != nil {
// output isn't JSON, make it json
// output isn't JSON, make it json and stay under the length limit
if len(output)+100 > maxTaskOutputLength && len(output) > 100 {
output = output[100:] // stay under the limit
}
newOutputBs, _ := json.Marshal(map[string]interface{}{
"raw": output,
})
output = string(newOutputBs)
}

_, err := t.sfnapi.SendTaskSuccessWithContext(ctx, &sfn.SendTaskSuccessInput{
Output: aws.String(output), // TODO: limits on length?
Output: aws.String(output),
TaskToken: &t.taskToken,
})
return err
Expand Down
40 changes: 25 additions & 15 deletions cmd/sfncli/sfncli.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ var log = logger.New("sfncli")
var Version string

func main() {
name := flag.String("name", "", "The activity name to register with AWS Step Functions.")
activityName := flag.String("activityname", "", "The activity name to register with AWS Step Functions. $VAR and ${VAR} env variables are expanded.")
workerName := flag.String("workername", "", "The worker name to send to AWS Step Functions when processing a task. Environment variables are expanded. The magic string ECS_TASK_ARN will be expanded to the ECS task ARN via the metadata service.")
cmd := flag.String("cmd", "", "The command to run to process activity tasks.")
region := flag.String("region", "", "The AWS region to send Step Function API calls. Defaults to AWS_REGION.")
printVersion := flag.Bool("version", false, "Print the version and exit.")
Expand All @@ -33,10 +34,22 @@ func main() {
os.Exit(0)
}

if *name == "" {
fmt.Println("name is required")
if *activityName == "" {
fmt.Println("activityname is required")
os.Exit(1)
}
*activityName = os.ExpandEnv(*activityName)
if *workerName == "" {
fmt.Println("workername is required")
os.Exit(1)
}
*workerName = os.ExpandEnv(*workerName)
if newWorkerName, err := expandECSTaskARN(*workerName); err != nil {
fmt.Printf("error expanding %s: %s", magicECSTaskARN, err)
os.Exit(1)
} else {
*workerName = newWorkerName
}
if *cmd == "" {
fmt.Println("cmd is required")
os.Exit(1)
Expand All @@ -49,9 +62,6 @@ func main() {
}
}

// TODO: set workerName to something useful
workerName := "worker-name"

mainCtx, mainCtxCancel := context.WithCancel(context.Background())
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
Expand All @@ -65,28 +75,27 @@ func main() {
// register the activity with AWS (it might already exist, which is ok)
sfnapi := sfn.New(session.New(), aws.NewConfig().WithRegion(*region))
createOutput, err := sfnapi.CreateActivityWithContext(mainCtx, &sfn.CreateActivityInput{
Name: name,
Name: activityName,
})
if err != nil {
fmt.Printf("error creating activity: %s\n", err)
os.Exit(1)
}
log.InfoD("startup", logger.M{"activity": *createOutput.ActivityArn, "worker-name": workerName})
log.InfoD("startup", logger.M{"activity": *createOutput.ActivityArn, "worker-name": *workerName})

// run getactivitytask and get some work
// getactivitytask itself claims to initiate a polling loop, but wrap it in a polling loop of our own
// since it seems to return every minute or so with a nil error and empty output
// getactivitytask claims to initiate a polling loop, but it seems to return every few minutes with
// a nil error and empty output. So wrap it in a polling loop of our own
ticker := time.NewTicker(5 * time.Second)
OuterLoop:
for {
for mainCtx.Err() == nil {
select {
case <-mainCtx.Done():
log.Info("getactivitytask-stop")
break OuterLoop // :-/ https://golang.org/ref/spec#Break_statements
continue
case <-ticker.C:
getATOutput, err := sfnapi.GetActivityTaskWithContext(mainCtx, &sfn.GetActivityTaskInput{
ActivityArn: createOutput.ActivityArn,
WorkerName: &workerName,
WorkerName: workerName,
})
if err != nil {
log.ErrorD("getactivitytask-error", logger.M{"error": err.Error()})
Expand Down Expand Up @@ -116,7 +125,8 @@ OuterLoop:
log.InfoD("heartbeat-end", logger.M{"token": token})
}()

// Run the command
// Run the command. Treat unprocessed args (flag.Args()) as additional args to
// send to the command on every invocation of the command
taskRunner := NewTaskRunner(*cmd, flag.Args(), sfnapi, input, token)
if err := taskRunner.Process(taskCtx); err != nil {
log.ErrorD("process-error", logger.M{"error": err.Error()})
Expand Down
8 changes: 5 additions & 3 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions glide.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ import:
version: ^6.7.3
subpackages:
- logger
- package: github.com/armon/circbuf

0 comments on commit 3f72d31

Please sign in to comment.