Skip to content

Commit

Permalink
fix
Browse files (browse the repository at this point in the history)
  • Loading branch information
Yohahaha committed Sep 15, 2023
1 parent d5f6f6e commit a982a30
Show file tree
Hide file tree
Showing 13 changed files with 40 additions and 58 deletions.
3 changes: 2 additions & 1 deletion cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ find_package(Threads REQUIRED)

find_package(JNI REQUIRED)

find_package(glog REQUIRED)

if(BUILD_TESTS)
set(GLUTEN_GTEST_MIN_VERSION "1.13.0")
find_package(GTest ${GLUTEN_GTEST_MIN_VERSION} CONFIG)
Expand All @@ -121,7 +123,6 @@ if(BUILD_TESTS)
endif()
include(GoogleTest)
enable_testing()
find_package(glog REQUIRED)
endif()

function(ADD_TEST_CASE TEST_NAME)
Expand Down
2 changes: 1 addition & 1 deletion cpp/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ else()
endif()

target_link_libraries(gluten
PUBLIC Arrow::arrow Arrow::parquet)
PUBLIC Arrow::arrow Arrow::parquet glog::glog)

install(TARGETS gluten
DESTINATION ${CMAKE_INSTALL_LIBDIR})
Expand Down
8 changes: 2 additions & 6 deletions cpp/core/compute/ExecutionCtx.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,12 @@ void setExecutionCtxFactory(
ExecutionCtxFactoryWithConf factory,
const std::unordered_map<std::string, std::string>& sparkConfs) {
getExecutionCtxFactoryContext()->set(factory, sparkConfs);
#ifdef GLUTEN_PRINT_DEBUG
std::cout << "Set execution context factory with conf." << std::endl;
#endif
DLOG(INFO) << "Set execution context factory with conf.";
}

void setExecutionCtxFactory(ExecutionCtxFactory factory) {
getExecutionCtxFactoryContext()->set(factory);
#ifdef GLUTEN_PRINT_DEBUG
std::cout << "Set execution context factory." << std::endl;
#endif
DLOG(INFO) << "Set execution context factory.";
}

