Skip to content

Commit

Permalink
add it
Browse files Browse the repository at this point in the history
  • Loading branch information
shuwenwei committed Jan 10, 2025
1 parent 83e57fb commit 98e86c3
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,27 @@
* under the License.
*/

package org.apache.iotdb.relational.it.db.it;
package org.apache.iotdb.db.it;

import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.db.exception.DataRegionException;
import org.apache.iotdb.db.storageengine.dataregion.modification.DeletionPredicate;
import org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEntry;
import org.apache.iotdb.db.storageengine.dataregion.modification.TreeDeletionEntry;
import org.apache.iotdb.db.storageengine.dataregion.modification.v1.Deletion;
import org.apache.iotdb.db.storageengine.dataregion.modification.v1.ModificationFileV1;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
import org.apache.iotdb.db.storageengine.dataregion.wal.recover.file.SealedTsFileRecoverPerformer;
import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.ClusterIT;
import org.apache.iotdb.itbase.category.LocalStandaloneIT;
import org.apache.iotdb.rpc.IoTDBConnectionException;

import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.exception.write.WriteProcessException;
import org.apache.tsfile.file.metadata.TableSchema;
import org.apache.tsfile.read.common.TimeRange;
import org.apache.tsfile.write.TsFileWriter;
import org.apache.tsfile.write.record.Tablet;
import org.apache.tsfile.write.schema.MeasurementSchema;
import org.apache.tsfile.write.v4.ITsFileWriter;
import org.apache.tsfile.write.v4.TsFileWriterBuilder;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
Expand All @@ -53,7 +52,6 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.Collections;
import java.util.Objects;

Expand All @@ -80,24 +78,40 @@ public void tearDown() throws Exception {
}
}

private void generateFile() throws IOException, WriteProcessException, DataRegionException {
private void generateFileWithNewModFile()
throws IOException, WriteProcessException, IllegalPathException, DataRegionException {
TsFileResource resource = generateFile();
// write mods file
resource
.getExclusiveModFile()
.write(new TreeDeletionEntry(new MeasurementPath("root.test.d1.s1"), 1, 2));
resource.getExclusiveModFile().close();
}

private void generateFileWithOldModFile()
throws IOException, DataRegionException, WriteProcessException, IllegalPathException {
TsFileResource resource = generateFile();
ModificationFileV1 oldModFile = ModificationFileV1.getNormalMods(resource);
oldModFile.write(new Deletion(new MeasurementPath("root.test.d1.s1"), Long.MAX_VALUE, 1, 2));
oldModFile.close();
}

private TsFileResource generateFile()
throws WriteProcessException, IOException, DataRegionException {
File tsfile = new File(tmpDir, "1-1-0-0.tsfile");
TableSchema tableSchema =
new TableSchema(
"t1",
Arrays.asList(new MeasurementSchema("s1", TSDataType.BOOLEAN)),
Arrays.asList(Tablet.ColumnCategory.FIELD));
// generate tsfile
try (ITsFileWriter writer =
new TsFileWriterBuilder().file(tsfile).tableSchema(tableSchema).build()) {
try (TsFileWriter writer = new TsFileWriter(tsfile)) {
writer.registerAlignedTimeseries(
"root.test.d1",
Collections.singletonList(new MeasurementSchema("s1", TSDataType.BOOLEAN)));
Tablet tablet =
new Tablet(
Collections.singletonList("s1"), Collections.singletonList(TSDataType.BOOLEAN));
"root.test.d1",
Collections.singletonList(new MeasurementSchema("s1", TSDataType.BOOLEAN)));
for (int i = 0; i < 5; i++) {
tablet.addTimestamp(i, i);
tablet.addValue(i, 0, true);
}
writer.write(tablet);
writer.writeTree(tablet);
}
// generate resource file
TsFileResource resource = new TsFileResource(tsfile);
Expand All @@ -106,33 +120,47 @@ private void generateFile() throws IOException, WriteProcessException, DataRegio
}
resource.setStatusForTest(TsFileResourceStatus.NORMAL);
resource.deserialize();
// write mods file
resource
.getExclusiveModFile()
.write(new TableDeletionEntry(new DeletionPredicate("t1"), new TimeRange(1, 2)));
resource.getExclusiveModFile().close();
return resource;
}

