Skip to content

Commit

Permalink
Merge branch 'feature/snmpv3-synchronous' into snmpv3-synchronous
Browse files Browse the repository at this point in the history
  • Loading branch information
lunkwill42 committed Nov 9, 2023
2 parents ca5a45c + a613588 commit 6a919a6
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 13 deletions.
43 changes: 43 additions & 0 deletions python/nav/Snmp/defines.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
#
# Copyright (C) 2023 Sikt
#
# This file is part of Network Administration Visualized (NAV).
#
# NAV is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License version 3 as published by
# the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more details. You should have received a copy of the GNU General Public
# License along with NAV. If not, see <http://www.gnu.org/licenses/>.
#
"""Defines types and enumerations for SNMP communication parameters"""
from enum import Enum


class SecurityLevel(Enum):
"""SNMPv3 security levels"""

NO_AUTH_NO_PRIV = "noAuthNoPriv"
AUTH_NO_PRIV = "authNoPriv"
AUTH_PRIV = "authPriv"


class AuthenticationProtocol(Enum):
"""SNMPv3 authentication protocols supported by NET-SNMP"""

MD5 = "MD5"
SHA = "SHA"
SHA512 = "SHA-512"
SHA384 = "SHA-384"
SHA256 = "SHA-256"
SHA224 = "SHA-224"


class PrivacyProtocol(Enum):
"""SNMPv3 privacy protocols supported by NET-SNMP"""

DES = "DES"
AES = "AES"
4 changes: 4 additions & 0 deletions python/nav/Snmp/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,7 @@ class UnsupportedSnmpVersionError(SnmpError):

class NoSuchObjectError(SnmpError):
"""SNMP agent did not know of this object"""


class SNMPv3ConfigurationError(SnmpError):
"""Error in SNMPv3 configuration parameters"""
95 changes: 82 additions & 13 deletions python/nav/Snmp/pynetsnmp.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
c_ulong,
c_uint64,
)
from typing import Union, Optional

from IPy import IP
from pynetsnmp import netsnmp
Expand All @@ -48,12 +49,14 @@
)

from nav.oids import OID
from .defines import SecurityLevel, AuthenticationProtocol, PrivacyProtocol
from .errors import (
EndOfMibViewError,
NoSuchObjectError,
SnmpError,
TimeOutException,
UnsupportedSnmpVersionError,
SNMPv3ConfigurationError,
)

PDUVarbind = namedtuple("PDUVarbind", ['oid', 'type', 'value'])
Expand Down Expand Up @@ -85,30 +88,86 @@ class Snmp(object):
"""

def __init__(
self, host, community="public", version="1", port=161, retries=3, timeout=1
self,
host: str,
community: str = "public",
version: Union[str, int] = "1",
port: Union[str, int] = 161,
retries: int = 3,
timeout: int = 1,
# SNMPv3-only options
sec_level: Optional[SecurityLevel] = None,
auth_protocol: Optional[AuthenticationProtocol] = None,
sec_name: Optional[str] = None,
auth_password: Optional[str] = None,
priv_protocol: Optional[PrivacyProtocol] = None,
priv_password: Optional[str] = None,
):
"""Makes a new Snmp-object.
:param host: hostname or IP address
:param community: community (password), defaults to "public"
:param port: udp port number, defaults to "161"
:param sec_level: SNMPv3 security level
:param auth_protocol: SNMPv3 authentication protocol
:param sec_name: SNMPv3 securityName
:param auth_password: SNMPv3 authentication password
:param priv_protocol: SNMPv3 privacy protocol
:param priv_password: SNMPv3 privacy password
"""

self.handle = None
self.host = host
self.community = str(community)
self.version = str(version)
if self.version == '2':
self.version = '2c'
if self.version not in ('1', '2c'):
if self.version not in ('1', '2c', '3'):
raise UnsupportedSnmpVersionError(self.version)
self.port = int(port)
self.retries = retries
self.timeout = timeout

self.sec_level = SecurityLevel(sec_level) if sec_level else None
self.auth_protocol = (
AuthenticationProtocol(auth_protocol) if auth_protocol else None
)
self.sec_name = sec_name
self.auth_password = auth_password
self.priv_protocol = PrivacyProtocol(priv_protocol) if priv_protocol else None
self.priv_password = priv_password
if self.version == "3":
self._verify_snmpv3_params()

self.handle = _MySnmpSession(self._build_cmdline())
self.handle.open()

def _verify_snmpv3_params(self):
if not self.sec_level:
raise SNMPv3ConfigurationError("sec_level is required to be set")
if not self.sec_name:
raise SNMPv3ConfigurationError(
"sec_name is required regardless of security level"
)
if self.sec_level in (SecurityLevel.AUTH_NO_PRIV, SecurityLevel.AUTH_PRIV):
if not self.auth_protocol:
raise SNMPv3ConfigurationError(
f"{self.sec_level.value} requires auth_protocol to be set"
)
if not self.auth_password:
raise SNMPv3ConfigurationError(
f"{self.sec_level.value} requires auth_password to be set"
)
if self.sec_level == SecurityLevel.AUTH_PRIV:
if not self.priv_protocol:
raise SNMPv3ConfigurationError(
f"{self.sec_level.value} requires priv_protocol to be set"
)
if not self.priv_password:
raise SNMPv3ConfigurationError(
f"{self.sec_level.value} requires priv_password to be set"
)

def _build_cmdline(self):
try:
address = IP(self.host)
Expand All @@ -117,19 +176,29 @@ def _build_cmdline(self):
else:
host = 'udp6:[%s]' % self.host if address.version() == 6 else self.host

return (
'-v' + self.version,
'-c',
self.community,
'-r',
str(self.retries),
'-t',
str(self.timeout),
'%s:%s' % (host, self.port),
params = [f"-v{self.version}"]

if self.version in ("1", "2c"):
params.extend(["-c", self.community])
elif self.version == "3":
params.extend(["-l", self.sec_level.value, "-u", self.sec_name])
if self.auth_protocol:
params.extend(["-a", self.auth_protocol.value])
if self.auth_password:
params.extend(["-A", self.auth_password])
if self.priv_protocol:
params.extend(["-x", self.priv_protocol.value])
if self.priv_password:
params.extend(["-X", self.priv_password])

params.extend(
["-r", str(self.retries), "-t", str(self.timeout), f"{host}:{self.port}"]
)
return tuple(params)

def __del__(self):
self.handle.close()
if self.handle:
self.handle.close()

def get(self, query="1.3.6.1.2.1.1.1.0"):
"""Performs an SNMP GET query.
Expand Down

0 comments on commit 6a919a6

Please sign in to comment.