From c058a50d334476948bd82da6bfb4e9baab3fb9aa Mon Sep 17 00:00:00 2001 From: Gabriel Gazola Milan Date: Wed, 15 Nov 2023 09:39:29 -0300 Subject: [PATCH 1/5] feat: execute inside Dask cluster --- .../rj_escritorio/flooding_detection/flows.py | 22 ++++++++++++++++--- .../rj_escritorio/flooding_detection/tasks.py | 20 +++++++++++------ 2 files changed, 32 insertions(+), 10 deletions(-) diff --git a/pipelines/rj_escritorio/flooding_detection/flows.py b/pipelines/rj_escritorio/flooding_detection/flows.py index f79cdbd4e..4dc8367aa 100644 --- a/pipelines/rj_escritorio/flooding_detection/flows.py +++ b/pipelines/rj_escritorio/flooding_detection/flows.py @@ -3,8 +3,10 @@ Flow definition for flooding detection using AI. """ from prefect import Parameter +from prefect.executors import DaskExecutor from prefect.run_configs import KubernetesRun from prefect.storage import GCS +from prefect.tasks.control_flow.filter import FilterTask from prefect.utilities.edges import unmapped from pipelines.constants import constants @@ -21,12 +23,17 @@ ) from pipelines.utils.decorators import Flow +filter_results = FilterTask( + filter_func=lambda x: not isinstance(x, (BaseException, type(None))) +) + with Flow( - name="EMD: flooding_detection - Atualizar detecção de alagamento (IA) na API", + name="EMD: flooding_detection - Atualizar detecção de alagamento (IA) na API (Dask)", code_owners=[ "gabriel", "diego", ], + skip_if_running=True, ) as rj_escritorio__flooding_detection__flow: # Parameters cameras_geodf_url = Parameter( @@ -76,17 +83,23 @@ number_mock_rain_cameras=mocked_cameras_number, ) openai_api_key = get_openai_api_key(secret_path=openai_api_key_secret_path) - images = get_snapshot.map( + images, cameras = get_snapshot.map( camera=cameras, ) - predictions = get_prediction.map( + images = filter_results(images) + cameras = filter_results(cameras) + predictions, images, cameras = get_prediction.map( image=images, + camera=cameras, 
flooding_prompt=unmapped(openai_flooding_detection_prompt), openai_api_key=unmapped(openai_api_key), openai_api_model=unmapped(openai_api_model), openai_api_max_tokens=unmapped(openai_api_max_tokens), openai_api_url=unmapped(openai_api_url), ) + predictions = filter_results(predictions) + images = filter_results(images) + cameras = filter_results(cameras) update_flooding_api_data( predictions=predictions, cameras=cameras, @@ -98,6 +111,9 @@ rj_escritorio__flooding_detection__flow.storage = GCS(constants.GCS_FLOWS_BUCKET.value) +rj_escritorio__flooding_detection__flow.executor = DaskExecutor( + address="tcp://prefect-support-cluster-scheduler.dask.svc.cluster.local:8786" +) rj_escritorio__flooding_detection__flow.run_config = KubernetesRun( image=constants.DOCKER_IMAGE.value, labels=[constants.RJ_ESCRITORIO_AGENT_LABEL.value], diff --git a/pipelines/rj_escritorio/flooding_detection/tasks.py b/pipelines/rj_escritorio/flooding_detection/tasks.py index 24eae6879..60914ed3d 100644 --- a/pipelines/rj_escritorio/flooding_detection/tasks.py +++ b/pipelines/rj_escritorio/flooding_detection/tasks.py @@ -60,9 +60,10 @@ def get_openai_api_key(secret_path: str) -> str: return secret["api_key"] -@task +@task(nout=3) def get_prediction( image: str, + camera: Dict[str, Union[str, float]], flooding_prompt: str, openai_api_key: str, openai_api_model: str, @@ -122,16 +123,21 @@ def get_prediction( json_string = content.replace("```json\n", "").replace("\n```", "") json_object = json.loads(json_string) flooding_detected = json_object["flooding_detected"] - return { - "object": "alagamento", - "label": flooding_detected, - "confidence": 0.7, - } + return ( + { + "object": "alagamento", + "label": flooding_detected, + "confidence": 0.7, + }, + image, + camera, + ) @task( max_retries=3, retry_delay=timedelta(seconds=5), + nout=2, ) def get_snapshot( camera: Dict[str, Union[str, float]], @@ -162,7 +168,7 @@ def get_snapshot( img.save(buffer, format="JPEG") img_b64 = 
base64.b64encode(buffer.getvalue()).decode("utf-8") log(f"Successfully got snapshot from URL {rtsp_url}.") - return img_b64 + return img_b64, camera @task From 661335a4919238dd2717187dc9b660f64c13a6b9 Mon Sep 17 00:00:00 2001 From: Gabriel Gazola Milan Date: Wed, 15 Nov 2023 09:47:10 -0300 Subject: [PATCH 2/5] fix: typing --- pipelines/rj_escritorio/flooding_detection/tasks.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pipelines/rj_escritorio/flooding_detection/tasks.py b/pipelines/rj_escritorio/flooding_detection/tasks.py index 60914ed3d..f86053b1a 100644 --- a/pipelines/rj_escritorio/flooding_detection/tasks.py +++ b/pipelines/rj_escritorio/flooding_detection/tasks.py @@ -6,7 +6,7 @@ import json from pathlib import Path import random -from typing import Dict, List, Union +from typing import Dict, List, Tuple, Union import cv2 import geopandas as gpd @@ -69,7 +69,7 @@ def get_prediction( openai_api_model: str, openai_api_max_tokens: int = 300, openai_api_url: str = "https://api.openai.com/v1/chat/completions", -) -> Dict[str, Union[str, float, bool]]: +) -> Tuple[Dict[str, Union[str, float, bool]], str, Dict[str, Union[str, float]]]: """ Gets the flooding detection prediction from OpenAI API. @@ -141,7 +141,7 @@ def get_prediction( ) def get_snapshot( camera: Dict[str, Union[str, float]], -) -> str: +) -> Tuple[str, Dict[str, Union[str, float]]]: """ Gets a snapshot from a camera. 
From fb81bc4061ac6f0a3267c9a622e7baf039a10103 Mon Sep 17 00:00:00 2001 From: Gabriel Gazola Milan Date: Wed, 15 Nov 2023 10:00:13 -0300 Subject: [PATCH 3/5] fix: refactor for using single output --- .../rj_escritorio/flooding_detection/flows.py | 20 ++-- .../rj_escritorio/flooding_detection/tasks.py | 95 ++++++++++--------- 2 files changed, 59 insertions(+), 56 deletions(-) diff --git a/pipelines/rj_escritorio/flooding_detection/flows.py b/pipelines/rj_escritorio/flooding_detection/flows.py index 4dc8367aa..f6ddfa54e 100644 --- a/pipelines/rj_escritorio/flooding_detection/flows.py +++ b/pipelines/rj_escritorio/flooding_detection/flows.py @@ -83,27 +83,23 @@ number_mock_rain_cameras=mocked_cameras_number, ) openai_api_key = get_openai_api_key(secret_path=openai_api_key_secret_path) - images, cameras = get_snapshot.map( + cameras_with_image = get_snapshot.map( camera=cameras, ) - images = filter_results(images) - cameras = filter_results(cameras) - predictions, images, cameras = get_prediction.map( - image=images, - camera=cameras, + cameras_with_image = filter_results(cameras_with_image) + cameras_with_image_and_classification = get_prediction.map( + camera_with_image=cameras_with_image, flooding_prompt=unmapped(openai_flooding_detection_prompt), openai_api_key=unmapped(openai_api_key), openai_api_model=unmapped(openai_api_model), openai_api_max_tokens=unmapped(openai_api_max_tokens), openai_api_url=unmapped(openai_api_url), ) - predictions = filter_results(predictions) - images = filter_results(images) - cameras = filter_results(cameras) + cameras_with_image_and_classification = filter_results( + cameras_with_image_and_classification + ) update_flooding_api_data( - predictions=predictions, - cameras=cameras, - images=images, + cameras_with_image_and_classification=cameras_with_image_and_classification, data_key=redis_key_flooding_detection_data, last_update_key=redis_key_flooding_detection_last_update, predictions_buffer_key=redis_key_predictions_buffer, diff --git 
a/pipelines/rj_escritorio/flooding_detection/tasks.py b/pipelines/rj_escritorio/flooding_detection/tasks.py index f86053b1a..4fd6af7a3 100644 --- a/pipelines/rj_escritorio/flooding_detection/tasks.py +++ b/pipelines/rj_escritorio/flooding_detection/tasks.py @@ -62,8 +62,7 @@ def get_openai_api_key(secret_path: str) -> str: @task(nout=3) def get_prediction( - image: str, - camera: Dict[str, Union[str, float]], + camera_with_image: Dict[str, Union[str, float]], flooding_prompt: str, openai_api_key: str, openai_api_model: str, @@ -74,7 +73,14 @@ def get_prediction( Gets the flooding detection prediction from OpenAI API. Args: - image: The image in base64 format. + camera_with_image: The camera with image in the following format: + { + "id_camera": "1", + "url_camera": "rtsp://...", + "latitude": -22.912, + "longitude": -43.230, + "image_base64": "base64...", + } flooding_prompt: The flooding prompt. openai_api_key: The OpenAI API key. openai_api_model: The OpenAI API model. @@ -108,7 +114,9 @@ def get_prediction( }, { "type": "image_url", - "image_url": {"url": f"data:image/jpeg;base64,{image}"}, + "image_url": { + "url": f"data:image/jpeg;base64,{camera_with_image['image_base64']}" + }, }, ], } @@ -123,21 +131,20 @@ def get_prediction( json_string = content.replace("```json\n", "").replace("\n```", "") json_object = json.loads(json_string) flooding_detected = json_object["flooding_detected"] - return ( + log(f"Successfully got prediction: {flooding_detected}") + camera_with_image["ai_classification"] = [ { "object": "alagamento", "label": flooding_detected, "confidence": 0.7, - }, - image, - camera, - ) + } + ] + return camera_with_image @task( max_retries=3, retry_delay=timedelta(seconds=5), - nout=2, ) def get_snapshot( camera: Dict[str, Union[str, float]], @@ -168,7 +175,8 @@ def get_snapshot( img.save(buffer, format="JPEG") img_b64 = base64.b64encode(buffer.getvalue()).decode("utf-8") log(f"Successfully got snapshot from URL {rtsp_url}.") - return img_b64, camera 
+ camera["image_base64"] = img_b64 + return camera @task @@ -267,9 +275,7 @@ def pick_cameras( @task def update_flooding_api_data( - predictions: List[Dict[str, Union[str, float, bool]]], - cameras: List[Dict[str, Union[str, float]]], - images: List[str], + cameras_with_image_and_classification: List[Dict[str, Union[str, float, bool]]], data_key: str, last_update_key: str, predictions_buffer_key: str, @@ -278,26 +284,25 @@ def update_flooding_api_data( Updates Redis keys with flooding detection data and last update datetime (now). Args: - predictions: The AI predictions in the following format: - [ - { - "object": "alagamento", - "label": True, - "confidence": 0.7, - }, - ... - ] - cameras: A list of cameras in the following format: - [ - { - "id_camera": "1", - "url_camera": "rtsp://...", - "latitude": -22.912, - "longitude": -43.230, - }, - ... - ] - images: A list of images in base64 format. + cameras_with_image_and_classification: The cameras with image and classification + in the following format: + [ + { + "id_camera": "1", + "url_camera": "rtsp://...", + "latitude": -22.912, + "longitude": -43.230, + "image_base64": "base64...", + "ai_classification": [ + { + "object": "alagamento", + "label": True, + "confidence": 0.7, + } + ], + }, + ... + ] data_key: The Redis key for the flooding detection data. last_update_key: The Redis key for the last update datetime. predictions_buffer_key: The Redis key for the predictions buffer. 
@@ -305,13 +310,13 @@ def update_flooding_api_data( # Build API data last_update = pendulum.now(tz="America/Sao_Paulo") api_data = [] - for prediction, camera, image in zip(predictions, cameras, images): + for camera_with_image_and_classification in cameras_with_image_and_classification: # Get AI classifications ai_classification = [] - current_prediction = prediction["label"] - predictions_buffer_camera_key = ( - f"{predictions_buffer_key}_{camera['id_camera']}" - ) + current_prediction = camera_with_image_and_classification["ai_classification"][ + 0 + ]["label"] + predictions_buffer_camera_key = f"{predictions_buffer_key}_{camera_with_image_and_classification['id_camera']}" # noqa predictions_buffer = redis_add_to_prediction_buffer( predictions_buffer_camera_key, current_prediction ) @@ -331,11 +336,13 @@ api_data.append( { "datetime": last_update.to_datetime_string(), - "id_camera": camera["id_camera"], - "url_camera": camera["url_camera"], - "latitude": camera["latitude"], - "longitude": camera["longitude"], - "image_base64": image, + "id_camera": camera_with_image_and_classification["id_camera"], + "url_camera": camera_with_image_and_classification["url_camera"], + "latitude": camera_with_image_and_classification["latitude"], + "longitude": camera_with_image_and_classification["longitude"], + "image_base64": camera_with_image_and_classification[ + "image_base64" + ], "ai_classification": ai_classification, } ) From c2133ef2d64d6ce5a07059191c0b0e3eb7cef16e Mon Sep 17 00:00:00 2001 From: Gabriel Gazola Milan Date: Wed, 15 Nov 2023 10:15:19 -0300 Subject: [PATCH 4/5] chore: test with local dask executor --- .../rj_escritorio/flooding_detection/flows.py | 9 +-- poetry.lock | 71 ++++++------------- pyproject.toml | 7 +- 3 files changed, 29 insertions(+), 58 deletions(-) diff --git a/pipelines/rj_escritorio/flooding_detection/flows.py b/pipelines/rj_escritorio/flooding_detection/flows.py index f6ddfa54e..fcbf815ac 100644 ---
a/pipelines/rj_escritorio/flooding_detection/flows.py +++ b/pipelines/rj_escritorio/flooding_detection/flows.py @@ -3,7 +3,7 @@ Flow definition for flooding detection using AI. """ from prefect import Parameter -from prefect.executors import DaskExecutor +from prefect.executors import LocalDaskExecutor from prefect.run_configs import KubernetesRun from prefect.storage import GCS from prefect.tasks.control_flow.filter import FilterTask @@ -107,9 +107,10 @@ rj_escritorio__flooding_detection__flow.storage = GCS(constants.GCS_FLOWS_BUCKET.value) -rj_escritorio__flooding_detection__flow.executor = DaskExecutor( - address="tcp://prefect-support-cluster-scheduler.dask.svc.cluster.local:8786" -) +# rj_escritorio__flooding_detection__flow.executor = DaskExecutor( +# address="tcp://prefect-support-cluster-scheduler.dask.svc.cluster.local:8786" +# ) +rj_escritorio__flooding_detection__flow.executor = LocalDaskExecutor() rj_escritorio__flooding_detection__flow.run_config = KubernetesRun( image=constants.DOCKER_IMAGE.value, labels=[constants.RJ_ESCRITORIO_AGENT_LABEL.value], diff --git a/poetry.lock b/poetry.lock index c271b32bb..2ff193d43 100644 --- a/poetry.lock +++ b/poetry.lock @@ -258,8 +258,6 @@ files = [ ] [package.dependencies] -importlib-metadata = {version = "*", markers = "python_version < \"3.9\""} -importlib-resources = {version = "*", markers = "python_version < \"3.9\""} Mako = "*" SQLAlchemy = ">=1.3.0" typing-extensions = ">=4" @@ -625,34 +623,6 @@ files = [ {file = "backports.weakref-1.0.post1.tar.gz", hash = "sha256:bc4170a29915f8b22c9e7c4939701859650f2eb84184aee80da329ac0b9825c2"}, ] -[[package]] -name = "backports-zoneinfo" -version = "0.2.1" -description = "Backport of the standard library zoneinfo module" -optional = false -python-versions = ">=3.6" -files = [ - {file = "backports.zoneinfo-0.2.1-cp36-cp36m-macosx_10_14_x86_64.whl", hash = "sha256:da6013fd84a690242c310d77ddb8441a559e9cb3d3d59ebac9aca1a57b2e18bc"}, - {file = 
"backports.zoneinfo-0.2.1-cp36-cp36m-manylinux1_i686.whl", hash = "sha256:89a48c0d158a3cc3f654da4c2de1ceba85263fafb861b98b59040a5086259722"}, - {file = "backports.zoneinfo-0.2.1-cp36-cp36m-manylinux1_x86_64.whl", hash = "sha256:1c5742112073a563c81f786e77514969acb58649bcdf6cdf0b4ed31a348d4546"}, - {file = "backports.zoneinfo-0.2.1-cp36-cp36m-win32.whl", hash = "sha256:e8236383a20872c0cdf5a62b554b27538db7fa1bbec52429d8d106effbaeca08"}, - {file = "backports.zoneinfo-0.2.1-cp36-cp36m-win_amd64.whl", hash = "sha256:8439c030a11780786a2002261569bdf362264f605dfa4d65090b64b05c9f79a7"}, - {file = "backports.zoneinfo-0.2.1-cp37-cp37m-macosx_10_14_x86_64.whl", hash = "sha256:f04e857b59d9d1ccc39ce2da1021d196e47234873820cbeaad210724b1ee28ac"}, - {file = "backports.zoneinfo-0.2.1-cp37-cp37m-manylinux1_i686.whl", hash = "sha256:17746bd546106fa389c51dbea67c8b7c8f0d14b5526a579ca6ccf5ed72c526cf"}, - {file = "backports.zoneinfo-0.2.1-cp37-cp37m-manylinux1_x86_64.whl", hash = "sha256:5c144945a7752ca544b4b78c8c41544cdfaf9786f25fe5ffb10e838e19a27570"}, - {file = "backports.zoneinfo-0.2.1-cp37-cp37m-win32.whl", hash = "sha256:e55b384612d93be96506932a786bbcde5a2db7a9e6a4bb4bffe8b733f5b9036b"}, - {file = "backports.zoneinfo-0.2.1-cp37-cp37m-win_amd64.whl", hash = "sha256:a76b38c52400b762e48131494ba26be363491ac4f9a04c1b7e92483d169f6582"}, - {file = "backports.zoneinfo-0.2.1-cp38-cp38-macosx_10_14_x86_64.whl", hash = "sha256:8961c0f32cd0336fb8e8ead11a1f8cd99ec07145ec2931122faaac1c8f7fd987"}, - {file = "backports.zoneinfo-0.2.1-cp38-cp38-manylinux1_i686.whl", hash = "sha256:e81b76cace8eda1fca50e345242ba977f9be6ae3945af8d46326d776b4cf78d1"}, - {file = "backports.zoneinfo-0.2.1-cp38-cp38-manylinux1_x86_64.whl", hash = "sha256:7b0a64cda4145548fed9efc10322770f929b944ce5cee6c0dfe0c87bf4c0c8c9"}, - {file = "backports.zoneinfo-0.2.1-cp38-cp38-win32.whl", hash = "sha256:1b13e654a55cd45672cb54ed12148cd33628f672548f373963b0bff67b217328"}, - {file = "backports.zoneinfo-0.2.1-cp38-cp38-win_amd64.whl", 
hash = "sha256:4a0f800587060bf8880f954dbef70de6c11bbe59c673c3d818921f042f9954a6"}, - {file = "backports.zoneinfo-0.2.1.tar.gz", hash = "sha256:fadbfe37f74051d024037f223b8e001611eac868b5c5b06144ef4d8b799862f2"}, -] - -[package.extras] -tzdata = ["tzdata"] - [[package]] name = "basedosdados" version = "2.0.0b14" @@ -1317,17 +1287,17 @@ files = [ [[package]] name = "dask" -version = "2023.5.0" +version = "2023.11.0" description = "Parallel PyData with Task Scheduling" optional = false -python-versions = ">=3.8" +python-versions = ">=3.9" files = [ - {file = "dask-2023.5.0-py3-none-any.whl", hash = "sha256:32b34986519b7ddc0947c8ca63c2fc81b964e4c208dfb5cbf9f4f8aec92d152b"}, - {file = "dask-2023.5.0.tar.gz", hash = "sha256:4f4c28ac406e81b8f21b5be4b31b21308808f3e0e7c7e2f4a914f16476d9941b"}, + {file = "dask-2023.11.0-py3-none-any.whl", hash = "sha256:b950951ee3f8c86f003b577b6928ecf20089eee6677719578deaba8fd9a78203"}, + {file = "dask-2023.11.0.tar.gz", hash = "sha256:06b8f39755d37ff6ef4db422774db1f1d5d6788d33f0628c80861dc6452f878c"}, ] [package.dependencies] -click = ">=8.0" +click = ">=8.1" cloudpickle = ">=1.5.0" fsspec = ">=2021.09.0" importlib-metadata = ">=4.13.0" @@ -1338,11 +1308,11 @@ toolz = ">=0.10.0" [package.extras] array = ["numpy (>=1.21)"] -complete = ["dask[array,dataframe,diagnostics,distributed]", "lz4 (>=4.3.2)", "pyarrow (>=7.0)"] -dataframe = ["numpy (>=1.21)", "pandas (>=1.3)"] +complete = ["dask[array,dataframe,diagnostics,distributed]", "lz4 (>=4.3.2)", "pyarrow (>=7.0)", "pyarrow-hotfix"] +dataframe = ["dask[array]", "pandas (>=1.3)"] diagnostics = ["bokeh (>=2.4.2)", "jinja2 (>=2.10.3)"] -distributed = ["distributed (==2023.5.0)"] -test = ["pandas[test]", "pre-commit", "pytest", "pytest-rerunfailures", "pytest-xdist"] +distributed = ["distributed (==2023.11.0)"] +test = ["pandas[test]", "pre-commit", "pytest", "pytest-cov", "pytest-rerunfailures", "pytest-timeout", "pytest-xdist"] [[package]] name = "databricks-cli" @@ -1422,31 +1392,31 @@ files = 
[ [[package]] name = "distributed" -version = "2023.5.0" +version = "2023.11.0" description = "Distributed scheduler for Dask" optional = false -python-versions = ">=3.8" +python-versions = ">=3.9" files = [ - {file = "distributed-2023.5.0-py3-none-any.whl", hash = "sha256:73ce33bd2460bd45ffc793ffdf9066bd2a3b6bbc65079f74f5147eafcda9b1cb"}, - {file = "distributed-2023.5.0.tar.gz", hash = "sha256:74e3f7f68d4dc435a3591ae1ad8ce7d5a11211fd22692e39c7e50aa11bf7e385"}, + {file = "distributed-2023.11.0-py3-none-any.whl", hash = "sha256:44ad1fff31ece202cc64bdb72dd33d6964d78bdbe1ec1ec06e01f9544187cd2e"}, + {file = "distributed-2023.11.0.tar.gz", hash = "sha256:6a5ab490bd966d83a04210d560ed72fbb0518e88d63b3a176b6370b296cbd8d5"}, ] [package.dependencies] click = ">=8.0" cloudpickle = ">=1.5.0" -dask = "2023.5.0" +dask = "2023.11.0" jinja2 = ">=2.10.3" locket = ">=1.0.0" msgpack = ">=1.0.0" packaging = ">=20.0" -psutil = ">=5.7.0" +psutil = ">=5.7.2" pyyaml = ">=5.3.1" sortedcontainers = ">=2.0.5" tblib = ">=1.6.0" toolz = ">=0.10.0" -tornado = ">=6.0.3" +tornado = ">=6.0.4" urllib3 = ">=1.24.3" -zict = ">=2.2.0" +zict = ">=3.0.0" [[package]] name = "docker" @@ -6395,7 +6365,7 @@ pandas = ">=0.25" patsy = ">=0.5.2" scipy = [ {version = ">=1.3", markers = "(python_version > \"3.9\" or platform_system != \"Windows\" or platform_machine != \"x86\") and python_version < \"3.12\""}, - {version = ">=1.3,<1.9", markers = "(python_version == \"3.8\" or python_version == \"3.9\") and platform_system == \"Windows\" and platform_machine == \"x86\""}, + {version = ">=1.3,<1.9", markers = "python_version == \"3.9\" and platform_system == \"Windows\" and platform_machine == \"x86\""}, ] [package.extras] @@ -6727,7 +6697,6 @@ files = [ ] [package.dependencies] -"backports.zoneinfo" = {version = "*", markers = "python_version < \"3.9\""} tzdata = {version = "*", markers = "platform_system == \"Windows\""} [package.extras] @@ -7264,5 +7233,5 @@ testing = ["coverage (>=5.0.3)", "zope.event", 
"zope.testing"] [metadata] lock-version = "2.0" -python-versions = ">=3.8.1,<3.11" -content-hash = "b788af3d35ff07ac9dcbda64936ef602c4196038307f0c602a47f5ce1c3d363d" +python-versions = ">=3.9,<3.11" +content-hash = "791af1416eb18fcc168766779d4a4ca8b1a26701a95aad234c34f474614e7a9d" diff --git a/pyproject.toml b/pyproject.toml index f08195f90..82e9ed73b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -9,7 +9,7 @@ version = "0.1.0" PyMySQL = { extras = ["rsa"], version = "^1.0.2" } Shapely = "^1.8.1" Unidecode = "^1.3.6" -basedosdados = {version = "2.0.0b14", extras = ["upload"]} +basedosdados = { version = "2.0.0b14", extras = ["upload"] } black = "20.8b1" bs4 = "^0.0.1" croniter = "^1.3.5" @@ -19,7 +19,7 @@ earthengine-api = "^0.1.334" elasticsearch = "^8.2.0" geobr = "^0.1.10" geojsplit = "^0.1.2" -geopandas = ">=0.7.0,<0.8.0" # This version range is due to `geobr` package +geopandas = ">=0.7.0,<0.8.0" # This version range is due to `geobr` package geopy = "^2.3.0" google-api-python-client = "^2.56.0" google-cloud-pubsub = "^2.17.1" @@ -45,7 +45,7 @@ phonenumbers = "^8.12.57" plotly = "^5.14.0" prefect = "0.15.9" pyproj = "^3.4.0" -python = ">=3.8.1,<3.11" +python = ">=3.9,<3.11" python-telegram-bot = "^13.11" pytz = "^2021.3" rasterio = "1.3a3" @@ -64,6 +64,7 @@ azure-storage-blob = "^12.17.0" icecream = "^2.1.3" pyodbc = "^5.0.1" h3 = "^3.7.6" +dask = "^2023.11.0" [tool.poetry.dev-dependencies] pylint = "^2.12.2" From a5396308baba07127345caa2a242869d2f1499e7 Mon Sep 17 00:00:00 2001 From: Gabriel Gazola Milan Date: Wed, 15 Nov 2023 10:42:30 -0300 Subject: [PATCH 5/5] feat: increase parallelism --- pipelines/rj_escritorio/flooding_detection/flows.py | 7 ++----- pipelines/rj_escritorio/flooding_detection/tasks.py | 11 +++++------ 2 files changed, 7 insertions(+), 11 deletions(-) diff --git a/pipelines/rj_escritorio/flooding_detection/flows.py b/pipelines/rj_escritorio/flooding_detection/flows.py index fcbf815ac..179985df4 100644 --- 
a/pipelines/rj_escritorio/flooding_detection/flows.py +++ b/pipelines/rj_escritorio/flooding_detection/flows.py @@ -28,7 +28,7 @@ ) with Flow( - name="EMD: flooding_detection - Atualizar detecção de alagamento (IA) na API (Dask)", + name="EMD: flooding_detection - Atualizar detecção de alagamento (IA) na API", code_owners=[ "gabriel", "diego", @@ -107,10 +107,7 @@ rj_escritorio__flooding_detection__flow.storage = GCS(constants.GCS_FLOWS_BUCKET.value) -# rj_escritorio__flooding_detection__flow.executor = DaskExecutor( -# address="tcp://prefect-support-cluster-scheduler.dask.svc.cluster.local:8786" -# ) -rj_escritorio__flooding_detection__flow.executor = LocalDaskExecutor() +rj_escritorio__flooding_detection__flow.executor = LocalDaskExecutor(num_workers=10) rj_escritorio__flooding_detection__flow.run_config = KubernetesRun( image=constants.DOCKER_IMAGE.value, labels=[constants.RJ_ESCRITORIO_AGENT_LABEL.value], diff --git a/pipelines/rj_escritorio/flooding_detection/tasks.py b/pipelines/rj_escritorio/flooding_detection/tasks.py index 4fd6af7a3..cd0ecb66c 100644 --- a/pipelines/rj_escritorio/flooding_detection/tasks.py +++ b/pipelines/rj_escritorio/flooding_detection/tasks.py @@ -1,5 +1,4 @@ # -*- coding: utf-8 -*- -# TODO: Make it resilient to camera failures import base64 from datetime import datetime, timedelta import io @@ -60,7 +59,7 @@ def get_openai_api_key(secret_path: str) -> str: return secret["api_key"] -@task(nout=3) +@task def get_prediction( camera_with_image: Dict[str, Union[str, float]], flooding_prompt: str, @@ -68,7 +67,7 @@ def get_prediction( openai_api_model: str, openai_api_max_tokens: int = 300, openai_api_url: str = "https://api.openai.com/v1/chat/completions", -) -> Tuple[Dict[str, Union[str, float, bool]], str, Dict[str, Union[str, float]]]: +) -> Dict[str, Union[str, float, bool]]: """ Gets the flooding detection prediction from OpenAI API. 
@@ -143,12 +142,12 @@ def get_prediction( @task( - max_retries=3, - retry_delay=timedelta(seconds=5), + max_retries=2, + retry_delay=timedelta(seconds=1), ) def get_snapshot( camera: Dict[str, Union[str, float]], -) -> Tuple[str, Dict[str, Union[str, float]]]: +) -> Dict[str, Union[str, float]]: """ Gets a snapshot from a camera.