From 606612c427c0c80c9e787e1023a3859dc358e04e Mon Sep 17 00:00:00 2001 From: Federico87 <15066806+lvrfrc87@users.noreply.github.com> Date: Tue, 1 Mar 2022 15:21:06 +0100 Subject: [PATCH] Ver 0.1.0 release (#1) * add basic draft * add append fo icmp and any * draft tests * working test for normalizaer * add utils * add validate unit tests * fix tests and add test or duplicates * add is_tcp and is_upp * finish implementation. Work on lints * add tasks tests * work on pypi package * fix repeated separator case * updating return from methods * Ver 0.1.0 release --- .bandit.yml | 4 + .flake8 | 2 + CHANGELOG.rst | 3 + Dockerfile | 12 +++ README.md | 81 ++++++++++++++- netprot/__init__.py | 0 netprot/netprot.py | 197 +++++++++++++++++++++++++++++++++++++ pyproject.toml | 80 +++++++++++++++ setup.py | 13 +++ tasks.py | 157 +++++++++++++++++++++++++++++ tests/conftest.py | 22 +++++ tests/unit/test_netprot.py | 116 ++++++++++++++++++++++ 12 files changed, 686 insertions(+), 1 deletion(-) create mode 100644 .bandit.yml create mode 100644 .flake8 create mode 100644 CHANGELOG.rst create mode 100644 Dockerfile create mode 100644 netprot/__init__.py create mode 100644 netprot/netprot.py create mode 100644 pyproject.toml create mode 100644 setup.py create mode 100644 tasks.py create mode 100644 tests/conftest.py create mode 100644 tests/unit/test_netprot.py diff --git a/.bandit.yml b/.bandit.yml new file mode 100644 index 0000000..1bfcaf8 --- /dev/null +++ b/.bandit.yml @@ -0,0 +1,4 @@ +--- +skips: [] +exclude_dirs: + - "./tests/" diff --git a/.flake8 b/.flake8 new file mode 100644 index 0000000..7550949 --- /dev/null +++ b/.flake8 @@ -0,0 +1,2 @@ +[flake8] +ignore = E501, W503, E402 diff --git a/CHANGELOG.rst b/CHANGELOG.rst new file mode 100644 index 0000000..4c6b878 --- /dev/null +++ b/CHANGELOG.rst @@ -0,0 +1,3 @@ +# Ver. 0.1.0 + +- Initial release \ No newline at end of file diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..25430fe --- /dev/null +++ b/Dockerfile @@ -0,0 +1,12 @@ +ARG PYTHON_VER + +FROM python:${PYTHON_VER}-slim + +RUN pip install --upgrade pip \ + && pip install poetry + +WORKDIR /local +COPY pyproject.toml /local + +RUN poetry config virtualenvs.create false \ + && poetry install --no-interaction --no-ansi diff --git a/README.md b/README.md index 47b06cf..58da14f 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,81 @@ # netprot -A system-independent network protocol manipulation library. +A system-indipendent network protocol manipulation and evaluation library. +`netprod` wants to be a library capable standardize and evaluate list of strings rappresenting Network Protocols. The idea is to provide a tool similar to `netaddr` that can help to enhance and simplify code logic wherever is required. + +### Installation + +```bash +pip3 install netprod +``` + +Package available [here](https://pypi.org/project/netprot/) + +### HOW TO + +First thing, we need to initialize an instance of `Netprod` class, passing as arguments a list of string - where each string should rappresent a network protocol and corresponding port. `separator` argument is also possible to pass it as kwarg and will be used to standardize our strings. By default, `separator` is equal to `/` + +```python +>>> from netprot.netprot import Netprot +>>> my_list = ['tcp-443-https', 'UDP/53', 'ICMP', 'any', 'tcp/1024-1026', 'TCPP-80', 'tcp/443'] +>>> my_protocols = Netprot(my_list, separator='/') +``` + +Once the instance of the class is created, we can call `standardize` method which will return a tuple containing pontential unlegal protocols and ports, duplicates - if any, and a standardize list of protocols and port. + +```python +>>> my_protocols.standardize() +(['TCPP/80'], ['TCP/443'], ['ANY', 'ICMP', 'TCP/1024', 'TCP/1025', 'TCP/1026', 'TCP/443', 'UDP/53']) +``` + +As we can see, we have: + +- Strings using the same `separator`. +- Trailing words such as `https` is removed as not needed +- Protocols defined as `tcp/1024-1026` are unpacked for each port in range defined +- Unlegal protocosl such as `TCPP/80` are removed +- Duplicates are also removed +- All strings are upper cases +- List is sorted +- `ICMP` and `ANY` are recognized as legal tring and paased through + + +`Netprod` not only standardize data, but also evaluate them. Let's have a look to the other methods + +:warning: +List of protocols must be standardized first. + +Let's check if the ports are part of well known range of ports (0 to 1024) + +```python +>>> my_protocols.is_well_known() +(False, [False, False, True, False, False, True, True]) +``` + +As we can see, some ports are failing to be lower than 1024, hence we return `False` plus a list of bools for each ports. + +What about if we want to find those are `TCP`... + +```python +>>> my_protocols.is_tcp() +(False, [False, False, True, True, True, True, False]) +``` + +... or `UDP`? +```python +>>> my_protocols.is_udp() +(False, [False, False, False, False, False, False, True]) +``` + +Great! What if we want figure out if our port and protocols are safe or not? +Let's define a list of safe - or unsafe - ports and protocols and paased them to `is_safe` or `is_unsafe` method. + +```python +>>> my_safe_applications = ['TCP/443', 'UDP/53'] +>>> my_protocols.is_safe(my_safe_applications) +[False, False, False, False, False, True, True] +>>> my_unsafe_applications = ['ICMP', 'ANY'] +>>> my_protocols.is_unsafe(my_unsafe_applications) +[True, True, False, False, False, False, False] +``` + +And that's all, folks! \ No newline at end of file diff --git a/netprot/__init__.py b/netprot/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/netprot/netprot.py b/netprot/netprot.py new file mode 100644 index 0000000..c8a062a --- /dev/null +++ b/netprot/netprot.py @@ -0,0 +1,197 @@ +#!/usr/bin/env python3 +"""Netprot: library for network protocols normalization and evaluation.""" +import re +from collections import Counter + + +class Netprot: + """Netprot Class implementation.""" + + def __init__(self, protocols, separator="/"): + """__init__ method.""" + # Validate protocols and protocols element data type. + if not isinstance(protocols, list) and not any(isinstance(element, str) for element in protocols): + raise TypeError( + """Protocols must be a list of strings. + i.e --> ['TCP/443', 'UDP/53']""" + ) + self.protocols = protocols + # Validate separator data type. + if separator and not isinstance(separator, str): + raise TypeError("Separator must be of type string. i.e. --> '/'") + + self.separator = separator + + @staticmethod + def _cosmetic(egress_list): + """Standardize returned list.""" + standardize_protocols = [protocol.upper() for protocol in egress_list] + standardize_protocols.sort() + + return standardize_protocols + + @staticmethod + def _cleaner(inggress_list): + """Standardize argument list.""" + return [protocol.lower().lstrip().rstrip() for protocol in inggress_list] + + @staticmethod + def _find_duplicates(string): + elements = Counter(string) + return [key for key, value in elements.items() if value > 1] + + def standardize(self): + """Standardize list fo protocosl. Run normalize(), validate() and remove_duplicates().""" + + def normalize(): + """Normalize list of strings containing protocol and port.""" + normalized_protocols = list() + for protocol in self._cleaner(self.protocols): + # https://regex101.com/r/DyKeqr/1 + result = re.search(r"\b([a-z]+)(\W|_)(\d+)(.)?(\d+)?(\w+)?", protocol) + if result: + # replace whatever separator with self.separator + protocol = protocol.replace(result.group(2), self.separator, 1) + + splitted_protocol = protocol.split(self.separator) + if len(splitted_protocol) > 1: + # Expand TCP/1024-1026 --> TCP/1024, TCP/1025, TCP/1026 + if result.group(4) and result.group(5): + try: + start = int(result.group(3)) + end = int(result.group(5)) + 1 + for port in range(start, end): + normalized_protocols.append(f"{protocol[0:3]}/{port}") + except ValueError: + continue + # Normalize TCP/443-HTTPS --> TCP/443 + elif result.group(4) and result.group(6): + # Case where self.separator is used twice TCP/443/HTTPS + if self.separator in self._find_duplicates(protocol): + normalized_protocols.append( + f"{self.separator}".join(protocol.split(self.separator)[0:2]) + ) + else: + index = protocol.index(result.group(4)) + normalized_protocols.append(protocol[:index]) + else: + normalized_protocols.append(protocol) + # catch 'icmp' and 'any' + else: + normalized_protocols.append(protocol) + self.protocols = self._cosmetic(normalized_protocols) + return True + + def validate(): + """Validate list of normalized protocols and ports.""" + invalid_services = list() + protocols = self._cleaner(self.protocols) + for protocol in protocols: + if protocol not in ("icmp", "any"): + if protocol[3] == self.separator: + splitted_protocol = protocol.split(self.separator) + if splitted_protocol[0] not in ("tcp", "udp") and not 0 > int(splitted_protocol[1]) > 65535: + invalid_services.append(protocol) + else: + invalid_services.append(protocol) + + if invalid_services: + for service in invalid_services: + protocols.remove(service) + self.protocols = self._cosmetic(protocols) + return self._cosmetic(invalid_services) + + self.protocols = self._cosmetic(protocols) + return list() + + def remove_duplicates(): + """Remove duplicated elements.""" + setted_protocols = set() + duplicates = [ + element for element in self.protocols if element in setted_protocols or setted_protocols.add(element) + ] + + self.protocols = self._cosmetic(list(setted_protocols)) + + if duplicates: + return duplicates + return list() + + if normalize(): + return (validate(), remove_duplicates(), self.protocols) + + def is_well_known(self): + """Evaluate port if lower than 1024.""" + is_well_known = list() + + for protocol in self._cleaner(self.protocols): + if protocol not in ("icmp", "any"): + port_number = int(protocol.split(self.separator)[-1]) + if port_number <= 1024: + is_well_known.append(True) + else: + is_well_known.append(False) + else: + is_well_known.append(False) + + if all(is_well_known): + return (True, is_well_known) + return (False, is_well_known) + + def is_tcp(self): + """Evaluate protocol if TCP.""" + is_tcp = list() + + for protocol in self._cleaner(self.protocols): + if protocol not in ("icmp", "any"): + prot = protocol.split(self.separator)[0] + if prot == "tcp": + is_tcp.append(True) + else: + is_tcp.append(False) + else: + is_tcp.append(False) + + if all(is_tcp): + return (True, is_tcp) + return (False, is_tcp) + + def is_udp(self): + """Evaluate protocol if UDP.""" + id_udp = list() + + for protocol in self._cleaner(self.protocols): + if protocol not in ("icmp", "any"): + prot = protocol.split(self.separator)[0] + if prot == "udp": + id_udp.append(True) + else: + id_udp.append(False) + else: + id_udp.append(False) + + if all(id_udp): + return (True, id_udp) + return (False, id_udp) + + def is_safe(self, safe_list): + """Evaluate port if is safe.""" + result = list() + for element in self.protocols: + if element not in safe_list: + result.append(False) + else: + result.append(True) + + return result + + def is_unsafe(self, unsafe_list): + """Evaluate port if is not safe.""" + result = list() + for element in self.protocols: + if element in unsafe_list: + result.append(True) + else: + result.append(False) + + return result diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..f4eb524 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,80 @@ +[tool.poetry] +name = "netprot" +version = "0.1.0" +description = "A system-indipendent network protocol manipulation and evaluation library." +authors = ["Federico Olivieri "] + +[tool.poetry.dependencies] +python = "^3.7" + +[tool.poetry.dev-dependencies] +pytest = "*" +requests_mock = "*" +black = "*" +pylint = "*" +pydocstyle = "*" +bandit = "*" +invoke = "*" +toml = "*" +flake8 = "*" + + +[tool.black] +line-length = 120 +target-version = ['py37'] +include = '\.pyi?$' +exclude = ''' +( + /( + \.eggs + | \.git + | \.hg + | \.mypy_cache + | \.tox + | \.venv + | _build + | buck-out + | build + | dist + )/ + | settings.py + +) +''' + +[tool.pylint.basic] +no-docstring-rgx="^(_|test_|Meta$)" + +[tool.pylint.messages_control] +disable = """, + use-dict-literal, + use-list-literal, + line-too-long, + bad-continuation, + E5110, + too-many-nested-blocks, + """ + +[tool.pylint.miscellaneous] +notes = """, + FIXME, + XXX, + """ + + +[tool.pydocstyle] +convention = "google" +inherit = false +match = "(?!__init__).*\\.py" +match-dir = "(?!tests|migrations|development)[^\\.].*" +add_ignore = "D212" + +[build-system] +requires = ["poetry>=0.12"] +build-backend = "poetry.masonry.api" + +[tool.pytest.ini_options] +testpaths = [ + "tests" +] +addopts = "-vv --doctest-modules" diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..c75b706 --- /dev/null +++ b/setup.py @@ -0,0 +1,13 @@ +#!/usr/bin/env python3 + +from distutils.core import setup + +setup( + name="netprot", + version="0.1.0", + description="A system-indipendent network protocol manipulation and evaluation library.", + author="Federico Olivieri", + author_email="lvrfrc87@mail.com", + url="https://github.com/lvrfrc87/netprot", + packages=["netprot"], +) diff --git a/tasks.py b/tasks.py new file mode 100644 index 0000000..88fc721 --- /dev/null +++ b/tasks.py @@ -0,0 +1,157 @@ +"""Tasks for use with Invoke.""" +import os +import sys +from distutils.util import strtobool +from invoke import task + +try: + import toml +except ImportError: + sys.exit("Please make sure to `pip install toml` or enable the Poetry shell and run `poetry install`.") + + +def is_truthy(arg): + """Convert "truthy" strings into Booleans. + Examples: + >>> is_truthy('yes') + True + Args: + arg (str): Truthy string (True values are y, yes, t, true, on and 1; false values are n, no, + f, false, off and 0. Raises ValueError if val is anything else. + """ + if isinstance(arg, bool): + return arg + return bool(strtobool(arg)) + + +PYPROJECT_CONFIG = toml.load("pyproject.toml") +TOOL_CONFIG = PYPROJECT_CONFIG["tool"]["poetry"] + +# Can be set to a separate Python version to be used for launching or building image +PYTHON_VER = os.getenv("PYTHON_VER", "3.7") +# Name of the docker image/image +IMAGE_NAME = os.getenv("IMAGE_NAME", TOOL_CONFIG["name"]) +# Tag for the image +IMAGE_VER = os.getenv("IMAGE_VER", f"{TOOL_CONFIG['version']}-py{PYTHON_VER}") +# Gather current working directory for Docker commands +PWD = os.getcwd() +# Local or Docker execution provide "local" to run locally without docker execution +INVOKE_LOCAL = is_truthy(os.getenv("INVOKE_LOCAL", False)) # pylint: disable=W1508 + + +def run_cmd(context, exec_cmd, local=INVOKE_LOCAL): + """Wrapper to run the invoke task commands. + Args: + context ([invoke.task]): Invoke task object. + exec_cmd ([str]): Command to run. + local (bool): Define as `True` to execute locally + Returns: + result (obj): Contains Invoke result from running task. + """ + if is_truthy(local): + print(f"LOCAL - Running command {exec_cmd}") + result = context.run(exec_cmd, pty=True) + else: + print(f"DOCKER - Running command: {exec_cmd} container: {IMAGE_NAME}:{IMAGE_VER}") + result = context.run(f"docker run -it -v {PWD}:/local {IMAGE_NAME}:{IMAGE_VER} sh -c '{exec_cmd}'", pty=True) + + return result + + +@task( + help={ + "cache": "Whether to use Docker's cache when building images (default enabled)", + "force_rm": "Always remove intermediate images", + "hide": "Suppress output from Docker", + } +) +def build(context, cache=True, force_rm=False, hide=False): + """Build a Docker image.""" + print(f"Building image {IMAGE_NAME}:{IMAGE_VER}") + command = f"docker build --tag {IMAGE_NAME}:{IMAGE_VER} --build-arg PYTHON_VER={PYTHON_VER} -f Dockerfile ." + + if not cache: + command += " --no-cache" + if force_rm: + command += " --force-rm" + + result = context.run(command, hide=hide) + if result.exited != 0: + print(f"Failed to build image {IMAGE_NAME}:{IMAGE_VER}\nError: {result.stderr}") + + +@task +def clean(context): + """Remove the project specific image.""" + print(f"Attempting to forcefully remove image {IMAGE_NAME}:{IMAGE_VER}") + context.run(f"docker rmi {IMAGE_NAME}:{IMAGE_VER} --force") + print(f"Successfully removed image {IMAGE_NAME}:{IMAGE_VER}") + + +@task +def rebuild(context): + """Clean the Docker image and then rebuild without using cache.""" + clean(context) + build(context, cache=False) + + +@task(help={"local": "Run locally or within the Docker container"}) +def pytest(context, local=INVOKE_LOCAL): + """Run pytest test cases.""" + exec_cmd = "pytest" + run_cmd(context, exec_cmd, local) + + +@task(help={"local": "Run locally or within the Docker container"}) +def black(context, path=".", local=INVOKE_LOCAL): + """Run black to check that Python files adherence to black standards.""" + exec_cmd = f"black {path}" + run_cmd(context, exec_cmd, local) + + +@task(help={"local": "Run locally or within the Docker container"}) +def flake8(context, path=".", local=INVOKE_LOCAL): + """Run flake8 code analysis.""" + exec_cmd = f"flake8 {path}" + run_cmd(context, exec_cmd, local) + + +@task(help={"local": "Run locally or within the Docker container"}) +def pylint(context, path=".", local=INVOKE_LOCAL): + """Run pylint code analysis.""" + exec_cmd = f'find {path} -name "*.py" | xargs pylint' + run_cmd(context, exec_cmd, local) + + +@task(help={"local": "Run locally or within the Docker container"}) +def pydocstyle(context, path=".", local=INVOKE_LOCAL): + """Run pydocstyle to validate docstring formatting adheres to NTC defined standards.""" + exec_cmd = f"pydocstyle {path}" + run_cmd(context, exec_cmd, local) + + +@task(help={"local": "Run locally or within the Docker container"}) +def bandit(context, path=".", local=INVOKE_LOCAL): + """Run bandit to validate basic static code security analysis.""" + exec_cmd = f"bandit --recursive ./{path} --configfile .bandit.yml" + run_cmd(context, exec_cmd, local) + + +@task +def cli(context): + """Enter the image to perform troubleshooting or dev work.""" + dev = f"docker run -it -v {PWD}:/local {IMAGE_NAME}:{IMAGE_VER} /bin/bash" + context.run(f"{dev}", pty=True) + + +@task(help={"local": "Run locally or within the Docker container"}) +def tests(context, path=".", local=INVOKE_LOCAL): + """Run all tests for this repository.""" + black(context, path, local) + flake8(context, path, local) + pylint(context, path, local) + pydocstyle(context, path, local) + bandit(context, path, local) + pytest(context, local) + + print("All tests have passed!") diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..a7aebc5 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,22 @@ +import pytest + + +@pytest.fixture +def mock(): + mock_data = [ + "TCP-443", + "UDPC-5353", + "udp-53", + "UDP=53", + "TCP-443", + "UDP%65535", + "TCP/65636", + "tcpudp$443-HTTPS", + "tcp-1024-1026", + "ICMP-1223-1224", + "TCP/443-HTTPS", + "icmp", + "ANY", + "TCP-443/HTTPS", + ] + return mock_data diff --git a/tests/unit/test_netprot.py b/tests/unit/test_netprot.py new file mode 100644 index 0000000..a1710b7 --- /dev/null +++ b/tests/unit/test_netprot.py @@ -0,0 +1,116 @@ +import os +import sys +import pytest + +sys.path.append(f"{os.path.abspath(os.getcwd())}/netprot") + +from netprot import Netprot + +expected_output = ( + ["ICM/1223", "ICM/1224", "TCPUDP/443", "UDPC/5353"], + ["TCP/443", "TCP/443", "TCP/443", "UDP/53"], + [ + "ANY", + "ICMP", + "TCP/1024", + "TCP/1025", + "TCP/1026", + "TCP/443", + "TCP/65636", + "UDP/53", + "UDP/65535", + ], +) +@pytest.mark.parametrize("expected_output", [expected_output]) +def test_evaluate_standardize(expected_output, mock): + test = Netprot(mock) + assert test.standardize() == expected_output[0:3] + + +expected_output = ( + False, + [False, False, True, False, False, True, False, True, False], + [ + "ANY", + "ICMP", + "TCP/1024", + "TCP/1025", + "TCP/1026", + "TCP/443", + "TCP/65636", + "UDP/53", + "UDP/65535", + ], +) +@pytest.mark.parametrize("expected_output", [expected_output]) +def test_evaluate_is_well_known(expected_output, mock): + test = Netprot(mock) + test.standardize() + assert test.is_well_known() == expected_output[0:2] + assert test.protocols == expected_output[-1] + + +expected_output = ( + False, + [False, False, True, True, True, True, True, False, False], + [ + "ANY", + "ICMP", + "TCP/1024", + "TCP/1025", + "TCP/1026", + "TCP/443", + "TCP/65636", + "UDP/53", + "UDP/65535", + ], +) +@pytest.mark.parametrize("expected_output", [expected_output]) +def test_evaluate_is_tcp(expected_output, mock): + test = Netprot(mock) + test.standardize() + assert test.is_tcp() == expected_output[0:2] + assert test.protocols == expected_output[-1] + + +expected_output = ( + False, + [False, False, False, False, False, False, False, True, True], + [ + "ANY", + "ICMP", + "TCP/1024", + "TCP/1025", + "TCP/1026", + "TCP/443", + "TCP/65636", + "UDP/53", + "UDP/65535", + ], +) +@pytest.mark.parametrize("expected_output", [expected_output]) +def test_evaluate_is_udp(expected_output, mock): + test = Netprot(mock) + test.standardize() + assert test.is_udp() == expected_output[0:2] + assert test.protocols == expected_output[-1] + + +expected_output = ( + [False, False, False, False, False, True, False, False, False] +) +@pytest.mark.parametrize("expected_output", [expected_output]) +def test_evaluate_is_safe(expected_output, mock): + test = Netprot(mock) + test.standardize() + assert test.is_safe(safe_list=["TCP/443", "TCP/22"]) == expected_output + + +expected_output = ( + [False, False, False, False, False, False, False, False, False] +) +@pytest.mark.parametrize("expected_output", [expected_output]) +def test_evaluate_is_unsafe(expected_output, mock): + test = Netprot(mock) + test.standardize() + assert test.is_unsafe(unsafe_list=["TCP/53"]) == expected_output