Skip to content

Commit

Permalink
Merge pull request #3065 from redpanda-data/mihaitodor-fix-schema-id-…
Browse files Browse the repository at this point in the history
…translation-config-rp-migrator

Fix schema ID translation in `redpanda_migrator_bundle` output
  • Loading branch information
Jeffail authored Dec 9, 2024
2 parents ad959a9 + 25de0fb commit bf73d88
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ All notable changes to this project will be documented in this file.
### Fixed

- Trial Redpanda Enterprise licenses are now considered valid. (@Jeffail)
- The `redpanda_migrator_bundle` output now skips schema ID translation when `translate_schema_ids: false` and `schema_registry` is configured. (@mihaitodor)

## 4.43.0 - 2024-12-05

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ mapping: |
"^(?:[^k].*|k[^a].*|ka[^f].*|kaf[^k].*|kafk[^a].*|kafka[^_].*)"
]
},
"translate_schema_ids": this.schema_registry.length() != 0
"translate_schema_ids": this.redpanda_migrator.translate_schema_ids.or(true) && this.schema_registry.length() != 0
}
)
Expand Down Expand Up @@ -212,6 +212,140 @@ tests:
- output:
reject: ${! @fallback_error }

- name: Migrate messages, offsets and schemas but skip schema ID translation
config:
redpanda_migrator:
seed_brokers: [ "127.0.0.1:9092" ]
translate_schema_ids: false
max_in_flight: 1
schema_registry:
url: http://localhost:8081
max_in_flight: 1

expected:
switch:
cases:
- check: metadata("input_label") == "redpanda_migrator"
output:
fallback:
- redpanda_migrator:
key: ${! metadata("kafka_key") }
max_in_flight: 1
partition: ${! metadata("kafka_partition").or(throw("missing kafka_partition metadata")) }
partitioner: manual
seed_brokers:
- 127.0.0.1:9092
timestamp_ms: ${! metadata("kafka_timestamp_ms").or(timestamp_unix_milli()) }
topic: ${! metadata("kafka_topic").or(throw("missing kafka_topic metadata")) }
metadata:
include_patterns:
- ^(?:[^k].*|k[^a].*|ka[^f].*|kaf[^k].*|kafk[^a].*|kafka[^_].*)
translate_schema_ids: false
processors:
- mapping: |
meta input_label = deleted()
- drop: {}
processors:
- log:
message: |
Dropping message: ${! content() } / ${! metadata() }
- check: metadata("input_label") == "redpanda_migrator_offsets"
output:
fallback:
- redpanda_migrator_offsets:
seed_brokers:
- 127.0.0.1:9092
- drop: {}
processors:
- log:
message: |
Dropping message: ${! content() } / ${! metadata() }
- check: metadata("input_label") == "schema_registry"
output:
fallback:
- schema_registry:
subject: ${! @schema_registry_subject }
url: http://localhost:8081
max_in_flight: 1
- switch:
cases:
- check: '@fallback_error == "request returned status: 422"'
output:
drop: {}
processors:
- log:
message: |
Subject '${! @schema_registry_subject }' version ${! @schema_registry_version } already has schema: ${! content() }
- output:
reject: ${! @fallback_error }

- name: Migrate messages, offsets and schemas and do schema ID translation when requested explicitly
config:
redpanda_migrator:
seed_brokers: [ "127.0.0.1:9092" ]
translate_schema_ids: true
max_in_flight: 1
schema_registry:
url: http://localhost:8081
max_in_flight: 1

expected:
switch:
cases:
- check: metadata("input_label") == "redpanda_migrator"
output:
fallback:
- redpanda_migrator:
key: ${! metadata("kafka_key") }
max_in_flight: 1
partition: ${! metadata("kafka_partition").or(throw("missing kafka_partition metadata")) }
partitioner: manual
seed_brokers:
- 127.0.0.1:9092
timestamp_ms: ${! metadata("kafka_timestamp_ms").or(timestamp_unix_milli()) }
topic: ${! metadata("kafka_topic").or(throw("missing kafka_topic metadata")) }
metadata:
include_patterns:
- ^(?:[^k].*|k[^a].*|ka[^f].*|kaf[^k].*|kafk[^a].*|kafka[^_].*)
translate_schema_ids: true
processors:
- mapping: |
meta input_label = deleted()
- drop: {}
processors:
- log:
message: |
Dropping message: ${! content() } / ${! metadata() }
- check: metadata("input_label") == "redpanda_migrator_offsets"
output:
fallback:
- redpanda_migrator_offsets:
seed_brokers:
- 127.0.0.1:9092
- drop: {}
processors:
- log:
message: |
Dropping message: ${! content() } / ${! metadata() }
- check: metadata("input_label") == "schema_registry"
output:
fallback:
- schema_registry:
subject: ${! @schema_registry_subject }
url: http://localhost:8081
max_in_flight: 1
- switch:
cases:
- check: '@fallback_error == "request returned status: 422"'
output:
drop: {}
processors:
- log:
message: |
Subject '${! @schema_registry_subject }' version ${! @schema_registry_version } already has schema: ${! content() }
- output:
reject: ${! @fallback_error }

- name: Migrate only messages and offsets
config:
redpanda_migrator:
Expand Down

0 comments on commit bf73d88

Please sign in to comment.