Skip to content

Commit

Permalink
Added support for SystemResources .mun files log2timeline#4259
Browse files Browse the repository at this point in the history
  • Loading branch information
joachimmetz committed Dec 31, 2023
1 parent b1a65b6 commit caaea3d
Show file tree
Hide file tree
Showing 5 changed files with 153 additions and 119 deletions.
90 changes: 44 additions & 46 deletions plaso/multi_process/extraction_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,8 @@ def __init__(

except NotImplementedError:
logger.error((
'Unable to determine number of CPUs defaulting to {0:d} worker '
'processes.').format(self._WORKER_PROCESSES_MINIMUM))
f'Unable to determine number of CPUs defaulting to '
f'{self._WORKER_PROCESSES_MINIMUM:d} worker processes.'))
cpu_count = self._WORKER_PROCESSES_MINIMUM

number_of_worker_processes = cpu_count
Expand Down Expand Up @@ -185,15 +185,16 @@ def __init__(
self._resolver_context = context.Context()
self._status = definitions.STATUS_INDICATOR_IDLE
self._status_update_callback = status_update_callback
self._system_configurations = None
self._task_manager = task_manager.TaskManager()
self._task_merge_helper = None
self._task_merge_helper_on_hold = None
self._task_queue = None
self._task_queue_port = None
self._task_storage_format = None
self._windows_event_log_providers = None
self._worker_memory_limit = worker_memory_limit
self._worker_timeout = worker_timeout
self._system_configurations = None

def _CacheFileSystem(self, file_system):
"""Caches a dfVFS file system object.
Expand Down Expand Up @@ -262,7 +263,7 @@ def _CollectInitialEventSources(self, storage_writer, file_system_path_specs):
if self._CheckExcludedPathSpec(file_system, path_spec):
display_name = path_helper.PathHelper.GetDisplayNameForPathSpec(
path_spec)
logger.debug('Excluded from extraction: {0:s}.'.format(display_name))
logger.debug(f'Excluded from extraction: {display_name:s}.')
continue

# TODO: determine if event sources should be DataStream or FileEntry
Expand Down Expand Up @@ -305,8 +306,7 @@ def _CreateTask(self, storage_writer, session_identifier, event_source):
if self._CheckExcludedPathSpec(file_system, event_source.path_spec):
display_name = path_helper.PathHelper.GetDisplayNameForPathSpec(
event_source.path_spec)
logger.debug('Excluded from extraction: {0:s}.'.format(
display_name))
logger.debug(f'Excluded from extraction: {display_name:s}.')
return None

task = self._task_manager.CreateTask(
Expand Down Expand Up @@ -409,11 +409,10 @@ def _MergeAttributeContainer(self, storage_writer, merge_helper, container):
# TODO: store this as a merge warning so this is preserved
# in the storage file.
logger.error((
'Unable to merge {0:s} attribute container: {1:s} since '
'corresponding event data stream: {2:s} could not be '
'found.').format(
container.CONTAINER_TYPE, identifier_string,
event_data_stream_lookup_key))
f'Unable to merge {container.CONTAINER_TYPE:s} attribute '
f'container: {identifier_string:s} since corresponding event '
f'data stream: {event_data_stream_lookup_key:s} could not be '
f'found.'))
return

elif container.CONTAINER_TYPE in (
Expand All @@ -438,10 +437,9 @@ def _MergeAttributeContainer(self, storage_writer, merge_helper, container):
description = 'WEVT_TEMPLATE event definition'

logger.error((
'Unable to merge {0:s} attribute container: {1:s} since '
'corresponding Windows EventLog message file: {2:s} could not '
'be found.').format(
description, identifier_string, message_file_lookup_key))
f'Unable to merge {description:s} attribute container: '
f'{identifier_string:s} since corresponding Windows EventLog '
f'message file: {message_file_lookup_key:s} could not be found.'))
return

lookup_key = None
Expand Down Expand Up @@ -554,9 +552,9 @@ def _MergeTaskStorage(self, storage_writer, session_identifier):
self._task_manager.UpdateTaskAsPendingMerge(task)

except KeyError as exception:
logger.error(
'Unable to retrieve task: {0:s} to prepare it to be merged '
'with error: {1!s}.'.format(task_identifier, exception))
logger.error((
f'Unable to retrieve task: {task_identifier:s} to prepare it to '
f'be merged with error: {exception!s}.'))
continue

if self._processing_profiler:
Expand Down Expand Up @@ -590,8 +588,8 @@ def _MergeTaskStorage(self, storage_writer, session_identifier):

except IOError as exception:
logger.error((
'Unable to merge results of task: {0:s} '
'with error: {1!s}').format(task.identifier, exception))
f'Unable to merge results of task: {task.identifier:s} with '
f'error: {exception!s}'))
self._task_merge_helper = None

if self._task_merge_helper:
Expand Down Expand Up @@ -638,9 +636,9 @@ def _MergeTaskStorage(self, storage_writer, session_identifier):
self._task_manager.CompleteTask(self._merge_task)

except KeyError as exception:
logger.error(
'Unable to complete task: {0:s} with error: {1!s}'.format(
self._merge_task.identifier, exception))
logger.error((
f'Unable to complete task: {self._merge_task.identifier:s} with '
f'error: {exception!s}'))

if not self._task_merge_helper_on_hold:
self._merge_task = None
Expand Down Expand Up @@ -720,9 +718,10 @@ def _ProcessEventSources(self, storage_writer, session_identifier):

else:
path_spec_string = self._GetPathSpecificationString(task.path_spec)
logger.debug(
'Scheduled task: {0:s} for path specification: {1:s}'.format(
task.identifier, path_spec_string.replace('\n', ' ')))
path_spec_string = path_spec_string.replace('\n', ' ')
logger.debug((
f'Scheduled task: {task.identifier:s} for path specification: '
f'{path_spec_string:s}'))

self._task_manager.SampleTaskStatus(task, 'scheduled')

Expand Down Expand Up @@ -753,8 +752,8 @@ def _ProcessEventSources(self, storage_writer, session_identifier):
# from being killed by an uncaught exception.
except Exception as exception: # pylint: disable=broad-except
self._ProduceExtractionWarning(storage_writer, (
'unable to process path specification with error: '
'{0!s}').format(exception), event_source.path_spec)
f'unable to process path specification with error: '
f'{exception!s}'), event_source.path_spec)
event_source = None

for task in self._task_manager.GetFailedTasks():
Expand Down Expand Up @@ -862,17 +861,17 @@ def _StartWorkerProcess(self, process_name):
MultiProcessWorkerProcess: extraction worker process or None if the
process could not be started.
"""
logger.debug('Starting worker process {0:s}'.format(process_name))
logger.debug(f'Starting worker process {process_name:s}')

queue_name = '{0:s} task queue'.format(process_name)
queue_name = f'{process_name:s} task queue'
task_queue = zeromq_queue.ZeroMQRequestConnectQueue(
delay_open=True, linger_seconds=0, name=queue_name,
port=self._task_queue_port,
timeout_seconds=self._TASK_QUEUE_TIMEOUT_SECONDS)

process = extraction_process.ExtractionWorkerProcess(
task_queue, self._processing_configuration, self._system_configurations,
self._registry_find_specs,
self._windows_event_log_providers, self._registry_find_specs,
enable_sigsegv_handler=self._enable_sigsegv_handler, name=process_name)

# Remove all possible log handlers to prevent a child process from logging
Expand All @@ -894,9 +893,8 @@ def _StartWorkerProcess(self, process_name):
except (IOError, KeyError) as exception:
pid = process.pid
logger.error((
'Unable to monitor replacement worker process: {0:s} '
'(PID: {1:d}) with error: {2!s}').format(
process_name, pid, exception))
f'Unable to monitor replacement worker process: {process_name:s} '
f'(PID: {pid:d}) with error: {exception!s}'))

