From 0a8d82a7a1f33a255814d7d5c1c29f01cdec3c75 Mon Sep 17 00:00:00 2001 From: Noah Watson <107630091+nwatson22@users.noreply.github.com> Date: Mon, 11 Mar 2024 17:55:41 -0500 Subject: [PATCH] Options improvements retry (#967) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Changes from: - https://github.com/runtimeverification/pyk/pull/916 - https://github.com/runtimeverification/pyk/pull/955 --------- Co-authored-by: devops Co-authored-by: Tamás Tóth --- pyk/docs/conf.py | 4 +- pyk/package/version | 2 +- pyk/pyproject.toml | 2 +- pyk/src/pyk/__main__.py | 351 +++---------------------------- pyk/src/pyk/cli/args.py | 345 +++++++++++++++++++++--------- pyk/src/pyk/cli/cli.py | 110 ++++++++++ pyk/src/pyk/cli/pyk.py | 381 ++++++++++++++++++++++++++++++++++ pyk/src/pyk/cli/utils.py | 3 +- pyk/src/pyk/kdist/__main__.py | 207 ++++++++++-------- 9 files changed, 895 insertions(+), 510 deletions(-) create mode 100644 pyk/src/pyk/cli/cli.py create mode 100644 pyk/src/pyk/cli/pyk.py diff --git a/pyk/docs/conf.py b/pyk/docs/conf.py index 4013f7a9593..4ed3a5302aa 100644 --- a/pyk/docs/conf.py +++ b/pyk/docs/conf.py @@ -9,8 +9,8 @@ project = 'pyk' author = 'Runtime Verification, Inc' copyright = '2024, Runtime Verification, Inc' -version = '0.1.695' -release = '0.1.695' +version = '0.1.696' +release = '0.1.696' # -- General configuration --------------------------------------------------- # https://www.sphinx-doc.org/en/master/usage/configuration.html#general-configuration diff --git a/pyk/package/version b/pyk/package/version index 8cfa54c4d4e..67c686e0c37 100644 --- a/pyk/package/version +++ b/pyk/package/version @@ -1 +1 @@ -0.1.695 +0.1.696 diff --git a/pyk/pyproject.toml b/pyk/pyproject.toml index cd80f4617a4..0f743e7962d 100644 --- a/pyk/pyproject.toml +++ b/pyk/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "pyk" -version = "0.1.695" +version = "0.1.696" description = "" authors = [ "Runtime Verification, Inc. ", diff --git a/pyk/src/pyk/__main__.py b/pyk/src/pyk/__main__.py index 96bcd9572c8..01be9517539 100644 --- a/pyk/src/pyk/__main__.py +++ b/pyk/src/pyk/__main__.py @@ -1,345 +1,52 @@ from __future__ import annotations -import json import logging import sys -from argparse import ArgumentParser, FileType -from enum import Enum -from pathlib import Path from typing import TYPE_CHECKING -from graphviz import Digraph - -from pyk.kast.inner import KInner -from pyk.kore.rpc import ExecuteResult - -from .cli.args import KCLIArgs -from .cli.utils import LOG_FORMAT, dir_path, loglevel -from .coverage import get_rule_by_id, strip_coverage_logger -from .cterm import CTerm -from .kast.manip import ( - flatten_label, - minimize_rule, - minimize_term, - propagate_up_constraints, - remove_source_map, - split_config_and_constraints, +from .cli.args import LoggingOptions +from .cli.cli import CLI +from .cli.pyk import ( + CoverageCommand, + GraphImportsCommand, + JsonToKoreCommand, + KoreToJsonCommand, + PrintCommand, + ProveLegacyCommand, + RPCKastCommand, + RPCPrintCommand, ) -from .kast.outer import read_kast_definition -from .kast.pretty import PrettyPrinter -from .kore.parser import KoreParser -from .kore.rpc import StopReason -from .kore.syntax import Pattern, kore_term -from .ktool.kprint import KPrint -from .ktool.kprove import KProve -from .prelude.k import GENERATED_TOP_CELL -from .prelude.ml import is_top, mlAnd, mlOr +from .cli.utils import LOG_FORMAT, loglevel if TYPE_CHECKING: - from argparse import Namespace - from typing import Any, Final + from typing import Final _LOGGER: Final = logging.getLogger(__name__) -class PrintInput(Enum): - KORE_JSON = 'kore-json' - KAST_JSON = 'kast-json' - - def main() -> None: # KAST terms can end up nested quite deeply, because of the various assoc operators (eg. _Map_, _Set_, ...). # Most pyk operations are defined recursively, meaning you get a callstack the same depth as the term. # This change makes it so that in most cases, by default, pyk doesn't run out of stack space. sys.setrecursionlimit(10**7) - cli_parser = create_argument_parser() - args = cli_parser.parse_args() - - logging.basicConfig(level=loglevel(args), format=LOG_FORMAT) - - executor_name = 'exec_' + args.command.lower().replace('-', '_') - if executor_name not in globals(): - raise AssertionError(f'Unimplemented command: {args.command}') - - execute = globals()[executor_name] - execute(args) - - -def exec_print(args: Namespace) -> None: - kompiled_dir: Path = args.definition_dir - printer = KPrint(kompiled_dir) - if args.input == PrintInput.KORE_JSON: - _LOGGER.info(f'Reading Kore JSON from file: {args.term.name}') - kore = Pattern.from_json(args.term.read()) - term = printer.kore_to_kast(kore) - else: - _LOGGER.info(f'Reading Kast JSON from file: {args.term.name}') - term = KInner.from_json(args.term.read()) - if is_top(term): - args.output_file.write(printer.pretty_print(term)) - _LOGGER.info(f'Wrote file: {args.output_file.name}') - else: - if args.minimize: - if args.omit_labels != '' and args.keep_cells != '': - raise ValueError('You cannot use both --omit-labels and --keep-cells.') - - abstract_labels = args.omit_labels.split(',') if args.omit_labels != '' else [] - keep_cells = args.keep_cells.split(',') if args.keep_cells != '' else [] - minimized_disjuncts = [] - - for disjunct in flatten_label('#Or', term): - try: - minimized = minimize_term(disjunct, abstract_labels=abstract_labels, keep_cells=keep_cells) - config, constraint = split_config_and_constraints(minimized) - except ValueError as err: - raise ValueError('The minimized term does not contain a config cell.') from err - - if not is_top(constraint): - minimized_disjuncts.append(mlAnd([config, constraint], sort=GENERATED_TOP_CELL)) - else: - minimized_disjuncts.append(config) - term = propagate_up_constraints(mlOr(minimized_disjuncts, sort=GENERATED_TOP_CELL)) - - args.output_file.write(printer.pretty_print(term)) - _LOGGER.info(f'Wrote file: {args.output_file.name}') - - -def exec_rpc_print(args: Namespace) -> None: - kompiled_dir: Path = args.definition_dir - printer = KPrint(kompiled_dir) - input_dict = json.loads(args.input_file.read()) - output_buffer = [] - - def pretty_print_request(request_params: dict[str, Any]) -> list[str]: - output_buffer = [] - non_state_keys = set(request_params.keys()).difference(['state']) - for key in non_state_keys: - output_buffer.append(f'{key}: {request_params[key]}') - state = CTerm.from_kast(printer.kore_to_kast(kore_term(request_params['state']))) - output_buffer.append('State:') - output_buffer.append(printer.pretty_print(state.kast, sort_collections=True)) - return output_buffer - - def pretty_print_execute_response(execute_result: ExecuteResult) -> list[str]: - output_buffer = [] - output_buffer.append(f'Depth: {execute_result.depth}') - output_buffer.append(f'Stop reason: {execute_result.reason.value}') - if execute_result.reason == StopReason.TERMINAL_RULE or execute_result.reason == StopReason.CUT_POINT_RULE: - output_buffer.append(f'Stop rule: {execute_result.rule}') - output_buffer.append( - f'Number of next states: {len(execute_result.next_states) if execute_result.next_states is not None else 0}' - ) - state = CTerm.from_kast(printer.kore_to_kast(execute_result.state.kore)) - output_buffer.append('State:') - output_buffer.append(printer.pretty_print(state.kast, sort_collections=True)) - if execute_result.next_states is not None: - next_states = [CTerm.from_kast(printer.kore_to_kast(s.kore)) for s in execute_result.next_states] - for i, s in enumerate(next_states): - output_buffer.append(f'Next state #{i}:') - output_buffer.append(printer.pretty_print(s.kast, sort_collections=True)) - return output_buffer - - try: - if 'method' in input_dict: - output_buffer.append('JSON RPC request') - output_buffer.append(f'id: {input_dict["id"]}') - output_buffer.append(f'Method: {input_dict["method"]}') - try: - if 'state' in input_dict['params']: - output_buffer += pretty_print_request(input_dict['params']) - else: # this is an "add-module" request, skip trying to print state - for key in input_dict['params'].keys(): - output_buffer.append(f'{key}: {input_dict["params"][key]}') - except KeyError as e: - _LOGGER.critical(f'Could not find key {str(e)} in input JSON file') - exit(1) - else: - if not 'result' in input_dict: - _LOGGER.critical('The input is neither a request not a resonse') - exit(1) - output_buffer.append('JSON RPC Response') - output_buffer.append(f'id: {input_dict["id"]}') - if list(input_dict['result'].keys()) == ['state']: # this is a "simplify" response - output_buffer.append('Method: simplify') - state = CTerm.from_kast(printer.kore_to_kast(kore_term(input_dict['result']['state']))) - output_buffer.append('State:') - output_buffer.append(printer.pretty_print(state.kast, sort_collections=True)) - elif list(input_dict['result'].keys()) == ['module']: # this is an "add-module" response - output_buffer.append('Method: add-module') - output_buffer.append('Module:') - output_buffer.append(input_dict['result']['module']) - else: - try: # assume it is an "execute" response - output_buffer.append('Method: execute') - execute_result = ExecuteResult.from_dict(input_dict['result']) - output_buffer += pretty_print_execute_response(execute_result) - except KeyError as e: - _LOGGER.critical(f'Could not find key {str(e)} in input JSON file') - exit(1) - if args.output_file is not None: - args.output_file.write('\n'.join(output_buffer)) - else: - print('\n'.join(output_buffer)) - except ValueError as e: - # shorten and print the error message in case kore_to_kast throws ValueError - _LOGGER.critical(str(e)[:200]) - exit(1) - - -def exec_rpc_kast(args: Namespace) -> None: - """ - Convert an 'execute' JSON RPC response to a new 'execute' or 'simplify' request, - copying parameters from a reference request. - """ - reference_request = json.loads(args.reference_request_file.read()) - input_dict = json.loads(args.response_file.read()) - execute_result = ExecuteResult.from_dict(input_dict['result']) - non_state_keys = set(reference_request['params'].keys()).difference(['state']) - request_params = {} - for key in non_state_keys: - request_params[key] = reference_request['params'][key] - request_params['state'] = {'format': 'KORE', 'version': 1, 'term': execute_result.state.kore.dict} - request = { - 'jsonrpc': reference_request['jsonrpc'], - 'id': reference_request['id'], - 'method': reference_request['method'], - 'params': request_params, - } - args.output_file.write(json.dumps(request)) - - -def exec_prove_legacy(args: Namespace) -> None: - kompiled_dir: Path = args.definition_dir - kprover = KProve(kompiled_dir, args.main_file) - final_state = kprover.prove(Path(args.spec_file), spec_module_name=args.spec_module, args=args.kArgs) - args.output_file.write(json.dumps(mlOr([state.kast for state in final_state]).to_dict())) - _LOGGER.info(f'Wrote file: {args.output_file.name}') - - -def exec_graph_imports(args: Namespace) -> None: - kompiled_dir: Path = args.definition_dir - kprinter = KPrint(kompiled_dir) - definition = kprinter.definition - import_graph = Digraph() - graph_file = kompiled_dir / 'import-graph' - for module in definition.modules: - module_name = module.name - import_graph.node(module_name) - for module_import in module.imports: - import_graph.edge(module_name, module_import.name) - import_graph.render(graph_file) - _LOGGER.info(f'Wrote file: {graph_file}') - - -def exec_coverage(args: Namespace) -> None: - kompiled_dir: Path = args.definition_dir - definition = remove_source_map(read_kast_definition(kompiled_dir / 'compiled.json')) - pretty_printer = PrettyPrinter(definition) - for rid in args.coverage_file: - rule = minimize_rule(strip_coverage_logger(get_rule_by_id(definition, rid.strip()))) - args.output.write('\n\n') - args.output.write('Rule: ' + rid.strip()) - args.output.write('\nUnparsed:\n') - args.output.write(pretty_printer.print(rule)) - _LOGGER.info(f'Wrote file: {args.output.name}') - - -def exec_kore_to_json(args: Namespace) -> None: - text = sys.stdin.read() - kore = KoreParser(text).pattern() - print(kore.json) - - -def exec_json_to_kore(args: dict[str, Any]) -> None: - text = sys.stdin.read() - kore = Pattern.from_json(text) - kore.write(sys.stdout) - sys.stdout.write('\n') - - -def create_argument_parser() -> ArgumentParser: - k_cli_args = KCLIArgs() - - definition_args = ArgumentParser(add_help=False) - definition_args.add_argument('definition_dir', type=dir_path, help='Path to definition directory.') - - pyk_args = ArgumentParser() - pyk_args_command = pyk_args.add_subparsers(dest='command', required=True) - - print_args = pyk_args_command.add_parser( - 'print', - help='Pretty print a term.', - parents=[k_cli_args.logging_args, definition_args, k_cli_args.display_args], - ) - print_args.add_argument('term', type=FileType('r'), help='Input term (in format specified with --input).') - print_args.add_argument('--input', default=PrintInput.KAST_JSON, type=PrintInput, choices=list(PrintInput)) - print_args.add_argument('--omit-labels', default='', nargs='?', help='List of labels to omit from output.') - print_args.add_argument( - '--keep-cells', default='', nargs='?', help='List of cells with primitive values to keep in output.' - ) - print_args.add_argument('--output-file', type=FileType('w'), default='-') - - rpc_print_args = pyk_args_command.add_parser( - 'rpc-print', - help='Pretty-print an RPC request/response', - parents=[k_cli_args.logging_args, definition_args], - ) - rpc_print_args.add_argument( - 'input_file', - type=FileType('r'), - help='An input file containing the JSON RPC request or response with KoreJSON payload.', - ) - rpc_print_args.add_argument('--output-file', type=FileType('w'), default='-') - - rpc_kast_args = pyk_args_command.add_parser( - 'rpc-kast', - help='Convert an "execute" JSON RPC response to a new "execute" or "simplify" request, copying parameters from a reference request.', - parents=[k_cli_args.logging_args], - ) - rpc_kast_args.add_argument( - 'reference_request_file', - type=FileType('r'), - help='An input file containing a JSON RPC request to server as a reference for the new request.', - ) - rpc_kast_args.add_argument( - 'response_file', - type=FileType('r'), - help='An input file containing a JSON RPC response with KoreJSON payload.', - ) - rpc_kast_args.add_argument('--output-file', type=FileType('w'), default='-') - - prove_legacy_args = pyk_args_command.add_parser( - 'prove-legacy', - help='Prove an input specification (using kprovex).', - parents=[k_cli_args.logging_args, definition_args], - ) - prove_legacy_args.add_argument('main_file', type=str, help='Main file used for kompilation.') - prove_legacy_args.add_argument('spec_file', type=str, help='File with the specification module.') - prove_legacy_args.add_argument('spec_module', type=str, help='Module with claims to be proven.') - prove_legacy_args.add_argument('--output-file', type=FileType('w'), default='-') - prove_legacy_args.add_argument('kArgs', nargs='*', help='Arguments to pass through to K invocation.') - - pyk_args_command.add_parser( - 'graph-imports', - help='Graph the imports of a given definition.', - parents=[k_cli_args.logging_args, definition_args], - ) - - coverage_args = pyk_args_command.add_parser( - 'coverage', - help='Convert coverage file to human readable log.', - parents=[k_cli_args.logging_args, definition_args], - ) - coverage_args.add_argument('coverage_file', type=FileType('r'), help='Coverage file to build log for.') - coverage_args.add_argument('-o', '--output', type=FileType('w'), default='-') - - pyk_args_command.add_parser('kore-to-json', help='Convert textual KORE to JSON', parents=[k_cli_args.logging_args]) - - pyk_args_command.add_parser('json-to-kore', help='Convert JSON to textual KORE', parents=[k_cli_args.logging_args]) - - return pyk_args + cli = CLI( + [ + CoverageCommand, + GraphImportsCommand, + JsonToKoreCommand, + KoreToJsonCommand, + PrintCommand, + ProveLegacyCommand, + RPCKastCommand, + RPCPrintCommand, + ] + ) + command = cli.get_command() + assert isinstance(command, LoggingOptions) + logging.basicConfig(level=loglevel(command), format=LOG_FORMAT) + command.exec() if __name__ == '__main__': diff --git a/pyk/src/pyk/cli/args.py b/pyk/src/pyk/cli/args.py index 47b633cf637..af0fc12f2a3 100644 --- a/pyk/src/pyk/cli/args.py +++ b/pyk/src/pyk/cli/args.py @@ -1,155 +1,298 @@ from __future__ import annotations -from argparse import ArgumentParser -from functools import cached_property -from typing import TYPE_CHECKING +import sys +from argparse import FileType +from typing import IO, TYPE_CHECKING, Any -from ..utils import ensure_dir_path +from pyk.utils import ensure_dir_path + +from .cli import Options from .utils import bug_report_arg, dir_path, file_path if TYPE_CHECKING: + from argparse import ArgumentParser + from pathlib import Path from typing import TypeVar + from ..utils import BugReport + T = TypeVar('T') -class KCLIArgs: - @cached_property - def logging_args(self) -> ArgumentParser: - args = ArgumentParser(add_help=False) - args.add_argument('--verbose', '-v', default=False, action='store_true', help='Verbose output.') - args.add_argument('--debug', default=False, action='store_true', help='Debug output.') - return args - - @cached_property - def parallel_args(self) -> ArgumentParser: - args = ArgumentParser(add_help=False) - args.add_argument('--workers', '-j', default=1, type=int, help='Number of processes to run in parallel.') - return args - - @cached_property - def bug_report_args(self) -> ArgumentParser: - args = ArgumentParser(add_help=False) - args.add_argument( - '--bug-report', - type=bug_report_arg, - help='Generate bug report with given name', +class LoggingOptions(Options): + debug: bool + verbose: bool + + @staticmethod + def default() -> dict[str, Any]: + return { + 'verbose': False, + 'debug': False, + } + + @staticmethod + def update_args(parser: ArgumentParser) -> None: + parser.add_argument('--verbose', '-v', default=None, action='store_true', help='Verbose output.') + parser.add_argument('--debug', default=None, action='store_true', help='Debug output.') + + +class OutputFileOptions(Options): + output_file: IO[Any] + + @staticmethod + def default() -> dict[str, Any]: + return { + 'output_file': sys.stdout, + } + + @staticmethod + def update_args(parser: ArgumentParser) -> None: + parser.add_argument('--output-file', type=FileType('w')) + + +class DefinitionOptions(Options): + definition_dir: Path + + @staticmethod + def update_args(parser: ArgumentParser) -> None: + parser.add_argument('definition_dir', type=dir_path, help='Path to definition directory.') + + +class DisplayOptions(Options): + minimize: bool + + @staticmethod + def default() -> dict[str, Any]: + return { + 'minimize': True, + } + + @staticmethod + def update_args(parser: ArgumentParser) -> None: + parser.add_argument('--minimize', dest='minimize', default=None, action='store_true', help='Minimize output.') + parser.add_argument( + '--no-minimize', dest='minimize', default=None, action='store_false', help='Do not minimize output.' ) - return args - @cached_property - def kompile_args(self) -> ArgumentParser: - args = ArgumentParser(add_help=False) - args.add_argument( + +class KDefinitionOptions(Options): + includes: list[str] + main_module: str | None + syntax_module: str | None + spec_module: str | None + definition_dir: Path | None + md_selector: str + + @staticmethod + def default() -> dict[str, Any]: + return { + 'spec_module': None, + 'main_module': None, + 'syntax_module': None, + 'definition_dir': None, + 'md_selector': 'k', + 'includes': [], + } + + @staticmethod + def update_args(parser: ArgumentParser) -> None: + parser.add_argument( + '-I', type=str, dest='includes', action='append', help='Directories to lookup K definitions in.' + ) + parser.add_argument('--main-module', type=str, help='Name of the main module.') + parser.add_argument('--syntax-module', type=str, help='Name of the syntax module.') + parser.add_argument('--spec-module', type=str, help='Name of the spec module.') + parser.add_argument('--definition', type=dir_path, dest='definition_dir', help='Path to definition to use.') + parser.add_argument( + '--md-selector', + type=str, + help='Code selector expression to use when reading markdown.', + ) + + +class SaveDirOptions(Options): + save_directory: Path | None + + @staticmethod + def default() -> dict[str, Any]: + return { + 'save_directory': None, + } + + @staticmethod + def update_args(parser: ArgumentParser) -> None: + parser.add_argument('--save-directory', type=ensure_dir_path, help='Path to where CFGs are stored.') + + +class SpecOptions(SaveDirOptions): + spec_file: Path + claim_labels: list[str] | None + exclude_claim_labels: list[str] + + @staticmethod + def default() -> dict[str, Any]: + return { + 'claim_labels': None, + 'exclude_claim_labels': [], + } + + @staticmethod + def update_args(parser: ArgumentParser) -> None: + parser.add_argument('spec_file', type=file_path, help='Path to spec file.') + parser.add_argument( + '--claim', + type=str, + dest='claim_labels', + action='append', + help='Only prove listed claims, MODULE_NAME.claim-id', + ) + parser.add_argument( + '--exclude-claim', + type=str, + dest='exclude_claim_labels', + action='append', + help='Skip listed claims, MODULE_NAME.claim-id', + ) + + +class KompileOptions(Options): + emit_json: bool + ccopts: list[str] + llvm_kompile: bool + llvm_library: bool + enable_llvm_debug: bool + read_only: bool + o0: bool + o1: bool + o2: bool + o3: bool + + @staticmethod + def default() -> dict[str, Any]: + return { + 'emit_json': True, + 'llvm_kompile': False, + 'llvm_library': False, + 'enable_llvm_debug': False, + 'read_only': False, + 'o0': False, + 'o1': False, + 'o2': False, + 'o3': False, + 'ccopts': [], + } + + @staticmethod + def update_args(parser: ArgumentParser) -> None: + parser.add_argument( '--emit-json', dest='emit_json', - default=True, + default=None, action='store_true', help='Emit JSON definition after compilation.', ) - args.add_argument( - '--no-emit-json', dest='emit_json', action='store_false', help='Do not JSON definition after compilation.' + parser.add_argument( + '--no-emit-json', + dest='emit_json', + default=None, + action='store_false', + help='Do not JSON definition after compilation.', ) - args.add_argument( + parser.add_argument( '-ccopt', dest='ccopts', - default=[], action='append', help='Additional arguments to pass to llvm-kompile.', ) - args.add_argument( + parser.add_argument( '--no-llvm-kompile', dest='llvm_kompile', - default=True, + default=None, action='store_false', help='Do not run llvm-kompile process.', ) - args.add_argument( + parser.add_argument( '--with-llvm-library', dest='llvm_library', - default=False, + default=None, action='store_true', help='Make kompile generate a dynamic llvm library.', ) - args.add_argument( + parser.add_argument( '--enable-llvm-debug', dest='enable_llvm_debug', - default=False, + default=None, action='store_true', help='Make kompile generate debug symbols for llvm.', ) - args.add_argument( + parser.add_argument( '--read-only-kompiled-directory', dest='read_only', - default=False, + default=None, action='store_true', help='Generated a kompiled directory that K will not attempt to write to afterwards.', ) - args.add_argument('-O0', dest='o0', default=False, action='store_true', help='Optimization level 0.') - args.add_argument('-O1', dest='o1', default=False, action='store_true', help='Optimization level 1.') - args.add_argument('-O2', dest='o2', default=False, action='store_true', help='Optimization level 2.') - args.add_argument('-O3', dest='o3', default=False, action='store_true', help='Optimization level 3.') - return args - - @cached_property - def smt_args(self) -> ArgumentParser: - args = ArgumentParser(add_help=False) - args.add_argument('--smt-timeout', dest='smt_timeout', type=int, help='Timeout in ms to use for SMT queries.') - args.add_argument( + parser.add_argument('-O0', dest='o0', default=None, action='store_true', help='Optimization level 0.') + parser.add_argument('-O1', dest='o1', default=None, action='store_true', help='Optimization level 1.') + parser.add_argument('-O2', dest='o2', default=None, action='store_true', help='Optimization level 2.') + parser.add_argument('-O3', dest='o3', default=None, action='store_true', help='Optimization level 3.') + + +class ParallelOptions(Options): + workers: int + + @staticmethod + def default() -> dict[str, Any]: + return { + 'workers': 1, + } + + @staticmethod + def update_args(parser: ArgumentParser) -> None: + parser.add_argument('--workers', '-j', type=int, help='Number of processes to run in parallel.') + + +class BugReportOptions(Options): + bug_report: BugReport | None + + @staticmethod + def default() -> dict[str, Any]: + return {'bug_report': None} + + @staticmethod + def update_args(parser: ArgumentParser) -> None: + parser.add_argument( + '--bug-report', + type=bug_report_arg, + help='Generate bug report with given name', + ) + + +class SMTOptions(Options): + smt_timeout: int + smt_retry_limit: int + smt_tactic: str | None + + @staticmethod + def default() -> dict[str, Any]: + return { + 'smt_timeout': 300, + 'smt_retry_limit': 10, + 'smt_tactic': None, + } + + @staticmethod + def update_args(parser: ArgumentParser) -> None: + parser.add_argument('--smt-timeout', dest='smt_timeout', type=int, help='Timeout in ms to use for SMT queries.') + parser.add_argument( '--smt-retry-limit', dest='smt_retry_limit', type=int, help='Number of times to retry SMT queries with scaling timeouts.', ) - args.add_argument( + parser.add_argument( '--smt-tactic', dest='smt_tactic', type=str, help='Z3 tactic to use when checking satisfiability. Example: (check-sat-using smt)', ) - return args - - @cached_property - def display_args(self) -> ArgumentParser: - args = ArgumentParser(add_help=False) - args.add_argument('--minimize', dest='minimize', default=True, action='store_true', help='Minimize output.') - args.add_argument('--no-minimize', dest='minimize', action='store_false', help='Do not minimize output.') - return args - - @cached_property - def definition_args(self) -> ArgumentParser: - args = ArgumentParser(add_help=False) - args.add_argument( - '-I', type=str, dest='includes', default=[], action='append', help='Directories to lookup K definitions in.' - ) - args.add_argument('--main-module', default=None, type=str, help='Name of the main module.') - args.add_argument('--syntax-module', default=None, type=str, help='Name of the syntax module.') - args.add_argument('--spec-module', default=None, type=str, help='Name of the spec module.') - args.add_argument('--definition', type=dir_path, dest='definition_dir', help='Path to definition to use.') - args.add_argument( - '--md-selector', - type=str, - help='Code selector expression to use when reading markdown.', - ) - return args - - @cached_property - def spec_args(self) -> ArgumentParser: - args = ArgumentParser(add_help=False) - args.add_argument('spec_file', type=file_path, help='Path to spec file.') - args.add_argument('--save-directory', type=ensure_dir_path, help='Path to where CFGs are stored.') - args.add_argument( - '--claim', - type=str, - dest='claim_labels', - action='append', - help='Only prove listed claims, MODULE_NAME.claim-id', - ) - args.add_argument( - '--exclude-claim', - type=str, - dest='exclude_claim_labels', - action='append', - help='Skip listed claims, MODULE_NAME.claim-id', - ) - return args diff --git a/pyk/src/pyk/cli/cli.py b/pyk/src/pyk/cli/cli.py new file mode 100644 index 00000000000..1bbc5e2b438 --- /dev/null +++ b/pyk/src/pyk/cli/cli.py @@ -0,0 +1,110 @@ +from __future__ import annotations + +from abc import ABC, abstractmethod +from argparse import ArgumentParser +from collections.abc import Iterable +from typing import TYPE_CHECKING, Any + +if TYPE_CHECKING: + from argparse import _SubParsersAction + + +class CLI: + commands: list[type[Command]] + top_level_args: Iterable[type[Options]] + + # Input a list of all Command types to be used + def __init__(self, commands: Iterable[type[Command]], top_level_args: Iterable[type[Options]] = ()): + self.commands = list(commands) + self.top_level_args = top_level_args + + # Return an instance of the correct Options subclass by matching its name with the requested command + def generate_command(self, args: dict[str, Any]) -> Command: + command = args['command'].lower() + for cmd_type in self.commands: + if cmd_type.name() == command: + if issubclass(cmd_type, Options): + return cmd_type(args) + else: + return cmd_type() + raise ValueError(f'Unrecognized command: {command}') + + # Generate the parsers for all commands + def add_parsers(self, base: _SubParsersAction) -> _SubParsersAction: + for cmd_type in self.commands: + base = cmd_type.parser(base) + return base + + def create_argument_parser(self) -> ArgumentParser: + pyk_args = ArgumentParser(parents=[tla.all_args() for tla in self.top_level_args]) + pyk_args_command = pyk_args.add_subparsers(dest='command', required=True) + + pyk_args_command = self.add_parsers(pyk_args_command) + + return pyk_args + + def get_command(self) -> Command: + parser = self.create_argument_parser() + args = parser.parse_args() + stripped_args = { + key: val + for (key, val) in vars(args).items() + if val is not None and not (isinstance(val, Iterable) and not val) + } + return self.generate_command(stripped_args) + + def get_and_exec_command(self) -> None: + cmd = self.get_command() + cmd.exec() + + +class Command(ABC): + @staticmethod + @abstractmethod + def name() -> str: + ... + + @staticmethod + @abstractmethod + def help_str() -> str: + ... + + @abstractmethod + def exec(self) -> None: + ... + + @classmethod + def parser(cls, base: _SubParsersAction) -> _SubParsersAction: + all_args = [cls.all_args()] if issubclass(cls, Options) else [] + base.add_parser( + name=cls.name(), + help=cls.help_str(), + parents=all_args, + ) + return base + + +class Options: + def __init__(self, args: dict[str, Any]) -> None: + # Get defaults from this and all superclasses that define them, preferring the most specific class + defaults: dict[str, Any] = {} + for cl in reversed(type(self).mro()): + if hasattr(cl, 'default'): + defaults = defaults | cl.default() + + # Overwrite defaults with args from command line + _args = defaults | args + + for attr, val in _args.items(): + self.__setattr__(attr, val) + + @classmethod + def all_args(cls: type[Options]) -> ArgumentParser: + # Collect args from this and all superclasses + parser = ArgumentParser(add_help=False) + mro = cls.mro() + mro.reverse() + for cl in mro: + if hasattr(cl, 'update_args') and 'update_args' in cl.__dict__: + cl.update_args(parser) + return parser diff --git a/pyk/src/pyk/cli/pyk.py b/pyk/src/pyk/cli/pyk.py new file mode 100644 index 00000000000..cf638ce37dc --- /dev/null +++ b/pyk/src/pyk/cli/pyk.py @@ -0,0 +1,381 @@ +from __future__ import annotations + +import json +import sys +from argparse import FileType +from enum import Enum +from pathlib import Path +from typing import IO, TYPE_CHECKING, Any + +from graphviz import Digraph + +from pyk.coverage import get_rule_by_id, strip_coverage_logger +from pyk.cterm import CTerm +from pyk.kast.inner import KInner +from pyk.kast.manip import ( + flatten_label, + minimize_rule, + minimize_term, + propagate_up_constraints, + remove_source_map, + split_config_and_constraints, +) +from pyk.kast.outer import read_kast_definition +from pyk.kast.pretty import PrettyPrinter +from pyk.kore.parser import KoreParser +from pyk.kore.rpc import ExecuteResult, StopReason +from pyk.kore.syntax import Pattern, kore_term +from pyk.ktool.kprint import KPrint +from pyk.ktool.kprove import KProve +from pyk.prelude.k import GENERATED_TOP_CELL +from pyk.prelude.ml import is_top, mlAnd, mlOr +from pyk.utils import _LOGGER + +from .args import DefinitionOptions, DisplayOptions, LoggingOptions, OutputFileOptions +from .cli import Command + +if TYPE_CHECKING: + from argparse import ArgumentParser + from collections.abc import Iterable + + +class PrintInput(Enum): + KORE_JSON = 'kore-json' + KAST_JSON = 'kast-json' + + +class JsonToKoreCommand(Command, LoggingOptions): + @staticmethod + def name() -> str: + return 'json-to-kore' + + @staticmethod + def help_str() -> str: + return 'Convert JSON to textual KORE' + + def exec(self) -> None: + text = sys.stdin.read() + kore = Pattern.from_json(text) + kore.write(sys.stdout) + sys.stdout.write('\n') + + +class KoreToJsonCommand(Command, LoggingOptions): + @staticmethod + def name() -> str: + return 'kore-to-json' + + @staticmethod + def help_str() -> str: + return 'Convert textual KORE to JSON' + + def exec(self) -> None: + text = sys.stdin.read() + kore = KoreParser(text).pattern() + print(kore.json) + + +class CoverageCommand(Command, DefinitionOptions, OutputFileOptions, LoggingOptions): + coverage_file: IO[Any] + + @staticmethod + def name() -> str: + return 'coverage' + + @staticmethod + def help_str() -> str: + return 'Convert coverage file to human readable log.' + + @staticmethod + def update_args(parser: ArgumentParser) -> None: + parser.add_argument('coverage_file', type=FileType('r'), help='Coverage file to build log for.') + + def exec(self) -> None: + kompiled_dir: Path = self.definition_dir + definition = remove_source_map(read_kast_definition(kompiled_dir / 'compiled.json')) + pretty_printer = PrettyPrinter(definition) + for rid in self.coverage_file: + rule = minimize_rule(strip_coverage_logger(get_rule_by_id(definition, rid.strip()))) + self.output_file.write('\n\n') + self.output_file.write('Rule: ' + rid.strip()) + self.output_file.write('\nUnparsed:\n') + self.output_file.write(pretty_printer.print(rule)) + _LOGGER.info(f'Wrote file: {self.output_file.name}') + + +class GraphImportsCommand(Command, DefinitionOptions, LoggingOptions): + @staticmethod + def name() -> str: + return 'graph-imports' + + @staticmethod + def help_str() -> str: + return 'Graph the imports of a given definition.' + + def exec(self) -> None: + kompiled_dir: Path = self.definition_dir + kprinter = KPrint(kompiled_dir) + definition = kprinter.definition + import_graph = Digraph() + graph_file = kompiled_dir / 'import-graph' + for module in definition.modules: + module_name = module.name + import_graph.node(module_name) + for module_import in module.imports: + import_graph.edge(module_name, module_import.name) + import_graph.render(graph_file) + _LOGGER.info(f'Wrote file: {graph_file}') + + +class RPCKastCommand(Command, OutputFileOptions, LoggingOptions): + reference_request_file: IO[Any] + response_file: IO[Any] + + @staticmethod + def name() -> str: + return 'rpc-kast' + + @staticmethod + def help_str() -> str: + return 'Convert an "execute" JSON RPC response to a new "execute" or "simplify" request, copying parameters from a reference request.' + + @staticmethod + def update_args(parser: ArgumentParser) -> None: + parser.add_argument( + 'reference_request_file', + type=FileType('r'), + help='An input file containing a JSON RPC request to server as a reference for the new request.', + ) + parser.add_argument( + 'response_file', + type=FileType('r'), + help='An input file containing a JSON RPC response with KoreJSON payload.', + ) + + def exec(self) -> None: + """ + Convert an 'execute' JSON RPC response to a new 'execute' or 'simplify' request, + copying parameters from a reference request. + """ + reference_request = json.loads(self.reference_request_file.read()) + input_dict = json.loads(self.response_file.read()) + execute_result = ExecuteResult.from_dict(input_dict['result']) + non_state_keys = set(reference_request['params'].keys()).difference(['state']) + request_params = {} + for key in non_state_keys: + request_params[key] = reference_request['params'][key] + request_params['state'] = {'format': 'KORE', 'version': 1, 'term': execute_result.state.kore.dict} + request = { + 'jsonrpc': reference_request['jsonrpc'], + 'id': reference_request['id'], + 'method': reference_request['method'], + 'params': request_params, + } + self.output_file.write(json.dumps(request)) + + +class RPCPrintCommand(Command, DefinitionOptions, OutputFileOptions, LoggingOptions): + input_file: IO[Any] + + @staticmethod + def name() -> str: + return 'rpc-print' + + @staticmethod + def help_str() -> str: + return 'Pretty-print an RPC request/response' + + @staticmethod + def update_args(parser: ArgumentParser) -> None: + parser.add_argument( + 'input_file', + type=FileType('r'), + help='An input file containing the JSON RPC request or response with KoreJSON payload.', + ) + + def exec(self) -> None: + kompiled_dir: Path = self.definition_dir + printer = KPrint(kompiled_dir) + input_dict = json.loads(self.input_file.read()) + output_buffer = [] + + def pretty_print_request(request_params: dict[str, Any]) -> list[str]: + output_buffer = [] + non_state_keys = set(request_params.keys()).difference(['state']) + for key in non_state_keys: + output_buffer.append(f'{key}: {request_params[key]}') + state = CTerm.from_kast(printer.kore_to_kast(kore_term(request_params['state']))) + output_buffer.append('State:') + output_buffer.append(printer.pretty_print(state.kast, sort_collections=True)) + return output_buffer + + def pretty_print_execute_response(execute_result: ExecuteResult) -> list[str]: + output_buffer = [] + output_buffer.append(f'Depth: {execute_result.depth}') + output_buffer.append(f'Stop reason: {execute_result.reason.value}') + if execute_result.reason == StopReason.TERMINAL_RULE or execute_result.reason == StopReason.CUT_POINT_RULE: + output_buffer.append(f'Stop rule: {execute_result.rule}') + output_buffer.append( + f'Number of next states: {len(execute_result.next_states) if execute_result.next_states is not None else 0}' + ) + state = CTerm.from_kast(printer.kore_to_kast(execute_result.state.kore)) + output_buffer.append('State:') + output_buffer.append(printer.pretty_print(state.kast, sort_collections=True)) + if execute_result.next_states is not None: + next_states = [CTerm.from_kast(printer.kore_to_kast(s.kore)) for s in execute_result.next_states] + for i, s in enumerate(next_states): + output_buffer.append(f'Next state #{i}:') + output_buffer.append(printer.pretty_print(s.kast, sort_collections=True)) + return output_buffer + + try: + if 'method' in input_dict: + output_buffer.append('JSON RPC request') + output_buffer.append(f'id: {input_dict["id"]}') + output_buffer.append(f'Method: {input_dict["method"]}') + try: + if 'state' in input_dict['params']: + output_buffer += pretty_print_request(input_dict['params']) + else: # this is an "add-module" request, skip trying to print state + for key in input_dict['params'].keys(): + output_buffer.append(f'{key}: {input_dict["params"][key]}') + except KeyError as e: + _LOGGER.critical(f'Could not find key {str(e)} in input JSON file') + exit(1) + else: + if not 'result' in input_dict: + _LOGGER.critical('The input is neither a request not a resonse') + exit(1) + output_buffer.append('JSON RPC Response') + output_buffer.append(f'id: {input_dict["id"]}') + if list(input_dict['result'].keys()) == ['state']: # this is a "simplify" response + output_buffer.append('Method: simplify') + state = CTerm.from_kast(printer.kore_to_kast(kore_term(input_dict['result']['state']))) + output_buffer.append('State:') + output_buffer.append(printer.pretty_print(state.kast, sort_collections=True)) + elif list(input_dict['result'].keys()) == ['module']: # this is an "add-module" response + output_buffer.append('Method: add-module') + output_buffer.append('Module:') + output_buffer.append(input_dict['result']['module']) + else: + try: # assume it is an "execute" response + output_buffer.append('Method: execute') + execute_result = ExecuteResult.from_dict(input_dict['result']) + output_buffer += pretty_print_execute_response(execute_result) + except KeyError as e: + _LOGGER.critical(f'Could not find key {str(e)} in input JSON file') + exit(1) + if self.output_file is not None: + self.output_file.write('\n'.join(output_buffer)) + else: + print('\n'.join(output_buffer)) + except ValueError as e: + # shorten and print the error message in case kore_to_kast throws ValueError + _LOGGER.critical(str(e)[:200]) + exit(1) + + +class PrintCommand(Command, DefinitionOptions, OutputFileOptions, DisplayOptions, LoggingOptions): + term: IO[Any] + input: PrintInput + minimize: bool + omit_labels: str | None + keep_cells: str | None + + @staticmethod + def name() -> str: + return 'print' + + @staticmethod + def help_str() -> str: + return 'Pretty print a term.' + + @staticmethod + def default() -> dict[str, Any]: + return { + 'input': PrintInput.KAST_JSON, + 'omit_labels': None, + 'keep_cells': None, + } + + @staticmethod + def update_args(parser: ArgumentParser) -> None: + parser.add_argument( + 'term', type=FileType('r'), help='File containing input term (in format specified with --input).' + ) + parser.add_argument('--input', type=PrintInput, choices=list(PrintInput)) + parser.add_argument('--omit-labels', nargs='?', help='List of labels to omit from output.') + parser.add_argument('--keep-cells', nargs='?', help='List of cells with primitive values to keep in output.') + + def exec(self) -> None: + kompiled_dir: Path = self.definition_dir + printer = KPrint(kompiled_dir) + if self.input == PrintInput.KORE_JSON: + _LOGGER.info(f'Reading Kore JSON from file: {self.term.name}') + kore = Pattern.from_json(self.term.read()) + term = printer.kore_to_kast(kore) + else: + _LOGGER.info(f'Reading Kast JSON from file: {self.term.name}') + term = KInner.from_json(self.term.read()) + if is_top(term): + self.output_file.write(printer.pretty_print(term)) + _LOGGER.info(f'Wrote file: {self.output_file.name}') + else: + if self.minimize: + if self.omit_labels != None and self.keep_cells != None: + raise ValueError('You cannot use both --omit-labels and --keep-cells.') + + abstract_labels = self.omit_labels.split(',') if self.omit_labels is not None else [] + keep_cells = self.keep_cells.split(',') if self.keep_cells is not None else [] + minimized_disjuncts = [] + + for disjunct in flatten_label('#Or', term): + try: + minimized = minimize_term(disjunct, abstract_labels=abstract_labels, keep_cells=keep_cells) + config, constraint = split_config_and_constraints(minimized) + except ValueError as err: + raise ValueError('The minimized term does not contain a config cell.') from err + + if not is_top(constraint): + minimized_disjuncts.append(mlAnd([config, constraint], sort=GENERATED_TOP_CELL)) + else: + minimized_disjuncts.append(config) + term = propagate_up_constraints(mlOr(minimized_disjuncts, sort=GENERATED_TOP_CELL)) + + self.output_file.write(printer.pretty_print(term)) + _LOGGER.info(f'Wrote file: {self.output_file.name}') + + +class ProveLegacyCommand(Command, DefinitionOptions, OutputFileOptions, LoggingOptions): + main_file: Path + spec_file: Path + spec_module: str + k_args: Iterable[str] + + @staticmethod + def name() -> str: + return 'prove-legacy' + + @staticmethod + def help_str() -> str: + return 'Prove an input specification (using kprovex).' + + @staticmethod + def default() -> dict[str, Any]: + return { + 'k_args': [], + } + + @staticmethod + def update_args(parser: ArgumentParser) -> None: + parser.add_argument('main_file', type=str, help='Main file used for kompilation.') + parser.add_argument('spec_file', type=str, help='File with the specification module.') + parser.add_argument('spec_module', type=str, help='Module with claims to be proven.') + parser.add_argument('k_args', nargs='*', help='Arguments to pass through to K invocation.') + + def exec(self) -> None: + kompiled_dir: Path = self.definition_dir + kprover = KProve(kompiled_dir, self.main_file) + final_state = kprover.prove(Path(self.spec_file), spec_module_name=self.spec_module, args=self.k_args) + self.output_file.write(json.dumps(mlOr([state.kast for state in final_state]).to_dict())) + _LOGGER.info(f'Wrote file: {self.output_file.name}') diff --git a/pyk/src/pyk/cli/utils.py b/pyk/src/pyk/cli/utils.py index c98320fdd60..4f236d0d542 100644 --- a/pyk/src/pyk/cli/utils.py +++ b/pyk/src/pyk/cli/utils.py @@ -12,6 +12,7 @@ from typing import Final, TypeVar from ..kcfg.kcfg import NodeIdLike + from .args import LoggingOptions T1 = TypeVar('T1') T2 = TypeVar('T2') @@ -19,7 +20,7 @@ LOG_FORMAT: Final = '%(levelname)s %(asctime)s %(name)s - %(message)s' -def loglevel(args: Namespace) -> int: +def loglevel(args: LoggingOptions | Namespace) -> int: if args.debug: return logging.DEBUG diff --git a/pyk/src/pyk/kdist/__main__.py b/pyk/src/pyk/kdist/__main__.py index 1a0624310af..b168347831d 100644 --- a/pyk/src/pyk/kdist/__main__.py +++ b/pyk/src/pyk/kdist/__main__.py @@ -2,16 +2,16 @@ import fnmatch import logging -from argparse import ArgumentParser -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any -from pyk.cli.args import KCLIArgs +from pyk.cli.args import LoggingOptions +from pyk.cli.cli import CLI, Command from pyk.cli.utils import loglevel from ..kdist import kdist, target_ids if TYPE_CHECKING: - from argparse import Namespace + from argparse import ArgumentParser from typing import Final @@ -20,42 +20,14 @@ def main() -> None: - args = _parse_arguments() - - logging.basicConfig(level=loglevel(args), format=_LOG_FORMAT) - - if args.command == 'build': - _exec_build(**vars(args)) - - elif args.command == 'clean': - _exec_clean(args.target) - - elif args.command == 'which': - _exec_which(args.target) - - elif args.command == 'list': - _exec_list() - - else: - raise AssertionError() - - -def _exec_build( - command: str, - targets: list[str], - args: list[str], - jobs: int, - force: bool, - verbose: bool, - debug: bool, -) -> None: - kdist.build( - target_ids=_process_targets(targets), - args=_process_args(args), - jobs=jobs, - force=force, - verbose=verbose or debug, + kdist_cli = CLI( + {KDistBuildCommand, KDistCleanCommand, KDistWhichCommand, KDistListCommand}, top_level_args=[LoggingOptions] ) + cmd = kdist_cli.get_command() + assert isinstance(cmd, LoggingOptions) + print(vars(cmd)) + logging.basicConfig(level=loglevel(cmd), format=_LOG_FORMAT) + cmd.exec() def _process_targets(targets: list[str]) -> list[str]: @@ -80,66 +52,137 @@ def _process_args(args: list[str]) -> dict[str, str]: return res -def _exec_clean(target: str | None) -> None: - res = kdist.clean(target) - print(res) +class KDistBuildCommand(Command, LoggingOptions): + targets: list[str] + force: bool + jobs: int + args: list[str] + + @staticmethod + def name() -> str: + return 'build' + + @staticmethod + def help_str() -> str: + return 'build targets' + + @staticmethod + def default() -> dict[str, Any]: + return { + 'force': False, + 'jobs': 1, + 'targets': ['*'], + 'args': [], + } + + @staticmethod + def update_args(parser: ArgumentParser) -> None: + parser.add_argument('targets', metavar='TARGET', nargs='*', help='target to build') + parser.add_argument( + '-a', + '--arg', + dest='args', + metavar='ARG', + action='append', + help='build with argument', + ) + parser.add_argument('-f', '--force', default=None, action='store_true', help='force build') + parser.add_argument('-j', '--jobs', metavar='N', type=int, help='maximal number of build jobs') + + def exec(self) -> None: + print(self.verbose) + print(self.debug) + kdist.build( + target_ids=_process_targets(self.targets), + args=_process_args(self.args), + jobs=self.jobs, + force=self.force, + verbose=self.verbose or self.debug, + ) + + +class KDistCleanCommand(Command, LoggingOptions): + target: str + + @staticmethod + def name() -> str: + return 'clean' + + @staticmethod + def help_str() -> str: + return 'clean targets' + + @staticmethod + def default() -> dict[str, Any]: + return { + 'target': None, + } + + @staticmethod + def update_args(parser: ArgumentParser) -> None: + parser.add_argument( + 'target', + metavar='TARGET', + nargs='?', + help='target to clean', + ) + def exec(self) -> None: + res = kdist.clean(self.target) + print(res) -def _exec_which(target: str | None) -> None: - res = kdist.which(target) - print(res) +class KDistWhichCommand(Command, LoggingOptions): + target: str -def _exec_list() -> None: - targets_by_plugin: dict[str, list[str]] = {} - for plugin_name, target_name in target_ids(): - targets = targets_by_plugin.get(plugin_name, []) - targets.append(target_name) - targets_by_plugin[plugin_name] = targets + @staticmethod + def name() -> str: + return 'which' - for plugin_name in targets_by_plugin: - print(plugin_name) - for target_name in targets_by_plugin[plugin_name]: - print(f'* {target_name}') + @staticmethod + def help_str() -> str: + return 'print target location' + @staticmethod + def default() -> dict[str, Any]: + return { + 'target': None, + } -def _parse_arguments() -> Namespace: - def add_target_arg(parser: ArgumentParser, help_text: str) -> None: + @staticmethod + def update_args(parser: ArgumentParser) -> None: parser.add_argument( 'target', metavar='TARGET', nargs='?', - help=help_text, + help='target to print directory for', ) - k_cli_args = KCLIArgs() - - parser = ArgumentParser(prog='kdist', parents=[k_cli_args.logging_args]) - command_parser = parser.add_subparsers(dest='command', required=True) - - build_parser = command_parser.add_parser('build', help='build targets') - build_parser.add_argument('targets', metavar='TARGET', nargs='*', default='*', help='target to build') - build_parser.add_argument( - '-a', - '--arg', - dest='args', - metavar='ARG', - action='append', - default=[], - help='build with argument', - ) - build_parser.add_argument('-f', '--force', action='store_true', default=False, help='force build') - build_parser.add_argument('-j', '--jobs', metavar='N', type=int, default=1, help='maximal number of build jobs') + def exec(self) -> None: + res = kdist.which(self.target) + print(res) + - clean_parser = command_parser.add_parser('clean', help='clean targets') - add_target_arg(clean_parser, 'target to clean') +class KDistListCommand(Command, LoggingOptions): + @staticmethod + def name() -> str: + return 'list' - which_parser = command_parser.add_parser('which', help='print target location') - add_target_arg(which_parser, 'target to print directory for') + @staticmethod + def help_str() -> str: + return 'print list of available targets' - command_parser.add_parser('list', help='print list of available targets') + def exec(self) -> None: + targets_by_plugin: dict[str, list[str]] = {} + for plugin_name, target_name in target_ids(): + targets = targets_by_plugin.get(plugin_name, []) + targets.append(target_name) + targets_by_plugin[plugin_name] = targets - return parser.parse_args() + for plugin_name in targets_by_plugin: + print(plugin_name) + for target_name in targets_by_plugin[plugin_name]: + print(f'* {target_name}') if __name__ == '__main__':