Skip to content

Commit

Permalink
Add FlushChunkMetadataListener (#328)
Browse files Browse the repository at this point in the history
* Add FlushChunkMetadataListener

* Fix and add test
  • Loading branch information
DanielWang2035 committed Dec 13, 2024
1 parent 0bc238b commit 292caee
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.tsfile.write.writer;

import org.apache.tsfile.file.metadata.IChunkMetadata;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.utils.Pair;

import java.util.List;

@FunctionalInterface
public interface FlushChunkMetadataListener {

  /**
   * Invoked after the writer has sorted a batch of in-memory chunk metadata and flushed it to the
   * chunk-metadata temp file.
   *
   * @param sortedChunkMetadataList one entry per series, in sorted order: a pair of (device id,
   *     measurement id) mapped to the list of chunk metadata flushed for that series
   */
  void onFlush(List<Pair<Pair<IDeviceID, String>, List<IChunkMetadata>>> sortedChunkMetadataList);
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ public class TsFileIOWriter implements AutoCloseable {
private volatile int chunkMetadataCount = 0;
public static final String CHUNK_METADATA_TEMP_FILE_SUFFIX = ".meta";

private final List<FlushChunkMetadataListener> flushListeners = new ArrayList<>();

/** empty construct function. */
protected TsFileIOWriter() {}

Expand Down Expand Up @@ -156,6 +158,10 @@ public TsFileIOWriter(File file, long maxMetadataSize) throws IOException {
chunkMetadataTempFile = new File(file.getAbsolutePath() + CHUNK_METADATA_TEMP_FILE_SUFFIX);
}

/**
 * Registers a listener to be notified each time chunk metadata is sorted and flushed to disk.
 *
 * @param listener the callback to invoke on every flush; must not be null
 * @throws IllegalArgumentException if {@code listener} is null
 */
public void addFlushListener(FlushChunkMetadataListener listener) {
  // Fail fast here rather than letting a null listener surface as an NPE much later,
  // inside the flush path, where the cause would be harder to trace.
  if (listener == null) {
    throw new IllegalArgumentException("listener must not be null");
  }
  flushListeners.add(listener);
}

/**
* Writes given bytes to output stream. This method is called when total memory size exceeds the
* chunk group size threshold.
Expand Down Expand Up @@ -580,6 +586,19 @@ public void setMaxPlanIndex(long maxPlanIndex) {
this.maxPlanIndex = maxPlanIndex;
}

/** Returns the current max memory size threshold for chunk metadata kept in memory. */
public long getMaxMetadataSize() {
  return maxMetadataSize;
}

/**
 * Sets the max memory size of chunk metadata. Note that the chunk metadata currently held in
 * memory may already exceed the new limit, so the caller should invoke {@link
 * #checkMetadataSizeAndMayFlush()} afterwards to avoid violating memory control.
 */
public void setMaxMetadataSize(long maxMetadataSize) {
  this.maxMetadataSize = maxMetadataSize;
}

/**
* Check if the size of chunk metadata in memory is greater than the given threshold. If so, the
* chunk metadata will be written to a temp file. <b>Notice! If you are writing an aligned device
Expand Down Expand Up @@ -619,25 +638,41 @@ public int checkMetadataSizeAndMayFlush() throws IOException {
protected int sortAndFlushChunkMetadata() throws IOException {
int writtenSize = 0;
// group by series
List<Pair<Path, List<IChunkMetadata>>> sortedChunkMetadataList =
final List<Pair<Path, List<IChunkMetadata>>> sortedChunkMetadataList =
TSMIterator.sortChunkMetadata(
chunkGroupMetadataList, currentChunkGroupDeviceId, chunkMetadataList);
if (tempOutput == null) {
tempOutput = new LocalTsFileOutput(new FileOutputStream(chunkMetadataTempFile));
}
hasChunkMetadataInDisk = true;

// This list is the same as sortedChunkMetadataList, but Path is replaced by Pair<IDeviceID,
// String>
final List<Pair<Pair<IDeviceID, String>, List<IChunkMetadata>>>
sortedChunkMetadataListForCallBack = new ArrayList<>();

for (Pair<Path, List<IChunkMetadata>> pair : sortedChunkMetadataList) {
Path seriesPath = pair.left;
boolean isNewPath = !seriesPath.equals(lastSerializePath);
final Path seriesPath = pair.left;
final boolean isNewPath = !seriesPath.equals(lastSerializePath);
if (isNewPath) {
// record the count of path to construct bloom filter later
pathCount++;
}
List<IChunkMetadata> iChunkMetadataList = pair.right;
final List<IChunkMetadata> iChunkMetadataList = pair.right;
writtenSize += writeChunkMetadataToTempFile(iChunkMetadataList, seriesPath, isNewPath);
lastSerializePath = seriesPath;
sortedChunkMetadataListForCallBack.add(
new Pair<>(
new Pair<>(seriesPath.getIDeviceID(), seriesPath.getMeasurement()),
iChunkMetadataList));
logger.debug("Flushing {}", seriesPath);
}

// notify the listeners
for (final FlushChunkMetadataListener listener : flushListeners) {
listener.onFlush(sortedChunkMetadataListForCallBack);
}

// clear the cache metadata to release the memory
chunkGroupMetadataList.clear();
if (chunkMetadataList != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;

public class TsFileIOWriterMemoryControlTest {
private static File testFile = new File("target", "1-1-0-0.tsfile");
Expand Down Expand Up @@ -249,6 +250,48 @@ public void testSerializeAndDeserializeMixedChunkMetadata() throws IOException {
}
}

/**
 * Verifies that every registered {@code FlushChunkMetadataListener} is invoked exactly once per
 * call to {@code sortAndFlushChunkMetadata()}, and that the metadata flushed to the temp file
 * still matches what was written.
 */
@Test
public void testFlushChunkMetadataListener() throws IOException {
  try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024 * 1024 * 10)) {
    // Each listener simply counts its callbacks.
    final AtomicInteger cnt1 = new AtomicInteger(0);
    final AtomicInteger cnt2 = new AtomicInteger(0);
    writer.addFlushListener(sortedChunkMetadataList -> cnt1.incrementAndGet());
    writer.addFlushListener(sortedChunkMetadataList -> cnt2.incrementAndGet());
    List<ChunkMetadata> originChunkMetadataList = new ArrayList<>();
    // Write 10 chunk groups, each containing 5 chunks of distinct data types.
    for (int i = 0; i < 10; ++i) {
      IDeviceID deviceId = sortedDeviceId.get(i);
      writer.startChunkGroup(deviceId);
      generateIntData(0, 0L, new ArrayList<>()).writeToFileWriter(writer);
      generateBooleanData(1, 0L, new ArrayList<>()).writeToFileWriter(writer);
      generateFloatData(2, 0L, new ArrayList<>()).writeToFileWriter(writer);
      generateDoubleData(3, 0L, new ArrayList<>()).writeToFileWriter(writer);
      generateTextData(4, 0L, new ArrayList<>()).writeToFileWriter(writer);
      // Snapshot the metadata before endChunkGroup() moves/clears it.
      originChunkMetadataList.addAll(writer.chunkMetadataList);
      writer.endChunkGroup();
    }
    // A single flush: both listeners must fire exactly once.
    writer.sortAndFlushChunkMetadata();
    writer.tempOutput.flush();

    // Re-read the flushed metadata from the temp file and compare against the snapshot.
    TSMIterator iterator =
        TSMIterator.getTSMIteratorInDisk(
            writer.chunkMetadataTempFile,
            writer.chunkGroupMetadataList,
            writer.endPosInCMTForDevice);
    for (int i = 0; iterator.hasNext(); ++i) {
      Pair<Path, TimeseriesMetadata> timeseriesMetadataPair = iterator.next();
      TimeseriesMetadata timeseriesMetadata = timeseriesMetadataPair.right;
      // i % 5 cycles through the 5 measurements written per device, in series-sorted order.
      Assert.assertEquals(sortedSeriesId.get(i % 5), timeseriesMetadata.getMeasurementId());
      Assert.assertEquals(
          originChunkMetadataList.get(i).getDataType(), timeseriesMetadata.getTsDataType());
      Assert.assertEquals(
          originChunkMetadataList.get(i).getStatistics(), timeseriesMetadata.getStatistics());
    }
    Assert.assertEquals(1, cnt1.get());
    Assert.assertEquals(1, cnt2.get());
  }
}

/** The following tests is for writing normal series in different nums. */

/**
Expand Down

0 comments on commit 292caee

Please sign in to comment.