diff --git a/.github/workflows/pylint.yml b/.github/workflows/pylint.yml new file mode 100644 index 0000000..9613649 --- /dev/null +++ b/.github/workflows/pylint.yml @@ -0,0 +1,26 @@ +name: Pylint + +on: [push] + +jobs: + build: + runs-on: ubuntu-latest + strategy: + matrix: + python-version: ["3.8", "3.9", "3.10"] + steps: + - uses: actions/checkout@v4 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v3 + with: + python-version: ${{ matrix.python-version }} + - name: Set up Python path + run: echo "PYTHONPATH=$PWD" >> $GITHUB_ENV + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install pylint + if [ -f requirements.txt ]; then pip install -r requirements.txt; fi + - name: Analysing the code with pylint + run: | + pylint $(git ls-files '*.py') diff --git a/.pylintrc b/.pylintrc new file mode 100644 index 0000000..bbaa833 --- /dev/null +++ b/.pylintrc @@ -0,0 +1,627 @@ +[MAIN] + +# Analyse import fallback blocks. This can be used to support both Python 2 and +# 3 compatible code, which means that the block might have code that exists +# only in one or another interpreter, leading to false positives when analysed. +analyse-fallback-blocks=no + +# Clear in-memory caches upon conclusion of linting. Useful if running pylint +# in a server-like mode. +clear-cache-post-run=no + +# Load and enable all available extensions. Use --list-extensions to see a list +# all available extensions. +#enable-all-extensions= + +# In error mode, messages with a category besides ERROR or FATAL are +# suppressed, and no reports are done by default. Error mode is compatible with +# disabling specific errors. +#errors-only= + +# Always return a 0 (non-error) status code, even if lint errors are found. +# This is primarily useful in continuous integration scripts. +#exit-zero= + +# A comma-separated list of package or module names from where C extensions may +# be loaded. Extensions are loading into the active Python interpreter and may +# run arbitrary code. +extension-pkg-allow-list= + +# A comma-separated list of package or module names from where C extensions may +# be loaded. Extensions are loading into the active Python interpreter and may +# run arbitrary code. (This is an alternative name to extension-pkg-allow-list +# for backward compatibility.) +extension-pkg-whitelist= + +# Return non-zero exit code if any of these messages/categories are detected, +# even if score is above --fail-under value. Syntax same as enable. Messages +# specified are enabled, while categories only check already-enabled messages. +fail-on= + +# Specify a score threshold under which the program will exit with error. +fail-under=10 + +# Interpret the stdin as a python script, whose filename needs to be passed as +# the module_or_package argument. +#from-stdin= + +# Files or directories to be skipped. They should be base names, not paths. +ignore=CVS + +# Add files or directories matching the regular expressions patterns to the +# ignore-list. The regex matches against paths and can be in Posix or Windows +# format. Because '\\' represents the directory delimiter on Windows systems, +# it can't be used as an escape character. +ignore-paths= + +# Files or directories matching the regular expression patterns are skipped. +# The regex matches against base names, not paths. The default value ignores +# Emacs file locks +ignore-patterns=^\.# + +# List of module names for which member attributes should not be checked +# (useful for modules/projects where namespaces are manipulated during runtime +# and thus existing member attributes cannot be deduced by static analysis). It +# supports qualified module names, as well as Unix pattern matching. +ignored-modules= + +# Python code to execute, usually for sys.path manipulation such as +# pygtk.require(). +#init-hook= + +# Use multiple processes to speed up Pylint. Specifying 0 will auto-detect the +# number of processors available to use, and will cap the count on Windows to +# avoid hangs. +jobs=1 + +# Control the amount of potential inferred values when inferring a single +# object. This can help the performance when dealing with large functions or +# complex, nested conditions. +limit-inference-results=100 + +# List of plugins (as comma separated values of python module names) to load, +# usually to register additional checkers. +load-plugins= + +# Pickle collected data for later comparisons. +persistent=yes + +# Minimum Python version to use for version dependent checks. Will default to +# the version used to run pylint. +py-version=3.12 + +# Discover python modules and packages in the file system subtree. +recursive=no + +# Add paths to the list of the source roots. Supports globbing patterns. The +# source root is an absolute path or a path relative to the current working +# directory used to determine a package namespace for modules located under the +# source root. +source-roots= + +# When enabled, pylint would attempt to guess common misconfiguration and emit +# user-friendly hints instead of false-positive error messages. +suggestion-mode=yes + +# Allow loading of arbitrary C extensions. Extensions are imported into the +# active Python interpreter and may run arbitrary code. +unsafe-load-any-extension=no + +# In verbose mode, extra non-checker-related info will be displayed. +#verbose= + + +[BASIC] + +# Naming style matching correct argument names. +argument-naming-style=snake_case + +# Regular expression matching correct argument names. Overrides argument- +# naming-style. If left empty, argument names will be checked with the set +# naming style. +#argument-rgx= + +# Naming style matching correct attribute names. +attr-naming-style=snake_case + +# Regular expression matching correct attribute names. Overrides attr-naming- +# style. If left empty, attribute names will be checked with the set naming +# style. +#attr-rgx= + +# Bad variable names which should always be refused, separated by a comma. +bad-names=foo, + bar, + baz, + toto, + tutu, + tata + +# Bad variable names regexes, separated by a comma. If names match any regex, +# they will always be refused +bad-names-rgxs= + +# Naming style matching correct class attribute names. +class-attribute-naming-style=any + +# Regular expression matching correct class attribute names. Overrides class- +# attribute-naming-style. If left empty, class attribute names will be checked +# with the set naming style. +#class-attribute-rgx= + +# Naming style matching correct class constant names. +class-const-naming-style=UPPER_CASE + +# Regular expression matching correct class constant names. Overrides class- +# const-naming-style. If left empty, class constant names will be checked with +# the set naming style. +#class-const-rgx= + +# Naming style matching correct class names. +class-naming-style=PascalCase + +# Regular expression matching correct class names. Overrides class-naming- +# style. If left empty, class names will be checked with the set naming style. +#class-rgx= + +# Naming style matching correct constant names. +const-naming-style=UPPER_CASE + +# Regular expression matching correct constant names. Overrides const-naming- +# style. If left empty, constant names will be checked with the set naming +# style. +#const-rgx= + +# Minimum line length for functions/classes that require docstrings, shorter +# ones are exempt. +docstring-min-length=-1 + +# Naming style matching correct function names. +function-naming-style=snake_case + +# Regular expression matching correct function names. Overrides function- +# naming-style. If left empty, function names will be checked with the set +# naming style. +#function-rgx= + +# Good variable names which should always be accepted, separated by a comma. +good-names=i, + j, + k, + ex, + Run, + _ + +# Good variable names regexes, separated by a comma. If names match any regex, +# they will always be accepted +good-names-rgxs= + +# Include a hint for the correct naming format with invalid-name. +include-naming-hint=no + +# Naming style matching correct inline iteration names. +inlinevar-naming-style=any + +# Regular expression matching correct inline iteration names. Overrides +# inlinevar-naming-style. If left empty, inline iteration names will be checked +# with the set naming style. +#inlinevar-rgx= + +# Naming style matching correct method names. +method-naming-style=snake_case + +# Regular expression matching correct method names. Overrides method-naming- +# style. If left empty, method names will be checked with the set naming style. +#method-rgx= + +# Naming style matching correct module names. +module-naming-style=snake_case + +# Regular expression matching correct module names. Overrides module-naming- +# style. If left empty, module names will be checked with the set naming style. +#module-rgx= + +# Colon-delimited sets of names that determine each other's naming style when +# the name regexes allow several styles. +name-group= + +# Regular expression which should only match function or class names that do +# not require a docstring. +no-docstring-rgx=^_ + +# List of decorators that produce properties, such as abc.abstractproperty. Add +# to this list to register other decorators that produce valid properties. +# These decorators are taken in consideration only for invalid-name. +property-classes=abc.abstractproperty + +# Regular expression matching correct type alias names. If left empty, type +# alias names will be checked with the set naming style. +#typealias-rgx= + +# Regular expression matching correct type variable names. If left empty, type +# variable names will be checked with the set naming style. +#typevar-rgx= + +# Naming style matching correct variable names. +variable-naming-style=snake_case + +# Regular expression matching correct variable names. Overrides variable- +# naming-style. If left empty, variable names will be checked with the set +# naming style. +#variable-rgx= + + +[CLASSES] + +# Warn about protected attribute access inside special methods +check-protected-access-in-special-methods=no + +# List of method names used to declare (i.e. assign) instance attributes. +defining-attr-methods=__init__, + __new__, + setUp, + asyncSetUp, + __post_init__ + +# List of member names, which should be excluded from the protected access +# warning. +exclude-protected=_asdict,_fields,_replace,_source,_make,os._exit + +# List of valid names for the first argument in a class method. +valid-classmethod-first-arg=cls + +# List of valid names for the first argument in a metaclass class method. +valid-metaclass-classmethod-first-arg=mcs + + +[DESIGN] + +# List of regular expressions of class ancestor names to ignore when counting +# public methods (see R0903) +exclude-too-few-public-methods= + +# List of qualified class names to ignore when counting class parents (see +# R0901) +ignored-parents= + +# Maximum number of arguments for function / method. +max-args=5 + +# Maximum number of attributes for a class (see R0902). +max-attributes=7 + +# Maximum number of boolean expressions in an if statement (see R0916). +max-bool-expr=5 + +# Maximum number of branch for function / method body. +max-branches=12 + +# Maximum number of locals for function / method body. +max-locals=15 + +# Maximum number of parents for a class (see R0901). +max-parents=7 + +# Maximum number of public methods for a class (see R0904). +max-public-methods=20 + +# Maximum number of return / yield for function / method body. +max-returns=6 + +# Maximum number of statements in function / method body. +max-statements=50 + +# Minimum number of public methods for a class (see R0903). +min-public-methods=2 + + +[EXCEPTIONS] + +# Exceptions that will emit a warning when caught. +overgeneral-exceptions=builtins.BaseException,builtins.Exception + + +[FORMAT] + +# Expected format of line ending, e.g. empty (any line ending), LF or CRLF. +expected-line-ending-format= + +# Regexp for a line that is allowed to be longer than the limit. +ignore-long-lines=^\s*(# )??$ + +# Number of spaces of indent required inside a hanging or continued line. +indent-after-paren=4 + +# String used as indentation unit. This is usually " " (4 spaces) or "\t" (1 +# tab). +indent-string=' ' + +# Maximum number of characters on a single line. +max-line-length=100 + +# Maximum number of lines in a module. +max-module-lines=1000 + +# Allow the body of a class to be on the same line as the declaration if body +# contains single statement. +single-line-class-stmt=no + +# Allow the body of an if to be on the same line as the test if there is no +# else. +single-line-if-stmt=no + + +[IMPORTS] + +# List of modules that can be imported at any level, not just the top level +# one. +allow-any-import-level= + +# Allow explicit reexports by alias from a package __init__. +allow-reexport-from-package=no + +# Allow wildcard imports from modules that define __all__. +allow-wildcard-with-all=no + +# Deprecated modules which should not be used, separated by a comma. +deprecated-modules= + +# Output a graph (.gv or any supported image format) of external dependencies +# to the given file (report RP0402 must not be disabled). +ext-import-graph= + +# Output a graph (.gv or any supported image format) of all (i.e. internal and +# external) dependencies to the given file (report RP0402 must not be +# disabled). +import-graph= + +# Output a graph (.gv or any supported image format) of internal dependencies +# to the given file (report RP0402 must not be disabled). +int-import-graph= + +# Force import order to recognize a module as part of the standard +# compatibility libraries. +known-standard-library= + +# Force import order to recognize a module as part of a third party library. +known-third-party=enchant + +# Couples of modules and preferred modules, separated by a comma. +preferred-modules= + + +[LOGGING] + +# The type of string formatting that logging methods do. `old` means using % +# formatting, `new` is for `{}` formatting. +logging-format-style=old + +# Logging modules to check that the string format arguments are in logging +# function parameter format. +logging-modules=logging + + +[MESSAGES CONTROL] + +# Only show warnings with the listed confidence levels. Leave empty to show +# all. Valid levels: HIGH, CONTROL_FLOW, INFERENCE, INFERENCE_FAILURE, +# UNDEFINED. +confidence=HIGH, + CONTROL_FLOW, + INFERENCE, + INFERENCE_FAILURE, + UNDEFINED + +# Disable the message, report, category or checker with the given id(s). You +# can either give multiple identifiers separated by comma (,) or put this +# option multiple times (only on the command line, not in the configuration +# file where it should appear only once). You can also use "--disable=all" to +# disable everything first and then re-enable specific checks. For example, if +# you want to run only the similarities checker, you can use "--disable=all +# --enable=similarities". If you want to run only the classes checker, but have +# no Warning level messages displayed, use "--disable=all --enable=classes +# --disable=W". +disable=R,I,W, + logging-fstring-interpolation, + + +# Enable the message, report, category or checker with the given id(s). You can +# either give multiple identifier separated by comma (,) or put this option +# multiple time (only on the command line, not in the configuration file where +# it should appear only once). See also the "--disable" option for examples. +enable= + + +[METHOD_ARGS] + +# List of qualified names (i.e., library.method) which require a timeout +# parameter e.g. 'requests.api.get,requests.api.post' +timeout-methods=requests.api.delete,requests.api.get,requests.api.head,requests.api.options,requests.api.patch,requests.api.post,requests.api.put,requests.api.request + + +[MISCELLANEOUS] + +# List of note tags to take in consideration, separated by a comma. +notes=FIXME, + XXX, + TODO + +# Regular expression of note tags to take in consideration. +notes-rgx= + + +[REFACTORING] + +# Maximum number of nested blocks for function / method body +max-nested-blocks=5 + +# Complete name of functions that never returns. When checking for +# inconsistent-return-statements if a never returning function is called then +# it will be considered as an explicit return statement and no message will be +# printed. +never-returning-functions=sys.exit,argparse.parse_error + + +[REPORTS] + +# Python expression which should return a score less than or equal to 10. You +# have access to the variables 'fatal', 'error', 'warning', 'refactor', +# 'convention', and 'info' which contain the number of messages in each +# category, as well as 'statement' which is the total number of statements +# analyzed. This score is used by the global evaluation report (RP0004). +evaluation=max(0, 0 if fatal else 10.0 - ((float(5 * error + warning + refactor + convention) / statement) * 10)) + +# Template used to display messages. This is a python new-style format string +# used to format the message information. See doc for all details. +msg-template= + +# Set the output format. Available formats are: text, parseable, colorized, +# json2 (improved json format), json (old json format) and msvs (visual +# studio). You can also give a reporter class, e.g. +# mypackage.mymodule.MyReporterClass. +#output-format= + +# Tells whether to display a full report or only the messages. +reports=no + +# Activate the evaluation score. +score=yes + + +[SIMILARITIES] + +# Comments are removed from the similarity computation +ignore-comments=yes + +# Docstrings are removed from the similarity computation +ignore-docstrings=yes + +# Imports are removed from the similarity computation +ignore-imports=yes + +# Signatures are removed from the similarity computation +ignore-signatures=yes + +# Minimum lines number of a similarity. +min-similarity-lines=4 + + +[SPELLING] + +# Limits count of emitted suggestions for spelling mistakes. +max-spelling-suggestions=4 + +# Spelling dictionary name. No available dictionaries : You need to install +# both the python package and the system dependency for enchant to work. +spelling-dict= + +# List of comma separated words that should be considered directives if they +# appear at the beginning of a comment and should not be checked. +spelling-ignore-comment-directives=fmt: on,fmt: off,noqa:,noqa,nosec,isort:skip,mypy: + +# List of comma separated words that should not be checked. +spelling-ignore-words= + +# A path to a file that contains the private dictionary; one word per line. +spelling-private-dict-file= + +# Tells whether to store unknown words to the private dictionary (see the +# --spelling-private-dict-file option) instead of raising a message. +spelling-store-unknown-words=no + + +[STRING] + +# This flag controls whether inconsistent-quotes generates a warning when the +# character used as a quote delimiter is used inconsistently within a module. +check-quote-consistency=no + +# This flag controls whether the implicit-str-concat should generate a warning +# on implicit string concatenation in sequences defined over several lines. +check-str-concat-over-line-jumps=no + + +[TYPECHECK] + +# List of decorators that produce context managers, such as +# contextlib.contextmanager. Add to this list to register other decorators that +# produce valid context managers. +contextmanager-decorators=contextlib.contextmanager + +# List of members which are set dynamically and missed by pylint inference +# system, and so shouldn't trigger E1101 when accessed. Python regular +# expressions are accepted. +generated-members= + +# Tells whether to warn about missing members when the owner of the attribute +# is inferred to be None. +ignore-none=yes + +# This flag controls whether pylint should warn about no-member and similar +# checks whenever an opaque object is returned when inferring. The inference +# can return multiple potential results while evaluating a Python object, but +# some branches might not be evaluated, which results in partial inference. In +# that case, it might be useful to still emit no-member and other checks for +# the rest of the inferred objects. +ignore-on-opaque-inference=yes + +# List of symbolic message names to ignore for Mixin members. +ignored-checks-for-mixins=no-member, + not-async-context-manager, + not-context-manager, + attribute-defined-outside-init + +# List of class names for which member attributes should not be checked (useful +# for classes with dynamically set attributes). This supports the use of +# qualified names. +ignored-classes=optparse.Values,thread._local,_thread._local,argparse.Namespace + +# Show a hint with possible names when a member name was not found. The aspect +# of finding the hint is based on edit distance. +missing-member-hint=yes + +# The minimum edit distance a name should have in order to be considered a +# similar match for a missing member name. +missing-member-hint-distance=1 + +# The total number of similar names that should be taken in consideration when +# showing a hint for a missing member. +missing-member-max-choices=1 + +# Regex pattern to define which classes are considered mixins. +mixin-class-rgx=.*[Mm]ixin + +# List of decorators that change the signature of a decorated function. +signature-mutators= + + +[VARIABLES] + +# List of additional names supposed to be defined in builtins. Remember that +# you should avoid defining new builtins when possible. +additional-builtins= + +# Tells whether unused global variables should be treated as a violation. +allow-global-unused-variables=yes + +# List of names allowed to shadow builtins +allowed-redefined-builtins= + +# List of strings which can identify a callback function by name. A callback +# name must start or end with one of those strings. +callbacks=cb_, + _cb + +# A regular expression matching the name of dummy variables (i.e. expected to +# not be used). +dummy-variables-rgx=_+$|(_[a-zA-Z0-9_]*[a-zA-Z0-9]+?$)|dummy|^ignored_|^unused_ + +# Argument names that match this expression will be ignored. +ignored-argument-names=_.*|^ignored_|^unused_ + +# Tells whether we should check for unused import in __init__ files. +init-import=no + +# List of qualified module names which can have objects that can redefine +# builtins. +redefining-builtins-modules=six.moves,past.builtins,future.builtins,builtins,io diff --git a/lshell/builtins.py b/lshell/builtincmd.py similarity index 78% rename from lshell/builtins.py rename to lshell/builtincmd.py index 054ab3d..a1f55f5 100644 --- a/lshell/builtins.py +++ b/lshell/builtincmd.py @@ -1,22 +1,4 @@ -# -# Limited command Shell (lshell) -# -# Copyright (C) 2008-2024 Ignace Mouzannar -# -# This file is part of lshell -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . +""" This module contains the built-in commands of lshell """ import glob import sys @@ -54,9 +36,9 @@ def lsudo(conf): for command in conf["sudo_commands"]: sys.stdout.write(f" - {command}\n") return 0 - else: - sys.stdout.write("No sudo commands allowed\n") - return 1 + + sys.stdout.write("No sudo commands allowed\n") + return 1 def history(conf, log): @@ -67,11 +49,11 @@ def history(conf, log): except IOError: log.error(f"WARN: couldn't write history to file {conf['history_file']}\n") return 1 - f = open(conf["history_file"], "r") - i = 1 - for item in f.readlines(): - sys.stdout.write(f"{i}: {item}") - i += 1 + with open(conf["history_file"], "r", encoding="utf-8") as f: + i = 1 + for item in f.readlines(): + sys.stdout.write(f"{i}: {item}") + i += 1 except (OSError, IOError, FileNotFoundError) as e: # Catch specific exceptions log.critical(f"** Unable to read the history file: {e}") return 1 diff --git a/lshell/checkconfig.py b/lshell/checkconfig.py index ae7105b..53dc745 100644 --- a/lshell/checkconfig.py +++ b/lshell/checkconfig.py @@ -1,22 +1,4 @@ -# -# Limited command Shell (lshell) -# -# Copyright (C) 2008-2024 Ignace Mouzannar -# -# This file is part of lshell -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . +""" This module contains the checkconfig class of lshell """ import sys import os @@ -29,12 +11,12 @@ import grp import time import glob +from logging.handlers import SysLogHandler # import lshell specifics from lshell import utils from lshell import variables -from lshell import sec -from lshell import builtins +from lshell.builtincmd import export class CheckConfig: @@ -69,18 +51,16 @@ def __init__(self, args, refresh=None, stdin=None, stdout=None, stderr=None): self.check_user_integrity() self.get_config_user() self.check_env() - self.check_scp_sftp() self.set_noexec() def check_config_file_exists(self, configfile): """Check if the configuration file exists, else exit with error""" if not os.path.exists(configfile): self.stderr.write("Error: Config file doesn't exist\n") - self.stderr.write(variables.usage) - sys.exit(1) + utils.usage() def check_script(self): - # Check if lshell is invoked with the correct binary and script extension + """Check if lshell is invoked with the correct binary and script extension""" if sys.argv[0].endswith("bin/lshell") and sys.argv[-1].endswith(".lsh"): script_path = os.path.realpath(sys.argv[-1]) self.log.debug(f"Detected script mode: {script_path}") @@ -115,7 +95,7 @@ def getoptions(self, arguments, conf): if option in ["-c"]: conf["ssh"] = value if option in ["-h", "--help"]: - utils.usage() + utils.usage(exitcode=0) if option in ["--version"]: utils.version() @@ -145,10 +125,10 @@ def check_env(self): for envfile in self.conf["env_vars_files"]: file_path = os.path.expandvars(envfile) try: - with open(file_path) as env_vars: + with open(file_path, encoding="utf-8") as env_vars: for env_var in env_vars.readlines(): if env_var.split(" ", 1)[0] == "export": - builtins.export(env_var.strip()) + export(env_var.strip()) except (OSError, IOError): self.stderr.write( f"ERROR: Unable to read environment file: {file_path}\n" @@ -160,8 +140,7 @@ def check_file(self, file): """ if not os.path.exists(file): self.stderr.write("Error: Config file doesn't exist\n") - self.stderr.write(variables.usage) - sys.exit(1) + utils.usage() else: self.config = configparser.ConfigParser() @@ -195,11 +174,12 @@ def check_log(self): } # create logger for lshell application - if "syslogname" in self.conf: + if self.conf.get("syslogname"): try: - logname = eval(self.conf["syslogname"]) + logname = str(self.conf["syslogname"]) except (SyntaxError, NameError, TypeError): - logname = self.conf["syslogname"] + sys.stderr.write("ERR: syslogname must be a string\n") + sys.exit(1) else: logname = "lshell" @@ -209,15 +189,15 @@ def check_log(self): # this is useful if configuration is reloaded for loghandler in logger.handlers: try: - logging.shutdown(logger.handlers) + logging.shutdown(loghandler) except TypeError: pass for logfilter in logger.filters: logger.removeFilter(logfilter) - formatter = logging.Formatter("%%(asctime)s (%s): %%(message)s" % getuser()) + formatter = logging.Formatter(f"%(asctime)s ({getuser()}): %(message)s") syslogformatter = logging.Formatter( - "%s[%s]: %s: %%(message)s" % (logname, os.getpid(), getuser()) + f"{logname}[{os.getpid()}]: {getuser()}: %(message)s" ) logger.setLevel(logging.DEBUG) @@ -241,17 +221,18 @@ def check_log(self): self.conf["loglevel"] = 0 # read logfilename is exists, and set logfilename - if "logfilename" in self.conf: + if self.conf.get("logfilename"): try: - logfilename = eval(self.conf["logfilename"]) + logfilename = str(self.conf["logfilename"]) except (SyntaxError, NameError, TypeError): - logfilename = self.conf["logfilename"] + sys.stderr.write("ERR: logfilename must be a string\n") + sys.exit(1) currentime = time.localtime() - logfilename = logfilename.replace("%y", "%s" % currentime[0]) - logfilename = logfilename.replace("%m", "%02d" % currentime[1]) - logfilename = logfilename.replace("%d", "%02d" % currentime[2]) + logfilename = logfilename.replace("%y", f"{currentime[0]}") + logfilename = logfilename.replace("%m", f"{currentime[1]:02d}") + logfilename = logfilename.replace("%d", f"{currentime[2]:02d}") logfilename = logfilename.replace( - "%h", "%02d%02d" % (currentime[3], currentime[4]) + "%h", f"{currentime[3]:02d}{currentime[4]:02d}" ) logfilename = logfilename.replace("%u", getuser()) else: @@ -260,8 +241,6 @@ def check_log(self): if self.conf["loglevel"] > 0: try: if logfilename == "syslog": - from logging.handlers import SysLogHandler - syslog = SysLogHandler(address="/dev/log") syslog.setFormatter(syslogformatter) syslog.setLevel(self.levels[self.conf["loglevel"]]) @@ -270,8 +249,8 @@ def check_log(self): # if log file is writable add new log file handler logfile = os.path.join(self.conf["logpath"], logfilename + ".log") # create log file if it does not exist, and set permissions - fp = open(logfile, "a") - fp.close() + with open(logfile, "a", encoding="utf-8"): + pass try: os.chmod(logfile, 0o600) except OSError: @@ -299,8 +278,6 @@ def get_config(self): # list the include_dir directory and read configuration files if "include_dir" in self.conf: - import glob - self.conf["include_dir_conf"] = glob.glob(f"{self.conf['include_dir']}*") self.config.read(self.conf["include_dir_conf"]) @@ -369,7 +346,7 @@ def get_config_sub(self, section): # remove double slashes liste[0] = liste[0].replace("//", "/") self.conf_raw.update({key: str(liste)}) - elif stuff and type(eval(stuff)) == list: + elif stuff and isinstance(eval(stuff), list): self.conf_raw.update({key: stuff}) # case allowed is set to 'all' elif key == "allowed" and split[0] == "'all'": @@ -466,7 +443,7 @@ def check_user_integrity(self): In case fields are missing, the user is notified and exited from lshell """ for item in variables.required_config: - if item not in self.conf_raw.keys(): + if item not in self.conf_raw: self.log.critical(f"ERROR: Missing parameter '{item}'") self.log.critical( f"ERROR: Add it in the in the [{self.user}] or [default] section of conf file." @@ -594,7 +571,7 @@ def get_config_user(self): if "intro" in self.conf_raw: self.conf["intro"] = self.myeval(self.conf_raw["intro"]) else: - self.conf["intro"] = variables.intro + self.conf["intro"] = variables.INTRO if os.path.isdir(self.conf["home_path"]): # change dir to home when initially loading the configuration @@ -617,7 +594,7 @@ def get_config_user(self): except (KeyError, SyntaxError, TypeError, NameError): self.log.error(f"CONF: history file error: {self.conf['history_file']}") else: - self.conf["history_file"] = variables.history_file + self.conf["history_file"] = variables.HISTORY_FILE if not self.conf["history_file"].startswith("/"): self.conf["history_file"] = ( @@ -669,117 +646,6 @@ def get_config_user(self): self.log.error("WinSCP session started") - def check_scp_sftp(self): - """This method checks if the user is trying to SCP a file onto the - server. If this is the case, it checks if the user is allowed to use - SCP or not, and acts as requested. : ) - """ - if "ssh" in self.conf: - if "SSH_CLIENT" in os.environ and "SSH_TTY" not in os.environ: - - # check if sftp is requested and allowed - if "sftp-server" in self.conf["ssh"]: - if self.conf["sftp"] == 1: - self.log.error("SFTP connect") - retcode = utils.cmd_parse_execute(self.conf["ssh"]) - self.log.error("SFTP disconnect") - sys.exit(retcode) - else: - self.log.error("*** forbidden SFTP connection") - sys.exit(1) - - # initialize cli session - from lshell.shellcmd import ShellCmd - - cli = ShellCmd(self.conf, None, None, None, None, self.conf["ssh"]) - ret_check_path, self.conf = sec.check_path( - self.conf["ssh"], self.conf, ssh=1 - ) - if ret_check_path == 1: - self.ssh_warn("path over SSH", self.conf["ssh"]) - - # check if scp is requested and allowed - if self.conf["ssh"].startswith("scp "): - if self.conf["scp"] == 1 or "scp" in self.conf["overssh"]: - if " -f " in self.conf["ssh"]: - # case scp download is allowed - if self.conf["scp_download"]: - self.log.error(f'SCP: GET "{self.conf["ssh"]}"') - # case scp download is forbidden - else: - self.log.error( - f'SCP: download forbidden: "{self.conf["ssh"]}"' - ) - sys.exit(1) - elif " -t " in self.conf["ssh"]: - # case scp upload is allowed - if self.conf["scp_upload"]: - if "scpforce" in self.conf: - cmdsplit = self.conf["ssh"].split(" ") - scppath = os.path.realpath(cmdsplit[-1]) - forcedpath = os.path.realpath(self.conf["scpforce"]) - if scppath != forcedpath: - self.log.error( - f"SCP: forced SCP directory: {scppath}" - ) - cmdsplit.pop(-1) - cmdsplit.append(forcedpath) - self.conf["ssh"] = string.join(cmdsplit) - self.log.error(f'SCP: PUT "{self.conf["ssh"]}"') - # case scp upload is forbidden - else: - self.log.error( - f'SCP: upload forbidden: "{self.conf["ssh"]}"' - ) - sys.exit(1) - retcode = utils.cmd_parse_execute(self.conf["ssh"], cli) - self.log.error("SCP disconnect") - sys.exit(retcode) - else: - self.ssh_warn("SCP connection", self.conf["ssh"], "scp") - - # check if command is in allowed overssh commands - elif self.conf["ssh"]: - # replace aliases - self.conf["ssh"] = utils.get_aliases( - self.conf["ssh"], self.conf["aliases"] - ) - # if command is not "secure", exit - ret_check_secure, self.conf = sec.check_secure( - self.conf["ssh"], self.conf, strict=1, ssh=1 - ) - if ret_check_secure: - self.ssh_warn("char/command over SSH", self.conf["ssh"]) - # else - self.log.error(f'Over SSH: "{self.conf["ssh"]}"') - # if command is "help" - if self.conf["ssh"] == "help": - cli.do_help(None) - retcode = 0 - else: - retcode = utils.cmd_parse_execute(self.conf["ssh"], cli) - self.log.error("Exited") - sys.exit(retcode) - - # else warn and log - else: - self.ssh_warn("command over SSH", self.conf["ssh"]) - - else: - # case of shell escapes - self.ssh_warn("shell escape", self.conf["ssh"]) - - def ssh_warn(self, message, command="", key=""): - """log and warn if forbidden action over SSH""" - if key == "scp": - self.log.critical(f"*** forbidden {message}") - self.log.error(f"*** SCP command: {command}") - else: - self.log.critical(f'*** forbidden {message}: "{command}"') - self.stderr.write("This incident has been reported.\n") - self.log.error("Exited") - sys.exit(1) - def set_noexec(self): """This method checks the existence of the sudo_noexec library.""" # list of standard sudo_noexec.so file location diff --git a/lshell/sec.py b/lshell/sec.py index 9bbd14b..87ec056 100644 --- a/lshell/sec.py +++ b/lshell/sec.py @@ -1,26 +1,12 @@ -# -# Limited command Shell (lshell) -# -# Copyright (C) 2008-2024 Ignace Mouzannar -# -# This file is part of lshell -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . +""" This module is used to check the security of the commands entered by the +user. It checks if the command is allowed, if the path is allowed, if the +command contains forbidden characters, etc. +""" import sys import re import os +import subprocess # import lshell specifics from lshell import utils @@ -85,7 +71,6 @@ def check_path(line, conf, completion=None, ssh=None, strict=None): if re.findall(r"\$|\*|\?", item): # remove quotes if available item = re.sub("\"|'", "", item) - import subprocess p = subprocess.Popen( f"`which echo` {item}", @@ -252,7 +237,7 @@ def check_secure(line, conf, strict=None, ssh=None): # in case of a sudo command, check in sudo_commands list if allowed if command == "sudo": - if type(cmdargs) is list: + if isinstance(cmdargs, list): # allow the -u (user) flag if cmdargs[1] == "-u" and cmdargs: sudocmd = cmdargs[3] diff --git a/lshell/shellcmd.py b/lshell/shellcmd.py index dcb238e..60e1095 100644 --- a/lshell/shellcmd.py +++ b/lshell/shellcmd.py @@ -1,22 +1,8 @@ -# -# Limited command Shell (lshell) -# -# Copyright (C) 2008-2024 Ignace Mouzannar -# -# This file is part of lshell -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . +""" This module contains the main class of lshell, it is the class that is +responsible for the command line interface. It inherits from the cmd.Cmd class +from the Python standard library. It offers the default methods to add +security checks, logging, etc. +""" import cmd import sys @@ -26,8 +12,9 @@ import readline # import lshell specifics +from lshell.checkconfig import CheckConfig from lshell import utils -from lshell import builtins +from lshell import builtincmd from lshell import sec @@ -100,14 +87,14 @@ def __getattr__(self, attr): # in case the configuration file has been modified, reload it if self.conf["config_mtime"] != os.path.getmtime(self.conf["configfile"]): - from lshell.checkconfig import CheckConfig - self.conf = CheckConfig( ["--config", self.conf["configfile"]], refresh=1 ).returnconf() self.conf["promptprint"] = utils.updateprompt(os.getcwd(), self.conf) self.log = self.conf["logpath"] + self.check_scp_sftp() + if self.g_cmd in ["quit", "exit", "EOF"]: self.do_exit() @@ -148,7 +135,7 @@ def __getattr__(self, attr): p = re.compile(r"(\s|^)(\$\?)(\s|$)") self.g_line = p.sub(rf" {self.retcode} \3", self.g_line) - if type(self.conf["aliases"]) is dict: + if isinstance(self.conf["aliases"], dict): self.g_line = utils.get_aliases(self.g_line, self.conf["aliases"]) self.log.info(f'CMD: "{self.g_line}"') @@ -164,7 +151,7 @@ def __getattr__(self, attr): directory = directory.split("cd", 1)[1].strip() # change directory then, if success, execute the rest of # the cmd line - self.retcode, self.conf = builtins.cd(directory, self.conf) + self.retcode, self.conf = builtincmd.cd(directory, self.conf) if self.retcode == 0: cmd_split = re.split(r";|&&|&|\|\||\|", command) @@ -173,27 +160,27 @@ def __getattr__(self, attr): else: # set directory to command line argument and change dir directory = self.g_arg - self.retcode, self.conf = builtins.cd(directory, self.conf) + self.retcode, self.conf = builtincmd.cd(directory, self.conf) # built-in lpath function: list all allowed path elif self.g_cmd == "lpath": - self.retcode = builtins.lpath(self.conf) + self.retcode = builtincmd.lpath(self.conf) # built-in lsudo function: list all allowed sudo commands elif self.g_cmd == "lsudo": - self.retcode = builtins.lsudo(self.conf) + self.retcode = builtincmd.lsudo(self.conf) # built-in history function: print command history elif self.g_cmd == "history": - self.retcode = builtins.history(self.conf, self.log) + self.retcode = builtincmd.history(self.conf, self.log) # built-in export function elif self.g_cmd == "export": - self.retcode, var = builtins.export(self.g_line) + self.retcode, var = builtincmd.export(self.g_line) if self.retcode == 1: self.log.critical(f"** forbidden environment variable '{var}'") # case 'cd' is in an alias e.g. {'toto':'cd /var/tmp'} elif self.g_line[0:2] == "cd": self.g_cmd = self.g_line.split()[0] directory = " ".join(self.g_line.split()[1:]) - self.retcode, self.conf = builtins.cd(directory, self.conf) + self.retcode, self.conf = builtincmd.cd(directory, self.conf) else: self.retcode = utils.cmd_parse_execute(self.g_line, self) @@ -206,6 +193,115 @@ def __getattr__(self, attr): self.mytimer(self.conf["timer"]) return object.__getattribute__(self, attr) + def check_scp_sftp(self): + """This method checks if the user is trying to SCP a file onto the + server. If this is the case, it checks if the user is allowed to use + SCP or not, and acts as requested. : ) + """ + + if "ssh" in self.conf: + if "SSH_CLIENT" in os.environ and "SSH_TTY" not in os.environ: + # check if sftp is requested and allowed + if "sftp-server" in self.conf["ssh"]: + if self.conf["sftp"] == 1: + self.log.error("SFTP connect") + retcode = utils.cmd_parse_execute(self.conf["ssh"]) + self.log.error("SFTP disconnect") + sys.exit(retcode) + else: + self.log.error("*** forbidden SFTP connection") + sys.exit(1) + + # initialize cli session + cli = ShellCmd(self.conf, None, None, None, None, self.conf["ssh"]) + ret_check_path, self.conf = sec.check_path( + self.conf["ssh"], self.conf, ssh=1 + ) + if ret_check_path == 1: + self.ssh_warn("path over SSH", self.conf["ssh"]) + + # check if scp is requested and allowed + if self.conf["ssh"].startswith("scp "): + if self.conf["scp"] == 1 or "scp" in self.conf["overssh"]: + if " -f " in self.conf["ssh"]: + # case scp download is allowed + if self.conf["scp_download"]: + self.log.error(f'SCP: GET "{self.conf["ssh"]}"') + # case scp download is forbidden + else: + self.log.error( + f'SCP: download forbidden: "{self.conf["ssh"]}"' + ) + sys.exit(1) + elif " -t " in self.conf["ssh"]: + # case scp upload is allowed + if self.conf["scp_upload"]: + if "scpforce" in self.conf: + cmdsplit = self.conf["ssh"].split(" ") + scppath = os.path.realpath(cmdsplit[-1]) + forcedpath = os.path.realpath(self.conf["scpforce"]) + if scppath != forcedpath: + self.log.error( + f"SCP: forced SCP directory: {scppath}" + ) + cmdsplit.pop(-1) + cmdsplit.append(forcedpath) + self.conf["ssh"] = " ".join(cmdsplit) + self.log.error(f'SCP: PUT "{self.conf["ssh"]}"') + # case scp upload is forbidden + else: + self.log.error( + f'SCP: upload forbidden: "{self.conf["ssh"]}"' + ) + sys.exit(1) + retcode = utils.cmd_parse_execute(self.conf["ssh"], cli) + self.log.error("SCP disconnect") + sys.exit(retcode) + else: + self.ssh_warn("SCP connection", self.conf["ssh"], "scp") + + # check if command is in allowed overssh commands + elif self.conf["ssh"]: + # replace aliases + self.conf["ssh"] = utils.get_aliases( + self.conf["ssh"], self.conf["aliases"] + ) + # if command is not "secure", exit + ret_check_secure, self.conf = sec.check_secure( + self.conf["ssh"], self.conf, strict=1, ssh=1 + ) + if ret_check_secure: + self.ssh_warn("char/command over SSH", self.conf["ssh"]) + # else + self.log.error(f'Over SSH: "{self.conf["ssh"]}"') + # if command is "help" + if self.conf["ssh"] == "help": + cli.do_help(None) + retcode = 0 + else: + retcode = utils.cmd_parse_execute(self.conf["ssh"], cli) + self.log.error("Exited") + sys.exit(retcode) + + # else warn and log + else: + self.ssh_warn("command over SSH", self.conf["ssh"]) + + else: + # case of shell escapes + self.ssh_warn("shell escape", self.conf["ssh"]) + + def ssh_warn(self, message, command="", key=""): + """log and warn if forbidden action over SSH""" + if key == "scp": + self.log.critical(f"*** forbidden {message}") + self.log.error(f"*** SCP command: {command}") + else: + self.log.critical(f'*** forbidden {message}: "{command}"') + sys.stderr.write("This incident has been reported.\n") + self.log.error("Exited") + sys.exit(1) + def run_script_mode(self, script): """Process commands from a script.""" with open(script, "r") as script_file: @@ -268,7 +364,7 @@ def cmdloop(self, intro=None): self.stdout.write(self.conf["promptprint"]) self.stdout.flush() line = self.stdin.readline() - if not len(line): + if not line: line = "EOF" else: # chop \n @@ -315,7 +411,7 @@ def complete(self, text, state): ): compfunc = self.completechdir elif begidx > 0: - cmd, args, foo = self.parseline(line) + cmd, args, _ = self.parseline(line) if cmd == "": compfunc = self.completedefault else: diff --git a/lshell/utils.py b/lshell/utils.py index ab340e5..3848b40 100644 --- a/lshell/utils.py +++ b/lshell/utils.py @@ -1,38 +1,22 @@ -# -# Limited command Shell (lshell) -# -# Copyright (C) 2008-2024 Ignace Mouzannar -# -# This file is part of lshell -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . +""" Utils for lshell """ import re import subprocess import os import sys +import random +import string from getpass import getuser # import lshell specifics from lshell import variables -from lshell import builtins +from lshell import builtincmd -def usage(): +def usage(exitcode=1): """Prints the usage""" - sys.stderr.write(variables.usage) - sys.exit(0) + sys.stderr.write(variables.USAGE) + sys.exit(exitcode) def version(): @@ -43,9 +27,6 @@ def version(): def random_string(length): """generate a random string""" - import random - import string - randstring = "" for char in range(length): char = random.choice(string.ascii_letters + string.digits) @@ -90,8 +71,7 @@ def cmd_parse_execute(command_line, shell_context=None): # Split command line by shell grammar: '&&', '||', and ';;' cmd_split = re.split(r"(;;|&&|\|\|)", command_line) - # Initialize a variable to track whether the previous command succeeded or failed - previous_retcode = 0 + retcode = 0 # Initialize return code # Iterate over commands and operators for i in range(0, len(cmd_split), 2): @@ -99,9 +79,9 @@ def cmd_parse_execute(command_line, shell_context=None): operator = cmd_split[i - 1].strip() if i > 0 else None # Only execute commands based on the previous operator and return code - if operator == "&&" and previous_retcode != 0: + if operator == "&&" and retcode != 0: continue - elif operator == "||" and previous_retcode == 0: + elif operator == "||" and retcode == 0: continue # Get the executable command @@ -117,17 +97,14 @@ def cmd_parse_execute(command_line, shell_context=None): elif executable == "exit": shell_context.do_exit(command) elif executable == "history": - builtins.history(shell_context.conf, shell_context.log) + builtincmd.history(shell_context.conf, shell_context.log) elif executable == "cd": - retcode = builtins.cd(argument, shell_context.conf) + retcode = builtincmd.cd(argument, shell_context.conf) else: - retcode = getattr(builtins, executable)(shell_context.conf) + retcode = getattr(builtincmd, executable)(shell_context.conf) else: retcode = exec_cmd(command) - # Update the previous return code - previous_retcode = retcode - return retcode diff --git a/lshell/variables.py b/lshell/variables.py index 804bb82..8645b42 100644 --- a/lshell/variables.py +++ b/lshell/variables.py @@ -1,22 +1,4 @@ -# -# Limited command Shell (lshell) -# -# Copyright (C) 2008-2024 Ignace Mouzannar -# -# This file is part of lshell -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . +""" This file contains all the variables used in lshell """ import sys import os @@ -33,20 +15,20 @@ lshell_path = os.path.abspath(__file__).split("/lib/")[0] if sys.exec_prefix != "/usr": # for *BSD - conf_prefix = sys.exec_prefix + CONF_PREFIX = sys.exec_prefix elif lshell_path == "/home/ghantoos/.local": # for *Linux user install - conf_prefix = lshell_path + CONF_PREFIX = lshell_path else: # for *Linux system-wide install - conf_prefix = "" -configfile = conf_prefix + "/etc/lshell.conf" + CONF_PREFIX = "" +configfile = CONF_PREFIX + "/etc/lshell.conf" # history file -history_file = ".lhistory" +HISTORY_FILE = ".lhistory" # help text -usage = f"""Usage: lshell [OPTIONS] +USAGE = f"""Usage: lshell [OPTIONS] --config : Config file location (default {configfile}) -- : where is *any* config file parameter -h, --help : Show this help message @@ -54,8 +36,9 @@ """ # Intro Text -intro = """You are in a limited shell. +INTRO = """You are in a limited shell. Type '?' or 'help' to get the list of allowed commands""" + # configuration parameters configparams = [ "config=", diff --git a/setup.py b/setup.py index 348d4ef..2054660 100755 --- a/setup.py +++ b/setup.py @@ -1,27 +1,9 @@ -# -# Limited command Shell (lshell) -# -# Copyright (C) 2008-2024 Ignace Mouzannar -# -# This file is part of lshell -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . +""" Setup script for lshell """ -from setuptools import setup, find_packages import os -from setuptools.command.install import install import shutil +from setuptools import setup, find_packages +from setuptools.command.install import install # import lshell specifics from lshell.variables import __version__ diff --git a/test/__init__.py b/test/__init__.py index a5a1bac..00dc735 100644 --- a/test/__init__.py +++ b/test/__init__.py @@ -1,3 +1,5 @@ +""" This module is used to run all the tests in the test directory. """ + from test import test_functional, test_unit if __name__ == "__main__": diff --git a/test/test_functional.py b/test/test_functional.py index 5c0fd32..5ea7253 100644 --- a/test/test_functional.py +++ b/test/test_functional.py @@ -1,10 +1,12 @@ -import unittest -import pexpect +"""Functional tests for lshell""" + import os +import unittest import subprocess from getpass import getuser import tempfile import shutil +import pexpect # import lshell specifics from lshell import utils @@ -13,6 +15,7 @@ class TestFunctions(unittest.TestCase): + """Functional tests for lshell""" user = getuser() @@ -43,7 +46,7 @@ def test_02_builtin_ls_command(self): cout = p.stdout expected = cout.read(-1) self.child.sendline("ls") - self.child.expect("%s:~\\$" % self.user) + self.child.expect(f"{self.user}:~\\$") output = self.child.before.decode("utf8").split("ls\r", 1)[1] self.assertEqual(len(expected.strip().split()), len(output.strip().split())) @@ -361,7 +364,7 @@ def test_24_cd_and_command(self): result = self.child.before.decode("utf8").split("\n")[1].strip() self.assertEqual(expected, result) - def test_25_KeyboardInterrupt(self): + def test_25_keyboard_interrupt(self): """F25 | test cat(1) with KeyboardInterrupt, should not exit""" self.child = pexpect.spawn( f"{TOPDIR}/bin/lshell " @@ -381,6 +384,8 @@ def test_25_KeyboardInterrupt(self): expected = "foo" elif result.startswith("^C"): expected = "^C" + else: + expected = "unknown" except IndexError: # outputs u' ^C' on Debian expected = "^C" @@ -490,7 +495,10 @@ def test_31_security_echo_freedom_and_help(self): self.assertEqual(expected_help_output, help_output) # Step 2: Enter `echo FREEDOM! && help () sh && help` - expected_output = "FREEDOM!\r\ncd clear echo exit help history ll lpath ls lsudo\r\ncd clear echo exit help history ll lpath ls lsudo" + expected_output = ( + "FREEDOM!\r\ncd clear echo exit help history ll lpath ls lsudo\r\n" + "cd clear echo exit help history ll lpath ls lsudo" + ) self.child.sendline("echo FREEDOM! && help () sh && help") self.child.expect(f"{self.user}:~\\$") @@ -588,14 +596,19 @@ def test_36_env_vars_file_not_found(self): # Inject the environment variable file path self.child = pexpect.spawn( - f"{TOPDIR}/bin/lshell --config {TOPDIR}/etc/lshell.conf --env_vars_files \"['{missing_file_path}']\"" + f"{TOPDIR}/bin/lshell --config {TOPDIR}/etc/lshell.conf " + f"--env_vars_files \"['{missing_file_path}']\"" ) # Expect the prompt after shell startup self.child.expect(f"{self.user}:~\\$") # Simulate what happens when the environment variable file is missing - expected = f"ERROR: Unable to read environment file: {missing_file_path}\r\nYou are in a limited shell.\r\nType '?' or 'help' to get the list of allowed commands\r\n" + expected = ( + f"ERROR: Unable to read environment file: {missing_file_path}\r\n" + "You are in a limited shell.\r\n" + "Type '?' or 'help' to get the list of allowed commands\r\n" + ) # Check the error message in the output self.assertIn(expected, self.child.before.decode("utf8")) @@ -613,7 +626,8 @@ def test_37_load_env_vars_from_file(self): # Set the temp env file path in the config self.child = pexpect.spawn( - f"{TOPDIR}/bin/lshell --config {TOPDIR}/etc/lshell.conf --env_vars_files \"['{temp_env_file_path}']\"" + f"{TOPDIR}/bin/lshell --config {TOPDIR}/etc/lshell.conf " + f"--env_vars_files \"['{temp_env_file_path}']\"" ) self.child.expect(f"{self.user}:~\\$") @@ -668,9 +682,6 @@ def test_38_script_execution_with_template(self): cd clear echo exit help history ll lpath ls lsudo\r *** forbidden path: /""" - # Set maxDiff to None to see the full diff - self.maxDiff = None - # Wait for the script to finish executing self.child.expect(pexpect.EOF) @@ -727,9 +738,6 @@ def test_39_script_execution_with_template_strict(self): *** forbidden path -> "/"\r *** Kicked out""" - # Step 4: Set maxDiff to None to see the full diff - self.maxDiff = None - # Wait for the script to finish executing self.child.expect(pexpect.EOF) diff --git a/test/test_unit.py b/test/test_unit.py index c38b8c7..e3c1611 100644 --- a/test/test_unit.py +++ b/test/test_unit.py @@ -1,5 +1,7 @@ -import unittest +""" Unit tests for lshell """ + import os +import unittest from getpass import getuser # import lshell specifics @@ -7,70 +9,73 @@ from lshell.checkconfig import CheckConfig from lshell.utils import get_aliases, updateprompt from lshell.variables import builtins_list -from lshell import builtins +from lshell import builtincmd from lshell import sec TOPDIR = f"{os.path.dirname(os.path.realpath(__file__))}/../" class TestFunctions(unittest.TestCase): + """Unit tests for lshell""" + args = [f"--config={TOPDIR}/etc/lshell.conf", "--quiet=1"] userconf = CheckConfig(args).returnconf() - shell = ShellCmd(userconf, args) def test_03_checksecure_doublepipe(self): """U03 | double pipes should be allowed, even if pipe is forbidden""" args = self.args + ["--forbidden=['|']"] userconf = CheckConfig(args).returnconf() - INPUT = "ls || ls" - return self.assertEqual(sec.check_secure(INPUT, userconf)[0], 0) + input_command = "ls || ls" + return self.assertEqual(sec.check_secure(input_command, userconf)[0], 0) def test_04_checksecure_forbiddenpipe(self): """U04 | forbid pipe, should return 1""" args = self.args + ["--forbidden=['|']"] userconf = CheckConfig(args).returnconf() - INPUT = "ls | ls" - return self.assertEqual(sec.check_secure(INPUT, userconf)[0], 1) + input_command = "ls | ls" + return self.assertEqual(sec.check_secure(input_command, userconf)[0], 1) def test_05_checksecure_forbiddenchar(self): """U05 | forbid character, should return 1""" args = self.args + ["--forbidden=['l']"] userconf = CheckConfig(args).returnconf() - INPUT = "ls" - return self.assertEqual(sec.check_secure(INPUT, userconf)[0], 1) + input_command = "ls" + return self.assertEqual(sec.check_secure(input_command, userconf)[0], 1) def test_06_checksecure_sudo_command(self): """U06 | quoted text should not be forbidden""" - INPUT = "sudo ls" - return self.assertEqual(sec.check_secure(INPUT, self.userconf)[0], 1) + input_command = "sudo ls" + return self.assertEqual(sec.check_secure(input_command, self.userconf)[0], 1) def test_07_checksecure_notallowed_command(self): """U07 | forbidden command, should return 1""" args = self.args + ["--allowed=['ls']"] userconf = CheckConfig(args).returnconf() - INPUT = "ll" - return self.assertEqual(sec.check_secure(INPUT, userconf)[0], 1) + input_command = "ll" + return self.assertEqual(sec.check_secure(input_command, userconf)[0], 1) def test_08_checkpath_notallowed_path(self): """U08 | forbidden command, should return 1""" args = self.args + ["--path=['/home', '/var']"] userconf = CheckConfig(args).returnconf() - INPUT = "cd /tmp" - return self.assertEqual(sec.check_path(INPUT, userconf)[0], 1) + input_command = "cd /tmp" + return self.assertEqual(sec.check_path(input_command, userconf)[0], 1) def test_09_checkpath_notallowed_path_completion(self): """U09 | forbidden command, should return 1""" args = self.args + ["--path=['/home', '/var']"] userconf = CheckConfig(args).returnconf() - INPUT = "cd /tmp/" - return self.assertEqual(sec.check_path(INPUT, userconf, completion=1)[0], 1) + input_command = "cd /tmp/" + return self.assertEqual( + sec.check_path(input_command, userconf, completion=1)[0], 1 + ) def test_10_checkpath_dollarparenthesis(self): """U10 | when $() is allowed, return 0 if path allowed""" args = self.args + ["--forbidden=[';', '&', '|','`','>','<', '${']"] userconf = CheckConfig(args).returnconf() - INPUT = "echo $(echo aze)" - return self.assertEqual(sec.check_path(INPUT, userconf)[0], 0) + input_command = "echo $(echo aze)" + return self.assertEqual(sec.check_path(input_command, userconf)[0], 0) def test_11_checkconfig_configoverwrite(self): """U12 | forbid ';', then check_secure should return 1""" @@ -81,20 +86,22 @@ def test_11_checkconfig_configoverwrite(self): def test_12_overssh(self): """U12 | command over ssh""" args = self.args + ["--overssh=['exit']", "-c exit"] + userconf = CheckConfig(args).returnconf() + shell = ShellCmd(userconf, args) os.environ["SSH_CLIENT"] = "8.8.8.8 36000 22" if "SSH_TTY" in os.environ: os.environ.pop("SSH_TTY") with self.assertRaises(SystemExit) as cm: - CheckConfig(args).returnconf() + shell.check_scp_sftp() return self.assertEqual(cm.exception.code, 0) def test_13_multiple_aliases_with_separator(self): """U13 | multiple aliases using &&, || and ; separators""" # enable &, | and ; characters aliases = {"foo": "foo -l", "bar": "open"} - INPUT = "foo; fooo ;bar&&foo && foo | bar||bar || foo" + input_command = "foo; fooo ;bar&&foo && foo | bar||bar || foo" return self.assertEqual( - get_aliases(INPUT, aliases), + get_aliases(input_command, aliases), " foo -l; fooo ; open&& foo -l " "&& foo -l | open|| open || foo -l", ) @@ -138,16 +145,16 @@ def test_17_allowed_exec_cmd(self): def test_18_forbidden_environment(self): """U18 | unsafe environment are forbidden""" - INPUT = "export LD_PRELOAD=/lib64/ld-2.21.so" - args = INPUT - retcode = builtins.export(args)[0] + input_command = "export LD_PRELOAD=/lib64/ld-2.21.so" + args = input_command + retcode = builtincmd.export(args)[0] return self.assertEqual(retcode, 1) def test_19_allowed_environment(self): """U19 | other environment are accepted""" - INPUT = "export MY_PROJECT_VERSION=43" - args = INPUT - retcode = builtins.export(args)[0] + input_command = "export MY_PROJECT_VERSION=43" + args = input_command + retcode = builtincmd.export(args)[0] return self.assertEqual(retcode, 0) def test_20_winscp_allowed_commands(self): @@ -210,15 +217,15 @@ def test_25_disable_ld_preload(self): def test_26_checksecure_quoted_command(self): """U26 | quoted command should be parsed""" - INPUT = 'echo 1 && "bash"' - return self.assertEqual(sec.check_secure(INPUT, self.userconf)[0], 1) + input_command = 'echo 1 && "bash"' + return self.assertEqual(sec.check_secure(input_command, self.userconf)[0], 1) def test_27_checksecure_quoted_command(self): """U27 | quoted command should be parsed""" - INPUT = '"bash" && echo 1' - return self.assertEqual(sec.check_secure(INPUT, self.userconf)[0], 1) + input_command = '"bash" && echo 1' + return self.assertEqual(sec.check_secure(input_command, self.userconf)[0], 1) def test_28_checksecure_quoted_command(self): """U28 | quoted command should be parsed""" - INPUT = "echo'/1.sh'" - return self.assertEqual(sec.check_secure(INPUT, self.userconf)[0], 1) + input_command = "echo'/1.sh'" + return self.assertEqual(sec.check_secure(input_command, self.userconf)[0], 1)