Skip to content

Commit

Permalink
refactor: split spec_cleaner to multiple sub-modules
Browse files Browse the repository at this point in the history
This prepares for supporting IPv6 and MAC address
obfuscation.  Splitting it into multiple sub-modules makes it easy
to add new modules, e.g. IPv6 and MAC obfuscation.
Jira: RHINENG-15077

- replace `get_obfuscate_functions` with a new keyword argument
  `width`.  For specs that need to keep the original word width,
  `width` must be set by hand before content cleaning.  In the
  future, it will be moved to the RegistryPoint so that users can set it
  per spec.
- the processing of "password" and "keyword" is a kind of "obfuscation":
  it replaces potential `password value`s and the specified `keywords`
  with fixed strings/words, so it is moved to the obfuscation module.
  Users can exclude it by specifying, e.g.
  "no_obfuscate=['password']" when adding the RegistryPoint in
  `insights.specs.Specs`.
  Jira: RHINENG-14756

Signed-off-by: Xiangce Liu <[email protected]>
  • Loading branch information
xiangce committed Jan 10, 2025
1 parent 340d7a6 commit 6476011
Show file tree
Hide file tree
Showing 38 changed files with 1,710 additions and 1,358 deletions.
68 changes: 66 additions & 2 deletions docs/api_index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,77 @@ insights.core.spec_factory
:undoc-members:

insights.core.taglang
--------------------------
---------------------

.. automodule:: insights.core.taglang
:members:
:show-inheritance:
:undoc-members:

insights.cleaner
----------------

.. automodule:: insights.cleaner
:members:
:show-inheritance:
:undoc-members:

insights.cleaner.filters
------------------------

.. automodule:: insights.cleaner.filters
:members:
:show-inheritance:
:undoc-members:

insights.cleaner.hostname
-------------------------

.. automodule:: insights.cleaner.hostname
:members:
:show-inheritance:
:undoc-members:

insights.cleaner.ip
-------------------

.. automodule:: insights.cleaner.ip
:members:
:show-inheritance:
:undoc-members:

insights.cleaner.keyword
------------------------

.. automodule:: insights.cleaner.keyword
:members:
:show-inheritance:
:undoc-members:

insights.cleaner.mac
--------------------

.. automodule:: insights.cleaner.mac
:members:
:show-inheritance:
:undoc-members:

insights.cleaner.password
-------------------------

.. automodule:: insights.cleaner.password
:members:
:show-inheritance:
:undoc-members:

insights.cleaner.pattern
------------------------

.. automodule:: insights.cleaner.pattern
:members:
:show-inheritance:
:undoc-members:

insights.parsers
----------------

Expand Down Expand Up @@ -208,4 +272,4 @@ insights
.. automodule:: insights.collect
:members: default_manifest, collect
:show-inheritance:
:undoc-members:
:undoc-members:
218 changes: 218 additions & 0 deletions insights/cleaner/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
"""
Clean Specs (files/commands)
============================
The following modules are provided in the Cleaner and can be applied to the
specs during collection according to the user configuration and specs setting.
- Redaction (patterns redaction)
This is a mandatory operation for all the collected specs. A `no_redact`
option is available for specs that are sure to contain no security-sensitive
information, e.g. the `machine-id` spec.
- Filtering
Filter lines as per the allow list got from the `filters.yaml`. The
`filtering` can only be applied when `allowlist` is available (not None) for
the spec.
- Obfuscation (IPv4, [IPv6], Hostname, MAC, Password, Keywords)
Obfuscate lines in spec content according to the user configuration and
spec requirements. `no_obfuscate` can be used to exclude obfuscation
targets from the obfuscation. Currently, the supported obfuscation targets
are:
* hostname
* ip (ipv4)
* ipv6
* keyword
* mac
* password
"""

import logging
import json
import os
import six

from insights.cleaner.filters import AllowFilter
from insights.cleaner.hostname import Hostname
from insights.cleaner.ip import IPv4 # IPv6
from insights.cleaner.keyword import Keyword

# from insights.cleaner.mac import Mac
from insights.cleaner.password import Password
from insights.cleaner.pattern import Pattern
from insights.cleaner.utilities import write_report
from insights.util.hostname import determine_hostname
from insights.util.posix_regex import replace_posix

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Names of the obfuscation targets known to the cleaner.
# NOTE(review): 'ipv6' and 'mac' are listed although their parsers are still
# commented out in this module -- presumably placeholders for the upcoming
# IPv6/MAC support; confirm before relying on them.
DEFAULT_OBFUSCATIONS = {
    'hostname',
    'ip',  # ipv4
    'ipv6',
    'keyword',
    'mac',
    'password',
}


