Skip to content

Commit

Permalink
Update go v1.22 and confluent-kafka-go v2.3.0 (#16) (#18)
Browse files Browse the repository at this point in the history
* Migrate to go1.15 and confluent-kafka-go v1.5.2 (#10)

* update confluent-kafka-go dependency to v1.5.2

* fix build

* fix tests

* add logs

* Fix worflow

* Fix integration test (#9)

* use environnement variable to define boostrap server

* Update to go1.22 and confluente-kafka-go v2.3.0

* Update actions/checkout

* Update run_tests.sh

* Update run_tests.sh

* Update to kcat

* Add context to transform and project operations

---------

Co-authored-by: Laurent Dechoux <[email protected]>
Co-authored-by: Vincent Composieux <[email protected]>
Co-authored-by: Fatih KARAKAŞ <[email protected]>
  • Loading branch information
4 people authored Jul 3, 2024
1 parent c0f5f1c commit 5959cb4
Show file tree
Hide file tree
Showing 42 changed files with 257 additions and 90 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Check out code into the Go module directory
uses: actions/checkout@v2
uses: actions/checkout@v4

- name: Build
run: make tests.docker
4 changes: 2 additions & 2 deletions Dockerfile.test
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ ARG GOLANG_VERSION
FROM golang:$GOLANG_VERSION-alpine

RUN echo $GOLANG_VERSION
RUN apk update && apk add --no-cache git bash make gcc libc-dev librdkafka-dev pkgconf openssh netcat-openbsd curl kafkacat
RUN go get -u golang.org/x/lint/golint
RUN apk update && apk add --no-cache git bash make gcc libc-dev librdkafka-dev pkgconf openssh netcat-openbsd curl kcat
RUN go install golang.org/x/lint/golint@latest
RUN mkdir -p /transformer/tests

WORKDIR /transformer/tests
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
GOLANG_VERSION?=1.17
GOLANG_VERSION?=1.22

.PHONY: dev.up
dev.up:
Expand Down
8 changes: 5 additions & 3 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@ services:
dockerfile: Dockerfile.test
command: ["bash", "./scripts/run_tests.sh"]
environment:
- KAFKA_BOOTSTRAP_SERVER=kafka:29092
- KAFKA_BOOTSTRAP_SERVER=broker:29092
depends_on:
- kafka

zookeeper:
image: confluentinc/cp-zookeeper
hostname: zookeeper
ports:
- 2181:2181
environment:
Expand All @@ -20,6 +21,7 @@ services:

kafka:
image: confluentinc/cp-kafka
hostname: broker
depends_on:
- zookeeper
ports:
Expand All @@ -30,7 +32,7 @@ services:
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
6 changes: 4 additions & 2 deletions examples/custom_collector/main.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package main

import (
"context"
"log"
"net/http"
"os"
"os/signal"
"syscall"

confluent "github.com/confluentinc/confluent-kafka-go/kafka"
confluent "github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/etf1/kafka-transformer/pkg/instrument"
"github.com/etf1/kafka-transformer/pkg/transformer/kafka"

Expand Down Expand Up @@ -52,7 +53,8 @@ func main() {
exitchan := make(chan bool, 1)

go func() {
if err := transformer.Run(); err != nil {
ctx := context.Background()
if err := transformer.Run(ctx); err != nil {
log.Printf("failed to start transformer: %v", err)
}
exitchan <- true
Expand Down
5 changes: 3 additions & 2 deletions examples/custom_collector/projector.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
package main

import (
"context"
"sync/atomic"

kafka "github.com/confluentinc/confluent-kafka-go/kafka"
kafka "github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

type customProjector struct {
counter uint32
}

func (c *customProjector) Project(msg *kafka.Message) {
func (c *customProjector) Project(ctx context.Context, msg *kafka.Message) {
atomic.AddUint32(&c.counter, 1)
/*
if c.counter%2 == 0 {
Expand Down
2 changes: 1 addition & 1 deletion examples/custom_collector/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"log"
"time"

confluent "github.com/confluentinc/confluent-kafka-go/kafka"
confluent "github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/etf1/kafka-transformer/pkg/instrument"
"github.com/prometheus/client_golang/prometheus"
)
Expand Down
6 changes: 4 additions & 2 deletions examples/custom_projector/main.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package main

import (
"context"
"log"
"os"
"os/signal"
"syscall"

confluent "github.com/confluentinc/confluent-kafka-go/kafka"
confluent "github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/etf1/kafka-transformer/pkg/transformer/kafka"
)

Expand Down Expand Up @@ -42,7 +43,8 @@ func main() {
exitchan := make(chan bool, 1)

go func() {
if err = transformer.Run(); err != nil {
ctx := context.Background()
if err = transformer.Run(ctx); err != nil {
log.Printf("failed to start transformer: %v", err)
}
exitchan <- true
Expand Down
5 changes: 3 additions & 2 deletions examples/custom_projector/redis_projector.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package main

import (
"context"
"log"
"strconv"
"time"

kafka "github.com/confluentinc/confluent-kafka-go/kafka"
kafka "github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/go-redis/redis/v7"
)

Expand Down Expand Up @@ -47,7 +48,7 @@ func (r RedisProjector) Close() error {
}

// Project implements transformer.Projector interface
func (r RedisProjector) Project(msg *kafka.Message) {
func (r RedisProjector) Project(ctx context.Context, msg *kafka.Message) {

if len(msg.Value) == 0 {
return
Expand Down
5 changes: 3 additions & 2 deletions examples/custom_transformer/header_transformer.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package main

import (
"context"
"time"

kafka "github.com/confluentinc/confluent-kafka-go/kafka"
kafka "github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/etf1/kafka-transformer/pkg/logger"
)

Expand All @@ -12,7 +13,7 @@ type headerTransformer struct {
}

// Add a custom header x-app-id to the message
func (ht headerTransformer) Transform(src *kafka.Message) []*kafka.Message {
func (ht headerTransformer) Transform(ctx context.Context, src *kafka.Message) []*kafka.Message {
topic := "custom-transformer"
msg := &kafka.Message{
TopicPartition: kafka.TopicPartition{
Expand Down
6 changes: 4 additions & 2 deletions examples/custom_transformer/main.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package main

import (
"context"
"log"
"os"
"os/signal"
"syscall"

confluent "github.com/confluentinc/confluent-kafka-go/kafka"
confluent "github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/etf1/kafka-transformer/pkg/transformer/kafka"
"github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -46,7 +47,8 @@ func main() {
exitchan := make(chan bool, 1)

go func() {
if err = transformer.Run(); err != nil {
ctx := context.Background()
if err = transformer.Run(ctx); err != nil {
log.Printf("failed to start transformer: %v", err)
}
exitchan <- true
Expand Down
2 changes: 2 additions & 0 deletions examples/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ services:

zookeeper:
image: confluentinc/cp-zookeeper
hostname: zookeeper
ports:
- 2181:2181
environment:
Expand All @@ -16,6 +17,7 @@ services:

kafka:
image: confluentinc/cp-kafka
hostname: kafka
depends_on:
- zookeeper
ports:
Expand Down
18 changes: 15 additions & 3 deletions examples/go.mod
Original file line number Diff line number Diff line change
@@ -1,13 +1,25 @@
module github.com/etf1/kafka-transformer/examples

go 1.15
go 1.22

require (
github.com/confluentinc/confluent-kafka-go v1.5.2
github.com/confluentinc/confluent-kafka-go/v2 v2.3.0
github.com/etf1/kafka-transformer v0.0.0-20200327090708-353621d904e9
github.com/go-redis/redis/v7 v7.2.0
github.com/prometheus/client_golang v1.5.1
github.com/sirupsen/logrus v1.4.2
github.com/sirupsen/logrus v1.8.1
)

require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.1.1 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.9.1 // indirect
github.com/prometheus/procfs v0.0.8 // indirect
golang.org/x/sys v0.6.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
)

replace github.com/etf1/kafka-transformer => ../
Loading

0 comments on commit 5959cb4

Please sign in to comment.