Skip to content

Commit

Permalink
Optimize cross joins for single record build side (facebookincubator#…
Browse files Browse the repository at this point in the history
…10690)

Summary:
Pull Request resolved: facebookincubator#10690

It turns out that having a single record on the build side is common, as
users often cross-join relations with literals. Cross joins in Velox are
implemented using Nested Loop Join (NLJ). This change adds an optimization for
that case: the single build record is wrapped as a constant alongside each
entire probe batch, so that the probe side can be processed one batch at a time.

Reviewed By: Yuhta

Differential Revision: D60967489

fbshipit-source-id: 49cf39e73b7d2e87201dbb34b8461e8e7c6ae069
  • Loading branch information
pedroerp authored and facebook-github-bot committed Aug 9, 2024
1 parent 172fef2 commit c0f0a31
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 26 deletions.
75 changes: 57 additions & 18 deletions velox/exec/NestedLoopJoinProbe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -207,9 +207,11 @@ bool NestedLoopJoinProbe::getBuildData(ContinueFuture* future) {
}

buildVectors_ = std::move(buildData);
if (buildVectors_->empty()) {
buildSideEmpty_ = true;

for (const auto& build : buildVectors_.value()) {
buildRowCount_ += build->size();
}
buildSideEmpty_ = (buildRowCount_ == 0);
return true;
}

Expand Down Expand Up @@ -261,7 +263,8 @@ RowVectorPtr NestedLoopJoinProbe::generateOutput() {
return std::move(output_);
}

