Skip to content

Commit

Permalink
Code review: 241120043: Clean up of the multi processing code.
Browse files Browse the repository at this point in the history
  • Loading branch information
joachimmetz committed Dec 31, 2015
1 parent b9911b3 commit 9cf3fda
Show file tree
Hide file tree
Showing 6 changed files with 172 additions and 111 deletions.
2 changes: 1 addition & 1 deletion config/dpkg/changelog
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@ python-plaso (1.2.1-1) unstable; urgency=low

* Auto-generated

-- Log2Timeline <[email protected]> Wed, 03 Jun 2015 19:34:33 +0200
-- Log2Timeline <[email protected]> Wed, 03 Jun 2015 20:43:58 +0200
6 changes: 6 additions & 0 deletions plaso/engine/single_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,10 @@ def ProcessSources(
hasher_names_string: Optional comma separated string of names of
hashers to enable. The default is None.
parser_filter_string: Optional parser filter string. The default is None.
Returns:
A boolean value indicating the sources were processed without
unrecoverable errors.
"""
extraction_worker = self.CreateExtractionWorker(0)

Expand Down Expand Up @@ -210,6 +214,8 @@ def ProcessSources(

logging.debug(u'Processing completed.')

return True


class SingleProcessEventExtractionWorker(worker.BaseEventExtractionWorker):
"""Class that defines the single process event extraction worker."""
Expand Down
2 changes: 1 addition & 1 deletion plaso/lib/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -868,7 +868,7 @@ def Close(self):
self._zipfile.close()
self._file_open = False
if not self._read_only:
logging.info((
logging.debug((
u'[Storage] Closing the storage, number of events added: '
u'{0:d}').format(self._write_counter))

Expand Down
64 changes: 35 additions & 29 deletions plaso/multi_processing/foreman.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,7 @@ def _CheckStatus(self, pid):
unexpectedly terminated.
KeyError: if the process is not registered with the foreman.
"""
if pid not in self._processes_per_pid:
raise KeyError(
u'Process (PID: {0:d}) not registered with foreman'.format(pid))
self._RaiseIfNotRegistered(pid)

process = self._processes_per_pid[pid]

Expand Down Expand Up @@ -179,13 +177,8 @@ def _LogMemoryUsage(self, pid):
Raises:
KeyError: if the process is not registered with the foreman.
"""
if pid not in self._processes_per_pid:
raise KeyError(
u'Process (PID: {0:d}) not registered with foreman'.format(pid))

if pid not in self._process_information_per_pid:
raise KeyError(
u'Process (PID: {0:d}) not in monitoring list.'.format(pid))
self._RaiseIfNotRegistered(pid)
self._RaiseIfNotMonitored(pid)

process = self._processes_per_pid[pid]
process_information = self._process_information_per_pid[pid]
Expand All @@ -208,9 +201,7 @@ def _StartMonitoringProcess(self, pid):
if the process if the processed is already being monitored.
IOError: if the RPC client cannot connect to the server.
"""
if pid not in self._processes_per_pid:
raise KeyError(
u'Process (PID: {0:d}) not registered with foreman'.format(pid))
self._RaiseIfNotRegistered(pid)

if pid in self._process_information_per_pid:
raise KeyError(
Expand All @@ -232,23 +223,44 @@ def _StartMonitoringProcess(self, pid):
self._rpc_clients_per_pid[pid] = rpc_client
self._process_information_per_pid[pid] = process_info.ProcessInfo(pid)

def _StopMonitoringProcess(self, pid):
"""Stops monitoring a process.
def _RaiseIfNotMonitored(self, pid):
"""Raises if the process is not monitored by the foreman.
Args:
pid: The process identifier.
Raises:
KeyError: if the process is not registered with the foreman or
if the process is registered, but not monitored.
KeyError: if the process is not monitored by the foreman.
"""
if pid not in self._process_information_per_pid:
raise KeyError(
u'Process (PID: {0:d}) not monitored by foreman.'.format(pid))

def _RaiseIfNotRegistered(self, pid):
"""Raises if the process is not registered with the foreman.
Args:
pid: The process identifier.
Raises:
KeyError: if the process is not registered with the foreman.
"""
if pid not in self._processes_per_pid:
raise KeyError(
u'Process (PID: {0:d}) not registered with foreman'.format(pid))

if pid not in self._process_information_per_pid:
raise KeyError(
u'Process (PID: {0:d}) not in monitoring list.'.format(pid))
def _StopMonitoringProcess(self, pid):
"""Stops monitoring a process.
Args:
pid: The process identifier.
Raises:
KeyError: if the process is not registered with the foreman or
if the process is registered, but not monitored.
"""
self._RaiseIfNotRegistered(pid)
self._RaiseIfNotMonitored(pid)

process = self._processes_per_pid[pid]
del self._process_information_per_pid[pid]
Expand All @@ -274,9 +286,7 @@ def _TerminateProcess(self, pid):
Raises:
KeyError: if the process is not registered with the foreman.
"""
if pid not in self._processes_per_pid:
raise KeyError(
u'Process (PID: {0:d}) not registered with foreman'.format(pid))
self._RaiseIfNotRegistered(pid)

process = self._processes_per_pid[pid]

Expand All @@ -299,9 +309,7 @@ def _UpdateProcessingStatus(self, pid, process_status):
Raises:
KeyError: if the process is not registered with the foreman.
"""
if pid not in self._processes_per_pid:
raise KeyError(
u'Process (PID: {0:d}) not registered with foreman'.format(pid))
self._RaiseIfNotRegistered(pid)

if not process_status:
return
Expand All @@ -324,9 +332,7 @@ def _UpdateProcessingStatus(self, pid, process_status):
process.name, pid, number_of_events, status_indicator)

elif process_type == definitions.PROCESS_TYPE_WORKER:
if pid not in self._process_information_per_pid:
raise KeyError(
u'Process (PID: {0:d}) not in monitoring list.'.format(pid))
self._RaiseIfNotMonitored(pid)

number_of_events = process_status.get(u'number_of_events', 0)
number_of_path_specs = process_status.get(u'number_of_path_specs', 0)
Expand Down
Loading

0 comments on commit 9cf3fda

Please sign in to comment.