-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathbitcoin-sentiment-feature-pipeline-daily.py
228 lines (204 loc) · 12.1 KB
/
bitcoin-sentiment-feature-pipeline-daily.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
import os
from unicodedata import decimal
import modal
from twitter_inference import tweets_preprocess_daily, scrape_tweets_daily, tweets_preprocess_backfill
# When True, rebuild the feature group from the historical tweet dump stored in Hopsworks
# instead of scraping yesterday's tweets.
BACKFILL: bool = False
# When True, run the pipeline function ``g`` in this process instead of deploying the Modal app.
LOCAL: bool = False
# Modal deployment definition; only built when the pipeline is not run locally.
if not LOCAL:
    stub = modal.Stub()
    # "sklearn" on PyPI is a deprecated placeholder whose installation now always fails;
    # the real distribution is "scikit-learn" (which also pulls in scipy, used by g()).
    # NOTE(review): "datetime" and "configparser" are stdlib modules; the PyPI packages of
    # those names are unrelated/backports and are probably unnecessary — confirm and drop.
    image = modal.Image.debian_slim().pip_install([
        "hopsworks==3.0.4",
        "joblib",
        "seaborn",
        "scikit-learn",
        "dataframe-image",
        "tweepy",
        "datetime",
        "configparser",
        "transformers",
        "nltk",
        "emot",
        "tqdm",
        "torch",
    ])

    @stub.function(
        image=image,
        timeout=3600,
        schedule=modal.Period(days=1),
        secret=modal.Secret.from_name("hopsworks-api-key"),
    )
    def f():
        """Modal entry point, scheduled once per day: run the feature pipeline ``g``."""
        g()
def g():
    """Run the Twitter-sentiment / Bitcoin feature pipeline once and push the result to Hopsworks.

    Depending on the module-level ``BACKFILL`` flag, either the historical tweet CSV stored in
    the Hopsworks dataset API is processed, or freshly scraped tweets (``scrape_tweets_daily``)
    are. The tweets are scored with the ``cardiffnlp/twitter-roberta-base-sentiment-latest``
    model, aggregated per calendar day, joined with a Bullish/Neutral/Bearish label derived from
    the daily Bitstamp BTC/EUR candle and inserted into the ``twitter_bitcoin_sentiment``
    feature group (version 1).

    Raises:
        ValueError: if preprocessing leaves no tweets to score.
    """
    import hopsworks

    # Credentials come from the environment (e.g. the Modal secret attached to ``f``) rather than
    # from source control. The key that used to be hard-coded here is exposed in the repository
    # history and should be revoked.
    # NOTE(review): assumes the "hopsworks-api-key" secret defines HOPSWORKS_API_KEY — confirm.
    project = hopsworks.login(api_key_value=os.environ.get("HOPSWORKS_API_KEY"))
    # connect to feature store and dataset API
    fs = project.get_feature_store()
    dataset_api = project.get_dataset_api()

    if BACKFILL:
        # either use the prepped tweets dataset reaching back to 2016 ...
        twitter_df = _load_backfill_tweets(dataset_api)
    else:
        # ... or update with new tweets and the matching Bitcoin fluctuation from yesterday
        twitter_df = _load_daily_tweets()
    if twitter_df.empty:
        raise ValueError("No tweets left after preprocessing; nothing to insert into the feature group.")

    twitter_df = _add_sentiment_scores(twitter_df)
    if BACKFILL:
        # checkpoint the expensive sentiment step so a later failure does not lose it
        twitter_df.to_csv(f"{_ASSETS_DIR}/checkpoint_with_sentiment.csv", sep=";", decimal=".")

    twitter_df = _aggregate_per_day(twitter_df)
    if BACKFILL:
        snippet_message = "Preprocess finished. With following snippet of twitter dataframe:"
    else:
        # iloc[[0]] keeps only the first (earliest) day, i.e. yesterday's tweets and not today's
        twitter_df = twitter_df.iloc[[0]]
        snippet_message = "Preprocess finished with following snipped of twitter dataframe:"
    twitter_df = _drop_timezone(twitter_df)
    print(snippet_message)
    print(twitter_df.head())

    twitter_df = _merge_with_bitcoin(twitter_df)

    # add to feature group
    print("Pushing to feature group...")
    twitter_fg = fs.get_or_create_feature_group(
        name="twitter_bitcoin_sentiment",
        version=1,
        # NOTE(review): the primary key consists of float aggregates, and the day itself lives in
        # the index after the merge — confirm whether `insert` keeps the index. A `date` primary
        # key would be more natural but requires a new feature-group version.
        primary_key=[
            "aggregate_followers",
            "sentiment_score_negative_mean",
            "sentiment_score_neutral_mean",
            "sentiment_score_positive_mean",
        ],
        description="Twitter bitcoin sentiment dataset")
    twitter_fg.insert(twitter_df, write_options={"wait_for_job": False})
    print("Feature pipeline finished!")


