diff --git a/blark/apischema_compat.py b/blark/apischema_compat.py new file mode 100644 index 0000000..7f0fa49 --- /dev/null +++ b/blark/apischema_compat.py @@ -0,0 +1,150 @@ +""" +Serialization helpers for apischema, an optional dependency. +""" +# Largely based on issue discussions regarding tagged unions. +from __future__ import annotations + +from collections import defaultdict +from collections.abc import Callable, Iterator +from types import new_class +from typing import Any, Dict, Generic, List, Tuple, TypeVar, get_type_hints + +import lark +from apischema import deserializer, serializer, type_name +from apischema.conversions import Conversion +from apischema.metadata import conversion +from apischema.objects import object_deserialization +from apischema.tagged_unions import Tagged, TaggedUnion, get_tagged +from apischema.typing import get_origin +from apischema.utils import to_pascal_case + +_alternative_constructors: Dict[type, List[Callable]] = defaultdict(list) +Func = TypeVar("Func", bound=Callable) + + +def alternative_constructor(func: Func) -> Func: + """Alternative constructor for a given type.""" + return_type = get_type_hints(func)["return"] + _alternative_constructors[get_origin(return_type) or return_type].append(func) + return func + + +def get_all_subclasses(cls: type) -> Iterator[type]: + """Recursive implementation of type.__subclasses__""" + for sub_cls in cls.__subclasses__(): + yield sub_cls + yield from get_all_subclasses(sub_cls) + + +Cls = TypeVar("Cls", bound=type) + + +def _get_generic_name_factory(cls: type, *args: type): + def _capitalized(name: str) -> str: + return name[0].upper() + name[1:] + + return "".join((cls.__name__, *(_capitalized(arg.__name__) for arg in args))) + + +generic_name = type_name(_get_generic_name_factory) + + +def as_tagged_union(cls: Cls) -> Cls: + """ + Tagged union decorator, to be used on base class. + + Supports generics as well, with names generated by way of + `_get_generic_name_factory`. + """ + params = tuple(getattr(cls, "__parameters__", ())) + tagged_union_bases: Tuple[type, ...] = (TaggedUnion,) + + # Generic handling is here: + if params: + tagged_union_bases = (TaggedUnion, Generic[params]) + generic_name(cls) + prev_init_subclass = getattr(cls, "__init_subclass__", None) + + def __init_subclass__(cls, **kwargs): + if prev_init_subclass is not None: + prev_init_subclass(**kwargs) + generic_name(cls) + + cls.__init_subclass__ = classmethod(__init_subclass__) + + def with_params(cls: type) -> Any: + """Specify type of Generic if set.""" + return cls[params] if params else cls + + def serialization() -> Conversion: + """ + Define the serializer Conversion for the tagged union. + + source is the base ``cls`` (or ``cls[T]``). + target is the new tagged union class ``TaggedUnion`` which gets the + dictionary {cls.__name__: obj} as its arguments. + """ + annotations = { + # Assume that subclasses have same generic parameters than cls + sub.__name__: Tagged[with_params(sub)] + for sub in get_all_subclasses(cls) + } + namespace = {"__annotations__": annotations} + tagged_union = new_class( + cls.__name__, tagged_union_bases, exec_body=lambda ns: ns.update(namespace) + ) + return Conversion( + lambda obj: tagged_union(**{obj.__class__.__name__: obj}), + source=with_params(cls), + target=with_params(tagged_union), + # Conversion must not be inherited because it would lead to + # infinite recursion otherwise + inherited=False, + ) + + def deserialization() -> Conversion: + """ + Define the deserializer Conversion for the tagged union. + + Allows for alternative standalone constructors as per the apischema + example. + """ + annotations: dict[str, Any] = {} + namespace: dict[str, Any] = {"__annotations__": annotations} + for sub in get_all_subclasses(cls): + annotations[sub.__name__] = Tagged[with_params(sub)] + for constructor in _alternative_constructors.get(sub, ()): + # Build the alias of the field + alias = to_pascal_case(constructor.__name__) + # object_deserialization uses get_type_hints, but the constructor + # return type is stringified and the class not defined yet, + # so it must be assigned manually + constructor.__annotations__["return"] = with_params(sub) + # Use object_deserialization to wrap constructor as deserializer + deserialization = object_deserialization(constructor, generic_name) + # Add constructor tagged field with its conversion + annotations[alias] = Tagged[with_params(sub)] + namespace[alias] = Tagged(conversion(deserialization=deserialization)) + # Create the deserialization tagged union class + tagged_union = new_class( + cls.__name__, tagged_union_bases, exec_body=lambda ns: ns.update(namespace) + ) + return Conversion( + lambda obj: get_tagged(obj)[1], + source=with_params(tagged_union), + target=with_params(cls), + ) + + deserializer(lazy=deserialization, target=cls) + serializer(lazy=serialization, source=cls) + return cls + + +@serializer +def token_serializer(token: lark.Token) -> List[str]: + return [token.type, token.value] + + +@deserializer +def token_deserializer(parts: List[str]) -> lark.Token: + return lark.Token(*parts) diff --git a/blark/config.py b/blark/config.py new file mode 100644 index 0000000..f1e38f8 --- /dev/null +++ b/blark/config.py @@ -0,0 +1,3 @@ +import os + +BLARK_TWINCAT_ROOT = os.environ.get("BLARK_TWINCAT_ROOT", ".") diff --git a/blark/dependency_store.py b/blark/dependency_store.py new file mode 100644 index 0000000..05349fe --- /dev/null +++ b/blark/dependency_store.py @@ -0,0 +1,357 @@ +""" +TwinCAT project dependency handling. +""" +from __future__ import annotations + +import dataclasses +import distutils.version +import functools +import json +import logging +import pathlib +from dataclasses import dataclass +from typing import Any, Dict, Generator, List, Optional, Tuple, Union + +import pytmc +import pytmc.code + +from . import parse +from . import transform as tf +from . import util +from .config import BLARK_TWINCAT_ROOT +from .summary import CodeSummary + +AnyPath = Union[str, pathlib.Path] + +logger = logging.getLogger(__name__) +_dependency_store = None + + +@dataclass +class ResolvedDependency: + """Resolved dependency version information.""" + name: str + vendor: str + version: str + vendor_short: str + + +@dataclass +class DependencyStoreConfig: + """Dependency store configuration, from ``config.json``.""" + filename: Optional[pathlib.Path] + libraries: Dict[str, DependencyStoreLibrary] + + @classmethod + def from_dict( + cls, config: dict, filename: Optional[pathlib.Path] = None + ) -> DependencyStoreConfig: + libraries = config.get("libraries", {}) + for library_name, library_info in libraries.items(): + libraries[library_name] = DependencyStoreLibrary(**library_info) + return cls(filename=filename, libraries=libraries) + + def as_json(self) -> str: + """Get the configuration as JSON.""" + config = dataclasses.asdict(self) + config.pop("filename") + return json.dumps(config, indent=4) + + def save(self, path: AnyPath) -> None: + """Save the configuration as JSON to a file.""" + with open(path, "wt") as fp: + print(self.as_json(), file=fp) + + +@dataclass +class DependencyStoreLibrary: + name: str + versioned: bool + path: str + project: str + + def get_latest_version_path(self, root: pathlib.Path) -> pathlib.Path: + """ + Get the latest version project filename. + + Returns + ------- + pathlib.Path + """ + def get_version(path): + try: + version = path.name.lstrip('v').replace('-', '.') + version = tuple(distutils.version.LooseVersion(version).version) + if isinstance(version[0], int): + return version + except Exception: + ... + + project_root = root / self.path + + paths = { + (get_version(path), path) for path in project_root.iterdir() + if get_version(path) is not None + } + + for version, path in reversed(sorted(paths)): + project_fn = path / self.project + if project_fn.exists(): + logger.debug( + "Found latest %s %s in %s", + self.name, version, project_fn + ) + return project_fn + + raise FileNotFoundError( + f"No valid versions of {self.name} found in {project_root}" + ) + + def get_project_filename(self, root: pathlib.Path, version: str) -> pathlib.Path: + """Get the full project filename, given the root path and version.""" + if not self.versioned: + return root / self.path / self.project + if version == "*": + return self.get_latest_version_path(root) + + return root / self.path / version / self.project + + +class DependencyStore: + """ + A storage container for dependency configuration and loading. + + Environment variable: ``BLARK_TWINCAT_ROOT`` is required to be set for this + to be functional, along with a "config.json" in that directory. This + should contain information as to the supported library dependencies and + where to find them. + + .. code:: + + { + "libraries": { + "LCLS General": { + "name": "LCLS General", + "versioned": false, + "path": "lcls-twincat-general", + "project": "LCLSGeneral.sln" + }, + "lcls-twincat-motion": { + "name": "lcls-twincat-motion", + "versioned": true, + "path": "lcls-twincat-motion", + "project": "lcls-twincat-motion.sln" + } + } + } + + The above would indicate that the "LCLS General" library + (as named in TwinCAT) is available relative to the root directory in + ``lcls-twincat-general/LCLSGeneral.sln``. + It would also indicate that the "lcls-twincat-motion" library could + be found in + ``lcls-twincat-motion/VERSION/lcls-twincat-motion.sln`` + where VERSION is the project-defined version. + """ + root: pathlib.Path + config: DependencyStoreConfig + + def __init__(self, root: pathlib.Path): + self.root = root + self.load_config() + + @property + def config_filename(self): + """The configuration filename.""" + return (self.root / "config.json").expanduser().resolve() + + def _read_config(self) -> Any: + with open(self.config_filename) as fp: + return json.load(fp) + + def load_config(self): + """Load the dependency store configuration file.""" + try: + config = self._read_config() + except FileNotFoundError: + logger.warning( + "pytmc dependencies will not be loaded as either " + "BLARK_TWINCAT_ROOT is unset or invalid. Expected " + "file %s to exist", + self.root / "config.json" + ) + self.config = DependencyStoreConfig(filename=None, libraries={}) + return + + self.config = DependencyStoreConfig.from_dict( + config, filename=self.config_filename + ) + + @functools.lru_cache(maxsize=50) + def get_dependency(self, name: str, version: str) -> List[PlcProjectMetadata]: + """Get a dependency by name and version number.""" + try: + info: DependencyStoreLibrary = self.config.libraries[name] + except KeyError: + logger.warning("Unable to find library %s in dependency store", name) + return [] + + try: + filename = info.get_project_filename(self.root, version=version) + except FileNotFoundError: + logger.warning("Unable to find library project %s version %s", name, version) + return [] + + if not filename.exists(): + logger.warning( + "Library project %s version %s file %s does not exist", + name, + version, + filename, + ) + return [] + + return list( + PlcProjectMetadata.from_project_filename( + str(filename.resolve()), + # TODO: one level only for now to avoid circular deps + include_dependencies=False, + ) + ) + + def get_dependencies( + self, + plc: pytmc.parser.Plc, + ) -> Generator[Tuple[ResolvedDependency, PlcProjectMetadata], None, None]: + """Get dependency projects from a PLC.""" + for resolution in plc.root.find(pytmc.parser.Resolution): + resolution: pytmc.parser.Resolution + try: + info = ResolvedDependency(**resolution.resolution) + except (KeyError, ValueError) as ex: + logger.warning("Failed to get dependency: %s", ex) + continue + + for proj in self.get_dependency(info.name, info.version): + yield info, proj + + @staticmethod + def get_instance() -> DependencyStore: + """Get the global DependencyStore instance.""" + return get_dependency_store() + + +def get_dependency_store() -> DependencyStore: + """Get the global DependencyStore instance.""" + global _dependency_store + + if _dependency_store is None: + _dependency_store = DependencyStore( + root=pathlib.Path(BLARK_TWINCAT_ROOT) + ) + return _dependency_store + + +@dataclass +class PlcProjectMetadata: + """This is a per-PLC project metadata container.""" + name: str + filename: pathlib.Path + include_dependencies: bool + code: List[tf.SourceCode] + summary: CodeSummary + tmc_symbols: Dict[str, pytmc.parser.Symbol] + loaded_files: Dict[pathlib.Path, str] + dependencies: Dict[str, ResolvedDependency] + plc: Optional[pytmc.parser.Plc] + + @classmethod + def from_pytmc( + cls, + plc: pytmc.parser.Plc, + include_dependencies: bool = True, + ) -> Optional[PlcProjectMetadata]: + """Create a PlcProjectMetadata instance from a pytmc-parsed one.""" + filename = plc.filename.resolve() + loaded_files = {} + deps = {} + code = [] + combined_summary = CodeSummary() + + loaded_files[filename] = util.get_file_sha256(filename) + + if include_dependencies: + store = get_dependency_store() + for resolution, proj in store.get_dependencies(plc): + code.extend(proj.code) + deps.update(proj.dependencies) + loaded_files.update(proj.loaded_files) + deps[resolution.name] = resolution + combined_summary.append(proj.summary, namespace=proj.plc.name) + + for code_path, code_obj in parse.parse_plc(plc, transform=True): + if isinstance(code_obj, Exception): + logger.debug("Failed to load: %s %s", code_path, code_obj) + continue + code.append(code_obj) + loaded_files[code_path] = util.get_file_sha256(code_path) + combined_summary.append(CodeSummary.from_source(code_obj, filename=code_path)) + + tmc = plc.tmc + return cls( + name=plc.name, + filename=filename, + include_dependencies=include_dependencies, + code=code, + dependencies=deps, + loaded_files=loaded_files, + summary=combined_summary, + plc=plc, + tmc_symbols=list(tmc.find(pytmc.parser.Symbol, recurse=False)) if tmc else None, + ) + + @classmethod + def from_project_filename( + cls, + project: AnyPath, + include_dependencies: bool = True, + plc_whitelist: Optional[List[str]] = None, + ) -> Generator[PlcProjectMetadata, None, None]: + """Given a project/solution filename, get all PlcProjectMetadata.""" + solution_path, projects = util.get_tsprojects_from_filename(project) + logger.debug("Solution path %s projects %s", solution_path, projects) + for tsproj_project in projects: + logger.debug("Found tsproj %s", tsproj_project.name) + try: + parsed_tsproj = pytmc.parser.parse(tsproj_project) + except Exception: + logger.exception("Failed to load project %s", tsproj_project.name) + continue + + for plc_name, plc in parsed_tsproj.plcs_by_name.items(): + if plc_whitelist and plc_name not in plc_whitelist: + continue + + logger.debug("Found PLC project %s", plc_name) + plc_md = cls.from_pytmc( + plc, + include_dependencies=include_dependencies, + ) + if plc_md is not None: + yield plc_md + + +def load_projects( + *projects: AnyPath, + include_dependencies: bool = True, + plc_whitelist: Optional[List[str]] = None, +) -> List[PlcProjectMetadata]: + """Load the given projects by filename.""" + result = [] + for project in projects: + mds = PlcProjectMetadata.from_project_filename( + project, include_dependencies=include_dependencies, + plc_whitelist=plc_whitelist, + ) + result.extend(mds) + return result diff --git a/blark/iec.lark b/blark/iec.lark index 90bb7bf..5f6319c 100644 --- a/blark/iec.lark +++ b/blark/iec.lark @@ -27,7 +27,6 @@ _library_element_declaration: data_type_declaration | function_block_method_declaration | function_block_property_declaration | program_declaration - | configuration_declaration | global_var_declarations | action @@ -62,7 +61,9 @@ integer_literal: [ INTEGER_TYPE_NAME "#" ] any_integer | "8#" OCTAL_STRING -> octal_integer | "16#" HEX_STRING -> hex_integer | SIGNED_INTEGER -> signed_integer - | INTEGER -> integer + | integer + +integer: INTEGER real_literal: [ REAL_TYPE_NAME "#" ] /((\+|\-)?[0-9](_?[0-9])*)\.([0-9](_?[0-9])*)((e|E)(\+|\-)?([0-9](_?[0-9])*))?/ | [ REAL_TYPE_NAME "#" ] /((\+|\-)?[0-9](_?[0-9])*)((e|E)(\+|\-)?([0-9](_?[0-9])*))/ @@ -79,8 +80,8 @@ TRUE_VALUE: "1" | "TRUE"i FALSE_VALUE: "0" | "FALSE"i // B.1.2.2 -?string_literal: SINGLE_BYTE_CHARACTER_STRING - | DOUBLE_BYTE_CHARACTER_STRING +string_literal: SINGLE_BYTE_CHARACTER_STRING + | DOUBLE_BYTE_CHARACTER_STRING ESCAPE_CHARACTER: "$$" | "$L" @@ -172,7 +173,7 @@ TYPE_DATETIME: "DATE_AND_TIME"i ?elementary_type_name: NUMERIC_TYPE_NAME | DATE_TYPE_NAME | BIT_STRING_TYPE_NAME - | STRING_TYPE // was: STRING_VAR_TYPE (TODO) + | STRING_TYPE STRING_TYPE: ( STRING | WSTRING ) WS* [ STRING_SPEC_LENGTH ] @@ -256,9 +257,9 @@ pointer_type: POINTER_TO simple_spec_init: [ indirection_type ] simple_specification [ ":=" expression ] -?simple_specification: elementary_type_name - | simple_type_name - | DOTTED_IDENTIFIER +simple_specification: elementary_type_name + | simple_type_name + | DOTTED_IDENTIFIER subrange_type_declaration: subrange_type_name ":" subrange_spec_init @@ -299,7 +300,7 @@ _array_spec_type_name: STRING_TYPE array_initialization: "[" array_initial_element ( "," array_initial_element )* "]" | array_initial_element ( "," array_initial_element )* -array_initial_element: ( INTEGER | enumerated_value ) "(" [ _array_initial_element ] ")" -> array_initial_element_count +array_initial_element: ( integer | enumerated_value ) "(" [ _array_initial_element ] ")" -> array_initial_element_count | _array_initial_element _array_initial_element: constant @@ -359,22 +360,23 @@ field_selector: [ DEREFERENCED ] "." ( variable_name | INTEGER ) multi_element_variable: variable_name ( subscript_list | field_selector )+ // B.1.4.3 -RETAIN: "RETAIN"i -NON_RETAIN: "NON_RETAIN"i - R_EDGE: "R_EDGE"i F_EDGE: "F_EDGE"i ?fb_name: IDENTIFIER -?retain: RETAIN - | NON_RETAIN +VAR_ATTRIB: "RETAIN"i + | "NON_RETAIN"i + | "PERSISTENT"i + | "CONSTANT"i -input_declarations: "VAR_INPUT"i [ retain ] _var_input_body "END_VAR"i ";"* +variable_attributes: VAR_ATTRIB+ -output_declarations: "VAR_OUTPUT"i [ retain ] var_body "END_VAR"i ";"* +input_declarations: "VAR_INPUT"i [ variable_attributes ] _var_input_body "END_VAR"i ";"* -input_output_declarations: "VAR_IN_OUT"i var_body "END_VAR"i ";"* +output_declarations: "VAR_OUTPUT"i [ variable_attributes ] var_body "END_VAR"i ";"* + +input_output_declarations: "VAR_IN_OUT"i [ variable_attributes ] var_body "END_VAR"i ";"* _var_input_body: ( _var_input_body_item ";"+ )* @@ -412,37 +414,34 @@ array_var_declaration: var1_list ":" array_specification structured_var_declaration: var1_list ":" structure_type_name -var_declarations: "VAR"i [ var_declaration_config ] var_body "END_VAR"i ";"* - -static_var_declarations: "VAR_STAT"i var_body "END_VAR"i ";"* - -?var_declaration_config: CONSTANT | PERSISTENT | RETAIN | NON_RETAIN +var_declarations: "VAR"i [ variable_attributes ] var_body "END_VAR"i ";"* -located_var_declarations: "VAR"i [ located_var_config ] [ PERSISTENT ] located_var_decl* "END_VAR"i ";"* +static_var_declarations: "VAR_STAT"i [ variable_attributes ] var_body "END_VAR"i ";"* -?located_var_config: RETAIN - | NON_RETAIN - | CONSTANT +located_var_declarations: "VAR"i [ variable_attributes ] located_var_decl* "END_VAR"i ";"* located_var_decl: [ variable_name ] location ":" _located_var_spec_init ";"+ -external_var_declarations: "VAR_EXTERNAL"i [ CONSTANT ] external_declaration* "END_VAR"i ";"* +external_var_declarations: "VAR_EXTERNAL"i [ variable_attributes ] external_declaration* "END_VAR"i ";"* external_declaration: global_var_name ":" ( simple_specification | subrange_specification | enumerated_specification | array_specification | structure_type_name | function_block_type_name ) ";"+ ?global_var_name: IDENTIFIER -CONSTANT: "CONSTANT"i PERSISTENT: "PERSISTENT"i -global_var_declarations: "VAR_GLOBAL"i [ global_const_or_retain ] [ PERSISTENT ] global_var_body_item* "END_VAR"i ";"* +?action_name: DOTTED_IDENTIFIER + +action: "ACTION"i action_name ":" [ function_block_body ] "END_ACTION"i ";"* -?global_const_or_retain: CONSTANT - | RETAIN +global_var_declarations: "VAR_GLOBAL"i [ variable_attributes ] global_var_body_item* "END_VAR"i ";"* ?global_var_body_item: var_init_decl | global_var_decl -global_var_decl: global_var_spec ":" ( _located_var_spec_init | fb_invocation | function_block_type_name ) ";"+ +global_var_decl: global_var_spec ":" ( _located_var_spec_init | fb_invocation ) ";"+ +// Note - function_block_type_name is also valid here, but it is picked up by +// an equivalent rule +// function_block_type_name global_var_spec: global_var_list | global_var_name (location | incomplete_location) @@ -473,7 +472,7 @@ double_byte_string_var_declaration: var1_list ":" double_byte_string_spec !double_byte_string_spec: "WSTRING"i [ STRING_SPEC_LENGTH ] [ ":=" DOUBLE_BYTE_CHARACTER_STRING ] -incomplete_located_var_declarations: "VAR"i [ retain ] incomplete_located_var_decl* "END_VAR"i ";"* +incomplete_located_var_declarations: "VAR"i [ variable_attributes ] incomplete_located_var_decl* "END_VAR"i ";"* incomplete_located_var_decl: variable_name incomplete_location ":" var_spec ";"+ @@ -494,7 +493,9 @@ string_type_specification: STRING [ STRING_SPEC_LENGTH ] // B.1.5.1 ?derived_function_name: IDENTIFIER -function_declaration: "FUNCTION"i derived_function_name [ ":" simple_specification ] ";"* [ function_var_block+ ] [ function_body ] "END_FUNCTION"i ";"* +indirect_simple_specification: [ indirection_type ] simple_specification + +function_declaration: "FUNCTION"i derived_function_name [ ":" indirect_simple_specification ] ";"* [ function_var_block+ ] [ function_body ] "END_FUNCTION"i ";"* ?function_var_block: input_declarations | output_declarations @@ -502,13 +503,9 @@ function_declaration: "FUNCTION"i derived_function_name [ ":" simple_specificati | static_var_declarations | function_var_declarations -function_var_declarations: "VAR"i [ CONSTANT ] var_body "END_VAR"i ";"* +function_var_declarations: "VAR"i [ variable_attributes ] var_body "END_VAR"i ";"* -function_body: statement_list - | instruction_list - | sequential_function_chart -// TODO? | ladder_diagram -// TODO? | function_block_diagram +?function_body: statement_list function_var_decl: var1_init_decl | array_var_init_decl @@ -546,9 +543,7 @@ END_FUNCTION_BLOCK: "END_FUNCTION_BLOCK"i temp_var_decls: "VAR_TEMP"i var_body "END_VAR"i ";"* -?function_block_body: sequential_function_chart - | statement_list - | instruction_list +?function_block_body: statement_list // TODO: really only a couple of these METHOD_ACCESS: "ABSTRACT"i @@ -560,7 +555,7 @@ METHOD_ACCESS: "ABSTRACT"i method_access: METHOD_ACCESS+ -var_inst_declaration: "VAR_INST"i var_body "END_VAR"i ";"* +var_inst_declaration: "VAR_INST"i [ variable_attributes ] var_body "END_VAR"i ";"* ?method_var_declaration: fb_var_declaration | var_inst_declaration @@ -588,9 +583,9 @@ function_block_property_declaration: "PROPERTY"i [ property_access ] DOTTED_IDEN // B.1.5.3 ?program_type_name: IDENTIFIER -program_declaration: "PROGRAM"i program_type_name [ program_var_declarations ] [ function_block_body ] "END_PROGRAM"i ";"* +program_declaration: "PROGRAM"i program_type_name program_var_declarations [ function_block_body ] "END_PROGRAM"i ";"* -program_var_declarations: program_var_declaration+ +program_var_declarations: [ program_var_declaration+ ] ?program_var_declaration: input_declarations | output_declarations @@ -605,6 +600,11 @@ program_var_declarations: program_var_declaration+ program_access_decls: "VAR_ACCESS"i (program_access_decl ";"+)+ "END_VAR"i ";"* +!?access_direction: "READ_WRITE"i + | "READ_ONLY"i + +?access_name: IDENTIFIER + program_access_decl: access_name ":" symbolic_variable ":" non_generic_type_name [ access_direction ] // B.1.6 @@ -612,115 +612,6 @@ program_access_decl: access_name ":" symbolic_variable ":" non_generic_type_name INITIAL_STEP: "INITIAL_STEP"i STEP: "STEP"i -sequential_function_chart: sfc_network+ - -sfc_network: sfc_initial_step ( sfc_step | sfc_transition | action | entry_action | exit_action )* -sfc_initial_step: INITIAL_STEP step_name ":" [ sfc_step_body ] ";"* "END_STEP"i ";"* -sfc_step: STEP step_name ":" [ sfc_step_body ] ";"* "END_STEP"i ";"* - -?sfc_step_body: sequential_function_chart - | statement_list - | action_association - -action_association: action_name "(" [ action_qualifier ] ( "," indicator_name )* ")" - -?action_name: DOTTED_IDENTIFIER - -action_qualifier: NON_TIMED_QUALIFIER - | TIMED_QUALIFIER "," action_time - -NON_TIMED_QUALIFIER: "N" - | "R" - | "S" - | "P" - | "P0" - | "P1" -TIMED_QUALIFIER: "L" - | "D" - | "SD" - | "DS" - | "SL" - -?action_time: duration - | variable_name - -?indicator_name: variable_name - -sfc_transition: "TRANSITION"i [ _transition_name ] [ transition_priority ] "FROM"i sfc_transition_steps "TO"i sfc_transition_steps transition_condition "END_TRANSITION"i ";"* - -?transition_priority: "(" "PRIORITY"i ":=" INTEGER ")" - -_transition_name: [ LOGICAL_NOT ] DOTTED_IDENTIFIER - -sfc_transition_steps: step_name - | "(" step_name ( "," step_name )* ")" - -?transition_condition: ":=" ( expression ";"* ) - -action: "ACTION"i action_name ":" [ function_block_body ] "END_ACTION"i ";"* - -entry_action: "ENTRY_ACTION"i [ function_block_body ] "END_ACTION"i ";"* - -exit_action: "EXIT_ACTION"i [ function_block_body ] "END_ACTION"i ";"* - -// B.1.7 -?access_name: IDENTIFIER -?configuration_name: IDENTIFIER -?program_name: IDENTIFIER -?resource_name: IDENTIFIER -?resource_type_name: IDENTIFIER -?task_name: IDENTIFIER - -configuration_declaration: "CONFIGURATION"i configuration_name [ global_var_declarations ] ( single_resource_declaration | resource_declaration+ ) [ config_access_declarations ] [ instance_specific_initializations ] "END_CONFIGURATION"i ";"* - -resource_declaration: "RESOURCE"i resource_name "ON"i resource_type_name [ global_var_declarations ] single_resource_declaration "END_RESOURCE"i ";"* - -single_resource_declaration: ( task_configuration ";"+ )* ( program_configuration ";"+ )+ - -config_access_declarations: "VAR_ACCESS"i ( config_access_declaration ";"+ )* "END_VAR"i ";"* - -config_access_declaration: access_name ":" access_path ":" non_generic_type_name [ access_direction ] - -access_path: [ resource_name "." ] direct_variable -> direct_access_path - | [ resource_name "." ] [ program_name "." ] ( fb_name "." )* symbolic_variable - -global_var_reference: [ resource_name "." ] global_var_name [ "." structure_element_name ] - -!?access_direction: "READ_WRITE"i - | "READ_ONLY"i - -task_configuration: "TASK"i task_name task_initialization - -task_initialization: "(" [ "SINGLE"i ":=" task_init_data_source "," ] [ "INTERVAL"i ":=" task_init_data_source "," ] "PRIORITY"i ":=" task_priority ")" - -?task_priority: INTEGER - -// program_output_reference: program_name "." symbolic_variable -?task_init_data_source: constant - | _variable // global_var_reference or program_output_reference - -program_configuration: "PROGRAM"i [ retain ] program_name [ "WITH"i task_name ] ":" program_type_name [ "(" prog_conf_elements ")" ] - -prog_conf_elements: prog_conf_element ( "," prog_conf_element )* - -prog_conf_element: fb_task - | prog_cnxn - -fb_task: fb_name "WITH"i task_name - -prog_cnxn: symbolic_variable ":=" prog_data_source - | symbolic_variable "=>" data_sink - -?prog_data_source: constant - | enumerated_value - | _variable - -?data_sink: _variable - -instance_specific_initializations: "VAR_CONFIG"i ( instance_specific_init ";"+ )* "END_VAR"i ";"* - -instance_specific_init: resource_name "." program_name "." ( fb_name "." )* ( ( variable_name [ location ] ":" _located_var_spec_init ) | ( fb_name ":" function_block_type_name ":=" structure_initialization ) ) - // B.2.1, B.3.1 LOGICAL_OR: "OR"i LOGICAL_XOR: "XOR"i @@ -799,9 +690,15 @@ _statement: ";" | set_statement | reset_statement | reference_assignment_statement - | _subprogram_control_statement - | _selection_statement - | _iteration_statement + | return_statement + | fb_invocation_statement + | if_statement + | case_statement + | for_statement + | while_statement + | repeat_statement + | exit_statement + // B.3.2.1 no_op_statement: _variable ";"+ @@ -818,10 +715,10 @@ reference_assignment_statement: _variable "REF="i expression ";"+ method_statement: symbolic_variable "(" ")" ";"+ // B.3.2.2 -return_statement: "RETURN"i ";"* +return_statement.1: "RETURN"i ";"* +// return_statement: priority > 0 so that it doesn't clash with no_op_statement -_subprogram_control_statement: return_statement - | fb_invocation ";"+ +fb_invocation_statement: fb_invocation ";"+ fb_invocation: symbolic_variable "(" [ param_assignment ( "," param_assignment )* ","? ] ")" @@ -830,14 +727,13 @@ param_assignment: [ LOGICAL_NOT ] variable_name "=>" [ expression ] -> output_pa | expression // B.3.2.3 -_selection_statement: if_statement - | case_statement - if_statement: "IF"i expression "THEN"i [ statement_list ] ( else_if_clause )* [ else_clause ] "END_IF"i ";"* else_if_clause: "ELSIF"i expression "THEN"i [ statement_list ] else_clause: "ELSE"i [ statement_list ] -case_statement: "CASE"i expression "OF"i case_element+ [ else_clause ] "END_CASE"i ";"* +case_statement: "CASE"i expression "OF"i case_elements [ else_clause ] "END_CASE"i ";"* + +case_elements: case_element+ case_element: case_list ":" [ statement_list ] @@ -851,11 +747,6 @@ case_list: case_list_element ( "," case_list_element )* // B.3.2.4 ?control_variable: symbolic_variable -_iteration_statement: for_statement - | while_statement - | repeat_statement - | exit_statement - for_statement: "FOR"i control_variable ":=" _for_list "DO"i statement_list "END_FOR"i ";"* _for_list: expression "TO"i expression [ "BY"i expression ] @@ -864,104 +755,4 @@ while_statement: "WHILE"i expression "DO"i statement_list "END_WHILE"i ";"* repeat_statement: "REPEAT"i statement_list "UNTIL"i expression "END_REPEAT"i ";"* -exit_statement: "EXIT"i ";"* - -// Instruction list section -_EOL: /\s*[\n\r]+\s*/ - -?il_label: IDENTIFIER -?function_name: DOTTED_IDENTIFIER - -instruction_list: il_instruction+ - -il_instruction: [ il_label ":" ] [ il_single_operation ] _EOL - -?il_single_operation: il_simple_operation - | il_expression - | il_jump_operation - | il_fb_call - | il_formal_function_call - | il_return_operator - -il_simple_operation: il_simple_operator /\s+/ il_operand? - | function_name /\s+/ _il_operand_list? - -il_expression: il_expr_operator "(" [ il_operand ] [ _EOL il_simple_instruction+ ] ")" - -il_jump_operation: il_jump_operator il_label - -il_fb_call: il_call_operator fb_name [ _il_fb_call_args ] - -_il_fb_call_args: "(" _EOL [ _il_param_list ] ")" - | "(" [ _il_operand_list ] ")" - -il_formal_function_call: function_name "(" _EOL [ _il_param_list ] ")" - -?il_operand: constant - | _variable - | enumerated_value - -_il_operand_list: il_operand ( "," il_operand )* - -?il_simple_instruction: il_simple_operation _EOL - | il_expression _EOL - | il_formal_function_call _EOL - -_il_param_list: ( il_any_param_assignment "," _EOL )* il_any_param_assignment _EOL - -?il_any_param_assignment: il_param_operand_assignment - | il_param_instruction_assignment - | il_param_out_assignment - -il_param_operand_assignment: variable_name ":=" il_operand - -il_param_instruction_assignment: variable_name ":=" "(" _EOL il_simple_instruction+ ")" - -il_param_out_assignment: [ IL_OPERATOR_NOT ] variable_name "=>" _variable - -// B.2.2 -IL_OPERATOR_NOT: "NOT"i -IL_OPERATOR_ANDN: "ANDN"i | "&N"i -IL_OPERATOR_AND: "AND"i | "&" - -!?il_simple_operator: "LD"i | "LDN"i - | "ST"i | "STN"i - | "NOT"i - | "S"i | "S1"i - | "R"i | "R1"i - | "CLK"i - | "CU"i | "CD"i - | "PV"i - | "IN"i - | "PT"i - | il_expr_operator - -!?il_expr_operator: IL_OPERATOR_ANDN - | IL_OPERATOR_AND - | "OR"i - | "XOR"i - | "ORN"i - | "XORN"i - | "ADD"i - | "SUB"i - | "MUL"i - | "DIV"i - | "MOD"i - | "GT"i - | "GE"i - | "EQ"i - | "LT"i - | "LE"i - | "NE"i - -!?il_call_operator: "CAL"i - | "CALC"i - | "CALCN"i - -!il_return_operator: "RET"i - | "RETC"i - | "RETCN"i - -!?il_jump_operator: "JMP"i - | "JMPC"i - | "JMPCN"i +exit_statement.1: "EXIT"i ";"* diff --git a/blark/parse.py b/blark/parse.py index 2606d96..0cda1c5 100644 --- a/blark/parse.py +++ b/blark/parse.py @@ -6,7 +6,7 @@ import pathlib import sys import traceback -from typing import Any, Generator, Optional, Tuple, Union +from typing import Generator, Optional, Tuple, Union import lark import pytmc @@ -57,6 +57,9 @@ def get_parser() -> lark.Lark: _DEFAULT_PREPROCESSORS = object() +ParseResult = Union[Exception, tf.SourceCode, lark.Tree] + + def parse_source_code( source_code: str, *, @@ -65,7 +68,7 @@ def parse_source_code( preprocessors=_DEFAULT_PREPROCESSORS, parser: Optional[lark.Lark] = None, transform: bool = True, -): +) -> Union[tf.SourceCode, lark.Tree]: """ Parse source code and return the transformed result. @@ -139,49 +142,57 @@ def parse_single_file(fn, *, transform: bool = True): return parse_source_code(source_code, fn=fn, transform=transform) +def parse_plc( + plc: pytmc.parser.Plc, + *, + verbose: int = 0, + transform: bool = True +) -> Generator[Tuple[pathlib.Path, ParseResult], None, None]: + """Parse a single PLC instance from pytmc.""" + source_items = ( + item + for item in ( + list(plc.dut_by_name.values()) + + list(plc.gvl_by_name.values()) + + list(plc.pou_by_name.values()) + ) + if hasattr(item, "get_source_code") + ) + for source_item in source_items: + source_code = source_item.get_source_code() + if not source_code: + continue + + try: + yield source_item.filename, parse_source_code( + source_code, + fn=source_item.filename, + verbose=verbose, + transform=transform, + ) + except Exception as ex: + tb = traceback.format_exc() + ex.traceback = tb + yield source_item.filename, ex + + def parse_project( tsproj_project: AnyFile, *, verbose: int = 0, transform: bool = True -) -> Generator[Tuple[pathlib.Path, Any], None, None]: +) -> Generator[Tuple[pathlib.Path, ParseResult], None, None]: """Parse an entire tsproj project file.""" - proj_path = pathlib.Path(tsproj_project) - proj_root = proj_path.parent.resolve().absolute() # noqa: F841 TODO - + proj_path = pathlib.Path(tsproj_project).resolve() if proj_path.suffix.lower() not in (".tsproj",): raise ValueError("Expected a .tsproj file") project = pytmc.parser.parse(proj_path) - results = {} - success = True - for i, plc in enumerate(project.plcs, 1): - source_items = ( - list(plc.dut_by_name.items()) - + list(plc.gvl_by_name.items()) - + list(plc.pou_by_name.items()) - ) - for name, source_item in source_items: - if not hasattr(source_item, "get_source_code"): - continue - - source_code = source_item.get_source_code() - if not source_code: - continue - - try: - yield source_item.filename, parse_source_code( - source_code, fn=source_item.filename, verbose=verbose - ) - except Exception as ex: - tb = traceback.format_exc() - ex.traceback = tb - yield source_item.filename, ex - - return success, results + for plc in project.plcs: + yield from parse_plc(plc, verbose=verbose, transform=transform) -def parse(path: AnyPath) -> Generator[Tuple[pathlib.Path, Any], None, None]: +def parse(path: AnyPath) -> Generator[Tuple[pathlib.Path, ParseResult], None, None]: """ Parse the given source code file (or all files from the given project). """ diff --git a/blark/sphinxdomain.py b/blark/sphinxdomain.py index 7e6f9e7..3080711 100644 --- a/blark/sphinxdomain.py +++ b/blark/sphinxdomain.py @@ -46,14 +46,9 @@ def instance(): def find_by_name(self, name: str): for item in self.cache.values(): - for container in ( - item.function_blocks, - item.functions, - item.data_types, - ): - obj = container.get(name, None) - if obj is not None: - return obj + obj = item.find(name) + if obj is not None: + return obj raise KeyError(f"{name!r} not found") @@ -128,7 +123,7 @@ def declaration_to_signature( def declaration_to_content(obj: summary.DeclarationSummary): if obj.value: - default = nodes.paragraph(text="Default:") + default = nodes.paragraph(text="Default: ") default += addnodes.literal_strong(text=str(obj.value)) yield default @@ -197,7 +192,11 @@ def handle_signature( self, sig: str, signode: addnodes.desc_signature ) -> Tuple[str, str]: self.block_header = sig.upper() - func = self.env.ref_context["bk:function"] + func = self.env.ref_context.get("bk:function", None) + if func is None: + self.declarations = None + return "", "" + self.parent_name = func.name self.declarations = list(func.declarations_by_block[self.block_header].values()) signode += addnodes.desc_name( @@ -207,6 +206,9 @@ def handle_signature( return self.block_header, "" def transform_content(self, contentnode: addnodes.desc_content) -> None: + if not self.env.ref_context.get("bk:function", None): + return + contentnode += declarations_to_block(self.declarations, env=self.env) @@ -273,6 +275,19 @@ def decl_items() -> Generator[List[nodes.Node], None, None]: ) +class MissingDeclaration: + name: str + declarations: dict + declarations_by_block: dict + source_code: str + + def __init__(self, name: str): + self.declarations = {} + self.declarations_by_block = {} + self.name = name + self.source_code = f"(Missing item: {name})" + + class BlarkDirectiveWithDeclarations(BlarkDirective): obj: Union[summary.FunctionSummary, summary.FunctionBlockSummary] doc_field_types = [ @@ -299,6 +314,7 @@ def handle_signature(self, sig: str, signode: addnodes.desc_signature) -> Tuple[ try: self.obj = BlarkSphinxCache.instance().find_by_name(sig) except KeyError: + self.obj = MissingDeclaration(sig) logger.error( "Could not find object: %r (signatures unsupported)", sig ) @@ -368,7 +384,11 @@ def _get_links(self) -> Generator[addnodes.desc, None, None]: def _get_basic_variable_blocks(self) -> Generator[addnodes.desc, None, None]: """Get the usual input/output variable blocks as sphinx nodes.""" - for block in ("VAR_INPUT", "VAR_IN_OUT", "VAR_OUTPUT"): + blocks = ("VAR_INPUT", "VAR_IN_OUT", "VAR_OUTPUT", "VAR_GLOBAL") + if isinstance(self.obj, summary.ProgramSummary): + blocks += ("VAR", ) + + for block in blocks: decls = self.obj.declarations_by_block.get(block, {}) if not decls: continue @@ -448,6 +468,16 @@ class TypeDirective(BlarkDirectiveWithDeclarations): signature_prefix: ClassVar[str] = "TYPE" +class ProgramDirective(BlarkDirectiveWithDeclarations): + obj: summary.ProgramSummary + signature_prefix: ClassVar[str] = "PROGRAM" + + +class GvlDirective(BlarkDirectiveWithDeclarations): + obj: summary.GlobalVariableSummary + signature_prefix: ClassVar[str] = "GVL" + + class BlarkXRefRole(XRefRole): def process_link(self, env, refnode, has_explicit_title, title, target): refnode["bk:scope"] = list(env.ref_context.get("bk:scope", [])) @@ -471,40 +501,48 @@ class BlarkDomain(Domain): name = "bk" label = "Blark" object_types: ClassVar[Dict[str, ObjType]] = { - "function_block": ObjType(l_("functionblock"), l_("function_block"), l_("fb")), + "declaration": ObjType(l_("declaration"), "declaration"), "function": ObjType(l_("function"), l_("func")), - "type": ObjType(l_("type"), "type"), + "function_block": ObjType(l_("functionblock"), l_("function_block"), l_("fb")), + "gvl": ObjType(l_("gvl"), "global_variable_list"), "module": ObjType(l_("module"), "mod"), - "variable_block": ObjType(l_("variable_block"), "var"), + "program": ObjType(l_("program"), ), "source_code": ObjType(l_("source_code"), "plc_source"), - "declaration": ObjType(l_("declaration"), "declaration"), + "type": ObjType(l_("type"), "type"), + "variable_block": ObjType(l_("variable_block"), "var"), } directives: ClassVar[Dict[str, Type[BlarkDirective]]] = { - "function_block": FunctionBlockDirective, - "function": FunctionDirective, - "variable_block": VariableBlockDirective, "declaration": DeclarationDirective, + "function": FunctionDirective, + "function_block": FunctionBlockDirective, + "gvl": GvlDirective, + "program": ProgramDirective, "type": TypeDirective, + "variable_block": VariableBlockDirective, } roles: Dict[str, BlarkXRefRole] = { - "function_block": BlarkXRefRole(fix_parens=False), - "function": BlarkXRefRole(fix_parens=False), + "declaration": BlarkXRefRole(), "fb": BlarkXRefRole(fix_parens=False), - "type": BlarkXRefRole(), + "function": BlarkXRefRole(fix_parens=False), + "function_block": BlarkXRefRole(fix_parens=False), + "gvl": BlarkXRefRole(fix_parens=False), "mod": BlarkXRefRole(), - "declaration": BlarkXRefRole(), + "program": BlarkXRefRole(fix_parens=False), + "type": BlarkXRefRole(), } initial_data: ClassVar[str, Dict[str, Any]] = { - "module": {}, - "type": {}, - "function": {}, + "action": {}, "declaration": {}, + "function": {}, "function_block": {}, + "gvl": {}, "method": {}, - "action": {}, + "module": {}, + "program": {}, + "type": {}, } indices: List[Index] = [ # BlarkModuleIndex, diff --git a/blark/summary.py b/blark/summary.py index 7562587..885139f 100644 --- a/blark/summary.py +++ b/blark/summary.py @@ -1,9 +1,11 @@ from __future__ import annotations +import collections +import pathlib import textwrap import typing from dataclasses import dataclass, field, fields, is_dataclass -from typing import Any, Dict, Iterable, List, Optional, Union +from typing import Any, Dict, Generator, Iterable, List, Optional, Tuple, Union from . import transform as tf @@ -77,6 +79,7 @@ class Summary: """Base class for summary objects.""" comments: List[str] pragmas: List[str] + filename: Optional[pathlib.Path] meta: Optional[tf.Meta] = field(repr=False) def __str__(self) -> str: @@ -110,7 +113,8 @@ def get_meta_kwargs(meta: Optional[tf.Meta]) -> Dict[str, Any]: class DeclarationSummary(Summary): """Summary representation of a single declaration.""" name: str - parent: str + item: Union[tf.Declaration, tf.GlobalVariableDeclaration] + parent: Optional[str] location: Optional[str] block: str base_type: str @@ -147,43 +151,49 @@ def from_declaration( Union[tf.Function, tf.Method, tf.FunctionBlock, tf.StructureTypeDeclaration] ] = None, block_header: str = "unknown", + filename: Optional[pathlib.Path] = None, ) -> Dict[str, DeclarationSummary]: result = {} - # OK, a bit lazy for now - try: - spec = item.init.spec - except AttributeError: - spec = item.init - spec = getattr(spec, "name", spec) - - if isinstance(spec, (str, tf.SimpleVariable)): # TODO - base_type = str(spec) - elif hasattr(spec, "type_name"): - base_type = str(spec.type_name) - elif hasattr(spec, "type"): - if hasattr(spec.type, "type_name"): - base_type = spec.type.type_name - else: - base_type = str(spec.type) - else: - raise ValueError(f"TODO: {type(spec)}") - - try: - value = item.init.value - except AttributeError: - value = item.init - for var in item.variables: name = getattr(var, "name", var) location = getattr(var, "location", None) result[name] = DeclarationSummary( name=str(name), + item=item, location=str(location).replace("AT ", "") if location else None, block=block_header, - type=str(spec), - base_type=base_type, - value=value, + type=item.init.full_type_name, + base_type=item.init.base_type_name, + value=str(item.init.value), + parent=parent.name if parent is not None else "", + filename=filename, + **Summary.get_meta_kwargs(item.meta), + ) + return result + + @classmethod + def from_global_variable( + cls, + item: tf.GlobalVariableDeclaration, + parent: Optional[tf.GlobalVariableDeclarations] = None, + block_header: str = "VAR_GLOBAL", + filename: Optional[pathlib.Path] = None, + ) -> Dict[str, DeclarationSummary]: + result = {} + location = (str(item.spec.location or "").replace("AT ", "")) or None + + for var in item.spec.variables: + name = getattr(var, "name", var) + result[name] = DeclarationSummary( + name=str(name), + item=item, + location=location, + block=block_header, + type=item.full_type_name, + base_type=item.base_type_name, + value=str(item.init.value), parent=parent.name if parent is not None else "", + filename=filename, **Summary.get_meta_kwargs(item.meta), ) return result @@ -193,11 +203,13 @@ def from_block( cls, block: tf.VariableDeclarationBlock, parent: Union[tf.Function, tf.Method, tf.FunctionBlock], + filename: Optional[pathlib.Path] = None, ) -> Dict[str, DeclarationSummary]: result = {} for decl in block.items: result.update( - cls.from_declaration(decl, parent=parent, block_header=block.block_header) + cls.from_declaration(decl, parent=parent, block_header=block.block_header, + filename=filename) ) return result @@ -206,16 +218,27 @@ def from_block( class ActionSummary(Summary): """Summary representation of a single action.""" name: str + item: tf.Action source_code: str + def __getitem__(self, key: str): + raise KeyError(f"{key}: Actions do not contain declarations") + @classmethod - def from_action(cls, action: tf.Action, source_code: Optional[str] = None) -> ActionSummary: + def from_action( + cls, + action: tf.Action, + source_code: Optional[str] = None, + filename: Optional[pathlib.Path] = None, + ) -> ActionSummary: if source_code is None: source_code = str(action) return ActionSummary( name=str(action.name), + item=action, source_code=source_code, + filename=filename, **Summary.get_meta_kwargs(action.meta), ) @@ -224,10 +247,14 @@ def from_action(cls, action: tf.Action, source_code: Optional[str] = None) -> Ac class MethodSummary(Summary): """Summary representation of a single method.""" name: str + item: tf.Method return_type: Optional[str] source_code: str declarations: Dict[str, DeclarationSummary] = field(default_factory=dict) + def __getitem__(self, key: str) -> DeclarationSummary: + return self.declarations[key] + @property def declarations_by_block(self) -> Dict[str, Dict[str, DeclarationSummary]]: result = {} @@ -236,30 +263,72 @@ def declarations_by_block(self) -> Dict[str, Dict[str, DeclarationSummary]]: return result @classmethod - def from_method(cls, method: tf.Method, source_code: Optional[str] = None) -> MethodSummary: + def from_method( + cls, + method: tf.Method, + source_code: Optional[str] = None, + filename: Optional[pathlib.Path] = None, + ) -> MethodSummary: if source_code is None: source_code = str(method) summary = MethodSummary( name=method.name, + item=method, return_type=str(method.return_type) if method.return_type else None, source_code=source_code, + filename=filename, **Summary.get_meta_kwargs(method.meta), ) for decl in method.declarations: - summary.declarations.update(DeclarationSummary.from_block(decl, parent=method)) + summary.declarations.update( + DeclarationSummary.from_block(decl, parent=method, filename=filename) + ) return summary +@dataclass +class PropertySummary(Summary): + """Summary representation of a single property.""" + name: str + item: tf.Property + source_code: str + + def __getitem__(self, key: str): + raise KeyError(f"{key}: Properties do not contain declarations") + + @classmethod + def from_property( + cls, + property: tf.Property, + source_code: Optional[str] = None, + filename: Optional[pathlib.Path] = None, + ) -> PropertySummary: + if source_code is None: + source_code = str(property) + + return PropertySummary( + name=str(property.name), + item=property, + source_code=source_code, + filename=filename, + **Summary.get_meta_kwargs(property.meta), + ) + + @dataclass class FunctionSummary(Summary): """Summary representation of a single function.""" name: str + item: tf.Function return_type: Optional[str] source_code: str declarations: Dict[str, DeclarationSummary] = field(default_factory=dict) + def __getitem__(self, key: str) -> DeclarationSummary: + return self.declarations[key] + @property def declarations_by_block(self) -> Dict[str, Dict[str, DeclarationSummary]]: result = {} @@ -269,20 +338,27 @@ def declarations_by_block(self) -> Dict[str, Dict[str, DeclarationSummary]]: @classmethod def from_function( - cls, func: tf.Function, source_code: Optional[str] = None + cls, + func: tf.Function, + source_code: Optional[str] = None, + filename: Optional[pathlib.Path] = None, ) -> FunctionSummary: if source_code is None: source_code = str(func) summary = FunctionSummary( name=func.name, + item=func, return_type=str(func.return_type) if func.return_type else None, source_code=source_code, + filename=filename, **Summary.get_meta_kwargs(func.meta), ) for decl in func.declarations: - summary.declarations.update(DeclarationSummary.from_block(decl, parent=func)) + summary.declarations.update( + DeclarationSummary.from_block(decl, parent=func, filename=filename) + ) return summary @@ -292,10 +368,21 @@ class FunctionBlockSummary(Summary): """Summary representation of a single function block.""" name: str source_code: str + item: tf.FunctionBlock + extends: Optional[str] + squashed: bool declarations: Dict[str, DeclarationSummary] = field(default_factory=dict) actions: List[ActionSummary] = field(default_factory=list) methods: List[MethodSummary] = field(default_factory=list) + def __getitem__(self, key: str) -> DeclarationSummary: + if key in self.declarations: + return self.declarations[key] + for item in self.actions + self.methods: + if item.name == key: + return item + raise KeyError(key) + @property def declarations_by_block(self) -> Dict[str, Dict[str, DeclarationSummary]]: result = {} @@ -305,32 +392,80 @@ def declarations_by_block(self) -> Dict[str, Dict[str, DeclarationSummary]]: @classmethod def from_function_block( - cls, fb: tf.FunctionBlock, source_code: Optional[str] = None + cls, + fb: tf.FunctionBlock, + source_code: Optional[str] = None, + filename: Optional[pathlib.Path] = None, ) -> FunctionBlockSummary: if source_code is None: source_code = str(fb) summary = FunctionBlockSummary( name=fb.name, + item=fb, source_code=source_code, + filename=filename, + extends=fb.extends.name if fb.extends else None, + squashed=False, **Summary.get_meta_kwargs(fb.meta), ) for decl in fb.declarations: - summary.declarations.update(DeclarationSummary.from_block(decl, parent=fb)) + summary.declarations.update( + DeclarationSummary.from_block(decl, parent=fb, filename=filename) + ) return summary + def squash_base_extends( + self, function_blocks: Dict[str, FunctionBlockSummary] + ) -> FunctionBlockSummary: + """Squash the "EXTENDS" function block into this one.""" + if self.extends is None: + return self + + extends_from = function_blocks.get(self.extends, None) + if extends_from is None: + return self + + if extends_from.extends: + extends_from = extends_from.squash_base_extends(function_blocks) + + declarations = dict(extends_from.declarations) + declarations.update(self.declarations) + actions = list(extends_from.actions) + self.actions + methods = list(extends_from.methods) + self.methods + return FunctionBlockSummary( + name=self.name, + comments=extends_from.comments + self.comments, + pragmas=extends_from.pragmas + self.pragmas, + meta=self.meta, + filename=self.filename, + source_code="\n\n".join((extends_from.source_code, self.source_code)), + item=self.item, + extends=self.extends, + declarations=declarations, + actions=actions, + methods=methods, + squashed=True, + ) + @dataclass class DataTypeSummary(Summary): """Summary representation of a single data type.""" # Note: structures only for now. name: str + item: tf.TypeDeclarationItem source_code: str type: str + extends: Optional[str] + squashed: bool = False declarations: Dict[str, DeclarationSummary] = field(default_factory=dict) + def __getitem__(self, key: str) -> DeclarationSummary: + return self.declarations[key] + @property def declarations_by_block(self) -> Dict[str, Dict[str, DeclarationSummary]]: return { @@ -339,26 +474,183 @@ def declarations_by_block(self) -> Dict[str, Dict[str, DeclarationSummary]]: @classmethod def from_data_type( - cls, dtype: tf.TypeDeclarationItem, source_code: Optional[str] = None + cls, + dtype: tf.TypeDeclarationItem, + source_code: Optional[str] = None, + filename: Optional[pathlib.Path] = None, ) -> DataTypeSummary: if source_code is None: source_code = str(dtype) - summary = DataTypeSummary( + if isinstance(dtype, tf.StructureTypeDeclaration): + extends = dtype.extends.name if dtype.extends else None + else: + extends = None + + summary = cls( name=dtype.name, + item=dtype, + extends=extends, source_code=source_code, type=type(dtype).__name__, + filename=filename, + squashed=False, **Summary.get_meta_kwargs(dtype.meta), ) if isinstance(dtype, tf.StructureTypeDeclaration): for decl in dtype.declarations: summary.declarations.update( - DeclarationSummary.from_declaration(decl, parent=dtype, block_header="STRUCT") + DeclarationSummary.from_declaration( + decl, + parent=dtype, + block_header="STRUCT", + filename=filename, + ) ) return summary + def squash_base_extends( + self, data_types: Dict[str, DataTypeSummary] + ) -> DataTypeSummary: + """Squash the "EXTENDS" function block into this one.""" + if self.extends is None: + return self + + extends_from = data_types.get(self.extends, None) + if extends_from is None: + return self + + if extends_from.extends: + extends_from = extends_from.squash_base_extends(data_types) + + declarations = dict(extends_from.declarations) + declarations.update(self.declarations) + raise + return DataTypeSummary( + name=self.name, + type=self.type, + comments=extends_from.comments + self.comments, + pragmas=extends_from.pragmas + self.pragmas, + meta=self.meta, + filename=self.filename, + source_code="\n\n".join((extends_from.source_code, self.source_code)), + item=self.item, + extends=self.extends, + declarations=declarations, + squashed=True, + ) + + +@dataclass +class GlobalVariableSummary(Summary): + """Summary representation of a VAR_GLOBAL block.""" + name: str + item: tf.GlobalVariableDeclarations + source_code: str + type: str + qualified_only: bool = False + declarations: Dict[str, DeclarationSummary] = field(default_factory=dict) + + def __getitem__(self, key: str) -> DeclarationSummary: + return self.declarations[key] + + @property + def declarations_by_block(self) -> Dict[str, Dict[str, DeclarationSummary]]: + return { + "VAR_GLOBAL": self.declarations + } + + @classmethod + def from_globals( + cls, + decls: tf.GlobalVariableDeclarations, + source_code: Optional[str] = None, + filename: Optional[pathlib.Path] = None, + ) -> GlobalVariableSummary: + if source_code is None: + source_code = str(decls) + + summary = GlobalVariableSummary( + name=decls.name or "(unknown)", + item=decls, + source_code=source_code, + type=type(decls).__name__, + filename=filename, + qualified_only="qualified_only" in decls.attribute_pragmas, + **Summary.get_meta_kwargs(decls.meta), + ) + + for decl in decls.items: + summary.declarations.update( + **DeclarationSummary.from_global_variable( + decl, + parent=summary, + block_header="VAR_GLOBAL", + filename=filename, + ) + ) + + return summary + + +@dataclass +class ProgramSummary(Summary): + """Summary representation of a single program.""" + name: str + source_code: str + item: tf.Program + declarations: Dict[str, DeclarationSummary] = field(default_factory=dict) + actions: List[ActionSummary] = field(default_factory=list) + methods: List[MethodSummary] = field(default_factory=list) + properties: List[PropertySummary] = field(default_factory=list) + + def __getitem__(self, key: str) -> DeclarationSummary: + if key in self.declarations: + return self.declarations[key] + for item in self.actions + self.methods + self.properties: + if item.name == key: + return item + raise KeyError(key) + + @property + def declarations_by_block(self) -> Dict[str, Dict[str, DeclarationSummary]]: + result = {} + for decl in self.declarations.values(): + result.setdefault(decl.block, {})[decl.name] = decl + return result + + @classmethod + def from_program( + cls, + program: tf.Program, + source_code: Optional[str] = None, + filename: Optional[pathlib.Path] = None, + ) -> ProgramSummary: + if source_code is None: + source_code = str(program) + + summary = ProgramSummary( + name=program.name, + item=program, + source_code=source_code, + filename=filename, + **Summary.get_meta_kwargs(program.meta), + ) + + for decl in program.declarations: + summary.declarations.update( + DeclarationSummary.from_block(decl, parent=program, filename=filename) + ) + + return summary + + +def path_to_file_and_line(path: List[Summary]) -> List[Tuple[pathlib.Path, int]]: + """Get symbol metadata given a pytmc Symbol.""" + return [(part.filename, part.item.meta.line) for part in path] + @dataclass class CodeSummary: @@ -368,6 +660,8 @@ class CodeSummary: default_factory=dict ) data_types: Dict[str, DataTypeSummary] = field(default_factory=dict) + programs: Dict[str, ProgramSummary] = field(default_factory=dict) + globals: Dict[str, GlobalVariableSummary] = field(default_factory=dict) def __str__(self): return "\n".join( @@ -375,8 +669,100 @@ def __str__(self): for name, fb in self.function_blocks.items() ) + def find(self, name: str) -> Optional[Summary]: + """Find a declaration or other item by its qualified name.""" + path = self.find_path(name) + return path[-1] if path else None + + def find_path(self, name: str) -> Optional[List[Summary]]: + """Given a qualified name, find its Declaration.""" + parts = collections.deque(name.split(".")) + if len(parts) <= 1: + item = self.get_item_by_name(name) + return [item] if item is not None else None + + variable_name = parts.pop() + parent = None + path = [] + while parts: + part = parts.popleft() + if "[" in part: # ] + part = part.split("[")[0] # ] + + try: + if parent is None: + parent = self.get_item_by_name(part) + else: + part_obj = parent[part] + path.append(part_obj) + part_type = str(part_obj.base_type) + parent = self.get_item_by_name(part_type) + except KeyError: + return + + if parent is None: + return + + try: + path.append(parent[variable_name]) + except KeyError: + # Is it better to give a partial path or no path at all? + ... + + return path + + def get_all_items_by_name(self, name: str) -> Generator: + """Get any code item (function, data type, global variable, etc.) by name.""" + for dct in ( + self.globals, + self.programs, + self.functions, + self.function_blocks, + self.data_types, + ): + # Very inefficient, be warned + try: + yield dct[name] + except KeyError: + ... + + def get_item_by_name(self, name: str) -> Optional[Any]: + """Get any code item (function, data type, global variable, etc.) by name.""" + try: + return next(self.get_all_items_by_name(name)) + except StopIteration: + return None + + def append(self, other: CodeSummary, namespace: Optional[str] = None): + """ + In-place add code summary information from another instance. + + New entries take precedence over old ones. + """ + + self.functions.update(other.functions) + self.function_blocks.update(other.function_blocks) + self.data_types.update(other.data_types) + self.globals.update(other.globals) + self.programs.update(other.programs) + + if namespace: + # LCLS_General.GVL_Logger and GVL_Logger are equally valid + for name, item in other.functions.items(): + self.functions[f"{namespace}.{name}"] = item + for name, item in other.function_blocks.items(): + self.function_blocks[f"{namespace}.{name}"] = item + for name, item in other.data_types.items(): + self.data_types[f"{namespace}.{name}"] = item + for name, item in other.globals.items(): + self.globals[f"{namespace}.{name}"] = item + # for name, item in other.programs.items(): + # self.programs[f"{namespace}.{name}"] = item + @staticmethod - def from_source(code: tf.SourceCode) -> CodeSummary: + def from_source( + code: tf.SourceCode, filename: Optional[pathlib.Path] = None + ) -> CodeSummary: result = CodeSummary() code_by_lines = [""] + code.raw_source.splitlines() items = code.items @@ -386,46 +772,93 @@ def get_code_by_meta(meta: Optional[tf.Meta]) -> str: return "" return "\n".join(code_by_lines[meta.line:meta.end_line + 1]) - last_function_block = None + last_parent = None for item in items: if isinstance(item, tf.FunctionBlock): summary = FunctionBlockSummary.from_function_block( item, - source_code=get_code_by_meta(item.meta) + source_code=get_code_by_meta(item.meta), + filename=filename, ) result.function_blocks[item.name] = summary - last_function_block = summary + last_parent = summary elif isinstance(item, tf.Function): summary = FunctionSummary.from_function( item, - source_code=get_code_by_meta(item.meta) + source_code=get_code_by_meta(item.meta), + filename=filename, ) result.functions[item.name] = summary - last_function_block = None + last_parent = None elif isinstance(item, tf.DataTypeDeclaration): if isinstance(item.declaration, tf.StructureTypeDeclaration): summary = DataTypeSummary.from_data_type( item.declaration, - source_code=get_code_by_meta(item.declaration.meta) + source_code=get_code_by_meta(item.declaration.meta), + filename=filename, ) result.data_types[item.declaration.name] = summary - last_function_block = None + last_parent = None elif isinstance(item, tf.Method): - if last_function_block is not None: - last_function_block.methods.append( + if last_parent is not None: + last_parent.methods.append( MethodSummary.from_method( item, - source_code=get_code_by_meta(item.meta) + source_code=get_code_by_meta(item.meta), + filename=filename, ) ) elif isinstance(item, tf.Action): - if last_function_block is not None: - last_function_block.actions.append( + if last_parent is not None: + last_parent.actions.append( ActionSummary.from_action( item, - source_code=get_code_by_meta(item.meta) + source_code=get_code_by_meta(item.meta), + filename=filename, + ) + ) + elif isinstance(item, tf.Property): + if last_parent is not None and isinstance(last_parent, ProgramSummary): + last_parent.properties.append( + PropertySummary.from_property( + item, + source_code=get_code_by_meta(item.meta), + filename=filename, ) ) + elif isinstance(item, tf.GlobalVariableDeclarations): + summary = GlobalVariableSummary.from_globals( + item, + source_code=get_code_by_meta(item.meta), + filename=filename, + ) + result.globals[item.name] = summary + # for global_var in summary.declarations.values(): + # if not qualified_only: + # result.globals[global_var.name] = summary + # result.globals[global_var.qualified_name] = summary + + elif isinstance(item, tf.Program): + summary = ProgramSummary.from_program( + item, + source_code=get_code_by_meta(item.meta), + filename=filename, + ) + result.programs[item.name] = summary + last_parent = summary + + for name, item in list(result.function_blocks.items()): + if item.extends and not item.squashed: + result.function_blocks[name] = item.squash_base_extends( + result.function_blocks + ) + + for name, item in list(result.data_types.items()): + if item.extends and not item.squashed: + result.data_types[name] = item.squash_base_extends( + result.data_types + ) + return result diff --git a/blark/tests/test_parsing.py b/blark/tests/test_parsing.py index bca68cc..153bf08 100644 --- a/blark/tests/test_parsing.py +++ b/blark/tests/test_parsing.py @@ -22,13 +22,15 @@ def pou_filename(request): def test_parsing(pou_filename): try: - ((fn, result),) = list(parse(pou_filename)) + ((_, result),) = list(parse(pou_filename)) except FileNotFoundError: pytest.skip(f"Missing file: {pou_filename}") else: print("transformed:") print(result) print("summary:") + if isinstance(result, Exception): + raise result print(summarize(result)) diff --git a/blark/tests/test_transformer.py b/blark/tests/test_transformer.py index 721a86a..dcd6f75 100644 --- a/blark/tests/test_transformer.py +++ b/blark/tests/test_transformer.py @@ -1,4 +1,5 @@ import pathlib +import sys from typing import Optional import pytest @@ -8,16 +9,41 @@ from ..parse import parse_source_code from .conftest import get_grammar -# try: -# import apischema -# except ImportError: -# # apischema is optional for serialization testing -# apischema = None +try: + import apischema +except ImportError: + # apischema is optional for serialization testing + apischema = None +# TODO: apischema serialization is recursing infinitely on 3.9 and 3.10; +# need to dig into details and report it (first test that fails is ARRAY-related) +APISCHEMA_SKIP = sys.version_info[:2] >= (3, 9) + TEST_PATH = pathlib.Path(__file__).parent +def roundtrip_rule(rule_name: str, value: str, expected: Optional[str] = None): + parser = get_grammar(start=rule_name) + transformed = parse_source_code(value, parser=parser) + print("\n\nTransformed:") + print(repr(transformed)) + print("\n\nOr:") + print(transformed) + if expected is None: + expected = value + assert str(transformed) == expected + + if apischema is not None and not APISCHEMA_SKIP: + serialized = apischema.serialize(transformed) + print("serialized", serialized) + deserialized = apischema.deserialize(type(transformed), serialized) + print("deserialized", deserialized) + assert str(transformed) == str(deserialized) + # assert transformed == deserialized + return transformed + + def test_check_unhandled_rules(grammar): defined_rules = set( rule.origin.name for rule in grammar.rules @@ -48,24 +74,7 @@ def test_check_unhandled_rules(grammar): } - todo_rules = { - # program configuration - "prog_cnxn", - "prog_conf_element", - "prog_conf_elements", - "program_configuration", - "program_var_declarations", - "fb_task", - - # tasks - "configuration_declaration", - "instance_specific_init", - "instance_specific_initializations", - - # resources - "resource_declaration", - "single_resource_declaration", - } + todo_rules = set() aliased = { "boolean_literal", @@ -834,6 +843,17 @@ def test_statement_roundtrip(rule_name, value): """), id="int_with_input", ), + param("function_declaration", tf.multiline_code_block( + """ + FUNCTION FuncName : POINTER TO INT + VAR + iValue : INT := 0; + END_VAR + FuncName := ADR(iValue); + END_FUNCTION + """), + id="int_with_pointer_retval", + ), param("function_declaration", tf.multiline_code_block( """ FUNCTION FuncName : INT @@ -907,27 +927,7 @@ def test_function_roundtrip(rule_name, value): ], ) def test_program_roundtrip(rule_name, value): - _ = roundtrip_rule(rule_name, value) - - -def roundtrip_rule(rule_name: str, value: str, expected: Optional[str] = None): - parser = get_grammar(start=rule_name) - transformed = parse_source_code(value, parser=parser) - print("\n\nTransformed:") - print(repr(transformed)) - print("\n\nOr:") - print(transformed) - if expected is None: - expected = value - assert str(transformed) == expected - - # if apischema is not None: - # serialized = apischema.serialize(transformed) - # print("serialized", serialized) - # deserialized = apischema.deserialize(type(transformed), serialized) - # print("deserialized", deserialized) - # assert transformed == deserialized - return transformed + roundtrip_rule(rule_name, value) @pytest.mark.parametrize( @@ -1057,171 +1057,52 @@ def test_data_type_declaration(rule_name, value): @pytest.mark.parametrize( - "rule_name, value", + "value, init, base_type, full_type", [ - param("access_path", "resource.%IX1.1"), - param("access_path", "resource.prog.func1.func2.Variable"), - param("access_path", "resource.func2.Variable"), + param("fValue : INT;", tf.TypeInitialization, "INT", "INT"), + param("fValue : INT (0..10);", tf.SubrangeTypeInitialization, "INT", "INT (0..10)"), + param("fValue : (A, B);", tf.EnumeratedTypeInitialization, "INT", "INT"), + param("fValue : (A, B) DINT;", tf.EnumeratedTypeInitialization, "DINT", "DINT"), param( - "config_access_declaration", - "AccessName : resource.%IX1.1 : TypeName READ_ONLY" + "fValue : ARRAY [1..10] OF INT;", + tf.ArrayTypeInitialization, + "INT", + "ARRAY [1..10] OF INT", ), param( - "config_access_declaration", - "AccessName : resource.%IX1.1 : TypeName" + "fValue : FB_Test(1, 2, 3);", + tf.InitializedStructure, + "FB_Test", + "FB_Test", + marks=pytest.mark.xfail(reason="Overlap with function block invocation") ), param( - "config_access_declaration", - "AccessName : resource.Variable : TypeName READ_WRITE" + "fValue : FB_Test(A := 1, B := 2, C => 3);", + tf.FunctionBlockInvocation, + "FB_Test", + "FB_Test", ), - param("config_access_declarations", tf.multiline_code_block( - """ - (* This is an access block *) - VAR_ACCESS - (* Access 1 *) - AccessName1 : resource.Variable : TypeName READ_WRITE; - (* Access 2 *) - AccessName2 : resource.Variable : TypeName READ_ONLY; - (* Access 3 *) - AccessName3 : resource.Variable : TypeName; - (* Access 4 *) - AccessName4 : resource.%IX1.1 : TypeName; - END_VAR - """ - )), - param("task_initialization", "(SINGLE := 1, INTERVAL := 2, PRIORITY := 3)"), - param("task_initialization", "(INTERVAL := 2, PRIORITY := 3)"), - param("task_initialization", "(SINGLE := 1, PRIORITY := 3)"), - param("task_initialization", "(PRIORITY := 3)"), - param("task_initialization", "(SINGLE := abc.def, PRIORITY := 3)"), - param("task_configuration", "TASK taskname (PRIORITY := 3)"), - ], -) -def test_config_roundtrip(rule_name, value): - roundtrip_rule(rule_name, value) - -@pytest.mark.parametrize( - "rule_name, value", - [ - param("function_declaration", tf.multiline_code_block( - """ - FUNCTION ILTest : INT - LD Speed - GT 2000 - JMPCN VOLTS_OK - LD Volts - VOLTS_OK: LD 1 - ST %QX75 - END_FUNCTION - """ - )), - param("function_declaration", tf.multiline_code_block( - """ - FUNCTION ILTest : INT - LD LoadVar - ST toninstance.IN - CAL fb(1, 2, 3) - CAL toninstance( - PT := t1, - ET => tOut2 - ) - LD toninst1.Q - JMPC labelname - ST otherton.IN - labelname: LD iVar2 - SUB 100 - END_FUNCTION - """ - )), - # I don't understand IL well enough, but this is apparently valid - # grammar - param("function_declaration", tf.multiline_code_block( - """ - FUNCTION ILTest : INT - ADD(iOperand - LD test - ST test1 - ) - ADD(iOperand) - end: RET - END_FUNCTION - """ - )), - param("function_declaration", tf.multiline_code_block( - """ - // Comments 0 - FUNCTION ILTest : INT - // Comments 1 - ADD(iOperand - LD test - ST test1 - ) - // Comments 2 - ADD(iOperand) - // Comments 3 - end: RET - END_FUNCTION - """ - )), + # Aliased by TypeInitialization, it has been removed from the grammar: + # param("fValue : fbName;", lark.Token, "fbName", "fbName"), + # Aliased by TypeInitialization: + param( + "fValue : STRING[10] := 'abc';", + tf.StringTypeSpecification, + "STRING", + "STRING[10]", + marks=pytest.mark.xfail(reason="Overlap with TypeInitialization") + ), ] ) -def test_instruction_list(rule_name, value): - roundtrip_rule(rule_name, value) +def test_global_types(value, init, base_type, full_type): + parser = get_grammar(start="global_var_decl") + transformed = parse_source_code(value, parser=parser) + assert isinstance(transformed, tf.GlobalVariableDeclaration) + assert transformed.variables == ["fValue"] + assert isinstance(transformed.init, init) -@pytest.mark.parametrize( - "rule_name, value", - [ - param("action_qualifier", "N"), - param("action_qualifier", "D, Variable"), - param("action_qualifier", "D, TIME#1D"), - param("action_association", "ActionName()"), - param("action_association", "ActionName(N)"), - param("action_association", "ActionName(D, TIME#1D)"), - param("action_association", "ActionName(D, TIME#1D, IndicatorName)"), - param("action_association", "ActionName(D, TIME#1D, Name1, Name2^)"), - param("sfc_initial_step", tf.multiline_code_block( - """ - INITIAL_STEP StepName : - END_STEP - """ - )), - param("sfc_step", tf.multiline_code_block( - """ - STEP StepName : - END_STEP - """ - )), - param("sfc_step", tf.multiline_code_block( - """ - STEP StepName : - iValue := iValue + 1; - END_STEP - """ - )), - param("sfc_step", tf.multiline_code_block( - """ - STEP StepName : - ActionName(D, TIME#1D, Name1, Name2^) - END_STEP - """ - )), - param("sfc_transition", tf.multiline_code_block( - """ - TRANSITION TransitionName - FROM StepName1 TO StepName2 := 1 - END_TRANSITION - """ - )), - param("sfc_transition", tf.multiline_code_block( - """ - TRANSITION TransitionName - FROM (StepName1, StepName2) TO StepName3 := 1 - END_TRANSITION - """ - )), - ] -) -def test_sfc_sequential_function_chart(rule_name, value): - roundtrip_rule(rule_name, value) + assert transformed.spec.variables + assert transformed.base_type_name == base_type + assert transformed.full_type_name == full_type diff --git a/blark/transform.py b/blark/transform.py index 1089ac1..43bf11e 100644 --- a/blark/transform.py +++ b/blark/transform.py @@ -9,16 +9,31 @@ from dataclasses import dataclass, fields, is_dataclass from enum import Enum from typing import (Any, Callable, ClassVar, Dict, Generator, List, Optional, - Tuple, Type, TypeVar, Union) + Set, Tuple, Type, TypeVar, Union) import lark +T = TypeVar("T") + +try: + # NOTE: apischema is an optional requirement; this should work regardless. + import apischema + + from .apischema_compat import as_tagged_union +except ImportError: + apischema = None + + def as_tagged_union(cls: Type[T]) -> Type[T]: + """No-operation stand-in for when apischema is not available.""" + return cls + + _rule_to_class: Dict[str, type] = {} _class_handlers = {} _comment_consumers = [] -T = TypeVar("T") INDENT = " " # TODO: make it configurable +StringOrToken = Union[str, lark.Token] def multiline_code_block(block: str) -> str: @@ -78,7 +93,7 @@ def instantiator(*args) -> T: def _rule_handler( - *rules: Union[str, List[str]], + *rules: str, comments: bool = False ) -> Callable[[Type[T]], Type[T]]: """Decorator - the wrapped class will handle the provided rules.""" @@ -167,10 +182,50 @@ def meta_field(): return dataclasses.field(default=None, repr=False, compare=False) +class _FlagHelper: + @classmethod + def from_lark(cls, token: lark.Token, *tokens: lark.Token): + result = cls[token.lower()] + for token in tokens: + result |= cls[token.lower()] + return result + + def __str__(self): + return " ".join( + option.name.upper() + for option in type(self) + if option in self + ) + + +@_rule_handler("variable_attributes") +class VariableAttributes(_FlagHelper, enum.Flag): + constant = 0b0000_0001 + retain = 0b0000_0010 + non_retain = 0b0000_0100 + persistent = 0b0000_1000 + + +@_rule_handler( + "method_access", + "property_access", +) +class MethodAccess(_FlagHelper, enum.Flag): + public = 0b0000_0001 + private = 0b0000_0010 + abstract = 0b0000_0100 + protected = 0b0000_1000 + internal = 0b0001_0000 + final = 0b010_0000 + + +@dataclass +@as_tagged_union class Expression: ... +@as_tagged_union class Literal(Expression): """Literal value.""" value: Any # Type specified in subclass @@ -438,37 +493,11 @@ class String(Literal): meta: Optional[Meta] = meta_field() +@as_tagged_union class Variable(Expression): ... -@_rule_handler( - "method_access", - "property_access", -) -class MethodAccess(enum.Flag): - public = enum.auto() - private = enum.auto() - abstract = enum.auto() - protected = enum.auto() - internal = enum.auto() - final = enum.auto() - - @staticmethod - def from_lark(token: lark.Token, *tokens: lark.Token) -> MethodAccess: - result = MethodAccess[token.lower()] - for token in tokens: - result |= MethodAccess[token.lower()] - return result - - def __str__(self): - return " ".join( - option.name.upper() - for option in MethodAccess - if option in self - ) - - @_rule_handler( "indirection_type", "pointer_type", @@ -561,7 +590,7 @@ def __str__(self) -> str: @_rule_handler("location") class Location(DirectVariable): @staticmethod - def from_lark(var: DirectVariable): + def from_lark(var: DirectVariable) -> Location: return Location( location_prefix=var.location_prefix, location=var.location, @@ -575,15 +604,9 @@ def __str__(self) -> str: return f"AT {direct_loc}" -class SymbolicVariable(Variable): - name: lark.Token - dereferenced: bool - meta: Optional[Meta] - - @dataclass @_rule_handler("variable_name") -class SimpleVariable(SymbolicVariable): +class SimpleVariable(Variable): name: lark.Token dereferenced: bool meta: Optional[Meta] = meta_field() @@ -641,19 +664,17 @@ def __str__(self) -> str: @dataclass @_rule_handler("multi_element_variable") -class MultiElementVariable(SymbolicVariable): - name: lark.Token +class MultiElementVariable(Variable): + name: SymbolicVariable dereferenced: bool elements: List[Union[SubscriptList, FieldSelector]] meta: Optional[Meta] = meta_field() @staticmethod - def from_lark(variable_name, *subscript_or_field): - if not subscript_or_field: - return SymbolicVariable( - name=variable_name, - dereferenced=False - ) + def from_lark( + variable_name: SymbolicVariable, + *subscript_or_field: Union[SubscriptList, FieldSelector] + ) -> MultiElementVariable: return MultiElementVariable( name=variable_name, elements=list(subscript_or_field), @@ -664,17 +685,29 @@ def __str__(self) -> str: return "".join(str(part) for part in (self.name, *self.elements)) +SymbolicVariable = Union[SimpleVariable, MultiElementVariable] + + @dataclass @_rule_handler("simple_spec_init") class TypeInitialization: indirection: Optional[IndirectionType] - spec: Optional[lark.Token] + spec: SimpleSpecification value: Optional[Expression] meta: Optional[Meta] = meta_field() + @property + def base_type_name(self) -> lark.Token: + """The base type name.""" + return self.spec.type + + @property + def full_type_name(self) -> str: + """The full type name.""" + return join_if(self.indirection, " ", self.spec.type) + def __str__(self) -> str: - type_ = join_if(self.indirection, " ", self.spec) - return join_if(type_, " := ", self.value) + return join_if(self.full_type_name, " := ", self.value) class Declaration: @@ -710,7 +743,7 @@ class StringTypeDeclaration: name: lark.Token string_type: lark.Token length: Optional[lark.Token] - value: lark.Token + value: Optional[String] meta: Optional[Meta] = meta_field() def __str__(self) -> str: @@ -727,9 +760,19 @@ class StringTypeSpecification: length: Optional[lark.Token] = None meta: Optional[Meta] = meta_field() - def __str__(self) -> str: + @property + def base_type_name(self) -> lark.Token: + """The base type name.""" + return self.type_name + + @property + def full_type_name(self) -> str: + """The full type name.""" return join_if(self.type_name, "", self.length) + def __str__(self) -> str: + return self.full_type_name + @dataclass @_rule_handler( @@ -741,6 +784,16 @@ class StringTypeInitialization: value: Optional[lark.Token] meta: Optional[Meta] = meta_field() + @property + def base_type_name(self) -> lark.Token: + """The base type name.""" + return self.spec.base_type_name + + @property + def full_type_name(self) -> str: + """The full type name.""" + return self.spec.full_type_name + @staticmethod def from_lark( *args: lark.Token, @@ -762,6 +815,8 @@ def __str__(self) -> str: return join_if(self.spec, " := ", self.value) +@dataclass +@as_tagged_union class Subrange: ... @@ -792,11 +847,21 @@ class SubrangeSpecification: subrange: Optional[Subrange] = None meta: Optional[Meta] = meta_field() - def __str__(self) -> str: + @property + def base_type_name(self) -> lark.Token: + """The base type name.""" + return self.type_name + + @property + def full_type_name(self) -> str: + """The full type name.""" if self.subrange: return f"{self.type_name} ({self.subrange})" return f"{self.type_name}" + def __str__(self) -> str: + return self.full_type_name + @dataclass @_rule_handler("subrange_spec_init") @@ -806,6 +871,16 @@ class SubrangeTypeInitialization: value: Optional[Expression] = None meta: Optional[Meta] = meta_field() + @property + def base_type_name(self) -> lark.Token: + """The base type name.""" + return self.spec.base_type_name + + @property + def full_type_name(self) -> str: + """The full type name.""" + return self.spec.full_type_name + def __str__(self) -> str: spec = join_if(self.indirection, " ", self.spec) if not self.value: @@ -841,10 +916,21 @@ def __str__(self) -> str: @dataclass @_rule_handler("enumerated_specification") class EnumeratedSpecification: + _implicit_type_default_: ClassVar[str] = "INT" type_name: Optional[lark.Token] values: Optional[List[EnumeratedValue]] = None meta: Optional[Meta] = meta_field() + @property + def base_type_name(self) -> Union[lark.Token, str]: + """The full type name.""" + return self.type_name or self._implicit_type_default_ + + @property + def full_type_name(self) -> Union[lark.Token, str]: + """The full type name.""" + return self.base_type_name + @staticmethod def from_lark(*args): if len(args) == 1: @@ -865,9 +951,19 @@ def __str__(self) -> str: class EnumeratedTypeInitialization: indirection: Optional[IndirectionType] spec: EnumeratedSpecification - value: Optional[Expression] + value: Optional[EnumeratedValue] meta: Optional[Meta] = meta_field() + @property + def base_type_name(self) -> Union[lark.Token, str]: + """The base type name.""" + return self.spec.base_type_name + + @property + def full_type_name(self) -> Union[lark.Token, str]: + """The full type name.""" + return self.spec.full_type_name + def __str__(self) -> str: spec = join_if(self.indirection, " ", self.spec) return join_if(spec, " := ", self.value) @@ -897,6 +993,27 @@ def __str__(self) -> str: return f"{self.type_name}" +@dataclass +@_rule_handler("simple_specification") +class SimpleSpecification: + type: lark.Token + meta: Optional[Meta] = meta_field() + + def __str__(self) -> str: + return str(self.type) + + +@dataclass +@_rule_handler("indirect_simple_specification") +class IndirectSimpleSpecification: + indirection: Optional[IndirectionType] + type: SimpleSpecification + meta: Optional[Meta] = meta_field() + + def __str__(self) -> str: + return join_if(self.indirection, " ", self.type) + + @dataclass @_rule_handler("array_specification") class ArraySpecification: @@ -904,9 +1021,24 @@ class ArraySpecification: subranges: List[Subrange] meta: Optional[Meta] = meta_field() + @property + def base_type_name(self) -> lark.Token: + """The base type name.""" + return self.type.type_name + + @property + def full_type_name(self) -> str: + """The full type name.""" + return str(self) + @staticmethod def from_lark(*args): *subranges, type = args + if isinstance(type, lark.Token): + type = DataType( + indirection=None, + type_name=type, + ) return ArraySpecification(type=type, subranges=subranges) def __str__(self) -> str: @@ -945,7 +1077,7 @@ def __str__(self) -> str: @dataclass @_rule_handler("array_initialization") class ArrayInitialization: - elements: List[ArrayInitialElement] + elements: List[Union[ArrayInitialElement, ArrayInitialElementCount]] meta: Optional[Meta] = meta_field() @staticmethod @@ -965,6 +1097,16 @@ class ArrayTypeInitialization: value: Optional[ArrayInitialization] meta: Optional[Meta] = meta_field() + @property + def base_type_name(self) -> lark.Token: + """The base type name.""" + return self.spec.base_type_name + + @property + def full_type_name(self) -> str: + """The full type name.""" + return self.spec.full_type_name + def __str__(self) -> str: if self.indirection: spec = f"{self.indirection} {self.spec}" @@ -992,7 +1134,7 @@ def __str__(self) -> str: @_rule_handler("structure_type_declaration", comments=True) class StructureTypeDeclaration: name: lark.Token - extends: Optional[lark.Token] + extends: Optional[Extends] indirection: Optional[IndirectionType] declarations: List[StructureElementDeclaration] meta: Optional[Meta] = meta_field() @@ -1000,7 +1142,7 @@ class StructureTypeDeclaration: @staticmethod def from_lark( name: lark.Token, - extends: Optional[lark.Token], + extends: Optional[Extends], indirection: Optional[IndirectionType], *declarations: StructureElementDeclaration, ): @@ -1040,6 +1182,7 @@ class StructureElementDeclaration: TypeInitialization, SubrangeTypeInitialization, EnumeratedTypeInitialization, + InitializedStructure, ] meta: Optional[Meta] = meta_field() @@ -1055,7 +1198,7 @@ def __str__(self) -> str: UnionElementSpecification = Union[ ArraySpecification, - lark.Token, # simple_specification + SimpleSpecification, SubrangeSpecification, EnumeratedSpecification, ] @@ -1119,6 +1262,21 @@ class InitializedStructure: init: StructureInitialization meta: Optional[Meta] = meta_field() + @property + def value(self) -> str: + """The initialization value (call).""" + return str(self.init) + + @property + def base_type_name(self) -> lark.Token: + """The base type name.""" + return self.name + + @property + def full_type_name(self) -> lark.Token: + """The full type name.""" + return self.name + def __str__(self) -> str: return f"{self.name} := {self.init}" @@ -1273,7 +1431,7 @@ class FunctionCall(Expression): @staticmethod def from_lark( name: SymbolicVariable, - *parameters: ParameterAssignment + *parameters: ParameterAssignment, ) -> FunctionCall: return FunctionCall( name=name, @@ -1307,9 +1465,32 @@ def __str__(self) -> str: return join_if(self.variable, " ", self.location) +@dataclass +class _GenericInit: + """API compat to give a valid init attribute.""" + # TODO: can we restructure this to be less confusing? + base_type_name: str + full_type_name: str + repr: str + value: Optional[str] + + def __str__(self) -> str: + return str(self.repr) + + +InitDeclarationType = Union[ + TypeInitialization, + SubrangeTypeInitialization, + EnumeratedTypeInitialization, + ArrayTypeInitialization, + InitializedStructure, + _GenericInit, # StringVariableInitDeclaration, EdgeDeclaration +] + + class InitDeclaration: variables: List[DeclaredVariable] - init: Any + init: InitDeclarationType meta: Optional[Meta] def __str__(self) -> str: @@ -1351,6 +1532,7 @@ class StringVariableInitDeclaration(InitDeclaration): variables: List[DeclaredVariable] spec: StringTypeSpecification value: Optional[lark.Token] + init: _GenericInit meta: Optional[Meta] = meta_field() @staticmethod @@ -1359,12 +1541,14 @@ def from_lark(variables: List[DeclaredVariable], string_info: StringTypeInitiali variables=variables, spec=string_info.spec, value=string_info.value, + init=_GenericInit( + base_type_name=str(string_info.spec.base_type_name), + full_type_name=str(string_info.spec.full_type_name), + value=str(string_info.value), + repr=join_if(string_info.spec, " := ", string_info.value), + ) ) - @property - def init(self) -> str: - return join_if(self.spec, " := ", self.value) - @dataclass @_rule_handler("edge_declaration", comments=True) @@ -1373,11 +1557,21 @@ class EdgeDeclaration(InitDeclaration): edge: lark.Token meta: Optional[Meta] = meta_field() - @property - def init(self) -> str: - return f"BOOL {self.edge}" + def __post_init__(self): + full_type_name = f"BOOL {self.edge}" + self.init = _GenericInit( + base_type_name="BOOL", + full_type_name=full_type_name, + value=None, + repr=full_type_name, + ) + + def __str__(self): + variables = ", ".join(str(variable) for variable in self.variables) + return f"{variables} : {self.init.full_type_name}" +@as_tagged_union class FunctionBlockDeclaration: ... @@ -1385,7 +1579,7 @@ class FunctionBlockDeclaration: @dataclass @_rule_handler("fb_name_decl", comments=True) class FunctionBlockNameDeclaration(FunctionBlockDeclaration): - variables: List[lark.Token] + variables: List[lark.Token] # fb_decl_name_list -> fb_name spec: lark.Token init: Optional[StructureInitialization] = None meta: Optional[Meta] = meta_field() @@ -1408,21 +1602,26 @@ def __str__(self) -> str: return f"{variables} : {self.init}" +@as_tagged_union +class ParameterAssignment: + ... + + @dataclass @_rule_handler("param_assignment") -class ParameterAssignment: - name: Optional[lark.Token] +class InputParameterAssignment(ParameterAssignment): + name: Optional[SimpleVariable] value: Optional[Expression] meta: Optional[Meta] = meta_field() @staticmethod - def from_lark(*args) -> ParameterAssignment: + def from_lark(*args) -> InputParameterAssignment: if len(args) == 1: value, = args name = None else: name, value = args - return ParameterAssignment(name, value) + return InputParameterAssignment(name, value) def __str__(self) -> str: return join_if(self.name, " := ", self.value) @@ -1431,13 +1630,15 @@ def __str__(self) -> str: @dataclass @_rule_handler("output_parameter_assignment") class OutputParameterAssignment(ParameterAssignment): + name: SimpleVariable + value: Optional[Expression] inverted: bool = False meta: Optional[Meta] = meta_field() @staticmethod def from_lark( inverted: Optional[lark.Token], - name: lark.Token, + name: SimpleVariable, value: Expression, ) -> OutputParameterAssignment: return OutputParameterAssignment( @@ -1452,14 +1653,29 @@ def __str__(self) -> str: @dataclass @_rule_handler("fb_invocation") class FunctionBlockInvocation: - name: lark.Token + name: SymbolicVariable parameters: List[ParameterAssignment] meta: Optional[Meta] = meta_field() + @property + def base_type_name(self) -> lark.Token: + """The base type name.""" + return self.name.name + + @property + def full_type_name(self) -> str: + """The full type name.""" + return str(self.name) + + @property + def value(self) -> str: + """The initialization value (call).""" + return str(self) + @staticmethod def from_lark( - name: lark.Token, - *parameters: ParameterAssignment + name: SymbolicVariable, + *parameters: ParameterAssignment, ) -> FunctionBlockInvocation: return FunctionBlockInvocation( name=name, @@ -1471,22 +1687,27 @@ def __str__(self) -> str: return f"{self.name}({parameters})" +AnyLocation = Union[Location, IncompleteLocation] + + @dataclass @_rule_handler("global_var_spec") class GlobalVariableSpec: variables: List[lark.Token] - location: Optional[Union[Location, IncompleteLocation]] + location: Optional[AnyLocation] meta: Optional[Meta] = meta_field() @staticmethod def from_lark( name_or_names: Union[lark.Token, lark.Tree], - location: Optional[Union[Location, IncompleteLocation]] = None + location: Optional[AnyLocation] = None ) -> GlobalVariableSpec: if location is None: + # Multiple variables without a location name_tree = typing.cast(lark.Tree, name_or_names) variables = typing.cast(List[lark.Token], name_tree.children) else: + # Only one variable allowed with a location variables = typing.cast(List[lark.Token], [name_or_names]) return GlobalVariableSpec(variables=variables, location=location) @@ -1510,13 +1731,29 @@ def __str__(self) -> str: @_rule_handler("global_var_decl", comments=True) class GlobalVariableDeclaration: spec: GlobalVariableSpec - init: Union[ - LocatedVariableSpecInit, - FunctionBlockInvocation, - lark.Token # FB type name - ] + init: Union[LocatedVariableSpecInit, FunctionBlockInvocation] meta: Optional[Meta] = meta_field() + @property + def variables(self) -> List[lark.Token]: + """The variable names contained.""" + return self.spec.variables + + @property + def location(self) -> Optional[AnyLocation]: + """The (optional) variable location.""" + return self.spec.location + + @property + def base_type_name(self) -> Union[str, lark.Token]: + """The base type name of the variable(s).""" + return self.init.base_type_name + + @property + def full_type_name(self) -> Union[str, lark.Token]: + """The full type name of the variable(s).""" + return self.init.full_type_name + def __str__(self) -> str: return f"{self.spec} : {self.init}" @@ -1547,24 +1784,6 @@ def __str__(self) -> str: return "IMPLEMENTS " + ", ".join(self.interfaces) -@dataclass -@_rule_handler( - "function_block_body", - "function_body", - comments=True -) -class FunctionBody: - source: Union[ - StatementList, - InstructionList, - SequentialFunctionChart, - ] - meta: Optional[Meta] = meta_field() - - def __str__(self) -> str: - return str(self.source) - - @dataclass @_rule_handler("function_block_type_declaration", comments=True) class FunctionBlock: @@ -1573,7 +1792,7 @@ class FunctionBlock: extends: Optional[Extends] implements: Optional[Implements] declarations: List[VariableDeclarationBlock] - body: Optional[FunctionBody] + body: Optional[FunctionBlockBody] meta: Optional[Meta] = meta_field() @staticmethod @@ -1616,7 +1835,7 @@ def __str__(self) -> str: @_rule_handler("function_declaration", comments=True) class Function: name: lark.Token - return_type: Optional[lark.Token] + return_type: Optional[Union[SimpleSpecification, IndirectSimpleSpecification]] declarations: List[VariableDeclarationBlock] body: Optional[FunctionBody] meta: Optional[Meta] = meta_field() @@ -1624,7 +1843,7 @@ class Function: @staticmethod def from_lark( name: lark.Token, - return_type: Optional[lark.Token], + return_type: Optional[Union[SimpleSpecification, IndirectSimpleSpecification]], *remainder ) -> Function: *declarations, body = remainder @@ -1660,23 +1879,6 @@ class Program: body: Optional[FunctionBody] meta: Optional[Meta] = meta_field() - @staticmethod - def from_lark( - name: lark.Token, - declarations_tree: Optional[lark.Tree], - body: Optional[FunctionBody] - ) -> Program: - declarations = typing.cast( - List[VariableDeclarationBlock], - declarations_tree.children - if declarations_tree else [] - ) - return Program( - name=name, - declarations=declarations, - body=body, - ) - def __str__(self) -> str: return "\n".join( s for s in ( @@ -1689,15 +1891,9 @@ def __str__(self) -> str: ) -class Action: - name: Union[str, lark.Token] - body: Optional[FunctionBody] - meta: Optional[Meta] - - @dataclass @_rule_handler("action", comments=True) -class NamedAction(Action): +class Action: name: lark.Token body: Optional[FunctionBody] meta: Optional[Meta] = meta_field() @@ -1714,50 +1910,6 @@ def __str__(self) -> str: ) -@dataclass -@_rule_handler("entry_action", comments=True) -class EntryAction(Action): - body: Optional[FunctionBody] - meta: Optional[Meta] = meta_field() - - @property - def name(self) -> str: - return "ENTRY_ACTION" - - def __str__(self) -> str: - return "\n".join( - line for line in - ( - "ENTRY_ACTION", - indent_if(self.body), - "END_ACTION", - ) - if line is not None - ) - - -@dataclass -@_rule_handler("exit_action", comments=True) -class ExitAction(Action): - body: Optional[FunctionBody] - meta: Optional[Meta] = meta_field() - - @property - def name(self) -> str: - return "EXIT_ACTION" - - def __str__(self) -> str: - return "\n".join( - line for line in - ( - "EXIT_ACTION", - indent_if(self.body), - "END_ACTION", - ) - if line is not None - ) - - @dataclass @_rule_handler("function_block_method_declaration", comments=True) class Method: @@ -1846,6 +1998,7 @@ def __str__(self) -> str: VariableOneInitDeclaration, FunctionBlockDeclaration, EdgeDeclaration, + StructuredVariableInitDeclaration, ] InputOutputDeclaration = VariableInitDeclaration @@ -1861,6 +2014,7 @@ def __str__(self) -> str: ] +@as_tagged_union class VariableDeclarationBlock: block_header: ClassVar[str] = "VAR" items: List[Any] @@ -1871,19 +2025,21 @@ class VariableDeclarationBlock: @_rule_handler("var_declarations", comments=True) class VariableDeclarations(VariableDeclarationBlock): block_header: ClassVar[str] = "VAR" - config: Optional[lark.Token] + attrs: Optional[VariableAttributes] items: List[VariableInitDeclaration] meta: Optional[Meta] = meta_field() @staticmethod - def from_lark(config: Optional[lark.Token], tree: lark.Tree) -> VariableDeclarations: + def from_lark( + attrs: Optional[VariableAttributes], tree: lark.Tree + ) -> VariableDeclarations: items = typing.cast(List[VariableInitDeclaration], tree.children) - return VariableDeclarations(config=config, items=items) + return VariableDeclarations(attrs=attrs, items=items) def __str__(self) -> str: return "\n".join( ( - join_if("VAR", " ", self.config), + join_if("VAR", " ", self.attrs), *(indent(f"{item};") for item in self.items), "END_VAR", ) @@ -1894,17 +2050,20 @@ def __str__(self) -> str: @_rule_handler("static_var_declarations", comments=True) class StaticDeclarations(VariableDeclarationBlock): block_header: ClassVar[str] = "VAR_STAT" + attrs: Optional[VariableAttributes] items: List[VariableInitDeclaration] meta: Optional[Meta] = meta_field() @staticmethod - def from_lark(*items: VariableInitDeclaration) -> StaticDeclarations: - return StaticDeclarations(list(items)) + def from_lark( + attrs: Optional[VariableAttributes], *items: VariableInitDeclaration + ) -> StaticDeclarations: + return StaticDeclarations(attrs, list(items)) def __str__(self) -> str: return "\n".join( ( - "VAR_STAT", + join_if("VAR_STAT", " ", self.attrs), *(indent(f"{item};") for item in self.items), "END_VAR", ) @@ -1938,13 +2097,17 @@ def __str__(self) -> str: @_rule_handler("var_inst_declaration", comments=True) class MethodInstanceVariableDeclarations(VariableDeclarationBlock): block_header: ClassVar[str] = "VAR_INST" + attrs: Optional[VariableAttributes] items: List[VariableInitDeclaration] meta: Optional[Meta] = meta_field() @staticmethod - def from_lark(items: lark.Tree) -> MethodInstanceVariableDeclarations: + def from_lark( + attrs: Optional[VariableAttributes], items: lark.Tree + ) -> MethodInstanceVariableDeclarations: return MethodInstanceVariableDeclarations( - typing.cast( + attrs=attrs, + items=typing.cast( List[VariableInitDeclaration], items.children ) @@ -1963,7 +2126,7 @@ def __str__(self) -> str: @dataclass @_rule_handler("located_var_decl", comments=True) class LocatedVariableDeclaration: - name: Optional[lark.Token] + name: Optional[SimpleVariable] location: Location init: LocatedVariableSpecInit meta: Optional[Meta] = meta_field() @@ -1977,31 +2140,24 @@ def __str__(self) -> str: @_rule_handler("located_var_declarations", comments=True) class LocatedVariableDeclarations(VariableDeclarationBlock): block_header: ClassVar[str] = "VAR" - config: Optional[lark.Token] - persistent: bool + attrs: Optional[VariableAttributes] items: List[LocatedVariableDeclaration] meta: Optional[Meta] = meta_field() @staticmethod def from_lark( - config: Optional[lark.Token], - persistent: Optional[lark.Token], + attrs: Optional[VariableAttributes], *items: LocatedVariableDeclaration, ) -> LocatedVariableDeclarations: return LocatedVariableDeclarations( - config=config, - persistent=persistent is not None, + attrs=attrs, items=list(items), ) def __str__(self) -> str: return "\n".join( ( - join_if( - join_if("VAR", " ", self.config), - " ", - self.persistent and "PERSISTENT" or None - ), + join_if("VAR", " ", self.attrs), *(indent(f"{item};") for item in self.items), "END_VAR", ) @@ -2009,6 +2165,7 @@ def __str__(self) -> str: IncompleteLocatedVariableSpecInit = Union[ + SimpleSpecification, TypeInitialization, SubrangeTypeInitialization, EnumeratedTypeInitialization, @@ -2021,7 +2178,7 @@ def __str__(self) -> str: @dataclass @_rule_handler("incomplete_located_var_decl", comments=True) class IncompleteLocatedVariableDeclaration: - name: lark.Token + name: SimpleVariable location: IncompleteLocation init: IncompleteLocatedVariableSpecInit meta: Optional[Meta] = meta_field() @@ -2035,24 +2192,24 @@ def __str__(self) -> str: @_rule_handler("incomplete_located_var_declarations", comments=True) class IncompleteLocatedVariableDeclarations(VariableDeclarationBlock): block_header: ClassVar[str] = "VAR" - retain: bool + attrs: Optional[VariableAttributes] items: List[IncompleteLocatedVariableDeclaration] meta: Optional[Meta] = meta_field() @staticmethod def from_lark( - retain: Optional[lark.Token], + attrs: Optional[VariableAttributes], *items: IncompleteLocatedVariableDeclaration, ) -> IncompleteLocatedVariableDeclarations: return IncompleteLocatedVariableDeclarations( - retain=retain is not None, + attrs=attrs, items=list(items), ) def __str__(self) -> str: return "\n".join( ( - join_if("VAR", " ", "RETAIN" if self.retain else None), + join_if("VAR", " ", self.attrs), *(indent(f"{item};") for item in self.items), "END_VAR", ) @@ -2064,6 +2221,7 @@ def __str__(self) -> str: class ExternalVariableDeclaration: name: lark.Token spec: Union[ + SimpleSpecification, lark.Token, # SIMPLE_SPECIFICATION / STRUCTURE_TYPE_NAME / FUNCTION_BLOCK_TYPE_NAME SubrangeSpecification, EnumeratedSpecification, @@ -2079,24 +2237,24 @@ def __str__(self) -> str: @_rule_handler("external_var_declarations", comments=True) class ExternalVariableDeclarations(VariableDeclarationBlock): block_header: ClassVar[str] = "VAR_EXTERNAL" - constant: bool + attrs: Optional[VariableAttributes] items: List[ExternalVariableDeclaration] meta: Optional[Meta] = meta_field() @staticmethod def from_lark( - constant: Optional[lark.Token], + attrs: Optional[VariableAttributes], *items: ExternalVariableDeclaration, ) -> ExternalVariableDeclarations: return ExternalVariableDeclarations( - constant=constant is not None, + attrs=attrs, items=list(items), ) def __str__(self) -> str: return "\n".join( ( - join_if("VAR_EXTERNAL", " ", self.constant and "CONSTANT" or None), + join_if("VAR_EXTERNAL", " ", self.attrs), *(indent(f"{item};") for item in self.items), "END_VAR", ) @@ -2107,18 +2265,20 @@ def __str__(self) -> str: @_rule_handler("input_declarations", comments=True) class InputDeclarations(VariableDeclarationBlock): block_header: ClassVar[str] = "VAR_INPUT" - retain: Optional[lark.Token] + attrs: Optional[VariableAttributes] items: List[InputDeclaration] meta: Optional[Meta] = meta_field() @staticmethod - def from_lark(retain: Optional[lark.Token], *items: InputDeclaration) -> InputDeclarations: - return InputDeclarations(retain, list(items) if items else []) + def from_lark( + attrs: Optional[VariableAttributes], *items: InputDeclaration + ) -> InputDeclarations: + return InputDeclarations(attrs, list(items) if items else []) def __str__(self) -> str: return "\n".join( ( - join_if("VAR_INPUT", " ", self.retain), + join_if("VAR_INPUT", " ", self.attrs), *(indent(f"{item};") for item in self.items), "END_VAR", ) @@ -2129,20 +2289,22 @@ def __str__(self) -> str: @_rule_handler("output_declarations", comments=True) class OutputDeclarations(VariableDeclarationBlock): block_header: ClassVar[str] = "VAR_OUTPUT" - retain: Optional[lark.Token] + attrs: Optional[VariableAttributes] items: List[OutputDeclaration] meta: Optional[Meta] = meta_field() @staticmethod - def from_lark(retain: Optional[lark.Token], items: lark.Tree) -> OutputDeclarations: + def from_lark( + attrs: Optional[VariableAttributes], items: lark.Tree + ) -> OutputDeclarations: return OutputDeclarations( - retain, typing.cast(List[OutputDeclaration], items.children) + attrs, typing.cast(List[OutputDeclaration], items.children) ) def __str__(self) -> str: return "\n".join( ( - join_if("VAR_OUTPUT", " ", self.retain), + join_if("VAR_OUTPUT", " ", self.attrs), *(indent(f"{item};") for item in self.items), "END_VAR", ) @@ -2153,19 +2315,23 @@ def __str__(self) -> str: @_rule_handler("input_output_declarations", comments=True) class InputOutputDeclarations(VariableDeclarationBlock): block_header: ClassVar[str] = "VAR_IN_OUT" + attrs: Optional[VariableAttributes] items: List[InputOutputDeclaration] meta: Optional[Meta] = meta_field() @staticmethod - def from_lark(items: lark.Tree) -> InputOutputDeclarations: + def from_lark( + attrs: Optional[VariableAttributes], items: lark.Tree + ) -> InputOutputDeclarations: return InputOutputDeclarations( + attrs, typing.cast(List[InputOutputDeclaration], items.children) ) def __str__(self) -> str: return "\n".join( ( - "VAR_IN_OUT", + join_if("VAR_IN_OUT", " ", self.attrs), *(indent(f"{item};") for item in self.items), "END_VAR", ) @@ -2193,25 +2359,25 @@ def __str__(self) -> str: @_rule_handler("function_var_declarations", comments=True) class FunctionVariableDeclarations(VariableDeclarationBlock): block_header: ClassVar[str] = "VAR" - constant: bool + attrs: Optional[VariableAttributes] items: List[VariableInitDeclaration] meta: Optional[Meta] = meta_field() @staticmethod def from_lark( - constant: Optional[lark.Token], + attrs: Optional[VariableAttributes], body: lark.Tree, ) -> FunctionVariableDeclarations: items = typing.cast(List[VariableInitDeclaration], body.children) return FunctionVariableDeclarations( - constant=constant is not None, + attrs=attrs, items=items, ) def __str__(self) -> str: return "\n".join( ( - ("VAR CONSTANT" if self.constant else "VAR"), + join_if("VAR", " ", self.attrs), *(indent(f"{item};") for item in self.items), "END_VAR", ) @@ -2243,46 +2409,68 @@ def __str__(self) -> str: @_rule_handler("global_var_declarations", comments=True) class GlobalVariableDeclarations(VariableDeclarationBlock): block_header: ClassVar[str] = "VAR_GLOBAL" - constant: bool - retain: bool - persistent: bool + attrs: Optional[VariableAttributes] items: List[GlobalVariableDeclaration] meta: Optional[Meta] = meta_field() + name: Optional[str] = None @staticmethod def from_lark( - const_or_retain: Optional[lark.Token], - persistent: Optional[lark.Token], + attrs: Optional[VariableAttributes], *items: GlobalVariableDeclaration ) -> GlobalVariableDeclarations: return GlobalVariableDeclarations( - constant=str(const_or_retain).lower() == "constant", - retain=str(const_or_retain).lower() == "retain", - persistent=persistent is not None, + name=None, # This isn't in the code; set later + attrs=attrs, items=list(items) ) + @property + def attribute_pragmas(self) -> Set[str]: + """Attribute pragmas.""" + if self.meta is None: + return set() + + _, pragmas = self.meta.get_comments_and_pragmas() + attributes = set() + for pragma in pragmas: + # TODO: better pragma parsing; it's its own grammar + if pragma.startswith("{attribute "): # } + attributes.add(pragma.split(" ")[1].strip(" }'")) + return attributes + def __str__(self) -> str: - options = [] - if self.constant: - options.append("CONSTANT") - if self.retain: - options.append("RETAIN") - if self.persistent: - options.append("PERSISTENT") return "\n".join( ( - join_if("VAR_GLOBAL", " ", " ".join(options) if options else None), + join_if("VAR_GLOBAL", " ", self.attrs), *(indent(f"{item};") for item in self.items), "END_VAR", ) ) +@as_tagged_union class Statement: ... +@_rule_handler("fb_invocation_statement", comments=True) +class FunctionBlockInvocationStatement(Statement, FunctionBlockInvocation): + @staticmethod + def from_lark( + invocation: FunctionBlockInvocation, + ) -> FunctionBlockInvocationStatement: + return FunctionBlockInvocationStatement( + name=invocation.name, + parameters=invocation.parameters, + meta=invocation.meta, + ) + + def __str__(self): + invoc = super().__str__() + return f"{invoc};" + + @dataclass @_rule_handler("else_if_clause", comments=True) class ElseIfClause: @@ -2392,27 +2580,10 @@ def __str__(self): @_rule_handler("case_statement", comments=True) class CaseStatement(Statement): expression: Expression - cases: List[StatementList] + cases: List[CaseElement] else_clause: Optional[ElseClause] meta: Optional[Meta] = meta_field() - @staticmethod - def from_lark( - expr: Expression, - *args: Union[CaseStatement, ElseClause] - ) -> CaseStatement: - else_clause = None - if args and isinstance(args[-1], ElseClause) or args[-1] is None: - else_clause = typing.cast(ElseClause, args[-1]) - cases = typing.cast(Tuple[StatementList, ...], args[:-1]) - else: - cases = typing.cast(Tuple[StatementList, ...], args) - return CaseStatement( - expression=expr, - cases=list(cases), - else_clause=else_clause, - ) - def __str__(self) -> str: return "\n".join( s for s in ( @@ -2428,7 +2599,7 @@ def __str__(self) -> str: @dataclass @_rule_handler("no_op_statement", comments=True) class NoOpStatement(Statement): - variable: lark.Token + variable: Variable meta: Optional[Meta] = meta_field() def __str__(self): @@ -2500,7 +2671,7 @@ def __str__(self): @dataclass @_rule_handler("assignment_statement", comments=True) class AssignmentStatement(Statement): - variables: List[lark.Token] + variables: List[Variable] expression: Expression meta: Optional[Meta] = meta_field() @@ -2595,9 +2766,7 @@ class StatementList: meta: Optional[Meta] = meta_field() @staticmethod - def from_lark( - *statements: Statement - ) -> StatementList: + def from_lark(*statements: Statement) -> StatementList: return StatementList( statements=list(statements) ) @@ -2615,563 +2784,11 @@ def stringify_statement(statement: Union[Statement, FunctionBlockInvocation]) -> ) -class IL_Parameter: - ... - - -@dataclass -@_rule_handler("il_param_operand_assignment") -class IL_ParameterOperandAssignment(IL_Parameter): - variable: SymbolicVariable - value: IL_ParameterOperand - meta: Optional[Meta] = meta_field() - - def __str__(self) -> str: - return f"{self.variable} := {self.value}" - - -@dataclass -@_rule_handler("il_param_instruction_assignment") -class IL_ParameterInstructionAssignment(IL_Parameter): - variable: SymbolicVariable - instructions: List[IL_Instruction] - meta: Optional[Meta] = meta_field() - - @staticmethod - def from_lark( - variable: SymbolicVariable, - *instructions: IL_Instruction, - ) -> IL_ParameterInstructionAssignment: - return IL_ParameterInstructionAssignment( - variable=variable, - instructions=list(instructions), - ) - - def __str__(self) -> str: - instructions = "\n".join( - str(instruction) for instruction in self.instructions - ) - return "\n".join( - part for part in ( - f"{self.variable} := (", - indent_if(instructions), - ")", - ) - if part is not None - ) - - -class IL_Operation: - ... - - -@dataclass -@_rule_handler("il_simple_operation") -class IL_SimpleOperation(IL_Operation): - function: lark.Token - operands: List[IL_ParameterOperand] - meta: Optional[Meta] = meta_field() - - @staticmethod - def from_lark( - function_or_operator: lark.Token, - _, - *operands: IL_ParameterOperand, - ) -> IL_SimpleOperation: - return IL_SimpleOperation( - function=function_or_operator, - operands=list(operands), - ) - - def __str__(self) -> str: - operands = ", ".join(str(operand) for operand in self.operands) - return f"{self.function} {operands}" - - -@dataclass -@_rule_handler("il_return_operator") -class IL_Return(IL_Operation): - return_token: lark.Token - meta: Optional[Meta] = meta_field() - - def __str__(self) -> str: - return str(self.return_token).upper() - - -@dataclass -@_rule_handler("il_expression") -class IL_Expression(IL_Operation): - operator: lark.Token - operand: Optional[IL_ParameterOperand] - instructions: List[IL_SimpleInstruction] - meta: Optional[Meta] = meta_field() - - @staticmethod - def from_lark( - operator: lark.Token, - operand: Optional[IL_ParameterOperand], - *instructions: IL_SimpleInstruction, - ) -> IL_Expression: - return IL_Expression( - operator=operator, - operand=operand, - instructions=list(instructions), - ) - - def __str__(self) -> str: - instructions = "\n".join( - str(instruction) - for instruction in self.instructions - ) - instructions = f"\n{indent_if(instructions)}\n" if instructions else "" - operand = (str(self.operand) if self.operand else "") - return f"{self.operator}({operand}{instructions})" - - -@dataclass -@_rule_handler("il_jump_operation") -class IL_JumpOperation(IL_Operation): - operator: lark.Token - label: lark.Token - meta: Optional[Meta] = meta_field() - - def __str__(self) -> str: - return f"{self.operator.upper()} {self.label}" - - -@dataclass -@_rule_handler("il_formal_function_call") -class IL_FunctionCall(IL_Operation): - function: lark.Token - parameters: List[IL_Parameter] - meta: Optional[Meta] = meta_field() - - @staticmethod - def from_lark( - function: lark.Token, - *parameters: IL_Parameter, - ) -> IL_FunctionCall: - return IL_FunctionCall(function=function, parameters=list(parameters)) - - def __str__(self) -> str: - parameters = "".join( - f"{indent_if(param)}\n" - for param in self.parameters - ) - return f"{self.function}({parameters})" - - -@dataclass -@_rule_handler("il_param_out_assignment") -class IL_ParameterOutAssignment(IL_Parameter): - inverted: bool - variable: SymbolicVariable - assign_to: Variable - meta: Optional[Meta] = meta_field() - - @staticmethod - def from_lark( - not_token: Optional[lark.Token], - variable: SymbolicVariable, - assign_to: Variable, - ) -> IL_ParameterOutAssignment: - return IL_ParameterOutAssignment( - inverted=not_token is not None, - variable=variable, - assign_to=assign_to, - ) - - def __str__(self) -> str: - not_prefix = "NOT " if self.inverted else "" - return f"{not_prefix}{self.variable} => {self.assign_to}" - - -@dataclass -@_rule_handler("il_fb_call") -class IL_FunctionBlockCall: - operator: lark.Token - function_block: lark.Token - arguments: Union[List[IL_Parameter], List[IL_ParameterOperand]] - meta: Optional[Meta] = meta_field() - - @staticmethod - def from_lark( - operator: lark.Token, - function_block: lark.Token, - *arguments: Union[IL_Parameter, IL_ParameterOperand], - ) -> IL_FunctionBlockCall: - return IL_FunctionBlockCall( - operator=operator, - function_block=function_block, - arguments=list(arguments) - ) - - def __str__(self) -> str: - is_param_list = self.arguments and isinstance(self.arguments[0], IL_Parameter) - if is_param_list: - arguments = ",\n".join(indent(arg) for arg in self.arguments) - arguments = f"\n{arguments}\n" - else: - arguments = ", ".join(str(arg) for arg in self.arguments) - - return f"{self.operator.upper()} {self.function_block}({arguments})" - - -@dataclass -@_rule_handler("il_instruction", comments=True) -class IL_Instruction: - label: Optional[lark.Token] - operation: Optional[IL_Operation] - meta: Optional[Meta] = meta_field() - - def __str__(self) -> str: - if not self.label and not self.operation: - return "" - label = f"{self.label}:" if self.label else None - return join_if(label, " ", self.operation) - - -IL_SimpleInstruction = Union[ - IL_SimpleOperation, - IL_Expression, - IL_FunctionCall, -] - -IL_ParameterOperand = Union[ - Literal, - Variable, - EnumeratedValue, -] - - -@dataclass -@_rule_handler("sfc_transition_steps") -class SfcTransitionSteps: - names: List[lark.Token] - meta: Optional[Meta] = meta_field() - - @staticmethod - def from_lark( - *names: lark.Token, - ) -> SfcTransitionSteps: - return SfcTransitionSteps(names=list(names)) - - def __str__(self) -> str: - if len(self.names) > 1: - names = ", ".join(name for name in self.names) - return f"({names})" - return str(self.names[0]) - - -@dataclass -@_rule_handler("sfc_transition") -class SfcTransition: - inverted: Optional[lark.Token] - name: Optional[lark.Token] - priority: Optional[lark.Token] - start: SfcTransitionSteps - stop: SfcTransitionSteps - condition: Expression - meta: Optional[Meta] = meta_field() - - def __str__(self) -> str: - name_and_priority = " ".join( - part for part in ( - "TRANSITION", - self.inverted, - self.name, - self.priority - ) - if part is not None - ) - return "\n".join( - ( - name_and_priority, - f"FROM {self.start} TO {self.stop} := {self.condition}", - "END_TRANSITION" - ) - ) - - -@dataclass -@_rule_handler("action_qualifier") -class SfcActionQualifier: - qualifier: lark.Token - time: Optional[Union[Duration, SymbolicVariable]] = None - meta: Optional[Meta] = meta_field() - - def __str__(self) -> str: - return join_if(self.qualifier, ", ", self.time) - - -@dataclass -@_rule_handler("action_association") -class SfcActionAssociation: - name: lark.Token - qualifier: Optional[SfcActionQualifier] - indicators: List[SymbolicVariable] - meta: Optional[Meta] = meta_field() - - @staticmethod - def from_lark( - name: lark.Token, - qualifier: Optional[SfcActionQualifier], - *indicators: SymbolicVariable, - ) -> SfcActionAssociation: - return SfcActionAssociation( - name=name, - qualifier=qualifier, - indicators=list(indicators), - ) - - def __str__(self) -> str: - qualifier_and_indicator = ", ".join( - str(item) - for item in (self.qualifier, *self.indicators) - if item is not None - ) - return f"{self.name}({qualifier_and_indicator})" - - -SfcStepBody = Union[ +FunctionBlockBody = Union[ StatementList, - SfcActionAssociation, - "SequentialFunctionChart", -] - - -@dataclass -@_rule_handler( - "sfc_step", - "sfc_initial_step", - comments=True -) -class SfcStep: - name: lark.Token - body: Optional[SfcStepBody] - initial: bool = False - meta: Optional[Meta] = meta_field() - - @staticmethod - def from_lark( - step_token: lark.Token, - name: lark.Token, - body: Optional[SfcStepBody], - ) -> SfcStep: - return SfcStep( - name=name, - body=body, - initial=(step_token.upper() == "INITIAL_STEP"), - ) - - def __str__(self) -> str: - step_token = "INITIAL_STEP" if self.initial else "STEP" - return "\n".join( - line for line in - ( - f"{step_token} {self.name} :", - indent_if(self.body), - "END_STEP", - ) - if line is not None - ) - - -SfcNetworkPart = Union[ - SfcStep, - SfcTransition, - Action, - EntryAction, - ExitAction, ] - -@dataclass -@_rule_handler("sfc_network", comments=True) -class SfcNetwork: - parts: List[SfcNetworkPart] - meta: Optional[Meta] = meta_field() - - @staticmethod - def from_lark( - *parts: SfcNetworkPart - ) -> SfcNetwork: - return SfcNetwork(list(parts)) - - def __str__(self) -> str: - return "\n".join(str(part) for part in self.parts) - - -@dataclass -@_rule_handler("sequential_function_chart", comments=True) -class SequentialFunctionChart: - networks: List[SfcNetwork] - meta: Optional[Meta] = meta_field() - - @staticmethod - def from_lark( - *networks: SfcNetwork - ) -> SequentialFunctionChart: - return SequentialFunctionChart(list(networks)) - - def __str__(self) -> str: - return "\n".join(str(network) for network in self.networks) - - -@dataclass -@_rule_handler("instruction_list", comments=True) -class InstructionList: - instructions: List[IL_Instruction] - meta: Optional[Meta] = meta_field() - - @staticmethod - def from_lark( - *instructions: IL_Instruction - ) -> Optional[InstructionList]: - # TODO: parser ambiguity; il_instruction can match an empty - # function block body with no label or operation.... - instructions = [ - instr - for instr in instructions - if instr.label is not None or instr.operation is not None - ] - if instructions: - return InstructionList( - instructions=list(instructions) - ) - return None - - def __str__(self) -> str: - return "\n".join( - str(instruction) - for instruction in self.instructions - ) - - -@dataclass -@_rule_handler("config_access_declarations", comments=True) -class ConfigAccessDeclarations(VariableDeclarationBlock): - block_header: ClassVar[str] = "VAR_ACCESS" - items: List[ConfigAccessDeclaration] - meta: Optional[Meta] = meta_field() - - @staticmethod - def from_lark(*items: ConfigAccessDeclaration) -> ConfigAccessDeclarations: - return ConfigAccessDeclarations(list(items)) - - def __str__(self) -> str: - return "\n".join( - ( - "VAR_ACCESS", - *(indent(f"{item};") for item in self.items), - "END_VAR", - ) - ) - - -@dataclass -@_rule_handler("config_access_declaration", comments=True) -class ConfigAccessDeclaration: - name: lark.Token - path: ConfigAccessPath - type: DataType - direction: Optional[lark.Token] - meta: Optional[Meta] = meta_field() - - def __str__(self) -> str: - return join_if( - f"{self.name} : {self.path} : {self.type}", - " ", - self.direction - ) - - -@dataclass -@_rule_handler("direct_access_path") -class DirectAccessPath: - resource_name: Optional[lark.Token] - variable: DirectVariable - meta: Optional[Meta] = meta_field() - - def __str__(self): - return join_if(self.resource_name, ".", self.variable) - - -@dataclass -@_rule_handler("access_path") -class SymbolicAccessPath: - # NOTE: these attributes are ambiguous; consider just making into a - # "DOTTED_IDENTIFIER". - resource_name: Optional[lark.Token] - program_name: Optional[lark.Token] - function_block_names: List[lark.Token] - variable: SymbolicVariable - meta: Optional[Meta] = meta_field() - - @staticmethod - def from_lark( - resource_name: Optional[lark.Token], - program_name: Optional[lark.Token], - *remainder, - ) -> SymbolicAccessPath: - *fb_names, variable = remainder - return SymbolicAccessPath( - resource_name=resource_name, - program_name=program_name, - function_block_names=typing.cast(List[lark.Token], fb_names), - variable=typing.cast(SymbolicVariable, variable), - ) - - def __str__(self): - return ".".join( - str(part) for part in [ - self.resource_name, - self.program_name, - *self.function_block_names, - self.variable - ] - if part is not None - ) - - -ConfigAccessPath = Union[DirectAccessPath, SymbolicAccessPath] - - -@dataclass -@_rule_handler("task_initialization") -class TaskInitialization: - single: Optional[TaskDataSource] - interval: Optional[TaskDataSource] - priority: lark.Token - meta: Optional[Meta] = meta_field() - - def __str__(self): - parts = " ".join( - part for part in ( - f"SINGLE := {self.single}," if self.single else None, - f"INTERVAL := {self.interval}," if self.interval else None, - f"PRIORITY := {self.priority}" - ) - if part is not None - ) - return f"({parts})" - - -@dataclass -@_rule_handler("task_configuration", comments=True) -class TaskConfiguration: - name: lark.Token - init: TaskInitialization - meta: Optional[Meta] = meta_field() - - def __str__(self): - return f"TASK {self.name} {self.init}" - - -TaskDataSource = Union[ - Literal, - Variable, -] +FunctionBody = FunctionBlockBody # Identical, currently TypeDeclarationItem = Union[ @@ -3181,6 +2798,7 @@ def __str__(self): SimpleTypeDeclaration, SubrangeTypeDeclaration, EnumeratedTypeDeclaration, + UnionTypeDeclaration, ] @@ -3222,7 +2840,6 @@ def __str__(self) -> str: Action, Method, Program, - # ConfigurationDeclaration, # TODO GlobalVariableDeclarations, ] @@ -3277,7 +2894,7 @@ class GrammarTransformer(lark.visitors.Transformer_InPlaceRecursive): Sorted list of comments and pragmas for annotating the resulting transformed grammar. """ - _filename: Optional[str] + _filename: Optional[pathlib.Path] comments: List[lark.Token] def __init__( @@ -3305,9 +2922,13 @@ def transform(self, tree): if isinstance(transformed, SourceCode): transformed.raw_source = self._source_code transformed.filename = ( - pathlib.Path(self._filename) + self._filename if self._filename is not None else None ) + for item in transformed.items: + if isinstance(item, GlobalVariableDeclarations): + item.name = self._filename.stem if self._filename else None + return transformed @_annotator_method_wrapper @@ -3354,6 +2975,14 @@ def true(self, value: lark.Token): def false(self, value: lark.Token): return Boolean(value=value) + @_annotator_method_wrapper + def program_var_declarations(self, *declarations: VariableDeclarationBlock): + return list(declarations) + + @_annotator_method_wrapper + def case_elements(self, *cases: CaseStatement): + return list(cases) + def __default__(self, data, children, meta): """ Default function that is called if there is no attribute matching ``data`` @@ -3400,3 +3029,15 @@ def merge_comments(source: Any, comments: List[lark.Token]): obj = getattr(source, field.name, None) if obj is not None: merge_comments(obj, comments) + + +if apischema is not None: + # Optional apischema deserializers + + @apischema.deserializer + def _method_access_deserializer(access: int) -> MethodAccess: + return MethodAccess(access) + + @apischema.deserializer + def _var_attrs_deserializer(attrs: int) -> VariableAttributes: + return VariableAttributes(attrs) diff --git a/blark/util.py b/blark/util.py index f0d12c0..36aaf46 100644 --- a/blark/util.py +++ b/blark/util.py @@ -1,9 +1,11 @@ +import hashlib import pathlib import re from typing import Generator, List, Tuple, Union import lark import pytmc +import pytmc.parser RE_LEADING_WHITESPACE = re.compile('^[ \t]+', re.MULTILINE) AnyPath = Union[str, pathlib.Path] @@ -229,3 +231,33 @@ def remove_comment_characters(text: str) -> str: if text.startswith("/"): return text.lstrip("/ ") return text.strip("()").strip("* ") + + +def get_tsprojects_from_filename( + filename: AnyPath, +) -> Tuple[pathlib.Path, List[pathlib.Path]]: + """ + From a TwinCAT solution (.sln) or .tsproj, return all tsproj projects. + + Returns + ------- + root : pathlib.Path + Project root directory (where the solution or provided tsproj is + located). + + projects : list of pathlib.Path + List of tsproj projects paths. + """ + abs_path = pathlib.Path(filename).resolve() + if abs_path.suffix == '.tsproj': + return abs_path.parent, [abs_path] + if abs_path.suffix == '.sln': + return abs_path.parent, pytmc.parser.projects_from_solution(abs_path) + + raise RuntimeError(f'Expected a .tsproj/.sln file; got {abs_path.suffix!r}') + + +def get_file_sha256(filename: AnyPath) -> str: + """Hash a file's contents with the SHA-256 algorithm.""" + with open(filename, "rb") as fp: + return hashlib.sha256(fp.read()).hexdigest()