colexec: clean up some tests a bit
The change in the previous commit ensured that we now close all
closers in tests (whenever the closer registry is specified), so we
can clean up the related logic in the tests. The main idea is that,
rather than relying on the RunTests harness to close the necessary
operators, each test is now responsible for doing so if it might
create a closer.

Release note: None
yuzefovich committed Dec 19, 2024
1 parent 1669fad commit 64a6493
Showing 8 changed files with 27 additions and 41 deletions.
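
For reference, the pattern the affected tests now follow is roughly the
following (a minimal Go sketch assembled from the hunks below; ctx and the
operator-construction scaffolding are assumed from the surrounding tests):

	var closerRegistry colexecargs.CloserRegistry
	// In production, the flow closes all closers on cleanup; in tests, the
	// deferred Close now plays that role.
	defer closerRegistry.Close(ctx)

	// Operators register their closers via the args passed to
	// colexecargs.TestNewColOperator:
	//   CloserRegistry: &closerRegistry,

	// Tests that check for leaked file descriptors close and reset the
	// registry explicitly before inspecting the semaphores:
	closerRegistry.Close(ctx)
	closerRegistry.Reset()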
19 changes: 0 additions & 19 deletions pkg/sql/colexec/colexectestutils/utils.go
@@ -432,21 +432,6 @@ func RunTestsWithOrderedCols(
 				"non-nulls in the input tuples, we expect for all nulls injection to "+
 				"change the output")
 		}
-		closeIfCloser(t, originalOp)
-		closeIfCloser(t, opWithNulls)
 	}
 }
 
-// closeIfCloser is a testing utility function that checks whether op is a
-// colexecop.Closer and closes it if so.
-//
-// RunTests harness needs to do that once it is done with op. In non-test
-// setting, the closing happens at the end of the query execution.
-func closeIfCloser(t *testing.T, op colexecop.Operator) {
-	if c, ok := op.(colexecop.Closer); ok {
-		if err := c.Close(context.Background()); err != nil {
-			t.Fatal(err)
-		}
-	}
-}
-
@@ -543,7 +528,6 @@ func RunTestsWithoutAllNullsInjectionWithErrorHandler(
 				errorHandler(err)
 			}
 		}
-		closeIfCloser(t, op)
 	})
 
 	if !skipVerifySelAndNullsResets {
@@ -571,8 +555,6 @@ func RunTestsWithoutAllNullsInjectionWithErrorHandler(
 			if err != nil {
 				t.Fatal(err)
 			}
-			// We might short-circuit, so defer the closing of the operator.
-			defer closeIfCloser(t, op)
 			op.Init(ctx)
 			// NOTE: this test makes sense only if the operator returns two
 			// non-zero length batches (if not, we short-circuit the test since
@@ -654,7 +636,6 @@ func RunTestsWithoutAllNullsInjectionWithErrorHandler(
 		}); err != nil {
 			errorHandler(err)
 		}
-		closeIfCloser(t, op)
 	}
 }
 
4 changes: 4 additions & 0 deletions pkg/sql/colexec/external_distinct_test.go
@@ -91,6 +91,10 @@ func TestExternalDistinct(t *testing.T) {
 					require.Equal(t, numExpectedClosers, closerRegistry.NumClosers()-numOldClosers)
 					return distinct, err
 				})
+			// Close all closers manually (in production this is done on the
+			// flow cleanup).
+			closerRegistry.Close(ctx)
+			closerRegistry.Reset()
 			for i, sem := range semsToCheck {
 				require.Equal(t, 0, sem.GetCount(), "sem still reports open FDs at index %d", i)
 			}
10 changes: 0 additions & 10 deletions pkg/sql/colexec/external_hash_joiner_test.go
@@ -65,15 +65,6 @@ func TestExternalHashJoiner(t *testing.T) {
 				log.Infof(ctx, "spillForced=%t/numRepartitions=%d/%s/delegateFDAcquisitions=%t",
 					spillForced, numForcedRepartitions, tc.description, delegateFDAcquisitions)
 				var semsToCheck []semaphore.Semaphore
-				oldSkipAllNullsInjection := tc.skipAllNullsInjection
-				if !tc.onExpr.Empty() {
-					// When we have ON expression, there might be other operators (like
-					// selections) on top of the external hash joiner in
-					// diskSpiller.diskBackedOp chain. This will not allow for Close()
-					// call to propagate to the external hash joiner, so we will skip
-					// allNullsInjection test for now.
-					tc.skipAllNullsInjection = true
-				}
 				runHashJoinTestCase(t, tc, rng, func(sources []colexecop.Operator) (colexecop.Operator, error) {
 					numOldClosers := closerRegistry.NumClosers()
 					sem := colexecop.NewTestingSemaphore(colexecop.ExternalHJMinPartitions)
@@ -98,7 +89,6 @@ func TestExternalHashJoiner(t *testing.T) {
 				for i, sem := range semsToCheck {
 					require.Equal(t, 0, sem.GetCount(), "sem still reports open FDs at index %d", i)
 				}
-				tc.skipAllNullsInjection = oldSkipAllNullsInjection
 			}
 		}
 	}
20 changes: 8 additions & 12 deletions pkg/sql/colexec/external_sort_test.go
@@ -102,6 +102,10 @@ func TestExternalSort(t *testing.T) {
 					require.Equal(t, 2, closerRegistry.NumClosers()-numOldClosers)
 					return sorter, err
 				})
+			// Close all closers manually (in production this is done on the
+			// flow cleanup).
+			closerRegistry.Close(ctx)
+			closerRegistry.Reset()
 			for i, sem := range semsToCheck {
 				require.Equal(t, 0, sem.GetCount(), "sem still reports open FDs at index %d", i)
 			}
@@ -167,18 +171,6 @@ func TestExternalSortRandomized(t *testing.T) {
 				delegateFDAcquisition := rng.Float64() < 0.5
 				name := fmt.Sprintf("%s/nCols=%d/nOrderingCols=%d/delegateFDAcquisition=%t/k=%d", namePrefix, nCols, nOrderingCols, delegateFDAcquisition, k)
 				log.Infof(ctx, "%s", name)
-				// Unfortunately, there is currently no better way to check that a
-				// sorter does not have leftover file descriptors other than appending
-				// each semaphore used to this slice on construction. This is because
-				// some tests don't fully drain the input, making intercepting the
-				// sorter.Close() method not a useful option, since it is impossible
-				// to check between an expected case where more than 0 FDs are open
-				// (e.g. in verifySelAndNullResets, where the sorter is not fully
-				// drained so Close must be called explicitly) and an unexpected one.
-				// These cases happen during normal execution when a limit is
-				// satisfied, but flows will call Close explicitly on Cleanup.
-				// TODO(asubiotto): Not implemented yet, currently we rely on the
-				// flow tracking open FDs and releasing any leftovers.
 				var semsToCheck []semaphore.Semaphore
 				tups, expected, ordCols := generateRandomDataForTestSort(rng, nTups, nCols, nOrderingCols, 0 /* matchLen */)
 				if k > 0 {
@@ -200,6 +192,10 @@ func TestExternalSortRandomized(t *testing.T) {
 						&monitorRegistry, &closerRegistry,
 					)
 				})
+				// Close all closers manually (in production this is done on
+				// the flow cleanup).
+				closerRegistry.Close(ctx)
+				closerRegistry.Reset()
 				for i, sem := range semsToCheck {
 					require.Equal(t, 0, sem.GetCount(), "sem still reports open FDs at index %d", i)
 				}
6 changes: 6 additions & 0 deletions pkg/sql/colexec/hashjoiner_test.go
@@ -1015,6 +1015,8 @@ func TestHashJoiner(t *testing.T) {
 	}
 	var monitorRegistry colexecargs.MonitorRegistry
 	defer monitorRegistry.Close(ctx)
+	var closerRegistry colexecargs.CloserRegistry
+	defer closerRegistry.Close(ctx)
 	rng, _ := randutil.NewTestRand()
 
 	for _, tcs := range [][]*joinTestCase{getHJTestCases(), getMJTestCases()} {
@@ -1026,6 +1028,7 @@ func TestHashJoiner(t *testing.T) {
 				Spec: spec,
 				Inputs: colexectestutils.MakeInputs(sources),
 				MonitorRegistry: &monitorRegistry,
+				CloserRegistry: &closerRegistry,
 			}
 			args.TestingKnobs.DiskSpillingDisabled = true
 			result, err := colexecargs.TestNewColOperator(ctx, flowCtx, args)
@@ -1174,6 +1177,8 @@ func TestHashJoinerProjection(t *testing.T) {
 	}
 	var monitorRegistry colexecargs.MonitorRegistry
 	defer monitorRegistry.Close(ctx)
+	var closerRegistry colexecargs.CloserRegistry
+	defer closerRegistry.Close(ctx)
 
 	leftTypes := []*types.T{types.Bool, types.Int, types.Bytes}
 	rightTypes := []*types.T{types.Int, types.Float, types.Decimal}
@@ -1208,6 +1213,7 @@ func TestHashJoinerProjection(t *testing.T) {
 		Spec: spec,
 		Inputs: []colexecargs.OpWithMetaInfo{{Root: leftSource}, {Root: rightSource}},
 		MonitorRegistry: &monitorRegistry,
+		CloserRegistry: &closerRegistry,
 	}
 	args.TestingKnobs.DiskSpillingDisabled = true
 	hjOp, err := colexecargs.TestNewColOperator(ctx, flowCtx, args)
3 changes: 3 additions & 0 deletions pkg/sql/colexec/mergejoiner_test.go
@@ -1669,6 +1669,8 @@ func TestMergeJoiner(t *testing.T) {
 	defer cleanup()
 	var monitorRegistry colexecargs.MonitorRegistry
 	defer monitorRegistry.Close(ctx)
+	var closerRegistry colexecargs.CloserRegistry
+	defer closerRegistry.Close(ctx)
 	for _, tc := range getMJTestCases() {
 		for _, tc := range tc.mutateTypes() {
 			tc.init()
@@ -1702,6 +1704,7 @@ func TestMergeJoiner(t *testing.T) {
 				DiskQueueCfg: queueCfg,
 				FDSemaphore: colexecop.NewTestingSemaphore(mjFDLimit),
 				MonitorRegistry: &monitorRegistry,
+				CloserRegistry: &closerRegistry,
 			}
 			flowCtx.Cfg.TestingKnobs.MemoryLimitBytes = memoryLimit
 			result, err := colexecargs.TestNewColOperator(ctx, flowCtx, args)
3 changes: 3 additions & 0 deletions pkg/sql/colflow/vectorized_flow_space_test.go
@@ -124,6 +124,8 @@ func TestVectorizeAllocatorSpaceError(t *testing.T) {
 	}
 	var monitorRegistry colexecargs.MonitorRegistry
 	defer monitorRegistry.Close(ctx)
+	var closerRegistry colexecargs.CloserRegistry
+	defer closerRegistry.Close(ctx)
 
 	oneInput := []execinfrapb.InputSyncSpec{
 		{ColumnTypes: []*types.T{types.Int}},
@@ -228,6 +230,7 @@ func TestVectorizeAllocatorSpaceError(t *testing.T) {
 		StreamingMemAccount: &acc,
 		FDSemaphore: colexecop.NewTestingSemaphore(256),
 		MonitorRegistry: &monitorRegistry,
+		CloserRegistry: &closerRegistry,
 	}
 	var (
 		result *colexecargs.NewColOperatorResult
3 changes: 3 additions & 0 deletions pkg/sql/sem/eval/eval_test/eval_test.go
@@ -139,6 +139,8 @@ func TestEval(t *testing.T) {
 	rng, _ := randutil.NewTestRand()
 	var monitorRegistry colexecargs.MonitorRegistry
 	defer monitorRegistry.Close(ctx)
+	var closerRegistry colexecargs.CloserRegistry
+	defer closerRegistry.Close(ctx)
 	walk(t, func(t *testing.T, d *datadriven.TestData) string {
 		st := cluster.MakeTestingClusterSettings()
 		flowCtx := &execinfra.FlowCtx{
@@ -191,6 +193,7 @@ func TestEval(t *testing.T) {
 			// the row execution engine.
 			ProcessorConstructor: rowexec.NewProcessor,
 			MonitorRegistry: &monitorRegistry,
+			CloserRegistry: &closerRegistry,
 		}
 		// If the expression is of the boolean type, in 50% cases we'll
 		// additionally run it as a filter (i.e. as a "selection" operator
