Skip to content

Commit

Permalink
[otelarrowreceiver] New shutdown testing (open-telemetry#34236)
Browse files Browse the repository at this point in the history
**Description:** Several test flakes were identified in
open-telemetry#34179.
This improves the tests and adds one new test. Adds one new
context-canceled return path, fixing a potential goroutine leak covered
by new testing.

**Link to tracking Issue:** Fixes
open-telemetry/otel-arrow#237

**Testing:** 

- For the test change in
`receiver/otelarrowreceiver/internal/arrow/arrow_test.go` note that the
test now allows out-of-order responses, which is explicitly a feature.
Out-of-order responses may have resulted from performance changes in
Arrow v17.
- For the context handling in
`receiver/otelarrowreceiver/internal/arrow/arrow.go` this is needed to
satisfy the HalfOpen test introduced here. A comment in the code
explains how previously gRPC stream resources could leak
- The ` TestOTelArrowShutdown` test improves readability. It had been
using a test helper designed originally for the OTLP (non-streaming)
test, which made it convoluted here. A sync.Once has been eliminated.
- The new test `TestOTelArrowHalfOpenShutdown` exercises the
"adversarial" condition where a client never calls `Recv()` on their
stream handle. The new context handling is required to pass the test w/o
goroutine leaks.

---------

Co-authored-by: Matthew Wear <[email protected]>
  • Loading branch information
jmacd and mwear authored Jul 26, 2024
1 parent 79c0bf1 commit f9cd72a
Show file tree
Hide file tree
Showing 4 changed files with 181 additions and 36 deletions.
27 changes: 27 additions & 0 deletions .chloggen/otelarrow-shutdowntest.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: otelarrowreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix potential goroutine leak when in stream-shutdown.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [34236]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
13 changes: 9 additions & 4 deletions receiver/otelarrowreceiver/internal/arrow/arrow.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,14 +477,19 @@ func (id *inFlightData) consumeDone(ctx context.Context, consumeErrPtr *error) {
id.span.SetStatus(otelcodes.Error, retErr.Error())
}

id.replyToCaller(retErr)
id.replyToCaller(ctx, retErr)
id.anyDone(ctx)
}

