From 8417ac855df97de94bc958e0dc8acf2ccbb7bbe3 Mon Sep 17 00:00:00 2001 From: Joachim Metz Date: Tue, 13 Oct 2015 19:55:40 +0200 Subject: [PATCH] Code review: 267200043: Moved lib.storage to storage.zip_file #102 --- config/dpkg/changelog | 2 +- docs/plaso.lib.rst | 8 - docs/plaso.storage.rst | 8 + plaso/__init__.py | 2 +- plaso/cli/tools.py | 30 ++ plaso/frontend/analysis_frontend.py | 15 +- plaso/frontend/extraction_frontend.py | 4 +- plaso/frontend/log2timeline.py | 2 +- plaso/frontend/psort.py | 418 +++++++++--------- plaso/output/manager.py | 33 +- plaso/output/pstorage.py | 4 +- plaso/storage/writer.py | 4 +- plaso/{lib/storage.py => storage/zip_file.py} | 263 +++++------ tests/cli/tools.py | 8 + tests/frontend/analysis_frontend.py | 4 +- tests/frontend/extraction_frontend.py | 5 +- tests/frontend/psort.py | 19 +- tests/output/interface.py | 16 +- tests/output/manager.py | 21 +- tests/output/pstorage.py | 10 +- tests/{lib/storage.py => storage/zip_file.py} | 44 +- tools/log2timeline.py | 12 +- tools/psort.py | 27 +- tools/psort_test.py | 8 +- 24 files changed, 503 insertions(+), 464 deletions(-) rename plaso/{lib/storage.py => storage/zip_file.py} (96%) rename tests/{lib/storage.py => storage/zip_file.py} (87%) diff --git a/config/dpkg/changelog b/config/dpkg/changelog index 0b424dda14..27b905d5df 100644 --- a/config/dpkg/changelog +++ b/config/dpkg/changelog @@ -2,4 +2,4 @@ python-plaso (1.3.1-1) unstable; urgency=low * Auto-generated - -- Log2Timeline Mon, 12 Oct 2015 08:05:59 +0200 + -- Log2Timeline Tue, 13 Oct 2015 19:55:40 +0200 diff --git a/docs/plaso.lib.rst b/docs/plaso.lib.rst index 54cb3b5174..79afbe70ff 100644 --- a/docs/plaso.lib.rst +++ b/docs/plaso.lib.rst @@ -108,14 +108,6 @@ plaso.lib.specification module :undoc-members: :show-inheritance: -plaso.lib.storage module ------------------------- - -.. automodule:: plaso.lib.storage - :members: - :undoc-members: - :show-inheritance: - plaso.lib.timelib module ------------------------ diff --git a/docs/plaso.storage.rst b/docs/plaso.storage.rst index 74328fa600..aa5da90677 100644 --- a/docs/plaso.storage.rst +++ b/docs/plaso.storage.rst @@ -28,6 +28,14 @@ plaso.storage.writer module :undoc-members: :show-inheritance: +plaso.storage.zip_file module +----------------------------- + +.. automodule:: plaso.storage.zip_file + :members: + :undoc-members: + :show-inheritance: + Module contents --------------- diff --git a/plaso/__init__.py b/plaso/__init__.py index a82655f1e4..21a17aafe6 100644 --- a/plaso/__init__.py +++ b/plaso/__init__.py @@ -3,7 +3,7 @@ __version__ = '1.3.1' VERSION_DEV = True -VERSION_DATE = '20151012' +VERSION_DATE = '20151013' def GetVersion(): diff --git a/plaso/cli/tools.py b/plaso/cli/tools.py index da01a3617c..391959c1a1 100644 --- a/plaso/cli/tools.py +++ b/plaso/cli/tools.py @@ -244,6 +244,36 @@ def AddTimezoneOption(self, argument_group): u'determined automatically where possible. Use "-z list" to ' u'see a list of available timezones.')) + def GetCommandLineArguments(self): + """Retrieves the command line arguments. + + Returns: + A string containing the command line arguments. + """ + command_line_arguments = sys.argv + if isinstance(command_line_arguments, py2to3.BYTES_TYPE): + encoding = sys.stdin.encoding + + # Note that sys.stdin.encoding can be None. 
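+      # In that case fall back to the tool's preferred encoding.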
+ if not encoding: + encoding = self.preferred_encoding + + try: + command_line_arguments = [ + argument.decode(encoding) for argument in command_line_arguments] + + except UnicodeDecodeError: + logging.error( + u'Unable to properly read command line input due to encoding ' + u'error. Replacing non Basic Latin (C0) characters with "?" or ' + u'"\\ufffd".') + + command_line_arguments = [ + argument.decode(encoding, errors=u'replace') + for argument in command_line_arguments] + + return u' '.join(command_line_arguments) + def ListTimeZones(self): """Lists the timezones.""" max_length = 0 diff --git a/plaso/frontend/analysis_frontend.py b/plaso/frontend/analysis_frontend.py index c08065023e..15d3997dc0 100644 --- a/plaso/frontend/analysis_frontend.py +++ b/plaso/frontend/analysis_frontend.py @@ -3,7 +3,7 @@ from plaso.formatters import mediator as formatters_mediator from plaso.frontend import frontend -from plaso.lib import storage +from plaso.storage import zip_file as storage_zip_file class AnalysisFrontend(frontend.Frontend): @@ -13,7 +13,6 @@ def __init__(self): """Initializes the front-end object.""" super(AnalysisFrontend, self).__init__() self._data_location = None - self._storage_file = None def GetFormatterMediator(self): """Retrieves the formatter mediator. @@ -30,12 +29,12 @@ def OpenStorage(self, storage_file_path, read_only=True): Args: storage_file_path: the path of the storage file. read_only: optional boolean value to indicate the storage file should - be opened in read-only mode. The default is True. + be opened in read-only mode. Returns: The storage file object (instance of StorageFile). """ - return storage.StorageFile(storage_file_path, read_only=read_only) + return storage_zip_file.StorageFile(storage_file_path, read_only=read_only) def SetDataLocation(self, data_location): """Set the data location. @@ -45,11 +44,3 @@ def SetDataLocation(self, data_location): from. """ self._data_location = data_location - - def SetStorageFile(self, storage_file): - """Set the storage file. - - Args: - storage_file: The path to the storage file being parsed. - """ - self._storage_file = storage_file diff --git a/plaso/frontend/extraction_frontend.py b/plaso/frontend/extraction_frontend.py index 91870e2c8d..fd6bfd3e0b 100644 --- a/plaso/frontend/extraction_frontend.py +++ b/plaso/frontend/extraction_frontend.py @@ -19,12 +19,12 @@ from plaso.lib import definitions from plaso.lib import errors from plaso.lib import event -from plaso.lib import storage from plaso.lib import timelib from plaso.multi_processing import multi_process from plaso.hashers import manager as hashers_manager from plaso.parsers import manager as parsers_manager from plaso.storage import writer as storage_writer +from plaso.storage import zip_file as storage_zip_file import pytz @@ -166,7 +166,7 @@ def _PreprocessSource(self, source_path_specs, source_type): if self._use_old_preprocess and os.path.isfile(self._storage_file_path): # Check if the storage file contains a preprocessing object. try: - with storage.StorageFile( + with storage_zip_file.StorageFile( self._storage_file_path, read_only=True) as storage_file: storage_information = storage_file.GetStorageInformation() diff --git a/plaso/frontend/log2timeline.py b/plaso/frontend/log2timeline.py index d0e56db874..a0f110dc0d 100644 --- a/plaso/frontend/log2timeline.py +++ b/plaso/frontend/log2timeline.py @@ -60,7 +60,7 @@ def _GetOutputModulesInformation(self): A list of tuples of output module names and descriptions. 
""" output_modules_information = [] - for name, description in sorted(output_manager.OutputManager.GetOutputs()): + for name, description in output_manager.OutputManager.GetOutputClasses(): output_modules_information.append((name, description)) return output_modules_information diff --git a/plaso/frontend/psort.py b/plaso/frontend/psort.py index 2de30dafaa..26bfa5115a 100644 --- a/plaso/frontend/psort.py +++ b/plaso/frontend/psort.py @@ -5,7 +5,6 @@ import collections import multiprocessing import logging -import sys from plaso import formatters # pylint: disable=unused-import from plaso import output # pylint: disable=unused-import @@ -47,8 +46,6 @@ def __init__(self): self._filter_buffer = None self._filter_expression = None self._filter_object = None - self._output_filename = None - self._output_file_object = None self._output_format = None self._preferred_language = u'en-US' self._quiet_mode = False @@ -100,6 +97,15 @@ def GetAnalysisPluginInfo(self): """ return analysis_manager.AnalysisPluginManager.GetAllPluginInformation() + def GetDisabledOutputClasses(self): + """Retrieves the disabled output classes. + + Returns: + An output module generator which yields tuples of output class names + and type object. + """ + return output_manager.OutputManager.GetDisabledOutputClasses() + def GetOutputClasses(self): """Retrieves the available output classes. @@ -187,40 +193,109 @@ def _ProcessAnalysisPlugins( for item, value in analysis_queue_consumer.counter.iteritems(): counter[item] = value - def SetFilter(self, filter_object, filter_expression): - """Set the filter information. + def _StartAnalysisPlugins( + self, storage_file_path, analysis_plugins, pre_obj, + analysis_queue_port=None, analysis_report_incoming_queue=None, + command_line_arguments=None): + """Start all the analysis plugin. Args: - filter_object: a filter object (instance of FilterObject). - filter_expression: the filter expression string. + storage_file_path: string containing the path of the storage file. + analysis_plugins: list of analysis plugin objects (instance of + AnalysisPlugin) that should be started. + pre_obj: The preprocessor object (instance of PreprocessObject). + analysis_queue_port: optional TCP port that the ZeroMQ analysis report + queues should use. + analysis_report_incoming_queue: optional queue (instance of Queue) that + reports should to pushed to, when ZeroMQ + is not in use. + command_line_arguments: optional string of the command line arguments or + None if not set. """ - self._filter_object = filter_object - self._filter_expression = filter_expression + logging.info(u'Starting analysis plugins.') + self._SetAnalysisPluginProcessInformation( + storage_file_path, analysis_plugins, pre_obj, + command_line_arguments=command_line_arguments) - def SetPreferredLanguageIdentifier(self, language_identifier): - """Sets the preferred language identifier. + knowledge_base_object = knowledge_base.KnowledgeBase(pre_obj=pre_obj) + for analysis_plugin in analysis_plugins: + if self._use_zeromq: + analysis_plugin_output_queue = zeromq_queue.ZeroMQPushConnectQueue( + delay_start=True, port=analysis_queue_port) + else: + analysis_plugin_output_queue = analysis_report_incoming_queue - Args: - language_identifier: the language identifier string e.g. en-US for - US English or is-IS for Icelandic. - """ - self._preferred_language = language_identifier + analysis_report_queue_producer = queue.ItemQueueProducer( + analysis_plugin_output_queue) - def SetOutputFilename(self, output_filename): - """Sets the output format. 
+ completion_event = multiprocessing.Event() + analysis_mediator_object = analysis_mediator.AnalysisMediator( + analysis_report_queue_producer, knowledge_base_object, + data_location=self._data_location, + completion_event=completion_event) + analysis_process = multiprocessing.Process( + name=u'Analysis {0:s}'.format(analysis_plugin.plugin_name), + target=analysis_plugin.RunPlugin, + args=(analysis_mediator_object,)) + + process_info = PsortAnalysisProcess( + completion_event, analysis_plugin, analysis_process) + self._analysis_process_info.append(process_info) + + analysis_process.start() + logging.info( + u'Plugin: [{0:s}] started.'.format(analysis_plugin.plugin_name)) + + logging.info(u'Analysis plugins running') + + def _SetAnalysisPluginProcessInformation( + self, storage_file_path, analysis_plugins, pre_obj, + command_line_arguments=None): + """Sets analysis plugin options in a preprocessor object. Args: - output_filename: the output filename. + storage_file_path: string containing the path of the storage file. + analysis_plugins: the list of analysis plugins to add. + pre_obj: the preprocessor object (instance of PreprocessObject). + command_line_arguments: optional string of the command line arguments or + None if not set. """ - self._output_filename = output_filename + analysis_plugin_names = [plugin.NAME for plugin in analysis_plugins] + time_of_run = timelib.Timestamp.GetNow() - def SetOutputFormat(self, output_format): - """Sets the output format. + pre_obj.collection_information[u'cmd_line'] = command_line_arguments + pre_obj.collection_information[u'file_processed'] = storage_file_path + pre_obj.collection_information[u'method'] = u'Running Analysis Plugins' + pre_obj.collection_information[u'plugins'] = analysis_plugin_names + pre_obj.collection_information[u'time_of_run'] = time_of_run + pre_obj.counter = collections.Counter() + + # TODO: fix docstring, function does not create the pre_obj the call to + # storage does. Likely refactor this functionality into the storage API. + def _GetLastGoodPreprocess(self, storage_file): + """Gets the last stored preprocessing object with time zone information. + + From all preprocessing objects, try to get the last one that has + time zone information stored in it, the highest chance of it containing + the information we are seeking (defaulting to the last one). If there are + no preprocessing objects in the file, we'll make a new one Args: - output_format: the output format. + storage_file: a Plaso storage file object. + + Returns: + A preprocess object (instance of PreprocessObject), or None if there are + no preprocess objects in the storage file. """ - self._output_format = output_format + pre_objs = storage_file.GetStorageInformation() + if not pre_objs: + return None + pre_obj = pre_objs[-1] + for obj in pre_objs: + if getattr(obj, u'time_zone_str', u''): + pre_obj = obj + + return pre_obj def GetOutputModule( self, storage_file, preferred_encoding=u'utf-8', timezone=pytz.UTC): @@ -307,35 +382,102 @@ def GetAnalysisPluginsAndEventQueues(self, analysis_plugins_string): return analysis_plugins, event_producers - def SetQuietMode(self, quiet_mode=False): - """Sets whether tools is in quiet mode or not. + def ProcessEventsFromStorage( + self, storage_file, output_buffer, my_filter=None, filter_buffer=None, + analysis_queues=None): + """Reads event objects from the storage to process and filter them. Args: - quiet_mode: boolean, when True the tool is in quiet mode. + storage_file: the storage file object (instance of StorageFile). 
+ output_buffer: the output buffer object (instance of EventBuffer). + my_filter: optional filter object (instance of PFilter). + filter_buffer: optional filter buffer used to store previously discarded + events to store time slice history. + analysis_queues: optional list of analysis queues. + + Returns: + A Counter object (instance of collections.Counter), that tracks the + number of unique events extracted from storage. """ - self._quiet_mode = quiet_mode + counter = collections.Counter() + my_limit = getattr(my_filter, u'limit', 0) + forward_entries = 0 + if not analysis_queues: + analysis_queues = [] - def SetUseZeroMQ(self, use_zeromq=False): - """Sets whether the tool is using ZeroMQ for queueing or not. + event_object = storage_file.GetSortedEntry() + while event_object: + if my_filter: + event_match = event_object + if isinstance(event_object, plaso_storage_pb2.EventObject): + # TODO: move serialization to storage, if low-level filtering is + # needed storage should provide functions for it. + serializer = protobuf_serializer.ProtobufEventObjectSerializer + event_match = serializer.ReadSerialized(event_object) - Args: - use_zeromq: boolean, when True the tool will use ZeroMQ for queuing. - """ - self._use_zeromq = use_zeromq + if my_filter.Match(event_match): + counter[u'Events Included'] += 1 + if filter_buffer: + # Indicate we want forward buffering. + forward_entries = 1 + # Empty the buffer. + for event_in_buffer in filter_buffer.Flush(): + counter[u'Events Added From Slice'] += 1 + counter[u'Events Included'] += 1 + counter[u'Events Filtered Out'] -= 1 + self._AppendEvent(event_in_buffer, output_buffer, analysis_queues) + self._AppendEvent(event_object, output_buffer, analysis_queues) + if my_limit: + if counter[u'Events Included'] == my_limit: + break + else: + if filter_buffer and forward_entries: + if forward_entries <= filter_buffer.size: + self._AppendEvent(event_object, output_buffer, analysis_queues) + forward_entries += 1 + counter[u'Events Added From Slice'] += 1 + counter[u'Events Included'] += 1 + else: + # Reached the max, don't include other entries. + forward_entries = 0 + counter[u'Events Filtered Out'] += 1 + elif filter_buffer: + filter_buffer.Append(event_object) + counter[u'Events Filtered Out'] += 1 + else: + counter[u'Events Filtered Out'] += 1 + else: + counter[u'Events Included'] += 1 + self._AppendEvent(event_object, output_buffer, analysis_queues) + + event_object = storage_file.GetSortedEntry() + + for analysis_queue in analysis_queues: + analysis_queue.Close() + if output_buffer.duplicate_counter: + counter[u'Duplicate Removals'] = output_buffer.duplicate_counter + + if my_limit: + counter[u'Limited By'] = my_limit + return counter def ProcessStorage( - self, output_module, storage_file, analysis_plugins, - event_queue_producers, deduplicate_events=True, - preferred_encoding=u'utf-8', time_slice=None, use_time_slicer=False): + self, output_module, storage_file, storage_file_path, analysis_plugins, + event_queue_producers, command_line_arguments=None, + deduplicate_events=True, preferred_encoding=u'utf-8', + time_slice=None, use_time_slicer=False): """Processes a plaso storage file. Args: output_module: an output module (instance of OutputModule). storage_file: the storage file object (instance of StorageFile). + storage_file_path: string containing the path of the storage file. analysis_plugins: list of analysis plugin objects (instance of AnalysisPlugin). event_queue_producers: list of event queue producer objects (instance of ItemQueueProducer). 
+ command_line_arguments: optional string of the command line arguments or + None if not set. deduplicate_events: optional boolean value to indicate if the event objects should be deduplicated. The default is True. preferred_encoding: optional preferred encoding. The default is "utf-8". @@ -381,8 +523,11 @@ def ProcessStorage( if analysis_plugins: self._StartAnalysisPlugins( - analysis_plugins, pre_obj, preferred_encoding, analysis_queue_port, - analysis_report_incoming_queue) + storage_file_path, analysis_plugins, pre_obj, + analysis_queue_port=analysis_queue_port, + analysis_report_incoming_queue=analysis_report_incoming_queue, + command_line_arguments=command_line_arguments) + # Assign the preprocessing object to the storage. # This is normally done in the construction of the storage object, # however we cannot do that here since the preprocessing object is @@ -414,10 +559,6 @@ def ProcessStorage( analysis_plugins, analysis_report_incoming_queue, storage_file, counter, preferred_encoding=preferred_encoding) - if self._output_file_object: - self._output_file_object.close() - self._output_file_object = None - if self._filter_object and not counter[u'Limited By']: counter[u'Filter By Date'] = ( counter[u'Stored Events'] - counter[u'Events Included'] - @@ -425,189 +566,48 @@ def ProcessStorage( return counter - def _StartAnalysisPlugins( - self, analysis_plugins, pre_obj, preferred_encoding=u'utf-8', - analysis_queue_port=None, analysis_report_incoming_queue=None): - """Start all the analysis plugin. + def SetFilter(self, filter_object, filter_expression): + """Set the filter information. Args: - analysis_plugins: list of analysis plugin objects (instance of - AnalysisPlugin) that should be started. - pre_obj: The preprocessor object (instance of PreprocessObject). - preferred_encoding: optional preferred encoding to use for the preprocess - object. - analysis_queue_port: optional TCP port that the ZeroMQ analysis report - queues should use. - analysis_report_incoming_queue: optional queue (instance of Queue) that - reports should to pushed to, when ZeroMQ - is not in use. + filter_object: a filter object (instance of FilterObject). + filter_expression: the filter expression string. 
""" - logging.info(u'Starting analysis plugins.') - self._SetAnalysisPluginProcessInformation( - analysis_plugins, pre_obj, preferred_encoding) - - knowledge_base_object = knowledge_base.KnowledgeBase(pre_obj=pre_obj) - for analysis_plugin in analysis_plugins: - if self._use_zeromq: - analysis_plugin_output_queue = zeromq_queue.ZeroMQPushConnectQueue( - delay_start=True, port=analysis_queue_port) - else: - analysis_plugin_output_queue = analysis_report_incoming_queue - - analysis_report_queue_producer = queue.ItemQueueProducer( - analysis_plugin_output_queue) - - completion_event = multiprocessing.Event() - analysis_mediator_object = analysis_mediator.AnalysisMediator( - analysis_report_queue_producer, knowledge_base_object, - data_location=self._data_location, - completion_event=completion_event) - analysis_process = multiprocessing.Process( - name=u'Analysis {0:s}'.format(analysis_plugin.plugin_name), - target=analysis_plugin.RunPlugin, - args=(analysis_mediator_object,)) - - process_info = PsortAnalysisProcess( - completion_event, analysis_plugin, analysis_process) - self._analysis_process_info.append(process_info) - - analysis_process.start() - logging.info( - u'Plugin: [{0:s}] started.'.format(analysis_plugin.plugin_name)) - - logging.info(u'Analysis plugins running') + self._filter_object = filter_object + self._filter_expression = filter_expression - def _SetAnalysisPluginProcessInformation( - self, analysis_plugins, pre_obj, preferred_encoding): - """Sets analysis plugin options in a preprocessor object. + def SetPreferredLanguageIdentifier(self, language_identifier): + """Sets the preferred language identifier. Args: - analysis_plugins: the list of analysis plugins to add. - pre_obj: the preprocessor object (instance of PreprocessObject). - preferred_encoding: the preferred encoding to use for the preprocess - object. + language_identifier: the language identifier string e.g. en-US for + US English or is-IS for Icelandic. """ - # TODO: We shouldn't be touching the command line here, move to tools - if preferred_encoding: - cmd_line = u' '.join(sys.argv) - try: - pre_obj.collection_information[u'cmd_line'] = cmd_line.decode( - preferred_encoding) - except UnicodeDecodeError: - pass - pre_obj.collection_information[u'file_processed'] = ( - self._storage_file) - pre_obj.collection_information[u'method'] = u'Running Analysis Plugins' - analysis_plugin_names = [plugin.NAME for plugin in analysis_plugins] - pre_obj.collection_information[u'plugins'] = analysis_plugin_names - time_of_run = timelib.Timestamp.GetNow() - pre_obj.collection_information[u'time_of_run'] = time_of_run - pre_obj.counter = collections.Counter() - - def _GetLastGoodPreprocess(self, storage_file): - """Gets the last stored preprocessing object with time zone information. + self._preferred_language = language_identifier - From all preprocessing objects, try to get the last one that has - time zone information stored in it, the highest chance of it containing - the information we are seeking (defaulting to the last one). If there are - no preprocessing objects in the file, we'll make a new one + def SetOutputFormat(self, output_format): + """Sets the output format. Args: - storage_file: a Plaso storage file object. - - Returns: - A preprocess object (instance of PreprocessObject), or None if there are - no preprocess objects in the storage file. + output_format: the output format. 
""" - pre_objs = storage_file.GetStorageInformation() - if not pre_objs: - return None - pre_obj = pre_objs[-1] - for obj in pre_objs: - if getattr(obj, u'time_zone_str', u''): - pre_obj = obj - - return pre_obj + self._output_format = output_format - def ProcessEventsFromStorage( - self, storage_file, output_buffer, my_filter=None, filter_buffer=None, - analysis_queues=None): - """Reads event objects from the storage to process and filter them. + def SetQuietMode(self, quiet_mode=False): + """Sets whether tools is in quiet mode or not. Args: - storage_file: the storage file object (instance of StorageFile). - output_buffer: the output buffer object (instance of EventBuffer). - my_filter: optional filter object (instance of PFilter). - The default is None. - filter_buffer: optional filter buffer used to store previously discarded - events to store time slice history. The default is None. - analysis_queues: optional list of analysis queues. The default is None. - - Returns: - A Counter object (instance of collections.Counter), that tracks the - number of unique events extracted from storage. + quiet_mode: boolean, when True the tool is in quiet mode. """ - counter = collections.Counter() - my_limit = getattr(my_filter, u'limit', 0) - forward_entries = 0 - if not analysis_queues: - analysis_queues = [] - - event_object = storage_file.GetSortedEntry() - while event_object: - if my_filter: - event_match = event_object - if isinstance(event_object, plaso_storage_pb2.EventObject): - # TODO: move serialization to storage, if low-level filtering is - # needed storage should provide functions for it. - serializer = protobuf_serializer.ProtobufEventObjectSerializer - event_match = serializer.ReadSerialized(event_object) - - if my_filter.Match(event_match): - counter[u'Events Included'] += 1 - if filter_buffer: - # Indicate we want forward buffering. - forward_entries = 1 - # Empty the buffer. - for event_in_buffer in filter_buffer.Flush(): - counter[u'Events Added From Slice'] += 1 - counter[u'Events Included'] += 1 - counter[u'Events Filtered Out'] -= 1 - self._AppendEvent(event_in_buffer, output_buffer, analysis_queues) - self._AppendEvent(event_object, output_buffer, analysis_queues) - if my_limit: - if counter[u'Events Included'] == my_limit: - break - else: - if filter_buffer and forward_entries: - if forward_entries <= filter_buffer.size: - self._AppendEvent(event_object, output_buffer, analysis_queues) - forward_entries += 1 - counter[u'Events Added From Slice'] += 1 - counter[u'Events Included'] += 1 - else: - # Reached the max, don't include other entries. - forward_entries = 0 - counter[u'Events Filtered Out'] += 1 - elif filter_buffer: - filter_buffer.Append(event_object) - counter[u'Events Filtered Out'] += 1 - else: - counter[u'Events Filtered Out'] += 1 - else: - counter[u'Events Included'] += 1 - self._AppendEvent(event_object, output_buffer, analysis_queues) - - event_object = storage_file.GetSortedEntry() + self._quiet_mode = quiet_mode - for analysis_queue in analysis_queues: - analysis_queue.Close() - if output_buffer.duplicate_counter: - counter[u'Duplicate Removals'] = output_buffer.duplicate_counter + def SetUseZeroMQ(self, use_zeromq=False): + """Sets whether the tool is using ZeroMQ for queueing or not. - if my_limit: - counter[u'Limited By'] = my_limit - return counter + Args: + use_zeromq: boolean, when True the tool will use ZeroMQ for queuing. 
+ """ + self._use_zeromq = use_zeromq class PsortAnalysisReportQueueConsumer(queue.ItemQueueConsumer): diff --git a/plaso/output/manager.py b/plaso/output/manager.py index ec3f46e34c..c0af2f2338 100644 --- a/plaso/output/manager.py +++ b/plaso/output/manager.py @@ -9,23 +9,20 @@ class OutputManager(object): _output_classes = {} @classmethod - def DeregisterOutput(cls, output_class, disabled=False): + def DeregisterOutput(cls, output_class): """Deregisters an output class. The output classes are identified based on their NAME attribute. Args: output_class: the class object of the output module. - disabled: boolean determining whether the output module is - disabled due to the module not loading correctly or - not. Defaults to False. Raises: KeyError: if output class is not set for the corresponding data type. """ output_class_name = output_class.NAME.lower() - if disabled: + if output_class_name in cls._disabled_output_classes: class_dict = cls._disabled_output_classes else: class_dict = cls._output_classes @@ -39,12 +36,13 @@ def DeregisterOutput(cls, output_class, disabled=False): @classmethod def GetDisabledOutputClasses(cls): - """Retrieves the disabled output classes. + """Retrieves the disabled output classes and its associated name. - Returns: - A list of the disabled output classes (types of OutputModule). + Yields: + A tuple of output class name and type object (subclass of OutputModule). """ - return cls._disabled_output_classes.values() + for _, output_class in iter(cls._disabled_output_classes.items()): + yield output_class.NAME, output_class @classmethod def GetOutputClass(cls, name): @@ -72,25 +70,14 @@ def GetOutputClass(cls, name): @classmethod def GetOutputClasses(cls): - """Retrieves the available output classes. + """Retrieves the available output classes its associated name. Yields: - A tuple of output class name and type object. + A tuple of output class name and type object (subclass of OutputModule). """ - for output_class in cls._output_classes.itervalues(): + for _, output_class in iter(cls._output_classes.items()): yield output_class.NAME, output_class - # TODO: deprecate in favor of GetOutputClasses. - @classmethod - def GetOutputs(cls): - """Retrieves the available output classes. - - Yields: - A tuple of output class name and description. - """ - for output_class in cls._output_classes.itervalues(): - yield output_class.NAME, output_class.DESCRIPTION - @classmethod def HasOutputClass(cls, name): """Determines if a specific output class is registered with the manager. diff --git a/plaso/output/pstorage.py b/plaso/output/pstorage.py index 48f231e9e0..bf2eae58b4 100644 --- a/plaso/output/pstorage.py +++ b/plaso/output/pstorage.py @@ -2,10 +2,10 @@ """Implements a StorageFile output module.""" from plaso.lib import event -from plaso.lib import storage from plaso.lib import timelib from plaso.output import interface from plaso.output import manager +from plaso.storage import zip_file class PlasoStorageOutputModule(interface.OutputModule): @@ -42,7 +42,7 @@ def Open(self): if storage_file_path: pre_obj.collection_information[u'file_processed'] = storage_file_path - self._storage = storage.StorageFile(self._file_object, pre_obj=pre_obj) + self._storage = zip_file.StorageFile(self._file_object, pre_obj=pre_obj) def SetFilePath(self, file_path): """Sets the file-like object based on the file path. 
diff --git a/plaso/storage/writer.py b/plaso/storage/writer.py index 251452673a..af8930c9b1 100644 --- a/plaso/storage/writer.py +++ b/plaso/storage/writer.py @@ -3,7 +3,7 @@ from plaso.engine import queue from plaso.lib import definitions -from plaso.lib import storage +from plaso.storage import zip_file as storage_zip_file class StorageWriter(queue.ItemQueueConsumer): @@ -97,7 +97,7 @@ def _ConsumeItem(self, event_object, **unused_kwargs): def _Open(self): """Opens the storage writer.""" - self._storage_file = storage.StorageFile( + self._storage_file = storage_zip_file.StorageFile( self._output_file, buffer_size=self._buffer_size, pre_obj=self._pre_obj, serializer_format=self._serializer_format) diff --git a/plaso/lib/storage.py b/plaso/storage/zip_file.py similarity index 96% rename from plaso/lib/storage.py rename to plaso/storage/zip_file.py index 621a56b603..0c02c6a435 100644 --- a/plaso/lib/storage.py +++ b/plaso/storage/zip_file.py @@ -242,6 +242,7 @@ def __init__( self._file_number = 1 self._first_file_number = None self._max_buffer_size = buffer_size or self.MAX_BUFFER_SIZE + self._merge_buffer = None self._output_file = output_file self._pre_obj = pre_obj self._proto_streams = {} @@ -262,6 +263,8 @@ def __init__( self._profiling_sample = 0 self._serializers_profiler = None + self.store_range = None + def __enter__(self): """Make usable with "with" statement.""" return self @@ -453,106 +456,6 @@ def _GetEventTagIndexValue(self, store_number, store_index, uuid): return tag_index_value - def _GetStreamNames(self): - """Retrieves a generator of the storage stream names.""" - if self._zipfile: - for stream_name in self._zipfile.namelist(): - yield stream_name - - def _GetEventObjectProtobufString(self, stream_number, entry_index=-1): - """Returns a specific event object protobuf string. - - By default the next entry in the appropriate proto file is read - and returned, however any entry can be read using the index file. - - Args: - stream_number: The proto stream number. - entry_index: Read a specific entry in the file. The default is -1, - which represents the next available entry. - - Returns: - A tuple containing the event object protobuf string and the entry index - of the event object protobuf string within the storage file. - - Raises: - EOFError: When we reach the end of the protobuf file. - errors.WrongProtobufEntry: If the probotuf size is too large for storage. - IOError: if the stream cannot be opened. - """ - file_object, last_entry_index = self._GetProtoStream(stream_number) - - if entry_index >= 0: - stream_offset = self._GetProtoStreamOffset(stream_number, entry_index) - if stream_offset is None: - logging.error(( - u'Unable to read entry index: {0:d} from proto stream: ' - u'{1:d}').format(entry_index, stream_number)) - - return None, None - - file_object, last_entry_index = self._GetProtoStreamSeekOffset( - stream_number, entry_index, stream_offset) - - if (not last_entry_index and entry_index == -1 and - self._bound_first is not None): - # We only get here if the following conditions are met: - # 1. last_entry_index is not set (so this is the first read - # from this file). - # 2. There is a lower bound (so we have a date filter). - # 3. The lower bound is higher than zero (basically set to a value). - # 4. We are accessing this function using 'get me the next entry' as an - # opposed to the 'get me entry X', where we just want to server entry - # X. - # - # The purpose: speed seeking into the storage file based on time. 
Instead
-      # of spending precious time reading through the storage file and
-      # deserializing protobufs just to compare timestamps we read a much
-      # 'cheaper' file, one that only contains timestamps to find the proper
-      # entry into the storage file. That way we'll get to the right place in
-      # the file and can start reading protobufs from the right location.
-
-      stream_name = u'plaso_timestamps.{0:06d}'.format(stream_number)
-
-      if stream_name in self._GetStreamNames():
-        timestamp_file_object = self._OpenStream(stream_name, u'r')
-        if timestamp_file_object is None:
-          raise IOError(u'Unable to open stream: {0:s}'.format(stream_name))
-
-        index = 0
-        timestamp_compare = 0
-        encountered_error = False
-        while timestamp_compare < self._bound_first:
-          timestamp_raw = timestamp_file_object.read(8)
-          if len(timestamp_raw) != 8:
-            encountered_error = True
-            break
-
-          timestamp_compare = struct.unpack('<q', timestamp_raw)[0]
-          index += 1
-
-        if not encountered_error:
-          return self._GetEventObjectProtobufString(
-              stream_number, entry_index=index)
-
-    if not file_object:
-      return None, None
-
-    # Read the next proto string.
-    unpacked = file_object.read(4)
-    if len(unpacked) != 4:
-      return None, None
-
-    proto_string_size = struct.unpack('<I', unpacked)[0]
-
-    if proto_string_size > self.MAX_PROTO_STRING_SIZE:
-      raise errors.WrongProtobufEntry(
-          u'Protobuf string size value exceeds maximum: {0:d}'.format(
-              proto_string_size))
-
-    event_object_data = file_object.read(proto_string_size)
-    self._proto_streams[stream_number] = (file_object, last_entry_index + 1)
-
-    return event_object_data, last_entry_index
-
   def _GetEventGroupProto(self, file_object):
     """Return a single group entry."""
     unpacked = file_object.read(4)
@@ -653,17 +556,19 @@ def _GetProtoStreamOffset(self, stream_number, entry_index):
     # TODO: once cached use the last entry index to determine if the stream
     # file object should be re-opened.
-    stream_name = u'plaso_index.{0:06d}'.format(stream_number)
-    index_file_object = self._OpenStream(stream_name, u'r')
+    index_stream_name = u'plaso_index.{0:06d}'.format(stream_number)
+    index_file_object = self._OpenStream(index_stream_name, u'r')
     if index_file_object is None:
-      raise IOError(u'Unable to open stream: {0:s}'.format(stream_name))
+      raise IOError(u'Unable to open stream: {0:s}'.format(index_stream_name))
 
     # Since zipfile.ZipExtFile is not seekable we need to read upto
     # the stream offset.
-    _ = index_file_object.read(entry_index * 4)
+    index_stream_offset = entry_index * 4
+    logging.debug(u'Seeking offset: 0x{0:08x} in index stream: {1:s}'.format(
+        index_stream_offset, index_stream_name))
+    index_file_object.read(index_stream_offset)
 
     index_data = index_file_object.read(4)
-
     index_file_object.close()
 
     if len(index_data) != 4:
@@ -671,6 +576,110 @@
 
     return struct.unpack('<I', index_data)[0]
 
+  def _GetSerializedEventObject(self, stream_number, entry_index=-1):
+    """Returns a specific serialized event object.
+
+    By default the next entry in the appropriate proto file is read
+    and returned, however any entry can be read using the index file.
+
+    Args:
+      stream_number: the proto stream number.
+      entry_index: read a specific entry in the file. The default is -1,
+                   which represents the next available entry.
+
+    Returns:
+      A tuple containing the serialized event object and the entry index
+      of the event object within the storage file.
+
+    Raises:
+      EOFError: when we reach the end of the protobuf file.
+      errors.WrongProtobufEntry: if the protobuf size is too large for storage.
+      IOError: if the stream cannot be opened.
+    """
+    file_object, last_entry_index = self._GetProtoStream(stream_number)
+
+    if entry_index >= 0:
+      stream_offset = self._GetProtoStreamOffset(stream_number, entry_index)
+      if stream_offset is None:
+        logging.error((
+            u'Unable to read entry index: {0:d} from proto stream: '
+            u'{1:d}').format(entry_index, stream_number))
+
+        return None, None
+
+      file_object, last_entry_index = self._GetProtoStreamSeekOffset(
+          stream_number, entry_index, stream_offset)
+
+    if (not last_entry_index and entry_index == -1 and
+        self._bound_first is not None):
+      # We only get here if the following conditions are met:
+      #   1. last_entry_index is not set (so this is the first read
+      #      from this file).
+      #   2. There is a lower bound (so we have a date filter).
+      #   3. The lower bound is higher than zero (basically set to a value).
+      #   4. We are accessing this function using 'get me the next entry' as
+      #      opposed to the 'get me entry X', where we just want to serve
+      #      entry X.
+      #
+      # The purpose: speed seeking into the storage file based on time. Instead
+      # of spending precious time reading through the storage file and
+      # deserializing protobufs just to compare timestamps we read a much
+      # 'cheaper' file, one that only contains timestamps to find the proper
+      # entry into the storage file. That way we'll get to the right place in
+      # the file and can start reading protobufs from the right location.
+
+      stream_name = u'plaso_timestamps.{0:06d}'.format(stream_number)
+
+      if stream_name in self._GetStreamNames():
+        timestamp_file_object = self._OpenStream(stream_name, u'r')
+        if timestamp_file_object is None:
+          raise IOError(u'Unable to open stream: {0:s}'.format(stream_name))
+
+        index = 0
+        timestamp_compare = 0
+        encountered_error = False
+        while timestamp_compare < self._bound_first:
+          timestamp_raw = timestamp_file_object.read(8)
+          if len(timestamp_raw) != 8:
+            encountered_error = True
+            break
+
+          timestamp_compare = struct.unpack('<q', timestamp_raw)[0]
+          index += 1
+
+        if not encountered_error:
+          return self._GetSerializedEventObject(
+              stream_number, entry_index=index)
+
+    if not file_object:
+      return None, None
+
+    # Read the next proto string.
+    unpacked = file_object.read(4)
+    if len(unpacked) != 4:
+      return None, None
+
+    proto_string_size = struct.unpack('<I', unpacked)[0]
+
+    if proto_string_size > self.MAX_PROTO_STRING_SIZE:
+      raise errors.WrongProtobufEntry(
+          u'Protobuf string size value exceeds maximum: {0:d}'.format(
+              proto_string_size))
+
+    event_object_data = file_object.read(proto_string_size)
+    self._proto_streams[stream_number] = (file_object, last_entry_index + 1)
+
+    return event_object_data, last_entry_index
+
+  def _GetStreamNames(self):
+    """Retrieves the stream names.
+
+    Yields:
+      A string containing the stream name.
+    """
+    if self._zipfile:
+      for stream_name in self._zipfile.namelist():
+        yield stream_name
+
   def _OpenStream(self, stream_name, mode='r'):
     """Opens a stream.
@@ -1054,8 +1063,30 @@ def SetStoreLimit(self, unused_my_filter=None):
     else:
       logging.debug(u'Store [{0:d}] not used'.format(number))
 
+  def _InitializeMergeBuffer(self):
+    """Initializes the merge buffer."""
+    if self.store_range is None:
+      number_range = list(self.GetProtoNumbers())
+    else:
+      number_range = self.store_range
+
+    self._merge_buffer = []
+    for store_number in number_range:
+      event_object = self.GetEventObject(store_number)
+      if not event_object:
+        return
+
+      while event_object.timestamp < self._bound_first:
+        event_object = self.GetEventObject(store_number)
+        if not event_object:
+          return
+
+      heapq.heappush(
+          self._merge_buffer,
+          (event_object.timestamp, store_number, event_object))
+
   def GetSortedEntry(self):
-    """Return a sorted entry from the storage file.
+    """Returns a sorted entry from the storage file.
 
     Returns:
       An event object (instance of EventObject).
@@ -1064,22 +1095,8 @@ def GetSortedEntry(self):
       self._bound_first, self._bound_last = (
          pfilter.TimeRangeCache.GetTimeRange())
 
-    if not hasattr(self, u'_merge_buffer'):
-      self._merge_buffer = []
-      number_range = getattr(self, u'store_range', list(self.GetProtoNumbers()))
-      for store_number in number_range:
-        event_object = self.GetEventObject(store_number)
-        if not event_object:
-          return
-
-        while event_object.timestamp < self._bound_first:
-          event_object = self.GetEventObject(store_number)
-          if not event_object:
-            return
-
-        heapq.heappush(
-            self._merge_buffer,
-            (event_object.timestamp, store_number, event_object))
+    if self._merge_buffer is None:
+      self._InitializeMergeBuffer()
 
     if not self._merge_buffer:
       return
@@ -1119,7 +1136,7 @@ def GetEventObject(self, stream_number, entry_index=-1):
       An event object (instance of EventObject) entry read from the file or
       None if not able to read in a new event.
""" - event_object_data, entry_index = self._GetEventObjectProtobufString( + event_object_data, entry_index = self._GetSerializedEventObject( stream_number, entry_index=entry_index) if not event_object_data: return diff --git a/tests/cli/tools.py b/tests/cli/tools.py index 9e25bc4299..7c934adfa3 100644 --- a/tests/cli/tools.py +++ b/tests/cli/tools.py @@ -111,6 +111,14 @@ def testAddTimezoneOption(self): output = argument_parser.format_help() self.assertEqual(output, self._EXPECTED_TIMEZONE_OPTION) + def testGetCommandLineArguments(self): + """Tests the GetCommandLineArguments function.""" + cli_tool = tools.CLITool() + cli_tool.preferred_encoding = u'UTF-8' + + command_line_arguments = cli_tool.GetCommandLineArguments() + self.assertIsNotNone(command_line_arguments) + def testParseStringOption(self): """Tests the ParseStringOption function.""" encoding = sys.stdin.encoding diff --git a/tests/frontend/analysis_frontend.py b/tests/frontend/analysis_frontend.py index 90ee5102b7..0e9e025b01 100644 --- a/tests/frontend/analysis_frontend.py +++ b/tests/frontend/analysis_frontend.py @@ -5,7 +5,7 @@ import unittest from plaso.frontend import analysis_frontend -from plaso.lib import storage +from plaso.storage import zip_file as storage_zip_file from tests.frontend import test_lib @@ -20,7 +20,7 @@ def testOpenStorage(self): storage_file_path = self._GetTestFilePath([u'psort_test.proto.plaso']) storage_file = test_front_end.OpenStorage(storage_file_path) - self.assertIsInstance(storage_file, storage.StorageFile) + self.assertIsInstance(storage_file, storage_zip_file.StorageFile) storage_file.Close() diff --git a/tests/frontend/extraction_frontend.py b/tests/frontend/extraction_frontend.py index a6824542a0..a1284ac1a2 100644 --- a/tests/frontend/extraction_frontend.py +++ b/tests/frontend/extraction_frontend.py @@ -13,7 +13,7 @@ from plaso.frontend import extraction_frontend from plaso.lib import pfilter -from plaso.lib import storage +from plaso.storage import zip_file as storage_zip_file from tests.frontend import test_lib @@ -129,7 +129,8 @@ def testProcessSources(self): test_front_end.ProcessSources([path_spec], source_type) try: - storage_file = storage.StorageFile(storage_file_path, read_only=True) + storage_file = storage_zip_file.StorageFile( + storage_file_path, read_only=True) except IOError: self.fail(u'Not a storage file.') diff --git a/tests/frontend/psort.py b/tests/frontend/psort.py index b23276f100..c9045b6321 100644 --- a/tests/frontend/psort.py +++ b/tests/frontend/psort.py @@ -11,10 +11,10 @@ from plaso.frontend import psort from plaso.lib import event from plaso.lib import pfilter -from plaso.lib import storage from plaso.lib import timelib from plaso.output import interface as output_interface from plaso.output import mediator as output_mediator +from plaso.storage import zip_file as storage_zip_file from tests import test_lib as shared_test_lib from tests.cli import test_lib as cli_test_lib @@ -49,6 +49,8 @@ class PsortTestEventFormatter(formatters_interface.EventFormatter): class TestOutputModule(output_interface.LinearOutputModule): """Test output module.""" + NAME = u'psort_test' + _HEADER = ( u'date,time,timezone,MACB,source,sourcetype,type,user,host,' u'short,desc,version,filename,inode,notes,format,extra\n') @@ -57,7 +59,7 @@ def WriteEventBody(self, event_object): """Writes the body of an event object to the output. Args: - event_object: the event object (instance of EventObject). + event_object: an event object (instance of EventObject). 
""" message, _ = self._output_mediator.GetFormattedMessages(event_object) source_short, source_long = self._output_mediator.GetFormattedSources( @@ -126,7 +128,8 @@ def testReadEntries(self): pfilter.TimeRangeCache.SetUpperTimestamp(self.last) pfilter.TimeRangeCache.SetLowerTimestamp(self.first) - storage_file = storage.StorageFile(self._test_file_proto, read_only=True) + storage_file = storage_zip_file.StorageFile( + self._test_file_proto, read_only=True) storage_file.SetStoreLimit() event_object = storage_file.GetSortedEntry() @@ -143,7 +146,6 @@ def testReadEntries(self): def testProcessStorage(self): """Test the ProcessStorage function.""" test_front_end = psort.PsortFrontend() - test_front_end.SetOutputFilename(u'output.txt') test_front_end.SetOutputFormat(u'dynamic') test_front_end.SetPreferredLanguageIdentifier(u'en-US') test_front_end.SetQuietMode(True) @@ -155,7 +157,8 @@ def testProcessStorage(self): output_module = test_front_end.GetOutputModule(storage_file) output_module.SetOutputWriter(output_writer) - counter = test_front_end.ProcessStorage(output_module, storage_file, [], []) + counter = test_front_end.ProcessStorage( + output_module, storage_file, storage_file_path, [], []) self.assertEqual(counter[u'Stored Events'], 15) output_writer.SeekToBeginning() @@ -191,13 +194,13 @@ def testOutput(self): with shared_test_lib.TempDirectory() as dirname: temp_file = os.path.join(dirname, u'plaso.db') - storage_file = storage.StorageFile(temp_file, read_only=False) + storage_file = storage_zip_file.StorageFile(temp_file, read_only=False) pfilter.TimeRangeCache.ResetTimeConstraints() storage_file.SetStoreLimit() storage_file.AddEventObjects(events) storage_file.Close() - with storage.StorageFile(temp_file) as storage_file: + with storage_zip_file.StorageFile(temp_file) as storage_file: storage_file.store_range = [1] output_mediator_object = output_mediator.OutputMediator( self._formatter_mediator, storage_file) @@ -247,7 +250,7 @@ def testSetAnalysisPluginProcessInformation(self): preprocess_object = event.PreprocessObject() preprocess_object.SetCollectionInformationValues({}) test_front_end._SetAnalysisPluginProcessInformation( - analysis_plugins, preprocess_object, u'utf-8') + u'', analysis_plugins, preprocess_object) self.assertIsNotNone(preprocess_object) plugin_names = preprocess_object.collection_information[u'plugins'] time_of_run = preprocess_object.collection_information[u'time_of_run'] diff --git a/tests/output/interface.py b/tests/output/interface.py index 2fcaebd370..d209ce1fa1 100644 --- a/tests/output/interface.py +++ b/tests/output/interface.py @@ -109,14 +109,14 @@ def testOutputList(self): """Test listing up all available registered modules.""" manager.OutputManager.RegisterOutput(TestOutputModule) - module_seen = False - for name, description in manager.OutputManager.GetOutputs(): - if name == 'test_xml': - module_seen = True - self.assertEqual(description, ( - u'Test output that provides a simple mocked XML.')) - - self.assertTrue(module_seen) + test_output_class = None + for name, output_class in manager.OutputManager.GetOutputClasses(): + if name == u'test_xml': + test_output_class = output_class + + expected_description = u'Test output that provides a simple mocked XML.' 
+ self.assertIsNotNone(test_output_class) + self.assertEqual(test_output_class.DESCRIPTION, expected_description) manager.OutputManager.DeregisterOutput(TestOutputModule) diff --git a/tests/output/manager.py b/tests/output/manager.py index 4d265d0ce5..6198a8897e 100644 --- a/tests/output/manager.py +++ b/tests/output/manager.py @@ -63,14 +63,14 @@ def testGetOutputClass(self): manager.OutputManager.DeregisterOutput(TestOutput) - def testGetOutputClasses(self): - """Tests the GetOutputClasses function.""" - manager.OutputManager.RegisterOutput(TestOutput) + def testGetDisabledOutputClasses(self): + """Tests the GetDisabledOutputClasses function.""" + manager.OutputManager.RegisterOutput(TestOutput, disabled=True) names = [] output_classes = [] - for name, output_class in manager.OutputManager.GetOutputClasses(): + for name, output_class in manager.OutputManager.GetDisabledOutputClasses(): names.append(name) output_classes.append(output_class) @@ -79,20 +79,19 @@ def testGetOutputClasses(self): manager.OutputManager.DeregisterOutput(TestOutput) - # TODO: remove in favor of testGetOutputClasses. - def testGetOutputs(self): - """Tests the GetOutputs function.""" + def testGetOutputClasses(self): + """Tests the GetOutputClasses function.""" manager.OutputManager.RegisterOutput(TestOutput) names = [] - descriptions = [] + output_classes = [] - for name, description in manager.OutputManager.GetOutputs(): + for name, output_class in manager.OutputManager.GetOutputClasses(): names.append(name) - descriptions.append(description) + output_classes.append(output_class) self.assertIn(u'test_output', names) - self.assertIn(u'This is a test output module.', descriptions) + self.assertIn(TestOutput, output_classes) manager.OutputManager.DeregisterOutput(TestOutput) diff --git a/tests/output/pstorage.py b/tests/output/pstorage.py index 75e94f3819..42b431e994 100644 --- a/tests/output/pstorage.py +++ b/tests/output/pstorage.py @@ -6,9 +6,9 @@ import unittest from plaso.lib import pfilter -from plaso.lib import storage from plaso.output import interface from plaso.output import pstorage +from plaso.storage import zip_file as storage_zip_file from tests import test_lib as shared_test_lib from tests.output import test_lib @@ -29,7 +29,8 @@ def testOutput(self): with shared_test_lib.TempDirectory() as dirname: storage_file = os.path.join(dirname, u'plaso.plaso') # Copy events to pstorage dump. - with storage.StorageFile(self.test_filename, read_only=True) as store: + with storage_zip_file.StorageFile( + self.test_filename, read_only=True) as store: output_mediator = self._CreateOutputMediator(storage_object=store) output_module = pstorage.PlasoStorageOutputModule(output_mediator) output_module.SetFilePath(storage_file) @@ -42,8 +43,9 @@ def testOutput(self): event_object = store.GetSortedEntry() # Make sure original and dump have the same events. 
- original = storage.StorageFile(self.test_filename, read_only=True) - dump = storage.StorageFile(storage_file, read_only=True) + original = storage_zip_file.StorageFile( + self.test_filename, read_only=True) + dump = storage_zip_file.StorageFile(storage_file, read_only=True) event_object_original = original.GetSortedEntry() event_object_dump = dump.GetSortedEntry() original_list = [] diff --git a/tests/lib/storage.py b/tests/storage/zip_file.py similarity index 87% rename from tests/lib/storage.py rename to tests/storage/zip_file.py index 9c84bbdc8a..3b5e2e2840 100644 --- a/tests/lib/storage.py +++ b/tests/storage/zip_file.py @@ -10,10 +10,10 @@ from plaso.lib import event from plaso.lib import eventdata from plaso.lib import pfilter -from plaso.lib import storage from plaso.lib import timelib from plaso.formatters import winreg # pylint: disable=unused-import from plaso.serializer import protobuf_serializer +from plaso.storage import zip_file from tests import test_lib as shared_test_lib from tests.storage import test_lib @@ -75,31 +75,31 @@ def testStorage(self): serializer = protobuf_serializer.ProtobufEventObjectSerializer with shared_test_lib.TempDirectory() as dirname: - temp_file = os.path.join(dirname, 'plaso.db') - store = storage.StorageFile(temp_file) + temp_file = os.path.join(dirname, u'plaso.db') + store = zip_file.StorageFile(temp_file) store.AddEventObjects(test_event_objects) # Add tagging. tag_1 = event.EventTag() tag_1.store_index = 0 tag_1.store_number = 1 - tag_1.comment = 'My comment' - tag_1.color = 'blue' + tag_1.comment = u'My comment' + tag_1.color = u'blue' tags_mock.append(tag_1) tag_2 = event.EventTag() tag_2.store_index = 1 tag_2.store_number = 1 - tag_2.tags = ['Malware'] - tag_2.color = 'red' + tag_2.tags = [u'Malware'] + tag_2.color = u'red' tags_mock.append(tag_2) tag_3 = event.EventTag() tag_3.store_number = 1 tag_3.store_index = 2 - tag_3.comment = 'This is interesting' - tag_3.tags = ['Malware', 'Benign'] - tag_3.color = 'red' + tag_3.comment = u'This is interesting' + tag_3.tags = [u'Malware', u'Benign'] + tag_3.color = u'red' tags_mock.append(tag_3) store.StoreTagging(tags_mock) @@ -108,18 +108,18 @@ def testStorage(self): tag_4 = event.EventTag() tag_4.store_index = 1 tag_4.store_number = 1 - tag_4.tags = ['Interesting'] + tag_4.tags = [u'Interesting'] store.StoreTagging([tag_4]) group_mock.AddGroup( - 'Malicious', [(1, 1), (1, 2)], desc='Events that are malicious', - color='red', first=1334940286000000, last=1334961526929596, - cat='Malware') + u'Malicious', [(1, 1), (1, 2)], desc=u'Events that are malicious', + color=u'red', first=1334940286000000, last=1334961526929596, + cat=u'Malware') store.StoreGrouping(group_mock) store.Close() - read_store = storage.StorageFile(temp_file, read_only=True) + read_store = zip_file.StorageFile(temp_file, read_only=True) self.assertTrue(read_store.HasTagging()) self.assertTrue(read_store.HasGrouping()) @@ -167,21 +167,21 @@ def testStorage(self): formatter_mediator, tags[0]) self.assertEqual(msg[0:10], u'This is a ') - self.assertEqual(tags[1].tag.tags[0], 'Malware') + self.assertEqual(tags[1].tag.tags[0], u'Malware') msg, _ = formatters_manager.FormattersManager.GetMessageStrings( formatter_mediator, tags[1]) self.assertEqual(msg[0:15], u'[\\HKCU\\Windows\\') self.assertEqual(tags[2].tag.comment, u'This is interesting') - self.assertEqual(tags[2].tag.tags[0], 'Malware') - self.assertEqual(tags[2].tag.tags[1], 'Benign') + self.assertEqual(tags[2].tag.tags[0], u'Malware') + self.assertEqual(tags[2].tag.tags[1], 
u'Benign') - self.assertEqual(tags[2].parser, 'UNKNOWN') + self.assertEqual(tags[2].parser, u'UNKNOWN') # Test the newly added fourth tag, which should include data from # the first version as well. - self.assertEqual(tags[3].tag.tags[0], 'Interesting') - self.assertEqual(tags[3].tag.tags[1], 'Malware') + self.assertEqual(tags[3].tag.tags[0], u'Interesting') + self.assertEqual(tags[3].tag.tags[1], u'Malware') expected_timestamps = [ 1238934459000000, 1334940286000000, 1334961526929596, 1335966206929596] @@ -225,7 +225,7 @@ def testStorageSort(self): pfilter.TimeRangeCache.ResetTimeConstraints() pfilter.TimeRangeCache.SetUpperTimestamp(last) pfilter.TimeRangeCache.SetLowerTimestamp(first) - store = storage.StorageFile(test_file, read_only=True) + store = zip_file.StorageFile(test_file, read_only=True) store.store_range = [1, 5, 6] diff --git a/tools/log2timeline.py b/tools/log2timeline.py index 54d55ddf5c..b98f1bfd29 100755 --- a/tools/log2timeline.py +++ b/tools/log2timeline.py @@ -26,7 +26,6 @@ from plaso.lib import definitions from plaso.lib import errors from plaso.lib import pfilter -from plaso.lib import py2to3 class Log2TimelineTool(extraction_tool.ExtractionTool): @@ -528,16 +527,7 @@ def ParseArguments(self): return False - command_line_arguments = sys.argv - if isinstance(command_line_arguments, py2to3.BYTES_TYPE): - try: - self._command_line_arguments = [ - argument.decode(self.preferred_encoding) - for argument in command_line_arguments] - except UnicodeDecodeError: - pass - - self._command_line_arguments = u' '.join(command_line_arguments) + self._command_line_arguments = self.GetCommandLineArguments() return True diff --git a/tools/psort.py b/tools/psort.py index 4bbad078a8..286f02d3d8 100755 --- a/tools/psort.py +++ b/tools/psort.py @@ -24,7 +24,6 @@ from plaso.filters import manager as filters_manager from plaso.frontend import psort from plaso.output import interface as output_interface -from plaso.output import manager as output_manager from plaso.lib import errors from plaso.winnt import language_ids @@ -36,7 +35,7 @@ class PsortOptions(object): class PsortTool(analysis_tool.AnalysisTool): """Class that implements the psort CLI tool.""" - _URL = u'http://plaso.kiddaland.net/usage/filters' + _FILTERS_URL = u'http://plaso.kiddaland.net/usage/filters' NAME = u'psort' DESCRIPTION = ( @@ -58,6 +57,7 @@ def __init__(self, input_reader=None, output_writer=None): input_reader=input_reader, output_writer=output_writer) self._analysis_plugins = None self._analysis_plugins_output_format = None + self._command_line_arguments = None self._deduplicate_events = True self._filter_expression = None self._filter_object = None @@ -236,6 +236,7 @@ def _ProcessStorage(self): configuration_object, output_module) missing_parameters = output_module.GetMissingArguments() + # TODO: fix or remove this comment. # Get ANALYSIS PLUGINS AND CONFIGURE! 
get_plugins_and_producers = self._front_end.GetAnalysisPluginsAndEventQueues analysis_plugins, event_queue_producers = get_plugins_and_producers( @@ -246,11 +247,15 @@ def _ProcessStorage(self): self._options, analysis_plugin) counter = self._front_end.ProcessStorage( - output_module, storage_file, analysis_plugins, event_queue_producers, + output_module, storage_file, self._storage_file_path, + analysis_plugins, event_queue_producers, + command_line_arguments=self._command_line_arguments, deduplicate_events=self._deduplicate_events, preferred_encoding=self.preferred_encoding, time_slice=time_slice, use_time_slicer=self._use_time_slicer) + storage_file.Close() + if not self._quiet_mode: table_view = cli_views.ViewsFactory.GetTableView( self._views_format_type, title=u'Counter') @@ -335,7 +340,8 @@ def AddFilterOptions(self, argument_group): type=str, help=( u'A filter that can be used to filter the dataset before it ' u'is written into storage. More information about the filters ' - u'and how to use them can be found here: {0:s}').format(self._URL)) + u'and how to use them can be found here: {0:s}').format( + self._FILTERS_URL)) def AddLanguageOptions(self, argument_group): """Adds the language options to the argument group. @@ -401,19 +407,19 @@ def ListOutputModules(self): table_view = cli_views.ViewsFactory.GetTableView( self._views_format_type, column_names=[u'Name', u'Description'], title=u'Output Modules') - for name, output_class in sorted(self._front_end.GetOutputClasses()): + for name, output_class in self._front_end.GetOutputClasses(): table_view.AddRow([name, output_class.DESCRIPTION]) table_view.Write(self._output_writer) - disabled_classes = output_manager.OutputManager.GetDisabledOutputClasses() + disabled_classes = list(self._front_end.GetDisabledOutputClasses()) if not disabled_classes: return table_view = cli_views.ViewsFactory.GetTableView( self._views_format_type, column_names=[u'Name', u'Description'], title=u'Disabled Output Modules') - for output_class in disabled_classes: - table_view.AddRow([output_class.NAME, output_class.DESCRIPTION]) + for name, output_class in disabled_classes: + table_view.AddRow([name, output_class.DESCRIPTION]) table_view.Write(self._output_writer) def ParseArguments(self): @@ -580,7 +586,6 @@ def ParseOptions(self, options): self._ParseAnalysisPluginOptions(options) self._ParseExperimentalOptions(options) self._ParseFilterOptions(options) - self._front_end.SetStorageFile(self._storage_file_path) if self._debug_mode: logging_level = logging.DEBUG @@ -604,8 +609,6 @@ def ParseOptions(self, options): self._deduplicate_events = getattr(options, u'dedup', True) self._output_filename = getattr(options, u'write', None) - if self._output_filename: - self._front_end.SetOutputFilename(self._output_filename) if self._data_location: self._front_end.SetDataLocation(self._data_location) @@ -614,6 +617,8 @@ def ParseOptions(self, options): else: logging.warning(u'Unable to automatically determine data location.') + self._command_line_arguments = self.GetCommandLineArguments() + # TODO: refactor this. 
self._options = options diff --git a/tools/psort_test.py b/tools/psort_test.py index 79230c50c5..5924f21087 100644 --- a/tools/psort_test.py +++ b/tools/psort_test.py @@ -8,6 +8,7 @@ from plaso.cli.helpers import interface as helpers_interface from plaso.cli.helpers import manager as helpers_manager from plaso.lib import errors +from plaso.lib import pfilter from plaso.output import manager as output_manager from tests import test_lib as shared_test_lib from tests.cli import test_lib as cli_test_lib @@ -126,6 +127,10 @@ def testProcessStorageWithMissingParameters(self): helpers_manager.ArgumentHelperManager.RegisterHelper( TestOutputModuleArgumentHelper) + # TODO: this is needed to work around static member issue of pfilter + # which is used in storage file. + pfilter.TimeRangeCache.ResetTimeConstraints() + lines = [] with shared_test_lib.TempDirectory() as temp_directory: temp_file_name = os.path.join(temp_directory, u'output.txt') @@ -135,7 +140,7 @@ def testProcessStorageWithMissingParameters(self): self._test_tool.ProcessStorage() with open(temp_file_name, 'rb') as file_object: - for line in file_object: + for line in file_object.readlines(): lines.append(line.strip()) self.assertTrue(self._input_reader.read_called) @@ -143,6 +148,7 @@ def testProcessStorageWithMissingParameters(self): self.assertEqual(TestOutputModuleMissingParameters.parameters, u'foobar') self.assertIn(u'FILE/UNKNOWN ctime OS:syslog', lines) + output_manager.OutputManager.DeregisterOutput( TestOutputModuleMissingParameters) helpers_manager.ArgumentHelperManager.DeregisterHelper(
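
Usage sketch for consumers of the relocated module (not part of the patch
itself): the class and method names below are taken from the diff above, the
storage file name u'test.plaso' is a hypothetical placeholder, and the
pfilter reset mirrors the pattern used in the tests.

    from plaso.lib import pfilter
    from plaso.storage import zip_file as storage_zip_file

    # Previously: from plaso.lib import storage; the StorageFile API itself
    # is unchanged by this move, only its import location.
    pfilter.TimeRangeCache.ResetTimeConstraints()

    with storage_zip_file.StorageFile(u'test.plaso', read_only=True) as store:
      store.SetStoreLimit()
      # GetSortedEntry() returns event objects in timestamp order, or None
      # once the storage file is exhausted.
      event_object = store.GetSortedEntry()
      while event_object:
        print(event_object.timestamp)
        event_object = store.GetSortedEntry()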