Skip to content

Commit

Permalink
add tests
Browse files Browse the repository at this point in the history
Signed-off-by: Jens Langhammer <[email protected]>
  • Loading branch information
BeryJu committed Nov 19, 2024
1 parent bcaa651 commit f888d39
Showing 1 changed file with 222 additions and 0 deletions.
222 changes: 222 additions & 0 deletions authentik/providers/oauth2/tests/test_token_cc_jwt_provider.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
"""Test token view"""

from datetime import datetime, timedelta
from json import loads

from django.test import RequestFactory
from django.urls import reverse
from django.utils.timezone import now
from jwt import decode

from authentik.blueprints.tests import apply_blueprint
from authentik.core.models import Application, Group
from authentik.core.tests.utils import create_test_cert, create_test_flow, create_test_user
from authentik.lib.generators import generate_id
from authentik.policies.models import PolicyBinding
from authentik.providers.oauth2.constants import (
GRANT_TYPE_CLIENT_CREDENTIALS,
SCOPE_OPENID,
SCOPE_OPENID_EMAIL,
SCOPE_OPENID_PROFILE,
TOKEN_TYPE,
)
from authentik.providers.oauth2.models import AccessToken, OAuth2Provider, ScopeMapping
from authentik.providers.oauth2.tests.utils import OAuthTestCase


class TestTokenClientCredentialsJWTProvider(OAuthTestCase):
"""Test token (client_credentials, with JWT) view"""

@apply_blueprint("system/providers-oauth2.yaml")
def setUp(self) -> None:
super().setUp()
self.factory = RequestFactory()
self.other_cert = create_test_cert()
self.cert = create_test_cert()

self.other_provider = OAuth2Provider.objects.create(
name=generate_id(),
authorization_flow=create_test_flow(),
signing_key=self.other_cert,
)
self.other_provider.property_mappings.set(ScopeMapping.objects.all())
self.app = Application.objects.create(
name=generate_id(), slug=generate_id(), provider=self.other_provider
)

self.provider: OAuth2Provider = OAuth2Provider.objects.create(
name="test",
authorization_flow=create_test_flow(),
redirect_uris="http://testserver",
signing_key=self.cert,
)
self.provider.jwt_federation_providers.add(self.other_provider)
self.provider.property_mappings.set(ScopeMapping.objects.all())
self.app = Application.objects.create(name="test", slug="test", provider=self.provider)

def test_invalid_type(self):
"""test invalid type"""
response = self.client.post(
reverse("authentik_providers_oauth2:token"),
{
"grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
"scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
"client_id": self.provider.client_id,
"client_assertion_type": "foo",
"client_assertion": "foo.bar",
},
)
self.assertEqual(response.status_code, 400)
body = loads(response.content.decode())
self.assertEqual(body["error"], "invalid_grant")

def test_invalid_jwt(self):
"""test invalid JWT"""
response = self.client.post(
reverse("authentik_providers_oauth2:token"),
{
"grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
"scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
"client_id": self.provider.client_id,
"client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
"client_assertion": "foo.bar",
},
)
self.assertEqual(response.status_code, 400)
body = loads(response.content.decode())
self.assertEqual(body["error"], "invalid_grant")

def test_invalid_signature(self):
"""test invalid JWT"""
token = self.provider.encode(
{
"sub": "foo",
"exp": datetime.now() + timedelta(hours=2),
}
)
response = self.client.post(
reverse("authentik_providers_oauth2:token"),
{
"grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
"scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
"client_id": self.provider.client_id,
"client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
"client_assertion": token + "foo",
},
)
self.assertEqual(response.status_code, 400)
body = loads(response.content.decode())
self.assertEqual(body["error"], "invalid_grant")

def test_invalid_expired(self):
"""test invalid JWT"""
token = self.provider.encode(
{
"sub": "foo",
"exp": datetime.now() - timedelta(hours=2),
}
)
response = self.client.post(
reverse("authentik_providers_oauth2:token"),
{
"grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
"scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
"client_id": self.provider.client_id,
"client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
"client_assertion": token,
},
)
self.assertEqual(response.status_code, 400)
body = loads(response.content.decode())
self.assertEqual(body["error"], "invalid_grant")

def test_invalid_no_app(self):
"""test invalid JWT"""
self.app.provider = None
self.app.save()
token = self.provider.encode(
{
"sub": "foo",
"exp": datetime.now() + timedelta(hours=2),
}
)
response = self.client.post(
reverse("authentik_providers_oauth2:token"),
{
"grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
"scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
"client_id": self.provider.client_id,
"client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
"client_assertion": token,
},
)
self.assertEqual(response.status_code, 400)
body = loads(response.content.decode())
self.assertEqual(body["error"], "invalid_grant")

def test_invalid_access_denied(self):
"""test invalid JWT"""
group = Group.objects.create(name="foo")
PolicyBinding.objects.create(
group=group,
target=self.app,
order=0,
)
token = self.provider.encode(
{
"sub": "foo",
"exp": datetime.now() + timedelta(hours=2),
}
)
response = self.client.post(
reverse("authentik_providers_oauth2:token"),
{
"grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
"scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
"client_id": self.provider.client_id,
"client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
"client_assertion": token,
},
)
self.assertEqual(response.status_code, 400)
body = loads(response.content.decode())
self.assertEqual(body["error"], "invalid_grant")

def test_successful(self):
"""test successful"""
user = create_test_user()
token = self.other_provider.encode(
{
"sub": "foo",
"exp": datetime.now() + timedelta(hours=2),
}
)
AccessToken.objects.create(
provider=self.other_provider,
token=token,
user=user,
auth_time=now(),
)

response = self.client.post(
reverse("authentik_providers_oauth2:token"),
{
"grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
"scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
"client_id": self.provider.client_id,
"client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
"client_assertion": token,
},
)
self.assertEqual(response.status_code, 200)
body = loads(response.content.decode())
self.assertEqual(body["token_type"], TOKEN_TYPE)
_, alg = self.provider.jwt_key
jwt = decode(
body["access_token"],
key=self.provider.signing_key.public_key,
algorithms=[alg],
audience=self.provider.client_id,
)
self.assertEqual(jwt["given_name"], user.name)
self.assertEqual(jwt["preferred_username"], user.username)

0 comments on commit f888d39

Please sign in to comment.