Skip to content

Commit

Permalink
Add support for streams-bootstrap producer jobs (#276)
Browse files Browse the repository at this point in the history
  • Loading branch information
disrupted authored Jan 10, 2023
1 parent 7e657dd commit 2a7680d
Show file tree
Hide file tree
Showing 12 changed files with 210 additions and 37 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from __future__ import annotations

from typing import TYPE_CHECKING

from kubernetes_asyncio.client import V1beta1CronJob, V1Job

import streams_explorer.core.k8s_app as k8s
from streams_explorer.core.extractor.extractor import ProducerAppExtractor

if TYPE_CHECKING:
from streams_explorer.core.k8s_app import K8sAppCronJob, K8sAppJob


class StreamsBootstrapProducer(ProducerAppExtractor):
def on_job_parsing(self, job: V1Job) -> K8sAppJob | None:
producer = k8s.K8sAppJob(job)

# filter out Jobs created by CronJobs
if producer.metadata.owner_references:
for owner_reference in producer.metadata.owner_references:
if owner_reference.kind == "CronJob":
return None

if producer.is_streams_app:
return producer

def on_cron_job_parsing(self, cron_job: V1beta1CronJob) -> K8sAppCronJob | None:
producer = k8s.K8sAppCronJob(cron_job)
if producer.is_streams_app:
return producer
8 changes: 6 additions & 2 deletions backend/streams_explorer/core/extractor/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from dataclasses import dataclass, field
from typing import TYPE_CHECKING

from kubernetes_asyncio.client import V1beta1CronJob
from kubernetes_asyncio.client import V1beta1CronJob, V1Job

from streams_explorer.models.k8s import K8sConfig
from streams_explorer.models.kafka_connector import KafkaConnector
Expand All @@ -13,7 +13,7 @@
from streams_explorer.plugins import Plugin

if TYPE_CHECKING:
from streams_explorer.core.k8s_app import K8sAppCronJob
from streams_explorer.core.k8s_app import K8sAppCronJob, K8sAppJob


@dataclass
Expand Down Expand Up @@ -45,6 +45,10 @@ def on_streaming_app_delete(self, config: K8sConfig) -> None:


class ProducerAppExtractor(Extractor):
@abstractmethod
def on_job_parsing(self, job: V1Job) -> K8sAppJob | None:
...

@abstractmethod
def on_cron_job_parsing(self, cron_job: V1beta1CronJob) -> K8sAppCronJob | None:
...
10 changes: 8 additions & 2 deletions backend/streams_explorer/core/extractor/extractor_container.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from typing import TYPE_CHECKING, NamedTuple

from kubernetes_asyncio.client import V1beta1CronJob
from kubernetes_asyncio.client import V1beta1CronJob, V1Job
from loguru import logger

from streams_explorer.core.extractor.default.generic import GenericSink, GenericSource
Expand All @@ -18,7 +18,7 @@
from streams_explorer.models.source import Source

if TYPE_CHECKING:
from streams_explorer.core.k8s_app import K8sAppCronJob
from streams_explorer.core.k8s_app import K8sAppCronJob, K8sAppJob


class SourcesSinks(NamedTuple):
Expand Down Expand Up @@ -67,6 +67,12 @@ def on_connector_info_parsing(
):
return connector

def on_job(self, job: V1Job) -> K8sAppJob | None:
for extractor in self.extractors:
if isinstance(extractor, ProducerAppExtractor):
if app := extractor.on_job_parsing(job):
return app

def on_cron_job(self, cron_job: V1beta1CronJob) -> K8sAppCronJob | None:
for extractor in self.extractors:
if isinstance(extractor, ProducerAppExtractor):
Expand Down
49 changes: 40 additions & 9 deletions backend/streams_explorer/core/k8s_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
V1beta1CronJob,
V1Container,
V1Deployment,
V1Job,
V1ObjectMeta,
V1PodSpec,
V1StatefulSet,
Expand All @@ -20,7 +21,7 @@

ATTR_PIPELINE = "pipeline"

K8sObject: TypeAlias = V1Deployment | V1StatefulSet | V1beta1CronJob
K8sObject: TypeAlias = V1Deployment | V1StatefulSet | V1Job | V1beta1CronJob

config_parser: type[K8sConfigParser] = load_config_parser()

Expand Down Expand Up @@ -157,14 +158,17 @@ def _set_annotations(self) -> None:

@staticmethod
def factory(k8s_object: K8sObject) -> K8sApp:
if isinstance(k8s_object, V1Deployment):
return K8sAppDeployment(k8s_object)
elif isinstance(k8s_object, V1StatefulSet):
return K8sAppStatefulSet(k8s_object)
elif isinstance(k8s_object, V1beta1CronJob):
return K8sAppCronJob(k8s_object)
else:
raise ValueError(k8s_object)
match k8s_object:
case V1Deployment(): # type: ignore[misc]
return K8sAppDeployment(k8s_object)
case V1StatefulSet(): # type: ignore[misc]
return K8sAppStatefulSet(k8s_object)
case V1Job(): # type: ignore[misc]
return K8sAppJob(k8s_object)
case V1beta1CronJob(): # type: ignore[misc]
return K8sAppCronJob(k8s_object)
case _:
raise ValueError(k8s_object)

@staticmethod
def get_app_container(
Expand All @@ -184,6 +188,33 @@ def _labels_to_use() -> set[str]:
return set(settings.k8s.labels)


class K8sAppJob(K8sApp):
def __init__(self, k8s_object: V1Job) -> None:
super().__init__(k8s_object)

def setup(self) -> None:
self.spec = self._get_pod_spec()
self.container = self.get_app_container(self.spec)
self.extract_config()
self.__set_attributes()

@property
def replicas_ready(self) -> None:
return None

@property
def replicas_total(self) -> None:
return None

def _get_pod_spec(self) -> V1PodSpec | None:
if self.k8s_object.spec and self.k8s_object.spec.template.spec:
return self.k8s_object.spec.template.spec

def __set_attributes(self) -> None:
self._set_labels()
self._set_pipeline()


class K8sAppCronJob(K8sApp):
def __init__(self, k8s_object: V1beta1CronJob) -> None:
super().__init__(k8s_object)
Expand Down
27 changes: 22 additions & 5 deletions backend/streams_explorer/core/services/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
V1beta1CronJobList,
V1Deployment,
V1DeploymentList,
V1Job,
V1JobList,
V1StatefulSet,
V1StatefulSetList,
)
Expand All @@ -30,7 +32,11 @@
class K8sResource(NamedTuple):
func: Callable[
...,
V1DeploymentList | V1StatefulSetList | V1beta1CronJobList | EventsV1EventList,
V1DeploymentList
| V1StatefulSetList
| V1JobList
| V1beta1CronJobList
| EventsV1EventList,
]
return_type: type | None
callback: Callable[..., Awaitable[None]]
Expand All @@ -48,8 +54,8 @@ class K8sEvent(TypedDict):


class Kubernetes:
context = settings.k8s.deployment.context
namespaces = settings.k8s.deployment.namespaces
context: str = settings.k8s.deployment.context
namespaces: list[str] = settings.k8s.deployment.namespaces

def __init__(self, streams_explorer: StreamsExplorer) -> None:
self.streams_explorer = streams_explorer
Expand All @@ -70,7 +76,8 @@ async def setup(self) -> None:
kubernetes_asyncio.client.Configuration.set_default(conf)

self.k8s_app_client = kubernetes_asyncio.client.AppsV1Api()
self.k8s_batch_client = kubernetes_asyncio.client.BatchV1beta1Api()
self.k8s_batch_client = kubernetes_asyncio.client.BatchV1Api()
self.k8s_beta_batch_client = kubernetes_asyncio.client.BatchV1beta1Api()
self.k8s_events_client = kubernetes_asyncio.client.EventsV1Api()

async def watch(self) -> None:
Expand All @@ -84,8 +91,13 @@ def list_stateful_sets(namespace: str, *args, **kwargs) -> V1StatefulSetList:
*args, namespace=namespace, **kwargs
)

