From f007e8a1407afffbcaffeca5a168164b4646297e Mon Sep 17 00:00:00 2001 From: Zhenyu Luo Date: Tue, 7 Jan 2025 22:47:41 +0800 Subject: [PATCH] Pipe: Support 'format'='tsfile' option in table model data sync sinks (#14108) Co-authored-by: Steve Yurong Su --- .../it/tablemodel/IoTDBPipeDataSinkIT.java | 429 +++++++++++++- .../it/tablemodel/IoTDBPipeNullValueIT.java | 4 +- .../pipe/it/tablemodel/TableModelUtils.java | 546 ++++++++++++++---- .../batch/PipeTabletEventTsFileBatch.java | 385 +++--------- .../request/PipeTransferTabletRawReq.java | 4 +- .../request/PipeTransferTabletRawReqV2.java | 9 +- .../protocol/opcua/OpcUaNameSpace.java | 8 +- .../async/IoTDBDataRegionAsyncConnector.java | 10 +- .../PipeTransferTabletBatchEventHandler.java | 2 +- .../sync/IoTDBDataRegionSyncConnector.java | 10 +- .../builder/PipeTableModeTsFileBuilder.java | 273 +++++++++ .../builder/PipeTreeModelTsFileBuilder.java | 268 +++++++++ .../util/builder/PipeTsFileBuilder.java | 162 ++++++ .../util/{ => cacher}/LeaderCacheUtils.java | 2 +- .../PipeTableModelTabletEventSorter.java | 271 +++++++++ .../{ => sorter}/PipeTabletEventSorter.java | 107 +--- .../PipeTreeModelTabletEventSorter.java | 121 ++++ .../SubscriptionPipeTsFileEventBatch.java | 11 +- .../connector/PipeTabletEventSorterTest.java | 210 ++++++- 19 files changed, 2289 insertions(+), 543 deletions(-) create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModeTsFileBuilder.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTreeModelTsFileBuilder.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTsFileBuilder.java rename iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/{ => cacher}/LeaderCacheUtils.java (97%) create mode 100644 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/sorter/PipeTableModelTabletEventSorter.java rename iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/{ => sorter}/PipeTabletEventSorter.java (56%) create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/sorter/PipeTreeModelTabletEventSorter.java diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeDataSinkIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeDataSinkIT.java index cd144bd4b86c..bd0a94419c14 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeDataSinkIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeDataSinkIT.java @@ -28,22 +28,28 @@ import org.apache.iotdb.itbase.category.MultiClusterIT2TableModel; import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.tsfile.write.record.Tablet; import org.junit.Assert; -import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.function.BiConsumer; import java.util.function.Consumer; @RunWith(IoTDBTestRunner.class) @Category({MultiClusterIT2TableModel.class}) public class IoTDBPipeDataSinkIT extends AbstractPipeTableModelTestIT { + @Test public void testThriftConnectorWithRealtimeFirstDisabled() throws Exception { final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); @@ -117,8 +123,6 @@ public void testSinkTabletFormat() throws Exception { testSinkFormat("tablet"); } - // table model not support - @Ignore @Test public void testSinkTsFileFormat() throws Exception { testSinkFormat("tsfile"); @@ 
-179,7 +183,7 @@ private void testSinkFormat(final String format) throws Exception { Assert.assertEquals( TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("testPipe").getCode()); - TableModelUtils.insertData("test", "test", 100, 150, senderEnv, true); + TableModelUtils.insertData("test", "test", 50, 150, senderEnv, true); if (!TestUtils.tryExecuteNonQueriesWithRetry( senderEnv, @@ -187,6 +191,8 @@ private void testSinkFormat(final String format) throws Exception { return; } + TableModelUtils.assertCountData("test", "test", 150, receiverEnv); + TestUtils.assertDataEventuallyOnEnv( receiverEnv, "select * from root.**", @@ -218,6 +224,9 @@ private void testSinkFormat(final String format) throws Exception { } TableModelUtils.insertData("test", "test", 150, 200, senderEnv, true); + TableModelUtils.insertTablet("test", "test", 200, 250, senderEnv, true); + TableModelUtils.insertTablet("test", "test", 250, 300, senderEnv, true); + TableModelUtils.insertTablet("test", "test", 300, 350, senderEnv, true); TestUtils.assertDataEventuallyOnEnv( receiverEnv, @@ -227,7 +236,7 @@ private void testSinkFormat(final String format) throws Exception { new HashSet<>(Arrays.asList("0,1.0,", "1,1.0,", "2,1.0,", "3,1.0,", "4,1.0,"))), handleFailure); - TableModelUtils.assertCountData("test", "test", 150, receiverEnv, handleFailure); + TableModelUtils.assertCountData("test", "test", 350, receiverEnv); } } @@ -294,4 +303,414 @@ public void testWriteBackSink() throws Exception { TableModelUtils.assertCountData("test1", "test", 100, receiverEnv, handleFailure); } } + + @Test + public void testSinkTsFileFormat2() throws Exception { + doTest(this::insertTablet1); + } + + @Test + public void testSinkTsFileFormat3() throws Exception { + doTest(this::insertTablet2); + } + + @Test + public void testSinkTsFileFormat4() throws Exception { + doTest(this::insertTablet3); + } + + @Test + public void testSinkTsFileFormat5() throws Exception { + doTest(this::insertTablet4); + } + + @Test + 
public void testSinkTsFileFormat6() throws Exception { + doTest(this::insertTablet5); + } + + @Test + public void testSinkTsFileFormat7() throws Exception { + doTest(this::insertTablet6); + } + + @Test + public void testSinkTsFileFormat8() throws Exception { + doTest(this::insertTablet7); + } + + @Test + public void testSinkTsFileFormat9() throws Exception { + doTest(this::insertTablet8); + } + + @Test + public void testSinkTsFileFormat10() throws Exception { + doTest(this::insertTablet9); + } + + private void doTest(BiConsumer>, Map>> consumer) + throws Exception { + final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); + + final String receiverIp = receiverDataNode.getIp(); + final int receiverPort = receiverDataNode.getPort(); + final Consumer handleFailure = + o -> { + TestUtils.executeNonQueryWithRetry(senderEnv, "flush"); + TestUtils.executeNonQueryWithRetry(receiverEnv, "flush"); + }; + + Map> testResult = new HashMap<>(); + Map> test1Result = new HashMap<>(); + + try (final SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { + + for (int i = 0; i < 5; i++) { + TableModelUtils.createDataBaseAndTable(senderEnv, "test" + i, "test0"); + TableModelUtils.createDataBaseAndTable(senderEnv, "test" + i, "test1"); + } + + if (!TestUtils.tryExecuteNonQueriesWithRetry( + senderEnv, + Arrays.asList("insert into root.vehicle.d0(time, s1) values (1, 1)", "flush"))) { + return; + } + + final Map extractorAttributes = new HashMap<>(); + final Map processorAttributes = new HashMap<>(); + final Map connectorAttributes = new HashMap<>(); + + extractorAttributes.put("extractor.realtime.mode", "forced-log"); + extractorAttributes.put("capture.table", "true"); + extractorAttributes.put("capture.tree", "true"); + extractorAttributes.put("extractor.database-name", "test.*"); + extractorAttributes.put("extractor.table-name", "test.*"); + + connectorAttributes.put("connector", 
"iotdb-thrift-connector"); + connectorAttributes.put("connector.ip", receiverIp); + connectorAttributes.put("connector.port", Integer.toString(receiverPort)); + connectorAttributes.put("connector.format", "tsfile"); + connectorAttributes.put("connector.realtime-first", "true"); + + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), + client + .createPipe( + new TCreatePipeReq("testPipe", connectorAttributes) + .setExtractorAttributes(extractorAttributes) + .setProcessorAttributes(processorAttributes)) + .getCode()); + + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("testPipe").getCode()); + + if (!TestUtils.tryExecuteNonQueriesWithRetry( + senderEnv, + Arrays.asList("insert into root.vehicle.d0(time, s1) values (2, 1)", "flush"))) { + return; + } + + consumer.accept(testResult, test1Result); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "select * from root.**", + "Time,root.vehicle.d0.s1,", + Collections.unmodifiableSet(new HashSet<>(Arrays.asList("1,1.0,", "2,1.0,"))), + handleFailure); + + if (!TestUtils.tryExecuteNonQueriesWithRetry( + senderEnv, + Arrays.asList( + "insert into root.vehicle.d0(time, s1) values (4, 1)", + "insert into root.vehicle.d0(time, s1) values (3, 1), (0, 1)", + "flush"))) { + return; + } + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "select * from root.**", + "Time,root.vehicle.d0.s1,", + Collections.unmodifiableSet( + new HashSet<>(Arrays.asList("0,1.0,", "1,1.0,", "2,1.0,", "3,1.0,", "4,1.0,"))), + handleFailure); + } + + for (Map.Entry> entry : testResult.entrySet()) { + final Set set = new HashSet<>(); + entry + .getValue() + .forEach( + tablet -> { + set.addAll(TableModelUtils.generateExpectedResults(tablet)); + }); + TableModelUtils.assertCountData( + "test0", entry.getKey(), set.size(), receiverEnv, handleFailure); + TableModelUtils.assertData("test0", entry.getKey(), set, receiverEnv, handleFailure); + } + + for (Map.Entry> entry : 
test1Result.entrySet()) { + final Set set = new HashSet<>(); + entry + .getValue() + .forEach( + tablet -> { + set.addAll(TableModelUtils.generateExpectedResults(tablet)); + }); + TableModelUtils.assertCountData( + "test1", entry.getKey(), set.size(), receiverEnv, handleFailure); + TableModelUtils.assertData("test1", entry.getKey(), set, receiverEnv, handleFailure); + } + } + + private void insertTablet1( + final Map> testResult, final Map> test1Result) { + + int deviceIDStartIndex = 0; + int deviceIDEndIndex = 10; + + for (int j = 0; j < 25; j++) { + final String dataBaseName = "test" + j % 2; + for (int i = 0; i < 5; i++) { + final String tableName = "test" + i; + Tablet tablet = + TableModelUtils.generateTablet( + tableName, deviceIDStartIndex, deviceIDEndIndex, 0, 10, false, false); + TableModelUtils.insertTablet(dataBaseName, tablet, senderEnv); + try { + Thread.sleep(100); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + Map> map = j % 2 == 0 ? testResult : test1Result; + map.computeIfAbsent(tableName, k -> new ArrayList<>()).add(tablet); + } + deviceIDStartIndex += 2; + deviceIDEndIndex += 2; + } + } + + private void insertTablet2( + final Map> testResult, final Map> test1Result) { + int deviceIDStartIndex = 0; + int deviceIDEndIndex = 10; + + for (int j = 0; j < 25; j++) { + final String dataBaseName = "test" + j % 2; + for (int i = 0; i < 5; i++) { + final String tableName = "test" + i; + Tablet tablet = + TableModelUtils.generateTablet( + tableName, deviceIDStartIndex, deviceIDEndIndex, 10, false, false); + TableModelUtils.insertTablet(dataBaseName, tablet, senderEnv); + try { + Thread.sleep(100); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + Map> map = j % 2 == 0 ? 
testResult : test1Result; + map.computeIfAbsent(tableName, k -> new ArrayList<>()).add(tablet); + } + deviceIDStartIndex += 2; + deviceIDEndIndex += 2; + } + } + + private void insertTablet3( + final Map> testResult, final Map> test1Result) { + final Random random = new Random(); + int deviceIDStartIndex = 0; + int deviceIDEndIndex = 100; + + for (int j = 0; j < 25; j++) { + for (int i = 0; i < 5; i++) { + final String tableName = "test" + i; + final String dataBaseName = "test" + j % 2; + deviceIDStartIndex = random.nextInt(1 << 16) - 10; + deviceIDEndIndex = deviceIDStartIndex + 10; + Tablet tablet = + TableModelUtils.generateTablet( + tableName, + deviceIDStartIndex, + deviceIDEndIndex, + deviceIDStartIndex, + deviceIDEndIndex, + false, + true); + TableModelUtils.insertTablet(dataBaseName, tablet, senderEnv); + try { + Thread.sleep(100); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + Map> map = j % 2 == 0 ? testResult : test1Result; + map.computeIfAbsent(tableName, k -> new ArrayList<>()).add(tablet); + } + } + } + + private void insertTablet4( + final Map> testResult, final Map> test1Result) { + final Random random = new Random(); + int deviceIDStartIndex = 0; + int deviceIDEndIndex = 100; + + for (int j = 0; j < 25; j++) { + final String dataBaseName = "test" + j % 2; + for (int i = 0; i < 5; i++) { + final String tableName = "test" + i; + deviceIDStartIndex = random.nextInt(1 << 16) - 10; + deviceIDEndIndex = deviceIDStartIndex + 10; + Tablet tablet = + TableModelUtils.generateTablet( + tableName, + deviceIDStartIndex, + deviceIDEndIndex, + deviceIDStartIndex, + deviceIDEndIndex, + false, + false); + TableModelUtils.insertTablet(dataBaseName, tablet, senderEnv); + try { + Thread.sleep(100); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + Map> map = j % 2 == 0 ? 
testResult : test1Result; + map.computeIfAbsent(tableName, k -> new ArrayList<>()).add(tablet); + } + } + } + + private void insertTablet5( + final Map> testResult, final Map> test1Result) { + final Random random = new Random(); + int deviceIDStartIndex = 0; + int deviceIDEndIndex = 100; + for (int j = 0; j < 25; j++) { + for (int i = 0; i < 5; i++) { + final String tableName = "test" + i; + final String dataBaseName = "test" + j % 2; + deviceIDStartIndex = random.nextInt(1 << 16) - 10; + deviceIDEndIndex = deviceIDStartIndex + 10; + Tablet tablet = + TableModelUtils.generateTablet( + tableName, 0, 10, deviceIDStartIndex, deviceIDEndIndex, false, true); + TableModelUtils.insertTablet(dataBaseName, tablet, senderEnv); + try { + Thread.sleep(100); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + Map> map = j % 2 == 0 ? testResult : test1Result; + map.computeIfAbsent(tableName, k -> new ArrayList<>()).add(tablet); + } + } + } + + private void insertTablet6( + final Map> testResult, final Map> test1Result) { + final Random random = new Random(); + int deviceIDStartIndex = 0; + int deviceIDEndIndex = 100; + + for (int j = 0; j < 25; j++) { + final String dataBaseName = "test" + j % 2; + for (int i = 0; i < 5; i++) { + final String tableName = "test" + i; + deviceIDStartIndex = random.nextInt(1 << 16) - 10; + deviceIDEndIndex = deviceIDStartIndex + 10; + Tablet tablet = + TableModelUtils.generateTablet( + tableName, deviceIDStartIndex, deviceIDEndIndex, 100, 110, false, true); + TableModelUtils.insertTablet(dataBaseName, tablet, senderEnv); + try { + Thread.sleep(100); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + Map> map = j % 2 == 0 ? 
testResult : test1Result; + map.computeIfAbsent(tableName, k -> new ArrayList<>()).add(tablet); + } + } + } + + private void insertTablet7( + final Map> testResult, final Map> test1Result) { + + final Random random = new Random(); + int deviceIDStartIndex = 0; + int deviceIDEndIndex = 100; + deviceIDStartIndex = random.nextInt(1 << 16) - 10; + deviceIDEndIndex = deviceIDStartIndex + 10; + for (int j = 0; j < 25; j++) { + final String dataBaseName = "test" + j % 2; + for (int i = 0; i < 5; i++) { + final String tableName = "test" + i; + Tablet tablet = + TableModelUtils.generateTablet( + tableName, deviceIDStartIndex, deviceIDEndIndex, 100, 110, false, true); + TableModelUtils.insertTablet(dataBaseName, tablet, senderEnv); + try { + Thread.sleep(100); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + Map> map = j % 2 == 0 ? testResult : test1Result; + map.computeIfAbsent(tableName, k -> new ArrayList<>()).add(tablet); + } + deviceIDStartIndex += 2; + deviceIDEndIndex += 2; + } + } + + private void insertTablet8( + final Map> testResult, final Map> test1Result) { + final Random random = new Random(); + int deviceIDStartIndex = random.nextInt(1 << 16); + int deviceIDEndIndex = deviceIDStartIndex + 10; + for (int j = 0; j < 25; j++) { + final String dataBaseName = "test" + j % 2; + for (int i = 0; i < 5; i++) { + final String tableName = "test" + i; + Tablet tablet = + TableModelUtils.generateTablet( + tableName, 100, 110, deviceIDStartIndex, deviceIDEndIndex, false, true); + TableModelUtils.insertTablet(dataBaseName, tablet, senderEnv); + try { + Thread.sleep(100); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + Map> map = j % 2 == 0 ? 
testResult : test1Result; + map.computeIfAbsent(tableName, k -> new ArrayList<>()).add(tablet); + } + deviceIDStartIndex += 2; + deviceIDEndIndex += 2; + } + } + + private void insertTablet9( + final Map> testResult, final Map> test1Result) { + final Random random = new Random(); + for (int j = 0; j < 25; j++) { + final String dataBaseName = "test" + j % 2; + for (int i = 0; i < 5; i++) { + final String tableName = "test" + i; + Tablet tablet = + TableModelUtils.generateTabletDeviceIDAllIsNull(tableName, 100, 110, 10, false); + TableModelUtils.insertTablet(dataBaseName, tablet, senderEnv); + try { + Thread.sleep(100); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + Map> map = j % 2 == 0 ? testResult : test1Result; + map.computeIfAbsent(tableName, k -> new ArrayList<>()).add(tablet); + } + } + } } diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeNullValueIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeNullValueIT.java index c801cd25c622..aae3626ee531 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeNullValueIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeNullValueIT.java @@ -69,7 +69,7 @@ private void testInsertNullValueTemplate( TableModelUtils.createDataBaseAndTable(senderEnv, "test", "test"); if (insertType == InsertType.SESSION_INSERT_TABLET) { - TableModelUtils.insertDataByTablet("test", "test", 0, 200, senderEnv, true); + TableModelUtils.insertTablet("test", "test", 0, 200, senderEnv, true); } else if (insertType == InsertType.SQL_INSERT) { TableModelUtils.insertData("test", "test", 0, 200, senderEnv, true); } @@ -96,7 +96,7 @@ private void testInsertNullValueTemplate( } if (insertType == InsertType.SESSION_INSERT_TABLET) { - TableModelUtils.insertDataByTablet("test", "test", 200, 400, senderEnv, true); + TableModelUtils.insertTablet("test", "test", 200, 400, senderEnv, 
true); } else if (insertType == InsertType.SQL_INSERT) { TableModelUtils.insertData("test", "test", 200, 400, senderEnv, true); } diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/TableModelUtils.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/TableModelUtils.java index 0d74e2e89201..c7da1382535d 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/TableModelUtils.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/TableModelUtils.java @@ -64,21 +64,22 @@ */ public class TableModelUtils { - public static void createDataBaseAndTable(BaseEnv baseEnv, String table, String database) { + public static void createDataBaseAndTable( + final BaseEnv baseEnv, final String table, final String database) { try (Connection connection = baseEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT); Statement statement = connection.createStatement()) { statement.execute("create database if not exists " + database); statement.execute("use " + database); statement.execute( - "CREATE TABLE " - + table - + "(s0 string tag, s1 int64 field, s2 float field, s3 string field, s4 timestamp field, s5 int32 field, s6 double field, s7 date field, s8 text field )"); + String.format( + "CREATE TABLE %s(s0 string tag, s1 string tag, s2 string tag, s3 string tag,s4 int64 field, s5 float field, s6 string field, s7 timestamp field, s8 int32 field, s9 double field, s10 date field, s11 text field )", + table)); } catch (Exception e) { fail(e.getMessage()); } } - public static void createDataBase(BaseEnv baseEnv, String database) { + public static void createDataBase(final BaseEnv baseEnv, final String database) { try (Connection connection = baseEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT); Statement statement = connection.createStatement()) { statement.execute("create database if not exists " + database); @@ -88,101 +89,118 @@ public static void createDataBase(BaseEnv baseEnv, String database) { } public 
static boolean insertData( - String dataBaseName, String tableName, int start, int end, BaseEnv baseEnv) { + final String dataBaseName, + final String tableName, + final int start, + final int end, + final BaseEnv baseEnv) { List list = new ArrayList<>(end - start + 1); for (int i = start; i < end; ++i) { list.add( String.format( - "insert into %s (s0, s3, s2, s1, s4, s5, s6, s7, s8, time) values ('t%s','%s', %s.0, %s, %s, %d, %d.0, '%s', '%s', %s)", - tableName, i, i, i, i, i, i, i, getDateStr(i), i, i)); + "insert into %s (s0, s3, s2, s1, s4, s5, s6, s7, s8, s9, s10, s11, time) values ('t%s','t%s','t%s','t%s','%s', %s.0, %s, %s, %d, %d.0, '%s', '%s', %s)", + tableName, i, i, i, i, i, i, i, i, i, i, getDateStr(i), i, i)); } list.add("flush"); - if (!TestUtils.tryExecuteNonQueriesWithRetry( - dataBaseName, BaseEnv.TABLE_SQL_DIALECT, baseEnv, list)) { - return false; - } - return true; + return TestUtils.tryExecuteNonQueriesWithRetry( + dataBaseName, BaseEnv.TABLE_SQL_DIALECT, baseEnv, list); } public static boolean insertData( - String dataBaseName, - String tableName, - int start, - int end, - BaseEnv baseEnv, - boolean allowNullValue) { + final String dataBaseName, + final String tableName, + final int start, + final int end, + final BaseEnv baseEnv, + final boolean allowNullValue) { List list = new ArrayList<>(end - start + 1); - Object[] values = new Object[9]; + Object[] values = new Object[12]; Random random = new Random(); - // s0 string, s1 int64, s2 float, s3 string, s4 timestamp, s5 int32, s6 double, s7 date, s8 text + // s0 string, s1 string, s2 string, s3 string, s4 int64, s5 float, s6 string s7 timestamp, s8 + // int32, s9 double, s10 date, s11 text for (int i = start; i < end; ++i) { Arrays.fill(values, i); values[0] = String.format("'t%s'", i); - values[2] = String.format("%s.0", i); - values[3] = String.format("%s", i); - values[6] = String.format("%s.0", i); - values[7] = String.format("'%s'", getDateStr(i)); - values[8] = String.format("'%s'", i); 
+ values[1] = String.format("'t%s'", i); + values[2] = String.format("'t%s'", i); + values[3] = String.format("'t%s'", i); + values[4] = String.format("%s", i); + values[5] = String.format("%s.0", i); + values[6] = String.format("%s", i); + values[7] = String.format("%s", i); + values[8] = String.format("%s", i); + values[9] = String.format("%s.0", i); + values[10] = String.format("'%s'", getDateStr(i)); + values[11] = String.format("'%s'", i); if (allowNullValue) { values[random.nextInt(9)] = "null"; } list.add( String.format( - "insert into %s (s0, s1, s2, s3, s4, s5, s6, s7, s8, time) values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)", - tableName, values[0], values[1], values[2], values[3], values[4], values[5], - values[6], values[7], values[8], i)); + "insert into %s (s0, s1, s2, s3, s4, s5, s6, s7, s8,s9, s10, s11, time) values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)", + tableName, + values[0], + values[1], + values[2], + values[3], + values[4], + values[5], + values[6], + values[7], + values[8], + values[9], + values[10], + values[11], + i)); } - if (!TestUtils.tryExecuteNonQueriesWithRetry( - dataBaseName, BaseEnv.TABLE_SQL_DIALECT, baseEnv, list)) { - return false; - } - return true; + return TestUtils.tryExecuteNonQueriesWithRetry( + dataBaseName, BaseEnv.TABLE_SQL_DIALECT, baseEnv, list); } public static boolean insertDataNotThrowError( - String dataBaseName, String tableName, int start, int end, BaseEnv baseEnv) { + final String dataBaseName, + final String tableName, + final int start, + final int end, + final BaseEnv baseEnv) { List list = new ArrayList<>(end - start + 1); for (int i = start; i < end; ++i) { list.add( String.format( - "insert into %s (s0, s3, s2, s1, s4, s5, s6, s7, s8, time) values ('t%s','%s', %s.0, %s, %s, %d, %d.0, '%s', '%s', %s)", - tableName, i, i, i, i, i, i, i, getDateStr(i), i, i)); + "insert into %s (s0, s3, s2, s1, s4, s5, s6, s7, s8, s9, s10, s11, time) values ('t%s','t%s','t%s','t%s','%s', %s.0, %s, %s, %d, 
%d.0, '%s', '%s', %s)", + tableName, i, i, i, i, i, i, i, i, i, i, getDateStr(i), i, i)); } return TestUtils.tryExecuteNonQueriesWithRetry( dataBaseName, BaseEnv.TABLE_SQL_DIALECT, baseEnv, list); } public static boolean insertData( - String dataBaseName, - String tableName, - int start, - int end, - BaseEnv baseEnv, - DataNodeWrapper wrapper) { + final String dataBaseName, + final String tableName, + final int start, + final int end, + final BaseEnv baseEnv, + final DataNodeWrapper wrapper) { List list = new ArrayList<>(end - start + 1); for (int i = start; i < end; ++i) { list.add( String.format( - "insert into %s (s0, s3, s2, s1, s4, s5, s6, s7, s8, time) values ('t%s','%s', %s.0, %s, %s, %d, %d.0, '%s', '%s', %s)", - tableName, i, i, i, i, i, i, i, getDateStr(i), i, i)); + "insert into %s (s0, s3, s2, s1, s4, s5, s6, s7, s8, s9, s10, s11, time) values ('t%s','t%s','t%s','t%s','%s', %s.0, %s, %s, %d, %d.0, '%s', '%s', %s)", + tableName, i, i, i, i, i, i, i, i, i, i, getDateStr(i), i, i)); } list.add("flush"); - if (!TestUtils.tryExecuteNonQueriesOnSpecifiedDataNodeWithRetry( - baseEnv, wrapper, list, dataBaseName, BaseEnv.TABLE_SQL_DIALECT)) { - return false; - } - - return true; + return TestUtils.tryExecuteNonQueriesOnSpecifiedDataNodeWithRetry( + baseEnv, wrapper, list, dataBaseName, BaseEnv.TABLE_SQL_DIALECT); } - public static boolean insertDataByTablet( - String dataBaseName, - String tableName, - int start, - int end, - BaseEnv baseEnv, - boolean allowNullValue) { - final Tablet tablet = generateTablet(tableName, start, end, allowNullValue); + public static boolean insertTablet( + final String dataBaseName, + final String tableName, + final int start, + final int end, + final BaseEnv baseEnv, + final boolean allowNullValue) { + final Tablet tablet = generateTablet(tableName, start, end, allowNullValue, true); ITableSessionPool tableSessionPool = baseEnv.getTableSessionPool(1); try (final ITableSession session = tableSessionPool.getSession()) { 
session.executeNonQueryStatement("use " + dataBaseName); @@ -195,8 +213,26 @@ public static boolean insertDataByTablet( } } + public static boolean insertTablet( + final String dataBaseName, final Tablet tablet, final BaseEnv baseEnv) { + try (ITableSessionPool tableSessionPool = baseEnv.getTableSessionPool(20); + final ITableSession session = tableSessionPool.getSession()) { + session.executeNonQueryStatement("use " + dataBaseName); + session.insert(tablet); + session.executeNonQueryStatement("flush"); + return true; + } catch (Exception e) { + e.printStackTrace(); + return false; + } + } + public static void deleteData( - String dataBaseName, String tableName, int start, int end, BaseEnv baseEnv) { + final String dataBaseName, + final String tableName, + final int start, + final int end, + final BaseEnv baseEnv) { List list = new ArrayList<>(end - start + 1); list.add( String.format("delete from %s where time >= %s and time <= %s", tableName, start, end)); @@ -206,19 +242,21 @@ public static void deleteData( } } - public static Set generateExpectedResults(int start, int end) { + // s0 string, s1 string, s2 string, s3 string, s4 int64, s5 float, s6 string s7 timestamp, s8 + // int32, s9 double, s10 date, s11 text + public static Set generateExpectedResults(final int start, final int end) { Set expectedResSet = new HashSet<>(); for (int i = start; i < end; ++i) { final String time = RpcUtils.formatDatetime("default", "ms", i, ZoneOffset.UTC); expectedResSet.add( String.format( - "t%d,%d,%d.0,%d,%s,%d,%d.0,%s,%s,%s,", - i, i, i, i, time, i, i, getDateStr(i), i, time)); + "t%s,t%s,t%s,t%s,%s,%s.0,%s,%s,%d,%d.0,%s,%s,%s,", + i, i, i, i, i, i, i, time, i, i, getDateStr(i), i, time)); } return expectedResSet; } - public static Set generateExpectedResults(Tablet tablet) { + public static Set generateExpectedResults(final Tablet tablet) { Set expectedResSet = new HashSet<>(); List schemas = tablet.getSchemas(); for (int i = 0; i < tablet.getRowSize(); i++) { @@ -260,12 
+298,15 @@ public static Set generateExpectedResults(Tablet tablet) { case FLOAT: stringBuffer.append(((float[]) tablet.values[j])[i]); stringBuffer.append(","); + break; case INT32: stringBuffer.append(((int[]) tablet.values[j])[i]); stringBuffer.append(","); + break; case INT64: stringBuffer.append(((long[]) tablet.values[j])[i]); stringBuffer.append(","); + break; } } String time = RpcUtils.formatDatetime("default", "ms", tablet.timestamps[i], ZoneOffset.UTC); @@ -278,19 +319,23 @@ public static Set generateExpectedResults(Tablet tablet) { } public static String generateHeaderResults() { - return "s0,s3,s2,s1,s4,s5,s6,s7,s8,time,"; + return "s0,s1,s2,s3,s4,s5,s6,s7,s8,s9,s10,s11,time,"; } - public static String getQuerySql(String table) { - return "select s0,s3,s2,s1,s4,s5,s6,s7,s8,time from " + table; + public static String getQuerySql(final String table) { + return "select s0,s1,s2,s3,s4,s5,s6,s7,s8,s9,s10,s11,time from " + table; } - public static String getQueryCountSql(String table) { + public static String getQueryCountSql(final String table) { return "select count(*) from " + table; } public static void assertData( - String database, String table, int start, int end, BaseEnv baseEnv) { + final String database, + final String table, + final int start, + final int end, + final BaseEnv baseEnv) { TestUtils.assertDataEventuallyOnEnv( baseEnv, TableModelUtils.getQuerySql(table), @@ -300,12 +345,27 @@ public static void assertData( } public static void assertData( - String database, - String table, - int start, - int end, - BaseEnv baseEnv, - Consumer handleFailure) { + final String database, + final String table, + final Set expectedResults, + final BaseEnv baseEnv, + final Consumer handleFailure) { + TestUtils.assertDataEventuallyOnEnv( + baseEnv, + TableModelUtils.getQuerySql(table), + TableModelUtils.generateHeaderResults(), + expectedResults, + database, + handleFailure); + } + + public static void assertData( + final String database, + final String table, 
+ final int start, + final int end, + final BaseEnv baseEnv, + final Consumer handleFailure) { TestUtils.assertDataEventuallyOnEnv( baseEnv, TableModelUtils.getQuerySql(table), @@ -315,7 +375,8 @@ public static void assertData( handleFailure); } - public static void assertData(String database, String table, Tablet tablet, BaseEnv baseEnv) { + public static void assertData( + final String database, final String table, final Tablet tablet, final BaseEnv baseEnv) { TestUtils.assertDataEventuallyOnEnv( baseEnv, TableModelUtils.getQuerySql(table), @@ -324,18 +385,23 @@ public static void assertData(String database, String table, Tablet tablet, Base database); } - public static boolean hasDataBase(String database, BaseEnv baseEnv) { + public static boolean hasDataBase(final String database, final BaseEnv baseEnv) { TestUtils.assertDataEventuallyOnEnv(baseEnv, "", "", Collections.emptySet(), database); return true; } - public static void assertCountData(String database, String table, int count, BaseEnv baseEnv) { + public static void assertCountData( + final String database, final String table, final int count, final BaseEnv baseEnv) { TestUtils.assertDataEventuallyOnEnv( baseEnv, getQueryCountSql(table), "_col0,", Collections.singleton(count + ","), database); } public static void assertCountData( - String database, String table, int count, BaseEnv baseEnv, Consumer handleFailure) { + final String database, + final String table, + final int count, + final BaseEnv baseEnv, + final Consumer handleFailure) { TestUtils.executeNonQueryWithRetry(baseEnv, "flush"); TestUtils.assertDataEventuallyOnEnv( baseEnv, @@ -346,7 +412,7 @@ public static void assertCountData( handleFailure); } - public static String getDateStr(int value) { + public static String getDateStr(final int value) { Date date = new Date(value); SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd"); try { @@ -356,7 +422,7 @@ public static String getDateStr(int value) { } } - public static LocalDate 
getDate(int value) { + public static LocalDate getDate(final int value) { Date date = new Date(value); SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd"); try { @@ -368,20 +434,30 @@ public static LocalDate getDate(int value) { } public static Tablet generateTablet( - String tableName, int start, int end, boolean allowNullValue) { + final String tableName, + final int start, + final int end, + final boolean allowNullValue, + final boolean allowNullDeviceColumn) { List schemaList = new ArrayList<>(); schemaList.add(new MeasurementSchema("s0", TSDataType.STRING)); - schemaList.add(new MeasurementSchema("s1", TSDataType.INT64)); - schemaList.add(new MeasurementSchema("s2", TSDataType.FLOAT)); + schemaList.add(new MeasurementSchema("s1", TSDataType.STRING)); + schemaList.add(new MeasurementSchema("s2", TSDataType.STRING)); schemaList.add(new MeasurementSchema("s3", TSDataType.STRING)); - schemaList.add(new MeasurementSchema("s4", TSDataType.TIMESTAMP)); - schemaList.add(new MeasurementSchema("s5", TSDataType.INT32)); - schemaList.add(new MeasurementSchema("s6", TSDataType.DOUBLE)); - schemaList.add(new MeasurementSchema("s7", TSDataType.DATE)); - schemaList.add(new MeasurementSchema("s8", TSDataType.TEXT)); + schemaList.add(new MeasurementSchema("s4", TSDataType.INT64)); + schemaList.add(new MeasurementSchema("s5", TSDataType.FLOAT)); + schemaList.add(new MeasurementSchema("s6", TSDataType.STRING)); + schemaList.add(new MeasurementSchema("s7", TSDataType.TIMESTAMP)); + schemaList.add(new MeasurementSchema("s8", TSDataType.INT32)); + schemaList.add(new MeasurementSchema("s9", TSDataType.DOUBLE)); + schemaList.add(new MeasurementSchema("s10", TSDataType.DATE)); + schemaList.add(new MeasurementSchema("s11", TSDataType.TEXT)); final List columnTypes = Arrays.asList( + Tablet.ColumnCategory.TAG, + Tablet.ColumnCategory.TAG, + Tablet.ColumnCategory.TAG, Tablet.ColumnCategory.TAG, Tablet.ColumnCategory.FIELD, Tablet.ColumnCategory.FIELD, @@ -400,28 +476,298 @@ 
public static Tablet generateTablet( end - start); tablet.initBitMaps(); Random random = new Random(); + int nullDeviceIndex = allowNullDeviceColumn ? random.nextInt(4) : 4; - // s2 float, s3 string, s4 timestamp, s5 int32, s6 double, s7 date, s8 text for (long row = 0; row < end - start; row++) { - int randomNumber = allowNullValue ? random.nextInt(9) : 9; + int randomNumber = allowNullValue ? random.nextInt(12) : 12; long value = start + row; int rowIndex = tablet.getRowSize(); tablet.addTimestamp(rowIndex, value); tablet.addValue( - "s0", rowIndex, new Binary(String.valueOf(value).getBytes(StandardCharsets.UTF_8))); - tablet.addValue("s1", rowIndex, value); - tablet.addValue("s2", rowIndex, (value * 1.0f)); + "s0", rowIndex, new Binary(String.format("t%s", value).getBytes(StandardCharsets.UTF_8))); + tablet.addValue( + "s1", rowIndex, new Binary(String.format("t%s", value).getBytes(StandardCharsets.UTF_8))); + tablet.addValue( + "s2", rowIndex, new Binary(String.format("t%s", value).getBytes(StandardCharsets.UTF_8))); tablet.addValue( - "s3", rowIndex, new Binary(String.valueOf(value).getBytes(StandardCharsets.UTF_8))); + "s3", rowIndex, new Binary(String.format("t%s", value).getBytes(StandardCharsets.UTF_8))); tablet.addValue("s4", rowIndex, value); - tablet.addValue("s5", rowIndex, (int) value); - tablet.addValue("s6", rowIndex, value * 0.1); - tablet.addValue("s7", rowIndex, getDate((int) value)); + tablet.addValue("s5", rowIndex, (value * 1.0f)); tablet.addValue( - "s8", rowIndex, new Binary(String.valueOf(value).getBytes(StandardCharsets.UTF_8))); - if (randomNumber < 9) { + "s6", rowIndex, new Binary(String.valueOf(value).getBytes(StandardCharsets.UTF_8))); + tablet.addValue("s7", rowIndex, value); + tablet.addValue("s8", rowIndex, (int) value); + tablet.addValue("s9", rowIndex, value * 0.1); + tablet.addValue("s10", rowIndex, getDate((int) value)); + tablet.addValue( + "s11", rowIndex, new Binary(String.valueOf(value).getBytes(StandardCharsets.UTF_8))); + 
if (randomNumber < 11) { tablet.addValue("s" + randomNumber, rowIndex, null); } + if (nullDeviceIndex < 4) { + tablet.addValue("s" + nullDeviceIndex, rowIndex, null); + } + tablet.setRowSize(rowIndex + 1); + } + + return tablet; + } + + public static Tablet generateTablet( + final String tableName, + final int deviceStartIndex, + final int deviceEndIndex, + final int start, + final int end, + final boolean allowNullValue, + final boolean allowNullDeviceColumn) { + List schemaList = new ArrayList<>(); + schemaList.add(new MeasurementSchema("s0", TSDataType.STRING)); + schemaList.add(new MeasurementSchema("s1", TSDataType.STRING)); + schemaList.add(new MeasurementSchema("s2", TSDataType.STRING)); + schemaList.add(new MeasurementSchema("s3", TSDataType.STRING)); + schemaList.add(new MeasurementSchema("s4", TSDataType.INT64)); + schemaList.add(new MeasurementSchema("s5", TSDataType.FLOAT)); + schemaList.add(new MeasurementSchema("s6", TSDataType.STRING)); + schemaList.add(new MeasurementSchema("s7", TSDataType.TIMESTAMP)); + schemaList.add(new MeasurementSchema("s8", TSDataType.INT32)); + schemaList.add(new MeasurementSchema("s9", TSDataType.DOUBLE)); + schemaList.add(new MeasurementSchema("s10", TSDataType.DATE)); + schemaList.add(new MeasurementSchema("s11", TSDataType.TEXT)); + + final List columnTypes = + Arrays.asList( + Tablet.ColumnCategory.TAG, + Tablet.ColumnCategory.TAG, + Tablet.ColumnCategory.TAG, + Tablet.ColumnCategory.TAG, + Tablet.ColumnCategory.FIELD, + Tablet.ColumnCategory.FIELD, + Tablet.ColumnCategory.FIELD, + Tablet.ColumnCategory.FIELD, + Tablet.ColumnCategory.FIELD, + Tablet.ColumnCategory.FIELD, + Tablet.ColumnCategory.FIELD, + Tablet.ColumnCategory.FIELD); + Tablet tablet = + new Tablet( + tableName, + IMeasurementSchema.getMeasurementNameList(schemaList), + IMeasurementSchema.getDataTypeList(schemaList), + columnTypes, + (deviceEndIndex - deviceStartIndex) * (end - start)); + tablet.initBitMaps(); + final Random random = new Random(); + int 
nullDeviceIndex = allowNullDeviceColumn ? random.nextInt(4) : 4; + + for (int deviceIndex = deviceStartIndex; deviceIndex < deviceEndIndex; deviceIndex++) { + for (long row = start; row < end; row++) { + int randomNumber = allowNullValue ? random.nextInt(12) : 12; + int rowIndex = tablet.getRowSize(); + tablet.addTimestamp(rowIndex, row); + tablet.addValue( + "s0", + rowIndex, + new Binary(String.format("t%s", deviceIndex).getBytes(StandardCharsets.UTF_8))); + tablet.addValue( + "s1", + rowIndex, + new Binary(String.format("t%s", deviceIndex).getBytes(StandardCharsets.UTF_8))); + tablet.addValue( + "s2", + rowIndex, + new Binary(String.format("t%s", deviceIndex).getBytes(StandardCharsets.UTF_8))); + tablet.addValue( + "s3", + rowIndex, + new Binary(String.format("t%s", deviceIndex).getBytes(StandardCharsets.UTF_8))); + tablet.addValue("s4", rowIndex, row); + tablet.addValue("s5", rowIndex, (row * 1.0f)); + tablet.addValue( + "s6", rowIndex, new Binary(String.valueOf(row).getBytes(StandardCharsets.UTF_8))); + tablet.addValue("s7", rowIndex, row); + tablet.addValue("s8", rowIndex, (int) row); + tablet.addValue("s9", rowIndex, row * 0.1); + tablet.addValue("s10", rowIndex, getDate((int) row)); + tablet.addValue( + "s11", rowIndex, new Binary(String.valueOf(row).getBytes(StandardCharsets.UTF_8))); + if (randomNumber < 12) { + tablet.addValue("s" + randomNumber, rowIndex, null); + } + if (nullDeviceIndex < 4) { + tablet.addValue("s" + nullDeviceIndex, rowIndex, null); + } + tablet.setRowSize(rowIndex + 1); + } + } + + return tablet; + } + + public static Tablet generateTablet( + final String tableName, + final int deviceStartIndex, + final int deviceEndIndex, + final int deviceDataSize, + final boolean allowNullValue, + final boolean allowNullDeviceColumn) { + List schemaList = new ArrayList<>(); + schemaList.add(new MeasurementSchema("s0", TSDataType.STRING)); + schemaList.add(new MeasurementSchema("s1", TSDataType.STRING)); + schemaList.add(new MeasurementSchema("s2", 
TSDataType.STRING)); + schemaList.add(new MeasurementSchema("s3", TSDataType.STRING)); + schemaList.add(new MeasurementSchema("s4", TSDataType.INT64)); + schemaList.add(new MeasurementSchema("s5", TSDataType.FLOAT)); + schemaList.add(new MeasurementSchema("s6", TSDataType.STRING)); + schemaList.add(new MeasurementSchema("s7", TSDataType.TIMESTAMP)); + schemaList.add(new MeasurementSchema("s8", TSDataType.INT32)); + schemaList.add(new MeasurementSchema("s9", TSDataType.DOUBLE)); + schemaList.add(new MeasurementSchema("s10", TSDataType.DATE)); + schemaList.add(new MeasurementSchema("s11", TSDataType.TEXT)); + + final List columnTypes = + Arrays.asList( + Tablet.ColumnCategory.TAG, + Tablet.ColumnCategory.TAG, + Tablet.ColumnCategory.TAG, + Tablet.ColumnCategory.TAG, + Tablet.ColumnCategory.FIELD, + Tablet.ColumnCategory.FIELD, + Tablet.ColumnCategory.FIELD, + Tablet.ColumnCategory.FIELD, + Tablet.ColumnCategory.FIELD, + Tablet.ColumnCategory.FIELD, + Tablet.ColumnCategory.FIELD, + Tablet.ColumnCategory.FIELD); + Tablet tablet = + new Tablet( + tableName, + IMeasurementSchema.getMeasurementNameList(schemaList), + IMeasurementSchema.getDataTypeList(schemaList), + columnTypes, + (deviceEndIndex - deviceStartIndex) * deviceDataSize); + tablet.initBitMaps(); + final Random random = new Random(); + int nullDeviceIndex = allowNullDeviceColumn ? random.nextInt(4) : 4; + + for (int deviceIndex = deviceStartIndex; deviceIndex < deviceEndIndex; deviceIndex++) { + // s0-s3 string (tags), s4 int64, s5 float, s6 string, s7 timestamp, s8 int32, s9 double, s10 date, s11 text + long value = random.nextInt(1 << 16); + for (long row = 0; row < deviceDataSize; row++) { + int randomNumber = allowNullValue ? 
random.nextInt(12) : 12; + int rowIndex = tablet.getRowSize(); + value += random.nextInt(100); + tablet.addTimestamp(rowIndex, value); + tablet.addValue( + "s0", + rowIndex, + new Binary(String.format("t%s", deviceIndex).getBytes(StandardCharsets.UTF_8))); + tablet.addValue( + "s1", + rowIndex, + new Binary(String.format("t%s", deviceIndex).getBytes(StandardCharsets.UTF_8))); + tablet.addValue( + "s2", + rowIndex, + new Binary(String.format("t%s", deviceIndex).getBytes(StandardCharsets.UTF_8))); + tablet.addValue( + "s3", + rowIndex, + new Binary(String.format("t%s", deviceIndex).getBytes(StandardCharsets.UTF_8))); + tablet.addValue("s4", rowIndex, value); + tablet.addValue("s5", rowIndex, (value * 1.0f)); + tablet.addValue( + "s6", rowIndex, new Binary(String.valueOf(value).getBytes(StandardCharsets.UTF_8))); + tablet.addValue("s7", rowIndex, value); + tablet.addValue("s8", rowIndex, (int) value); + tablet.addValue("s9", rowIndex, value * 0.1); + tablet.addValue("s10", rowIndex, getDate((int) value)); + tablet.addValue( + "s11", rowIndex, new Binary(String.valueOf(value).getBytes(StandardCharsets.UTF_8))); + if (randomNumber < 12) { + tablet.addValue("s" + randomNumber, rowIndex, null); + } + if (nullDeviceIndex < 4) { + tablet.addValue("s" + nullDeviceIndex, rowIndex, null); + } + tablet.setRowSize(rowIndex + 1); + } + } + + return tablet; + } + + public static Tablet generateTabletDeviceIDAllIsNull( + final String tableName, + final int deviceStartIndex, + final int deviceEndIndex, + final int deviceDataSize, + final boolean allowNullValue) { + List schemaList = new ArrayList<>(); + schemaList.add(new MeasurementSchema("s0", TSDataType.STRING)); + schemaList.add(new MeasurementSchema("s1", TSDataType.STRING)); + schemaList.add(new MeasurementSchema("s2", TSDataType.STRING)); + schemaList.add(new MeasurementSchema("s3", TSDataType.STRING)); + schemaList.add(new MeasurementSchema("s4", TSDataType.INT64)); + schemaList.add(new MeasurementSchema("s5", 
TSDataType.FLOAT)); + schemaList.add(new MeasurementSchema("s6", TSDataType.STRING)); + schemaList.add(new MeasurementSchema("s7", TSDataType.TIMESTAMP)); + schemaList.add(new MeasurementSchema("s8", TSDataType.INT32)); + schemaList.add(new MeasurementSchema("s9", TSDataType.DOUBLE)); + schemaList.add(new MeasurementSchema("s10", TSDataType.DATE)); + schemaList.add(new MeasurementSchema("s11", TSDataType.TEXT)); + + final List columnTypes = + Arrays.asList( + Tablet.ColumnCategory.TAG, + Tablet.ColumnCategory.TAG, + Tablet.ColumnCategory.TAG, + Tablet.ColumnCategory.TAG, + Tablet.ColumnCategory.FIELD, + Tablet.ColumnCategory.FIELD, + Tablet.ColumnCategory.FIELD, + Tablet.ColumnCategory.FIELD, + Tablet.ColumnCategory.FIELD, + Tablet.ColumnCategory.FIELD, + Tablet.ColumnCategory.FIELD, + Tablet.ColumnCategory.FIELD); + Tablet tablet = + new Tablet( + tableName, + IMeasurementSchema.getMeasurementNameList(schemaList), + IMeasurementSchema.getDataTypeList(schemaList), + columnTypes, + (deviceEndIndex - deviceStartIndex) * deviceDataSize); + tablet.initBitMaps(); + final Random random = new Random(); + + for (int deviceIndex = deviceStartIndex; deviceIndex < deviceEndIndex; deviceIndex++) { + // s0-s3 string (tags, always null in this generator), s4 int64, s5 float, s6 string, s7 timestamp, s8 int32, s9 double, s10 date, s11 text + long value = random.nextInt(1 << 16); + for (long row = 0; row < deviceDataSize; row++) { + int randomNumber = allowNullValue ? 
random.nextInt(12) : 12; + int rowIndex = tablet.getRowSize(); + value += random.nextInt(100); + tablet.addTimestamp(rowIndex, value); + tablet.addValue("s0", rowIndex, null); + tablet.addValue("s1", rowIndex, null); + tablet.addValue("s2", rowIndex, null); + tablet.addValue("s3", rowIndex, null); + tablet.addValue("s4", rowIndex, value); + tablet.addValue("s5", rowIndex, (value * 1.0f)); + tablet.addValue( + "s6", rowIndex, new Binary(String.valueOf(value).getBytes(StandardCharsets.UTF_8))); + tablet.addValue("s7", rowIndex, value); + tablet.addValue("s8", rowIndex, (int) value); + tablet.addValue("s9", rowIndex, value * 0.1); + tablet.addValue("s10", rowIndex, getDate((int) value)); + tablet.addValue( + "s11", rowIndex, new Binary(String.valueOf(value).getBytes(StandardCharsets.UTF_8))); + if (randomNumber < 12) { + tablet.addValue("s" + randomNumber, rowIndex, null); + } + tablet.setRowSize(rowIndex + 1); + } } return tablet; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java index 02a82e598c98..1162961f567e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java @@ -19,114 +19,54 @@ package org.apache.iotdb.db.pipe.connector.payload.evolvable.batch; -import org.apache.iotdb.db.conf.IoTDBDescriptor; -import org.apache.iotdb.db.exception.DiskSpaceInsufficientException; -import org.apache.iotdb.db.pipe.connector.util.PipeTabletEventSorter; +import org.apache.iotdb.db.pipe.connector.util.builder.PipeTableModeTsFileBuilder; +import org.apache.iotdb.db.pipe.connector.util.builder.PipeTreeModelTsFileBuilder; +import 
org.apache.iotdb.db.pipe.connector.util.builder.PipeTsFileBuilder; +import org.apache.iotdb.db.pipe.connector.util.sorter.PipeTableModelTabletEventSorter; +import org.apache.iotdb.db.pipe.connector.util.sorter.PipeTreeModelTabletEventSorter; import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil; -import org.apache.iotdb.db.storageengine.rescon.disk.FolderManager; -import org.apache.iotdb.db.storageengine.rescon.disk.strategy.DirectoryStrategyType; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; -import org.apache.iotdb.pipe.api.exception.PipeException; -import org.apache.commons.io.FileUtils; -import org.apache.tsfile.common.constant.TsFileConstant; import org.apache.tsfile.exception.write.WriteProcessException; -import org.apache.tsfile.read.common.Path; import org.apache.tsfile.utils.Pair; -import org.apache.tsfile.write.TsFileWriter; import org.apache.tsfile.write.record.Tablet; -import org.apache.tsfile.write.schema.IMeasurementSchema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.Objects; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Collectors; public class PipeTabletEventTsFileBatch extends PipeTabletEventBatch { private static final Logger LOGGER = LoggerFactory.getLogger(PipeTabletEventTsFileBatch.class); - private static final AtomicReference FOLDER_MANAGER = new AtomicReference<>(); private 
static final AtomicLong BATCH_ID_GENERATOR = new AtomicLong(0); private final AtomicLong currentBatchId = new AtomicLong(BATCH_ID_GENERATOR.incrementAndGet()); - private final File batchFileBaseDir; - - private static final String TS_FILE_PREFIX = "tb"; // tb means tablet batch - private final AtomicLong tsFileIdGenerator = new AtomicLong(0); private final long maxSizeInBytes; - private final Map, Double> pipeName2WeightMap = new HashMap<>(); - - private final List tabletList = new ArrayList<>(); - private final List isTabletAlignedList = new ArrayList<>(); + private final PipeTsFileBuilder treeModeTsFileBuilder; + private final PipeTsFileBuilder tableModeTsFileBuilder; - @SuppressWarnings("java:S3077") - private volatile TsFileWriter fileWriter; + private final Map, Double> pipeName2WeightMap = new HashMap<>(); public PipeTabletEventTsFileBatch(final int maxDelayInMs, final long requestMaxBatchSizeInBytes) { super(maxDelayInMs); this.maxSizeInBytes = requestMaxBatchSizeInBytes; - try { - this.batchFileBaseDir = getNextBaseDir(); - } catch (final Exception e) { - throw new PipeException( - String.format("Failed to create file dir for batch: %s", e.getMessage())); - } - } - - private File getNextBaseDir() throws DiskSpaceInsufficientException { - if (FOLDER_MANAGER.get() == null) { - synchronized (FOLDER_MANAGER) { - if (FOLDER_MANAGER.get() == null) { - FOLDER_MANAGER.set( - new FolderManager( - Arrays.stream(IoTDBDescriptor.getInstance().getConfig().getPipeReceiverFileDirs()) - .map(fileDir -> fileDir + File.separator + ".batch") - .collect(Collectors.toList()), - DirectoryStrategyType.SEQUENCE_STRATEGY)); - } - } - } - final File baseDir = - new File(FOLDER_MANAGER.get().getNextFolder(), Long.toString(currentBatchId.get())); - if (baseDir.exists()) { - FileUtils.deleteQuietly(baseDir); - } - if (!baseDir.exists() && !baseDir.mkdirs()) { - LOGGER.warn( - "Batch id = {}: Failed to create batch file dir {}.", - currentBatchId.get(), - baseDir.getPath()); - throw new 
PipeException( - String.format( - "Failed to create batch file dir %s. (Batch id = %s)", - baseDir.getPath(), currentBatchId.get())); - } - LOGGER.info( - "Batch id = {}: Create batch dir successfully, batch file dir = {}.", - currentBatchId.get(), - baseDir.getPath()); - return baseDir; + final AtomicLong tsFileIdGenerator = new AtomicLong(0); + treeModeTsFileBuilder = new PipeTreeModelTsFileBuilder(currentBatchId, tsFileIdGenerator); + tableModeTsFileBuilder = new PipeTableModeTsFileBuilder(currentBatchId, tsFileIdGenerator); } @Override @@ -134,18 +74,28 @@ protected boolean constructBatch(final TabletInsertionEvent event) { if (event instanceof PipeInsertNodeTabletInsertionEvent) { final PipeInsertNodeTabletInsertionEvent insertNodeTabletInsertionEvent = (PipeInsertNodeTabletInsertionEvent) event; - // TODO: for table model insertion, we need to get the database name + final boolean isTableModel = insertNodeTabletInsertionEvent.isTableModelEvent(); final List tablets = insertNodeTabletInsertionEvent.convertToTablets(); for (int i = 0; i < tablets.size(); ++i) { final Tablet tablet = tablets.get(i); if (tablet.getRowSize() == 0) { continue; } - bufferTablet( - insertNodeTabletInsertionEvent.getPipeName(), - insertNodeTabletInsertionEvent.getCreationTime(), - tablet, - insertNodeTabletInsertionEvent.isAligned(i)); + if (isTableModel) { + // table Model + bufferTableModelTablet( + insertNodeTabletInsertionEvent.getPipeName(), + insertNodeTabletInsertionEvent.getCreationTime(), + tablet, + insertNodeTabletInsertionEvent.getTableModelDatabaseName()); + } else { + // tree Model + bufferTreeModelTablet( + insertNodeTabletInsertionEvent.getPipeName(), + insertNodeTabletInsertionEvent.getCreationTime(), + tablet, + insertNodeTabletInsertionEvent.isAligned(i)); + } } } else if (event instanceof PipeRawTabletInsertionEvent) { final PipeRawTabletInsertionEvent rawTabletInsertionEvent = @@ -154,11 +104,21 @@ protected boolean constructBatch(final TabletInsertionEvent event) 
{ if (tablet.getRowSize() == 0) { return true; } - bufferTablet( - rawTabletInsertionEvent.getPipeName(), - rawTabletInsertionEvent.getCreationTime(), - tablet, - rawTabletInsertionEvent.isAligned()); + if (rawTabletInsertionEvent.isTableModelEvent()) { + // table Model + bufferTableModelTablet( + rawTabletInsertionEvent.getPipeName(), + rawTabletInsertionEvent.getCreationTime(), + tablet, + rawTabletInsertionEvent.getTableModelDatabaseName()); + } else { + // tree Model + bufferTreeModelTablet( + rawTabletInsertionEvent.getPipeName(), + rawTabletInsertionEvent.getCreationTime(), + tablet, + rawTabletInsertionEvent.isAligned()); + } } else { LOGGER.warn( "Batch id = {}: Unsupported event {} type {} when constructing tsfile batch", @@ -169,12 +129,25 @@ protected boolean constructBatch(final TabletInsertionEvent event) { return true; } - private void bufferTablet( + private void bufferTreeModelTablet( final String pipeName, final long creationTime, final Tablet tablet, final boolean isAligned) { - new PipeTabletEventSorter(tablet).deduplicateAndSortTimestampsIfNecessary(); + new PipeTreeModelTabletEventSorter(tablet).deduplicateAndSortTimestampsIfNecessary(); + + totalBufferSize += PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet); + + pipeName2WeightMap.compute( + new Pair<>(pipeName, creationTime), + (pipe, weight) -> Objects.nonNull(weight) ? ++weight : 1); + + treeModeTsFileBuilder.bufferTreeModelTablet(tablet, isAligned); + } + + private void bufferTableModelTablet( + final String pipeName, final long creationTime, final Tablet tablet, final String dataBase) { + new PipeTableModelTabletEventSorter(tablet).sortAndDeduplicateByDevIdTimestamp(); totalBufferSize += PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet); @@ -182,8 +155,7 @@ private void bufferTablet( new Pair<>(pipeName, creationTime), (pipe, weight) -> Objects.nonNull(weight) ? 
++weight : 1); - tabletList.add(tablet); - isTabletAlignedList.add(isAligned); + tableModeTsFileBuilder.bufferTableModelTablet(dataBase, tablet); } public Map, Double> deepCopyPipe2WeightMap() { @@ -195,193 +167,28 @@ public Map, Double> deepCopyPipe2WeightMap() { return new HashMap<>(pipeName2WeightMap); } - public synchronized List sealTsFiles() throws IOException, WriteProcessException { - return isClosed ? Collections.emptyList() : writeTabletsToTsFiles(); - } - - private List writeTabletsToTsFiles() throws IOException, WriteProcessException { - final Map> device2Tablets = new HashMap<>(); - final Map device2Aligned = new HashMap<>(); - - // Sort the tablets by device id - for (int i = 0, size = tabletList.size(); i < size; ++i) { - final Tablet tablet = tabletList.get(i); - final String deviceId = tablet.getDeviceId(); - device2Tablets.computeIfAbsent(deviceId, k -> new ArrayList<>()).add(tablet); - device2Aligned.put(deviceId, isTabletAlignedList.get(i)); - } - - // Sort the tablets by start time in each device - for (final List tablets : device2Tablets.values()) { - tablets.sort( - // Each tablet has at least one timestamp - Comparator.comparingLong(tablet -> tablet.timestamps[0])); - } - - // Sort the devices by device id - final List devices = new ArrayList<>(device2Tablets.keySet()); - devices.sort(Comparator.naturalOrder()); - - // Replace ArrayList with LinkedList to improve performance - final LinkedHashMap> device2TabletsLinkedList = - new LinkedHashMap<>(); - for (final String device : devices) { - device2TabletsLinkedList.put(device, new LinkedList<>(device2Tablets.get(device))); + /** + * Converts a Tablet to a TSFile and returns the generated TSFile along with its corresponding + * database name. 
+ * + * @return a list of pairs containing the database name and the generated TSFile + * @throws IOException if an I/O error occurs during the conversion process + * @throws WriteProcessException if an error occurs during the write process + */ + public synchronized List> sealTsFiles() + throws IOException, WriteProcessException { + if (isClosed) { + return Collections.emptyList(); } - // Help GC - devices.clear(); - device2Tablets.clear(); - - // Write the tablets to the tsfile device by device, and the tablets - // in the same device are written in order of start time. Tablets in - // the same device should not be written if their time ranges overlap. - // If overlapped, we try to write the tablets whose device id is not - // the same as the previous one. For the tablets not written in the - // previous round, we write them in a new tsfile. - final List sealedFiles = new ArrayList<>(); - - // Try making the tsfile size as large as possible - while (!device2TabletsLinkedList.isEmpty()) { - if (Objects.isNull(fileWriter)) { - fileWriter = - new TsFileWriter( - new File( - batchFileBaseDir, - TS_FILE_PREFIX - + "_" - + IoTDBDescriptor.getInstance().getConfig().getDataNodeId() - + "_" - + currentBatchId.get() - + "_" - + tsFileIdGenerator.getAndIncrement() - + TsFileConstant.TSFILE_SUFFIX)); - } - - try { - tryBestToWriteTabletsIntoOneFile(device2TabletsLinkedList, device2Aligned); - } catch (final Exception e) { - LOGGER.warn( - "Batch id = {}: Failed to write tablets into tsfile, because {}", - currentBatchId.get(), - e.getMessage(), - e); - - try { - fileWriter.close(); - } catch (final Exception closeException) { - LOGGER.warn( - "Batch id = {}: Failed to close the tsfile {} after failed to write tablets into, because {}", - currentBatchId.get(), - fileWriter.getIOWriter().getFile().getPath(), - closeException.getMessage(), - closeException); - } finally { - // Add current writing file to the list and delete the file - 
sealedFiles.add(fileWriter.getIOWriter().getFile()); - } - - for (final File sealedFile : sealedFiles) { - final boolean deleteSuccess = FileUtils.deleteQuietly(sealedFile); - LOGGER.warn( - "Batch id = {}: {} delete the tsfile {} after failed to write tablets into {}. {}", - currentBatchId.get(), - deleteSuccess ? "Successfully" : "Failed to", - sealedFile.getPath(), - fileWriter.getIOWriter().getFile().getPath(), - deleteSuccess ? "" : "Maybe the tsfile needs to be deleted manually."); - } - sealedFiles.clear(); - - fileWriter = null; - - throw e; - } - - fileWriter.close(); - final File sealedFile = fileWriter.getIOWriter().getFile(); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug( - "Batch id = {}: Seal tsfile {} successfully.", - currentBatchId.get(), - sealedFile.getPath()); - } - sealedFiles.add(sealedFile); - fileWriter = null; + final List> list = new ArrayList<>(); + if (!treeModeTsFileBuilder.isEmpty()) { + list.addAll(treeModeTsFileBuilder.convertTabletToTsFileWithDBInfo()); } - - return sealedFiles; - } - - private void tryBestToWriteTabletsIntoOneFile( - final LinkedHashMap> device2TabletsLinkedList, - final Map device2Aligned) - throws IOException, WriteProcessException { - final Iterator>> iterator = - device2TabletsLinkedList.entrySet().iterator(); - - while (iterator.hasNext()) { - final Map.Entry> entry = iterator.next(); - final String deviceId = entry.getKey(); - final LinkedList tablets = entry.getValue(); - - final List tabletsToWrite = new ArrayList<>(); - - Tablet lastTablet = null; - while (!tablets.isEmpty()) { - final Tablet tablet = tablets.peekFirst(); - if (Objects.isNull(lastTablet) - // lastTablet.rowSize is not 0 - || lastTablet.timestamps[lastTablet.getRowSize() - 1] < tablet.timestamps[0]) { - tabletsToWrite.add(tablet); - lastTablet = tablet; - tablets.pollFirst(); - } else { - break; - } - } - - if (tablets.isEmpty()) { - iterator.remove(); - } - - final boolean isAligned = device2Aligned.get(deviceId); - if (isAligned) { - 
final Map> deviceId2MeasurementSchemas = new HashMap<>(); - tabletsToWrite.forEach( - tablet -> - deviceId2MeasurementSchemas.compute( - tablet.getDeviceId(), - (k, v) -> { - if (Objects.isNull(v)) { - return new ArrayList<>(tablet.getSchemas()); - } - v.addAll(tablet.getSchemas()); - return v; - })); - for (final Entry> deviceIdWithMeasurementSchemas : - deviceId2MeasurementSchemas.entrySet()) { - fileWriter.registerAlignedTimeseries( - new Path(deviceIdWithMeasurementSchemas.getKey()), - deviceIdWithMeasurementSchemas.getValue()); - } - for (final Tablet tablet : tabletsToWrite) { - fileWriter.writeAligned(tablet); - } - } else { - for (final Tablet tablet : tabletsToWrite) { - for (final IMeasurementSchema schema : tablet.getSchemas()) { - try { - fileWriter.registerTimeseries(new Path(tablet.getDeviceId()), schema); - } catch (final WriteProcessException ignore) { - // Do nothing if the timeSeries has been registered - } - } - - fileWriter.writeTree(tablet); - } - } + if (!tableModeTsFileBuilder.isEmpty()) { + list.addAll(tableModeTsFileBuilder.convertTabletToTsFileWithDBInfo()); } + return list; } @Override @@ -394,13 +201,8 @@ public synchronized void onSuccess() { super.onSuccess(); pipeName2WeightMap.clear(); - - tabletList.clear(); - isTabletAlignedList.clear(); - - // We don't need to delete the tsFile here, because the tsFile - // will be deleted after the file is transferred. 
- fileWriter = null; + tableModeTsFileBuilder.onSuccess(); + treeModeTsFileBuilder.onSuccess(); } @Override @@ -408,34 +210,7 @@ public synchronized void close() { super.close(); pipeName2WeightMap.clear(); - - tabletList.clear(); - isTabletAlignedList.clear(); - - if (Objects.nonNull(fileWriter)) { - try { - fileWriter.close(); - } catch (final Exception e) { - LOGGER.info( - "Batch id = {}: Failed to close the tsfile {} when trying to close batch, because {}", - currentBatchId.get(), - fileWriter.getIOWriter().getFile().getPath(), - e.getMessage(), - e); - } - - try { - FileUtils.delete(fileWriter.getIOWriter().getFile()); - } catch (final Exception e) { - LOGGER.info( - "Batch id = {}: Failed to delete the tsfile {} when trying to close batch, because {}", - currentBatchId.get(), - fileWriter.getIOWriter().getFile().getPath(), - e.getMessage(), - e); - } - - fileWriter = null; - } + tableModeTsFileBuilder.close(); + treeModeTsFileBuilder.close(); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletRawReq.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletRawReq.java index 7c5ef90299d0..bf27b697698c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletRawReq.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletRawReq.java @@ -23,7 +23,7 @@ import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.IoTDBConnectorRequestVersion; import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeRequestType; import org.apache.iotdb.commons.utils.PathUtils; -import org.apache.iotdb.db.pipe.connector.util.PipeTabletEventSorter; +import org.apache.iotdb.db.pipe.connector.util.sorter.PipeTreeModelTabletEventSorter; import 
org.apache.iotdb.db.queryengine.plan.parser.StatementGenerator; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement; import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq; @@ -60,7 +60,7 @@ public boolean getIsAligned() { } public InsertTabletStatement constructStatement() { - new PipeTabletEventSorter(tablet).deduplicateAndSortTimestampsIfNecessary(); + new PipeTreeModelTabletEventSorter(tablet).deduplicateAndSortTimestampsIfNecessary(); try { if (isTabletEmpty(tablet)) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletRawReqV2.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletRawReqV2.java index c0ae65e0628e..fd435ded99ca 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletRawReqV2.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletRawReqV2.java @@ -23,7 +23,8 @@ import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.IoTDBConnectorRequestVersion; import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeRequestType; import org.apache.iotdb.commons.utils.PathUtils; -import org.apache.iotdb.db.pipe.connector.util.PipeTabletEventSorter; +import org.apache.iotdb.db.pipe.connector.util.sorter.PipeTableModelTabletEventSorter; +import org.apache.iotdb.db.pipe.connector.util.sorter.PipeTreeModelTabletEventSorter; import org.apache.iotdb.db.queryengine.plan.parser.StatementGenerator; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement; import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq; @@ -57,7 +58,11 @@ public String getDataBaseName() { @Override public InsertTabletStatement constructStatement() { - new PipeTabletEventSorter(tablet).deduplicateAndSortTimestampsIfNecessary(); + 
if (Objects.isNull(dataBaseName)) { + new PipeTreeModelTabletEventSorter(tablet).deduplicateAndSortTimestampsIfNecessary(); + } else { + new PipeTableModelTabletEventSorter(tablet).sortAndDeduplicateByTimestampIfNecessary(); + } try { if (isTabletEmpty(tablet)) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaNameSpace.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaNameSpace.java index 270188005ce4..834a14f94df8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaNameSpace.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaNameSpace.java @@ -22,7 +22,8 @@ import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException; import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException; import org.apache.iotdb.commons.utils.PathUtils; -import org.apache.iotdb.db.pipe.connector.util.PipeTabletEventSorter; +import org.apache.iotdb.db.pipe.connector.util.sorter.PipeTableModelTabletEventSorter; +import org.apache.iotdb.db.pipe.connector.util.sorter.PipeTreeModelTabletEventSorter; import org.apache.iotdb.db.utils.DateTimeUtils; import org.apache.iotdb.db.utils.TimestampPrecisionUtils; import org.apache.iotdb.pipe.api.event.Event; @@ -116,7 +117,7 @@ private void transferTabletForClientServerModel(final Tablet tablet, final boole final List schemas = tablet.getSchemas(); final List newSchemas = new ArrayList<>(); if (!isTableModel) { - new PipeTabletEventSorter(tablet).deduplicateAndSortTimestampsIfNecessary(); + new PipeTreeModelTabletEventSorter(tablet).deduplicateAndSortTimestampsIfNecessary(); final List timestamps = new ArrayList<>(); final List values = new ArrayList<>(); @@ -137,6 +138,8 @@ private void transferTabletForClientServerModel(final Tablet tablet, final boole transferTabletRowForClientServerModel( tablet.getDeviceId().split("\\."), 
newSchemas, timestamps, values); } else { + new PipeTableModelTabletEventSorter(tablet).sortAndDeduplicateByTimestampIfNecessary(); + final List columnIndexes = new ArrayList<>(); for (int i = 0; i < schemas.size(); ++i) { if (tablet.getColumnTypes().get(i) == Tablet.ColumnCategory.FIELD) { @@ -144,6 +147,7 @@ private void transferTabletForClientServerModel(final Tablet tablet, final boole newSchemas.add(schemas.get(i)); } } + for (int i = 0; i < tablet.getRowSize(); ++i) { final Object[] segments = tablet.getDeviceID(i).getSegments(); final String[] folderSegments = new String[segments.length + 2]; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java index 4eb24322e83c..1dfebff416c5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java @@ -187,13 +187,13 @@ private void transferInBatchWithoutCheck( new PipeTransferTabletBatchEventHandler((PipeTabletEventPlainBatch) batch, this)); } else if (batch instanceof PipeTabletEventTsFileBatch) { final PipeTabletEventTsFileBatch tsFileBatch = (PipeTabletEventTsFileBatch) batch; - final List sealedFiles = tsFileBatch.sealTsFiles(); + final List> dbTsFilePairs = tsFileBatch.sealTsFiles(); final Map, Double> pipe2WeightMap = tsFileBatch.deepCopyPipe2WeightMap(); final List events = tsFileBatch.deepCopyEvents(); - final AtomicInteger eventsReferenceCount = new AtomicInteger(sealedFiles.size()); + final AtomicInteger eventsReferenceCount = new AtomicInteger(dbTsFilePairs.size()); final AtomicBoolean eventsHadBeenAddedToRetryQueue = new AtomicBoolean(false); - for (final File sealedFile : sealedFiles) { + 
for (final Pair sealedFile : dbTsFilePairs) { transfer( new PipeTransferTsFileHandler( this, @@ -201,10 +201,10 @@ private void transferInBatchWithoutCheck( events, eventsReferenceCount, eventsHadBeenAddedToRetryQueue, - sealedFile, + sealedFile.right, null, false, - null)); + sealedFile.left)); } } else { LOGGER.warn( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java index 8370dd667ab3..cc03af180f9d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java @@ -26,7 +26,7 @@ import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTabletEventPlainBatch; import org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionAsyncConnector; -import org.apache.iotdb.db.pipe.connector.util.LeaderCacheUtils; +import org.apache.iotdb.db.pipe.connector.util.cacher.LeaderCacheUtils; import org.apache.iotdb.pipe.api.exception.PipeException; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java index 14f2ff5fd070..93ce77974b90 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java +++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java @@ -35,7 +35,7 @@ import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFilePieceReq; import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFilePieceWithModReq; import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFileSealWithModReq; -import org.apache.iotdb.db.pipe.connector.util.LeaderCacheUtils; +import org.apache.iotdb.db.pipe.connector.util.cacher.LeaderCacheUtils; import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent; import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent; @@ -306,13 +306,13 @@ private void doTransfer( private void doTransfer(final PipeTabletEventTsFileBatch batchToTransfer) throws IOException, WriteProcessException { - final List sealedFiles = batchToTransfer.sealTsFiles(); + final List> dbTsFilePairs = batchToTransfer.sealTsFiles(); final Map, Double> pipe2WeightMap = batchToTransfer.deepCopyPipe2WeightMap(); - for (final File tsFile : sealedFiles) { - doTransfer(pipe2WeightMap, tsFile, null, null); + for (final Pair tsFile : dbTsFilePairs) { + doTransfer(pipe2WeightMap, tsFile.right, null, tsFile.left); try { - FileUtils.delete(tsFile); + FileUtils.delete(tsFile.right); } catch (final NoSuchFileException e) { LOGGER.info("The file {} is not found, may already be deleted.", tsFile); } catch (final Exception e) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModeTsFileBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModeTsFileBuilder.java new file mode 100644 index 000000000000..6ac5ac911bfa --- /dev/null +++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModeTsFileBuilder.java @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.connector.util.builder; + +import org.apache.iotdb.pipe.api.exception.PipeException; + +import org.apache.commons.io.FileUtils; +import org.apache.tsfile.exception.write.WriteProcessException; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.TableSchema; +import org.apache.tsfile.utils.Pair; +import org.apache.tsfile.utils.WriteUtils; +import org.apache.tsfile.write.record.Tablet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicLong; + +public class PipeTableModeTsFileBuilder extends PipeTsFileBuilder { + + private static final Logger LOGGER = LoggerFactory.getLogger(PipeTableModeTsFileBuilder.class); + 
+ private final Map> dataBase2TabletList = new HashMap<>(); + + public PipeTableModeTsFileBuilder(AtomicLong currentBatchId, AtomicLong tsFileIdGenerator) { + super(currentBatchId, tsFileIdGenerator); + } + + @Override + public void bufferTableModelTablet(String dataBase, Tablet tablet) { + dataBase2TabletList.computeIfAbsent(dataBase, db -> new ArrayList<>()).add(tablet); + } + + @Override + public void bufferTreeModelTablet(Tablet tablet, Boolean isAligned) { + throw new UnsupportedOperationException( + "PipeTableModeTsFileBuilder does not support tree model tablet to build TSFile"); + } + + @Override + public List> convertTabletToTsFileWithDBInfo() throws IOException { + if (dataBase2TabletList.isEmpty()) { + return new ArrayList<>(0); + } + List> pairList = new ArrayList<>(); + for (Map.Entry> entry : dataBase2TabletList.entrySet()) { + final LinkedHashSet>>>> linkedHashSet = + new LinkedHashSet<>(); + pairList.addAll( + writeTableModelTabletsToTsFiles(entry.getValue(), entry.getKey(), linkedHashSet)); + } + return pairList; + } + + @Override + public boolean isEmpty() { + return dataBase2TabletList.isEmpty(); + } + + @Override + public synchronized void onSuccess() { + super.onSuccess(); + dataBase2TabletList.clear(); + } + + @Override + public synchronized void close() { + super.close(); + dataBase2TabletList.clear(); + } + + private >>> + List> writeTableModelTabletsToTsFiles( + final List tabletList, + final String dataBase, + LinkedHashSet> linkedHashSet) + throws IOException { + + final Map> tableName2Tablets = new HashMap<>(); + + // Sort the tablets by dataBaseName + for (final Tablet tablet : tabletList) { + tableName2Tablets + .computeIfAbsent(tablet.getTableName(), k -> new ArrayList<>()) + .add((T) new Pair<>(tablet, WriteUtils.splitTabletByDevice(tablet))); + } + + // Replace ArrayList with LinkedList to improve performance + final LinkedHashSet> table2Tablets = new LinkedHashSet<>(); + + // Sort the tablets by start time in first device + for 
(final List tablets : tableName2Tablets.values()) { + tablets.sort( + (o1, o2) -> { + final IDeviceID deviceID = o1.right.get(0).left; + final int result; + if ((result = deviceID.compareTo(o2.right.get(0).left)) == 0) { + return Long.compare(o1.left.timestamps[0], o2.left.timestamps[0]); + } + return result; + }); + } + + // Sort the tables by table name + tableName2Tablets.entrySet().stream() + .sorted(Map.Entry.comparingByKey(Comparator.naturalOrder())) + .forEach(entry -> linkedHashSet.add(new LinkedList<>(entry.getValue()))); + + // Help GC + tableName2Tablets.clear(); + + final List> sealedFiles = new ArrayList<>(); + + // Try making the tsfile size as large as possible + while (!linkedHashSet.isEmpty()) { + if (Objects.isNull(fileWriter)) { + createFileWriter(); + } + + try { + tryBestToWriteTabletsIntoOneFile(linkedHashSet); + } catch (final Exception e) { + LOGGER.warn( + "Batch id = {}: Failed to write tablets into tsfile, because {}", + currentBatchId.get(), + e.getMessage(), + e); + + try { + fileWriter.close(); + } catch (final Exception closeException) { + LOGGER.warn( + "Batch id = {}: Failed to close the tsfile {} after failed to write tablets into, because {}", + currentBatchId.get(), + fileWriter.getIOWriter().getFile().getPath(), + closeException.getMessage(), + closeException); + } finally { + // Add current writing file to the list and delete the file + sealedFiles.add(new Pair<>(dataBase, fileWriter.getIOWriter().getFile())); + } + + for (final Pair sealedFile : sealedFiles) { + final boolean deleteSuccess = FileUtils.deleteQuietly(sealedFile.right); + LOGGER.warn( + "Batch id = {}: {} delete the tsfile {} after failed to write tablets into {}. {}", + currentBatchId.get(), + deleteSuccess ? "Successfully" : "Failed to", + sealedFile.right.getPath(), + fileWriter.getIOWriter().getFile().getPath(), + deleteSuccess ? 
"" : "Maybe the tsfile needs to be deleted manually."); + } + sealedFiles.clear(); + + fileWriter = null; + + throw e; + } + + fileWriter.close(); + final File sealedFile = fileWriter.getIOWriter().getFile(); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug( + "Batch id = {}: Seal tsfile {} successfully.", + currentBatchId.get(), + sealedFile.getPath()); + } + sealedFiles.add(new Pair<>(dataBase, sealedFile)); + fileWriter = null; + } + + return sealedFiles; + } + + private >>> + void tryBestToWriteTabletsIntoOneFile( + final LinkedHashSet> device2TabletsLinkedList) throws IOException { + final Iterator> iterator = device2TabletsLinkedList.iterator(); + + while (iterator.hasNext()) { + final LinkedList tablets = iterator.next(); + + final List tabletsToWrite = new ArrayList<>(); + final Map deviceLastTimestampMap = new HashMap<>(); + while (!tablets.isEmpty()) { + final T pair = tablets.peekFirst(); + if (timestampsAreNonOverlapping( + (Pair>>) pair, deviceLastTimestampMap)) { + tabletsToWrite.add(pair); + tablets.pollFirst(); + continue; + } + break; + } + + if (tablets.isEmpty()) { + iterator.remove(); + } + boolean schemaNotRegistered = true; + for (final Pair>> pair : tabletsToWrite) { + final Tablet tablet = pair.left; + if (schemaNotRegistered) { + fileWriter.registerTableSchema( + new TableSchema(tablet.getTableName(), tablet.getSchemas(), tablet.getColumnTypes())); + schemaNotRegistered = false; + } + try { + fileWriter.writeTable(tablet, pair.right); + } catch (WriteProcessException e) { + LOGGER.warn( + "Batch id = {}: Failed to build the table model TSFile. Please check whether the written Tablet has time overlap and whether the Table Schema is correct.", + currentBatchId.get(), + e); + throw new PipeException( + "The written Tablet time may overlap or the Schema may be incorrect"); + } + } + } + } + + /** + * A Map is used to record the maximum time each {@link IDeviceID} is written. 
{@link Pair} + * records the Index+1 of the maximum timestamp of IDevice in each {@link Tablet}. + * + * @return If false, the tablet overlaps with the previous tablet; if true, there is no time + * overlap. + */ + private >>> + boolean timestampsAreNonOverlapping( + final T tabletPair, final Map deviceLastTimestampMap) { + int currentTimestampIndex = 0; + for (Pair deviceTimestampIndexPair : tabletPair.right) { + final Long lastDeviceTimestamp = deviceLastTimestampMap.get(deviceTimestampIndexPair.left); + if (lastDeviceTimestamp != null + && lastDeviceTimestamp >= tabletPair.left.timestamps[currentTimestampIndex]) { + return false; + } + currentTimestampIndex = deviceTimestampIndexPair.right; + deviceLastTimestampMap.put( + deviceTimestampIndexPair.left, tabletPair.left.timestamps[currentTimestampIndex - 1]); + } + + return true; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTreeModelTsFileBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTreeModelTsFileBuilder.java new file mode 100644 index 000000000000..8d30dd0172f2 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTreeModelTsFileBuilder.java @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.connector.util.builder; + +import org.apache.commons.io.FileUtils; +import org.apache.tsfile.exception.write.WriteProcessException; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.read.common.Path; +import org.apache.tsfile.utils.Pair; +import org.apache.tsfile.write.record.Tablet; +import org.apache.tsfile.write.schema.IMeasurementSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicLong; + +public class PipeTreeModelTsFileBuilder extends PipeTsFileBuilder { + + private static final Logger LOGGER = LoggerFactory.getLogger(PipeTreeModelTsFileBuilder.class); + + private final List tabletList = new ArrayList<>(); + private final List isTabletAlignedList = new ArrayList<>(); + + public PipeTreeModelTsFileBuilder( + final AtomicLong currentBatchId, final AtomicLong tsFileIdGenerator) { + super(currentBatchId, tsFileIdGenerator); + } + + @Override + public void bufferTableModelTablet(final String dataBase, final Tablet tablet) { + throw new UnsupportedOperationException( + "PipeTreeModelTsFileBuilder does not support table model tablet to build TSFile"); + } + + @Override + public void bufferTreeModelTablet(final Tablet tablet, 
final Boolean isAligned) { + tabletList.add(tablet); + isTabletAlignedList.add(isAligned); + } + + @Override + public List> convertTabletToTsFileWithDBInfo() + throws IOException, WriteProcessException { + return writeTabletsToTsFiles(); + } + + @Override + public boolean isEmpty() { + return tabletList.isEmpty(); + } + + @Override + public void onSuccess() { + super.onSuccess(); + tabletList.clear(); + isTabletAlignedList.clear(); + } + + @Override + public synchronized void close() { + super.close(); + tabletList.clear(); + isTabletAlignedList.clear(); + } + + private List> writeTabletsToTsFiles() + throws IOException, WriteProcessException { + final Map> device2Tablets = new HashMap<>(); + final Map device2Aligned = new HashMap<>(); + + // Sort the tablets by device id + for (int i = 0, size = tabletList.size(); i < size; ++i) { + final Tablet tablet = tabletList.get(i); + final String deviceId = tablet.getDeviceId(); + device2Tablets.computeIfAbsent(deviceId, k -> new ArrayList<>()).add(tablet); + device2Aligned.put(deviceId, isTabletAlignedList.get(i)); + } + + // Sort the tablets by start time in each device + for (final List tablets : device2Tablets.values()) { + tablets.sort( + // Each tablet has at least one timestamp + Comparator.comparingLong(tablet -> tablet.timestamps[0])); + } + + // Sort the devices by device id + final List devices = new ArrayList<>(device2Tablets.keySet()); + devices.sort(Comparator.naturalOrder()); + + // Replace ArrayList with LinkedList to improve performance + final LinkedHashMap> device2TabletsLinkedList = + new LinkedHashMap<>(); + for (final String device : devices) { + device2TabletsLinkedList.put(device, new LinkedList<>(device2Tablets.get(device))); + } + + // Help GC + devices.clear(); + device2Tablets.clear(); + + // Write the tablets to the tsfile device by device, and the tablets + // in the same device are written in order of start time. 
Tablets in + // the same device should not be written if their time ranges overlap. + // If overlapped, we try to write the tablets whose device id is not + // the same as the previous one. For the tablets not written in the + // previous round, we write them in a new tsfile. + final List> sealedFiles = new ArrayList<>(); + + // Try making the tsfile size as large as possible + while (!device2TabletsLinkedList.isEmpty()) { + if (Objects.isNull(fileWriter)) { + createFileWriter(); + } + try { + tryBestToWriteTabletsIntoOneFile(device2TabletsLinkedList, device2Aligned); + } catch (final Exception e) { + LOGGER.warn( + "Batch id = {}: Failed to write tablets into tsfile, because {}", + currentBatchId.get(), + e.getMessage(), + e); + + try { + fileWriter.close(); + } catch (final Exception closeException) { + LOGGER.warn( + "Batch id = {}: Failed to close the tsfile {} after failed to write tablets into, because {}", + currentBatchId.get(), + fileWriter.getIOWriter().getFile().getPath(), + closeException.getMessage(), + closeException); + } finally { + // Add current writing file to the list and delete the file + sealedFiles.add(new Pair<>(null, fileWriter.getIOWriter().getFile())); + } + + for (final Pair sealedFile : sealedFiles) { + final boolean deleteSuccess = FileUtils.deleteQuietly(sealedFile.right); + LOGGER.warn( + "Batch id = {}: {} delete the tsfile {} after failed to write tablets into {}. {}", + currentBatchId.get(), + deleteSuccess ? "Successfully" : "Failed to", + sealedFile.right.getPath(), + fileWriter.getIOWriter().getFile().getPath(), + deleteSuccess ? 
"" : "Maybe the tsfile needs to be deleted manually."); + } + sealedFiles.clear(); + + fileWriter = null; + + throw e; + } + + fileWriter.close(); + final File sealedFile = fileWriter.getIOWriter().getFile(); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug( + "Batch id = {}: Seal tsfile {} successfully.", + currentBatchId.get(), + sealedFile.getPath()); + } + sealedFiles.add(new Pair<>(null, sealedFile)); + fileWriter = null; + } + + return sealedFiles; + } + + private void tryBestToWriteTabletsIntoOneFile( + final LinkedHashMap> device2TabletsLinkedList, + final Map device2Aligned) + throws IOException, WriteProcessException { + final Iterator>> iterator = + device2TabletsLinkedList.entrySet().iterator(); + + while (iterator.hasNext()) { + final Map.Entry> entry = iterator.next(); + final String deviceId = entry.getKey(); + final LinkedList tablets = entry.getValue(); + + final List tabletsToWrite = new ArrayList<>(); + + Tablet lastTablet = null; + while (!tablets.isEmpty()) { + final Tablet tablet = tablets.peekFirst(); + if (Objects.isNull(lastTablet) + // lastTablet.rowSize is not 0 + || lastTablet.timestamps[lastTablet.getRowSize() - 1] < tablet.timestamps[0]) { + tabletsToWrite.add(tablet); + lastTablet = tablet; + tablets.pollFirst(); + } else { + break; + } + } + + if (tablets.isEmpty()) { + iterator.remove(); + } + + final boolean isAligned = device2Aligned.get(deviceId); + if (isAligned) { + final Map> deviceId2MeasurementSchemas = new HashMap<>(); + tabletsToWrite.forEach( + tablet -> + deviceId2MeasurementSchemas.compute( + tablet.getDeviceId(), + (k, v) -> { + if (Objects.isNull(v)) { + return new ArrayList<>(tablet.getSchemas()); + } + v.addAll(tablet.getSchemas()); + return v; + })); + for (final Map.Entry> deviceIdWithMeasurementSchemas : + deviceId2MeasurementSchemas.entrySet()) { + fileWriter.registerAlignedTimeseries( + new Path(deviceIdWithMeasurementSchemas.getKey()), + deviceIdWithMeasurementSchemas.getValue()); + } + for (final Tablet tablet 
: tabletsToWrite) { + fileWriter.writeAligned(tablet); + } + } else { + for (final Tablet tablet : tabletsToWrite) { + for (final IMeasurementSchema schema : tablet.getSchemas()) { + try { + fileWriter.registerTimeseries( + IDeviceID.Factory.DEFAULT_FACTORY.create(tablet.getDeviceId()), schema); + } catch (final WriteProcessException ignore) { + // Do nothing if the timeSeries has been registered + } + } + + fileWriter.writeTree(tablet); + } + } + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTsFileBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTsFileBuilder.java new file mode 100644 index 000000000000..d320bff0d173 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTsFileBuilder.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.pipe.connector.util.builder; + +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.exception.DiskSpaceInsufficientException; +import org.apache.iotdb.db.storageengine.rescon.disk.FolderManager; +import org.apache.iotdb.db.storageengine.rescon.disk.strategy.DirectoryStrategyType; +import org.apache.iotdb.pipe.api.exception.PipeException; + +import org.apache.commons.io.FileUtils; +import org.apache.tsfile.common.constant.TsFileConstant; +import org.apache.tsfile.exception.write.WriteProcessException; +import org.apache.tsfile.utils.Pair; +import org.apache.tsfile.write.TsFileWriter; +import org.apache.tsfile.write.record.Tablet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; + +public abstract class PipeTsFileBuilder { + + private static final Logger LOGGER = LoggerFactory.getLogger(PipeTsFileBuilder.class); + + private static final AtomicReference FOLDER_MANAGER = new AtomicReference<>(); + protected final AtomicLong currentBatchId; + private final File batchFileBaseDir; + + private static final String TS_FILE_PREFIX = "tb"; // tb means tablet batch + private final AtomicLong tsFileIdGenerator; + + @SuppressWarnings("java:S3077") + protected volatile TsFileWriter fileWriter; + + public PipeTsFileBuilder(final AtomicLong currentBatchId, final AtomicLong tsFileIdGenerator) { + this.currentBatchId = currentBatchId; + this.tsFileIdGenerator = tsFileIdGenerator; + try { + this.batchFileBaseDir = getNextBaseDir(); + } catch (final Exception e) { + throw new PipeException( + String.format("Failed to create file dir for batch: %s", e.getMessage())); + } + } + + private File getNextBaseDir() throws DiskSpaceInsufficientException { + if 
(FOLDER_MANAGER.get() == null) { + synchronized (FOLDER_MANAGER) { + if (FOLDER_MANAGER.get() == null) { + FOLDER_MANAGER.set( + new FolderManager( + Arrays.stream(IoTDBDescriptor.getInstance().getConfig().getPipeReceiverFileDirs()) + .map(fileDir -> fileDir + File.separator + ".batch") + .collect(Collectors.toList()), + DirectoryStrategyType.SEQUENCE_STRATEGY)); + } + } + } + + final File baseDir = + new File(FOLDER_MANAGER.get().getNextFolder(), Long.toString(currentBatchId.get())); + if (baseDir.exists()) { + FileUtils.deleteQuietly(baseDir); + } + if (!baseDir.exists() && !baseDir.mkdirs()) { + LOGGER.warn( + "Batch id = {}: Failed to create batch file dir {}.", + currentBatchId.get(), + baseDir.getPath()); + throw new PipeException( + String.format( + "Failed to create batch file dir %s. (Batch id = %s)", + baseDir.getPath(), currentBatchId.get())); + } + LOGGER.info( + "Batch id = {}: Create batch dir successfully, batch file dir = {}.", + currentBatchId.get(), + baseDir.getPath()); + return baseDir; + } + + public abstract void bufferTableModelTablet(String dataBase, Tablet tablet); + + public abstract void bufferTreeModelTablet(Tablet tablet, Boolean isAligned); + + public abstract List> convertTabletToTsFileWithDBInfo() + throws IOException, WriteProcessException; + + public abstract boolean isEmpty(); + + public synchronized void onSuccess() { + fileWriter = null; + } + + public synchronized void close() { + if (Objects.nonNull(fileWriter)) { + try { + fileWriter.close(); + } catch (final Exception e) { + LOGGER.info( + "Batch id = {}: Failed to close the tsfile {} when trying to close batch, because {}", + currentBatchId.get(), + fileWriter.getIOWriter().getFile().getPath(), + e.getMessage(), + e); + } + + try { + FileUtils.delete(fileWriter.getIOWriter().getFile()); + } catch (final Exception e) { + LOGGER.info( + "Batch id = {}: Failed to delete the tsfile {} when trying to close batch, because {}", + currentBatchId.get(), + 
fileWriter.getIOWriter().getFile().getPath(), + e.getMessage(), + e); + } + + fileWriter = null; + } + } + + protected void createFileWriter() throws IOException { + fileWriter = + new TsFileWriter( + new File( + batchFileBaseDir, + TS_FILE_PREFIX + + "_" + + IoTDBDescriptor.getInstance().getConfig().getDataNodeId() + + "_" + + currentBatchId.get() + + "_" + + tsFileIdGenerator.getAndIncrement() + + TsFileConstant.TSFILE_SUFFIX)); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/LeaderCacheUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/cacher/LeaderCacheUtils.java similarity index 97% rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/LeaderCacheUtils.java rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/cacher/LeaderCacheUtils.java index e31fec9b57b6..0151a83a9cff 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/LeaderCacheUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/cacher/LeaderCacheUtils.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.iotdb.db.pipe.connector.util; +package org.apache.iotdb.db.pipe.connector.util.cacher; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TSStatus; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/sorter/PipeTableModelTabletEventSorter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/sorter/PipeTableModelTabletEventSorter.java new file mode 100644 index 000000000000..63ac5b847a1b --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/sorter/PipeTableModelTabletEventSorter.java @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.connector.util.sorter; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.utils.Pair; +import org.apache.tsfile.write.record.Tablet; +import org.apache.tsfile.write.schema.IMeasurementSchema; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class PipeTableModelTabletEventSorter { + + private final Tablet tablet; + + private Integer[] index; + private boolean isUnSorted = false; + private boolean hasDuplicates = false; + private int deduplicatedSize; + private int initIndexSize; + + public PipeTableModelTabletEventSorter(final Tablet tablet) { + this.tablet = tablet; + deduplicatedSize = tablet == null ? 0 : tablet.getRowSize(); + } + + /** + * For the sorting and deduplication needs of the table model tablet, it is done according to the + * {@link IDeviceID}. For sorting, it is necessary to sort the {@link IDeviceID} first, and then + * sort by time. 
Deduplication is to remove the same timestamp in the same {@link IDeviceID}, and + * the same timestamp in different {@link IDeviceID} will not be processed. + */ + public void sortAndDeduplicateByDevIdTimestamp() { + if (tablet == null || tablet.getRowSize() < 1) { + return; + } + + HashMap>> deviceIDToIndexMap = new HashMap<>(); + final long[] timestamps = tablet.timestamps; + + IDeviceID lastDevice = tablet.getDeviceID(0); + long previousTimestamp = tablet.timestamps[0]; + int lasIndex = 0; + for (int i = 1, size = tablet.getRowSize(); i < size; ++i) { + final IDeviceID deviceID = tablet.getDeviceID(i); + final long currentTimestamp = timestamps[i]; + final int deviceComparison = deviceID.compareTo(lastDevice); + if (deviceComparison == 0) { + if (previousTimestamp == currentTimestamp) { + hasDuplicates = true; + continue; + } + if (previousTimestamp > currentTimestamp) { + isUnSorted = true; + } + previousTimestamp = currentTimestamp; + continue; + } + if (deviceComparison < 0) { + isUnSorted = true; + } + + List> list = + deviceIDToIndexMap.computeIfAbsent(lastDevice, k -> new ArrayList<>()); + + if (!list.isEmpty()) { + isUnSorted = true; + } + list.add(new Pair<>(lasIndex, i)); + lastDevice = deviceID; + lasIndex = i; + previousTimestamp = currentTimestamp; + } + + List> list = + deviceIDToIndexMap.computeIfAbsent(lastDevice, k -> new ArrayList<>()); + if (!list.isEmpty()) { + isUnSorted = true; + } + list.add(new Pair<>(lasIndex, tablet.getRowSize())); + + if (!isUnSorted && !hasDuplicates) { + return; + } + + initIndexSize = 0; + deduplicatedSize = 0; + index = new Integer[tablet.getRowSize()]; + deviceIDToIndexMap.entrySet().stream() + .sorted(Map.Entry.comparingByKey()) + .forEach( + entry -> { + final int start = initIndexSize; + int i = initIndexSize; + for (Pair pair : entry.getValue()) { + for (int j = pair.left; j < pair.right; j++) { + index[i++] = j; + } + } + if (isUnSorted) { + sortTimestamps(start, i); + deduplicateTimestamps(start, i); + 
initIndexSize = i; + return; + } + + if (hasDuplicates) { + deduplicateTimestamps(start, i); + } + initIndexSize = i; + }); + + sortAndDeduplicateValuesAndBitMaps(); + } + + private void sortAndDeduplicateValuesAndBitMaps() { + int columnIndex = 0; + tablet.timestamps = + (long[]) + PipeTabletEventSorter.reorderValueList( + deduplicatedSize, tablet.timestamps, TSDataType.TIMESTAMP, index); + for (int i = 0, size = tablet.getSchemas().size(); i < size; i++) { + final IMeasurementSchema schema = tablet.getSchemas().get(i); + if (schema != null) { + tablet.values[columnIndex] = + PipeTabletEventSorter.reorderValueList( + deduplicatedSize, tablet.values[columnIndex], schema.getType(), index); + if (tablet.bitMaps != null && tablet.bitMaps[columnIndex] != null) { + tablet.bitMaps[columnIndex] = + PipeTabletEventSorter.reorderBitMap( + deduplicatedSize, tablet.bitMaps[columnIndex], index); + } + columnIndex++; + } + } + + tablet.setRowSize(deduplicatedSize); + } + + private void sortTimestamps(final int startIndex, final int endIndex) { + Arrays.sort( + this.index, startIndex, endIndex, Comparator.comparingLong(i -> tablet.timestamps[i])); + } + + private void deduplicateTimestamps(final int startIndex, final int endIndex) { + long lastTime = tablet.timestamps[index[startIndex]]; + index[deduplicatedSize++] = index[startIndex]; + for (int i = startIndex + 1; i < endIndex; i++) { + if (lastTime != (lastTime = tablet.timestamps[index[i]])) { + index[deduplicatedSize++] = index[i]; + } + } + } + + /** Sort by time only, and remove only rows with the same DeviceID and time. 
*/ + public void sortAndDeduplicateByTimestampIfNecessary() { + if (tablet == null || tablet.getRowSize() == 0) { + return; + } + + for (int i = 1, size = tablet.getRowSize(); i < size; ++i) { + final long currentTimestamp = tablet.timestamps[i]; + final long previousTimestamp = tablet.timestamps[i - 1]; + + if (currentTimestamp < previousTimestamp) { + isUnSorted = true; + break; + } + if (currentTimestamp == previousTimestamp) { + hasDuplicates = true; + } + } + + if (!isUnSorted && !hasDuplicates) { + return; + } + + index = new Integer[tablet.getRowSize()]; + for (int i = 0, size = tablet.getRowSize(); i < size; i++) { + index[i] = i; + } + + if (isUnSorted) { + sortTimestamps(); + + // Do deduplicate anyway. + // isDeduplicated may be false positive when isUnSorted is true. + deduplicateTimestamps(); + hasDuplicates = false; + } + + if (hasDuplicates) { + deduplicateTimestamps(); + } + + sortAndDeduplicateValuesAndBitMapsIgnoreTimestamp(); + } + + private void sortTimestamps() { + Arrays.sort(this.index, Comparator.comparingLong(i -> tablet.timestamps[i])); + Arrays.sort(tablet.timestamps, 0, tablet.getRowSize()); + } + + private void deduplicateTimestamps() { + deduplicatedSize = 1; + long lastTime = tablet.timestamps[0]; + IDeviceID deviceID = tablet.getDeviceID(index[0]); + final Set deviceIDSet = new HashSet<>(); + deviceIDSet.add(deviceID); + for (int i = 1, size = tablet.getRowSize(); i < size; i++) { + deviceID = tablet.getDeviceID(index[i]); + if ((lastTime == (lastTime = tablet.timestamps[i]))) { + if (!deviceIDSet.contains(deviceID)) { + tablet.timestamps[deduplicatedSize] = lastTime; + index[deduplicatedSize++] = index[i]; + deviceIDSet.add(deviceID); + } + } else { + tablet.timestamps[deduplicatedSize] = lastTime; + index[deduplicatedSize++] = index[i]; + deviceIDSet.clear(); + deviceIDSet.add(deviceID); + } + } + } + + private void sortAndDeduplicateValuesAndBitMapsIgnoreTimestamp() { + int columnIndex = 0; + for (int i = 0, size = 
tablet.getSchemas().size(); i < size; i++) { + final IMeasurementSchema schema = tablet.getSchemas().get(i); + if (schema != null) { + tablet.values[columnIndex] = + PipeTabletEventSorter.reorderValueList( + deduplicatedSize, tablet.values[columnIndex], schema.getType(), index); + if (tablet.bitMaps != null && tablet.bitMaps[columnIndex] != null) { + tablet.bitMaps[columnIndex] = + PipeTabletEventSorter.reorderBitMap( + deduplicatedSize, tablet.bitMaps[columnIndex], index); + } + columnIndex++; + } + } + + tablet.setRowSize(deduplicatedSize); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/PipeTabletEventSorter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/sorter/PipeTabletEventSorter.java similarity index 56% rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/PipeTabletEventSorter.java rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/sorter/PipeTabletEventSorter.java index ee34099eb216..f3756b2a46c3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/PipeTabletEventSorter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/sorter/PipeTabletEventSorter.java @@ -17,116 +17,19 @@ * under the License. 
*/ -package org.apache.iotdb.db.pipe.connector.util; +package org.apache.iotdb.db.pipe.connector.util.sorter; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.utils.Binary; import org.apache.tsfile.utils.BitMap; import org.apache.tsfile.write.UnSupportedDataTypeException; -import org.apache.tsfile.write.record.Tablet; -import org.apache.tsfile.write.schema.IMeasurementSchema; import java.time.LocalDate; -import java.util.Arrays; -import java.util.Comparator; public class PipeTabletEventSorter { - private final Tablet tablet; - - private boolean isSorted = true; - private boolean isDeduplicated = true; - - private Integer[] index; - private int deduplicatedSize; - - public PipeTabletEventSorter(final Tablet tablet) { - this.tablet = tablet; - deduplicatedSize = tablet == null ? 0 : tablet.getRowSize(); - } - - public void deduplicateAndSortTimestampsIfNecessary() { - if (tablet == null || tablet.getRowSize() == 0) { - return; - } - - for (int i = 1, size = tablet.getRowSize(); i < size; ++i) { - final long currentTimestamp = tablet.timestamps[i]; - final long previousTimestamp = tablet.timestamps[i - 1]; - - if (currentTimestamp < previousTimestamp) { - isSorted = false; - } - if (currentTimestamp == previousTimestamp) { - isDeduplicated = false; - } - - if (!isSorted && !isDeduplicated) { - break; - } - } - - if (isSorted && isDeduplicated) { - return; - } - - index = new Integer[tablet.getRowSize()]; - for (int i = 0, size = tablet.getRowSize(); i < size; i++) { - index[i] = i; - } - - if (!isSorted) { - sortTimestamps(); - - // Do deduplicate anyway. - // isDeduplicated may be false positive when isSorted is false. 
- deduplicateTimestamps(); - isDeduplicated = true; - } - - if (!isDeduplicated) { - deduplicateTimestamps(); - } - - sortAndDeduplicateValuesAndBitMaps(); - } - - private void sortTimestamps() { - Arrays.sort(index, Comparator.comparingLong(i -> tablet.timestamps[i])); - Arrays.sort(tablet.timestamps, 0, tablet.getRowSize()); - } - - private void deduplicateTimestamps() { - deduplicatedSize = 1; - for (int i = 1, size = tablet.getRowSize(); i < size; i++) { - if (tablet.timestamps[i] != tablet.timestamps[i - 1]) { - index[deduplicatedSize] = index[i]; - tablet.timestamps[deduplicatedSize] = tablet.timestamps[i]; - - ++deduplicatedSize; - } - } - tablet.setRowSize(deduplicatedSize); - } - - private void sortAndDeduplicateValuesAndBitMaps() { - int columnIndex = 0; - for (int i = 0, size = tablet.getSchemas().size(); i < size; i++) { - final IMeasurementSchema schema = tablet.getSchemas().get(i); - if (schema != null) { - tablet.values[columnIndex] = - reorderValueList(deduplicatedSize, tablet.values[columnIndex], schema.getType(), index); - if (tablet.bitMaps != null && tablet.bitMaps[columnIndex] != null) { - tablet.bitMaps[columnIndex] = - reorderBitMap(deduplicatedSize, tablet.bitMaps[columnIndex], index); - } - columnIndex++; - } - } - } - - private static Object reorderValueList( - int deduplicatedSize, + public static Object reorderValueList( + final int deduplicatedSize, final Object valueList, final TSDataType dataType, final Integer[] index) { @@ -189,8 +92,8 @@ private static Object reorderValueList( } } - private static BitMap reorderBitMap( - int deduplicatedSize, final BitMap bitMap, final Integer[] index) { + public static BitMap reorderBitMap( + final int deduplicatedSize, final BitMap bitMap, final Integer[] index) { final BitMap deduplicatedBitMap = new BitMap(bitMap.getSize()); for (int i = 0; i < deduplicatedSize; i++) { if (bitMap.isMarked(index[i])) { diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/sorter/PipeTreeModelTabletEventSorter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/sorter/PipeTreeModelTabletEventSorter.java new file mode 100644 index 000000000000..42f8c2dac52c --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/sorter/PipeTreeModelTabletEventSorter.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.connector.util.sorter; + +import org.apache.tsfile.write.record.Tablet; +import org.apache.tsfile.write.schema.IMeasurementSchema; + +import java.util.Arrays; +import java.util.Comparator; + +public class PipeTreeModelTabletEventSorter { + + private final Tablet tablet; + + private boolean isSorted = true; + private boolean isDeduplicated = true; + + private Integer[] index; + private int deduplicatedSize; + + public PipeTreeModelTabletEventSorter(final Tablet tablet) { + this.tablet = tablet; + deduplicatedSize = tablet == null ? 
0 : tablet.getRowSize(); + } + + public void deduplicateAndSortTimestampsIfNecessary() { + if (tablet == null || tablet.getRowSize() == 0) { + return; + } + + for (int i = 1, size = tablet.getRowSize(); i < size; ++i) { + final long currentTimestamp = tablet.timestamps[i]; + final long previousTimestamp = tablet.timestamps[i - 1]; + + if (currentTimestamp < previousTimestamp) { + isSorted = false; + break; + } + if (currentTimestamp == previousTimestamp) { + isDeduplicated = false; + } + } + + if (isSorted && isDeduplicated) { + return; + } + + index = new Integer[tablet.getRowSize()]; + for (int i = 0, size = tablet.getRowSize(); i < size; i++) { + index[i] = i; + } + + if (!isSorted) { + sortTimestamps(); + + // Do deduplicate anyway. + // isDeduplicated may be false positive when isSorted is false. + deduplicateTimestamps(); + isDeduplicated = true; + } + + if (!isDeduplicated) { + deduplicateTimestamps(); + } + + sortAndDeduplicateValuesAndBitMaps(); + } + + private void sortTimestamps() { + Arrays.sort(index, Comparator.comparingLong(i -> tablet.timestamps[i])); + Arrays.sort(tablet.timestamps, 0, tablet.getRowSize()); + } + + private void deduplicateTimestamps() { + deduplicatedSize = 1; + for (int i = 1, size = tablet.getRowSize(); i < size; i++) { + if (tablet.timestamps[i] != tablet.timestamps[i - 1]) { + index[deduplicatedSize] = index[i]; + tablet.timestamps[deduplicatedSize] = tablet.timestamps[i]; + + ++deduplicatedSize; + } + } + tablet.setRowSize(deduplicatedSize); + } + + private void sortAndDeduplicateValuesAndBitMaps() { + int columnIndex = 0; + for (int i = 0, size = tablet.getSchemas().size(); i < size; i++) { + final IMeasurementSchema schema = tablet.getSchemas().get(i); + if (schema != null) { + tablet.values[columnIndex] = + PipeTabletEventSorter.reorderValueList( + deduplicatedSize, tablet.values[columnIndex], schema.getType(), index); + if (tablet.bitMaps != null && tablet.bitMaps[columnIndex] != null) { + tablet.bitMaps[columnIndex] = + 
PipeTabletEventSorter.reorderBitMap( + deduplicatedSize, tablet.bitMaps[columnIndex], index); + } + columnIndex++; + } + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java index 514795db23a3..2781350db5d6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java @@ -28,6 +28,7 @@ import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext; +import org.apache.tsfile.utils.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -95,14 +96,16 @@ protected List generateSubscriptionEvents() throws Exception } final List events = new ArrayList<>(); - final List tsFiles = batch.sealTsFiles(); - final AtomicInteger referenceCount = new AtomicInteger(tsFiles.size()); - for (final File tsFile : tsFiles) { + final List> dbTsFilePairs = batch.sealTsFiles(); + final AtomicInteger referenceCount = new AtomicInteger(dbTsFilePairs.size()); + for (final Pair tsFile : dbTsFilePairs) { final SubscriptionCommitContext commitContext = prefetchingQueue.generateSubscriptionCommitContext(); events.add( new SubscriptionEvent( - new SubscriptionPipeTsFileBatchEvents(this, referenceCount), tsFile, commitContext)); + new SubscriptionPipeTsFileBatchEvents(this, referenceCount), + tsFile.right, + commitContext)); } return events; } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeTabletEventSorterTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeTabletEventSorterTest.java index a1cbaa0d03a1..e6f2d44a589f 100644 --- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeTabletEventSorterTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeTabletEventSorterTest.java @@ -19,17 +19,24 @@ package org.apache.iotdb.db.pipe.connector; -import org.apache.iotdb.db.pipe.connector.util.PipeTabletEventSorter; +import org.apache.iotdb.db.pipe.connector.util.sorter.PipeTableModelTabletEventSorter; +import org.apache.iotdb.db.pipe.connector.util.sorter.PipeTreeModelTabletEventSorter; import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.utils.DateUtils; import org.apache.tsfile.write.record.Tablet; import org.apache.tsfile.write.schema.IMeasurementSchema; import org.apache.tsfile.write.schema.MeasurementSchema; import org.junit.Assert; import org.junit.Test; +import java.nio.charset.StandardCharsets; +import java.text.SimpleDateFormat; +import java.time.LocalDate; import java.util.ArrayList; import java.util.Arrays; +import java.util.Date; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -46,7 +53,7 @@ private static boolean checkSorted(final Tablet tablet) { } @Test - public void testDeduplicateAndSort() { + public void testTreeModelDeduplicateAndSort() { List schemaList = new ArrayList<>(); schemaList.add(new MeasurementSchema("s1", TSDataType.INT64)); schemaList.add(new MeasurementSchema("s2", TSDataType.INT64)); @@ -82,7 +89,7 @@ public void testDeduplicateAndSort() { Assert.assertFalse(checkSorted(tablet)); - new PipeTabletEventSorter(tablet).deduplicateAndSortTimestampsIfNecessary(); + new PipeTreeModelTabletEventSorter(tablet).deduplicateAndSortTimestampsIfNecessary(); Assert.assertTrue(checkSorted(tablet)); @@ -103,7 +110,7 @@ public void testDeduplicateAndSort() { } @Test - public void testDeduplicate() { + public void testTreeModelDeduplicate() { List schemaList = new ArrayList<>(); schemaList.add(new MeasurementSchema("s1", TSDataType.INT64)); 
schemaList.add(new MeasurementSchema("s2", TSDataType.INT64)); @@ -127,7 +134,7 @@ public void testDeduplicate() { Assert.assertTrue(checkSorted(tablet)); - new PipeTabletEventSorter(tablet).deduplicateAndSortTimestampsIfNecessary(); + new PipeTreeModelTabletEventSorter(tablet).deduplicateAndSortTimestampsIfNecessary(); Assert.assertTrue(checkSorted(tablet)); @@ -148,7 +155,7 @@ public void testDeduplicate() { } @Test - public void testSort() { + public void testTreeModelSort() { List schemaList = new ArrayList<>(); schemaList.add(new MeasurementSchema("s1", TSDataType.INT64)); schemaList.add(new MeasurementSchema("s2", TSDataType.INT64)); @@ -196,7 +203,7 @@ public void testSort() { } } - new PipeTabletEventSorter(tablet).deduplicateAndSortTimestampsIfNecessary(); + new PipeTreeModelTabletEventSorter(tablet).deduplicateAndSortTimestampsIfNecessary(); Assert.assertTrue(checkSorted(tablet)); @@ -215,4 +222,193 @@ public void testSort() { } } } + + @Test + public void testTableModelDeduplicateAndSort() { + doTableModelTest(true, true); + } + + @Test + public void testTableModelDeduplicate() { + doTableModelTest(true, false); + } + + @Test + public void testTableModelSort() { + doTableModelTest(false, true); + } + + @Test + public void testTableModelDeduplicateAndSort1() { + doTableModelTest1(true, true); + } + + @Test + public void testTableModelDeduplicate1() { + doTableModelTest1(true, false); + } + + @Test + public void testTableModelSort1() { + doTableModelTest1(false, true); + } + + public void doTableModelTest(final boolean hasDuplicates, final boolean isUnSorted) { + final Tablet tablet = generateTablet("test", 10, hasDuplicates, isUnSorted); + new PipeTableModelTabletEventSorter(tablet).sortAndDeduplicateByDevIdTimestamp(); + for (int i = 1; i < tablet.getRowSize(); i++) { + long time = tablet.timestamps[i]; + Assert.assertTrue(time > tablet.timestamps[i - 1]); + Assert.assertEquals( + tablet.getValue(i, 0), + new Binary(String.valueOf(i / 
100).getBytes(StandardCharsets.UTF_8))); + Assert.assertEquals(tablet.getValue(i, 1), (long) i); + Assert.assertEquals(tablet.getValue(i, 2), i * 1.0f); + Assert.assertEquals( + tablet.getValue(i, 3), new Binary(String.valueOf(i).getBytes(StandardCharsets.UTF_8))); + Assert.assertEquals(tablet.getValue(i, 4), (long) i); + Assert.assertEquals(tablet.getValue(i, 5), i); + Assert.assertEquals(tablet.getValue(i, 6), i * 0.1); + Assert.assertEquals(tablet.getValue(i, 7), getDate(i)); + Assert.assertEquals( + tablet.getValue(i, 8), new Binary(String.valueOf(i).getBytes(StandardCharsets.UTF_8))); + } + } + + public void doTableModelTest1(final boolean hasDuplicates, final boolean isUnSorted) { + final Tablet tablet = generateTablet("test", 10, hasDuplicates, isUnSorted); + new PipeTableModelTabletEventSorter(tablet).sortAndDeduplicateByTimestampIfNecessary(); + for (int i = 1; i < tablet.getRowSize(); i++) { + long time = tablet.timestamps[i]; + Assert.assertTrue(time > tablet.timestamps[i - 1]); + Assert.assertEquals( + tablet.getValue(i, 0), + new Binary(String.valueOf(i / 100).getBytes(StandardCharsets.UTF_8))); + Assert.assertEquals(tablet.getValue(i, 1), (long) i); + Assert.assertEquals(tablet.getValue(i, 2), i * 1.0f); + Assert.assertEquals( + tablet.getValue(i, 3), new Binary(String.valueOf(i).getBytes(StandardCharsets.UTF_8))); + Assert.assertEquals(tablet.getValue(i, 4), (long) i); + Assert.assertEquals(tablet.getValue(i, 5), i); + Assert.assertEquals(tablet.getValue(i, 6), i * 0.1); + Assert.assertEquals(tablet.getValue(i, 7), getDate(i)); + Assert.assertEquals( + tablet.getValue(i, 8), new Binary(String.valueOf(i).getBytes(StandardCharsets.UTF_8))); + } + } + + private Tablet generateTablet( + final String tableName, + final int deviceIDNum, + final boolean hasDuplicates, + final boolean isUnSorted) { + final List schemaList = new ArrayList<>(); + schemaList.add(new MeasurementSchema("s0", TSDataType.STRING)); + schemaList.add(new MeasurementSchema("s1", 
TSDataType.INT64)); + schemaList.add(new MeasurementSchema("s2", TSDataType.FLOAT)); + schemaList.add(new MeasurementSchema("s3", TSDataType.STRING)); + schemaList.add(new MeasurementSchema("s4", TSDataType.TIMESTAMP)); + schemaList.add(new MeasurementSchema("s5", TSDataType.INT32)); + schemaList.add(new MeasurementSchema("s6", TSDataType.DOUBLE)); + schemaList.add(new MeasurementSchema("s7", TSDataType.DATE)); + schemaList.add(new MeasurementSchema("s8", TSDataType.TEXT)); + + final List columnTypes = + Arrays.asList( + Tablet.ColumnCategory.TAG, + Tablet.ColumnCategory.FIELD, + Tablet.ColumnCategory.FIELD, + Tablet.ColumnCategory.FIELD, + Tablet.ColumnCategory.FIELD, + Tablet.ColumnCategory.FIELD, + Tablet.ColumnCategory.FIELD, + Tablet.ColumnCategory.FIELD, + Tablet.ColumnCategory.FIELD); + Tablet tablet = + new Tablet( + tableName, + IMeasurementSchema.getMeasurementNameList(schemaList), + IMeasurementSchema.getDataTypeList(schemaList), + columnTypes, + deviceIDNum * 1000); + tablet.initBitMaps(); + + // s2 float, s3 string, s4 timestamp, s5 int32, s6 double, s7 date, s8 text + int rowIndex = 0; + + for (long row = 0; row < deviceIDNum; row++) { + for (int i = 0; i < (isUnSorted ? 
50 : 100); i++) { + + final long value; + if (isUnSorted) { + value = (row + 1) * 100 - i - 1; + } else { + value = (row) * 100 + i; + } + for (int j = 0; j < 10; j++) { + tablet.addTimestamp(rowIndex, value); + tablet.addValue( + "s0", rowIndex, new Binary(String.valueOf(row).getBytes(StandardCharsets.UTF_8))); + tablet.addValue("s1", rowIndex, value); + tablet.addValue("s2", rowIndex, (value * 1.0f)); + tablet.addValue( + "s3", rowIndex, new Binary(String.valueOf(value).getBytes(StandardCharsets.UTF_8))); + tablet.addValue("s4", rowIndex, value); + tablet.addValue("s5", rowIndex, (int) value); + tablet.addValue("s6", rowIndex, value * 0.1); + tablet.addValue("s7", rowIndex, getDate((int) value)); + tablet.addValue( + "s8", rowIndex, new Binary(String.valueOf(value).getBytes(StandardCharsets.UTF_8))); + rowIndex++; + tablet.setRowSize(rowIndex); + if (!hasDuplicates) { + break; + } + } + } + } + if (!isUnSorted) { + return tablet; + } + for (long row = 0; row < deviceIDNum; row++) { + for (int i = 50; i < 100; i++) { + + final long value; + value = (row + 1) * 100 - i - 1; + + for (int j = 0; j < 10; j++) { + tablet.addTimestamp(rowIndex, value); + tablet.addValue( + "s0", rowIndex, new Binary(String.valueOf(row).getBytes(StandardCharsets.UTF_8))); + tablet.addValue("s1", rowIndex, value); + tablet.addValue("s2", rowIndex, (value * 1.0f)); + tablet.addValue( + "s3", rowIndex, new Binary(String.valueOf(value).getBytes(StandardCharsets.UTF_8))); + tablet.addValue("s4", rowIndex, value); + tablet.addValue("s5", rowIndex, (int) value); + tablet.addValue("s6", rowIndex, value * 0.1); + tablet.addValue("s7", rowIndex, getDate((int) value)); + tablet.addValue( + "s8", rowIndex, new Binary(String.valueOf(value).getBytes(StandardCharsets.UTF_8))); + rowIndex++; + tablet.setRowSize(rowIndex); + if (!hasDuplicates) { + break; + } + } + } + } + return tablet; + } + + public LocalDate getDate(final int value) { + Date date = new Date(value); + SimpleDateFormat dateFormat = new 
SimpleDateFormat("yyyy-MM-dd"); + try { + return DateUtils.parseIntToLocalDate( + DateUtils.parseDateExpressionToInt(dateFormat.format(date))); + } catch (Exception e) { + return DateUtils.parseIntToLocalDate(DateUtils.parseDateExpressionToInt("1970-01-01")); + } + } }