diff --git a/Dockerfile.backend b/Dockerfile.backend new file mode 100644 index 0000000..0697bb2 --- /dev/null +++ b/Dockerfile.backend @@ -0,0 +1,57 @@ +# Use Python 3.11 slim as the base image +FROM python:3.11-slim AS base + +# Set environment variables for Python behavior +ENV PYTHONFAULTHANDLER=1 \ + PYTHONHASHSEED=random \ + PYTHONUNBUFFERED=1 + +# Set the working directory inside the container +WORKDIR /app + +# Builder stage: Install dependencies and build the backend package +FROM base AS builder + +# Set environment variables for pip and Poetry +ENV PIP_DEFAULT_TIMEOUT=100 \ + PIP_DISABLE_PIP_VERSION_CHECK=1 \ + PIP_NO_CACHE_DIR=1 \ + POETRY_VERSION=1.3.1 + +# Install Poetry +RUN pip install "poetry==$POETRY_VERSION" + +# Copy the pyproject.toml and poetry.lock files +COPY pyproject.toml poetry.lock ./ + +# Copy the entire konduktor directory to the container +COPY konduktor ./konduktor + +# List the contents of the konduktor directory to verify the copy +RUN ls -la ./konduktor + +# Configure Poetry and install dependencies only for the dashboard group +RUN poetry config virtualenvs.in-project true && \ + poetry install --with dashboard --no-root + +# Final stage for production +FROM base AS final + +# Set the working directory +WORKDIR /app + +# Copy the virtual environment from the builder stage +COPY --from=builder /app/.venv ./.venv + +# Copy the konduktor directory from the builder stage +COPY --from=builder /app/konduktor ./konduktor + +# Copy the startup script +COPY startup.sh /app/startup.sh +RUN chmod +x /app/startup.sh + +# Expose the port the app runs on +EXPOSE 5001 + +# Set the startup command +CMD ["/app/startup.sh"] diff --git a/Dockerfile.frontend b/Dockerfile.frontend new file mode 100644 index 0000000..8588b27 --- /dev/null +++ b/Dockerfile.frontend @@ -0,0 +1,23 @@ +# Use the official Node.js 18 slim image +FROM node:18-slim + +# Set the working directory +WORKDIR /app + +# Copy package.json and package-lock.json from the /frontend 
folder +COPY konduktor/dashboard/frontend/package*.json ./ + +# Install dependencies +RUN npm install + +# Copy the entire frontend source code +COPY konduktor/dashboard/frontend/ . + +# Build the frontend for production +RUN npm run build + +# Expose the frontend port +EXPOSE 5173 + +# Start the frontend app +CMD ["npm", "run", "start"] diff --git a/docs/source/admin/observability.rst b/docs/source/admin/observability.rst index 902aab4..d128840 100644 --- a/docs/source/admin/observability.rst +++ b/docs/source/admin/observability.rst @@ -75,4 +75,37 @@ As well as (S)Xid errors by following :code:`dmesg` on each node. You can also p .. figure:: ../images/otel-loki.png :width: 120% :align: center - :alt: dashboard \ No newline at end of file + :alt: dashboard + +Dashboard +------------ + +This is a user-friendly localhost dashboard to manage workloads within a cluster, all in one place. + +Features include: + +- Grafana konduktor dashboard +- Loki logs (search + filtering by namespace) +- Table to view, delete, and modify priority of workloads in queue + +To open the dashboard, run this inside the root konduktor directory: + +.. code-block:: console + + $ ./start_dashboard.sh + +If running into a permission error, try this instead: + +.. code-block:: console + + $ chmod +x start_dashboard.sh && ./start_dashboard.sh + +.. figure:: ../images/dashboard-logs.png + :width: 120% + :align: center + :alt: dashboard-logs + +.. 
figure:: ../images/dashboard-jobs.png + :width: 120% + :align: center + :alt: dashboard-jobs \ No newline at end of file diff --git a/docs/source/images/dashboard-jobs.png b/docs/source/images/dashboard-jobs.png new file mode 100644 index 0000000..3351bd0 Binary files /dev/null and b/docs/source/images/dashboard-jobs.png differ diff --git a/docs/source/images/dashboard-logs.png b/docs/source/images/dashboard-logs.png new file mode 100644 index 0000000..9e8a430 Binary files /dev/null and b/docs/source/images/dashboard-logs.png differ diff --git a/format.sh b/format.sh index 90948a2..f5bdf3e 100644 --- a/format.sh +++ b/format.sh @@ -1,8 +1,8 @@ #!/usr/bin/env bash set -eo pipefail -RUFF_VERSION=$(ruff --version | head -n 1 | awk '{print $2}') -MYPY_VERSION=$(mypy --version | awk '{print $2}') +RUFF_VERSION=$(poetry run ruff --version | head -n 1 | awk '{print $2}') +MYPY_VERSION=$(poetry run mypy --version | awk '{print $2}') echo "ruff ver $RUFF_VERSION" echo "mypy ver $MYPY_VERSION" diff --git a/konduktor/dashboard/README.md b/konduktor/dashboard/README.md new file mode 100644 index 0000000..0c7ab05 --- /dev/null +++ b/konduktor/dashboard/README.md @@ -0,0 +1,30 @@ +### Prereqs: kubectl is configured with remote machine/cluster + +# OPTION 1 (Automated Setup) + +To open the dashboard, run this inside the root konduktor directory: +``` +./start_dashboard.sh +``` + +# OPTION 2 (Manual Setup) + +## 1. Apply kubernetes manifest +Inside manifests directory (one with dashboard_deployment.yaml): +``` +kubectl apply -f dashboard_deployment.yaml +``` +Then, wait a minute or two for the pods to finish setup + +## 2. Port forward frontend in a terminal +``` +kubectl port-forward svc/frontend 5173:5173 -n konduktor-dashboard +``` + +## 3. Port forward grafana in a terminal +``` +kubectl port-forward svc/kube-prometheus-stack-grafana 3000:80 -n prometheus +``` + +## 4. 
@app.delete("/deleteJob")
async def delete_job(request: Request):
    """Delete the Kueue Workload named in the JSON request body.

    The body is parsed as JSON and read for two keys: ``name`` and
    ``namespace`` (``namespace`` falls back to ``"default"``).

    Returns:
        JSONResponse: ``{"success": True, "status": 200}`` when the delete
        call succeeds; ``{"error": ...}`` with status 400 when no name was
        supplied; ``{"error": ...}`` with the API error status (or 500 when
        the exception carries none) when the Kubernetes call fails.
    """
    data = await request.json()
    name = data.get("name", "")
    namespace = data.get("namespace", "default")

    # Refuse to issue a delete without a concrete workload name: an empty
    # name would otherwise be sent to the cluster as-is.
    if not name:
        return JSONResponse({"error": "Missing workload name"}, status_code=400)

    try:
        delete_options = client.V1DeleteOptions(propagation_policy="Background")

        crd_client.delete_namespaced_custom_object(
            group="kueue.x-k8s.io",
            version="v1beta1",
            namespace=namespace,
            plural="workloads",
            name=name,
            body=delete_options,
        )
        logger.debug(f"Kueue Workload '{name}' deleted successfully.")

        return JSONResponse({"success": True, "status": 200})

    except ApiException as e:
        logger.debug(f"Exception: {e}")
        # ApiException.status may be unset (None/0) when the failure did not
        # come from an HTTP response; fall back to a generic 500 so the
        # response itself is always valid.
        return JSONResponse({"error": str(e)}, status_code=e.status or 500)
