Skip to content

Commit

Permalink
Merge pull request #6 from MountainGod2/dev
Browse files Browse the repository at this point in the history
fix: update influxdb logic and improve testing coverage
  • Loading branch information
MountainGod2 authored Aug 12, 2024
2 parents f3d0673 + 879de34 commit 62f9d9a
Show file tree
Hide file tree
Showing 9 changed files with 291 additions and 49 deletions.
6 changes: 5 additions & 1 deletion src/chaturbate_poller/chaturbate_poller.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,11 @@ async def fetch_events(self, url: str | None = None) -> EventsAPIResponse:

# Write events to InfluxDB
for event in events_api_response.events:
self.influxdb_handler.write_event("chaturbate_events", event.model_dump())
try:
self.influxdb_handler.write_event("chaturbate_events", event.model_dump())
except Exception:
logger.exception("An error occurred while writing event data to InfluxDB")
raise

return events_api_response

Expand Down
1 change: 1 addition & 0 deletions src/chaturbate_poller/influxdb_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ def write_event(self, measurement: str, data: dict[str, Any]) -> None:
self.logger.info("Event data written to InfluxDB: %s", str(flattened_data))
except ApiException:
self.logger.exception("Failed to write data to InfluxDB")
raise

def close(self) -> None:
"""Close the InfluxDB client."""
Expand Down
26 changes: 25 additions & 1 deletion tests/test___main__.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,38 @@
# ruff: noqa: S101
# ruff: noqa: S101, S603
"""Tests for the __main__ module."""

import asyncio
import subprocess
import sys
from contextlib import suppress

import pytest

from chaturbate_poller.__main__ import start_polling


def test_cli_missing_arguments() -> None:
"""Test the CLI with missing arguments."""
result = subprocess.run(
[sys.executable, "-m", "chaturbate_poller", "--username", "testuser"],
capture_output=True,
text=True,
check=False,
)
assert "Unauthorized access" in result.stderr


def test_cli_with_invalid_token() -> None:
"""Test the CLI with an invalid token."""
result = subprocess.run(
[sys.executable, "-m", "chaturbate_poller", "--username", "testuser", "--token", "invalid"],
capture_output=True,
text=True,
check=False,
)
assert "Unauthorized access" in result.stderr


def test_start_polling() -> None:
"""Test the start_polling function."""
with (
Expand Down
46 changes: 32 additions & 14 deletions tests/test_chaturbate_poller.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
AsyncClient,
ConnectError,
HTTPStatusError,
ReadTimeout,
Request,
Response,
TimeoutException,
Expand Down Expand Up @@ -64,7 +65,6 @@
}
"""dict: The test event data."""


