diff --git a/docs/changelog.md b/docs/changelog.md index d70ee61..1ea21bd 100644 --- a/docs/changelog.md +++ b/docs/changelog.md @@ -7,6 +7,12 @@ ### Added - [#3](https://github.com/JoshKarpel/synthesize/pull/3) Added PyPI classifiers and other metadata. +- [#33](https://github.com/JoshKarpel/synthesize/pull/33) + Allow injecting arguments + (via [Jinja2 templates](https://jinja.palletsprojects.com/)) + and environment variables into target commands. + Arguments and environment variables can be specified at either + the flow, node, or target level, with the most specific taking precedence. ### Changed diff --git a/examples/dag.yaml b/examples/dag.yaml index 1c67e9e..3a98ff4 100644 --- a/examples/dag.yaml +++ b/examples/dag.yaml @@ -1,28 +1,22 @@ flows: - dag: + default: nodes: - - id: 1 - target: - id: sleep-and-echo - - id: 2 - target: - id: sleep-and-echo - - id: 3 - target: - id: sleep-and-echo - - id: 4 - target: - id: sleep-and-echo + 1: + target: sleep-and-echo + 2: + target: sleep-and-echo + 3: + target: sleep-and-echo + 4: + target: sleep-and-echo trigger: after: ["1", "2"] - - id: 5 - target: - id: sleep-and-echo + 5: + target: sleep-and-echo trigger: after: ["3"] - - id: 6 - target: - id: sleep-and-echo + 6: + target: sleep-and-echo trigger: after: ["4", "5"] @@ -30,4 +24,4 @@ targets: sleep-and-echo: commands: | sleep 2 - echo "Hi!" + echo "Hi from {{ id }}!" diff --git a/poetry.lock b/poetry.lock index 9c42cb9..8f2acfc 100644 --- a/poetry.lock +++ b/poetry.lock @@ -353,13 +353,13 @@ colorama = ">=0.4" [[package]] name = "hypothesis" -version = "6.103.3" +version = "6.104.1" description = "A library for property-based testing" optional = false python-versions = ">=3.8" files = [ - {file = "hypothesis-6.103.3-py3-none-any.whl", hash = "sha256:870d82475cd11c33d75bf7bbdfd972df9d7ea9f79696ae50c3d766e8307467d9"}, - {file = "hypothesis-6.103.3.tar.gz", hash = "sha256:33ed1ec217a3e279914b2c0d9045e56f0d9d2785232ec7ed69ad080c839f86b5"}, + {file = "hypothesis-6.104.1-py3-none-any.whl", hash = "sha256:a0a898fa78ecaefe76ad248901dc274e598f29198c6015b3053f7f7827670e0e"}, + {file = "hypothesis-6.104.1.tar.gz", hash = "sha256:4033898019a6149823d2feeb8d214921b4ac2d342a05d6b02e40a3ca4be07eea"}, ] [package.dependencies] @@ -711,40 +711,51 @@ files = [ griffe = ">=0.47" mkdocstrings = ">=0.25" +[[package]] +name = "more-itertools" +version = "10.3.0" +description = "More routines for operating on iterables, beyond itertools" +optional = false +python-versions = ">=3.8" +files = [ + {file = "more-itertools-10.3.0.tar.gz", hash = "sha256:e5d93ef411224fbcef366a6e8ddc4c5781bc6359d43412a65dd5964e46111463"}, + {file = "more_itertools-10.3.0-py3-none-any.whl", hash = "sha256:ea6a02e24a9161e51faad17a8782b92a0df82c12c1c8886fec7f0c3fa1a1b320"}, +] + [[package]] name = "mypy" -version = "1.10.0" +version = "1.10.1" description = "Optional static typing for Python" optional = false python-versions = ">=3.8" files = [ - {file = "mypy-1.10.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:da1cbf08fb3b851ab3b9523a884c232774008267b1f83371ace57f412fe308c2"}, - {file = "mypy-1.10.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:12b6bfc1b1a66095ab413160a6e520e1dc076a28f3e22f7fb25ba3b000b4ef99"}, - {file = "mypy-1.10.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:9e36fb078cce9904c7989b9693e41cb9711e0600139ce3970c6ef814b6ebc2b2"}, - {file = "mypy-1.10.0-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:2b0695d605ddcd3eb2f736cd8b4e388288c21e7de85001e9f85df9187f2b50f9"}, - {file 
= "mypy-1.10.0-cp310-cp310-win_amd64.whl", hash = "sha256:cd777b780312ddb135bceb9bc8722a73ec95e042f911cc279e2ec3c667076051"}, - {file = "mypy-1.10.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:3be66771aa5c97602f382230165b856c231d1277c511c9a8dd058be4784472e1"}, - {file = "mypy-1.10.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:8b2cbaca148d0754a54d44121b5825ae71868c7592a53b7292eeb0f3fdae95ee"}, - {file = "mypy-1.10.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:1ec404a7cbe9fc0e92cb0e67f55ce0c025014e26d33e54d9e506a0f2d07fe5de"}, - {file = "mypy-1.10.0-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:e22e1527dc3d4aa94311d246b59e47f6455b8729f4968765ac1eacf9a4760bc7"}, - {file = "mypy-1.10.0-cp311-cp311-win_amd64.whl", hash = "sha256:a87dbfa85971e8d59c9cc1fcf534efe664d8949e4c0b6b44e8ca548e746a8d53"}, - {file = "mypy-1.10.0-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:a781f6ad4bab20eef8b65174a57e5203f4be627b46291f4589879bf4e257b97b"}, - {file = "mypy-1.10.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:b808e12113505b97d9023b0b5e0c0705a90571c6feefc6f215c1df9381256e30"}, - {file = "mypy-1.10.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:8f55583b12156c399dce2df7d16f8a5095291354f1e839c252ec6c0611e86e2e"}, - {file = "mypy-1.10.0-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:4cf18f9d0efa1b16478c4c129eabec36148032575391095f73cae2e722fcf9d5"}, - {file = "mypy-1.10.0-cp312-cp312-win_amd64.whl", hash = "sha256:bc6ac273b23c6b82da3bb25f4136c4fd42665f17f2cd850771cb600bdd2ebeda"}, - {file = "mypy-1.10.0-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:9fd50226364cd2737351c79807775136b0abe084433b55b2e29181a4c3c878c0"}, - {file = "mypy-1.10.0-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:f90cff89eea89273727d8783fef5d4a934be2fdca11b47def50cf5d311aff727"}, - {file = "mypy-1.10.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:fcfc70599efde5c67862a07a1aaf50e55bce629ace26bb19dc17cece5dd31ca4"}, - {file = "mypy-1.10.0-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:075cbf81f3e134eadaf247de187bd604748171d6b79736fa9b6c9685b4083061"}, - {file = "mypy-1.10.0-cp38-cp38-win_amd64.whl", hash = "sha256:3f298531bca95ff615b6e9f2fc0333aae27fa48052903a0ac90215021cdcfa4f"}, - {file = "mypy-1.10.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:fa7ef5244615a2523b56c034becde4e9e3f9b034854c93639adb667ec9ec2976"}, - {file = "mypy-1.10.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:3236a4c8f535a0631f85f5fcdffba71c7feeef76a6002fcba7c1a8e57c8be1ec"}, - {file = "mypy-1.10.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:4a2b5cdbb5dd35aa08ea9114436e0d79aceb2f38e32c21684dcf8e24e1e92821"}, - {file = "mypy-1.10.0-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:92f93b21c0fe73dc00abf91022234c79d793318b8a96faac147cd579c1671746"}, - {file = "mypy-1.10.0-cp39-cp39-win_amd64.whl", hash = "sha256:28d0e038361b45f099cc086d9dd99c15ff14d0188f44ac883010e172ce86c38a"}, - {file = "mypy-1.10.0-py3-none-any.whl", hash = "sha256:f8c083976eb530019175aabadb60921e73b4f45736760826aa1689dda8208aee"}, - {file = "mypy-1.10.0.tar.gz", hash = "sha256:3d087fcbec056c4ee34974da493a826ce316947485cef3901f511848e687c131"}, + {file = "mypy-1.10.1-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:e36f229acfe250dc660790840916eb49726c928e8ce10fbdf90715090fe4ae02"}, + {file = "mypy-1.10.1-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:51a46974340baaa4145363b9e051812a2446cf583dfaeba124af966fa44593f7"}, 
+ {file = "mypy-1.10.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:901c89c2d67bba57aaaca91ccdb659aa3a312de67f23b9dfb059727cce2e2e0a"}, + {file = "mypy-1.10.1-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:0cd62192a4a32b77ceb31272d9e74d23cd88c8060c34d1d3622db3267679a5d9"}, + {file = "mypy-1.10.1-cp310-cp310-win_amd64.whl", hash = "sha256:a2cbc68cb9e943ac0814c13e2452d2046c2f2b23ff0278e26599224cf164e78d"}, + {file = "mypy-1.10.1-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:bd6f629b67bb43dc0d9211ee98b96d8dabc97b1ad38b9b25f5e4c4d7569a0c6a"}, + {file = "mypy-1.10.1-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:a1bbb3a6f5ff319d2b9d40b4080d46cd639abe3516d5a62c070cf0114a457d84"}, + {file = "mypy-1.10.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:b8edd4e9bbbc9d7b79502eb9592cab808585516ae1bcc1446eb9122656c6066f"}, + {file = "mypy-1.10.1-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:6166a88b15f1759f94a46fa474c7b1b05d134b1b61fca627dd7335454cc9aa6b"}, + {file = "mypy-1.10.1-cp311-cp311-win_amd64.whl", hash = "sha256:5bb9cd11c01c8606a9d0b83ffa91d0b236a0e91bc4126d9ba9ce62906ada868e"}, + {file = "mypy-1.10.1-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:d8681909f7b44d0b7b86e653ca152d6dff0eb5eb41694e163c6092124f8246d7"}, + {file = "mypy-1.10.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:378c03f53f10bbdd55ca94e46ec3ba255279706a6aacaecac52ad248f98205d3"}, + {file = "mypy-1.10.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6bacf8f3a3d7d849f40ca6caea5c055122efe70e81480c8328ad29c55c69e93e"}, + {file = "mypy-1.10.1-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:701b5f71413f1e9855566a34d6e9d12624e9e0a8818a5704d74d6b0402e66c04"}, + {file = "mypy-1.10.1-cp312-cp312-win_amd64.whl", hash = "sha256:3c4c2992f6ea46ff7fce0072642cfb62af7a2484efe69017ed8b095f7b39ef31"}, + {file = "mypy-1.10.1-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:604282c886497645ffb87b8f35a57ec773a4a2721161e709a4422c1636ddde5c"}, + {file = "mypy-1.10.1-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:37fd87cab83f09842653f08de066ee68f1182b9b5282e4634cdb4b407266bade"}, + {file = "mypy-1.10.1-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:8addf6313777dbb92e9564c5d32ec122bf2c6c39d683ea64de6a1fd98b90fe37"}, + {file = "mypy-1.10.1-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:5cc3ca0a244eb9a5249c7c583ad9a7e881aa5d7b73c35652296ddcdb33b2b9c7"}, + {file = "mypy-1.10.1-cp38-cp38-win_amd64.whl", hash = "sha256:1b3a2ffce52cc4dbaeee4df762f20a2905aa171ef157b82192f2e2f368eec05d"}, + {file = "mypy-1.10.1-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:fe85ed6836165d52ae8b88f99527d3d1b2362e0cb90b005409b8bed90e9059b3"}, + {file = "mypy-1.10.1-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:c2ae450d60d7d020d67ab440c6e3fae375809988119817214440033f26ddf7bf"}, + {file = "mypy-1.10.1-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6be84c06e6abd72f960ba9a71561c14137a583093ffcf9bbfaf5e613d63fa531"}, + {file = "mypy-1.10.1-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:2189ff1e39db399f08205e22a797383613ce1cb0cb3b13d8bcf0170e45b96cc3"}, + {file = "mypy-1.10.1-cp39-cp39-win_amd64.whl", hash = "sha256:97a131ee36ac37ce9581f4220311247ab6cba896b4395b9c87af0675a13a755f"}, + {file = "mypy-1.10.1-py3-none-any.whl", hash = "sha256:71d8ac0b906354ebda8ef1673e5fde785936ac1f29ff6987c7483cfbd5a4235a"}, + {file = "mypy-1.10.1.tar.gz", hash = 
"sha256:1f8f492d7db9e3593ef42d4f115f04e556130f2819ad33ab84551403e97dd4c0"}, ] [package.dependencies] @@ -1565,4 +1576,4 @@ anyio = ">=3.0.0" [metadata] lock-version = "2.0" python-versions = ">=3.10" -content-hash = "c6030b7b1931e6cba1640714ba4a0881400b3f0a2d0445aa315301b2f99da675" +content-hash = "766e4897e1ddefe89c187a03f1a8595d78228b455c1f50fb8412b790f7b3a671" diff --git a/pyproject.toml b/pyproject.toml index 3de0893..c1bb050 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -22,6 +22,7 @@ classifiers = [ "Programming Language :: Python :: 3 :: Only", "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", "Topic :: Software Development", "Topic :: Utilities", "Typing :: Typed", @@ -39,6 +40,8 @@ pyyaml = ">=6.0" networkx = ">=3.0" watchfiles = ">=0.18" identify = ">=2.5" +jinja2 = ">=3.1" +more-itertools = ">=10.3" [tool.poetry.group.dev.dependencies] pre-commit = ">=3" @@ -88,6 +91,7 @@ warn_redundant_casts = true ignore_missing_imports = true +plugins = "pydantic.mypy" [tool.ruff] line-length = 120 @@ -109,10 +113,11 @@ select = [ ] ignore = [ - "E501", # line length exceeds limit - "E741", # ambiguous variable name - "T201", # print - "T203", # pprint - "F403", # star imports, used for utilities - "F405", # star imports, used for utilities + "E501", # line length exceeds limit + "E741", # ambiguous variable name + "T201", # print + "T203", # pprint + "F403", # star imports, used for utilities + "F405", # star imports, used for utilities + "RUF012", # pydantic allows mutable class attributes ] diff --git a/synth.yaml b/synth.yaml index 25fae8f..560406f 100644 --- a/synth.yaml +++ b/synth.yaml @@ -1,26 +1,21 @@ flows: default: nodes: - - id: tests - target: - id: tests - trigger: - id: code-changes - - id: types - target: - id: types - trigger: - id: code-changes - - id: docs - target: - id: docs + tests: + target: tests + trigger: code-changes + types: + target: types + trigger: code-changes + docs: + target: docs trigger: type: restart targets: tests: commands: | - pytest --cov + pytest -vv --cov types: commands: | @@ -37,3 +32,4 @@ triggers: - synthesize/ - tests/ - examples/ + - pyproject.toml diff --git a/synthesize/cli.py b/synthesize/cli.py index fc588b7..17a6918 100644 --- a/synthesize/cli.py +++ b/synthesize/cli.py @@ -38,7 +38,7 @@ def run( help="The path to the configuration file to execute.", ), dry: bool = Option( - False, + default=False, help="If enabled, do not run actually run the flow.", ), ) -> None: @@ -66,11 +66,25 @@ def run( ) ) - return - resolved = parsed_config.resolve() - controller = Orchestrator(flow=resolved[flow], console=console) + try: + selected_flow = resolved[flow] + except KeyError: + sep = "\n " + available_flows = sep + sep.join(resolved.keys()) + console.print( + Text( + f"No flow named '{flow}'. 
Available flows:{available_flows}", + style=Style(color="red"), + ) + ) + raise Exit(code=1) + + if dry: + return + + controller = Orchestrator(flow=selected_flow, console=console) try: asyncio.run(controller.run()) diff --git a/synthesize/config.py b/synthesize/config.py index 71c7066..d76522b 100644 --- a/synthesize/config.py +++ b/synthesize/config.py @@ -1,5 +1,8 @@ from __future__ import annotations +import shlex +import shutil +from collections.abc import Mapping from colorsys import hsv_to_rgb from pathlib import Path from random import random @@ -7,11 +10,33 @@ from typing import Annotated, Literal, Union from identify.identify import tags_from_path +from jinja2 import Environment from pydantic import Field, field_validator from rich.color import Color from synthesize.model import Model +Args = dict[ + Annotated[ + str, + Field( + # https://jinja.palletsprojects.com/en/3.1.x/api/#notes-on-identifiers + pattern=r"[a-zA-Z_][a-zA-Z0-9_]*(\.[a-zA-Z_][a-zA-Z0-9_]*)*", + min_length=1, + ), + ], + object, +] +Envs = dict[ + Annotated[ + str, + Field( + min_length=1, + ), + ], + str, +] + def random_color() -> str: triplet = Color.from_rgb(*(x * 255 for x in hsv_to_rgb(random(), 1, 0.7))).triplet @@ -22,15 +47,36 @@ def random_color() -> str: return triplet.hex +template_environment = Environment() + + class Target(Model): - commands: str = Field(default="") - executable: str = Field(default="sh -u") + commands: str = "" + executable: str = "sh -u" @field_validator("commands") @classmethod def dedent_commands(cls, commands: str) -> str: return dedent(commands).strip() + def render(self, args: Args) -> str: + exe, *exe_args = shlex.split(self.executable) + which_exe = shutil.which(exe) + if which_exe is None: + raise Exception(f"Failed to find absolute path to executable for {exe}") + + template = template_environment.from_string( + "\n".join( + ( + f"#!{shlex.join((which_exe, *exe_args))}", + "", + self.commands, + ) + ) + ) + + return template.render(args) + class Once(Model): type: Literal["once"] = "once" @@ -39,7 +85,7 @@ class Once(Model): class After(Model): type: Literal["after"] = "after" - after: frozenset[str] = Field(default=...) 
+ after: Annotated[frozenset[str], Field(min_length=1)] class Restart(Model): @@ -48,11 +94,10 @@ class Restart(Model): delay: Annotated[ float, Field( - default=1, description="The delay before restarting the command after it exits.", ge=0, ), - ] + ] = 1 class Watch(Model): @@ -73,66 +118,81 @@ class FlowNode(Model): id: str target: Target - trigger: AnyTrigger - - color: str - - -class TargetRef(Model): - id: str + args: Args = {} + envs: Envs = {} + trigger: AnyTrigger = Once() -class TriggerRef(Model): - id: str + color: Annotated[str, Field(default_factory=random_color)] class UnresolvedFlowNode(Model): - id: str + target: Target | str + args: Args = {} + envs: Envs = {} - target: Target | TargetRef - trigger: AnyTrigger | TriggerRef = Once() + trigger: AnyTrigger | str = Once() color: Annotated[str, Field(default_factory=random_color)] def resolve( self, - targets: dict[str, Target], - triggers: dict[str, AnyTrigger], + id: str, + targets: Mapping[str, Target], + triggers: Mapping[str, AnyTrigger], ) -> FlowNode: return FlowNode( - id=self.id, - target=targets[self.target.id] if isinstance(self.target, TargetRef) else self.target, - trigger=( - triggers[self.trigger.id] if isinstance(self.trigger, TriggerRef) else self.trigger - ), + id=id, + target=targets[self.target] if isinstance(self.target, str) else self.target, + args=self.args, + envs=self.envs, + trigger=(triggers[self.trigger] if isinstance(self.trigger, str) else self.trigger), color=self.color, ) class Flow(Model): - nodes: tuple[FlowNode, ...] + nodes: dict[str, FlowNode] + args: Args = {} + envs: Envs = {} class UnresolvedFlow(Model): - nodes: tuple[UnresolvedFlowNode, ...] + nodes: dict[str, UnresolvedFlowNode] + args: Args = {} + envs: Envs = {} - def resolve(self, targets: dict[str, Target], triggers: dict[str, AnyTrigger]) -> Flow: - return Flow(nodes=tuple(node.resolve(targets, triggers) for node in self.nodes)) + def resolve( + self, + targets: Mapping[str, Target], + triggers: Mapping[str, AnyTrigger], + ) -> Flow: + return Flow( + nodes={ + node_id: node.resolve(node_id, targets, triggers) + for node_id, node in self.nodes.items() + }, + args=self.args, + envs=self.envs, + ) class Config(Model): - targets: Annotated[dict[str, Target], Field(default_factory=dict)] - triggers: Annotated[dict[str, AnyTrigger], Field(default_factory=dict)] - flows: Annotated[dict[str, UnresolvedFlow], Field(default_factory=dict)] + flows: dict[str, UnresolvedFlow] = {} + targets: dict[str, Target] = {} + triggers: dict[str, AnyTrigger] = {} @classmethod def from_file(cls, file: Path) -> Config: tags = tags_from_path(str(file)) if "yaml" in tags: - return cls.parse_yaml(file.read_text()) + return cls.model_validate_yaml(file.read_text()) else: raise NotImplementedError("Currently, only YAML files are supported.") - def resolve(self) -> dict[str, Flow]: - return {id: flow.resolve(self.targets, self.triggers) for id, flow in self.flows.items()} + def resolve(self) -> Mapping[str, Flow]: + return { + flow_id: flow.resolve(self.targets, self.triggers) + for flow_id, flow in self.flows.items() + } diff --git a/synthesize/execution.py b/synthesize/execution.py index 9512543..04135f1 100644 --- a/synthesize/execution.py +++ b/synthesize/execution.py @@ -1,29 +1,34 @@ from __future__ import annotations import os -import shlex -import shutil from asyncio import Queue, Task, create_task from asyncio.subprocess import PIPE, STDOUT, Process, create_subprocess_exec from dataclasses import dataclass, field -from functools import lru_cache -from 
hashlib import md5 from pathlib import Path from signal import SIGKILL, SIGTERM from stat import S_IEXEC -from synthesize.config import FlowNode +from synthesize.config import Args, Envs, FlowNode from synthesize.messages import ExecutionCompleted, ExecutionOutput, ExecutionStarted, Message +from synthesize.utils import md5 -@lru_cache(maxsize=2**10) -def file_name(node: FlowNode) -> str: - h = md5() - h.update(node.id.encode()) - h.update(node.target.executable.encode()) - h.update(node.target.commands.encode()) +def write_script(node: FlowNode, args: Args, tmp_dir: Path) -> Path: + path = tmp_dir / f"{node.id}-{md5(node.model_dump_json().encode())}" - return f"{node.id}-{h.hexdigest()}" + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text( + node.target.render( + args=args + | node.args + | { + "id": node.id, + } + ) + ) + path.chmod(path.stat().st_mode | S_IEXEC) + + return path @dataclass(frozen=True) @@ -35,38 +40,32 @@ class Execution: process: Process reader: Task[None] - width: int - @classmethod async def start( cls, node: FlowNode, - events: Queue[Message], + args: Args, + envs: Envs, tmp_dir: Path, - width: int = 80, + width: int, + events: Queue[Message], ) -> Execution: - path = tmp_dir / file_name(node) - path.parent.mkdir(parents=True, exist_ok=True) - exe, *args = shlex.split(node.target.executable) - which_exe = shutil.which(exe) - if which_exe is None: - raise Exception(f"Failed to find absolute path to executable for {exe}") - path.write_text( - "\n".join( - ( - f"#! {shlex.join((which_exe, *args))}", - "", - node.target.commands, - ) - ) - ) - path.chmod(path.stat().st_mode | S_IEXEC) + path = write_script(node=node, args=args, tmp_dir=tmp_dir) process = await create_subprocess_exec( program=path, stdout=PIPE, stderr=STDOUT, - env={**os.environ, "FORCE_COLOR": "1", "COLUMNS": str(width)}, + env=os.environ + | envs + | node.envs + | { + "FORCE_COLOR": "1", + "COLUMNS": str(width), + } + | { + "SYNTH_NODE_ID": node.id, + }, preexec_fn=os.setsid, ) @@ -86,7 +85,6 @@ async def start( events=events, process=process, reader=reader, - width=width, ) @property diff --git a/synthesize/model.py b/synthesize/model.py index 51c0087..1a2572d 100644 --- a/synthesize/model.py +++ b/synthesize/model.py @@ -17,5 +17,5 @@ class Model(BaseModel): ) @classmethod - def parse_yaml(cls: Type[C], y: str) -> C: + def model_validate_yaml(cls: Type[C], y: str) -> C: return cls.model_validate(yaml.safe_load(y)) diff --git a/synthesize/orchestrator.py b/synthesize/orchestrator.py index 3246b2d..ff11505 100644 --- a/synthesize/orchestrator.py +++ b/synthesize/orchestrator.py @@ -40,10 +40,10 @@ def __init__(self, flow: Flow, console: Console): self.heartbeat: Task[None] | None = None async def run(self) -> None: - if not self.state.nodes(): + if not self.flow.nodes: return - with TemporaryDirectory(prefix="snyth-") as tmpdir, self.renderer: + with TemporaryDirectory(prefix="synth-") as tmpdir, self.renderer: tmp_dir = Path(tmpdir) try: @@ -122,9 +122,11 @@ async def start_ready_targets(self, tmp_dir: Path) -> None: async def start() -> None: e = await Execution.start( node=ready_node, - events=self.inbox, - width=self.console.width - self.renderer.prefix_width, + args=self.flow.args, + envs=self.flow.envs, tmp_dir=tmp_dir, + width=self.console.width - self.renderer.prefix_width, + events=self.inbox, ) self.executions[ready_node.id] = e self.waiters[ready_node.id] = create_task(e.wait()) @@ -136,7 +138,7 @@ async def start() -> None: await start() async def start_watchers(self) -> None: 
- for node in self.flow.nodes: + for node in self.flow.nodes.values(): if isinstance(node.trigger, Watch): self.watchers[node.id] = create_task( watch( diff --git a/synthesize/renderer.py b/synthesize/renderer.py index c9eef00..6bb4adc 100644 --- a/synthesize/renderer.py +++ b/synthesize/renderer.py @@ -63,17 +63,17 @@ def handle_message(self, message: Message) -> None: def info(self, event: Message) -> RenderableType: table = Table.grid(padding=(1, 1, 0, 0)) - running_targets = self.state.running_nodes() + running_nodes = self.state.running_nodes() running = ( Text.assemble( "Running ", Text(" ").join( Text(t.id, style=Style(color="black", bgcolor=t.color)) - for t in sorted(running_targets, key=lambda t: t.id) + for t in sorted(running_nodes, key=lambda t: t.id) ), ) - if running_targets + if running_nodes else Text() ) @@ -82,7 +82,7 @@ def info(self, event: Message) -> RenderableType: running, ) - return Group(Rule(style=(Style(color="green" if running_targets else "yellow"))), table) + return Group(Rule(style=(Style(color="green" if running_nodes else "yellow"))), table) def render_prefix( self, message: ExecutionOutput | ExecutionStarted | ExecutionCompleted | WatchPathChanged @@ -117,13 +117,11 @@ def handle_lifecycle_message( match message: case ExecutionStarted(node=node, pid=pid): parts = ( - "Node ", (node.id, node.color), f" started (pid {pid})", ) case ExecutionCompleted(node=node, pid=pid, exit_code=exit_code): parts = ( - "Node ", (node.id, node.color), f" (pid {pid}) exited with code ", (str(exit_code), "green" if exit_code == 0 else "red"), @@ -134,7 +132,7 @@ def handle_lifecycle_message( ) parts = ( - "Running node ", + "Running ", (node.id, node.color), " due to detected changes: ", changes, diff --git a/synthesize/state.py b/synthesize/state.py index 27a1c0f..3464298 100644 --- a/synthesize/state.py +++ b/synthesize/state.py @@ -1,5 +1,6 @@ from __future__ import annotations +from collections.abc import Collection, Iterator from dataclasses import dataclass from enum import Enum @@ -18,68 +19,64 @@ class FlowNodeStatus(Enum): @dataclass(frozen=True) class FlowState: graph: DiGraph - id_to_node: dict[str, FlowNode] - id_to_status: dict[str, FlowNodeStatus] + flow: Flow + statuses: dict[str, FlowNodeStatus] @classmethod def from_flow(cls, flow: Flow) -> FlowState: - id_to_node = {node.id: node for node in flow.nodes} - graph = DiGraph() - for id, node in id_to_node.items(): - graph.add_node(node.id) + for id, node in flow.nodes.items(): + graph.add_node(id) if isinstance(node.trigger, After): for predecessor_id in node.trigger.after: graph.add_edge(predecessor_id, id) return FlowState( graph=graph, - id_to_node={id: node for id, node in id_to_node.items()}, - id_to_status={id: FlowNodeStatus.Pending for id in graph.nodes}, + flow=flow, + statuses={id: FlowNodeStatus.Pending for id in graph.nodes}, ) - def running_nodes(self) -> set[FlowNode]: - return { - self.id_to_node[id] - for id, status in self.id_to_status.items() + def running_nodes(self) -> Collection[FlowNode]: + return tuple( + self.flow.nodes[id] + for id, status in self.statuses.items() if status is FlowNodeStatus.Running - } + ) - def ready_nodes(self) -> set[FlowNode]: - return { - self.id_to_node[id] + def ready_nodes(self) -> Collection[FlowNode]: + return tuple( + self.flow.nodes[id] for id in self.graph.nodes - if self.id_to_status[id] is FlowNodeStatus.Pending - and all( - self.id_to_status[a] is FlowNodeStatus.Succeeded for a in ancestors(self.graph, id) - ) - } + if self.statuses[id] is 
FlowNodeStatus.Pending + and all(self.statuses[a] is FlowNodeStatus.Succeeded for a in ancestors(self.graph, id)) + ) def mark_success(self, node: FlowNode) -> None: - self.id_to_status[node.id] = FlowNodeStatus.Succeeded + self.statuses[node.id] = FlowNodeStatus.Succeeded def mark_failure(self, node: FlowNode) -> None: - self.id_to_status[node.id] = FlowNodeStatus.Failed + self.statuses[node.id] = FlowNodeStatus.Failed def mark_pending(self, node: FlowNode) -> None: - self.id_to_status[node.id] = FlowNodeStatus.Pending + self.statuses[node.id] = FlowNodeStatus.Pending def mark_descendants_pending(self, node: FlowNode) -> None: for t in _descendants(self.graph, {node.id}): - self.id_to_status[t] = FlowNodeStatus.Pending + self.statuses[t] = FlowNodeStatus.Pending def mark_running(self, node: FlowNode) -> None: - self.id_to_status[node.id] = FlowNodeStatus.Running + self.statuses[node.id] = FlowNodeStatus.Running def all_done(self) -> bool: - return all(status is FlowNodeStatus.Succeeded for status in self.id_to_status.values()) + return all(status is FlowNodeStatus.Succeeded for status in self.statuses.values()) def num_nodes(self) -> int: return len(self.graph) - def nodes(self) -> set[FlowNode]: - return set(self.id_to_node.values()) + def nodes(self) -> Iterator[FlowNode]: + yield from self.flow.nodes.values() def _ancestors(graph: DiGraph, nodes: set[str]) -> set[str]: diff --git a/synthesize/utils.py b/synthesize/utils.py index 6a1f1ef..40af9d9 100644 --- a/synthesize/utils.py +++ b/synthesize/utils.py @@ -1,5 +1,6 @@ from __future__ import annotations +import hashlib from asyncio import Task, create_task, sleep from typing import Awaitable, Callable, Optional, TypeVar @@ -12,3 +13,7 @@ async def delayed() -> T: return await fn() return create_task(delayed(), name=name) + + +def md5(data: bytes) -> str: + return hashlib.md5(data).hexdigest() diff --git a/tests/test_config.py b/tests/test_config.py index 1ecab05..b294923 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -1,9 +1,22 @@ +import shutil from pathlib import Path import pytest from rich.style import Style -from synthesize.config import Config, random_color +from synthesize.config import ( + AnyTrigger, + Args, + Config, + Flow, + FlowNode, + Once, + Restart, + Target, + UnresolvedFlow, + UnresolvedFlowNode, + random_color, +) @pytest.mark.parametrize("example", list((Path(__file__).parent.parent / "examples").iterdir())) @@ -13,3 +26,281 @@ def test_config_examples_parse(example: Path) -> None: def test_can_make_style_from_random_color() -> None: assert Style(color=random_color()) + + +@pytest.mark.parametrize( + ("raw", "expected"), + ( + ("echo 'hello'", "echo 'hello'"), + (" echo 'hello'", "echo 'hello'"), + (" echo ", "echo"), + ( + """ + echo + """, + "echo", + ), + ( + """ + echo + echo + """, + "echo\necho", + ), + ), +) +def test_target_commands_dedenting(raw: str, expected: str) -> None: + assert Target(commands=raw).commands == expected + + +@pytest.mark.parametrize( + ("target", "args", "expected"), + ( + ( + Target(commands=""), + Args(), + f"#!{shutil.which('sh')} -u\n", + ), + ( + Target(commands="echo 'hello'"), + Args(), + f"#!{shutil.which('sh')} -u\n\necho 'hello'", + ), + ( + Target(commands="echo '{{foo}}'"), + Args({"foo": "bar"}), + f"#!{shutil.which('sh')} -u\n\necho 'bar'", + ), + ( # unused values are ok + Target(commands="echo '{{foo}}'"), + Args({"foo": "bar", "baz": "qux"}), + f"#!{shutil.which('sh')} -u\n\necho 'bar'", + ), + ( + Target(commands="echo {{foo}} {{baz}}"), + 
Args({"foo": "bar", "baz": "qux"}), + f"#!{shutil.which('sh')} -u\n\necho bar qux", + ), + ( + Target(commands="echo", executable="bash"), + Args(), + f"#!{shutil.which('bash')}\n\necho", + ), + ( + Target(commands="{{ 'yes' if choice else 'no' }}"), + Args({"choice": True}), + f"#!{shutil.which('sh')} -u\n\nyes", + ), + ( + Target(commands="{{ 'yes' if choice else 'no' }}"), + Args({"choice": False}), + f"#!{shutil.which('sh')} -u\n\nno", + ), + ), +) +def test_target_rendering(target: Target, args: Args, expected: str) -> None: + assert target.render(args) == expected + + +def test_rendering_fails_for_bogus_executable() -> None: + with pytest.raises(Exception): + Target(executable="bogus").render(Args()) + + +color = random_color() + + +@pytest.mark.parametrize( + ("unresolved_node", "id", "targets", "triggers", "expected"), + ( + ( + UnresolvedFlowNode( + target=Target(commands="echo"), + trigger=Once(), + color=color, + ), + "foo", + {}, + {}, + FlowNode( + id="foo", + target=Target(commands="echo"), + trigger=Once(), + color=color, + ), + ), + ( + UnresolvedFlowNode( + target="t", + trigger=Once(), + color=color, + ), + "foo", + {"t": Target(commands="echo")}, + {}, + FlowNode( + id="foo", + target=Target(commands="echo"), + trigger=Once(), + color=color, + ), + ), + ( + UnresolvedFlowNode( + target=Target(commands="echo"), + trigger="r", + color=color, + ), + "foo", + {}, + {"r": Once()}, + FlowNode( + id="foo", + target=Target(commands="echo"), + trigger=Once(), + color=color, + ), + ), + ( + UnresolvedFlowNode( + target="t", + trigger="r", + color=color, + ), + "foo", + {"t": Target(commands="echo")}, + {"r": Once()}, + FlowNode( + id="foo", + target=Target(commands="echo"), + trigger=Once(), + color=color, + ), + ), + ), +) +def test_resolve_flow_node( + unresolved_node: UnresolvedFlowNode, + id: str, + targets: dict[str, Target], + triggers: dict[str, AnyTrigger], + expected: FlowNode, +) -> None: + assert unresolved_node.resolve(id, targets, triggers) == expected + + +@pytest.mark.parametrize( + ("unresolved_flow", "targets", "triggers", "expected"), + ( + ( + UnresolvedFlow( + nodes={ + "foo": UnresolvedFlowNode( + target=Target(commands="echo"), + trigger=Once(), + color=color, + ) + } + ), + {}, + {}, + Flow( + nodes={ + "foo": FlowNode( + id="foo", + target=Target(commands="echo"), + trigger=Once(), + color=color, + ) + } + ), + ), + ( + UnresolvedFlow( + nodes={ + "foo": UnresolvedFlowNode( + target="t", + args={"foo": "bar"}, + envs={"FOO": "BAR"}, + trigger="r", + color=color, + ) + }, + args={"baz": "qux"}, + envs={"BAZ": "QUX"}, + ), + {"t": Target(commands="echo")}, + {"r": Restart()}, + Flow( + nodes={ + "foo": FlowNode( + id="foo", + target=Target(commands="echo"), + args={"foo": "bar"}, + envs={"FOO": "BAR"}, + trigger=Restart(), + color=color, + ) + }, + args={"baz": "qux"}, + envs={"BAZ": "QUX"}, + ), + ), + ), +) +def test_resolve_flow( + unresolved_flow: UnresolvedFlow, + targets: dict[str, Target], + triggers: dict[str, AnyTrigger], + expected: Flow, +) -> None: + assert unresolved_flow.resolve(targets, triggers) == expected + + +@pytest.mark.parametrize( + ("config", "expected"), + ( + ( + Config( + flows={ + "flow": UnresolvedFlow( + nodes={ + "foo": UnresolvedFlowNode( + target="t", + args={"foo": "bar"}, + envs={"FOO": "BAR"}, + trigger="r", + color=color, + ) + }, + args={"baz": "qux"}, + envs={"BAZ": "QUX"}, + ) + }, + targets={"t": Target(commands="echo")}, + triggers={"r": Restart()}, + ), + { + "flow": Flow( + nodes={ + "foo": FlowNode( + id="foo", + 
target=Target(commands="echo"), + args={"foo": "bar"}, + envs={"FOO": "BAR"}, + trigger=Restart(), + color=color, + ) + }, + args={"baz": "qux"}, + envs={"BAZ": "QUX"}, + ), + }, + ), + ), +) +def test_resolve_config( + config: Config, + expected: dict[str, Flow], +) -> None: + assert config.resolve() == expected diff --git a/tests/test_execution.py b/tests/test_execution.py new file mode 100644 index 0000000..5e5a4e2 --- /dev/null +++ b/tests/test_execution.py @@ -0,0 +1,249 @@ +from asyncio import Queue +from pathlib import Path + +import pytest + +from synthesize.config import Envs, FlowNode, Target, random_color +from synthesize.execution import Execution +from synthesize.messages import ExecutionCompleted, ExecutionOutput, ExecutionStarted, Message + +color = random_color() + + +async def test_execution_lifecycle(tmp_path: Path) -> None: + node = FlowNode( + id="foo", + target=Target(commands="echo 'hi'"), + color=color, + ) + + q: Queue[Message] = Queue() + ex = await Execution.start( + node=node, + args={}, + envs={}, + tmp_dir=tmp_path, + width=80, + events=q, + ) + + assert await ex.wait() is ex + + assert ex.has_exited + + msg = await q.get() + + assert isinstance(msg, ExecutionStarted) + assert msg.node is node + assert msg.pid == ex.pid + + msg = await q.get() + + assert isinstance(msg, ExecutionOutput) + assert msg.node is node + assert msg.text == "hi" + + msg = await q.get() + + assert isinstance(msg, ExecutionCompleted) + assert msg.node is node + assert msg.pid == ex.pid + assert msg.exit_code == ex.exit_code == 0 + + +async def test_termination_before_completion(tmp_path: Path) -> None: + node = FlowNode( + id="foo", + target=Target(commands="sleep 10 && echo 'hi'"), + color=color, + ) + + q: Queue[Message] = Queue() + ex = await Execution.start( + node=node, + args={}, + envs={}, + tmp_dir=tmp_path, + width=80, + events=q, + ) + + ex.terminate() + + assert await ex.wait() is ex + + assert ex.has_exited + + msg = await q.get() + + assert isinstance(msg, ExecutionStarted) + assert msg.node is node + assert msg.pid == ex.pid + + msg = await q.get() + + assert isinstance(msg, ExecutionCompleted) + assert msg.node is node + assert msg.pid == ex.pid + assert msg.exit_code == ex.exit_code == -15 + + +async def test_termination_after_completion(tmp_path: Path) -> None: + node = FlowNode( + id="foo", + target=Target(commands="echo 'hi'"), + color=color, + ) + + q: Queue[Message] = Queue() + ex = await Execution.start( + node=node, + args={}, + envs={}, + tmp_dir=tmp_path, + width=80, + events=q, + ) + + assert await ex.wait() is ex + + assert ex.has_exited + + ex.terminate() # noop + + +async def test_execution_kill(tmp_path: Path) -> None: + node = FlowNode( + id="foo", + target=Target(commands="sleep 10 && echo 'hi'"), + color=color, + ) + + q: Queue[Message] = Queue() + ex = await Execution.start( + node=node, + args={}, + envs={}, + tmp_dir=tmp_path, + width=80, + events=q, + ) + + ex.kill() + + assert await ex.wait() is ex + + assert ex.has_exited + + msg = await q.get() + + assert isinstance(msg, ExecutionStarted) + assert msg.node is node + assert msg.pid == ex.pid + + msg = await q.get() + + assert isinstance(msg, ExecutionCompleted) + assert msg.node is node + assert msg.pid == ex.pid + assert msg.exit_code == ex.exit_code == -9 + + +async def test_kill_after_completion(tmp_path: Path) -> None: + node = FlowNode( + id="foo", + target=Target(commands="echo 'hi'"), + color=color, + ) + + q: Queue[Message] = Queue() + ex = await Execution.start( + node=node, + args={}, + 
envs={}, + tmp_dir=tmp_path, + width=80, + events=q, + ) + + assert await ex.wait() is ex + + assert ex.has_exited + + ex.kill() # noop + + +@pytest.mark.parametrize( + ("node", "envs", "expected"), + ( + ( + FlowNode( + id="foo", + target=Target(commands="echo $FORCE_COLOR"), + color=color, + ), + Envs(), + "1", + ), + ( + FlowNode( + id="foo", + target=Target(commands="echo $COLUMNS"), + color=color, + ), + Envs(), + "111", # set in test body below + ), + ( + FlowNode( + id="foo", + target=Target(commands="echo $SYNTH_NODE_ID"), + color=color, + ), + Envs(), + "foo", + ), + ( + FlowNode( + id="foo", + target=Target(commands="echo $FOO"), + envs=Envs({"FOO": "bar"}), + color=color, + ), + Envs(), + "bar", + ), + ( + FlowNode( + id="foo", + target=Target(commands="echo $FOO"), + color=color, + ), + Envs({"FOO": "baz"}), + "baz", + ), + ), +) +async def test_envs( + tmp_path: Path, + node: FlowNode, + envs: Envs, + expected: str, +) -> None: + q: Queue[Message] = Queue() + ex = await Execution.start( + node=node, + args={}, + envs=envs, + tmp_dir=tmp_path, + width=111, + events=q, + ) + + await ex.wait() + + await q.get() + msg = await q.get() + + assert isinstance(msg, ExecutionOutput) + assert msg.text == expected diff --git a/tests/test_utils.py b/tests/test_utils.py new file mode 100644 index 0000000..15a2390 --- /dev/null +++ b/tests/test_utils.py @@ -0,0 +1,27 @@ +from time import monotonic + +import pytest + +from synthesize.utils import delay, md5 + + +async def test_delay() -> None: + async def fn() -> str: + return "hi" + + start = monotonic() + + assert await delay(0.1, fn) == "hi" + + assert monotonic() - start >= 0.1 + + +@pytest.mark.parametrize( + ("data", "expected"), + ( + (b"", "d41d8cd98f00b204e9800998ecf8427e"), + (b"hello", "5d41402abc4b2a76b9719d911017c592"), + ), +) +def test_md5(data: bytes, expected: str) -> None: + assert md5(data) == expected
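The changelog entry above describes the new argument and environment-variable injection, and the diff wires it through `Target.render` and `Execution.start`: templated commands are rendered with the merge `flow args | node args | {"id": node.id}`, and the subprocess environment is built from `os.environ | flow envs | node envs`, plus `FORCE_COLOR`, `COLUMNS`, and `SYNTH_NODE_ID`. Below is a minimal configuration sketch exercising that behavior, following the schema shown in examples/dag.yaml and synth.yaml; the flow, node, target, argument, and variable names (`greet`, `first`, `second`, `greeting`, `LOG_LEVEL`) are hypothetical and only illustrate the shape. Under this reading of the diff, `first` would echo "hello from first" and `second` would echo "goodbye from second", since the node-level `args`/`envs` are merged over the flow-level ones.

flows:
  default:
    args:
      greeting: hello        # flow-level default, available to every node's template
    envs:
      LOG_LEVEL: info        # flow-level environment variable
    nodes:
      first:
        target: greet
      second:
        target: greet
        args:
          greeting: goodbye  # node-level value overrides the flow-level one
        envs:
          LOG_LEVEL: debug
        trigger:
          after: ["first"]

targets:
  greet:
    commands: |
      # {{ greeting }} and {{ id }} are rendered by Jinja2 before the script runs;
      # $SYNTH_NODE_ID and $LOG_LEVEL arrive through the process environment.
      echo "{{ greeting }} from {{ id }} (node $SYNTH_NODE_ID, log level $LOG_LEVEL)"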