-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
7 changed files
with
176 additions
and
51 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,8 +7,9 @@ | |
from jose import jwt | ||
|
||
from qualicharge.auth.factories import IDTokenFactory | ||
from qualicharge.auth.models import UserRead | ||
from qualicharge.auth.models import IDToken, UserCreate, UserRead | ||
from qualicharge.auth.oidc import discover_provider, get_public_keys | ||
from qualicharge.auth.schemas import User | ||
from qualicharge.conf import settings | ||
|
||
|
||
|
@@ -141,3 +142,94 @@ def test_whoami_jwt_decoding_error( | |
assert response.json() == { | ||
"message": "Authentication failed: Unable to decode ID token" | ||
} | ||
|
||
|
||
def test_login_with_invalid_user(client): | ||
"""Test the login endpoint with invalid user.""" | ||
response = client.post( | ||
"/auth/token", data={"username": "johndoe", "password": "foo"} | ||
) | ||
assert response.status_code == status.HTTP_401_UNAUTHORIZED | ||
assert response.json() == { | ||
"message": "Authentication failed: Wrong login or password" | ||
} | ||
|
||
|
||
def test_login_with_invalid_password(client, db_session): | ||
"""Test the login endpoint with invalid password.""" | ||
user = User( | ||
**UserCreate( | ||
username="johndoe", | ||
password="foo", # noqa: S106 | ||
email="[email protected]", | ||
first_name="John", | ||
last_name="Doe", | ||
is_superuser=False, | ||
is_staff=False, | ||
is_active=True, | ||
).model_dump() | ||
) | ||
db_session.add(user) | ||
|
||
response = client.post( | ||
"/auth/token", data={"username": "johndoe", "password": "bar"} | ||
) | ||
assert response.status_code == status.HTTP_401_UNAUTHORIZED | ||
assert response.json() == { | ||
"message": "Authentication failed: Wrong login or password" | ||
} | ||
|
||
|
||
def test_login_with_inactive_user(client, db_session): | ||
"""Test the login endpoint with an inactive user.""" | ||
user = User( | ||
**UserCreate( | ||
username="johndoe", | ||
password="foo", # noqa: S106 | ||
email="[email protected]", | ||
first_name="John", | ||
last_name="Doe", | ||
is_superuser=False, | ||
is_staff=False, | ||
is_active=False, | ||
).model_dump() | ||
) | ||
db_session.add(user) | ||
|
||
response = client.post( | ||
"/auth/token", data={"username": "johndoe", "password": "foo"} | ||
) | ||
assert response.status_code == status.HTTP_401_UNAUTHORIZED | ||
assert response.json() == {"message": "Authentication failed: User is not active"} | ||
|
||
|
||
def test_login(client, db_session): | ||
"""Test the login endpoint.""" | ||
user = User( | ||
**UserCreate( | ||
username="johndoe", | ||
password="foo", # noqa: S106 | ||
email="[email protected]", | ||
first_name="John", | ||
last_name="Doe", | ||
is_superuser=False, | ||
is_staff=False, | ||
is_active=True, | ||
).model_dump() | ||
) | ||
db_session.add(user) | ||
|
||
response = client.post( | ||
"/auth/token", data={"username": "johndoe", "password": "foo"} | ||
) | ||
assert response.status_code == status.HTTP_200_OK | ||
token = response.json() | ||
assert token["token_type"] == "bearer" # noqa: S105 | ||
decoded = jwt.decode( | ||
token=token["access_token"], | ||
key=settings.OAUTH2_TOKEN_ENCODING_KEY, | ||
audience=settings.OIDC_EXPECTED_AUDIENCE, | ||
) | ||
id_token = IDToken(**decoded) | ||
assert id_token.sub == "johndoe" | ||
assert id_token.email == "[email protected]" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,7 @@ | ||
"""Tests for qualicharge.auth.oidc module.""" | ||
|
||
from datetime import datetime | ||
from typing import Union | ||
|
||
import httpx | ||
import pytest | ||
|
@@ -92,65 +93,86 @@ def test_get_public_keys_with_bad_configuration(httpx_mock): | |
get_public_keys("http://oidc/wrong") | ||
|
||
|
||
def test_get_token(httpx_mock, monkeypatch, id_token_factory: IDTokenFactory): | ||
@pytest.mark.parametrize("oidc_is_enabled", (True, False)) | ||
def test_get_token( | ||
oidc_is_enabled, httpx_mock, monkeypatch, id_token_factory: IDTokenFactory | ||
): | ||
"""Test the OIDC get token utility.""" | ||
monkeypatch.setenv("QUALICHARGE_OIDC_PROVIDER_BASE_URL", "http://oidc") | ||
httpx_mock.add_response( | ||
method="GET", | ||
url=str(settings.OIDC_CONFIGURATION_URL), | ||
json={ | ||
"jwks_uri": "https://oidc/certs", | ||
"id_token_signing_alg_values_supported": "HS256", | ||
}, | ||
) | ||
httpx_mock.add_response( | ||
method="GET", | ||
url="https://oidc/certs", | ||
json=[ | ||
"secret", | ||
], | ||
) | ||
monkeypatch.setattr(settings, "OIDC_IS_ENABLED", oidc_is_enabled) | ||
monkeypatch.setattr(settings, "OAUTH2_TOKEN_ENCODING_KEY", "secret") | ||
|
||
if oidc_is_enabled: | ||
httpx_mock.add_response( | ||
method="GET", | ||
url=str(settings.OIDC_CONFIGURATION_URL), | ||
json={ | ||
"jwks_uri": "https://oidc/certs", | ||
"id_token_signing_alg_values_supported": "HS256", | ||
}, | ||
) | ||
httpx_mock.add_response( | ||
method="GET", | ||
url="https://oidc/certs", | ||
json=[ | ||
"secret", | ||
], | ||
) | ||
|
||
bearer_token = HTTPAuthorizationCredentials( | ||
scheme="Bearer", | ||
credentials=jwt.encode( | ||
claims=id_token_factory.build().model_dump(), key="secret" | ||
), | ||
token = jwt.encode( | ||
claims=id_token_factory.build().model_dump(), | ||
key="secret", | ||
) | ||
token = get_token(security_scopes=SecurityScopes(), token=bearer_token) | ||
assert token.email == "[email protected]" | ||
bearer_token: Union[str, HTTPAuthorizationCredentials] = token | ||
if oidc_is_enabled: | ||
bearer_token = HTTPAuthorizationCredentials( | ||
scheme="Bearer", | ||
credentials=token, | ||
) | ||
|
||
id_token = get_token(security_scopes=SecurityScopes(), token=bearer_token) | ||
assert id_token.email == "[email protected]" | ||
|
||
|
||
@pytest.mark.parametrize("oidc_is_enabled", (True, False)) | ||
def test_get_token_with_expired_token( | ||
httpx_mock, monkeypatch, id_token_factory: IDTokenFactory | ||
oidc_is_enabled, httpx_mock, monkeypatch, id_token_factory: IDTokenFactory | ||
): | ||
"""Test the OIDC get token utility when the token expired.""" | ||
monkeypatch.setenv("QUALICHARGE_OIDC_PROVIDER_BASE_URL", "http://oidc") | ||
httpx_mock.add_response( | ||
method="GET", | ||
url=str(settings.OIDC_CONFIGURATION_URL), | ||
json={ | ||
"jwks_uri": "https://oidc/certs", | ||
"id_token_signing_alg_values_supported": "HS256", | ||
}, | ||
) | ||
httpx_mock.add_response( | ||
method="GET", | ||
url="https://oidc/certs", | ||
json=[ | ||
"secret", | ||
], | ||
) | ||
monkeypatch.setattr(settings, "OIDC_IS_ENABLED", oidc_is_enabled) | ||
monkeypatch.setattr(settings, "OAUTH2_TOKEN_ENCODING_KEY", "secret") | ||
|
||
if oidc_is_enabled: | ||
httpx_mock.add_response( | ||
method="GET", | ||
url=str(settings.OIDC_CONFIGURATION_URL), | ||
json={ | ||
"jwks_uri": "https://oidc/certs", | ||
"id_token_signing_alg_values_supported": "HS256", | ||
}, | ||
) | ||
httpx_mock.add_response( | ||
method="GET", | ||
url="https://oidc/certs", | ||
json=[ | ||
"secret", | ||
], | ||
) | ||
|
||
# As exp should be set to iat + 300, the token should be expired | ||
iat = int(datetime.now().timestamp()) - 500 | ||
bearer_token = HTTPAuthorizationCredentials( | ||
scheme="Bearer", | ||
credentials=jwt.encode( | ||
claims=id_token_factory.build(iat=iat).model_dump(), | ||
key="secret", | ||
), | ||
token = jwt.encode( | ||
claims=id_token_factory.build(iat=iat).model_dump(), | ||
key="secret", | ||
) | ||
bearer_token: Union[str, HTTPAuthorizationCredentials] = token | ||
if oidc_is_enabled: | ||
bearer_token = HTTPAuthorizationCredentials( | ||
scheme="Bearer", | ||
credentials=token, | ||
) | ||
|
||
with pytest.raises(OIDCAuthenticationError, match="Token signature expired"): | ||
get_token(security_scopes=SecurityScopes(), token=bearer_token) | ||
|
||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,6 +8,7 @@ | |
from qualicharge.auth.factories import GroupFactory, IDTokenFactory, UserFactory | ||
from qualicharge.auth.oidc import get_token | ||
from qualicharge.auth.schemas import UserGroup | ||
from qualicharge.conf import settings | ||
|
||
|
||
@pytest.fixture | ||
|
@@ -17,7 +18,9 @@ def client(): | |
|
||
|
||
@pytest.fixture | ||
def client_auth(request, id_token_factory: IDTokenFactory, db_session: Session): | ||
def client_auth( | ||
request, id_token_factory: IDTokenFactory, db_session: Session, monkeypatch | ||
): | ||
"""An authenticated test client configured for the /api/v1 application. | ||
Parameter: | ||
|
@@ -33,6 +36,8 @@ def client_auth(request, id_token_factory: IDTokenFactory, db_session: Session): | |
GroupFactory.__session__ = db_session | ||
UserFactory.__session__ = db_session | ||
|
||
monkeypatch.setattr(settings, "OIDC_IS_ENABLED", True) | ||
|
||
persist = True | ||
fields = { | ||
"email": "[email protected]", | ||
|