Skip to content

Commit

Permalink
Add Support for Time-Based Log Retention in Loki (#377)
Browse files Browse the repository at this point in the history
  • Loading branch information
IbraAoad authored Apr 22, 2024
1 parent 2a8b0ce commit 58eb250
Show file tree
Hide file tree
Showing 5 changed files with 173 additions and 2 deletions.
8 changes: 8 additions & 0 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,11 @@ options:
Ref: https://grafana.com/docs/loki/latest/configure/#limits_config
type: int
default: 15
retention-period:
description: |
Sets a global retention period for log streams in Loki. A value of 0 disables retention (default).
Minimum retention period is 1d.
Specify the period in days. For example, to set a 48-day retention period, use `48`.
Specifying retention periods for individual streams is not currently supported.
type: int
default: 0
13 changes: 13 additions & 0 deletions src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ class CompositeStatus(TypedDict):
k8s_patch: Tuple[str, str]
config: Tuple[str, str]
rules: Tuple[str, str]
retention: Tuple[str, str]


def to_tuple(status: StatusBase) -> Tuple[str, str]:
Expand Down Expand Up @@ -123,6 +124,7 @@ def __init__(self, *args):
k8s_patch=to_tuple(ActiveStatus()),
config=to_tuple(ActiveStatus()),
rules=to_tuple(ActiveStatus()),
retention=to_tuple(ActiveStatus()),
)
)

Expand Down Expand Up @@ -387,10 +389,20 @@ def _configure(self): # noqa: C901
# "can_connect" is a racy check, so we do it once here (instead of in collect-status)
if self._container.can_connect():
self._stored.status["config"] = to_tuple(ActiveStatus())

# The config validity check does not return on error because if a lifecycle event
# comes in after a config change, we still want Loki to continue to function even
# with the invalid config.
else:
self._stored.status["config"] = to_tuple(MaintenanceStatus("Configuring Loki"))
return

if 0 > int(self.config["retention-period"]):
self._stored.status["retention"] = to_tuple(
BlockedStatus("Please provide a non-negative retention duration")
)
return

current_layer = self._container.get_plan()
new_layer = self._build_pebble_layer
restart = current_layer.services != new_layer.services
Expand All @@ -401,6 +413,7 @@ def _configure(self): # noqa: C901
external_url=self._external_url,
ingestion_rate_mb=int(self.config["ingestion-rate-mb"]),
ingestion_burst_size_mb=int(self.config["ingestion-burst-size-mb"]),
retention_period=int(self.config["retention-period"]),
http_tls=(self.server_cert.server_cert is not None),
).build()

Expand Down
26 changes: 24 additions & 2 deletions src/config_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

LOKI_DIR = "/loki"
CHUNKS_DIR = os.path.join(LOKI_DIR, "chunks")
COMPACTOR_DIR = os.path.join(LOKI_DIR, "compactor")
BOLTDB_DIR = os.path.join(LOKI_DIR, "boltdb-shipper-active")
BOLTDB_CACHE_DIR = os.path.join(LOKI_DIR, "boltdb-shipper-cache")
RULES_DIR = os.path.join(LOKI_DIR, "rules")


Expand All @@ -40,6 +42,7 @@ def __init__(
external_url: str,
ingestion_rate_mb: int,
ingestion_burst_size_mb: int,
retention_period: int,
http_tls: bool = False,
):
"""Init method."""
Expand All @@ -49,6 +52,7 @@ def __init__(
self.ingestion_rate_mb = ingestion_rate_mb
self.ingestion_burst_size_mb = ingestion_burst_size_mb
self.http_tls = http_tls
self.retention_period = retention_period

def build(self) -> dict:
"""Build Loki config dictionary."""
Expand All @@ -66,6 +70,7 @@ def build(self) -> dict:
"chunk_store_config": self._chunk_store_config,
"frontend": self._frontend,
"querier": self._querier,
"compactor": self._compactor,
}

@property
Expand Down Expand Up @@ -109,7 +114,7 @@ def _schema_config(self) -> dict:
"index": {"period": "24h", "prefix": "index_"},
"object_store": "filesystem",
"schema": "v11",
"store": "boltdb",
"store": "boltdb-shipper",
}
]
}
Expand All @@ -131,8 +136,13 @@ def _server(self) -> dict:

@property
def _storage_config(self) -> dict:
# Ref: https://grafana.com/docs/loki/latest/configure/#storage_config
return {
"boltdb": {"directory": BOLTDB_DIR},
"boltdb_shipper": {
"active_index_directory": BOLTDB_DIR,
"shared_store": "filesystem",
"cache_location": BOLTDB_CACHE_DIR,
},
"filesystem": {"directory": CHUNKS_DIR},
}

