diff --git a/.github/workflows/linters.yml b/.github/workflows/linters.yml new file mode 100644 index 0000000..81637db --- /dev/null +++ b/.github/workflows/linters.yml @@ -0,0 +1,15 @@ +name: lint +on: [push, pull_request] +jobs: + code_linting: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v1 + - name: Code linters + run: | + pip install -r requirements.txt + pycodestyle workbench-agent.py + pylint --errors-only --rcfile .pylintrc workbench-agent.py + + + diff --git a/.gitignore b/.gitignore old mode 100644 new mode 100755 index 68bc17f..53f0526 --- a/.gitignore +++ b/.gitignore @@ -1,160 +1,15 @@ -# Byte-compiled / optimized / DLL files -__pycache__/ -*.py[cod] -*$py.class +# Editors +.vscode/ +.idea/ -# C extensions -*.so +# Vagrant +.vagrant/ -# Distribution / packaging -.Python -build/ -develop-eggs/ -dist/ -downloads/ -eggs/ -.eggs/ -lib/ -lib64/ -parts/ -sdist/ -var/ -wheels/ -share/python-wheels/ -*.egg-info/ -.installed.cfg -*.egg -MANIFEST +# Mac/OSX +.DS_Store -# PyInstaller -# Usually these files are written by a python script from a template -# before PyInstaller builds the exe, so as to inject date/other infos into it. -*.manifest -*.spec +# Windows +Thumbs.db -# Installer logs -pip-log.txt -pip-delete-this-directory.txt - -# Unit test / coverage reports -htmlcov/ -.tox/ -.nox/ -.coverage -.coverage.* -.cache -nosetests.xml -coverage.xml -*.cover -*.py,cover -.hypothesis/ -.pytest_cache/ -cover/ - -# Translations -*.mo -*.pot - -# Django stuff: -*.log -local_settings.py -db.sqlite3 -db.sqlite3-journal - -# Flask stuff: -instance/ -.webassets-cache - -# Scrapy stuff: -.scrapy - -# Sphinx documentation -docs/_build/ - -# PyBuilder -.pybuilder/ -target/ - -# Jupyter Notebook -.ipynb_checkpoints - -# IPython -profile_default/ -ipython_config.py - -# pyenv -# For a library or package, you might want to ignore these files since the code is -# intended to run in multiple environments; otherwise, check them in: -# .python-version - -# pipenv -# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. -# However, in case of collaboration, if having platform-specific dependencies or dependencies -# having no cross-platform support, pipenv may install dependencies that don't work, or not -# install all needed dependencies. -#Pipfile.lock - -# poetry -# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. -# This is especially recommended for binary packages to ensure reproducibility, and is more -# commonly ignored for libraries. -# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control -#poetry.lock - -# pdm -# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. -#pdm.lock -# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it -# in version control. -# https://pdm.fming.dev/#use-with-ide -.pdm.toml - -# PEP 582; used by e.g. 
github.com/David-OConnor/pyflow and github.com/pdm-project/pdm -__pypackages__/ - -# Celery stuff -celerybeat-schedule -celerybeat.pid - -# SageMath parsed files -*.sage.py - -# Environments -.env -.venv -env/ -venv/ -ENV/ -env.bak/ -venv.bak/ - -# Spyder project settings -.spyderproject -.spyproject - -# Rope project settings -.ropeproject - -# mkdocs documentation -/site - -# mypy -.mypy_cache/ -.dmypy.json -dmypy.json - -# Pyre type checker -.pyre/ - -# pytype static type analyzer -.pytype/ - -# Cython debug symbols -cython_debug/ - -# PyCharm -# JetBrains specific template is maintained in a separate JetBrains.gitignore that can -# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore -# and can be added to the global gitignore or merged into this file. For a more nuclear -# option (not recommended) you can uncomment the following to ignore the entire idea folder. -#.idea/ +# Log file +log-agent.txt \ No newline at end of file diff --git a/.pylintrc b/.pylintrc new file mode 100755 index 0000000..901d303 --- /dev/null +++ b/.pylintrc @@ -0,0 +1,575 @@ +[MAIN] + +disable=consider-using-f-string, + simplifiable-if-statement, + no-else-return, + broad-exception-caught, + unspecified-encoding, + logging-fstring-interpolation, + logging-format-interpolation + +# Specify a configuration file. +#rcfile= + +# Python code to execute, usually for sys.path manipulation such as +# pygtk.require(). +#init-hook= + +# Files or directories to be skipped. They should be base names, not +# paths. +ignore=CVS + +# Add files or directories matching the regex patterns to the ignore-list. The +# regex matches against paths and can be in Posix or Windows format. +ignore-paths= + +# Files or directories matching the regex patterns are skipped. The regex +# matches against base names, not paths. +ignore-patterns=^\.# + +# Pickle collected data for later comparisons. +persistent=yes + +# List of plugins (as comma separated values of python modules names) to load, +# usually to register additional checkers. +load-plugins= + pylint.extensions.check_elif, + pylint.extensions.bad_builtin, + pylint.extensions.docparams, + pylint.extensions.for_any_all, + pylint.extensions.set_membership, + pylint.extensions.code_style, + pylint.extensions.overlapping_exceptions, + pylint.extensions.typing, + pylint.extensions.redefined_variable_type, + pylint.extensions.comparison_placement, + +# Use multiple processes to speed up Pylint. Specifying 0 will auto-detect the +# number of processors available to use. +jobs=1 + +# When enabled, pylint would attempt to guess common misconfiguration and emit +# user-friendly hints instead of false-positive error messages. +suggestion-mode=yes + +# Allow loading of arbitrary C extensions. Extensions are imported into the +# active Python interpreter and may run arbitrary code. +unsafe-load-any-extension=no + +# A comma-separated list of package or module names from where C extensions may +# be loaded. Extensions are loading into the active Python interpreter and may +# run arbitrary code +extension-pkg-allow-list= + +# Minimum supported python version +py-version = 3.7.2 + +# Control the amount of potential inferred values when inferring a single +# object. This can help the performance when dealing with large functions or +# complex, nested conditions. +limit-inference-results=100 + +# Specify a score threshold to be exceeded before program exits with error. 
+fail-under=10.0 + +# Return non-zero exit code if any of these messages/categories are detected, +# even if score is above --fail-under value. Syntax same as enable. Messages +# specified are enabled, while categories only check already-enabled messages. +fail-on= + + +[MESSAGES CONTROL] + +# Only show warnings with the listed confidence levels. Leave empty to show +# all. Valid levels: HIGH, INFERENCE, INFERENCE_FAILURE, UNDEFINED +# confidence= + +# Enable the message, report, category or checker with the given id(s). You can +# either give multiple identifier separated by comma (,) or put this option +# multiple time (only on the command line, not in the configuration file where +# it should appear only once). See also the "--disable" option for examples. +enable= + use-symbolic-message-instead, + useless-suppression, + +# Disable the message, report, category or checker with the given id(s). You +# can either give multiple identifiers separated by comma (,) or put this +# option multiple times (only on the command line, not in the configuration +# file where it should appear only once).You can also use "--disable=all" to +# disable everything first and then re-enable specific checks. For example, if +# you want to run only the similarities checker, you can use "--disable=all +# --enable=similarities". If you want to run only the classes checker, but have +# no Warning level messages displayed, use"--disable=all --enable=classes +# --disable=W" + +disable= + attribute-defined-outside-init, + invalid-name, + missing-docstring, + protected-access, + too-few-public-methods, + # handled by black + format, + # We anticipate #3512 where it will become optional + fixme, + cyclic-import, + # No exception type(s) specified (bare-except) + W0702, + + +[REPORTS] + +# Set the output format. Available formats are text, parseable, colorized, msvs +# (visual studio) and html. You can also give a reporter class, eg +# mypackage.mymodule.MyReporterClass. +output-format=text + +# Tells whether to display a full report or only the messages +reports=no + +# Python expression which should return a note less than 10 (10 is the highest +# note). You have access to the variables 'fatal', 'error', 'warning', 'refactor', 'convention' +# and 'info', which contain the number of messages in each category, as +# well as 'statement', which is the total number of statements analyzed. This +# score is used by the global evaluation report (RP0004). +evaluation=max(0, 0 if fatal else 10.0 - ((float(5 * error + warning + refactor + convention) / statement) * 10)) + +# Template used to display messages. This is a python new-style format string +# used to format the message information. See doc for all details +#msg-template= + +# Activate the evaluation score. +score=yes + + +[LOGGING] + +# Logging modules to check that the string format arguments are in logging +# function parameter format +logging-modules=logging + +# The type of string formatting that logging methods do. `old` means using % +# formatting, `new` is for `{}` formatting. +logging-format-style=old + + +[MISCELLANEOUS] + +# List of note tags to take in consideration, separated by a comma. +notes=FIXME,XXX,TODO + +# Regular expression of note tags to take in consideration. +#notes-rgx= + + +[SIMILARITIES] + +# Minimum lines number of a similarity. +min-similarity-lines=6 + +# Ignore comments when computing similarities. +ignore-comments=yes + +# Ignore docstrings when computing similarities. 
+ignore-docstrings=yes + +# Ignore imports when computing similarities. +ignore-imports=yes + +# Signatures are removed from the similarity computation +ignore-signatures=yes + + +[VARIABLES] + +# Tells whether we should check for unused import in __init__ files. +init-import=no + +# A regular expression matching the name of dummy variables (i.e. expectedly +# not used). +dummy-variables-rgx=_$|dummy + +# List of additional names supposed to be defined in builtins. Remember that +# you should avoid defining new builtins when possible. +additional-builtins= + +# List of strings which can identify a callback function by name. A callback +# name must start or end with one of those strings. +callbacks=cb_,_cb + +# Tells whether unused global variables should be treated as a violation. +allow-global-unused-variables=yes + +# List of names allowed to shadow builtins +allowed-redefined-builtins= + +# Argument names that match this expression will be ignored. Default to name +# with leading underscore. +ignored-argument-names=_.* + +# List of qualified module names which can have objects that can redefine +# builtins. +redefining-builtins-modules=six.moves,past.builtins,future.builtins,builtins,io + + +[FORMAT] + +# Maximum number of characters on a single line. +max-line-length=100 + +# Regexp for a line that is allowed to be longer than the limit. +ignore-long-lines=^\s*(# )??$ + +# Allow the body of an if to be on the same line as the test if there is no +# else. +single-line-if-stmt=no + +# Allow the body of a class to be on the same line as the declaration if body +# contains single statement. +single-line-class-stmt=no + +# Maximum number of lines in a module +max-module-lines=2000 + +# String used as indentation unit. This is usually " " (4 spaces) or "\t" (1 +# tab). +indent-string=' ' + +# Number of spaces of indent required inside a hanging or continued line. +indent-after-paren=4 + +# Expected format of line ending, e.g. empty (any line ending), LF or CRLF. +expected-line-ending-format= + + +[BASIC] + +# Good variable names which should always be accepted, separated by a comma +good-names=i,j,k,ex,Run,_ + +# Good variable names regexes, separated by a comma. If names match any regex, +# they will always be accepted +good-names-rgxs= + +# Bad variable names which should always be refused, separated by a comma +bad-names=foo,bar,baz,toto,tutu,tata + +# Bad variable names regexes, separated by a comma. If names match any regex, +# they will always be refused +bad-names-rgxs= + +# Colon-delimited sets of names that determine each other's naming style when +# the name regexes allow several styles. +name-group= + +# Include a hint for the correct naming format with invalid-name +include-naming-hint=no + +# Naming style matching correct function names. +function-naming-style=snake_case + +# Regular expression matching correct function names +function-rgx=[a-z_][a-z0-9_]{2,30}$ + +# Naming style matching correct variable names. +variable-naming-style=snake_case + +# Regular expression matching correct variable names +variable-rgx=[a-z_][a-z0-9_]{2,30}$ + +# Naming style matching correct constant names. +const-naming-style=UPPER_CASE + +# Regular expression matching correct constant names +const-rgx=(([A-Z_][A-Z0-9_]*)|(__.*__))$ + +# Naming style matching correct attribute names. +attr-naming-style=snake_case + +# Regular expression matching correct attribute names +attr-rgx=[a-z_][a-z0-9_]{2,}$ + +# Naming style matching correct argument names. 
+argument-naming-style=snake_case + +# Regular expression matching correct argument names +argument-rgx=[a-z_][a-z0-9_]{2,30}$ + +# Naming style matching correct class attribute names. +class-attribute-naming-style=any + +# Regular expression matching correct class attribute names +class-attribute-rgx=([A-Za-z_][A-Za-z0-9_]{2,30}|(__.*__))$ + +# Naming style matching correct class constant names. +class-const-naming-style=UPPER_CASE + +# Regular expression matching correct class constant names. Overrides class- +# const-naming-style. +#class-const-rgx= + +# Naming style matching correct inline iteration names. +inlinevar-naming-style=any + +# Regular expression matching correct inline iteration names +inlinevar-rgx=[A-Za-z_][A-Za-z0-9_]*$ + +# Naming style matching correct class names. +class-naming-style=PascalCase + +# Regular expression matching correct class names +class-rgx=[A-Z_][a-zA-Z0-9]+$ + + +# Naming style matching correct module names. +module-naming-style=snake_case + +# Regular expression matching correct module names +module-rgx=(([a-z_][a-z0-9_]*)|([A-Z][a-zA-Z0-9]+))$ + + +# Naming style matching correct method names. +method-naming-style=snake_case + +# Regular expression matching correct method names +method-rgx=[a-z_][a-z0-9_]{2,}$ + +# Regular expression which can overwrite the naming style set by typevar-naming-style. +#typevar-rgx= + +# Regular expression which should only match function or class names that do +# not require a docstring. Use ^(?!__init__$)_ to also check __init__. +no-docstring-rgx=__.*__ + +# Minimum line length for functions/classes that require docstrings, shorter +# ones are exempt. +docstring-min-length=-1 + +# List of decorators that define properties, such as abc.abstractproperty. +property-classes=abc.abstractproperty + + +[TYPECHECK] + +# Regex pattern to define which classes are considered mixins if ignore-mixin- +# members is set to 'yes' +mixin-class-rgx=.*MixIn + +# List of module names for which member attributes should not be checked +# (useful for modules/projects where namespaces are manipulated during runtime +# and thus existing member attributes cannot be deduced by static analysis). It +# supports qualified module names, as well as Unix pattern matching. +ignored-modules= + +# List of class names for which member attributes should not be checked (useful +# for classes with dynamically set attributes). This supports the use of +# qualified names. +ignored-classes=SQLObject, optparse.Values, thread._local, _thread._local + +# List of members which are set dynamically and missed by pylint inference +# system, and so shouldn't trigger E1101 when accessed. Python regular +# expressions are accepted. +generated-members=REQUEST,acl_users,aq_parent,argparse.Namespace + +# List of decorators that create context managers from functions, such as +# contextlib.contextmanager. +contextmanager-decorators=contextlib.contextmanager + +# Tells whether to warn about missing members when the owner of the attribute +# is inferred to be None. +ignore-none=yes + +# This flag controls whether pylint should warn about no-member and similar +# checks whenever an opaque object is returned when inferring. The inference +# can return multiple potential results while evaluating a Python object, but +# some branches might not be evaluated, which results in partial inference. In +# that case, it might be useful to still emit no-member and other checks for +# the rest of the inferred objects. 
+ignore-on-opaque-inference=yes + +# Show a hint with possible names when a member name was not found. The aspect +# of finding the hint is based on edit distance. +missing-member-hint=yes + +# The minimum edit distance a name should have in order to be considered a +# similar match for a missing member name. +missing-member-hint-distance=1 + +# The total number of similar names that should be taken in consideration when +# showing a hint for a missing member. +missing-member-max-choices=1 + +[SPELLING] + +# Spelling dictionary name. Available dictionaries: none. To make it working +# install python-enchant package. +spelling-dict= + +# List of comma separated words that should not be checked. +spelling-ignore-words= + +# List of comma separated words that should be considered directives if they +# appear and the beginning of a comment and should not be checked. +spelling-ignore-comment-directives=fmt: on,fmt: off,noqa:,noqa,nosec,isort:skip,mypy:,pragma:,# noinspection + +# A path to a file that contains private dictionary; one word per line. +spelling-private-dict-file=.pyenchant_pylint_custom_dict.txt + +# Tells whether to store unknown words to indicated private dictionary in +# --spelling-private-dict-file option instead of raising a message. +spelling-store-unknown-words=no + +# Limits count of emitted suggestions for spelling mistakes. +max-spelling-suggestions=2 + + +[DESIGN] + +# Maximum number of arguments for function / method +max-args=12 + +# Maximum number of locals for function / method body +max-locals=25 + +# Maximum number of return / yield for function / method body +max-returns=11 + +# Maximum number of branch for function / method body +max-branches=27 + +# Maximum number of statements in function / method body +max-statements=100 + +# Maximum number of parents for a class (see R0901). +max-parents=7 + +# List of qualified class names to ignore when counting class parents (see R0901). +ignored-parents= + +# Maximum number of attributes for a class (see R0902). +max-attributes=11 + +# Minimum number of public methods for a class (see R0903). +min-public-methods=2 + +# Maximum number of public methods for a class (see R0904). +max-public-methods=25 + +# Maximum number of boolean expressions in an if statement (see R0916). +max-bool-expr=5 + +# List of regular expressions of class ancestor names to +# ignore when counting public methods (see R0903). +exclude-too-few-public-methods= + +[CLASSES] + +# List of method names used to declare (i.e. assign) instance attributes. +defining-attr-methods=__init__,__new__,setUp,__post_init__ + +# List of valid names for the first argument in a class method. +valid-classmethod-first-arg=cls + +# List of valid names for the first argument in a metaclass class method. +valid-metaclass-classmethod-first-arg=mcs + +# List of member names, which should be excluded from the protected access +# warning. +exclude-protected=_asdict,_fields,_replace,_source,_make + +# Warn about protected attribute access inside special methods +check-protected-access-in-special-methods=no + +[IMPORTS] + +# List of modules that can be imported at any level, not just the top level +# one. +allow-any-import-level= + +# Allow wildcard imports from modules that define __all__. +allow-wildcard-with-all=no + +# Analyse import fallback blocks. This can be used to support both Python 2 and +# 3 compatible code, which means that the block might have code that exists +# only in one or another interpreter, leading to false positives when analysed. 
+analyse-fallback-blocks=no + +# Deprecated modules which should not be used, separated by a comma +deprecated-modules=regsub,TERMIOS,Bastion,rexec + +# Create a graph of every (i.e. internal and external) dependencies in the +# given file (report RP0402 must not be disabled) +import-graph= + +# Create a graph of external dependencies in the given file (report RP0402 must +# not be disabled) +ext-import-graph= + +# Create a graph of internal dependencies in the given file (report RP0402 must +# not be disabled) +int-import-graph= + +# Force import order to recognize a module as part of the standard +# compatibility libraries. +known-standard-library= + +# Force import order to recognize a module as part of a third party library. +known-third-party=enchant + +# Couples of modules and preferred modules, separated by a comma. +preferred-modules= + + +[EXCEPTIONS] + +# Exceptions that will emit a warning when being caught. Defaults to +# "Exception" +overgeneral-exceptions=Exception + + +[TYPING] + +# Set to ``no`` if the app / library does **NOT** need to support runtime +# introspection of type annotations. If you use type annotations +# **exclusively** for type checking of an application, you're probably fine. +# For libraries, evaluate if some users what to access the type hints at +# runtime first, e.g., through ``typing.get_type_hints``. Applies to Python +# versions 3.7 - 3.9 +runtime-typing = no + + +[DEPRECATED_BUILTINS] + +# List of builtins function names that should not be used, separated by a comma +bad-functions=map,input + + +[REFACTORING] + +# Maximum number of nested blocks for function / method body +max-nested-blocks=5 + +# Complete name of functions that never returns. When checking for +# inconsistent-return-statements if a never returning function is called then +# it will be considered as an explicit return statement and no message will be +# printed. +never-returning-functions=sys.exit,argparse.parse_error + + +[STRING] + +# This flag controls whether inconsistent-quotes generates a warning when the +# character used as a quote delimiter is used inconsistently within a module. +check-quote-consistency=no + +# This flag controls whether the implicit-str-concat should generate a warning +# on implicit string concatenation in sequences defined over several lines. +check-str-concat-over-line-jumps=no + + +[CODE_STYLE] + +# Max line length for which to sill emit suggestions. Used to prevent optional +# suggestions which would get split by a code formatter (e.g., black). Will +# default to the setting for ``max-line-length``. +#max-line-length-suggestions= \ No newline at end of file diff --git a/README.md b/README.md new file mode 100755 index 0000000..f1fd35d --- /dev/null +++ b/README.md @@ -0,0 +1,208 @@ +# Workbench-Agent + +## Overview + +The **Workbench-Agent** is a Python script used for integrating with **FossID Workbench** in CI/CD pipelines. It leverages the +Workbench API in order to upload code, scan code and retrieve various types of results. + +There are various scenarios for integrating the Workbench into a CI/CD pipeline, each with its own pros and cons. Those +scenarios are presented in the Workbench documentation. + +At this moment the Workbench-Agent supports two scenarios: + +- Upload code directly to Workbench + +- Generate hashes locally using **fossid-cli** and upload those to Workbench (also known as a blind scan). + +### 1. 
Upload code directly to Workbench
+
+The Workbench-Agent calls the Workbench API and creates the project and scan (or reuses an existing one with the given project/scan code).
+
+It uploads the files from the given path via the Workbench API. The extract-archives API action is also called to expand any uploaded archives.
+
+It initiates the scan, usually with auto-identification and delta scan enabled.
+
+It checks the scan status in a loop, with a maximum time limit to stop on malfunctioning scans.
+
+When the scan finishes, it can return various types of results: the list of all identified licenses, the list of all identified components,
+or policy warnings at scan or project level. It also saves the results to a file specified by the parameter --path-result PATH_RESULT.
+
+Below are some pros and cons compared with other integration scenarios:
+
+#### Pros:
+- local file content is available when inspecting the files in Workbench
+
+- no need to manually expand .war/.jar files; this is handled by the Workbench
+
+#### Cons:
+- much larger uploads to the Workbench, possibly resulting in longer execution time of the pipeline.
+
+
+
+### 2. Generate hashes using fossid-cli and upload those to Workbench (blind scan)
+
+This scenario requires fossid-cli for generating file signatures using the --local flag. The Workbench-Agent is usually distributed in a
+container image that also contains fossid-cli and the Shinobi License Extractor. This image can easily be pulled in CI/CD pipelines from
+a container repository.
+
+It saves the file signatures to a temporary file with the .fossid extension.
+
+It calls the Workbench API and creates the project and scan (or reuses an existing one with the given project/scan code).
+
+It uploads the .fossid file via the Workbench API.
+
+It initiates the scan, usually with auto-identification and delta scan enabled.
+
+It checks the scan status in a loop, with a maximum time limit to stop on malfunctioning scans.
+
+When the scan finishes, it can return various types of results: the list of all identified licenses, the list of all identified components,
+or policy warnings at scan or project level. It also saves the results to a file specified by the parameter --path-result PATH_RESULT.
+
+Below are some pros and cons compared with other integration scenarios:
+
+#### Pros:
+
+- no need to make the code available to Workbench, avoiding large uploads
+
+- easy setup
+
+#### Cons:
+
+- the scanned (local) files will not be available for comparison with matches in the Workbench UI.
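+
+For illustration, a minimal invocation for this scenario might look like the sketch below; the `--blind_scan` flag is described in the
+Usage section, and the server URL, credentials, and paths are placeholders:
+
+```bash
+ python3 workbench-agent.py --api_url=https://myserver.com/api.php \
+    --api_user=my_user \
+    --api_token=xxxxxxxxx \
+    --project_code=prod \
+    --scan_code=${BUILD_NUMBER} \
+    --blind_scan \
+    --path='/some/path/to/files/to/be/scanned' \
+    --path-result='/tmp/fossid_result.json'
+```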
+
+
+## Installation
+
+Copy the "workbench-agent.py" file to a server with Python installed and with access to a Workbench API.
+Install dependencies:
+
+```bash
+pip install -r requirements.txt
+```
+
+
+## Usage
+Example:
+```bash
+ python3 workbench-agent.py --api_url=https://myserver.com/api.php \
+    --api_user=my_user \
+    --api_token=xxxxxxxxx \
+    --project_code=prod \
+    --scan_code=${BUILD_NUMBER} \
+    --limit=10 \
+    --sensitivity=10 \
+    --auto_identification_detect_declaration \
+    --auto_identification_detect_copyright \
+    --delta_only \
+    --scan_number_of_tries=100 \
+    --scan_wait_time=30 \
+    --path='/some/path/to/files/to/be/scanned' \
+    --path-result='/tmp/fossid_result.json'
+```
+Detailed parameters description:
+```bash
+ python3 workbench-agent.py --help
+usage: python3 workbench-agent.py [-h] --api_url API_URL --api_user API_USER --api_token API_TOKEN --project_code PROJECT_CODE --scan_code SCAN_CODE [--limit LIMIT]
+                                  [--sensitivity SENSITIVITY] [--auto_identification_detect_declaration] [--auto_identification_detect_copyright]
+                                  [--auto_identification_resolve_pending_ids] [--delta_only] [--reuse_identifications]
+                                  [--identification_reuse_type {any,only_me,specific_project,specific_scan}] [--specific_code SPECIFIC_CODE]
+                                  [--scan_number_of_tries SCAN_NUMBER_OF_TRIES] [--scan_wait_time SCAN_WAIT_TIME] --path PATH [--log LOG] [--get_scan_identified_components]
+
+Run FossID Workbench Agent
+
+required arguments:
+  --api_url API_URL     URL of the Workbench API instance, Ex: https://myserver.com/api.php
+  --api_user API_USER   Workbench user that will make API calls
+  --api_token API_TOKEN
+                        Workbench user API token (not the same as the user password!)
+  --project_code PROJECT_CODE
+                        Name of the project inside Workbench where the scan will be created.
+                        If the project doesn't exist, it will be created
+  --scan_code SCAN_CODE
+                        The scan code used when creating the scan in Workbench. It can be based on some env var,
+                        for example: ${BUILD_NUMBER}
+  --scan_number_of_tries SCAN_NUMBER_OF_TRIES
+                        Number of calls to "check_status" till declaring the scan failed from the point of view of the agent.
+  --scan_wait_time SCAN_WAIT_TIME
+                        Time interval between calling "check_status", expressed in seconds (default 30 seconds)
+  --path PATH           Path of the directory where the files to be scanned reside
+
+optional arguments:
+  -h, --help            show this help message and exit
+  --limit LIMIT         Limits CLI results to N most significant matches (default: 10)
+  --sensitivity SENSITIVITY
+                        Sets snippet sensitivity to a minimum of N lines (default: 10)
+  --auto_identification_detect_declaration
+                        Automatically detect license declarations inside files. This argument expects no value; not passing
+                        this argument is equivalent to assigning false.
+  --auto_identification_detect_copyright
+                        Automatically detect copyright statements inside files. This argument expects no value; not passing
+                        this argument is equivalent to assigning false.
+  --auto_identification_resolve_pending_ids
+                        Automatically resolve pending identifications. This argument expects no value; not passing
+                        this argument is equivalent to assigning false.
+  --delta_only          Scan only the delta (newly added files since the last scan).
+  --run_dependency_analysis
+                        Initiate dependency analysis after finishing scanning for matches in KB.
+  --run_only_dependency_analysis
+                        Scan only for dependencies, no results from KB.
+  --reuse_identifications
+                        If present, try to reuse an existing identification depending on parameter 'identification_reuse_type'.
+  --identification_reuse_type {any,only_me,specific_project,specific_scan}
+                        Based on the reuse type, the last identification found will be used for files with the same hash.
+  --specific_code SPECIFIC_CODE
+                        Code of the specific project or scan to reuse identifications from.
+                        Only used when 'identification_reuse_type' is 'specific_project' or 'specific_scan'.
+  --log LOG             Specify the logging level. Allowed values: DEBUG, INFO, WARNING, ERROR
+  --path-result PATH_RESULT
+                        Save results to the specified path
+
+  --get_scan_identified_components
+                        By default, at the end of scanning, the list of identified licenses is retrieved.
+                        When passing this parameter the agent will return the list of identified components instead.
+                        This argument expects no value; not passing this argument is equivalent to assigning false.
+  --scans_get_policy_warnings_counter
+                        By default, at the end of scanning, the list of identified licenses is retrieved.
+                        When passing this parameter the agent will return information about policy warnings found in this scan
+                        based on policy rules set at Project level.
+                        This argument expects no value; not passing this argument is equivalent to assigning false.
+
+  --projects_get_policy_warnings_info
+                        By default, at the end of scanning, the list of identified licenses is retrieved.
+                        When passing this parameter the agent will return information about policy warnings for the project,
+                        including the warnings counter.
+                        This argument expects no value; not passing this argument is equivalent to assigning false.
+
+
+```
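+
+When `--path-result` points to a directory, the agent writes the results to a file named `wb_results.json` inside it. A follow-up
+pipeline step can then act on those results. The sketch below fails the build when policy warnings were reported; it assumes the agent
+ran with `--scans_get_policy_warnings_counter`, that `jq` is available, and it uses `policy_warnings_total` as an illustrative key name
+(the exact JSON structure depends on the Workbench API response):
+
+```bash
+# Fail the pipeline if the saved results report any policy warnings.
+warnings=$(jq -r '.policy_warnings_total // 0' /tmp/fossid_result.json)
+if [ "$warnings" -gt 0 ]; then
+  echo "Policy warnings found: $warnings"
+  exit 1
+fi
+```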
+
+
+## Contributing
+
+Thank you for considering contributing to the FossID Workbench-Agent. The easiest way to contribute is by reporting bugs or by
+sending improvement suggestions. The FossID Support Portal is the preferred channel for these, but the Issues section of the
+GitHub repository is an alternative channel.
+
+Pull requests are also welcome. Please note that the Workbench-Agent is licensed under the MIT license.
+The submission of your contribution implies that you agree with the MIT licensing terms.
+
+## Development
+
+We make an effort to comply with the PEP 8 style guide (https://peps.python.org/pep-0008/).
+Run this command to check for code style issues:
+```bash
+ pycodestyle workbench-agent.py
+```
+Many issues can be fixed automatically using the 'black' formatter:
+```bash
+ black workbench-agent.py
+```
+Linting
+
+Run pylint to reveal possible issues:
+```bash
+ pylint workbench-agent.py
+```
diff --git a/requirements.txt b/requirements.txt
new file mode 100755
index 0000000..787c11a
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,5 @@
+requests
+python-dotenv
+pycodestyle
+pylint
+black
diff --git a/setup.cfg b/setup.cfg
new file mode 100644
index 0000000..d3c0fc6
--- /dev/null
+++ b/setup.cfg
@@ -0,0 +1,5 @@
+[pycodestyle]
+count = False
+ignore = E1,E23,E722,W503
+max-line-length = 120
+statistics = True
diff --git a/workbench-agent.py b/workbench-agent.py
new file mode 100755
index 0000000..876ddc1
--- /dev/null
+++ b/workbench-agent.py
@@ -0,0 +1,1269 @@
+#!/usr/bin/env python3
+
+# Copyright: FossID AB 2022
+
+import builtins
+import json
+import time
+import logging
+import argparse
+import random
+import base64
+import os
+import subprocess
+from argparse import RawTextHelpFormatter
+import sys
+import traceback
+import requests
+
+# from dotenv import load_dotenv
+logger = logging.getLogger("log")
+
+
+class Workbench:
+    """
+    A class to interact with the FossID Workbench API for managing scans and projects.
+
+    Attributes:
+        api_url (str): The base URL of the Workbench API.
+        api_user (str): The username used for API authentication.
+        api_token (str): The API token for authentication.
+    """
+
+    def __init__(self, api_url: str, api_user: str, api_token: str):
+        """
+        Initializes the Workbench object with API credentials and endpoint.
+
+        Args:
+            api_url (str): The base URL of the Workbench API.
+            api_user (str): The username used for API authentication.
+            api_token (str): The API token for authentication.
+        """
+        self.api_url = api_url
+        self.api_user = api_user
+        self.api_token = api_token
+
+    def _send_request(self, payload: dict) -> dict:
+        """
+        Sends a request to the Workbench API.
+
+        Args:
+            payload (dict): The payload of the request.
+
+        Returns:
+            dict: The JSON response from the API.
+        """
+        url = self.api_url
+        headers = {
+            "Accept": "*/*",
+            "Content-Type": "application/json; charset=utf-8",
+        }
+        req_body = json.dumps(payload)
+        logger.debug("url %s", url)
+        logger.debug("headers %s", headers)
+        logger.debug(req_body)
+        response = requests.request(
+            "POST", url, headers=headers, data=req_body, timeout=1800
+        )
+        logger.debug(response.text)
+        return json.loads(response.text)
+
+    def upload_files(self, scan_code: str, path: str):
+        """
+        Uploads a .fossid file to the Workbench using the API's Upload endpoint.
+
+        Args:
+            scan_code (str): The code of the scan where the hashes should be uploaded.
+            path (str): Path to the blind scan result (.fossid file).
+        """
+        name = base64.b64encode(os.path.basename(path).encode()).decode("utf-8")
+        scan_code_b64 = base64.b64encode(scan_code.encode()).decode("utf-8")
+        headers = {"FOSSID-SCAN-CODE": scan_code_b64, "FOSSID-FILE-NAME": name}
+        try:
+            with open(path, "rb") as file:
+                resp = requests.post(
+                    self.api_url,
+                    headers=headers,
+                    data=file,
+                    auth=(self.api_user, self.api_token),
+                    timeout=1800,
+                )
+                try:
+                    resp.json()
+                except ValueError:
+                    print(f"Failed to decode json {resp.text}")
+                    traceback.print_exc()
+                    sys.exit(1)
+        except IOError:
+            # Error opening file
+            print(f"Failed to upload hashes for scan {scan_code}")
+            traceback.print_exc()
+            sys.exit(1)
+
+    def _delete_existing_scan(self, scan_code: str):
+        """
+        Deletes a scan.
+
+        Args:
+            scan_code (str): The code of the scan to be deleted.
+
+        Returns:
+            dict: The JSON response from the API.
+        """
+        payload = {
+            "group": "scans",
+            "action": "delete",
+            "data": {
+                "username": self.api_user,
+                "key": self.api_token,
+                "scan_code": scan_code,
+                "delete_identifications": "true",
+            },
+        }
+        return self._send_request(payload)
+
+    def create_webapp_scan(self, scan_code: str, project_code: str = None) -> int:
+        """
+        Creates a new web application scan in the Workbench.
+
+        Args:
+            scan_code (str): The unique identifier for the scan.
+            project_code (str, optional): The project code within which to create the scan.
+
+        Returns:
+            int: The ID of the newly created scan.
+ """ + payload = { + "group": "scans", + "action": "create", + "data": { + "username": self.api_user, + "key": self.api_token, + "scan_code": scan_code, + "scan_name": scan_code, + "project_code": project_code, + "description": "Automatically created scan by Workbench Agent script.", + }, + } + response = self._send_request(payload) + if response["status"] != "1": + raise builtins.Exception( + "Failed to create scan {}: {}".format(scan_code, response) + ) + if "error" in response.keys(): + raise builtins.Exception( + "Failed to create scan {}: {}".format(scan_code, response["error"]) + ) + return response["data"]["scan_id"] + + def _get_scan_status(self, scan_type: str, scan_code: str): + """ + Calls API scans -> check_status to determine if the process is finished. + + Args: + scan_type (str): One of these: SCAN, REPORT_IMPORT, DEPENDENCY_ANALYSIS, REPORT_GENERATION, DELETE_SCAN. + scan_code (str): The unique identifier for the scan. + + Returns: + dict: The data section from the JSON response returned from API. + """ + payload = { + "group": "scans", + "action": "check_status", + "data": { + "username": self.api_user, + "key": self.api_token, + "scan_code": scan_code, + "type": scan_type, + }, + } + response = self._send_request(payload) + if response["status"] != "1": + raise builtins.Exception( + "Failed to retrieve scan status from \ + scan {}: {}".format( + scan_code, response["error"] + ) + ) + return response["data"] + + def start_dependency_analysis(self, scan_code: str): + """ + Initiate dependency analysis for a scan. + + Args: + scan_code (str): The unique identifier for the scan. + """ + payload = { + "group": "scans", + "action": "run_dependency_analysis", + "data": { + "username": self.api_user, + "key": self.api_token, + "scan_code": scan_code, + }, + } + response = self._send_request(payload) + if response["status"] != "1": + raise builtins.Exception( + "Failed to start dependency analysis scan {}: {}".format( + scan_code, response["error"] + ) + ) + + def wait_for_scan_to_finish( + self, + scan_type: str, + scan_code: str, + scan_number_of_tries: int, + scan_wait_time: int, + ): + """ + Check if the scan finished after each 'scan_wait_time' seconds for 'scan_number_of_tries' number of tries. + If the scan is finished return true. If the scan is not finished after all tries throw Exception. + + Args: + scan_type (str): Types: SCAN, REPORT_IMPORT, DEPENDENCY_ANALYSIS, REPORT_GENERATION, DELETE_SCAN + scan_code (str): Unique scan identifier. + scan_number_of_tries (int): Number of calls to "check_status" till declaring the scan failed. + scan_wait_time (int): Time interval between calling "check_status", expressed in seconds + + Returns: + bool + """ + # pylint: disable-next=unused-variable + for x in range(scan_number_of_tries): + scan_status = self._get_scan_status(scan_type, scan_code) + is_finished = ( + scan_status["is_finished"] + or scan_status["is_finished"] == "1" + or scan_status["status"] == "FAILED" + or scan_status["status"] == "FINISHED" + ) + if is_finished: + if ( + scan_status["percentage_done"] == "100%" + or scan_status["percentage_done"] == 100 + or ( + scan_type == "DEPENDENCY_ANALYSIS" + and ( + scan_status["percentage_done"] == "0%" + or scan_status["percentage_done"] == "0%%" + ) + ) + ): + print( + "Scan percentage_done = 100%, scan has finished. 
Status: {}".format( + scan_status["status"] + ) + ) + return True + raise builtins.Exception( + "Scan finished with status: {} percentage: {} ".format( + scan_status["status"], scan_status["percentage_done"] + ) + ) + # If scan did not finished, print info about progress + print( + "Scan {} is running. Percentage done: {}% Status: {}".format( + scan_code, scan_status["percentage_done"], scan_status["status"] + ) + ) + # Wait given time + time.sleep(scan_wait_time) + # If this code is reached it means the scan didn't finished after scan_number_of_tries X scan_wait_time + print("{} timeout: {}".format(scan_type, scan_code)) + raise builtins.Exception("scan timeout") + + def _get_pending_files(self, scan_code: str): + """ + Call API scans -> get_pending_files. + + Args: + scan_code (str): The unique identifier for the scan. + + Returns: + dict: The JSON response from the API. + """ + payload = { + "group": "scans", + "action": "get_pending_files", + "data": { + "username": self.api_user, + "key": self.api_token, + "scan_code": scan_code, + }, + } + response = self._send_request(payload) + if response["status"] == "1" and "data" in response.keys(): + return response["data"] + # all other situations + raise builtins.Exception( + "Error getting pending files \ + result: {}".format( + response + ) + ) + + def scans_get_policy_warnings_counter(self, scan_code: str): + """ + Retrieve policy warnings information at scan level. + + Args: + scan_code (str): The unique identifier for the scan. + + Returns: + dict: The JSON response from the API. + """ + payload = { + "group": "scans", + "action": "get_policy_warnings_counter", + "data": { + "username": self.api_user, + "key": self.api_token, + "scan_code": scan_code, + }, + } + response = self._send_request(payload) + if response["status"] == "1" and "data" in response.keys(): + return response["data"] + raise builtins.Exception( + "Error getting project policy warnings information \ + result: {}".format( + response + ) + ) + + def projects_get_policy_warnings_info(self, project_code: str): + """ + Retrieve policy warnings information at project level. + + Args: + project_code (str): The unique identifier for the project. + + Returns: + dict: The JSON response from the API. + """ + payload = { + "group": "projects", + "action": "get_policy_warnings_info", + "data": { + "username": self.api_user, + "key": self.api_token, + "project_code": project_code, + }, + } + response = self._send_request(payload) + if response["status"] == "1" and "data" in response.keys(): + return response["data"] + raise builtins.Exception( + "Error getting project policy warnings information \ + result: {}".format( + response + ) + ) + + def get_scan_identified_components(self, scan_code: str): + """ + Retrieve the list of identified components from one scan. + + Args: + scan_code (str): The unique identifier for the scan. + + Returns: + dict: The JSON response from the API. + """ + payload = { + "group": "scans", + "action": "get_scan_identified_components", + "data": { + "username": self.api_user, + "key": self.api_token, + "scan_code": scan_code, + }, + } + response = self._send_request(payload) + if response["status"] == "1" and "data" in response.keys(): + return response["data"] + raise builtins.Exception( + "Error getting identified components \ + result: {}".format( + response + ) + ) + + def get_scan_identified_licenses(self, scan_code: str): + """ + Retrieve the list of identified licenses from one scan. + + Args: + scan_code (str): The unique identifier for the scan. 
+ + Returns: + dict: The JSON response from the API. + """ + payload = { + "group": "scans", + "action": "get_scan_identified_licenses", + "data": { + "username": self.api_user, + "key": self.api_token, + "scan_code": scan_code, + "unique": "1", + }, + } + response = self._send_request(payload) + if response["status"] == "1" and "data" in response.keys(): + return response["data"] + raise builtins.Exception( + "Error getting identified licenses \ + result: {}".format( + response + ) + ) + + def _get_dependency_analysis_result(self, scan_code: str): + """ + Retrieve dependency analysis results. + + Args: + scan_code (str): The unique identifier for the scan. + + Returns: + dict: The JSON response from the API. + """ + payload = { + "group": "scans", + "action": "get_dependency_analysis_results", + "data": { + "username": self.api_user, + "key": self.api_token, + "scan_code": scan_code, + }, + } + response = self._send_request(payload) + if response["status"] == "1" and "data" in response.keys(): + return response["data"] + + raise builtins.Exception( + "Error getting dependency analysis \ + result: {}".format( + response + ) + ) + + def _cancel_scan(self, scan_code: str): + """ + Cancel a scan. + + Args: + scan_code (str): The unique identifier for the scan. + """ + payload = { + "group": "scans", + "action": "cancel_run", + "data": { + "username": self.api_user, + "key": self.api_token, + "scan_code": scan_code, + }, + } + response = self._send_request(payload) + if response["status"] != "1": + raise builtins.Exception("Error cancelling scan: {}".format(response)) + + def _assert_scan_can_start(self, scan_code: str): + """ + Verify if a new scan can be initiated. + + Args: + scan_code (str): The unique identifier for the scan. + """ + scan_status = self._get_scan_status("SCAN", scan_code) + # List of possible scan statuses taken from Workbench code: + # public const NEW = 'NEW'; + # public const QUEUED = 'QUEUED'; + # public const STARTING = 'STARTING'; + # public const RUNNING = 'RUNNING'; + # public const FINISHED = 'FINISHED'; + # public const FAILED = 'FAILED'; + if scan_status["status"] not in ["NEW", "FINISHED", "FAILED"]: + raise builtins.Exception( + "Cannot start scan. Current status of the scan is {}.".format( + scan_status["status"] + ) + ) + + def assert_dependency_analysis_can_start(self, scan_code: str): + """ + Verify if a new dependency analysis scan can be initiated. + + Args: + scan_code (str): The unique identifier for the scan. + """ + scan_status = self._get_scan_status("DEPENDENCY_ANALYSIS", scan_code) + # List of possible scan statuses taken from Workbench code: + # public const NEW = 'NEW'; + # public const QUEUED = 'QUEUED'; + # public const STARTING = 'STARTING'; + # public const RUNNING = 'RUNNING'; + # public const FINISHED = 'FINISHED'; + # public const FAILED = 'FAILED'; + if scan_status["status"] not in ["NEW", "FINISHED", "FAILED"]: + raise builtins.Exception( + "Cannot start dependency analysis. Current status of the scan is {}.".format( + scan_status["status"] + ) + ) + + def extract_archives( + self, + scan_code: str, + recursively_extract_archives: bool, + jar_file_extraction: bool, + ): + """ + Extract archive + + Args: + scan_code (str): The unique identifier for the scan. 
+ recursively_extract_archives (bool): Yes or no + jar_file_extraction (bool): Yes or no + + Returns: + bool: true for successful API call + """ + payload = { + "group": "scans", + "action": "extract_archives", + "data": { + "username": self.api_user, + "key": self.api_token, + "scan_code": scan_code, + "recursively_extract_archives": recursively_extract_archives, + "jar_file_extraction": jar_file_extraction, + }, + } + response = self._send_request(payload) + if response["status"] == "0": + raise builtins.Exception( + "Call extract_archives returned error: {}".format(response) + ) + return True + + def check_if_scan_exists(self, scan_code: str): + """ + Check if scan exists. + + Args: + scan_code (str): The unique identifier for the scan. + + Returns: + bool: Yes or no. + """ + payload = { + "group": "scans", + "action": "get_information", + "data": { + "username": self.api_user, + "key": self.api_token, + "scan_code": scan_code, + }, + } + response = self._send_request(payload) + if response["status"] == "1": + return True + else: + return False + + def check_if_project_exists(self, project_code: str): + """ + Check if project exists. + + Args: + project_code (str): The unique identifier for the scan. + + Returns: + bool: Yes or no. + """ + payload = { + "group": "projects", + "action": "get_information", + "data": { + "username": self.api_user, + "key": self.api_token, + "project_code": project_code, + }, + } + response = self._send_request(payload) + if response["status"] == "0": + return False + # if response["status"] == "0": + # raise builtins.Exception("Failed to get project status: {}".format(response)) + return True + + def create_project(self, project_code: str): + """ + Create new project + + Args: + project_code (str): The unique identifier for the scan. 
+ """ + payload = { + "group": "projects", + "action": "create", + "data": { + "username": self.api_user, + "key": self.api_token, + "project_code": project_code, + "project_name": project_code, + "description": "Automatically created by Workbench Agent script", + }, + } + response = self._send_request(payload) + if response["status"] != "1": + raise builtins.Exception("Failed to create project: {}".format(response)) + print("Created project {}".format(project_code)) + + def run_scan( + self, + scan_code: str, + limit: int, + sensitivity: int, + auto_identification_detect_declaration: bool, + auto_identification_detect_copyright: bool, + auto_identification_resolve_pending_ids: bool, + delta_only: bool, + reuse_identification: bool, + identification_reuse_type: str = None, + specific_code: str = None, + ): + """ + + Args: + scan_code (str): Unique scan identifier + limit (int): Limit the number of matches against the KB + sensitivity (int): Result sensitivity + auto_identification_detect_declaration (bool): Automatically detect license declaration inside files + auto_identification_detect_copyright (bool): Automatically detect copyright statements inside files + auto_identification_resolve_pending_ids (bool): Automatically resolve pending identifications + delta_only (bool): Scan only new or modified files + reuse_identification (bool): Reuse previous identifications + identification_reuse_type (str): Possible values: any,only_me,specific_project,specific_scan + specific_code (str): Fill only when reuse type: specific_project or specific_scan + + Returns: + + """ + scan_exists = self.check_if_scan_exists(scan_code) + if not scan_exists: + raise builtins.Exception( + "Scan with scan_code: {} doesn't exist when calling 'run' action!".format( + scan_code + ) + ) + + self._assert_scan_can_start(scan_code) + print("Starting scan {}".format(scan_code)) + payload = { + "group": "scans", + "action": "run", + "data": { + "username": self.api_user, + "key": self.api_token, + "scan_code": scan_code, + "limit": limit, + "sensitivity": sensitivity, + "auto_identification_detect_declaration": int( + auto_identification_detect_declaration + ), + "auto_identification_detect_copyright": int( + auto_identification_detect_copyright + ), + "auto_identification_resolve_pending_ids": int( + auto_identification_resolve_pending_ids + ), + "delta_only": int(delta_only), + }, + } + if reuse_identification: + data = payload["data"] + data["reuse_identification"] = "1" + # 'any', 'only_me', 'specific_project', 'specific_scan' + if identification_reuse_type in {"specific_project", "specific_scan"}: + data["identification_reuse_type"] = identification_reuse_type + data["specific_code"] = specific_code + else: + data["identification_reuse_type"] = identification_reuse_type + + response = self._send_request(payload) + if response["status"] != "1": + logger.error( + "Failed to start scan {}: {} payload {}".format( + scan_code, response, payload + ) + ) + raise builtins.Exception( + "Failed to start scan {}: {}".format(scan_code, response["error"]) + ) + return response + + +class CliWrapper: + """ + A class to interact with the FossID CLI. 
+ + Attributes: + cli_path (string): Path to the executable file "fossid" + config_path (string): Path to the configuration file "fossid.conf" + timeout (int): timeout for CLI expressed in seconds + """ + + # __parameters (dictionary): Dictionary of parameters passed to 'fossid-cli' + __parameters = {} + + def __init__(self, cli_path, config_path, timeout="120"): + self.cli_path = cli_path + self.config_path = config_path + self.timeout = timeout + + # Executes fossid-cli --version + # Returns string + def get_version(self): + """ + Get CLI version + + Args: + self + + Returns: + str + """ + args = ["timeout", self.timeout, self.cli_path, "--version"] + try: + result = subprocess.check_output(args, stderr=subprocess.STDOUT) + except subprocess.CalledProcessError as e: + return ( + "Calledprocerr: " + + str(e.cmd) + + " " + + str(e.returncode) + + " " + + str(e.output) + ) + # pylint: disable-next=broad-except + except Exception as e: + return "Error: " + str(e) + + return result + + def blind_scan(self, path): + """ + Call fossid-cli on a given path in order to generate hashes of the files from that path + + Args: + path (str): path of the code to be scanned + + Returns: + str: path to temporary .fossid file containing generated hashes + """ + temporary_file_path = "/tmp/blind_scan_result_" + self.randstring(8) + ".fossid" + # Create temporary file, make it empty if already exists + # pylint: disable-next=consider-using-with,unspecified-encoding + open(temporary_file_path, "w").close() + my_cmd = f"timeout {self.timeout} {self.cli_path} --local --enable-sha1=1 {path} > {temporary_file_path}" + try: + # pylint: disable-next=unspecified-encoding + with open(temporary_file_path, "w") as outfile: + subprocess.check_output(my_cmd, shell=True, stderr=outfile) + # result = subprocess.check_output(args, stderr=subprocess.STDOUT) + except subprocess.CalledProcessError as e: + print( + "Calledprocerr: " + + str(e.cmd) + + " " + + str(e.returncode) + + " " + + str(e.output) + ) + print(traceback.format_exc()) + sys.exit() + # pylint: disable-next=broad-except + except Exception as e: + print("Error: " + str(e)) + print(traceback.format_exc()) + sys.exit() + + return temporary_file_path + + @staticmethod + def randstring(length=10): + """ + Generate a random string of a given length + + Parameters: + length (int): Length of the generated string + + Returns: + str + """ + valid_letters = "ABCDEFGHIJKLMNOPQRSTUVWXYZ" + return "".join((random.choice(valid_letters) for i in range(0, length))) + + +def parse_cmdline_args(): + """ + Parses command line arguments for the script. + + Returns: + argparse.Namespace: An object containing the parsed command line arguments. 
+ """ + parser = argparse.ArgumentParser( + add_help=False, + description="Run FossID Workbench Agent", + formatter_class=RawTextHelpFormatter, + ) + required = parser.add_argument_group("required arguments") + optional = parser.add_argument_group("optional arguments") + + # Add back help + optional.add_argument( + "-h", + "--help", + action="help", + default=argparse.SUPPRESS, + help="show this help message and exit", + ) + + required.add_argument( + "--api_url", + help="URL of the Workbench API instance, Ex: https://myserver.com/api.php", + type=str, + required=True, + ) + required.add_argument( + "--api_user", + help="Workbench user that will make API calls", + type=str, + required=True, + ) + required.add_argument( + "--api_token", + help="Workbench user API token (Not the same with user password!!!)", + type=str, + required=True, + ) + required.add_argument( + "--project_code", + help="Name of the project inside Workbench where the scan will be created.\n" + "If the project doesn't exist, it will be created", + type=str, + required=True, + ) + required.add_argument( + "--scan_code", + help="The scan code user when creating the scan in Workbench. It can be based on some env var,\n" + "for example: ${BUILD_NUMBER}", + type=str, + required=True, + ) + optional.add_argument( + "--limit", + help="Limits CLI results to N most significant matches (default: 10)", + type=int, + default=10, + ) + optional.add_argument( + "--sensitivity", + help="Sets snippet sensitivity to a minimum of N lines (default: 10)", + type=int, + default=10, + ) + optional.add_argument( + "--recursively_extract_archives", + help="Recursively extract nested archives. Default true.", + action="store_true", + default=True, + ) + optional.add_argument( + "--jar_file_extraction", + help="Control default behavior related to extracting jar files. Default false.", + action="store_true", + default=False, + ) + optional.add_argument( + "--blind_scan", + help="Call CLI and generate file hashes. Upload hashes and initiate blind scan.", + action="store_true", + default=False, + ) + + optional.add_argument( + "--run_dependency_analysis", + help="Initiate dependency analysis after finishing scanning for matches in KB.", + action="store_true", + default=False, + ) + optional.add_argument( + "--run_only_dependency_analysis", + help="Scan only for dependencies, no results from KB.", + action="store_true", + default=False, + ) + optional.add_argument( + "--auto_identification_detect_declaration", + help="Automatically detect license declaration inside files. This argument expects no value, not passing\n" + "this argument is equivalent to assigning false.", + action="store_true", + default=False, + ) + optional.add_argument( + "--auto_identification_detect_copyright", + help="Automatically detect copyright statements inside files. This argument expects no value, not passing\n" + "this argument is equivalent to assigning false.", + action="store_true", + default=False, + ) + optional.add_argument( + "--auto_identification_resolve_pending_ids", + help="Automatically resolve pending identifications. 
    optional.add_argument(
        "--delta_only",
        help="Scan only the delta (files newly added since the last scan).",
        action="store_true",
        default=False,
    )
    optional.add_argument(
        "--reuse_identifications",
        help="If present, try to reuse an existing identification depending on parameter 'identification_reuse_type'.",
        action="store_true",
        default=False,
        required=False,
    )
    optional.add_argument(
        "--identification_reuse_type",
        help="Based on the reuse type, the last identification found will be used for files with the same hash.",
        choices=["any", "only_me", "specific_project", "specific_scan"],
        default="any",
        type=str,
        required=False,
    )
    optional.add_argument(
        "--specific_code",
        help="Code of the specific project or scan to reuse identifications from, used when\n"
        "--identification_reuse_type is 'specific_project' or 'specific_scan'",
        type=str,
        required=False,
    )
    # The following arguments have defaults and are not required, so they are
    # registered in the "optional arguments" group.
    optional.add_argument(
        "--scan_number_of_tries",
        help="Number of calls to 'check_status' before declaring the scan failed from the point of view of the agent",
        type=int,
        default=960,  # This means 8 hours when --scan_wait_time has its default value of 30 seconds
        required=False,
    )
    optional.add_argument(
        "--scan_wait_time",
        help="Time interval between calls to 'check_status', expressed in seconds (default: 30 seconds)",
        type=int,
        default=30,
        required=False,
    )
    required.add_argument(
        "--path",
        help="Path of the directory where the files to be scanned reside",
        type=str,
        required=True,
    )

    optional.add_argument(
        "--log",
        help="Specify logging level. Allowed values: DEBUG, INFO, WARNING, ERROR",
        default="ERROR",
    )

    optional.add_argument(
        "--path-result",
        help="Save results to the specified path",
        type=str,
        required=False,
    )

    optional.add_argument(
        "--get_scan_identified_components",
        help="By default, the list of identified licenses is retrieved at the end of scanning.\n"
        "When this parameter is passed, the agent returns the list of identified components instead.\n"
        "This argument expects no value; not passing this argument is equivalent to assigning false.",
        action="store_true",
        default=False,
    )
    optional.add_argument(
        "--scans_get_policy_warnings_counter",
        help="By default, the list of identified licenses is retrieved at the end of scanning.\n"
        "When this parameter is passed, the agent returns information about policy warnings found in this scan,\n"
        "based on policy rules set at the project level.\n"
        "This argument expects no value; not passing this argument is equivalent to assigning false.",
        action="store_true",
        default=False,
    )
    optional.add_argument(
        "--projects_get_policy_warnings_info",
        help="By default, the list of identified licenses is retrieved at the end of scanning.\n"
        "When this parameter is passed, the agent returns information about policy warnings for the project,\n"
        "including the warnings counter.\n"
        "This argument expects no value; not passing this argument is equivalent to assigning false.",
        action="store_true",
        default=False,
    )

    args = parser.parse_args()
    return args
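
# Illustrative invocation (all values below are placeholders):
#
#   python workbench-agent.py \
#       --api_url https://myserver.com/api.php \
#       --api_user my_user \
#       --api_token my_token \
#       --project_code MY_PROJECT \
#       --scan_code MY_PROJECT_${BUILD_NUMBER} \
#       --path /path/to/sources \
#       --blind_scan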

def save_results(params, results):
    """
    Save the scanning results to the path given in --path-result.

    Parameters:
        params (argparse.Namespace): Parsed command line parameters.
        results (dict): The scan results to be saved.
    """
    if not params.path_result:
        return
    if os.path.isdir(params.path_result):
        # Existing directory: write wb_results.json inside it
        fname = os.path.join(params.path_result, "wb_results.json")
        try:
            with open(fname, "w") as file:
                file.write(json.dumps(results, indent=4))
            print(f"Results saved to: {fname}")
        except Exception:
            logger.debug(f"Error trying to write results to {fname}")
            print(f"Error trying to write results to {fname}")
    elif os.path.isfile(params.path_result):
        # Existing file: coerce its extension to .json and overwrite it
        _folder = os.path.dirname(params.path_result)
        _fname = os.path.basename(params.path_result)
        root, extension = os.path.splitext(_fname)
        if extension != ".json":
            _fname = root + ".json"
        fname = os.path.join(_folder, _fname)
        try:
            os.makedirs(_folder, exist_ok=True)
            try:
                with open(fname, "w") as file:
                    file.write(json.dumps(results, indent=4))
                print(f"Results saved to: {fname}")
            except Exception:
                logger.debug(f"Error trying to write results to {fname}")
        except PermissionError:
            logger.debug(f"Error trying to create folder: {_folder}")
    else:
        # Path does not exist yet: create the missing folders, defaulting the
        # file name to wb_results.json when none is given
        logger.debug(f"Folder or file does not exist: {params.path_result}")
        try:
            fname = params.path_result
            if "." in os.path.basename(fname):
                _folder = os.path.dirname(fname)
            else:
                _folder = fname
                fname = os.path.join(_folder, "wb_results.json")
            try:
                os.makedirs(_folder, exist_ok=True)
                with open(fname, "w") as file:
                    file.write(json.dumps(results, indent=4))
                print(f"Results saved to: {fname}")
            except Exception:
                logger.debug(f"Error trying to write results to {fname}")
        except Exception:
            logger.debug(f"Error trying to create report: {params.path_result}")
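
# How save_results() resolves --path-result (sketch):
#   existing directory -> <dir>/wb_results.json
#   existing file      -> same file, with the extension coerced to .json
#   non-existent path  -> missing folders are created; if the last path
#                         component has no extension, wb_results.json is used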

def main():
    # Retrieve parameters from the command line
    params = parse_cmdline_args()
    logger.setLevel(params.log)
    f_handler = logging.FileHandler("log-agent.txt")
    logger.addHandler(f_handler)

    # Display parsed parameters
    print("Parsed parameters: ")
    for k, v in params.__dict__.items():
        print("{} = {}".format(k, v))

    blind_scan_result_path = None
    if params.blind_scan:
        cli_wrapper = CliWrapper("/usr/bin/fossid-cli", "/etc/fossid.conf")
        # Display the fossid-cli version, just to validate the path to the CLI
        print(cli_wrapper.get_version())

        # Run the scan and save the .fossid file as a temporary file
        blind_scan_result_path = cli_wrapper.blind_scan(params.path)
        print(
            "Temporary file containing hashes generated at path: {}".format(
                blind_scan_result_path
            )
        )

    # Create the project if it doesn't exist
    workbench = Workbench(params.api_url, params.api_user, params.api_token)
    if not workbench.check_if_project_exists(params.project_code):
        workbench.create_project(params.project_code)
    # Create the scan if it doesn't exist
    scan_exists = workbench.check_if_scan_exists(params.scan_code)
    if not scan_exists:
        print(
            f"Scan with code {params.scan_code} does not exist. Calling API to create it..."
        )
        workbench.create_webapp_scan(params.scan_code, params.project_code)
    else:
        print(
            f"Scan with code {params.scan_code} already exists. Proceeding to upload..."
        )
    # Handle blind scan differently from a regular scan
    if params.blind_scan:
        # Upload the temporary file with the blind scan hashes
        print("Parsed path: ", params.path)
        workbench.upload_files(params.scan_code, blind_scan_result_path)

        # Delete the .fossid file containing the hashes (after it was uploaded to the scan)
        if os.path.isfile(blind_scan_result_path):
            os.remove(blind_scan_result_path)
        else:
            print(
                "Cannot delete the file {} as it doesn't exist".format(
                    blind_scan_result_path
                )
            )
    # Handle normal scanning (directly uploading the files at the given path instead of generating hashes with the CLI)
    else:
        if not os.path.isdir(params.path):
            # The given path is an actual file path. Only this file will be uploaded
            print(
                "Uploading file indicated in --path parameter: {}".format(params.path)
            )
            workbench.upload_files(params.scan_code, params.path)
        else:
            # Upload all files found at the given path, including in subdirectories
            # (os.walk yields only files in 'filenames', never directories)
            print(
                "Uploading files found in directory indicated in --path parameter: {}".format(
                    params.path
                )
            )
            counter_files = 0
            for root, _dirs, filenames in os.walk(params.path):
                for filename in filenames:
                    counter_files += 1
                    workbench.upload_files(
                        params.scan_code, os.path.join(root, filename)
                    )
            print("A total of {} files uploaded".format(counter_files))
        print("Calling API scans->extracting_archives")
        workbench.extract_archives(
            params.scan_code,
            params.recursively_extract_archives,
            params.jar_file_extraction,
        )

    # If the --run_only_dependency_analysis parameter is true, ONLY run dependency analysis, no KB scanning
    if params.run_only_dependency_analysis:
        workbench.assert_dependency_analysis_can_start(params.scan_code)
        print("Starting dependency analysis for scan: {}".format(params.scan_code))
        workbench.start_dependency_analysis(params.scan_code)
        # Poll 'check_status' up to scan_number_of_tries times, scan_wait_time seconds apart, before raising an error
        workbench.wait_for_scan_to_finish(
            "DEPENDENCY_ANALYSIS",
            params.scan_code,
            params.scan_number_of_tries,
            params.scan_wait_time,
        )
    # Run scan
    else:
        workbench.run_scan(
            params.scan_code,
            params.limit,
            params.sensitivity,
            params.auto_identification_detect_declaration,
            params.auto_identification_detect_copyright,
            params.auto_identification_resolve_pending_ids,
            params.delta_only,
            params.reuse_identifications,
            params.identification_reuse_type,
            params.specific_code,
        )
        # Poll 'check_status' up to scan_number_of_tries times, scan_wait_time seconds apart, before raising an error
        workbench.wait_for_scan_to_finish(
            "SCAN", params.scan_code, params.scan_number_of_tries, params.scan_wait_time
        )

        # If the --run_dependency_analysis parameter is true, also run dependency analysis
        if params.run_dependency_analysis:
            workbench.assert_dependency_analysis_can_start(params.scan_code)
            print("Starting dependency analysis for scan: {}".format(params.scan_code))
            workbench.start_dependency_analysis(params.scan_code)
            workbench.wait_for_scan_to_finish(
                "DEPENDENCY_ANALYSIS",
                params.scan_code,
                params.scan_number_of_tries,
                params.scan_wait_time,
            )
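    # Note: each results branch below exits the process after saving its
    # output, so at most one of the results API calls runs per invocation.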
    # When the scan finishes, the list of identified licenses is retrieved by default,
    # or, if --get_scan_identified_components is passed, call
    # scans -> get_scan_identified_components instead
    if params.get_scan_identified_components:
        print("Identified components: ")
        identified_components = workbench.get_scan_identified_components(
            params.scan_code
        )
        print(json.dumps(identified_components))
        save_results(params=params, results=identified_components)
        sys.exit(0)
    # scans -> get_policy_warnings_counter
    elif params.scans_get_policy_warnings_counter:
        if params.project_code is None or params.project_code == "":
            print(
                "Parameter project_code missing!\n"
                "In order for scans->get_policy_warnings_counter to be called, a project code is required."
            )
            sys.exit(1)
        print(f"Scan: {params.scan_code} policy warnings info: ")
        info_policy = workbench.scans_get_policy_warnings_counter(params.scan_code)
        print(json.dumps(info_policy))
        save_results(params=params, results=info_policy)
        sys.exit(0)
    # When the scan finishes, retrieve the project policy warnings info
    # projects -> get_policy_warnings_info
    elif params.projects_get_policy_warnings_info:
        if params.project_code is None or params.project_code == "":
            print(
                "Parameter project_code missing!\n"
                "In order for projects->get_policy_warnings_info to be called, a project code is required."
            )
            sys.exit(1)
        print(f"Project {params.project_code} policy warnings info: ")
        info_policy = workbench.projects_get_policy_warnings_info(params.project_code)
        print(json.dumps(info_policy))
        save_results(params=params, results=info_policy)
        sys.exit(0)
    else:
        print("Identified licenses: ")
        identified_licenses = workbench.get_scan_identified_licenses(params.scan_code)
        print(json.dumps(identified_licenses))
        save_results(params=params, results=identified_licenses)


if __name__ == "__main__":
    main()