Skip to content

Commit

Permalink
enable add pod log queries (#46)
Browse files Browse the repository at this point in the history
* enable add pod log queries

* lint
  • Loading branch information
asaiacai committed Jul 22, 2024
1 parent b61fda5 commit 6b0055e
Show file tree
Hide file tree
Showing 11 changed files with 101 additions and 102 deletions.
9 changes: 8 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,14 @@ For cluster administrators
- [Grafana, Loki](https://grafana.com/) - Visualizations for metrics/logging solution.



## Community & Support
- [Discord](https://discord.com/invite/HQUBJSVgAP)
- [email protected]

## Contributor Guide

Format your code with
```
poetry install --with dev
bash format.sh
```
3 changes: 2 additions & 1 deletion docs/source/admin/controller.rst
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ Currently we listen for these `NVIDIA errors <https://docs.nvidia.com/deploy/xid
And we listen for errors from:

- :code:`dmesg`
- Pod/container logs

Controller Launch
=================
Expand Down Expand Up @@ -102,6 +103,6 @@ Features and Roadmap
====================
- :code:`dmesg` error detection - **Available** ✅
- In-cluster deployment of controller - **Available** ✅
- Pod log error detection - In progress 🚧
- Pod log error detection - **Available** ✅
- Health Checks (Taint Removal) - In progress 🚧
- Node Resolution Hooks (Reboot, Power Cycle) - In progress 🚧
6 changes: 3 additions & 3 deletions format.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ echo "mypy ver $MYPY_VERSION"

# Run mypy
echo 'Konduktor mypy:'
mypy $(cat tests/mypy_files.txt)
poetry run mypy $(cat tests/mypy_files.txt)

# Run ruff
echo 'Konduktor ruff:'
ruff check --fix konduktor tests
ruff format konduktor tests
poetry run ruff check --fix konduktor tests
poetry run ruff format konduktor tests
74 changes: 42 additions & 32 deletions konduktor/controller/constants.py
Original file line number Diff line number Diff line change
@@ -1,39 +1,49 @@
KONDUKTOR_CONTROLLER_VERSION = "0.1.0"

HARDWARE_XID_ERRORS = (
48,
*range(56, 59),
*range(62, 65),
*range(68, 78),
*range(79, 87),
*range(88, 90),
92,
*range(94, 106),
*range(110, 121),
*range(122, 126),
# Xid error codes grouped as hardware errors, stored as a set literal.
# NOTE(review): no consumer of this constant is visible in this change --
# confirm where it is meant to be used.
HARDWARE_XID_ERRORS = {
    48,
    *range(56, 59),
    *range(62, 65),
    *range(68, 78),
    *range(79, 87),
    *range(88, 90),
    92,
    *range(94, 106),
    *range(110, 121),
    *range(122, 126),
}

# The set of all SXid error ids that are known to be harmless.
# See D.4 of https://docs.nvidia.com/datacenter/tesla/pdf/fabric-manager-user-guide.pdf
WHITELISTED_NVSWITCH_SXID_ERRORS = [
11012,
11021,
11022,
11023,
12021,
12023,
15008,
15011,
19049,
19055,
19057,
19059,
19062,
19065,
19068,
19071,
24001,
24002,
24003,
22013,
# SXid codes that must not mark a node as faulty. Kept as a set because
# is_dmesg_error() only performs membership tests against it.
ALLOWLISTED_NVSWITCH_SXID_ERRORS = {
    11012,
    11021,
    11022,
    11023,
    12021,
    12023,
    15008,
    15011,
    19049,
    19055,
    19057,
    19059,
    19062,
    19065,
    19068,
    19071,
    24001,
    24002,
    24003,
    22013,
}


# Patterns that parse.pod_errors() hands to _query_range(), once per watched
# namespace; a node whose pod logs match any of them is reported as bad.
# NOTE(review): the surrounding backticks look like LogQL raw-string quoting
# and are part of the pattern -- confirm against the query builder.
POD_LOG_ERROR_REGEXES = [
    r"`invalid device ordinal`",  # possibly indicates degraded nvidia-FM in bad state
]
9 changes: 5 additions & 4 deletions konduktor/controller/launch.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
"""

import time
from typing import Set

from konduktor import logging
from konduktor.controller import constants, parse
Expand All @@ -21,7 +22,7 @@
KONDUKTOR_CONTROLLER_LOG_POLL_SECONDS = 5
KONDUKTOR_CONTROLLER_HEALTH_CHECK_FREQ = 5

logger = logging.init_logger("konduktor.controller")
logger = logging.get_logger("konduktor.controller")


def main():
Expand All @@ -31,9 +32,9 @@ def main():
while True:
for _ in range(KONDUKTOR_CONTROLLER_HEALTH_CHECK_FREQ):
time.sleep(KONDUKTOR_CONTROLLER_LOG_POLL_SECONDS)
error_by_pod = parse.pod_errors()
error_by_dmesg = parse.dmesg_errors()
for node in error_by_pod + error_by_dmesg:
error_by_pod: Set[str] = parse.pod_errors()
error_by_dmesg: Set[str] = parse.dmesg_errors()
for node in error_by_pod | error_by_dmesg:
node_control.taint(node)

node_control.health_check()
Expand Down
2 changes: 1 addition & 1 deletion konduktor/controller/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
# node taint/label
NODE_HEALTH_LABEL = "trainy.konduktor.ai/faulty"

logger = konduktor_logging.init_logger(__name__)
logger = konduktor_logging.get_logger(__name__)


def nccl_single_test(node: str, thresh: int = 400):
Expand Down
76 changes: 24 additions & 52 deletions konduktor/controller/parse.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,28 @@
import os
import re
from typing import Any, Dict, List
from typing import Any, Dict, List, Set

import requests

from konduktor import logging as konduktor_logging
from konduktor.controller import constants

# comma separated list of namespaces to watch for pod errors
WATCHED_NAMESPACES = os.environ.get("WATCHED_NAMESPACES", "default").split(",")
LOGS_SINCE = 10 # retrieves logs generated in the past 30 seconds
LOG_ENDPOINT = os.environ.get(
# Namespaces to watch for pod errors, read from the comma separated
# WATCHED_NAMESPACES environment variable (defaults to "default").
# Whitespace around each name and empty items are dropped so that values such
# as "default, othernamespace" or a trailing comma do not yield bogus
# namespace names in the log queries.
WATCHED_NAMESPACES: List[str] = [
    namespace.strip()
    for namespace in os.environ.get("WATCHED_NAMESPACES", "default").split(",")
    if namespace.strip()
]
LOGS_SINCE: int = 10 # retrieves logs generated in the past 10 seconds
LOG_ENDPOINT: str = os.environ.get(
"LOG_ENDPOINT",
# this assumes you have access to this endpoint by
# running as a deployment within the cluster
# for local development use 'http://localhost:3100' and
# kubectl port-forward svc/loki -n loki 3100:3100
"http://loki.loki.svc.cluster.local:3100",
)
QUERY_URL = "/loki/api/v1/query_range"
# has to be either 'skypilot' or 'plain'
POD_LOG_TYPE = os.environ.get("POD_LOG_TYPE", "skypilot")
QUERY_URL: str = "/loki/api/v1/query_range"

logger = konduktor_logging.init_logger(__name__)
logger.info(f"using POD_LOG_TYPE = {POD_LOG_TYPE}")
logger = konduktor_logging.get_logger(__name__)


def _query_range(pattern: str, **label_filters) -> List[Dict[str, Any]]:
Expand Down Expand Up @@ -53,45 +53,16 @@ def _query_range(pattern: str, **label_filters) -> List[Dict[str, Any]]:
return []


class PodLogParser:
"""Base class for handling pod logs. A PodLogParser is responsible
for turning raw queries from Loki into lists of degraded pods.
"""

@classmethod
def validate_logs(self, logs):
"""Reads pod logs and returns a list of nodes that had a pod
error that indicated a GPU, CUDA, NCCL error
Returns:
List[str]: A list of node names with a detected failure from pod logs
"""
raise NotImplementedError


class SkyLogParser(PodLogParser):
@classmethod
def check_logs(self, logs):
pass


class PlainLogParser(PodLogParser):
@classmethod
def check_logs(self, logs) -> List[str]:
# TODO: implement Pod log parsing
return []


_pod_parser = {
"skypilot": SkyLogParser,
"plain": PlainLogParser,
}[POD_LOG_TYPE]


def pod_errors() -> Set[str]:
    """Collect the nodes whose pod logs matched a known error pattern.

    Every pattern in ``constants.POD_LOG_ERROR_REGEXES`` is queried once per
    namespace in ``WATCHED_NAMESPACES`` through ``_query_range``.

    Returns:
        Set[str]: node names (the ``k8s_node_name`` stream label) of every
        returned log stream; empty when no query returned anything.
    """
    logger.info("querying pod logs")
    bad_nodes: Set[str] = set()
    for regex in constants.POD_LOG_ERROR_REGEXES:
        for namespace in WATCHED_NAMESPACES:
            for line in _query_range(regex, k8s_namespace_name=namespace):
                log_node = line["stream"]["k8s_node_name"]
                # Mirror dmesg_errors(): say why a node is being reported so
                # the decision can be traced back to a pattern and namespace.
                logger.info(
                    f"node `{log_node}` has pod log error matching {regex} "
                    f"in namespace `{namespace}`"
                )
                bad_nodes.add(log_node)
    return bad_nodes


def sxid_error(pattern: str, log_content: str) -> int:
Expand All @@ -109,23 +80,24 @@ def sxid_error(pattern: str, log_content: str) -> int:
return 0


def is_dmesg_error(log_content: str) -> bool:
    """Tell whether a dmesg line carries a non-allowlisted (S)Xid error.

    Args:
        log_content: a single dmesg log line.

    Returns:
        bool: True when an SXid or Xid code is found in the line and that code
        is not in ``constants.ALLOWLISTED_NVSWITCH_SXID_ERRORS``; False when
        no code is found or the code is allowlisted.
    """
    error_code = sxid_error(r"SXid.*?: (\d+),", log_content) or sxid_error(
        r"NVRM: Xid.*?: (\d+),", log_content
    )
    if not error_code:
        # sxid_error() falls back to 0 when no code is extracted. Zero is not
        # in the allowlist, so without this guard every line without an
        # (S)Xid code would be treated as a hardware error.
        return False
    return error_code not in constants.ALLOWLISTED_NVSWITCH_SXID_ERRORS


def dmesg_errors() -> Set[str]:
    """Collect the nodes whose dmesg logs contain a reportable error.

    Queries the logs of the ``dmesg`` daemonset through ``_query_range`` and
    runs ``is_dmesg_error`` on the returned log lines.

    Returns:
        Set[str]: node names (the ``k8s_node_name`` stream label) with at
        least one log line accepted by ``is_dmesg_error``.
    """
    logger.info("checking dmesg logs")
    pattern = r"`(?i)NVRM: xid` or `(?i)SXid` or `(?i)error`"
    log_lines = _query_range(pattern, k8s_daemonset_name="dmesg")
    bad_nodes: Set[str] = set()
    for line in log_lines:
        log_node = line["stream"]["k8s_node_name"]
        # Inspect every entry of the stream, not only the first one, so an
        # error that is not the first entry is not missed; a stream with no
        # entries is skipped instead of raising IndexError.
        # NOTE(review): each entry is read as entry[1] for the log text, as
        # before -- presumably Loki's [timestamp, line] pairs; confirm.
        for entry in line["values"]:
            log_content = entry[1]
            if is_dmesg_error(log_content):
                logger.info(f"node `{log_node}` has dmesg error: {log_content}")
                bad_nodes.add(log_node)
    return bad_nodes


Expand Down
2 changes: 1 addition & 1 deletion konduktor/kube_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from konduktor import logging as konduktor_logging

logger = konduktor_logging.init_logger(__name__)
logger = konduktor_logging.get_logger(__name__)

# Timeout to use for API calls
API_TIMEOUT = 5
Expand Down
14 changes: 8 additions & 6 deletions konduktor/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@ def format(self, record):
FORMATTER = NewLineFormatter(_FORMAT, datefmt=_DATE_FORMAT)


def get_logger(name: str) -> logging.Logger:
    """Return the logger called ``name``, configuring it on first use.

    On first use the logger is set to DEBUG, given a stream handler at INFO
    level that uses ``FORMATTER``, and detached from its ancestors so records
    are not emitted again by a parent handler. Later calls with the same name
    return the already configured logger without adding another handler.

    Args:
        name: logger name, typically ``__name__`` of the calling module.

    Returns:
        logging.Logger: the configured logger.
    """
    logger = logging.getLogger(name)
    # Look at this logger's own handlers only. Logger.hasHandlers() also
    # reports handlers attached to ancestors (e.g. a configured root logger or
    # a parent package logger), which would leave this logger without its
    # handler/formatter depending on import order.
    if not logger.handlers:
        logger.setLevel(logging.DEBUG)
        ch = logging.StreamHandler()
        ch.setLevel(logging.INFO)
        ch.setFormatter(FORMATTER)
        logger.addHandler(ch)
        logger.propagate = False
    return logger
6 changes: 6 additions & 0 deletions konduktor/manifests/controller_deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,9 @@ spec:
image: python:3.10
command: ["/bin/sh"]
args: ["-c", "pip install konduktor-nightly && python -m konduktor.controller.launch"]
## Optional overrides: the namespaces to watch for pod errors (comma separated) and the Loki endpoint to query.
# env:
# - name: WATCHED_NAMESPACES
# value: "default,othernamespace"
# - name: LOG_ENDPOINT
# value: "http://loki.loki.svc.cluster.local:3100"
2 changes: 1 addition & 1 deletion manifests/loki.values
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ loki:
commonConfig:
replication_factor: 1
storage:
type: 'filesystem'
type: filesystem
schemaConfig:
configs:
- from: "2024-01-01"
Expand Down

0 comments on commit 6b0055e

Please sign in to comment.