diff --git a/notebooks/deepmc/mc_forecast.ipynb b/notebooks/deepmc/mc_forecast.ipynb
index d41b43b7..7fbfa798 100755
--- a/notebooks/deepmc/mc_forecast.ipynb
+++ b/notebooks/deepmc/mc_forecast.ipynb
@@ -15,7 +15,9 @@
"```bash\n",
"$ micromamba env create -f ./deepmc_env.yaml\n",
"$ micromamba activate deepmc-pytorch\n",
- "```\n"
+ "```\n",
+ "\n",
+ "**We currently only support Unix-based systems (Linux and macOS) for running this notebook.**"
]
},
{
@@ -55,33 +57,22 @@
},
{
"cell_type": "code",
- "execution_count": 1,
+ "execution_count": null,
"metadata": {},
- "outputs": [
- {
- "name": "stderr",
- "output_type": "stream",
- "text": [
- "/home/azureuser/.conda/envs/deepmc-pytorch/lib/python3.8/site-packages/torchvision/io/image.py:11: UserWarning: Failed to load image Python extension: /home/azureuser/.conda/envs/deepmc-pytorch/lib/python3.8/site-packages/torchvision/image.so: undefined symbol: _ZNK3c1010TensorImpl36is_contiguous_nondefault_policy_implENS_12MemoryFormatE\n",
- " warn(f\"Failed to load image Python extension: {e}\")\n"
- ]
- }
- ],
+ "outputs": [],
"source": [
- "import pandas as pd\n",
- "import numpy as np\n",
+ "import warnings\n",
+ "from datetime import datetime\n",
"\n",
- "from datetime import datetime, timedelta\n",
+ "import numpy as np\n",
+ "import pandas as pd\n",
"from matplotlib import pyplot as plt\n",
- "\n",
- "from shapely import geometry\n",
- "\n",
- "from notebook_lib import utils\n",
- "from notebook_lib import prediction\n",
"from notebook_lib import train\n",
- "from notebook_lib.forecast import Forecast\n",
+ "from shapely import geometry\n",
+ "\n",
+ "from vibe_notebook.deepmc import prediction, utils\n",
+ "from vibe_notebook.deepmc.forecast import Forecast\n",
"\n",
- "import warnings\n",
"warnings.filterwarnings(\"ignore\")"
]
},
@@ -90,7 +81,7 @@
"metadata": {},
"source": [
"### Workflows\n",
- "The notebook utilize below workflows available in farmvibes"
+ "The notebook utilizes the workflow below, which is available in FarmVibes.AI: "
]
},
{
@@ -107,9 +98,9 @@
"metadata": {},
"source": [
"### Data\n",
- "The notebook utilizing two types of datasets\n",
+ "The notebook utilizes two types of datasets:\n",
"\n",
- "1. The historical observations recorded by weather stations\n",
+ "1. The historical observations recorded by weather stations.\n",
"2. The forecast observations downloaded using the [herbie package](https://blaylockbk.github.io/Herbie/_build/html/). This package helps to download recent and archived numerical weather prediction (NWP) model output from different cloud archive sources. Its most popular capability is to download HRRR model data.\n"
]
},
@@ -118,7 +109,7 @@
"metadata": {},
"source": [
"### AGWeatherNet\n",
- "In this notebook, we utilize historical observations downloaded from AGWeatherNet for a station \\\"Palouse\\\". The data used for training range from May 2020 to June 2022. For more information check [AGWeatherNet documentation](http://weather.wsu.edu/?p=92850&desktop)."
+ "In this notebook, we utilize historical observations downloaded from AGWeatherNet for the station `Palouse`. The data used for training range from May 2020 to June 2022. For more information check [AGWeatherNet documentation](http://weather.wsu.edu/?p=92850&desktop)."
]
},
{
@@ -146,8 +137,8 @@
"metadata": {},
"outputs": [],
"source": [
- "PREDICT=\"%s\"\n",
- "RELEVANT=\"%s\"\n",
+ "PREDICT = \"%s\"\n",
+ "RELEVANT = \"%s\"\n",
"ROOT_PATH = f\"./data/model_{PREDICT}/\"\n",
"DATA_EXPORT_PATH = ROOT_PATH + f\"{STATION_NAME}/{RELEVANT}/train_data.pkl\""
]
@@ -166,7 +157,7 @@
"outputs": [],
"source": [
"# weather dataset filtered and model training limited to train features.\n",
- "HISTORICAL_MODEL_TRAIN_FEATURES = ['humidity', 'wind_speed', 'temperature']\n",
+ "HISTORICAL_MODEL_TRAIN_FEATURES = [\"humidity\", \"wind_speed\", \"temperature\"]\n",
"\n",
"# Historical data aligned using INDEX variable\n",
"INDEX = \"date\""
@@ -203,7 +194,7 @@
"outputs": [],
"source": [
"# Models trained to predict out features\n",
- "OUT_FEATURES = ['wind_speed' , 'temperature']"
+ "OUT_FEATURES = [\"wind_speed\", \"temperature\"]"
]
},
{
@@ -211,14 +202,16 @@
"metadata": {},
"source": [
"### Relevant vs Not Relevant\n",
- "The notebook support performing micro climate predictions with below approaches. \n",
+ "A scenario is considered relevant when the historical data and the forecasts match closely, with minimal discrepancies; otherwise, it is not relevant.\n",
+ "\n",
+ "The notebook supports performing micro climate predictions with the following approaches: \n",
"\n",
"1. Utilizing both Historical & Forecast observations. This approach is suggested to use if both observations are relevant. \n",
"\n",
"2. Utilizing only Historical dataset. This approach is suggested to use if both Historical & Forecast observations are not relevant or Forecast dataset doesn't exist. \n",
"\n",
"\n",
- "In next cells, demonstrated training & prediction process for both relevant and not relevant scenarios. "
+ "In the next cells, we demonstrate the training and prediction processes for both relevant and non-relevant scenarios. "
]
},
{
@@ -270,7 +263,7 @@
"1. The index variable is converted to datetime\n",
"2. The input data is interpolated to fill the missing values using their neighbors\n",
"3. The script focuses on training the model with a 60-minute frequency, hence the data is grouped for this frequency.\n",
- "4. The data is scaled using the scikit-learn StandardScalar. For more information check [scikit-learn documentaion](https://github.com/scikit-learn/scikit-learn)"
+ "4. The data is scaled using the scikit-learn StandardScaler. For more information check [scikit-learn documentation](https://github.com/scikit-learn/scikit-learn)"
]
},
{
@@ -296,7 +289,7 @@
"metadata": {},
"outputs": [],
"source": [
- "historical_dataset = utils.get_csv_data(path=file_path)"
+ "historical_dataset = utils.get_csv_data(path=file_path, interpolate=False, fill_na=False)"
]
},
{
@@ -319,7 +312,7 @@
" - humidity - \"RH:2 m\" \n",
" - wind speed - The forecast observations of wind speed are derived using data downloaded for u & v components. The algebraic expression used to calculate wind speed is \n",
" $$ \n",
- " ws = \\sqrt{u^2 + v^2}\n",
+ " ws(u, v) = \\sqrt{u^2 + v^2}\n",
" $$\n",
" i. u component - \"UGRD:10 m\" \n",
" ii. v component - \"VGRD:10 m\""
@@ -348,24 +341,14 @@
"start_date = datetime(year=2020, month=5, day=31)\n",
"end_date = datetime(year=2022, month=8, day=2)\n",
"time_range = (start_date, end_date)\n",
- "date_column=\"date\"\n",
+ "date_column = \"date\"\n",
"\n",
- "parameters = [{\n",
- " \"weather_type\": \"temperature\",\n",
- " \"search_text\": \"TMP:2 m\"\n",
- " },\n",
- " {\n",
- " \"weather_type\": \"humidity\",\n",
- " \"search_text\": \"RH:2 m\"\n",
- " },\n",
- " {\n",
- " \"weather_type\": \"u-component\",\n",
- " \"search_text\": \"UGRD:10 m\"\n",
- " },\n",
- " {\n",
- " \"weather_type\": \"v-component\",\n",
- " \"search_text\": \"VGRD:10 m\"\n",
- " }]"
+ "parameters = [\n",
+ " {\"weather_type\": \"temperature\", \"search_text\": \"TMP:2 m\"},\n",
+ " {\"weather_type\": \"humidity\", \"search_text\": \"RH:2 m\"},\n",
+ " {\"weather_type\": \"u-component\", \"search_text\": \"UGRD:10 m\"},\n",
+ " {\"weather_type\": \"v-component\", \"search_text\": \"VGRD:10 m\"},\n",
+ "]"
]
},
{
@@ -373,7 +356,7 @@
"metadata": {},
"source": [
"### Submit Request to Worker\n",
- "Download forecast observations by submitting request to worker running in background. If more than one worker instance running in background, it process the request in parallel for each parameter. Workflow execution utilize below parameters while processing requests, this can be overwritten using the parameter argument.\n",
+ "We download forecast observations by submitting a request to the worker running in the background. If more than one worker instance is running in the background, the request is processed in parallel for each parameter. Workflow execution uses the parameters below while processing requests; they can be overwritten using the parameter argument.\n",
"\n",
"- fxx: [1, 25, 1] # start, stop, step\n",
"- search_text: \"TMP:2 m\"\n",
@@ -388,24 +371,48 @@
"metadata": {},
"outputs": [
{
- "name": "stdout",
- "output_type": "stream",
- "text": [
- "'VibeWorkflowRun'(id='d7c0dc6a-339f-45b9-81d1-2fb93d2938f6', name='forecast_temperature', workflow='data_ingestion/weather/herbie_forecast', status='done')\n",
- "'VibeWorkflowRun'(id='61d952d1-b068-4c2c-b522-a680efed450f', name='forecast_humidity', workflow='data_ingestion/weather/herbie_forecast', status='running')\n",
- "'VibeWorkflowRun'(id='8c95f7ab-6d6b-40e8-a3bd-c12b854d0a7b', name='forecast_u-component', workflow='data_ingestion/weather/herbie_forecast', status='running')\n",
- "'VibeWorkflowRun'(id='7490cd70-9731-4cac-ab36-051d3903776a', name='forecast_v-component', workflow='data_ingestion/weather/herbie_forecast', status='running')\n"
- ]
+ "data": {
+ "text/html": [
+ "\n"
+ ],
+ "text/plain": []
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ },
+ {
+ "data": {
+ "application/vnd.jupyter.widget-view+json": {
+ "model_id": "c00b48cb983f4c2184d411cd346f2bdb",
+ "version_major": 2,
+ "version_minor": 0
+ },
+ "text/plain": [
+ "Output()"
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ },
+ {
+ "data": {
+ "text/html": [
+ "\n"
+ ],
+ "text/plain": []
+ },
+ "metadata": {},
+ "output_type": "display_data"
}
],
"source": [
- "forecast_ = Forecast(\n",
- " workflow_name=HERBIE_DOWNLOAD_WORKFLOW,\n",
- " geometry=STATION_GEOMETRY,\n",
- " time_range=time_range,\n",
- " parameters=parameters,\n",
- " )\n",
- "run_list = forecast_.submit_download_request()"
+ "forecast = Forecast(\n",
+ " workflow_name=HERBIE_DOWNLOAD_WORKFLOW,\n",
+ " geometry=STATION_GEOMETRY,\n",
+ " time_range=time_range,\n",
+ " parameters=parameters,\n",
+ ")\n",
+ "run_list = forecast.submit_download_request()"
]
},
{
@@ -413,12 +420,14 @@
"metadata": {},
"source": [
"### Monitor download of Forecast observations\n",
- "Check the download status and fetch the downloaded data from the cluster running in backend. The execution time of download depends on time_range. The downloaded data undergoes below changes.\n",
+ "Check the download status and fetch the downloaded data from FarmVibes.AI. The execution time of the download depends on the time range. \n",
+ "\n",
+ "The downloaded data undergoes the following changes:\n",
"\n",
- "1. concatenate the output of all requests submitted.\n",
+ "1. Concatenate the output of all submitted requests.\n",
"2. Set index on date column.\n",
- "3. Does interpolate to derive the missing data.\n",
- "4. The data downloaded follows the utc timezone. It's required to transform the data to the timezone of historical observations. The historical observations used in this notebook follows pst timezone, hence the data offset by -8 hours."
+ "3. Interpolate to derive the missing data.\n",
+ "4. The downloaded data is in the UTC timezone and must be transformed to the timezone of the historical observations. The historical observations used in this notebook follow the PST timezone, hence the data is offset by -8 hours."
]
},
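The fixed -8 hour offset in step 4 can be sketched as a plain shift of the datetime index. The internals of `get_downloaded_data` are not shown in this diff, so `shift_index_hours` is a hypothetical helper, and the single `temperature_forecast` column is sample data. Note that a constant offset ignores daylight saving time; a timezone-aware conversion such as `tz_convert("America/Los_Angeles")` would be needed to follow it.

```python
import pandas as pd


def shift_index_hours(df, offset_hours):
    """Return a copy of df whose DatetimeIndex is shifted by offset_hours.

    Hypothetical helper: it only illustrates a fixed offset such as
    UTC -> PST (-8 hours) and does not handle daylight saving time.
    """
    shifted = df.copy()
    shifted.index = shifted.index + pd.Timedelta(hours=offset_hours)
    return shifted


# Two sample hourly rows stamped in UTC.
utc_df = pd.DataFrame(
    {"temperature_forecast": [300.0, 299.0]},
    index=pd.DatetimeIndex(["2020-05-31 00:00:00", "2020-05-31 01:00:00"], name="date"),
)

# Shift the timestamps to PST; the input frame is left untouched.
pst_df = shift_index_hours(utc_df, offset_hours=-8)
print(pst_df)
```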
{
@@ -501,14 +510,14 @@
"2020-05-30 17:00:00 -2.861307 1.178179 "
]
},
- "execution_count": 15,
+ "execution_count": 16,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# transform downloaded data from utc to pst timezone\n",
- "forecast_dataset = forecast_.get_downloaded_data(run_list=run_list, offset_hours=-8)\n",
+ "forecast_dataset = forecast.get_downloaded_data(run_list=run_list, offset_hours=-8)\n",
"forecast_dataset.to_csv(f\"{STATION_NAME}_forecast.csv\")\n",
"forecast_dataset.head(2)"
]
@@ -517,109 +526,40 @@
"cell_type": "markdown",
"metadata": {},
"source": [
- "### Preprocess Forecast Observations\n",
- "Below preprocessing performed on downloaded data before performing model training.\n",
- "\n",
- "- Temperature: The downloaded data has units \"kelvin\". It will be converted to Fahrenheit.\n",
- "- wind_speed: Using the u-component & v-component values downloaded, the wind_speed values derived. The derived values multiplied by 2.23 to convert from m/sec to mph\n",
- "- drop u-component & v-component"
+ "### Preprocess Forecast Observations"
]
},
{
- "cell_type": "code",
- "execution_count": 16,
+ "cell_type": "markdown",
"metadata": {},
- "outputs": [],
"source": [
- "# Temperature\n",
- "# convert kelvin to celsius\n",
- "forecast_dataset[\"temperature_forecast\"] = forecast_dataset[\"temperature_forecast\"]-273.15\n",
+ "We perform the following preprocessing on the downloaded data before training the model.\n",
"\n",
- "# convert celsius to Fahrenheit\n",
- "forecast_dataset[\"temperature_forecast\"] = forecast_dataset[\"temperature_forecast\"].apply(lambda x: (x * 9/5) + 32)"
+ "- `temperature`: The downloaded data is in Kelvin. It will be converted to Fahrenheit.\n",
+ "- `wind_speed`: Using the u-component & v-component values downloaded, the `wind_speed` values are derived. The derived values are multiplied by 2.23 to convert from m/sec to mph.\n",
+ "- Drop the u-component & v-component columns."
]
},
{
"cell_type": "code",
"execution_count": 17,
"metadata": {},
- "outputs": [
- {
- "data": {
- "text/html": [
- "(HTML rendering of the two-row table: temperature_forecast, humidity_forecast and wind_speed_forecast at 2020-05-30 16:00:00 and 17:00:00; same values as the text/plain output)"
- ],
- "text/plain": [
- " temperature_forecast humidity_forecast \\\n",
- "date \n",
- "2020-05-30 16:00:00 84.173633 49.299999 \n",
- "2020-05-30 17:00:00 82.149731 54.599998 \n",
- "\n",
- " wind_speed_forecast \n",
- "date \n",
- "2020-05-30 16:00:00 7.025768 \n",
- "2020-05-30 17:00:00 6.900466 "
- ]
- },
- "execution_count": 17,
- "metadata": {},
- "output_type": "execute_result"
- }
- ],
+ "outputs": [],
"source": [
- "# wind_speed\n",
- "# multiplying with 2.23 to convert wind speed from m/sec to mph\n",
- "forecast_dataset[\"wind_speed_forecast\"] = forecast_dataset.apply(lambda x: np.sqrt(np.square(x[\"u-component_forecast\"]) + \n",
- " np.square(x[\"v-component_forecast\"]))*2.23, axis=1)\n",
- "\n",
- "forecast_dataset.drop(columns=[\"u-component_forecast\", \"v-component_forecast\"], inplace=True)\n",
+ "forecast_dataset = utils.convert_forecast_data(forecast_dataset)\n",
"forecast_dataset.head(2)"
]
},
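The body of `utils.convert_forecast_data` is not part of this diff. As a rough sketch of the three preprocessing steps, assuming it mirrors the inline cells removed above, the conversion could look like the following. The name `convert_forecast_sketch` and the sample frame are hypothetical; the column names are taken from the removed cells.

```python
import numpy as np
import pandas as pd

# Factor used in the notebook text; the exact value is closer to 2.237.
MPS_TO_MPH = 2.23


def convert_forecast_sketch(df):
    """Vectorized version of the removed inline preprocessing cells."""
    out = df.copy()

    # Kelvin -> Celsius -> Fahrenheit in one expression.
    out["temperature_forecast"] = (out["temperature_forecast"] - 273.15) * 9 / 5 + 32

    # Wind speed is the magnitude of the (u, v) vector, converted from m/s to mph.
    out["wind_speed_forecast"] = (
        np.sqrt(out["u-component_forecast"] ** 2 + out["v-component_forecast"] ** 2)
        * MPS_TO_MPH
    )

    # The components are no longer needed once the speed is derived.
    return out.drop(columns=["u-component_forecast", "v-component_forecast"])


sample = pd.DataFrame(
    {
        "temperature_forecast": [273.15, 300.0],
        "humidity_forecast": [50.0, 40.0],
        "u-component_forecast": [3.0, 0.0],
        "v-component_forecast": [4.0, 0.0],
    }
)
converted = convert_forecast_sketch(sample)
print(converted)
```

Working on whole columns avoids the row-wise `apply(..., axis=1)` of the removed cell, which gives the same values but is slower on long time ranges.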
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "We also clean the input data with the following operations:\n",
+ "- Exclude input data outside the time range of interest.\n",
+ "- Shift the forecast data by a number of hours.\n",
+ "- Fill missing data from neighboring data points using pandas interpolation."
+ ]
+ },
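Two of the cleaning operations listed above, restricting the data to a time window and filling gaps from neighboring points, can be sketched with plain pandas. This is not the implementation of `clean_relevant_data_using_hrrr`, which is not shown in this diff; `clip_and_interpolate` is a hypothetical name, and the shift step is left out because its exact semantics are not described here.

```python
import numpy as np
import pandas as pd


def clip_and_interpolate(df, start, end):
    """Keep rows with start <= index <= end, then fill gaps by time interpolation."""
    in_range = df.loc[(df.index >= start) & (df.index <= end)]
    # method="time" weights neighbors by their distance on the DatetimeIndex.
    return in_range.interpolate(method="time")


# Six hourly sample points with one missing value.
index = pd.date_range("2020-05-31 00:00:00", periods=6, freq=pd.Timedelta(hours=1))
raw = pd.DataFrame(
    {"temperature_forecast": [1.0, np.nan, 3.0, 4.0, 5.0, 6.0]},
    index=index,
)

cleaned = clip_and_interpolate(
    raw,
    start=pd.Timestamp("2020-05-31 00:00:00"),
    end=pd.Timestamp("2020-05-31 04:00:00"),
)
print(cleaned)
```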
{
"cell_type": "code",
"execution_count": 18,
@@ -698,18 +638,21 @@
"2020-07-06 01:00:00 57.220984 3.85 10.642863 "
]
},
- "execution_count": 18,
+ "execution_count": 19,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
- "input_df = utils.clean_relevant_data(\n",
- " actual_df=historical_dataset, \n",
- " forecast_df=forecast_dataset, \n",
- " out_variables=RELEVANT_FEATURES,\n",
- " freq_hours=frequency_hour,\n",
- " num_of_indices=number_of_hours)\n",
+ "input_df = utils.clean_relevant_data_using_hrrr(\n",
+ " actual_df=historical_dataset,\n",
+ " forecast_df=forecast_dataset,\n",
+ " out_variables=RELEVANT_FEATURES,\n",
+ " freq_hours=frequency_hour,\n",
+ " num_of_indices=number_of_hours,\n",
+ " start_date=start_date,\n",
+ " end_date=end_date,\n",
+ ")\n",
"input_df.head(2)"
]
},
@@ -717,12 +660,12 @@
"cell_type": "markdown",
"metadata": {},
"source": [
- "### Verifying the forecast observations are relevant or not relevant"
+ "### Verifying whether the forecast observations are relevant"
]
},
{
"cell_type": "code",
- "execution_count": 19,
+ "execution_count": 20,
"metadata": {},
"outputs": [
{
@@ -731,7 +674,7 @@
""
]
},
- "execution_count": 19,
+ "execution_count": 21,
"metadata": {},
"output_type": "execute_result"
},
@@ -747,7 +690,7 @@
}
],
"source": [
- "plot_df = input_df[(input_df.index.month==7) & (input_df.index.year==2020)]\n",
+ "plot_df = input_df[(input_df.index.month == 7) & (input_df.index.year == 2020)]\n",
"\n",
"plt.figure(figsize=(20, 4))\n",
"plt.plot(plot_df.index.values, plot_df[\"temperature_forecast\"].values, label=\"forecast\")\n",
@@ -759,7 +702,7 @@
"cell_type": "markdown",
"metadata": {},
"source": [
- "Based on the distribution of observation in above plot, the forecast observations are relevant. In this scenario continue with model training process using relevant dataset."
+ "Based on the distribution of observations in the plot above, the forecast observations are relevant. In this scenario, we will continue with model training using the relevant dataset."
]
},
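The visual check can be complemented with a few summary numbers. The sketch below is an assumption rather than part of the notebook: `forecast_agreement` is a hypothetical helper, the column names `temperature` and `temperature_forecast` follow the naming used in the plot cell, and no threshold for "relevant" is implied.

```python
import pandas as pd


def forecast_agreement(df, observed_col, forecast_col):
    """Summarize how closely a forecast column tracks the observed column."""
    pair = df[[observed_col, forecast_col]].dropna()
    error = pair[forecast_col] - pair[observed_col]
    return {
        "mae": float(error.abs().mean()),   # average size of the discrepancy
        "bias": float(error.mean()),        # systematic over/under-forecast
        "correlation": float(pair[observed_col].corr(pair[forecast_col])),
    }


# Small sample frame standing in for input_df.
sample_df = pd.DataFrame(
    {
        "temperature": [50.0, 55.0, 60.0, 58.0],
        "temperature_forecast": [52.0, 56.0, 63.0, 59.0],
    }
)
agreement = forecast_agreement(sample_df, "temperature", "temperature_forecast")
print(agreement)
```

A small error together with a high correlation supports treating the forecast as relevant; what counts as small depends on the variable and the station.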
{
@@ -768,7 +711,7 @@
"source": [
"### Training\n",
"\n",
- "The script is configured to train the Micro Climate prediction model for 24 hours and the historical weather station data has points with a 60-minute frequency. Below inputs vary based on number of hours of prediction and frequency of weather station data points.\n",
+ "The script is configured to train the Micro Climate prediction model for 24 hours, and the historical weather station data has points with a 60-minute frequency. The inputs below vary based on the number of hours of prediction and the frequency of weather station data points.\n",
"\n",
"1. `chunk_size` - The value of the chunk size is based on the frequency of the weather station data points. For a frequency of 60 minutes, the minimum required data points are 528. If the data frequency is 15 minutes, the minimum number of data points required is 528*4 = 2112. These are the minimum number of data points need to be provided as input during the inference.\n",
"2. `ts_lookahead` - The value used during the data preprocessing. It is the value used to consider weather data points ahead for a given time period while grouping the data.\n",
@@ -791,7 +734,7 @@
},
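The `chunk_size` rule described above (528 points at a 60-minute frequency, 528*4 = 2112 at 15 minutes) scales with the number of samples per hour. A minimal sketch, assuming the frequency divides 60 evenly; `minimum_chunk_size` is a hypothetical helper and not part of `notebook_lib`:

```python
# 528 hourly points correspond to 22 days of history (22 * 24).
BASE_POINTS_AT_60_MIN = 528


def minimum_chunk_size(frequency_minutes):
    """Minimum number of input points needed at the given sampling frequency."""
    if frequency_minutes <= 0 or 60 % frequency_minutes != 0:
        raise ValueError("frequency_minutes must be a positive divisor of 60")
    samples_per_hour = 60 // frequency_minutes
    return BASE_POINTS_AT_60_MIN * samples_per_hour


for minutes in (60, 30, 15):
    print(minutes, minimum_chunk_size(minutes))
```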
{
"cell_type": "code",
- "execution_count": null,
+ "execution_count": 22,
"metadata": {
"tags": []
},
@@ -803,7 +746,8 @@
" root_path=ROOT_PATH,\n",
" data_export_path=DATA_EXPORT_PATH,\n",
" station_name=STATION_NAME,\n",
- " relevant=True)\n",
+ " relevant=True,\n",
+ ")\n",
"\n",
"train_weather.train_model(input_df)"
]
@@ -820,26 +764,27 @@
"metadata": {},
"source": [
"### Current\n",
- "Predict weather for the next 24 hours. To predict weather for next 24 hours it is required to certain hours of historical forecast observations, the default size called chunk size of historical forecast observations is 528. Choosing start time of prediction is important, if historical observations used to train model has the start time of 12:00:00 then the historical observations used for prediction should start at the same time."
+ "To predict the weather for the next 24 hours, we need a certain number of hours of historical forecast observations beforehand. The default size (chunk size) of historical forecast observations is 528. Choosing the prediction start time is important: if the historical observations used to train the model start at 12:00:00, then the historical observations used for prediction should start at the same time."
]
},
{
"cell_type": "code",
- "execution_count": 20,
+ "execution_count": 23,
"metadata": {},
"outputs": [],
"source": [
"weather_forecast = prediction.InferenceWeather(\n",
- " root_path=ROOT_PATH,\n",
- " data_export_path=DATA_EXPORT_PATH,\n",
- " station_name=STATION_NAME,\n",
- " predicts=OUT_FEATURES,\n",
- " relevant=True)"
+ " root_path=ROOT_PATH,\n",
+ " data_export_path=DATA_EXPORT_PATH,\n",
+ " station_name=STATION_NAME,\n",
+ " predicts=OUT_FEATURES,\n",
+ " relevant=True,\n",
+ ")"
]
},
{
"cell_type": "code",
- "execution_count": 21,
+ "execution_count": 24,
"metadata": {},
"outputs": [],
"source": [
@@ -848,55 +793,69 @@
"p_end_date = datetime(year=2022, month=6, day=3, hour=0, minute=0, second=0)\n",
"\n",
"time_range = (p_start_date, p_end_date)\n",
- "date_column=\"date\"\n",
+ "date_column = \"date\"\n",
"\n",
- "parameters = [{\n",
- " \"weather_type\": \"temperature\",\n",
- " \"search_text\": \"TMP:2 m\"\n",
- " },\n",
- " {\n",
- " \"weather_type\": \"humidity\",\n",
- " \"search_text\": \"RH:2 m\"\n",
- " },\n",
- " {\n",
- " \"weather_type\": \"u-component\",\n",
- " \"search_text\": \"UGRD:10 m\"\n",
- " },\n",
- " {\n",
- " \"weather_type\": \"v-component\",\n",
- " \"search_text\": \"VGRD:10 m\"\n",
- " }]"
+ "parameters = [\n",
+ " {\"weather_type\": \"temperature\", \"search_text\": \"TMP:2 m\"},\n",
+ " {\"weather_type\": \"humidity\", \"search_text\": \"RH:2 m\"},\n",
+ " {\"weather_type\": \"u-component\", \"search_text\": \"UGRD:10 m\"},\n",
+ " {\"weather_type\": \"v-component\", \"search_text\": \"VGRD:10 m\"},\n",
+ "]"
]
},
{
"cell_type": "code",
- "execution_count": 22,
+ "execution_count": 25,
"metadata": {},
"outputs": [
{
- "name": "stdout",
- "output_type": "stream",
- "text": [
- "'VibeWorkflowRun'(id='ea662424-c9c5-4d1f-8d1c-ed907f0176ea', name='forecast_temperature', workflow='data_ingestion/weather/herbie_forecast', status='done')\n",
- "'VibeWorkflowRun'(id='e93b52c2-9c20-4bf2-b647-a7dc04ff4947', name='forecast_humidity', workflow='data_ingestion/weather/herbie_forecast', status='done')\n",
- "'VibeWorkflowRun'(id='5cbd7199-626c-43dc-aa73-17639c97bc30', name='forecast_u-component', workflow='data_ingestion/weather/herbie_forecast', status='done')\n",
- "'VibeWorkflowRun'(id='e9373c58-5730-4b03-aee4-83015ab08848', name='forecast_v-component', workflow='data_ingestion/weather/herbie_forecast', status='done')\n"
- ]
+ "data": {
+ "text/html": [
+ "\n"
+ ],
+ "text/plain": []
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ },
+ {
+ "data": {
+ "application/vnd.jupyter.widget-view+json": {
+ "model_id": "04e890c9992c4710acdbb661b49c1e56",
+ "version_major": 2,
+ "version_minor": 0
+ },
+ "text/plain": [
+ "Output()"
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ },
+ {
+ "data": {
+ "text/html": [
+ "\n"
+ ],
+ "text/plain": []
+ },
+ "metadata": {},
+ "output_type": "display_data"
}
],
"source": [
- "forecast_ = Forecast(\n",
- " workflow_name=HERBIE_DOWNLOAD_WORKFLOW,\n",
- " geometry=STATION_GEOMETRY,\n",
- " time_range=time_range,\n",
- " parameters=parameters,\n",
- " )\n",
- "run_list = forecast_.submit_download_request()"
+ "forecast = Forecast(\n",
+ " workflow_name=HERBIE_DOWNLOAD_WORKFLOW,\n",
+ " geometry=STATION_GEOMETRY,\n",
+ " time_range=time_range,\n",
+ " parameters=parameters,\n",
+ ")\n",
+ "run_list = forecast.submit_download_request()"
]
},
{
"cell_type": "code",
- "execution_count": 23,
+ "execution_count": 26,
"metadata": {},
"outputs": [
{
@@ -974,21 +933,21 @@
"2022-03-17 17:00:00 4.563419 1.176411 "
]
},
- "execution_count": 23,
+ "execution_count": 27,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# transform downloaded data from utc to pst timezone\n",
- "p_forecast_dataset = forecast_.get_downloaded_data(run_list=run_list, offset_hours=-8)\n",
+ "p_forecast_dataset = forecast.get_downloaded_data(run_list=run_list, offset_hours=-8)\n",
"p_forecast_dataset.to_csv(f\"{STATION_NAME}_forecast.csv\")\n",
"p_forecast_dataset.head(2)"
]
},
{
"cell_type": "code",
- "execution_count": 24,
+ "execution_count": 28,
"metadata": {},
"outputs": [
{
@@ -1047,14 +1006,14 @@
"2022-03-18 14:00:00 66.300 16.175 50.075"
]
},
- "execution_count": 24,
+ "execution_count": 29,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"predict_file_path = f\"./data/{STATION_NAME}/prediction.csv\"\n",
- "p_historical_dataset = utils.get_csv_data(path=predict_file_path)\n",
+ "p_historical_dataset = utils.get_csv_data(path=predict_file_path, interpolate=False, fill_na=False)\n",
"p_historical_dataset = p_historical_dataset[HISTORICAL_MODEL_TRAIN_FEATURES]\n",
"\n",
"p_historical_dataset.head(2)"
@@ -1062,35 +1021,26 @@
},
{
"cell_type": "code",
- "execution_count": 25,
+ "execution_count": 30,
"metadata": {},
"outputs": [],
"source": [
- "# Temperature\n",
- "# convert kelvin to celsius\n",
- "p_forecast_dataset[\"temperature_forecast\"] = p_forecast_dataset[\"temperature_forecast\"]-273.15\n",
- "\n",
- "# convert celsius to Fahrenheit\n",
- "p_forecast_dataset[\"temperature_forecast\"] = p_forecast_dataset[\"temperature_forecast\"].apply(lambda x: (x * 9/5) + 32)"
+ "p_forecast_dataset = utils.convert_forecast_data(p_forecast_dataset)"
]
},
{
- "cell_type": "code",
- "execution_count": 26,
+ "cell_type": "markdown",
"metadata": {},
- "outputs": [],
"source": [
- "# wind_speed\n",
- "# multiplying with 2.23 to convert wind speed from m/sec to mph\n",
- "p_forecast_dataset[\"wind_speed_forecast\"] = p_forecast_dataset.apply(lambda x: np.sqrt(np.square(x[\"u-component_forecast\"]) + \n",
- " np.square(x[\"v-component_forecast\"]))*2.23, axis=1)\n",
- "\n",
- "p_forecast_dataset.drop(columns=[\"u-component_forecast\", \"v-component_forecast\"], inplace=True)"
+ "We clean the input data with the following operations:\n",
+ "- Exclude input data outside the time range of interest.\n",
+ "- Shift the forecast data by a number of hours.\n",
+ "- Fill missing data from neighboring data points using pandas interpolation."
]
},
{
"cell_type": "code",
- "execution_count": 27,
+ "execution_count": 31,
"metadata": {},
"outputs": [
{
@@ -1166,19 +1116,21 @@
"2022-03-18 14:00:00 45.456384 16.175 17.855009 "
]
},
- "execution_count": 27,
+ "execution_count": 32,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
- "input_df = utils.clean_relevant_data(\n",
- " actual_df=p_historical_dataset.copy(),\n",
- " forecast_df= p_forecast_dataset.copy(),\n",
- " out_variables= RELEVANT_FEATURES,\n",
- " freq_hours=frequency_hour,\n",
- " num_of_indices=number_of_hours\n",
- " )\n",
+ "input_df = utils.clean_relevant_data_using_hrrr(\n",
+ " actual_df=p_historical_dataset.copy(),\n",
+ " forecast_df=p_forecast_dataset.copy(),\n",
+ " out_variables=RELEVANT_FEATURES,\n",
+ " freq_hours=frequency_hour,\n",
+ " num_of_indices=number_of_hours,\n",
+ " start_date=start_date,\n",
+ " end_date=end_date,\n",
+ ")\n",
"\n",
"base_data_df = input_df[RELEVANT_FEATURES]\n",
"base_data_df.head(2)"
@@ -1186,7 +1138,7 @@
},
{
"cell_type": "code",
- "execution_count": 28,
+ "execution_count": 33,
"metadata": {},
"outputs": [],
"source": [
@@ -1197,7 +1149,7 @@
},
{
"cell_type": "code",
- "execution_count": 29,
+ "execution_count": 34,
"metadata": {},
"outputs": [
{
@@ -1239,7 +1191,7 @@
},
{
"cell_type": "code",
- "execution_count": 30,
+ "execution_count": 35,
"metadata": {},
"outputs": [
{
@@ -1315,45 +1267,45 @@
"2022-03-16 17:00:00 44.783197 14.325 10.509131 "
]
},
- "execution_count": 30,
+ "execution_count": 36,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"predict_file_path = f\"./data/{STATION_NAME}/training.csv\"\n",
- "p_historical_dataset = utils.get_csv_data(path=predict_file_path)\n",
+ "p_historical_dataset = utils.get_csv_data(path=predict_file_path, interpolate=False, fill_na=False)\n",
"p_historical_dataset = p_historical_dataset[HISTORICAL_MODEL_TRAIN_FEATURES]\n",
"p_historical_dataset.head(5)\n",
"\n",
"input_df = utils.clean_relevant_data(\n",
- " p_historical_dataset.copy(), \n",
- " p_forecast_dataset.copy(), \n",
- " RELEVANT_FEATURES,\n",
- " freq_hours=frequency_hour,\n",
- " num_of_indices=number_of_hours)\n",
+ " p_historical_dataset.copy(),\n",
+ " p_forecast_dataset.copy(),\n",
+ " RELEVANT_FEATURES,\n",
+ " freq_hours=frequency_hour,\n",
+ " num_of_indices=number_of_hours,\n",
+ ")\n",
"base_data_df = input_df[RELEVANT_FEATURES]\n",
"base_data_df.head(2)"
]
},
{
"cell_type": "code",
- "execution_count": 31,
+ "execution_count": 37,
"metadata": {},
"outputs": [],
"source": [
"predict_start_datetime = datetime(year=2022, month=4, day=30, hour=13, minute=0, second=0)\n",
"predict_end_datetime = datetime(year=2022, month=5, day=21, hour=13, minute=0, second=0)\n",
"\n",
- "df_out = weather_forecast.inference_historical(base_data_df.copy(),\n",
- " start_datetime=predict_start_datetime,\n",
- " end_datetime=predict_end_datetime\n",
- " )"
+ "df_out = weather_forecast.inference_historical(\n",
+ " base_data_df.copy(), start_datetime=predict_start_datetime, end_datetime=predict_end_datetime\n",
+ ")"
]
},
{
"cell_type": "code",
- "execution_count": 32,
+ "execution_count": 38,
"metadata": {},
"outputs": [
{
@@ -1378,20 +1330,22 @@
}
],
"source": [
- "base_data_df = base_data_df[(base_data_df.index >= predict_start_datetime) & (base_data_df.index <= predict_end_datetime)]\n",
+ "base_data_df = base_data_df[\n",
+ " (base_data_df.index >= predict_start_datetime) & (base_data_df.index <= predict_end_datetime)\n",
+ "]\n",
"\n",
"for predict in OUT_FEATURES:\n",
" plt.figure(figsize=(18, 6))\n",
- " plt.plot(df_out[\"date\"].values, utils.smooth(df_out[predict].values, 2), label=\"Predict\")\n",
+ " plt.plot(df_out[\"date\"].values, utils.smooth(df_out[predict].values, 2), label=\"Prediction\")\n",
" plt.plot(base_data_df.index.values, base_data_df[predict].values, label=\"Ground Truth\")\n",
- " # plt.plot(base_data_df.index.values, base_data_df[predict+\"_forecast\"].values, label=\"Forecast\")\n",
+ "\n",
" plt.title(f\"24 Models {predict} Ground Truth Vs Predict\")\n",
" plt.legend()"
]
},
{
"cell_type": "code",
- "execution_count": 33,
+ "execution_count": 39,
"metadata": {},
"outputs": [
{
@@ -1411,31 +1365,28 @@
}
],
"source": [
- "from sklearn.metrics import mean_squared_error, mean_absolute_error\n",
- "import math\n",
- "\n",
- "def calculate_KPI(y, yhat):\n",
- " print(\"RMSE: {}\".format(round(mean_squared_error(y,yhat,squared=False),2)))\n",
- " print(\"MAE: {}\".format(round(mean_absolute_error(y,yhat),2)))\n",
- " print(\"MAE%: {}%\".format(round(100*sum(abs(y-yhat))/sum(y),2)))\n",
- "\n",
"print(\"temperature\")\n",
- "calculate_KPI(utils.smooth(df_out[\"temperature\"].values, 1),base_data_df[\"temperature\"].values)\n",
+ "utils.calculate_KPI(\n",
+ " utils.smooth(list(df_out[\"temperature\"].values), 1),\n",
+ " np.array(base_data_df[\"temperature\"].values),\n",
+ ")\n",
"\n",
"print(\"\\n\", \"wind_speed\")\n",
- "calculate_KPI(utils.smooth(df_out[\"wind_speed\"].values, 1),base_data_df[\"wind_speed\"].values)"
+ "utils.calculate_KPI(\n",
+ " utils.smooth(list(df_out[\"wind_speed\"].values), 1), np.array(base_data_df[\"wind_speed\"].values)\n",
+ ")"
]
},
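The inline `calculate_KPI` helper removed above printed RMSE, MAE and MAE%. The body of `utils.calculate_KPI` is not shown in this diff; the sketch below reproduces the same three metrics with NumPy only, returning them instead of printing. `calculate_kpi_sketch` is a hypothetical name and the input values are sample data.

```python
import numpy as np


def calculate_kpi_sketch(y, yhat):
    """Return RMSE, MAE and MAE% for two equally long sequences.

    MAE% follows the removed helper: total absolute error divided by the
    sum of the first argument, expressed as a percentage.
    """
    y = np.asarray(y, dtype=float)
    yhat = np.asarray(yhat, dtype=float)
    error = y - yhat
    return {
        "rmse": float(np.sqrt(np.mean(error**2))),
        "mae": float(np.mean(np.abs(error))),
        "mae_pct": float(100 * np.sum(np.abs(error)) / np.sum(y)),
    }


kpi = calculate_kpi_sketch([10.0, 20.0, 30.0], [12.0, 18.0, 33.0])
for name, value in kpi.items():
    print(f"{name}: {value:.2f}")
```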
{
"cell_type": "markdown",
"metadata": {},
"source": [
- "### Training model using not relevant dataset or without forecast observations"
+ "### Training model using non-relevant dataset or without forecast observations"
]
},
{
"cell_type": "code",
- "execution_count": 12,
+ "execution_count": 40,
"metadata": {},
"outputs": [],
"source": [
@@ -1445,7 +1396,7 @@
},
{
"cell_type": "code",
- "execution_count": null,
+ "execution_count": 41,
"metadata": {},
"outputs": [],
"source": [
@@ -1455,7 +1406,8 @@
" root_path=ROOT_PATH,\n",
" data_export_path=DATA_EXPORT_PATH,\n",
" station_name=STATION_NAME,\n",
- " relevant=False)\n",
+ " relevant=False,\n",
+ ")\n",
"\n",
"train_weather.train_model(historical_df, start=0, epochs=1)"
]
@@ -1470,15 +1422,16 @@
},
{
"cell_type": "code",
- "execution_count": 22,
+ "execution_count": 42,
"metadata": {},
"outputs": [],
"source": [
"weather_forecast = prediction.InferenceWeather(\n",
- " root_path=ROOT_PATH,\n",
- " data_export_path=DATA_EXPORT_PATH,\n",
- " station_name=STATION_NAME,\n",
- " predicts=OUT_FEATURES)"
+ " root_path=ROOT_PATH,\n",
+ " data_export_path=DATA_EXPORT_PATH,\n",
+ " station_name=STATION_NAME,\n",
+ " predicts=OUT_FEATURES,\n",
+ ")"
]
},
{
@@ -1491,7 +1444,7 @@
},
{
"cell_type": "code",
- "execution_count": 23,
+ "execution_count": 43,
"metadata": {},
"outputs": [],
"source": [
@@ -1501,7 +1454,7 @@
},
{
"cell_type": "code",
- "execution_count": 24,
+ "execution_count": 44,
"metadata": {},
"outputs": [],
"source": [
@@ -1511,14 +1464,12 @@
"\n",
"df_output_merge = pd.DataFrame(columns=base_data_df.columns)\n",
"\n",
- "df_out = weather_forecast.inference(base_data_df,\n",
- " start_datetime=predict_start_datetime\n",
- " )"
+ "df_out = weather_forecast.inference(base_data_df, start_datetime=predict_start_datetime)"
]
},
{
"cell_type": "code",
- "execution_count": 25,
+ "execution_count": 45,
"metadata": {},
"outputs": [
{
@@ -1560,7 +1511,7 @@
},
{
"cell_type": "code",
- "execution_count": 26,
+ "execution_count": 46,
"metadata": {},
"outputs": [],
"source": [
@@ -1571,15 +1522,14 @@
"predict_start_datetime = datetime(year=2022, month=4, day=30, hour=13, minute=0, second=0)\n",
"predict_end_datetime = datetime(year=2022, month=5, day=21, hour=13, minute=0, second=0)\n",
"\n",
- "df_out = weather_forecast.inference_historical(base_data_df,\n",
- " start_datetime=predict_start_datetime,\n",
- " end_datetime=predict_end_datetime\n",
- " )"
+ "df_out = weather_forecast.inference_historical(\n",
+ " base_data_df, start_datetime=predict_start_datetime, end_datetime=predict_end_datetime\n",
+ ")"
]
},
{
"cell_type": "code",
- "execution_count": 27,
+ "execution_count": 47,
"metadata": {},
"outputs": [
{
@@ -1604,12 +1554,14 @@
}
],
"source": [
- "base_data_df = base_data_df[(base_data_df.index >= predict_start_datetime) & (base_data_df.index <= predict_end_datetime)]\n",
+ "base_data_df = base_data_df[\n",
+ " (base_data_df.index >= predict_start_datetime) & (base_data_df.index <= predict_end_datetime)\n",
+ "]\n",
"for predict in OUT_FEATURES:\n",
" plt.figure(figsize=(20, 5))\n",
" plt.plot(df_out[\"date\"].values, df_out[predict].values)\n",
" plt.plot(base_data_df.index.values, base_data_df[predict].values)\n",
- " plt.title(f\"24 Models {predict} Ground Truth Vs Predict\")\n",
+ " plt.title(f\"24 Models {predict} Ground Truth Vs Prediction\")\n",
" plt.legend([\"Predict\", \"Ground Truth\"])"
]
}
@@ -1632,7 +1584,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
- "version": "3.8.16"
+ "version": "3.8.18"
},
"name": "Micro climate prediction",
"running_time": "",
diff --git a/notebooks/deepmc/notebook_lib/forecast.py b/notebooks/deepmc/notebook_lib/forecast.py
index 645b8e8d..993a6617 100644
--- a/notebooks/deepmc/notebook_lib/forecast.py
+++ b/notebooks/deepmc/notebook_lib/forecast.py
@@ -1,4 +1,3 @@
-import time
from datetime import datetime, timedelta
from typing import Any, Dict, List, Tuple, cast
@@ -8,7 +7,7 @@
from shapely.geometry import Point
from vibe_core.client import FarmvibesAiClient, get_default_vibe_client
-from vibe_core.datamodel import RunConfig, RunConfigUser, RunDetails, SpatioTemporalJson
+from vibe_core.datamodel import RunConfig, RunConfigUser, SpatioTemporalJson
class Forecast:
@@ -31,7 +30,8 @@ def submit_download_request(self):
"""
Submit request to worker to download forecast data
"""
- run_list = []
+ run_metadata_list = []
+ runs = []
for parameter in self.parameters:
run_name = f"forecast_{parameter['weather_type']}"
run = self.client.run(
@@ -42,57 +42,40 @@ def submit_download_request(self):
parameters=parameter,
)
- try:
- run.block_until_complete(5)
- except RuntimeError:
- print(run)
-
- run_list.append(
+ run_metadata_list.append(
{
"id": run.id,
"weather_type": parameter["weather_type"],
}
)
+ runs.append(run)
+
+ self.client.monitor(runs, 5)
- return run_list
+ return run_metadata_list
def get_run_status(self, run_list: List[Dict[str, str]]):
clear_output(wait=True)
- all_done = True
- out_ = []
+ out = []
for run_item in run_list:
o = self.client.describe_run(run_item["id"])
print(f"Execution status for {run_item['weather_type']}: {o.details.status}")
if o.details.status == "done":
- out_.append(o)
- elif o.details.status == "failed":
- print(o.details)
+ out.append(o)
else:
- all_done = False
- cnt_complete = 0
- for key, value in o.task_details.items():
- value = cast(RunDetails, value)
- assert value.subtasks is not None, "Subtasks don't exist"
- for subtask in value.subtasks:
- if subtask.status == "done":
- cnt_complete += 1
- print(
- "\t",
- f"Subtask {key}",
- cnt_complete,
- "/",
- len(value.subtasks),
- )
- cnt_complete = 0
- return all_done, out_
+ raise RuntimeError(
+ f"Execution status for {run_item['weather_type']}: {o.details.status}"
+ )
+
+ return out
def get_all_assets(self, details: RunConfigUser):
asset_files = []
output = details.output["weather_forecast"]
record: Dict[str, Any]
for record in cast(List[Dict[str, Any]], output):
- for _, value in record["assets"].items():
+ for value in record["assets"].values():
asset_files.append(value["href"])
df_assets = [pd.read_csv(f, index_col=False) for f in asset_files]
df_out = pd.concat(df_assets)
@@ -104,21 +87,15 @@ def get_downloaded_data(self, run_list: List[Dict[str, str]], offset_hours: int
check the download status. If status is done, fetch the downloaded data
"""
forecast_dataset = pd.DataFrame()
- status = False
- out_ = []
- while status is False:
- status, out_ = self.get_run_status(run_list)
- time.sleep(10)
-
- if status:
- for detail in out_:
- df = self.get_all_assets(detail)
+ out = self.get_run_status(run_list)
+ for detail in out:
+ df = self.get_all_assets(detail)
- # Offset from UTC to specified timezone
- df.index = df.index + pd.offsets.Hour(offset_hours)
+ # Offset from UTC to specified timezone
+ df.index = df.index + pd.offsets.Hour(offset_hours)
- if not df.empty:
- forecast_dataset = pd.concat([forecast_dataset, df], axis=1)
+ if not df.empty:
+ forecast_dataset = pd.concat([forecast_dataset, df], axis=1)
return forecast_dataset
diff --git a/notebooks/deepmc/notebook_lib/modules.py b/notebooks/deepmc/notebook_lib/modules.py
index 5fbfe012..9be52ab4 100644
--- a/notebooks/deepmc/notebook_lib/modules.py
+++ b/notebooks/deepmc/notebook_lib/modules.py
@@ -59,14 +59,14 @@ def training_step(self, train_batch: Tensor, _):
x, y = train_batch[:6], train_batch[6]
y_hat = self.deepmc(x)
loss = self.loss(y_hat, y)
- self.log("train_loss/total", loss)
+ self.log("train_loss/total", loss, on_epoch=True, prog_bar=True, logger=True, on_step=True)
return loss
def validation_step(self, validation_batch: Tensor, _):
x, y = validation_batch[:6], validation_batch[6]
y_hat = self.deepmc(x)
loss = self.loss(y_hat, y)
- self.log("val_loss/total", loss, on_epoch=True)
+ self.log("val_loss/total", loss, on_epoch=True, prog_bar=True, logger=True, on_step=True)
return loss
diff --git a/notebooks/deepmc/notebook_lib/post_models.py b/notebooks/deepmc/notebook_lib/post_models.py
deleted file mode 100644
index 224be6fd..00000000
--- a/notebooks/deepmc/notebook_lib/post_models.py
+++ /dev/null
@@ -1,34 +0,0 @@
-from keras.layers import BatchNormalization, Dense, Input
-from keras.models import Sequential
-from keras.utils.vis_utils import plot_model
-
-
-def simple_mixture_model(inshape: int):
- model = Sequential()
- model.add(Input(shape=(inshape,)))
-
- model.add(Dense(inshape * 2, activation="relu"))
- model.add(BatchNormalization())
- model.add(Dense(inshape * 4, activation="relu"))
- model.add(BatchNormalization())
- model.add(Dense(inshape))
-
- model.compile(loss="mae", optimizer="adam")
- return model
-
-
-def fit_model(model, train_X, train_y, test_X, test_y, batch_size: int):
- batch_size = batch_size
- validation_data = (test_X, test_y)
-
- # fit network
- history = model.fit(
- train_X,
- train_y,
- epochs=20,
- batch_size=batch_size,
- validation_data=validation_data,
- verbose=1,
- )
-
- return model, history
diff --git a/notebooks/deepmc/notebook_lib/train.py b/notebooks/deepmc/notebook_lib/train.py
index 6a6f2242..4b9fa8b7 100644
--- a/notebooks/deepmc/notebook_lib/train.py
+++ b/notebooks/deepmc/notebook_lib/train.py
@@ -14,8 +14,8 @@
from torch import Tensor
from torch.utils.data import DataLoader, TensorDataset
-from . import utils
-from .preprocess import Preprocess
+from vibe_notebook.deepmc import utils
+from vibe_notebook.deepmc.preprocess import Preprocess
MODEL_SUFFIX = "deepmc."
@@ -35,7 +35,7 @@ def __init__(
wavelet: str = "bior3.5",
mode: str = "periodic",
level: int = 5,
- batch_size: int = 256,
+ batch_size: int = 24,
relevant: bool = False,
):
if relevant:
@@ -67,6 +67,7 @@ def train_model(
start: int = 0,
end: int = -1,
epochs: int = 20,
+ reset_preprocess: bool = False,
):
end = self.total_models if end == -1 else end
@@ -80,12 +81,22 @@ def train_model(
input_order_df[out_feature] = out_feature_df
# data preprocessing
- (train_scaler, output_scaler, train_df, test_df,) = utils.get_split_scaled_data(
+ (
+ train_scaler,
+ output_scaler,
+ train_df,
+ test_df,
+ ) = utils.get_split_scaled_data(
data=input_order_df, out_feature=out_feature, split_ratio=0.92
)
+ if reset_preprocess and os.path.exists(
+ self.data_export_path % (out_feature, self.relevant_text)
+ ):
+ os.remove(self.data_export_path % (out_feature, self.relevant_text))
if os.path.exists(self.data_export_path % (out_feature, self.relevant_text)):
- with open(self.data_export_path % (out_feature, self.relevant_text), "rb") as f:
+ exp_path = self.data_export_path.replace("train_data.pkl", "train_data_dates.pkl")
+ with open(exp_path % (out_feature, self.relevant_text), "rb") as f:
(
train_X,
train_y,
@@ -93,6 +104,10 @@ def train_model(
test_y,
train_scaler,
output_scaler,
+ train_dates_X,
+ train_dates_y,
+ test_dates_X,
+ test_dates_y,
) = pickle.load(f)
self.preprocess = Preprocess(
@@ -128,6 +143,10 @@ def train_model(
train_y,
test_X,
test_y,
+ train_dates_X,
+ train_dates_y,
+ test_dates_X,
+ test_dates_y,
) = self.preprocess.wavelet_transform_train(train_df, test_df, out_feature)
with open(self.data_export_path % (out_feature, self.relevant_text), "wb") as f:
@@ -136,6 +155,25 @@ def train_model(
f,
)
+ exp_path = self.data_export_path.replace("train_data.pkl", "train_data_dates.pkl")
+
+ with open(exp_path % (out_feature, self.relevant_text), "wb") as f:
+ pickle.dump(
+ [
+ train_X,
+ train_y,
+ test_X,
+ test_y,
+ train_scaler,
+ output_scaler,
+ train_dates_X,
+ train_dates_y,
+ test_dates_X,
+ test_dates_y,
+ ],
+ f,
+ )
+
self.train_models(
train_X=train_X, # type: ignore
train_y=train_y, # type: ignore
@@ -145,6 +183,8 @@ def train_model(
out_feature=out_feature,
start=start,
end=end,
+ train_dates_y=train_dates_y, # type: ignore
+ test_dates_y=test_dates_y, # type: ignore
)
def train_models(
@@ -157,6 +197,8 @@ def train_models(
out_feature: str,
start: int,
end: int,
+ train_dates_y: List[str],
+ test_dates_y: List[str],
):
first_channels = train_X[0].shape[2]
rest_channels = train_X[1].shape[2]
@@ -209,7 +251,6 @@ def train_models(
dirpath=model_path,
),
],
- num_processes=1,
)
t_obj.fit(m, train_loader, val_loader)
@@ -225,6 +266,8 @@ def train_models(
out_feature=out_feature,
model_index=i,
epochs=epochs,
+ train_dates_y=train_dates_y,
+ test_dates_y=test_dates_y,
)
def export_to_onnx(
@@ -249,19 +292,24 @@ def export_to_onnx(
)
def get_dataloader(
- self, gt: NDArray[Any], target: NDArray[Any], o_feature: str
+ self,
+ gt: NDArray[Any],
+ target: NDArray[Any],
+ o_feature: str,
+ dates_mapped: NDArray[Any],
) -> Tuple[DataLoader[Any], List[Tensor]]:
- o_x = self.preprocess.dl_preprocess_data(pd.DataFrame(gt), o_feature)[0][:, :, 0].astype(
- np.float32
- )
+ dates_mapped = pd.to_datetime(dates_mapped, format="%Y-%m-%d %H:%M:%S").values
+ df = pd.DataFrame(list(zip(gt, dates_mapped)), columns=["data", "date"])
+ df.set_index("date", inplace=True)
+ o_x = self.preprocess.dl_preprocess_data(df, o_feature)[0][:, :, 0].astype(np.float32)
- o_y = self.preprocess.dl_preprocess_data(pd.DataFrame(target), o_feature)[0][
- :, :, 0
- ].astype(np.float32)
+ df = pd.DataFrame(list(zip(target, dates_mapped)), columns=["data", "date"])
+ df.set_index("date", inplace=True)
+ o_y = self.preprocess.dl_preprocess_data(df, o_feature)[0][:, :, 0].astype(np.float32)
o_inputs = [torch.from_numpy(x.astype(np.float32)) for x in (o_x, o_y)]
o_dataset = TensorDataset(*o_inputs)
- o_loader = DataLoader(o_dataset, batch_size=self.batch_size, shuffle=True)
+ o_loader = DataLoader(o_dataset, batch_size=self.batch_size, shuffle=True, drop_last=True)
return o_loader, o_inputs
def post_model(
@@ -274,6 +322,8 @@ def post_model(
out_feature: str,
model_index: int,
epochs: int,
+ train_dates_y: List[str],
+ test_dates_y: List[str],
):
m.eval()
@@ -288,11 +338,17 @@ def xf(a: List[NDArray[Any]]) -> List[Tensor]:
os.mkdir(post_model_path)
train_dataloader, _ = self.get_dataloader(
- gt=train_y[:, model_index, 0], target=train_yhat, o_feature=out_feature # type: ignore
+ gt=train_y[:, model_index, 0], # type: ignore
+ target=train_yhat,
+ o_feature=out_feature,
+ dates_mapped=train_dates_y[:, model_index], # type: ignore
)
- val_dataloader, val_inputs = self.get_dataloader(
- gt=test_y[:, model_index, 0], target=test_yhat, o_feature=out_feature # type: ignore
+ val_dataloader, _ = self.get_dataloader(
+ gt=test_y[:, model_index, 0], # type: ignore
+ target=test_yhat,
+ o_feature=out_feature,
+ dates_mapped=test_dates_y[:, model_index], # type: ignore
)
p_m = DeepMCPostTrain(first_in_features=self.total_models)
@@ -308,9 +364,113 @@ def xf(a: List[NDArray[Any]]) -> List[Tensor]:
dirpath=post_model_path,
),
],
- num_processes=1,
)
t_obj.fit(p_m, train_dataloaders=train_dataloader, val_dataloaders=val_dataloader)
self.export_to_onnx(file_path=post_model_path, model=p_m.deepmc, inputs=torch.rand((1, 24)))
+
+ def preprocess_data(
+ self,
+ input_df: pd.DataFrame,
+ out_path: str,
+ start: int = 0,
+ end: int = -1,
+ epochs: int = 20,
+ reset_preprocess: bool = False,
+ ):
+ end = self.total_models if end == -1 else end
+
+ for out_feature in self.out_features:
+ if not os.path.exists(self.path_to_station % out_feature):
+ os.makedirs(self.path_to_station % out_feature, exist_ok=True)
+
+ input_order_df = input_df[self.train_features].copy()
+ out_feature_df = input_order_df[out_feature]
+ input_order_df.drop(columns=[out_feature], inplace=True)
+ input_order_df[out_feature] = out_feature_df
+
+ # data preprocessing
+ (
+ train_scaler,
+ output_scaler,
+ train_df,
+ test_df,
+ ) = utils.get_split_scaled_data(
+ data=input_order_df, out_feature=out_feature, split_ratio=0.92
+ )
+ if reset_preprocess and os.path.exists(
+ self.data_export_path % (out_feature, self.relevant_text)
+ ):
+ os.remove(self.data_export_path % (out_feature, self.relevant_text))
+
+ if os.path.exists(self.data_export_path % (out_feature, self.relevant_text)):
+ with open(self.data_export_path % (out_feature, self.relevant_text), "rb") as f:
+ (
+ train_X,
+ train_y,
+ test_X,
+ test_y,
+ train_scaler,
+ output_scaler,
+ ) = pickle.load(f)
+
+ self.preprocess = Preprocess(
+ train_scaler=train_scaler,
+ output_scaler=output_scaler,
+ is_training=True,
+ is_validation=self.is_validation,
+ ts_lookahead=self.ts_lookahead,
+ ts_lookback=self.ts_lookback,
+ chunk_size=self.chunk_size,
+ wavelet=self.wavelet,
+ mode=self.mode,
+ level=self.level,
+ relevant=self.relevant,
+ )
+ else:
+ self.preprocess = Preprocess(
+ train_scaler=train_scaler,
+ output_scaler=output_scaler,
+ is_training=True,
+ is_validation=self.is_validation,
+ ts_lookahead=self.ts_lookahead,
+ ts_lookback=self.ts_lookback,
+ chunk_size=self.chunk_size,
+ wavelet=self.wavelet,
+ mode=self.mode,
+ level=self.level,
+ relevant=self.relevant,
+ )
+
+ (
+ train_X,
+ train_y,
+ test_X,
+ test_y,
+ train_dates,
+ test_dates,
+ ) = self.preprocess.wavelet_transform_train(train_df, test_df, out_feature)
+
+ with open(self.data_export_path % (out_feature, self.relevant_text), "wb") as f:
+ pickle.dump(
+ [train_X, train_y, test_X, test_y, train_scaler, output_scaler],
+ f,
+ )
+
+ exp_path = self.data_export_path.replace("train_data.pkl", "train_data_dates.pkl")
+
+ with open(exp_path % (out_feature, self.relevant_text), "wb") as f:
+ pickle.dump(
+ [
+ train_X,
+ train_y,
+ test_X,
+ test_y,
+ train_scaler,
+ output_scaler,
+ train_dates,
+ test_dates,
+ ],
+ f,
+ )
diff --git a/notebooks/deepmc/notebook_lib/transformer_models_ts.py b/notebooks/deepmc/notebook_lib/transformer_models_ts.py
deleted file mode 100644
index ba55aaca..00000000
--- a/notebooks/deepmc/notebook_lib/transformer_models_ts.py
+++ /dev/null
@@ -1,367 +0,0 @@
-import numpy as np
-import tensorflow as tf
-
-
-def get_angles(pos, i, d_model):
- angle_rates = 1 / np.power(10000, (2 * (i // 2)) / np.float32(d_model))
- return pos * angle_rates
-
-
-def positional_encoding(position, d_model):
- angle_rads = get_angles(
- np.arange(position)[:, np.newaxis], np.arange(d_model)[np.newaxis, :], d_model
- )
-
- # apply sin to even indices in the array; 2i
- angle_rads[:, 0::2] = np.sin(angle_rads[:, 0::2])
-
- # apply cos to odd indices in the array; 2i+1
- angle_rads[:, 1::2] = np.cos(angle_rads[:, 1::2])
-
- pos_encoding = angle_rads[np.newaxis, ...]
-
- return tf.cast(pos_encoding, dtype=tf.float32)
-
-
-# create mask for padding, 0 --> 1 (mask)
-def create_padding_mask(seq):
- seq = tf.cast(tf.math.equal(seq, 0), tf.float32)
-
- # add extra dimensions to add the padding
- # to the attention logits.
- return seq[:, tf.newaxis, tf.newaxis, :] # (batch_size, 1, 1, seq_len)
-
-
-def create_look_ahead_mask(size):
- mask = 1 - tf.linalg.band_part(tf.ones((size, size)), -1, 0)
- return mask # (seq_len, seq_len)
-
-
-def scaled_dot_product_attention(q, k, v, mask):
- """Calculate the attention weights.
- q, k, v must have matching leading dimensions.
- k, v must have matching penultimate dimension, i.e.: seq_len_k = seq_len_v.
- The mask has different shapes depending on its type(padding or look ahead)
- but it must be broadcastable for addition.
-
- Args:
- q: query shape == (..., seq_len_q, depth)
- k: key shape == (..., seq_len_k, depth)
- v: value shape == (..., seq_len_v, depth_v)
- mask: Float tensor with shape broadcastable
- to (..., seq_len_q, seq_len_k). Defaults to None.
-
- Returns:
- output, attention_weights
- """
-
- matmul_qk = tf.matmul(q, k, transpose_b=True) # (..., seq_len_q, seq_len_k)
-
- # scale matmul_qk
- dk = tf.cast(tf.shape(k)[-1], tf.float32)
- scaled_attention_logits = matmul_qk / tf.math.sqrt(dk)
-
- # add the mask to the scaled tensor.
- if mask is not None:
- scaled_attention_logits += mask * -1e9
-
- # softmax is normalized on the last axis (seq_len_k) so that the scores
- # add up to 1.
- attention_weights = tf.nn.softmax(
- scaled_attention_logits, axis=-1
- ) # (..., seq_len_q, seq_len_k)
-
- output = tf.matmul(attention_weights, v) # (..., seq_len_q, depth_v)
-
- return output, attention_weights
-
-
-def print_out(q, k, v):
- temp_out, temp_attn = scaled_dot_product_attention(q, k, v, None)
- print("Attention weights are:")
- print(temp_attn)
- print("Output is:")
- print(temp_out)
-
-
-"""
- - Q (query), K (key) and V (value) are split into multiple heads (num_heads)
- - each tuple (q, k, v) are fed to scaled_dot_product_attention
- - all attention outputs are concatenated
-"""
-
-
-class MultiHeadAttention(tf.keras.layers.Layer):
- def __init__(self, d_model, num_heads):
- super(MultiHeadAttention, self).__init__()
- self.num_heads = num_heads
- self.d_model = d_model
-
- assert d_model % self.num_heads == 0
-
- self.depth = d_model // self.num_heads
-
- self.wq = tf.keras.layers.Dense(d_model)
- self.wk = tf.keras.layers.Dense(d_model)
- self.wv = tf.keras.layers.Dense(d_model)
-
- self.dense = tf.keras.layers.Dense(d_model)
-
- def split_heads(self, x, batch_size):
- """Split the last dimension into (num_heads, depth).
- Transpose the result such that the shape is (batch_size, num_heads, seq_len, depth)
- """
- x = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth))
- return tf.transpose(x, perm=[0, 2, 1, 3])
-
- def call(self, v, k, q, mask):
- batch_size = tf.shape(q)[0]
-
- q = self.wq(q) # (batch_size, seq_len, d_model)
- k = self.wk(k) # (batch_size, seq_len, d_model)
- v = self.wv(v) # (batch_size, seq_len, d_model)
-
- q = self.split_heads(q, batch_size) # (batch_size, num_heads, seq_len_q, depth)
- k = self.split_heads(k, batch_size) # (batch_size, num_heads, seq_len_k, depth)
- v = self.split_heads(v, batch_size) # (batch_size, num_heads, seq_len_v, depth)
-
- scaled_attention, attention_weights = scaled_dot_product_attention(q, k, v, mask)
-
- scaled_attention = tf.transpose(
- scaled_attention, perm=[0, 2, 1, 3]
- ) # (batch_size, seq_len_q, num_heads, depth)
-
- concat_attention = tf.reshape(
- scaled_attention, (batch_size, -1, self.d_model)
- ) # (batch_size, seq_len_q, d_model)
-
- output = self.dense(concat_attention) # (batch_size, seq_len_q, d_model)
-
- return output, attention_weights
-
-
-def point_wise_feed_forward_network(d_model, dff):
- return tf.keras.Sequential(
- [
- tf.keras.layers.Dense(dff, activation="relu"), # (batch_size, seq_len, dff)
- tf.keras.layers.Dense(d_model), # (batch_size, seq_len, d_model)
- ]
- )
-
-
-class EncoderLayer(tf.keras.layers.Layer):
- def __init__(self, d_model, num_heads, dff, rate=0.1):
- super(EncoderLayer, self).__init__()
-
- self.mha = MultiHeadAttention(d_model, num_heads)
- self.ffn = point_wise_feed_forward_network(d_model, dff)
-
- self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
- self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
-
- self.dropout1 = tf.keras.layers.Dropout(rate)
- self.dropout2 = tf.keras.layers.Dropout(rate)
-
- def call(self, x, training, mask):
-
- attn_output, _ = self.mha(x, x, x, mask) # (batch_size, input_seq_len, d_model)
- attn_output = self.dropout1(attn_output, training=training)
- out1 = self.layernorm1(x + attn_output) # (batch_size, input_seq_len, d_model)
-
- ffn_output = self.ffn(out1) # (batch_size, input_seq_len, d_model)
- ffn_output = self.dropout2(ffn_output, training=training)
- out2 = self.layernorm2(out1 + ffn_output) # (batch_size, input_seq_len, d_model)
-
- return out2
-
-
-class DecoderLayer(tf.keras.layers.Layer):
- def __init__(self, d_model, num_heads, dff, rate=0.1):
- super(DecoderLayer, self).__init__()
-
- self.mha1 = MultiHeadAttention(d_model, num_heads)
- self.mha2 = MultiHeadAttention(d_model, num_heads)
-
- self.ffn = point_wise_feed_forward_network(d_model, dff)
-
- self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
- self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
- self.layernorm3 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
-
- self.dropout1 = tf.keras.layers.Dropout(rate)
- self.dropout2 = tf.keras.layers.Dropout(rate)
- self.dropout3 = tf.keras.layers.Dropout(rate)
-
- def call(self, x, enc_output, training, look_ahead_mask, padding_mask):
- # enc_output.shape == (batch_size, input_seq_len, d_model)
-
- attn1, attn_weights_block1 = self.mha1(
- x, x, x, look_ahead_mask
- ) # (batch_size, target_seq_len, d_model)
- attn1 = self.dropout1(attn1, training=training)
- out1 = self.layernorm1(attn1 + x)
-
- attn2, attn_weights_block2 = self.mha2(
- enc_output, enc_output, out1, padding_mask
- ) # (batch_size, target_seq_len, d_model)
- attn2 = self.dropout2(attn2, training=training)
- out2 = self.layernorm2(attn2 + out1) # (batch_size, target_seq_len, d_model)
-
- ffn_output = self.ffn(out2) # (batch_size, target_seq_len, d_model)
- ffn_output = self.dropout3(ffn_output, training=training)
- out3 = self.layernorm3(ffn_output + out2) # (batch_size, target_seq_len, d_model)
-
- return out3, attn_weights_block1, attn_weights_block2
-
-
-class Encoder(tf.keras.layers.Layer):
- def __init__(self, num_layers, d_model, num_heads, dff, maximum_position_encoding, rate=0.1):
- super(Encoder, self).__init__()
-
- self.d_model = d_model
- self.num_layers = num_layers
-
- self.embedding = tf.keras.layers.Dense(d_model, activation="relu")
- self.pos_encoding = positional_encoding(maximum_position_encoding, self.d_model)
-
- self.enc_layers = [EncoderLayer(d_model, num_heads, dff, rate) for _ in range(num_layers)]
-
- self.dropout = tf.keras.layers.Dropout(rate)
-
- def call(self, x, training, mask):
-
- seq_len = tf.shape(x)[1]
-
- # print("Encoder:", x.shape)
- # adding embedding and position encoding.
- x = self.embedding(x) # (batch_size, input_seq_len, d_model)
- x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
- x += self.pos_encoding[:, :seq_len, :]
-
- x = self.dropout(x, training=training)
-
- for i in range(self.num_layers):
- x = self.enc_layers[i](x, training, mask)
-
- return x # (batch_size, input_seq_len, d_model)
-
-
-class Decoder(tf.keras.layers.Layer):
- def __init__(self, num_layers, d_model, num_heads, dff, maximum_position_encoding, rate=0.1):
- super(Decoder, self).__init__()
-
- self.d_model = d_model
- self.num_layers = num_layers
-
- self.embedding = tf.keras.layers.Dense(d_model, activation="relu")
- self.pos_encoding = positional_encoding(maximum_position_encoding, d_model)
-
- self.dec_layers = [DecoderLayer(d_model, num_heads, dff, rate) for _ in range(num_layers)]
- self.dropout = tf.keras.layers.Dropout(rate)
-
- def call(self, x, enc_output, training, look_ahead_mask, padding_mask):
-
- seq_len = tf.shape(x)[1]
- attention_weights = {}
-
- x = self.embedding(x) # (batch_size, target_seq_len, d_model)
- x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
- x += self.pos_encoding[:, :seq_len, :]
-
- x = self.dropout(x, training=training)
-
- for i in range(self.num_layers):
- x, block1, block2 = self.dec_layers[i](
- x, enc_output, training, look_ahead_mask, padding_mask
- )
- attention_weights["decoder_layer{}_block1".format(i + 1)] = block1
- attention_weights["decoder_layer{}_block2".format(i + 1)] = block2
-
- return x, attention_weights
-
-
-class Transformer(tf.keras.Model):
- def __init__(
- self, num_layers, d_model, num_heads, dff, target_vocab_size, pe_input, pe_target, rate=0.1
- ):
- super(Transformer, self).__init__()
-
- self.encoder = Encoder(num_layers, d_model, num_heads, dff, pe_input, rate)
-
- self.decoder = Decoder(num_layers, d_model, num_heads, dff, pe_target, rate)
-
- self.final_layer = tf.keras.layers.Dense(target_vocab_size)
-
- def call(self, inp, tar, training, enc_padding_mask, look_ahead_mask, dec_padding_mask):
-
- enc_output = self.encoder(
- inp, training, enc_padding_mask
- ) # (batch_size, inp_seq_len, d_model)
-
- # dec_output.shape == (batch_size, tar_seq_len, d_model)
- dec_output, attention_weights = self.decoder(
- tar, enc_output, training, look_ahead_mask, dec_padding_mask
- )
-
- final_output = self.final_layer(dec_output) # (batch_size, tar_seq_len, target_vocab_size)
-
- return final_output, attention_weights
-
-
-class GLU(tf.keras.layers.Layer):
- def __init__(self, input_channel, output_channel):
- super(GLU, self).__init__()
- self.linear_left = tf.keras.layers.Dense(output_channel)
- self.linear_right = tf.keras.layers.Dense(output_channel)
-
- def call(self, x):
- return tf.math.multiply(
- self.linear_left(x), tf.keras.activations.sigmoid(self.linear_right(x))
- )
-
-
-class FFT(tf.keras.layers.Layer):
- def __init__(self, time_step, order, output_channel):
- super(FFT, self).__init__()
- self.time_step = time_step
- self.order = order
- self.output_channel = output_channel
- self.GLUs = [] # nn.ModuleList()
- for i in range(3):
- if i == 0:
- self.GLUs.append(
- GLU(self.time_step * self.order, self.time_step * self.output_channel)
- )
- self.GLUs.append(
- GLU(self.time_step * self.order, self.time_step * self.output_channel)
- )
- elif i == 1:
- self.GLUs.append(
- GLU(self.time_step * self.output_channel, self.time_step * self.output_channel)
- )
- self.GLUs.append(
- GLU(self.time_step * self.output_channel, self.time_step * self.output_channel)
- )
- else:
- self.GLUs.append(
- GLU(self.time_step * self.output_channel, self.time_step * self.output_channel)
- )
- self.GLUs.append(
- GLU(self.time_step * self.output_channel, self.time_step * self.output_channel)
- )
-
- def call(self, x):
- # x should be (b, seq_len, units)
- x = tf.keras.layers.Permute((2, 1))(x)
- ffted = tf.signal.fft(tf.cast(x, dtype=tf.complex64)) # (b, units, seq_len)
- real = tf.math.real(ffted) # [b, units, seq_len]
- img = tf.math.imag(ffted)
- for i in range(3):
- real = self.GLUs[i * 2](real)
- img = self.GLUs[2 * i + 1](img)
-
- time_step_as_inner = tf.dtypes.complex(real, img)
- iffted = tf.signal.ifft(time_step_as_inner) # [b, k, node_cnt, 48]
- iffted = tf.cast(iffted, dtype=tf.float32)
- iffted = tf.keras.layers.Permute((2, 1))(iffted)
- return iffted
diff --git a/notebooks/deepmc/notebook_lib/utils.py b/notebooks/deepmc/notebook_lib/utils.py
deleted file mode 100644
index eacec1aa..00000000
--- a/notebooks/deepmc/notebook_lib/utils.py
+++ /dev/null
@@ -1,104 +0,0 @@
-from datetime import datetime, timedelta
-from typing import Any, Dict, List
-
-import numpy as np
-import pandas as pd
-from numpy._typing import NDArray
-from pandas.tseries.offsets import DateOffset
-from sklearn.preprocessing import StandardScaler
-
-
-def get_csv_data(
- path: str,
- date_attribute: str = "date",
- columns_rename: Dict[str, str] = {},
- frequency: str = "60min",
-):
- """
- Read data from CSV file using Pandas python package.
- """
-
- data_df = pd.read_csv(path)
- data_df[date_attribute] = pd.to_datetime(data_df[date_attribute])
-
- if columns_rename:
- data_df.rename(columns=columns_rename, inplace=True)
-
- # apply index on date
- data_df.reset_index(drop=True, inplace=True)
- data_df.set_index(date_attribute, inplace=True)
- data_df.sort_index(ascending=True, inplace=True)
-
- # interpolate to derive missing data
- data_df = data_df.interpolate(method="from_derivatives")
- assert data_df is not None, "Interpolate deleted all data"
- data_df = data_df.dropna()
-
- # Group rows by frequency, requires date attribute indexed to execute this
- data_df = data_df.fillna(method="ffill")
- data_df = data_df.fillna(method="bfill")
- data_df = data_df.groupby(pd.Grouper(freq=frequency)).mean()
- data_df = data_df.fillna(method="ffill")
- data_df = data_df.fillna(method="bfill")
-
- return data_df
-
-
-def hour_round(t: datetime):
- # Rounds to nearest hour by adding a timedelta hour if minute >= 30
- return t.replace(second=0, microsecond=0, minute=0, hour=t.hour) + timedelta(
- hours=t.minute // 30
- )
-
-
-def get_split_scaled_data(data: pd.DataFrame, out_feature: str, split_ratio: float = 0.92):
- split = int(split_ratio * data.shape[0])
-
- train_data = data.iloc[:split]
- test_data = data.iloc[split:]
-
- output_scaler = StandardScaler()
- output_scaler.fit_transform(np.expand_dims(data[out_feature].values, axis=1)) # type: ignore
-
- train_scaler = StandardScaler()
- train_scale_df = pd.DataFrame(
- train_scaler.fit_transform(train_data), columns=train_data.columns, index=train_data.index
- )
- test_scale_df = pd.DataFrame(
- train_scaler.transform(test_data), columns=test_data.columns, index=test_data.index
- )
-
- return train_scaler, output_scaler, train_scale_df, test_scale_df
-
-
-def shift_index(ds_df: pd.DataFrame, freq_minutes: int, num_indices: int, dateColumn: str = "date"):
- ds_df[dateColumn] = ds_df.index.shift(-num_indices, freq=DateOffset(minutes=freq_minutes))
- ds_df = ds_df.reset_index(drop=True)
- ds_df = ds_df.set_index(dateColumn)
- return ds_df
-
-
-def clean_relevant_data(
- actual_df: pd.DataFrame,
- forecast_df: pd.DataFrame,
- out_variables: List[str],
- freq_hours: int,
- num_of_indices: int,
-):
- base_data_df = actual_df.copy()
- current_ws_df = forecast_df.add_suffix("Current")
- base_data_df = base_data_df.join(current_ws_df)
- shift_forecast_df = shift_index(forecast_df, freq_hours * 60, num_of_indices)
- base_data_df = base_data_df.join(shift_forecast_df)
-
- base_data_df = base_data_df[out_variables]
- base_data_df = base_data_df.interpolate(method="from_derivatives")
- assert base_data_df is not None, "Interpolate deleted all data"
- base_data_df = base_data_df.dropna()
- return base_data_df
-
-
-def smooth(y: NDArray[Any], box_pts: int):
- box = np.ones(box_pts) / box_pts
- y_smooth = np.convolve(y, box, mode="same")
- return y_smooth
diff --git a/notebooks/deepmc_neighbors/deepmc_neighbors_env.yaml b/notebooks/deepmc_neighbors/deepmc_neighbors_env.yaml
new file mode 100644
index 00000000..834e34c9
--- /dev/null
+++ b/notebooks/deepmc_neighbors/deepmc_neighbors_env.yaml
@@ -0,0 +1,19 @@
+name: deepmc-pytorch-neighbors
+channels:
+ - pyg
+ - conda-forge
+ - defaults
+dependencies:
+ - python=3.9.*
+ - pip~=21.2.4
+ - pip:
+ - geopandas~=0.9.0
+ - einops~=0.6.0
+ - geopy~=2.4.1
+ - ipykernel~=6.17.1
+ - unfoldNd~=0.2.0
+ - pyWavelets~=1.3.0
+ - pydantic~=1.10.12
+ - matplotlib~=3.9.0
+ - ../../src/vibe_core
+ - ../../src/vibe_notebook
\ No newline at end of file
diff --git a/notebooks/deepmc_neighbors/gnn_forecast.ipynb b/notebooks/deepmc_neighbors/gnn_forecast.ipynb
new file mode 100644
index 00000000..75834edc
--- /dev/null
+++ b/notebooks/deepmc_neighbors/gnn_forecast.ipynb
@@ -0,0 +1,643 @@
+{
+ "cells": [
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "# Micro Climate Predictions with Nearby Weather Stations\n",
+ "\n",
+ "This notebook helps infer weather forecasts for stations that have no data or limited data by using data from neighboring stations. It demonstrates how to configure inputs and train a model using data from neighboring weather stations.\n",
+ "\n",
+ "This is an extension of the DeepMC notebook [notebooks/deepmc/mc_forecast.ipynb](https://github.com/microsoft/farmvibes-ai/blob/main/notebooks/deepmc/mc_forecast.ipynb).\n",
+ "\n",
+ "Before running this notebook, let's build a micromamba environment. If you do not have micromamba installed, please follow the instructions from the [micromamba installation guide](https://mamba.readthedocs.io/en/latest/installation/micromamba-installation.html).\n",
+ "\n",
+ "```bash\n",
+ "$ micromamba env create -f ./deepmc_neighbors_env.yaml\n",
+ "$ micromamba activate deepmc-pytorch-neighbors\n",
+ "```"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "**Install Packages**"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 1,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "! pip install torch==1.12.1 --index-url https://download.pytorch.org/whl/cpu\n",
+ "! pip install torch-scatter==2.1.0 torch-sparse==0.6.15 torch-geometric==2.3.0 -f https://data.pyg.org/whl/torch-1.12.1%2Bcpu.html\n",
+ "! pip install torch-geometric-temporal~=0.54.0 onnxruntime~=1.15.0 pytorch-lightning~=1.8.0"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## Notebook overview\n",
+ "In this notebook, we describe the steps to generate forecasts for weather variables at a specific station with no or limited data. We employ [Graph Neural Networks (GNNs)](https://pytorch-geometric.readthedocs.io/) for cross-learning from nearby weather stations by capturing spatial relationships.\n",
+ "\n",
+ "To illustrate this approach, we focus on three locations in Washington state, U.S.A., utilizing data accessible through [AGWeatherNet](https://weather.wsu.edu/). An example is shown in the figure below. For instance, assuming that the Warden SW station has missing data, we look to neighboring stations (such as Royal Slope and Ringold) that provide relevant data. We consider the weather variables temperature, humidity, and wind_speed.\n",
+ "\n",
+ "\n",
+ "\n",
+ "Selecting appropriate neighboring stations is crucial for accurate predictions. When choosing neighboring weather stations, consider the following factors:\n",
+ "\n",
+ "- Elevation Similarity: In the current model, the neighboring stations should be at a similar elevation to the target station. This ensures that altitude-related effects are consistent. However, one could build an edge-weight model that includes the altitude differential to account for topography (this notebook does not cover that).\n",
+ "\n",
+ "- Spatial Proximity: The distance between neighboring stations should be small. Proximity often implies similar local weather patterns. In this example, we chose stations less than 25 km apart. In our experiments, we noticed significant errors at distances greater than 25 km.\n",
+ "\n",
+ "**Graph Representation of Weather Stations for GNNs**\n",
+ "\n",
+ "Each weather station corresponds to a node in our graph. To capture the relationships between stations, we connect stations based on the distance between them. This graph does not change with time during inference. If a new station that could improve accuracy becomes available, the model can be updated by recomputing and retraining the GNN.\n"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "The following steps are required for model training and inference.\n",
+ "\n",
+ "**Step 1: Download AgWeatherNet data**\n",
+ "- Download historical weather data for the stations Royal Slope and Ringold from [AGWeatherNet](https://weather.wsu.edu/) for the time range of interest (minimum 2 years of data).\n",
+ "- Clean the downloaded historical data for the variables considered: temperature, humidity, and wind_speed.\n",
+ "\n",
+ "Note: these two steps are not included in the notebook. See [sample data](sample_data.csv) for an example. \n",
+ "\n",
+ "**Step 2: Download forecast data**\n",
+ "- Download HRRR data for the stations Warden SW, Royal Slope and Ringold using the herbie_forecast workflow in FarmVibes.AI for the time range of interest (minimum 2 years of data).\n",
+ "- Clean the downloaded HRRR data for the variables considered: temperature, humidity, and wind_speed.\n",
+ "\n",
+ "**Step 3: Train DeepMC models**\n",
+ "- For stations Royal Slope and Ringold, train the DeepMC model using the notebook [notebooks/deepmc/mc_forecast.ipynb](https://github.com/microsoft/farmvibes-ai/blob/main/notebooks/deepmc/mc_forecast.ipynb). You will need to train separately for each station.\n",
+ "- The DeepMC inference results are weather forecasts for the next 24 hours for the stations Royal Slope and Ringold.\n",
+ "\n",
+ "**Step 4: Preparation for GNN model training**\n",
+ "- Create embeddings: Concatenate the cleaned HRRR weather forecast data for station Warden SW with the DeepMC inference results for stations Royal Slope and Ringold.\n",
+ "- Create train and test splits from the embeddings.\n",
+ "- Train the GNN model.\n",
+ "\n",
+ "**Step 5: Inference**\n",
+ "\n",
+ "Run inference to obtain weather forecasts for the Warden SW station.\n",
+ "\n"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## Notebook Setup\n",
+ "\n",
+ "Let's start by importing the required packages and defining some constants.\n",
+ "\n",
+ "### Imports"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 2,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "import os\n",
+ "import warnings\n",
+ "from datetime import datetime\n",
+ "\n",
+ "from notebook_lib.post_deepmc_inference import download_forecast_data\n",
+ "from notebook_lib.train import MC_Neighbors\n",
+ "\n",
+ "warnings.filterwarnings(\"ignore\")"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "### Constants\n",
+ "- ROOT_DIR - Root directory of DeepMC output.\n",
+ "- WEATHER_TYPE - temperature, humidity, or wind_speed.\n",
+ "- INFERENCE_STATION - Station having missing weather data.\n",
+ "- MODEL_TYPE - relevant or not-relevant"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 3,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "ROOT_DIR = \"\"\n",
+ "WEATHER_TYPE = \"temperature\"\n",
+ "INFERENCE_STATION = \"Warden_SW\"\n",
+ "MODEL_TYPE = \"relevant\"\n",
+ "ROOT_PATH = os.path.join(ROOT_DIR, WEATHER_TYPE)\n",
+ "\n",
+ "# Forecast data\n",
+ "infer_forecast_data_path = f\"{ROOT_PATH}/{INFERENCE_STATION}/{MODEL_TYPE}/forecast.csv\""
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## Step 1: Download station data\n",
+ "Here, we use stations from [AGWeatherNet](https://weather.wsu.edu/).\n",
+ "\n",
+ "We assume that the station Warden_SW has no weather station data. We treat the stations Royal Slope and Ringold as neighboring weather stations with similar weather patterns, so historical data must be downloaded for these two stations. See [sample data](sample_data.csv) for an example."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 4,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# Neighboring stations\n",
+ "# Coordinates are in (longitude, latitude)\n",
+ "neighbor_stations = [\n",
+ " {\n",
+ " \"name\": \"Warden_SW\",\n",
+ " \"column_name\": \"temperature_forecast\",\n",
+ " \"coordinates\": (-119.12, 46.93),\n",
+ " },\n",
+ " {\n",
+ " \"name\": \"royal_slope\",\n",
+ " \"column_name\": \"temperature\",\n",
+ " \"coordinates\": (-119.32, 46.95),\n",
+ " },\n",
+ " {\n",
+ " \"name\": \"ringold\",\n",
+ " \"column_name\": \"temperature\",\n",
+ " \"coordinates\": (-119.18, 46.48),\n",
+ " },\n",
+ "]"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## Step 2: Download Forecast data\n",
+ "\n",
+ "For weather station Warden SW, download weather forecast data by submitting a request to the worker running in the background. The workflow uses the parameters below when processing requests; they can be overridden using the parameter argument.\n",
+ "\n",
+ "- fxx: [1, 25, 1] # start, stop, step\n",
+ "- search_text: \"TMP:2 m\"\n",
+ "- interval: 60 # in minutes\n",
+ "- weather_type: \"temperature\"\n",
+ "- multi_threads: 25"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 5,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "start_date = datetime(year=2021, month=7, day=30)\n",
+ "end_date = datetime(year=2023, month=8, day=2)\n",
+ "forecast_data = download_forecast_data([neighbor_stations[0]], start_date, end_date)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 6,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "forecast_data[\"Warden_SW\"].to_csv(infer_forecast_data_path)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## Step 3: Train DeepMC models\n",
+ "\n",
+ "Complete the DeepMC model training using the notebook [notebooks/deepmc/mc_forecast.ipynb](https://github.com/microsoft/farmvibes-ai/blob/main/notebooks/deepmc/mc_forecast.ipynb) for weather stations Royal Slope and Ringold.\n",
+ "\n"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## Step 4: Train [Graph Neural Network (GNN)](https://pytorch-geometric.readthedocs.io/) model\n"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "### Step 4.1 Create embeddings\n",
+ "\n",
+ "The get_embeddings method does the following:\n",
+ "1. Runs inference using the trained DeepMC models to obtain temperature forecasts for the Royal Slope and Ringold weather stations.\n",
+ "2. Pre-processes the inference results to create a lookback by transforming them into a 2D matrix.\n",
+ "3. Pre-processes the HRRR weather forecast to create a lookback by transforming it into a 2D matrix.\n",
+ "4. Creates embeddings by concatenating the pre-processed results. The embeddings are sorted by timestamp and station name."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 7,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "obj_neighbors = MC_Neighbors(root_dir=ROOT_PATH, learning_rate=0.0025, use_edge_weights=False)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 8,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "train_embeddings, test_embeddings = obj_neighbors.get_embeddings(\n",
+ " INFERENCE_STATION,\n",
+ " neighbor_stations,\n",
+ " 24,\n",
+ " infer_forecast_data_path,\n",
+ ")"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "### Step 4.2 Model training\n",
+ "\n",
+ "The GNN training script does the following:\n",
+ "\n",
+ "1. Creates a Dataset that reads the input embeddings, creates a node for each timestamp, and creates edges connecting weather stations.\n",
+ "2. Creates a BatchSampler to split the data into batches for the training and testing datasets.\n",
+ "3. Starts model training using the PyTorch Lightning package."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 9,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "obj_neighbors.run_train(\n",
+ " train_embeddings=train_embeddings,\n",
+ " test_embeddings=test_embeddings,\n",
+ " neighbor_stations=neighbor_stations,\n",
+ " infer_station=INFERENCE_STATION,\n",
+ " epochs=20,\n",
+ " batch_size=24 * len(neighbor_stations),\n",
+ " forecast_hours=24,\n",
+ ")"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Run inference to validate the trained model."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 10,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "pred_df = obj_neighbors.run_inference(\n",
+ " embeddings=test_embeddings.copy(),\n",
+ " neighbors_station=neighbor_stations,\n",
+ " infer_station=INFERENCE_STATION,\n",
+ " batch_size=len(neighbor_stations),\n",
+ " forecast_hours=24,\n",
+ ")"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "- historical_data_path: path to the historical weather data downloaded and cleaned in Step 1.\n",
+ "- hrrr_data_path: path to the HRRR weather data downloaded and cleaned in Step 2."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 11,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "historical_data_path = \"\"\n",
+ "hrrr_data_path = \"\""
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 12,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "image/png": "",
+ "text/plain": [
+ "