From 1685347a38768d90fcd7e8d550e6a60526a01039 Mon Sep 17 00:00:00 2001 From: maura fortino Date: Mon, 30 Sep 2024 10:13:54 -0400 Subject: [PATCH 1/8] added docker-compose file for testing --- docker-compose.yml | 143 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 143 insertions(+) create mode 100644 docker-compose.yml diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 00000000..c7edf416 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,143 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +--- +version: '2' +services: + controller-1: + image: ${IMAGE} + environment: + KAFKA_NODE_ID: 1 + KAFKA_PROCESS_ROLES: 'controller' + KAFKA_CONTROLLER_QUORUM_VOTERS: '1@controller-1:9093,2@controller-2:9093,3@controller-3:9093' + KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT' + KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER' + KAFKA_LISTENERS: 'CONTROLLER://:9093' + CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw' + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs' + + controller-2: + image: ${IMAGE} + environment: + KAFKA_NODE_ID: 2 + KAFKA_PROCESS_ROLES: 'controller' + KAFKA_CONTROLLER_QUORUM_VOTERS: '1@controller-1:9093,2@controller-2:9093,3@controller-3:9093' + KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT' + KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER' + KAFKA_LISTENERS: 'CONTROLLER://:9093' + CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw' + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs' + + controller-3: + image: ${IMAGE} + environment: + KAFKA_NODE_ID: 3 + KAFKA_PROCESS_ROLES: 'controller' + KAFKA_CONTROLLER_QUORUM_VOTERS: '1@controller-1:9093,2@controller-2:9093,3@controller-3:9093' + KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT' + KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER' + KAFKA_LISTENERS: 'CONTROLLER://:9093' + CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw' + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs' + + kafka-1: + image: ${IMAGE} + ports: + - 29092:9092 + hostname: kafka-1 + container_name: kafka-1 + environment: + KAFKA_NODE_ID: 4 + KAFKA_PROCESS_ROLES: 'broker' + KAFKA_CONTROLLER_QUORUM_VOTERS: '1@controller-1:9093,2@controller-2:9093,3@controller-3:9093' + KAFKA_LISTENERS: 'PLAINTEXT://:19092,PLAINTEXT_HOST://:9092' + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT' + KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT' + KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka-1:19092,PLAINTEXT_HOST://localhost:29092' + KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER' + CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw' + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs' + depends_on: + - controller-1 + - controller-2 + - controller-3 + + kafka-2: + image: ${IMAGE} + ports: + - 39092:9092 + hostname: kafka-2 + container_name: kafka-2 + environment: + KAFKA_NODE_ID: 5 + KAFKA_PROCESS_ROLES: 'broker' + KAFKA_CONTROLLER_QUORUM_VOTERS: '1@controller-1:9093,2@controller-2:9093,3@controller-3:9093' + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT' + KAFKA_LISTENERS: 'PLAINTEXT://:19092,PLAINTEXT_HOST://:9092' + KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT' + KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka-2:19092,PLAINTEXT_HOST://localhost:39092' + KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER' + CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw' + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs' + depends_on: + - controller-1 + - controller-2 + - controller-3 + + kafka-3: + image: ${IMAGE} + ports: + - 49092:9092 + hostname: kafka-3 + container_name: kafka-3 + environment: + KAFKA_NODE_ID: 6 + KAFKA_PROCESS_ROLES: 'broker' + KAFKA_CONTROLLER_QUORUM_VOTERS: '1@controller-1:9093,2@controller-2:9093,3@controller-3:9093' + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT' + KAFKA_LISTENERS: 'PLAINTEXT://:19092,PLAINTEXT_HOST://:9092' + KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT' + KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka-3:19092,PLAINTEXT_HOST://localhost:49092' + KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER' + CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw' + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs' + depends_on: + - controller-1 + - controller-2 + - controller-3 \ No newline at end of file From 604dd838fce0e4a6892b5caf3d039f0685127ac3 Mon Sep 17 00:00:00 2001 From: maura fortino Date: Fri, 4 Oct 2024 10:28:38 -0400 Subject: [PATCH 2/8] inital hashing logic added to kafka --- cmd/main.go | 23 ------------ internal/sink/hasher.go | 81 +++++++++++++++++++++++++++++++++++++++++ internal/sink/sink.go | 33 +++++++++++++---- 3 files changed, 106 insertions(+), 31 deletions(-) delete mode 100644 cmd/main.go create mode 100644 internal/sink/hasher.go diff --git a/cmd/main.go b/cmd/main.go deleted file mode 100644 index c30113a3..00000000 --- a/cmd/main.go +++ /dev/null @@ -1,23 +0,0 @@ -// SPDX-FileCopyrightText: 2024 Comcast Cable Communications Management, LLC -// SPDX-License-Identifier: Apache-2.0 - -package main - -import ( - "fmt" - "os" - - "github.com/xmidt-org/caduceus" -) - -func main() { - - err := caduceus.Caduceus(os.Args[1:], true) - - if err == nil { - return - } - - fmt.Fprintln(os.Stderr, err) - os.Exit(-1) -} diff --git a/internal/sink/hasher.go b/internal/sink/hasher.go new file mode 100644 index 00000000..6a1dc0ec --- /dev/null +++ b/internal/sink/hasher.go @@ -0,0 +1,81 @@ +// SPDX-FileCopyrightText: 2024 Comcast Cable Communications Management, LLC +// SPDX-License-Identifier: Apache-2.0 +package sink + +import ( + "fmt" + "hash/crc32" + "reflect" + "sort" + + "github.com/xmidt-org/wrp-go/v3" +) + +type Node struct { + hash int + sink string +} + +type HashRing []Node + +func (h HashRing) Len() int { + return len(h) +} +func (h HashRing) Less(i, j int) bool { + return h[i].hash < h[j].hash +} +func (h HashRing) Swap(i, j int) { + h[i], h[j] = h[j], h[i] +} + +func (h HashRing) GetServer(key string) string { + if len(h) == 0 { + return "" + } + hash := int(crc32.ChecksumIEEE([]byte(key))) + idx := sort.Search(len(h), func(i int) bool { + return h[i].hash >= hash + }) + if idx == len(h) { + idx = 0 + } + return h[idx].sink +} + +func (h *HashRing) AddServer(server string) { + hash := int(crc32.ChecksumIEEE([]byte(server))) + node := Node{hash: hash, sink: server} + *h = append(*h, node) + sort.Sort(h) +} + +func (h *HashRing) RemoveServer(server string) { + hash := int(crc32.ChecksumIEEE([]byte(server))) + for i, node := range *h { + if node.hash == hash { + *h = append((*h)[:i], (*h)[i+1:]...) + break + } + } + sort.Sort(h) +} + +func NewRing() *HashRing { + return &HashRing{} +} + +func GetKey(field string, msg *wrp.Message) string { + + v := reflect.ValueOf(msg) + if v.Kind() == reflect.Ptr { + v = v.Elem() // Dereference pointer if necessary + } + + value := v.FieldByName(field) + if value.IsValid() { + return fmt.Sprintf("%v", value.Interface()) + } + + return "" + +} diff --git a/internal/sink/sink.go b/internal/sink/sink.go index 373aa6ee..294d50ac 100644 --- a/internal/sink/sink.go +++ b/internal/sink/sink.go @@ -44,13 +44,18 @@ type WebhookV1 struct { type Webhooks []*WebhookV1 type CommonWebhook struct { id string + hashField string failureUrl string deliveryInterval time.Duration deliveryRetries int mutex sync.RWMutex logger *zap.Logger } -type Kafkas []*Kafka +type KafkaSink struct { + Kafkas []*Kafka + Hash *HashRing + HashField string +} type Kafka struct { brokerAddr []string topic string @@ -76,11 +81,15 @@ func NewSink(c Config, logger *zap.Logger, listener ancla.Register) Sink { return whs } if len(l.Registration.Kafkas) > 0 { - var sink Kafkas - for _, k := range l.Registration.Kafkas { + var sink KafkaSink + r := NewRing() + sink.HashField = l.Registration.Hash.Field + for i, k := range l.Registration.Kafkas { kafka := &Kafka{} kafka.Update(l.GetId(), "quickstart-events", k.RetryHint.MaxRetry, k.BootstrapServers, logger) - sink = append(sink, kafka) + sink.Kafkas = append(sink.Kafkas, kafka) + key := l.Registration.Hash.Field + strconv.Itoa(i) + r.AddServer(key) } return sink } @@ -91,8 +100,7 @@ func NewSink(c Config, logger *zap.Logger, listener ancla.Register) Sink { } func (v1 *WebhookV1) Update(c Config, l *zap.Logger, altUrls []string, id, failureUrl, receiverUrl string) (err error) { - //TODO: is there anything else that needs to be done for this? - //do we need to return an error + //TODO: do we need to return an error if not - we should get rid of the error return v1.id = id v1.failureUrl = failureUrl v1.deliveryInterval = c.DeliveryInterval @@ -338,11 +346,20 @@ func (k *Kafka) Update(id, topic string, retries int, servers []string, logger * return nil } -func (k Kafkas) Send(secret string, acceptType string, msg *wrp.Message) error { +func (k KafkaSink) Send(secret string, acceptType string, msg *wrp.Message) error { + key := GetKey(k.HashField, msg) + for i, kafka := range k.Kafkas { + hash := k.HashField + strconv.Itoa(i) + server := k.Hash.GetServer(key) + if server == hash { + _ = kafka.send(secret, acceptType, msg) + } + + } //TODO: discuss with wes and john the default hashing logic //for now: when no hash is given we will just loop through all the kafkas var errs error - for _, kafka := range k { + for _, kafka := range k.Kafkas { err := kafka.send(secret, acceptType, msg) if err != nil { errs = errors.Join(errs, err) From 7f7060c346a12f11c88f79ad722620e199c5d983 Mon Sep 17 00:00:00 2001 From: maura fortino Date: Fri, 4 Oct 2024 14:17:54 -0400 Subject: [PATCH 3/8] fixed panic: added hashring to sink --- internal/sink/sink.go | 44 +++++++++++++++++++++++++------------------ 1 file changed, 26 insertions(+), 18 deletions(-) diff --git a/internal/sink/sink.go b/internal/sink/sink.go index 294d50ac..5b3c76f5 100644 --- a/internal/sink/sink.go +++ b/internal/sink/sink.go @@ -44,7 +44,6 @@ type WebhookV1 struct { type Webhooks []*WebhookV1 type CommonWebhook struct { id string - hashField string failureUrl string deliveryInterval time.Duration deliveryRetries int @@ -88,9 +87,12 @@ func NewSink(c Config, logger *zap.Logger, listener ancla.Register) Sink { kafka := &Kafka{} kafka.Update(l.GetId(), "quickstart-events", k.RetryHint.MaxRetry, k.BootstrapServers, logger) sink.Kafkas = append(sink.Kafkas, kafka) - key := l.Registration.Hash.Field + strconv.Itoa(i) - r.AddServer(key) + if l.Registration.Hash.Field != "" { + key := l.Registration.Hash.Field + strconv.Itoa(i) + r.AddServer(key) + } } + sink.Hash = r return sink } default: @@ -347,22 +349,28 @@ func (k *Kafka) Update(id, topic string, retries int, servers []string, logger * } func (k KafkaSink) Send(secret string, acceptType string, msg *wrp.Message) error { - key := GetKey(k.HashField, msg) - for i, kafka := range k.Kafkas { - hash := k.HashField + strconv.Itoa(i) - server := k.Hash.GetServer(key) - if server == hash { - _ = kafka.send(secret, acceptType, msg) - } - - } - //TODO: discuss with wes and john the default hashing logic - //for now: when no hash is given we will just loop through all the kafkas var errs error - for _, kafka := range k.Kafkas { - err := kafka.send(secret, acceptType, msg) - if err != nil { - errs = errors.Join(errs, err) + if k.HashField != "" { + key := GetKey(k.HashField, msg) + for i, kafka := range k.Kafkas { + hash := k.HashField + strconv.Itoa(i) + server := k.Hash.GetServer(key) + if server == hash { + err := kafka.send(secret, acceptType, msg) + if err != nil { + errs = errors.Join(errs, err) + } + } + + } + } else { + //TODO: discuss with wes and john the default hashing logic + //for now: when no hash is given we will just loop through all the kafkas + for _, kafka := range k.Kafkas { + err := kafka.send(secret, acceptType, msg) + if err != nil { + errs = errors.Join(errs, err) + } } } return errs From 845d5adf0e0305609bfc62ffe72f52d573e58bb9 Mon Sep 17 00:00:00 2001 From: maura fortino Date: Thu, 10 Oct 2024 14:54:58 -0400 Subject: [PATCH 4/8] changed caduceus line in .gitignore to caduceus.yaml --- .gitignore | 3 ++- cmd/caduceus/main.go | 23 +++++++++++++++++++++++ 2 files changed, 25 insertions(+), 1 deletion(-) create mode 100644 cmd/caduceus/main.go diff --git a/.gitignore b/.gitignore index 80444360..89c449a0 100644 --- a/.gitignore +++ b/.gitignore @@ -51,9 +51,10 @@ _testmain.go .vscode/* .dev/* -caduceus +caduceus.yaml .ignore !deploy/helm/caduceus deploy/helm/caduceus/rendered.* + diff --git a/cmd/caduceus/main.go b/cmd/caduceus/main.go new file mode 100644 index 00000000..c30113a3 --- /dev/null +++ b/cmd/caduceus/main.go @@ -0,0 +1,23 @@ +// SPDX-FileCopyrightText: 2024 Comcast Cable Communications Management, LLC +// SPDX-License-Identifier: Apache-2.0 + +package main + +import ( + "fmt" + "os" + + "github.com/xmidt-org/caduceus" +) + +func main() { + + err := caduceus.Caduceus(os.Args[1:], true) + + if err == nil { + return + } + + fmt.Fprintln(os.Stderr, err) + os.Exit(-1) +} From aebbdfdcb7b27429b6b5d3eac51b409931f53267 Mon Sep 17 00:00:00 2001 From: Maura Fortino Date: Thu, 10 Oct 2024 15:04:17 -0400 Subject: [PATCH 5/8] Delete docker-compose.yml removing file from this commit - not necessary. will add it back in later. --- docker-compose.yml | 143 --------------------------------------------- 1 file changed, 143 deletions(-) delete mode 100644 docker-compose.yml diff --git a/docker-compose.yml b/docker-compose.yml deleted file mode 100644 index c7edf416..00000000 --- a/docker-compose.yml +++ /dev/null @@ -1,143 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - ---- -version: '2' -services: - controller-1: - image: ${IMAGE} - environment: - KAFKA_NODE_ID: 1 - KAFKA_PROCESS_ROLES: 'controller' - KAFKA_CONTROLLER_QUORUM_VOTERS: '1@controller-1:9093,2@controller-2:9093,3@controller-3:9093' - KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT' - KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER' - KAFKA_LISTENERS: 'CONTROLLER://:9093' - CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw' - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 - KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 - KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 - KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 - KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs' - - controller-2: - image: ${IMAGE} - environment: - KAFKA_NODE_ID: 2 - KAFKA_PROCESS_ROLES: 'controller' - KAFKA_CONTROLLER_QUORUM_VOTERS: '1@controller-1:9093,2@controller-2:9093,3@controller-3:9093' - KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT' - KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER' - KAFKA_LISTENERS: 'CONTROLLER://:9093' - CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw' - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 - KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 - KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 - KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 - KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs' - - controller-3: - image: ${IMAGE} - environment: - KAFKA_NODE_ID: 3 - KAFKA_PROCESS_ROLES: 'controller' - KAFKA_CONTROLLER_QUORUM_VOTERS: '1@controller-1:9093,2@controller-2:9093,3@controller-3:9093' - KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT' - KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER' - KAFKA_LISTENERS: 'CONTROLLER://:9093' - CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw' - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 - KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 - KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 - KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 - KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs' - - kafka-1: - image: ${IMAGE} - ports: - - 29092:9092 - hostname: kafka-1 - container_name: kafka-1 - environment: - KAFKA_NODE_ID: 4 - KAFKA_PROCESS_ROLES: 'broker' - KAFKA_CONTROLLER_QUORUM_VOTERS: '1@controller-1:9093,2@controller-2:9093,3@controller-3:9093' - KAFKA_LISTENERS: 'PLAINTEXT://:19092,PLAINTEXT_HOST://:9092' - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT' - KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT' - KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka-1:19092,PLAINTEXT_HOST://localhost:29092' - KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER' - CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw' - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 - KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 - KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 - KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 - KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs' - depends_on: - - controller-1 - - controller-2 - - controller-3 - - kafka-2: - image: ${IMAGE} - ports: - - 39092:9092 - hostname: kafka-2 - container_name: kafka-2 - environment: - KAFKA_NODE_ID: 5 - KAFKA_PROCESS_ROLES: 'broker' - KAFKA_CONTROLLER_QUORUM_VOTERS: '1@controller-1:9093,2@controller-2:9093,3@controller-3:9093' - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT' - KAFKA_LISTENERS: 'PLAINTEXT://:19092,PLAINTEXT_HOST://:9092' - KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT' - KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka-2:19092,PLAINTEXT_HOST://localhost:39092' - KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER' - CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw' - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 - KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 - KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 - KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 - KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs' - depends_on: - - controller-1 - - controller-2 - - controller-3 - - kafka-3: - image: ${IMAGE} - ports: - - 49092:9092 - hostname: kafka-3 - container_name: kafka-3 - environment: - KAFKA_NODE_ID: 6 - KAFKA_PROCESS_ROLES: 'broker' - KAFKA_CONTROLLER_QUORUM_VOTERS: '1@controller-1:9093,2@controller-2:9093,3@controller-3:9093' - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT' - KAFKA_LISTENERS: 'PLAINTEXT://:19092,PLAINTEXT_HOST://:9092' - KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT' - KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka-3:19092,PLAINTEXT_HOST://localhost:49092' - KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER' - CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw' - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 - KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 - KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 - KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 - KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs' - depends_on: - - controller-1 - - controller-2 - - controller-3 \ No newline at end of file From 7b89fda7c0d70ebb389a5d21609eae8aadb6c1fd Mon Sep 17 00:00:00 2001 From: maura fortino Date: Fri, 11 Oct 2024 12:10:38 -0400 Subject: [PATCH 6/8] updated hashing logic --- internal/sink/hasher.go | 10 +++------- internal/sink/sink.go | 27 +++++++++++++-------------- 2 files changed, 16 insertions(+), 21 deletions(-) diff --git a/internal/sink/hasher.go b/internal/sink/hasher.go index 6a1dc0ec..2d1a5fcc 100644 --- a/internal/sink/hasher.go +++ b/internal/sink/hasher.go @@ -28,7 +28,7 @@ func (h HashRing) Swap(i, j int) { h[i], h[j] = h[j], h[i] } -func (h HashRing) GetServer(key string) string { +func (h HashRing) Get(key string) string { if len(h) == 0 { return "" } @@ -42,14 +42,14 @@ func (h HashRing) GetServer(key string) string { return h[idx].sink } -func (h *HashRing) AddServer(server string) { +func (h *HashRing) Add(server string) { hash := int(crc32.ChecksumIEEE([]byte(server))) node := Node{hash: hash, sink: server} *h = append(*h, node) sort.Sort(h) } -func (h *HashRing) RemoveServer(server string) { +func (h *HashRing) Remove(server string) { hash := int(crc32.ChecksumIEEE([]byte(server))) for i, node := range *h { if node.hash == hash { @@ -60,10 +60,6 @@ func (h *HashRing) RemoveServer(server string) { sort.Sort(h) } -func NewRing() *HashRing { - return &HashRing{} -} - func GetKey(field string, msg *wrp.Message) string { v := reflect.ValueOf(msg) diff --git a/internal/sink/sink.go b/internal/sink/sink.go index 5b3c76f5..67470153 100644 --- a/internal/sink/sink.go +++ b/internal/sink/sink.go @@ -51,7 +51,7 @@ type CommonWebhook struct { logger *zap.Logger } type KafkaSink struct { - Kafkas []*Kafka + Kafkas map[string]*Kafka Hash *HashRing HashField string } @@ -81,15 +81,15 @@ func NewSink(c Config, logger *zap.Logger, listener ancla.Register) Sink { } if len(l.Registration.Kafkas) > 0 { var sink KafkaSink - r := NewRing() + r := &HashRing{} sink.HashField = l.Registration.Hash.Field for i, k := range l.Registration.Kafkas { kafka := &Kafka{} kafka.Update(l.GetId(), "quickstart-events", k.RetryHint.MaxRetry, k.BootstrapServers, logger) - sink.Kafkas = append(sink.Kafkas, kafka) + sink.Kafkas[strconv.Itoa(i)] = kafka if l.Registration.Hash.Field != "" { - key := l.Registration.Hash.Field + strconv.Itoa(i) - r.AddServer(key) + r.Add(strconv.Itoa(i)) + } } sink.Hash = r @@ -351,18 +351,17 @@ func (k *Kafka) Update(id, topic string, retries int, servers []string, logger * func (k KafkaSink) Send(secret string, acceptType string, msg *wrp.Message) error { var errs error if k.HashField != "" { - key := GetKey(k.HashField, msg) - for i, kafka := range k.Kafkas { - hash := k.HashField + strconv.Itoa(i) - server := k.Hash.GetServer(key) - if server == hash { - err := kafka.send(secret, acceptType, msg) - if err != nil { - errs = errors.Join(errs, err) - } + + if kafka, ok := k.Kafkas[k.Hash.Get(GetKey(k.HashField, msg))]; ok { + err := kafka.send(secret, acceptType, msg) + if err != nil { + errs = errors.Join(errs, err) } + } else { + errs = fmt.Errorf("could not find kakfa for the related hash %v", k.HashField) } + } else { //TODO: discuss with wes and john the default hashing logic //for now: when no hash is given we will just loop through all the kafkas From e9371d136d149696cc615e6c64f4514ba9fa3bf8 Mon Sep 17 00:00:00 2001 From: maura fortino Date: Mon, 14 Oct 2024 16:34:34 -0400 Subject: [PATCH 7/8] made minor change to kafka error handling --- internal/sink/sink.go | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/internal/sink/sink.go b/internal/sink/sink.go index 67470153..ac27ab23 100644 --- a/internal/sink/sink.go +++ b/internal/sink/sink.go @@ -350,18 +350,14 @@ func (k *Kafka) Update(id, topic string, retries int, servers []string, logger * func (k KafkaSink) Send(secret string, acceptType string, msg *wrp.Message) error { var errs error - if k.HashField != "" { - + if len(*k.Hash) == len(k.Kafkas) { + //TODO: flush out the error handling for kafka if kafka, ok := k.Kafkas[k.Hash.Get(GetKey(k.HashField, msg))]; ok { err := kafka.send(secret, acceptType, msg) if err != nil { errs = errors.Join(errs, err) } - - } else { - errs = fmt.Errorf("could not find kakfa for the related hash %v", k.HashField) } - } else { //TODO: discuss with wes and john the default hashing logic //for now: when no hash is given we will just loop through all the kafkas From a444a1cc6cd7c327d964b8bf2487c53427ef1cbd Mon Sep 17 00:00:00 2001 From: maura fortino Date: Mon, 14 Oct 2024 16:40:36 -0400 Subject: [PATCH 8/8] updated error handling for Update functions --- internal/sink/sink.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/internal/sink/sink.go b/internal/sink/sink.go index ac27ab23..8af7e476 100644 --- a/internal/sink/sink.go +++ b/internal/sink/sink.go @@ -85,7 +85,10 @@ func NewSink(c Config, logger *zap.Logger, listener ancla.Register) Sink { sink.HashField = l.Registration.Hash.Field for i, k := range l.Registration.Kafkas { kafka := &Kafka{} - kafka.Update(l.GetId(), "quickstart-events", k.RetryHint.MaxRetry, k.BootstrapServers, logger) + err := kafka.Update(l.GetId(), "quickstart-events", k.RetryHint.MaxRetry, k.BootstrapServers, logger) + if err != nil { + return nil + } sink.Kafkas[strconv.Itoa(i)] = kafka if l.Registration.Hash.Field != "" { r.Add(strconv.Itoa(i)) @@ -101,7 +104,7 @@ func NewSink(c Config, logger *zap.Logger, listener ancla.Register) Sink { return nil } -func (v1 *WebhookV1) Update(c Config, l *zap.Logger, altUrls []string, id, failureUrl, receiverUrl string) (err error) { +func (v1 *WebhookV1) Update(c Config, l *zap.Logger, altUrls []string, id, failureUrl, receiverUrl string) { //TODO: do we need to return an error if not - we should get rid of the error return v1.id = id v1.failureUrl = failureUrl @@ -115,7 +118,6 @@ func (v1 *WebhookV1) Update(c Config, l *zap.Logger, altUrls []string, id, failu } v1.updateUrls(urlCount, receiverUrl, altUrls) - return nil } func (v1 *WebhookV1) updateUrls(urlCount int, url string, urls []string) {