Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "[GLUTEN-4660][CH]Asynchronous shuffle read (#4664)" #4767

Merged
merged 1 commit into from
Feb 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,14 @@ public CHStreamReader(ShuffleInputStream shuffleInputStream) {
this.inputStream,
inputStream.isCompressed(),
CHBackendSettings.maxShuffleReadRows(),
CHBackendSettings.maxShuffleReadBytes(),
CHBackendSettings.enableShufflePrefetch());
CHBackendSettings.maxShuffleReadBytes());
}

private static native long createNativeShuffleReader(
ShuffleInputStream inputStream,
boolean compressed,
long maxShuffleReadRows,
long maxShuffleReadBytes,
boolean enable_prefetch);
long maxShuffleReadBytes);

private native long nativeNext(long nativeShuffleReader);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,6 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
".runtime_config.max_source_concatenate_bytes"
val GLUTEN_MAX_SHUFFLE_READ_BYTES_DEFAULT = -1

val GLUTEN_SHUFFLE_EBABLE_PREFETCH: String =
GlutenConfig.GLUTEN_CONFIG_PREFIX + CHBackend.BACKEND_NAME +
".runtime_config.shuffle_enable_prefetch"
val GLUTEN_SHUFFLE_EBABLE_PREFETCH_DEFAULT = true

