Skip to content

Commit

Permalink
Implement last cache for table model & Fixed the bug that the table c…
Browse files Browse the repository at this point in the history
…onflict detection when creating timeseries in tree model does not take effect
  • Loading branch information
Caideyipi authored Oct 15, 2024
1 parent f401e9a commit e022f22
Show file tree
Hide file tree
Showing 55 changed files with 2,823 additions and 2,700 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public class TestConstant {
public static boolean testFlag = true;
public static String[] stringValue = new String[] {"A", "B", "C", "D", "E"};
public static String[] booleanValue = new String[] {"true", "false"};
public static final String TIMESEIRES_STR = ColumnHeaderConstant.TIMESERIES;
public static final String TIMESERIES_STR = ColumnHeaderConstant.TIMESERIES;
public static final String VALUE_STR = ColumnHeaderConstant.VALUE;
public static final String DATA_TYPE_STR = ColumnHeaderConstant.DATATYPE;
public static final String FUNCTION_TYPE_NATIVE = "native";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
import java.util.Set;

import static org.apache.iotdb.itbase.constant.TestConstant.DATA_TYPE_STR;
import static org.apache.iotdb.itbase.constant.TestConstant.TIMESEIRES_STR;
import static org.apache.iotdb.itbase.constant.TestConstant.TIMESERIES_STR;
import static org.apache.iotdb.itbase.constant.TestConstant.TIMESTAMP_STR;
import static org.apache.iotdb.itbase.constant.TestConstant.VALUE_STR;
import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -136,7 +136,7 @@ public void selectAllAlignedAndNonAlignedLastTest() {
String ans =
resultSet.getString(TIMESTAMP_STR)
+ ","
+ resultSet.getString(TIMESEIRES_STR)
+ resultSet.getString(TIMESERIES_STR)
+ ","
+ resultSet.getString(VALUE_STR)
+ ","
Expand Down Expand Up @@ -171,7 +171,7 @@ public void selectAllAlignedLastWithTimeFilterTest() {
String ans =
resultSet.getString(TIMESTAMP_STR)
+ ","
+ resultSet.getString(TIMESEIRES_STR)
+ resultSet.getString(TIMESERIES_STR)
+ ","
+ resultSet.getString(VALUE_STR)
+ ","
Expand Down Expand Up @@ -208,7 +208,7 @@ public void selectSomeAlignedLastTest1() {
String ans =
resultSet.getString(TIMESTAMP_STR)
+ ","
+ resultSet.getString(TIMESEIRES_STR)
+ resultSet.getString(TIMESERIES_STR)
+ ","
+ resultSet.getString(VALUE_STR)
+ ","
Expand Down Expand Up @@ -241,7 +241,7 @@ public void selectSomeAlignedLastTest2() {
String ans =
resultSet.getString(TIMESTAMP_STR)
+ ","
+ resultSet.getString(TIMESEIRES_STR)
+ resultSet.getString(TIMESERIES_STR)
+ ","
+ resultSet.getString(VALUE_STR)
+ ","
Expand Down Expand Up @@ -275,7 +275,7 @@ public void selectSomeAlignedLastWithTimeFilterTest() {
String ans =
resultSet.getString(TIMESTAMP_STR)
+ ","
+ resultSet.getString(TIMESEIRES_STR)
+ resultSet.getString(TIMESERIES_STR)
+ ","
+ resultSet.getString(VALUE_STR)
+ ","
Expand Down Expand Up @@ -313,7 +313,7 @@ public void selectSomeAlignedAndNonAlignedLastWithTimeFilterTest() {
String ans =
resultSet.getString(TIMESTAMP_STR)
+ ","
+ resultSet.getString(TIMESEIRES_STR)
+ resultSet.getString(TIMESERIES_STR)
+ ","
+ resultSet.getString(VALUE_STR)
+ ","
Expand Down Expand Up @@ -397,13 +397,14 @@ public void testNullInMemtable() {
Statement statement = connection.createStatement()) {

try (ResultSet resultSet =
statement.executeQuery("select last gongnengma,wenben from root.ln_1.tb_6141;")) {
statement.executeQuery(
"select last gongnengma,wenben from root.ln_1.tb_6141 order by timeseries asc;")) {
int cnt = 0;
while (resultSet.next()) {
String ans =
resultSet.getString(TIMESTAMP_STR)
+ ","
+ resultSet.getString(TIMESEIRES_STR)
+ resultSet.getString(TIMESERIES_STR)
+ ","
+ resultSet.getString(VALUE_STR)
+ ","
Expand Down Expand Up @@ -431,13 +432,13 @@ public void testNullInMemtable() {

try (ResultSet resultSet =
statement.executeQuery(
"select last gongnengma,mochanshuizhuangtai,wenben from root.ln_1.tb_6141;")) {
"select last gongnengma,mochanshuizhuangtai,wenben from root.ln_1.tb_6141 order by timeseries asc;")) {
int cnt = 0;
while (resultSet.next()) {
String ans =
resultSet.getString(TIMESTAMP_STR)
+ ","
+ resultSet.getString(TIMESEIRES_STR)
+ resultSet.getString(TIMESERIES_STR)
+ ","
+ resultSet.getString(VALUE_STR)
+ ","
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import java.util.Set;

import static org.apache.iotdb.itbase.constant.TestConstant.DATA_TYPE_STR;
import static org.apache.iotdb.itbase.constant.TestConstant.TIMESEIRES_STR;
import static org.apache.iotdb.itbase.constant.TestConstant.TIMESERIES_STR;
import static org.apache.iotdb.itbase.constant.TestConstant.TIMESTAMP_STR;
import static org.apache.iotdb.itbase.constant.TestConstant.VALUE_STR;
import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -96,7 +96,7 @@ public void selectAllAlignedLastTest() {
String ans =
resultSet.getString(TIMESTAMP_STR)
+ ","
+ resultSet.getString(TIMESEIRES_STR)
+ resultSet.getString(TIMESERIES_STR)
+ ","
+ resultSet.getString(VALUE_STR)
+ ","
Expand Down Expand Up @@ -137,7 +137,7 @@ public void selectAllAlignedAndNonAlignedLastTest() {
String ans =
resultSet.getString(TIMESTAMP_STR)
+ ","
+ resultSet.getString(TIMESEIRES_STR)
+ resultSet.getString(TIMESERIES_STR)
+ ","
+ resultSet.getString(VALUE_STR)
+ ","
Expand Down Expand Up @@ -169,7 +169,7 @@ public void selectAllAlignedLastWithTimeFilterTest() {
String ans =
resultSet.getString(TIMESTAMP_STR)
+ ","
+ resultSet.getString(TIMESEIRES_STR)
+ resultSet.getString(TIMESERIES_STR)
+ ","
+ resultSet.getString(VALUE_STR)
+ ","
Expand Down Expand Up @@ -203,7 +203,7 @@ public void selectSomeAlignedLastTest1() {
String ans =
resultSet.getString(TIMESTAMP_STR)
+ ","
+ resultSet.getString(TIMESEIRES_STR)
+ resultSet.getString(TIMESERIES_STR)
+ ","
+ resultSet.getString(VALUE_STR)
+ ","
Expand Down Expand Up @@ -234,7 +234,7 @@ public void selectSomeAlignedLastTest2() {
String ans =
resultSet.getString(TIMESTAMP_STR)
+ ","
+ resultSet.getString(TIMESEIRES_STR)
+ resultSet.getString(TIMESERIES_STR)
+ ","
+ resultSet.getString(VALUE_STR)
+ ","
Expand Down Expand Up @@ -267,7 +267,7 @@ public void selectSomeAlignedLastWithTimeFilterTest() {
String ans =
resultSet.getString(TIMESTAMP_STR)
+ ","
+ resultSet.getString(TIMESEIRES_STR)
+ resultSet.getString(TIMESERIES_STR)
+ ","
+ resultSet.getString(VALUE_STR)
+ ","
Expand Down Expand Up @@ -303,7 +303,7 @@ public void selectSomeAlignedAndNonAlignedLastWithTimeFilterTest() {
String ans =
resultSet.getString(TIMESTAMP_STR)
+ ","
+ resultSet.getString(TIMESEIRES_STR)
+ resultSet.getString(TIMESERIES_STR)
+ ","
+ resultSet.getString(VALUE_STR)
+ ","
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import java.util.Set;

import static org.apache.iotdb.itbase.constant.TestConstant.DATA_TYPE_STR;
import static org.apache.iotdb.itbase.constant.TestConstant.TIMESEIRES_STR;
import static org.apache.iotdb.itbase.constant.TestConstant.TIMESERIES_STR;
import static org.apache.iotdb.itbase.constant.TestConstant.TIMESTAMP_STR;
import static org.apache.iotdb.itbase.constant.TestConstant.VALUE_STR;
import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -90,7 +90,7 @@ public void selectAllAlignedLastTest() {
String ans =
resultSet.getString(TIMESTAMP_STR)
+ ","
+ resultSet.getString(TIMESEIRES_STR)
+ resultSet.getString(TIMESERIES_STR)
+ ","
+ resultSet.getString(VALUE_STR)
+ ","
Expand Down Expand Up @@ -134,7 +134,7 @@ public void selectAllAlignedAndNonAlignedLastTest() {
String ans =
resultSet.getString(TIMESTAMP_STR)
+ ","
+ resultSet.getString(TIMESEIRES_STR)
+ resultSet.getString(TIMESERIES_STR)
+ ","
+ resultSet.getString(VALUE_STR)
+ ","
Expand Down Expand Up @@ -169,7 +169,7 @@ public void selectAllAlignedLastWithTimeFilterTest() {
String ans =
resultSet.getString(TIMESTAMP_STR)
+ ","
+ resultSet.getString(TIMESEIRES_STR)
+ resultSet.getString(TIMESERIES_STR)
+ ","
+ resultSet.getString(VALUE_STR)
+ ","
Expand Down Expand Up @@ -206,7 +206,7 @@ public void selectSomeAlignedLastTest1() {
String ans =
resultSet.getString(TIMESTAMP_STR)
+ ","
+ resultSet.getString(TIMESEIRES_STR)
+ resultSet.getString(TIMESERIES_STR)
+ ","
+ resultSet.getString(VALUE_STR)
+ ","
Expand Down Expand Up @@ -239,7 +239,7 @@ public void selectSomeAlignedLastTest2() {
String ans =
resultSet.getString(TIMESTAMP_STR)
+ ","
+ resultSet.getString(TIMESEIRES_STR)
+ resultSet.getString(TIMESERIES_STR)
+ ","
+ resultSet.getString(VALUE_STR)
+ ","
Expand Down Expand Up @@ -273,7 +273,7 @@ public void selectSomeAlignedLastWithTimeFilterTest() {
String ans =
resultSet.getString(TIMESTAMP_STR)
+ ","
+ resultSet.getString(TIMESEIRES_STR)
+ resultSet.getString(TIMESERIES_STR)
+ ","
+ resultSet.getString(VALUE_STR)
+ ","
Expand Down Expand Up @@ -310,7 +310,7 @@ public void selectSomeAlignedAndNonAlignedLastWithTimeFilterTest() {
String ans =
resultSet.getString(TIMESTAMP_STR)
+ ","
+ resultSet.getString(TIMESEIRES_STR)
+ resultSet.getString(TIMESERIES_STR)
+ ","
+ resultSet.getString(VALUE_STR)
+ ","
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import static org.apache.iotdb.db.it.utils.TestUtils.prepareData;
import static org.apache.iotdb.db.it.utils.TestUtils.resultSetEqualTest;
import static org.apache.iotdb.itbase.constant.TestConstant.DATA_TYPE_STR;
import static org.apache.iotdb.itbase.constant.TestConstant.TIMESEIRES_STR;
import static org.apache.iotdb.itbase.constant.TestConstant.TIMESERIES_STR;
import static org.apache.iotdb.itbase.constant.TestConstant.TIMESTAMP_STR;
import static org.apache.iotdb.itbase.constant.TestConstant.VALUE_STR;
import static org.junit.Assert.fail;
Expand Down Expand Up @@ -102,7 +102,7 @@ public static void tearDown() throws Exception {
@Test
public void testLastQuery() {
String[] expectedHeader =
new String[] {TIMESTAMP_STR, TIMESEIRES_STR, VALUE_STR, DATA_TYPE_STR};
new String[] {TIMESTAMP_STR, TIMESERIES_STR, VALUE_STR, DATA_TYPE_STR};
String[] retArray =
new String[] {
"1679365910000,root.ln_1.tb_6141.11_TEXT,13,TEXT,",
Expand All @@ -126,7 +126,7 @@ public void testLastQuery() {
@Test
public void testLastQueryOrderByTimeDesc() {
String[] expectedHeader =
new String[] {TIMESTAMP_STR, TIMESEIRES_STR, VALUE_STR, DATA_TYPE_STR};
new String[] {TIMESTAMP_STR, TIMESERIES_STR, VALUE_STR, DATA_TYPE_STR};
String[] retArray =
new String[] {
"1679365910000,root.ln_1.tb_6141.waterTP_DOUBLE,15.0,DOUBLE,",
Expand All @@ -150,7 +150,7 @@ public void testLastQueryOrderByTimeDesc() {
@Test
public void testLastQuery1() {
String[] expectedHeader =
new String[] {TIMESTAMP_STR, TIMESEIRES_STR, VALUE_STR, DATA_TYPE_STR};
new String[] {TIMESTAMP_STR, TIMESERIES_STR, VALUE_STR, DATA_TYPE_STR};
String[] retArray =
new String[] {
"1679365910000,root.sg.`NH4-N_DOUBLE`,12.0,DOUBLE,",
Expand All @@ -169,7 +169,7 @@ public void cacheHitTest() {
@Test
public void testLastQuerySortWithLimit() {
String[] expectedHeader =
new String[] {TIMESTAMP_STR, TIMESEIRES_STR, VALUE_STR, DATA_TYPE_STR};
new String[] {TIMESTAMP_STR, TIMESERIES_STR, VALUE_STR, DATA_TYPE_STR};
String[] retArray =
new String[] {
"1679477545000,root.ln_1.tb_6141.code_DOUBLE,2.0,DOUBLE,",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@
import org.apache.iotdb.db.queryengine.plan.Coordinator;
import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeSchemaCache;
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher;
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher;
import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult;
Expand All @@ -90,6 +89,7 @@
import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation;
import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TreeDeviceSchemaCacheManager;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Use;
import org.apache.iotdb.db.queryengine.plan.relational.sql.parser.ParsingException;
import org.apache.iotdb.db.queryengine.plan.relational.sql.parser.SqlParser;
Expand Down Expand Up @@ -250,7 +250,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {

private final TsBlockSerde serde = new TsBlockSerde();

private final DataNodeSchemaCache DATA_NODE_SCHEMA_CACHE = DataNodeSchemaCache.getInstance();
private final TreeDeviceSchemaCacheManager DATA_NODE_SCHEMA_CACHE =
TreeDeviceSchemaCacheManager.getInstance();

public static Duration DEFAULT_TIME_SLICE = new Duration(60_000, TimeUnit.MILLISECONDS);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@
import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeDevicePathCache;
import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeSchemaCache;
import org.apache.iotdb.db.queryengine.plan.analyze.lock.DataNodeSchemaLockManager;
import org.apache.iotdb.db.queryengine.plan.analyze.lock.SchemaLockType;
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher;
Expand Down Expand Up @@ -137,7 +136,8 @@
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedDeleteDataNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedNonWritePlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.TableDeviceSchemaFetcher;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableDeviceSchemaCache;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TreeDeviceSchemaCacheManager;
import org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileScheduler;
import org.apache.iotdb.db.queryengine.plan.statement.component.WhereCondition;
import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
Expand Down Expand Up @@ -513,19 +513,18 @@ public TSStatus invalidatePartitionCache(TInvalidateCacheReq req) {
@Override
public TSStatus invalidateSchemaCache(TInvalidateCacheReq req) {
DataNodeSchemaLockManager.getInstance().takeWriteLock(SchemaLockType.VALIDATE_VS_DELETION);
DataNodeSchemaCache.getInstance().takeWriteLock();
TreeDeviceSchemaCacheManager.getInstance().takeWriteLock();
try {
// req.getFullPath() is a database path
DataNodeSchemaCache.getInstance().invalidate(req.getFullPath());
ClusterTemplateManager.getInstance().invalid(req.getFullPath());
// clear table related cache
String database = req.getFullPath().substring(5);
DataNodeTableCache.getInstance().invalid(database);
TableDeviceSchemaFetcher.getInstance().getTableDeviceCache().invalidate(database);
TableDeviceSchemaCache.getInstance().invalidate(database);
LOGGER.info("Schema cache of {} has been invalidated", req.getFullPath());
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
} finally {
DataNodeSchemaCache.getInstance().releaseWriteLock();
TreeDeviceSchemaCacheManager.getInstance().releaseWriteLock();
DataNodeSchemaLockManager.getInstance().releaseWriteLock(SchemaLockType.VALIDATE_VS_DELETION);
}
}
Expand Down Expand Up @@ -592,7 +591,7 @@ public TSStatus rollbackSchemaBlackList(TRollbackSchemaBlackListReq req) {

@Override
public TSStatus invalidateMatchedSchemaCache(TInvalidateMatchedSchemaCacheReq req) {
DataNodeSchemaCache cache = DataNodeSchemaCache.getInstance();
TreeDeviceSchemaCacheManager cache = TreeDeviceSchemaCacheManager.getInstance();
DataNodeSchemaLockManager.getInstance().takeWriteLock(SchemaLockType.VALIDATE_VS_DELETION);
cache.takeWriteLock();
try {
Expand Down Expand Up @@ -2409,11 +2408,11 @@ public TSStatus cleanDataNodeCache(TCleanDataNodeCacheReq req) {
status.setMessage("disable datanode succeed");
// TODO what need to clean?
ClusterPartitionFetcher.getInstance().invalidAllCache();
DataNodeSchemaCache.getInstance().takeWriteLock();
TreeDeviceSchemaCacheManager.getInstance().takeWriteLock();
try {
DataNodeSchemaCache.getInstance().cleanUp();
TreeDeviceSchemaCacheManager.getInstance().cleanUp();
} finally {
DataNodeSchemaCache.getInstance().releaseWriteLock();
TreeDeviceSchemaCacheManager.getInstance().releaseWriteLock();
}
DataNodeDevicePathCache.getInstance().cleanUp();
return status;
Expand Down
Loading

0 comments on commit e022f22

Please sign in to comment.