Skip to content

Commit

Permalink
Add context to transform and project operations
Browse files Browse the repository at this point in the history
  • Loading branch information
ldechoux committed Apr 4, 2024
1 parent 4ac7726 commit ac7db73
Show file tree
Hide file tree
Showing 15 changed files with 47 additions and 26 deletions.
4 changes: 3 additions & 1 deletion examples/custom_collector/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"log"
"net/http"
"os"
Expand Down Expand Up @@ -52,7 +53,8 @@ func main() {
exitchan := make(chan bool, 1)

go func() {
if err := transformer.Run(); err != nil {
ctx := context.Background()
if err := transformer.Run(ctx); err != nil {
log.Printf("failed to start transformer: %v", err)
}
exitchan <- true
Expand Down
3 changes: 2 additions & 1 deletion examples/custom_collector/projector.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"sync/atomic"

kafka "github.com/confluentinc/confluent-kafka-go/v2/kafka"
Expand All @@ -10,7 +11,7 @@ type customProjector struct {
counter uint32
}

func (c *customProjector) Project(msg *kafka.Message) {
func (c *customProjector) Project(ctx context.Context, msg *kafka.Message) {
atomic.AddUint32(&c.counter, 1)
/*
if c.counter%2 == 0 {
Expand Down
4 changes: 3 additions & 1 deletion examples/custom_projector/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"log"
"os"
"os/signal"
Expand Down Expand Up @@ -42,7 +43,8 @@ func main() {
exitchan := make(chan bool, 1)

go func() {
if err = transformer.Run(); err != nil {
ctx := context.Background()
if err = transformer.Run(ctx); err != nil {
log.Printf("failed to start transformer: %v", err)
}
exitchan <- true
Expand Down
3 changes: 2 additions & 1 deletion examples/custom_projector/redis_projector.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"log"
"strconv"
"time"
Expand Down Expand Up @@ -47,7 +48,7 @@ func (r RedisProjector) Close() error {
}

// Project implements transformer.Projector interface
func (r RedisProjector) Project(msg *kafka.Message) {
func (r RedisProjector) Project(ctx context.Context, msg *kafka.Message) {

if len(msg.Value) == 0 {
return
Expand Down
3 changes: 2 additions & 1 deletion examples/custom_transformer/header_transformer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"time"

kafka "github.com/confluentinc/confluent-kafka-go/v2/kafka"
Expand All @@ -12,7 +13,7 @@ type headerTransformer struct {
}

// Add a custom header x-app-id to the message
func (ht headerTransformer) Transform(src *kafka.Message) []*kafka.Message {
func (ht headerTransformer) Transform(ctx context.Context, src *kafka.Message) []*kafka.Message {
topic := "custom-transformer"
msg := &kafka.Message{
TopicPartition: kafka.TopicPartition{
Expand Down
4 changes: 3 additions & 1 deletion examples/custom_transformer/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"log"
"os"
"os/signal"
Expand Down Expand Up @@ -46,7 +47,8 @@ func main() {
exitchan := make(chan bool, 1)

go func() {
if err = transformer.Run(); err != nil {
ctx := context.Background()
if err = transformer.Run(ctx); err != nil {
log.Printf("failed to start transformer: %v", err)
}
exitchan <- true
Expand Down
14 changes: 8 additions & 6 deletions internal/test/transformer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package test

import (
"context"

"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/etf1/kafka-transformer/pkg/transformer"
)
Expand All @@ -16,12 +18,12 @@ func NewUnstableTransformer() transformer.Transformer {
}
}

func (ut unstableTransformer) Transform(msg *kafka.Message) []*kafka.Message {
func (ut unstableTransformer) Transform(ctx context.Context, msg *kafka.Message) []*kafka.Message {
if string(msg.Value) == "panic" {
panic("panic from transformer")
}

return ut.passthrough.Transform(msg)
return ut.passthrough.Transform(ctx, msg)
}

type duplicatorTransformer struct {
Expand All @@ -35,8 +37,8 @@ func NewDuplicatorTransformer() transformer.Transformer {
}
}

func (dt duplicatorTransformer) Transform(msg *kafka.Message) []*kafka.Message {
result := dt.passthrough.Transform(msg)
func (dt duplicatorTransformer) Transform(ctx context.Context, msg *kafka.Message) []*kafka.Message {
result := dt.passthrough.Transform(ctx, msg)
// return 2 times the same message, for testing duplication
return append(result, result[0])
}
Expand All @@ -52,8 +54,8 @@ func NewOpaqueTransformer() transformer.Transformer {
}
}

func (ot opaqueTransformer) Transform(msg *kafka.Message) []*kafka.Message {
result := ot.passthrough.Transform(msg)
func (ot opaqueTransformer) Transform(ctx context.Context, msg *kafka.Message) []*kafka.Message {
result := ot.passthrough.Transform(ctx, msg)
result[0].Opaque = "opaque"
return append(result, result[0])
}
3 changes: 2 additions & 1 deletion internal/transformer/kafka/producer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kafka

import (
"context"
"log"
"time"

Expand Down Expand Up @@ -59,7 +60,7 @@ func NewProducer(l logger.Log, config *confluent.ConfigMap, collector instrument
}

// Project implements the Projector interface
func (p Producer) Project(msg *confluent.Message) {
func (p Producer) Project(ctx context.Context, msg *confluent.Message) {
p.collectBefore(msg)
p.producer.ProduceChannel() <- msg
}
Expand Down
5 changes: 3 additions & 2 deletions internal/transformer/projector.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package transformer

import (
"context"
"log"
"runtime/debug"
"sync"
Expand Down Expand Up @@ -30,7 +31,7 @@ func NewProjector(log logger.Log, projector pkg.Projector, collector instrument.
}

// Run starts the projector goroutine
func (p *Projector) Run(wg *sync.WaitGroup, inChan chan *confluent.Message) {
func (p *Projector) Run(ctx context.Context, wg *sync.WaitGroup, inChan chan *confluent.Message) {
go func() {
defer log.Println("projector stopped")
defer wg.Done()
Expand All @@ -53,7 +54,7 @@ func (p *Projector) Run(wg *sync.WaitGroup, inChan chan *confluent.Message) {
start = time.Now()
th = p.collectBefore(msg, start)

p.projector.Project(msg)
p.projector.Project(ctx, msg)

p.collectAfter(msg, nil, start, th)

Expand Down
5 changes: 3 additions & 2 deletions internal/transformer/transformer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package transformer

import (
"context"
"errors"
"log"
"sync"
Expand Down Expand Up @@ -33,7 +34,7 @@ func NewTransformer(log logger.Log, transformer transformer.Transformer, bufferS
}

// Run will start the transformer process
func (t *Transformer) Run(wg *sync.WaitGroup, inChan chan *confluent.Message) chan *confluent.Message {
func (t *Transformer) Run(ctx context.Context, wg *sync.WaitGroup, inChan chan *confluent.Message) chan *confluent.Message {
outChan := make(chan *confluent.Message, t.bufferSize)
workers := newWorkers(t.log, t.bufferSize, inChan, t.transformer, t.workerTimeout, t.collector)

Expand All @@ -43,7 +44,7 @@ func (t *Transformer) Run(wg *sync.WaitGroup, inChan chan *confluent.Message) ch
defer close(outChan)
defer log.Println("stopping transformer")

workers.Run(outChan)
workers.Run(ctx, outChan)
}()

return outChan
Expand Down
5 changes: 3 additions & 2 deletions internal/transformer/workers.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package transformer

import (
"context"
"log"
"runtime/debug"
"sync"
Expand Down Expand Up @@ -63,7 +64,7 @@ func flushChunk(resultChan chan *confluent.Message, c chunk, size int) {
}

// Run starts parallel processing of messages
func (w Workers) Run(resultChan chan *confluent.Message) {
func (w Workers) Run(ctx context.Context, resultChan chan *confluent.Message) {
log.Println("starting transformer workers")

wg := sync.WaitGroup{}
Expand Down Expand Up @@ -97,7 +98,7 @@ loop:

w.log.Debugf("worker: #%v, message received %v, working...", index, msg)

chunk[index] = w.transformer.Transform(msg)
chunk[index] = w.transformer.Transform(ctx, msg)
w.injectTimeHolder(chunk[index], th)
w.collectAfter(msg, nil, start, th)

Expand Down
8 changes: 4 additions & 4 deletions pkg/transformer/kafka/kafka.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kafka

import (
"context"
"fmt"
"log"
"sync"
Expand Down Expand Up @@ -35,7 +36,6 @@ type Transformer struct {
producer *kafka.Producer
transformer *internal.Transformer
projector *internal.Projector
config Config
wg *sync.WaitGroup
}

Expand Down Expand Up @@ -124,7 +124,7 @@ func (k Transformer) Stop() {
}

// Run will start the Transformer with all the components (consumer, transformer, producer)
func (k Transformer) Run() error {
func (k Transformer) Run(ctx context.Context) error {
log.Println("starting kafka transformer ...")

k.wg.Add(3)
Expand All @@ -138,11 +138,11 @@ func (k Transformer) Run() error {

// Then transformer
log.Println("starting transformer ...")
transformerChan := k.transformer.Run(k.wg, consumerChan)
transformerChan := k.transformer.Run(ctx, k.wg, consumerChan)

// Finally, producer
log.Println("starting projector ...")
k.projector.Run(k.wg, transformerChan)
k.projector.Run(ctx, k.wg, transformerChan)

k.wg.Wait()

Expand Down
4 changes: 3 additions & 1 deletion pkg/transformer/passthrough.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package transformer

import (
"context"

"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

type passThrough struct{}

// Transform a kafka Message
func (p passThrough) Transform(src *kafka.Message) []*kafka.Message {
func (p passThrough) Transform(ctx context.Context, src *kafka.Message) []*kafka.Message {
topic := *src.TopicPartition.Topic + "-passthrough"

msg := &kafka.Message{
Expand Down
4 changes: 3 additions & 1 deletion pkg/transformer/projector.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package transformer

import (
"context"

"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

// Projector is an interface which is used by Kafka.Transformer
// in order to project a kafka Message (to kafka or an external system)
// If you want to customize the projection, this is the interface to implement
type Projector interface {
Project(message *kafka.Message)
Project(ctx context.Context, message *kafka.Message)
}
4 changes: 3 additions & 1 deletion pkg/transformer/transformer.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package transformer

import (
"context"

"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

// Transformer is an interface which is used by Kafka.Transformer
// in order to transform a kafka Message.
// If nil is returned the message will be ignored
type Transformer interface {
Transform(src *kafka.Message) []*kafka.Message
Transform(ctx context.Context, src *kafka.Message) []*kafka.Message
}

0 comments on commit ac7db73

Please sign in to comment.