Skip to content

Commit

Permalink
[VL] Fix Arrow ColumnarBatch cannot revoke rowIterator
Browse files Browse the repository at this point in the history
  • Loading branch information
jinchengchenghh committed Aug 16, 2024
1 parent 29bba29 commit 91f8f76
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,39 @@ public void testCreateByHandle() {
});
}

/**
 * Checks that an Arrow batch which is offloaded and then loaded again can still be read row by
 * row, both through {@code rowIterator()} and through {@code getRow(int)}.
 *
 * <p>The batch has two columns ({@code a boolean, b int}) and 100 rows; row {@code j} holds
 * {@code (j % 2 == 0, 15 - j)}, except that the last row of column {@code b} is overwritten with
 * null.
 */
@Test
public void testOffloadAndLoadReadRow() {
  TaskResources$.MODULE$.runUnsafe(
      () -> {
        final int numRows = 100;
        final ColumnarBatch batch = newArrowBatch("a boolean, b int", numRows);
        final ArrowWritableColumnVector col0 = (ArrowWritableColumnVector) batch.column(0);
        final ArrowWritableColumnVector col1 = (ArrowWritableColumnVector) batch.column(1);
        for (int j = 0; j < numRows; j++) {
          col0.putBoolean(j, j % 2 == 0);
          col1.putInt(j, 15 - j);
        }
        col1.putNull(numRows - 1);
        Assert.assertTrue(ColumnarBatches.isHeavyBatch(batch));

        // Round trip: heavy -> light (offloaded) -> heavy (loaded).
        final ColumnarBatch offloaded =
            ColumnarBatches.ensureOffloaded(ArrowBufferAllocators.contextInstance(), batch);
        Assert.assertTrue(ColumnarBatches.isLightBatch(offloaded));
        final ColumnarBatch loaded =
            ColumnarBatches.ensureLoaded(ArrowBufferAllocators.contextInstance(), offloaded);
        try {
          Assert.assertTrue(ColumnarBatches.isHeavyBatch(loaded));

          // The row iterator of the re-loaded batch must yield every row exactly once.
          final long cnt =
              StreamSupport.stream(
                      Spliterators.spliteratorUnknownSize(
                          loaded.rowIterator(), Spliterator.ORDERED),
                      false)
                  .count();
          Assert.assertEquals(numRows, cnt);

          // JUnit's assertEquals takes (expected, actual); row 0 of column b was set to 15 - 0.
          Assert.assertEquals(15, loaded.getRow(0).getInt(1));
        } finally {
          // Release the batch even when an assertion above fails.
          loaded.close();
        }
        return null;
      });
}

private static ColumnarBatch newArrowBatch(String schema, int numRows) {
final ArrowWritableColumnVector[] columns =
ArrowWritableColumnVector.allocateColumns(numRows, StructType.fromDDL(schema));
Expand Down
12 changes: 12 additions & 0 deletions cpp/velox/utils/ConfigExtractor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,18 @@ std::shared_ptr<facebook::velox::config::ConfigBase> getHiveConfig(
}
#endif

#ifdef ENABLE_ABFS
const auto& confValue = conf->rawConfigsCopy();
for (auto& [k, v] : confValue) {
if (k.find("fs.azure.account.key") == 0) {
connectorConfMap[k] = v;
} else if (k.find("spark.hadoop.fs.azure.account.key") == 0) {
constexpr int32_t accountKeyPrefixLength = 13;
connectorConfMap[k.substr(accountKeyPrefixLength)] = std::string(v);
}
}
#endif

hiveConfMap[facebook::velox::connector::hive::HiveConfig::kEnableFileHandleCache] =
conf->get<bool>(kVeloxFileHandleCacheEnabled, kVeloxFileHandleCacheEnabledDefault) ? "true" : "false";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;
import org.apache.spark.sql.vectorized.ColumnarBatchRowUtil;

import java.lang.reflect.Field;
import java.util.Arrays;
Expand Down Expand Up @@ -100,6 +101,10 @@ private static void transferVectors(ColumnarBatch from, ColumnarBatch target) {
newVectors[i] = from.column(i);
}
FIELD_COLUMNS.set(target, newVectors);
// Light batch does not need the row.
if (isHeavyBatch(target)) {
ColumnarBatchRowUtil.setColumnarBatchRow(newVectors, target);
}
} catch (IllegalAccessException e) {
throw new GlutenException(e);
}
Expand Down Expand Up @@ -202,7 +207,8 @@ private static ColumnarBatch load(BufferAllocator allocator, ColumnarBatch input

// populate new vectors to input
transferVectors(output, input);
return input;

return output;
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.vectorized;

import org.apache.gluten.exception.GlutenException;

import java.lang.reflect.Field;

/**
 * Reflection helper that replaces the {@code row} field of a {@link ColumnarBatch} with a new
 * {@link ColumnarBatchRow} built over a given set of column vectors.
 *
 * <p>The field is looked up once, by name, when this class is initialized. This is a static-only
 * utility and is not meant to be instantiated or extended.
 */
public final class ColumnarBatchRowUtil {

  /** Handle to the field named {@code row} on {@link ColumnarBatch}, made accessible once. */
  private static final Field FIELD_COLUMNAR_BATCH_ROW;

  static {
    try {
      final Field row = ColumnarBatch.class.getDeclaredField("row");
      // The field is reached via getDeclaredField, so access checks are lifted explicitly.
      row.setAccessible(true);
      FIELD_COLUMNAR_BATCH_ROW = row;
    } catch (NoSuchFieldException e) {
      throw new GlutenException(e);
    }
  }

  private ColumnarBatchRowUtil() {
    // Static utility class; no instances.
  }

  /**
   * Creates a new {@link ColumnarBatchRow} over {@code columns} and stores it in the {@code row}
   * field of {@code target}.
   *
   * @param columns the column vectors the new row object is constructed from
   * @param target the batch whose {@code row} field is overwritten
   * @throws GlutenException if the reflective write fails with {@link IllegalAccessException}
   */
  public static void setColumnarBatchRow(ColumnVector[] columns, ColumnarBatch target) {
    ColumnarBatchRow row = new ColumnarBatchRow(columns);
    try {
      FIELD_COLUMNAR_BATCH_ROW.set(target, row);
    } catch (IllegalAccessException e) {
      throw new GlutenException(e);
    }
  }
}

0 comments on commit 91f8f76

Please sign in to comment.