Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
Yohahaha committed Nov 9, 2023
1 parent 2bb522f commit fa64634
Show file tree
Hide file tree
Showing 18 changed files with 76 additions and 70 deletions.
2 changes: 1 addition & 1 deletion cpp/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ else()
endif()

target_link_libraries(gluten
PUBLIC Arrow::arrow Arrow::parquet)
PUBLIC Arrow::arrow Arrow::parquet glog::glog)

install(TARGETS gluten
DESTINATION ${CMAKE_INSTALL_LIBDIR})
Expand Down
23 changes: 12 additions & 11 deletions cpp/core/benchmarks/CompressionBenchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <arrow/type_fwd.h>
#include <benchmark/benchmark.h>
#include <execinfo.h>
#include <glog/logging.h>
#include <parquet/arrow/reader.h>
#include <parquet/file_reader.h>
#include <sched.h>
Expand Down Expand Up @@ -126,7 +127,7 @@ class BenchmarkCompression {
}
case gluten::kQatZstd: {
ipcWriteOptions.codec = createArrowIpcCodec(arrow::Compression::ZSTD, CodecBackend::QAT);
std::cout << "load qatzstd" << std::endl;
LOG(INFO) << "load qatzstd";
break;
}
#endif
Expand Down Expand Up @@ -163,7 +164,7 @@ class BenchmarkCompression {
state);
auto endTime = std::chrono::steady_clock::now();
auto totalTime = (endTime - startTime).count();
std::cout << "Thread " << state.thread_index() << " took " << (1.0 * totalTime / 1e9) << "s" << std::endl;
LOG(INFO) << "Thread " << state.thread_index() << " took " << (1.0 * totalTime / 1e9) << "s";

state.counters["rowgroups"] =
benchmark::Counter(rowGroupIndices_.size(), benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000);
Expand Down Expand Up @@ -301,8 +302,8 @@ class BenchmarkCompressionCacheScanBenchmark final : public BenchmarkCompression
}
} while (recordBatch);

std::cout << "parquet parse done elapsed time " << elapseRead / 1e6 << " ms " << std::endl;
std::cout << "batches = " << numBatches << " rows = " << numRows << std::endl;
LOG(INFO) << "parquet parse done elapsed time " << elapseRead / 1e6 << " ms ";
LOG(INFO) << "batches = " << numBatches << " rows = " << numRows;

std::vector<std::shared_ptr<arrow::ipc::IpcPayload>> payloads(batches.size());
std::vector<std::vector<int64_t>> uncompressedBufferSize(batches.size());
Expand Down Expand Up @@ -418,26 +419,26 @@ int main(int argc, char** argv) {
} else if (strcmp(argv[i], "--file") == 0) {
datafile = argv[i + 1];
} else if (strcmp(argv[i], "--qat-gzip") == 0) {
std::cout << "QAT gzip is used as codec" << std::endl;
LOG(INFO) << "QAT gzip is used as codec";
codec = gluten::kQatGzip;
} else if (strcmp(argv[i], "--qat-zstd") == 0) {
std::cout << "QAT zstd is used as codec" << std::endl;
LOG(INFO) << "QAT zstd is used as codec";
codec = gluten::kQatZstd;
} else if (strcmp(argv[i], "--qpl-gzip") == 0) {
std::cout << "QPL gzip is used as codec" << std::endl;
LOG(INFO) << "QPL gzip is used as codec";
codec = gluten::kQplGzip;
} else if (strcmp(argv[i], "--zstd") == 0) {
std::cout << "CPU zstd is used as codec" << std::endl;
LOG(INFO) << "CPU zstd is used as codec";
codec = gluten::kZstd;
} else if (strcmp(argv[i], "--buffer-size") == 0) {
compressBufferSize = atol(argv[i + 1]);
} else if (strcmp(argv[i], "--cpu-offset") == 0) {
cpuOffset = atol(argv[i + 1]);
}
}
std::cout << "iterations = " << iterations << std::endl;
std::cout << "threads = " << threads << std::endl;
std::cout << "datafile = " << datafile << std::endl;
LOG(INFO) << "iterations = " << iterations;
LOG(INFO) << "threads = " << threads;
LOG(INFO) << "datafile = " << datafile;

gluten::BenchmarkCompressionIterateScanBenchmark bmIterateScan(datafile, compressBufferSize);
gluten::BenchmarkCompressionCacheScanBenchmark bmCacheScan(datafile, compressBufferSize);
Expand Down
2 changes: 2 additions & 0 deletions cpp/core/compute/Runtime.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#pragma once

#include <glog/logging.h>

