diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/engine/logical/generator/QueryGenerator.java b/core/src/main/java/cn/edu/tsinghua/iginx/engine/logical/generator/QueryGenerator.java index 7478eba61..eac7dae00 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/engine/logical/generator/QueryGenerator.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/engine/logical/generator/QueryGenerator.java @@ -328,9 +328,11 @@ private Operator mergeRawData(Map> fragments, L if (!dummyFragments.isEmpty()) { List joinList = new ArrayList<>(); dummyFragments.forEach(meta -> { - if (meta.isValid()) - joinList.add(new Project(new FragmentSource(meta), - pathMatchPrefix(pathList,meta.getTsInterval().getTimeSeries(), meta.getTsInterval().getSchemaPrefix()), tagFilter)); + if (meta.isValid()) { + String schemaPrefix = meta.getTsInterval().getSchemaPrefix(); + joinList.add(new AddSchemaPrefix(new OperatorSource(new Project(new FragmentSource(meta), + pathMatchPrefix(pathList, meta.getTsInterval().getTimeSeries(), schemaPrefix), tagFilter)), schemaPrefix)); + } }); joinList.add(operator); operator = OperatorUtils.joinOperatorsByTime(joinList); @@ -349,13 +351,30 @@ private Pair>, List> getFragm return keyFromTSIntervalToTimeInterval(fragmentsByTSInterval); } + // 筛选出满足 dataPrefix前缀,并且去除 schemaPrefix private List pathMatchPrefix(List pathList, String prefix, String schemaPrefix) { - if (prefix == null) return pathList; - if (schemaPrefix != null) prefix = schemaPrefix + "." + prefix; // deal with the schemaPrefix + if (prefix == null && schemaPrefix == null) return pathList; List ans = new ArrayList<>(); + + if (prefix == null) { // deal with the schemaPrefix + for(String path : pathList) { + if (path.equals("*.*") || path.equals("*")) { + ans.add(path); + } else if (path.indexOf(schemaPrefix) == 0) { + path = path.substring(schemaPrefix.length() + 1); + ans.add(path); + } + } + return ans; + } +// if (schemaPrefix != null) prefix = schemaPrefix + "." + prefix; + for(String path : pathList) { - if (path.equals("*.*")) { - ans.add(path); + if (schemaPrefix != null && path.indexOf(schemaPrefix) == 0) { + path = path.substring(schemaPrefix.length() + 1); + } + if (path.equals("*.*") || path.equals("*")) { + ans.add(prefix + ".*"); } else if (path.charAt(path.length()-1) == '*' && path.length() != 1) { // 通配符匹配,例如 a.b.* String queryPrefix = path.substring(0,path.length()-2) + ".(.*)"; if (prefix.matches(queryPrefix)) { diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/naive/NaiveOperatorMemoryExecutor.java b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/naive/NaiveOperatorMemoryExecutor.java index 7e521e9bb..8a4843971 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/naive/NaiveOperatorMemoryExecutor.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/naive/NaiveOperatorMemoryExecutor.java @@ -81,6 +81,8 @@ public RowStream executeUnaryOperator(UnaryOperator operator, RowStream stream) return executeRename((Rename) operator, transformToTable(stream)); case Reorder: return executeReorder((Reorder) operator, transformToTable(stream)); + case AddSchemaPrefix: + return executeAddSchemaPrefix((AddSchemaPrefix) operator, transformToTable(stream)); default: throw new UnexpectedOperatorException("unknown unary operator: " + operator.getType()); } @@ -333,6 +335,32 @@ private RowStream executeRename(Rename rename, Table table) throws PhysicalExcep return new Table(newHeader, rows); } + private RowStream executeAddSchemaPrefix(AddSchemaPrefix addSchemaPrefix, Table table) throws PhysicalException { + Header header = table.getHeader(); + String schemaPrefix = addSchemaPrefix.getSchemaPrefix(); + + List fields = new ArrayList<>(); + header.getFields().forEach(field -> { + if (schemaPrefix != null) + fields.add(new Field(schemaPrefix + "." + field.getName(), field.getType(), field.getTags())); + else + fields.add(new Field(field.getName(), field.getType(), field.getTags())); + }); + + Header newHeader = new Header(header.getKey(), fields); + + List rows = new ArrayList<>(); + table.getRows().forEach(row -> { + if (newHeader.hasKey()) { + rows.add(new Row(newHeader, row.getKey(), row.getValues())); + } else { + rows.add(new Row(newHeader, row.getValues())); + } + }); + + return new Table(newHeader, rows); + } + private RowStream executeReorder(Reorder reorder, Table table) throws PhysicalException { List patterns = reorder.getPatterns(); Header header = table.getHeader(); diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/stream/AddSchemaPrefixLazyStream.java b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/stream/AddSchemaPrefixLazyStream.java new file mode 100644 index 000000000..02d18b291 --- /dev/null +++ b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/stream/AddSchemaPrefixLazyStream.java @@ -0,0 +1,65 @@ +package cn.edu.tsinghua.iginx.engine.physical.memory.execute.stream; + +import cn.edu.tsinghua.iginx.engine.physical.exception.PhysicalException; +import cn.edu.tsinghua.iginx.engine.shared.data.read.Field; +import cn.edu.tsinghua.iginx.engine.shared.data.read.Header; +import cn.edu.tsinghua.iginx.engine.shared.data.read.Row; +import cn.edu.tsinghua.iginx.engine.shared.data.read.RowStream; +import cn.edu.tsinghua.iginx.engine.shared.operator.AddSchemaPrefix; +import cn.edu.tsinghua.iginx.engine.shared.operator.Rename; +import cn.edu.tsinghua.iginx.utils.StringUtils; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.regex.Pattern; + +public class AddSchemaPrefixLazyStream extends UnaryLazyStream { + + private final AddSchemaPrefix addSchemaPrefix; + + private Header header; + + public AddSchemaPrefixLazyStream(AddSchemaPrefix addSchemaPrefix, RowStream stream) { + super(stream); + this.addSchemaPrefix = addSchemaPrefix; + } + + @Override + public Header getHeader() throws PhysicalException { + if (header == null) { + Header header = stream.getHeader(); + String schemaPrefix = addSchemaPrefix.getSchemaPrefix(); + + List fields = new ArrayList<>(); + header.getFields().forEach(field -> { + if (schemaPrefix != null) + fields.add(new Field(schemaPrefix + "." + field.getName(), field.getType(), field.getTags())); + else + fields.add(new Field(field.getName(), field.getType(), field.getTags())); + }); + + this.header = new Header(header.getKey(), fields); + } + return header; + } + + @Override + public boolean hasNext() throws PhysicalException { + return stream.hasNext(); + } + + @Override + public Row next() throws PhysicalException { + if (!hasNext()) { + throw new IllegalStateException("row stream doesn't have more data!"); + } + + Row row = stream.next(); + if (header.hasKey()) { + return new Row(header, row.getKey(), row.getValues()); + } else { + return new Row(header, row.getValues()); + } + } +} diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/stream/StreamOperatorMemoryExecutor.java b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/stream/StreamOperatorMemoryExecutor.java index 70125a875..29d179744 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/stream/StreamOperatorMemoryExecutor.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/stream/StreamOperatorMemoryExecutor.java @@ -59,6 +59,8 @@ public RowStream executeUnaryOperator(UnaryOperator operator, RowStream stream) return executeRename((Rename) operator, stream); case Reorder: return executeReorder((Reorder) operator, stream); + case AddSchemaPrefix: + return executeAddSchemaPrefix((AddSchemaPrefix) operator, stream); default: throw new UnexpectedOperatorException("unknown unary operator: " + operator.getType()); } @@ -128,6 +130,10 @@ private RowStream executeReorder(Reorder reorder, RowStream stream) { return new ReorderLazyStream(reorder, stream); } + private RowStream executeAddSchemaPrefix(AddSchemaPrefix addSchemaPrefix, RowStream stream) { + return new AddSchemaPrefixLazyStream(addSchemaPrefix, stream); + } + private RowStream executeJoin(Join join, RowStream streamA, RowStream streamB) throws PhysicalException { if (!join.getJoinBy().equals(Constants.KEY) && !join.getJoinBy().equals(Constants.ORDINAL)) { throw new InvalidOperatorParameterException("join operator is not support for field " + join.getJoinBy() + " except for " + Constants.KEY diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/operator/AddSchemaPrefix.java b/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/operator/AddSchemaPrefix.java new file mode 100644 index 000000000..451b8796a --- /dev/null +++ b/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/operator/AddSchemaPrefix.java @@ -0,0 +1,26 @@ +package cn.edu.tsinghua.iginx.engine.shared.operator; + +import cn.edu.tsinghua.iginx.engine.shared.operator.type.OperatorType; +import cn.edu.tsinghua.iginx.engine.shared.source.Source; + +import java.util.ArrayList; +import java.util.List; + +public class AddSchemaPrefix extends AbstractUnaryOperator { + + private final String schemaPrefix;// 可以为 null + + public AddSchemaPrefix(Source source, String schemaPrefix) { + super(OperatorType.AddSchemaPrefix, source); + this.schemaPrefix = schemaPrefix; + } + + @Override + public Operator copy() { + return new AddSchemaPrefix(getSource().copy(), schemaPrefix); + } + + public String getSchemaPrefix() { + return schemaPrefix; + } +} diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/operator/type/OperatorType.java b/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/operator/type/OperatorType.java index 5601c82af..7be03f308 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/operator/type/OperatorType.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/operator/type/OperatorType.java @@ -20,18 +20,32 @@ public enum OperatorType { - Unknown, - Binary, - Unary, - Multiple, + // Exception[0,9] + Unknown(0), - Project, - Select, - Join, + // MultipleOperator[10,19] + CombineNonQuery(10), + + //isGlobalOperator[20,29] + ShowTimeSeries(20), + Migration, + + // BinaryOperator[30,39] + Join(30), + Union, InnerJoin, OuterJoin, CrossJoin, - Union, + + + // isUnaryOperator >= 40 + Binary(40), + Unary, + Delete, + Insert, + Multiple, + Project, + Select, Sort, Limit, Downsample, @@ -39,23 +53,36 @@ public enum OperatorType { SetTransform, MappingTransform, Rename, - Reorder, + AddSchemaPrefix; - Delete, - Insert, - CombineNonQuery, - ShowTimeSeries, - Migration; + private int value; + OperatorType(){ + this(OperatorTypeCounter.nextValue); + } + OperatorType(int value){ + this.value = value; + OperatorTypeCounter.nextValue = value + 1; + } + + public int getValue() + { + return value; + } + + private static class OperatorTypeCounter + { + private static int nextValue = 0; + } public static boolean isBinaryOperator(OperatorType op) { - return op == Join || op == Union || op == InnerJoin || op == OuterJoin || op == CrossJoin; + return op.value >= 30 && op.value <= 39; } public static boolean isUnaryOperator(OperatorType op) { - return op == Project || op == Select || op == Sort || op == Limit || op == Downsample || op == RowTransform || op == SetTransform || op == MappingTransform || op == Delete || op == Insert || op == Rename || op == Reorder; + return op.value >= 40; } public static boolean isMultipleOperator(OperatorType op) { diff --git a/dataSources/influxdb/src/main/java/cn/edu/tsinghua/iginx/influxdb/InfluxDBStorage.java b/dataSources/influxdb/src/main/java/cn/edu/tsinghua/iginx/influxdb/InfluxDBStorage.java index b435935f5..5ab00fa0b 100644 --- a/dataSources/influxdb/src/main/java/cn/edu/tsinghua/iginx/influxdb/InfluxDBStorage.java +++ b/dataSources/influxdb/src/main/java/cn/edu/tsinghua/iginx/influxdb/InfluxDBStorage.java @@ -236,18 +236,11 @@ public TaskExecuteResult execute(StoragePhysicalTask task) { return new TaskExecuteResult(new NonExecutablePhysicalTaskException("unsupported physical task")); } - private String getRealPathWithoutPrefix(String oriPath, String prefix) { - if (prefix != null && !prefix.isEmpty() && oriPath.contains(prefix)) { - return oriPath.substring(oriPath.indexOf(prefix) + prefix.length() + 1); - } - return oriPath; - } - private TaskExecuteResult executeHistoryProjectTask(TimeSeriesRange timeSeriesInterval, TimeInterval timeInterval, Project project) { Map bucketQueries = new HashMap<>(); TagFilter tagFilter = project.getTagFilter(); for (String pattern: project.getPatterns()) { - Pair pair = SchemaTransformer.processPatternForQuery(getRealPathWithoutPrefix(pattern, timeSeriesInterval.getSchemaPrefix()), tagFilter); + Pair pair = SchemaTransformer.processPatternForQuery(pattern, tagFilter); String bucketName = pair.k; String query = pair.v; String fullQuery = ""; @@ -276,7 +269,7 @@ private TaskExecuteResult executeHistoryProjectTask(TimeSeriesRange timeSeriesIn bucketQueryResults.put(bucket, client.getQueryApi().query(statement, organization.getId())); } - InfluxDBHistoryQueryRowStream rowStream = new InfluxDBHistoryQueryRowStream(bucketQueryResults, project.getPatterns(), timeSeriesInterval.getSchemaPrefix()); + InfluxDBHistoryQueryRowStream rowStream = new InfluxDBHistoryQueryRowStream(bucketQueryResults, project.getPatterns()); return new TaskExecuteResult(rowStream); } diff --git a/dataSources/influxdb/src/main/java/cn/edu/tsinghua/iginx/influxdb/query/entity/InfluxDBHistoryQueryRowStream.java b/dataSources/influxdb/src/main/java/cn/edu/tsinghua/iginx/influxdb/query/entity/InfluxDBHistoryQueryRowStream.java index 9c5946977..aaeba745a 100644 --- a/dataSources/influxdb/src/main/java/cn/edu/tsinghua/iginx/influxdb/query/entity/InfluxDBHistoryQueryRowStream.java +++ b/dataSources/influxdb/src/main/java/cn/edu/tsinghua/iginx/influxdb/query/entity/InfluxDBHistoryQueryRowStream.java @@ -45,11 +45,8 @@ public class InfluxDBHistoryQueryRowStream implements RowStream { private int hasMoreRecords; private int size; - public InfluxDBHistoryQueryRowStream(Map> bucketQueryResults, List patterns) { - this(bucketQueryResults, patterns, null); - } - public InfluxDBHistoryQueryRowStream(Map> bucketQueryResults, List patterns, String prefix) { + public InfluxDBHistoryQueryRowStream(Map> bucketQueryResults, List patterns) { this.bucketQueryResults = new ArrayList<>(bucketQueryResults.entrySet()); this.indexList = new ArrayList<>(); List fields = new ArrayList<>(); @@ -58,7 +55,7 @@ public InfluxDBHistoryQueryRowStream(Map> bucketQueryRes List tables = this.bucketQueryResults.get(i).getValue(); this.indexList.add(new int[tables.size()]); for (FluxTable table: tables) { - fields.add(SchemaTransformer.toField(bucket, table, prefix)); + fields.add(SchemaTransformer.toField(bucket, table)); this.hasMoreRecords++; this.size++; } diff --git a/dataSources/influxdb/src/main/java/cn/edu/tsinghua/iginx/influxdb/tools/SchemaTransformer.java b/dataSources/influxdb/src/main/java/cn/edu/tsinghua/iginx/influxdb/tools/SchemaTransformer.java index dd7d59ed0..2c6026646 100644 --- a/dataSources/influxdb/src/main/java/cn/edu/tsinghua/iginx/influxdb/tools/SchemaTransformer.java +++ b/dataSources/influxdb/src/main/java/cn/edu/tsinghua/iginx/influxdb/tools/SchemaTransformer.java @@ -17,10 +17,6 @@ public class SchemaTransformer { public static Field toField(String bucket, FluxTable table) { - return toField(bucket, table, null); - } - - public static Field toField(String bucket, FluxTable table, String prefix) { FluxRecord record = table.getRecords().get(0); String measurement = record.getMeasurement(); String field = record.getField(); @@ -36,10 +32,6 @@ public static Field toField(String bucket, FluxTable table, String prefix) { DataType dataType = fromInfluxDB(table.getColumns().stream().filter(x -> x.getLabel().equals("_value")).collect(Collectors.toList()).get(0).getDataType()); StringBuilder pathBuilder = new StringBuilder(); - if (prefix != null) { - pathBuilder.append(prefix); - pathBuilder.append('.'); - } pathBuilder.append(bucket); pathBuilder.append('.'); pathBuilder.append(measurement); diff --git a/dataSources/iotdb11/src/main/java/cn/edu/tsinghua/iginx/iotdb/IoTDBStorage.java b/dataSources/iotdb11/src/main/java/cn/edu/tsinghua/iginx/iotdb/IoTDBStorage.java index 3cb556d84..6bbfe5797 100644 --- a/dataSources/iotdb11/src/main/java/cn/edu/tsinghua/iginx/iotdb/IoTDBStorage.java +++ b/dataSources/iotdb11/src/main/java/cn/edu/tsinghua/iginx/iotdb/IoTDBStorage.java @@ -319,12 +319,12 @@ private TaskExecuteResult executeQueryHistoryTask(TimeSeriesRange timeSeriesInte try { StringBuilder builder = new StringBuilder(); for (String path : project.getPatterns()) { - builder.append(getRealPathWithoutPrefix(path, timeSeriesInterval.getSchemaPrefix())); + builder.append(path); builder.append(','); } String statement = String.format(QUERY_HISTORY_DATA, builder.deleteCharAt(builder.length() - 1).toString(), FilterTransformer.toString(filter)); logger.info("[Query] execute query: " + statement); - RowStream rowStream = new ClearEmptyRowStreamWrapper(new IoTDBQueryRowStream(sessionPool.executeQueryStatement(statement), false, project, timeSeriesInterval.getSchemaPrefix())); + RowStream rowStream = new ClearEmptyRowStreamWrapper(new IoTDBQueryRowStream(sessionPool.executeQueryStatement(statement), false, project)); return new TaskExecuteResult(rowStream); } catch (IoTDBConnectionException | StatementExecutionException e) { logger.error(e.getMessage()); diff --git a/dataSources/iotdb11/src/main/java/cn/edu/tsinghua/iginx/iotdb/query/entity/IoTDBQueryRowStream.java b/dataSources/iotdb11/src/main/java/cn/edu/tsinghua/iginx/iotdb/query/entity/IoTDBQueryRowStream.java index 3ca8ec9ca..6c07bb8be 100644 --- a/dataSources/iotdb11/src/main/java/cn/edu/tsinghua/iginx/iotdb/query/entity/IoTDBQueryRowStream.java +++ b/dataSources/iotdb11/src/main/java/cn/edu/tsinghua/iginx/iotdb/query/entity/IoTDBQueryRowStream.java @@ -66,10 +66,6 @@ enum State { private State state; public IoTDBQueryRowStream(SessionDataSetWrapper dataset, boolean trimStorageUnit, Project project) { - this(dataset, trimStorageUnit, project, null); - } - - public IoTDBQueryRowStream(SessionDataSetWrapper dataset, boolean trimStorageUnit, Project project, String prefix) { this.dataset = dataset; this.trimStorageUnit = trimStorageUnit; this.filterByTags = project.getTagFilter() != null; @@ -91,7 +87,7 @@ public IoTDBQueryRowStream(SessionDataSetWrapper dataset, boolean trimStorageUni } name = transformColumnName(name); Pair> pair = TagKVUtils.splitFullName(name); - Field field = new Field(prefix==null? pair.getK() : prefix + "." + pair.getK(), DataTypeTransformer.fromIoTDB(type), pair.getV()); + Field field = new Field(pair.getK(), DataTypeTransformer.fromIoTDB(type), pair.getV()); if (!this.trimStorageUnit && field.getFullName().startsWith(UNIT)) { filterList.add(true); continue; diff --git a/dataSources/iotdb12/src/main/java/cn/edu/tsinghua/iginx/iotdb/IoTDBStorage.java b/dataSources/iotdb12/src/main/java/cn/edu/tsinghua/iginx/iotdb/IoTDBStorage.java index 662ba616e..2ac494f5e 100644 --- a/dataSources/iotdb12/src/main/java/cn/edu/tsinghua/iginx/iotdb/IoTDBStorage.java +++ b/dataSources/iotdb12/src/main/java/cn/edu/tsinghua/iginx/iotdb/IoTDBStorage.java @@ -317,12 +317,12 @@ private TaskExecuteResult executeQueryHistoryTask(TimeSeriesRange timeSeriesInte try { StringBuilder builder = new StringBuilder(); for (String path : project.getPatterns()) { - builder.append(getRealPathWithoutPrefix(path, timeSeriesInterval.getSchemaPrefix())); + builder.append(path); builder.append(','); } String statement = String.format(QUERY_HISTORY_DATA, builder.deleteCharAt(builder.length() - 1).toString(), FilterTransformer.toString(filter)); logger.info("[Query] execute query: " + statement); - RowStream rowStream = new ClearEmptyRowStreamWrapper(new IoTDBQueryRowStream(sessionPool.executeQueryStatement(statement), false, project, timeSeriesInterval.getSchemaPrefix())); + RowStream rowStream = new ClearEmptyRowStreamWrapper(new IoTDBQueryRowStream(sessionPool.executeQueryStatement(statement), false, project)); return new TaskExecuteResult(rowStream); } catch (IoTDBConnectionException | StatementExecutionException e) { logger.error(e.getMessage()); diff --git a/dataSources/iotdb12/src/main/java/cn/edu/tsinghua/iginx/iotdb/query/entity/IoTDBQueryRowStream.java b/dataSources/iotdb12/src/main/java/cn/edu/tsinghua/iginx/iotdb/query/entity/IoTDBQueryRowStream.java index 28a154a75..e8caf6aed 100644 --- a/dataSources/iotdb12/src/main/java/cn/edu/tsinghua/iginx/iotdb/query/entity/IoTDBQueryRowStream.java +++ b/dataSources/iotdb12/src/main/java/cn/edu/tsinghua/iginx/iotdb/query/entity/IoTDBQueryRowStream.java @@ -65,10 +65,6 @@ enum State { private State state; public IoTDBQueryRowStream(SessionDataSetWrapper dataset, boolean trimStorageUnit, Project project) { - this(dataset, trimStorageUnit, project, null); - } - - public IoTDBQueryRowStream(SessionDataSetWrapper dataset, boolean trimStorageUnit, Project project, String prefix) { this.dataset = dataset; this.trimStorageUnit = trimStorageUnit; this.filterByTags = project.getTagFilter() != null; @@ -90,7 +86,7 @@ public IoTDBQueryRowStream(SessionDataSetWrapper dataset, boolean trimStorageUni } name = transformColumnName(name); Pair> pair = TagKVUtils.splitFullName(name); - Field field = new Field(prefix==null? pair.getK() : prefix + "." + pair.getK(), DataTypeTransformer.strFromIoTDB(type), pair.getV()); + Field field = new Field(pair.getK(), DataTypeTransformer.strFromIoTDB(type), pair.getV()); if (!this.trimStorageUnit && field.getFullName().startsWith(UNIT)) { filterList.add(true); continue; diff --git a/dataSources/parquet/src/main/java/cn/edu/tsinghua/iginx/parquet/ParquetStorage.java b/dataSources/parquet/src/main/java/cn/edu/tsinghua/iginx/parquet/ParquetStorage.java index c69d362a8..2db32a9fc 100644 --- a/dataSources/parquet/src/main/java/cn/edu/tsinghua/iginx/parquet/ParquetStorage.java +++ b/dataSources/parquet/src/main/java/cn/edu/tsinghua/iginx/parquet/ParquetStorage.java @@ -138,8 +138,7 @@ public TaskExecuteResult execute(StoragePhysicalTask task) { project.getTagFilter(), FilterTransformer.toString(filter), storageUnit, - isDummyStorageUnit, - task.getTargetFragment().getTsInterval().getSchemaPrefix()); + isDummyStorageUnit); } else if (op.getType() == OperatorType.Insert) { Insert insert = (Insert) op; return executor.executeInsertTask( diff --git a/dataSources/parquet/src/main/java/cn/edu/tsinghua/iginx/parquet/entity/ParquetQueryRowStream.java b/dataSources/parquet/src/main/java/cn/edu/tsinghua/iginx/parquet/entity/ParquetQueryRowStream.java index fedaecba1..b1c854fa7 100644 --- a/dataSources/parquet/src/main/java/cn/edu/tsinghua/iginx/parquet/entity/ParquetQueryRowStream.java +++ b/dataSources/parquet/src/main/java/cn/edu/tsinghua/iginx/parquet/entity/ParquetQueryRowStream.java @@ -39,17 +39,10 @@ public class ParquetQueryRowStream implements RowStream { private boolean hasNextCache = false; - private final String schemaPrefix; - private final Map physicalNameCache = new HashMap<>(); public ParquetQueryRowStream(ResultSet rs, TagFilter tagFilter) { - this(rs, tagFilter, ""); - } - - public ParquetQueryRowStream(ResultSet rs, TagFilter tagFilter, String schemaPrefix) { this.rs = rs; - this.schemaPrefix = schemaPrefix; if (rs == null) { this.header = new Header(Field.KEY, Collections.emptyList()); @@ -72,12 +65,7 @@ public ParquetQueryRowStream(ResultSet rs, TagFilter tagFilter, String schemaPre Pair> pair = TagKVUtils.splitFullName(pathName); DataType type = fromParquetDataType(rsMetaData.getColumnTypeName(i)); - Field field; - if (schemaPrefix.equals("")) { - field = new Field(pair.getK(), type, pair.getV()); - } else { - field = new Field(schemaPrefix + "." + pair.getK(), type, pair.getV()); - } + Field field = new Field(pair.getK(), type, pair.getV()); if (filterByTags && !TagKVUtils.match(pair.v, tagFilter)) { continue; @@ -168,9 +156,6 @@ private String getPhysicalPath(Field field) { return physicalNameCache.get(field); } else { String name = field.getName(); - if (!schemaPrefix.equals("")) { - name = name.substring(name.indexOf(schemaPrefix) + schemaPrefix.length() + 1); - } String path = TagKVUtils.toFullName(name, field.getTags()); path = path.replaceAll(IGINX_SEPARATOR, PARQUET_SEPARATOR); physicalNameCache.put(field, path); diff --git a/dataSources/parquet/src/main/java/cn/edu/tsinghua/iginx/parquet/exec/Executor.java b/dataSources/parquet/src/main/java/cn/edu/tsinghua/iginx/parquet/exec/Executor.java index af036f093..7f3d47250 100644 --- a/dataSources/parquet/src/main/java/cn/edu/tsinghua/iginx/parquet/exec/Executor.java +++ b/dataSources/parquet/src/main/java/cn/edu/tsinghua/iginx/parquet/exec/Executor.java @@ -15,7 +15,7 @@ public interface Executor { TaskExecuteResult executeProjectTask(List paths, TagFilter tagFilter, String filter, - String storageUnit, boolean isDummyStorageUnit, String schemaPrefix); + String storageUnit, boolean isDummyStorageUnit); TaskExecuteResult executeInsertTask(DataView dataView, String storageUnit); diff --git a/dataSources/parquet/src/main/java/cn/edu/tsinghua/iginx/parquet/exec/LocalExecutor.java b/dataSources/parquet/src/main/java/cn/edu/tsinghua/iginx/parquet/exec/LocalExecutor.java index 208673fd5..2f111710e 100644 --- a/dataSources/parquet/src/main/java/cn/edu/tsinghua/iginx/parquet/exec/LocalExecutor.java +++ b/dataSources/parquet/src/main/java/cn/edu/tsinghua/iginx/parquet/exec/LocalExecutor.java @@ -110,7 +110,7 @@ public LocalExecutor(ParquetStoragePolicy policy, Connection connection, String @Override public TaskExecuteResult executeProjectTask(List paths, TagFilter tagFilter, - String filter, String storageUnit, boolean isDummyStorageUnit, String schemaPrefix) { + String filter, String storageUnit, boolean isDummyStorageUnit) { try { createDUDirectoryIfNotExists(storageUnit); } catch (PhysicalException e) { @@ -118,7 +118,7 @@ public TaskExecuteResult executeProjectTask(List paths, TagFilter tagFil } if (isDummyStorageUnit) { - return executeDummyProjectTask(paths, tagFilter, filter, storageUnit, schemaPrefix); + return executeDummyProjectTask(paths, tagFilter, filter, storageUnit); } try { @@ -154,22 +154,11 @@ public TaskExecuteResult executeProjectTask(List paths, TagFilter tagFil } private TaskExecuteResult executeDummyProjectTask(List paths, TagFilter tagFilter, - String filter, String storageUnit, String schemaPrefix) { + String filter, String storageUnit) { try { Connection conn = ((DuckDBConnection) connection).duplicate(); Statement stmt = conn.createStatement(); - - // trim prefix - List pathList = new ArrayList<>(); - if (schemaPrefix != null && !schemaPrefix.equals("")) { - for (String path : paths) { - if (path.contains(schemaPrefix)) { - pathList.add(path.substring(path.indexOf(schemaPrefix) + schemaPrefix.length() + 1)); - } else if (path.equals("*")) { - pathList.add(path); - } - } - } + List pathList = new ArrayList<>(paths); pathList = determinePathListWithTagFilter(storageUnit, pathList, tagFilter, true); if (pathList.isEmpty()) { @@ -192,7 +181,7 @@ private TaskExecuteResult executeDummyProjectTask(List paths, TagFilter conn.close(); RowStream rowStream = new ClearEmptyRowStreamWrapper( - new MergeTimeRowStreamWrapper(new ParquetQueryRowStream(rs, tagFilter, schemaPrefix))); + new MergeTimeRowStreamWrapper(new ParquetQueryRowStream(rs, tagFilter))); return new TaskExecuteResult(rowStream); } catch (SQLException | PhysicalException e) { logger.error(e.getMessage()); diff --git a/dataSources/parquet/src/main/java/cn/edu/tsinghua/iginx/parquet/exec/RemoteExecutor.java b/dataSources/parquet/src/main/java/cn/edu/tsinghua/iginx/parquet/exec/RemoteExecutor.java index 3b65a1698..fa424868d 100644 --- a/dataSources/parquet/src/main/java/cn/edu/tsinghua/iginx/parquet/exec/RemoteExecutor.java +++ b/dataSources/parquet/src/main/java/cn/edu/tsinghua/iginx/parquet/exec/RemoteExecutor.java @@ -74,7 +74,7 @@ public RemoteExecutor(String ip, int port) throws TTransportException { @Override public TaskExecuteResult executeProjectTask(List paths, TagFilter tagFilter, - String filter, String storageUnit, boolean isDummyStorageUnit, String schemaPrefix) { + String filter, String storageUnit, boolean isDummyStorageUnit) { ProjectReq req = new ProjectReq(storageUnit, isDummyStorageUnit, paths); if (tagFilter != null) { req.setTagFilter(constructRawTagFilter(tagFilter)); @@ -82,9 +82,6 @@ public TaskExecuteResult executeProjectTask(List paths, TagFilter tagFil if (filter != null && !filter.equals("")) { req.setFilter(filter); } - if (schemaPrefix != null && !schemaPrefix.equals("")) { - req.setSchemaPrefix(schemaPrefix); - } try { ProjectResp resp = client.executeProject(req); diff --git a/dataSources/parquet/src/main/java/cn/edu/tsinghua/iginx/parquet/server/ParquetWorker.java b/dataSources/parquet/src/main/java/cn/edu/tsinghua/iginx/parquet/server/ParquetWorker.java index 9a20f6759..183f56697 100644 --- a/dataSources/parquet/src/main/java/cn/edu/tsinghua/iginx/parquet/server/ParquetWorker.java +++ b/dataSources/parquet/src/main/java/cn/edu/tsinghua/iginx/parquet/server/ParquetWorker.java @@ -82,8 +82,7 @@ public ProjectResp executeProject(ProjectReq req) throws TException { tagFilter, req.getFilter(), req.getStorageUnit(), - req.isDummyStorageUnit, - req.getSchemaPrefix()); + req.isDummyStorageUnit); if (result.getException() != null || result.getRowStream() == null) { return new ProjectResp(EXEC_PROJECT_FAIL); diff --git a/shared/src/main/java/cn/edu/tsinghua/iginx/utils/StringUtils.java b/shared/src/main/java/cn/edu/tsinghua/iginx/utils/StringUtils.java index 9d00e256a..d56c7917a 100644 --- a/shared/src/main/java/cn/edu/tsinghua/iginx/utils/StringUtils.java +++ b/shared/src/main/java/cn/edu/tsinghua/iginx/utils/StringUtils.java @@ -55,7 +55,7 @@ public static int compare(String ts, String border, boolean isStart) { * @param border 前缀式时间范围 */ public static int compare(String ts, String border) { - if (ts.indexOf(border) == 0) { + if (ts.indexOf(border) == 0 || ts.equals("*.*")) { return 0; } else diff --git a/thrift/src/main/proto/parquet.thrift b/thrift/src/main/proto/parquet.thrift index 93dc73669..b5442ec12 100644 --- a/thrift/src/main/proto/parquet.thrift +++ b/thrift/src/main/proto/parquet.thrift @@ -28,7 +28,6 @@ struct ProjectReq { 3: required list paths 4: optional RawTagFilter tagFilter 5: optional string filter - 6: optional string schemaPrefix } struct ParquetHeader {