Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename role to label #525

Merged
merged 4 commits into from
Aug 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 13 additions & 13 deletions docs/docs/schema/defaults.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
"additionalProperties": false,
"description": "Input topic.",
"properties": {
"role": {
"label": {
"anyOf": [
{
"type": "string"
Expand All @@ -41,7 +41,7 @@
],
"default": null,
"description": "Custom identifier belonging to a topic; define only if `type` is `pattern` or `None`",
"title": "Role"
"title": "Label"
},
"type": {
"anyOf": [
Expand Down Expand Up @@ -1688,20 +1688,20 @@
"description": "Key schema class name",
"title": "Key schema"
},
"partitions_count": {
"label": {
"anyOf": [
{
"type": "integer"
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Number of partitions into which the topic is divided",
"title": "Partitions count"
"description": "Custom identifier belonging to one or multiple topics, provide only if `type` is `extra`",
"title": "Label"
},
"replication_factor": {
"partitions_count": {
"anyOf": [
{
"type": "integer"
Expand All @@ -1711,21 +1711,21 @@
}
],
"default": null,
"description": "Replication factor of the topic",
"title": "Replication factor"
"description": "Number of partitions into which the topic is divided",
"title": "Partitions count"
},
"role": {
"replication_factor": {
"anyOf": [
{
"type": "string"
"type": "integer"
},
{
"type": "null"
}
],
"default": null,
"description": "Custom identifier belonging to one or multiple topics, provide only if `type` is `extra`",
"title": "Role"
"description": "Replication factor of the topic",
"title": "Replication factor"
},
"type": {
"anyOf": [
Expand Down
26 changes: 13 additions & 13 deletions docs/docs/schema/pipeline.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
"additionalProperties": false,
"description": "Input topic.",
"properties": {
"role": {
"label": {
"anyOf": [
{
"type": "string"
Expand All @@ -41,7 +41,7 @@
],
"default": null,
"description": "Custom identifier belonging to a topic; define only if `type` is `pattern` or `None`",
"title": "Role"
"title": "Label"
},
"type": {
"anyOf": [
Expand Down Expand Up @@ -1088,20 +1088,20 @@
"description": "Key schema class name",
"title": "Key schema"
},
"partitions_count": {
"label": {
"anyOf": [
{
"type": "integer"
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Number of partitions into which the topic is divided",
"title": "Partitions count"
"description": "Custom identifier belonging to one or multiple topics, provide only if `type` is `extra`",
"title": "Label"
},
"replication_factor": {
"partitions_count": {
"anyOf": [
{
"type": "integer"
Expand All @@ -1111,21 +1111,21 @@
}
],
"default": null,
"description": "Replication factor of the topic",
"title": "Replication factor"
"description": "Number of partitions into which the topic is divided",
"title": "Partitions count"
},
"role": {
"replication_factor": {
"anyOf": [
{
"type": "string"
"type": "integer"
},
{
"type": "null"
}
],
"default": null,
"description": "Custom identifier belonging to one or multiple topics, provide only if `type` is `extra`",
"title": "Role"
"description": "Replication factor of the topic",
"title": "Replication factor"
},
"type": {
"anyOf": [
Expand Down
2 changes: 1 addition & 1 deletion examples
12 changes: 6 additions & 6 deletions kpops/components/base_components/models/from_section.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,25 +23,25 @@ class FromTopic(DescConfigModel):
"""Input topic.

:param type: Topic type, defaults to None
:param role: Custom identifier belonging to a topic;
:param label: Custom identifier belonging to a topic;
define only if `type` is `pattern` or `None`, defaults to None
"""

type: InputTopicTypes | None = Field(
default=None, description=describe_attr("type", __doc__)
)
role: str | None = Field(default=None, description=describe_attr("role", __doc__))
label: str | None = Field(default=None, description=describe_attr("label", __doc__))

model_config = ConfigDict(
extra="forbid",
use_enum_values=True,
)

@model_validator(mode="after")
def extra_topic_role(self) -> Any:
"""Ensure that `cls.role` is used correctly, assign type if needed."""
if self.type == InputTopicTypes.INPUT and self.role:
msg = "Define role only if `type` is `pattern` or `None`"
def extra_topic_label(self) -> Any:
"""Ensure that `cls.label` is used correctly, assign type if needed."""
if self.type == InputTopicTypes.INPUT and self.label:
msg = "Define label only if `type` is `pattern` or `None`"
raise ValueError(msg)
return self

Expand Down
36 changes: 18 additions & 18 deletions kpops/components/base_components/pipeline_component.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ def full_name(self) -> str:
@property
def inputs(self) -> Iterator[KafkaTopic]:
yield from self.input_topics
for role_topics in self.extra_input_topics.values():
yield from role_topics
for labeled_topics in self.extra_input_topics.values():
yield from labeled_topics

@property
def outputs(self) -> Iterator[KafkaTopic]:
Expand Down Expand Up @@ -109,11 +109,11 @@ def add_input_topics(self, topics: list[KafkaTopic]) -> None:
:param topics: Input topics
"""

def add_extra_input_topics(self, role: str, topics: list[KafkaTopic]) -> None:
"""Add given extra topics that share a role to the list of extra input topics.
def add_extra_input_topics(self, label: str, topics: list[KafkaTopic]) -> None:
"""Add given extra topics that share a label to the list of extra input topics.

:param topics: Extra input topics
:param role: Topic role
:param label: Topic label
"""

def set_input_pattern(self, name: str) -> None:
Expand All @@ -122,10 +122,10 @@ def set_input_pattern(self, name: str) -> None:
:param name: Input pattern name
"""

def add_extra_input_pattern(self, role: str, topic: str) -> None:
def add_extra_input_pattern(self, label: str, topic: str) -> None:
"""Add an input pattern of type extra.

:param role: Custom identifier belonging to one or multiple topics
:param label: Custom identifier belonging to one or multiple topics
:param topic: Topic name
"""

Expand All @@ -141,17 +141,17 @@ def set_error_topic(self, topic: KafkaTopic) -> None:
:param topic: Error topic
"""

def add_extra_output_topic(self, topic: KafkaTopic, role: str) -> None:
def add_extra_output_topic(self, topic: KafkaTopic, label: str) -> None:
"""Add an output topic of type extra.

:param topic: Output topic
:param role: Role that is unique to the extra output topic
:param label: Label that is unique to the extra output topic
"""

def set_input_topics(self) -> None:
"""Put values of config.from into the streams config section of streams bootstrap.

Supports extra_input_topics (topics by role) or input_topics.
Supports extra_input_topics (topics by label) or input_topics.
"""
if self.from_:
for name, topic in self.from_.topics.items():
Expand All @@ -165,10 +165,10 @@ def apply_from_inputs(self, name: str, topic: FromTopic) -> None:
"""
kafka_topic = KafkaTopic(name=name)
match topic.type:
case None if topic.role:
self.add_extra_input_topics(topic.role, [kafka_topic])
case InputTopicTypes.PATTERN if topic.role:
self.add_extra_input_pattern(topic.role, name)
case None if topic.label:
self.add_extra_input_topics(topic.label, [kafka_topic])
case InputTopicTypes.PATTERN if topic.label:
self.add_extra_input_pattern(topic.label, name)
case InputTopicTypes.PATTERN:
self.set_input_pattern(name)
case _:
Expand All @@ -177,7 +177,7 @@ def apply_from_inputs(self, name: str, topic: FromTopic) -> None:
def set_output_topics(self) -> None:
"""Put values of `to` section into the producer config section of streams bootstrap.

Supports extra_output_topics (topics by role) or output_topics.
Supports extra_output_topics (topics by label) or output_topics.
"""
if self.to:
for name, topic in self.to.topics.items():
Expand All @@ -191,8 +191,8 @@ def apply_to_outputs(self, name: str, topic: TopicConfig) -> None:
"""
kafka_topic = KafkaTopic(name=name)
match topic.type:
case None if topic.role:
self.add_extra_output_topic(kafka_topic, topic.role)
case None if topic.label:
self.add_extra_output_topic(kafka_topic, topic.label)
case OutputTopicTypes.ERROR:
self.set_error_topic(kafka_topic)
case _:
Expand All @@ -214,7 +214,7 @@ def weave_from_topics(
input_topics = [
topic_name
for topic_name, topic_config in to.topics.items()
if topic_config.type != OutputTopicTypes.ERROR and not topic_config.role
if topic_config.type != OutputTopicTypes.ERROR and not topic_config.label
]
for input_topic in input_topics:
self.apply_from_inputs(input_topic, from_topic)
Expand Down
6 changes: 3 additions & 3 deletions kpops/components/common/streams_bootstrap.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,16 +71,16 @@ def deserialize_extra_output_topics(
) -> dict[str, KafkaTopic] | Any:
if isinstance(extra_output_topics, dict):
return {
role: KafkaTopic(name=topic_name)
for role, topic_name in extra_output_topics.items()
label: KafkaTopic(name=topic_name)
for label, topic_name in extra_output_topics.items()
}
return extra_output_topics

@pydantic.field_serializer("extra_output_topics")
def serialize_extra_output_topics(
self, extra_topics: dict[str, KafkaTopic]
) -> dict[str, str]:
return {role: topic.name for role, topic in extra_topics.items()}
return {label: topic.name for label, topic in extra_topics.items()}

# TODO(Ivan Yordanov): Currently hacky and potentially unsafe. Find cleaner solution
@pydantic.model_serializer(mode="wrap", when_used="always")
Expand Down
15 changes: 6 additions & 9 deletions kpops/components/common/topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class TopicConfig(DescConfigModel):
:param partitions_count: Number of partitions into which the topic is divided
:param replication_factor: Replication factor of the topic
:param configs: Topic configs
:param role: Custom identifier belonging to one or multiple topics, provide only if `type` is `extra`
:param label: Custom identifier belonging to one or multiple topics, provide only if `type` is `extra`
"""

type: OutputTopicTypes | None = Field(
Expand Down Expand Up @@ -60,10 +60,7 @@ class TopicConfig(DescConfigModel):
configs: dict[str, str | int] = Field(
default={}, description=describe_attr("configs", __doc__)
)
# TODO: We can rename this but this would be a breaking change
role: str | None = Field(default=None, description=describe_attr("role", __doc__))
# TODO: Alternatively, we can define label and use both. Double checks everywhere.
# label: str | None = Field(default=None, description=describe_attr("label", __doc__))
label: str | None = Field(default=None, description=describe_attr("label", __doc__))

model_config = ConfigDict(
extra="forbid",
Expand All @@ -72,10 +69,10 @@ class TopicConfig(DescConfigModel):
)

@model_validator(mode="after")
def extra_topic_role(self) -> Any:
"""Ensure that `cls.role` is used correctly, assign type if needed."""
if self.type and self.role:
msg = "Define `role` only if `type` is undefined"
def extra_topic_label(self) -> Any:
"""Ensure that `cls.label` is used correctly, assign type if needed."""
if self.type and self.label:
msg = "Define `label` only if `type` is undefined"
raise ValueError(msg)
return self

Expand Down
4 changes: 2 additions & 2 deletions kpops/components/streams_bootstrap/producer/producer_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ def set_output_topic(self, topic: KafkaTopic) -> None:
self.values.streams.output_topic = topic

@override
def add_extra_output_topic(self, topic: KafkaTopic, role: str) -> None:
self.values.streams.extra_output_topics[role] = topic
def add_extra_output_topic(self, topic: KafkaTopic, label: str) -> None:
self.values.streams.extra_output_topics[label] = topic

@property
@override
Expand Down
17 changes: 9 additions & 8 deletions kpops/components/streams_bootstrap/streams/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ def deserialize_extra_input_topics(
) -> dict[str, list[KafkaTopic]] | Any:
if isinstance(extra_input_topics, dict):
return {
role: [KafkaTopic(name=topic_name) for topic_name in topics]
for role, topics in extra_input_topics.items()
label: [KafkaTopic(name=topic_name) for topic_name in topics]
for label, topics in extra_input_topics.items()
}
return extra_input_topics

Expand All @@ -82,7 +82,8 @@ def serialize_extra_input_topics(
self, extra_topics: dict[str, list[KafkaTopic]]
) -> dict[str, list[str]]:
return {
role: self.serialize_topics(topics) for role, topics in extra_topics.items()
label: self.serialize_topics(topics)
for label, topics in extra_topics.items()
}

def add_input_topics(self, topics: list[KafkaTopic]) -> None:
Expand All @@ -94,16 +95,16 @@ def add_input_topics(self, topics: list[KafkaTopic]) -> None:
"""
self.input_topics = KafkaTopic.deduplicate(self.input_topics + topics)

def add_extra_input_topics(self, role: str, topics: list[KafkaTopic]) -> None:
"""Add given extra topics that share a role to the list of extra input topics.
def add_extra_input_topics(self, label: str, topics: list[KafkaTopic]) -> None:
"""Add given extra topics that share a label to the list of extra input topics.

Ensures no duplicate topics in the list.

:param topics: Extra input topics
:param role: Topic role
:param label: Topic label
"""
self.extra_input_topics[role] = KafkaTopic.deduplicate(
self.extra_input_topics.get(role, []) + topics
self.extra_input_topics[label] = KafkaTopic.deduplicate(
self.extra_input_topics.get(label, []) + topics
)


Expand Down
Loading
Loading