def list_jobs(namespace: str, *args, **kwargs) -> V1JobList:
return self.k8s_batch_client.list_namespaced_job(
*args, namespace=namespace, **kwargs
)

def list_cron_jobs(namespace: str, *args, **kwargs) -> V1beta1CronJobList:
return self.k8s_batch_client.list_namespaced_cron_job(
return self.k8s_beta_batch_client.list_namespaced_cron_job(
*args, namespace=namespace, **kwargs
)

Expand All @@ -105,6 +117,11 @@ def list_events(namespace: str, *args, **kwargs) -> EventsV1EventList:
V1StatefulSet,
self.streams_explorer.handle_deployment_update,
),
K8sResource(
list_jobs,
V1Job,
self.streams_explorer.handle_deployment_update,
),
K8sResource(
list_cron_jobs,
V1beta1CronJob,
Expand Down
4 changes: 4 additions & 0 deletions backend/streams_explorer/extractors.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
from streams_explorer.core.extractor.default.elasticsearch_sink import ElasticsearchSink
from streams_explorer.core.extractor.default.jdbc_sink import JdbcSink
from streams_explorer.core.extractor.default.s3_sink import S3Sink
from streams_explorer.core.extractor.default.streams_bootstrap_producer import (
StreamsBootstrapProducer,
)
from streams_explorer.core.extractor.extractor import Extractor
from streams_explorer.core.extractor.extractor_container import ExtractorContainer
from streams_explorer.plugins import load_plugin
Expand All @@ -10,6 +13,7 @@


def load_default() -> None:
extractor_container.add(StreamsBootstrapProducer())
extractor_container.add(ElasticsearchSink())
extractor_container.add(S3Sink())
extractor_container.add(JdbcSink())
Expand Down
13 changes: 8 additions & 5 deletions backend/streams_explorer/streams_explorer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from cachetools.func import ttl_cache
from fastapi import WebSocket
from kubernetes_asyncio.client import V1beta1CronJob
from kubernetes_asyncio.client import V1beta1CronJob, V1Job
from loguru import logger

from streams_explorer.core.client_manager import ClientManager
Expand Down Expand Up @@ -156,10 +156,13 @@ async def handle_deployment_update(self, update: K8sDeploymentUpdate) -> None:
)

app: K8sApp | None = None
if isinstance(item, V1beta1CronJob):
app = extractor_container.on_cron_job(item)
else:
app = K8sApp.factory(item)
match item:
case V1Job(): # type: ignore[misc]
app = extractor_container.on_job(item)
case V1beta1CronJob(): # type: ignore[misc]
app = extractor_container.on_cron_job(item)
case _:
app = K8sApp.factory(item)
if app:
await self._handle_app_update(update["type"], app)

Expand Down
Loading

0 comments on commit 2a7680d

Please sign in to comment.