Skip to content

Commit

Permalink
Create an allow-list of redirect URLs (#6409)
Browse files Browse the repository at this point in the history
In various flows, we use application-provided URLs to redirect the user
back into the application. These URLs should be allow-listed and checked
before redirecting the user to mitigate against spoofing.

Co-authored-by: Scott Trinh <[email protected]>
  • Loading branch information
msullivan and scotttrinh authored Oct 31, 2023
1 parent 7c70be3 commit ab55ac7
Show file tree
Hide file tree
Showing 4 changed files with 214 additions and 24 deletions.
9 changes: 9 additions & 0 deletions edb/lib/ext/auth.edgeql
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,15 @@ CREATE EXTENSION PACKAGE auth VERSION '1.0' {
indicates that the token should never expire.";
set default := <std::duration>'336 hours';
};

create multi property allowed_redirect_urls -> std::str {
create annotation std::description :=
"When redirecting the user in various flows, the URL will be \
checked against this list to ensure they are going \
to a trusted domain controlled by the application. URLs are \
matched based on checking if the candidate redirect URL is \
a match or a subdirectory of any of these allowed URLs";
};
};

create scalar type ext::auth::SMTPSecurity extending enum<PlainText, TLS, STARTTLS, STARTTLSOrPlainText>;
Expand Down
13 changes: 13 additions & 0 deletions edb/pgsql/patches.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,5 +85,18 @@ def _setup_patches(patches: list[tuple[str, str]]) -> list[tuple[str, str]]:
SET default := { cfg::ConnectionTransport.SIMPLE_HTTP };
};
};
'''),
('ext-pkg', 'auth'),
('edgeql+user_ext+config|auth', '''
alter type ext::auth::AuthConfig {
create multi property allowed_redirect_urls -> std::str {
create annotation std::description :=
"When redirecting the user in various flows, the URL will be \
checked against this list to ensure they are going \
to a trusted domain controlled by the application. URLs are \
matched based on checking if the candidate redirect URL is \
a match or a subdirectory of any of these allowed URLs";
};
}
'''),
])
107 changes: 92 additions & 15 deletions edb/server/protocol/auth_ext/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,16 @@ async def handle_authorize(self, request: Any, response: Any):
redirect_to_on_signup = _maybe_get_search_param(
query, "redirect_to_on_signup"
)
if not self._is_url_allowed(redirect_to):
raise errors.InvalidData(
"Redirect URL does not match any allowed URLs.",
)
if redirect_to_on_signup and not self._is_url_allowed(
redirect_to_on_signup
):
raise errors.InvalidData(
"Redirect URL does not match any allowed URLs.",
)
challenge = _get_search_param(query, "challenge")
oauth_client = oauth.Client(
db=self.db, provider_name=provider_name, base_url=self.test_url
Expand Down Expand Up @@ -258,6 +268,11 @@ async def handle_callback(self, request: Any, response: Any):
except Exception:
raise errors.InvalidData("Invalid state token")

if not self._is_url_allowed(redirect_to):
raise errors.InvalidData(
"Redirect URL does not match any allowed URLs.",
)

params = {
"error": error,
}
Expand All @@ -281,6 +296,16 @@ async def handle_callback(self, request: Any, response: Any):
redirect_to_on_signup = cast(
Optional[str], claims.get("redirect_to_on_signup")
)
if not self._is_url_allowed(redirect_to):
raise errors.InvalidData(
"Redirect URL does not match any allowed URLs.",
)
if redirect_to_on_signup and not self._is_url_allowed(
redirect_to_on_signup
):
raise errors.InvalidData(
"Redirect URL does not match any allowed URLs.",
)
challenge = cast(str, claims["challenge"])
except Exception:
raise errors.InvalidData("Invalid state token")
Expand Down Expand Up @@ -369,9 +394,13 @@ async def handle_token(self, request: Any, response: Any):
async def handle_register(self, request: Any, response: Any):
data = self._get_data_from_request(request)

maybe_redirect_to = data.get("redirect_to")
maybe_challenge = data.get("challenge")
register_provider_name = data.get("provider")
maybe_redirect_to = cast(Optional[str], data.get("redirect_to"))
if maybe_redirect_to and not self._is_url_allowed(maybe_redirect_to):
raise errors.InvalidData(
"Redirect URL does not match any allowed URLs.",
)
maybe_challenge = cast(Optional[str], data.get("challenge"))
register_provider_name = cast(Optional[str], data.get("provider"))
if register_provider_name is None:
raise errors.InvalidData('Missing "provider" in register request')

Expand Down Expand Up @@ -452,6 +481,12 @@ async def handle_authenticate(self, request: Any, response: Any):
raise errors.InvalidData('Missing "challenge" in register request')
await pkce.create(self.db, maybe_challenge)

maybe_redirect_to = data.get("redirect_to")
if maybe_redirect_to and not self._is_url_allowed(maybe_redirect_to):
raise errors.InvalidData(
"Redirect URL does not match any allowed URLs.",
)

local_client = local.Client(
db=self.db, provider_name=authenticate_provider_name
)
Expand All @@ -471,14 +506,14 @@ async def handle_authenticate(self, request: Any, response: Any):
)
session_token = self._make_session_token(local_identity.id)
_set_cookie(response, "edgedb-session", session_token)
if data.get("redirect_to") is not None:
if maybe_redirect_to:
response.status = http.HTTPStatus.FOUND
redirect_params = urllib.parse.urlencode(
{
"code": pkce_code,
}
)
redirect_url = f"{data['redirect_to']}?{redirect_params}"
redirect_url = f"{maybe_redirect_to}?{redirect_params}"
response.custom_headers["Location"] = redirect_url
else:
response.status = http.HTTPStatus.OK
Expand All @@ -490,9 +525,13 @@ async def handle_authenticate(self, request: Any, response: Any):
).encode()
except Exception as ex:
redirect_on_failure = data.get(
"redirect_on_failure", data.get("redirect_to")
"redirect_on_failure", maybe_redirect_to
)
if redirect_on_failure is not None:
if not self._is_url_allowed(redirect_on_failure):
raise errors.InvalidData(
"Redirect URL does not match any allowed URLs.",
)
response.status = http.HTTPStatus.FOUND
redirect_params = urllib.parse.urlencode(
{
Expand Down Expand Up @@ -594,6 +633,15 @@ async def handle_send_reset_email(self, request: Any, response: Any):

_check_keyset(data, {"provider", "reset_url", "challenge"})
local_client = local.Client(db=self.db, provider_name=data["provider"])
if not self._is_url_allowed(data["reset_url"]):
raise errors.InvalidData(
"Redirect URL does not match any allowed URLs.",
)
maybe_redirect_to = data.get("redirect_to")
if maybe_redirect_to and not self._is_url_allowed(maybe_redirect_to):
raise errors.InvalidData(
"Redirect URL does not match any allowed URLs.",
)

try:
try:
Expand Down Expand Up @@ -625,10 +673,10 @@ async def handle_send_reset_email(self, request: Any, response: Any):
"email_sent": data.get('email'),
}

if data.get("redirect_to") is not None:
if maybe_redirect_to:
response.status = http.HTTPStatus.FOUND
redirect_params = urllib.parse.urlencode(return_data)
redirect_url = f"{data['redirect_to']}?{redirect_params}"
redirect_url = f"{maybe_redirect_to}?{redirect_params}"
response.custom_headers["Location"] = redirect_url
else:
response.status = http.HTTPStatus.OK
Expand All @@ -643,9 +691,13 @@ async def handle_send_reset_email(self, request: Any, response: Any):

except Exception as ex:
redirect_on_failure = data.get(
"redirect_on_failure", data.get("redirect_to")
"redirect_on_failure", maybe_redirect_to
)
if redirect_on_failure is not None:
if not self._is_url_allowed(redirect_on_failure):
raise errors.InvalidData(
"Redirect URL does not match any allowed URLs.",
)
response.status = http.HTTPStatus.FOUND
redirect_params = urllib.parse.urlencode(
{
Expand All @@ -664,35 +716,43 @@ async def handle_reset_password(self, request: Any, response: Any):
_check_keyset(data, {"provider", "reset_token"})
local_client = local.Client(db=self.db, provider_name=data["provider"])

maybe_redirect_to = data.get("redirect_to")
if maybe_redirect_to and not self._is_url_allowed(maybe_redirect_to):
raise errors.InvalidData(
"Redirect URL does not match any allowed URLs.",
)

try:
reset_token = data['reset_token']

identity_id, secret, challenge = self._get_data_from_reset_token(
reset_token
)

await local_client.update_password(
identity_id, secret, data
)
await local_client.update_password(identity_id, secret, data)
await pkce.create(self.db, challenge)
code = await pkce.link_identity_challenge(
self.db, identity_id, challenge
)

if data.get("redirect_to") is not None:
if maybe_redirect_to:
response.status = http.HTTPStatus.FOUND
redirect_params = urllib.parse.urlencode({"code": code})
redirect_url = f"{data['redirect_to']}?{redirect_params}"
redirect_url = f"{maybe_redirect_to}?{redirect_params}"
response.custom_headers["Location"] = redirect_url
else:
response.status = http.HTTPStatus.OK
response.content_type = b"application/json"
response.body = json.dumps({"code": code}).encode()
except Exception as ex:
redirect_on_failure = data.get(
"redirect_on_failure", data.get("redirect_to")
"redirect_on_failure", maybe_redirect_to
)
if redirect_on_failure is not None:
if not self._is_url_allowed(redirect_on_failure):
raise errors.InvalidData(
"Redirect URL does not match any allowed URLs.",
)
response.status = http.HTTPStatus.FOUND
redirect_params = urllib.parse.urlencode(
{
Expand Down Expand Up @@ -1317,6 +1377,23 @@ async def _try_verify_email(
" in our system, or it might already be verified."
)

def _is_url_allowed(self, url: str) -> bool:
allowed_urls = util.get_config(
self.db,
"ext::auth::AuthConfig::allowed_redirect_urls",
frozenset,
)
allowed_urls = cast(FrozenSet[str], allowed_urls).union(
{self.base_path}
)
lower_url = url.lower()

for allowed_url in allowed_urls:
if lower_url.startswith(allowed_url.lower()):
return True

return False


def _fail_with_error(
*,
Expand Down
Loading

0 comments on commit ab55ac7

Please sign in to comment.