fix(storage): fix stream termination in MRD. #11432

Open
wants to merge 11 commits into
base: main
31 changes: 24 additions & 7 deletions storage/grpc_client.go
@@ -1210,6 +1210,7 @@ func (c *grpcStorageClient) NewMultiRangeDownloader(ctx context.Context, params
delete(rr.mp, key)
}
}
+drainInboundReadStream(rr.stream)
rr.mu.Unlock()
return
case currentSpec = <-rr.data:
@@ -1257,7 +1258,9 @@ func (c *grpcStorageClient) NewMultiRangeDownloader(ctx context.Context, params
for {
select {
case <-rr.ctx.Done():
+rr.mu.Lock()
rr.done = true
+rr.mu.Unlock()
return
case <-rr.receiverRetry:
return
@@ -1474,6 +1477,10 @@ func (mr *gRPCBidiReader) wait() {

// Close will notify stream manager goroutine that the reader has been closed, if it's still running.
func (mr *gRPCBidiReader) close() error {
+// Before release of resource we close the client->server connection.
+if err := mr.stream.CloseSend(); err != nil {
+return err
+}
Contributor @BrennaEpp, Jan 11, 2025

Should we also drain the stream after the CloseSend (as we do in drainInboundStream(), receiving from stream until we get a non-nil error) to make sure its resources are released? See grpc/grpc-go@365770f

Contributor Author

Thanks for pointing out the best practice; I've applied it.
stream.Recv() is used here only to detect the terminating error.

Contributor Author

Actually, I am not sure we will get any output here if we call stream.Recv() after CloseSend(). What if all the responses were already consumed by the streamReceiver goroutine?

There are, however, some cases where we close the stream even though requests have been added; I have drained the responses there.

Let me know your thoughts.

Contributor

I think Recv() keeps returning the error (io.EOF or something else) once the stream is done? This should be easy to check with a toy program.

However, multiple concurrent calls to Recv() are not allowed, so if streamReceiver goroutine may be calling Recv(), you have to be careful not to call Recv() elsewhere until streamReceiver is done. I think it's probably easiest for all Recv() calls to live on that one goroutine.

It's maybe easiest to call CloseSend() on the same goroutine which calls Send()? Then you have one goroutine for Send/CloseSend, and another for Recv, and user code can cancel the context then call Close() to trigger a cancellation.

Contributor Author

Thanks, Chris, for the suggestion.
It simplifies the structure too.

Now one goroutine makes all the Recv() calls and the other makes all the Send()/CloseSend() calls.

if mr.cancel != nil {
mr.cancel()
}
@@ -1486,6 +1493,16 @@ func (mr *gRPCBidiReader) close() error {
return nil
}

+// drainInboundReadStream calls stream.Recv() repeatedly until an error is returned.
+// drainInboundReadStream always returns a non-nil error. io.EOF indicates all
+// messages were successfully read.
+func drainInboundReadStream(stream storagepb.Storage_BidiReadObjectClient) (err error) {
+for err == nil {
+_, err = stream.Recv()
+}
+return err
+}

func (mrr *gRPCBidiReader) getHandle() []byte {
return mrr.readHandle
}
@@ -2628,11 +2645,11 @@ func bucketContext(ctx context.Context, bucket string) context.Context {
return gax.InsertMetadataIntoOutgoingContext(ctx, hds...)
}

-// drainInboundStream calls stream.Recv() repeatedly until an error is returned.
+// drainInboundWriteStream calls stream.Recv() repeatedly until an error is returned.
// It returns the last Resource received on the stream, or nil if no Resource
-// was returned. drainInboundStream always returns a non-nil error. io.EOF
+// was returned. drainInboundWriteStream always returns a non-nil error. io.EOF
// indicates all messages were successfully read.
-func drainInboundStream(stream storagepb.Storage_BidiWriteObjectClient) (object *storagepb.Object, err error) {
+func drainInboundWriteStream(stream storagepb.Storage_BidiWriteObjectClient) (object *storagepb.Object, err error) {
for err == nil {
var resp *storagepb.BidiWriteObjectResponse
resp, err = stream.Recv()
@@ -2714,7 +2731,7 @@ func (s *gRPCOneshotBidiWriteBufferSender) sendBuffer(buf []byte, offset int64,

sendErr := s.stream.Send(req)
if sendErr != nil {
-obj, err = drainInboundStream(s.stream)
+obj, err = drainInboundWriteStream(s.stream)
s.stream = nil
if sendErr != io.EOF {
err = sendErr
@@ -2727,7 +2744,7 @@ func (s *gRPCOneshotBidiWriteBufferSender) sendBuffer(buf []byte, offset int64,
s.stream.CloseSend()
// Oneshot uploads only read from the response stream on completion or
// failure
-obj, err = drainInboundStream(s.stream)
+obj, err = drainInboundWriteStream(s.stream)
s.stream = nil
if err == io.EOF {
err = nil
@@ -2842,7 +2859,7 @@ func (s *gRPCResumableBidiWriteBufferSender) sendBuffer(buf []byte, offset int64

sendErr := s.stream.Send(req)
if sendErr != nil {
-obj, err = drainInboundStream(s.stream)
+obj, err = drainInboundWriteStream(s.stream)
s.stream = nil
if err == io.EOF {
// This is unexpected - we got an error on Send(), but not on Recv().
@@ -2854,7 +2871,7 @@ func (s *gRPCResumableBidiWriteBufferSender) sendBuffer(buf []byte, offset int64

if finishWrite {
s.stream.CloseSend()
-obj, err = drainInboundStream(s.stream)
+obj, err = drainInboundWriteStream(s.stream)
s.stream = nil
if err == io.EOF {
err = nil