ExecutionCtx* createExecutionCtx() {
Expand Down
8 changes: 4 additions & 4 deletions cpp/core/compute/ExecutionCtx.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,11 @@ class ExecutionCtx : public std::enable_shared_from_this<ExecutionCtx> {
#ifdef GLUTEN_PRINT_DEBUG
try {
auto jsonPlan = substraitFromPbToJson("Plan", data, size);
std::cout << std::string(50, '#') << " received substrait::Plan:" << std::endl;
std::cout << "Task stageId: " << taskInfo_.stageId << ", partitionId: " << taskInfo_.partitionId
<< ", taskId: " << taskInfo_.taskId << "; " << jsonPlan << std::endl;
DLOG(INFO) << std::string(50, '#') << " received substrait::Plan:";
DLOG(INFO) << "Task stageId: " << taskInfo_.stageId << ", partitionId: " << taskInfo_.partitionId
<< ", taskId: " << taskInfo_.taskId << "; " << jsonPlan;
} catch (const std::exception& e) {
std::cerr << "Error converting Substrait plan to JSON: " << e.what() << std::endl;
DLOG(WARNING) << "Error converting Substrait plan to JSON: " << e.what();
}
#endif
GLUTEN_CHECK(parseProtobuf(data, size, &substraitPlan_) == true, "Parse substrait plan failed");
Expand Down
19 changes: 8 additions & 11 deletions cpp/core/jni/JniCommon.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <arrow/ipc/writer.h>
#include <arrow/util/parallel.h>
#include <execinfo.h>
#include <glog/logging.h>
#include <jni.h>

#include "compute/ProtobufUtils.h"
Expand Down Expand Up @@ -124,17 +125,13 @@ static inline gluten::CompressionMode getCompressionMode(JNIEnv* env, jstring co
static inline void attachCurrentThreadAsDaemonOrThrow(JavaVM* vm, JNIEnv** out) {
int getEnvStat = vm->GetEnv(reinterpret_cast<void**>(out), jniVersion);
if (getEnvStat == JNI_EDETACHED) {
#ifdef GLUTEN_PRINT_DEBUG
std::cout << "JNIEnv was not attached to current thread." << std::endl;
#endif
DLOG(INFO) << "JNIEnv was not attached to current thread.";
// Reattach current thread to JVM
getEnvStat = vm->AttachCurrentThreadAsDaemon(reinterpret_cast<void**>(out), NULL);
if (getEnvStat != JNI_OK) {
throw gluten::GlutenException("Failed to reattach current thread to JVM.");
}
#ifdef GLUTEN_PRINT_DEBUG
std::cout << "Succeeded attaching current thread." << std::endl;
#endif
DLOG(INFO) << "Succeeded attaching current thread.";
return;
}
if (getEnvStat != JNI_OK) {
Expand All @@ -152,7 +149,7 @@ static inline void checkException(JNIEnv* env) {
std::string description =
jStringToCString(env, (jstring)env->CallStaticObjectMethod(describerClass, describeMethod, t));
if (env->ExceptionCheck()) {
std::cerr << "Fatal: Uncaught Java exception during calling the Java exception describer method! " << std::endl;
LOG(WARNING) << "Fatal: Uncaught Java exception during calling the Java exception describer method!";
}
throw gluten::GlutenException("Error during calling Java code from native code: " + description);
}
Expand Down Expand Up @@ -198,8 +195,8 @@ class SparkAllocationListener final : public gluten::AllocationListener {
~SparkAllocationListener() override {
JNIEnv* env;
if (vm_->GetEnv(reinterpret_cast<void**>(&env), jniVersion) != JNI_OK) {
std::cerr << "SparkAllocationListener#~SparkAllocationListener(): "
<< "JNIEnv was not attached to current thread" << std::endl;
LOG(WARNING) << "SparkAllocationListener#~SparkAllocationListener(): "
<< "JNIEnv was not attached to current thread";
return;
}
env->DeleteGlobalRef(jListenerGlobalRef_);
Expand Down Expand Up @@ -303,8 +300,8 @@ class CelebornClient : public RssClient {
~CelebornClient() {
JNIEnv* env;
if (vm_->GetEnv(reinterpret_cast<void**>(&env), jniVersion) != JNI_OK) {
std::cerr << "CelebornClient#~CelebornClient(): "
<< "JNIEnv was not attached to current thread" << std::endl;
LOG(WARNING) << "CelebornClient#~CelebornClient(): "
<< "JNIEnv was not attached to current thread";
return;
}
env->DeleteGlobalRef(javaCelebornShuffleWriter_);
Expand Down
19 changes: 9 additions & 10 deletions cpp/core/jni/JniWrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@
#include <jni.h>
#include <filesystem>

#include <glog/logging.h>
#include "compute/ExecutionCtx.h"
#include "compute/ProtobufUtils.h"
#include "config/GlutenConfig.h"
#include "jni/JniCommon.h"
#include "jni/JniErrors.h"
Expand Down Expand Up @@ -92,15 +90,10 @@ class JavaInputStreamAdaptor final : public arrow::io::InputStream {
try {
auto status = JavaInputStreamAdaptor::Close();
if (!status.ok()) {
#ifdef GLUTEN_PRINT_DEBUG
std::cout << __func__ << " call JavaInputStreamAdaptor::Close() failed, status:" << status.ToString()
<< std::endl;
#endif
LOG(WARNING) << __func__ << " call JavaInputStreamAdaptor::Close() failed, status:" << status.ToString();
}
} catch (std::exception& e) {
#ifdef GLUTEN_PRINT_DEBUG
std::cout << __func__ << " call JavaInputStreamAdaptor::Close() got exception:" << e.what() << std::endl;
#endif
LOG(WARNING) << __func__ << " call JavaInputStreamAdaptor::Close() got exception:" << e.what() << std::endl;
}
}

Expand Down Expand Up @@ -287,10 +280,16 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) {
shuffleReaderMetricsSetDecompressTime =
getMethodIdOrError(env, shuffleReaderMetricsClass, "setDecompressTime", "(J)V");

// logging
google::InitGoogleLogging("gluten");
FLAGS_logtostderr = true;

return jniVersion;
}

void JNI_OnUnload(JavaVM* vm, void* reserved) {
google::ShutdownGoogleLogging();

JNIEnv* env;
vm->GetEnv(reinterpret_cast<void**>(&env), jniVersion);
env->DeleteGlobalRef(serializableObjBuilderClass);
Expand Down Expand Up @@ -839,7 +838,7 @@ JNIEXPORT jlong JNICALL Java_io_glutenproject_vectorized_ShuffleWriterJniWrapper
jobject thread = env->CallStaticObjectMethod(cls, mid);
checkException(env);
if (thread == NULL) {
std::cerr << "Thread.currentThread() return NULL" << std::endl;
LOG(WARNING) << "Thread.currentThread() return NULL";
} else {
jmethodID midGetid = getMethodIdOrError(env, cls, "getId", "()J");
jlong sid = env->CallLongMethod(thread, midGetid);
Expand Down
6 changes: 2 additions & 4 deletions cpp/core/shuffle/LocalPartitionWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -253,10 +253,8 @@ arrow::Status PreferCachePartitionWriter::evictPartition(int32_t partitionId /*
RETURN_NOT_OK(flushCachedPayloads(spilledFileOs.get(), shuffleWriter_->partitionCachedRecordbatch()[pid]));
ARROW_ASSIGN_OR_RAISE(auto end, spilledFileOs->Tell());
spillInfo.partitionSpillInfos.push_back({pid, end - start});
#ifdef GLUTEN_PRINT_DEBUG
std::cout << "Spilled partition " << pid << " file start: " << start << ", file end: " << end
<< ", cachedPayloadSize: " << cachedPayloadSize << std::endl;
#endif
DLOG(INFO) << "Spilled partition " << pid << " file start: " << start << ", file end: " << end
<< ", cachedPayloadSize: " << cachedPayloadSize;
shuffleWriter_->clearCachedPayloads(pid);
}
}
Expand Down
1 change: 1 addition & 0 deletions cpp/core/shuffle/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <arrow/ipc/writer.h>
#include <arrow/type.h>
#include <arrow/util/io_util.h>
#include <glog/logging.h>

#include <boost/uuid/uuid_generators.hpp>
#include <boost/uuid/uuid_io.hpp>
Expand Down
4 changes: 1 addition & 3 deletions cpp/velox/compute/VeloxPlanConverter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,7 @@ std::shared_ptr<const facebook::velox::core::PlanNode> VeloxPlanConverter::toVel
}
}
auto veloxPlan = substraitVeloxPlanConverter_.toVeloxPlan(substraitPlan);
#ifdef GLUTEN_PRINT_DEBUG
std::cout << "Plan Node: " << std::endl << veloxPlan->toString(true, true) << std::endl;
#endif
DLOG(INFO) << "Plan Node: " << std::endl << veloxPlan->toString(true, true);
return veloxPlan;
}

Expand Down
6 changes: 2 additions & 4 deletions cpp/velox/compute/WholeStageResultIterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -218,10 +218,8 @@ void WholeStageResultIterator::collectMetrics() {
const auto& nodeId = orderedNodeIds_[idx];
if (planStats.find(nodeId) == planStats.end()) {
if (omittedNodeIds_.find(nodeId) == omittedNodeIds_.end()) {
#ifdef GLUTEN_PRINT_DEBUG
std::cout << "Not found node id: " << nodeId << std::endl;
std::cout << "Plan Node: " << std::endl << veloxPlan_->toString(true, true) << std::endl;
#endif
DLOG(INFO) << "Not found node id: " << nodeId;
DLOG(INFO) << "Plan Node: " << std::endl << veloxPlan_->toString(true, true);
throw std::runtime_error("Node id cannot be found in plan status.");
}
// Special handing for Filter over Project case. Filter metrics are
Expand Down
6 changes: 1 addition & 5 deletions cpp/velox/jni/VeloxJniWrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,11 @@ jint JNI_OnLoad(JavaVM* vm, void*) {
return JNI_ERR;
}

// logging
google::InitGoogleLogging("gluten");
FLAGS_logtostderr = true;
gluten::getJniErrorsState()->initialize(env);
gluten::initVeloxJniFileSystem(env);
gluten::initVeloxJniUDF(env);
#ifdef GLUTEN_PRINT_DEBUG
std::cout << "Loaded Velox backend." << std::endl;
LOG(INFO) << "Loaded Velox backend." << std::endl;
#endif
return jniVersion;
}
Expand All @@ -68,7 +65,6 @@ void JNI_OnUnload(JavaVM* vm, void*) {
vm->GetEnv(reinterpret_cast<void**>(&env), jniVersion);
gluten::finalizeVeloxJniFileSystem(env);
gluten::finalizeVeloxJniUDF(env);
google::ShutdownGoogleLogging();
}

JNIEXPORT void JNICALL Java_io_glutenproject_init_BackendJniWrapper_initializeBackend( // NOLINT
Expand Down
13 changes: 6 additions & 7 deletions cpp/velox/shuffle/VeloxShuffleWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -997,7 +997,7 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const velox::RowVec
binaryBuf.valueCapacity = capacity;
dstValuePtr = binaryBuf.valuePtr + valueOffset - stringLen;

std::cout << "Split value buffer resized colIdx" << binaryIdx << std::endl;
LOG(INFO) << "Split value buffer resized colIdx" << binaryIdx;
VsPrintSplit(" dst_start", dstOffsetBase[x]);
VsPrintSplit(" dst_end", dstOffsetBase[x + 1]);
VsPrintSplit(" old size", oldCapacity);
Expand Down Expand Up @@ -1262,24 +1262,23 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const velox::RowVec
while (status.IsOutOfMemory() && retry < 3) {
// retry allocate
++retry;
std::cout << status.ToString() << std::endl
LOG(INFO) << status.ToString() << std::endl
<< std::to_string(retry) << " retry to allocate new buffer for partition "
<< std::to_string(partitionId) << std::endl;
<< std::to_string(partitionId);

int64_t evictedSize = 0;
RETURN_NOT_OK(evictPartitionsOnDemand(&evictedSize));
if (evictedSize <= 0) {
std::cout << "Failed to allocate new buffer for partition " << std::to_string(partitionId)
<< ". No partition buffer to evict." << std::endl;
LOG(INFO) << "Failed to allocate new buffer for partition " << std::to_string(partitionId)
<< ". No partition buffer to evict.";
return status;
}

status = allocatePartitionBuffers(partitionId, newSize);
}

if (status.IsOutOfMemory()) {
std::cout << "Failed to allocate new buffer for partition " << std::to_string(partitionId) << ". Out of memory."
<< std::endl;
LOG(INFO) << "Failed to allocate new buffer for partition " << std::to_string(partitionId) << ". Out of memory.";
}

return status;
Expand Down
3 changes: 1 addition & 2 deletions cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,7 @@ bool validateColNames(const ::substrait::NamedStruct& schema) {
for (auto i = 0; i < name.size(); i++) {
auto c = name[i];
if (!token.isUnquotedPathCharacter(c)) {
std::cout << "native validation failed due to: Illegal column charactor " << c << "in column " << name
<< std::endl;
LOG(WARNING) << "native validation failed due to: Illegal column charactor " << c << "in column " << name;
return false;
}
}
Expand Down

0 comments on commit a982a30

Please sign in to comment.