Skip to content

Commit

Permalink
5
Browse files Browse the repository at this point in the history
  • Loading branch information
wanghui42 committed Nov 8, 2023
1 parent f29454b commit 5ea116d
Showing 1 changed file with 36 additions and 63 deletions.
99 changes: 36 additions & 63 deletions src/zh/UserGuide/V1.2.x/User-Manual/Data-Sync_timecho.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ STOP PIPE <PipeId>
```Go
DROP PIPE <PipeId>
```

删除任务不需要先停止同步任务。
#### 查看任务

查看全部任务:
Expand All @@ -111,29 +111,24 @@ SHOW PIPE <PipeId>

### 插件

为了使得整体架构更加灵活以匹配不同的同步场景需求,在上述同步任务框架中IoTDB支持进行插件组装。系统为您预置了一些常用插件可直接使用,同时您也可以自定义 processor 插件和 connector 插件,并加载至IoTDB系统进行使用。
为了使得整体架构更加灵活以匹配不同的同步场景需求,在上述同步任务框架中IoTDB支持进行插件组装。系统为您预置了一些常用插件可直接使用,同时您也可以自定义 connector 插件,并加载至IoTDB系统进行使用。

| 模块 | 插件 | 预置插件 | 自定义插件 |
| --- | --- | --- | --- |
| 抽取(Extract) | Extractor 插件 | iotdb-extractor | 不支持 |
| 处理(Process) | Processor 插件 | do-nothing-processor | 支持 |
| 发送(Connect) | Connector 插件 | iotdb-thrift-sync-connector iotdb-thrift-async-connector iotdb-legacy-pipe-connector iotdb-air-gap-connector websocket - connector | 支持 |

#### 预置插件

预置插件如下:
预置插件如下(部分插件为系统内部插件,将在1.3.0版本中删除)

| 插件名称 | 类型 | 介绍 | 适用版本 |
| ---------------------------- | ---- | ------------------------------------------------------------ | --------- |
| iotdb-extractor | extractor 插件 | 默认的extractor插件,用于抽取 IoTDB 历史或实时数据 | 1.2.x |
| do-nothing-processor | processor 插件 | 默认的processor插件,不对传入的数据做任何的处理 | 1.2.x |
| iotdb-thrift-connector | connector 插件 | 用于 IoTDB(v1.2.0及以上)与 IoTDB(v1.2.0及以上)之间的数据传输。使用 Thrift RPC 框架传输数据,多线程 async non-blocking IO 模型,传输性能高,尤其适用于目标端为分布式时的场景 | 1.2.x |
| iotdb-air-gap-connector | connector 插件 | 用于 IoTDB(v1.2.2+)向 IoTDB(v1.2.2+)跨单向数据网闸的数据同步。支持的网闸型号包括南瑞 Syskeeper 2000 等 | 1.2.1以上 |

