Skip to content

Commit

Permalink
Merge pull request #543 from mlrun/1.6.x-dev
Browse files Browse the repository at this point in the history
1.6.x dev
  • Loading branch information
aviaIguazio authored Nov 28, 2023
2 parents 9689ff9 + 51d6b3a commit ab5cd4b
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 21 deletions.
9 changes: 6 additions & 3 deletions stocks-prediction/01_ingest_news.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,10 @@
"metadata": {},
"outputs": [],
"source": [
"NUMBER_OF_STOCKS = 3"
"# saving number of stocks as project parameter\n",
"project.params['NUMBER_OF_STOCKS'] = 10\n",
"project.save()\n",
"number_of_stocks = project.params['NUMBER_OF_STOCKS']"
]
},
{
Expand Down Expand Up @@ -986,7 +989,7 @@
"fstore.ingest(news_set,\n",
" pd.DataFrame.from_dict({'ticker':['AMZN'],\n",
" 'Datetime': now,\n",
" 'n_stocks':NUMBER_OF_STOCKS}),\n",
" 'n_stocks':number_of_stocks}),\n",
" overwrite=True)"
]
},
Expand Down Expand Up @@ -1074,7 +1077,7 @@
"\n",
"t = requests.post(news_set_endpoint,json={'ticker':['news'],\n",
" 'Datetime': now,\n",
" 'n_stocks':NUMBER_OF_STOCKS})\n",
" 'n_stocks':number_of_stocks})\n",
"t.text"
]
},
Expand Down
25 changes: 17 additions & 8 deletions stocks-prediction/02_ingest_stocks.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,19 @@
"project = mlrun.get_or_create_project(name='stocks',user_project=True, context=\"./\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"outputs": [],
"source": [
"#loading the number of stocks as project parameter\n",
"\n",
"number_of_stocks = project.params.get('NUMBER_OF_STOCKS',10)"
],
"metadata": {
"collapsed": false
}
},
{
"cell_type": "markdown",
"id": "4b54f299",
Expand Down Expand Up @@ -170,15 +183,11 @@
" timestamp_key='Datetime', \n",
" description=\"stocks feature set\")\n",
"\n",
"# how many tickers data we ingest (make sure same number used for ingesting news)\n",
"n_tickers = 10\n",
"\n",
"\n",
"info_set.graph\\\n",
" .to(name='get_stocks',handler='get_stocks')\\\n",
" .to(\"storey.steps.Flatten\", name=\"flatten_stocks\")\\\n",
" .to(name='gen_event_key',handler='gen_event_key',full_event=True)\\\n",
" .to(OneHotEncoder(mapping={'ticker2onehot':{ticker:str(idx) for idx,ticker in enumerate(si.tickers_sp500()[:n_tickers])}}))\\\n",
" .to(OneHotEncoder(mapping={'ticker2onehot':{ticker:str(idx) for idx,ticker in enumerate(si.tickers_sp500()[:number_of_stocks])}}))\\\n",
" \n",
"# Setting default targets (nosql & parquet)\n",
"info_set.set_targets([ParquetTarget(flush_after_seconds=5)], with_defaults=False) \n",
Expand Down Expand Up @@ -567,7 +576,7 @@
" 'start_delta':59,\n",
" 'end_delta':31,\n",
" 'interval':'5m',\n",
" 'n_stocks':n_tickers}),\n",
" 'n_stocks':number_of_stocks}),\n",
" overwrite=True)"
]
},
Expand Down Expand Up @@ -654,7 +663,7 @@
" 'start_delta':29,\n",
" 'end_delta':0,\n",
" 'interval':'5m',\n",
" 'n_stocks':n_tickers})\n",
" 'n_stocks':number_of_stocks})\n",
"t.text"
]
},
Expand Down Expand Up @@ -691,7 +700,7 @@
" 'start_delta':1,\n",
" 'end_delta':0,\n",
" 'interval':'5m',\n",
" 'n_stocks':n_tickers}\n",
" 'n_stocks':number_of_stocks}\n",
"\n",
"# specifying '0 8 * * *' as schedule will trigger the function every day at 08:00 AM\n",
"fn = mlrun.code_to_function(name='ingestion_service_stocks',kind='job',image='mlrun/mlrun',handler='ingestion_service_invoker', filename='src/invoker.py')\n",
Expand Down
2 changes: 1 addition & 1 deletion stocks-prediction/03_train_model.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -773,4 +773,4 @@
},
"nbformat": 4,
"nbformat_minor": 5
}
}
1 change: 1 addition & 0 deletions stocks-prediction/04_model_serving.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@
}
],
"source": [
"fn.spec.readiness_timeout = 60 * 20 # 20 minutes\n",
"address = fn.deploy()"
]
},
Expand Down
2 changes: 1 addition & 1 deletion stocks-prediction/06_grafana_view.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -101,4 +101,4 @@
},
"nbformat": 4,
"nbformat_minor": 5
}
}
18 changes: 10 additions & 8 deletions stocks-prediction/src/serving_stocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,15 @@
import os
import time

warnings.filterwarnings("ignore")
def log_results(df: pd.DataFrame):
# writing to tsdb
if "V3IO_USERNAME" in os.environ:
framesd = os.getenv("V3IO_FRAMESD",'framesd:8081')
client = v3f.Client(framesd, container=os.getenv('V3IO_CONTAINER', 'projects'))
kv_table_path = '/stocks-'+ os.environ['V3IO_USERNAME'] + '/artifacts/stocks_prediction'
client.write('kv', kv_table_path, dfs=df, index_cols=['datetime','tickers'])
print(f'writing prediction to kv at projects{kv_table_path} ...')


def preprocess(event):
vector_name = event['vector_name']
Expand Down Expand Up @@ -85,14 +93,8 @@ def postprocess(event):
df['true'] = event['outputs']['labels']
df['prediction'] = (df['prediction']*event['outputs']['price_std']) + event['outputs']['price_mean']
df['true'] = (df['true']*event['outputs']['price_std']) + event['outputs']['price_mean']
df2 = df.copy()
df['datetime'] = df['datetime'].apply(lambda x: x.strftime('%Y-%m-%d %H:%M:%S'))
# writing to tsdb
framesd = os.getenv("V3IO_FRAMESD",'framesd:8081')
client = v3f.Client(framesd, container=os.getenv('V3IO_CONTAINER', 'projects'))
kv_table_path = '/stocks-'+ os.environ['V3IO_USERNAME'] + '/artifacts/stocks_prediction'
client.write('kv', kv_table_path, dfs=df, index_cols=['datetime','tickers'])
print(f'writing prediction to kv at projects{kv_table_path} ...')
log_results(df)
return [df.values.tolist(),list(df.columns)]

class StocksModel(PyTorchModelServer):
Expand Down

0 comments on commit ab5cd4b

Please sign in to comment.