Skip to content

Commit

Permalink
add new interfaces
Browse files Browse the repository at this point in the history
  • Loading branch information
shuwenwei committed Nov 22, 2024
1 parent dff85f3 commit 62e4a41
Show file tree
Hide file tree
Showing 28 changed files with 869 additions and 861 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public static void main(String[] args) throws IOException {

// construct TSRecord
for (int i = 0; i < 100; i++) {
TSRecord tsRecord = new TSRecord(i, Constant.DEVICE_PREFIX + (i % 4));
TSRecord tsRecord = new TSRecord(Constant.DEVICE_PREFIX + (i % 4), i);
DataPoint dPoint1 = new LongDataPoint(Constant.SENSOR_1, i);
DataPoint dPoint2 = new LongDataPoint(Constant.SENSOR_2, i);
DataPoint dPoint3 = new LongDataPoint(Constant.SENSOR_3, i);
Expand All @@ -74,7 +74,7 @@ public static void main(String[] args) throws IOException {
tsRecord.addTuple(dPoint3);

// write TSRecord
tsFileWriter.write(tsRecord);
tsFileWriter.writeRecord(tsRecord);
}
} catch (Exception e) {
LOGGER.error("meet error in TsFileWrite ", e);
Expand Down Expand Up @@ -106,7 +106,7 @@ private static void write(ForceAppendTsFileWriter fwriter) {
}
// construct TSRecord
for (int i = 100; i < 120; i++) {
TSRecord tsRecord = new TSRecord(i, Constant.DEVICE_PREFIX + (i % 4));
TSRecord tsRecord = new TSRecord(Constant.DEVICE_PREFIX + (i % 4), i);
DataPoint dPoint1 = new LongDataPoint(Constant.SENSOR_1, i);
DataPoint dPoint2 = new LongDataPoint(Constant.SENSOR_2, i);
DataPoint dPoint3 = new LongDataPoint(Constant.SENSOR_3, i);
Expand All @@ -115,7 +115,7 @@ private static void write(ForceAppendTsFileWriter fwriter) {
tsRecord.addTuple(dPoint3);

// write TSRecord
tsFileWriter1.write(tsRecord);
tsFileWriter1.writeRecord(tsRecord);
}
} catch (Exception e) {
LOGGER.error("meet error in TsFileWrite ", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,22 +100,22 @@ private static void writeAlignedWithTablet(
long sensorNum = schemas.size();

for (long r = 0; r < rowNum; r++, startValue++) {
int row = tablet.rowSize++;
timestamps[row] = startTime++;
int row = tablet.getRowSize();
tablet.addTimestamp(row, startTime++);
for (int i = 0; i < sensorNum; i++) {
tablet.addValue(
schemas.get(i).getMeasurementId(),
schemas.get(i).getMeasurementName(),
row,
DataGenerator.generate(schemas.get(i).getType(), (int) r));
}
// write
if (tablet.rowSize == tablet.getMaxRowNumber()) {
if (tablet.getRowSize() == tablet.getMaxRowNumber()) {
tsFileWriter.writeAligned(tablet);
tablet.reset();
}
}
// write
if (tablet.rowSize != 0) {
if (tablet.getRowSize() != 0) {
tsFileWriter.writeAligned(tablet);
tablet.reset();
}
Expand All @@ -140,21 +140,21 @@ private static void writeNonAlignedWithTablet(TsFileWriter tsFileWriter)
long timestamp = 1;
long value = 1000000L;
for (int r = 0; r < rowNum; r++, value++) {
int row = tablet.rowSize++;
timestamps[row] = timestamp++;
int row = tablet.getRowSize();
tablet.addTimestamp(row, timestamp++);
for (int i = 0; i < sensorNum; i++) {
long[] sensor = (long[]) values[i];
sensor[row] = value;
}
// write
if (tablet.rowSize == tablet.getMaxRowNumber()) {
tsFileWriter.write(tablet);
if (tablet.getRowSize() == tablet.getMaxRowNumber()) {
tsFileWriter.writeTree(tablet);
tablet.reset();
}
}
// write
if (tablet.rowSize != 0) {
tsFileWriter.write(tablet);
if (tablet.getRowSize() != 0) {
tsFileWriter.writeTree(tablet);
tablet.reset();
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.tsfile.v4;

import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.exception.write.WriteProcessException;
import org.apache.tsfile.file.metadata.ColumnSchemaBuilder;
import org.apache.tsfile.file.metadata.TableSchema;
import org.apache.tsfile.fileSystem.FSFactoryProducer;
import org.apache.tsfile.read.query.dataset.ResultSet;
import org.apache.tsfile.read.query.dataset.ResultSetMetadata;
import org.apache.tsfile.read.v4.ITsFileReader;
import org.apache.tsfile.read.v4.TsFileReaderBuilder;
import org.apache.tsfile.write.record.Tablet;
import org.apache.tsfile.write.v4.ITsFileWriter;
import org.apache.tsfile.write.v4.TsFileWriterBuilder;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.StringJoiner;

public class ITsFileReaderAndITsFileWriter {

private static final Logger LOGGER = LoggerFactory.getLogger(ITsFileReaderAndITsFileWriter.class);

public static void main(String[] args) throws IOException {
String path = "test.tsfile";
File f = FSFactoryProducer.getFSFactory().getFile(path);
if (f.exists()) {
Files.delete(f.toPath());
}

String tableName = "table1";

TableSchema tableSchema =
new TableSchema(
tableName,
Arrays.asList(
new ColumnSchemaBuilder()
.name("id1")
.dataType(TSDataType.STRING)
.category(Tablet.ColumnCategory.ID)
.build(),
new ColumnSchemaBuilder()
.name("id2")
.dataType(TSDataType.STRING)
.category(Tablet.ColumnCategory.ID)
.build(),
new ColumnSchemaBuilder()
.name("s1")
.dataType(TSDataType.INT32)
.category(Tablet.ColumnCategory.MEASUREMENT)
.build(),
new ColumnSchemaBuilder().name("s2").dataType(TSDataType.BOOLEAN).build()));

Tablet tablet =
new Tablet(
Arrays.asList("id1", "id2", "s1", "s2"),
Arrays.asList(
TSDataType.STRING, TSDataType.STRING, TSDataType.INT32, TSDataType.BOOLEAN));
for (int row = 0; row < 5; row++) {
long timestamp = row;
tablet.addTimestamp(row, timestamp);
tablet.addValue(row, "id1", "id1_filed_1");
tablet.addValue(row, "id2", "id2_filed_1");
tablet.addValue(row, "s1", row);
// null value
// tablet.addValue(row, "s2", true);
}
for (int row = 5; row < 10; row++) {
long timestamp = row;
tablet.addTimestamp(row, timestamp);

// id1 column
tablet.addValue(row, 0, "id1_field_2");

// id2 column
tablet.addValue(row, 1, "id1_field_2");

// s1 column: null value
// tablet.addValue(row, 2, row);

// s2 column
tablet.addValue(row, 3, false);
}

long memoryThreshold = 10 * 1024 * 1024;
// tableSchema and file are required. memoryThreshold is an optional parameter, default value is
// 32 * 1024 * 1024 byte.
try (ITsFileWriter writer =
new TsFileWriterBuilder()
.file(f)
.tableSchema(tableSchema)
.memoryThreshold(memoryThreshold)
.build()) {
writer.write(tablet);
} catch (WriteProcessException e) {
LOGGER.error("meet error in TsFileWrite ", e);
}

// file is a required parameter
try (ITsFileReader reader = new TsFileReaderBuilder().file(f).build()) {
ResultSet resultSet = reader.query(tableName, Arrays.asList("id1", "id2", "s1", "s2"), 2, 8);
// first column is Time
ResultSetMetadata metadata = resultSet.getMetadata();
System.out.println(metadata);
while (resultSet.next()) {
StringJoiner sj = new StringJoiner(" ");
for (int column = 1; column <= 5; column++) {
sj.add(metadata.getColumnName(column) + "(" + metadata.getColumnType(column) + ") ");
}
System.out.println(sj.toString());

// columnIndex starts from 1
// Time id1 id2 s1 s2
Long timeField = resultSet.getLong("Time");
String id1Field = resultSet.isNull("id1") ? null : resultSet.getString("id1");
String id2Field = resultSet.isNull("id2") ? null : resultSet.getString("id2");
Integer s1Field = resultSet.isNull("s1") ? null : resultSet.getInt(4);
Boolean s2Field = resultSet.isNull("s2") ? null : resultSet.getBoolean(5);
sj = new StringJoiner(" ");
System.out.println(
sj.add(timeField + "")
.add(id1Field)
.add(id2Field)
.add(s1Field + "")
.add(s2Field + "")
.toString());
}
} catch (Exception e) {
LOGGER.error("meet error in TsFileRead ", e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.tsfile.file.metadata;

import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.write.record.Tablet;
import org.apache.tsfile.write.record.Tablet.ColumnCategory;

public class ColumnSchema {
private String columnName;
private TSDataType dataType;
private ColumnCategory columnCategory;

public ColumnSchema(String columnName, TSDataType dataType, ColumnCategory columnCategory) {
this.columnName = columnName;
this.dataType = dataType;
this.columnCategory = columnCategory;
}

public String getColumnName() {
return columnName;
}

public TSDataType getDataType() {
return dataType;
}

public Tablet.ColumnCategory getColumnCategory() {
return columnCategory;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.tsfile.file.metadata;

import org.apache.tsfile.common.TsFileApi;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.write.record.Tablet.ColumnCategory;

public class ColumnSchemaBuilder {

private String columnName;
private TSDataType columnDataType;
private ColumnCategory columnCategory = ColumnCategory.MEASUREMENT;

@TsFileApi
public ColumnSchema build() {
validateNameParameters();
return new ColumnSchema(columnName, columnDataType, columnCategory);
}

@TsFileApi
public ColumnSchemaBuilder name(String columnName) {
this.columnName = columnName;
return this;
}

@TsFileApi
public ColumnSchemaBuilder dataType(TSDataType columnType) {
this.columnDataType = columnType;
return this;
}

@TsFileApi
public ColumnSchemaBuilder category(ColumnCategory columnCategory) {
this.columnCategory = columnCategory;
return this;
}

private void validateNameParameters() {
if (columnName == null || columnName.trim().isEmpty()) {
throw new IllegalStateException("Column name must be set before building");
}
if (columnDataType == null) {
throw new IllegalStateException("Column data type must be set before building");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@ public void finalizeColumnSchema() {
List<IMeasurementSchema> allColumns = new ArrayList<>(generateIdColumns());
List<ColumnCategory> allColumnCategories =
ColumnCategory.nCopy(ColumnCategory.ID, allColumns.size());
allColumns.addAll(columnSchemas);
allColumns.addAll(measurementSchemas);
allColumnCategories.addAll(columnCategories);
columnSchemas = allColumns;
measurementSchemas = allColumns;
columnCategories = allColumnCategories;
updatable = false;
}
Expand Down
Loading

0 comments on commit 62e4a41

Please sign in to comment.