#include "compute/ProtobufUtils.h"
#include "compute/ResultIterator.h"
#include "memory/ArrowMemoryPool.h"
Expand Down
12 changes: 6 additions & 6 deletions cpp/core/jni/JniCommon.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ static inline void checkException(JNIEnv* env) {
std::string description =
jStringToCString(env, (jstring)env->CallStaticObjectMethod(describerClass, describeMethod, t));
if (env->ExceptionCheck()) {
std::cerr << "Fatal: Uncaught Java exception during calling the Java exception describer method! " << std::endl;
LOG(WARNING) << "Fatal: Uncaught Java exception during calling the Java exception describer method! ";
}
throw gluten::GlutenException("Error during calling Java code from native code: " + description);
}
Expand Down Expand Up @@ -161,7 +161,7 @@ static inline void backtrace() {
auto size = backtrace(array, 1024);
char** strings = backtrace_symbols(array, size);
for (size_t i = 0; i < size; ++i) {
std::cout << strings[i] << std::endl;
LOG(INFO) << strings[i];
}
free(strings);
}
Expand Down Expand Up @@ -232,8 +232,8 @@ class SparkAllocationListener final : public gluten::AllocationListener {
~SparkAllocationListener() override {
JNIEnv* env;
if (vm_->GetEnv(reinterpret_cast<void**>(&env), jniVersion) != JNI_OK) {
std::cerr << "SparkAllocationListener#~SparkAllocationListener(): "
<< "JNIEnv was not attached to current thread" << std::endl;
LOG(WARNING) << "SparkAllocationListener#~SparkAllocationListener(): "
<< "JNIEnv was not attached to current thread";
return;
}
env->DeleteGlobalRef(jListenerGlobalRef_);
Expand Down Expand Up @@ -332,8 +332,8 @@ class CelebornClient : public RssClient {
~CelebornClient() {
JNIEnv* env;
if (vm_->GetEnv(reinterpret_cast<void**>(&env), jniVersion) != JNI_OK) {
std::cerr << "CelebornClient#~CelebornClient(): "
<< "JNIEnv was not attached to current thread" << std::endl;
LOG(WARNING) << "CelebornClient#~CelebornClient(): "
<< "JNIEnv was not attached to current thread";
return;
}
env->DeleteGlobalRef(javaCelebornShuffleWriter_);
Expand Down
3 changes: 1 addition & 2 deletions cpp/core/jni/JniWrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
#include <jni.h>
#include <filesystem>

#include <glog/logging.h>
#include "compute/ProtobufUtils.h"
#include "compute/Runtime.h"
#include "config/GlutenConfig.h"
Expand Down Expand Up @@ -798,7 +797,7 @@ JNIEXPORT jlong JNICALL Java_io_glutenproject_vectorized_ShuffleWriterJniWrapper
jobject thread = env->CallStaticObjectMethod(cls, mid);
checkException(env);
if (thread == NULL) {
std::cerr << "Thread.currentThread() return NULL" << std::endl;
LOG(WARNING) << "Thread.currentThread() return NULL";
} else {
jmethodID midGetid = getMethodIdOrError(env, cls, "getId", "()J");
jlong sid = env->CallLongMethod(thread, midGetid);
Expand Down
10 changes: 6 additions & 4 deletions cpp/core/tests/RoundRobinPartitionerTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
* limitations under the License.
*/
#include "shuffle/RoundRobinPartitioner.h"
#include <glog/logging.h>
#include <gtest/gtest.h>
#include <numeric>

Expand Down Expand Up @@ -46,10 +47,11 @@ class RoundRobinPartitionerTest : public ::testing::Test {

template <typename T>
void toString(const std::vector<T>& vec, const std::string& name) const {
std::cout << name << " = [";
std::copy(vec.cbegin(), vec.cend(), std::ostream_iterator<T>(std::cout, ","));
std::cout << " ]";
std::cout << std::endl;
std::stringstream ss;
ss << name << " = [";
std::copy(vec.cbegin(), vec.cend(), std::ostream_iterator<T>(ss, ","));
ss << " ]";
LOG(INFO) << ss.str();
}

int32_t getPidSelection() const {
Expand Down
3 changes: 2 additions & 1 deletion cpp/core/utils/ObjectStore.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/

#include "ObjectStore.h"
#include <glog/logging.h>
#include <iostream>

gluten::ObjectStore::~ObjectStore() {
Expand All @@ -24,7 +25,7 @@ gluten::ObjectStore::~ObjectStore() {
for (auto itr = aliveObjectHandles_.rbegin(); itr != aliveObjectHandles_.rend(); itr++) {
ResourceHandle handle = *itr;
if (store_.lookup(handle) == nullptr) {
std::cerr << "Fatal: resource handle " + std::to_string(handle) + " not found in store." << std::endl;
LOG(WARNING) << "Fatal: resource handle " + std::to_string(handle) + " not found in store.";
}
store_.erase(handle);
}
Expand Down
1 change: 1 addition & 0 deletions cpp/core/utils/macros.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <time.h>

#include <arrow/status.h>
#include <glog/logging.h>
#include <chrono>

#include "utils/exception.h"
Expand Down
12 changes: 6 additions & 6 deletions cpp/velox/benchmarks/ColumnarToRowBenchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ class GoogleBenchmarkColumnarToRowCacheScanBenchmark : public GoogleBenchmarkCol
localSchema = std::make_shared<arrow::Schema>(*schema_.get());

if (state.thread_index() == 0)
std::cout << localSchema->ToString() << std::endl;
LOG(INFO) << localSchema->ToString();

std::unique_ptr<::parquet::arrow::FileReader> parquetReader;
std::shared_ptr<RecordBatchReader> recordBatchReader;
Expand All @@ -148,7 +148,7 @@ class GoogleBenchmarkColumnarToRowCacheScanBenchmark : public GoogleBenchmarkCol
}
} while (recordBatch);

std::cout << " parquet parse done elapsed time = " << elapseRead / 1000000 << " rows = " << numRows << std::endl;
LOG(INFO) << " parquet parse done elapsed time = " << elapseRead / 1000000 << " rows = " << numRows;

// reuse the columnarToRowConverter for batches caused system % increase a lot
auto ctxPool = defaultLeafVeloxMemoryPool();
Expand Down Expand Up @@ -261,10 +261,10 @@ int main(int argc, char** argv) {
cpu = atol(argv[i + 1]);
}
}
std::cout << "iterations = " << iterations << std::endl;
std::cout << "threads = " << threads << std::endl;
std::cout << "datafile = " << datafile << std::endl;
std::cout << "cpu = " << cpu << std::endl;
LOG(INFO) << "iterations = " << iterations;
LOG(INFO) << "threads = " << threads;
LOG(INFO) << "datafile = " << datafile;
LOG(INFO) << "cpu = " << cpu;

gluten::GoogleBenchmarkColumnarToRowCacheScanBenchmark bck(datafile);

Expand Down
31 changes: 15 additions & 16 deletions cpp/velox/benchmarks/GenericBenchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ auto BM_Generic = [](::benchmark::State& state,
return;
}
if (FLAGS_print_result) {
std::cout << maybeBatch.ValueOrDie()->ToString() << std::endl;
LOG(INFO) << maybeBatch.ValueOrDie()->ToString();
}
}

Expand All @@ -203,7 +203,7 @@ auto BM_Generic = [](::benchmark::State& state,
const auto& task = rawIter->task_;
const auto& planNode = rawIter->veloxPlan_;
auto statsStr = facebook::velox::exec::printPlanWithStats(*planNode, task->taskStats(), true);
std::cout << statsStr << std::endl;
LOG(INFO) << statsStr;
}
Runtime::release(runtime);

Expand Down Expand Up @@ -237,27 +237,26 @@ int main(int argc, char** argv) {

try {
if (argc < 2) {
std::cout << "No input args. Usage: " << std::endl
<< "./generic_benchmark /path/to/substrait_json_file /path/to/data_file_1 /path/to/data_file_2 ..."
<< std::endl;
std::cout << "Running example..." << std::endl;
LOG(INFO) << "No input args. Usage: " << std::endl
<< "./generic_benchmark /path/to/substrait_json_file /path/to/data_file_1 /path/to/data_file_2 ...";
LOG(INFO) << "Running example...";
inputFiles.resize(2);
substraitJsonFile = getGeneratedFilePath("example.json");
inputFiles[0] = getGeneratedFilePath("example_orders");
inputFiles[1] = getGeneratedFilePath("example_lineitem");
} else {
substraitJsonFile = argv[1];
abortIfFileNotExists(substraitJsonFile);
std::cout << "Using substrait json file: " << std::endl << substraitJsonFile << std::endl;
std::cout << "Using " << argc - 2 << " input data file(s): " << std::endl;
LOG(INFO) << "Using substrait json file: " << std::endl << substraitJsonFile;
LOG(INFO) << "Using " << argc - 2 << " input data file(s): ";
for (auto i = 2; i < argc; ++i) {
inputFiles.emplace_back(argv[i]);
abortIfFileNotExists(inputFiles.back());
std::cout << inputFiles.back() << std::endl;
LOG(INFO) << inputFiles.back();
}
}
} catch (const std::exception& e) {
std::cout << "Failed to run benchmark: " << e.what() << std::endl;
LOG(INFO) << "Failed to run benchmark: " << e.what();
::benchmark::Shutdown();
std::exit(EXIT_FAILURE);
}
Expand All @@ -278,12 +277,12 @@ int main(int argc, char** argv) {
} while (0)

#if 0
std::cout << "FLAGS_threads:" << FLAGS_threads << std::endl;
std::cout << "FLAGS_iterations:" << FLAGS_iterations << std::endl;
std::cout << "FLAGS_cpu:" << FLAGS_cpu << std::endl;
std::cout << "FLAGS_print_result:" << FLAGS_print_result << std::endl;
std::cout << "FLAGS_write_file:" << FLAGS_write_file << std::endl;
std::cout << "FLAGS_batch_size:" << FLAGS_batch_size << std::endl;
LOG(INFO) << "FLAGS_threads:" << FLAGS_threads ;
LOG(INFO) << "FLAGS_iterations:" << FLAGS_iterations ;
LOG(INFO) << "FLAGS_cpu:" << FLAGS_cpu ;
LOG(INFO) << "FLAGS_print_result:" << FLAGS_print_result ;
LOG(INFO) << "FLAGS_write_file:" << FLAGS_write_file ;
LOG(INFO) << "FLAGS_batch_size:" << FLAGS_batch_size ;
#endif

if (FLAGS_skip_input) {
Expand Down
18 changes: 9 additions & 9 deletions cpp/velox/benchmarks/ParquetWriteBenchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ class GoogleBenchmarkArrowParquetWriteCacheScanBenchmark : public GoogleBenchmar
localSchema = std::make_shared<arrow::Schema>(*schema_.get());

if (state.thread_index() == 0)
std::cout << localSchema->ToString() << std::endl;
LOG(INFO) << localSchema->ToString();

std::unique_ptr<::parquet::arrow::FileReader> parquetReader;
std::shared_ptr<RecordBatchReader> recordBatchReader;
Expand All @@ -158,7 +158,7 @@ class GoogleBenchmarkArrowParquetWriteCacheScanBenchmark : public GoogleBenchmar
}
} while (recordBatch);

std::cout << " parquet parse done elapsed time = " << elapseRead / 1000000 << " rows = " << numRows << std::endl;
LOG(INFO) << " parquet parse done elapsed time = " << elapseRead / 1000000 << " rows = " << numRows;

// reuse the ParquetWriteConverter for batches caused system % increase a lot
auto fileName = "arrow_parquet_write.parquet";
Expand Down Expand Up @@ -232,7 +232,7 @@ class GoogleBenchmarkVeloxParquetWriteCacheScanBenchmark : public GoogleBenchmar
localSchema = std::make_shared<arrow::Schema>(*schema_.get());

if (state.thread_index() == 0)
std::cout << localSchema->ToString() << std::endl;
LOG(INFO) << localSchema->ToString();

std::unique_ptr<::parquet::arrow::FileReader> parquetReader;
std::shared_ptr<RecordBatchReader> recordBatchReader;
Expand All @@ -251,7 +251,7 @@ class GoogleBenchmarkVeloxParquetWriteCacheScanBenchmark : public GoogleBenchmar
}
} while (recordBatch);

std::cout << " parquet parse done elapsed time = " << elapseRead / 1000000 << " rows = " << numRows << std::endl;
LOG(INFO) << " parquet parse done elapsed time = " << elapseRead / 1000000 << " rows = " << numRows;

// reuse the ParquetWriteConverter for batches caused system % increase a lot
auto fileName = "velox_parquet_write.parquet";
Expand Down Expand Up @@ -323,11 +323,11 @@ int main(int argc, char** argv) {
output = (argv[i + 1]);
}
}
std::cout << "iterations = " << iterations << std::endl;
std::cout << "threads = " << threads << std::endl;
std::cout << "datafile = " << datafile << std::endl;
std::cout << "cpu = " << cpu << std::endl;
std::cout << "output = " << output << std::endl;
LOG(INFO) << "iterations = " << iterations;
LOG(INFO) << "threads = " << threads;
LOG(INFO) << "datafile = " << datafile;
LOG(INFO) << "cpu = " << cpu;
LOG(INFO) << "output = " << output;

gluten::GoogleBenchmarkVeloxParquetWriteCacheScanBenchmark bck(datafile, output);

Expand Down
2 changes: 1 addition & 1 deletion cpp/velox/benchmarks/QueryBenchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ auto BM = [](::benchmark::State& state,
state.SkipWithError(maybeBatch.status().message().c_str());
return;
}
std::cout << maybeBatch.ValueOrDie()->ToString() << std::endl;
LOG(INFO) << maybeBatch.ValueOrDie()->ToString();
}
}
Runtime::release(runtime);
Expand Down
Loading

0 comments on commit fa64634

Please sign in to comment.