Skip to content

Commit

Permalink
Pipe IT: Add table model testPipeAfterDataRegionLeaderStop retry logi…
Browse files Browse the repository at this point in the history
…c and extend IoTDBPipeTypeConversionIT retry time (#14628)
  • Loading branch information
luoluoyuyu authored Jan 6, 2025
1 parent 2af3d02 commit 95451e4
Show file tree
Hide file tree
Showing 2 changed files with 154 additions and 138 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ private void executeAndVerifyTypeConversion(TSDataType source, TSDataType target
String.format("select status from root.test.%s2%s", source.name(), target.name()),
String.format("Time,root.test.%s2%s.status,", source.name(), target.name()),
createExpectedResultSet(pairs, source, target),
30);
600);
}

private List<Pair> prepareTypeConversionTest(TSDataType sourceType, TSDataType targetType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,156 +173,172 @@ private void testWithAllParameters(final String realtimeMode) throws Exception {
}
}

// This function has a certain probability of triggering replica asynchrony. To ensure the success
// of the test, it will be retried 5 times. The exception will be thrown after five retries.
@Test
public void testPipeAfterDataRegionLeaderStop() throws Exception {
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);

final String receiverIp = receiverDataNode.getIp();
final int receiverPort = receiverDataNode.getPort();
final Consumer<String> handleFailure =
o -> {
TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
TestUtils.executeNonQueryWithRetry(receiverEnv, "flush");
};

boolean insertResult = true;

try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
final Map<String, String> extractorAttributes = new HashMap<>();
final Map<String, String> processorAttributes = new HashMap<>();
final Map<String, String> connectorAttributes = new HashMap<>();
TableModelUtils.createDataBaseAndTable(senderEnv, "test", "test");
TableModelUtils.createDataBaseAndTable(senderEnv, "test1", "test1");
insertResult = TableModelUtils.insertData("test", "test", 0, 100, senderEnv);
insertResult =
insertResult && TableModelUtils.insertData("test1", "test1", 0, 100, senderEnv);
if (!insertResult) {
return;
}

extractorAttributes.put("extractor", "iotdb-extractor");
extractorAttributes.put("database-name", "test");
extractorAttributes.put("capture.table", "true");
extractorAttributes.put("table-name", "test");
extractorAttributes.put("start-time", "0");
extractorAttributes.put("end-time", "300");

connectorAttributes.put("connector", "iotdb-thrift-connector");
connectorAttributes.put("connector.batch.enable", "false");
connectorAttributes.put("connector.ip", receiverIp);
connectorAttributes.put("connector.port", Integer.toString(receiverPort));

final TSStatus status =
client.createPipe(
new TCreatePipeReq("p1", connectorAttributes)
.setExtractorAttributes(extractorAttributes)
.setProcessorAttributes(processorAttributes));

Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());

insertResult = TableModelUtils.insertData("test", "test", 100, 200, senderEnv);
insertResult =
insertResult && TableModelUtils.insertData("test1", "test1", 100, 200, senderEnv);
if (!insertResult) {
return;
}

final AtomicInteger leaderPort = new AtomicInteger(-1);
final TShowRegionResp showRegionResp =
client.showRegion(new TShowRegionReq().setIsTableModel(true));
showRegionResp
.getRegionInfoList()
.forEach(
regionInfo -> {
if (RegionRoleType.Leader.getRoleType().equals(regionInfo.getRoleType())) {
leaderPort.set(regionInfo.getClientRpcPort());
}
});

