We provide a Python client for interacting with the cluster, which becomes available when you install the vibe_core package.
For the complete documentation of the client, please refer to the client documentation.
In this user guide, we provide an overview of the client and how to use it to interact with the cluster. To start, we will instantiate a client object by doing:
from vibe_core.client import get_default_vibe_client
client = get_default_vibe_client()
The get_default_vibe_client function will automatically target your local cluster. If you want to target a remote cluster, make sure you add the remote argument:
client = get_default_vibe_client("remote")
The URL of the local/remote cluster is written to configuration files by the farmvibes-ai <local | remote> setup script. In case the deployment changes, you can update the configuration files by running farmvibes-ai <local | remote> status.
The list_workflows method can be used to list the names of all available workflows:
>>> client.list_workflows()[:7]
['ingest_raster',
'helloworld',
'farm_ai/land_cover_mapping/conservation_practices',
'farm_ai/agriculture/canopy_cover',
'farm_ai/agriculture/methane_index',
'farm_ai/agriculture/emergence_summary',
'farm_ai/agriculture/change_detection',
]
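Since workflow names are plain strings, the list can be filtered with standard Python. For instance, a minimal sketch to find the agriculture-related workflows shown above:

agriculture_workflows = [
    name for name in client.list_workflows() if "agriculture" in name
]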
The document_workflow method provides documentation about a workflow, including its inputs, outputs, and parameters:
>>> client.document_workflow("helloworld")
Workflow: helloworld
Description:
Hello world! Small test workflow that generates an image of the Earth with countries that
intersect with the input geometry highlighted in orange.
Sources:
- user_input (vibe_core.data.core_types.DataVibe): Input geometry.
Sinks:
- raster (vibe_core.data.rasters.Raster): Raster with highlighted countries.
Tasks:
- hello
For more information about workflows, check the workflow documentation.
To submit a workflow run, use the run method, which takes in the workflow, a run name, the workflow input, and optional parameter overrides. For workflows that take in a DataVibe defining a region and time range of interest, the inputs can be a geometry (shapely object) and a time range (tuple of datetime objects). See the example below:
from shapely import geometry as shpg
from datetime import datetime
# Geometry in WGS-84/EPSG:4326
geom = shpg.box(-122.142363, 47.681775, -122.106146, 47.667801)
# Time range with start and end
time_range = (datetime(2020, 1, 1), datetime(2022, 1, 1))
run = client.run("helloworld", "My first workflow run", geometry=geom, time_range=time_range)
To submit a run with other inputs, use the alternate run signature. The following example is equivalent to the previous run, but submits a DataVibe object instead:
from vibe_core.data import DataVibe, gen_guid
vibe_input = DataVibe(id=gen_guid(), geometry=shpg.mapping(geom), time_range=time_range, assets=[])
# Since this workflow only has a single source (input), we can pass in the object directly, and
# it will be assigned to the only source
run = client.run("helloworld", "Workflow run with other inputs", input_data=vibe_input)
# More generally, pass in a dict where the keys are the workflow sources
run = client.run(
"helloworld",
"Workflow run with other inputs",
input_data={"user_input": vibe_input}
)
The run method returns a VibeWorkflowRun object that contains information about your run and can be used to keep track of run progress, access outputs, and more. The object representation displays the run id, name, workflow, and status:
>>> run
VibeWorkflowRun(id='7b95932f-2428-4036-b4cc-14ef832bf8c2', name='My first workflow run', workflow='helloworld', status='running')
Each of these can also be queried through its respective property. The status is refreshed at every call:
>>> run.status
<RunStatus.done: 'done'>
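The other fields shown in the object representation are available as properties as well, assuming the property names match the repr fields:

>>> run.name
'My first workflow run'
>>> run.workflow
'helloworld'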
For more detailed information about each task in the workflow run, use task_status and task_details:
>>> run.task_status # Status of each task
{'hello': 'done'}
>>> run.task_details # Full details
{'hello': RunDetails(start_time=datetime.datetime(2022, 10, 3, 22, 22, 4, 609784), end_time=datetime.datetime(2022, 10, 3, 22, 22, 9, 533641), reason=None, status='done')}
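Because RunDetails exposes start_time and end_time as datetime objects, per-task durations can be computed directly. A minimal sketch (end_time is None while a task is still running, so we guard for it):

for task, details in run.task_details.items():
    if details.end_time is not None:
        print(f"{task} took {details.end_time - details.start_time}")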
To monitor the run continuously, use the monitor method. It will draw a table on the terminal and update it at a regular interval:
>>> run.monitor()
🌎 FarmVibes.AI 🌍 helloworld 🌏
Run name: My first workflow run
Run id: dd541f5b-4f03-46e2-b017-8e88a518dfe6
Run status: done
Run duration: 00:00:04
┏━━━━━━━━━━━━━━━━━━┳━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ Task Name ┃ Status ┃ Start Time ┃ End Time ┃ Duration ┃ Progress ┃
┡━━━━━━━━━━━━━━━━━━╇━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ hello │ done │ 2023/08/17 14:45:13 │ 2023/08/17 14:45:17 │ 00:00:04 │ ━━━━━━━━━━━━━━━━━━━━ 1/1 │
└──────────────────┴────────┴─────────────────────┴─────────────────────┴──────────┴─────────────────────────────┘
Last update: 2023/08/17 14:45:19 UTC
Similarly, you can use the monitor method from the FarmvibesAiClient, passing the run object as an argument, as in client.monitor(run). This method also allows monitoring multiple runs at once by passing a list of runs. For example:
time_range_list = [
(datetime(2020, 1, 1), datetime(2022, 1, 1)),
(datetime(2020, 7, 1), datetime(2022, 7, 1)),
(datetime(2020, 12, 1), datetime(2022, 12, 1)),
]
run_list = [
client.run("helloworld", f"Run {i}", geometry=geom, time_range=time_range)
for i, time_range in enumerate(time_range_list)
]
When calling client.monitor(run_list), the output will be a table with summarized information for each run, along with the progress of its current task.
>>> client.monitor(run_list)
🌎 FarmVibes.AI 🌍 Multi-Run Monitoring 🌏
Total duration: 00:01:08
┏━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ Run Name ┃ Task Name ┃ Status ┃ Start Time ┃ End Time ┃ Duration ┃ Progress ┃
┡━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ Run 0 │ │ done │ 2023/08/15 12:41:10 │ 2023/08/15 12:41:10 │ 00:00:00 │ ━━━━━━━━━━━━━━━━━━━━ 1/1 │
│ ↪ │ hello │ done │ 2023/08/15 12:41:10 │ 2023/08/15 12:41:10 │ 00:00:00 │ ━━━━━━━━━━━━━━━━━━━━ 1/1 │
├──────────┼───────────┼─────────┼─────────────────────┼─────────────────────┼──────────┼─────────────────────────────┤
│ Run 1 │ │ done │ 2023/08/15 12:41:10 │ 2023/08/15 12:41:17 │ 00:00:06 │ ━━━━━━━━━━━━━━━━━━━━ 1/1 │
│ ↪ │ hello │ done │ 2023/08/15 12:41:10 │ 2023/08/15 12:41:17 │ 00:00:06 │ ━━━━━━━━━━━━━━━━━━━━ 1/1 │
├──────────┼───────────┼─────────┼─────────────────────┼─────────────────────┼──────────┼─────────────────────────────┤
│ Run 2 │ │ running │ 2023/08/15 12:41:10 │ N/A │ 00:01:08 │ ━━━━━━━━━━━━━━━━━━━━ 0/1 │
│ ↪ │ hello │ running │ 2023/08/15 12:42:17 │ N/A │ 00:00:01 │ ━━━━━━━━━━━━━━━━━━━━ 0/1 │
└──────────┴───────────┴─────────┴─────────────────────┴─────────────────────┴──────────┴─────────────────────────────┘
Last update: 2023/08/15 12:42:18 UTC
The run call is asynchronous: the cluster will start working on your submission, but the interpreter is not blocked. To block the interpreter (e.g., in a script that needs to wait for a run to be done), use block_until_complete(). You can optionally define a timeout in seconds:
run.block_until_complete() # Will wait until the run is done
run.block_until_complete(timeout=60) # Will raise RuntimeError if the run is not done in 60s
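For example, to handle the timeout instead of letting the RuntimeError propagate:

try:
    run.block_until_complete(timeout=60)
except RuntimeError:
    print(f"Run is not done yet, current status: {run.status}")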
To list all runs, use the list_runs method. It will return a list of ids for all runs in the cluster. Run ids follow the UUID4 standard.
>>> client.list_runs()
['7b95932f-2428-4036-b4cc-14ef832bf8c2']
You can then obtain a VibeWorkflowRun from the id with the get_run_by_id method:
# Get latest run
run_id = client.list_runs()[-1]
run = client.get_run_by_id(run_id)
After the run is done, the output property will contain a dictionary with the outputs. The dictionary keys are the workflow sinks. The outputs are DataVibe-like objects that contain metadata about the outputs and references to the data files as assets. See the example below:
>>> run.status
<RunStatus.done: 'done'>
>>> run.output.keys()
dict_keys(['raster'])
>>> out = run.output["raster"]
>>> out
[Raster(id='3339a6f3-1800-4c1a-9edd-5b791734f240', time_range=(datetime.datetime(2020, 1, 1, 0, 0, tzinfo=datetime.timezone.utc), datetime.datetime(2022, 1, 1, 0, 0, tzinfo=datetime.timezone.utc)), bbox=(-122.142363, 47.667801, -122.106146, 47.681775), geometry={'type': 'Polygon', 'coordinates': [[[-122.106146, 47.681775], [-122.106146, 47.667801], [-122.142363, 47.667801], [-122.142363, 47.681775], [-122.106146, 47.681775]]]}, assets=[AssetVibe(type='image/tiff', id='baa45c36-648b-4a03-9f3f-51ec9ff9d061', path_or_url='/data/cache/farmvibes-ai/data/assets/baa45c36-648b-4a03-9f3f-51ec9ff9d061/baa45c36-648b-4a03-9f3f-51ec9ff9d061.tif', _is_local=True, _local_path='/data/cache/farmvibes-ai/data/assets/baa45c36-648b-4a03-9f3f-51ec9ff9d061/baa45c36-648b-4a03-9f3f-51ec9ff9d061.tif')], bands={'red': 0, 'blue': 1, 'green': 2})]
>>> out[0].assets # Check list of assets
[AssetVibe(type='image/tiff', id='baa45c36-648b-4a03-9f3f-51ec9ff9d061', path_or_url='/data/cache/farmvibes-ai/data/assets/baa45c36-648b-4a03-9f3f-51ec9ff9d061/baa45c36-648b-4a03-9f3f-51ec9ff9d061.tif', _is_local=True, _local_path='/data/cache/farmvibes-ai/data/assets/baa45c36-648b-4a03-9f3f-51ec9ff9d061/baa45c36-648b-4a03-9f3f-51ec9ff9d061.tif')]
>>> out[0].raster_asset.url # Get reference for the generated tiff file
'file:///data/cache/farmvibes-ai/data/assets/baa45c36-648b-4a03-9f3f-51ec9ff9d061/baa45c36-648b-4a03-9f3f-51ec9ff9d061.tif'
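Because path_or_url points to the asset file on disk (for local clusters), the generated raster can be opened with any raster library. A minimal sketch using rasterio, which is not part of vibe_core and must be installed separately:

import rasterio

# Open the GeoTIFF produced by the run and read all bands into a numpy array
with rasterio.open(out[0].raster_asset.path_or_url) as src:
    data = src.read()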
Instead of submitting a run with a built-in workflow name, it is also possible to send a workflow definition for a custom workflow. The workflow definition is a dictionary that defines sources (inputs), sinks (outputs), parameters, tasks, and edges. See the workflow documentation for more information on the structure and syntax of workflow definitions. Consider a case where we want to obtain NDVI rasters from Sentinel-2 imagery. We can do this by composing a workflow that downloads and preprocesses Sentinel-2 data with a workflow that computes NDVI indices. The workflow definition is shown below:
name: custom_ndvi_workflow
sources:
  user_input:
    - s2.user_input
sinks:
  ndvi: ndvi.index_raster
parameters:
  pc_key:
tasks:
  s2:
    workflow: data_ingestion/sentinel2/preprocess_s2
    parameters:
      # This parameter will have its value filled by the workflow parameter
      pc_key: "@from(pc_key)"
  ndvi:
    workflow: data_processing/index/index
    parameters:
      # Set the index to NDVI
      index: ndvi
edges:
  - origin: s2.raster
    destination:
      - ndvi.raster
To submit the workflow, send the dictionary instead of a workflow name:
import yaml
with open("custom_ndvi_workflow.yaml") as f:
custom_wf = yaml.safe_load(f)
run = client.run(custom_wf, "Custom run name", geometry=my_geometry, time_range=my_time_range)
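Since run only requires a dictionary, the definition can also be built directly in Python instead of being loaded from a YAML file. The sketch below is equivalent to loading custom_ndvi_workflow.yaml:

# Same definition as the YAML file above, expressed as a Python dict
custom_wf = {
    "name": "custom_ndvi_workflow",
    "sources": {"user_input": ["s2.user_input"]},
    "sinks": {"ndvi": "ndvi.index_raster"},
    "parameters": {"pc_key": None},
    "tasks": {
        "s2": {
            "workflow": "data_ingestion/sentinel2/preprocess_s2",
            "parameters": {"pc_key": "@from(pc_key)"},
        },
        "ndvi": {
            "workflow": "data_processing/index/index",
            "parameters": {"index": "ndvi"},
        },
    },
    "edges": [{"origin": "s2.raster", "destination": ["ndvi.raster"]}],
}
run = client.run(custom_wf, "Custom run name", geometry=my_geometry, time_range=my_time_range)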
The custom workflow can be a composition of any of the available workflows. However, it is not possible to use a custom workflow as a task in another workflow.
In case you need to cancel an ongoing workflow run, use the VibeWorkflowRun.cancel or FarmvibesAiClient.cancel_run methods. The status of the run, along with that of queued and running tasks, will be set to cancelled.
>>> run.cancel()
VibeWorkflowRun(id='89252ae9-abbb-46f2-aac3-73836a016b96', name='Cancelled workflow run', workflow='helloworld', status='cancelled')
>>> run.monitor()
🌎 FarmVibes.AI 🌍 helloworld 🌏
Run name: Cancelled workflow run
Run id: 89252ae9-abbb-46f2-aac3-73836a016b96
Run status: cancelled
Run duration: 00:00:02
┏━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ Task Name ┃ Status ┃ Start Time ┃ End Time ┃ Duration ┃ Progress ┃
┡━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ hello │ cancelled │ 2023/08/15 12:48:18 │ 2023/08/15 12:48:20 │ 00:00:02 │ ━━━━━━━━━━━━━━━━━━━━ 0/1 │
└───────────┴───────────┴─────────────────────┴─────────────────────┴──────────┴─────────────────────────────┘
Last update: 2023/08/15 12:48:26 UTC
You can use the VibeWorkflowRun.delete or FarmvibesAiClient.delete_run methods to delete a completed workflow run (i.e., a run with a status of done, failed, or cancelled). If the deletion is successful, all cached data produced by the workflow run that is not shared with other workflow runs will be deleted, and the run status will be set to deleted.
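For example, a minimal sketch that deletes every finished run in the cluster (comparing the RunStatus value, assuming the enum values match the status names above):

for run_id in client.list_runs():
    candidate = client.get_run_by_id(run_id)
    # Only completed runs (done, failed, or cancelled) can be deleted
    if candidate.status.value in ("done", "failed", "cancelled"):
        candidate.delete()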
For more information on how data is managed and cached in FarmVibes.AI, please refer to our Data Management user guide.