if (advanceProbeRow()) {
// Try to advance the probe cursor; call finish if no more probe input.
if (advanceProbe()) {
finishProbeInput();
}

Expand All @@ -271,9 +274,15 @@ RowVectorPtr NestedLoopJoinProbe::generateOutput() {
return std::move(output_);
}

bool NestedLoopJoinProbe::advanceProbeRow() {
bool NestedLoopJoinProbe::advanceProbe() {
if (hasProbedAllBuildData()) {
++probeRow_;
// For cross joins, if there is a single record on the build side, we return
// batches containing all probe records from `input_` at a time.
if (isCrossJoin() && buildRowCount_ == 1) {
probeRow_ = input_->size();
} else {
++probeRow_;
}
probeRowHasMatch_ = false;
buildIndex_ = 0;

Expand All @@ -292,7 +301,12 @@ bool NestedLoopJoinProbe::addToOutput() {
// First, create a new output vector. By default, allocate space for
// outputBatchSize_ rows. The output always generates dictionaries wrapped
// around the probe vector being processed.
prepareOutput();
//
// Since cross join batches can be returned without filter evaluation, no need
// to prepare output here.
if (!isCrossJoin()) {
prepareOutput();
}

while (!hasProbedAllBuildData()) {
const auto& currentBuild = buildVectors_.value()[buildIndex_];
Expand All @@ -308,7 +322,7 @@ bool NestedLoopJoinProbe::addToOutput() {
// return the output vector directly, which is composed of the build
// projections at `probeRow_` (as constants), and current vector of the
// build side. Also don't need to bother about adding mismatched rows.
if (joinCondition_ == nullptr) {
if (isCrossJoin()) {
output_ = getNextCrossProductBatch(
currentBuild, outputType_, identityProjections_, buildProjections_);
numOutputRows_ = output_->size();
Expand Down Expand Up @@ -355,7 +369,9 @@ bool NestedLoopJoinProbe::addToOutput() {
// Check if the current probed row needs to be added as a mismatch (for left
// and full outer joins).
checkProbeMismatchRow();
output_->resize(numOutputRows_);
if (output_ != nullptr) {
output_->resize(numOutputRows_);
}

// Signals that all input has been generated for the probeRow and build
// vectors; safe to move to the next probe record.
Expand Down Expand Up @@ -418,19 +434,41 @@ RowVectorPtr NestedLoopJoinProbe::getNextCrossProductBatch(
const RowTypePtr& outputType,
const std::vector<IdentityProjection>& probeProjections,
const std::vector<IdentityProjection>& buildProjections) {
VELOX_CHECK_GT(buildVector->size(), 0);
std::vector<VectorPtr> projectedChildren(outputType->size());
const auto numOutputRows = buildVector->size();
size_t numOutputRows = 0;

// Project columns from the build side.
projectChildren(
projectedChildren, buildVector, buildProjections, numOutputRows, nullptr);
// If it's a cross join and there is a single build record, we use the entire
// probe batch `input_` and the single build record wrapped as a constant.
if (isCrossJoin() && buildRowCount_ == 1) {
numOutputRows = input_->size();

// Wrap projections from the probe side as constants.
for (auto [inputChannel, outputChannel] : probeProjections) {
projectedChildren[outputChannel] = BaseVector::wrapInConstant(
numOutputRows, probeRow_, input_->childAt(inputChannel));
}
// Project columns from the probe side.
projectChildren(
projectedChildren, input_, probeProjections, numOutputRows, nullptr);

// Wrap projections from the build side as constants.
for (const auto [inputChannel, outputChannel] : buildProjections) {
projectedChildren[outputChannel] = BaseVector::wrapInConstant(
numOutputRows, 0, buildVector->childAt(inputChannel));
}
} else {
numOutputRows = buildVector->size();

// Project columns from the build side.
projectChildren(
projectedChildren,
buildVector,
buildProjections,
numOutputRows,
nullptr);

// Wrap projections from the probe side as constants.
for (const auto [inputChannel, outputChannel] : probeProjections) {
projectedChildren[outputChannel] = BaseVector::wrapInConstant(
numOutputRows, probeRow_, input_->childAt(inputChannel));
}
}
return std::make_shared<RowVector>(
pool(), outputType, nullptr, numOutputRows, std::move(projectedChildren));
}
Expand Down Expand Up @@ -477,6 +515,7 @@ void NestedLoopJoinProbe::checkProbeMismatchRow() {
// to add a probe mismatch record.
if (needsProbeMismatch(joinType_) && hasProbedAllBuildData() &&
!probeRowHasMatch_) {
prepareOutput();
addProbeMismatchRow();
++numOutputRows_;
}
Expand Down Expand Up @@ -551,7 +590,7 @@ RowVectorPtr NestedLoopJoinProbe::getBuildMismatchedOutput(
// product but the build or probe side is empty, there could still be
// mismatched rows from the other side.
if (matched.isAllSelected() ||
(joinCondition_ == nullptr && !probeSideEmpty_ && !buildSideEmpty_)) {
(isCrossJoin() && !probeSideEmpty_ && !buildSideEmpty_)) {
return nullptr;
}

Expand Down
34 changes: 26 additions & 8 deletions velox/exec/NestedLoopJoinProbe.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ namespace facebook::velox::exec {
///
/// This class is generally useful to evaluate non-equi-joins (e.g. "k1 >= k2"),
/// when join conditions may need to be evaluated against a full cross product
/// of the input.
/// of the input. It can also implement cross-join semantics if joinCondition is
/// nullptr.
///
/// The output follows the order of the probe side rows (for inner and left
/// joins). All build vectors are materialized upfront (check buildVectors_),
Expand All @@ -49,10 +50,14 @@ namespace facebook::velox::exec {
/// been matched by any of the peers.
///
/// The output always contains dictionaries wrapped around probe columns, and
/// copies for build columns. The buid-side copies are done lazily; it first
/// accumulates the ranges to be copied, then performs the copies in batch,
/// column-by-column. It produces at most `outputBatchSize_` records, but it may
/// produce fewer since the output needs to follow the probe vector boundaries.
/// copies for build columns. The only exception is a cross join whose build
/// side contains a single record. In that case, each probe batch is wrapped
/// with the single build record (as a constant).
///
/// The build-side copies are done lazily; it first accumulates the ranges to be
/// copied, then performs the copies in batch, column-by-column. It produces at
/// most `outputBatchSize_` records, but it may produce fewer since the output
/// needs to follow the probe vector boundaries.
class NestedLoopJoinProbe : public Operator {
public:
NestedLoopJoinProbe(
Expand Down Expand Up @@ -116,7 +121,7 @@ class NestedLoopJoinProbe : public Operator {
// Advances 'probeRow_' and resets required state information. Returns true
// if there is no more probe data to be processed in the current `input_`
// (and hence a new probe input is required). False otherwise.
bool advanceProbeRow();
bool advanceProbe();

// Ensures a new batch of records is available at `output_` and ready to
// receive rows. Batches have space for `outputBatchSize_`.
Expand All @@ -135,9 +140,14 @@ class NestedLoopJoinProbe : public Operator {
}

// Generates the next batch of a cross product between probe and build. It
// uses the current probe record being processed (`probeRow_` from `intput_`)
// for probe projections, and the columns from buildVector for build
// handles two cases:
//
// #1. Use the current probe record being processed (`probeRow_` from
// `input_`) for probe projections, and the columns from buildVector for build
// projections.
// #2. For cross joins, if there is a single build record, it uses the columns
// from the current probe batch (`input_`), and the single build record
// wrapped as a constant.
//
// Output projections can be specified so that this function can be used to
// generate both filter input and actual output (in case there is no join
Expand Down Expand Up @@ -194,6 +204,11 @@ class NestedLoopJoinProbe : public Operator {
return (buildIndex_ >= buildVectors_.value().size());
}

/// Cross joins are translated into NLJ's without a join conditition.
bool isCrossJoin() const {
return joinCondition_ == nullptr;
}

// Wraps rows of 'data' that are not selected in 'matched' and projects
// to the output according to 'projections'. 'nullProjections' is used to
// create null column vectors in output for outer join. 'unmatchedMapping' is
Expand Down Expand Up @@ -274,6 +289,9 @@ class NestedLoopJoinProbe : public Operator {
std::optional<std::vector<RowVectorPtr>> buildVectors_;
bool buildSideEmpty_{false};

// Total number of records from the build side (across all vectors).
vector_size_t buildRowCount_{0};

// Index into `buildVectors_` for the build vector being currently processed.
size_t buildIndex_{0};

Expand Down

0 comments on commit c0f0a31

Please sign in to comment.