# Local working directory for the backfill download and the sentiment checkpoint.
_ASSETS_DIR = "./twitter_bitcoin_sentiment_assets"
# Columns filled from the sentiment model's softmax output, in the order
# [negative, neutral, positive].
_SENTIMENT_COLUMNS = ["sentiment_score_negative", "sentiment_score_neutral", "sentiment_score_positive"]
# Daily BTC/EUR candles from Bitstamp (first CSV line is a banner, hence skiprows=1 below).
_BITCOIN_CSV_URL = "https://www.cryptodatadownload.com/cdd/Bitstamp_BTCEUR_d.csv"
# A day is Bullish / Bearish when close moves more than this fraction above / below open.
_FLUCTUATION_THRESHOLD = 0.02


def _load_backfill_tweets(dataset_api):
    """Download the historical tweet CSV from the Hopsworks dataset API and preprocess it."""
    import pandas as pd

    print("Backfill started. Downloading twitter backfill file...")
    os.makedirs(_ASSETS_DIR, exist_ok=True)
    dataset_api.download(
        "KTH_lab1_Training_Datasets/twitter_bitcoin_sentiment/tweets_influential_users.csv",
        local_path=f"{_ASSETS_DIR}/",
        overwrite=True,
    )
    df = pd.read_csv(
        f"{_ASSETS_DIR}/tweets_influential_users.csv",
        sep=";",
        decimal=".",
        lineterminator="\n",
        usecols=["time", "id", "tweet", "followers"],
        index_col="id",
    )
    df = df.rename(columns={"time": "date", "tweet": "text", "followers": "user_followers"})
    # DataFrame.info() prints its report itself and returns None
    df.info()
    print("Finished twitter backfill file download. Starting preprocess...")
    return tweets_preprocess_backfill(df)


def _load_daily_tweets():
    """Scrape the daily tweets with ``scrape_tweets_daily`` and preprocess them."""
    print("Started daily update. Scraping daily tweets from Twitter API...")
    # presumably yesterday's tweets from users with more than 500 000 followers — see scrape_tweets_daily
    tweets_df = scrape_tweets_daily()
    print("Finished scraping daily tweets. Starting preprocess...")
    print(tweets_df.head())
    twitter_df = tweets_preprocess_daily(tweets_df)
    print(twitter_df.head())
    return twitter_df


def _add_sentiment_scores(twitter_df):
    """Score each tweet in ``twitter_df.new_text`` and add the three sentiment columns.

    The softmax probabilities of the Cardiff RoBERTa sentiment model are written into
    ``_SENTIMENT_COLUMNS``. The frame is modified in place and returned.
    """
    import pandas as pd
    import torch
    from scipy.special import softmax
    from tqdm import tqdm
    from transformers import AutoModelForSequenceClassification, AutoTokenizer

    print("Extracting tweet sentiment...")
    # registers Series.progress_apply so the long-running scoring reports progress
    tqdm.pandas()
    print("Initializing transformers pipeline...")
    model_path = "cardiffnlp/twitter-roberta-base-sentiment-latest"
    tokenizer = AutoTokenizer.from_pretrained(model_path)
    model = AutoModelForSequenceClassification.from_pretrained(model_path)

    def sentiment_analysis(text):
        # truncation=True caps inputs at the tokenizer's model_max_length instead of letting
        # an over-long text fail inside the forward pass
        encoded_input = tokenizer(text, return_tensors="pt", truncation=True)
        # inference only: no autograd graph is needed
        with torch.no_grad():
            logits = model(**encoded_input)[0][0].detach().numpy()
        # scores is [negative, neutral, positive]
        return softmax(logits)

    print("Running pipeline (for larger datasets this can take up to 2h or more)...")
    scores = twitter_df.new_text.progress_apply(sentiment_analysis).tolist()
    twitter_df[_SENTIMENT_COLUMNS] = pd.DataFrame(scores, index=twitter_df.index)
    print("Pipeline finished!")
    return twitter_df


