diff --git a/bin/naventity b/bin/naventity index 239b7543b2..a5cb2c08e9 100755 --- a/bin/naventity +++ b/bin/naventity @@ -164,7 +164,7 @@ def device(devicestring): else: netbox = netbox[0] - if not netbox.get_preferred_snmp_management_profile(writeable=False): + if not netbox.get_preferred_snmp_management_profile(): msg = "No SNMP management profile set for %s" % netbox raise argparse.ArgumentTypeError(msg) diff --git a/python/nav/arnold.py b/python/nav/arnold.py index 53f9ab8e6b..ad97a56efe 100644 --- a/python/nav/arnold.py +++ b/python/nav/arnold.py @@ -300,7 +300,7 @@ def disable(candidate, justification, username, comment="", autoenablestep=0): ) if not candidate.interface.netbox.get_preferred_snmp_management_profile( - writeable=True + require_write=True ): raise NoReadWriteManagementProfileError(candidate.interface.netbox) identity = check_identity(candidate) @@ -322,7 +322,7 @@ def quarantine(candidate, qvlan, justification, username, comment="", autoenable ) if not candidate.interface.netbox.get_preferred_snmp_management_profile( - writeable=True + require_write=True ): raise NoReadWriteManagementProfileError(candidate.interface.netbox) identity = check_identity(candidate) @@ -465,7 +465,7 @@ def change_port_status( # Create snmp-object netbox = identity.interface.netbox - profile = netbox.get_preferred_snmp_management_profile(writeable=True) + profile = netbox.get_preferred_snmp_management_profile(require_write=True) if not profile: raise NoReadWriteManagementProfileError diff --git a/python/nav/ipdevpoll/snmp/common.py b/python/nav/ipdevpoll/snmp/common.py index c50e6fdf9b..baf0b62cd6 100644 --- a/python/nav/ipdevpoll/snmp/common.py +++ b/python/nav/ipdevpoll/snmp/common.py @@ -202,7 +202,7 @@ def factory( """ kwargs_out = {} if netbox: - profile = netbox.get_preferred_snmp_management_profile(writeable=False) + profile = netbox.get_preferred_snmp_management_profile() if profile: if profile.protocol == profile.PROTOCOL_SNMPV3: kwargs["version"] = 3 diff --git a/python/nav/models/manage.py b/python/nav/models/manage.py index 2c68061601..2d5c3492ce 100644 --- a/python/nav/models/manage.py +++ b/python/nav/models/manage.py @@ -160,10 +160,17 @@ def is_snmp(self): def snmp_version(self): """Returns the configured SNMP version as an integer""" if self.protocol == self.PROTOCOL_SNMP: - value = self.configuration['version'] + value = self.configuration.get("version") if value == "2c": return 2 - return int(value) + if value: + return int(value) + else: + _logger.error( + "Broken management profile %s has no SNMP version", self.name + ) + return None + elif self.protocol == self.PROTOCOL_SNMPV3: return 3 @@ -304,11 +311,14 @@ def device(self): return chassis.device def get_preferred_snmp_management_profile( - self, writeable=None + self, require_write=False ) -> Optional[ManagementProfile]: - """ - Returns the snmp management profile with the highest available - SNMP version. + """Returns the snmp management profile with the highest available SNMP version. + + :param require_write: If True, only write-enabled profiles will be + considered. If false, read-only profiles will be + preferred, unless a write-enabled profile is the only + available alternative. """ query = Q( protocol__in=( @@ -316,17 +326,18 @@ def get_preferred_snmp_management_profile( ManagementProfile.PROTOCOL_SNMPV3, ) ) - if writeable: + if require_write: query = query & Q(configuration__write=True) - elif writeable is not None: - query = query & ( - Q(configuration__write=False) | ~Q(configuration__has_key='write') + + profiles = self.profiles.filter(query) + + if not require_write: + # Sort read-only profiles first + profiles = sorted( + profiles, key=lambda p: p.configuration.get("write", False) ) - profiles = sorted( - self.profiles.filter(query), - key=lambda p: p.snmp_version or 0, - reverse=True, - ) + + profiles = sorted(profiles, key=lambda p: p.snmp_version or 0, reverse=True) if profiles: return profiles[0] diff --git a/python/nav/portadmin/snmp/base.py b/python/nav/portadmin/snmp/base.py index f2eb65b0f0..c101bd662e 100644 --- a/python/nav/portadmin/snmp/base.py +++ b/python/nav/portadmin/snmp/base.py @@ -169,7 +169,7 @@ def _get_query(self, oid, if_index): def _get_read_only_handle(self): """Get a read only SNMP-handle.""" if self.read_only_handle is None: - profile = self.netbox.get_preferred_snmp_management_profile(writeable=False) + profile = self.netbox.get_preferred_snmp_management_profile() if not profile: raise NoReadOnlyManagementProfileError @@ -201,7 +201,9 @@ def _get_read_write_handle(self): :rtype: nav.Snmp.Snmp """ if self.read_write_handle is None: - profile = self.netbox.get_preferred_snmp_management_profile(writeable=True) + profile = self.netbox.get_preferred_snmp_management_profile( + require_write=True + ) self.read_write_handle = get_snmp_session_for_profile(profile)( host=self.netbox.ip, retries=self.retries, @@ -563,7 +565,7 @@ def is_port_access_control_enabled(self): raise ProtocolError("SNMP error") from error def raise_if_not_configurable(self): - if not self.netbox.get_preferred_snmp_management_profile(writeable=True): + if not self.netbox.get_preferred_snmp_management_profile(require_write=True): raise DeviceNotConfigurableError( "No writeable SNMP management profile set for this device, " "changes cannot be saved" diff --git a/python/nav/web/portadmin/views.py b/python/nav/web/portadmin/views.py index f6f268af87..7069115f04 100644 --- a/python/nav/web/portadmin/views.py +++ b/python/nav/web/portadmin/views.py @@ -214,9 +214,10 @@ def populate_infodict(request, netbox, interfaces): "%s did not respond within the set timeouts. Values displayed are from database" % netbox.sysname, ) - if isinstance( - handler, SNMPHandler - ) and not netbox.get_preferred_snmp_management_profile(writeable=False): + if ( + isinstance(handler, SNMPHandler) + and not netbox.get_preferred_snmp_management_profile() + ): messages.error(request, "Read only management profile not set") except ProtocolError: readonly = True diff --git a/tests/unittests/models/netbox_test.py b/tests/unittests/models/netbox_test.py new file mode 100644 index 0000000000..8273625713 --- /dev/null +++ b/tests/unittests/models/netbox_test.py @@ -0,0 +1,65 @@ +from unittest.mock import patch + +from nav.models.manage import Netbox, ManagementProfile + +import pytest + + +class TestGetPreferredSnmpProfiles: + def test_when_write_required_is_false_then_it_should_prefer_a_read_profile( + self, mocked_netbox + ): + profile = mocked_netbox.get_preferred_snmp_management_profile() + assert profile.name == "v2 read" + + def test_when_write_required_is_true_then_it_should_return_a_write_profile( + self, mocked_netbox + ): + profile = mocked_netbox.get_preferred_snmp_management_profile( + require_write=True + ) + assert profile.name.startswith("v2 write") + + def test_when_write_required_is_false_it_should_return_a_writable_profile_of_a_higher_snmp_version( + self, mocked_netbox + ): + profiles = mocked_netbox.profiles.filter() + profiles.append( + ManagementProfile( + name="v3 write", + protocol=ManagementProfile.PROTOCOL_SNMPV3, + configuration={"write": True}, + ) + ) + profile = mocked_netbox.get_preferred_snmp_management_profile() + assert profile.name == "v3 write" + + +@pytest.fixture +def mocked_netbox(mock_profiles): + with patch.object(Netbox, "profiles") as profiles: + netbox = Netbox(sysname="test", ip="127.0.0.1") + profiles.filter.return_value = mock_profiles + yield netbox + + +@pytest.fixture +def mock_profiles(): + profiles = [ + ManagementProfile( + name="v2 write first", + protocol=ManagementProfile.PROTOCOL_SNMP, + configuration={"version": 2, "write": True}, + ), + ManagementProfile( + name="v2 write second", + protocol=ManagementProfile.PROTOCOL_SNMP, + configuration={"version": 2, "write": True}, + ), + ManagementProfile( + name="v2 read", + protocol=ManagementProfile.PROTOCOL_SNMP, + configuration={"version": 2, "write": False}, + ), + ] + yield profiles