Skip to content

Commit

Permalink
6
Browse files Browse the repository at this point in the history
  • Loading branch information
wanghui42 committed Nov 9, 2023
1 parent 5ea116d commit e30129c
Showing 1 changed file with 39 additions and 39 deletions.
78 changes: 39 additions & 39 deletions src/zh/UserGuide/V1.2.x/User-Manual/Data-Sync_timecho.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ WITH CONNECTOR (
[<parameter> = <value>,],
)
```
> 📌 注:使用数据同步功能,请保证接收端开启自动创建元数据


Expand Down Expand Up @@ -116,19 +117,19 @@ SHOW PIPE <PipeId>
| 模块 | 插件 | 预置插件 | 自定义插件 |
| --- | --- | --- | --- |
| 抽取(Extract) | Extractor 插件 | iotdb-extractor | 不支持 |
| 发送(Connect) | Connector 插件 | iotdb-thrift-sync-connector iotdb-thrift-async-connector iotdb-legacy-pipe-connector iotdb-air-gap-connector websocket - connector | 支持 |
| 发送(Connect) | Connector 插件 | iotdb-thrift-connectoriotdb-air-gap-connector| 支持 |

#### 预置插件

预置插件如下(部分插件为系统内部插件,将在1.3.0版本中删除)
预置插件如下:

| 插件名称 | 类型 | 介绍 | 适用版本 |
| ---------------------------- | ---- | ------------------------------------------------------------ | --------- |
| iotdb-extractor | extractor 插件 | 默认的extractor插件,用于抽取 IoTDB 历史或实时数据 | 1.2.x |
| iotdb-thrift-connector | connector 插件 | 用于 IoTDB(v1.2.0及以上)与 IoTDB(v1.2.0及以上)之间的数据传输。使用 Thrift RPC 框架传输数据,多线程 async non-blocking IO 模型,传输性能高,尤其适用于目标端为分布式时的场景 | 1.2.x |
| iotdb-air-gap-connector | connector 插件 | 用于 IoTDB(v1.2.2+)向 IoTDB(v1.2.2+)跨单向数据网闸的数据同步。支持的网闸型号包括南瑞 Syskeeper 2000 等 | 1.2.1以上 |

每个插件的详细参数参考[参数说明](#connector-参数)
每个插件的详细参数可参考本文[参数说明](#connector-参数)章节

#### 查看插件

Expand All @@ -138,7 +139,7 @@ SHOW PIPE <PipeId>
SHOW PIPEPLUGINS
```

返回结果如下(1.2.2 版本):
返回结果如下(其中部分插件为系统内部插件,将在1.3.0版本中删除):

```Go
IoTDB> SHOW PIPEPLUGINS
Expand All @@ -160,11 +161,11 @@ IoTDB> SHOW PIPEPLUGINS

### 全量数据同步

同步两个 IoTDB 之间的所有数据,例如下面场景,创建一个名为 A2B, 功能为同步 A IoTDB 到 B IoTDB 间的全量数据,数据链路如下图所示:
本例子用来演示将一个 IoTDB 的所有数据同步至另一个IoTDB,数据链路如下图所示:

![](https://alioss.timecho.com/docs/img/w1.png)

可使用简化的创建任务语句
在这个例子中,我们可以创建一个名为 A2B 的同步任务,用来同步 A IoTDB 到 B IoTDB 间的全量数据,这里需要用到用到 connector 的 iotdb-thrift-connector 插件(内置插件),需指定接收端地址,这个例子中指定了'connector.ip'和'connector.port',也可指定'connector.node-urls',如下面的示例语句

```Go
create pipe A2B
Expand All @@ -174,24 +175,15 @@ with connector (
'connector.port'='6668'
)
```
在这个例子中,connector 任务用到的是 iotdb-thrift-connector 插件,需指定接收端地址,这个例子中指定了'connector.ip'和'connector.port',也可指定'connector.node-urls',如下面的例子。

> 📌 注:使用数据同步功能,请保证接收端开启自动创建元数据

### 部分数据同步

### 历史数据同步

> ❗️ **Pipe 中的数据含义**
>
> * 历史数据:所有 arrival time < 创建 pipe 时当前系统时间的数据称为历史数据
> * 实时数据:所有 arrival time >= 创建 pipe 时当前系统时间的数据称为实时数据
> * 全量数据: 全量数据 = 历史数据 + 实时数据
同步某个时间范围的数据,例如下面场景,创建一个名为 A2B, 功能为同步 A IoTDB 到 B IoTDB 间2023年8月23日8点到2023年10月23日8点的数据,数据链路如下图所示。
本例子用来演示同步某个历史时间范围(2023年8月23日8点到2023年10月23日8点)的数据至另一个IoTDB,数据链路如下图所示:

![](https://alioss.timecho.com/docs/img/w2.png)

此时,我们需要使用 extractor 来定义传输数据的范围。由于传输的是历史数据(历史数据是指同步任务创建之前存在的数据),所以需要将extractor.realtime.enable参数配置为false,即不同步实时数据(实时数据是指同步任务创建之后存在的数据),同时将 extractor.realtime.mode设置为 hybrid,表示使用 hybrid模式传输数据
在这个例子中,我们可以创建一个名为 A2B 的同步任务。首先我们需要在 extractor 中定义传输数据的范围,由于传输的是历史数据(历史数据是指同步任务创建之前存在的数据),所以需要将extractor.realtime.enable参数配置为false;同时需要配置数据的起止时间start-time和end-time以及传输的模式mode,此处推荐mode设置为 hybrid 模式(hybrid模式为混合传输,在无数据积压时采用实时传输方式,有数据积压时采用批量传输方式,并根据系统内部情况自动切换)

详细语句如下:

Expand All @@ -209,23 +201,18 @@ with connector (
'connector.batch.enable'='false')
```

