Skip to content

Commit

Permalink
Ver 0.1.0 release (#1)
Browse files Browse the repository at this point in the history
* add basic draft

* add append fo icmp and any

* draft tests

* working test for normalizaer

* add utils

* add validate unit tests

* fix tests and add test or duplicates

* add is_tcp and is_upp

* finish implementation. Work on lints

* add tasks tests

* work on pypi package

* fix repeated separator case

* updating return from methods

* Ver 0.1.0 release
  • Loading branch information
lvrfrc87 authored Mar 1, 2022
1 parent b6a5f30 commit 606612c
Show file tree
Hide file tree
Showing 12 changed files with 686 additions and 1 deletion.
4 changes: 4 additions & 0 deletions .bandit.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
---
skips: []
exclude_dirs:
- "./tests/"
2 changes: 2 additions & 0 deletions .flake8
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[flake8]
ignore = E501, W503, E402
3 changes: 3 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Ver. 0.1.0

- Initial release
12 changes: 12 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
ARG PYTHON_VER

FROM python:${PYTHON_VER}-slim

RUN pip install --upgrade pip \
&& pip install poetry

WORKDIR /local
COPY pyproject.toml /local

RUN poetry config virtualenvs.create false \
&& poetry install --no-interaction --no-ansi
81 changes: 80 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,81 @@
# netprot
A system-independent network protocol manipulation library.
A system-indipendent network protocol manipulation and evaluation library.
`netprod` wants to be a library capable standardize and evaluate list of strings rappresenting Network Protocols. The idea is to provide a tool similar to `netaddr` that can help to enhance and simplify code logic wherever is required.

### Installation

```bash
pip3 install netprod
```

Package available [here](https://pypi.org/project/netprot/)

### HOW TO

First thing, we need to initialize an instance of `Netprod` class, passing as arguments a list of string - where each string should rappresent a network protocol and corresponding port. `separator` argument is also possible to pass it as kwarg and will be used to standardize our strings. By default, `separator` is equal to `/`

```python
>>> from netprot.netprot import Netprot
>>> my_list = ['tcp-443-https', 'UDP/53', 'ICMP', 'any', 'tcp/1024-1026', 'TCPP-80', 'tcp/443']
>>> my_protocols = Netprot(my_list, separator='/')
```

Once the instance of the class is created, we can call `standardize` method which will return a tuple containing pontential unlegal protocols and ports, duplicates - if any, and a standardize list of protocols and port.

```python
>>> my_protocols.standardize()
(['TCPP/80'], ['TCP/443'], ['ANY', 'ICMP', 'TCP/1024', 'TCP/1025', 'TCP/1026', 'TCP/443', 'UDP/53'])
```

As we can see, we have:

- Strings using the same `separator`.
- Trailing words such as `https` is removed as not needed
- Protocols defined as `tcp/1024-1026` are unpacked for each port in range defined
- Unlegal protocosl such as `TCPP/80` are removed
- Duplicates are also removed
- All strings are upper cases
- List is sorted
- `ICMP` and `ANY` are recognized as legal tring and paased through


`Netprod` not only standardize data, but also evaluate them. Let's have a look to the other methods

:warning:
List of protocols must be standardized first.

Let's check if the ports are part of well known range of ports (0 to 1024)

```python
>>> my_protocols.is_well_known()
(False, [False, False, True, False, False, True, True])
```

As we can see, some ports are failing to be lower than 1024, hence we return `False` plus a list of bools for each ports.

What about if we want to find those are `TCP`...

```python
>>> my_protocols.is_tcp()
(False, [False, False, True, True, True, True, False])
```

... or `UDP`?
```python
>>> my_protocols.is_udp()
(False, [False, False, False, False, False, False, True])
```

Great! What if we want figure out if our port and protocols are safe or not?
Let's define a list of safe - or unsafe - ports and protocols and paased them to `is_safe` or `is_unsafe` method.

```python
>>> my_safe_applications = ['TCP/443', 'UDP/53']
>>> my_protocols.is_safe(my_safe_applications)
[False, False, False, False, False, True, True]
>>> my_unsafe_applications = ['ICMP', 'ANY']
>>> my_protocols.is_unsafe(my_unsafe_applications)
[True, True, False, False, False, False, False]
```

And that's all, folks!
Empty file added netprot/__init__.py
Empty file.
197 changes: 197 additions & 0 deletions netprot/netprot.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
#!/usr/bin/env python3
"""Netprot: library for network protocols normalization and evaluation."""
import re
from collections import Counter


class Netprot:
"""Netprot Class implementation."""

def __init__(self, protocols, separator="/"):
"""__init__ method."""
# Validate protocols and protocols element data type.
if not isinstance(protocols, list) and not any(isinstance(element, str) for element in protocols):
raise TypeError(
"""Protocols must be a list of strings.
i.e --> ['TCP/443', 'UDP/53']"""
)
self.protocols = protocols
# Validate separator data type.
if separator and not isinstance(separator, str):
raise TypeError("Separator must be of type string. i.e. --> '/'")

self.separator = separator

@staticmethod
def _cosmetic(egress_list):
"""Standardize returned list."""
standardize_protocols = [protocol.upper() for protocol in egress_list]
standardize_protocols.sort()

return standardize_protocols

@staticmethod
def _cleaner(inggress_list):
"""Standardize argument list."""
return [protocol.lower().lstrip().rstrip() for protocol in inggress_list]

@staticmethod
def _find_duplicates(string):
elements = Counter(string)
return [key for key, value in elements.items() if value > 1]

def standardize(self):
"""Standardize list fo protocosl. Run normalize(), validate() and remove_duplicates()."""

def normalize():
"""Normalize list of strings containing protocol and port."""
normalized_protocols = list()
for protocol in self._cleaner(self.protocols):
# https://regex101.com/r/DyKeqr/1
result = re.search(r"\b([a-z]+)(\W|_)(\d+)(.)?(\d+)?(\w+)?", protocol)
if result:
# replace whatever separator with self.separator
protocol = protocol.replace(result.group(2), self.separator, 1)

splitted_protocol = protocol.split(self.separator)
if len(splitted_protocol) > 1:
# Expand TCP/1024-1026 --> TCP/1024, TCP/1025, TCP/1026
if result.group(4) and result.group(5):
try:
start = int(result.group(3))
end = int(result.group(5)) + 1
for port in range(start, end):
normalized_protocols.append(f"{protocol[0:3]}/{port}")
except ValueError:
continue
# Normalize TCP/443-HTTPS --> TCP/443
elif result.group(4) and result.group(6):
# Case where self.separator is used twice TCP/443/HTTPS
if self.separator in self._find_duplicates(protocol):
normalized_protocols.append(
f"{self.separator}".join(protocol.split(self.separator)[0:2])
)
else:
index = protocol.index(result.group(4))
normalized_protocols.append(protocol[:index])
else:
normalized_protocols.append(protocol)
# catch 'icmp' and 'any'
else:
normalized_protocols.append(protocol)
self.protocols = self._cosmetic(normalized_protocols)
return True

def validate():
"""Validate list of normalized protocols and ports."""
invalid_services = list()
protocols = self._cleaner(self.protocols)
for protocol in protocols:
if protocol not in ("icmp", "any"):
if protocol[3] == self.separator:
splitted_protocol = protocol.split(self.separator)
if splitted_protocol[0] not in ("tcp", "udp") and not 0 > int(splitted_protocol[1]) > 65535:
invalid_services.append(protocol)
else:
invalid_services.append(protocol)

if invalid_services:
for service in invalid_services:
protocols.remove(service)
self.protocols = self._cosmetic(protocols)
return self._cosmetic(invalid_services)

self.protocols = self._cosmetic(protocols)
return list()

def remove_duplicates():
"""Remove duplicated elements."""
setted_protocols = set()
duplicates = [
element for element in self.protocols if element in setted_protocols or setted_protocols.add(element)
]

self.protocols = self._cosmetic(list(setted_protocols))

if duplicates:
return duplicates
return list()

if normalize():
return (validate(), remove_duplicates(), self.protocols)

def is_well_known(self):
"""Evaluate port if lower than 1024."""
is_well_known = list()

for protocol in self._cleaner(self.protocols):
if protocol not in ("icmp", "any"):
port_number = int(protocol.split(self.separator)[-1])
if port_number <= 1024:
is_well_known.append(True)
else:
is_well_known.append(False)
else:
is_well_known.append(False)

if all(is_well_known):
return (True, is_well_known)
return (False, is_well_known)

def is_tcp(self):
"""Evaluate protocol if TCP."""
is_tcp = list()

for protocol in self._cleaner(self.protocols):
if protocol not in ("icmp", "any"):
prot = protocol.split(self.separator)[0]
if prot == "tcp":
is_tcp.append(True)
else:
is_tcp.append(False)
else:
is_tcp.append(False)

if all(is_tcp):
return (True, is_tcp)
return (False, is_tcp)

def is_udp(self):
"""Evaluate protocol if UDP."""
id_udp = list()

for protocol in self._cleaner(self.protocols):
if protocol not in ("icmp", "any"):
prot = protocol.split(self.separator)[0]
if prot == "udp":
id_udp.append(True)
else:
id_udp.append(False)
else:
id_udp.append(False)

if all(id_udp):
return (True, id_udp)
return (False, id_udp)

def is_safe(self, safe_list):
"""Evaluate port if is safe."""
result = list()
for element in self.protocols:
if element not in safe_list:
result.append(False)
else:
result.append(True)

return result

def is_unsafe(self, unsafe_list):
"""Evaluate port if is not safe."""
result = list()
for element in self.protocols:
if element in unsafe_list:
result.append(True)
else:
result.append(False)

return result
80 changes: 80 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
[tool.poetry]
name = "netprot"
version = "0.1.0"
description = "A system-indipendent network protocol manipulation and evaluation library."
authors = ["Federico Olivieri <[email protected]>"]

[tool.poetry.dependencies]
python = "^3.7"

[tool.poetry.dev-dependencies]
pytest = "*"
requests_mock = "*"
black = "*"
pylint = "*"
pydocstyle = "*"
bandit = "*"
invoke = "*"
toml = "*"
flake8 = "*"


[tool.black]
line-length = 120
target-version = ['py37']
include = '\.pyi?$'
exclude = '''
(
/(
\.eggs
| \.git
| \.hg
| \.mypy_cache
| \.tox
| \.venv
| _build
| buck-out
| build
| dist
)/
| settings.py
)
'''

[tool.pylint.basic]
no-docstring-rgx="^(_|test_|Meta$)"

[tool.pylint.messages_control]
disable = """,
use-dict-literal,
use-list-literal,
line-too-long,
bad-continuation,
E5110,
too-many-nested-blocks,
"""

[tool.pylint.miscellaneous]
notes = """,
FIXME,
XXX,
"""


[tool.pydocstyle]
convention = "google"
inherit = false
match = "(?!__init__).*\\.py"
match-dir = "(?!tests|migrations|development)[^\\.].*"
add_ignore = "D212"

[build-system]
requires = ["poetry>=0.12"]
build-backend = "poetry.masonry.api"

[tool.pytest.ini_options]
testpaths = [
"tests"
]
addopts = "-vv --doctest-modules"
Loading

0 comments on commit 606612c

Please sign in to comment.