Skip to content

Commit

Permalink
feat: I: proxy signals to subordinate processes ... (#3335)
Browse files Browse the repository at this point in the history
Signed-off-by: Aaron Alpar <[email protected]>
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
aaron-kasten and mergify[bot] authored Jan 18, 2025
1 parent 01c766c commit c1f47e3
Showing 1 changed file with 67 additions and 4 deletions.
71 changes: 67 additions & 4 deletions pkg/kando/process_client_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,23 @@
package kando

import (
"context"
"fmt"
"io"
"os"
"os/signal"
"strconv"
"syscall"

"github.com/kanisterio/errkit"
"github.com/spf13/cobra"

"github.com/kanisterio/kanister/pkg/kanx"
"github.com/kanisterio/kanister/pkg/log"
)

const (
processSignalProxyFlagName = "signal-proxy"
)

func newProcessClientOutputCommand() *cobra.Command {
Expand All @@ -30,22 +41,74 @@ func newProcessClientOutputCommand() *cobra.Command {
Args: cobra.ExactArgs(1),
RunE: runProcessClientOutput,
}
cmd.PersistentFlags().BoolP(processSignalProxyFlagName, "p", false, "pass signals from client to server")
return cmd
}

func processSignalProxyFlagValue(cmd *cobra.Command) (bool, error) {
return cmd.Flags().GetBool(processSignalProxyFlagName)
}

func runProcessClientOutput(cmd *cobra.Command, args []string) error {
return runProcessClientOutputWithOutput(cmd.OutOrStdout(), cmd, args)
return runProcessClientOutputWithOutput(cmd.OutOrStdout(), cmd.ErrOrStderr(), cmd, args)
}

func proxySetup(ctx context.Context, addr string, pid int64) {
log.Info().WithContext(ctx).Print(fmt.Sprintf("signal proxy is running for process %d", pid))
signalTermChan := make(chan os.Signal, 1)
signal.Notify(signalTermChan, syscall.SIGINT, syscall.SIGTERM)
go func() {
BREAK:
for {
select {
case sig := <-signalTermChan:
ossig, ok := sig.(syscall.Signal)
if !ok {
log.Info().WithContext(ctx).Print(fmt.Sprintf("signal %v is invalid, ignored for process %d", sig, pid))
continue
}
log.Info().WithContext(ctx).Print(fmt.Sprintf("signal %v received for process %d", sig, pid))
_, err := kanx.SignalProcess(ctx, addr, pid, int64(ossig))
if err != nil {
signal.Reset(ossig)
log.Error().WithContext(ctx).WithError(err).Print(fmt.Sprintf("error on signal %v for process %d", sig, pid))
break BREAK
}
log.Info().WithContext(ctx).Print(fmt.Sprintf("signal %v sent for process %d", sig, pid))
case <-ctx.Done():
break BREAK
}
}
}()
}

func runProcessClientOutputWithOutput(out io.Writer, cmd *cobra.Command, args []string) error {
pid, err := strconv.Atoi(args[0])
func runProcessClientOutputWithOutput(stdout, stderr io.Writer, cmd *cobra.Command, args []string) error {
pid, err := strconv.ParseInt(args[0], 0, 64)
if err != nil {
return err
}
addr, err := processAddressFlagValue(cmd)
if err != nil {
return err
}
proxy, err := processSignalProxyFlagValue(cmd)
if err != nil {
return err
}
ctx, canfn := context.WithCancel(cmd.Context())
errc := make(chan error)
if proxy {
proxySetup(ctx, addr, pid)
}
cmd.SilenceUsage = true
return kanx.Stdout(cmd.Context(), addr, int64(pid), out)
go func() { errc <- kanx.Stdout(ctx, addr, pid, stdout) }()
go func() { errc <- kanx.Stderr(ctx, addr, pid, stderr) }()
for i := 0; i < 2; i++ {
err0 := <-errc
if err0 != nil {
err = errkit.Append(err, err0)
}
}
canfn()
return err
}

0 comments on commit c1f47e3

Please sign in to comment.