-
Notifications
You must be signed in to change notification settings - Fork 368
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Artifact definitions filter helper #1883
Changes from 17 commits
99d22df
a220538
f40837a
0b47f08
9f75e53
15ebd78
ae02c6e
fc73658
e432f5c
4652ba9
ba8a693
4e16ceb
d79edc3
8c01638
927cb28
f8004d7
6fe99d1
116b9fc
10c448a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,182 @@ | ||
# -*- coding: utf-8 -*- | ||
"""Helper to create filters based on forensic artifact definitions.""" | ||
|
||
from __future__ import unicode_literals | ||
|
||
import logging | ||
|
||
from artifacts import definitions as artifact_types | ||
|
||
from dfvfs.helpers import file_system_searcher | ||
from dfwinreg import registry_searcher | ||
from plaso.engine import path_helper | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
|
||
class ArtifactDefinitionsFilterHelper(object): | ||
"""Helper to create filters based on artifact definitions. | ||
|
||
Builds extraction filters from forensic artifact definitions. | ||
|
||
For more information about Forensic Artifacts see: | ||
https://github.com/ForensicArtifacts/artifacts/blob/master/docs/Artifacts%20definition%20format%20and%20style%20guide.asciidoc | ||
""" | ||
|
||
ARTIFACT_FILTERS = 'ARTIFACT_FILTERS' | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If these are only used by this class please prefix with an underscore. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Removed several of these and added underscore to one only used in this class. |
||
_COMPATIBLE_DFWINREG_KEYS = ('HKEY_LOCAL_MACHINE') | ||
|
||
def __init__(self, artifacts_registry, artifacts, knowledge_base): | ||
"""Initializes an artifact definitions filter helper. | ||
|
||
Args: | ||
artifacts_registry (artifacts.ArtifactDefinitionsRegistry]): artifact | ||
definitions registry. | ||
artifacts (list[str]): artifact names to filter. | ||
path (str): path to a file that contains one or more forensic artifacts. | ||
knowledge_base (KnowledgeBase): contains information from the source | ||
data needed for filtering. | ||
""" | ||
super(ArtifactDefinitionsFilterHelper, self).__init__() | ||
self._artifacts_registry = artifacts_registry | ||
self._artifacts = artifacts | ||
self._knowledge_base = knowledge_base | ||
|
||
def BuildFindSpecs(self, environment_variables=None): | ||
"""Builds find specification from a forensic artifact definitions. | ||
|
||
Args: | ||
environment_variables (Optional[list[EnvironmentVariableArtifact]]): | ||
environment variables. | ||
""" | ||
find_specs = {} | ||
|
||
artifact_definitions = [] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this loop can be merged with the next loop and remove the need for the artifact_definitions list |
||
for artifact_filter in self._artifacts: | ||
if self._artifacts_registry.GetDefinitionByName(artifact_filter): | ||
artifact_definitions.append( | ||
self._artifacts_registry.GetDefinitionByName(artifact_filter)) | ||
|
||
for definition in artifact_definitions: | ||
for source in definition.sources: | ||
if source.type_indicator == artifact_types.TYPE_INDICATOR_FILE: | ||
for path_entry in source.paths: | ||
self.BuildFindSpecsFromFileArtifact( | ||
path_entry, source.separator, environment_variables, | ||
self._knowledge_base.user_accounts, find_specs) | ||
elif (source.type_indicator == | ||
artifact_types.TYPE_INDICATOR_WINDOWS_REGISTRY_KEY): | ||
keys = set(source.keys) | ||
for key_entry in keys: | ||
if self._CheckKeyCompatibility(key_entry): | ||
self.BuildFindSpecsFromRegistryArtifact( | ||
key_entry, find_specs) | ||
elif (source.type_indicator == | ||
artifact_types.TYPE_INDICATOR_WINDOWS_REGISTRY_VALUE): | ||
# TODO: Handle Registry Values Once Supported in dfwinreg. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Which dfwinreg issue is this? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Filed log2timeline/dfwinreg#98 and added to TODO. |
||
# https://github.com/log2timeline/dfwinreg/issues/98 | ||
logging.warning(('Unable to handle Registry Value, extracting ' | ||
'key only: "{0:s}"').format(source.key_value_pairs)) | ||
|
||
for key_pair in source.key_value_pairs: | ||
keys = set() | ||
keys.add(key_pair.get('key')) | ||
for key_entry in keys: | ||
if self._CheckKeyCompatibility(key_entry): | ||
self.BuildFindSpecsFromRegistryArtifact(key_entry, find_specs) | ||
else: | ||
logging.warning(('Unable to handle artifact, plaso does not ' | ||
'support: "{0:s}"').format(source.type_indicator)) | ||
|
||
self._knowledge_base.SetValue(self.ARTIFACT_FILTERS, find_specs) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why store these in the knowledgebase? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I will use them in plaso/parsers/winreg.py, so I wanted to store them somewhere to access again. Any suggestions for a better place to store them? |
||
|
||
@classmethod | ||
def BuildFindSpecsFromFileArtifact( | ||
cls, path_entry, separator, environment_variables, user_accounts, | ||
find_specs): | ||
"""Builds find specifications from a file artifact definition. | ||
|
||
Args: | ||
path_entry (str): current file system path to add. | ||
separator (str): file system path segment separator. | ||
environment_variables list(str): environment variable attributes used to | ||
dynamically populate environment variables in key. | ||
user_accounts (list[str]): identified user accounts stored in the | ||
knowledge base. | ||
find_specs (dict[artifacts.artifact_types]): Dictionary containing | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. returning a list of dfvfs.FindSpect will simplify the interface. |
||
find_specs. | ||
""" | ||
for glob_path in path_helper.PathHelper.ExpandRecursiveGlobs( | ||
path_entry, separator): | ||
for path in path_helper.PathHelper.ExpandUserHomeDirectoryPath( | ||
glob_path, user_accounts): | ||
if '%' in path: | ||
path = path_helper.PathHelper.ExpandWindowsPath( | ||
path, environment_variables) | ||
|
||
if not path.startswith(separator): | ||
logging.warning(( | ||
'The path filter must be defined as an absolute path: ' | ||
'"{0:s}"').format(path)) | ||
continue | ||
|
||
# Convert the path filters into a list of path segments and | ||
# strip the root path segment. | ||
path_segments = path.split(separator) | ||
|
||
# Remove initial root entry | ||
path_segments.pop(0) | ||
|
||
if not path_segments[-1]: | ||
logging.warning( | ||
'Empty last path segment in path filter: "{0:s}"'.format(path)) | ||
path_segments.pop(-1) | ||
|
||
try: | ||
find_spec = file_system_searcher.FindSpec( | ||
location_glob=path_segments, case_sensitive=False) | ||
except ValueError as exception: | ||
logging.error(( | ||
'Unable to build find spec for path: "{0:s}" with error: "{1!s}"' | ||
).format(path, exception)) | ||
continue | ||
if artifact_types.TYPE_INDICATOR_FILE not in find_specs: | ||
find_specs[artifact_types.TYPE_INDICATOR_FILE] = [] | ||
find_specs[artifact_types.TYPE_INDICATOR_FILE].append(find_spec) | ||
|
||
@classmethod | ||
def BuildFindSpecsFromRegistryArtifact(cls, key_entry, find_specs): | ||
"""Build find specifications from a Windows registry artifact type. | ||
|
||
Args: | ||
key_entry (str): Current file system key to add. | ||
find_specs dict[artifacts.artifact_types]: Dictionary containing | ||
find_specs. | ||
""" | ||
separator = '\\' | ||
for key in path_helper.PathHelper.ExpandRecursiveGlobs( | ||
key_entry, separator): | ||
if '%%' in key: | ||
logging.error(('Unable to expand path filter: "{0:s}"').format(key)) | ||
continue | ||
find_spec = registry_searcher.FindSpec(key_path_glob=key) | ||
if artifact_types.TYPE_INDICATOR_WINDOWS_REGISTRY_KEY not in find_specs: | ||
find_specs[artifact_types.TYPE_INDICATOR_WINDOWS_REGISTRY_KEY] = [] | ||
|
||
find_specs[artifact_types.TYPE_INDICATOR_WINDOWS_REGISTRY_KEY].append( | ||
find_spec) | ||
|
||
@staticmethod | ||
def _CheckKeyCompatibility(key): | ||
"""Checks if a Windows Registry key is compatible with dfwinreg. | ||
|
||
Args: | ||
key (str): String key to to check for dfwinreg compatibility. | ||
|
||
Returns: | ||
(bool): True if key is compatible or False if not. | ||
""" | ||
if key.startswith( | ||
ArtifactDefinitionsFilterHelper._COMPATIBLE_DFWINREG_KEYS): | ||
return True | ||
logging.warning('Key "{0:s}", has a prefix that is not supported ' | ||
'by dfwinreg presently'.format(key)) | ||
return False |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,6 +3,9 @@ | |
|
||
from __future__ import unicode_literals | ||
|
||
import logging | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. user engine.logger instead |
||
import re | ||
|
||
from dfvfs.lib import definitions as dfvfs_definitions | ||
|
||
from plaso.lib import py2to3 | ||
|
@@ -11,6 +14,8 @@ | |
class PathHelper(object): | ||
"""Class that implements the path helper.""" | ||
|
||
RECURSIVE_GLOB_LIMIT = 10 | ||
|
||
@classmethod | ||
def ExpandWindowsPath(cls, path, environment_variables): | ||
"""Expands a Windows path containing environment variables. | ||
|
@@ -23,6 +28,8 @@ def ExpandWindowsPath(cls, path, environment_variables): | |
Returns: | ||
str: expanded Windows path. | ||
""" | ||
#TODO: Add support for items such as %%users.localappdata%% | ||
|
||
if environment_variables is None: | ||
environment_variables = [] | ||
|
||
|
@@ -42,9 +49,19 @@ def ExpandWindowsPath(cls, path, environment_variables): | |
not path_segment.endswith('%')): | ||
continue | ||
|
||
lookup_key = path_segment.upper()[1:-1] | ||
check_for_drive_letter = False | ||
if path_segment.upper().startswith('%%ENVIRON_'): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. store upper case path in a variable seeing you're reusing it |
||
lookup_key = path_segment.upper()[10:-2] | ||
check_for_drive_letter = True | ||
else: | ||
lookup_key = path_segment.upper()[1:-1] | ||
path_segments[index] = lookup_table.get(lookup_key, path_segment) | ||
|
||
if check_for_drive_letter: | ||
# Remove the drive letter. | ||
if len(path_segments[index]) >= 2 and path_segments[index][1] == ':': | ||
_, _, path_segments[index] = path_segments[index].rpartition(':') | ||
|
||
return '\\'.join(path_segments) | ||
|
||
@classmethod | ||
|
@@ -133,3 +150,95 @@ def GetRelativePathForPathSpec(cls, path_spec, mount_path=None): | |
location = location[len(mount_path):] | ||
|
||
return location | ||
|
||
@classmethod | ||
def ExpandUserHomeDirectoryPath(cls, path, user_accounts): | ||
"""Expands a path to contain all user home directories. | ||
|
||
Args: | ||
path (str): Windows path with environment variables. | ||
user_accounts (list[UserAccountArtifact]): user accounts. | ||
|
||
Returns: | ||
list [str]: paths returned for user accounts. | ||
""" | ||
|
||
user_paths = [] | ||
if path.upper().startswith('%%USERS.HOMEDIR%%'): | ||
regex = re.compile(re.escape('%%users.homedir%%')) | ||
for user_account in user_accounts: | ||
new_path = regex.sub(user_account.user_directory, path) | ||
# Remove the drive letter, if it exists. | ||
if len(new_path) > 2 and new_path[1] == ':': | ||
_, _, new_path = new_path.rpartition(':') | ||
user_paths.append(new_path) | ||
else: | ||
user_paths = [path] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. drive letter is not stripped here |
||
|
||
return user_paths | ||
|
||
@classmethod | ||
def ExpandRecursiveGlobs(cls, path, separator): | ||
"""Expands recursive like globs present in an artifact path. | ||
|
||
If a path ends in '**', with up to two optional digits such as '**10', | ||
the '**' will recursively match all files and zero or more directories | ||
from the specified path. The optional digits indicate the recursion depth. | ||
By default recursion depth is 10 directories. | ||
If the glob is followed by the specified separator, only directories and | ||
subdirectories will be matched. | ||
|
||
Args: | ||
path (str): Path to be expanded. | ||
separator (str): Delimiter for this path from its artifact definition. | ||
|
||
Returns: | ||
list[str]: String path expanded for each glob. | ||
""" | ||
glob_regex = r'(.*)?{0}\*\*(\d{{1,2}})?({0})?$'.format(re.escape(separator)) | ||
match = re.search(glob_regex, path) | ||
if match: | ||
skip_first = False | ||
if match.group(3): | ||
skip_first = True | ||
if match.group(2): | ||
iterations = int(match.group(2)) | ||
else: | ||
iterations = cls.RECURSIVE_GLOB_LIMIT | ||
logging.warning('Path "{0:s}" contains fully recursive glob, limiting ' | ||
'to 10 levels'.format(path)) | ||
paths = cls.AppendPathEntries( | ||
match.group(1), separator, iterations, skip_first) | ||
return paths | ||
else: | ||
return [path] | ||
|
||
@classmethod | ||
def AppendPathEntries(cls, path, separator, count, skip_first): | ||
"""Appends wildcard entries to end of path. | ||
|
||
Will append wildcard * to given path building a list of strings for "count" | ||
iterations, skipping the first directory if skip_first is true. | ||
|
||
Args: | ||
path (str): Path to append wildcards to. | ||
separator (str): Delimiter for this path from its artifact definition. | ||
count (int): Number of entries to be appended. | ||
skip_first (bool): Whether or not to skip first entry to append. | ||
|
||
Returns: | ||
list[str]: Paths that were expanded from the path with wildcards. | ||
""" | ||
paths = [] | ||
replacement = '{0}*'.format(separator) | ||
|
||
iteration = 0 | ||
while iteration < count: | ||
if skip_first and iteration == 0: | ||
path += replacement | ||
else: | ||
path += replacement | ||
paths.append(path) | ||
iteration += 1 | ||
|
||
return paths |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,67 @@ | ||
# Artifact definitions. | ||
|
||
name: TestFiles | ||
doc: Test Doc | ||
sources: | ||
- type: FILE | ||
attributes: | ||
paths: ['%%environ_systemdrive%%\AUTHORS'] | ||
separator: '\' | ||
labels: [System] | ||
supported_os: [Windows] | ||
--- | ||
name: TestFiles2 | ||
doc: Test Doc2 | ||
sources: | ||
- type: FILE | ||
attributes: | ||
paths: | ||
- '%%environ_systemdrive%%\test_data\*.evtx' | ||
- '%%users.homedir%%\Documents\WindowsPowerShell\profile.ps1' | ||
- '\test_data\testdir\filter_*.txt' | ||
- '\does_not_exist\some_file_*.txt' | ||
- '\globbed\test\path\**\' | ||
- 'failing' | ||
separator: '\' | ||
labels: [System] | ||
supported_os: [Windows] | ||
--- | ||
name: TestRegistry | ||
doc: Test Registry Doc | ||
sources: | ||
- type: REGISTRY_KEY | ||
attributes: | ||
keys: ['HKEY_LOCAL_MACHINE\System\CurrentControlSet\Control\SecurityProviders\*'] | ||
supported_os: [Windows] | ||
--- | ||
name: TestRegistryKey | ||
doc: Test Registry Doc Key | ||
sources: | ||
- type: REGISTRY_KEY | ||
attributes: | ||
keys: | ||
- 'HKEY_LOCAL_MACHINE\System\ControlSet001\services\**\' | ||
- 'HKEY_LOCAL_MACHINE\System\ControlSet002\services\**\' | ||
- 'HKEY_LOCAL_MACHINE\System\CurrentControlSet\Enum\USBSTOR' | ||
- 'HKEY_LOCAL_MACHINE\System\CurrentControlSet\Enum\USBSTOR\**' | ||
supported_os: [Windows] | ||
--- | ||
name: TestRegistryValue | ||
doc: Test Registry Doc Value | ||
sources: | ||
- type: REGISTRY_VALUE | ||
attributes: | ||
key_value_pairs: | ||
- {key: 'HKEY_LOCAL_MACHINE\System\ControlSet001\Control\Session Manager', value: 'BootExecute'} | ||
- {key: 'HKEY_LOCAL_MACHINE\System\ControlSet002\Control\Session Manager', value: 'BootExecute'} | ||
supported_os: [Windows] | ||
--- | ||
name: TestFilesImageExport | ||
doc: Test Doc | ||
sources: | ||
- type: FILE | ||
attributes: | ||
paths: ['\a_directory\*_file'] | ||
separator: '\' | ||
labels: [System] | ||
supported_os: [Windows] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use
from plaso.engine import logger
instead of logging