Skip to content

Commit

Permalink
cache multiple inputs and split
Browse files Browse the repository at this point in the history
  • Loading branch information
marin-ma committed Jul 5, 2024
1 parent 15c99ba commit 3f55ea4
Show file tree
Hide file tree
Showing 24 changed files with 401 additions and 334 deletions.
5 changes: 3 additions & 2 deletions cpp/core/jni/JniWrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -953,7 +953,8 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe
JNIEXPORT jobject JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrapper_stop( // NOLINT
JNIEnv* env,
jobject wrapper,
jlong shuffleWriterHandle) {
jlong shuffleWriterHandle,
jlong memLimit) {
JNI_METHOD_START
auto ctx = gluten::getRuntime(env, wrapper);

Expand All @@ -963,7 +964,7 @@ JNIEXPORT jobject JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrap
throw gluten::GlutenException(errorMessage);
}

gluten::arrowAssertOkOrThrow(shuffleWriter->stop(), "Native shuffle write: ShuffleWriter stop failed");
gluten::arrowAssertOkOrThrow(shuffleWriter->stop(memLimit), "Native shuffle write: ShuffleWriter stop failed");

const auto& partitionLengths = shuffleWriter->partitionLengths();
auto partitionLengthArr = env->NewLongArray(partitionLengths.size());
Expand Down
2 changes: 1 addition & 1 deletion cpp/core/operators/serializer/ColumnarBatchSerializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ namespace gluten {

class ColumnarBatchSerializer {
public:
ColumnarBatchSerializer(arrow::MemoryPool* arrowPool, struct ArrowSchema* cSchema) : arrowPool_(arrowPool) {}
ColumnarBatchSerializer(arrow::MemoryPool* arrowPool) : arrowPool_(arrowPool) {}

virtual ~ColumnarBatchSerializer() = default;

Expand Down
1 change: 0 additions & 1 deletion cpp/core/shuffle/FallbackRangePartitioner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ arrow::Status gluten::FallbackRangePartitioner::compute(
std::vector<uint32_t>& row2Partition,
std::vector<uint32_t>& partition2RowCount) {
row2Partition.resize(numRows);
std::fill(std::begin(partition2RowCount), std::end(partition2RowCount), 0);
for (auto i = 0; i < numRows; ++i) {
auto pid = pidArr[i];
if (pid >= numPartitions_) {
Expand Down
1 change: 0 additions & 1 deletion cpp/core/shuffle/HashPartitioner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ arrow::Status gluten::HashPartitioner::compute(
std::vector<uint32_t>& row2partition,
std::vector<uint32_t>& partition2RowCount) {
row2partition.resize(numRows);
std::fill(std::begin(partition2RowCount), std::end(partition2RowCount), 0);

for (auto i = 0; i < numRows; ++i) {
auto pid = computePid(pidArr, i, numPartitions_);
Expand Down
2 changes: 1 addition & 1 deletion cpp/core/shuffle/Options.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ namespace gluten {

static constexpr int16_t kDefaultBatchSize = 4096;
static constexpr int32_t kDefaultShuffleWriterBufferSize = 4096;
static constexpr int64_t kDefaultSortBufferThreshold = 64 << 20;
static constexpr int64_t kDefaultSortBufferThreshold = std::numeric_limits<int64_t>::max();
static constexpr int64_t kDefaultPushMemoryThreshold = 4096;
static constexpr int32_t kDefaultNumSubDirs = 64;
static constexpr int32_t kDefaultCompressionThreshold = 100;
Expand Down
1 change: 1 addition & 0 deletions cpp/core/shuffle/Partitioner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "shuffle/RandomPartitioner.h"
#include "shuffle/RoundRobinPartitioner.h"
#include "shuffle/SinglePartitioner.h"
#include "utils/exception.h"

namespace gluten {

Expand Down
1 change: 0 additions & 1 deletion cpp/core/shuffle/RandomPartitioner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ arrow::Status gluten::RandomPartitioner::compute(
const int64_t numRows,
std::vector<uint32_t>& row2Partition,
std::vector<uint32_t>& partition2RowCount) {
std::fill(std::begin(partition2RowCount), std::end(partition2RowCount), 0);
row2Partition.resize(numRows);

for (int32_t i = 0; i < numRows; ++i) {
Expand Down
1 change: 0 additions & 1 deletion cpp/core/shuffle/RoundRobinPartitioner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ arrow::Status gluten::RoundRobinPartitioner::compute(
const int64_t numRows,
std::vector<uint32_t>& row2Partition,
std::vector<uint32_t>& partition2RowCount) {
std::fill(std::begin(partition2RowCount), std::end(partition2RowCount), 0);
row2Partition.resize(numRows);

for (int32_t i = 0; i < numRows; ++i) {
Expand Down
15 changes: 6 additions & 9 deletions cpp/core/shuffle/ShuffleWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class ShuffleWriter : public Reclaimable {

virtual arrow::Status write(std::shared_ptr<ColumnarBatch> cb, int64_t memLimit) = 0;

virtual arrow::Status stop() = 0;
virtual arrow::Status stop(int64_t memLimit) = 0;

int32_t numPartitions() const {
return numPartitions_;
Expand Down Expand Up @@ -99,7 +99,11 @@ class ShuffleWriter : public Reclaimable {
options_(std::move(options)),
pool_(pool),
partitionBufferPool_(std::make_unique<ShuffleMemoryPool>(pool)),
partitionWriter_(std::move(partitionWriter)) {}
partitionWriter_(std::move(partitionWriter)) {
GLUTEN_ASSIGN_OR_THROW(
partitioner_,
partitioner_ = Partitioner::make(options_.partitioning, numPartitions_, options_.startPartitionId));
}

virtual ~ShuffleWriter() = default;

Expand All @@ -114,13 +118,6 @@ class ShuffleWriter : public Reclaimable {

std::unique_ptr<PartitionWriter> partitionWriter_;

std::vector<int64_t> rowVectorLengths_;

std::shared_ptr<arrow::Schema> schema_;

// Column index, partition id, buffers.
std::vector<std::vector<std::vector<std::shared_ptr<arrow::ResizableBuffer>>>> partitionBuffers_;

std::shared_ptr<Partitioner> partitioner_;

ShuffleWriterMetrics metrics_{};
Expand Down
1 change: 1 addition & 0 deletions cpp/core/tests/RoundRobinPartitionerTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class RoundRobinPartitionerTest : public ::testing::Test {
row2Partition_.clear();
partition2RowCount_.clear();
partition2RowCount_.resize(numPart);
std::fill(std::begin(partition2RowCount_), std::end(partition2RowCount_), 0);
}

void checkResult(const std::vector<uint32_t>& expectRow2Part, const std::vector<uint32_t>& expectPart2RowCount)
Expand Down
2 changes: 1 addition & 1 deletion cpp/velox/benchmarks/GenericBenchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ void runShuffle(
while (resultIter->hasNext()) {
GLUTEN_THROW_NOT_OK(shuffleWriter->write(resultIter->next(), ShuffleWriter::kMinMemLimit));
}
GLUTEN_THROW_NOT_OK(shuffleWriter->stop());
GLUTEN_THROW_NOT_OK(shuffleWriter->stop(ShuffleWriter::kMinMemLimit));
}

populateWriterMetrics(shuffleWriter, totalTime, metrics);
Expand Down
18 changes: 15 additions & 3 deletions cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,21 @@ VeloxColumnarBatchSerializer::VeloxColumnarBatchSerializer(
arrow::MemoryPool* arrowPool,
std::shared_ptr<memory::MemoryPool> veloxPool,
struct ArrowSchema* cSchema)
: ColumnarBatchSerializer(arrowPool, cSchema), veloxPool_(std::move(veloxPool)) {
: ColumnarBatchSerializer(arrowPool), veloxPool_(std::move(veloxPool)) {
// serializeColumnarBatches() doesn't need rowType_, so cSchema may be null.
if (cSchema != nullptr) {
rowType_ = asRowType(importFromArrow(*cSchema));
ArrowSchemaRelease(cSchema); // otherwise the c schema leaks memory
}
serde_ = std::make_unique<serializer::presto::PrestoVectorSerde>();
options_.useLosslessTimestamp = true;
initSerde();
}

// Constructs a serializer from an already-resolved Velox row type.
//
// arrowPool is forwarded to the ColumnarBatchSerializer base class.
// veloxPool is moved into veloxPool_; deserialize() passes it to serde_->deserialize().
// rowType is moved into rowType_; deserialize() passes it to serde_->deserialize() as the
// row type of the incoming data.
//
// Unlike the ArrowSchema* overload, no C schema is imported or released here.
VeloxColumnarBatchSerializer::VeloxColumnarBatchSerializer(
arrow::MemoryPool* arrowPool,
std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
facebook::velox::RowTypePtr rowType)
: ColumnarBatchSerializer(arrowPool), veloxPool_(std::move(veloxPool)), rowType_(std::move(rowType)) {
// Shared setup with the other constructor: creates serde_ and configures options_.
initSerde();
}

std::shared_ptr<arrow::Buffer> VeloxColumnarBatchSerializer::serializeColumnarBatches(
Expand Down Expand Up @@ -88,4 +95,9 @@ std::shared_ptr<ColumnarBatch> VeloxColumnarBatchSerializer::deserialize(uint8_t
serde_->deserialize(byteStream.get(), veloxPool_.get(), rowType_, &result, &options_);
return std::make_shared<VeloxColumnarBatch>(result);
}

// Common initialization called by both constructors: creates the PrestoVectorSerde held in
// serde_ and sets the serde options that deserialize() passes along with it.
void VeloxColumnarBatchSerializer::initSerde() {
serde_ = std::make_unique<serializer::presto::PrestoVectorSerde>();
// NOTE(review): presumably keeps full timestamp precision in the serialized form; confirm
// against the Velox PrestoVectorSerde options documentation.
options_.useLosslessTimestamp = true;
}
} // namespace gluten
7 changes: 7 additions & 0 deletions cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,19 @@ class VeloxColumnarBatchSerializer final : public ColumnarBatchSerializer {
std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
struct ArrowSchema* cSchema);

VeloxColumnarBatchSerializer(
arrow::MemoryPool* arrowPool,
std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
facebook::velox::RowTypePtr rowType);

std::shared_ptr<arrow::Buffer> serializeColumnarBatches(
const std::vector<std::shared_ptr<ColumnarBatch>>& batches) override;

std::shared_ptr<ColumnarBatch> deserialize(uint8_t* data, int32_t size) override;

private:
void initSerde();

std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool_;
facebook::velox::RowTypePtr rowType_;
std::unique_ptr<facebook::velox::serializer::presto::PrestoVectorSerde> serde_;
Expand Down
Loading

0 comments on commit 3f55ea4

Please sign in to comment.