int leaderIndex = -1;
for (int i = 0; i < 3; ++i) {
if (senderEnv.getDataNodeWrapper(i).getPort() == leaderPort.get()) {
leaderIndex = i;
try {
senderEnv.shutdownDataNode(i);
} catch (final Throwable e) {
e.printStackTrace();
for (int retry = 0; retry < 5; retry++) {
try {
if (retry != 0) {
this.setUp();
}
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);

final String receiverIp = receiverDataNode.getIp();
final int receiverPort = receiverDataNode.getPort();
final Consumer<String> handleFailure =
o -> {
TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
TestUtils.executeNonQueryWithRetry(receiverEnv, "flush");
};

boolean insertResult = true;

try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
final Map<String, String> extractorAttributes = new HashMap<>();
final Map<String, String> processorAttributes = new HashMap<>();
final Map<String, String> connectorAttributes = new HashMap<>();
TableModelUtils.createDataBaseAndTable(senderEnv, "test", "test");
TableModelUtils.createDataBaseAndTable(senderEnv, "test1", "test1");
insertResult = TableModelUtils.insertData("test", "test", 0, 100, senderEnv);
insertResult =
insertResult && TableModelUtils.insertData("test1", "test1", 0, 100, senderEnv);
if (!insertResult) {
return;
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (final InterruptedException ignored) {
}
try {
senderEnv.startDataNode(i);
((AbstractEnv) senderEnv).checkClusterStatusWithoutUnknown();
} catch (final Throwable e) {
e.printStackTrace();

extractorAttributes.put("extractor", "iotdb-extractor");
extractorAttributes.put("database-name", "test");
extractorAttributes.put("capture.table", "true");
extractorAttributes.put("table-name", "test");
extractorAttributes.put("start-time", "0");
extractorAttributes.put("end-time", "300");

connectorAttributes.put("connector", "iotdb-thrift-connector");
connectorAttributes.put("connector.batch.enable", "false");
connectorAttributes.put("connector.ip", receiverIp);
connectorAttributes.put("connector.port", Integer.toString(receiverPort));

final TSStatus status =
client.createPipe(
new TCreatePipeReq("p1", connectorAttributes)
.setExtractorAttributes(extractorAttributes)
.setProcessorAttributes(processorAttributes));

Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());

insertResult = TableModelUtils.insertData("test", "test", 100, 200, senderEnv);
insertResult =
insertResult && TableModelUtils.insertData("test1", "test1", 100, 200, senderEnv);
if (!insertResult) {
return;
}
}
}
if (leaderIndex == -1) { // ensure the leader is stopped
fail();
}

insertResult = TableModelUtils.insertData("test", "test", 200, 300, senderEnv);
insertResult =
insertResult && TableModelUtils.insertData("test1", "test1", 200, 300, senderEnv);
if (!insertResult) {
return;
}

TableModelUtils.assertData("test", "test", 0, 300, receiverEnv, handleFailure);
}

try {
TestUtils.restartCluster(senderEnv);
TestUtils.restartCluster(receiverEnv);
} catch (final Throwable e) {
e.printStackTrace();
return;
}

try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
// Create a new pipe and write new data
final Map<String, String> extractorAttributes = new HashMap<>();
final Map<String, String> processorAttributes = new HashMap<>();
final Map<String, String> connectorAttributes = new HashMap<>();

extractorAttributes.put("database-name", "test1");
extractorAttributes.put("capture.table", "true");
extractorAttributes.put("table-name", "test1");
extractorAttributes.put("start-time", "0");
extractorAttributes.put("end-time", "300");
final AtomicInteger leaderPort = new AtomicInteger(-1);
final TShowRegionResp showRegionResp =
client.showRegion(new TShowRegionReq().setIsTableModel(true));
showRegionResp
.getRegionInfoList()
.forEach(
regionInfo -> {
if (RegionRoleType.Leader.getRoleType().equals(regionInfo.getRoleType())) {
leaderPort.set(regionInfo.getClientRpcPort());
}
});

int leaderIndex = -1;
for (int i = 0; i < 3; ++i) {
if (senderEnv.getDataNodeWrapper(i).getPort() == leaderPort.get()) {
leaderIndex = i;
try {
senderEnv.shutdownDataNode(i);
} catch (final Throwable e) {
e.printStackTrace();
return;
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (final InterruptedException ignored) {
}
try {
senderEnv.startDataNode(i);
((AbstractEnv) senderEnv).checkClusterStatusWithoutUnknown();
} catch (final Throwable e) {
e.printStackTrace();
return;
}
}
}
if (leaderIndex == -1) { // ensure the leader is stopped
fail();
}

connectorAttributes.put("connector", "iotdb-thrift-connector");
connectorAttributes.put("connector.batch.enable", "false");
connectorAttributes.put("connector.ip", receiverIp);
connectorAttributes.put("connector.port", Integer.toString(receiverPort));
insertResult = TableModelUtils.insertData("test", "test", 200, 300, senderEnv);
insertResult =
insertResult && TableModelUtils.insertData("test1", "test1", 200, 300, senderEnv);
if (!insertResult) {
return;
}

final TSStatus status =
client.createPipe(
new TCreatePipeReq("p2", connectorAttributes)
.setExtractorAttributes(extractorAttributes)
.setProcessorAttributes(processorAttributes));
TableModelUtils.assertData("test", "test", 0, 300, receiverEnv, handleFailure);
}

Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p2").getCode());
try {
TestUtils.restartCluster(senderEnv);
TestUtils.restartCluster(receiverEnv);
} catch (final Throwable e) {
e.printStackTrace();
return;
}

insertResult = TableModelUtils.insertData("test", "test", 300, 400, senderEnv);
insertResult =
insertResult && TableModelUtils.insertData("test1", "test1", 300, 400, senderEnv);
if (!insertResult) {
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
// Create a new pipe and write new data
final Map<String, String> extractorAttributes = new HashMap<>();
final Map<String, String> processorAttributes = new HashMap<>();
final Map<String, String> connectorAttributes = new HashMap<>();

extractorAttributes.put("database-name", "test1");
extractorAttributes.put("capture.table", "true");
extractorAttributes.put("table-name", "test1");
extractorAttributes.put("start-time", "0");
extractorAttributes.put("end-time", "300");

connectorAttributes.put("connector", "iotdb-thrift-connector");
connectorAttributes.put("connector.batch.enable", "false");
connectorAttributes.put("connector.ip", receiverIp);
connectorAttributes.put("connector.port", Integer.toString(receiverPort));

final TSStatus status =
client.createPipe(
new TCreatePipeReq("p2", connectorAttributes)
.setExtractorAttributes(extractorAttributes)
.setProcessorAttributes(processorAttributes));

Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p2").getCode());

insertResult = TableModelUtils.insertData("test", "test", 300, 400, senderEnv);
insertResult =
insertResult && TableModelUtils.insertData("test1", "test1", 300, 400, senderEnv);
if (!insertResult) {
return;
}
TableModelUtils.assertData("test", "test", 0, 301, receiverEnv, handleFailure);
TableModelUtils.assertData("test1", "test1", 0, 301, receiverEnv, handleFailure);
}
return;
} catch (Exception | Error e) {
if (retry < 4) {
this.tearDown();
} else {
throw e;
}
}
TableModelUtils.assertData("test", "test", 0, 301, receiverEnv, handleFailure);
TableModelUtils.assertData("test1", "test1", 0, 301, receiverEnv, handleFailure);
}
}

Expand Down

0 comments on commit 95451e4

Please sign in to comment.