Skip to content

Commit

Permalink
Fix problem when names are not overriten
Browse files Browse the repository at this point in the history
  • Loading branch information
irux committed Aug 28, 2023
1 parent a97148a commit d929459
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 8 deletions.
3 changes: 2 additions & 1 deletion kpops/components/base_components/kafka_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,8 @@ class KafkaSinkConnector(KafkaConnector):

@override
def get_input_topics(self) -> list[str]:
return getattr(self.app, "topics", [])
topics = getattr(self.app, "topics", None)
return topics.split(",") if topics is not None else []

@override
def add_input_topics(self, topics: list[str]) -> None:
Expand Down
16 changes: 9 additions & 7 deletions kpops/pipeline_generator/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ def __init__(
self.parse_components(component_list)
self.validate()
self.generate_graph()
# self.validate_graph_components()
self.validate_graph_components()

@classmethod
def load_from_yaml(
Expand Down Expand Up @@ -175,7 +175,7 @@ def generate_graph(self):
all_input_topics: list[str] = []
if input_topics is not None:
all_input_topics += input_topics
if extra_input_topics is not None:
if extra_input_topics is not None and extra_input_topics:
all_input_topics += [
topic
for list_topics in extra_input_topics.values()
Expand All @@ -187,24 +187,26 @@ def generate_graph(self):
extra_output_topics = component.get_extra_output_topics()
if output_topics is not None:
all_output_topics += [output_topics]
if extra_output_topics is not None:
if extra_output_topics is not None and extra_output_topics:
all_output_topics += list(extra_output_topics.values())

self.components.graph_components.add_node(component.name)
component_vertex_name = f"component-{component.name}"
self.components.graph_components.add_node(component_vertex_name)
for input_topic in all_input_topics:
self.components.graph_components.add_node(input_topic)
self.components.graph_components.add_edge(input_topic, component.name)

for output_topic in all_output_topics:
self.components.graph_components.add_node(output_topic)
self.components.graph_components.add_edge(component.name, output_topic)
self.components.graph_components.add_edge(
component_vertex_name, output_topic
)
nx.draw(self.components.graph_components, with_labels=True)
plt.show()

def validate_graph_components(self):
if not nx.is_directed_acyclic_graph(self.components.graph_components):
print("contain loops")
# raise ValueError("Component graph contain loops!")
raise ValueError("Component graph contain loops!")

def parse_components(self, component_list: list[dict]) -> None:
"""Instantiate, enrich and inflate a list of components
Expand Down

0 comments on commit d929459

Please sign in to comment.