diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/naive/NaiveOperatorMemoryExecutor.java b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/naive/NaiveOperatorMemoryExecutor.java index 95d1887d0..8a4843971 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/naive/NaiveOperatorMemoryExecutor.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/naive/NaiveOperatorMemoryExecutor.java @@ -35,11 +35,13 @@ import cn.edu.tsinghua.iginx.engine.shared.function.MappingFunction; import cn.edu.tsinghua.iginx.engine.shared.function.RowMappingFunction; import cn.edu.tsinghua.iginx.engine.shared.function.SetMappingFunction; +import cn.edu.tsinghua.iginx.engine.shared.function.system.utils.ValueUtils; import cn.edu.tsinghua.iginx.engine.shared.operator.*; import cn.edu.tsinghua.iginx.engine.shared.operator.filter.Filter; import cn.edu.tsinghua.iginx.engine.shared.operator.filter.FilterType; import cn.edu.tsinghua.iginx.engine.shared.operator.filter.PathFilter; import cn.edu.tsinghua.iginx.engine.shared.operator.type.OuterJoinType; +import cn.edu.tsinghua.iginx.thrift.DataType; import cn.edu.tsinghua.iginx.utils.Bitmap; import cn.edu.tsinghua.iginx.utils.Pair; import cn.edu.tsinghua.iginx.utils.StringUtils; @@ -575,7 +577,8 @@ private RowStream executeNestedLoopInnerJoin(InnerJoin innerJoin, Table tableA, flag: for (Row rowB : rowsB) { for (String joinColumn : joinColumns){ - if (!Objects.equals(rowA.getValue(innerJoin.getPrefixA() + '.' + joinColumn), rowB.getValue(innerJoin.getPrefixB() + '.' + joinColumn))) { + if (ValueUtils.compare(rowA.getAsValue(innerJoin.getPrefixA() + '.' + joinColumn), + rowB.getAsValue(innerJoin.getPrefixB() + '.' + joinColumn)) != 0) { continue flag; } } @@ -630,19 +633,33 @@ private RowStream executeHashInnerJoin(InnerJoin innerJoin, Table tableA, Table } } + boolean needTypeCast = false; List rowsA = tableA.getRows(); List rowsB = tableB.getRows(); + if (!rowsA.isEmpty() && !rowsB.isEmpty()) { + Value valueA = rowsA.get(0).getAsValue(innerJoin.getPrefixA() + '.' + joinColumnA); + Value valueB = rowsB.get(0).getAsValue(innerJoin.getPrefixB() + '.' + joinColumnB); + if (valueA.getDataType() != valueB.getDataType()) { + if (ValueUtils.isNumericType(valueA) && ValueUtils.isNumericType(valueB)) { + needTypeCast = true; + } + } + } + HashMap> rowsBHashMap = new HashMap<>(); for (Row rowB: rowsB) { - Object value = rowB.getValue(innerJoin.getPrefixB() + '.' + joinColumnB); + Value value = rowB.getAsValue(innerJoin.getPrefixB() + '.' + joinColumnB); if (value == null) { continue; } + if (needTypeCast) { + value = ValueUtils.transformToDouble(value); + } int hash; - if (value instanceof byte[]) { - hash = Arrays.hashCode((byte[]) value); + if (value.getDataType() == DataType.BINARY) { + hash = Arrays.hashCode(value.getBinaryV()); } else { - hash = value.hashCode(); + hash = value.getValue().hashCode(); } List l = rowsBHashMap.containsKey(hash) ? rowsBHashMap.get(hash) : new ArrayList<>(); l.add(rowB); @@ -653,15 +670,18 @@ private RowStream executeHashInnerJoin(InnerJoin innerJoin, Table tableA, Table if (filter != null) { // Join condition: on newHeader = RowUtils.constructNewHead(headerA, headerB, innerJoin.getPrefixA(), innerJoin.getPrefixB()); for (Row rowA : rowsA) { - Object value = rowA.getValue(innerJoin.getPrefixA() + '.' + joinColumnA); + Value value = rowA.getAsValue(innerJoin.getPrefixA() + '.' + joinColumnA); if (value == null) { continue; } + if (needTypeCast) { + value = ValueUtils.transformToDouble(value); + } int hash; - if (value instanceof byte[]) { - hash = Arrays.hashCode((byte[]) value); + if (value.getDataType() == DataType.BINARY) { + hash = Arrays.hashCode(value.getBinaryV()); } else { - hash = value.hashCode(); + hash = value.getValue().hashCode(); } if (rowsBHashMap.containsKey(hash)) { @@ -678,16 +698,20 @@ private RowStream executeHashInnerJoin(InnerJoin innerJoin, Table tableA, Table newHeader = RowUtils.constructNewHead(headerA, headerB, innerJoin.getPrefixA(), innerJoin.getPrefixB(), Collections.singletonList(joinColumnB), true).getV(); int index = headerB.indexOf(innerJoin.getPrefixB() + '.' + joinColumnB); for (Row rowA : rowsA) { - Object value = rowA.getValue(innerJoin.getPrefixA() + '.' + joinColumnA); + Value value = rowA.getAsValue(innerJoin.getPrefixA() + '.' + joinColumnA); if (value == null) { continue; } + if (needTypeCast) { + value = ValueUtils.transformToDouble(value); + } int hash; - if (value instanceof byte[]) { - hash = Arrays.hashCode((byte[]) value); + if (value.getDataType() == DataType.BINARY) { + hash = Arrays.hashCode(value.getBinaryV()); } else { - hash = value.hashCode(); + hash = value.getValue().hashCode(); } + if (rowsBHashMap.containsKey(hash)) { List hashRowsB = rowsBHashMap.get(hash); for (Row rowB : hashRowsB) { @@ -702,10 +726,6 @@ private RowStream executeHashInnerJoin(InnerJoin innerJoin, Table tableA, Table private RowStream executeSortedMergeInnerJoin(InnerJoin innerJoin, Table tableA, Table tableB) throws PhysicalException { Filter filter = innerJoin.getFilter(); - - List fieldsA = new ArrayList<>(tableA.getHeader().getFields()); - List fieldsB = new ArrayList<>(tableB.getHeader().getFields()); - List joinColumns = new ArrayList<>(innerJoin.getJoinColumns()); if (innerJoin.isNaturalJoin()) { RowUtils.fillNaturalJoinColumns(joinColumns, tableA.getHeader(), tableB.getHeader(), innerJoin.getPrefixA(), innerJoin.getPrefixB()); @@ -718,17 +738,22 @@ private RowStream executeSortedMergeInnerJoin(InnerJoin innerJoin, Table tableA, List rowsA = tableA.getRows(); List rowsB = tableB.getRows(); + List joinColumnsA = new ArrayList<>(joinColumns); + List joinColumnsB = new ArrayList<>(joinColumns); if (filter != null) { - joinColumns = new ArrayList<>(); + joinColumnsA = new ArrayList<>(); + joinColumnsB = new ArrayList<>(); List> pairs = FilterUtils.getJoinColumnsFromFilter(filter); if (pairs.isEmpty()) { throw new InvalidOperatorParameterException("on condition in join operator has no join columns."); } for(Pair p : pairs) { if (headerA.indexOf(p.k) != -1 && headerB.indexOf(p.v) != -1) { - joinColumns.add(p.k.replaceFirst(innerJoin.getPrefixA() + '.', "")); + joinColumnsA.add(p.k.replaceFirst(innerJoin.getPrefixA() + '.', "")); + joinColumnsB.add(p.v.replaceFirst(innerJoin.getPrefixB() + '.', "")); } else if (headerA.indexOf(p.v) != -1 && headerB.indexOf(p.k) != -1) { - joinColumns.add(p.v.replaceFirst(innerJoin.getPrefixA() + '.', "")); + joinColumnsA.add(p.v.replaceFirst(innerJoin.getPrefixA() + '.', "")); + joinColumnsB.add(p.k.replaceFirst(innerJoin.getPrefixB() + '.', "")); } else { throw new InvalidOperatorParameterException("invalid join path filter input."); } @@ -736,8 +761,8 @@ private RowStream executeSortedMergeInnerJoin(InnerJoin innerJoin, Table tableA, } boolean isAscendingSorted; - int flagA = RowUtils.checkRowsSortedByColumns(rowsA, innerJoin.getPrefixA(), joinColumns); - int flagB = RowUtils.checkRowsSortedByColumns(rowsB, innerJoin.getPrefixB(), joinColumns); + int flagA = RowUtils.checkRowsSortedByColumns(rowsA, innerJoin.getPrefixA(), joinColumnsA); + int flagB = RowUtils.checkRowsSortedByColumns(rowsB, innerJoin.getPrefixB(), joinColumnsB); if (flagA == -1 || flagB == -1) { throw new InvalidOperatorParameterException("input rows in merge join haven't be sorted."); } else if (flagA + flagB == 3) { @@ -759,22 +784,16 @@ private RowStream executeSortedMergeInnerJoin(InnerJoin innerJoin, Table tableA, Header newHeader; List transformedRows = new ArrayList<>(); if (filter != null) { - fieldsA.addAll(fieldsB); - newHeader = new Header(fieldsA); + newHeader = RowUtils.constructNewHead(headerA, headerB, innerJoin.getPrefixA(), innerJoin.getPrefixB()); int indexA = 0; int indexB = 0; int startIndexOfContinuousEqualValuesB = 0; while (indexA < rowsA.size() && indexB < rowsB.size()) { - int flagAEqualB = RowUtils.compareRowsSortedByColumns(rowsA.get(indexA), rowsB.get(indexB), innerJoin.getPrefixA(), innerJoin.getPrefixB(), joinColumns); + int flagAEqualB = RowUtils.compareRowsSortedByColumns(rowsA.get(indexA), rowsB.get(indexB), innerJoin.getPrefixA(), innerJoin.getPrefixB(), joinColumnsA, joinColumnsB); if (flagAEqualB == 0) { - Object[] valuesA = rowsA.get(indexA).getValues(); - Object[] valuesB = rowsB.get(indexB).getValues(); - Object[] valuesJoin = new Object[valuesA.length + valuesB.length]; - System.arraycopy(valuesA, 0, valuesJoin, 0, valuesA.length); - System.arraycopy(valuesB, 0, valuesJoin, valuesA.length, valuesB.length); - Row transformedRow = new Row(newHeader, valuesJoin); - if (FilterUtils.validate(filter, transformedRow)) { - transformedRows.add(transformedRow); + Row joinedRow = RowUtils.constructNewRow(newHeader, rowsA.get(indexA), rowsB.get(indexB)); + if (FilterUtils.validate(filter, joinedRow)) { + transformedRows.add(joinedRow); } if (indexA + 1 == rowsA.size()) { @@ -788,8 +807,8 @@ private RowStream executeSortedMergeInnerJoin(InnerJoin innerJoin, Table tableA, indexA++; indexB = startIndexOfContinuousEqualValuesB; } else { - int flagAEqualNextA = RowUtils.compareRowsSortedByColumns(rowsA.get(indexA), rowsA.get(indexA + 1), innerJoin.getPrefixA(), innerJoin.getPrefixA(), joinColumns); - int flagBEqualNextB = RowUtils.compareRowsSortedByColumns(rowsB.get(indexB), rowsB.get(indexB + 1), innerJoin.getPrefixB(), innerJoin.getPrefixB(), joinColumns); + int flagAEqualNextA = RowUtils.compareRowsSortedByColumns(rowsA.get(indexA), rowsA.get(indexA + 1), innerJoin.getPrefixA(), innerJoin.getPrefixA(), joinColumnsA, joinColumnsA); + int flagBEqualNextB = RowUtils.compareRowsSortedByColumns(rowsB.get(indexB), rowsB.get(indexB + 1), innerJoin.getPrefixB(), innerJoin.getPrefixB(), joinColumnsB, joinColumnsB); if (flagBEqualNextB == 0) { indexB++; } else { @@ -811,52 +830,42 @@ private RowStream executeSortedMergeInnerJoin(InnerJoin innerJoin, Table tableA, } } } else { // Join condition: natural or using - int[] indexOfJoinColumnInTableB = new int[joinColumns.size()]; - int i = 0; - flag1: - for (Field fieldB : fieldsB) { - for (String joinColumn : joinColumns) { - if (Objects.equals(fieldB.getName(), innerJoin.getPrefixB() + '.' + joinColumn)) { - indexOfJoinColumnInTableB[i++] = headerB.indexOf(fieldB); - continue flag1; - } - } - fieldsA.add(fieldB); - } - newHeader = new Header(fieldsA); + Pair pair = RowUtils.constructNewHead(headerA, headerB, innerJoin.getPrefixA(), innerJoin.getPrefixB(), joinColumns, true); + int[] indexOfJoinColumnInTableB = pair.getK(); + newHeader = pair.getV(); int indexA = 0; int indexB = 0; int startIndexOfContinuousEqualValuesB = 0; while (indexA < rowsA.size() && indexB < rowsB.size()) { - int flagAEqualB = RowUtils.compareRowsSortedByColumns(rowsA.get(indexA), rowsB.get(indexB), innerJoin.getPrefixA(), innerJoin.getPrefixB(), joinColumns); + int flagAEqualB = RowUtils.compareRowsSortedByColumns(rowsA.get(indexA), rowsB.get(indexB), innerJoin.getPrefixA(), innerJoin.getPrefixB(), joinColumnsA, joinColumnsB); if (flagAEqualB == 0) { - Object[] valuesA = rowsA.get(indexA).getValues(); - Object[] valuesB = rowsB.get(indexB).getValues(); - Object[] valuesJoin = new Object[valuesA.length + valuesB.length]; - System.arraycopy(valuesA, 0, valuesJoin, 0, valuesA.length); - int k = valuesA.length; - flag3: - for (int j = 0; j < valuesB.length; j++) { - for (int index : indexOfJoinColumnInTableB) { - if (j == index) { - continue flag3; - } + Row joinedRow = RowUtils.constructNewRow(newHeader, rowsA.get(indexA), rowsB.get(indexB), indexOfJoinColumnInTableB, true); + transformedRows.add(joinedRow); + + if (indexA + 1 == rowsA.size()) { + if (indexB + 1 == rowsB.size()) { + break; + } else { + indexB++; } - valuesJoin[k++] = valuesB[j]; - } - transformedRows.add(new Row(newHeader, valuesJoin)); - - int flagAEqualNextA = RowUtils.compareRowsSortedByColumns(rowsA.get(indexA), rowsA.get(indexA + 1), innerJoin.getPrefixA(), innerJoin.getPrefixA(), joinColumns); - int flagBEqualNextB = RowUtils.compareRowsSortedByColumns(rowsB.get(indexB), rowsB.get(indexB + 1), innerJoin.getPrefixB(), innerJoin.getPrefixB(), joinColumns); - if (flagBEqualNextB == 0) { - indexB++; } else { - indexA++; - if (flagAEqualNextA != 0) { - indexB++; - startIndexOfContinuousEqualValuesB = indexB; - } else { + if (indexB + 1 == rowsB.size()) { + indexA++; indexB = startIndexOfContinuousEqualValuesB; + } else { + int flagAEqualNextA = RowUtils.compareRowsSortedByColumns(rowsA.get(indexA), rowsA.get(indexA + 1), innerJoin.getPrefixA(), innerJoin.getPrefixA(), joinColumnsA, joinColumnsA); + int flagBEqualNextB = RowUtils.compareRowsSortedByColumns(rowsB.get(indexB), rowsB.get(indexB + 1), innerJoin.getPrefixB(), innerJoin.getPrefixB(), joinColumnsB, joinColumnsB); + if (flagBEqualNextB == 0) { + indexB++; + } else { + indexA++; + if (flagAEqualNextA != 0) { + indexB++; + startIndexOfContinuousEqualValuesB = indexB; + } else { + indexB = startIndexOfContinuousEqualValuesB; + } + } } } } else if (flagAEqualB == -1) { @@ -946,7 +955,8 @@ private RowStream executeNestedLoopOuterJoin(OuterJoin outerJoin, Table tableA, for (int indexB = 0; indexB < rowsB.size(); indexB++) { Row rowB = rowsB.get(indexB); for (String joinColumn : joinColumns){ - if (!Objects.equals(rowA.getValue(outerJoin.getPrefixA() + '.' + joinColumn), rowB.getValue(outerJoin.getPrefixB() + '.' + joinColumn))) { + if (ValueUtils.compare(rowA.getAsValue(outerJoin.getPrefixA() + '.' + joinColumn), + rowB.getAsValue(outerJoin.getPrefixB() + '.' + joinColumn)) != 0) { continue flag; } } @@ -1039,9 +1049,19 @@ private RowStream executeHashOuterJoin(OuterJoin outerJoin, Table tableA, Table throw new InvalidOperatorParameterException("invalid hash join column input."); } } - + + boolean needTypeCast = false; List rowsA = tableA.getRows(); List rowsB = tableB.getRows(); + if (!rowsA.isEmpty() && !rowsB.isEmpty()) { + Value valueA = rowsA.get(0).getAsValue(outerJoin.getPrefixA() + '.' + joinColumnA); + Value valueB = rowsB.get(0).getAsValue(outerJoin.getPrefixB() + '.' + joinColumnB); + if (valueA.getDataType() != valueB.getDataType()) { + if (ValueUtils.isNumericType(valueA) && ValueUtils.isNumericType(valueB)) { + needTypeCast = true; + } + } + } Bitmap bitmapA = new Bitmap(rowsA.size()); Bitmap bitmapB = new Bitmap(rowsB.size()); @@ -1049,15 +1069,18 @@ private RowStream executeHashOuterJoin(OuterJoin outerJoin, Table tableA, Table HashMap> rowsBHashMap = new HashMap<>(); HashMap> indexOfRowBHashMap = new HashMap<>(); for (int indexB = 0; indexB < rowsB.size(); indexB++) { - Object value = rowsB.get(indexB).getValue(outerJoin.getPrefixB() + '.' + joinColumnB); + Value value = rowsB.get(indexB).getAsValue(outerJoin.getPrefixB() + '.' + joinColumnB); if (value == null) { continue; } + if (needTypeCast) { + value = ValueUtils.transformToDouble(value); + } int hash; - if (value instanceof byte[]) { - hash = Arrays.hashCode((byte[]) value); + if (value.getDataType() == DataType.BINARY) { + hash = Arrays.hashCode(value.getBinaryV()); } else { - hash = value.hashCode(); + hash = value.getValue().hashCode(); } List l = rowsBHashMap.containsKey(hash) ? rowsBHashMap.get(hash) : new ArrayList<>(); List il = rowsBHashMap.containsKey(hash) ? indexOfRowBHashMap.get(hash) : new ArrayList<>(); @@ -1073,16 +1096,20 @@ private RowStream executeHashOuterJoin(OuterJoin outerJoin, Table tableA, Table newHeader = RowUtils.constructNewHead(headerA, headerB, outerJoin.getPrefixA(), outerJoin.getPrefixB()); for (int indexA = 0; indexA < rowsA.size(); indexA++) { Row rowA = rowsA.get(indexA); - Object value = rowA.getValue(outerJoin.getPrefixA() + '.' + joinColumnA); + Value value = rowA.getAsValue(outerJoin.getPrefixA() + '.' + joinColumnA); if (value == null) { continue; } + if (needTypeCast) { + value = ValueUtils.transformToDouble(value); + } int hash; - if (value instanceof byte[]) { - hash = Arrays.hashCode((byte[]) value); + if (value.getDataType() == DataType.BINARY) { + hash = Arrays.hashCode(value.getBinaryV()); } else { - hash = value.hashCode(); + hash = value.getValue().hashCode(); } + if (rowsBHashMap.containsKey(hash)) { List hashRowsB = rowsBHashMap.get(hash); List hashIndexB = indexOfRowBHashMap.get(hash); @@ -1113,16 +1140,20 @@ private RowStream executeHashOuterJoin(OuterJoin outerJoin, Table tableA, Table for (int indexA = 0; indexA < rowsA.size(); indexA++) { Row rowA = rowsA.get(indexA); - Object value = rowA.getValue(outerJoin.getPrefixA() + '.' + joinColumnA); + Value value = rowA.getAsValue(outerJoin.getPrefixA() + '.' + joinColumnA); if (value == null) { continue; } + if (needTypeCast) { + value = ValueUtils.transformToDouble(value); + } int hash; - if (value instanceof byte[]) { - hash = Arrays.hashCode((byte[]) value); + if (value.getDataType() == DataType.BINARY) { + hash = Arrays.hashCode(value.getBinaryV()); } else { - hash = value.hashCode(); + hash = value.getValue().hashCode(); } + if (rowsBHashMap.containsKey(hash)) { List hashRowsB = rowsBHashMap.get(hash); List hashIndexB = indexOfRowBHashMap.get(hash); @@ -1205,18 +1236,22 @@ private RowStream executeSortedMergeOuterJoin(OuterJoin outerJoin, Table tableA, List rowsA = tableA.getRows(); List rowsB = tableB.getRows(); - + List joinColumnsA = new ArrayList<>(joinColumns); + List joinColumnsB = new ArrayList<>(joinColumns); if (filter != null) { - joinColumns = new ArrayList<>(); + joinColumnsA = new ArrayList<>(); + joinColumnsB = new ArrayList<>(); List> pairs = FilterUtils.getJoinColumnsFromFilter(filter); if (pairs.isEmpty()) { throw new InvalidOperatorParameterException("on condition in join operator has no join columns."); } for(Pair p : pairs) { if (headerA.indexOf(p.k) != -1 && headerB.indexOf(p.v) != -1) { - joinColumns.add(p.k.replaceFirst(outerJoin.getPrefixA() + '.', "")); + joinColumnsA.add(p.k.replaceFirst(outerJoin.getPrefixA() + '.', "")); + joinColumnsB.add(p.v.replaceFirst(outerJoin.getPrefixB() + '.', "")); } else if (headerA.indexOf(p.v) != -1 && headerB.indexOf(p.k) != -1) { - joinColumns.add(p.v.replaceFirst(outerJoin.getPrefixA() + '.', "")); + joinColumnsA.add(p.v.replaceFirst(outerJoin.getPrefixA() + '.', "")); + joinColumnsB.add(p.k.replaceFirst(outerJoin.getPrefixB() + '.', "")); } else { throw new InvalidOperatorParameterException("invalid join path filter input."); } @@ -1224,8 +1259,8 @@ private RowStream executeSortedMergeOuterJoin(OuterJoin outerJoin, Table tableA, } boolean isAscendingSorted; - int flagA = RowUtils.checkRowsSortedByColumns(rowsA, outerJoin.getPrefixA(), joinColumns); - int flagB = RowUtils.checkRowsSortedByColumns(rowsB, outerJoin.getPrefixB(), joinColumns); + int flagA = RowUtils.checkRowsSortedByColumns(rowsA, outerJoin.getPrefixA(), joinColumnsA); + int flagB = RowUtils.checkRowsSortedByColumns(rowsB, outerJoin.getPrefixB(), joinColumnsB); if (flagA == -1 || flagB == -1) { throw new InvalidOperatorParameterException("input rows in merge join haven't be sorted."); } else if (flagA + flagB == 3) { @@ -1250,28 +1285,22 @@ private RowStream executeSortedMergeOuterJoin(OuterJoin outerJoin, Table tableA, Header newHeader; List transformedRows = new ArrayList<>(); if (filter != null) { - fieldsA.addAll(fieldsB); - newHeader = new Header(fieldsA); + newHeader = RowUtils.constructNewHead(headerA, headerB, outerJoin.getPrefixA(), outerJoin.getPrefixB()); int indexA = 0; int indexB = 0; int startIndexOfContinuousEqualValuesB = 0; while (indexA < rowsA.size() && indexB < rowsB.size()) { - int flagAEqualB = RowUtils.compareRowsSortedByColumns(rowsA.get(indexA), rowsB.get(indexB), outerJoin.getPrefixA(), outerJoin.getPrefixB(), joinColumns); + int flagAEqualB = RowUtils.compareRowsSortedByColumns(rowsA.get(indexA), rowsB.get(indexB), outerJoin.getPrefixA(), outerJoin.getPrefixB(), joinColumnsA, joinColumnsB); if (flagAEqualB == 0) { - Object[] valuesA = rowsA.get(indexA).getValues(); - Object[] valuesB = rowsB.get(indexB).getValues(); - Object[] valuesJoin = new Object[valuesA.length + valuesB.length]; - System.arraycopy(valuesA, 0, valuesJoin, 0, valuesA.length); - System.arraycopy(valuesB, 0, valuesJoin, valuesA.length, valuesB.length); - Row transformedRow = new Row(newHeader, valuesJoin); - if (FilterUtils.validate(filter, transformedRow)) { + Row joinedRow = RowUtils.constructNewRow(newHeader, rowsA.get(indexA), rowsB.get(indexB)); + if (FilterUtils.validate(filter, joinedRow)) { if (!bitmapA.get(indexA)) { bitmapA.mark(indexA); } if (!bitmapB.get(indexB)) { bitmapB.mark(indexB); } - transformedRows.add(transformedRow); + transformedRows.add(joinedRow); } if (indexA + 1 == rowsA.size()) { @@ -1285,8 +1314,8 @@ private RowStream executeSortedMergeOuterJoin(OuterJoin outerJoin, Table tableA, indexA++; indexB = startIndexOfContinuousEqualValuesB; } else { - int flagAEqualNextA = RowUtils.compareRowsSortedByColumns(rowsA.get(indexA), rowsA.get(indexA + 1), outerJoin.getPrefixA(), outerJoin.getPrefixA(), joinColumns); - int flagBEqualNextB = RowUtils.compareRowsSortedByColumns(rowsB.get(indexB), rowsB.get(indexB + 1), outerJoin.getPrefixB(), outerJoin.getPrefixB(), joinColumns); + int flagAEqualNextA = RowUtils.compareRowsSortedByColumns(rowsA.get(indexA), rowsA.get(indexA + 1), outerJoin.getPrefixA(), outerJoin.getPrefixA(), joinColumnsA, joinColumnsA); + int flagBEqualNextB = RowUtils.compareRowsSortedByColumns(rowsB.get(indexB), rowsB.get(indexB + 1), outerJoin.getPrefixB(), outerJoin.getPrefixB(), joinColumnsB, joinColumnsB); if (flagBEqualNextB == 0) { indexB++; } else { @@ -1308,72 +1337,25 @@ private RowStream executeSortedMergeOuterJoin(OuterJoin outerJoin, Table tableA, } } } else { // Join condition: natural or using - List newFields = new ArrayList<>(); - int[] indexOfJoinColumnInTableB = new int[joinColumns.size()]; - int[] indexOfJoinColumnInTableA = new int[joinColumns.size()]; - int i = 0; - int j = 0; + Pair pair; if (outerType == OuterJoinType.RIGHT) { - flag1: - for (Field fieldA : fieldsA) { - for (String joinColumn : joinColumns) { - if (Objects.equals(fieldA.getName(), outerJoin.getPrefixA() + '.' + joinColumn)) { - indexOfJoinColumnInTableA[j++] = headerA.indexOf(fieldA); - continue flag1; - } - } - newFields.add(fieldA); - } - newFields.addAll(fieldsB); + pair = RowUtils.constructNewHead(headerA, headerB, outerJoin.getPrefixA(), outerJoin.getPrefixB(), joinColumns, false); } else { - newFields.addAll(fieldsA); - flag2: - for (Field fieldB : fieldsB) { - for (String joinColumn : joinColumns) { - if (Objects.equals(fieldB.getName(), outerJoin.getPrefixB() + '.' + joinColumn)) { - indexOfJoinColumnInTableB[i++] = headerB.indexOf(fieldB); - continue flag2; - } - } - newFields.add(fieldB); - } + pair = RowUtils.constructNewHead(headerA, headerB, outerJoin.getPrefixA(), outerJoin.getPrefixB(), joinColumns, true); } - newHeader = new Header(newFields); + int[] indexOfJoinColumnInTable = pair.getK(); + newHeader = pair.getV(); int indexA = 0; int indexB = 0; int startIndexOfContinuousEqualValuesB = 0; while (indexA < rowsA.size() && indexB < rowsB.size()) { - int flagAEqualB = RowUtils.compareRowsSortedByColumns(rowsA.get(indexA), rowsB.get(indexB), outerJoin.getPrefixA(), outerJoin.getPrefixB(), joinColumns); + int flagAEqualB = RowUtils.compareRowsSortedByColumns(rowsA.get(indexA), rowsB.get(indexB), outerJoin.getPrefixA(), outerJoin.getPrefixB(), joinColumnsA, joinColumnsB); if (flagAEqualB == 0) { - Object[] valuesA = rowsA.get(indexA).getValues(); - Object[] valuesB = rowsB.get(indexB).getValues(); - Object[] valuesJoin; + Row joinedRow; if (outerType == OuterJoinType.RIGHT) { - valuesJoin = new Object[valuesA.length + valuesB.length - indexOfJoinColumnInTableA.length]; - System.arraycopy(valuesB, 0, valuesJoin, valuesA.length - indexOfJoinColumnInTableA.length, valuesB.length); - int k = 0; - flag3: - for (int m = 0; m < valuesA.length; m++) { - for (int index : indexOfJoinColumnInTableA) { - if (m == index) { - continue flag3; - } - } - valuesJoin[k++] = valuesA[m]; - } + joinedRow = RowUtils.constructNewRow(newHeader, rowsA.get(indexA), rowsB.get(indexB), indexOfJoinColumnInTable, false); } else { - valuesJoin = new Object[valuesA.length + valuesB.length - indexOfJoinColumnInTableB.length]; - System.arraycopy(valuesA, 0, valuesJoin, 0, valuesA.length); - int k = valuesA.length; - flag4: - for (int m = 0; m < valuesB.length; m++) { - for (int index : indexOfJoinColumnInTableB) { - if (m == index) { - continue flag4; - } - } - valuesJoin[k++] = valuesB[m]; - } + joinedRow = RowUtils.constructNewRow(newHeader, rowsA.get(indexA), rowsB.get(indexB), indexOfJoinColumnInTable, true); } if (!bitmapA.get(indexA)) { @@ -1382,10 +1364,10 @@ private RowStream executeSortedMergeOuterJoin(OuterJoin outerJoin, Table tableA, if (!bitmapB.get(indexB)) { bitmapB.mark(indexB); } - transformedRows.add(new Row(newHeader, valuesJoin)); + transformedRows.add(joinedRow); - int flagAEqualNextA = RowUtils.compareRowsSortedByColumns(rowsA.get(indexA), rowsA.get(indexA + 1), outerJoin.getPrefixA(), outerJoin.getPrefixA(), joinColumns); - int flagBEqualNextB = RowUtils.compareRowsSortedByColumns(rowsB.get(indexB), rowsB.get(indexB + 1), outerJoin.getPrefixB(), outerJoin.getPrefixB(), joinColumns); + int flagAEqualNextA = RowUtils.compareRowsSortedByColumns(rowsA.get(indexA), rowsA.get(indexA + 1), outerJoin.getPrefixA(), outerJoin.getPrefixA(), joinColumnsA, joinColumnsA); + int flagBEqualNextB = RowUtils.compareRowsSortedByColumns(rowsB.get(indexB), rowsB.get(indexB + 1), outerJoin.getPrefixB(), outerJoin.getPrefixB(), joinColumnsB, joinColumnsB); if (flagBEqualNextB == 0) { indexB++; } else { diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/stream/HashInnerJoinLazyStream.java b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/stream/HashInnerJoinLazyStream.java index 506a435cf..6a742d5aa 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/stream/HashInnerJoinLazyStream.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/stream/HashInnerJoinLazyStream.java @@ -4,7 +4,7 @@ import cn.edu.tsinghua.iginx.engine.physical.exception.PhysicalException; import cn.edu.tsinghua.iginx.engine.physical.memory.execute.utils.FilterUtils; import cn.edu.tsinghua.iginx.engine.physical.memory.execute.utils.RowUtils; -import cn.edu.tsinghua.iginx.engine.shared.data.read.Field; +import cn.edu.tsinghua.iginx.engine.shared.data.Value; import cn.edu.tsinghua.iginx.engine.shared.data.read.Header; import cn.edu.tsinghua.iginx.engine.shared.data.read.Row; import cn.edu.tsinghua.iginx.engine.shared.data.read.RowStream; @@ -12,7 +12,9 @@ import cn.edu.tsinghua.iginx.engine.shared.operator.filter.Filter; import cn.edu.tsinghua.iginx.engine.shared.operator.filter.FilterType; import cn.edu.tsinghua.iginx.engine.shared.operator.filter.PathFilter; +import cn.edu.tsinghua.iginx.thrift.DataType; import cn.edu.tsinghua.iginx.utils.Pair; +import cn.edu.tsinghua.iginx.engine.shared.function.system.utils.ValueUtils; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -38,6 +40,8 @@ public class HashInnerJoinLazyStream extends BinaryLazyStream { private String joinColumnA; private String joinColumnB; + + private boolean needTypeCast = false; public HashInnerJoinLazyStream(InnerJoin innerJoin, RowStream streamA, RowStream streamB) { super(streamA, streamB); @@ -89,18 +93,28 @@ private void initialize() throws PhysicalException { } } this.index = headerB.indexOf(innerJoin.getPrefixB() + '.' + joinColumnB); + + int indexA = headerA.indexOf(innerJoin.getPrefixA() + '.' + joinColumnA); + DataType dataTypeA = headerA.getField(indexA).getType(); + DataType dataTypeB = headerB.getField(index).getType(); + if (ValueUtils.isNumericType(dataTypeA) && ValueUtils.isNumericType(dataTypeB)) { + this.needTypeCast = true; + } while (streamB.hasNext()) { Row rowB = streamB.next(); - Object value = rowB.getValue(innerJoin.getPrefixB() + '.' + joinColumnB); + Value value = rowB.getAsValue(innerJoin.getPrefixB() + '.' + joinColumnB); if (value == null) { continue; } + if (needTypeCast) { + value = ValueUtils.transformToDouble(value); + } int hash; - if (value instanceof byte[]) { - hash = Arrays.hashCode((byte[]) value); + if (value.getDataType() == DataType.BINARY) { + hash = Arrays.hashCode(value.getBinaryV()); } else { - hash = value.hashCode(); + hash = value.getValue().hashCode(); } List rows = streamBHashMap.getOrDefault(hash, new ArrayList<>()); rows.add(rowB); @@ -139,16 +153,18 @@ public boolean hasNext() throws PhysicalException { private void tryMatch() throws PhysicalException { Row rowA = streamA.next(); - Object value = rowA.getValue(innerJoin.getPrefixA() + '.' + joinColumnA); + Value value = rowA.getAsValue(innerJoin.getPrefixA() + '.' + joinColumnA); if (value == null) { return; } - + if (needTypeCast) { + value = ValueUtils.transformToDouble(value); + } int hash; - if (value instanceof byte[]) { - hash = Arrays.hashCode((byte[]) value); + if (value.getDataType() == DataType.BINARY) { + hash = Arrays.hashCode(value.getBinaryV()); } else { - hash = value.hashCode(); + hash = value.getValue().hashCode(); } if (streamBHashMap.containsKey(hash)) { diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/stream/HashOuterJoinLazyStream.java b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/stream/HashOuterJoinLazyStream.java index cea28213e..6f999343f 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/stream/HashOuterJoinLazyStream.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/stream/HashOuterJoinLazyStream.java @@ -4,14 +4,17 @@ import cn.edu.tsinghua.iginx.engine.physical.exception.PhysicalException; import cn.edu.tsinghua.iginx.engine.physical.memory.execute.utils.FilterUtils; import cn.edu.tsinghua.iginx.engine.physical.memory.execute.utils.RowUtils; +import cn.edu.tsinghua.iginx.engine.shared.data.Value; import cn.edu.tsinghua.iginx.engine.shared.data.read.Header; import cn.edu.tsinghua.iginx.engine.shared.data.read.Row; import cn.edu.tsinghua.iginx.engine.shared.data.read.RowStream; +import cn.edu.tsinghua.iginx.engine.shared.function.system.utils.ValueUtils; import cn.edu.tsinghua.iginx.engine.shared.operator.OuterJoin; import cn.edu.tsinghua.iginx.engine.shared.operator.filter.Filter; import cn.edu.tsinghua.iginx.engine.shared.operator.filter.FilterType; import cn.edu.tsinghua.iginx.engine.shared.operator.filter.PathFilter; import cn.edu.tsinghua.iginx.engine.shared.operator.type.OuterJoinType; +import cn.edu.tsinghua.iginx.thrift.DataType; import cn.edu.tsinghua.iginx.utils.Pair; import java.util.ArrayList; import java.util.Arrays; @@ -48,6 +51,8 @@ public class HashOuterJoinLazyStream extends BinaryLazyStream { private String joinColumnA; private String joinColumnB; + + private boolean needTypeCast = false; public HashOuterJoinLazyStream(OuterJoin outerJoin, RowStream streamA, RowStream streamB) { super(streamA, streamB); @@ -103,24 +108,35 @@ private void initialize() throws PhysicalException { } } + int indexAnother; if (outerJoinType == OuterJoinType.RIGHT) { this.index = headerA.indexOf(outerJoin.getPrefixA() + '.' + joinColumnA); + indexAnother = headerB.indexOf(outerJoin.getPrefixB() + '.' + joinColumnB); } else { this.index = headerB.indexOf(outerJoin.getPrefixB() + '.' + joinColumnB); + indexAnother = headerA.indexOf(outerJoin.getPrefixA() + '.' + joinColumnA); } + DataType dataType1 = headerA.getField(indexAnother).getType(); + DataType dataType2 = headerB.getField(index).getType(); + if (ValueUtils.isNumericType(dataType1) && ValueUtils.isNumericType(dataType2)) { + this.needTypeCast = true; + } while (streamB.hasNext()) { Row rowB = streamB.next(); - Object value = rowB.getValue(outerJoin.getPrefixB() + '.' + joinColumnB); + Value value = rowB.getAsValue(outerJoin.getPrefixB() + '.' + joinColumnB); if (value == null) { continue; } + if (needTypeCast) { + value = ValueUtils.transformToDouble(value); + } int hash; - if (value instanceof byte[]) { - hash = Arrays.hashCode((byte[]) value); + if (value.getDataType() == DataType.BINARY) { + hash = Arrays.hashCode(value.getBinaryV()); } else { - hash = value.hashCode(); + hash = value.getValue().hashCode(); } List rows = streamBHashMap.getOrDefault(hash, new ArrayList<>()); rows.add(rowB); @@ -203,16 +219,18 @@ public boolean hasNext() throws PhysicalException { private void tryMatch() throws PhysicalException { Row rowA = streamA.next(); - Object value = rowA.getValue(outerJoin.getPrefixA() + '.' + joinColumnA); + Value value = rowA.getAsValue(outerJoin.getPrefixA() + '.' + joinColumnA); if (value == null) { return; } - + if (needTypeCast) { + value = ValueUtils.transformToDouble(value); + } int hash; - if (value instanceof byte[]) { - hash = Arrays.hashCode((byte[]) value); + if (value.getDataType() == DataType.BINARY) { + hash = Arrays.hashCode(value.getBinaryV()); } else { - hash = value.hashCode(); + hash = value.getValue().hashCode(); } if (streamBHashMap.containsKey(hash)) { diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/stream/NestedLoopInnerJoinLazyStream.java b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/stream/NestedLoopInnerJoinLazyStream.java index 79458a679..00103739a 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/stream/NestedLoopInnerJoinLazyStream.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/stream/NestedLoopInnerJoinLazyStream.java @@ -4,16 +4,15 @@ import cn.edu.tsinghua.iginx.engine.physical.exception.PhysicalException; import cn.edu.tsinghua.iginx.engine.physical.memory.execute.utils.FilterUtils; import cn.edu.tsinghua.iginx.engine.physical.memory.execute.utils.RowUtils; -import cn.edu.tsinghua.iginx.engine.shared.data.read.Field; import cn.edu.tsinghua.iginx.engine.shared.data.read.Header; import cn.edu.tsinghua.iginx.engine.shared.data.read.Row; import cn.edu.tsinghua.iginx.engine.shared.data.read.RowStream; +import cn.edu.tsinghua.iginx.engine.shared.function.system.utils.ValueUtils; import cn.edu.tsinghua.iginx.engine.shared.operator.InnerJoin; import cn.edu.tsinghua.iginx.engine.shared.operator.filter.Filter; import cn.edu.tsinghua.iginx.utils.Pair; import java.util.ArrayList; import java.util.List; -import java.util.Objects; public class NestedLoopInnerJoinLazyStream extends BinaryLazyStream { @@ -131,7 +130,8 @@ private Row tryMatch() throws PhysicalException { } } else { // Join condition: natural or using for (String joinColumn : joinColumns) { - if (!Objects.equals(nextA.getValue(innerJoin.getPrefixA() + '.' + joinColumn), nextB.getValue(innerJoin.getPrefixB() + '.' + joinColumn))) { + if (ValueUtils.compare(nextA.getAsValue(innerJoin.getPrefixA() + '.' + joinColumn), + nextB.getAsValue(innerJoin.getPrefixB() + '.' + joinColumn)) != 0) { nextB = null; return null; } diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/stream/NestedLoopOuterJoinLazyStream.java b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/stream/NestedLoopOuterJoinLazyStream.java index 70b526406..c393daea9 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/stream/NestedLoopOuterJoinLazyStream.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/stream/NestedLoopOuterJoinLazyStream.java @@ -7,6 +7,7 @@ import cn.edu.tsinghua.iginx.engine.shared.data.read.Header; import cn.edu.tsinghua.iginx.engine.shared.data.read.Row; import cn.edu.tsinghua.iginx.engine.shared.data.read.RowStream; +import cn.edu.tsinghua.iginx.engine.shared.function.system.utils.ValueUtils; import cn.edu.tsinghua.iginx.engine.shared.operator.OuterJoin; import cn.edu.tsinghua.iginx.engine.shared.operator.filter.Filter; import cn.edu.tsinghua.iginx.engine.shared.operator.type.OuterJoinType; @@ -14,7 +15,6 @@ import java.util.ArrayList; import java.util.HashSet; import java.util.List; -import java.util.Objects; import java.util.Set; public class NestedLoopOuterJoinLazyStream extends BinaryLazyStream { @@ -205,7 +205,8 @@ private Row tryMatch() throws PhysicalException { } } else { // Join condition: natural or using for (String joinColumn : joinColumns) { - if (!Objects.equals(nextA.getValue(outerJoin.getPrefixA() + '.' + joinColumn), nextB.getValue(outerJoin.getPrefixB() + '.' + joinColumn))) { + if (ValueUtils.compare(nextA.getAsValue(outerJoin.getPrefixA() + '.' + joinColumn), + nextB.getAsValue(outerJoin.getPrefixB() + '.' + joinColumn)) != 0) { nextB = null; return null; } diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/stream/SortedMergeInnerJoinLazyStream.java b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/stream/SortedMergeInnerJoinLazyStream.java index 274b6f67c..7a52b8ae9 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/stream/SortedMergeInnerJoinLazyStream.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/stream/SortedMergeInnerJoinLazyStream.java @@ -4,22 +4,21 @@ import cn.edu.tsinghua.iginx.engine.physical.exception.PhysicalException; import cn.edu.tsinghua.iginx.engine.physical.memory.execute.utils.FilterUtils; import cn.edu.tsinghua.iginx.engine.physical.memory.execute.utils.RowUtils; -import cn.edu.tsinghua.iginx.engine.shared.data.read.Field; +import cn.edu.tsinghua.iginx.engine.shared.data.Value; import cn.edu.tsinghua.iginx.engine.shared.data.read.Header; import cn.edu.tsinghua.iginx.engine.shared.data.read.Row; import cn.edu.tsinghua.iginx.engine.shared.data.read.RowStream; +import cn.edu.tsinghua.iginx.engine.shared.function.system.utils.ValueUtils; import cn.edu.tsinghua.iginx.engine.shared.operator.InnerJoin; import cn.edu.tsinghua.iginx.engine.shared.operator.filter.Filter; import cn.edu.tsinghua.iginx.engine.shared.operator.filter.FilterType; import cn.edu.tsinghua.iginx.engine.shared.operator.filter.PathFilter; -import cn.edu.tsinghua.iginx.thrift.DataType; import cn.edu.tsinghua.iginx.utils.Pair; import java.util.ArrayList; import java.util.Collections; import java.util.Deque; import java.util.LinkedList; import java.util.List; -import java.util.Objects; /** * input two stream must be ascending order. @@ -36,15 +35,13 @@ public class SortedMergeInnerJoinLazyStream extends BinaryLazyStream { private String joinColumnB; - private DataType joinColumnDataType; - private Row nextA; private Row nextB; private int index; - private Object curJoinColumnBValue; // 当前StreamB中join列的值,用于同值join + private Value curJoinColumnBValue; // 当前StreamB中join列的值,用于同值join private final List sameValueStreamBRows; // StreamB中join列的值相同的列缓存 @@ -101,13 +98,6 @@ private void initialize() throws PhysicalException { } this.index = headerB.indexOf(innerJoin.getPrefixB() + '.' + joinColumnB); - DataType dataTypeA = headerA.getField(headerA.indexOf(innerJoin.getPrefixA() + "." + joinColumnA)).getType(); - DataType dataTypeB = headerA.getField(headerA.indexOf(innerJoin.getPrefixA() + "." + joinColumnA)).getType(); - if (!dataTypeA.equals(dataTypeB)) { - throw new InvalidOperatorParameterException("the datatype of join columns is different"); - } - joinColumnDataType = dataTypeA; - if (filter != null) { // Join condition: on this.header = RowUtils.constructNewHead(headerA, headerB, innerJoin.getPrefixA(), innerJoin.getPrefixB()); } else { // Join condition: natural or using @@ -135,8 +125,8 @@ public boolean hasNext() throws PhysicalException { } private void tryMatch() throws PhysicalException { - Object curJoinColumnAValue = nextA.getValue(innerJoin.getPrefixA() + "." + joinColumnA); - int cmp = RowUtils.compareObjects(joinColumnDataType, curJoinColumnAValue, curJoinColumnBValue); + Value curJoinColumnAValue = nextA.getAsValue(innerJoin.getPrefixA() + "." + joinColumnA); + int cmp = ValueUtils.compare(curJoinColumnAValue, curJoinColumnBValue); if (cmp < 0) { nextA = null; } else if (cmp > 0) { @@ -169,13 +159,13 @@ private boolean hasMoreRows() throws PhysicalException { } while (sameValueStreamBRows.isEmpty() && nextB != null) { sameValueStreamBRows.add(nextB); - curJoinColumnBValue = nextB.getValue(innerJoin.getPrefixB() + "." + joinColumnB); + curJoinColumnBValue = nextB.getAsValue(innerJoin.getPrefixB() + "." + joinColumnB); nextB = null; while (streamB.hasNext()) { nextB = streamB.next(); - Object joinColumnBValue = nextB.getValue(innerJoin.getPrefixB() + "." + joinColumnB); - if (Objects.equals(joinColumnBValue, curJoinColumnBValue)) { + Value joinColumnBValue = nextB.getAsValue(innerJoin.getPrefixB() + "." + joinColumnB); + if (ValueUtils.compare(joinColumnBValue, curJoinColumnBValue) == 0) { sameValueStreamBRows.add(nextB); nextB = null; } else { diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/stream/SortedMergeOuterJoinLazyStream.java b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/stream/SortedMergeOuterJoinLazyStream.java index 81641d01d..c2485fa03 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/stream/SortedMergeOuterJoinLazyStream.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/stream/SortedMergeOuterJoinLazyStream.java @@ -4,22 +4,22 @@ import cn.edu.tsinghua.iginx.engine.physical.exception.PhysicalException; import cn.edu.tsinghua.iginx.engine.physical.memory.execute.utils.FilterUtils; import cn.edu.tsinghua.iginx.engine.physical.memory.execute.utils.RowUtils; +import cn.edu.tsinghua.iginx.engine.shared.data.Value; import cn.edu.tsinghua.iginx.engine.shared.data.read.Header; import cn.edu.tsinghua.iginx.engine.shared.data.read.Row; import cn.edu.tsinghua.iginx.engine.shared.data.read.RowStream; +import cn.edu.tsinghua.iginx.engine.shared.function.system.utils.ValueUtils; import cn.edu.tsinghua.iginx.engine.shared.operator.OuterJoin; import cn.edu.tsinghua.iginx.engine.shared.operator.filter.Filter; import cn.edu.tsinghua.iginx.engine.shared.operator.filter.FilterType; import cn.edu.tsinghua.iginx.engine.shared.operator.filter.PathFilter; import cn.edu.tsinghua.iginx.engine.shared.operator.type.OuterJoinType; -import cn.edu.tsinghua.iginx.thrift.DataType; import cn.edu.tsinghua.iginx.utils.Pair; import java.util.ArrayList; import java.util.Collections; import java.util.Deque; import java.util.LinkedList; import java.util.List; -import java.util.Objects; public class SortedMergeOuterJoinLazyStream extends BinaryLazyStream { @@ -35,13 +35,11 @@ public class SortedMergeOuterJoinLazyStream extends BinaryLazyStream { private String joinColumnB; - private DataType joinColumnDataType; - private Row nextA; private Row nextB; - private Object curJoinColumnBValue; // 当前StreamB中join列的值,用于同值join + private Value curJoinColumnBValue; // 当前StreamB中join列的值,用于同值join private final List sameValueStreamBRows; // StreamB中join列的值相同的列缓存 @@ -113,13 +111,6 @@ private void initialize() throws PhysicalException { this.index = headerB.indexOf(outerJoin.getPrefixB() + '.' + joinColumnB); } - DataType dataTypeA = headerA.getField(headerA.indexOf(outerJoin.getPrefixA() + "." + joinColumnA)).getType(); - DataType dataTypeB = headerA.getField(headerA.indexOf(outerJoin.getPrefixA() + "." + joinColumnA)).getType(); - if (!dataTypeA.equals(dataTypeB)) { - throw new InvalidOperatorParameterException("the datatype of join columns is different"); - } - joinColumnDataType = dataTypeA; - if (filter != null) { // Join condition: on this.header = RowUtils.constructNewHead(headerA, headerB, outerJoin.getPrefixA(), outerJoin.getPrefixB()); } else { // Join condition: natural or using @@ -205,8 +196,8 @@ public boolean hasNext() throws PhysicalException { } private void tryMatch() throws PhysicalException { - Object curJoinColumnAValue = nextA.getValue(outerJoin.getPrefixA() + "." + joinColumnA); - int cmp = RowUtils.compareObjects(joinColumnDataType, curJoinColumnAValue, curJoinColumnBValue); + Value curJoinColumnAValue = nextA.getAsValue(outerJoin.getPrefixA() + "." + joinColumnA); + int cmp = ValueUtils.compare(curJoinColumnAValue, curJoinColumnBValue); if (cmp < 0) { unmatchedStreamARows.add(nextA); nextA = null; @@ -255,13 +246,13 @@ private boolean hasMoreRows() throws PhysicalException { } while (sameValueStreamBRows.isEmpty() && nextB != null) { sameValueStreamBRows.add(nextB); - curJoinColumnBValue = nextB.getValue(outerJoin.getPrefixB() + "." + joinColumnB); + curJoinColumnBValue = nextB.getAsValue(outerJoin.getPrefixB() + "." + joinColumnB); nextB = null; while (streamB.hasNext()) { nextB = streamB.next(); - Object joinColumnBValue = nextB.getValue(outerJoin.getPrefixB() + "." + joinColumnB); - if (Objects.equals(joinColumnBValue, curJoinColumnBValue)) { + Value joinColumnBValue = nextB.getAsValue(outerJoin.getPrefixB() + "." + joinColumnB); + if (ValueUtils.compare(joinColumnBValue, curJoinColumnBValue) == 0) { sameValueStreamBRows.add(nextB); nextB = null; } else { diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/utils/FilterUtils.java b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/utils/FilterUtils.java index cc675c931..8ebe844a4 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/utils/FilterUtils.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/utils/FilterUtils.java @@ -18,6 +18,7 @@ */ package cn.edu.tsinghua.iginx.engine.physical.memory.execute.utils; +import cn.edu.tsinghua.iginx.engine.physical.exception.PhysicalException; import cn.edu.tsinghua.iginx.engine.shared.data.Value; import cn.edu.tsinghua.iginx.engine.shared.data.read.Row; import cn.edu.tsinghua.iginx.engine.shared.function.system.utils.ValueUtils; @@ -29,7 +30,7 @@ public class FilterUtils { - public static boolean validate(Filter filter, Row row) { + public static boolean validate(Filter filter, Row row) throws PhysicalException { switch (filter.getType()) { case Or: OrFilter orFilter = (OrFilter) filter; @@ -89,7 +90,7 @@ private static boolean validateTimeFilter(KeyFilter keyFilter, Row row) { return false; } - private static boolean validateValueFilter(ValueFilter valueFilter, Row row) { + private static boolean validateValueFilter(ValueFilter valueFilter, Row row) throws PhysicalException { String path = valueFilter.getPath(); Value targetValue = valueFilter.getValue(); if (targetValue.isNull()) { // targetValue是空值,则认为不可比较 @@ -116,7 +117,7 @@ private static boolean validateValueFilter(ValueFilter valueFilter, Row row) { } } - private static boolean validatePathFilter(PathFilter pathFilter, Row row) { + private static boolean validatePathFilter(PathFilter pathFilter, Row row) throws PhysicalException { Value valueA = row.getAsValue(pathFilter.getPathA()); Value valueB = row.getAsValue(pathFilter.getPathB()); if (valueA == null || valueA.isNull() || valueB == null || valueB.isNull()) { // 如果任何一个是空值,则认为不可比较 @@ -125,7 +126,7 @@ private static boolean validatePathFilter(PathFilter pathFilter, Row row) { return validateValueCompare(pathFilter.getOp(), valueA, valueB); } - private static boolean validateValueCompare(Op op, Value valueA, Value valueB) { + private static boolean validateValueCompare(Op op, Value valueA, Value valueB) throws PhysicalException { if (valueA.getDataType() != valueB.getDataType()) { if (ValueUtils.isNumericType(valueA) && ValueUtils.isNumericType(valueB)) { valueA = ValueUtils.transformToDouble(valueA); diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/utils/RowUtils.java b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/utils/RowUtils.java index 7beb01a1d..d4df978b8 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/utils/RowUtils.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/utils/RowUtils.java @@ -24,6 +24,7 @@ import cn.edu.tsinghua.iginx.engine.shared.data.read.Field; import cn.edu.tsinghua.iginx.engine.shared.data.read.Header; import cn.edu.tsinghua.iginx.engine.shared.data.read.Row; +import cn.edu.tsinghua.iginx.engine.shared.function.system.utils.ValueUtils; import cn.edu.tsinghua.iginx.thrift.DataType; import cn.edu.tsinghua.iginx.utils.Pair; import java.util.ArrayList; @@ -54,12 +55,12 @@ public static Row transform(Row row, Header targetHeader) { * 2: descending sorted */ public static int checkRowsSortedByColumns(List rows, String prefix, - List columns) { + List columns) throws PhysicalException { int res = 0; int index = 0; while (index < rows.size() - 1) { int mark = compareRowsSortedByColumns(rows.get(index), rows.get(index + 1), prefix, - prefix, columns); + prefix, columns, columns); if (mark == -1) { if (res == 0) { res = 1; @@ -84,10 +85,12 @@ public static int checkRowsSortedByColumns(List rows, String prefix, * 1: row1 > row2 */ public static int compareRowsSortedByColumns(Row row1, Row row2, String prefix1, String prefix2, - List columns) { - for (String column : columns) { - Object value1 = row1.getValue(prefix1 + '.' + column); - Object value2 = row2.getValue(prefix2 + '.' + column); + List columns1, List columns2) throws PhysicalException { + assert columns1.size() == columns2.size(); + int size = columns1.size(); + for (int index = 0; index < size; index++) { + Object value1 = row1.getValue(prefix1 + '.' + columns1.get(index)); + Object value2 = row2.getValue(prefix2 + '.' + columns2.get(index)); if (value1 == null && value2 == null) { return 0; } else if (value1 == null) { @@ -95,9 +98,11 @@ public static int compareRowsSortedByColumns(Row row1, Row row2, String prefix1, } else if (value2 == null) { return 1; } - DataType dataType = row1.getField(row1.getHeader().indexOf(prefix1 + '.' + column)) + DataType dataType1 = row1.getField(row1.getHeader().indexOf(prefix1 + '.' + columns1.get(index))) .getType(); - int cmp = compareObjects(dataType, value1, value2); + DataType dataType2 = row2.getField(row2.getHeader().indexOf(prefix2 + '.' + columns2.get(index))) + .getType(); + int cmp = ValueUtils.compare(value1, value2, dataType1, dataType2); if (cmp != 0) { return cmp; } @@ -105,37 +110,6 @@ public static int compareRowsSortedByColumns(Row row1, Row row2, String prefix1, return 0; } - public static int compareObjects(DataType dataType, Object value1, Object value2) { - switch (dataType) { - case BOOLEAN: - boolean boolean1 = (boolean) value1; - boolean boolean2 = (boolean) value2; - return Boolean.compare(boolean1, boolean2); - case INTEGER: - int int1 = (int) value1; - int int2 = (int) value2; - return Integer.compare(int1, int2); - case LONG: - long long1 = (long) value1; - long long2 = (long) value2; - return Long.compare(long1, long2); - case FLOAT: - float float1 = (float) value1; - float float2 = (float) value2; - return Float.compare(float1, float2); - case DOUBLE: - double double1 = (double) value1; - double double2 = (double) value2; - return Double.compare(double1, double2); - case BINARY: - String string1 = (String) value1; - String string2 = (String) value2; - return string1.compareTo(string2); - default: - throw new IllegalArgumentException("unknown datatype: " + dataType); - } - } - public static Header constructNewHead(Header headerA, Header headerB, String prefixA, String prefixB) { List fields = new ArrayList<>(); diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/function/system/utils/ValueUtils.java b/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/function/system/utils/ValueUtils.java index a5d0f29d5..6edc76e20 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/function/system/utils/ValueUtils.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/function/system/utils/ValueUtils.java @@ -18,6 +18,8 @@ */ package cn.edu.tsinghua.iginx.engine.shared.function.system.utils; +import cn.edu.tsinghua.iginx.engine.physical.exception.InvalidOperatorParameterException; +import cn.edu.tsinghua.iginx.engine.physical.exception.PhysicalException; import cn.edu.tsinghua.iginx.engine.shared.data.Value; import cn.edu.tsinghua.iginx.thrift.DataType; @@ -35,6 +37,10 @@ public class ValueUtils { public static boolean isNumericType(Value value) { return numericTypeSet.contains(value.getDataType()); } + + public static boolean isNumericType(DataType dataType) { + return numericTypeSet.contains(dataType); + } public static Value transformToDouble(Value value) { DataType dataType = value.getDataType(); @@ -64,21 +70,30 @@ public static Value transformToDouble(Value value) { return new Value(DataType.DOUBLE, dVal); } - public static int compare(Value o1, Value o2) { - DataType dataType = o1.getDataType(); - switch (dataType) { + public static int compare(Value v1, Value v2) throws PhysicalException { + DataType dataType1 = v1.getDataType(); + DataType dataType2 = v2.getDataType(); + if (dataType1 != dataType2) { + if (numericTypeSet.contains(dataType1) && numericTypeSet.contains(dataType2)) { + v1 = transformToDouble(v1); + v2 = transformToDouble(v2); + } else { + throw new InvalidOperatorParameterException(dataType1.toString() + " and " + dataType2.toString() + " can't be compared"); + } + } + switch (dataType1) { case INTEGER: - return Integer.compare(o1.getIntV(), o2.getIntV()); + return Integer.compare(v1.getIntV(), v2.getIntV()); case LONG: - return Long.compare(o1.getLongV(), o2.getLongV()); + return Long.compare(v1.getLongV(), v2.getLongV()); case BOOLEAN: - return Boolean.compare(o1.getBoolV(), o2.getBoolV()); + return Boolean.compare(v1.getBoolV(), v2.getBoolV()); case FLOAT: - return Float.compare(o1.getFloatV(), o2.getFloatV()); + return Float.compare(v1.getFloatV(), v2.getFloatV()); case DOUBLE: - return Double.compare(o1.getDoubleV(), o2.getDoubleV()); + return Double.compare(v1.getDoubleV(), v2.getDoubleV()); case BINARY: - return o1.getBinaryVAsString().compareTo(o2.getBinaryVAsString()); + return v1.getBinaryVAsString().compareTo(v2.getBinaryVAsString()); } return 0; } @@ -112,6 +127,20 @@ public static int compare(Object o1, Object o2, DataType dataType) { } return 0; } + + public static int compare(Object o1, Object o2, DataType dataType1, DataType dataType2) throws PhysicalException { + if (dataType1 != dataType2) { + if (numericTypeSet.contains(dataType1) && numericTypeSet.contains(dataType2)) { + Value v1 = ValueUtils.transformToDouble(new Value(dataType1, o1)); + Value v2 = ValueUtils.transformToDouble(new Value(dataType2, o2)); + return compare(v1, v2); + } else { + throw new InvalidOperatorParameterException(dataType1.toString() + " and " + dataType2.toString() + " can't be compared"); + } + } else { + return compare(o1, o2, dataType1); + } + } public static String toString(Object value, DataType dataType) { switch (dataType) { diff --git a/core/src/test/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/AbstractOperatorMemoryExecutorTest.java b/core/src/test/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/AbstractOperatorMemoryExecutorTest.java index 316205988..b879ffa51 100644 --- a/core/src/test/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/AbstractOperatorMemoryExecutorTest.java +++ b/core/src/test/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/AbstractOperatorMemoryExecutorTest.java @@ -370,6 +370,8 @@ public void testInnerJoin() throws PhysicalException { RowStream stream = getExecutor().executeBinaryOperator(innerJoin, tableA, tableB); assertStreamEqual(stream, usingTarget); } + + } @Test @@ -929,7 +931,188 @@ public void testNaturalJoin() throws PhysicalException { assertStreamEqual(stream, rightTarget); } } - + + @Test + public void testJoinWithTypeCast() throws PhysicalException { + Table tableA = generateTableFromValues( + true, + Arrays.asList( + new Field("a.a", DataType.INTEGER), + new Field("a.b", DataType.DOUBLE), + new Field("a.c", DataType.BOOLEAN) + ), + Arrays.asList( + Arrays.asList(2, 2.0, true), + Arrays.asList(3, 3.0, false), + Arrays.asList(4, 4.0, true), + Arrays.asList(5, 5.0, false), + Arrays.asList(6, 6.0, true) + )); + + Table tableB = generateTableFromValues( + true, + Arrays.asList( + new Field("b.b", DataType.INTEGER), + new Field("b.d", DataType.DOUBLE), + new Field("b.e", DataType.BOOLEAN) + ), + Arrays.asList( + Arrays.asList(1, 1.0, true), + Arrays.asList(3, 3.0, false), + Arrays.asList(5, 5.0, true), + Arrays.asList(7, 7.0, false), + Arrays.asList(9, 9.0, true) + )); + + Table targetInner = generateTableFromValues( + false, + Arrays.asList( + new Field("a.key", DataType.LONG), + new Field("a.a", DataType.INTEGER), + new Field("a.b", DataType.DOUBLE), + new Field("a.c", DataType.BOOLEAN), + new Field("b.key", DataType.LONG), + new Field("b.b", DataType.INTEGER), + new Field("b.d", DataType.DOUBLE), + new Field("b.e", DataType.BOOLEAN) + ), + Arrays.asList( + Arrays.asList(2L, 3, 3.0, false, 2L, 3, 3.0, false), + Arrays.asList(4L, 5, 5.0, false, 3L, 5, 5.0, true) + )); + + Table usingTargetInner = generateTableFromValues( + false, + Arrays.asList( + new Field("a.key", DataType.LONG), + new Field("a.a", DataType.INTEGER), + new Field("a.b", DataType.DOUBLE), + new Field("a.c", DataType.BOOLEAN), + new Field("b.key", DataType.LONG), + new Field("b.d", DataType.DOUBLE), + new Field("b.e", DataType.BOOLEAN) + ), + Arrays.asList( + Arrays.asList(2L, 3, 3.0, false, 2L, 3.0, false), + Arrays.asList(4L, 5, 5.0, false, 3L, 5.0, true) + )); + + { + // NestedLoopJoin + tableA.reset(); + tableB.reset(); + targetInner.reset(); + + InnerJoin innerJoin = new InnerJoin( + EmptySource.EMPTY_SOURCE, + EmptySource.EMPTY_SOURCE, + "a", "b", + new PathFilter("a.b", Op.E, "b.b"), + Collections.emptyList(), + false, + JoinAlgType.NestedLoopJoin); + + RowStream stream = getExecutor().executeBinaryOperator(innerJoin, tableA, tableB); + assertStreamEqual(stream, targetInner); + } + + { + // HashJoin + tableA.reset(); + tableB.reset(); + targetInner.reset(); + + InnerJoin innerJoin = new InnerJoin( + EmptySource.EMPTY_SOURCE, + EmptySource.EMPTY_SOURCE, + "a", "b", + new PathFilter("a.b", Op.E, "b.b"), + Collections.emptyList(), + false, + JoinAlgType.HashJoin); + + RowStream stream = getExecutor().executeBinaryOperator(innerJoin, tableA, tableB); + assertStreamEqual(stream, targetInner); + } + + { + // SortedMergeJoin + tableA.reset(); + tableB.reset(); + targetInner.reset(); + + InnerJoin innerJoin = new InnerJoin( + EmptySource.EMPTY_SOURCE, + EmptySource.EMPTY_SOURCE, + "a", "b", + new PathFilter("a.b", Op.E, "b.b"), + Collections.emptyList(), + false, + JoinAlgType.SortedMergeJoin); + + RowStream stream = getExecutor().executeBinaryOperator(innerJoin, tableA, tableB); + assertStreamEqual(stream, targetInner); + } + + { + // NestedLoopJoin + tableA.reset(); + tableB.reset(); + usingTargetInner.reset(); + + InnerJoin innerJoin = new InnerJoin( + EmptySource.EMPTY_SOURCE, + EmptySource.EMPTY_SOURCE, + "a", "b", + null, + Collections.singletonList("b"), + false, + JoinAlgType.NestedLoopJoin); + + RowStream stream = getExecutor().executeBinaryOperator(innerJoin, tableA, tableB); + assertStreamEqual(stream, usingTargetInner); + } + + { + // HashJoin + tableA.reset(); + tableB.reset(); + usingTargetInner.reset(); + + InnerJoin innerJoin = new InnerJoin( + EmptySource.EMPTY_SOURCE, + EmptySource.EMPTY_SOURCE, + "a", "b", + null, + Collections.singletonList("b"), + false, + JoinAlgType.HashJoin); + + RowStream stream = getExecutor().executeBinaryOperator(innerJoin, tableA, tableB); + assertStreamEqual(stream, usingTargetInner); + } + + { + // SortedMergeJoin + tableA.reset(); + tableB.reset(); + usingTargetInner.reset(); + + InnerJoin innerJoin = new InnerJoin( + EmptySource.EMPTY_SOURCE, + EmptySource.EMPTY_SOURCE, + "a", "b", + null, + Collections.singletonList("b"), + false, + JoinAlgType.SortedMergeJoin); + + RowStream stream = getExecutor().executeBinaryOperator(innerJoin, tableA, tableB); + assertStreamEqual(stream, usingTargetInner); + } + + } + // for debug private Table transformToTable(RowStream stream) throws PhysicalException { if (stream instanceof Table) { diff --git a/tools-viewPartitions/requirements.txt b/tools-viewPartitions/requirements.txt index f9f66eb4e..5b7834ade 100644 --- a/tools-viewPartitions/requirements.txt +++ b/tools-viewPartitions/requirements.txt @@ -2,7 +2,7 @@ distinctipy==1.2.2 iginx==0.5.1 kazoo==2.9.0 matplotlib==3.3.4 -numpy==1.19.5 +numpy==1.22.0 Pillow==9.3.0 pyparsing==3.0.9 thrift==0.16.0