Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
fix
  • Loading branch information
Yohahaha committed Dec 26, 2023
1 parent 42e6990 commit 8f4d6ed
Show file tree
Hide file tree
Showing 27 changed files with 80 additions and 181 deletions.
1 change: 0 additions & 1 deletion cpp/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,6 @@ set(SPARK_COLUMNAR_PLUGIN_SRCS
shuffle/rss/CelebornPartitionWriter.cc
shuffle/Utils.cc
utils/Compression.cc
utils/DebugOut.cc
utils/StringUtil.cc
utils/ObjectStore.cc
jni/JniError.cc
Expand Down
23 changes: 12 additions & 11 deletions cpp/core/benchmarks/CompressionBenchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <arrow/type_fwd.h>
#include <benchmark/benchmark.h>
#include <execinfo.h>
#include <glog/logging.h>
#include <parquet/arrow/reader.h>
#include <parquet/file_reader.h>
#include <sched.h>
Expand Down Expand Up @@ -126,7 +127,7 @@ class BenchmarkCompression {
}
case gluten::kQatZstd: {
ipcWriteOptions.codec = createArrowIpcCodec(arrow::Compression::ZSTD, CodecBackend::QAT);
std::cout << "load qatzstd" << std::endl;
LOG(INFO) << "load qatzstd";
break;
}
#endif
Expand Down Expand Up @@ -163,7 +164,7 @@ class BenchmarkCompression {
state);
auto endTime = std::chrono::steady_clock::now();
auto totalTime = (endTime - startTime).count();
std::cout << "Thread " << state.thread_index() << " took " << (1.0 * totalTime / 1e9) << "s" << std::endl;
LOG(INFO) << "Thread " << state.thread_index() << " took " << (1.0 * totalTime / 1e9) << "s";

state.counters["rowgroups"] =
benchmark::Counter(rowGroupIndices_.size(), benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000);
Expand Down Expand Up @@ -301,8 +302,8 @@ class BenchmarkCompressionCacheScanBenchmark final : public BenchmarkCompression
}
} while (recordBatch);

std::cout << "parquet parse done elapsed time " << elapseRead / 1e6 << " ms " << std::endl;
std::cout << "batches = " << numBatches << " rows = " << numRows << std::endl;
LOG(INFO) << "parquet parse done elapsed time " << elapseRead / 1e6 << " ms ";
LOG(INFO) << "batches = " << numBatches << " rows = " << numRows;

std::vector<std::shared_ptr<arrow::ipc::IpcPayload>> payloads(batches.size());
std::vector<std::vector<int64_t>> uncompressedBufferSize(batches.size());
Expand Down Expand Up @@ -418,26 +419,26 @@ int main(int argc, char** argv) {
} else if (strcmp(argv[i], "--file") == 0) {
datafile = argv[i + 1];
} else if (strcmp(argv[i], "--qat-gzip") == 0) {
std::cout << "QAT gzip is used as codec" << std::endl;
LOG(INFO) << "QAT gzip is used as codec";
codec = gluten::kQatGzip;
} else if (strcmp(argv[i], "--qat-zstd") == 0) {
std::cout << "QAT zstd is used as codec" << std::endl;
LOG(INFO) << "QAT zstd is used as codec";
codec = gluten::kQatZstd;
} else if (strcmp(argv[i], "--qpl-gzip") == 0) {
std::cout << "QPL gzip is used as codec" << std::endl;
LOG(INFO) << "QPL gzip is used as codec";
codec = gluten::kQplGzip;
} else if (strcmp(argv[i], "--zstd") == 0) {
std::cout << "CPU zstd is used as codec" << std::endl;
LOG(INFO) << "CPU zstd is used as codec";
codec = gluten::kZstd;
} else if (strcmp(argv[i], "--buffer-size") == 0) {
compressBufferSize = atol(argv[i + 1]);
} else if (strcmp(argv[i], "--cpu-offset") == 0) {
cpuOffset = atol(argv[i + 1]);
}
}
std::cout << "iterations = " << iterations << std::endl;
std::cout << "threads = " << threads << std::endl;
std::cout << "datafile = " << datafile << std::endl;
LOG(INFO) << "iterations = " << iterations;
LOG(INFO) << "threads = " << threads;
LOG(INFO) << "datafile = " << datafile;

