From ef72fc195b12a38db9f79d98556d3f1fef069bd8 Mon Sep 17 00:00:00 2001 From: dapeng Date: Mon, 25 May 2020 14:51:51 +0800 Subject: [PATCH 01/28] add hbase kerbos --- .../flink/sql/side/hbase/HbaseAllReqRow.java | 85 ++++++++++++-- .../sql/side/hbase/HbaseAsyncReqRow.java | 40 ++++++- .../side/hbase/table/HbaseSideTableInfo.java | 48 ++++++++ .../side/hbase/utils/HbaseConfigUtils.java | 108 ++++++++++++++++++ 4 files changed, 272 insertions(+), 9 deletions(-) create mode 100644 hbase/hbase-side/hbase-side-core/src/main/java/com/dtstack/flink/sql/side/hbase/utils/HbaseConfigUtils.java diff --git a/hbase/hbase-side/hbase-all-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAllReqRow.java b/hbase/hbase-side/hbase-all-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAllReqRow.java index 63d26d5dd..401ef00c0 100644 --- a/hbase/hbase-side/hbase-all-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAllReqRow.java +++ b/hbase/hbase-side/hbase-all-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAllReqRow.java @@ -25,8 +25,10 @@ import com.dtstack.flink.sql.side.FieldInfo; import com.dtstack.flink.sql.side.JoinInfo; import com.dtstack.flink.sql.side.hbase.table.HbaseSideTableInfo; +import com.dtstack.flink.sql.side.hbase.utils.HbaseConfigUtils; import org.apache.calcite.sql.JoinType; import org.apache.commons.collections.map.HashedMap; +import org.apache.commons.lang.StringUtils; import org.apache.flink.api.java.typeutils.RowTypeInfo; import com.google.common.collect.Maps; import org.apache.flink.table.runtime.types.CRow; @@ -36,6 +38,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; @@ -44,10 +47,13 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.security.UserGroupInformation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; import java.io.IOException; +import java.security.PrivilegedAction; import java.sql.SQLException; import java.sql.Timestamp; import java.util.Calendar; @@ -166,13 +172,44 @@ public void flatMap(CRow input, Collector out) throws Exception { private void loadData(Map> tmpCache) throws SQLException { AbstractSideTableInfo sideTableInfo = sideInfo.getSideTableInfo(); HbaseSideTableInfo hbaseSideTableInfo = (HbaseSideTableInfo) sideTableInfo; - Configuration conf = new Configuration(); - conf.set("hbase.zookeeper.quorum", hbaseSideTableInfo.getHost()); - Connection conn = null; - Table table = null; - ResultScanner resultScanner = null; + boolean openKerberos = hbaseSideTableInfo.isKerberosAuthEnable(); + int loadDataCount = 0; try { - conn = ConnectionFactory.createConnection(conf); + conf = HBaseConfiguration.create(); + if (openKerberos) { + conf.set(HbaseConfigUtils.KEY_HBASE_ZOOKEEPER_QUORUM, hbaseSideTableInfo.getHost()); + conf.set(HbaseConfigUtils.KEY_HBASE_ZOOKEEPER_ZNODE_QUORUM_SYNC, hbaseSideTableInfo.getParent()); + + fillSyncKerberosConfig(conf,hbaseSideTableInfo); + LOG.info("hbase.security.authentication:{}", conf.get("hbase.security.authentication")); + LOG.info("hbase.security.authorization:{}", conf.get("hbase.security.authorization")); + LOG.info("hbase.master.keytab.file:{}", conf.get("hbase.master.keytab.file")); + LOG.info("hbase.master.kerberos.principal:{}", conf.get("hbase.master.kerberos.principal")); + LOG.info("hbase.regionserver.keytab.file:{}", conf.get("hbase.regionserver.keytab.file")); + LOG.info("hbase.regionserver.kerberos.principal:{}", conf.get("hbase.regionserver.kerberos.principal")); + + UserGroupInformation userGroupInformation = HbaseConfigUtils.loginAndReturnUGI(conf, hbaseSideTableInfo.getRegionserverPrincipal(), + hbaseSideTableInfo.getRegionserverKeytabFile()); + + Configuration finalConf = conf; + conn = userGroupInformation.doAs(new PrivilegedAction() { + @Override + public Connection run() { + try { + return ConnectionFactory.createConnection(finalConf); + } catch (IOException e) { + LOG.error("Get connection fail with config:{}", finalConf); + throw new RuntimeException(e); + } + } + }); + + } else { + conf.set(HbaseConfigUtils.KEY_HBASE_ZOOKEEPER_QUORUM, hbaseSideTableInfo.getHost()); + conf.set(HbaseConfigUtils.KEY_HBASE_ZOOKEEPER_ZNODE_QUORUM_SYNC, hbaseSideTableInfo.getParent()); + conn = ConnectionFactory.createConnection(conf); + } + table = conn.getTable(TableName.valueOf(tableName)); resultScanner = table.getScanner(new Scan()); for (Result r : resultScanner) { @@ -187,13 +224,15 @@ private void loadData(Map> tmpCache) throws SQLExcep kv.put(aliasNameInversion.get(key.toString()), value); } + loadDataCount++; tmpCache.put(new String(r.getRow()), kv); } } catch (IOException e) { - LOG.error("", e); + throw new RuntimeException(e); } finally { + LOG.info("load Data count: {}", loadDataCount); try { - if (null != conn && !conn.isClosed()) { + if (null != conn) { conn.close(); } @@ -209,4 +248,34 @@ private void loadData(Map> tmpCache) throws SQLExcep } } } + + private void fillSyncKerberosConfig(Configuration config, HbaseSideTableInfo hbaseSideTableInfo) throws IOException { + String regionserverKeytabFile = hbaseSideTableInfo.getRegionserverKeytabFile(); + if (StringUtils.isEmpty(regionserverKeytabFile)) { + throw new IllegalArgumentException("Must provide regionserverKeytabFile when authentication is Kerberos"); + } + String regionserverKeytabFilePath = System.getProperty("user.dir") + File.separator + regionserverKeytabFile; + LOG.info("regionserverKeytabFilePath:{}", regionserverKeytabFilePath); + config.set(HbaseConfigUtils.KEY_HBASE_MASTER_KEYTAB_FILE, regionserverKeytabFilePath); + config.set(HbaseConfigUtils.KEY_HBASE_REGIONSERVER_KEYTAB_FILE, regionserverKeytabFilePath); + + String regionserverPrincipal = hbaseSideTableInfo.getRegionserverPrincipal(); + if (StringUtils.isEmpty(regionserverPrincipal)) { + throw new IllegalArgumentException("Must provide regionserverPrincipal when authentication is Kerberos"); + } + config.set(HbaseConfigUtils.KEY_HBASE_MASTER_KERBEROS_PRINCIPAL, regionserverPrincipal); + config.set(HbaseConfigUtils.KEY_HBASE_REGIONSERVER_KERBEROS_PRINCIPAL, regionserverPrincipal); + config.set(HbaseConfigUtils.KEY_HBASE_SECURITY_AUTHORIZATION, "true"); + config.set(HbaseConfigUtils.KEY_HBASE_SECURITY_AUTHENTICATION, "kerberos"); + + if (!StringUtils.isEmpty(hbaseSideTableInfo.getZookeeperSaslClient())) { + System.setProperty(HbaseConfigUtils.KEY_ZOOKEEPER_SASL_CLIENT, hbaseSideTableInfo.getZookeeperSaslClient()); + } + + if (!StringUtils.isEmpty(hbaseSideTableInfo.getSecurityKrb5Conf())) { + String krb5ConfPath = System.getProperty("user.dir") + File.separator + hbaseSideTableInfo.getSecurityKrb5Conf(); + LOG.info("krb5ConfPath:{}", krb5ConfPath); + System.setProperty(HbaseConfigUtils.KEY_JAVA_SECURITY_KRB5_CONF, krb5ConfPath); + } + } } \ No newline at end of file diff --git a/hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAsyncReqRow.java b/hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAsyncReqRow.java index 56f50e27c..d43d1408a 100644 --- a/hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAsyncReqRow.java +++ b/hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAsyncReqRow.java @@ -31,6 +31,7 @@ import com.dtstack.flink.sql.side.hbase.rowkeydealer.RowKeyEqualModeDealer; import com.dtstack.flink.sql.side.hbase.table.HbaseSideTableInfo; import com.dtstack.flink.sql.factory.DTThreadFactory; +import com.dtstack.flink.sql.side.hbase.utils.HbaseConfigUtils; import com.google.common.collect.Maps; import com.stumbleupon.async.Deferred; import org.apache.commons.lang3.StringUtils; @@ -40,10 +41,13 @@ import org.apache.flink.table.runtime.types.CRow; import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo; import org.apache.flink.types.Row; +import org.hbase.async.Config; import org.hbase.async.HBaseClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; +import java.io.IOException; import java.sql.Timestamp; import java.util.Collections; import java.util.List; @@ -93,11 +97,19 @@ public void open(Configuration parameters) throws Exception { super.open(parameters); AbstractSideTableInfo sideTableInfo = sideInfo.getSideTableInfo(); HbaseSideTableInfo hbaseSideTableInfo = (HbaseSideTableInfo) sideTableInfo; + ExecutorService executorService =new ThreadPoolExecutor(DEFAULT_POOL_SIZE, DEFAULT_POOL_SIZE, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), new DTThreadFactory("hbase-aysnc")); - hBaseClient = new HBaseClient(hbaseSideTableInfo.getHost(), hbaseSideTableInfo.getParent(), executorService); + Config config = new Config(); + config.overrideConfig(HbaseConfigUtils.KEY_HBASE_ZOOKEEPER_QUORUM, hbaseSideTableInfo.getHost()); + config.overrideConfig(HbaseConfigUtils.KEY_HBASE_ZOOKEEPER_ZNODE_QUORUM_ASYNC, hbaseSideTableInfo.getParent()); + + if (hbaseSideTableInfo.isKerberosAuthEnable()) { + fillAsyncKerberosConfig(config, hbaseSideTableInfo); + } + hBaseClient = new HBaseClient(config, executorService); try { Deferred deferred = hBaseClient.ensureTableExists(tableName) @@ -166,6 +178,32 @@ public void close() throws Exception { hBaseClient.shutdown(); } + private void fillAsyncKerberosConfig(Config config, HbaseSideTableInfo hbaseSideTableInfo) throws IOException { + AuthUtil.JAASConfig jaasConfig = HbaseConfigUtils.buildJaasConfig(hbaseSideTableInfo); + LOG.info("jaasConfig file:\n {}", jaasConfig.toString()); + String jaasFilePath = AuthUtil.creatJaasFile("JAAS", ".conf", jaasConfig); + config.overrideConfig(HbaseConfigUtils.KEY_JAVA_SECURITY_AUTH_LOGIN_CONF, jaasFilePath); + config.overrideConfig(HbaseConfigUtils.KEY_HBASE_SECURITY_AUTH_ENABLE, "true"); + config.overrideConfig(HbaseConfigUtils.KEY_HBASE_SASL_CLIENTCONFIG, "Client"); + config.overrideConfig(HbaseConfigUtils.KEY_HBASE_SECURITY_AUTHENTICATION, "kerberos"); + + String regionserverPrincipal = hbaseSideTableInfo.getRegionserverPrincipal(); + if (StringUtils.isEmpty(regionserverPrincipal)) { + throw new IllegalArgumentException("Must provide regionserverPrincipal when authentication is Kerberos"); + } + config.overrideConfig(HbaseConfigUtils.KEY_HBASE_KERBEROS_REGIONSERVER_PRINCIPAL, regionserverPrincipal); + + if (!StringUtils.isEmpty(hbaseSideTableInfo.getZookeeperSaslClient())) { + System.setProperty(HbaseConfigUtils.KEY_ZOOKEEPER_SASL_CLIENT, hbaseSideTableInfo.getZookeeperSaslClient()); + } + + if (!StringUtils.isEmpty(hbaseSideTableInfo.getSecurityKrb5Conf())) { + String krb5ConfPath = System.getProperty("user.dir") + File.separator + hbaseSideTableInfo.getSecurityKrb5Conf(); + LOG.info("krb5ConfPath:{}", krb5ConfPath); + System.setProperty(HbaseConfigUtils.KEY_JAVA_SECURITY_KRB5_CONF, krb5ConfPath); + } + } + class CheckResult{ diff --git a/hbase/hbase-side/hbase-side-core/src/main/java/com/dtstack/flink/sql/side/hbase/table/HbaseSideTableInfo.java b/hbase/hbase-side/hbase-side-core/src/main/java/com/dtstack/flink/sql/side/hbase/table/HbaseSideTableInfo.java index 3cedb2c68..d8b0a62bd 100644 --- a/hbase/hbase-side/hbase-side-core/src/main/java/com/dtstack/flink/sql/side/hbase/table/HbaseSideTableInfo.java +++ b/hbase/hbase-side/hbase-side-core/src/main/java/com/dtstack/flink/sql/side/hbase/table/HbaseSideTableInfo.java @@ -148,6 +148,54 @@ public void setPreRowKey(boolean preRowKey) { this.preRowKey = preRowKey; } + public boolean isKerberosAuthEnable() { + return kerberosAuthEnable; + } + + public void setKerberosAuthEnable(boolean kerberosAuthEnable) { + this.kerberosAuthEnable = kerberosAuthEnable; + } + + public String getRegionserverKeytabFile() { + return regionserverKeytabFile; + } + + public void setRegionserverKeytabFile(String regionserverKeytabFile) { + this.regionserverKeytabFile = regionserverKeytabFile; + } + + public String getRegionserverPrincipal() { + return regionserverPrincipal; + } + + public void setRegionserverPrincipal(String regionserverPrincipal) { + this.regionserverPrincipal = regionserverPrincipal; + } + + public String getJaasPrincipal() { + return jaasPrincipal; + } + + public void setJaasPrincipal(String jaasPrincipal) { + this.jaasPrincipal = jaasPrincipal; + } + + public String getSecurityKrb5Conf() { + return securityKrb5Conf; + } + + public void setSecurityKrb5Conf(String securityKrb5Conf) { + this.securityKrb5Conf = securityKrb5Conf; + } + + public String getZookeeperSaslClient() { + return zookeeperSaslClient; + } + + public void setZookeeperSaslClient(String zookeeperSaslClient) { + this.zookeeperSaslClient = zookeeperSaslClient; + } + @Override public void finish(){ super.finish(); diff --git a/hbase/hbase-side/hbase-side-core/src/main/java/com/dtstack/flink/sql/side/hbase/utils/HbaseConfigUtils.java b/hbase/hbase-side/hbase-side-core/src/main/java/com/dtstack/flink/sql/side/hbase/utils/HbaseConfigUtils.java new file mode 100644 index 000000000..09f2ea53d --- /dev/null +++ b/hbase/hbase-side/hbase-side-core/src/main/java/com/dtstack/flink/sql/side/hbase/utils/HbaseConfigUtils.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.flink.sql.side.hbase.utils; + +import com.dtstack.flink.sql.side.hbase.table.HbaseSideTableInfo; +import com.dtstack.flink.sql.util.AuthUtil; +import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.security.UserGroupInformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +/** + * + * The utility class of HBase connection + * + * Date: 2019/12/24 + * Company: www.dtstack.com + * @author maqi + */ +public class HbaseConfigUtils { + + private static final Logger LOG = LoggerFactory.getLogger(HbaseConfigUtils.class); + // sync side kerberos + public final static String KEY_HBASE_SECURITY_AUTHENTICATION = "hbase.security.authentication"; + public final static String KEY_HBASE_SECURITY_AUTHORIZATION = "hbase.security.authorization"; + public final static String KEY_HBASE_MASTER_KEYTAB_FILE = "hbase.master.keytab.file"; + public final static String KEY_HBASE_MASTER_KERBEROS_PRINCIPAL = "hbase.master.kerberos.principal"; + public final static String KEY_HBASE_REGIONSERVER_KEYTAB_FILE = "hbase.regionserver.keytab.file"; + public final static String KEY_HBASE_REGIONSERVER_KERBEROS_PRINCIPAL = "hbase.regionserver.kerberos.principal"; + + // async side kerberos + public final static String KEY_HBASE_SECURITY_AUTH_ENABLE = "hbase.security.auth.enable"; + public final static String KEY_HBASE_SASL_CLIENTCONFIG = "hbase.sasl.clientconfig"; + public final static String KEY_HBASE_KERBEROS_REGIONSERVER_PRINCIPAL = "hbase.kerberos.regionserver.principal"; + + public final static String KEY_HBASE_ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum"; + public final static String KEY_HBASE_ZOOKEEPER_ZNODE_QUORUM_SYNC = "zookeeper.znode.parent"; + public final static String KEY_HBASE_ZOOKEEPER_ZNODE_QUORUM_ASYNC = "hbase.zookeeper.znode.parent"; + + + public static final String KEY_JAVA_SECURITY_KRB5_CONF = "java.security.krb5.conf"; + public static final String KEY_ZOOKEEPER_SASL_CLIENT = "zookeeper.sasl.client"; + + public static final String KEY_JAVA_SECURITY_AUTH_LOGIN_CONF = "java.security.auth.login.config"; + + + public static AuthUtil.JAASConfig buildJaasConfig(HbaseSideTableInfo hbaseSideTableInfo) { + String keytabPath = System.getProperty("user.dir") + File.separator + hbaseSideTableInfo.getRegionserverKeytabFile(); + Map loginModuleOptions = new HashMap<>(); + loginModuleOptions.put("useKeyTab", "true"); + loginModuleOptions.put("useTicketCache", "false"); + loginModuleOptions.put("keyTab", "\"" + keytabPath + "\""); + loginModuleOptions.put("principal", "\"" + hbaseSideTableInfo.getJaasPrincipal() + "\""); + return AuthUtil.JAASConfig.builder().setEntryName("Client") + .setLoginModule("com.sun.security.auth.module.Krb5LoginModule") + .setLoginModuleFlag("required").setLoginModuleOptions(loginModuleOptions).build(); + } + + + public static UserGroupInformation loginAndReturnUGI(Configuration conf, String principal, String keytab) throws IOException { + if (conf == null) { + throw new IllegalArgumentException("kerberos conf can not be null"); + } + + if (org.apache.commons.lang.StringUtils.isEmpty(principal)) { + throw new IllegalArgumentException("principal can not be null"); + } + + if (org.apache.commons.lang.StringUtils.isEmpty(keytab)) { + throw new IllegalArgumentException("keytab can not be null"); + } + + conf.set("hadoop.security.authentication", "Kerberos"); + UserGroupInformation.setConfiguration(conf); + + return UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab); + } + +} From 1e3237f503754c38d4b89b6ee28d2473d8bf904b Mon Sep 17 00:00:00 2001 From: dapeng Date: Mon, 25 May 2020 14:54:27 +0800 Subject: [PATCH 02/28] fix missing class --- .../com/dtstack/flink/sql/util/AuthUtil.java | 112 ++++++++++++++++++ .../flink/sql/side/hbase/HbaseAllReqRow.java | 4 + .../sql/side/hbase/HbaseAsyncReqRow.java | 1 + 3 files changed, 117 insertions(+) create mode 100644 core/src/main/java/com/dtstack/flink/sql/util/AuthUtil.java diff --git a/core/src/main/java/com/dtstack/flink/sql/util/AuthUtil.java b/core/src/main/java/com/dtstack/flink/sql/util/AuthUtil.java new file mode 100644 index 000000000..646a11b66 --- /dev/null +++ b/core/src/main/java/com/dtstack/flink/sql/util/AuthUtil.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.flink.sql.util; + +import org.apache.commons.io.FileUtils; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +/** + * Utility methods for helping with security tasks. + * Date: 2019/12/28 + * Company: www.dtstack.com + * @author maqi + */ +public class AuthUtil { + + public static String creatJaasFile(String prefix, String suffix, JAASConfig jaasConfig) throws IOException { + File krbConf = new File(System.getProperty("user.dir")); + File temp = File.createTempFile(prefix, suffix, krbConf); + temp.deleteOnExit(); + FileUtils.writeStringToFile(temp, jaasConfig.toString()); + return temp.getAbsolutePath(); + } + + + public static class JAASConfig { + private String entryName; + private String loginModule; + private String loginModuleFlag; + private Map loginModuleOptions; + + public JAASConfig(String entryName, String loginModule, String loginModuleFlag, Map loginModuleOptions) { + this.entryName = entryName; + this.loginModule = loginModule; + this.loginModuleFlag = loginModuleFlag; + this.loginModuleOptions = loginModuleOptions; + } + + public static Builder builder() { + return new Builder(); + } + + @Override + public String toString() { + StringBuilder stringBuilder = new StringBuilder(entryName).append(" {\n\t") + .append(loginModule).append(" ").append(loginModuleFlag).append("\n\t"); + String[] keys = loginModuleOptions.keySet().toArray(new String[loginModuleOptions.size()]); + for (int i = 0; i < keys.length; i++) { + stringBuilder.append(keys[i]).append("=").append(loginModuleOptions.get(keys[i])); + if (i != keys.length - 1) { + stringBuilder.append("\n\t"); + } else { + stringBuilder.append(";\n"); + } + + } + stringBuilder.append("\n").append("};"); + return stringBuilder.toString(); + } + + public static class Builder { + private String entryName; + private String loginModule; + private String loginModuleFlag; + private Map loginModuleOptions; + + public Builder setEntryName(String entryName) { + this.entryName = entryName; + return this; + } + + public Builder setLoginModule(String loginModule) { + this.loginModule = loginModule; + return this; + } + + public Builder setLoginModuleFlag(String loginModuleFlag) { + this.loginModuleFlag = loginModuleFlag; + return this; + } + + public Builder setLoginModuleOptions(Map loginModuleOptions) { + this.loginModuleOptions = loginModuleOptions; + return this; + } + + public JAASConfig build() { + return new JAASConfig( + entryName, loginModule, loginModuleFlag, loginModuleOptions); + } + } + } +} diff --git a/hbase/hbase-side/hbase-all-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAllReqRow.java b/hbase/hbase-side/hbase-all-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAllReqRow.java index 401ef00c0..c50e1a806 100644 --- a/hbase/hbase-side/hbase-all-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAllReqRow.java +++ b/hbase/hbase-side/hbase-all-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAllReqRow.java @@ -71,6 +71,10 @@ public class HbaseAllReqRow extends BaseAllReqRow { private Map aliasNameInversion; private AtomicReference>> cacheRef = new AtomicReference<>(); + private Connection conn = null; + private Table table = null; + private ResultScanner resultScanner = null; + private Configuration conf = null; public HbaseAllReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List outFieldInfoList, AbstractSideTableInfo sideTableInfo) { super(new HbaseAllSideInfo(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo)); diff --git a/hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAsyncReqRow.java b/hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAsyncReqRow.java index d43d1408a..f3a1d6691 100644 --- a/hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAsyncReqRow.java +++ b/hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAsyncReqRow.java @@ -32,6 +32,7 @@ import com.dtstack.flink.sql.side.hbase.table.HbaseSideTableInfo; import com.dtstack.flink.sql.factory.DTThreadFactory; import com.dtstack.flink.sql.side.hbase.utils.HbaseConfigUtils; +import com.dtstack.flink.sql.util.AuthUtil; import com.google.common.collect.Maps; import com.stumbleupon.async.Deferred; import org.apache.commons.lang3.StringUtils; From 6bb89495cc5679bd4eea8a8ce5166472a6bf6e64 Mon Sep 17 00:00:00 2001 From: dapeng Date: Mon, 25 May 2020 14:56:59 +0800 Subject: [PATCH 03/28] fill missing field --- .../sql/side/hbase/table/HbaseSideTableInfo.java | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/hbase/hbase-side/hbase-side-core/src/main/java/com/dtstack/flink/sql/side/hbase/table/HbaseSideTableInfo.java b/hbase/hbase-side/hbase-side-core/src/main/java/com/dtstack/flink/sql/side/hbase/table/HbaseSideTableInfo.java index d8b0a62bd..2a5411682 100644 --- a/hbase/hbase-side/hbase-side-core/src/main/java/com/dtstack/flink/sql/side/hbase/table/HbaseSideTableInfo.java +++ b/hbase/hbase-side/hbase-side-core/src/main/java/com/dtstack/flink/sql/side/hbase/table/HbaseSideTableInfo.java @@ -52,6 +52,18 @@ public class HbaseSideTableInfo extends AbstractSideTableInfo { private String tableName; + private boolean kerberosAuthEnable; + + private String regionserverKeytabFile; + + private String regionserverPrincipal; + + private String jaasPrincipal; + + private String securityKrb5Conf; + + private String zookeeperSaslClient; + private String[] columnRealNames; private List columnRealNameList = Lists.newArrayList(); From 907aacb8961d6521c1b91f0fe1b1651318fc2a1b Mon Sep 17 00:00:00 2001 From: dapeng Date: Mon, 25 May 2020 15:15:36 +0800 Subject: [PATCH 04/28] hbase kerberos sink --- .../sql/sink/hbase/HbaseOutputFormat.java | 33 +++++++++++++++- .../flink/sql/sink/hbase/HbaseSink.java | 39 +++++++++++++++---- 2 files changed, 63 insertions(+), 9 deletions(-) diff --git a/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseOutputFormat.java b/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseOutputFormat.java index a3189b0a6..faea0845c 100644 --- a/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseOutputFormat.java +++ b/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseOutputFormat.java @@ -62,6 +62,12 @@ public class HbaseOutputFormat extends AbstractDtRichOutputFormat { private String[] columnTypes; private Map columnNameFamily; + private boolean kerberosAuthEnable; + private String regionserverKeytabFile; + private String regionserverPrincipal; + private String securityKrb5Conf; + private String zookeeperSaslClient; + private String[] families; private String[] qualifiers; @@ -252,6 +258,31 @@ public HbaseOutputFormatBuilder setColumnNameFamily(Map columnNa format.columnNameFamily = columnNameFamily; return this; } + public HbaseOutputFormatBuilder setKerberosAuthEnable(boolean kerberosAuthEnable) { + format.kerberosAuthEnable = kerberosAuthEnable; + return this; + } + + public HbaseOutputFormatBuilder setRegionserverKeytabFile(String regionserverKeytabFile) { + format.regionserverKeytabFile = regionserverKeytabFile; + return this; + } + + public HbaseOutputFormatBuilder setRegionserverPrincipal(String regionserverPrincipal) { + format.regionserverPrincipal = regionserverPrincipal; + return this; + } + + public HbaseOutputFormatBuilder setSecurityKrb5Conf(String securityKrb5Conf) { + format.securityKrb5Conf = securityKrb5Conf; + return this; + } + + public HbaseOutputFormatBuilder setZookeeperSaslClient(String zookeeperSaslClient) { + format.zookeeperSaslClient = zookeeperSaslClient; + return this; + } + public HbaseOutputFormat finish() { Preconditions.checkNotNull(format.host, "zookeeperQuorum should be specified"); @@ -267,7 +298,7 @@ public HbaseOutputFormat finish() { String[] columns = keySet.toArray(new String[keySet.size()]); for (int i = 0; i < columns.length; ++i) { String col = columns[i]; - String[] part = StringUtils.split(col, ":"); + String[] part = col.split(":"); families[i] = part[0]; qualifiers[i] = part[1]; } diff --git a/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseSink.java b/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseSink.java index 13bd98b70..8364b6c10 100644 --- a/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseSink.java +++ b/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseSink.java @@ -52,6 +52,15 @@ public class HbaseSink implements RetractStreamTableSink, IStreamSinkGener< protected String tableName; protected String updateMode; protected String rowkey; + protected String registerTabName; + + protected boolean kerberosAuthEnable; + protected String regionserverKeytabFile; + protected String regionserverPrincipal; + protected String securityKrb5Conf; + protected String zookeeperSaslClient; + private int parallelism = -1; + public HbaseSink() { // TO DO NOTHING @@ -66,20 +75,34 @@ public HbaseSink genStreamSink(AbstractTargetTableInfo targetTableInfo) { this.tableName = hbaseTableInfo.getTableName(); this.rowkey = hbaseTableInfo.getRowkey(); this.columnNameFamily = hbaseTableInfo.getColumnNameFamily(); - this.updateMode = hbaseTableInfo.getUpdateMode(); + this.registerTabName = hbaseTableInfo.getName(); + + this.kerberosAuthEnable = hbaseTableInfo.isKerberosAuthEnable(); + this.regionserverKeytabFile = hbaseTableInfo.getRegionserverKeytabFile(); + this.regionserverPrincipal = hbaseTableInfo.getRegionserverPrincipal(); + this.securityKrb5Conf = hbaseTableInfo.getSecurityKrb5Conf(); + this.zookeeperSaslClient = hbaseTableInfo.getZookeeperSaslClient(); + + Integer tmpSinkParallelism = hbaseTableInfo.getParallelism(); + if (tmpSinkParallelism != null) { + this.parallelism = tmpSinkParallelism; + } return this; } @Override public void emitDataStream(DataStream> dataStream) { HbaseOutputFormat.HbaseOutputFormatBuilder builder = HbaseOutputFormat.buildHbaseOutputFormat(); - builder.setHost(this.zookeeperQuorum) - .setZkParent(this.parent) - .setTable(this.tableName) - .setRowkey(rowkey) - .setUpdateMode(updateMode) - .setColumnNames(fieldNames) - .setColumnNameFamily(columnNameFamily); + builder.setHost(this.zookeeperQuorum).setZkParent(this.parent).setTable(this.tableName); + + builder.setRowkey(rowkey); + builder.setColumnNames(fieldNames); + builder.setColumnNameFamily(columnNameFamily); + builder.setKerberosAuthEnable(kerberosAuthEnable); + builder.setRegionserverKeytabFile(regionserverKeytabFile); + builder.setRegionserverPrincipal(regionserverPrincipal); + builder.setSecurityKrb5Conf(securityKrb5Conf); + builder.setZookeeperSaslClient(zookeeperSaslClient); HbaseOutputFormat outputFormat = builder.finish(); RichSinkFunction richSinkFunction = new OutputFormatSinkFunction(outputFormat); From 3a9cb360d3d6c7e54e75170c1ae638c3e154d428 Mon Sep 17 00:00:00 2001 From: dapeng Date: Mon, 25 May 2020 15:18:21 +0800 Subject: [PATCH 05/28] fix missing field --- .../sql/sink/hbase/table/HbaseTableInfo.java | 63 +++++++++++++++++++ 1 file changed, 63 insertions(+) diff --git a/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/table/HbaseTableInfo.java b/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/table/HbaseTableInfo.java index 36f8e89e5..49df2f757 100644 --- a/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/table/HbaseTableInfo.java +++ b/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/table/HbaseTableInfo.java @@ -23,6 +23,8 @@ import com.dtstack.flink.sql.table.AbstractTargetTableInfo; import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; + import java.util.Map; /** @@ -54,6 +56,18 @@ public class HbaseTableInfo extends AbstractTargetTableInfo { private String updateMode; + private boolean kerberosAuthEnable; + + private String regionserverKeytabFile; + + private String regionserverPrincipal; + + private String securityKrb5Conf; + + private String zookeeperSaslClient; + + private Map hbaseConfig = Maps.newHashMap(); + public HbaseTableInfo(){ setType(CURR_TYPE); } @@ -149,4 +163,53 @@ public String getType() { return super.getType().toLowerCase(); } + + public Map getHbaseConfig() { + return hbaseConfig; + } + + public void setHbaseConfig(Map hbaseConfig) { + this.hbaseConfig = hbaseConfig; + } + + public boolean isKerberosAuthEnable() { + return kerberosAuthEnable; + } + + public void setKerberosAuthEnable(boolean kerberosAuthEnable) { + this.kerberosAuthEnable = kerberosAuthEnable; + } + + public String getRegionserverKeytabFile() { + return regionserverKeytabFile; + } + + public void setRegionserverKeytabFile(String regionserverKeytabFile) { + this.regionserverKeytabFile = regionserverKeytabFile; + } + + public String getRegionserverPrincipal() { + return regionserverPrincipal; + } + + public void setRegionserverPrincipal(String regionserverPrincipal) { + this.regionserverPrincipal = regionserverPrincipal; + } + + public String getSecurityKrb5Conf() { + return securityKrb5Conf; + } + + public void setSecurityKrb5Conf(String securityKrb5Conf) { + this.securityKrb5Conf = securityKrb5Conf; + } + + public String getZookeeperSaslClient() { + return zookeeperSaslClient; + } + + public void setZookeeperSaslClient(String zookeeperSaslClient) { + this.zookeeperSaslClient = zookeeperSaslClient; + } + } From 2f0fdfeec8ddec660618e3574c77226d2b9fc7a4 Mon Sep 17 00:00:00 2001 From: dapeng Date: Mon, 25 May 2020 17:46:25 +0800 Subject: [PATCH 06/28] =?UTF-8?q?hbase=20kerberos=20=E5=AD=97=E6=AE=B5?= =?UTF-8?q?=E8=A7=A3=E6=9E=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../sql/side/hbase/table/HbaseSideParser.java | 14 ++++++++++++++ .../sql/sink/hbase/table/HbaseSinkParser.java | 12 ++++++++++++ 2 files changed, 26 insertions(+) diff --git a/hbase/hbase-side/hbase-side-core/src/main/java/com/dtstack/flink/sql/side/hbase/table/HbaseSideParser.java b/hbase/hbase-side/hbase-side-core/src/main/java/com/dtstack/flink/sql/side/hbase/table/HbaseSideParser.java index 80753c40b..832961834 100644 --- a/hbase/hbase-side/hbase-side-core/src/main/java/com/dtstack/flink/sql/side/hbase/table/HbaseSideParser.java +++ b/hbase/hbase-side/hbase-side-core/src/main/java/com/dtstack/flink/sql/side/hbase/table/HbaseSideParser.java @@ -53,6 +53,13 @@ public class HbaseSideParser extends AbstractSideTableParser { public static final String CACHE = "cache"; + public static final String KERBEROS_AUTH_ENABLE_KEY = "kerberosAuthEnable"; + public static final String REGIONSERVER_KEYTAB_FILE_KEY = "regionserverKeytabFile"; + public static final String REGIONSERVER_PRINCIPAL_KEY = "regionserverPrincipal"; + public static final String JAAS_PRINCIPAL_KEY = "jaasPrincipal"; + public static final String SECURITY_KRB5_CONF_KEY = "securityKrb5Conf"; + public static final String ZOOKEEPER_SASL_CLINT_KEY = "zookeeperSaslClient"; + public HbaseSideParser() { addParserHandler(FIELD_KEY, FIELD_PATTERN, this::dealField); } @@ -69,6 +76,13 @@ public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map Date: Mon, 25 May 2020 17:56:06 +0800 Subject: [PATCH 07/28] =?UTF-8?q?=E4=BF=9D=E6=8C=81=E4=B8=80=E8=87=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/dtstack/flink/sql/sink/hbase/HbaseOutputFormat.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseOutputFormat.java b/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseOutputFormat.java index faea0845c..3def5cb1d 100644 --- a/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseOutputFormat.java +++ b/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseOutputFormat.java @@ -298,7 +298,7 @@ public HbaseOutputFormat finish() { String[] columns = keySet.toArray(new String[keySet.size()]); for (int i = 0; i < columns.length; ++i) { String col = columns[i]; - String[] part = col.split(":"); + String[] part = StringUtils.split(col, ":");; families[i] = part[0]; qualifiers[i] = part[1]; } From 9d80379c90be792d35bdd9ecef20713e72dcb49f Mon Sep 17 00:00:00 2001 From: dapeng Date: Wed, 27 May 2020 10:56:38 +0800 Subject: [PATCH 08/28] =?UTF-8?q?fix=20=E8=A1=A5=E5=85=85=E5=8F=82?= =?UTF-8?q?=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../sql/sink/hbase/HbaseConfigUtils.java | 79 +++++++++++++++++++ .../sql/sink/hbase/HbaseOutputFormat.java | 56 +++++++++++++ 2 files changed, 135 insertions(+) create mode 100644 hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseConfigUtils.java diff --git a/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseConfigUtils.java b/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseConfigUtils.java new file mode 100644 index 000000000..2be03484b --- /dev/null +++ b/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseConfigUtils.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.flink.sql.sink.hbase; + +import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.security.UserGroupInformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +/** + * + * The utility class of HBase connection + * + * Date: 2019/12/24 + * Company: www.dtstack.com + * @author maqi + */ +public class HbaseConfigUtils { + + private static final Logger LOG = LoggerFactory.getLogger(HbaseConfigUtils.class); + // sync side kerberos + public final static String KEY_HBASE_SECURITY_AUTHENTICATION = "hbase.security.authentication"; + public final static String KEY_HBASE_SECURITY_AUTHORIZATION = "hbase.security.authorization"; + public final static String KEY_HBASE_MASTER_KEYTAB_FILE = "hbase.master.keytab.file"; + public final static String KEY_HBASE_MASTER_KERBEROS_PRINCIPAL = "hbase.master.kerberos.principal"; + public final static String KEY_HBASE_REGIONSERVER_KEYTAB_FILE = "hbase.regionserver.keytab.file"; + public final static String KEY_HBASE_REGIONSERVER_KERBEROS_PRINCIPAL = "hbase.regionserver.kerberos.principal"; + + public final static String KEY_HBASE_ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum"; + public final static String KEY_HBASE_ZOOKEEPER_ZNODE_QUORUM = "hbase.zookeeper.znode.parent"; + + + public static final String KEY_JAVA_SECURITY_KRB5_CONF = "java.security.krb5.conf"; + public static final String KEY_ZOOKEEPER_SASL_CLIENT = "zookeeper.sasl.client"; + + public static UserGroupInformation loginAndReturnUGI(Configuration conf, String principal, String keytab) throws IOException { + if (conf == null) { + throw new IllegalArgumentException("kerberos conf can not be null"); + } + + if (org.apache.commons.lang.StringUtils.isEmpty(principal)) { + throw new IllegalArgumentException("principal can not be null"); + } + + if (org.apache.commons.lang.StringUtils.isEmpty(keytab)) { + throw new IllegalArgumentException("keytab can not be null"); + } + + conf.set("hadoop.security.authentication", "Kerberos"); + UserGroupInformation.setConfiguration(conf); + + return UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab); + } +} diff --git a/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseOutputFormat.java b/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseOutputFormat.java index 3def5cb1d..36b251d14 100644 --- a/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseOutputFormat.java +++ b/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseOutputFormat.java @@ -37,10 +37,13 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.security.UserGroupInformation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; import java.io.IOException; +import java.security.PrivilegedAction; import java.util.List; import java.util.Map; import java.util.Set; @@ -91,6 +94,29 @@ public void open(int taskNumber, int numTasks) throws IOException { LOG.warn("---open---"); conn = ConnectionFactory.createConnection(conf); table = conn.getTable(TableName.valueOf(tableName)); + if (kerberosAuthEnable) { + conf.set(HbaseConfigUtils.KEY_HBASE_ZOOKEEPER_QUORUM, host); + conf.set(HbaseConfigUtils.KEY_HBASE_ZOOKEEPER_ZNODE_QUORUM, zkParent); + fillSyncKerberosConfig(conf, regionserverKeytabFile, regionserverPrincipal, zookeeperSaslClient, securityKrb5Conf); + + UserGroupInformation userGroupInformation = HbaseConfigUtils.loginAndReturnUGI(conf, regionserverPrincipal, regionserverKeytabFile); + org.apache.hadoop.conf.Configuration finalConf = conf; + conn = userGroupInformation.doAs(new PrivilegedAction() { + @Override + public Connection run() { + try { + return ConnectionFactory.createConnection(finalConf); + } catch (IOException e) { + LOG.error("Get connection fail with config:{}", finalConf); + throw new RuntimeException(e); + } + } + }); + } else { + conf.set(HbaseConfigUtils.KEY_HBASE_ZOOKEEPER_QUORUM, host); + conf.set(HbaseConfigUtils.KEY_HBASE_ZOOKEEPER_ZNODE_QUORUM, zkParent); + conn = ConnectionFactory.createConnection(conf); + } LOG.warn("---open end(get table from hbase) ---"); initMetric(); } @@ -311,5 +337,35 @@ public HbaseOutputFormat finish() { } + private void fillSyncKerberosConfig( org.apache.hadoop.conf.Configuration config, String regionserverKeytabFile, String regionserverPrincipal, + String zookeeperSaslClient, String securityKrb5Conf) throws IOException { + if (StringUtils.isEmpty(regionserverKeytabFile)) { + throw new IllegalArgumentException("Must provide regionserverKeytabFile when authentication is Kerberos"); + } + String regionserverKeytabFilePath = System.getProperty("user.dir") + File.separator + regionserverKeytabFile; + LOG.info("regionserverKeytabFilePath:{}",regionserverKeytabFilePath); + config.set(HbaseConfigUtils.KEY_HBASE_MASTER_KEYTAB_FILE, regionserverKeytabFilePath); + config.set(HbaseConfigUtils.KEY_HBASE_REGIONSERVER_KEYTAB_FILE, regionserverKeytabFilePath); + + if (StringUtils.isEmpty(regionserverPrincipal)) { + throw new IllegalArgumentException("Must provide regionserverPrincipal when authentication is Kerberos"); + } + config.set(HbaseConfigUtils.KEY_HBASE_MASTER_KERBEROS_PRINCIPAL, regionserverPrincipal); + config.set(HbaseConfigUtils.KEY_HBASE_REGIONSERVER_KERBEROS_PRINCIPAL, regionserverPrincipal); + config.set(HbaseConfigUtils.KEY_HBASE_SECURITY_AUTHORIZATION, "true"); + config.set(HbaseConfigUtils.KEY_HBASE_SECURITY_AUTHENTICATION, "kerberos"); + + + if (!StringUtils.isEmpty(zookeeperSaslClient)) { + System.setProperty(HbaseConfigUtils.KEY_ZOOKEEPER_SASL_CLIENT, zookeeperSaslClient); + } + + if (!StringUtils.isEmpty(securityKrb5Conf)) { + String krb5ConfPath = System.getProperty("user.dir") + File.separator + securityKrb5Conf; + LOG.info("krb5ConfPath:{}", krb5ConfPath); + System.setProperty(HbaseConfigUtils.KEY_JAVA_SECURITY_KRB5_CONF, krb5ConfPath); + } + } + } From a44c2be5167ad98b86d0036ec66e52db6c7cab7d Mon Sep 17 00:00:00 2001 From: dapeng Date: Wed, 27 May 2020 11:49:03 +0800 Subject: [PATCH 09/28] =?UTF-8?q?=E8=B0=83=E6=95=B4=E9=A1=BA=E5=BA=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/dtstack/flink/sql/sink/hbase/HbaseOutputFormat.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseOutputFormat.java b/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseOutputFormat.java index 36b251d14..97c3ad873 100644 --- a/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseOutputFormat.java +++ b/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseOutputFormat.java @@ -92,8 +92,6 @@ public void configure(Configuration parameters) { @Override public void open(int taskNumber, int numTasks) throws IOException { LOG.warn("---open---"); - conn = ConnectionFactory.createConnection(conf); - table = conn.getTable(TableName.valueOf(tableName)); if (kerberosAuthEnable) { conf.set(HbaseConfigUtils.KEY_HBASE_ZOOKEEPER_QUORUM, host); conf.set(HbaseConfigUtils.KEY_HBASE_ZOOKEEPER_ZNODE_QUORUM, zkParent); @@ -117,6 +115,7 @@ public Connection run() { conf.set(HbaseConfigUtils.KEY_HBASE_ZOOKEEPER_ZNODE_QUORUM, zkParent); conn = ConnectionFactory.createConnection(conf); } + table = conn.getTable(TableName.valueOf(tableName)); LOG.warn("---open end(get table from hbase) ---"); initMetric(); } From 5d71d4a4fcd82d5637e3f6c4722152b87a82107d Mon Sep 17 00:00:00 2001 From: dapeng Date: Wed, 27 May 2020 14:07:20 +0800 Subject: [PATCH 10/28] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E7=BC=BA=E5=A4=B1?= =?UTF-8?q?=E7=9A=84=E5=8F=82=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../sql/sink/hbase/HbaseConfigUtils.java | 14 +- .../sql/sink/hbase/HbaseOutputFormat.java | 132 ++++++++++++------ .../flink/sql/sink/hbase/HbaseSink.java | 9 ++ .../sql/sink/hbase/table/HbaseSinkParser.java | 7 + .../sql/sink/hbase/table/HbaseTableInfo.java | 20 +++ 5 files changed, 128 insertions(+), 54 deletions(-) diff --git a/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseConfigUtils.java b/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseConfigUtils.java index 2be03484b..57c63d243 100644 --- a/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseConfigUtils.java +++ b/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseConfigUtils.java @@ -18,19 +18,12 @@ package com.dtstack.flink.sql.sink.hbase; -import org.apache.commons.collections.MapUtils; -import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.security.UserGroupInformation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; import java.io.IOException; -import java.util.Arrays; -import java.util.List; -import java.util.Map; /** * @@ -52,13 +45,18 @@ public class HbaseConfigUtils { public final static String KEY_HBASE_REGIONSERVER_KERBEROS_PRINCIPAL = "hbase.regionserver.kerberos.principal"; public final static String KEY_HBASE_ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum"; - public final static String KEY_HBASE_ZOOKEEPER_ZNODE_QUORUM = "hbase.zookeeper.znode.parent"; + public final static String KEY_HBASE_ZOOKEEPER_ZNODE_QUORUM = "zookeeper.znode.parent"; + + public final static String KEY_HBASE_CLIENT_KEYTAB_FILE = "hbase.client.keytab.file"; + public final static String KEY_HBASE_CLIENT_KERBEROS_PRINCIPAL = "hbase.client.kerberos.principal"; public static final String KEY_JAVA_SECURITY_KRB5_CONF = "java.security.krb5.conf"; public static final String KEY_ZOOKEEPER_SASL_CLIENT = "zookeeper.sasl.client"; public static UserGroupInformation loginAndReturnUGI(Configuration conf, String principal, String keytab) throws IOException { + LOG.info("loginAndReturnUGI principal {}",principal); + LOG.info("loginAndReturnUGI keytab {}",keytab); if (conf == null) { throw new IllegalArgumentException("kerberos conf can not be null"); } diff --git a/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseOutputFormat.java b/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseOutputFormat.java index 97c3ad873..3b17c3551 100644 --- a/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseOutputFormat.java +++ b/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseOutputFormat.java @@ -29,8 +29,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.types.Row; import org.apache.flink.util.Preconditions; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Delete; @@ -70,6 +69,8 @@ public class HbaseOutputFormat extends AbstractDtRichOutputFormat { private String regionserverPrincipal; private String securityKrb5Conf; private String zookeeperSaslClient; + private String clientPrincipal; + private String clientKeytabFile; private String[] families; private String[] qualifiers; @@ -78,48 +79,78 @@ public class HbaseOutputFormat extends AbstractDtRichOutputFormat { private transient Connection conn; private transient Table table; + private transient ChoreService choreService; + @Override public void configure(Configuration parameters) { LOG.warn("---configure---"); conf = HBaseConfiguration.create(); - conf.set("hbase.zookeeper.quorum", host); - if (zkParent != null && !"".equals(zkParent)) { - conf.set("zookeeper.znode.parent", zkParent); - } - LOG.warn("---configure end ---"); } @Override public void open(int taskNumber, int numTasks) throws IOException { LOG.warn("---open---"); - if (kerberosAuthEnable) { - conf.set(HbaseConfigUtils.KEY_HBASE_ZOOKEEPER_QUORUM, host); - conf.set(HbaseConfigUtils.KEY_HBASE_ZOOKEEPER_ZNODE_QUORUM, zkParent); - fillSyncKerberosConfig(conf, regionserverKeytabFile, regionserverPrincipal, zookeeperSaslClient, securityKrb5Conf); - - UserGroupInformation userGroupInformation = HbaseConfigUtils.loginAndReturnUGI(conf, regionserverPrincipal, regionserverKeytabFile); - org.apache.hadoop.conf.Configuration finalConf = conf; - conn = userGroupInformation.doAs(new PrivilegedAction() { - @Override - public Connection run() { - try { - return ConnectionFactory.createConnection(finalConf); - } catch (IOException e) { - LOG.error("Get connection fail with config:{}", finalConf); - throw new RuntimeException(e); - } - } - }); - } else { - conf.set(HbaseConfigUtils.KEY_HBASE_ZOOKEEPER_QUORUM, host); - conf.set(HbaseConfigUtils.KEY_HBASE_ZOOKEEPER_ZNODE_QUORUM, zkParent); - conn = ConnectionFactory.createConnection(conf); - } + openConn(); table = conn.getTable(TableName.valueOf(tableName)); LOG.warn("---open end(get table from hbase) ---"); initMetric(); } + private void openConn(){ + try{ + if (kerberosAuthEnable) { + LOG.info("open kerberos conn"); + openKerberosConn(); + } else { + LOG.info("open conn"); + conf.set(HbaseConfigUtils.KEY_HBASE_ZOOKEEPER_QUORUM, host); + conf.set(HbaseConfigUtils.KEY_HBASE_ZOOKEEPER_ZNODE_QUORUM, zkParent); + conn = ConnectionFactory.createConnection(conf); + } + }catch (Exception e){ + throw new RuntimeException(e); + } + + } + private void openKerberosConn() throws IOException { + conf.set(HbaseConfigUtils.KEY_HBASE_ZOOKEEPER_QUORUM, host); + conf.set(HbaseConfigUtils.KEY_HBASE_ZOOKEEPER_ZNODE_QUORUM, zkParent); + + LOG.info("kerberos config:{}", this.toString()); + Preconditions.checkArgument(!StringUtils.isEmpty(clientPrincipal), " clientPrincipal not null!"); + Preconditions.checkArgument(!StringUtils.isEmpty(clientKeytabFile), " clientKeytabFile not null!"); + + fillSyncKerberosConfig(conf, regionserverPrincipal, zookeeperSaslClient, securityKrb5Conf); + + clientKeytabFile = System.getProperty("user.dir") + File.separator + clientKeytabFile; + clientPrincipal = !StringUtils.isEmpty(clientPrincipal) ? clientPrincipal : regionserverPrincipal; + + conf.set(HbaseConfigUtils.KEY_HBASE_CLIENT_KEYTAB_FILE, clientKeytabFile); + conf.set(HbaseConfigUtils.KEY_HBASE_CLIENT_KERBEROS_PRINCIPAL, clientPrincipal); + + UserGroupInformation userGroupInformation = HbaseConfigUtils.loginAndReturnUGI(conf, clientPrincipal, clientKeytabFile); + org.apache.hadoop.conf.Configuration finalConf = conf; + conn = userGroupInformation.doAs(new PrivilegedAction() { + @Override + public Connection run() { + try { + ScheduledChore authChore = AuthUtil.getAuthChore(finalConf); + if (authChore != null) { + choreService = new ChoreService("hbaseKerberosSink"); + choreService.scheduleChore(authChore); + } + + return ConnectionFactory.createConnection(finalConf); + } catch (IOException e) { + LOG.error("Get connection fail with config:{}", finalConf); + throw new RuntimeException(e); + } + } + }); + } + + + @Override public void writeRecord(Tuple2 tuple2) { Tuple2 tupleTrans = tuple2; @@ -227,7 +258,6 @@ public void close() throws IOException { conn = null; } } - private HbaseOutputFormat() { } @@ -264,11 +294,6 @@ public HbaseOutputFormatBuilder setRowkey(String rowkey) { return this; } - public HbaseOutputFormatBuilder setUpdateMode(String updateMode) { - format.updateMode = updateMode; - return this; - } - public HbaseOutputFormatBuilder setColumnNames(String[] columnNames) { format.columnNames = columnNames; return this; @@ -283,6 +308,7 @@ public HbaseOutputFormatBuilder setColumnNameFamily(Map columnNa format.columnNameFamily = columnNameFamily; return this; } + public HbaseOutputFormatBuilder setKerberosAuthEnable(boolean kerberosAuthEnable) { format.kerberosAuthEnable = kerberosAuthEnable; return this; @@ -308,6 +334,16 @@ public HbaseOutputFormatBuilder setZookeeperSaslClient(String zookeeperSaslClien return this; } + public HbaseOutputFormatBuilder setClientPrincipal(String clientPrincipal) { + format.clientPrincipal = clientPrincipal; + return this; + } + + public HbaseOutputFormatBuilder setClientKeytabFile(String clientKeytabFile) { + format.clientKeytabFile = clientKeytabFile; + return this; + } + public HbaseOutputFormat finish() { Preconditions.checkNotNull(format.host, "zookeeperQuorum should be specified"); @@ -323,7 +359,7 @@ public HbaseOutputFormat finish() { String[] columns = keySet.toArray(new String[keySet.size()]); for (int i = 0; i < columns.length; ++i) { String col = columns[i]; - String[] part = StringUtils.split(col, ":");; + String[] part = col.split(":"); families[i] = part[0]; qualifiers[i] = part[1]; } @@ -336,16 +372,8 @@ public HbaseOutputFormat finish() { } - private void fillSyncKerberosConfig( org.apache.hadoop.conf.Configuration config, String regionserverKeytabFile, String regionserverPrincipal, + private void fillSyncKerberosConfig(org.apache.hadoop.conf.Configuration config, String regionserverPrincipal, String zookeeperSaslClient, String securityKrb5Conf) throws IOException { - if (StringUtils.isEmpty(regionserverKeytabFile)) { - throw new IllegalArgumentException("Must provide regionserverKeytabFile when authentication is Kerberos"); - } - String regionserverKeytabFilePath = System.getProperty("user.dir") + File.separator + regionserverKeytabFile; - LOG.info("regionserverKeytabFilePath:{}",regionserverKeytabFilePath); - config.set(HbaseConfigUtils.KEY_HBASE_MASTER_KEYTAB_FILE, regionserverKeytabFilePath); - config.set(HbaseConfigUtils.KEY_HBASE_REGIONSERVER_KEYTAB_FILE, regionserverKeytabFilePath); - if (StringUtils.isEmpty(regionserverPrincipal)) { throw new IllegalArgumentException("Must provide regionserverPrincipal when authentication is Kerberos"); } @@ -366,5 +394,17 @@ private void fillSyncKerberosConfig( org.apache.hadoop.conf.Configuration config } } + @Override + public String toString() { + return "HbaseOutputFormat kerberos{" + + "kerberosAuthEnable=" + kerberosAuthEnable + + ", regionserverKeytabFile='" + regionserverKeytabFile + '\'' + + ", regionserverPrincipal='" + regionserverPrincipal + '\'' + + ", securityKrb5Conf='" + securityKrb5Conf + '\'' + + ", zookeeperSaslClient='" + zookeeperSaslClient + '\'' + + ", clientPrincipal='" + clientPrincipal + '\'' + + ", clientKeytabFile='" + clientKeytabFile + '\'' + + '}'; + } } diff --git a/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseSink.java b/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseSink.java index 8364b6c10..09f5944b4 100644 --- a/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseSink.java +++ b/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseSink.java @@ -59,6 +59,9 @@ public class HbaseSink implements RetractStreamTableSink, IStreamSinkGener< protected String regionserverPrincipal; protected String securityKrb5Conf; protected String zookeeperSaslClient; + + private String clientPrincipal; + private String clientKeytabFile; private int parallelism = -1; @@ -83,6 +86,9 @@ public HbaseSink genStreamSink(AbstractTargetTableInfo targetTableInfo) { this.securityKrb5Conf = hbaseTableInfo.getSecurityKrb5Conf(); this.zookeeperSaslClient = hbaseTableInfo.getZookeeperSaslClient(); + this.clientKeytabFile = hbaseTableInfo.getClientKeytabFile(); + this.clientPrincipal = hbaseTableInfo.getClientPrincipal(); + Integer tmpSinkParallelism = hbaseTableInfo.getParallelism(); if (tmpSinkParallelism != null) { this.parallelism = tmpSinkParallelism; @@ -104,6 +110,9 @@ public void emitDataStream(DataStream> dataStream) { builder.setSecurityKrb5Conf(securityKrb5Conf); builder.setZookeeperSaslClient(zookeeperSaslClient); + builder.setClientPrincipal(clientPrincipal); + builder.setClientKeytabFile(clientKeytabFile); + HbaseOutputFormat outputFormat = builder.finish(); RichSinkFunction richSinkFunction = new OutputFormatSinkFunction(outputFormat); dataStream.addSink(richSinkFunction); diff --git a/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/table/HbaseSinkParser.java b/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/table/HbaseSinkParser.java index 6374ddfaf..5105e0fc0 100644 --- a/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/table/HbaseSinkParser.java +++ b/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/table/HbaseSinkParser.java @@ -61,6 +61,10 @@ public class HbaseSinkParser extends AbstractTableParser { public static final String SECURITY_KRB5_CONF_KEY = "securityKrb5Conf"; public static final String ZOOKEEPER_SASL_CLINT_KEY = "zookeeperSaslClient"; + public static final String CLIENT_PRINCIPAL_KEY = "clientPrincipal"; + public static final String CLIENT_KEYTABFILE_KEY = "clientKeytabFile"; + + @Override protected boolean fieldNameNeedsUpperCase() { return false; @@ -85,6 +89,9 @@ public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map hbaseConfig = Maps.newHashMap(); public HbaseTableInfo(){ @@ -212,4 +216,20 @@ public void setZookeeperSaslClient(String zookeeperSaslClient) { this.zookeeperSaslClient = zookeeperSaslClient; } + public String getClientPrincipal() { + return clientPrincipal; + } + + public void setClientPrincipal(String clientPrincipal) { + this.clientPrincipal = clientPrincipal; + } + + public String getClientKeytabFile() { + return clientKeytabFile; + } + + public void setClientKeytabFile(String clientKeytabFile) { + this.clientKeytabFile = clientKeytabFile; + } + } From 1f48d456f5fc236c9be7fcee85e20ea6f64d970b Mon Sep 17 00:00:00 2001 From: dapeng Date: Wed, 27 May 2020 14:16:25 +0800 Subject: [PATCH 11/28] =?UTF-8?q?=E5=BC=82=E6=AD=A5=E7=BB=B4=E8=A1=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../flink/sql/side/hbase/HbaseAsyncReqRow.java | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAsyncReqRow.java b/hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAsyncReqRow.java index f3a1d6691..26bf8ce28 100644 --- a/hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAsyncReqRow.java +++ b/hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAsyncReqRow.java @@ -98,6 +98,7 @@ public void open(Configuration parameters) throws Exception { super.open(parameters); AbstractSideTableInfo sideTableInfo = sideInfo.getSideTableInfo(); HbaseSideTableInfo hbaseSideTableInfo = (HbaseSideTableInfo) sideTableInfo; + Map hbaseConfig = hbaseSideTableInfo.getHbaseConfig(); ExecutorService executorService =new ThreadPoolExecutor(DEFAULT_POOL_SIZE, DEFAULT_POOL_SIZE, 0L, TimeUnit.MILLISECONDS, @@ -105,11 +106,18 @@ public void open(Configuration parameters) throws Exception { Config config = new Config(); config.overrideConfig(HbaseConfigUtils.KEY_HBASE_ZOOKEEPER_QUORUM, hbaseSideTableInfo.getHost()); - config.overrideConfig(HbaseConfigUtils.KEY_HBASE_ZOOKEEPER_ZNODE_QUORUM_ASYNC, hbaseSideTableInfo.getParent()); - - if (hbaseSideTableInfo.isKerberosAuthEnable()) { - fillAsyncKerberosConfig(config, hbaseSideTableInfo); + config.overrideConfig(HbaseConfigUtils.KEY_HBASE_ZOOKEEPER_ZNODE_QUORUM, hbaseSideTableInfo.getParent()); + HbaseConfigUtils.loadKrb5Conf(hbaseConfig); + hbaseConfig.entrySet().forEach(entity -> { + config.overrideConfig(entity.getKey(), (String) entity.getValue()); + }); + + if (HbaseConfigUtils.asyncOpenKerberos(hbaseConfig)) { + String jaasStr = HbaseConfigUtils.buildJaasStr(hbaseConfig); + String jaasFilePath = HbaseConfigUtils.creatJassFile(jaasStr); + config.overrideConfig(HbaseConfigUtils.KEY_JAVA_SECURITY_AUTH_LOGIN_CONF, jaasFilePath); } + hBaseClient = new HBaseClient(config, executorService); try { From 46763a77dadbb2be3e783238730d529e3524e3fd Mon Sep 17 00:00:00 2001 From: dapeng Date: Wed, 27 May 2020 14:39:28 +0800 Subject: [PATCH 12/28] =?UTF-8?q?hbase=20kerberos=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../flink/sql/side/hbase/HbaseAllReqRow.java | 50 +----- .../sql/side/hbase/HbaseAsyncReqRow.java | 27 --- .../sql/side/hbase/table/HbaseSideParser.java | 18 +- .../side/hbase/table/HbaseSideTableInfo.java | 11 ++ .../side/hbase/utils/HbaseConfigUtils.java | 167 +++++++++++++++--- 5 files changed, 161 insertions(+), 112 deletions(-) diff --git a/hbase/hbase-side/hbase-all-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAllReqRow.java b/hbase/hbase-side/hbase-all-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAllReqRow.java index c50e1a806..376bccd81 100644 --- a/hbase/hbase-side/hbase-all-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAllReqRow.java +++ b/hbase/hbase-side/hbase-all-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAllReqRow.java @@ -179,22 +179,14 @@ private void loadData(Map> tmpCache) throws SQLExcep boolean openKerberos = hbaseSideTableInfo.isKerberosAuthEnable(); int loadDataCount = 0; try { - conf = HBaseConfiguration.create(); if (openKerberos) { + conf = HbaseConfigUtils.getHadoopConfiguration(hbaseSideTableInfo.getHbaseConfig()); conf.set(HbaseConfigUtils.KEY_HBASE_ZOOKEEPER_QUORUM, hbaseSideTableInfo.getHost()); - conf.set(HbaseConfigUtils.KEY_HBASE_ZOOKEEPER_ZNODE_QUORUM_SYNC, hbaseSideTableInfo.getParent()); - - fillSyncKerberosConfig(conf,hbaseSideTableInfo); - LOG.info("hbase.security.authentication:{}", conf.get("hbase.security.authentication")); - LOG.info("hbase.security.authorization:{}", conf.get("hbase.security.authorization")); - LOG.info("hbase.master.keytab.file:{}", conf.get("hbase.master.keytab.file")); - LOG.info("hbase.master.kerberos.principal:{}", conf.get("hbase.master.kerberos.principal")); - LOG.info("hbase.regionserver.keytab.file:{}", conf.get("hbase.regionserver.keytab.file")); - LOG.info("hbase.regionserver.kerberos.principal:{}", conf.get("hbase.regionserver.kerberos.principal")); - - UserGroupInformation userGroupInformation = HbaseConfigUtils.loginAndReturnUGI(conf, hbaseSideTableInfo.getRegionserverPrincipal(), - hbaseSideTableInfo.getRegionserverKeytabFile()); + conf.set(HbaseConfigUtils.KEY_HBASE_ZOOKEEPER_ZNODE_QUORUM, hbaseSideTableInfo.getParent()); + String principal = HbaseConfigUtils.getPrincipal(hbaseSideTableInfo.getHbaseConfig()); + String keytab = HbaseConfigUtils.getKeytab(hbaseSideTableInfo.getHbaseConfig()); + UserGroupInformation userGroupInformation = HbaseConfigUtils.loginAndReturnUGI(conf, principal, keytab); Configuration finalConf = conf; conn = userGroupInformation.doAs(new PrivilegedAction() { @Override @@ -209,8 +201,9 @@ public Connection run() { }); } else { + conf = HbaseConfigUtils.getConfig(hbaseSideTableInfo.getHbaseConfig()); conf.set(HbaseConfigUtils.KEY_HBASE_ZOOKEEPER_QUORUM, hbaseSideTableInfo.getHost()); - conf.set(HbaseConfigUtils.KEY_HBASE_ZOOKEEPER_ZNODE_QUORUM_SYNC, hbaseSideTableInfo.getParent()); + conf.set(HbaseConfigUtils.KEY_HBASE_ZOOKEEPER_ZNODE_QUORUM, hbaseSideTableInfo.getParent()); conn = ConnectionFactory.createConnection(conf); } @@ -253,33 +246,4 @@ public Connection run() { } } - private void fillSyncKerberosConfig(Configuration config, HbaseSideTableInfo hbaseSideTableInfo) throws IOException { - String regionserverKeytabFile = hbaseSideTableInfo.getRegionserverKeytabFile(); - if (StringUtils.isEmpty(regionserverKeytabFile)) { - throw new IllegalArgumentException("Must provide regionserverKeytabFile when authentication is Kerberos"); - } - String regionserverKeytabFilePath = System.getProperty("user.dir") + File.separator + regionserverKeytabFile; - LOG.info("regionserverKeytabFilePath:{}", regionserverKeytabFilePath); - config.set(HbaseConfigUtils.KEY_HBASE_MASTER_KEYTAB_FILE, regionserverKeytabFilePath); - config.set(HbaseConfigUtils.KEY_HBASE_REGIONSERVER_KEYTAB_FILE, regionserverKeytabFilePath); - - String regionserverPrincipal = hbaseSideTableInfo.getRegionserverPrincipal(); - if (StringUtils.isEmpty(regionserverPrincipal)) { - throw new IllegalArgumentException("Must provide regionserverPrincipal when authentication is Kerberos"); - } - config.set(HbaseConfigUtils.KEY_HBASE_MASTER_KERBEROS_PRINCIPAL, regionserverPrincipal); - config.set(HbaseConfigUtils.KEY_HBASE_REGIONSERVER_KERBEROS_PRINCIPAL, regionserverPrincipal); - config.set(HbaseConfigUtils.KEY_HBASE_SECURITY_AUTHORIZATION, "true"); - config.set(HbaseConfigUtils.KEY_HBASE_SECURITY_AUTHENTICATION, "kerberos"); - - if (!StringUtils.isEmpty(hbaseSideTableInfo.getZookeeperSaslClient())) { - System.setProperty(HbaseConfigUtils.KEY_ZOOKEEPER_SASL_CLIENT, hbaseSideTableInfo.getZookeeperSaslClient()); - } - - if (!StringUtils.isEmpty(hbaseSideTableInfo.getSecurityKrb5Conf())) { - String krb5ConfPath = System.getProperty("user.dir") + File.separator + hbaseSideTableInfo.getSecurityKrb5Conf(); - LOG.info("krb5ConfPath:{}", krb5ConfPath); - System.setProperty(HbaseConfigUtils.KEY_JAVA_SECURITY_KRB5_CONF, krb5ConfPath); - } - } } \ No newline at end of file diff --git a/hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAsyncReqRow.java b/hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAsyncReqRow.java index 26bf8ce28..a6bfaca7a 100644 --- a/hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAsyncReqRow.java +++ b/hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAsyncReqRow.java @@ -187,33 +187,6 @@ public void close() throws Exception { hBaseClient.shutdown(); } - private void fillAsyncKerberosConfig(Config config, HbaseSideTableInfo hbaseSideTableInfo) throws IOException { - AuthUtil.JAASConfig jaasConfig = HbaseConfigUtils.buildJaasConfig(hbaseSideTableInfo); - LOG.info("jaasConfig file:\n {}", jaasConfig.toString()); - String jaasFilePath = AuthUtil.creatJaasFile("JAAS", ".conf", jaasConfig); - config.overrideConfig(HbaseConfigUtils.KEY_JAVA_SECURITY_AUTH_LOGIN_CONF, jaasFilePath); - config.overrideConfig(HbaseConfigUtils.KEY_HBASE_SECURITY_AUTH_ENABLE, "true"); - config.overrideConfig(HbaseConfigUtils.KEY_HBASE_SASL_CLIENTCONFIG, "Client"); - config.overrideConfig(HbaseConfigUtils.KEY_HBASE_SECURITY_AUTHENTICATION, "kerberos"); - - String regionserverPrincipal = hbaseSideTableInfo.getRegionserverPrincipal(); - if (StringUtils.isEmpty(regionserverPrincipal)) { - throw new IllegalArgumentException("Must provide regionserverPrincipal when authentication is Kerberos"); - } - config.overrideConfig(HbaseConfigUtils.KEY_HBASE_KERBEROS_REGIONSERVER_PRINCIPAL, regionserverPrincipal); - - if (!StringUtils.isEmpty(hbaseSideTableInfo.getZookeeperSaslClient())) { - System.setProperty(HbaseConfigUtils.KEY_ZOOKEEPER_SASL_CLIENT, hbaseSideTableInfo.getZookeeperSaslClient()); - } - - if (!StringUtils.isEmpty(hbaseSideTableInfo.getSecurityKrb5Conf())) { - String krb5ConfPath = System.getProperty("user.dir") + File.separator + hbaseSideTableInfo.getSecurityKrb5Conf(); - LOG.info("krb5ConfPath:{}", krb5ConfPath); - System.setProperty(HbaseConfigUtils.KEY_JAVA_SECURITY_KRB5_CONF, krb5ConfPath); - } - } - - class CheckResult{ private boolean connect; diff --git a/hbase/hbase-side/hbase-side-core/src/main/java/com/dtstack/flink/sql/side/hbase/table/HbaseSideParser.java b/hbase/hbase-side/hbase-side-core/src/main/java/com/dtstack/flink/sql/side/hbase/table/HbaseSideParser.java index 832961834..03868d618 100644 --- a/hbase/hbase-side/hbase-side-core/src/main/java/com/dtstack/flink/sql/side/hbase/table/HbaseSideParser.java +++ b/hbase/hbase-side/hbase-side-core/src/main/java/com/dtstack/flink/sql/side/hbase/table/HbaseSideParser.java @@ -53,13 +53,6 @@ public class HbaseSideParser extends AbstractSideTableParser { public static final String CACHE = "cache"; - public static final String KERBEROS_AUTH_ENABLE_KEY = "kerberosAuthEnable"; - public static final String REGIONSERVER_KEYTAB_FILE_KEY = "regionserverKeytabFile"; - public static final String REGIONSERVER_PRINCIPAL_KEY = "regionserverPrincipal"; - public static final String JAAS_PRINCIPAL_KEY = "jaasPrincipal"; - public static final String SECURITY_KRB5_CONF_KEY = "securityKrb5Conf"; - public static final String ZOOKEEPER_SASL_CLINT_KEY = "zookeeperSaslClient"; - public HbaseSideParser() { addParserHandler(FIELD_KEY, FIELD_PATTERN, this::dealField); } @@ -76,13 +69,10 @@ public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map entity.getKey().contains(".")) + .map(entity -> hbaseTableInfo.getHbaseConfig().put(entity.getKey(), String.valueOf(entity.getValue()))) + .count(); return hbaseTableInfo; } diff --git a/hbase/hbase-side/hbase-side-core/src/main/java/com/dtstack/flink/sql/side/hbase/table/HbaseSideTableInfo.java b/hbase/hbase-side/hbase-side-core/src/main/java/com/dtstack/flink/sql/side/hbase/table/HbaseSideTableInfo.java index 2a5411682..51597d583 100644 --- a/hbase/hbase-side/hbase-side-core/src/main/java/com/dtstack/flink/sql/side/hbase/table/HbaseSideTableInfo.java +++ b/hbase/hbase-side/hbase-side-core/src/main/java/com/dtstack/flink/sql/side/hbase/table/HbaseSideTableInfo.java @@ -50,6 +50,8 @@ public class HbaseSideTableInfo extends AbstractSideTableInfo { private Map columnNameFamily; + private Map hbaseConfig = Maps.newHashMap(); + private String tableName; private boolean kerberosAuthEnable; @@ -208,6 +210,15 @@ public void setZookeeperSaslClient(String zookeeperSaslClient) { this.zookeeperSaslClient = zookeeperSaslClient; } + public Map getHbaseConfig() { + return hbaseConfig; + } + + public void setHbaseConfig(Map hbaseConfig) { + this.hbaseConfig = hbaseConfig; + } + + @Override public void finish(){ super.finish(); diff --git a/hbase/hbase-side/hbase-side-core/src/main/java/com/dtstack/flink/sql/side/hbase/utils/HbaseConfigUtils.java b/hbase/hbase-side/hbase-side-core/src/main/java/com/dtstack/flink/sql/side/hbase/utils/HbaseConfigUtils.java index 09f2ea53d..a7708aaae 100644 --- a/hbase/hbase-side/hbase-side-core/src/main/java/com/dtstack/flink/sql/side/hbase/utils/HbaseConfigUtils.java +++ b/hbase/hbase-side/hbase-side-core/src/main/java/com/dtstack/flink/sql/side/hbase/utils/HbaseConfigUtils.java @@ -18,8 +18,6 @@ package com.dtstack.flink.sql.side.hbase.utils; -import com.dtstack.flink.sql.side.hbase.table.HbaseSideTableInfo; -import com.dtstack.flink.sql.util.AuthUtil; import org.apache.commons.collections.MapUtils; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; @@ -33,7 +31,6 @@ import java.io.FileWriter; import java.io.IOException; import java.util.Arrays; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; @@ -50,42 +47,157 @@ public class HbaseConfigUtils { private static final Logger LOG = LoggerFactory.getLogger(HbaseConfigUtils.class); // sync side kerberos - public final static String KEY_HBASE_SECURITY_AUTHENTICATION = "hbase.security.authentication"; - public final static String KEY_HBASE_SECURITY_AUTHORIZATION = "hbase.security.authorization"; - public final static String KEY_HBASE_MASTER_KEYTAB_FILE = "hbase.master.keytab.file"; - public final static String KEY_HBASE_MASTER_KERBEROS_PRINCIPAL = "hbase.master.kerberos.principal"; - public final static String KEY_HBASE_REGIONSERVER_KEYTAB_FILE = "hbase.regionserver.keytab.file"; - public final static String KEY_HBASE_REGIONSERVER_KERBEROS_PRINCIPAL = "hbase.regionserver.kerberos.principal"; + private final static String AUTHENTICATION_TYPE = "Kerberos"; + private final static String KEY_HBASE_SECURITY_AUTHENTICATION = "hbase.security.authentication"; + private final static String KEY_HBASE_SECURITY_AUTHORIZATION = "hbase.security.authorization"; + private final static String KEY_HBASE_MASTER_KERBEROS_PRINCIPAL = "hbase.master.kerberos.principal"; + private final static String KEY_HBASE_MASTER_KEYTAB_FILE = "hbase.master.keytab.file"; + private final static String KEY_HBASE_REGIONSERVER_KEYTAB_FILE = "hbase.regionserver.keytab.file"; + private final static String KEY_HBASE_REGIONSERVER_KERBEROS_PRINCIPAL = "hbase.regionserver.kerberos.principal"; // async side kerberos - public final static String KEY_HBASE_SECURITY_AUTH_ENABLE = "hbase.security.auth.enable"; - public final static String KEY_HBASE_SASL_CLIENTCONFIG = "hbase.sasl.clientconfig"; - public final static String KEY_HBASE_KERBEROS_REGIONSERVER_PRINCIPAL = "hbase.kerberos.regionserver.principal"; + private final static String KEY_HBASE_SECURITY_AUTH_ENABLE = "hbase.security.auth.enable"; + private final static String KEY_HBASE_SASL_CLIENTCONFIG = "hbase.sasl.clientconfig"; + private final static String KEY_HBASE_KERBEROS_REGIONSERVER_PRINCIPAL = "hbase.kerberos.regionserver.principal"; + private static final String KEY_KEY_TAB = "hbase.keytab"; + private static final String KEY_PRINCIPAL = "hbase.principal"; public final static String KEY_HBASE_ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum"; - public final static String KEY_HBASE_ZOOKEEPER_ZNODE_QUORUM_SYNC = "zookeeper.znode.parent"; - public final static String KEY_HBASE_ZOOKEEPER_ZNODE_QUORUM_ASYNC = "hbase.zookeeper.znode.parent"; + public final static String KEY_HBASE_ZOOKEEPER_ZNODE_QUORUM = "hbase.zookeeper.znode.parent"; - public static final String KEY_JAVA_SECURITY_KRB5_CONF = "java.security.krb5.conf"; - public static final String KEY_ZOOKEEPER_SASL_CLIENT = "zookeeper.sasl.client"; - + private static final String KEY_JAVA_SECURITY_KRB5_CONF = "java.security.krb5.conf"; public static final String KEY_JAVA_SECURITY_AUTH_LOGIN_CONF = "java.security.auth.login.config"; - public static AuthUtil.JAASConfig buildJaasConfig(HbaseSideTableInfo hbaseSideTableInfo) { - String keytabPath = System.getProperty("user.dir") + File.separator + hbaseSideTableInfo.getRegionserverKeytabFile(); - Map loginModuleOptions = new HashMap<>(); - loginModuleOptions.put("useKeyTab", "true"); - loginModuleOptions.put("useTicketCache", "false"); - loginModuleOptions.put("keyTab", "\"" + keytabPath + "\""); - loginModuleOptions.put("principal", "\"" + hbaseSideTableInfo.getJaasPrincipal() + "\""); - return AuthUtil.JAASConfig.builder().setEntryName("Client") - .setLoginModule("com.sun.security.auth.module.Krb5LoginModule") - .setLoginModuleFlag("required").setLoginModuleOptions(loginModuleOptions).build(); + private static final String SP = File.separator; + private static final String KEY_KRB5_CONF = "krb5.conf"; + + + private static List KEYS_KERBEROS_REQUIRED = Arrays.asList( + KEY_HBASE_SECURITY_AUTHENTICATION, + KEY_HBASE_MASTER_KERBEROS_PRINCIPAL, + KEY_HBASE_MASTER_KEYTAB_FILE, + KEY_HBASE_REGIONSERVER_KEYTAB_FILE, + KEY_HBASE_REGIONSERVER_KERBEROS_PRINCIPAL + ); + + private static List ASYNC_KEYS_KERBEROS_REQUIRED = Arrays.asList( + KEY_HBASE_SECURITY_AUTH_ENABLE, + KEY_HBASE_SASL_CLIENTCONFIG, + KEY_HBASE_KERBEROS_REGIONSERVER_PRINCIPAL, + KEY_HBASE_SECURITY_AUTHENTICATION, + KEY_KEY_TAB); + + + public static Configuration getConfig(Map hbaseConfigMap) { + Configuration hConfiguration = HBaseConfiguration.create(); + + for (Map.Entry entry : hbaseConfigMap.entrySet()) { + if (entry.getValue() != null && !(entry.getValue() instanceof Map)) { + hConfiguration.set(entry.getKey(), entry.getValue().toString()); + } + } + return hConfiguration; + } + + public static boolean openKerberos(Map hbaseConfigMap) { + if (!MapUtils.getBooleanValue(hbaseConfigMap, KEY_HBASE_SECURITY_AUTHORIZATION)) { + return false; + } + return AUTHENTICATION_TYPE.equalsIgnoreCase(MapUtils.getString(hbaseConfigMap, KEY_HBASE_SECURITY_AUTHENTICATION)); + } + + public static boolean asyncOpenKerberos(Map hbaseConfigMap) { + if (!MapUtils.getBooleanValue(hbaseConfigMap, KEY_HBASE_SECURITY_AUTH_ENABLE)) { + return false; + } + return AUTHENTICATION_TYPE.equalsIgnoreCase(MapUtils.getString(hbaseConfigMap, KEY_HBASE_SECURITY_AUTHENTICATION)); } + + + public static Configuration getHadoopConfiguration(Map hbaseConfigMap) { + for (String key : KEYS_KERBEROS_REQUIRED) { + if (StringUtils.isEmpty(MapUtils.getString(hbaseConfigMap, key))) { + throw new IllegalArgumentException(String.format("Must provide [%s] when authentication is Kerberos", key)); + } + } + loadKrb5Conf(hbaseConfigMap); + + Configuration conf = new Configuration(); + if (hbaseConfigMap == null) { + return conf; + } + + hbaseConfigMap.forEach((key, val) -> { + if (val != null) { + conf.set(key, val.toString()); + } + }); + + return conf; + } + + public static String getPrincipal(Map hbaseConfigMap) { + String principal = MapUtils.getString(hbaseConfigMap, KEY_HBASE_MASTER_KERBEROS_PRINCIPAL); + if (StringUtils.isNotEmpty(principal)) { + return principal; + } + + throw new IllegalArgumentException(""); + } + + public static String getKeytab(Map hbaseConfigMap) { + String keytab = MapUtils.getString(hbaseConfigMap, KEY_HBASE_MASTER_KEYTAB_FILE); + if (StringUtils.isNotEmpty(keytab)) { + return keytab; + } + + throw new IllegalArgumentException(""); + } + + public static void loadKrb5Conf(Map kerberosConfig) { + String krb5FilePath = MapUtils.getString(kerberosConfig, KEY_JAVA_SECURITY_KRB5_CONF); + if (!org.apache.commons.lang.StringUtils.isEmpty(krb5FilePath)) { + System.setProperty(KEY_JAVA_SECURITY_KRB5_CONF, krb5FilePath);; + } + } + + public static String creatJassFile(String configStr) throws IOException { + String fileName = System.getProperty("user.dir"); + File krbConf = new File(fileName); + File temp = File.createTempFile("JAAS", ".conf", krbConf); + temp.deleteOnExit(); + BufferedWriter out = new BufferedWriter(new FileWriter(temp, false)); + out.write(configStr + "\n"); + out.close(); + return temp.getAbsolutePath(); + } + + public static String buildJaasStr(Map kerberosConfig) { + for (String key : ASYNC_KEYS_KERBEROS_REQUIRED) { + if (StringUtils.isEmpty(MapUtils.getString(kerberosConfig, key))) { + throw new IllegalArgumentException(String.format("Must provide [%s] when authentication is Kerberos", key)); + } + } + + String keyTab = MapUtils.getString(kerberosConfig, KEY_KEY_TAB); + String principal = MapUtils.getString(kerberosConfig, KEY_PRINCIPAL); + + StringBuilder jaasSB = new StringBuilder("Client {\n" + + " com.sun.security.auth.module.Krb5LoginModule required\n" + + " useKeyTab=true\n" + + " useTicketCache=false\n"); + jaasSB.append(" keyTab=\"").append(keyTab).append("\"").append("\n"); + jaasSB.append(" principal=\"").append(principal).append("\"").append(";\n"); + jaasSB.append("};"); + return jaasSB.toString(); + } + + + public static UserGroupInformation loginAndReturnUGI(Configuration conf, String principal, String keytab) throws IOException { if (conf == null) { throw new IllegalArgumentException("kerberos conf can not be null"); @@ -104,5 +216,4 @@ public static UserGroupInformation loginAndReturnUGI(Configuration conf, String return UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab); } - } From 44be6d0bcc44e160c422dbacf6ac2cb1f5e9a587 Mon Sep 17 00:00:00 2001 From: dapeng Date: Wed, 3 Jun 2020 12:43:58 +0800 Subject: [PATCH 13/28] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E7=A9=BA=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E5=BC=82=E5=B8=B8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/dtstack/flink/sql/sink/hbase/HbaseOutputFormat.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseOutputFormat.java b/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseOutputFormat.java index 3b17c3551..3f18a3a20 100644 --- a/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseOutputFormat.java +++ b/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseOutputFormat.java @@ -165,7 +165,7 @@ public void writeRecord(Tuple2 tuple2) { protected void dealInsert(Row record) { Put put = getPutByRow(record); - if (put == null) { + if (put.isEmpty()) { return; } From 28ae8889c12cda67769fe59be5e657d8cc364af3 Mon Sep 17 00:00:00 2001 From: dapeng Date: Wed, 3 Jun 2020 14:05:59 +0800 Subject: [PATCH 14/28] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E7=A9=BA=E7=BD=AE?= =?UTF-8?q?=E5=88=A4=E6=96=AD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/dtstack/flink/sql/sink/hbase/HbaseOutputFormat.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseOutputFormat.java b/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseOutputFormat.java index 3f18a3a20..acb6b5448 100644 --- a/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseOutputFormat.java +++ b/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseOutputFormat.java @@ -165,7 +165,7 @@ public void writeRecord(Tuple2 tuple2) { protected void dealInsert(Row record) { Put put = getPutByRow(record); - if (put.isEmpty()) { + if (put == null || put.isEmpty()) { return; } From 9371b9d6244c74c00932df023375225ba2a568df Mon Sep 17 00:00:00 2001 From: dapeng Date: Wed, 3 Jun 2020 14:32:30 +0800 Subject: [PATCH 15/28] =?UTF-8?q?=E8=AE=B0=E5=BD=95=E8=84=8F=E6=95=B0?= =?UTF-8?q?=E6=8D=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/dtstack/flink/sql/sink/hbase/HbaseOutputFormat.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseOutputFormat.java b/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseOutputFormat.java index acb6b5448..a6a889cc4 100644 --- a/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseOutputFormat.java +++ b/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseOutputFormat.java @@ -166,12 +166,13 @@ public void writeRecord(Tuple2 tuple2) { protected void dealInsert(Row record) { Put put = getPutByRow(record); if (put == null || put.isEmpty()) { + outDirtyRecords.inc(); return; } try { table.put(put); - } catch (IOException e) { + } catch (Exception e) { if (outDirtyRecords.getCount() % DIRTY_PRINT_FREQUENCY == 0 || LOG.isDebugEnabled()) { LOG.error("record insert failed ..{}", record.toString()); LOG.error("", e); From baafc80800913455d35cb80aa5c6b32d4f005af7 Mon Sep 17 00:00:00 2001 From: dapeng Date: Wed, 3 Jun 2020 20:13:39 +0800 Subject: [PATCH 16/28] =?UTF-8?q?kerberos=20=E8=AF=AD=E5=8F=A5=E7=A4=BA?= =?UTF-8?q?=E4=BE=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docs/config.md | 5 +++ docs/plugin/hbaseSide.md | 80 +++++++++++++++++++++++++++++++++++++++- docs/plugin/hbaseSink.md | 64 ++++++++++++++++++++++++++++++-- 3 files changed, 145 insertions(+), 4 deletions(-) diff --git a/docs/config.md b/docs/config.md index 9aa8df994..27c4244e7 100644 --- a/docs/config.md +++ b/docs/config.md @@ -46,6 +46,11 @@ sh submit.sh -key1 val1 -key2 val2 * 描述:扩展jar路径,当前主要是UDF定义的jar; * 必选:否 * 默认值:无 + +* **addShipfile** + * 描述:扩展上传的文件,比如开启;Kerberos认证需要的keytab文件和krb5.conf文件 + * 必选:否 + * 默认值:无 * **confProp** * 描述:一些参数设置 diff --git a/docs/plugin/hbaseSide.md b/docs/plugin/hbaseSide.md index e590b02e7..29dc60bf9 100644 --- a/docs/plugin/hbaseSide.md +++ b/docs/plugin/hbaseSide.md @@ -43,7 +43,14 @@ | tableName | hbase 的表名称|是|| | cache | 维表缓存策略(NONE/LRU)|否|NONE| | partitionedJoin | 是否在維表join之前先根据 設定的key 做一次keyby操作(可以減少维表的数据缓存量)|否|false| - +|kerberosAuthEnable | 是否开启kerberos认证|否|false| +|regionserverPrincipal | regionserver的principal,这个值从hbase-site.xml的hbase.regionserver.kerberos.principal属性中获取|否|| +|clientKeytabFile|client的keytab 文件|否| +|clientPrincipal|client的principal|否|| +|zookeeperSaslClient | zookeeper.sasl.client值|否|true| +|securityKrb5Conf | java.security.krb5.conf值|否|| + 另外开启Kerberos认证还需要在VM参数中配置krb5, -Djava.security.krb5.conf=/Users/xuchao/Documents/flinkSql/kerberos/krb5.conf + 同时在addShipfile参数中添加keytab文件的路径,参数具体细节请看[命令参数说明](../config.md) -------------- ## 5.样例 @@ -168,4 +175,75 @@ into sideTable b on a.id=b.rowkey1 and a.name = b.rowkey2; ``` +### kerberos维表示例 +``` +CREATE TABLE MyTable( + name varchar, + channel varchar, + pv INT, + xctime bigint +)WITH( + type ='kafka11', + bootstrapServers ='172.16.8.107:9092', + zookeeperQuorum ='172.16.8.107:2181/kafka', + offsetReset ='latest', + topic ='es_test', + timezone='Asia/Shanghai', + updateMode ='append', + enableKeyPartitions ='false', + topicIsPattern ='false', + parallelism ='1' +); + +CREATE TABLE MyResult( + name varchar, + channel varchar +)WITH( + type ='mysql', + url ='jdbc:mysql://172.16.10.45:3306/test', + userName ='dtstack', + password ='abc123', + tableName ='myresult', + updateMode ='append', + parallelism ='1', + batchSize ='100', + batchWaitInterval ='1000' +); + +CREATE TABLE sideTable( + cf:name varchar as name, + cf:info varchar as info, + PRIMARY KEY(md5(name) +'test') , + PERIOD FOR SYSTEM_TIME +)WITH( + type ='hbase', + zookeeperQuorum ='172.16.10.104:2181,172.16.10.224:2181,172.16.10.252:2181', + zookeeperParent ='/hbase', + tableName ='workerinfo', + partitionedJoin ='false', + cache ='LRU', + cacheSize ='10000', + cacheTTLMs ='60000', + asyncTimeoutNum ='0', + parallelism ='1', + kerberosAuthEnable='true', + regionserverPrincipal='hbase/_HOST@DTSTACK.COM', + clientKeytabFile='test.keytab', + clientPrincipal='test@DTSTACK.COM', + securityKrb5Conf='krb5.conf', +); +insert into + MyResult +select + b.name as name, + a.channel + +from + MyTable a + +join + sideTable b + +on a.channel=b.name +``` diff --git a/docs/plugin/hbaseSink.md b/docs/plugin/hbaseSink.md index ef1be339b..5006f11a2 100644 --- a/docs/plugin/hbaseSink.md +++ b/docs/plugin/hbaseSink.md @@ -37,9 +37,17 @@ hbase2.0 |rowkey | hbase的rowkey关联的列信息,多个值以逗号隔开|是|| |updateMode|APPEND:不回撤数据,只下发增量数据,UPSERT:先删除回撤数据,然后更新|否|APPEND| |parallelism | 并行度设置|否|1| - - +|kerberosAuthEnable | 是否开启kerberos认证|否|false| +|regionserverPrincipal | regionserver的principal,这个值从hbase-site.xml的hbase.regionserver.kerberos.principal属性中获取|否|| +|clientKeytabFile|client的keytab 文件|否| +|clientPrincipal|client的principal|否|| +|zookeeperSaslClient | zookeeper.sasl.client值|否|true| +|securityKrb5Conf | java.security.krb5.conf值|否|| + 另外开启Kerberos认证还需要在VM参数中配置krb5, -Djava.security.krb5.conf=/Users/xuchao/Documents/flinkSql/kerberos/krb5.conf + 同时在addShipfile参数中添加keytab文件的路径,参数具体细节请看[命令参数说明](../config.md) ## 5.样例: + +### 普通结果表语句示例 ``` CREATE TABLE MyTable( name varchar, @@ -78,9 +86,59 @@ into channel, name from - MyTable a + MyTable a + + ``` +### kerberos认证结果表语句示例 +``` +CREATE TABLE MyTable( + name varchar, + channel varchar, + age int + )WITH( + type ='kafka10', + bootstrapServers ='172.16.8.107:9092', + zookeeperQuorum ='172.16.8.107:2181/kafka', + offsetReset ='latest', + topic ='mqTest01', + timezone='Asia/Shanghai', + updateMode ='append', + enableKeyPartitions ='false', + topicIsPattern ='false', + parallelism ='1' + ); + +CREATE TABLE MyResult( + cf:name varchar , + cf:channel varchar + )WITH( + type ='hbase', + zookeeperQuorum ='cdh2.cdhsite:2181,cdh4.cdhsite:2181', + zookeeperParent ='/hbase', + tableName ='myresult', + partitionedJoin ='false', + parallelism ='1', + rowKey='name', + kerberosAuthEnable='true', + regionserverPrincipal='hbase/_HOST@DTSTACK.COM', + clientKeytabFile='test.keytab', + clientPrincipal='test@DTSTACK.COM', + securityKrb5Conf='krb5.conf', + ); + +insert +into + MyResult + select + channel, + name + from + MyTable a + +``` + ## 6.hbase数据 ### 数据内容说明 hbase的rowkey 构建规则:以描述的rowkey字段值作为key,多个字段以'-'连接 From a923fff36b088b48cb79424962f4e3c153d3ba3b Mon Sep 17 00:00:00 2001 From: dapeng Date: Thu, 4 Jun 2020 21:08:11 +0800 Subject: [PATCH 17/28] =?UTF-8?q?=E6=B7=BB=E5=8A=A0addshipfile=E5=8F=82?= =?UTF-8?q?=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/com/dtstack/flink/sql/option/Options.java | 13 +++++++++++++ .../launcher/perjob/PerJobClusterClientBuilder.java | 7 +++++++ 2 files changed, 20 insertions(+) diff --git a/core/src/main/java/com/dtstack/flink/sql/option/Options.java b/core/src/main/java/com/dtstack/flink/sql/option/Options.java index ba6296d1e..b7d425a53 100644 --- a/core/src/main/java/com/dtstack/flink/sql/option/Options.java +++ b/core/src/main/java/com/dtstack/flink/sql/option/Options.java @@ -72,6 +72,10 @@ public class Options { @OptionRequired(description = "log level") private String logLevel = "info"; + @OptionRequired(description = "file add to ship file") + private String addShipfile; + + public String getMode() { return mode; } @@ -183,4 +187,13 @@ public String getLogLevel() { public void setLogLevel(String logLevel) { this.logLevel = logLevel; } + + public String getAddShipfile() { + return addShipfile; + } + + public void setAddShipfile(String addShipfile) { + this.addShipfile = addShipfile; + } + } diff --git a/launcher/src/main/java/com/dtstack/flink/sql/launcher/perjob/PerJobClusterClientBuilder.java b/launcher/src/main/java/com/dtstack/flink/sql/launcher/perjob/PerJobClusterClientBuilder.java index 5dc74900e..9d99f2928 100644 --- a/launcher/src/main/java/com/dtstack/flink/sql/launcher/perjob/PerJobClusterClientBuilder.java +++ b/launcher/src/main/java/com/dtstack/flink/sql/launcher/perjob/PerJobClusterClientBuilder.java @@ -106,6 +106,13 @@ public AbstractYarnClusterDescriptor createPerJobClusterDescriptor(String flinkJ } else { throw new RuntimeException("The Flink jar path is null"); } + // add user customized file to shipfile + if (!StringUtils.isBlank(launcherOptions.getAddShipfile())) { + List paths = ConfigParseUtil.parsePathFromStr(launcherOptions.getAddShipfile()); + paths.forEach(path -> { + shipFiles.add(new File(path)); + }); + } // classpath , all node need contain plugin jar String pluginLoadMode = launcherOptions.getPluginLoadMode(); if (StringUtils.equalsIgnoreCase(pluginLoadMode, EPluginLoadMode.CLASSPATH.name())) { From 4b462d842256fcb49e99c31f1e183715a2ba2537 Mon Sep 17 00:00:00 2001 From: dapeng Date: Thu, 4 Jun 2020 21:10:26 +0800 Subject: [PATCH 18/28] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E7=BC=BA=E5=A4=B1?= =?UTF-8?q?=E7=9A=84=E7=B1=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../sql/launcher/perjob/ConfigParseUtil.java | 32 ++++++++----------- .../perjob/PerJobClusterClientBuilder.java | 19 ++++++----- 2 files changed, 25 insertions(+), 26 deletions(-) rename clickhouse/clickhouse-sink/src/main/java/com/dtstack/flink/sql/sink/clickhouse/ClickhouseDialect.java => launcher/src/main/java/com/dtstack/flink/sql/launcher/perjob/ConfigParseUtil.java (55%) diff --git a/clickhouse/clickhouse-sink/src/main/java/com/dtstack/flink/sql/sink/clickhouse/ClickhouseDialect.java b/launcher/src/main/java/com/dtstack/flink/sql/launcher/perjob/ConfigParseUtil.java similarity index 55% rename from clickhouse/clickhouse-sink/src/main/java/com/dtstack/flink/sql/sink/clickhouse/ClickhouseDialect.java rename to launcher/src/main/java/com/dtstack/flink/sql/launcher/perjob/ConfigParseUtil.java index 29bf54798..5acdb59a1 100644 --- a/clickhouse/clickhouse-sink/src/main/java/com/dtstack/flink/sql/sink/clickhouse/ClickhouseDialect.java +++ b/launcher/src/main/java/com/dtstack/flink/sql/launcher/perjob/ConfigParseUtil.java @@ -16,31 +16,27 @@ * limitations under the License. */ -package com.dtstack.flink.sql.sink.clickhouse; +package com.dtstack.flink.sql.launcher.perjob; -import com.dtstack.flink.sql.sink.rdb.dialect.JDBCDialect; +import org.apache.commons.io.Charsets; -import java.util.Optional; +import java.io.UnsupportedEncodingException; +import java.net.URLDecoder; +import java.util.Arrays; +import java.util.List; /** - * Date: 2020/1/15 + * Date: 2019/12/28 * Company: www.dtstack.com * @author maqi */ -public class ClickhouseDialect implements JDBCDialect { +public class ConfigParseUtil { - @Override - public boolean canHandle(String url) { - return url.startsWith("jdbc:clickhouse:"); - } - - @Override - public Optional defaultDriverName() { - return Optional.of("ru.yandex.clickhouse.ClickHouseDriver"); - } - - @Override - public String getUpdateStatement(String tableName, String[] fieldNames, String[] conditionFields) { - throw new RuntimeException("Clickhouse does not support update sql, please remove primary key or use append mode"); + public static List parsePathFromStr(String pathStr) throws UnsupportedEncodingException { + String addjarPath = URLDecoder.decode(pathStr, Charsets.UTF_8.toString()); + if (addjarPath.length() > 2) { + addjarPath = addjarPath.substring(1,addjarPath.length()-1).replace("\"",""); + } + return Arrays.asList(addjarPath.split(",")); } } diff --git a/launcher/src/main/java/com/dtstack/flink/sql/launcher/perjob/PerJobClusterClientBuilder.java b/launcher/src/main/java/com/dtstack/flink/sql/launcher/perjob/PerJobClusterClientBuilder.java index 9d99f2928..84a4e23a3 100644 --- a/launcher/src/main/java/com/dtstack/flink/sql/launcher/perjob/PerJobClusterClientBuilder.java +++ b/launcher/src/main/java/com/dtstack/flink/sql/launcher/perjob/PerJobClusterClientBuilder.java @@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory; import java.io.File; +import java.io.UnsupportedEncodingException; import java.net.MalformedURLException; import java.net.URL; import java.util.ArrayList; @@ -82,7 +83,7 @@ public void init(String yarnConfDir, Configuration flinkConfig, Properties userC } public AbstractYarnClusterDescriptor createPerJobClusterDescriptor(String flinkJarPath, Options launcherOptions, JobGraph jobGraph) - throws MalformedURLException { + throws MalformedURLException, UnsupportedEncodingException { String flinkConf = StringUtils.isEmpty(launcherOptions.getFlinkconf()) ? DEFAULT_CONF_DIR : launcherOptions.getFlinkconf(); AbstractYarnClusterDescriptor clusterDescriptor = getClusterDescriptor(flinkConfig, yarnConf, flinkConf); @@ -106,13 +107,7 @@ public AbstractYarnClusterDescriptor createPerJobClusterDescriptor(String flinkJ } else { throw new RuntimeException("The Flink jar path is null"); } - // add user customized file to shipfile - if (!StringUtils.isBlank(launcherOptions.getAddShipfile())) { - List paths = ConfigParseUtil.parsePathFromStr(launcherOptions.getAddShipfile()); - paths.forEach(path -> { - shipFiles.add(new File(path)); - }); - } + // classpath , all node need contain plugin jar String pluginLoadMode = launcherOptions.getPluginLoadMode(); if (StringUtils.equalsIgnoreCase(pluginLoadMode, EPluginLoadMode.CLASSPATH.name())) { @@ -125,6 +120,14 @@ public AbstractYarnClusterDescriptor createPerJobClusterDescriptor(String flinkJ + " Currently only classpath and shipfile are supported."); } + // add user customized file to shipfile + if (!StringUtils.isBlank(launcherOptions.getAddShipfile())) { + List paths = ConfigParseUtil.parsePathFromStr(launcherOptions.getAddShipfile()); + paths.forEach(path -> { + shipFiles.add(new File(path)); + }); + } + clusterDescriptor.addShipFiles(shipFiles); clusterDescriptor.setName(launcherOptions.getName()); String queue = launcherOptions.getQueue(); From 0b85867a644e3d1ff6990f1506466c0bcb827e5c Mon Sep 17 00:00:00 2001 From: dapeng Date: Mon, 8 Jun 2020 18:04:50 +0800 Subject: [PATCH 19/28] =?UTF-8?q?redis=20=20sink=20=E9=87=8D=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../sql/sink/redis/RedisOutputFormat.java | 79 +++++++------------ .../flink/sql/sink/redis/enums/RedisType.java | 33 ++++++++ 2 files changed, 62 insertions(+), 50 deletions(-) create mode 100644 redis5/redis5-sink/src/main/java/com/dtstack/flink/sql/sink/redis/enums/RedisType.java diff --git a/redis5/redis5-sink/src/main/java/com/dtstack/flink/sql/sink/redis/RedisOutputFormat.java b/redis5/redis5-sink/src/main/java/com/dtstack/flink/sql/sink/redis/RedisOutputFormat.java index ae4fe5a4b..ab97cf60e 100644 --- a/redis5/redis5-sink/src/main/java/com/dtstack/flink/sql/sink/redis/RedisOutputFormat.java +++ b/redis5/redis5-sink/src/main/java/com/dtstack/flink/sql/sink/redis/RedisOutputFormat.java @@ -19,6 +19,8 @@ package com.dtstack.flink.sql.sink.redis; import com.dtstack.flink.sql.outputformat.AbstractDtRichOutputFormat; +import com.dtstack.flink.sql.sink.redis.enums.RedisType; +import com.google.common.collect.Maps; import org.apache.commons.lang3.StringUtils; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -35,11 +37,7 @@ import java.io.Closeable; import java.io.IOException; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Set; +import java.util.*; /** * @author yanxi @@ -49,7 +47,7 @@ public class RedisOutputFormat extends AbstractDtRichOutputFormat { private String url; - private String database; + private String database = "0"; private String tableName; @@ -71,7 +69,7 @@ public class RedisOutputFormat extends AbstractDtRichOutputFormat { protected List primaryKeys; - protected int timeout; + protected int timeout = 10000; private JedisPool pool; @@ -121,29 +119,21 @@ private void establishConnection() { String[] ipPortPair = StringUtils.split(ipPort, ":"); addresses.add(new HostAndPort(ipPortPair[0].trim(), Integer.valueOf(ipPortPair[1].trim()))); } - if (timeout == 0){ - timeout = 10000; - } - if (database == null) - { - database = "0"; - } - switch (redisType){ - //单机 - case 1: + switch (RedisType.parse(redisType)){ + case STANDALONE: pool = new JedisPool(poolConfig, firstIp, Integer.parseInt(firstPort), timeout, password, Integer.parseInt(database)); jedis = pool.getResource(); break; - //哨兵 - case 2: + case SENTINEL: jedisSentinelPool = new JedisSentinelPool(masterName, ipPorts, poolConfig, timeout, password, Integer.parseInt(database)); jedis = jedisSentinelPool.getResource(); break; - //集群 - case 3: + case CLUSTER: jedis = new JedisCluster(addresses, timeout, timeout, 10, password, poolConfig); + break; default: + throw new RuntimeException("unsupport redis type[ " + redisType + "]"); } } @@ -158,36 +148,14 @@ public void writeRecord(Tuple2 record) throws IOException { if (row.getArity() != fieldNames.length) { return; } - - HashMap map = new HashMap<>(8); - for (String primaryKey : primaryKeys) { - for (int i = 0; i < fieldNames.length; i++) { - if (fieldNames[i].equals(primaryKey)) { - map.put(primaryKey, i); - } - } - } - - List kvList = new LinkedList<>(); - for (String primaryKey : primaryKeys){ - StringBuilder primaryKv = new StringBuilder(); - int index = map.get(primaryKey).intValue(); - primaryKv.append(primaryKey).append(":").append(row.getField(index)); - kvList.add(primaryKv.toString()); - } - - String perKey = String.join(":", kvList); - for (int i = 0; i < fieldNames.length; i++) { - StringBuilder key = new StringBuilder(); - key.append(tableName).append(":").append(perKey).append(":").append(fieldNames[i]); - - String value = "null"; - Object field = row.getField(i); - if (field != null) { - value = field.toString(); - } - jedis.set(key.toString(), value); + Map refData = Maps.newHashMap(); + for(int i = 0; i < fieldNames.length; i++){ + refData.put(fieldNames[i], row.getField(i)); } + String redisKey = buildCacheKey(refData); + refData.entrySet().forEach(e ->{ + jedis.hset(redisKey, e.getKey(), String.valueOf(e.getValue())); + }); if (outRecords.getCount() % ROW_PRINT_FREQUENCY == 0){ LOG.info(record.toString()); @@ -211,6 +179,17 @@ public void close() throws IOException { } + public String buildCacheKey(Map refData) { + StringBuilder keyBuilder = new StringBuilder(tableName); + for(String primaryKey : primaryKeys){ + if(!refData.containsKey(primaryKey)){ + return null; + } + keyBuilder.append("_").append(refData.get(primaryKey)); + } + return keyBuilder.toString(); + } + public static RedisOutputFormatBuilder buildRedisOutputFormat(){ return new RedisOutputFormatBuilder(); } diff --git a/redis5/redis5-sink/src/main/java/com/dtstack/flink/sql/sink/redis/enums/RedisType.java b/redis5/redis5-sink/src/main/java/com/dtstack/flink/sql/sink/redis/enums/RedisType.java new file mode 100644 index 000000000..7a4054dfc --- /dev/null +++ b/redis5/redis5-sink/src/main/java/com/dtstack/flink/sql/sink/redis/enums/RedisType.java @@ -0,0 +1,33 @@ +package com.dtstack.flink.sql.sink.redis.enums; + +public enum RedisType { + /** + * 单机 + */ + STANDALONE(1), + /** + * 哨兵 + */ + SENTINEL(2), + /** + * 集群 + */ + CLUSTER(3); + int type; + RedisType(int type){ + this.type = type; + } + + public int getType(){ + return type; + } + + public static RedisType parse(int redisType){ + for(RedisType type : RedisType.values()){ + if(type.getType() == redisType){ + return type; + } + } + throw new RuntimeException("unsupport redis type["+ redisType + "]"); + } +} From eee1867ec18dd68f8527b2180cb0592b548dae12 Mon Sep 17 00:00:00 2001 From: dapeng Date: Mon, 8 Jun 2020 21:27:39 +0800 Subject: [PATCH 20/28] =?UTF-8?q?=E6=96=87=E6=A1=A3=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docs/plugin/redisSink.md | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/docs/plugin/redisSink.md b/docs/plugin/redisSink.md index 103cb4997..eb0109f06 100644 --- a/docs/plugin/redisSink.md +++ b/docs/plugin/redisSink.md @@ -94,7 +94,8 @@ redis5.0 ## 6.redis完整样例 ### redis数据说明 -redis使用k-v格式存储,key的构建格式为tableName:privateKey:privateKeyValue:columnName, value=columnValue +redis使用散列类型 hash 数据结构,key=tableName_primaryKey1_primaryKey2,value={column1=value1, column2=value2} +如果以班级class表为例,id和name作为联合主键,那么redis的结构为 ### 源表数据内容 ``` @@ -103,10 +104,10 @@ redis使用k-v格式存储,key的构建格式为tableName:privateKey:privateKe ### redis实际数据内容 ``` 127.0.0.1:6379> keys * -1) "resultTable:name:roc:name" -2) "resultTable:name:roc:channel" -127.0.0.1:6379> get "resultTable:name:roc:name" -"roc" -127.0.0.1:6379> get "resultTable:name:roc:channel" -"daishu" +1) "resultTable_roc" +127.0.0.1:6379> hgetall resultTable_roc +1) "channel" +2) "daishu" +3) "name" +4) "roc" ``` \ No newline at end of file From 512320f7604267e908999f6b2ade01e1f4685f1e Mon Sep 17 00:00:00 2001 From: dapeng Date: Fri, 12 Jun 2020 15:09:52 +0800 Subject: [PATCH 21/28] =?UTF-8?q?=E4=BF=AE=E6=94=B9fastjson=E7=89=88?= =?UTF-8?q?=E6=9C=AC=E4=B8=BA1.2.70?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- launcher/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/launcher/pom.xml b/launcher/pom.xml index ea921d87a..df9e8ae29 100644 --- a/launcher/pom.xml +++ b/launcher/pom.xml @@ -38,7 +38,7 @@ com.alibaba fastjson - 1.2.7 + 1.2.70 From eb5a0c8d2ce8e378bd5fa49e0f494a9b20cde090 Mon Sep 17 00:00:00 2001 From: dapeng Date: Mon, 22 Jun 2020 10:20:15 +0800 Subject: [PATCH 22/28] =?UTF-8?q?=E6=95=B0=E6=8D=AE=E6=BA=90=E9=98=9F?= =?UTF-8?q?=E5=88=97=E6=BA=A2=E5=87=BA=E4=BF=9D=E6=8A=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../sql/side/rdb/async/RdbAsyncReqRow.java | 58 ++++++++++--------- 1 file changed, 32 insertions(+), 26 deletions(-) diff --git a/rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncReqRow.java b/rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncReqRow.java index 2374e4d2a..518f457a6 100644 --- a/rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncReqRow.java +++ b/rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncReqRow.java @@ -121,35 +121,41 @@ private void connectWithRetry(Map inputParams, CRow input, Resul AtomicLong failCounter = new AtomicLong(0); AtomicBoolean finishFlag = new AtomicBoolean(false); while(!finishFlag.get()){ - CountDownLatch latch = new CountDownLatch(1); - rdbSqlClient.getConnection(conn -> { - try { - if(conn.failed()){ - connectionStatus.set(false); - if(failCounter.getAndIncrement() % 1000 == 0){ - LOG.error("getConnection error", conn.cause()); - } - if(failCounter.get() >= sideInfo.getSideTableInfo().getConnectRetryMaxNum(100)){ - resultFuture.completeExceptionally(conn.cause()); - finishFlag.set(true); + try{ + CountDownLatch latch = new CountDownLatch(1); + rdbSqlClient.getConnection(conn -> { + try { + if(conn.failed()){ + connectionStatus.set(false); + if(failCounter.getAndIncrement() % 1000 == 0){ + LOG.error("getConnection error", conn.cause()); + } + if(failCounter.get() >= sideInfo.getSideTableInfo().getConnectRetryMaxNum(100)){ + resultFuture.completeExceptionally(conn.cause()); + finishFlag.set(true); + } + return; } - return; + connectionStatus.set(true); + ScheduledFuture timerFuture = registerTimer(input, resultFuture); + cancelTimerWhenComplete(resultFuture, timerFuture); + handleQuery(conn.result(), inputParams, input, resultFuture); + finishFlag.set(true); + } catch (Exception e) { + dealFillDataError(input, resultFuture, e); + } finally { + latch.countDown(); } - connectionStatus.set(true); - ScheduledFuture timerFuture = registerTimer(input, resultFuture); - cancelTimerWhenComplete(resultFuture, timerFuture); - handleQuery(conn.result(), inputParams, input, resultFuture); - finishFlag.set(true); - } catch (Exception e) { - dealFillDataError(input, resultFuture, e); - } finally { - latch.countDown(); + }); + try { + latch.await(); + } catch (InterruptedException e) { + LOG.error("", e); } - }); - try { - latch.await(); - } catch (InterruptedException e) { - LOG.error("", e); + + } catch (Exception e){ + //数据源队列溢出情况 + connectionStatus.set(false); } if(!finishFlag.get()){ try { From 8a3d6efc5fac41a52e5994b5d8a3a3fc45774ef8 Mon Sep 17 00:00:00 2001 From: gkd Date: Tue, 23 Jun 2020 15:03:56 +0800 Subject: [PATCH 23/28] =?UTF-8?q?rdb=20float=E6=95=B0=E6=8D=AE=E6=A0=BC?= =?UTF-8?q?=E5=BC=8F=E8=BD=AC=E6=8D=A2=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/dtstack/flink/sql/sink/rdb/JDBCTypeConvertUtils.java | 2 +- .../com/dtstack/flink/sql/sink/rdb/writer/AppendOnlyWriter.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/JDBCTypeConvertUtils.java b/rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/JDBCTypeConvertUtils.java index a41cad5ef..3813386bc 100644 --- a/rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/JDBCTypeConvertUtils.java +++ b/rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/JDBCTypeConvertUtils.java @@ -106,7 +106,7 @@ public static void setField(PreparedStatement upload, int type, Object field, in break; case java.sql.Types.FLOAT: case java.sql.Types.DOUBLE: - upload.setDouble(index + 1, (double) field); + upload.setDouble(index + 1, Double.parseDouble(field.toString())); break; case java.sql.Types.DECIMAL: case java.sql.Types.NUMERIC: diff --git a/rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/writer/AppendOnlyWriter.java b/rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/writer/AppendOnlyWriter.java index 3559d4376..7c3ff4b09 100644 --- a/rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/writer/AppendOnlyWriter.java +++ b/rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/writer/AppendOnlyWriter.java @@ -118,11 +118,11 @@ public void executeUpdate(Connection connection) { } catch (SQLException e1) { throw new RuntimeException(e1); } - metricOutputFormat.outDirtyRecords.inc(); if (metricOutputFormat.outDirtyRecords.getCount() % DIRTYDATA_PRINT_FREQUENTY == 0 || LOG.isDebugEnabled()) { LOG.error("record insert failed ,this row is {}", row.toString()); LOG.error("", e); } + metricOutputFormat.outDirtyRecords.inc(); } }); rows.clear(); From 81b35d39d7162356bb7dd61abb734fe8a538f356 Mon Sep 17 00:00:00 2001 From: tiezhu Date: Wed, 24 Jun 2020 10:35:05 +0800 Subject: [PATCH 24/28] fix redmine-27342 --- .../flink/sql/side/rdb/all/AbstractRdbAllReqRow.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/all/AbstractRdbAllReqRow.java b/rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/all/AbstractRdbAllReqRow.java index 082c546e9..23b7b19d4 100644 --- a/rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/all/AbstractRdbAllReqRow.java +++ b/rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/all/AbstractRdbAllReqRow.java @@ -93,7 +93,6 @@ protected void initCache() throws SQLException { protected void reloadCache() { //reload cacheRef and replace to old cacheRef Map>> newCache = Maps.newConcurrentMap(); - cacheRef.set(newCache); try { loadData(newCache); } catch (SQLException e) { @@ -123,6 +122,11 @@ public void flatMap(CRow value, Collector out) throws Exception { List> cacheList = cacheRef.get().get(cacheKey); if (CollectionUtils.isEmpty(cacheList) && sideInfo.getJoinType() == JoinType.LEFT) { out.collect(new CRow(fillData(value.row(), null), value.change())); + return; + } + + if (CollectionUtils.isEmpty(cacheList)) { + return; } cacheList.forEach(one -> out.collect(new CRow(fillData(value.row(), one), value.change()))); From 3a3bf1c909e463e45a483c6b5241f9ab5aee38ea Mon Sep 17 00:00:00 2001 From: tiezhu Date: Wed, 24 Jun 2020 14:58:11 +0800 Subject: [PATCH 25/28] fix redmine 27343 --- .../java/com/dtstack/flink/sql/parser/InsertSqlParser.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/com/dtstack/flink/sql/parser/InsertSqlParser.java b/core/src/main/java/com/dtstack/flink/sql/parser/InsertSqlParser.java index e9d8cc179..2cacd01d4 100644 --- a/core/src/main/java/com/dtstack/flink/sql/parser/InsertSqlParser.java +++ b/core/src/main/java/com/dtstack/flink/sql/parser/InsertSqlParser.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package com.dtstack.flink.sql.parser; @@ -153,6 +152,7 @@ private static void parseNode(SqlNode sqlNode, SqlParseResult sqlParseResult){ /** * 将第一层 select 中的 sqlNode 转化为 AsNode,解决字段名冲突问题 + * 仅对 table.xx 这种类型的字段进行替换 * @param selectList select Node 的 select 字段 * @param sqlSelect 第一层解析出来的 selectNode */ @@ -160,7 +160,8 @@ private static void rebuildSelectNode(SqlNodeList selectList, SqlSelect sqlSelec SqlNodeList sqlNodes = new SqlNodeList(selectList.getParserPosition()); for (int index = 0; index < selectList.size(); index++) { - if (selectList.get(index).getKind().equals(SqlKind.AS)) { + if (selectList.get(index).getKind().equals(SqlKind.AS) + || ((SqlIdentifier) selectList.get(index)).names.size() == 1) { sqlNodes.add(selectList.get(index)); continue; } From 5656872f481d1420c675a5c75a3ec6159868a746 Mon Sep 17 00:00:00 2001 From: wuren Date: Fri, 3 Jul 2020 15:54:42 +0800 Subject: [PATCH 26/28] fix create view with cep --- .gitignore | 1 + .../dtstack/flink/sql/parser/CreateTmpTableParser.java | 10 +++++----- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/.gitignore b/.gitignore index a32168f1a..d7842e1ed 100644 --- a/.gitignore +++ b/.gitignore @@ -8,6 +8,7 @@ target/ *.eclipse.* *.iml plugins/ +sqlplugins/ lib/ .vertx/ .DS_Store diff --git a/core/src/main/java/com/dtstack/flink/sql/parser/CreateTmpTableParser.java b/core/src/main/java/com/dtstack/flink/sql/parser/CreateTmpTableParser.java index 9ec5dcc0c..c0b55f7ee 100644 --- a/core/src/main/java/com/dtstack/flink/sql/parser/CreateTmpTableParser.java +++ b/core/src/main/java/com/dtstack/flink/sql/parser/CreateTmpTableParser.java @@ -22,11 +22,7 @@ import com.dtstack.flink.sql.util.DtStringUtil; import org.apache.calcite.config.Lex; -import org.apache.calcite.sql.SqlBasicCall; -import org.apache.calcite.sql.SqlJoin; -import org.apache.calcite.sql.SqlKind; -import org.apache.calcite.sql.SqlNode; -import org.apache.calcite.sql.SqlSelect; +import org.apache.calcite.sql.*; import org.apache.calcite.sql.parser.SqlParseException; import org.apache.calcite.sql.parser.SqlParser; import com.google.common.collect.Lists; @@ -164,6 +160,10 @@ private static void parseNode(SqlNode sqlNode, CreateTmpTableParser.SqlParserRes parseNode(unionRight, sqlParseResult); } break; + case MATCH_RECOGNIZE: + SqlMatchRecognize node = (SqlMatchRecognize) sqlNode; + sqlParseResult.addSourceTable(node.getTableRef().toString()); + break; default: //do nothing break; From e0a10435dcb243a911c0405daebc6aa667d5119d Mon Sep 17 00:00:00 2001 From: xuchao Date: Mon, 6 Jul 2020 15:43:27 +0800 Subject: [PATCH 27/28] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E4=BB=A3=E7=A0=81?= =?UTF-8?q?=E5=90=88=E5=B9=B6=E5=87=BA=E7=8E=B0=E7=9A=84=E7=B1=BB=E4=B8=A2?= =?UTF-8?q?=E5=A4=B1=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../sink/clickhouse/ClickhouseDialect.java | 46 +++++++++++++++++++ 1 file changed, 46 insertions(+) create mode 100644 clickhouse/clickhouse-sink/src/main/java/com/dtstack/flink/sql/sink/clickhouse/ClickhouseDialect.java diff --git a/clickhouse/clickhouse-sink/src/main/java/com/dtstack/flink/sql/sink/clickhouse/ClickhouseDialect.java b/clickhouse/clickhouse-sink/src/main/java/com/dtstack/flink/sql/sink/clickhouse/ClickhouseDialect.java new file mode 100644 index 000000000..29bf54798 --- /dev/null +++ b/clickhouse/clickhouse-sink/src/main/java/com/dtstack/flink/sql/sink/clickhouse/ClickhouseDialect.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.flink.sql.sink.clickhouse; + +import com.dtstack.flink.sql.sink.rdb.dialect.JDBCDialect; + +import java.util.Optional; + +/** + * Date: 2020/1/15 + * Company: www.dtstack.com + * @author maqi + */ +public class ClickhouseDialect implements JDBCDialect { + + @Override + public boolean canHandle(String url) { + return url.startsWith("jdbc:clickhouse:"); + } + + @Override + public Optional defaultDriverName() { + return Optional.of("ru.yandex.clickhouse.ClickHouseDriver"); + } + + @Override + public String getUpdateStatement(String tableName, String[] fieldNames, String[] conditionFields) { + throw new RuntimeException("Clickhouse does not support update sql, please remove primary key or use append mode"); + } +} From 27c0f8b4428c664b65492617003cca3551a4bee2 Mon Sep 17 00:00:00 2001 From: dapeng Date: Mon, 13 Jul 2020 11:36:19 +0800 Subject: [PATCH 28/28] =?UTF-8?q?=E5=85=BC=E5=AE=B9primaryKey?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../dtstack/flink/sql/sink/redis/table/RedisSinkParser.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/redis5/redis5-sink/src/main/java/com/dtstack/flink/sql/sink/redis/table/RedisSinkParser.java b/redis5/redis5-sink/src/main/java/com/dtstack/flink/sql/sink/redis/table/RedisSinkParser.java index e965eeecb..8961f7da9 100644 --- a/redis5/redis5-sink/src/main/java/com/dtstack/flink/sql/sink/redis/table/RedisSinkParser.java +++ b/redis5/redis5-sink/src/main/java/com/dtstack/flink/sql/sink/redis/table/RedisSinkParser.java @@ -50,11 +50,11 @@ public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map primaryKeysList = Lists.newArrayList(); if (!StringUtils.isEmpty(primaryKeysStr)) { + List primaryKeysList = Lists.newArrayList(); primaryKeysList = Arrays.asList(StringUtils.split(primaryKeysStr, ",")); + redisTableInfo.setPrimaryKeys(primaryKeysList); } - redisTableInfo.setPrimaryKeys(primaryKeysList); return redisTableInfo; }