Skip to content

Commit

Permalink
Allow Restart triggers to let child nodes run again (#45)
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshKarpel authored Jun 29, 2024
1 parent 8ea4963 commit a53ce91
Show file tree
Hide file tree
Showing 12 changed files with 140 additions and 84 deletions.
5 changes: 5 additions & 0 deletions docs/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@
- [#41](https://github.com/JoshKarpel/synthesize/pull/41)
Execution duration is printed in the completion message.

### Fixed

- [#45](https://github.com/JoshKarpel/synthesize/pull/45)
`Restart` triggers now allow for the node's children to run again after the node completes.

## `0.0.2`

Released `2023-02-12`
Expand Down
18 changes: 9 additions & 9 deletions docs/examples/after.yaml
Original file line number Diff line number Diff line change
@@ -1,24 +1,24 @@
flows:
default:
nodes:
1:
A:
target: sleep-and-echo
2:
B:
target: sleep-and-echo
3:
C:
target: sleep-and-echo
4:
D:
target: sleep-and-echo
trigger:
after: ["1", "2"]
5:
after: ["A", "B"]
E:
target: sleep-and-echo
trigger:
after: ["3"]
6:
after: ["C"]
F:
target: sleep-and-echo
trigger:
after: ["4", "5"]
after: ["D", "E"]

targets:
sleep-and-echo:
Expand Down
6 changes: 3 additions & 3 deletions docs/examples/once.yaml
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
flows:
default:
nodes:
1:
A:
target: sleep-and-echo
2:
B:
target: sleep-and-echo
3:
C:
target: sleep-and-echo

targets:
Expand Down
22 changes: 22 additions & 0 deletions docs/examples/restart-after.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
flows:
default:
nodes:
A:
target: sleep-and-echo
trigger:
type: restart
delay: 10
B:
target: sleep-and-echo
trigger:
after: ["A"]
C:
target: sleep-and-echo
trigger:
after: ["B"]

targets:
sleep-and-echo:
commands: |
sleep 2
echo "Hi from {{ id }}!"
4 changes: 2 additions & 2 deletions docs/examples/restart.yaml
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
flows:
default:
nodes:
1:
A:
target: sleep-and-echo
trigger:
type: restart
delay: 3
2:
B:
target: sleep-and-echo
trigger:
type: restart
Expand Down
4 changes: 2 additions & 2 deletions docs/examples/watch.yaml
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
flows:
default:
nodes:
1:
A:
target: sleep-and-echo
trigger:
type: watch
paths: ["synthesize/", "tests/"]
2:
B:
target: sleep-and-echo
trigger:
type: watch
Expand Down
2 changes: 1 addition & 1 deletion docs/hooks/mermaid.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def on_page_markdown(
) -> str:
lines = []
for line in markdown.splitlines():
if match := re.match(r"@mermaid\(([\w\.\/]+)\)", line):
if match := re.match(r"@mermaid\(([\w\.\/\-]+)\)", line):
lines.append("```mermaid")
cmd = subprocess.run(
("synth", "--mermaid", "--config", match.group(1)),
Expand Down
6 changes: 6 additions & 0 deletions docs/index.md
Original file line number Diff line number Diff line change
@@ -1 +1,7 @@
# Synthesize

```yaml
--8<-- "synth.yaml"
```

@mermaid(synth.yaml)
6 changes: 4 additions & 2 deletions docs/triggers.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,10 @@ Use this trigger to run a node in reaction to changes in the filesystem.

## Combining Triggers

### Restart + After

```yaml
--8<-- "synth.yaml"
--8<-- "docs/examples/restart-after.yaml"
```

@mermaid(synth.yaml)
@mermaid(docs/examples/restart-after.yaml)
27 changes: 15 additions & 12 deletions synthesize/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
WatchPathChanged,
)
from synthesize.renderer import Renderer
from synthesize.state import FlowState
from synthesize.state import FlowState, Status
from synthesize.utils import delay


Expand Down Expand Up @@ -87,12 +87,15 @@ async def handle_messages(self, tmp_dir: Path) -> None:
else:
self.state.mark_failure(node)

self.state.mark_pending(*self.state.children(node))

case WatchPathChanged(node=node):
if e := self.executions.get(node.id):
self.waiters[node.id].add_done_callback(
lambda _: self.state.mark_pending(node)
)
e.terminate()

self.state.mark_descendants_pending(node)

case Quit():
return

Expand All @@ -112,28 +115,28 @@ async def heartbeat() -> None:
self.heartbeat = create_task(heartbeat())

async def start_ready_targets(self, tmp_dir: Path) -> None:
for ready_node in self.state.ready_nodes():
if e := self.executions.get(ready_node.id):
for node in self.state.ready_nodes():
if e := self.executions.get(node.id):
if not e.has_exited:
continue

self.state.mark_running(ready_node)

async def start() -> None:
e = await Execution.start(
node=ready_node,
node=node,
args=self.flow.args,
envs=self.flow.envs,
tmp_dir=tmp_dir,
width=self.console.width - self.renderer.prefix_width,
events=self.inbox,
)
self.executions[ready_node.id] = e
self.waiters[ready_node.id] = create_task(e.wait())
self.executions[node.id] = e
self.waiters[node.id] = create_task(e.wait())
self.state.mark_running(node)

# When restarting after first execution, delay
if isinstance(ready_node.trigger, Restart) and ready_node.id in self.executions:
delay(ready_node.trigger.delay, start)
if isinstance(node.trigger, Restart) and node.id in self.executions:
self.state.mark(node, status=Status.Waiting)
delay(node.trigger.delay, start)
else:
await start()

Expand Down
51 changes: 33 additions & 18 deletions synthesize/renderer.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
Message,
WatchPathChanged,
)
from synthesize.state import FlowState
from synthesize.state import FlowState, Status

prefix_format = "{timestamp:%H:%M:%S} {id} "
internal_format = "{timestamp:%H:%M:%S}"
Expand Down Expand Up @@ -61,28 +61,43 @@ def handle_message(self, message: Message) -> None:
self.update(message)

def info(self, event: Message) -> RenderableType:
table = Table.grid(padding=(1, 1, 0, 0))
table = Table.grid(padding=(1, 1, 0, 0), expand=False)

status_table = Table.grid(padding=(2, 2, 0, 0), expand=False)

nodes_by_status = self.state.nodes_by_status()
node_status_displays = []
for status in (
Status.Running,
Status.Waiting,
Status.Pending,
Status.Succeeded,
Status.Failed,
):
nodes_with_status = nodes_by_status[status]
if nodes_with_status:
node_status_displays.append(
Text.assemble(
status.value.capitalize(),
" ",
Text(" ").join(
Text(t.id, style=Style(color="black", bgcolor=t.color))
for t in sorted(nodes_with_status, key=lambda t: t.id)
),
)
)

running_nodes = self.state.running_nodes()

running = (
Text.assemble(
"Running ",
Text(" ").join(
Text(t.id, style=Style(color="black", bgcolor=t.color))
for t in sorted(running_nodes, key=lambda t: t.id)
),
)
if running_nodes
else Text()
)
status_table.add_row(*node_status_displays)

table.add_row(
internal_format.format_map({"timestamp": event.timestamp}),
running,
status_table,
)

return Group(Rule(style=(Style(color="green" if running_nodes else "yellow"))), table)
return Group(
Rule(style=(Style(color="green" if nodes_by_status[Status.Running] else "yellow"))),
table,
)

def render_prefix(
self, message: ExecutionOutput | ExecutionStarted | ExecutionCompleted | WatchPathChanged
Expand All @@ -109,7 +124,7 @@ def handle_lifecycle_message(
) -> None:
prefix = Text.from_markup(
self.render_prefix(message),
style=Style(color=message.node.color),
style=Style(color=message.node.color, dim=True),
)

parts: tuple[str | tuple[str, str] | tuple[str, Style] | Text, ...]
Expand Down
Loading

0 comments on commit a53ce91

Please sign in to comment.