Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add SNMPv3 support to nav.Snmp.pynetsnmp implementation #2703

Merged
merged 5 commits into from
Nov 2, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions python/nav/Snmp/defines.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
#
# Copyright (C) 2023 Sikt
#
# This file is part of Network Administration Visualized (NAV).
#
# NAV is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License version 3 as published by
# the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more details. You should have received a copy of the GNU General Public
# License along with NAV. If not, see <http://www.gnu.org/licenses/>.
#
"""Defines types and enumerations for SNMP communication parameters"""
from enum import Enum


class SecurityLevel(Enum):
"""SNMPv3 security levels"""

NO_AUTH_NO_PRIV = "noAuthNoPriv"
AUTH_NO_PRIV = "authNoPriv"
AUTH_PRIV = "authPriv"


class AuthenticationProtocol(Enum):
"""SNMPv3 authentication protocols supported by NET-SNMP"""

MD5 = "MD5"
SHA = "SHA"
SHA512 = "SHA-512"
SHA384 = "SHA-384"
SHA256 = "SHA-256"
SHA224 = "SHA-224"


class PrivacyProtocol(Enum):
"""SNMPv3 privacy protocols supported by NET-SNMP"""

DES = "DES"
AES = "AES"
4 changes: 4 additions & 0 deletions python/nav/Snmp/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,7 @@ class UnsupportedSnmpVersionError(SnmpError):

class NoSuchObjectError(SnmpError):
"""SNMP agent did not know of this object"""


class SNMPv3ConfigurationError(SnmpError):
"""Error in SNMPv3 configuration parameters"""
90 changes: 79 additions & 11 deletions python/nav/Snmp/pynetsnmp.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
c_ulong,
c_uint64,
)
from typing import Union, Optional

from IPy import IP
from pynetsnmp import netsnmp
Expand All @@ -48,12 +49,14 @@
)

from nav.oids import OID
from .defines import SecurityLevel, AuthenticationProtocol, PrivacyProtocol
from .errors import (
EndOfMibViewError,
NoSuchObjectError,
SnmpError,
TimeOutException,
UnsupportedSnmpVersionError,
SNMPv3ConfigurationError,
)

PDUVarbind = namedtuple("PDUVarbind", ['oid', 'type', 'value'])
Expand Down Expand Up @@ -85,13 +88,32 @@
"""

def __init__(
self, host, community="public", version="1", port=161, retries=3, timeout=1
self,
host: str,
community: str = "public",
version: Union[str, int] = "1",
port: Union[str, int] = 161,
retries: int = 3,
timeout: int = 1,
# SNMPv3-only options
sec_level: Optional[SecurityLevel] = None,
auth_protocol: Optional[AuthenticationProtocol] = None,
sec_name: Optional[str] = None,
auth_password: Optional[str] = None,
priv_protocol: Optional[PrivacyProtocol] = None,
priv_password: Optional[str] = None,
Comment on lines +91 to +104
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see what you mean about the ugly __init__.

):
"""Makes a new Snmp-object.

:param host: hostname or IP address
:param community: community (password), defaults to "public"
:param port: udp port number, defaults to "161"
:param sec_level: SNMPv3 security level
:param auth_protocol: SNMPv3 authentication protocol
:param sec_name: SNMPv3 securityName
:param auth_password: SNMPv3 authentication password
:param priv_protocol: SNMPv3 privacy protocol
:param priv_password: SNMPv3 privacy password