> 💎 ​**extractor.realtime.mode:数据抽取的模式**
> - **​log**:该模式下,任务仅使用操作日志进行数据处理、发送
> - **file**:该模式下,任务仅使用数据文件进行数据处理、发送
> - **hybrid**:该模式,考虑了按操作日志逐条目发送数据时延迟低但吞吐低的特点,以及按数据文件批量 发送时发送吞吐高但延迟高的特点,能够在不同的写入负载下自动切换适合的>数据抽取方式,首先采取基于操作日志的数据抽取方式以保证低发送延迟,当产生数据积压时自动切换成基于数据文件的数据抽取方式以保证高发送吞吐,积压消除时自动切换回基于操作日志的数据抽取方式,避免了采用单一数据抽取算法难以平衡数据发送延迟或吞吐的问题。

### 双向数据传输

要实现两个 IoTDB 之间互相备份,实时同步的功能,如下图所示
本例子用来演示两个 IoTDB 之间互为双活的场景,数据链路如下图所示

![](https://alioss.timecho.com/docs/img/w3.png)

在这个场景中,需要将参数`extractor.forwarding-pipe-requests` 设置为 `false`,表示不转发从另一 pipe 传输而来的数据,A 和 B 上的的 pipe 都需要将该参数设置为 false,否则将会造成数据无休止的集群间循环转发。

`'extractor.history.enable' = 'false'`表示不传输历史数据,即不同步创建该任务前的数据。
在这个例子中,为了避免数据无限循环,需要将A和B上的参数`extractor.forwarding-pipe-requests` 均设置为 `false`,表示不转发从另一pipe传输而来的数据。同时将`'extractor.history.enable'` 设置为 `false`,表示不传输历史数据,即不同步创建该任务前的数据。

可创建两个子任务, 功能为双向同步 A IoTDB 到 B IoTDB 间的实时数据,在 A IoTDB 上执行下列语句:
详细语句如下:

在 A IoTDB 上执行下列语句:

```Go
create pipe AB
Expand Down Expand Up @@ -256,24 +243,25 @@ with connector (

### 级联数据传输

要实现 A IoTDB 到 B IoTDB 到 C IoTDB 之间的级联数据传输链路,如下图所示:

本例子用来演示多个 IoTDB 之间级联传输数据的场景,数据由A集群同步至B集群,再同步至C集群,数据链路如下图所示:

![](https://alioss.timecho.com/docs/img/w4.png)

创建一个名为 AB 的pipe,在 A IoTDB 上执行下列语句:
在这个例子中,为了将A集群的数据同步至C,在BC之间的pipe需要将 `extractor.forwarding-pipe-requests` 配置为`true`,详细语句如下:

在A IoTDB上执行下列语句,将A中数据同步至B:

```Go
create pipe AB
with extractor (
'extractor.forwarding-pipe-requests',
with connector (
'connector'='iotdb-thrift-connector',
'connector.ip'='127.0.0.1',
'connector.port'='6668'
)
```

创建一个名为 BC 的pipe,在 B IoTDB 上执行下列语句
在B IoTDB上执行下列语句,将B中数据同步至C

```Go
create pipe BC
Expand All @@ -288,12 +276,11 @@ with connector (

### 跨网闸数据传输

创建一个名为 A2B 的pipe,实现内网服务器上的 A,经由单向网闸,传输数据到外网服务器上的B,如下图所示
本例子用来演示将一个 IoTDB 的数据,经过单向网闸,同步至另一个IoTDB的场景,数据链路如下图所示

![](https://alioss.timecho.com/docs/img/w5.png)


数据穿透网闸需要使用 connector 任务中的iotdb-air-gap-connector 插件(目前支持部分型号网闸,具体型号请联系天谋科技工作人员确认),配置网闸后,在 A IoTDB 上执行下列语句:
在这个例子中,需要使用 connector 任务中的iotdb-air-gap-connector 插件(目前支持部分型号网闸,具体型号请联系天谋科技工作人员确认),配置网闸后,在 A IoTDB 上执行下列语句,其中ip和port填写网闸信息,详细语句如下:

```Go
create pipe A2B
Expand Down Expand Up @@ -361,6 +348,19 @@ with connector (
| extractor.realtime.mode | 实时数据的抽取模式 | String: hybrid, log, file | 选填 | hybrid |
| extractor.forwarding-pipe-requests | 是否转发由其他 Pipe (通常是数据同步)写入的数据 | Boolean: true, false | 选填 | true |

> 💎 **说明:历史数据与实时数据的差异**
>
> * **历史数据**:所有 arrival time < 创建 pipe 时当前系统时间的数据称为历史数据
> * **实时数据**:所有 arrival time >= 创建 pipe 时当前系统时间的数据称为实时数据
> * **全量数据**: 全量数据 = 历史数据 + 实时数据

> 💎 ​**说明:数据抽取模式hybrid, log和file的差异**
>
> - **hybrid(推荐)**:该模式下,任务将优先对数据进行实时处理、发送,当数据产生积压时自动切换至批量发送模式,其特点是平衡了数据同步的时效性和吞吐量
> - **​log**:该模式下,任务将对数据进行实时处理、发送,其特点是高时效、低吞吐
> - **file**:该模式下,任务将对数据进行批量(按底层数据文件)处理、发送,其特点是低时效、高吞吐

### connector 参数

Expand All @@ -369,9 +369,9 @@ with connector (
| key | value | value 取值范围 | 是否必填 | 默认取值 |
| --------------------------------- | ------------------------------------------------------------ | ------------------------------------------------------------ | -------- | ------------------------------------------- |
| connector | iotdb-thrift-connector 或 iotdb-thrift-sync-connector | String: iotdb-thrift-connector 或 iotdb-thrift-sync-connector | 必填 | |
| connector.ip | 目标端 IoTDB 其中一个 DataNode 节点的数据服务 ip | String | 选填 | 与 connector.node-urls 任选其一填写 |
| connector.port | 目标端 IoTDB 其中一个 DataNode 节点的数据服务 port | Integer | 选填 | 与 connector.node-urls 任选其一填写 |
| connector.node-urls | 目标端 IoTDB 任意多个 DataNode 节点的数据服务端口的 url | String。例:'127.0.0.1:6667,127.0.0.1:6668,127.0.0.1:6669', '127.0.0.1:6667' | 选填 | 与 connector.ip:connector.port 任选其一填写 |
| connector.ip | 目标端 IoTDB 其中一个 DataNode 节点的数据服务 ip(请注意同步任务不支持向自身服务进行转发) | String | 选填 | 与 connector.node-urls 任选其一填写 |
| connector.port | 目标端 IoTDB 其中一个 DataNode 节点的数据服务 port(请注意同步任务不支持向自身服务进行转发) | Integer | 选填 | 与 connector.node-urls 任选其一填写 |
| connector.node-urls | 目标端 IoTDB 任意多个 DataNode 节点的数据服务端口的 url(请注意同步任务不支持向自身服务进行转发) | String。例:'127.0.0.1:6667,127.0.0.1:6668,127.0.0.1:6669', '127.0.0.1:6667' | 选填 | 与 connector.ip:connector.port 任选其一填写 |
| connector.batch.enable | 是否开启日志攒批发送模式,用于提高传输吞吐,降低 IOPS | Boolean: true, false | 选填 | true |
| connector.batch.max-delay-seconds | 在开启日志攒批发送模式时生效,表示一批数据在发送前的最长等待时间(单位:s) | Integer | 选填 | 1 |
| connector.batch.size-bytes | 在开启日志攒批发送模式时生效,表示一批数据最大的攒批大小(单位:byte) | Long | 选填
Expand Down

0 comments on commit e30129c

Please sign in to comment.