Skip to content

Commit

Permalink
Add status return and fixes for nulls (facebookincubator#10954)
Browse files Browse the repository at this point in the history
Summary:
- Add per instruction status return block above the BlockStatus array. Copy the Blockstatus array to host and do not use unified memory for this. The instruction return has a grid-scope status where all grid scope statuses are directly after the the BlockStatuses. Additionally there can be a block level status with n bytes per TB. These are after all the grid level statuses.

- Zero out the grid level statuses in reader, so that subsequent ops are guaranteed a zero grid level status. Block level statuses are uninitialized. If an instruction is continuable, it will first set its grid status and optionally its block level statuses.

- Add one status copy at the end of each pipeline. This serves for all error and continuability information.
- test coverage for cases with nulls with different numbers of rows per tb, including a case of TB doing more rows than the stripe has, in which case griddize can be skipped.

- Add an optional counter for tracking host to device transfer latency separately from compute latency.

Pull Request resolved: facebookincubator#10954

Reviewed By: Yuhta

Differential Revision: D62397804

Pulled By: oerling

fbshipit-source-id: 3ede44b15289838d183c979c50168c931f03266b
  • Loading branch information
Orri Erling authored and facebook-github-bot committed Sep 12, 2024
1 parent baaf559 commit 741876f
Show file tree
Hide file tree
Showing 24 changed files with 507 additions and 146 deletions.
1 change: 1 addition & 0 deletions velox/experimental/wave/common/Cuda.cu
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ void Stream::hostToDeviceAsync(
size,
cudaMemcpyHostToDevice,
stream_->stream));
isTransfer_ = true;
}

