diff --git a/backends-clickhouse/src/main/java/io/glutenproject/vectorized/CHStreamReader.java b/backends-clickhouse/src/main/java/io/glutenproject/vectorized/CHStreamReader.java index e5415924db3e..e7497c9de7e7 100644 --- a/backends-clickhouse/src/main/java/io/glutenproject/vectorized/CHStreamReader.java +++ b/backends-clickhouse/src/main/java/io/glutenproject/vectorized/CHStreamReader.java @@ -38,16 +38,14 @@ public CHStreamReader(ShuffleInputStream shuffleInputStream) { this.inputStream, inputStream.isCompressed(), CHBackendSettings.maxShuffleReadRows(), - CHBackendSettings.maxShuffleReadBytes(), - CHBackendSettings.enableShufflePrefetch()); + CHBackendSettings.maxShuffleReadBytes()); } private static native long createNativeShuffleReader( ShuffleInputStream inputStream, boolean compressed, long maxShuffleReadRows, - long maxShuffleReadBytes, - boolean enable_prefetch); + long maxShuffleReadBytes); private native long nativeNext(long nativeShuffleReader); diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHBackend.scala b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHBackend.scala index 96ef784a2f76..fbcb804a3e59 100644 --- a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHBackend.scala +++ b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHBackend.scala @@ -129,11 +129,6 @@ object CHBackendSettings extends BackendSettingsApi with Logging { ".runtime_config.max_source_concatenate_bytes" val GLUTEN_MAX_SHUFFLE_READ_BYTES_DEFAULT = -1 - val GLUTEN_SHUFFLE_EBABLE_PREFETCH: String = - GlutenConfig.GLUTEN_CONFIG_PREFIX + CHBackend.BACKEND_NAME + - ".runtime_config.shuffle_enable_prefetch" - val GLUTEN_SHUFFLE_EBABLE_PREFETCH_DEFAULT = true - def affinityMode: String = { SparkEnv.get.conf .get( @@ -277,11 +272,6 @@ object CHBackendSettings extends BackendSettingsApi with Logging { .getLong(GLUTEN_MAX_SHUFFLE_READ_BYTES, GLUTEN_MAX_SHUFFLE_READ_BYTES_DEFAULT) } - def enableShufflePrefetch(): Boolean = { - SparkEnv.get.conf - .getBoolean(GLUTEN_SHUFFLE_EBABLE_PREFETCH, GLUTEN_SHUFFLE_EBABLE_PREFETCH_DEFAULT) - } - override def supportWriteFilesExec( format: FileFormat, fields: Array[StructField], diff --git a/cpp-ch/local-engine/Shuffle/ShuffleReader.cpp b/cpp-ch/local-engine/Shuffle/ShuffleReader.cpp index 9ed56336a39d..6af003fad112 100644 --- a/cpp-ch/local-engine/Shuffle/ShuffleReader.cpp +++ b/cpp-ch/local-engine/Shuffle/ShuffleReader.cpp @@ -32,18 +32,18 @@ void configureCompressedReadBuffer(DB::CompressedReadBuffer & compressedReadBuff { compressedReadBuffer.disableChecksumming(); } -ShuffleReader::ShuffleReader(std::unique_ptr in_, bool compressed, Int64 max_shuffle_read_rows_, Int64 max_shuffle_read_bytes_, bool enable_prefetch) +ShuffleReader::ShuffleReader(std::unique_ptr in_, bool compressed, Int64 max_shuffle_read_rows_, Int64 max_shuffle_read_bytes_) : in(std::move(in_)), max_shuffle_read_rows(max_shuffle_read_rows_), max_shuffle_read_bytes(max_shuffle_read_bytes_) { if (compressed) { compressed_in = std::make_unique(*in); configureCompressedReadBuffer(static_cast(*compressed_in)); - input_stream = std::make_unique(*compressed_in, max_shuffle_read_rows_, max_shuffle_read_bytes_, enable_prefetch); + input_stream = std::make_unique(*compressed_in, max_shuffle_read_rows_, max_shuffle_read_bytes_); } else { - input_stream = std::make_unique(*in, max_shuffle_read_rows_, max_shuffle_read_bytes_, enable_prefetch); + input_stream = std::make_unique(*in); } } Block * ShuffleReader::read() diff --git a/cpp-ch/local-engine/Shuffle/ShuffleReader.h b/cpp-ch/local-engine/Shuffle/ShuffleReader.h index ee2a65119951..0a7650420400 100644 --- a/cpp-ch/local-engine/Shuffle/ShuffleReader.h +++ b/cpp-ch/local-engine/Shuffle/ShuffleReader.h @@ -35,7 +35,7 @@ class ShuffleReader : BlockIterator { public: explicit ShuffleReader( - std::unique_ptr in_, bool compressed, Int64 max_shuffle_read_rows_, Int64 max_shuffle_read_bytes_, bool enable_prefetch); + std::unique_ptr in_, bool compressed, Int64 max_shuffle_read_rows_, Int64 max_shuffle_read_bytes_); DB::Block * read(); ~ShuffleReader(); static jclass input_stream_class; diff --git a/cpp-ch/local-engine/Storages/IO/NativeReader.cpp b/cpp-ch/local-engine/Storages/IO/NativeReader.cpp index 8f12a89c9277..6584ef580b9e 100644 --- a/cpp-ch/local-engine/Storages/IO/NativeReader.cpp +++ b/cpp-ch/local-engine/Storages/IO/NativeReader.cpp @@ -24,21 +24,6 @@ #include #include #include -#include -#include - -#include -#include -#include - -namespace CurrentMetrics -{ - extern const Metric Read; - extern const Metric ThreadPoolFSReaderThreads; - extern const Metric ThreadPoolFSReaderThreadsActive; - extern const Metric ThreadPoolFSReaderThreadsScheduled; -} - namespace DB { @@ -62,33 +47,7 @@ Block NativeReader::getHeader() const return header; } -std::atomic nn = 0; - -void NativeReader::prefetch() -{ - if (!enable_prefetch) - return; - auto & tpool = DB::Context::getGlobalContextInstance()->getPrefetchThreadpool(); - prefetch_future = DB::scheduleFromThreadPool( - [this]() { return this->readImpl(); }, tpool, "NativeReader", DB::DEFAULT_PREFETCH_PRIORITY); -} - DB::Block NativeReader::read() -{ - DB::Block res; - if (prefetch_future.valid()) - { - res = prefetch_future.get(); - prefetch_future = {}; - } - else - res = readImpl(); - if (res) - prefetch(); - return res; -} - -DB::Block NativeReader::readImpl() { DB::Block result_block; if (istr.eof()) diff --git a/cpp-ch/local-engine/Storages/IO/NativeReader.h b/cpp-ch/local-engine/Storages/IO/NativeReader.h index 5e9fb220e64b..4e85526e7d64 100644 --- a/cpp-ch/local-engine/Storages/IO/NativeReader.h +++ b/cpp-ch/local-engine/Storages/IO/NativeReader.h @@ -20,9 +20,6 @@ #include #include #include -#include -#include -#include namespace local_engine { @@ -48,22 +45,16 @@ class NativeReader }; NativeReader( - DB::ReadBuffer & istr_, - Int64 max_block_size_ = DB::DEFAULT_BLOCK_SIZE, - Int64 max_block_bytes_ = DB::DEFAULT_BLOCK_SIZE * 256, - bool enable_prefetch_ = false) + DB::ReadBuffer & istr_, Int64 max_block_size_ = DB::DEFAULT_BLOCK_SIZE, Int64 max_block_bytes_ = DB::DEFAULT_BLOCK_SIZE * 256) : istr(istr_) , max_block_size(max_block_size_ != 0 ? static_cast(max_block_size_) : DB::DEFAULT_BLOCK_SIZE) , max_block_bytes(max_block_bytes_ != 0 ? static_cast(max_block_bytes_) : DB::DEFAULT_BLOCK_SIZE * 256) - , enable_prefetch(enable_prefetch_) { } DB::Block getHeader() const; DB::Block read(); - DB::Block readImpl(); - void prefetch(); private: DB::ReadBuffer & istr; @@ -75,9 +66,6 @@ class NativeReader std::vector columns_parse_util; - bool enable_prefetch = false; - std::future prefetch_future; - void updateAvgValueSizeHints(const DB::Block & block); DB::Block prepareByFirstBlock(); diff --git a/cpp-ch/local-engine/Storages/SourceFromJavaIter.cpp b/cpp-ch/local-engine/Storages/SourceFromJavaIter.cpp index ae3e59d98c1b..d67161623af2 100644 --- a/cpp-ch/local-engine/Storages/SourceFromJavaIter.cpp +++ b/cpp-ch/local-engine/Storages/SourceFromJavaIter.cpp @@ -16,25 +16,24 @@ */ #include "SourceFromJavaIter.h" #include -#include #include +#include #include #include #include -#include -#include -#include -#include #include -#include #include #include +#include #include #include #include #include -#include - +#include +#include +#include +#include +#include namespace local_engine { @@ -92,7 +91,6 @@ DB::Chunk SourceFromJavaIter::generate() } CLEAN_JNIENV return result; - } SourceFromJavaIter::~SourceFromJavaIter() diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp b/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp index b2c70e9a40af..bf4071734326 100644 --- a/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp +++ b/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp @@ -208,7 +208,7 @@ class HDFSFileReadBufferBuilder : public ReadBufferBuilder std::unique_ptr build(const substrait::ReadRel::LocalFiles::FileOrFiles & file_info, bool set_read_util_position) override { - bool enable_async_io = context->getSettingsRef().remote_filesystem_read_prefetch; + bool enable_async_io = context->getConfigRef().getBool("hdfs.enable_async_io", true); Poco::URI file_uri(file_info.uri_file()); std::string uri_path = "hdfs://" + file_uri.getHost(); if (file_uri.getPort()) diff --git a/cpp-ch/local-engine/local_engine_jni.cpp b/cpp-ch/local-engine/local_engine_jni.cpp index f63bd8e8f070..d1a675f4916e 100644 --- a/cpp-ch/local-engine/local_engine_jni.cpp +++ b/cpp-ch/local-engine/local_engine_jni.cpp @@ -566,19 +566,13 @@ JNIEXPORT jlong Java_io_glutenproject_vectorized_CHNativeBlock_nativeTotalBytes( } JNIEXPORT jlong Java_io_glutenproject_vectorized_CHStreamReader_createNativeShuffleReader( - JNIEnv * env, - jclass /*clazz*/, - jobject input_stream, - jboolean compressed, - jlong max_shuffle_read_rows, - jlong max_shuffle_read_bytes, - jboolean enable_prefetch) + JNIEnv * env, jclass /*clazz*/, jobject input_stream, jboolean compressed, jlong max_shuffle_read_rows, jlong max_shuffle_read_bytes) { LOCAL_ENGINE_JNI_METHOD_START auto * input = env->NewGlobalRef(input_stream); auto read_buffer = std::make_unique(input); auto * shuffle_reader - = new local_engine::ShuffleReader(std::move(read_buffer), compressed, max_shuffle_read_rows, max_shuffle_read_bytes, enable_prefetch); + = new local_engine::ShuffleReader(std::move(read_buffer), compressed, max_shuffle_read_rows, max_shuffle_read_bytes); return reinterpret_cast(shuffle_reader); LOCAL_ENGINE_JNI_METHOD_END(env, -1) }