Skip to content

Commit

Permalink
Serialization test mix
Browse files Browse the repository at this point in the history
  • Loading branch information
Orri Erling committed Feb 7, 2024
1 parent e4a2ce2 commit e9ebcd5
Show file tree
Hide file tree
Showing 35 changed files with 2,182 additions and 683 deletions.
5 changes: 5 additions & 0 deletions velox/benchmarks/tpch/TpchBenchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,8 @@ DEFINE_int32(

DEFINE_int32(split_preload_per_driver, 2, "Prefetch split metadata");

// Preferred output batch size in bytes, forwarded to
// core::QueryConfig::kPreferredOutputBatchBytes. The default is 10 MB, written
// as a shift (10 << 20); a comparison (10 < 20) would evaluate to 1 byte.
DEFINE_int32(batch_bytes, 10 << 20, "Preferred Operator batch size bytes");

struct RunStats {
std::map<std::string, std::string> flags;
int64_t micros{0};
Expand Down Expand Up @@ -287,6 +289,9 @@ class TpchBenchmark {
params.planNode = tpchPlan.plan;
params.queryConfigs[core::QueryConfig::kMaxSplitPreloadPerDriver] =
std::to_string(FLAGS_split_preload_per_driver);
params.queryConfigs[core::QueryConfig::kPreferredOutputBatchBytes] =
std::to_string(FLAGS_batch_bytes);

const int numSplitsPerFile = FLAGS_num_splits_per_file;

bool noMoreSplits = false;
Expand Down
5 changes: 5 additions & 0 deletions velox/common/memory/ByteStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,11 @@ class ByteOutputStream {
}

/// Begins a fresh write. Every piece of per-write state is reset before
/// extend() is called with 'initialSize', so state left over from an earlier
/// write is not carried into the new one.
void startWrite(int32_t initialSize) {
// Drop the write position first, then the bookkeeping that describes the
// ranges written so far.
current_ = nullptr;
lastRangeEnd_ = 0;
allocatedBytes_ = 0;
isReversed_ = false;
ranges_.clear();
// Runs last so that it observes the fully reset state.
extend(initialSize);
}

Expand Down
2 changes: 1 addition & 1 deletion velox/common/memory/HashStringAllocator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ HashStringAllocator::~HashStringAllocator() {
clear();
}

void HashStringAllocator::clear() {
void HashStringAllocator::clear(bool) {
numFree_ = 0;
freeBytes_ = 0;
std::fill(std::begin(freeNonEmpty_), std::end(freeNonEmpty_), 0);
Expand Down
2 changes: 1 addition & 1 deletion velox/common/memory/HashStringAllocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ class HashStringAllocator : public StreamArena {
}

// Frees all memory associated with 'this' and leaves 'this' ready for reuse.
void clear();
void clear(bool /*reservePreviousSize*/ = true) override;

memory::MemoryPool* FOLLY_NONNULL pool() const {
return pool_.pool();
Expand Down
11 changes: 11 additions & 0 deletions velox/common/memory/StreamArena.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,17 @@ class StreamArena {
return pool_;
}

/// Resets the arena's bookkeeping: empties 'allocations_',
/// 'largeAllocations_' and 'tinyRanges_', hands 'allocation_' back to 'pool_'
/// via freeNonContiguous() and zeroes the current run, current offset and
/// 'size_'.
/// NOTE(review): the 'reservePreviousSize' argument is unnamed and unused in
/// this base implementation; confirm what overrides are expected to do with
/// it.
virtual void clear(bool /*reservePreviousSize*/ = true) {
allocations_.clear();
pool_->freeNonContiguous(allocation_);
// Restart the write position at the beginning.
currentRun_ = 0;
currentOffset_ = 0;
largeAllocations_.clear();
size_ = 0;
tinyRanges_.clear();
}

private:
memory::MemoryPool* const pool_;
const memory::MachinePageCount allocationQuantum_{2};
Expand Down
20 changes: 20 additions & 0 deletions velox/exec/Exchange.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,15 +116,33 @@ RowVectorPtr Exchange::getOutput() {

uint64_t rawInputBytes{0};
vector_size_t resultOffset = 0;
int32_t numBatchMerges = 0;
for (const auto& page : currentPages_) {
rawInputBytes += page->size();

auto inputStream = page->prepareStreamForDeserialize();

while (!inputStream.atEnd()) {
if (resultOffset > 0) {
++numBatchMerges;
}
getSerde()->deserialize(
&inputStream, pool(), outputType_, &result_, resultOffset);
const auto newRows = result_->size() - resultOffset;
resultOffset = result_->size();
int32_t constantRows = 0;
for (auto i = 0; i < result_->childrenSize(); ++i) {
auto& column = result_->childAt(i);
if (column->encoding() == VectorEncoding::Simple::CONSTANT) {
auto lockedStats = stats_.wlock();
lockedStats->addRuntimeStat("constantRows", RuntimeCounter(newRows));
}
if (column->encoding() == VectorEncoding::Simple::DICTIONARY) {
auto lockedStats = stats_.wlock();
lockedStats->addRuntimeStat(
"dictionaryRows", RuntimeCounter(newRows));
}
}
}
}

Expand All @@ -135,6 +153,8 @@ RowVectorPtr Exchange::getOutput() {
lockedStats->rawInputBytes += rawInputBytes;
lockedStats->rawInputPositions += result_->size();
lockedStats->addInputVector(result_->estimateFlatSize(), result_->size());
lockedStats->addRuntimeStat(
"numBatchMerges", RuntimeCounter(numBatchMerges));
}

return result_;
Expand Down
5 changes: 3 additions & 2 deletions velox/exec/HashBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,8 @@ void HashBuild::removeInputRowsForAntiJoinFilter() {
changed = true;
// NOTE: the true value of a raw null bit indicates non-null so we AND
// 'rawActiveRows' with the raw bit.
bits::andBits(rawActiveRows, decoded.nulls(), 0, activeRows_.end());
bits::andBits(
rawActiveRows, decoded.nulls(&activeRows_), 0, activeRows_.end());
}
};
for (auto channel : keyFilterChannels_) {
Expand Down Expand Up @@ -346,7 +347,7 @@ void HashBuild::addInput(RowVectorPtr input) {
for (auto& hasher : hashers) {
auto& decoded = hasher->decodedVector();
if (decoded.mayHaveNulls()) {
auto* nulls = decoded.nulls();
auto* nulls = decoded.nulls(&activeRows_);
if (nulls && bits::countNulls(nulls, 0, activeRows_.end()) > 0) {
joinHasNullKeys_ = true;
break;
Expand Down
13 changes: 12 additions & 1 deletion velox/exec/HashProbe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -998,10 +998,21 @@ void HashProbe::prepareFilterRowsForNullAwareJoin(
filterInputColumnDecodedVector_.decode(
*filterInput_->childAt(projection.outputChannel), filterInputRows_);
if (filterInputColumnDecodedVector_.mayHaveNulls()) {
std::vector<uint64_t> nullsInActiveRows(bits::nwords(numRows));
memcpy(
nullsInActiveRows.data(),
filterInputColumnDecodedVector_.nulls(&filterInputRows_),
bits::nbytes(numRows));
// All rows that are not active count as non-null here.
bits::orWithNegatedBits(
nullsInActiveRows.data(),
filterInputRows_.asRange().bits(),
0,
numRows);
// NOTE: the false value of a raw null bit indicates null so we OR with
// the negation of the raw bit.
bits::orWithNegatedBits(
rawNullRows, filterInputColumnDecodedVector_.nulls(), 0, numRows);
rawNullRows, nullsInActiveRows.data(), 0, numRows);
}
}
nullFilterInputRows_.updateBounds();
Expand Down
4 changes: 2 additions & 2 deletions velox/exec/OperatorUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ void deselectRowsWithNulls(
auto& decoded = hashers[i]->decodedVector();
if (decoded.mayHaveNulls()) {
anyChange = true;
const auto* nulls = hashers[i]->decodedVector().nulls();
const auto* nulls = hashers[i]->decodedVector().nulls(&rows);
bits::andBits(rows.asMutableRange().bits(), nulls, 0, rows.end());
}
}
Expand Down Expand Up @@ -219,7 +219,7 @@ vector_size_t processEncodedFilterResults(
DecodedVector& decoded = filterEvalCtx.decodedResult;
decoded.decode(*filterResult.get(), rows);
auto values = decoded.data<uint64_t>();
auto nulls = decoded.nulls();
auto nulls = decoded.nulls(&rows);
auto indices = decoded.indices();

vector_size_t passed = 0;
Expand Down
Loading

0 comments on commit e9ebcd5

Please sign in to comment.