Skip to content

Commit

Permalink
commit
Browse files Browse the repository at this point in the history
  • Loading branch information
Caideyipi committed Jun 19, 2024
1 parent 0b496e8 commit 3984d04
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ public QueryDataSet query(
return tsFileExecutor.execute(queryExpression, partitionStartOffset, partitionEndOffset);
}

public QueryDataSet query(QueryExpression queryExpression, boolean isAlignedQuery)
throws IOException {
return tsFileExecutor.execute(queryExpression, isAlignedQuery);
}

@Override
public void close() throws IOException {
fileReader.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ private Long timeHeapGet() {
return t;
}

private Field putValueToField(BatchData col) {
public static Field putValueToField(BatchData col) {
TSDataType type = col.getDataType();
Field field;
if (type == TSDataType.VECTOR) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.tsfile.read.query.dataset;

import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.read.common.BatchData;
import org.apache.tsfile.read.common.Path;
import org.apache.tsfile.read.common.RowRecord;
import org.apache.tsfile.read.reader.series.AbstractFileSeriesReader;

import java.io.IOException;
import java.util.List;

import static org.apache.tsfile.read.query.dataset.DataSetWithoutTimeGenerator.putValueToField;

public class NonAlignedDataSetWithoutTimeGenerator extends QueryDataSet {
private final List<AbstractFileSeriesReader> readers;
private BatchData batchData;

public NonAlignedDataSetWithoutTimeGenerator(
final List<Path> paths,
final List<TSDataType> dataTypes,
final List<AbstractFileSeriesReader> readers) {
super(paths, dataTypes);
this.readers = readers;
this.columnNum = 1;
}

@Override
public boolean hasNextWithoutConstraint() {
return batchData.hasCurrent();
}

@Override
public RowRecord nextWithoutConstraint() throws IOException {
RowRecord rowRecord = null;

if (batchData.hasCurrent()) {
rowRecord = new RowRecord(batchData.currentTime());
rowRecord.addField(putValueToField(batchData));
batchData.next();
}

while (!batchData.hasCurrent() && !readers.isEmpty()) {
while (!readers.get(0).hasNextBatch()) {
readers.remove(0);
}
if (!readers.isEmpty()) {
batchData = readers.get(0).nextBatch();
}
}
return rowRecord;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.tsfile.read.expression.impl.GlobalTimeExpression;
import org.apache.tsfile.read.expression.util.ExpressionOptimizer;
import org.apache.tsfile.read.query.dataset.DataSetWithoutTimeGenerator;
import org.apache.tsfile.read.query.dataset.NonAlignedDataSetWithoutTimeGenerator;
import org.apache.tsfile.read.query.dataset.QueryDataSet;
import org.apache.tsfile.read.reader.series.AbstractFileSeriesReader;
import org.apache.tsfile.read.reader.series.EmptyFileSeriesReader;
Expand All @@ -56,6 +57,11 @@ public TsFileExecutor(IMetadataQuerier metadataQuerier, IChunkLoader chunkLoader

@Override
public QueryDataSet execute(QueryExpression queryExpression) throws IOException {
return execute(queryExpression, true);
}

public QueryDataSet execute(QueryExpression queryExpression, boolean isAlignedQuery)
throws IOException {
// bloom filter
BloomFilter bloomFilter = metadataQuerier.getWholeFileMetadata().getBloomFilter();
List<Path> filteredSeriesPath = new ArrayList<>();
Expand All @@ -79,7 +85,9 @@ public QueryDataSet execute(QueryExpression queryExpression) throws IOException

if (regularIExpression instanceof GlobalTimeExpression) {
return execute(
queryExpression.getSelectedSeries(), (GlobalTimeExpression) regularIExpression);
queryExpression.getSelectedSeries(),
(GlobalTimeExpression) regularIExpression,
isAlignedQuery);
} else {
return new ExecutorWithTimeGenerator(metadataQuerier, chunkLoader)
.execute(queryExpression);
Expand All @@ -89,7 +97,7 @@ public QueryDataSet execute(QueryExpression queryExpression) throws IOException
}
} else {
try {
return execute(queryExpression.getSelectedSeries());
return execute(queryExpression.getSelectedSeries(), isAlignedQuery);
} catch (NoMeasurementException e) {
throw new IOException(e);
}
Expand Down Expand Up @@ -148,9 +156,9 @@ public QueryDataSet execute(
* @param selectedPathList all selected paths
* @return DataSet without TimeGenerator
*/
private QueryDataSet execute(List<Path> selectedPathList)
private QueryDataSet execute(List<Path> selectedPathList, boolean isAlignedQuery)
throws IOException, NoMeasurementException {
return executeMayAttachTimeFiler(selectedPathList, null);
return executeMayAttachTimeFiler(selectedPathList, null, isAlignedQuery);
}

/**
Expand All @@ -160,9 +168,10 @@ private QueryDataSet execute(List<Path> selectedPathList)
* @param timeFilter GlobalTimeExpression that takes effect to all selected paths
* @return DataSet without TimeGenerator
*/
private QueryDataSet execute(List<Path> selectedPathList, GlobalTimeExpression timeFilter)
private QueryDataSet execute(
List<Path> selectedPathList, GlobalTimeExpression timeFilter, boolean isAlignedQuery)
throws IOException, NoMeasurementException {
return executeMayAttachTimeFiler(selectedPathList, timeFilter);
return executeMayAttachTimeFiler(selectedPathList, timeFilter, isAlignedQuery);
}

/**
Expand All @@ -171,7 +180,7 @@ private QueryDataSet execute(List<Path> selectedPathList, GlobalTimeExpression t
* @return DataSetWithoutTimeGenerator
*/
private QueryDataSet executeMayAttachTimeFiler(
List<Path> selectedPathList, GlobalTimeExpression timeExpression)
List<Path> selectedPathList, GlobalTimeExpression timeExpression, boolean isAlignedQuery)
throws IOException, NoMeasurementException {
List<AbstractFileSeriesReader> readersOfSelectedSeries = new ArrayList<>();
List<TSDataType> dataTypes = new ArrayList<>();
Expand All @@ -193,6 +202,9 @@ private QueryDataSet executeMayAttachTimeFiler(
}
readersOfSelectedSeries.add(seriesReader);
}
return new DataSetWithoutTimeGenerator(selectedPathList, dataTypes, readersOfSelectedSeries);
return isAlignedQuery
? new DataSetWithoutTimeGenerator(selectedPathList, dataTypes, readersOfSelectedSeries)
: new NonAlignedDataSetWithoutTimeGenerator(
selectedPathList, dataTypes, readersOfSelectedSeries);
}
}

0 comments on commit 3984d04

Please sign in to comment.