128713: sqlstats: add close chan to buffer ingester r=xinhaoz a=dhartunian

The buffer ingester could get stuck during shutdown because it independently evaluated a `running` bool and then tried to write to a channel with a buffer of 1. If that channel had already accumulated a payload before the ingester shut down, the write would block forever.

Instead of using the boolean, the flush operation now selects on a channel that we close on shutdown. This ensures that no matter how many extra flushes are blocked, they will all be cancelled.
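
Below is a minimal sketch of the shutdown pattern this commit describes, using illustrative names (`flusher`, `payloadCh`) rather than the actual ingester types: the flush path selects on both the buffered channel and a close channel, so a send can never block after shutdown, no matter how many flushes are pending.

package main

import (
	"fmt"
	"sync"
)

// payload stands in for the ingester's event buffer.
type payload struct{ n int }

type flusher struct {
	payloadCh chan payload  // buffer of 1, like eventBufferCh
	closeCh   chan struct{} // closed exactly once on shutdown
	closeOnce sync.Once
}

func newFlusher() *flusher {
	return &flusher{
		payloadCh: make(chan payload, 1),
		closeCh:   make(chan struct{}),
	}
}

// flush hands a payload to the consumer. If the consumer has shut down
// (closeCh is closed), the send is abandoned instead of blocking forever.
func (f *flusher) flush(p payload) {
	select {
	case f.payloadCh <- p:
	case <-f.closeCh:
	}
}

// shutdown unblocks every pending and future flush.
func (f *flusher) shutdown() {
	f.closeOnce.Do(func() { close(f.closeCh) })
}

func main() {
	f := newFlusher()
	f.flush(payload{1}) // fills the single-slot buffer
	f.shutdown()        // consumer goes away without draining the buffer
	f.flush(payload{2}) // would block forever under the old boolean check
	fmt.Println("no deadlock")
}
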

Resolves: cockroachdb#128453
Epic: None

Release note: None

Co-authored-by: David Hartunian <[email protected]>
craig[bot] and dhartunian committed Aug 13, 2024 · 2 parents a8b016c + 6e1ead5 · commit 9765a2c
Showing 3 changed files with 96 additions and 62 deletions.
2 changes: 0 additions & 2 deletions pkg/sql/sqlstats/insights/BUILD.bazel
@@ -50,13 +50,11 @@ go_test(
"//pkg/settings/cluster",
"//pkg/sql/appstatspb",
"//pkg/sql/clusterunique",
"//pkg/testutils",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/stop",
"//pkg/util/uint128",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//require",
],
)
18 changes: 10 additions & 8 deletions pkg/sql/sqlstats/insights/ingester.go
@@ -38,8 +38,9 @@ type ConcurrentBufferIngester struct {

eventBufferCh chan eventBufChPayload
registry *lockingRegistry
running uint64
clearRegistry uint32

closeCh chan struct{}
}

type eventBufChPayload struct {
@@ -89,7 +90,6 @@ func (i *ConcurrentBufferIngester) Start(
// This task pulls buffers from the channel and forwards them along to the
// underlying registry.
_ = stopper.RunAsyncTask(ctx, "insights-ingester", func(ctx context.Context) {
atomic.StoreUint64(&i.running, 1)

for {
select {
@@ -100,7 +100,7 @@
}
eventBufferPool.Put(payload.events)
case <-stopper.ShouldQuiesce():
atomic.StoreUint64(&i.running, 0)
close(i.closeCh)
return
}
}
@@ -188,6 +188,7 @@ func newConcurrentBufferIngester(registry *lockingRegistry) *ConcurrentBufferIng
// adjusting our carrying capacity.
eventBufferCh: make(chan eventBufChPayload, 1),
registry: registry,
closeCh: make(chan struct{}),
}

i.guard.eventBuffer = eventBufferPool.Get().(*eventBuffer)
@@ -202,11 +203,12 @@ func newConcurrentBufferIngester(registry *lockingRegistry) *ConcurrentBufferIng
atomic.StoreUint32(&i.clearRegistry, 0)
}()
}
if atomic.LoadUint64(&i.running) == 1 {
i.eventBufferCh <- eventBufChPayload{
clearRegistry: clearRegistry,
events: i.guard.eventBuffer,
}
select {
case i.eventBufferCh <- eventBufChPayload{
clearRegistry: clearRegistry,
events: i.guard.eventBuffer,
}:
case <-i.closeCh:
}
i.guard.eventBuffer = eventBufferPool.Get().(*eventBuffer)
},
138 changes: 86 additions & 52 deletions pkg/sql/sqlstats/insights/ingester_test.go
@@ -12,19 +12,17 @@ package insights

import (
"context"
"sync/atomic"
"sync"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/clusterunique"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/uint128"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

@@ -127,56 +125,53 @@ func TestIngester_Clear(t *testing.T) {
defer log.Scope(t).Close(t)
settings := cluster.MakeTestingClusterSettings()

t.Run("clears buffer", func(t *testing.T) {
ctx := context.Background()
stopper := stop.NewStopper()
defer stopper.Stop(ctx)
store := newStore(settings)
ingester := newConcurrentBufferIngester(
newRegistry(settings, &fakeDetector{
stubEnabled: true,
stubIsSlow: true,
}, store))
// Start the ingester and wait for its async task to start.
// Disable timed flushes, so we can guarantee the presence of data
// in the buffer later on.
ingester.Start(ctx, stopper, WithoutTimedFlush())
testutils.SucceedsSoon(t, func() error {
if !(atomic.LoadUint64(&ingester.running) == 1) {
return errors.New("ingester not yet started")
}
return nil
})
// Fill the ingester's buffer with some data. This sets us up to
// call Clear() with guaranteed data in the buffer, so we can assert
// afterward that it's been cleared.
ingesterObservations := []testEvent{
{sessionID: 1, statementID: 10},
{sessionID: 2, statementID: 20},
{sessionID: 1, statementID: 11},
{sessionID: 2, statementID: 21},
{sessionID: 1, transactionID: 100},
{sessionID: 2, transactionID: 200},
}
for _, o := range ingesterObservations {
if o.statementID != 0 {
ingester.ObserveStatement(o.SessionID(), &Statement{ID: o.StatementID()})
} else {
ingester.ObserveTransaction(o.SessionID(), &Transaction{ID: o.TransactionID()})
}
ctx := context.Background()
stopper := stop.NewStopper()
defer stopper.Stop(ctx)

store := newStore(settings)
ingester := newConcurrentBufferIngester(
newRegistry(settings, &fakeDetector{
stubEnabled: true,
stubIsSlow: true,
}, store))

// Fill the ingester's buffer with some data. This sets us up to
// call Clear() with guaranteed data in the buffer, so we can assert
// afterward that it's been cleared.
ingesterObservations := []testEvent{
{sessionID: 1, statementID: 10},
{sessionID: 2, statementID: 20},
{sessionID: 1, statementID: 11},
{sessionID: 2, statementID: 21},
{sessionID: 1, transactionID: 100},
{sessionID: 2, transactionID: 200},
}
for _, o := range ingesterObservations {
if o.statementID != 0 {
ingester.ObserveStatement(o.SessionID(), &Statement{ID: o.StatementID()})
} else {
ingester.ObserveTransaction(o.SessionID(), &Transaction{ID: o.TransactionID()})
}
empty := event{}
require.NotEqual(t, empty, ingester.guard.eventBuffer[0])
// Now, call Clear() to verify it clears the buffer.
// Use SucceedsSoon for assertions, since the operation is async.
ingester.Clear()
testutils.SucceedsSoon(t, func() error {
if ingester.guard.eventBuffer[0] != empty {
return errors.New("eventBuffer not empty")
}
return nil
})
})
}
empty := event{}
require.Empty(t, ingester.eventBufferCh)
require.NotEqual(t, empty, ingester.guard.eventBuffer[0])
// Now, call Clear() to verify it clears the buffer. This operation
// is synchronous here because `ingester.eventBufferCh` has a buffer
// of 1 so the `Clear` operation can write to it without requiring a
// corresponding insights ingester task running. We just check to
// make sure `Clear` results in something getting posted to the
// channel.
ingester.Clear()
require.Equal(t, empty, ingester.guard.eventBuffer[0])
require.NotEmpty(t, ingester.eventBufferCh)
recv := <-ingester.eventBufferCh
for i := range ingesterObservations {
require.NotEqual(t, empty, recv.events[i])
}
// events 0-5 contain the payloads above, rest are empty
require.Equal(t, empty, recv.events[6])
}

func TestIngester_Disabled(t *testing.T) {
Expand All @@ -193,6 +188,8 @@ func TestIngester_Disabled(t *testing.T) {
}

func TestIngester_DoesNotBlockWhenReceivingManyObservationsAfterShutdown(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
// We have seen some tests hanging in CI, implicating this ingester in
// their goroutine dumps. We reproduce what we think is happening here,
// observing high volumes of SQL traffic after our consumer has shut down.
@@ -246,3 +243,40 @@ func (s testEvent) TransactionID() uuid.UUID {
func (s testEvent) StatementID() clusterunique.ID {
return clusterunique.ID{Uint128: uint128.FromInts(0, s.statementID)}
}

// We had an issue with the insights ingester flush task being blocked
// forever on shutdown. This was because of a bug where the order of
// operations during stopper quiescence could cause `ForceSync()` to be
// triggered twice without an intervening ingest operation. The second
// `ForceSync()` would block forever because the buffer channel has a
// capacity of 1.
func TestIngesterBlockedForceSync(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
stopper := stop.NewStopper()
defer stopper.Stop(ctx)

st := cluster.MakeTestingClusterSettings()
registry := newRegistry(st, &fakeDetector{stubEnabled: true}, newStore(st))
ingester := newConcurrentBufferIngester(registry)

// We queue up a bunch of sync operations because it's unclear how
// many will proceed between the `Start()` and `Stop()` calls below.
ingester.guard.ForceSync()

wg := sync.WaitGroup{}
for i := 0; i < 3; i++ {
wg.Add(1)
go func() {
defer wg.Done()
ingester.guard.ForceSync()
}()
}

ingester.Start(ctx, stopper, WithoutTimedFlush())
stopper.Stop(ctx)
<-stopper.IsStopped()
wg.Wait()
}
