From c182d0d78c6742a366671e8e5ec222da31b09c4a Mon Sep 17 00:00:00 2001 From: Hanne Moa Date: Fri, 10 Nov 2023 13:49:43 +0100 Subject: [PATCH] Move more ldapauth stuff to ldap.py --- python/nav/web/auth/__init__.py | 39 ++------------- python/nav/web/auth/ldap.py | 64 ++++++++++++++++++++++-- tests/unittests/general/webfront_test.py | 4 +- 3 files changed, 67 insertions(+), 40 deletions(-) diff --git a/python/nav/web/auth/__init__.py b/python/nav/web/auth/__init__.py index 887e0b8686..df6005c147 100644 --- a/python/nav/web/auth/__init__.py +++ b/python/nav/web/auth/__init__.py @@ -55,20 +55,9 @@ def authenticate(username, password): try: account = Account.objects.get(login__iexact=username) except Account.DoesNotExist: - if ldap.available: - ldap_user = ldap.authenticate(username, password) - # If we authenticated, store the user in database. - if ldap_user: - account = Account( - login=ldap_user.username, - name=ldap_user.get_real_name(), - ext_sync='ldap', - ) - account = update_ldap_user(ldap_user, account, password) - # We're authenticated now - return account - # No account, bail out - return None + # account autocreated if username is authenticated + account = ldap.authenticate(username, password) + return account if account.locked: _logger.info("Locked user %s tried to log in", account.login) @@ -76,12 +65,12 @@ def authenticate(username, password): if account.ext_sync == 'ldap' and ldap.available: try: - ldap_user = ldap.authenticate(username, password) + ldap_user = ldap.get_ldap_user(username, password) except ldap.NoAnswerError: pass else: if ldap_user: - account = update_ldap_user(ldap_user, account, password) + account = ldap.update_ldap_user(ldap_user, account, password) return account return None # Fallback to stored password if ldap is unavailable @@ -91,24 +80,6 @@ def authenticate(username, password): return None -def update_ldap_user(ldap_user, account, password): - account.set_password(password) - account.save() - _handle_ldap_admin_status(ldap_user, account) - return account - - -def _handle_ldap_admin_status(ldap_user, nav_account): - is_admin = ldap_user.is_admin() - # Only modify admin status if an entitlement is configured in webfront.conf - if is_admin is not None: - admin_group = AccountGroup.objects.get(id=AccountGroup.ADMIN_GROUP) - if is_admin: - nav_account.groups.add(admin_group) - else: - nav_account.groups.remove(admin_group) - - def get_login_url(request): """Calculate which login_url to use""" path = parse.quote(request.get_full_path()) diff --git a/python/nav/web/auth/ldap.py b/python/nav/web/auth/ldap.py index 344608cd05..a3271c2d7b 100644 --- a/python/nav/web/auth/ldap.py +++ b/python/nav/web/auth/ldap.py @@ -24,6 +24,7 @@ import nav.errors from nav.config import NAVConfigParser +from nav.models.profiles import Account, AccountGroup _logger = logging.getLogger(__name__) @@ -121,11 +122,66 @@ def open_ldap(): return lconn -def authenticate(login, password): +def authenticate(username, password): """ - Attempt to authenticate the login name with password against the - configured LDAP server. If the user is authenticated, required - group memberships are also verified. + Authenticate the username and password against the configured LDAP server. + + Required group memberships are also verified. + + Returns an authenticated Account with updated groups, or None. + """ + if not available: + return None + ldap_user = get_ldap_user(username, password) + try: + account = Account.objects.get(login__iexact=username, ext_sync='ldap') + except Account.DoesNotExist: + if ldap_user: + account = autocreate_ldap_user(ldap_user, password) + return account + if account.locked: + _logger.info("Locked user %s tried to log in", account.login) + return None + if account.check_password(password): + account = update_ldap_user(ldap_user, account, password) + return account + return None + + +def autocreate_ldap_user(ldap_user, password): + account = Account( + login=ldap_user.username, + name=ldap_user.get_real_name(), + ext_sync='ldap', + ) + account = update_ldap_user(ldap_user, account, password) + return account + + +def update_ldap_user(ldap_user, account, password): + account.set_password(password) + account.save() + _handle_ldap_admin_status(ldap_user, account) + return account + + +def _handle_ldap_admin_status(ldap_user, nav_account): + is_admin = ldap_user.is_admin() + # Only modify admin status if an entitlement is configured in webfront.conf + if is_admin is not None: + admin_group = AccountGroup.objects.get(id=AccountGroup.ADMIN_GROUP) + if is_admin: + nav_account.groups.add(admin_group) + else: + nav_account.groups.remove(admin_group) + + +def get_ldap_user(login, password): + """ + Fetch an LDAPUser from an ldap server if login and password matches. + + Returns an autenticated LDAPUser of a specific group or with specific + entitlements, or False. """ lconn = open_ldap() server = _config.get('ldap', 'server') diff --git a/tests/unittests/general/webfront_test.py b/tests/unittests/general/webfront_test.py index 5b23d931d5..07becf6b1e 100644 --- a/tests/unittests/general/webfront_test.py +++ b/tests/unittests/general/webfront_test.py @@ -31,12 +31,12 @@ def test_authenticate_should_return_account_when_ldap_says_yes(self): ldap_user = Mock() ldap_user.is_admin.return_value = None # mock to avoid database access with patch("nav.web.auth.ldap.available", new=True): - with patch("nav.web.auth.ldap.authenticate", return_value=ldap_user): + with patch("nav.web.auth.ldap.get_ldap_user", return_value=ldap_user): assert auth.authenticate('knight', 'shrubbery') == LDAP_ACCOUNT def test_authenticate_should_return_false_when_ldap_says_no(self): with patch("nav.web.auth.ldap.available", new=True): - with patch("nav.web.auth.ldap.authenticate", return_value=False): + with patch("nav.web.auth.ldap.get_ldap_user", return_value=False): assert not auth.authenticate('knight', 'shrubbery') def test_authenticate_should_fallback_when_ldap_is_disabled(self):