diff --git a/velox/exec/NestedLoopJoinProbe.cpp b/velox/exec/NestedLoopJoinProbe.cpp index cf2656b4924f..58349d0217d6 100644 --- a/velox/exec/NestedLoopJoinProbe.cpp +++ b/velox/exec/NestedLoopJoinProbe.cpp @@ -207,9 +207,11 @@ bool NestedLoopJoinProbe::getBuildData(ContinueFuture* future) { } buildVectors_ = std::move(buildData); - if (buildVectors_->empty()) { - buildSideEmpty_ = true; + + for (const auto& build : buildVectors_.value()) { + buildRowCount_ += build->size(); } + buildSideEmpty_ = (buildRowCount_ == 0); return true; } @@ -261,7 +263,8 @@ RowVectorPtr NestedLoopJoinProbe::generateOutput() { return std::move(output_); } - if (advanceProbeRow()) { + // Try to advance the probe cursor; call finish if no more probe input. + if (advanceProbe()) { finishProbeInput(); } @@ -271,9 +274,15 @@ RowVectorPtr NestedLoopJoinProbe::generateOutput() { return std::move(output_); } -bool NestedLoopJoinProbe::advanceProbeRow() { +bool NestedLoopJoinProbe::advanceProbe() { if (hasProbedAllBuildData()) { - ++probeRow_; + // For cross joins, if there is a single record on the build side, we return + // batches containing all probe records from `input_` at a time. + if (isCrossJoin() && buildRowCount_ == 1) { + probeRow_ = input_->size(); + } else { + ++probeRow_; + } probeRowHasMatch_ = false; buildIndex_ = 0; @@ -292,7 +301,12 @@ bool NestedLoopJoinProbe::addToOutput() { // First, create a new output vector. By default, allocate space for // outputBatchSize_ rows. The output always generates dictionaries wrapped // around the probe vector being processed. - prepareOutput(); + // + // Since cross join batches can be returned without filter evaluation, no need + // to prepare output here. + if (!isCrossJoin()) { + prepareOutput(); + } while (!hasProbedAllBuildData()) { const auto& currentBuild = buildVectors_.value()[buildIndex_]; @@ -308,7 +322,7 @@ bool NestedLoopJoinProbe::addToOutput() { // return the output vector directly, which is composed of the build // projections at `probeRow_` (as constants), and current vector of the // build side. Also don't need to bother about adding mismatched rows. - if (joinCondition_ == nullptr) { + if (isCrossJoin()) { output_ = getNextCrossProductBatch( currentBuild, outputType_, identityProjections_, buildProjections_); numOutputRows_ = output_->size(); @@ -355,7 +369,9 @@ bool NestedLoopJoinProbe::addToOutput() { // Check if the current probed row needs to be added as a mismatch (for left // and full outer joins). checkProbeMismatchRow(); - output_->resize(numOutputRows_); + if (output_ != nullptr) { + output_->resize(numOutputRows_); + } // Signals that all input has been generated for the probeRow and build // vectors; safe to move to the next probe record. @@ -418,19 +434,41 @@ RowVectorPtr NestedLoopJoinProbe::getNextCrossProductBatch( const RowTypePtr& outputType, const std::vector& probeProjections, const std::vector& buildProjections) { + VELOX_CHECK_GT(buildVector->size(), 0); std::vector projectedChildren(outputType->size()); - const auto numOutputRows = buildVector->size(); + size_t numOutputRows = 0; - // Project columns from the build side. - projectChildren( - projectedChildren, buildVector, buildProjections, numOutputRows, nullptr); + // If it's a cross join and there is a single build record, we use the entire + // probe batch `input_` and the single build record wrapped as a constant. + if (isCrossJoin() && buildRowCount_ == 1) { + numOutputRows = input_->size(); - // Wrap projections from the probe side as constants. - for (auto [inputChannel, outputChannel] : probeProjections) { - projectedChildren[outputChannel] = BaseVector::wrapInConstant( - numOutputRows, probeRow_, input_->childAt(inputChannel)); - } + // Project columns from the probe side. + projectChildren( + projectedChildren, input_, probeProjections, numOutputRows, nullptr); + // Wrap projections from the build side as constants. + for (const auto [inputChannel, outputChannel] : buildProjections) { + projectedChildren[outputChannel] = BaseVector::wrapInConstant( + numOutputRows, 0, buildVector->childAt(inputChannel)); + } + } else { + numOutputRows = buildVector->size(); + + // Project columns from the build side. + projectChildren( + projectedChildren, + buildVector, + buildProjections, + numOutputRows, + nullptr); + + // Wrap projections from the probe side as constants. + for (const auto [inputChannel, outputChannel] : probeProjections) { + projectedChildren[outputChannel] = BaseVector::wrapInConstant( + numOutputRows, probeRow_, input_->childAt(inputChannel)); + } + } return std::make_shared( pool(), outputType, nullptr, numOutputRows, std::move(projectedChildren)); } @@ -477,6 +515,7 @@ void NestedLoopJoinProbe::checkProbeMismatchRow() { // to add a probe mismatch record. if (needsProbeMismatch(joinType_) && hasProbedAllBuildData() && !probeRowHasMatch_) { + prepareOutput(); addProbeMismatchRow(); ++numOutputRows_; } @@ -551,7 +590,7 @@ RowVectorPtr NestedLoopJoinProbe::getBuildMismatchedOutput( // product but the build or probe side is empty, there could still be // mismatched rows from the other side. if (matched.isAllSelected() || - (joinCondition_ == nullptr && !probeSideEmpty_ && !buildSideEmpty_)) { + (isCrossJoin() && !probeSideEmpty_ && !buildSideEmpty_)) { return nullptr; } diff --git a/velox/exec/NestedLoopJoinProbe.h b/velox/exec/NestedLoopJoinProbe.h index 0f75f910b304..2fad3840cc31 100644 --- a/velox/exec/NestedLoopJoinProbe.h +++ b/velox/exec/NestedLoopJoinProbe.h @@ -27,7 +27,8 @@ namespace facebook::velox::exec { /// /// This class is generally useful to evaluate non-equi-joins (e.g. "k1 >= k2"), /// when join conditions may need to be evaluated against a full cross product -/// of the input. +/// of the input. It can also implement cross-join semantics if joinCondition is +/// nullptr. /// /// The output follows the order of the probe side rows (for inner and left /// joins). All build vectors are materialized upfront (check buildVectors_), @@ -49,10 +50,14 @@ namespace facebook::velox::exec { /// been matched by any of the peers. /// /// The output always contains dictionaries wrapped around probe columns, and -/// copies for build columns. The buid-side copies are done lazily; it first -/// accumulates the ranges to be copied, then performs the copies in batch, -/// column-by-column. It produces at most `outputBatchSize_` records, but it may -/// produce fewer since the output needs to follow the probe vector boundaries. +/// copies for build columns. The only exception are cases when the build side +/// contains a single record. In that case, each probe batch will be wrapped +/// with the single build record (as a constant). +/// +/// The buid-side copies are done lazily; it first accumulates the ranges to be +/// copied, then performs the copies in batch, column-by-column. It produces at +/// most `outputBatchSize_` records, but it may produce fewer since the output +/// needs to follow the probe vector boundaries. class NestedLoopJoinProbe : public Operator { public: NestedLoopJoinProbe( @@ -116,7 +121,7 @@ class NestedLoopJoinProbe : public Operator { // Advances 'probeRow_' and resets required state information. Returns true // if there is not more probe data to be processed in the current `input_` // (and hence a new probe input is required). False otherwise. - bool advanceProbeRow(); + bool advanceProbe(); // Ensures a new batch of records is available at `output_` and ready to // receive rows. Batches have space for `outputBatchSize_`. @@ -135,9 +140,14 @@ class NestedLoopJoinProbe : public Operator { } // Generates the next batch of a cross product between probe and build. It - // uses the current probe record being processed (`probeRow_` from `intput_`) - // for probe projections, and the columns from buildVector for build + // handles two cases: + // + // #1. Use the current probe record being processed (`probeRow_` from + // `input_`) for probe projections, and the columns from buildVector for build // projections. + // #2. For cross joins, if there is a single build record, it uses the columns + // from the current probe batch (`input_`), and the single build record + // wrapped as a constant. // // Output projections can be specified so that this function can be used to // generate both filter input and actual output (in case there is no join @@ -194,6 +204,11 @@ class NestedLoopJoinProbe : public Operator { return (buildIndex_ >= buildVectors_.value().size()); } + /// Cross joins are translated into NLJ's without a join conditition. + bool isCrossJoin() const { + return joinCondition_ == nullptr; + } + // Wraps rows of 'data' that are not selected in 'matched' and projects // to the output according to 'projections'. 'nullProjections' is used to // create null column vectors in output for outer join. 'unmatchedMapping' is @@ -274,6 +289,9 @@ class NestedLoopJoinProbe : public Operator { std::optional> buildVectors_; bool buildSideEmpty_{false}; + // Total number of records from the build side (across all vectors). + vector_size_t buildRowCount_{0}; + // Index into `buildVectors_` for the build vector being currently processed. size_t buildIndex_{0};