diff --git a/java/tsfile/src/main/java/org/apache/tsfile/write/writer/FlushChunkMetadataListener.java b/java/tsfile/src/main/java/org/apache/tsfile/write/writer/FlushChunkMetadataListener.java new file mode 100644 index 000000000..237cb1913 --- /dev/null +++ b/java/tsfile/src/main/java/org/apache/tsfile/write/writer/FlushChunkMetadataListener.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tsfile.write.writer; + +import org.apache.tsfile.file.metadata.IChunkMetadata; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.utils.Pair; + +import java.util.List; + +@FunctionalInterface +public interface FlushChunkMetadataListener { + + // Pair -> chunk metadata list + void onFlush(List, List>> sortedChunkMetadataList); +} diff --git a/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java b/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java index ef9515c57..b7d719726 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java @@ -116,6 +116,8 @@ public class TsFileIOWriter implements AutoCloseable { private volatile int chunkMetadataCount = 0; public static final String CHUNK_METADATA_TEMP_FILE_SUFFIX = ".meta"; + private final List flushListeners = new ArrayList<>(); + /** empty construct function. */ protected TsFileIOWriter() {} @@ -156,6 +158,10 @@ public TsFileIOWriter(File file, long maxMetadataSize) throws IOException { chunkMetadataTempFile = new File(file.getAbsolutePath() + CHUNK_METADATA_TEMP_FILE_SUFFIX); } + public void addFlushListener(FlushChunkMetadataListener listener) { + flushListeners.add(listener); + } + /** * Writes given bytes to output stream. This method is called when total memory size exceeds the * chunk group size threshold. @@ -580,6 +586,19 @@ public void setMaxPlanIndex(long maxPlanIndex) { this.maxPlanIndex = maxPlanIndex; } + public long getMaxMetadataSize() { + return maxMetadataSize; + } + + /** + * Set the max memory size of chunk metadata. Note that the new size may be larger than current + * chunk metadata size, so caller would better call {@link #checkMetadataSizeAndMayFlush()} after + * this to avoid violating memory control. + */ + public void setMaxMetadataSize(long maxMetadataSize) { + this.maxMetadataSize = maxMetadataSize; + } + /** * Check if the size of chunk metadata in memory is greater than the given threshold. If so, the * chunk metadata will be written to a temp files. Notice! If you are writing a aligned device @@ -619,25 +638,41 @@ public int checkMetadataSizeAndMayFlush() throws IOException { protected int sortAndFlushChunkMetadata() throws IOException { int writtenSize = 0; // group by series - List>> sortedChunkMetadataList = + final List>> sortedChunkMetadataList = TSMIterator.sortChunkMetadata( chunkGroupMetadataList, currentChunkGroupDeviceId, chunkMetadataList); if (tempOutput == null) { tempOutput = new LocalTsFileOutput(new FileOutputStream(chunkMetadataTempFile)); } hasChunkMetadataInDisk = true; + + // This list is the same as sortedChunkMetadataList, but Path is replaced by Pair + final List, List>> + sortedChunkMetadataListForCallBack = new ArrayList<>(); + for (Pair> pair : sortedChunkMetadataList) { - Path seriesPath = pair.left; - boolean isNewPath = !seriesPath.equals(lastSerializePath); + final Path seriesPath = pair.left; + final boolean isNewPath = !seriesPath.equals(lastSerializePath); if (isNewPath) { // record the count of path to construct bloom filter later pathCount++; } - List iChunkMetadataList = pair.right; + final List iChunkMetadataList = pair.right; writtenSize += writeChunkMetadataToTempFile(iChunkMetadataList, seriesPath, isNewPath); lastSerializePath = seriesPath; + sortedChunkMetadataListForCallBack.add( + new Pair<>( + new Pair<>(seriesPath.getIDeviceID(), seriesPath.getMeasurement()), + iChunkMetadataList)); logger.debug("Flushing {}", seriesPath); } + + // notify the listeners + for (final FlushChunkMetadataListener listener : flushListeners) { + listener.onFlush(sortedChunkMetadataListForCallBack); + } + // clear the cache metadata to release the memory chunkGroupMetadataList.clear(); if (chunkMetadataList != null) { diff --git a/java/tsfile/src/test/java/org/apache/tsfile/write/writer/TsFileIOWriterMemoryControlTest.java b/java/tsfile/src/test/java/org/apache/tsfile/write/writer/TsFileIOWriterMemoryControlTest.java index 4b161ff85..ae8d07af1 100644 --- a/java/tsfile/src/test/java/org/apache/tsfile/write/writer/TsFileIOWriterMemoryControlTest.java +++ b/java/tsfile/src/test/java/org/apache/tsfile/write/writer/TsFileIOWriterMemoryControlTest.java @@ -57,6 +57,7 @@ import java.util.List; import java.util.Map; import java.util.Random; +import java.util.concurrent.atomic.AtomicInteger; public class TsFileIOWriterMemoryControlTest { private static File testFile = new File("target", "1-1-0-0.tsfile"); @@ -249,6 +250,48 @@ public void testSerializeAndDeserializeMixedChunkMetadata() throws IOException { } } + /** The following test is for calling listeners after flushing chunk metadata. */ + @Test + public void testFlushChunkMetadataListener() throws IOException { + try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024 * 1024 * 10)) { + final AtomicInteger cnt1 = new AtomicInteger(0); + final AtomicInteger cnt2 = new AtomicInteger(0); + writer.addFlushListener(sortedChunkMetadataList -> cnt1.incrementAndGet()); + writer.addFlushListener(sortedChunkMetadataList -> cnt2.incrementAndGet()); + List originChunkMetadataList = new ArrayList<>(); + for (int i = 0; i < 10; ++i) { + IDeviceID deviceId = sortedDeviceId.get(i); + writer.startChunkGroup(deviceId); + generateIntData(0, 0L, new ArrayList<>()).writeToFileWriter(writer); + generateBooleanData(1, 0L, new ArrayList<>()).writeToFileWriter(writer); + generateFloatData(2, 0L, new ArrayList<>()).writeToFileWriter(writer); + generateDoubleData(3, 0L, new ArrayList<>()).writeToFileWriter(writer); + generateTextData(4, 0L, new ArrayList<>()).writeToFileWriter(writer); + originChunkMetadataList.addAll(writer.chunkMetadataList); + writer.endChunkGroup(); + } + writer.sortAndFlushChunkMetadata(); + writer.tempOutput.flush(); + + TSMIterator iterator = + TSMIterator.getTSMIteratorInDisk( + writer.chunkMetadataTempFile, + writer.chunkGroupMetadataList, + writer.endPosInCMTForDevice); + for (int i = 0; iterator.hasNext(); ++i) { + Pair timeseriesMetadataPair = iterator.next(); + TimeseriesMetadata timeseriesMetadata = timeseriesMetadataPair.right; + Assert.assertEquals(sortedSeriesId.get(i % 5), timeseriesMetadata.getMeasurementId()); + Assert.assertEquals( + originChunkMetadataList.get(i).getDataType(), timeseriesMetadata.getTsDataType()); + Assert.assertEquals( + originChunkMetadataList.get(i).getStatistics(), timeseriesMetadata.getStatistics()); + } + Assert.assertEquals(1, cnt1.get()); + Assert.assertEquals(1, cnt2.get()); + } + } + /** The following tests is for writing normal series in different nums. */ /**