diff --git a/pipelines/rj_cor/meteorologia/precipitacao_alertario/tasks.py b/pipelines/rj_cor/meteorologia/precipitacao_alertario/tasks.py index bffe7e04f..0e831fbd4 100644 --- a/pipelines/rj_cor/meteorologia/precipitacao_alertario/tasks.py +++ b/pipelines/rj_cor/meteorologia/precipitacao_alertario/tasks.py @@ -34,7 +34,7 @@ @task( nout=2, - max_retries=constants.TASK_MAX_RETRIES.value, + max_retries=10, # constants.TASK_MAX_RETRIES.value, retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value), ) def download_data() -> pd.DataFrame: diff --git a/pipelines/rj_cor/meteorologia/satelite/satellite_utils.py b/pipelines/rj_cor/meteorologia/satelite/satellite_utils.py index 610131a48..d60657398 100644 --- a/pipelines/rj_cor/meteorologia/satelite/satellite_utils.py +++ b/pipelines/rj_cor/meteorologia/satelite/satellite_utils.py @@ -47,7 +47,6 @@ # Required Libraries # ==================================================================== -import base64 import datetime import os import shutil @@ -69,7 +68,12 @@ import xarray as xr from pipelines.rj_cor.meteorologia.satelite.remap import remap -from pipelines.utils.utils import get_credentials_from_env, list_blobs_with_prefix, log +from pipelines.utils.utils import ( + get_credentials_from_env, + get_vault_secret, + list_blobs_with_prefix, + log, +) def get_blob_with_prefix(bucket_name: str, prefix: str, mode: str = "prod") -> str: @@ -362,8 +366,16 @@ def get_info(path: str) -> dict: # DSIF - Derived Stability Indices: 'CAPE', 'KI', 'LI', 'SI', 'TT' product_caracteristics["DSIF"] = { "variable": ["LI", "CAPE", "TT", "SI", "KI"], - "vmin": 0, - "vmax": 1000, + # "vmin": 0, + # "vmax": 1000, + "vmin": {"LI": -20, "CAPE": 0, "TT": 10, "SI": -20, "KI": 0}, + "vmax": {"LI": 20, "CAPE": 8000, "TT": 70, "SI": 20, "KI": 60}, + # https://www.star.nesdis.noaa.gov/goesr/documents/ATBDs/Enterprise/ATBD_Enterprise_Soundings_Legacy_Atmospheric_Profiles_v3.1_2019-11-01.pdf + # Lifted Index: --10 to 40 K + # CAPE: 0 to 5000 J/kg + # Showalter index: >4 to -10 K + # Total totals Index: -43 to > 56 + # K index: 0 to 40 "cmap": "jet", } # FDCF - Fire-Hot Spot Characterization: 'Area', 'Mask', 'Power', 'Temp' @@ -589,14 +601,37 @@ def get_variable_values(dfr: pd.DataFrame, variable: str) -> xr.DataArray: longitudes = list(matrix_temp.columns) latitudes = list(matrix_temp.index) - log("Convert to xr dataarray") + log("Convert df to xr data array") data_array = xr.DataArray( matrix, dims=("lat", "lon"), coords={"lon": longitudes, "lat": latitudes} ) - log("end") + log("Finished converting df to data array") + return data_array +# pylint: disable=dangerous-default-value +def get_point_value( + data_array: xr.DataArray, selected_point: list = [-22.89980, -43.35546] +) -> float: + """ + Find the nearest point on data_array from the selected_point and return its value + """ + + # Find the nearest index of latitude and longitude from selected_point + lat_idx = (data_array["lat"] - selected_point[0]).argmin().values + lon_idx = (data_array["lon"] - selected_point[1]).argmin().values + + # Get the correspondent value of this point + point_value = data_array.isel(lat=lat_idx, lon=lon_idx).values + log( + f"\nThe value of the selected point is {point_value}. It will be replace by 0 if is nan.\n" + ) + point_value = 0 if np.isnan(point_value) else float(point_value) + + return point_value + + # pylint: disable=unused-variable def create_and_save_image(data: xr.DataArray, info: dict, variable) -> Path: """ @@ -614,23 +649,41 @@ def create_and_save_image(data: xr.DataArray, info: dict, variable) -> Path: # Define the color scale based on the channel colormap = "jet" # White to black for IR channels # colormap = "gray_r" # White to black for IR channels + log(f"\nmax valueeeeeeeeeeeeeeeeee: {np.nanmax(data)} min value: {np.nanmin(data)}") + + variable = variable.upper() + vmin = info["vmin"] + vmax = info["vmax"] + if variable in ["LI", "CAPE", "TT", "SI", "KI"]: + vmin = vmin[variable] + vmax = vmax[variable] + # vmin = np.nanmin(data) + # vmax = np.nanmax(data) # Plot the image - img = axis.imshow(data, origin="upper", extent=img_extent, cmap=colormap, alpha=0.8) - - # # Find shapefile file "Limite_Bairros_RJ.shp" across the entire file system - # for root, dirs, files in os.walk(os.sep): - # if "Limite_Bairros_RJ.shp" in files: - # log(f"[DEBUG] ROOT {root}") - # shapefile_dir = root - # break - # else: - # print("File not found.") - - # Add coastlines, borders and gridlines - shapefile_dir = Path( - "/opt/venv/lib/python3.9/site-packages/pipelines/utils/shapefiles" + img = axis.imshow( + data, + origin="upper", + extent=img_extent, + cmap=colormap, + alpha=0.8, + vmin=vmin, + vmax=vmax, ) + + # Find shapefile file "Limite_Bairros_RJ.shp" across the entire file system + for root, dirs, files in os.walk(os.sep): + if "Limite_Bairros_RJ.shp" in files: + log(f"[DEBUG] ROOT {root}") + shapefile_dir = Path(root) + break + else: + print("File not found.") + + # # Add coastlines, borders and gridlines + # shapefile_dir = Path( + # "/opt/venv/lib/python3.9/site-packages/pipelines/utils/shapefiles" + # ) shapefile_path_neighborhood = shapefile_dir / "Limite_Bairros_RJ.shp" shapefile_path_state = shapefile_dir / "Limite_Estados_BR_IBGE.shp" @@ -683,27 +736,40 @@ def create_and_save_image(data: xr.DataArray, info: dict, variable) -> Path: if not output_image_path.exists(): output_image_path.mkdir(parents=True, exist_ok=True) - plt.savefig(save_image_path, bbox_inches="tight", pad_inches=0, dpi=300) - log("\n Ended saving image") + plt.savefig(save_image_path, bbox_inches="tight", pad_inches=0.1, dpi=80) + log(f"\n Ended saving image on {save_image_path}") return save_image_path -def upload_image_to_api(info: dict, save_image_path: Path): +# def upload_image_to_api(info: dict, save_image_path: Path): +def upload_image_to_api(var: str, save_image_path: Path, point_value: float): """ - Upload image to api + Upload image and point value to API. """ - username = "your-username" - password = "your-password" + # We need to change this variable so it can be posted on API + var = "cp" if var == "cape" else var + + log("Getting API url") + url_secret = get_vault_secret("rionowcast")["data"] + log(f"urlsecret1 {url_secret}") + url_secret = url_secret["url_api_satellite_products"] + log(f"urlsecret2 {url_secret}") + api_url = f"{url_secret}/{var.lower()}" + log( + f"\n Sending image {save_image_path} to API: {api_url} with value {point_value}\n" + ) - image = base64.b64encode(open(save_image_path, "rb").read()).decode() + payload = {"value": point_value} - response = requests.post( - "https://api.example.com/upload-image", - data={"image": image, "timestamp": info["datetime_save"]}, - auth=(username, password), - ) + # Convert from Path to string + save_image_path = str(save_image_path) + + with open(save_image_path, "rb") as image_file: + files = {"image": (save_image_path, image_file, "image/jpeg")} + response = requests.post(api_url, data=payload, files=files) if response.status_code == 200: - print("Image sent to API") + log("Finished the request successful!") + log(response.json()) else: - print("Problem senting imagem to API") + log(f"Error: {response.status_code}, {response.text}") diff --git a/pipelines/rj_cor/meteorologia/satelite/tasks.py b/pipelines/rj_cor/meteorologia/satelite/tasks.py index 6bec193c5..242292d35 100644 --- a/pipelines/rj_cor/meteorologia/satelite/tasks.py +++ b/pipelines/rj_cor/meteorologia/satelite/tasks.py @@ -10,6 +10,8 @@ from pathlib import Path from typing import Union +import requests + import pandas as pd import pendulum from prefect import task @@ -24,11 +26,13 @@ get_files_from_aws, get_files_from_gcp, get_info, + get_point_value, get_variable_values, remap_g16, save_data_in_file, + # upload_image_to_api, ) -from pipelines.utils.utils import log +from pipelines.utils.utils import log, get_vault_secret @task() @@ -225,7 +229,7 @@ def save_data(info: dict, mode_redis: str = "prod") -> Union[str, Path]: @task def create_image_and_upload_to_api(info: dict, output_filepath: Path): """ - Create image from dataframe and send it to API + Create image from dataframe, get the value of a point on the image and send these to API. """ dfr = pd.read_csv(output_filepath) @@ -237,12 +241,41 @@ def create_image_and_upload_to_api(info: dict, output_filepath: Path): var = var.lower() data_array = get_variable_values(dfr, var) + point_value = get_point_value(data_array) # Get the pixel values data = data_array.data[:] - log(f"\n[DEBUG] data {data}") + log(f"\n[DEBUG] {var} data \n{data}") + log(f"\nmax value: {data.max()} min value: {data.min()}") save_image_path = create_and_save_image(data, info, var) + log(f"\nStart uploading image for variable {var} on API\n") - # upload_image_to_api(info, save_image_path) + # upload_image_to_api(var, save_image_path, point_value) + var = "cp" if var == "cape" else var + + log("Getting API url") + url_secret = get_vault_secret("rionowcast")["data"] + log(f"urlsecret1 {url_secret}") + url_secret = url_secret["url_api_satellite_products"] + log(f"urlsecret2 {url_secret}") + api_url = f"{url_secret}/{var.lower()}" + log( + f"\n Sending image {save_image_path} to API: {api_url} with value {point_value}\n" + ) + + payload = {"value": point_value} + + # Convert from Path to string + save_image_path = str(save_image_path) + + with open(save_image_path, "rb") as image_file: + files = {"image": (save_image_path, image_file, "image/jpeg")} + response = requests.post(api_url, data=payload, files=files) + + if response.status_code == 200: + log("Finished the request successful!") + log(response.json()) + else: + log(f"Error: {response.status_code}, {response.text}") log(save_image_path) log(f"\nEnd uploading image for variable {var} on API\n")