Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
Signed-off-by: Mihai Todor <[email protected]>
  • Loading branch information
mihaitodor committed Dec 8, 2024
1 parent 7c59f4d commit 721e5c2
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 49 deletions.
99 changes: 51 additions & 48 deletions internal/impl/kafka/enterprise/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -713,6 +713,55 @@ output:
err = stream.Run(ctx)
require.NoError(t, err)

// Run the Redpanda Migrator bundle
streamBuilder = service.NewStreamBuilder()
require.NoError(t, streamBuilder.SetYAML(fmt.Sprintf(`
input:
redpanda_migrator_bundle:
redpanda_migrator:
seed_brokers: [ %s ]
topics: [ %s ]
consumer_group: blobfish
start_from_oldest: true
replication_factor_override: true
replication_factor: -1
schema_registry:
url: %s
output:
redpanda_migrator_bundle:
redpanda_migrator:
seed_brokers: [ %s ]
replication_factor_override: true
replication_factor: -1
schema_registry:
url: %s
`, source.brokerAddr, dummyTopic, source.schemaRegistryURL, destination.brokerAddr, destination.schemaRegistryURL)))
// require.NoError(t, streamBuilder.SetLoggerYAML(`level: OFF`))

stream, err = streamBuilder.Build()
require.NoError(t, err)

license.InjectTestService(stream.Resources())

t.Log("Running migrator")

// Run stream in the background and shut it down when the test is finished
migratorCloseChan := make(chan struct{})
go func() {
err = stream.Run(context.Background())
require.NoError(t, err)

close(migratorCloseChan)
}()
t.Cleanup(func() {
require.NoError(t, stream.StopWithin(3*time.Second))

<-migratorCloseChan
})

time.Sleep(10 * time.Second)

// Read the message using a consumer group
dummyConsumerGroup := "test"

Expand Down Expand Up @@ -759,53 +808,7 @@ input:
err = stream.Run(ctx)
require.NoError(t, err)

// Run the Redpanda Migrator bundle
streamBuilder = service.NewStreamBuilder()
require.NoError(t, streamBuilder.SetYAML(fmt.Sprintf(`
input:
redpanda_migrator_bundle:
redpanda_migrator:
seed_brokers: [ %s ]
topics: [ %s ]
consumer_group: %s
start_from_oldest: true
replication_factor_override: true
replication_factor: -1
schema_registry:
url: %s
output:
redpanda_migrator_bundle:
redpanda_migrator:
seed_brokers: [ %s ]
replication_factor_override: true
replication_factor: -1
schema_registry:
url: %s
`, source.brokerAddr, dummyTopic, dummyConsumerGroup, source.schemaRegistryURL, destination.brokerAddr, destination.schemaRegistryURL)))
// require.NoError(t, streamBuilder.SetLoggerYAML(`level: OFF`))

stream, err = streamBuilder.Build()
require.NoError(t, err)

license.InjectTestService(stream.Resources())

ctx, done = context.WithTimeout(context.Background(), 30*time.Second)

t.Log("Running migrator")

// Run stream in the background and shut it down when the test is finished
migratorCloseChan := make(chan struct{})
go func() {
err = stream.Run(ctx)
require.NoError(t, err)

close(migratorCloseChan)
}()
t.Cleanup(func() {
done()
<-migratorCloseChan
})
time.Sleep(10 * time.Second)

// Produce one message
streamBuilder = service.NewStreamBuilder()
Expand Down Expand Up @@ -844,7 +847,7 @@ output:
require.NoError(t, err)

t.Log("Produced message")
time.Sleep(5 * time.Hour) //////////////////////////////// TODOOOOOOOOOOOOOOOOOOOO
time.Sleep(10 * time.Second) //////////////////////////////// TODOOOOOOOOOOOOOOOOOOOO

// Read the message using a consumer group
streamBuilder = service.NewStreamBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func init() {
return nil, fmt.Errorf("failed to decode record key: %s", err)
}

isExpectedTopic := false
var isExpectedTopic bool
if len(topicPatterns) > 0 {
isExpectedTopic = slices.ContainsFunc(topicPatterns, func(tp *regexp.Regexp) bool {
return tp.MatchString(key.Topic)
Expand Down
1 change: 1 addition & 0 deletions internal/impl/kafka/franz_reader_ordered.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func FranzReaderOrderedConfigFields() []*service.ConfigField {

//------------------------------------------------------------------------------

// RecordToMessageFn is a function that converts a Kafka record into a Message.
type RecordToMessageFn func(record *kgo.Record) (*service.Message, error)

// FranzReaderOrdered implements a kafka reader using the franz-go library.
Expand Down

0 comments on commit 721e5c2

Please sign in to comment.