def affinityMode: String = {
SparkEnv.get.conf
.get(
Expand Down Expand Up @@ -277,11 +272,6 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
.getLong(GLUTEN_MAX_SHUFFLE_READ_BYTES, GLUTEN_MAX_SHUFFLE_READ_BYTES_DEFAULT)
}

def enableShufflePrefetch(): Boolean = {
SparkEnv.get.conf
.getBoolean(GLUTEN_SHUFFLE_EBABLE_PREFETCH, GLUTEN_SHUFFLE_EBABLE_PREFETCH_DEFAULT)
}

override def supportWriteFilesExec(
format: FileFormat,
fields: Array[StructField],
Expand Down
6 changes: 3 additions & 3 deletions cpp-ch/local-engine/Shuffle/ShuffleReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,18 @@ void configureCompressedReadBuffer(DB::CompressedReadBuffer & compressedReadBuff
{
compressedReadBuffer.disableChecksumming();
}
ShuffleReader::ShuffleReader(std::unique_ptr<ReadBuffer> in_, bool compressed, Int64 max_shuffle_read_rows_, Int64 max_shuffle_read_bytes_, bool enable_prefetch)
ShuffleReader::ShuffleReader(std::unique_ptr<ReadBuffer> in_, bool compressed, Int64 max_shuffle_read_rows_, Int64 max_shuffle_read_bytes_)
: in(std::move(in_)), max_shuffle_read_rows(max_shuffle_read_rows_), max_shuffle_read_bytes(max_shuffle_read_bytes_)
{
if (compressed)
{
compressed_in = std::make_unique<CompressedReadBuffer>(*in);
configureCompressedReadBuffer(static_cast<DB::CompressedReadBuffer &>(*compressed_in));
input_stream = std::make_unique<NativeReader>(*compressed_in, max_shuffle_read_rows_, max_shuffle_read_bytes_, enable_prefetch);
input_stream = std::make_unique<NativeReader>(*compressed_in, max_shuffle_read_rows_, max_shuffle_read_bytes_);
}
else
{
input_stream = std::make_unique<NativeReader>(*in, max_shuffle_read_rows_, max_shuffle_read_bytes_, enable_prefetch);
input_stream = std::make_unique<NativeReader>(*in);
}
}
Block * ShuffleReader::read()
Expand Down
2 changes: 1 addition & 1 deletion cpp-ch/local-engine/Shuffle/ShuffleReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class ShuffleReader : BlockIterator
{
public:
explicit ShuffleReader(
std::unique_ptr<DB::ReadBuffer> in_, bool compressed, Int64 max_shuffle_read_rows_, Int64 max_shuffle_read_bytes_, bool enable_prefetch);
std::unique_ptr<DB::ReadBuffer> in_, bool compressed, Int64 max_shuffle_read_rows_, Int64 max_shuffle_read_bytes_);
DB::Block * read();
~ShuffleReader();
static jclass input_stream_class;
Expand Down
41 changes: 0 additions & 41 deletions cpp-ch/local-engine/Storages/IO/NativeReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,6 @@
#include <Common/Arena.h>
#include <Storages/IO/NativeWriter.h>
#include <Storages/IO/AggregateSerializationUtils.h>
#include <Poco/Logger.h>
#include <Common/logger_useful.h>

#include <Interpreters/threadPoolCallbackRunner.h>
#include <Common/CurrentMetrics.h>
#include <Interpreters/Context.h>

namespace CurrentMetrics
{
extern const Metric Read;
extern const Metric ThreadPoolFSReaderThreads;
extern const Metric ThreadPoolFSReaderThreadsActive;
extern const Metric ThreadPoolFSReaderThreadsScheduled;
}


namespace DB
{
Expand All @@ -62,33 +47,7 @@ Block NativeReader::getHeader() const
return header;
}

std::atomic<size_t> nn = 0;

void NativeReader::prefetch()
{
if (!enable_prefetch)
return;
auto & tpool = DB::Context::getGlobalContextInstance()->getPrefetchThreadpool();
prefetch_future = DB::scheduleFromThreadPool<DB::Block>(
[this]() { return this->readImpl(); }, tpool, "NativeReader", DB::DEFAULT_PREFETCH_PRIORITY);
}

DB::Block NativeReader::read()
{
DB::Block res;
if (prefetch_future.valid())
{
res = prefetch_future.get();
prefetch_future = {};
}
else
res = readImpl();
if (res)
prefetch();
return res;
}

DB::Block NativeReader::readImpl()
{
DB::Block result_block;
if (istr.eof())
Expand Down
14 changes: 1 addition & 13 deletions cpp-ch/local-engine/Storages/IO/NativeReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@
#include <Core/Block.h>
#include <Core/Defines.h>
#include <DataTypes/DataTypeAggregateFunction.h>
#include <future>
#include <memory>
#include <Common/ThreadPool.h>

namespace local_engine
{
Expand All @@ -48,22 +45,16 @@ class NativeReader
};

NativeReader(
DB::ReadBuffer & istr_,
Int64 max_block_size_ = DB::DEFAULT_BLOCK_SIZE,
Int64 max_block_bytes_ = DB::DEFAULT_BLOCK_SIZE * 256,
bool enable_prefetch_ = false)
DB::ReadBuffer & istr_, Int64 max_block_size_ = DB::DEFAULT_BLOCK_SIZE, Int64 max_block_bytes_ = DB::DEFAULT_BLOCK_SIZE * 256)
: istr(istr_)
, max_block_size(max_block_size_ != 0 ? static_cast<size_t>(max_block_size_) : DB::DEFAULT_BLOCK_SIZE)
, max_block_bytes(max_block_bytes_ != 0 ? static_cast<size_t>(max_block_bytes_) : DB::DEFAULT_BLOCK_SIZE * 256)
, enable_prefetch(enable_prefetch_)
{
}

DB::Block getHeader() const;

DB::Block read();
DB::Block readImpl();
void prefetch();

private:
DB::ReadBuffer & istr;
Expand All @@ -75,9 +66,6 @@ class NativeReader

std::vector<ColumnParseUtil> columns_parse_util;

bool enable_prefetch = false;
std::future<DB::Block> prefetch_future;

void updateAvgValueSizeHints(const DB::Block & block);

DB::Block prepareByFirstBlock();
Expand Down
16 changes: 7 additions & 9 deletions cpp-ch/local-engine/Storages/SourceFromJavaIter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,24 @@
*/
#include "SourceFromJavaIter.h"
#include <Columns/ColumnConst.h>
#include <Columns/ColumnMap.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnMap.h>
#include <Columns/ColumnTuple.h>
#include <Columns/IColumn.h>
#include <Core/ColumnsWithTypeAndName.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeMap.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/IDataType.h>
#include <Processors/Transforms/AggregatingTransform.h>
#include <jni/jni_common.h>
#include <Common/assert_cast.h>
#include <Common/CHUtil.h>
#include <Common/DebugUtils.h>
#include <Common/Exception.h>
#include <Common/JNIUtils.h>
#include <Common/assert_cast.h>

#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypeMap.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/IDataType.h>

namespace local_engine
{
Expand Down Expand Up @@ -92,7 +91,6 @@ DB::Chunk SourceFromJavaIter::generate()
}
CLEAN_JNIENV
return result;

}

SourceFromJavaIter::~SourceFromJavaIter()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ class HDFSFileReadBufferBuilder : public ReadBufferBuilder
std::unique_ptr<DB::ReadBuffer>
build(const substrait::ReadRel::LocalFiles::FileOrFiles & file_info, bool set_read_util_position) override
{
bool enable_async_io = context->getSettingsRef().remote_filesystem_read_prefetch;
bool enable_async_io = context->getConfigRef().getBool("hdfs.enable_async_io", true);
Poco::URI file_uri(file_info.uri_file());
std::string uri_path = "hdfs://" + file_uri.getHost();
if (file_uri.getPort())
Expand Down
10 changes: 2 additions & 8 deletions cpp-ch/local-engine/local_engine_jni.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -566,19 +566,13 @@ JNIEXPORT jlong Java_io_glutenproject_vectorized_CHNativeBlock_nativeTotalBytes(
}

JNIEXPORT jlong Java_io_glutenproject_vectorized_CHStreamReader_createNativeShuffleReader(
JNIEnv * env,
jclass /*clazz*/,
jobject input_stream,
jboolean compressed,
jlong max_shuffle_read_rows,
jlong max_shuffle_read_bytes,
jboolean enable_prefetch)
JNIEnv * env, jclass /*clazz*/, jobject input_stream, jboolean compressed, jlong max_shuffle_read_rows, jlong max_shuffle_read_bytes)
{
LOCAL_ENGINE_JNI_METHOD_START
auto * input = env->NewGlobalRef(input_stream);
auto read_buffer = std::make_unique<local_engine::ReadBufferFromJavaInputStream>(input);
auto * shuffle_reader
= new local_engine::ShuffleReader(std::move(read_buffer), compressed, max_shuffle_read_rows, max_shuffle_read_bytes, enable_prefetch);
= new local_engine::ShuffleReader(std::move(read_buffer), compressed, max_shuffle_read_rows, max_shuffle_read_bytes);
return reinterpret_cast<jlong>(shuffle_reader);
LOCAL_ENGINE_JNI_METHOD_END(env, -1)
}
Expand Down
Loading