Skip to content

Commit

Permalink
merge squash for debug
Browse files Browse the repository at this point in the history
Signed-off-by: Aaron Alpar <[email protected]>
  • Loading branch information
aaron-kasten committed Jan 18, 2025
1 parent c1f47e3 commit 410c1f6
Show file tree
Hide file tree
Showing 9 changed files with 164 additions and 54 deletions.
50 changes: 49 additions & 1 deletion pkg/kando/process_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,21 @@
package kando

import (
"context"
"fmt"
"os"
"os/signal"
"syscall"

"github.com/spf13/cobra"

"github.com/kanisterio/kanister/pkg/kanx"
"github.com/kanisterio/kanister/pkg/log"
)

const (
processAsJSONFlagName = "as-json"
processSignalProxyFlagName = "signal-proxy"
processAsJSONFlagName = "as-json"
)

func newProcessClientCommand() *cobra.Command {
Expand All @@ -28,6 +38,7 @@ func newProcessClientCommand() *cobra.Command {
Short: "Send commands to the process server",
}
cmd.AddCommand(newProcessClientCreateCommand())
cmd.AddCommand(newProcessClientExecuteCommand())
cmd.AddCommand(newProcessClientGetCommand())
cmd.AddCommand(newProcessClientListCommand())
cmd.AddCommand(newProcessClientSignalCommand())
Expand All @@ -43,3 +54,40 @@ func processAsJSONFlagValue(cmd *cobra.Command) bool {
}
return b
}

func proxySetup(ctx context.Context, addr string, pid int64) {
log.Info().WithContext(ctx).Print(fmt.Sprintf("signal proxy is running for process %d", pid))
signalTermChan := make(chan os.Signal, 1)
signal.Notify(signalTermChan, syscall.SIGINT, syscall.SIGTERM)
go func() {
BREAK:
for {
select {
case sig := <-signalTermChan:
ossig, ok := sig.(syscall.Signal)
if !ok {
log.Info().WithContext(ctx).Print(fmt.Sprintf("signal %v is invalid, ignored for process %d", sig, pid))
continue
}
log.Info().WithContext(ctx).Print(fmt.Sprintf("signal %v received for process %d", sig, pid))
_, err := kanx.SignalProcess(ctx, addr, pid, int64(ossig))
if err != nil {
signal.Reset(ossig)
log.Error().WithContext(ctx).WithError(err).Print(fmt.Sprintf("error on signal %v for process %d", sig, pid))
break BREAK
}
log.Info().WithContext(ctx).Print(fmt.Sprintf("signal %v sent for process %d", sig, pid))
case <-ctx.Done():
break BREAK
}
}
}()
}

func procesSignalProxyAddFlag(cmd *cobra.Command) {
cmd.PersistentFlags().BoolP(processSignalProxyFlagName, "p", false, "pass signals from client to server")
}

func processSignalProxyFlagValue(cmd *cobra.Command) (bool, error) {
return cmd.Flags().GetBool(processSignalProxyFlagName)
}
7 changes: 5 additions & 2 deletions pkg/kando/process_client_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,17 @@ func runProcessClientCreateWithOutput(out io.Writer, cmd *cobra.Command, args []
asJSON := processAsJSONFlagValue(cmd)
cmd.SilenceUsage = true
p, err := kanx.CreateProcess(cmd.Context(), addr, args[0], args[1:])
if err != nil {
return err
}
if asJSON {
buf, err := protojson.Marshal(p)
if err != nil {
return err
}
fmt.Fprintln(out, string(buf))
} else {
fmt.Fprintln(out, "Process: ", p.String())
fmt.Fprintln(out, "Process: ", p)
}
return err
return nil
}
86 changes: 86 additions & 0 deletions pkg/kando/process_client_execute.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright 2020 The Kanister Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kando

import (
"context"
"fmt"
"io"

"github.com/kanisterio/errkit"
"github.com/spf13/cobra"
"google.golang.org/protobuf/encoding/protojson"

"github.com/kanisterio/kanister/pkg/kanx"
)

func newProcessClientExecuteCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "execute CMD ARG...",
Short: "execute a new managed process",
Args: cobra.MinimumNArgs(1),
RunE: runProcessClientExecute,
}
procesSignalProxyAddFlag(cmd)
return cmd
}

