Skip to content

Commit

Permalink
Revert "[GLUTEN-4660][CH]Asynchronous shuffle read (#4664)"
Browse files Browse the repository at this point in the history
This reverts commit bf528b3.
  • Loading branch information
baibaichen committed Feb 24, 2024
1 parent b823592 commit 85231f6
Show file tree
Hide file tree
Showing 9 changed files with 17 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,14 @@ public CHStreamReader(ShuffleInputStream shuffleInputStream) {
this.inputStream,
inputStream.isCompressed(),
CHBackendSettings.maxShuffleReadRows(),
CHBackendSettings.maxShuffleReadBytes(),
CHBackendSettings.enableShufflePrefetch());
CHBackendSettings.maxShuffleReadBytes());
}

private static native long createNativeShuffleReader(
ShuffleInputStream inputStream,
boolean compressed,
long maxShuffleReadRows,
long maxShuffleReadBytes,
boolean enable_prefetch);
long maxShuffleReadBytes);

private native long nativeNext(long nativeShuffleReader);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,6 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
".runtime_config.max_source_concatenate_bytes"
val GLUTEN_MAX_SHUFFLE_READ_BYTES_DEFAULT = -1

val GLUTEN_SHUFFLE_EBABLE_PREFETCH: String =
GlutenConfig.GLUTEN_CONFIG_PREFIX + CHBackend.BACKEND_NAME +
".runtime_config.shuffle_enable_prefetch"
val GLUTEN_SHUFFLE_EBABLE_PREFETCH_DEFAULT = true

