From ac7db731cb68cd593060bfab245ce4a90d9f9d09 Mon Sep 17 00:00:00 2001 From: ldechoux <34473609+ldechoux@users.noreply.github.com> Date: Thu, 4 Apr 2024 09:27:00 +0200 Subject: [PATCH] Add context to transform and project operations --- examples/custom_collector/main.go | 4 +++- examples/custom_collector/projector.go | 3 ++- examples/custom_projector/main.go | 4 +++- examples/custom_projector/redis_projector.go | 3 ++- examples/custom_transformer/header_transformer.go | 3 ++- examples/custom_transformer/main.go | 4 +++- internal/test/transformer.go | 14 ++++++++------ internal/transformer/kafka/producer.go | 3 ++- internal/transformer/projector.go | 5 +++-- internal/transformer/transformer.go | 5 +++-- internal/transformer/workers.go | 5 +++-- pkg/transformer/kafka/kafka.go | 8 ++++---- pkg/transformer/passthrough.go | 4 +++- pkg/transformer/projector.go | 4 +++- pkg/transformer/transformer.go | 4 +++- 15 files changed, 47 insertions(+), 26 deletions(-) diff --git a/examples/custom_collector/main.go b/examples/custom_collector/main.go index c5ca3c8..3c18578 100644 --- a/examples/custom_collector/main.go +++ b/examples/custom_collector/main.go @@ -1,6 +1,7 @@ package main import ( + "context" "log" "net/http" "os" @@ -52,7 +53,8 @@ func main() { exitchan := make(chan bool, 1) go func() { - if err := transformer.Run(); err != nil { + ctx := context.Background() + if err := transformer.Run(ctx); err != nil { log.Printf("failed to start transformer: %v", err) } exitchan <- true diff --git a/examples/custom_collector/projector.go b/examples/custom_collector/projector.go index 81243ce..e1f2f2f 100644 --- a/examples/custom_collector/projector.go +++ b/examples/custom_collector/projector.go @@ -1,6 +1,7 @@ package main import ( + "context" "sync/atomic" kafka "github.com/confluentinc/confluent-kafka-go/v2/kafka" @@ -10,7 +11,7 @@ type customProjector struct { counter uint32 } -func (c *customProjector) Project(msg *kafka.Message) { +func (c *customProjector) Project(ctx context.Context, msg *kafka.Message) { atomic.AddUint32(&c.counter, 1) /* if c.counter%2 == 0 { diff --git a/examples/custom_projector/main.go b/examples/custom_projector/main.go index 0800a51..55acfae 100644 --- a/examples/custom_projector/main.go +++ b/examples/custom_projector/main.go @@ -1,6 +1,7 @@ package main import ( + "context" "log" "os" "os/signal" @@ -42,7 +43,8 @@ func main() { exitchan := make(chan bool, 1) go func() { - if err = transformer.Run(); err != nil { + ctx := context.Background() + if err = transformer.Run(ctx); err != nil { log.Printf("failed to start transformer: %v", err) } exitchan <- true diff --git a/examples/custom_projector/redis_projector.go b/examples/custom_projector/redis_projector.go index 4245db9..b5d5d35 100644 --- a/examples/custom_projector/redis_projector.go +++ b/examples/custom_projector/redis_projector.go @@ -1,6 +1,7 @@ package main import ( + "context" "log" "strconv" "time" @@ -47,7 +48,7 @@ func (r RedisProjector) Close() error { } // Project implements transformer.Projector interface -func (r RedisProjector) Project(msg *kafka.Message) { +func (r RedisProjector) Project(ctx context.Context, msg *kafka.Message) { if len(msg.Value) == 0 { return diff --git a/examples/custom_transformer/header_transformer.go b/examples/custom_transformer/header_transformer.go index c69f09c..76fd939 100644 --- a/examples/custom_transformer/header_transformer.go +++ b/examples/custom_transformer/header_transformer.go @@ -1,6 +1,7 @@ package main import ( + "context" "time" kafka "github.com/confluentinc/confluent-kafka-go/v2/kafka" @@ -12,7 +13,7 @@ type headerTransformer struct { } // Add a custom header x-app-id to the message -func (ht headerTransformer) Transform(src *kafka.Message) []*kafka.Message { +func (ht headerTransformer) Transform(ctx context.Context, src *kafka.Message) []*kafka.Message { topic := "custom-transformer" msg := &kafka.Message{ TopicPartition: kafka.TopicPartition{ diff --git a/examples/custom_transformer/main.go b/examples/custom_transformer/main.go index 16d703e..93ab059 100644 --- a/examples/custom_transformer/main.go +++ b/examples/custom_transformer/main.go @@ -1,6 +1,7 @@ package main import ( + "context" "log" "os" "os/signal" @@ -46,7 +47,8 @@ func main() { exitchan := make(chan bool, 1) go func() { - if err = transformer.Run(); err != nil { + ctx := context.Background() + if err = transformer.Run(ctx); err != nil { log.Printf("failed to start transformer: %v", err) } exitchan <- true diff --git a/internal/test/transformer.go b/internal/test/transformer.go index 7069cef..00d22d7 100644 --- a/internal/test/transformer.go +++ b/internal/test/transformer.go @@ -1,6 +1,8 @@ package test import ( + "context" + "github.com/confluentinc/confluent-kafka-go/v2/kafka" "github.com/etf1/kafka-transformer/pkg/transformer" ) @@ -16,12 +18,12 @@ func NewUnstableTransformer() transformer.Transformer { } } -func (ut unstableTransformer) Transform(msg *kafka.Message) []*kafka.Message { +func (ut unstableTransformer) Transform(ctx context.Context, msg *kafka.Message) []*kafka.Message { if string(msg.Value) == "panic" { panic("panic from transformer") } - return ut.passthrough.Transform(msg) + return ut.passthrough.Transform(ctx, msg) } type duplicatorTransformer struct { @@ -35,8 +37,8 @@ func NewDuplicatorTransformer() transformer.Transformer { } } -func (dt duplicatorTransformer) Transform(msg *kafka.Message) []*kafka.Message { - result := dt.passthrough.Transform(msg) +func (dt duplicatorTransformer) Transform(ctx context.Context, msg *kafka.Message) []*kafka.Message { + result := dt.passthrough.Transform(ctx, msg) // return 2 times the same message, for testing duplication return append(result, result[0]) } @@ -52,8 +54,8 @@ func NewOpaqueTransformer() transformer.Transformer { } } -func (ot opaqueTransformer) Transform(msg *kafka.Message) []*kafka.Message { - result := ot.passthrough.Transform(msg) +func (ot opaqueTransformer) Transform(ctx context.Context, msg *kafka.Message) []*kafka.Message { + result := ot.passthrough.Transform(ctx, msg) result[0].Opaque = "opaque" return append(result, result[0]) } diff --git a/internal/transformer/kafka/producer.go b/internal/transformer/kafka/producer.go index fade582..a8afb4c 100644 --- a/internal/transformer/kafka/producer.go +++ b/internal/transformer/kafka/producer.go @@ -1,6 +1,7 @@ package kafka import ( + "context" "log" "time" @@ -59,7 +60,7 @@ func NewProducer(l logger.Log, config *confluent.ConfigMap, collector instrument } // Project implements the Projector interface -func (p Producer) Project(msg *confluent.Message) { +func (p Producer) Project(ctx context.Context, msg *confluent.Message) { p.collectBefore(msg) p.producer.ProduceChannel() <- msg } diff --git a/internal/transformer/projector.go b/internal/transformer/projector.go index e80c3df..fe1b57e 100644 --- a/internal/transformer/projector.go +++ b/internal/transformer/projector.go @@ -1,6 +1,7 @@ package transformer import ( + "context" "log" "runtime/debug" "sync" @@ -30,7 +31,7 @@ func NewProjector(log logger.Log, projector pkg.Projector, collector instrument. } // Run starts the projector goroutine -func (p *Projector) Run(wg *sync.WaitGroup, inChan chan *confluent.Message) { +func (p *Projector) Run(ctx context.Context, wg *sync.WaitGroup, inChan chan *confluent.Message) { go func() { defer log.Println("projector stopped") defer wg.Done() @@ -53,7 +54,7 @@ func (p *Projector) Run(wg *sync.WaitGroup, inChan chan *confluent.Message) { start = time.Now() th = p.collectBefore(msg, start) - p.projector.Project(msg) + p.projector.Project(ctx, msg) p.collectAfter(msg, nil, start, th) diff --git a/internal/transformer/transformer.go b/internal/transformer/transformer.go index 1a731f3..8bde9ee 100644 --- a/internal/transformer/transformer.go +++ b/internal/transformer/transformer.go @@ -1,6 +1,7 @@ package transformer import ( + "context" "errors" "log" "sync" @@ -33,7 +34,7 @@ func NewTransformer(log logger.Log, transformer transformer.Transformer, bufferS } // Run will start the transformer process -func (t *Transformer) Run(wg *sync.WaitGroup, inChan chan *confluent.Message) chan *confluent.Message { +func (t *Transformer) Run(ctx context.Context, wg *sync.WaitGroup, inChan chan *confluent.Message) chan *confluent.Message { outChan := make(chan *confluent.Message, t.bufferSize) workers := newWorkers(t.log, t.bufferSize, inChan, t.transformer, t.workerTimeout, t.collector) @@ -43,7 +44,7 @@ func (t *Transformer) Run(wg *sync.WaitGroup, inChan chan *confluent.Message) ch defer close(outChan) defer log.Println("stopping transformer") - workers.Run(outChan) + workers.Run(ctx, outChan) }() return outChan diff --git a/internal/transformer/workers.go b/internal/transformer/workers.go index cab94a0..70f0b8d 100644 --- a/internal/transformer/workers.go +++ b/internal/transformer/workers.go @@ -1,6 +1,7 @@ package transformer import ( + "context" "log" "runtime/debug" "sync" @@ -63,7 +64,7 @@ func flushChunk(resultChan chan *confluent.Message, c chunk, size int) { } // Run starts parallel processing of messages -func (w Workers) Run(resultChan chan *confluent.Message) { +func (w Workers) Run(ctx context.Context, resultChan chan *confluent.Message) { log.Println("starting transformer workers") wg := sync.WaitGroup{} @@ -97,7 +98,7 @@ loop: w.log.Debugf("worker: #%v, message received %v, working...", index, msg) - chunk[index] = w.transformer.Transform(msg) + chunk[index] = w.transformer.Transform(ctx, msg) w.injectTimeHolder(chunk[index], th) w.collectAfter(msg, nil, start, th) diff --git a/pkg/transformer/kafka/kafka.go b/pkg/transformer/kafka/kafka.go index 9493437..51d28de 100644 --- a/pkg/transformer/kafka/kafka.go +++ b/pkg/transformer/kafka/kafka.go @@ -1,6 +1,7 @@ package kafka import ( + "context" "fmt" "log" "sync" @@ -35,7 +36,6 @@ type Transformer struct { producer *kafka.Producer transformer *internal.Transformer projector *internal.Projector - config Config wg *sync.WaitGroup } @@ -124,7 +124,7 @@ func (k Transformer) Stop() { } // Run will start the Transformer with all the components (consumer, transformer, producer) -func (k Transformer) Run() error { +func (k Transformer) Run(ctx context.Context) error { log.Println("starting kafka transformer ...") k.wg.Add(3) @@ -138,11 +138,11 @@ func (k Transformer) Run() error { // Then transformer log.Println("starting transformer ...") - transformerChan := k.transformer.Run(k.wg, consumerChan) + transformerChan := k.transformer.Run(ctx, k.wg, consumerChan) // Finally, producer log.Println("starting projector ...") - k.projector.Run(k.wg, transformerChan) + k.projector.Run(ctx, k.wg, transformerChan) k.wg.Wait() diff --git a/pkg/transformer/passthrough.go b/pkg/transformer/passthrough.go index e5222a1..b47f0e0 100644 --- a/pkg/transformer/passthrough.go +++ b/pkg/transformer/passthrough.go @@ -1,13 +1,15 @@ package transformer import ( + "context" + "github.com/confluentinc/confluent-kafka-go/v2/kafka" ) type passThrough struct{} // Transform a kafka Message -func (p passThrough) Transform(src *kafka.Message) []*kafka.Message { +func (p passThrough) Transform(ctx context.Context, src *kafka.Message) []*kafka.Message { topic := *src.TopicPartition.Topic + "-passthrough" msg := &kafka.Message{ diff --git a/pkg/transformer/projector.go b/pkg/transformer/projector.go index a8b6459..f988f53 100644 --- a/pkg/transformer/projector.go +++ b/pkg/transformer/projector.go @@ -1,6 +1,8 @@ package transformer import ( + "context" + "github.com/confluentinc/confluent-kafka-go/v2/kafka" ) @@ -8,5 +10,5 @@ import ( // in order to project a kafka Message (to kafka or an external system) // If you want to customize the projection, this is the interface to implement type Projector interface { - Project(message *kafka.Message) + Project(ctx context.Context, message *kafka.Message) } diff --git a/pkg/transformer/transformer.go b/pkg/transformer/transformer.go index fb6e848..230c91a 100644 --- a/pkg/transformer/transformer.go +++ b/pkg/transformer/transformer.go @@ -1,6 +1,8 @@ package transformer import ( + "context" + "github.com/confluentinc/confluent-kafka-go/v2/kafka" ) @@ -8,5 +10,5 @@ import ( // in order to transform a kafka Message. // If nil is returned the message will be ignored type Transformer interface { - Transform(src *kafka.Message) []*kafka.Message + Transform(ctx context.Context, src *kafka.Message) []*kafka.Message }