@Test
public void test()
throws IoTDBConnectionException,
SQLException,
public void testWithNewModFile()
throws SQLException,
IOException,
DataRegionException,
WriteProcessException {
generateFile();
try (final Connection connection = EnvFactory.getEnv().getTableConnection();
WriteProcessException,
IllegalPathException {
generateFileWithNewModFile();
try (final Connection connection = EnvFactory.getEnv().getConnection();
final Statement statement = connection.createStatement()) {

statement.execute(
String.format("load \'%s\' with ('database-name'='db1')", tmpDir.getAbsolutePath()));
statement.execute(String.format("load \'%s\'", tmpDir.getAbsolutePath()));

statement.execute("use db1");
try (final ResultSet resultSet = statement.executeQuery("select s1 from t1")) {
Assert.assertTrue(resultSet.next());
try (final ResultSet resultSet =
statement.executeQuery("select count(s1) as c from root.test.d1")) {
Assert.assertTrue(resultSet.next());
Assert.assertEquals(3, resultSet.getLong("c"));
}
}
}

@Test
public void testWithOldModFile()
throws SQLException,
IOException,
DataRegionException,
WriteProcessException,
IllegalPathException {
generateFileWithOldModFile();
try (final Connection connection = EnvFactory.getEnv().getConnection();
final Statement statement = connection.createStatement()) {

statement.execute(String.format("load \'%s\'", tmpDir.getAbsolutePath()));

try (final ResultSet resultSet =
statement.executeQuery("select count(s1) as c from root.test.d1")) {
Assert.assertTrue(resultSet.next());
Assert.assertFalse(resultSet.next());
Assert.assertEquals(3, resultSet.getLong("c"));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3114,10 +3114,19 @@ private void loadModFile(
final File newTargetModFile = ModificationFile.getExclusiveMods(targetTsFile);
final File oldModFile = ModificationFileV1.getNormalMods(tsFileToLoad);
moveModFile(newModFileToLoad, newTargetModFile, deleteOriginFile);
ModificationFile resourceExclusiveModFile = tsFileResource.getExclusiveModFile();
if (resourceExclusiveModFile.getFile().getParentFile().equals(targetTsFile.getParentFile())) {
// the mod file is correct
return;
}
// the mod file refer to the source file of LOAD
try {
if (oldModFile.exists()) {
// remove the temporary v2 mod file in load dir
tsFileResource.removeModFile();
} else {
resourceExclusiveModFile.removeFromFileMetrics();
resourceExclusiveModFile.close();
}
tsFileResource.setExclusiveModFile(ModificationFile.getExclusiveMods(tsFileResource));
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,12 +273,19 @@ public boolean exists() {

public void remove() throws IOException {
close();
removeFromFileMetrics();
FileUtils.deleteFileOrDirectory(file);
FileMetrics.getInstance().decreaseModFileNum(1);
FileMetrics.getInstance().decreaseModFileSize(getFileLength());
fileExists = false;
}

// the modification may be source file of LOAD
public void removeFromFileMetrics() {
if (fileExists) {
FileMetrics.getInstance().decreaseModFileNum(1);
FileMetrics.getInstance().decreaseModFileSize(getFileLength());
}
}

public static ModificationFile getExclusiveMods(TsFileResource tsFileResource) {
String tsFilePath = tsFileResource.getTsFilePath();
// replace the temp suffix with the final name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1461,7 +1461,7 @@ public void upgradeModFile(ExecutorService upgradeModFileThreadPool, boolean rem

if (upgradeModFileThreadPool != null) {
exclusiveModFileFuture =
upgradeModFileThreadPool.submit(() -> doUpgradeModFile(oldModFile, remove()));
upgradeModFileThreadPool.submit(() -> doUpgradeModFile(oldModFile, removeOldModFile));
} else {
exclusiveModFileFuture =
CompletableFuture.completedFuture(doUpgradeModFile(oldModFile, removeOldModFile));
Expand Down

0 comments on commit 98e86c3

Please sign in to comment.