def _aggregate_per_day(twitter_df):
    """Collapse scored tweets into one row per calendar day of the ``date`` column.

    Followers are summed and sentiment scores averaged. Days without tweets are dropped.
    The result is indexed by day, with the index named ``date``.
    """
    import pandas as pd

    renamed = {
        "user_followers": "aggregate_followers",
        "sentiment_score_negative": "sentiment_score_negative_mean",
        "sentiment_score_neutral": "sentiment_score_neutral_mean",
        "sentiment_score_positive": "sentiment_score_positive_mean",
    }
    aggregations = {"user_followers": "sum", **{column: "mean" for column in _SENTIMENT_COLUMNS}}
    return (
        twitter_df.groupby(pd.Grouper(key="date", freq="D"))
        .agg(aggregations)
        .rename(columns=renamed)
        .dropna()
    )


def _drop_timezone(df):
    """Convert a tz-aware DatetimeIndex to naive UTC; leave any other index unchanged."""
    if getattr(df.index, "tz", None) is not None:
        df.index = df.index.tz_convert(None)
    return df


def _bitcoin_fluctuation():
    """Download daily BTC/EUR candles and label each day Bullish, Neutral or Bearish.

    Returns a one-column frame (``bitcoin_fluctuation``) indexed by ``date`` (floored to the day).
    """
    import pandas as pd

    print("Downloading Bitcoin data...")
    # import bitcoin price data to match with tweets from that day
    bitcoin_df = pd.read_csv(
        _BITCOIN_CSV_URL,
        skiprows=1,
        decimal=".",
        usecols=["date", "symbol", "open", "high", "low", "close"],
        parse_dates=["date"],
    )
    print("Bitcoin data downloaded. Processing and merging with twitter data...")
    # keep only the date, not the time
    bitcoin_df["date"] = bitcoin_df["date"].dt.floor("D")
    relative_change = (bitcoin_df["close"] - bitcoin_df["open"]) / bitcoin_df["open"]
    bitcoin_df["bitcoin_fluctuation"] = "Neutral"
    bitcoin_df.loc[relative_change > _FLUCTUATION_THRESHOLD, "bitcoin_fluctuation"] = "Bullish"
    bitcoin_df.loc[relative_change < -_FLUCTUATION_THRESHOLD, "bitcoin_fluctuation"] = "Bearish"
    # look labels up by name: positional indexing into np.unique counts raised IndexError
    # whenever one of the three labels did not occur
    counts = bitcoin_df["bitcoin_fluctuation"].value_counts()
    print(
        f"Bitcoin fluctuations are:\nBearish: {counts.get('Bearish', 0)}"
        f"\nNeutral: {counts.get('Neutral', 0)}\nBullish: {counts.get('Bullish', 0)}"
    )
    return bitcoin_df.set_index("date")[["bitcoin_fluctuation"]]


def _merge_with_bitcoin(twitter_df):
    """Inner-join the per-day tweet features with that day's Bitcoin fluctuation label."""
    twitter_df = twitter_df.merge(_bitcoin_fluctuation(), on="date")
    print("Merged with following snippet of final dataframe:")
    print(twitter_df.head())
    return twitter_df
if __name__ == "__main__":
    # Deploy the scheduled Modal app by default; with LOCAL set, run the pipeline in-process.
    if not LOCAL:
        stub.deploy("bitcoin_sentiment_feature_pipeline_daily")
    else:
        g()