Micro climate prediction using Neighbors (DeepMC) (#185)
These code changes are enhancements to DeepMC. They make it possible to
forecast the weather for stations that have no historical data or are
missing sensor data. They add a new model built with a PyTorch Graph
Neural Network (GNN).
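
Below is a minimal sketch of the neighbor-based idea, assuming PyTorch Geometric; the class name, layer choices, and dimensions are illustrative assumptions and not the model added by this commit.

```python
import torch
import torch.nn as nn
from torch_geometric.nn import GCNConv  # assumes torch_geometric is installed


class NeighborStationGNN(nn.Module):
    """Illustrative GNN: each node is a weather station, edges link nearby
    stations, and the head emits a forecast horizon per station."""

    def __init__(self, in_features: int, hidden: int = 64, horizon: int = 24):
        super().__init__()
        self.conv1 = GCNConv(in_features, hidden)
        self.conv2 = GCNConv(hidden, hidden)
        self.head = nn.Linear(hidden, horizon)

    def forward(self, x: torch.Tensor, edge_index: torch.Tensor) -> torch.Tensor:
        # x: [num_stations, in_features]; edge_index: [2, num_edges], long dtype
        h = torch.relu(self.conv1(x, edge_index))
        h = torch.relu(self.conv2(h, edge_index))
        return self.head(h)
```

In a setup like this, a station with little or no usable history can still receive a forecast, because message passing lets it borrow signal from neighboring stations.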

The code changes also include enhancements to the existing DeepMC scripts
that add a datetime column to the preprocessing output. This makes it
possible to tell which date each sample belongs to during GNN model
training.
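
An illustrative sketch of carrying the datetime through preprocessing so samples can be mapped back to dates during training; the function and column names are assumptions, not the repository's actual preprocessing code.

```python
import pandas as pd


def attach_datetime(windows: pd.DataFrame, datetime_col: str = "datetime") -> pd.DataFrame:
    """Keep each preprocessed sample's timestamp as an explicit column so it
    can later be matched to a date during GNN model training."""
    out = windows.copy()
    # Assumes the preprocessed frame is indexed by timestamp.
    out[datetime_col] = pd.to_datetime(out.index)
    return out
```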

Co-authored-by: Naga Bilwanth Gangarapu <[email protected]>
2 people authored and robertomest committed Aug 1, 2024
1 parent 75ef72c commit b711b42
Showing 26 changed files with 3,808 additions and 909 deletions.
556 changes: 254 additions & 302 deletions notebooks/deepmc/mc_forecast.ipynb

Large diffs are not rendered by default.

69 changes: 23 additions & 46 deletions notebooks/deepmc/notebook_lib/forecast.py
@@ -1,4 +1,3 @@
import time
from datetime import datetime, timedelta
from typing import Any, Dict, List, Tuple, cast

@@ -8,7 +7,7 @@
from shapely.geometry import Point

from vibe_core.client import FarmvibesAiClient, get_default_vibe_client
from vibe_core.datamodel import RunConfig, RunConfigUser, RunDetails, SpatioTemporalJson
from vibe_core.datamodel import RunConfig, RunConfigUser, SpatioTemporalJson


class Forecast:
@@ -31,7 +30,8 @@ def submit_download_request(self):
"""
Submit request to worker to download forecast data
"""
run_list = []
run_metadata_list = []
runs = []
for parameter in self.parameters:
run_name = f"forecast_{parameter['weather_type']}"
run = self.client.run(
@@ -42,57 +42,40 @@ def submit_download_request(self):
parameters=parameter,
)

try:
run.block_until_complete(5)
except RuntimeError:
print(run)

run_list.append(
run_metadata_list.append(
{
"id": run.id,
"weather_type": parameter["weather_type"],
}
)
runs.append(run)

self.client.monitor(runs, 5)

return run_list
return run_metadata_list

def get_run_status(self, run_list: List[Dict[str, str]]):
clear_output(wait=True)
all_done = True
out_ = []
out = []
for run_item in run_list:
o = self.client.describe_run(run_item["id"])
print(f"Execution status for {run_item['weather_type']}: {o.details.status}")

if o.details.status == "done":
out_.append(o)
elif o.details.status == "failed":
print(o.details)
out.append(o)
else:
all_done = False
cnt_complete = 0
for key, value in o.task_details.items():
value = cast(RunDetails, value)
assert value.subtasks is not None, "Subtasks don't exist"
for subtask in value.subtasks:
if subtask.status == "done":
cnt_complete += 1
print(
"\t",
f"Subtask {key}",
cnt_complete,
"/",
len(value.subtasks),
)
cnt_complete = 0
return all_done, out_
raise Exception(
f"Execution status for {run_item['weather_type']}: {o.details.status}"
)

return out

def get_all_assets(self, details: RunConfigUser):
asset_files = []
output = details.output["weather_forecast"]
record: Dict[str, Any]
for record in cast(List[Dict[str, Any]], output):
for _, value in record["assets"].items():
for value in record["assets"].values():
asset_files.append(value["href"])
df_assets = [pd.read_csv(f, index_col=False) for f in asset_files]
df_out = pd.concat(df_assets)
@@ -104,21 +87,15 @@ def get_downloaded_data(self, run_list: List[Dict[str, str]], offset_hours: int
check the download status. If status is done, fetch the downloaded data
"""
forecast_dataset = pd.DataFrame()
status = False
out_ = []
while status is False:
status, out_ = self.get_run_status(run_list)
time.sleep(10)

if status:
for detail in out_:
df = self.get_all_assets(detail)
out = self.get_run_status(run_list)
for detail in out:
df = self.get_all_assets(detail)

# Offset from UTC to specified timezone
df.index = df.index + pd.offsets.Hour(offset_hours)
# Offset from UTC to specified timezone
df.index = df.index + pd.offsets.Hour(offset_hours)

if not df.empty:
forecast_dataset = pd.concat([forecast_dataset, df], axis=1)
if not df.empty:
forecast_dataset = pd.concat([forecast_dataset, df], axis=1)

return forecast_dataset

4 changes: 2 additions & 2 deletions notebooks/deepmc/notebook_lib/modules.py
@@ -59,14 +59,14 @@ def training_step(self, train_batch: Tensor, _):
x, y = train_batch[:6], train_batch[6]
y_hat = self.deepmc(x)
loss = self.loss(y_hat, y)
self.log("train_loss/total", loss)
self.log("train_loss/total", loss, on_epoch=True, prog_bar=True, logger=True, on_step=True)
return loss

def validation_step(self, validation_batch: Tensor, _):
x, y = validation_batch[:6], validation_batch[6]
y_hat = self.deepmc(x)
loss = self.loss(y_hat, y)
self.log("val_loss/total", loss, on_epoch=True)
self.log("val_loss/total", loss, on_epoch=True, prog_bar=True, logger=True, on_step=True)
return loss


34 changes: 0 additions & 34 deletions notebooks/deepmc/notebook_lib/post_models.py

This file was deleted.

