From fc65e78779a5f3b147a6e0db815a2d90db60e283 Mon Sep 17 00:00:00 2001 From: stewartboyd119 Date: Sat, 21 Sep 2024 08:38:08 -0700 Subject: [PATCH] Privatized some stuff --- client.go | 147 +++++++++++++++++++++++++++++---------- client_test.go | 104 ++++++++++++++++----------- config.go | 4 +- config_test.go | 6 +- formatter.go | 83 +++++++--------------- reader.go | 60 +++++++++++----- reader_test.go | 120 ++++++++++++++++++++++++++------ test/integration_test.go | 2 +- test/worker_test.go | 39 ++++------- testhelper.go | 63 +++++++++++------ work.go | 14 ++++ work_test.go | 24 +++++-- writer.go | 52 ++++++++++---- writer_test.go | 32 +++++---- 14 files changed, 494 insertions(+), 256 deletions(-) diff --git a/client.go b/client.go index 3a5bb9c..aac8b40 100644 --- a/client.go +++ b/client.go @@ -9,6 +9,8 @@ import ( "sync" "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry" + "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde" + "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde/avrov2" "go.opentelemetry.io/otel/propagation" "go.opentelemetry.io/otel/trace" ) @@ -27,18 +29,22 @@ const instrumentationName = "github.com/zillow/zkafka" // Client helps instantiate usable readers and writers type Client struct { - mu sync.RWMutex - conf Config - readers map[string]*KReader - writers map[string]*KWriter - logger Logger - lifecycle LifecycleHooks - groupPrefix string - tp trace.TracerProvider - p propagation.TextMapPropagator - - mmu sync.Mutex - srCls map[string]schemaregistry.Client + mu sync.RWMutex + conf Config + readers map[string]Reader + writers map[string]Writer + logger Logger + lifecycle LifecycleHooks + groupPrefix string + tp trace.TracerProvider + p propagation.TextMapPropagator + srClProvider srProvider2 + //writerProvider writerProvider + //readerProvider readerProvider + + srf *schemaRegistryFactory + //mmu sync.Mutex + //srCls map[string]schemaregistry.Client // confluent dependencies producerProvider confluentProducerProvider @@ -47,15 +53,16 @@ type Client struct { // NewClient instantiates a kafka client to get readers and writers func NewClient(conf Config, opts ...Option) *Client { + srf := newSchemaRegistryFactory() c := &Client{ conf: conf, - readers: make(map[string]*KReader), - writers: make(map[string]*KWriter), - srCls: make(map[string]schemaregistry.Client), + readers: make(map[string]Reader), + writers: make(map[string]Writer), logger: NoopLogger{}, producerProvider: defaultConfluentProducerProvider{}.NewProducer, consumerProvider: defaultConfluentConsumerProvider{}.NewConsumer, + srClProvider: srf.create, } for _, opt := range opts { opt(c) @@ -71,7 +78,12 @@ func (c *Client) Reader(_ context.Context, topicConfig ConsumerTopicConfig, opts } c.mu.RLock() r, exist := c.readers[topicConfig.ClientID] - if exist && !r.isClosed { + kr, ok := r.(*KReader) + // is kr -> isClosed = true -> true + // is kr -> isClosed = false -> false + // is not kr -> false + isClosed := ok && kr.isClosed + if exist && !isClosed { c.mu.RUnlock() return r, nil } @@ -80,20 +92,31 @@ func (c *Client) Reader(_ context.Context, topicConfig ConsumerTopicConfig, opts c.mu.Lock() defer c.mu.Unlock() r, exist = c.readers[topicConfig.ClientID] - if exist && !r.isClosed { + if exist && !isClosed { return r, nil } - reader, err := newReader(c.conf, topicConfig, c.consumerProvider, c.logger, c.groupPrefix, c.schemaCl) + formatter, err := getFormatter(formatterArgs{ + formatter: topicConfig.Formatter, + schemaID: topicConfig.SchemaID, + srCfg: 
topicConfig.SchemaRegistry, + getSR: c.srClProvider, + }) if err != nil { return nil, err } - // copy settings from client first - reader.lifecycle = c.lifecycle - - // overwrite options if given - for _, opt := range opts { - opt(reader) + reader, err := newReader(readerArgs{ + cfg: c.conf, + cCfg: topicConfig, + consumerProvider: c.consumerProvider, + f: formatter, + l: c.logger, + prefix: c.groupPrefix, + hooks: c.lifecycle, + opts: opts, + }) + if err != nil { + return nil, err } c.readers[topicConfig.ClientID] = reader return c.readers[topicConfig.ClientID], nil @@ -107,7 +130,9 @@ func (c *Client) Writer(_ context.Context, topicConfig ProducerTopicConfig, opts } c.mu.RLock() w, exist := c.writers[topicConfig.ClientID] - if exist && !w.isClosed { + kr, ok := w.(*KWriter) + isClosed := ok && kr.isClosed + if exist && !isClosed { c.mu.RUnlock() return w, nil } @@ -116,23 +141,34 @@ func (c *Client) Writer(_ context.Context, topicConfig ProducerTopicConfig, opts c.mu.Lock() defer c.mu.Unlock() w, exist = c.writers[topicConfig.ClientID] - if exist && !w.isClosed { + if exist && !isClosed { return w, nil } - writer, err := newWriter(c.conf, topicConfig, c.producerProvider, c.schemaCl) + formatter, err := getFormatter(formatterArgs{ + formatter: topicConfig.Formatter, + schemaID: topicConfig.SchemaID, + srCfg: topicConfig.SchemaRegistry, + getSR: c.srClProvider, + }) + + if err != nil { + return nil, err + } + writer, err := newWriter(writerArgs{ + cfg: c.conf, + pCfg: topicConfig, + producerProvider: c.producerProvider, + f: formatter, + l: c.logger, + t: getTracer(c.tp), + p: c.p, + hooks: c.lifecycle, + opts: opts, + }) if err != nil { return nil, err } - // copy settings from client first - writer.logger = c.logger - writer.tracer = getTracer(c.tp) - writer.p = c.p - writer.lifecycle = c.lifecycle - // overwrite options if given - for _, opt := range opts { - opt(writer) - } c.writers[topicConfig.ClientID] = writer return c.writers[topicConfig.ClientID], nil } @@ -157,7 +193,44 @@ func (c *Client) Close() error { return err } -func (c *Client) schemaCl(srConfig SchemaRegistryConfig) (schemaregistry.Client, error) { +type schemaRegistryFactory struct { + mmu sync.Mutex + srCls map[string]schemaregistry.Client +} + +func newSchemaRegistryFactory() *schemaRegistryFactory { + return &schemaRegistryFactory{ + srCls: make(map[string]schemaregistry.Client), + } +} + +func (c *schemaRegistryFactory) create(srConfig SchemaRegistryConfig) (schemaRegistryCl, error) { + cl, err := c.getSchemaClient(srConfig) + if err != nil { + return nil, err + } + + deserConfig := avrov2.NewDeserializerConfig() + deser, err := avrov2.NewDeserializer(cl, serde.ValueSerde, deserConfig) + if err != nil { + return shim{}, fmt.Errorf("failed to create deserializer: %w", err) + } + + serConfig := avrov2.NewSerializerConfig() + serConfig.AutoRegisterSchemas = srConfig.Serialization.AutoRegisterSchemas + serConfig.NormalizeSchemas = true + + ser, err := avrov2.NewSerializer(cl, serde.ValueSerde, serConfig) + if err != nil { + return shim{}, fmt.Errorf("failed to create serializer: %w", err) + } + return shim{ + ser: ser, + deser: deser, + }, nil +} + +func (c *schemaRegistryFactory) getSchemaClient(srConfig SchemaRegistryConfig) (schemaregistry.Client, error) { url := srConfig.URL if url == "" { return nil, errors.New("no schema registry url provided") diff --git a/client_test.go b/client_test.go index c204f04..a03a482 100644 --- a/client_test.go +++ b/client_test.go @@ -33,8 +33,8 @@ func TestNewClient(t *testing.T) { { 
name: "empty config", want: &Client{ - readers: make(map[string]*KReader), - writers: make(map[string]*KWriter), + readers: make(map[string]Reader), + writers: make(map[string]Writer), logger: NoopLogger{}, producerProvider: defaultConfluentProducerProvider{}.NewProducer, consumerProvider: defaultConfluentConsumerProvider{}.NewConsumer, @@ -51,8 +51,8 @@ func TestNewClient(t *testing.T) { conf: Config{ BootstrapServers: []string{"test"}, }, - readers: make(map[string]*KReader), - writers: make(map[string]*KWriter), + readers: make(map[string]Reader), + writers: make(map[string]Writer), logger: NoopLogger{}, producerProvider: defaultConfluentProducerProvider{}.NewProducer, consumerProvider: defaultConfluentConsumerProvider{}.NewConsumer, @@ -124,8 +124,8 @@ func TestClient_WithOptions(t *testing.T) { func TestClient_Reader(t *testing.T) { type fields struct { conf Config - readers map[string]*KReader - writers map[string]*KWriter + readers map[string]Reader + writers map[string]Writer logger Logger producerProvider confluentProducerProvider consumerProvider confluentConsumerProvider @@ -146,7 +146,7 @@ func TestClient_Reader(t *testing.T) { name: "create new KReader with overridden Brokers, error from consumer provider", fields: fields{ consumerProvider: mockConfluentConsumerProvider{err: true}.NewConsumer, - readers: make(map[string]*KReader), + readers: make(map[string]Reader), }, args: args{ topicConfig: ConsumerTopicConfig{ @@ -162,7 +162,7 @@ func TestClient_Reader(t *testing.T) { name: "create new KReader with bad formatter", fields: fields{ consumerProvider: mockConfluentConsumerProvider{err: false}.NewConsumer, - readers: make(map[string]*KReader), + readers: make(map[string]Reader), }, args: args{ topicConfig: ConsumerTopicConfig{ @@ -178,8 +178,8 @@ func TestClient_Reader(t *testing.T) { { name: "create new KReader for closed KReader", fields: fields{ - readers: map[string]*KReader{ - "test-config": {isClosed: true}, + readers: map[string]Reader{ + "test-config": &KReader{isClosed: true}, }, consumerProvider: mockConfluentConsumerProvider{c: MockKafkaConsumer{ID: "stew"}}.NewConsumer, logger: NoopLogger{}, @@ -210,8 +210,8 @@ func TestClient_Reader(t *testing.T) { { name: "create new KReader for closed KReader with default overrides", fields: fields{ - readers: map[string]*KReader{ - "test-config": {isClosed: true}, + readers: map[string]Reader{ + "test-config": &KReader{isClosed: true}, }, consumerProvider: mockConfluentConsumerProvider{c: MockKafkaConsumer{ID: "stew"}}.NewConsumer, logger: NoopLogger{}, @@ -250,8 +250,8 @@ func TestClient_Reader(t *testing.T) { { name: "get from cache", fields: fields{ - readers: map[string]*KReader{ - "test-config": {}, + readers: map[string]Reader{ + "test-config": &KReader{}, }, }, args: args{ @@ -303,8 +303,8 @@ func TestClient_Reader(t *testing.T) { func TestClient_Writer(t *testing.T) { type fields struct { conf Config - readers map[string]*KReader - writers map[string]*KWriter + readers map[string]Reader + writers map[string]Writer logger Logger producerProvider confluentProducerProvider } @@ -324,7 +324,7 @@ func TestClient_Writer(t *testing.T) { name: "create new KWriter with overridden Brokers, error from producer provider", fields: fields{ producerProvider: mockConfluentProducerProvider{err: true}.NewProducer, - writers: make(map[string]*KWriter), + writers: make(map[string]Writer), conf: Config{ SaslUsername: ptr("test-user"), SaslPassword: ptr("test-password"), @@ -342,8 +342,8 @@ func TestClient_Writer(t *testing.T) { { name: 
"create new KWriter for closed writer", fields: fields{ - writers: map[string]*KWriter{ - "test-id": {isClosed: true}, + writers: map[string]Writer{ + "test-id": &KWriter{isClosed: true}, }, producerProvider: mockConfluentProducerProvider{}.NewProducer, logger: NoopLogger{}, @@ -371,8 +371,8 @@ func TestClient_Writer(t *testing.T) { { name: "create new KWriter for closed writer with default overrides", fields: fields{ - writers: map[string]*KWriter{ - "test-id": {isClosed: true}, + writers: map[string]Writer{ + "test-id": &KWriter{isClosed: true}, }, producerProvider: mockConfluentProducerProvider{}.NewProducer, logger: NoopLogger{}, @@ -407,8 +407,8 @@ func TestClient_Writer(t *testing.T) { { name: "get from cache", fields: fields{ - writers: map[string]*KWriter{ - "test-id": {}, + writers: map[string]Writer{ + "test-id": &KWriter{}, }, }, args: args{ @@ -453,20 +453,32 @@ func TestClient_Close(t *testing.T) { type fields struct { Mutex *sync.Mutex conf Config - readers map[string]*KReader - writers map[string]*KWriter + readers map[string]Reader + writers map[string]Writer } m := mockConfluentConsumerProvider{ c: mockConsumer, }.NewConsumer - r1, err := newReader(Config{}, ConsumerTopicConfig{ - Formatter: zfmt.StringFmt, - }, m, &NoopLogger{}, "", nil) + r1, err := newReader(readerArgs{ + cfg: Config{}, + cCfg: ConsumerTopicConfig{ + Formatter: zfmt.StringFmt, + }, + consumerProvider: m, + f: zfmtShim{F: &zfmt.StringFormatter{}}, + l: &NoopLogger{}, + }) require.NoError(t, err) - r2, err := newReader(Config{}, ConsumerTopicConfig{ - Formatter: zfmt.StringFmt, - }, m, &NoopLogger{}, "", nil) + r2, err := newReader(readerArgs{ + cfg: Config{}, + cCfg: ConsumerTopicConfig{ + Formatter: zfmt.StringFmt, + }, + consumerProvider: m, + f: zfmtShim{F: &zfmt.StringFormatter{}}, + l: &NoopLogger{}, + }) require.NoError(t, err) tests := []struct { name string @@ -481,13 +493,13 @@ func TestClient_Close(t *testing.T) { name: "with readers/writers => no error", wantErr: true, fields: fields{ - readers: map[string]*KReader{ + readers: map[string]Reader{ "r1": r1, "r2": r2, }, - writers: map[string]*KWriter{ - "w1": {producer: p}, - "w2": {producer: p}, + writers: map[string]Writer{ + "w1": &KWriter{producer: p}, + "w2": &KWriter{producer: p}, }, }, }, @@ -507,10 +519,14 @@ func TestClient_Close(t *testing.T) { require.NoError(t, err) } for _, w := range c.writers { - require.True(t, w.isClosed, "clients writer should be closed") + kw, ok := w.(*KWriter) + require.True(t, ok, "Expected writer to be KWriter") + require.True(t, kw.isClosed, "clients writer should be closed") } - for _, reader := range c.readers { - require.True(t, reader.isClosed, "clients reader should be closed") + for _, r := range c.readers { + kr, ok := r.(*KReader) + require.True(t, ok, "Expected reader to be KReader") + require.True(t, kr.isClosed, "clients reader should be closed") } }) } @@ -588,7 +604,11 @@ func Test_getFormatter_Consumer(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { defer recoverThenFail(t) - got, err := getFormatter(tt.args.topicConfig.Formatter, tt.args.topicConfig.SchemaID, SchemaRegistryConfig{}, nil) + args := formatterArgs{ + formatter: tt.args.topicConfig.Formatter, + schemaID: tt.args.topicConfig.SchemaID, + } + got, err := getFormatter(args) if tt.wantErr { require.Error(t, err) } else { @@ -625,7 +645,11 @@ func Test_getFormatter_Producer(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { defer recoverThenFail(t) - got, err := 
getFormatter(tt.args.topicConfig.Formatter, tt.args.topicConfig.SchemaID, SchemaRegistryConfig{}, nil) + args := formatterArgs{ + formatter: tt.args.topicConfig.Formatter, + schemaID: tt.args.topicConfig.SchemaID, + } + got, err := getFormatter(args) if tt.wantErr { require.Error(t, err) } else { diff --git a/config.go b/config.go index 090b5da..fd093f6 100644 --- a/config.go +++ b/config.go @@ -80,7 +80,7 @@ type ConsumerTopicConfig struct { SchemaRegistry SchemaRegistryConfig - // SchemaID defines the schema registered with Confluent Schema Registry + // SchemaID defines the schema registered with Confluent schema Registry // Default value is 0, and it implies that both Writer and Reader do not care about schema validation // and should encode/decode the message based on data type provided. // Currently, this only works with SchematizedAvroFormatter @@ -176,7 +176,7 @@ type ProducerTopicConfig struct { SchemaRegistry SchemaRegistryConfig - // SchemaID defines the schema registered with Confluent Schema Registry + // SchemaID defines the schema registered with Confluent schema Registry // Default value is 0, and it implies that both Writer and Reader do not care about schema validation // and should encode/decode the message based on data type provided. // Currently, this only works with SchematizedAvroFormatter diff --git a/config_test.go b/config_test.go index f30f255..12cf25c 100644 --- a/config_test.go +++ b/config_test.go @@ -28,7 +28,7 @@ func Test_getDefaultConsumerTopicConfig(t *testing.T) { wantErr: true, }, { - name: "missing required field (Topic) => error", + name: "missing required field (topic) => error", args: args{conf: &ConsumerTopicConfig{ GroupID: "test_group", ClientID: "test", @@ -36,7 +36,7 @@ func Test_getDefaultConsumerTopicConfig(t *testing.T) { wantErr: true, }, { - name: "missing required non empty fields (Topic and or Topics) => error", + name: "missing required non empty fields (topic and or Topics) => error", args: args{conf: &ConsumerTopicConfig{ GroupID: "test_group", ClientID: "test", @@ -126,7 +126,7 @@ func Test_getDefaultProducerTopicConfig(t *testing.T) { wantErr bool }{ { - name: "missing required field (Topic) => error", + name: "missing required field (topic) => error", args: args{conf: &ProducerTopicConfig{ ClientID: "test", }}, diff --git a/formatter.go b/formatter.go index f74fb0b..4ee1c5d 100644 --- a/formatter.go +++ b/formatter.go @@ -5,7 +5,6 @@ import ( "fmt" "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry" - "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde" "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde/avrov2" "github.com/zillow/zfmt" ) @@ -24,15 +23,12 @@ type Formatter interface { Unmarshal(b []byte, v any) error } -//type confluentFormatter interface { -// marshall(topic string, v any, schema string) ([]byte, error) -// unmarshal(topic string, b []byte, v any) error -//} - type marshReq struct { - topic string + topic string + // subject is the data to be marshalled subject any - schema string + // schema + schema string } type unmarshReq struct { @@ -61,14 +57,26 @@ func (f zfmtShim) unmarshal(req unmarshReq) error { return f.F.Unmarshal(req.data, req.target) } -func getFormatter(formatter zfmt.FormatterType, schemaID int, srCfg SchemaRegistryConfig, getSR srProvider) (kFormatter, error) { +type formatterArgs struct { + formatter zfmt.FormatterType + schemaID int + srCfg SchemaRegistryConfig + getSR srProvider2 +} + +func getFormatter(args formatterArgs) (kFormatter, error) { + 
formatter := args.formatter + schemaID := args.schemaID + switch formatter { case AvroConfluentFmt: - cl, err := getSR(srCfg) + srCfg := args.srCfg + getSR := args.getSR + scl, err := getSR(srCfg) if err != nil { return nil, err } - cf, err := newAvroSchemaRegistryFormatter(cl, srCfg) + cf, err := NewAvroSchemaRegistryFormatter(scl) return cf, err case CustomFmt: return &errFormatter{}, nil @@ -95,53 +103,14 @@ func (f errFormatter) unmarshal(req unmarshReq) error { return errMissingFmtter } -//var _ confluentFormatter = (*avroSchemaRegistryFormatter)(nil) - type avroSchemaRegistryFormatter struct { schemaRegistryCl schemaRegistryCl - //deser *avrov2.Deserializer - //ser *avrov2.Serializer - f zfmt.SchematizedAvroFormatter -} - -// func newAvroConfig(srConfig SchemaRegistryConfig) (*avro.SerializerConfig, *avro.DeserializerConfig, error) { -// url := srConfig.URL -// if url == "" { -// return nil, nil, errors.New("no schema registry url provided") -// } -// deserConfig := avrov2.NewDeserializerConfig() -// -// serConfig := avrov2.NewSerializerConfig() -// serConfig.AutoRegisterSchemas = srConfig.Serialization.AutoRegisterSchemas -// serConfig.NormalizeSchemas = true -// -// } -func newAvroSchemaRegistryFormatter(cl schemaregistry.Client, srConfig SchemaRegistryConfig) (avroSchemaRegistryFormatter, error) { - url := srConfig.URL - if url == "" { - return avroSchemaRegistryFormatter{}, errors.New("no schema registry url provided") - } - - deserConfig := avrov2.NewDeserializerConfig() - deser, err := avrov2.NewDeserializer(cl, serde.ValueSerde, deserConfig) - if err != nil { - return avroSchemaRegistryFormatter{}, fmt.Errorf("failed to create deserializer: %w", err) - } - - serConfig := avrov2.NewSerializerConfig() - serConfig.AutoRegisterSchemas = srConfig.Serialization.AutoRegisterSchemas - serConfig.NormalizeSchemas = true + f zfmt.SchematizedAvroFormatter +} - ser, err := avrov2.NewSerializer(cl, serde.ValueSerde, serConfig) - if err != nil { - return avroSchemaRegistryFormatter{}, fmt.Errorf("failed to create serializer: %w", err) - } - shimcl := shim{ - ser: ser, - deser: deser, - } +func NewAvroSchemaRegistryFormatter(shimCl schemaRegistryCl) (avroSchemaRegistryFormatter, error) { return avroSchemaRegistryFormatter{ - schemaRegistryCl: shimcl, + schemaRegistryCl: shimCl, }, nil } @@ -162,7 +131,7 @@ func (f avroSchemaRegistryFormatter) marshall(req marshReq) ([]byte, error) { } func (f avroSchemaRegistryFormatter) unmarshal(req unmarshReq) error { - err := f.schemaRegistryCl.DeserializeInto(req.topic, req.data, &req.target) + err := f.schemaRegistryCl.Deserialize(req.topic, req.data, &req.target) if err != nil { return fmt.Errorf("failed to deserialize to confluent schema registry avro type: %w", err) } @@ -171,7 +140,7 @@ func (f avroSchemaRegistryFormatter) unmarshal(req unmarshReq) error { type schemaRegistryCl interface { GetID(topic string, avroSchema string) (int, error) - DeserializeInto(topic string, value []byte, target any) error + Deserialize(topic string, value []byte, target any) error } var _ schemaRegistryCl = (*shim)(nil) @@ -185,6 +154,6 @@ func (s shim) GetID(topic string, avroSchema string) (int, error) { return s.ser.GetID(topic, nil, &schemaregistry.SchemaInfo{Schema: avroSchema}) } -func (s shim) DeserializeInto(topic string, value []byte, target any) error { +func (s shim) Deserialize(topic string, value []byte, target any) error { return s.deser.DeserializeInto(topic, value, target) } diff --git a/reader.go b/reader.go index bb08d5c..dcf9e7b 100644 --- 
a/reader.go +++ b/reader.go @@ -11,7 +11,6 @@ import ( "time" "github.com/confluentinc/confluent-kafka-go/v2/kafka" - "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry" ) //// go:generate mockgen -destination=./mocks/mock_metrics.go -source=reader.go @@ -52,28 +51,52 @@ type KReader struct { tCommitMgr *topicCommitMgr } -type srProvider func(_ SchemaRegistryConfig) (schemaregistry.Client, error) +type srProvider2 func(_ SchemaRegistryConfig) (schemaRegistryCl, error) + +type readerArgs struct { + cfg Config + cCfg ConsumerTopicConfig + consumerProvider confluentConsumerProvider + f kFormatter + l Logger + prefix string + hooks LifecycleHooks + opts []ReaderOption +} + +//type readerProvider func(args readerArgs) (Reader, error) // newReader makes a new reader based on the configurations -func newReader(conf Config, topicConfig ConsumerTopicConfig, provider confluentConsumerProvider, logger Logger, prefix string, srProvider srProvider) (*KReader, error) { +func newReader(args readerArgs) (*KReader, error) { + conf := args.cfg + topicConfig := args.cCfg + prefix := args.prefix + provider := args.consumerProvider + formatter := args.f + logger := args.l + confluentConfig := makeConsumerConfig(conf, topicConfig, prefix) consumer, err := provider(confluentConfig) if err != nil { return nil, err } - formatter, err := getFormatter(topicConfig.Formatter, topicConfig.SchemaID, topicConfig.SchemaRegistry, srProvider) - if err != nil { - return nil, err - } - - return &KReader{ + r := &KReader{ consumer: consumer, - fmtter: formatter, topicConfig: topicConfig, + fmtter: formatter, logger: logger, + lifecycle: args.hooks, tCommitMgr: newTopicCommitMgr(), - }, nil + } + s := ReaderSettings{} + for _, opt := range args.opts { + opt(&s) + } + if s.fmtter != nil { + r.fmtter = s.fmtter + } + return r, nil } // Read consumes a single message at a time. Blocks until a message is returned or some @@ -92,8 +115,9 @@ func (r *KReader) Read(ctx context.Context) (*Message, error) { } kmsg, err := r.consumer.ReadMessage(time.Duration(*r.topicConfig.ReadTimeoutMillis) * time.Millisecond) if err != nil { - switch v := err.(type) { - case kafka.Error: + var v kafka.Error + switch { + case errors.As(err, &v): // timeouts occur (because the assigned partitions aren't being written to, lack of activity, etc.). 
We'll // log them for debugging purposes if v.Code() == kafka.ErrTimedOut { @@ -289,14 +313,18 @@ func getTopicName(topicName *string) string { return topic } +type ReaderSettings struct { + fmtter kFormatter +} + // ReaderOption is a function that modify the KReader configurations -type ReaderOption func(*KReader) +type ReaderOption func(*ReaderSettings) // RFormatterOption sets the formatter for this reader func RFormatterOption(fmtter Formatter) ReaderOption { - return func(r *KReader) { + return func(s *ReaderSettings) { if fmtter != nil { - r.fmtter = zfmtShim{F: fmtter} + s.fmtter = zfmtShim{F: fmtter} } } } diff --git a/reader_test.go b/reader_test.go index f38e01f..a1aa7a7 100644 --- a/reader_test.go +++ b/reader_test.go @@ -31,7 +31,14 @@ func TestReader_Read_NilReturn(t *testing.T) { m := mockConfluentConsumerProvider{ c: mockConsumer, }.NewConsumer - r, _ := newReader(Config{}, topicConfig, m, &NoopLogger{}, "", nil) + args := readerArgs{ + cfg: Config{}, + cCfg: topicConfig, + consumerProvider: m, + l: &NoopLogger{}, + } + r, err := newReader(args) + require.NoError(t, err) got, err := r.Read(context.TODO()) require.NoError(t, err) @@ -59,7 +66,17 @@ func TestReader_Read(t *testing.T) { m := mockConfluentConsumerProvider{ c: mockConsumer, }.NewConsumer - r, err := newReader(Config{}, topicConfig, m, &NoopLogger{}, "", nil) + f, err := getFormatter(formatterArgs{formatter: topicConfig.Formatter}) + require.NoError(t, err) + + args := readerArgs{ + cfg: Config{}, + cCfg: topicConfig, + consumerProvider: m, + l: &NoopLogger{}, + f: f, + } + r, err := newReader(args) require.NoError(t, err) got, err := r.Read(context.TODO()) @@ -96,7 +113,13 @@ func TestReader_Read_Error(t *testing.T) { m := mockConfluentConsumerProvider{ c: mockConsumer, }.NewConsumer - r, err := newReader(Config{}, topicConfig, m, &NoopLogger{}, "", nil) + args := readerArgs{ + cfg: Config{}, + cCfg: topicConfig, + consumerProvider: m, + l: &NoopLogger{}, + } + r, err := newReader(args) require.NoError(t, err) got, err := r.Read(context.TODO()) @@ -129,7 +152,13 @@ func TestReader_Read_TimeoutError(t *testing.T) { m := mockConfluentConsumerProvider{ c: mockConsumer, }.NewConsumer - r, err := newReader(Config{}, topicConfig, m, &NoopLogger{}, "", nil) + args := readerArgs{ + cfg: Config{}, + cCfg: topicConfig, + consumerProvider: m, + l: &NoopLogger{}, + } + r, err := newReader(args) require.NoError(t, err) got, err := r.Read(context.TODO()) @@ -149,7 +178,13 @@ func TestReader_Read_SubscriberError(t *testing.T) { m := mockConfluentConsumerProvider{ c: mockConsumer, }.NewConsumer - r, err := newReader(Config{}, topicConfig, m, &NoopLogger{}, "", nil) + args := readerArgs{ + cfg: Config{}, + cCfg: topicConfig, + consumerProvider: m, + l: &NoopLogger{}, + } + r, err := newReader(args) require.NoError(t, err) _, err = r.Read(context.TODO()) @@ -170,7 +205,13 @@ func TestReader_Read_CloseError(t *testing.T) { m := mockConfluentConsumerProvider{ c: mockConsumer, }.NewConsumer - r, err := newReader(Config{}, topicConfig, m, &l, "", nil) + args := readerArgs{ + cfg: Config{}, + cCfg: topicConfig, + consumerProvider: m, + l: &l, + } + r, err := newReader(args) require.NoError(t, err) err = r.Close() @@ -191,7 +232,13 @@ func TestReader_ReadWhenConnectionIsClosed(t *testing.T) { m := mockConfluentConsumerProvider{ c: mockConsumer, }.NewConsumer - r, err := newReader(Config{}, topicConfig, m, &NoopLogger{}, "", nil) + args := readerArgs{ + cfg: Config{}, + cCfg: topicConfig, + consumerProvider: m, + l: &NoopLogger{}, + } + 
r, err := newReader(args) require.NoError(t, err) err = r.Close() @@ -221,16 +268,16 @@ func Test_newReader(t *testing.T) { }, wantErr: false, }, - { - name: "invalid formatter", - args: args{ - consumeProvider: defaultConfluentConsumerProvider{}.NewConsumer, - topicConfig: ConsumerTopicConfig{ - Formatter: zfmt.FormatterType("invalid_fmt"), - }, - }, - wantErr: true, - }, + //{ + // name: "invalid formatter", + // args: args{ + // consumeProvider: defaultConfluentConsumerProvider{}.NewConsumer, + // topicConfig: ConsumerTopicConfig{ + // Formatter: zfmt.FormatterType("invalid_fmt"), + // }, + // }, + // wantErr: true, + //}, { name: "valid formatter but has error when creating NewConsumer", args: args{ @@ -252,7 +299,14 @@ func Test_newReader(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { defer recoverThenFail(t) - _, err := newReader(tt.args.conf, tt.args.topicConfig, tt.args.consumeProvider, &NoopLogger{}, "", nil) + args := readerArgs{ + cfg: tt.args.conf, + cCfg: tt.args.topicConfig, + consumerProvider: tt.args.consumeProvider, + l: &NoopLogger{}, + } + _, err := newReader(args) + if tt.wantErr { require.Error(t, err) } else { @@ -284,7 +338,13 @@ func Test_ProcessMessage(t *testing.T) { m := mockConfluentConsumerProvider{ c: mock_confluent.NewMockKafkaConsumer(ctrl), }.NewConsumer - r, err := newReader(Config{}, topicConfig, m, &l, "", nil) + args := readerArgs{ + cfg: Config{}, + cCfg: topicConfig, + consumerProvider: m, + l: &NoopLogger{}, + } + r, err := newReader(args) require.NoError(t, err) got := r.mapMessage(context.Background(), dupMessage) @@ -317,7 +377,13 @@ func Test_ProcessMultipleMessagesFromDifferentTopics_UpdatesInternalStateProperl m := mockConfluentConsumerProvider{ c: mock_confluent.NewMockKafkaConsumer(ctrl), }.NewConsumer - r, err := newReader(Config{}, topicConfig, m, &l, "", nil) + + args := readerArgs{ + cCfg: topicConfig, + consumerProvider: m, + l: &l, + } + r, err := newReader(args) require.NoError(t, err) for _, msg := range msgs { @@ -359,7 +425,12 @@ func Test_ProcessMessage_StoreOffsetError(t *testing.T) { m := mockConfluentConsumerProvider{ c: mockConsumer, }.NewConsumer - r, err := newReader(Config{}, topicConfig, m, &l, "", nil) + args := readerArgs{ + cCfg: topicConfig, + consumerProvider: m, + l: &l, + } + r, err := newReader(args) require.NoError(t, err) mgr := newTopicCommitMgr() @@ -407,7 +478,12 @@ func Test_ProcessMessage_SetError(t *testing.T) { m := mockConfluentConsumerProvider{ c: mockConsumer, }.NewConsumer - r, err := newReader(Config{}, topicConfig, m, &l, "", nil) + args := readerArgs{ + cCfg: topicConfig, + consumerProvider: m, + l: &l, + } + r, err := newReader(args) require.NoError(t, err) mgr := newTopicCommitMgr() diff --git a/test/integration_test.go b/test/integration_test.go index 8301a67..227cc26 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -409,7 +409,7 @@ func Test_WithMultipleTopics_RebalanceDoesntCauseDuplicateMessages(t *testing.T) Val: "sdfds", } - t.Log("Begin writing to Test Topic") + t.Log("Begin writing to Test topic") // write N messages to topic1 msgCount := tc.messageCount for i := 0; i < msgCount; i++ { diff --git a/test/worker_test.go b/test/worker_test.go index cab8e52..d1334c9 100644 --- a/test/worker_test.go +++ b/test/worker_test.go @@ -82,8 +82,7 @@ func TestWork_Run_FailsWithLogsWhenGotNilReader(t *testing.T) { l.EXPECT().Warnw(gomock.Any(), "Kafka worker read message failed", "error", gomock.Any(), "topics", gomock.Any()).Times(1) 
l.EXPECT().Debugw(gomock.Any(), gomock.Any()).AnyTimes() - kcp := zkafka_mocks.NewMockClientProvider(ctrl) - kcp.EXPECT().Reader(gomock.Any(), gomock.Any()).Times(1).Return(nil, nil) + kcp := zkafka.FakeClient{R: nil} ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -111,8 +110,7 @@ func TestWork_Run_FailsWithLogsForReadError(t *testing.T) { r := zkafka_mocks.NewMockReader(ctrl) r.EXPECT().Read(gomock.Any()).Times(1).Return(nil, errors.New("error occurred during read")) - kcp := zkafka_mocks.NewMockClientProvider(ctrl) - kcp.EXPECT().Reader(gomock.Any(), gomock.Any()).Times(1).Return(r, nil) + kcp := zkafka.FakeClient{R: r} ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -139,8 +137,7 @@ func TestWork_Run_CircuitBreakerOpensOnReadError(t *testing.T) { r := zkafka_mocks.NewMockReader(ctrl) r.EXPECT().Read(gomock.Any()).AnyTimes().Return(nil, errors.New("error occurred during read")) - kcp := zkafka_mocks.NewMockClientProvider(ctrl) - kcp.EXPECT().Reader(gomock.Any(), gomock.Any()).Times(1).Return(r, nil) + kcp := zkafka.FakeClient{R: r} kwf := zkafka.NewWorkFactory(kcp, zkafka.WithLogger(l)) @@ -192,8 +189,7 @@ func TestWork_Run_CircuitBreaksOnProcessError(t *testing.T) { r := zkafka_mocks.NewMockReader(ctrl) r.EXPECT().Read(gomock.Any()).AnyTimes().Return(msg, nil) - kcp := zkafka_mocks.NewMockClientProvider(ctrl) - kcp.EXPECT().Reader(gomock.Any(), gomock.Any()).AnyTimes().Return(r, nil) + kcp := zkafka.FakeClient{R: r} kproc := &fakeProcessor{ process: func(ctx context.Context, message *zkafka.Message) error { @@ -251,8 +247,7 @@ func TestWork_Run_DoNotSkipCircuitBreak(t *testing.T) { r.EXPECT().Read(gomock.Any()).Return(failureMessage, nil).AnyTimes() - kcp := zkafka_mocks.NewMockClientProvider(ctrl) - kcp.EXPECT().Reader(gomock.Any(), gomock.Any()).AnyTimes().Return(r, nil) + kcp := zkafka.FakeClient{R: r} kproc := &fakeProcessor{ process: func(ctx context.Context, message *zkafka.Message) error { @@ -314,8 +309,7 @@ func TestWork_Run_DoSkipCircuitBreak(t *testing.T) { r.EXPECT().Read(gomock.Any()).Return(failureMessage, nil).AnyTimes() - kcp := zkafka_mocks.NewMockClientProvider(ctrl) - kcp.EXPECT().Reader(gomock.Any(), gomock.Any()).AnyTimes().Return(r, nil) + kcp := zkafka.FakeClient{R: r} kproc := fakeProcessor{ process: func(ctx context.Context, message *zkafka.Message) error { @@ -376,8 +370,7 @@ func TestWork_Run_CircuitBreaksOnProcessPanicInsideProcessorGoRoutine(t *testing r := zkafka_mocks.NewMockReader(ctrl) r.EXPECT().Read(gomock.Any()).AnyTimes().Return(msg, nil) - kcp := zkafka_mocks.NewMockClientProvider(ctrl) - kcp.EXPECT().Reader(gomock.Any(), gomock.Any()).AnyTimes().Return(r, nil) + kcp := zkafka.FakeClient{R: r} kproc := &fakeProcessor{ process: func(ctx context.Context, message *zkafka.Message) error { @@ -442,8 +435,7 @@ func TestWork_Run_DisabledCircuitBreakerContinueReadError(t *testing.T) { r := zkafka_mocks.NewMockReader(ctrl) r.EXPECT().Read(gomock.Any()).MinTimes(4).Return(nil, errors.New("error occurred on read")) - kcp := zkafka_mocks.NewMockClientProvider(ctrl) - kcp.EXPECT().Reader(gomock.Any(), gomock.Any()).Times(1).Return(r, nil) + kcp := zkafka.FakeClient{R: r} kwf := zkafka.NewWorkFactory(kcp, zkafka.WithLogger(l)) @@ -1162,8 +1154,7 @@ func TestWork_Run_OnDoneCallbackCalledOnProcessorError(t *testing.T) { r := zkafka_mocks.NewMockReader(ctrl) r.EXPECT().Read(gomock.Any()).AnyTimes().Return(msg, nil) - kcp := zkafka_mocks.NewMockClientProvider(ctrl) - kcp.EXPECT().Reader(gomock.Any(), gomock.Any()).Times(1).Return(r, nil) + kcp := 
zkafka.FakeClient{R: r} kwf := zkafka.NewWorkFactory(kcp, zkafka.WithLogger(l)) @@ -1224,8 +1215,7 @@ func TestWork_Run_WritesMetrics(t *testing.T) { r := zkafka_mocks.NewMockReader(ctrl) r.EXPECT().Read(gomock.Any()).MinTimes(1).Return(msg, nil) - kcp := zkafka_mocks.NewMockClientProvider(ctrl) - kcp.EXPECT().Reader(gomock.Any(), gomock.Any()).Times(1).Return(r, nil) + kcp := zkafka.FakeClient{R: r} lhMtx := sync.Mutex{} lhState := FakeLifecycleState{ @@ -1286,8 +1276,7 @@ func TestWork_LifecycleHooksCalledForEachItem_Reader(t *testing.T) { r.EXPECT().Read(gomock.Any()).AnyTimes().Return(nil, nil), ) - kcp := zkafka_mocks.NewMockClientProvider(ctrl) - kcp.EXPECT().Reader(gomock.Any(), gomock.Any()).Times(1).Return(r, nil) + kcp := zkafka.FakeClient{R: r} lhMtx := sync.Mutex{} lhState := FakeLifecycleState{ @@ -1349,8 +1338,7 @@ func TestWork_LifecycleHooksPostReadCanUpdateContext(t *testing.T) { r.EXPECT().Read(gomock.Any()).AnyTimes().Return(nil, nil), ) - kcp := zkafka_mocks.NewMockClientProvider(ctrl) - kcp.EXPECT().Reader(gomock.Any(), gomock.Any()).Times(1).Return(r, nil) + kcp := zkafka.FakeClient{R: r} lhMtx := sync.Mutex{} lhState := FakeLifecycleState{ @@ -1410,8 +1398,7 @@ func TestWork_LifecycleHooksPostReadErrorDoesntHaltProcessing(t *testing.T) { r.EXPECT().Read(gomock.Any()).AnyTimes().Return(nil, nil), ) - kcp := zkafka_mocks.NewMockClientProvider(ctrl) - kcp.EXPECT().Reader(gomock.Any(), gomock.Any()).Times(1).Return(r, nil) + kcp := zkafka.FakeClient{R: r} lhMtx := sync.Mutex{} lhState := FakeLifecycleState{ diff --git a/testhelper.go b/testhelper.go index d17e228..1ce4b3e 100644 --- a/testhelper.go +++ b/testhelper.go @@ -45,47 +45,66 @@ type FakeMessage struct { } // GetMsgFromFake allows the construction of a Message object (allowing the specification of some private fields). -func GetMsgFromFake(msg *FakeMessage) *Message { - if msg == nil { +func GetMsgFromFake(input *FakeMessage) *Message { + if input == nil { return nil } key := "" - if msg.Key != nil { - key = *msg.Key + if input.Key != nil { + key = *input.Key } timeStamp := time.Now() - if !msg.TimeStamp.IsZero() { - timeStamp = msg.TimeStamp + if !input.TimeStamp.IsZero() { + timeStamp = input.TimeStamp } doneFunc := func(ctx context.Context) {} - if msg.DoneFunc != nil { - doneFunc = msg.DoneFunc + if input.DoneFunc != nil { + doneFunc = input.DoneFunc } var val []byte - if msg.Value != nil { - val = msg.Value + if input.Value != nil { + val = input.Value } - if msg.ValueData != nil { + if input.ValueData != nil { //nolint:errcheck // To simplify this helper function's api, we'll suppress marshalling errors. 
- val, _ = msg.Fmt.Marshall(msg.ValueData) + val, _ = input.Fmt.Marshall(input.ValueData) } return &Message{ Key: key, - isKeyNil: msg.Key == nil, - Headers: msg.Headers, - Offset: msg.Offset, - Partition: msg.Partition, - Topic: msg.Topic, - GroupID: msg.GroupID, + isKeyNil: input.Key == nil, + Headers: input.Headers, + Offset: input.Offset, + Partition: input.Partition, + Topic: input.Topic, + GroupID: input.GroupID, TimeStamp: timeStamp, value: val, topicPartition: kafka.TopicPartition{ - Topic: &msg.Topic, - Partition: msg.Partition, - Offset: kafka.Offset(msg.Offset), + Topic: &input.Topic, + Partition: input.Partition, + Offset: kafka.Offset(input.Offset), }, - fmt: zfmtShim{F: msg.Fmt}, + fmt: zfmtShim{F: input.Fmt}, doneFunc: doneFunc, doneOnce: sync.Once{}, } } + +var _ ClientProvider = (*FakeClient)(nil) + +type FakeClient struct { + R Reader + W Writer +} + +func (f FakeClient) Reader(ctx context.Context, topicConfig ConsumerTopicConfig, opts ...ReaderOption) (Reader, error) { + return f.R, nil +} + +func (f FakeClient) Writer(ctx context.Context, topicConfig ProducerTopicConfig, opts ...WriterOption) (Writer, error) { + return f.W, nil +} + +func (f FakeClient) Close() error { + return nil +} diff --git a/work.go b/work.go index 1a5de2a..3ac17e9 100644 --- a/work.go +++ b/work.go @@ -645,6 +645,10 @@ func NewWorkFactory( return factory } +func (f WorkFactory) CreateWithFunc(topicConfig ConsumerTopicConfig, p func(_ context.Context, msg *Message) error, options ...WorkOption) *Work { + return f.Create(topicConfig, processorAdapter{p: p}, options...) +} + // Create creates a new Work instance. func (f WorkFactory) Create(topicConfig ConsumerTopicConfig, processor processor, options ...WorkOption) *Work { work := &Work{ @@ -825,3 +829,13 @@ func (c *delayCalculator) remaining(targetDelay time.Duration, msgTimeStamp time // this piece makes sure the return isn't possibly greater than the target return min(targetDelay-observedDelay, targetDelay) } + +var _ processor = (*processorAdapter)(nil) + +type processorAdapter struct { + p func(_ context.Context, msg *Message) error +} + +func (a processorAdapter) Process(ctx context.Context, message *Message) error { + return a.p(ctx, message) +} diff --git a/work_test.go b/work_test.go index 9eb7024..f6ac8cd 100644 --- a/work_test.go +++ b/work_test.go @@ -783,14 +783,17 @@ func (m *timeDelayProcessor) Process(_ context.Context, message *Message) error return nil } -type mockClientProvider struct{} +type mockClientProvider struct { + r Reader + w Writer +} -func (mockClientProvider) Reader(ctx context.Context, topicConfig ConsumerTopicConfig, opts ...ReaderOption) (Reader, error) { - return nil, nil +func (m mockClientProvider) Reader(ctx context.Context, topicConfig ConsumerTopicConfig, opts ...ReaderOption) (Reader, error) { + return m.r, nil } -func (mockClientProvider) Writer(ctx context.Context, topicConfig ProducerTopicConfig, opts ...WriterOption) (Writer, error) { - return nil, nil +func (m mockClientProvider) Writer(ctx context.Context, topicConfig ProducerTopicConfig, opts ...WriterOption) (Writer, error) { + return m.w, nil } func (mockClientProvider) Close() error { @@ -815,3 +818,14 @@ type workSettings struct { func (w *workSettings) ShutdownSig() <-chan struct{} { return w.shutdownSig } + +type fakeProcessor struct { + process func(context.Context, *Message) error +} + +func (p *fakeProcessor) Process(ctx context.Context, msg *Message) error { + if p.process != nil { + return p.process(ctx, msg) + } + return nil +} diff --git 
a/writer.go b/writer.go index 01a19ec..8569edc 100644 --- a/writer.go +++ b/writer.go @@ -59,23 +59,47 @@ type keyValuePair struct { value any } -func newWriter(conf Config, topicConfig ProducerTopicConfig, producer confluentProducerProvider, srProvider srProvider) (*KWriter, error) { +type writerArgs struct { + cfg Config + pCfg ProducerTopicConfig + producerProvider confluentProducerProvider + f kFormatter + l Logger + t trace.Tracer + p propagation.TextMapPropagator + hooks LifecycleHooks + opts []WriterOption +} + +func newWriter(args writerArgs) (*KWriter, error) { + conf := args.cfg + topicConfig := args.pCfg + producer := args.producerProvider + formatter := args.f + confluentConfig := makeProducerConfig(conf, topicConfig) p, err := producer(confluentConfig) if err != nil { return nil, err } - formatter, err := getFormatter(topicConfig.Formatter, topicConfig.SchemaID, topicConfig.SchemaRegistry, srProvider) - if err != nil { - return nil, err - } - return &KWriter{ + w := &KWriter{ producer: p, - fmtter: formatter, topicConfig: topicConfig, - logger: NoopLogger{}, - }, nil + fmtter: formatter, + logger: args.l, + tracer: args.t, + p: args.p, + lifecycle: args.hooks, + } + s := WriterSettings{} + for _, opt := range args.opts { + opt(&s) + } + if s.fmtter != nil { + w.fmtter = s.fmtter + } + return w, nil } // Write sends messages to kafka with message key set as nil. @@ -222,14 +246,18 @@ func (w *KWriter) Close() { w.isClosed = true } +type WriterSettings struct { + fmtter kFormatter +} + // WriterOption is a function that modify the writer configurations -type WriterOption func(*KWriter) +type WriterOption func(*WriterSettings) // WFormatterOption sets the formatter for this writer func WFormatterOption(fmtter Formatter) WriterOption { - return func(w *KWriter) { + return func(s *WriterSettings) { if fmtter != nil { - w.fmtter = zfmtShim{F: fmtter} + s.fmtter = zfmtShim{F: fmtter} } } } diff --git a/writer_test.go b/writer_test.go index ccbe0d7..798d949 100644 --- a/writer_test.go +++ b/writer_test.go @@ -513,16 +513,16 @@ func Test_newWriter(t *testing.T) { }, wantErr: false, }, - { - name: "invalid formatter", - args: args{ - producerP: defaultConfluentProducerProvider{}.NewProducer, - topicConfig: ProducerTopicConfig{ - Formatter: zfmt.FormatterType("invalid_fmt"), - }, - }, - wantErr: true, - }, + //{ + // name: "invalid formatter", + // args: args{ + // producerP: defaultConfluentProducerProvider{}.NewProducer, + // topicConfig: ProducerTopicConfig{ + // Formatter: zfmt.FormatterType("invalid_fmt"), + // }, + // }, + // wantErr: true, + //}, { name: "valid formatter but has error from confluent producer constructor", args: args{ @@ -544,7 +544,12 @@ func Test_newWriter(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { recoverThenFail(t) - w, err := newWriter(tt.args.conf, tt.args.topicConfig, tt.args.producerP, nil) + args := writerArgs{ + cfg: tt.args.conf, + pCfg: tt.args.topicConfig, + producerProvider: tt.args.producerP, + } + w, err := newWriter(args) if tt.wantErr { require.Error(t, err, "expected error for newWriter()") } else { @@ -560,8 +565,9 @@ func TestWriter_WithOptions(t *testing.T) { w := &KWriter{} require.Nil(t, w.fmtter, "expected nil formatter") - WFormatterOption(&zfmt.StringFormatter{})(w) - require.NotNil(t, w.fmtter, "expected non-nil formatter") + settings := WriterSettings{} + WFormatterOption(&zfmt.StringFormatter{})(&settings) + require.NotNil(t, settings.fmtter, "expected non-nil formatter") } func 
Test_writeAttributeCarrier_Set(t *testing.T) {
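
Usage sketch (not part of the patch hunks above): the new public test helpers introduced here — zkafka.FakeClient and WorkFactory.CreateWithFunc — can be wired together roughly as follows in a downstream test. Only the identifiers visible in the diff are taken from the source; the package name, test name, and topic configuration values are illustrative assumptions.

package zkafka_test

import (
	"context"
	"testing"

	"github.com/zillow/zkafka"
)

func Test_FakeClient_Sketch(t *testing.T) {
	// FakeClient satisfies ClientProvider and simply hands back the injected
	// Reader/Writer; R: nil mirrors TestWork_Run_FailsWithLogsWhenGotNilReader above.
	kcp := zkafka.FakeClient{R: nil}

	wf := zkafka.NewWorkFactory(kcp, zkafka.WithLogger(zkafka.NoopLogger{}))

	// CreateWithFunc adapts a plain function to the package's processor interface
	// (via processorAdapter), so a test no longer needs a one-off processor type.
	w := wf.CreateWithFunc(
		zkafka.ConsumerTopicConfig{ClientID: "test-client", GroupID: "test-group", Topic: "test-topic"},
		func(_ context.Context, _ *zkafka.Message) error { return nil },
	)
	if w == nil {
		t.Fatal("expected non-nil work")
	}
}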