diff --git a/internal/sink/hasher.go b/internal/sink/hasher.go index 6a1dc0ec..2d1a5fcc 100644 --- a/internal/sink/hasher.go +++ b/internal/sink/hasher.go @@ -28,7 +28,7 @@ func (h HashRing) Swap(i, j int) { h[i], h[j] = h[j], h[i] } -func (h HashRing) GetServer(key string) string { +func (h HashRing) Get(key string) string { if len(h) == 0 { return "" } @@ -42,14 +42,14 @@ func (h HashRing) GetServer(key string) string { return h[idx].sink } -func (h *HashRing) AddServer(server string) { +func (h *HashRing) Add(server string) { hash := int(crc32.ChecksumIEEE([]byte(server))) node := Node{hash: hash, sink: server} *h = append(*h, node) sort.Sort(h) } -func (h *HashRing) RemoveServer(server string) { +func (h *HashRing) Remove(server string) { hash := int(crc32.ChecksumIEEE([]byte(server))) for i, node := range *h { if node.hash == hash { @@ -60,10 +60,6 @@ func (h *HashRing) RemoveServer(server string) { sort.Sort(h) } -func NewRing() *HashRing { - return &HashRing{} -} - func GetKey(field string, msg *wrp.Message) string { v := reflect.ValueOf(msg) diff --git a/internal/sink/sink.go b/internal/sink/sink.go index 5b3c76f5..67470153 100644 --- a/internal/sink/sink.go +++ b/internal/sink/sink.go @@ -51,7 +51,7 @@ type CommonWebhook struct { logger *zap.Logger } type KafkaSink struct { - Kafkas []*Kafka + Kafkas map[string]*Kafka Hash *HashRing HashField string } @@ -81,15 +81,15 @@ func NewSink(c Config, logger *zap.Logger, listener ancla.Register) Sink { } if len(l.Registration.Kafkas) > 0 { var sink KafkaSink - r := NewRing() + r := &HashRing{} sink.HashField = l.Registration.Hash.Field for i, k := range l.Registration.Kafkas { kafka := &Kafka{} kafka.Update(l.GetId(), "quickstart-events", k.RetryHint.MaxRetry, k.BootstrapServers, logger) - sink.Kafkas = append(sink.Kafkas, kafka) + sink.Kafkas[strconv.Itoa(i)] = kafka if l.Registration.Hash.Field != "" { - key := l.Registration.Hash.Field + strconv.Itoa(i) - r.AddServer(key) + r.Add(strconv.Itoa(i)) + } } sink.Hash = r @@ -351,18 +351,17 @@ func (k *Kafka) Update(id, topic string, retries int, servers []string, logger * func (k KafkaSink) Send(secret string, acceptType string, msg *wrp.Message) error { var errs error if k.HashField != "" { - key := GetKey(k.HashField, msg) - for i, kafka := range k.Kafkas { - hash := k.HashField + strconv.Itoa(i) - server := k.Hash.GetServer(key) - if server == hash { - err := kafka.send(secret, acceptType, msg) - if err != nil { - errs = errors.Join(errs, err) - } + + if kafka, ok := k.Kafkas[k.Hash.Get(GetKey(k.HashField, msg))]; ok { + err := kafka.send(secret, acceptType, msg) + if err != nil { + errs = errors.Join(errs, err) } + } else { + errs = fmt.Errorf("could not find kakfa for the related hash %v", k.HashField) } + } else { //TODO: discuss with wes and john the default hashing logic //for now: when no hash is given we will just loop through all the kafkas