Skip to content

Commit

Permalink
colexec: fix Reset behavior of hash group joiner
Browse files Browse the repository at this point in the history
Previously, the hash group joiner inherited the reset behavior from
`TwoInputInitHelper` that it embeds, which could leave the hash joiner
and the hash aggregator encapsulated in the hash group joiner in the
non-reset state. As a result, if the external hash group joiner spilled
to disk, we could get incorrect results. This is now fixed. There is no
explicit test added for this because the change in the following commit
(where `ChildCount` on the disk spiller now returns one less) will
exercise this code path (via "reusing after reset" part of the RunTests
harness). This is also an experimental feature, so there is no release
note.

Release note: None
  • Loading branch information
yuzefovich committed Dec 19, 2024
1 parent 64a6493 commit 0c4cd45
Showing 1 changed file with 18 additions and 0 deletions.
18 changes: 18 additions & 0 deletions pkg/sql/colexec/hash_group_joiner.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type hashGroupJoiner struct {

var _ colexecop.BufferingInMemoryOperator = &hashGroupJoiner{}
var _ colexecop.Closer = &hashGroupJoiner{}
var _ colexecop.ResettableOperator = &hashGroupJoiner{}

// NewHashGroupJoiner creates a new hash group-join operator.
func NewHashGroupJoiner(
Expand Down Expand Up @@ -144,6 +145,13 @@ func (h *hashGroupJoiner) ReleaseAfterExport(input colexecop.Operator) {
h.hjLeftSource.sq = nil
}

// Reset implements the colexecop.Resetter interface.
func (h *hashGroupJoiner) Reset(ctx context.Context) {
h.TwoInputInitHelper.Reset(ctx)
h.hj.(colexecop.ResettableOperator).Reset(ctx)
h.ha.Reset(ctx)
}

// Close implements the colexecop.Closer interface.
func (h *hashGroupJoiner) Close(ctx context.Context) error {
lastErr := h.ha.Close(ctx)
Expand All @@ -165,6 +173,7 @@ type copyingOperator struct {
}

var _ colexecop.ClosableOperator = &copyingOperator{}
var _ colexecop.ResettableOperator = &copyingOperator{}

func newCopyingOperator(
input colexecop.Operator, args *colexecutils.NewSpillingQueueArgs,
Expand All @@ -183,6 +192,15 @@ func (c *copyingOperator) Next() coldata.Batch {
return b
}

// Reset implements the colexecop.Resetter interface.
func (c *copyingOperator) Reset(ctx context.Context) {
if r, ok := c.Input.(colexecop.Resetter); ok {
r.Reset(ctx)
}
c.sq.Reset(ctx)
c.zeroBatchEnqueued = false
}

// Close implements the colexecop.Closer interface.
func (c *copyingOperator) Close(ctx context.Context) error {
if c.sq == nil {
Expand Down

0 comments on commit 0c4cd45

Please sign in to comment.