func runProcessClientExecute(cmd *cobra.Command, args []string) error {
return runProcessClientExecuteWithOutput(cmd.OutOrStdout(), cmd.ErrOrStderr(), cmd, args)
}

func runProcessClientExecuteWithOutput(stdout, stderr io.Writer, cmd *cobra.Command, args []string) error {
addr, err := processAddressFlagValue(cmd)
if err != nil {
return err
}
proxy, err := processSignalProxyFlagValue(cmd)
if err != nil {
return err
}
asJSON := processAsJSONFlagValue(cmd)
cmd.SilenceUsage = true
ctx, canfn := context.WithCancel(cmd.Context())
defer canfn()
p, err := kanx.CreateProcess(ctx, addr, args[0], args[1:])
if err != nil {
return err
}
if asJSON {
buf, err := protojson.Marshal(p)
if err != nil {
return err
}
fmt.Fprintln(stdout, string(buf))
} else {
fmt.Fprintln(stdout, "Process: ", p)
}

pid := p.Pid
errc := make(chan error)
if proxy {
proxySetup(ctx, addr, pid)
}
go func() { errc <- kanx.Stdout(ctx, addr, pid, stdout) }()
go func() { errc <- kanx.Stderr(ctx, addr, pid, stderr) }()
for i := 0; i < 2; i++ {
err0 := <-errc
// workaround bug in errkit
if err0 != nil {
err = errkit.Append(err, err0)
}
}
return err
}
2 changes: 1 addition & 1 deletion pkg/kando/process_client_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func runProcessClientGetWithOutput(out io.Writer, cmd *cobra.Command, args []str
}
fmt.Fprintln(out, string(buf))
} else {
fmt.Fprintln(out, "Process: ", p.String())
fmt.Fprintln(out, "Process: ", p)
}
return nil
}
2 changes: 1 addition & 1 deletion pkg/kando/process_client_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func runProcessClientListWithOutput(out io.Writer, cmd *cobra.Command) error {
}
fmt.Fprintln(out, string(buf))
} else {
fmt.Fprintln(out, "Process: ", p.String())
fmt.Fprintln(out, "Process: ", p)
}
}
return nil
Expand Down
49 changes: 4 additions & 45 deletions pkg/kando/process_client_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,13 @@ package kando

import (
"context"
"fmt"
"io"
"os"
"os/signal"
"strconv"
"syscall"

"github.com/kanisterio/errkit"
"github.com/spf13/cobra"

"github.com/kanisterio/kanister/pkg/kanx"
"github.com/kanisterio/kanister/pkg/log"
)

const (
processSignalProxyFlagName = "signal-proxy"
)

func newProcessClientOutputCommand() *cobra.Command {
Expand All @@ -41,47 +32,14 @@ func newProcessClientOutputCommand() *cobra.Command {
Args: cobra.ExactArgs(1),
RunE: runProcessClientOutput,
}
cmd.PersistentFlags().BoolP(processSignalProxyFlagName, "p", false, "pass signals from client to server")
procesSignalProxyAddFlag(cmd)
return cmd
}

func processSignalProxyFlagValue(cmd *cobra.Command) (bool, error) {
return cmd.Flags().GetBool(processSignalProxyFlagName)
}

func runProcessClientOutput(cmd *cobra.Command, args []string) error {
return runProcessClientOutputWithOutput(cmd.OutOrStdout(), cmd.ErrOrStderr(), cmd, args)
}

func proxySetup(ctx context.Context, addr string, pid int64) {
log.Info().WithContext(ctx).Print(fmt.Sprintf("signal proxy is running for process %d", pid))
signalTermChan := make(chan os.Signal, 1)
signal.Notify(signalTermChan, syscall.SIGINT, syscall.SIGTERM)
go func() {
BREAK:
for {
select {
case sig := <-signalTermChan:
ossig, ok := sig.(syscall.Signal)
if !ok {
log.Info().WithContext(ctx).Print(fmt.Sprintf("signal %v is invalid, ignored for process %d", sig, pid))
continue
}
log.Info().WithContext(ctx).Print(fmt.Sprintf("signal %v received for process %d", sig, pid))
_, err := kanx.SignalProcess(ctx, addr, pid, int64(ossig))
if err != nil {
signal.Reset(ossig)
log.Error().WithContext(ctx).WithError(err).Print(fmt.Sprintf("error on signal %v for process %d", sig, pid))
break BREAK
}
log.Info().WithContext(ctx).Print(fmt.Sprintf("signal %v sent for process %d", sig, pid))
case <-ctx.Done():
break BREAK
}
}
}()
}

