Skip to content

Commit

Permalink
Replace kubernetes-asyncio with lightkube (#517)
Browse files Browse the repository at this point in the history
  • Loading branch information
disrupted authored Aug 15, 2024
1 parent 8ed1ed6 commit 887d766
Show file tree
Hide file tree
Showing 7 changed files with 250 additions and 543 deletions.
65 changes: 26 additions & 39 deletions kpops/component_handlers/kubernetes/pvc_handler.py
Original file line number Diff line number Diff line change
@@ -1,49 +1,36 @@
from __future__ import annotations

import logging
from collections.abc import AsyncIterable

from kubernetes_asyncio import client, config
from kubernetes_asyncio.client import ApiClient
from lightkube.core.async_client import AsyncClient
from lightkube.resources.core_v1 import PersistentVolumeClaim

log = logging.getLogger("PVC_handler")


class PVCHandler:
def __init__(self, app_name: str, namespace: str):
def __init__(self, app_name: str, namespace: str) -> None:
self.app_name = app_name
self.namespace = namespace

@classmethod
async def create(cls, app_name: str, namespace: str) -> PVCHandler:
self = cls(app_name, namespace)
await config.load_kube_config()
return self

async def list_pvcs(self) -> list[str]:
async with ApiClient() as api:
core_v1_api = client.CoreV1Api(api)
pvc_list = await core_v1_api.list_namespaced_persistent_volume_claim(
namespace=self.namespace, label_selector=f"app={self.app_name}"
)

pvc_names = [pvc.metadata.name for pvc in pvc_list.items]
if not pvc_names:
log.warning(
f"No PVCs found for app '{self.app_name}', in namespace '{self.namespace}'"
)
log.debug(
f"In namespace '{self.namespace}' StatefulSet '{self.app_name}' has corresponding PVCs: '{pvc_names}'"
)
return pvc_names

async def delete_pvcs(self) -> None:
async with ApiClient() as api:
core_v1_api = client.CoreV1Api(api)
pvc_names = await self.list_pvcs()
log.debug(
f"Deleting in namespace '{self.namespace}' StatefulSet '{self.app_name}' PVCs '{pvc_names}'"
self._client = AsyncClient(namespace=namespace) # pyright: ignore[reportArgumentType]

async def list_pvcs(self) -> AsyncIterable[PersistentVolumeClaim]:
return self._client.list(PersistentVolumeClaim, labels={"app": self.app_name})

async def delete_pvcs(self, dry_run: bool) -> None:
pvc_names: list[str] = [
pvc.metadata.name
async for pvc in await self.list_pvcs()
if pvc.metadata and pvc.metadata.name
]
if not pvc_names:
log.warning(
f"No PVCs found for app '{self.app_name}', in namespace '{self.namespace}'"
)
for pvc_name in pvc_names:
await core_v1_api.delete_namespaced_persistent_volume_claim(
pvc_name, self.namespace
) # type: ignore [reportGeneralTypeIssues]
return
log.debug(
f"Deleting in namespace '{self.namespace}' StatefulSet '{self.app_name}' PVCs {pvc_names}"
)
if dry_run:
return
for pvc_name in pvc_names:
await self._client.delete(PersistentVolumeClaim, pvc_name)
9 changes: 2 additions & 7 deletions kpops/components/streams_bootstrap/streams/streams_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,8 @@ async def clean(self, dry_run: bool) -> None:

async def clean_pvcs(self, dry_run: bool) -> None:
app_full_name = super(HelmApp, self).full_name
pvc_handler = await PVCHandler.create(app_full_name, self.namespace)
if dry_run:
pvc_names = await pvc_handler.list_pvcs()
log.info(f"Deleting the PVCs {pvc_names} for StatefulSet '{app_full_name}'")
else:
log.info(f"Deleting the PVCs for StatefulSet '{app_full_name}'")
await pvc_handler.delete_pvcs()
pvc_handler = PVCHandler(app_full_name, self.namespace)
await pvc_handler.delete_pvcs(dry_run)


class StreamsApp(KafkaApp, StreamsBootstrap):
Expand Down
Loading

0 comments on commit 887d766

Please sign in to comment.