diff --git a/config/end-to-end.ini b/config/end-to-end.ini index edf1cb762f..11f7f9d3ce 100644 --- a/config/end-to-end.ini +++ b/config/end-to-end.ini @@ -21,6 +21,11 @@ tagging_file=data/tag_windows.txt case=image_export source=test_data/image.qcow2 +[image_export_with_artifact_filters] +case=image_export +artifact_filters=data/windows_artifact_filters +source=test_data/image.qcow2 + [image_export_with_filter_file] case=image_export filter_file=data/filter_windows.txt diff --git a/plaso/cli/extraction_tool.py b/plaso/cli/extraction_tool.py index 3ad1fc1e8f..344c441cb8 100644 --- a/plaso/cli/extraction_tool.py +++ b/plaso/cli/extraction_tool.py @@ -75,6 +75,8 @@ def _CreateProcessingConfiguration(self, knowledge_base): """ # TODO: pass preferred_encoding. configuration = configurations.ProcessingConfiguration() + configuration.artifact_filters = self._artifact_filters + configuration.artifacts_registry = self._artifacts_registry configuration.credentials = self._credential_configurations configuration.debug_output = self._debug_mode configuration.event_extraction.text_prepend = self._text_prepend diff --git a/plaso/cli/helpers/__init__.py b/plaso/cli/helpers/__init__.py index 7de9cb6ed7..f9bd2e5726 100644 --- a/plaso/cli/helpers/__init__.py +++ b/plaso/cli/helpers/__init__.py @@ -3,6 +3,7 @@ from plaso.cli.helpers import analysis_plugins from plaso.cli.helpers import artifact_definitions +from plaso.cli.helpers import artifact_filters from plaso.cli.helpers import data_location from plaso.cli.helpers import date_filters from plaso.cli.helpers import dynamic_output diff --git a/plaso/cli/helpers/artifact_definitions.py b/plaso/cli/helpers/artifact_definitions.py index 9381dbde86..bb5fbeb869 100644 --- a/plaso/cli/helpers/artifact_definitions.py +++ b/plaso/cli/helpers/artifact_definitions.py @@ -43,6 +43,15 @@ def AddArguments(cls, argument_group): 'quickly collect data of interest, such as specific files or ' 'Windows Registry keys.')) + 
class ArtifactFiltersArgumentsHelper(interface.ArgumentsHelper):
  """Artifact filters CLI arguments helper.

  Adds the --artifact_filters option and converts its value into a list of
  artifact definition names that is stored on the CLI tool as
  "_artifact_filters".
  """

  NAME = 'artifact_filters'
  DESCRIPTION = 'Artifact filters command line arguments.'

  @classmethod
  def AddArguments(cls, argument_group):
    """Adds command line arguments to an argument group.

    This function takes an argument parser or an argument group object and adds
    to it all the command line arguments this helper supports.

    Args:
      argument_group (argparse._ArgumentGroup|argparse.ArgumentParser):
          argparse group.
    """
    # Adjacent string literals are concatenated without a separator, hence
    # every fragment except the last one ends with a space.
    argument_group.add_argument(
        '--artifact_filters', '--artifact-filters',
        dest='artifact_filters', type=str, default=None,
        action='store', help=(
            'Names of forensic artifact definitions, provided in one of the '
            'following formats: (1) directly on the command line (comma '
            'separated), (2) in a file with one artifact name per line, or '
            '(3) one operating system specific keyword which will process '
            'all artifacts supporting that OS (windows, linux, darwin). '
            'Forensic artifacts are stored in .yaml files that are directly '
            'pulled from the artifact definitions project. You can also '
            'specify a custom artifacts yaml file (see '
            '--custom_artifact_definitions). Artifact definitions can be '
            'used to describe and quickly collect data of interest, such as '
            'specific files or Windows Registry keys.'))

  @classmethod
  def ParseOptions(cls, options, configuration_object):
    """Parses and validates options.

    The option value is either the path of an existing file that lists one
    artifact name per line, or a comma separated list of artifact names.
    The resulting list of names (or None when no names were provided) is set
    as the "_artifact_filters" attribute of the configuration object.

    Args:
      options (argparse.Namespace): parser options.
      configuration_object (CLITool): object to be configured by the argument
          helper.

    Raises:
      BadConfigObject: when the configuration object is of the wrong type.
      BadConfigOption: if the artifact filters file cannot be read.
    """
    if not isinstance(configuration_object, tools.CLITool):
      raise errors.BadConfigObject(
          'Configuration object is not an instance of CLITool')

    # The option defaults to None, so the value must be checked before any
    # string method is called on it.
    option_value = cls._ParseStringOption(options, 'artifact_filters')

    artifact_filters = None
    if option_value:
      # Test the path exactly as provided: lower-casing it first would break
      # mixed-case paths on case-sensitive file systems.
      if os.path.isfile(option_value):
        try:
          with open(option_value) as file_object:
            names = file_object.read().splitlines()
        except (IOError, OSError) as exception:
          raise errors.BadConfigOption((
              'Unable to read artifact filters file: {0:s} with error: '
              '{1!s}').format(option_value, exception))
      else:
        # Names given on the command line are lower-cased, as before.
        names = option_value.lower().split(',')

      # Ignore surrounding white space and empty entries, e.g. "a, b,".
      names = [name.strip() for name in names]
      artifact_filters = [name for name in names if name] or None

    setattr(configuration_object, '_artifact_filters', artifact_filters)
def _ExtractWithFilter( self, source_path_specs, destination_path, output_writer, - filter_file_path, skip_duplicates=True): + artifacts_registry, artifact_filters_path, filter_file_path, + skip_duplicates=True): """Extracts files using a filter expression. This method runs the file extraction process on the image and @@ -297,8 +300,12 @@ def _ExtractWithFilter( source_path_specs (list[dfvfs.PathSpec]): path specifications to extract. destination_path (str): path where the extracted files should be stored. output_writer (CLIOutputWriter): output writer. + artifacts_registry (ArtifactRegistry): Artifacts registry object. + artifact_filters_path (str): path of the file that contains the + names of the artifacts filter definitions or definitions directly + listed comma separated. filter_file_path (str): path of the file that contains the filter - expressions. + expressions or artifact definitions. skip_duplicates (Optional[bool]): True if files with duplicate content should be skipped. """ @@ -314,10 +321,8 @@ def _ExtractWithFilter( output_writer.Write( 'Extracting file entries from: {0:s}\n'.format(display_name)) - environment_variables = self._knowledge_base.GetEnvironmentVariables() - filter_file_object = filter_file.FilterFile(filter_file_path) - find_specs = filter_file_object.BuildFindSpecs( - environment_variables=environment_variables) + find_specs = tools.FindSpecsGetter().GetFindSpecs(artifacts_registry, + artifact_filters_path, filter_file_path, self._knowledge_base) searcher = file_system_searcher.FileSystemSearcher( file_system, mount_point) @@ -399,7 +404,7 @@ def _ParseFilterOptions(self, options): Raises: BadConfigOption: if the options are invalid. 
""" - names = ['date_filters', 'filter_file'] + names = ['artifact_filters', 'date_filters', 'filter_file'] helpers_manager.ArgumentHelperManager.ParseOptions( options, self, names=names) @@ -416,7 +421,7 @@ def _ParseFilterOptions(self, options): except (IOError, ValueError) as exception: raise errors.BadConfigOption(exception) - if self._filter_file: + if self._artifact_filters or self._filter_file: self.has_filters = True else: self.has_filters = self._filter_collection.HasFilters() @@ -559,7 +564,7 @@ def AddFilterOptions(self, argument_group): Args: argument_group (argparse._ArgumentGroup): argparse argument group. """ - names = ['date_filters', 'filter_file'] + names = ['artifact_filters', 'date_filters', 'filter_file'] helpers_manager.ArgumentHelperManager.AddCommandLineArguments( argument_group, names=names) @@ -749,9 +754,10 @@ def ProcessSources(self): if not os.path.isdir(self._destination_path): os.makedirs(self._destination_path) - if self._filter_file: + if self._artifact_filters or self._filter_file: self._ExtractWithFilter( self._source_path_specs, self._destination_path, self._output_writer, + self._artifacts_registry, self._artifact_filters, self._filter_file, skip_duplicates=self._skip_duplicates) else: self._Extract( diff --git a/plaso/cli/log2timeline_tool.py b/plaso/cli/log2timeline_tool.py index 13e9a7b732..d24ec9a899 100644 --- a/plaso/cli/log2timeline_tool.py +++ b/plaso/cli/log2timeline_tool.py @@ -23,7 +23,6 @@ from plaso.cli import views from plaso.cli.helpers import manager as helpers_manager from plaso.engine import engine -from plaso.engine import filter_file from plaso.engine import single_process as single_process_engine from plaso.lib import definitions from plaso.lib import errors @@ -167,7 +166,8 @@ def ParseArguments(self): 'extraction arguments') argument_helper_names = [ - 'extraction', 'filter_file', 'hashers', 'parsers', 'yara_rules'] + 'artifact_filters', 'extraction', 'filter_file', 'hashers', + 'parsers', 'yara_rules'] 
helpers_manager.ArgumentHelperManager.AddCommandLineArguments( extraction_group, names=argument_helper_names) @@ -317,8 +317,9 @@ def ParseOptions(self, options): self._ParseInformationalOptions(options) argument_helper_names = [ - 'artifact_definitions', 'extraction', 'filter_file', 'status_view', - 'storage_file', 'storage_format', 'text_prepend', 'yara_rules'] + 'artifact_definitions', 'artifact_filters', 'extraction', + 'filter_file', 'status_view', 'storage_file', 'storage_format', + 'text_prepend', 'yara_rules'] helpers_manager.ArgumentHelperManager.ParseOptions( options, self, names=argument_helper_names) @@ -370,7 +371,9 @@ def ExtractEventsFromSources(self): self._status_view.SetMode(self._status_view_mode) self._status_view.SetSourceInformation( - self._source_path, self._source_type, filter_file=self._filter_file) + self._source_path, self._source_type, + artifact_filters=self._artifact_filters, + filter_file=self._filter_file) status_update_callback = ( self._status_view.GetExtractionStatusUpdateCallback()) @@ -380,6 +383,7 @@ def ExtractEventsFromSources(self): self._output_writer.Write('Processing started.\n') session = engine.BaseEngine.CreateSession( + artifact_filters=self._artifact_filters, command_line_arguments=self._command_line_arguments, debug_mode=self._debug_mode, filter_file=self._filter_file, @@ -415,13 +419,9 @@ def ExtractEventsFromSources(self): self._SetExtractionParsersAndPlugins(configuration, session) self._SetExtractionPreferredTimeZone(extraction_engine.knowledge_base) - filter_find_specs = None - if configuration.filter_file: - environment_variables = ( - extraction_engine.knowledge_base.GetEnvironmentVariables()) - filter_file_object = filter_file.FilterFile(configuration.filter_file) - filter_find_specs = filter_file_object.BuildFindSpecs( - environment_variables=environment_variables) + filter_find_specs = tools.FindSpecsGetter().GetFindSpecs( + configuration.artifacts_registry, configuration.artifact_filters, + 
configuration.filter_file, extraction_engine.knowledge_base) processing_status = None if single_process_mode: diff --git a/plaso/cli/pinfo_tool.py b/plaso/cli/pinfo_tool.py index 7e83032f56..c5142d5e00 100644 --- a/plaso/cli/pinfo_tool.py +++ b/plaso/cli/pinfo_tool.py @@ -353,6 +353,7 @@ def _PrintSessionsDetails(self, storage): command_line_arguments = session.command_line_arguments or 'N/A' parser_filter_expression = session.parser_filter_expression or 'N/A' preferred_encoding = session.preferred_encoding or 'N/A' + artifact_filters = session.artifact_filters or 'N/A' filter_file = session.filter_file or 'N/A' title = 'Session: {0!s}'.format(session_identifier) @@ -368,6 +369,7 @@ def _PrintSessionsDetails(self, storage): table_view.AddRow(['Enabled parser and plugins', enabled_parser_names]) table_view.AddRow(['Preferred encoding', preferred_encoding]) table_view.AddRow(['Debug mode', session.debug_mode]) + table_view.AddRow(['Artifact filters', artifact_filters]) table_view.AddRow(['Filter file', filter_file]) table_view.Write(self._output_writer) diff --git a/plaso/cli/psteal_tool.py b/plaso/cli/psteal_tool.py index 1f57cc5990..d54eb186ab 100644 --- a/plaso/cli/psteal_tool.py +++ b/plaso/cli/psteal_tool.py @@ -19,10 +19,10 @@ from plaso.cli import logger from plaso.cli import status_view from plaso.cli import tool_options +from plaso.cli import tools from plaso.cli import views from plaso.cli.helpers import manager as helpers_manager from plaso.engine import engine -from plaso.engine import filter_file from plaso.engine import knowledge_base from plaso.engine import single_process as single_process_engine from plaso.lib import errors @@ -273,7 +273,9 @@ def ExtractEventsFromSources(self): self._status_view.SetMode(self._status_view_mode) self._status_view.SetSourceInformation( - self._source_path, source_type, filter_file=self._filter_file) + self._source_path, source_type, + artifact_filters=self._artifact_filters, + filter_file=self._filter_file) 
status_update_callback = ( self._status_view.GetExtractionStatusUpdateCallback()) @@ -283,6 +285,7 @@ def ExtractEventsFromSources(self): self._output_writer.Write('Processing started.\n') session = engine.BaseEngine.CreateSession( + artifact_filters=self._artifact_filters, command_line_arguments=self._command_line_arguments, filter_file=self._filter_file, preferred_encoding=self.preferred_encoding, @@ -317,13 +320,9 @@ def ExtractEventsFromSources(self): self._SetExtractionParsersAndPlugins(configuration, session) self._SetExtractionPreferredTimeZone(extraction_engine.knowledge_base) - filter_find_specs = None - if configuration.filter_file: - environment_variables = ( - extraction_engine.knowledge_base.GetEnvironmentVariables()) - filter_file_object = filter_file.FilterFile(configuration.filter_file) - filter_find_specs = filter_file_object.BuildFindSpecs( - environment_variables=environment_variables) + filter_find_specs = tools.FindSpecsGetter().GetFindSpecs( + configuration.artifacts_registry, configuration.artifact_filters, + configuration.filter_file, extraction_engine.knowledge_base) processing_status = None if single_process_mode: @@ -450,7 +449,8 @@ def ParseOptions(self, options): self._ParseTimezoneOption(options) argument_helper_names = [ - 'artifact_definitions', 'hashers', 'language', 'parsers'] + 'artifact_definitions', 'custom_artifact_definitions', + 'hashers', 'language', 'parsers'] helpers_manager.ArgumentHelperManager.ParseOptions( options, self, names=argument_helper_names) diff --git a/plaso/cli/status_view.py b/plaso/cli/status_view.py index 0bba8fe97a..a2b8064643 100644 --- a/plaso/cli/status_view.py +++ b/plaso/cli/status_view.py @@ -45,6 +45,7 @@ def __init__(self, output_writer, tool_name): tool_name (str): namd of the tool. 
""" super(StatusView, self).__init__() + self._artifact_filters = None self._filter_file = None self._mode = self.MODE_WINDOW self._output_writer = output_writer @@ -318,6 +319,9 @@ def PrintExtractionStatusHeader(self, processing_status): self._output_writer.Write( 'Source type\t: {0:s}\n'.format(self._source_type)) + if self._artifact_filters: + self._output_writer.Write('Artifact filters \t: {0:s}\n'.format( + self._artifact_filters)) if self._filter_file: self._output_writer.Write('Filter file\t: {0:s}\n'.format( self._filter_file)) @@ -400,14 +404,18 @@ def SetMode(self, mode): """ self._mode = mode - def SetSourceInformation(self, source_path, source_type, filter_file=None): + def SetSourceInformation( + self, source_path, source_type, artifact_filters=None, + filter_file=None): """Sets the source information. Args: source_path (str): path of the source. source_type (str): source type. + artifact_filters (Optional[str]): artifact filters. filter_file (Optional[str]): filter file. """ + self._artifact_filters = artifact_filters self._filter_file = filter_file self._source_path = source_path self._source_type = self._SOURCE_TYPES.get(source_type, 'UNKNOWN') diff --git a/plaso/cli/storage_media_tool.py b/plaso/cli/storage_media_tool.py index 0b8da0a90c..29ddbe72c5 100644 --- a/plaso/cli/storage_media_tool.py +++ b/plaso/cli/storage_media_tool.py @@ -63,6 +63,7 @@ def __init__(self, input_reader=None, output_writer=None): """ super(StorageMediaTool, self).__init__( input_reader=input_reader, output_writer=output_writer) + self._artifact_filters = None self._credentials = [] self._credential_configurations = [] self._filter_file = None diff --git a/plaso/cli/tools.py b/plaso/cli/tools.py index 5bee062107..d4c7a58904 100644 --- a/plaso/cli/tools.py +++ b/plaso/cli/tools.py @@ -13,10 +13,14 @@ except ImportError: resource = None +from artifacts import definitions as artifact_types + import plaso from plaso.cli import logger from plaso.cli import views +from 
class FindSpecsGetter(object):
  """Builds find specifications for the CLI tools.

  The find specifications are created either from artifact definition names
  or from a filter file; the two filter types are mutually exclusive.
  """

  def GetFindSpecs(
      self, artifacts_registry, artifact_filter_names, filter_file_path,
      knowledge_base):
    """Retrieves find specifications from artifact filters or a filter file.

    Args:
      artifacts_registry (artifacts.ArtifactDefinitionsRegistry): artifact
          definitions registry, only used with artifact filters.
      artifact_filter_names (list[str]): names of the artifact definitions
          to filter on, or None or empty if not set.
      filter_file_path (str): path of the filter file, or None or empty if
          not set.
      knowledge_base (KnowledgeBase): knowledge base that provides the
          environment variables used to build the find specifications. It is
          not accessed when no filter is set.

    Returns:
      object: result of the BuildFindSpecs call of the selected filter; for
          artifact filters the entry for the file source type. None if
          neither artifact filters nor a filter file were provided.

    Raises:
      RuntimeError: if both artifact filters and a filter file are provided.
    """
    if artifact_filter_names and filter_file_path:
      raise RuntimeError('Please only specify one type of filter file, '
                         'artifacts or file filters.')

    # Nothing to build: return before touching the knowledge base so that
    # callers without filters do not depend on it.
    if not artifact_filter_names and not filter_file_path:
      return None

    environment_variables = knowledge_base.GetEnvironmentVariables()

    if artifact_filter_names:
      artifact_filters_object = artifact_filters.ArtifactFilters(
          artifacts_registry, artifact_filter_names, knowledge_base)
      find_specs_per_source_type = artifact_filters_object.BuildFindSpecs(
          environment_variables=environment_variables)
      # Only the file system find specifications are returned here.
      return find_specs_per_source_type[artifact_types.TYPE_INDICATOR_FILE]

    filter_file_object = filter_file.FilterFile(filter_file_path)
    return filter_file_object.BuildFindSpecs(
        environment_variables=environment_variables)
+ artifact_filters (str): Names of artifact definitions + that are used for filtering file system and Windows Registry key paths. command_line_arguments (str): command line arguments. completion_time (int): time that the session was completed. Contains the number of micro seconds since January 1, 1970, 00:00:00 UTC. @@ -47,6 +49,7 @@ def __init__(self): super(Session, self).__init__() self.aborted = False self.analysis_reports_counter = collections.Counter() + self.artifact_filters = None self.command_line_arguments = None self.completion_time = None self.debug_mode = False @@ -97,6 +100,7 @@ def CopyAttributesFromSessionStart(self, session_start): Args: session_start (SessionStart): session start attribute container. """ + self.artifact_filters = session_start.artifact_filters self.command_line_arguments = session_start.command_line_arguments self.debug_mode = session_start.debug_mode self.enabled_parser_names = session_start.enabled_parser_names @@ -133,6 +137,7 @@ def CreateSessionStart(self): SessionStart: session start attribute container. """ session_start = SessionStart() + session_start.artifact_filters = self.artifact_filters session_start.command_line_arguments = self.command_line_arguments session_start.debug_mode = self.debug_mode session_start.enabled_parser_names = self.enabled_parser_names @@ -184,6 +189,7 @@ class SessionStart(interface.AttributeContainer): """Session start attribute container. Attributes: + artifact_filters (str): names of artifact definitions. command_line_arguments (str): command line arguments. debug_mode (bool): True if debug mode was enabled. enabled_parser_names (list[str]): parser and parser plugin names that @@ -211,6 +217,7 @@ def __init__(self, identifier=None): session completion information. 
class ArtifactDefinitionsFilterHelper(object):
  """Helper to create filters based on artifact definitions.

  Builds extraction filters from forensic artifact definitions.

  For more information about Forensic Artifacts see:
  https://github.com/ForensicArtifacts/artifacts/blob/master/docs/Artifacts%20definition%20format%20and%20style%20guide.asciidoc
  """

  _KNOWLEDGE_BASE_VALUE = 'ARTIFACT_FILTERS'

  _COMPATIBLE_REGISTRY_KEY_PATH_PREFIXES = ['HKEY_LOCAL_MACHINE']

  def __init__(self, artifacts_registry, artifact_definitions, knowledge_base):
    """Initializes an artifact definitions filter helper.

    Args:
      artifacts_registry (artifacts.ArtifactDefinitionsRegistry): artifact
          definitions registry.
      artifact_definitions (list[str]): artifact definition names to filter.
      knowledge_base (KnowledgeBase): contains information from the source
          data needed for filtering.
    """
    super(ArtifactDefinitionsFilterHelper, self).__init__()
    self._artifacts = artifact_definitions
    self._artifacts_registry = artifacts_registry
    self._knowledge_base = knowledge_base

  def _CheckKeyCompatibility(self, key_path):
    """Checks if a Windows Registry key path is supported by dfWinReg.

    Args:
      key_path (str): path of the Windows Registry key.

    Returns:
      bool: True if key is compatible or False if not.
    """
    # str.startswith accepts a tuple of prefixes.
    if key_path.startswith(tuple(self._COMPATIBLE_REGISTRY_KEY_PATH_PREFIXES)):
      return True

    logger.warning(
        'Prefix of key "{0:s}" is currently not supported'.format(key_path))
    return False

  @staticmethod
  def _GetKeyPathsFromKeyValuePairs(key_value_pairs):
    """Retrieves the unique key paths of Windows Registry value sources.

    Args:
      key_value_pairs (list[dict[str, str]|tuple[str, str]]): key and value
          pairs, either as mappings with a "key" entry or as (key, value)
          sequences.

    Returns:
      set[str]: unique, non-empty Windows Registry key paths.
    """
    key_paths = set()
    for key_value_pair in key_value_pairs or []:
      if isinstance(key_value_pair, dict):
        # Unpacking a mapping would yield its key names ("key", "value")
        # instead of the key path, hence the explicit lookup.
        key_path = key_value_pair.get('key', None)
      else:
        key_path, _ = key_value_pair

      if key_path:
        key_paths.add(key_path)

    return key_paths

  def _BuildFindSpecsFromRegistryKeyPaths(self, key_paths):
    """Builds find specifications for the supported Registry key paths.

    Args:
      key_paths (set[str]): Windows Registry key paths.

    Returns:
      list[dfwinreg.FindSpec]: find specifications of the key paths that
          passed the compatibility check.
    """
    find_specs = []
    for key_path in key_paths:
      if self._CheckKeyCompatibility(key_path):
        find_specs.extend(self.BuildFindSpecsFromRegistryArtifact(key_path))
    return find_specs

  def BuildFindSpecs(self, environment_variables=None):
    """Builds find specifications from artifact definitions.

    The resulting find specifications are set in the knowledge base, as a
    dictionary with a list of find specifications per source type. Names
    without a definition in the registry are ignored.

    Args:
      environment_variables (Optional[list[EnvironmentVariableArtifact]]):
          environment variables.
    """
    file_find_specs = []
    registry_find_specs = []

    for name in self._artifacts:
      definition = self._artifacts_registry.GetDefinitionByName(name)
      if not definition:
        continue

      for source in definition.sources:
        type_indicator = source.type_indicator

        if type_indicator == artifact_types.TYPE_INDICATOR_FILE:
          # TODO: move source.paths iteration into
          # BuildFindSpecsFromFileArtifact.
          for path_entry in set(source.paths):
            find_specs = self.BuildFindSpecsFromFileArtifact(
                path_entry, source.separator, environment_variables,
                self._knowledge_base.user_accounts)
            file_find_specs.extend(find_specs)

        elif (type_indicator ==
              artifact_types.TYPE_INDICATOR_WINDOWS_REGISTRY_KEY):
          # TODO: move source.keys iteration into
          # BuildFindSpecsFromRegistryArtifact.
          registry_find_specs.extend(
              self._BuildFindSpecsFromRegistryKeyPaths(set(source.keys)))

        elif (type_indicator ==
              artifact_types.TYPE_INDICATOR_WINDOWS_REGISTRY_VALUE):
          # TODO: Handle Registry Values Once Supported in dfwinreg.
          # https://github.com/log2timeline/dfwinreg/issues/98
          logger.warning((
              'Windows Registry values are not supported, extracting key: '
              '"{0!s}"').format(source.key_value_pairs))

          # TODO: move source.key_value_pairs iteration into
          # BuildFindSpecsFromRegistryArtifact.
          key_paths = self._GetKeyPathsFromKeyValuePairs(
              source.key_value_pairs)
          registry_find_specs.extend(
              self._BuildFindSpecsFromRegistryKeyPaths(key_paths))

        else:
          logger.warning(
              'Unsupported artifact definition source type: "{0:s}"'.format(
                  type_indicator))

    find_specs_per_source_type = {
        artifact_types.TYPE_INDICATOR_FILE: file_find_specs,
        artifact_types.TYPE_INDICATOR_WINDOWS_REGISTRY_KEY: registry_find_specs}

    self._knowledge_base.SetValue(
        self._KNOWLEDGE_BASE_VALUE, find_specs_per_source_type)

  def BuildFindSpecsFromFileArtifact(
      self, source_path, path_separator, environment_variables, user_accounts):
    """Builds find specifications from a file source type.

    Args:
      source_path (str): file system path defined by the source.
      path_separator (str): file system path segment separator.
      environment_variables (list[EnvironmentVariableArtifact]): environment
          variables used to expand paths that contain "%".
      user_accounts (list[UserAccountArtifact]): identified user accounts
          stored in the knowledge base.

    Returns:
      list[dfvfs.FindSpec]: find specifications for the file source type.
    """
    find_specs = []
    for glob_path in path_helper.PathHelper.ExpandRecursiveGlobs(
        source_path, path_separator):
      for path in path_helper.PathHelper.ExpandUsersHomeDirectoryPath(
          glob_path, user_accounts):
        if '%' in path:
          path = path_helper.PathHelper.ExpandWindowsPath(
              path, environment_variables)

        if not path.startswith(path_separator):
          logger.warning((
              'The path filter must be defined as an absolute path: '
              '"{0:s}"').format(path))
          continue

        # Convert the path filters into a list of path segments and
        # strip the root path segment.
        path_segments = path.split(path_separator)

        # Remove initial root entry
        path_segments.pop(0)

        # A trailing separator leaves an empty last segment.
        if not path_segments[-1]:
          logger.warning(
              'Empty last path segment in path filter: "{0:s}"'.format(path))
          path_segments.pop(-1)

        try:
          find_spec = file_system_searcher.FindSpec(
              location_glob=path_segments, case_sensitive=False)
        except ValueError as exception:
          logger.error((
              'Unable to build find specification for path: "{0:s}" with '
              'error: {1!s}').format(path, exception))
          continue

        find_specs.append(find_spec)

    return find_specs

  def BuildFindSpecsFromRegistryArtifact(self, source_key_path):
    """Build find specifications from a Windows Registry source type.

    Args:
      source_key_path (str): Windows Registry key path defined by the source.

    Returns:
      list[dfwinreg.FindSpec]: find specifications for the Windows Registry
          source type.
    """
    find_specs = []
    for key_path in path_helper.PathHelper.ExpandRecursiveGlobs(
        source_key_path, '\\'):
      # Key paths that still contain a "%%" variable cannot be expanded.
      if '%%' in key_path:
        logger.error('Unable to expand key path: "{0:s}"'.format(key_path))
        continue

      find_spec = registry_searcher.FindSpec(key_path_glob=key_path)
      find_specs.append(find_spec)

    return find_specs
filter_file (Optional[str]): path to a file with find specifications. @@ -180,6 +181,7 @@ def CreateSession( """ session = sessions.Session() + session.artifact_filters = artifact_filters session.command_line_arguments = command_line_arguments session.debug_mode = debug_mode session.filter_file = filter_file diff --git a/plaso/engine/path_helper.py b/plaso/engine/path_helper.py deleted file mode 100644 index 190c86bcfc..0000000000 --- a/plaso/engine/path_helper.py +++ /dev/null @@ -1,255 +0,0 @@ -# -*- coding: utf-8 -*- -"""The path helper.""" - -from __future__ import unicode_literals - -import re - -from dfvfs.lib import definitions as dfvfs_definitions - -from plaso.engine import logger -from plaso.lib import py2to3 - - -class PathHelper(object): - """Class that implements the path helper.""" - - _RECURSIVE_GLOB_LIMIT = 10 - - @classmethod - def AppendPathEntries(cls, path, path_separator, count, skip_first): - """Appends wildcard entries to end of path. - - Will append wildcard * to given path building a list of strings for "count" - iterations, skipping the first directory if skip_first is true. - - Args: - path (str): Path to append wildcards to. - path_separator (str): path segment separator. - count (int): Number of entries to be appended. - skip_first (bool): Whether or not to skip first entry to append. - - Returns: - list[str]: Paths that were expanded from the path with wildcards. - """ - paths = [] - replacement = '{0:s}*'.format(path_separator) - - iteration = 0 - while iteration < count: - if skip_first and iteration == 0: - path += replacement - else: - path += replacement - paths.append(path) - iteration += 1 - - return paths - - @classmethod - def ExpandRecursiveGlobs(cls, path, path_separator): - """Expands recursive like globs present in an artifact path. - - If a path ends in '**', with up to two optional digits such as '**10', - the '**' will recursively match all files and zero or more directories - from the specified path. 
The optional digits indicate the recursion depth. - By default recursion depth is 10 directories. - - If the glob is followed by the specified path segment separator, only - directories and subdirectories will be matched. - - Args: - path (str): path to be expanded. - path_separator (str): path segment separator. - - Returns: - list[str]: String path expanded for each glob. - """ - glob_regex = r'(.*)?{0}\*\*(\d{{1,2}})?({0})?$'.format( - re.escape(path_separator)) - - match = re.search(glob_regex, path) - if not match: - return [path] - - skip_first = False - if match.group(3): - skip_first = True - if match.group(2): - iterations = int(match.group(2)) - else: - iterations = cls._RECURSIVE_GLOB_LIMIT - logger.warning(( - 'Path "{0:s}" contains fully recursive glob, limiting to 10 ' - 'levels').format(path)) - - return cls.AppendPathEntries( - match.group(1), path_separator, iterations, skip_first) - - @classmethod - def ExpandUsersHomeDirectoryPath(cls, path, user_accounts): - """Expands a path to contain all users home or profile directories. - - Expands the GRR artifacts path variable "%%users.homedir%%". - - Args: - path (str): Windows path with environment variables. - user_accounts (list[UserAccountArtifact]): user accounts. - - Returns: - list [str]: paths returned for user accounts without a drive letter. - """ - path_upper_case = path.upper() - if not path_upper_case.startswith('%%USERS.HOMEDIR%%'): - user_paths = [path] - else: - regex = re.compile(re.escape('%%users.homedir%%')) - - user_paths = [] - for user_account in user_accounts: - user_path = regex.sub(user_account.user_directory, path, re.IGNORECASE) - user_paths.append(user_path) - - # Remove the drive letter, if it exists. 
- for path_index, user_path in enumerate(user_paths): - if len(user_path) > 2 and user_path[1] == ':': - _, _, user_path = user_path.rpartition(':') - user_paths[path_index] = user_path - - return user_paths - - @classmethod - def ExpandWindowsPath(cls, path, environment_variables): - """Expands a Windows path containing environment variables. - - Args: - path (str): Windows path with environment variables. - environment_variables (list[EnvironmentVariableArtifact]): environment - variables. - - Returns: - str: expanded Windows path. - """ - # TODO: Add support for items such as %%users.localappdata%% - - if environment_variables is None: - environment_variables = [] - - lookup_table = {} - if environment_variables: - for environment_variable in environment_variables: - attribute_name = environment_variable.name.upper() - attribute_value = environment_variable.value - if not isinstance(attribute_value, py2to3.STRING_TYPES): - continue - - lookup_table[attribute_name] = attribute_value - - path_segments = path.split('\\') - for index, path_segment in enumerate(path_segments): - if (len(path_segment) <= 2 or not path_segment.startswith('%') or - not path_segment.endswith('%')): - continue - - check_for_drive_letter = False - path_segment_upper_case = path_segment.upper() - if path_segment_upper_case.startswith('%%ENVIRON_'): - lookup_key = path_segment_upper_case[10:-2] - check_for_drive_letter = True - else: - lookup_key = path_segment_upper_case[1:-1] - path_segments[index] = lookup_table.get(lookup_key, path_segment) - - if check_for_drive_letter: - # Remove the drive letter. - if len(path_segments[index]) >= 2 and path_segments[index][1] == ':': - _, _, path_segments[index] = path_segments[index].rpartition(':') - - return '\\'.join(path_segments) - - @classmethod - def GetDisplayNameForPathSpec( - cls, path_spec, mount_path=None, text_prepend=None): - """Retrieves the display name of a path specification. 
- - Args: - path_spec (dfvfs.PathSpec): path specification. - mount_path (Optional[str]): path where the file system that is used - by the path specification is mounted, such as "/mnt/image". The - mount path will be stripped from the absolute path defined by - the path specification. - text_prepend (Optional[str]): text to prepend. - - Returns: - str: human readable version of the path specification or None. - """ - if not path_spec: - return None - - relative_path = cls.GetRelativePathForPathSpec( - path_spec, mount_path=mount_path) - if not relative_path: - return path_spec.type_indicator - - if text_prepend: - relative_path = '{0:s}{1:s}'.format(text_prepend, relative_path) - - parent_path_spec = path_spec.parent - if parent_path_spec and path_spec.type_indicator in [ - dfvfs_definitions.TYPE_INDICATOR_BZIP2, - dfvfs_definitions.TYPE_INDICATOR_GZIP]: - parent_path_spec = parent_path_spec.parent - - if parent_path_spec and parent_path_spec.type_indicator in [ - dfvfs_definitions.TYPE_INDICATOR_VSHADOW]: - store_index = getattr(path_spec.parent, 'store_index', None) - if store_index is not None: - return 'VSS{0:d}:{1:s}:{2:s}'.format( - store_index + 1, path_spec.type_indicator, relative_path) - - return '{0:s}:{1:s}'.format(path_spec.type_indicator, relative_path) - - @classmethod - def GetRelativePathForPathSpec(cls, path_spec, mount_path=None): - """Retrieves the relative path of a path specification. - - If a mount path is defined the path will be relative to the mount point, - otherwise the path is relative to the root of the file system that is used - by the path specification. - - Args: - path_spec (dfvfs.PathSpec): path specification. - mount_path (Optional[str]): path where the file system that is used - by the path specification is mounted, such as "/mnt/image". The - mount path will be stripped from the absolute path defined by - the path specification. - - Returns: - str: relative path or None. 
- """ - if not path_spec: - return None - - # TODO: Solve this differently, quite possibly inside dfVFS using mount - # path spec. - location = getattr(path_spec, 'location', None) - if not location and path_spec.HasParent(): - location = getattr(path_spec.parent, 'location', None) - - if not location: - return None - - data_stream = getattr(path_spec, 'data_stream', None) - if data_stream: - location = '{0:s}:{1:s}'.format(location, data_stream) - - if path_spec.type_indicator != dfvfs_definitions.TYPE_INDICATOR_OS: - return location - - # If we are parsing a mount point we don't want to include the full - # path to file's location here, we are only interested in the path - # relative to the mount point. - if mount_path and location.startswith(mount_path): - location = location[len(mount_path):] - - return location diff --git a/plaso/parsers/winreg.py b/plaso/parsers/winreg.py index 5cb4a6e252..47a134a188 100644 --- a/plaso/parsers/winreg.py +++ b/plaso/parsers/winreg.py @@ -3,11 +3,15 @@ from __future__ import unicode_literals +from artifacts import definitions as artifact_types from dfwinreg import errors as dfwinreg_errors from dfwinreg import interface as dfwinreg_interface from dfwinreg import regf as dfwinreg_regf from dfwinreg import registry as dfwinreg_registry +from dfwinreg import registry_searcher as dfwinreg_registry_searcher +from plaso.engine import artifact_filters +from plaso.engine import logger from plaso.lib import specification from plaso.filters import path_filter from plaso.parsers import interface @@ -158,6 +162,30 @@ def _NormalizeKeyPath(self, key_path): return ''.join([ self._NORMALIZED_CONTROL_SET_PREFIX, normalized_key_path[39:]]) + def _ParseKey(self, parser_mediator, registry_key): + """Parses the Registry key with a specific plugin. + + Args: + parser_mediator (ParserMediator): parser mediator. + registry_key (dfwinreg.WinRegistryKey): Windows Registry key.
+ """ + matching_plugin = None + + normalized_key_path = self._NormalizeKeyPath(registry_key.path) + if self._path_filter.CheckPath(normalized_key_path): + matching_plugin = self._plugin_per_key_path[normalized_key_path] + else: + for plugin in self._plugins_without_key_paths: + if self._CanProcessKeyWithPlugin(registry_key, plugin): + matching_plugin = plugin + break + + if not matching_plugin: + matching_plugin = self._default_plugin + + if matching_plugin: + self._ParseKeyWithPlugin(parser_mediator, registry_key, matching_plugin) + def _ParseRecurseKeys(self, parser_mediator, root_key): """Parses the Registry keys recursively. @@ -169,23 +197,25 @@ def _ParseRecurseKeys(self, parser_mediator, root_key): if parser_mediator.abort: break - matching_plugin = None + self._ParseKey(parser_mediator, registry_key) - normalized_key_path = self._NormalizeKeyPath(registry_key.path) - if self._path_filter.CheckPath(normalized_key_path): - matching_plugin = self._plugin_per_key_path[normalized_key_path] - else: - for plugin in self._plugins_without_key_paths: - if self._CanProcessKeyWithPlugin(registry_key, plugin): - matching_plugin = plugin - break + def _ParseKeysFromFindSpecs(self, parser_mediator, win_registry, find_specs): + """Parses the Registry keys from FindSpecs. - if not matching_plugin: - matching_plugin = self._default_plugin + Args: + parser_mediator (ParserMediator): parser mediator. + win_registry (dfwinreg.WinRegistry): Windows Registry. + find_specs (list[dfwinreg.FindSpec]): keys to search for.
+ """ + searcher = dfwinreg_registry_searcher.WinRegistrySearcher(win_registry) + for registry_key_path in list(searcher.Find(find_specs=find_specs)): + if parser_mediator.abort: + break + + registry_key = searcher.GetKeyByPath(registry_key_path) + self._ParseKey(parser_mediator, registry_key) - if matching_plugin: - self._ParseKeyWithPlugin(parser_mediator, registry_key, matching_plugin) def ParseFileObject(self, parser_mediator, file_object, **kwargs): """Parses a Windows Registry file-like object. @@ -205,16 +235,43 @@ def ParseFileObject(self, parser_mediator, file_object, **kwargs): return win_registry = dfwinreg_registry.WinRegistry() + key_path_prefix = win_registry.GetRegistryFileMapping(registry_file) registry_file.SetKeyPathPrefix(key_path_prefix) root_key = registry_file.GetRootKey() if not root_key: return - try: - self._ParseRecurseKeys(parser_mediator, root_key) - except IOError as exception: - parser_mediator.ProduceExtractionError('{0:s}'.format(exception)) - + find_specs = parser_mediator.knowledge_base.GetValue( + artifact_filters.ARTIFACT_FILTERS) + + registry_find_specs = None + if find_specs: + registry_find_specs = find_specs.get( + artifact_types.TYPE_INDICATOR_WINDOWS_REGISTRY_KEY) + + key_path_compatible = False + if (key_path_prefix.upper() in + artifact_filters.COMPATIBLE_DFWINREG_KEYS): + key_path_compatible = True + + if registry_find_specs and key_path_compatible: + try: + win_registry.MapFile(key_path_prefix, registry_file) + self._ParseKeysFromFindSpecs( + parser_mediator, win_registry, registry_find_specs) + # TODO: This shouldn't be necessary, check with dfwinreg + win_registry._registry_files.clear() + except IOError as exception: + parser_mediator.ProduceExtractionError('{0:s}'.format(exception)) + else: + if registry_find_specs and not key_path_compatible: + logger.warning('Artifacts Registry Filters are not supported for ' + 'the registry prefix {0:s}.
' + 'Parsing entire file.'.format(key_path_prefix)) + try: + self._ParseRecurseKeys(parser_mediator, root_key) + except IOError as exception: + parser_mediator.ProduceExtractionError('{0:s}'.format(exception)) manager.ParsersManager.RegisterParser(WinRegistryParser) diff --git a/test_data/artifacts/artifacts_filters.yaml b/test_data/artifacts/artifacts_filters.yaml deleted file mode 100644 index 7de1896d7f..0000000000 --- a/test_data/artifacts/artifacts_filters.yaml +++ /dev/null @@ -1,67 +0,0 @@ -# Artifact definitions. - -name: TestFiles -doc: Test Doc -sources: -- type: FILE - attributes: - paths: ['%%environ_systemdrive%%\AUTHORS'] - separator: '\' -labels: [System] -supported_os: [Windows] ---- -name: TestFiles2 -doc: Test Doc2 -sources: -- type: FILE - attributes: - paths: - - '%%environ_systemdrive%%\test_data\*.evtx' - - '%%users.homedir%%\Documents\WindowsPowerShell\profile.ps1' - - '\test_data\testdir\filter_*.txt' - - '\does_not_exist\some_file_*.txt' - - '\globbed\test\path\**\' - - 'failing' - separator: '\' -labels: [System] -supported_os: [Windows] ---- -name: TestRegistry -doc: Test Registry Doc -sources: -- type: REGISTRY_KEY - attributes: - keys: ['HKEY_LOCAL_MACHINE\System\CurrentControlSet\Control\SecurityProviders\*'] -supported_os: [Windows] ---- -name: TestRegistryKey -doc: Test Registry Doc Key -sources: -- type: REGISTRY_KEY - attributes: - keys: - - 'HKEY_LOCAL_MACHINE\System\ControlSet001\services\**\' - - 'HKEY_LOCAL_MACHINE\System\ControlSet002\services\**\' - - 'HKEY_LOCAL_MACHINE\System\CurrentControlSet\Enum\USBSTOR' - - 'HKEY_LOCAL_MACHINE\System\CurrentControlSet\Enum\USBSTOR\**' -supported_os: [Windows] ---- -name: TestRegistryValue -doc: Test Registry Doc Value -sources: -- type: REGISTRY_VALUE - attributes: - key_value_pairs: - - {key: 'HKEY_LOCAL_MACHINE\System\ControlSet001\Control\Session Manager', value: 'BootExecute'} - - {key: 'HKEY_LOCAL_MACHINE\System\ControlSet002\Control\Session Manager', value: 'BootExecute'} 
-supported_os: [Windows] ---- -name: TestFilesImageExport -doc: Test Doc -sources: -- type: FILE - attributes: - paths: ['\a_directory\*_file'] - separator: '\' -labels: [System] -supported_os: [Windows] \ No newline at end of file diff --git a/tests/cli/helpers/artifact_definitions.py b/tests/cli/helpers/artifact_definitions.py index 411708c0bf..c672075752 100644 --- a/tests/cli/helpers/artifact_definitions.py +++ b/tests/cli/helpers/artifact_definitions.py @@ -22,6 +22,7 @@ class ArtifactDefinitionsArgumentsHelperTest(cli_test_lib.CLIToolTestCase): _EXPECTED_OUTPUT = """\ usage: cli_helper.py [--artifact_definitions PATH] + [--custom_artifact_definitions PATH] Test argument parser. @@ -31,6 +32,11 @@ class ArtifactDefinitionsArgumentsHelperTest(cli_test_lib.CLIToolTestCase): which are .yaml files. Artifact definitions can be used to describe and quickly collect data of interest, such as specific files or Windows Registry keys. + --custom_artifact_definitions PATH, --custom-artifact-definitions PATH + Path to a file containing custom artifact definitions, + which are .yaml files. Artifact definitions can be + used to describe and quickly collect data of interest, + such as specific files or Windows Registry keys. 
""" def testAddArguments(self): diff --git a/tests/cli/image_export_tool.py b/tests/cli/image_export_tool.py index 4505b9d525..93e9a7df30 100644 --- a/tests/cli/image_export_tool.py +++ b/tests/cli/image_export_tool.py @@ -398,6 +398,35 @@ def testProcessSourcesExtractWithFilter(self): self.assertEqual(sorted(extracted_files), expected_extracted_files) + @shared_test_lib.skipUnlessHasTestFile(['artifacts']) + @shared_test_lib.skipUnlessHasTestFile(['image.qcow2']) + def testProcessSourcesExtractWithArtifactsFilter(self): + """Tests the ProcessSources function with a filter file.""" + output_writer = test_lib.TestOutputWriter(encoding='utf-8') + test_tool = image_export_tool.ImageExportTool(output_writer=output_writer) + + options = test_lib.TestOptions() + options.artifact_definitions_path = self._GetTestFilePath(['artifacts']) + options.image = self._GetTestFilePath(['image.qcow2']) + options.quiet = True + options.artifact_filters = 'TestFilesImageExport' + + with shared_test_lib.TempDirectory() as temp_directory: + options.path = temp_directory + + test_tool.ParseOptions(options) + + test_tool.ProcessSources() + + expected_extracted_files = sorted([ + os.path.join(temp_directory, 'a_directory'), + os.path.join(temp_directory, 'a_directory', 'another_file'), + os.path.join(temp_directory, 'a_directory', 'a_file')]) + + extracted_files = self._RecursiveList(temp_directory) + + self.assertEqual(sorted(extracted_files), expected_extracted_files) + @shared_test_lib.skipUnlessHasTestFile(['syslog_image.dd']) def testProcessSourcesExtractWithSignaturesFilter(self): """Tests the ProcessSources function with a signatures filter.""" diff --git a/tests/cli/pinfo_tool.py b/tests/cli/pinfo_tool.py index 5d47115537..eec31cc2fe 100644 --- a/tests/cli/pinfo_tool.py +++ b/tests/cli/pinfo_tool.py @@ -185,6 +185,7 @@ def testPrintStorageInformationAsText(self): table_view.AddRow(['Enabled parser and plugins', enabled_parser_names]) table_view.AddRow(['Preferred encoding', 
'UTF-8']) table_view.AddRow(['Debug mode', 'False']) + table_view.AddRow(['Artifact filters', 'N/A']) table_view.AddRow(['Filter file', 'N/A']) table_view.Write(output_writer) diff --git a/tests/cli/psteal_tool.py b/tests/cli/psteal_tool.py index f8773d28d7..0ec50d5795 100644 --- a/tests/cli/psteal_tool.py +++ b/tests/cli/psteal_tool.py @@ -119,6 +119,7 @@ def testParseOptions(self): test_tool = psteal_tool.PstealTool(output_writer=output_writer) options = test_lib.TestOptions() + options.artifact_definitions_path = self._GetTestFilePath(['artifacts']) options.source = 'source' # Test when the output file is missing. expected_error = 'Output format: dynamic requires an output file' diff --git a/tests/engine/artifact_filters.py b/tests/engine/artifact_filters.py deleted file mode 100644 index 41a43e0e88..0000000000 --- a/tests/engine/artifact_filters.py +++ /dev/null @@ -1,246 +0,0 @@ -#!/usr/bin/python -# -*- coding: utf-8 -*- -"""Tests for the artifacts file filter functions.""" - -from __future__ import unicode_literals - -import unittest - -from artifacts import definitions as artifact_types -from artifacts import reader as artifacts_reader -from artifacts import registry as artifacts_registry - -from dfwinreg import registry as dfwinreg_registry -from dfwinreg import registry_searcher as dfwinreg_registry_searcher - -from dfvfs.helpers import file_system_searcher -from dfvfs.lib import definitions as dfvfs_definitions -from dfvfs.path import factory as path_spec_factory -from dfvfs.resolver import resolver as path_spec_resolver - -from plaso.containers import artifacts -from plaso.engine import artifact_filters -from plaso.engine import knowledge_base as knowledge_base_engine -from plaso.parsers import winreg as windows_registry_parser - -from tests import test_lib as shared_test_lib - - -class ArtifactDefinitionsFilterHelperTest(shared_test_lib.BaseTestCase): - """Tests for artifact definitions filter helper.""" - - # pylint: disable=protected-access - - def 
_CreateTestArtifactDefinitionsFilterHelper( - self, artifact_definitions, knowledge_base): - """Creates an artifact definitions filter helper for testing. - - Args: - artifact_definitions (list[str]): artifact definition names to filter. - knowledge_base (KnowledgeBase): contains information from the source - data needed for filtering. - - Returns: - ArtifactDefinitionsFilterHelper: artifact definitions filter helper. - """ - registry = artifacts_registry.ArtifactDefinitionsRegistry() - reader = artifacts_reader.YamlArtifactsReader() - - test_artifacts_path = self._GetTestFilePath(['artifacts']) - registry.ReadFromDirectory(reader, test_artifacts_path) - - return artifact_filters.ArtifactDefinitionsFilterHelper( - registry, artifact_definitions, knowledge_base) - - @shared_test_lib.skipUnlessHasTestFile(['artifacts']) - @shared_test_lib.skipUnlessHasTestFile(['System.evtx']) - @shared_test_lib.skipUnlessHasTestFile(['testdir', 'filter_1.txt']) - @shared_test_lib.skipUnlessHasTestFile(['testdir', 'filter_3.txt']) - def testBuildFindSpecsWithFileSystem(self): - """Tests the BuildFindSpecs function for file type artifacts.""" - knowledge_base = knowledge_base_engine.KnowledgeBase() - - testuser1 = artifacts.UserAccountArtifact( - identifier='1000', - user_directory='C:\\\\Users\\\\testuser1', - username='testuser1') - knowledge_base.AddUserAccount(testuser1) - - testuser2 = artifacts.UserAccountArtifact( - identifier='1001', - user_directory='C:\\\\Users\\\\testuser2', - username='testuser2') - knowledge_base.AddUserAccount(testuser2) - - test_filter_file = self._CreateTestArtifactDefinitionsFilterHelper( - ['TestFiles', 'TestFiles2'], knowledge_base) - - environment_variable = artifacts.EnvironmentVariableArtifact( - case_sensitive=False, name='SystemDrive', value='C:') - - test_filter_file.BuildFindSpecs( - environment_variables=[environment_variable]) - find_specs_per_source_type = knowledge_base.GetValue( - test_filter_file._KNOWLEDGE_BASE_VALUE) - find_specs = 
find_specs_per_source_type.get( - artifact_types.TYPE_INDICATOR_FILE, []) - - # Should build 15 FindSpec entries. - self.assertEqual(len(find_specs), 15) - - # Last find_spec should contain the testuser2 profile path. - location_segments = sorted([ - find_spec._location_segments for find_spec in find_specs]) - path_segments = [ - 'Users', 'testuser2', 'Documents', 'WindowsPowerShell', 'profile\\.ps1'] - self.assertEqual(location_segments[2], path_segments) - - path_spec = path_spec_factory.Factory.NewPathSpec( - dfvfs_definitions.TYPE_INDICATOR_OS, location='.') - file_system = path_spec_resolver.Resolver.OpenFileSystem(path_spec) - searcher = file_system_searcher.FileSystemSearcher( - file_system, path_spec) - - path_spec_generator = searcher.Find(find_specs=find_specs) - self.assertIsNotNone(path_spec_generator) - - path_specs = list(path_spec_generator) - - # Two evtx, one symbolic link to evtx, one AUTHORS, two filter_*.txt files, - # total 6 path specifications. - self.assertEqual(len(path_specs), 6) - - file_system.Close() - - @shared_test_lib.skipUnlessHasTestFile(['artifacts']) - @shared_test_lib.skipUnlessHasTestFile(['SYSTEM']) - def testBuildFindSpecsWithRegistry(self): - """Tests the BuildFindSpecs function on Windows Registry artifacts.""" - knowledge_base = knowledge_base_engine.KnowledgeBase() - test_filter_file = self._CreateTestArtifactDefinitionsFilterHelper( - ['TestRegistry'], knowledge_base) - - test_filter_file.BuildFindSpecs(environment_variables=None) - find_specs_per_source_type = knowledge_base.GetValue( - test_filter_file._KNOWLEDGE_BASE_VALUE) - find_specs = find_specs_per_source_type.get( - artifact_types.TYPE_INDICATOR_WINDOWS_REGISTRY_KEY, []) - - self.assertEqual(len(find_specs), 1) - - win_registry_reader = ( - windows_registry_parser.FileObjectWinRegistryFileReader()) - - file_entry = self._GetTestFileEntry(['SYSTEM']) - file_object = file_entry.GetFileObject() - - registry_file = win_registry_reader.Open(file_object) - - 
win_registry = dfwinreg_registry.WinRegistry() - key_path_prefix = win_registry.GetRegistryFileMapping(registry_file) - registry_file.SetKeyPathPrefix(key_path_prefix) - win_registry.MapFile(key_path_prefix, registry_file) - - searcher = dfwinreg_registry_searcher.WinRegistrySearcher(win_registry) - key_paths = list(searcher.Find(find_specs=find_specs)) - - self.assertIsNotNone(key_paths) - - # Three key paths found. - self.assertEqual(len(key_paths), 3) - - def testCheckKeyCompatibility(self): - """Tests the _CheckKeyCompatibility function""" - knowledge_base = knowledge_base_engine.KnowledgeBase() - test_filter_file = self._CreateTestArtifactDefinitionsFilterHelper( - [], knowledge_base) - - # Compatible Key. - key_path = 'HKEY_LOCAL_MACHINE\\SYSTEM\\CurrentControlSet\\Control' - compatible_key = test_filter_file._CheckKeyCompatibility(key_path) - self.assertTrue(compatible_key) - - # NOT a Compatible Key. - key_path = 'HKEY_USERS\\S-1-5-18' - compatible_key = test_filter_file._CheckKeyCompatibility(key_path) - self.assertFalse(compatible_key) - - def testBuildFindSpecsFromFileArtifact(self): - """Tests the BuildFindSpecsFromFileArtifact function for file artifacts.""" - knowledge_base = knowledge_base_engine.KnowledgeBase() - test_filter_file = self._CreateTestArtifactDefinitionsFilterHelper( - [], knowledge_base) - - separator = '\\' - user_accounts = [] - - # Test expansion of environment variables. - path_entry = '%%environ_systemroot%%\\test_data\\*.evtx' - environment_variable = [artifacts.EnvironmentVariableArtifact( - case_sensitive=False, name='SystemRoot', value='C:\\Windows')] - - find_specs = test_filter_file.BuildFindSpecsFromFileArtifact( - path_entry, separator, environment_variable, user_accounts) - - # Should build 1 find_spec. - self.assertEqual(len(find_specs), 1) - - # Location segments should be equivalent to \Windows\test_data\*.evtx. 
- path_segments = ['Windows', 'test\\_data', '.*\\.evtx'] - self.assertEqual(find_specs[0]._location_segments, path_segments) - - # Test expansion of globs. - path_entry = '\\test_data\\**' - find_specs = test_filter_file.BuildFindSpecsFromFileArtifact( - path_entry, separator, environment_variable, user_accounts) - - # Glob expansion should by default recurse ten levels. - self.assertEqual(len(find_specs), 10) - - # Last entry in find_specs list should be 10 levels of depth. - path_segments = [ - 'test\\_data', '.*', '.*', '.*', '.*', '.*', '.*', '.*', '.*', '.*', - '.*'] - self.assertEqual(find_specs[9]._location_segments, path_segments) - - # Test expansion of user home directories - separator = '/' - testuser1 = artifacts.UserAccountArtifact( - user_directory='/homes/testuser1', username='testuser1') - testuser2 = artifacts.UserAccountArtifact( - user_directory='/home/testuser2', username='testuser2') - user_accounts = [testuser1, testuser2] - path_entry = '%%users.homedir%%/.thumbnails/**3' - - find_specs = test_filter_file.BuildFindSpecsFromFileArtifact( - path_entry, separator, environment_variable, user_accounts) - - # Six total find specs should be created for testuser1 and testuser2. - self.assertEqual(len(find_specs), 6) - - # Last entry in find_specs list should be testuser2 with a depth of 3 - path_segments = ['home', 'testuser2', '\\.thumbnails', '.*', '.*', '.*'] - self.assertEqual(find_specs[5]._location_segments, path_segments) - - # Test Windows path with profile directories and globs with a depth of 4. 
- separator = '\\' - testuser1 = artifacts.UserAccountArtifact( - user_directory='\\Users\\\\testuser1', username='testuser1') - testuser2 = artifacts.UserAccountArtifact( - user_directory='\\Users\\\\testuser2', username='testuser2') - user_accounts = [testuser1, testuser2] - path_entry = '%%users.homedir%%\\AppData\\**4' - - find_specs = test_filter_file.BuildFindSpecsFromFileArtifact( - path_entry, separator, environment_variable, user_accounts) - - # Eight find specs should be created for testuser1 and testuser2. - self.assertEqual(len(find_specs), 8) - - # Last entry in find_specs list should be testuser2, with a depth of 4. - path_segments = ['Users', 'testuser2', 'AppData', '.*', '.*', '.*', '.*'] - self.assertEqual(find_specs[7]._location_segments, path_segments) - - -if __name__ == '__main__': - unittest.main() diff --git a/tests/parsers/winreg.py b/tests/parsers/winreg.py index 82856573ea..45c3214ede 100644 --- a/tests/parsers/winreg.py +++ b/tests/parsers/winreg.py @@ -6,6 +6,11 @@ import unittest +from artifacts import reader as artifacts_reader +from artifacts import registry as artifacts_registry + +from plaso.engine import artifact_filters +from plaso.engine import knowledge_base as knowledge_base_engine from plaso.parsers import winreg # Register all plugins. 
from plaso.parsers import winreg_plugins # pylint: disable=unused-import @@ -102,6 +107,55 @@ def testParseSystem(self): parser_chain = self._PluginNameToParserChain('windows_services') self.assertEqual(parser_chains.get(parser_chain, 0), 831) + @shared_test_lib.skipUnlessHasTestFile(['artifacts']) + @shared_test_lib.skipUnlessHasTestFile(['SYSTEM']) + def testParseSystemWithArtifactFilters(self): + """Tests the Parse function on a SYSTEM file with artifact filters.""" + parser = winreg.WinRegistryParser() + knowledge_base = knowledge_base_engine.KnowledgeBase() + + artifacts_filters = ['TestRegistryKey', 'TestRegistryValue'] + registry = artifacts_registry.ArtifactDefinitionsRegistry() + reader = artifacts_reader.YamlArtifactsReader() + + registry.ReadFromDirectory(reader, self._GetTestFilePath(['artifacts'])) + + test_filter_file = artifact_filters.ArtifactFilters( + registry, artifacts_filters, knowledge_base) + + test_filter_file.BuildFindSpecs(environment_variables=None) + + find_specs = { + artifact_filters.ARTIFACT_FILTERS : knowledge_base.GetValue( + artifact_filters.ARTIFACT_FILTERS)} + storage_writer = self._ParseFile(['SYSTEM'], parser, + knowledge_base_values=find_specs) + + events = list(storage_writer.GetEvents()) + + parser_chains = self._GetParserChains(events) + + # Check the existence of few known plugins, see if they + # are being properly picked up and are parsed. + plugin_names = [ + 'windows_usbstor_devices', 'windows_boot_execute', + 'windows_services'] + for plugin in plugin_names: + expected_parser_chain = self._PluginNameToParserChain(plugin) + self.assertTrue( + expected_parser_chain in parser_chains, + 'Chain {0:s} not found in events.'.format(expected_parser_chain)) + + # Check that the number of events produced by each plugin are correct. 
+ parser_chain = self._PluginNameToParserChain('windows_usbstor_devices') + self.assertEqual(parser_chains.get(parser_chain, 0), 10) + + parser_chain = self._PluginNameToParserChain('windows_boot_execute') + self.assertEqual(parser_chains.get(parser_chain, 0), 4) + + parser_chain = self._PluginNameToParserChain('windows_services') + self.assertEqual(parser_chains.get(parser_chain, 0), 831) + if __name__ == '__main__': unittest.main()