Skip to content

Commit

Permalink
extract cert and key from JKS
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzhichang committed Jan 19, 2021
1 parent 2e86805 commit 1f595b5
Show file tree
Hide file tree
Showing 7 changed files with 126 additions and 12 deletions.
15 changes: 10 additions & 5 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,16 @@ type KafkaConfig struct {
Brokers string
Version string
TLS struct {
Enable bool
CaCertFiles string // Required. It's the CA certificate with which Kafka brokers certs be signed.
ClientCertFile string // Required if Kafka brokers require client authentication.
ClientKeyFile string // Required if and only if ClientCertFile is present.
InsecureSkipVerify bool // Whether disable broker FQDN verification.
Enable bool
CaCertFiles string // Required. It's the CA cert.pem with which Kafka brokers certs be signed.
ClientCertFile string // Required for client authentication. It's client cert.pem.
ClientKeyFile string // Required if and only if ClientCertFile is present. It's client key.pem.

TrustStoreLocation string //JKS format of CA certificate, used to extract CA cert.pem.
TrustStorePassword string
KeystoreLocation string //JKS format of client certificate and key, used to extrace client cert.pem and key.pem.
KeystorePassword string
EndpIdentAlgo string
}
//simplified sarama.Config.Net.SASL to only support SASL/PLAIN and SASL/GSSAPI(Kerberos)
Sasl struct {
Expand Down
2 changes: 0 additions & 2 deletions docs/configuration/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@
clientCertFile: "",
// Required if and only if ClientCertFile is present.
clientKeyFile: "",
// Whether disable broker FQDN verification. Set it to `true` if kafka-console-consumer.sh uses `ssl.endpoint.identification.algorithm=`.
"insecureSkipVerify": true
}
// SASL
Expand Down
21 changes: 19 additions & 2 deletions docs/dev/introduction.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,23 @@ An example kafka config:
- Encryption using SSL

An example kafka config:
```
"kafka": {
"brokers": "192.168.31.64:9093",
"version": "2.2.1",
"tls": {
"enable": true,
"@trustStoreLocation": "ssl.truststore.location which kafka-console-consumer.sh uses",
"trustStoreLocation": "/etc/security/kafka.client.truststore.jks",
"@trustStorePassword": "ssl.truststore.password which kafka-console-consumer.sh uses",
"trustStorePassword": "123456",
"@endpIdentAlgo": "ssl.endpoint.identification.algorithm which kafka-console-consumer.sh uses",
"endpIdentAlgo": ""
}
}
```

Or if you have extracted certificates from JKS, use the following config:
```
"kafka": {
"brokers": "192.168.31.64:9093",
Expand All @@ -66,8 +83,8 @@ An example kafka config:
"enable": true,
"@caCertFiles": "Required. It's the CA certificate with which Kafka brokers certs be signed. This cert is added to kafka.client.truststore.jks which kafka-console-consumer.sh uses",
"caCertFiles": "/etc/security/ca-cert",
"@insecureSkipVerify": "Whether disable broker FQDN verification. Set it to `true` if kafka-console-consumer.sh uses `ssl.endpoint.identification.algorithm=`.",
"insecureSkipVerify": true
"@endpIdentAlgo": "ssl.endpoint.identification.algorithm which kafka-console-consumer.sh uses",
"endpIdentAlgo": ""
}
}
```
Expand Down
12 changes: 11 additions & 1 deletion input/kafka_go.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,20 @@ func (k *KafkaGo) Init(cfg *config.Config, taskName string, putFn func(msg model
CommitInterval: time.Second, // flushes commits to Kafka every second
ErrorLogger: log.StandardLogger(), //kafka-go INFO log is too verbose
}
if kfkCfg.TLS.CaCertFiles == "" && kfkCfg.TLS.TrustStoreLocation != "" {
if kfkCfg.TLS.CaCertFiles, _, err = util.JksToPem(kfkCfg.TLS.TrustStoreLocation, kfkCfg.TLS.TrustStorePassword, false); err != nil {
return
}
}
if kfkCfg.TLS.ClientKeyFile == "" && kfkCfg.TLS.KeystoreLocation != "" {
if kfkCfg.TLS.ClientCertFile, kfkCfg.TLS.ClientKeyFile, err = util.JksToPem(kfkCfg.TLS.KeystoreLocation, kfkCfg.TLS.KeystorePassword, false); err != nil {
return
}
}
var dialer *kafka.Dialer
if kfkCfg.TLS.Enable {
var tlsConfig *tls.Config
if tlsConfig, err = util.NewTLSConfig(kfkCfg.TLS.CaCertFiles, kfkCfg.TLS.ClientCertFile, kfkCfg.TLS.ClientKeyFile, kfkCfg.TLS.InsecureSkipVerify); err != nil {
if tlsConfig, err = util.NewTLSConfig(kfkCfg.TLS.CaCertFiles, kfkCfg.TLS.ClientCertFile, kfkCfg.TLS.ClientKeyFile, kfkCfg.TLS.EndpIdentAlgo == ""); err != nil {
return
}
dialer = &kafka.Dialer{
Expand Down
12 changes: 11 additions & 1 deletion input/kafka_sarama.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,19 @@ func (k *KafkaSarama) Init(cfg *config.Config, taskName string, putFn func(msg m
return
}
sarama.Logger = log.StandardLogger()
if kfkCfg.TLS.CaCertFiles == "" && kfkCfg.TLS.TrustStoreLocation != "" {
if kfkCfg.TLS.CaCertFiles, _, err = util.JksToPem(kfkCfg.TLS.TrustStoreLocation, kfkCfg.TLS.TrustStorePassword, false); err != nil {
return
}
}
if kfkCfg.TLS.ClientKeyFile == "" && kfkCfg.TLS.KeystoreLocation != "" {
if kfkCfg.TLS.ClientCertFile, kfkCfg.TLS.ClientKeyFile, err = util.JksToPem(kfkCfg.TLS.KeystoreLocation, kfkCfg.TLS.KeystorePassword, false); err != nil {
return
}
}
if kfkCfg.TLS.Enable {
config.Net.TLS.Enable = true
if config.Net.TLS.Config, err = util.NewTLSConfig(kfkCfg.TLS.CaCertFiles, kfkCfg.TLS.ClientCertFile, kfkCfg.TLS.ClientKeyFile, kfkCfg.TLS.InsecureSkipVerify); err != nil {
if config.Net.TLS.Config, err = util.NewTLSConfig(kfkCfg.TLS.CaCertFiles, kfkCfg.TLS.ClientCertFile, kfkCfg.TLS.ClientKeyFile, kfkCfg.TLS.EndpIdentAlgo == ""); err != nil {
return
}
}
Expand Down
52 changes: 51 additions & 1 deletion util/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,22 @@ limitations under the License.
package util

import (
"bytes"
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"log"
"net"
"os"
"os/exec"
"path/filepath"
"runtime"
"strconv"
"strings"
"time"

log "github.com/sirupsen/logrus"

"github.com/fagongzi/goetty"
"github.com/pkg/errors"
)
Expand Down Expand Up @@ -163,3 +167,49 @@ func EnvBoolVar(value *bool, key string) {
*value = true
}
}

// JksToPem converts JKS to PEM
// Refers to:
// https://serverfault.com/questions/715827/how-to-generate-key-and-crt-file-from-jks-file-for-httpd-apache-server
func JksToPem(jksPath, jksPassword string, overwrite bool) (certPemPath, keyPemPath string, err error) {
dir, fn := filepath.Split(jksPath)
certPemPath = filepath.Join(dir, fn+".cert.pem")
keyPemPath = filepath.Join(dir, fn+".key.pem")
pkcs12Path := filepath.Join(dir, fn+".p12")
if overwrite {
for _, fp := range []string{certPemPath, keyPemPath, pkcs12Path} {
if err = os.RemoveAll(fp); err != nil {
err = errors.Wrapf(err, "")
return
}
}
} else {
for _, fp := range []string{certPemPath, keyPemPath, pkcs12Path} {
if _, err = os.Stat(fp); err == nil {
return
}
}
}
cmds := [][]string{
{"keytool", "-importkeystore", "-srckeystore", jksPath, "-destkeystore", pkcs12Path, "-deststoretype", "PKCS12"},
{"openssl", "pkcs12", "-in", pkcs12Path, "-nokeys", "-out", certPemPath, "-passin", "env:password"},
{"openssl", "pkcs12", "-in", pkcs12Path, "-nodes", "-nocerts", "-out", keyPemPath, "-passin", "env:password"},
}
for _, cmd := range cmds {
log.Infof(strings.Join(cmd, " "))
exe := exec.Command(cmd[0], cmd[1:]...)
if cmd[0] == "keytool" {
exe.Stdin = bytes.NewReader([]byte(jksPassword + "\n" + jksPassword + "\n" + jksPassword))
} else if cmd[0] == "openssl" {
exe.Env = []string{fmt.Sprintf("password=%s", jksPassword)}
}
var out []byte
out, err = exe.CombinedOutput()
log.Infof(string(out))
if err != nil {
err = errors.Wrapf(err, "")
return
}
}
return
}
24 changes: 24 additions & 0 deletions util/common_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package util

import (
"fmt"
"os"
"testing"

"github.com/stretchr/testify/require"
)

func TestJksToPem(t *testing.T) {
var err error
var certPemPath, keyPemPath string
jksPaths := []string{"kafka.client.truststore.jks", "kafka.client.keystore.jks"}
jksPassword := "123456"
for _, jksPath := range jksPaths {
if _, err = os.Stat(jksPath); err != nil {
continue
}
certPemPath, keyPemPath, err = JksToPem(jksPath, jksPassword, true)
require.Nil(t, err, "err should be nothing")
fmt.Printf("converted %s to %s, %s\n", jksPath, certPemPath, keyPemPath)
}
}

0 comments on commit 1f595b5

Please sign in to comment.