"""

Expand All @@ -100,15 +122,52 @@
self.version = str(version)
if self.version == '2':
self.version = '2c'
if self.version not in ('1', '2c'):
if self.version not in ('1', '2c', '3'):
raise UnsupportedSnmpVersionError(self.version)
self.port = int(port)
self.retries = retries
self.timeout = timeout

self.sec_level = SecurityLevel(sec_level) if sec_level else None
self.auth_protocol = (
AuthenticationProtocol(auth_protocol) if auth_protocol else None
)
self.sec_name = sec_name
self.auth_password = auth_password
self.priv_protocol = PrivacyProtocol(priv_protocol) if priv_protocol else None
self.priv_password = priv_password
if self.version == "3":
self._verify_snmpv3_params()

Check warning on line 140 in python/nav/Snmp/pynetsnmp.py

View check run for this annotation

Codecov / codecov/patch

python/nav/Snmp/pynetsnmp.py#L140

Added line #L140 was not covered by tests

self.handle = _MySnmpSession(self._build_cmdline())
self.handle.open()

def _verify_snmpv3_params(self):
if not self.sec_level:
raise SNMPv3ConfigurationError("sec_level is required to be set")
if not self.sec_name:
raise SNMPv3ConfigurationError(

Check warning on line 149 in python/nav/Snmp/pynetsnmp.py

View check run for this annotation

Codecov / codecov/patch

python/nav/Snmp/pynetsnmp.py#L146-L149

Added lines #L146 - L149 were not covered by tests
"sec_name is required regardless of security level"
)
if self.sec_level in (SecurityLevel.AUTH_NO_PRIV, SecurityLevel.AUTH_PRIV):
if not self.auth_protocol:
raise SNMPv3ConfigurationError(

Check warning on line 154 in python/nav/Snmp/pynetsnmp.py

View check run for this annotation

Codecov / codecov/patch

python/nav/Snmp/pynetsnmp.py#L152-L154

Added lines #L152 - L154 were not covered by tests
f"{self.sec_level.value} requires auth_protocol to be set"
)
if not self.auth_password:
raise SNMPv3ConfigurationError(

Check warning on line 158 in python/nav/Snmp/pynetsnmp.py

View check run for this annotation

Codecov / codecov/patch

python/nav/Snmp/pynetsnmp.py#L157-L158

Added lines #L157 - L158 were not covered by tests
f"{self.sec_level.value} requires auth_password to be set"
)
if self.sec_level == SecurityLevel.AUTH_PRIV:
if not self.priv_protocol:
raise SNMPv3ConfigurationError(

Check warning on line 163 in python/nav/Snmp/pynetsnmp.py

View check run for this annotation

Codecov / codecov/patch

python/nav/Snmp/pynetsnmp.py#L161-L163

Added lines #L161 - L163 were not covered by tests
f"{self.sec_level.value} requires priv_protocol to be set"
)
if not self.priv_password:
raise SNMPv3ConfigurationError(

Check warning on line 167 in python/nav/Snmp/pynetsnmp.py

View check run for this annotation

Codecov / codecov/patch

python/nav/Snmp/pynetsnmp.py#L166-L167

Added lines #L166 - L167 were not covered by tests
f"{self.sec_level.value} requires priv_password to be set"
)

def _build_cmdline(self):
try:
address = IP(self.host)
Expand All @@ -117,16 +176,25 @@
else:
host = 'udp6:[%s]' % self.host if address.version() == 6 else self.host

return (
'-v' + self.version,
'-c',
self.community,
'-r',
str(self.retries),
'-t',
str(self.timeout),
'%s:%s' % (host, self.port),
params = [f"-v{self.version}"]

if self.version in ("1", "2c"):
params.extend(["-c", self.community])
elif self.version == "3":
params.extend(["-l", self.sec_level.value, "-u", self.sec_name])
if self.auth_protocol:
params.extend(["-a", self.auth_protocol.value])
if self.auth_password:
params.extend(["-A", self.auth_password])
if self.priv_protocol:
params.extend(["-x", self.priv_protocol.value])
if self.priv_password:
params.extend(["-X", self.priv_password])

Check warning on line 192 in python/nav/Snmp/pynetsnmp.py

View check run for this annotation

Codecov / codecov/patch

python/nav/Snmp/pynetsnmp.py#L183-L192

Added lines #L183 - L192 were not covered by tests

params.extend(
["-r", str(self.retries), "-t", str(self.timeout), f"{host}:{self.port}"]
)
return tuple(params)

def __del__(self):
self.handle.close()
Expand Down
Loading