INVALID_TIP_EVENT_DATA = {
"events": [
{
Expand Down Expand Up @@ -255,38 +255,35 @@ class TestChaturbateClientInitialization:

@pytest.mark.asyncio()
async def test_initialization(self) -> None:
"""Test ChaturbateClient initialization."""
"""Test successful initialization of ChaturbateClient with default settings."""
async with ChaturbateClient(USERNAME, TOKEN) as client:
assert client.username == USERNAME
assert client.token == TOKEN

@pytest.mark.asyncio()
async def test_initialization_with_timeout(self) -> None:
"""Test ChaturbateClient initialization with timeout."""
"""Test ChaturbateClient initialization with custom timeout."""
timeout = API_TIMEOUT
async with ChaturbateClient(USERNAME, TOKEN, timeout=timeout) as client:
assert client.timeout == timeout

@pytest.mark.asyncio()
async def test_initialization_with_non_default_base_url(self) -> None:
"""Test ChaturbateClient initialization with non-default base URL."""
base_url = TESTBED_BASE_URL
async def test_initialization_with_testbed(self) -> None:
"""Test ChaturbateClient initialization with testbed base URL."""
async with ChaturbateClient(USERNAME, TOKEN, testbed=True) as client:
assert client.base_url == base_url
assert client.base_url == TESTBED_BASE_URL

@pytest.mark.parametrize(("username", "token"), [("", TOKEN), (USERNAME, ""), ("", "")])
@pytest.mark.asyncio()
async def test_initialization_failure(self) -> None:
"""Test ChaturbateClient initialization failure."""
with pytest.raises(ValueError, match="Chaturbate username and token are required."):
async with ChaturbateClient("", TOKEN):
await asyncio.sleep(0)
async def test_initialization_failure(self, username: str, token: str) -> None:
"""Test ChaturbateClient initialization failure with missing username or token."""
with pytest.raises(ValueError, match="Chaturbate username and token are required."):
async with ChaturbateClient(USERNAME, ""):
async with ChaturbateClient(username, token):
await asyncio.sleep(0)

@pytest.mark.asyncio()
async def test_initialization_with_invalid_timeout(self) -> None:
"""Test ChaturbateClient initialization with an invalid timeout."""
"""Test ChaturbateClient initialization with invalid timeout."""
invalid_timeout = "invalid_timeout"
with pytest.raises(TypeError):
async with ChaturbateClient(USERNAME, TOKEN, timeout=invalid_timeout): # type: ignore[arg-type]
Expand Down Expand Up @@ -431,6 +428,27 @@ async def test_timeout_handling(
with pytest.raises(TimeoutException):
await chaturbate_client.fetch_events(TEST_URL)

@pytest.mark.asyncio()
async def test_fetch_events_influxdb_error(self, mocker, chaturbate_client) -> None: # noqa: ANN001
"""Test fetch_events method when InfluxDB write fails."""
mocker.patch.object(
chaturbate_client.influxdb_handler,
"write_event",
side_effect=Exception("InfluxDB Error"),
)
with pytest.raises(ValueError, match="Unauthorized access. Verify the username and token."):
await chaturbate_client.fetch_events()

@pytest.mark.asyncio()
async def test_fetch_events_timeout(self, mocker, chaturbate_client) -> None: # noqa: ANN001
"""Test fetch_events method when a timeout occurs."""
mocker.patch.object(
chaturbate_client.client, "get", side_effect=ReadTimeout("Request timed out")
)

with pytest.raises(ReadTimeout):
await chaturbate_client.fetch_events()


class TestURLConstruction:
"""Tests for URL construction."""
Expand Down
39 changes: 39 additions & 0 deletions tests/test_docker_integration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# ruff: noqa: S101, S603
"""Test the Docker integration."""

import shutil
import subprocess


def test_docker_image_build(mocker) -> None: # noqa: ANN001
"""Test building the Docker image."""
docker_executable = shutil.which("docker") # Find the full path of the Docker executable
assert docker_executable is not None, "Docker is not installed or not found in PATH"

mock_subprocess = mocker.patch("subprocess.run")
mock_subprocess.return_value.returncode = 0

result = subprocess.run(
[docker_executable, "build", "-t", "chaturbate_poller:latest", "."], check=True
)

assert result.returncode == 0
mock_subprocess.assert_called_once_with(
[docker_executable, "build", "-t", "chaturbate_poller:latest", "."], check=True
)


def test_docker_image_run(mocker) -> None: # noqa: ANN001
"""Test running the Docker container."""
docker_executable = shutil.which("docker") # Find the full path of the Docker executable
assert docker_executable is not None, "Docker is not installed or not found in PATH"

mock_subprocess = mocker.patch("subprocess.run")
mock_subprocess.return_value.returncode = 0

result = subprocess.run([docker_executable, "run", "chaturbate_poller:latest"], check=True)

assert result.returncode == 0
mock_subprocess.assert_called_once_with(
[docker_executable, "run", "chaturbate_poller:latest"], check=True
)
24 changes: 24 additions & 0 deletions tests/test_format_messages.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# ruff: noqa: S101
"""Tests for the format_messages module."""

import pytest

from chaturbate_poller.format_messages import format_message
from chaturbate_poller.models import Event, EventData


@pytest.mark.asyncio()
async def test_format_message_unknown_method() -> None:
"""Test formatting an event with an unknown method."""
event = Event(method="unknown_method", object=EventData(), id="UNIQUE_EVENT_ID")
formatted_message = await format_message(event)
assert formatted_message == "Unknown method: unknown_method"


@pytest.mark.asyncio()
async def test_format_message_no_user_or_media() -> None:
"""Test formatting an event with no user or media in the EventData."""
event_data = EventData(broadcaster="example_broadcaster")
event = Event(method="mediaPurchase", object=event_data, id="UNIQUE_EVENT_ID")
formatted_message = await format_message(event)
assert formatted_message == "Unknown media purchase event"
75 changes: 75 additions & 0 deletions tests/test_influxdb_handler.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
# ruff: noqa: S101
"""Tests for InfluxDBHandler."""

import logging
import os
from unittest import mock

import pytest
from influxdb_client.rest import ApiException

from chaturbate_poller.influxdb_client import InfluxDBHandler


Expand All @@ -15,3 +22,71 @@ def test_flatten_dict(self) -> None:
expected_dict = {"level1.level2": "value", "level1.level2b.level3": "value3"}

assert flattened_dict == expected_dict

@pytest.mark.parametrize(
("input_dict", "expected_dict"),
[
(
{"key": "value", "nested": {"subkey": "subvalue"}},
{"key": "value", "nested.subkey": "subvalue"},
),
(
{
"key": "value",
"nested": {"subkey": "subvalue", "subkey2": {"subsubkey": "value"}},
},
{"key": "value", "nested.subkey": "subvalue", "nested.subkey2.subsubkey": "value"},
),
],
)
def test_flatten_dict_with_params(self, input_dict: dict, expected_dict: dict) -> None:
"""Test flatten_dict method with various inputs."""
handler = InfluxDBHandler()
assert handler.flatten_dict(input_dict) == expected_dict

def test_write_event(self, mocker) -> None: # noqa: ANN001
"""Test writing an event to InfluxDB."""
handler = InfluxDBHandler()
mock_write_api = mocker.patch.object(handler.write_api, "write", autospec=True)
event_data = {"event": "data"}
handler.write_event("test_measurement", event_data)
mock_write_api.assert_called_once()

def test_write_event_failure(self, mocker, caplog) -> None: # noqa: ANN001
"""Test writing an event to InfluxDB failure."""
handler = InfluxDBHandler()
mocker.patch.object(handler.write_api, "write", side_effect=ApiException)
event_data = {"event": "data"}

with caplog.at_level(logging.ERROR), pytest.raises(ApiException):
handler.write_event("test_measurement", event_data)

assert "Failed to write data to InfluxDB" in caplog.text

def test_close_handler(self, mocker) -> None: # noqa: ANN001
"""Test closing the InfluxDB handler."""
handler = InfluxDBHandler()
mock_close = mocker.patch.object(handler.client, "close", autospec=True)
handler.close()
mock_close.assert_called_once()

@mock.patch.dict(
os.environ, {"INFLUXDB_URL": "http://localhost:8086", "INFLUXDB_TOKEN": "test_token"}
)
def test_init_handler(self) -> None:
"""Test initializing the InfluxDB handler."""
handler = InfluxDBHandler()
assert handler.client.url == "http://localhost:8086"
assert handler.client.token == "test_token" # noqa: S105

def test_flatten_dict_with_complex_structure(self) -> None:
"""Test flatten_dict method with a more complex nested structure."""
handler = InfluxDBHandler()
complex_dict = {
"level1": {"level2": {"level3": {"level4": "value"}}},
"another_level1": "another_value",
}
flattened_dict = handler.flatten_dict(complex_dict)
expected_dict = {"level1.level2.level3.level4": "value", "another_level1": "another_value"}

assert flattened_dict == expected_dict
47 changes: 47 additions & 0 deletions tests/test_logging_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# ruff: noqa: S101
"""Tests for the logging configuration module."""

import logging

from chaturbate_poller.logging_config import CustomFormatter, SanitizeURLFilter, sanitize_url


def test_sanitize_url() -> None:
"""Test that the sanitize_url function masks sensitive information."""
url = "https://eventsapi.chaturbate.com/events/username/token/"
sanitized = sanitize_url(url)
assert sanitized == "https://eventsapi.chaturbate.com/events/USERNAME/TOKEN/"


def test_sanitize_url_filter() -> None:
"""Test the SanitizeURLFilter to ensure it sanitizes log messages."""
_filter = SanitizeURLFilter()
log_record = logging.LogRecord(
name="test",
level=logging.INFO,
pathname="",
lineno=0,
msg="https://eventsapi.chaturbate.com/events/username/token/",
args=(),
exc_info=None,
)

assert _filter.filter(log_record)
assert log_record.msg == "https://eventsapi.chaturbate.com/events/USERNAME/TOKEN/"


def test_custom_formatter() -> None:
"""Test the CustomFormatter to ensure it formats log records properly."""
formatter = CustomFormatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
log_record = logging.LogRecord(
name="test",
level=logging.INFO,
pathname="",
lineno=0,
msg="Test message",
args=None,
exc_info=None,
)
formatted = formatter.format(log_record)

assert "Test message" in formatted
Loading

0 comments on commit 62f9d9a

Please sign in to comment.