def format_workloads(listing: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Flatten a Kueue workload listing into rows for the dashboard table.

    Args:
        listing: Mapping with an ``"items"`` list of workload objects, each
            carrying ``metadata`` (``uid``, ``name``, ``creationTimestamp``,
            ``namespace``) and optionally ``spec`` and ``status``.

    Returns:
        One dict per workload with the keys ``id``, ``name``, ``namespace``,
        ``localQueueName``, ``priority``, ``status``, ``active``,
        ``created_at`` and ``order``. An empty list is returned when the
        listing is empty/``None`` or has no ``"items"``.
    """
    if not listing:
        return []

    rows: List[Dict[str, Any]] = []

    for job in listing.get("items") or []:
        metadata = job["metadata"]
        spec = job.get("spec") or {}

        # A workload may carry no priority (missing key or explicit null);
        # treat that as 0 instead of failing the whole listing.
        priority = spec.get("priority") or 0

        # A workload counts as admitted once its status has an "admission" entry.
        admitted = "admission" in job.get("status", {})

        rows.append(
            {
                "id": metadata["uid"],
                "name": metadata["name"],
                "namespace": metadata["namespace"],
                "localQueueName": spec.get("queueName", "Unknown"),
                "priority": priority,
                "status": "ADMITTED" if admitted else "PENDING",
                "active": spec.get("active", 0),
                "created_at": metadata["creationTimestamp"],
                # Sort key used by the consumer: admitted workloads get a +10 offset.
                "order": (10 if admitted else 0) + priority,
            }
        )

    return rows
def get_logs(FIRST_RUN: bool) -> List[Dict[str, str]]:
    """Fetch new log lines from Loki for the selected namespaces.

    Builds a LogQL selector from ``SELECTED_NAMESPACES`` (falling back to
    ``"default"``), queries ``LOGS_URL`` and returns the entries formatted by
    ``format_log_entry``, oldest first. The newest timestamp seen is stored
    in the module-level ``LOG_CHECKPOINT_TIME`` so the next call only asks
    for later entries.

    Args:
        FIRST_RUN: When true, look back one hour instead of resuming from
            the stored checkpoint.

    Returns:
        Formatted log entries; an empty list when Loki is unreachable,
        answers with a non-200 status, or has nothing new.
    """
    global LOG_CHECKPOINT_TIME

    logger.debug(f"Selected namespaces: {SELECTED_NAMESPACES}")

    # Namespace names are supplied by the socket client and are interpolated
    # into a quoted LogQL regex below, so only keep names made of ASCII
    # letters, digits and '-' (nothing that could break out of the selector).
    safe_namespaces = [
        ns
        for ns in SELECTED_NAMESPACES
        if isinstance(ns, str)
        and ns
        and all(ch.isascii() and (ch.isalnum() or ch == "-") for ch in ns)
    ]
    namespace_filter = "|".join(safe_namespaces) if safe_namespaces else "default"
    query = f'{{k8s_namespace_name=~"{namespace_filter}"}}'

    logger.debug(f"Loki logs query: {query}")

    if FIRST_RUN or LOG_CHECKPOINT_TIME is None:
        # No checkpoint yet (first poll, or earlier polls returned nothing):
        # look back one hour rather than querying from the epoch.
        now = int(time.time() * 1e9)
        start_time = str(now - int(3600 * 1e9))
    else:
        # Resume just after the newest entry already delivered.
        start_time = str(int(LOG_CHECKPOINT_TIME) + 1)

    params = {"query": query, "start": start_time, "limit": "300"}

    try:
        # Bounded wait: this runs inside the periodic polling loop, so an
        # unresponsive Loki must not stall it indefinitely.
        response = requests.get(LOGS_URL, params=params, timeout=10)
    except requests.RequestException as exc:
        logger.debug(f"Exception: {exc}")
        return []

    # (timestamp_ns, raw [timestamp, message] pair, namespace)
    entries = []

    if response.status_code == 200:
        data = response.json()
        for row in data["data"]["result"]:
            namespace = row["stream"]["k8s_namespace_name"]
            entries.extend((int(value[0]), value, namespace) for value in row["values"])

    # Streams come back grouped per label set, so order everything by the raw
    # nanosecond timestamp before formatting (the formatted string only has
    # one-second resolution).
    entries.sort(key=lambda item: item[0])
    formatted_logs = [
        format_log_entry(value, namespace) for _, value, namespace in entries
    ]

    if entries:
        LOG_CHECKPOINT_TIME = entries[-1][0]

    logger.debug(f"Formatted logs length: {len(formatted_logs)}")

    return formatted_logs
// DELETE request for job deletion.
// Reads { name, namespace } from the request body, forwards them to the
// backend's /deleteJob endpoint and relays the backend's JSON answer.
export async function DELETE(req) {
  try {
    const { name, namespace } = await req.json(); // Parse the request body

    // Forward request to backend API
    const response = await fetch(`${backendUrl}/deleteJob`, {
      method: 'DELETE',
      headers: {
        'Content-Type': 'application/json'
      },
      body: JSON.stringify({ name, namespace })
    })

    const data = await response.json()
    console.log(`Server Component deleteJob: ${JSON.stringify(data)}`)
    // Relay the backend's status code so a failed delete is not reported as 200
    return NextResponse.json(data, { status: response.status })
  } catch (error) {
    console.error("Server delete error:", error);
    // Return a JSON error with a failure status instead of a 200 whose body
    // is a stringified Error object
    return NextResponse.json({ error: String(error) }, { status: 500 })
  }
}
// GET request for namespaces.
// Forwards to the backend's /getNamespaces endpoint and relays its JSON answer.
export async function GET() {
  try {
    // Forward request to backend API
    const response = await fetch(`${backendUrl}/getNamespaces`, {
      method: 'GET',
      headers: {
        'Content-Type': 'application/json'
      },
    })

    const data = await response.json()
    // Relay the backend's status code so backend errors are not reported as 200
    return NextResponse.json(data, { status: response.status })
  } catch (error) {
    console.error("Server get error:", error);
    // Return a JSON error with a failure status instead of a 200 whose body
    // is a stringified Error object
    return NextResponse.json({ error: String(error) }, { status: 500 })
  }
}
// PUT request for updating job priority.
// Reads { name, namespace, priority, priority_class_name } from the request
// body, forwards them to the backend's /updatePriority endpoint and relays
// the backend's JSON answer.
export async function PUT(req) {
  try {
    const { name, namespace, priority, priority_class_name } = await req.json(); // Parse the request body

    // Forward request to backend API
    const response = await fetch(`${backendUrl}/updatePriority`, {
      method: 'PUT',
      headers: {
        'Content-Type': 'application/json'
      },
      body: JSON.stringify({ name, namespace, priority, priority_class_name })
    })

    const data = await response.json()
    // Relay the backend's status code so a failed update is not reported as 200
    return NextResponse.json(data, { status: response.status })
  } catch (error) {
    console.error("Server update error:", error);
    // Return a JSON error with a failure status instead of a 200 whose body
    // is a stringified Error object
    return NextResponse.json({ error: String(error) }, { status: 500 })
  }
}
+ {!isIframeLoaded ? +
+

Loading Grafana Konduktor Dashboard

+ +

If stuck loading, check port forwarding for errors

+

kubectl port-forward svc/kube-prometheus-stack-grafana 3000:80 -n prometheus

+
+ : + null + } + {isError ? +
+

Error Loading Grafana Konduktor Dashboard

+

Check port forwarding for errors

+

kubectl port-forward svc/kube-prometheus-stack-grafana 3000:80 -n prometheus

+
+ : + null + } +