gluten::BenchmarkCompressionIterateScanBenchmark bmIterateScan(datafile, compressBufferSize);
gluten::BenchmarkCompressionCacheScanBenchmark bmCacheScan(datafile, compressBufferSize);
Expand Down
3 changes: 2 additions & 1 deletion cpp/core/compute/Runtime.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#pragma once

#include <glog/logging.h>

#include "compute/ProtobufUtils.h"
#include "compute/ResultIterator.h"
#include "memory/ArrowMemoryPool.h"
Expand All @@ -29,7 +31,6 @@
#include "shuffle/ShuffleReader.h"
#include "shuffle/ShuffleWriter.h"
#include "substrait/plan.pb.h"
#include "utils/DebugOut.h"
#include "utils/ObjectStore.h"

namespace gluten {
Expand Down
15 changes: 6 additions & 9 deletions cpp/core/jni/JniCommon.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
#include "memory/AllocationListener.h"
#include "shuffle/rss/RssClient.h"
#include "utils/Compression.h"
#include "utils/DebugOut.h"
#include "utils/exception.h"

static jint jniVersion = JNI_VERSION_1_8;
Expand All @@ -52,7 +51,7 @@ static inline void checkException(JNIEnv* env) {
std::string description =
jStringToCString(env, (jstring)env->CallStaticObjectMethod(describerClass, describeMethod, t));
if (env->ExceptionCheck()) {
std::cerr << "Fatal: Uncaught Java exception during calling the Java exception describer method! " << std::endl;
LOG(WARNING) << "Fatal: Uncaught Java exception during calling the Java exception describer method! ";
}
throw gluten::GlutenException("Error during calling Java code from native code: " + description);
}
Expand Down Expand Up @@ -106,13 +105,11 @@ static inline jmethodID getStaticMethodIdOrError(JNIEnv* env, jclass thisClass,
static inline void attachCurrentThreadAsDaemonOrThrow(JavaVM* vm, JNIEnv** out) {
int getEnvStat = vm->GetEnv(reinterpret_cast<void**>(out), jniVersion);
if (getEnvStat == JNI_EDETACHED) {
DEBUG_OUT << "JNIEnv was not attached to current thread." << std::endl;
// Reattach current thread to JVM
getEnvStat = vm->AttachCurrentThreadAsDaemon(reinterpret_cast<void**>(out), NULL);
if (getEnvStat != JNI_OK) {
throw gluten::GlutenException("Failed to reattach current thread to JVM.");
}
DEBUG_OUT << "Succeeded attaching current thread." << std::endl;
return;
}
if (getEnvStat != JNI_OK) {
Expand Down Expand Up @@ -161,7 +158,7 @@ static inline void backtrace() {
auto size = backtrace(array, 1024);
char** strings = backtrace_symbols(array, size);
for (size_t i = 0; i < size; ++i) {
std::cout << strings[i] << std::endl;
LOG(INFO) << strings[i];
}
free(strings);
}
Expand Down Expand Up @@ -229,8 +226,8 @@ class SparkAllocationListener final : public gluten::AllocationListener {
~SparkAllocationListener() override {
JNIEnv* env;
if (vm_->GetEnv(reinterpret_cast<void**>(&env), jniVersion) != JNI_OK) {
std::cerr << "SparkAllocationListener#~SparkAllocationListener(): "
<< "JNIEnv was not attached to current thread" << std::endl;
LOG(WARNING) << "SparkAllocationListener#~SparkAllocationListener(): "
<< "JNIEnv was not attached to current thread";
return;
}
env->DeleteGlobalRef(jListenerGlobalRef_);
Expand Down Expand Up @@ -329,8 +326,8 @@ class CelebornClient : public RssClient {
~CelebornClient() {
JNIEnv* env;
if (vm_->GetEnv(reinterpret_cast<void**>(&env), jniVersion) != JNI_OK) {
std::cerr << "CelebornClient#~CelebornClient(): "
<< "JNIEnv was not attached to current thread" << std::endl;
LOG(WARNING) << "CelebornClient#~CelebornClient(): "
<< "JNIEnv was not attached to current thread";
return;
}
env->DeleteGlobalRef(javaCelebornShuffleWriter_);
Expand Down
8 changes: 3 additions & 5 deletions cpp/core/jni/JniWrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
#include <jni.h>
#include <filesystem>

#include <glog/logging.h>
#include "compute/ProtobufUtils.h"
#include "compute/Runtime.h"
#include "config/GlutenConfig.h"
Expand Down Expand Up @@ -92,11 +91,10 @@ class JavaInputStreamAdaptor final : public arrow::io::InputStream {
try {
auto status = JavaInputStreamAdaptor::Close();
if (!status.ok()) {
DEBUG_OUT << __func__ << " call JavaInputStreamAdaptor::Close() failed, status:" << status.ToString()
<< std::endl;
LOG(WARNING) << __func__ << " call JavaInputStreamAdaptor::Close() failed, status:" << status.ToString();
}
} catch (std::exception& e) {
DEBUG_OUT << __func__ << " call JavaInputStreamAdaptor::Close() got exception:" << e.what() << std::endl;
LOG(WARNING) << __func__ << " call JavaInputStreamAdaptor::Close() got exception:" << e.what();
}
}

Expand Down Expand Up @@ -820,7 +818,7 @@ JNIEXPORT jlong JNICALL Java_io_glutenproject_vectorized_ShuffleWriterJniWrapper
jobject thread = env->CallStaticObjectMethod(cls, mid);
checkException(env);
if (thread == NULL) {
std::cerr << "Thread.currentThread() return NULL" << std::endl;
LOG(WARNING) << "Thread.currentThread() return NULL";
} else {
jmethodID midGetid = getMethodIdOrError(env, cls, "getId", "()J");
jlong sid = env->CallLongMethod(thread, midGetid);
Expand Down
2 changes: 0 additions & 2 deletions cpp/core/shuffle/LocalPartitionWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
#include <random>
#include <thread>
#include "shuffle/Utils.h"
#include "utils/DebugOut.h"
#include "utils/StringUtil.h"
#include "utils/Timer.h"

Expand Down Expand Up @@ -129,7 +128,6 @@ class FlushOnSpillEvictHandle final : public LocalPartitionWriter::LocalEvictHan
ARROW_ASSIGN_OR_RAISE(auto start, os_->Tell());
RETURN_NOT_OK(arrow::ipc::WriteIpcPayload(*payload, options_, os_.get(), &metadataLength));
ARROW_ASSIGN_OR_RAISE(auto end, os_->Tell());
DEBUG_OUT << "Spilled partition " << partitionId << " file start: " << start << ", file end: " << end << std::endl;
spillInfo_->partitionSpillInfos.push_back({partitionId, end - start});
return arrow::Status::OK();
}
Expand Down
10 changes: 6 additions & 4 deletions cpp/core/tests/RoundRobinPartitionerTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
* limitations under the License.
*/
#include "shuffle/RoundRobinPartitioner.h"
#include <glog/logging.h>
#include <gtest/gtest.h>
#include <numeric>

Expand Down Expand Up @@ -46,10 +47,11 @@ class RoundRobinPartitionerTest : public ::testing::Test {

// Emits `vec` as text of the form "<name> = [e0,e1,...,en, ]": every element is
// followed by a comma, and the list is closed with " ]".
// NOTE(review): this listing looks like a rendered diff with the +/- markers stripped.
// The four std::cout statements appear to be the pre-commit body and the
// std::stringstream / LOG(INFO) statements their replacement (the file header reports
// "6 additions & 4 deletions"). Confirm against the repository before treating both
// halves as live code.
template <typename T>
void toString(const std::vector<T>& vec, const std::string& name) const {
// Writes the text straight to stdout; std::endl terminates the line and flushes.
std::cout << name << " = [";
std::copy(vec.cbegin(), vec.cend(), std::ostream_iterator<T>(std::cout, ","));
std::cout << " ]";
std::cout << std::endl;
// Builds the same text in a local stream first so it is handed to LOG(INFO) as one string.
std::stringstream ss;
ss << name << " = [";
std::copy(vec.cbegin(), vec.cend(), std::ostream_iterator<T>(ss, ","));
ss << " ]";
LOG(INFO) << ss.str();
}

int32_t getPidSelection() const {
Expand Down
28 changes: 0 additions & 28 deletions cpp/core/utils/DebugOut.cc

This file was deleted.

47 changes: 0 additions & 47 deletions cpp/core/utils/DebugOut.h

This file was deleted.

3 changes: 2 additions & 1 deletion cpp/core/utils/ObjectStore.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/

#include "ObjectStore.h"
#include <glog/logging.h>
#include <iostream>

gluten::ObjectStore::~ObjectStore() {
Expand All @@ -24,7 +25,7 @@ gluten::ObjectStore::~ObjectStore() {
for (auto itr = aliveObjectHandles_.rbegin(); itr != aliveObjectHandles_.rend(); itr++) {
ResourceHandle handle = *itr;
if (store_.lookup(handle) == nullptr) {
std::cerr << "Fatal: resource handle " + std::to_string(handle) + " not found in store." << std::endl;
LOG(WARNING) << "Fatal: resource handle " + std::to_string(handle) + " not found in store.";
}
store_.erase(handle);
}
Expand Down
1 change: 0 additions & 1 deletion cpp/core/utils/Print.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
#include <iostream>
#include <string>
#include <vector>
#include "DebugOut.h"

namespace gluten {

Expand Down
1 change: 1 addition & 0 deletions cpp/core/utils/macros.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <time.h>

#include <arrow/status.h>
#include <glog/logging.h>
#include <chrono>

#include "utils/exception.h"
Expand Down
4 changes: 2 additions & 2 deletions cpp/core/utils/qpl/qpl_job_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ QplJobHWPool& QplJobHWPool::GetInstance() {
// Constructs the job pool: seeds randomEngine from std::random_device, initialises
// distribution with the bounds 0 and MAX_JOB_NUMBER - 1, then runs InitJobPool() and
// reports how long that call took.
// NOTE(review): this listing looks like a rendered diff with the +/- markers stripped.
// The DEBUG_OUT statement appears to be the pre-commit line and the LOG(INFO) statement
// its replacement; confirm against the repository before treating both as live code.
QplJobHWPool::QplJobHWPool() : randomEngine(std::random_device()()), distribution(0, MAX_JOB_NUMBER - 1) {
uint64_t initTime = 0;
// TIME_NANO is defined elsewhere; presumably it stores the elapsed nanoseconds of
// InitJobPool() into initTime — TODO confirm against utils/macros.h.
TIME_NANO(initTime, InitJobPool());
// initTime is divided by 1e6 so the value printed matches the "ms" suffix.
DEBUG_OUT << "Init job pool took " << 1.0 * initTime / 1e6 << "ms" << std::endl;
LOG(INFO) << "Init job pool took " << 1.0 * initTime / 1e6 << "ms";
}

QplJobHWPool::~QplJobHWPool() {
Expand Down Expand Up @@ -96,7 +96,7 @@ qpl_job* QplJobHWPool::AcquireJob(uint32_t& jobId) {
}
}
jobId = MAX_JOB_NUMBER - index;
DEBUG_OUT << "Acquired job index " << index << " after " << retry << " retries." << std::endl;
LOG(INFO) << "Acquired job index " << index << " after " << retry << " retries.";
return jobPool[index];
}

Expand Down
12 changes: 6 additions & 6 deletions cpp/velox/benchmarks/ColumnarToRowBenchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ class GoogleBenchmarkColumnarToRowCacheScanBenchmark : public GoogleBenchmarkCol
localSchema = std::make_shared<arrow::Schema>(*schema_.get());

if (state.thread_index() == 0)
std::cout << localSchema->ToString() << std::endl;
LOG(INFO) << localSchema->ToString();

std::unique_ptr<::parquet::arrow::FileReader> parquetReader;
std::shared_ptr<RecordBatchReader> recordBatchReader;
Expand All @@ -150,7 +150,7 @@ class GoogleBenchmarkColumnarToRowCacheScanBenchmark : public GoogleBenchmarkCol
}
} while (recordBatch);

std::cout << " parquet parse done elapsed time = " << elapseRead / 1000000 << " rows = " << numRows << std::endl;
LOG(INFO) << " parquet parse done elapsed time = " << elapseRead / 1000000 << " rows = " << numRows;

// reuse the columnarToRowConverter for batches caused system % increase a lot
auto ctxPool = defaultLeafVeloxMemoryPool();
Expand Down Expand Up @@ -263,10 +263,10 @@ int main(int argc, char** argv) {
cpu = atol(argv[i + 1]);
}
}
std::cout << "iterations = " << iterations << std::endl;
std::cout << "threads = " << threads << std::endl;
std::cout << "datafile = " << datafile << std::endl;
std::cout << "cpu = " << cpu << std::endl;
LOG(INFO) << "iterations = " << iterations;
LOG(INFO) << "threads = " << threads;
LOG(INFO) << "datafile = " << datafile;
LOG(INFO) << "cpu = " << cpu;

gluten::GoogleBenchmarkColumnarToRowCacheScanBenchmark bck(datafile);

Expand Down
Loading

0 comments on commit 8f4d6ed

Please sign in to comment.