-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathLSTM for one stock APP
280 lines (217 loc) · 13 KB
/
LSTM for one stock APP
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
import tushare as ts
import pandas as pd
def get_data(code, date):
#数据准备/data preparation
#变量选取Open,High,Low,Close,Volume等,以浦发银行股票为例
pro = ts.pro_api('f3bbc97d0ffbbed8666e6f7c82e712165950d048987f5d6cfbf1e0ce') #token可以在新版tushare的网站上找到
stock_data = pro.query('daily', ts_code = code, start_date = '20000101', end_date = date)
stock_data = stock_data[::-1] #倒序,使日期靠前的排在前面
stock_data.reset_index(drop=True, inplace=True) #把每行的索引改为“0、1、2……”
return stock_data
import numpy as np
from sklearn.preprocessing import MinMaxScaler
def preprocess(stock_data, day, seq_length, data_dim, output_dim, visual_window):
xy = stock_data[['open', 'close', 'high', 'low', 'vol', 'pct_chg', 'amount']] #选取需要的features
xy = np.array(xy.values) #转为array
dataXY = []
for i in range(0, len(xy) - seq_length - day + 1):
_xy = xy[i:i + seq_length + day] #包括用于计算的seq_length天的数据,以及day天后的价格
dataXY.append(_xy)
#调整数据的shape
xy_real = np.vstack(dataXY).reshape(-1, seq_length + day, data_dim)
dataXY = xy_real
app_dataX = []
for i in range(len(xy) - seq_length - day + 1, len(xy) - seq_length + 1):
_x = xy[i:i + seq_length] #包括用于计算的seq_length天的数据
app_dataX.append(_x)
#调整数据的shape
x_real = np.vstack(app_dataX).reshape(-1, seq_length, data_dim)
app_dataX = x_real
xy_visual = np.copy(dataXY[- visual_window:]) #取最近visual_window天的数据,用于最后的画图
np.random.shuffle(dataXY) #打乱顺序
#切分训练集合测试集/split to train and testing
train_size = int(len(dataXY) * 0.7) #训练集长度
test_size = len(dataXY) - train_size #测试集长度
xy_train, xy_test = np.array(dataXY[0:train_size]), np.array(dataXY[train_size:len(dataXY)]) #划分训练集、测试集
#先处理训练集的数据
scaler = MinMaxScaler()
xy_train = xy_train.reshape((-1, data_dim)) #先变成2维,才能transform
xy_train_new = scaler.fit_transform(xy_train) #预处理,按列操作,每列最小值为0,最大值为1
xy_train_new = xy_train_new.reshape((-1, seq_length + day, data_dim)) #变回3维
x_new = xy_train_new[:,0:seq_length] #features
y_new = xy_train_new[:,-1,1] * 10 #取最后一天的收盘价,用作label,适当放大,便于训练
trainX, trainY = x_new, y_new
#然后处理测试集的数据
xy_test = xy_test.reshape((-1, data_dim))
xy_test_new = scaler.transform(xy_test) #使用训练集的scaler预处理测试集的数据
xy_test_new = xy_test_new.reshape((-1, seq_length + day, data_dim))
x_new = xy_test_new[:, 0:seq_length]
y_new = xy_test_new[:, -1, 1] * 10
#以下3项用于计算收入
close_price = xy_test_new[:, seq_length - 1, 1]
buy_price = xy_test_new[:, seq_length, 0]
sell_price = xy_test_new[:, -1, 1]
testX, testY, test_close, test_buy, test_sell = x_new, y_new, close_price, buy_price, sell_price
#再处理应用集
x_app = app_dataX.reshape((-1, data_dim))
appX = scaler.transform(x_app) #用训练集的scaler进行预处理
appX = appX.reshape((-1, seq_length, data_dim))
#最后处理用于画图的数据
xy_visual = xy_visual.reshape((-1, data_dim))
xy_visual_new = scaler.transform(xy_visual) #使用训练集的scaler预处理
xy_visual_new = xy_visual_new.reshape((-1, seq_length + day, data_dim))
x_new = xy_visual_new[:, 0:seq_length]
y_new = xy_visual_new[:, -1, 1] * 10
visualX, visualY = x_new, y_new
return trainX, trainY, testX, testY, appX, scaler, test_close, test_buy, test_sell, visualX, visualY
from keras.layers import Input, Dense, LSTM, Reshape
from keras.models import Model
from keras import regularizers, callbacks
def train(code, day, trainX, trainY, seq_length, data_dim, output_dim):
# 构建神经网络层 1层Dense层+1层LSTM层+4层Dense层
rnn_units = 32
Dense_input = Input(shape=(seq_length, data_dim), name='dense_input') #输入层
#shape: 形状元组(整型)不包括batch size。表示了预期的输入将是一批(seq_len,data_dim)的向量。
Dense_output_1 = Dense(rnn_units, activation='relu', kernel_regularizer=regularizers.l2(0.0), name='dense1')(Dense_input) #全连接网络
lstm_input = Reshape(target_shape=(seq_length, rnn_units), name='reshape2')(Dense_output_1)
#改变Tensor形状,改变后是(None,seq_length, rnn_units)
lstm_output = LSTM(rnn_units, activation='tanh', dropout=1.0, name='lstm')(lstm_input) #LSTM网络
#units: Positive integer,dimensionality of the output space.
#dropout: Float between 0 and 1. Fraction of the units to drop for the linear transformation of the inputs.
Dense_input_2 = Reshape(target_shape=(rnn_units,), name='reshape3')(lstm_output)
#改变Tensor形状,改变后是(None,rnn_units)
Dense_output_2 = Dense(64, activation='relu', kernel_regularizer=regularizers.l2(0.0), name='dense2')(Dense_input_2) #全连接网络
Dense_output_3 = Dense(32, activation='relu', kernel_regularizer=regularizers.l2(0.0), name='dense3')(Dense_output_2) #全连接网络
Dense_output_4 = Dense(16, activation='relu', kernel_regularizer=regularizers.l2(0.0), name='dense4')(Dense_output_3) #全连接网络
predictions = Dense(output_dim, activation=None, kernel_regularizer=regularizers.l2(0.0), name='dense5')(Dense_output_4) #全连接网络
model = Model(inputs=Dense_input, outputs=predictions)
#This model will include all layers required in the computation of output given input.
model.compile(optimizer='adam', loss='mse', metrics=['mae'])
#Configures the model for training.
#optimizer: String (name of optimizer) or optimizer instance. See optimizers.
#loss: String (name of objective function) or objective function.The loss value will be minimized by the model.
#metrics: List of metrics to be evaluated by the model during training and testing. Typically you will use metrics=['accuracy'].
ES = callbacks.EarlyStopping(monitor='val_loss', min_delta=0, patience=10, verbose=0, mode='auto', baseline=None)
model.fit(trainX, trainY, batch_size=256, epochs=400, verbose=0, callbacks=[ES], validation_split=0.1)
#Trains the model for a given number of epochs (iterations on a dataset).
#verbose: Integer. 0, 1, or 2. Verbosity mode. 0 = silent, 1 = progress bar, 2 = one line per epoch.
# 保存模型
model.save(code + '(1)' + str(day) + '.h5') # HDF5文件,pip install h5py
return model
def test(model, testX, testY, scaler, day, close_price, buy_price, sell_price, visualX, visualY):
testPredict = model.predict(testX) #查看测试结果
testPredict2 = testPredict / 10 * scaler.data_range_[1] + scaler.data_min_[1] #放大和scale的逆运算
testY2 = testY / 10 * scaler.data_range_[1] + scaler.data_min_[1] #放大和scale的逆运算
#以下3项用于计算收入
#今天的收盘价,用于判断买不买
close_price2 = close_price * scaler.data_range_[1] + scaler.data_min_[1]
#明天的开盘价,如果买需要付多少钱
buy_price2 = buy_price * scaler.data_range_[0] + scaler.data_min_[0]
#持有day天之后的收盘价,这时卖能卖多少钱
sell_price2 = sell_price * scaler.data_range_[1] + scaler.data_min_[1]
#平均误差(%)
mean_error = np.mean(abs(testPredict2 - testY2) / testY2 * 100)
mean_error = round(mean_error, 2)
print('平均误差(%):', mean_error)
#最大误差(%)
max_error = np.max(abs(testPredict2 - testY2) / testY2 * 100)
max_error = round(max_error, 2)
print('最大误差(%):', max_error)
count = 0 #绝对误差小于1%的比例
correct = np.zeros(len(testPredict2)) #预测涨或跌的正确率
model_income = 0 #模型能挣多少钱
trade = 0 #计算交易频率
max_income = 0 #最理想的状况下,能挣多少钱
random_income = 0 #随机购买,能挣多少钱
tolerance = 1
for i in range(len(testY2)):
#计算绝对误差小于 tolerance% 的比例
if abs(testPredict2[i] - testY2[i]) / testY2[i] * 100 <= tolerance:
count += 1
#计算对转折点的预测正确率
if np.sign(testPredict2[i] - close_price2[i]) == np.sign(testY2[i] - close_price2[i]):
#如果对涨或跌的判断准确,这里用正负符号判断
correct[i] = 1 #就加1
#如果对“day”天后的预测价格高于今天的收盘价,就买进并持有“day”天,计算能挣多少钱
if testPredict2[i] > close_price2[i]:
model_income = model_income + sell_price2[i] - buy_price2[i]
trade += 1
#最理想的状况下,能挣多少钱
if testY2[i] > close_price2[i]:
max_income = max_income + sell_price2[i] - buy_price2[i]
#随机购买,能挣多少钱
buy = np.random.randint(0, 2) #随机产生0或1
if buy: #如果是1就买
random_income = random_income + sell_price2[i] - buy_price2[i]
count = count / len(testY2) * 100
count = round(count, 2)
print('误差小于' + str(tolerance) + '%的比例:', count)
accuracy = np.sum(correct) / len(correct) * 100
accuracy = round(accuracy, 2)
print('预测涨或跌的正确率:', accuracy)
print('模型的购买策略是,如果对%d天之后的预测值大于今天的收盘价,就在明天开市时买进1股,并且持有%d天,再卖出'%(day, day))
frequency = trade / len(testPredict2) * 100
model_income = round(float(model_income), 2)
frequency = round(frequency, 2)
print('在%d天中,模型交易了%d次,交易频率为%g'%(len(testPredict2), trade, frequency) + '%')
print('按照模型进行操作所得的收入:', model_income)
max_income = round(float(max_income), 2)
print('最理想状况下的收入:', max_income)
random_income = round(float(random_income), 2)
print('随机购买的收入:', random_income)
visualPredict = model.predict(visualX)
visualPredict2 = visualPredict / 10 * scaler.data_range_[1] + scaler.data_min_[1] #放大和scale的逆运算
visualY2 = visualY / 10 * scaler.data_range_[1] + scaler.data_min_[1] #放大和scale的逆运算
return visualY2, visualPredict2
def apply(model, appX, scaler):
#查看应用结果
appPredict = model.predict(appX)
appPredict2 = appPredict / 10 * scaler.data_range_[1] + scaler.data_min_[1] #放大和scale的逆运算
appPredict2 = appPredict2.reshape(-1)
return appPredict2
import matplotlib.pyplot as plt
def visualize(visualY2, visualPredict2, appPredict2, visual_window, day):
plt.figure(figsize=(16,8)) #画布大小
plt.plot(list(range(len(visualY2))), visualY2, color='blue') #只显示最近“period”天的测试记录
plt.plot(list(range(len(visualPredict2))), visualPredict2, color='orange')
plt.scatter(
list(range(len(visualPredict2), len(visualPredict2) + len(appPredict2))), appPredict2, color='red')
plt.xlim((0, visual_window + day)) #x坐标范围
plt.legend(['True price', 'Model result', 'Prediction'], loc='upper left')
plt.ylabel('price')
plt.xlabel('time')
plt.show()
def main():
code = input("请输入6位代码:") #输入股票代码
code = code + '.SH'
day = input("请输入预测天数:") #输入预测多少天后的价格
day = int(day)
import time
date = time.strftime('%Y%m%d',time.localtime(time.time())) #获取当天日期
#参数设置/parameter setting
timesteps = seq_length = 20 #时间窗/window length
data_dim = 7 #输入数据维度/dimension of input data
output_dim = 1 #输出数据维度/dimension of output data
visual_window = 200
try:
stock_data = get_data(code, date)
except:
print('代码不正确或无法获得该股票的数据')
return
if len(stock_data) == 0:
print('代码不正确或无法获得该股票的数据')
return
trainX, trainY, testX, testY, appX, scaler, test_close, test_buy, test_sell, visualX, visualY = preprocess(
stock_data, day, seq_length, data_dim, output_dim, visual_window)
try:
# 载入模型
from keras.models import load_model
model = load_model(code + '(1)' + str(day) + '.h5')
except:
print('第一次预测%d天内'%(day) + code + '的估价,需要一点时间建模')
model = train(
code, day, trainX, trainY, seq_length, data_dim, output_dim)
visualY2, visualPredict2 = test(model, testX, testY, scaler, day, test_close, test_buy, test_sell, visualX, visualY)
appPredict2 = apply(model, appX, scaler)
visualize(visualY2, visualPredict2, appPredict2, visual_window, day)
main()