Improve schema registry support #12

Merged: 34 commits, Sep 23, 2024
Commits
1a8f11a
Improve schema registry
stewartboyd119 Sep 9, 2024
e7a63c0
Updated
stewartboyd119 Sep 9, 2024
e1eec3b
Updated
stewartboyd119 Sep 9, 2024
164f5ee
Updated
stewartboyd119 Sep 9, 2024
bb5ab13
Updated
stewartboyd119 Sep 9, 2024
27cb9bb
Updated
stewartboyd119 Sep 10, 2024
c8f6820
Updated tests
stewartboyd119 Sep 10, 2024
ce08024
Updated formatter
stewartboyd119 Sep 10, 2024
7f18652
Renamed formatter stuff
stewartboyd119 Sep 10, 2024
fc65e78
Privatized some stuff
stewartboyd119 Sep 21, 2024
3963574
Added some tests that will need implementation
stewartboyd119 Sep 21, 2024
355e8b4
Added proto schema registry support
stewartboyd119 Sep 22, 2024
98b8014
Updated
stewartboyd119 Sep 22, 2024
fc6708a
Updated
stewartboyd119 Sep 22, 2024
7cb678a
Added json schema registry support
stewartboyd119 Sep 22, 2024
78d3ac7
Updated
stewartboyd119 Sep 22, 2024
d0f2875
Added schema registry test
stewartboyd119 Sep 22, 2024
4e0da81
Added `Test_DeadletterClientDoesntCollideWithProducer`
stewartboyd119 Sep 22, 2024
14f6589
Updated tests to provide bootstrap in config
stewartboyd119 Sep 22, 2024
837a16e
Updated
stewartboyd119 Sep 22, 2024
4b9c184
Fixed linter
stewartboyd119 Sep 23, 2024
ac83851
Fixed linter warnings
stewartboyd119 Sep 23, 2024
8ac8970
Added lifecycle tests
stewartboyd119 Sep 23, 2024
9412096
Added tests which use Proto/JSON schema registry that run in pipeline
stewartboyd119 Sep 23, 2024
a4e4535
Added some comments
stewartboyd119 Sep 23, 2024
ac2ca68
Updated some stuff
stewartboyd119 Sep 23, 2024
4bc4a79
Updated
stewartboyd119 Sep 23, 2024
f25b0ca
Updated
stewartboyd119 Sep 23, 2024
554c0f9
Updated changelog and README.md
stewartboyd119 Sep 23, 2024
1fdba5b
Updated some tests
stewartboyd119 Sep 23, 2024
dba790b
Corrected error message assertion
stewartboyd119 Sep 23, 2024
18ef0e3
Updated spelling of fmtter -> formatter
stewartboyd119 Sep 23, 2024
7f11505
Executed code cleanup
stewartboyd119 Sep 23, 2024
f0923ca
Corrected some typos
stewartboyd119 Sep 23, 2024
1 change: 1 addition & 0 deletions .github/workflows/go.yml
@@ -41,6 +41,7 @@ jobs:
- name: Test
env:
KAFKA_BOOTSTRAP_SERVER: ${{ env.kafka_runner_address }}:9092
ENABLE_KAFKA_BROKER_TESTS: true
run: make cover

- name: Upload coverage reports to Codecov
1 change: 0 additions & 1 deletion .gitignore
@@ -25,7 +25,6 @@ go.work.sum
.idea/
.run/
zk-multiple-kafka-multiple/
*.out
*.res
*.lsif
*.prof
14 changes: 6 additions & 8 deletions .golangci.yml
@@ -1,11 +1,11 @@
run:
skip-dirs:
- docs
- datadog
- kustomize
skip-files:
- 'wire_gen.go'
tests: false
go: '1.22'
issues:
exclude-files:
- 'wire_gen.go'
exclude-dirs:
- docs
linters-settings:
errcheck:
check-type-assertions: true
@@ -14,8 +14,6 @@ linters-settings:
sections:
- standard
- default
gosimple:
go: '1.17'
depguard:
rules:
Main:
16 changes: 15 additions & 1 deletion Makefile
@@ -10,7 +10,7 @@ setup:
# Assumes setup has been executed. Runs go test with coverage
.PHONY: cover
cover:
export GO_TAGS=--tags=integration; ./coverage.sh
./coverage.sh

# Runs setup and executes tests with coverage.
.PHONY: test-local
@@ -41,3 +41,17 @@ golangci-lint:
(cd $(mod) && \
echo "[lint] golangci-lint: $(mod)" && \
golangci-lint run --path-prefix $(mod) ./...) &&) true

.PHONY: gen
gen: protoc-exists
cd test/evolution; protoc --proto_path=. --go_out=./ ./schema_1.proto
cd test/evolution; protoc --proto_path=. --go_out=./ ./schema_2.proto
go run github.com/heetch/avro/cmd/[email protected] -p main -d ./example/producer_avro ./example/producer_avro/dummy_event.avsc
go run github.com/heetch/avro/cmd/[email protected] -p main -d ./example/worker_avro ./example/worker_avro/dummy_event.avsc
go run github.com/heetch/avro/cmd/[email protected] -p avro1 -d ./test/evolution/avro1 ./test/evolution/schema_1.avsc
go run github.com/heetch/avro/cmd/[email protected] -p avro2 -d ./test/evolution/avro2 ./test/evolution/schema_2.avsc

# a forced dependency which fails (and prints) if `protoc` isn't installed
.PHONY: protoc-exists
protoc-exists:
@which protoc > /dev/null || (echo "protoc is not installed. Install via 'brew install protobuf'"; exit 1)
16 changes: 10 additions & 6 deletions README.md
@@ -270,13 +270,17 @@ special processing of these messages.

### SchemaRegistry Support:

There is limited support for schema registry in zkafka. A schemaID can be hardcoded via configuration. No
communication is done with schema registry, but some primitive checks can be conducted if a schemaID is specified via
configuration.
zkafka supports schema registry. It extends `zfmt` to enable this, adding three `zfmt.FormatterType` values:
```
AvroSchemaRegistry zfmt.FormatterType = "avro_schema_registry"
ProtoSchemaRegistry zfmt.FormatterType = "proto_schema_registry"
JSONSchemaRegistry zfmt.FormatterType = "json_schema_registry"
```

These can be used in ProducerTopicConfig/ConsumerTopicConfig just like the other formatter types. Examples have been added at
`example/producer_avro` and `example/worker_avro` which demonstrate the additional configuration (mostly the settings needed
to enable the required schema registry communication).
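
For orientation, a minimal sketch of the producer-side wiring is shown below. The `BootstrapServers` and `URL` field names, and the `github.com/zillow/zkafka` import path, are assumptions here; treat `example/producer_avro` and `example/worker_avro` as the source of truth.

```
// Minimal sketch, assuming the field names noted above; see example/producer_avro.
package main

import (
	"context"

	"github.com/zillow/zkafka"
)

func main() {
	ctx := context.Background()
	client := zkafka.NewClient(zkafka.Config{
		BootstrapServers: []string{"localhost:9092"}, // assumed field name
	})
	writer, err := client.Writer(ctx, zkafka.ProducerTopicConfig{
		ClientID:  "example-producer",
		Topic:     "dummy-events",
		Formatter: zkafka.AvroSchemaRegistry,
		SchemaRegistry: zkafka.SchemaRegistryConfig{
			URL: "http://localhost:8081", // assumed field name
		},
	})
	if err != nil {
		panic(err) // e.g. an empty bootstrap servers setting now returns an error
	}
	_ = writer // produce messages with the returned writer as usual
}
```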

Below is a breakdown of schema registry interactions into two subcategories. One is `Raw Handling`, where the configurable
formatter is bypassed entirely in favor of operating on the value byte arrays directly. The other is `Native Support`, which
attempts to create Confluent-compatible serializations without communicating with schema registry directly.

#### Producers

8 changes: 8 additions & 0 deletions changelog.md
@@ -4,6 +4,14 @@ All notable changes to this project will be documented in this file.

This project adheres to Semantic Versioning.

## 1.1.0 (Sep 22, 2024)

1. Added support for schema registry (avro, proto, json). Extended `zfmt.FormatterType` types to include `avro_schema_registry`, `proto_schema_registry` and `json_schema_registry`
2. Added lifecycle function `LifecyclePostReadImmediate`
3. Added `workFactory.CreateWithFunc`, a convenience work factory method for creating work with a callback instead of an interface, which can reduce boilerplate in some scenarios (see the sketch below).
4. During the creation of readers/writers, an error is now returned if bootstrap servers are empty
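
A rough, hypothetical sketch of the callback style from item 3 follows. The exact `CreateWithFunc` signature, the `zkafka.Message` type, and field names such as `GroupID` are assumptions for illustration only and may not match the real API.

```
// Hypothetical sketch only; the real CreateWithFunc signature may differ.
work := workFactory.CreateWithFunc(
	zkafka.ConsumerTopicConfig{ClientID: "my-service", GroupID: "my-group", Topic: "dummy-events"},
	func(ctx context.Context, msg *zkafka.Message) error {
		// Handle the decoded message; a non-nil error marks the unit of work as failed.
		return nil
	},
)
_ = work // run the work as usual
```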


## 1.0.2 (Sep 6, 2024)

1. Updated `WithDeadLetterTopic` option to borrow username and password from ConsumerTopicConfig when those values aren't specified on DeadLetterTopicConfig
126 changes: 92 additions & 34 deletions client.go
@@ -36,13 +36,15 @@ type Client struct {
tp trace.TracerProvider
p propagation.TextMapPropagator

// confluent dependencies
srf *schemaRegistryFactory

producerProvider confluentProducerProvider
consumerProvider confluentConsumerProvider
}

// NewClient instantiates a kafka client to get readers and writers
func NewClient(conf Config, opts ...Option) *Client {
srf := newSchemaRegistryFactory()
c := &Client{
conf: conf,
readers: make(map[string]*KReader),
@@ -51,6 +53,7 @@ func NewClient(conf Config, opts ...Option) *Client {

producerProvider: defaultConfluentProducerProvider{}.NewProducer,
consumerProvider: defaultConfluentConsumerProvider{}.NewConsumer,
srf: srf,
}
for _, opt := range opts {
opt(c)
@@ -79,16 +82,26 @@ func (c *Client) Reader(_ context.Context, topicConfig ConsumerTopicConfig, opts
return r, nil
}

reader, err := newReader(c.conf, topicConfig, c.consumerProvider, c.logger, c.groupPrefix)
formatter, err := c.getFormatter(formatterArgs{
formatter: topicConfig.Formatter,
schemaID: topicConfig.SchemaID,
srCfg: topicConfig.SchemaRegistry,
})
if err != nil {
return nil, err
}
// copy settings from client first
reader.lifecycle = c.lifecycle

// overwrite options if given
for _, opt := range opts {
opt(reader)
reader, err := newReader(readerArgs{
cfg: c.conf,
cCfg: topicConfig,
consumerProvider: c.consumerProvider,
f: formatter,
l: c.logger,
prefix: c.groupPrefix,
hooks: c.lifecycle,
opts: opts,
})
if err != nil {
return nil, err
}
c.readers[topicConfig.ClientID] = reader
return c.readers[topicConfig.ClientID], nil
@@ -100,8 +113,9 @@ func (c *Client) Writer(_ context.Context, topicConfig ProducerTopicConfig, opts
if err != nil {
return nil, err
}
writerKey := getWriterKey(topicConfig)
c.mu.RLock()
w, exist := c.writers[topicConfig.ClientID]
w, exist := c.writers[writerKey]
if exist && !w.isClosed {
c.mu.RUnlock()
return w, nil
@@ -110,39 +124,36 @@

c.mu.Lock()
defer c.mu.Unlock()
w, exist = c.writers[topicConfig.ClientID]
w, exist = c.writers[writerKey]
if exist && !w.isClosed {
return w, nil
}
writer, err := newWriter(c.conf, topicConfig, c.producerProvider)
formatter, err := c.getFormatter(formatterArgs{
formatter: topicConfig.Formatter,
schemaID: topicConfig.SchemaID,
srCfg: topicConfig.SchemaRegistry,
})

if err != nil {
return nil, err
}
// copy settings from client first
writer.logger = c.logger
writer.tracer = getTracer(c.tp)
writer.p = c.p
writer.lifecycle = c.lifecycle

// overwrite options if given
for _, opt := range opts {
opt(writer)
writer, err := newWriter(writerArgs{
cfg: c.conf,
pCfg: topicConfig,
producerProvider: c.producerProvider,
f: formatter,
l: c.logger,
t: getTracer(c.tp),
p: c.p,
hooks: c.lifecycle,
opts: opts,
})
if err != nil {
return nil, err
}
c.writers[topicConfig.ClientID] = writer
return c.writers[topicConfig.ClientID], nil
}

func getFormatter(topicConfig TopicConfig) (zfmt.Formatter, error) {
switch topicConfig.GetFormatter() {
case CustomFmt:
return &noopFormatter{}, nil
default:
f, err := zfmt.GetFormatter(topicConfig.GetFormatter(), topicConfig.GetSchemaID())
if err != nil {
return nil, fmt.Errorf("unsupported formatter %s", topicConfig.GetFormatter())
}
return f, nil
}
c.writers[writerKey] = writer
return c.writers[writerKey], nil
}

// Close terminates all cached readers and writers gracefully.
@@ -165,9 +176,56 @@ func (c *Client) Close() error {
return err
}

func (c *Client) getFormatter(args formatterArgs) (kFormatter, error) {
formatter := args.formatter
schemaID := args.schemaID

switch formatter {
case AvroSchemaRegistry:
scl, err := c.srf.createAvro(args.srCfg)
if err != nil {
return nil, err
}
cf, err := newAvroSchemaRegistryFormatter(scl)
return cf, err
case ProtoSchemaRegistry:
scl, err := c.srf.createProto(args.srCfg)
if err != nil {
return nil, err
}
cf := newProtoSchemaRegistryFormatter(scl)
return cf, nil
case JSONSchemaRegistry:
scl, err := c.srf.createJson(args.srCfg)
if err != nil {
return nil, err
}
cf := newJsonSchemaRegistryFormatter(scl)
return cf, nil
case CustomFmt:
return &errFormatter{}, nil
default:
f, err := zfmt.GetFormatter(formatter, schemaID)
if err != nil {
return nil, fmt.Errorf("unsupported formatter %s", formatter)
}
return zfmtShim{F: f}, nil
}
}

func getTracer(tp trace.TracerProvider) trace.Tracer {
if tp == nil {
return nil
}
return tp.Tracer(instrumentationName, trace.WithInstrumentationVersion("v1.0.0"))
}

func getWriterKey(cfg ProducerTopicConfig) string {
return cfg.ClientID + "-" + cfg.Topic
}

type formatterArgs struct {
formatter zfmt.FormatterType
schemaID int
srCfg SchemaRegistryConfig
}