# bitcoin-sentiment-training-pipeline-2.py
import os

import modal

LOCAL = True

if not LOCAL:
    stub = modal.Stub()
    image = modal.Image.debian_slim().apt_install(["libgomp1"]).pip_install(
        ["hopsworks", "seaborn", "joblib", "scikit-learn", "xgboost"])

    @stub.function(image=image, schedule=modal.Period(days=1),
                   secret=modal.Secret.from_name("hopsworks-api-key"))
    def f():
        g()


def g():
    import hopsworks
    import pandas as pd
    import xgboost as xgb
    from sklearn.metrics import classification_report, confusion_matrix
    import seaborn as sns
    from hsml.schema import Schema
    from hsml.model_schema import ModelSchema
    import joblib

    # Log in to Hopsworks. The API key is read from the HOPSWORKS_API_KEY
    # environment variable (set locally, or injected by the Modal secret
    # above) rather than being hardcoded in the source.
    project = hopsworks.login()

    # get the feature store from hopsworks
    fs = project.get_feature_store()

    # Select the feature view for the Twitter Bitcoin sentiment feature group
    # in the feature store, creating it from the feature group on the first run
    try:
        feature_view = fs.get_feature_view(name="twitter_bitcoin_sentiment", version=1)
    except Exception:
        twitter_fg = fs.get_feature_group(name="twitter_bitcoin_sentiment", version=1)
        query = twitter_fg.select_all()
        feature_view = fs.create_feature_view(name="twitter_bitcoin_sentiment",
                                              version=1,
                                              description="Read from Twitter bitcoin sentiment dataset",
                                              labels=["bitcoin_fluctuation"],
                                              query=query)

    # Read the training data and randomly split it into train/test sets of
    # features (X) and labels (y) with an 80/20 split
    X_train, X_test, y_train, y_test = feature_view.train_test_split(0.2)

    # Cast the labels to categorical, then encode them as integers with
    # scikit-learn's LabelEncoder
    y_train.bitcoin_fluctuation = y_train.bitcoin_fluctuation.astype('category')
    y_test.bitcoin_fluctuation = y_test.bitcoin_fluctuation.astype('category')
    from sklearn import preprocessing
    le = preprocessing.LabelEncoder()
    le.fit(['Bearish', 'Bullish', 'Neutral'])
    y_train_le = le.transform(y_train.values.ravel())
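
    # LabelEncoder assigns integer codes in sorted class order, so here
    # 'Bearish' -> 0, 'Bullish' -> 1 and 'Neutral' -> 2. For example,
    # le.transform(['Neutral']) returns array([2]) and
    # le.inverse_transform([0]) returns array(['Bearish']).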

    # Train the model with the scikit-learn API of XGBoost, using our features
    # (X_train) and our encoded labels (y_train_le).
    # # First: hyperparameter tuning using grid search cross-validation
    # from sklearn.model_selection import GridSearchCV
    # # set ranges for the hyperparameters 'learning_rate' (eta), 'max_depth',
    # # 'min_child_weight' and 'n_estimators'
    # grid_param = {
    #     'learning_rate': [0.01, 0.02, 0.03, 0.04, 0.05, 0.1, 0.2],
    #     'max_depth': [3, 4, 5, 6, 7, 8, 9, 10],
    #     'min_child_weight': [0.5, 1, 2],
    #     'n_estimators': [200, 400, 800, 1000, 1500]
    # }
    # # initialize the grid search CV object
    # gd_sr = GridSearchCV(estimator=xgb.XGBClassifier(tree_method="hist", objective="multi:softmax"),
    #                      param_grid=grid_param,
    #                      scoring='f1_weighted',
    #                      cv=5,
    #                      n_jobs=-1)
    # # run the grid search on the training data
    # gd_sr.fit(X_train, y_train_le)
    # # check the best parameters and the resulting score
    # best_parameters = gd_sr.best_params_
    # print(best_parameters)
    # best_result = gd_sr.best_score_
    # print(best_result)
    # # The resulting best parameters are:
    # # {'learning_rate': 0.03, 'max_depth': 5, 'min_child_weight': 0.1, 'n_estimators': 1500} with cv f1: 0.4712777145272029

    # Now train the model with the found hyperparameters
    model = xgb.XGBClassifier(tree_method="hist", learning_rate=0.03, n_estimators=1500,
                              max_depth=5, min_child_weight=0.1, objective="multi:softmax")
    model.fit(X_train, y_train_le)
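
    # (A sketch, not used in this pipeline: XGBoost's scikit-learn API can also
    # monitor a held-out set while fitting, e.g.
    #   model.fit(X_train, y_train_le,
    #             eval_set=[(X_test, le.transform(y_test.values.ravel()))],
    #             verbose=False)
    # after which model.evals_result() holds the per-round validation metrics.)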

    # Evaluate model performance using the features from the test set (X_test)
    y_pred = model.predict(X_test)
    y_pred_train = model.predict(X_train)

    # Compare the predictions (y_pred) with the labels in the test set (y_test)
    metrics = classification_report(y_test, le.inverse_transform(y_pred), output_dict=True)
    metrics_train = classification_report(y_train, le.inverse_transform(y_pred_train), output_dict=True)
    print(f"Accuracy Test: {metrics['accuracy']}")
    print(f"Accuracy Train: {metrics_train['accuracy']}")

    # Create the confusion matrix as a figure
    results = confusion_matrix(y_test, le.inverse_transform(y_pred), labels=['Bearish', 'Bullish', 'Neutral'])
    df_cm = pd.DataFrame(results,
                         index=['True Bearish', 'True Bullish', 'True Neutral'],
                         columns=['Pred Bearish', 'Pred Bullish', 'Pred Neutral'])
    cm = sns.heatmap(df_cm, annot=True, fmt='g')
    fig = cm.get_figure()

    # Upload the model to the Hopsworks Model Registry.
    # First get an object for the model registry.
    mr = project.get_model_registry()

    # The contents of the 'twitter_bitcoin_sentiment_model_2' directory will be
    # saved to the model registry, so create the directory first.
    model_dir = "twitter_bitcoin_sentiment_model_2"
    os.makedirs(model_dir, exist_ok=True)

    # Save both our model and the confusion matrix to 'model_dir', whose
    # contents will be uploaded to the model registry
    joblib.dump(model, model_dir + "/twitter_bitcoin_sentiment_model_2.pkl")
    fig.savefig(model_dir + "/confusion_matrix_2.png")

    # Specify the schema of the model's input/output using the features
    # (X_train) and the encoded labels (y_train_le)
    input_schema = Schema(X_train)
    output_schema = Schema(y_train_le)
    model_schema = ModelSchema(input_schema, output_schema)

    # Create an entry in the model registry that includes the model's name,
    # description and metrics
    twitter_bitcoin_sentiment_model = mr.python.create_model(
        name="twitter_bitcoin_sentiment_2",
        metrics={"accuracy": metrics['accuracy'], "f1": metrics['weighted avg']['f1-score']},
        model_schema=model_schema,
        description="Twitter Bitcoin sentiment predictor, tuned by grid search CV for maximum weighted F1 score"
    )

    # Upload the model to the model registry, including all files in 'model_dir'
    twitter_bitcoin_sentiment_model.save(model_dir)
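
    # (A sketch of how a downstream inference script could retrieve this model;
    # assumes the same project and model version 1:
    #   mr = project.get_model_registry()
    #   entry = mr.get_model("twitter_bitcoin_sentiment_2", version=1)
    #   download_dir = entry.download()
    #   model = joblib.load(download_dir + "/twitter_bitcoin_sentiment_model_2.pkl"))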


if __name__ == "__main__":
    if LOCAL:
        g()
    else:
        with stub.run():
            f()