From fe5ac28d00ea54cf3f65cd5eb4581ced263f2e46 Mon Sep 17 00:00:00 2001 From: d3akhtar Date: Sat, 30 Nov 2024 18:05:42 -0500 Subject: [PATCH] RequestReply: Add CESQL function #8318 --- .../cesql_correlationid_filter.go | 227 +++++++++++++++++ .../cesql_correlationid_filter_test.go | 235 ++++++++++++++++++ .../subscriptionsapi/cesql_filter.go | 2 +- .../subscriptionsapi/cesql_filter_test.go | 2 +- 4 files changed, 464 insertions(+), 2 deletions(-) create mode 100644 pkg/eventfilter/subscriptionsapi/cesql_correlationid_filter.go create mode 100644 pkg/eventfilter/subscriptionsapi/cesql_correlationid_filter_test.go diff --git a/pkg/eventfilter/subscriptionsapi/cesql_correlationid_filter.go b/pkg/eventfilter/subscriptionsapi/cesql_correlationid_filter.go new file mode 100644 index 00000000000..6a0b1fcf3ee --- /dev/null +++ b/pkg/eventfilter/subscriptionsapi/cesql_correlationid_filter.go @@ -0,0 +1,227 @@ +/* +Copyright 2022 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package subscriptionsapi + +import ( + "context" + "crypto/aes" + "crypto/cipher" + "crypto/des" + "crypto/rc4" + "encoding/base64" + "encoding/hex" + "errors" + "os" + "path" + "regexp" + "strings" + + cloudevents "github.com/cloudevents/sdk-go/v2" + "knative.dev/eventing/pkg/eventfilter" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/clientcmd" + + cesql "github.com/cloudevents/sdk-go/sql/v2" + cefn "github.com/cloudevents/sdk-go/sql/v2/function" + ceruntime "github.com/cloudevents/sdk-go/sql/v2/runtime" +) + +// Add a user defined function to validate correlation id, then return a cesql_filter +func NewCESQLCorrelationIdFilter(expr string) (eventfilter.Filter, error) { + + var correlationIdFilterFunction = cefn.NewFunction( + "KN_VERIFY_CORRELATIONID", + []cesql.Type{cesql.StringType}, + cesql.TypePtr(cesql.StringType), + cesql.BooleanType, + func(event cloudevents.Event, i []interface{}) (interface{}, error) { + correlationId := i[0].(string) + + match, _ := regexp.MatchString(".*:.*", correlationId) + if !match { + return false, errors.New("correlationId Format: :") + } + + slice := strings.Split(correlationId, ":") + originalId := slice[0] + encryptedId := slice[1] + + encryptedIdBytes, err := decodeBase64OrHex(encryptedId) + if err != nil { + return false, err + } + + // Create a set of secret names to try looking for in k8s + secretNamesToTry := make(map[string]bool) + + // Iterate through secret names in argument list and add them to the set + for num, secret := range i { + if num != 0 { + secretNamesToTry[secret.(string)] = true + } + } + + secrets, err := getSecretsFromK8s() + if err != nil { + return false, err + } + + /* + * Go through each retrived secret + * Check if the secret has data for a key and algorithm + * Check if the secret name matches with one of the arguments + * Decrypt encryptedId using the current secret's key + * Check if the encrypted value matches with originalId + */ + for _, secret := range secrets { + key, keyFieldExists := secret.Data["key"] + algorithm, algorithmFieldExists := secret.Data["algorithm"] + + if keyFieldExists && algorithmFieldExists && secretNamesToTry[secret.Name] { + var decryptionFunc func(originalId string, encryptedIdBytes []byte, key []byte) (bool, error) + switch strings.ToUpper(string(algorithm)) { + case "AES", "AES-ECB": + decryptionFunc = compareWithAES + case "DES": + decryptionFunc = compareWithDES + case "3DES", "TRIPLEDES": + decryptionFunc = compareWithTripleDES + case "RC4": + decryptionFunc = compareWithRC4 + default: + return false, errors.New("cipher algorithm not supported") + } + + res, err := decryptionFunc(originalId, encryptedIdBytes, key) + if err != nil { + return false, err + } + if res { + return true, nil + } + } + } + + return false, nil + }, + ) + + ceruntime.AddFunction(correlationIdFilterFunction) + + return NewCESQLFilter(expr) +} + +func getSecretsFromK8s() ([]v1.Secret, error) { + // Assuming .kube/config is in the home directory + basePath, _ := os.UserHomeDir() + defaultKubeConfigPath := path.Join(basePath, ".kube", "config") + + // Set up k8s client to get secrets + config, err := clientcmd.BuildConfigFromFlags("", defaultKubeConfigPath) + if err != nil { + return nil, err + } + clientset, err := kubernetes.NewForConfig(config) + if err != nil { + return nil, err + } + secrets, err := clientset.CoreV1().Secrets("default").List(context.Background(), metav1.ListOptions{}) + if err != nil { + return nil, err + } + + return secrets.Items, nil +} + +func decodeBase64OrHex(encryptedId string) ([]byte, error) { + hexRegex := regexp.MustCompile("^([0-9A-Fa-f]+)$") + base64Regex := regexp.MustCompile(`^(?:[A-Za-z0-9+/]{4})*(?:[A-Za-z0-9+/]{2}==|[A-Za-z0-9+/]{3}=|[A-Za-z0-9+/]{4})$`) + + encryptedIdBytes := []byte(encryptedId) + + if hexRegex.Match(encryptedIdBytes) { + return hex.DecodeString(encryptedId) + } else if base64Regex.Match(encryptedIdBytes) { + return base64.StdEncoding.DecodeString(encryptedId) + } else { + return nil, errors.New("encryptedId must either be Base64 or Hex encoded") + } +} + +func compareWithAES(originalId string, encryptedIdBytes []byte, key []byte) (bool, error) { + block, err := aes.NewCipher(key) + if err != nil { + return false, err + } + plainText := getPlaintextFromBlock(block, encryptedIdBytes) + + return plainText == originalId, nil +} + +func compareWithDES(originalId string, encryptedIdBytes []byte, key []byte) (bool, error) { + block, err := des.NewCipher(key) + if err != nil { + return false, nil + } + plainText := getPlaintextFromBlock(block, encryptedIdBytes) + + return plainText == originalId, nil +} + +func compareWithTripleDES(originalId string, encryptedIdBytes []byte, key []byte) (bool, error) { + block, err := des.NewTripleDESCipher(key) + if err != nil { + return false, nil + } + plainText := getPlaintextFromBlock(block, encryptedIdBytes) + + return plainText == originalId, nil +} + +func compareWithRC4(originalId string, encryptedIdBytes []byte, key []byte) (bool, error) { + cipher, err := rc4.NewCipher(key) + if err != nil { + return false, nil + } + out := make([]byte, len(encryptedIdBytes)) + cipher.XORKeyStream(out, encryptedIdBytes) + plainText := string(out) + + return plainText == originalId, nil +} + +func getPlaintextFromBlock(block cipher.Block, encryptedIdBytes []byte) string { + plainText := make([]byte, len(encryptedIdBytes)) + for i, j := 0, block.BlockSize(); i < len(encryptedIdBytes); i, j = i+block.BlockSize(), j+block.BlockSize() { + block.Decrypt(plainText[i:j], encryptedIdBytes[i:j]) + } + trim := 0 + if len(plainText) > 0 { + trim = len(plainText) - int(plainText[len(plainText)-1]) + } + + if trim >= 0 { + trimmedPlaintext := string(plainText[:trim]) + + return trimmedPlaintext + } + + return "" +} diff --git a/pkg/eventfilter/subscriptionsapi/cesql_correlationid_filter_test.go b/pkg/eventfilter/subscriptionsapi/cesql_correlationid_filter_test.go new file mode 100644 index 00000000000..161e9766005 --- /dev/null +++ b/pkg/eventfilter/subscriptionsapi/cesql_correlationid_filter_test.go @@ -0,0 +1,235 @@ +/* +Copyright 2022 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package subscriptionsapi + +import ( + "context" + "errors" + "os" + "path" + "testing" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/clientcmd" + + cloudevents "github.com/cloudevents/sdk-go/v2" + + "knative.dev/eventing/pkg/eventfilter" +) + +// Info to create k8s secret object during testing +type Secret struct { + Name string + Key string + Algorithm string +} + +var secrets = [5]Secret{ + { + Name: "secret1", + Key: "aesEncryptionKey", + Algorithm: "AES", + }, + { + Name: "secret2", + Key: "abcabcdefdefmnop", + Algorithm: "AES-ECB", + }, + { + Name: "secret3", + Key: "desEncKe", + Algorithm: "DES", + }, + { + Name: "secret4", + Key: "tripleDesKeyForEncrypter", + Algorithm: "3DES", + }, + { + Name: "secret5", + Key: "rc4EncKey", + Algorithm: "RC4", + }, +} + +func TestCESQLCorrelationIdFilter(t *testing.T) { + tests := map[string]struct { + expression string + event *cloudevents.Event + want eventfilter.FilterResult + }{ + "1. CorrelationId encoded in hex matches with 'secret1'": { + expression: "KN_VERIFY_CORRELATIONID('randomId1:2826A47C1C3325A5899235911B6F546F', 'secret1')", + want: eventfilter.PassFilter, + }, + "2. CorrelationId encoded in base64 matches with 'secret1'": { + expression: "KN_VERIFY_CORRELATIONID('randomId1:KCakfBwzJaWJkjWRG29Ubw==', 'secret1')", + want: eventfilter.PassFilter, + }, + "3. CorrelationId encoded in hex matches with 'secret2": { + expression: "KN_VERIFY_CORRELATIONID('randomId1:dcd075b3679cf81325a09e0786b87f87', 'secret2')", + want: eventfilter.PassFilter, + }, + "4. CorrelationId encoded in base64 matches with 'secret2'": { + expression: "KN_VERIFY_CORRELATIONID('randomId1:3NB1s2ec+BMloJ4Hhrh/hw==', 'secret2')", + want: eventfilter.PassFilter, + }, + "5. CorrelationId encoded in base64 matches with 'secret3'": { + expression: "KN_VERIFY_CORRELATIONID('randomId1:qpCNy1Dy5aXWnXzgiAfg6w==', 'secret3')", + want: eventfilter.PassFilter, + }, + "6. CorrelationId encoded in hex matches with 'secret3'": { + expression: "KN_VERIFY_CORRELATIONID('randomId1:aa908dcb50f2e5a5d69d7ce08807e0eb', 'secret3')", + want: eventfilter.PassFilter, + }, + "7. CorrelationId encoded in base64 matches with 'secret4'": { + expression: "KN_VERIFY_CORRELATIONID('randomId1:2jA1dTIjigEqR5eOruRjxA==', 'secret4')", + want: eventfilter.PassFilter, + }, + "8. CorrelationId encoded in hex matches with 'secret4": { + expression: "KN_VERIFY_CORRELATIONID('randomId1:da30357532238a012a47978eaee463c4', 'secret4')", + want: eventfilter.PassFilter, + }, + "9. CorrelationId encoded in base64 matches with 'secret5'": { + expression: "KN_VERIFY_CORRELATIONID('randomId1:Jl9EcLh2qv8t', 'secret5')", + want: eventfilter.PassFilter, + }, + "10. CorrelationId encoded in hex matches with 'secret5": { + expression: "KN_VERIFY_CORRELATIONID('randomId1:265f4470b876aaff2d', 'secret5')", + want: eventfilter.PassFilter, + }, + "11. CorrelationId encoded in hex fails to match for 'secret2' and 'secret3'": { + expression: "KN_VERIFY_CORRELATIONID('randomId1:2826A47C1C3325A5899235911B6F546F', 'secret2', 'secret3')", + want: eventfilter.FailFilter, + }, + "12. CorrelationId encoded in base64 fails to match for 'secret2' and 'secret3'": { + expression: "KN_VERIFY_CORRELATIONID('randomId1:KCakfBwzJaWJkjWRG29Ubw==', 'secret2', 'secret3')", + want: eventfilter.FailFilter, + }, + "13. CorrelationId encoded in base64 fails to match for 'secret6'": { + expression: "KN_VERIFY_CORRELATIONID('randomId1:KCakfBwzJaWJkjWRG29Ubw==', 'secret6')", + want: eventfilter.FailFilter, + }, + "14. CorrelationId encoded in hex matches with 'secret2'": { + expression: "KN_VERIFY_CORRELATIONID('97cdf5c1-7826-4e4b-9406-c1d9f18b7740:349c52325d623791549f83a3cffb328bd68a3e2f641227b804c2570c39a6b4d3c7bc25b1e39b5c5539eedfb21c6b0084', 'secret2')", + want: eventfilter.PassFilter, + }, + "15. CorrelationId encoded in base64 matches with 'secret2'": { + expression: "KN_VERIFY_CORRELATIONID('97cdf5c1-7826-4e4b-9406-c1d9f18b7740:NJxSMl1iN5FUn4Ojz/syi9aKPi9kEie4BMJXDDmmtNPHvCWx45tcVTnu37IcawCE', 'secret2')", + want: eventfilter.PassFilter, + }, + "16. CorrelationId encoded in base64 matches with 'secret1'": { + expression: "KN_VERIFY_CORRELATIONID('b2e5c373-4454-46d7-805d-e2038263ae3e:/deOV4nw7c8xJm7KjDVDNtavo4XtiHe1acaLbooTJ13AXgORw/PWsDdhHb3qkXBI', 'secret1')", + want: eventfilter.PassFilter, + }, + "17. CorrelationId encoded in hex matches with 'secret4'": { + expression: "KN_VERIFY_CORRELATIONID('c59d9e75-12dd-4ff6-b721-2c5418c92c94:e3014db9448418d55189bfc6ad4533071c2a1cc2eb3698dc1c061e8b06c240f64aca74132532d4bc', 'secret4')", + want: eventfilter.PassFilter, + }, + } + + deleteK8TestSecrets() + + err := createK8TestSecrets() + if err != nil { + t.Fatalf("Error creating k8s secrets for testing. %v", err) + } + + for name, tt := range tests { + t.Run(name, func(t *testing.T) { + e := tt.event + if e == nil { + e = makeEvent() + } + f, err := NewCESQLCorrelationIdFilter(tt.expression) + if err != nil { + t.Fatalf("Error instanciating CESQL CorrelationId filter. %v", err) + } + if got := f.Filter(context.TODO(), *e); got != tt.want { + t.Errorf("Filter() = %v, want %v", got, tt.want) + } + }) + } + + err = deleteK8TestSecrets() + if err != nil { + t.Fatalf("Error deleting k8s secrets for testing. %v", err) + } +} +func initClient() (kubernetes.Clientset, error) { + clientset := &kubernetes.Clientset{} + + // Assuming .kube/config is in the home directory + basePath, _ := os.UserHomeDir() + defaultKubeConfigPath := path.Join(basePath, ".kube", "config") + + // Set up k8s client to get secrets + config, err := clientcmd.BuildConfigFromFlags("", defaultKubeConfigPath) + if err != nil { + return *clientset, err + } + clientset, err = kubernetes.NewForConfig(config) + if err != nil { + return *clientset, err + } + + return *clientset, nil +} + +func createK8TestSecrets() error { + clientset, err := initClient() + if err != nil { + return err + } + + for _, secret := range secrets { + _, err := clientset.CoreV1().Secrets("default").Get(context.TODO(), secret.Name, metav1.GetOptions{}) + if err == nil { + return errors.New(secret.Name + " already exists") + } + + data := map[string][]byte{ + "key": []byte(secret.Key), + "algorithm": []byte(secret.Algorithm), + } + objectMetadata := metav1.ObjectMeta{Name: secret.Name} + k8secret := &v1.Secret{Data: data, ObjectMeta: objectMetadata} + + _, err = clientset.CoreV1().Secrets("default").Create(context.TODO(), k8secret, metav1.CreateOptions{}) + + if err != nil { + return err + } + } + + return nil +} + +func deleteK8TestSecrets() error { + clientset, err := initClient() + if err != nil { + return err + } + + for _, secret := range secrets { + _ = clientset.CoreV1().Secrets("default").Delete(context.TODO(), secret.Name, metav1.DeleteOptions{}) + } + + return nil +} diff --git a/pkg/eventfilter/subscriptionsapi/cesql_filter.go b/pkg/eventfilter/subscriptionsapi/cesql_filter.go index 00a6d2ae325..c190faaf5d2 100644 --- a/pkg/eventfilter/subscriptionsapi/cesql_filter.go +++ b/pkg/eventfilter/subscriptionsapi/cesql_filter.go @@ -66,7 +66,7 @@ func (filter *ceSQLFilter) Filter(ctx context.Context, event cloudevents.Event) } if !res.(bool) { - logger.Debugw("CESOL match failed.", zap.String("expression", filter.rawExpression), zap.Any("event", event)) + logger.Debugw("CESQL match failed.", zap.String("expression", filter.rawExpression), zap.Any("event", event)) return eventfilter.FailFilter } return eventfilter.PassFilter diff --git a/pkg/eventfilter/subscriptionsapi/cesql_filter_test.go b/pkg/eventfilter/subscriptionsapi/cesql_filter_test.go index 93a10c281ba..1ea952e2be9 100644 --- a/pkg/eventfilter/subscriptionsapi/cesql_filter_test.go +++ b/pkg/eventfilter/subscriptionsapi/cesql_filter_test.go @@ -78,7 +78,7 @@ func TestCESQLFilter(t *testing.T) { } f, err := NewCESQLFilter(tt.expression) if err != nil { - t.Fatalf("Error inistanciating CESQL filter. %v", err) + t.Fatalf("Error instanciating CESQL filter. %v", err) } if got := f.Filter(context.TODO(), *e); got != tt.want { t.Errorf("Filter() = %v, want %v", got, tt.want)