From e1f65d47d153950bfb22f3d998d485c7290571e6 Mon Sep 17 00:00:00 2001 From: Adam Mihelcsik Date: Wed, 2 Feb 2022 13:36:07 +0000 Subject: [PATCH 1/2] set timeout on SendMsg - this helps to avoid socket leaks when channel is full --- pkg/server/backend_manager.go | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/pkg/server/backend_manager.go b/pkg/server/backend_manager.go index 6fd92026b..75d2a46fe 100644 --- a/pkg/server/backend_manager.go +++ b/pkg/server/backend_manager.go @@ -24,6 +24,8 @@ import ( "sync" "time" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "k8s.io/klog/v2" client "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/proto/client" pkgagent "sigs.k8s.io/apiserver-network-proxy/pkg/agent" @@ -85,7 +87,25 @@ type backend struct { func (b *backend) Send(p *client.Packet) error { b.mu.Lock() defer b.mu.Unlock() - return b.conn.Send(p) + errChan := make(chan error, 1) + go func() { + err := b.conn.Send(p) + errChan <- err + close(errChan) + if err != nil { + klog.ErrorS(err, "SendMsg has exited abnormally") + } + }() + t := time.NewTimer(10 * time.Second) + select { + case <-t.C: + return status.Errorf(codes.DeadlineExceeded, "conn.Send has timed out") + case err := <-errChan: + if !t.Stop() { + <-t.C + } + return err + } } func (b *backend) Context() context.Context { From 963084eec285c6c8ea99a8ba924ca14005d242d3 Mon Sep 17 00:00:00 2001 From: Adam Mihelcsik Date: Wed, 23 Feb 2022 18:43:18 +0000 Subject: [PATCH 2/2] - add reference why the wrapper is needed - simplify the anonymous func() --- pkg/server/backend_manager.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/server/backend_manager.go b/pkg/server/backend_manager.go index 75d2a46fe..b6c6d4735 100644 --- a/pkg/server/backend_manager.go +++ b/pkg/server/backend_manager.go @@ -26,6 +26,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "k8s.io/klog/v2" client "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/proto/client" pkgagent "sigs.k8s.io/apiserver-network-proxy/pkg/agent" @@ -87,14 +88,13 @@ type backend struct { func (b *backend) Send(p *client.Packet) error { b.mu.Lock() defer b.mu.Unlock() + // https://github.com/grpc/grpc-go/issues/1229 + // wrap a timer for around SendMsg to avoid blocking grpc call + // (e.g. stream is full) errChan := make(chan error, 1) go func() { - err := b.conn.Send(p) - errChan <- err + errChan <- b.conn.Send(p) close(errChan) - if err != nil { - klog.ErrorS(err, "SendMsg has exited abnormally") - } }() t := time.NewTimer(10 * time.Second) select {