def affinityMode: String = {
SparkEnv.get.conf
.get(
Expand Down Expand Up @@ -277,11 +272,6 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
.getLong(GLUTEN_MAX_SHUFFLE_READ_BYTES, GLUTEN_MAX_SHUFFLE_READ_BYTES_DEFAULT)
}

def enableShufflePrefetch(): Boolean = {
SparkEnv.get.conf
.getBoolean(GLUTEN_SHUFFLE_EBABLE_PREFETCH, GLUTEN_SHUFFLE_EBABLE_PREFETCH_DEFAULT)
}

override def supportWriteFilesExec(
format: FileFormat,
fields: Array[StructField],
Expand Down
6 changes: 3 additions & 3 deletions cpp-ch/local-engine/Shuffle/ShuffleReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,18 @@ void configureCompressedReadBuffer(DB::CompressedReadBuffer & compressedReadBuff
{
compressedReadBuffer.disableChecksumming();
}
ShuffleReader::ShuffleReader(std::unique_ptr<ReadBuffer> in_, bool compressed, Int64 max_shuffle_read_rows_, Int64 max_shuffle_read_bytes_, bool enable_prefetch)
ShuffleReader::ShuffleReader(std::unique_ptr<ReadBuffer> in_, bool compressed, Int64 max_shuffle_read_rows_, Int64 max_shuffle_read_bytes_)
: in(std::move(in_)), max_shuffle_read_rows(max_shuffle_read_rows_), max_shuffle_read_bytes(max_shuffle_read_bytes_)
{
if (compressed)
{
compressed_in = std::make_unique<CompressedReadBuffer>(*in);
configureCompressedReadBuffer(static_cast<DB::CompressedReadBuffer &>(*compressed_in));
input_stream = std::make_unique<NativeReader>(*compressed_in, max_shuffle_read_rows_, max_shuffle_read_bytes_, enable_prefetch);
input_stream = std::make_unique<NativeReader>(*compressed_in, max_shuffle_read_rows_, max_shuffle_read_bytes_);
}
else
{
input_stream = std::make_unique<NativeReader>(*in, max_shuffle_read_rows_, max_shuffle_read_bytes_, enable_prefetch);
input_stream = std::make_unique<NativeReader>(*in);
}
}
Block * ShuffleReader::read()
Expand Down
2 changes: 1 addition & 1 deletion cpp-ch/local-engine/Shuffle/ShuffleReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class ShuffleReader : BlockIterator
{
public:
explicit ShuffleReader(
std::unique_ptr<DB::ReadBuffer> in_, bool compressed, Int64 max_shuffle_read_rows_, Int64 max_shuffle_read_bytes_, bool enable_prefetch);
std::unique_ptr<DB::ReadBuffer> in_, bool compressed, Int64 max_shuffle_read_rows_, Int64 max_shuffle_read_bytes_);
DB::Block * read();
~ShuffleReader();
static jclass input_stream_class;
Expand Down
41 changes: 0 additions & 41 deletions cpp-ch/local-engine/Storages/IO/NativeReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,6 @@
#include <Common/Arena.h>
#include <Storages/IO/NativeWriter.h>
#include <Storages/IO/AggregateSerializationUtils.h>
#include <Poco/Logger.h>
#include <Common/logger_useful.h>

#include <Interpreters/threadPoolCallbackRunner.h>
#include <Common/CurrentMetrics.h>
#include <Interpreters/Context.h>

namespace CurrentMetrics
{
extern const Metric Read;
extern const Metric ThreadPoolFSReaderThreads;
extern const Metric ThreadPoolFSReaderThreadsActive;
extern const Metric ThreadPoolFSReaderThreadsScheduled;
}


namespace DB
{
Expand All @@ -62,33 +47,7 @@ Block NativeReader::getHeader() const
return header;
}

std::atomic<size_t> nn = 0;

void NativeReader::prefetch()
{
if (!enable_prefetch)
return;
auto & tpool = DB::Context::getGlobalContextInstance()->getPrefetchThreadpool();
prefetch_future = DB::scheduleFromThreadPool<DB::Block>(
[this]() { return this->readImpl(); }, tpool, "NativeReader", DB::DEFAULT_PREFETCH_PRIORITY);
}

DB::Block NativeReader::read()
{
DB::Block res;
if (prefetch_future.valid())
{
res = prefetch_future.get();
prefetch_future = {};
}
else
res = readImpl();
if (res)
prefetch();
return res;
}

DB::Block NativeReader::readImpl()
{
DB::Block result_block;
if (istr.eof())
Expand Down
14 changes: 1 addition & 13 deletions cpp-ch/local-engine/Storages/IO/NativeReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@
#include <Core/Block.h>
#include <Core/Defines.h>
#include <DataTypes/DataTypeAggregateFunction.h>
#include <future>
#include <memory>
#include <Common/ThreadPool.h>

namespace local_engine
{
Expand All @@ -48,22 +45,16 @@ class NativeReader
};

NativeReader(
DB::ReadBuffer & istr_,
Int64 max_block_size_ = DB::DEFAULT_BLOCK_SIZE,
Int64 max_block_bytes_ = DB::DEFAULT_BLOCK_SIZE * 256,
bool enable_prefetch_ = false)
DB::ReadBuffer & istr_, Int64 max_block_size_ = DB::DEFAULT_BLOCK_SIZE, Int64 max_block_bytes_ = DB::DEFAULT_BLOCK_SIZE * 256)
: istr(istr_)
, max_block_size(max_block_size_ != 0 ? static_cast<size_t>(max_block_size_) : DB::DEFAULT_BLOCK_SIZE)
, max_block_bytes(max_block_bytes_ != 0 ? static_cast<size_t>(max_block_bytes_) : DB::DEFAULT_BLOCK_SIZE * 256)
, enable_prefetch(enable_prefetch_)
{
}

DB::Block getHeader() const;

DB::Block read();
DB::Block readImpl();
void prefetch();

private:
DB::ReadBuffer & istr;
Expand All @@ -75,9 +66,6 @@ class NativeReader

std::vector<ColumnParseUtil> columns_parse_util;

bool enable_prefetch = false;
std::future<DB::Block> prefetch_future;

void updateAvgValueSizeHints(const DB::Block & block);

DB::Block prepareByFirstBlock();
Expand Down
16 changes: 7 additions & 9 deletions cpp-ch/local-engine/Storages/SourceFromJavaIter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,24 @@
*/
#include "SourceFromJavaIter.h"
#include <Columns/ColumnConst.h>
#include <Columns/ColumnMap.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnMap.h>
#include <Columns/ColumnTuple.h>
#include <Columns/IColumn.h>
#include <Core/ColumnsWithTypeAndName.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeMap.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/IDataType.h>
#include <Processors/Transforms/AggregatingTransform.h>
#include <jni/jni_common.h>
#include <Common/assert_cast.h>
#include <Common/CHUtil.h>
#include <Common/DebugUtils.h>
#include <Common/Exception.h>
#include <Common/JNIUtils.h>
#include <Common/assert_cast.h>

#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypeMap.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/IDataType.h>

namespace local_engine
{
Expand Down Expand Up @@ -92,7 +91,6 @@ DB::Chunk SourceFromJavaIter::generate()
}
CLEAN_JNIENV
return result;

}

SourceFromJavaIter::~SourceFromJavaIter()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ class HDFSFileReadBufferBuilder : public ReadBufferBuilder
std::unique_ptr<DB::ReadBuffer>
build(const substrait::ReadRel::LocalFiles::FileOrFiles & file_info, bool set_read_util_position) override
{
bool enable_async_io = context->getSettingsRef().remote_filesystem_read_prefetch;
bool enable_async_io = context->getConfigRef().getBool("hdfs.enable_async_io", true);
Poco::URI file_uri(file_info.uri_file());
std::string uri_path = "hdfs://" + file_uri.getHost();
if (file_uri.getPort())
Expand Down
10 changes: 2 additions & 8 deletions cpp-ch/local-engine/local_engine_jni.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -566,19 +566,13 @@ JNIEXPORT jlong Java_io_glutenproject_vectorized_CHNativeBlock_nativeTotalBytes(
}

JNIEXPORT jlong Java_io_glutenproject_vectorized_CHStreamReader_createNativeShuffleReader(
JNIEnv * env,
jclass /*clazz*/,
jobject input_stream,
jboolean compressed,
jlong max_shuffle_read_rows,
jlong max_shuffle_read_bytes,
jboolean enable_prefetch)
JNIEnv * env, jclass /*clazz*/, jobject input_stream, jboolean compressed, jlong max_shuffle_read_rows, jlong max_shuffle_read_bytes)
{
LOCAL_ENGINE_JNI_METHOD_START
auto * input = env->NewGlobalRef(input_stream);
auto read_buffer = std::make_unique<local_engine::ReadBufferFromJavaInputStream>(input);
auto * shuffle_reader
= new local_engine::ShuffleReader(std::move(read_buffer), compressed, max_shuffle_read_rows, max_shuffle_read_bytes, enable_prefetch);
= new local_engine::ShuffleReader(std::move(read_buffer), compressed, max_shuffle_read_rows, max_shuffle_read_bytes);
return reinterpret_cast<jlong>(shuffle_reader);
LOCAL_ENGINE_JNI_METHOD_END(env, -1)
}
Expand Down

0 comments on commit 85231f6

Please sign in to comment.