Skip to content

Commit

Permalink
cloud: Fix status
Browse files Browse the repository at this point in the history
  • Loading branch information
ctessum committed Mar 16, 2019
1 parent 76d2465 commit afb636d
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 14 deletions.
9 changes: 4 additions & 5 deletions cloud/blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ func readBlob(ctx context.Context, bucket *blob.Bucket, key string) ([]byte, err
var b bytes.Buffer
r, err := bucket.NewReader(ctx, key, nil)
if err != nil {
return nil, err
return nil, fmt.Errorf("Reading blob key %s: %v", key, err)
}
defer r.Close()
_, err = io.Copy(&b, r)
return b.Bytes(), err
return b.Bytes(), fmt.Errorf("Reading blob key %s: %v", key, err)
}

// writeBlob writes the given data to the given bucket.
Expand Down Expand Up @@ -69,12 +69,11 @@ func (c *Client) Output(ctx context.Context, job *cloudrpc.JobName) (*cloudrpc.J
o := &cloudrpc.JobOutput{
Files: make(map[string][]byte),
}
//TODO: k8sJob, err := c.getk8sJob(ctx, job)
k8sJob, err := c.getk8sJob(ctx, job)
if err != nil {
return nil, err
}
//TODO: addrs, err := c.jobOutputAddresses(ctx, job.Name, k8sJob.Spec.Template.Spec.Containers[0].Command)
addrs, err := c.jobOutputAddresses(ctx, job.Name, []string{"inmap", "run", "steady"})
addrs, err := c.jobOutputAddresses(ctx, job.Name, k8sJob.Spec.Template.Spec.Containers[0].Command)
if err != nil {
return nil, err
}
Expand Down
15 changes: 8 additions & 7 deletions cloud/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,15 +102,17 @@ func (c *Client) RunJob(ctx context.Context, job *cloudrpc.JobSpec) (*cloudrpc.J
}

status, err := c.Status(ctx, &cloudrpc.JobName{Name: job.Name, Version: job.Version})
if err != nil {
if status.Status != cloudrpc.Status_Missing && err != nil {
return nil, err
}
if status.Status != cloudrpc.Status_Failed { //TODO: status.Status != cloudrpc.Status_Missing && {
if status.Status != cloudrpc.Status_Failed && status.Status != cloudrpc.Status_Missing {
// Only create the job if it is missing or failed.
return status, nil
}
// TODO: Is this necessary?
c.Delete(ctx, &cloudrpc.JobName{Name: job.Name, Version: job.Version})
if status.Status != cloudrpc.Status_Missing {
c.Delete(ctx, &cloudrpc.JobName{Name: job.Name, Version: job.Version})
}

if err := c.stageInputs(ctx, job); err != nil {
return nil, err
Expand Down Expand Up @@ -179,7 +181,7 @@ func userJobName(user, name string) string {
// Status returns the status of the given job.
func (c *Client) Status(ctx context.Context, job *cloudrpc.JobName) (*cloudrpc.JobStatus, error) {
s := new(cloudrpc.JobStatus)
/*k8sJob, err := c.getk8sJob(ctx, job)
k8sJob, err := c.getk8sJob(ctx, job)
if err != nil {
return &cloudrpc.JobStatus{
Status: cloudrpc.Status_Missing,
Expand All @@ -198,9 +200,8 @@ func (c *Client) Status(ctx context.Context, job *cloudrpc.JobName) (*cloudrpc.J
if k8sJob.Status.Active > 0 {
s.Status = cloudrpc.Status_Running
s.StartTime = k8sJob.Status.StartTime.Time.Unix()
}*/
//TODO: err = c.checkOutputs(ctx, name, k8sJob.Spec.Template.Spec.Containers[0].Command)
err := c.checkOutputs(ctx, job.Name, []string{"inmap", "run", "steady"})
}
err = c.checkOutputs(ctx, job.Name, k8sJob.Spec.Template.Spec.Containers[0].Command)
if err != nil {
s.Status = cloudrpc.Status_Failed
s.Message = fmt.Sprintf("job completed but the following error occurred when checking outputs: %s", err)
Expand Down
14 changes: 12 additions & 2 deletions cloud/fakerunner.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,19 @@ import (
// and after executing the inmap command, respectively.
func NewFakeClient(checkConfig func([]string), checkRun func([]byte, error), bucket string, root *cobra.Command, config *viper.Viper, inputFileArgs, outputFileArgs []string) (*Client, error) {
k8sClient := fake.NewSimpleClientset()
k8sClient.Fake.PrependReactor("create", "jobs", fakeRun(checkConfig, checkRun))
jobs := make([]batch.Job, 0, 1000)
k8sClient.Fake.PrependReactor("create", "jobs", fakeRun(checkConfig, checkRun, &jobs))
k8sClient.Fake.PrependReactor("list", "jobs", fakeList(&jobs))
return NewClient(k8sClient, root, config, bucket, inputFileArgs, outputFileArgs)
}

// fakeRun runs the InMAP simulation specified by the job.
// The InMAP command must be compiled for it to work,
// e.g., `go install github.com/spatialmodel/inmap/cmd/inmap`.
func fakeRun(checkConfig func([]string), checkRun func([]byte, error)) func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) {
func fakeRun(checkConfig func([]string), checkRun func([]byte, error), jobs *[]batch.Job) func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) {
return func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) {
job := action.(k8stesting.CreateAction).GetObject().(*batch.Job)
*jobs = append(*jobs, *job)
cmd := job.Spec.Template.Spec.Containers[0].Command
args := job.Spec.Template.Spec.Containers[0].Args
for i := 0; i < len(args); i += 2 {
Expand All @@ -70,6 +73,13 @@ func fakeRun(checkConfig func([]string), checkRun func([]byte, error)) func(acti
}
}

// fakeList returns the job that was most recently run, if any.
func fakeList(jobs *[]batch.Job) func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) {
return func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) {
return true, &batch.JobList{Items: *jobs}, nil
}
}

// FakeRPCClient is a local RPC client for testing.
type FakeRPCClient struct {
Client *Client
Expand Down

0 comments on commit afb636d

Please sign in to comment.