Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Commit

Permalink
feat: initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
kafkaphoenix committed Dec 5, 2024
0 parents commit 1047a62
Show file tree
Hide file tree
Showing 88 changed files with 12,919 additions and 0 deletions.
6 changes: 6 additions & 0 deletions .ci/tools/bump_version.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Set version
export VERSION=${GITHUB_REF_NAME#"v"}
ls
sed -i -E "s/^(version =.*)/version = \"$VERSION\"/g" pyproject.toml
echo -e "Getting tag to publish:\n $(cat pyproject.toml | grep "version =")"

26 changes: 26 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
.idea
go.work
go.work.sum

.vscode/
!.vscode/settings.json
!.vscode/tasks.json
!.vscode/launch.json
!.vscode/extensions.json
*.code-workspace

.coverage
coverage-unit.out
coverage-integration.out

build
.venv
dist

# python
__pycache__/
.pytest_cache
.mypy_cache

# OS
.DS_Store
31 changes: 31 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
.DEFAULT_GOAL := help

# AutoDoc
# -------------------------------------------------------------------------
.PHONY: help
help: ## This help
@awk 'BEGIN {FS = ":.*?## "} /^[a-zA-Z_-]+:.*?## / {printf "\033[36m%-30s\033[0m %s\n", $$1, $$2}' $(MAKEFILE_LIST)
.DEFAULT_GOAL := help

.PHONY: protos
protos: ## Generate proto files
protoc -I proto --python_out=sdk/sdk --mypy_out=sdk/sdk proto/kai_nats_msg.proto

.PHONY: tests
tests: ## Run unit tests
poetry run pytest sdk runner --cov --cov-report=term-missing

.PHONY: tidy
tidy: ## Run black, isort and codespell
poetry run black sdk runner \
&& poetry run isort sdk runner \
&& poetry run codespell sdk runner -I dictionary.txt \
--skip="*.git,*.json,kai_nats_msg_pb2.py,.venv,*.lock,__init__.py" \

.PHONY: mypy
mypy: ## Run mypy
poetry run mypy --pretty --warn-redundant-casts --warn-unused-ignores --warn-unreachable --disallow-untyped-decorators --disallow-incomplete-defs --disallow-untyped-calls --check-untyped-defs --disallow-incomplete-defs --python-version 3.11 sdk runner --config-file pyproject.toml

.PHONY: update-poetries
update-poetries: ## Update all dependencies
./scripts/update_poetries.sh
38 changes: 38 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# KAI Python SDK

KAI SDK's implementation in Python.


## How it works

### SDK

The SDK can be used in a KAI workflows for working with Python code

Once the Python SDK is deployed, it connects to NATS and it subscribes permanently to an input subject. Each node knows to which subject it has to subscribe and also to which subject it has to send messages, since the K8s manager tells it with environment variables. It is important to note that the nodes use a queue subscription, which allows load balancing of messages when there are multiple replicas when using Task and Exit runners but not with Trigger runners

When a new message is published in the input subject of a node, it passes it down to a handler function, along with a context object formed by variables and useful methods for processing data. This handler is the solution implemented by the client and given in the krt file generated. Once executed, the result will be taken and transformed into a NATS message that will then be published to the next node's subject (indicated by an environment variable). After that, the node ACKs the message manually

## Development

- Install the dependencies with `poetry install --group dev`

If you don't have poetry installed (you must have python 3.11 installed in your system):

`python3 -m pip install --user poetry`

## Tests

Execute the test running in the root folder:

``` sh
make pytest
```

## Linter

Execute the linter running in the root folder:

``` sh
make pytidy
```
7 changes: 7 additions & 0 deletions config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
runner:
logger:
level: INFO # TRACE, DEBUG, INFO, SUCCESS, WARNING, ERROR, CRITICAL
output_paths:
- stdout
error_output_paths:
- stderr
Empty file added dictionary.txt
Empty file.
3 changes: 3 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"name": "py-sdk"
}
1,840 changes: 1,840 additions & 0 deletions poetry.lock

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions poetry.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
[virtualenvs]
in-project = true
create = true
path = ".venv"
18 changes: 18 additions & 0 deletions proto/kai_nats_msg.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
syntax = "proto3";
import "google/protobuf/any.proto";

