Add cache-manager OCM-Gear extension
The cache manager's purpose in this first iteration is to clean up the
persistent db cache according to the configuration supplied via the scan
configuration CR. If the configured `max_cache_size_bytes` is exceeded, cache
entries are removed according to the configured weights in the
`cache_pruning_weights` property until `min_pruning_bytes` is available again.
8R0WNI3 committed Nov 22, 2024
1 parent b0ca47e commit 5559f5a
Showing 2 changed files with 320 additions and 0 deletions.
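
For illustration, a sketch of the `cacheManager` fragment of the scan configuration CR's
spec that this extension consumes; the property names are taken from the `config.py`
deserialiser below, while the concrete values are made up:

```python
spec = {
    'cacheManager': {
        'delivery_db_cfg_name': 'delivery-db',  # hypothetical config element name
        'max_cache_size_bytes': 1000000000,  # 1Gb (also the default)
        'min_pruning_bytes': 100000000,  # 100Mb (also the default)
        'cache_pruning_weights': {
            'last_read_weight': 1,  # long time no read -> delete
            'read_count_weight': -10,  # has many reads -> rather not delete
        },
    },
}
```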
216 changes: 216 additions & 0 deletions cache_manager.py
@@ -0,0 +1,216 @@
import asyncio
import argparse
import atexit
import datetime
import logging
import os
import sys

import sqlalchemy
import sqlalchemy.ext.asyncio as sqlasync
import sqlalchemy.sql.elements

import ci.log

import config
import ctx_util
import deliverydb
import deliverydb.model as dm
import k8s.logging
import k8s.model
import k8s.util


logger = logging.getLogger(__name__)
ci.log.configure_default_logging()
k8s.logging.configure_kubernetes_logging()


def deserialise_cache_manager_cfg(
cfg_name: str,
namespace: str,
kubernetes_api: k8s.util.KubernetesApi,
) -> config.CacheManagerConfig:
scan_cfg_crd = kubernetes_api.custom_kubernetes_api.get_namespaced_custom_object(
group=k8s.model.ScanConfigurationCrd.DOMAIN,
version=k8s.model.ScanConfigurationCrd.VERSION,
plural=k8s.model.ScanConfigurationCrd.PLURAL_NAME,
namespace=namespace,
name=cfg_name,
)
spec = scan_cfg_crd.get('spec', dict())

cache_manager_cfg = config.deserialise_cache_manager_config(spec_config=spec)

if not cache_manager_cfg:
        logger.warning(
            f'no cache manager configuration set for config elem {cfg_name}, '
            'job is unable to proceed and will terminate'
        )
sys.exit(0)

return cache_manager_cfg


def bytes_to_str(
    num_bytes: int,
    ndigits: int=2,
) -> str:
    return f'{round(num_bytes / 1000000, ndigits)}Mb'


async def prune_cache(
db_session: sqlasync.session.AsyncSession,
cfg: config.CacheManagerConfig,
chunk_size: int=50,
):
size_query = sqlalchemy.select(sqlalchemy.func.sum(dm.DBCache.size))
size_result = await db_session.execute(size_query)
if not (size := size_result.one()[0]):
logger.info('Delivery-db cache does not have any entries yet, skipping cleanup...')
return

logger.info(f'Current cache size: {bytes_to_str(size)}')
logger.info(f'Max cache size: {bytes_to_str(cfg.max_cache_size_bytes)}')

if size < cfg.max_cache_size_bytes:
current_utilisation = round(size * 100 / cfg.max_cache_size_bytes, ndigits=2)
logger.info(f'Skipped db cache cleanup ({current_utilisation=}%)')
return

now = datetime.datetime.now(tz=datetime.timezone.utc)

    # minutes elapsed since the given timestamp column, evaluated in the database;
    # `coalesce` maps NULL timestamps to 0 so unset properties do not affect the ordering
    def interval_min(column: sqlalchemy.DateTime) -> sqlalchemy.sql.elements.BinaryExpression:
        return sqlalchemy.func.coalesce(sqlalchemy.extract('epoch', (now - column)) / 60, 0)

# Multiply with the negative weight to order in descending order (i.e. entry with greatest
# weight first). Note: Using sqlalchemy's `.desc()` function only works when ordering by columns
# without any modifications of the same.
db_statement = sqlalchemy.select(dm.DBCache).order_by(
interval_min(dm.DBCache.creation_date) * -cfg.cache_pruning_weights.creation_date_weight
+ interval_min(dm.DBCache.last_update) * -cfg.cache_pruning_weights.last_update_weight
+ interval_min(dm.DBCache.delete_after) * -cfg.cache_pruning_weights.delete_after_weight
+ interval_min(dm.DBCache.keep_until) * -cfg.cache_pruning_weights.keep_until_weight
+ interval_min(dm.DBCache.last_read) * -cfg.cache_pruning_weights.last_read_weight
+ dm.DBCache.read_count * -cfg.cache_pruning_weights.read_count_weight
+ dm.DBCache.revision * -cfg.cache_pruning_weights.revision_weight
+ dm.DBCache.costs * -cfg.cache_pruning_weights.costs_weight
+ dm.DBCache.size * -cfg.cache_pruning_weights.size_weight
)
    # stream the ordered candidates in chunks instead of loading all cache entries at once
    db_stream = await db_session.stream(db_statement)

prunable_size = size - cfg.min_pruning_bytes
logger.info(
f'Will prune cache (prunable size {bytes_to_str(prunable_size)}) until '
f'{bytes_to_str(cfg.min_pruning_bytes)} are available again.'
)

try:
async for partition in db_stream.partitions(size=chunk_size):
for row in partition:
if prunable_size <= 0:
break # deleted enough cache entries
entry = row[0]
prunable_size -= entry.size
await db_session.delete(entry)
            else:
                # inner loop finished the partition without break -> fetch the next chunk
                continue
            break # deleted enough cache entries

await db_session.commit()
logger.info(
f'Pruned {bytes_to_str(size - cfg.min_pruning_bytes - prunable_size)} from cache'
)
except Exception:
await db_session.rollback()
raise


def parse_args():
parser = argparse.ArgumentParser()

parser.add_argument(
'--k8s-cfg-name',
help='specify kubernetes cluster to interact with',
default=os.environ.get('K8S_CFG_NAME'),
)
parser.add_argument(
'--kubeconfig',
help='''
            specify kubernetes cluster to interact with (extensions and logs); if both
`k8s-cfg-name` and `kubeconfig` are set, `k8s-cfg-name` takes precedence
''',
)
parser.add_argument(
'--k8s-namespace',
help='specify kubernetes cluster namespace to interact with',
default=os.environ.get('K8S_TARGET_NAMESPACE'),
)
parser.add_argument(
'--cfg-name',
help='specify the context the process should run in',
default=os.environ.get('CFG_NAME'),
)

parsed_arguments = parser.parse_args()

if not parsed_arguments.k8s_namespace:
raise ValueError(
            'k8s namespace must be set, either via argument "--k8s-namespace" '
'or via environment variable "K8S_TARGET_NAMESPACE"'
)

if not parsed_arguments.cfg_name:
raise ValueError(
'name of the to-be-used scan configuration must be set, either via '
'argument "--cfg-name" or via environment variable "CFG_NAME"'
)

return parsed_arguments


async def main():
parsed_arguments = parse_args()
cfg_name = parsed_arguments.cfg_name
namespace = parsed_arguments.k8s_namespace

cfg_factory = ctx_util.cfg_factory()

if k8s_cfg_name := parsed_arguments.k8s_cfg_name:
kubernetes_cfg = cfg_factory.kubernetes(k8s_cfg_name)
kubernetes_api = k8s.util.kubernetes_api(kubernetes_cfg=kubernetes_cfg)
else:
kubernetes_api = k8s.util.kubernetes_api(kubeconfig_path=parsed_arguments.kubeconfig)

k8s.logging.init_logging_thread(
service=config.Services.CACHE_MANAGER,
namespace=namespace,
kubernetes_api=kubernetes_api,
)
atexit.register(
k8s.logging.log_to_crd,
service=config.Services.CACHE_MANAGER,
namespace=namespace,
kubernetes_api=kubernetes_api,
)

cache_manager_cfg = deserialise_cache_manager_cfg(
cfg_name=cfg_name,
namespace=namespace,
kubernetes_api=kubernetes_api,
)

db_url = cfg_factory.delivery_db(cache_manager_cfg.delivery_db_cfg_name).as_url()

db_session = await deliverydb.sqlalchemy_session(db_url)
try:
await prune_cache(
db_session=db_session,
cfg=cache_manager_cfg,
)
finally:
await db_session.close()


if __name__ == '__main__':
asyncio.run(main())
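
The `order_by` expression above multiplies each property by the negated weight so that,
under SQL's default ascending order, the entry with the greatest weighted score comes
first. A plain-Python sketch of the equivalent score, assuming a `DBCache`-like `entry`
object and a `CachePruningWeights` instance (attribute names as in this commit; the
helper itself is hypothetical):

```python
import datetime


def deletion_score(entry, weights, now: datetime.datetime) -> float:
    '''entries with the greatest score are pruned first'''
    def interval_min(ts: datetime.datetime | None) -> float:
        # analogous to the SQL helper: minutes since `ts`, 0 for missing timestamps
        return (now - ts).total_seconds() / 60 if ts else 0

    return (
        interval_min(entry.creation_date) * weights.creation_date_weight
        + interval_min(entry.last_update) * weights.last_update_weight
        + interval_min(entry.delete_after) * weights.delete_after_weight
        + interval_min(entry.keep_until) * weights.keep_until_weight
        + interval_min(entry.last_read) * weights.last_read_weight
        + entry.read_count * weights.read_count_weight
        + entry.revision * weights.revision_weight
        + entry.costs * weights.costs_weight
        + entry.size * weights.size_weight
    )
```

For example, with the default weights defined in `config.py` below, the `keep_until` term
of an entry whose `keep_until` passed 60 minutes ago contributes `60 * 1 = 60` to the
score, while 10 reads contribute `10 * -10 = -100`; frequently read entries are therefore
pruned later.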
104 changes: 104 additions & 0 deletions config.py
@@ -30,6 +30,7 @@ class Services(enum.StrEnum):
ARTEFACT_ENUMERATOR = 'artefactEnumerator'
BACKLOG_CONTROLLER = 'backlogController'
BDBA = 'bdba'
CACHE_MANAGER = 'cacheManager'
CLAMAV = 'clamav'
DELIVERY_DB_BACKUP = 'deliveryDbBackup'
ISSUE_REPLICATOR = 'issueReplicator'
@@ -219,12 +220,50 @@ class IssueReplicatorConfig:
milestone_cfg: gcmi.MilestoneConfiguration


@dataclasses.dataclass(frozen=True)
class CachePruningWeights:
    '''
    The individual weights determine how strongly the respective properties are considered
    when determining the cache entries to be deleted next (in case the max cache size is
    reached). The greater the weight, the more likely an entry is to be deleted. Negative
    values may also be used for properties which indicate that an entry should rather be
    kept. A weight of 0 means the property does not affect the deletion priority.
    '''
creation_date_weight: float = 0
last_update_weight: float = 0
delete_after_weight: float = 0
keep_until_weight: float = 0
last_read_weight: float = 0
read_count_weight: float = 0
revision_weight: float = 0
costs_weight: float = 0
size_weight: float = 0


@dataclasses.dataclass(frozen=True)
class CacheManagerConfig:
    '''
    :param str delivery_db_cfg_name:
        name of the config element of the delivery database
    :param int max_cache_size_bytes:
        maximum allowed size of the delivery-db cache; exceeding it triggers a pruning run
    :param int min_pruning_bytes:
        if `max_cache_size_bytes` is reached, existing cache entries will be removed according
        to the `cache_pruning_weights` until `min_pruning_bytes` is available again
    :param CachePruningWeights cache_pruning_weights:
        weights determining which cache entries are pruned first
    '''
delivery_db_cfg_name: str
max_cache_size_bytes: int
min_pruning_bytes: int
cache_pruning_weights: CachePruningWeights


@dataclasses.dataclass(frozen=True)
class ScanConfiguration:
artefact_enumerator_config: ArtefactEnumeratorConfig
bdba_config: BDBAConfig
issue_replicator_config: IssueReplicatorConfig
clamav_config: ClamAVConfig
cache_manager_config: CacheManagerConfig


def deserialise_component_config(
@@ -903,6 +942,63 @@ def deserialise_issue_replicator_config(
)


def deserialise_cache_manager_config(
spec_config: dict,
) -> CacheManagerConfig | None:
cache_manager_config = spec_config.get('cacheManager')

if not cache_manager_config:
return None

delivery_db_cfg_name = deserialise_config_property(
config=cache_manager_config,
property_key='delivery_db_cfg_name',
)

max_cache_size_bytes = deserialise_config_property(
config=cache_manager_config,
property_key='max_cache_size_bytes',
default_value=1000000000, # 1Gb
)

min_pruning_bytes = deserialise_config_property(
config=cache_manager_config,
property_key='min_pruning_bytes',
default_value=100000000, # 100Mb
)

cache_pruning_weights_raw = deserialise_config_property(
config=cache_manager_config,
property_key='cache_pruning_weights',
default_value=dict(),
)

if cache_pruning_weights_raw:
cache_pruning_weights = dacite.from_dict(
data_class=CachePruningWeights,
data=cache_pruning_weights_raw,
)
else:
cache_pruning_weights = CachePruningWeights(
creation_date_weight=0,
last_update_weight=0,
delete_after_weight=1.5, # deletion (i.e. stale) flag -> delete
keep_until_weight=1, # keep until has passed -> delete
last_read_weight=1, # long time no read -> delete
read_count_weight=-10, # has many reads -> rather not delete
revision_weight=0,
costs_weight=-10, # is expensive to re-calculate -> rather not delete
size_weight=0,
)

return CacheManagerConfig(
delivery_db_cfg_name=delivery_db_cfg_name,
max_cache_size_bytes=max_cache_size_bytes,
min_pruning_bytes=min_pruning_bytes,
cache_pruning_weights=cache_pruning_weights,
)


def deserialise_scan_configuration(
spec_config: dict,
included_services: tuple[Services],
@@ -935,11 +1031,19 @@ def deserialise_scan_configuration(
else:
clamav_config = None

if Services.CACHE_MANAGER in included_services:
cache_manager_config = deserialise_cache_manager_config(
spec_config=spec_config,
)
else:
cache_manager_config = None

return ScanConfiguration(
artefact_enumerator_config=artefact_enumerator_config,
bdba_config=bdba_config,
issue_replicator_config=issue_replicator_config,
clamav_config=clamav_config,
cache_manager_config=cache_manager_config,
)
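
To illustrate the defaults above, a sketch of calling the deserialiser with a minimal,
hypothetical spec dict; only `delivery_db_cfg_name` has no default value (and is thus
presumably mandatory), so all remaining properties fall back to the defaults:

```python
cfg = deserialise_cache_manager_config(
    spec_config={
        'cacheManager': {
            'delivery_db_cfg_name': 'delivery-db',  # hypothetical config element name
        },
    },
)

assert cfg.max_cache_size_bytes == 1000000000  # 1Gb default
assert cfg.min_pruning_bytes == 100000000  # 100Mb default
assert cfg.cache_pruning_weights.read_count_weight == -10  # default weights apply
```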


