Skip to content

Commit

Permalink
Implement schema quota for activation in table model
Browse files Browse the repository at this point in the history
  • Loading branch information
Caideyipi authored Nov 18, 2024
1 parent f9f2385 commit 5fd2c9c
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1139,15 +1139,16 @@ public Pair<Long, Long> getSchemaQuotaRemain() {
}
}

public void updateTimeSeriesUsage(Map<Integer, Long> seriesUsage) {
public void updateTimeSeriesUsage(final Map<Integer, Long> seriesUsage) {
schemaQuotaStatistics.updateTimeSeriesUsage(seriesUsage);
}

public void updateDeviceUsage(Map<Integer, Long> deviceUsage) {
public void updateDeviceUsage(final Map<Integer, Long> deviceUsage) {
schemaQuotaStatistics.updateDeviceUsage(deviceUsage);
}

public void updateSchemaQuotaConfiguration(long seriesThreshold, long deviceThreshold) {
public void updateSchemaQuotaConfiguration(
final long seriesThreshold, final long deviceThreshold) {
schemaQuotaStatistics.setDeviceThreshold(deviceThreshold);
schemaQuotaStatistics.setSeriesThreshold(seriesThreshold);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.schema.table.TsTable;
import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
Expand All @@ -43,6 +45,7 @@
import org.apache.iotdb.db.schemaengine.schemaregion.ISchemaRegionParams;
import org.apache.iotdb.db.schemaengine.schemaregion.SchemaRegionLoader;
import org.apache.iotdb.db.schemaengine.schemaregion.SchemaRegionParams;
import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache;
import org.apache.iotdb.db.schemaengine.template.ClusterTemplateManager;
import org.apache.iotdb.mpp.rpc.thrift.TDataNodeHeartbeatReq;
import org.apache.iotdb.mpp.rpc.thrift.TDataNodeHeartbeatResp;
Expand All @@ -56,6 +59,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -359,8 +363,8 @@ public int getSchemaRegionNumber() {
return schemaRegionMap == null ? 0 : schemaRegionMap.size();
}

public Map<Integer, Long> countDeviceNumBySchemaRegion(List<Integer> schemaIds) {
Map<Integer, Long> deviceNum = new HashMap<>();
public Map<Integer, Long> countDeviceNumBySchemaRegion(final List<Integer> schemaIds) {
final Map<Integer, Long> deviceNum = new HashMap<>();

schemaRegionMap.entrySet().stream()
.filter(
Expand All @@ -375,8 +379,8 @@ public Map<Integer, Long> countDeviceNumBySchemaRegion(List<Integer> schemaIds)
return deviceNum;
}

public Map<Integer, Long> countTimeSeriesNumBySchemaRegion(List<Integer> schemaIds) {
Map<Integer, Long> timeSeriesNum = new HashMap<>();
public Map<Integer, Long> countTimeSeriesNumBySchemaRegion(final List<Integer> schemaIds) {
final Map<Integer, Long> timeSeriesNum = new HashMap<>();
schemaRegionMap.entrySet().stream()
.filter(
entry ->
Expand All @@ -386,7 +390,26 @@ public Map<Integer, Long> countTimeSeriesNumBySchemaRegion(List<Integer> schemaI
entry ->
timeSeriesNum.put(
entry.getKey().getId(),
entry.getValue().getSchemaRegionStatistics().getSeriesNumber(false)));
entry.getValue().getSchemaRegionStatistics().getSeriesNumber(false)
+ entry
.getValue()
.getSchemaRegionStatistics()
.getTable2DevicesNumMap()
.entrySet()
.stream()
.map(
tableEntry -> {
final TsTable table =
DataNodeTableCache.getInstance()
.getTable(
PathUtils.unQualifyDatabaseName(
entry.getValue().getDatabaseFullPath()),
tableEntry.getKey());
return Objects.nonNull(table)
? table.getMeasurementNum() * tableEntry.getValue()
: 0;
})
.reduce(0L, Long::sum)));
return timeSeriesNum;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.apache.iotdb.db.schemaengine.rescon;

import java.util.Map;

public interface ISchemaRegionStatistics {

boolean isAllowToCreateNewSeries();
Expand All @@ -27,10 +29,12 @@ public interface ISchemaRegionStatistics {

int getSchemaRegionId();

long getSeriesNumber(boolean includeView);
long getSeriesNumber(final boolean includeView);

long getDevicesNumber();

Map<String, Long> getTable2DevicesNumMap();

long getTableDevicesNumber(final String table);

int getTemplateActivatedNumber();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.iotdb.db.schemaengine.template.ClusterTemplateManager;
import org.apache.iotdb.db.schemaengine.template.Template;

import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
Expand Down Expand Up @@ -97,6 +98,11 @@ public long getDevicesNumber() {
return devicesNumber.get();
}

@Override
public Map<String, Long> getTable2DevicesNumMap() {
return tableDeviceNumber;
}

@Override
public long getTableDevicesNumber(final String table) {
final Long deviceNumber = tableDeviceNumber.get(table);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public class TsTable {
private Map<String, String> props = null;

private transient int idNums = 0;
private transient int measurementNum = 0;

public TsTable(final String tableName) {
this.tableName = tableName;
Expand Down Expand Up @@ -112,6 +113,8 @@ public void addColumnSchema(final TsTableColumnSchema columnSchema) {
if (columnSchema.getColumnCategory().equals(TsTableColumnCategory.ID)) {
idNums++;
idColumnIndexMap.put(columnSchema.getColumnName(), idNums - 1);
} else if (columnSchema.getColumnCategory().equals(TsTableColumnCategory.MEASUREMENT)) {
measurementNum++;
}
} finally {
readWriteLock.writeLock().unlock();
Expand Down Expand Up @@ -140,6 +143,9 @@ public void removeColumnSchema(final String columnName) {
throw new SchemaExecutionException("Cannot remove an id column: " + columnName);
} else if (columnSchema != null) {
columnSchemaMap.remove(columnName);
if (columnSchema.getColumnCategory().equals(TsTableColumnCategory.MEASUREMENT)) {
measurementNum--;
}
}
} finally {
readWriteLock.writeLock().unlock();
Expand All @@ -164,6 +170,15 @@ public int getIdNums() {
}
}

public int getMeasurementNum() {
readWriteLock.readLock().lock();
try {
return measurementNum;
} finally {
readWriteLock.readLock().unlock();
}
}

public List<TsTableColumnSchema> getColumnList() {
readWriteLock.readLock().lock();
try {
Expand Down

0 comments on commit 5fd2c9c

Please sign in to comment.