From 016843a408c13fd089e5cfa33356aa78207ffe34 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Thu, 12 Jan 2023 10:22:25 +0800 Subject: [PATCH 1/4] Bump numpy from 1.19.5 to 1.22.0 in /tools-viewPartitions (#493) Bumps [numpy](https://github.com/numpy/numpy) from 1.19.5 to 1.22.0. - [Release notes](https://github.com/numpy/numpy/releases) - [Changelog](https://github.com/numpy/numpy/blob/main/doc/RELEASE_WALKTHROUGH.rst) - [Commits](https://github.com/numpy/numpy/compare/v1.19.5...v1.22.0) --- updated-dependencies: - dependency-name: numpy dependency-type: direct:production ... Signed-off-by: dependabot[bot] Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: zhuyuqing --- tools-viewPartitions/requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools-viewPartitions/requirements.txt b/tools-viewPartitions/requirements.txt index f9f66eb4e..5b7834ade 100644 --- a/tools-viewPartitions/requirements.txt +++ b/tools-viewPartitions/requirements.txt @@ -2,7 +2,7 @@ distinctipy==1.2.2 iginx==0.5.1 kazoo==2.9.0 matplotlib==3.3.4 -numpy==1.19.5 +numpy==1.22.0 Pillow==9.3.0 pyparsing==3.0.9 thrift==0.16.0 From 303496cdb8b2449ca31b67aa0a0f7a9600717707 Mon Sep 17 00:00:00 2001 From: jzl18thu Date: Fri, 13 Jan 2023 12:30:24 +0800 Subject: [PATCH 2/4] Adjust join (#496) * adjust join 1.result set add key columns 2.fix some bugs * adjust join * fix a bug Fix a bug about handling filter in sorted merge join. * fix some bugs 1. handling filter in sorted merge join. 2. value comparison of different datatype * fix some bugs handle different types of data comparison * Update NaiveOperatorMemoryExecutor.java * adjust join stream Co-authored-by: Jensen Wang <1092792221@qq.com> Co-authored-by: zhuyuqing --- .../naive/NaiveOperatorMemoryExecutor.java | 312 +++++++++--------- .../stream/HashInnerJoinLazyStream.java | 36 +- .../stream/HashOuterJoinLazyStream.java | 36 +- .../stream/NestedLoopInnerJoinLazyStream.java | 6 +- .../stream/NestedLoopOuterJoinLazyStream.java | 5 +- .../SortedMergeInnerJoinLazyStream.java | 26 +- .../SortedMergeOuterJoinLazyStream.java | 25 +- .../memory/execute/utils/FilterUtils.java | 9 +- .../memory/execute/utils/RowUtils.java | 52 +-- .../function/system/utils/ValueUtils.java | 47 ++- .../AbstractOperatorMemoryExecutorTest.java | 185 ++++++++++- 11 files changed, 462 insertions(+), 277 deletions(-) diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/naive/NaiveOperatorMemoryExecutor.java b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/naive/NaiveOperatorMemoryExecutor.java index 18f89ca47..7e521e9bb 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/naive/NaiveOperatorMemoryExecutor.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/naive/NaiveOperatorMemoryExecutor.java @@ -35,11 +35,13 @@ import cn.edu.tsinghua.iginx.engine.shared.function.MappingFunction; import cn.edu.tsinghua.iginx.engine.shared.function.RowMappingFunction; import cn.edu.tsinghua.iginx.engine.shared.function.SetMappingFunction; +import cn.edu.tsinghua.iginx.engine.shared.function.system.utils.ValueUtils; import cn.edu.tsinghua.iginx.engine.shared.operator.*; import cn.edu.tsinghua.iginx.engine.shared.operator.filter.Filter; import cn.edu.tsinghua.iginx.engine.shared.operator.filter.FilterType; import cn.edu.tsinghua.iginx.engine.shared.operator.filter.PathFilter; import cn.edu.tsinghua.iginx.engine.shared.operator.type.OuterJoinType; +import cn.edu.tsinghua.iginx.thrift.DataType; import cn.edu.tsinghua.iginx.utils.Bitmap; import cn.edu.tsinghua.iginx.utils.Pair; import cn.edu.tsinghua.iginx.utils.StringUtils; @@ -547,7 +549,8 @@ private RowStream executeNestedLoopInnerJoin(InnerJoin innerJoin, Table tableA, flag: for (Row rowB : rowsB) { for (String joinColumn : joinColumns){ - if (!Objects.equals(rowA.getValue(innerJoin.getPrefixA() + '.' + joinColumn), rowB.getValue(innerJoin.getPrefixB() + '.' + joinColumn))) { + if (ValueUtils.compare(rowA.getAsValue(innerJoin.getPrefixA() + '.' + joinColumn), + rowB.getAsValue(innerJoin.getPrefixB() + '.' + joinColumn)) != 0) { continue flag; } } @@ -602,19 +605,33 @@ private RowStream executeHashInnerJoin(InnerJoin innerJoin, Table tableA, Table } } + boolean needTypeCast = false; List rowsA = tableA.getRows(); List rowsB = tableB.getRows(); + if (!rowsA.isEmpty() && !rowsB.isEmpty()) { + Value valueA = rowsA.get(0).getAsValue(innerJoin.getPrefixA() + '.' + joinColumnA); + Value valueB = rowsB.get(0).getAsValue(innerJoin.getPrefixB() + '.' + joinColumnB); + if (valueA.getDataType() != valueB.getDataType()) { + if (ValueUtils.isNumericType(valueA) && ValueUtils.isNumericType(valueB)) { + needTypeCast = true; + } + } + } + HashMap> rowsBHashMap = new HashMap<>(); for (Row rowB: rowsB) { - Object value = rowB.getValue(innerJoin.getPrefixB() + '.' + joinColumnB); + Value value = rowB.getAsValue(innerJoin.getPrefixB() + '.' + joinColumnB); if (value == null) { continue; } + if (needTypeCast) { + value = ValueUtils.transformToDouble(value); + } int hash; - if (value instanceof byte[]) { - hash = Arrays.hashCode((byte[]) value); + if (value.getDataType() == DataType.BINARY) { + hash = Arrays.hashCode(value.getBinaryV()); } else { - hash = value.hashCode(); + hash = value.getValue().hashCode(); } List l = rowsBHashMap.containsKey(hash) ? rowsBHashMap.get(hash) : new ArrayList<>(); l.add(rowB); @@ -625,15 +642,18 @@ private RowStream executeHashInnerJoin(InnerJoin innerJoin, Table tableA, Table if (filter != null) { // Join condition: on newHeader = RowUtils.constructNewHead(headerA, headerB, innerJoin.getPrefixA(), innerJoin.getPrefixB()); for (Row rowA : rowsA) { - Object value = rowA.getValue(innerJoin.getPrefixA() + '.' + joinColumnA); + Value value = rowA.getAsValue(innerJoin.getPrefixA() + '.' + joinColumnA); if (value == null) { continue; } + if (needTypeCast) { + value = ValueUtils.transformToDouble(value); + } int hash; - if (value instanceof byte[]) { - hash = Arrays.hashCode((byte[]) value); + if (value.getDataType() == DataType.BINARY) { + hash = Arrays.hashCode(value.getBinaryV()); } else { - hash = value.hashCode(); + hash = value.getValue().hashCode(); } if (rowsBHashMap.containsKey(hash)) { @@ -650,16 +670,20 @@ private RowStream executeHashInnerJoin(InnerJoin innerJoin, Table tableA, Table newHeader = RowUtils.constructNewHead(headerA, headerB, innerJoin.getPrefixA(), innerJoin.getPrefixB(), Collections.singletonList(joinColumnB), true).getV(); int index = headerB.indexOf(innerJoin.getPrefixB() + '.' + joinColumnB); for (Row rowA : rowsA) { - Object value = rowA.getValue(innerJoin.getPrefixA() + '.' + joinColumnA); + Value value = rowA.getAsValue(innerJoin.getPrefixA() + '.' + joinColumnA); if (value == null) { continue; } + if (needTypeCast) { + value = ValueUtils.transformToDouble(value); + } int hash; - if (value instanceof byte[]) { - hash = Arrays.hashCode((byte[]) value); + if (value.getDataType() == DataType.BINARY) { + hash = Arrays.hashCode(value.getBinaryV()); } else { - hash = value.hashCode(); + hash = value.getValue().hashCode(); } + if (rowsBHashMap.containsKey(hash)) { List hashRowsB = rowsBHashMap.get(hash); for (Row rowB : hashRowsB) { @@ -674,10 +698,6 @@ private RowStream executeHashInnerJoin(InnerJoin innerJoin, Table tableA, Table private RowStream executeSortedMergeInnerJoin(InnerJoin innerJoin, Table tableA, Table tableB) throws PhysicalException { Filter filter = innerJoin.getFilter(); - - List fieldsA = new ArrayList<>(tableA.getHeader().getFields()); - List fieldsB = new ArrayList<>(tableB.getHeader().getFields()); - List joinColumns = new ArrayList<>(innerJoin.getJoinColumns()); if (innerJoin.isNaturalJoin()) { RowUtils.fillNaturalJoinColumns(joinColumns, tableA.getHeader(), tableB.getHeader(), innerJoin.getPrefixA(), innerJoin.getPrefixB()); @@ -690,17 +710,22 @@ private RowStream executeSortedMergeInnerJoin(InnerJoin innerJoin, Table tableA, List rowsA = tableA.getRows(); List rowsB = tableB.getRows(); + List joinColumnsA = new ArrayList<>(joinColumns); + List joinColumnsB = new ArrayList<>(joinColumns); if (filter != null) { - joinColumns = new ArrayList<>(); + joinColumnsA = new ArrayList<>(); + joinColumnsB = new ArrayList<>(); List> pairs = FilterUtils.getJoinColumnsFromFilter(filter); if (pairs.isEmpty()) { throw new InvalidOperatorParameterException("on condition in join operator has no join columns."); } for(Pair p : pairs) { if (headerA.indexOf(p.k) != -1 && headerB.indexOf(p.v) != -1) { - joinColumns.add(p.k.replaceFirst(innerJoin.getPrefixA() + '.', "")); + joinColumnsA.add(p.k.replaceFirst(innerJoin.getPrefixA() + '.', "")); + joinColumnsB.add(p.v.replaceFirst(innerJoin.getPrefixB() + '.', "")); } else if (headerA.indexOf(p.v) != -1 && headerB.indexOf(p.k) != -1) { - joinColumns.add(p.v.replaceFirst(innerJoin.getPrefixA() + '.', "")); + joinColumnsA.add(p.v.replaceFirst(innerJoin.getPrefixA() + '.', "")); + joinColumnsB.add(p.k.replaceFirst(innerJoin.getPrefixB() + '.', "")); } else { throw new InvalidOperatorParameterException("invalid join path filter input."); } @@ -708,8 +733,8 @@ private RowStream executeSortedMergeInnerJoin(InnerJoin innerJoin, Table tableA, } boolean isAscendingSorted; - int flagA = RowUtils.checkRowsSortedByColumns(rowsA, innerJoin.getPrefixA(), joinColumns); - int flagB = RowUtils.checkRowsSortedByColumns(rowsB, innerJoin.getPrefixB(), joinColumns); + int flagA = RowUtils.checkRowsSortedByColumns(rowsA, innerJoin.getPrefixA(), joinColumnsA); + int flagB = RowUtils.checkRowsSortedByColumns(rowsB, innerJoin.getPrefixB(), joinColumnsB); if (flagA == -1 || flagB == -1) { throw new InvalidOperatorParameterException("input rows in merge join haven't be sorted."); } else if (flagA + flagB == 3) { @@ -731,22 +756,16 @@ private RowStream executeSortedMergeInnerJoin(InnerJoin innerJoin, Table tableA, Header newHeader; List transformedRows = new ArrayList<>(); if (filter != null) { - fieldsA.addAll(fieldsB); - newHeader = new Header(fieldsA); + newHeader = RowUtils.constructNewHead(headerA, headerB, innerJoin.getPrefixA(), innerJoin.getPrefixB()); int indexA = 0; int indexB = 0; int startIndexOfContinuousEqualValuesB = 0; while (indexA < rowsA.size() && indexB < rowsB.size()) { - int flagAEqualB = RowUtils.compareRowsSortedByColumns(rowsA.get(indexA), rowsB.get(indexB), innerJoin.getPrefixA(), innerJoin.getPrefixB(), joinColumns); + int flagAEqualB = RowUtils.compareRowsSortedByColumns(rowsA.get(indexA), rowsB.get(indexB), innerJoin.getPrefixA(), innerJoin.getPrefixB(), joinColumnsA, joinColumnsB); if (flagAEqualB == 0) { - Object[] valuesA = rowsA.get(indexA).getValues(); - Object[] valuesB = rowsB.get(indexB).getValues(); - Object[] valuesJoin = new Object[valuesA.length + valuesB.length]; - System.arraycopy(valuesA, 0, valuesJoin, 0, valuesA.length); - System.arraycopy(valuesB, 0, valuesJoin, valuesA.length, valuesB.length); - Row transformedRow = new Row(newHeader, valuesJoin); - if (FilterUtils.validate(filter, transformedRow)) { - transformedRows.add(transformedRow); + Row joinedRow = RowUtils.constructNewRow(newHeader, rowsA.get(indexA), rowsB.get(indexB)); + if (FilterUtils.validate(filter, joinedRow)) { + transformedRows.add(joinedRow); } if (indexA + 1 == rowsA.size()) { @@ -760,8 +779,8 @@ private RowStream executeSortedMergeInnerJoin(InnerJoin innerJoin, Table tableA, indexA++; indexB = startIndexOfContinuousEqualValuesB; } else { - int flagAEqualNextA = RowUtils.compareRowsSortedByColumns(rowsA.get(indexA), rowsA.get(indexA + 1), innerJoin.getPrefixA(), innerJoin.getPrefixA(), joinColumns); - int flagBEqualNextB = RowUtils.compareRowsSortedByColumns(rowsB.get(indexB), rowsB.get(indexB + 1), innerJoin.getPrefixB(), innerJoin.getPrefixB(), joinColumns); + int flagAEqualNextA = RowUtils.compareRowsSortedByColumns(rowsA.get(indexA), rowsA.get(indexA + 1), innerJoin.getPrefixA(), innerJoin.getPrefixA(), joinColumnsA, joinColumnsA); + int flagBEqualNextB = RowUtils.compareRowsSortedByColumns(rowsB.get(indexB), rowsB.get(indexB + 1), innerJoin.getPrefixB(), innerJoin.getPrefixB(), joinColumnsB, joinColumnsB); if (flagBEqualNextB == 0) { indexB++; } else { @@ -783,52 +802,42 @@ private RowStream executeSortedMergeInnerJoin(InnerJoin innerJoin, Table tableA, } } } else { // Join condition: natural or using - int[] indexOfJoinColumnInTableB = new int[joinColumns.size()]; - int i = 0; - flag1: - for (Field fieldB : fieldsB) { - for (String joinColumn : joinColumns) { - if (Objects.equals(fieldB.getName(), innerJoin.getPrefixB() + '.' + joinColumn)) { - indexOfJoinColumnInTableB[i++] = headerB.indexOf(fieldB); - continue flag1; - } - } - fieldsA.add(fieldB); - } - newHeader = new Header(fieldsA); + Pair pair = RowUtils.constructNewHead(headerA, headerB, innerJoin.getPrefixA(), innerJoin.getPrefixB(), joinColumns, true); + int[] indexOfJoinColumnInTableB = pair.getK(); + newHeader = pair.getV(); int indexA = 0; int indexB = 0; int startIndexOfContinuousEqualValuesB = 0; while (indexA < rowsA.size() && indexB < rowsB.size()) { - int flagAEqualB = RowUtils.compareRowsSortedByColumns(rowsA.get(indexA), rowsB.get(indexB), innerJoin.getPrefixA(), innerJoin.getPrefixB(), joinColumns); + int flagAEqualB = RowUtils.compareRowsSortedByColumns(rowsA.get(indexA), rowsB.get(indexB), innerJoin.getPrefixA(), innerJoin.getPrefixB(), joinColumnsA, joinColumnsB); if (flagAEqualB == 0) { - Object[] valuesA = rowsA.get(indexA).getValues(); - Object[] valuesB = rowsB.get(indexB).getValues(); - Object[] valuesJoin = new Object[valuesA.length + valuesB.length]; - System.arraycopy(valuesA, 0, valuesJoin, 0, valuesA.length); - int k = valuesA.length; - flag3: - for (int j = 0; j < valuesB.length; j++) { - for (int index : indexOfJoinColumnInTableB) { - if (j == index) { - continue flag3; - } + Row joinedRow = RowUtils.constructNewRow(newHeader, rowsA.get(indexA), rowsB.get(indexB), indexOfJoinColumnInTableB, true); + transformedRows.add(joinedRow); + + if (indexA + 1 == rowsA.size()) { + if (indexB + 1 == rowsB.size()) { + break; + } else { + indexB++; } - valuesJoin[k++] = valuesB[j]; - } - transformedRows.add(new Row(newHeader, valuesJoin)); - - int flagAEqualNextA = RowUtils.compareRowsSortedByColumns(rowsA.get(indexA), rowsA.get(indexA + 1), innerJoin.getPrefixA(), innerJoin.getPrefixA(), joinColumns); - int flagBEqualNextB = RowUtils.compareRowsSortedByColumns(rowsB.get(indexB), rowsB.get(indexB + 1), innerJoin.getPrefixB(), innerJoin.getPrefixB(), joinColumns); - if (flagBEqualNextB == 0) { - indexB++; } else { - indexA++; - if (flagAEqualNextA != 0) { - indexB++; - startIndexOfContinuousEqualValuesB = indexB; - } else { + if (indexB + 1 == rowsB.size()) { + indexA++; indexB = startIndexOfContinuousEqualValuesB; + } else { + int flagAEqualNextA = RowUtils.compareRowsSortedByColumns(rowsA.get(indexA), rowsA.get(indexA + 1), innerJoin.getPrefixA(), innerJoin.getPrefixA(), joinColumnsA, joinColumnsA); + int flagBEqualNextB = RowUtils.compareRowsSortedByColumns(rowsB.get(indexB), rowsB.get(indexB + 1), innerJoin.getPrefixB(), innerJoin.getPrefixB(), joinColumnsB, joinColumnsB); + if (flagBEqualNextB == 0) { + indexB++; + } else { + indexA++; + if (flagAEqualNextA != 0) { + indexB++; + startIndexOfContinuousEqualValuesB = indexB; + } else { + indexB = startIndexOfContinuousEqualValuesB; + } + } } } } else if (flagAEqualB == -1) { @@ -918,7 +927,8 @@ private RowStream executeNestedLoopOuterJoin(OuterJoin outerJoin, Table tableA, for (int indexB = 0; indexB < rowsB.size(); indexB++) { Row rowB = rowsB.get(indexB); for (String joinColumn : joinColumns){ - if (!Objects.equals(rowA.getValue(outerJoin.getPrefixA() + '.' + joinColumn), rowB.getValue(outerJoin.getPrefixB() + '.' + joinColumn))) { + if (ValueUtils.compare(rowA.getAsValue(outerJoin.getPrefixA() + '.' + joinColumn), + rowB.getAsValue(outerJoin.getPrefixB() + '.' + joinColumn)) != 0) { continue flag; } } @@ -1011,9 +1021,19 @@ private RowStream executeHashOuterJoin(OuterJoin outerJoin, Table tableA, Table throw new InvalidOperatorParameterException("invalid hash join column input."); } } - + + boolean needTypeCast = false; List rowsA = tableA.getRows(); List rowsB = tableB.getRows(); + if (!rowsA.isEmpty() && !rowsB.isEmpty()) { + Value valueA = rowsA.get(0).getAsValue(outerJoin.getPrefixA() + '.' + joinColumnA); + Value valueB = rowsB.get(0).getAsValue(outerJoin.getPrefixB() + '.' + joinColumnB); + if (valueA.getDataType() != valueB.getDataType()) { + if (ValueUtils.isNumericType(valueA) && ValueUtils.isNumericType(valueB)) { + needTypeCast = true; + } + } + } Bitmap bitmapA = new Bitmap(rowsA.size()); Bitmap bitmapB = new Bitmap(rowsB.size()); @@ -1021,15 +1041,18 @@ private RowStream executeHashOuterJoin(OuterJoin outerJoin, Table tableA, Table HashMap> rowsBHashMap = new HashMap<>(); HashMap> indexOfRowBHashMap = new HashMap<>(); for (int indexB = 0; indexB < rowsB.size(); indexB++) { - Object value = rowsB.get(indexB).getValue(outerJoin.getPrefixB() + '.' + joinColumnB); + Value value = rowsB.get(indexB).getAsValue(outerJoin.getPrefixB() + '.' + joinColumnB); if (value == null) { continue; } + if (needTypeCast) { + value = ValueUtils.transformToDouble(value); + } int hash; - if (value instanceof byte[]) { - hash = Arrays.hashCode((byte[]) value); + if (value.getDataType() == DataType.BINARY) { + hash = Arrays.hashCode(value.getBinaryV()); } else { - hash = value.hashCode(); + hash = value.getValue().hashCode(); } List l = rowsBHashMap.containsKey(hash) ? rowsBHashMap.get(hash) : new ArrayList<>(); List il = rowsBHashMap.containsKey(hash) ? indexOfRowBHashMap.get(hash) : new ArrayList<>(); @@ -1045,16 +1068,20 @@ private RowStream executeHashOuterJoin(OuterJoin outerJoin, Table tableA, Table newHeader = RowUtils.constructNewHead(headerA, headerB, outerJoin.getPrefixA(), outerJoin.getPrefixB()); for (int indexA = 0; indexA < rowsA.size(); indexA++) { Row rowA = rowsA.get(indexA); - Object value = rowA.getValue(outerJoin.getPrefixA() + '.' + joinColumnA); + Value value = rowA.getAsValue(outerJoin.getPrefixA() + '.' + joinColumnA); if (value == null) { continue; } + if (needTypeCast) { + value = ValueUtils.transformToDouble(value); + } int hash; - if (value instanceof byte[]) { - hash = Arrays.hashCode((byte[]) value); + if (value.getDataType() == DataType.BINARY) { + hash = Arrays.hashCode(value.getBinaryV()); } else { - hash = value.hashCode(); + hash = value.getValue().hashCode(); } + if (rowsBHashMap.containsKey(hash)) { List hashRowsB = rowsBHashMap.get(hash); List hashIndexB = indexOfRowBHashMap.get(hash); @@ -1085,16 +1112,20 @@ private RowStream executeHashOuterJoin(OuterJoin outerJoin, Table tableA, Table for (int indexA = 0; indexA < rowsA.size(); indexA++) { Row rowA = rowsA.get(indexA); - Object value = rowA.getValue(outerJoin.getPrefixA() + '.' + joinColumnA); + Value value = rowA.getAsValue(outerJoin.getPrefixA() + '.' + joinColumnA); if (value == null) { continue; } + if (needTypeCast) { + value = ValueUtils.transformToDouble(value); + } int hash; - if (value instanceof byte[]) { - hash = Arrays.hashCode((byte[]) value); + if (value.getDataType() == DataType.BINARY) { + hash = Arrays.hashCode(value.getBinaryV()); } else { - hash = value.hashCode(); + hash = value.getValue().hashCode(); } + if (rowsBHashMap.containsKey(hash)) { List hashRowsB = rowsBHashMap.get(hash); List hashIndexB = indexOfRowBHashMap.get(hash); @@ -1177,18 +1208,22 @@ private RowStream executeSortedMergeOuterJoin(OuterJoin outerJoin, Table tableA, List rowsA = tableA.getRows(); List rowsB = tableB.getRows(); - + List joinColumnsA = new ArrayList<>(joinColumns); + List joinColumnsB = new ArrayList<>(joinColumns); if (filter != null) { - joinColumns = new ArrayList<>(); + joinColumnsA = new ArrayList<>(); + joinColumnsB = new ArrayList<>(); List> pairs = FilterUtils.getJoinColumnsFromFilter(filter); if (pairs.isEmpty()) { throw new InvalidOperatorParameterException("on condition in join operator has no join columns."); } for(Pair p : pairs) { if (headerA.indexOf(p.k) != -1 && headerB.indexOf(p.v) != -1) { - joinColumns.add(p.k.replaceFirst(outerJoin.getPrefixA() + '.', "")); + joinColumnsA.add(p.k.replaceFirst(outerJoin.getPrefixA() + '.', "")); + joinColumnsB.add(p.v.replaceFirst(outerJoin.getPrefixB() + '.', "")); } else if (headerA.indexOf(p.v) != -1 && headerB.indexOf(p.k) != -1) { - joinColumns.add(p.v.replaceFirst(outerJoin.getPrefixA() + '.', "")); + joinColumnsA.add(p.v.replaceFirst(outerJoin.getPrefixA() + '.', "")); + joinColumnsB.add(p.k.replaceFirst(outerJoin.getPrefixB() + '.', "")); } else { throw new InvalidOperatorParameterException("invalid join path filter input."); } @@ -1196,8 +1231,8 @@ private RowStream executeSortedMergeOuterJoin(OuterJoin outerJoin, Table tableA, } boolean isAscendingSorted; - int flagA = RowUtils.checkRowsSortedByColumns(rowsA, outerJoin.getPrefixA(), joinColumns); - int flagB = RowUtils.checkRowsSortedByColumns(rowsB, outerJoin.getPrefixB(), joinColumns); + int flagA = RowUtils.checkRowsSortedByColumns(rowsA, outerJoin.getPrefixA(), joinColumnsA); + int flagB = RowUtils.checkRowsSortedByColumns(rowsB, outerJoin.getPrefixB(), joinColumnsB); if (flagA == -1 || flagB == -1) { throw new InvalidOperatorParameterException("input rows in merge join haven't be sorted."); } else if (flagA + flagB == 3) { @@ -1222,28 +1257,22 @@ private RowStream executeSortedMergeOuterJoin(OuterJoin outerJoin, Table tableA, Header newHeader; List transformedRows = new ArrayList<>(); if (filter != null) { - fieldsA.addAll(fieldsB); - newHeader = new Header(fieldsA); + newHeader = RowUtils.constructNewHead(headerA, headerB, outerJoin.getPrefixA(), outerJoin.getPrefixB()); int indexA = 0; int indexB = 0; int startIndexOfContinuousEqualValuesB = 0; while (indexA < rowsA.size() && indexB < rowsB.size()) { - int flagAEqualB = RowUtils.compareRowsSortedByColumns(rowsA.get(indexA), rowsB.get(indexB), outerJoin.getPrefixA(), outerJoin.getPrefixB(), joinColumns); + int flagAEqualB = RowUtils.compareRowsSortedByColumns(rowsA.get(indexA), rowsB.get(indexB), outerJoin.getPrefixA(), outerJoin.getPrefixB(), joinColumnsA, joinColumnsB); if (flagAEqualB == 0) { - Object[] valuesA = rowsA.get(indexA).getValues(); - Object[] valuesB = rowsB.get(indexB).getValues(); - Object[] valuesJoin = new Object[valuesA.length + valuesB.length]; - System.arraycopy(valuesA, 0, valuesJoin, 0, valuesA.length); - System.arraycopy(valuesB, 0, valuesJoin, valuesA.length, valuesB.length); - Row transformedRow = new Row(newHeader, valuesJoin); - if (FilterUtils.validate(filter, transformedRow)) { + Row joinedRow = RowUtils.constructNewRow(newHeader, rowsA.get(indexA), rowsB.get(indexB)); + if (FilterUtils.validate(filter, joinedRow)) { if (!bitmapA.get(indexA)) { bitmapA.mark(indexA); } if (!bitmapB.get(indexB)) { bitmapB.mark(indexB); } - transformedRows.add(transformedRow); + transformedRows.add(joinedRow); } if (indexA + 1 == rowsA.size()) { @@ -1257,8 +1286,8 @@ private RowStream executeSortedMergeOuterJoin(OuterJoin outerJoin, Table tableA, indexA++; indexB = startIndexOfContinuousEqualValuesB; } else { - int flagAEqualNextA = RowUtils.compareRowsSortedByColumns(rowsA.get(indexA), rowsA.get(indexA + 1), outerJoin.getPrefixA(), outerJoin.getPrefixA(), joinColumns); - int flagBEqualNextB = RowUtils.compareRowsSortedByColumns(rowsB.get(indexB), rowsB.get(indexB + 1), outerJoin.getPrefixB(), outerJoin.getPrefixB(), joinColumns); + int flagAEqualNextA = RowUtils.compareRowsSortedByColumns(rowsA.get(indexA), rowsA.get(indexA + 1), outerJoin.getPrefixA(), outerJoin.getPrefixA(), joinColumnsA, joinColumnsA); + int flagBEqualNextB = RowUtils.compareRowsSortedByColumns(rowsB.get(indexB), rowsB.get(indexB + 1), outerJoin.getPrefixB(), outerJoin.getPrefixB(), joinColumnsB, joinColumnsB); if (flagBEqualNextB == 0) { indexB++; } else { @@ -1280,72 +1309,25 @@ private RowStream executeSortedMergeOuterJoin(OuterJoin outerJoin, Table tableA, } } } else { // Join condition: natural or using - List newFields = new ArrayList<>(); - int[] indexOfJoinColumnInTableB = new int[joinColumns.size()]; - int[] indexOfJoinColumnInTableA = new int[joinColumns.size()]; - int i = 0; - int j = 0; + Pair pair; if (outerType == OuterJoinType.RIGHT) { - flag1: - for (Field fieldA : fieldsA) { - for (String joinColumn : joinColumns) { - if (Objects.equals(fieldA.getName(), outerJoin.getPrefixA() + '.' + joinColumn)) { - indexOfJoinColumnInTableA[j++] = headerA.indexOf(fieldA); - continue flag1; - } - } - newFields.add(fieldA); - } - newFields.addAll(fieldsB); + pair = RowUtils.constructNewHead(headerA, headerB, outerJoin.getPrefixA(), outerJoin.getPrefixB(), joinColumns, false); } else { - newFields.addAll(fieldsA); - flag2: - for (Field fieldB : fieldsB) { - for (String joinColumn : joinColumns) { - if (Objects.equals(fieldB.getName(), outerJoin.getPrefixB() + '.' + joinColumn)) { - indexOfJoinColumnInTableB[i++] = headerB.indexOf(fieldB); - continue flag2; - } - } - newFields.add(fieldB); - } + pair = RowUtils.constructNewHead(headerA, headerB, outerJoin.getPrefixA(), outerJoin.getPrefixB(), joinColumns, true); } - newHeader = new Header(newFields); + int[] indexOfJoinColumnInTable = pair.getK(); + newHeader = pair.getV(); int indexA = 0; int indexB = 0; int startIndexOfContinuousEqualValuesB = 0; while (indexA < rowsA.size() && indexB < rowsB.size()) { - int flagAEqualB = RowUtils.compareRowsSortedByColumns(rowsA.get(indexA), rowsB.get(indexB), outerJoin.getPrefixA(), outerJoin.getPrefixB(), joinColumns); + int flagAEqualB = RowUtils.compareRowsSortedByColumns(rowsA.get(indexA), rowsB.get(indexB), outerJoin.getPrefixA(), outerJoin.getPrefixB(), joinColumnsA, joinColumnsB); if (flagAEqualB == 0) { - Object[] valuesA = rowsA.get(indexA).getValues(); - Object[] valuesB = rowsB.get(indexB).getValues(); - Object[] valuesJoin; + Row joinedRow; if (outerType == OuterJoinType.RIGHT) { - valuesJoin = new Object[valuesA.length + valuesB.length - indexOfJoinColumnInTableA.length]; - System.arraycopy(valuesB, 0, valuesJoin, valuesA.length - indexOfJoinColumnInTableA.length, valuesB.length); - int k = 0; - flag3: - for (int m = 0; m < valuesA.length; m++) { - for (int index : indexOfJoinColumnInTableA) { - if (m == index) { - continue flag3; - } - } - valuesJoin[k++] = valuesA[m]; - } + joinedRow = RowUtils.constructNewRow(newHeader, rowsA.get(indexA), rowsB.get(indexB), indexOfJoinColumnInTable, false); } else { - valuesJoin = new Object[valuesA.length + valuesB.length - indexOfJoinColumnInTableB.length]; - System.arraycopy(valuesA, 0, valuesJoin, 0, valuesA.length); - int k = valuesA.length; - flag4: - for (int m = 0; m < valuesB.length; m++) { - for (int index : indexOfJoinColumnInTableB) { - if (m == index) { - continue flag4; - } - } - valuesJoin[k++] = valuesB[m]; - } + joinedRow = RowUtils.constructNewRow(newHeader, rowsA.get(indexA), rowsB.get(indexB), indexOfJoinColumnInTable, true); } if (!bitmapA.get(indexA)) { @@ -1354,10 +1336,10 @@ private RowStream executeSortedMergeOuterJoin(OuterJoin outerJoin, Table tableA, if (!bitmapB.get(indexB)) { bitmapB.mark(indexB); } - transformedRows.add(new Row(newHeader, valuesJoin)); + transformedRows.add(joinedRow); - int flagAEqualNextA = RowUtils.compareRowsSortedByColumns(rowsA.get(indexA), rowsA.get(indexA + 1), outerJoin.getPrefixA(), outerJoin.getPrefixA(), joinColumns); - int flagBEqualNextB = RowUtils.compareRowsSortedByColumns(rowsB.get(indexB), rowsB.get(indexB + 1), outerJoin.getPrefixB(), outerJoin.getPrefixB(), joinColumns); + int flagAEqualNextA = RowUtils.compareRowsSortedByColumns(rowsA.get(indexA), rowsA.get(indexA + 1), outerJoin.getPrefixA(), outerJoin.getPrefixA(), joinColumnsA, joinColumnsA); + int flagBEqualNextB = RowUtils.compareRowsSortedByColumns(rowsB.get(indexB), rowsB.get(indexB + 1), outerJoin.getPrefixB(), outerJoin.getPrefixB(), joinColumnsB, joinColumnsB); if (flagBEqualNextB == 0) { indexB++; } else { diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/stream/HashInnerJoinLazyStream.java b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/stream/HashInnerJoinLazyStream.java index 506a435cf..6a742d5aa 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/stream/HashInnerJoinLazyStream.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/stream/HashInnerJoinLazyStream.java @@ -4,7 +4,7 @@ import cn.edu.tsinghua.iginx.engine.physical.exception.PhysicalException; import cn.edu.tsinghua.iginx.engine.physical.memory.execute.utils.FilterUtils; import cn.edu.tsinghua.iginx.engine.physical.memory.execute.utils.RowUtils; -import cn.edu.tsinghua.iginx.engine.shared.data.read.Field; +import cn.edu.tsinghua.iginx.engine.shared.data.Value; import cn.edu.tsinghua.iginx.engine.shared.data.read.Header; import cn.edu.tsinghua.iginx.engine.shared.data.read.Row; import cn.edu.tsinghua.iginx.engine.shared.data.read.RowStream; @@ -12,7 +12,9 @@ import cn.edu.tsinghua.iginx.engine.shared.operator.filter.Filter; import cn.edu.tsinghua.iginx.engine.shared.operator.filter.FilterType; import cn.edu.tsinghua.iginx.engine.shared.operator.filter.PathFilter; +import cn.edu.tsinghua.iginx.thrift.DataType; import cn.edu.tsinghua.iginx.utils.Pair; +import cn.edu.tsinghua.iginx.engine.shared.function.system.utils.ValueUtils; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -38,6 +40,8 @@ public class HashInnerJoinLazyStream extends BinaryLazyStream { private String joinColumnA; private String joinColumnB; + + private boolean needTypeCast = false; public HashInnerJoinLazyStream(InnerJoin innerJoin, RowStream streamA, RowStream streamB) { super(streamA, streamB); @@ -89,18 +93,28 @@ private void initialize() throws PhysicalException { } } this.index = headerB.indexOf(innerJoin.getPrefixB() + '.' + joinColumnB); + + int indexA = headerA.indexOf(innerJoin.getPrefixA() + '.' + joinColumnA); + DataType dataTypeA = headerA.getField(indexA).getType(); + DataType dataTypeB = headerB.getField(index).getType(); + if (ValueUtils.isNumericType(dataTypeA) && ValueUtils.isNumericType(dataTypeB)) { + this.needTypeCast = true; + } while (streamB.hasNext()) { Row rowB = streamB.next(); - Object value = rowB.getValue(innerJoin.getPrefixB() + '.' + joinColumnB); + Value value = rowB.getAsValue(innerJoin.getPrefixB() + '.' + joinColumnB); if (value == null) { continue; } + if (needTypeCast) { + value = ValueUtils.transformToDouble(value); + } int hash; - if (value instanceof byte[]) { - hash = Arrays.hashCode((byte[]) value); + if (value.getDataType() == DataType.BINARY) { + hash = Arrays.hashCode(value.getBinaryV()); } else { - hash = value.hashCode(); + hash = value.getValue().hashCode(); } List rows = streamBHashMap.getOrDefault(hash, new ArrayList<>()); rows.add(rowB); @@ -139,16 +153,18 @@ public boolean hasNext() throws PhysicalException { private void tryMatch() throws PhysicalException { Row rowA = streamA.next(); - Object value = rowA.getValue(innerJoin.getPrefixA() + '.' + joinColumnA); + Value value = rowA.getAsValue(innerJoin.getPrefixA() + '.' + joinColumnA); if (value == null) { return; } - + if (needTypeCast) { + value = ValueUtils.transformToDouble(value); + } int hash; - if (value instanceof byte[]) { - hash = Arrays.hashCode((byte[]) value); + if (value.getDataType() == DataType.BINARY) { + hash = Arrays.hashCode(value.getBinaryV()); } else { - hash = value.hashCode(); + hash = value.getValue().hashCode(); } if (streamBHashMap.containsKey(hash)) { diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/stream/HashOuterJoinLazyStream.java b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/stream/HashOuterJoinLazyStream.java index cea28213e..6f999343f 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/stream/HashOuterJoinLazyStream.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/stream/HashOuterJoinLazyStream.java @@ -4,14 +4,17 @@ import cn.edu.tsinghua.iginx.engine.physical.exception.PhysicalException; import cn.edu.tsinghua.iginx.engine.physical.memory.execute.utils.FilterUtils; import cn.edu.tsinghua.iginx.engine.physical.memory.execute.utils.RowUtils; +import cn.edu.tsinghua.iginx.engine.shared.data.Value; import cn.edu.tsinghua.iginx.engine.shared.data.read.Header; import cn.edu.tsinghua.iginx.engine.shared.data.read.Row; import cn.edu.tsinghua.iginx.engine.shared.data.read.RowStream; +import cn.edu.tsinghua.iginx.engine.shared.function.system.utils.ValueUtils; import cn.edu.tsinghua.iginx.engine.shared.operator.OuterJoin; import cn.edu.tsinghua.iginx.engine.shared.operator.filter.Filter; import cn.edu.tsinghua.iginx.engine.shared.operator.filter.FilterType; import cn.edu.tsinghua.iginx.engine.shared.operator.filter.PathFilter; import cn.edu.tsinghua.iginx.engine.shared.operator.type.OuterJoinType; +import cn.edu.tsinghua.iginx.thrift.DataType; import cn.edu.tsinghua.iginx.utils.Pair; import java.util.ArrayList; import java.util.Arrays; @@ -48,6 +51,8 @@ public class HashOuterJoinLazyStream extends BinaryLazyStream { private String joinColumnA; private String joinColumnB; + + private boolean needTypeCast = false; public HashOuterJoinLazyStream(OuterJoin outerJoin, RowStream streamA, RowStream streamB) { super(streamA, streamB); @@ -103,24 +108,35 @@ private void initialize() throws PhysicalException { } } + int indexAnother; if (outerJoinType == OuterJoinType.RIGHT) { this.index = headerA.indexOf(outerJoin.getPrefixA() + '.' + joinColumnA); + indexAnother = headerB.indexOf(outerJoin.getPrefixB() + '.' + joinColumnB); } else { this.index = headerB.indexOf(outerJoin.getPrefixB() + '.' + joinColumnB); + indexAnother = headerA.indexOf(outerJoin.getPrefixA() + '.' + joinColumnA); } + DataType dataType1 = headerA.getField(indexAnother).getType(); + DataType dataType2 = headerB.getField(index).getType(); + if (ValueUtils.isNumericType(dataType1) && ValueUtils.isNumericType(dataType2)) { + this.needTypeCast = true; + } while (streamB.hasNext()) { Row rowB = streamB.next(); - Object value = rowB.getValue(outerJoin.getPrefixB() + '.' + joinColumnB); + Value value = rowB.getAsValue(outerJoin.getPrefixB() + '.' + joinColumnB); if (value == null) { continue; } + if (needTypeCast) { + value = ValueUtils.transformToDouble(value); + } int hash; - if (value instanceof byte[]) { - hash = Arrays.hashCode((byte[]) value); + if (value.getDataType() == DataType.BINARY) { + hash = Arrays.hashCode(value.getBinaryV()); } else { - hash = value.hashCode(); + hash = value.getValue().hashCode(); } List rows = streamBHashMap.getOrDefault(hash, new ArrayList<>()); rows.add(rowB); @@ -203,16 +219,18 @@ public boolean hasNext() throws PhysicalException { private void tryMatch() throws PhysicalException { Row rowA = streamA.next(); - Object value = rowA.getValue(outerJoin.getPrefixA() + '.' + joinColumnA); + Value value = rowA.getAsValue(outerJoin.getPrefixA() + '.' + joinColumnA); if (value == null) { return; } - + if (needTypeCast) { + value = ValueUtils.transformToDouble(value); + } int hash; - if (value instanceof byte[]) { - hash = Arrays.hashCode((byte[]) value); + if (value.getDataType() == DataType.BINARY) { + hash = Arrays.hashCode(value.getBinaryV()); } else { - hash = value.hashCode(); + hash = value.getValue().hashCode(); } if (streamBHashMap.containsKey(hash)) { diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/stream/NestedLoopInnerJoinLazyStream.java b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/stream/NestedLoopInnerJoinLazyStream.java index 79458a679..00103739a 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/stream/NestedLoopInnerJoinLazyStream.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/stream/NestedLoopInnerJoinLazyStream.java @@ -4,16 +4,15 @@ import cn.edu.tsinghua.iginx.engine.physical.exception.PhysicalException; import cn.edu.tsinghua.iginx.engine.physical.memory.execute.utils.FilterUtils; import cn.edu.tsinghua.iginx.engine.physical.memory.execute.utils.RowUtils; -import cn.edu.tsinghua.iginx.engine.shared.data.read.Field; import cn.edu.tsinghua.iginx.engine.shared.data.read.Header; import cn.edu.tsinghua.iginx.engine.shared.data.read.Row; import cn.edu.tsinghua.iginx.engine.shared.data.read.RowStream; +import cn.edu.tsinghua.iginx.engine.shared.function.system.utils.ValueUtils; import cn.edu.tsinghua.iginx.engine.shared.operator.InnerJoin; import cn.edu.tsinghua.iginx.engine.shared.operator.filter.Filter; import cn.edu.tsinghua.iginx.utils.Pair; import java.util.ArrayList; import java.util.List; -import java.util.Objects; public class NestedLoopInnerJoinLazyStream extends BinaryLazyStream { @@ -131,7 +130,8 @@ private Row tryMatch() throws PhysicalException { } } else { // Join condition: natural or using for (String joinColumn : joinColumns) { - if (!Objects.equals(nextA.getValue(innerJoin.getPrefixA() + '.' + joinColumn), nextB.getValue(innerJoin.getPrefixB() + '.' + joinColumn))) { + if (ValueUtils.compare(nextA.getAsValue(innerJoin.getPrefixA() + '.' + joinColumn), + nextB.getAsValue(innerJoin.getPrefixB() + '.' + joinColumn)) != 0) { nextB = null; return null; } diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/stream/NestedLoopOuterJoinLazyStream.java b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/stream/NestedLoopOuterJoinLazyStream.java index 70b526406..c393daea9 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/stream/NestedLoopOuterJoinLazyStream.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/stream/NestedLoopOuterJoinLazyStream.java @@ -7,6 +7,7 @@ import cn.edu.tsinghua.iginx.engine.shared.data.read.Header; import cn.edu.tsinghua.iginx.engine.shared.data.read.Row; import cn.edu.tsinghua.iginx.engine.shared.data.read.RowStream; +import cn.edu.tsinghua.iginx.engine.shared.function.system.utils.ValueUtils; import cn.edu.tsinghua.iginx.engine.shared.operator.OuterJoin; import cn.edu.tsinghua.iginx.engine.shared.operator.filter.Filter; import cn.edu.tsinghua.iginx.engine.shared.operator.type.OuterJoinType; @@ -14,7 +15,6 @@ import java.util.ArrayList; import java.util.HashSet; import java.util.List; -import java.util.Objects; import java.util.Set; public class NestedLoopOuterJoinLazyStream extends BinaryLazyStream { @@ -205,7 +205,8 @@ private Row tryMatch() throws PhysicalException { } } else { // Join condition: natural or using for (String joinColumn : joinColumns) { - if (!Objects.equals(nextA.getValue(outerJoin.getPrefixA() + '.' + joinColumn), nextB.getValue(outerJoin.getPrefixB() + '.' + joinColumn))) { + if (ValueUtils.compare(nextA.getAsValue(outerJoin.getPrefixA() + '.' + joinColumn), + nextB.getAsValue(outerJoin.getPrefixB() + '.' + joinColumn)) != 0) { nextB = null; return null; } diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/stream/SortedMergeInnerJoinLazyStream.java b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/stream/SortedMergeInnerJoinLazyStream.java index 274b6f67c..7a52b8ae9 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/stream/SortedMergeInnerJoinLazyStream.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/stream/SortedMergeInnerJoinLazyStream.java @@ -4,22 +4,21 @@ import cn.edu.tsinghua.iginx.engine.physical.exception.PhysicalException; import cn.edu.tsinghua.iginx.engine.physical.memory.execute.utils.FilterUtils; import cn.edu.tsinghua.iginx.engine.physical.memory.execute.utils.RowUtils; -import cn.edu.tsinghua.iginx.engine.shared.data.read.Field; +import cn.edu.tsinghua.iginx.engine.shared.data.Value; import cn.edu.tsinghua.iginx.engine.shared.data.read.Header; import cn.edu.tsinghua.iginx.engine.shared.data.read.Row; import cn.edu.tsinghua.iginx.engine.shared.data.read.RowStream; +import cn.edu.tsinghua.iginx.engine.shared.function.system.utils.ValueUtils; import cn.edu.tsinghua.iginx.engine.shared.operator.InnerJoin; import cn.edu.tsinghua.iginx.engine.shared.operator.filter.Filter; import cn.edu.tsinghua.iginx.engine.shared.operator.filter.FilterType; import cn.edu.tsinghua.iginx.engine.shared.operator.filter.PathFilter; -import cn.edu.tsinghua.iginx.thrift.DataType; import cn.edu.tsinghua.iginx.utils.Pair; import java.util.ArrayList; import java.util.Collections; import java.util.Deque; import java.util.LinkedList; import java.util.List; -import java.util.Objects; /** * input two stream must be ascending order. @@ -36,15 +35,13 @@ public class SortedMergeInnerJoinLazyStream extends BinaryLazyStream { private String joinColumnB; - private DataType joinColumnDataType; - private Row nextA; private Row nextB; private int index; - private Object curJoinColumnBValue; // 当前StreamB中join列的值,用于同值join + private Value curJoinColumnBValue; // 当前StreamB中join列的值,用于同值join private final List sameValueStreamBRows; // StreamB中join列的值相同的列缓存 @@ -101,13 +98,6 @@ private void initialize() throws PhysicalException { } this.index = headerB.indexOf(innerJoin.getPrefixB() + '.' + joinColumnB); - DataType dataTypeA = headerA.getField(headerA.indexOf(innerJoin.getPrefixA() + "." + joinColumnA)).getType(); - DataType dataTypeB = headerA.getField(headerA.indexOf(innerJoin.getPrefixA() + "." + joinColumnA)).getType(); - if (!dataTypeA.equals(dataTypeB)) { - throw new InvalidOperatorParameterException("the datatype of join columns is different"); - } - joinColumnDataType = dataTypeA; - if (filter != null) { // Join condition: on this.header = RowUtils.constructNewHead(headerA, headerB, innerJoin.getPrefixA(), innerJoin.getPrefixB()); } else { // Join condition: natural or using @@ -135,8 +125,8 @@ public boolean hasNext() throws PhysicalException { } private void tryMatch() throws PhysicalException { - Object curJoinColumnAValue = nextA.getValue(innerJoin.getPrefixA() + "." + joinColumnA); - int cmp = RowUtils.compareObjects(joinColumnDataType, curJoinColumnAValue, curJoinColumnBValue); + Value curJoinColumnAValue = nextA.getAsValue(innerJoin.getPrefixA() + "." + joinColumnA); + int cmp = ValueUtils.compare(curJoinColumnAValue, curJoinColumnBValue); if (cmp < 0) { nextA = null; } else if (cmp > 0) { @@ -169,13 +159,13 @@ private boolean hasMoreRows() throws PhysicalException { } while (sameValueStreamBRows.isEmpty() && nextB != null) { sameValueStreamBRows.add(nextB); - curJoinColumnBValue = nextB.getValue(innerJoin.getPrefixB() + "." + joinColumnB); + curJoinColumnBValue = nextB.getAsValue(innerJoin.getPrefixB() + "." + joinColumnB); nextB = null; while (streamB.hasNext()) { nextB = streamB.next(); - Object joinColumnBValue = nextB.getValue(innerJoin.getPrefixB() + "." + joinColumnB); - if (Objects.equals(joinColumnBValue, curJoinColumnBValue)) { + Value joinColumnBValue = nextB.getAsValue(innerJoin.getPrefixB() + "." + joinColumnB); + if (ValueUtils.compare(joinColumnBValue, curJoinColumnBValue) == 0) { sameValueStreamBRows.add(nextB); nextB = null; } else { diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/stream/SortedMergeOuterJoinLazyStream.java b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/stream/SortedMergeOuterJoinLazyStream.java index 81641d01d..c2485fa03 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/stream/SortedMergeOuterJoinLazyStream.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/stream/SortedMergeOuterJoinLazyStream.java @@ -4,22 +4,22 @@ import cn.edu.tsinghua.iginx.engine.physical.exception.PhysicalException; import cn.edu.tsinghua.iginx.engine.physical.memory.execute.utils.FilterUtils; import cn.edu.tsinghua.iginx.engine.physical.memory.execute.utils.RowUtils; +import cn.edu.tsinghua.iginx.engine.shared.data.Value; import cn.edu.tsinghua.iginx.engine.shared.data.read.Header; import cn.edu.tsinghua.iginx.engine.shared.data.read.Row; import cn.edu.tsinghua.iginx.engine.shared.data.read.RowStream; +import cn.edu.tsinghua.iginx.engine.shared.function.system.utils.ValueUtils; import cn.edu.tsinghua.iginx.engine.shared.operator.OuterJoin; import cn.edu.tsinghua.iginx.engine.shared.operator.filter.Filter; import cn.edu.tsinghua.iginx.engine.shared.operator.filter.FilterType; import cn.edu.tsinghua.iginx.engine.shared.operator.filter.PathFilter; import cn.edu.tsinghua.iginx.engine.shared.operator.type.OuterJoinType; -import cn.edu.tsinghua.iginx.thrift.DataType; import cn.edu.tsinghua.iginx.utils.Pair; import java.util.ArrayList; import java.util.Collections; import java.util.Deque; import java.util.LinkedList; import java.util.List; -import java.util.Objects; public class SortedMergeOuterJoinLazyStream extends BinaryLazyStream { @@ -35,13 +35,11 @@ public class SortedMergeOuterJoinLazyStream extends BinaryLazyStream { private String joinColumnB; - private DataType joinColumnDataType; - private Row nextA; private Row nextB; - private Object curJoinColumnBValue; // 当前StreamB中join列的值,用于同值join + private Value curJoinColumnBValue; // 当前StreamB中join列的值,用于同值join private final List sameValueStreamBRows; // StreamB中join列的值相同的列缓存 @@ -113,13 +111,6 @@ private void initialize() throws PhysicalException { this.index = headerB.indexOf(outerJoin.getPrefixB() + '.' + joinColumnB); } - DataType dataTypeA = headerA.getField(headerA.indexOf(outerJoin.getPrefixA() + "." + joinColumnA)).getType(); - DataType dataTypeB = headerA.getField(headerA.indexOf(outerJoin.getPrefixA() + "." + joinColumnA)).getType(); - if (!dataTypeA.equals(dataTypeB)) { - throw new InvalidOperatorParameterException("the datatype of join columns is different"); - } - joinColumnDataType = dataTypeA; - if (filter != null) { // Join condition: on this.header = RowUtils.constructNewHead(headerA, headerB, outerJoin.getPrefixA(), outerJoin.getPrefixB()); } else { // Join condition: natural or using @@ -205,8 +196,8 @@ public boolean hasNext() throws PhysicalException { } private void tryMatch() throws PhysicalException { - Object curJoinColumnAValue = nextA.getValue(outerJoin.getPrefixA() + "." + joinColumnA); - int cmp = RowUtils.compareObjects(joinColumnDataType, curJoinColumnAValue, curJoinColumnBValue); + Value curJoinColumnAValue = nextA.getAsValue(outerJoin.getPrefixA() + "." + joinColumnA); + int cmp = ValueUtils.compare(curJoinColumnAValue, curJoinColumnBValue); if (cmp < 0) { unmatchedStreamARows.add(nextA); nextA = null; @@ -255,13 +246,13 @@ private boolean hasMoreRows() throws PhysicalException { } while (sameValueStreamBRows.isEmpty() && nextB != null) { sameValueStreamBRows.add(nextB); - curJoinColumnBValue = nextB.getValue(outerJoin.getPrefixB() + "." + joinColumnB); + curJoinColumnBValue = nextB.getAsValue(outerJoin.getPrefixB() + "." + joinColumnB); nextB = null; while (streamB.hasNext()) { nextB = streamB.next(); - Object joinColumnBValue = nextB.getValue(outerJoin.getPrefixB() + "." + joinColumnB); - if (Objects.equals(joinColumnBValue, curJoinColumnBValue)) { + Value joinColumnBValue = nextB.getAsValue(outerJoin.getPrefixB() + "." + joinColumnB); + if (ValueUtils.compare(joinColumnBValue, curJoinColumnBValue) == 0) { sameValueStreamBRows.add(nextB); nextB = null; } else { diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/utils/FilterUtils.java b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/utils/FilterUtils.java index cc675c931..8ebe844a4 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/utils/FilterUtils.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/utils/FilterUtils.java @@ -18,6 +18,7 @@ */ package cn.edu.tsinghua.iginx.engine.physical.memory.execute.utils; +import cn.edu.tsinghua.iginx.engine.physical.exception.PhysicalException; import cn.edu.tsinghua.iginx.engine.shared.data.Value; import cn.edu.tsinghua.iginx.engine.shared.data.read.Row; import cn.edu.tsinghua.iginx.engine.shared.function.system.utils.ValueUtils; @@ -29,7 +30,7 @@ public class FilterUtils { - public static boolean validate(Filter filter, Row row) { + public static boolean validate(Filter filter, Row row) throws PhysicalException { switch (filter.getType()) { case Or: OrFilter orFilter = (OrFilter) filter; @@ -89,7 +90,7 @@ private static boolean validateTimeFilter(KeyFilter keyFilter, Row row) { return false; } - private static boolean validateValueFilter(ValueFilter valueFilter, Row row) { + private static boolean validateValueFilter(ValueFilter valueFilter, Row row) throws PhysicalException { String path = valueFilter.getPath(); Value targetValue = valueFilter.getValue(); if (targetValue.isNull()) { // targetValue是空值,则认为不可比较 @@ -116,7 +117,7 @@ private static boolean validateValueFilter(ValueFilter valueFilter, Row row) { } } - private static boolean validatePathFilter(PathFilter pathFilter, Row row) { + private static boolean validatePathFilter(PathFilter pathFilter, Row row) throws PhysicalException { Value valueA = row.getAsValue(pathFilter.getPathA()); Value valueB = row.getAsValue(pathFilter.getPathB()); if (valueA == null || valueA.isNull() || valueB == null || valueB.isNull()) { // 如果任何一个是空值,则认为不可比较 @@ -125,7 +126,7 @@ private static boolean validatePathFilter(PathFilter pathFilter, Row row) { return validateValueCompare(pathFilter.getOp(), valueA, valueB); } - private static boolean validateValueCompare(Op op, Value valueA, Value valueB) { + private static boolean validateValueCompare(Op op, Value valueA, Value valueB) throws PhysicalException { if (valueA.getDataType() != valueB.getDataType()) { if (ValueUtils.isNumericType(valueA) && ValueUtils.isNumericType(valueB)) { valueA = ValueUtils.transformToDouble(valueA); diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/utils/RowUtils.java b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/utils/RowUtils.java index 7beb01a1d..d4df978b8 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/utils/RowUtils.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/utils/RowUtils.java @@ -24,6 +24,7 @@ import cn.edu.tsinghua.iginx.engine.shared.data.read.Field; import cn.edu.tsinghua.iginx.engine.shared.data.read.Header; import cn.edu.tsinghua.iginx.engine.shared.data.read.Row; +import cn.edu.tsinghua.iginx.engine.shared.function.system.utils.ValueUtils; import cn.edu.tsinghua.iginx.thrift.DataType; import cn.edu.tsinghua.iginx.utils.Pair; import java.util.ArrayList; @@ -54,12 +55,12 @@ public static Row transform(Row row, Header targetHeader) { * 2: descending sorted */ public static int checkRowsSortedByColumns(List rows, String prefix, - List columns) { + List columns) throws PhysicalException { int res = 0; int index = 0; while (index < rows.size() - 1) { int mark = compareRowsSortedByColumns(rows.get(index), rows.get(index + 1), prefix, - prefix, columns); + prefix, columns, columns); if (mark == -1) { if (res == 0) { res = 1; @@ -84,10 +85,12 @@ public static int checkRowsSortedByColumns(List rows, String prefix, * 1: row1 > row2 */ public static int compareRowsSortedByColumns(Row row1, Row row2, String prefix1, String prefix2, - List columns) { - for (String column : columns) { - Object value1 = row1.getValue(prefix1 + '.' + column); - Object value2 = row2.getValue(prefix2 + '.' + column); + List columns1, List columns2) throws PhysicalException { + assert columns1.size() == columns2.size(); + int size = columns1.size(); + for (int index = 0; index < size; index++) { + Object value1 = row1.getValue(prefix1 + '.' + columns1.get(index)); + Object value2 = row2.getValue(prefix2 + '.' + columns2.get(index)); if (value1 == null && value2 == null) { return 0; } else if (value1 == null) { @@ -95,9 +98,11 @@ public static int compareRowsSortedByColumns(Row row1, Row row2, String prefix1, } else if (value2 == null) { return 1; } - DataType dataType = row1.getField(row1.getHeader().indexOf(prefix1 + '.' + column)) + DataType dataType1 = row1.getField(row1.getHeader().indexOf(prefix1 + '.' + columns1.get(index))) .getType(); - int cmp = compareObjects(dataType, value1, value2); + DataType dataType2 = row2.getField(row2.getHeader().indexOf(prefix2 + '.' + columns2.get(index))) + .getType(); + int cmp = ValueUtils.compare(value1, value2, dataType1, dataType2); if (cmp != 0) { return cmp; } @@ -105,37 +110,6 @@ public static int compareRowsSortedByColumns(Row row1, Row row2, String prefix1, return 0; } - public static int compareObjects(DataType dataType, Object value1, Object value2) { - switch (dataType) { - case BOOLEAN: - boolean boolean1 = (boolean) value1; - boolean boolean2 = (boolean) value2; - return Boolean.compare(boolean1, boolean2); - case INTEGER: - int int1 = (int) value1; - int int2 = (int) value2; - return Integer.compare(int1, int2); - case LONG: - long long1 = (long) value1; - long long2 = (long) value2; - return Long.compare(long1, long2); - case FLOAT: - float float1 = (float) value1; - float float2 = (float) value2; - return Float.compare(float1, float2); - case DOUBLE: - double double1 = (double) value1; - double double2 = (double) value2; - return Double.compare(double1, double2); - case BINARY: - String string1 = (String) value1; - String string2 = (String) value2; - return string1.compareTo(string2); - default: - throw new IllegalArgumentException("unknown datatype: " + dataType); - } - } - public static Header constructNewHead(Header headerA, Header headerB, String prefixA, String prefixB) { List fields = new ArrayList<>(); diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/function/system/utils/ValueUtils.java b/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/function/system/utils/ValueUtils.java index a5d0f29d5..6edc76e20 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/function/system/utils/ValueUtils.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/function/system/utils/ValueUtils.java @@ -18,6 +18,8 @@ */ package cn.edu.tsinghua.iginx.engine.shared.function.system.utils; +import cn.edu.tsinghua.iginx.engine.physical.exception.InvalidOperatorParameterException; +import cn.edu.tsinghua.iginx.engine.physical.exception.PhysicalException; import cn.edu.tsinghua.iginx.engine.shared.data.Value; import cn.edu.tsinghua.iginx.thrift.DataType; @@ -35,6 +37,10 @@ public class ValueUtils { public static boolean isNumericType(Value value) { return numericTypeSet.contains(value.getDataType()); } + + public static boolean isNumericType(DataType dataType) { + return numericTypeSet.contains(dataType); + } public static Value transformToDouble(Value value) { DataType dataType = value.getDataType(); @@ -64,21 +70,30 @@ public static Value transformToDouble(Value value) { return new Value(DataType.DOUBLE, dVal); } - public static int compare(Value o1, Value o2) { - DataType dataType = o1.getDataType(); - switch (dataType) { + public static int compare(Value v1, Value v2) throws PhysicalException { + DataType dataType1 = v1.getDataType(); + DataType dataType2 = v2.getDataType(); + if (dataType1 != dataType2) { + if (numericTypeSet.contains(dataType1) && numericTypeSet.contains(dataType2)) { + v1 = transformToDouble(v1); + v2 = transformToDouble(v2); + } else { + throw new InvalidOperatorParameterException(dataType1.toString() + " and " + dataType2.toString() + " can't be compared"); + } + } + switch (dataType1) { case INTEGER: - return Integer.compare(o1.getIntV(), o2.getIntV()); + return Integer.compare(v1.getIntV(), v2.getIntV()); case LONG: - return Long.compare(o1.getLongV(), o2.getLongV()); + return Long.compare(v1.getLongV(), v2.getLongV()); case BOOLEAN: - return Boolean.compare(o1.getBoolV(), o2.getBoolV()); + return Boolean.compare(v1.getBoolV(), v2.getBoolV()); case FLOAT: - return Float.compare(o1.getFloatV(), o2.getFloatV()); + return Float.compare(v1.getFloatV(), v2.getFloatV()); case DOUBLE: - return Double.compare(o1.getDoubleV(), o2.getDoubleV()); + return Double.compare(v1.getDoubleV(), v2.getDoubleV()); case BINARY: - return o1.getBinaryVAsString().compareTo(o2.getBinaryVAsString()); + return v1.getBinaryVAsString().compareTo(v2.getBinaryVAsString()); } return 0; } @@ -112,6 +127,20 @@ public static int compare(Object o1, Object o2, DataType dataType) { } return 0; } + + public static int compare(Object o1, Object o2, DataType dataType1, DataType dataType2) throws PhysicalException { + if (dataType1 != dataType2) { + if (numericTypeSet.contains(dataType1) && numericTypeSet.contains(dataType2)) { + Value v1 = ValueUtils.transformToDouble(new Value(dataType1, o1)); + Value v2 = ValueUtils.transformToDouble(new Value(dataType2, o2)); + return compare(v1, v2); + } else { + throw new InvalidOperatorParameterException(dataType1.toString() + " and " + dataType2.toString() + " can't be compared"); + } + } else { + return compare(o1, o2, dataType1); + } + } public static String toString(Object value, DataType dataType) { switch (dataType) { diff --git a/core/src/test/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/AbstractOperatorMemoryExecutorTest.java b/core/src/test/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/AbstractOperatorMemoryExecutorTest.java index 316205988..b879ffa51 100644 --- a/core/src/test/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/AbstractOperatorMemoryExecutorTest.java +++ b/core/src/test/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/AbstractOperatorMemoryExecutorTest.java @@ -370,6 +370,8 @@ public void testInnerJoin() throws PhysicalException { RowStream stream = getExecutor().executeBinaryOperator(innerJoin, tableA, tableB); assertStreamEqual(stream, usingTarget); } + + } @Test @@ -929,7 +931,188 @@ public void testNaturalJoin() throws PhysicalException { assertStreamEqual(stream, rightTarget); } } - + + @Test + public void testJoinWithTypeCast() throws PhysicalException { + Table tableA = generateTableFromValues( + true, + Arrays.asList( + new Field("a.a", DataType.INTEGER), + new Field("a.b", DataType.DOUBLE), + new Field("a.c", DataType.BOOLEAN) + ), + Arrays.asList( + Arrays.asList(2, 2.0, true), + Arrays.asList(3, 3.0, false), + Arrays.asList(4, 4.0, true), + Arrays.asList(5, 5.0, false), + Arrays.asList(6, 6.0, true) + )); + + Table tableB = generateTableFromValues( + true, + Arrays.asList( + new Field("b.b", DataType.INTEGER), + new Field("b.d", DataType.DOUBLE), + new Field("b.e", DataType.BOOLEAN) + ), + Arrays.asList( + Arrays.asList(1, 1.0, true), + Arrays.asList(3, 3.0, false), + Arrays.asList(5, 5.0, true), + Arrays.asList(7, 7.0, false), + Arrays.asList(9, 9.0, true) + )); + + Table targetInner = generateTableFromValues( + false, + Arrays.asList( + new Field("a.key", DataType.LONG), + new Field("a.a", DataType.INTEGER), + new Field("a.b", DataType.DOUBLE), + new Field("a.c", DataType.BOOLEAN), + new Field("b.key", DataType.LONG), + new Field("b.b", DataType.INTEGER), + new Field("b.d", DataType.DOUBLE), + new Field("b.e", DataType.BOOLEAN) + ), + Arrays.asList( + Arrays.asList(2L, 3, 3.0, false, 2L, 3, 3.0, false), + Arrays.asList(4L, 5, 5.0, false, 3L, 5, 5.0, true) + )); + + Table usingTargetInner = generateTableFromValues( + false, + Arrays.asList( + new Field("a.key", DataType.LONG), + new Field("a.a", DataType.INTEGER), + new Field("a.b", DataType.DOUBLE), + new Field("a.c", DataType.BOOLEAN), + new Field("b.key", DataType.LONG), + new Field("b.d", DataType.DOUBLE), + new Field("b.e", DataType.BOOLEAN) + ), + Arrays.asList( + Arrays.asList(2L, 3, 3.0, false, 2L, 3.0, false), + Arrays.asList(4L, 5, 5.0, false, 3L, 5.0, true) + )); + + { + // NestedLoopJoin + tableA.reset(); + tableB.reset(); + targetInner.reset(); + + InnerJoin innerJoin = new InnerJoin( + EmptySource.EMPTY_SOURCE, + EmptySource.EMPTY_SOURCE, + "a", "b", + new PathFilter("a.b", Op.E, "b.b"), + Collections.emptyList(), + false, + JoinAlgType.NestedLoopJoin); + + RowStream stream = getExecutor().executeBinaryOperator(innerJoin, tableA, tableB); + assertStreamEqual(stream, targetInner); + } + + { + // HashJoin + tableA.reset(); + tableB.reset(); + targetInner.reset(); + + InnerJoin innerJoin = new InnerJoin( + EmptySource.EMPTY_SOURCE, + EmptySource.EMPTY_SOURCE, + "a", "b", + new PathFilter("a.b", Op.E, "b.b"), + Collections.emptyList(), + false, + JoinAlgType.HashJoin); + + RowStream stream = getExecutor().executeBinaryOperator(innerJoin, tableA, tableB); + assertStreamEqual(stream, targetInner); + } + + { + // SortedMergeJoin + tableA.reset(); + tableB.reset(); + targetInner.reset(); + + InnerJoin innerJoin = new InnerJoin( + EmptySource.EMPTY_SOURCE, + EmptySource.EMPTY_SOURCE, + "a", "b", + new PathFilter("a.b", Op.E, "b.b"), + Collections.emptyList(), + false, + JoinAlgType.SortedMergeJoin); + + RowStream stream = getExecutor().executeBinaryOperator(innerJoin, tableA, tableB); + assertStreamEqual(stream, targetInner); + } + + { + // NestedLoopJoin + tableA.reset(); + tableB.reset(); + usingTargetInner.reset(); + + InnerJoin innerJoin = new InnerJoin( + EmptySource.EMPTY_SOURCE, + EmptySource.EMPTY_SOURCE, + "a", "b", + null, + Collections.singletonList("b"), + false, + JoinAlgType.NestedLoopJoin); + + RowStream stream = getExecutor().executeBinaryOperator(innerJoin, tableA, tableB); + assertStreamEqual(stream, usingTargetInner); + } + + { + // HashJoin + tableA.reset(); + tableB.reset(); + usingTargetInner.reset(); + + InnerJoin innerJoin = new InnerJoin( + EmptySource.EMPTY_SOURCE, + EmptySource.EMPTY_SOURCE, + "a", "b", + null, + Collections.singletonList("b"), + false, + JoinAlgType.HashJoin); + + RowStream stream = getExecutor().executeBinaryOperator(innerJoin, tableA, tableB); + assertStreamEqual(stream, usingTargetInner); + } + + { + // SortedMergeJoin + tableA.reset(); + tableB.reset(); + usingTargetInner.reset(); + + InnerJoin innerJoin = new InnerJoin( + EmptySource.EMPTY_SOURCE, + EmptySource.EMPTY_SOURCE, + "a", "b", + null, + Collections.singletonList("b"), + false, + JoinAlgType.SortedMergeJoin); + + RowStream stream = getExecutor().executeBinaryOperator(innerJoin, tableA, tableB); + assertStreamEqual(stream, usingTargetInner); + } + + } + // for debug private Table transformToTable(RowStream stream) throws PhysicalException { if (stream instanceof Table) { From 23289b50266e915f54b088490033a7d00dedb37e Mon Sep 17 00:00:00 2001 From: Yuan Zi Date: Tue, 17 Jan 2023 11:19:26 +0800 Subject: [PATCH 3/4] add session v2 test it (#500) * add session v2 test it * fix Co-authored-by: zhuyuqing --- .github/workflows/api-session.yml | 57 +++++++++++++++++++ .../iginx/integration/SessionV2IT.java | 2 +- 2 files changed, 58 insertions(+), 1 deletion(-) create mode 100644 .github/workflows/api-session.yml diff --git a/.github/workflows/api-session.yml b/.github/workflows/api-session.yml new file mode 100644 index 000000000..e66bbfd3d --- /dev/null +++ b/.github/workflows/api-session.yml @@ -0,0 +1,57 @@ +name: "API-Test-SESSION-V2" + +on: + push: + branches: + - main + pull_request: + branches: + - main +env: + VERSION: 0.6.0-SNAPSHOT + +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + +jobs: + SessionV2-Test: + strategy: + fail-fast: false + #max-parallel: 20 + matrix: + java: [ 8 ] + python-version: [ "3.7" ] + os: [ ubuntu-latest, macos-latest ] + runs-on: ${{ matrix.os }} + steps: + - uses: actions/checkout@v2 + - name: Environmet Dependence + uses: ./.github/actions/dependence + with: + python-version: ${{ matrix.python-version }} + java: ${{ matrix.java }} + + - name: Run ZooKeeper + uses: ./.github/actions/zookeeperRunner + + - name: Run IoTDB + uses: ./.github/actions/iotdbRunner + with: + if-CapExp: "false" + version: iotdb11 + + - name: Install with Maven + run: mvn clean package -DskipTests + + - name: Start IginX + uses: ./.github/actions/iginxRunner + with: + version: ${VERSION} + + - name: A Lame Integration Test with Maven for IoTDB + run: mvn test -q -Dtest=SessionV2IT -DfailIfNoTests=false + - uses: codecov/codecov-action@v1 + with: + file: ./**/target/site/jacoco/jacoco.xml + name: codecov diff --git a/test/src/test/java/cn/edu/tsinghua/iginx/integration/SessionV2IT.java b/test/src/test/java/cn/edu/tsinghua/iginx/integration/SessionV2IT.java index cdf8f86d1..fa4840ae7 100644 --- a/test/src/test/java/cn/edu/tsinghua/iginx/integration/SessionV2IT.java +++ b/test/src/test/java/cn/edu/tsinghua/iginx/integration/SessionV2IT.java @@ -581,7 +581,7 @@ public void testMeasurementQuery() { @Test public void testClusterInfo() { - int expectedReplicaNum = 2; + int expectedReplicaNum = 1; assertEquals(expectedReplicaNum, clusterClient.getReplicaNum()); IginxInfo info = new IginxInfo(); From f897a7e5cbb3ca63cf3a10ae1ec83381dd187808 Mon Sep 17 00:00:00 2001 From: RemHero <80055724+RemHero@users.noreply.github.com> Date: Wed, 18 Jan 2023 10:07:13 +0800 Subject: [PATCH 4/4] Remove history data (#497) * Remove History Data Source * IT test * optimized code and fix IT * fix IT * replace the Constants.DUMMY with the FUNC * simplify the IT test * replace the isValid with valid * change the dummyID --- .github/workflows/capacity-expansion.yml | 6 +-- .../cn/edu/tsinghua/iginx/IginxWorker.java | 54 ++++++++++++++++++- .../logical/generator/QueryGenerator.java | 7 ++- .../optimizer/FilterFragmentOptimizer.java | 2 +- .../iginx/metadata/DefaultMetaManager.java | 32 +++++++++-- .../tsinghua/iginx/metadata/IMetaManager.java | 5 ++ .../metadata/cache/DefaultMetaCache.java | 37 ++++++++++++- .../iginx/metadata/cache/IMetaCache.java | 3 ++ .../iginx/metadata/entity/FragmentMeta.java | 10 ++++ .../metadata/entity/StorageEngineMeta.java | 15 ++++-- .../metadata/entity/StorageUnitMeta.java | 20 +++++++ .../iginx/metadata/storage/IMetaStorage.java | 2 + .../storage/etcd/ETCDMetaStorage.java | 18 +++++++ .../storage/zk/ZooKeeperMetaStorage.java | 24 +++++++++ .../edu/tsinghua/iginx/session/Session.java | 18 +++++++ .../IoTDBHistoryDataCapacityExpansionIT.java | 23 +++++++- thrift/src/main/proto/rpc.thrift | 7 +++ 17 files changed, 265 insertions(+), 18 deletions(-) diff --git a/.github/workflows/capacity-expansion.yml b/.github/workflows/capacity-expansion.yml index 8fdff4be4..96f273636 100644 --- a/.github/workflows/capacity-expansion.yml +++ b/.github/workflows/capacity-expansion.yml @@ -242,7 +242,7 @@ jobs: run: | mvn test -q -Dtest=InfluxDBHistoryDataCapacityExpansionIT#testAddSameDataPrefixWithDiffSchemaPrefix -DfailIfNoTests=false - DataPrefixWithMultiSchemaPrefix-Test-IotDB: + DataPrefixWithMultiSchemaPrefix_AND_RemoveHistoryDataSource-Test-IotDB: timeout-minutes: 20 strategy: fail-fast: false @@ -289,7 +289,7 @@ jobs: - name: data prefix IT run: | if [ "${{matrix.DB-name}}" == "iotdb11" ]; then - mvn test -q -Dtest=IoTDB11HistoryDataCapacityExpansionIT#testAddSameDataPrefixWithDiffSchemaPrefix -DfailIfNoTests=false + mvn test -q -Dtest=IoTDB11HistoryDataCapacityExpansionIT#testAddSameDataPrefixWithDiffSchemaPrefix_AND_testRemoveHistoryDataSource -DfailIfNoTests=false elif [ "${{matrix.DB-name}}" == "iotdb12" ]; then - mvn test -q -Dtest=IoTDB12HistoryDataCapacityExpansionIT#testAddSameDataPrefixWithDiffSchemaPrefix -DfailIfNoTests=false + mvn test -q -Dtest=IoTDB12HistoryDataCapacityExpansionIT#testAddSameDataPrefixWithDiffSchemaPrefix_AND_testRemoveHistoryDataSource -DfailIfNoTests=false fi \ No newline at end of file diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/IginxWorker.java b/core/src/main/java/cn/edu/tsinghua/iginx/IginxWorker.java index 80fb43b5c..d867a6c43 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/IginxWorker.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/IginxWorker.java @@ -28,6 +28,7 @@ import cn.edu.tsinghua.iginx.engine.physical.PhysicalEngineImpl; import cn.edu.tsinghua.iginx.engine.physical.storage.StorageManager; import cn.edu.tsinghua.iginx.engine.shared.RequestContext; +import cn.edu.tsinghua.iginx.exceptions.StatusCode; import cn.edu.tsinghua.iginx.metadata.DefaultMetaManager; import cn.edu.tsinghua.iginx.metadata.IMetaManager; import cn.edu.tsinghua.iginx.metadata.entity.*; @@ -183,6 +184,57 @@ public QueryDataResp queryData(QueryDataReq req) { return ctx.getResult().getQueryDataResp(); } + @Override + public Status removeHistoryDataSource(RemoveHistoryDataSourceReq req) { + if (!sessionManager.checkSession(req.getSessionId(), AuthType.Cluster)) { + return RpcUtils.ACCESS_DENY; + } + Status status = RpcUtils.SUCCESS; + long dummyStorageId = req.getDummyStorageId(); + StorageEngineMeta meta = metaManager.getStorageEngine(dummyStorageId); + if (meta == null || meta.getDummyFragment() == null || meta.getDummyStorageUnit() == null) { + status = new Status(StatusCode.STATEMENT_EXECUTION_ERROR.getStatusCode()); + status.setMessage("storage engine is not exists."); + return status; + } + try { + // 设置对应的 dummyFragament 为 invalid 状态 + meta.getDummyFragment().setIfValid(false); + meta.getDummyStorageUnit().setIfValid(false); + + // 修改需要更新的元数据信息 extraParams中的 has_data属性需要修改 + StorageEngineMeta newMeta = new StorageEngineMeta( + meta.getId(), + meta.getIp(), + meta.getPort(), + false, + null, + null, + meta.isReadOnly(), + null, + null, + meta.getExtraParams(), + meta.getStorageEngine(), + meta.getStorageUnitList(), + meta.getCreatedBy(), + meta.isNeedReAllocate() + ); + + // 更新 zk 上元数据信息,以及 iginx 上元数据信息 + if (!metaManager.updateStorageEngine(dummyStorageId, newMeta)) { + status = RpcUtils.FAILURE; + status.setMessage("unexpected error during storage update"); + } + + return status; + } catch (Exception e) { + logger.error("unexpected error during storage migration: ", e); + status = new Status(StatusCode.STATEMENT_EXECUTION_ERROR.getStatusCode()); + status.setMessage("unexpected error during removing history data source: " + e.getMessage()); + return status; + } + } + @Override public Status addStorageEngines(AddStorageEnginesReq req) { if (!sessionManager.checkSession(req.getSessionId(), AuthType.Cluster)) { @@ -245,7 +297,7 @@ public Status addStorageEngines(AddStorageEnginesReq req) { int index = 0; if (meta.isHasData()) { String dataPrefix = meta.getDataPrefix(); - StorageUnitMeta dummyStorageUnit = new StorageUnitMeta(Constants.DUMMY + String.format("%04d", 0), -1); + StorageUnitMeta dummyStorageUnit = new StorageUnitMeta(StorageUnitMeta.generateDummyStorageUnitID(0), -1); Pair boundary = StorageManager.getBoundaryOfStorage(meta, dataPrefix); FragmentMeta dummyFragment; String schemaPrefixTmp = null; diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/engine/logical/generator/QueryGenerator.java b/core/src/main/java/cn/edu/tsinghua/iginx/engine/logical/generator/QueryGenerator.java index f667e9beb..7478eba61 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/engine/logical/generator/QueryGenerator.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/engine/logical/generator/QueryGenerator.java @@ -327,8 +327,11 @@ private Operator mergeRawData(Map> fragments, L Operator operator = OperatorUtils.unionOperators(unionList); if (!dummyFragments.isEmpty()) { List joinList = new ArrayList<>(); - dummyFragments.forEach(meta -> joinList.add(new Project(new FragmentSource(meta), - pathMatchPrefix(pathList,meta.getTsInterval().getTimeSeries(), meta.getTsInterval().getSchemaPrefix()), tagFilter))); + dummyFragments.forEach(meta -> { + if (meta.isValid()) + joinList.add(new Project(new FragmentSource(meta), + pathMatchPrefix(pathList,meta.getTsInterval().getTimeSeries(), meta.getTsInterval().getSchemaPrefix()), tagFilter)); + }); joinList.add(operator); operator = OperatorUtils.joinOperatorsByTime(joinList); } diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/engine/logical/optimizer/FilterFragmentOptimizer.java b/core/src/main/java/cn/edu/tsinghua/iginx/engine/logical/optimizer/FilterFragmentOptimizer.java index 26142e299..4f90f7716 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/engine/logical/optimizer/FilterFragmentOptimizer.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/engine/logical/optimizer/FilterFragmentOptimizer.java @@ -104,7 +104,7 @@ private void filterFragmentByTimeRange(Select selectOperator) { if (!dummyFragments.isEmpty()) { List joinList = new ArrayList<>(); dummyFragments.forEach(meta -> { - if (hasTimeRangeOverlap(meta, timeRanges)) { + if (meta.isValid() && hasTimeRangeOverlap(meta, timeRanges)) { joinList.add(new Project(new FragmentSource(meta), pathList, selectOperator.getTagFilter())); } }); diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/metadata/DefaultMetaManager.java b/core/src/main/java/cn/edu/tsinghua/iginx/metadata/DefaultMetaManager.java index 794bbd2a6..c3f72c0ae 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/metadata/DefaultMetaManager.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/metadata/DefaultMetaManager.java @@ -22,6 +22,7 @@ import cn.edu.tsinghua.iginx.conf.Constants; import cn.edu.tsinghua.iginx.engine.physical.storage.StorageManager; import cn.edu.tsinghua.iginx.exceptions.MetaStorageException; +import cn.edu.tsinghua.iginx.exceptions.StatusCode; import cn.edu.tsinghua.iginx.metadata.cache.DefaultMetaCache; import cn.edu.tsinghua.iginx.metadata.cache.IMetaCache; import cn.edu.tsinghua.iginx.metadata.entity.*; @@ -33,6 +34,7 @@ import cn.edu.tsinghua.iginx.policy.simple.TimeSeriesCalDO; import cn.edu.tsinghua.iginx.sql.statement.InsertStatement; import cn.edu.tsinghua.iginx.thrift.AuthType; +import cn.edu.tsinghua.iginx.thrift.Status; import cn.edu.tsinghua.iginx.thrift.UserType; import cn.edu.tsinghua.iginx.utils.Pair; import cn.edu.tsinghua.iginx.utils.SnowFlakeUtils; @@ -149,7 +151,7 @@ private void initStorageEngine() throws MetaStorageException { if (storageEngine.isHasData()) { StorageUnitMeta dummyStorageUnit = storageEngine.getDummyStorageUnit(); dummyStorageUnit.setStorageEngineId(id); - dummyStorageUnit.setId(String.format(Constants.DUMMY + "%04d", (int) id)); + dummyStorageUnit.setId(StorageUnitMeta.generateDummyStorageUnitID(id)); dummyStorageUnit.setMasterId(dummyStorageUnit.getId()); FragmentMeta dummyFragment = storageEngine.getDummyFragment(); dummyFragment.setMasterStorageUnit(dummyStorageUnit); @@ -318,7 +320,7 @@ public boolean addStorageEngines(List storageEngineMetas) { if (storageEngineMeta.isHasData()) { StorageUnitMeta dummyStorageUnit = storageEngineMeta.getDummyStorageUnit(); dummyStorageUnit.setStorageEngineId(id); - dummyStorageUnit.setId(String.format(Constants.DUMMY + "%04d", (int) id)); + dummyStorageUnit.setId(StorageUnitMeta.generateDummyStorageUnitID(id)); dummyStorageUnit.setMasterId(dummyStorageUnit.getId()); FragmentMeta dummyFragment = storageEngineMeta.getDummyFragment(); dummyFragment.setMasterStorageUnit(dummyStorageUnit); @@ -333,6 +335,30 @@ public boolean addStorageEngines(List storageEngineMetas) { return false; } + @Override + public boolean updateStorageEngine(long storageID, StorageEngineMeta storageEngineMeta) { + if (getStorageEngine(storageID) == null) { + return false; + } + try { + storageEngineMeta.setId(storageID); + storage.updateStorageEngine(storageID, storageEngineMeta); // 如果删除成功,则后续更新对应的 dummyFragament 的元数据 + if (storageEngineMeta.isHasData()) { // 确保内部数据的一致性 + StorageUnitMeta dummyStorageUnit = storageEngineMeta.getDummyStorageUnit(); + dummyStorageUnit.setStorageEngineId(storageID); + dummyStorageUnit.setId(StorageUnitMeta.generateDummyStorageUnitID(storageID)); + dummyStorageUnit.setMasterId(dummyStorageUnit.getId()); + FragmentMeta dummyFragment = storageEngineMeta.getDummyFragment(); + dummyFragment.setMasterStorageUnit(dummyStorageUnit); + dummyFragment.setMasterStorageUnitId(dummyStorageUnit.getId()); + } + return cache.updateStorageEngine(storageID, storageEngineMeta); + } catch (MetaStorageException e) { + logger.error("update storage engines error:", e); + } + return false; + } + @Override public List getStorageEngineList() { return new ArrayList<>(cache.getStorageEngineList()); @@ -1049,7 +1075,7 @@ private List resolveStorageEngineFromConf() { boolean readOnly = Boolean.parseBoolean(extraParams.getOrDefault(Constants.IS_READ_ONLY, "false")); StorageEngineMeta storage = new StorageEngineMeta(i, ip, port, hasData, dataPrefix, readOnly, extraParams, storageEngine, id); if (hasData) { - StorageUnitMeta dummyStorageUnit = new StorageUnitMeta(Constants.DUMMY + String.format("%04d", i), i); + StorageUnitMeta dummyStorageUnit = new StorageUnitMeta(StorageUnitMeta.generateDummyStorageUnitID(i), i); Pair boundary = StorageManager.getBoundaryOfStorage(storage); FragmentMeta dummyFragment; if (dataPrefix == null) { diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/metadata/IMetaManager.java b/core/src/main/java/cn/edu/tsinghua/iginx/metadata/IMetaManager.java index 5b57227ff..e9aabff5a 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/metadata/IMetaManager.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/metadata/IMetaManager.java @@ -39,6 +39,11 @@ public interface IMetaManager { */ boolean addStorageEngines(List storageEngineMetas); + /** + * 更新存储引擎节点 + */ + boolean updateStorageEngine(long storageID, StorageEngineMeta storageEngineMeta); + /** * 获取所有的存储引擎实例的原信息(包括每个存储引擎的存储单元列表) */ diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/metadata/cache/DefaultMetaCache.java b/core/src/main/java/cn/edu/tsinghua/iginx/metadata/cache/DefaultMetaCache.java index cb85baa51..cc2bb160b 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/metadata/cache/DefaultMetaCache.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/metadata/cache/DefaultMetaCache.java @@ -20,6 +20,7 @@ import cn.edu.tsinghua.iginx.conf.Config; import cn.edu.tsinghua.iginx.conf.ConfigDescriptor; +import cn.edu.tsinghua.iginx.conf.Constants; import cn.edu.tsinghua.iginx.engine.shared.data.write.*; import cn.edu.tsinghua.iginx.metadata.entity.*; import cn.edu.tsinghua.iginx.policy.simple.TimeSeriesCalDO; @@ -356,7 +357,7 @@ public List getDummyFragmentsByTimeSeriesInterval(TimeSeriesRange fragmentLock.readLock().lock(); List results = new ArrayList<>(); for (FragmentMeta fragmentMeta: dummyFragments) { - if (fragmentMeta.getTsInterval().isIntersect(tsInterval)) { + if (fragmentMeta.isValid() && fragmentMeta.getTsInterval().isIntersect(tsInterval)) { results.add(fragmentMeta); } } @@ -403,7 +404,7 @@ public List getDummyFragmentsByTimeSeriesIntervalAndTimeInterval(T fragmentLock.readLock().lock(); List results = new ArrayList<>(); for (FragmentMeta fragmentMeta: dummyFragments) { - if (fragmentMeta.getTsInterval().isIntersect(tsInterval) && fragmentMeta.getTimeInterval().isIntersect(timeInterval)) { + if (fragmentMeta.isValid() && fragmentMeta.getTsInterval().isIntersect(tsInterval) && fragmentMeta.getTimeInterval().isIntersect(timeInterval)) { results.add(fragmentMeta); } } @@ -568,6 +569,38 @@ public void addStorageEngine(StorageEngineMeta storageEngineMeta) { storageUnitLock.writeLock().unlock(); } + @Override + public boolean updateStorageEngine(long storageID, StorageEngineMeta storageEngineMeta) { + storageUnitLock.writeLock().lock(); + fragmentLock.writeLock().lock(); + + if (!storageEngineMetaMap.containsKey(storageID)) { + logger.error("No corresponding storage engine needs to be updated"); + return false; + } + String dummyStorageUnitID = StorageUnitMeta.generateDummyStorageUnitID(storageID); + boolean ifOriHasData = storageEngineMetaMap.get(storageID).isHasData(); + if (storageEngineMeta.isHasData()) { // 设置相关元数据信息 + StorageUnitMeta dummyStorageUnit = storageEngineMeta.getDummyStorageUnit(); + FragmentMeta dummyFragment = storageEngineMeta.getDummyFragment(); + dummyFragment.setMasterStorageUnit(dummyStorageUnit); + dummyStorageUnitMetaMap.put(dummyStorageUnit.getId(), dummyStorageUnit); + if (ifOriHasData) { // 更新 dummyFragments 数据 + dummyFragments.removeIf(e -> e.getMasterStorageUnitId().equals(dummyStorageUnitID)); + } else { + dummyFragments.add(dummyFragment); + } + } else if (ifOriHasData) { // 原来没有,则移除 + dummyFragments.removeIf(e -> e.getMasterStorageUnitId().equals(dummyStorageUnitID)); + dummyStorageUnitMetaMap.remove(dummyStorageUnitID); + } + storageEngineMetaMap.put(storageEngineMeta.getId(), storageEngineMeta); + + fragmentLock.writeLock().unlock(); + storageUnitLock.writeLock().unlock(); + return true; + } + @Override public List getStorageEngineList() { return new ArrayList<>(this.storageEngineMetaMap.values()); diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/metadata/cache/IMetaCache.java b/core/src/main/java/cn/edu/tsinghua/iginx/metadata/cache/IMetaCache.java index 41a339257..2ea72bdba 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/metadata/cache/IMetaCache.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/metadata/cache/IMetaCache.java @@ -90,6 +90,9 @@ public interface IMetaCache { // 数据后端相关的缓存读写接口 void addStorageEngine(StorageEngineMeta storageEngineMeta); + // 更新对应节点的元数据信息。如果对应节点的 dummy 元数据被移除,则需要删除相应的 dummy 元数据信息 + boolean updateStorageEngine(long storageID, StorageEngineMeta storageEngineMeta); + List getStorageEngineList(); StorageEngineMeta getStorageEngine(long id); diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/metadata/entity/FragmentMeta.java b/core/src/main/java/cn/edu/tsinghua/iginx/metadata/entity/FragmentMeta.java index 5f1c8f215..f56ce50b4 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/metadata/entity/FragmentMeta.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/metadata/entity/FragmentMeta.java @@ -58,6 +58,8 @@ public final class FragmentMeta { private boolean dummyFragment = false; + private boolean valid = true; + public FragmentMeta(String startPrefix, String endPrefix, long startTime, long endTime) { this.timeInterval = new TimeInterval(startTime, endTime); this.tsInterval = new TimeSeriesInterval(startPrefix, endPrefix); @@ -159,6 +161,14 @@ public void setMasterStorageUnitId(String masterStorageUnitId) { this.masterStorageUnitId = masterStorageUnitId; } + public boolean isValid() { + return valid; + } + + public void setIfValid(boolean ifValid) { + this.valid = ifValid; + } + @Override public String toString() { return "FragmentMeta{" + diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/metadata/entity/StorageEngineMeta.java b/core/src/main/java/cn/edu/tsinghua/iginx/metadata/entity/StorageEngineMeta.java index 96df9eb5f..6368c6221 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/metadata/entity/StorageEngineMeta.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/metadata/entity/StorageEngineMeta.java @@ -22,7 +22,7 @@ import java.util.List; import java.util.Map; -public final class StorageEngineMeta { +public final class StorageEngineMeta implements Cloneable { /** * 数据库的 id @@ -39,13 +39,13 @@ public final class StorageEngineMeta { */ private final int port; - private final boolean hasData; - - private final String dataPrefix; + private final boolean readOnly; private final String schemaPrefix; - private final boolean readOnly; + private final String dataPrefix; + + private final boolean hasData; private StorageUnitMeta dummyStorageUnit; @@ -213,6 +213,11 @@ public void setNeedReAllocate(boolean needReAllocate) { this.needReAllocate = needReAllocate; } + @Override + public StorageEngineMeta clone() throws CloneNotSupportedException { + return (StorageEngineMeta)super.clone(); + } + @Override public String toString() { return "StorageEngineMeta {" + diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/metadata/entity/StorageUnitMeta.java b/core/src/main/java/cn/edu/tsinghua/iginx/metadata/entity/StorageUnitMeta.java index 3ca54cc1b..905469e72 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/metadata/entity/StorageUnitMeta.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/metadata/entity/StorageUnitMeta.java @@ -18,6 +18,8 @@ */ package cn.edu.tsinghua.iginx.metadata.entity; +import cn.edu.tsinghua.iginx.conf.Constants; + import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -39,6 +41,8 @@ public final class StorageUnitMeta { private boolean dummy = false; + private boolean ifValid = true; + private transient List replicas = new ArrayList<>(); public StorageUnitMeta(String id, long storageEngineId, String masterId, boolean isMaster) { @@ -189,4 +193,20 @@ public void setStorageEngineId(long storageEngineId) { public boolean isDummy() { return dummy; } + + public void setDummy(boolean dummy) { + this.dummy = dummy; + } + + public boolean isIfValid() { + return ifValid; + } + + public void setIfValid(boolean ifValid) { + this.ifValid = ifValid; + } + + public static String generateDummyStorageUnitID(long id) { + return String.format(Constants.DUMMY + "%010d", (int)id); + } } diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/metadata/storage/IMetaStorage.java b/core/src/main/java/cn/edu/tsinghua/iginx/metadata/storage/IMetaStorage.java index e5ba2e330..be5a342fd 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/metadata/storage/IMetaStorage.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/metadata/storage/IMetaStorage.java @@ -43,6 +43,8 @@ public interface IMetaStorage { long addStorageEngine(StorageEngineMeta storageEngine) throws MetaStorageException; + boolean updateStorageEngine(long storageID, StorageEngineMeta storageEngine) throws MetaStorageException; + void registerStorageChangeHook(StorageChangeHook hook); Map loadStorageUnit() throws MetaStorageException; diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/metadata/storage/etcd/ETCDMetaStorage.java b/core/src/main/java/cn/edu/tsinghua/iginx/metadata/storage/etcd/ETCDMetaStorage.java index f0114c48b..e3ee8581b 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/metadata/storage/etcd/ETCDMetaStorage.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/metadata/storage/etcd/ETCDMetaStorage.java @@ -579,6 +579,24 @@ public long addStorageEngine(StorageEngineMeta storageEngine) throws MetaStorage return 0L; } + @Override + public boolean updateStorageEngine(long storageID, StorageEngineMeta storageEngine) throws MetaStorageException { + try { + lockStorage(); + this.client.getKVClient() + .put(ByteSequence.from((STORAGE_PREFIX + String.format("%06d", storageID)).getBytes()), + ByteSequence.from(JsonUtils.toJson(storageEngine))).get(); + } catch (ExecutionException | InterruptedException e) { + logger.error("got error when add storage: ", e); + throw new MetaStorageException(e); + } finally { + if (storageLease != -1) { + releaseStorage(); + } + } + return true; + } + @Override public void registerStorageChangeHook(StorageChangeHook hook) { this.storageChangeHook = hook; diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/metadata/storage/zk/ZooKeeperMetaStorage.java b/core/src/main/java/cn/edu/tsinghua/iginx/metadata/storage/zk/ZooKeeperMetaStorage.java index 2431b44d4..ee905d8d1 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/metadata/storage/zk/ZooKeeperMetaStorage.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/metadata/storage/zk/ZooKeeperMetaStorage.java @@ -19,6 +19,7 @@ package cn.edu.tsinghua.iginx.metadata.storage.zk; import cn.edu.tsinghua.iginx.conf.ConfigDescriptor; +import cn.edu.tsinghua.iginx.conf.Constants; import cn.edu.tsinghua.iginx.exceptions.MetaStorageException; import cn.edu.tsinghua.iginx.metadata.entity.*; import cn.edu.tsinghua.iginx.metadata.hook.*; @@ -119,6 +120,8 @@ public class ZooKeeperMetaStorage implements IMetaStorage { private boolean isMaster = false; + private final int STORAGE_ENGINE_NODE_NUM_LENGTH = 10; + private static ZooKeeperMetaStorage INSTANCE = null; private final CuratorFramework client; @@ -461,6 +464,27 @@ public long addStorageEngine(StorageEngineMeta storageEngine) throws MetaStorage } } + @Override + public boolean updateStorageEngine(long storageID, StorageEngineMeta storageEngine) throws MetaStorageException { + InterProcessMutex mutex = new InterProcessMutex(this.client, STORAGE_ENGINE_LOCK_NODE); + try { // node0000000002 STORAGE_ENGINE_NODE_NUM_LENGTH + mutex.acquire(); + String tmp = new String(JsonUtils.toJson(storageEngine)); + String nodeName = String.format(STORAGE_ENGINE_NODE + "%0" + STORAGE_ENGINE_NODE_NUM_LENGTH + "d", (int) storageID); + this.client.setData() + .forPath(nodeName, tmp.getBytes()); + return true; + } catch (Exception e) { + throw new MetaStorageException("get error when update storage engine", e); + } finally { + try { + mutex.release(); + } catch (Exception e) { + throw new MetaStorageException("get error when release interprocess lock for " + SCHEMA_MAPPING_LOCK_NODE, e); + } + } + } + private void registerStorageEngineListener() throws Exception { this.storageEngineCache = new TreeCache(this.client, STORAGE_ENGINE_NODE_PREFIX); TreeCacheListener listener = (curatorFramework, event) -> { diff --git a/session/src/main/java/cn/edu/tsinghua/iginx/session/Session.java b/session/src/main/java/cn/edu/tsinghua/iginx/session/Session.java index 9d401a877..7c1c44f73 100644 --- a/session/src/main/java/cn/edu/tsinghua/iginx/session/Session.java +++ b/session/src/main/java/cn/edu/tsinghua/iginx/session/Session.java @@ -1259,4 +1259,22 @@ public CurveMatchResult curveMatch(List paths, long startTime, long endT } return new CurveMatchResult(resp.getMatchedTimestamp(), resp.getMatchedPath()); } + + public void removeHistoryDataSource(long id) throws SessionException, ExecutionException { + RemoveHistoryDataSourceReq req = new RemoveHistoryDataSourceReq(sessionId, id); + try { + Status status; + do { + lock.readLock().lock(); + try { + status = client.removeHistoryDataSource(req); + } finally { + lock.readLock().unlock(); + } + } while(checkRedirect(status)); + RpcUtils.verifySuccess(status); + } catch (TException e) { + throw new SessionException(e); + } + } } diff --git a/test/src/test/java/cn/edu/tsinghua/iginx/integration/expansion/iotdb/IoTDBHistoryDataCapacityExpansionIT.java b/test/src/test/java/cn/edu/tsinghua/iginx/integration/expansion/iotdb/IoTDBHistoryDataCapacityExpansionIT.java index 47b3eb150..d379eefa9 100644 --- a/test/src/test/java/cn/edu/tsinghua/iginx/integration/expansion/iotdb/IoTDBHistoryDataCapacityExpansionIT.java +++ b/test/src/test/java/cn/edu/tsinghua/iginx/integration/expansion/iotdb/IoTDBHistoryDataCapacityExpansionIT.java @@ -494,7 +494,7 @@ public void testDataPrefix() throws Exception { } @Test - public void testAddSameDataPrefixWithDiffSchemaPrefix() throws Exception { + public void testAddSameDataPrefixWithDiffSchemaPrefix_AND_testRemoveHistoryDataSource() throws Exception { session.executeSql("ADD STORAGEENGINE (\"127.0.0.1\", 6668, \"" + ENGINE_TYPE + "\", \"username:root, password:root, sessionPoolSize:20, has_data:true, data_prefix:test, schema_prefix:p1, is_read_only:true\");"); session.executeSql("ADD STORAGEENGINE (\"127.0.0.1\", 6668, \"" + ENGINE_TYPE + "\", \"username:root, password:root, sessionPoolSize:20, has_data:true, data_prefix:test, schema_prefix:p2, is_read_only:true\");"); @@ -528,5 +528,26 @@ public void testAddSameDataPrefixWithDiffSchemaPrefix() throws Exception { "+---+\n" + "Empty set.\n"; SQLTestTools.executeAndCompare(session, statement, expect); + + session.executeSql("ADD STORAGEENGINE (\"127.0.0.1\", 6668, \"" + ENGINE_TYPE + "\", \"username:root, password:root, sessionPoolSize:20, has_data:true, data_prefix:test, is_read_only:false\");"); + statement = "select * from test"; + expect = "ResultSets:\n" + + "+---+---------------------+--------------------------+\n" + + "|key|test.wf03.wt01.status|test.wf03.wt01.temperature|\n" + + "+---+---------------------+--------------------------+\n" + + "| 77| true| null|\n" + + "|200| false| 77.71|\n" + + "+---+---------------------+--------------------------+\n" + + "Total line number = 2\n"; + SQLTestTools.executeAndCompare(session, statement, expect); + session.removeHistoryDataSource(3); + statement = "select * from test"; + expect = "ResultSets:\n" + + "+---+\n" + + "|key|\n" + + "+---+\n" + + "+---+\n" + + "Empty set.\n"; + SQLTestTools.executeAndCompare(session, statement, expect); } } diff --git a/thrift/src/main/proto/rpc.thrift b/thrift/src/main/proto/rpc.thrift index 5c535cef1..5bafd8f44 100644 --- a/thrift/src/main/proto/rpc.thrift +++ b/thrift/src/main/proto/rpc.thrift @@ -581,6 +581,11 @@ struct DebugInfoResp { 2: optional binary payload } +struct RemoveHistoryDataSourceReq { + 1: required i64 sessionId + 2: required i64 dummyStorageId +} + service IService { OpenSessionResp openSession(1: OpenSessionReq req); @@ -603,6 +608,8 @@ service IService { Status addStorageEngines(1: AddStorageEnginesReq req); + Status removeHistoryDataSource(1: RemoveHistoryDataSourceReq req); + AggregateQueryResp aggregateQuery(1: AggregateQueryReq req); LastQueryResp lastQuery(1: LastQueryReq req);