Skip to content

Commit

Permalink
Merge pull request #39 from omnivector-solutions/dusktreader/add-payl…
Browse files Browse the repository at this point in the history
…oad-mappers

Dusktreader/add payload mappers
  • Loading branch information
dusktreader authored Sep 14, 2023
2 parents 9ac0e56 + f754d92 commit 8caa457
Show file tree
Hide file tree
Showing 8 changed files with 434 additions and 392 deletions.
11 changes: 11 additions & 0 deletions armasec/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,14 @@ class AuthorizationError(ArmasecError):
"""

status_code: int = starlette.status.HTTP_403_FORBIDDEN


class PayloadMappingError(ArmasecError):
"""
Indicates that the configured payload_claim_mapping did not match a path in the token.
Attributes:
status_code: The HTTP status code indicated by the error. Set to 500.
"""

status_code: int = starlette.status.HTTP_500_INTERNAL_SERVER_ERROR
26 changes: 20 additions & 6 deletions armasec/schemas/armasec_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from typing import Any, Dict, List, Optional, Set, Union

import snick
from pydantic import BaseModel, Field


Expand All @@ -27,17 +28,30 @@ class DomainConfig(BaseModel):
)
use_https: bool = Field(
True,
description=(
"If falsey, use ``http`` when pulling openid config "
"from the OIDC server instead of ``https`` (the default)."
description=snick.unwrap(
"""
If falsey, use ``http`` when pulling openid config from the OIDC server
instead of ``https`` (the default).
""",
),
)
match_keys: Dict[
str, Union[str, List[Any], Dict[Any, Any], Set[Any], bool, int, float]
] = Field(
dict(),
description=(
"Dictionary of key-value pair to match in the token when decoding it. It will"
" raise 403 in case the input key-value pair cannot be found in the token."
description=snick.unwrap(
"""
Dictionary of key-value pair to match in the token when decoding it. It will
raise 403 in case the input key-value pair cannot be found in the token.
"""
),
)
payload_claim_mapping: Optional[Dict[str, Any]] = Field(
None,
description=snick.unwrap(
"""
Optional mappings that are applied to map claims to top-level properties of
TokenPayload. See docs for `TokenDecoder` for more info.
"""
)
)
54 changes: 49 additions & 5 deletions armasec/token_decoder.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
"""
This module provides an abstract base class for algorithmic token decoders
"""
from __future__ import annotations

from functools import partial
from typing import Callable, Optional
from typing import Callable

import jmespath
import buzz
from jose import jwt

from armasec.exceptions import AuthenticationError
from armasec.exceptions import AuthenticationError, PayloadMappingError
from armasec.schemas.jwks import JWKs
from armasec.token_payload import TokenPayload
from armasec.utilities import log_error, noop
Expand All @@ -24,8 +27,9 @@ def __init__(
self,
jwks: JWKs,
algorithm: str = "RS256",
debug_logger: Optional[Callable[..., None]] = None,
decode_options_override: Optional[dict] = None,
debug_logger: Callable[..., None] | None = None,
decode_options_override: dict | None = None,
payload_claim_mapping: dict | None = None,
):
"""
Initializes a TokenDecoder.
Expand All @@ -39,11 +43,36 @@ def __init__(
decode_options_override: Options that can override the default behavior of the jwt
decode method. For example, one can ignore token expiration by
setting this to `{ "verify_exp": False }`
payload_claim_mapping: Optional mappings that are applied to map claims to top-level
attribute of TokenPayload using a dict format of:
```
{
"top_level+attribute": "decoded.token.JMESPath"
}
```
The values _must_ be a valid JMESPath.
Consider this example:
```
{
"permissions": "resource_access.default.roles"
}
```
The above example would result in a TokenPayload like:
```
TokenPayload(permissions=token["resource_access"]["default"]["roles"])
```
Raises a 500 if the path does not match
"""
self.algorithm = algorithm
self.jwks = jwks
self.debug_logger = debug_logger if debug_logger else noop
self.decode_options_override = decode_options_override if decode_options_override else {}
self.payload_claim_mapping = payload_claim_mapping if payload_claim_mapping else {}

def get_decode_key(self, token: str) -> dict:
"""
Expand Down Expand Up @@ -95,7 +124,22 @@ def decode(self, token: str, **claims) -> TokenPayload:
**claims,
)
)
self.debug_logger(f"Payload dictionary is {payload_dict}")
self.debug_logger(f"Raw payload dictionary is {payload_dict}")

