Skip to content

Commit

Permalink
Adjust join (#496)
Browse files Browse the repository at this point in the history
* adjust join
1.result set add key columns
2.fix some bugs

* adjust join

* fix a bug

Fix a bug about handling filter in sorted merge join.

* fix some bugs

1. handling filter in sorted merge join.
2. value comparison of different datatype

* fix some bugs

handle different types of data comparison

* Update NaiveOperatorMemoryExecutor.java

* adjust join stream

Co-authored-by: Jensen Wang <[email protected]>
Co-authored-by: zhuyuqing <[email protected]>
  • Loading branch information
3 people authored Jan 13, 2023
1 parent 016843a commit 303496c
Show file tree
Hide file tree
Showing 11 changed files with 462 additions and 277 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,17 @@
import cn.edu.tsinghua.iginx.engine.physical.exception.PhysicalException;
import cn.edu.tsinghua.iginx.engine.physical.memory.execute.utils.FilterUtils;
import cn.edu.tsinghua.iginx.engine.physical.memory.execute.utils.RowUtils;
import cn.edu.tsinghua.iginx.engine.shared.data.read.Field;
import cn.edu.tsinghua.iginx.engine.shared.data.Value;
import cn.edu.tsinghua.iginx.engine.shared.data.read.Header;
import cn.edu.tsinghua.iginx.engine.shared.data.read.Row;
import cn.edu.tsinghua.iginx.engine.shared.data.read.RowStream;
import cn.edu.tsinghua.iginx.engine.shared.operator.InnerJoin;
import cn.edu.tsinghua.iginx.engine.shared.operator.filter.Filter;
import cn.edu.tsinghua.iginx.engine.shared.operator.filter.FilterType;
import cn.edu.tsinghua.iginx.engine.shared.operator.filter.PathFilter;
import cn.edu.tsinghua.iginx.thrift.DataType;
import cn.edu.tsinghua.iginx.utils.Pair;
import cn.edu.tsinghua.iginx.engine.shared.function.system.utils.ValueUtils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand All @@ -38,6 +40,8 @@ public class HashInnerJoinLazyStream extends BinaryLazyStream {
private String joinColumnA;

private String joinColumnB;

private boolean needTypeCast = false;

public HashInnerJoinLazyStream(InnerJoin innerJoin, RowStream streamA, RowStream streamB) {
super(streamA, streamB);
Expand Down Expand Up @@ -89,18 +93,28 @@ private void initialize() throws PhysicalException {
}
}
this.index = headerB.indexOf(innerJoin.getPrefixB() + '.' + joinColumnB);

int indexA = headerA.indexOf(innerJoin.getPrefixA() + '.' + joinColumnA);
DataType dataTypeA = headerA.getField(indexA).getType();
DataType dataTypeB = headerB.getField(index).getType();
if (ValueUtils.isNumericType(dataTypeA) && ValueUtils.isNumericType(dataTypeB)) {
this.needTypeCast = true;
}

while (streamB.hasNext()) {
Row rowB = streamB.next();
Object value = rowB.getValue(innerJoin.getPrefixB() + '.' + joinColumnB);
Value value = rowB.getAsValue(innerJoin.getPrefixB() + '.' + joinColumnB);
if (value == null) {
continue;
}
if (needTypeCast) {
value = ValueUtils.transformToDouble(value);
}
int hash;
if (value instanceof byte[]) {
hash = Arrays.hashCode((byte[]) value);
if (value.getDataType() == DataType.BINARY) {
hash = Arrays.hashCode(value.getBinaryV());
} else {
hash = value.hashCode();
hash = value.getValue().hashCode();
}
List<Row> rows = streamBHashMap.getOrDefault(hash, new ArrayList<>());
rows.add(rowB);
Expand Down Expand Up @@ -139,16 +153,18 @@ public boolean hasNext() throws PhysicalException {
private void tryMatch() throws PhysicalException {
Row rowA = streamA.next();

Object value = rowA.getValue(innerJoin.getPrefixA() + '.' + joinColumnA);
Value value = rowA.getAsValue(innerJoin.getPrefixA() + '.' + joinColumnA);
if (value == null) {
return;
}

if (needTypeCast) {
value = ValueUtils.transformToDouble(value);
}
int hash;
if (value instanceof byte[]) {
hash = Arrays.hashCode((byte[]) value);
if (value.getDataType() == DataType.BINARY) {
hash = Arrays.hashCode(value.getBinaryV());
} else {
hash = value.hashCode();
hash = value.getValue().hashCode();
}

if (streamBHashMap.containsKey(hash)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,17 @@
import cn.edu.tsinghua.iginx.engine.physical.exception.PhysicalException;
import cn.edu.tsinghua.iginx.engine.physical.memory.execute.utils.FilterUtils;
import cn.edu.tsinghua.iginx.engine.physical.memory.execute.utils.RowUtils;
import cn.edu.tsinghua.iginx.engine.shared.data.Value;
import cn.edu.tsinghua.iginx.engine.shared.data.read.Header;
import cn.edu.tsinghua.iginx.engine.shared.data.read.Row;
import cn.edu.tsinghua.iginx.engine.shared.data.read.RowStream;
import cn.edu.tsinghua.iginx.engine.shared.function.system.utils.ValueUtils;
import cn.edu.tsinghua.iginx.engine.shared.operator.OuterJoin;
import cn.edu.tsinghua.iginx.engine.shared.operator.filter.Filter;
import cn.edu.tsinghua.iginx.engine.shared.operator.filter.FilterType;
import cn.edu.tsinghua.iginx.engine.shared.operator.filter.PathFilter;
import cn.edu.tsinghua.iginx.engine.shared.operator.type.OuterJoinType;
import cn.edu.tsinghua.iginx.thrift.DataType;
import cn.edu.tsinghua.iginx.utils.Pair;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -48,6 +51,8 @@ public class HashOuterJoinLazyStream extends BinaryLazyStream {
private String joinColumnA;

private String joinColumnB;

private boolean needTypeCast = false;

public HashOuterJoinLazyStream(OuterJoin outerJoin, RowStream streamA, RowStream streamB) {
super(streamA, streamB);
Expand Down Expand Up @@ -103,24 +108,35 @@ private void initialize() throws PhysicalException {
}
}

int indexAnother;
if (outerJoinType == OuterJoinType.RIGHT) {
this.index = headerA.indexOf(outerJoin.getPrefixA() + '.' + joinColumnA);
indexAnother = headerB.indexOf(outerJoin.getPrefixB() + '.' + joinColumnB);
} else {
this.index = headerB.indexOf(outerJoin.getPrefixB() + '.' + joinColumnB);
indexAnother = headerA.indexOf(outerJoin.getPrefixA() + '.' + joinColumnA);
}

DataType dataType1 = headerA.getField(indexAnother).getType();
DataType dataType2 = headerB.getField(index).getType();
if (ValueUtils.isNumericType(dataType1) && ValueUtils.isNumericType(dataType2)) {
this.needTypeCast = true;
}

while (streamB.hasNext()) {
Row rowB = streamB.next();
Object value = rowB.getValue(outerJoin.getPrefixB() + '.' + joinColumnB);
Value value = rowB.getAsValue(outerJoin.getPrefixB() + '.' + joinColumnB);
if (value == null) {
continue;
}
if (needTypeCast) {
value = ValueUtils.transformToDouble(value);
}
int hash;
if (value instanceof byte[]) {
hash = Arrays.hashCode((byte[]) value);
if (value.getDataType() == DataType.BINARY) {
hash = Arrays.hashCode(value.getBinaryV());
} else {
hash = value.hashCode();
hash = value.getValue().hashCode();
}
List<Row> rows = streamBHashMap.getOrDefault(hash, new ArrayList<>());
rows.add(rowB);
Expand Down Expand Up @@ -203,16 +219,18 @@ public boolean hasNext() throws PhysicalException {
private void tryMatch() throws PhysicalException {
Row rowA = streamA.next();

Object value = rowA.getValue(outerJoin.getPrefixA() + '.' + joinColumnA);
Value value = rowA.getAsValue(outerJoin.getPrefixA() + '.' + joinColumnA);
if (value == null) {
return;
}

if (needTypeCast) {
value = ValueUtils.transformToDouble(value);
}
int hash;
if (value instanceof byte[]) {
hash = Arrays.hashCode((byte[]) value);
if (value.getDataType() == DataType.BINARY) {
hash = Arrays.hashCode(value.getBinaryV());
} else {
hash = value.hashCode();
hash = value.getValue().hashCode();
}

if (streamBHashMap.containsKey(hash)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,15 @@
import cn.edu.tsinghua.iginx.engine.physical.exception.PhysicalException;
import cn.edu.tsinghua.iginx.engine.physical.memory.execute.utils.FilterUtils;
import cn.edu.tsinghua.iginx.engine.physical.memory.execute.utils.RowUtils;
import cn.edu.tsinghua.iginx.engine.shared.data.read.Field;
import cn.edu.tsinghua.iginx.engine.shared.data.read.Header;
import cn.edu.tsinghua.iginx.engine.shared.data.read.Row;
import cn.edu.tsinghua.iginx.engine.shared.data.read.RowStream;
import cn.edu.tsinghua.iginx.engine.shared.function.system.utils.ValueUtils;
import cn.edu.tsinghua.iginx.engine.shared.operator.InnerJoin;
import cn.edu.tsinghua.iginx.engine.shared.operator.filter.Filter;
import cn.edu.tsinghua.iginx.utils.Pair;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

public class NestedLoopInnerJoinLazyStream extends BinaryLazyStream {

Expand Down Expand Up @@ -131,7 +130,8 @@ private Row tryMatch() throws PhysicalException {
}
} else { // Join condition: natural or using
for (String joinColumn : joinColumns) {
if (!Objects.equals(nextA.getValue(innerJoin.getPrefixA() + '.' + joinColumn), nextB.getValue(innerJoin.getPrefixB() + '.' + joinColumn))) {
if (ValueUtils.compare(nextA.getAsValue(innerJoin.getPrefixA() + '.' + joinColumn),
nextB.getAsValue(innerJoin.getPrefixB() + '.' + joinColumn)) != 0) {
nextB = null;
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@
import cn.edu.tsinghua.iginx.engine.shared.data.read.Header;
import cn.edu.tsinghua.iginx.engine.shared.data.read.Row;
import cn.edu.tsinghua.iginx.engine.shared.data.read.RowStream;
import cn.edu.tsinghua.iginx.engine.shared.function.system.utils.ValueUtils;
import cn.edu.tsinghua.iginx.engine.shared.operator.OuterJoin;
import cn.edu.tsinghua.iginx.engine.shared.operator.filter.Filter;
import cn.edu.tsinghua.iginx.engine.shared.operator.type.OuterJoinType;
import cn.edu.tsinghua.iginx.utils.Pair;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

public class NestedLoopOuterJoinLazyStream extends BinaryLazyStream {
Expand Down Expand Up @@ -205,7 +205,8 @@ private Row tryMatch() throws PhysicalException {
}
} else { // Join condition: natural or using
for (String joinColumn : joinColumns) {
if (!Objects.equals(nextA.getValue(outerJoin.getPrefixA() + '.' + joinColumn), nextB.getValue(outerJoin.getPrefixB() + '.' + joinColumn))) {
if (ValueUtils.compare(nextA.getAsValue(outerJoin.getPrefixA() + '.' + joinColumn),
nextB.getAsValue(outerJoin.getPrefixB() + '.' + joinColumn)) != 0) {
nextB = null;
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,21 @@
import cn.edu.tsinghua.iginx.engine.physical.exception.PhysicalException;
import cn.edu.tsinghua.iginx.engine.physical.memory.execute.utils.FilterUtils;
import cn.edu.tsinghua.iginx.engine.physical.memory.execute.utils.RowUtils;
import cn.edu.tsinghua.iginx.engine.shared.data.read.Field;
import cn.edu.tsinghua.iginx.engine.shared.data.Value;
import cn.edu.tsinghua.iginx.engine.shared.data.read.Header;
import cn.edu.tsinghua.iginx.engine.shared.data.read.Row;
import cn.edu.tsinghua.iginx.engine.shared.data.read.RowStream;
import cn.edu.tsinghua.iginx.engine.shared.function.system.utils.ValueUtils;
import cn.edu.tsinghua.iginx.engine.shared.operator.InnerJoin;
import cn.edu.tsinghua.iginx.engine.shared.operator.filter.Filter;
import cn.edu.tsinghua.iginx.engine.shared.operator.filter.FilterType;
import cn.edu.tsinghua.iginx.engine.shared.operator.filter.PathFilter;
import cn.edu.tsinghua.iginx.thrift.DataType;
import cn.edu.tsinghua.iginx.utils.Pair;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;

/**
* input two stream must be ascending order.
Expand All @@ -36,15 +35,13 @@ public class SortedMergeInnerJoinLazyStream extends BinaryLazyStream {

private String joinColumnB;

private DataType joinColumnDataType;

private Row nextA;

private Row nextB;

private int index;

private Object curJoinColumnBValue; // 当前StreamB中join列的值,用于同值join
private Value curJoinColumnBValue; // 当前StreamB中join列的值,用于同值join

private final List<Row> sameValueStreamBRows; // StreamB中join列的值相同的列缓存

Expand Down Expand Up @@ -101,13 +98,6 @@ private void initialize() throws PhysicalException {
}
this.index = headerB.indexOf(innerJoin.getPrefixB() + '.' + joinColumnB);

DataType dataTypeA = headerA.getField(headerA.indexOf(innerJoin.getPrefixA() + "." + joinColumnA)).getType();
DataType dataTypeB = headerA.getField(headerA.indexOf(innerJoin.getPrefixA() + "." + joinColumnA)).getType();
if (!dataTypeA.equals(dataTypeB)) {
throw new InvalidOperatorParameterException("the datatype of join columns is different");
}
joinColumnDataType = dataTypeA;

if (filter != null) { // Join condition: on
this.header = RowUtils.constructNewHead(headerA, headerB, innerJoin.getPrefixA(), innerJoin.getPrefixB());
} else { // Join condition: natural or using
Expand Down Expand Up @@ -135,8 +125,8 @@ public boolean hasNext() throws PhysicalException {
}

private void tryMatch() throws PhysicalException {
Object curJoinColumnAValue = nextA.getValue(innerJoin.getPrefixA() + "." + joinColumnA);
int cmp = RowUtils.compareObjects(joinColumnDataType, curJoinColumnAValue, curJoinColumnBValue);
Value curJoinColumnAValue = nextA.getAsValue(innerJoin.getPrefixA() + "." + joinColumnA);
int cmp = ValueUtils.compare(curJoinColumnAValue, curJoinColumnBValue);
if (cmp < 0) {
nextA = null;
} else if (cmp > 0) {
Expand Down Expand Up @@ -169,13 +159,13 @@ private boolean hasMoreRows() throws PhysicalException {
}
while (sameValueStreamBRows.isEmpty() && nextB != null) {
sameValueStreamBRows.add(nextB);
curJoinColumnBValue = nextB.getValue(innerJoin.getPrefixB() + "." + joinColumnB);
curJoinColumnBValue = nextB.getAsValue(innerJoin.getPrefixB() + "." + joinColumnB);
nextB = null;

while (streamB.hasNext()) {
nextB = streamB.next();
Object joinColumnBValue = nextB.getValue(innerJoin.getPrefixB() + "." + joinColumnB);
if (Objects.equals(joinColumnBValue, curJoinColumnBValue)) {
Value joinColumnBValue = nextB.getAsValue(innerJoin.getPrefixB() + "." + joinColumnB);
if (ValueUtils.compare(joinColumnBValue, curJoinColumnBValue) == 0) {
sameValueStreamBRows.add(nextB);
nextB = null;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,22 @@
import cn.edu.tsinghua.iginx.engine.physical.exception.PhysicalException;
import cn.edu.tsinghua.iginx.engine.physical.memory.execute.utils.FilterUtils;
import cn.edu.tsinghua.iginx.engine.physical.memory.execute.utils.RowUtils;
import cn.edu.tsinghua.iginx.engine.shared.data.Value;
import cn.edu.tsinghua.iginx.engine.shared.data.read.Header;
import cn.edu.tsinghua.iginx.engine.shared.data.read.Row;
import cn.edu.tsinghua.iginx.engine.shared.data.read.RowStream;
import cn.edu.tsinghua.iginx.engine.shared.function.system.utils.ValueUtils;
import cn.edu.tsinghua.iginx.engine.shared.operator.OuterJoin;
import cn.edu.tsinghua.iginx.engine.shared.operator.filter.Filter;
import cn.edu.tsinghua.iginx.engine.shared.operator.filter.FilterType;
import cn.edu.tsinghua.iginx.engine.shared.operator.filter.PathFilter;
import cn.edu.tsinghua.iginx.engine.shared.operator.type.OuterJoinType;
import cn.edu.tsinghua.iginx.thrift.DataType;
import cn.edu.tsinghua.iginx.utils.Pair;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;

public class SortedMergeOuterJoinLazyStream extends BinaryLazyStream {

Expand All @@ -35,13 +35,11 @@ public class SortedMergeOuterJoinLazyStream extends BinaryLazyStream {

private String joinColumnB;

private DataType joinColumnDataType;

private Row nextA;

private Row nextB;

private Object curJoinColumnBValue; // 当前StreamB中join列的值,用于同值join
private Value curJoinColumnBValue; // 当前StreamB中join列的值,用于同值join

private final List<Row> sameValueStreamBRows; // StreamB中join列的值相同的列缓存

Expand Down Expand Up @@ -113,13 +111,6 @@ private void initialize() throws PhysicalException {
this.index = headerB.indexOf(outerJoin.getPrefixB() + '.' + joinColumnB);
}

DataType dataTypeA = headerA.getField(headerA.indexOf(outerJoin.getPrefixA() + "." + joinColumnA)).getType();
DataType dataTypeB = headerA.getField(headerA.indexOf(outerJoin.getPrefixA() + "." + joinColumnA)).getType();
if (!dataTypeA.equals(dataTypeB)) {
throw new InvalidOperatorParameterException("the datatype of join columns is different");
}
joinColumnDataType = dataTypeA;

if (filter != null) { // Join condition: on
this.header = RowUtils.constructNewHead(headerA, headerB, outerJoin.getPrefixA(), outerJoin.getPrefixB());
} else { // Join condition: natural or using
Expand Down Expand Up @@ -205,8 +196,8 @@ public boolean hasNext() throws PhysicalException {
}

private void tryMatch() throws PhysicalException {
Object curJoinColumnAValue = nextA.getValue(outerJoin.getPrefixA() + "." + joinColumnA);
int cmp = RowUtils.compareObjects(joinColumnDataType, curJoinColumnAValue, curJoinColumnBValue);
Value curJoinColumnAValue = nextA.getAsValue(outerJoin.getPrefixA() + "." + joinColumnA);
int cmp = ValueUtils.compare(curJoinColumnAValue, curJoinColumnBValue);
if (cmp < 0) {
unmatchedStreamARows.add(nextA);
nextA = null;
Expand Down Expand Up @@ -255,13 +246,13 @@ private boolean hasMoreRows() throws PhysicalException {
}
while (sameValueStreamBRows.isEmpty() && nextB != null) {
sameValueStreamBRows.add(nextB);
curJoinColumnBValue = nextB.getValue(outerJoin.getPrefixB() + "." + joinColumnB);
curJoinColumnBValue = nextB.getAsValue(outerJoin.getPrefixB() + "." + joinColumnB);
nextB = null;

while (streamB.hasNext()) {
nextB = streamB.next();
Object joinColumnBValue = nextB.getValue(outerJoin.getPrefixB() + "." + joinColumnB);
if (Objects.equals(joinColumnBValue, curJoinColumnBValue)) {
Value joinColumnBValue = nextB.getAsValue(outerJoin.getPrefixB() + "." + joinColumnB);
if (ValueUtils.compare(joinColumnBValue, curJoinColumnBValue) == 0) {
sameValueStreamBRows.add(nextB);
nextB = null;
} else {
Expand Down
Loading

0 comments on commit 303496c

Please sign in to comment.