void Stream::deviceToHostAsync(
Expand Down
6 changes: 6 additions & 0 deletions velox/experimental/wave/common/Cuda.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,16 @@ class Stream {
void*& userData() {
return userData_;
}
bool getAndClearIsTransfer() {
auto flag = isTransfer_;
isTransfer_ = false;
return flag;
}

protected:
std::unique_ptr<StreamImpl> stream_;
void* userData_{nullptr};
bool isTransfer_{false};

friend class Event;
};
Expand Down
1 change: 1 addition & 0 deletions velox/experimental/wave/dwio/ColumnReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ class ReadStream : public Executable {
// Leading bytes in control_->deviceData used for BlockStatus. Cleared on
// device. The bytes after that are set on host and then prefetched to device.
int32_t statusBytes_{0};
int32_t gridStatusBytes_{0};
LaunchControl* control_{nullptr};

// Set to true when after first griddize() and akeOps().
Expand Down
19 changes: 15 additions & 4 deletions velox/experimental/wave/dwio/FormatData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ std::unique_ptr<GpuDecode> FormatData::makeStep(
ColumnOp& op,
const ColumnOp* previousFilter,
ResultStaging& deviceStaging,
SplitStaging& splitStaging,
ReadStream& stream,
WaveTypeKind columnKind,
int32_t blockIdx) {
Expand All @@ -206,15 +207,25 @@ std::unique_ptr<GpuDecode> FormatData::makeStep(
rowsPerBlock, op.rows.size() - (blockIdx * rowsPerBlock));

auto step = std::make_unique<GpuDecode>();
if (grid_.nulls) {
step->nonNullBases = grid_.numNonNull;
step->nulls = grid_.nulls;
bool hasNulls = false;
if (nullsBufferId_ != kNoBufferId) {
hasNulls = true;
if (grid_.nulls) {
step->nonNullBases = grid_.numNonNull;
step->nulls = grid_.nulls;
} else {
// The nulls transfer is staged but no pointer yet. Happens when the nulls
// decoding is in the same kernel as decode, i.e. single TB per column.
splitStaging.registerPointer(nullsBufferId_, &step->nulls, true);
step->nonNullBases = nullptr;
}
}
step->numRowsPerThread = bits::roundUp(rowsInBlock, kBlockSize) / kBlockSize;
step->gridNumRowsPerThread = maxRowsPerThread;
setFilter(step.get(), op.reader, nullptr);
bool dense = previousFilter == nullptr &&
simd::isDense(op.rows.data(), op.rows.size());
step->nullMode = grid_.nulls
step->nullMode = hasNulls
? (dense ? NullMode::kDenseNullable : NullMode::kSparseNullable)
: (dense ? NullMode::kDenseNonNull : NullMode::kSparseNonNull);
step->nthBlock = blockIdx;
Expand Down
4 changes: 4 additions & 0 deletions velox/experimental/wave/dwio/FormatData.h
Original file line number Diff line number Diff line change
Expand Up @@ -316,12 +316,16 @@ class FormatData {
ColumnOp& op,
const ColumnOp* previousFilter,
ResultStaging& deviceStaging,
SplitStaging& splitStaging,
ReadStream& stream,
WaveTypeKind columnKind,
int32_t blockIdx);

// Staging id for nulls.
int32_t nullsStagingId_{SplitStaging::kNoStaging};
// Id for nulls buffer. The nulls buffer has no address at time of scheduling
// if nulls decode is in the same kernel as decoding.
BufferId nullsBufferId_{kNoBufferId};
// id of last splitStaging 'this' depends on.
int32_t lastStagingId_{SplitStaging::kNoStaging};

Expand Down
44 changes: 34 additions & 10 deletions velox/experimental/wave/dwio/ReadStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ ReadStream::ReadStream(
FileInfo& fileInfo,
const OperandSet* firstColumns)
: Executable(), ioStats_(ioStats), fileInfo_(fileInfo) {
VELOX_CHECK_EQ(
0,
FLAGS_wave_reader_rows_per_tb & 1023,
"wave_reader_rows_per_tb must be a multiple of 1K");
waveStream = &_waveStream;
allOperands(columnReader, outputOperands, &abstractOperands_);
output.resize(outputOperands.size());
Expand Down Expand Up @@ -85,17 +89,23 @@ void ReadStream::prefetchStatus(Stream* stream) {
return;
}
char* data = control_->deviceData->as<char>();
auto size = control_->deviceData->size();
stream->prefetch(getDevice(), data, size);
auto size = control_->deviceData->size() - (statusBytes_ + gridStatusBytes_);
stream->prefetch(getDevice(), data + statusBytes_ + gridStatusBytes_, size);
}

namespace {
void maybeRecordTransferTime(Stream& stream, WaveStream& waveStream) {
if (stream.getAndClearIsTransfer() && FLAGS_wave_transfer_timing) {
WaveTimer t(waveStream.mutableStats().transferWaitTime);
stream.wait();
}
}
} // namespace

void ReadStream::makeGrid(Stream* stream) {
programs_.clear();
auto total = reader_->formatData()->totalRows();
auto blockSize = FLAGS_wave_reader_rows_per_tb;
if (total < blockSize) {
return;
}
auto numBlocks = bits::roundUp(total, blockSize) / blockSize;
auto& children = reader_->children();
for (auto i = 0; i < children.size(); ++i) {
Expand Down Expand Up @@ -126,6 +136,7 @@ void ReadStream::makeGrid(Stream* stream) {
LaunchParams params(waveStream->deviceArena());
WaveBufferPtr extra;
{
maybeRecordTransferTime(*stream, *waveStream);
PrintTime l("grid");
launchDecode(programs_, params, stream);
}
Expand Down Expand Up @@ -154,6 +165,7 @@ void ReadStream::makeCompact(bool isSerial) {
step->numRowsPerThread = blockIdx == numTBs - 1
? numBlocks_ - (numTBs - 1) * maxRowsPerThread
: maxRowsPerThread;
step->gridNumRowsPerThread = maxRowsPerThread;
if (filters_.back().deviceResult) {
step->data.compact.finalRows =
filters_.back().deviceResult + blockIdx * rowsPerBlock;
Expand Down Expand Up @@ -298,12 +310,15 @@ bool ReadStream::makePrograms(bool& needSync) {
}
}
filtersDone_ = true;
if (filters_.empty() && allDone) {
if ((filters_.empty() || gridStatusBytes_ > 0) && allDone) {
auto setCount = std::make_unique<GpuDecode>();
setCount->step = DecodeStep::kRowCountNoFilter;
setCount->data.rowCountNoFilter.numRows = rows_.size();
setCount->data.rowCountNoFilter.status =
control_->deviceData->as<BlockStatus>();
setCount->data.rowCountNoFilter.gridStatusSize = gridStatusBytes_;
setCount->data.rowCountNoFilter.gridOnly = !filters_.empty();

programs_.programs.emplace_back();
programs_.programs.back().push_back(std::move(setCount));
}
Expand Down Expand Up @@ -382,6 +397,7 @@ void ReadStream::launch(
readStream->syncStaging(*stream);
LaunchParams params(waveStream->deviceArena());
{
maybeRecordTransferTime(*stream, *readStream->waveStream);
PrintTime l("decode");
launchDecode(readStream->programs(), params, stream);
}
Expand All @@ -407,7 +423,7 @@ void ReadStream::launch(
LaunchParams params(readStream->waveStream->deviceArena());
readStream->syncStaging(*stream);
{
// stream->wait();
maybeRecordTransferTime(*stream, *readStream->waveStream);
PrintTime l("decode-f");
launchDecode(readStream->programs(), params, stream);
}
Expand All @@ -425,13 +441,19 @@ void ReadStream::makeControl() {
waveStream->setNumRows(numRows);
WaveStream::ExeLaunchInfo info;
waveStream->exeLaunchInfo(*this, numBlocks_, info);
auto instructionStatus = waveStream->instructionStatus();
int32_t instructionBytes =
instructionStatusSize(instructionStatus, numBlocks_);
statusBytes_ = bits::roundUp(sizeof(BlockStatus) * numBlocks_, 8);
auto deviceBytes = statusBytes_ + info.totalBytes;
auto deviceBytes = statusBytes_ + instructionBytes + info.totalBytes;
auto control = std::make_unique<LaunchControl>(0, numRows);
control->deviceData = waveStream->arena().allocate<char>(deviceBytes);
// The operand section must be cleared before written on host. The statuses
// are cleared on device.
memset(control->deviceData->as<char>(), 0, deviceBytes);
memset(
control->deviceData->as<char>() + statusBytes_ + instructionBytes,
0,
info.totalBytes);
control->params.status = control->deviceData->as<BlockStatus>();
for (auto& reader : reader_->children()) {
if (!reader->formatData()->hasNulls() || reader->hasNonNullFilter()) {
Expand All @@ -442,7 +464,9 @@ void ReadStream::makeControl() {
}
}
operands = waveStream->fillOperands(
*this, control->deviceData->as<char>() + statusBytes_, info)[0];
*this,
control->deviceData->as<char>() + statusBytes_ + instructionBytes,
info)[0];
control_ = control.get();
waveStream->setLaunchControl(0, 0, std::move(control));
}
Expand Down
9 changes: 8 additions & 1 deletion velox/experimental/wave/dwio/decode/DecodeStep.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,14 @@ struct alignas(16) GpuDecode {
// launched in the same grid but are independent. The ordinal for non-first
// TBs gets the base index for values.
uint16_t nthBlock{0};

/// Number of rows to process per thread of this block. This is equal across
/// the grid, except for last block.
uint16_t numRowsPerThread{1};

/// Number of rows per thread in the grid, same for all blocks including the
/// last one.
uint16_t gridNumRowsPerThread{1};

/// Number of rows to decode. if kFilterHits, the previous GpuDecode gives
/// this number in BlockStatus. If 'rows' is set, this is the number of valid
/// elements in 'rows'. If 'rows' is not set, the start is ''baseRow'
Expand Down Expand Up @@ -325,6 +330,8 @@ struct alignas(16) GpuDecode {
struct RowCountNoFilter {
int32_t numRows;
BlockStatus* status;
int32_t gridStatusSize;
bool gridOnly;
};

struct CountBits {
Expand Down
Loading

0 comments on commit 741876f

Please sign in to comment.