Skip to content

Commit

Permalink
improve the performance of converting row batch to column batch (#136)
Browse files Browse the repository at this point in the history
* improve the performance of converting row to column

improve the performance of converting row to column by reducing jvm calls

* try to release local jobject

* rename interface
  • Loading branch information
lgbo-ustc authored Sep 30, 2022
1 parent 198ba4e commit d6a6cc9
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 7 deletions.
1 change: 1 addition & 0 deletions utils/local-engine/Parser/SparkRowToCHColumn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ namespace local_engine
// Definitions of the static JNI handles for the Java-side row iterator.
// All start as nullptr; the method IDs are assigned in JNI_OnLoad via GetMethodID.
jclass SparkRowToCHColumn::spark_row_interator_class = nullptr;
// Method ID passed to CallBooleanMethod to ask the iterator whether more data remains.
jmethodID SparkRowToCHColumn::spark_row_interator_hasNext = nullptr;
// Method ID of the iterator's "next" method (JNI signature "()[B", i.e. returns a byte array).
jmethodID SparkRowToCHColumn::spark_row_interator_next = nullptr;
// Method ID of the iterator's "nextBatch" method (JNI signature "()Ljava/nio/ByteBuffer;").
jmethodID SparkRowToCHColumn::spark_row_iterator_nextBatch = nullptr;

int64_t getStringColumnTotalSize(int ordinal, SparkRowInfo & spark_row_info)
{
Expand Down
22 changes: 15 additions & 7 deletions utils/local-engine/Parser/SparkRowToCHColumn.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ class SparkRowToCHColumn
// JNI class handle of the Java row iterator; used as the class argument when looking up the method IDs below.
static jclass spark_row_interator_class;
// Method ID passed to CallBooleanMethod to check whether the iterator has more data.
static jmethodID spark_row_interator_hasNext;
// Method ID of "next", which returns one byte array per call (JNI signature "()[B").
static jmethodID spark_row_interator_next;
// Method ID of "nextBatch", which returns a java.nio.ByteBuffer; its address is read with GetDirectBufferAddress.
static jmethodID spark_row_iterator_nextBatch;

// case 1: rows are batched (this is often directly converted from Block)
static std::unique_ptr<Block> convertSparkRowInfoToCHColumn(SparkRowInfo & spark_row_info, Block & header);
Expand All @@ -143,13 +144,20 @@ class SparkRowToCHColumn
JNIEnv * env = JNIUtils::getENV(&attached);
while (env->CallBooleanMethod(java_iter, spark_row_interator_hasNext))
{
jbyteArray row_data = static_cast<jbyteArray>(env->CallObjectMethod(java_iter, spark_row_interator_next));

jsize len = env->GetArrayLength(row_data);
char * c_arr = new char[len];
env->GetByteArrayRegion(row_data, 0, len, reinterpret_cast<jbyte*>(c_arr));
appendSparkRowToCHColumn(helper, reinterpret_cast<int64_t>(c_arr), len);
delete[] c_arr;
jobject rows_buf = env->CallObjectMethod(java_iter, spark_row_iterator_nextBatch);
auto * rows_buf_ptr = static_cast<char*>(env->GetDirectBufferAddress(rows_buf));
int len = *(reinterpret_cast<int*>(rows_buf_ptr));

// Each row is preceded by a 4-byte length; a length of -1 marks the end of the buffer.
while (len > 0)
{
rows_buf_ptr += 4;
appendSparkRowToCHColumn(helper, reinterpret_cast<int64_t>(rows_buf_ptr), len);
rows_buf_ptr += len;
len = *(reinterpret_cast<int*>(rows_buf_ptr));
}
// Release the local reference to this batch's buffer before fetching the next batch.
env->DeleteLocalRef(rows_buf);
}
return getWrittenBlock(helper);
}
Expand Down
2 changes: 2 additions & 0 deletions utils/local-engine/local_engine_jni.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ jint JNI_OnLoad(JavaVM * vm, void * /*reserved*/)
= local_engine::GetMethodID(env, local_engine::SparkRowToCHColumn::spark_row_interator_class, "hasNext", "()Z");
local_engine::SparkRowToCHColumn::spark_row_interator_next
= local_engine::GetMethodID(env, local_engine::SparkRowToCHColumn::spark_row_interator_class, "next", "()[B");
local_engine::SparkRowToCHColumn::spark_row_iterator_nextBatch
= local_engine::GetMethodID(env, local_engine::SparkRowToCHColumn::spark_row_interator_class, "nextBatch", "()Ljava/nio/ByteBuffer;");

local_engine::JNIUtils::vm = vm;
local_engine::registerReadBufferBuildes(local_engine::ReadBufferBuilderFactory::instance());
Expand Down

0 comments on commit d6a6cc9

Please sign in to comment.