option go_package = "./kai";

enum MessageType {
UNDEFINED = 0;
OK = 1;
ERROR = 2;
}

message KaiNatsMessage {
string request_id = 1;
google.protobuf.Any payload = 2;
string error = 3;
string from_node = 4;
MessageType message_type = 5;
}
145 changes: 145 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
[tool.poetry]
name = "kai-runner"
version = "1.0.0"
description = "KAI SDK's implementation in Python"
authors = ["Intelygenz - KAI Team"]
maintainers = ["Intelygenz - KAI Team"]
keywords = ["python", "runner", "kai"]
packages = [
{include = "runner"},
{include = "sdk", from = "sdk"},
]

[tool.poetry.dependencies]
python = "~3.13"
nats-py = "2.6.0"
protobuf = ">=4.23.0, <=4.23.4"
loguru = "0.7.0"
vyper-config = "1.1.1"
minio = "7.2.0"
python-keycloak = "3.7.0"
redis = {extras = ["hiredis"], version = "5.0.1"}
semver = "3.0.2"
opentelemetry-sdk = "1.23.0"
opentelemetry-exporter-otlp-proto-grpc = "1.23.0"
grpcio = "1.59.3"

[tool.poetry.group.dev.dependencies]
black = "23.7.0"
pytest = "7.4.0"
pytest-asyncio = "0.21.1"
pytest-cov = "4.1.0"
isort = "5.12.0"
coverage = "7.2.7"
codespell = "2.2.5"
ipdb = "0.13.13"
pytest-mock = "3.11.1"
mock = "5.1.0"
types-mock = "5.1.0.1"
mypy = "1.5.1"
types-protobuf = "4.24.0.1"
loguru-mypy = "0.0.4"

