Skip to content

Commit

Permalink
Merge branch 'develop'
Browse files Browse the repository at this point in the history
  • Loading branch information
eko committed Oct 22, 2021
2 parents 58fc208 + 7758199 commit 98b35fa
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 4 deletions.
9 changes: 5 additions & 4 deletions internal/transformer/workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ loop:
defer wg.Done()
defer func() {
if r := recover(); r != nil {
w.collectAfterOne(msg, toErr(r), start, th)
w.collectAfter(msg, toErr(r), start, th)
w.log.Errorf("worker: recovered from panic: %v", string(debug.Stack()))
chunk[index] = nil
}
Expand All @@ -98,7 +98,7 @@ loop:
w.log.Debugf("worker: #%v, message received %v, working...", index, msg)

chunk[index] = w.transformer.Transform(msg)
w.collectAfter(chunk[index], nil, start, th)
w.collectAfter(msg, nil, start, th)

w.log.Debugf("worker: #%v, work done %v", index, msg)
}(counter, msg)
Expand Down Expand Up @@ -139,13 +139,14 @@ func (w Workers) collectBefore(msg *confluent.Message, start time.Time) (th _ins
return
}

/*
func (w Workers) collectAfter(msgs []*confluent.Message, err error, start time.Time, th _instrument.TimeHolder) {
for _, msg := range msgs {
w.collectAfterOne(msg, err, start, th)
}
}

func (w Workers) collectAfterOne(msg *confluent.Message, err error, start time.Time, th _instrument.TimeHolder) {
*/
func (w Workers) collectAfter(msg *confluent.Message, err error, start time.Time, th _instrument.TimeHolder) {
th.Opaque = msg.Opaque
msg.Opaque = th
w.collector.After(msg, instrument.TransformerTransform, err, start)
Expand Down
34 changes: 34 additions & 0 deletions pkg/instrument/multi.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package instrument

// Allow to wrap multiple collectors as one collector
import (
"time"

confluent "github.com/confluentinc/confluent-kafka-go/kafka"
)

// MultiCollector allows you to compose a list of collector
type MultiCollector struct {
collectors []Collector
}

// NewMultiCollector is a constructor for multi collector
func NewMultiCollector(collectors ...Collector) MultiCollector {
return MultiCollector{
collectors: collectors,
}
}

// Before calls all Before method of inner collectors
func (m MultiCollector) Before(message *confluent.Message, action Action, start time.Time) {
for i := 0; i < len(m.collectors); i++ {
m.collectors[i].Before(message, action, start)
}
}

// After calls all After method of inner collectors
func (m MultiCollector) After(message *confluent.Message, action Action, err error, start time.Time) {
for i := 0; i < len(m.collectors); i++ {
m.collectors[i].After(message, action, err, start)
}
}
59 changes: 59 additions & 0 deletions pkg/instrument/multi_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package instrument_test

import (
"testing"
"time"

confluent "github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/etf1/kafka-transformer/pkg/instrument"
)

// a dummy collector which appends 1 character eacy time a before/after is called
type dummyCollector struct {
before string
after string
result string
}

func newDummyCollector(before, after string) *dummyCollector {
return &dummyCollector{
before: before,
after: after,
}
}

func (dc *dummyCollector) Before(msg *confluent.Message, action instrument.Action, start time.Time) {
dc.result += dc.before
}

func (dc *dummyCollector) After(msg *confluent.Message, action instrument.Action, err error, start time.Time) {
dc.result += dc.after
}

// Rule #1: all collector should be called in sequence as passed in argument
func TestMultiCollector(t *testing.T) {
dc := newDummyCollector("a", "b")

mc := instrument.NewMultiCollector(dc)
mc.Before(nil, instrument.OverallTime, time.Now())
mc.After(nil, instrument.OverallTime, nil, time.Now())
if dc.result != "ab" {
t.Errorf("unexpected result: %q", dc.result)
}

dc = newDummyCollector("a", "b")
mc = instrument.NewMultiCollector(dc, dc)
mc.Before(nil, instrument.OverallTime, time.Now())
mc.After(nil, instrument.OverallTime, nil, time.Now())
if dc.result != "aabb" {
t.Errorf("unexpected result: %q", dc.result)
}

dc = newDummyCollector("b", "a")
mc = instrument.NewMultiCollector(dc, dc)
mc.Before(nil, instrument.OverallTime, time.Now())
mc.After(nil, instrument.OverallTime, nil, time.Now())
if dc.result != "bbaa" {
t.Errorf("unexpected result: %q", dc.result)
}
}

0 comments on commit 98b35fa

Please sign in to comment.