Skip to content

Commit

Permalink
fix: local load/write validate and refactor (#3666)
Browse files Browse the repository at this point in the history
  • Loading branch information
vagetablechicken authored Dec 21, 2023
1 parent 1448df8 commit ebe1f7d
Show file tree
Hide file tree
Showing 9 changed files with 515 additions and 349 deletions.
8 changes: 5 additions & 3 deletions docs/zh/openmldb_sql/dml/LOAD_DATA_STATEMENT.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
`LOAD DATA INFILE`语句能高效地将文件中的数据读取到数据库中的表中。`LOAD DATA INFILE``SELECT INTO OUTFILE`互补。要将数据从 table导出到文件,请使用[SELECT INTO OUTFILE](../dql/SELECT_INTO_STATEMENT.md)。要将文件数据导入到 table 中,请使用`LOAD DATA INFILE`

```{note}
INFILE 的 filePath,既可以是单个文件名,也可以是目录,也可以使用`*`通配符。如果目录中存在多格式的文件,只会选择 LoadDataInfileOptionsList 中指定的FORMAT格式文件。具体格式等价于`DataFrameReader.read.load(String)`,可以使用spark shell来read你想要的文件路径,确认能否读入成功。
无论何种load_mode,INFILE 的 filePath既可以是单个文件名,也可以是目录,也可以使用`*`通配符。
- load_mode=cluster的具体格式等价于`DataFrameReader.read.load(String)`,可以使用spark shell来read你想要的文件路径,确认能否读入成功。如果目录中存在多格式的文件,只会选择 LoadDataInfileOptionsList 中指定的FORMAT格式文件。
- load_mode=local则使用glob选择出符合的所有文件,不会检查单个文件的格式,所以,请保证满足条件的是csv格式,建议使用`*.csv`限制文件格式。
```

## Syntax
Expand Down Expand Up @@ -50,8 +52,8 @@ FilePathPattern
| header | Boolean | true | 是否包含表头, 默认为`true`|
| null_value | String | null | NULL值,默认填充`"null"`。加载时,遇到null_value的字符串将被转换为`"null"`,插入表中。 |
| format | String | csv | 导入文件的格式:<br />`csv`: 不显示指明format时,默认为该值。<br />`parquet`: 集群版还支持导入parquet格式文件,单机版不支持。 |
| quote | String | " | 输入数据的包围字符串。字符串长度<=1。<br />load_mode=`cluster`默认为双引号`"`。配置包围字符后,被包围字符包围的内容将作为一个整体解析。例如,当配置包围字符串为"#"时, `1, 1.0, #This is a string field, even there is a comma#, normal_string`将为解析为三个filed,第一个是整数1,第二个是浮点1.0,第三个是一个字符串,第四个虽然没有quote,但也是一个字符串。<br /> **local_mode=`local`默认为`\0`,不处理包围字符。** |
| mode | String | "error_if_exists" | 导入模式:<br />`error_if_exists`: 仅离线模式可用,若离线表已有数据则报错。<br />`overwrite`: 仅离线模式可用,数据将覆盖离线表数据。<br />`append`:离线在线均可用,若文件已存在,数据将追加到原文件后面。 |
| quote | String | " | 输入数据的包围字符串。字符串长度<=1。<br />load_mode=`cluster`默认为双引号`"`。配置包围字符后,被包围字符包围的内容将作为一个整体解析。例如,当配置包围字符串为"#"时, `1, 1.0, #This is a string field, even there is a comma#, normal_string`将为解析为三个filed,第一个是整数1,第二个是浮点1.0,第三个是一个字符串,第四个虽然没有quote,但也是一个字符串。<br /> **local_mode=`local`默认为`\0`也可使用空字符串赋值,不处理包围字符。** |
| mode | String | "error_if_exists" | 导入模式:<br />`error_if_exists`: 仅离线模式可用,若离线表已有数据则报错。<br />`overwrite`: 仅离线模式可用,数据将覆盖离线表数据。<br />`append`:离线在线均可用,若文件已存在,数据将追加到原文件后面。<br /> **local_mode=`local`默认为`append`** |
| deep_copy | Boolean | true | `deep_copy=false`仅支持离线load, 可以指定`INFILE` Path为该表的离线存储地址,从而不需要硬拷贝。 |
| load_mode | String | cluster | `load_mode='local'`仅支持从csv本地文件导入在线存储, 它通过本地客户端同步插入数据;<br /> `load_mode='cluster'`仅支持集群版, 通过spark插入数据,支持同步或异步模式 |
| thread | Integer | 1 | 仅在本地文件导入时生效,即`load_mode='local'`或者单机版,表示本地插入数据的线程数。 最大值为`50`|
Expand Down
6 changes: 4 additions & 2 deletions docs/zh/openmldb_sql/dql/SELECT_INTO_STATEMENT.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ options_list:
| null_value | String | null | NULL填充值,默认填充`"null"` |
| format | String | csv | 输出文件格式:<br />`csv`:不显示指明format时,默认为该值<br />`parquet`:集群版离线模式支持导出parquet格式文件,但集群在线和单机版不支持 |
| mode | String | error_if_exists | 输出模式:<br />`error_if_exists`: 表示若文件已经在则报错。<br />`overwrite`: 表示若文件已存在,数据将覆盖原文件内容。<br />`append`:表示若文件已存在,数据将追加到原文件后面。<br />不显示配置时,默认为`error_if_exists`|
| quote | String | "" | 输出数据的包围字符串,字符串长度<=1。默认为"",表示输出数据包围字符串为空。当配置包围字符串时,将使用包围字符串包围一个field。例如,我们配置包围字符串为`"#"`,原始数据为{1, 1.0, This is a string, with comma}。输出的文本为`1, 1.0, #This is a string, with comma#`|
| quote | String | " | 输出数据的包围字符串,字符串长度<=1。默认为双引号`"`,表示输出数据包围字符串为空。当配置包围字符串时,将使用包围字符串包围一个field。例如,我们配置包围字符串为`"#"`,原始数据为{1, 1.0, This is a string, with comma}。输出的文本为`1, 1.0, #This is a string, with comma#`<br /> **单机模式或集群在线模式默认为`\0`,也可使用空字符串赋值,不处理包围字符。** |
| coalesce | Int | 0 | 仅集群版离线模式支持,默认值为0,不进行合并(可能输出多个文件),可指定最终输出几个文件。例如,coalesce=1,会将所有part合并为1个文件。 |


Expand All @@ -44,7 +44,9 @@ SELECT ... INTO OUTFILE 'file_path' OPTIONS (key = value, ...)

## FilePath

FilePath支持'file://', 'hdfs://', 'hive://'三种。其中'file://'和'hdfs://'地址为目录,而非文件名,'hive://'导出到Hive表中,格式为`hive://<db>.<table>`
load_mode='cluster'时,FilePath支持'file://', 'hdfs://', 'hive://'三种。其中'file://'和'hdfs://'地址为目录,而非文件名,'hive://'导出到Hive表中,格式为`hive://<db>.<table>`

单机版或load_mode='local'时,FilePath只能是file格式,且必须为文件名,不可以是目录。

## Hive 支持

Expand Down
22 changes: 11 additions & 11 deletions src/cmd/sql_cmd_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -203,11 +203,11 @@ TEST_F(SqlCmdTest, SelectIntoOutfile) {
router->ExecuteSQL(select_into_sql, &status);
ASSERT_FALSE(status.IsOK());

// False - Option un-supported
// True - Option un-supported will be ignored
select_into_sql =
"select * from " + name + " into outfile '" + file_path + "' options (mode = 'overwrite', test = 'null')";
router->ExecuteSQL(select_into_sql, &status);
ASSERT_FALSE(status.IsOK());
ASSERT_TRUE(status.IsOK());

// False - Type un-supproted
select_into_sql = "select * from " + name + " into outfile '" + file_path + "' options (mode = 1)";
Expand Down Expand Up @@ -571,10 +571,10 @@ TEST_P(DBSDKTest, LoadDataMultipleFiles) {
"' INTO TABLE trans options(load_mode='local', thread=10);";
hybridse::sdk::Status status;
sr->ExecuteSQL(load_sql, &status);
ASSERT_TRUE(status.IsOK()) << status.msg;
ASSERT_TRUE(status.IsOK()) << status.ToString();
ASSERT_EQ(status.msg, "Load 20 rows");
auto result = sr->ExecuteSQL("select * from trans;", &status);
ASSERT_TRUE(status.IsOK()) << status.msg;
ASSERT_TRUE(status.IsOK()) << status.ToString();
ASSERT_EQ(20, result->Size());
while (result->Next()) {
std::string col1 = result->GetStringUnsafe(0);
Expand Down Expand Up @@ -608,10 +608,10 @@ TEST_P(DBSDKTest, LoadData) {
"' INTO TABLE trans options(deep_copy=true, mode='append', load_mode='local', thread=60);";
hybridse::sdk::Status status;
sr->ExecuteSQL(load_sql, &status);
ASSERT_TRUE(status.IsOK()) << status.msg;
ASSERT_TRUE(status.IsOK()) << status.ToString();
ASSERT_EQ(status.msg, "Load 10 rows");
auto result = sr->ExecuteSQL("select * from trans;", &status);
ASSERT_TRUE(status.IsOK()) << status.msg;
ASSERT_TRUE(status.IsOK()) << status.ToString();
ASSERT_EQ(10, result->Size());
HandleSQL("drop table trans;");
HandleSQL("drop database test1;");
Expand Down Expand Up @@ -647,18 +647,18 @@ TEST_P(DBSDKTest, LoadDataError) {
"LOAD DATA INFILE 'not_exist.csv' INTO TABLE trans options(mode='overwrite', load_mode='local', thread=60);";
sr->ExecuteSQL(load_sql, &status);
ASSERT_FALSE(status.IsOK()) << status.msg;
ASSERT_EQ(status.msg, "online data load only supports 'append' mode");
ASSERT_EQ(status.msg, "INVALID_ARGUMENT: local load mode must be append\n");

load_sql =
"LOAD DATA INFILE 'not_exist.csv' INTO TABLE trans options(format='parquet', load_mode='local', thread=60);";
sr->ExecuteSQL(load_sql, &status);
ASSERT_FALSE(status.IsOK()) << status.msg;
ASSERT_EQ(status.msg, "local data load only supports 'csv' format");
ASSERT_EQ(status.msg, "INVALID_ARGUMENT: local load format must be csv\n");

load_sql = "LOAD DATA INFILE 'not_exist.csv' INTO TABLE trans options(load_mode='local', thread=0);";
sr->ExecuteSQL(load_sql, &status);
ASSERT_FALSE(status.IsOK()) << status.msg;
ASSERT_EQ(status.msg, "ERROR: parse option thread failed");
ASSERT_EQ(status.msg, "INVALID_ARGUMENT: thread must be positive\n");

HandleSQL("SET @@execute_mode='offline';");
load_sql =
Expand All @@ -671,7 +671,7 @@ TEST_P(DBSDKTest, LoadDataError) {
load_sql = "LOAD DATA INFILE 'not_exist.csv' INTO TABLE trans options(format='parquet', load_mode='cluster');";
sr->ExecuteSQL(load_sql, &status);
ASSERT_FALSE(status.IsOK()) << status.msg;
ASSERT_TRUE(status.msg.find("Fail to get TaskManager client") != std::string::npos);
ASSERT_TRUE(status.msg.find("Fail to get TaskManager client") != std::string::npos) << status.msg;
} else {
ASSERT_TRUE(status.IsOK()) << status.msg;
}
Expand Down Expand Up @@ -724,7 +724,7 @@ TEST_P(DBSDKTest, LoadDataMultipleThread) {
std::string load_sql = absl::StrCat("LOAD DATA INFILE '", (tmp_path / "myfile*").string(),
"' INTO TABLE trans options(load_mode='local', thread=", num_thread, ");");
sr->ExecuteSQL(load_sql, &status);
ASSERT_TRUE(status.IsOK()) << status.msg;
ASSERT_TRUE(status.IsOK()) << status.ToString();
ASSERT_EQ(status.msg, absl::StrCat("Load ", file_num * rows_per_file, " rows"));
}
auto result = sr->ExecuteSQL("select * from trans;", &status);
Expand Down
3 changes: 3 additions & 0 deletions src/sdk/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ if(TESTING_ENABLE)

add_executable(split_test split_test.cc)
target_link_libraries(split_test ${BIN_LIBS})

add_executable(options_map_parser_test options_map_parser_test.cc)
target_link_libraries(options_map_parser_test ${BIN_LIBS})
endif()

set(SDK_LIBS openmldb_sdk openmldb_catalog client zk_client schema openmldb_flags openmldb_codec openmldb_proto base hybridse_sdk zookeeper_mt)
Expand Down
Loading

0 comments on commit ebe1f7d

Please sign in to comment.