From 4ec808bb4683e17bc915fe062fe6562d01a45036 Mon Sep 17 00:00:00 2001 From: Joachim Metz Date: Thu, 1 Oct 2015 21:45:48 +0200 Subject: [PATCH] Code review: 268880043: Separated table view from CLI tool #279. --- config/dpkg/changelog | 2 +- docs/plaso.analysis.rst | 8 +++ docs/plaso.cli.rst | 8 +++ plaso/__init__.py | 2 +- plaso/analysis/chrome_extension.py | 6 +- plaso/analysis/definitions.py | 16 +++++ plaso/analysis/interface.py | 10 +--- plaso/analysis/manager.py | 32 +++++++--- plaso/analysis/windows_services.py | 3 +- plaso/cli/tools.py | 72 ++++------------------ plaso/cli/views.py | 94 +++++++++++++++++++++++++++++ tests/cli/tools.py | 81 ------------------------- tests/cli/views.py | 95 ++++++++++++++++++++++++++++++ tools/log2timeline.py | 33 ++++++----- tools/preg.py | 74 +++++++++++++++-------- tools/preg_test.py | 65 ++++++++++++++++++-- tools/psort.py | 60 +++++++++---------- 17 files changed, 419 insertions(+), 242 deletions(-) create mode 100644 plaso/analysis/definitions.py create mode 100644 plaso/cli/views.py create mode 100644 tests/cli/views.py diff --git a/config/dpkg/changelog b/config/dpkg/changelog index d80f08dbb7..418e6c65ba 100644 --- a/config/dpkg/changelog +++ b/config/dpkg/changelog @@ -2,4 +2,4 @@ python-plaso (1.3.1-1) unstable; urgency=low * Auto-generated - -- Log2Timeline Tue, 29 Sep 2015 22:18:57 +0200 + -- Log2Timeline Thu, 01 Oct 2015 21:45:47 +0200 diff --git a/docs/plaso.analysis.rst b/docs/plaso.analysis.rst index cea7d4fcf0..ea03ae5b26 100644 --- a/docs/plaso.analysis.rst +++ b/docs/plaso.analysis.rst @@ -20,6 +20,14 @@ plaso.analysis.chrome_extension module :undoc-members: :show-inheritance: +plaso.analysis.definitions module +--------------------------------- + +.. automodule:: plaso.analysis.definitions + :members: + :undoc-members: + :show-inheritance: + plaso.analysis.file_hashes module --------------------------------- diff --git a/docs/plaso.cli.rst b/docs/plaso.cli.rst index 5cec4aab5a..2f3f6374d4 100644 --- a/docs/plaso.cli.rst +++ b/docs/plaso.cli.rst @@ -51,6 +51,14 @@ plaso.cli.tools module :undoc-members: :show-inheritance: +plaso.cli.views module +---------------------- + +.. automodule:: plaso.cli.views + :members: + :undoc-members: + :show-inheritance: + Module contents --------------- diff --git a/plaso/__init__.py b/plaso/__init__.py index f0872c7bc4..183ac49e0a 100644 --- a/plaso/__init__.py +++ b/plaso/__init__.py @@ -3,7 +3,7 @@ __version__ = '1.3.1' VERSION_DEV = True -VERSION_DATE = '20150929' +VERSION_DATE = '20151001' def GetVersion(): diff --git a/plaso/analysis/chrome_extension.py b/plaso/analysis/chrome_extension.py index 509cfdc370..dde20ab265 100644 --- a/plaso/analysis/chrome_extension.py +++ b/plaso/analysis/chrome_extension.py @@ -30,17 +30,15 @@ def __init__(self, incoming_queue): """ super(ChromeExtensionPlugin, self).__init__(incoming_queue) + # Saved list of already looked up extensions. + self._extensions = {} self._results = {} - self.plugin_type = self.TYPE_REPORT # TODO: see if these can be moved to arguments passed to ExamineEvent # or some kind of state object. self._sep = None self._user_paths = None - # Saved list of already looked up extensions. - self._extensions = {} - def _GetChromeWebStorePage(self, extension_id): """Retrieves the page for the extension from the Chrome store website. diff --git a/plaso/analysis/definitions.py b/plaso/analysis/definitions.py new file mode 100644 index 0000000000..d6af1ae8ce --- /dev/null +++ b/plaso/analysis/definitions.py @@ -0,0 +1,16 @@ +# -*- coding: utf-8 -*- +"""This file contains the definitions for analysis plugins.""" + +# All the possible analysis plugin types. + +# Plugin that detects anomalies. +PLUGIN_TYPE_ANOMALY = 1 + +# Plugin that calculating statistical information. +PLUGIN_TYPE_STATISTICS = 2 + +# Plugin that annotates (or tags) event objects. +PLUGIN_TYPE_ANNOTATION = 3 + +# Pluging that provides summary information. +PLUGIN_TYPE_REPORT = 4 diff --git a/plaso/analysis/interface.py b/plaso/analysis/interface.py index 3b1e8efcd6..2bb45ee952 100644 --- a/plaso/analysis/interface.py +++ b/plaso/analysis/interface.py @@ -19,6 +19,7 @@ from collections import defaultdict +from plaso.analysis import definitions from plaso.engine import queue from plaso.lib import timelib from plaso.lib import errors @@ -47,13 +48,6 @@ class AnalysisPlugin(queue.ItemQueueConsumer): # should be able to run during the extraction phase. ENABLE_IN_EXTRACTION = False - # All the possible report types. - TYPE_ANOMALY = 1 # Plugin that is inspecting events for anomalies. - TYPE_STATISTICS = 2 # Statistical calculations. - TYPE_ANNOTATION = 3 # Inspecting events with the primary purpose of - # annotating or tagging them. - TYPE_REPORT = 4 # Inspecting events to provide a summary information. - # A flag to indicate that this plugin takes a long time to compile a report. LONG_RUNNING_PLUGIN = False @@ -64,7 +58,7 @@ def __init__(self, incoming_queue): incoming_queue: A queue that is used to listen to incoming events. """ super(AnalysisPlugin, self).__init__(incoming_queue) - self.plugin_type = self.TYPE_REPORT + self.plugin_type = definitions.PLUGIN_TYPE_REPORT def _ConsumeItem(self, item, analysis_mediator=None, **kwargs): """Consumes an item callback for ConsumeItems. diff --git a/plaso/analysis/manager.py b/plaso/analysis/manager.py index 8fae0e16f2..d8aaed8c76 100644 --- a/plaso/analysis/manager.py +++ b/plaso/analysis/manager.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- """This file contains the analysis plugin manager class.""" +from plaso.analysis import definitions from plaso.lib import errors @@ -9,6 +10,18 @@ class AnalysisPluginManager(object): _plugin_classes = {} + _PLUGIN_TYPE_STRINGS = { + definitions.PLUGIN_TYPE_ANNOTATION: ( + u'Annotation/Tagging plugin'), + definitions.PLUGIN_TYPE_ANOMALY: ( + u'Anomaly plugin'), + definitions.PLUGIN_TYPE_REPORT: ( + u'Summary/Report plugin'), + definitions.PLUGIN_TYPE_STATISTICS: ( + u'Statistics plugin') + } + _PLUGIN_TYPE_STRINGS.setdefault(u'Unknown type') + @classmethod def DeregisterPlugin(cls, plugin_class): """Deregisters an analysis plugin class. @@ -48,17 +61,20 @@ def GetAllPluginInformation(cls, show_all=True): plugin names. The default is True. Returns: - A sorted list of tuples containing the name, docstring and type of each - analysis plugin. + A sorted list of tuples containing the name, docstring and type string + of each analysis plugin. """ results = [] - for cls_obj in cls._plugin_classes.itervalues(): - # TODO: Use a specific description variable, not the docstring. - doc_string, _, _ = cls_obj.__doc__.partition(u'\n') + for plugin_class in iter(cls._plugin_classes.values()): + plugin_object = plugin_class(None) + if not show_all and not plugin_class.ENABLE_IN_EXTRACTION: + continue - obj = cls_obj(None) - if show_all or cls_obj.ENABLE_IN_EXTRACTION: - results.append((obj.plugin_name, doc_string, obj.plugin_type)) + # TODO: Use a specific description variable, not the docstring. + doc_string, _, _ = plugin_class.__doc__.partition(u'\n') + type_string = cls._PLUGIN_TYPE_STRINGS.get(plugin_object.plugin_type) + information_tuple = (plugin_object.plugin_name, doc_string, type_string) + results.append(information_tuple) return sorted(results) diff --git a/plaso/analysis/windows_services.py b/plaso/analysis/windows_services.py index 05d9e70069..2028d8d76a 100644 --- a/plaso/analysis/windows_services.py +++ b/plaso/analysis/windows_services.py @@ -180,9 +180,8 @@ def __init__(self, incoming_queue): incoming_queue: A queue to read events from. """ super(WindowsServicesPlugin, self).__init__(incoming_queue) - self._service_collection = WindowsServiceCollection() - self.plugin_type = interface.AnalysisPlugin.TYPE_REPORT self._output_format = u'text' + self._service_collection = WindowsServiceCollection() def ExamineEvent(self, analysis_mediator, event_object, **kwargs): """Analyzes an event_object and creates Windows Services as required. diff --git a/plaso/cli/tools.py b/plaso/cli/tools.py index 729862d313..e68db66d7d 100644 --- a/plaso/cli/tools.py +++ b/plaso/cli/tools.py @@ -10,6 +10,7 @@ import plaso from plaso.lib import errors +from plaso.cli import views import pytz @@ -36,10 +37,10 @@ def __init__(self, input_reader=None, output_writer=None): """Initializes the CLI tool object. Args: - input_reader: the input reader (instance of InputReader). + input_reader: optional input reader (instance of InputReader). The default is None which indicates the use of the stdin input reader. - output_writer: the output writer (instance of OutputWriter). + output_writer: optional output writer (instance of OutputWriter). The default is None which indicates the use of the stdout output writer. """ @@ -242,7 +243,6 @@ def AddTimezoneOption(self, argument_group): def ListTimeZones(self): """Lists the timezones.""" - self.PrintHeader(u'Zones') max_length = 0 for timezone_name in pytz.all_timezones: if len(timezone_name) > max_length: @@ -250,7 +250,11 @@ def ListTimeZones(self): utc_date_time = datetime.datetime.utcnow() - self.PrintColumnValue(u'Timezone', u'UTC Offset', column_width=max_length) + table_view = views.CLITableView( + self._output_writer, column_width=max_length) + + table_view.PrintHeader(u'Zones') + table_view.PrintRow(u'Timezone', u'UTC Offset') for timezone_name in pytz.all_timezones: local_timezone = pytz.timezone(timezone_name) @@ -263,9 +267,9 @@ def ListTimeZones(self): _, _, diff = local_date_string.rpartition(u'-') diff_string = u'-{0:s}'.format(diff) - self.PrintColumnValue(timezone_name, diff_string, column_width=max_length) + table_view.PrintRow(timezone_name, diff_string) - self.PrintSeparatorLine() + table_view.PrintFooter() def ParseOptions(self, options): """Parses tool specific options. @@ -275,62 +279,6 @@ def ParseOptions(self, options): """ self._ParseInformationalOptions(options) - def PrintColumnValue(self, name, description, column_width=25): - """Prints a value with a name and description aligned to the column width. - - Args: - name: the name. - description: the description. - column_width: optional column width. The default is 25. - """ - line_length = self._LINE_LENGTH - column_width - 3 - - # The format string of the first line of the column value. - primary_format_string = u'{{0:>{0:d}s}} : {{1:s}}\n'.format(column_width) - - # The format string of successive lines of the column value. - secondary_format_string = u'{{0:<{0:d}s}}{{1:s}}\n'.format( - column_width + 3) - - if len(description) < line_length: - self._output_writer.Write(primary_format_string.format(name, description)) - return - - # Split the description in words. - words = description.split() - - current = 0 - - lines = [] - word_buffer = [] - for word in words: - current += len(word) + 1 - if current >= line_length: - current = len(word) - lines.append(u' '.join(word_buffer)) - word_buffer = [word] - else: - word_buffer.append(word) - lines.append(u' '.join(word_buffer)) - - # Print the column value on multiple lines. - self._output_writer.Write(primary_format_string.format(name, lines[0])) - for line in lines[1:]: - self._output_writer.Write(secondary_format_string.format(u'', line)) - - def PrintHeader(self, text, character=u'*'): - """Prints the header as a line with centered text. - - Args: - text: The header text. - character: Optional header line character. The default is '*'. - """ - self._output_writer.Write(u'\n') - - format_string = u'{{0:{0:s}^{1:d}}}\n'.format(character, self._LINE_LENGTH) - header_string = format_string.format(u' {0:s} '.format(text)) - self._output_writer.Write(header_string) - def PrintSeparatorLine(self): """Prints a separator line.""" self._output_writer.Write(u'-' * self._LINE_LENGTH) diff --git a/plaso/cli/views.py b/plaso/cli/views.py new file mode 100644 index 0000000000..12c0e8fa7e --- /dev/null +++ b/plaso/cli/views.py @@ -0,0 +1,94 @@ +# -*- coding: utf-8 -*- +"""The CLI view classes.""" + + +class CLITableView(object): + """Class that implements a 2 column command line table view.""" + + # The maximum width of the table in number of characters. + # The standard width of Windows cmd.exe is 80 characters. + _MAXIMUM_WIDTH = 80 + + _HEADER_FORMAT_STRING = u'{{0:*^{0:d}}}\n'.format(_MAXIMUM_WIDTH) + + def __init__(self, output_writer, column_width=25): + """Initializes the command line table view object. + + Args: + output_writer: the output writer (instance of OutputWriter). + The default is None which indicates the use of the stdout + output writer. + column_width: optional column width, which cannot be smaller than 0 or + larger than the maximum width. + + Raises: + ValueError: if the column width is out of bounds. + """ + if column_width < 0 or column_width > self._MAXIMUM_WIDTH: + raise ValueError(u'Column width value out of bounds.') + + super(CLITableView, self).__init__() + self._column_width = column_width + self._output_writer = output_writer + + def PrintFooter(self): + """Prints the footer.""" + self._output_writer.Write(u'-' * self._MAXIMUM_WIDTH) + self._output_writer.Write(u'\n') + + def PrintHeader(self, text): + """Prints the header as a line with centered text. + + Args: + text: The header text. + """ + self._output_writer.Write(u'\n') + + text = u' {0:s} '.format(text) + header_string = self._HEADER_FORMAT_STRING.format(text) + self._output_writer.Write(header_string) + + def PrintRow(self, first_column, second_column): + """Prints a row of 2 column values aligned to the column width. + + Args: + first_column: the first column value. + second_column: the second column value. + """ + maximum_row_width = self._MAXIMUM_WIDTH - self._column_width - 3 + + # The format string of the first line of the column value. + primary_format_string = u'{{0:>{0:d}s}} : {{1:s}}\n'.format( + self._column_width) + + # The format string of successive lines of the column value. + secondary_format_string = u'{{0:<{0:d}s}}{{1:s}}\n'.format( + self._column_width + 3) + + if len(second_column) < maximum_row_width: + self._output_writer.Write(primary_format_string.format( + first_column, second_column)) + return + + # Split the column value in words. + words = second_column.split() + + current = 0 + + lines = [] + word_buffer = [] + for word in words: + current += len(word) + 1 + if current >= maximum_row_width: + current = len(word) + lines.append(u' '.join(word_buffer)) + word_buffer = [word] + else: + word_buffer.append(word) + lines.append(u' '.join(word_buffer)) + + # Print the column value on multiple lines. + self._output_writer.Write(primary_format_string.format( + first_column, lines[0])) + for line in lines[1:]: + self._output_writer.Write(secondary_format_string.format(u'', line)) diff --git a/tests/cli/tools.py b/tests/cli/tools.py index 7e463baeda..5869439457 100644 --- a/tests/cli/tools.py +++ b/tests/cli/tools.py @@ -110,87 +110,6 @@ def testAddTimezoneOption(self): output = argument_parser.format_help() self.assertEqual(output, self._EXPECTED_TIMEZONE_OPTION) - def testPrintColumnValue(self): - """Tests the PrintColumnValue function.""" - output_writer = test_lib.TestOutputWriter() - cli_tool = tools.CLITool(output_writer=output_writer) - - cli_tool.PrintColumnValue(u'Name', u'Description') - string = output_writer.ReadOutput() - expected_string = b' Name : Description\n' - self.assertEqual(string, expected_string) - - cli_tool.PrintColumnValue(u'Name', u'Description', column_width=10) - string = output_writer.ReadOutput() - expected_string = b' Name : Description\n' - self.assertEqual(string, expected_string) - - with self.assertRaises(ValueError): - cli_tool.PrintColumnValue(u'Name', u'Description', column_width=-10) - - # TODO: determine if this is the desired behavior. - cli_tool.PrintColumnValue(u'Name', u'Description', column_width=100) - string = output_writer.ReadOutput() - expected_string = ( - b' ' - b' Name : \n' - b' ' - b' Description\n') - self.assertEqual(string, expected_string) - - def testPrintHeader(self): - """Tests the PrintHeader function.""" - output_writer = test_lib.TestOutputWriter() - cli_tool = tools.CLITool(output_writer=output_writer) - - cli_tool.PrintHeader(u'Text') - string = output_writer.ReadOutput() - expected_string = ( - b'\n' - b'************************************* ' - b'Text ' - b'*************************************\n') - self.assertEqual(string, expected_string) - - cli_tool.PrintHeader(u'Another Text', character=u'x') - string = output_writer.ReadOutput() - expected_string = ( - b'\n' - b'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx ' - b'Another Text ' - b'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\n') - self.assertEqual(string, expected_string) - - # TODO: determine if this is the desired behavior. - cli_tool.PrintHeader(u'') - string = output_writer.ReadOutput() - expected_string = ( - b'\n' - b'*************************************** ' - b' ' - b'***************************************\n') - self.assertEqual(string, expected_string) - - # TODO: determine if this is the desired behavior. - cli_tool.PrintHeader(None) - string = output_writer.ReadOutput() - expected_string = ( - b'\n' - b'************************************* ' - b'None ' - b'*************************************\n') - self.assertEqual(string, expected_string) - - # TODO: determine if this is the desired behavior. - expected_string = ( - u'\n ' - u'In computer programming, a string is traditionally a sequence ' - u'of characters, either as a literal constant or as some kind of ' - u'variable. \n') - cli_tool.PrintHeader(expected_string[2:-2]) - string = output_writer.ReadOutput() - self.assertEqual(string, expected_string) - def testPrintSeparatorLine(self): """Tests the PrintSeparatorLine function.""" output_writer = test_lib.TestOutputWriter() diff --git a/tests/cli/views.py b/tests/cli/views.py new file mode 100644 index 0000000000..98fc4ec01a --- /dev/null +++ b/tests/cli/views.py @@ -0,0 +1,95 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +"""Tests for the CLI view classes.""" + +import unittest + +from plaso.cli import views + +from tests.cli import test_lib + + +class CLITableViewTests(unittest.TestCase): + """Tests for the 2 column command line table view class.""" + + def testPrintFooter(self): + """Tests the PrintFooter function.""" + output_writer = test_lib.TestOutputWriter() + table_view = views.CLITableView(output_writer) + + table_view.PrintFooter() + string = output_writer.ReadOutput() + expected_string = ( + b'----------------------------------------' + b'----------------------------------------\n') + self.assertEqual(string, expected_string) + + def testPrintHeader(self): + """Tests the PrintHeader function.""" + output_writer = test_lib.TestOutputWriter() + table_view = views.CLITableView(output_writer) + + table_view.PrintHeader(u'Text') + string = output_writer.ReadOutput() + expected_string = ( + b'\n' + b'************************************* ' + b'Text ' + b'*************************************\n') + self.assertEqual(string, expected_string) + + # TODO: determine if this is the desired behavior. + table_view.PrintHeader(u'') + string = output_writer.ReadOutput() + expected_string = ( + b'\n' + b'*************************************** ' + b' ' + b'***************************************\n') + self.assertEqual(string, expected_string) + + # TODO: determine if this is the desired behavior. + table_view.PrintHeader(None) + string = output_writer.ReadOutput() + expected_string = ( + b'\n' + b'************************************* ' + b'None ' + b'*************************************\n') + self.assertEqual(string, expected_string) + + # TODO: determine if this is the desired behavior. + expected_string = ( + u'\n ' + u'In computer programming, a string is traditionally a sequence ' + u'of characters, either as a literal constant or as some kind of ' + u'variable. \n') + table_view.PrintHeader(expected_string[2:-2]) + string = output_writer.ReadOutput() + self.assertEqual(string, expected_string) + + def testPrintRow(self): + """Tests the PrintRow function.""" + output_writer = test_lib.TestOutputWriter() + table_view = views.CLITableView(output_writer) + + table_view.PrintRow(u'Name', u'Description') + string = output_writer.ReadOutput() + expected_string = b' Name : Description\n' + self.assertEqual(string, expected_string) + + table_view = views.CLITableView(output_writer, column_width=10) + table_view.PrintRow(u'Name', u'Description') + string = output_writer.ReadOutput() + expected_string = b' Name : Description\n' + self.assertEqual(string, expected_string) + + with self.assertRaises(ValueError): + table_view = views.CLITableView(output_writer, column_width=-10) + + with self.assertRaises(ValueError): + table_view = views.CLITableView(output_writer, column_width=100) + + +if __name__ == '__main__': + unittest.main() diff --git a/tools/log2timeline.py b/tools/log2timeline.py index 97f1b75dd8..3d21db5b5b 100755 --- a/tools/log2timeline.py +++ b/tools/log2timeline.py @@ -20,6 +20,7 @@ import plaso from plaso.cli import extraction_tool from plaso.cli import tools as cli_tools +from plaso.cli import views as cli_views from plaso.frontend import log2timeline from plaso.lib import definitions from plaso.lib import errors @@ -347,31 +348,34 @@ def AddProcessingOptions(self, argument_group): def ListHashers(self): """Lists information about the available hashers.""" + table_view = cli_views.CLITableView(self._output_writer) + table_view.PrintHeader(u'Hashers') hashers_information = self._front_end.GetHashersInformation() - self.PrintHeader(u'Hashers') for name, description in sorted(hashers_information): - self.PrintColumnValue(name, description) + table_view.PrintRow(name, description) - self._output_writer.Write(u'\n') + table_view.PrintFooter() def ListParsersAndPlugins(self): """Lists information about the available parsers and plugins.""" + table_view = cli_views.CLITableView(self._output_writer) + + table_view.PrintHeader(u'Parsers') parsers_information = self._front_end.GetParsersInformation() - self.PrintHeader(u'Parsers') for name, description in sorted(parsers_information): - self.PrintColumnValue(name, description) + table_view.PrintRow(name, description) + table_view.PrintHeader(u'Parser Plugins') plugins_information = self._front_end.GetParserPluginsInformation() - self.PrintHeader(u'Parser Plugins') for name, description in sorted(plugins_information): - self.PrintColumnValue(name, description) + table_view.PrintRow(name, description) presets_information = self._front_end.GetParserPresetsInformation() - self.PrintHeader(u'Parsers Presets') + table_view.PrintHeader(u'Parsers Presets') for name, description in sorted(presets_information): - self.PrintColumnValue(name, description) + table_view.PrintRow(name, description) - self._output_writer.Write(u'\n') + table_view.PrintFooter() def ParseArguments(self): """Parses the command line arguments. @@ -638,16 +642,17 @@ def ProcessSources(self): def ShowInfo(self): """Shows information about available hashers, parsers, plugins, etc.""" - plugin_list = self._front_end.GetPluginData() self._output_writer.Write( u'{0:=^80s}\n'.format(u' log2timeline/plaso information ')) + table_view = cli_views.CLITableView(self._output_writer) + plugin_list = self._front_end.GetPluginData() for header, data in plugin_list.items(): - self.PrintHeader(header) + table_view.PrintHeader(header) for entry_header, entry_data in sorted(data): - self.PrintColumnValue(entry_header, entry_data) + table_view.PrintRow(entry_header, entry_data) - self._output_writer.Write(u'\n') + table_view.PrintFooter() def Main(): diff --git a/tools/preg.py b/tools/preg.py index f9d64a69b6..ad3460ce35 100755 --- a/tools/preg.py +++ b/tools/preg.py @@ -41,6 +41,7 @@ from plaso.cli import hexdump from plaso.cli import storage_media_tool from plaso.cli import tools as cli_tools +from plaso.cli import views as cli_views from plaso.engine import knowledge_base from plaso.frontend import preg from plaso.lib import errors @@ -362,7 +363,7 @@ def _PrintEventBody(self, event_object, file_entry=None, show_hex=False): event_object.pathspec = file_entry.path_spec hexadecimal_output = self._GetEventDataHexDump(event_object) - self.PrintHeader(u'Hexadecimal output from event.', u'-') + self.PrintHeader(u'Hexadecimal output from event.', character=u'-') self._output_writer.Write(hexadecimal_output) self._output_writer.Write(u'\n') @@ -396,7 +397,7 @@ def _PrintEventHeader(self, event_object, descriptions, exclude_timestamp): u'Description', event_object.timestamp_desc)) self._output_writer.Write(u'\n') - self.PrintHeader(u'Data', u'+') + self.PrintHeader(u'Data', character=u'+') def _PrintEventObjectsBasedOnTime( self, event_objects, file_entry, show_hex=False): @@ -451,7 +452,7 @@ def _PrintParsedRegistryFile(self, parsed_data, registry_helper): parsed_data: dict object returned from ParseRegisterFile. registry_helper: Registry file object (instance of PregRegistryHelper). """ - self.PrintHeader(u'Registry File', u'x') + self.PrintHeader(u'Registry File', character=u'x') self._output_writer.Write(u'\n') self._output_writer.Write( u'{0:>15} : {1:s}\n'.format(u'Registry file', registry_helper.path)) @@ -533,6 +534,19 @@ def _ScanFileSystem(self, path_resolver): return result + def PrintHeader(self, text, character=u'*'): + """Prints the header as a line with centered text. + + Args: + text: The header text. + character: Optional header line character. The default is '*'. + """ + self._output_writer.Write(u'\n') + + format_string = u'{{0:{0:s}^{1:d}}}\n'.format(character, self._LINE_LENGTH) + header_string = format_string.format(u' {0:s} '.format(text)) + self._output_writer.Write(header_string) + def PrintParsedRegistryKey(self, key_data, file_entry=None, show_hex=False): """Write extracted data returned from ParseRegistryKey to an output writer. @@ -544,8 +558,9 @@ def PrintParsedRegistryKey(self, key_data, file_entry=None, show_hex=False): of the event should be included in the output. The default is False. """ - self.PrintHeader(u'Plugins', u'-') + self.PrintHeader(u'Plugins', character=u'-') for plugin, event_objects in iter(key_data.items()): + # TODO: make this a table view. self.PrintHeader(u'Plugin: {0:s}'.format(plugin.plugin_name)) self._output_writer.Write(u'[{0:s}] {1:s}\n'.format( plugin.REG_TYPE, plugin.DESCRIPTION)) @@ -609,13 +624,12 @@ def GetWindowsVolumeIdentifiers(self, scan_node, volume_identifiers): def ListPluginInformation(self): """Lists Registry plugin information.""" + table_view = cli_views.CLITableView(self._output_writer) + table_view.PrintHeader(u'Supported Plugins') plugin_list = self._front_end.registry_plugin_list - - self.PrintHeader(u'Supported Plugins', u'=') for plugin_class in plugin_list.GetAllPlugins(): - self.PrintColumnValue(plugin_class.NAME, plugin_class.DESCRIPTION) - - self._output_writer.Write(u'\n') + table_view.PrintRow(plugin_class.NAME, plugin_class.DESCRIPTION) + table_view.PrintFooter() def ParseArguments(self): """Parses the command line arguments. @@ -1131,14 +1145,16 @@ def ParseCurrentKey(self, line): # Print out a hexadecimal representation of all binary values. if verbose: + table_view = cli_views.CLITableView(self.output_writer) header_shown = False for value in current_helper.GetCurrentRegistryKey().GetValues(): if value.DataIsBinaryData(): if not header_shown: + table_view.PrintHeader(u'Hexadecimal representation') header_shown = True - self.console.preg_tool.PrintHeader(u'Hexadecimal representation') + self.console.preg_tool.PrintSeparatorLine() - self.console.preg_tool.PrintColumnValue(u'Attribute', value.name) + table_view.PrintRow(u'Attribute', value.name) self.console.preg_tool.PrintSeparatorLine() value_string = hexdump.Hexdump.FormatData(value.data) @@ -1147,6 +1163,25 @@ def ParseCurrentKey(self, line): self.output_writer.Write(u'+-'*40) self.output_writer.Write(u'\n') + def _PrintPluginHelp(self, plugin_object): + """Prints the help information of a plugin. + + Args: + plugin_object: a Windows Registry plugin object (instance of + WindowsRegistryPlugin). + """ + table_view = cli_views.CLITableView(self.output_writer) + table_view.PrintHeader(plugin_object.NAME) + + # TODO: replace __doc__ by DESCRIPTION. + description = plugin_object.__doc__ + table_view.PrintRow(u'Description', description) + self.output_writer.Write(u'\n') + + for registry_key in plugin_object.expanded_keys: + table_view.PrintRow(u'Registry Key', registry_key) + table_view.PrintFooter() + @magic.line_magic(u'plugin') def ParseWithPlugin(self, line): """Parse a Registry key using a specific plugin.""" @@ -1189,13 +1224,7 @@ def ParseWithPlugin(self, line): return if u'-h' in line: - self.console.preg_tool.PrintHeader(plugin_name) - # TODO: replace __doc__ by DESCRIPTION. - self.console.preg_tool.PrintColumnValue( - u'Description', plugin_object.__doc__) - self.output_writer.Write(u'\n') - for registry_key in plugin_object.expanded_keys: - self.console.preg_tool.PrintColumnValue(u'Registry Key', registry_key) + self._PrintPluginHelp(plugin_object) return if not plugin_object.expanded_keys: @@ -1392,7 +1421,8 @@ def IsLoaded(self): def PrintBanner(self): """Writes a banner to the output writer.""" - self.preg_tool.PrintHeader( + table_view = cli_views.CLITableView(self._output_writer, column_width=23) + table_view.PrintHeader( u'Welcome to PREG - home of the Plaso Windows Registry Parsing.') self._output_writer.Write(u'\n') @@ -1401,10 +1431,8 @@ def PrintBanner(self): self._output_writer.Write(u'\n') for function_name, description in self._BASE_FUNCTIONS: - self.preg_tool.PrintColumnValue( - function_name, description, column_width=23) - - self._output_writer.Write(u'\n') + table_view.PrintRow(function_name, description) + table_view.PrintFooter() if len(self._registry_helpers) == 1: self.LoadRegistryFile(0) diff --git a/tools/preg_test.py b/tools/preg_test.py index a8ab2ec7a0..6f7fc776a3 100644 --- a/tools/preg_test.py +++ b/tools/preg_test.py @@ -77,11 +77,61 @@ def testListPluginInformation(self): output = self._output_writer.ReadOutput() # TODO: refactor to more accurate way to test this. - self.assertIn(b'= Supported Plugins =', output) + self.assertIn(b'* Supported Plugins *', output) self.assertIn(b'userassist : Parser for User Assist Registry data', output) self.assertIn( b'services : Parser for services and drivers Registry', output) + def testPrintHeader(self): + """Tests the PrintHeader function.""" + self._test_tool.PrintHeader(u'Text') + string = self._output_writer.ReadOutput() + expected_string = ( + b'\n' + b'************************************* ' + b'Text ' + b'*************************************\n') + self.assertEqual(string, expected_string) + + self._test_tool.PrintHeader(u'Another Text', character=u'x') + string = self._output_writer.ReadOutput() + expected_string = ( + b'\n' + b'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx ' + b'Another Text ' + b'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\n') + self.assertEqual(string, expected_string) + + # TODO: determine if this is the desired behavior. + self._test_tool.PrintHeader(u'') + string = self._output_writer.ReadOutput() + expected_string = ( + b'\n' + b'*************************************** ' + b' ' + b'***************************************\n') + self.assertEqual(string, expected_string) + + # TODO: determine if this is the desired behavior. + self._test_tool.PrintHeader(None) + string = self._output_writer.ReadOutput() + expected_string = ( + b'\n' + b'************************************* ' + b'None ' + b'*************************************\n') + self.assertEqual(string, expected_string) + + # TODO: determine if this is the desired behavior. + expected_string = ( + u'\n ' + u'In computer programming, a string is traditionally a sequence ' + u'of characters, either as a literal constant or as some kind of ' + u'variable. \n') + self._test_tool.PrintHeader(expected_string[2:-2]) + string = self._output_writer.ReadOutput() + self.assertEqual(string, expected_string) + def testRunModeRegistryPlugin(self): """Tests the RunModeRegistryPlugin function.""" options = cli_test_lib.TestOptions() @@ -166,7 +216,7 @@ def testRunModeRegistryFile(self): class PregConsoleTest(test_lib.ToolTestCase): """Tests for the preg console.""" - _EXPECTED_BANNER_HEADER = ( + _EXPECTED_BANNER_HEADER = [ (u'******** Welcome to PREG - home of the Plaso Windows Registry ' u'Parsing. *********'), u'', @@ -190,7 +240,9 @@ class PregConsoleTest(test_lib.ToolTestCase): (u'get_value_data value_name : Get a value data from a value stored in ' u'the'), u' currently loaded Registry key.', - u' get_key : Return the currently loaded Registry key.') + u' get_key : Return the currently loaded Registry key.', + (u'---------------------------------------------------------------------' + u'-----------')] _EXPECTED_BANNER_FOOTER = u'Happy command line console fu-ing.' @@ -229,10 +281,11 @@ def testPrintBanner(self): self._test_console.PrintBanner() extra_text = ( - u'Opening hive: {0:s} [OS]\nRegistry file: NTUSER.DAT [{0:s}] is ' - u'available and loaded.\n').format(self._file_path) + u'Opening hive: {0:s} [OS]\n' + u'Registry file: NTUSER.DAT [{0:s}] is available and ' + u'loaded.\n').format(self._file_path) - expected_banner = u'\n{0:s}\n\n{1:s}\n{2:s}'.format( + expected_banner = u'\n{0:s}\n{1:s}\n{2:s}'.format( u'\n'.join(self._EXPECTED_BANNER_HEADER), extra_text, self._EXPECTED_BANNER_FOOTER) banner = output_writer.ReadOutput() diff --git a/tools/psort.py b/tools/psort.py index 426be823b5..267479fb42 100755 --- a/tools/psort.py +++ b/tools/psort.py @@ -17,10 +17,9 @@ # The following import makes sure the filters are registered. from plaso import filters # pylint: disable=unused-import -# TODO: remove after psort options refactor. -from plaso.analysis import interface as analysis_interface from plaso.cli import analysis_tool from plaso.cli import tools as cli_tools +from plaso.cli import views as cli_views from plaso.cli.helpers import manager as helpers_manager from plaso.filters import manager as filters_manager from plaso.frontend import psort @@ -249,9 +248,11 @@ def _ProcessStorage(self): time_slice=time_slice, use_time_slicer=self._use_time_slicer) if not self._quiet_mode: - self.PrintHeader(u'Counter') + table_view = cli_views.CLITableView(self._output_writer) + table_view.PrintHeader(u'Counter') for element, count in counter.most_common(): - self.PrintColumnValue(element, u'{0:d}'.format(count)) + table_view.PrintRow(element, u'{0:d}'.format(count)) + table_view.PrintFooter() def _PromptUserForInput(self, input_text): """Prompts user for an input and return back read data. @@ -365,54 +366,49 @@ def AddOutputModuleOptions(self, argument_group, module_names): def ListAnalysisPlugins(self): """Lists the analysis modules.""" - self.PrintHeader(u'Analysis Plugins') - format_length = 10 analysis_plugin_info = self._front_end.GetAnalysisPluginInfo() + column_width = 10 for name, _, _ in analysis_plugin_info: - if len(name) > format_length: - format_length = len(name) - - # TODO: refactor to use type object (class) and add GetTypeString method. - for name, description, plugin_type in analysis_plugin_info: - if plugin_type == analysis_interface.AnalysisPlugin.TYPE_ANNOTATION: - type_string = u'Annotation/tagging plugin' - elif plugin_type == analysis_interface.AnalysisPlugin.TYPE_ANOMALY: - type_string = u'Anomaly plugin' - elif plugin_type == analysis_interface.AnalysisPlugin.TYPE_REPORT: - type_string = u'Summary/Report plugin' - elif plugin_type == analysis_interface.AnalysisPlugin.TYPE_STATISTICS: - type_string = u'Statistics plugin' - else: - type_string = u'Unknown type' + if len(name) > column_width: + column_width = len(name) + table_view = cli_views.CLITableView( + self._output_writer, column_width=column_width) + table_view.PrintHeader(u'Analysis Plugins') + for name, description, type_string in analysis_plugin_info: description = u'{0:s} [{1:s}]'.format(description, type_string) - self.PrintColumnValue(name, description, format_length) - self.PrintSeparatorLine() + table_view.PrintRow(name, description) + table_view.PrintFooter() def ListLanguageIdentifiers(self): """Lists the language identifiers.""" - self.PrintHeader(u'Language identifiers') - self.PrintColumnValue(u'Identifier', u'Language') + table_view = cli_views.CLITableView(self._output_writer) + table_view.PrintHeader(u'Language identifiers') + table_view.PrintRow(u'Identifier', u'Language') for language_id, value_list in sorted( language_ids.LANGUAGE_IDENTIFIERS.items()): - self.PrintColumnValue(language_id, value_list[1]) + table_view.PrintRow(language_id, value_list[1]) + table_view.PrintFooter() def ListOutputModules(self): """Lists the output modules.""" - self.PrintHeader(u'Output Modules') + table_view = cli_views.CLITableView(self._output_writer, column_width=10) + table_view.PrintHeader(u'Output Modules') for name, output_class in sorted(self._front_end.GetOutputClasses()): - self.PrintColumnValue(name, output_class.DESCRIPTION, 10) - self.PrintSeparatorLine() + table_view.PrintRow(name, output_class.DESCRIPTION) + table_view.PrintFooter() # Assign to an attribute due to line length limitations. disabled_classes = output_manager.OutputManager.GetDisabledOutputClasses if not disabled_classes(): return - self.PrintHeader(u'Disabled Output Modules') + + table_view = cli_views.CLITableView(self._output_writer, column_width=10) + table_view.PrintHeader(u'Disabled Output Modules') for output_class in disabled_classes(): - self.PrintColumnValue(output_class.NAME, output_class.DESCRIPTION, 10) - self.PrintSeparatorLine() + table_view.PrintRow(output_class.NAME, output_class.DESCRIPTION) + table_view.PrintFooter() def ParseArguments(self): """Parses the command line arguments.