Skip to content

Commit

Permalink
[CALCITE-6593] NPE when outer joining tables with many fields and unm…
Browse files Browse the repository at this point in the history
…atching rows

EnumUtils:
- Split the code paths of compact code and normal code;
- Keep the normal code path as is and change the compact code path:
  - Generate a null check if the row might be null;
  - Add the early break in case of semi/anti joins;
  - Remove the "optimization" of generating an array of a specific
  type if all output fields are of the same type;

JavaRowFormat:
- Make the copy method abstract and implement a specific copy for
every JavaRowFormat;

Also, add a new system property to determine the threshold that triggers
compact code generation.
  • Loading branch information
rorueda authored and rubenada committed Sep 27, 2024
1 parent ed18e58 commit a8802c7
Show file tree
Hide file tree
Showing 5 changed files with 354 additions and 150 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.calcite.linq4j.tree.NewArrayExpression;
import org.apache.calcite.linq4j.tree.ParameterExpression;
import org.apache.calcite.linq4j.tree.Primitive;
import org.apache.calcite.linq4j.tree.Statement;
import org.apache.calcite.linq4j.tree.Types;
import org.apache.calcite.linq4j.tree.UnaryExpression;
import org.apache.calcite.rel.RelNode;
Expand All @@ -64,7 +65,6 @@

import org.checkerframework.checker.nullness.qual.Nullable;

import java.lang.reflect.Array;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.lang.reflect.Type;
Expand All @@ -86,6 +86,8 @@
import java.util.TimeZone;
import java.util.function.Function;

import static org.apache.calcite.config.CalciteSystemProperty.JOIN_SELECTOR_COMPACT_CODE_THRESHOLD;

import static java.util.Objects.requireNonNull;

/**
Expand Down Expand Up @@ -159,40 +161,18 @@ static List<RelDataType> fieldRowTypes(

static Expression joinSelector(JoinRelType joinType, PhysType physType,
List<PhysType> inputPhysTypes) {
// A parameter for each input.
final List<ParameterExpression> parameters = new ArrayList<>();

// Generate all fields.
final List<Expression> expressions = new ArrayList<>();
final int outputFieldCount = physType.getRowType().getFieldCount();

// If there are many output fields, create the output dynamically so that the code size stays
// below the limit. See CALCITE-3094.
final boolean generateCompactCode = outputFieldCount >= 100;
final ParameterExpression compactOutputVar;
final BlockBuilder compactCode = new BlockBuilder();
if (generateCompactCode) {
Class<?> fieldClass = physType.fieldClass(0);
// If all fields have the same type, use the specific type. Otherwise just use Object.
for (int fieldIndex = 1; fieldIndex < outputFieldCount; ++fieldIndex) {
if (fieldClass != physType.fieldClass(fieldIndex)) {
fieldClass = Object.class;
break;
}
}

final Class<?> arrayClass = Array.newInstance(fieldClass, 0).getClass();
compactOutputVar = Expressions.variable(arrayClass, "outputArray");
final DeclarationStatement exp =
Expressions.declare(
0, compactOutputVar, new NewArrayExpression(fieldClass, 1,
Expressions.constant(outputFieldCount), null));
compactCode.add(exp);
} else {
compactOutputVar = null;
if (shouldGenerateCompactCode(outputFieldCount)) {
return joinSelectorCompact(joinType, physType, inputPhysTypes);
}

int outputField = 0;
// A parameter for each input.
final List<ParameterExpression> parameters = new ArrayList<>();

// Generate all fields.
final List<Expression> expressions = new ArrayList<>();
for (Ord<PhysType> ord : Ord.zip(inputPhysTypes)) {
final PhysType inputPhysType =
ord.e.makeNullable(joinType.generatesNullsOn(ord.i));
Expand All @@ -208,18 +188,6 @@ static Expression joinSelector(JoinRelType joinType, PhysType physType,
break;
}
final int fieldCount = inputPhysType.getRowType().getFieldCount();
if (generateCompactCode) {
// use an array copy if possible
final Expression copyExpr =
Nullness.castNonNull(
inputPhysType.getFormat().copy(parameter, Nullness.castNonNull(compactOutputVar),
outputField, fieldCount));
compactCode.add(Expressions.statement(copyExpr));
outputField += fieldCount;
continue;
}

// otherwise access the fields individually
for (int i = 0; i < fieldCount; i++) {
Expression expression =
inputPhysType.fieldReference(parameter, i,
Expand All @@ -234,37 +202,76 @@ static Expression joinSelector(JoinRelType joinType, PhysType physType,
expressions.add(expression);
}
}
return Expressions.lambda(
Function2.class,
physType.record(expressions),
parameters);
}

if (generateCompactCode) {
compactCode.add(Nullness.castNonNull(compactOutputVar));

// This expression generates code of the form:
// new org.apache.calcite.linq4j.function.Function2() {
// public String[] apply(org.apache.calcite.interpreter.Row left,
// org.apache.calcite.interpreter.Row right) {
// String[] outputArray = new String[left.length + right.length];
// System.arraycopy(left.copyValues(), 0, outputArray, 0, left.length);
// System.arraycopy(right.copyValues(), 0, outputArray, left.length, right.length);
// return outputArray;
// }
// public String[] apply(Object left, Object right) {
// return apply(
// (org.apache.calcite.interpreter.Row) left,
// (org.apache.calcite.interpreter.Row) right);
// }
// }
// That is, it converts the left and right Row objects to Object[] using Row#copyValues()
// then writes each to an output Object[] using System.arraycopy()

return Expressions.lambda(
Function2.class,
compactCode.toBlock(),
parameters);
/**
 * Decides whether the join selector for a result with the given number of
 * output fields should be generated in its compact form.
 *
 * <p>The decision is driven by {@code JOIN_SELECTOR_COMPACT_CODE_THRESHOLD}:
 * a negative threshold never selects the compact form; otherwise the compact
 * form is selected once the field count reaches the threshold.
 *
 * @param outputFieldCount number of fields in the join output row
 * @return whether the compact code path should be used
 */
