Skip to content

Commit

Permalink
Code review: 267520043: Made storage time range filter non-static log…
Browse files Browse the repository at this point in the history
  • Loading branch information
joachimmetz committed Dec 31, 2015
1 parent b76b8b9 commit 494a628
Show file tree
Hide file tree
Showing 18 changed files with 482 additions and 450 deletions.
2 changes: 1 addition & 1 deletion config/dpkg/changelog
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@ python-plaso (1.3.1-1) unstable; urgency=low

* Auto-generated

-- Log2Timeline <[email protected]> Fri, 04 Dec 2015 15:29:30 +0100
-- Log2Timeline <[email protected]> Sat, 05 Dec 2015 07:45:25 +0100
8 changes: 8 additions & 0 deletions docs/plaso.output.rst
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,14 @@ plaso.output.elastic module
:undoc-members:
:show-inheritance:

plaso.output.event_buffer module
--------------------------------

.. automodule:: plaso.output.event_buffer
:members:
:undoc-members:
:show-inheritance:

plaso.output.interface module
-----------------------------

Expand Down
8 changes: 8 additions & 0 deletions docs/plaso.storage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,14 @@ plaso.storage.factory module
:undoc-members:
:show-inheritance:

plaso.storage.time_range module
-------------------------------

.. automodule:: plaso.storage.time_range
:members:
:undoc-members:
:show-inheritance:

plaso.storage.writer module
---------------------------

Expand Down
2 changes: 1 addition & 1 deletion plaso/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
__version__ = '1.3.1'

VERSION_DEV = True
VERSION_DATE = '20151204'
VERSION_DATE = '20151205'


def GetVersion():
Expand Down
31 changes: 16 additions & 15 deletions plaso/frontend/psort.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@
from plaso.frontend import frontend
from plaso.lib import bufferlib
from plaso.lib import event
from plaso.lib import pfilter
from plaso.lib import timelib
from plaso.multi_processing import multi_process
from plaso.output import interface as output_interface
from plaso.output import event_buffer as output_event_buffer
from plaso.output import manager as output_manager
from plaso.output import mediator as output_mediator
from plaso.proto import plaso_storage_pb2
from plaso.serializer import protobuf_serializer
from plaso.storage import time_range as storage_time_range

import pytz

Expand All @@ -45,10 +45,12 @@ def __init__(self):
self._data_location = None
self._filter_buffer = None
self._filter_expression = None
# Instance of EventObjectFilter.
self._filter_object = None
self._output_format = None
self._preferred_language = u'en-US'
self._quiet_mode = False
self._time_slice_filter = None
self._use_zeromq = False

