+  Iterable<TabletInsertionEvent> toTabletInsertionEvents();
+}
+```
+
+### 自定义流处理插件编程接口定义
+
+基于自定义流处理插件编程接口,用户可以轻松编写数据抽取插件、数据处理插件和数据发送插件,从而使得流处理功能灵活适配各种工业场景。
+
+#### 数据抽取插件接口
+
+数据抽取是流处理数据从数据抽取到数据发送三阶段的第一阶段。数据抽取插件(PipeExtractor)是流处理引擎和存储引擎的桥梁,它通过监听存储引擎的行为,
+捕获各种数据写入事件。
+
+```java
+/**
+ * PipeExtractor
+ *
+ * PipeExtractor is responsible for capturing events from sources.
+ *
+ * Various data sources can be supported by implementing different PipeExtractor classes.
+ *
+ * The lifecycle of a PipeExtractor is as follows:
+ *
+ * - When a collaboration task is created, the KV pairs of `WITH EXTRACTOR` clause in SQL are
+ *   parsed and the validation method {@link PipeExtractor#validate(PipeParameterValidator)}
+ *   will be called to validate the parameters.
+ * - Before the collaboration task starts, the method {@link
+ *   PipeExtractor#customize(PipeParameters, PipeExtractorRuntimeConfiguration)} will be called
+ *   to config the runtime behavior of the PipeExtractor.
+ * - Then the method {@link PipeExtractor#start()} will be called to start the PipeExtractor.
+ * - While the collaboration task is in progress, the method {@link PipeExtractor#supply()} will
+ *   be called to capture events from sources and then the events will be passed to the
+ *   PipeProcessor.
+ * - The method {@link PipeExtractor#close()} will be called when the collaboration task is
+ *   cancelled (the `DROP PIPE` command is executed).
+ */
+public interface PipeExtractor extends PipePlugin {
+
+ /**
+ * This method is mainly used to validate {@link PipeParameters} and it is executed before {@link
+ * PipeExtractor#customize(PipeParameters, PipeExtractorRuntimeConfiguration)} is called.
+ *
+ * @param validator the validator used to validate {@link PipeParameters}
+ * @throws Exception if any parameter is not valid
+ */
+ void validate(PipeParameterValidator validator) throws Exception;
+
+ /**
+ * This method is mainly used to customize PipeExtractor. In this method, the user can do the
+ * following things:
+ *
+ *
+ * - Use PipeParameters to parse key-value pair attributes entered by the user.
+ * - Set the running configurations in PipeExtractorRuntimeConfiguration.
+ *
+ * This method is called after the method {@link
+ * PipeExtractor#validate(PipeParameterValidator)} is called.
+ *
+ * @param parameters used to parse the input parameters entered by the user
+ * @param configuration used to set the required properties of the running PipeExtractor
+ * @throws Exception the user can throw errors if necessary
+ */
+ void customize(PipeParameters parameters, PipeExtractorRuntimeConfiguration configuration)
+ throws Exception;
+
+ /**
+ * Start the extractor. After this method is called, events should be ready to be supplied by
+ * {@link PipeExtractor#supply()}. This method is called after {@link
+ * PipeExtractor#customize(PipeParameters, PipeExtractorRuntimeConfiguration)} is called.
+ *
+ * @throws Exception the user can throw errors if necessary
+ */
+ void start() throws Exception;
+
+ /**
+ * Supply single event from the extractor and the caller will send the event to the processor.
+ * This method is called after {@link PipeExtractor#start()} is called.
+ *
+ * @return the event to be supplied. the event may be null if the extractor has no more events at
+ * the moment, but the extractor is still running for more events.
+ * @throws Exception the user can throw errors if necessary
+ */
+ Event supply() throws Exception;
+}
+```
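+
+下面给出一个基于上述接口的自定义数据抽取插件的最小实现草图,仅用于说明各生命周期方法的职责划分,并非系统内置实现;其中 QueueBackedExtractor 类名、eventQueue 队列以及"事件由外部线程产生"的方式均为示例假设,PipeExtractor 等 pipe-api 相关类型的 import 语句以实际版本的包路径为准,此处从略。
+
+```java
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+// 示例草图:一个由内存队列驱动的 PipeExtractor(非系统内置插件)
+// 注:PipeExtractor、PipeParameters 等类型来自 IoTDB 的 pipe-api,import 以实际版本为准
+public class QueueBackedExtractor implements PipeExtractor {
+
+  // 示例假设:外部数据源线程在 start() 之后向该队列写入事件
+  private final BlockingQueue<Event> eventQueue = new LinkedBlockingQueue<>();
+
+  @Override
+  public void validate(PipeParameterValidator validator) throws Exception {
+    // 本示例不依赖任何用户参数,因此无需校验
+  }
+
+  @Override
+  public void customize(PipeParameters parameters, PipeExtractorRuntimeConfiguration configuration)
+      throws Exception {
+    // 可在此解析用户传入的 KV 属性并设置运行时配置,本示例从略
+  }
+
+  @Override
+  public void start() throws Exception {
+    // 在此启动对数据源的监听,例如启动线程把捕获到的写入事件放入 eventQueue
+  }
+
+  @Override
+  public Event supply() throws Exception {
+    // 返回 null 表示当前暂无新事件,但抽取器仍在运行
+    return eventQueue.poll();
+  }
+
+  // close() 继承自 PipePlugin(对应上文 javadoc 中的 PipeExtractor#close())
+  @Override
+  public void close() throws Exception {
+    // 在此释放数据源相关资源
+  }
+}
+```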
+
+#### 数据处理插件接口
+
+数据处理是流处理数据从数据抽取到数据发送三阶段的第二阶段。数据处理插件(PipeProcessor)主要用于过滤和转换由数据抽取插件(PipeExtractor)捕获的
+各种事件。
+
+```java
+/**
+ * PipeProcessor
+ *
+ * PipeProcessor is used to filter and transform the Event formed by the PipeExtractor.
+ *
+ * The lifecycle of a PipeProcessor is as follows:
+ *
+ * - When a collaboration task is created, the KV pairs of `WITH PROCESSOR` clause in SQL are
+ *   parsed and the validation method {@link PipeProcessor#validate(PipeParameterValidator)}
+ *   will be called to validate the parameters.
+ * - Before the collaboration task starts, the method {@link
+ *   PipeProcessor#customize(PipeParameters, PipeProcessorRuntimeConfiguration)} will be called
+ *   to config the runtime behavior of the PipeProcessor.
+ * - While the collaboration task is in progress:
+ *   - PipeExtractor captures the events and wraps them into three types of Event instances.
+ *   - PipeProcessor processes the events and then passes them to the PipeConnector. The
+ *     following 3 methods will be called: {@link
+ *     PipeProcessor#process(TabletInsertionEvent, EventCollector)}, {@link
+ *     PipeProcessor#process(TsFileInsertionEvent, EventCollector)} and {@link
+ *     PipeProcessor#process(Event, EventCollector)}.
+ *   - PipeConnector serializes the events into binaries and sends them to sinks.
+ * - When the collaboration task is cancelled (the `DROP PIPE` command is executed), the {@link
+ *   PipeProcessor#close()} method will be called.
+ */
+public interface PipeProcessor extends PipePlugin {
+
+ /**
+ * This method is mainly used to validate {@link PipeParameters} and it is executed before {@link
+ * PipeProcessor#customize(PipeParameters, PipeProcessorRuntimeConfiguration)} is called.
+ *
+ * @param validator the validator used to validate {@link PipeParameters}
+ * @throws Exception if any parameter is not valid
+ */
+ void validate(PipeParameterValidator validator) throws Exception;
+
+ /**
+ * This method is mainly used to customize PipeProcessor. In this method, the user can do the
+ * following things:
+ *
+ *
+ * - Use PipeParameters to parse key-value pair attributes entered by the user.
+ * - Set the running configurations in PipeProcessorRuntimeConfiguration.
+ *
+ * This method is called after the method {@link
+ * PipeProcessor#validate(PipeParameterValidator)} is called and before the beginning of the
+ * events processing.
+ *
+ * @param parameters used to parse the input parameters entered by the user
+ * @param configuration used to set the required properties of the running PipeProcessor
+ * @throws Exception the user can throw errors if necessary
+ */
+ void customize(PipeParameters parameters, PipeProcessorRuntimeConfiguration configuration)
+ throws Exception;
+
+ /**
+ * This method is called to process the TabletInsertionEvent.
+ *
+ * @param tabletInsertionEvent TabletInsertionEvent to be processed
+ * @param eventCollector used to collect result events after processing
+ * @throws Exception the user can throw errors if necessary
+ */
+ void process(TabletInsertionEvent tabletInsertionEvent, EventCollector eventCollector)
+ throws Exception;
+
+ /**
+ * This method is called to process the TsFileInsertionEvent.
+ *
+ * @param tsFileInsertionEvent TsFileInsertionEvent to be processed
+ * @param eventCollector used to collect result events after processing
+ * @throws Exception the user can throw errors if necessary
+ */
+ default void process(TsFileInsertionEvent tsFileInsertionEvent, EventCollector eventCollector)
+ throws Exception {
+ for (final TabletInsertionEvent tabletInsertionEvent :
+ tsFileInsertionEvent.toTabletInsertionEvents()) {
+ process(tabletInsertionEvent, eventCollector);
+ }
+ }
+
+ /**
+ * This method is called to process the Event.
+ *
+ * @param event Event to be processed
+ * @param eventCollector used to collect result events after processing
+ * @throws Exception the user can throw errors if necessary
+ */
+ void process(Event event, EventCollector eventCollector) throws Exception;
+}
+```
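+
+下面给出一个自定义数据处理插件的最小实现草图,其作用是把收到的事件原样转发给下游,仅用于说明接口的使用方式,并非系统内置的 do-nothing-processor 实现;其中假设 EventCollector 提供 collect(Event) 方法用于收集结果事件,具体方法签名及 pipe-api 各类型的包路径以实际版本为准。
+
+```java
+// 示例草图:一个把事件原样转发的 PipeProcessor(非系统内置插件)
+// 注:PipeProcessor、EventCollector 等类型来自 IoTDB 的 pipe-api,import 以实际版本为准
+public class ForwardingProcessor implements PipeProcessor {
+
+  @Override
+  public void validate(PipeParameterValidator validator) throws Exception {
+    // 本示例不依赖任何用户参数,因此无需校验
+  }
+
+  @Override
+  public void customize(PipeParameters parameters, PipeProcessorRuntimeConfiguration configuration)
+      throws Exception {
+    // 本示例无需额外的运行时配置
+  }
+
+  @Override
+  public void process(TabletInsertionEvent tabletInsertionEvent, EventCollector eventCollector)
+      throws Exception {
+    // 示例假设:EventCollector#collect 用于把结果事件交给下游
+    eventCollector.collect(tabletInsertionEvent);
+  }
+
+  // process(TsFileInsertionEvent, EventCollector) 直接使用接口中的默认实现:
+  // 将 TsFileInsertionEvent 拆分为若干 TabletInsertionEvent 后逐个处理
+
+  @Override
+  public void process(Event event, EventCollector eventCollector) throws Exception {
+    eventCollector.collect(event);
+  }
+
+  @Override
+  public void close() throws Exception {
+    // 无资源需要释放
+  }
+}
+```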
+
+#### 数据发送插件接口
+
+数据发送是流处理数据从数据抽取到数据发送三阶段的第三阶段。数据发送插件(PipeConnector)主要用于发送经由数据处理插件(PipeProcessor)处理过后的
+各种事件,它作为流处理框架的网络实现层,接口上应允许接入多种实时通信协议和多种连接器。
+
+```java
+/**
+ * PipeConnector
+ *
+ * PipeConnector is responsible for sending events to sinks.
+ *
+ * Various network protocols can be supported by implementing different PipeConnector classes.
+ *
+ * The lifecycle of a PipeConnector is as follows:
+ *
+ * - When a collaboration task is created, the KV pairs of `WITH CONNECTOR` clause in SQL are
+ *   parsed and the validation method {@link PipeConnector#validate(PipeParameterValidator)}
+ *   will be called to validate the parameters.
+ * - Before the collaboration task starts, the method {@link
+ *   PipeConnector#customize(PipeParameters, PipeConnectorRuntimeConfiguration)} will be called
+ *   to config the runtime behavior of the PipeConnector and the method {@link
+ *   PipeConnector#handshake()} will be called to create a connection with the sink.
+ * - While the collaboration task is in progress:
+ *   - PipeExtractor captures the events and wraps them into three types of Event instances.
+ *   - PipeProcessor processes the events and then passes them to the PipeConnector.
+ *   - PipeConnector serializes the events into binaries and sends them to sinks. The
+ *     following 3 methods will be called: {@link
+ *     PipeConnector#transfer(TabletInsertionEvent)}, {@link
+ *     PipeConnector#transfer(TsFileInsertionEvent)} and {@link
+ *     PipeConnector#transfer(Event)}.
+ * - When the collaboration task is cancelled (the `DROP PIPE` command is executed), the {@link
+ *   PipeConnector#close()} method will be called.
+ *
+ * In addition, the method {@link PipeConnector#heartbeat()} will be called periodically to check
+ * whether the connection with sink is still alive. The method {@link PipeConnector#handshake()}
+ * will be called to create a new connection with the sink when the method {@link
+ * PipeConnector#heartbeat()} throws exceptions.
+ */
+public interface PipeConnector extends PipePlugin {
+
+ /**
+ * This method is mainly used to validate {@link PipeParameters} and it is executed before {@link
+ * PipeConnector#customize(PipeParameters, PipeConnectorRuntimeConfiguration)} is called.
+ *
+ * @param validator the validator used to validate {@link PipeParameters}
+ * @throws Exception if any parameter is not valid
+ */
+ void validate(PipeParameterValidator validator) throws Exception;
+
+ /**
+ * This method is mainly used to customize PipeConnector. In this method, the user can do the
+ * following things:
+ *
+ *
+ * - Use PipeParameters to parse key-value pair attributes entered by the user.
+ * - Set the running configurations in PipeConnectorRuntimeConfiguration.
+ *
+ * This method is called after the method {@link
+ * PipeConnector#validate(PipeParameterValidator)} is called and before the method {@link
+ * PipeConnector#handshake()} is called.
+ *
+ * @param parameters used to parse the input parameters entered by the user
+ * @param configuration used to set the required properties of the running PipeConnector
+ * @throws Exception the user can throw errors if necessary
+ */
+ void customize(PipeParameters parameters, PipeConnectorRuntimeConfiguration configuration)
+ throws Exception;
+
+ /**
+ * This method is used to create a connection with sink. This method will be called after the
+ * method {@link PipeConnector#customize(PipeParameters, PipeConnectorRuntimeConfiguration)} is
+ * called or will be called when the method {@link PipeConnector#heartbeat()} throws exceptions.
+ *
+ * @throws Exception if the connection is failed to be created
+ */
+ void handshake() throws Exception;
+
+ /**
+ * This method will be called periodically to check whether the connection with sink is still
+ * alive.
+ *
+ * @throws Exception if the connection dies
+ */
+ void heartbeat() throws Exception;
+
+ /**
+ * This method is used to transfer the TabletInsertionEvent.
+ *
+ * @param tabletInsertionEvent TabletInsertionEvent to be transferred
+ * @throws PipeConnectionException if the connection is broken
+ * @throws Exception the user can throw errors if necessary
+ */
+ void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception;
+
+ /**
+ * This method is used to transfer the TsFileInsertionEvent.
+ *
+ * @param tsFileInsertionEvent TsFileInsertionEvent to be transferred
+ * @throws PipeConnectionException if the connection is broken
+ * @throws Exception the user can throw errors if necessary
+ */
+ default void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws Exception {
+ for (final TabletInsertionEvent tabletInsertionEvent :
+ tsFileInsertionEvent.toTabletInsertionEvents()) {
+ transfer(tabletInsertionEvent);
+ }
+ }
+
+ /**
+ * This method is used to transfer the Event.
+ *
+ * @param event Event to be transferred
+ * @throws PipeConnectionException if the connection is broken
+ * @throws Exception the user can throw errors if necessary
+ */
+ void transfer(Event event) throws Exception;
+}
+```
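+
+下面给出一个自定义数据发送插件的最小实现草图,其作用是把事件以文本形式打印到标准输出,仅用于说明 handshake、heartbeat、transfer 等方法的职责划分,并非系统内置实现;StdoutConnector 类名为示例假设,pipe-api 各类型的包路径以实际版本为准。
+
+```java
+// 示例草图:一个把事件打印到标准输出的 PipeConnector(非系统内置插件)
+// 注:PipeConnector、Event 等类型来自 IoTDB 的 pipe-api,import 以实际版本为准
+public class StdoutConnector implements PipeConnector {
+
+  @Override
+  public void validate(PipeParameterValidator validator) throws Exception {
+    // 本示例不依赖任何用户参数,因此无需校验
+  }
+
+  @Override
+  public void customize(PipeParameters parameters, PipeConnectorRuntimeConfiguration configuration)
+      throws Exception {
+    // 本示例无需额外的运行时配置
+  }
+
+  @Override
+  public void handshake() throws Exception {
+    // 标准输出无需真正建立连接;面向真实网络协议时,应在此建立并校验与 sink 的连接
+  }
+
+  @Override
+  public void heartbeat() throws Exception {
+    // 面向真实网络协议时,应在此探测连接是否存活,连接失效时抛出异常以触发重新 handshake
+  }
+
+  @Override
+  public void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception {
+    System.out.println("transfer tablet insertion event: " + tabletInsertionEvent);
+  }
+
+  // transfer(TsFileInsertionEvent) 直接使用接口中的默认实现:
+  // 将 TsFileInsertionEvent 拆分为若干 TabletInsertionEvent 后逐个发送
+
+  @Override
+  public void transfer(Event event) throws Exception {
+    System.out.println("transfer event: " + event);
+  }
+
+  @Override
+  public void close() throws Exception {
+    // 无连接资源需要释放
+  }
+}
+```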
+
+## 自定义流处理插件管理
+
+为了保证用户自定义插件在实际生产中的灵活性和易用性,系统还需要提供对插件进行动态统一管理的能力。
+本章节介绍的流处理插件管理语句提供了对插件进行动态统一管理的入口。
+
+### 加载插件语句
+
+在 IoTDB 中,若要在系统中动态载入一个用户自定义插件,则首先需要基于 PipeExtractor、PipeProcessor 或者 PipeConnector 实现一个具体的插件类,
+然后将插件类编译打包成 jar 文件,最后使用加载插件的管理语句将插件载入 IoTDB。
+
+加载插件的管理语句的语法如下所示。
+
+```sql
+CREATE PIPEPLUGIN <别名>
+AS <全类名>
+USING URI '<JAR 包的 URI>'
+```
+
+例如,用户实现了一个全类名为 edu.tsinghua.iotdb.pipe.ExampleProcessor 的数据处理插件,
+打包后的 jar 资源包存放到了 https://example.com:8080/iotdb/pipe-plugin.jar 上,用户希望在流处理引擎中使用这个插件,
+将插件标记为 example。那么,这个数据处理插件的创建语句如下所示。
+
+```sql
+CREATE PIPEPLUGIN example
+AS 'edu.tsinghua.iotdb.pipe.ExampleProcessor'
+USING URI 'https://example.com:8080/iotdb/pipe-plugin.jar'
+```
+
+### 删除插件语句
+
+当用户不再想使用一个插件,需要将插件从系统中卸载时,可以使用如下所示的删除插件语句。
+
+```sql
+DROP PIPEPLUGIN <别名>
+```
+
+### 查看插件语句
+
+用户也可以按需查看系统中的插件。查看插件的语句如下所示。
+
+```sql
+SHOW PIPEPLUGINS
+```
+
+## 系统预置的流处理插件
+
+### 预置 extractor 插件
+
+#### iotdb-extractor
+
+作用:抽取 IoTDB 内部的历史或实时数据进入 pipe。
+
+
+| key | value | value 取值范围 | required or optional with default |
+| ---------------------------- | ------------------------------------------------ | -------------------------------------- | --------------------------------- |
+| extractor | iotdb-extractor | String: iotdb-extractor | required |
+| extractor.pattern | 用于筛选时间序列的路径前缀 | String: 任意的时间序列前缀 | optional: root |
+| extractor.history.enable | 是否抽取历史数据 | Boolean: true, false | optional: true |
+| extractor.history.start-time | 抽取的历史数据的开始 event time,包含 start-time | Long: [Long.MIN_VALUE, Long.MAX_VALUE] | optional: Long.MIN_VALUE |
+| extractor.history.end-time | 抽取的历史数据的结束 event time,包含 end-time | Long: [Long.MIN_VALUE, Long.MAX_VALUE] | optional: Long.MAX_VALUE |
+| extractor.realtime.enable | 是否抽取实时数据 | Boolean: true, false | optional: true |
+
+> 🚫 **extractor.pattern 参数说明**
+>
+> * Pattern 需用反引号修饰不合法字符或者是不合法路径节点,例如如果希望筛选 root.\`a@b\` 或者 root.\`123\`,应设置 pattern 为 root.\`a@b\` 或者 root.\`123\`(具体参考 [单双引号和反引号的使用时机](https://iotdb.apache.org/zh/Download/#_1-0-版本不兼容的语法详细说明))
+> * 在底层实现中,当检测到 pattern 为 root(默认值)时,抽取效率较高,其他任意格式都将降低性能
+> * 路径前缀不需要能够构成完整的路径。例如,当创建一个包含参数为 'extractor.pattern'='root.aligned.1' 的 pipe 时:
+>
+> * root.aligned.1TS
+> * root.aligned.1TS.\`1\`
+> * root.aligned.100T
+>
+> 的数据会被抽取;
+>
+> * root.aligned.\`1\`
+> * root.aligned.\`123\`
+>
+> 的数据不会被抽取。
+
+> ❗️**extractor.history 的 start-time,end-time 参数说明**
+>
+> * start-time,end-time 应为 ISO 格式,例如 2011-12-03T10:15:30 或 2011-12-03T10:15:30+01:00
+
+> ✅ **一条数据从生产到落库 IoTDB,包含两个关键的时间概念**
+>
+> * **event time:** 数据实际生产时的时间(或者数据生产系统给数据赋予的生成时间,是数据点中的时间项),也称为事件时间。
+> * **arrival time:** 数据到达 IoTDB 系统内的时间。
+>
+> 我们常说的乱序数据,指的是数据到达时,其 **event time** 远落后于当前系统时间(或者已经落库的最大 **event time**)的数据。另一方面,不论是乱序数据还是顺序数据,只要它们是新到达系统的,那它们的 **arrival time** 都是会随着数据到达 IoTDB 的顺序递增的。
+
+> 💎 **iotdb-extractor 的工作可以拆分成两个阶段**
+>
+> 1. 历史数据抽取:所有 **arrival time** < 创建 pipe 时**当前系统时间**的数据称为历史数据
+> 2. 实时数据抽取:所有 **arrival time** >= 创建 pipe 时**当前系统时间**的数据称为实时数据
+>
+> 历史数据传输阶段和实时数据传输阶段,**两阶段串行执行,只有当历史数据传输阶段完成后,才执行实时数据传输阶段。**
+>
+> 用户可以指定 iotdb-extractor 进行:
+>
+> * 历史数据抽取(`'extractor.history.enable' = 'true'`, `'extractor.realtime.enable' = 'false'` )
+> * 实时数据抽取(`'extractor.history.enable' = 'false'`, `'extractor.realtime.enable' = 'true'` )
+> * 全量数据抽取(`'extractor.history.enable' = 'true'`, `'extractor.realtime.enable' = 'true'` )
+> * 禁止同时设置 `extractor.history.enable` 和 `extractor.realtime.enable` 为 `false`
+
+### 预置 processor 插件
+
+#### do-nothing-processor
+
+作用:不对 extractor 传入的事件做任何的处理。
+
+
+| key | value | value 取值范围 | required or optional with default |
+| --------- | -------------------- | ---------------------------- | --------------------------------- |
+| processor | do-nothing-processor | String: do-nothing-processor | required |
+
+### 预置 connector 插件
+
+#### do-nothing-connector
+
+作用:不对 processor 传入的事件做任何的处理。
+
+
+| key | value | value 取值范围 | required or optional with default |
+| --------- | -------------------- | ---------------------------- | --------------------------------- |
+| connector | do-nothing-connector | String: do-nothing-connector | required |
+
+## 流处理任务管理
+
+### 创建流处理任务
+
+使用 `CREATE PIPE` 语句来创建流处理任务。以数据同步流处理任务的创建为例,示例 SQL 语句如下:
+
+```sql
+CREATE PIPE <PipeId> -- PipeId 是能够唯一标定流处理任务的名字
+WITH EXTRACTOR (
+ -- 默认的 IoTDB 数据抽取插件
+ 'extractor' = 'iotdb-extractor',
+ -- 路径前缀,只有能够匹配该路径前缀的数据才会被抽取,用作后续的处理和发送
+ 'extractor.pattern' = 'root.timecho',
+ -- 是否抽取历史数据
+ 'extractor.history.enable' = 'true',
+ -- 描述被抽取的历史数据的时间范围,表示最早时间
+ 'extractor.history.start-time' = '2011-12-03T10:15:30+01:00',
+ -- 描述被抽取的历史数据的时间范围,表示最晚时间
+ 'extractor.history.end-time' = '2022-12-03T10:15:30+01:00',
+ -- 是否抽取实时数据
+ 'extractor.realtime.enable' = 'true',
+)
+WITH PROCESSOR (
+ -- 默认的数据处理插件,即不做任何处理
+ 'processor' = 'do-nothing-processor',
+)
+WITH CONNECTOR (
+ -- IoTDB 数据发送插件,目标端为 IoTDB
+ 'connector' = 'iotdb-thrift-connector',
+ -- 目标端 IoTDB 其中一个 DataNode 节点的数据服务 ip
+ 'connector.ip' = '127.0.0.1',
+ -- 目标端 IoTDB 其中一个 DataNode 节点的数据服务 port
+ 'connector.port' = '6667',
+)
+```
+
+**创建流处理任务时需要配置 PipeId 以及三个插件部分的参数:**
+
+
+| 配置项 | 说明 | 是否必填 | 默认实现 | 默认实现说明 | 是否允许自定义实现 |
+| --------- | --------------------------------------------------- | --------------------------- | -------------------- | -------------------------------------------------------- | ------------------------- |
+| PipeId | 全局唯一标定一个流处理任务的名称 | 必填 | - | - | - |
+| extractor | Pipe Extractor 插件,负责在数据库底层抽取流处理数据 | 选填 | iotdb-extractor | 将数据库的全量历史数据和后续到达的实时数据接入流处理任务 | 否 |
+| processor | Pipe Processor 插件,负责处理数据 | 选填 | do-nothing-processor | 对传入的数据不做任何处理 | 是 |
+| connector | Pipe Connector 插件,负责发送数据 | 必填 | - | - | 是 |
+
+示例中,使用了 iotdb-extractor、do-nothing-processor 和 iotdb-thrift-connector 插件构建数据流处理任务。IoTDB 还内置了其他的流处理插件,**请查看"系统预置的流处理插件"一节**。
+
+**一个最简的 CREATE PIPE 语句示例如下:**
+
+```sql
+CREATE PIPE <PipeId> -- PipeId 是能够唯一标定流处理任务的名字
+WITH CONNECTOR (
+ -- IoTDB 数据发送插件,目标端为 IoTDB
+ 'connector' = 'iotdb-thrift-connector',
+ -- 目标端 IoTDB 其中一个 DataNode 节点的数据服务 ip
+ 'connector.ip' = '127.0.0.1',
+ -- 目标端 IoTDB 其中一个 DataNode 节点的数据服务 port
+ 'connector.port' = '6667',
+)
+```
+
+其表达的语义是:将本数据库实例中的全量历史数据和后续到达的实时数据,同步到目标为 127.0.0.1:6667 的 IoTDB 实例上。
+
+**注意:**
+
+- EXTRACTOR 和 PROCESSOR 为选填配置,若不填写配置参数,系统则会采用相应的默认实现
+- CONNECTOR 为必填配置,需要在 CREATE PIPE 语句中声明式配置
+- CONNECTOR 具备自复用能力。对于不同的流处理任务,如果它们的 CONNECTOR 具备完全相同的 KV 属性(所有属性的 key 对应的 value 都相同),**那么系统最终只会创建一个 CONNECTOR 实例**,以实现对连接资源的复用。
+
+ - 例如,有下面 pipe1, pipe2 两个流处理任务的声明:
+
+ ```sql
+ CREATE PIPE pipe1
+ WITH CONNECTOR (
+ 'connector' = 'iotdb-thrift-connector',
+ 'connector.thrift.host' = 'localhost',
+ 'connector.thrift.port' = '9999',
+ )
+
+ CREATE PIPE pipe2
+ WITH CONNECTOR (
+ 'connector' = 'iotdb-thrift-connector',
+ 'connector.thrift.port' = '9999',
+ 'connector.thrift.host' = 'localhost',
+ )
+ ```
+
+ - 因为它们对 CONNECTOR 的声明完全相同(**即使某些属性声明时的顺序不同**),所以框架会自动对它们声明的 CONNECTOR 进行复用,最终 pipe1, pipe2 的CONNECTOR 将会是同一个实例。
+- 请不要构建出包含数据循环同步的应用场景(会导致无限循环):
+
+ - IoTDB A -> IoTDB B -> IoTDB A
+ - IoTDB A -> IoTDB A
+
+### 启动流处理任务
+
+CREATE PIPE 语句成功执行后,流处理任务相关实例会被创建,但整个流处理任务的运行状态会被置为 STOPPED,即流处理任务不会立刻处理数据。
+
+可以使用 START PIPE 语句使流处理任务开始处理数据:
+
+```sql
+START PIPE <PipeId>
+```
+
+### 停止流处理任务
+
+使用 STOP PIPE 语句使流处理任务停止处理数据:
+
+```sql
+STOP PIPE <PipeId>
+```
+
+### 删除流处理任务
+
+使用 DROP PIPE 语句使流处理任务停止处理数据(当流处理任务状态为 RUNNING 时),然后删除整个流处理任务:
+
+```sql
+DROP PIPE <PipeId>
+```
+
+用户在删除流处理任务前,不需要执行 STOP 操作。
+
+### 展示流处理任务
+
+使用 SHOW PIPES 语句查看所有流处理任务:
+
+```sql
+SHOW PIPES
+```
+
+查询结果如下:
+
+```sql
++-----------+-----------------------+-------+-------------+-------------+-------------+----------------+
+| ID| CreationTime | State|PipeExtractor|PipeProcessor|PipeConnector|ExceptionMessage|
++-----------+-----------------------+-------+-------------+-------------+-------------+----------------+
+|iotdb-kafka|2022-03-30T20:58:30.689|RUNNING| ...| ...| ...| None|
++-----------+-----------------------+-------+-------------+-------------+-------------+----------------+
+|iotdb-iotdb|2022-03-31T12:55:28.129|STOPPED| ...| ...| ...| TException: ...|
++-----------+-----------------------+-------+-------------+-------------+-------------+----------------+
+```
+
+可以使用 `<PipeId>` 指定想看的某个流处理任务状态:
+
+```sql
+SHOW PIPE <PipeId>
+```
+
+您也可以通过 where 子句,判断某个 \<PipeId\> 使用的 Pipe Connector 被复用的情况。
+
+```sql
+SHOW PIPES
+WHERE CONNECTOR USED BY <PipeId>
+```
+
+### 流处理任务运行状态迁移
+
+一个流处理 pipe 在其被管理的生命周期中会经过多种状态:
+
+- **STOPPED:** pipe 处于停止运行状态。当管道处于该状态时,有如下几种可能:
+ - 当一个 pipe 被成功创建之后,其初始状态为暂停状态
+ - 用户手动将一个处于正常运行状态的 pipe 暂停,其状态会被动从 RUNNING 变为 STOPPED
+ - 当一个 pipe 运行过程中出现无法恢复的错误时,其状态会自动从 RUNNING 变为 STOPPED
+- **RUNNING:** pipe 正在正常工作
+- **DROPPED:** pipe 任务被永久删除
+
+下图表明了所有状态以及状态的迁移:
+
+![状态迁移图](https://alioss.timecho.com/docs/img/%E7%8A%B6%E6%80%81%E8%BF%81%E7%A7%BB%E5%9B%BE.png)
+
+## 权限管理
+
+### 流处理任务
+
+
+| 权限名称 | 描述 |
+| -------- | -------------------------- |
+| USE_PIPE | 注册流处理任务。路径无关。 |
+| USE_PIPE | 开启流处理任务。路径无关。 |
+| USE_PIPE | 停止流处理任务。路径无关。 |
+| USE_PIPE | 卸载流处理任务。路径无关。 |
+| USE_PIPE | 查询流处理任务。路径无关。 |
+
+### 流处理任务插件
+
+
+| 权限名称 | 描述 |
+| :------- | ------------------------------ |
+| USE_PIPE | 注册流处理任务插件。路径无关。 |
+| USE_PIPE | 卸载流处理任务插件。路径无关。 |
+| USE_PIPE | 查询流处理任务插件。路径无关。 |
+
+## 配置参数
+
+在 iotdb-common.properties 中:
+
+```Properties
+####################
+### Pipe Configuration
+####################
+
+# Uncomment the following field to configure the pipe lib directory.
+# For Windows platform
+# If its prefix is a drive specifier followed by "\\", or if its prefix is "\\\\", then the path is
+# absolute. Otherwise, it is relative.
+# pipe_lib_dir=ext\\pipe
+# For Linux platform
+# If its prefix is "/", then the path is absolute. Otherwise, it is relative.
+# pipe_lib_dir=ext/pipe
+
+# The maximum number of threads that can be used to execute the pipe subtasks in PipeSubtaskExecutor.
+# The actual value will be min(pipe_subtask_executor_max_thread_num, max(1, CPU core number / 2)).
+# pipe_subtask_executor_max_thread_num=5
+
+# The connection timeout (in milliseconds) for the thrift client.
+# pipe_connector_timeout_ms=900000
+```
diff --git a/src/zh/UserGuide/latest/User-Manual/Database-Programming.md b/src/zh/UserGuide/latest/User-Manual/Database-Programming.md
index 775301b1..c6d5020e 100644
--- a/src/zh/UserGuide/latest/User-Manual/Database-Programming.md
+++ b/src/zh/UserGuide/latest/User-Manual/Database-Programming.md
@@ -1497,13 +1497,13 @@ SHOW FUNCTIONS
### 用户权限管理
-用户在使用 UDF 时会涉及到 3 种权限:
+用户在使用 UDF 时会涉及到 1 种权限:`USE_UDF`
-* `USE_UDF`:具备该权限的用户才被允许执行 UDF 注册操作
-* `DROP_FUNCTION`:具备该权限的用户才被允许执行 UDF 卸载操作
-* `READ_TIMESERIES`:具备该权限的用户才被允许使用 UDF 进行查询
+* 具备该权限的用户才被允许执行 UDF 注册操作
+* 具备该权限的用户才被允许执行 UDF 卸载操作
+* 具备该权限的用户才被允许使用 UDF 进行查询
-更多用户权限相关的内容,请参考 [权限管理语句](./Security-Management_timecho.md##权限管理)。
+更多用户权限相关的内容,请参考 [权限管理语句](./Authority-Management.md#权限管理)。
### 配置项
diff --git a/src/zh/UserGuide/latest/User-Manual/Operate-Metadata.md b/src/zh/UserGuide/latest/User-Manual/Operate-Metadata.md
index 6444f469..2ff1c454 100644
--- a/src/zh/UserGuide/latest/User-Manual/Operate-Metadata.md
+++ b/src/zh/UserGuide/latest/User-Manual/Operate-Metadata.md
@@ -340,7 +340,7 @@ IoTDB> create device template t2 aligned (lat FLOAT encoding=Gorilla, lon FLOAT
挂载元数据模板的 SQL 语句如下所示:
```shell
-IoTDB> set schema template t1 to root.sg1.d1
+IoTDB> set device template t1 to root.sg1.d1
```
### 激活设备模板
@@ -1078,6 +1078,8 @@ IoTDB> show devices root.ln.**
IoTDB> show devices root.ln.** where device contains 't'
IoTDB> show devices root.ln.** where template = 't1'
IoTDB> show devices root.ln.** where template is null
+IoTDB> show devices root.ln.** where template != 't1'
+IoTDB> show devices root.ln.** where template is not null
```
你可以获得如下数据:
diff --git a/src/zh/UserGuide/latest/User-Manual/Streaming.md b/src/zh/UserGuide/latest/User-Manual/Streaming.md
index 0f25baca..bd23ab71 100644
--- a/src/zh/UserGuide/latest/User-Manual/Streaming.md
+++ b/src/zh/UserGuide/latest/User-Manual/Streaming.md
@@ -7,9 +7,9 @@
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
-
+
http://www.apache.org/licenses/LICENSE-2.0
-
+
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -738,22 +738,22 @@ WHERE CONNECTOR USED BY
### 流处理任务
-| 权限名称 | 描述 |
-| ----------- | -------------------------- |
-| CREATE_PIPE | 注册流处理任务。路径无关。 |
-| START_PIPE | 开启流处理任务。路径无关。 |
-| STOP_PIPE | 停止流处理任务。路径无关。 |
-| DROP_PIPE | 卸载流处理任务。路径无关。 |
-| SHOW_PIPES | 查询流处理任务。路径无关。 |
+| 权限名称 | 描述 |
+| -------- | -------------------------- |
+| USE_PIPE | 注册流处理任务。路径无关。 |
+| USE_PIPE | 开启流处理任务。路径无关。 |
+| USE_PIPE | 停止流处理任务。路径无关。 |
+| USE_PIPE | 卸载流处理任务。路径无关。 |
+| USE_PIPE | 查询流处理任务。路径无关。 |
### 流处理任务插件
-| 权限名称 | 描述 |
-| ----------------- | ------------------------------ |
-| CREATE_PIPEPLUGIN | 注册流处理任务插件。路径无关。 |
-| DROP_PIPEPLUGIN | 卸载流处理任务插件。路径无关。 |
-| SHOW_PIPEPLUGINS | 查询流处理任务插件。路径无关。 |
+| 权限名称 | 描述 |
+| :------- | ------------------------------ |
+| USE_PIPE | 注册流处理任务插件。路径无关。 |
+| USE_PIPE | 卸载流处理任务插件。路径无关。 |
+| USE_PIPE | 查询流处理任务插件。路径无关。 |
## 配置参数