diff --git a/README.md b/README.md index e14f9f9..f14b350 100644 --- a/README.md +++ b/README.md @@ -1,12 +1,12 @@ -# Google Genomics Pipelines Tools +# Google Lifesciences Pipelines Tools [![Build Status](https://travis-ci.org/googlegenomics/pipelines-tools.svg?branch=master)](https://travis-ci.org/googlegenomics/pipelines-tools) This repository contains various tools that are useful when running pipelines -with the [Google Genomics API][1]. +with the [Google Lifesciences API][1]. # Quick Start Using Cloud Shell -1. Enable the Genomics API and the Compute Engine API in a new or existing +1. Enable the Lifesciences API and the Compute Engine API in a new or existing Google Cloud project. 2. Start a [Cloud Shell][cloud-shell] inside your project. 3. Inside the Cloud Shell, run the command @@ -97,9 +97,9 @@ output. Please report problems using the issue tracker. -[1]: https://cloud.google.com/genomics +[1]: https://cloud.google.com/life-sciences/ [2]: https://golang.org/ [3]: https://github.com/googlegenomics/pipelines-tools/blob/master/pipelines/internal/commands/run/run.go#L18 [cloud-shell]: https://cloud.google.com/shell/docs/quickstart -[api-reference]: https://cloud.google.com/genomics/reference/rest/v2alpha1/pipelines/run +[api-reference]: https://cloud.google.com/life-sciences/docs/reference/rest/v2beta/projects.locations.pipelines/run [gcs-fuse]: https://cloud.google.com/storage/docs/gcs-fuse diff --git a/pipelines/internal/commands/cancel/cancel.go b/pipelines/internal/commands/cancel/cancel.go index d0a2e8b..17bbbaf 100644 --- a/pipelines/internal/commands/cancel/cancel.go +++ b/pipelines/internal/commands/cancel/cancel.go @@ -22,18 +22,18 @@ import ( "net/http" "github.com/googlegenomics/pipelines-tools/pipelines/internal/common" - genomics "google.golang.org/api/genomics/v2alpha1" "google.golang.org/api/googleapi" + genomics "google.golang.org/api/lifesciences/v2beta" ) -func Invoke(ctx context.Context, service *genomics.Service, project string, arguments []string) error { +func Invoke(ctx context.Context, service *genomics.Service, project, location string, arguments []string) error { if len(arguments) < 1 { return errors.New("missing operation name") } - name := common.ExpandOperationName(project, arguments[0]) + name := common.ExpandOperationName(project, location, arguments[0]) req := &genomics.CancelOperationRequest{} - if _, err := service.Projects.Operations.Cancel(name, req).Context(ctx).Do(); err != nil { + if _, err := service.Projects.Locations.Operations.Cancel(name, req).Context(ctx).Do(); err != nil { if err, ok := err.(*googleapi.Error); ok && err.Code == http.StatusNotFound { return fmt.Errorf("operation %q not found", name) } diff --git a/pipelines/internal/commands/export/export.go b/pipelines/internal/commands/export/export.go index 396b1ce..9334d40 100644 --- a/pipelines/internal/commands/export/export.go +++ b/pipelines/internal/commands/export/export.go @@ -21,11 +21,12 @@ import ( "errors" "flag" "fmt" + "path" "time" "cloud.google.com/go/bigquery" "cloud.google.com/go/civil" - genomics "google.golang.org/api/genomics/v2alpha1" + genomics "google.golang.org/api/lifesciences/v2beta" ) var ( @@ -72,15 +73,15 @@ type label struct { Key, Value string } -func Invoke(ctx context.Context, service *genomics.Service, project string, arguments []string) error { +func Invoke(ctx context.Context, service *genomics.Service, project, location string, arguments []string) error { flags.Parse(arguments) if *datasetName == "" || *tableName == "" { return errors.New("dataset and table are required") } - path := fmt.Sprintf("projects/%s/operations", project) - call := service.Projects.Operations.List(path).Context(ctx) + p := path.Join("projects", project, "locations", location) + call := service.Projects.Locations.Operations.List(p).Context(ctx) call.PageSize(256) bq, err := bigquery.NewClient(ctx, project) diff --git a/pipelines/internal/commands/query/query.go b/pipelines/internal/commands/query/query.go index 96ff4c4..112feff 100644 --- a/pipelines/internal/commands/query/query.go +++ b/pipelines/internal/commands/query/query.go @@ -19,9 +19,10 @@ import ( "context" "flag" "fmt" + "path" "strings" - genomics "google.golang.org/api/genomics/v2alpha1" + genomics "google.golang.org/api/lifesciences/v2beta" ) var ( @@ -32,11 +33,11 @@ var ( all = flags.Bool("all", false, "show all operations (when false, show only running operations)") ) -func Invoke(ctx context.Context, service *genomics.Service, project string, arguments []string) error { +func Invoke(ctx context.Context, service *genomics.Service, project, location string, arguments []string) error { flags.Parse(arguments) - path := fmt.Sprintf("projects/%s/operations", project) - call := service.Projects.Operations.List(path).Context(ctx) + p := path.Join("projects", project, "locations", location) + call := service.Projects.Locations.Operations.List(p).Context(ctx) if !*all { *filter = strings.Join([]string{*filter, "done=false"}, " ") diff --git a/pipelines/internal/commands/run/run.go b/pipelines/internal/commands/run/run.go index 8d4cb3c..d66c9cb 100644 --- a/pipelines/internal/commands/run/run.go +++ b/pipelines/internal/commands/run/run.go @@ -129,8 +129,8 @@ import ( "github.com/googlegenomics/pipelines-tools/pipelines/internal/common" "golang.org/x/oauth2/google" compute "google.golang.org/api/compute/v1" - genomics "google.golang.org/api/genomics/v2alpha1" "google.golang.org/api/googleapi" + genomics "google.golang.org/api/lifesciences/v2beta" ) var ( @@ -181,7 +181,7 @@ func init() { flags.Var(&common.MapFlagValue{vmLabels}, "vm-labels", "label names and values to apply to the virtual machine") } -func Invoke(ctx context.Context, service *genomics.Service, project string, arguments []string) error { +func Invoke(ctx context.Context, service *genomics.Service, project, location string, arguments []string) error { filenames := common.ParseFlags(flags, arguments) var filename string @@ -207,10 +207,10 @@ func Invoke(ctx context.Context, service *genomics.Service, project string, argu return nil } - return runPipeline(ctx, service, req) + return runPipeline(ctx, service, req, project, location) } -func runPipeline(ctx context.Context, service *genomics.Service, req *genomics.RunPipelineRequest) error { +func runPipeline(ctx context.Context, service *genomics.Service, req *genomics.RunPipelineRequest, project, location string) error { abort := make(chan os.Signal, 1) signal.Notify(abort, os.Interrupt) @@ -218,7 +218,9 @@ func runPipeline(ctx context.Context, service *genomics.Service, req *genomics.R for { req.Pipeline.Resources.VirtualMachine.Preemptible = (attempt <= *pvmAttempts) - lro, err := service.Pipelines.Run(req).Context(ctx).Do() + parent := path.Join("projects", project, "locations", location) + lro, err := service.Projects.Locations.Pipelines.Run(parent, req).Context(ctx).Do() + if err != nil { if err, ok := err.(*googleapi.Error); ok && err.Message != "" { return fmt.Errorf("starting pipeline: %q: %q", err.Message, err.Body) @@ -237,7 +239,7 @@ func runPipeline(ctx context.Context, service *genomics.Service, req *genomics.R return nil } - if err := watch.Invoke(ctx, service, req.Pipeline.Resources.ProjectId, []string{lro.Name}); err != nil { + if err := watch.Invoke(ctx, service, project, location, []string{lro.Name}); err != nil { if err, ok := err.(common.PipelineExecutionError); ok && err.IsRetriable() { if attempt < *pvmAttempts+*attempts { attempt++ @@ -324,14 +326,14 @@ func buildRequest(filename, project string) (*genomics.RunPipelineRequest, error if *output != "" { action := gsutil("cp", "/google/logs/output", *output) - action.Flags = []string{"ALWAYS_RUN"} + action.AlwaysRun = true delocalizers = append(delocalizers, action) } var actions []*genomics.Action if *outputInterval != 0 && *output != "" { action := bash(fmt.Sprintf("while true; do sleep %.0f; gsutil -q cp /google/logs/output %s; done", (*outputInterval).Seconds(), *output)) - action.Flags = []string{"RUN_IN_BACKGROUND"} + action.RunInBackground = true actions = append(actions, action) } @@ -368,7 +370,7 @@ func buildRequest(filename, project string) (*genomics.RunPipelineRequest, error } if *network != "" { - vm.Network.Name = *network + vm.Network.Network = *network } if *subnetwork != "" { vm.Network.Subnetwork = *subnetwork @@ -382,7 +384,6 @@ func buildRequest(filename, project string) (*genomics.RunPipelineRequest, error } resources := &genomics.Resources{ - ProjectId: project, VirtualMachine: vm, } if *regions != "" && *zones != "" { @@ -494,12 +495,23 @@ func parse(line string) (*genomics.Action, error) { var action genomics.Action options := make(map[string]string) + flags := map[string]*bool{ + "IGNORE_EXIT_STATUS": &action.IgnoreExitStatus, + "RUN_IN_BACKGROUND": &action.RunInBackground, + "ALWAYS_RUN": &action.AlwaysRun, + "ENABLE_FUSE": &action.EnableFuse, + "PUBLISH_EXPOSED_PORTS": &action.PublishExposedPorts, + "DISABLE_IMAGE_PREFETCH": &action.DisableImagePrefetch, + "DISABLE_STANDARD_ERROR_CAPTURE": &action.DisableStandardErrorCapture, + } if n := strings.Index(line, "#"); n >= 0 { for _, option := range strings.Fields(strings.TrimSpace(line[n+1:])) { if n := strings.Index(option, "="); n >= 0 { options[option[:n]] = option[n+1:] + } else if p, ok := flags[strings.ToUpper(option)]; ok { + *p = true } else { - action.Flags = append(action.Flags, strings.ToUpper(option)) + return nil, fmt.Errorf("unknown action flag %q specified", option) } } line = line[:n] @@ -508,7 +520,7 @@ func parse(line string) (*genomics.Action, error) { commands := strings.Fields(strings.TrimSpace(line)) if len(commands) > 0 { if commands[len(commands)-1] == "&" { - action.Flags = append(action.Flags, "RUN_IN_BACKGROUND") + action.RunInBackground = true commands = commands[:len(commands)-1] } action.Commands = []string{"-c", strings.Join(commands, " ")} @@ -726,7 +738,7 @@ func cancelOnInterrupt(ctx context.Context, service *genomics.Service, name stri <-abort fmt.Println("Cancelling operation...") req := &genomics.CancelOperationRequest{} - if _, err := service.Projects.Operations.Cancel(name, req).Context(ctx).Do(); err != nil { + if _, err := service.Projects.Locations.Operations.Cancel(name, req).Context(ctx).Do(); err != nil { fmt.Printf("Failed to cancel operation: %v\n", err) } }() @@ -771,10 +783,11 @@ func gcsFuse(buckets map[string]string) []*genomics.Action { var actions []*genomics.Action for bucket, path := range buckets { actions = append(actions, &genomics.Action{ - ImageUri: "gcr.io/cloud-genomics-pipelines/gcsfuse", - Commands: []string{"--implicit-dirs", "--foreground", bucket, path}, - Flags: []string{"ENABLE_FUSE", "RUN_IN_BACKGROUND"}, - Mounts: []*genomics.Mount{googleRoot}, + ImageUri: "gcr.io/cloud-genomics-pipelines/gcsfuse", + Commands: []string{"--implicit-dirs", "--foreground", bucket, path}, + EnableFuse: true, + RunInBackground: true, + Mounts: []*genomics.Mount{googleRoot}, }) actions = append(actions, &genomics.Action{ ImageUri: "gcr.io/cloud-genomics-pipelines/gcsfuse", @@ -787,9 +800,9 @@ func gcsFuse(buckets map[string]string) []*genomics.Action { func sshDebug(project string) *genomics.Action { return &genomics.Action{ - ImageUri: "gcr.io/cloud-genomics-pipelines/tools", - Entrypoint: "ssh-server", - PortMappings: map[string]int64{"22": 22}, - Flags: []string{"RUN_IN_BACKGROUND"}, + ImageUri: "gcr.io/cloud-genomics-pipelines/tools", + Entrypoint: "ssh-server", + PortMappings: map[string]int64{"22": 22}, + RunInBackground: true, } } diff --git a/pipelines/internal/commands/watch/watch.go b/pipelines/internal/commands/watch/watch.go index 35d0690..c50d49f 100644 --- a/pipelines/internal/commands/watch/watch.go +++ b/pipelines/internal/commands/watch/watch.go @@ -24,7 +24,7 @@ import ( "time" "github.com/googlegenomics/pipelines-tools/pipelines/internal/common" - genomics "google.golang.org/api/genomics/v2alpha1" + genomics "google.golang.org/api/lifesciences/v2beta" ) var ( @@ -34,13 +34,13 @@ var ( details = flags.Bool("details", false, "show event details") ) -func Invoke(ctx context.Context, service *genomics.Service, project string, arguments []string) error { +func Invoke(ctx context.Context, service *genomics.Service, project, location string, arguments []string) error { names := common.ParseFlags(flags, arguments) if len(names) < 1 { return errors.New("missing operation name") } - name := common.ExpandOperationName(project, names[0]) + name := common.ExpandOperationName(project, location, names[0]) result, err := watch(ctx, service, name) if err != nil { return fmt.Errorf("watching pipeline: %v", err) @@ -58,8 +58,9 @@ func watch(ctx context.Context, service *genomics.Service, name string) (interfa var events []*genomics.Event const initialDelay = 5 * time.Second delay := initialDelay + for { - lro, err := service.Projects.Operations.Get(name).Context(ctx).Do() + lro, err := service.Projects.Locations.Operations.Get(name).Context(ctx).Do() if err != nil { return nil, fmt.Errorf("getting operation status: %v", err) } @@ -84,7 +85,11 @@ func watch(ctx context.Context, service *genomics.Service, name string) (interfa fmt.Println(timestamp.Format("15:04:05"), metadata.Events[i].Description) if *details { - fmt.Println(string(metadata.Events[i].Details)) + encoded, err := json.MarshalIndent(metadata.Events[i], "", " ") + if err != nil { + return nil, fmt.Errorf("encoding event: %v", err) + } + fmt.Println(string(encoded)) } } events = metadata.Events diff --git a/pipelines/internal/common/common.go b/pipelines/internal/common/common.go index 7b1467c..4118e48 100644 --- a/pipelines/internal/common/common.go +++ b/pipelines/internal/common/common.go @@ -7,16 +7,19 @@ import ( "path" "strings" - genomics "google.golang.org/api/genomics/v2alpha1" + genomics "google.golang.org/api/lifesciences/v2beta" "google.golang.org/genproto/googleapis/rpc/code" ) -// ExpandOperationName adds the project and operations prefixes to name (if +// ExpandOperationName adds the project, location and operations prefixes to name (if // they are not already present). -func ExpandOperationName(project, name string) string { +func ExpandOperationName(project, location, name string) string { if !strings.HasPrefix(name, "projects/") { - if !strings.HasPrefix(name, "operations/") { - name = path.Join("operations/", name) + if !strings.HasPrefix(name, "locations/") { + if !strings.HasPrefix(name, "operations/") { + name = path.Join("operations/", name) + } + name = path.Join("locations", location, name) } name = path.Join("projects", project, name) } diff --git a/pipelines/main.go b/pipelines/main.go index 4685c57..5e7bc13 100644 --- a/pipelines/main.go +++ b/pipelines/main.go @@ -33,14 +33,15 @@ import ( "golang.org/x/oauth2" "golang.org/x/oauth2/google" - genomics "google.golang.org/api/genomics/v2alpha1" + genomics "google.golang.org/api/lifesciences/v2beta" ) var ( project = flag.String("project", defaultProject(), "the cloud project name") + location = flag.String("location", "us-central1", "Google cloud location to store the metadata for the operations") basePath = flag.String("api", "", "the API base to use") - commands = map[string]func(context.Context, *genomics.Service, string, []string) error{ + commands = map[string]func(context.Context, *genomics.Service, string, string, []string) error{ "run": run.Invoke, "cancel": cancel.Invoke, "query": query.Invoke, @@ -76,7 +77,7 @@ func main() { exitf("Failed to create service: %v", err) } - if err := invoke(ctx, service, *project, flag.Args()[1:]); err != nil { + if err := invoke(ctx, service, *project, *location, flag.Args()[1:]); err != nil { exitf("%q: %v", command, err) } } @@ -98,7 +99,7 @@ func newService(ctx context.Context, basePath string) (*genomics.Service, error) ctx = context.WithValue(ctx, oauth2.HTTPClient, &http.Client{Transport: &transport}) - client, err := google.DefaultClient(ctx, genomics.GenomicsScope) + client, err := google.DefaultClient(ctx, genomics.CloudPlatformScope) if err != nil { return nil, fmt.Errorf("creating authenticated client: %v", err) }