Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expire user permissions #72

Merged
merged 18 commits into from
Nov 10, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 37 additions & 2 deletions hq_superset/hq_domain.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,16 @@
import superset

from datetime import timedelta
from flask import flash, g, redirect, request, session, url_for

from .utils import SESSION_USER_DOMAINS_KEY
from superset.config import USER_DOMAIN_ROLE_EXPIRY

from .utils import (
SESSION_DOMAIN_ROLE_LAST_SYNCED_AT,
SESSION_USER_DOMAINS_KEY,
DomainSyncUtil,
datetime_utcnow_naive,
)


def before_request_hook():
Expand All @@ -9,7 +19,10 @@ def before_request_hook():
if any hook returns a response,
break the chain and return that response
"""
hooks = [ensure_domain_selected]
hooks = [
ensure_domain_selected,
sync_user_domain_role
]
for _function in hooks:
response = _function()
if response:
Expand Down Expand Up @@ -64,6 +77,28 @@ def ensure_domain_selected():
return redirect(url_for('SelectDomainView.list', next=request.url))


def sync_user_domain_role():
if is_user_admin() or (
request.url_rule
and request.url_rule.endpoint in DOMAIN_EXCLUDED_VIEWS
):
return
if _domain_role_expired():
_sync_domain_role()


def _domain_role_expired():
if not session.get(SESSION_DOMAIN_ROLE_LAST_SYNCED_AT):
return True

time_since_last_sync = datetime_utcnow_naive() - session[SESSION_DOMAIN_ROLE_LAST_SYNCED_AT]
Copy link
Contributor

@Charl1996 Charl1996 Nov 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mkangia
Nit: for more robust implementation we could check if session[SESSION_DOMAIN_ROLE_LAST_SYNCED_AT] is a datetime-like object that could be operated on.

How about:

last_sync_datetime = session[SESSION_DOMAIN_ROLE_LAST_SYNCED_AT]
if not isinstance(last_sync_datetime, datetime):
    return True

time_since_last_sync = datetime_utcnow_naive() - last_sync_datetime

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool. I can add that. This value is only always set by CCA and can't edited by the user so it would certainly be a datetime but no harm in adding safety.

return time_since_last_sync >= timedelta(minutes=USER_DOMAIN_ROLE_EXPIRY)


def _sync_domain_role():
DomainSyncUtil(superset.appbuilder.sm).sync_domain_role(g.hq_domain)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mkangia
What happens if the domain role cannot sync during this routine? Is it fine to keep the user logged in in the current state?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ya, Great question.

I guess I will follow the same pattern you have followed otherwise in case sync domain role returns false and redirect user to the select domain page, though possibly with a different error message like "Your permissions have expired and the sync failed due to an unexpected error. Please select project space again or login to continue"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This turned out to be a little challenging, but I got it working like this
a195929

Screenshot from 2024-11-05 00-58-37



def is_valid_user_domain(hq_domain):
# Admins have access to all domains
return is_user_admin() or hq_domain in user_domains()
Expand Down
1 change: 1 addition & 0 deletions hq_superset/tests/config_for_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,4 @@
# CommCare Analytics extensions
FLASK_APP_MUTATOR = flask_app_mutator
CUSTOM_SECURITY_MANAGER = oauth.CommCareSecurityManager
USER_DOMAIN_ROLE_EXPIRY = 60 # minutes
61 changes: 61 additions & 0 deletions hq_superset/tests/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
)
from hq_superset.tests.utils import MockResponse, OAuthMock, UserMock
from hq_superset.utils import (
SESSION_DOMAIN_ROLE_LAST_SYNCED_AT,
SESSION_USER_DOMAINS_KEY,
DomainSyncUtil,
get_schema_name_for_domain,
Expand Down Expand Up @@ -311,3 +312,63 @@ def _test_upload(test_data, expected_output):

client.get('/hq_datasource/list/', follow_redirects=True)
self.assert_context('ucr_id_to_pks', {})

@patch('hq_superset.hq_requests.get_valid_cchq_oauth_token', return_value={})
@patch('hq_superset.hq_domain._sync_domain_role', return_value=True)
def test_sync_user_domain_role_calls(self, sync_domain_role_mock, *args):
with self.app.test_client() as client:
assert SESSION_USER_DOMAINS_KEY not in session
self.login(client)
self.assertIsNone(session.get(SESSION_DOMAIN_ROLE_LAST_SYNCED_AT))
# not called on login
sync_domain_role_mock.assert_not_called()

response = client.get('/', follow_redirects=True)
self.assertEqual(response.status, "200 OK")
self.assertTrue('/domain/list' in response.request.path)
self.assertIsNone(session.get(SESSION_DOMAIN_ROLE_LAST_SYNCED_AT))
# not called on domain listing after login
sync_domain_role_mock.assert_not_called()

client.get('/domain/select/test1/', follow_redirects=True)
self.assertIsNotNone(session.get(SESSION_DOMAIN_ROLE_LAST_SYNCED_AT))
# not called on domain selection
sync_domain_role_mock.assert_not_called()

client.get('/hq_datasource/list/', follow_redirects=True)
self.assertIsNotNone(session.get(SESSION_DOMAIN_ROLE_LAST_SYNCED_AT))
# not called if recently synced
sync_domain_role_mock.assert_not_called()

with patch('hq_superset.hq_domain.USER_DOMAIN_ROLE_EXPIRY', 0):
client.get('/hq_datasource/list/', follow_redirects=True)
self.assertIsNotNone(session.get(SESSION_DOMAIN_ROLE_LAST_SYNCED_AT))
# called only if expired
sync_domain_role_mock.assert_called()

self.logout(client)

@patch('hq_superset.hq_requests.get_valid_cchq_oauth_token', return_value={})
def test_sync_user_domain_before_request(self, *args):
with self.app.test_client() as client:
assert SESSION_USER_DOMAINS_KEY not in session
self.login(client)

client.get('/domain/select/test1/', follow_redirects=True)
self.assertIsNotNone(session.get(SESSION_DOMAIN_ROLE_LAST_SYNCED_AT))

with patch('hq_superset.hq_domain.USER_DOMAIN_ROLE_EXPIRY', 0):
session_role_last_synced_at = session.get(SESSION_DOMAIN_ROLE_LAST_SYNCED_AT)
client.get('/hq_datasource/list/', follow_redirects=True)
# timestamp in session updated
self.assertNotEqual(session_role_last_synced_at, session.get(SESSION_DOMAIN_ROLE_LAST_SYNCED_AT))

with patch.object(DomainSyncUtil, 'sync_domain_role') as domain_sync_util_mock:
client.get('/hq_datasource/list/', follow_redirects=True)
# confirm function call for sync
domain_sync_util_mock.assert_called_once_with(
"test1"
)

self.logout(client)

2 changes: 1 addition & 1 deletion hq_superset/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ def sync_domain_role(self, domain):

self.sm.get_session.add(current_user)
self.sm.get_session.commit()
session['domain_role_last_synced_at'] = datetime_utcnow_naive()
session[SESSION_DOMAIN_ROLE_LAST_SYNCED_AT] = datetime_utcnow_naive()
return True

def _ensure_hq_user_role(self):
Expand Down
2 changes: 2 additions & 0 deletions superset_config.example.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,3 +180,5 @@ class CeleryConfig:
"session_cookie_secure": False,
}

USER_DOMAIN_ROLE_EXPIRY = 60 # minutes