Skip to content

Commit

Permalink
fixup
Browse files Browse the repository at this point in the history
  • Loading branch information
zhztheplayer committed Jun 7, 2024
1 parent c55e8e7 commit e558c5e
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 49 deletions.
61 changes: 19 additions & 42 deletions cpp/velox/utils/VeloxBatchAppender.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,57 +26,34 @@ gluten::VeloxBatchAppender::VeloxBatchAppender(
: pool_(pool), minOutputBatchSize_(minOutputBatchSize), in_(std::move(in)) {}

std::shared_ptr<ColumnarBatch> VeloxBatchAppender::next() {
if (eos_) {
auto cb = in_->next();
if (cb == nullptr) {
// Input iterator was drained.
return nullptr;
}
if (full()) {
return flush();
if (cb->numRows() >= minOutputBatchSize_) {
// Fast flush path.
return cb;
}
do {
auto cb = in_->next();
if (cb == nullptr) {
// End of stream.
eos_ = true;
if (buffer_ != nullptr) {
return flush();
}
return nullptr;
}
if (buffer_ == nullptr && cb->numRows() >= minOutputBatchSize_) {
// Fast flush path.
return cb;
}
append(cb);
} while (!full());
// Buffer is full.
return flush();
}

int64_t VeloxBatchAppender::spillFixedSize(int64_t size) {
return in_->spillFixedSize(size);
}

void VeloxBatchAppender::append(std::shared_ptr<ColumnarBatch> cb) {
auto vb = VeloxColumnarBatch::from(pool_, cb);
auto rv = vb->getRowVector();
if (buffer_ == nullptr) {
buffer_ = facebook::velox::RowVector::createEmpty(rv->type(), pool_);
}
buffer_->append(rv.get());
}

bool VeloxBatchAppender::full() {
if (buffer_ == nullptr) {
return false;
auto buffer = facebook::velox::RowVector::createEmpty(rv->type(), pool_);
buffer->append(rv.get());

// Keep pulling batches from the input iterator until it is drained or the
// buffer holds at least minOutputBatchSize_ rows.
for (auto nextCb = in_->next(); nextCb != nullptr; nextCb = in_->next()) {
  // Convert the batch that was just pulled (nextCb). Converting `cb` here would
  // re-append the first batch on every iteration and never use the new rows.
  auto nextVb = VeloxColumnarBatch::from(pool_, nextCb);
  auto nextRv = nextVb->getRowVector();
  buffer->append(nextRv.get());
  if (buffer->size() >= minOutputBatchSize_) {
    // Buffer is full.
    break;
  }
}
return buffer_->size() >= minOutputBatchSize_;
return std::make_shared<VeloxColumnarBatch>(buffer);
}

std::shared_ptr<ColumnarBatch> VeloxBatchAppender::flush() {
GLUTEN_DCHECK(buffer_ != nullptr, "Assertion failed");
auto out = std::make_shared<VeloxColumnarBatch>(buffer_);
buffer_ = nullptr;
return out;
// Forwards the call to in_->spillFixedSize() with the same `size` argument and
// returns whatever the wrapped input iterator reports.
int64_t VeloxBatchAppender::spillFixedSize(int64_t size) {
  const auto result = in_->spillFixedSize(size);
  return result;
}
} // namespace gluten
7 changes: 0 additions & 7 deletions cpp/velox/utils/VeloxBatchAppender.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,9 @@ class VeloxBatchAppender : public ColumnarBatchIterator {
int64_t spillFixedSize(int64_t size) override;

private:
void append(std::shared_ptr<ColumnarBatch> cb);

bool full();

std::shared_ptr<ColumnarBatch> flush();

facebook::velox::memory::MemoryPool* pool_;
const int32_t minOutputBatchSize_;
std::unique_ptr<ColumnarBatchIterator> in_;
bool eos_{false};
facebook::velox::RowVectorPtr buffer_{nullptr};
};
} // namespace gluten

0 comments on commit e558c5e

Please sign in to comment.