self._TerminateProcess(process)
return None
Expand Down Expand Up @@ -1010,8 +1008,8 @@ def _UpdateProcessingStatus(self, pid, process_status, used_memory):
current_timestamp = time.time()
if current_timestamp > last_activity_timestamp:
logger.error((
'Process {0:s} (PID: {1:d}) has not reported activity within '
'the timeout period.').format(process.name, pid))
f'Process {process.name:s} (PID: {pid:d}) has not reported '
f'activity within the timeout period.'))
processing_status = definitions.STATUS_INDICATOR_NOT_RESPONDING

self._processing_status.UpdateWorkerStatus(
Expand All @@ -1029,9 +1027,9 @@ def _UpdateProcessingStatus(self, pid, process_status, used_memory):
self._task_manager.UpdateTaskAsProcessingByIdentifier(task_identifier)
return
except KeyError:
logger.debug(
'Worker {0:s} is processing unknown task: {1:s}.'.format(
process.name, task_identifier))
logger.debug((
f'Worker {process.name:s} is processing unknown task: '
f'{task_identifier:s}.'))

def _UpdateStatus(self):
"""Updates the status."""
Expand Down Expand Up @@ -1098,8 +1096,7 @@ def ProcessSourceMulti(
filter_file_path=processing_configuration.filter_file)
except errors.InvalidFilter as exception:
raise errors.BadConfigOption(
'Unable to build collection filters with error: {0!s}'.format(
exception))
f'Unable to build collection filters with error: {exception!s}')

self._event_data_timeliner = timeliner.EventDataTimeliner(
data_location=processing_configuration.data_location,
Expand All @@ -1120,6 +1117,8 @@ def ProcessSourceMulti(
self._storage_file_path = storage_file_path
self._storage_writer = storage_writer
self._task_storage_format = processing_configuration.task_storage_format
self._windows_event_log_providers = list(
storage_writer.GetAttributeContainers('windows_eventlog_provider'))

# Set up the task queue.
task_outbound_queue = zeromq_queue.ZeroMQBufferedReplyBindQueue(
Expand All @@ -1138,11 +1137,10 @@ def ProcessSourceMulti(
self._StartTaskStorage(self._task_storage_format)

for worker_number in range(self._number_of_worker_processes):
process_name = 'Worker_{0:02d}'.format(self._last_worker_number)
process_name = f'Worker_{self._last_worker_number:02d}'
worker_process = self._StartWorkerProcess(process_name)
if not worker_process:
logger.error('Unable to create worker process: {0:d}'.format(
worker_number))
logger.error(f'Unable to create worker process: {worker_number:d}')

self._StartProfiling(self._processing_configuration.profiling)
self._task_manager.StartProfiling(
Expand Down Expand Up @@ -1198,8 +1196,7 @@ def ProcessSourceMulti(
self._task_storage_format, session_identifier,
abort=task_storage_abort)
except (IOError, OSError) as exception:
logger.error('Unable to stop task storage with error: {0!s}'.format(
exception))
logger.error(f'Unable to stop task storage with error: {exception!s}')

if self._abort:
logger.debug('Processing aborted.')
Expand All @@ -1219,5 +1216,6 @@ def ProcessSourceMulti(
self._storage_writer = None
self._system_configurations = None
self._task_storage_format = None
self._windows_event_log_providers = None

return self._processing_status
Loading

0 comments on commit caaea3d

Please sign in to comment.