Skip to content

Commit

Permalink
StarRocks Source support multiple table
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaochen-zhou committed Feb 16, 2025
1 parent c9ede75 commit 21f2cfb
Show file tree
Hide file tree
Showing 13 changed files with 356 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,17 @@ public StarRocksBeReadClient(String beNodeInfo, SourceConfig sourceConfig) {
}

public void openScanner(QueryPartition partition, SeaTunnelRowType seaTunnelRowType) {
eos.set(false);
this.readerOffset = 0;
this.rowBatch = null;
this.seaTunnelRowType = seaTunnelRowType;
Set<Long> tabletIds = partition.getTabletIds();
TScanOpenParams params = new TScanOpenParams();
params.setTablet_ids(new ArrayList<>(tabletIds));
params.setOpaqued_query_plan(partition.getQueryPlan());
params.setCluster(DEFAULT_CLUSTER_NAME);
params.setDatabase(sourceConfig.getDatabase());
params.setTable(sourceConfig.getTable());
params.setDatabase(partition.getDatabase());
params.setTable(partition.getTable());
params.setUser(sourceConfig.getUsername());
params.setPasswd(sourceConfig.getPassword());
params.setBatch_size(sourceConfig.getBatchRows());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.seatunnel.connectors.seatunnel.starrocks.client.source.model.QueryPartition;
import org.apache.seatunnel.connectors.seatunnel.starrocks.client.source.model.QueryPlan;
import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SourceConfig;
import org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksSourceTableConfig;
import org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorException;

Expand All @@ -39,37 +40,41 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

@Slf4j
public class StarRocksQueryPlanReadClient {
private RetryUtils.RetryMaterial retryMaterial;
private SourceConfig sourceConfig;
private SeaTunnelRowType seaTunnelRowType;
private final HttpHelper httpHelper = new HttpHelper();
private final Map<String, StarRocksSourceTableConfig> tables;

private static final long DEFAULT_SLEEP_TIME_MS = 1000L;

public StarRocksQueryPlanReadClient(
SourceConfig sourceConfig, SeaTunnelRowType seaTunnelRowType) {
public StarRocksQueryPlanReadClient(SourceConfig sourceConfig) {
this.sourceConfig = sourceConfig;
this.seaTunnelRowType = seaTunnelRowType;
this.retryMaterial =
new RetryUtils.RetryMaterial(
sourceConfig.getMaxRetries(),
true,
exception -> true,
DEFAULT_SLEEP_TIME_MS);

this.tables =
sourceConfig.getTableConfigList().stream()
.collect(
Collectors.toMap(
StarRocksSourceTableConfig::getTable, Function.identity()));
}

public List<QueryPartition> findPartitions() {
public List<QueryPartition> findPartitions(String table) {
QueryPlan queryPlan = getQueryPlan(genQuerySql(table), table);
List<String> nodeUrls = sourceConfig.getNodeUrls();
QueryPlan queryPlan = getQueryPlan(genQuerySql(), nodeUrls);
Map<String, List<Long>> be2Tablets = selectBeForTablet(queryPlan);
return tabletsMapToPartition(
be2Tablets,
queryPlan.getQueryPlan(),
sourceConfig.getDatabase(),
sourceConfig.getTable());
be2Tablets, queryPlan.getQueryPlan(), sourceConfig.getDatabase(), table);
}

private List<QueryPartition> tabletsMapToPartition(
Expand Down Expand Up @@ -134,8 +139,9 @@ private Map<String, List<Long>> selectBeForTablet(QueryPlan queryPlan) {
return beXTablets;
}

private QueryPlan getQueryPlan(String querySQL, List<String> nodeUrls) {
private QueryPlan getQueryPlan(String querySQL, String table) {

List<String> nodeUrls = sourceConfig.getNodeUrls();
Map<String, Object> bodyMap = new HashMap<>();
bodyMap.put("sql", querySQL);
String body = JsonUtils.toJsonString(bodyMap);
Expand All @@ -147,7 +153,7 @@ private QueryPlan getQueryPlan(String querySQL, List<String> nodeUrls) {
.append("/api/")
.append(sourceConfig.getDatabase())
.append("/")
.append(sourceConfig.getTable())
.append(table)
.append("/_query_plan")
.toString();
try {
Expand Down Expand Up @@ -183,15 +189,17 @@ private Map<String, String> getQueryPlanHttpHeader() {
return headerMap;
}

private String genQuerySql() {
private String genQuerySql(String table) {

StarRocksSourceTableConfig starRocksSourceTableConfig = tables.get(table);
SeaTunnelRowType seaTunnelRowType =
starRocksSourceTableConfig.getCatalogTable().getSeaTunnelRowType();
String columns =
seaTunnelRowType.getFieldNames().length != 0
? String.join(",", seaTunnelRowType.getFieldNames())
: "*";
String filter =
sourceConfig.getScanFilter().isEmpty()
? ""
: " where " + sourceConfig.getScanFilter();
String scanFilter = starRocksSourceTableConfig.getScanFilter();
String filter = scanFilter.isEmpty() ? "" : " where " + scanFilter;

String sql =
"select "
Expand All @@ -202,7 +210,7 @@ private String genQuerySql() {
+ "`"
+ "."
+ "`"
+ sourceConfig.getTable()
+ table
+ "`"
+ filter;
log.debug("Generate query sql '{}'.", sql);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import lombok.Getter;
import lombok.Setter;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Setter
Expand All @@ -49,6 +51,7 @@ public SourceConfig(ReadonlyConfig config) {
key.substring(prefix.length()).toLowerCase(), value);
}
});
this.tableConfigList = StarRocksSourceTableConfig.of(config);
}

private int maxRetries = StarRocksSourceOptions.MAX_RETRIES.defaultValue();
Expand All @@ -59,5 +62,7 @@ public SourceConfig(ReadonlyConfig config) {
private int keepAliveMin = StarRocksSourceOptions.SCAN_KEEP_ALIVE_MIN.defaultValue();
private int batchRows = StarRocksSourceOptions.SCAN_BATCH_ROWS.defaultValue();
private int connectTimeoutMs = StarRocksSourceOptions.SCAN_CONNECT_TIMEOUT.defaultValue();
private List<StarRocksSourceTableConfig> tableConfigList = new ArrayList<>();

private Map<String, String> sourceOptionProps = new HashMap<>();
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,14 @@

package org.apache.seatunnel.connectors.seatunnel.starrocks.config;

import org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;

import java.util.List;
import java.util.Map;

public class StarRocksSourceOptions extends StarRocksBaseOptions {
private static final long DEFAULT_SCAN_MEM_LIMIT = 1024 * 1024 * 1024L;

Expand Down Expand Up @@ -72,4 +77,10 @@ public class StarRocksSourceOptions extends StarRocksBaseOptions {
.stringType()
.noDefaultValue()
.withDescription("The parameter of the scan data from be");

public static final Option<List<Map<String, Object>>> TABLE_LIST =
Options.key("table_list")
.type(new TypeReference<List<Map<String, Object>>>() {})
.noDefaultValue()
.withDescription("table list config");
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.starrocks.config;

import org.apache.seatunnel.shade.com.google.common.collect.Lists;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.catalog.schema.ReadonlyConfigParser;

import lombok.Getter;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

@Getter
public class StarRocksSourceTableConfig implements Serializable {

private final String table;

private final CatalogTable catalogTable;

private final String scanFilter;

private StarRocksSourceTableConfig(
String tableName, CatalogTable catalogTable, String scanFilter) {
this.table = tableName;
this.catalogTable = catalogTable;
this.scanFilter = scanFilter;
}

public static StarRocksSourceTableConfig parseStarRocksSourceConfig(ReadonlyConfig config) {

String table = config.get(StarRocksSourceOptions.TABLE);
String dataBase = config.get(StarRocksSourceOptions.DATABASE);
TablePath tablePath = TablePath.of(dataBase, table);
TableSchema tableSchema = new ReadonlyConfigParser().parse(config);
CatalogTable catalogTable =
CatalogTable.of(
TableIdentifier.of("", tablePath),
tableSchema,
new HashMap<>(),
new ArrayList<>(),
"");

return new StarRocksSourceTableConfig(
table, catalogTable, config.get(StarRocksSourceOptions.SCAN_FILTER));
}

public static List<StarRocksSourceTableConfig> of(ReadonlyConfig config) {

if (config.getOptional(StarRocksSourceOptions.TABLE_LIST).isPresent()) {
List<Map<String, Object>> maps = config.get(StarRocksSourceOptions.TABLE_LIST);
return maps.stream()
.map(ReadonlyConfig::fromMap)
.map(StarRocksSourceTableConfig::parseStarRocksSourceConfig)
.collect(Collectors.toList());
}
return Lists.newArrayList(parseStarRocksSourceConfig(config));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.seatunnel.connectors.seatunnel.starrocks.catalog.StarRocksCatalog;
import org.apache.seatunnel.connectors.seatunnel.starrocks.catalog.StarRocksCatalogFactory;
import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig;
import org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksBaseOptions;

import java.util.Arrays;
import java.util.List;
Expand Down Expand Up @@ -77,7 +78,7 @@ public Optional<SaveModeHandler> getSaveModeHandler() {
catalogTable.getTableId().getTableName());
Catalog catalog =
new StarRocksCatalog(
"StarRocks",
StarRocksBaseOptions.CONNECTOR_IDENTITY,
sinkConfig.getUsername(),
sinkConfig.getPassword(),
sinkConfig.getJdbcUrl(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,24 +25,23 @@
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SourceConfig;
import org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksBaseOptions;
import org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksSourceTableConfig;

import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

public class StarRocksSource
implements SeaTunnelSource<SeaTunnelRow, StarRocksSourceSplit, StarRocksSourceState> {

private CatalogTable catalogTable;
private SourceConfig sourceConfig;

@Override
public String getPluginName() {
return StarRocksBaseOptions.CONNECTOR_IDENTITY;
}

public StarRocksSource(SourceConfig sourceConfig, CatalogTable catalogTable) {
public StarRocksSource(SourceConfig sourceConfig) {
this.sourceConfig = sourceConfig;
this.catalogTable = catalogTable;
}

@Override
Expand All @@ -52,13 +51,14 @@ public Boundedness getBoundedness() {

@Override
public List<CatalogTable> getProducedCatalogTables() {
return Collections.singletonList(catalogTable);
return sourceConfig.getTableConfigList().stream()
.map(StarRocksSourceTableConfig::getCatalogTable)
.collect(Collectors.toList());
}

@Override
public SourceReader createReader(SourceReader.Context readerContext) {
return new StarRocksSourceReader(
readerContext, catalogTable.getSeaTunnelRowType(), sourceConfig);
return new StarRocksSourceReader(readerContext, sourceConfig);
}

@Override
Expand All @@ -67,15 +67,11 @@ public SourceSplitEnumerator<StarRocksSourceSplit, StarRocksSourceState> restore
StarRocksSourceState checkpointState)
throws Exception {
return new StartRocksSourceSplitEnumerator(
enumeratorContext,
sourceConfig,
catalogTable.getSeaTunnelRowType(),
checkpointState);
enumeratorContext, sourceConfig, checkpointState);
}

@Override
public SourceSplitEnumerator createEnumerator(SourceSplitEnumerator.Context enumeratorContext) {
return new StartRocksSourceSplitEnumerator(
enumeratorContext, sourceConfig, catalogTable.getSeaTunnelRowType());
return new StartRocksSourceSplitEnumerator(enumeratorContext, sourceConfig);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,9 @@ public OptionRule optionRule() {
StarRocksSourceOptions.NODE_URLS,
StarRocksSourceOptions.USERNAME,
StarRocksSourceOptions.PASSWORD,
StarRocksSourceOptions.DATABASE,
StarRocksSourceOptions.TABLE,
SinkConnectorCommonOptions.SCHEMA)
StarRocksSourceOptions.DATABASE)
.optional(
SinkConnectorCommonOptions.SCHEMA,
StarRocksSourceOptions.MAX_RETRIES,
StarRocksSourceOptions.QUERY_TABLET_SIZE,
StarRocksSourceOptions.SCAN_FILTER,
Expand All @@ -62,6 +61,7 @@ public OptionRule optionRule() {
StarRocksSourceOptions.SCAN_KEEP_ALIVE_MIN,
StarRocksSourceOptions.SCAN_BATCH_ROWS,
StarRocksSourceOptions.SCAN_CONNECT_TIMEOUT)
.exclusive(StarRocksSourceOptions.TABLE, StarRocksSourceOptions.TABLE_LIST)
.build();
}

Expand All @@ -77,7 +77,6 @@ TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
SourceConfig starRocksSourceConfig = new SourceConfig(config);
CatalogTable catalogTable = CatalogTableUtil.buildWithConfig(config);
return () ->
(SeaTunnelSource<T, SplitT, StateT>)
new StarRocksSource(starRocksSourceConfig, catalogTable);
(SeaTunnelSource<T, SplitT, StateT>) new StarRocksSource(starRocksSourceConfig);
}
}
Loading

0 comments on commit 21f2cfb

Please sign in to comment.