-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconverter.go
66 lines (57 loc) · 1.64 KB
/
converter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package confluent
import (
ckafka "github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/alebabai/go-kafka"
)
// ConvertMessageToKafkaMessage transforms a [ckafka.Message] into a [kafka.Message].
func ConvertMessageToKafkaMessage(in ckafka.Message) kafka.Message {
var topic string
if in.TopicPartition.Topic != nil {
topic = *in.TopicPartition.Topic
}
hs := make([]kafka.Header, 0)
for _, h := range in.Headers {
hs = append(hs, ConvertHeaderToKafkaHeader(h))
}
return kafka.Message{
Key: in.Key,
Value: in.Value,
Topic: topic,
Partition: in.TopicPartition.Partition,
Offset: int64(in.TopicPartition.Offset),
Headers: hs,
Timestamp: in.Timestamp,
}
}
// ConvertHeaderToKafkaHeader transforms a [ckafka.Header] into a [kafka.Header].
func ConvertHeaderToKafkaHeader(in ckafka.Header) kafka.Header {
return kafka.Header{
Key: []byte(in.Key),
Value: in.Value,
}
}
// ConvertKafkaMessageToMessage transforms a [kafka.Message] into a [ckafka.Message].
func ConvertKafkaMessageToMessage(in kafka.Message) ckafka.Message {
hs := make([]ckafka.Header, 0)
for _, h := range in.Headers {
hs = append(hs, ConvertKafkaHeaderToHeader(h))
}
return ckafka.Message{
Key: in.Key,
Value: in.Value,
TopicPartition: ckafka.TopicPartition{
Topic: &in.Topic,
Partition: in.Partition,
Offset: ckafka.Offset(in.Offset),
},
Timestamp: in.Timestamp,
Headers: hs,
}
}
// ConvertKafkaHeaderToHeader transforms a [kafka.Header] into a [ckafka.Header].
func ConvertKafkaHeaderToHeader(in kafka.Header) ckafka.Header {
return ckafka.Header{
Key: string(in.Key),
Value: in.Value,
}
}