From 24ef19f5e33122899c059fd2de001f4ae4886ef7 Mon Sep 17 00:00:00 2001 From: Josh Karpel Date: Mon, 24 Jun 2024 21:15:34 -0500 Subject: [PATCH 01/21] move file creation to FlowNode --- synthesize/config.py | 31 +++++++++++++++++++++++++++++++ synthesize/execution.py | 32 +------------------------------- synthesize/utils.py | 5 +++++ 3 files changed, 37 insertions(+), 31 deletions(-) diff --git a/synthesize/config.py b/synthesize/config.py index 71c7066..260983a 100644 --- a/synthesize/config.py +++ b/synthesize/config.py @@ -1,8 +1,12 @@ from __future__ import annotations +import shlex +import shutil from colorsys import hsv_to_rgb +from functools import cached_property from pathlib import Path from random import random +from stat import S_IEXEC from textwrap import dedent from typing import Annotated, Literal, Union @@ -11,6 +15,7 @@ from rich.color import Color from synthesize.model import Model +from synthesize.utils import md5 def random_color() -> str: @@ -77,6 +82,32 @@ class FlowNode(Model): color: str + @cached_property + def file_name(self) -> str: + return f"{self.id}-{md5(self.model_dump_json().encode())}" + + def file_content(self) -> str: + exe, *args = shlex.split(self.target.executable) + which_exe = shutil.which(exe) + if which_exe is None: + raise Exception(f"Failed to find absolute path to executable for {exe}") + return "\n".join( + ( + f"#!{shlex.join((which_exe, *args))}", + "", + self.target.commands, + ) + ) + + def ensure_file_exists(self, tmp_dir: Path) -> Path: + path = tmp_dir / self.file_name + + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(self.file_content()) + path.chmod(path.stat().st_mode | S_IEXEC) + + return path + class TargetRef(Model): id: str diff --git a/synthesize/execution.py b/synthesize/execution.py index 9512543..950dc7c 100644 --- a/synthesize/execution.py +++ b/synthesize/execution.py @@ -1,31 +1,16 @@ from __future__ import annotations import os -import shlex -import shutil from asyncio import Queue, Task, create_task from asyncio.subprocess import PIPE, STDOUT, Process, create_subprocess_exec from dataclasses import dataclass, field -from functools import lru_cache -from hashlib import md5 from pathlib import Path from signal import SIGKILL, SIGTERM -from stat import S_IEXEC from synthesize.config import FlowNode from synthesize.messages import ExecutionCompleted, ExecutionOutput, ExecutionStarted, Message -@lru_cache(maxsize=2**10) -def file_name(node: FlowNode) -> str: - h = md5() - h.update(node.id.encode()) - h.update(node.target.executable.encode()) - h.update(node.target.commands.encode()) - - return f"{node.id}-{h.hexdigest()}" - - @dataclass(frozen=True) class Execution: node: FlowNode @@ -45,22 +30,7 @@ async def start( tmp_dir: Path, width: int = 80, ) -> Execution: - path = tmp_dir / file_name(node) - path.parent.mkdir(parents=True, exist_ok=True) - exe, *args = shlex.split(node.target.executable) - which_exe = shutil.which(exe) - if which_exe is None: - raise Exception(f"Failed to find absolute path to executable for {exe}") - path.write_text( - "\n".join( - ( - f"#! {shlex.join((which_exe, *args))}", - "", - node.target.commands, - ) - ) - ) - path.chmod(path.stat().st_mode | S_IEXEC) + path = node.ensure_file_exists(tmp_dir=tmp_dir) process = await create_subprocess_exec( program=path, diff --git a/synthesize/utils.py b/synthesize/utils.py index 6a1f1ef..40af9d9 100644 --- a/synthesize/utils.py +++ b/synthesize/utils.py @@ -1,5 +1,6 @@ from __future__ import annotations +import hashlib from asyncio import Task, create_task, sleep from typing import Awaitable, Callable, Optional, TypeVar @@ -12,3 +13,7 @@ async def delayed() -> T: return await fn() return create_task(delayed(), name=name) + + +def md5(data: bytes) -> str: + return hashlib.md5(data).hexdigest() From bf6b3d7944c8f38fb3bc18fbfb45e93807ea6c40 Mon Sep 17 00:00:00 2001 From: Josh Karpel Date: Wed, 26 Jun 2024 00:51:15 -0500 Subject: [PATCH 02/21] args and envs --- examples/dag.yaml | 2 +- poetry.lock | 106 ++++++++++++++++++++++++++++------------ pyproject.toml | 3 ++ synthesize/cli.py | 22 +++++++-- synthesize/config.py | 89 ++++++++++++++++++++++++++------- synthesize/execution.py | 12 ++++- synthesize/utils.py | 43 +++++++++++++++- 7 files changed, 220 insertions(+), 57 deletions(-) diff --git a/examples/dag.yaml b/examples/dag.yaml index 1c67e9e..36f1b47 100644 --- a/examples/dag.yaml +++ b/examples/dag.yaml @@ -30,4 +30,4 @@ targets: sleep-and-echo: commands: | sleep 2 - echo "Hi!" + echo "Hi from {{ id }}!" diff --git a/poetry.lock b/poetry.lock index 9c42cb9..e50dc7e 100644 --- a/poetry.lock +++ b/poetry.lock @@ -320,6 +320,48 @@ docs = ["furo (>=2023.9.10)", "sphinx (>=7.2.6)", "sphinx-autodoc-typehints (>=1 testing = ["covdefaults (>=2.3)", "coverage (>=7.3.2)", "diff-cover (>=8.0.1)", "pytest (>=7.4.3)", "pytest-asyncio (>=0.21)", "pytest-cov (>=4.1)", "pytest-mock (>=3.12)", "pytest-timeout (>=2.2)", "virtualenv (>=20.26.2)"] typing = ["typing-extensions (>=4.8)"] +[[package]] +name = "frozendict" +version = "2.4.4" +description = "A simple immutable dictionary" +optional = false +python-versions = ">=3.6" +files = [ + {file = "frozendict-2.4.4-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:4a59578d47b3949437519b5c39a016a6116b9e787bb19289e333faae81462e59"}, + {file = "frozendict-2.4.4-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:12a342e439aef28ccec533f0253ea53d75fe9102bd6ea928ff530e76eac38906"}, + {file = "frozendict-2.4.4-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:7f79c26dff10ce11dad3b3627c89bb2e87b9dd5958c2b24325f16a23019b8b94"}, + {file = "frozendict-2.4.4-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:2bd009cf4fc47972838a91e9b83654dc9a095dc4f2bb3a37c3f3124c8a364543"}, + {file = "frozendict-2.4.4-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:87ebcde21565a14fe039672c25550060d6f6d88cf1f339beac094c3b10004eb0"}, + {file = "frozendict-2.4.4-cp310-cp310-win_amd64.whl", hash = "sha256:fefeb700bc7eb8b4c2dc48704e4221860d254c8989fb53488540bc44e44a1ac2"}, + {file = "frozendict-2.4.4-cp310-cp310-win_arm64.whl", hash = "sha256:4297d694eb600efa429769125a6f910ec02b85606f22f178bafbee309e7d3ec7"}, + {file = "frozendict-2.4.4-cp36-cp36m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:812ab17522ba13637826e65454115a914c2da538356e85f43ecea069813e4b33"}, + {file = "frozendict-2.4.4-cp36-cp36m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:7fee9420475bb6ff357000092aa9990c2f6182b2bab15764330f4ad7de2eae49"}, + {file = "frozendict-2.4.4-cp36-cp36m-musllinux_1_1_aarch64.whl", hash = "sha256:3148062675536724502c6344d7c485dd4667fdf7980ca9bd05e338ccc0c4471e"}, + {file = "frozendict-2.4.4-cp36-cp36m-musllinux_1_1_x86_64.whl", hash = "sha256:78c94991944dd33c5376f720228e5b252ee67faf3bac50ef381adc9e51e90d9d"}, + {file = "frozendict-2.4.4-cp36-cp36m-win_amd64.whl", hash = "sha256:1697793b5f62b416c0fc1d94638ec91ed3aa4ab277f6affa3a95216ecb3af170"}, + {file = "frozendict-2.4.4-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:199a4d32194f3afed6258de7e317054155bc9519252b568d9cfffde7e4d834e5"}, + {file = "frozendict-2.4.4-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:85375ec6e979e6373bffb4f54576a68bf7497c350861d20686ccae38aab69c0a"}, + {file = "frozendict-2.4.4-cp37-cp37m-musllinux_1_1_aarch64.whl", hash = "sha256:2d8536e068d6bf281f23fa835ac07747fb0f8851879dd189e9709f9567408b4d"}, + {file = "frozendict-2.4.4-cp37-cp37m-musllinux_1_1_x86_64.whl", hash = "sha256:259528ba6b56fa051bc996f1c4d8b57e30d6dd3bc2f27441891b04babc4b5e73"}, + {file = "frozendict-2.4.4-cp37-cp37m-win_amd64.whl", hash = "sha256:07c3a5dee8bbb84cba770e273cdbf2c87c8e035903af8f781292d72583416801"}, + {file = "frozendict-2.4.4-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:6874fec816b37b6eb5795b00e0574cba261bf59723e2de607a195d5edaff0786"}, + {file = "frozendict-2.4.4-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:c8f92425686323a950337da4b75b4c17a3327b831df8c881df24038d560640d4"}, + {file = "frozendict-2.4.4-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:5d58d9a8d9e49662c6dafbea5e641f97decdb3d6ccd76e55e79818415362ba25"}, + {file = "frozendict-2.4.4-cp38-cp38-musllinux_1_1_aarch64.whl", hash = "sha256:93a7b19afb429cbf99d56faf436b45ef2fa8fe9aca89c49eb1610c3bd85f1760"}, + {file = "frozendict-2.4.4-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:2b70b431e3a72d410a2cdf1497b3aba2f553635e0c0f657ce311d841bf8273b6"}, + {file = "frozendict-2.4.4-cp38-cp38-win_amd64.whl", hash = "sha256:e1b941132d79ce72d562a13341d38fc217bc1ee24d8c35a20d754e79ff99e038"}, + {file = "frozendict-2.4.4-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:dc2228874eacae390e63fd4f2bb513b3144066a977dc192163c9f6c7f6de6474"}, + {file = "frozendict-2.4.4-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:63aa49f1919af7d45fb8fd5dec4c0859bc09f46880bd6297c79bb2db2969b63d"}, + {file = "frozendict-2.4.4-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:c6bf9260018d653f3cab9bd147bd8592bf98a5c6e338be0491ced3c196c034a3"}, + {file = "frozendict-2.4.4-cp39-cp39-musllinux_1_1_aarch64.whl", hash = "sha256:6eb716e6a6d693c03b1d53280a1947716129f5ef9bcdd061db5c17dea44b80fe"}, + {file = "frozendict-2.4.4-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:d13b4310db337f4d2103867c5a05090b22bc4d50ca842093779ef541ea9c9eea"}, + {file = "frozendict-2.4.4-cp39-cp39-win_amd64.whl", hash = "sha256:b3b967d5065872e27b06f785a80c0ed0a45d1f7c9b85223da05358e734d858ca"}, + {file = "frozendict-2.4.4-cp39-cp39-win_arm64.whl", hash = "sha256:4ae8d05c8d0b6134bfb6bfb369d5fa0c4df21eabb5ca7f645af95fdc6689678e"}, + {file = "frozendict-2.4.4-py311-none-any.whl", hash = "sha256:705efca8d74d3facbb6ace80ab3afdd28eb8a237bfb4063ed89996b024bc443d"}, + {file = "frozendict-2.4.4-py312-none-any.whl", hash = "sha256:d9647563e76adb05b7cde2172403123380871360a114f546b4ae1704510801e5"}, + {file = "frozendict-2.4.4.tar.gz", hash = "sha256:3f7c031b26e4ee6a3f786ceb5e3abf1181c4ade92dce1f847da26ea2c96008c7"}, +] + [[package]] name = "ghp-import" version = "2.1.0" @@ -353,13 +395,13 @@ colorama = ">=0.4" [[package]] name = "hypothesis" -version = "6.103.3" +version = "6.104.1" description = "A library for property-based testing" optional = false python-versions = ">=3.8" files = [ - {file = "hypothesis-6.103.3-py3-none-any.whl", hash = "sha256:870d82475cd11c33d75bf7bbdfd972df9d7ea9f79696ae50c3d766e8307467d9"}, - {file = "hypothesis-6.103.3.tar.gz", hash = "sha256:33ed1ec217a3e279914b2c0d9045e56f0d9d2785232ec7ed69ad080c839f86b5"}, + {file = "hypothesis-6.104.1-py3-none-any.whl", hash = "sha256:a0a898fa78ecaefe76ad248901dc274e598f29198c6015b3053f7f7827670e0e"}, + {file = "hypothesis-6.104.1.tar.gz", hash = "sha256:4033898019a6149823d2feeb8d214921b4ac2d342a05d6b02e40a3ca4be07eea"}, ] [package.dependencies] @@ -713,38 +755,38 @@ mkdocstrings = ">=0.25" [[package]] name = "mypy" -version = "1.10.0" +version = "1.10.1" description = "Optional static typing for Python" optional = false python-versions = ">=3.8" files = [ - {file = "mypy-1.10.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:da1cbf08fb3b851ab3b9523a884c232774008267b1f83371ace57f412fe308c2"}, - {file = "mypy-1.10.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:12b6bfc1b1a66095ab413160a6e520e1dc076a28f3e22f7fb25ba3b000b4ef99"}, - {file = "mypy-1.10.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:9e36fb078cce9904c7989b9693e41cb9711e0600139ce3970c6ef814b6ebc2b2"}, - {file = "mypy-1.10.0-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:2b0695d605ddcd3eb2f736cd8b4e388288c21e7de85001e9f85df9187f2b50f9"}, - {file = "mypy-1.10.0-cp310-cp310-win_amd64.whl", hash = "sha256:cd777b780312ddb135bceb9bc8722a73ec95e042f911cc279e2ec3c667076051"}, - {file = "mypy-1.10.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:3be66771aa5c97602f382230165b856c231d1277c511c9a8dd058be4784472e1"}, - {file = "mypy-1.10.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:8b2cbaca148d0754a54d44121b5825ae71868c7592a53b7292eeb0f3fdae95ee"}, - {file = "mypy-1.10.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:1ec404a7cbe9fc0e92cb0e67f55ce0c025014e26d33e54d9e506a0f2d07fe5de"}, - {file = "mypy-1.10.0-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:e22e1527dc3d4aa94311d246b59e47f6455b8729f4968765ac1eacf9a4760bc7"}, - {file = "mypy-1.10.0-cp311-cp311-win_amd64.whl", hash = "sha256:a87dbfa85971e8d59c9cc1fcf534efe664d8949e4c0b6b44e8ca548e746a8d53"}, - {file = "mypy-1.10.0-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:a781f6ad4bab20eef8b65174a57e5203f4be627b46291f4589879bf4e257b97b"}, - {file = "mypy-1.10.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:b808e12113505b97d9023b0b5e0c0705a90571c6feefc6f215c1df9381256e30"}, - {file = "mypy-1.10.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:8f55583b12156c399dce2df7d16f8a5095291354f1e839c252ec6c0611e86e2e"}, - {file = "mypy-1.10.0-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:4cf18f9d0efa1b16478c4c129eabec36148032575391095f73cae2e722fcf9d5"}, - {file = "mypy-1.10.0-cp312-cp312-win_amd64.whl", hash = "sha256:bc6ac273b23c6b82da3bb25f4136c4fd42665f17f2cd850771cb600bdd2ebeda"}, - {file = "mypy-1.10.0-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:9fd50226364cd2737351c79807775136b0abe084433b55b2e29181a4c3c878c0"}, - {file = "mypy-1.10.0-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:f90cff89eea89273727d8783fef5d4a934be2fdca11b47def50cf5d311aff727"}, - {file = "mypy-1.10.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:fcfc70599efde5c67862a07a1aaf50e55bce629ace26bb19dc17cece5dd31ca4"}, - {file = "mypy-1.10.0-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:075cbf81f3e134eadaf247de187bd604748171d6b79736fa9b6c9685b4083061"}, - {file = "mypy-1.10.0-cp38-cp38-win_amd64.whl", hash = "sha256:3f298531bca95ff615b6e9f2fc0333aae27fa48052903a0ac90215021cdcfa4f"}, - {file = "mypy-1.10.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:fa7ef5244615a2523b56c034becde4e9e3f9b034854c93639adb667ec9ec2976"}, - {file = "mypy-1.10.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:3236a4c8f535a0631f85f5fcdffba71c7feeef76a6002fcba7c1a8e57c8be1ec"}, - {file = "mypy-1.10.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:4a2b5cdbb5dd35aa08ea9114436e0d79aceb2f38e32c21684dcf8e24e1e92821"}, - {file = "mypy-1.10.0-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:92f93b21c0fe73dc00abf91022234c79d793318b8a96faac147cd579c1671746"}, - {file = "mypy-1.10.0-cp39-cp39-win_amd64.whl", hash = "sha256:28d0e038361b45f099cc086d9dd99c15ff14d0188f44ac883010e172ce86c38a"}, - {file = "mypy-1.10.0-py3-none-any.whl", hash = "sha256:f8c083976eb530019175aabadb60921e73b4f45736760826aa1689dda8208aee"}, - {file = "mypy-1.10.0.tar.gz", hash = "sha256:3d087fcbec056c4ee34974da493a826ce316947485cef3901f511848e687c131"}, + {file = "mypy-1.10.1-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:e36f229acfe250dc660790840916eb49726c928e8ce10fbdf90715090fe4ae02"}, + {file = "mypy-1.10.1-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:51a46974340baaa4145363b9e051812a2446cf583dfaeba124af966fa44593f7"}, + {file = "mypy-1.10.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:901c89c2d67bba57aaaca91ccdb659aa3a312de67f23b9dfb059727cce2e2e0a"}, + {file = "mypy-1.10.1-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:0cd62192a4a32b77ceb31272d9e74d23cd88c8060c34d1d3622db3267679a5d9"}, + {file = "mypy-1.10.1-cp310-cp310-win_amd64.whl", hash = "sha256:a2cbc68cb9e943ac0814c13e2452d2046c2f2b23ff0278e26599224cf164e78d"}, + {file = "mypy-1.10.1-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:bd6f629b67bb43dc0d9211ee98b96d8dabc97b1ad38b9b25f5e4c4d7569a0c6a"}, + {file = "mypy-1.10.1-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:a1bbb3a6f5ff319d2b9d40b4080d46cd639abe3516d5a62c070cf0114a457d84"}, + {file = "mypy-1.10.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:b8edd4e9bbbc9d7b79502eb9592cab808585516ae1bcc1446eb9122656c6066f"}, + {file = "mypy-1.10.1-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:6166a88b15f1759f94a46fa474c7b1b05d134b1b61fca627dd7335454cc9aa6b"}, + {file = "mypy-1.10.1-cp311-cp311-win_amd64.whl", hash = "sha256:5bb9cd11c01c8606a9d0b83ffa91d0b236a0e91bc4126d9ba9ce62906ada868e"}, + {file = "mypy-1.10.1-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:d8681909f7b44d0b7b86e653ca152d6dff0eb5eb41694e163c6092124f8246d7"}, + {file = "mypy-1.10.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:378c03f53f10bbdd55ca94e46ec3ba255279706a6aacaecac52ad248f98205d3"}, + {file = "mypy-1.10.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6bacf8f3a3d7d849f40ca6caea5c055122efe70e81480c8328ad29c55c69e93e"}, + {file = "mypy-1.10.1-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:701b5f71413f1e9855566a34d6e9d12624e9e0a8818a5704d74d6b0402e66c04"}, + {file = "mypy-1.10.1-cp312-cp312-win_amd64.whl", hash = "sha256:3c4c2992f6ea46ff7fce0072642cfb62af7a2484efe69017ed8b095f7b39ef31"}, + {file = "mypy-1.10.1-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:604282c886497645ffb87b8f35a57ec773a4a2721161e709a4422c1636ddde5c"}, + {file = "mypy-1.10.1-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:37fd87cab83f09842653f08de066ee68f1182b9b5282e4634cdb4b407266bade"}, + {file = "mypy-1.10.1-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:8addf6313777dbb92e9564c5d32ec122bf2c6c39d683ea64de6a1fd98b90fe37"}, + {file = "mypy-1.10.1-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:5cc3ca0a244eb9a5249c7c583ad9a7e881aa5d7b73c35652296ddcdb33b2b9c7"}, + {file = "mypy-1.10.1-cp38-cp38-win_amd64.whl", hash = "sha256:1b3a2ffce52cc4dbaeee4df762f20a2905aa171ef157b82192f2e2f368eec05d"}, + {file = "mypy-1.10.1-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:fe85ed6836165d52ae8b88f99527d3d1b2362e0cb90b005409b8bed90e9059b3"}, + {file = "mypy-1.10.1-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:c2ae450d60d7d020d67ab440c6e3fae375809988119817214440033f26ddf7bf"}, + {file = "mypy-1.10.1-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6be84c06e6abd72f960ba9a71561c14137a583093ffcf9bbfaf5e613d63fa531"}, + {file = "mypy-1.10.1-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:2189ff1e39db399f08205e22a797383613ce1cb0cb3b13d8bcf0170e45b96cc3"}, + {file = "mypy-1.10.1-cp39-cp39-win_amd64.whl", hash = "sha256:97a131ee36ac37ce9581f4220311247ab6cba896b4395b9c87af0675a13a755f"}, + {file = "mypy-1.10.1-py3-none-any.whl", hash = "sha256:71d8ac0b906354ebda8ef1673e5fde785936ac1f29ff6987c7483cfbd5a4235a"}, + {file = "mypy-1.10.1.tar.gz", hash = "sha256:1f8f492d7db9e3593ef42d4f115f04e556130f2819ad33ab84551403e97dd4c0"}, ] [package.dependencies] @@ -1565,4 +1607,4 @@ anyio = ">=3.0.0" [metadata] lock-version = "2.0" python-versions = ">=3.10" -content-hash = "c6030b7b1931e6cba1640714ba4a0881400b3f0a2d0445aa315301b2f99da675" +content-hash = "020dc21dc785e7e169ca706ec2f9acf0d8913ce51ef7763d2970df9edd6f17e9" diff --git a/pyproject.toml b/pyproject.toml index 3de0893..047510f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -22,6 +22,7 @@ classifiers = [ "Programming Language :: Python :: 3 :: Only", "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", "Topic :: Software Development", "Topic :: Utilities", "Typing :: Typed", @@ -39,6 +40,8 @@ pyyaml = ">=6.0" networkx = ">=3.0" watchfiles = ">=0.18" identify = ">=2.5" +jinja2 = ">=3.1" +frozendict = ">=2.4" [tool.poetry.group.dev.dependencies] pre-commit = ">=3" diff --git a/synthesize/cli.py b/synthesize/cli.py index fc588b7..17a6918 100644 --- a/synthesize/cli.py +++ b/synthesize/cli.py @@ -38,7 +38,7 @@ def run( help="The path to the configuration file to execute.", ), dry: bool = Option( - False, + default=False, help="If enabled, do not run actually run the flow.", ), ) -> None: @@ -66,11 +66,25 @@ def run( ) ) - return - resolved = parsed_config.resolve() - controller = Orchestrator(flow=resolved[flow], console=console) + try: + selected_flow = resolved[flow] + except KeyError: + sep = "\n " + available_flows = sep + sep.join(resolved.keys()) + console.print( + Text( + f"No flow named '{flow}'. Available flows:{available_flows}", + style=Style(color="red"), + ) + ) + raise Exit(code=1) + + if dry: + return + + controller = Orchestrator(flow=selected_flow, console=console) try: asyncio.run(controller.run()) diff --git a/synthesize/config.py b/synthesize/config.py index 260983a..0a2beb0 100644 --- a/synthesize/config.py +++ b/synthesize/config.py @@ -10,12 +10,44 @@ from textwrap import dedent from typing import Annotated, Literal, Union +from frozendict import frozendict from identify.identify import tags_from_path +from jinja2 import Environment from pydantic import Field, field_validator from rich.color import Color from synthesize.model import Model -from synthesize.utils import md5 +from synthesize.utils import FrozenDict, md5 + +ArgValue = int | float | str | bool | None + +Args = Annotated[ + FrozenDict[ + Annotated[ + str, + Field( + # https://jinja.palletsprojects.com/en/3.1.x/api/#notes-on-identifiers + pattern=r"[a-zA-Z_][a-zA-Z0-9_]*(\.[a-zA-Z_][a-zA-Z0-9_]*)*", + min_length=1, + ), + ], + ArgValue, + ], + Field(default_factory=frozendict), +] +Envs = Annotated[ + FrozenDict[ + Annotated[ + str, + Field( + pattern=r"[A-Z_]*", + min_length=1, + ), + ], + str, + ], + Field(default_factory=frozendict), +] def random_color() -> str: @@ -27,6 +59,9 @@ def random_color() -> str: return triplet.hex +template_environment = Environment() + + class Target(Model): commands: str = Field(default="") executable: str = Field(default="sh -u") @@ -36,6 +71,24 @@ class Target(Model): def dedent_commands(cls, commands: str) -> str: return dedent(commands).strip() + def render(self, args: Args) -> str: + exe, *exe_args = shlex.split(self.executable) + which_exe = shutil.which(exe) + if which_exe is None: + raise Exception(f"Failed to find absolute path to executable for {exe}") + + template = template_environment.from_string( + "\n".join( + ( + f"#!{shlex.join((which_exe, *exe_args))}", + "", + self.commands, + ) + ) + ) + + return template.render(args) + class Once(Model): type: Literal["once"] = "once" @@ -44,7 +97,7 @@ class Once(Model): class After(Model): type: Literal["after"] = "after" - after: frozenset[str] = Field(default=...) + after: Annotated[frozenset[str], Field(min_length=1)] class Restart(Model): @@ -78,6 +131,9 @@ class FlowNode(Model): id: str target: Target + args: Args + envs: Envs + trigger: AnyTrigger color: str @@ -86,24 +142,18 @@ class FlowNode(Model): def file_name(self) -> str: return f"{self.id}-{md5(self.model_dump_json().encode())}" - def file_content(self) -> str: - exe, *args = shlex.split(self.target.executable) - which_exe = shutil.which(exe) - if which_exe is None: - raise Exception(f"Failed to find absolute path to executable for {exe}") - return "\n".join( - ( - f"#!{shlex.join((which_exe, *args))}", - "", - self.target.commands, - ) - ) - - def ensure_file_exists(self, tmp_dir: Path) -> Path: + def write_script(self, tmp_dir: Path) -> Path: path = tmp_dir / self.file_name path.parent.mkdir(parents=True, exist_ok=True) - path.write_text(self.file_content()) + path.write_text( + self.target.render( + args=self.args + | { + "id": self.id, + } + ) + ) path.chmod(path.stat().st_mode | S_IEXEC) return path @@ -121,6 +171,9 @@ class UnresolvedFlowNode(Model): id: str target: Target | TargetRef + args: Args + envs: Envs + trigger: AnyTrigger | TriggerRef = Once() color: Annotated[str, Field(default_factory=random_color)] @@ -133,6 +186,8 @@ def resolve( return FlowNode( id=self.id, target=targets[self.target.id] if isinstance(self.target, TargetRef) else self.target, + args=self.args, + envs=self.envs, trigger=( triggers[self.trigger.id] if isinstance(self.trigger, TriggerRef) else self.trigger ), diff --git a/synthesize/execution.py b/synthesize/execution.py index 950dc7c..eab1527 100644 --- a/synthesize/execution.py +++ b/synthesize/execution.py @@ -30,13 +30,21 @@ async def start( tmp_dir: Path, width: int = 80, ) -> Execution: - path = node.ensure_file_exists(tmp_dir=tmp_dir) + path = node.write_script(tmp_dir=tmp_dir) process = await create_subprocess_exec( program=path, stdout=PIPE, stderr=STDOUT, - env={**os.environ, "FORCE_COLOR": "1", "COLUMNS": str(width)}, + env=os.environ + | node.envs + | { + "FORCE_COLOR": "1", + "COLUMNS": str(width), + } + | { + "SYNTH_NODE_ID": node.id, + }, preexec_fn=os.setsid, ) diff --git a/synthesize/utils.py b/synthesize/utils.py index 40af9d9..d6d8b4d 100644 --- a/synthesize/utils.py +++ b/synthesize/utils.py @@ -2,7 +2,21 @@ import hashlib from asyncio import Task, create_task, sleep -from typing import Awaitable, Callable, Optional, TypeVar +from typing import Annotated, Any, Awaitable, Callable, Optional, TypeVar + +from frozendict import frozendict +from pydantic import GetCoreSchemaHandler, GetJsonSchemaHandler +from pydantic.json_schema import JsonSchemaValue +from pydantic_core import CoreSchema +from pydantic_core.core_schema import ( + chain_schema, + dict_schema, + is_instance_schema, + json_or_python_schema, + no_info_plain_validator_function, + plain_serializer_function_ser_schema, + union_schema, +) T = TypeVar("T") @@ -17,3 +31,30 @@ async def delayed() -> T: def md5(data: bytes) -> str: return hashlib.md5(data).hexdigest() + + +# https://github.com/pydantic/pydantic/discussions/8721 +class _PydanticFrozenDictAnnotation: + @classmethod + def __get_pydantic_core_schema__( + cls, _source_type: Any, _handler: GetCoreSchemaHandler + ) -> CoreSchema: + from_dict_schema = chain_schema( + [dict_schema(), no_info_plain_validator_function(frozendict)] + ) + return json_or_python_schema( + json_schema=from_dict_schema, + python_schema=union_schema([is_instance_schema(frozendict), from_dict_schema]), + serialization=plain_serializer_function_ser_schema(dict), + ) + + @classmethod + def __get_pydantic_json_schema__( + cls, _: CoreSchema, handler: GetJsonSchemaHandler + ) -> JsonSchemaValue: + return handler(dict_schema()) + + +_K = TypeVar("_K") +_V = TypeVar("_V") +FrozenDict = Annotated[frozendict[_K, _V], _PydanticFrozenDictAnnotation] From d9fd88843920e1ce7903772ce05053a5140b9ff6 Mon Sep 17 00:00:00 2001 From: Josh Karpel Date: Wed, 26 Jun 2024 00:53:21 -0500 Subject: [PATCH 03/21] rename --- synthesize/config.py | 2 +- synthesize/model.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/synthesize/config.py b/synthesize/config.py index 0a2beb0..b3a28bb 100644 --- a/synthesize/config.py +++ b/synthesize/config.py @@ -216,7 +216,7 @@ def from_file(cls, file: Path) -> Config: tags = tags_from_path(str(file)) if "yaml" in tags: - return cls.parse_yaml(file.read_text()) + return cls.model_validate_yaml(file.read_text()) else: raise NotImplementedError("Currently, only YAML files are supported.") diff --git a/synthesize/model.py b/synthesize/model.py index 51c0087..1a2572d 100644 --- a/synthesize/model.py +++ b/synthesize/model.py @@ -17,5 +17,5 @@ class Model(BaseModel): ) @classmethod - def parse_yaml(cls: Type[C], y: str) -> C: + def model_validate_yaml(cls: Type[C], y: str) -> C: return cls.model_validate(yaml.safe_load(y)) From ba5463049a9941ac1cde6496623240f47b0c3054 Mon Sep 17 00:00:00 2001 From: Josh Karpel Date: Wed, 26 Jun 2024 01:06:10 -0500 Subject: [PATCH 04/21] less restrictive --- synthesize/config.py | 1 - 1 file changed, 1 deletion(-) diff --git a/synthesize/config.py b/synthesize/config.py index b3a28bb..e4a596c 100644 --- a/synthesize/config.py +++ b/synthesize/config.py @@ -40,7 +40,6 @@ Annotated[ str, Field( - pattern=r"[A-Z_]*", min_length=1, ), ], From 0e5b03dcc4e3f3277ecee17167f8eb5fa55c9b5a Mon Sep 17 00:00:00 2001 From: Josh Karpel Date: Wed, 26 Jun 2024 01:33:56 -0500 Subject: [PATCH 05/21] collapse format --- examples/dag.yaml | 51 +++++++++++++++++++------------------- synth.yaml | 31 +++++++++++------------ synthesize/config.py | 31 +++++++++-------------- synthesize/orchestrator.py | 2 +- synthesize/renderer.py | 4 +-- synthesize/state.py | 40 ++++++++++++++---------------- 6 files changed, 72 insertions(+), 87 deletions(-) diff --git a/examples/dag.yaml b/examples/dag.yaml index 36f1b47..fb4f9c2 100644 --- a/examples/dag.yaml +++ b/examples/dag.yaml @@ -1,30 +1,29 @@ flows: - dag: - nodes: - - id: 1 - target: - id: sleep-and-echo - - id: 2 - target: - id: sleep-and-echo - - id: 3 - target: - id: sleep-and-echo - - id: 4 - target: - id: sleep-and-echo - trigger: - after: ["1", "2"] - - id: 5 - target: - id: sleep-and-echo - trigger: - after: ["3"] - - id: 6 - target: - id: sleep-and-echo - trigger: - after: ["4", "5"] + default: + 1: + target: + id: sleep-and-echo + 2: + target: + id: sleep-and-echo + 3: + target: + id: sleep-and-echo + 4: + target: + id: sleep-and-echo + trigger: + after: ["1", "2"] + 5: + target: + id: sleep-and-echo + trigger: + after: ["3"] + 6: + target: + id: sleep-and-echo + trigger: + after: ["4", "5"] targets: sleep-and-echo: diff --git a/synth.yaml b/synth.yaml index 25fae8f..753dad1 100644 --- a/synth.yaml +++ b/synth.yaml @@ -1,21 +1,20 @@ flows: default: - nodes: - - id: tests - target: - id: tests - trigger: - id: code-changes - - id: types - target: - id: types - trigger: - id: code-changes - - id: docs - target: - id: docs - trigger: - type: restart + tests: + target: + id: tests + trigger: + id: code-changes + types: + target: + id: types + trigger: + id: code-changes + docs: + target: + id: docs + trigger: + type: restart targets: tests: diff --git a/synthesize/config.py b/synthesize/config.py index e4a596c..c73832f 100644 --- a/synthesize/config.py +++ b/synthesize/config.py @@ -3,7 +3,6 @@ import shlex import shutil from colorsys import hsv_to_rgb -from functools import cached_property from pathlib import Path from random import random from stat import S_IEXEC @@ -137,12 +136,8 @@ class FlowNode(Model): color: str - @cached_property - def file_name(self) -> str: - return f"{self.id}-{md5(self.model_dump_json().encode())}" - def write_script(self, tmp_dir: Path) -> Path: - path = tmp_dir / self.file_name + path = tmp_dir / f"{self.id}-{md5(self.model_dump_json().encode())}" path.parent.mkdir(parents=True, exist_ok=True) path.write_text( @@ -167,8 +162,6 @@ class TriggerRef(Model): class UnresolvedFlowNode(Model): - id: str - target: Target | TargetRef args: Args envs: Envs @@ -179,11 +172,12 @@ class UnresolvedFlowNode(Model): def resolve( self, + id: str, targets: dict[str, Target], triggers: dict[str, AnyTrigger], ) -> FlowNode: return FlowNode( - id=self.id, + id=id, target=targets[self.target.id] if isinstance(self.target, TargetRef) else self.target, args=self.args, envs=self.envs, @@ -194,15 +188,8 @@ def resolve( ) -class Flow(Model): - nodes: tuple[FlowNode, ...] - - -class UnresolvedFlow(Model): - nodes: tuple[UnresolvedFlowNode, ...] - - def resolve(self, targets: dict[str, Target], triggers: dict[str, AnyTrigger]) -> Flow: - return Flow(nodes=tuple(node.resolve(targets, triggers) for node in self.nodes)) +UnresolvedFlow = dict[str, UnresolvedFlowNode] +Flow = dict[str, FlowNode] class Config(Model): @@ -220,4 +207,10 @@ def from_file(cls, file: Path) -> Config: raise NotImplementedError("Currently, only YAML files are supported.") def resolve(self) -> dict[str, Flow]: - return {id: flow.resolve(self.targets, self.triggers) for id, flow in self.flows.items()} + return { + flow_id: { + node_id: node.resolve(node_id, self.targets, self.triggers) + for node_id, node in flow.items() + } + for flow_id, flow in self.flows.items() + } diff --git a/synthesize/orchestrator.py b/synthesize/orchestrator.py index 3246b2d..b23db3f 100644 --- a/synthesize/orchestrator.py +++ b/synthesize/orchestrator.py @@ -136,7 +136,7 @@ async def start() -> None: await start() async def start_watchers(self) -> None: - for node in self.flow.nodes: + for node in self.flow.values(): if isinstance(node.trigger, Watch): self.watchers[node.id] = create_task( watch( diff --git a/synthesize/renderer.py b/synthesize/renderer.py index c9eef00..767e741 100644 --- a/synthesize/renderer.py +++ b/synthesize/renderer.py @@ -117,13 +117,11 @@ def handle_lifecycle_message( match message: case ExecutionStarted(node=node, pid=pid): parts = ( - "Node ", (node.id, node.color), f" started (pid {pid})", ) case ExecutionCompleted(node=node, pid=pid, exit_code=exit_code): parts = ( - "Node ", (node.id, node.color), f" (pid {pid}) exited with code ", (str(exit_code), "green" if exit_code == 0 else "red"), @@ -134,7 +132,7 @@ def handle_lifecycle_message( ) parts = ( - "Running node ", + "Running ", (node.id, node.color), " due to detected changes: ", changes, diff --git a/synthesize/state.py b/synthesize/state.py index 27a1c0f..b325151 100644 --- a/synthesize/state.py +++ b/synthesize/state.py @@ -18,68 +18,64 @@ class FlowNodeStatus(Enum): @dataclass(frozen=True) class FlowState: graph: DiGraph - id_to_node: dict[str, FlowNode] - id_to_status: dict[str, FlowNodeStatus] + flow: Flow + statuses: dict[str, FlowNodeStatus] @classmethod def from_flow(cls, flow: Flow) -> FlowState: - id_to_node = {node.id: node for node in flow.nodes} - graph = DiGraph() - for id, node in id_to_node.items(): - graph.add_node(node.id) + for id, node in flow.items(): + graph.add_node(id) if isinstance(node.trigger, After): for predecessor_id in node.trigger.after: graph.add_edge(predecessor_id, id) return FlowState( graph=graph, - id_to_node={id: node for id, node in id_to_node.items()}, - id_to_status={id: FlowNodeStatus.Pending for id in graph.nodes}, + flow=flow, + statuses={id: FlowNodeStatus.Pending for id in graph.nodes}, ) def running_nodes(self) -> set[FlowNode]: return { - self.id_to_node[id] - for id, status in self.id_to_status.items() + self.flow[id] + for id, status in self.statuses.items() if status is FlowNodeStatus.Running } def ready_nodes(self) -> set[FlowNode]: return { - self.id_to_node[id] + self.flow[id] for id in self.graph.nodes - if self.id_to_status[id] is FlowNodeStatus.Pending - and all( - self.id_to_status[a] is FlowNodeStatus.Succeeded for a in ancestors(self.graph, id) - ) + if self.statuses[id] is FlowNodeStatus.Pending + and all(self.statuses[a] is FlowNodeStatus.Succeeded for a in ancestors(self.graph, id)) } def mark_success(self, node: FlowNode) -> None: - self.id_to_status[node.id] = FlowNodeStatus.Succeeded + self.statuses[node.id] = FlowNodeStatus.Succeeded def mark_failure(self, node: FlowNode) -> None: - self.id_to_status[node.id] = FlowNodeStatus.Failed + self.statuses[node.id] = FlowNodeStatus.Failed def mark_pending(self, node: FlowNode) -> None: - self.id_to_status[node.id] = FlowNodeStatus.Pending + self.statuses[node.id] = FlowNodeStatus.Pending def mark_descendants_pending(self, node: FlowNode) -> None: for t in _descendants(self.graph, {node.id}): - self.id_to_status[t] = FlowNodeStatus.Pending + self.statuses[t] = FlowNodeStatus.Pending def mark_running(self, node: FlowNode) -> None: - self.id_to_status[node.id] = FlowNodeStatus.Running + self.statuses[node.id] = FlowNodeStatus.Running def all_done(self) -> bool: - return all(status is FlowNodeStatus.Succeeded for status in self.id_to_status.values()) + return all(status is FlowNodeStatus.Succeeded for status in self.statuses.values()) def num_nodes(self) -> int: return len(self.graph) def nodes(self) -> set[FlowNode]: - return set(self.id_to_node.values()) + return set(self.flow.values()) def _ancestors(graph: DiGraph, nodes: set[str]) -> set[str]: From c1cf3ea3abceb2672979fbab85f845c1411f8b79 Mon Sep 17 00:00:00 2001 From: Josh Karpel Date: Wed, 26 Jun 2024 18:05:04 -0500 Subject: [PATCH 06/21] more flattening --- examples/dag.yaml | 18 ++++++------------ synth.yaml | 15 +++++---------- synthesize/config.py | 18 ++++-------------- 3 files changed, 15 insertions(+), 36 deletions(-) diff --git a/examples/dag.yaml b/examples/dag.yaml index fb4f9c2..d8cdc5e 100644 --- a/examples/dag.yaml +++ b/examples/dag.yaml @@ -1,27 +1,21 @@ flows: default: 1: - target: - id: sleep-and-echo + target: sleep-and-echo 2: - target: - id: sleep-and-echo + target: sleep-and-echo 3: - target: - id: sleep-and-echo + target: sleep-and-echo 4: - target: - id: sleep-and-echo + target: sleep-and-echo trigger: after: ["1", "2"] 5: - target: - id: sleep-and-echo + target: sleep-and-echo trigger: after: ["3"] 6: - target: - id: sleep-and-echo + target: sleep-and-echo trigger: after: ["4", "5"] diff --git a/synth.yaml b/synth.yaml index 753dad1..812de2a 100644 --- a/synth.yaml +++ b/synth.yaml @@ -1,18 +1,13 @@ flows: default: tests: - target: - id: tests - trigger: - id: code-changes + target: tests + trigger: code-changes types: - target: - id: types - trigger: - id: code-changes + target: types + trigger: code-changes docs: - target: - id: docs + target: docs trigger: type: restart diff --git a/synthesize/config.py b/synthesize/config.py index c73832f..aecd53a 100644 --- a/synthesize/config.py +++ b/synthesize/config.py @@ -153,20 +153,12 @@ def write_script(self, tmp_dir: Path) -> Path: return path -class TargetRef(Model): - id: str - - -class TriggerRef(Model): - id: str - - class UnresolvedFlowNode(Model): - target: Target | TargetRef + target: Target | str args: Args envs: Envs - trigger: AnyTrigger | TriggerRef = Once() + trigger: AnyTrigger | str = Once() color: Annotated[str, Field(default_factory=random_color)] @@ -178,12 +170,10 @@ def resolve( ) -> FlowNode: return FlowNode( id=id, - target=targets[self.target.id] if isinstance(self.target, TargetRef) else self.target, + target=targets[self.target] if isinstance(self.target, str) else self.target, args=self.args, envs=self.envs, - trigger=( - triggers[self.trigger.id] if isinstance(self.trigger, TriggerRef) else self.trigger - ), + trigger=(triggers[self.trigger] if isinstance(self.trigger, str) else self.trigger), color=self.color, ) From cf22ab3e8cc5fbcf06feb0facd64625711e7c119 Mon Sep 17 00:00:00 2001 From: Josh Karpel Date: Wed, 26 Jun 2024 18:19:14 -0500 Subject: [PATCH 07/21] change structure again --- examples/dag.yaml | 37 +++++++++++++++++++------------------ synth.yaml | 21 +++++++++++---------- synthesize/config.py | 36 ++++++++++++++++++++++++++++-------- synthesize/execution.py | 10 +++++++--- synthesize/orchestrator.py | 3 ++- synthesize/state.py | 8 ++++---- 6 files changed, 71 insertions(+), 44 deletions(-) diff --git a/examples/dag.yaml b/examples/dag.yaml index d8cdc5e..3a98ff4 100644 --- a/examples/dag.yaml +++ b/examples/dag.yaml @@ -1,23 +1,24 @@ flows: default: - 1: - target: sleep-and-echo - 2: - target: sleep-and-echo - 3: - target: sleep-and-echo - 4: - target: sleep-and-echo - trigger: - after: ["1", "2"] - 5: - target: sleep-and-echo - trigger: - after: ["3"] - 6: - target: sleep-and-echo - trigger: - after: ["4", "5"] + nodes: + 1: + target: sleep-and-echo + 2: + target: sleep-and-echo + 3: + target: sleep-and-echo + 4: + target: sleep-and-echo + trigger: + after: ["1", "2"] + 5: + target: sleep-and-echo + trigger: + after: ["3"] + 6: + target: sleep-and-echo + trigger: + after: ["4", "5"] targets: sleep-and-echo: diff --git a/synth.yaml b/synth.yaml index 812de2a..afe6e03 100644 --- a/synth.yaml +++ b/synth.yaml @@ -1,15 +1,16 @@ flows: default: - tests: - target: tests - trigger: code-changes - types: - target: types - trigger: code-changes - docs: - target: docs - trigger: - type: restart + nodes: + tests: + target: tests + trigger: code-changes + types: + target: types + trigger: code-changes + docs: + target: docs + trigger: + type: restart targets: tests: diff --git a/synthesize/config.py b/synthesize/config.py index aecd53a..bb00c10 100644 --- a/synthesize/config.py +++ b/synthesize/config.py @@ -136,13 +136,14 @@ class FlowNode(Model): color: str - def write_script(self, tmp_dir: Path) -> Path: + def write_script(self, tmp_dir: Path, flow_args: Args) -> Path: path = tmp_dir / f"{self.id}-{md5(self.model_dump_json().encode())}" path.parent.mkdir(parents=True, exist_ok=True) path.write_text( self.target.render( - args=self.args + args=flow_args + | self.args | { "id": self.id, } @@ -178,8 +179,30 @@ def resolve( ) -UnresolvedFlow = dict[str, UnresolvedFlowNode] -Flow = dict[str, FlowNode] +class Flow(Model): + nodes: dict[str, FlowNode] + args: Args = Field(default_factory=frozendict) + envs: Envs = Field(default_factory=frozendict) + + +class UnresolvedFlow(Model): + nodes: dict[str, UnresolvedFlowNode] + args: Args = Field(default_factory=frozendict) + envs: Envs = Field(default_factory=frozendict) + + def resolve( + self, + targets: dict[str, Target], + triggers: dict[str, AnyTrigger], + ) -> Flow: + return Flow( + nodes={ + node_id: node.resolve(node_id, targets, triggers) + for node_id, node in self.nodes.items() + }, + args=self.args, + envs=self.envs, + ) class Config(Model): @@ -198,9 +221,6 @@ def from_file(cls, file: Path) -> Config: def resolve(self) -> dict[str, Flow]: return { - flow_id: { - node_id: node.resolve(node_id, self.targets, self.triggers) - for node_id, node in flow.items() - } + flow_id: flow.resolve(self.targets, self.triggers) for flow_id, flow in self.flows.items() } diff --git a/synthesize/execution.py b/synthesize/execution.py index eab1527..00bac77 100644 --- a/synthesize/execution.py +++ b/synthesize/execution.py @@ -7,7 +7,9 @@ from pathlib import Path from signal import SIGKILL, SIGTERM -from synthesize.config import FlowNode +from frozendict import frozendict + +from synthesize.config import Flow, FlowNode from synthesize.messages import ExecutionCompleted, ExecutionOutput, ExecutionStarted, Message @@ -26,17 +28,19 @@ class Execution: async def start( cls, node: FlowNode, + flow: Flow, events: Queue[Message], tmp_dir: Path, width: int = 80, ) -> Execution: - path = node.write_script(tmp_dir=tmp_dir) + path = node.write_script(tmp_dir=tmp_dir, flow_args=flow.args) process = await create_subprocess_exec( program=path, stdout=PIPE, stderr=STDOUT, - env=os.environ + env=frozendict(os.environ) + | flow.envs | node.envs | { "FORCE_COLOR": "1", diff --git a/synthesize/orchestrator.py b/synthesize/orchestrator.py index b23db3f..764d042 100644 --- a/synthesize/orchestrator.py +++ b/synthesize/orchestrator.py @@ -122,6 +122,7 @@ async def start_ready_targets(self, tmp_dir: Path) -> None: async def start() -> None: e = await Execution.start( node=ready_node, + flow=self.flow, events=self.inbox, width=self.console.width - self.renderer.prefix_width, tmp_dir=tmp_dir, @@ -136,7 +137,7 @@ async def start() -> None: await start() async def start_watchers(self) -> None: - for node in self.flow.values(): + for node in self.flow.nodes.values(): if isinstance(node.trigger, Watch): self.watchers[node.id] = create_task( watch( diff --git a/synthesize/state.py b/synthesize/state.py index b325151..69f5475 100644 --- a/synthesize/state.py +++ b/synthesize/state.py @@ -25,7 +25,7 @@ class FlowState: def from_flow(cls, flow: Flow) -> FlowState: graph = DiGraph() - for id, node in flow.items(): + for id, node in flow.nodes.items(): graph.add_node(id) if isinstance(node.trigger, After): for predecessor_id in node.trigger.after: @@ -39,14 +39,14 @@ def from_flow(cls, flow: Flow) -> FlowState: def running_nodes(self) -> set[FlowNode]: return { - self.flow[id] + self.flow.nodes[id] for id, status in self.statuses.items() if status is FlowNodeStatus.Running } def ready_nodes(self) -> set[FlowNode]: return { - self.flow[id] + self.flow.nodes[id] for id in self.graph.nodes if self.statuses[id] is FlowNodeStatus.Pending and all(self.statuses[a] is FlowNodeStatus.Succeeded for a in ancestors(self.graph, id)) @@ -75,7 +75,7 @@ def num_nodes(self) -> int: return len(self.graph) def nodes(self) -> set[FlowNode]: - return set(self.flow.values()) + return set(self.flow.nodes.values()) def _ancestors(graph: DiGraph, nodes: set[str]) -> set[str]: From 093253356cb6b9d8390a05740dba0173224b01cf Mon Sep 17 00:00:00 2001 From: Josh Karpel Date: Wed, 26 Jun 2024 18:42:25 -0500 Subject: [PATCH 08/21] more flexible arg values --- synthesize/config.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/synthesize/config.py b/synthesize/config.py index bb00c10..3062d25 100644 --- a/synthesize/config.py +++ b/synthesize/config.py @@ -18,8 +18,6 @@ from synthesize.model import Model from synthesize.utils import FrozenDict, md5 -ArgValue = int | float | str | bool | None - Args = Annotated[ FrozenDict[ Annotated[ @@ -30,7 +28,7 @@ min_length=1, ), ], - ArgValue, + object, ], Field(default_factory=frozendict), ] From f902416f072f4e7eebaef39f42689d44c11eba97 Mon Sep 17 00:00:00 2001 From: Josh Karpel Date: Wed, 26 Jun 2024 18:45:15 -0500 Subject: [PATCH 09/21] reorg --- synthesize/config.py | 4 ++-- synthesize/execution.py | 9 +++++---- synthesize/orchestrator.py | 3 ++- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/synthesize/config.py b/synthesize/config.py index 3062d25..da13988 100644 --- a/synthesize/config.py +++ b/synthesize/config.py @@ -134,13 +134,13 @@ class FlowNode(Model): color: str - def write_script(self, tmp_dir: Path, flow_args: Args) -> Path: + def write_script(self, tmp_dir: Path, args: Args) -> Path: path = tmp_dir / f"{self.id}-{md5(self.model_dump_json().encode())}" path.parent.mkdir(parents=True, exist_ok=True) path.write_text( self.target.render( - args=flow_args + args=args | self.args | { "id": self.id, diff --git a/synthesize/execution.py b/synthesize/execution.py index 00bac77..2a85a0b 100644 --- a/synthesize/execution.py +++ b/synthesize/execution.py @@ -9,7 +9,7 @@ from frozendict import frozendict -from synthesize.config import Flow, FlowNode +from synthesize.config import Args, Envs, FlowNode from synthesize.messages import ExecutionCompleted, ExecutionOutput, ExecutionStarted, Message @@ -28,19 +28,20 @@ class Execution: async def start( cls, node: FlowNode, - flow: Flow, + args: Args, + envs: Envs, events: Queue[Message], tmp_dir: Path, width: int = 80, ) -> Execution: - path = node.write_script(tmp_dir=tmp_dir, flow_args=flow.args) + path = node.write_script(tmp_dir=tmp_dir, args=args) process = await create_subprocess_exec( program=path, stdout=PIPE, stderr=STDOUT, env=frozendict(os.environ) - | flow.envs + | envs | node.envs | { "FORCE_COLOR": "1", diff --git a/synthesize/orchestrator.py b/synthesize/orchestrator.py index 764d042..2334998 100644 --- a/synthesize/orchestrator.py +++ b/synthesize/orchestrator.py @@ -122,7 +122,8 @@ async def start_ready_targets(self, tmp_dir: Path) -> None: async def start() -> None: e = await Execution.start( node=ready_node, - flow=self.flow, + args=self.flow.args, + envs=self.flow.envs, events=self.inbox, width=self.console.width - self.renderer.prefix_width, tmp_dir=tmp_dir, From eef6906c7775cfc8de57411599fbf9c2485d2c51 Mon Sep 17 00:00:00 2001 From: Josh Karpel Date: Wed, 26 Jun 2024 18:46:12 -0500 Subject: [PATCH 10/21] remove unnecessary field --- synthesize/orchestrator.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/synthesize/orchestrator.py b/synthesize/orchestrator.py index 2334998..203de15 100644 --- a/synthesize/orchestrator.py +++ b/synthesize/orchestrator.py @@ -35,7 +35,6 @@ def __init__(self, flow: Flow, console: Console): self.inbox: Queue[Message] = Queue() self.executions: dict[str, Execution] = {} - self.waiters: dict[str, Task[Execution]] = {} self.watchers: dict[str, Task[None]] = {} self.heartbeat: Task[None] | None = None @@ -129,7 +128,6 @@ async def start() -> None: tmp_dir=tmp_dir, ) self.executions[ready_node.id] = e - self.waiters[ready_node.id] = create_task(e.wait()) # When restarting after first execution, delay if isinstance(ready_node.trigger, Restart) and ready_node.id in self.executions: From 1cce1b09f12eb86b9b9c0b0527e6c4a14ee15a13 Mon Sep 17 00:00:00 2001 From: Josh Karpel Date: Wed, 26 Jun 2024 18:51:23 -0500 Subject: [PATCH 11/21] move things around --- synthesize/config.py | 20 +------------------- synthesize/execution.py | 26 +++++++++++++++++++++++--- synthesize/orchestrator.py | 4 ++-- 3 files changed, 26 insertions(+), 24 deletions(-) diff --git a/synthesize/config.py b/synthesize/config.py index da13988..439520b 100644 --- a/synthesize/config.py +++ b/synthesize/config.py @@ -5,7 +5,6 @@ from colorsys import hsv_to_rgb from pathlib import Path from random import random -from stat import S_IEXEC from textwrap import dedent from typing import Annotated, Literal, Union @@ -16,7 +15,7 @@ from rich.color import Color from synthesize.model import Model -from synthesize.utils import FrozenDict, md5 +from synthesize.utils import FrozenDict Args = Annotated[ FrozenDict[ @@ -134,23 +133,6 @@ class FlowNode(Model): color: str - def write_script(self, tmp_dir: Path, args: Args) -> Path: - path = tmp_dir / f"{self.id}-{md5(self.model_dump_json().encode())}" - - path.parent.mkdir(parents=True, exist_ok=True) - path.write_text( - self.target.render( - args=args - | self.args - | { - "id": self.id, - } - ) - ) - path.chmod(path.stat().st_mode | S_IEXEC) - - return path - class UnresolvedFlowNode(Model): target: Target | str diff --git a/synthesize/execution.py b/synthesize/execution.py index 2a85a0b..6a75589 100644 --- a/synthesize/execution.py +++ b/synthesize/execution.py @@ -6,11 +6,31 @@ from dataclasses import dataclass, field from pathlib import Path from signal import SIGKILL, SIGTERM +from stat import S_IEXEC from frozendict import frozendict from synthesize.config import Args, Envs, FlowNode from synthesize.messages import ExecutionCompleted, ExecutionOutput, ExecutionStarted, Message +from synthesize.utils import md5 + + +def write_script(node: FlowNode, args: Args, tmp_dir: Path) -> Path: + path = tmp_dir / f"{node.id}-{md5(node.model_dump_json().encode())}" + + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text( + node.target.render( + args=args + | node.args + | { + "id": node.id, + } + ) + ) + path.chmod(path.stat().st_mode | S_IEXEC) + + return path @dataclass(frozen=True) @@ -30,11 +50,11 @@ async def start( node: FlowNode, args: Args, envs: Envs, - events: Queue[Message], tmp_dir: Path, - width: int = 80, + width: int, + events: Queue[Message], ) -> Execution: - path = node.write_script(tmp_dir=tmp_dir, args=args) + path = write_script(node=node, args=args, tmp_dir=tmp_dir) process = await create_subprocess_exec( program=path, diff --git a/synthesize/orchestrator.py b/synthesize/orchestrator.py index 203de15..937fec5 100644 --- a/synthesize/orchestrator.py +++ b/synthesize/orchestrator.py @@ -123,9 +123,9 @@ async def start() -> None: node=ready_node, args=self.flow.args, envs=self.flow.envs, - events=self.inbox, - width=self.console.width - self.renderer.prefix_width, tmp_dir=tmp_dir, + width=self.console.width - self.renderer.prefix_width, + events=self.inbox, ) self.executions[ready_node.id] = e From 2deb57a46a8be6c0f5f896030d3933ffa0bd9c72 Mon Sep 17 00:00:00 2001 From: Josh Karpel Date: Wed, 26 Jun 2024 18:52:08 -0500 Subject: [PATCH 12/21] drop field --- synthesize/execution.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/synthesize/execution.py b/synthesize/execution.py index 6a75589..a6d8b08 100644 --- a/synthesize/execution.py +++ b/synthesize/execution.py @@ -42,8 +42,6 @@ class Execution: process: Process reader: Task[None] - width: int - @classmethod async def start( cls, @@ -89,7 +87,6 @@ async def start( events=events, process=process, reader=reader, - width=width, ) @property From 34ab8eb1b0cec9b77d6e179b09f2da103054c686 Mon Sep 17 00:00:00 2001 From: Josh Karpel Date: Wed, 26 Jun 2024 18:55:14 -0500 Subject: [PATCH 13/21] restore waiters --- synthesize/orchestrator.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/synthesize/orchestrator.py b/synthesize/orchestrator.py index 937fec5..457260f 100644 --- a/synthesize/orchestrator.py +++ b/synthesize/orchestrator.py @@ -35,6 +35,7 @@ def __init__(self, flow: Flow, console: Console): self.inbox: Queue[Message] = Queue() self.executions: dict[str, Execution] = {} + self.waiters: dict[str, Task[Execution]] = {} self.watchers: dict[str, Task[None]] = {} self.heartbeat: Task[None] | None = None @@ -128,6 +129,7 @@ async def start() -> None: events=self.inbox, ) self.executions[ready_node.id] = e + self.waiters[ready_node.id] = create_task(e.wait()) # When restarting after first execution, delay if isinstance(ready_node.trigger, Restart) and ready_node.id in self.executions: From 59717458b7a4df95a72aaacdae46bc3d02f7179d Mon Sep 17 00:00:00 2001 From: Josh Karpel Date: Wed, 26 Jun 2024 22:19:10 -0500 Subject: [PATCH 14/21] config tests --- pyproject.toml | 1 + synth.yaml | 3 +- synthesize/config.py | 76 ++++++----- tests/test_config.py | 294 ++++++++++++++++++++++++++++++++++++++++++- 4 files changed, 331 insertions(+), 43 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 047510f..caf9d93 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -91,6 +91,7 @@ warn_redundant_casts = true ignore_missing_imports = true +plugins = "pydantic.mypy" [tool.ruff] line-length = 120 diff --git a/synth.yaml b/synth.yaml index afe6e03..560406f 100644 --- a/synth.yaml +++ b/synth.yaml @@ -15,7 +15,7 @@ flows: targets: tests: commands: | - pytest --cov + pytest -vv --cov types: commands: | @@ -32,3 +32,4 @@ triggers: - synthesize/ - tests/ - examples/ + - pyproject.toml diff --git a/synthesize/config.py b/synthesize/config.py index 439520b..f0c856c 100644 --- a/synthesize/config.py +++ b/synthesize/config.py @@ -2,6 +2,7 @@ import shlex import shutil +from collections.abc import Mapping from colorsys import hsv_to_rgb from pathlib import Path from random import random @@ -17,31 +18,25 @@ from synthesize.model import Model from synthesize.utils import FrozenDict -Args = Annotated[ - FrozenDict[ - Annotated[ - str, - Field( - # https://jinja.palletsprojects.com/en/3.1.x/api/#notes-on-identifiers - pattern=r"[a-zA-Z_][a-zA-Z0-9_]*(\.[a-zA-Z_][a-zA-Z0-9_]*)*", - min_length=1, - ), - ], - object, +Args = FrozenDict[ + Annotated[ + str, + Field( + # https://jinja.palletsprojects.com/en/3.1.x/api/#notes-on-identifiers + pattern=r"[a-zA-Z_][a-zA-Z0-9_]*(\.[a-zA-Z_][a-zA-Z0-9_]*)*", + min_length=1, + ), ], - Field(default_factory=frozendict), + object, ] -Envs = Annotated[ - FrozenDict[ - Annotated[ - str, - Field( - min_length=1, - ), - ], +Envs = FrozenDict[ + Annotated[ str, + Field( + min_length=1, + ), ], - Field(default_factory=frozendict), + str, ] @@ -58,8 +53,8 @@ def random_color() -> str: class Target(Model): - commands: str = Field(default="") - executable: str = Field(default="sh -u") + commands: str = "" + executable: str = "sh -u" @field_validator("commands") @classmethod @@ -101,11 +96,10 @@ class Restart(Model): delay: Annotated[ float, Field( - default=1, description="The delay before restarting the command after it exits.", ge=0, ), - ] + ] = 1 class Watch(Model): @@ -126,8 +120,8 @@ class FlowNode(Model): id: str target: Target - args: Args - envs: Envs + args: Args = frozendict() + envs: Envs = frozendict() trigger: AnyTrigger @@ -136,8 +130,8 @@ class FlowNode(Model): class UnresolvedFlowNode(Model): target: Target | str - args: Args - envs: Envs + args: Args = frozendict() + envs: Envs = frozendict() trigger: AnyTrigger | str = Once() @@ -146,8 +140,8 @@ class UnresolvedFlowNode(Model): def resolve( self, id: str, - targets: dict[str, Target], - triggers: dict[str, AnyTrigger], + targets: Mapping[str, Target], + triggers: Mapping[str, AnyTrigger], ) -> FlowNode: return FlowNode( id=id, @@ -161,19 +155,19 @@ def resolve( class Flow(Model): nodes: dict[str, FlowNode] - args: Args = Field(default_factory=frozendict) - envs: Envs = Field(default_factory=frozendict) + args: Args = frozendict() + envs: Envs = frozendict() class UnresolvedFlow(Model): nodes: dict[str, UnresolvedFlowNode] - args: Args = Field(default_factory=frozendict) - envs: Envs = Field(default_factory=frozendict) + args: Args = frozendict() + envs: Envs = frozendict() def resolve( self, - targets: dict[str, Target], - triggers: dict[str, AnyTrigger], + targets: Mapping[str, Target], + triggers: Mapping[str, AnyTrigger], ) -> Flow: return Flow( nodes={ @@ -186,9 +180,9 @@ def resolve( class Config(Model): - targets: Annotated[dict[str, Target], Field(default_factory=dict)] - triggers: Annotated[dict[str, AnyTrigger], Field(default_factory=dict)] - flows: Annotated[dict[str, UnresolvedFlow], Field(default_factory=dict)] + flows: FrozenDict[str, UnresolvedFlow] = frozendict() + targets: FrozenDict[str, Target] = frozendict() + triggers: FrozenDict[str, AnyTrigger] = frozendict() @classmethod def from_file(cls, file: Path) -> Config: @@ -199,7 +193,7 @@ def from_file(cls, file: Path) -> Config: else: raise NotImplementedError("Currently, only YAML files are supported.") - def resolve(self) -> dict[str, Flow]: + def resolve(self) -> Mapping[str, Flow]: return { flow_id: flow.resolve(self.targets, self.triggers) for flow_id, flow in self.flows.items() diff --git a/tests/test_config.py b/tests/test_config.py index 1ecab05..026b313 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -1,9 +1,23 @@ +import shutil from pathlib import Path import pytest +from frozendict import frozendict from rich.style import Style -from synthesize.config import Config, random_color +from synthesize.config import ( + AnyTrigger, + Args, + Config, + Flow, + FlowNode, + Once, + Restart, + Target, + UnresolvedFlow, + UnresolvedFlowNode, + random_color, +) @pytest.mark.parametrize("example", list((Path(__file__).parent.parent / "examples").iterdir())) @@ -13,3 +27,281 @@ def test_config_examples_parse(example: Path) -> None: def test_can_make_style_from_random_color() -> None: assert Style(color=random_color()) + + +@pytest.mark.parametrize( + ("raw", "expected"), + ( + ("echo 'hello'", "echo 'hello'"), + (" echo 'hello'", "echo 'hello'"), + (" echo ", "echo"), + ( + """ + echo + """, + "echo", + ), + ( + """ + echo + echo + """, + "echo\necho", + ), + ), +) +def test_target_commands_dedenting(raw: str, expected: str) -> None: + assert Target(commands=raw).commands == expected + + +@pytest.mark.parametrize( + ("target", "args", "expected"), + ( + ( + Target(commands=""), + Args(), + f"#!{shutil.which('sh')} -u\n", + ), + ( + Target(commands="echo 'hello'"), + Args(), + f"#!{shutil.which('sh')} -u\n\necho 'hello'", + ), + ( + Target(commands="echo '{{foo}}'"), + Args({"foo": "bar"}), + f"#!{shutil.which('sh')} -u\n\necho 'bar'", + ), + ( # unused values are ok + Target(commands="echo '{{foo}}'"), + Args({"foo": "bar", "baz": "qux"}), + f"#!{shutil.which('sh')} -u\n\necho 'bar'", + ), + ( + Target(commands="echo {{foo}} {{baz}}"), + Args({"foo": "bar", "baz": "qux"}), + f"#!{shutil.which('sh')} -u\n\necho bar qux", + ), + ( + Target(commands="echo", executable="bash"), + Args(), + f"#!{shutil.which('bash')}\n\necho", + ), + ( + Target(commands="{{ 'yes' if choice else 'no' }}"), + Args({"choice": True}), + f"#!{shutil.which('sh')} -u\n\nyes", + ), + ( + Target(commands="{{ 'yes' if choice else 'no' }}"), + Args({"choice": False}), + f"#!{shutil.which('sh')} -u\n\nno", + ), + ), +) +def test_target_rendering(target: Target, args: Args, expected: str) -> None: + assert target.render(args) == expected + + +def test_rendering_fails_for_bogus_executable() -> None: + with pytest.raises(Exception): + Target(executable="bogus").render(Args()) + + +color = random_color() + + +@pytest.mark.parametrize( + ("unresolved_node", "id", "targets", "triggers", "expected"), + ( + ( + UnresolvedFlowNode( + target=Target(commands="echo"), + trigger=Once(), + color=color, + ), + "foo", + {}, + {}, + FlowNode( + id="foo", + target=Target(commands="echo"), + trigger=Once(), + color=color, + ), + ), + ( + UnresolvedFlowNode( + target="t", + trigger=Once(), + color=color, + ), + "foo", + {"t": Target(commands="echo")}, + {}, + FlowNode( + id="foo", + target=Target(commands="echo"), + trigger=Once(), + color=color, + ), + ), + ( + UnresolvedFlowNode( + target=Target(commands="echo"), + trigger="r", + color=color, + ), + "foo", + {}, + {"r": Once()}, + FlowNode( + id="foo", + target=Target(commands="echo"), + trigger=Once(), + color=color, + ), + ), + ( + UnresolvedFlowNode( + target="t", + trigger="r", + color=color, + ), + "foo", + {"t": Target(commands="echo")}, + {"r": Once()}, + FlowNode( + id="foo", + target=Target(commands="echo"), + trigger=Once(), + color=color, + ), + ), + ), +) +def test_resolve_flow_node( + unresolved_node: UnresolvedFlowNode, + id: str, + targets: dict[str, Target], + triggers: dict[str, AnyTrigger], + expected: FlowNode, +) -> None: + assert unresolved_node.resolve(id, targets, triggers) == expected + + +@pytest.mark.parametrize( + ("unresolved_flow", "targets", "triggers", "expected"), + ( + ( + UnresolvedFlow( + nodes={ + "foo": UnresolvedFlowNode( + target=Target(commands="echo"), + trigger=Once(), + color=color, + ) + } + ), + {}, + {}, + Flow( + nodes={ + "foo": FlowNode( + id="foo", + target=Target(commands="echo"), + trigger=Once(), + color=color, + ) + } + ), + ), + ( + UnresolvedFlow( + nodes={ + "foo": UnresolvedFlowNode( + target="t", + args=frozendict({"foo": "bar"}), + envs=frozendict({"FOO": "BAR"}), + trigger="r", + color=color, + ) + }, + args=frozendict({"baz": "qux"}), + envs=frozendict({"BAZ": "QUX"}), + ), + {"t": Target(commands="echo")}, + {"r": Restart()}, + Flow( + nodes={ + "foo": FlowNode( + id="foo", + target=Target(commands="echo"), + args=frozendict({"foo": "bar"}), + envs=frozendict({"FOO": "BAR"}), + trigger=Restart(), + color=color, + ) + }, + args=frozendict({"baz": "qux"}), + envs=frozendict({"BAZ": "QUX"}), + ), + ), + ), +) +def test_resolve_flow( + unresolved_flow: UnresolvedFlow, + targets: dict[str, Target], + triggers: dict[str, AnyTrigger], + expected: Flow, +) -> None: + assert unresolved_flow.resolve(targets, triggers) == expected + + +@pytest.mark.parametrize( + ("config", "expected"), + ( + ( + Config( + flows={ + "flow": UnresolvedFlow( + nodes={ + "foo": UnresolvedFlowNode( + target="t", + args=frozendict({"foo": "bar"}), + envs=frozendict({"FOO": "BAR"}), + trigger="r", + color=color, + ) + }, + args=frozendict({"baz": "qux"}), + envs=frozendict({"BAZ": "QUX"}), + ) + }, + targets={"t": Target(commands="echo")}, + triggers={"r": Restart()}, + ), + { + "flow": Flow( + nodes={ + "foo": FlowNode( + id="foo", + target=Target(commands="echo"), + args=frozendict({"foo": "bar"}), + envs=frozendict({"FOO": "BAR"}), + trigger=Restart(), + color=color, + ) + }, + args=frozendict({"baz": "qux"}), + envs=frozendict({"BAZ": "QUX"}), + ), + }, + ), + ), +) +def test_resolve_config( + config: Config, + expected: dict[str, Flow], +) -> None: + assert config.resolve() == expected From f3e9344cd87be239f305449da1c3766e89fe8557 Mon Sep 17 00:00:00 2001 From: Josh Karpel Date: Wed, 26 Jun 2024 22:20:02 -0500 Subject: [PATCH 15/21] typo --- synthesize/orchestrator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synthesize/orchestrator.py b/synthesize/orchestrator.py index 457260f..d6ae6b7 100644 --- a/synthesize/orchestrator.py +++ b/synthesize/orchestrator.py @@ -43,7 +43,7 @@ async def run(self) -> None: if not self.state.nodes(): return - with TemporaryDirectory(prefix="snyth-") as tmpdir, self.renderer: + with TemporaryDirectory(prefix="synth-") as tmpdir, self.renderer: tmp_dir = Path(tmpdir) try: From 2681d834a1e6c1879e033b67fb44bac9011f4727 Mon Sep 17 00:00:00 2001 From: Josh Karpel Date: Wed, 26 Jun 2024 22:33:38 -0500 Subject: [PATCH 16/21] strip out frozendict --- poetry.lock | 44 +------------------------------------- pyproject.toml | 14 ++++++------ synthesize/config.py | 29 ++++++++++++------------- synthesize/execution.py | 4 +--- synthesize/orchestrator.py | 2 +- synthesize/renderer.py | 8 +++---- synthesize/state.py | 17 ++++++++------- synthesize/utils.py | 43 +------------------------------------ tests/test_config.py | 33 ++++++++++++++-------------- 9 files changed, 54 insertions(+), 140 deletions(-) diff --git a/poetry.lock b/poetry.lock index e50dc7e..062c891 100644 --- a/poetry.lock +++ b/poetry.lock @@ -320,48 +320,6 @@ docs = ["furo (>=2023.9.10)", "sphinx (>=7.2.6)", "sphinx-autodoc-typehints (>=1 testing = ["covdefaults (>=2.3)", "coverage (>=7.3.2)", "diff-cover (>=8.0.1)", "pytest (>=7.4.3)", "pytest-asyncio (>=0.21)", "pytest-cov (>=4.1)", "pytest-mock (>=3.12)", "pytest-timeout (>=2.2)", "virtualenv (>=20.26.2)"] typing = ["typing-extensions (>=4.8)"] -[[package]] -name = "frozendict" -version = "2.4.4" -description = "A simple immutable dictionary" -optional = false -python-versions = ">=3.6" -files = [ - {file = "frozendict-2.4.4-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:4a59578d47b3949437519b5c39a016a6116b9e787bb19289e333faae81462e59"}, - {file = "frozendict-2.4.4-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:12a342e439aef28ccec533f0253ea53d75fe9102bd6ea928ff530e76eac38906"}, - {file = "frozendict-2.4.4-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:7f79c26dff10ce11dad3b3627c89bb2e87b9dd5958c2b24325f16a23019b8b94"}, - {file = "frozendict-2.4.4-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:2bd009cf4fc47972838a91e9b83654dc9a095dc4f2bb3a37c3f3124c8a364543"}, - {file = "frozendict-2.4.4-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:87ebcde21565a14fe039672c25550060d6f6d88cf1f339beac094c3b10004eb0"}, - {file = "frozendict-2.4.4-cp310-cp310-win_amd64.whl", hash = "sha256:fefeb700bc7eb8b4c2dc48704e4221860d254c8989fb53488540bc44e44a1ac2"}, - {file = "frozendict-2.4.4-cp310-cp310-win_arm64.whl", hash = "sha256:4297d694eb600efa429769125a6f910ec02b85606f22f178bafbee309e7d3ec7"}, - {file = "frozendict-2.4.4-cp36-cp36m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:812ab17522ba13637826e65454115a914c2da538356e85f43ecea069813e4b33"}, - {file = "frozendict-2.4.4-cp36-cp36m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:7fee9420475bb6ff357000092aa9990c2f6182b2bab15764330f4ad7de2eae49"}, - {file = "frozendict-2.4.4-cp36-cp36m-musllinux_1_1_aarch64.whl", hash = "sha256:3148062675536724502c6344d7c485dd4667fdf7980ca9bd05e338ccc0c4471e"}, - {file = "frozendict-2.4.4-cp36-cp36m-musllinux_1_1_x86_64.whl", hash = "sha256:78c94991944dd33c5376f720228e5b252ee67faf3bac50ef381adc9e51e90d9d"}, - {file = "frozendict-2.4.4-cp36-cp36m-win_amd64.whl", hash = "sha256:1697793b5f62b416c0fc1d94638ec91ed3aa4ab277f6affa3a95216ecb3af170"}, - {file = "frozendict-2.4.4-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:199a4d32194f3afed6258de7e317054155bc9519252b568d9cfffde7e4d834e5"}, - {file = "frozendict-2.4.4-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:85375ec6e979e6373bffb4f54576a68bf7497c350861d20686ccae38aab69c0a"}, - {file = "frozendict-2.4.4-cp37-cp37m-musllinux_1_1_aarch64.whl", hash = "sha256:2d8536e068d6bf281f23fa835ac07747fb0f8851879dd189e9709f9567408b4d"}, - {file = "frozendict-2.4.4-cp37-cp37m-musllinux_1_1_x86_64.whl", hash = "sha256:259528ba6b56fa051bc996f1c4d8b57e30d6dd3bc2f27441891b04babc4b5e73"}, - {file = "frozendict-2.4.4-cp37-cp37m-win_amd64.whl", hash = "sha256:07c3a5dee8bbb84cba770e273cdbf2c87c8e035903af8f781292d72583416801"}, - {file = "frozendict-2.4.4-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:6874fec816b37b6eb5795b00e0574cba261bf59723e2de607a195d5edaff0786"}, - {file = "frozendict-2.4.4-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:c8f92425686323a950337da4b75b4c17a3327b831df8c881df24038d560640d4"}, - {file = "frozendict-2.4.4-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:5d58d9a8d9e49662c6dafbea5e641f97decdb3d6ccd76e55e79818415362ba25"}, - {file = "frozendict-2.4.4-cp38-cp38-musllinux_1_1_aarch64.whl", hash = "sha256:93a7b19afb429cbf99d56faf436b45ef2fa8fe9aca89c49eb1610c3bd85f1760"}, - {file = "frozendict-2.4.4-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:2b70b431e3a72d410a2cdf1497b3aba2f553635e0c0f657ce311d841bf8273b6"}, - {file = "frozendict-2.4.4-cp38-cp38-win_amd64.whl", hash = "sha256:e1b941132d79ce72d562a13341d38fc217bc1ee24d8c35a20d754e79ff99e038"}, - {file = "frozendict-2.4.4-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:dc2228874eacae390e63fd4f2bb513b3144066a977dc192163c9f6c7f6de6474"}, - {file = "frozendict-2.4.4-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:63aa49f1919af7d45fb8fd5dec4c0859bc09f46880bd6297c79bb2db2969b63d"}, - {file = "frozendict-2.4.4-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:c6bf9260018d653f3cab9bd147bd8592bf98a5c6e338be0491ced3c196c034a3"}, - {file = "frozendict-2.4.4-cp39-cp39-musllinux_1_1_aarch64.whl", hash = "sha256:6eb716e6a6d693c03b1d53280a1947716129f5ef9bcdd061db5c17dea44b80fe"}, - {file = "frozendict-2.4.4-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:d13b4310db337f4d2103867c5a05090b22bc4d50ca842093779ef541ea9c9eea"}, - {file = "frozendict-2.4.4-cp39-cp39-win_amd64.whl", hash = "sha256:b3b967d5065872e27b06f785a80c0ed0a45d1f7c9b85223da05358e734d858ca"}, - {file = "frozendict-2.4.4-cp39-cp39-win_arm64.whl", hash = "sha256:4ae8d05c8d0b6134bfb6bfb369d5fa0c4df21eabb5ca7f645af95fdc6689678e"}, - {file = "frozendict-2.4.4-py311-none-any.whl", hash = "sha256:705efca8d74d3facbb6ace80ab3afdd28eb8a237bfb4063ed89996b024bc443d"}, - {file = "frozendict-2.4.4-py312-none-any.whl", hash = "sha256:d9647563e76adb05b7cde2172403123380871360a114f546b4ae1704510801e5"}, - {file = "frozendict-2.4.4.tar.gz", hash = "sha256:3f7c031b26e4ee6a3f786ceb5e3abf1181c4ade92dce1f847da26ea2c96008c7"}, -] - [[package]] name = "ghp-import" version = "2.1.0" @@ -1607,4 +1565,4 @@ anyio = ">=3.0.0" [metadata] lock-version = "2.0" python-versions = ">=3.10" -content-hash = "020dc21dc785e7e169ca706ec2f9acf0d8913ce51ef7763d2970df9edd6f17e9" +content-hash = "7397750614f0c947e02cd3f371b93574c032c69e8797d16df052bbc5f3ff1aa4" diff --git a/pyproject.toml b/pyproject.toml index caf9d93..3886025 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -41,7 +41,6 @@ networkx = ">=3.0" watchfiles = ">=0.18" identify = ">=2.5" jinja2 = ">=3.1" -frozendict = ">=2.4" [tool.poetry.group.dev.dependencies] pre-commit = ">=3" @@ -113,10 +112,11 @@ select = [ ] ignore = [ - "E501", # line length exceeds limit - "E741", # ambiguous variable name - "T201", # print - "T203", # pprint - "F403", # star imports, used for utilities - "F405", # star imports, used for utilities + "E501", # line length exceeds limit + "E741", # ambiguous variable name + "T201", # print + "T203", # pprint + "F403", # star imports, used for utilities + "F405", # star imports, used for utilities + "RUF012", # pydantic allows mutable class attributes ] diff --git a/synthesize/config.py b/synthesize/config.py index f0c856c..0740dd8 100644 --- a/synthesize/config.py +++ b/synthesize/config.py @@ -9,16 +9,14 @@ from textwrap import dedent from typing import Annotated, Literal, Union -from frozendict import frozendict from identify.identify import tags_from_path from jinja2 import Environment from pydantic import Field, field_validator from rich.color import Color from synthesize.model import Model -from synthesize.utils import FrozenDict -Args = FrozenDict[ +Args = dict[ Annotated[ str, Field( @@ -29,7 +27,7 @@ ], object, ] -Envs = FrozenDict[ +Envs = dict[ Annotated[ str, Field( @@ -120,8 +118,8 @@ class FlowNode(Model): id: str target: Target - args: Args = frozendict() - envs: Envs = frozendict() + args: Args = {} + envs: Envs = {} trigger: AnyTrigger @@ -130,8 +128,8 @@ class FlowNode(Model): class UnresolvedFlowNode(Model): target: Target | str - args: Args = frozendict() - envs: Envs = frozendict() + args: Args = {} + envs: Envs = {} trigger: AnyTrigger | str = Once() @@ -155,14 +153,14 @@ def resolve( class Flow(Model): nodes: dict[str, FlowNode] - args: Args = frozendict() - envs: Envs = frozendict() + args: Args = {} + envs: Envs = {} class UnresolvedFlow(Model): nodes: dict[str, UnresolvedFlowNode] - args: Args = frozendict() - envs: Envs = frozendict() + args: Args = {} + envs: Envs = {} def resolve( self, @@ -180,9 +178,9 @@ def resolve( class Config(Model): - flows: FrozenDict[str, UnresolvedFlow] = frozendict() - targets: FrozenDict[str, Target] = frozendict() - triggers: FrozenDict[str, AnyTrigger] = frozendict() + flows: dict[str, UnresolvedFlow] = {} + targets: dict[str, Target] = {} + triggers: dict[str, AnyTrigger] = {} @classmethod def from_file(cls, file: Path) -> Config: @@ -194,6 +192,7 @@ def from_file(cls, file: Path) -> Config: raise NotImplementedError("Currently, only YAML files are supported.") def resolve(self) -> Mapping[str, Flow]: + print(self.flows) return { flow_id: flow.resolve(self.targets, self.triggers) for flow_id, flow in self.flows.items() diff --git a/synthesize/execution.py b/synthesize/execution.py index a6d8b08..04135f1 100644 --- a/synthesize/execution.py +++ b/synthesize/execution.py @@ -8,8 +8,6 @@ from signal import SIGKILL, SIGTERM from stat import S_IEXEC -from frozendict import frozendict - from synthesize.config import Args, Envs, FlowNode from synthesize.messages import ExecutionCompleted, ExecutionOutput, ExecutionStarted, Message from synthesize.utils import md5 @@ -58,7 +56,7 @@ async def start( program=path, stdout=PIPE, stderr=STDOUT, - env=frozendict(os.environ) + env=os.environ | envs | node.envs | { diff --git a/synthesize/orchestrator.py b/synthesize/orchestrator.py index d6ae6b7..ff11505 100644 --- a/synthesize/orchestrator.py +++ b/synthesize/orchestrator.py @@ -40,7 +40,7 @@ def __init__(self, flow: Flow, console: Console): self.heartbeat: Task[None] | None = None async def run(self) -> None: - if not self.state.nodes(): + if not self.flow.nodes: return with TemporaryDirectory(prefix="synth-") as tmpdir, self.renderer: diff --git a/synthesize/renderer.py b/synthesize/renderer.py index 767e741..6bb4adc 100644 --- a/synthesize/renderer.py +++ b/synthesize/renderer.py @@ -63,17 +63,17 @@ def handle_message(self, message: Message) -> None: def info(self, event: Message) -> RenderableType: table = Table.grid(padding=(1, 1, 0, 0)) - running_targets = self.state.running_nodes() + running_nodes = self.state.running_nodes() running = ( Text.assemble( "Running ", Text(" ").join( Text(t.id, style=Style(color="black", bgcolor=t.color)) - for t in sorted(running_targets, key=lambda t: t.id) + for t in sorted(running_nodes, key=lambda t: t.id) ), ) - if running_targets + if running_nodes else Text() ) @@ -82,7 +82,7 @@ def info(self, event: Message) -> RenderableType: running, ) - return Group(Rule(style=(Style(color="green" if running_targets else "yellow"))), table) + return Group(Rule(style=(Style(color="green" if running_nodes else "yellow"))), table) def render_prefix( self, message: ExecutionOutput | ExecutionStarted | ExecutionCompleted | WatchPathChanged diff --git a/synthesize/state.py b/synthesize/state.py index 69f5475..3464298 100644 --- a/synthesize/state.py +++ b/synthesize/state.py @@ -1,5 +1,6 @@ from __future__ import annotations +from collections.abc import Collection, Iterator from dataclasses import dataclass from enum import Enum @@ -37,20 +38,20 @@ def from_flow(cls, flow: Flow) -> FlowState: statuses={id: FlowNodeStatus.Pending for id in graph.nodes}, ) - def running_nodes(self) -> set[FlowNode]: - return { + def running_nodes(self) -> Collection[FlowNode]: + return tuple( self.flow.nodes[id] for id, status in self.statuses.items() if status is FlowNodeStatus.Running - } + ) - def ready_nodes(self) -> set[FlowNode]: - return { + def ready_nodes(self) -> Collection[FlowNode]: + return tuple( self.flow.nodes[id] for id in self.graph.nodes if self.statuses[id] is FlowNodeStatus.Pending and all(self.statuses[a] is FlowNodeStatus.Succeeded for a in ancestors(self.graph, id)) - } + ) def mark_success(self, node: FlowNode) -> None: self.statuses[node.id] = FlowNodeStatus.Succeeded @@ -74,8 +75,8 @@ def all_done(self) -> bool: def num_nodes(self) -> int: return len(self.graph) - def nodes(self) -> set[FlowNode]: - return set(self.flow.nodes.values()) + def nodes(self) -> Iterator[FlowNode]: + yield from self.flow.nodes.values() def _ancestors(graph: DiGraph, nodes: set[str]) -> set[str]: diff --git a/synthesize/utils.py b/synthesize/utils.py index d6d8b4d..40af9d9 100644 --- a/synthesize/utils.py +++ b/synthesize/utils.py @@ -2,21 +2,7 @@ import hashlib from asyncio import Task, create_task, sleep -from typing import Annotated, Any, Awaitable, Callable, Optional, TypeVar - -from frozendict import frozendict -from pydantic import GetCoreSchemaHandler, GetJsonSchemaHandler -from pydantic.json_schema import JsonSchemaValue -from pydantic_core import CoreSchema -from pydantic_core.core_schema import ( - chain_schema, - dict_schema, - is_instance_schema, - json_or_python_schema, - no_info_plain_validator_function, - plain_serializer_function_ser_schema, - union_schema, -) +from typing import Awaitable, Callable, Optional, TypeVar T = TypeVar("T") @@ -31,30 +17,3 @@ async def delayed() -> T: def md5(data: bytes) -> str: return hashlib.md5(data).hexdigest() - - -# https://github.com/pydantic/pydantic/discussions/8721 -class _PydanticFrozenDictAnnotation: - @classmethod - def __get_pydantic_core_schema__( - cls, _source_type: Any, _handler: GetCoreSchemaHandler - ) -> CoreSchema: - from_dict_schema = chain_schema( - [dict_schema(), no_info_plain_validator_function(frozendict)] - ) - return json_or_python_schema( - json_schema=from_dict_schema, - python_schema=union_schema([is_instance_schema(frozendict), from_dict_schema]), - serialization=plain_serializer_function_ser_schema(dict), - ) - - @classmethod - def __get_pydantic_json_schema__( - cls, _: CoreSchema, handler: GetJsonSchemaHandler - ) -> JsonSchemaValue: - return handler(dict_schema()) - - -_K = TypeVar("_K") -_V = TypeVar("_V") -FrozenDict = Annotated[frozendict[_K, _V], _PydanticFrozenDictAnnotation] diff --git a/tests/test_config.py b/tests/test_config.py index 026b313..b294923 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -2,7 +2,6 @@ from pathlib import Path import pytest -from frozendict import frozendict from rich.style import Style from synthesize.config import ( @@ -221,14 +220,14 @@ def test_resolve_flow_node( nodes={ "foo": UnresolvedFlowNode( target="t", - args=frozendict({"foo": "bar"}), - envs=frozendict({"FOO": "BAR"}), + args={"foo": "bar"}, + envs={"FOO": "BAR"}, trigger="r", color=color, ) }, - args=frozendict({"baz": "qux"}), - envs=frozendict({"BAZ": "QUX"}), + args={"baz": "qux"}, + envs={"BAZ": "QUX"}, ), {"t": Target(commands="echo")}, {"r": Restart()}, @@ -237,14 +236,14 @@ def test_resolve_flow_node( "foo": FlowNode( id="foo", target=Target(commands="echo"), - args=frozendict({"foo": "bar"}), - envs=frozendict({"FOO": "BAR"}), + args={"foo": "bar"}, + envs={"FOO": "BAR"}, trigger=Restart(), color=color, ) }, - args=frozendict({"baz": "qux"}), - envs=frozendict({"BAZ": "QUX"}), + args={"baz": "qux"}, + envs={"BAZ": "QUX"}, ), ), ), @@ -268,14 +267,14 @@ def test_resolve_flow( nodes={ "foo": UnresolvedFlowNode( target="t", - args=frozendict({"foo": "bar"}), - envs=frozendict({"FOO": "BAR"}), + args={"foo": "bar"}, + envs={"FOO": "BAR"}, trigger="r", color=color, ) }, - args=frozendict({"baz": "qux"}), - envs=frozendict({"BAZ": "QUX"}), + args={"baz": "qux"}, + envs={"BAZ": "QUX"}, ) }, targets={"t": Target(commands="echo")}, @@ -287,14 +286,14 @@ def test_resolve_flow( "foo": FlowNode( id="foo", target=Target(commands="echo"), - args=frozendict({"foo": "bar"}), - envs=frozendict({"FOO": "BAR"}), + args={"foo": "bar"}, + envs={"FOO": "BAR"}, trigger=Restart(), color=color, ) }, - args=frozendict({"baz": "qux"}), - envs=frozendict({"BAZ": "QUX"}), + args={"baz": "qux"}, + envs={"BAZ": "QUX"}, ), }, ), From 49b2eebceb88343dd33a41d3115566686fe6f5f8 Mon Sep 17 00:00:00 2001 From: Josh Karpel Date: Wed, 26 Jun 2024 23:04:55 -0500 Subject: [PATCH 17/21] more tests --- poetry.lock | 13 ++- pyproject.toml | 1 + synthesize/config.py | 5 +- tests/test_execution.py | 239 ++++++++++++++++++++++++++++++++++++++++ tests/test_utils.py | 27 +++++ 5 files changed, 281 insertions(+), 4 deletions(-) create mode 100644 tests/test_execution.py create mode 100644 tests/test_utils.py diff --git a/poetry.lock b/poetry.lock index 062c891..8acba6e 100644 --- a/poetry.lock +++ b/poetry.lock @@ -711,6 +711,17 @@ files = [ griffe = ">=0.47" mkdocstrings = ">=0.25" +[[package]] +name = "more-itertools" +version = "10.3.0" +description = "More routines for operating on iterables, beyond itertools" +optional = false +python-versions = ">=3.8" +files = [ + {file = "more-itertools-10.3.0.tar.gz", hash = "sha256:e5d93ef411224fbcef366a6e8ddc4c5781bc6359d43412a65dd5964e46111463"}, + {file = "more_itertools-10.3.0-py3-none-any.whl", hash = "sha256:ea6a02e24a9161e51faad17a8782b92a0df82c12c1c8886fec7f0c3fa1a1b320"}, +] + [[package]] name = "mypy" version = "1.10.1" @@ -1565,4 +1576,4 @@ anyio = ">=3.0.0" [metadata] lock-version = "2.0" python-versions = ">=3.10" -content-hash = "7397750614f0c947e02cd3f371b93574c032c69e8797d16df052bbc5f3ff1aa4" +content-hash = "ba9a5a86fb46076cd5dcf437aac10e78fd145488f53cebbfdfb46d87065c1d21" diff --git a/pyproject.toml b/pyproject.toml index 3886025..c1bb050 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -41,6 +41,7 @@ networkx = ">=3.0" watchfiles = ">=0.18" identify = ">=2.5" jinja2 = ">=3.1" +more-itertools = ">=10.3" [tool.poetry.group.dev.dependencies] pre-commit = ">=3" diff --git a/synthesize/config.py b/synthesize/config.py index 0740dd8..d76522b 100644 --- a/synthesize/config.py +++ b/synthesize/config.py @@ -121,9 +121,9 @@ class FlowNode(Model): args: Args = {} envs: Envs = {} - trigger: AnyTrigger + trigger: AnyTrigger = Once() - color: str + color: Annotated[str, Field(default_factory=random_color)] class UnresolvedFlowNode(Model): @@ -192,7 +192,6 @@ def from_file(cls, file: Path) -> Config: raise NotImplementedError("Currently, only YAML files are supported.") def resolve(self) -> Mapping[str, Flow]: - print(self.flows) return { flow_id: flow.resolve(self.targets, self.triggers) for flow_id, flow in self.flows.items() diff --git a/tests/test_execution.py b/tests/test_execution.py new file mode 100644 index 0000000..baac608 --- /dev/null +++ b/tests/test_execution.py @@ -0,0 +1,239 @@ +from asyncio import Queue +from pathlib import Path + +import pytest + +from synthesize.config import Envs, FlowNode, Target, random_color +from synthesize.execution import Execution +from synthesize.messages import ExecutionCompleted, ExecutionOutput, ExecutionStarted, Message + +color = random_color() + + +async def test_execution_lifecycle(tmp_path: Path) -> None: + node = FlowNode( + id="foo", + target=Target(commands="echo 'hi'"), + color=color, + ) + + q: Queue[Message] = Queue() + ex = await Execution.start( + node=node, + args={}, + envs={}, + tmp_dir=tmp_path, + width=80, + events=q, + ) + + assert await ex.wait() is ex + + assert ex.has_exited + + msg = await q.get() + + assert isinstance(msg, ExecutionStarted) + assert msg.node is node + assert msg.pid == ex.pid + + msg = await q.get() + + assert isinstance(msg, ExecutionOutput) + assert msg.node is node + assert msg.text == "hi" + + msg = await q.get() + + assert isinstance(msg, ExecutionCompleted) + assert msg.node is node + assert msg.pid == ex.pid + assert msg.exit_code == ex.exit_code == 0 + + +async def test_termination_before_completion(tmp_path: Path) -> None: + node = FlowNode( + id="foo", + target=Target(commands="sleep 10 && echo 'hi'"), + color=color, + ) + + q: Queue[Message] = Queue() + ex = await Execution.start( + node=node, + args={}, + envs={}, + tmp_dir=tmp_path, + width=80, + events=q, + ) + + ex.terminate() + + assert await ex.wait() is ex + + assert ex.has_exited + + msg = await q.get() + + assert isinstance(msg, ExecutionStarted) + assert msg.node is node + assert msg.pid == ex.pid + + msg = await q.get() + + assert isinstance(msg, ExecutionCompleted) + assert msg.node is node + assert msg.pid == ex.pid + assert msg.exit_code == ex.exit_code == -15 + + +async def test_termination_after_completion(tmp_path: Path) -> None: + node = FlowNode( + id="foo", + target=Target(commands="echo 'hi'"), + color=color, + ) + + q: Queue[Message] = Queue() + ex = await Execution.start( + node=node, + args={}, + envs={}, + tmp_dir=tmp_path, + width=80, + events=q, + ) + + assert await ex.wait() is ex + + assert ex.has_exited + + ex.terminate() # noop + + +async def test_execution_kill(tmp_path: Path) -> None: + node = FlowNode( + id="foo", + target=Target(commands="sleep 10 && echo 'hi'"), + color=color, + ) + + q: Queue[Message] = Queue() + ex = await Execution.start( + node=node, + args={}, + envs={}, + tmp_dir=tmp_path, + width=80, + events=q, + ) + + ex.kill() + + assert await ex.wait() is ex + + assert ex.has_exited + + msg = await q.get() + + assert isinstance(msg, ExecutionStarted) + assert msg.node is node + assert msg.pid == ex.pid + + msg = await q.get() + + assert isinstance(msg, ExecutionCompleted) + assert msg.node is node + assert msg.pid == ex.pid + assert msg.exit_code == ex.exit_code == -9 + + +async def test_kill_after_completion(tmp_path: Path) -> None: + node = FlowNode( + id="foo", + target=Target(commands="echo 'hi'"), + color=color, + ) + + q: Queue[Message] = Queue() + ex = await Execution.start( + node=node, + args={}, + envs={}, + tmp_dir=tmp_path, + width=80, + events=q, + ) + + assert await ex.wait() is ex + + assert ex.has_exited + + ex.kill() # noop + + +@pytest.mark.parametrize( + ("node", "envs", "expected"), + ( + ( + FlowNode( + id="foo", + target=Target(commands="echo $FORCE_COLOR"), + color=color, + ), + Envs(), + "1", + ), + ( + FlowNode( + id="foo", + target=Target(commands="echo $COLUMNS"), + color=color, + ), + Envs(), + "111", # set in test body below + ), + ( + FlowNode( + id="foo", + target=Target(commands="echo $SYNTH_NODE_ID"), + color=color, + ), + Envs(), + "foo", + ), + ( + FlowNode( + id="foo", + target=Target(commands="echo $FOO"), + color=color, + ), + Envs({"FOO": "baz"}), + "baz", + ), + ), +) +async def test_envs( + tmp_path: Path, + node: FlowNode, + envs: Envs, + expected: str, +) -> None: + q: Queue[Message] = Queue() + ex = await Execution.start( + node=node, + args={}, + envs=envs, + tmp_dir=tmp_path, + width=111, + events=q, + ) + + await ex.wait() + + await q.get() + msg = await q.get() + + assert isinstance(msg, ExecutionOutput) + assert msg.text == expected diff --git a/tests/test_utils.py b/tests/test_utils.py new file mode 100644 index 0000000..15a2390 --- /dev/null +++ b/tests/test_utils.py @@ -0,0 +1,27 @@ +from time import monotonic + +import pytest + +from synthesize.utils import delay, md5 + + +async def test_delay() -> None: + async def fn() -> str: + return "hi" + + start = monotonic() + + assert await delay(0.1, fn) == "hi" + + assert monotonic() - start >= 0.1 + + +@pytest.mark.parametrize( + ("data", "expected"), + ( + (b"", "d41d8cd98f00b204e9800998ecf8427e"), + (b"hello", "5d41402abc4b2a76b9719d911017c592"), + ), +) +def test_md5(data: bytes, expected: str) -> None: + assert md5(data) == expected From bc005586d6c900931b83410a040765a45332630c Mon Sep 17 00:00:00 2001 From: Josh Karpel Date: Wed, 26 Jun 2024 23:06:57 -0500 Subject: [PATCH 18/21] tweak --- tests/test_execution.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/tests/test_execution.py b/tests/test_execution.py index baac608..5e5a4e2 100644 --- a/tests/test_execution.py +++ b/tests/test_execution.py @@ -203,6 +203,16 @@ async def test_kill_after_completion(tmp_path: Path) -> None: Envs(), "foo", ), + ( + FlowNode( + id="foo", + target=Target(commands="echo $FOO"), + envs=Envs({"FOO": "bar"}), + color=color, + ), + Envs(), + "bar", + ), ( FlowNode( id="foo", From 8aece92928d09892c9a5f3d8a8da072083895847 Mon Sep 17 00:00:00 2001 From: Josh Karpel Date: Wed, 26 Jun 2024 23:11:23 -0500 Subject: [PATCH 19/21] changelog --- docs/changelog.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/docs/changelog.md b/docs/changelog.md index d70ee61..549d68a 100644 --- a/docs/changelog.md +++ b/docs/changelog.md @@ -14,6 +14,12 @@ Reorganized configuration to separate targets, triggers (formerly "lifecycles"), and flows (graphs of targets and triggers)." +- [#33](https://github.com/JoshKarpel/synthesize/pull/33) + Allow injecting arguments + (via [Jinja2 templates](https://jinja.palletsprojects.com/)) + and environment variables into target commands. + Arguments and environment variables can be specified at either + the flow, node, or target level, with the most specific taking precedence. ## `0.0.2` From 0becf1269157cf28e3e830327453fabc7fba0242 Mon Sep 17 00:00:00 2001 From: Josh Karpel Date: Wed, 26 Jun 2024 23:12:30 -0500 Subject: [PATCH 20/21] move changelog entry --- docs/changelog.md | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/docs/changelog.md b/docs/changelog.md index 549d68a..1ea21bd 100644 --- a/docs/changelog.md +++ b/docs/changelog.md @@ -7,6 +7,12 @@ ### Added - [#3](https://github.com/JoshKarpel/synthesize/pull/3) Added PyPI classifiers and other metadata. +- [#33](https://github.com/JoshKarpel/synthesize/pull/33) + Allow injecting arguments + (via [Jinja2 templates](https://jinja.palletsprojects.com/)) + and environment variables into target commands. + Arguments and environment variables can be specified at either + the flow, node, or target level, with the most specific taking precedence. ### Changed @@ -14,12 +20,6 @@ Reorganized configuration to separate targets, triggers (formerly "lifecycles"), and flows (graphs of targets and triggers)." -- [#33](https://github.com/JoshKarpel/synthesize/pull/33) - Allow injecting arguments - (via [Jinja2 templates](https://jinja.palletsprojects.com/)) - and environment variables into target commands. - Arguments and environment variables can be specified at either - the flow, node, or target level, with the most specific taking precedence. ## `0.0.2` From cf84db15d418ae406c9f8cc0ab0c0be2ad378232 Mon Sep 17 00:00:00 2001 From: Josh Karpel Date: Wed, 26 Jun 2024 23:14:17 -0500 Subject: [PATCH 21/21] lock hash --- poetry.lock | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/poetry.lock b/poetry.lock index 8acba6e..8f2acfc 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1576,4 +1576,4 @@ anyio = ">=3.0.0" [metadata] lock-version = "2.0" python-versions = ">=3.10" -content-hash = "ba9a5a86fb46076cd5dcf437aac10e78fd145488f53cebbfdfb46d87065c1d21" +content-hash = "766e4897e1ddefe89c187a03f1a8595d78228b455c1f50fb8412b790f7b3a671"