Expand All @@ -150,6 +160,7 @@ def _limits_config(self) -> dict:
# This charmed operator is intended for running a single loki instances, so we don't need to split queries
# https://community.grafana.com/t/too-many-outstanding-requests-on-loki-2-7-1/78249/9
"split_queries_by_interval": "0",
"retention_period": f"{self.retention_period}d",
}

@property
Expand Down Expand Up @@ -197,3 +208,14 @@ def _querier(self) -> dict:
# The maximum number of concurrent queries allowed. Default is 10.
"max_concurrent": 20,
}

@property
def _compactor(self) -> dict:
# Ref: https://grafana.com/docs/loki/latest/configure/#compactor
retention_enabled = self.retention_period != 0
return {
# Activate custom retention. Default is False.
"retention_enabled": retention_enabled,
"working_directory": COMPACTOR_DIR,
"shared_store": "filesystem",
}
74 changes: 74 additions & 0 deletions tests/integration/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,80 @@ async def loki_rules(ops_test, app_name) -> dict:
return {}


async def loki_services(ops_test, app_name: str) -> dict:
"""Fetches the status of Loki services from loki HTTP api.
Returns:
dict: A dictionary containing the status of Loki services, where keys are service names and values are their statuses.
Example:
{
'server': 'Running',
'ring': 'Running',
'analytics': 'Running',
'querier': 'Running',
'query-frontend': 'Running',
'query-scheduler-ring': 'Running',
'query-frontend-tripperware': 'Running',
'ingester': 'Running',
'distributor': 'Running',
'query-scheduler': 'Running',
'ingester-querier': 'Running',
'store': 'Running',
'cache-generation-loader': 'Running',
'memberlist-kv': 'Running',
'compactor': 'Running',
'ruler': 'Running'
}
"""
address = await get_unit_address(ops_test, app_name, 0)
url = f"http://{address}:3100/services"
try:
response = requests.get(url)
if response.status_code == 200:
services = {}
# Parse the response and populate the services dictionary
# Each line represents a service name and its status separated by " => "
# We split each line by " => " and store the key-value pairs in the services dictionary
for line in response.text.split("\n"):
if line.strip():
key, value = line.strip().split(" => ")
services[key.strip()] = value.strip()
return services
return {}
except requests.exceptions.RequestException:
return {}


async def loki_config(ops_test, app_name: str) -> dict:
"""Fetches the Loki configuration from loki HTTP api.
Returns:
dict: A dictionary containing the Loki configuration.
Example:
{
'limits_config': {
'retention_period': '0s'
},
'compactor': {
'retention_enabled': False
},
# Other configuration parameters...
}
"""
address = await get_unit_address(ops_test, app_name, 0)
url = f"http://{address}:3100/config"
try:
response = requests.get(url)
if response.status_code == 200:
yaml_dict = yaml.safe_load(response.text)
return yaml_dict
return {}
except requests.exceptions.RequestException:
return {}


async def loki_endpoint_request(ops_test, app_name: str, endpoint: str, unit_num: int = 0):
address = await get_unit_address(ops_test, app_name, unit_num)
url = urljoin(f"http://{address}:3100/", endpoint)
Expand Down
54 changes: 54 additions & 0 deletions tests/integration/test_loki_configs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#!/usr/bin/env python3
# Copyright 2024 Canonical Ltd.
# See LICENSE file for licensing details.

import logging
from pathlib import Path

import pytest
import yaml
from helpers import is_loki_up, loki_config, loki_services
from pytest_operator.plugin import OpsTest

logger = logging.getLogger(__name__)

METADATA = yaml.safe_load(Path("./metadata.yaml").read_text())
app_name = METADATA["name"]
resources = {"loki-image": METADATA["resources"]["loki-image"]["upstream-source"]}


@pytest.mark.abort_on_fail
async def test_services_running(ops_test: OpsTest, loki_charm):
"""Deploy the charm-under-test."""
logger.debug("deploy local charm")

await ops_test.model.deploy(
loki_charm, application_name=app_name, resources=resources, trust=True
)
await ops_test.model.wait_for_idle(apps=[app_name], status="active", timeout=1000)
assert await is_loki_up(ops_test, app_name)

services = await loki_services(ops_test, app_name)
assert all(status == "Running" for status in services.values()), "Not all services are running"


@pytest.mark.abort_on_fail
async def test_retention_configs(ops_test: OpsTest):
default_configs = await loki_config(ops_test, app_name)
assert all(
[
default_configs["limits_config"]["retention_period"] == "0s",
not default_configs["compactor"]["retention_enabled"],
]
)

await ops_test.model.applications[app_name].set_config({"retention-period": "3"})
await ops_test.model.wait_for_idle(apps=[app_name], status="active", timeout=1000)

configs_with_retention = await loki_config(ops_test, app_name)
assert all(
[
configs_with_retention["limits_config"]["retention_period"] == "3d",
configs_with_retention["compactor"]["retention_enabled"],
]
)

0 comments on commit 58eb250

Please sign in to comment.