From a982a30dce169dc6df5964156602b4fb7d7b25a9 Mon Sep 17 00:00:00 2001 From: yangchuan Date: Fri, 15 Sep 2023 17:55:18 +0800 Subject: [PATCH] fix --- cpp/CMakeLists.txt | 3 ++- cpp/core/CMakeLists.txt | 2 +- cpp/core/compute/ExecutionCtx.cc | 8 ++------ cpp/core/compute/ExecutionCtx.h | 8 ++++---- cpp/core/jni/JniCommon.h | 19 ++++++++----------- cpp/core/jni/JniWrapper.cc | 19 +++++++++---------- cpp/core/shuffle/LocalPartitionWriter.cc | 6 ++---- cpp/core/shuffle/utils.h | 1 + cpp/velox/compute/VeloxPlanConverter.cc | 4 +--- cpp/velox/compute/WholeStageResultIterator.cc | 6 ++---- cpp/velox/jni/VeloxJniWrapper.cc | 6 +----- cpp/velox/shuffle/VeloxShuffleWriter.cc | 13 ++++++------- .../SubstraitToVeloxPlanValidator.cc | 3 +-- 13 files changed, 40 insertions(+), 58 deletions(-) diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 7f588726a883..7357d533cc48 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -113,6 +113,8 @@ find_package(Threads REQUIRED) find_package(JNI REQUIRED) +find_package(glog REQUIRED) + if(BUILD_TESTS) set(GLUTEN_GTEST_MIN_VERSION "1.13.0") find_package(GTest ${GLUTEN_GTEST_MIN_VERSION} CONFIG) @@ -121,7 +123,6 @@ if(BUILD_TESTS) endif() include(GoogleTest) enable_testing() - find_package(glog REQUIRED) endif() function(ADD_TEST_CASE TEST_NAME) diff --git a/cpp/core/CMakeLists.txt b/cpp/core/CMakeLists.txt index 1058aebd48ff..8cc496a9379c 100644 --- a/cpp/core/CMakeLists.txt +++ b/cpp/core/CMakeLists.txt @@ -318,7 +318,7 @@ else() endif() target_link_libraries(gluten - PUBLIC Arrow::arrow Arrow::parquet) + PUBLIC Arrow::arrow Arrow::parquet glog::glog) install(TARGETS gluten DESTINATION ${CMAKE_INSTALL_LIBDIR}) diff --git a/cpp/core/compute/ExecutionCtx.cc b/cpp/core/compute/ExecutionCtx.cc index b6add7b6fea7..20b26b85c21f 100644 --- a/cpp/core/compute/ExecutionCtx.cc +++ b/cpp/core/compute/ExecutionCtx.cc @@ -28,16 +28,12 @@ void setExecutionCtxFactory( ExecutionCtxFactoryWithConf factory, const std::unordered_map& sparkConfs) { getExecutionCtxFactoryContext()->set(factory, sparkConfs); -#ifdef GLUTEN_PRINT_DEBUG - std::cout << "Set execution context factory with conf." << std::endl; -#endif + DLOG(INFO) << "Set execution context factory with conf."; } void setExecutionCtxFactory(ExecutionCtxFactory factory) { getExecutionCtxFactoryContext()->set(factory); -#ifdef GLUTEN_PRINT_DEBUG - std::cout << "Set execution context factory." << std::endl; -#endif + DLOG(INFO) << "Set execution context factory."; } ExecutionCtx* createExecutionCtx() { diff --git a/cpp/core/compute/ExecutionCtx.h b/cpp/core/compute/ExecutionCtx.h index 9aee84e48cc8..a8a79a81455a 100644 --- a/cpp/core/compute/ExecutionCtx.h +++ b/cpp/core/compute/ExecutionCtx.h @@ -72,11 +72,11 @@ class ExecutionCtx : public std::enable_shared_from_this { #ifdef GLUTEN_PRINT_DEBUG try { auto jsonPlan = substraitFromPbToJson("Plan", data, size); - std::cout << std::string(50, '#') << " received substrait::Plan:" << std::endl; - std::cout << "Task stageId: " << taskInfo_.stageId << ", partitionId: " << taskInfo_.partitionId - << ", taskId: " << taskInfo_.taskId << "; " << jsonPlan << std::endl; + DLOG(INFO) << std::string(50, '#') << " received substrait::Plan:"; + DLOG(INFO) << "Task stageId: " << taskInfo_.stageId << ", partitionId: " << taskInfo_.partitionId + << ", taskId: " << taskInfo_.taskId << "; " << jsonPlan; } catch (const std::exception& e) { - std::cerr << "Error converting Substrait plan to JSON: " << e.what() << std::endl; + DLOG(WARNING) << "Error converting Substrait plan to JSON: " << e.what(); } #endif GLUTEN_CHECK(parseProtobuf(data, size, &substraitPlan_) == true, "Parse substrait plan failed"); diff --git a/cpp/core/jni/JniCommon.h b/cpp/core/jni/JniCommon.h index 1b85ae473565..e11b40d3d8ab 100644 --- a/cpp/core/jni/JniCommon.h +++ b/cpp/core/jni/JniCommon.h @@ -21,6 +21,7 @@ #include #include #include +#include #include #include "compute/ProtobufUtils.h" @@ -124,17 +125,13 @@ static inline gluten::CompressionMode getCompressionMode(JNIEnv* env, jstring co static inline void attachCurrentThreadAsDaemonOrThrow(JavaVM* vm, JNIEnv** out) { int getEnvStat = vm->GetEnv(reinterpret_cast(out), jniVersion); if (getEnvStat == JNI_EDETACHED) { -#ifdef GLUTEN_PRINT_DEBUG - std::cout << "JNIEnv was not attached to current thread." << std::endl; -#endif + DLOG(INFO) << "JNIEnv was not attached to current thread."; // Reattach current thread to JVM getEnvStat = vm->AttachCurrentThreadAsDaemon(reinterpret_cast(out), NULL); if (getEnvStat != JNI_OK) { throw gluten::GlutenException("Failed to reattach current thread to JVM."); } -#ifdef GLUTEN_PRINT_DEBUG - std::cout << "Succeeded attaching current thread." << std::endl; -#endif + DLOG(INFO) << "Succeeded attaching current thread."; return; } if (getEnvStat != JNI_OK) { @@ -152,7 +149,7 @@ static inline void checkException(JNIEnv* env) { std::string description = jStringToCString(env, (jstring)env->CallStaticObjectMethod(describerClass, describeMethod, t)); if (env->ExceptionCheck()) { - std::cerr << "Fatal: Uncaught Java exception during calling the Java exception describer method! " << std::endl; + LOG(WARNING) << "Fatal: Uncaught Java exception during calling the Java exception describer method!"; } throw gluten::GlutenException("Error during calling Java code from native code: " + description); } @@ -198,8 +195,8 @@ class SparkAllocationListener final : public gluten::AllocationListener { ~SparkAllocationListener() override { JNIEnv* env; if (vm_->GetEnv(reinterpret_cast(&env), jniVersion) != JNI_OK) { - std::cerr << "SparkAllocationListener#~SparkAllocationListener(): " - << "JNIEnv was not attached to current thread" << std::endl; + LOG(WARNING) << "SparkAllocationListener#~SparkAllocationListener(): " + << "JNIEnv was not attached to current thread"; return; } env->DeleteGlobalRef(jListenerGlobalRef_); @@ -303,8 +300,8 @@ class CelebornClient : public RssClient { ~CelebornClient() { JNIEnv* env; if (vm_->GetEnv(reinterpret_cast(&env), jniVersion) != JNI_OK) { - std::cerr << "CelebornClient#~CelebornClient(): " - << "JNIEnv was not attached to current thread" << std::endl; + LOG(WARNING) << "CelebornClient#~CelebornClient(): " + << "JNIEnv was not attached to current thread"; return; } env->DeleteGlobalRef(javaCelebornShuffleWriter_); diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc index 464e590a8159..1edb0a7c6dc8 100644 --- a/cpp/core/jni/JniWrapper.cc +++ b/cpp/core/jni/JniWrapper.cc @@ -18,9 +18,7 @@ #include #include -#include #include "compute/ExecutionCtx.h" -#include "compute/ProtobufUtils.h" #include "config/GlutenConfig.h" #include "jni/JniCommon.h" #include "jni/JniErrors.h" @@ -92,15 +90,10 @@ class JavaInputStreamAdaptor final : public arrow::io::InputStream { try { auto status = JavaInputStreamAdaptor::Close(); if (!status.ok()) { -#ifdef GLUTEN_PRINT_DEBUG - std::cout << __func__ << " call JavaInputStreamAdaptor::Close() failed, status:" << status.ToString() - << std::endl; -#endif + LOG(WARNING) << __func__ << " call JavaInputStreamAdaptor::Close() failed, status:" << status.ToString(); } } catch (std::exception& e) { -#ifdef GLUTEN_PRINT_DEBUG - std::cout << __func__ << " call JavaInputStreamAdaptor::Close() got exception:" << e.what() << std::endl; -#endif + LOG(WARNING) << __func__ << " call JavaInputStreamAdaptor::Close() got exception:" << e.what() << std::endl; } } @@ -287,10 +280,16 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) { shuffleReaderMetricsSetDecompressTime = getMethodIdOrError(env, shuffleReaderMetricsClass, "setDecompressTime", "(J)V"); + // logging + google::InitGoogleLogging("gluten"); + FLAGS_logtostderr = true; + return jniVersion; } void JNI_OnUnload(JavaVM* vm, void* reserved) { + google::ShutdownGoogleLogging(); + JNIEnv* env; vm->GetEnv(reinterpret_cast(&env), jniVersion); env->DeleteGlobalRef(serializableObjBuilderClass); @@ -839,7 +838,7 @@ JNIEXPORT jlong JNICALL Java_io_glutenproject_vectorized_ShuffleWriterJniWrapper jobject thread = env->CallStaticObjectMethod(cls, mid); checkException(env); if (thread == NULL) { - std::cerr << "Thread.currentThread() return NULL" << std::endl; + LOG(WARNING) << "Thread.currentThread() return NULL"; } else { jmethodID midGetid = getMethodIdOrError(env, cls, "getId", "()J"); jlong sid = env->CallLongMethod(thread, midGetid); diff --git a/cpp/core/shuffle/LocalPartitionWriter.cc b/cpp/core/shuffle/LocalPartitionWriter.cc index 2566f0840104..d1ded11208e4 100644 --- a/cpp/core/shuffle/LocalPartitionWriter.cc +++ b/cpp/core/shuffle/LocalPartitionWriter.cc @@ -253,10 +253,8 @@ arrow::Status PreferCachePartitionWriter::evictPartition(int32_t partitionId /* RETURN_NOT_OK(flushCachedPayloads(spilledFileOs.get(), shuffleWriter_->partitionCachedRecordbatch()[pid])); ARROW_ASSIGN_OR_RAISE(auto end, spilledFileOs->Tell()); spillInfo.partitionSpillInfos.push_back({pid, end - start}); -#ifdef GLUTEN_PRINT_DEBUG - std::cout << "Spilled partition " << pid << " file start: " << start << ", file end: " << end - << ", cachedPayloadSize: " << cachedPayloadSize << std::endl; -#endif + DLOG(INFO) << "Spilled partition " << pid << " file start: " << start << ", file end: " << end + << ", cachedPayloadSize: " << cachedPayloadSize; shuffleWriter_->clearCachedPayloads(pid); } } diff --git a/cpp/core/shuffle/utils.h b/cpp/core/shuffle/utils.h index 763b100d08f9..8732b66393bf 100644 --- a/cpp/core/shuffle/utils.h +++ b/cpp/core/shuffle/utils.h @@ -24,6 +24,7 @@ #include #include #include +#include #include #include diff --git a/cpp/velox/compute/VeloxPlanConverter.cc b/cpp/velox/compute/VeloxPlanConverter.cc index f62605064bd0..8ceb9b155ccb 100644 --- a/cpp/velox/compute/VeloxPlanConverter.cc +++ b/cpp/velox/compute/VeloxPlanConverter.cc @@ -186,9 +186,7 @@ std::shared_ptr VeloxPlanConverter::toVel } } auto veloxPlan = substraitVeloxPlanConverter_.toVeloxPlan(substraitPlan); -#ifdef GLUTEN_PRINT_DEBUG - std::cout << "Plan Node: " << std::endl << veloxPlan->toString(true, true) << std::endl; -#endif + DLOG(INFO) << "Plan Node: " << std::endl << veloxPlan->toString(true, true); return veloxPlan; } diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc index b04b5a4440dc..2d4d736ab5b3 100644 --- a/cpp/velox/compute/WholeStageResultIterator.cc +++ b/cpp/velox/compute/WholeStageResultIterator.cc @@ -218,10 +218,8 @@ void WholeStageResultIterator::collectMetrics() { const auto& nodeId = orderedNodeIds_[idx]; if (planStats.find(nodeId) == planStats.end()) { if (omittedNodeIds_.find(nodeId) == omittedNodeIds_.end()) { -#ifdef GLUTEN_PRINT_DEBUG - std::cout << "Not found node id: " << nodeId << std::endl; - std::cout << "Plan Node: " << std::endl << veloxPlan_->toString(true, true) << std::endl; -#endif + DLOG(INFO) << "Not found node id: " << nodeId; + DLOG(INFO) << "Plan Node: " << std::endl << veloxPlan_->toString(true, true); throw std::runtime_error("Node id cannot be found in plan status."); } // Special handing for Filter over Project case. Filter metrics are diff --git a/cpp/velox/jni/VeloxJniWrapper.cc b/cpp/velox/jni/VeloxJniWrapper.cc index 38bebc2a48f6..864523308a76 100644 --- a/cpp/velox/jni/VeloxJniWrapper.cc +++ b/cpp/velox/jni/VeloxJniWrapper.cc @@ -51,14 +51,11 @@ jint JNI_OnLoad(JavaVM* vm, void*) { return JNI_ERR; } - // logging - google::InitGoogleLogging("gluten"); - FLAGS_logtostderr = true; gluten::getJniErrorsState()->initialize(env); gluten::initVeloxJniFileSystem(env); gluten::initVeloxJniUDF(env); #ifdef GLUTEN_PRINT_DEBUG - std::cout << "Loaded Velox backend." << std::endl; + LOG(INFO) << "Loaded Velox backend." << std::endl; #endif return jniVersion; } @@ -68,7 +65,6 @@ void JNI_OnUnload(JavaVM* vm, void*) { vm->GetEnv(reinterpret_cast(&env), jniVersion); gluten::finalizeVeloxJniFileSystem(env); gluten::finalizeVeloxJniUDF(env); - google::ShutdownGoogleLogging(); } JNIEXPORT void JNICALL Java_io_glutenproject_init_BackendJniWrapper_initializeBackend( // NOLINT diff --git a/cpp/velox/shuffle/VeloxShuffleWriter.cc b/cpp/velox/shuffle/VeloxShuffleWriter.cc index 7bb63e0c1461..89863df2748f 100644 --- a/cpp/velox/shuffle/VeloxShuffleWriter.cc +++ b/cpp/velox/shuffle/VeloxShuffleWriter.cc @@ -997,7 +997,7 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const velox::RowVec binaryBuf.valueCapacity = capacity; dstValuePtr = binaryBuf.valuePtr + valueOffset - stringLen; - std::cout << "Split value buffer resized colIdx" << binaryIdx << std::endl; + LOG(INFO) << "Split value buffer resized colIdx" << binaryIdx; VsPrintSplit(" dst_start", dstOffsetBase[x]); VsPrintSplit(" dst_end", dstOffsetBase[x + 1]); VsPrintSplit(" old size", oldCapacity); @@ -1262,15 +1262,15 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const velox::RowVec while (status.IsOutOfMemory() && retry < 3) { // retry allocate ++retry; - std::cout << status.ToString() << std::endl + LOG(INFO) << status.ToString() << std::endl << std::to_string(retry) << " retry to allocate new buffer for partition " - << std::to_string(partitionId) << std::endl; + << std::to_string(partitionId); int64_t evictedSize = 0; RETURN_NOT_OK(evictPartitionsOnDemand(&evictedSize)); if (evictedSize <= 0) { - std::cout << "Failed to allocate new buffer for partition " << std::to_string(partitionId) - << ". No partition buffer to evict." << std::endl; + LOG(INFO) << "Failed to allocate new buffer for partition " << std::to_string(partitionId) + << ". No partition buffer to evict."; return status; } @@ -1278,8 +1278,7 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const velox::RowVec } if (status.IsOutOfMemory()) { - std::cout << "Failed to allocate new buffer for partition " << std::to_string(partitionId) << ". Out of memory." - << std::endl; + LOG(INFO) << "Failed to allocate new buffer for partition " << std::to_string(partitionId) << ". Out of memory."; } return status; diff --git a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc index dc797a66ac24..64196dcea03f 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc +++ b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc @@ -60,8 +60,7 @@ bool validateColNames(const ::substrait::NamedStruct& schema) { for (auto i = 0; i < name.size(); i++) { auto c = name[i]; if (!token.isUnquotedPathCharacter(c)) { - std::cout << "native validation failed due to: Illegal column charactor " << c << "in column " << name - << std::endl; + LOG(WARNING) << "native validation failed due to: Illegal column charactor " << c << "in column " << name; return false; } }