diff --git a/pkg/kando/process_client.go b/pkg/kando/process_client.go index 985f853e2f..a26f9f22d1 100644 --- a/pkg/kando/process_client.go +++ b/pkg/kando/process_client.go @@ -15,11 +15,21 @@ package kando import ( + "context" + "fmt" + "os" + "os/signal" + "syscall" + "github.com/spf13/cobra" + + "github.com/kanisterio/kanister/pkg/kanx" + "github.com/kanisterio/kanister/pkg/log" ) const ( - processAsJSONFlagName = "as-json" + processSignalProxyFlagName = "signal-proxy" + processAsJSONFlagName = "as-json" ) func newProcessClientCommand() *cobra.Command { @@ -28,6 +38,7 @@ func newProcessClientCommand() *cobra.Command { Short: "Send commands to the process server", } cmd.AddCommand(newProcessClientCreateCommand()) + cmd.AddCommand(newProcessClientExecuteCommand()) cmd.AddCommand(newProcessClientGetCommand()) cmd.AddCommand(newProcessClientListCommand()) cmd.AddCommand(newProcessClientSignalCommand()) @@ -43,3 +54,40 @@ func processAsJSONFlagValue(cmd *cobra.Command) bool { } return b } + +func proxySetup(ctx context.Context, addr string, pid int64) { + log.Info().WithContext(ctx).Print(fmt.Sprintf("signal proxy is running for process %d", pid)) + signalTermChan := make(chan os.Signal, 1) + signal.Notify(signalTermChan, syscall.SIGINT, syscall.SIGTERM) + go func() { + BREAK: + for { + select { + case sig := <-signalTermChan: + ossig, ok := sig.(syscall.Signal) + if !ok { + log.Info().WithContext(ctx).Print(fmt.Sprintf("signal %v is invalid, ignored for process %d", sig, pid)) + continue + } + log.Info().WithContext(ctx).Print(fmt.Sprintf("signal %v received for process %d", sig, pid)) + _, err := kanx.SignalProcess(ctx, addr, pid, int64(ossig)) + if err != nil { + signal.Reset(ossig) + log.Error().WithContext(ctx).WithError(err).Print(fmt.Sprintf("error on signal %v for process %d", sig, pid)) + break BREAK + } + log.Info().WithContext(ctx).Print(fmt.Sprintf("signal %v sent for process %d", sig, pid)) + case <-ctx.Done(): + break BREAK + } + } + }() +} + +func procesSignalProxyAddFlag(cmd *cobra.Command) { + cmd.PersistentFlags().BoolP(processSignalProxyFlagName, "p", false, "pass signals from client to server") +} + +func processSignalProxyFlagValue(cmd *cobra.Command) (bool, error) { + return cmd.Flags().GetBool(processSignalProxyFlagName) +} diff --git a/pkg/kando/process_client_create.go b/pkg/kando/process_client_create.go index d4e85cfe90..cc514ca063 100644 --- a/pkg/kando/process_client_create.go +++ b/pkg/kando/process_client_create.go @@ -46,6 +46,9 @@ func runProcessClientCreateWithOutput(out io.Writer, cmd *cobra.Command, args [] asJSON := processAsJSONFlagValue(cmd) cmd.SilenceUsage = true p, err := kanx.CreateProcess(cmd.Context(), addr, args[0], args[1:]) + if err != nil { + return err + } if asJSON { buf, err := protojson.Marshal(p) if err != nil { @@ -53,7 +56,7 @@ func runProcessClientCreateWithOutput(out io.Writer, cmd *cobra.Command, args [] } fmt.Fprintln(out, string(buf)) } else { - fmt.Fprintln(out, "Process: ", p.String()) + fmt.Fprintln(out, "Process: ", p) } - return err + return nil } diff --git a/pkg/kando/process_client_execute.go b/pkg/kando/process_client_execute.go new file mode 100644 index 0000000000..60adaba77a --- /dev/null +++ b/pkg/kando/process_client_execute.go @@ -0,0 +1,86 @@ +// Copyright 2020 The Kanister Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kando + +import ( + "context" + "fmt" + "io" + + "github.com/kanisterio/errkit" + "github.com/spf13/cobra" + "google.golang.org/protobuf/encoding/protojson" + + "github.com/kanisterio/kanister/pkg/kanx" +) + +func newProcessClientExecuteCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "execute CMD ARG...", + Short: "execute a new managed process", + Args: cobra.MinimumNArgs(1), + RunE: runProcessClientExecute, + } + procesSignalProxyAddFlag(cmd) + return cmd +} + +func runProcessClientExecute(cmd *cobra.Command, args []string) error { + return runProcessClientExecuteWithOutput(cmd.OutOrStdout(), cmd.ErrOrStderr(), cmd, args) +} + +func runProcessClientExecuteWithOutput(stdout, stderr io.Writer, cmd *cobra.Command, args []string) error { + addr, err := processAddressFlagValue(cmd) + if err != nil { + return err + } + proxy, err := processSignalProxyFlagValue(cmd) + if err != nil { + return err + } + asJSON := processAsJSONFlagValue(cmd) + cmd.SilenceUsage = true + ctx, canfn := context.WithCancel(cmd.Context()) + defer canfn() + p, err := kanx.CreateProcess(ctx, addr, args[0], args[1:]) + if err != nil { + return err + } + if asJSON { + buf, err := protojson.Marshal(p) + if err != nil { + return err + } + fmt.Fprintln(stdout, string(buf)) + } else { + fmt.Fprintln(stdout, "Process: ", p) + } + + pid := p.Pid + errc := make(chan error) + if proxy { + proxySetup(ctx, addr, pid) + } + go func() { errc <- kanx.Stdout(ctx, addr, pid, stdout) }() + go func() { errc <- kanx.Stderr(ctx, addr, pid, stderr) }() + for i := 0; i < 2; i++ { + err0 := <-errc + // workaround bug in errkit + if err0 != nil { + err = errkit.Append(err, err0) + } + } + return err +} diff --git a/pkg/kando/process_client_get.go b/pkg/kando/process_client_get.go index bbe7d33fb6..7ea8e1bf2a 100644 --- a/pkg/kando/process_client_get.go +++ b/pkg/kando/process_client_get.go @@ -61,7 +61,7 @@ func runProcessClientGetWithOutput(out io.Writer, cmd *cobra.Command, args []str } fmt.Fprintln(out, string(buf)) } else { - fmt.Fprintln(out, "Process: ", p.String()) + fmt.Fprintln(out, "Process: ", p) } return nil } diff --git a/pkg/kando/process_client_list.go b/pkg/kando/process_client_list.go index 5101bc1ce1..ee841c97aa 100644 --- a/pkg/kando/process_client_list.go +++ b/pkg/kando/process_client_list.go @@ -57,7 +57,7 @@ func runProcessClientListWithOutput(out io.Writer, cmd *cobra.Command) error { } fmt.Fprintln(out, string(buf)) } else { - fmt.Fprintln(out, "Process: ", p.String()) + fmt.Fprintln(out, "Process: ", p) } } return nil diff --git a/pkg/kando/process_client_output.go b/pkg/kando/process_client_output.go index 110d58e82a..6d5c721230 100644 --- a/pkg/kando/process_client_output.go +++ b/pkg/kando/process_client_output.go @@ -16,22 +16,13 @@ package kando import ( "context" - "fmt" "io" - "os" - "os/signal" "strconv" - "syscall" "github.com/kanisterio/errkit" "github.com/spf13/cobra" "github.com/kanisterio/kanister/pkg/kanx" - "github.com/kanisterio/kanister/pkg/log" -) - -const ( - processSignalProxyFlagName = "signal-proxy" ) func newProcessClientOutputCommand() *cobra.Command { @@ -41,47 +32,14 @@ func newProcessClientOutputCommand() *cobra.Command { Args: cobra.ExactArgs(1), RunE: runProcessClientOutput, } - cmd.PersistentFlags().BoolP(processSignalProxyFlagName, "p", false, "pass signals from client to server") + procesSignalProxyAddFlag(cmd) return cmd } -func processSignalProxyFlagValue(cmd *cobra.Command) (bool, error) { - return cmd.Flags().GetBool(processSignalProxyFlagName) -} - func runProcessClientOutput(cmd *cobra.Command, args []string) error { return runProcessClientOutputWithOutput(cmd.OutOrStdout(), cmd.ErrOrStderr(), cmd, args) } -func proxySetup(ctx context.Context, addr string, pid int64) { - log.Info().WithContext(ctx).Print(fmt.Sprintf("signal proxy is running for process %d", pid)) - signalTermChan := make(chan os.Signal, 1) - signal.Notify(signalTermChan, syscall.SIGINT, syscall.SIGTERM) - go func() { - BREAK: - for { - select { - case sig := <-signalTermChan: - ossig, ok := sig.(syscall.Signal) - if !ok { - log.Info().WithContext(ctx).Print(fmt.Sprintf("signal %v is invalid, ignored for process %d", sig, pid)) - continue - } - log.Info().WithContext(ctx).Print(fmt.Sprintf("signal %v received for process %d", sig, pid)) - _, err := kanx.SignalProcess(ctx, addr, pid, int64(ossig)) - if err != nil { - signal.Reset(ossig) - log.Error().WithContext(ctx).WithError(err).Print(fmt.Sprintf("error on signal %v for process %d", sig, pid)) - break BREAK - } - log.Info().WithContext(ctx).Print(fmt.Sprintf("signal %v sent for process %d", sig, pid)) - case <-ctx.Done(): - break BREAK - } - } - }() -} - func runProcessClientOutputWithOutput(stdout, stderr io.Writer, cmd *cobra.Command, args []string) error { pid, err := strconv.ParseInt(args[0], 0, 64) if err != nil { @@ -95,20 +53,21 @@ func runProcessClientOutputWithOutput(stdout, stderr io.Writer, cmd *cobra.Comma if err != nil { return err } + cmd.SilenceUsage = true ctx, canfn := context.WithCancel(cmd.Context()) + defer canfn() errc := make(chan error) if proxy { proxySetup(ctx, addr, pid) } - cmd.SilenceUsage = true go func() { errc <- kanx.Stdout(ctx, addr, pid, stdout) }() go func() { errc <- kanx.Stderr(ctx, addr, pid, stderr) }() for i := 0; i < 2; i++ { err0 := <-errc if err0 != nil { + // workaround bug in errkit err = errkit.Append(err, err0) } } - canfn() return err } diff --git a/pkg/kando/process_client_signal.go b/pkg/kando/process_client_signal.go index 785838b6bf..b887851bba 100644 --- a/pkg/kando/process_client_signal.go +++ b/pkg/kando/process_client_signal.go @@ -65,7 +65,7 @@ func runProcessClientSignalWithOutput(out io.Writer, cmd *cobra.Command, args [] } fmt.Fprintln(out, string(buf)) } else { - fmt.Fprintln(out, "Process: ", p.String()) + fmt.Fprintln(out, "Process: ", p) } return nil } diff --git a/pkg/kopia/command/common.go b/pkg/kopia/command/common.go index 2d9d284554..c96bc93faf 100644 --- a/pkg/kopia/command/common.go +++ b/pkg/kopia/command/common.go @@ -64,6 +64,11 @@ func bashCommand(args logsafe.Cmd) []string { return []string{"bash", "-o", "errexit", "-c", args.PlainText()} } +func kanxCommand(args logsafe.Cmd) []string { + log.Info().Print("Kopia Command", field.M{"Command": args.String()}) + return append([]string{"kando", "process", "client", "execute", "--signal-proxy", "--as-json", "--"}, args.StringSliceCMD()...) +} + func stringSliceCommand(args logsafe.Cmd) []string { log.Info().Print("Kopia Command", field.M{"Command": args.String()}) return args.StringSliceCMD() diff --git a/pkg/kopia/command/server.go b/pkg/kopia/command/server.go index 9cebd00ebb..c52ef537d1 100644 --- a/pkg/kopia/command/server.go +++ b/pkg/kopia/command/server.go @@ -16,6 +16,7 @@ package command import ( "github.com/kanisterio/kanister/pkg/kopia/cli/args" + "github.com/kanisterio/kanister/pkg/logsafe" ) type ServerStartCommandArgs struct { @@ -35,8 +36,7 @@ type ServerStartCommandArgs struct { MetricsListenAddress string } -// ServerStart returns the kopia command for starting the Kopia API Server -func ServerStart(cmdArgs ServerStartCommandArgs) []string { +func commonCommand(cmdArgs ServerStartCommandArgs) logsafe.Cmd { args := commonArgs(&CommandArgs{ConfigFilePath: cmdArgs.ConfigFilePath, LogDirectory: cmdArgs.LogDirectory}) if cmdArgs.AutoGenerateCert { @@ -76,8 +76,17 @@ func ServerStart(cmdArgs ServerStartCommandArgs) []string { // To start the server and run in the background args = args.AppendLoggable(redirectToDevNull, runInBackground) } + return args +} + +// ServerStart returns the kopia command for starting the Kopia API Server +func ServerStart(cmdArgs ServerStartCommandArgs) []string { + return bashCommand(commonCommand(cmdArgs)) +} - return bashCommand(args) +// ServerStartKanx returns the kopia command for starting the Kopia API Server +func ServerStartKanx(cmdArgs ServerStartCommandArgs) []string { + return kanxCommand(commonCommand(cmdArgs)) } type ServerRefreshCommandArgs struct {