Skip to content

Commit

Permalink
Update logic for creating a cluster to fix bugs with other cluster se…
Browse files Browse the repository at this point in the history
…tups (#74)

Update test golden files and fix bug in golden file checking.

Adds a dependency on networkx for walking the Kustomziation dependency
tree to find the root nodes.
  • Loading branch information
allenporter authored Feb 22, 2023
1 parent c06dd0c commit 73f5cef
Show file tree
Hide file tree
Showing 46 changed files with 776 additions and 68 deletions.
178 changes: 141 additions & 37 deletions flux_local/git_repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,14 @@
import contextlib
from dataclasses import dataclass, field
import logging
import networkx
import os
import tempfile
from collections.abc import Callable
from functools import cache
from pathlib import Path
from slugify import slugify
import queue
from typing import Any, Generator

import git
Expand All @@ -63,11 +65,11 @@

_LOGGER = logging.getLogger(__name__)

CLUSTER_KUSTOMIZE_NAME = "flux-system"
CLUSTER_KUSTOMIZE_KIND = "Kustomization"
KUSTOMIZE_KIND = "Kustomization"
HELM_REPO_KIND = "HelmRepository"
HELM_RELEASE_KIND = "HelmRelease"
GIT_REPO_KIND = "GitRepository"
DEFAULT_NAMESPACE = "flux-system"


Expand Down Expand Up @@ -196,7 +198,7 @@ def predicate(

def cluster_metadata_selector() -> MetadataSelector:
"""Create a new MetadataSelector for Kustomizations."""
return MetadataSelector(name=CLUSTER_KUSTOMIZE_NAME, namespace=DEFAULT_NAMESPACE)
return MetadataSelector(namespace=DEFAULT_NAMESPACE)


def ks_metadata_selector() -> MetadataSelector:
Expand Down Expand Up @@ -229,38 +231,147 @@ class ResourceSelector:
"""HelmRelease objects to return."""


async def get_clusters(path: Path, selector: MetadataSelector) -> list[Cluster]:
"""Load Cluster objects from the specified path."""
cmd = kustomize.grep(f"kind={CLUSTER_KUSTOMIZE_KIND}", path).grep(
f"metadata.name={selector.name}",
)
docs = await cmd.objects()
return list(
filter(
selector.predicate,
[
Cluster.parse_doc(doc)
for doc in docs
if CLUSTER_KUSTOMIZE_DOMAIN_FILTER(doc)
],
)
)

async def get_flux_kustomizations(
root: Path, relative_path: Path
) -> list[Kustomization]:
"""Find all flux Kustomizations in the specified path.
async def get_cluster_kustomizations(path: Path) -> list[Kustomization]:
"""Load Kustomization objects from the specified path."""
cmd = kustomize.grep(f"kind={KUSTOMIZE_KIND}", path).grep(
f"metadata.name={CLUSTER_KUSTOMIZE_NAME}",
invert=True,
This may be called repeatedly with different paths to repeatedly collect
Kustomizations from the repo. Assumes that any flux Kustomization
for a GitRepository is pointed at this cluster, following normal conventions.
"""
cmd = kustomize.grep(f"kind={CLUSTER_KUSTOMIZE_KIND}", root / relative_path).grep(
f"spec.sourceRef.kind={GIT_REPO_KIND}"
)
docs = await cmd.objects()
return [
Kustomization.parse_doc(doc)
for doc in docs
if CLUSTER_KUSTOMIZE_DOMAIN_FILTER(doc)
for doc in filter(CLUSTER_KUSTOMIZE_DOMAIN_FILTER, docs)
]


def find_path_parent(search: Path, prefixes: set[Path]) -> Path | None:
"""Return a prefix path that is a parent of the search path."""
for parent in search.parents:
if parent in prefixes:
return parent
return None


async def kustomization_traversal(path_selector: PathSelector) -> list[Kustomization]:
"""Search for kustomizations in the specified path."""

kustomizations: list[Kustomization] = []
visited: set[Path] = set() # Relative paths within the cluster

path_queue: queue.Queue[Path] = queue.Queue()
path_queue.put(path_selector.relative_path)
root = path_selector.root
while not path_queue.empty():
path = path_queue.get()
_LOGGER.debug("Visiting path (%s) %s", root, path)
docs = await get_flux_kustomizations(root, path)

# Source path is relative to the search path. Update to have the
# full prefix relative to the root.
for kustomization in docs:
if not kustomization.source_path:
continue
_LOGGER.debug(
"Updating relative path: %s, %s, %s",
root,
path,
kustomization.source_path,
)
kustomization.source_path = str(
((root / path) / kustomization.source_path).relative_to(root)
)

visited |= set({path})

_LOGGER.debug("Found %s Kustomizations", len(docs))
for doc in docs:
found_path = Path(doc.path)
if not find_path_parent(found_path, visited) and found_path not in visited:
path_queue.put(found_path)
else:
_LOGGER.debug("Already visited %s", found_path)
kustomizations.extend(docs)
return kustomizations


def make_clusters(kustomizations: list[Kustomization]) -> list[Cluster]:
"""Convert the flat list of Kustomizations into a Cluster.
This will reverse engineer which Kustomizations are root nodes for the cluster
based on the parent paths. Root Kustomizations are made the cluster and everything
else is made a child.
"""

# Build a directed graph from a kustomization path to the path
# of the kustomization that created it.
graph = networkx.DiGraph()
parent_paths = set([Path(ks.path) for ks in kustomizations])
for ks in kustomizations:
if not ks.source_path:
raise ValueError("Kustomization did not have source path; Old kustomize?")
path = Path(ks.path)
source = Path(ks.source_path)
graph.add_node(path, ks=ks)
# Find the parent Kustomization that produced this based on the
# matching the kustomize source parent paths with a Kustomization
# target path.
if (
parent_path := find_path_parent(source, parent_paths)
) and parent_path != path:
_LOGGER.debug("Found parent %s => %s", path, parent_path)
graph.add_edge(parent_path, path)
else:
_LOGGER.debug("No parent for %s (%s)", path, source)

# Clusters are subgraphs within the graph that are connected, with the root
# node being the cluster itself. All children Kustomizations are flattended.
_LOGGER.debug("Creating clusters based on connectivity")
roots = [node for node, degree in graph.in_degree() if degree == 0]
roots.sort()
clusters: list[Cluster] = []
_LOGGER.debug("roots=%s", roots)
for root in roots:
root_ks = graph.nodes[root]["ks"]
child_nodes = list(networkx.descendants(graph, root))
child_nodes.sort()
children = [graph.nodes[child]["ks"] for child in child_nodes]
clusters.append(
Cluster(
name=root_ks.name,
namespace=root_ks.namespace,
path=root_ks.path,
kustomizations=children,
)
)
_LOGGER.debug(
"Created cluster %s with %s kustomizations", root_ks.name, len(children)
)

return clusters


async def get_clusters(
path_selector: PathSelector,
cluster_selector: MetadataSelector,
kustomization_selector: MetadataSelector,
) -> list[Cluster]:
"""Load Cluster objects from the specified path."""

kustomizations = await kustomization_traversal(path_selector)
clusters = list(filter(cluster_selector.predicate, make_clusters(kustomizations)))
for cluster in clusters:
cluster.kustomizations = list(
filter(kustomization_selector.predicate, cluster.kustomizations)
)
return clusters


async def get_kustomizations(path: Path) -> list[dict[str, Any]]:
"""Load Kustomization objects from the specified path."""
cmd = kustomize.grep(f"kind={KUSTOMIZE_KIND}", path)
Expand Down Expand Up @@ -348,17 +459,10 @@ async def build_manifest(
if not selector.cluster.enabled:
return Manifest(clusters=[])

clusters = await get_clusters(selector.path.process_path, selector.cluster)
if len(clusters) > 0:
for cluster in clusters:
_LOGGER.debug("Processing cluster: %s", cluster.path)
cluster.kustomizations = await get_cluster_kustomizations(
selector.path.root / cluster.path
)
cluster.kustomizations = list(
filter(selector.kustomization.predicate, cluster.kustomizations)
)
elif selector.path.path:
clusters = await get_clusters(
selector.path, selector.cluster, selector.kustomization
)
if not clusters and selector.path.path:
_LOGGER.debug("No clusters found; Processing as a Kustomization: %s", selector)
# The argument path may be a Kustomization inside a cluster. Create a synthetic
# cluster with any found Kustomizations
Expand Down
16 changes: 11 additions & 5 deletions flux_local/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,7 @@ def parse_doc(cls, doc: dict[str, Any]) -> "HelmChart":
raise ValueError(f"Invalid {cls} missing spec.chart.spec: {doc}")
if not (chart := chart_spec.get("chart")):
raise ValueError(f"Invalid {cls} missing spec.chart.spec.chart: {doc}")
if not (version := chart_spec.get("version")):
raise ValueError(f"Invalid {cls} missing spec.chart.spec.version: {doc}")
version = chart_spec.get("version")
if not (source_ref := chart_spec.get("sourceRef")):
raise ValueError(f"Invalid {cls} missing spec.chart.spec.sourceRef: {doc}")
if "namespace" not in source_ref or "name" not in source_ref:
Expand Down Expand Up @@ -207,14 +206,17 @@ class Kustomization(BaseManifest):
"""The namespace of the kustomization."""

path: str
"""The local repo path to the kustomization."""
"""The local repo path to the kustomization contents."""

helm_repos: list[HelmRepository] = Field(default_factory=list)
"""The set of HelmRepositories represented in this kustomization."""

helm_releases: list[HelmRelease] = Field(default_factory=list)
"""The set of HelmRelease represented in this kustomization."""

source_path: str | None = None
"""Optional source path for this Kustomization, relative to the build path."""

@classmethod
def parse_doc(cls, doc: dict[str, Any]) -> "Kustomization":
"""Parse a partial Kustomization from a kubernetes resource."""
Expand All @@ -229,7 +231,10 @@ def parse_doc(cls, doc: dict[str, Any]) -> "Kustomization":
raise ValueError(f"Invalid {cls} missing spec: {doc}")
if not (path := spec.get("path")):
raise ValueError(f"Invalid {cls} missing spec.path: {doc}")
return Kustomization(name=name, namespace=namespace, path=path)
source_path = metadata.get("annotations", {}).get("config.kubernetes.io/path")
return Kustomization(
name=name, namespace=namespace, path=path, source_path=source_path
)

@property
def id_name(self) -> str:
Expand All @@ -239,7 +244,8 @@ def id_name(self) -> str:
_COMPACT_EXCLUDE_FIELDS = {
"helm_releases": {
"__all__": HelmRelease._COMPACT_EXCLUDE_FIELDS,
}
},
"source_path": True,
}


Expand Down
3 changes: 3 additions & 0 deletions flux_local/tool/flux_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import logging
import pathlib
import sys
import traceback

import yaml

Expand Down Expand Up @@ -64,6 +65,8 @@ def main() -> None:
try:
asyncio.run(action.run(**vars(args)))
except command.CommandException as err:
if args.log_level == "DEBUG":
traceback.print_exc()
print("Command failed: ", err)
sys.exit(1)

Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ coverage==7.1.0
GitPython==3.1.31
mypy==1.0.1
nest_asyncio==1.5.6
networkx==3.0
pdoc==12.3.1
pip==23.0
pre-commit==3.0.4
Expand Down
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ install_requires =
python-slugify>=8.0.0
GitPython>=3.1.30
PyYAML>=6.0
networkx>=3.0
# Note: flux-local provides repo testing using pytest
pytest>=7.2.1
pytest-asyncio>=0.20.3
Expand Down
Loading

0 comments on commit 73f5cef

Please sign in to comment.