def _AppendEvent(self, event_object, output_buffer, event_queues):
Expand Down Expand Up @@ -405,9 +407,14 @@ def ProcessEventsFromStorage(
if not analysis_queues:
analysis_queues = []

event_object = storage_file.GetSortedEntry()
while event_object:
if my_filter:
# TODO: refactor to use StorageReader.
for event_object in storage_file.GetSortedEntries(
time_range=self._time_slice_filter):
# TODO: clean up this function.
if not my_filter:
counter[u'Events Included'] += 1
self._AppendEvent(event_object, output_buffer, analysis_queues)
else:
event_match = event_object
if isinstance(event_object, plaso_storage_pb2.EventObject):
# TODO: move serialization to storage, if low-level filtering is
Expand Down Expand Up @@ -446,14 +453,10 @@ def ProcessEventsFromStorage(
counter[u'Events Filtered Out'] += 1
else:
counter[u'Events Filtered Out'] += 1
else:
counter[u'Events Included'] += 1
self._AppendEvent(event_object, output_buffer, analysis_queues)

event_object = storage_file.GetSortedEntry()

for analysis_queue in analysis_queues:
analysis_queue.Close()

if output_buffer.duplicate_counter:
counter[u'Duplicate Removals'] = output_buffer.duplicate_counter

Expand Down Expand Up @@ -497,15 +500,13 @@ def ProcessStorage(
"""
if time_slice:
if time_slice.event_timestamp:
pfilter.TimeRangeCache.SetLowerTimestamp(time_slice.start_timestamp)
pfilter.TimeRangeCache.SetUpperTimestamp(time_slice.end_timestamp)
self._time_slice_filter = storage_time_range.TimeRange(
time_slice.start_timestamp, time_slice.end_timestamp)

elif use_time_slicer:
self._filter_buffer = bufferlib.CircularBuffer(time_slice.duration)

with storage_file:
storage_file.SetStoreLimit(self._filter_object)

# TODO: allow for single processing.
# TODO: add upper queue limit.
analysis_queue_port = None
Expand Down Expand Up @@ -539,7 +540,7 @@ def ProcessStorage(
else:
event_queue_producers = []

output_buffer = output_interface.EventBuffer(
output_buffer = output_event_buffer.EventBuffer(
output_module, deduplicate_events)
with output_buffer:
counter = self.ProcessEventsFromStorage(
Expand Down
8 changes: 0 additions & 8 deletions plaso/lib/pfilter.py
Original file line number Diff line number Diff line change
Expand Up @@ -412,14 +412,6 @@ def __getattr__(self, attr):
class TimeRangeCache(object):
"""A class that stores timeranges from filters."""

@classmethod
def ResetTimeConstraints(cls):
  """Resets the time constraints.

  Removes the class-level _lower and _upper timestamp bounds, if present,
  so that no cached time range remains on the class.
  """
  # The hasattr() guards are required because the bounds are class
  # attributes that only exist after they have been set — SetLowerTimestamp
  # is visible below; presumably a matching upper-bound setter exists
  # (NOTE(review): confirm against the full class).
  if hasattr(cls, '_lower'):
    del cls._lower
  if hasattr(cls, '_upper'):
    del cls._upper

@classmethod
def SetLowerTimestamp(cls, timestamp):
"""Sets the lower bound timestamp."""
Expand Down
117 changes: 117 additions & 0 deletions plaso/output/event_buffer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
# -*- coding: utf-8 -*-
"""This file contains the event buffer class."""

import logging

from plaso.lib import errors
from plaso.lib import utils


# TODO: fix docstrings after determining this class is still needed.
class EventBuffer(object):
  """Buffer class for event object output processing.

  Events are held back briefly, keyed by their equality string, so that
  duplicates sharing a timestamp can be detected and merged before being
  handed to the wrapped output module.

  Attributes:
    check_dedups: boolean value indicating whether or not the buffer
        should check and merge duplicate entries or not.
    duplicate_counter: integer that contains the number of duplicates.
  """

  # Attributes whose values are merged (semicolon-joined) when two
  # duplicate events are combined.
  MERGE_ATTRIBUTES = [u'inode', u'filename', u'display_name']

  def __init__(self, output_module, check_dedups=True):
    """Initializes an event buffer object.

    This class is used for buffering up events for duplicate removals
    and for other post-processing/analysis of events before being presented
    by the appropriate output module.

    Args:
      output_module: an output module object (instance of OutputModule).
      check_dedups: optional boolean value indicating whether or not the
          buffer should check and merge duplicate entries or not.
    """
    self._buffer_dict = {}
    self._current_timestamp = 0
    self._output_module = output_module
    # NOTE: constructing the buffer has output side effects — the module
    # is opened and the header written immediately.
    self._output_module.Open()
    self._output_module.WriteHeader()

    self.check_dedups = check_dedups
    self.duplicate_counter = 0

  def __enter__(self):
    """Make usable with "with" statement."""
    return self

  def __exit__(self, unused_type, unused_value, unused_traceback):
    """Make usable with "with" statement."""
    self.End()

  def Append(self, event_object):
    """Append an EventObject into the processing pipeline.

    Args:
      event_object: the EventObject that is being added.
    """
    if not self.check_dedups:
      # Deduplication disabled: pass the event straight through.
      self._output_module.WriteEvent(event_object)
      return

    timestamp = event_object.timestamp
    if timestamp != self._current_timestamp:
      # A new timestamp means earlier events can no longer be duplicates;
      # flush them to the output module.
      self._current_timestamp = timestamp
      self.Flush()

    lookup_key = event_object.EqualityString()
    existing_event = self._buffer_dict.pop(lookup_key, None)
    if existing_event is not None:
      # Merge the buffered duplicate into the new event before re-buffering.
      self.JoinEvents(event_object, existing_event)
    self._buffer_dict[lookup_key] = event_object

  def End(self):
    """Calls the formatter to produce the closing line."""
    self.Flush()

    if self._output_module:
      self._output_module.WriteFooter()
      self._output_module.Close()

  def Flush(self):
    """Flushes the buffer by sending records to a formatter and prints."""
    if not self._buffer_dict:
      return

    for buffered_event in self._buffer_dict.values():
      try:
        self._output_module.WriteEvent(buffered_event)
      except errors.WrongFormatter as exception:
        logging.error(u'Unable to write event: {0:s}'.format(exception))

    self._buffer_dict = {}

  def JoinEvents(self, event_a, event_b):
    """Join this EventObject with another one."""
    self.duplicate_counter += 1
    # TODO: Currently we are using the first event pathspec, perhaps that
    # is not the best approach. There is no need to have all the pathspecs
    # inside the combined event, however which one should be chosen is
    # perhaps something that can be evaluated here (regular TSK in favor of
    # an event stored deep inside a VSS for instance).
    for attribute_name in self.MERGE_ATTRIBUTES:
      # TODO: remove need for GetUnicodeString.
      values_a = set(utils.GetUnicodeString(
          getattr(event_a, attribute_name, u'')).split(u';'))
      values_b = set(utils.GetUnicodeString(
          getattr(event_b, attribute_name, u'')).split(u';'))
      # keeping this consistent across runs helps with diffs
      merged_values = sorted(values_a | values_b)
      setattr(event_a, attribute_name, u';'.join(merged_values))

    # Special instance if this is a filestat entry we need to combine the
    # description field.
    if getattr(event_a, u'parser', u'') == u'filestat':
      descriptions_a = set(
          getattr(event_a, u'timestamp_desc', u'').split(u';'))
      descriptions_b = set(
          getattr(event_b, u'timestamp_desc', u'').split(u';'))
      merged_descriptions = sorted(descriptions_a | descriptions_b)
      if event_b.timestamp_desc not in event_a.timestamp_desc:
        setattr(event_a, u'timestamp_desc', u';'.join(merged_descriptions))
105 changes: 1 addition & 104 deletions plaso/output/interface.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
# -*- coding: utf-8 -*-
"""This file contains the interface for output modules."""
"""This file contains the output module interface classes."""

import abc
import logging

from plaso.lib import errors
from plaso.lib import utils


class OutputModule(object):
Expand Down Expand Up @@ -141,105 +140,3 @@ def SetOutputWriter(self, output_writer):
def Close(self):
"""Closes the output."""
self._output_writer = None


class EventBuffer(object):
  """Buffer class for EventObject output processing.

  NOTE(review): this commit removes this class from plaso.output.interface;
  the same commit adds an equivalent class in plaso.output.event_buffer.
  """

  # Attributes whose values are merged (semicolon-joined) when two
  # duplicate events are combined.
  MERGE_ATTRIBUTES = [u'inode', u'filename', u'display_name']

  def __init__(self, output_module, check_dedups=True):
    """Initializes an event buffer object.

    This class is used for buffering up events for duplicate removals
    and for other post-processing/analysis of events before being presented
    by the appropriate output module.

    Args:
      output_module: An output module object (instance of OutputModule).
      check_dedups: Optional boolean value indicating whether or not the
          buffer should check and merge duplicate entries or not.
    """
    self._buffer_dict = {}
    self._current_timestamp = 0
    self._output_module = output_module
    # NOTE: constructing the buffer has output side effects — the module
    # is opened and the header written immediately.
    self._output_module.Open()
    self._output_module.WriteHeader()

    self.check_dedups = check_dedups
    self.duplicate_counter = 0

  def Append(self, event_object):
    """Append an EventObject into the processing pipeline.

    Args:
      event_object: The EventObject that is being added.
    """
    if not self.check_dedups:
      # Deduplication disabled: pass the event straight through.
      self._output_module.WriteEvent(event_object)
      return

    if event_object.timestamp != self._current_timestamp:
      # A new timestamp means earlier events can no longer be duplicates;
      # flush them to the output module.
      self._current_timestamp = event_object.timestamp
      self.Flush()

    key = event_object.EqualityString()
    if key in self._buffer_dict:
      # Merge the buffered duplicate into the new event before re-buffering.
      self.JoinEvents(event_object, self._buffer_dict.pop(key))
    self._buffer_dict[key] = event_object

  def Flush(self):
    """Flushes the buffer by sending records to a formatter and prints."""
    if not self._buffer_dict:
      return

    for event_object in self._buffer_dict.values():
      try:
        self._output_module.WriteEvent(event_object)
      except errors.WrongFormatter as exception:
        # Best effort: a formatter mismatch on one event does not abort
        # the flush of the remaining buffered events.
        logging.error(u'Unable to write event: {0:s}'.format(exception))

    self._buffer_dict = {}

  def JoinEvents(self, event_a, event_b):
    """Join this EventObject with another one.

    Merges the MERGE_ATTRIBUTES values of event_b into event_a as sorted,
    semicolon-joined strings and increments duplicate_counter.

    Args:
      event_a: the EventObject that receives the merged values.
      event_b: the duplicate EventObject being folded into event_a.
    """
    self.duplicate_counter += 1
    # TODO: Currently we are using the first event pathspec, perhaps that
    # is not the best approach. There is no need to have all the pathspecs
    # inside the combined event, however which one should be chosen is
    # perhaps something that can be evaluated here (regular TSK in favor of
    # an event stored deep inside a VSS for instance).
    for attr in self.MERGE_ATTRIBUTES:
      val_a = set(utils.GetUnicodeString(
          getattr(event_a, attr, u'')).split(u';'))
      val_b = set(utils.GetUnicodeString(
          getattr(event_b, attr, u'')).split(u';'))
      values_list = list(val_a | val_b)
      values_list.sort()  # keeping this consistent across runs helps with diffs
      setattr(event_a, attr, u';'.join(values_list))

    # Special instance if this is a filestat entry we need to combine the
    # description field.
    if getattr(event_a, u'parser', u'') == u'filestat':
      description_a = set(getattr(event_a, u'timestamp_desc', u'').split(u';'))
      description_b = set(getattr(event_b, u'timestamp_desc', u'').split(u';'))
      descriptions = list(description_a | description_b)
      descriptions.sort()
      if event_b.timestamp_desc not in event_a.timestamp_desc:
        setattr(event_a, u'timestamp_desc', u';'.join(descriptions))

  def End(self):
    """Call the formatter to produce the closing line."""
    self.Flush()

    if self._output_module:
      self._output_module.WriteFooter()
      self._output_module.Close()

  def __exit__(self, unused_type, unused_value, unused_traceback):
    """Make usable with "with" statement."""
    self.End()

  def __enter__(self):
    """Make usable with "with" statement."""
    return self
Loading

0 comments on commit 494a628

Please sign in to comment.