Skip to content

Commit

Permalink
backend: fix remaining references after config update
Browse files Browse the repository at this point in the history
  • Loading branch information
weeco committed Feb 28, 2025
1 parent a0f0585 commit e73a2c8
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 60 deletions.
12 changes: 6 additions & 6 deletions backend/pkg/api/api_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,25 +92,25 @@ func (s *APIIntegrationTestSuite) SetupSuite() {
},
}
s.cfg.Kafka.Brokers = []string{s.testSeedBroker}
s.cfg.Kafka.Protobuf.Enabled = true
s.cfg.Serde.Protobuf.Enabled = true
s.cfg.SchemaRegistry.Enabled = true
s.cfg.SchemaRegistry.URLs = []string{registryAddr}

// proto message mapping
absProtoPath, err := filepath.Abs("../testutil/testdata/proto")
require.NoError(err)
s.cfg.Kafka.Protobuf.Enabled = true
s.cfg.Serde.Protobuf.Enabled = true
topic0 := config.RegexpOrLiteral{}
topic0.UnmarshalText([]byte(testutil.TopicNameForTest("publish_messages_proto_plain")))
s.cfg.Kafka.Protobuf.Mappings = []config.ProtoTopicMapping{
s.cfg.Serde.Protobuf.Mappings = []config.ProtoTopicMapping{
{
TopicName: topic0,
ValueProtoType: "testutil.things.v1.Item",
},
}
s.cfg.Kafka.Protobuf.FileSystem.Enabled = true
s.cfg.Kafka.Protobuf.FileSystem.RefreshInterval = 1 * time.Minute
s.cfg.Kafka.Protobuf.FileSystem.Paths = []string{absProtoPath}
s.cfg.Serde.Protobuf.FileSystem.Enabled = true
s.cfg.Serde.Protobuf.FileSystem.RefreshInterval = 1 * time.Minute
s.cfg.Serde.Protobuf.FileSystem.Paths = []string{absProtoPath}

s.api = New(s.cfg)

Expand Down
2 changes: 1 addition & 1 deletion backend/pkg/console/list_messages_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -783,7 +783,7 @@ func createNewTestService(t *testing.T, log *zap.Logger,
cfg.Kafka.Brokers = []string{seedBrokers}

if registryAddr != "" {
cfg.Kafka.Protobuf.Enabled = true
cfg.Serde.Protobuf.Enabled = true
cfg.SchemaRegistry.Enabled = true
cfg.SchemaRegistry.URLs = []string{registryAddr}
}
Expand Down
106 changes: 53 additions & 53 deletions backend/pkg/serde/service_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,11 @@ func (s *SerdeIntegrationTestSuite) createBaseConfig() config.Config {
cfg.SetDefaults()
cfg.MetricsNamespace = testutil.MetricNameForTest("serde")
cfg.Kafka.Brokers = []string{s.seedBroker}
cfg.Kafka.Protobuf.Enabled = true
cfg.Serde.Protobuf.Enabled = true
cfg.SchemaRegistry.Enabled = true
cfg.SchemaRegistry.Enabled = true
cfg.SchemaRegistry.URLs = []string{s.registryAddress}
cfg.Kafka.MessagePack.Enabled = false
cfg.Serde.MessagePack.Enabled = false

return cfg
}
Expand Down Expand Up @@ -153,13 +153,13 @@ func (s *SerdeIntegrationTestSuite) TestDeserializeRecord() {
logger, err := zap.NewProduction()
require.NoError(err)

protoSvc, err := protopkg.NewService(cfg.Kafka.Protobuf, logger)
protoSvc, err := protopkg.NewService(cfg.Serde.Protobuf, logger)
require.NoError(err)

err = protoSvc.Start()
require.NoError(err)

mspPackSvc, err := ms.NewService(cfg.Kafka.MessagePack)
mspPackSvc, err := ms.NewService(cfg.Serde.MessagePack)
require.NoError(err)

schemaClientFactory, err := schemafactory.NewSingleClientProvider(&cfg)
Expand Down Expand Up @@ -546,18 +546,18 @@ func (s *SerdeIntegrationTestSuite) TestDeserializeRecord() {
require.NoError(topic0.UnmarshalText([]byte(testTopicName)))

testCfg := cfg
testCfg.Kafka.Protobuf.Enabled = true
testCfg.Kafka.Protobuf.Mappings = []config.ProtoTopicMapping{
testCfg.Serde.Protobuf.Enabled = true
testCfg.Serde.Protobuf.Mappings = []config.ProtoTopicMapping{
{
TopicName: topic0,
ValueProtoType: "shop.v1.Order",
},
}
testCfg.Kafka.Protobuf.FileSystem.Enabled = true
testCfg.Kafka.Protobuf.FileSystem.RefreshInterval = 1 * time.Minute
testCfg.Kafka.Protobuf.FileSystem.Paths = []string{"testdata/proto"}
testCfg.Serde.Protobuf.FileSystem.Enabled = true
testCfg.Serde.Protobuf.FileSystem.RefreshInterval = 1 * time.Minute
testCfg.Serde.Protobuf.FileSystem.Paths = []string{"testdata/proto"}

protoSvc, err := protopkg.NewService(testCfg.Kafka.Protobuf, logger)
protoSvc, err := protopkg.NewService(testCfg.Serde.Protobuf, logger)
require.NoError(err)
require.NoError(protoSvc.Start())

Expand Down Expand Up @@ -704,18 +704,18 @@ func (s *SerdeIntegrationTestSuite) TestDeserializeRecord() {
require.NoError(topic0.UnmarshalText([]byte(testTopicName)))

testCfg := cfg
testCfg.Kafka.Protobuf.Enabled = true
testCfg.Kafka.Protobuf.Mappings = []config.ProtoTopicMapping{
testCfg.Serde.Protobuf.Enabled = true
testCfg.Serde.Protobuf.Mappings = []config.ProtoTopicMapping{
{
TopicName: topic0,
ValueProtoType: "shop.v2.Order",
},
}
testCfg.Kafka.Protobuf.FileSystem.Enabled = true
testCfg.Kafka.Protobuf.FileSystem.RefreshInterval = 1 * time.Minute
testCfg.Kafka.Protobuf.FileSystem.Paths = []string{"testdata/proto"}
testCfg.Serde.Protobuf.FileSystem.Enabled = true
testCfg.Serde.Protobuf.FileSystem.RefreshInterval = 1 * time.Minute
testCfg.Serde.Protobuf.FileSystem.Paths = []string{"testdata/proto"}

testProtoSvc, err := protopkg.NewService(testCfg.Kafka.Protobuf, logger)
testProtoSvc, err := protopkg.NewService(testCfg.Serde.Protobuf, logger)
require.NoError(err)
require.NoError(testProtoSvc.Start())

Expand Down Expand Up @@ -2161,7 +2161,7 @@ func (s *SerdeIntegrationTestSuite) TestDeserializeRecord() {

// proto schema rediscovery is on a timer... this forces a new refresh

protoSvc2, err := protopkg.NewService(cfg.Kafka.Protobuf, logger)
protoSvc2, err := protopkg.NewService(cfg.Serde.Protobuf, logger)
require.NoError(err)

err = protoSvc.Start()
Expand Down Expand Up @@ -2567,13 +2567,13 @@ func (s *SerdeIntegrationTestSuite) TestDeserializeRecord() {
logger, err := zap.NewProduction()
require.NoError(err)

protoSvc, err := protopkg.NewService(cfg.Kafka.Protobuf, logger)
protoSvc, err := protopkg.NewService(cfg.Serde.Protobuf, logger)
require.NoError(err)

err = protoSvc.Start()
require.NoError(err)

mspPackSvc, err := ms.NewService(cfg.Kafka.MessagePack)
mspPackSvc, err := ms.NewService(cfg.Serde.MessagePack)
require.NoError(err)

schemaClientFactory, err := schemafactory.NewSingleClientProvider(&cfg)
Expand Down Expand Up @@ -2765,13 +2765,13 @@ func (s *SerdeIntegrationTestSuite) TestSerializeRecord() {
schemaSvc, err := schema.NewService(cfg.Kafka.Schema, logger)
require.NoError(err)
protoSvc, err := protopkg.NewService(cfg.Kafka.Protobuf, logger, schemaSvc)
protoSvc, err := protopkg.NewService(cfg.Serde.Protobuf, logger, schemaSvc)
require.NoError(err)
err = protoSvc.Start()
require.NoError(err)
mspPackSvc, err := ms.NewService(cfg.Kafka.MessagePack)
mspPackSvc, err := ms.NewService(cfg.Serde.MessagePack)
require.NoError(err)
serdeSvc := NewService(schemaSvc, protoSvc, mspPackSvc)
Expand Down Expand Up @@ -2811,30 +2811,30 @@ func (s *SerdeIntegrationTestSuite) TestSerializeRecord() {
topic0.UnmarshalText([]byte(testTopicName))
cfg := s.createBaseConfig()
cfg.Kafka.Protobuf.Enabled = true
cfg.Kafka.Protobuf.Mappings = []config.ProtoTopicMapping{
cfg.Serde.Protobuf.Enabled = true
cfg.Serde.Protobuf.Mappings = []config.ProtoTopicMapping{
{
TopicName: topic0,
ValueProtoType: "shop.v1.Order",
},
}
cfg.Kafka.Protobuf.FileSystem.Enabled = true
cfg.Kafka.Protobuf.FileSystem.RefreshInterval = 1 * time.Minute
cfg.Kafka.Protobuf.FileSystem.Paths = []string{"testdata/proto"}
cfg.Serde.Protobuf.FileSystem.Enabled = true
cfg.Serde.Protobuf.FileSystem.RefreshInterval = 1 * time.Minute
cfg.Serde.Protobuf.FileSystem.Paths = []string{"testdata/proto"}
logger, err := zap.NewProduction()
require.NoError(err)
schemaSvc, err := schema.NewService(cfg.Kafka.Schema, logger)
require.NoError(err)
protoSvc, err := protopkg.NewService(cfg.Kafka.Protobuf, logger, schemaSvc)
protoSvc, err := protopkg.NewService(cfg.Serde.Protobuf, logger, schemaSvc)
require.NoError(err)
err = protoSvc.Start()
require.NoError(err)
mspPackSvc, err := ms.NewService(cfg.Kafka.MessagePack)
mspPackSvc, err := ms.NewService(cfg.Serde.MessagePack)
require.NoError(err)
serdeSvc := NewService(schemaSvc, protoSvc, mspPackSvc)
Expand Down Expand Up @@ -2885,30 +2885,30 @@ func (s *SerdeIntegrationTestSuite) TestSerializeRecord() {
topic0.UnmarshalText([]byte(testTopicName))
cfg := s.createBaseConfig()
cfg.Kafka.Protobuf.Enabled = true
cfg.Kafka.Protobuf.Mappings = []config.ProtoTopicMapping{
cfg.Serde.Protobuf.Enabled = true
cfg.Serde.Protobuf.Mappings = []config.ProtoTopicMapping{
{
TopicName: topic0,
ValueProtoType: "shop.v2.Order",
},
}
cfg.Kafka.Protobuf.FileSystem.Enabled = true
cfg.Kafka.Protobuf.FileSystem.RefreshInterval = 1 * time.Minute
cfg.Kafka.Protobuf.FileSystem.Paths = []string{"testdata/proto"}
cfg.Serde.Protobuf.FileSystem.Enabled = true
cfg.Serde.Protobuf.FileSystem.RefreshInterval = 1 * time.Minute
cfg.Serde.Protobuf.FileSystem.Paths = []string{"testdata/proto"}
logger, err := zap.NewProduction()
require.NoError(err)
schemaSvc, err := schema.NewService(cfg.Kafka.Schema, logger)
require.NoError(err)
protoSvc, err := protopkg.NewService(cfg.Kafka.Protobuf, logger, schemaSvc)
protoSvc, err := protopkg.NewService(cfg.Serde.Protobuf, logger, schemaSvc)
require.NoError(err)
err = protoSvc.Start()
require.NoError(err)
mspPackSvc, err := ms.NewService(cfg.Kafka.MessagePack)
mspPackSvc, err := ms.NewService(cfg.Serde.MessagePack)
require.NoError(err)
serdeSvc := NewService(schemaSvc, protoSvc, mspPackSvc)
Expand Down Expand Up @@ -3044,13 +3044,13 @@ func (s *SerdeIntegrationTestSuite) TestSerializeRecord() {
schemaSvc, err := schema.NewService(cfg.Kafka.Schema, logger)
require.NoError(err)
protoSvc, err := protopkg.NewService(cfg.Kafka.Protobuf, logger, schemaSvc)
protoSvc, err := protopkg.NewService(cfg.Serde.Protobuf, logger, schemaSvc)
require.NoError(err)
err = protoSvc.Start()
require.NoError(err)
mspPackSvc, err := ms.NewService(cfg.Kafka.MessagePack)
mspPackSvc, err := ms.NewService(cfg.Serde.MessagePack)
require.NoError(err)
serdeSvc := NewService(schemaSvc, protoSvc, mspPackSvc)
Expand Down Expand Up @@ -3138,13 +3138,13 @@ func (s *SerdeIntegrationTestSuite) TestSerializeRecord() {
schemaSvc, err := schema.NewService(cfg.Kafka.Schema, logger)
require.NoError(err)
protoSvc, err := protopkg.NewService(cfg.Kafka.Protobuf, logger, schemaSvc)
protoSvc, err := protopkg.NewService(cfg.Serde.Protobuf, logger, schemaSvc)
require.NoError(err)
err = protoSvc.Start()
require.NoError(err)
mspPackSvc, err := ms.NewService(cfg.Kafka.MessagePack)
mspPackSvc, err := ms.NewService(cfg.Serde.MessagePack)
require.NoError(err)
serdeSvc := NewService(schemaSvc, protoSvc, mspPackSvc)
Expand Down Expand Up @@ -3256,13 +3256,13 @@ func (s *SerdeIntegrationTestSuite) TestSerializeRecord() {
schemaSvc, err := schema.NewService(cfg.Kafka.Schema, logger)
require.NoError(err)
protoSvc, err := protopkg.NewService(cfg.Kafka.Protobuf, logger, schemaSvc)
protoSvc, err := protopkg.NewService(cfg.Serde.Protobuf, logger, schemaSvc)
require.NoError(err)
err = protoSvc.Start()
require.NoError(err)
mspPackSvc, err := ms.NewService(cfg.Kafka.MessagePack)
mspPackSvc, err := ms.NewService(cfg.Serde.MessagePack)
require.NoError(err)
serdeSvc := NewService(schemaSvc, protoSvc, mspPackSvc)
Expand Down Expand Up @@ -3364,13 +3364,13 @@ func (s *SerdeIntegrationTestSuite) TestSerializeRecord() {
schemaSvc, err := schema.NewService(cfg.Kafka.Schema, logger)
require.NoError(err)
protoSvc, err := protopkg.NewService(cfg.Kafka.Protobuf, logger, schemaSvc)
protoSvc, err := protopkg.NewService(cfg.Serde.Protobuf, logger, schemaSvc)
require.NoError(err)
err = protoSvc.Start()
require.NoError(err)
mspPackSvc, err := ms.NewService(cfg.Kafka.MessagePack)
mspPackSvc, err := ms.NewService(cfg.Serde.MessagePack)
require.NoError(err)
serdeSvc := NewService(schemaSvc, protoSvc, mspPackSvc)
Expand Down Expand Up @@ -3509,13 +3509,13 @@ func (s *SerdeIntegrationTestSuite) TestSerializeRecord() {
schemaSvc, err := schema.NewService(cfg.Kafka.Schema, logger)
require.NoError(err)
protoSvc, err := protopkg.NewService(cfg.Kafka.Protobuf, logger, schemaSvc)
protoSvc, err := protopkg.NewService(cfg.Serde.Protobuf, logger, schemaSvc)
require.NoError(err)
err = protoSvc.Start()
require.NoError(err)
mspPackSvc, err := ms.NewService(cfg.Kafka.MessagePack)
mspPackSvc, err := ms.NewService(cfg.Serde.MessagePack)
require.NoError(err)
serdeSvc := NewService(schemaSvc, protoSvc, mspPackSvc)
Expand Down Expand Up @@ -3669,13 +3669,13 @@ func (s *SerdeIntegrationTestSuite) TestSerializeRecord() {
schemaSvc, err := schema.NewService(cfg.Kafka.Schema, logger)
require.NoError(err)
protoSvc, err := protopkg.NewService(cfg.Kafka.Protobuf, logger, schemaSvc)
protoSvc, err := protopkg.NewService(cfg.Serde.Protobuf, logger, schemaSvc)
require.NoError(err)
err = protoSvc.Start()
require.NoError(err)
mspPackSvc, err := ms.NewService(cfg.Kafka.MessagePack)
mspPackSvc, err := ms.NewService(cfg.Serde.MessagePack)
require.NoError(err)
serdeSvc := NewService(schemaSvc, protoSvc, mspPackSvc)
Expand Down Expand Up @@ -3793,13 +3793,13 @@ func (s *SerdeIntegrationTestSuite) TestSerializeRecord() {
schemaSvc, err := schema.NewService(cfg.Kafka.Schema, logger)
require.NoError(err)
protoSvc, err := protopkg.NewService(cfg.Kafka.Protobuf, logger, schemaSvc)
protoSvc, err := protopkg.NewService(cfg.Serde.Protobuf, logger, schemaSvc)
require.NoError(err)
err = protoSvc.Start()
require.NoError(err)
mspPackSvc, err := ms.NewService(cfg.Kafka.MessagePack)
mspPackSvc, err := ms.NewService(cfg.Serde.MessagePack)
require.NoError(err)
type ProductRecord struct {
Expand Down Expand Up @@ -3952,13 +3952,13 @@ func (s *SerdeIntegrationTestSuite) TestSerializeRecord() {
schemaSvc, err := schema.NewService(cfg.Kafka.Schema, logger)
require.NoError(err)
protoSvc, err := protopkg.NewService(cfg.Kafka.Protobuf, logger, schemaSvc)
protoSvc, err := protopkg.NewService(cfg.Serde.Protobuf, logger, schemaSvc)
require.NoError(err)
err = protoSvc.Start()
require.NoError(err)
mspPackSvc, err := ms.NewService(cfg.Kafka.MessagePack)
mspPackSvc, err := ms.NewService(cfg.Serde.MessagePack)
require.NoError(err)
type ProductRecord struct {
Expand Down Expand Up @@ -4176,13 +4176,13 @@ func (s *SerdeIntegrationTestSuite) TestSerializeRecord() {
schemaSvc, err := schema.NewService(cfg.Kafka.Schema, logger)
require.NoError(err)
protoSvc, err := protopkg.NewService(cfg.Kafka.Protobuf, logger, schemaSvc)
protoSvc, err := protopkg.NewService(cfg.Serde.Protobuf, logger, schemaSvc)
require.NoError(err)
err = protoSvc.Start()
require.NoError(err)
mspPackSvc, err := ms.NewService(cfg.Kafka.MessagePack)
mspPackSvc, err := ms.NewService(cfg.Serde.MessagePack)
require.NoError(err)
serdeSvc := NewService(schemaSvc, protoSvc, mspPackSvc)
Expand Down

0 comments on commit e73a2c8

Please sign in to comment.