Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Исправить stack level для предупреждений #228

Merged
merged 4 commits into from
Mar 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions fast_bitrix24/user_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,18 @@
)
from .server_response import ServerResponseParser
from .srh import ServerRequestHandler
from .utils import get_warning_stack_level


BITRIX_PAGE_SIZE = 50

TOP_MOST_LIBRARY_MODULES = [
"fast_bitrix24\\bitrix",
"fast_bitrix24\\logger",
"fast_bitrix24/bitrix",
"fast_bitrix24/logger",
]


class UserRequestAbstract:
@beartype
Expand Down Expand Up @@ -56,7 +65,7 @@ def standardized_params(self, p):
"Using None as filter value confuses Bitrix. "
"Try using an empty string, 'null' or 'false'.",
UserWarning,
stacklevel=2,
stacklevel=get_warning_stack_level(TOP_MOST_LIBRARY_MODULES),
)

return p
Expand Down Expand Up @@ -119,6 +128,7 @@ def check_special_limitations(self):
"get_all() should be used only with methods that end with '.list'. "
"Use get_by_ID() or call() instead.",
UserWarning,
stacklevel=get_warning_stack_level(TOP_MOST_LIBRARY_MODULES),
)

return True
Expand Down Expand Up @@ -192,7 +202,7 @@ def dedup_results(self):
f"Number of results returned ({len(self.results)}) "
f"doesn't equal 'total' from the server reply ({self.total})",
RuntimeWarning,
stacklevel=4,
stacklevel=get_warning_stack_level(TOP_MOST_LIBRARY_MODULES),
)


Expand Down Expand Up @@ -327,7 +337,7 @@ def __init__(self, bitrix, method_branch: str, ID_field_name: str = "ID"):
"now that exceeding Bitrix request rate limitations gets users "
"heavily penalised. Use 'get_all()' instead.",
DeprecationWarning,
stacklevel=2,
stacklevel=get_warning_stack_level(TOP_MOST_LIBRARY_MODULES),
)

@icontract.require(
Expand Down
61 changes: 61 additions & 0 deletions fast_bitrix24/utils.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import sys
import traceback
from typing import List, Union
from urllib.parse import quote, urlparse


Expand Down Expand Up @@ -33,3 +36,61 @@ def http_build_query(params, convention="%s"):
output = output + convention % key + "=" + val + "&"

return output


def get_warning_stack_level(module_filenames: Union[str, List[str]]) -> int:
leshchenko1979 marked this conversation as resolved.
Show resolved Hide resolved
"""Calculate the stack level for warnings issued from a library.

Returns the number of stack frames between the top-most module from the given list
occurring in the call stack and the caller of the function.

This is used to provide the appropriate stack level to warnings.warn or
warnings.warn_explicit so that the warning appears in user code rather
than in the library itself.

The calculation is based on the system call stack.

Args:
module_filenames: The filenames of possible top-most modules before which the
user code is situated. If a single filename is provided, it is wrapped
in a list.

Returns:
The stack level to be used in warnings.warn or warnings.warn_explicit.

Raises:
ValueError: If none of the modules in the provided sequence have been found
in the stack.
"""

# Wrap single filename in a list
if isinstance(module_filenames, str):
module_filenames = [module_filenames]

# Append '.py' to filenames
module_filenames = tuple(
f if f.endswith(".py") else f"{f}.py" for f in module_filenames
)

# Get filenames from stack
stack_filenames = []
top_frame = sys._getframe()
while top_frame is not None:
stack_filenames.append(top_frame.f_code.co_filename)
top_frame = top_frame.f_back
leshchenko1979 marked this conversation as resolved.
Show resolved Hide resolved

# Find top-most module filename in the stack
try:
top_most_index = next(
i
for i, v in enumerate(reversed(stack_filenames))
if v.endswith(module_filenames)
)
except StopIteration:
raise ValueError(
"None of the modules in the provided sequence have been found "
"in the stack."
)
leshchenko1979 marked this conversation as resolved.
Show resolved Hide resolved

# Calculate stack level
return len(stack_filenames) - top_most_index
115 changes: 115 additions & 0 deletions tests/test_warnings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
import warnings
leshchenko1979 marked this conversation as resolved.
Show resolved Hide resolved
leshchenko1979 marked this conversation as resolved.
Show resolved Hide resolved
from unittest.mock import MagicMock

import pytest

from fast_bitrix24.utils import get_warning_stack_level


async def empty_async(*args, **kwargs):
pass


@pytest.mark.asyncio
async def test_warning_get_all(bx_dummy_async, monkeypatch):
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter("always")
monkeypatch.setattr(
"fast_bitrix24.user_request.GetAllUserRequest.run",
lambda *args, **kwargs: empty_async(),
)
await bx_dummy_async.get_all("crm.deal.add")
assert len(w) == 1
print(w[0])
assert w[0].filename == __file__ # Assuming the test is in the same file


@pytest.mark.asyncio
async def test_warning_get_all_params(bx_dummy_async, monkeypatch):
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter("always")
monkeypatch.setattr(
"fast_bitrix24.user_request.GetAllUserRequest.run",
lambda *args, **kwargs: empty_async(),
)
await bx_dummy_async.get_all("crm.deal.list", params={"filter": {"ID": None}})
assert len(w) == 1
print(w[0])
assert w[0].filename == __file__ # Assuming the test is in the same file



def mock_stack(filenames):
frames = []
for filename in filenames:
frame = MagicMock()
frame.f_back = frames[-1] if frames else None
frame.f_code.co_filename = filename
frames.append(frame)
return frames


def mock_get_frame(stack, depth):
return stack[-1 - depth]


# Test cases for happy path
@pytest.mark.parametrize(
"module_filenames, module_sequence, expected_stack_level, test_id",
[
(["utils.py"], ["base.py", "utils.py", "caller.py"], 2, "single_module"),
(
["module2.py", "module1.py"],
["base.py", "module1.py", "module2.py", "caller.py"],
3,
"multiple_modules",
),
(
["module.py"],
["base.py", "module.py", "intermediate.py", "caller.py"],
3,
"intermediate_module",
),
(
"module.py",
["base.py", "module.py", "caller.py"],
2,
"single_module_str_input",
),
],
)
def test_get_warning_stack_level_happy_path(
module_filenames, module_sequence, expected_stack_level, test_id, monkeypatch
):
# Arrange
monkeypatch.setattr(
"fast_bitrix24.utils.sys._getframe",
lambda depth=0: mock_get_frame(mock_stack(module_sequence), depth),
)

# Act
stack_level = get_warning_stack_level(module_filenames)

# Assert
assert stack_level == expected_stack_level


# Test cases for error cases
@pytest.mark.parametrize(
"module_filenames, module_sequence, expected_exception, test_id",
[
(["utils.py"], ["unrelated.py", "caller.py"], ValueError, "unrelated_module"),
],
)
def test_get_warning_stack_level_error_cases(
module_filenames, module_sequence, expected_exception, test_id, monkeypatch
):
# Arrange
monkeypatch.setattr(
"fast_bitrix24.utils.sys._getframe",
lambda depth=0: mock_get_frame(mock_stack(module_sequence), depth),
)

# Act & Assert
with pytest.raises(expected_exception):
get_warning_stack_level(module_filenames)
Loading