每个插件的属性参考[参数说明](#connector-参数)
#### 自定义插件

自定义插件方法参考[自定义流处理插件开发](Streaming_timecho.md#自定义流处理插件开发)一章。
每个插件的详细参数参考[参数说明](#connector-参数)

#### 查看插件

Expand Down Expand Up @@ -165,10 +160,11 @@ IoTDB> SHOW PIPEPLUGINS

### 全量数据同步

创建一个名为 A2B, 功能为同步 A IoTDB 到 B IoTDB 间的全量数据,数据链路如下图所示:
同步两个 IoTDB 之间的所有数据,例如下面场景,创建一个名为 A2B, 功能为同步 A IoTDB 到 B IoTDB 间的全量数据,数据链路如下图所示:

![](https://alioss.timecho.com/docs/img/w1.png)
可使用如下语句:

可使用简化的创建任务语句:

```Go
create pipe A2B
Expand All @@ -178,45 +174,58 @@ with connector (
'connector.port'='6668'
)
```
> 💎 ​**extractor.realtime.mode:数据抽取的模式**
> - **​log**:该模式下,任务仅使用操作日志进行数据处理、发送
> - **file**:该模式下,任务仅使用数据文件进行数据处理、发送
> - **hybrid**:该模式,考虑了按操作日志逐条目发送数据时延迟低但吞吐低的特点,以及按数据文件批量 发送时发送吞吐高但延迟高的特点,能够在不同的写入负载下自动切换适合的>数据抽取方式,首先采取基于操作日志的数据抽取方式以保证低发送延迟,当产生数据积压时自动切换成基于数据文件的数据抽取方式以保证高发送吞吐,积压消除时自动切换回基于操作日志的数据抽取方式,避免了采用单一数据抽取算法难以平衡数据发送延迟或吞吐的问题。
在这个例子中,connector 任务用到的是 iotdb-thrift-connector 插件,需指定接收端地址,这个例子中指定了'connector.ip'和'connector.port',也可指定'connector.node-urls',如下面的例子。

> 📌 注:使用数据同步功能,请保证接收端开启自动创建元数据
### 部分数据同步

创建一个名为 实时数据, 功能为同步 A IoTDB 到 B IoTDB 间的2023年8月23日8点到2023年10月23日8点的数据,数据链路如下图所示。

> ❗️ **Pipe 中的数据含义**
>
> * 历史数据:所有 arrival time < 创建 pipe 时当前系统时间的数据称为历史数据
> * 实时数据:所有 arrival time >= 创建 pipe 时当前系统时间的数据称为实时数据
> * 全量数据: 全量数据 = 历史数据 + 实时数据
同步某个时间范围的数据,例如下面场景,创建一个名为 A2B, 功能为同步 A IoTDB 到 B IoTDB 间2023年8月23日8点到2023年10月23日8点的数据,数据链路如下图所示。

![](https://alioss.timecho.com/docs/img/w2.png)

可使用如下语句:
此时,我们需要使用 extractor 来定义传输数据的范围。由于传输的是历史数据(历史数据是指同步任务创建之前存在的数据),所以需要将extractor.realtime.enable参数配置为false,即不同步实时数据(实时数据是指同步任务创建之后存在的数据),同时将 extractor.realtime.mode设置为 hybrid,表示使用 hybrid模式传输数据。

```Go
详细语句如下:

```SQL
create pipe A2B
WITH EXTRACTOR (
'extractor'= 'iotdb-extractor',
'extractor.realtime.enable' = 'false',
'extractor.realtime.mode'='file',
'extractor.realtime.mode'='hybrid',
'extractor.history.start-time' = '2023.08.23T08:00:00+00:00',
'extractor.history.end-time' = '2023.10.23T08:00:00+00:00')
with connector (
'connector'='iotdb-thrift-async-connector',
'connector.node-urls'='xxxx:6668',
'connector.batch.enable'='false')
```
>
>
> - `'extractor.realtime.mode'='file'`表示实时数据的抽取模式为 file 模式,该模式下,任务仅使用数据文件进行数据处理、发送。
> - `'extractor.realtime.enable' = 'false'`, 表示不同步实时数据,即创建该任务后到达的数据都不传输。
> - `start-time,end-time` 应为 ISO 格式。

> 💎 ​**extractor.realtime.mode:数据抽取的模式**
> - **​log**:该模式下,任务仅使用操作日志进行数据处理、发送
> - **file**:该模式下,任务仅使用数据文件进行数据处理、发送
> - **hybrid**:该模式,考虑了按操作日志逐条目发送数据时延迟低但吞吐低的特点,以及按数据文件批量 发送时发送吞吐高但延迟高的特点,能够在不同的写入负载下自动切换适合的>数据抽取方式,首先采取基于操作日志的数据抽取方式以保证低发送延迟,当产生数据积压时自动切换成基于数据文件的数据抽取方式以保证高发送吞吐,积压消除时自动切换回基于操作日志的数据抽取方式,避免了采用单一数据抽取算法难以平衡数据发送延迟或吞吐的问题。

### 双向数据传输

要实现两个 IoTDB 之间互相备份,实时同步的功能,如下图所示:

![](https://alioss.timecho.com/docs/img/w3.png)

可创建两个子任务, 功能为双向同步 A IoTDB 到 B IoTDB 间的实时数据,在 A IoTDB 上执行下列语句:
在这个场景中,需要将参数`extractor.forwarding-pipe-requests` 设置为 `false`,表示不转发从另一 pipe 传输而来的数据,A 和 B 上的的 pipe 都需要将该参数设置为 false,否则将会造成数据无休止的集群间循环转发。

`'extractor.history.enable' = 'false'`表示不传输历史数据,即不同步创建该任务前的数据。

可创建两个子任务, 功能为双向同步 A IoTDB 到 B IoTDB 间的实时数据,在 A IoTDB 上执行下列语句:

```Go
create pipe AB
Expand Down Expand Up @@ -244,12 +253,6 @@ with connector (
)
```

> 💎
>
> - `'extractor.history.enable' = 'false'`表示不传输历史数据,即不同步创建该任务前的数据。
>
> - `'extractor.forwarding-pipe-requests' = 'false'`表示不转发从另一 pipe 传输而来的数据,A 和 B 上的的 pipe 都需要将该参数设置为 false,否则将会造成数据无休止的集群间循环转发。

### 级联数据传输

Expand All @@ -275,7 +278,7 @@ with connector (
```Go
create pipe BC
with extractor (
'extractor.forwarding-pipe-requests' = 'false',
'extractor.forwarding-pipe-requests' = 'true',
with connector (
'connector'='iotdb-thrift-connector',
'connector.ip'='127.0.0.1',
Expand All @@ -290,7 +293,7 @@ with connector (
![](https://alioss.timecho.com/docs/img/w5.png)


配置网闸后,在 A IoTDB 上执行下列语句:
数据穿透网闸需要使用 connector 任务中的iotdb-air-gap-connector 插件(目前支持部分型号网闸,具体型号请联系天谋科技工作人员确认),配置网闸后,在 A IoTDB 上执行下列语句:

```Go
create pipe A2B
Expand All @@ -303,14 +306,6 @@ with connector (

## 参考:注意事项

> 📌 使用数据同步功能,请保证接收端开启自动创建元数据
> ❗️ **Pipe 中的数据含义**
>
> * 历史数据:所有 arrival time < 创建 pipe 时当前系统时间的数据称为历史数据
> * 实时数据:所有 arrival time >= 创建 pipe 时当前系统时间的数据称为实时数据
> * 全量数据: 全量数据 = 历史数据 + 实时数据
可通过修改 IoTDB 配置文件(iotdb-common.properties)以调整数据同步的参数,如同步数据存储目录等。完整配置如下:

```Go
Expand Down Expand Up @@ -381,29 +376,7 @@ with connector (
| connector.batch.max-delay-seconds | 在开启日志攒批发送模式时生效,表示一批数据在发送前的最长等待时间(单位:s) | Integer | 选填 | 1 |
| connector.batch.size-bytes | 在开启日志攒批发送模式时生效,表示一批数据最大的攒批大小(单位:byte) | Long | 选填

#### iotdb-thrift-async-connector

| key | value | value 取值范围 | 是否必填 | 默认取值 |
| --------------------------------- | ------------------------------------------------------------ | ------------------------------------------------------------ | -------- | ------------------------------------------- |
| connector | iotdb-thrift-async-connector | String: iotdb-thrift-async-connector | 必填 | |
| connector.ip | 目标端 IoTDB 其中一个 DataNode 节点的数据服务 ip | String | 选填 | 与 connector.node-urls 任选其一填写 |
| connector.port | 目标端 IoTDB 其中一个 DataNode 节点的数据服务 port | Integer | 选填 | 与 connector.node-urls 任选其一填写 |
| connector.node-urls | 目标端 IoTDB 任意多个 DataNode 节点的数据服务端口的 url | String。例:'127.0.0.1:6667,127.0.0.1:6668,127.0.0.1:6669', '127.0.0.1:6667' | 选填 | 与 connector.ip:connector.port 任选其一填写 |
| connector.batch.enable | 是否开启日志攒批发送模式,用于提高传输吞吐,降低 IOPS | Boolean: true, false | 选填 | true |
| connector.batch.max-delay-seconds | 在开启日志攒批发送模式时生效,表示一批数据在发送前的最长等待时间(单位:s) | Integer | 选填 | 1 |
| connector.batch.size-bytes | 在开启日志攒批发送模式时生效,表示一批数据最大的攒批大小(单位:byte) | Long | 选填 | 16 * 1024 * 1024 (16MiB) |


#### iotdb-legacy-pipe-connector

| key | value | value 取值范围 | 是否必填 | 默认取值 |
| ------------------ | ------------------------------------------------------------ | ----------------------------------- | -------- | -------- |
| connector | iotdb-legacy-pipe-connector | String: iotdb-legacy-pipe-connector | 必填 | - |
| connector.ip | 目标端 IoTDB 其中一个 DataNode 节点的数据服务 ip | String | 选填 | - |
| connector.port | 目标端 IoTDB 其中一个 DataNode 节点的数据服务 port | Integer | 选填 | - |
| connector.user | 目标端 IoTDB 的用户名,注意该用户需要支持数据写入、TsFile Load 的权限 | String | 选填 | root |
| connector.password | 目标端 IoTDB 的密码,注意该用户需要支持数据写入、TsFile Load 的权限 | String | 选填 | root |
| connector.version | 目标端 IoTDB 的版本,用于伪装自身实际版本,绕过目标端的版本一致性检查 | String | 选填 | 1.1 |

#### iotdb-air-gap-connector

Expand Down

0 comments on commit 5ea116d

Please sign in to comment.