Skip to content

Commit

Permalink
dashboard logger
Browse files Browse the repository at this point in the history
  • Loading branch information
ryanhayame committed Nov 7, 2024
1 parent f124883 commit b977951
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 41 deletions.
39 changes: 27 additions & 12 deletions konduktor/dashboard/backend/main.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import logging
import os
from typing import Any, Dict, List

from flask import Flask, jsonify, request
Expand All @@ -9,6 +11,16 @@

from .sockets import socketio

# Configure root logging for the dashboard backend.
# DEBUG is used when KONDUKTOR_DEBUG is unset or set to "1"; otherwise INFO.
_debug_requested = os.environ.get("KONDUKTOR_DEBUG") in [None, "1"]
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(message)s",
    datefmt="%d-%b-%y %H:%M:%S",
    level=logging.DEBUG if _debug_requested else logging.INFO,
)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

app = Flask(__name__)

# Ensure CORS is configured correctly
Expand Down Expand Up @@ -86,10 +98,10 @@ def list_all_workloads(namespace="default"):
plural="workloads",
)
for workload in workloads.get("items", []):
print(f"Workload Name: {workload['metadata']['name']}")
logger.debug(f"Workload Name: {workload['metadata']['name']}")
except ApiException as e:
print(f"Failed to list workloads: {e}")
logger.debug(f"Failed to list workloads: {e}")
# for testing: prints jobs in native kubernetes kueue
Expand All @@ -98,14 +110,17 @@ def list_all_jobs():
jobs = batch_client.list_job_for_all_namespaces(watch=False) # Get all jobs
if not jobs.items:
print("No jobs found.")
logger.debug("No jobs found.")
else:
print("Jobs found:")
logger.debug("Jobs found:")
for job in jobs.items:
print(f"Name: {job.metadata.name}, Namespace: {job.metadata.namespace}")
logger.debug(
f"Name: {job.metadata.name},
Namespace: {job.metadata.namespace}"
)
except ApiException as e:
print(f"Failed to list jobs: {e}")
logger.debug(f"Failed to list jobs: {e}")
"""


Expand Down Expand Up @@ -149,26 +164,26 @@ def delete_job():
name=name,
body=delete_options,
)
print(f"Kueue Workload '{name}' deleted successfully.")
logger.debug(f"Kueue Workload '{name}' deleted successfully.")

"""
list_all_workloads()
list_all_jobs()
print(f"Native Kubernetes Job Name: {native_job_name}")
logger.debug(f"Native Kubernetes Job Name: {native_job_name}")
batch_client.delete_namespaced_job(
name=native_job_name,
namespace=namespace,
body=delete_options
)
print(f"Native Kubernetes Job {native_job_name} deleted successfully.")
logger.debug(f"Native Kubernetes Job {native_job_name} deleted successfully.")
"""

return jsonify({"success": True, "status": 200})

except ApiException as e:
print(f"Exception: {e}")
logger.debug(f"Exception: {e}")
return jsonify({"error": str(e)}), e.status


Expand All @@ -187,7 +202,7 @@ def get_namespaces():
namespace_list = [ns.metadata.name for ns in namespaces.items]
return jsonify(namespace_list)
except ApiException as e:
print(f"Exception: {e}")
logger.debug(f"Exception: {e}")
return jsonify({"error": str(e)}), e.status


Expand Down Expand Up @@ -220,7 +235,7 @@ def update_priority():
return jsonify({"success": True, "status": 200})

except ApiException as e:
print(f"Exception: {e}")
logger.debug(f"Exception: {e}")
return jsonify({"error": str(e)}), e.status


Expand Down
31 changes: 15 additions & 16 deletions konduktor/dashboard/backend/sockets.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import datetime
import logging
import os
import time
from typing import Dict, List

Expand All @@ -8,15 +10,18 @@
# SocketIO configuration
socketio = SocketIO(cors_allowed_origins="*", ping_interval=25, ping_timeout=60)

logger = logging.getLogger(__name__)

# Global variables
CLIENT_CONNECTED = False
FIRST_RUN = True
BACKGROUND_TASK_RUNNING = False
LOG_CHECKPOINT_TIME = None
SELECTED_NAMESPACES: list[str] = []
PROD_LOGS_URL = "http://loki.loki.svc.cluster.local:3100/loki/api/v1/query_range"
DEV_LOGS_URL = "http://localhost:3100/loki/api/v1/query_range"

# "http://loki.loki.svc.cluster.local:3100/loki/api/v1/query_range" for prod
# "http://localhost:3100/loki/api/v1/query_range" for local
LOGS_URL = os.environ.get("LOGS_URL", "http://localhost:3100/loki/api/v1/query_range")


def format_log_entry(entry: List[str], namespace: str) -> Dict[str, str]:
Expand Down Expand Up @@ -44,18 +49,18 @@ def format_log_entry(entry: List[str], namespace: str) -> Dict[str, str]:
return formatted_log


def get_logs(FIRST_RUN: bool, dev: bool) -> List[Dict[str, str]]:
def get_logs(FIRST_RUN: bool) -> List[Dict[str, str]]:
global LOG_CHECKPOINT_TIME

# print(f'SELECTED NAMESPACES (GET_LOGS): {SELECTED_NAMESPACES}')
logger.debug(f"SELECTED NAMESPACES (GET_LOGS): {SELECTED_NAMESPACES}")

# Use the selected namespaces in the query
namespace_filter = (
"|".join(SELECTED_NAMESPACES) if SELECTED_NAMESPACES else "default"
)
query = f'{{k8s_namespace_name=~"{namespace_filter}"}}'

# print(f'QUERY (GET_LOGS): {query}')
logger.debug(f"QUERY (GET_LOGS): {query}")

if FIRST_RUN:
# Calculate how many nanoseconds to look back when first time looking at logs
Expand All @@ -71,7 +76,7 @@ def get_logs(FIRST_RUN: bool, dev: bool) -> List[Dict[str, str]]:

params = {"query": query, "start": start_time, "limit": "300"}

url = DEV_LOGS_URL if dev else PROD_LOGS_URL
url = LOGS_URL
response = requests.get(url, params=params)
formatted_logs = []

Expand All @@ -86,7 +91,6 @@ def get_logs(FIRST_RUN: bool, dev: bool) -> List[Dict[str, str]]:
for value in row["values"]:
last = max(int(value[0]), last)
formatted_logs.append(format_log_entry(value, namespace))
# print('status 200 getting logs')

if formatted_logs:
# sort because sometimes loki API is wrong and logs are out of order
Expand All @@ -97,20 +101,15 @@ def get_logs(FIRST_RUN: bool, dev: bool) -> List[Dict[str, str]]:
)
LOG_CHECKPOINT_TIME = last

# print(f'formatted logs length: {len(formatted_logs)}')
logger.debug(f"formatted logs length: {len(formatted_logs)}")

return formatted_logs


def send_logs():
global CLIENT_CONNECTED, FIRST_RUN, BACKGROUND_TASK_RUNNING
while CLIENT_CONNECTED:
logs = []
try:
# Attempt to get logs from the production setup; if fails, switch to dev
logs = get_logs(FIRST_RUN, False)
except Exception:
logs = get_logs(FIRST_RUN, True)
logs = get_logs(FIRST_RUN)

FIRST_RUN = False # After the first successful fetch, set to False
if logs:
Expand All @@ -127,7 +126,7 @@ def handle_connect():
global CLIENT_CONNECTED, FIRST_RUN, BACKGROUND_TASK_RUNNING
CLIENT_CONNECTED = True
FIRST_RUN = True
print("Client connected")
logger.debug("Client connected")

# Start the background task only if it's not already running
if not BACKGROUND_TASK_RUNNING:
Expand All @@ -146,4 +145,4 @@ def handle_disconnect():
global CLIENT_CONNECTED, FIRST_RUN
CLIENT_CONNECTED = False
FIRST_RUN = True
print("Client disconnected")
logger.debug("Client disconnected")
10 changes: 0 additions & 10 deletions konduktor/dashboard/frontend/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,8 @@ app.prepare().then(() => {
let flaskSocket;

io.on('connection', (clientSocket) => {
console.log('Client connected to Next.js server');
activeClients += 1;

console.log(`activeClients on connect: ${activeClients}`)

// Establish a connection to Flask only if this is the first client
if (activeClients === 1) {
const backendUrl = process.env.NODE_ENV === 'development'
Expand All @@ -42,28 +39,21 @@ app.prepare().then(() => {

flaskSocket.on('log_data', (data) => {
io.emit('log_data', data); // Broadcast to all connected clients
console.log(`activeClients during log_data: ${activeClients}`)
});

// Receive updated namespaces from the client (forward to Flask)
clientSocket.on('update_namespaces', (namespaces) => {
flaskSocket.emit('update_namespaces', namespaces); // Send to Flask
console.log(`activeClients during update_namespaces: ${activeClients}`)
});

console.log('Connected to Flask backend');
}

clientSocket.on('disconnect', () => {
activeClients -= 1;
console.log('Client disconnected from Next.js server');
console.log(`activeClients after disconnect: ${activeClients}`)

// Disconnect from Flask when no clients are connected
if (activeClients === 0 && flaskSocket) {
flaskSocket.disconnect();
flaskSocket = null;
console.log('Disconnected from Flask backend');
}
});
});
Expand Down
12 changes: 10 additions & 2 deletions konduktor/manifests/dashboard_deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,19 @@ spec:
serviceAccountName: konduktor-service-account
containers:
- name: backend
image: ryanattrainy/konduktor-dashboard:backend1.121
image: ryanattrainy/konduktor-dashboard:backend1.13
imagePullPolicy: Always
ports:
- containerPort: 5001
command: ["/app/startup.sh"]
env:
- name: KONDUKTOR_DEBUG
value: "0" # Log level: "1" enables DEBUG; any other value (e.g. "0") selects INFO
- name: LOGS_URL # Set loki logs URL
# "http://loki.loki.svc.cluster.local:3100/loki/api/v1/query_range" for prod
# "http://localhost:3100/loki/api/v1/query_range" for local
value: "http://loki.loki.svc.cluster.local:3100/loki/api/v1/query_range"

---
apiVersion: v1
kind: Service
Expand Down Expand Up @@ -105,7 +113,7 @@ spec:
spec:
containers:
- name: frontend
image: ryanattrainy/konduktor-dashboard:frontend1.12
image: ryanattrainy/konduktor-dashboard:frontend1.13
imagePullPolicy: Always
ports:
- containerPort: 5173
Expand Down
1 change: 0 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ types-requests = "^2.32.0.20240622"
Flask = "^3.0.3"
flask-cors = "^5.0.0"
flask-socketio = "^5.4.1"
gunicorn = "^23.0.0"

[tool.ruff]
line-length = 88
Expand Down

0 comments on commit b977951

Please sign in to comment.