diff --git a/pkg/sql/sqlstats/insights/BUILD.bazel b/pkg/sql/sqlstats/insights/BUILD.bazel index 26cfaf070618..fe161b45623b 100644 --- a/pkg/sql/sqlstats/insights/BUILD.bazel +++ b/pkg/sql/sqlstats/insights/BUILD.bazel @@ -50,13 +50,11 @@ go_test( "//pkg/settings/cluster", "//pkg/sql/appstatspb", "//pkg/sql/clusterunique", - "//pkg/testutils", "//pkg/util/leaktest", "//pkg/util/log", "//pkg/util/stop", "//pkg/util/uint128", "//pkg/util/uuid", - "@com_github_cockroachdb_errors//:errors", "@com_github_stretchr_testify//require", ], ) diff --git a/pkg/sql/sqlstats/insights/ingester.go b/pkg/sql/sqlstats/insights/ingester.go index 7352d7f3dbe5..64c342c10e18 100644 --- a/pkg/sql/sqlstats/insights/ingester.go +++ b/pkg/sql/sqlstats/insights/ingester.go @@ -38,8 +38,9 @@ type ConcurrentBufferIngester struct { eventBufferCh chan eventBufChPayload registry *lockingRegistry - running uint64 clearRegistry uint32 + + closeCh chan struct{} } type eventBufChPayload struct { @@ -89,7 +90,6 @@ func (i *ConcurrentBufferIngester) Start( // This task pulls buffers from the channel and forwards them along to the // underlying registry. _ = stopper.RunAsyncTask(ctx, "insights-ingester", func(ctx context.Context) { - atomic.StoreUint64(&i.running, 1) for { select { @@ -100,7 +100,7 @@ func (i *ConcurrentBufferIngester) Start( } eventBufferPool.Put(payload.events) case <-stopper.ShouldQuiesce(): - atomic.StoreUint64(&i.running, 0) + close(i.closeCh) return } } @@ -188,6 +188,7 @@ func newConcurrentBufferIngester(registry *lockingRegistry) *ConcurrentBufferIng // adjusting our carrying capacity. eventBufferCh: make(chan eventBufChPayload, 1), registry: registry, + closeCh: make(chan struct{}), } i.guard.eventBuffer = eventBufferPool.Get().(*eventBuffer) @@ -202,11 +203,12 @@ func newConcurrentBufferIngester(registry *lockingRegistry) *ConcurrentBufferIng atomic.StoreUint32(&i.clearRegistry, 0) }() } - if atomic.LoadUint64(&i.running) == 1 { - i.eventBufferCh <- eventBufChPayload{ - clearRegistry: clearRegistry, - events: i.guard.eventBuffer, - } + select { + case i.eventBufferCh <- eventBufChPayload{ + clearRegistry: clearRegistry, + events: i.guard.eventBuffer, + }: + case <-i.closeCh: } i.guard.eventBuffer = eventBufferPool.Get().(*eventBuffer) }, diff --git a/pkg/sql/sqlstats/insights/ingester_test.go b/pkg/sql/sqlstats/insights/ingester_test.go index 726923f07878..ec36f28b4480 100644 --- a/pkg/sql/sqlstats/insights/ingester_test.go +++ b/pkg/sql/sqlstats/insights/ingester_test.go @@ -12,19 +12,17 @@ package insights import ( "context" - "sync/atomic" + "sync" "testing" "time" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/clusterunique" - "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/uint128" "github.com/cockroachdb/cockroach/pkg/util/uuid" - "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" ) @@ -127,56 +125,53 @@ func TestIngester_Clear(t *testing.T) { defer log.Scope(t).Close(t) settings := cluster.MakeTestingClusterSettings() - t.Run("clears buffer", func(t *testing.T) { - ctx := context.Background() - stopper := stop.NewStopper() - defer stopper.Stop(ctx) - store := newStore(settings) - ingester := newConcurrentBufferIngester( - newRegistry(settings, &fakeDetector{ - stubEnabled: true, - stubIsSlow: true, - }, store)) - // Start the ingester 
and wait for its async task to start. - // Disable timed flushes, so we can guarantee the presence of data - // in the buffer later on. - ingester.Start(ctx, stopper, WithoutTimedFlush()) - testutils.SucceedsSoon(t, func() error { - if !(atomic.LoadUint64(&ingester.running) == 1) { - return errors.New("ingester not yet started") - } - return nil - }) - // Fill the ingester's buffer with some data. This sets us up to - // call Clear() with guaranteed data in the buffer, so we can assert - // afterward that it's been cleared. - ingesterObservations := []testEvent{ - {sessionID: 1, statementID: 10}, - {sessionID: 2, statementID: 20}, - {sessionID: 1, statementID: 11}, - {sessionID: 2, statementID: 21}, - {sessionID: 1, transactionID: 100}, - {sessionID: 2, transactionID: 200}, - } - for _, o := range ingesterObservations { - if o.statementID != 0 { - ingester.ObserveStatement(o.SessionID(), &Statement{ID: o.StatementID()}) - } else { - ingester.ObserveTransaction(o.SessionID(), &Transaction{ID: o.TransactionID()}) - } + ctx := context.Background() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + + store := newStore(settings) + ingester := newConcurrentBufferIngester( + newRegistry(settings, &fakeDetector{ + stubEnabled: true, + stubIsSlow: true, + }, store)) + + // Fill the ingester's buffer with some data. This sets us up to + // call Clear() with guaranteed data in the buffer, so we can assert + // afterward that it's been cleared. + ingesterObservations := []testEvent{ + {sessionID: 1, statementID: 10}, + {sessionID: 2, statementID: 20}, + {sessionID: 1, statementID: 11}, + {sessionID: 2, statementID: 21}, + {sessionID: 1, transactionID: 100}, + {sessionID: 2, transactionID: 200}, + } + for _, o := range ingesterObservations { + if o.statementID != 0 { + ingester.ObserveStatement(o.SessionID(), &Statement{ID: o.StatementID()}) + } else { + ingester.ObserveTransaction(o.SessionID(), &Transaction{ID: o.TransactionID()}) } - empty := event{} - require.NotEqual(t, empty, ingester.guard.eventBuffer[0]) - // Now, call Clear() to verify it clears the buffer. - // Use SucceedsSoon for assertions, since the operation is async. - ingester.Clear() - testutils.SucceedsSoon(t, func() error { - if ingester.guard.eventBuffer[0] != empty { - return errors.New("eventBuffer not empty") - } - return nil - }) - }) + } + empty := event{} + require.Empty(t, ingester.eventBufferCh) + require.NotEqual(t, empty, ingester.guard.eventBuffer[0]) + // Now, call Clear() to verify it clears the buffer. This operation + // is synchronous here because `ingester.eventBufferCh` has a buffer + // of 1 so the `Clear` operation can write to it without requiring a + // corresponding insights ingester task running. We just check to + // make sure `Clear` results in something getting posted to the + // channel. + ingester.Clear() + require.Equal(t, empty, ingester.guard.eventBuffer[0]) + require.NotEmpty(t, ingester.eventBufferCh) + recv := <-ingester.eventBufferCh + for i := range ingesterObservations { + require.NotEqual(t, empty, recv.events[i]) + } + // events 0-5 contain the payloads above, rest are empty + require.Equal(t, empty, recv.events[6]) } func TestIngester_Disabled(t *testing.T) { @@ -193,6 +188,8 @@ func TestIngester_Disabled(t *testing.T) { } func TestIngester_DoesNotBlockWhenReceivingManyObservationsAfterShutdown(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) // We have seen some tests hanging in CI, implicating this ingester in // their goroutine dumps. 
We reproduce what we think is happening here, // observing high volumes of SQL traffic after our consumer has shut down. @@ -246,3 +243,40 @@ func (s testEvent) TransactionID() uuid.UUID { func (s testEvent) StatementID() clusterunique.ID { return clusterunique.ID{Uint128: uint128.FromInts(0, s.statementID)} } + +// We had an issue with the insights ingester flush task being blocked +// forever on shutdown. This was because of a bug where the order of +// operations during stopper quiescence could cause `ForceSync()` to be +// triggered twice without an intervening ingest operation. The second +// `ForceSync()` would block forever because the buffer channel has a +// capacity of 1. +func TestIngesterBlockedForceSync(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + + st := cluster.MakeTestingClusterSettings() + registry := newRegistry(st, &fakeDetector{stubEnabled: true}, newStore(st)) + ingester := newConcurrentBufferIngester(registry) + + // We queue up a bunch of sync operations because it's unclear how + // many will proceed between the `Start()` and `Stop()` calls below. + ingester.guard.ForceSync() + + wg := sync.WaitGroup{} + for i := 0; i < 3; i++ { + wg.Add(1) + go func() { + defer wg.Done() + ingester.guard.ForceSync() + }() + } + + ingester.Start(ctx, stopper, WithoutTimedFlush()) + stopper.Stop(ctx) + <-stopper.IsStopped() + wg.Wait() +}
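
For reference, below is a small standalone Go sketch, not part of the patch, of the shutdown-safe send pattern that the `ForceSync` change above adopts and that `TestIngesterBlockedForceSync` exercises. The `producer`, `forceSync`, and `dataCh` names are illustrative stand-ins; only `closeCh` mirrors the actual field added in ingester.go. The idea is that the writer either hands its buffer to a channel of capacity 1 or observes a closed shutdown channel, so it can never block forever once the consumer goroutine has quiesced.

package main

import "fmt"

// producer mirrors the shape of ConcurrentBufferIngester for illustration:
// a buffered hand-off channel plus a channel that is closed on shutdown.
type producer struct {
	dataCh  chan []int    // capacity 1, standing in for eventBufferCh
	closeCh chan struct{} // closed once the consumer goroutine exits
}

// forceSync either parks the batch in dataCh or gives up because the
// consumer has shut down; it can never block indefinitely.
func (p *producer) forceSync(batch []int) {
	select {
	case p.dataCh <- batch:
		// Hand-off succeeded (or landed in the channel's single buffer slot).
	case <-p.closeCh:
		// Consumer is gone; drop the batch rather than blocking forever.
	}
}

func main() {
	p := &producer{
		dataCh:  make(chan []int, 1),
		closeCh: make(chan struct{}),
	}

	// First sync fills the buffer slot even though no consumer is running.
	p.forceSync([]int{1, 2, 3})

	// Simulate quiescence: the consumer closes closeCh and never drains dataCh.
	close(p.closeCh)

	// With a bare channel send (or a stale "running" flag check followed by a
	// send), this second sync would hang; the select returns immediately.
	p.forceSync([]int{4, 5, 6})
	fmt.Println("both syncs returned without blocking")
}

Closing a channel, unlike flipping an atomic flag, is observed by every pending and future select, which is why the second sync in this scenario returns instead of hanging in the way the removed `running uint64` check allowed.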