Skip to content

Commit

Permalink
Code review: 263430043: Clean up of output modules - step 5 log2timel…
Browse files Browse the repository at this point in the history
  • Loading branch information
joachimmetz committed Dec 31, 2015
1 parent b1dc19e commit 07e927c
Show file tree
Hide file tree
Showing 6 changed files with 132 additions and 30 deletions.
2 changes: 1 addition & 1 deletion config/dpkg/changelog
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@ python-plaso (1.3.1-1) unstable; urgency=low

* Auto-generated

-- Log2Timeline <[email protected]> Tue, 29 Sep 2015 09:31:13 +0200
-- Log2Timeline <[email protected]> Tue, 29 Sep 2015 21:29:06 +0200
2 changes: 1 addition & 1 deletion plaso/cli/helpers/pstorage.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def ParseOptions(cls, options, output_module):

file_path = getattr(options, u'write', None)
if file_path:
output_module.SetFilehandle(file_path=file_path)
output_module.SetFilePath(file_path)


manager.ArgumentHelperManager.RegisterHelper(PstorageOutputHelper)
4 changes: 3 additions & 1 deletion plaso/output/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# -*- coding: utf-8 -*-
from plaso.output import dynamic
from plaso.output import elastic
# TODO: renable after refactor as indicated in:
# https://github.com/log2timeline/plaso/issues/197
# from plaso.output import elastic
from plaso.output import json_line
from plaso.output import json_out
from plaso.output import l2t_csv
Expand Down
13 changes: 3 additions & 10 deletions plaso/output/pstorage.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ def __init__(self, output_mediator):
output_mediator: The output mediator object (instance of OutputMediator).
"""
super(PlasoStorageOutputModule, self).__init__(output_mediator)

self._file_object = None
self._storage = None

Expand All @@ -45,19 +44,13 @@ def Open(self):

self._storage = storage.StorageFile(self._file_object, pre_obj=pre_obj)

def SetFilehandle(self, file_path=None, file_object=None):
"""Sets the filehandle.
def SetFilePath(self, file_path):
"""Sets the file-like object based on the file path.
Args:
file_path: the full path to the output file.
file_object: a file like object to use for a filehandle.
"""
if file_object:
self._file_object = file_object
return

if file_path:
self._file_object = open(file_path, 'wb')
self._file_object = open(file_path, 'wb')

