From e73a2c8a0b8f6d17eae0453b8bf20d0d833bc1a2 Mon Sep 17 00:00:00 2001 From: Martin Schneppenheim <23424570+weeco@users.noreply.github.com> Date: Fri, 28 Feb 2025 22:09:27 +0000 Subject: [PATCH] backend: fix remaining references after config update --- backend/pkg/api/api_integration_test.go | 12 +- .../console/list_messages_integration_test.go | 2 +- backend/pkg/serde/service_integration_test.go | 106 +++++++++--------- 3 files changed, 60 insertions(+), 60 deletions(-) diff --git a/backend/pkg/api/api_integration_test.go b/backend/pkg/api/api_integration_test.go index 913348d63..3c42e643d 100644 --- a/backend/pkg/api/api_integration_test.go +++ b/backend/pkg/api/api_integration_test.go @@ -92,25 +92,25 @@ func (s *APIIntegrationTestSuite) SetupSuite() { }, } s.cfg.Kafka.Brokers = []string{s.testSeedBroker} - s.cfg.Kafka.Protobuf.Enabled = true + s.cfg.Serde.Protobuf.Enabled = true s.cfg.SchemaRegistry.Enabled = true s.cfg.SchemaRegistry.URLs = []string{registryAddr} // proto message mapping absProtoPath, err := filepath.Abs("../testutil/testdata/proto") require.NoError(err) - s.cfg.Kafka.Protobuf.Enabled = true + s.cfg.Serde.Protobuf.Enabled = true topic0 := config.RegexpOrLiteral{} topic0.UnmarshalText([]byte(testutil.TopicNameForTest("publish_messages_proto_plain"))) - s.cfg.Kafka.Protobuf.Mappings = []config.ProtoTopicMapping{ + s.cfg.Serde.Protobuf.Mappings = []config.ProtoTopicMapping{ { TopicName: topic0, ValueProtoType: "testutil.things.v1.Item", }, } - s.cfg.Kafka.Protobuf.FileSystem.Enabled = true - s.cfg.Kafka.Protobuf.FileSystem.RefreshInterval = 1 * time.Minute - s.cfg.Kafka.Protobuf.FileSystem.Paths = []string{absProtoPath} + s.cfg.Serde.Protobuf.FileSystem.Enabled = true + s.cfg.Serde.Protobuf.FileSystem.RefreshInterval = 1 * time.Minute + s.cfg.Serde.Protobuf.FileSystem.Paths = []string{absProtoPath} s.api = New(s.cfg) diff --git a/backend/pkg/console/list_messages_integration_test.go b/backend/pkg/console/list_messages_integration_test.go index 35b1d7914..9f6de4328 100644 --- a/backend/pkg/console/list_messages_integration_test.go +++ b/backend/pkg/console/list_messages_integration_test.go @@ -783,7 +783,7 @@ func createNewTestService(t *testing.T, log *zap.Logger, cfg.Kafka.Brokers = []string{seedBrokers} if registryAddr != "" { - cfg.Kafka.Protobuf.Enabled = true + cfg.Serde.Protobuf.Enabled = true cfg.SchemaRegistry.Enabled = true cfg.SchemaRegistry.URLs = []string{registryAddr} } diff --git a/backend/pkg/serde/service_integration_test.go b/backend/pkg/serde/service_integration_test.go index 95e2f3c3c..52c14bb5b 100644 --- a/backend/pkg/serde/service_integration_test.go +++ b/backend/pkg/serde/service_integration_test.go @@ -80,11 +80,11 @@ func (s *SerdeIntegrationTestSuite) createBaseConfig() config.Config { cfg.SetDefaults() cfg.MetricsNamespace = testutil.MetricNameForTest("serde") cfg.Kafka.Brokers = []string{s.seedBroker} - cfg.Kafka.Protobuf.Enabled = true + cfg.Serde.Protobuf.Enabled = true cfg.SchemaRegistry.Enabled = true cfg.SchemaRegistry.Enabled = true cfg.SchemaRegistry.URLs = []string{s.registryAddress} - cfg.Kafka.MessagePack.Enabled = false + cfg.Serde.MessagePack.Enabled = false return cfg } @@ -153,13 +153,13 @@ func (s *SerdeIntegrationTestSuite) TestDeserializeRecord() { logger, err := zap.NewProduction() require.NoError(err) - protoSvc, err := protopkg.NewService(cfg.Kafka.Protobuf, logger) + protoSvc, err := protopkg.NewService(cfg.Serde.Protobuf, logger) require.NoError(err) err = protoSvc.Start() require.NoError(err) - mspPackSvc, err := ms.NewService(cfg.Kafka.MessagePack) + mspPackSvc, err := ms.NewService(cfg.Serde.MessagePack) require.NoError(err) schemaClientFactory, err := schemafactory.NewSingleClientProvider(&cfg) @@ -546,18 +546,18 @@ func (s *SerdeIntegrationTestSuite) TestDeserializeRecord() { require.NoError(topic0.UnmarshalText([]byte(testTopicName))) testCfg := cfg - testCfg.Kafka.Protobuf.Enabled = true - testCfg.Kafka.Protobuf.Mappings = []config.ProtoTopicMapping{ + testCfg.Serde.Protobuf.Enabled = true + testCfg.Serde.Protobuf.Mappings = []config.ProtoTopicMapping{ { TopicName: topic0, ValueProtoType: "shop.v1.Order", }, } - testCfg.Kafka.Protobuf.FileSystem.Enabled = true - testCfg.Kafka.Protobuf.FileSystem.RefreshInterval = 1 * time.Minute - testCfg.Kafka.Protobuf.FileSystem.Paths = []string{"testdata/proto"} + testCfg.Serde.Protobuf.FileSystem.Enabled = true + testCfg.Serde.Protobuf.FileSystem.RefreshInterval = 1 * time.Minute + testCfg.Serde.Protobuf.FileSystem.Paths = []string{"testdata/proto"} - protoSvc, err := protopkg.NewService(testCfg.Kafka.Protobuf, logger) + protoSvc, err := protopkg.NewService(testCfg.Serde.Protobuf, logger) require.NoError(err) require.NoError(protoSvc.Start()) @@ -704,18 +704,18 @@ func (s *SerdeIntegrationTestSuite) TestDeserializeRecord() { require.NoError(topic0.UnmarshalText([]byte(testTopicName))) testCfg := cfg - testCfg.Kafka.Protobuf.Enabled = true - testCfg.Kafka.Protobuf.Mappings = []config.ProtoTopicMapping{ + testCfg.Serde.Protobuf.Enabled = true + testCfg.Serde.Protobuf.Mappings = []config.ProtoTopicMapping{ { TopicName: topic0, ValueProtoType: "shop.v2.Order", }, } - testCfg.Kafka.Protobuf.FileSystem.Enabled = true - testCfg.Kafka.Protobuf.FileSystem.RefreshInterval = 1 * time.Minute - testCfg.Kafka.Protobuf.FileSystem.Paths = []string{"testdata/proto"} + testCfg.Serde.Protobuf.FileSystem.Enabled = true + testCfg.Serde.Protobuf.FileSystem.RefreshInterval = 1 * time.Minute + testCfg.Serde.Protobuf.FileSystem.Paths = []string{"testdata/proto"} - testProtoSvc, err := protopkg.NewService(testCfg.Kafka.Protobuf, logger) + testProtoSvc, err := protopkg.NewService(testCfg.Serde.Protobuf, logger) require.NoError(err) require.NoError(testProtoSvc.Start()) @@ -2161,7 +2161,7 @@ func (s *SerdeIntegrationTestSuite) TestDeserializeRecord() { // proto schema rediscovery is on a timer... this forces a new refresh - protoSvc2, err := protopkg.NewService(cfg.Kafka.Protobuf, logger) + protoSvc2, err := protopkg.NewService(cfg.Serde.Protobuf, logger) require.NoError(err) err = protoSvc.Start() @@ -2567,13 +2567,13 @@ func (s *SerdeIntegrationTestSuite) TestDeserializeRecord() { logger, err := zap.NewProduction() require.NoError(err) - protoSvc, err := protopkg.NewService(cfg.Kafka.Protobuf, logger) + protoSvc, err := protopkg.NewService(cfg.Serde.Protobuf, logger) require.NoError(err) err = protoSvc.Start() require.NoError(err) - mspPackSvc, err := ms.NewService(cfg.Kafka.MessagePack) + mspPackSvc, err := ms.NewService(cfg.Serde.MessagePack) require.NoError(err) schemaClientFactory, err := schemafactory.NewSingleClientProvider(&cfg) @@ -2765,13 +2765,13 @@ func (s *SerdeIntegrationTestSuite) TestSerializeRecord() { schemaSvc, err := schema.NewService(cfg.Kafka.Schema, logger) require.NoError(err) - protoSvc, err := protopkg.NewService(cfg.Kafka.Protobuf, logger, schemaSvc) + protoSvc, err := protopkg.NewService(cfg.Serde.Protobuf, logger, schemaSvc) require.NoError(err) err = protoSvc.Start() require.NoError(err) - mspPackSvc, err := ms.NewService(cfg.Kafka.MessagePack) + mspPackSvc, err := ms.NewService(cfg.Serde.MessagePack) require.NoError(err) serdeSvc := NewService(schemaSvc, protoSvc, mspPackSvc) @@ -2811,16 +2811,16 @@ func (s *SerdeIntegrationTestSuite) TestSerializeRecord() { topic0.UnmarshalText([]byte(testTopicName)) cfg := s.createBaseConfig() - cfg.Kafka.Protobuf.Enabled = true - cfg.Kafka.Protobuf.Mappings = []config.ProtoTopicMapping{ + cfg.Serde.Protobuf.Enabled = true + cfg.Serde.Protobuf.Mappings = []config.ProtoTopicMapping{ { TopicName: topic0, ValueProtoType: "shop.v1.Order", }, } - cfg.Kafka.Protobuf.FileSystem.Enabled = true - cfg.Kafka.Protobuf.FileSystem.RefreshInterval = 1 * time.Minute - cfg.Kafka.Protobuf.FileSystem.Paths = []string{"testdata/proto"} + cfg.Serde.Protobuf.FileSystem.Enabled = true + cfg.Serde.Protobuf.FileSystem.RefreshInterval = 1 * time.Minute + cfg.Serde.Protobuf.FileSystem.Paths = []string{"testdata/proto"} logger, err := zap.NewProduction() require.NoError(err) @@ -2828,13 +2828,13 @@ func (s *SerdeIntegrationTestSuite) TestSerializeRecord() { schemaSvc, err := schema.NewService(cfg.Kafka.Schema, logger) require.NoError(err) - protoSvc, err := protopkg.NewService(cfg.Kafka.Protobuf, logger, schemaSvc) + protoSvc, err := protopkg.NewService(cfg.Serde.Protobuf, logger, schemaSvc) require.NoError(err) err = protoSvc.Start() require.NoError(err) - mspPackSvc, err := ms.NewService(cfg.Kafka.MessagePack) + mspPackSvc, err := ms.NewService(cfg.Serde.MessagePack) require.NoError(err) serdeSvc := NewService(schemaSvc, protoSvc, mspPackSvc) @@ -2885,16 +2885,16 @@ func (s *SerdeIntegrationTestSuite) TestSerializeRecord() { topic0.UnmarshalText([]byte(testTopicName)) cfg := s.createBaseConfig() - cfg.Kafka.Protobuf.Enabled = true - cfg.Kafka.Protobuf.Mappings = []config.ProtoTopicMapping{ + cfg.Serde.Protobuf.Enabled = true + cfg.Serde.Protobuf.Mappings = []config.ProtoTopicMapping{ { TopicName: topic0, ValueProtoType: "shop.v2.Order", }, } - cfg.Kafka.Protobuf.FileSystem.Enabled = true - cfg.Kafka.Protobuf.FileSystem.RefreshInterval = 1 * time.Minute - cfg.Kafka.Protobuf.FileSystem.Paths = []string{"testdata/proto"} + cfg.Serde.Protobuf.FileSystem.Enabled = true + cfg.Serde.Protobuf.FileSystem.RefreshInterval = 1 * time.Minute + cfg.Serde.Protobuf.FileSystem.Paths = []string{"testdata/proto"} logger, err := zap.NewProduction() require.NoError(err) @@ -2902,13 +2902,13 @@ func (s *SerdeIntegrationTestSuite) TestSerializeRecord() { schemaSvc, err := schema.NewService(cfg.Kafka.Schema, logger) require.NoError(err) - protoSvc, err := protopkg.NewService(cfg.Kafka.Protobuf, logger, schemaSvc) + protoSvc, err := protopkg.NewService(cfg.Serde.Protobuf, logger, schemaSvc) require.NoError(err) err = protoSvc.Start() require.NoError(err) - mspPackSvc, err := ms.NewService(cfg.Kafka.MessagePack) + mspPackSvc, err := ms.NewService(cfg.Serde.MessagePack) require.NoError(err) serdeSvc := NewService(schemaSvc, protoSvc, mspPackSvc) @@ -3044,13 +3044,13 @@ func (s *SerdeIntegrationTestSuite) TestSerializeRecord() { schemaSvc, err := schema.NewService(cfg.Kafka.Schema, logger) require.NoError(err) - protoSvc, err := protopkg.NewService(cfg.Kafka.Protobuf, logger, schemaSvc) + protoSvc, err := protopkg.NewService(cfg.Serde.Protobuf, logger, schemaSvc) require.NoError(err) err = protoSvc.Start() require.NoError(err) - mspPackSvc, err := ms.NewService(cfg.Kafka.MessagePack) + mspPackSvc, err := ms.NewService(cfg.Serde.MessagePack) require.NoError(err) serdeSvc := NewService(schemaSvc, protoSvc, mspPackSvc) @@ -3138,13 +3138,13 @@ func (s *SerdeIntegrationTestSuite) TestSerializeRecord() { schemaSvc, err := schema.NewService(cfg.Kafka.Schema, logger) require.NoError(err) - protoSvc, err := protopkg.NewService(cfg.Kafka.Protobuf, logger, schemaSvc) + protoSvc, err := protopkg.NewService(cfg.Serde.Protobuf, logger, schemaSvc) require.NoError(err) err = protoSvc.Start() require.NoError(err) - mspPackSvc, err := ms.NewService(cfg.Kafka.MessagePack) + mspPackSvc, err := ms.NewService(cfg.Serde.MessagePack) require.NoError(err) serdeSvc := NewService(schemaSvc, protoSvc, mspPackSvc) @@ -3256,13 +3256,13 @@ func (s *SerdeIntegrationTestSuite) TestSerializeRecord() { schemaSvc, err := schema.NewService(cfg.Kafka.Schema, logger) require.NoError(err) - protoSvc, err := protopkg.NewService(cfg.Kafka.Protobuf, logger, schemaSvc) + protoSvc, err := protopkg.NewService(cfg.Serde.Protobuf, logger, schemaSvc) require.NoError(err) err = protoSvc.Start() require.NoError(err) - mspPackSvc, err := ms.NewService(cfg.Kafka.MessagePack) + mspPackSvc, err := ms.NewService(cfg.Serde.MessagePack) require.NoError(err) serdeSvc := NewService(schemaSvc, protoSvc, mspPackSvc) @@ -3364,13 +3364,13 @@ func (s *SerdeIntegrationTestSuite) TestSerializeRecord() { schemaSvc, err := schema.NewService(cfg.Kafka.Schema, logger) require.NoError(err) - protoSvc, err := protopkg.NewService(cfg.Kafka.Protobuf, logger, schemaSvc) + protoSvc, err := protopkg.NewService(cfg.Serde.Protobuf, logger, schemaSvc) require.NoError(err) err = protoSvc.Start() require.NoError(err) - mspPackSvc, err := ms.NewService(cfg.Kafka.MessagePack) + mspPackSvc, err := ms.NewService(cfg.Serde.MessagePack) require.NoError(err) serdeSvc := NewService(schemaSvc, protoSvc, mspPackSvc) @@ -3509,13 +3509,13 @@ func (s *SerdeIntegrationTestSuite) TestSerializeRecord() { schemaSvc, err := schema.NewService(cfg.Kafka.Schema, logger) require.NoError(err) - protoSvc, err := protopkg.NewService(cfg.Kafka.Protobuf, logger, schemaSvc) + protoSvc, err := protopkg.NewService(cfg.Serde.Protobuf, logger, schemaSvc) require.NoError(err) err = protoSvc.Start() require.NoError(err) - mspPackSvc, err := ms.NewService(cfg.Kafka.MessagePack) + mspPackSvc, err := ms.NewService(cfg.Serde.MessagePack) require.NoError(err) serdeSvc := NewService(schemaSvc, protoSvc, mspPackSvc) @@ -3669,13 +3669,13 @@ func (s *SerdeIntegrationTestSuite) TestSerializeRecord() { schemaSvc, err := schema.NewService(cfg.Kafka.Schema, logger) require.NoError(err) - protoSvc, err := protopkg.NewService(cfg.Kafka.Protobuf, logger, schemaSvc) + protoSvc, err := protopkg.NewService(cfg.Serde.Protobuf, logger, schemaSvc) require.NoError(err) err = protoSvc.Start() require.NoError(err) - mspPackSvc, err := ms.NewService(cfg.Kafka.MessagePack) + mspPackSvc, err := ms.NewService(cfg.Serde.MessagePack) require.NoError(err) serdeSvc := NewService(schemaSvc, protoSvc, mspPackSvc) @@ -3793,13 +3793,13 @@ func (s *SerdeIntegrationTestSuite) TestSerializeRecord() { schemaSvc, err := schema.NewService(cfg.Kafka.Schema, logger) require.NoError(err) - protoSvc, err := protopkg.NewService(cfg.Kafka.Protobuf, logger, schemaSvc) + protoSvc, err := protopkg.NewService(cfg.Serde.Protobuf, logger, schemaSvc) require.NoError(err) err = protoSvc.Start() require.NoError(err) - mspPackSvc, err := ms.NewService(cfg.Kafka.MessagePack) + mspPackSvc, err := ms.NewService(cfg.Serde.MessagePack) require.NoError(err) type ProductRecord struct { @@ -3952,13 +3952,13 @@ func (s *SerdeIntegrationTestSuite) TestSerializeRecord() { schemaSvc, err := schema.NewService(cfg.Kafka.Schema, logger) require.NoError(err) - protoSvc, err := protopkg.NewService(cfg.Kafka.Protobuf, logger, schemaSvc) + protoSvc, err := protopkg.NewService(cfg.Serde.Protobuf, logger, schemaSvc) require.NoError(err) err = protoSvc.Start() require.NoError(err) - mspPackSvc, err := ms.NewService(cfg.Kafka.MessagePack) + mspPackSvc, err := ms.NewService(cfg.Serde.MessagePack) require.NoError(err) type ProductRecord struct { @@ -4176,13 +4176,13 @@ func (s *SerdeIntegrationTestSuite) TestSerializeRecord() { schemaSvc, err := schema.NewService(cfg.Kafka.Schema, logger) require.NoError(err) - protoSvc, err := protopkg.NewService(cfg.Kafka.Protobuf, logger, schemaSvc) + protoSvc, err := protopkg.NewService(cfg.Serde.Protobuf, logger, schemaSvc) require.NoError(err) err = protoSvc.Start() require.NoError(err) - mspPackSvc, err := ms.NewService(cfg.Kafka.MessagePack) + mspPackSvc, err := ms.NewService(cfg.Serde.MessagePack) require.NoError(err) serdeSvc := NewService(schemaSvc, protoSvc, mspPackSvc)