class Cleaner(object):
    """
    Apply redaction, filtering and obfuscation to collected spec content.

    The parsers to use are prepared once in ``__init__``:

    - ``self.redact`` holds the ``Pattern`` redactor (``None`` when no
      patterns are configured) and the ``AllowFilter``.
    - ``self.obfuscate`` maps an obfuscation target name to its parser
      (``None`` when the target is not configured).

    Arguments:
        config: Client configuration object.  The attributes
            ``rhsm_facts_file``, ``obfuscate`` and ``obfuscate_hostname``
            are read from it.  May be ``None``.
        rm_conf (dict): User configuration; the ``patterns`` and ``keywords``
            entries are read from it.  May be ``None``.
        fqdn (str): Hostname to use; ``determine_hostname()`` is called when
            it is not given.
    """

    def __init__(self, config, rm_conf, fqdn=None):
        self.report_dir = '/tmp'  # FIXME
        self.rhsm_facts_file = getattr(
            config, 'rhsm_facts_file', os.path.join(self.report_dir, 'insights-client.facts')
        )
        # Handle User Configuration
        rm_conf = rm_conf or {}
        exclude = rm_conf.get('patterns', [])
        regex = False
        if isinstance(exclude, dict) and exclude.get('regex'):
            # Regex patterns may use POSIX classes; convert them first
            exclude = [r'%s' % replace_posix(i) for i in exclude['regex']]
            regex = True
        # - Pattern redaction and allow-list filter
        self.redact = {
            'pattern': Pattern(exclude, regex) if exclude else None,
            'allow_filter': AllowFilter(),
        }
        # - Keyword and Password replacement
        keywords = rm_conf.get('keywords')
        self.obfuscate = {
            'keyword': Keyword(keywords) if keywords else None,
            'password': Password(),
        }

        self.fqdn = fqdn if fqdn else determine_hostname()
        if config and config.obfuscate:
            # - IPv4 obfuscation
            self.obfuscate.update(ip=IPv4())
            # # - IPv6 obfuscation
            # if config.obfuscate_ipv6:
            #     self.obfuscate.update(ipv6=IPv6())
            # - Hostname obfuscation
            if config.obfuscate_hostname:
                self.obfuscate.update(hostname=Hostname(self.fqdn))
            # # - MAC obfuscation
            # if config.obfuscate_mac:
            #     self.obfuscate.update(mac=Mac())

    def clean_content(self, lines, no_obfuscate=None, no_redact=False, allowlist=None, width=False):
        """
        Clean lines one by one according to the configuration and return the
        cleaned lines.

        Arguments:
            lines (list or str): Lines to clean.  Anything that is not a
                ``list`` is treated as one single line.
            no_obfuscate (list): Names of the obfuscation targets to skip.
                Skipping ``ip`` also skips ``ipv6``.  The passed object is
                never modified.
            no_redact (bool): Skip the pattern redaction when True.
            allowlist (dict): Allow list handed to the allow filter; the
                filter is applied only when this is not ``None``.
            width (bool): Passed to each obfuscation parser as the ``width``
                keyword argument.

        Returns:
            The cleaned line when `lines` is not a list.  Otherwise the list
            of cleaned lines without the lines dropped (``None``) by a
            parser, or an empty list when no cleaned line is truthy.
        """
        # Work on a private copy: the caller's `no_obfuscate` is typically
        # reused across calls and must not grow an extra 'ipv6' each time.
        skipped = set(no_obfuscate or [])
        if 'ip' in skipped:
            skipped.add('ipv6')

        # List of (parser, kwargs) to be applied in order
        parsers = []
        # 1. Redact when NO "no_redact=True" is set
        if self.redact['pattern'] and not no_redact:
            parsers.append((self.redact['pattern'], {}))
        # 2. Filter as per allowlist got from add_filter
        if allowlist is not None:
            parsers.append((self.redact['allow_filter'], {'allowlist': allowlist}))
        # 3. Obfuscation entries, in a stable (sorted by name) order:
        #    hostname, ip, ipv6, keyword, mac, password
        for obf in sorted(set(self.obfuscate) - skipped):
            if self.obfuscate[obf]:
                parsers.append((self.obfuscate[obf], {'width': width}))

        def _clean_line(line):
            # Feed the output of each parser into the next one
            for parser, kwargs in parsers:
                line = parser.parse_line(line, **kwargs)
            return line

        # handle single string
        if not isinstance(lines, list):
            return _clean_line(lines)

        result = []
        for line in lines:
            line = _clean_line(line)
            # `None` means the line was dropped by a parser
            if line is not None:
                result.append(line)
        # Return the lines only when at least one of them is not empty
        return result if any(result) else []

    def clean_file(self, _file, no_obfuscate=None, no_redact=False, allowlist=None):
        """
        Clean a file according to the configuration, the file will be updated
        directly with the cleaned content.

        Nothing is done when `_file` does not exist or is a symbolic link.
        A non-empty file that has no content left after cleaning is removed.

        Arguments:
            _file (str): Path of the file to clean.
            no_obfuscate (list): See :meth:`clean_content`.
            no_redact (bool): See :meth:`clean_content`.
            allowlist (dict): See :meth:`clean_content`.

        Raises:
            Exception: When the file cannot be read/cleaned or written back.
        """
        logger.debug('Cleaning %s ...', _file)

        if not os.path.exists(_file) or os.path.islink(_file):
            return

        # Process the file
        raw_data = content = None
        try:
            with open(_file, 'r') as fh:
                raw_data = fh.readlines()
                content = self.clean_content(
                    raw_data,
                    no_obfuscate=no_obfuscate,
                    no_redact=no_redact,
                    allowlist=allowlist,
                    # only this spec needs `width` for now
                    width=_file.endswith("netstat_-neopa"),
                )
        except Exception as e:  # pragma: no cover
            logger.warning(e)
            raise Exception("Error: Cannot Open File for Cleaning: %s" % _file)
        # Store it
        try:
            if raw_data:
                if content:
                    with open(_file, 'wb') as fh:
                        for line in content:
                            fh.write(line.encode('utf-8') if six.PY3 else line)
                else:
                    # Remove Empty file
                    logger.debug('Removing %s, as it\'s empty after cleaning', _file)
                    os.remove(_file)
        except Exception as e:  # pragma: no cover
            logger.warning(e)
            raise Exception("Error: Cannot Write to File: %s" % _file)

    def generate_rhsm_facts(self):
        """
        Collect the obfuscation mappings and write them, together with the
        hostname and the enabled obfuscation flags, to the RHSM facts file.
        """
        logger.info('Writing RHSM facts to %s ...', self.rhsm_facts_file)

        hostname = self.obfuscate.get('hostname')
        hn_mapping = hostname.mapping() if hostname else []

        keyword = self.obfuscate.get('keyword')
        kw_mapping = keyword.mapping() if keyword else []

        ipv4 = self.obfuscate.get('ip')
        ipv4_mapping = ipv4.mapping() if ipv4 else []

        facts = {
            'insights_client.hostname': self.fqdn,
            'insights_client.obfuscate_ip_enabled': 'ip' in self.obfuscate,
            # 'insights_client.obfuscate_ipv6_enabled': 'ipv6' in self.obfuscate,
            # 'insights_client.obfuscate_mac_enabled': 'mac' in self.obfuscate,
            'insights_client.obfuscate_hostname_enabled': 'hostname' in self.obfuscate,
            'insights_client.obfuscated_ipv4': json.dumps(ipv4_mapping),
            # 'insights_client.obfuscated_ipv6': json.dumps(),
            # 'insights_client.obfuscated_mac': json.dumps(),
            'insights_client.obfuscated_keyword': json.dumps(kw_mapping),
            'insights_client.obfuscated_hostname': json.dumps(hn_mapping),
        }

        write_report(facts, self.rhsm_facts_file)

    def generate_report(self, archive_name):
        """
        Write the RHSM facts file and let every configured parser generate
        its own report for `archive_name` in the report directory.
        """
        # Always generate the rhsm.facts files
        self.generate_rhsm_facts()
        # Generate CSV reports accordingly
        for parser in list(self.redact.values()) + list(self.obfuscate.values()):
            if parser:
                parser.generate_report(self.report_dir, archive_name)
