From 0c4cd45ca892d09a94192a95f36dcde0cad27fd1 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Fri, 13 Dec 2024 15:43:17 -0800 Subject: [PATCH] colexec: fix Reset behavior of hash group joiner Previously, the hash group joiner inherited the reset behavior from `TwoInputInitHelper` that it embeds, which could leave the hash joiner and the hash aggregator encapsulated in the hash group joiner in the non-reset state. As a result, if the external hash group joiner spilled to disk, we could get incorrect results. This is now fixed. There is no explicit test added for this because the change in the following commit (where `ChildCount` on the disk spiller now returns one less) will exercise this code path (via "reusing after reset" part of the RunTests harness). This is also an experimental feature, so there is no release note. Release note: None --- pkg/sql/colexec/hash_group_joiner.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/pkg/sql/colexec/hash_group_joiner.go b/pkg/sql/colexec/hash_group_joiner.go index 04041365efab..d8dcf4cff5ad 100644 --- a/pkg/sql/colexec/hash_group_joiner.go +++ b/pkg/sql/colexec/hash_group_joiner.go @@ -29,6 +29,7 @@ type hashGroupJoiner struct { var _ colexecop.BufferingInMemoryOperator = &hashGroupJoiner{} var _ colexecop.Closer = &hashGroupJoiner{} +var _ colexecop.ResettableOperator = &hashGroupJoiner{} // NewHashGroupJoiner creates a new hash group-join operator. func NewHashGroupJoiner( @@ -144,6 +145,13 @@ func (h *hashGroupJoiner) ReleaseAfterExport(input colexecop.Operator) { h.hjLeftSource.sq = nil } +// Reset implements the colexecop.Resetter interface. +func (h *hashGroupJoiner) Reset(ctx context.Context) { + h.TwoInputInitHelper.Reset(ctx) + h.hj.(colexecop.ResettableOperator).Reset(ctx) + h.ha.Reset(ctx) +} + // Close implements the colexecop.Closer interface. func (h *hashGroupJoiner) Close(ctx context.Context) error { lastErr := h.ha.Close(ctx) @@ -165,6 +173,7 @@ type copyingOperator struct { } var _ colexecop.ClosableOperator = ©ingOperator{} +var _ colexecop.ResettableOperator = ©ingOperator{} func newCopyingOperator( input colexecop.Operator, args *colexecutils.NewSpillingQueueArgs, @@ -183,6 +192,15 @@ func (c *copyingOperator) Next() coldata.Batch { return b } +// Reset implements the colexecop.Resetter interface. +func (c *copyingOperator) Reset(ctx context.Context) { + if r, ok := c.Input.(colexecop.Resetter); ok { + r.Reset(ctx) + } + c.sq.Reset(ctx) + c.zeroBatchEnqueued = false +} + // Close implements the colexecop.Closer interface. func (c *copyingOperator) Close(ctx context.Context) error { if c.sq == nil {