func (id *inFlightData) replyToCaller(callerErr error) {
id.pendingCh <- batchResp{
func (id *inFlightData) replyToCaller(ctx context.Context, callerErr error) {
select {
case id.pendingCh <- batchResp{
id: id.batchID,
err: callerErr,
}:
// OK: Responded.
case <-ctx.Done():
// OK: Never responded due to cancelation.
}
}

Expand Down Expand Up @@ -585,7 +590,7 @@ func (r *receiverStream) recvOne(streamCtx context.Context, serverStream anyStre
var authErr error
inflightCtx, authErr = r.authServer.Authenticate(inflightCtx, authHdrs)
if authErr != nil {
flight.replyToCaller(status.Error(codes.Unauthenticated, authErr.Error()))
flight.replyToCaller(inflightCtx, status.Error(codes.Unauthenticated, authErr.Error()))
return nil
}
}
Expand Down
5 changes: 3 additions & 2 deletions receiver/otelarrowreceiver/internal/arrow/arrow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1188,10 +1188,11 @@ func testReceiverAuthHeaders(t *testing.T, includeMeta bool, dataAuth bool) {
nil,
}

var recvBatches []*arrowpb.BatchStatus
recvBatches := make([]*arrowpb.BatchStatus, len(expectData))

ctc.stream.EXPECT().Send(gomock.Any()).Times(len(expectData)).DoAndReturn(func(batch *arrowpb.BatchStatus) error {
recvBatches = append(recvBatches, batch)
require.Nil(t, recvBatches[batch.BatchId])
recvBatches[batch.BatchId] = batch
return nil
})

Expand Down
172 changes: 142 additions & 30 deletions receiver/otelarrowreceiver/otelarrow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"context"
"errors"
"fmt"
"io"
"net"
"strconv"
"sync"
Expand Down Expand Up @@ -316,7 +317,6 @@ func TestOTelArrowShutdown(t *testing.T) {
cfg.GRPC.Keepalive = &configgrpc.KeepaliveServerConfig{
ServerParameters: &configgrpc.KeepaliveServerParameters{},
}
// Note that keepalive parameters are set very high
if !cooperative {
cfg.GRPC.Keepalive.ServerParameters.MaxConnectionAge = time.Second
cfg.GRPC.Keepalive.ServerParameters.MaxConnectionAgeGrace = 5 * time.Second
Expand All @@ -338,36 +338,48 @@ func TestOTelArrowShutdown(t *testing.T) {

conn, err := grpc.NewClient(endpointGrpc, grpc.WithTransportCredentials(insecure.NewCredentials()))
require.NoError(t, err)
defer conn.Close()

doneSignalGrpc := make(chan bool)
defer func() {
require.NoError(t, conn.Close())
}()

client := arrowpb.NewArrowTracesServiceClient(conn)
stream, err := client.ArrowTraces(ctx, grpc.WaitForReady(true))
require.NoError(t, err)
producer := arrowRecord.NewProducer()
defer func() {
require.NoError(t, conn.Close())
}()

start := time.Now()
var once sync.Once

// Send traces to the receiver until we signal via done channel, and then
// send one more trace after that.
go generateTraces(func(td ptrace.Traces) {
if time.Since(start) > 5*time.Second {
once.Do(func() {
if cooperative {
require.NoError(t, stream.CloseSend())
}
})
return

// Send traces to the receiver until we signal.
go func() {
for time.Since(start) < 5*time.Second {
td := testdata.GenerateTraces(1)
batch, batchErr := producer.BatchArrowRecordsFromTraces(td)
require.NoError(t, batchErr)
require.NoError(t, stream.Send(batch))
}
batch, batchErr := producer.BatchArrowRecordsFromTraces(td)
require.NoError(t, batchErr)
require.NoError(t, stream.Send(batch))
}, doneSignalGrpc)

if cooperative {
require.NoError(t, stream.CloseSend())
}
}()

var recvWG sync.WaitGroup
recvWG.Add(1)

// Receive batch responses. See the comment on
// https://pkg.go.dev/google.golang.org/grpc#ClientConn.NewStream
// to explain why this must be done. We do not use the
// return value, this just avoids leaking the stream context,
// which can otherwise hang this test.
go func() {
defer recvWG.Done()
for {
if _, recvErr := stream.Recv(); recvErr == nil {
continue
}
break
}
}()

// Wait until the receiver outputs anything to the sink.
assert.Eventually(t, func() bool {
Expand All @@ -380,18 +392,14 @@ func TestOTelArrowShutdown(t *testing.T) {
err = r.Shutdown(context.Background())
assert.NoError(t, err)

// recvWG ensures the stream has been read before the test exits.
recvWG.Wait()

// Remember how many spans the sink received. This number should not change after this
// point because after Shutdown() returns the component is not allowed to produce
// any more data.
sinkSpanCountAfterShutdown := nextSink.SpanCount()

// Now signal to generateTraces to exit the main generation loop, then send
// one more trace and stop.
doneSignalGrpc <- true

// Wait until all follow up traces are sent.
<-doneSignalGrpc

// The last, additional trace should not be received by sink, so the number of spans in
// the sink should not change.
assert.EqualValues(t, sinkSpanCountAfterShutdown, nextSink.SpanCount())
Expand All @@ -413,6 +421,7 @@ func TestOTelArrowShutdown(t *testing.T) {
}
}

// generateTraces originates from the OTLP receiver "standard" shutdown test.
func generateTraces(senderFn senderFunc, doneSignal chan bool) {
// Continuously generate spans until signaled to stop.
loop:
Expand Down Expand Up @@ -792,3 +801,106 @@ func TestConcurrentArrowReceiver(t *testing.T) {
require.Equal(t, numStreams, counts[i])
}
}

// TestOTelArrowHalfOpenShutdown exercises a known condition in which Shutdown
// can't succeed until the stream is canceled by an external signal.
func TestOTelArrowHalfOpenShutdown(t *testing.T) {
ctx, testCancel := context.WithCancel(context.Background())
defer testCancel()

endpointGrpc := testutil.GetAvailableLocalAddress(t)

nextSink := new(consumertest.TracesSink)

factory := NewFactory()
cfg := factory.CreateDefaultConfig().(*Config)
cfg.GRPC.Keepalive = &configgrpc.KeepaliveServerConfig{
ServerParameters: &configgrpc.KeepaliveServerParameters{},
}
// No keepalive parameters are set
cfg.GRPC.NetAddr.Endpoint = endpointGrpc
set := receivertest.NewNopSettings()

set.ID = testReceiverID
r, err := NewFactory().CreateTracesReceiver(
ctx,
set,
cfg,
nextSink)
require.NoError(t, err)
require.NotNil(t, r)
require.NoError(t, r.Start(context.Background(), componenttest.NewNopHost()))

conn, err := grpc.NewClient(endpointGrpc, grpc.WithTransportCredentials(insecure.NewCredentials()))
require.NoError(t, err)
defer func() {
require.NoError(t, conn.Close())
}()

client := arrowpb.NewArrowTracesServiceClient(conn)
stream, err := client.ArrowTraces(ctx, grpc.WaitForReady(true))
require.NoError(t, err)
producer := arrowRecord.NewProducer()

start := time.Now()

// Send traces to the receiver until we signal.
go func() {
for time.Since(start) < 5*time.Second {
select {
case <-ctx.Done():
return
default:
}
td := testdata.GenerateTraces(1)
batch, batchErr := producer.BatchArrowRecordsFromTraces(td)
require.NoError(t, batchErr)

sendErr := stream.Send(batch)
select {
case <-ctx.Done():
if sendErr != nil {
require.ErrorIs(t, sendErr, io.EOF)
}
return
default:
require.NoError(t, sendErr)
}
}
}()

// Do not receive batch responses.

// Wait until the receiver outputs anything to the sink.
assert.Eventually(t, func() bool {
return nextSink.SpanCount() > 0
}, time.Second, 10*time.Millisecond)

// Let more load pile up.
time.Sleep(time.Second)

// The receiver has wedged itself in a call to Send() that is blocked
// and there is not a graceful way to recover. Schedule an operation
// that will unblock it un-gracefully.
go func() {
// Without this cancel, the test hangs.
time.Sleep(3 * time.Second)
testCancel()
}()

// Now shutdown the receiver, while continuing sending traces to it.
err = r.Shutdown(context.Background())
assert.NoError(t, err)

// Ensure that calls to Recv() get canceled
for {
_, err := stream.Recv()
if err == nil {
continue
}
status, ok := status.FromError(err)
require.True(t, ok, "is a status error")
require.Equal(t, codes.Canceled, status.Code())
break
}
}

0 comments on commit f9cd72a

Please sign in to comment.