static boolean shouldGenerateCompactCode(int outputFieldCount) {
  final int threshold = JOIN_SELECTOR_COMPACT_CODE_THRESHOLD.value();
  if (threshold < 0) {
    // A negative threshold switches the compact code path off.
    return false;
  }
  return outputFieldCount >= threshold;
}

/**
 * Builds the "compact" join selector: a {@code Function2} lambda whose body
 * declares an {@code Object[] outputArray} sized to the output row, copies the
 * fields of each input into it and returns the array.
 *
 * <p>The copy statements for each input are produced by the input's row format
 * ({@code getFormat().copy(...)}). When the join type generates nulls on an
 * input, those statements are wrapped in a {@code parameter != null} check so
 * that an absent row is not dereferenced (CALCITE-6593).
 *
 * @param joinType join type; consulted to find out which inputs may be null
 * @param physType physical type of the join output; supplies the output field count
 * @param inputPhysTypes physical types of the inputs, one lambda parameter each
 * @return the lambda expression implementing the selector
 */
static Expression joinSelectorCompact(JoinRelType joinType, PhysType physType,
List<PhysType> inputPhysTypes) {
// A parameter for each input.
final List<ParameterExpression> parameters = new ArrayList<>();

// Total number of fields in the output row; this is the size of the output array.
final int outputFieldCount = physType.getRowType().getFieldCount();

final BlockBuilder compactCode = new BlockBuilder();
// Even if the fields are all of the same type, they are always boxed,
// so we use an Object[] that is easier to match with the input arrays.
final ParameterExpression compactOutputVar =
Expressions.variable(Object[].class, "outputArray");
final DeclarationStatement exp =
Expressions.declare(
0, compactOutputVar, new NewArrayExpression(Object.class, 1,
Expressions.constant(outputFieldCount), null));
compactCode.add(exp);

// Index of the next free slot in the output array.
int outputField = 0;
for (Ord<PhysType> ord : Ord.zip(inputPhysTypes)) {
final PhysType inputPhysType =
ord.e.makeNullable(joinType.generatesNullsOn(ord.i));
// If the parameter is an array we declare as Object[] because it
// needs to match the type of the array that will be returned
final Type parameterType = Types.isArray(inputPhysType.getJavaRowType())
? Object[].class
: Primitive.box(inputPhysType.getJavaRowType());

final ParameterExpression parameter =
Expressions.parameter(parameterType, EnumUtils.LEFT_RIGHT.get(ord.i));
parameters.add(parameter);
// The parameter is registered before this check, so the lambda still
// declares it even when none of its fields are copied.
if (outputField == outputFieldCount) {
// For instance, if semi-join needs to return just the left inputs
break;
}
final int fieldCount = inputPhysType.getRowType().getFieldCount();
// Delegate copying the row values to JavaRowFormat
final List<Statement> copyStatements =
Nullness.castNonNull(
inputPhysType.getFormat().copy(parameter, Nullness.castNonNull(compactOutputVar),
outputField, fieldCount));
if (joinType.generatesNullsOn(ord.i)) {
// [CALCITE-6593] NPE when outer joining tables with many fields and unmatching rows
// Guard the copy with "parameter != null"; the slots for this input are
// still skipped below, so they keep the array's default values.
compactCode.add(
Expressions.ifThen(Expressions.notEqual(parameter, Expressions.constant(null)),
Expressions.block(copyStatements)));
} else {
for (Statement copyStatement : copyStatements) {
compactCode.add(copyStatement);
}
}
outputField += fieldCount;
}

// The last statement of the block is the output array, i.e. the lambda's result.
compactCode.add(Nullness.castNonNull(compactOutputVar));
return Expressions.lambda(
Function2.class,
// NOTE(review): the next line looks like the pre-change argument kept by the
// diff rendering; compactCode.toBlock() is presumably the intended lambda
// body here. Confirm against the committed file.
physType.record(expressions),
compactCode.toBlock(),
parameters);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.calcite.linq4j.tree.MemberExpression;
import org.apache.calcite.linq4j.tree.MethodCallExpression;
import org.apache.calcite.linq4j.tree.ParameterExpression;
import org.apache.calcite.linq4j.tree.Statement;
import org.apache.calcite.linq4j.tree.Types;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.runtime.FlatLists;
Expand All @@ -34,9 +35,11 @@
import org.checkerframework.checker.nullness.qual.Nullable;

import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;

import static org.apache.calcite.util.BuiltInMethod.ARRAY_COPY;
import static org.apache.calcite.util.BuiltInMethod.LIST_TO_ARRAY;
import static org.apache.calcite.util.BuiltInMethod.ROW_COPY_VALUES;

/**
Expand Down Expand Up @@ -80,6 +83,24 @@ public enum JavaRowFormat {
return Expressions.field(expression, Types.nthField(field, type));
}
}

@Override public List<Statement> copy(ParameterExpression parameter,
    ParameterExpression outputArray, int outputStartIndex, int length) {
  // The parameter is a POJO, so there is nothing to bulk-copy: emit one
  // assignment per field, reading field i of the object into the matching
  // slot of the output array (fields are numbered from 0):
  //   outputArray[outputStartIndex] = parameter.field{0};
  //   ...
  //   outputArray[outputStartIndex + length - 1] = parameter.field{length - 1};
  final List<Statement> assignments = new ArrayList<>(length);
  for (int offset = 0; offset < length; offset++) {
    final Expression targetSlot =
        Expressions.arrayIndex(outputArray,
            Expressions.constant(outputStartIndex + offset));
    final Expression fieldValue = field(parameter, offset, null, Object.class);
    assignments.add(
        Expressions.statement(Expressions.assign(targetSlot, fieldValue)));
  }
  return assignments;
}
},

SCALAR {
Expand Down Expand Up @@ -111,6 +132,19 @@ public enum JavaRowFormat {
assert field == 0;
return expression;
}

@Override public List<Statement> copy(ParameterExpression parameter,
    ParameterExpression outputArray, int outputStartIndex, int length) {
  // A scalar row is the value itself, so a single assignment is enough:
  //   outputArray[outputStartIndex] = parameter;
  assert length == 1;
  final Expression targetSlot =
      Expressions.arrayIndex(outputArray, Expressions.constant(outputStartIndex));
  final Statement store =
      Expressions.statement(Expressions.assign(targetSlot, parameter));
  return FlatLists.of(store);
}
},

/** A list that is comparable and immutable. Useful for records with 0 fields
Expand Down Expand Up @@ -199,6 +233,18 @@ public enum JavaRowFormat {
}
return EnumUtils.convert(e, fromType, fieldType);
}

@Override public List<Statement> copy(ParameterExpression parameter,
    ParameterExpression outputArray, int outputStartIndex, int length) {
  // The parameter is a List: turn it into an array and bulk-copy from there:
  //   System.arraycopy(parameter.toArray(), 0, outputArray, outputStartIndex, length);
  final MethodCallExpression rowAsArray =
      Expressions.call(parameter, LIST_TO_ARRAY.method);
  final MethodCallExpression arrayCopy =
      Expressions.call(ARRAY_COPY.method,
          rowAsArray,
          Expressions.constant(0),
          outputArray,
          Expressions.constant(outputStartIndex),
          Expressions.constant(length));
  return FlatLists.of(Expressions.statement(arrayCopy));
}
},

/**
Expand Down Expand Up @@ -230,9 +276,17 @@ public enum JavaRowFormat {
return EnumUtils.convert(e, fromType, fieldType);
}

@Override public Expression fieldDynamic(Expression expression, Expression field) {
return Expressions.call(expression,
BuiltInMethod.ROW_VALUE.method, Expressions.constant(field));
@Override public List<Statement> copy(ParameterExpression parameter,
    ParameterExpression outputArray, int outputStartIndex, int length) {
  // The parameter is an org.apache.calcite.interpreter.Row: obtain its values
  // as an Object[] through copyValues() and bulk-copy them:
  //   System.arraycopy(parameter.copyValues(), 0, outputArray, outputStartIndex, length);
  final MethodCallExpression rowValues =
      Expressions.call(Object[].class, parameter, ROW_COPY_VALUES.method);
  final MethodCallExpression arrayCopy =
      Expressions.call(ARRAY_COPY.method,
          rowValues,
          Expressions.constant(0),
          outputArray,
          Expressions.constant(outputStartIndex),
          Expressions.constant(length));
  return FlatLists.of(Expressions.statement(arrayCopy));
}
},

Expand Down Expand Up @@ -266,21 +320,15 @@ public enum JavaRowFormat {
return EnumUtils.convert(e, fromType, fieldType);
}

@Override public Expression fieldDynamic(Expression expression, Expression field) {
return Expressions.arrayIndex(expression, field);
}

@Override public Expression setFieldDynamic(Expression expression, Expression field,
Expression value) {
final IndexExpression e =
Expressions.arrayIndex(expression, Expressions.constant(field));
return Expressions.assign(e, value);
}

@Override public Expression copy(ParameterExpression parameter,
@Override public List<Statement> copy(ParameterExpression parameter,
ParameterExpression outputArray, int outputStartIndex, int length) {
return Expressions.call(ARRAY_COPY.method, parameter, Expressions.constant(0),
outputArray, Expressions.constant(outputStartIndex), Expressions.constant(length));
// Parameter holds an expression representing an Object[]
// Results in:
// System.arraycopy(parameter, 0, outputArray, outputStartIndex, length);
return FlatLists.of(
Expressions.statement(
Expressions.call(ARRAY_COPY.method, parameter, Expressions.constant(0),
outputArray, Expressions.constant(outputStartIndex), Expressions.constant(length))));
}
};

Expand Down Expand Up @@ -328,32 +376,9 @@ public abstract Expression record(
public abstract Expression field(Expression expression, int field,
@Nullable Type fromType, Type fieldType);

/**
* Similar to {@link #field(Expression, int, Type, Type)}, where the field index is determined
* dynamically at runtime.
*/
public Expression fieldDynamic(Expression expression, Expression field) {
throw new UnsupportedOperationException(this.toString());
}

public Expression setFieldDynamic(Expression expression, Expression field, Expression value) {
throw new UnsupportedOperationException(this.toString());
}

/**
* Returns the statements that copy the fields of a row of this type to the array.
*/
public @Nullable Expression copy(ParameterExpression parameter,
ParameterExpression outputArray, int outputStartIndex, int length) {
// Note: parameter holds an expression representing a org.apache.calcite.interpreter.Row.

// Copy the Row as an Object[].
final Expression rowParameterAsArrayExpression =
Expressions.call(Object[].class, parameter, ROW_COPY_VALUES.method);

// Use System.arraycopy() with the contents of the Row as the source.
return Expressions.call(ARRAY_COPY.method, rowParameterAsArrayExpression,
Expressions.constant(0), outputArray, Expressions.constant(outputStartIndex),
Expressions.constant(length));
}
public abstract List<Statement> copy(ParameterExpression parameter,
ParameterExpression outputArray, int outputStartIndex, int length);
}
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,23 @@ public final class CalciteSystemProperty<T> {
public static final CalciteSystemProperty<Integer> FUNCTION_LEVEL_CACHE_MAX_SIZE =
intProperty("calcite.function.cache.maxSize", 1_000, v -> v >= 0);

/**
* Minimum number of fields in a Join result that will trigger the "compact code generation",
* i.e. compact code is generated when the number of output fields is greater than or equal
* to this value.
* This feature reduces the risk of running into a compilation error due to the code of a
* dynamically generated method growing beyond the 64KB limit.
*
* <p>Note that the compact code makes use of arraycopy operations when possible,
* instead of using a static array initialization. For joins with a large number of fields
* the resulting code should be faster, but it can be slower for joins with a very small number
* of fields.
*
* <p>The default value is 100; a negative value completely disables the "compact code" feature.
*
* @see org.apache.calcite.adapter.enumerable.EnumUtils
*/
public static final CalciteSystemProperty<Integer> JOIN_SELECTOR_COMPACT_CODE_THRESHOLD =
intProperty("calcite.join.selector.compact.code.threshold", 100);

private static CalciteSystemProperty<Boolean> booleanProperty(String key,
boolean defaultValue) {
// Note that "" -> true (convenient for command-lines flags like '-Dflag')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,7 @@ public enum BuiltInMethod {
COLLECTION_RETAIN_ALL(Collection.class, "retainAll", Collection.class),
LIST_CONTAINS(List.class, "contains", Object.class),
LIST_GET(List.class, "get", int.class),
LIST_TO_ARRAY(List.class, "toArray"),
ITERATOR_HAS_NEXT(Iterator.class, "hasNext"),
ITERATOR_NEXT(Iterator.class, "next"),
MATH_MAX(Math.class, "max", int.class, int.class),
Expand Down
Loading

0 comments on commit a8802c7

Please sign in to comment.