Skip to content

Commit

Permalink
Pipe: Fix PipeWriteBackSink using toTPipeTransferReq causing NPE due …
Browse files Browse the repository at this point in the history
…to uninitialized buffer (#14672)
  • Loading branch information
luoluoyuyu authored Jan 14, 2025
1 parent f2e20b5 commit f129298
Show file tree
Hide file tree
Showing 4 changed files with 186 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,6 @@ private void testSinkFormat(final String format) throws Exception {
final Map<String, String> processorAttributes = new HashMap<>();
final Map<String, String> connectorAttributes = new HashMap<>();

extractorAttributes.put("extractor.realtime.mode", "forced-log");
extractorAttributes.put("capture.table", "true");
extractorAttributes.put("capture.tree", "true");

Expand Down Expand Up @@ -242,44 +241,22 @@ private void testSinkFormat(final String format) throws Exception {

@Test
public void testWriteBackSink() throws Exception {
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);

final String receiverIp = receiverDataNode.getIp();
final int receiverPort = receiverDataNode.getPort();
final Consumer<String> handleFailure =
o -> {
TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
TestUtils.executeNonQueryWithRetry(receiverEnv, "flush");
};

try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {

TableModelUtils.createDataBaseAndTable(senderEnv, "test", "test");
TableModelUtils.insertData("test", "test", 0, 50, senderEnv, true);

if (!TestUtils.tryExecuteNonQueriesWithRetry(
senderEnv,
Arrays.asList("insert into root.vehicle.d0(time, s1) values (0, 1)", "flush"))) {
return;
}

final Map<String, String> extractorAttributes = new HashMap<>();
final Map<String, String> processorAttributes = new HashMap<>();
final Map<String, String> connectorAttributes = new HashMap<>();

extractorAttributes.put("capture.table", "true");
extractorAttributes.put("capture.tree", "true");
extractorAttributes.put("forwarding-pipe-requests", "false");
extractorAttributes.put("extractor.database-name", "test.*");
extractorAttributes.put("extractor.table-name", "test.*");

processorAttributes.put("processor", "rename-database-processor");
processorAttributes.put("processor.new-db-name", "test1");

connectorAttributes.put("connector", "iotdb-thrift-connector");
connectorAttributes.put("connector.batch.enable", "true");
connectorAttributes.put("connector.ip", receiverIp);
connectorAttributes.put("connector.port", Integer.toString(receiverPort));
connectorAttributes.put("connector.realtime-first", "false");
connectorAttributes.put("connector", "write-back-sink");

final TSStatus status =
client.createPipe(
Expand All @@ -292,15 +269,20 @@ public void testWriteBackSink() throws Exception {
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("testPipe").getCode());

TableModelUtils.insertData("test", "test", 50, 100, senderEnv, true);

if (!TestUtils.tryExecuteNonQueriesWithRetry(
senderEnv,
Arrays.asList("insert into root.vehicle.d0(time, s1) values (1, 1)", "flush"))) {
return;
}

TableModelUtils.assertCountData("test1", "test", 100, receiverEnv, handleFailure);
TableModelUtils.createDataBaseAndTable(senderEnv, "test", "test");
TableModelUtils.insertDataNotThrowError("test", "test", 0, 20, senderEnv);

TableModelUtils.insertTablet("test", "test", 20, 200, senderEnv, true);

TableModelUtils.insertTablet("test", "test", 200, 400, senderEnv, true);

TableModelUtils.assertCountData("test1", "test", 400, senderEnv);
}
}

Expand Down Expand Up @@ -382,7 +364,6 @@ private void doTest(BiConsumer<Map<String, List<Tablet>>, Map<String, List<Table
final Map<String, String> processorAttributes = new HashMap<>();
final Map<String, String> connectorAttributes = new HashMap<>();

extractorAttributes.put("extractor.realtime.mode", "forced-log");
extractorAttributes.put("capture.table", "true");
extractorAttributes.put("capture.tree", "true");
extractorAttributes.put("extractor.database-name", "test.*");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@
package org.apache.iotdb.db.pipe.connector.protocol.writeback;

import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBinaryReqV2;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletInsertNodeReqV2;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReqV2;
Expand All @@ -30,7 +33,18 @@
import org.apache.iotdb.db.protocol.session.InternalClientSession;
import org.apache.iotdb.db.protocol.session.SessionManager;
import org.apache.iotdb.db.queryengine.plan.Coordinator;
import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher;
import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult;
import org.apache.iotdb.db.queryengine.plan.execution.config.executor.ClusterConfigTaskExecutor;
import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.relational.CreateDBTask;
import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.queryengine.plan.relational.sql.parser.SqlParser;
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.annotation.TableModel;
Expand All @@ -44,11 +58,20 @@
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;

import com.google.common.util.concurrent.ListenableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.time.ZoneId;
import java.util.Locale;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;

import static org.apache.iotdb.db.exception.metadata.DatabaseNotSetException.DATABASE_NOT_SET;
import static org.apache.iotdb.db.utils.ErrorHandlingUtils.getRootCause;

@TreeModel
@TableModel
Expand All @@ -64,6 +87,10 @@ public class WriteBackConnector implements PipeConnector {

private static final String TREE_MODEL_DATABASE_NAME_IDENTIFIER = null;

private static final SqlParser RELATIONAL_SQL_PARSER = new SqlParser();

private static final Set<String> ALREADY_CREATED_DATABASES = ConcurrentHashMap.newKeySet();

@Override
public void validate(final PipeParameterValidator validator) throws Exception {
// Do nothing
Expand All @@ -82,7 +109,11 @@ public void customize(
environment.getPipeName(),
environment.getCreationTime(),
environment.getRegionId()));
SESSION_MANAGER.registerSession(session);

// Fill in the necessary information. Incomplete information will result in NPE.
session.setUsername(AuthorityChecker.SUPER_USER);
session.setClientVersion(IoTDBConstant.ClientVersion.V_1_0);
session.setZoneId(ZoneId.systemDefault());
}

@Override
Expand Down Expand Up @@ -141,17 +172,25 @@ private void doTransfer(
? pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName()
: TREE_MODEL_DATABASE_NAME_IDENTIFIER;

final InsertBaseStatement insertBaseStatement;
if (Objects.isNull(insertNode)) {
insertBaseStatement =
PipeTransferTabletBinaryReqV2.toTPipeTransferReq(
pipeInsertNodeTabletInsertionEvent.getByteBuffer(), dataBaseName)
.constructStatement();
} else {
insertBaseStatement =
PipeTransferTabletInsertNodeReqV2.toTabletInsertNodeReq(insertNode, dataBaseName)
.constructStatement();
}

final TSStatus status =
PipeDataNodeAgent.receiver()
.thrift()
.receive(
Objects.isNull(insertNode)
? PipeTransferTabletBinaryReqV2.toTPipeTransferReq(
pipeInsertNodeTabletInsertionEvent.getByteBuffer(), dataBaseName)
: PipeTransferTabletInsertNodeReqV2.toTabletInsertNodeReq(
insertNode, dataBaseName))
.getStatus();
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
insertBaseStatement.isWriteToTable()
? executeStatementForTableModel(insertBaseStatement, dataBaseName)
: executeStatementForTreeModel(insertBaseStatement);

if (status.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()
&& status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
throw new PipeException(
String.format(
"Write back PipeInsertNodeTabletInsertionEvent %s error, result status %s",
Expand All @@ -174,18 +213,25 @@ private void doTransferWrapper(final PipeRawTabletInsertionEvent pipeRawTabletIn

private void doTransfer(final PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent)
throws PipeException {
final String dataBaseName =
pipeRawTabletInsertionEvent.isTableModelEvent()
? pipeRawTabletInsertionEvent.getTableModelDatabaseName()
: TREE_MODEL_DATABASE_NAME_IDENTIFIER;

final InsertTabletStatement insertTabletStatement =
PipeTransferTabletRawReqV2.toTPipeTransferRawReq(
pipeRawTabletInsertionEvent.convertToTablet(),
pipeRawTabletInsertionEvent.isAligned(),
dataBaseName)
.constructStatement();

final TSStatus status =
PipeDataNodeAgent.receiver()
.thrift()
.receive(
PipeTransferTabletRawReqV2.toTPipeTransferRawReq(
pipeRawTabletInsertionEvent.convertToTablet(),
pipeRawTabletInsertionEvent.isAligned(),
pipeRawTabletInsertionEvent.isTableModelEvent()
? pipeRawTabletInsertionEvent.getTableModelDatabaseName()
: TREE_MODEL_DATABASE_NAME_IDENTIFIER))
.getStatus();
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
insertTabletStatement.isWriteToTable()
? executeStatementForTableModel(insertTabletStatement, dataBaseName)
: executeStatementForTreeModel(insertTabletStatement);

if (status.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()
&& status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
throw new PipeException(
String.format(
"Write back PipeRawTabletInsertionEvent %s error, result status %s",
Expand All @@ -203,6 +249,105 @@ public void close() throws Exception {
if (session != null) {
SESSION_MANAGER.closeSession(session, COORDINATOR::cleanupQueryExecution);
}
SESSION_MANAGER.removeCurrSession();
}

private TSStatus executeStatementForTableModel(Statement statement, String dataBaseName) {
session.setDatabaseName(dataBaseName);
session.setSqlDialect(IClientSession.SqlDialect.TABLE);
SESSION_MANAGER.registerSession(session);
try {
autoCreateDatabaseIfNecessary(dataBaseName);
return Coordinator.getInstance()
.executeForTableModel(
new PipeEnrichedStatement(statement),
RELATIONAL_SQL_PARSER,
session,
SESSION_MANAGER.requestQueryId(),
SESSION_MANAGER.getSessionInfoOfPipeReceiver(session, dataBaseName),
"",
LocalExecutionPlanner.getInstance().metadata,
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold())
.status;
} catch (final Exception e) {
ALREADY_CREATED_DATABASES.remove(dataBaseName);

final Throwable rootCause = getRootCause(e);
if (rootCause.getMessage() != null
&& rootCause
.getMessage()
.toLowerCase(Locale.ENGLISH)
.contains(DATABASE_NOT_SET.toLowerCase(Locale.ENGLISH))) {
autoCreateDatabaseIfNecessary(dataBaseName);

// Retry after creating the database
session.setDatabaseName(dataBaseName);
return Coordinator.getInstance()
.executeForTableModel(
new PipeEnrichedStatement(statement),
RELATIONAL_SQL_PARSER,
session,
SESSION_MANAGER.requestQueryId(),
SESSION_MANAGER.getSessionInfo(session),
"",
LocalExecutionPlanner.getInstance().metadata,
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold())
.status;
}

// If the exception is not caused by database not set, throw it directly
throw e;
} finally {
SESSION_MANAGER.removeCurrSession();
}
}

private void autoCreateDatabaseIfNecessary(final String database) {
if (ALREADY_CREATED_DATABASES.contains(database)) {
return;
}

final TDatabaseSchema schema = new TDatabaseSchema(new TDatabaseSchema(database));
schema.setIsTableModel(true);

final CreateDBTask task = new CreateDBTask(schema, true);
try {
final ListenableFuture<ConfigTaskResult> future =
task.execute(ClusterConfigTaskExecutor.getInstance());
final ConfigTaskResult result = future.get();
if (result.getStatusCode().getStatusCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
throw new PipeException(
String.format(
"Auto create database failed: %s, status code: %s",
database, result.getStatusCode()));
}
} catch (final ExecutionException | InterruptedException e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
throw new PipeException("Auto create database failed because: " + e.getMessage());
}

ALREADY_CREATED_DATABASES.add(database);
}

private TSStatus executeStatementForTreeModel(final Statement statement) {
session.setDatabaseName(null);
session.setSqlDialect(IClientSession.SqlDialect.TREE);
SESSION_MANAGER.registerSession(session);
try {
return Coordinator.getInstance()
.executeForTreeModel(
new PipeEnrichedStatement(statement),
SESSION_MANAGER.requestQueryId(),
SESSION_MANAGER.getSessionInfo(session),
"",
ClusterPartitionFetcher.getInstance(),
ClusterSchemaFetcher.getInstance(),
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold(),
false)
.status;
} finally {
SESSION_MANAGER.removeCurrSession();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public abstract class PipeInsertionEvent extends EnrichedEvent {

private Boolean isTableModelEvent; // lazy initialization

private final String treeModelDatabaseName;
private String treeModelDatabaseName;
private String tableModelDatabaseName; // lazy initialization

protected PipeInsertionEvent(
Expand Down Expand Up @@ -103,6 +103,9 @@ public String getTableModelDatabaseName() {
}

public void renameTableModelDatabase(final String tableModelDatabaseName) {
// Please note that if you parse TsFile, you need to use TreeModelDatabaseName, so you need to
// rename TreeModelDatabaseName as well.
this.tableModelDatabaseName = tableModelDatabaseName;
this.treeModelDatabaseName = "root." + tableModelDatabaseName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -859,7 +859,8 @@ private TSStatus executeStatementWithPermissionCheckAndRetryOnDataTypeMismatch(
return shouldConvertDataTypeOnTypeMismatch
&& ((statement instanceof InsertBaseStatement
&& ((InsertBaseStatement) statement).hasFailedMeasurements())
|| status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode())
|| (status.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()
&& status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()))
? (isTableModelStatement
? statement
.accept(
Expand Down

0 comments on commit f129298

Please sign in to comment.