Skip to content

Commit

Permalink
Merge branch 'main' into fixSchemaPrefix
Browse files Browse the repository at this point in the history
  • Loading branch information
zhuyuqing authored Jan 13, 2023
2 parents 2a0af95 + 303496c commit fe10036
Show file tree
Hide file tree
Showing 12 changed files with 463 additions and 278 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,17 @@
import cn.edu.tsinghua.iginx.engine.physical.exception.PhysicalException;
import cn.edu.tsinghua.iginx.engine.physical.memory.execute.utils.FilterUtils;
import cn.edu.tsinghua.iginx.engine.physical.memory.execute.utils.RowUtils;
import cn.edu.tsinghua.iginx.engine.shared.data.read.Field;
import cn.edu.tsinghua.iginx.engine.shared.data.Value;
import cn.edu.tsinghua.iginx.engine.shared.data.read.Header;
import cn.edu.tsinghua.iginx.engine.shared.data.read.Row;
import cn.edu.tsinghua.iginx.engine.shared.data.read.RowStream;
import cn.edu.tsinghua.iginx.engine.shared.operator.InnerJoin;
import cn.edu.tsinghua.iginx.engine.shared.operator.filter.Filter;
import cn.edu.tsinghua.iginx.engine.shared.operator.filter.FilterType;
import cn.edu.tsinghua.iginx.engine.shared.operator.filter.PathFilter;
import cn.edu.tsinghua.iginx.thrift.DataType;
import cn.edu.tsinghua.iginx.utils.Pair;
import cn.edu.tsinghua.iginx.engine.shared.function.system.utils.ValueUtils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand All @@ -38,6 +40,8 @@ public class HashInnerJoinLazyStream extends BinaryLazyStream {
private String joinColumnA;

private String joinColumnB;

private boolean needTypeCast = false;

public HashInnerJoinLazyStream(InnerJoin innerJoin, RowStream streamA, RowStream streamB) {
super(streamA, streamB);
Expand Down Expand Up @@ -89,18 +93,28 @@ private void initialize() throws PhysicalException {
}
}
this.index = headerB.indexOf(innerJoin.getPrefixB() + '.' + joinColumnB);

int indexA = headerA.indexOf(innerJoin.getPrefixA() + '.' + joinColumnA);
DataType dataTypeA = headerA.getField(indexA).getType();
DataType dataTypeB = headerB.getField(index).getType();
if (ValueUtils.isNumericType(dataTypeA) && ValueUtils.isNumericType(dataTypeB)) {
this.needTypeCast = true;
}

while (streamB.hasNext()) {
Row rowB = streamB.next();
Object value = rowB.getValue(innerJoin.getPrefixB() + '.' + joinColumnB);
Value value = rowB.getAsValue(innerJoin.getPrefixB() + '.' + joinColumnB);
if (value == null) {
continue;
}
if (needTypeCast) {
value = ValueUtils.transformToDouble(value);
}
int hash;
if (value instanceof byte[]) {
hash = Arrays.hashCode((byte[]) value);
if (value.getDataType() == DataType.BINARY) {
hash = Arrays.hashCode(value.getBinaryV());
} else {
hash = value.hashCode();
hash = value.getValue().hashCode();
}
List<Row> rows = streamBHashMap.getOrDefault(hash, new ArrayList<>());
rows.add(rowB);
Expand Down Expand Up @@ -139,16 +153,18 @@ public boolean hasNext() throws PhysicalException {
private void tryMatch() throws PhysicalException {
Row rowA = streamA.next();

Object value = rowA.getValue(innerJoin.getPrefixA() + '.' + joinColumnA);
Value value = rowA.getAsValue(innerJoin.getPrefixA() + '.' + joinColumnA);
if (value == null) {
return;
}

if (needTypeCast) {
value = ValueUtils.transformToDouble(value);
}
int hash;
if (value instanceof byte[]) {
hash = Arrays.hashCode((byte[]) value);
if (value.getDataType() == DataType.BINARY) {
hash = Arrays.hashCode(value.getBinaryV());
} else {
hash = value.hashCode();
hash = value.getValue().hashCode();
}

if (streamBHashMap.containsKey(hash)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,17 @@
import cn.edu.tsinghua.iginx.engine.physical.exception.PhysicalException;
import cn.edu.tsinghua.iginx.engine.physical.memory.execute.utils.FilterUtils;
import cn.edu.tsinghua.iginx.engine.physical.memory.execute.utils.RowUtils;
import cn.edu.tsinghua.iginx.engine.shared.data.Value;
import cn.edu.tsinghua.iginx.engine.shared.data.read.Header;
import cn.edu.tsinghua.iginx.engine.shared.data.read.Row;
import cn.edu.tsinghua.iginx.engine.shared.data.read.RowStream;
import cn.edu.tsinghua.iginx.engine.shared.function.system.utils.ValueUtils;
import cn.edu.tsinghua.iginx.engine.shared.operator.OuterJoin;
import cn.edu.tsinghua.iginx.engine.shared.operator.filter.Filter;
import cn.edu.tsinghua.iginx.engine.shared.operator.filter.FilterType;
import cn.edu.tsinghua.iginx.engine.shared.operator.filter.PathFilter;
import cn.edu.tsinghua.iginx.engine.shared.operator.type.OuterJoinType;
import cn.edu.tsinghua.iginx.thrift.DataType;
import cn.edu.tsinghua.iginx.utils.Pair;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -48,6 +51,8 @@ public class HashOuterJoinLazyStream extends BinaryLazyStream {
private String joinColumnA;

private String joinColumnB;

private boolean needTypeCast = false;

public HashOuterJoinLazyStream(OuterJoin outerJoin, RowStream streamA, RowStream streamB) {
super(streamA, streamB);
Expand Down Expand Up @@ -103,24 +108,35 @@ private void initialize() throws PhysicalException {
}
}

int indexAnother;
if (outerJoinType == OuterJoinType.RIGHT) {
this.index = headerA.indexOf(outerJoin.getPrefixA() + '.' + joinColumnA);
indexAnother = headerB.indexOf(outerJoin.getPrefixB() + '.' + joinColumnB);
} else {
this.index = headerB.indexOf(outerJoin.getPrefixB() + '.' + joinColumnB);
indexAnother = headerA.indexOf(outerJoin.getPrefixA() + '.' + joinColumnA);
}

DataType dataType1 = headerA.getField(indexAnother).getType();
DataType dataType2 = headerB.getField(index).getType();
if (ValueUtils.isNumericType(dataType1) && ValueUtils.isNumericType(dataType2)) {
this.needTypeCast = true;
}

while (streamB.hasNext()) {
Row rowB = streamB.next();
Object value = rowB.getValue(outerJoin.getPrefixB() + '.' + joinColumnB);
Value value = rowB.getAsValue(outerJoin.getPrefixB() + '.' + joinColumnB);
if (value == null) {
continue;
}
if (needTypeCast) {
value = ValueUtils.transformToDouble(value);
}
int hash;
if (value instanceof byte[]) {
hash = Arrays.hashCode((byte[]) value);
if (value.getDataType() == DataType.BINARY) {
hash = Arrays.hashCode(value.getBinaryV());
} else {
hash = value.hashCode();
hash = value.getValue().hashCode();
}
List<Row> rows = streamBHashMap.getOrDefault(hash, new ArrayList<>());
rows.add(rowB);
Expand Down Expand Up @@ -203,16 +219,18 @@ public boolean hasNext() throws PhysicalException {
private void tryMatch() throws PhysicalException {
Row rowA = streamA.next();

Object value = rowA.getValue(outerJoin.getPrefixA() + '.' + joinColumnA);
Value value = rowA.getAsValue(outerJoin.getPrefixA() + '.' + joinColumnA);
if (value == null) {
return;
}

if (needTypeCast) {
value = ValueUtils.transformToDouble(value);
}
int hash;
if (value instanceof byte[]) {
hash = Arrays.hashCode((byte[]) value);
if (value.getDataType() == DataType.BINARY) {
hash = Arrays.hashCode(value.getBinaryV());
} else {
hash = value.hashCode();
hash = value.getValue().hashCode();
}

if (streamBHashMap.containsKey(hash)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,15 @@
import cn.edu.tsinghua.iginx.engine.physical.exception.PhysicalException;
import cn.edu.tsinghua.iginx.engine.physical.memory.execute.utils.FilterUtils;
import cn.edu.tsinghua.iginx.engine.physical.memory.execute.utils.RowUtils;
import cn.edu.tsinghua.iginx.engine.shared.data.read.Field;
import cn.edu.tsinghua.iginx.engine.shared.data.read.Header;
import cn.edu.tsinghua.iginx.engine.shared.data.read.Row;
import cn.edu.tsinghua.iginx.engine.shared.data.read.RowStream;
import cn.edu.tsinghua.iginx.engine.shared.function.system.utils.ValueUtils;
import cn.edu.tsinghua.iginx.engine.shared.operator.InnerJoin;
import cn.edu.tsinghua.iginx.engine.shared.operator.filter.Filter;
import cn.edu.tsinghua.iginx.utils.Pair;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

public class NestedLoopInnerJoinLazyStream extends BinaryLazyStream {

Expand Down Expand Up @@ -131,7 +130,8 @@ private Row tryMatch() throws PhysicalException {
}
} else { // Join condition: natural or using
for (String joinColumn : joinColumns) {
if (!Objects.equals(nextA.getValue(innerJoin.getPrefixA() + '.' + joinColumn), nextB.getValue(innerJoin.getPrefixB() + '.' + joinColumn))) {
if (ValueUtils.compare(nextA.getAsValue(innerJoin.getPrefixA() + '.' + joinColumn),
nextB.getAsValue(innerJoin.getPrefixB() + '.' + joinColumn)) != 0) {
nextB = null;
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@
import cn.edu.tsinghua.iginx.engine.shared.data.read.Header;
import cn.edu.tsinghua.iginx.engine.shared.data.read.Row;
import cn.edu.tsinghua.iginx.engine.shared.data.read.RowStream;
import cn.edu.tsinghua.iginx.engine.shared.function.system.utils.ValueUtils;
import cn.edu.tsinghua.iginx.engine.shared.operator.OuterJoin;
import cn.edu.tsinghua.iginx.engine.shared.operator.filter.Filter;
import cn.edu.tsinghua.iginx.engine.shared.operator.type.OuterJoinType;
import cn.edu.tsinghua.iginx.utils.Pair;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

public class NestedLoopOuterJoinLazyStream extends BinaryLazyStream {
Expand Down Expand Up @@ -205,7 +205,8 @@ private Row tryMatch() throws PhysicalException {
}
} else { // Join condition: natural or using
for (String joinColumn : joinColumns) {
if (!Objects.equals(nextA.getValue(outerJoin.getPrefixA() + '.' + joinColumn), nextB.getValue(outerJoin.getPrefixB() + '.' + joinColumn))) {
if (ValueUtils.compare(nextA.getAsValue(outerJoin.getPrefixA() + '.' + joinColumn),
nextB.getAsValue(outerJoin.getPrefixB() + '.' + joinColumn)) != 0) {
nextB = null;
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,21 @@
import cn.edu.tsinghua.iginx.engine.physical.exception.PhysicalException;
import cn.edu.tsinghua.iginx.engine.physical.memory.execute.utils.FilterUtils;
import cn.edu.tsinghua.iginx.engine.physical.memory.execute.utils.RowUtils;
import cn.edu.tsinghua.iginx.engine.shared.data.read.Field;
import cn.edu.tsinghua.iginx.engine.shared.data.Value;
import cn.edu.tsinghua.iginx.engine.shared.data.read.Header;
import cn.edu.tsinghua.iginx.engine.shared.data.read.Row;
import cn.edu.tsinghua.iginx.engine.shared.data.read.RowStream;
import cn.edu.tsinghua.iginx.engine.shared.function.system.utils.ValueUtils;
import cn.edu.tsinghua.iginx.engine.shared.operator.InnerJoin;
import cn.edu.tsinghua.iginx.engine.shared.operator.filter.Filter;
import cn.edu.tsinghua.iginx.engine.shared.operator.filter.FilterType;
import cn.edu.tsinghua.iginx.engine.shared.operator.filter.PathFilter;
import cn.edu.tsinghua.iginx.thrift.DataType;
import cn.edu.tsinghua.iginx.utils.Pair;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;

/**
* Both input streams must be in ascending order.
Expand All @@ -36,15 +35,13 @@ public class SortedMergeInnerJoinLazyStream extends BinaryLazyStream {

private String joinColumnB;

private DataType joinColumnDataType;

private Row nextA;

private Row nextB;

private int index;

private Object curJoinColumnBValue; // 当前StreamB中join列的值,用于同值join
private Value curJoinColumnBValue; // 当前StreamB中join列的值,用于同值join

private final List<Row> sameValueStreamBRows; // StreamB中join列的值相同的列缓存

Expand Down Expand Up @@ -101,13 +98,6 @@ private void initialize() throws PhysicalException {
}
this.index = headerB.indexOf(innerJoin.getPrefixB() + '.' + joinColumnB);

DataType dataTypeA = headerA.getField(headerA.indexOf(innerJoin.getPrefixA() + "." + joinColumnA)).getType();
DataType dataTypeB = headerA.getField(headerA.indexOf(innerJoin.getPrefixA() + "." + joinColumnA)).getType();
if (!dataTypeA.equals(dataTypeB)) {
throw new InvalidOperatorParameterException("the datatype of join columns is different");
}
joinColumnDataType = dataTypeA;

if (filter != null) { // Join condition: on
this.header = RowUtils.constructNewHead(headerA, headerB, innerJoin.getPrefixA(), innerJoin.getPrefixB());
} else { // Join condition: natural or using
Expand Down Expand Up @@ -135,8 +125,8 @@ public boolean hasNext() throws PhysicalException {
}

private void tryMatch() throws PhysicalException {
Object curJoinColumnAValue = nextA.getValue(innerJoin.getPrefixA() + "." + joinColumnA);
int cmp = RowUtils.compareObjects(joinColumnDataType, curJoinColumnAValue, curJoinColumnBValue);
Value curJoinColumnAValue = nextA.getAsValue(innerJoin.getPrefixA() + "." + joinColumnA);
int cmp = ValueUtils.compare(curJoinColumnAValue, curJoinColumnBValue);
if (cmp < 0) {
nextA = null;
} else if (cmp > 0) {
Expand Down Expand Up @@ -169,13 +159,13 @@ private boolean hasMoreRows() throws PhysicalException {
}
while (sameValueStreamBRows.isEmpty() && nextB != null) {
sameValueStreamBRows.add(nextB);
curJoinColumnBValue = nextB.getValue(innerJoin.getPrefixB() + "." + joinColumnB);
curJoinColumnBValue = nextB.getAsValue(innerJoin.getPrefixB() + "." + joinColumnB);
nextB = null;

while (streamB.hasNext()) {
nextB = streamB.next();
Object joinColumnBValue = nextB.getValue(innerJoin.getPrefixB() + "." + joinColumnB);
if (Objects.equals(joinColumnBValue, curJoinColumnBValue)) {
Value joinColumnBValue = nextB.getAsValue(innerJoin.getPrefixB() + "." + joinColumnB);
if (ValueUtils.compare(joinColumnBValue, curJoinColumnBValue) == 0) {
sameValueStreamBRows.add(nextB);
nextB = null;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,22 @@
import cn.edu.tsinghua.iginx.engine.physical.exception.PhysicalException;
import cn.edu.tsinghua.iginx.engine.physical.memory.execute.utils.FilterUtils;
import cn.edu.tsinghua.iginx.engine.physical.memory.execute.utils.RowUtils;
import cn.edu.tsinghua.iginx.engine.shared.data.Value;
import cn.edu.tsinghua.iginx.engine.shared.data.read.Header;
import cn.edu.tsinghua.iginx.engine.shared.data.read.Row;
import cn.edu.tsinghua.iginx.engine.shared.data.read.RowStream;
import cn.edu.tsinghua.iginx.engine.shared.function.system.utils.ValueUtils;
import cn.edu.tsinghua.iginx.engine.shared.operator.OuterJoin;
import cn.edu.tsinghua.iginx.engine.shared.operator.filter.Filter;
import cn.edu.tsinghua.iginx.engine.shared.operator.filter.FilterType;
import cn.edu.tsinghua.iginx.engine.shared.operator.filter.PathFilter;
import cn.edu.tsinghua.iginx.engine.shared.operator.type.OuterJoinType;
import cn.edu.tsinghua.iginx.thrift.DataType;
import cn.edu.tsinghua.iginx.utils.Pair;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;

public class SortedMergeOuterJoinLazyStream extends BinaryLazyStream {

Expand All @@ -35,13 +35,11 @@ public class SortedMergeOuterJoinLazyStream extends BinaryLazyStream {

private String joinColumnB;

private DataType joinColumnDataType;

private Row nextA;

private Row nextB;

private Object curJoinColumnBValue; // 当前StreamB中join列的值,用于同值join
private Value curJoinColumnBValue; // 当前StreamB中join列的值,用于同值join

private final List<Row> sameValueStreamBRows; // StreamB中join列的值相同的列缓存

Expand Down Expand Up @@ -113,13 +111,6 @@ private void initialize() throws PhysicalException {
this.index = headerB.indexOf(outerJoin.getPrefixB() + '.' + joinColumnB);
}

DataType dataTypeA = headerA.getField(headerA.indexOf(outerJoin.getPrefixA() + "." + joinColumnA)).getType();
DataType dataTypeB = headerA.getField(headerA.indexOf(outerJoin.getPrefixA() + "." + joinColumnA)).getType();
if (!dataTypeA.equals(dataTypeB)) {
throw new InvalidOperatorParameterException("the datatype of join columns is different");
}
joinColumnDataType = dataTypeA;

if (filter != null) { // Join condition: on
this.header = RowUtils.constructNewHead(headerA, headerB, outerJoin.getPrefixA(), outerJoin.getPrefixB());
} else { // Join condition: natural or using
Expand Down Expand Up @@ -205,8 +196,8 @@ public boolean hasNext() throws PhysicalException {
}

private void tryMatch() throws PhysicalException {
Object curJoinColumnAValue = nextA.getValue(outerJoin.getPrefixA() + "." + joinColumnA);
int cmp = RowUtils.compareObjects(joinColumnDataType, curJoinColumnAValue, curJoinColumnBValue);
Value curJoinColumnAValue = nextA.getAsValue(outerJoin.getPrefixA() + "." + joinColumnA);
int cmp = ValueUtils.compare(curJoinColumnAValue, curJoinColumnBValue);
if (cmp < 0) {
unmatchedStreamARows.add(nextA);
nextA = null;
Expand Down Expand Up @@ -255,13 +246,13 @@ private boolean hasMoreRows() throws PhysicalException {
}
while (sameValueStreamBRows.isEmpty() && nextB != null) {
sameValueStreamBRows.add(nextB);
curJoinColumnBValue = nextB.getValue(outerJoin.getPrefixB() + "." + joinColumnB);
curJoinColumnBValue = nextB.getAsValue(outerJoin.getPrefixB() + "." + joinColumnB);
nextB = null;

while (streamB.hasNext()) {
nextB = streamB.next();
Object joinColumnBValue = nextB.getValue(outerJoin.getPrefixB() + "." + joinColumnB);
if (Objects.equals(joinColumnBValue, curJoinColumnBValue)) {
Value joinColumnBValue = nextB.getAsValue(outerJoin.getPrefixB() + "." + joinColumnB);
if (ValueUtils.compare(joinColumnBValue, curJoinColumnBValue) == 0) {
sameValueStreamBRows.add(nextB);
nextB = null;
} else {
Expand Down
Loading

0 comments on commit fe10036

Please sign in to comment.