diff --git a/cmake/cmake.version b/cmake/cmake.version index e9cf31e75bbc..0e35fa316f7a 100644 --- a/cmake/cmake.version +++ b/cmake/cmake.version @@ -2,7 +2,7 @@ IF (DEFINED VERNUMBER) SET(TD_VER_NUMBER ${VERNUMBER}) ELSE () - SET(TD_VER_NUMBER "3.2.2.0.alpha") + SET(TD_VER_NUMBER "3.2.3.0.alpha") ENDIF () IF (DEFINED VERCOMPATIBLE) diff --git a/cmake/curl_CMakeLists.txt.in b/cmake/curl_CMakeLists.txt.in index af7f005dda4d..197d978fb7ef 100644 --- a/cmake/curl_CMakeLists.txt.in +++ b/cmake/curl_CMakeLists.txt.in @@ -1,6 +1,7 @@ # curl ExternalProject_Add(curl2 - URL https://curl.se/download/curl-8.2.1.tar.gz + URL https://github.com/curl/curl/releases/download/curl-8_2_1/curl-8.2.1.tar.gz + #URL https://curl.se/download/curl-8.2.1.tar.gz URL_HASH MD5=b25588a43556068be05e1624e0e74d41 DOWNLOAD_NO_PROGRESS 1 DOWNLOAD_DIR "${TD_CONTRIB_DIR}/deps-download" diff --git a/docs/en/08-client-libraries/50-odbc.mdx b/docs/en/08-client-libraries/50-odbc.mdx index 08b2c031c629..45701e279181 100644 --- a/docs/en/08-client-libraries/50-odbc.mdx +++ b/docs/en/08-client-libraries/50-odbc.mdx @@ -57,9 +57,9 @@ TDengine ODBC driver supports two kinds of connections to TDengine cluster, nati 4.6 [Password]: optional field, only used for connection testing in step 5; -5. Click "Test Connecting" to test whether the data source can be connectted; if successful, it will prompt "connecting success" +5. Click "Test Connection" to test whether the data source can be connectted; if successful, it will prompt "Successfully connected to URL" -6. Click "OK" to sae the configuration and exit. +6. Click "OK" to set the configuration and exit. 7. You can also select an already configured data source name in step 2 to change existing configuration. diff --git a/docs/en/08-client-libraries/assets/odbc-ws-config-en.webp b/docs/en/08-client-libraries/assets/odbc-ws-config-en.webp index aaca2e99b4f2..e47f3bf97ffe 100644 Binary files a/docs/en/08-client-libraries/assets/odbc-ws-config-en.webp and b/docs/en/08-client-libraries/assets/odbc-ws-config-en.webp differ diff --git a/docs/en/10-deployment/01-deploy.md b/docs/en/10-deployment/01-deploy.md index 6e7e0d1b7c3a..c6f0f5a3a317 100644 --- a/docs/en/10-deployment/01-deploy.md +++ b/docs/en/10-deployment/01-deploy.md @@ -173,12 +173,6 @@ Query OK, 8 row(s) in set (0.001154s) Before running the TDengine CLI, ensure that the taosd process has been stopped on the dnode that you want to delete. -```sql -DROP DNODE "fqdn:port"; -``` - -or - ```sql DROP DNODE dnodeId; ``` diff --git a/docs/en/12-taos-sql/06-select.md b/docs/en/12-taos-sql/06-select.md index 2606b183b80b..bc593b898504 100755 --- a/docs/en/12-taos-sql/06-select.md +++ b/docs/en/12-taos-sql/06-select.md @@ -9,7 +9,7 @@ description: This document describes how to query data in TDengine. ```sql SELECT {DATABASE() | CLIENT_VERSION() | SERVER_VERSION() | SERVER_STATUS() | NOW() | TODAY() | TIMEZONE() | CURRENT_USER() | USER() } -SELECT [hints] [DISTINCT] select_list +SELECT [hints] [DISTINCT] [TAGS] select_list from_clause [WHERE condition] [partition_by_clause] @@ -139,6 +139,11 @@ You can query tag columns in supertables and subtables and receive results in th SELECT location, groupid, current FROM d1001 LIMIT 2; ``` +### Alias Name + +The naming rules for aliases are the same as those for columns, and it supports directly specifying Chinese aliases in UTF-8 encoding format. + + ### Distinct Values The DISTINCT keyword returns only values that are different over one or more columns. You can use the DISTINCT keyword with tag columns and data columns. @@ -182,7 +187,7 @@ The TBNAME pseudocolumn in a supertable contains the names of subtables within t The following SQL statement returns all unique subtable names and locations within the meters supertable: ```mysql -SELECT DISTINCT TBNAME, location FROM meters; +SELECT TAGS TBNAME, location FROM meters; ``` Use the `INS_TAGS` system table in `INFORMATION_SCHEMA` to query the information for subtables in a supertable. For example, the following statement returns the name and tag values for each subtable in the `meters` supertable. @@ -227,6 +232,14 @@ The \_IROWTS pseudocolumn can only be used with INTERP function. This pseudocolu select _irowts, interp(current) from meters range('2020-01-01 10:00:00', '2020-01-01 10:30:00') every(1s) fill(linear); ``` +### TAGS Query + +The TAGS keyword returns only tag columns from all child tables when only tag columns are specified. One row containing tag columns is returned for each child table. + +```sql +SELECT TAGS tag_name [, tag_name ...] FROM stb_name +``` + ## Query Objects `FROM` can be followed by a number of tables or super tables, or can be followed by a sub-query. @@ -434,7 +447,8 @@ SELECT ... FROM (SELECT ... FROM ...) ...; :::info -- The result of a nested query is returned as a virtual table used by the outer query. It's recommended to give an alias to this table for the convenience of using it in the outer query. +- The result of a nested query is returned as a virtual table used by the outer query. It's recommended to give an alias to this table for the convenience of using it in the outer query. +- Outer queries support directly referencing columns or pseudo-columns of inner queries in the form of column names or `column names`. - JOIN operation is allowed between tables/STables inside both inner and outer queries. Join operation can be performed on the result set of the inner query. - The features that can be used in the inner query are the same as those that can be used in a non-nested query. - `ORDER BY` inside the inner query is unnecessary and will slow down the query performance significantly. It is best to avoid the use of `ORDER BY` inside the inner query. diff --git a/docs/en/12-taos-sql/21-node.md b/docs/en/12-taos-sql/21-node.md index 8a5069e66f51..2ebccb76f711 100644 --- a/docs/en/12-taos-sql/21-node.md +++ b/docs/en/12-taos-sql/21-node.md @@ -27,10 +27,10 @@ The preceding SQL command shows all dnodes in the cluster with the ID, endpoint, ## Delete a DNODE ```sql -DROP DNODE {dnode_id | dnode_endpoint} +DROP DNODE dnode_id ``` -You can delete a dnode by its ID or by its endpoint. Note that deleting a dnode does not stop its process. You must stop the process after the dnode is deleted. +Note that deleting a dnode does not stop its process. You must stop the process after the dnode is deleted. ## Modify Dnode Configuration diff --git a/docs/en/12-taos-sql/26-udf.md b/docs/en/12-taos-sql/26-udf.md index f86b53592709..4c0f9feded4c 100644 --- a/docs/en/12-taos-sql/26-udf.md +++ b/docs/en/12-taos-sql/26-udf.md @@ -53,7 +53,7 @@ CREATE AGGREGATE FUNCTION function_name AS library_path OUTPUTTYPE output_type [ CREATE AGGREGATE FUNCTION l2norm AS "/home/taos/udf_example/libl2norm.so" OUTPUTTYPE DOUBLE bufsize 64; ``` -For more information about user-defined functions, see [User-Defined Functions](../../develop/udf). +For more information about user-defined functions, see [User-Defined Functions](https://docs.tdengine.com/develop/udf/). ## Manage UDF diff --git a/docs/en/28-releases/01-tdengine.md b/docs/en/28-releases/01-tdengine.md index f846ede52f43..f5a478997670 100644 --- a/docs/en/28-releases/01-tdengine.md +++ b/docs/en/28-releases/01-tdengine.md @@ -10,6 +10,10 @@ For TDengine 2.x installation packages by version, please visit [here](https://t import Release from "/components/ReleaseV3"; +## 3.2.2.0 + + + ## 3.2.1.0 diff --git a/docs/zh/08-connector/14-java.mdx b/docs/zh/08-connector/14-java.mdx index e2a0a85b9aea..0136a50cb0df 100644 --- a/docs/zh/08-connector/14-java.mdx +++ b/docs/zh/08-connector/14-java.mdx @@ -16,6 +16,7 @@ import TabItem from '@theme/TabItem'; - JDBC 原生连接:Java 应用在物理节点 1(pnode1)上使用 TSDBDriver 直接调用客户端驱动(libtaos.so 或 taos.dll)的 API 将写入和查询请求发送到位于物理节点 2(pnode2)上的 taosd 实例。 - JDBC REST 连接:Java 应用通过 RestfulDriver 将 SQL 封装成一个 REST 请求,发送给物理节点 2 的 REST 服务器(taosAdapter),通过 REST 服务器请求 taosd 并返回结果。 +- JDBC WebSocket 连接 : 是JDBC REST 连接的一种扩展形式.JDBC Rest使用http协议,JDBC WebSocket使用WebSocket协议.使用JDBC WebSocket可以进行一些JDBC Rest不支持的操作 使用 REST 连接,不依赖 TDengine 客户端驱动,可以跨平台,更加方便灵活。 @@ -1418,6 +1419,123 @@ public static void main(String[] args) throws Exception { > 更多 druid 使用问题请查看[官方说明](https://github.com/alibaba/druid)。 +### 使用连接池整合tio-boot框架 +#### tio-boot讲解 +tio-boot是一个高性能的网络框架,本章节是使用连接池和tio-boot框架整合来管理数据库连接的示例 + +#### TDUtils 存储 DataSource +```java +package com.litongjava.tio.web.hello.config.utils; + +import javax.sql.DataSource; + +public class TDUtils { + + public static DataSource ds; + + public static void setDataSource(DataSource ds) { + TDUtils.ds = ds; + } +} + +``` +- `TDUtils` 类用于全局存储和访问 `DataSource` 实例。 +- 通过 `setDataSource` 方法,可以设置一个 `DataSource` 实例,这通常是在应用启动时完成的。 + +#### 创建配置类 +```java +package com.litongjava.tio.web.hello.config; + +import com.litongjava.jfinal.aop.annotation.ABean; +import com.litongjava.jfinal.aop.annotation.AConfiguration; +import com.litongjava.tio.web.hello.config.utils.TDUtils; +import com.zaxxer.hikari.HikariConfig; +import com.zaxxer.hikari.HikariDataSource; + +import javax.sql.DataSource; + +@AConfiguration +public class TdEngineDataSourceConfig { + + @ABean(destroyMethod = "close", priority = 10) + public DataSource hikariDataSource() { + HikariConfig config = new HikariConfig(); + // jdbc properties + String host = "192.168.3.9"; + int port = 6041; + String user = "root"; + String pswd = "taosdata"; + String dbName = "test_ws_parabind"; + String driverClassName = "com.taosdata.jdbc.rs.RestfulDriver"; + // String driverClassName = "com.taosdata.jdbc.TSDBDriver"; + + String jdbcUrl = getJdbcUrl(host, port, user, pswd, dbName); + config.setJdbcUrl(jdbcUrl); + config.setDriverClassName(driverClassName); + // connection pool configurations + config.setMinimumIdle(10); // minimum number of idle connection + config.setMaximumPoolSize(10); // maximum number of connection in the pool + config.setConnectionTimeout(30000); // maximum wait milliseconds for get connection from pool + config.setMaxLifetime(0); // maximum life time for each connection + config.setIdleTimeout(0); // max idle time for recycle idle connection + config.setConnectionTestQuery("select server_status()"); // validation query + + HikariDataSource ds = new HikariDataSource(config); // create datasource + TDUtils.setDataSource(ds); + return ds; + } + + private String getJdbcUrl(String host, int port, String user, String pswd, String dbName) { + // 添加batchfetch=true属性后得到的Websocket连接 + return "jdbc:TAOS-RS://" + host + ":" + port + "/" + dbName + "?user=" + user + "&password=" + pswd + + "&batchfetch=true"; + } +} +``` +- 这是一个配置类,用于创建和配置 `HikariDataSource`。 +- 使用了 `HikariConfig` 来设置数据库连接的详细信息,如主机名、端口、用户、密码等。 +- 配置了连接池的参数,如最小空闲连接数、最大连接数、连接超时时间等。 +- `hikariDataSource` 方法创建了 `HikariDataSource` 的实例,并使用 `TDUtils.setDataSource` 方法存储它。 + +#### 测试获取连接 +```java +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.sql.Timestamp; +import java.util.List; +import java.util.Random; + +import com.litongjava.jfinal.plugin.activerecord.Record; +import com.litongjava.tio.http.server.annotation.RequestPath; +import com.litongjava.tio.web.hello.config.utils.TDUtils; +import com.taosdata.jdbc.ws.TSWSPreparedStatement; + +import lombok.Cleanup; + +@RequestPath("/tdeingien/test") +public class TbEngineTestController { + + public String connection() throws SQLException { + @Cleanup + Connection connection = TDUtils.ds.getConnection(); + String string = connection.toString(); + return string; + } +} +``` +- `TbEngineTestController` 类用于测试数据库连接。 +- `connection` 方法从 `TDUtils` 获取一个 `DataSource` 实例,然后从中获取一个连接,并将连接对象的字符串表示返回。 +- `@Cleanup` 用于在操作完成后自动关闭 Connection + +#### 完整的代码示例 +[示例1](https://github.com/litongjava/java-ee-tio-boot-study/tree/main/tio-boot-latest-study/tio-boot-jfinal-plugins-tdengine-study) +[示例2](https://github.com/litongjava/java-ee-tio-boot-study/tree/main/tio-boot-latest-study/tio-boot-mybatis-plus-tdengine-study) + + + + ### 更多示例程序 示例程序源码位于 `TDengine/examples/JDBC` 下: diff --git a/docs/zh/08-connector/50-odbc.mdx b/docs/zh/08-connector/50-odbc.mdx index 0668aed7df8a..271c003aff7b 100644 --- a/docs/zh/08-connector/50-odbc.mdx +++ b/docs/zh/08-connector/50-odbc.mdx @@ -48,17 +48,17 @@ TDengine ODBC 支持两种连接 TDengine 数据库方式:Websocket 连接与 4.1 【DSN】:Data Source Name 必填,为新添加的 ODBC 数据源命名 - 4.2【Connection Type】 : 必选,选择连接类型,这里选择 【Websocket】 + 4.2【连接类型】 : 必选,选择连接类型,这里选择 【Websocket】 - 4.3【URL】必填,ODBC 数据源 URL,example: `http://localhost:6041`, 云服务的 url 示例: `https://gw.cloud.taosdata.com?token=your_token` + 4.3【URL】必填,ODBC 数据源 URL,示例: `http://localhost:6041`, 云服务的 url 示例: `https://gw.cloud.taosdata.com?token=your_token` - 4.4【Database】选填,需要连接的默认数据库 + 4.4【数据库】选填,需要连接的默认数据库 - 4.5【User】仅供第5步测试连接使用,选填,数据库用户名,如果不填,TDengine 默认 root + 4.5【用户名】仅供第5步测试连接使用,选填,数据库用户名,如果不填,TDengine 默认 root - 4.6【Password】仅供第5步测试连接使用,选填,数据库用户密码,如果不填,TDengine 默认 taosdata + 4.6【密码】仅供第5步测试连接使用,选填,数据库用户密码,如果不填,TDengine 默认 taosdata -5. 点【Test Connecting...】测试连接情况,如果成功,提示"connecting success" +5. 点【测试连接】测试连接情况,如果成功,提示"成功连接到URL" 6. 点【确定】,即可保存配置并退出 @@ -78,17 +78,17 @@ TDengine ODBC 支持两种连接 TDengine 数据库方式:Websocket 连接与 4.1 【DSN】:Data Source Name 必填,为新添加的 ODBC 数据源命名 - 4.2 【Connection Type】 : 必选,选择连接类型,这里选择 【Native】 原生连接; + 4.2 【连接类型】 : 必选,选择连接类型,这里选择 【Native】 原生连接; - 4.3 【Server】必填,ODBC 数据源 Server 地址,example: `localhost:6030` + 4.3 【服务器】必填,ODBC 数据源 服务器 地址,示例: `localhost:6030` - 4.4 【Database】选填,需要连接的默认数据库 + 4.4 【数据库】选填,需要连接的默认数据库 - 4.5 【User】仅供第5步测试连接使用,选填,数据库用户名,如果不填,TDengine 默认 root + 4.5 【用户名】仅供第5步测试连接使用,选填,数据库用户名,如果不填,TDengine 默认 root - 4.6 【Password】仅供第5步测试连接使用,选填,数据库用户密码,如果不填,TDengine 默认 taosdata + 4.6 【密码】仅供第5步测试连接使用,选填,数据库用户密码,如果不填,TDengine 默认 taosdata -5. 点【Test Connecting...】测试连接情况,如果成功,提示"connecting success" +5. 点【测试连接】测试连接情况,如果成功,提示"连接成功" 6. 点【确定】,即可保存配置并退出 diff --git a/docs/zh/08-connector/assets/odbc-native-config-zh.webp b/docs/zh/08-connector/assets/odbc-native-config-zh.webp index ed9005f2a195..5589bc6cf76a 100644 Binary files a/docs/zh/08-connector/assets/odbc-native-config-zh.webp and b/docs/zh/08-connector/assets/odbc-native-config-zh.webp differ diff --git a/docs/zh/08-connector/assets/odbc-ws-config-zh.webp b/docs/zh/08-connector/assets/odbc-ws-config-zh.webp index c8a6e11011da..6a9cece9d90f 100644 Binary files a/docs/zh/08-connector/assets/odbc-ws-config-zh.webp and b/docs/zh/08-connector/assets/odbc-ws-config-zh.webp differ diff --git a/docs/zh/10-deployment/01-deploy.md b/docs/zh/10-deployment/01-deploy.md index 0ffbb8467b56..6eacd7b7e007 100644 --- a/docs/zh/10-deployment/01-deploy.md +++ b/docs/zh/10-deployment/01-deploy.md @@ -171,17 +171,11 @@ Query OK, 8 row(s) in set (0.001154s) 启动 CLI 程序 taos,执行: -```sql -DROP DNODE "fqdn:port"; -``` - -或者 - ```sql DROP DNODE dnodeId; ``` -通过 “fqdn:port” 或 dnodeID 来指定一个具体的节点都是可以的。其中 fqdn 是被删除的节点的 FQDN,port 是其对外服务器的端口号;dnodeID 可以通过 SHOW DNODES 获得。 +dnodeId 可以通过 SHOW DNODES 获得。 :::warning diff --git a/docs/zh/12-taos-sql/06-select.md b/docs/zh/12-taos-sql/06-select.md index 23ae0256109d..7010ab8758db 100755 --- a/docs/zh/12-taos-sql/06-select.md +++ b/docs/zh/12-taos-sql/06-select.md @@ -9,7 +9,7 @@ description: 查询数据的详细语法 ```sql SELECT {DATABASE() | CLIENT_VERSION() | SERVER_VERSION() | SERVER_STATUS() | NOW() | TODAY() | TIMEZONE() | CURRENT_USER() | USER() } -SELECT [hints] [DISTINCT] select_list +SELECT [hints] [DISTINCT] [TAGS] select_list from_clause [WHERE condition] [partition_by_clause] @@ -139,6 +139,9 @@ SELECT d1001.* FROM d1001,d1003 WHERE d1001.ts = d1003.ts; SELECT location, groupid, current FROM d1001 LIMIT 2; ``` +### 别名 +别名的命名规则与列相同,支持直接指定 UTF-8 编码格式的中文别名。 + ### 结果去重 `DISTINCT` 关键字可以对结果集中的一列或多列进行去重,去除的列既可以是标签列也可以是数据列。 @@ -162,6 +165,16 @@ SELECT DISTINCT col_name [, col_name ...] FROM tb_name; ::: +### 标签查询 + +当查询的列只有标签列时,`TAGS` 关键字可以指定返回所有子表的标签列。每个子表只返回一行标签列。 + +返回所有子表的标签列: + +```sql +SELECT TAGS tag_name [, tag_name ...] FROM stb_name +``` + ### 结果集列名 `SELECT`子句中,如果不指定返回结果集合的列名,结果集列名称默认使用`SELECT`子句中的表达式名称作为列名称。此外,用户可使用`AS`来重命名返回结果集合中列的名称。例如: @@ -182,7 +195,7 @@ taos> SELECT ts, ts AS primary_key_ts FROM d1001; 获取一个超级表所有的子表名及相关的标签信息: ```mysql -SELECT DISTINCT TBNAME, location FROM meters; +SELECT TAGS TBNAME, location FROM meters; ``` 建议用户使用 INFORMATION_SCHEMA 下的 INS_TAGS 系统表来查询超级表的子表标签信息,例如获取超级表 meters 所有的子表名和标签值: @@ -434,7 +447,8 @@ SELECT ... FROM (SELECT ... FROM ...) ...; :::info -- 内层查询的返回结果将作为“虚拟表”供外层查询使用,此虚拟表建议起别名,以便于外层查询中方便引用。 +- 内层查询的返回结果将作为“虚拟表”供外层查询使用,此虚拟表建议起别名,以便于外层查询中方便引用。 +- 外层查询支持直接通过列名或`列名`的形式引用内层查询的列或伪列。 - 在内层和外层查询中,都支持普通的表间/超级表间 JOIN。内层查询的计算结果也可以再参与数据子表的 JOIN 操作。 - 内层查询支持的功能特性与非嵌套的查询语句能力是一致的。 - 内层查询的 ORDER BY 子句一般没有意义,建议避免这样的写法以免无谓的资源消耗。 diff --git a/docs/zh/12-taos-sql/21-node.md b/docs/zh/12-taos-sql/21-node.md index 778d699099b3..1f574118386a 100644 --- a/docs/zh/12-taos-sql/21-node.md +++ b/docs/zh/12-taos-sql/21-node.md @@ -27,10 +27,10 @@ SHOW DNODES; ## 删除数据节点 ```sql -DROP DNODE {dnode_id | dnode_endpoint} +DROP DNODE dnode_id ``` -可以用 dnoe_id 或 endpoint 两种方式从集群中删除一个 dnode。注意删除 dnode 不等于停止相应的进程。实际中推荐先将一个 dnode 删除之后再停止其所对应的进程。 +注意删除 dnode 不等于停止相应的进程。实际中推荐先将一个 dnode 删除之后再停止其所对应的进程。 ## 修改数据节点配置 diff --git a/docs/zh/28-releases/01-tdengine.md b/docs/zh/28-releases/01-tdengine.md index cfc4b92eeed4..b0a81e01a13e 100644 --- a/docs/zh/28-releases/01-tdengine.md +++ b/docs/zh/28-releases/01-tdengine.md @@ -10,6 +10,10 @@ TDengine 2.x 各版本安装包请访问[这里](https://www.taosdata.com/all-do import Release from "/components/ReleaseV3"; +## 3.2.2.0 + + + ## 3.2.1.0 diff --git a/packaging/MPtestJenkinsfile b/packaging/MPtestJenkinsfile index 1b2e555b88c4..9744e26192a4 100644 --- a/packaging/MPtestJenkinsfile +++ b/packaging/MPtestJenkinsfile @@ -113,16 +113,9 @@ pipeline { timeout(time: 30, unit: 'MINUTES'){ sync_source("${BRANCH_NAME}") sh ''' - if [ "${verMode}" = "all" ];then - verMode="enterprise" - fi - verModeList=${verMode} - for verModeSin in ${verModeList} - do - cd ${TDENGINE_ROOT_DIR}/packaging - bash testpackage.sh -m ${verModeSin} -f server -l false -c x64 -v ${version} -o ${baseVersion} -s ${sourcePath} -t tar - python3 checkPackageRuning.py - done + cd ${TDENGINE_ROOT_DIR}/packaging + bash testpackage.sh -m enterprise -f server -l false -c x64 -v ${version} -o ${baseVersion} -s ${sourcePath} -t tar + python3 checkPackageRuning.py ''' } } @@ -139,24 +132,21 @@ pipeline { verModeList=${verMode} for verModeSin in ${verModeList} do + if [ "${verModeSin}" == "community" ];then + cd ${TDENGINE_ROOT_DIR}/packaging + bash testpackage.sh -m community -f server -l true -c x64 -v ${version} -o ${baseVersion} -s ${sourcePath} -t tar + python3 checkPackageRuning.py + + cd ${TDENGINE_ROOT_DIR}/packaging + bash testpackage.sh -m community -f server -l false -c x64 -v ${version} -o ${baseVersion} -s ${sourcePath} -t deb + python3 checkPackageRuning.py + fi + cd ${TDENGINE_ROOT_DIR}/packaging bash testpackage.sh -m ${verModeSin} -f server -l false -c x64 -v ${version} -o ${baseVersion} -s ${sourcePath} -t tar python3 checkPackageRuning.py done - ''' - - sh ''' - cd ${TDENGINE_ROOT_DIR}/packaging - bash testpackage.sh -m community -f server -l true -c x64 -v ${version} -o ${baseVersion} -s ${sourcePath} -t tar - python3 checkPackageRuning.py - ''' - - sh ''' - cd ${TDENGINE_ROOT_DIR}/packaging - bash testpackage.sh -m community -f server -l false -c x64 -v ${version} -o ${baseVersion} -s ${sourcePath} -t deb - python3 checkPackageRuning.py - dpkg -r tdengine - ''' + ''' } } } @@ -167,28 +157,26 @@ pipeline { sync_source("${BRANCH_NAME}") sh ''' if [ "${verMode}" = "all" ];then - verMode="community enterprise" - fi + verMode="community enterprise" + fi verModeList=${verMode} for verModeSin in ${verModeList} do + if [ "${verModeSin}" == "community" ];then + cd ${TDENGINE_ROOT_DIR}/packaging + bash testpackage.sh -m community -f server -l true -c x64 -v ${version} -o ${baseVersion} -s ${sourcePath} -t tar + python3 checkPackageRuning.py + + cd ${TDENGINE_ROOT_DIR}/packaging + bash testpackage.sh -m community -f server -l false -c x64 -v ${version} -o ${baseVersion} -s ${sourcePath} -t rpm + python3 checkPackageRuning.py + fi + cd ${TDENGINE_ROOT_DIR}/packaging bash testpackage.sh -m ${verModeSin} -f server -l false -c x64 -v ${version} -o ${baseVersion} -s ${sourcePath} -t tar python3 checkPackageRuning.py done ''' - - sh ''' - cd ${TDENGINE_ROOT_DIR}/packaging - bash testpackage.sh -m community -f server -l true -c x64 -v ${version} -o ${baseVersion} -s ${sourcePath} -t tar - python3 checkPackageRuning.py - ''' - - sh ''' - cd ${TDENGINE_ROOT_DIR}/packaging - bash testpackage.sh -m community -f server -l false -c x64 -v ${version} -o ${baseVersion} -s ${sourcePath} -t rpm - python3 checkPackageRuning.py - ''' } } } @@ -200,33 +188,32 @@ pipeline { sh ''' if [ "${verMode}" = "all" ];then verMode="community enterprise" - fi + fi + verModeList=${verMode} for verModeSin in ${verModeList} do + if [ "${verModeSin}" == "community" ]; then + cd ${TDENGINE_ROOT_DIR}/packaging + bash testpackage.sh -m community -f server -l true -c x64 -v ${version} -o ${baseVersion} -s ${sourcePath} -t tar + python3 checkPackageRuning.py + + cd ${TDENGINE_ROOT_DIR}/packaging + bash testpackage.sh -m community -f server -l false -c x64 -v ${version} -o ${baseVersion} -s ${sourcePath} -t rpm + python3 checkPackageRuning.py + sudo rpm -e tdengine + fi + cd ${TDENGINE_ROOT_DIR}/packaging bash testpackage.sh -m ${verModeSin} -f server -l false -c x64 -v ${version} -o ${baseVersion} -s ${sourcePath} -t tar python3 checkPackageRuning.py done ''' - - sh ''' - cd ${TDENGINE_ROOT_DIR}/packaging - bash testpackage.sh -m community -f server -l true -c x64 -v ${version} -o ${baseVersion} -s ${sourcePath} -t tar - python3 checkPackageRuning.py - ''' - - sh ''' - cd ${TDENGINE_ROOT_DIR}/packaging - bash testpackage.sh -m community -f server -l false -c x64 -v ${version} -o ${baseVersion} -s ${sourcePath} -t rpm - python3 checkPackageRuning.py - sudo rpm -e tdengine - ''' } } } stage('arm64') { - agent{label 'linux_arm64'} + agent{label ' arm64-1.204 '} steps { timeout(time: 30, unit: 'MINUTES'){ sync_source("${BRANCH_NAME}") @@ -283,7 +270,7 @@ pipeline { } } stage('arm64-client') { - agent{label " linux_arm64 "} + agent{label " arm64-1.204 "} steps { timeout(time: 30, unit: 'MINUTES'){ sh ''' diff --git a/packaging/deb/makedeb.sh b/packaging/deb/makedeb.sh index c029b1871ab9..7ed3f0cc241b 100755 --- a/packaging/deb/makedeb.sh +++ b/packaging/deb/makedeb.sh @@ -57,9 +57,9 @@ else arch=$cpuType fi -echo "${top_dir}/../enterprise/packaging/build_taoskeeper.sh -r ${arch} -e taoskeeper" +echo "${top_dir}/../enterprise/packaging/build_taoskeeper.sh -r ${arch} -e taoskeeper -t ver-${tdengine_ver}" echo "$top_dir=${top_dir}" -taoskeeper_binary=`${top_dir}/../enterprise/packaging/build_taoskeeper.sh -r $arch -e taoskeeper` +taoskeeper_binary=`${top_dir}/../enterprise/packaging/build_taoskeeper.sh -r $arch -e taoskeeper -t ver-${tdengine_ver}` echo "taoskeeper_binary: ${taoskeeper_binary}" # copy config files diff --git a/packaging/rpm/makerpm.sh b/packaging/rpm/makerpm.sh index 9cf00364aa24..f895193b6baf 100755 --- a/packaging/rpm/makerpm.sh +++ b/packaging/rpm/makerpm.sh @@ -70,8 +70,8 @@ else fi cd ${top_dir} -echo "${top_dir}/../enterprise/packaging/build_taoskeeper.sh -r ${arch} -e taoskeeper" -taoskeeper_binary=`${top_dir}/../enterprise/packaging/build_taoskeeper.sh -r $arch -e taoskeeper` +echo "${top_dir}/../enterprise/packaging/build_taoskeeper.sh -r ${arch} -e taoskeeper -t ver-${tdengine_ver}" +taoskeeper_binary=`${top_dir}/../enterprise/packaging/build_taoskeeper.sh -r $arch -e taoskeeper -t ver-${tdengine_ver}` echo "taoskeeper_binary: ${taoskeeper_binary}" cd ${package_dir} diff --git a/packaging/testpackage.sh b/packaging/testpackage.sh index 0622b01f2b6c..aeb110a83c23 100755 --- a/packaging/testpackage.sh +++ b/packaging/testpackage.sh @@ -20,7 +20,7 @@ scriptDir=$(dirname $(readlink -f $0)) version="3.0.1.7" originversion="3.0.1.7" testFile="server" -verMode="communtity" +verMode="community" sourcePath="nas" cpuType="x64" lite="true" @@ -79,7 +79,11 @@ GREEN_UNDERLINE='\033[4;32m' NC='\033[0m' if [[ ${verMode} = "enterprise" ]];then - prePackage="TDengine-enterprise-${testFile}" + if [ "${testFile}" == "server" ];then + prePackage="TDengine-enterprise" + elif [ "${testFile}" == "client" ];then + prePackage="TDengine-enterprise-client" + fi elif [ ${verMode} = "community" ];then prePackage="TDengine-${testFile}" fi @@ -105,9 +109,9 @@ elif [ ${testFile} = "client" ];then elif [ ${testFile} = "tools" ];then tdPath="taosTools-${version}" originTdpPath="taosTools-${originversion}" - packageName="${tdPath}-Linux-${cpuType}${packageLite}.${packageType}" + packageName="${tdPath}-Linux-${cpuType}${packageLite}-comp3.${packageType}" originPackageName="${originTdpPath}-Linux-${cpuType}${packageLite}.${packageType}" - installCmd="install-taostools.sh" + installCmd="install-tools.sh" fi @@ -145,41 +149,35 @@ elif [ ${color} = 'BD' ];then fi } - - function wgetFile { + file=$1 + versionPath=$2 + sourceP=$3 + nasServerIP="192.168.1.213" + packagePath="/nas/TDengine/v${versionPath}/${verMode}" + if [ -f ${file} ];then + echoColor YD "${file} already exists ,it will delete it and download it again " + rm -rf ${file} + fi -file=$1 -versionPath=$2 -sourceP=$3 -nasServerIP="192.168.1.213" -packagePath="/nas/TDengine/v${versionPath}/${verMode}" -if [ -f ${file} ];then - echoColor YD "${file} already exists ,it will delete it and download it again " - rm -rf ${file} -fi - -if [[ ${sourceP} = 'web' ]];then - echoColor BD "====download====:wget https://www.taosdata.com/assets-download/3.0/${file}" - wget https://www.taosdata.com/assets-download/3.0/${file} -elif [[ ${sourceP} = 'nas' ]];then - echoColor BD "====download====:scp root@${nasServerIP}:${packagePath}/${file} ." - scp root@${nasServerIP}:${packagePath}/${file} . -fi - + if [[ ${sourceP} = 'web' ]];then + echoColor BD "====download====:wget https://www.taosdata.com/assets-download/3.0/${file}" + wget https://www.taosdata.com/assets-download/3.0/${file} + elif [[ ${sourceP} = 'nas' ]];then + echoColor BD "====download====:scp root@${nasServerIP}:${packagePath}/${file} ." + scp root@${nasServerIP}:${packagePath}/${file} . + fi } function newPath { + buildPath=$1 -buildPath=$1 - -if [ ! -d ${buildPath} ] ;then - echoColor BD "mkdir -p ${buildPath}" - mkdir -p ${buildPath} -else - echoColor YD "${buildPath} already exists" -fi - + if [ ! -d ${buildPath} ] ;then + echoColor BD "mkdir -p ${buildPath}" + mkdir -p ${buildPath} + else + echoColor YD "${buildPath} already exists" + fi } @@ -193,11 +191,26 @@ echoColor G "===== Uninstall all components of TDeingne =====" if command -v rmtaos ;then echoColor YD "uninstall all components of TDeingne:rmtaos" - rmtaos + echo "n" | rmtaos else echoColor YD "os doesn't include TDengine" fi +if [[ -e /etc/os-release ]]; then + osinfo=$(cat /etc/os-release | grep "NAME" | cut -d '"' -f2) || : +else + osinfo="" +fi + +if echo $osinfo | grep -qwi "ubuntu"; then + # echo "This is ubuntu system" + apt remove tdengine -y +elif echo $osinfo | grep -qwi "centos"; then + # echo "This is centos system" + yum remove tdengine -y +fi + + if command -v rmtaostools ;then echoColor YD "uninstall all components of TDeingne:rmtaostools" rmtaostools @@ -205,11 +218,8 @@ else echoColor YD "os doesn't include rmtaostools " fi - -if [[ ${packageName} =~ "server" ]] ;then - echoColor BD " pkill -9 taosd " - pkill -9 taosd -fi +echoColor BD " pkill -9 taosd " +pkill -9 taosd echoColor G "===== new workroom path =====" @@ -329,25 +339,25 @@ cd ${installPath} if [[ ${packageName} =~ "Lite" ]] || ([[ ${packageName} =~ "x64" ]] && [[ ${packageName} =~ "client" ]]) || ([[ ${packageName} =~ "deb" ]] && [[ ${packageName} =~ "server" ]]) || ([[ ${packageName} =~ "rpm" ]] && [[ ${packageName} =~ "server" ]]) ;then echoColor G "===== install taos-tools when package is lite or client =====" cd ${installPath} - if [ ! -f "taosTools-2.1.3-Linux-x64.tar.gz " ];then - wgetFile taosTools-2.1.3-Linux-x64.tar.gz v2.1.3 web - tar xf taosTools-2.1.3-Linux-x64.tar.gz + if [ ! -f "taosTools-2.5.4-Linux-x64.tar.gz " ];then + wgetFile taosTools-2.5.3-Linux-x64-comp3.tar.gz v2.5.3 web + tar xf taosTools-2.5.3-Linux-x64-comp3.tar.gz fi - cd taosTools-2.1.3 && bash install-taostools.sh + cd taosTools-2.5.3 && bash install-tools.sh elif ([[ ${packageName} =~ "arm64" ]] && [[ ${packageName} =~ "client" ]]);then echoColor G "===== install taos-tools arm when package is arm64-client =====" cd ${installPath} - if [ ! -f "taosTools-2.1.3-Linux-x64.tar.gz " ];then - wgetFile taosTools-2.1.3-Linux-arm64.tar.gz v2.1.3 web - tar xf taosTools-2.1.3-Linux-arm64.tar.gz + if [ ! -f "taosTools-2.5.3-Linux-x64-comp3.tar.gz " ];then + wgetFile taosTools-2.5.3-Linux-arm64-comp3.tar.gz v2.5.3 web + tar xf taosTools-2.5.3-Linux-arm64-comp3.tar.gz fi - cd taosTools-2.1.3 && bash install-taostools.sh + cd taosTools-2.5.3 && bash install-tools.sh fi echoColor G "===== start TDengine =====" -if [[ ${packageName} =~ "server" ]] ;then +if [[ ! ${packageName} =~ "client" ]] ;then echoColor BD " rm -rf /var/lib/taos/* && systemctl restart taosd " rm -rf /var/lib/taos/* systemctl restart taosd diff --git a/packaging/tools/install.sh b/packaging/tools/install.sh index ae774a328903..f9978e49d018 100755 --- a/packaging/tools/install.sh +++ b/packaging/tools/install.sh @@ -931,11 +931,14 @@ function updateProduct() { if [ -z $1 ]; then install_bin install_service - install_adapter_service - install_adapter_config - install_keeper_service - if [ "${verMode}" != "cloud" ]; then - install_keeper_config + + if [ "${pagMode}" != "lite" ]; then + install_adapter_service + install_adapter_config + if [ "${verMode}" != "cloud" ]; then + install_keeper_service + install_keeper_config + fi fi openresty_work=false @@ -1034,12 +1037,16 @@ function installProduct() { # For installing new install_bin install_service - install_adapter_service - install_adapter_config - install_keeper_service - if [ "${verMode}" != "cloud" ]; then - install_keeper_config + + if [ "${pagMode}" != "lite" ]; then + install_adapter_service + install_adapter_config + if [ "${verMode}" != "cloud" ]; then + install_keeper_service + install_keeper_config + fi fi + openresty_work=false diff --git a/packaging/tools/install_client.sh b/packaging/tools/install_client.sh index 664336333917..9ba952914624 100755 --- a/packaging/tools/install_client.sh +++ b/packaging/tools/install_client.sh @@ -116,23 +116,14 @@ function install_bin() { ${csudo}cp -r ${script_dir}/bin/* ${install_main_dir}/bin && ${csudo}chmod 0555 ${install_main_dir}/bin/* #Make link - [ -x ${install_main_dir}/bin/${clientName} ] && ${csudo}ln -s ${install_main_dir}/bin/${clientName} ${bin_link_dir}/${clientName} || : + [ -x ${install_main_dir}/bin/${clientName2} ] && ${csudo}ln -s ${install_main_dir}/bin/${clientName2} ${bin_link_dir}/${clientName2} || : if [ "$osType" != "Darwin" ]; then [ -x ${install_main_dir}/bin/${demoName2} ] && ${csudo}ln -s ${install_main_dir}/bin/${demoName2} ${bin_link_dir}/${demoName2} || : fi [ -x ${install_main_dir}/bin/remove_client.sh ] && ${csudo}ln -s ${install_main_dir}/bin/remove_client.sh ${bin_link_dir}/${uninstallScript} || : - [ -x ${install_main_dir}/bin/set_core.sh ] && ${csudo}ln -s ${install_main_dir}/bin/set_core.sh ${bin_link_dir}/set_core || : - - if [ "$verMode" == "cluster" ] && [ "$clientName" != "$clientName2" ]; then - #Make link - [ -x ${install_main_dir}/bin/${clientName2} ] && ${csudo}ln -s ${install_main_dir}/bin/${clientName2} ${bin_link_dir}/${clientName2} || : - if [ "$osType" != "Darwin" ]; then - [ -x ${install_main_dir}/bin/${demoName2} ] && ${csudo}ln -s ${install_main_dir}/bin/${demoName2} ${bin_link_dir}/${demoName2} || : - [ -x ${install_main_dir}/bin/${benchmarkName2} ] && ${csudo}ln -s ${install_main_dir}/bin/${benchmarkName2} ${bin_link_dir}/${benchmarkName2} || : - [ -x ${install_main_dir}/bin/${dumpName2} ] && ${csudo}ln -s ${install_main_dir}/bin/${dumpName2} ${bin_link_dir}/${dumpName2} || : - fi - [ -x ${install_main_dir}/bin/remove_client.sh ] && ${csudo}ln -sf ${install_main_dir}/bin/remove_client.sh ${bin_link_dir}/${uninstallScript2} || : - fi + [ -x ${install_main_dir}/bin/set_core.sh ] && ${csudo}ln -s ${install_main_dir}/bin/set_core.sh ${bin_link_dir}/set_core || : + [ -x ${install_main_dir}/bin/${benchmarkName2} ] && ${csudo}ln -s ${install_main_dir}/bin/${benchmarkName2} ${bin_link_dir}/${benchmarkName2} || : + [ -x ${install_main_dir}/bin/${dumpName2} ] && ${csudo}ln -s ${install_main_dir}/bin/${dumpName2} ${bin_link_dir}/${dumpName2} || : } function clean_lib() { diff --git a/packaging/tools/remove.sh b/packaging/tools/remove.sh index 36298a291e49..aa75451e38ee 100755 --- a/packaging/tools/remove.sh +++ b/packaging/tools/remove.sh @@ -271,11 +271,11 @@ function clean_service() { function remove_data_and_config() { data_dir=`grep dataDir /etc/taos/taos.cfg | grep -v '#' | tail -n 1 | awk {'print $2'}` if [ X"$data_dir" == X"" ]; then - data_dir="/var/lib/taos" + data_dir="/var/lib/${clientName2}" fi log_dir=`grep logDir /etc/taos/taos.cfg | grep -v '#' | tail -n 1 | awk {'print $2'}` if [ X"$log_dir" == X"" ]; then - log_dir="/var/log/taos" + log_dir="/var/log/${clientName2}" fi [ -d "${config_dir}" ] && ${csudo}rm -rf ${config_dir}/* [ -d "${data_dir}" ] && ${csudo}rm -rf ${data_dir}/* diff --git a/packaging/tools/remove_client.sh b/packaging/tools/remove_client.sh index 695307254d9b..9321adbaf662 100755 --- a/packaging/tools/remove_client.sh +++ b/packaging/tools/remove_client.sh @@ -38,9 +38,10 @@ if command -v sudo > /dev/null; then fi function kill_client() { - if [ -n "$(ps aux | grep -v grep | grep ${clientName})" ]; then - ${csudo}kill -9 $pid || : - fi + pid=$(ps -ef | grep "${clientName2}" | grep -v "grep" | grep -v "{$uninstallScript2}" | awk '{print $2}') + if [ -n "$(ps aux | grep -v grep | grep ${clientName})" ]; then + ${csudo}kill -9 $pid || : + fi } function clean_bin() { diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c index ac9439061928..7c41bdf20938 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c @@ -53,7 +53,8 @@ static void dmMayShouldUpdateIpWhiteList(SDnodeMgmt *pMgmt, int64_t ver) { .msgType = TDMT_MND_RETRIEVE_IP_WHITE, .info.ahandle = (void *)0x9527, .info.refId = 0, - .info.noResp = 0}; + .info.noResp = 0, + .info.handle = 0}; SEpSet epset = {0}; dmGetMnodeEpSet(pMgmt->pData, &epset); @@ -153,7 +154,8 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) { .msgType = TDMT_MND_STATUS, .info.ahandle = (void *)0x9527, .info.refId = 0, - .info.noResp = 0}; + .info.noResp = 0, + .info.handle = 0}; SRpcMsg rpcRsp = {0}; dTrace("send status req to mnode, dnodeVer:%" PRId64 " statusSeq:%d", req.dnodeVer, req.statusSeq); @@ -193,7 +195,8 @@ void dmSendNotifyReq(SDnodeMgmt *pMgmt) { .msgType = TDMT_MND_NOTIFY, .info.ahandle = (void *)0x9527, .info.refId = 0, - .info.noResp = 1}; + .info.noResp = 1, + .info.handle = 0}; SEpSet epSet = {0}; dmGetMnodeEpSet(pMgmt->pData, &epSet); diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c index af43804db401..632d9b63edbe 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c @@ -22,10 +22,8 @@ static void *dmStatusThreadFp(void *param) { int64_t lastTime = taosGetTimestampMs(); setThreadName("dnode-status"); - const static int16_t TRIM_FREQ = 30; - int32_t trimCount = 0; - int32_t upTimeCount = 0; - int64_t upTime = 0; + int32_t upTimeCount = 0; + int64_t upTime = 0; while (1) { taosMsleep(200); @@ -38,11 +36,6 @@ static void *dmStatusThreadFp(void *param) { dmSendStatusReq(pMgmt); lastTime = curTime; - trimCount = (trimCount + 1) % TRIM_FREQ; - if (trimCount == 0) { - taosMemoryTrim(0); - } - if ((upTimeCount = ((upTimeCount + 1) & 63)) == 0) { upTime = taosGetOsUptime() - tsDndStartOsUptime; tsDndUpTime = TMAX(tsDndUpTime, upTime); @@ -54,7 +47,7 @@ static void *dmStatusThreadFp(void *param) { } SDmNotifyHandle dmNotifyHdl = {.state = 0}; -static void *dmNotifyThreadFp(void *param) { +static void * dmNotifyThreadFp(void *param) { SDnodeMgmt *pMgmt = param; setThreadName("dnode-notify"); @@ -83,6 +76,9 @@ static void *dmMonitorThreadFp(void *param) { int64_t lastTime = taosGetTimestampMs(); setThreadName("dnode-monitor"); + const static int16_t TRIM_FREQ = 3600; + int16_t trimCount = 0; + while (1) { taosMsleep(200); if (pMgmt->pData->dropped || pMgmt->pData->stopped) break; @@ -93,6 +89,13 @@ static void *dmMonitorThreadFp(void *param) { if (interval >= tsMonitorInterval) { (*pMgmt->sendMonitorReportFp)(); lastTime = curTime; + + // cron time = TRIM_FREQ * tsMonitorInterval, + // opt later + trimCount = (trimCount + 1) % TRIM_FREQ; + if (trimCount == 0) { + taosMemoryTrim(0); + } } } @@ -126,14 +129,15 @@ static void *dmCrashReportThreadFp(void *param) { setThreadName("dnode-crashReport"); char filepath[PATH_MAX] = {0}; snprintf(filepath, sizeof(filepath), "%s%s.taosdCrashLog", tsLogDir, TD_DIRSEP); - char *pMsg = NULL; - int64_t msgLen = 0; + char * pMsg = NULL; + int64_t msgLen = 0; TdFilePtr pFile = NULL; - bool truncateFile = false; - int32_t sleepTime = 200; - int32_t reportPeriodNum = 3600 * 1000 / sleepTime;; + bool truncateFile = false; + int32_t sleepTime = 200; + int32_t reportPeriodNum = 3600 * 1000 / sleepTime; + ; int32_t loopTimes = reportPeriodNum; - + while (1) { if (pMgmt->pData->dropped || pMgmt->pData->stopped) break; if (loopTimes++ < reportPeriodNum) { @@ -167,13 +171,13 @@ static void *dmCrashReportThreadFp(void *param) { pMsg = NULL; continue; } - + if (pFile) { taosReleaseCrashLogFile(pFile, truncateFile); pFile = NULL; truncateFile = false; } - + taosMsleep(sleepTime); loopTimes = 0; } @@ -299,7 +303,7 @@ void dmStopCrashReportThread(SDnodeMgmt *pMgmt) { static void dmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { SDnodeMgmt *pMgmt = pInfo->ahandle; int32_t code = -1; - STraceId *trace = &pMsg->info.traceId; + STraceId * trace = &pMsg->info.traceId; dGTrace("msg:%p, will be processed in dnode queue, type:%s", pMsg, TMSG_INFO(pMsg->msgType)); switch (pMsg->msgType) { diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 1ea61f0e9385..10bf39063ff5 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -318,6 +318,7 @@ static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) { dError("failed to send rpc msg:%s since %s, handle:%p", TMSG_INFO(pMsg->msgType), terrstr(), pMsg->info.handle); return -1; } else { + pMsg->info.handle = 0; rpcSendRequest(pDnode->trans.clientRpc, pEpSet, pMsg, NULL); return 0; } diff --git a/source/dnode/mnode/impl/src/mndCompact.c b/source/dnode/mnode/impl/src/mndCompact.c index 9b35343ed23e..6471fa817d53 100644 --- a/source/dnode/mnode/impl/src/mndCompact.c +++ b/source/dnode/mnode/impl/src/mndCompact.c @@ -288,7 +288,7 @@ int32_t mndRetrieveCompact(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, tNameFromString(&name, pCompact->dbname, T_NAME_ACCT | T_NAME_DB); tNameGetDbName(&name, varDataVal(tmpBuf)); } else { - strncpy(varDataVal(tmpBuf), pCompact->dbname, strlen(pCompact->dbname) + 1); + strncpy(varDataVal(tmpBuf), pCompact->dbname, TSDB_SHOW_SQL_LEN); } varDataSetLen(tmpBuf, strlen(varDataVal(tmpBuf))); colDataSetVal(pColInfo, numOfRows, (const char *)tmpBuf, false); diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 21d813c7c0b2..9f1ff584a3c3 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -19,27 +19,27 @@ #include "tqCommon.h" #include "tuuid.h" -#define sndError(...) \ - do { \ - if (sndDebugFlag & DEBUG_ERROR) { \ - taosPrintLog("SND ERROR ", DEBUG_ERROR, sndDebugFlag, __VA_ARGS__); \ - } \ - } while (0) - -#define sndInfo(...) \ - do { \ - if (sndDebugFlag & DEBUG_INFO) { \ - taosPrintLog("SND INFO ", DEBUG_INFO, sndDebugFlag, __VA_ARGS__); \ - } \ - } while (0) - -#define sndDebug(...) \ - do { \ - if (sndDebugFlag & DEBUG_DEBUG) { \ - taosPrintLog("SND ", DEBUG_DEBUG, sndDebugFlag, __VA_ARGS__); \ - } \ - } while (0) +// clang-format off +#define sndError(...) do { if (sndDebugFlag & DEBUG_ERROR) {taosPrintLog("SND ERROR ", DEBUG_ERROR, sndDebugFlag, __VA_ARGS__);}} while (0) +#define sndInfo(...) do { if (sndDebugFlag & DEBUG_INFO) { taosPrintLog("SND INFO ", DEBUG_INFO, sndDebugFlag, __VA_ARGS__);}} while (0) +#define sndDebug(...) do { if (sndDebugFlag & DEBUG_DEBUG) { taosPrintLog("SND ", DEBUG_DEBUG, sndDebugFlag, __VA_ARGS__);}} while (0) +// clang-format on +static STaskId replaceStreamTaskId(SStreamTask *pTask) { + ASSERT(pTask->info.fillHistory); + STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; + + pTask->id.streamId = pTask->streamTaskId.streamId; + pTask->id.taskId = pTask->streamTaskId.taskId; + + return id; +} + +static void restoreStreamTaskId(SStreamTask *pTask, STaskId *pId) { + ASSERT(pTask->info.fillHistory); + pTask->id.taskId = pId->taskId; + pTask->id.streamId = pId->streamId; +} int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer) { ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG && taosArrayGetSize(pTask->upstreamInfo.pList) != 0); int32_t code = streamTaskInit(pTask, pSnode->pMeta, &pSnode->msgCb, nextProcessVer); @@ -50,23 +50,21 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer streamTaskOpenAllUpstreamInput(pTask); - SStreamTask *pSateTask = pTask; - SStreamTask task = {0}; + STaskId taskId = {0}; if (pTask->info.fillHistory) { - task.id.streamId = pTask->streamTaskId.streamId; - task.id.taskId = pTask->streamTaskId.taskId; - task.pMeta = pTask->pMeta; - pSateTask = &task; + taskId = replaceStreamTaskId(pTask); } - pTask->pState = streamStateOpen(pSnode->path, pSateTask, false, -1, -1); + pTask->pState = streamStateOpen(pSnode->path, pTask, false, -1, -1); if (pTask->pState == NULL) { sndError("s-task:%s failed to open state for task", pTask->id.idStr); return -1; } else { sndDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState); } - + if (pTask->info.fillHistory) { + restoreStreamTaskId(pTask, &taskId); + } int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList); SReadHandle handle = { @@ -90,8 +88,8 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer // checkpoint ver is the kept version, handled data should be the next version. if (pTask->chkInfo.checkpointId != 0) { pTask->chkInfo.nextProcessVer = pTask->chkInfo.checkpointVer + 1; - sndInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " nextProcessVer:%" PRId64, pTask->id.idStr, - pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer); + sndInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " nextProcessVer:%" PRId64, + pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer); } char *p = NULL; @@ -99,18 +97,18 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer if (pTask->info.fillHistory) { sndInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 - " nextProcessVer:%" PRId64 - " child id:%d, level:%d, status:%s fill-history:%d, related stream task:0x%x trigger:%" PRId64 " ms", - SNODE_HANDLE, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer, - pTask->info.selfChildId, pTask->info.taskLevel, p, pTask->info.fillHistory, - (int32_t)pTask->streamTaskId.taskId, pTask->info.triggerParam); + " nextProcessVer:%" PRId64 + " child id:%d, level:%d, status:%s fill-history:%d, related stream task:0x%x trigger:%" PRId64 " ms", + SNODE_HANDLE, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer, + pTask->info.selfChildId, pTask->info.taskLevel, p, pTask->info.fillHistory, + (int32_t)pTask->streamTaskId.taskId, pTask->info.triggerParam); } else { sndInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 - " nextProcessVer:%" PRId64 - " child id:%d, level:%d, status:%s fill-history:%d, related fill-task:0x%x trigger:%" PRId64 " ms", - SNODE_HANDLE, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer, - pTask->info.selfChildId, pTask->info.taskLevel, p, pTask->info.fillHistory, - (int32_t)pTask->hTaskInfo.id.taskId, pTask->info.triggerParam); + " nextProcessVer:%" PRId64 + " child id:%d, level:%d, status:%s fill-history:%d, related fill-task:0x%x trigger:%" PRId64 " ms", + SNODE_HANDLE, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer, + pTask->info.selfChildId, pTask->info.taskLevel, p, pTask->info.fillHistory, + (int32_t)pTask->hTaskInfo.id.taskId, pTask->info.triggerParam); } return 0; } @@ -149,7 +147,7 @@ SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) { return NULL; } -int32_t sndInit(SSnode * pSnode) { +int32_t sndInit(SSnode *pSnode) { resetStreamTaskStatus(pSnode->pMeta); startStreamTasks(pSnode->pMeta); return 0; @@ -196,7 +194,7 @@ int32_t sndProcessStreamMsg(SSnode *pSnode, SRpcMsg *pMsg) { int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg *pRsp) { switch (pMsg->msgType) { case TDMT_STREAM_TASK_DEPLOY: { - void * pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + void *pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); int32_t len = pMsg->contLen - sizeof(SMsgHead); return tqStreamTaskProcessDeployReq(pSnode->pMeta, -1, pReq, len, true, true); } diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index 976ee616f986..ac2486eda195 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -36,7 +36,6 @@ static int metaDeleteBtimeIdx(SMeta *pMeta, const SMetaEntry *pME); static int metaUpdateNcolIdx(SMeta *pMeta, const SMetaEntry *pME); static int metaDeleteNcolIdx(SMeta *pMeta, const SMetaEntry *pME); - static void metaGetEntryInfo(const SMetaEntry *pEntry, SMetaInfo *pInfo) { pInfo->uid = pEntry->uid; pInfo->version = pEntry->version; @@ -562,6 +561,7 @@ int metaAddIndexToSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) { tdbTbUpsert(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, NULL, 0, pMeta->txn); metaULock(pMeta); metaDestroyTagIdxKey(pTagIdxKey); + pTagIdxKey = NULL; } nStbEntry.version = version; @@ -692,6 +692,7 @@ int metaDropIndexFromSTable(SMeta *pMeta, int64_t version, SDropIndexReq *pReq) tdbTbDelete(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, pMeta->txn); metaULock(pMeta); metaDestroyTagIdxKey(pTagIdxKey); + pTagIdxKey = NULL; } // clear idx flag @@ -1076,7 +1077,7 @@ static int metaDeleteTtl(SMeta *pMeta, const SMetaEntry *pME) { return ttlMgrDeleteTtl(pMeta->pTtlMgr, &ctx); } -static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type, tb_uid_t *pSuid, int8_t* pSysTbl) { +static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type, tb_uid_t *pSuid, int8_t *pSysTbl) { void *pData = NULL; int nData = 0; int rc = 0; @@ -1146,6 +1147,7 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type, tb_uid_t *p tdbTbDelete(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, pMeta->txn); } metaDestroyTagIdxKey(pTagIdxKey); + pTagIdxKey = NULL; } } tDecoderClear(&tdc); @@ -1865,6 +1867,7 @@ static int metaAddTagIndex(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTb } tdbTbUpsert(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, NULL, 0, pMeta->txn); metaDestroyTagIdxKey(pTagIdxKey); + pTagIdxKey = NULL; } tdbTbcClose(pCtbIdxc); return 0; @@ -2122,7 +2125,7 @@ int metaUpdateChangeTimeWithLock(SMeta *pMeta, tb_uid_t uid, int64_t changeTimeM if (!tsTtlChangeOnWrite) return 0; metaWLock(pMeta); - int ret = metaUpdateChangeTime(pMeta, uid, changeTimeMs); + int ret = metaUpdateChangeTime(pMeta, uid, changeTimeMs); metaULock(pMeta); return ret; } @@ -2228,15 +2231,14 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) { nTagData = tDataTypes[pTagColumn->type].bytes; } - if (pTagData != NULL) { - if (metaCreateTagIdxKey(pCtbEntry->ctbEntry.suid, pTagColumn->colId, pTagData, nTagData, pTagColumn->type, - pCtbEntry->uid, &pTagIdxKey, &nTagIdxKey) < 0) { - ret = -1; - goto end; - } - tdbTbUpsert(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, NULL, 0, pMeta->txn); + if (metaCreateTagIdxKey(pCtbEntry->ctbEntry.suid, pTagColumn->colId, pTagData, nTagData, pTagColumn->type, + pCtbEntry->uid, &pTagIdxKey, &nTagIdxKey) < 0) { + ret = -1; + goto end; } + tdbTbUpsert(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, NULL, 0, pMeta->txn); metaDestroyTagIdxKey(pTagIdxKey); + pTagIdxKey = NULL; } } end: diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index 75fe3c51dde6..9d4c20493a02 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -54,10 +54,10 @@ static int32_t removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SColM #define SCAN_ROW_TYPE(_t) ((_t) ? CACHESCAN_RETRIEVE_LAST : CACHESCAN_RETRIEVE_LAST_ROW) -static void setColIdForCacheReadBlock(SSDataBlock* pBlock, SNodeList* pTargets) { +static void setColIdForCacheReadBlock(SSDataBlock* pBlock, SLastRowScanPhysiNode* pScan) { SNode* pNode; int32_t idx = 0; - FOREACH(pNode, pTargets) { + FOREACH(pNode, pScan->pTargets) { if (nodeType(pNode) == QUERY_NODE_COLUMN) { SColumnNode* pCol = (SColumnNode*)pNode; SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, idx); @@ -65,6 +65,19 @@ static void setColIdForCacheReadBlock(SSDataBlock* pBlock, SNodeList* pTargets) } idx++; } + + for (; idx < pBlock->pDataBlock->size; ++idx) { + SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, idx); + if (pScan->scan.pScanPseudoCols) { + FOREACH(pNode, pScan->scan.pScanPseudoCols) { + STargetNode* pTarget = (STargetNode*)pNode; + if (pColInfo->info.slotId == pTarget->slotId) { + pColInfo->info.colId = 0; + break; + } + } + } + } } SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandle* readHandle, @@ -127,12 +140,12 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe capacity = TMIN(totalTables, 4096); pInfo->pBufferredRes = createOneDataBlock(pInfo->pRes, false); - setColIdForCacheReadBlock(pInfo->pBufferredRes, pScanNode->pTargets); + setColIdForCacheReadBlock(pInfo->pBufferredRes, pScanNode); blockDataEnsureCapacity(pInfo->pBufferredRes, capacity); } else { // by tags pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_SINGLE | SCAN_ROW_TYPE(pScanNode->ignoreNull); capacity = 1; // only one row output - setColIdForCacheReadBlock(pInfo->pRes, pScanNode->pTargets); + setColIdForCacheReadBlock(pInfo->pRes, pScanNode); } initResultSizeInfo(&pOperator->resultInfo, capacity); diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 1ebc7ad3c6e6..205cd7d3ef24 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -475,6 +475,7 @@ static void appendOneRowToDataBlock(SSDataBlock* pBlock, const SSDataBlock* pSou if (isNull) { colDataSetVal(pColInfo, pBlock->info.rows, NULL, true); } else { + if (!pSrcColInfo->pData) continue; char* pData = colDataGetData(pSrcColInfo, *rowIndex); colDataSetVal(pColInfo, pBlock->info.rows, pData, false); } @@ -900,7 +901,7 @@ static int32_t getPageBufIncForRow(SSDataBlock* blk, int32_t row, int32_t rowIdx for (int32_t i = 0; i < numCols; ++i) { SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(blk->pDataBlock, i); if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { - if (pColInfoData->varmeta.offset[row] != -1) { + if ((pColInfoData->varmeta.offset[row] != -1) && (pColInfoData->pData)) { char* p = colDataGetData(pColInfoData, row); sz += varDataTLen(p); } @@ -970,7 +971,6 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO lastPageBufTs = ((int64_t*)tsCol->pData)[pHandle->pDataBlock->info.rows - 1]; appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId); nMergedRows += pHandle->pDataBlock->info.rows; - blockDataCleanup(pHandle->pDataBlock); blkPgSz = pgHeaderSz; bufInc = getPageBufIncForRow(minBlk, minRow, 0); diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 7376aa3a9cf6..4dbdeb41c048 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -26,7 +26,7 @@ int32_t qwStopAllTasks(SQWorker *mgmt) { void *pIter = taosHashIterate(mgmt->ctxHash, NULL); while (pIter) { SQWTaskCtx *ctx = (SQWTaskCtx *)pIter; - void *key = taosHashGetKey(pIter, NULL); + void * key = taosHashGetKey(pIter, NULL); QW_GET_QTID(key, qId, tId, eId); QW_LOCK(QW_WRITE, &ctx->lock); @@ -65,7 +65,7 @@ int32_t qwStopAllTasks(SQWorker *mgmt) { int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) { int32_t code = 0; SSchedulerHbRsp rsp = {0}; - SQWSchStatus *sch = NULL; + SQWSchStatus * sch = NULL; QW_ERR_RET(qwAcquireScheduler(mgmt, req->sId, QW_READ, &sch)); @@ -90,7 +90,6 @@ int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *re QW_RET(TSDB_CODE_SUCCESS); } - int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { qTaskInfo_t taskHandle = ctx->taskHandle; @@ -192,12 +191,12 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { } else { QW_TASK_DLOG("dyn task qExecTask done, useconds:%" PRIu64, useconds); } - + ctx->queryExecDone = true; } dsEndPut(sinkHandle, useconds); - + if (queryStop) { *queryStop = true; } @@ -240,7 +239,6 @@ bool qwTaskNotInExec(SQWTaskCtx *ctx) { return false; } - int32_t qwGenerateSchHbRsp(SQWorker *mgmt, SQWSchStatus *sch, SQWHbInfo *hbInfo) { int32_t taskNum = 0; @@ -258,7 +256,7 @@ int32_t qwGenerateSchHbRsp(SQWorker *mgmt, SQWSchStatus *sch, SQWHbInfo *hbInfo) return TSDB_CODE_OUT_OF_MEMORY; } - void *key = NULL; + void * key = NULL; size_t keyLen = 0; int32_t i = 0; STaskStatus status = {0}; @@ -321,7 +319,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, if (!ctx->dynamicTask) { qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC, ctx->dynamicTask); } - + if (NULL == rsp) { QW_ERR_RET(qwMallocFetchRsp(!ctx->localExec, len, &rsp)); *pOutput = output; @@ -427,11 +425,10 @@ int32_t qwGetDeleteResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SDeleteRes *pRes return TSDB_CODE_SUCCESS; } - -int32_t qwQuickRspFetchReq(QW_FPARAMS_DEF, SQWTaskCtx * ctx, SQWMsg *qwMsg, int32_t code) { +int32_t qwQuickRspFetchReq(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SQWMsg *qwMsg, int32_t code) { if (QUERY_RSP_POLICY_QUICK == tsQueryRspPolicy && ctx != NULL) { if (QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) { - void *rsp = NULL; + void * rsp = NULL; int32_t dataLen = 0; SOutputData sOutput = {0}; if (TSDB_CODE_SUCCESS == code) { @@ -453,13 +450,13 @@ int32_t qwQuickRspFetchReq(QW_FPARAMS_DEF, SQWTaskCtx * ctx, SQWMsg *qwMsg, i qwMsg->connInfo = ctx->dataConnInfo; QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH); - + qwBuildAndSendFetchRsp(ctx->fetchMsgType + 1, &qwMsg->connInfo, rsp, dataLen, code); rsp = NULL; - + QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), dataLen); - } + } } return TSDB_CODE_SUCCESS; @@ -474,14 +471,14 @@ int32_t qwStartDynamicTaskNewExec(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SQWMsg *qwMsg if (!atomic_val_compare_exchange_8((int8_t*)&ctx->queryEnd, true, false)) { QW_TASK_ELOG("dynamic task prev exec not finished, queryEnd:%d", ctx->queryEnd); return TSDB_CODE_ACTION_IN_PROGRESS; - } + } #else ctx->queryExecDone = false; ctx->queryEnd = false; #endif dsReset(ctx->sinkHandle); - + qUpdateOperatorParam(ctx->taskHandle, qwMsg->msg); QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_FETCH); @@ -498,7 +495,6 @@ int32_t qwStartDynamicTaskNewExec(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SQWMsg *qwMsg return TSDB_CODE_SUCCESS; } - int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) { int32_t code = 0; SQWTaskCtx *ctx = NULL; @@ -609,7 +605,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) { int32_t code = 0; - SQWTaskCtx *ctx = NULL; + SQWTaskCtx * ctx = NULL; SRpcHandleInfo connInfo = {0}; QW_TASK_DLOG("start to handle event at phase %s", qwPhaseStr(phase)); @@ -712,11 +708,11 @@ int32_t qwPreprocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) { int32_t code = 0; bool queryRsped = false; - SSubplan *plan = NULL; + SSubplan * plan = NULL; SQWPhaseInput input = {0}; qTaskInfo_t pTaskInfo = NULL; DataSinkHandle sinkHandle = NULL; - SQWTaskCtx *ctx = NULL; + SQWTaskCtx * ctx = NULL; QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_QUERY, &input, NULL)); @@ -749,7 +745,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) { QW_ERR_JRET(TSDB_CODE_APP_ERROR); } - //qwSendQueryRsp(QW_FPARAMS(), qwMsg->msgType + 1, ctx, code, true); + // qwSendQueryRsp(QW_FPARAMS(), qwMsg->msgType + 1, ctx, code, true); ctx->level = plan->level; ctx->dynamicTask = qIsDynamicExecTask(pTaskInfo); @@ -764,7 +760,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) { ctx->queryExecDone = true; ctx->queryEnd = true; } - + _return: taosMemoryFree(sql); @@ -779,10 +775,10 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) { } int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { - SQWTaskCtx *ctx = NULL; + SQWTaskCtx * ctx = NULL; int32_t code = 0; SQWPhaseInput input = {0}; - void *rsp = NULL; + void * rsp = NULL; int32_t dataLen = 0; bool queryStop = false; bool qComplete = false; @@ -800,7 +796,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { if (!queryStop) { QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, &queryStop)); } - + if (QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) { SOutputData sOutput = {0}; QW_ERR_JRET(qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput)); @@ -872,8 +868,8 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) { int32_t code = 0; int32_t dataLen = 0; bool locked = false; - SQWTaskCtx *ctx = NULL; - void *rsp = NULL; + SQWTaskCtx * ctx = NULL; + void * rsp = NULL; SQWPhaseInput input = {0}; QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_FETCH, &input, NULL)); @@ -951,8 +947,8 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) { qwFreeFetchRsp(rsp); rsp = NULL; } - } else { - //qwQuickRspFetchReq(QW_FPARAMS(), ctx, qwMsg, code); + } else { + // qwQuickRspFetchReq(QW_FPARAMS(), ctx, qwMsg, code); } QW_RET(TSDB_CODE_SUCCESS); @@ -994,6 +990,8 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) { if (ctx) { QW_UPDATE_RSP_CODE(ctx, code); qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAIL, ctx->dynamicTask); + } else { + tmsgReleaseHandle(&qwMsg->connInfo, TAOS_CONN_SERVER); } } @@ -1002,6 +1000,10 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) { } if (ctx) { + if (qwMsg->connInfo.handle != ctx->ctrlConnInfo.handle) { + tmsgReleaseHandle(&qwMsg->connInfo, TAOS_CONN_SERVER); + } + qwReleaseTaskCtx(mgmt, ctx); } @@ -1029,7 +1031,7 @@ int32_t qwProcessNotify(QW_FPARAMS_DEF, SQWMsg *qwMsg) { if (ctx->explain && !ctx->explainRsped) { QW_ERR_RET(qwSendExplainResponse(QW_FPARAMS(), ctx)); } - break; + break; default: QW_ELOG("Invalid task notify type %d", qwMsg->msgType); QW_ERR_JRET(TSDB_CODE_INVALID_MSG); @@ -1056,11 +1058,10 @@ int32_t qwProcessNotify(QW_FPARAMS_DEF, SQWMsg *qwMsg) { QW_RET(TSDB_CODE_SUCCESS); } - int32_t qwProcessHb(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) { int32_t code = 0; SSchedulerHbRsp rsp = {0}; - SQWSchStatus *sch = NULL; + SQWSchStatus * sch = NULL; if (qwMsg->code) { QW_RET(qwProcessHbLinkBroken(mgmt, qwMsg, req)); @@ -1119,8 +1120,8 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) { SQWSchStatus *sch = NULL; int32_t taskNum = 0; - SQWHbInfo *rspList = NULL; - SArray *pExpiredSch = NULL; + SQWHbInfo * rspList = NULL; + SArray * pExpiredSch = NULL; int32_t code = 0; qwDbgDumpMgmtInfo(mgmt); @@ -1151,7 +1152,7 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) { return; } - void *key = NULL; + void * key = NULL; size_t keyLen = 0; int32_t i = 0; int64_t currentMs = taosGetTimestampMs(); @@ -1206,7 +1207,7 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) { int32_t qwProcessDelete(QW_FPARAMS_DEF, SQWMsg *qwMsg, SDeleteRes *pRes) { int32_t code = 0; - SSubplan *plan = NULL; + SSubplan * plan = NULL; qTaskInfo_t pTaskInfo = NULL; DataSinkHandle sinkHandle = NULL; SQWTaskCtx ctx = {0}; @@ -1371,7 +1372,7 @@ int32_t qWorkerGetStat(SReadHandle *handle, void *qWorkerMgmt, SQWorkerStat *pSt QW_RET(TSDB_CODE_QRY_INVALID_INPUT); } - SQWorker *mgmt = (SQWorker *)qWorkerMgmt; + SQWorker * mgmt = (SQWorker *)qWorkerMgmt; SDataSinkStat sinkStat = {0}; dsDataSinkGetCacheSize(&sinkStat); @@ -1395,10 +1396,10 @@ int32_t qWorkerGetStat(SReadHandle *handle, void *qWorkerMgmt, SQWorkerStat *pSt int32_t qWorkerProcessLocalQuery(void *pMgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId, int32_t eId, SQWMsg *qwMsg, SArray *explainRes) { - SQWorker *mgmt = (SQWorker *)pMgmt; + SQWorker * mgmt = (SQWorker *)pMgmt; int32_t code = 0; - SQWTaskCtx *ctx = NULL; - SSubplan *plan = (SSubplan *)qwMsg->msg; + SQWTaskCtx * ctx = NULL; + SSubplan * plan = (SSubplan *)qwMsg->msg; SQWPhaseInput input = {0}; qTaskInfo_t pTaskInfo = NULL; DataSinkHandle sinkHandle = NULL; @@ -1455,11 +1456,11 @@ int32_t qWorkerProcessLocalQuery(void *pMgmt, uint64_t sId, uint64_t qId, uint64 int32_t qWorkerProcessLocalFetch(void *pMgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId, int32_t eId, void **pRsp, SArray *explainRes) { - SQWorker *mgmt = (SQWorker *)pMgmt; + SQWorker * mgmt = (SQWorker *)pMgmt; int32_t code = 0; int32_t dataLen = 0; SQWTaskCtx *ctx = NULL; - void *rsp = NULL; + void * rsp = NULL; bool queryStop = false; SQWPhaseInput input = {0}; diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index f987420ecef1..817b9dc5cd29 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -1173,7 +1173,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, qMsg.queryId = pJob->queryId; qMsg.taskId = pTask->taskId; qMsg.refId = pJob->refId; - qMsg.execId = pTask->execId; + qMsg.execId = *(int32_t*)param; msgSize = tSerializeSTaskDropReq(NULL, 0, &qMsg); if (msgSize < 0) { diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index d96c01fc76d8..9fea21f6dc42 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -371,14 +371,13 @@ int32_t schChkUpdateRedirectCtx(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet, pCtx->roundTotal = pEpSet->numOfEps; } - if (pCtx->roundTimes >= pCtx->roundTotal) { int64_t nowTs = taosGetTimestampMs(); int64_t lastTime = nowTs - pCtx->startTs; if (lastTime > tsMaxRetryWaitTime) { SCH_TASK_DLOG("task no more redirect retry since timeout, now:%" PRId64 ", start:%" PRId64 ", max:%d, total:%d", nowTs, pCtx->startTs, tsMaxRetryWaitTime, pCtx->totalTimes); - pJob->noMoreRetry = true; + pJob->noMoreRetry = true; SCH_ERR_RET(SCH_GET_REDIRECT_CODE(pJob, rspCode)); } @@ -418,7 +417,7 @@ void schResetTaskForRetry(SSchJob *pJob, SSchTask *pTask) { taosMemoryFreeClear(pTask->msg); pTask->msgLen = 0; pTask->lastMsgType = 0; - pTask->childReady = 0; + pTask->childReady = 0; memset(&pTask->succeedAddr, 0, sizeof(pTask->succeedAddr)); } @@ -440,7 +439,7 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32 SCH_ERR_JRET(schUpdateTaskCandidateAddr(pJob, pTask, pData->pEpSet)); } else if (SYNC_SELF_LEADER_REDIRECT_ERROR(rspCode)) { SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx); - SEp *pEp = &addr->epSet.eps[addr->epSet.inUse]; + SEp * pEp = &addr->epSet.eps[addr->epSet.inUse]; SCH_TASK_DLOG("task retry node %d current ep, idx:%d/%d,%s:%d, code:%s", addr->nodeId, addr->epSet.inUse, addr->epSet.numOfEps, pEp->fqdn, pEp->port, tstrerror(rspCode)); } else { @@ -505,11 +504,11 @@ int32_t schHandleTaskSetRetry(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, i pLevel->taskExecDoneNum = 0; pLevel->taskLaunchedNum = 0; } - + SCH_RESET_JOB_LEVEL_IDX(pJob); - + code = schDoTaskRedirect(pJob, pTask, pData, rspCode); - + taosMemoryFreeClear(pData->pData); taosMemoryFreeClear(pData->pEpSet); @@ -627,7 +626,7 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo pTask->maxRetryTimes); return TSDB_CODE_SUCCESS; } - + if (TSDB_CODE_SCH_TIMEOUT_ERROR == errCode) { pTask->maxExecTimes++; pTask->maxRetryTimes++; @@ -862,7 +861,8 @@ void schDropTaskOnExecNode(SSchJob *pJob, SSchTask *pTask) { while (nodeInfo) { if (nodeInfo->handle) { SCH_SET_TASK_HANDLE(pTask, nodeInfo->handle); - schBuildAndSendMsg(pJob, pTask, &nodeInfo->addr, TDMT_SCH_DROP_TASK, NULL); + void *pExecId = taosHashGetKey(nodeInfo, NULL); + schBuildAndSendMsg(pJob, pTask, &nodeInfo->addr, TDMT_SCH_DROP_TASK, pExecId); SCH_TASK_DLOG("start to drop task's %dth execNode", i); } else { SCH_TASK_DLOG("no need to drop task %dth execNode", i); @@ -901,11 +901,10 @@ int32_t schNotifyTaskOnExecNode(SSchJob *pJob, SSchTask *pTask, ETaskNotifyType return TSDB_CODE_SUCCESS; } - int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId *pEpId, SArray *pStatusList) { int32_t taskNum = (int32_t)taosArrayGetSize(pStatusList); SSchTask *pTask = NULL; - SSchJob *pJob = NULL; + SSchJob * pJob = NULL; qDebug("%d task status in hb rsp from nodeId:%d, fqdn:%s, port:%d", taskNum, pEpId->nodeId, pEpId->ep.fqdn, pEpId->ep.port); @@ -952,7 +951,7 @@ int32_t schHandleExplainRes(SArray *pExplainRes) { } SSchTask *pTask = NULL; - SSchJob *pJob = NULL; + SSchJob * pJob = NULL; for (int32_t i = 0; i < resNum; ++i) { SExplainLocalRsp *localRsp = taosArrayGet(pExplainRes, i); @@ -1014,7 +1013,7 @@ int32_t schLaunchRemoteTask(SSchJob *pJob, SSchTask *pTask) { pTask->msgLen); SCH_ERR_RET(code); } else if (tsQueryPlannerTrace) { - char *msg = NULL; + char * msg = NULL; int32_t msgLen = 0; SCH_ERR_RET(qSubPlanToString(plan, &msg, &msgLen)); SCH_TASK_DLOGL("physical plan len:%d, %s", msgLen, msg); @@ -1070,7 +1069,7 @@ int32_t schLaunchLocalTask(SSchJob *pJob, SSchTask *pTask) { int32_t schLaunchTaskImpl(void *param) { SSchTaskCtx *pCtx = (SSchTaskCtx *)param; - SSchJob *pJob = schAcquireJob(pCtx->jobRid); + SSchJob * pJob = schAcquireJob(pCtx->jobRid); if (NULL == pJob) { qDebug("job refId 0x%" PRIx64 " already not exist", pCtx->jobRid); taosMemoryFree(param); @@ -1185,8 +1184,8 @@ int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) { void schHandleTimerEvent(void *param, void *tmrId) { SSchTimerParam *pTimerParam = (SSchTimerParam *)param; - SSchTask *pTask = NULL; - SSchJob *pJob = NULL; + SSchTask * pTask = NULL; + SSchJob * pJob = NULL; int32_t code = 0; int64_t rId = pTimerParam->rId; @@ -1269,7 +1268,7 @@ int32_t schNotifyTaskInHashList(SSchJob *pJob, SHashObj *list, ETaskNotifyType t int32_t code = TSDB_CODE_SUCCESS; SCH_ERR_RET(schNotifyTaskOnExecNode(pJob, pCurrTask, type)); - + void *pIter = taosHashIterate(list, NULL); while (pIter) { SSchTask *pTask = *(SSchTask **)pIter; @@ -1277,7 +1276,7 @@ int32_t schNotifyTaskInHashList(SSchJob *pJob, SHashObj *list, ETaskNotifyType t SCH_LOCK_TASK(pTask); code = schNotifyTaskOnExecNode(pJob, pTask, type); SCH_UNLOCK_TASK(pTask); - + if (TSDB_CODE_SUCCESS != code) { break; } @@ -1289,13 +1288,12 @@ int32_t schNotifyTaskInHashList(SSchJob *pJob, SHashObj *list, ETaskNotifyType t SCH_RET(code); } - int32_t schExecRemoteFetch(SSchJob *pJob, SSchTask *pTask) { SCH_RET(schBuildAndSendMsg(pJob, pJob->fetchTask, &pJob->resNode, SCH_FETCH_TYPE(pJob->fetchTask), NULL)); } int32_t schExecLocalFetch(SSchJob *pJob, SSchTask *pTask) { - void *pRsp = NULL; + void * pRsp = NULL; int32_t code = 0; SArray *explainRes = NULL; diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index ce391eeadbf8..6c5b1ef99468 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -394,6 +394,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF pMeta->qHandle = taosInitScheduler(32, 1, "stream-chkp", NULL); pMeta->bkdChkptMgt = bkdMgtCreate(tpath); + taosThreadMutexInit(&pMeta->backendMutex, NULL); return pMeta; diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 19b2dfc300e9..615d5679e6cc 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -115,9 +115,13 @@ typedef SRpcConnInfo STransHandleInfo; // ref mgt handle typedef struct SExHandle { - void* handle; - int64_t refId; - void* pThrd; + void* handle; + int64_t refId; + void* pThrd; + queue q; + int8_t inited; + SRWLatch latch; + } SExHandle; typedef struct { diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 936d11c151c8..a84a5fbce43e 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -92,6 +92,7 @@ typedef struct SCliMsg { int64_t refId; uint64_t st; int sent; //(0: no send, 1: alread sent) + queue seqq; // } SCliMsg; typedef struct SCliThrd { @@ -123,9 +124,6 @@ typedef struct SCliThrd { SCliMsg* stopMsg; bool quit; - - int newConnCount; - SHashObj* msgCount; } SCliThrd; typedef struct SCliObj { @@ -204,7 +202,6 @@ static FORCE_INLINE void cliMayUpdateFqdnCache(SHashObj* cache, char* dst); static void cliHandleResp(SCliConn* conn); // handle except about conn static void cliHandleExcept(SCliConn* conn); -static void cliReleaseUnfinishedMsg(SCliConn* conn); static void cliHandleFastFail(SCliConn* pConn, int status); static void doNotifyApp(SCliMsg* pMsg, SCliThrd* pThrd); @@ -261,10 +258,8 @@ static void cliWalkCb(uv_handle_t* handle, void* arg); } \ if (i == sz) { \ pMsg = NULL; \ - tDebug("msg not found, %" PRIu64 "", ahandle); \ } else { \ pMsg = transQueueRm(&conn->cliMsgs, i); \ - tDebug("msg found, %" PRIu64 "", ahandle); \ } \ } while (0) @@ -342,6 +337,34 @@ bool cliMaySendCachedMsg(SCliConn* conn) { _RETURN: return false; } +bool cliConnSendSeqMsg(int64_t refId, SCliConn* conn) { + if (refId == 0) return false; + SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId); + if (exh == NULL) { + tDebug("release conn %p, refId: %" PRId64 "", conn, refId); + return false; + } + taosWLockLatch(&exh->latch); + if (exh->handle == NULL) exh->handle = conn; + exh->inited = 1; + if (!QUEUE_IS_EMPTY(&exh->q)) { + queue* h = QUEUE_HEAD(&exh->q); + QUEUE_REMOVE(h); + taosWUnLockLatch(&exh->latch); + SCliMsg* t = QUEUE_DATA(h, SCliMsg, seqq); + transCtxMerge(&conn->ctx, &t->ctx->appCtx); + transQueuePush(&conn->cliMsgs, t); + tDebug("pop from conn %p, refId: %" PRId64 "", conn, refId); + transReleaseExHandle(transGetRefMgt(), refId); + cliSend(conn); + return true; + } + taosWUnLockLatch(&exh->latch); + tDebug("empty conn %p, refId: %" PRId64 "", conn, refId); + transReleaseExHandle(transGetRefMgt(), refId); + return false; +} + void cliHandleResp(SCliConn* conn) { SCliThrd* pThrd = conn->hostThrd; STrans* pTransInst = pThrd->pTransInst; @@ -437,8 +460,13 @@ void cliHandleResp(SCliConn* conn) { return; } } - destroyCmsg(pMsg); + int64_t refId = (pMsg == NULL ? 0 : (int64_t)(pMsg->msg.info.handle)); + tDebug("conn %p msg refId: %" PRId64 "", conn, refId); + destroyCmsg(pMsg); + if (cliConnSendSeqMsg(refId, conn)) { + return; + } if (cliMaySendCachedMsg(conn) == true) { return; } @@ -449,7 +477,21 @@ void cliHandleResp(SCliConn* conn) { uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb); } - +static void cliDestroyMsgInExhandle(int64_t refId) { + if (refId == 0) return; + SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId); + if (exh) { + taosWLockLatch(&exh->latch); + while (!QUEUE_IS_EMPTY(&exh->q)) { + queue* h = QUEUE_HEAD(&exh->q); + QUEUE_REMOVE(h); + SCliMsg* t = QUEUE_DATA(h, SCliMsg, seqq); + destroyCmsg(t); + } + taosWUnLockLatch(&exh->latch); + transReleaseExHandle(transGetRefMgt(), refId); + } +} void cliHandleExceptImpl(SCliConn* pConn, int32_t code) { if (transQueueEmpty(&pConn->cliMsgs)) { if (pConn->broken == true && CONN_NO_PERSIST_BY_APP(pConn)) { @@ -508,6 +550,8 @@ void cliHandleExceptImpl(SCliConn* pConn, int32_t code) { } if (pMsg == NULL || (pMsg && pMsg->type != Release)) { + int64_t refId = (pMsg == NULL ? 0 : (int64_t)(pMsg->msg.info.handle)); + cliDestroyMsgInExhandle(refId); if (cliAppCb(pConn, &transMsg, pMsg) != 0) { return; } @@ -582,12 +626,12 @@ void* destroyConnPool(SCliThrd* pThrd) { static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key, bool* exceed) { void* pool = pThrd->pool; - SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key) + 1); + SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key)); STrans* pTranInst = pThrd->pTransInst; if (plist == NULL) { SConnList list = {0}; - taosHashPut((SHashObj*)pool, key, strlen(key) + 1, (void*)&list, sizeof(list)); - plist = taosHashGet(pool, key, strlen(key) + 1); + taosHashPut((SHashObj*)pool, key, strlen(key), (void*)&list, sizeof(list)); + plist = taosHashGet(pool, key, strlen(key)); SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList)); QUEUE_INIT(&nList->msgQ); @@ -611,6 +655,7 @@ static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key, bool* exceed) { SCliConn* conn = QUEUE_DATA(h, SCliConn, q); conn->status = ConnNormal; QUEUE_INIT(&conn->q); + tDebug("conn %p get from pool, pool size: %d, dst: %s", conn, conn->list->size, conn->dstAddr); if (conn->task != NULL) { transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task); @@ -622,11 +667,11 @@ static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key, bool* exceed) { static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) { void* pool = pThrd->pool; STrans* pTransInst = pThrd->pTransInst; - SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key) + 1); + SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key)); if (plist == NULL) { SConnList list = {0}; - taosHashPut((SHashObj*)pool, key, strlen(key) + 1, (void*)&list, sizeof(list)); - plist = taosHashGet(pool, key, strlen(key) + 1); + taosHashPut((SHashObj*)pool, key, strlen(key), (void*)&list, sizeof(list)); + plist = taosHashGet(pool, key, strlen(key)); SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList)); QUEUE_INIT(&nList->msgQ); @@ -674,7 +719,7 @@ static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) { } list->numOfConn++; } - tTrace("%s numOfConn: %d, limit: %d", pTransInst->label, list->numOfConn, pTransInst->connLimitNum); + tDebug("%s numOfConn: %d, limit: %d, dst:%s", pTransInst->label, list->numOfConn, pTransInst->connLimitNum, key); return NULL; } @@ -685,6 +730,7 @@ static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) { SCliConn* conn = QUEUE_DATA(h, SCliConn, q); conn->status = ConnNormal; QUEUE_INIT(&conn->q); + tDebug("conn %p get from pool, pool size: %d, dst: %s", conn, conn->list->size, conn->dstAddr); if (conn->task != NULL) { transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task); @@ -712,7 +758,7 @@ static void addConnToPool(void* pool, SCliConn* conn) { cliDestroyConnMsgs(conn, false); if (conn->list == NULL) { - conn->list = taosHashGet((SHashObj*)pool, conn->dstAddr, strlen(conn->dstAddr) + 1); + conn->list = taosHashGet((SHashObj*)pool, conn->dstAddr, strlen(conn->dstAddr)); } SConnList* pList = conn->list; @@ -737,14 +783,15 @@ static void addConnToPool(void* pool, SCliConn* conn) { conn->status = ConnInPool; QUEUE_PUSH(&conn->list->conns, &conn->q); conn->list->size += 1; + tDebug("conn %p added to pool, pool size: %d, dst: %s", conn, conn->list->size, conn->dstAddr); - if (conn->list->size >= 20) { + if (conn->list->size >= 10) { STaskArg* arg = taosMemoryCalloc(1, sizeof(STaskArg)); arg->param1 = conn; arg->param2 = thrd; STrans* pTransInst = thrd->pTransInst; - conn->task = transDQSched(thrd->timeoutQueue, doCloseIdleConn, arg, CONN_PERSIST_TIME(pTransInst->idleTime)); + conn->task = transDQSched(thrd->timeoutQueue, doCloseIdleConn, arg, 10 * CONN_PERSIST_TIME(pTransInst->idleTime)); } } static int32_t allocConnRef(SCliConn* conn, bool update) { @@ -756,7 +803,11 @@ static int32_t allocConnRef(SCliConn* conn, bool update) { SExHandle* exh = taosMemoryCalloc(1, sizeof(SExHandle)); exh->handle = conn; exh->pThrd = conn->hostThrd; + QUEUE_INIT(&exh->q); + taosInitRWLatch(&exh->latch); + exh->refId = transAddExHandle(transGetRefMgt(), exh); + conn->refId = exh->refId; if (conn->refId == -1) { @@ -775,8 +826,11 @@ static int32_t specifyConnRef(SCliConn* conn, bool update, int64_t handle) { if (exh == NULL) { return -1; } + taosWLockLatch(&exh->latch); exh->handle = conn; exh->pThrd = conn->hostThrd; + taosWUnLockLatch(&exh->latch); + conn->refId = exh->refId; transReleaseExHandle(transGetRefMgt(), handle); @@ -838,13 +892,11 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) { uv_timer_init(pThrd->loop, timer); } timer->data = conn; - conn->timer = timer; + conn->timer = timer; conn->connReq.data = conn; transReqQueueInit(&conn->wreqQueue); - transQueueInit(&conn->cliMsgs, NULL); - transInitBuffer(&conn->readBuf); QUEUE_INIT(&conn->q); conn->hostThrd = pThrd; @@ -872,12 +924,11 @@ static void cliDestroyConn(SCliConn* conn, bool clear) { connList->size--; } else { if (pThrd->pool) { - SConnList* connList = taosHashGet((SHashObj*)pThrd->pool, conn->dstAddr, strlen(conn->dstAddr) + 1); + SConnList* connList = taosHashGet((SHashObj*)pThrd->pool, conn->dstAddr, strlen(conn->dstAddr)); if (connList != NULL) connList->list->numOfConn--; } } conn->list = NULL; - pThrd->newConnCount--; transReleaseExHandle(transGetRefMgt(), conn->refId); transRemoveExHandle(transGetRefMgt(), conn->refId); @@ -954,7 +1005,7 @@ static void cliSendCb(uv_write_t* req, int status) { SCliConn* pConn = transReqQueueRemove(req); if (pConn == NULL) return; - SCliMsg* pMsg = !transQueueEmpty(&pConn->cliMsgs) ? transQueueGet(&pConn->cliMsgs, 0) : NULL; + SCliMsg* pMsg = transQueueGet(&pConn->cliMsgs, 0); if (pMsg != NULL) { int64_t cost = taosGetTimestampUs() - pMsg->st; if (cost > 1000 * 50) { @@ -1056,6 +1107,7 @@ void cliSend(SCliConn* pConn) { STransMsg* pMsg = (STransMsg*)(&pCliMsg->msg); if (pMsg->pCont == 0) { pMsg->pCont = (void*)rpcMallocCont(0); + tDebug("malloc memory: %p", pMsg->pCont); pMsg->contLen = 0; } @@ -1181,7 +1233,6 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) { addr.sin_port = (uint16_t)htons(pList->port); tTrace("%s conn %p try to connect to %s", pTransInst->label, conn, pList->dst); - pThrd->newConnCount++; int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 10); if (fd == -1) { tError("%s conn %p failed to create socket, reason:%s", transLabel(pTransInst), conn, @@ -1272,7 +1323,7 @@ static void cliHandleFastFail(SCliConn* pConn, int status) { if (pMsg != NULL && REQUEST_NO_RESP(&pMsg->msg) && (pTransInst->failFastFp != NULL && pTransInst->failFastFp(pMsg->msg.msgType))) { - SFailFastItem* item = taosHashGet(pThrd->failFastCache, pConn->dstAddr, strlen(pConn->dstAddr) + 1); + SFailFastItem* item = taosHashGet(pThrd->failFastCache, pConn->dstAddr, strlen(pConn->dstAddr)); int64_t cTimestamp = taosGetTimestampMs(); if (item != NULL) { int32_t elapse = cTimestamp - item->timestamp; @@ -1284,7 +1335,7 @@ static void cliHandleFastFail(SCliConn* pConn, int status) { } } else { SFailFastItem item = {.count = 1, .timestamp = cTimestamp}; - taosHashPut(pThrd->failFastCache, pConn->dstAddr, strlen(pConn->dstAddr) + 1, &item, sizeof(SFailFastItem)); + taosHashPut(pThrd->failFastCache, pConn->dstAddr, strlen(pConn->dstAddr), &item, sizeof(SFailFastItem)); } } } else { @@ -1383,7 +1434,10 @@ static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd) { return; } + taosRLockLatch(&exh->latch); SCliConn* conn = exh->handle; + taosRUnLockLatch(&exh->latch); + transReleaseExHandle(transGetRefMgt(), refId); tDebug("%s conn %p start to release to inst", CONN_GET_INST_LABEL(conn), conn); @@ -1416,7 +1470,9 @@ SCliConn* cliGetConn(SCliMsg** pMsg, SCliThrd* pThrd, bool* ignore, char* addr) *ignore = true; return NULL; } else { + taosRLockLatch(&exh->latch); conn = exh->handle; + taosRUnLockLatch(&exh->latch); if (conn == NULL) { conn = getConnFromPool2(pThrd, addr, pMsg); if (conn != NULL) specifyConnRef(conn, true, refId); @@ -1430,7 +1486,7 @@ SCliConn* cliGetConn(SCliMsg** pMsg, SCliThrd* pThrd, bool* ignore, char* addr) if (conn != NULL) { tTrace("%s conn %p get from conn pool:%p", CONN_GET_INST_LABEL(conn), conn, pThrd->pool); } else { - tTrace("%s not found conn in conn pool:%p", ((STrans*)pThrd->pTransInst)->label, pThrd->pool); + tTrace("%s not found conn in conn pool:%p, dst:%s", ((STrans*)pThrd->pTransInst)->label, pThrd->pool, addr); } return conn; } @@ -1464,7 +1520,7 @@ FORCE_INLINE int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* pResp) { } static FORCE_INLINE uint32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn) { uint32_t addr = 0; - uint32_t* v = taosHashGet(cache, fqdn, strlen(fqdn) + 1); + uint32_t* v = taosHashGet(cache, fqdn, strlen(fqdn)); if (v == NULL) { addr = taosGetIpv4FromFqdn(fqdn); if (addr == 0xffffffff) { @@ -1473,7 +1529,7 @@ static FORCE_INLINE uint32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn) return addr; } - taosHashPut(cache, fqdn, strlen(fqdn) + 1, &addr, sizeof(addr)); + taosHashPut(cache, fqdn, strlen(fqdn), &addr, sizeof(addr)); } else { addr = *v; } @@ -1483,13 +1539,13 @@ static FORCE_INLINE void cliUpdateFqdnCache(SHashObj* cache, char* fqdn) { // impl later uint32_t addr = taosGetIpv4FromFqdn(fqdn); if (addr != 0xffffffff) { - uint32_t* v = taosHashGet(cache, fqdn, strlen(fqdn) + 1); + uint32_t* v = taosHashGet(cache, fqdn, strlen(fqdn)); if (addr != *v) { char old[64] = {0}, new[64] = {0}; tinet_ntoa(old, *v); tinet_ntoa(new, addr); tWarn("update ip of fqdn:%s, old: %s, new: %s", fqdn, old, new); - taosHashPut(cache, fqdn, strlen(fqdn) + 1, &addr, sizeof(addr)); + taosHashPut(cache, fqdn, strlen(fqdn), &addr, sizeof(addr)); } } return; @@ -1529,22 +1585,6 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { destroyCmsg(pMsg); return; } - - if (rpcDebugFlag & DEBUG_TRACE) { - if (tmsgIsValid(pMsg->msg.msgType)) { - char buf[128] = {0}; - sprintf(buf, "%s", TMSG_INFO(pMsg->msg.msgType)); - int* count = taosHashGet(pThrd->msgCount, buf, sizeof(buf)); - if (NULL == 0) { - int localCount = 1; - taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount)); - } else { - int localCount = *count + 1; - taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount)); - } - } - } - char* fqdn = EPSET_GET_INUSE_IP(&pMsg->ctx->epSet); uint16_t port = EPSET_GET_INUSE_PORT(&pMsg->ctx->epSet); char addr[TSDB_FQDN_LEN + 64] = {0}; @@ -1602,7 +1642,6 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { addr.sin_port = (uint16_t)htons(port); tGTrace("%s conn %p try to connect to %s", pTransInst->label, conn, conn->dstAddr); - pThrd->newConnCount++; int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 10); if (fd == -1) { tGError("%s conn %p failed to create socket, reason:%s", transLabel(pTransInst), conn, @@ -1698,8 +1737,7 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) { char key[TSDB_FQDN_LEN + 64] = {0}; CONN_CONSTRUCT_HASH_KEY(key, ip, port); - // SCliBatch** ppBatch = taosHashGet(pThrd->batchCache, key, sizeof(key)); - SCliBatchList** ppBatchList = taosHashGet(pThrd->batchCache, key, sizeof(key)); + SCliBatchList** ppBatchList = taosHashGet(pThrd->batchCache, key, strlen(key)); if (ppBatchList == NULL || *ppBatchList == NULL) { SCliBatchList* pBatchList = taosMemoryCalloc(1, sizeof(SCliBatchList)); QUEUE_INIT(&pBatchList->wq); @@ -1723,7 +1761,7 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) { QUEUE_PUSH(&pBatchList->wq, &pBatch->listq); - taosHashPut(pThrd->batchCache, key, sizeof(key), &pBatchList, sizeof(void*)); + taosHashPut(pThrd->batchCache, key, strlen(key), &pBatchList, sizeof(void*)); } else { if (QUEUE_IS_EMPTY(&(*ppBatchList)->wq)) { SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch)); @@ -1793,21 +1831,6 @@ static void cliAsyncCb(uv_async_t* handle) { QUEUE_MOVE(&item->qmsg, &wq); taosThreadMutexUnlock(&item->mtx); - if (rpcDebugFlag & DEBUG_TRACE) { - void* pIter = taosHashIterate(pThrd->msgCount, NULL); - while (pIter != NULL) { - int* count = pIter; - size_t len = 0; - char* key = taosHashGetKey(pIter, &len); - if (*count != 0) { - tDebug("key: %s count: %d", key, *count); - } - - pIter = taosHashIterate(pThrd->msgCount, pIter); - } - tDebug("all conn count: %d", pThrd->newConnCount); - } - int8_t supportBatch = pTransInst->supportBatch; if (supportBatch == 0) { cliNoBatchDealReq(&wq, pThrd); @@ -1855,7 +1878,7 @@ void cliDestroyConnMsgs(SCliConn* conn, bool destroy) { } } -void cliIteraConnMsgs(SCliConn* conn) { +void cliConnFreeMsgs(SCliConn* conn) { SCliThrd* pThrd = conn->hostThrd; STrans* pTransInst = pThrd->pTransInst; @@ -1878,9 +1901,10 @@ void cliIteraConnMsgs(SCliConn* conn) { bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead) { if (pHead->release == 1 && (pHead->msgLen) == sizeof(*pHead)) { uint64_t ahandle = pHead->ahandle; - tDebug("ahandle = %" PRIu64 "", ahandle); SCliMsg* pMsg = NULL; CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle); + tDebug("%s conn %p receive release request, refId:%" PRId64 ", may ignore", CONN_GET_INST_LABEL(conn), conn, + conn->refId); transClearBuffer(&conn->readBuf); transFreeMsg(transContFromHead((char*)pHead)); @@ -1889,11 +1913,14 @@ bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead) { SCliMsg* cliMsg = transQueueGet(&conn->cliMsgs, i); if (cliMsg->type == Release) { ASSERTS(pMsg == NULL, "trans-cli recv invaid release-req"); + tDebug("%s conn %p receive release request, refId:%" PRId64 ", ignore msg", CONN_GET_INST_LABEL(conn), conn, + conn->refId); + cliDestroyConn(conn, true); return true; } } - cliIteraConnMsgs(conn); + cliConnFreeMsgs(conn); tDebug("%s conn %p receive release request, refId:%" PRId64 "", CONN_GET_INST_LABEL(conn), conn, conn->refId); destroyCmsg(pMsg); @@ -1946,8 +1973,12 @@ void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, static FORCE_INLINE void destroyUserdata(STransMsg* userdata) { if (userdata->pCont == NULL) { + tDebug("empty pCont"); return; } + if (userdata->contLen == 0) { + tDebug("free memory: %p", userdata->pCont); + } transFreeMsg(userdata->pCont); userdata->pCont = NULL; } @@ -1957,6 +1988,7 @@ static FORCE_INLINE void destroyCmsg(void* arg) { if (pMsg == NULL) { return; } + tDebug("free memory:%p, free ctx: %p", pMsg, pMsg->ctx); transDestroyConnCtx(pMsg->ctx); destroyUserdata(&pMsg->msg); @@ -1970,12 +2002,12 @@ static FORCE_INLINE void destroyCmsgAndAhandle(void* param) { SCliMsg* pMsg = arg->param1; SCliThrd* pThrd = arg->param2; - tDebug("destroy Ahandle A"); + tTrace("destroy Ahandle A"); if (pThrd != NULL && pThrd->destroyAhandleFp != NULL) { - tDebug("destroy Ahandle B"); + tTrace("destroy Ahandle B"); pThrd->destroyAhandleFp(pMsg->ctx->ahandle); } - tDebug("destroy Ahandle C"); + tTrace("destroy Ahandle C"); transDestroyConnCtx(pMsg->ctx); destroyUserdata(&pMsg->msg); @@ -1999,11 +2031,9 @@ static SCliThrd* createThrdObj(void* trans) { taosMemoryFree(pThrd); return NULL; } - if (pTransInst->supportBatch) { - pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 4, pThrd, cliAsyncCb); - } else { - pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 8, pThrd, cliAsyncCb); - } + + int32_t nSync = pTransInst->supportBatch ? 4 : 8; + pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, nSync, pThrd, cliAsyncCb); if (pThrd->asyncPool == NULL) { tError("failed to init async pool"); uv_loop_close(pThrd->loop); @@ -2044,8 +2074,6 @@ static SCliThrd* createThrdObj(void* trans) { pThrd->quit = false; - pThrd->newConnCount = 0; - pThrd->msgCount = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); return pThrd; } static void destroyThrdObj(SCliThrd* pThrd) { @@ -2091,7 +2119,6 @@ static void destroyThrdObj(SCliThrd* pThrd) { pIter = (void**)taosHashIterate(pThrd->batchCache, pIter); } taosHashCleanup(pThrd->batchCache); - taosHashCleanup(pThrd->msgCount); taosMemoryFree(pThrd); } @@ -2110,14 +2137,7 @@ void cliSendQuit(SCliThrd* thrd) { void cliWalkCb(uv_handle_t* handle, void* arg) { if (!uv_is_closing(handle)) { if (uv_handle_get_type(handle) == UV_TIMER) { - // SCliConn* pConn = handle->data; - // if (pConn != NULL && pConn->timer != NULL) { - // SCliThrd* pThrd = pConn->hostThrd; - // uv_timer_stop((uv_timer_t*)handle); - // handle->data = NULL; - // taosArrayPush(pThrd->timerList, &pConn->timer); - // pConn->timer = NULL; - // } + // do nothing } else { uv_read_stop((uv_stream_t*)handle); } @@ -2153,29 +2173,27 @@ static void doCloseIdleConn(void* param) { taosMemoryFree(arg); } -static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) { - STrans* pTransInst = pThrd->pTransInst; +static void cliSchedMsgToDebug(SCliMsg* pMsg, char* label) { + if (!(rpcDebugFlag & DEBUG_DEBUG)) { + return; + } STransConnCtx* pCtx = pMsg->ctx; + STraceId* trace = &pMsg->msg.info.traceId; + char tbuf[512] = {0}; + EPSET_TO_STR(&pCtx->epSet, tbuf); + tGDebug("%s retry on next node,use:%s, step: %d,timeout:%" PRId64 "", label, tbuf, pCtx->retryStep, + pCtx->retryNextInterval); + return; +} +static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) { + STrans* pTransInst = pThrd->pTransInst; - if (rpcDebugFlag & DEBUG_DEBUG) { - STraceId* trace = &pMsg->msg.info.traceId; - char tbuf[512] = {0}; - EPSET_TO_STR(&pCtx->epSet, tbuf); - tGDebug("%s retry on next node,use:%s, step: %d,timeout:%" PRId64 "", transLabel(pThrd->pTransInst), tbuf, - pCtx->retryStep, pCtx->retryNextInterval); - } + cliSchedMsgToDebug(pMsg, transLabel(pThrd->pTransInst)); STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg)); arg->param1 = pMsg; arg->param2 = pThrd; - - transDQSched(pThrd->delayQueue, doDelayTask, arg, pCtx->retryNextInterval); -} - -FORCE_INLINE void cliCompareAndSwap(int8_t* val, int8_t exp, int8_t newVal) { - if (*val != exp) { - *val = newVal; - } + transDQSched(pThrd->delayQueue, doDelayTask, arg, pMsg->ctx->retryNextInterval); } FORCE_INLINE bool cliTryExtractEpSet(STransMsg* pResp, SEpSet* dst) { @@ -2397,20 +2415,6 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { tGTrace("%s conn %p extract epset from msg", CONN_GET_INST_LABEL(pConn), pConn); } } - if (rpcDebugFlag & DEBUG_TRACE) { - if (tmsgIsValid(pResp->msgType - 1)) { - char buf[128] = {0}; - sprintf(buf, "%s", TMSG_INFO(pResp->msgType - 1)); - int* count = taosHashGet(pThrd->msgCount, buf, sizeof(buf)); - if (NULL == 0) { - int localCount = 0; - taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount)); - } else { - int localCount = *count - 1; - taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount)); - } - } - } if (pCtx->pSem || pCtx->syncMsgRef != 0) { tGTrace("%s conn %p(sync) handle resp", CONN_GET_INST_LABEL(pConn), pConn); if (pCtx->pSem) { @@ -2478,6 +2482,8 @@ static FORCE_INLINE SCliThrd* transGetWorkThrdFromHandle(STrans* trans, int64_t SExHandle* exh = transAcquireExHandle(transGetRefMgt(), handle); if (exh == NULL) { return NULL; + } else { + tDebug("conn %p got", exh->handle); } if (exh->pThrd == NULL && trans != NULL) { @@ -2521,7 +2527,7 @@ int transReleaseCliHandle(void* handle) { cmsg->ctx = pCtx; STraceId* trace = &tmsg.info.traceId; - tGDebug("send release request at thread:%08" PRId64 "", pThrd->pid); + tGDebug("send release request at thread:%08" PRId64 ", malloc memory:%p", pThrd->pid, cmsg); if (0 != transAsyncSend(pThrd->asyncPool, &cmsg->q)) { destroyCmsg(cmsg); @@ -2529,21 +2535,7 @@ int transReleaseCliHandle(void* handle) { } return 0; } - -int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) { - STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); - if (pTransInst == NULL) { - transFreeMsg(pReq->pCont); - return -1; - } - - SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle); - if (pThrd == NULL) { - transFreeMsg(pReq->pCont); - transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); - return TSDB_CODE_RPC_BROKEN_LINK; - } - +static SCliMsg* transInitMsg(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) { TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64()); STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx)); epsetAssign(&pCtx->epSet, pEpSet); @@ -2560,12 +2552,48 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran cliMsg->st = taosGetTimestampUs(); cliMsg->type = Normal; cliMsg->refId = (int64_t)shandle; + QUEUE_INIT(&cliMsg->seqq); + return cliMsg; +} + +int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) { + STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); + if (pTransInst == NULL) { + transFreeMsg(pReq->pCont); + return -1; + } + + int64_t handle = (int64_t)pReq->info.handle; + SCliThrd* pThrd = transGetWorkThrd(pTransInst, handle); + if (pThrd == NULL) { + transFreeMsg(pReq->pCont); + transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); + return TSDB_CODE_RPC_BROKEN_LINK; + } + if (handle != 0) { + SExHandle* exh = transAcquireExHandle(transGetRefMgt(), handle); + if (exh != NULL) { + taosWLockLatch(&exh->latch); + if (exh->handle == NULL && exh->inited != 0) { + SCliMsg* pCliMsg = transInitMsg(shandle, pEpSet, pReq, ctx); + QUEUE_PUSH(&exh->q, &pCliMsg->seqq); + taosWUnLockLatch(&exh->latch); + tDebug("msg refId: %" PRId64 "", handle); + transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); + return 0; + } + exh->inited = 1; + taosWUnLockLatch(&exh->latch); + transReleaseExHandle(transGetRefMgt(), handle); + } + } + SCliMsg* pCliMsg = transInitMsg(shandle, pEpSet, pReq, ctx); STraceId* trace = &pReq->info.traceId; tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid, - EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle); - if (0 != transAsyncSend(pThrd->asyncPool, &(cliMsg->q))) { - destroyCmsg(cliMsg); + EPSET_GET_INUSE_IP(pEpSet), EPSET_GET_INUSE_PORT(pEpSet), pReq->info.ahandle); + if (0 != transAsyncSend(pThrd->asyncPool, &(pCliMsg->q))) { + destroyCmsg(pCliMsg); transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); return -1; } @@ -2742,6 +2770,9 @@ int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) { int64_t transAllocHandle() { SExHandle* exh = taosMemoryCalloc(1, sizeof(SExHandle)); + QUEUE_INIT(&exh->q); + taosInitRWLatch(&exh->latch); + exh->refId = transAddExHandle(transGetRefMgt(), exh); tDebug("pre alloc refId %" PRId64 "", exh->refId); diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 759a4d79dbf4..fab786497cb1 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -87,6 +87,7 @@ void transFreeMsg(void* msg) { if (msg == NULL) { return; } + // tDebug("free memory: %p", msg); taosMemoryFree((char*)msg - sizeof(STransMsgHead)); } int transSockInfo2Str(struct sockaddr* sockname, char* dst) { @@ -298,7 +299,7 @@ void transCtxCleanup(STransCtx* ctx) { ctx->freeFunc(iter->val); iter = taosHashIterate(ctx->args, iter); } - ctx->freeFunc(ctx->brokenVal.val); + if (ctx->freeFunc) ctx->freeFunc(ctx->brokenVal.val); taosHashCleanup(ctx->args); ctx->args = NULL; } @@ -658,6 +659,10 @@ void transDestroyExHandle(void* handle) { if (handle == NULL) { return; } + SExHandle* eh = handle; + if (!QUEUE_IS_EMPTY(&eh->q)) { + tDebug("handle %p mem leak", handle); + } taosMemoryFree(handle); } diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 362d38b50573..88659dbb9ffc 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -756,9 +756,12 @@ static bool uvRecvReleaseReq(SSvrConn* pConn, STransMsgHead* pHead) { tTrace("conn %p received release request", pConn); STraceId traceId = pHead->traceId; - pConn->status = ConnRelease; transClearBuffer(&pConn->readBuf); transFreeMsg(transContFromHead((char*)pHead)); + if (pConn->status != ConnAcquire) { + return true; + } + pConn->status = ConnRelease; STransMsg tmsg = {.code = 0, .info.handle = (void*)pConn, .info.traceId = traceId, .info.ahandle = (void*)0x9527}; SSvrMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSvrMsg)); @@ -1081,6 +1084,7 @@ static FORCE_INLINE SSvrConn* createConn(void* hThrd) { exh->handle = pConn; exh->pThrd = pThrd; exh->refId = transAddExHandle(transGetRefMgt(), exh); + QUEUE_INIT(&exh->q); transAcquireExHandle(transGetRefMgt(), exh->refId); STrans* pTransInst = pThrd->pTransInst; @@ -1116,6 +1120,7 @@ static int reallocConnRef(SSvrConn* conn) { exh->handle = conn; exh->pThrd = conn->hostThrd; exh->refId = transAddExHandle(transGetRefMgt(), exh); + QUEUE_INIT(&exh->q); transAcquireExHandle(transGetRefMgt(), exh->refId); conn->refId = exh->refId; diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 8edfb352abb6..27cd487ce6a6 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -1069,6 +1069,7 @@ e ,,y,script,./test.sh -f tsim/query/unionall_as_table.sim ,,y,script,./test.sh -f tsim/query/multi_order_by.sim ,,y,script,./test.sh -f tsim/query/sys_tbname.sim +,,y,script,./test.sh -f tsim/query/sort-pre-cols.sim ,,y,script,./test.sh -f tsim/query/groupby.sim ,,y,script,./test.sh -f tsim/query/groupby_distinct.sim ,,y,script,./test.sh -f tsim/query/event.sim @@ -1319,6 +1320,7 @@ e ,,y,script,./test.sh -f tsim/tag/tinyint.sim ,,y,script,./test.sh -f tsim/tag/drop_tag.sim ,,y,script,./test.sh -f tsim/tag/tbNameIn.sim +,,y,script,./test.sh -f tsim/tag/create_null_tag.sim ,,y,script,./test.sh -f tmp/monitor.sim ,,y,script,./test.sh -f tsim/tagindex/add_index.sim ,,n,script,./test.sh -f tsim/tagindex/sma_and_tag_index.sim diff --git a/tests/script/tsim/query/sort-pre-cols.sim b/tests/script/tsim/query/sort-pre-cols.sim new file mode 100644 index 000000000000..ef69725d87b6 --- /dev/null +++ b/tests/script/tsim/query/sort-pre-cols.sim @@ -0,0 +1,17 @@ + +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sql connect + +sql create database d +sql use d +sql create table st(ts timestamp, v int) tags(lj json) +sql insert into ct1 using st tags('{"instance":"200"}') values(now, 1)(now+1s, 2); +sql insert into ct2 using st tags('{"instance":"200"}') values(now+2s, 3)(now+3s, 4); +sql select to_char(ts, 'yyyy-mm-dd hh24:mi:ss') as time, irate(v) from st group by to_char(ts, 'yyyy-mm-dd hh24:mi:ss'), lj->'instance' order by time; +print $data01 +if $data01 != 0.000000000 then + return -1 +endi +system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/tsim/tag/create_null_tag.sim b/tests/script/tsim/tag/create_null_tag.sim new file mode 100644 index 000000000000..ad82a84f658c --- /dev/null +++ b/tests/script/tsim/tag/create_null_tag.sim @@ -0,0 +1,54 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sql connect + +print ======================== dnode1 start + +$dbPrefix = ta_cr_db +$tbPrefix = ta_cr_tb +$mtPrefix = ta_cr_mt +$tbNum = 10 +$rowNum = 20 +$totalNum = 200 + +print =============== step1 +$i = 0 +$db = $dbPrefix . $i + +sql create database $db +sql use $db + +print =============== step2 +$i = 2 +$mt = $mtPrefix . $i +$tb = $tbPrefix . $i + +sql create table $mt (ts timestamp, tbcol int) TAGS(t1 binary(10), t2 binary(10)) + +$idxName = ta_cr_mt2(t2) +sql create index t2idx on $idxName +sql create table $tb using $mt tags("table", "") +sql insert into $tb values(now, 1) + +$i = 3 +$tb = $tbPrefix . $i + +sql create table $tb using $mt tags("table1", "") +sql insert into $tb values(now, 1) + + + +sql select * from $mt where t1 = "table" + +if $rows != 1 then + return -1 +endi + +sql select * from $mt where t2 = "" + +if $rows != 2 then + return -1 +endi + +system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/system-test/1-insert/database_pre_suf.py b/tests/system-test/1-insert/database_pre_suf.py index 2e993b9a405e..76e52b1bc0b5 100755 --- a/tests/system-test/1-insert/database_pre_suf.py +++ b/tests/system-test/1-insert/database_pre_suf.py @@ -556,6 +556,7 @@ def run(self): print("taos -f sql start!") taos_cmd1 = "taos -f %s/%s.sql" % (self.testcasePath,self.testcaseFilename) _ = subprocess.check_output(taos_cmd1, shell=True) + time.sleep(5) print("taos -f sql over!") @@ -566,6 +567,7 @@ def run(self): def stop(self): tdSql.close() + time.sleep(5) tdLog.success("%s successfully executed" % __file__) diff --git a/tests/system-test/2-query/nestedQuery_26.py b/tests/system-test/2-query/nestedQuery_26.py index 9d5f31d1e067..0a42a36c90e1 100755 --- a/tests/system-test/2-query/nestedQuery_26.py +++ b/tests/system-test/2-query/nestedQuery_26.py @@ -62,6 +62,7 @@ def run(self): endTime = time.time() + time.sleep(5) print("total time %ds" % (endTime - startTime)) @@ -69,6 +70,7 @@ def run(self): def stop(self): tdSql.close() + time.sleep(5) tdLog.success("%s successfully executed" % __file__) diff --git a/tests/system-test/2-query/stablity_1.py b/tests/system-test/2-query/stablity_1.py index bde92fc9bd19..a68628a9594b 100755 --- a/tests/system-test/2-query/stablity_1.py +++ b/tests/system-test/2-query/stablity_1.py @@ -60,12 +60,14 @@ def run(self): self.time_nest(['TIMEDIFF_2']) endTime = time.time() + time.sleep(5) print("total time %ds" % (endTime - startTime)) def stop(self): tdSql.close() + time.sleep(5) tdLog.success("%s successfully executed" % __file__)