forked from temporalio/samples-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
starter.py
46 lines (36 loc) · 1.24 KB
/
starter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import asyncio
import logging
import sys
import uuid
import dacite
import yaml
from temporalio.client import Client
from dsl.workflow import DSLInput, DSLWorkflow
async def main(dsl_yaml: str) -> None:
    """Run the DSL workflow described by a YAML document and log its result.

    Args:
        dsl_yaml: Text of the YAML document describing the workflow input.
    """
    # Parse the YAML into plain Python data first, then let dacite map that
    # data onto the DSLInput dataclass structure. Any other YAML-to-dataclass
    # approach would work equally well here.
    parsed_yaml = yaml.safe_load(dsl_yaml)
    workflow_input = dacite.from_dict(DSLInput, parsed_yaml)

    # Connect client
    temporal_client = await Client.connect("localhost:7233")

    # Run workflow under a fresh, random workflow id and wait for its result.
    workflow_id = f"dsl-workflow-id-{uuid.uuid4()}"
    variables = await temporal_client.execute_workflow(
        DSLWorkflow.run,
        workflow_input,
        id=workflow_id,
        task_queue="dsl-task-queue",
    )

    # Emit one "key: value" line per entry of the returned mapping.
    separator = "\n "
    variable_lines = [f"{name}: {value}" for name, value in variables.items()]
    logging.info("Final variables:\n " + separator.join(variable_lines))
if __name__ == "__main__":
    # Script entry point: configure logging, load the YAML file named on the
    # command line, and pass its text to main().
    logging.basicConfig(level=logging.INFO)
    # Require the YAML file as an argument. We read this _outside_ of the async
    # def function because thread-blocking IO should never happen in async def
    # functions.
    if len(sys.argv) != 2:
        raise RuntimeError("Expected single argument for YAML file")
    # Decode explicitly as UTF-8 instead of relying on the platform's locale
    # encoding, so the same YAML file is read identically on every OS.
    with open(sys.argv[1], encoding="utf-8") as yaml_file:
        dsl_yaml = yaml_file.read()
    # Run
    asyncio.run(main(dsl_yaml))