def WriteEventBody(self, event_object):
"""Writes the body of an event object to the output.
Expand Down
135 changes: 121 additions & 14 deletions tests/output/mediator.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,133 @@

import unittest

from plaso.formatters import interface as formatters_interface
from plaso.formatters import manager as formatters_manager
from plaso.lib import event
from plaso.lib import eventdata
from plaso.lib import timelib
from plaso.output import mediator


class TestEvent(event.EventObject):
DATA_TYPE = 'test:mediator'

def __init__(self):
super(TestEvent, self).__init__()
self.timestamp = timelib.Timestamp.CopyFromString(u'2012-06-27 18:17:01')
self.timestamp_desc = eventdata.EventTimestamp.CHANGE_TIME
self.hostname = u'ubuntu'
self.filename = u'log/syslog.1'
self.text = (
u'Reporter <CRON> PID: 8442 (pam_unix(cron:session): session\n '
u'closed for user root)')
self.username = u'root'


class TestEventFormatter(formatters_interface.EventFormatter):
DATA_TYPE = 'test:mediator'
FORMAT_STRING = u'{text}'

SOURCE_SHORT = 'LOG'
SOURCE_LONG = 'Syslog'


class OutputMediatorTest(unittest.TestCase):
"""Tests for the output mediator object."""

def testInitialization(self):
"""Tests the initialization."""
output_mediator = mediator.OutputMediator(None, None)
self.assertNotEqual(output_mediator, None)

# TODO: add more tests:
# GetEventFormatter test.
# GetFormattedMessages test.
# GetFormattedSources test.
# GetFormatStringAttributeNames test.
# GetHostname test.
# GetMACBRepresentation test.
# GetStoredHostname test.
# GetUsername test.
def setUp(self):
"""Sets up the objects needed for this test."""
self._output_mediator = mediator.OutputMediator(None, None)

def testGetEventFormatter(self):
"""Tests the GetEventFormatter function."""
event_object = TestEvent()

formatters_manager.FormattersManager.RegisterFormatter(
TestEventFormatter)

event_formatter = self._output_mediator.GetEventFormatter(event_object)
self.assertIsInstance(event_formatter, TestEventFormatter)

formatters_manager.FormattersManager.DeregisterFormatter(
TestEventFormatter)

def testGetFormattedMessages(self):
"""Tests the GetFormattedMessages function."""
event_object = TestEvent()

formatters_manager.FormattersManager.RegisterFormatter(
TestEventFormatter)

expected_message = (
u'Reporter <CRON> PID: 8442'
u' (pam_unix(cron:session): session closed for user root)')

message, message_short = self._output_mediator.GetFormattedMessages(
event_object)
self.assertEqual(message, expected_message)
self.assertEqual(message_short, expected_message)

formatters_manager.FormattersManager.DeregisterFormatter(
TestEventFormatter)

def testGetFormattedSources(self):
"""Tests the GetFormattedSources function."""
event_object = TestEvent()

formatters_manager.FormattersManager.RegisterFormatter(
TestEventFormatter)

source_short, source = self._output_mediator.GetFormattedSources(
event_object)
self.assertEqual(source, u'Syslog')
self.assertEqual(source_short, u'LOG')

formatters_manager.FormattersManager.DeregisterFormatter(
TestEventFormatter)

def testGetFormatStringAttributeNames(self):
"""Tests the GetFormatStringAttributeNames function."""
event_object = TestEvent()

formatters_manager.FormattersManager.RegisterFormatter(
TestEventFormatter)

expected_attribute_names = set([u'text'])

attribute_names = self._output_mediator.GetFormatStringAttributeNames(
event_object)
self.assertEqual(attribute_names, expected_attribute_names)

formatters_manager.FormattersManager.DeregisterFormatter(
TestEventFormatter)

def testGetHostname(self):
"""Tests the GetHostname function."""
event_object = TestEvent()

hostname = self._output_mediator.GetHostname(event_object)
self.assertEqual(hostname, u'ubuntu')

def testGetMACBRepresentation(self):
"""Tests the GetMACBRepresentation function."""
event_object = TestEvent()

macb_representation = self._output_mediator.GetMACBRepresentation(
event_object)
self.assertEqual(macb_representation, u'..C.')

def testGetStoredHostname(self):
"""Tests the GetStoredHostname function."""
stored_hostname = self._output_mediator.GetStoredHostname()
self.assertIsNone(stored_hostname)

def testGetUsername(self):
"""Tests the GetUsername function."""
event_object = TestEvent()

username = self._output_mediator.GetUsername(event_object)
self.assertEqual(username, u'root')


if __name__ == '__main__':
Expand Down
6 changes: 3 additions & 3 deletions tests/output/pstorage.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ def testOutput(self):
# Copy events to pstorage dump.
with storage.StorageFile(self.test_filename, read_only=True) as store:
output_mediator = self._CreateOutputMediator(storage_object=store)
formatter = pstorage.PlasoStorageOutputModule(output_mediator)
formatter.SetFilehandle(storage_file)
output_module = pstorage.PlasoStorageOutputModule(output_mediator)
output_module.SetFilePath(storage_file)

with interface.EventBuffer(
formatter, check_dedups=False) as output_buffer:
output_module, check_dedups=False) as output_buffer:
event_object = store.GetSortedEntry()
while event_object:
output_buffer.Append(event_object)
Expand Down

0 comments on commit 07e927c

Please sign in to comment.