[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"

[tool.black]
target-version = ["py311"]
line-length = 120
exclude = '/(.*_pb2.*\.py)|(.venv)|(dist)|(__init__.py)/'

[tool.isort]
py_version=311
multi_line_output = 3
include_trailing_comma = true
force_grid_wrap = 0
use_parentheses = true
ensure_newline_before_comments = true
line_length = 120
extend_skip = [
"kai_nats_msg_pb2.py",
".venv",
"dist",
"__init__.py",
]

[tool.pytest.ini_options]
addopts = [
"-vvv",
"-s",
"-W ignore::DeprecationWarning",
]
testpaths = [
"sdk/sdk/**/*_test.py",
"runner/**/*_test.py",
]
python_files = "*_test.py"
python_classes = "Test*"
python_functions = "test_*"
asyncio_mode = "auto"

[tool.coverage.run]
branch = true
omit = [
"*_test.py",
"kai_nats_msg_pb2.py",
".venv",
"dist",
"/**/*__init__.py",
]
relative_files = true

[tool.coverage.report]
# Regexes for lines to exclude from consideration
exclude_lines = [
# Skip any pass lines such as may be used for @abstractmethod
"pass",

# Have to re-enable the standard pragma
"pragma: no cover",

# Do not complain about missing debug-only code:
"def __repr__",
"if self.debug",

# Do not complain if non-runnable code is not run:
"if 0:",
"if __name__ == .__main__.:",

# Do not complain about logs not being tested
"self.logger.*",
"logger.*",

# Do not complain about random uuids
"uuid.uuid4()",

# Do not complain about type checking
"if TYPE_CHECKING:",

# Do not complain about signals
"self.loop.add_signal_handler",
]

ignore_errors = false
fail_under = 80
precision = 2
show_missing = true

[tool.codespell]
skip = [
".venv",
"kai_nats_msg_pb2.py",
"dist",
"__init__.py",
]

[tool.mypy]
python_version = 3.11
exclude = [
".venv",
"kai_nats_msg_pb2.py",
"dist",
"__init__.py",
]
ignore_missing_imports = true
Empty file added runner/__init__.py
Empty file.
32 changes: 32 additions & 0 deletions runner/common/common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from __future__ import annotations

from typing import Awaitable, Callable

from google.protobuf.any_pb2 import Any
from loguru import logger
from vyper import v

from sdk.centralized_config.centralized_config import CentralizedConfig
from sdk.kai_sdk import KaiSDK

Initializer = Finalizer = Task = Callable[[KaiSDK], Awaitable[None] | None]
Handler = Callable[[KaiSDK, Any], Awaitable[None] | None]


async def initialize_process_configuration(sdk: KaiSDK) -> None:
values = v.get("centralized_configuration.process.config")

assert sdk.logger is not None
origin = logger._core.extra["origin"]
logger_ = sdk.logger.bind(context=f"{origin}.[CONFIG INITIALIZER]")
logger_.info("initializing process configuration")

if isinstance(values, dict):
for key, value in values.items():
try:
assert isinstance(sdk.centralized_config, CentralizedConfig)
await sdk.centralized_config.set_config(key, value)
except Exception as e:
logger.error(f"error initializing process configuration with key {key}: {e}")

logger_.info("process configuration initialized")
74 changes: 74 additions & 0 deletions runner/common/common_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
from unittest.mock import AsyncMock, Mock, call, patch

import pytest
from nats.aio.client import Client as NatsClient
from nats.js.client import JetStreamContext
from nats.js.kv import KeyValue
from vyper import v

from runner.common.common import initialize_process_configuration
from sdk.centralized_config.centralized_config import CentralizedConfig
from sdk.kai_nats_msg_pb2 import KaiNatsMessage
from sdk.kai_sdk import KaiSDK
from sdk.model_registry.model_registry import ModelRegistry
from sdk.persistent_storage.persistent_storage import PersistentStorage
from sdk.predictions.store import Predictions


@pytest.fixture(scope="function")
def m_centralized_config() -> CentralizedConfig:
js = Mock(spec=JetStreamContext)
global_kv = AsyncMock(spec=KeyValue)
product_kv = AsyncMock(spec=KeyValue)
workflow_kv = AsyncMock(spec=KeyValue)
process_kv = AsyncMock(spec=KeyValue)

centralized_config = CentralizedConfig(js=js)
centralized_config.global_kv = global_kv
centralized_config.product_kv = product_kv
centralized_config.workflow_kv = workflow_kv
centralized_config.process_kv = process_kv

return centralized_config


@pytest.fixture(scope="function")
@patch.object(Predictions, "__new__", return_value=Mock(spec=Predictions))
@patch.object(PersistentStorage, "__new__", return_value=Mock(spec=PersistentStorage))
@patch.object(ModelRegistry, "__new__", return_value=Mock(spec=ModelRegistry))
async def m_sdk(
_: ModelRegistry,
__: PersistentStorage,
___: ModelRegistry,
m_centralized_config: CentralizedConfig,
) -> KaiSDK:
nc = AsyncMock(spec=NatsClient)
js = Mock(spec=JetStreamContext)
request_msg = KaiNatsMessage()

sdk = KaiSDK(nc=nc, js=js)
sdk.set_request_msg(request_msg)
sdk.centralized_config = m_centralized_config

return sdk


async def test_initialize_process_configuration_ok(m_sdk):
v.set("centralized_configuration.process.config", {"test_key": "test_value"})

await initialize_process_configuration(m_sdk)

assert m_sdk.centralized_config is not None
assert m_sdk.centralized_config.process_kv.put.call_count == 1
assert m_sdk.centralized_config.process_kv.put.call_args == call("test_key", b"test_value")


async def test_initialize_process_configuration_ko(m_sdk):
v.set("centralized_configuration.process.config", {"test_key": "test_value"})
m_sdk.centralized_config.process_kv.put.side_effect = Exception("test exception")

await initialize_process_configuration(m_sdk)

assert m_sdk.centralized_config is not None
assert m_sdk.centralized_config.process_kv.put.call_count == 1
assert m_sdk.centralized_config.process_kv.put.call_args == call("test_key", b"test_value")
Loading

0 comments on commit 1047a62

Please sign in to comment.