diff --git a/pkg/kando/process_client_signal.go b/pkg/kando/process_client_signal.go index 8fdddd8fc9..da727c4c1b 100644 --- a/pkg/kando/process_client_signal.go +++ b/pkg/kando/process_client_signal.go @@ -40,11 +40,11 @@ func runProcessClientSignal(cmd *cobra.Command, args []string) error { } func runProcessClientSignalWithOutput(out io.Writer, cmd *cobra.Command, args []string) error { - pid, err := strconv.Atoi(args[0]) + pid, err := strconv.ParseInt(args[0], 0, 64) if err != nil { return err } - signal, err := strconv.Atoi(args[1]) + signal, err := strconv.ParseInt(args[1], 0, 64) if err != nil { return err } @@ -54,7 +54,7 @@ func runProcessClientSignalWithOutput(out io.Writer, cmd *cobra.Command, args [] } asJSON := processAsJSONFlagValue(cmd) cmd.SilenceUsage = true - p, err := kanx.SignalProcess(cmd.Context(), addr, int64(pid), int32(signal)) + p, err := kanx.SignalProcess(cmd.Context(), addr, pid, signal) if err != nil { return err } diff --git a/pkg/kanx/client.go b/pkg/kanx/client.go index 1b688cada4..5deb965e34 100644 --- a/pkg/kanx/client.go +++ b/pkg/kanx/client.go @@ -80,7 +80,7 @@ func ListProcesses(ctx context.Context, addr string) ([]*Process, error) { } } -func SignalProcess(ctx context.Context, addr string, pid int64, signal int32) (*Process, error) { +func SignalProcess(ctx context.Context, addr string, pid int64, signal int64) (*Process, error) { conn, err := newGRPCConnection(addr) if err != nil { return nil, err diff --git a/pkg/kanx/kanx.pb.go b/pkg/kanx/kanx.pb.go index be2ee97cbc..4c6458187d 100644 --- a/pkg/kanx/kanx.pb.go +++ b/pkg/kanx/kanx.pb.go @@ -207,7 +207,7 @@ func (x *ProcessPidRequest) GetPid() int64 { type SignalProcessRequest struct { state protoimpl.MessageState `protogen:"open.v1"` Pid int64 `protobuf:"varint,1,opt,name=pid,proto3" json:"pid,omitempty"` - Signal int32 `protobuf:"varint,2,opt,name=signal,proto3" json:"signal,omitempty"` + Signal int64 `protobuf:"varint,2,opt,name=signal,proto3" json:"signal,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -249,7 +249,7 @@ func (x *SignalProcessRequest) GetPid() int64 { return 0 } -func (x *SignalProcessRequest) GetSignal() int32 { +func (x *SignalProcessRequest) GetSignal() int64 { if x != nil { return x.Signal } @@ -384,7 +384,7 @@ var file_pkg_kanx_kanx_proto_rawDesc = []byte{ 0x67, 0x6e, 0x61, 0x6c, 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x70, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x03, 0x70, 0x69, 0x64, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x18, 0x02, - 0x20, 0x01, 0x28, 0x05, 0x52, 0x06, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x22, 0x7b, 0x0a, 0x07, + 0x20, 0x01, 0x28, 0x03, 0x52, 0x06, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x22, 0x7b, 0x0a, 0x07, 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x12, 0x10, 0x0a, 0x03, 0x70, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x03, 0x70, 0x69, 0x64, 0x12, 0x28, 0x0a, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x12, 0x2e, 0x6b, 0x61, 0x6e, 0x78, 0x2e, diff --git a/pkg/kanx/kanx.proto b/pkg/kanx/kanx.proto index f3c0035445..072a45a689 100644 --- a/pkg/kanx/kanx.proto +++ b/pkg/kanx/kanx.proto @@ -27,7 +27,7 @@ message ProcessPidRequest { message SignalProcessRequest { int64 pid = 1; - int32 signal = 2; + int64 signal = 2; } message Process { diff --git a/pkg/kanx/kanx_test.go b/pkg/kanx/kanx_test.go index 942fd5e215..7d89f39ad3 100644 --- a/pkg/kanx/kanx_test.go +++ b/pkg/kanx/kanx_test.go @@ -210,7 +210,7 @@ func (s *KanXSuite) TestSignalProcess_Int(c *C) { c.Assert(err, IsNil) // test SignalProcess, SIGINT - p0, err := SignalProcess(ctx, addr, p.GetPid(), int32(syscall.SIGINT)) + p0, err := SignalProcess(ctx, addr, p.GetPid(), int64(syscall.SIGINT)) c.Assert(err, IsNil) c.Assert(p0.GetPid(), Equals, p.GetPid()) c.Assert(p0.GetState(), Equals, ProcessState_PROCESS_STATE_RUNNING) @@ -252,7 +252,7 @@ func (s *KanXSuite) TestSignalProcess_Stp(c *C) { c.Assert(err, IsNil) // test SignalProcess, SIGSTOP - p0, err := SignalProcess(ctx, addr, p.GetPid(), int32(syscall.SIGSTOP)) + p0, err := SignalProcess(ctx, addr, p.GetPid(), int64(syscall.SIGSTOP)) c.Assert(err, IsNil) c.Assert(p0.GetPid(), Equals, p.GetPid()) c.Assert(p0.GetState(), Equals, ProcessState_PROCESS_STATE_RUNNING) @@ -260,7 +260,7 @@ func (s *KanXSuite) TestSignalProcess_Stp(c *C) { c.Assert(p0.GetExitCode(), Equals, int64(0)) // test SignalProcess, SIGCONT - p0, err = SignalProcess(ctx, addr, p.GetPid(), int32(syscall.SIGCONT)) + p0, err = SignalProcess(ctx, addr, p.GetPid(), int64(syscall.SIGCONT)) c.Assert(err, IsNil) c.Assert(p0.GetPid(), Equals, p.GetPid()) c.Assert(p0.GetState(), Equals, ProcessState_PROCESS_STATE_RUNNING) @@ -301,7 +301,7 @@ func (s *KanXSuite) TestSignalProcess_Kill(c *C) { c.Assert(err, IsNil) // test SignalProcess, SIGKILL - p0, err := SignalProcess(ctx, addr, p.GetPid(), int32(syscall.SIGKILL)) + p0, err := SignalProcess(ctx, addr, p.GetPid(), int64(syscall.SIGKILL)) c.Assert(err, IsNil) c.Assert(p0.GetPid(), Equals, p.GetPid()) c.Assert(p0.GetState(), Equals, ProcessState_PROCESS_STATE_RUNNING) diff --git a/pkg/kanx/server.go b/pkg/kanx/server.go index 9c2d13c1d1..10b9633ac1 100644 --- a/pkg/kanx/server.go +++ b/pkg/kanx/server.go @@ -34,6 +34,8 @@ type processServiceServer struct { } type process struct { + // many reads on process data and only a write on process exit - use RWMutex. + // minimal risk of reads blocking writes. mu *sync.RWMutex cmd *exec.Cmd doneCh chan struct{} @@ -90,14 +92,21 @@ func (s *processServiceServer) CreateProcess(_ context.Context, cpr *CreateProce // one goroutine in server per forked process. link between pid and output files will be lost // if &process structure is lost. go func() { - // wait until process is finished + // wait until process is finished. do not use lock as there may be readers or writers + // on p and cmd is not expected to change (the state in cmd is system managed) err := p.cmd.Wait() // possible readers concurrent to write: lock the p structure for exit status update. + // keep the lock period as short as possible. remove the possibility of blocking + // on log writes by moving them outside the region of the lock. + // go doesn't have lock promotion, so there's a small gap here from when Wait finishes + // until acquiring a write lock. p.mu.Lock() p.err = err if exiterr, ok := err.(*exec.ExitError); ok { p.exitCode = exiterr.ExitCode() } + // no action will be taken on close errors, so just save the errors for logging + // later stdoutErr := stdoutfd.Close() stderrErr := stderrfd.Close() can() @@ -145,12 +154,14 @@ func (s *processServiceServer) SignalProcess(ctx context.Context, grp *SignalPro } // low level signal call syssig := syscall.Signal(grp.Signal) + p.mu.Lock() + defer p.mu.Unlock() err := p.cmd.Process.Signal(syssig) if err != nil { // `fault` tracks IPC errors p.fault = err } - return processToProtoWithLock(p), err + return processToProto(p), err } func (s *processServiceServer) ListProcesses(lpr *ListProcessesRequest, lps ProcessService_ListProcessesServer) error {