Skip to content

Commit

Permalink
Privatized some stuff
Browse files Browse the repository at this point in the history
  • Loading branch information
stewartboyd119 committed Sep 21, 2024
1 parent 7f18652 commit fc65e78
Show file tree
Hide file tree
Showing 14 changed files with 494 additions and 256 deletions.
147 changes: 110 additions & 37 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"sync"

"github.com/confluentinc/confluent-kafka-go/v2/schemaregistry"
"github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde"
"github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde/avrov2"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/trace"
)
Expand All @@ -27,18 +29,22 @@ const instrumentationName = "github.com/zillow/zkafka"

// Client helps instantiate usable readers and writers
type Client struct {
mu sync.RWMutex
conf Config
readers map[string]*KReader
writers map[string]*KWriter
logger Logger
lifecycle LifecycleHooks
groupPrefix string
tp trace.TracerProvider
p propagation.TextMapPropagator

mmu sync.Mutex
srCls map[string]schemaregistry.Client
mu sync.RWMutex
conf Config
readers map[string]Reader
writers map[string]Writer
logger Logger
lifecycle LifecycleHooks
groupPrefix string
tp trace.TracerProvider
p propagation.TextMapPropagator
srClProvider srProvider2
//writerProvider writerProvider
//readerProvider readerProvider

srf *schemaRegistryFactory

Check failure on line 45 in client.go

View workflow job for this annotation

GitHub Actions / Lint

field `srf` is unused (unused)
//mmu sync.Mutex
//srCls map[string]schemaregistry.Client

// confluent dependencies
producerProvider confluentProducerProvider
Expand All @@ -47,15 +53,16 @@ type Client struct {

// NewClient instantiates a kafka client to get readers and writers
func NewClient(conf Config, opts ...Option) *Client {
srf := newSchemaRegistryFactory()
c := &Client{
conf: conf,
readers: make(map[string]*KReader),
writers: make(map[string]*KWriter),
srCls: make(map[string]schemaregistry.Client),
readers: make(map[string]Reader),
writers: make(map[string]Writer),
logger: NoopLogger{},

producerProvider: defaultConfluentProducerProvider{}.NewProducer,
consumerProvider: defaultConfluentConsumerProvider{}.NewConsumer,
srClProvider: srf.create,
}
for _, opt := range opts {
opt(c)
Expand All @@ -71,7 +78,12 @@ func (c *Client) Reader(_ context.Context, topicConfig ConsumerTopicConfig, opts
}
c.mu.RLock()
r, exist := c.readers[topicConfig.ClientID]
if exist && !r.isClosed {
kr, ok := r.(*KReader)
// is kr -> isClosed = true -> true
// is kr -> isClosed = false -> false
// is not kr -> false
isClosed := ok && kr.isClosed
if exist && !isClosed {
c.mu.RUnlock()
return r, nil
}
Expand All @@ -80,20 +92,31 @@ func (c *Client) Reader(_ context.Context, topicConfig ConsumerTopicConfig, opts
c.mu.Lock()
defer c.mu.Unlock()
r, exist = c.readers[topicConfig.ClientID]
if exist && !r.isClosed {
if exist && !isClosed {
return r, nil
}

reader, err := newReader(c.conf, topicConfig, c.consumerProvider, c.logger, c.groupPrefix, c.schemaCl)
formatter, err := getFormatter(formatterArgs{
formatter: topicConfig.Formatter,
schemaID: topicConfig.SchemaID,
srCfg: topicConfig.SchemaRegistry,
getSR: c.srClProvider,
})
if err != nil {
return nil, err
}
// copy settings from client first
reader.lifecycle = c.lifecycle

// overwrite options if given
for _, opt := range opts {
opt(reader)
reader, err := newReader(readerArgs{
cfg: c.conf,
cCfg: topicConfig,
consumerProvider: c.consumerProvider,
f: formatter,
l: c.logger,
prefix: c.groupPrefix,
hooks: c.lifecycle,
opts: opts,
})
if err != nil {
return nil, err
}
c.readers[topicConfig.ClientID] = reader
return c.readers[topicConfig.ClientID], nil
Expand All @@ -107,7 +130,9 @@ func (c *Client) Writer(_ context.Context, topicConfig ProducerTopicConfig, opts
}
c.mu.RLock()
w, exist := c.writers[topicConfig.ClientID]
if exist && !w.isClosed {
kr, ok := w.(*KWriter)
isClosed := ok && kr.isClosed
if exist && !isClosed {
c.mu.RUnlock()
return w, nil
}
Expand All @@ -116,23 +141,34 @@ func (c *Client) Writer(_ context.Context, topicConfig ProducerTopicConfig, opts
c.mu.Lock()
defer c.mu.Unlock()
w, exist = c.writers[topicConfig.ClientID]
if exist && !w.isClosed {
if exist && !isClosed {
return w, nil
}
writer, err := newWriter(c.conf, topicConfig, c.producerProvider, c.schemaCl)
formatter, err := getFormatter(formatterArgs{
formatter: topicConfig.Formatter,
schemaID: topicConfig.SchemaID,
srCfg: topicConfig.SchemaRegistry,
getSR: c.srClProvider,
})

if err != nil {
return nil, err
}
writer, err := newWriter(writerArgs{
cfg: c.conf,
pCfg: topicConfig,
producerProvider: c.producerProvider,
f: formatter,
l: c.logger,
t: getTracer(c.tp),
p: c.p,
hooks: c.lifecycle,
opts: opts,
})
if err != nil {
return nil, err
}
// copy settings from client first
writer.logger = c.logger
writer.tracer = getTracer(c.tp)
writer.p = c.p
writer.lifecycle = c.lifecycle

// overwrite options if given
for _, opt := range opts {
opt(writer)
}
c.writers[topicConfig.ClientID] = writer
return c.writers[topicConfig.ClientID], nil
}
Expand All @@ -157,7 +193,44 @@ func (c *Client) Close() error {
return err
}

func (c *Client) schemaCl(srConfig SchemaRegistryConfig) (schemaregistry.Client, error) {
type schemaRegistryFactory struct {
mmu sync.Mutex

Check failure on line 197 in client.go

View workflow job for this annotation

GitHub Actions / Lint

field `mmu` is unused (unused)
srCls map[string]schemaregistry.Client
}

func newSchemaRegistryFactory() *schemaRegistryFactory {
return &schemaRegistryFactory{
srCls: make(map[string]schemaregistry.Client),
}
}

func (c *schemaRegistryFactory) create(srConfig SchemaRegistryConfig) (schemaRegistryCl, error) {
cl, err := c.getSchemaClient(srConfig)
if err != nil {
return nil, err
}

deserConfig := avrov2.NewDeserializerConfig()
deser, err := avrov2.NewDeserializer(cl, serde.ValueSerde, deserConfig)
if err != nil {
return shim{}, fmt.Errorf("failed to create deserializer: %w", err)
}

serConfig := avrov2.NewSerializerConfig()
serConfig.AutoRegisterSchemas = srConfig.Serialization.AutoRegisterSchemas
serConfig.NormalizeSchemas = true

ser, err := avrov2.NewSerializer(cl, serde.ValueSerde, serConfig)
if err != nil {
return shim{}, fmt.Errorf("failed to create serializer: %w", err)
}
return shim{
ser: ser,
deser: deser,
}, nil
}

func (c *schemaRegistryFactory) getSchemaClient(srConfig SchemaRegistryConfig) (schemaregistry.Client, error) {
url := srConfig.URL
if url == "" {
return nil, errors.New("no schema registry url provided")
Expand Down
Loading

0 comments on commit fc65e78

Please sign in to comment.