Merge branch 'main' into support_array_join
KevinyhZou authored Jul 8, 2024
2 parents 02a7874 + 8300f3b commit ef2a26c
Showing 19 changed files with 64 additions and 63 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/build_bundle_package.yml
@@ -15,6 +15,9 @@

name: Build bundle package

env:
ACTIONS_ALLOW_USE_UNSECURE_NODE_VERSION: true

concurrency:
group: ${{ github.repository }}-${{ github.head_ref || github.sha }}-${{ github.workflow }}
cancel-in-progress: true
@@ -26,10 +26,10 @@ import org.apache.arrow.vector.types.pojo.Schema

import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}

class VeloxColumnarBatchIterator(schema: Schema, allocator: BufferAllocator)
class VeloxColumnarBatchIterator(schema: Schema, allocator: BufferAllocator, queueSize: Int)
extends Iterator[ColumnarBatch]
with AutoCloseable {
private val writeQueue = new ArrayBlockingQueue[ColumnarBatch](64)
private val writeQueue = new ArrayBlockingQueue[ColumnarBatch](queueSize)
private var currentBatch: Option[ColumnarBatch] = None

def enqueue(batch: ColumnarBatch): Unit = {
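The queue capacity, previously hard-coded to 64, becomes a constructor parameter; the same value is kept as the default where the size is read from configuration later in this commit. For readers who don't know Java's ArrayBlockingQueue, here is a minimal, self-contained C++ sketch of the same bounded, blocking handoff between a producer and the write thread (illustrative only, not part of this change):

#include <condition_variable>
#include <deque>
#include <mutex>

// Minimal bounded blocking queue: enqueue blocks when full, dequeue blocks
// when empty -- the backpressure that ArrayBlockingQueue provides above.
template <typename T>
class BoundedQueue
{
public:
    explicit BoundedQueue(size_t capacity) : capacity_(capacity) {}

    void enqueue(T item)
    {
        std::unique_lock<std::mutex> lock(mutex_);
        not_full_.wait(lock, [this] { return queue_.size() < capacity_; });
        queue_.push_back(std::move(item));
        not_empty_.notify_one();
    }

    T dequeue()
    {
        std::unique_lock<std::mutex> lock(mutex_);
        not_empty_.wait(lock, [this] { return !queue_.empty(); });
        T item = std::move(queue_.front());
        queue_.pop_front();
        not_full_.notify_one();
        return item;
    }

private:
    const size_t capacity_;
    std::mutex mutex_;
    std::condition_variable not_full_;
    std::condition_variable not_empty_;
    std::deque<T> queue_;
};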
@@ -40,9 +40,10 @@ class VeloxWriteQueue(
schema: Schema,
allocator: BufferAllocator,
datasourceJniWrapper: DatasourceJniWrapper,
outputFileURI: String)
outputFileURI: String,
queueSize: Int)
extends AutoCloseable {
private val scanner = new VeloxColumnarBatchIterator(schema, allocator)
private val scanner = new VeloxColumnarBatchIterator(schema, allocator, queueSize)
private val writeException = new AtomicReference[Throwable]

private val writeThread = new Thread(
@@ -16,6 +16,7 @@
*/
package org.apache.spark.sql.execution.datasources.velox

import org.apache.gluten.GlutenConfig
import org.apache.gluten.columnarbatch.ColumnarBatches
import org.apache.gluten.datasource.DatasourceJniWrapper
import org.apache.gluten.exception.GlutenException
@@ -73,14 +74,18 @@ trait VeloxFormatWriterInjects extends GlutenFormatWriterInjectsBase {
cSchema.close()
}

// FIXME: remove this once we support push-based write.
val queueSize = context.getConfiguration.getInt(GlutenConfig.VELOX_WRITER_QUEUE_SIZE.key, 64)

val writeQueue =
new VeloxWriteQueue(
TaskResources.getLocalTaskContext(),
dsHandle,
arrowSchema,
allocator,
datasourceJniWrapper,
filePath)
filePath,
queueSize)

new OutputWriter {
override def write(row: InternalRow): Unit = {
4 changes: 2 additions & 2 deletions cpp-ch/clickhouse.version
@@ -1,4 +1,4 @@
CH_ORG=Kyligence
CH_BRANCH=rebase_ch/20240705
CH_COMMIT=531a87ed802
CH_BRANCH=rebase_ch/20240706
CH_COMMIT=25bf31bfbdf

6 changes: 3 additions & 3 deletions cpp-ch/local-engine/Operator/DefaultHashAggregateResult.cpp
@@ -116,18 +116,18 @@ class DefaultHashAggrgateResultTransform : public DB::IProcessor
has_input = true;
output_chunk = DB::Chunk(result_cols, 1);
auto info = std::make_shared<DB::AggregatedChunkInfo>();
output_chunk.getChunkInfos().add(std::move(info));
output_chunk.setChunkInfo(info);
return Status::Ready;
}

input.setNeeded();
if (input.hasData())
{
output_chunk = input.pull(true);
if (output_chunk.getChunkInfos().empty())
if (!output_chunk.hasChunkInfo())
{
auto info = std::make_shared<DB::AggregatedChunkInfo>();
output_chunk.getChunkInfos().add(std::move(info));
output_chunk.setChunkInfo(info);
}
has_input = true;
return Status::Ready;
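Both branches of this transform do the same thing: guarantee that the outgoing chunk carries an AggregatedChunkInfo. A condensed restatement of that pattern, using only the calls visible on the incoming side of this hunk (the helper name is hypothetical; the surrounding ClickHouse includes are assumed):

// Hypothetical helper: attach a fresh AggregatedChunkInfo only when the
// chunk does not carry one yet, as both branches above do.
static void ensureAggregatedChunkInfo(DB::Chunk & chunk)
{
    if (!chunk.hasChunkInfo())
        chunk.setChunkInfo(std::make_shared<DB::AggregatedChunkInfo>());
}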
@@ -121,11 +121,12 @@ void SparkMergeTreeWriter::write(const DB::Block & block)
checkAndMerge();
}

bool SparkMergeTreeWriter::chunkToPart(Chunk && plan_chunk)
bool SparkMergeTreeWriter::chunkToPart(Chunk && chunk)
{
if (Chunk result_chunk = DB::Squashing::squash(std::move(plan_chunk)))
if (chunk.hasChunkInfo())
{
auto result = squashing->getHeader().cloneWithColumns(result_chunk.detachColumns());
Chunk squash_chunk = DB::Squashing::squash(std::move(chunk));
Block result = header.cloneWithColumns(squash_chunk.getColumns());
return blockToPart(result);
}
return false;
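The guard changes here: instead of squashing unconditionally and testing whether squash produced a chunk, the rewritten chunkToPart squashes only when the incoming chunk carries chunk info. Restated as a free function for readability, using only the calls visible in this hunk (the standalone signature is illustrative; header is the writer's output header):

#include <functional>

// Sketch of the rewritten flow: squash only chunks that carry info, then
// materialize the squashed columns against the header and hand off the block.
bool chunkToPartSketch(DB::Chunk && chunk, const DB::Block & header,
                       const std::function<bool(DB::Block &)> & block_to_part)
{
    if (!chunk.hasChunkInfo())
        return false;
    DB::Chunk squashed = DB::Squashing::squash(std::move(chunk));
    DB::Block result = header.cloneWithColumns(squashed.getColumns());
    return block_to_part(result);
}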
@@ -77,7 +77,7 @@ class SparkMergeTreeWriter
void saveMetadata();
void commitPartToRemoteStorageIfNeeded();
void finalizeMerge();
bool chunkToPart(Chunk && plan_chunk);
bool chunkToPart(Chunk && chunk);
bool blockToPart(Block & block);
bool useLocalStorage() const;

4 changes: 2 additions & 2 deletions cpp-ch/local-engine/Storages/SourceFromJavaIter.cpp
@@ -109,13 +109,13 @@ DB::Chunk SourceFromJavaIter::generate()
auto info = std::make_shared<DB::AggregatedChunkInfo>();
info->is_overflows = data->info.is_overflows;
info->bucket_num = data->info.bucket_num;
result.getChunkInfos().add(std::move(info));
result.setChunkInfo(info);
}
else
{
result = BlockUtil::buildRowCountChunk(rows);
auto info = std::make_shared<DB::AggregatedChunkInfo>();
result.getChunkInfos().add(std::move(info));
result.setChunkInfo(info);
}
}
return result;
19 changes: 17 additions & 2 deletions cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp
@@ -23,7 +23,6 @@
#include <Disks/IO/AsynchronousBoundedReadBuffer.h>
#include <Disks/IO/ReadBufferFromAzureBlobStorage.h>
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
#include <Disks/ObjectStorages/AzureBlobStorage/AzureBlobStorageAuth.h>
#include <IO/BoundedReadBuffer.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadBufferFromS3.h>
@@ -52,6 +51,10 @@
#include <Common/logger_useful.h>
#include <Common/safe_cast.h>

#if USE_AZURE_BLOB_STORAGE
#include <Disks/ObjectStorages/AzureBlobStorage/AzureBlobStorageCommon.h>
#endif

#if USE_AWS_S3
#include <aws/core/client/DefaultRetryStrategy.h>
#include <aws/s3/model/CopyObjectRequest.h>
@@ -687,7 +690,19 @@ class AzureBlobReadBuffer : public ReadBufferBuilder
{
if (shared_client)
return shared_client;
shared_client = DB::getAzureBlobContainerClient(context->getConfigRef(), "blob");

const std::string config_prefix = "blob";
const Poco::Util::AbstractConfiguration & config = context->getConfigRef();
bool is_client_for_disk = false;
auto new_settings = DB::AzureBlobStorage::getRequestSettings(config, config_prefix, context);
DB::AzureBlobStorage::ConnectionParams params
{
.endpoint = DB::AzureBlobStorage::processEndpoint(config, config_prefix),
.auth_method = DB::AzureBlobStorage::getAuthMethod(config, config_prefix),
.client_options = DB::AzureBlobStorage::getClientOptions(*new_settings, is_client_for_disk),
};

shared_client = DB::AzureBlobStorage::getContainerClient(params, true);
return shared_client;
}
};
3 changes: 1 addition & 2 deletions cpp-ch/local-engine/tests/gtest_parser.cpp
@@ -101,8 +101,7 @@ TEST(LocalExecutor, StorageObjectStorageSink)

/// 2. Create Chunk
/// 3. consume
Chunk data = testChunk();
sink.consume(data);
sink.consume(testChunk());
sink.onFinish();
}

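A small but telling simplification: sink.consume can now be handed a temporary, which compiles only if the parameter is taken by value or by rvalue reference rather than by non-const lvalue reference. Schematically (the signature below is an assumption inferred from this change, not quoted from the source):

// Assumed parameter convention implied by passing a temporary above:
void consume(DB::Chunk chunk);          // by value ...
// void consume(DB::Chunk && chunk);    // ... or by rvalue reference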
37 changes: 0 additions & 37 deletions cpp/core/jni/JniWrapper.cc
@@ -401,8 +401,6 @@ JNIEXPORT jboolean JNICALL Java_org_apache_gluten_vectorized_ColumnarBatchOutIte
jobject wrapper,
jlong iterHandle) {
JNI_METHOD_START
auto ctx = gluten::getRuntime(env, wrapper);

auto iter = ObjectStore::retrieve<ResultIterator>(iterHandle);
if (iter == nullptr) {
std::string errorMessage =
@@ -438,8 +436,6 @@ JNIEXPORT jobject JNICALL Java_org_apache_gluten_vectorized_ColumnarBatchOutIter
jobject wrapper,
jlong iterHandle) {
JNI_METHOD_START
auto ctx = gluten::getRuntime(env, wrapper);

auto iter = ObjectStore::retrieve<ResultIterator>(iterHandle);
auto metrics = iter->getMetrics();
unsigned int numMetrics = 0;
@@ -499,8 +495,6 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ColumnarBatchOutIterat
jlong iterHandle,
jlong size) {
JNI_METHOD_START
auto ctx = gluten::getRuntime(env, wrapper);

auto it = ObjectStore::retrieve<ResultIterator>(iterHandle);
if (it == nullptr) {
std::string errorMessage = "Invalid result iter handle " + std::to_string(iterHandle);
@@ -515,8 +509,6 @@ JNIEXPORT void JNICALL Java_org_apache_gluten_vectorized_ColumnarBatchOutIterato
jobject wrapper,
jlong iterHandle) {
JNI_METHOD_START
auto ctx = gluten::getRuntime(env, wrapper);

ObjectStore::release(iterHandle);
JNI_METHOD_END()
}
@@ -540,7 +532,6 @@ Java_org_apache_gluten_vectorized_NativeColumnarToRowJniWrapper_nativeColumnarTo
jlong c2rHandle,
jlong batchHandle) {
JNI_METHOD_START
auto ctx = gluten::getRuntime(env, wrapper);
auto columnarToRowConverter = ObjectStore::retrieve<ColumnarToRowConverter>(c2rHandle);
auto cb = ObjectStore::retrieve<ColumnarBatch>(batchHandle);
columnarToRowConverter->convert(cb);
@@ -569,8 +560,6 @@ JNIEXPORT void JNICALL Java_org_apache_gluten_vectorized_NativeColumnarToRowJniW
jobject wrapper,
jlong c2rHandle) {
JNI_METHOD_START
auto ctx = gluten::getRuntime(env, wrapper);

ObjectStore::release(c2rHandle);
JNI_METHOD_END()
}
@@ -614,8 +603,6 @@ JNIEXPORT void JNICALL Java_org_apache_gluten_vectorized_NativeRowToColumnarJniW
jobject wrapper,
jlong r2cHandle) {
JNI_METHOD_START
auto ctx = gluten::getRuntime(env, wrapper);

ObjectStore::release(r2cHandle);
JNI_METHOD_END()
}
@@ -625,7 +612,6 @@ JNIEXPORT jstring JNICALL Java_org_apache_gluten_columnarbatch_ColumnarBatchJniW
jobject wrapper,
jlong batchHandle) {
JNI_METHOD_START
auto ctx = gluten::getRuntime(env, wrapper);
auto batch = ObjectStore::retrieve<ColumnarBatch>(batchHandle);
return env->NewStringUTF(batch->getType().c_str());
JNI_METHOD_END(nullptr)
@@ -636,7 +622,6 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_columnarbatch_ColumnarBatchJniWra
jobject wrapper,
jlong batchHandle) {
JNI_METHOD_START
auto ctx = gluten::getRuntime(env, wrapper);
auto batch = ObjectStore::retrieve<ColumnarBatch>(batchHandle);
return batch->numBytes();
JNI_METHOD_END(kInvalidObjectHandle)
@@ -647,7 +632,6 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_columnarbatch_ColumnarBatchJniWra
jobject wrapper,
jlong batchHandle) {
JNI_METHOD_START
auto ctx = gluten::getRuntime(env, wrapper);
auto batch = ObjectStore::retrieve<ColumnarBatch>(batchHandle);
return batch->numColumns();
JNI_METHOD_END(kInvalidObjectHandle)
@@ -658,7 +642,6 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_columnarbatch_ColumnarBatchJniWra
jobject wrapper,
jlong batchHandle) {
JNI_METHOD_START
auto ctx = gluten::getRuntime(env, wrapper);
auto batch = ObjectStore::retrieve<ColumnarBatch>(batchHandle);
return batch->numRows();
JNI_METHOD_END(kInvalidObjectHandle)
@@ -692,7 +675,6 @@ JNIEXPORT void JNICALL Java_org_apache_gluten_columnarbatch_ColumnarBatchJniWrap
jlong cSchema,
jlong cArray) {
JNI_METHOD_START
auto ctx = gluten::getRuntime(env, wrapper);
auto batch = ObjectStore::retrieve<ColumnarBatch>(batchHandle);
std::shared_ptr<ArrowSchema> exportedSchema = batch->exportArrowSchema();
std::shared_ptr<ArrowArray> exportedArray = batch->exportArrowArray();
@@ -755,7 +737,6 @@ JNIEXPORT void JNICALL Java_org_apache_gluten_columnarbatch_ColumnarBatchJniWrap
jobject wrapper,
jlong batchHandle) {
JNI_METHOD_START
auto ctx = gluten::getRuntime(env, wrapper);
ObjectStore::release(batchHandle);
JNI_METHOD_END()
}
@@ -900,8 +881,6 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe
jlong size,
jboolean callBySelf) {
JNI_METHOD_START
auto ctx = gluten::getRuntime(env, wrapper);

auto shuffleWriter = ObjectStore::retrieve<ShuffleWriter>(shuffleWriterHandle);
if (!shuffleWriter) {
std::string errorMessage = "Invalid shuffle writer handle " + std::to_string(shuffleWriterHandle);
@@ -922,8 +901,6 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe
jlong batchHandle,
jlong memLimit) {
JNI_METHOD_START
auto ctx = gluten::getRuntime(env, wrapper);

auto shuffleWriter = ObjectStore::retrieve<ShuffleWriter>(shuffleWriterHandle);
if (!shuffleWriter) {
std::string errorMessage = "Invalid shuffle writer handle " + std::to_string(shuffleWriterHandle);
@@ -943,8 +920,6 @@ JNIEXPORT jobject JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrap
jobject wrapper,
jlong shuffleWriterHandle) {
JNI_METHOD_START
auto ctx = gluten::getRuntime(env, wrapper);

auto shuffleWriter = ObjectStore::retrieve<ShuffleWriter>(shuffleWriterHandle);
if (!shuffleWriter) {
std::string errorMessage = "Invalid shuffle writer handle " + std::to_string(shuffleWriterHandle);
@@ -985,8 +960,6 @@ JNIEXPORT void JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrapper
jobject wrapper,
jlong shuffleWriterHandle) {
JNI_METHOD_START
auto ctx = gluten::getRuntime(env, wrapper);

ObjectStore::release(shuffleWriterHandle);
JNI_METHOD_END()
}
@@ -1053,8 +1026,6 @@ JNIEXPORT void JNICALL Java_org_apache_gluten_vectorized_ShuffleReaderJniWrapper
jlong shuffleReaderHandle,
jobject metrics) {
JNI_METHOD_START
auto ctx = gluten::getRuntime(env, wrapper);

auto reader = ObjectStore::retrieve<ShuffleReader>(shuffleReaderHandle);
env->CallVoidMethod(metrics, shuffleReaderMetricsSetDecompressTime, reader->getDecompressTime());
env->CallVoidMethod(metrics, shuffleReaderMetricsSetDeserializeTime, reader->getDeserializeTime());
@@ -1068,8 +1039,6 @@ JNIEXPORT void JNICALL Java_org_apache_gluten_vectorized_ShuffleReaderJniWrapper
jobject wrapper,
jlong shuffleReaderHandle) {
JNI_METHOD_START
auto ctx = gluten::getRuntime(env, wrapper);

auto reader = ObjectStore::retrieve<ShuffleReader>(shuffleReaderHandle);
GLUTEN_THROW_NOT_OK(reader->close());
ObjectStore::release(shuffleReaderHandle);
@@ -1111,8 +1080,6 @@ JNIEXPORT void JNICALL Java_org_apache_gluten_datasource_DatasourceJniWrapper_in
jlong dsHandle,
jlong cSchema) {
JNI_METHOD_START
auto ctx = gluten::getRuntime(env, wrapper);

auto datasource = ObjectStore::retrieve<Datasource>(dsHandle);
datasource->inspectSchema(reinterpret_cast<struct ArrowSchema*>(cSchema));
JNI_METHOD_END()
@@ -1123,8 +1090,6 @@ JNIEXPORT void JNICALL Java_org_apache_gluten_datasource_DatasourceJniWrapper_cl
jobject wrapper,
jlong dsHandle) {
JNI_METHOD_START
auto ctx = gluten::getRuntime(env, wrapper);

auto datasource = ObjectStore::retrieve<Datasource>(dsHandle);
datasource->close();
ObjectStore::release(dsHandle);
@@ -1252,8 +1217,6 @@ JNIEXPORT void JNICALL Java_org_apache_gluten_vectorized_ColumnarBatchSerializer
jobject wrapper,
jlong serializerHandle) {
JNI_METHOD_START
auto ctx = gluten::getRuntime(env, wrapper);

ObjectStore::release(serializerHandle);
JNI_METHOD_END()
}
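Every hunk in this file removes the same line, auto ctx = gluten::getRuntime(env, wrapper);, from wrappers that never use the runtime and only resolve handles through the ObjectStore. The surviving shape of such a wrapper, condensed from the hunks above (the JNI symbol name is schematic; the JNI_METHOD_* macros and kInvalidObjectHandle are defined elsewhere in this file):

// Schematic handle-only wrapper: resolve the object straight from the
// ObjectStore; the unused Runtime lookup is gone.
JNIEXPORT jlong JNICALL Java_org_apache_gluten_columnarbatch_ColumnarBatchJniWrapper_numRows(
    JNIEnv* env,
    jobject wrapper,
    jlong batchHandle) {
  JNI_METHOD_START
  auto batch = ObjectStore::retrieve<ColumnarBatch>(batchHandle);
  return batch->numRows();
  JNI_METHOD_END(kInvalidObjectHandle)
}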