From b6f7f2ffca65deb7227f3a3bfb3047844fa32aa4 Mon Sep 17 00:00:00 2001 From: HuangWei Date: Thu, 8 Feb 2024 13:20:37 +0800 Subject: [PATCH] feat: load, write supports iceberg (#3737) * feat: load/write supports iceberg * docs --- .../integration/offline_data_sources/hive.md | 4 +- .../offline_data_sources/iceberg.md | 104 ++++++++ .../offline_data_sources/index.rst | 3 +- .../integration/offline_data_sources/hive.md | 8 +- .../offline_data_sources/iceberg.md | 132 ++++++++++ .../offline_data_sources/index.rst | 3 +- .../openmldb/batch/api/OpenmldbSession.scala | 2 +- .../batch/nodes/CreateTablePlan.scala | 4 +- .../openmldb/batch/nodes/LoadDataPlan.scala | 4 +- .../openmldb/batch/nodes/SelectIntoPlan.scala | 12 +- .../openmldb/batch/utils/DataSourceUtil.scala | 228 ++++++++++++++++++ .../openmldb/batch/utils/HybridseUtil.scala | 193 +-------------- .../openmldb/batch/TestLoadDataPlan.scala | 4 +- ...tilTest.scala => DataSourceUtilTest.scala} | 4 +- 14 files changed, 499 insertions(+), 206 deletions(-) create mode 100644 docs/en/integration/offline_data_sources/iceberg.md create mode 100644 docs/zh/integration/offline_data_sources/iceberg.md create mode 100644 java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/utils/DataSourceUtil.scala rename java/openmldb-batch/src/test/scala/com/_4paradigm/openmldb/batch/utils/{HybridseUtilTest.scala => DataSourceUtilTest.scala} (97%) diff --git a/docs/en/integration/offline_data_sources/hive.md b/docs/en/integration/offline_data_sources/hive.md index 6784df026a2..5e41f5d66dd 100644 --- a/docs/en/integration/offline_data_sources/hive.md +++ b/docs/en/integration/offline_data_sources/hive.md @@ -102,7 +102,7 @@ Importing data from Hive sources is facilitated through the API [`LOAD DATA INFI - Both offline and online engines are capable of importing data from Hive sources. - The Hive data import feature supports soft connections. This approach minimizes the need for redundant data copies and ensures that OpenMLDB can access Hive's most up-to-date data at any given time. To activate the soft link mechanism for data import, utilize the `deep_copy=false` parameter. -- The `OPTIONS` parameter offers two valid settings: `deep_copy`, `mode` and `sql`. +- The `OPTIONS` parameter offers three valid settings: `deep_copy`, `mode` and `sql`. For example: @@ -122,7 +122,7 @@ LOAD DATA INFILE 'hive://db1.t1' INTO TABLE db1.t1 OPTIONS(deep_copy=true, sql=' Exporting data to Hive sources is facilitated through the API [`SELECT INTO`](../../openmldb_sql/dql/SELECT_INTO_STATEMENT.md), which employs a distinct URI format, `hive://[db].table`, to seamlessly transfer data to the Hive data warehouse. Here are some key considerations: -- If you omit specifying a database name, the default database name used will be `default_Db`. +- If you omit specifying Hive database name, the default database used in Hive will be `default`. - When a database name is explicitly provided, it's imperative that the database already exists. Currently, the system does not support the automatic creation of non-existent databases. - In the event that the designated Hive table name is absent, the system will automatically generate a table with the corresponding name within the Hive environment. - The `OPTIONS` parameter exclusively takes effect within the export mode of `mode`. Other parameters do not exert any influence. 
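Because only `mode` takes effect when exporting to Hive, it can help to set the write mode explicitly. A minimal illustrative sketch (table names are placeholders; `overwrite` is one of the modes accepted by `SELECT INTO`):

```sql
-- Replace any existing data in the Hive table db1.t1 with the query result
SELECT col1, col2, col3 FROM t1 INTO OUTFILE 'hive://db1.t1' OPTIONS(mode='overwrite');
```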
diff --git a/docs/en/integration/offline_data_sources/iceberg.md b/docs/en/integration/offline_data_sources/iceberg.md new file mode 100644 index 00000000000..5a46b93e932 --- /dev/null +++ b/docs/en/integration/offline_data_sources/iceberg.md @@ -0,0 +1,104 @@ +# Iceberg + +## Introduction + +[Apache Iceberg](https://iceberg.apache.org/) is an open table format for huge analytic datasets. Iceberg adds tables to compute engines including Spark, Trino, PrestoDB, Flink, Hive and Impala, using a high-performance table format that works just like a SQL table. OpenMLDB supports the use of Iceberg as an offline storage engine for importing data and exporting feature computation data. + +## Configuration + +### Installation + +For users employing [The OpenMLDB Spark Distribution Version](../../tutorial/openmldbspark_distribution.md), specifically v0.8.5 and newer iterations, the essential Iceberg 1.4.3 dependencies are already integrated. If you are working with an alternative Spark distribution or a different Iceberg version, you can download the corresponding Iceberg dependencies from the [Iceberg release](https://iceberg.apache.org/releases/) and add them to the Spark classpath/jars. For example, if you are using OpenMLDB Spark, you should download `x.x.x Spark 3.2_12 runtime Jar` (x.x.x is the Iceberg version) and add it to `jars/` in the Spark home. + +### Configuration + +You should add the catalog configuration to the Spark configuration. This can be accomplished in two ways: + +- taskmanager.properties(.template): Include the Iceberg configuration within the `spark.default.conf` configuration item, then restart the taskmanager. +- CLI: Add the configuration to the ini conf and use `--spark_conf` when starting the CLI. Please refer to [Client Spark Configuration](../../reference/client_config/client_spark_config.md). + +Iceberg configuration details can be found in [Iceberg Configuration](https://iceberg.apache.org/docs/latest/spark-configuration/). + +For example, set a Hive catalog in `taskmanager.properties(.template)`: + +```properties +spark.default.conf=spark.sql.catalog.hive_prod=org.apache.iceberg.spark.SparkCatalog;spark.sql.catalog.hive_prod.type=hive;spark.sql.catalog.hive_prod.uri=thrift://metastore-host:port +``` + +If you need to create Iceberg tables, you also need to configure `spark.sql.catalog.hive_prod.warehouse`. + +Set a Hadoop catalog: + +```properties +spark.default.conf=spark.sql.catalog.hadoop_prod=org.apache.iceberg.hadoop.HadoopCatalog;spark.sql.catalog.hadoop_prod.type=hadoop;spark.sql.catalog.hadoop_prod.warehouse=hdfs://hadoop-namenode:port/warehouse +``` + +Set a REST catalog: + +```properties +spark.default.conf=spark.sql.catalog.rest_prod=org.apache.iceberg.spark.SparkCatalog;spark.sql.catalog.rest_prod.catalog-impl=org.apache.iceberg.rest.RESTCatalog;spark.sql.catalog.rest_prod.uri=http://iceberg-rest:8181/ +``` + +For the full Iceberg catalog configuration, see [Iceberg Catalog Configuration](https://iceberg.apache.org/docs/latest/spark-configuration/). + +Once any catalog is configured, Iceberg tables are accessed as `<catalog>.<db>.<table>`. If you do not want to write the `<catalog>` prefix, set `spark.sql.catalog.default=<catalog>`. You can also add `spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog` and `spark.sql.catalog.spark_catalog.type=hive` to merge the Iceberg catalog into the Spark catalog (non-Iceberg tables still live in the Spark catalog), so that tables can be accessed as `<db>.<table>`. + +### Debug Information + +When you import data from Iceberg, check the task log to confirm that the catalog is connected and that the source data is actually read. After a successful connection to an Iceberg Hive catalog, the log contains messages such as: +``` +INFO HiveUtils: Initializing HiveMetastoreConnection version 2.3.9 using Spark classes. +INFO HiveMetaStore: 0: get_database: default +``` +When exporting to Iceberg, the log should contain a commit message such as `INFO AtomicReplaceTableAsSelectExec: Data source write support IcebergBatchWrite(table=..., format=PARQUET) committed.` + +## Data Format + +For the Iceberg schema, see [Iceberg Schema](https://iceberg.apache.org/spec/#schema).
Currently, only the following Iceberg data types are supported: + +| OpenMLDB Data Format | Iceberg Data Format | +| -------------------- | ------------------- | +| BOOL | bool | +| INT | int | +| BIGINT | long | +| FLOAT | float | +| DOUBLE | double | +| DATE | date | +| TIMESTAMP | timestamp | +| STRING | string | + +## Import Iceberg Data to OpenMLDB + +Importing data from Iceberg sources is facilitated through the API [`LOAD DATA INFILE`](../../openmldb_sql/dml/LOAD_DATA_STATEMENT.md). This operation employs a specialized URI format, `iceberg://[catalog].[db].table`, to seamlessly import data from Iceberg. Here are some important considerations: + +- Both offline and online engines are capable of importing data from Iceberg sources. +- The Iceberg data import feature supports soft connections. This approach minimizes the need for redundant data copies and ensures that OpenMLDB can access Iceberg's most up-to-date data at any given time. To activate the soft link mechanism for data import, utilize the `deep_copy=false` parameter. +- The `OPTIONS` parameter offers three valid settings: `deep_copy`, `mode` and `sql`. + +For example, load data from Iceberg configured as a Hive catalog: + +```sql +LOAD DATA INFILE 'iceberg://hive_prod.db1.t1' INTO TABLE t1 OPTIONS(deep_copy=false); +-- or +LOAD DATA INFILE 'hive_prod.db1.t1' INTO TABLE t1 OPTIONS(deep_copy=false, format='iceberg'); +``` + +The data loading process also supports using SQL queries to filter specific data from Iceberg tables. It's important to note that the SQL syntax must comply with SparkSQL standards. The table name used should be the registered name without the `iceberg://` prefix. + +For example: + +```sql +LOAD DATA INFILE 'iceberg://hive_prod.db1.t1' INTO TABLE db1.t1 OPTIONS(deep_copy=true, sql='SELECT * FROM hive_prod.db1.t1 where key=\"foo\"') +``` + +## Export OpenMLDB Data to Iceberg + +Exporting data to Iceberg sources is facilitated through the API [`SELECT INTO`](../../openmldb_sql/dql/SELECT_INTO_STATEMENT.md), which employs a distinct URI format, `iceberg://[catalog].[db].table`, to seamlessly transfer data to the Iceberg data warehouse. Here are some key considerations: + +- If you omit the Iceberg database name, the default Iceberg database `default` will be used. +- When an Iceberg database name is explicitly provided, that database must already exist. Currently, the system does not support the automatic creation of non-existent databases. +- In the event that the designated Iceberg table name is absent, the system will automatically generate a table with the corresponding name within the Iceberg environment. +- The `OPTIONS` parameter exclusively takes effect within the export mode of `mode` (see the sketch below). Other parameters do not exert any influence.
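A hedged sketch of an export that sets the write mode explicitly (the catalog, database, and table names are placeholders; `overwrite` is one of the modes accepted by `SELECT INTO`):

```sql
-- Replace any existing data in the Iceberg table hive_prod.db1.t1 with the query result
SELECT col1, col2, col3 FROM t1 INTO OUTFILE 'iceberg://hive_prod.db1.t1' OPTIONS(mode='overwrite');
```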
+ +For example: + +```sql +SELECT col1, col2, col3 FROM t1 INTO OUTFILE 'iceberg://hive_prod.db1.t1'; +``` diff --git a/docs/en/integration/offline_data_sources/index.rst b/docs/en/integration/offline_data_sources/index.rst index 51f877f29bc..9947828577d 100644 --- a/docs/en/integration/offline_data_sources/index.rst +++ b/docs/en/integration/offline_data_sources/index.rst @@ -6,4 +6,5 @@ Offline Data Source :maxdepth: 1 hive - s3 \ No newline at end of file + s3 + iceberg \ No newline at end of file diff --git a/docs/zh/integration/offline_data_sources/hive.md b/docs/zh/integration/offline_data_sources/hive.md index 286f14d9684..5e48115b9c8 100644 --- a/docs/zh/integration/offline_data_sources/hive.md +++ b/docs/zh/integration/offline_data_sources/hive.md @@ -26,14 +26,14 @@ ### 配置 -目前 OpenMLDB 只支持使用 metastore 服务来连接Hive。你可以在以下两种配置方式中选择一种,来访问 Hive 数据源。 +目前 OpenMLDB 只支持使用 metastore 服务来连接Hive。你可以在以下两种配置方式中选择一种,来访问 Hive 数据源。测试搭建的HIVE环境简单,通常只需要配置`hive.metastore.uris`即可。但生产环境中,可能需要配置更多的Hive配置,更推荐使用`hive-site.xml`的方式。 -- spark.conf:你可以在 spark conf 中配置 `spark.hadoop.hive.metastore.uris`。有两种方式: +- spark.conf:你可以在 spark conf 中配置 `spark.hadoop.hive.metastore.uris`等相关配置。有两种方式: - taskmanager.properties: 在配置项 `spark.default.conf` 中加入`spark.hadoop.hive.metastore.uris=thrift://...` ,随后重启taskmanager。 - CLI: 在 ini conf 中加入此配置项,并使用`--spark_conf`启动CLI,参考[客户端Spark配置文件](../../reference/client_config/client_spark_config.md)。 -- hive-site.xml:你可以配置 `hive-site.xml` 中的 `hive.metastore.uris`,并将配置文件放入 Spark home的`conf/`(如果已配置`HADOOP_CONF_DIR`环境变量,也可以将配置文件放入`HADOOP_CONF_DIR`中)。`hive-site.xml` 样例: +- hive-site.xml:你可以将HIVE的配置 `hive-site.xml` 放入 Spark home的`conf/`(如果已配置`HADOOP_CONF_DIR`环境变量,也可以将配置文件放入`HADOOP_CONF_DIR`中)。`hive-site.xml` 样例: ```xml @@ -122,7 +122,7 @@ LOAD DATA INFILE 'hive://db1.t1' INTO TABLE db1.t1 OPTIONS(deep_copy=true, sql=' 对于 Hive 数据源的导出是通过 API [`SELECT INTO`](../../openmldb_sql/dql/SELECT_INTO_STATEMENT.md) 进行支持,通过使用特定的 URI 接口 `hive://[db].table` 的格式进行导出到 Hive 数仓。注意: -- 如果不指定数据库名字,则会使用默认数据库名字 `default_db` +- 如果不指定Hive数据库名字,则会使用Hive默认数据库 `default` - 如果指定数据库名字,则该数据库必须已经存在,目前不支持对于不存在的数据库进行自动创建 - 如果指定的Hive表名不存在,则会在 Hive 内自动创建对应名字的表 - `OPTIONS` 参数只有导出模式`mode`生效,其他参数均不生效 diff --git a/docs/zh/integration/offline_data_sources/iceberg.md b/docs/zh/integration/offline_data_sources/iceberg.md new file mode 100644 index 00000000000..12088f5e990 --- /dev/null +++ b/docs/zh/integration/offline_data_sources/iceberg.md @@ -0,0 +1,132 @@ +# Iceberg + +## 简介 + +[Apache Iceberg](https://iceberg.apache.org/) 是一个开源的大数据表格格式。Iceberg可以在Spark、Trino、PrestoDB、Flink、Hive和Impala等计算引擎中添加表格,使用高性能的表格格式,就像SQL表格一样。OpenMLDB 支持使用 Iceberg 作为离线存储引擎,导入数据和导出特征计算数据。 + +## 配置 + +### 安装 + +[OpenMLDB Spark 发行版](../../tutorial/openmldbspark_distribution.md) v0.8.5 及以上版本均已经包含 Iceberg 1.4.3 依赖。如果你需要与其他iceberg版本或者其他Spark发行版一起使用,你可以从[Iceberg release](https://iceberg.apache.org/releases/)下载对应的Iceberg依赖,并将其添加到Spark的classpath/jars中。例如,如果你使用的是OpenMLDB Spark,你应该下载`x.x.x Spark 3.2_12 runtime Jar`(x.x.x is iceberg version)并将其添加到Spark home的`jars/`中。 + +### 配置 + +你需要将catalog配置添加到Spark配置中。有两种方式: + +- taskmanager.properties(.template): 在配置项 `spark.default.conf` 中加入Iceberg配置,随后重启taskmanager。 +- CLI: 在 ini conf 中加入此配置项,并使用`--spark_conf`启动CLI,参考[客户端Spark配置文件](../../reference/client_config/client_spark_config.md)。 + +Iceberg配置详情参考[Iceberg Configuration](https://iceberg.apache.org/docs/latest/spark-configuration/)。 + +例如,在`taskmanager.properties(.template)`中设置hive catalog: + +```properties 
+spark.default.conf=spark.sql.catalog.hive_prod=org.apache.iceberg.spark.SparkCatalog;spark.sql.catalog.hive_prod.type=hive;spark.sql.catalog.hive_prod.uri=thrift://metastore-host:port +``` + +如果需要创建iceberg表,还需要配置`spark.sql.catalog.hive_prod.warehouse`。 + +设置 hadoop catalog: + +```properties +spark.default.conf=spark.sql.catalog.hadoop_prod=org.apache.iceberg.hadoop.HadoopCatalog;spark.sql.catalog.hadoop_prod.type=hadoop;spark.sql.catalog.hadoop_prod.warehouse=hdfs://hadoop-namenode:port/warehouse +``` + +设置 rest catalog: + +```properties +spark.default.conf=spark.sql.catalog.rest_prod=org.apache.iceberg.spark.SparkCatalog;spark.sql.catalog.rest_prod.catalog-impl=org.apache.iceberg.rest.RESTCatalog;spark.sql.catalog.rest_prod.uri=http://iceberg-rest:8181/ +``` + +Iceberg catalog的完整配置参考[Iceberg Catalog Configuration](https://iceberg.apache.org/docs/latest/spark-configuration/)。 + +任一配置成功后,均使用`<catalog>.<db>.<table>`的格式访问Iceberg表。如果不想使用`<catalog>`前缀,可以在配置中设置`spark.sql.catalog.default=<catalog>`。也可添加`spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog`,`spark.sql.catalog.spark_catalog.type=hive`,让iceberg catalog合入spark catalog中(非iceberg表仍然存在于spark catalog中),这样可以使用`<db>.<table>`的格式访问Iceberg表。 + +### 调试信息 + +成功连接Iceberg Hive Catalog后,你可以在日志中看到类似以下的信息: + +``` +24/01/30 09:01:05 INFO SharedState: Setting hive.metastore.warehouse.dir ('hdfs://namenode:19000/user/hive/warehouse') to the value of spark.sql.warehouse.dir. +24/01/30 09:01:05 INFO SharedState: Warehouse path is 'hdfs://namenode:19000/user/hive/warehouse'. +... +24/01/30 09:01:06 INFO HiveUtils: Initializing HiveMetastoreConnection version 2.3.9 using Spark classes. +24/01/30 09:01:06 INFO HiveClientImpl: Warehouse location for Hive client (version 2.3.9) is hdfs://namenode:19000/user/hive/warehouse +24/01/30 09:01:06 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist +24/01/30 09:01:06 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist +24/01/30 09:01:06 INFO HiveMetaStore: 0: Opening raw store with implementation class:org.apache.hadoop.hive.metastore.ObjectStore +24/01/30 09:01:06 INFO ObjectStore: ObjectStore, initialize called +24/01/30 09:01:06 INFO Persistence: Property hive.metastore.integral.jdo.pushdown unknown - will be ignored +24/01/30 09:01:06 INFO Persistence: Property datanucleus.cache.level2 unknown - will be ignored +24/01/30 09:01:07 INFO ObjectStore: Setting MetaStore object pin classes with hive.metastore.cache.pinobjtypes="Table,StorageDescriptor,SerDeInfo,Partition,Database,Type,FieldSchema,Order" +24/01/30 09:01:07 INFO MetaStoreDirectSql: Using direct SQL, underlying DB is POSTGRES +24/01/30 09:01:07 INFO ObjectStore: Initialized ObjectStore +24/01/30 09:01:08 INFO HiveMetaStore: Added admin role in metastore +24/01/30 09:01:08 INFO HiveMetaStore: Added public role in metastore +24/01/30 09:01:08 INFO HiveMetaStore: No user is added in admin role, since config is empty +24/01/30 09:01:08 INFO HiveMetaStore: 0: get_database: default +``` + +导出到Iceberg时,你可以检查任务日志,应该有类似以下的信息: + +``` +24/01/30 09:57:29 INFO AtomicReplaceTableAsSelectExec: Start processing data source write support: IcebergBatchWrite(table=nyc.taxis_out, format=PARQUET). The input RDD has 1 partitions. +... +24/01/30 09:57:31 INFO AtomicReplaceTableAsSelectExec: Data source write support IcebergBatchWrite(table=nyc.taxis_out, format=PARQUET) committed. +...
+24/01/30 09:57:31 INFO HiveTableOperations: Committed to table hive_prod.nyc.taxis_out with the new metadata location hdfs://namenode:19000/user/hive/iceberg_storage/nyc.db/taxis_out/metadata/00001-038d8b81-04a6-4a19-bb83-275eb4664937.metadata.json +24/01/30 09:57:31 INFO BaseMetastoreTableOperations: Successfully committed to table hive_prod.nyc.taxis_out in 224 ms +``` + +## 数据格式 + +Iceberg schema参考[Iceberg Schema](https://iceberg.apache.org/spec/#schema)。目前,仅支持以下Iceberg数据格式: + +| OpenMLDB 数据格式 | Iceberg 数据格式 | +| ----------------- | ---------------- | +| BOOL | bool | +| INT | int | +| BIGINT | long | +| FLOAT | float | +| DOUBLE | double | +| DATE | date | +| TIMESTAMP | timestamp | +| STRING | string | + +## 导入 Iceberg 数据到 OpenMLDB + +从 Iceberg 表导入数据,需要使用 [`LOAD DATA INFILE`](../../openmldb_sql/dml/LOAD_DATA_STATEMENT.md) 语句。这个语句使用特殊的 URI 格式 `iceberg://[catalog].[db].table`,可以无缝地从 Iceberg 导入数据。以下是一些重要的注意事项: + +- 离线引擎和在线引擎都可以从 Iceberg 表导入数据。 +- 离线导入支持软链接,但是在线导入不支持软链接。使用软链接时,需要在导入OPTIONS中指定 `deep_copy=false`。 +- Iceberg 表导入只有三个参数有效: `deep_copy`、`mode` 和 `sql`。其他格式参数`delimiter`,`quote`等均无效。 + +例如,通过Iceberg Hive Catalog导入数据: + +```sql +LOAD DATA INFILE 'iceberg://hive_prod.db1.t1' INTO TABLE t1 OPTIONS(deep_copy=false); +-- or +LOAD DATA INFILE 'hive_prod.db1.t1' INTO TABLE t1 OPTIONS(deep_copy=false, format='iceberg'); +``` + +数据导入支持`sql`参数,筛选出表中的特定数据进行导入,注意 SQL 必须符合 SparkSQL 语法,数据表为注册后的表名,不带 `iceberg://` 前缀。 + +```sql +LOAD DATA INFILE 'iceberg://hive_prod.db1.t1' INTO TABLE t1 OPTIONS(deep_copy=false, sql='select * from t1 where id > 100'); +``` + +## 导出 OpenMLDB 数据到 Iceberg + +从 OpenMLDB 导出数据到 Iceberg 表,需要使用 [`SELECT INTO`](../../openmldb_sql/dql/SELECT_INTO_STATEMENT.md) 语句,这个语句使用特殊的 URI 格式 `iceberg://[catalog].[db].table`,可以无缝地导出数据到 Iceberg 表。以下是一些重要的注意事项: + +- 如果不指定Iceberg数据库名字,则会使用Iceberg默认数据库`default` +- 如果指定Iceberg数据库名字,则该数据库必须已经存在,目前不支持对于不存在的数据库进行自动创建 +- 如果指定的Iceberg表名不存在,则会在 Iceberg 内自动创建对应名字的表 +- `OPTIONS` 参数只有导出模式`mode`生效,其他参数均不生效 + +举例: + +```sql +SELECT col1, col2, col3 FROM t1 INTO OUTFILE 'iceberg://hive_prod.db1.t1'; +``` diff --git a/docs/zh/integration/offline_data_sources/index.rst b/docs/zh/integration/offline_data_sources/index.rst index e7ee72aec6c..ef2877aee8a 100644 --- a/docs/zh/integration/offline_data_sources/index.rst +++ b/docs/zh/integration/offline_data_sources/index.rst @@ -6,4 +6,5 @@ :maxdepth: 1 hive - s3 \ No newline at end of file + s3 + iceberg diff --git a/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/api/OpenmldbSession.scala b/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/api/OpenmldbSession.scala index 7059f0146bd..d16496a6111 100755 --- a/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/api/OpenmldbSession.scala +++ b/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/api/OpenmldbSession.scala @@ -18,7 +18,7 @@ package com._4paradigm.openmldb.batch.api import com._4paradigm.openmldb.batch.catalog.OpenmldbCatalogService import com._4paradigm.openmldb.batch.utils.{DataTypeUtil, VersionCli} -import com._4paradigm.openmldb.batch.utils.HybridseUtil.autoLoad +import com._4paradigm.openmldb.batch.utils.DataSourceUtil.autoLoad import com._4paradigm.openmldb.batch.{OpenmldbBatchConfig, SparkPlanner} import org.apache.commons.lang3.exception.ExceptionUtils import org.apache.log4j.{Level, Logger} diff --git a/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/nodes/CreateTablePlan.scala b/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/nodes/CreateTablePlan.scala index 
17f325fb909..579195fa257 100644 --- a/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/nodes/CreateTablePlan.scala +++ b/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/nodes/CreateTablePlan.scala @@ -18,7 +18,7 @@ package com._4paradigm.openmldb.batch.nodes import com._4paradigm.hybridse.node.CreateTableLikeClause.LikeKind import com._4paradigm.hybridse.sdk.UnsupportedHybridSeException import com._4paradigm.hybridse.vm.PhysicalCreateTableNode -import com._4paradigm.openmldb.batch.utils.{HybridseUtil, OpenmldbTableUtil} +import com._4paradigm.openmldb.batch.utils.{DataSourceUtil, OpenmldbTableUtil} import com._4paradigm.openmldb.batch.{PlanContext, SparkInstance} import org.slf4j.LoggerFactory @@ -44,7 +44,7 @@ object CreateTablePlan { val df = likeKind match { case LikeKind.HIVE => val hivePath = node.getData_.GetLikePath() - HybridseUtil.autoLoad(ctx.getOpenmldbSession, hivePath, "hive", Map[String, String](), null) + DataSourceUtil.autoLoad(ctx.getOpenmldbSession, hivePath, "hive", Map[String, String](), null) case LikeKind.PARQUET => val parquetPath = node.getData_.GetLikePath() ctx.getSparkSession.read.parquet(parquetPath) diff --git a/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/nodes/LoadDataPlan.scala b/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/nodes/LoadDataPlan.scala index ec9946b839a..7f87c55ffce 100644 --- a/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/nodes/LoadDataPlan.scala +++ b/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/nodes/LoadDataPlan.scala @@ -16,7 +16,7 @@ package com._4paradigm.openmldb.batch.nodes import com._4paradigm.hybridse.vm.PhysicalLoadDataNode -import com._4paradigm.openmldb.batch.utils.HybridseUtil +import com._4paradigm.openmldb.batch.utils.{DataSourceUtil, HybridseUtil} import com._4paradigm.openmldb.batch.{PlanContext, SparkInstance} import com._4paradigm.openmldb.proto.NS.OfflineTableInfo import org.slf4j.LoggerFactory @@ -51,7 +51,7 @@ object LoadDataPlan { // we read input file even in soft copy, // cause we want to check if "the input file schema == openmldb table schema" - val df = HybridseUtil.autoLoad(ctx.getOpenmldbSession, inputFile, format, options, info.getColumnDescList, + val df = DataSourceUtil.autoLoad(ctx.getOpenmldbSession, inputFile, format, options, info.getColumnDescList, loadDataSql) // write diff --git a/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/nodes/SelectIntoPlan.scala b/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/nodes/SelectIntoPlan.scala index 7dcdd51575b..4f366774cd5 100644 --- a/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/nodes/SelectIntoPlan.scala +++ b/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/nodes/SelectIntoPlan.scala @@ -16,7 +16,7 @@ package com._4paradigm.openmldb.batch.nodes import com._4paradigm.hybridse.vm.PhysicalSelectIntoNode -import com._4paradigm.openmldb.batch.utils.{HybridseUtil, OpenmldbTableUtil} +import com._4paradigm.openmldb.batch.utils.{HybridseUtil, OpenmldbTableUtil, DataSourceUtil} import com._4paradigm.openmldb.batch.{PlanContext, SparkInstance} import org.slf4j.LoggerFactory @@ -39,12 +39,12 @@ object SelectIntoPlan { throw new Exception("select empty, skip save") } - if (format == "hive") { + if (DataSourceUtil.isCatalog(format)) { // we won't check if the database exists, if not, save will throw exception - // DO NOT create database in here(the table location will be spark warehouse) - val 
dbt = HybridseUtil.hiveDest(outPath) - logger.info(s"offline select into: hive way, write mode[${mode}], out table ${dbt}") - input.getDf().write.format("hive").mode(mode).saveAsTable(dbt) + // Hive: DO NOT create database in here(the table location will be spark warehouse) + val dbt = DataSourceUtil.catalogDest(outPath) + logger.info(s"offline select into: $format catalog, write mode[${mode}], out table ${dbt}") + input.getDf().write.format(format).mode(mode).saveAsTable(dbt) } else if (format == "openmldb") { val (db, table) = HybridseUtil.getOpenmldbDbAndTable(outPath) diff --git a/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/utils/DataSourceUtil.scala b/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/utils/DataSourceUtil.scala new file mode 100644 index 00000000000..12de283497c --- /dev/null +++ b/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/utils/DataSourceUtil.scala @@ -0,0 +1,228 @@ +/* + * Copyright 2021 4Paradigm + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com._4paradigm.openmldb.batch.utils + +import java.util +import com._4paradigm.hybridse.`type`.TypeOuterClass.{ColumnDef, Database, TableDef} +import com._4paradigm.hybridse.node.ConstNode +import com._4paradigm.hybridse.sdk.UnsupportedHybridSeException +import com._4paradigm.hybridse.vm.{PhysicalLoadDataNode, PhysicalOpNode, PhysicalSelectIntoNode} +import com._4paradigm.openmldb.batch.api.OpenmldbSession +import com._4paradigm.openmldb.proto +import com._4paradigm.openmldb.proto.Common +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.functions.{col, first} +import org.apache.spark.sql.types.{BooleanType, DataType, DateType, DoubleType, FloatType, IntegerType, LongType, + ShortType, StringType, StructField, StructType, TimestampType} +import org.apache.spark.sql.{DataFrame, DataFrameReader, Row, SparkSession} +import org.slf4j.LoggerFactory + +import scala.collection.JavaConverters.asScalaBufferConverter +import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable` +import scala.collection.mutable + +// util for any data source(defined by format & options) +object DataSourceUtil { + private val logger = LoggerFactory.getLogger(this.getClass) + + def autoLoad(openmldbSession: OpenmldbSession, file: String, format: String, options: Map[String, String], + columns: util.List[Common.ColumnDesc]): DataFrame = { + autoLoad(openmldbSession, file, List.empty[String], format, options, columns, "") + } + + def autoLoad(openmldbSession: OpenmldbSession, file: String, format: String, options: Map[String, String], + columns: util.List[Common.ColumnDesc], loadDataSql: String): DataFrame = { + autoLoad(openmldbSession, file, List.empty[String], format, options, columns, loadDataSql) + } + + // otherwise isCatalog + // hdfs files are csv or parquet + def isFile(format: String): Boolean = { + format.toLowerCase.equals("csv") || format.toLowerCase.equals("parquet") + } + + def isCatalog(format: String): 
Boolean = { + !isFile(format) + } + + private def checkSchemaIgnoreNullable(actual: StructType, expect: StructType): Boolean = { + actual.zip(expect).forall{case (a, b) => (a.name, a.dataType) == (b.name, b.dataType)} + } + + // Load df from file **and** symbol paths, they should in the same format and options. + // Decide which load method to use by arg `format`, DO NOT pass `hive://a.b` with format `csv`, + // the format should be `hive`. + // Use `parseOptions` in LoadData/SelectInto to get the right format(filePath & option `format`). + // valid pattern: + // 1. catalog: discard other options, format supports hive(just schema.table), + // custom catalog(.scham.table, e.g.iceberg) + // 2. file: local file or hdfs file, format supports csv & parquet, other options take effect + // We use OpenmldbSession for running sparksql in hiveLoad. If in 4pd Spark distribution, SparkSession.sql + // will do openmldbSql first, and if DISABLE_OPENMLDB_FALLBACK, we can't use sparksql. + def autoLoad(openmldbSession: OpenmldbSession, file: String, symbolPaths: List[String], format: String, + options: Map[String, String], columns: util.List[Common.ColumnDesc], loadDataSql: String = "") + : DataFrame = { + val fmt = format.toLowerCase + if (isCatalog(fmt)) { + logger.info(s"load data from catalog table, format $fmt, paths: $file $symbolPaths") + if (file.isEmpty) { + // no file, read all symbol paths + var outputDf: DataFrame = null + symbolPaths.zipWithIndex.foreach { case (path, index) => + if (index == 0) { + outputDf = catalogLoad(openmldbSession, path, columns, loadDataSql) + } else { + outputDf = outputDf.union(catalogLoad(openmldbSession, path, columns, loadDataSql)) + } + } + outputDf + } else { + var outputDf = catalogLoad(openmldbSession, file, columns, loadDataSql) + for (path: String <- symbolPaths) { + outputDf = outputDf.union(catalogLoad(openmldbSession, path, columns, loadDataSql)) + } + outputDf + } + } else { + logger.info("load data from file {} & {} reader[format {}, options {}]", file, symbolPaths, fmt, options) + + if (file.isEmpty) { + var outputDf: DataFrame = null + symbolPaths.zipWithIndex.foreach { case (path, index) => + if (index == 0) { + outputDf = autoFileLoad(openmldbSession, path, fmt, options, columns, loadDataSql) + } else { + outputDf = outputDf.union(autoFileLoad(openmldbSession, path, fmt, options, columns, + loadDataSql)) + } + } + outputDf + } else { + var outputDf = autoFileLoad(openmldbSession, file, fmt, options, columns, loadDataSql) + for (path: String <- symbolPaths) { + outputDf = outputDf.union(autoFileLoad(openmldbSession, path, fmt, options, columns, + loadDataSql)) + } + outputDf + } + } + } + + // We want df with oriSchema, but if the file format is csv: + // 1. we support two format of timestamp + // 2. spark read may change the df schema to all nullable + // So we should fix it. + private def autoFileLoad(openmldbSession: OpenmldbSession, file: String, format: String, + options: Map[String, String], columns: util.List[Common.ColumnDesc], loadDataSql: String): DataFrame = { + require(format.equals("csv") || format.equals("parquet"), s"unsupported format $format") + val reader = openmldbSession.getSparkSession.read.options(options) + + val (oriSchema, readSchema, tsCols) = HybridseUtil.extractOriginAndReadSchema(columns) + var df = if (format.equals("parquet")) { + // When reading Parquet files, all columns are automatically converted to be nullable for compatibility reasons. 
+ // ref https://spark.apache.org/docs/3.2.1/sql-data-sources-parquet.html + val df = if (loadDataSql != null && loadDataSql.nonEmpty) { + reader.format(format).load(file).createOrReplaceTempView("file") + openmldbSession.sparksql(loadDataSql) + } else { + reader.format(format).load(file) + } + + require(checkSchemaIgnoreNullable(df.schema, oriSchema), + s"schema mismatch(ignore nullable), loaded ${df.schema}!= table $oriSchema, check $file") + // reset nullable property + df.sqlContext.createDataFrame(df.rdd, oriSchema) + } else { + // csv should auto detect the timestamp format + reader.format(format) + // use string to read, then infer the format by the first non-null value of the ts column + val longTsCols = HybridseUtil.parseLongTsCols(reader, readSchema, tsCols, file) + logger.info(s"read schema: $readSchema, file $file") + var df = reader.schema(readSchema).load(file) + if (longTsCols.nonEmpty) { + // convert long type to timestamp type + for (tsCol <- longTsCols) { + logger.debug(s"cast $tsCol to timestamp") + df = df.withColumn(tsCol, (col(tsCol) / 1000).cast("timestamp")) + } + } + + if (loadDataSql != null && loadDataSql.nonEmpty) { + df.createOrReplaceTempView("file") + df = openmldbSession.sparksql(loadDataSql) + } + + if (logger.isDebugEnabled()) { + logger.debug(s"read dataframe schema: ${df.schema}, count: ${df.count()}") + df.show(10) + } + + // if we read non-streaming files, the df schema fields will be set as all nullable. + // so we need to set it right + if (!df.schema.equals(oriSchema)) { + logger.info(s"df schema: ${df.schema}, reset schema") + df.sqlContext.createDataFrame(df.rdd, oriSchema) + } else{ + df + } + } + + require(df.schema == oriSchema, s"schema mismatch, loaded ${df.schema} != table $oriSchema, check $file") + df + } + + // path can have prefix or not, we should remove it if exists + def catalogDest(path: String): String = { + path.split("://").last + } + + private def catalogLoad(openmldbSession: OpenmldbSession, file: String, columns: util.List[Common.ColumnDesc], + loadDataSql: String = ""): DataFrame = { + if (logger.isDebugEnabled()) { + logger.debug("session catalog {}", openmldbSession.getSparkSession.sessionState.catalog) + openmldbSession.sparksql("show tables").show() + } + // use sparksql to read catalog, no need to try openmldbsql and then fallback to sparksql + val df = if (loadDataSql != null && loadDataSql.nonEmpty) { + logger.debug("Try to execute custom SQL for catalog: " + loadDataSql) + openmldbSession.sparksql(loadDataSql) + } else { + openmldbSession.sparksql(s"SELECT * FROM ${catalogDest(file)}") + } + if (logger.isDebugEnabled()) { + logger.debug(s"read dataframe schema: ${df.schema}, count: ${df.count()}") + df.show(10) + } + + if (columns != null) { + val (oriSchema, readSchema, tsCols) = HybridseUtil.extractOriginAndReadSchema(columns) + + require(checkSchemaIgnoreNullable(df.schema, oriSchema), //df.schema == oriSchema, hive table always nullable? 
+ s"schema mismatch(ignore nullable), loaded hive ${df.schema}!= table $oriSchema, check $file") + + if (!df.schema.equals(oriSchema)) { + logger.info(s"df schema: ${df.schema}, reset schema") + df.sqlContext.createDataFrame(df.rdd, oriSchema) + } else{ + df + } + } else { + df + } + + } +} diff --git a/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/utils/HybridseUtil.scala b/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/utils/HybridseUtil.scala index 6f3e5b78d40..ee588ab677d 100644 --- a/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/utils/HybridseUtil.scala +++ b/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/utils/HybridseUtil.scala @@ -206,15 +206,19 @@ object HybridseUtil { } // 'file' may change the option 'format': - // If file starts with 'hive', format is hive, not the detail format in hive - // If file starts with 'file'/'hdfs', format is the file format - // result: format, options(spark write/read options), mode is common, if more options, set them to extra map + // If file starts with 'hive'/'iceberg', format is hive/iceberg, not the detail format in hive + // If file starts with 'openmldb', format is openmldb, not the detail format in openmldb + // Others, format is the origin format option + // **Result**: format, options(spark write/read options), mode is common, if more options, set them to extra map def parseOptions[T](file: String, node: T): (String, Map[String, String], String, Map[String, String]) = { // load data: read format, select into: write format + // parse hive/iceberg to avoid user forget to set format val format = if (file.toLowerCase().startsWith("hive://")) { "hive" + } else if (file.toLowerCase().startsWith("iceberg://")) { + "iceberg" } else if (file.toLowerCase().startsWith("openmldb://")) { - "openmldb" + "openmldb" // TODO(hw): no doc for it } else { parseOption(getOptionFromNode(node, "format"), "csv", getStringOrDefault).toLowerCase } @@ -317,193 +321,16 @@ object HybridseUtil { longTsCols.toList } - def checkSchemaIgnoreNullable(actual: StructType, expect: StructType): Boolean = { - actual.zip(expect).forall{case (a, b) => (a.name, a.dataType) == (b.name, b.dataType)} - } - - def autoLoad(openmldbSession: OpenmldbSession, file: String, format: String, options: Map[String, String], - columns: util.List[Common.ColumnDesc]): DataFrame = { - autoLoad(openmldbSession, file, List.empty[String], format, options, columns, "") - } - - def autoLoad(openmldbSession: OpenmldbSession, file: String, format: String, options: Map[String, String], - columns: util.List[Common.ColumnDesc], loadDataSql: String): DataFrame = { - autoLoad(openmldbSession, file, List.empty[String], format, options, columns, loadDataSql) - } - - // Load df from file **and** symbol paths, they should in the same format and options. - // Decide which load method to use by arg `format`, DO NOT pass `hive://a.b` with format `csv`, - // the format should be `hive`. - // Use `parseOptions` in LoadData/SelectInto to get the right format(filePath & option `format`). - // valid pattern: - // 1. hive path, format must be hive, discard other options - // 2. file/hdfs path, format supports csv & parquet, other options take effect - // We use OpenmldbSession for running sparksql in hiveLoad. If in 4pd Spark distribution, SparkSession.sql - // will do openmldbSql first, and if DISABLE_OPENMLDB_FALLBACK, we can't use sparksql. 
- def autoLoad(openmldbSession: OpenmldbSession, file: String, symbolPaths: List[String], format: String, - options: Map[String, String], columns: util.List[Common.ColumnDesc], loadDataSql: String = "") - : DataFrame = { - val fmt = format.toLowerCase - if (fmt.equals("hive")) { - logger.info(s"load data from hive table $file & $symbolPaths") - if (file.isEmpty) { - var outputDf: DataFrame = null - symbolPaths.zipWithIndex.foreach { case (path, index) => - if (index == 0) { - outputDf = HybridseUtil.hiveLoad(openmldbSession, path, columns, loadDataSql) - } else { - outputDf = outputDf.union(HybridseUtil.hiveLoad(openmldbSession, path, columns, loadDataSql)) - } - } - outputDf - } else { - var outputDf = HybridseUtil.hiveLoad(openmldbSession, file, columns, loadDataSql) - for (path: String <- symbolPaths) { - outputDf = outputDf.union(HybridseUtil.hiveLoad(openmldbSession, path, columns, loadDataSql)) - } - outputDf - } - } else { - logger.info("load data from file {} & {} reader[format {}, options {}]", file, symbolPaths, fmt, options) - - if (file.isEmpty) { - var outputDf: DataFrame = null - symbolPaths.zipWithIndex.foreach { case (path, index) => - if (index == 0) { - outputDf = HybridseUtil.autoFileLoad(openmldbSession, path, fmt, options, columns, loadDataSql) - } else { - outputDf = outputDf.union(HybridseUtil.autoFileLoad(openmldbSession, path, fmt, options, columns, - loadDataSql)) - } - } - outputDf - } else { - var outputDf = HybridseUtil.autoFileLoad(openmldbSession, file, fmt, options, columns, loadDataSql) - for (path: String <- symbolPaths) { - outputDf = outputDf.union(HybridseUtil.autoFileLoad(openmldbSession, path, fmt, options, columns, - loadDataSql)) - } - outputDf - } - } - } - - // We want df with oriSchema, but if the file format is csv: - // 1. we support two format of timestamp - // 2. spark read may change the df schema to all nullable - // So we should fix it. - private def autoFileLoad(openmldbSession: OpenmldbSession, file: String, format: String, - options: Map[String, String], columns: util.List[Common.ColumnDesc], loadDataSql: String): DataFrame = { - require(format.equals("csv") || format.equals("parquet"), s"unsupported format $format") - val reader = openmldbSession.getSparkSession.read.options(options) - - val (oriSchema, readSchema, tsCols) = HybridseUtil.extractOriginAndReadSchema(columns) - var df = if (format.equals("parquet")) { - // When reading Parquet files, all columns are automatically converted to be nullable for compatibility reasons. 
- // ref https://spark.apache.org/docs/3.2.1/sql-data-sources-parquet.html - val df = if (loadDataSql != null && loadDataSql.nonEmpty) { - reader.format(format).load(file).createOrReplaceTempView("file") - openmldbSession.sparksql(loadDataSql) - } else { - reader.format(format).load(file) - } - - require(checkSchemaIgnoreNullable(df.schema, oriSchema), - s"schema mismatch(ignore nullable), loaded ${df.schema}!= table $oriSchema, check $file") - // reset nullable property - df.sqlContext.createDataFrame(df.rdd, oriSchema) - } else { - // csv should auto detect the timestamp format - reader.format(format) - // use string to read, then infer the format by the first non-null value of the ts column - val longTsCols = HybridseUtil.parseLongTsCols(reader, readSchema, tsCols, file) - logger.info(s"read schema: $readSchema, file $file") - var df = reader.schema(readSchema).load(file) - if (longTsCols.nonEmpty) { - // convert long type to timestamp type - for (tsCol <- longTsCols) { - logger.debug(s"cast $tsCol to timestamp") - df = df.withColumn(tsCol, (col(tsCol) / 1000).cast("timestamp")) - } - } - - if (loadDataSql != null && loadDataSql.nonEmpty) { - df.createOrReplaceTempView("file") - df = openmldbSession.sparksql(loadDataSql) - } - - if (logger.isDebugEnabled()) { - logger.debug(s"read dataframe schema: ${df.schema}, count: ${df.count()}") - df.show(10) - } - - // if we read non-streaming files, the df schema fields will be set as all nullable. - // so we need to set it right - if (!df.schema.equals(oriSchema)) { - logger.info(s"df schema: ${df.schema}, reset schema") - df.sqlContext.createDataFrame(df.rdd, oriSchema) - } else{ - df - } - } - - require(df.schema == oriSchema, s"schema mismatch, loaded ${df.schema} != table $oriSchema, check $file") - df - } - - def hiveDest(path: String): String = { - require(path.toLowerCase.startsWith("hive://"), s"invalid hive path $path") - // hive:// - val tableStartPos = 7 - path.substring(tableStartPos) - } - def getOpenmldbDbAndTable(path: String): (String, String) = { - require(path.toLowerCase.startsWith("openmldb://")) + require(path.toLowerCase.startsWith("openmldb://"), s"unsupported path $path") // openmldb:// val tableStartPos = 11 val dbAndTableString = path.substring(tableStartPos) - require(dbAndTableString.split("\\.").size == 2) + require(dbAndTableString.split("\\.").size == 2, s"invalid path $path") val db = dbAndTableString.split("\\.")(0) val table = dbAndTableString.split("\\.")(1) (db, table) } - - private def hiveLoad(openmldbSession: OpenmldbSession, file: String, columns: util.List[Common.ColumnDesc], - loadDataSql: String = ""): DataFrame = { - if (logger.isDebugEnabled()) { - logger.debug("session catalog {}", openmldbSession.getSparkSession.sessionState.catalog) - openmldbSession.sparksql("show tables").show() - } - // use sparksql to read hive, no need to try openmldbsql and then fallback to sparksql - val df = if (loadDataSql != null && loadDataSql.nonEmpty) { - logger.debug("Try to execute custom SQL for hive: " + loadDataSql) - openmldbSession.sparksql(loadDataSql) - } else { - openmldbSession.sparksql(s"SELECT * FROM ${hiveDest(file)}") - } - if (logger.isDebugEnabled()) { - logger.debug(s"read dataframe schema: ${df.schema}, count: ${df.count()}") - df.show(10) - } - - if (columns != null) { - val (oriSchema, readSchema, tsCols) = HybridseUtil.extractOriginAndReadSchema(columns) - - require(checkSchemaIgnoreNullable(df.schema, oriSchema), //df.schema == oriSchema, hive table always nullable? 
- s"schema mismatch(ignore nullable), loaded hive ${df.schema}!= table $oriSchema, check $file") - - if (!df.schema.equals(oriSchema)) { - logger.info(s"df schema: ${df.schema}, reset schema") - df.sqlContext.createDataFrame(df.rdd, oriSchema) - } else{ - df - } - } else { - df - } - - } } diff --git a/java/openmldb-batch/src/test/scala/com/_4paradigm/openmldb/batch/TestLoadDataPlan.scala b/java/openmldb-batch/src/test/scala/com/_4paradigm/openmldb/batch/TestLoadDataPlan.scala index ee8e3e2633f..7ae1d9914f2 100644 --- a/java/openmldb-batch/src/test/scala/com/_4paradigm/openmldb/batch/TestLoadDataPlan.scala +++ b/java/openmldb-batch/src/test/scala/com/_4paradigm/openmldb/batch/TestLoadDataPlan.scala @@ -162,8 +162,8 @@ class TestLoadDataPlan extends SparkTestSuite with Matchers { fail("unreachable") } - println("deep load data with invalid format option") - a[IllegalArgumentException] should be thrownBy { + println("deep load data with invalid format option, catalog will throw exception") + a[org.apache.spark.sql.catalyst.parser.ParseException] should be thrownBy { openmldbSession.openmldbSql(s"load data infile '$testFileWithHeader' into table $db.$table " + "options(format='txt', mode='overwrite');") fail("unreachable") diff --git a/java/openmldb-batch/src/test/scala/com/_4paradigm/openmldb/batch/utils/HybridseUtilTest.scala b/java/openmldb-batch/src/test/scala/com/_4paradigm/openmldb/batch/utils/DataSourceUtilTest.scala similarity index 97% rename from java/openmldb-batch/src/test/scala/com/_4paradigm/openmldb/batch/utils/HybridseUtilTest.scala rename to java/openmldb-batch/src/test/scala/com/_4paradigm/openmldb/batch/utils/DataSourceUtilTest.scala index 5299ae5ae25..726173f9eb1 100644 --- a/java/openmldb-batch/src/test/scala/com/_4paradigm/openmldb/batch/utils/HybridseUtilTest.scala +++ b/java/openmldb-batch/src/test/scala/com/_4paradigm/openmldb/batch/utils/DataSourceUtilTest.scala @@ -19,7 +19,7 @@ package com._4paradigm.openmldb.batch.utils import com._4paradigm.openmldb.batch.PlanContext import com._4paradigm.openmldb.batch.SparkTestSuite import com._4paradigm.openmldb.batch.api.OpenmldbSession -import com._4paradigm.openmldb.batch.utils.HybridseUtil.autoLoad +import com._4paradigm.openmldb.batch.utils.DataSourceUtil.autoLoad import com._4paradigm.openmldb.proto.{Common, Type} import org.apache.spark.sql.DataFrame import org.apache.spark.SparkConf @@ -27,7 +27,7 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.AnalysisException import org.scalatest.Matchers -class HybridseUtilTest extends SparkTestSuite with Matchers { +class DataSourceUtilTest extends SparkTestSuite with Matchers { var openmldbSession: OpenmldbSession = _ override def customizedBefore(): Unit = {