feat(server): Asynchronous server-side background task execution #4317

Open · wants to merge 1 commit into base `master`
4 changes: 2 additions & 2 deletions .github/workflows/test.yml
@@ -20,8 +20,8 @@ jobs:
- name: Install dependencies
run: |
pip install $(grep -iE "pylint|pycodestyle" analyzer/requirements_py/dev/requirements.txt)
- name: Run tests
run: make pylint pycodestyle
- name: Run pycodestyle & pylint
run: make -k pycodestyle pylint

tools:
name: Tools (report-converter, etc.)
39 changes: 1 addition & 38 deletions analyzer/codechecker_analyzer/analysis_manager.py
@@ -13,16 +13,15 @@
import shutil
import signal
import sys
import time
import traceback
import zipfile

from threading import Timer

import multiprocess
import psutil

from codechecker_common.logger import get_logger
from codechecker_common.process import kill_process_tree
from codechecker_common.review_status_handler import ReviewStatusHandler

from codechecker_statistics_collector.collectors.special_return_value import \
@@ -341,42 +340,6 @@ def handle_failure(
os.remove(plist_file)


def kill_process_tree(parent_pid, recursive=False):
"""Stop the process tree try it gracefully first.

Try to stop the parent and child processes gracefuly
first if they do not stop in time send a kill signal
to every member of the process tree.

There is a similar function in the web part please
consider to update that in case of changing this.
"""
proc = psutil.Process(parent_pid)
children = proc.children(recursive)

# Send a SIGTERM (Ctrl-C) to the main process
proc.terminate()

# If children processes don't stop gracefully in time,
# slaughter them by force.
_, still_alive = psutil.wait_procs(children, timeout=5)
for p in still_alive:
p.kill()

# Wait until this process is running.
n = 0
timeout = 10
while proc.is_running():
if n > timeout:
LOG.warning("Waiting for process %s to stop has been timed out"
"(timeout = %s)! Process is still running!",
parent_pid, timeout)
break

time.sleep(1)
n += 1


def setup_process_timeout(proc, timeout,
failure_callback=None):
"""
49 changes: 27 additions & 22 deletions bin/CodeChecker
@@ -6,10 +6,10 @@
# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
#
# -------------------------------------------------------------------------

"""
Used to kickstart CodeChecker.
Save original environment without modifications.

Saves original environment without modifications.
Used to run the logging in the same env.
"""
# This is for enabling CodeChecker as a filename (i.e. module name).
@@ -25,9 +25,10 @@ import sys
import tempfile

PROC_PID = None
EXIT_CODE = None


def run_codechecker(checker_env, subcommand=None):
def run_codechecker(checker_env, subcommand=None) -> int:
"""
Run the CodeChecker.
* checker_env - CodeChecker will be run in the checker env.
@@ -63,11 +64,13 @@ def run_codechecker(checker_env, subcommand=None):
global PROC_PID
PROC_PID = proc.pid

proc.wait()
sys.exit(proc.returncode)
global EXIT_CODE
EXIT_CODE = proc.wait()

return EXIT_CODE


def main(subcommand=None):
def main(subcommand=None) -> int:
original_env = os.environ.copy()
checker_env = original_env

@@ -94,30 +97,32 @@
print('Saving original build environment failed.')
print(ex)

def signal_term_handler(signum, _frame):
def signal_handler(signum, _frame):
"""
Forwards the received signal to the CodeChecker subprocess started by
this `main` script.
"""
global PROC_PID
if PROC_PID and sys.platform != "win32":
os.kill(PROC_PID, signal.SIGINT)

_remove_tmp()
sys.exit(128 + signum)

signal.signal(signal.SIGTERM, signal_term_handler)
signal.signal(signal.SIGINT, signal_term_handler)

def signal_reload_handler(_sig, _frame):
global PROC_PID
if PROC_PID:
os.kill(PROC_PID, signal.SIGHUP)
try:
os.kill(PROC_PID, signum)
except ProcessLookupError:
pass

signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
if sys.platform != "win32":
signal.signal(signal.SIGHUP, signal_reload_handler)
signal.signal(signal.SIGHUP, signal_handler)
signal.signal(signal.SIGCHLD, signal_handler)

try:
run_codechecker(checker_env, subcommand)
global EXIT_CODE
EXIT_CODE = run_codechecker(checker_env, subcommand)
finally:
_remove_tmp()

return EXIT_CODE


if __name__ == "__main__":
main(None)
sys.exit(main(None) or 0)
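
For readers skimming the launcher change: the new handler forwards whatever signal arrives to the child process and lets `main` return the child's exit code instead of calling `sys.exit` mid-flight. Below is a minimal, standalone sketch of that forwarding pattern; the child command and the set of signals are illustrative only, not taken from the diff.

```python
import os
import signal
import subprocess
import sys

# Illustrative child process; bin/CodeChecker launches the real CLI instead.
child = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])

def forward(signum, _frame):
    # Pass the received signal on to the child; it may already have exited.
    try:
        os.kill(child.pid, signum)
    except ProcessLookupError:
        pass

for sig in (signal.SIGINT, signal.SIGTERM):
    signal.signal(sig, forward)

# Exit with the child's return code, mirroring what the launcher now does.
sys.exit(child.wait())
```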
15 changes: 11 additions & 4 deletions codechecker_common/compatibility/multiprocessing.py
@@ -13,8 +13,15 @@
# pylint: disable=no-name-in-module
# pylint: disable=unused-import
if sys.platform in ["darwin", "win32"]:
from multiprocess import Pool # type: ignore
from multiprocess import cpu_count
from multiprocess import \
Pool, Process, \
Queue, \
Value, \
cpu_count
else:
from concurrent.futures import ProcessPoolExecutor as Pool # type: ignore
from multiprocessing import cpu_count
from concurrent.futures import ProcessPoolExecutor as Pool
from multiprocessing import \
Process, \
Queue, \
Value, \
cpu_count
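
A small usage sketch of this compatibility layer from a consumer module's point of view; the `square` worker and pool size are illustrative:

```python
from codechecker_common.compatibility.multiprocessing import Pool, cpu_count

def square(x):
    return x * x

if __name__ == "__main__":
    # Both multiprocess.Pool and ProcessPoolExecutor accept a worker count as
    # their first argument and expose a map(callable, iterable) method.
    with Pool(cpu_count()) as pool:
        print(list(pool.map(square, range(8))))
```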
54 changes: 48 additions & 6 deletions codechecker_common/logger.py
@@ -6,16 +6,18 @@
#
# -------------------------------------------------------------------------


import argparse
import datetime
import json
import logging
from logging import config
from pathlib import Path
import os
import sys
from typing import Optional

# The logging leaves can be accesses without
# importing the logging module in other modules.
# The logging levels can be accessed without importing the logging module in
# other modules.
DEBUG = logging.DEBUG
INFO = logging.INFO
WARNING = logging.WARNING
@@ -25,14 +27,24 @@

CMDLINE_LOG_LEVELS = ['info', 'debug_analyzer', 'debug']

DEBUG_ANALYZER = logging.DEBUG_ANALYZER = 15 # type: ignore
DEBUG_ANALYZER = 15
logging.addLevelName(DEBUG_ANALYZER, 'DEBUG_ANALYZER')


_Levels = {"DEBUG": DEBUG,
"DEBUG_ANALYZER": DEBUG_ANALYZER,
"INFO": INFO,
"WARNING": WARNING,
"ERROR": ERROR,
"CRITICAL": CRITICAL,
"NOTSET": NOTSET,
}


class CCLogger(logging.Logger):
def debug_analyzer(self, msg, *args, **kwargs):
if self.isEnabledFor(logging.DEBUG_ANALYZER):
self._log(logging.DEBUG_ANALYZER, msg, args, **kwargs)
if self.isEnabledFor(DEBUG_ANALYZER):
self._log(DEBUG_ANALYZER, msg, args, **kwargs)


logging.setLoggerClass(CCLogger)
@@ -113,6 +125,36 @@ def validate_loglvl(log_level):
return log_level


def raw_sprint_log(logger: logging.Logger, level: str, message: str) \
-> Optional[str]:
"""
Formats a raw log `message` using the date format of the specified
`logger`, without actually invoking the logging infrastructure.
"""
if not logger.isEnabledFor(_Levels[level]):
return None

formatter = logger.handlers[0].formatter if len(logger.handlers) > 0 \
else None
datefmt = formatter.datefmt if formatter else None
time = datetime.datetime.now().strftime(datefmt) if datefmt \
else str(datetime.datetime.now())

return f"[{validate_loglvl(level)} {time}] - {message}"


def signal_log(logger: logging.Logger, level: str, message: str):
"""
Simulates a log output and logs a message within a signal handler, without
triggering a `RuntimeError` due to reentrancy in `print`-like method calls.
"""
formatted = raw_sprint_log(logger, level, message)
if not formatted:
return

os.write(sys.stderr.fileno(), f"{formatted}\n".encode())


class LogCfgServer:
"""
Initialize a log configuration server for dynamic log configuration.
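
To illustrate the intended use of `signal_log`, here is a short sketch; the logger name and message are illustrative:

```python
import signal

from codechecker_common.logger import get_logger, signal_log

LOG = get_logger("system")

def handle_sigterm(signum, _frame):
    # A plain LOG.info() inside a signal handler can raise a reentrancy
    # RuntimeError; signal_log() formats the line itself and writes it
    # straight to stderr with os.write() instead.
    signal_log(LOG, "INFO", f"Received signal {signum}, shutting down.")

signal.signal(signal.SIGTERM, handle_sigterm)
```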
49 changes: 49 additions & 0 deletions codechecker_common/process.py
@@ -0,0 +1,49 @@
# -------------------------------------------------------------------------
#
# Part of the CodeChecker project, under the Apache License v2.0 with
# LLVM Exceptions. See LICENSE for license information.
# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
#
# -------------------------------------------------------------------------
import time

import psutil

from .logger import get_logger


LOG = get_logger("system")


def kill_process_tree(parent_pid, recursive=False):
"""
Stop the process tree, gracefully at first.

Try to stop the parent and child processes gracefully first.
If they do not stop in time, send a kill signal to every member of the
process tree.
"""
proc = psutil.Process(parent_pid)
children = proc.children(recursive)

# Send a SIGTERM to the main process.
proc.terminate()

# If children processes don't stop gracefully in time, slaughter them
# by force.
_, still_alive = psutil.wait_procs(children, timeout=5)
for p in still_alive:
p.kill()

# Wait until the parent process stops running.
n = 0
timeout = 10
while proc.is_running():
if n > timeout:
LOG.warning("Waiting for process %s to stop has been timed out"
"(timeout = %s)! Process is still running!",
parent_pid, timeout)
break

time.sleep(1)
n += 1
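
A minimal usage sketch for the relocated helper; the throwaway child process is illustrative:

```python
import subprocess
import sys

from codechecker_common.process import kill_process_tree

# Start a disposable process, then terminate it and any children it spawned.
proc = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(300)"])
kill_process_tree(proc.pid, recursive=True)
```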
19 changes: 19 additions & 0 deletions codechecker_common/util.py
@@ -8,9 +8,12 @@
"""
Util module.
"""
import datetime
import hashlib
import itertools
import json
import os
import random
from typing import TextIO

import portalocker
@@ -112,3 +115,19 @@ def path_for_fake_root(full_path: str, root_path: str = '/') -> str:
def strtobool(value: str) -> bool:
"""Parse a string value to a boolean."""
return value.lower() in ('y', 'yes', 't', 'true', 'on', '1')


def generate_random_token(num_bytes: int = 32) -> str:
"""
Returns a randomly generated string, usable as a token, with `num_bytes`
hexadecimal characters in the output.
"""
prefix = str(os.getpid()).encode()
suffix = str(datetime.datetime.now()).encode()

hash_value = ''.join(
[hashlib.sha256(prefix + os.urandom(num_bytes * 2) + suffix)
.hexdigest()
for _ in range(0, -(num_bytes // -64))])
idx = random.randrange(0, len(hash_value) - num_bytes + 1)
return hash_value[idx:(idx + num_bytes)]
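
A short sketch of what the helper returns; the assertions only illustrate the documented shape of the output:

```python
from codechecker_common.util import generate_random_token

# E.g. a token identifying a server-side background task.
token = generate_random_token(32)
assert len(token) == 32
assert all(c in "0123456789abcdef" for c in token)
```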
10 changes: 9 additions & 1 deletion docs/web/server_config.md
@@ -17,14 +17,22 @@ Table of Contents
* [Size of the compilation database](#size-of-the-compilation-database)
* [Authentication](#authentication)

## Number of worker processes
## Number of API worker processes
The `worker_processes` section of the config file controls how many processes
will be started on the server to process API requests.

*Default value*: <CPU count>

The server needs to be restarted if the value is changed in the config file.

### Number of task worker processes
The `background_worker_processes` section of the config file controls how many
processes will be started on the server to process background jobs.

*Default value*: Falls back to the same value as `worker_processes`.

The server needs to be restarted if the value is changed in the config file.
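
A sketch of how a deployment could check both settings, assuming the server keeps its configuration in a `server_config.json` file; the path below is illustrative:

```python
import json
from pathlib import Path

# Illustrative location; use your server's actual workspace directory.
config = json.loads(
    Path("/var/lib/codechecker/server_config.json").read_text("utf-8"))

api_workers = config.get("worker_processes")
# Background task workers fall back to the API worker count when unset.
task_workers = config.get("background_worker_processes", api_workers)
print(f"API workers: {api_workers}, task workers: {task_workers}")
```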

## Run limitation
The `max_run_count` section of the config file controls how many runs can be
stored on the server for a product.