WIP: [Enhancement] support lookup function #316

Open
wants to merge 1 commit into
base: main
35 changes: 22 additions & 13 deletions docs/content/connector-source.md
@@ -92,20 +92,29 @@ Follow these steps to deploy the Flink connector:

### Common parameters

| Parameter | Required | Data type | Description |
| --------------------------- | -------- |-----------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| connector | Yes | STRING | The type of connector that you want to use to read data. Set the value to `starrocks`. |
| scan-url | Yes | STRING | The address that is used to connect the FE from the web server. Format: `<fe_host>:<fe_http_port>`. The default port is `8030`. You can specify multiple addresses, which must be separated with a comma (,). Example: `192.168.xxx.xxx:8030,192.168.xxx.xxx:8030`. |
| jdbc-url | Yes | STRING | The address that is used to connect the MySQL client of the FE. Format: `jdbc:mysql://<fe_host>:<fe_query_port>`. The default port number is `9030`. |
| username | Yes | STRING | The username of your StarRocks cluster account. The account must have read permissions on the StarRocks table you want to read. See [User privileges](../administration/User_privilege.md). |
| password | Yes | STRING | The password of your StarRocks cluster account. |
| database-name | Yes | STRING | The name of the StarRocks database to which the StarRocks table you want to read belongs. |
| table-name | Yes | STRING | The name of the StarRocks table you want to read. |
| scan.connect.timeout-ms | No | STRING | The maximum amount of time after which the connection from the Flink connector to your StarRocks cluster times out. Unit: milliseconds. Default value: `1000`. If the amount of time taken to establish the connection exceeds this limit, the read task fails. |
| scan.params.keep-alive-min | No | STRING | The maximum amount of time during which the read task keeps alive. The keep-alive time is checked on a regular basis by using a polling mechanism. Unit: minutes. Default value: `10`. We recommend that you set this parameter to a value that is greater than or equal to `5`. |
| scan.params.query-timeout-s | No | STRING | The maximum amount of time after which the read task times out. The timeout duration is checked during task execution. Unit: seconds. Default value: `600`. If no read result is returned after the time duration elapses, the read task stops. |
| scan.params.mem-limit-byte | No | STRING | The maximum amount of memory allowed per query on each BE. Unit: bytes. Default value: `1073741824`, equal to 1 GB. |
| scan.max-retries | No | STRING | The maximum number of times that the read task can be retried upon failures. Default value: `1`. If the number of times that the read task is retried exceeds this limit, the read task returns errors. |
| lookup.cache | No | ENUM | The cache strategy for the lookup table. Valid values: `NONE` (no caching) and `PARTIAL` (caches entries as they are looked up in the external database). |
| lookup.max-retries | No | INTEGER | The maximum number of retries when a lookup query to the database fails. Default value: `3`. |
| lookup.partial-cache.expire-after-access | No | DURATION | The maximum time that a row stays in the lookup cache after it was last accessed. Takes effect only when `lookup.cache` is set to `PARTIAL`. |
| lookup.partial-cache.expire-after-write | No | DURATION | The maximum time that a row stays in the lookup cache after it was written to the cache. Takes effect only when `lookup.cache` is set to `PARTIAL`. |
| lookup.partial-cache.max-rows | No | LONG | The maximum number of rows to store in the lookup cache. |
| lookup.partial-cache.cache-missing-key | No | BOOLEAN | Whether to cache an empty value when the lookup key matches no rows in the table. |
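
The lookup options above could be combined roughly as follows when a StarRocks table is declared as a Flink SQL dimension table. This is a minimal sketch, not part of the PR: the table `dim_user`, its columns, the database name, the addresses, the credentials, and the cache settings are placeholder assumptions, and the helper class `LookupTableDdlSketch` is hypothetical. It only assembles the DDL text; in a real job the resulting string would typically be passed to `TableEnvironment.executeSql`.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper: assembles a CREATE TABLE statement as plain text.
final class LookupTableDdlSketch {

    /** Renders connector options as the body of a Flink SQL WITH (...) clause. */
    static String withClause(Map<String, String> options) {
        return options.entrySet().stream()
                // Single quotes inside a value are doubled, as in SQL string literals.
                .map(e -> "  '" + e.getKey() + "' = '" + e.getValue().replace("'", "''") + "'")
                .collect(Collectors.joining(",\n"));
    }

    /** Builds the DDL for an assumed dimension table with a partial lookup cache. */
    static String buildDdl() {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "starrocks");
        options.put("scan-url", "127.0.0.1:8030");              // placeholder FE HTTP address
        options.put("jdbc-url", "jdbc:mysql://127.0.0.1:9030"); // placeholder FE query address
        options.put("username", "root");                        // placeholder credentials
        options.put("password", "");
        options.put("database-name", "demo_db");                // assumed database name
        options.put("table-name", "dim_user");                  // assumed table name
        options.put("lookup.cache", "PARTIAL");
        options.put("lookup.max-retries", "3");
        options.put("lookup.partial-cache.max-rows", "10000");            // assumed size
        options.put("lookup.partial-cache.expire-after-write", "10 min"); // assumed TTL
        options.put("lookup.partial-cache.cache-missing-key", "true");

        return "CREATE TABLE dim_user (\n"
                + "  id BIGINT,\n"
                + "  name STRING\n"
                + ") WITH (\n"
                + withClause(options) + "\n"
                + ")";
    }

    public static void main(String[] args) {
        System.out.println(buildDdl());
    }
}
```

The two `lookup.partial-cache.expire-after-*` options are documented as requiring `lookup.cache` to be `PARTIAL`, which is why the sketch sets that option explicitly.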




The following parameters apply only to reading data by using DataStream API.

3 changes: 3 additions & 0 deletions pom.xml
@@ -527,6 +527,9 @@ limitations under the License.
            </goals>
          </execution>
        </executions>
        <configuration>
          <source>${maven.compiler.source}</source>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.sonatype.plugins</groupId>
@@ -15,7 +15,9 @@
package com.starrocks.connector.flink.connection;

import org.apache.flink.annotation.Internal;

import java.sql.Connection;
import java.sql.SQLException;

/**
* connection provider.
@@ -27,6 +29,17 @@ public interface StarRocksJdbcConnectionIProvider {

    Connection reestablishConnection() throws Exception;

    boolean isConnectionValid() throws SQLException;

    /**
     * Gets the existing connection, or establishes a new one if there is none.
     *
     * @return the existing connection or a newly established one
     * @throws SQLException if a database access error occurs
     * @throws ClassNotFoundException if the JDBC driver class cannot be found
     */
    Connection getOrEstablishConnection() throws SQLException, ClassNotFoundException;

    void close();

}
@@ -63,7 +63,30 @@ public Connection getConnection() throws SQLException, ClassNotFoundException {
    @Override
    public Connection reestablishConnection() throws SQLException, ClassNotFoundException {
        close();
-       connection = getConnection();
+       return getOrEstablishConnection();
    }

    @Override
    public boolean isConnectionValid() throws SQLException {
        // Connection.isValid takes its timeout in seconds.
        return connection != null && connection.isValid(60);
    }

    @Override
    public Connection getOrEstablishConnection() throws SQLException, ClassNotFoundException {
        // isValid() already returns false for a closed connection; isClosed() is kept as an extra guard.
        if (isConnectionValid() && !connection.isClosed()) {
            return connection;
        }
        // Prefer the Connector/J driver class and fall back to the legacy driver class name.
        try {
            Class.forName(jdbcOptions.getCjDriverName());
        } catch (ClassNotFoundException ex) {
            LOG.warn("Cannot find class {}, trying class {}", jdbcOptions.getCjDriverName(), jdbcOptions.getDriverName());
            Class.forName(jdbcOptions.getDriverName());
        }
        if (jdbcOptions.getUsername().isPresent()) {
            connection = DriverManager.getConnection(jdbcOptions.getDbURL(), jdbcOptions.getUsername().get(),
                    jdbcOptions.getPassword().orElse(null));
        } else {
            connection = DriverManager.getConnection(jdbcOptions.getDbURL());
        }
        return connection;
    }
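
The reuse-or-reopen behaviour of `getOrEstablishConnection()` can be exercised without a database. The sketch below is not the connector's code: `ConnectionFactory`, `CachingProvider`, and `fakeConnection` are hypothetical names, and a `java.lang.reflect.Proxy` stands in for a real JDBC driver so that connection validity can be toggled by hand. It assumes the provider is used from a single thread.

```java
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

final class ConnectionReuseSketch {

    /** Hypothetical stand-in for DriverManager.getConnection(...). */
    interface ConnectionFactory {
        Connection open() throws SQLException;
    }

    /** Caches one connection and reopens it only when it is no longer valid. */
    static final class CachingProvider {
        private final ConnectionFactory factory;
        private Connection connection;

        CachingProvider(ConnectionFactory factory) {
            this.factory = factory;
        }

        Connection getOrEstablishConnection() throws SQLException {
            // isValid(timeoutSeconds) returns false once the connection is closed.
            if (connection != null && connection.isValid(5)) {
                return connection;
            }
            connection = factory.open();
            return connection;
        }
    }

    /** Builds a fake Connection whose validity follows the given flag. */
    static Connection fakeConnection(AtomicBoolean valid) {
        return (Connection) Proxy.newProxyInstance(
                Connection.class.getClassLoader(),
                new Class<?>[] {Connection.class},
                (proxy, method, methodArgs) -> {
                    switch (method.getName()) {
                        case "isValid":
                            return valid.get();
                        case "isClosed":
                            return !valid.get();
                        case "close":
                            valid.set(false);
                            return null;
                        default:
                            throw new UnsupportedOperationException(method.getName());
                    }
                });
    }

    public static void main(String[] args) throws SQLException {
        AtomicInteger opened = new AtomicInteger();
        AtomicBoolean valid = new AtomicBoolean(true);
        CachingProvider provider = new CachingProvider(() -> {
            opened.incrementAndGet();
            valid.set(true);
            return fakeConnection(valid);
        });

        Connection first = provider.getOrEstablishConnection();
        Connection second = provider.getOrEstablishConnection(); // served from the cached field
        valid.set(false);                                        // simulate a dropped connection
        Connection third = provider.getOrEstablishConnection();  // forces a reopen

        System.out.println("reused: " + (first == second));
        System.out.println("reopened: " + (first != third));
        System.out.println("opens: " + opened.get());
    }
}
```

A lookup function that calls such a provider before each query pays the cost of `isValid` on every call; whether that is acceptable depends on the lookup rate, which the PR does not discuss.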
