diff --git a/configuration/conf/config.properties b/configuration/conf/config.properties index 4e3d5f42a..656ad01ec 100644 --- a/configuration/conf/config.properties +++ b/configuration/conf/config.properties @@ -4,7 +4,7 @@ ################### 被测数据库配置 ####################### # 被测试的数据库,目前的格式为{name}{-version}{-insert mode}(注意-号)其全部参考值参见README文件,注意版本需要匹配 -# DB_SWITCH=IoTDB-110-SESSION_BY_TABLET +# DB_SWITCH=IoTDB-130-SESSION_BY_TABLET # 主机列表,如果有多个主机则使用英文逗号进行分割 # HOST=127.0.0.1 diff --git a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/conf/Constants.java b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/conf/Constants.java index 4633822f7..ee638d934 100644 --- a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/conf/Constants.java +++ b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/conf/Constants.java @@ -51,6 +51,12 @@ public class Constants { public static final String HASH_SG_ASSIGN_MODE = "hash"; public static final String DIV_SG_ASSIGN_MODE = "div"; + public static final String IOTDB130_JDBC_CLASS = "cn.edu.tsinghua.iot.benchmark.iotdb130.IoTDB"; + public static final String IOTDB130_SESSION_CLASS = + "cn.edu.tsinghua.iot.benchmark.iotdb130.IoTDBSession"; + public static final String IOTDB130_ROUNDROBIN_SESSION_CLASS = + "cn.edu.tsinghua.iot.benchmark.iotdb130.IoTDBClusterSession"; + public static final String IOTDB110_JDBC_CLASS = "cn.edu.tsinghua.iot.benchmark.iotdb110.IoTDB"; public static final String IOTDB110_SESSION_CLASS = "cn.edu.tsinghua.iot.benchmark.iotdb110.IoTDBSession"; diff --git a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/tsdb/DBConfig.java b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/tsdb/DBConfig.java index 148b565d8..9ed3c28eb 100644 --- a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/tsdb/DBConfig.java +++ b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/tsdb/DBConfig.java @@ -30,7 +30,7 @@ public class DBConfig { * The database to use, format: {name of database}{-version}{-insert mode} name of database, for * more, in README.md */ - private DBSwitch DB_SWITCH = DBSwitch.DB_IOT_110_SESSION_BY_TABLET; + private DBSwitch DB_SWITCH = DBSwitch.DB_IOT_130_SESSION_BY_TABLET; /** The host of database server for IoTDB */ private List HOST = Collections.singletonList("127.0.0.1"); /** The port of database server */ diff --git a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/tsdb/DBFactory.java b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/tsdb/DBFactory.java index 29f86bd96..930c54ff4 100644 --- a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/tsdb/DBFactory.java +++ b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/tsdb/DBFactory.java @@ -38,6 +38,19 @@ public IDatabase getDatabase(DBConfig dbConfig) throws SQLException { String dbClass = ""; try { switch (dbConfig.getDB_SWITCH()) { + // IoTDB 1.3 + case DB_IOT_130_JDBC: + dbClass = Constants.IOTDB130_JDBC_CLASS; + break; + case DB_IOT_130_SESSION_BY_TABLET: + case DB_IOT_130_SESSION_BY_RECORD: + case DB_IOT_130_SESSION_BY_RECORDS: + if (config.isIS_ALL_NODES_VISIBLE()) { + dbClass = Constants.IOTDB130_ROUNDROBIN_SESSION_CLASS; + } else { + dbClass = Constants.IOTDB130_SESSION_CLASS; + } + break; // IoTDB 1.1 case DB_IOT_110_JDBC: dbClass = Constants.IOTDB110_JDBC_CLASS; diff --git a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/tsdb/enums/DBSwitch.java b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/tsdb/enums/DBSwitch.java index 06475cd0f..8a474fbc7 100644 --- a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/tsdb/enums/DBSwitch.java +++ b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/tsdb/enums/DBSwitch.java @@ -23,6 +23,13 @@ import org.slf4j.LoggerFactory; public enum DBSwitch { + DB_IOT_130_JDBC(DBType.IoTDB, DBVersion.IOTDB_130, DBInsertMode.INSERT_USE_JDBC), + DB_IOT_130_SESSION_BY_TABLET( + DBType.IoTDB, DBVersion.IOTDB_130, DBInsertMode.INSERT_USE_SESSION_TABLET), + DB_IOT_130_SESSION_BY_RECORD( + DBType.IoTDB, DBVersion.IOTDB_130, DBInsertMode.INSERT_USE_SESSION_RECORD), + DB_IOT_130_SESSION_BY_RECORDS( + DBType.IoTDB, DBVersion.IOTDB_130, DBInsertMode.INSERT_USE_SESSION_RECORDS), DB_IOT_110_JDBC(DBType.IoTDB, DBVersion.IOTDB_110, DBInsertMode.INSERT_USE_JDBC), DB_IOT_110_SESSION_BY_TABLET( DBType.IoTDB, DBVersion.IOTDB_110, DBInsertMode.INSERT_USE_SESSION_TABLET), diff --git a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/tsdb/enums/DBVersion.java b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/tsdb/enums/DBVersion.java index 57d021fb0..7c5e6e169 100644 --- a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/tsdb/enums/DBVersion.java +++ b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/tsdb/enums/DBVersion.java @@ -20,6 +20,7 @@ package cn.edu.tsinghua.iot.benchmark.tsdb.enums; public enum DBVersion { + IOTDB_130("130"), IOTDB_110("110"), IOTDB_100("100"), IOTDB_013("013"), diff --git a/iotdb-1.3/pom.xml b/iotdb-1.3/pom.xml new file mode 100644 index 000000000..1252003b1 --- /dev/null +++ b/iotdb-1.3/pom.xml @@ -0,0 +1,112 @@ + + + + 4.0.0 + + cn.edu.tsinghua + iot-benchmark + 1.0.0 + ../pom.xml + + iotdb-1.3 + Benchmark IoTDB 1.3 + + + + snapshot_id + snapshot_name + https://repository.apache.org/content/repositories/snapshots + + false + + + true + + + + + + + 1.3.14 + + + + + cn.edu.tsinghua + core + ${project.version} + + + org.apache.iotdb + iotdb-jdbc + 1.3.1-SNAPSHOT + + + org.apache.iotdb + iotdb-session + 1.3.1-SNAPSHOT + + + ch.qos.logback + logback-classic + ${logback.version} + + + ch.qos.logback + logback-core + ${logback.version} + + + + + iot-benchmark-iotdb-1.3 + + + org.apache.maven.plugins + maven-assembly-plugin + 3.1.0 + + + + server-assembly + package + + single + + + + src/assembly/assembly.xml + + false + + + true + true + + + + + + + + + diff --git a/iotdb-1.3/src/assembly/assembly.xml b/iotdb-1.3/src/assembly/assembly.xml new file mode 100644 index 000000000..0f61aa26f --- /dev/null +++ b/iotdb-1.3/src/assembly/assembly.xml @@ -0,0 +1,71 @@ + + + + IoTDB-1.3 + + dir + zip + + true + + + lib + + + + + ${maven.multiModuleProjectDirectory}/configuration/bin/ + bin + 0755 + + + ${maven.multiModuleProjectDirectory}/configuration/conf/ + conf + + + + + ${maven.multiModuleProjectDirectory}/configuration/benchmark.bat + + + ${maven.multiModuleProjectDirectory}/configuration/benchmark.sh + 0755 + + + ${maven.multiModuleProjectDirectory}/configuration/rep-benchmark.sh + 0755 + + + + ${maven.multiModuleProjectDirectory}/configuration/cli-benchmark.sh + 0755 + + + ${maven.multiModuleProjectDirectory}/configuration/routine + + + ${maven.multiModuleProjectDirectory}/LICENSE + + + ${project.build.outputDirectory}/git.properties + + + diff --git a/iotdb-1.3/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb130/IBenchmarkSession.java b/iotdb-1.3/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb130/IBenchmarkSession.java new file mode 100644 index 000000000..11c5eb3e3 --- /dev/null +++ b/iotdb-1.3/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb130/IBenchmarkSession.java @@ -0,0 +1,59 @@ +package cn.edu.tsinghua.iot.benchmark.iotdb130; + +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.write.record.Tablet; + +import java.util.List; + +public interface IBenchmarkSession { + void open() throws IoTDBConnectionException; + + void open(boolean enableRPCCompression) throws IoTDBConnectionException; + + void insertRecord( + String deviceId, + long time, + List measurements, + List types, + List values) + throws IoTDBConnectionException, StatementExecutionException; + + void insertAlignedRecord( + String multiSeriesId, + long time, + List multiMeasurementComponents, + List types, + List values) + throws IoTDBConnectionException, StatementExecutionException; + + void insertRecords( + List deviceIds, + List times, + List> measurementsList, + List> typesList, + List> valuesList) + throws IoTDBConnectionException, StatementExecutionException; + + void insertAlignedRecords( + List multiSeriesIds, + List times, + List> multiMeasurementComponentsList, + List> typesList, + List> valuesList) + throws IoTDBConnectionException, StatementExecutionException; + + void insertTablet(Tablet tablet) throws IoTDBConnectionException, StatementExecutionException; + + void insertAlignedTablet(Tablet tablet) + throws IoTDBConnectionException, StatementExecutionException; + + ISessionDataSet executeQueryStatement(String sql) + throws IoTDBConnectionException, StatementExecutionException; + + void executeNonQueryStatement(String deleteSeriesSql) + throws IoTDBConnectionException, StatementExecutionException; + + void close() throws IoTDBConnectionException; +} diff --git a/iotdb-1.3/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb130/ISessionDataSet.java b/iotdb-1.3/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb130/ISessionDataSet.java new file mode 100644 index 000000000..a56c64e33 --- /dev/null +++ b/iotdb-1.3/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb130/ISessionDataSet.java @@ -0,0 +1,16 @@ +package cn.edu.tsinghua.iot.benchmark.iotdb130; + +import org.apache.iotdb.isession.SessionDataSet; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.tsfile.read.common.RowRecord; + +public interface ISessionDataSet { + RowRecord next() throws IoTDBConnectionException, StatementExecutionException; + + boolean hasNext() throws IoTDBConnectionException, StatementExecutionException; + + void close() throws IoTDBConnectionException, StatementExecutionException; + + SessionDataSet.DataIterator iterator(); +} diff --git a/iotdb-1.3/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb130/IoTDB.java b/iotdb-1.3/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb130/IoTDB.java new file mode 100644 index 000000000..2c136be79 --- /dev/null +++ b/iotdb-1.3/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb130/IoTDB.java @@ -0,0 +1,956 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package cn.edu.tsinghua.iot.benchmark.iotdb130; + +import org.apache.iotdb.isession.template.Template; +import org.apache.iotdb.isession.util.Version; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.session.Session; +import org.apache.iotdb.session.template.MeasurementNode; +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; + +import cn.edu.tsinghua.iot.benchmark.client.operation.Operation; +import cn.edu.tsinghua.iot.benchmark.conf.Config; +import cn.edu.tsinghua.iot.benchmark.conf.ConfigDescriptor; +import cn.edu.tsinghua.iot.benchmark.entity.Batch.IBatch; +import cn.edu.tsinghua.iot.benchmark.entity.DeviceSummary; +import cn.edu.tsinghua.iot.benchmark.entity.Record; +import cn.edu.tsinghua.iot.benchmark.entity.Sensor; +import cn.edu.tsinghua.iot.benchmark.entity.enums.SensorType; +import cn.edu.tsinghua.iot.benchmark.exception.DBConnectException; +import cn.edu.tsinghua.iot.benchmark.measurement.Status; +import cn.edu.tsinghua.iot.benchmark.schema.schemaImpl.DeviceSchema; +import cn.edu.tsinghua.iot.benchmark.tsdb.DBConfig; +import cn.edu.tsinghua.iot.benchmark.tsdb.IDatabase; +import cn.edu.tsinghua.iot.benchmark.tsdb.TsdbException; +import cn.edu.tsinghua.iot.benchmark.utils.NamedThreadFactory; +import cn.edu.tsinghua.iot.benchmark.utils.TimeUtils; +import cn.edu.tsinghua.iot.benchmark.workload.query.impl.AggRangeQuery; +import cn.edu.tsinghua.iot.benchmark.workload.query.impl.AggRangeValueQuery; +import cn.edu.tsinghua.iot.benchmark.workload.query.impl.AggValueQuery; +import cn.edu.tsinghua.iot.benchmark.workload.query.impl.DeviceQuery; +import cn.edu.tsinghua.iot.benchmark.workload.query.impl.GroupByQuery; +import cn.edu.tsinghua.iot.benchmark.workload.query.impl.LatestPointQuery; +import cn.edu.tsinghua.iot.benchmark.workload.query.impl.PreciseQuery; +import cn.edu.tsinghua.iot.benchmark.workload.query.impl.RangeQuery; +import cn.edu.tsinghua.iot.benchmark.workload.query.impl.ValueRangeQuery; +import cn.edu.tsinghua.iot.benchmark.workload.query.impl.VerificationQuery; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; + +/** this class will create more than one connection. */ +public class IoTDB implements IDatabase { + + private static final Logger LOGGER = LoggerFactory.getLogger(IoTDB.class); + private static final String ALREADY_KEYWORD = "already"; + private static final AtomicBoolean templateInit = new AtomicBoolean(false); + protected final String DELETE_SERIES_SQL; + protected SingleNodeJDBCConnection ioTDBConnection; + + protected static final Config config = ConfigDescriptor.getInstance().getConfig(); + protected static final CyclicBarrier templateBarrier = + new CyclicBarrier(config.getCLIENT_NUMBER()); + protected static final CyclicBarrier schemaBarrier = new CyclicBarrier(config.getCLIENT_NUMBER()); + protected static final CyclicBarrier activateTemplateBarrier = + new CyclicBarrier(config.getCLIENT_NUMBER()); + protected static Set storageGroups = Collections.synchronizedSet(new HashSet<>()); + protected final String ROOT_SERIES_NAME; + protected ExecutorService service; + protected Future task; + protected DBConfig dbConfig; + protected Random random = new Random(config.getDATA_SEED()); + + public IoTDB(DBConfig dbConfig) { + this.dbConfig = dbConfig; + ROOT_SERIES_NAME = "root." + dbConfig.getDB_NAME(); + DELETE_SERIES_SQL = "delete storage group root." + dbConfig.getDB_NAME() + ".*"; + } + + @Override + public void init() throws TsdbException { + if (ioTDBConnection == null) { + try { + ioTDBConnection = new SingleNodeJDBCConnection(dbConfig); + ioTDBConnection.init(); + this.service = + Executors.newSingleThreadExecutor(new NamedThreadFactory("DataClientExecuteJob")); + } catch (Exception e) { + throw new TsdbException(e); + } + } + } + + @Override + public void cleanup() { + try (Statement statement = ioTDBConnection.getConnection().createStatement()) { + statement.execute(DELETE_SERIES_SQL); + LOGGER.info("Finish clean data!"); + } catch (Exception e) { + LOGGER.warn("No Data to Clean!"); + } + } + + @Override + public void close() throws TsdbException { + if (ioTDBConnection != null) { + ioTDBConnection.close(); + } + if (service != null) { + service.shutdownNow(); + } + if (task != null) { + task.cancel(true); + } + } + + @Override + public Double registerSchema(List schemaList) throws TsdbException { + // create timeseries one by one is too slow in current cluster server. + // therefore, we use session to create time series in batch. + long start = System.nanoTime(); + long end; + if (config.hasWrite()) { + Map> sessionListMap = new HashMap<>(); + try { + // open meta session + if (!config.isIS_ALL_NODES_VISIBLE()) { + Session metaSession = + new Session.Builder() + .host(dbConfig.getHOST().get(0)) + .port(Integer.parseInt(dbConfig.getPORT().get(0))) + .username(dbConfig.getUSERNAME()) + .password(dbConfig.getPASSWORD()) + .version(Version.V_1_0) + .build(); + metaSession.open(config.isENABLE_THRIFT_COMPRESSION()); + sessionListMap.put(metaSession, createTimeseries(schemaList)); + } else { + int sessionNumber = dbConfig.getHOST().size(); + List keys = new ArrayList<>(); + for (int i = 0; i < sessionNumber; i++) { + Session metaSession = + new Session.Builder() + .host(dbConfig.getHOST().get(i)) + .port(Integer.parseInt(dbConfig.getPORT().get(i))) + .username(dbConfig.getUSERNAME()) + .password(dbConfig.getPASSWORD()) + .version(Version.V_1_0) + .build(); + metaSession.open(config.isENABLE_THRIFT_COMPRESSION()); + keys.add(metaSession); + sessionListMap.put(metaSession, new ArrayList<>()); + } + for (int i = 0; i < schemaList.size(); i++) { + sessionListMap + .get(keys.get(i % sessionNumber)) + .add(createTimeseries(schemaList.get(i))); + } + } + + if (config.isTEMPLATE() && templateInit.compareAndSet(false, true)) { + Template template = null; + if (config.isTEMPLATE() && schemaList.size() > 0) { + template = createTemplate(schemaList.get(0)); + } + start = System.nanoTime(); + int sessionIndex = random.nextInt(sessionListMap.size()); + Session templateSession = new ArrayList<>(sessionListMap.keySet()).get(sessionIndex); + registerTemplate(templateSession, template); + } else { + start = System.nanoTime(); + } + templateBarrier.await(); + for (Map.Entry> pair : sessionListMap.entrySet()) { + registerStorageGroups(pair.getKey(), pair.getValue()); + } + schemaBarrier.await(); + if (config.isTEMPLATE()) { + for (Map.Entry> pair : sessionListMap.entrySet()) { + activateTemplate(pair.getKey(), pair.getValue()); + } + activateTemplateBarrier.await(); + } + if (!config.isTEMPLATE()) { + for (Map.Entry> pair : sessionListMap.entrySet()) { + registerTimeseries(pair.getKey(), pair.getValue()); + } + } + } catch (Exception e) { + throw new TsdbException(e); + } finally { + if (sessionListMap.size() != 0) { + Set sessions = sessionListMap.keySet(); + for (Session session : sessions) { + try { + session.close(); + } catch (IoTDBConnectionException e) { + LOGGER.error("Schema-register session cannot be closed: {}", e.getMessage()); + } + } + } + } + } + end = System.nanoTime(); + return TimeUtils.convertToSeconds(end - start, "ns"); + } + + /** create template */ + private Template createTemplate(DeviceSchema deviceSchema) { + Template template = null; + if (config.isTEMPLATE()) { + if (config.isVECTOR()) { + template = new Template(config.getTEMPLATE_NAME(), true); + } else { + template = new Template(config.getTEMPLATE_NAME(), false); + } + try { + for (Sensor sensor : deviceSchema.getSensors()) { + MeasurementNode measurementNode = + new MeasurementNode( + sensor.getName(), + Enum.valueOf(TSDataType.class, sensor.getSensorType().name), + Enum.valueOf(TSEncoding.class, getEncodingType(sensor.getSensorType())), + Enum.valueOf(CompressionType.class, config.getCOMPRESSOR())); + template.addToTemplate(measurementNode); + } + } catch (StatementExecutionException e) { + LOGGER.error(e.getMessage()); + return null; + } + } + return template; + } + + /** register template */ + private void registerTemplate(Session metaSession, Template template) + throws IoTDBConnectionException, IOException { + try { + metaSession.createSchemaTemplate(template); + } catch (StatementExecutionException e) { + // do nothing + e.printStackTrace(); + } + } + + private void registerStorageGroups(Session metaSession, List schemaList) + throws TsdbException { + // get all storage groups + Set groups = new HashSet<>(); + for (TimeseriesSchema timeseriesSchema : schemaList) { + DeviceSchema schema = timeseriesSchema.getDeviceSchema(); + synchronized (IoTDB.class) { + if (!storageGroups.contains(schema.getGroup())) { + groups.add(schema.getGroup()); + storageGroups.add(schema.getGroup()); + } + } + } + // register storage groups + for (String group : groups) { + try { + metaSession.setStorageGroup(ROOT_SERIES_NAME + "." + group); + if (config.isTEMPLATE()) { + metaSession.setSchemaTemplate(config.getTEMPLATE_NAME(), ROOT_SERIES_NAME + "." + group); + } + } catch (Exception e) { + handleRegisterException(e); + } + } + } + + private void activateTemplate(Session metaSession, List schemaList) { + try { + List devicePaths = + schemaList.stream() + .map(schema -> ROOT_SERIES_NAME + "." + schema.getDeviceSchema().getDevicePath()) + .collect(Collectors.toList()); + metaSession.createTimeseriesUsingSchemaTemplate(devicePaths); + } catch (Throwable t) { + t.printStackTrace(); + } + } + + private TimeseriesSchema createTimeseries(DeviceSchema deviceSchema) { + List paths = new ArrayList<>(); + List tsDataTypes = new ArrayList<>(); + List tsEncodings = new ArrayList<>(); + List compressionTypes = new ArrayList<>(); + for (Sensor sensor : deviceSchema.getSensors()) { + if (config.isVECTOR()) { + paths.add(sensor.getName()); + } else { + paths.add(getSensorPath(deviceSchema, sensor.getName())); + } + SensorType datatype = sensor.getSensorType(); + tsDataTypes.add(Enum.valueOf(TSDataType.class, datatype.name)); + tsEncodings.add(Enum.valueOf(TSEncoding.class, getEncodingType(datatype))); + compressionTypes.add(Enum.valueOf(CompressionType.class, config.getCOMPRESSOR())); + } + TimeseriesSchema timeseriesSchema = + new TimeseriesSchema(deviceSchema, paths, tsDataTypes, tsEncodings, compressionTypes); + if (config.isVECTOR()) { + timeseriesSchema.setDeviceId(getDevicePath(deviceSchema)); + } + return timeseriesSchema; + } + + private List createTimeseries(List schemaList) { + List timeseriesSchemas = new ArrayList<>(); + for (DeviceSchema deviceSchema : schemaList) { + TimeseriesSchema timeseriesSchema = createTimeseries(deviceSchema); + timeseriesSchemas.add(timeseriesSchema); + } + return timeseriesSchemas; + } + + private void registerTimeseries(Session metaSession, List timeseriesSchemas) + throws TsdbException { + // create time series + for (TimeseriesSchema timeseriesSchema : timeseriesSchemas) { + try { + if (config.isVECTOR()) { + metaSession.createAlignedTimeseries( + timeseriesSchema.getDeviceId(), + timeseriesSchema.getPaths(), + timeseriesSchema.getTsDataTypes(), + timeseriesSchema.getTsEncodings(), + timeseriesSchema.getCompressionTypes(), + null); + } else { + metaSession.createMultiTimeseries( + timeseriesSchema.getPaths(), + timeseriesSchema.getTsDataTypes(), + timeseriesSchema.getTsEncodings(), + timeseriesSchema.getCompressionTypes(), + null, + null, + null, + null); + } + } catch (Exception e) { + handleRegisterException(e); + } + } + } + + private void handleRegisterException(Exception e) throws TsdbException { + // ignore if already has the time series + if (!e.getMessage().contains(ALREADY_KEYWORD) && !e.getMessage().contains("300")) { + LOGGER.error("Register IoTDB schema failed because ", e); + throw new TsdbException(e); + } + } + + @Override + public Status insertOneBatch(IBatch batch) throws DBConnectException { + try (Statement statement = ioTDBConnection.getConnection().createStatement()) { + for (Record record : batch.getRecords()) { + String sql = + getInsertOneBatchSql( + batch.getDeviceSchema(), record.getTimestamp(), record.getRecordDataValue()); + statement.addBatch(sql); + } + statement.executeBatch(); + return new Status(true); + } catch (Exception e) { + return new Status(false, 0, e, e.toString()); + } + } + + /** + * Q1: PreciseQuery SQL: select {sensors} from {devices} where time = {time} + * + * @param preciseQuery universal precise query condition parameters + * @return + */ + @Override + public Status preciseQuery(PreciseQuery preciseQuery) { + String strTime = preciseQuery.getTimestamp() + ""; + String sql = getSimpleQuerySqlHead(preciseQuery.getDeviceSchema()) + " WHERE time = " + strTime; + return executeQueryAndGetStatus(sql, Operation.PRECISE_QUERY); + } + + /** + * Q2: RangeQuery SQL: select {sensors} from {devices} where time >= {startTime} and time <= + * {endTime} + * + * @param rangeQuery universal range query condition parameters + * @return + */ + @Override + public Status rangeQuery(RangeQuery rangeQuery) { + String sql = + getRangeQuerySql( + rangeQuery.getDeviceSchema(), + rangeQuery.getStartTimestamp(), + rangeQuery.getEndTimestamp()); + return executeQueryAndGetStatus(sql, Operation.RANGE_QUERY); + } + + /** + * Q3: ValueRangeQuery SQL: select {sensors} from {devices} where time >= {startTime} and time <= + * {endTime} and {sensors} > {value} + * + * @param valueRangeQuery contains universal range query with value filter parameters + * @return + */ + @Override + public Status valueRangeQuery(ValueRangeQuery valueRangeQuery) { + String sql = getValueRangeQuerySql(valueRangeQuery); + return executeQueryAndGetStatus(sql, Operation.VALUE_RANGE_QUERY); + } + + /** + * Q4: AggRangeQuery SQL: select {AggFun}({sensors}) from {devices} where time >= {startTime} and + * time <= {endTime} + * + * @param aggRangeQuery contains universal aggregation query with time filter parameters + * @return + */ + @Override + public Status aggRangeQuery(AggRangeQuery aggRangeQuery) { + String aggQuerySqlHead = + getAggQuerySqlHead(aggRangeQuery.getDeviceSchema(), aggRangeQuery.getAggFun()); + String sql = + addWhereTimeClause( + aggQuerySqlHead, aggRangeQuery.getStartTimestamp(), aggRangeQuery.getEndTimestamp()); + return executeQueryAndGetStatus(sql, Operation.AGG_RANGE_QUERY); + } + + /** + * Q5: AggValueQuery SQL: select {AggFun}({sensors}) from {devices} where {sensors} > {value} + * + * @param aggValueQuery contains universal aggregation query with value filter parameters + * @return + */ + @Override + public Status aggValueQuery(AggValueQuery aggValueQuery) { + String aggQuerySqlHead = + getAggQuerySqlHead(aggValueQuery.getDeviceSchema(), aggValueQuery.getAggFun()); + String sql = + aggQuerySqlHead + + " WHERE " + + getValueFilterClause( + aggValueQuery.getDeviceSchema(), (int) aggValueQuery.getValueThreshold()) + .substring(4); + return executeQueryAndGetStatus(sql, Operation.AGG_VALUE_QUERY); + } + + /** + * Q6: AggRangeValueQuery SQL: select {AggFun}({sensors}) from {devices} where time >= {startTime} + * and time <= {endTime} and {sensors} > {value} + * + * @param aggRangeValueQuery contains universal aggregation query with time and value filters + * parameters + * @return + */ + @Override + public Status aggRangeValueQuery(AggRangeValueQuery aggRangeValueQuery) { + String aggQuerySqlHead = + getAggQuerySqlHead(aggRangeValueQuery.getDeviceSchema(), aggRangeValueQuery.getAggFun()); + String sql = + addWhereTimeClause( + aggQuerySqlHead, + aggRangeValueQuery.getStartTimestamp(), + aggRangeValueQuery.getEndTimestamp()); + sql += + getValueFilterClause( + aggRangeValueQuery.getDeviceSchema(), (int) aggRangeValueQuery.getValueThreshold()); + return executeQueryAndGetStatus(sql, Operation.AGG_RANGE_VALUE_QUERY); + } + + /** + * Q7: GroupByQuery SQL: select {AggFun}({sensors}) from {devices} group by ([{start}, {end}], + * {Granularity}ms) + * + * @param groupByQuery contains universal group by query condition parameters + * @return + */ + @Override + public Status groupByQuery(GroupByQuery groupByQuery) { + String aggQuerySqlHead = + getAggQuerySqlHead(groupByQuery.getDeviceSchema(), groupByQuery.getAggFun()); + String sql = + addGroupByClause( + aggQuerySqlHead, + groupByQuery.getStartTimestamp(), + groupByQuery.getEndTimestamp(), + groupByQuery.getGranularity()); + return executeQueryAndGetStatus(sql, Operation.GROUP_BY_QUERY); + } + + /** + * Q8: LatestPointQuery SQL: select last {sensors} from {devices} + * + * @param latestPointQuery contains universal latest point query condition parameters + * @return + */ + @Override + public Status latestPointQuery(LatestPointQuery latestPointQuery) { + String aggQuerySqlHead = getLatestPointQuerySql(latestPointQuery.getDeviceSchema()); + return executeQueryAndGetStatus(aggQuerySqlHead, Operation.LATEST_POINT_QUERY); + } + + /** + * Q9: RangeQuery SQL: select {sensors} from {devices} where time >= {startTime} and time <= + * {endTime} order by time desc + * + * @param rangeQuery universal range query condition parameters + * @return + */ + @Override + public Status rangeQueryOrderByDesc(RangeQuery rangeQuery) { + String sql = + getRangeQuerySql( + rangeQuery.getDeviceSchema(), + rangeQuery.getStartTimestamp(), + rangeQuery.getEndTimestamp()) + + " order by time desc"; + return executeQueryAndGetStatus(sql, Operation.RANGE_QUERY_ORDER_BY_TIME_DESC); + } + + /** + * Q10: ValueRangeQuery SQL: select {sensors} from {devices} where time >= {startTime} and time <= + * {endTime} and {sensors} > {value} order by time desc + * + * @param valueRangeQuery contains universal range query with value filter parameters + * @return + */ + @Override + public Status valueRangeQueryOrderByDesc(ValueRangeQuery valueRangeQuery) { + String sql = getValueRangeQuerySql(valueRangeQuery) + " order by time desc"; + return executeQueryAndGetStatus(sql, Operation.VALUE_RANGE_QUERY_ORDER_BY_TIME_DESC); + } + + /** + * Generate simple query header. + * + * @param devices schema list of query devices + * @return Simple Query header. e.g. Select sensors from devices + */ + protected String getSimpleQuerySqlHead(List devices) { + StringBuilder builder = new StringBuilder(); + builder.append("SELECT "); + List querySensors = devices.get(0).getSensors(); + builder.append(querySensors.get(0).getName()); + for (int i = 1; i < querySensors.size(); i++) { + builder.append(", ").append(querySensors.get(i).getName()); + } + return addFromClause(devices, builder); + } + + private String getAggQuerySqlHead(List devices, String aggFun) { + StringBuilder builder = new StringBuilder(); + builder.append("SELECT "); + List querySensors = devices.get(0).getSensors(); + builder.append(aggFun).append("(").append(querySensors.get(0).getName()).append(")"); + for (int i = 1; i < querySensors.size(); i++) { + builder + .append(", ") + .append(aggFun) + .append("(") + .append(querySensors.get(i).getName()) + .append(")"); + } + return addFromClause(devices, builder); + } + + /** + * Add from Clause + * + * @param devices + * @param builder + * @return From clause, e.g. FROM devices + */ + private String addFromClause(List devices, StringBuilder builder) { + builder.append(" FROM ").append(getDevicePath(devices.get(0))); + for (int i = 1; i < devices.size(); i++) { + builder.append(", ").append(getDevicePath(devices.get(i))); + } + return builder.toString(); + } + + private String getValueRangeQuerySql(ValueRangeQuery valueRangeQuery) { + String rangeQuerySql = + getRangeQuerySql( + valueRangeQuery.getDeviceSchema(), + valueRangeQuery.getStartTimestamp(), + valueRangeQuery.getEndTimestamp()); + String valueFilterClause = + getValueFilterClause( + valueRangeQuery.getDeviceSchema(), (int) valueRangeQuery.getValueThreshold()); + return rangeQuerySql + valueFilterClause; + } + + private String getValueFilterClause(List deviceSchemas, int valueThreshold) { + StringBuilder builder = new StringBuilder(); + for (DeviceSchema deviceSchema : deviceSchemas) { + for (Sensor sensor : deviceSchema.getSensors()) { + builder + .append(" AND ") + .append(getDevicePath(deviceSchema)) + .append(".") + .append(sensor.getName()) + .append(" > ") + .append(valueThreshold); + } + } + return builder.toString(); + } + + private String getLatestPointQuerySql(List devices) { + StringBuilder builder = new StringBuilder(); + builder.append("SELECT last "); + List querySensors = devices.get(0).getSensors(); + builder.append(querySensors.get(0).getName()); + for (int i = 1; i < querySensors.size(); i++) { + builder.append(", ").append(querySensors.get(i).getName()); + } + return addFromClause(devices, builder); + } + + private String getRangeQuerySql(List deviceSchemas, long start, long end) { + return addWhereTimeClause(getSimpleQuerySqlHead(deviceSchemas), start, end); + } + + private String addWhereTimeClause(String prefix, long start, long end) { + String startTime = start + ""; + String endTime = end + ""; + return prefix + " WHERE time >= " + startTime + " AND time <= " + endTime; + } + + private String addGroupByClause(String prefix, long start, long end, long granularity) { + return prefix + " group by ([" + start + "," + end + ")," + granularity + "ms) "; + } + + /** + * convert deviceSchema to the format + * + * @param deviceSchema + * @return format, e.g. root.group_1.d_1 + */ + protected String getDevicePath(DeviceSchema deviceSchema) { + StringBuilder name = new StringBuilder(ROOT_SERIES_NAME); + name.append(".").append(deviceSchema.getGroup()); + for (Map.Entry pair : deviceSchema.getTags().entrySet()) { + name.append(".").append(pair.getValue()); + } + name.append(".").append(deviceSchema.getDevice()); + return name.toString(); + } + + protected Status executeQueryAndGetStatus(String sql, Operation operation) { + String executeSQL; + if (config.isIOTDB_USE_DEBUG() && random.nextDouble() < config.getIOTDB_USE_DEBUG_RATIO()) { + executeSQL = "debug " + sql; + } else { + executeSQL = sql; + } + if (!config.isIS_QUIET_MODE()) { + LOGGER.info("{} query SQL: {}", Thread.currentThread().getName(), executeSQL); + } + AtomicInteger line = new AtomicInteger(); + AtomicLong queryResultPointNum = new AtomicLong(); + AtomicBoolean isOk = new AtomicBoolean(true); + try (Statement statement = ioTDBConnection.getConnection().createStatement()) { + List> records = new ArrayList<>(); + task = + service.submit( + () -> { + try { + try (ResultSet resultSet = statement.executeQuery(executeSQL)) { + while (resultSet.next()) { + line.getAndIncrement(); + if (config.isIS_COMPARISON()) { + List record = new ArrayList<>(); + for (int i = 1; i <= resultSet.getMetaData().getColumnCount(); i++) { + switch (operation) { + case LATEST_POINT_QUERY: + if (i == 2 || i >= 4) { + continue; + } + break; + default: + break; + } + record.add(resultSet.getObject(i)); + } + records.add(record); + } + } + } + } catch (SQLException e) { + LOGGER.error("exception occurred when execute query={}", executeSQL, e); + isOk.set(false); + } + long resultPointNum = line.get(); + if (!Operation.LATEST_POINT_QUERY.equals(operation)) { + resultPointNum *= config.getQUERY_SENSOR_NUM(); + resultPointNum *= config.getQUERY_DEVICE_NUM(); + } + queryResultPointNum.set(resultPointNum); + }); + try { + task.get(config.getREAD_OPERATION_TIMEOUT_MS(), TimeUnit.MILLISECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + task.cancel(true); + return new Status(false, queryResultPointNum.get(), e, executeSQL); + } + if (isOk.get() == true) { + if (config.isIS_COMPARISON()) { + return new Status(true, queryResultPointNum.get(), executeSQL, records); + } else { + return new Status(true, queryResultPointNum.get()); + } + } else { + return new Status( + false, queryResultPointNum.get(), new Exception("Failed to execute."), executeSQL); + } + } catch (Exception e) { + return new Status(false, queryResultPointNum.get(), e, executeSQL); + } catch (Throwable t) { + return new Status(false, queryResultPointNum.get(), new Exception(t), executeSQL); + } + } + + public String getInsertOneBatchSql( + DeviceSchema deviceSchema, long timestamp, List values) { + StringBuilder builder = new StringBuilder("insert into "); + builder.append(getDevicePath(deviceSchema)).append("(timestamp"); + for (Sensor sensor : deviceSchema.getSensors()) { + builder.append(",").append(sensor.getName()); + } + if (config.isVECTOR() == true) { + builder.append(") aligned values("); + } else { + builder.append(") values("); + } + builder.append(timestamp); + int sensorIndex = 0; + List sensors = deviceSchema.getSensors(); + for (Object value : values) { + switch (sensors.get(sensorIndex).getSensorType()) { + case BOOLEAN: + case INT32: + case INT64: + case FLOAT: + case DOUBLE: + builder.append(",").append(value); + break; + case TEXT: + builder.append(",").append("'").append(value).append("'"); + break; + } + sensorIndex++; + } + builder.append(")"); + LOGGER.debug("getInsertOneBatchSql: {}", builder); + return builder.toString(); + } + + /** + * Using in verification + * + * @param verificationQuery + */ + @Override + public Status verificationQuery(VerificationQuery verificationQuery) { + DeviceSchema deviceSchema = verificationQuery.getDeviceSchema(); + List deviceSchemas = new ArrayList<>(); + deviceSchemas.add(deviceSchema); + + List records = verificationQuery.getRecords(); + if (records == null || records.size() == 0) { + return new Status( + false, + new TsdbException("There are no records in verficationQuery."), + "There are no records in verficationQuery."); + } + + StringBuffer sql = new StringBuffer(); + sql.append(getSimpleQuerySqlHead(deviceSchemas)); + Map> recordMap = new HashMap<>(); + sql.append(" WHERE time = ").append(records.get(0).getTimestamp()); + recordMap.put(records.get(0).getTimestamp(), records.get(0).getRecordDataValue()); + for (int i = 1; i < records.size(); i++) { + Record record = records.get(i); + sql.append(" or time = ").append(record.getTimestamp()); + recordMap.put(record.getTimestamp(), record.getRecordDataValue()); + } + int point = 0; + int line = 0; + try (Statement statement = ioTDBConnection.getConnection().createStatement()) { + ResultSet resultSet = statement.executeQuery(sql.toString()); + while (resultSet.next()) { + long timeStamp = resultSet.getLong(1); + List values = recordMap.get(timeStamp); + for (int i = 0; i < values.size(); i++) { + String value = resultSet.getString(i + 2); + String target = String.valueOf(values.get(i)); + if (!value.equals(target)) { + LOGGER.error("Using SQL: " + sql + ",Expected:" + value + " but was: " + target); + } else { + point++; + } + } + line++; + } + } catch (Exception e) { + LOGGER.error("Query Error: " + sql); + return new Status(false, new TsdbException("Failed to query"), "Failed to query."); + } + if (recordMap.size() != line) { + LOGGER.error( + "Using SQL: " + sql + ",Expected line:" + recordMap.size() + " but was: " + line); + } + return new Status(true, point); + } + + @Override + public Status deviceQuery(DeviceQuery deviceQuery) throws SQLException, TsdbException { + DeviceSchema deviceSchema = deviceQuery.getDeviceSchema(); + String sql = + getDeviceQuerySql( + deviceSchema, deviceQuery.getStartTimestamp(), deviceQuery.getEndTimestamp()); + if (!config.isIS_QUIET_MODE()) { + LOGGER.info("IoTDB:" + sql); + } + List> result = new ArrayList<>(); + try (Statement statement = ioTDBConnection.getConnection().createStatement()) { + ResultSet resultSet = statement.executeQuery(sql); + int colNumber = resultSet.getMetaData().getColumnCount(); + while (resultSet.next()) { + List line = new ArrayList<>(); + for (int i = 1; i <= colNumber; i++) { + line.add(resultSet.getObject(i)); + } + result.add(line); + } + } catch (Exception e) { + LOGGER.error("Query Error: " + sql + " exception:" + e.getMessage()); + return new Status(false, new TsdbException("Failed to query"), "Failed to query."); + } + + return new Status(true, 0, sql.toString(), result); + } + + protected String getDeviceQuerySql( + DeviceSchema deviceSchema, long startTimeStamp, long endTimeStamp) { + StringBuffer sql = new StringBuffer(); + List deviceSchemas = new ArrayList<>(); + deviceSchemas.add(deviceSchema); + sql.append(getSimpleQuerySqlHead(deviceSchemas)); + sql.append(" where time >= ").append(startTimeStamp); + sql.append(" and time <").append(endTimeStamp); + sql.append(" order by time desc"); + return sql.toString(); + } + + @Override + public DeviceSummary deviceSummary(DeviceQuery deviceQuery) throws SQLException, TsdbException { + DeviceSchema deviceSchema = deviceQuery.getDeviceSchema(); + int totalLineNumber = 0; + long minTimeStamp = 0, maxTimeStamp = 0; + try (Statement statement = ioTDBConnection.getConnection().createStatement()) { + ResultSet resultSet = statement.executeQuery(getTotalLineNumberSql(deviceSchema)); + resultSet.next(); + totalLineNumber = Integer.parseInt(resultSet.getString(1)); + + resultSet = statement.executeQuery(getMaxTimeStampSql(deviceSchema)); + resultSet.next(); + maxTimeStamp = Long.parseLong(resultSet.getObject(1).toString()); + + resultSet = statement.executeQuery(getMinTimeStampSql(deviceSchema)); + resultSet.next(); + minTimeStamp = Long.parseLong(resultSet.getObject(1).toString()); + } + return new DeviceSummary(deviceSchema.getDevice(), totalLineNumber, minTimeStamp, maxTimeStamp); + } + + protected String getTotalLineNumberSql(DeviceSchema deviceSchema) { + return "select count(*) from " + getDevicePath(deviceSchema); + } + + protected String getMinTimeStampSql(DeviceSchema deviceSchema) { + return "select * from " + getDevicePath(deviceSchema) + " order by time limit 1"; + } + + protected String getMaxTimeStampSql(DeviceSchema deviceSchema) { + return "select * from " + getDevicePath(deviceSchema) + " order by time desc limit 1"; + } + + String getEncodingType(SensorType dataSensorType) { + switch (dataSensorType) { + case BOOLEAN: + return config.getENCODING_BOOLEAN(); + case INT32: + return config.getENCODING_INT32(); + case INT64: + return config.getENCODING_INT64(); + case FLOAT: + return config.getENCODING_FLOAT(); + case DOUBLE: + return config.getENCODING_DOUBLE(); + case TEXT: + return config.getENCODING_TEXT(); + default: + LOGGER.error("Unsupported data sensorType {}.", dataSensorType); + return null; + } + } + + /** + * convert deviceSchema and sensor to the format: root.group_1.d_1.s_1 + * + * @param deviceSchema + * @param sensor + * @return + */ + private String getSensorPath(DeviceSchema deviceSchema, String sensor) { + return getDevicePath(deviceSchema) + "." + sensor; + } +} diff --git a/iotdb-1.3/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb130/IoTDBClusterSession.java b/iotdb-1.3/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb130/IoTDBClusterSession.java new file mode 100644 index 000000000..8321460ad --- /dev/null +++ b/iotdb-1.3/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb130/IoTDBClusterSession.java @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package cn.edu.tsinghua.iot.benchmark.iotdb130; + +import org.apache.iotdb.isession.SessionDataSet; +import org.apache.iotdb.isession.pool.SessionDataSetWrapper; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.session.pool.SessionPool; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.read.common.RowRecord; +import org.apache.iotdb.tsfile.write.record.Tablet; + +import cn.edu.tsinghua.iot.benchmark.tsdb.DBConfig; +import cn.edu.tsinghua.iot.benchmark.tsdb.TsdbException; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Executors; + +public class IoTDBClusterSession extends IoTDBSessionBase { + private class BenchmarkSessionPool implements IBenchmarkSession { + private final SessionPool sessionPool; + + public BenchmarkSessionPool( + List hostUrls, + String user, + String password, + int maxSize, + boolean enableCompression, + boolean enableRedirection) { + this.sessionPool = + new SessionPool( + hostUrls, + dbConfig.getUSERNAME(), + dbConfig.getPASSWORD(), + MAX_SESSION_CONNECTION_PER_CLIENT, + config.isENABLE_THRIFT_COMPRESSION(), + true); + } + + @Override + public void open() {} + + @Override + public void open(boolean enableRPCCompression) {} + + @Override + public void insertRecord( + String deviceId, + long time, + List measurements, + List types, + List values) + throws IoTDBConnectionException, StatementExecutionException { + sessionPool.insertRecord(deviceId, time, measurements, types, values); + } + + @Override + public void insertAlignedRecord( + String multiSeriesId, + long time, + List multiMeasurementComponents, + List types, + List values) + throws IoTDBConnectionException, StatementExecutionException { + sessionPool.insertAlignedRecord( + multiSeriesId, time, multiMeasurementComponents, types, values); + } + + @Override + public void insertRecords( + List deviceIds, + List times, + List> measurementsList, + List> typesList, + List> valuesList) + throws IoTDBConnectionException, StatementExecutionException { + sessionPool.insertRecords(deviceIds, times, measurementsList, typesList, valuesList); + } + + @Override + public void insertAlignedRecords( + List multiSeriesIds, + List times, + List> multiMeasurementComponentsList, + List> typesList, + List> valuesList) + throws IoTDBConnectionException, StatementExecutionException { + sessionPool.insertAlignedRecords( + multiSeriesIds, times, multiMeasurementComponentsList, typesList, valuesList); + } + + @Override + public void insertTablet(Tablet tablet) + throws IoTDBConnectionException, StatementExecutionException { + sessionPool.insertTablet(tablet); + } + + @Override + public void insertAlignedTablet(Tablet tablet) + throws IoTDBConnectionException, StatementExecutionException { + sessionPool.insertAlignedTablet(tablet); + } + + @Override + public ISessionDataSet executeQueryStatement(String sql) + throws IoTDBConnectionException, StatementExecutionException { + return new SessionDataSet2(sessionPool.executeQueryStatement(sql)); + } + + @Override + public void close() { + sessionPool.close(); + } + + @Override + public void executeNonQueryStatement(String deleteSeriesSql) + throws IoTDBConnectionException, StatementExecutionException { + sessionPool.executeNonQueryStatement(deleteSeriesSql); + } + + private class SessionDataSet2 implements ISessionDataSet { + SessionDataSetWrapper sessionDataSet; + + public SessionDataSet2(SessionDataSetWrapper sessionDataSetWrapper) { + this.sessionDataSet = sessionDataSetWrapper; + } + + @Override + public RowRecord next() throws IoTDBConnectionException, StatementExecutionException { + return sessionDataSet.next(); + } + + @Override + public boolean hasNext() throws IoTDBConnectionException, StatementExecutionException { + return sessionDataSet.hasNext(); + } + + @Override + public void close() throws IoTDBConnectionException, StatementExecutionException { + sessionDataSet.close(); + } + + @Override + public SessionDataSet.DataIterator iterator() { + return sessionDataSet.iterator(); + } + } + } + + private static final int MAX_SESSION_CONNECTION_PER_CLIENT = 3; + + public IoTDBClusterSession(DBConfig dbConfig) { + super(dbConfig); + LOGGER = LoggerFactory.getLogger(IoTDBClusterSession.class); + List hostUrls = new ArrayList<>(dbConfig.getHOST().size()); + for (int i = 0; i < dbConfig.getHOST().size(); i++) { + hostUrls.add(dbConfig.getHOST().get(i) + ":" + dbConfig.getPORT().get(i)); + } + sessionWrapper = + new BenchmarkSessionPool( + hostUrls, + dbConfig.getUSERNAME(), + dbConfig.getPASSWORD(), + MAX_SESSION_CONNECTION_PER_CLIENT, + config.isENABLE_THRIFT_COMPRESSION(), + true); + } + + @Override + public void init() throws TsdbException { + // do nothing + this.service = Executors.newSingleThreadExecutor(); + } + + @Override + public void close() throws TsdbException { + if (sessionWrapper != null) { + try { + sessionWrapper.close(); + } catch (IoTDBConnectionException ignored) { + // should never happen + } + } + if (ioTDBConnection != null) { + ioTDBConnection.close(); + } + this.service.shutdown(); + } + + @Override + public void cleanup() { + try { + sessionWrapper.executeNonQueryStatement( + "drop database root." + config.getDbConfig().getDB_NAME() + ".**"); + } catch (IoTDBConnectionException e) { + LOGGER.error("Failed to connect to IoTDB:" + e.getMessage()); + } catch (StatementExecutionException e) { + LOGGER.error("Failed to execute statement:" + e.getMessage()); + } + + try { + sessionWrapper.executeNonQueryStatement("drop schema template " + config.getTEMPLATE_NAME()); + } catch (IoTDBConnectionException e) { + LOGGER.error("Failed to connect to IoTDB:" + e.getMessage()); + } catch (StatementExecutionException e) { + LOGGER.error("Failed to execute statement:" + e.getMessage()); + } + } +} diff --git a/iotdb-1.3/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb130/IoTDBSession.java b/iotdb-1.3/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb130/IoTDBSession.java new file mode 100644 index 000000000..6d6479bba --- /dev/null +++ b/iotdb-1.3/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb130/IoTDBSession.java @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package cn.edu.tsinghua.iot.benchmark.iotdb130; + +import org.apache.iotdb.isession.SessionDataSet; +import org.apache.iotdb.isession.util.Version; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.session.Session; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.read.common.RowRecord; +import org.apache.iotdb.tsfile.write.record.Tablet; + +import cn.edu.tsinghua.iot.benchmark.tsdb.DBConfig; +import cn.edu.tsinghua.iot.benchmark.tsdb.TsdbException; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Executors; + +public class IoTDBSession extends IoTDBSessionBase { + private class BenchmarkSession implements IBenchmarkSession { + private final Session session; + + public BenchmarkSession(Session session) { + this.session = session; + } + + @Override + public void open() throws IoTDBConnectionException { + session.open(); + } + + @Override + public void open(boolean enableRPCCompression) throws IoTDBConnectionException { + session.open(enableRPCCompression); + } + + @Override + public void insertRecord( + String deviceId, + long time, + List measurements, + List types, + List values) + throws IoTDBConnectionException, StatementExecutionException { + session.insertRecord(deviceId, time, measurements, types, values); + } + + @Override + public void insertAlignedRecord( + String multiSeriesId, + long time, + List multiMeasurementComponents, + List types, + List values) + throws IoTDBConnectionException, StatementExecutionException { + session.insertAlignedRecord(multiSeriesId, time, multiMeasurementComponents, types, values); + } + + @Override + public void insertRecords( + List deviceIds, + List times, + List> measurementsList, + List> typesList, + List> valuesList) + throws IoTDBConnectionException, StatementExecutionException { + session.insertRecords(deviceIds, times, measurementsList, typesList, valuesList); + } + + @Override + public void insertAlignedRecords( + List multiSeriesIds, + List times, + List> multiMeasurementComponentsList, + List> typesList, + List> valuesList) + throws IoTDBConnectionException, StatementExecutionException { + session.insertAlignedRecords( + multiSeriesIds, times, multiMeasurementComponentsList, typesList, valuesList); + } + + @Override + public void insertTablet(Tablet tablet) + throws IoTDBConnectionException, StatementExecutionException { + session.insertTablet(tablet); + } + + @Override + public void insertAlignedTablet(Tablet tablet) + throws IoTDBConnectionException, StatementExecutionException { + session.insertAlignedTablet(tablet); + } + + @Override + public ISessionDataSet executeQueryStatement(String sql) + throws IoTDBConnectionException, StatementExecutionException { + return new SessionDataSet1(session.executeQueryStatement(sql)); + } + + @Override + public void close() throws IoTDBConnectionException { + session.close(); + } + + @Override + public void executeNonQueryStatement(String deleteSeriesSql) + throws IoTDBConnectionException, StatementExecutionException { + session.executeNonQueryStatement(deleteSeriesSql); + } + + private class SessionDataSet1 implements ISessionDataSet { + SessionDataSet sessionDataSet; + + public SessionDataSet1(SessionDataSet sessionDataSet) { + this.sessionDataSet = sessionDataSet; + } + + @Override + public RowRecord next() throws IoTDBConnectionException, StatementExecutionException { + return sessionDataSet.next(); + } + + @Override + public boolean hasNext() throws IoTDBConnectionException, StatementExecutionException { + return sessionDataSet.hasNext(); + } + + @Override + public void close() throws IoTDBConnectionException, StatementExecutionException { + sessionDataSet.close(); + } + + @Override + public SessionDataSet.DataIterator iterator() { + return sessionDataSet.iterator(); + } + } + } + + public IoTDBSession(DBConfig dbConfig) { + super(dbConfig); + LOGGER = LoggerFactory.getLogger(IoTDBSession.class); + List hostUrls = new ArrayList<>(dbConfig.getHOST().size()); + for (int i = 0; i < dbConfig.getHOST().size(); i++) { + hostUrls.add(dbConfig.getHOST().get(i) + ":" + dbConfig.getPORT().get(i)); + } + sessionWrapper = + new BenchmarkSession( + new Session.Builder() + .nodeUrls(hostUrls) + .username(dbConfig.getUSERNAME()) + .password(dbConfig.getPASSWORD()) + .enableRedirection(true) + .version(Version.V_1_0) + .build()); + } + + @Override + public void init() { + try { + if (config.isENABLE_THRIFT_COMPRESSION()) { + sessionWrapper.open(true); + } else { + sessionWrapper.open(); + } + this.service = Executors.newSingleThreadExecutor(); + } catch (IoTDBConnectionException e) { + LOGGER.error("Failed to add session", e); + } + } + + @Override + public void cleanup() { + try { + sessionWrapper.executeNonQueryStatement( + "drop database root." + config.getDbConfig().getDB_NAME() + ".**"); + } catch (IoTDBConnectionException e) { + LOGGER.error("Failed to connect to IoTDB:" + e.getMessage()); + } catch (StatementExecutionException e) { + LOGGER.error("Failed to execute statement:" + e.getMessage()); + } + + try { + sessionWrapper.executeNonQueryStatement("drop schema template " + config.getTEMPLATE_NAME()); + } catch (IoTDBConnectionException e) { + LOGGER.error("Failed to connect to IoTDB:" + e.getMessage()); + } catch (StatementExecutionException e) { + LOGGER.error("Failed to execute statement:" + e.getMessage()); + } + } + + @Override + public void close() throws TsdbException { + try { + if (sessionWrapper != null) { + sessionWrapper.close(); + } + if (ioTDBConnection != null) { + ioTDBConnection.close(); + } + this.service.shutdown(); + } catch (IoTDBConnectionException ioTDBConnectionException) { + LOGGER.error("Failed to close session."); + throw new TsdbException(ioTDBConnectionException); + } + } +} diff --git a/iotdb-1.3/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb130/IoTDBSessionBase.java b/iotdb-1.3/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb130/IoTDBSessionBase.java new file mode 100644 index 000000000..68181a686 --- /dev/null +++ b/iotdb-1.3/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb130/IoTDBSessionBase.java @@ -0,0 +1,502 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package cn.edu.tsinghua.iot.benchmark.iotdb130; + +import org.apache.iotdb.isession.SessionDataSet; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; +import org.apache.iotdb.tsfile.read.common.Field; +import org.apache.iotdb.tsfile.read.common.RowRecord; +import org.apache.iotdb.tsfile.utils.Binary; +import org.apache.iotdb.tsfile.utils.BytesUtils; +import org.apache.iotdb.tsfile.write.record.Tablet; +import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; + +import cn.edu.tsinghua.iot.benchmark.client.operation.Operation; +import cn.edu.tsinghua.iot.benchmark.conf.Config; +import cn.edu.tsinghua.iot.benchmark.conf.ConfigDescriptor; +import cn.edu.tsinghua.iot.benchmark.entity.Batch.IBatch; +import cn.edu.tsinghua.iot.benchmark.entity.DeviceSummary; +import cn.edu.tsinghua.iot.benchmark.entity.Record; +import cn.edu.tsinghua.iot.benchmark.entity.Sensor; +import cn.edu.tsinghua.iot.benchmark.entity.enums.SensorType; +import cn.edu.tsinghua.iot.benchmark.exception.OperationFailException; +import cn.edu.tsinghua.iot.benchmark.measurement.Status; +import cn.edu.tsinghua.iot.benchmark.schema.schemaImpl.DeviceSchema; +import cn.edu.tsinghua.iot.benchmark.tsdb.DBConfig; +import cn.edu.tsinghua.iot.benchmark.tsdb.TsdbException; +import cn.edu.tsinghua.iot.benchmark.tsdb.enums.DBInsertMode; +import cn.edu.tsinghua.iot.benchmark.workload.query.impl.DeviceQuery; +import cn.edu.tsinghua.iot.benchmark.workload.query.impl.VerificationQuery; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; + +public class IoTDBSessionBase extends IoTDB { + static Logger LOGGER; + + static final Config config = ConfigDescriptor.getInstance().getConfig(); + IBenchmarkSession sessionWrapper; + + private static final Map binaryCache = + new ConcurrentHashMap<>(config.getWORKLOAD_BUFFER_SIZE()); + + public IoTDBSessionBase(DBConfig dbConfig) { + super(dbConfig); + } + + public Status insertOneBatchByTablet(IBatch batch) { + Tablet tablet = genTablet(batch); + task = + service.submit( + () -> { + try { + if (config.isVECTOR()) { + sessionWrapper.insertAlignedTablet(tablet); + } else { + sessionWrapper.insertTablet(tablet); + } + } catch (IoTDBConnectionException | StatementExecutionException e) { + throw new OperationFailException(e); + } + }); + return waitWriteTaskToFinishAndGetStatus(); + } + + public Status insertOneBatchByRecord(IBatch batch) { + String deviceId = getDevicePath(batch.getDeviceSchema()); + int failRecord = 0; + List sensors = + batch.getDeviceSchema().getSensors().stream() + .map(Sensor::getName) + .collect(Collectors.toList()); + for (Record record : batch.getRecords()) { + long timestamp = record.getTimestamp(); + List dataTypes = + constructDataTypes( + batch.getDeviceSchema().getSensors(), record.getRecordDataValue().size()); + try { + if (config.isVECTOR()) { + sessionWrapper.insertAlignedRecord( + deviceId, timestamp, sensors, dataTypes, record.getRecordDataValue()); + } else { + sessionWrapper.insertRecord( + deviceId, timestamp, sensors, dataTypes, record.getRecordDataValue()); + } + } catch (IoTDBConnectionException | StatementExecutionException e) { + failRecord++; + } + } + if (failRecord == 0) { + return new Status(true); + } else { + Exception e = new Exception("failRecord number is " + failRecord); + return new Status(false, 0, e, e.toString()); + } + } + + public Status insertOneBatchByRecords(IBatch batch) { + List deviceIds = new ArrayList<>(); + List times = new ArrayList<>(); + List> measurementsList = new ArrayList<>(); + List> typesList = new ArrayList<>(); + List> valuesList = new ArrayList<>(); + List sensors = + batch.getDeviceSchema().getSensors().stream() + .map(Sensor::getName) + .collect(Collectors.toList()); + while (true) { + String deviceId = getDevicePath(batch.getDeviceSchema()); + for (Record record : batch.getRecords()) { + deviceIds.add(deviceId); + times.add(record.getTimestamp()); + measurementsList.add(sensors); + valuesList.add(record.getRecordDataValue()); + typesList.add( + constructDataTypes( + batch.getDeviceSchema().getSensors(), record.getRecordDataValue().size())); + } + if (!batch.hasNext()) { + break; + } + batch.next(); + } + task = + service.submit( + () -> { + try { + if (config.isVECTOR()) { + sessionWrapper.insertAlignedRecords( + deviceIds, times, measurementsList, typesList, valuesList); + } else { + sessionWrapper.insertRecords( + deviceIds, times, measurementsList, typesList, valuesList); + } + } catch (IoTDBConnectionException | StatementExecutionException e) { + throw new OperationFailException(e); + } + }); + return waitWriteTaskToFinishAndGetStatus(); + } + + @Override + protected Status executeQueryAndGetStatus(String sql, Operation operation) { + String executeSQL; + if (config.isIOTDB_USE_DEBUG() && random.nextDouble() < config.getIOTDB_USE_DEBUG_RATIO()) { + executeSQL = "debug " + sql; + } else { + executeSQL = sql; + } + if (!config.isIS_QUIET_MODE()) { + LOGGER.info("{} query SQL: {}", Thread.currentThread().getName(), executeSQL); + } + AtomicInteger line = new AtomicInteger(); + AtomicLong queryResultPointNum = new AtomicLong(); + AtomicBoolean isOk = new AtomicBoolean(true); + try { + List> records = new ArrayList<>(); + task = + service.submit( + () -> { + try { + ISessionDataSet sessionDataSet = sessionWrapper.executeQueryStatement(executeSQL); + if (config.isIS_COMPARISON()) { + while (sessionDataSet.hasNext()) { + RowRecord rowRecord = sessionDataSet.next(); + line.getAndIncrement(); + List record = new ArrayList<>(); + switch (operation) { + case AGG_RANGE_QUERY: + case AGG_VALUE_QUERY: + case AGG_RANGE_VALUE_QUERY: + break; + default: + record.add(rowRecord.getTimestamp()); + break; + } + List fields = rowRecord.getFields(); + for (int i = 0; i < fields.size(); i++) { + switch (operation) { + case LATEST_POINT_QUERY: + if (i == 0 || i == 2) { + continue; + } + default: + break; + } + record.add(fields.get(i).toString()); + } + records.add(record); + } + } else { + SessionDataSet.DataIterator iterator = sessionDataSet.iterator(); + while (iterator.next()) { + line.getAndIncrement(); + } + } + + sessionDataSet.close(); + } catch (StatementExecutionException | IoTDBConnectionException e) { + LOGGER.error("exception occurred when execute query={}", executeSQL, e); + isOk.set(false); + } + long resultPointNum = line.get(); + if (!Operation.LATEST_POINT_QUERY.equals(operation)) { + resultPointNum *= config.getQUERY_SENSOR_NUM(); + resultPointNum *= config.getQUERY_DEVICE_NUM(); + } + queryResultPointNum.set(resultPointNum); + }); + try { + task.get(config.getREAD_OPERATION_TIMEOUT_MS(), TimeUnit.MILLISECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + task.cancel(true); + return new Status(false, queryResultPointNum.get(), e, executeSQL); + } + if (isOk.get()) { + if (config.isIS_COMPARISON()) { + return new Status(true, queryResultPointNum.get(), executeSQL, records); + } else { + return new Status(true, queryResultPointNum.get()); + } + } else { + return new Status( + false, queryResultPointNum.get(), new Exception("Failed to execute."), executeSQL); + } + } catch (Exception e) { + return new Status(false, queryResultPointNum.get(), e, executeSQL); + } catch (Throwable t) { + return new Status(false, queryResultPointNum.get(), new Exception(t), executeSQL); + } + } + + /** + * Using in verification + * + * @param verificationQuery the query of verification + */ + @Override + public Status verificationQuery(VerificationQuery verificationQuery) { + DeviceSchema deviceSchema = verificationQuery.getDeviceSchema(); + List deviceSchemas = new ArrayList<>(); + deviceSchemas.add(deviceSchema); + + List records = verificationQuery.getRecords(); + if (records == null || records.size() == 0) { + return new Status( + false, + new TsdbException("There are no records in verificationQuery."), + "There are no records in verificationQuery."); + } + + StringBuffer sql = new StringBuffer(); + sql.append(getSimpleQuerySqlHead(deviceSchemas)); + Map> recordMap = new HashMap<>(); + sql.append(" WHERE time = ").append(records.get(0).getTimestamp()); + recordMap.put(records.get(0).getTimestamp(), records.get(0).getRecordDataValue()); + for (int i = 1; i < records.size(); i++) { + Record record = records.get(i); + sql.append(" or time = ").append(record.getTimestamp()); + recordMap.put(record.getTimestamp(), record.getRecordDataValue()); + } + int point = 0; + int line = 0; + try { + ISessionDataSet sessionDataSet = sessionWrapper.executeQueryStatement(sql.toString()); + while (sessionDataSet.hasNext()) { + RowRecord rowRecord = sessionDataSet.next(); + long timeStamp = rowRecord.getTimestamp(); + List values = recordMap.get(timeStamp); + for (int i = 0; i < values.size(); i++) { + String value = rowRecord.getFields().get(i).toString(); + String target = String.valueOf(values.get(i)); + if (!value.equals(target)) { + LOGGER.error("Using SQL: " + sql + ",Expected:" + value + " but was: " + target); + } else { + point++; + } + } + line++; + } + sessionDataSet.close(); + } catch (Exception e) { + LOGGER.error("Query Error: " + sql); + return new Status(false, new TsdbException("Failed to query"), "Failed to query."); + } + if (recordMap.size() != line) { + LOGGER.error( + "Using SQL: " + sql + ",Expected line:" + recordMap.size() + " but was: " + line); + } + return new Status(true, point); + } + + @Override + public Status deviceQuery(DeviceQuery deviceQuery) { + DeviceSchema deviceSchema = deviceQuery.getDeviceSchema(); + String sql = + getDeviceQuerySql( + deviceSchema, deviceQuery.getStartTimestamp(), deviceQuery.getEndTimestamp()); + if (!config.isIS_QUIET_MODE()) { + LOGGER.info("IoTDB:" + sql); + } + List> result = new ArrayList<>(); + try { + ISessionDataSet sessionDataSet = sessionWrapper.executeQueryStatement(sql); + while (sessionDataSet.hasNext()) { + List line = new ArrayList<>(); + RowRecord rowRecord = sessionDataSet.next(); + line.add(rowRecord.getTimestamp()); + List fields = rowRecord.getFields(); + for (Field field : fields) { + line.add(field.getStringValue()); + } + result.add(line); + } + sessionDataSet.close(); + } catch (Exception e) { + LOGGER.error("Query Error: " + sql + " exception:" + e.getMessage()); + return new Status(false, new TsdbException("Failed to query"), "Failed to query."); + } + + return new Status(true, 0, sql, result); + } + + @Override + public DeviceSummary deviceSummary(DeviceQuery deviceQuery) throws TsdbException { + DeviceSchema deviceSchema = deviceQuery.getDeviceSchema(); + int totalLineNumber; + long minTimeStamp, maxTimeStamp; + try { + ISessionDataSet sessionDataSet = + sessionWrapper.executeQueryStatement(getTotalLineNumberSql(deviceSchema)); + RowRecord rowRecord = sessionDataSet.next(); + totalLineNumber = Integer.parseInt(rowRecord.getFields().get(0).toString()); + sessionDataSet.close(); + + sessionDataSet = sessionWrapper.executeQueryStatement(getMaxTimeStampSql(deviceSchema)); + rowRecord = sessionDataSet.next(); + maxTimeStamp = rowRecord.getTimestamp(); + sessionDataSet.close(); + + sessionDataSet = sessionWrapper.executeQueryStatement(getMinTimeStampSql(deviceSchema)); + rowRecord = sessionDataSet.next(); + minTimeStamp = rowRecord.getTimestamp(); + sessionDataSet.close(); + } catch (IoTDBConnectionException e) { + throw new TsdbException("Failed to connect to IoTDB:" + e.getMessage()); + } catch (StatementExecutionException e) { + throw new TsdbException("Failed to execute statement:" + e.getMessage()); + } + return new DeviceSummary(deviceSchema.getDevice(), totalLineNumber, minTimeStamp, maxTimeStamp); + } + + protected Tablet genTablet(IBatch batch) { + config.getWORKLOAD_BUFFER_SIZE(); + List schemaList = new ArrayList<>(); + int sensorIndex = 0; + for (Sensor sensor : batch.getDeviceSchema().getSensors()) { + SensorType dataSensorType = sensor.getSensorType(); + schemaList.add( + new MeasurementSchema( + sensor.getName(), + Enum.valueOf(TSDataType.class, dataSensorType.name), + Enum.valueOf(TSEncoding.class, getEncodingType(dataSensorType)))); + sensorIndex++; + } + String deviceId = getDevicePath(batch.getDeviceSchema()); + Tablet tablet = new Tablet(deviceId, schemaList, batch.getRecords().size()); + long[] timestamps = tablet.timestamps; + Object[] values = tablet.values; + + List sensors = batch.getDeviceSchema().getSensors(); + for (int recordIndex = 0; recordIndex < batch.getRecords().size(); recordIndex++) { + tablet.rowSize++; + Record record = batch.getRecords().get(recordIndex); + sensorIndex = 0; + long currentTime = record.getTimestamp(); + timestamps[recordIndex] = currentTime; + for (int recordValueIndex = 0; + recordValueIndex < record.getRecordDataValue().size(); + recordValueIndex++) { + switch (sensors.get(sensorIndex).getSensorType()) { + case BOOLEAN: + boolean[] sensorsBool = (boolean[]) values[recordValueIndex]; + sensorsBool[recordIndex] = + (boolean) (record.getRecordDataValue().get(recordValueIndex)); + break; + case INT32: + int[] sensorsInt = (int[]) values[recordValueIndex]; + sensorsInt[recordIndex] = (int) (record.getRecordDataValue().get(recordValueIndex)); + break; + case INT64: + long[] sensorsLong = (long[]) values[recordValueIndex]; + sensorsLong[recordIndex] = (long) (record.getRecordDataValue().get(recordValueIndex)); + break; + case FLOAT: + float[] sensorsFloat = (float[]) values[recordValueIndex]; + sensorsFloat[recordIndex] = (float) (record.getRecordDataValue().get(recordValueIndex)); + break; + case DOUBLE: + double[] sensorsDouble = (double[]) values[recordValueIndex]; + sensorsDouble[recordIndex] = + (double) (record.getRecordDataValue().get(recordValueIndex)); + break; + case TEXT: + Binary[] sensorsText = (Binary[]) values[recordValueIndex]; + sensorsText[recordIndex] = + binaryCache.computeIfAbsent( + (String) record.getRecordDataValue().get(recordValueIndex), + BytesUtils::valueOf); + break; + default: + LOGGER.error("Unsupported Type: {}", sensors.get(sensorIndex).getSensorType()); + } + sensorIndex++; + } + } + return tablet; + } + + public List constructDataTypes(List sensors, int recordValueSize) { + List dataTypes = new ArrayList<>(); + for (int sensorIndex = 0; sensorIndex < recordValueSize; sensorIndex++) { + switch (sensors.get(sensorIndex).getSensorType()) { + case BOOLEAN: + dataTypes.add(TSDataType.BOOLEAN); + break; + case INT32: + dataTypes.add(TSDataType.INT32); + break; + case INT64: + dataTypes.add(TSDataType.INT64); + break; + case FLOAT: + dataTypes.add(TSDataType.FLOAT); + break; + case DOUBLE: + dataTypes.add(TSDataType.DOUBLE); + break; + case TEXT: + dataTypes.add(TSDataType.TEXT); + break; + } + } + return dataTypes; + } + + @Override + public Status insertOneBatch(IBatch batch) { + DBInsertMode insertMode = dbConfig.getDB_SWITCH().getInsertMode(); + switch (insertMode) { + case INSERT_USE_SESSION_TABLET: + return insertOneBatchByTablet(batch); + case INSERT_USE_SESSION_RECORD: + return insertOneBatchByRecord(batch); + case INSERT_USE_SESSION_RECORDS: + return insertOneBatchByRecords(batch); + default: + throw new IllegalStateException("Unexpected INSERT_MODE value: " + insertMode); + } + } + + Status waitWriteTaskToFinishAndGetStatus() { + try { + task.get(config.getWRITE_OPERATION_TIMEOUT_MS(), TimeUnit.MILLISECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + task.cancel(true); + LOGGER.error("insertion failed", e); + return new Status(false, 0, e, e.toString()); + } + return new Status(true); + } +} diff --git a/iotdb-1.3/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb130/SingleNodeJDBCConnection.java b/iotdb-1.3/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb130/SingleNodeJDBCConnection.java new file mode 100644 index 000000000..697d1af4b --- /dev/null +++ b/iotdb-1.3/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb130/SingleNodeJDBCConnection.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package cn.edu.tsinghua.iot.benchmark.iotdb130; + +import cn.edu.tsinghua.iot.benchmark.conf.Config; +import cn.edu.tsinghua.iot.benchmark.conf.ConfigDescriptor; +import cn.edu.tsinghua.iot.benchmark.tsdb.DBConfig; +import cn.edu.tsinghua.iot.benchmark.tsdb.TsdbException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +public class SingleNodeJDBCConnection { + + private static final Logger LOGGER = LoggerFactory.getLogger(SingleNodeJDBCConnection.class); + private static Config config = ConfigDescriptor.getInstance().getConfig(); + + protected static final String JDBC_URL = "jdbc:iotdb://%s:%s/"; + protected DBConfig dbConfig; + + private Connection[] connections; + private AtomicInteger currConnectionIndex = new AtomicInteger(0); + + public SingleNodeJDBCConnection(DBConfig dbConfig) { + this.dbConfig = dbConfig; + } + + public void init() throws TsdbException { + int nodeSize = 1; + String[] urls; + if (config.isIS_ALL_NODES_VISIBLE()) { + nodeSize = dbConfig.getHOST().size(); + urls = new String[nodeSize]; + List clusterHosts = dbConfig.getHOST(); + for (int i = 0; i < nodeSize; i++) { + String jdbcUrl = + String.format(JDBC_URL, dbConfig.getHOST().get(i), dbConfig.getPORT().get(i)); + urls[i] = jdbcUrl; + } + } else { + urls = new String[nodeSize]; + urls[0] = String.format(JDBC_URL, dbConfig.getHOST().get(0), dbConfig.getPORT().get(0)); + } + connections = new Connection[nodeSize]; + + for (int i = 0; i < connections.length; i++) { + try { + Class.forName("org.apache.iotdb.jdbc.IoTDBDriver"); + org.apache.iotdb.jdbc.Config.rpcThriftCompressionEnable = + config.isENABLE_THRIFT_COMPRESSION(); + connections[i] = + DriverManager.getConnection(urls[i], dbConfig.getUSERNAME(), dbConfig.getPASSWORD()); + } catch (Exception e) { + LOGGER.error("Initialize IoTDB failed because ", e); + throw new TsdbException(e); + } + } + } + + public void close() throws TsdbException { + for (Connection connection : connections) { + if (connection != null) { + try { + connection.close(); + } catch (SQLException e) { + LOGGER.error("Failed to close IoTDB connection because ", e); + throw new TsdbException(e); + } + } + } + } + + public Connection getConnection() { + return connections[currConnectionIndex.incrementAndGet() % connections.length]; + } +} diff --git a/iotdb-1.3/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb130/TimeseriesSchema.java b/iotdb-1.3/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb130/TimeseriesSchema.java new file mode 100644 index 000000000..7d11f9e15 --- /dev/null +++ b/iotdb-1.3/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb130/TimeseriesSchema.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package cn.edu.tsinghua.iot.benchmark.iotdb130; + +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; + +import cn.edu.tsinghua.iot.benchmark.schema.schemaImpl.DeviceSchema; + +import java.util.List; + +public class TimeseriesSchema { + private DeviceSchema deviceSchema; + private List paths; + private List tsDataTypes; + private List tsEncodings; + private List compressionTypes; + private String deviceId; + + public TimeseriesSchema( + DeviceSchema deviceSchema, + List paths, + List tsDataTypes, + List tsEncodings, + List compressionTypes) { + this.deviceSchema = deviceSchema; + this.paths = paths; + this.tsDataTypes = tsDataTypes; + this.tsEncodings = tsEncodings; + this.compressionTypes = compressionTypes; + } + + public void setDeviceId(String deviceId) { + this.deviceId = deviceId; + } + + public DeviceSchema getDeviceSchema() { + return deviceSchema; + } + + public List getPaths() { + return paths; + } + + public List getTsDataTypes() { + return tsDataTypes; + } + + public List getTsEncodings() { + return tsEncodings; + } + + public List getCompressionTypes() { + return compressionTypes; + } + + public String getDeviceId() { + return deviceId; + } +} diff --git a/iotdb-1.3/src/main/test/cn/edu/tsinghua/iot/benchmark/IoTDB130TestEntrance.java b/iotdb-1.3/src/main/test/cn/edu/tsinghua/iot/benchmark/IoTDB130TestEntrance.java new file mode 100644 index 000000000..9afdbe425 --- /dev/null +++ b/iotdb-1.3/src/main/test/cn/edu/tsinghua/iot/benchmark/IoTDB130TestEntrance.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package cn.edu.tsinghua.iot.benchmark; + +import java.sql.SQLException; + +public class IoTDB110TestEntrance { + public static void main(String[] args) throws SQLException { + App.main(args); + } +} diff --git a/pom.xml b/pom.xml index c4f492d28..08b1a6670 100644 --- a/pom.xml +++ b/pom.xml @@ -22,6 +22,7 @@ timescaledb timescaledb-cluster victoriametrics + iotdb-1.3 iotdb-1.1 iotdb-1.0 iotdb-0.13