func runProcessClientOutputWithOutput(stdout, stderr io.Writer, cmd *cobra.Command, args []string) error {
pid, err := strconv.ParseInt(args[0], 0, 64)
if err != nil {
Expand All @@ -95,20 +53,21 @@ func runProcessClientOutputWithOutput(stdout, stderr io.Writer, cmd *cobra.Comma
if err != nil {
return err
}
cmd.SilenceUsage = true
ctx, canfn := context.WithCancel(cmd.Context())
defer canfn()
errc := make(chan error)
if proxy {
proxySetup(ctx, addr, pid)
}
cmd.SilenceUsage = true
go func() { errc <- kanx.Stdout(ctx, addr, pid, stdout) }()
go func() { errc <- kanx.Stderr(ctx, addr, pid, stderr) }()
for i := 0; i < 2; i++ {
err0 := <-errc
if err0 != nil {
// workaround bug in errkit
err = errkit.Append(err, err0)
}
}
canfn()
return err
}
2 changes: 1 addition & 1 deletion pkg/kando/process_client_signal.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func runProcessClientSignalWithOutput(out io.Writer, cmd *cobra.Command, args []
}
fmt.Fprintln(out, string(buf))
} else {
fmt.Fprintln(out, "Process: ", p.String())
fmt.Fprintln(out, "Process: ", p)
}
return nil
}
5 changes: 5 additions & 0 deletions pkg/kopia/command/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ func bashCommand(args logsafe.Cmd) []string {
return []string{"bash", "-o", "errexit", "-c", args.PlainText()}
}

func kanxCommand(args logsafe.Cmd) []string {
log.Info().Print("Kopia Command", field.M{"Command": args.String()})
return append([]string{"kando", "process", "client", "execute", "--signal-proxy", "--as-json", "--"}, args.StringSliceCMD()...)
}

func stringSliceCommand(args logsafe.Cmd) []string {
log.Info().Print("Kopia Command", field.M{"Command": args.String()})
return args.StringSliceCMD()
Expand Down
15 changes: 12 additions & 3 deletions pkg/kopia/command/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package command

import (
"github.com/kanisterio/kanister/pkg/kopia/cli/args"
"github.com/kanisterio/kanister/pkg/logsafe"
)

type ServerStartCommandArgs struct {
Expand All @@ -35,8 +36,7 @@ type ServerStartCommandArgs struct {
MetricsListenAddress string
}

// ServerStart returns the kopia command for starting the Kopia API Server
func ServerStart(cmdArgs ServerStartCommandArgs) []string {
func commonCommand(cmdArgs ServerStartCommandArgs) logsafe.Cmd {
args := commonArgs(&CommandArgs{ConfigFilePath: cmdArgs.ConfigFilePath, LogDirectory: cmdArgs.LogDirectory})

if cmdArgs.AutoGenerateCert {
Expand Down Expand Up @@ -76,8 +76,17 @@ func ServerStart(cmdArgs ServerStartCommandArgs) []string {
// To start the server and run in the background
args = args.AppendLoggable(redirectToDevNull, runInBackground)
}
return args
}

// ServerStart returns the kopia command for starting the Kopia API Server
func ServerStart(cmdArgs ServerStartCommandArgs) []string {
return bashCommand(commonCommand(cmdArgs))
}

return bashCommand(args)
// ServerStartKanx returns the kopia command for starting the Kopia API Server
func ServerStartKanx(cmdArgs ServerStartCommandArgs) []string {
return kanxCommand(commonCommand(cmdArgs))
}

type ServerRefreshCommandArgs struct {
Expand Down

0 comments on commit 410c1f6

Please sign in to comment.