diff --git a/Dockerfile b/Dockerfile index 6c43cf9..6a12443 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,5 +1,6 @@ FROM eclipse-temurin:17-jdk-alpine VOLUME /tmp COPY target/*.jar app.jar +COPY target/classes/tpch.db /tpch-tiny.db ENTRYPOINT ["java", "--add-opens=java.base/java.nio=ALL-UNNAMED", "-jar", "/app.jar"] EXPOSE 8080 \ No newline at end of file diff --git a/how_to_gen_tpch_data.md b/how_to_gen_tpch_data.md new file mode 100644 index 0000000..be50d55 --- /dev/null +++ b/how_to_gen_tpch_data.md @@ -0,0 +1,166 @@ +1. download tpch-tools + from [official website](https://www.tpc.org/TPC_Documents_Current_Versions/download_programs/tools-download-request5.asp?bm_type=TPC-H&bm_vers=3.0.1&mode=CURRENT-ONLY) +2. unzip the file and change dir to dbgen +3. fill the `makefile.suite` and rename it to `Makefile`, then run `make` to build the executable file. (Note that if you + work on + MacOS, you need to change `#include <malloc.h>` to `#include <stdlib.h>` in files `varsub.c` and `bm_utils.c`) +4. run `dbgen` like this `./dbgen -vf -s 1`, and then you will find `.tbl` files in the current dir. +5. Process the obtained .tbl files into CSV format for subsequent loading into SQLite. You can use the following script. 
+ +```sql +CREATE TABLE nation ( + nationkey INTEGER PRIMARY KEY, + name TEXT NOT NULL, + regionkey INTEGER NOT NULL, + comment TEXT +); + +CREATE TABLE region ( + regionkey INTEGER PRIMARY KEY, + name TEXT NOT NULL, + comment TEXT +); + +CREATE TABLE part ( + partkey INTEGER PRIMARY KEY, + name TEXT NOT NULL, + mfgr TEXT NOT NULL, + brand TEXT NOT NULL, + type TEXT NOT NULL, + size INTEGER NOT NULL, + container TEXT NOT NULL, + retailprice REAL NOT NULL, + comment TEXT NOT NULL +); + +CREATE TABLE supplier ( + suppkey INTEGER PRIMARY KEY, + name TEXT NOT NULL, + address TEXT NOT NULL, + nationkey INTEGER NOT NULL, + phone TEXT NOT NULL, + acctbal REAL NOT NULL, + comment TEXT NOT NULL +); + +CREATE TABLE partsupp ( + partkey INTEGER NOT NULL, + suppkey INTEGER NOT NULL, + availqty INTEGER NOT NULL, + supplycost REAL NOT NULL, + comment TEXT NOT NULL, + PRIMARY KEY (partkey, suppkey) +); + +CREATE TABLE customer ( + custkey INTEGER PRIMARY KEY, + name TEXT NOT NULL, + address TEXT NOT NULL, + nationkey INTEGER NOT NULL, + phone TEXT NOT NULL, + acctbal REAL NOT NULL, + mktsegment TEXT NOT NULL, + comment TEXT NOT NULL +); + +CREATE TABLE orders ( + orderkey INTEGER PRIMARY KEY, + custkey INTEGER NOT NULL, + orderstatus TEXT NOT NULL, + totalprice REAL NOT NULL, + orderdate TEXT NOT NULL, + orderpriority TEXT NOT NULL, + clerk TEXT NOT NULL, + shippriority INTEGER NOT NULL, + comment TEXT NOT NULL +); + +CREATE TABLE lineitem ( + orderkey INTEGER NOT NULL, + partkey INTEGER NOT NULL, + suppkey INTEGER NOT NULL, + linenumber INTEGER NOT NULL, + quantity REAL NOT NULL, + extendedprice REAL NOT NULL, + discount REAL NOT NULL, + tax REAL NOT NULL, + returnflag TEXT NOT NULL, + linestatus TEXT NOT NULL, + shipdate TEXT NOT NULL, + commitdate TEXT NOT NULL, + receiptdate TEXT NOT NULL, + shipinstruct TEXT NOT NULL, + shipmode TEXT NOT NULL, + comment TEXT NOT NULL, + PRIMARY KEY (orderkey, linenumber) +); +``` + + +```bash +#!/bin/bash +folder_path="." 
+ +# hard code the headers +get_header() { + case "$1" in + "customer") + echo "custkey|name|address|nationkey|phone|acctbal|mktsegment|comment" + ;; + "lineitem") + echo "orderkey|partkey|suppkey|linenumber|quantity|extendedprice|discount|tax|returnflag|linestatus|shipdate|commitdate|receiptdate|shipinstruct|shipmode|comment" + ;; + "nation") + echo "nationkey|name|regionkey|comment" + ;; + "orders") + echo "orderkey|custkey|orderstatus|totalprice|orderdate|orderpriority|clerk|shippriority|comment" + ;; + "part") + echo "partkey|name|mfgr|brand|type|size|container|retailprice|comment" + ;; + "partsupp") + echo "partkey|suppkey|availqty|supplycost|comment" + ;; + "region") + echo "regionkey|name|comment" + ;; + "supplier") + echo "suppkey|name|address|nationkey|phone|acctbal|comment" + ;; + *) + echo "" + ;; + esac +} + +for file in customer.tbl lineitem.tbl nation.tbl orders.tbl part.tbl partsupp.tbl region.tbl supplier.tbl; do + table_name="${file%.*}" + header=$(get_header "$table_name") + + if [ -n "$header" ]; then + tmp_file=$(mktemp "${file}.tmp.XXXXXX") + echo "$header" > "$tmp_file" + sed 's/|$//' "$file" >> "$tmp_file" + mv "$tmp_file" "$file" + else + echo "cannot find header for $table_name, skipping $file" + fi +done + +``` + +6. Load the data into the tables. You can use the following script. 
+ +```sql +.mode csv +.separator | +.import customer.tbl customer +.import lineitem.tbl lineitem +.import nation.tbl nation +.import orders.tbl orders +.import part.tbl part +.import partsupp.tbl partsupp +.import region.tbl region +.import supplier.tbl supplier +``` \ No newline at end of file diff --git a/pom.xml b/pom.xml index e3e0cab..a19d42e 100644 --- a/pom.xml +++ b/pom.xml @@ -28,7 +28,7 @@ com.aliyun.odps maxcompute-emulator - 0.0.5 + 0.0.6-tpch maxcompute-emulator mock version to access MaxCompute @@ -115,7 +115,7 @@ tech.dingxin arrow-utils - 0.0.1 + 0.0.2 org.apache.arrow diff --git a/src/main/java/com/aliyun/odps/service/StorageService.java b/src/main/java/com/aliyun/odps/service/StorageService.java index d1655f8..e508515 100644 --- a/src/main/java/com/aliyun/odps/service/StorageService.java +++ b/src/main/java/com/aliyun/odps/service/StorageService.java @@ -112,20 +112,23 @@ private TableData read(TableId tableId) throws Exception { sql.delete(sql.length() - 4, sql.length()); sql.append(";"); } else { - sql = new StringBuilder("select * from " + tableId.getTableName().toUpperCase() + " where "); + sql = new StringBuilder("select * from " + tableId.getTableName().toUpperCase()); List partitionNames = tableId.getPartitionNames(); - for (String partitionName : partitionNames) { - sql.append("("); - PartitionSpec partitionSpec = new PartitionSpec(partitionName); - Set keys = partitionSpec.keys(); - for (String key : keys) { - sql.append(key).append(" = '").append(partitionSpec.get(key)).append("' and "); + if (!partitionNames.isEmpty()) { + sql.append(" where "); + for (String partitionName : partitionNames) { + sql.append("("); + PartitionSpec partitionSpec = new PartitionSpec(partitionName); + Set keys = partitionSpec.keys(); + for (String key : keys) { + sql.append(key).append(" = '").append(partitionSpec.get(key)).append("' and "); + } + sql.delete(sql.length() - 4, sql.length()); + + sql.append(") or "); } sql.delete(sql.length() - 4, sql.length()); 
- - sql.append(") or "); } - sql.delete(sql.length() - 4, sql.length()); sql.append(";"); } try ( diff --git a/src/main/java/com/aliyun/odps/service/TableService.java b/src/main/java/com/aliyun/odps/service/TableService.java index ade169d..20f5e3a 100644 --- a/src/main/java/com/aliyun/odps/service/TableService.java +++ b/src/main/java/com/aliyun/odps/service/TableService.java @@ -84,7 +84,14 @@ public boolean tableExist(String tableName) { pstmt.setString(1, tableName.toUpperCase()); try (ResultSet resultSet = pstmt.executeQuery()) { - return resultSet.next(); + if (resultSet.next()) { + return true; + } else { + pstmt.setString(1, tableName); + try (ResultSet resultSet2 = pstmt.executeQuery()) { + return resultSet2.next(); + } + } } } } catch (SQLException e) { diff --git a/src/main/java/com/aliyun/odps/utils/AnalyzeDbRunner.java b/src/main/java/com/aliyun/odps/utils/AnalyzeDbRunner.java new file mode 100644 index 0000000..4f9e997 --- /dev/null +++ b/src/main/java/com/aliyun/odps/utils/AnalyzeDbRunner.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.aliyun.odps.utils; + +import com.aliyun.odps.entity.SqlLiteColumn; +import com.aliyun.odps.entity.SqlLiteSchema; + +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * @author dingxin (zhangdingxin.zdx@alibaba-inc.com) + */ +public class AnalyzeDbRunner { + + public static void main(String[] args) { + String url = "jdbc:sqlite:tpch.db"; + CommonUtils.initEmulator(); + try (Connection conn = DriverManager.getConnection(url)) { + if (conn != null) { + Map schemas = getDatabaseSchema(conn); + schemas.forEach((tableName, schema) -> { + try { + deleteRowIfFirstColumnEqualsColumnName(conn, tableName); + SqlRunner.executeSql("INSERT INTO schemas VALUES ('" + tableName + "', '" + schema.toJson() + + "');"); + } catch (SQLException e) { + throw new RuntimeException(e); + } + }); + } + } catch (SQLException e) { + e.printStackTrace(); + } + } + + public static Map getDatabaseSchema(Connection conn) throws SQLException { + DatabaseMetaData meta = conn.getMetaData(); + ResultSet rsTables = meta.getTables(null, null, "%", new String[] {"TABLE"}); + Map schemas = new HashMap<>(); + + while (rsTables.next()) { + String tableName = rsTables.getString("TABLE_NAME"); + ResultSet rsColumns = meta.getColumns(null, null, tableName, "%"); + SqlLiteSchema schema = new SqlLiteSchema(); + List columns = new ArrayList<>(); + + while (rsColumns.next()) { + String columnName = rsColumns.getString("COLUMN_NAME"); + String columnType = rsColumns.getString("TYPE_NAME"); + if (columnType.equals("TEXT")) { + columnType = "STRING"; + } + boolean notNull = rsColumns.getInt("NULLABLE") == DatabaseMetaData.columnNoNulls; + String defaultValue = rsColumns.getString("COLUMN_DEF"); + boolean primaryKey = isPrimaryKey(meta, tableName, columnName); 
+ + SqlLiteColumn column = + new SqlLiteColumn(columnName, columnType, notNull, defaultValue, primaryKey, false); + columns.add(column); + } + + schema.setColumns(columns); + schema.setPartitionColumns(new ArrayList<>()); + schemas.put(tableName, schema); + rsColumns.close(); + } + + rsTables.close(); + return schemas; + } + + private static boolean isPrimaryKey(DatabaseMetaData meta, String tableName, String columnName) + throws SQLException { + ResultSet rsPrimaryKeys = meta.getPrimaryKeys(null, null, tableName); + while (rsPrimaryKeys.next()) { + String pkColumnName = rsPrimaryKeys.getString("COLUMN_NAME"); + if (columnName.equals(pkColumnName)) { + rsPrimaryKeys.close(); + return true; + } + } + rsPrimaryKeys.close(); + return false; + } + + private static void deleteRowIfFirstColumnEqualsColumnName(Connection conn, String tableName) throws SQLException { + Statement stmt = conn.createStatement(); + ResultSet rs = stmt.executeQuery("PRAGMA table_info(" + tableName + ")"); + + if (rs.next()) { + String firstColumnName = rs.getString("name"); + String sql = "DELETE FROM " + tableName + " WHERE " + firstColumnName + " = '" + firstColumnName + "'"; + stmt.executeUpdate(sql); + System.out.println("Deleted rows from table " + tableName + " where " + firstColumnName + " equals " + + firstColumnName); + } + + stmt.close(); + rs.close(); + } +} diff --git a/src/main/java/com/aliyun/odps/utils/CommonUtils.java b/src/main/java/com/aliyun/odps/utils/CommonUtils.java index abde693..5fed6a3 100644 --- a/src/main/java/com/aliyun/odps/utils/CommonUtils.java +++ b/src/main/java/com/aliyun/odps/utils/CommonUtils.java @@ -40,7 +40,7 @@ public static String generateUUID() { return java.util.UUID.randomUUID().toString().replace("-", ""); } - private static final String URL = "jdbc:sqlite:/tmp/maxcompute-emulator.db"; + private static final String URL = "jdbc:sqlite:tpch-tiny.db"; public static void initEmulator() { try { diff --git 
a/src/main/java/com/aliyun/odps/utils/SqlRunner.java b/src/main/java/com/aliyun/odps/utils/SqlRunner.java index 13062b5..9b03f27 100644 --- a/src/main/java/com/aliyun/odps/utils/SqlRunner.java +++ b/src/main/java/com/aliyun/odps/utils/SqlRunner.java @@ -32,7 +32,6 @@ import java.sql.SQLException; import java.sql.Statement; import java.util.Arrays; -import java.util.Collections; import java.util.List; import java.util.Set; import java.util.stream.Collectors; @@ -213,16 +212,23 @@ public static String handleRenameColumn(String mcSql) throws SQLException { public static SqlLiteSchema getSchema(String tableName) throws SQLException { try (Statement stmt = CommonUtils.getConnection().createStatement()) { - ResultSet rs = stmt.executeQuery("SELECT schema FROM schemas WHERE table_name = '" + tableName.toUpperCase() + "';"); + ResultSet rs = stmt.executeQuery( + "SELECT schema FROM schemas WHERE table_name = '" + tableName.toUpperCase() + "';"); if (rs.next()) { String schema = rs.getString("schema"); return SqlLiteSchema.fromJson(schema); + } else { + rs = stmt.executeQuery("SELECT schema FROM schemas WHERE table_name = '" + tableName + "';"); + if (rs.next()) { + String schema = rs.getString("schema"); + return SqlLiteSchema.fromJson(schema); + } } } throw new SQLException("Table schema " + tableName + " not found"); } - private static void updateSchema(String tableName, SqlLiteSchema schema) throws SQLException { + static void updateSchema(String tableName, SqlLiteSchema schema) throws SQLException { try (Statement stmt = CommonUtils.getConnection().createStatement()) { stmt.executeUpdate( "UPDATE schemas SET schema = '" + schema.toJson() + "' WHERE table_name = '" + tableName + diff --git a/src/main/java/com/aliyun/odps/utils/TypeConvertUtils.java b/src/main/java/com/aliyun/odps/utils/TypeConvertUtils.java index b7dae60..03d902f 100644 --- a/src/main/java/com/aliyun/odps/utils/TypeConvertUtils.java +++ b/src/main/java/com/aliyun/odps/utils/TypeConvertUtils.java @@ 
-21,6 +21,11 @@ import com.aliyun.odps.type.TypeInfo; import com.aliyun.odps.type.TypeInfoFactory; +import java.sql.Types; +import java.time.LocalDate; +import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeParseException; + /** * @author dingxin (zhangdingxin.zdx@alibaba-inc.com) */ @@ -57,7 +62,7 @@ public static TypeInfo convertToMaxComputeType(String typeName) { if ("DOUBLE".equals(typeName) || "DOUBLE PRECISION".equals(typeName)) { return TypeInfoFactory.DOUBLE; } - if ("FLOAT".equals(typeName)) { + if ("FLOAT".equals(typeName) || "REAL".equals(typeName)) { return TypeInfoFactory.FLOAT; } if ("CHARACTER".equals(typeName) || "NCHAR".equals(typeName) || "NATIVE CHARACTER".equals(typeName) || @@ -87,12 +92,27 @@ public static Object convertToMaxComputeValue(int columnType, Object object) { return null; } return switch (columnType) { - case java.sql.Types.TINYINT -> ((Number) object).byteValue(); - case java.sql.Types.SMALLINT -> ((Number) object).shortValue(); - case java.sql.Types.INTEGER -> ((Number) object).intValue(); - case java.sql.Types.BIGINT -> ((Number) object).longValue(); + case Types.TINYINT -> ((Number) object).byteValue(); + case Types.SMALLINT -> ((Number) object).shortValue(); + case Types.INTEGER -> ((Number) object).intValue(); + case Types.BIGINT -> ((Number) object).longValue(); + case Types.FLOAT, Types.REAL -> ((Number) object).floatValue(); + case Types.DOUBLE -> ((Number) object).doubleValue(); + case Types.DATE -> convertStringToLocalDate((String) object); default -> object; }; } + + private static DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd"); + private static LocalDate convertStringToLocalDate(String dateString) { + if (dateString == null) { + return null; + } + try { + return LocalDate.parse(dateString, DATE_FORMATTER); + } catch (DateTimeParseException e) { + throw new IllegalArgumentException("Invalid date format, expected yyyy-MM-dd", e); + } + } } diff --git 
a/src/main/resources/tpch.db b/src/main/resources/tpch.db new file mode 100644 index 0000000..ca26529 Binary files /dev/null and b/src/main/resources/tpch.db differ