diff --git a/pkg/sql/colexec/hash_group_joiner.go b/pkg/sql/colexec/hash_group_joiner.go index 04041365efab..d8dcf4cff5ad 100644 --- a/pkg/sql/colexec/hash_group_joiner.go +++ b/pkg/sql/colexec/hash_group_joiner.go @@ -29,6 +29,7 @@ type hashGroupJoiner struct { var _ colexecop.BufferingInMemoryOperator = &hashGroupJoiner{} var _ colexecop.Closer = &hashGroupJoiner{} +var _ colexecop.ResettableOperator = &hashGroupJoiner{} // NewHashGroupJoiner creates a new hash group-join operator. func NewHashGroupJoiner( @@ -144,6 +145,13 @@ func (h *hashGroupJoiner) ReleaseAfterExport(input colexecop.Operator) { h.hjLeftSource.sq = nil } +// Reset implements the colexecop.Resetter interface. +func (h *hashGroupJoiner) Reset(ctx context.Context) { + h.TwoInputInitHelper.Reset(ctx) + h.hj.(colexecop.ResettableOperator).Reset(ctx) + h.ha.Reset(ctx) +} + // Close implements the colexecop.Closer interface. func (h *hashGroupJoiner) Close(ctx context.Context) error { lastErr := h.ha.Close(ctx) @@ -165,6 +173,7 @@ type copyingOperator struct { } var _ colexecop.ClosableOperator = ©ingOperator{} +var _ colexecop.ResettableOperator = ©ingOperator{} func newCopyingOperator( input colexecop.Operator, args *colexecutils.NewSpillingQueueArgs, @@ -183,6 +192,15 @@ func (c *copyingOperator) Next() coldata.Batch { return b } +// Reset implements the colexecop.Resetter interface. +func (c *copyingOperator) Reset(ctx context.Context) { + if r, ok := c.Input.(colexecop.Resetter); ok { + r.Reset(ctx) + } + c.sq.Reset(ctx) + c.zeroBatchEnqueued = false +} + // Close implements the colexecop.Closer interface. func (c *copyingOperator) Close(ctx context.Context) error { if c.sq == nil {