diff --git a/.gitignore b/.gitignore index 80444360..89c449a0 100644 --- a/.gitignore +++ b/.gitignore @@ -51,9 +51,10 @@ _testmain.go .vscode/* .dev/* -caduceus +caduceus.yaml .ignore !deploy/helm/caduceus deploy/helm/caduceus/rendered.* + diff --git a/cmd/main.go b/cmd/caduceus/main.go similarity index 100% rename from cmd/main.go rename to cmd/caduceus/main.go diff --git a/internal/sink/hasher.go b/internal/sink/hasher.go new file mode 100644 index 00000000..2d1a5fcc --- /dev/null +++ b/internal/sink/hasher.go @@ -0,0 +1,77 @@ +// SPDX-FileCopyrightText: 2024 Comcast Cable Communications Management, LLC +// SPDX-License-Identifier: Apache-2.0 +package sink + +import ( + "fmt" + "hash/crc32" + "reflect" + "sort" + + "github.com/xmidt-org/wrp-go/v3" +) + +type Node struct { + hash int + sink string +} + +type HashRing []Node + +func (h HashRing) Len() int { + return len(h) +} +func (h HashRing) Less(i, j int) bool { + return h[i].hash < h[j].hash +} +func (h HashRing) Swap(i, j int) { + h[i], h[j] = h[j], h[i] +} + +func (h HashRing) Get(key string) string { + if len(h) == 0 { + return "" + } + hash := int(crc32.ChecksumIEEE([]byte(key))) + idx := sort.Search(len(h), func(i int) bool { + return h[i].hash >= hash + }) + if idx == len(h) { + idx = 0 + } + return h[idx].sink +} + +func (h *HashRing) Add(server string) { + hash := int(crc32.ChecksumIEEE([]byte(server))) + node := Node{hash: hash, sink: server} + *h = append(*h, node) + sort.Sort(h) +} + +func (h *HashRing) Remove(server string) { + hash := int(crc32.ChecksumIEEE([]byte(server))) + for i, node := range *h { + if node.hash == hash { + *h = append((*h)[:i], (*h)[i+1:]...) + break + } + } + sort.Sort(h) +} + +func GetKey(field string, msg *wrp.Message) string { + + v := reflect.ValueOf(msg) + if v.Kind() == reflect.Ptr { + v = v.Elem() // Dereference pointer if necessary + } + + value := v.FieldByName(field) + if value.IsValid() { + return fmt.Sprintf("%v", value.Interface()) + } + + return "" + +} diff --git a/internal/sink/sink.go b/internal/sink/sink.go index 373aa6ee..8af7e476 100644 --- a/internal/sink/sink.go +++ b/internal/sink/sink.go @@ -50,7 +50,11 @@ type CommonWebhook struct { mutex sync.RWMutex logger *zap.Logger } -type Kafkas []*Kafka +type KafkaSink struct { + Kafkas map[string]*Kafka + Hash *HashRing + HashField string +} type Kafka struct { brokerAddr []string topic string @@ -76,12 +80,22 @@ func NewSink(c Config, logger *zap.Logger, listener ancla.Register) Sink { return whs } if len(l.Registration.Kafkas) > 0 { - var sink Kafkas - for _, k := range l.Registration.Kafkas { + var sink KafkaSink + r := &HashRing{} + sink.HashField = l.Registration.Hash.Field + for i, k := range l.Registration.Kafkas { kafka := &Kafka{} - kafka.Update(l.GetId(), "quickstart-events", k.RetryHint.MaxRetry, k.BootstrapServers, logger) - sink = append(sink, kafka) + err := kafka.Update(l.GetId(), "quickstart-events", k.RetryHint.MaxRetry, k.BootstrapServers, logger) + if err != nil { + return nil + } + sink.Kafkas[strconv.Itoa(i)] = kafka + if l.Registration.Hash.Field != "" { + r.Add(strconv.Itoa(i)) + + } } + sink.Hash = r return sink } default: @@ -90,9 +104,8 @@ func NewSink(c Config, logger *zap.Logger, listener ancla.Register) Sink { return nil } -func (v1 *WebhookV1) Update(c Config, l *zap.Logger, altUrls []string, id, failureUrl, receiverUrl string) (err error) { - //TODO: is there anything else that needs to be done for this? - //do we need to return an error +func (v1 *WebhookV1) Update(c Config, l *zap.Logger, altUrls []string, id, failureUrl, receiverUrl string) { + //TODO: do we need to return an error if not - we should get rid of the error return v1.id = id v1.failureUrl = failureUrl v1.deliveryInterval = c.DeliveryInterval @@ -105,7 +118,6 @@ func (v1 *WebhookV1) Update(c Config, l *zap.Logger, altUrls []string, id, failu } v1.updateUrls(urlCount, receiverUrl, altUrls) - return nil } func (v1 *WebhookV1) updateUrls(urlCount int, url string, urls []string) { @@ -338,14 +350,24 @@ func (k *Kafka) Update(id, topic string, retries int, servers []string, logger * return nil } -func (k Kafkas) Send(secret string, acceptType string, msg *wrp.Message) error { - //TODO: discuss with wes and john the default hashing logic - //for now: when no hash is given we will just loop through all the kafkas +func (k KafkaSink) Send(secret string, acceptType string, msg *wrp.Message) error { var errs error - for _, kafka := range k { - err := kafka.send(secret, acceptType, msg) - if err != nil { - errs = errors.Join(errs, err) + if len(*k.Hash) == len(k.Kafkas) { + //TODO: flush out the error handling for kafka + if kafka, ok := k.Kafkas[k.Hash.Get(GetKey(k.HashField, msg))]; ok { + err := kafka.send(secret, acceptType, msg) + if err != nil { + errs = errors.Join(errs, err) + } + } + } else { + //TODO: discuss with wes and john the default hashing logic + //for now: when no hash is given we will just loop through all the kafkas + for _, kafka := range k.Kafkas { + err := kafka.send(secret, acceptType, msg) + if err != nil { + errs = errors.Join(errs, err) + } } } return errs