From 3984d041c15415bebb056b7ff794ce6f6cb8042f Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Wed, 19 Jun 2024 15:31:03 +0800 Subject: [PATCH] commit --- .../org/apache/tsfile/read/TsFileReader.java | 5 ++ .../dataset/DataSetWithoutTimeGenerator.java | 2 +- ...NonAlignedDataSetWithoutTimeGenerator.java | 71 +++++++++++++++++++ .../read/query/executor/TsFileExecutor.java | 28 +++++--- 4 files changed, 97 insertions(+), 9 deletions(-) create mode 100644 java/tsfile/src/main/java/org/apache/tsfile/read/query/dataset/NonAlignedDataSetWithoutTimeGenerator.java diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileReader.java index a3cbf3040..3c1afad43 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileReader.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileReader.java @@ -54,6 +54,11 @@ public QueryDataSet query( return tsFileExecutor.execute(queryExpression, partitionStartOffset, partitionEndOffset); } + public QueryDataSet query(QueryExpression queryExpression, boolean isAlignedQuery) + throws IOException { + return tsFileExecutor.execute(queryExpression, isAlignedQuery); + } + @Override public void close() throws IOException { fileReader.close(); diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/query/dataset/DataSetWithoutTimeGenerator.java b/java/tsfile/src/main/java/org/apache/tsfile/read/query/dataset/DataSetWithoutTimeGenerator.java index 02f8ffd45..0979f636b 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/query/dataset/DataSetWithoutTimeGenerator.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/query/dataset/DataSetWithoutTimeGenerator.java @@ -150,7 +150,7 @@ private Long timeHeapGet() { return t; } - private Field putValueToField(BatchData col) { + public static Field putValueToField(BatchData col) { TSDataType type = col.getDataType(); Field field; if (type == TSDataType.VECTOR) { diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/query/dataset/NonAlignedDataSetWithoutTimeGenerator.java b/java/tsfile/src/main/java/org/apache/tsfile/read/query/dataset/NonAlignedDataSetWithoutTimeGenerator.java new file mode 100644 index 000000000..797f49dc0 --- /dev/null +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/query/dataset/NonAlignedDataSetWithoutTimeGenerator.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tsfile.read.query.dataset; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.read.common.BatchData; +import org.apache.tsfile.read.common.Path; +import org.apache.tsfile.read.common.RowRecord; +import org.apache.tsfile.read.reader.series.AbstractFileSeriesReader; + +import java.io.IOException; +import java.util.List; + +import static org.apache.tsfile.read.query.dataset.DataSetWithoutTimeGenerator.putValueToField; + +public class NonAlignedDataSetWithoutTimeGenerator extends QueryDataSet { + private final List readers; + private BatchData batchData; + + public NonAlignedDataSetWithoutTimeGenerator( + final List paths, + final List dataTypes, + final List readers) { + super(paths, dataTypes); + this.readers = readers; + this.columnNum = 1; + } + + @Override + public boolean hasNextWithoutConstraint() { + return batchData.hasCurrent(); + } + + @Override + public RowRecord nextWithoutConstraint() throws IOException { + RowRecord rowRecord = null; + + if (batchData.hasCurrent()) { + rowRecord = new RowRecord(batchData.currentTime()); + rowRecord.addField(putValueToField(batchData)); + batchData.next(); + } + + while (!batchData.hasCurrent() && !readers.isEmpty()) { + while (!readers.get(0).hasNextBatch()) { + readers.remove(0); + } + if (!readers.isEmpty()) { + batchData = readers.get(0).nextBatch(); + } + } + return rowRecord; + } +} diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/query/executor/TsFileExecutor.java b/java/tsfile/src/main/java/org/apache/tsfile/read/query/executor/TsFileExecutor.java index e599e7650..099b96a82 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/query/executor/TsFileExecutor.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/query/executor/TsFileExecutor.java @@ -33,6 +33,7 @@ import org.apache.tsfile.read.expression.impl.GlobalTimeExpression; import org.apache.tsfile.read.expression.util.ExpressionOptimizer; import org.apache.tsfile.read.query.dataset.DataSetWithoutTimeGenerator; +import org.apache.tsfile.read.query.dataset.NonAlignedDataSetWithoutTimeGenerator; import org.apache.tsfile.read.query.dataset.QueryDataSet; import org.apache.tsfile.read.reader.series.AbstractFileSeriesReader; import org.apache.tsfile.read.reader.series.EmptyFileSeriesReader; @@ -56,6 +57,11 @@ public TsFileExecutor(IMetadataQuerier metadataQuerier, IChunkLoader chunkLoader @Override public QueryDataSet execute(QueryExpression queryExpression) throws IOException { + return execute(queryExpression, true); + } + + public QueryDataSet execute(QueryExpression queryExpression, boolean isAlignedQuery) + throws IOException { // bloom filter BloomFilter bloomFilter = metadataQuerier.getWholeFileMetadata().getBloomFilter(); List filteredSeriesPath = new ArrayList<>(); @@ -79,7 +85,9 @@ public QueryDataSet execute(QueryExpression queryExpression) throws IOException if (regularIExpression instanceof GlobalTimeExpression) { return execute( - queryExpression.getSelectedSeries(), (GlobalTimeExpression) regularIExpression); + queryExpression.getSelectedSeries(), + (GlobalTimeExpression) regularIExpression, + isAlignedQuery); } else { return new ExecutorWithTimeGenerator(metadataQuerier, chunkLoader) .execute(queryExpression); @@ -89,7 +97,7 @@ public QueryDataSet execute(QueryExpression queryExpression) throws IOException } } else { try { - return execute(queryExpression.getSelectedSeries()); + return execute(queryExpression.getSelectedSeries(), isAlignedQuery); } catch (NoMeasurementException e) { throw new IOException(e); } @@ -148,9 +156,9 @@ public QueryDataSet execute( * @param selectedPathList all selected paths * @return DataSet without TimeGenerator */ - private QueryDataSet execute(List selectedPathList) + private QueryDataSet execute(List selectedPathList, boolean isAlignedQuery) throws IOException, NoMeasurementException { - return executeMayAttachTimeFiler(selectedPathList, null); + return executeMayAttachTimeFiler(selectedPathList, null, isAlignedQuery); } /** @@ -160,9 +168,10 @@ private QueryDataSet execute(List selectedPathList) * @param timeFilter GlobalTimeExpression that takes effect to all selected paths * @return DataSet without TimeGenerator */ - private QueryDataSet execute(List selectedPathList, GlobalTimeExpression timeFilter) + private QueryDataSet execute( + List selectedPathList, GlobalTimeExpression timeFilter, boolean isAlignedQuery) throws IOException, NoMeasurementException { - return executeMayAttachTimeFiler(selectedPathList, timeFilter); + return executeMayAttachTimeFiler(selectedPathList, timeFilter, isAlignedQuery); } /** @@ -171,7 +180,7 @@ private QueryDataSet execute(List selectedPathList, GlobalTimeExpression t * @return DataSetWithoutTimeGenerator */ private QueryDataSet executeMayAttachTimeFiler( - List selectedPathList, GlobalTimeExpression timeExpression) + List selectedPathList, GlobalTimeExpression timeExpression, boolean isAlignedQuery) throws IOException, NoMeasurementException { List readersOfSelectedSeries = new ArrayList<>(); List dataTypes = new ArrayList<>(); @@ -193,6 +202,9 @@ private QueryDataSet executeMayAttachTimeFiler( } readersOfSelectedSeries.add(seriesReader); } - return new DataSetWithoutTimeGenerator(selectedPathList, dataTypes, readersOfSelectedSeries); + return isAlignedQuery + ? new DataSetWithoutTimeGenerator(selectedPathList, dataTypes, readersOfSelectedSeries) + : new NonAlignedDataSetWithoutTimeGenerator( + selectedPathList, dataTypes, readersOfSelectedSeries); } }