Skip to content

Commit

Permalink
Add IMeasurementSchema list conversion methods (#303)
Browse files Browse the repository at this point in the history
* add IMeasurementSchema list conversion methods

* modify usage

* modify TsFileWriter

* modify exception

* copy schema in Tablet constructor

* make Tablet.rowSize access to private
  • Loading branch information
shuwenwei authored Nov 19, 2024
1 parent 92d39d5 commit d30d283
Show file tree
Hide file tree
Showing 16 changed files with 94 additions and 87 deletions.
11 changes: 3 additions & 8 deletions java/tools/src/main/java/org/apache/tsfile/tools/TsFileTool.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

public class TsFileTool {
private static int THREAD_COUNT = 8;
Expand Down Expand Up @@ -194,12 +193,8 @@ private static Tablet genTablet(
Tablet tablet =
new Tablet(
tableSchema.getTableName(),
tableSchema.getColumnSchemas().stream()
.map(IMeasurementSchema::getMeasurementName)
.collect(Collectors.toList()),
tableSchema.getColumnSchemas().stream()
.map(IMeasurementSchema::getType)
.collect(Collectors.toList()),
IMeasurementSchema.getMeasurementNameList(tableSchema.getColumnSchemas()),
IMeasurementSchema.getDataTypeList(tableSchema.getColumnSchemas()),
tableSchema.getColumnTypes(),
num);

Expand Down Expand Up @@ -237,7 +232,7 @@ private static Tablet genTablet(
}
}
}
tablet.rowSize = num;
tablet.setRowSize(num);
return tablet;
} catch (Exception e) {
LOGGER.error("Failed to parse csv file", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,14 +122,14 @@ public static void writeWithTablet(
long sensorNum = schemas.size();

for (long r = 0; r < rowNum; r++, startValue++) {
int row = tablet.rowSize++;
timestamps[row] = startTime++;
int row = tablet.getRowSize();
tablet.addTimestamp(row, startTime++);
for (int i = 0; i < sensorNum; i++) {
long[] sensor = (long[]) values[i];
sensor[row] = startValue;
}
// write
if (tablet.rowSize == tablet.getMaxRowNumber()) {
if (tablet.getRowSize() == tablet.getMaxRowNumber()) {
if (isAligned) {
tsFileWriter.writeAligned(tablet);
} else {
Expand All @@ -139,7 +139,7 @@ public static void writeWithTablet(
}
}
// write
if (tablet.rowSize != 0) {
if (tablet.getRowSize() != 0) {
if (isAligned) {
tsFileWriter.writeAligned(tablet);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class WriteUtils {
public static List<Pair<IDeviceID, Integer>> splitTabletByDevice(Tablet tablet) {
List<Pair<IDeviceID, Integer>> result = new ArrayList<>();
IDeviceID lastDeviceID = null;
for (int i = 0; i < tablet.rowSize; i++) {
for (int i = 0; i < tablet.getRowSize(); i++) {
final IDeviceID currDeviceID = tablet.getDeviceID(i);
if (!currDeviceID.equals(lastDeviceID)) {
if (lastDeviceID != null) {
Expand All @@ -45,7 +45,7 @@ public static List<Pair<IDeviceID, Integer>> splitTabletByDevice(Tablet tablet)
lastDeviceID = currDeviceID;
}
}
result.add(new Pair<>(lastDeviceID, tablet.rowSize));
result.add(new Pair<>(lastDeviceID, tablet.getRowSize()));
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -522,8 +522,11 @@ private IChunkGroupWriter tryToInitialGroupWriter(IDeviceID deviceId, boolean is
*/
@TsFileApi
public boolean writeRecord(TSRecord record) throws IOException, WriteProcessException {
boolean isAligned = getSchema().getSeriesSchema(record.deviceId).isAligned();
checkIsTimeseriesExist(record, isAligned);
MeasurementGroup measurementGroup = getSchema().getSeriesSchema(record.deviceId);
if (measurementGroup == null) {
throw new NoDeviceException(record.deviceId.toString());
}
checkIsTimeseriesExist(record, measurementGroup.isAligned());
recordCount += groupWriters.get(record.deviceId).write(record.time, record.dataPointList);
return checkMemorySizeAndMayFlushChunks();
}
Expand All @@ -537,16 +540,19 @@ public boolean writeRecord(TSRecord record) throws IOException, WriteProcessExce
*/
@TsFileApi
public boolean write(Tablet tablet) throws IOException, WriteProcessException {
IDeviceID deviceID = IDeviceID.Factory.DEFAULT_FACTORY.create(tablet.getDeviceId());
MeasurementGroup measurementGroup = getSchema().getSeriesSchema(deviceID);
if (measurementGroup == null) {
throw new NoDeviceException(deviceID.toString());
}
// make sure the ChunkGroupWriter for this Tablet exist
checkIsTimeseriesExist(tablet, false);
checkIsTimeseriesExist(tablet, measurementGroup.isAligned());
// get corresponding ChunkGroupWriter and write this Tablet
recordCount +=
groupWriters
.get(IDeviceID.Factory.DEFAULT_FACTORY.create(tablet.getDeviceId()))
.write(tablet);
recordCount += groupWriters.get(deviceID).write(tablet);
return checkMemorySizeAndMayFlushChunks();
}

@Deprecated
public boolean writeAligned(Tablet tablet) throws IOException, WriteProcessException {
// make sure the ChunkGroupWriter for this Tablet exist
checkIsTimeseriesExist(tablet, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ public int write(long time, List<DataPoint> data) throws WriteProcessException,

@Override
public int write(Tablet tablet) throws IOException, WriteProcessException {
return write(tablet, 0, tablet.rowSize);
return write(tablet, 0, tablet.getRowSize());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public int write(long time, List<DataPoint> data) throws IOException, WriteProce

@Override
public int write(Tablet tablet) throws IOException, WriteProcessException {
return write(tablet, 0, tablet.rowSize);
return write(tablet, 0, tablet.getRowSize());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,7 @@ public class Tablet {
*/
private boolean autoUpdateBitMaps = false;

/** The number of rows to include in this {@link Tablet} */
public int rowSize;
private int rowSize;

/** The maximum number of rows for this {@link Tablet} */
private final int maxRowNumber;
Expand All @@ -112,7 +111,7 @@ public Tablet(String deviceId, List<IMeasurementSchema> schemas) {

public Tablet(String deviceId, List<IMeasurementSchema> schemas, int maxRowNumber) {
this.insertTargetName = deviceId;
this.schemas = schemas;
this.schemas = new ArrayList<>(schemas);
setColumnCategories(ColumnCategory.nCopy(ColumnCategory.MEASUREMENT, schemas.size()));
this.maxRowNumber = maxRowNumber;
measurementIndex = new HashMap<>();
Expand Down Expand Up @@ -458,7 +457,7 @@ public int getMaxRowNumber() {

/** Reset Tablet to the default state - set the rowSize to 0 and reset bitMaps */
public void reset() {
rowSize = 0;
this.rowSize = 0;
if (bitMaps != null) {
for (BitMap bitMap : bitMaps) {
if (bitMap == null) {
Expand Down Expand Up @@ -1079,6 +1078,15 @@ public void setColumnCategories(List<ColumnCategory> columnCategories) {
}
}

/** The number of rows to include in this {@link Tablet} */
public int getRowSize() {
return rowSize;
}

public void setRowSize(int rowSize) {
this.rowSize = rowSize;
}

public enum ColumnCategory {
ID,
MEASUREMENT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public interface IMeasurementSchema {

Expand Down Expand Up @@ -88,4 +89,14 @@ public interface IMeasurementSchema {
int partialSerializeTo(OutputStream outputStream) throws IOException;

boolean isLogicalView();

static List<String> getMeasurementNameList(List<? extends IMeasurementSchema> schemaList) {
return schemaList.stream()
.map(IMeasurementSchema::getMeasurementName)
.collect(Collectors.toList());
}

static List<TSDataType> getDataTypeList(List<? extends IMeasurementSchema> schemaList) {
return schemaList.stream().map(IMeasurementSchema::getType).collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.stream.Collectors;

import static org.apache.tsfile.read.filter.factory.ValueFilterApi.DEFAULT_MEASUREMENT_INDEX;

Expand Down Expand Up @@ -613,12 +612,8 @@ public void testGetTableDeviceMethods() throws IOException, WriteProcessExceptio
Tablet tablet =
new Tablet(
tableSchema.getTableName(),
tableSchema.getColumnSchemas().stream()
.map(IMeasurementSchema::getMeasurementName)
.collect(Collectors.toList()),
tableSchema.getColumnSchemas().stream()
.map(IMeasurementSchema::getType)
.collect(Collectors.toList()),
IMeasurementSchema.getMeasurementNameList(tableSchema.getColumnSchemas()),
IMeasurementSchema.getDataTypeList(tableSchema.getColumnSchemas()),
tableSchema.getColumnTypes());

String[][] ids =
Expand All @@ -640,7 +635,7 @@ public void testGetTableDeviceMethods() throws IOException, WriteProcessExceptio
new StringArrayDeviceID(tableSchema.getTableName(), ids[i][0], ids[i][1], ids[i][2]));
tablet.addValue("s1", i, i);
}
tablet.rowSize = ids.length;
tablet.setRowSize(ids.length);
writer.writeTable(tablet);
}
try (TsFileReader tsFileReader = new TsFileReader(file)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ private void testTable() throws IOException, WriteProcessException, ReadProcessE
startTime = System.nanoTime();
tsFileWriter.writeTable(
tablet,
Collections.singletonList(new Pair<>(tablet.getDeviceID(0), tablet.rowSize)));
Collections.singletonList(new Pair<>(tablet.getDeviceID(0), tablet.getRowSize())));
writeTimeSum += System.nanoTime() - startTime;
}
}
Expand Down Expand Up @@ -282,7 +282,7 @@ private void fillTreeTablet(Tablet tablet, int tableNum, int deviceNum, int tabl
for (int valNum = 0; valNum < pointPerSeries; valNum++) {
tablet.timestamps[valNum] = (long) tabletNum * pointPerSeries + valNum;
}
tablet.rowSize = pointPerSeries;
tablet.setRowSize(pointPerSeries);
}

private Tablet initTableTablet() {
Expand All @@ -293,10 +293,8 @@ private Tablet initTableTablet() {
columnCategories.addAll(ColumnCategory.nCopy(ColumnCategory.MEASUREMENT, measurementSchemaCnt));
return new Tablet(
null,
measurementSchemas.stream()
.map(IMeasurementSchema::getMeasurementName)
.collect(Collectors.toList()),
measurementSchemas.stream().map(IMeasurementSchema::getType).collect(Collectors.toList()),
IMeasurementSchema.getMeasurementNameList(measurementSchemas),
IMeasurementSchema.getDataTypeList(measurementSchemas),
columnCategories,
pointPerSeries);
}
Expand All @@ -319,7 +317,7 @@ private void fillTableTablet(Tablet tablet, int tableNum, int deviceNum, int tab
for (int valNum = 0; valNum < pointPerSeries; valNum++) {
tablet.timestamps[valNum] = (long) tabletNum * pointPerSeries + valNum;
}
tablet.rowSize = pointPerSeries;
tablet.setRowSize(pointPerSeries);
}

private void registerTree(TsFileWriter writer) throws WriteProcessException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,12 +210,8 @@ public void testDeviceIdWithNull() throws Exception {
Tablet tablet =
new Tablet(
tableSchema.getTableName(),
tableSchema.getColumnSchemas().stream()
.map(IMeasurementSchema::getMeasurementName)
.collect(Collectors.toList()),
tableSchema.getColumnSchemas().stream()
.map(IMeasurementSchema::getType)
.collect(Collectors.toList()),
IMeasurementSchema.getMeasurementNameList(tableSchema.getColumnSchemas()),
IMeasurementSchema.getDataTypeList(tableSchema.getColumnSchemas()),
tableSchema.getColumnTypes());

ids =
Expand All @@ -235,7 +231,7 @@ public void testDeviceIdWithNull() throws Exception {
tablet.addValue("id3", i, ids[i][2]);
tablet.addValue("s1", i, i);
}
tablet.rowSize = ids.length;
tablet.setRowSize(ids.length);
writer.writeTable(tablet);
}

Expand Down Expand Up @@ -469,7 +465,7 @@ public void testHybridWrite() throws Exception {

List<Path> selectedSeries = new ArrayList<>();
Set<IDeviceID> deviceIDS = new HashSet<>();
for (int i = 0; i < tablet.rowSize; i++) {
for (int i = 0; i < tablet.getRowSize(); i++) {
final IDeviceID tabletDeviceID = tablet.getDeviceID(i);
if (!deviceIDS.contains(tabletDeviceID)) {
deviceIDS.add(tabletDeviceID);
Expand All @@ -495,12 +491,8 @@ public static Tablet genTablet(TableSchema tableSchema, int offset, int deviceNu
Tablet tablet =
new Tablet(
tableSchema.getTableName(),
tableSchema.getColumnSchemas().stream()
.map(IMeasurementSchema::getMeasurementName)
.collect(Collectors.toList()),
tableSchema.getColumnSchemas().stream()
.map(IMeasurementSchema::getType)
.collect(Collectors.toList()),
IMeasurementSchema.getMeasurementNameList(tableSchema.getColumnSchemas()),
IMeasurementSchema.getDataTypeList(tableSchema.getColumnSchemas()),
tableSchema.getColumnTypes());

for (int i = 0; i < deviceNum; i++) {
Expand All @@ -515,7 +507,7 @@ public static Tablet genTablet(TableSchema tableSchema, int offset, int deviceNu
}
}
}
tablet.rowSize = deviceNum * numTimestampPerDevice;
tablet.setRowSize(deviceNum * numTimestampPerDevice);
return tablet;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public void testUsingDefaultSchemaTemplate() throws IOException, WriteProcessExc
schema.put("s2", s2);

writer.registerSchemaTemplate("defaultTemplate", schema, false);
writer.registerDevice("d1", "defaultTemplate");

Tablet tablet = new Tablet("d1", schemaList);
long[] timestamps = tablet.timestamps;
Expand All @@ -72,20 +73,20 @@ public void testUsingDefaultSchemaTemplate() throws IOException, WriteProcessExc
long value = 1L;

for (int r = 0; r < 10; r++, value++) {
int row = tablet.rowSize++;
timestamps[row] = timestamp++;
int row = tablet.getRowSize();
tablet.addTimestamp(row, timestamp++);
for (int i = 0; i < 2; i++) {
long[] sensor = (long[]) values[i];
sensor[row] = value;
}
// write Tablet to TsFile
if (tablet.rowSize == tablet.getMaxRowNumber()) {
if (tablet.getRowSize() == tablet.getMaxRowNumber()) {
writer.write(tablet);
tablet.reset();
}
}
// write Tablet to TsFile
if (tablet.rowSize != 0) {
if (tablet.getRowSize() != 0) {
writer.write(tablet);
tablet.reset();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -482,20 +482,20 @@ private void generateFile(
long timestamp = 1;
long value = 1000000L;
for (int r = 0; r < rowNum; r++, value++) {
int row = tablet.rowSize++;
timestamps[row] = timestamp++;
int row = tablet.getRowSize();
tablet.addTimestamp(row, timestamp++);
for (int j = 0; j < measurementNum; j++) {
long[] sensor = (long[]) values[j];
sensor[row] = value;
}
// write Tablet to TsFile
if (tablet.rowSize == tablet.getMaxRowNumber()) {
if (tablet.getRowSize() == tablet.getMaxRowNumber()) {
tsFileWriter.writeAligned(tablet);
tablet.reset();
}
}
// write Tablet to TsFile
if (tablet.rowSize != 0) {
if (tablet.getRowSize() != 0) {
tsFileWriter.writeAligned(tablet);
tablet.reset();
}
Expand Down
Loading

0 comments on commit d30d283

Please sign in to comment.