Skip to content

Commit

Permalink
Fix and add test
Browse files Browse the repository at this point in the history
  • Loading branch information
DanielWang2035 committed Dec 10, 2024
1 parent 37b35ad commit 4fbc7fc
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@
package org.apache.tsfile.write.writer;

import org.apache.tsfile.file.metadata.IChunkMetadata;
import org.apache.tsfile.read.common.Path;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.utils.Pair;

import java.util.List;

/**
 * Callback invoked by {@code TsFileIOWriter} whenever buffered chunk metadata is sorted and
 * flushed to the chunk-metadata temp file.
 *
 * <p>Fix: the rendered diff left both the pre-commit ({@code Path}-keyed) and post-commit
 * declarations in the interface body, which is invalid Java (two abstract methods with the same
 * erasure, violating {@code @FunctionalInterface}). Only the post-commit declaration is kept.
 */
@FunctionalInterface
public interface FlushChunkMetadataListener {

  /**
   * Called after a flush with the metadata that was just written, one entry per series.
   *
   * @param sortedChunkMetadataList sorted list of (device id, measurement id) -&gt; chunk
   *     metadata list
   */
  void onFlush(List<Pair<Pair<IDeviceID, String>, List<IChunkMetadata>>> sortedChunkMetadataList);
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@
import java.util.Map.Entry;
import java.util.Queue;
import java.util.TreeMap;
import java.util.concurrent.CopyOnWriteArrayList;

import static org.apache.tsfile.file.metadata.MetadataIndexConstructor.addCurrentIndexNodeToQueue;
import static org.apache.tsfile.file.metadata.MetadataIndexConstructor.checkAndBuildLevelIndex;
Expand Down Expand Up @@ -132,7 +131,7 @@ public class TsFileIOWriter implements AutoCloseable {

protected String encryptKey;

// NOTE(review): diff artifact — the next line is the pre-commit declaration; this commit replaces
// CopyOnWriteArrayList with a plain ArrayList. CopyOnWriteArrayList is thread-safe and ArrayList
// is not, so concurrent listener registration/iteration is no longer protected. Presumably
// listeners are only registered single-threaded before writing starts — TODO confirm.
private final List<FlushChunkMetadataListener> flushListeners = new CopyOnWriteArrayList<>();
private final List<FlushChunkMetadataListener> flushListeners = new ArrayList<>();

/** empty construct function. */
protected TsFileIOWriter() {
Expand Down Expand Up @@ -672,6 +671,19 @@ public void setMaxPlanIndex(long maxPlanIndex) {
this.maxPlanIndex = maxPlanIndex;
}

/**
 * Returns the threshold, in bytes, above which buffered chunk metadata is flushed to disk.
 *
 * @return the current max chunk-metadata memory size
 */
public long getMaxMetadataSize() {
  return this.maxMetadataSize;
}

/**
 * Updates the threshold, in bytes, above which buffered chunk metadata is flushed to disk.
 *
 * <p>The new threshold may already be exceeded by the chunk metadata currently held in memory,
 * so the caller should invoke {@link #checkMetadataSizeAndMayFlush()} afterwards to stay within
 * the memory budget.
 *
 * @param maxMetadataSize the new max chunk-metadata memory size in bytes
 */
public void setMaxMetadataSize(long maxMetadataSize) {
  this.maxMetadataSize = maxMetadataSize;
}

/**
* Check if the size of chunk metadata in memory is greater than the given threshold. If so, the
chunk metadata will be written to a temp file. <b>Notice! If you are writing an aligned device
Expand Down Expand Up @@ -711,29 +723,39 @@ public int checkMetadataSizeAndMayFlush() throws IOException {
/**
 * Sorts the buffered chunk metadata by series and writes it to the chunk-metadata temp file,
 * then notifies every registered {@link FlushChunkMetadataListener} with a
 * (device id, measurement id)-keyed view of what was flushed.
 *
 * <p>NOTE(review): this view is a rendered diff and is truncated below — several statements
 * appear twice (the pre-commit line immediately followed by its post-commit replacement). The
 * post-commit line is the live one; the flagged duplicates are diff artifacts, not real code.
 *
 * @return the number of bytes written to the temp file
 * @throws IOException if opening or writing the temp file fails
 */
protected int sortAndFlushChunkMetadata() throws IOException {
int writtenSize = 0;
// group by series
// NOTE(review): diff artifact — pre-commit declaration; superseded by the `final` one below.
List<Pair<Path, List<IChunkMetadata>>> sortedChunkMetadataList =
final List<Pair<Path, List<IChunkMetadata>>> sortedChunkMetadataList =
TSMIterator.sortChunkMetadata(
chunkGroupMetadataList, currentChunkGroupDeviceId, chunkMetadataList);
// lazily open the temp output the first time metadata is spilled to disk
if (tempOutput == null) {
tempOutput = new LocalTsFileOutput(new FileOutputStream(chunkMetadataTempFile));
}
hasChunkMetadataInDisk = true;

// This list is the same as sortedChunkMetadataList, but Path is replaced by Pair<IDeviceID,
// String>
final List<Pair<Pair<IDeviceID, String>, List<IChunkMetadata>>>
sortedChunkMetadataListForCallBack = new ArrayList<>();

for (Pair<Path, List<IChunkMetadata>> pair : sortedChunkMetadataList) {
// NOTE(review): diff artifact — the next two lines are the pre-commit (non-final) versions
// of the two `final` declarations that follow them.
Path seriesPath = pair.left;
boolean isNewPath = !seriesPath.equals(lastSerializePath);
final Path seriesPath = pair.left;
final boolean isNewPath = !seriesPath.equals(lastSerializePath);
if (isNewPath) {
// record the count of path to construct bloom filter later
pathCount++;
}
// NOTE(review): diff artifact — pre-commit line; the `final` declaration below replaces it.
List<IChunkMetadata> iChunkMetadataList = pair.right;
final List<IChunkMetadata> iChunkMetadataList = pair.right;
writtenSize += writeChunkMetadataToTempFile(iChunkMetadataList, seriesPath, isNewPath);
lastSerializePath = seriesPath;
// expose (device id, measurement id) instead of Path to the listeners
sortedChunkMetadataListForCallBack.add(
new Pair<>(
new Pair<>(seriesPath.getIDeviceID(), seriesPath.getMeasurement()),
iChunkMetadataList));
logger.debug("Flushing {}", seriesPath);
}

// notify the listeners
// NOTE(review): diff artifact — the first onFlush call is the pre-commit version; the second
// (passing the Pair-keyed list) is the live one.
for (final FlushChunkMetadataListener listener : flushListeners) {
listener.onFlush(sortedChunkMetadataList);
listener.onFlush(sortedChunkMetadataListForCallBack);
}

// clear the cache metadata to release the memory
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;

public class TsFileIOWriterMemoryControlTest {
private static File testFile = new File("target", "1-1-0-0.tsfile");
Expand Down Expand Up @@ -248,6 +249,48 @@ public void testSerializeAndDeserializeMixedChunkMetadata() throws IOException {
}
}

/** Verifies that every registered flush listener is invoked exactly once per metadata flush. */
@Test
public void testFlushChunkMetadataListener() throws IOException {
  try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024 * 1024 * 10)) {
    final AtomicInteger firstListenerCalls = new AtomicInteger(0);
    final AtomicInteger secondListenerCalls = new AtomicInteger(0);
    writer.addFlushListener(metadataList -> firstListenerCalls.incrementAndGet());
    writer.addFlushListener(metadataList -> secondListenerCalls.incrementAndGet());

    // write five chunks of mixed types into each of ten chunk groups, remembering the
    // chunk metadata the writer produced so it can be compared after the flush
    final List<ChunkMetadata> expectedChunkMetadata = new ArrayList<>();
    for (int deviceIdx = 0; deviceIdx < 10; ++deviceIdx) {
      final IDeviceID deviceId = sortedDeviceId.get(deviceIdx);
      writer.startChunkGroup(deviceId);
      generateIntData(0, 0L, new ArrayList<>()).writeToFileWriter(writer);
      generateBooleanData(1, 0L, new ArrayList<>()).writeToFileWriter(writer);
      generateFloatData(2, 0L, new ArrayList<>()).writeToFileWriter(writer);
      generateDoubleData(3, 0L, new ArrayList<>()).writeToFileWriter(writer);
      generateTextData(4, 0L, new ArrayList<>()).writeToFileWriter(writer);
      expectedChunkMetadata.addAll(writer.chunkMetadataList);
      writer.endChunkGroup();
    }
    writer.sortAndFlushChunkMetadata();
    writer.tempOutput.flush();

    // re-read the flushed metadata from the temp file and check it round-trips
    final TSMIterator iterator =
        TSMIterator.getTSMIteratorInDisk(
            writer.chunkMetadataTempFile,
            writer.chunkGroupMetadataList,
            writer.endPosInCMTForDevice);
    int seriesIdx = 0;
    while (iterator.hasNext()) {
      final TimeseriesMetadata timeseriesMetadata = iterator.next().right;
      Assert.assertEquals(
          sortedSeriesId.get(seriesIdx % 5), timeseriesMetadata.getMeasurementId());
      Assert.assertEquals(
          expectedChunkMetadata.get(seriesIdx).getDataType(), timeseriesMetadata.getTsDataType());
      Assert.assertEquals(
          expectedChunkMetadata.get(seriesIdx).getStatistics(),
          timeseriesMetadata.getStatistics());
      ++seriesIdx;
    }

    // a single flush must fire each listener exactly once
    Assert.assertEquals(1, firstListenerCalls.get());
    Assert.assertEquals(1, secondListenerCalls.get());
  }
}

/** The following tests are for writing normal series in different numbers. */

/**
Expand Down

0 comments on commit 4fbc7fc

Please sign in to comment.