From 494a6282defcd6bfb8cd471f62015b03f994b5e1 Mon Sep 17 00:00:00 2001 From: Joachim Metz Date: Sat, 5 Dec 2015 07:45:25 +0100 Subject: [PATCH] Code review: 267520043: Made storage time range filter non-static #56 --- config/dpkg/changelog | 2 +- docs/plaso.output.rst | 8 + docs/plaso.storage.rst | 8 + plaso/__init__.py | 2 +- plaso/frontend/psort.py | 31 ++-- plaso/lib/pfilter.py | 8 - plaso/output/event_buffer.py | 117 ++++++++++++++ plaso/output/interface.py | 105 +------------ plaso/storage/time_range.py | 42 +++++ plaso/storage/zip_file.py | 218 ++++++++++---------------- tests/frontend/extraction_frontend.py | 5 - tests/frontend/psort.py | 61 ++++--- tests/output/event_buffer.py | 71 +++++++++ tests/output/interface.py | 85 +--------- tests/output/pstorage.py | 13 +- tests/output/test_lib.py | 35 +++++ tests/storage/zip_file.py | 116 ++++++++------ tools/psort_test.py | 5 - 18 files changed, 482 insertions(+), 450 deletions(-) create mode 100644 plaso/output/event_buffer.py create mode 100644 plaso/storage/time_range.py create mode 100644 tests/output/event_buffer.py diff --git a/config/dpkg/changelog b/config/dpkg/changelog index 2c1542a276..721da24289 100644 --- a/config/dpkg/changelog +++ b/config/dpkg/changelog @@ -2,4 +2,4 @@ python-plaso (1.3.1-1) unstable; urgency=low * Auto-generated - -- Log2Timeline Fri, 04 Dec 2015 15:29:30 +0100 + -- Log2Timeline Sat, 05 Dec 2015 07:45:25 +0100 diff --git a/docs/plaso.output.rst b/docs/plaso.output.rst index 15f2d99944..473d332152 100644 --- a/docs/plaso.output.rst +++ b/docs/plaso.output.rst @@ -20,6 +20,14 @@ plaso.output.elastic module :undoc-members: :show-inheritance: +plaso.output.event_buffer module +-------------------------------- + +.. automodule:: plaso.output.event_buffer + :members: + :undoc-members: + :show-inheritance: + plaso.output.interface module ----------------------------- diff --git a/docs/plaso.storage.rst b/docs/plaso.storage.rst index aa5da90677..9ed56307ac 100644 --- a/docs/plaso.storage.rst +++ b/docs/plaso.storage.rst @@ -20,6 +20,14 @@ plaso.storage.factory module :undoc-members: :show-inheritance: +plaso.storage.time_range module +------------------------------- + +.. automodule:: plaso.storage.time_range + :members: + :undoc-members: + :show-inheritance: + plaso.storage.writer module --------------------------- diff --git a/plaso/__init__.py b/plaso/__init__.py index 13f24943b6..0b3293d388 100644 --- a/plaso/__init__.py +++ b/plaso/__init__.py @@ -3,7 +3,7 @@ __version__ = '1.3.1' VERSION_DEV = True -VERSION_DATE = '20151204' +VERSION_DATE = '20151205' def GetVersion(): diff --git a/plaso/frontend/psort.py b/plaso/frontend/psort.py index d2f3bb7fdc..71c7ab98f1 100644 --- a/plaso/frontend/psort.py +++ b/plaso/frontend/psort.py @@ -18,14 +18,14 @@ from plaso.frontend import frontend from plaso.lib import bufferlib from plaso.lib import event -from plaso.lib import pfilter from plaso.lib import timelib from plaso.multi_processing import multi_process -from plaso.output import interface as output_interface +from plaso.output import event_buffer as output_event_buffer from plaso.output import manager as output_manager from plaso.output import mediator as output_mediator from plaso.proto import plaso_storage_pb2 from plaso.serializer import protobuf_serializer +from plaso.storage import time_range as storage_time_range import pytz @@ -45,10 +45,12 @@ def __init__(self): self._data_location = None self._filter_buffer = None self._filter_expression = None + # Instance of EventObjectFilter. self._filter_object = None self._output_format = None self._preferred_language = u'en-US' self._quiet_mode = False + self._time_slice_filter = None self._use_zeromq = False def _AppendEvent(self, event_object, output_buffer, event_queues): @@ -405,9 +407,14 @@ def ProcessEventsFromStorage( if not analysis_queues: analysis_queues = [] - event_object = storage_file.GetSortedEntry() - while event_object: - if my_filter: + # TODO: refactor to use StorageReader. + for event_object in storage_file.GetSortedEntries( + time_range=self._time_slice_filter): + # TODO: clean up this function. + if not my_filter: + counter[u'Events Included'] += 1 + self._AppendEvent(event_object, output_buffer, analysis_queues) + else: event_match = event_object if isinstance(event_object, plaso_storage_pb2.EventObject): # TODO: move serialization to storage, if low-level filtering is @@ -446,14 +453,10 @@ def ProcessEventsFromStorage( counter[u'Events Filtered Out'] += 1 else: counter[u'Events Filtered Out'] += 1 - else: - counter[u'Events Included'] += 1 - self._AppendEvent(event_object, output_buffer, analysis_queues) - - event_object = storage_file.GetSortedEntry() for analysis_queue in analysis_queues: analysis_queue.Close() + if output_buffer.duplicate_counter: counter[u'Duplicate Removals'] = output_buffer.duplicate_counter @@ -497,15 +500,13 @@ def ProcessStorage( """ if time_slice: if time_slice.event_timestamp: - pfilter.TimeRangeCache.SetLowerTimestamp(time_slice.start_timestamp) - pfilter.TimeRangeCache.SetUpperTimestamp(time_slice.end_timestamp) + self._time_slice_filter = storage_time_range.TimeRange( + time_slice.start_timestamp, time_slice.end_timestamp) elif use_time_slicer: self._filter_buffer = bufferlib.CircularBuffer(time_slice.duration) with storage_file: - storage_file.SetStoreLimit(self._filter_object) - # TODO: allow for single processing. # TODO: add upper queue limit. analysis_queue_port = None @@ -539,7 +540,7 @@ def ProcessStorage( else: event_queue_producers = [] - output_buffer = output_interface.EventBuffer( + output_buffer = output_event_buffer.EventBuffer( output_module, deduplicate_events) with output_buffer: counter = self.ProcessEventsFromStorage( diff --git a/plaso/lib/pfilter.py b/plaso/lib/pfilter.py index dc9adc721e..81ee8a37f4 100644 --- a/plaso/lib/pfilter.py +++ b/plaso/lib/pfilter.py @@ -412,14 +412,6 @@ def __getattr__(self, attr): class TimeRangeCache(object): """A class that stores timeranges from filters.""" - @classmethod - def ResetTimeConstraints(cls): - """Resets the time constraints.""" - if hasattr(cls, '_lower'): - del cls._lower - if hasattr(cls, '_upper'): - del cls._upper - @classmethod def SetLowerTimestamp(cls, timestamp): """Sets the lower bound timestamp.""" diff --git a/plaso/output/event_buffer.py b/plaso/output/event_buffer.py new file mode 100644 index 0000000000..24b8bb7ae1 --- /dev/null +++ b/plaso/output/event_buffer.py @@ -0,0 +1,117 @@ +# -*- coding: utf-8 -*- +"""This file contains the event buffer class.""" + +import logging + +from plaso.lib import errors +from plaso.lib import utils + + +# TODO: fix docstrings after determining this class is still needed. +class EventBuffer(object): + """Buffer class for event object output processing. + + Attributes: + check_dedups: boolean value indicating whether or not the buffer + should check and merge duplicate entries or not. + duplicate_counter: integer that contains the number of duplicates. + """ + + MERGE_ATTRIBUTES = [u'inode', u'filename', u'display_name'] + + def __init__(self, output_module, check_dedups=True): + """Initializes an event buffer object. + + This class is used for buffering up events for duplicate removals + and for other post-processing/analysis of events before being presented + by the appropriate output module. + + Args: + output_module: an output module object (instance of OutputModule). + check_dedups: optional boolean value indicating whether or not the buffer + should check and merge duplicate entries or not. + """ + self._buffer_dict = {} + self._current_timestamp = 0 + self._output_module = output_module + self._output_module.Open() + self._output_module.WriteHeader() + + self.check_dedups = check_dedups + self.duplicate_counter = 0 + + def __enter__(self): + """Make usable with "with" statement.""" + return self + + def __exit__(self, unused_type, unused_value, unused_traceback): + """Make usable with "with" statement.""" + self.End() + + def Append(self, event_object): + """Append an EventObject into the processing pipeline. + + Args: + event_object: the EventObject that is being added. + """ + if not self.check_dedups: + self._output_module.WriteEvent(event_object) + return + + if event_object.timestamp != self._current_timestamp: + self._current_timestamp = event_object.timestamp + self.Flush() + + key = event_object.EqualityString() + if key in self._buffer_dict: + self.JoinEvents(event_object, self._buffer_dict.pop(key)) + self._buffer_dict[key] = event_object + + def End(self): + """Calls the formatter to produce the closing line.""" + self.Flush() + + if self._output_module: + self._output_module.WriteFooter() + self._output_module.Close() + + def Flush(self): + """Flushes the buffer by sending records to a formatter and prints.""" + if not self._buffer_dict: + return + + for event_object in self._buffer_dict.values(): + try: + self._output_module.WriteEvent(event_object) + except errors.WrongFormatter as exception: + logging.error(u'Unable to write event: {0:s}'.format(exception)) + + self._buffer_dict = {} + + def JoinEvents(self, event_a, event_b): + """Join this EventObject with another one.""" + self.duplicate_counter += 1 + # TODO: Currently we are using the first event pathspec, perhaps that + # is not the best approach. There is no need to have all the pathspecs + # inside the combined event, however which one should be chosen is + # perhaps something that can be evaluated here (regular TSK in favor of + # an event stored deep inside a VSS for instance). + for attr in self.MERGE_ATTRIBUTES: + # TODO: remove need for GetUnicodeString. + val_a = set(utils.GetUnicodeString( + getattr(event_a, attr, u'')).split(u';')) + val_b = set(utils.GetUnicodeString( + getattr(event_b, attr, u'')).split(u';')) + values_list = list(val_a | val_b) + values_list.sort() # keeping this consistent across runs helps with diffs + setattr(event_a, attr, u';'.join(values_list)) + + # Special instance if this is a filestat entry we need to combine the + # description field. + if getattr(event_a, u'parser', u'') == u'filestat': + description_a = set(getattr(event_a, u'timestamp_desc', u'').split(u';')) + description_b = set(getattr(event_b, u'timestamp_desc', u'').split(u';')) + descriptions = list(description_a | description_b) + descriptions.sort() + if event_b.timestamp_desc not in event_a.timestamp_desc: + setattr(event_a, u'timestamp_desc', u';'.join(descriptions)) diff --git a/plaso/output/interface.py b/plaso/output/interface.py index 92ba798a97..1b04c191ff 100644 --- a/plaso/output/interface.py +++ b/plaso/output/interface.py @@ -1,11 +1,10 @@ # -*- coding: utf-8 -*- -"""This file contains the interface for output modules.""" +"""This file contains the output module interface classes.""" import abc import logging from plaso.lib import errors -from plaso.lib import utils class OutputModule(object): @@ -141,105 +140,3 @@ def SetOutputWriter(self, output_writer): def Close(self): """Closes the output.""" self._output_writer = None - - -class EventBuffer(object): - """Buffer class for EventObject output processing.""" - - MERGE_ATTRIBUTES = [u'inode', u'filename', u'display_name'] - - def __init__(self, output_module, check_dedups=True): - """Initializes an event buffer object. - - This class is used for buffering up events for duplicate removals - and for other post-processing/analysis of events before being presented - by the appropriate output module. - - Args: - output_module: An output module object (instance of OutputModule). - check_dedups: Optional boolean value indicating whether or not the buffer - should check and merge duplicate entries or not. - """ - self._buffer_dict = {} - self._current_timestamp = 0 - self._output_module = output_module - self._output_module.Open() - self._output_module.WriteHeader() - - self.check_dedups = check_dedups - self.duplicate_counter = 0 - - def Append(self, event_object): - """Append an EventObject into the processing pipeline. - - Args: - event_object: The EventObject that is being added. - """ - if not self.check_dedups: - self._output_module.WriteEvent(event_object) - return - - if event_object.timestamp != self._current_timestamp: - self._current_timestamp = event_object.timestamp - self.Flush() - - key = event_object.EqualityString() - if key in self._buffer_dict: - self.JoinEvents(event_object, self._buffer_dict.pop(key)) - self._buffer_dict[key] = event_object - - def Flush(self): - """Flushes the buffer by sending records to a formatter and prints.""" - if not self._buffer_dict: - return - - for event_object in self._buffer_dict.values(): - try: - self._output_module.WriteEvent(event_object) - except errors.WrongFormatter as exception: - logging.error(u'Unable to write event: {0:s}'.format(exception)) - - self._buffer_dict = {} - - def JoinEvents(self, event_a, event_b): - """Join this EventObject with another one.""" - self.duplicate_counter += 1 - # TODO: Currently we are using the first event pathspec, perhaps that - # is not the best approach. There is no need to have all the pathspecs - # inside the combined event, however which one should be chosen is - # perhaps something that can be evaluated here (regular TSK in favor of - # an event stored deep inside a VSS for instance). - for attr in self.MERGE_ATTRIBUTES: - val_a = set(utils.GetUnicodeString( - getattr(event_a, attr, u'')).split(u';')) - val_b = set(utils.GetUnicodeString( - getattr(event_b, attr, u'')).split(u';')) - values_list = list(val_a | val_b) - values_list.sort() # keeping this consistent across runs helps with diffs - setattr(event_a, attr, u';'.join(values_list)) - - # Special instance if this is a filestat entry we need to combine the - # description field. - if getattr(event_a, u'parser', u'') == u'filestat': - description_a = set(getattr(event_a, u'timestamp_desc', u'').split(u';')) - description_b = set(getattr(event_b, u'timestamp_desc', u'').split(u';')) - descriptions = list(description_a | description_b) - descriptions.sort() - if event_b.timestamp_desc not in event_a.timestamp_desc: - setattr(event_a, u'timestamp_desc', u';'.join(descriptions)) - - def End(self): - """Call the formatter to produce the closing line.""" - self.Flush() - - if self._output_module: - self._output_module.WriteFooter() - self._output_module.Close() - - def __exit__(self, unused_type, unused_value, unused_traceback): - """Make usable with "with" statement.""" - self.End() - - def __enter__(self): - """Make usable with "with" statement.""" - return self diff --git a/plaso/storage/time_range.py b/plaso/storage/time_range.py new file mode 100644 index 0000000000..4f3b19180d --- /dev/null +++ b/plaso/storage/time_range.py @@ -0,0 +1,42 @@ +# -*- coding: utf-8 -*- +"""Storage time range objects.""" + +class TimeRange(object): + """A class that defines a date and time range. + + The timestamp are integers containing the number of micro seconds + since January 1, 1970, 00:00:00 UTC. + + Attributes: + start_timestamp: integer containing the timestamp that marks + the start of the range. + end_timestamp: integer containing the timestamp that marks + the end of the range. + """ + + def __init__(self, start_timestamp, end_timestamp): + """Initializes a date and time range object. + + The timestamp are integers containing the number of micro seconds + since January 1, 1970, 00:00:00 UTC. + + Args: + start_timestamp: integer containing the timestamp that marks + the start of the range. + end_timestamp: integer containing the timestamp that marks + the end of the range. + + Raises: + ValueError: If the time range is badly formed. + """ + if start_timestamp is None or end_timestamp is None: + raise ValueError( + u'Time range must have either a start and an end timestamp.') + + if start_timestamp > end_timestamp: + raise ValueError( + u'Invalid start must be earlier than end timestamp.') + + super(TimeRange, self).__init__() + self.end_timestamp = end_timestamp + self.start_timestamp = start_timestamp diff --git a/plaso/storage/zip_file.py b/plaso/storage/zip_file.py index e5af258c4b..d9414b8e3f 100644 --- a/plaso/storage/zip_file.py +++ b/plaso/storage/zip_file.py @@ -96,8 +96,6 @@ from plaso.engine import profiler from plaso.lib import definitions from plaso.lib import event -from plaso.lib import limit -from plaso.lib import pfilter from plaso.lib import timelib from plaso.lib import utils from plaso.proto import plaso_storage_pb2 @@ -734,10 +732,6 @@ def __init__( not exist. """ super(StorageFile, self).__init__() - # bound_first and bound_last contain timestamps and None is used - # to indicate not set. - self._bound_first = None - self._bound_last = None self._buffer = [] self._buffer_first_timestamp = sys.maxint self._buffer_last_timestamp = 0 @@ -768,8 +762,6 @@ def __init__( self._profiling_sample = 0 self._serializers_profiler = None - self.store_range = None - def __enter__(self): """Make usable with "with" statement.""" return self @@ -888,8 +880,8 @@ def _GetEventObject(self, stream_number, entry_index=-1): Args: stream_number: an integer containing the number of the serialized event object stream. - entry_index: an integer containing the number of the serialized event - object within the stream. Where -1 represents the next + entry_index: an optional integer containing the number of the serialized + event object within the stream. Where -1 represents the next available event object. Returns: @@ -922,8 +914,9 @@ def _GetEventObjectSerializedData(self, stream_number, entry_index=-1): Args: stream_number: an integer containing the number of the stream. - entry_index: Read a specific entry in the file. The default is -1, - which represents the first available entry. + entry_index: an optional integer containing the number of the serialized + event object within the stream. Where -1 represents the next + available event object. Returns: A tuple containing the event object serialized data and the entry index @@ -940,41 +933,6 @@ def _GetEventObjectSerializedData(self, stream_number, entry_index=-1): u'with error: {1:s}.').format(stream_number, exception)) return None, None - if (not data_stream.entry_index and entry_index == -1 and - self._bound_first is not None): - # We only get here if the following conditions are met: - # 1. data_stream.entry_index is not set (so this is the first read - # from this file). - # 2. There is a lower bound (so we have a date filter). - # 3. We are accessing this function using 'get me the next entry' as an - # opposed to the 'get me entry X', where we just want to server entry - # X. - # - # The purpose: speed seeking into the storage file based on time. Instead - # of spending precious time reading through the storage file and - # deserializing protobufs just to compare timestamps we read a much - # 'cheaper' file, one that only contains timestamps to find the proper - # entry into the storage file. That way we'll get to the right place in - # the file and can start reading protobufs from the right location. - - stream_name = u'plaso_timestamps.{0:06d}'.format(stream_number) - if self._HasStream(stream_name): - try: - entry_index = self._GetFirstSerializedDataStreamEntryIndex( - stream_number, self._bound_first) - - except IOError as exception: - entry_index = None - - logging.error( - u'Unable to read timestamp table from stream: {0:s}.'.format( - stream_name)) - - if entry_index is None: - return None, None - - # TODO: determine if anything else needs to be handled here. - if entry_index >= 0: try: offset_table = self._GetSerializedEventObjectOffsetTable(stream_number) @@ -1022,54 +980,58 @@ def _GetEventTagIndexValue(self, store_number, entry_index, uuid): return tag_index_value - def _GetFirstSerializedDataStreamEntryIndex(self, stream_number, timestamp): - """Retrieves the first serialized data stream entry index for a timestamp. + def _InitializeMergeBuffer(self, time_range=None): + """Initializes the event objects into the merge buffer. - The timestamp indicates the lower bound. The first entry with a timestamp - greater or equal to the lower bound will be returned. + This function fills the merge buffer with the first relevant event object + from each stream. Args: - stream_number: an integer containing the number of the stream. - timestamp: the lower bounds timestamp which is an integer containing - the number of micro seconds since January 1, 1970, - 00:00:00 UTC. - - Returns: - The index of the entry in the corresponding serialized data stream or - None if there was no matching entry. - - Raises: - IOError: if the stream cannot be opened. + time_range: an optional time range object (instance of TimeRange). """ - timestamp_table = self._GetSerializedEventObjectTimestampTable( - stream_number) - - for entry_index in range(timestamp_table.number_of_timestamps): - timestamp_compare = timestamp_table.GetTimestamp(entry_index) - if timestamp_compare >= timestamp: - return entry_index - - def _InitializeMergeBuffer(self): - """Initializes the merge buffer.""" - if self.store_range is None: - number_range = self._GetSerializedEventObjectStreamNumbers() - else: - number_range = self.store_range - self._merge_buffer = [] - for store_number in number_range: - event_object = self._GetEventObject(store_number) - if not event_object: - return - while event_object.timestamp < self._bound_first: - event_object = self._GetEventObject(store_number) - if not event_object: - return + number_range = self._GetSerializedEventObjectStreamNumbers() + for stream_number in number_range: + entry_index = -1 + if time_range: + stream_name = u'plaso_timestamps.{0:06d}'.format(stream_number) + if self._HasStream(stream_name): + try: + timestamp_table = self._GetSerializedEventObjectTimestampTable( + stream_number) + except IOError as exception: + logging.error(( + u'Unable to read timestamp table from stream: {0:s} ' + u'with error: {1:s}.').format(stream_name, exception)) + + # If the start timestamp of the time range filter is larger than the + # last timestamp in the timestamp table skip this stream. + timestamp_compare = timestamp_table.GetTimestamp(-1) + if time_range.start_timestamp > timestamp_compare: + continue + + for table_index in range(timestamp_table.number_of_timestamps - 1): + timestamp_compare = timestamp_table.GetTimestamp(table_index) + if time_range.start_timestamp >= timestamp_compare: + entry_index = table_index + break + + event_object = self._GetEventObject( + stream_number, entry_index=entry_index) + # Check the lower bound in case no timestamp table was available. + while (event_object and time_range and + event_object.timestamp < time_range.start_timestamp): + event_object = self._GetEventObject(stream_number) + + if event_object: + if (time_range and + event_object.timestamp > time_range.end_timestamp): + continue - heapq.heappush( - self._merge_buffer, - (event_object.timestamp, store_number, event_object)) + heapq.heappush( + self._merge_buffer, + (event_object.timestamp, stream_number, event_object)) # pylint: disable=arguments-differ def _Open( @@ -1311,13 +1273,6 @@ def _WritePreprocessObject(self, pre_obj): # Store information about store range for this particular # preprocessing object. This will determine which stores # this information is applicable for. - stores = self._GetSerializedEventObjectStreamNumbers() - if stores: - end = stores[-1] + 1 - else: - end = self._first_file_number - pre_obj.store_range = (self._first_file_number, end) - if self._serializers_profiler: self._serializers_profiler.StartTiming(u'pre_obj') @@ -1464,41 +1419,54 @@ def GetReports(self): report_string = file_object.read(self.MAXIMUM_REPORT_PROTOBUF_SIZE) yield self._analysis_report_serializer.ReadSerialized(report_string) - def GetSortedEntry(self): - """Retrieves a sorted entry from the storage file. + def GetSortedEntry(self, time_range=None): + """Retrieves a sorted entry. + + Args: + time_range: an optional time range object (instance of TimeRange). Returns: An event object (instance of EventObject). """ - if self._bound_first is None: - self._bound_first, self._bound_last = ( - pfilter.TimeRangeCache.GetTimeRange()) - if self._merge_buffer is None: - self._InitializeMergeBuffer() + self._InitializeMergeBuffer(time_range=time_range) if not self._merge_buffer: return - _, store_number, event_read = heapq.heappop(self._merge_buffer) - if not event_read: + _, stream_number, event_object = heapq.heappop(self._merge_buffer) + if not event_object: return # Stop as soon as we hit the upper bound. - if event_read.timestamp > self._bound_last: + if time_range and event_object.timestamp > time_range.end_timestamp: return - new_event_object = self._GetEventObject(store_number) - - if new_event_object: + # Read the next event object in a stream. + next_event_object = self._GetEventObject(stream_number) + if next_event_object: heapq.heappush( self._merge_buffer, - (new_event_object.timestamp, store_number, new_event_object)) + (next_event_object.timestamp, stream_number, next_event_object)) - event_read.tag = self._ReadEventTagByIdentifier( - event_read.store_number, event_read.store_index, event_read.uuid) + event_object.tag = self._ReadEventTagByIdentifier( + event_object.store_number, event_object.store_index, event_object.uuid) + + return event_object + + def GetSortedEntries(self, time_range=None): + """Retrieves a sorted entries. + + Args: + time_range: an optional time range object (instance of TimeRange). - return event_read + Yields: + An event object (instance of EventObject). + """ + event_object = self.GetSortedEntry(time_range=time_range) + while event_object: + yield event_object + event_object = self.GetSortedEntry(time_range=time_range) def GetStorageInformation(self): """Retrieves storage (preprocessing) information stored in the storage file. @@ -1592,32 +1560,6 @@ def SetEnableProfiling(self, enable_profiling, profiling_type=u'all'): not self._serializers_profiler): self._serializers_profiler = profiler.SerializersProfiler(u'Storage') - def SetStoreLimit(self, unused_my_filter=None): - """Set a limit to the stores used for returning data.""" - # Retrieve set first and last timestamps. - self._bound_first, self._bound_last = pfilter.TimeRangeCache.GetTimeRange() - - self.store_range = [] - - # TODO: Fetch a filter object from the filter query. - - for number in self._GetSerializedEventObjectStreamNumbers(): - # TODO: Read more criteria from here. - first, last = self._ReadMeta(number).get(u'range', (0, limit.MAX_INT64)) - if last < first: - logging.error( - u'last: {0:d} first: {1:d} container: {2:d} (last < first)'.format( - last, first, number)) - - if first <= self._bound_last and self._bound_first <= last: - # TODO: Check at least parser and data_type (stored in metadata). - # Check whether these attributes exist in filter, if so use the filter - # to determine whether the stores should be included. - self.store_range.append(number) - - else: - logging.debug(u'Store [{0:d}] not used'.format(number)) - # TODO: move only used in testing. def StoreGrouping(self, rows): """Store group information into the storage file. diff --git a/tests/frontend/extraction_frontend.py b/tests/frontend/extraction_frontend.py index a1284ac1a2..5b113a04c4 100644 --- a/tests/frontend/extraction_frontend.py +++ b/tests/frontend/extraction_frontend.py @@ -12,7 +12,6 @@ from dfvfs.path import factory as path_spec_factory from plaso.frontend import extraction_frontend -from plaso.lib import pfilter from plaso.storage import zip_file as storage_zip_file from tests.frontend import test_lib @@ -23,10 +22,6 @@ class ExtractionFrontendTests(test_lib.FrontendTestCase): def setUp(self): """Sets up the objects used throughout the test.""" - # This is necessary since TimeRangeCache uses class members. - # TODO: remove this work around and properly fix TimeRangeCache. - pfilter.TimeRangeCache.ResetTimeConstraints() - self._temp_directory = tempfile.mkdtemp() def tearDown(self): diff --git a/tests/frontend/psort.py b/tests/frontend/psort.py index a5f9490278..0fca8c171d 100644 --- a/tests/frontend/psort.py +++ b/tests/frontend/psort.py @@ -10,10 +10,11 @@ from plaso.formatters import mediator as formatters_mediator from plaso.frontend import psort from plaso.lib import event -from plaso.lib import pfilter from plaso.lib import timelib +from plaso.output import event_buffer as output_event_buffer from plaso.output import interface as output_interface from plaso.output import mediator as output_mediator +from plaso.storage import time_range as storage_time_range from plaso.storage import zip_file as storage_zip_file from tests import test_lib as shared_test_lib @@ -72,7 +73,7 @@ def WriteHeader(self): self._WriteLine(self._HEADER) -class TestEventBuffer(output_interface.EventBuffer): +class TestEventBuffer(output_event_buffer.EventBuffer): """A test event buffer.""" def __init__(self, output_module, check_dedups=True, store=None): @@ -116,30 +117,28 @@ def setUp(self): self._front_end = psort.PsortFrontend() # TODO: have sample output generated from the test. - self._test_file_proto = self._GetTestFilePath([u'psort_test.proto.plaso']) - self._test_file_json = self._GetTestFilePath([u'psort_test.json.plaso']) - self.first = timelib.Timestamp.CopyFromString(u'2012-07-24 21:45:24') - self.last = timelib.Timestamp.CopyFromString(u'2016-11-18 01:15:43') + + self._start_timestamp = timelib.Timestamp.CopyFromString( + u'2012-07-24 21:45:24') + self._end_timestamp = timelib.Timestamp.CopyFromString( + u'2016-11-18 01:15:43') def testReadEntries(self): """Ensure returned EventObjects from the storage are within time bounds.""" - timestamp_list = [] - pfilter.TimeRangeCache.ResetTimeConstraints() - pfilter.TimeRangeCache.SetUpperTimestamp(self.last) - pfilter.TimeRangeCache.SetLowerTimestamp(self.first) - + storage_file_path = self._GetTestFilePath([u'psort_test.proto.plaso']) storage_file = storage_zip_file.StorageFile( - self._test_file_proto, read_only=True) - storage_file.SetStoreLimit() + storage_file_path, read_only=True) + time_range = storage_time_range.TimeRange( + self._start_timestamp, self._end_timestamp) - event_object = storage_file.GetSortedEntry() - while event_object: + timestamp_list = [] + # TODO: refactor to use StorageReader. + for event_object in storage_file.GetSortedEntries(time_range=time_range): timestamp_list.append(event_object.timestamp) - event_object = storage_file.GetSortedEntry() self.assertEqual(len(timestamp_list), 15) - self.assertTrue( - timestamp_list[0] >= self.first and timestamp_list[-1] <= self.last) + self.assertEqual(timestamp_list[0], self._start_timestamp) + self.assertEqual(timestamp_list[-1], self._end_timestamp) storage_file.Close() @@ -181,27 +180,25 @@ def testOutput(self): formatters_manager.FormattersManager.RegisterFormatter( PsortTestEventFormatter) - events = [] - events.append(PsortTestEvent(5134324321)) - events.append(PsortTestEvent(2134324321)) - events.append(PsortTestEvent(9134324321)) - events.append(PsortTestEvent(15134324321)) - events.append(PsortTestEvent(5134324322)) - events.append(PsortTestEvent(5134024321)) + events = [ + PsortTestEvent(5134324321), + PsortTestEvent(2134324321), + PsortTestEvent(9134324321), + PsortTestEvent(15134324321), + PsortTestEvent(5134324322), + PsortTestEvent(5134024321)] output_writer = cli_test_lib.TestOutputWriter() with shared_test_lib.TempDirectory() as dirname: temp_file = os.path.join(dirname, u'plaso.db') - storage_file = storage_zip_file.StorageFile(temp_file, read_only=False) - pfilter.TimeRangeCache.ResetTimeConstraints() - storage_file.SetStoreLimit() + storage_file = storage_zip_file.StorageFile(temp_file) storage_file.AddEventObjects(events) storage_file.Close() - with storage_zip_file.StorageFile(temp_file) as storage_file: - storage_file.store_range = [1] + with storage_zip_file.StorageFile( + temp_file, read_only=True) as storage_file: output_mediator_object = output_mediator.OutputMediator( self._formatter_mediator, storage_file) output_module = TestOutputModule(output_mediator_object) @@ -235,8 +232,8 @@ def testOutput(self): def testGetLastGoodPreprocess(self): """Tests the last good preprocess method.""" test_front_end = psort.PsortFrontend() - storage_file = test_front_end.OpenStorage( - self._test_file_json, read_only=True) + storage_file_path = self._GetTestFilePath([u'psort_test.json.plaso']) + storage_file = test_front_end.OpenStorage(storage_file_path, read_only=True) preprocessor_object = test_front_end._GetLastGoodPreprocess(storage_file) self.assertIsNotNone(preprocessor_object) timezone = getattr(preprocessor_object, u'zone') diff --git a/tests/output/event_buffer.py b/tests/output/event_buffer.py new file mode 100644 index 0000000000..321a7235fe --- /dev/null +++ b/tests/output/event_buffer.py @@ -0,0 +1,71 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- + +import unittest + +from plaso.output import event_buffer + +from tests.cli import test_lib as cli_test_lib +from tests.output import test_lib + + +class TestEvent(object): + """Simple class that defines a dummy event.""" + + def __init__(self, timestamp, entry): + self.date = u'03/01/2012' + try: + self.timestamp = int(timestamp) + except ValueError: + self.timestamp = 0 + self.entry = entry + def EqualityString(self): + return u';'.join(map(str, [self.timestamp, self.entry])) + + +class EventBufferTest(test_lib.OutputModuleTestCase): + """Few unit tests for the EventBuffer class.""" + + def _CheckBufferLength(self, event_buffer_object, expected_length): + """Checks the length of the event buffer. + + Args: + event_buffer_object: the event buffer object (instance of EventBuffer). + expect_length: the expected event buffer length. + """ + if not event_buffer_object.check_dedups: + expected_length = 0 + + # pylint: disable=protected-access + self.assertEqual(len(event_buffer_object._buffer_dict), expected_length) + + def testFlush(self): + """Test to ensure we empty our buffers and sends to output properly.""" + output_mediator = self._CreateOutputMediator() + output_writer = cli_test_lib.TestOutputWriter() + output_module = test_lib.TestOutputModule(output_mediator) + output_module.SetOutputWriter(output_writer) + event_buffer_object = event_buffer.EventBuffer(output_module, False) + + event_buffer_object.Append(TestEvent(123456, u'Now is now')) + self._CheckBufferLength(event_buffer_object, 1) + + # Add three events. + event_buffer_object.Append(TestEvent(123456, u'OMG I AM DIFFERENT')) + event_buffer_object.Append(TestEvent(123456, u'Now is now')) + event_buffer_object.Append(TestEvent(123456, u'Now is now')) + self._CheckBufferLength(event_buffer_object, 2) + + event_buffer_object.Flush() + self._CheckBufferLength(event_buffer_object, 0) + + event_buffer_object.Append(TestEvent(123456, u'Now is now')) + event_buffer_object.Append(TestEvent(123456, u'Now is now')) + event_buffer_object.Append(TestEvent(123456, u'Different again :)')) + self._CheckBufferLength(event_buffer_object, 2) + event_buffer_object.Append(TestEvent(123457, u'Now is different')) + self._CheckBufferLength(event_buffer_object, 1) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/output/interface.py b/tests/output/interface.py index d209ce1fa1..3fbbb0f1d4 100644 --- a/tests/output/interface.py +++ b/tests/output/interface.py @@ -3,7 +3,6 @@ import unittest -from plaso.output import interface from plaso.output import manager from tests.cli import test_lib as cli_test_lib @@ -24,40 +23,6 @@ def EqualityString(self): return u';'.join(map(str, [self.timestamp, self.entry])) -class TestOutputModule(interface.LinearOutputModule): - """This is a test output module that provides a simple XML.""" - - NAME = u'test_xml' - DESCRIPTION = u'Test output that provides a simple mocked XML.' - - def WriteEventBody(self, event_object): - """Writes the body of an event object to the output. - - Args: - event_object: the event object (instance of EventObject). - """ - self._WriteLine(( - u'\t{0:s}\n\t\n' - u'\t{2:s}\n').format( - event_object.date, event_object.timestamp, event_object.entry)) - - def WriteEventEnd(self): - """Writes the end of an event object to the output.""" - self._WriteLine(u'\n') - - def WriteEventStart(self): - """Writes the start of an event object to the output.""" - self._WriteLine(u'\n') - - def WriteFooter(self): - """Writes the footer to the output.""" - self._WriteLine(u'\n') - - def WriteHeader(self): - """Writes the header to the output.""" - self._WriteLine(u'\n') - - class LinearOutputModuleTest(test_lib.OutputModuleTestCase): """Tests the linear output module.""" @@ -71,7 +36,7 @@ def testOutput(self): output_mediator = self._CreateOutputMediator() output_writer = cli_test_lib.TestOutputWriter() - output_module = TestOutputModule(output_mediator) + output_module = test_lib.TestOutputModule(output_mediator) output_module.SetOutputWriter(output_writer) output_module.WriteHeader() for event_object in events: @@ -107,7 +72,7 @@ def testOutput(self): def testOutputList(self): """Test listing up all available registered modules.""" - manager.OutputManager.RegisterOutput(TestOutputModule) + manager.OutputManager.RegisterOutput(test_lib.TestOutputModule) test_output_class = None for name, output_class in manager.OutputManager.GetOutputClasses(): @@ -118,51 +83,7 @@ def testOutputList(self): self.assertIsNotNone(test_output_class) self.assertEqual(test_output_class.DESCRIPTION, expected_description) - manager.OutputManager.DeregisterOutput(TestOutputModule) - - -class EventBufferTest(test_lib.OutputModuleTestCase): - """Few unit tests for the EventBuffer class.""" - - def _CheckBufferLength(self, event_buffer, expected_length): - """Checks the length of the event buffer. - - Args: - event_buffer: the event buffer object (instance of EventBuffer). - expect_length: the expected event buffer length. - """ - if not event_buffer.check_dedups: - expected_length = 0 - - # pylint: disable=protected-access - self.assertEqual(len(event_buffer._buffer_dict), expected_length) - - def testFlush(self): - """Test to ensure we empty our buffers and sends to output properly.""" - output_mediator = self._CreateOutputMediator() - output_writer = cli_test_lib.TestOutputWriter() - output_module = TestOutputModule(output_mediator) - output_module.SetOutputWriter(output_writer) - event_buffer = interface.EventBuffer(output_module, False) - - event_buffer.Append(TestEvent(123456, u'Now is now')) - self._CheckBufferLength(event_buffer, 1) - - # Add three events. - event_buffer.Append(TestEvent(123456, u'OMG I AM DIFFERENT')) - event_buffer.Append(TestEvent(123456, u'Now is now')) - event_buffer.Append(TestEvent(123456, u'Now is now')) - self._CheckBufferLength(event_buffer, 2) - - event_buffer.Flush() - self._CheckBufferLength(event_buffer, 0) - - event_buffer.Append(TestEvent(123456, u'Now is now')) - event_buffer.Append(TestEvent(123456, u'Now is now')) - event_buffer.Append(TestEvent(123456, u'Different again :)')) - self._CheckBufferLength(event_buffer, 2) - event_buffer.Append(TestEvent(123457, u'Now is different')) - self._CheckBufferLength(event_buffer, 1) + manager.OutputManager.DeregisterOutput(test_lib.TestOutputModule) if __name__ == '__main__': diff --git a/tests/output/pstorage.py b/tests/output/pstorage.py index 42b431e994..40a5e14784 100644 --- a/tests/output/pstorage.py +++ b/tests/output/pstorage.py @@ -5,8 +5,7 @@ import os import unittest -from plaso.lib import pfilter -from plaso.output import interface +from plaso.output import event_buffer from plaso.output import pstorage from plaso.storage import zip_file as storage_zip_file @@ -21,10 +20,6 @@ def setUp(self): """Sets up the objects needed for this test.""" self.test_filename = os.path.join(u'test_data', u'psort_test.proto.plaso') - # Show full diff results, part of TestCase so does not follow our naming - # conventions. - pfilter.TimeRangeCache.ResetTimeConstraints() - def testOutput(self): with shared_test_lib.TempDirectory() as dirname: storage_file = os.path.join(dirname, u'plaso.plaso') @@ -35,12 +30,10 @@ def testOutput(self): output_module = pstorage.PlasoStorageOutputModule(output_mediator) output_module.SetFilePath(storage_file) - with interface.EventBuffer( + with event_buffer.EventBuffer( output_module, check_dedups=False) as output_buffer: - event_object = store.GetSortedEntry() - while event_object: + for event_object in store.GetSortedEntries(): output_buffer.Append(event_object) - event_object = store.GetSortedEntry() # Make sure original and dump have the same events. original = storage_zip_file.StorageFile( diff --git a/tests/output/test_lib.py b/tests/output/test_lib.py index 495691ffcd..75fa800344 100644 --- a/tests/output/test_lib.py +++ b/tests/output/test_lib.py @@ -11,6 +11,7 @@ from plaso.formatters import mediator as formatters_mediator from plaso.lib import event from plaso.lib import timelib +from plaso.output import interface from plaso.output import mediator @@ -52,6 +53,40 @@ class TestEventFormatter(formatters_interface.EventFormatter): SOURCE_LONG = u'Syslog' +class TestOutputModule(interface.LinearOutputModule): + """This is a test output module that provides a simple XML.""" + + NAME = u'test_xml' + DESCRIPTION = u'Test output that provides a simple mocked XML.' + + def WriteEventBody(self, event_object): + """Writes the body of an event object to the output. + + Args: + event_object: the event object (instance of EventObject). + """ + self._WriteLine(( + u'\t{0:s}\n\t\n' + u'\t{2:s}\n').format( + event_object.date, event_object.timestamp, event_object.entry)) + + def WriteEventEnd(self): + """Writes the end of an event object to the output.""" + self._WriteLine(u'\n') + + def WriteEventStart(self): + """Writes the start of an event object to the output.""" + self._WriteLine(u'\n') + + def WriteFooter(self): + """Writes the footer to the output.""" + self._WriteLine(u'\n') + + def WriteHeader(self): + """Writes the header to the output.""" + self._WriteLine(u'\n') + + class OutputModuleTestCase(unittest.TestCase): """The unit test case for a output module.""" diff --git a/tests/storage/zip_file.py b/tests/storage/zip_file.py index 2fba9052ed..ac0cdc0cac 100644 --- a/tests/storage/zip_file.py +++ b/tests/storage/zip_file.py @@ -10,10 +10,10 @@ from plaso.formatters import mediator as formatters_mediator from plaso.lib import event from plaso.lib import eventdata -from plaso.lib import pfilter from plaso.lib import timelib from plaso.formatters import winreg # pylint: disable=unused-import from plaso.serializer import protobuf_serializer +from plaso.storage import time_range from plaso.storage import zip_file from tests import test_lib as shared_test_lib @@ -380,6 +380,71 @@ def _GetTaggedEvent(self, storage_file, event_tag): event_object.tag = event_tag return event_object + def testGetSortedEntries(self): + """Tests the GetSortedEntries function.""" + test_file = self._GetTestFilePath([u'psort_test.proto.plaso']) + + storage_file = zip_file.StorageFile(test_file, read_only=True) + + timestamps = [] + for event_object in storage_file.GetSortedEntries(): + timestamps.append(event_object.timestamp) + + expected_timestamps = [ + 1343166324000000, 1344270407000000, 1390377153000000, 1390377153000000, + 1390377181000000, 1390377241000000, 1390377241000000, 1390377272000000, + 1392438730000000, 1418925272000000, 1427151678000000, 1427151678000123, + 1451584472000000, 1479431720000000, 1479431743000000] + + self.assertEqual(sorted(timestamps), expected_timestamps) + + # Test lower bound time range filter. + test_time_range = time_range.TimeRange( + timelib.Timestamp.CopyFromString(u'2014-02-16 00:00:00'), + timelib.Timestamp.CopyFromString(u'2030-12-31 23:59:59')) + + storage_file = zip_file.StorageFile(test_file, read_only=True) + + timestamps = [] + for event_object in storage_file.GetSortedEntries( + time_range=test_time_range): + timestamps.append(event_object.timestamp) + + expected_timestamps = [ + 1418925272000000, + 1427151678000000, + 1427151678000123, + 1451584472000000, + 1479431720000000, + 1479431743000000] + + self.assertEqual(sorted(timestamps), expected_timestamps) + + # Test upper bound time range filter. + test_time_range = time_range.TimeRange( + timelib.Timestamp.CopyFromString(u'2000-01-01 00:00:00'), + timelib.Timestamp.CopyFromString(u'2014-02-16 00:00:00')) + + storage_file = zip_file.StorageFile(test_file, read_only=True) + + timestamps = [] + for event_object in storage_file.GetSortedEntries( + time_range=test_time_range): + timestamps.append(event_object.timestamp) + + expected_timestamps = [ + 1343166324000000, + 1344270407000000, + 1390377153000000, + 1390377153000000, + 1390377181000000, + 1390377241000000, + 1390377241000000, + 1390377272000000, + 1392438730000000] + + self.assertEqual(sorted(timestamps), expected_timestamps) + def testStorage(self): """Test the storage object.""" formatter_mediator = formatters_mediator.FormatterMediator() @@ -393,10 +458,6 @@ def testStorage(self): serializer = protobuf_serializer.ProtobufEventObjectSerializer - # TODO: this is needed to work around static member issue of pfilter - # which is used in storage file. - pfilter.TimeRangeCache.ResetTimeConstraints() - with shared_test_lib.TempDirectory() as dirname: temp_file = os.path.join(dirname, u'plaso.db') self._CreateStorageFile(temp_file) @@ -493,50 +554,7 @@ def testStorage(self): self.assertEqual(same_events, proto_group_events) - def testStorageSort(self): - """This test ensures that items read and output are in the expected order. - - This method by design outputs data as it runs. In order to test this a - a modified output renderer is used for which the flush functionality has - been removed. - - The test will be to read the TestEventBuffer storage and check to see - if it matches the known good sort order. - """ - # TODO: have sample output generated from the test. - # TODO: Use input data with a defined year. The syslog parser chooses a - # year based on system clock; forcing updates to test file if regenerated. - test_file = self._GetTestFilePath([u'psort_test.proto.plaso']) - # First: 1342799054000000 - first = timelib.Timestamp.CopyFromString(u'2012-07-20 15:44:14') - # Last: 1479431743000000 - last = timelib.Timestamp.CopyFromString(u'2016-11-18 01:15:43') - - pfilter.TimeRangeCache.ResetTimeConstraints() - pfilter.TimeRangeCache.SetUpperTimestamp(last) - pfilter.TimeRangeCache.SetLowerTimestamp(first) - storage_file = zip_file.StorageFile(test_file, read_only=True) - - storage_file.store_range = [1, 5, 6] - - timestamps = [] - event_object = storage_file.GetSortedEntry() - while event_object: - timestamps.append(event_object.timestamp) - event_object = storage_file.GetSortedEntry() - - expected_timestamps = [ - 1343166324000000, - 1344270407000000, - 1392438730000000, - 1418925272000000, - 1427151678000000, - 1427151678000123, - 1451584472000000] - - self.assertEqual(sorted(timestamps), expected_timestamps) - - # TODO: add test for StoreReport + # TODO: add test for StoreReport. if __name__ == '__main__': diff --git a/tools/psort_test.py b/tools/psort_test.py index 5924f21087..2d8b400dea 100644 --- a/tools/psort_test.py +++ b/tools/psort_test.py @@ -8,7 +8,6 @@ from plaso.cli.helpers import interface as helpers_interface from plaso.cli.helpers import manager as helpers_manager from plaso.lib import errors -from plaso.lib import pfilter from plaso.output import manager as output_manager from tests import test_lib as shared_test_lib from tests.cli import test_lib as cli_test_lib @@ -127,10 +126,6 @@ def testProcessStorageWithMissingParameters(self): helpers_manager.ArgumentHelperManager.RegisterHelper( TestOutputModuleArgumentHelper) - # TODO: this is needed to work around static member issue of pfilter - # which is used in storage file. - pfilter.TimeRangeCache.ResetTimeConstraints() - lines = [] with shared_test_lib.TempDirectory() as temp_directory: temp_file_name = os.path.join(temp_directory, u'output.txt')