Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ref(flags): limit scopes for secret updates #82897

Merged
merged 6 commits into from
Jan 8, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 29 additions & 10 deletions src/sentry/flags/endpoints/secrets.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,16 +83,35 @@ def post(self, request: Request, organization: Organization) -> Response:
if not validator.is_valid():
return self.respond(validator.errors, status=400)

FlagWebHookSigningSecretModel.objects.create_or_update(
organization=organization,
provider=validator.validated_data["provider"],
values={
"created_by": request.user.id,
"date_added": datetime.now(tz=timezone.utc),
"provider": validator.validated_data["provider"],
"secret": validator.validated_data["secret"],
},
)
# these scopes can always update or post secrets
has_update_or_post_access = request.access.has_scope(
"org:write"
) or request.access.has_scope("org:admin")

try:
secret = FlagWebHookSigningSecretModel.objects.filter(
organization_id=organization.id
).get(provider=validator.validated_data["provider"])
# allow update access if the user created the secret
if request.user.id == secret.created_by:
has_update_or_post_access = True
except FlagWebHookSigningSecretModel.DoesNotExist:
# anyone can post a new secret
has_update_or_post_access = True
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This section confused me a little and is probably brittle. I think we can simplify and make it more understandable.

I would make two variables: has_permission and is_creator. And then implement as:

has_permission = request.access.has_scope(...)

try:
    ...
    is_creator = request.user.id == secret.created_by
except:
    is_creator = True

Finally in the call-site:

if is_creator or has_permission:
    ...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated!


if has_update_or_post_access:
FlagWebHookSigningSecretModel.objects.create_or_update(
organization=organization,
provider=validator.validated_data["provider"],
values={
"created_by": request.user.id,
"date_added": datetime.now(tz=timezone.utc),
"provider": validator.validated_data["provider"],
"secret": validator.validated_data["secret"],
},
)
else:
return Response("Not authorized.", status=401)
michellewzhang marked this conversation as resolved.
Show resolved Hide resolved

return Response(status=201)
Copy link
Member

@cmanallen cmanallen Jan 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: if we can only return this response if the boolean condition evaluates to true then we should move the response into the boolean itself. It's less confusing IMO if we know the function terminates at this conditional.

Edit: move into the if branch. Moving in to the boolean doesn't make any sense lol.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated this as well


Expand Down
86 changes: 86 additions & 0 deletions tests/sentry/flags/endpoints/test_secrets.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,92 @@ def test_post_other_organization(self):
response = self.client.post(url, data={})
assert response.status_code == 403, response.content

def test_update_same_creator(self):
new_user = self.create_user("[email protected]")
member = self.create_member(organization=self.organization, user=new_user)
self.login_as(user=member)

with self.feature(self.features):
response = self.client.post(
self.url,
data={"secret": "41271af8b9804cd99a4c787a28274991", "provider": "generic"},
)
assert response.status_code == 201, response.content

models = FlagWebHookSigningSecretModel.objects.filter(provider="generic").all()
assert len(models) == 1
assert models[0].secret == "41271af8b9804cd99a4c787a28274991"

# update secret should be allowed since the creator is the same
with self.feature(self.features):
response = self.client.post(
self.url,
data={"secret": "31271af8b9804cd99a4c787a28274993", "provider": "generic"},
)
assert response.status_code == 201, response.content

models = FlagWebHookSigningSecretModel.objects.filter(provider="generic").all()
assert len(models) == 1
assert models[0].secret == "31271af8b9804cd99a4c787a28274993"

def test_update_no_access(self):
FlagWebHookSigningSecretModel.objects.create(
created_by="12314124",
organization=self.organization,
provider="generic",
secret="41271af8b9804cd99a4c787a28274991",
)

models = FlagWebHookSigningSecretModel.objects.filter(provider="generic").all()
assert len(models) == 1
assert models[0].secret == "41271af8b9804cd99a4c787a28274991"

# update secret should not allowed since the creator is not the same
new_user = self.create_user("[email protected]")
member = self.create_member(organization=self.organization, user=new_user)
self.login_as(user=member)
with self.feature(self.features):
response = self.client.post(
self.url,
data={"secret": "31271af8b9804cd99a4c787a28274993", "provider": "generic"},
)
assert response.status_code == 401, response.content

models = FlagWebHookSigningSecretModel.objects.filter(provider="generic").all()
assert len(models) == 1
assert models[0].secret == "41271af8b9804cd99a4c787a28274991"

def test_update_has_scope(self):
FlagWebHookSigningSecretModel.objects.create(
created_by="12314124",
organization=self.organization,
provider="generic",
secret="41271af8b9804cd99a4c787a28274991",
)

models = FlagWebHookSigningSecretModel.objects.filter(provider="generic").all()
assert len(models) == 1
assert models[0].secret == "41271af8b9804cd99a4c787a28274991"

# update secret should be allowed due to proper scope
new_user = self.create_user("[email protected]")
owner = self.create_member(
organization=self.organization,
user=new_user,
role="owner",
)
self.login_as(user=owner)
with self.feature(self.features):
response = self.client.post(
self.url,
data={"secret": "31271af8b9804cd99a4c787a28274993", "provider": "generic"},
)
assert response.status_code == 201, response.content

models = FlagWebHookSigningSecretModel.objects.filter(provider="generic").all()
assert len(models) == 1
assert models[0].secret == "31271af8b9804cd99a4c787a28274993"


class OrganizationFlagsWebHookSigningSecretEndpointTestCase(APITestCase):
endpoint = "sentry-api-0-organization-flag-hooks-signing-secret"
Expand Down
Loading