From afb636d5f22c71d6e00760a42fa90c5b7fe387a6 Mon Sep 17 00:00:00 2001 From: Chris Tessum Date: Sat, 16 Mar 2019 02:34:10 +0000 Subject: [PATCH] cloud: Fix status --- cloud/blob.go | 9 ++++----- cloud/client.go | 15 ++++++++------- cloud/fakerunner.go | 14 ++++++++++++-- 3 files changed, 24 insertions(+), 14 deletions(-) diff --git a/cloud/blob.go b/cloud/blob.go index c132cbd39..207b7595c 100644 --- a/cloud/blob.go +++ b/cloud/blob.go @@ -36,11 +36,11 @@ func readBlob(ctx context.Context, bucket *blob.Bucket, key string) ([]byte, err var b bytes.Buffer r, err := bucket.NewReader(ctx, key, nil) if err != nil { - return nil, err + return nil, fmt.Errorf("Reading blob key %s: %v", key, err) } defer r.Close() _, err = io.Copy(&b, r) - return b.Bytes(), err + return b.Bytes(), fmt.Errorf("Reading blob key %s: %v", key, err) } // writeBlob writes the given data to the given bucket. @@ -69,12 +69,11 @@ func (c *Client) Output(ctx context.Context, job *cloudrpc.JobName) (*cloudrpc.J o := &cloudrpc.JobOutput{ Files: make(map[string][]byte), } - //TODO: k8sJob, err := c.getk8sJob(ctx, job) + k8sJob, err := c.getk8sJob(ctx, job) if err != nil { return nil, err } - //TODO: addrs, err := c.jobOutputAddresses(ctx, job.Name, k8sJob.Spec.Template.Spec.Containers[0].Command) - addrs, err := c.jobOutputAddresses(ctx, job.Name, []string{"inmap", "run", "steady"}) + addrs, err := c.jobOutputAddresses(ctx, job.Name, k8sJob.Spec.Template.Spec.Containers[0].Command) if err != nil { return nil, err } diff --git a/cloud/client.go b/cloud/client.go index 7856d23c1..6c60fec3d 100644 --- a/cloud/client.go +++ b/cloud/client.go @@ -102,15 +102,17 @@ func (c *Client) RunJob(ctx context.Context, job *cloudrpc.JobSpec) (*cloudrpc.J } status, err := c.Status(ctx, &cloudrpc.JobName{Name: job.Name, Version: job.Version}) - if err != nil { + if status.Status != cloudrpc.Status_Missing && err != nil { return nil, err } - if status.Status != cloudrpc.Status_Failed { //TODO: status.Status != cloudrpc.Status_Missing && { + if status.Status != cloudrpc.Status_Failed && status.Status != cloudrpc.Status_Missing { // Only create the job if it is missing or failed. return status, nil } // TODO: Is this necessary? - c.Delete(ctx, &cloudrpc.JobName{Name: job.Name, Version: job.Version}) + if status.Status != cloudrpc.Status_Missing { + c.Delete(ctx, &cloudrpc.JobName{Name: job.Name, Version: job.Version}) + } if err := c.stageInputs(ctx, job); err != nil { return nil, err @@ -179,7 +181,7 @@ func userJobName(user, name string) string { // Status returns the status of the given job. func (c *Client) Status(ctx context.Context, job *cloudrpc.JobName) (*cloudrpc.JobStatus, error) { s := new(cloudrpc.JobStatus) - /*k8sJob, err := c.getk8sJob(ctx, job) + k8sJob, err := c.getk8sJob(ctx, job) if err != nil { return &cloudrpc.JobStatus{ Status: cloudrpc.Status_Missing, @@ -198,9 +200,8 @@ func (c *Client) Status(ctx context.Context, job *cloudrpc.JobName) (*cloudrpc.J if k8sJob.Status.Active > 0 { s.Status = cloudrpc.Status_Running s.StartTime = k8sJob.Status.StartTime.Time.Unix() - }*/ - //TODO: err = c.checkOutputs(ctx, name, k8sJob.Spec.Template.Spec.Containers[0].Command) - err := c.checkOutputs(ctx, job.Name, []string{"inmap", "run", "steady"}) + } + err = c.checkOutputs(ctx, job.Name, k8sJob.Spec.Template.Spec.Containers[0].Command) if err != nil { s.Status = cloudrpc.Status_Failed s.Message = fmt.Sprintf("job completed but the following error occurred when checking outputs: %s", err) diff --git a/cloud/fakerunner.go b/cloud/fakerunner.go index dc97ccaac..9ed29d98f 100644 --- a/cloud/fakerunner.go +++ b/cloud/fakerunner.go @@ -41,16 +41,19 @@ import ( // and after executing the inmap command, respectively. func NewFakeClient(checkConfig func([]string), checkRun func([]byte, error), bucket string, root *cobra.Command, config *viper.Viper, inputFileArgs, outputFileArgs []string) (*Client, error) { k8sClient := fake.NewSimpleClientset() - k8sClient.Fake.PrependReactor("create", "jobs", fakeRun(checkConfig, checkRun)) + jobs := make([]batch.Job, 0, 1000) + k8sClient.Fake.PrependReactor("create", "jobs", fakeRun(checkConfig, checkRun, &jobs)) + k8sClient.Fake.PrependReactor("list", "jobs", fakeList(&jobs)) return NewClient(k8sClient, root, config, bucket, inputFileArgs, outputFileArgs) } // fakeRun runs the InMAP simulation specified by the job. // The InMAP command must be compiled for it to work, // e.g., `go install github.com/spatialmodel/inmap/cmd/inmap`. -func fakeRun(checkConfig func([]string), checkRun func([]byte, error)) func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { +func fakeRun(checkConfig func([]string), checkRun func([]byte, error), jobs *[]batch.Job) func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { return func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { job := action.(k8stesting.CreateAction).GetObject().(*batch.Job) + *jobs = append(*jobs, *job) cmd := job.Spec.Template.Spec.Containers[0].Command args := job.Spec.Template.Spec.Containers[0].Args for i := 0; i < len(args); i += 2 { @@ -70,6 +73,13 @@ func fakeRun(checkConfig func([]string), checkRun func([]byte, error)) func(acti } } +// fakeList returns the job that was most recently run, if any. +func fakeList(jobs *[]batch.Job) func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { + return func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { + return true, &batch.JobList{Items: *jobs}, nil + } +} + // FakeRPCClient is a local RPC client for testing. type FakeRPCClient struct { Client *Client