34 changes: 34 additions & 0 deletions insights/cleaner/filters.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
"""
Filtering
=========
"""

import logging

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


class AllowFilter(object):
    """
    Line filter driven by an allow list.
    """

    def parse_line(self, line, **kwargs):
        """
        Keep `line` only when it contains one of the keys of the allow list.

        The allow list is read from the ``allowlist`` keyword argument and
        maps each filter string to a counter.  On the first key found in
        `line`, its counter is decreased by one and the key is removed from
        the allow list once the counter reaches zero; the allow list object
        passed in is modified in place.

        Returns:
            `line` itself when it is falsy or matches a key, otherwise
            ``None`` (also when the allow list is missing or empty).
        """
        # Nothing to check for an empty/dropped line
        if not line:
            return line

        remaining = kwargs.get('allowlist', {})
        if not remaining:
            # discard line when there is nothing (left) to match against
            return None

        # Iterate over a snapshot since the allow list may shrink below
        for key in tuple(remaining.keys()):
            if key not in line:
                continue
            # FIXME:
            # Considering performance, only the first matching filter of a
            # line is counted; multiple filters in one line are not handled
            remaining[key] -= 1
            if remaining[key] == 0:
                # enough lines containing this key were found
                remaining.pop(key)
            return line

        # discard line when none of the filters is found
        return None

    def generate_report(self, report_dir, archive_name):
        """Nothing to report for the allow filter."""
        pass  # pragma: no cover
Loading

0 comments on commit 6476011

Please sign in to comment.