with PayloadMappingError.handle_errors(
"Failed to map decoded token to payload",
do_except=partial(log_error, self.debug_logger),
):
for (payload_key, token_jmespath) in self.payload_claim_mapping.items():
mapped_value = jmespath.search(token_jmespath, payload_dict)
buzz.require_condition(
mapped_value is not None,
f"No matching values found for claim mapping {token_jmespath} -> {payload_key}",
raise_exc_class=KeyError,
)
payload_dict[payload_key] = mapped_value
self.debug_logger(f"Mapped payload dictionary is {payload_dict}")

self.debug_logger("Attempting to convert to TokenPayload")
token_payload = TokenPayload(**payload_dict)
self.debug_logger(f"Built token_payload as {token_payload}")
Expand Down
7 changes: 6 additions & 1 deletion armasec/token_security.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,12 @@ def _load_manager(self, domain_config: DomainConfig) -> TokenManager:
loader = OpenidConfigLoader(
domain_config.domain, use_https=domain_config.use_https, debug_logger=self.debug_logger
)
decoder = TokenDecoder(loader.jwks, domain_config.algorithm, debug_logger=self.debug_logger)
decoder = TokenDecoder(
loader.jwks,
domain_config.algorithm,
debug_logger=self.debug_logger,
payload_claim_mapping=domain_config.payload_claim_mapping,
)
return TokenManager(
loader.config,
decoder,
Expand Down
79 changes: 31 additions & 48 deletions docs/source/tutorials/getting_started_with_keycloak.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ If you want to try the example locally without an existing Keycloak server, you
using Docker with the following command:

```bash
docker run keycloak/keycloak:18.0.0 -p 8080:8080 -e KEYCLOAK_ADMIN="admin" -e KEYCLOAK_ADMIN_PASSWORD="admin" start-dev
docker run -p 8080:8080 -e KEYCLOAK_ADMIN="admin" -e KEYCLOAK_ADMIN_PASSWORD="admin" keycloak/keycloak:18.0.0 start-dev
```

This will start a server who's admin UI is available at localhost:8080.
Expand Down Expand Up @@ -59,7 +59,9 @@ tab in the nav bar and click the "Create" button:
![Clients](../images/keycloak-03.png){: .framed-image}
_Clients_

For this tutorial, we will use the name "armasec-tutorial". Click "Save" to create the new client.
For this tutorial, we will use the name "armasec_tutorial". Click "Save" to create the new client.

[TODO]: # (Take new screenshots with "armasec_tutorial" instead of "armasec-tutorial")

![Create client](../images/keycloak-04.png){: .framed-image}
_Create client_
Expand Down Expand Up @@ -95,32 +97,15 @@ Click the "Save" button to add the role to the client.
![Save role](../images/keycloak-08.png){: .framed-image}
_Save role_

Finally, we need to set up an "Audience" mapper to set the audience claim in the token that our
example app will check for.

Next, we need to set up a Mapper that will put the roles inside of the "permissions" claim in the
issued tokens. Navigate back to the "armasec-tutorial" client and open the "Mappers" tab. Click the
Navigate back to the "armasec_tutorial" client and open the "Mappers" tab. Click the
"Create" button to add a new mapper.

![Mappers](../images/keycloak-09.png){: .framed-image}
_Mappers_

In the form, there's a few things to set:

| _Field_ | _Value_ |
| -----------------| ---------------- |
| Name | permissions |
| Mapper Type | User Client Role |
| Client ID | armasec-tutorial |
| Token Claim Name | Permissions |
| Claim JSON Type | String |

Set the fields as specified above and click the "Save" button to create the new mapper

![Permissions mapper](../images/keycloak-10.png){: .framed-image}
_Permissions mapper_

Finally, we need to set up an "Audience" mapper to set the audience claim in the token that our
example app will check for.

Create a new mapper with the following settings:

| _Field_ | _Value_ |
Expand All @@ -139,7 +124,7 @@ Now, for the purposes of this tutorial, we will be getting a token via a request
"Service Account Role". We need to add the role we created above to the "Service Account Role" so
that the token issued later includes the needed permissions.

Navigate to the "Service Account Roles" tab. Then, select "armasec-tutorial" for the "Client Roles"
Navigate to the "Service Account Roles" tab. Then, select "armasec_tutorial" for the "Client Roles"
field. Select the "read:stuff" role and add it with the "Add selected >>" button.

![Add role](../images/keycloak-11.png){: .framed-image}
Expand All @@ -148,36 +133,21 @@ _Add role_
Now your client should be all set up and ready to go.


## Prepare the environment

The example FastAPI app requires the "ARMASEC_DOMAIN" and "ARMASEC_AUDIENCE" environment variables
to be set.

We also need to set the "CLIENT_ID" and "CLIENT_SECRET" for our `curl` request to Keycloak to get a
test token:

```bash
export ARMASEC_DOMAIN=localhost:8080/realms/master
export ARMASEC_AUDIENCE=http://keycloak.local
export CLIENT_ID=armasec-tutorial
export CLIENT_SECRET=<your-client-secret>
```


## Start up the example app

```python title="example.py" linenums="1"
import os

from armasec import Armasec
from fastapi import FastAPI, Depends


app = FastAPI()
armasec = Armasec(
domain=os.environ.get("ARMASEC_DOMAIN"),
audience=os.environ.get("ARMASEC_AUDIENCE"),
domain="localhost:8080/realms/master",
audience="http://keycloak.local",
use_https=False,
payload_claim_mapping=dict(permissions="resource_access.armasec_tutorial.roles"),
debug_logger=print,
debug_exceptions=True,
)

@app.get("/stuff", dependencies=[Depends(armasec.lockdown("read:stuff"))])
Expand All @@ -188,25 +158,38 @@ async def check_access():
Note in this example that the `use_https` flag must be set to false to allow a local server using
unsecured HTTP.

Also not that we need to add a `payload_claim_mapping` because Keycloak does not provide
a permissions claim at the top level. This mapping copies the roles found at
`resource_access.armasec_tutorial.roles` to a top-level attribute of the token payload
called permissions.

Copy the `example.py` app to a local source file called "example.py".

Start it up with uvicorn:
Start it up with uvicorn (running in the background):

```bash
python -m uvicorn --host 0.0.0.0 example:app
```

Once it is up and running, hit `<ctrl-z>` and type the command `bg` to put the uvicorn
process into the background. You should make sure to kill the process when you complete
the tutorial.


## Get the test token

We will use `curl` to get an example token to test out our route's security:
We will use `curl` to get an example token to test out our route's security. You will
need to replace `$CLIENT_SECRET` with the secret variable we noted earlier.
Alternatively, you can set a shell variable with this value and use the command
directly.


```bash
curl -d "client_id=$CLIENT_ID&client_secret=$CLIENT_SECRET&grant_type=client_credentials" -X POST "http://localhost:8080/realms/master/protocol/openid-connect/token"
curl -d "client_id=armasec_tutorial&client_secret=$CLIENT_SECRET&grant_type=client_credentials" -X POST "http://localhost:8080/realms/master/protocol/openid-connect/token"
```

If you wish to see metadata about the token (such as its lifespan, you can omit the `jq` command at
the end.
You should see a JSON blob printed out that includes an `access_token` attribute along
with other metadata about the token.


## Try it out
Expand Down
Loading

0 comments on commit 8caa457

Please sign in to comment.