Skip to content

Commit

Permalink
Merge branch 'main' into centos8
Browse files Browse the repository at this point in the history
  • Loading branch information
dcoliversun authored Nov 3, 2023
2 parents 798e4a1 + 55f1480 commit b0f17a5
Show file tree
Hide file tree
Showing 28 changed files with 170 additions and 117 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import io.glutenproject.substrait.plan.PlanNode;

import com.google.protobuf.Any;
import io.substrait.proto.Plan;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.internal.SQLConf;

Expand Down Expand Up @@ -80,12 +79,12 @@ private PlanNode buildNativeConfNode(Map<String, String> confs) {
// Used by WholeStageTransform to create the native computing pipeline and
// return a columnar result iterator.
public GeneralOutIterator createKernelWithBatchIterator(
Plan wsPlan, List<GeneralInIterator> iterList, boolean materializeInput) {
byte[] wsPlan, List<GeneralInIterator> iterList, boolean materializeInput) {
long allocId = CHNativeMemoryAllocators.contextInstance().getNativeInstanceId();
long handle =
jniWrapper.nativeCreateKernelWithIterator(
allocId,
getPlanBytesBuf(wsPlan),
wsPlan,
iterList.toArray(new GeneralInIterator[0]),
buildNativeConfNode(
GlutenConfig.getNativeBackendConf(
Expand Down Expand Up @@ -115,10 +114,6 @@ public GeneralOutIterator createKernelWithBatchIterator(
return createOutIterator(handle);
}

private byte[] getPlanBytesBuf(Plan planNode) {
return planNode.toByteArray();
}

private GeneralOutIterator createOutIterator(long nativeHandle) {
return new BatchIterator(nativeHandle);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,19 +91,19 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
fileFormats(i)),
SoftAffinityUtil.getFilePartitionLocations(f))
case _ =>
throw new UnsupportedOperationException(s"Unsupport operators.")
throw new UnsupportedOperationException(s"Unsupported input partition.")
})
wsCxt.substraitContext.initLocalFilesNodesIndex(0)
wsCxt.substraitContext.setLocalFilesNodes(localFilesNodesWithLocations.map(_._1))
val substraitPlan = wsCxt.root.toProtobuf
if (index < 3) {
if (index == 0) {
logOnLevel(
GlutenConfig.getConf.substraitPlanLogLevel,
s"The substrait plan for partition $index:\n${SubstraitPlanPrinterUtil
.substraitPlanToJson(substraitPlan)}"
)
}
GlutenPartition(index, substraitPlan, localFilesNodesWithLocations.head._2)
GlutenPartition(index, substraitPlan.toByteArray, localFilesNodesWithLocations.head._2)
}

/**
Expand Down Expand Up @@ -185,7 +185,7 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
}.asJava)
// we need to complete dependency RDD's firstly
transKernel.createKernelWithBatchIterator(
rootNode.toProtobuf,
rootNode.toProtobuf.toByteArray,
columnarNativeIterator,
materializeInput)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ class IteratorApiImpl extends IteratorApi with Logging {
wsCxt.substraitContext.initLocalFilesNodesIndex(0)
wsCxt.substraitContext.setLocalFilesNodes(localFilesNodesWithLocations.map(_._1))
val substraitPlan = wsCxt.root.toProtobuf
GlutenPartition(index, substraitPlan, localFilesNodesWithLocations.head._2)
GlutenPartition(index, substraitPlan.toByteArray, localFilesNodesWithLocations.head._2)
}

/**
Expand Down Expand Up @@ -187,7 +187,9 @@ class IteratorApiImpl extends IteratorApi with Logging {
iter => new ColumnarBatchInIterator(iter.asJava)
}.asJava)
val nativeResultIterator =
transKernel.createKernelWithBatchIterator(rootNode.toProtobuf, columnarNativeIterator)
transKernel.createKernelWithBatchIterator(
rootNode.toProtobuf.toByteArray,
columnarNativeIterator)

pipelineTime += TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - beforeBuild)

Expand Down
3 changes: 2 additions & 1 deletion cpp/core/jni/JniWrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -834,8 +834,9 @@ JNIEXPORT jlong JNICALL Java_io_glutenproject_vectorized_ShuffleWriterJniWrapper
env->ReleaseStringUTFChars(dataFileJstr, dataFileC);

auto localDirs = env->GetStringUTFChars(localDirsJstr, JNI_FALSE);
setenv(gluten::kGlutenSparkLocalDirs.c_str(), localDirs, 1);
shuffleWriterOptions.local_dirs = std::string(localDirs);
env->ReleaseStringUTFChars(localDirsJstr, localDirs);

partitionWriterCreator = std::make_shared<LocalPartitionWriterCreator>();
} else if (partitionWriterType == "celeborn") {
shuffleWriterOptions.partition_writer_type = PartitionWriterType::kCeleborn;
Expand Down
13 changes: 2 additions & 11 deletions cpp/core/shuffle/LocalPartitionWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <thread>
#include "shuffle/Utils.h"
#include "utils/DebugOut.h"
#include "utils/StringUtil.h"
#include "utils/Timer.h"

namespace gluten {
Expand Down Expand Up @@ -169,22 +170,12 @@ std::string LocalPartitionWriter::nextSpilledFileDir() {
}

arrow::Status LocalPartitionWriter::setLocalDirs() {
ARROW_ASSIGN_OR_RAISE(configuredDirs_, getConfiguredLocalDirs());
configuredDirs_ = splitPaths(shuffleWriter_->options().local_dirs);
// Shuffle the configured local directories. This prevents each task from using the same directory for spilled files.
std::random_device rd;
std::default_random_engine engine(rd());
std::shuffle(configuredDirs_.begin(), configuredDirs_.end(), engine);

subDirSelection_.assign(configuredDirs_.size(), 0);

// Both data_file and shuffle_index_file should be set through jni.
// For test purpose, Create a temporary subdirectory in the system temporary
// dir with prefix "columnar-shuffle"
if (shuffleWriter_->options().data_file.length() == 0) {
std::string dataFileTemp;
size_t id = std::hash<std::thread::id>{}(std::this_thread::get_id()) % configuredDirs_.size();
ARROW_ASSIGN_OR_RAISE(shuffleWriter_->options().data_file, createTempShuffleFile(configuredDirs_[id]));
}
return arrow::Status::OK();
}

Expand Down
27 changes: 2 additions & 25 deletions cpp/core/shuffle/Utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/

#include "shuffle/Utils.h"
#include "options.h"
#include "utils/StringUtil.h"

#include <boost/uuid/uuid_generators.hpp>
#include <boost/uuid/uuid_io.hpp>
Expand All @@ -39,31 +41,6 @@ std::string gluten::getSpilledShuffleFileDir(const std::string& configuredDir, i
return dir;
}

arrow::Result<std::vector<std::string>> gluten::getConfiguredLocalDirs() {
auto joinedDirsC = std::getenv(kGlutenSparkLocalDirs.c_str());
if (joinedDirsC != nullptr && strcmp(joinedDirsC, "") > 0) {
auto joinedDirs = std::string(joinedDirsC);
std::string delimiter = ",";

size_t pos;
std::vector<std::string> res;
while ((pos = joinedDirs.find(delimiter)) != std::string::npos) {
auto dir = joinedDirs.substr(0, pos);
if (dir.length() > 0) {
res.push_back(std::move(dir));
}
joinedDirs.erase(0, pos + delimiter.length());
}
if (joinedDirs.length() > 0) {
res.push_back(std::move(joinedDirs));
}
return res;
} else {
ARROW_ASSIGN_OR_RAISE(auto arrow_tmp_dir, arrow::internal::TemporaryDir::Make("columnar-shuffle-"));
return std::vector<std::string>{arrow_tmp_dir->path().ToString()};
}
}

arrow::Result<std::string> gluten::createTempShuffleFile(const std::string& dir) {
if (dir.length() == 0) {
return arrow::Status::Invalid("Failed to create spilled file, got empty path.");
Expand Down
2 changes: 0 additions & 2 deletions cpp/core/shuffle/Utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@ std::string generateUuid();

std::string getSpilledShuffleFileDir(const std::string& configuredDir, int32_t subDirId);

arrow::Result<std::vector<std::string>> getConfiguredLocalDirs();

arrow::Result<std::string> createTempShuffleFile(const std::string& dir);

arrow::Result<std::vector<std::shared_ptr<arrow::DataType>>> toShuffleWriterTypeId(
Expand Down
1 change: 1 addition & 0 deletions cpp/core/shuffle/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ struct ShuffleWriterOptions {

std::string partitioning_name{};
std::string data_file{};
std::string local_dirs{};
arrow::MemoryPool* memory_pool{};

static ShuffleWriterOptions defaults();
Expand Down
10 changes: 8 additions & 2 deletions cpp/core/utils/StringUtil.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
#include "exception.h"

std::vector<std::string> gluten::splitByDelim(const std::string& s, const char delimiter) {
if (s.empty()) {
return {};
}
std::vector<std::string> result;
size_t start = 0;
size_t end = s.find(delimiter);
Expand All @@ -38,13 +41,16 @@ std::vector<std::string> gluten::splitByDelim(const std::string& s, const char d
return result;
}

std::vector<std::string> gluten::splitPaths(const std::string& s) {
std::vector<std::string> gluten::splitPaths(const std::string& s, bool checkExists) {
if (s.empty()) {
return {};
}
auto splits = splitByDelim(s, ',');
std::vector<std::string> paths;
for (auto i = 0; i < splits.size(); ++i) {
if (!splits[i].empty()) {
std::filesystem::path path(splits[i]);
if (!std::filesystem::exists(path)) {
if (checkExists && !std::filesystem::exists(path)) {
throw gluten::GlutenException("File path not exists: " + splits[i]);
}
if (path.is_relative()) {
Expand Down
2 changes: 1 addition & 1 deletion cpp/core/utils/StringUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,6 @@ namespace gluten {

std::vector<std::string> splitByDelim(const std::string& s, const char delimiter);

std::vector<std::string> splitPaths(const std::string& s);
std::vector<std::string> splitPaths(const std::string& s, bool checkExists = false);

} // namespace gluten
8 changes: 0 additions & 8 deletions cpp/core/utils/macros.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,14 +99,6 @@
} \
std::cout << std::endl;

#define THROW_NOT_OK(expr) \
do { \
auto __s = (expr); \
if (!__s.ok()) { \
throw GlutenException(__s.message()); \
} \
} while (false);

#define TIME_TO_STRING(time) (time > 10000 ? time / 1000 : time) << (time > 10000 ? " ms" : " us")

#define TIME_NANO_TO_STRING(time) \
Expand Down
2 changes: 2 additions & 0 deletions cpp/velox/benchmarks/GenericBenchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ std::shared_ptr<VeloxShuffleWriter> createShuffleWriter(VeloxMemoryManager* memo
options.compression_type = arrow::Compression::GZIP;
}

GLUTEN_THROW_NOT_OK(setLocalDirsAndDataFileFromEnv(options));

GLUTEN_ASSIGN_OR_THROW(
auto shuffleWriter,
VeloxShuffleWriter::create(
Expand Down
2 changes: 1 addition & 1 deletion cpp/velox/benchmarks/ShuffleSplitBenchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,9 @@ class BenchmarkShuffleSplit {

auto options = ShuffleWriterOptions::defaults();
options.buffer_size = kPartitionBufferSize;
options.buffered_write = true;
options.memory_pool = pool.get();
options.partitioning_name = "rr";
GLUTEN_THROW_NOT_OK(setLocalDirsAndDataFileFromEnv(options));

std::shared_ptr<VeloxShuffleWriter> shuffleWriter;
int64_t elapseRead = 0;
Expand Down
26 changes: 26 additions & 0 deletions cpp/velox/benchmarks/common/BenchmarkUtils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
#include "compute/VeloxBackend.h"
#include "compute/VeloxRuntime.h"
#include "config/GlutenConfig.h"
#include "shuffle/Utils.h"
#include "utils/StringUtil.h"
#include "velox/dwio/common/Options.h"

using namespace facebook;
Expand Down Expand Up @@ -150,3 +152,27 @@ void setCpu(uint32_t cpuindex) {
exit(EXIT_FAILURE);
}
}

arrow::Status setLocalDirsAndDataFileFromEnv(gluten::ShuffleWriterOptions& options) {
auto joinedDirsC = std::getenv(gluten::kGlutenSparkLocalDirs.c_str());
if (joinedDirsC != nullptr && strcmp(joinedDirsC, "") > 0) {
// Set local dirs.
auto joinedDirs = std::string(joinedDirsC);
options.local_dirs = joinedDirs;
// Split local dirs and use thread id to choose one directory for data file.
auto localDirs = gluten::splitPaths(joinedDirs);
size_t id = std::hash<std::thread::id>{}(std::this_thread::get_id()) % localDirs.size();
ARROW_ASSIGN_OR_RAISE(options.data_file, gluten::createTempShuffleFile(localDirs[id]));
} else {
// Otherwise create 1 temp dir and data file.
static const std::string kBenchmarkDirsPrefix = "columnar-shuffle-benchmark-";
{
// Because tmpDir will be deleted in the dtor, allow it to be deleted upon exiting the block and then recreate it
// in createTempShuffleFile.
ARROW_ASSIGN_OR_RAISE(auto tmpDir, arrow::internal::TemporaryDir::Make(kBenchmarkDirsPrefix))
options.local_dirs = tmpDir->path().ToString();
}
ARROW_ASSIGN_OR_RAISE(options.data_file, gluten::createTempShuffleFile(options.local_dirs));
}
return arrow::Status::OK();
}
5 changes: 4 additions & 1 deletion cpp/velox/benchmarks/common/BenchmarkUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "compute/ProtobufUtils.h"
#include "memory/VeloxColumnarBatch.h"
#include "memory/VeloxMemoryManager.h"
#include "shuffle/options.h"
#include "utils/exception.h"
#include "velox/common/memory/Memory.h"
#include "velox/dwio/common/tests/utils/DataFiles.h"
Expand Down Expand Up @@ -104,4 +105,6 @@ inline std::shared_ptr<gluten::ColumnarBatch> convertBatch(std::shared_ptr<glute
/// Return whether the data ends with suffix.
bool endsWith(const std::string& data, const std::string& suffix);

void setCpu(uint32_t cpuindex);
void setCpu(uint32_t cpuindex);

arrow::Status setLocalDirsAndDataFileFromEnv(gluten::ShuffleWriterOptions& options);
1 change: 1 addition & 0 deletions cpp/velox/memory/VeloxColumnarBatch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ std::pair<char*, int> VeloxColumnarBatch::getRowBytes(int32_t rowId) const {
auto fast = std::make_unique<facebook::velox::row::UnsafeRowFast>(rowVector_);
auto size = fast->rowSize(rowId);
char* rowBytes = new char[size];
std::memset(rowBytes, 0, size);
fast->serialize(0, rowBytes);
return std::make_pair(rowBytes, size);
}
Expand Down
14 changes: 6 additions & 8 deletions cpp/velox/shuffle/VeloxShuffleWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -283,13 +283,12 @@ arrow::Result<std::shared_ptr<arrow::RecordBatch>> makeUncompressedRecordBatch(

class EvictGuard {
public:
explicit EvictGuard(SplitState& splitState) : splitState_(splitState) {
oldState_ = splitState;
splitState_ = SplitState::kUnevictable;
explicit EvictGuard(EvictState& evictState) : evictState_(evictState) {
evictState_ = EvictState::kUnevictable;
}

~EvictGuard() {
splitState_ = oldState_;
evictState_ = EvictState::kEvictable;
}

// For safety and clarity.
Expand All @@ -299,8 +298,7 @@ class EvictGuard {
EvictGuard& operator=(EvictGuard&&) = delete;

private:
SplitState& splitState_;
SplitState oldState_;
EvictState& evictState_;
};

template <facebook::velox::TypeKind kind>
Expand Down Expand Up @@ -1416,11 +1414,11 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::vel
}

arrow::Status VeloxShuffleWriter::evictFixedSize(int64_t size, int64_t * actual) {
if (splitState_ == SplitState::kUnevictable) {
if (evictState_ == EvictState::kUnevictable) {
*actual = 0;
return arrow::Status::OK();
}
EvictGuard{splitState_};
EvictGuard evictGuard{evictState_};

int64_t reclaimed = 0;
if (reclaimed < size && shrinkPartitionBuffersBeforeSpill()) {
Expand Down
5 changes: 4 additions & 1 deletion cpp/velox/shuffle/VeloxShuffleWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ namespace gluten {

#endif // end of VELOX_SHUFFLE_WRITER_PRINT

enum SplitState { kInit, kPreAlloc, kSplit, kStop, kUnevictable };
enum SplitState { kInit, kPreAlloc, kSplit, kStop };
enum EvictState { kEvictable, kUnevictable };

class VeloxShuffleWriter final : public ShuffleWriter {
enum { kValidityBufferIndex = 0, kLengthBufferIndex = 1, kValueBufferIndex = 2 };
Expand Down Expand Up @@ -310,6 +311,8 @@ class VeloxShuffleWriter final : public ShuffleWriter {

SplitState splitState_{kInit};

EvictState evictState_{kEvictable};

bool supportAvx512_ = false;

// store arrow column types
Expand Down
Loading

0 comments on commit b0f17a5

Please sign in to comment.