From 4fbc7fcf9f6e0a0ca0afaa4f595cc03572f9ab3a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=A9=AC=E5=AD=90=E5=9D=A4?= <55695098+DanielWang2035@users.noreply.github.com> Date: Tue, 10 Dec 2024 14:18:13 +0800 Subject: [PATCH] Fix and add test --- .../writer/FlushChunkMetadataListener.java | 6 +-- .../tsfile/write/writer/TsFileIOWriter.java | 36 +++++++++++++--- .../TsFileIOWriterMemoryControlTest.java | 43 +++++++++++++++++++ 3 files changed, 75 insertions(+), 10 deletions(-) diff --git a/java/tsfile/src/main/java/org/apache/tsfile/write/writer/FlushChunkMetadataListener.java b/java/tsfile/src/main/java/org/apache/tsfile/write/writer/FlushChunkMetadataListener.java index 2937763e0..237cb1913 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/write/writer/FlushChunkMetadataListener.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/write/writer/FlushChunkMetadataListener.java @@ -20,7 +20,7 @@ package org.apache.tsfile.write.writer; import org.apache.tsfile.file.metadata.IChunkMetadata; -import org.apache.tsfile.read.common.Path; +import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.utils.Pair; import java.util.List; @@ -28,6 +28,6 @@ @FunctionalInterface public interface FlushChunkMetadataListener { - // measurement id -> chunk metadata list - void onFlush(List>> sortedChunkMetadataList); + // Pair -> chunk metadata list + void onFlush(List, List>> sortedChunkMetadataList); } diff --git a/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java b/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java index 66e432000..54244ed08 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java @@ -66,7 +66,6 @@ import java.util.Map.Entry; import java.util.Queue; import java.util.TreeMap; -import java.util.concurrent.CopyOnWriteArrayList; import static org.apache.tsfile.file.metadata.MetadataIndexConstructor.addCurrentIndexNodeToQueue; import static org.apache.tsfile.file.metadata.MetadataIndexConstructor.checkAndBuildLevelIndex; @@ -132,7 +131,7 @@ public class TsFileIOWriter implements AutoCloseable { protected String encryptKey; - private final List flushListeners = new CopyOnWriteArrayList<>(); + private final List flushListeners = new ArrayList<>(); /** empty construct function. */ protected TsFileIOWriter() { @@ -672,6 +671,19 @@ public void setMaxPlanIndex(long maxPlanIndex) { this.maxPlanIndex = maxPlanIndex; } + public long getMaxMetadataSize() { + return maxMetadataSize; + } + + /** + * Set the max memory size of chunk metadata. Note that the new size may be larger than current + * chunk metadata size, so caller would better call {@link #checkMetadataSizeAndMayFlush()} after + * this to avoid violating memory control. + */ + public void setMaxMetadataSize(long maxMetadataSize) { + this.maxMetadataSize = maxMetadataSize; + } + /** * Check if the size of chunk metadata in memory is greater than the given threshold. If so, the * chunk metadata will be written to a temp files. Notice! If you are writing a aligned device @@ -711,29 +723,39 @@ public int checkMetadataSizeAndMayFlush() throws IOException { protected int sortAndFlushChunkMetadata() throws IOException { int writtenSize = 0; // group by series - List>> sortedChunkMetadataList = + final List>> sortedChunkMetadataList = TSMIterator.sortChunkMetadata( chunkGroupMetadataList, currentChunkGroupDeviceId, chunkMetadataList); if (tempOutput == null) { tempOutput = new LocalTsFileOutput(new FileOutputStream(chunkMetadataTempFile)); } hasChunkMetadataInDisk = true; + + // This list is the same as sortedChunkMetadataList, but Path is replaced by Pair + final List, List>> + sortedChunkMetadataListForCallBack = new ArrayList<>(); + for (Pair> pair : sortedChunkMetadataList) { - Path seriesPath = pair.left; - boolean isNewPath = !seriesPath.equals(lastSerializePath); + final Path seriesPath = pair.left; + final boolean isNewPath = !seriesPath.equals(lastSerializePath); if (isNewPath) { // record the count of path to construct bloom filter later pathCount++; } - List iChunkMetadataList = pair.right; + final List iChunkMetadataList = pair.right; writtenSize += writeChunkMetadataToTempFile(iChunkMetadataList, seriesPath, isNewPath); lastSerializePath = seriesPath; + sortedChunkMetadataListForCallBack.add( + new Pair<>( + new Pair<>(seriesPath.getIDeviceID(), seriesPath.getMeasurement()), + iChunkMetadataList)); logger.debug("Flushing {}", seriesPath); } // notify the listeners for (final FlushChunkMetadataListener listener : flushListeners) { - listener.onFlush(sortedChunkMetadataList); + listener.onFlush(sortedChunkMetadataListForCallBack); } // clear the cache metadata to release the memory diff --git a/java/tsfile/src/test/java/org/apache/tsfile/write/writer/TsFileIOWriterMemoryControlTest.java b/java/tsfile/src/test/java/org/apache/tsfile/write/writer/TsFileIOWriterMemoryControlTest.java index bb47294e5..77ffdab0c 100644 --- a/java/tsfile/src/test/java/org/apache/tsfile/write/writer/TsFileIOWriterMemoryControlTest.java +++ b/java/tsfile/src/test/java/org/apache/tsfile/write/writer/TsFileIOWriterMemoryControlTest.java @@ -56,6 +56,7 @@ import java.util.List; import java.util.Map; import java.util.Random; +import java.util.concurrent.atomic.AtomicInteger; public class TsFileIOWriterMemoryControlTest { private static File testFile = new File("target", "1-1-0-0.tsfile"); @@ -248,6 +249,48 @@ public void testSerializeAndDeserializeMixedChunkMetadata() throws IOException { } } + /** The following test is for calling listeners after flushing chunk metadata. */ + @Test + public void testFlushChunkMetadataListener() throws IOException { + try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024 * 1024 * 10)) { + final AtomicInteger cnt1 = new AtomicInteger(0); + final AtomicInteger cnt2 = new AtomicInteger(0); + writer.addFlushListener(sortedChunkMetadataList -> cnt1.incrementAndGet()); + writer.addFlushListener(sortedChunkMetadataList -> cnt2.incrementAndGet()); + List originChunkMetadataList = new ArrayList<>(); + for (int i = 0; i < 10; ++i) { + IDeviceID deviceId = sortedDeviceId.get(i); + writer.startChunkGroup(deviceId); + generateIntData(0, 0L, new ArrayList<>()).writeToFileWriter(writer); + generateBooleanData(1, 0L, new ArrayList<>()).writeToFileWriter(writer); + generateFloatData(2, 0L, new ArrayList<>()).writeToFileWriter(writer); + generateDoubleData(3, 0L, new ArrayList<>()).writeToFileWriter(writer); + generateTextData(4, 0L, new ArrayList<>()).writeToFileWriter(writer); + originChunkMetadataList.addAll(writer.chunkMetadataList); + writer.endChunkGroup(); + } + writer.sortAndFlushChunkMetadata(); + writer.tempOutput.flush(); + + TSMIterator iterator = + TSMIterator.getTSMIteratorInDisk( + writer.chunkMetadataTempFile, + writer.chunkGroupMetadataList, + writer.endPosInCMTForDevice); + for (int i = 0; iterator.hasNext(); ++i) { + Pair timeseriesMetadataPair = iterator.next(); + TimeseriesMetadata timeseriesMetadata = timeseriesMetadataPair.right; + Assert.assertEquals(sortedSeriesId.get(i % 5), timeseriesMetadata.getMeasurementId()); + Assert.assertEquals( + originChunkMetadataList.get(i).getDataType(), timeseriesMetadata.getTsDataType()); + Assert.assertEquals( + originChunkMetadataList.get(i).getStatistics(), timeseriesMetadata.getStatistics()); + } + Assert.assertEquals(1, cnt1.get()); + Assert.assertEquals(1, cnt2.get()); + } + } + /** The following tests is for writing normal series in different nums. */ /**