Skip to content

Commit

Permalink
Refactor Harvester python package
Browse files Browse the repository at this point in the history
  • Loading branch information
mario-winkler committed Oct 18, 2024
1 parent 0f3fc44 commit e3ba48f
Show file tree
Hide file tree
Showing 15 changed files with 315 additions and 233 deletions.
14 changes: 4 additions & 10 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
# Basis-Image verwenden
FROM python:3.11-slim

# Any python libraries that require system libraries to be installed will likely
Expand All @@ -9,13 +8,8 @@ RUN apt-get update && \
apt-get clean && \
rm -rf /var/lib/apt/lists/*

WORKDIR /code
WORKDIR /app
COPY . /app
RUN python -m pip install --no-cache-dir --upgrade /app

COPY ./worker /code/app
COPY ./README.md /code/app/README.md
COPY ./LICENSE /code/app/LICENSE
COPY ./etc /code/etc

RUN python -m pip install --no-cache-dir --upgrade /code/app

CMD ["fastapi", "run", "app/main.py", "--port", "8080"]
CMD ["fastapi", "run", "src/worker/sentinel/main.py", "--port", "8080"]
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
<br />
<p align="center">
<a href="https://github.com/EOEPCA/registration-harvester">
<img src="images/logo.png" alt="Logo" width="80" height="80">
<img src="https://raw.githubusercontent.com/EOEPCA/resource-registration/main/docs/img/eoepca-logo.png" alt="Logo" width="80" height="80">
</a>

<h3 align="center">EOEPCA+ Registration Harvester</h3>
Expand Down
Binary file removed images/logo.png
Binary file not shown.
18 changes: 13 additions & 5 deletions worker/pyproject.toml → pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[project]
name = "registration-harvester-worker"
version = "0.1"
authors = []
name = "worker"
version = "2.0.0-beta1"
authors = [
{ name="Mario Winkler", email="[email protected]" },
{ name="Jonas Eberle", email="[email protected]" }
]
description = "A Flowable External Worker implementation with FastAPI acting as worker for Registration Harvester component"
readme = "README.md"
license = {file = "LICENSE"}
requires-python = ">=3.8"
classifiers = [
"Programming Language :: Python :: 3",
Expand All @@ -14,8 +22,8 @@ dependencies = [
"requests",
"fastapi[standard]",
"flowable.external_worker_client",
"registration-library @ git+https://github.com/EOEPCA/registration-library"
# "flowable.external-worker-client @ git+https://github.com/EOEPCA/eoepca-flowable-external-client-python"
# "registration-library @ git+https://github.com/EOEPCA/registration-library",
# "flowable.external-worker-client @ git+https://github.com/EOEPCA/eoepca-flowable-external-client-python@develop"
]

[project.optional-dependencies]
Expand Down
File renamed without changes.
File renamed without changes.
64 changes: 64 additions & 0 deletions src/worker/common/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import os
from typing import get_type_hints, Union
from dotenv import load_dotenv

# Load configuration
# The value of a variable is the first of the values found in:
# - the environment
# - the .env file
# - the default value, if provided
load_dotenv(".env")


class WorkerConfigError(Exception):
    """Raised when a worker configuration field is missing or cannot be cast."""


def _parse_bool(val: Union[str, bool]) -> bool: # pylint: disable=E1136
return val if type(val) is bool else val.lower() in ["true", "yes", "1"]


class WorkerConfig:
    """Worker settings read from a mapping of environment variables.

    Environment variables are mapped to class fields according to these rules:
    - a field is only parsed if it has a type annotation
    - a field is skipped unless its name is all upper case
    - the class field and the environment variable share the same name

    The class-level value of a field is used as its default.
    """

    # Fields and default values
    FLOWABLE_HOST: str = "https://registration-harvester-api.develop.eoepca.org/flowable-rest"
    FLOWABLE_REST_USER: str = "eoepca"
    FLOWABLE_REST_PASSWORD: str = "eoepca"
    FLOWABLE_HOST_CACERT: str = "etc/eoepca-ca-chain.pem"
    FLOWABLE_USE_TLS: bool = True

    def __init__(self, env):
        """Populate every annotated upper-case field from ``env``.

        Args:
            env: Mapping of variable names to raw values (e.g. ``os.environ``).

        Raises:
            WorkerConfigError: If a field has neither a default nor a value in
                ``env``, or if a value cannot be cast to the annotated type.
        """
        # Resolve the annotations once instead of on every loop iteration.
        type_hints = get_type_hints(type(self))
        for field, var_type in type_hints.items():
            if not field.isupper():
                continue

            # Raise WorkerConfigError if a required field is not supplied
            default_value = getattr(self, field, None)
            if default_value is None and env.get(field) is None:
                raise WorkerConfigError("The {} field is required".format(field))

            # Cast the value to the expected type; raise WorkerConfigError on failure
            raw_value = env.get(field, default_value)
            try:
                if var_type == bool:
                    value = _parse_bool(raw_value)
                else:
                    value = var_type(raw_value)
            except ValueError as err:
                # Report the value that was actually cast: indexing env here
                # would raise KeyError when the default was the failing value.
                raise WorkerConfigError(
                    'Unable to cast value of "{}" to type "{}" for "{}" field'.format(raw_value, var_type, field)
                ) from err
            setattr(self, field, value)

    def __repr__(self):
        """Return the string form of the instance attribute dictionary."""
        return str(self.__dict__)


# Expose Config object for app to import
Config = WorkerConfig(os.environ)
19 changes: 9 additions & 10 deletions worker/worker/log_utils.py → src/worker/common/log_utils.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,25 @@
import logging


def configure_logging():
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s.%(msecs)03d [%(levelname)s] [%(thread)d] %(message)s",
handlers=[logging.StreamHandler()],
datefmt="%Y-%m-%dT%H:%M:%S"
)
datefmt="%Y-%m-%dT%H:%M:%S",
)


def log_with_job(message, job=None, log_level='info', **kwargs):
def log_with_job(message, job=None, log_level="info", **kwargs):
log_function = __get_log_function(log_level)

if job is not None:
log_function(f"[BPMN_TASK: {job.element_name}] {message}", **kwargs)
else:
log_function(message, **kwargs)

def log_with_context(message, context=None, log_level='info', **kwargs):

def log_with_context(message, context=None, log_level="info", **kwargs):
context = context if context is not None else {}
log_function = __get_log_function(log_level)

Expand All @@ -37,9 +40,5 @@ def __get_log_context_prefix(context):


def __get_log_function(log_level):
switcher = {
'info': logging.info,
'warning': logging.warning,
'error': logging.error
}
return switcher.get(log_level, logging.info)
switcher = {"info": logging.info, "warning": logging.warning, "error": logging.error}
return switcher.get(log_level, logging.info)
38 changes: 23 additions & 15 deletions worker/worker/manager.py → src/worker/common/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,33 @@
from datetime import datetime
from flowable.external_worker_client import ExternalWorkerClient
from requests.auth import HTTPBasicAuth
from worker.config import Config
from worker.log_utils import configure_logging
from worker.common.config import Config
from worker.common.log_utils import configure_logging

logger = logging.getLogger()
configure_logging()


def customize_session(session):
    """Point ``session`` at the configured CA certificate when TLS is enabled.

    Returns the session after setting ``session.verify`` to
    ``Config.FLOWABLE_HOST_CACERT`` when ``Config.FLOWABLE_USE_TLS`` is truthy
    and a session was given; returns None in every other case.
    """
    if not Config.FLOWABLE_USE_TLS or session is None:
        return None
    session.verify = Config.FLOWABLE_HOST_CACERT
    return session



class SubscriptionManager:
"""
"""
""" """

def __init__(self):
self.client = ExternalWorkerClient(
flowable_host=Config.FLOWABLE_HOST,
auth=HTTPBasicAuth(Config.FLOWABLE_REST_USER, Config.FLOWABLE_REST_PASSWORD,),
customize_session=customize_session)
flowable_host=Config.FLOWABLE_HOST,
auth=HTTPBasicAuth(
Config.FLOWABLE_REST_USER,
Config.FLOWABLE_REST_PASSWORD,
),
customize_session=customize_session,
)
self.subscriptions = {}

def subscriptions_info(self):
Expand All @@ -36,7 +41,7 @@ def subscriptions_info(self):
"thread": worker_data["thread"],
"jobs_completed": worker_data["jobs_completed"],
"start_time": worker_data["start_time"],
"settings": worker_data["settings"]
"settings": worker_data["settings"],
}
worker_details.append(w)
subscriptions[topic] = worker_details
Expand All @@ -55,18 +60,22 @@ def subscribe(self, topic: str, settings: dict):
"thread": subscription.thread.ident,
"jobs_completed": 0,
"start_time": now.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
"settings": settings
"settings": settings,
}
logger.info(f"Successfully subscribed worker {worker_id} to topic {topic} in thread {subscription.thread.ident}")
logger.info(
f"Successfully subscribed worker {worker_id} to topic {topic} in thread {subscription.thread.ident}"
)

def unsubscribe(self, topic: str, worker_id: str):
if self.worker_exists(topic, worker_id):
subscription = self.subscriptions[topic][worker_id]["subscription"]
if subscription is not None:
subscription.unsubscribe()
logger.info(f"Successfully unsubscribed worker {worker_id} from topic {topic}")
logger.info(f"Successfully unsubscribed {worker_id} from {topic}")
else:
logger.error(f"Unable to unsubscribe worker {worker_id} from topic {topic}. Invalid subscription {str(subscription)}")
logger.error(
f"Unable to unsubscribe {worker_id} from {topic}. Invalid subscription {str(subscription)}"
)
else:
logger.error(f"Worker {worker_id} not subscribed to topic {topic}")

Expand All @@ -76,5 +85,4 @@ def unsubscribe_all(self):
self.unsubscribe(topic=topic, worker_id=worker_id)

def worker_exists(self, topic: str, worker_id: str):
return topic in self.subscriptions and worker_id in self.subscriptions[topic]

return topic in self.subscriptions and worker_id in self.subscriptions[topic]
82 changes: 82 additions & 0 deletions src/worker/common/subscription_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
from flowable.external_worker_client import ExternalWorkerSubscription


class SubscriptionManager:
    """Singleton registry of external-worker subscriptions, grouped by topic.

    Only one instance of this class ever exists: constructing it again always
    returns the instance that was created first. The manager therefore behaves
    like a global variable while still benefiting from OOP (singleton pattern).
    """

    # Class-level slot holding the single shared instance.
    _instance = None

    def __new__(cls, *args, **kwargs):
        """Create the shared instance on first use and return it afterwards.

        ``get_instance()`` is the preferred way to obtain the manager; calling
        the class directly is supported but yields the same object.
        """
        if cls._instance is None:
            # object.__new__ accepts no extra arguments, so none are forwarded.
            cls._instance = super().__new__(cls)
            # Layout: topic -> worker_id -> {"sub_obj", "jobs_done", "start_time"}
            cls._instance._subscriptions = {}
        return cls._instance

    @classmethod
    def get_instance(cls):
        """Return the singleton instance, creating it first if necessary."""
        if cls._instance is None:
            cls._instance = cls()
        return cls._instance

    # The annotation is quoted so the name is not evaluated when the method is defined.
    def add_subscription(self, topic: str, worker_id: str, subscription: "ExternalWorkerSubscription", timestamp: str):
        """Register ``subscription`` for ``worker_id`` under ``topic``.

        The topic entry is created on demand. An already registered worker id
        is left untouched.
        """
        workers = self._subscriptions.setdefault(topic, {})
        if worker_id not in workers:
            workers[worker_id] = {"sub_obj": subscription, "jobs_done": 0, "start_time": timestamp}

    def remove_subscription(self, topic: str, worker_id: str):
        """Drop the worker from the manager and unsubscribe it.

        Returns:
            True if the worker existed and was removed, False otherwise.
        """
        if self.worker_exists(topic, worker_id):
            # Remove the worker entry first, then cancel its subscription.
            subscription = self._subscriptions[topic].pop(worker_id)
            subscription["sub_obj"].unsubscribe()
            return True
        return False

    def worker_exists(self, topic: str, worker_id: str):
        """Return whether ``worker_id`` is registered under ``topic``."""
        return topic in self._subscriptions and worker_id in self._subscriptions[topic]

    def _count_subscriptions(self):
        """Return the number of distinct worker ids across all topics."""
        return len({worker_id for workers in self._subscriptions.values() for worker_id in workers})

    def get_subscription_objects(self):
        """Return a list with the subscription object of every registered worker."""
        return [entry["sub_obj"] for workers in self._subscriptions.values() for entry in workers.values()]

    def increment_job(self, current_worker):
        """Increase the ``jobs_done`` counter of ``current_worker`` by one.

        The topic is first taken from the second ``_``-separated part of the
        worker id. If no worker is registered under that topic (for example
        because the id has no ``_`` or the topic itself contains one), all
        topics are searched for the worker id instead. Unknown workers are
        ignored.
        """
        parts = current_worker.split("_")
        parsed_topic = parts[1] if len(parts) > 1 else None
        if parsed_topic is not None and self.worker_exists(parsed_topic, current_worker):
            self._subscriptions[parsed_topic][current_worker]["jobs_done"] += 1
            return
        # Fallback: locate the worker by id when the parsed topic does not match.
        for workers in self._subscriptions.values():
            if current_worker in workers:
                workers[current_worker]["jobs_done"] += 1
                return

    def get_subscription_info(self):
        """Build an output dictionary from the stored worker information.

        The result holds the worker count under ``"worker currently working"``
        followed by one entry per topic that maps each worker id to its
        ``jobs_done`` and ``start_time`` values.
        """
        result_dict = {
            topic: {
                worker_id: {"jobs_done": worker_data["jobs_done"], "start_time": worker_data["start_time"]}
                for worker_id, worker_data in workers.items()
            }
            for topic, workers in self._subscriptions.items()
        }
        count_dict = {"worker currently working": self._count_subscriptions()}
        return {**count_dict, **result_dict}
File renamed without changes.
29 changes: 17 additions & 12 deletions worker/main.py → src/worker/sentinel/main.py
Original file line number Diff line number Diff line change
@@ -1,41 +1,46 @@
from fastapi import FastAPI
from contextlib import asynccontextmanager
from worker.config import Config
from worker.manager import SubscriptionManager
from worker.log_utils import configure_logging
from worker.common.config import Config
from worker.common.manager import SubscriptionManager
from worker.common.log_utils import configure_logging
from worker.sentinel.tasks import tasks_config

manager = SubscriptionManager()


@asynccontextmanager
async def lifespan(app: FastAPI):
configure_logging()
#logger = logging.getLogger("uvicorn.access")
#handler = logging.StreamHandler()
#handler.setFormatter(logging.Formatter("%(asctime)s - [%(levelname)s] - %(message)s"))
#logger.addHandler(handler)
# logger = logging.getLogger("uvicorn.access")
# handler = logging.StreamHandler()
# handler.setFormatter(logging.Formatter("%(asctime)s - [%(levelname)s] - %(message)s"))
# logger.addHandler(handler)
yield
# end all subs before fastapi server shutdown
manager.unsubscribe_all()


app = FastAPI(lifespan=lifespan)

subcriptions = Config.default_subscriptions
for topic in subcriptions:
manager.subscribe(topic=topic, settings=subcriptions[topic])
for topic in tasks_config:
manager.subscribe(topic=topic, settings=tasks_config[topic])


@app.get("/subscriptions")
def get_subscriptions():
return {"subscriptions": manager.subscriptions_info()}



@app.get("/config")
def config():
# show the configuration
config = {}
for field in Config.__annotations__:
value = getattr(Config, field, None)
config[field]=value
config[field] = value
return {"config": config}


@app.get("/health")
def health():
# health check
Expand Down
Loading

0 comments on commit e3ba48f

Please sign in to comment.