Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow write-enabled SNMP profiles to be used for reading when no read-only profiles are available #2751

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bin/naventity
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ def device(devicestring):
else:
netbox = netbox[0]

if not netbox.get_preferred_snmp_management_profile(writeable=False):
if not netbox.get_preferred_snmp_management_profile():
msg = "No SNMP management profile set for %s" % netbox
raise argparse.ArgumentTypeError(msg)

Expand Down
6 changes: 3 additions & 3 deletions python/nav/arnold.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ def disable(candidate, justification, username, comment="", autoenablestep=0):
)

if not candidate.interface.netbox.get_preferred_snmp_management_profile(
writeable=True
require_write=True
):
raise NoReadWriteManagementProfileError(candidate.interface.netbox)
identity = check_identity(candidate)
Expand All @@ -322,7 +322,7 @@ def quarantine(candidate, qvlan, justification, username, comment="", autoenable
)

if not candidate.interface.netbox.get_preferred_snmp_management_profile(
writeable=True
require_write=True
):
raise NoReadWriteManagementProfileError(candidate.interface.netbox)
identity = check_identity(candidate)
Expand Down Expand Up @@ -465,7 +465,7 @@ def change_port_status(

# Create snmp-object
netbox = identity.interface.netbox
profile = netbox.get_preferred_snmp_management_profile(writeable=True)
profile = netbox.get_preferred_snmp_management_profile(require_write=True)

if not profile:
raise NoReadWriteManagementProfileError
Expand Down
2 changes: 1 addition & 1 deletion python/nav/ipdevpoll/snmp/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ def factory(
"""
kwargs_out = {}
if netbox:
profile = netbox.get_preferred_snmp_management_profile(writeable=False)
profile = netbox.get_preferred_snmp_management_profile()
if profile:
if profile.protocol == profile.PROTOCOL_SNMPV3:
kwargs["version"] = 3
Expand Down
41 changes: 26 additions & 15 deletions python/nav/models/manage.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,10 +160,17 @@
def snmp_version(self):
"""Returns the configured SNMP version as an integer"""
if self.protocol == self.PROTOCOL_SNMP:
value = self.configuration['version']
value = self.configuration.get("version")
if value == "2c":
return 2
return int(value)
if value:
return int(value)
else:
_logger.error(

Check warning on line 169 in python/nav/models/manage.py

View check run for this annotation

Codecov / codecov/patch

python/nav/models/manage.py#L169

Added line #L169 was not covered by tests
"Broken management profile %s has no SNMP version", self.name
)
return None

Check warning on line 172 in python/nav/models/manage.py

View check run for this annotation

Codecov / codecov/patch

python/nav/models/manage.py#L172

Added line #L172 was not covered by tests

elif self.protocol == self.PROTOCOL_SNMPV3:
return 3

Expand Down Expand Up @@ -304,29 +311,33 @@
return chassis.device

def get_preferred_snmp_management_profile(
self, writeable=None
self, require_write=False
) -> Optional[ManagementProfile]:
"""
Returns the snmp management profile with the highest available
SNMP version.
"""Returns the snmp management profile with the highest available SNMP version.

:param require_write: If True, only write-enabled profiles will be
considered. If false, read-only profiles will be
preferred, unless a write-enabled profile is the only
available alternative.
"""
query = Q(
protocol__in=(
ManagementProfile.PROTOCOL_SNMP,
ManagementProfile.PROTOCOL_SNMPV3,
)
)
if writeable:
if require_write:
query = query & Q(configuration__write=True)
elif writeable is not None:
query = query & (
Q(configuration__write=False) | ~Q(configuration__has_key='write')

profiles = self.profiles.filter(query)

if not require_write:
# Sort read-only profiles first
profiles = sorted(
profiles, key=lambda p: p.configuration.get("write", False)
)
lunkwill42 marked this conversation as resolved.
Show resolved Hide resolved
profiles = sorted(
self.profiles.filter(query),
key=lambda p: p.snmp_version or 0,
reverse=True,
)

profiles = sorted(profiles, key=lambda p: p.snmp_version or 0, reverse=True)
if profiles:
return profiles[0]

Expand Down
8 changes: 5 additions & 3 deletions python/nav/portadmin/snmp/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@
def _get_read_only_handle(self):
"""Get a read only SNMP-handle."""
if self.read_only_handle is None:
profile = self.netbox.get_preferred_snmp_management_profile(writeable=False)
profile = self.netbox.get_preferred_snmp_management_profile()

if not profile:
raise NoReadOnlyManagementProfileError
Expand Down Expand Up @@ -201,7 +201,9 @@
:rtype: nav.Snmp.Snmp
"""
if self.read_write_handle is None:
profile = self.netbox.get_preferred_snmp_management_profile(writeable=True)
profile = self.netbox.get_preferred_snmp_management_profile(
require_write=True
)
self.read_write_handle = get_snmp_session_for_profile(profile)(
host=self.netbox.ip,
retries=self.retries,
Expand Down Expand Up @@ -563,7 +565,7 @@
raise ProtocolError("SNMP error") from error

def raise_if_not_configurable(self):
if not self.netbox.get_preferred_snmp_management_profile(writeable=True):
if not self.netbox.get_preferred_snmp_management_profile(require_write=True):

Check warning on line 568 in python/nav/portadmin/snmp/base.py

View check run for this annotation

Codecov / codecov/patch

python/nav/portadmin/snmp/base.py#L568

Added line #L568 was not covered by tests
raise DeviceNotConfigurableError(
"No writeable SNMP management profile set for this device, "
"changes cannot be saved"
Expand Down
7 changes: 4 additions & 3 deletions python/nav/web/portadmin/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,9 +214,10 @@
"%s did not respond within the set timeouts. Values displayed are from database"
% netbox.sysname,
)
if isinstance(
handler, SNMPHandler
) and not netbox.get_preferred_snmp_management_profile(writeable=False):
if (

Check warning on line 217 in python/nav/web/portadmin/views.py

View check run for this annotation

Codecov / codecov/patch

python/nav/web/portadmin/views.py#L217

Added line #L217 was not covered by tests
isinstance(handler, SNMPHandler)
and not netbox.get_preferred_snmp_management_profile()
):
messages.error(request, "Read only management profile not set")
except ProtocolError:
readonly = True
Expand Down
65 changes: 65 additions & 0 deletions tests/unittests/models/netbox_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
from unittest.mock import patch

from nav.models.manage import Netbox, ManagementProfile

import pytest


class TestGetPreferredSnmpProfiles:
def test_when_write_required_is_false_then_it_should_prefer_a_read_profile(
self, mocked_netbox
):
profile = mocked_netbox.get_preferred_snmp_management_profile()
assert profile.name == "v2 read"

def test_when_write_required_is_true_then_it_should_return_a_write_profile(
self, mocked_netbox
):
profile = mocked_netbox.get_preferred_snmp_management_profile(
require_write=True
)
assert profile.name.startswith("v2 write")

def test_when_write_required_is_false_it_should_return_a_writable_profile_of_a_higher_snmp_version(
self, mocked_netbox
):
profiles = mocked_netbox.profiles.filter()
profiles.append(
ManagementProfile(
name="v3 write",
protocol=ManagementProfile.PROTOCOL_SNMPV3,
configuration={"write": True},
)
)
profile = mocked_netbox.get_preferred_snmp_management_profile()
assert profile.name == "v3 write"


@pytest.fixture
def mocked_netbox(mock_profiles):
with patch.object(Netbox, "profiles") as profiles:
netbox = Netbox(sysname="test", ip="127.0.0.1")
profiles.filter.return_value = mock_profiles
yield netbox


@pytest.fixture
def mock_profiles():
profiles = [
ManagementProfile(
name="v2 write first",
protocol=ManagementProfile.PROTOCOL_SNMP,
configuration={"version": 2, "write": True},
),
ManagementProfile(
name="v2 write second",
protocol=ManagementProfile.PROTOCOL_SNMP,
configuration={"version": 2, "write": True},
),
ManagementProfile(
name="v2 read",
protocol=ManagementProfile.PROTOCOL_SNMP,
configuration={"version": 2, "write": False},
),
]
yield profiles
Loading