-
Notifications
You must be signed in to change notification settings - Fork 57
/
相对OBV&MACD组合.py
275 lines (240 loc) · 8.92 KB
/
相对OBV&MACD组合.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
import talib
from datetime import timedelta, date
import numpy as np
def Force(obv,n):
min_obv = min(obv[-n:-1])
max_obv = max(obv[-n:-1])
r_volume = (obv[-1] - min_obv)/(max_obv - min_obv)
return r_volume
# 600307.XSHG 601988.XSHG
'''
================================================================================
总体回测前
================================================================================
'''
#总体回测前要做的事情
def initialize(context):
set_params(context) #1设置策参数
set_variables() #2设置中间变量
set_backtest() #3设置回测条件
#1
#设置策略参数
def set_params(context):
g.tc=15 # 调仓频率
g.N = 1
g.security = "510300.XSHG"
set_benchmark('510300.XSHG')
g.benchmark = '000300.XSHG'
g.pre_dea = 0
g.pre_ema_12 = 0
g.pre_ema_26 = 0
g.days = 0 #记录当前的天数
g.count = 0
g.one = 11
g.two = 20
g.three = 9
set_universe(g.security)
set_commission(PerTrade(buy_cost=0.0003, sell_cost=0.0013, min_cost=5))
set_slippage(PriceRelatedSlippage())
g.r_volume_data = 0
g.type = []
g.one_day = timedelta(days = 1)
g.start = context.current_dt.date() - 30*g.one_day
#2
#设置中间变量
def set_variables():
return
#3
#设置回测条件
def set_backtest():
set_option('use_real_price', True) #用真实价格交易
log.set_level('order', 'error')
'''
================================================================================
每天开盘前
================================================================================
'''
#每天开盘前要做的事情
def before_trading_start(context):
set_slip_fee(context)
security = ['000300.XSHG']
start = g.start
end = context.current_dt.date() - g.one_day
df = get_price(security[-1], start_date=start, end_date=end, frequency='daily', fields=['close','volume','factor'])
# 剔除停盘数据
df[df['volume']==0] = np.nan
df = df.dropna()
# 回复原始价格
df['close'] = df['close']/df['factor']
# 求obv值
obv = talib.OBV(df['close'].values,double(df['volume'].values))
# 求相对成交量
r_volume = Force(obv,10)
# 确定买入卖出信号
if r_volume>0.5:
g.r_volume_data = 1
g.type.append(g.r_volume_data)
elif r_volume < -0.5:
g.r_volume_data = -1
g.type.append(g.r_volume_data)
else:
g.r_volume_data = 0
g.type.append(g.r_volume_data)
#4
# 根据不同的时间段设置滑点与手续费
def set_slip_fee(context):
# 将滑点设置为0
set_slippage(FixedSlippage(0))
# 根据不同的时间段设置手续费
dt=context.current_dt
if dt>datetime.datetime(2013,1, 1):
set_commission(PerTrade(buy_cost=0.0003, sell_cost=0.0013, min_cost=5))
elif dt>datetime.datetime(2011,1, 1):
set_commission(PerTrade(buy_cost=0.001, sell_cost=0.002, min_cost=5))
elif dt>datetime.datetime(2009,1, 1):
set_commission(PerTrade(buy_cost=0.002, sell_cost=0.003, min_cost=5))
else:
set_commission(PerTrade(buy_cost=0.003, sell_cost=0.004, min_cost=5))
'''
================================================================================
每天交易时
================================================================================
'''
def handle_data(context, data):
dt = context.current_dt
if dt.hour == 9 and dt.minute == 30:
g.count += 1
if g.count % 7 == 0:
g.count = 1
g.days += 1
# 将总资金等分为g.N份,为每只股票配资
capital_unit = context.portfolio.portfolio_value/g.N
#执行卖出操作
if signal_stock_sell(context,data) == True:
order_target_value(g.security,0)
print ("时间:%d/%d/%d 操作:卖出 当前股价:%f"%(dt.year, dt.month, dt.day, data[g.security].price))
# 执行买入操作
if signal_stock_buy(context,data) == True:
print ("时间:%d/%d/%d 操作:买入 当前股价:%f"%(dt.year, dt.month, dt.day, data[g.security].price))
order_target_value(g.security,capital_unit)
record(dif=g.pre_ema_12 - g.pre_ema_26)
record(dea=g.pre_dea)
#5
#获得卖出信号
#输入:context, data
#输出:sell - list
def signal_stock_sell(context,data):
obv = False
macd = False
if g.r_volume_data != 1:
#成交量
obv = True
pass
(dif_pre, dif_now) = get_dif()
(dea_pre, dea_now) = get_dea(dif_now)
# 如果短均线从上往下穿越长均线,则为死叉信号,标记卖出
if dif_now < dea_now and dif_pre > dea_pre and context.portfolio.positions[g.security].sellable_amount > 0:
macd = True
pass
return macd and obv
#6
#获得买入信号
#输入:context, data
#输出:buy - list
def signal_stock_buy(context,data):
obv = False
macd = False
if g.r_volume_data == 1:
# 成交量
obv = True
pass
(dif_pre, dif_now) = get_dif()
(dea_pre, dea_now) = get_dea(dif_now)
if g.days >= g.two:
g.pre_ema_12 = get_EMA(g.benchmark, g.one, data)
g.pre_ema_26 = get_EMA(g.benchmark, g.two, data)
if g.pre_dea == 0:
pass
# 如果短均线从下往上穿越长均线,则为金叉信号,标记买入
if dif_now > dea_now and dif_pre < dea_pre and context.portfolio.positions[g.security].sellable_amount == 0 :
macd = True
pass
return macd and obv
#7
# 计算移动平均线数据
# 输入:股票代码-字符串,移动平均线天数-整数
# 输出:算术平均值-浮点数
def get_MA(security_code,days):
# 获得前days天的数据,详见API
a=attribute_history(security_code, days, '1d', ('close'))
# 定义一个局部变量sum,用于求和
sum=0
# 对前days天的收盘价进行求和
for i in range(1,days+1):
sum+=a['close'][-i]
# 求和之后除以天数就可以的得到算术平均值啦
return sum/days
#8
# 计算指数移动平均线数据
# 输入:股票代码-字符串,移动指数平均线天数-整数,data
# 输出:今天和昨天的移动指数平均数-浮点数
def get_EMA(security_code,days,data):
# 如果只有一天的话,前一天的收盘价就是移动平均
if days==1:
# 获得前两天的收盘价数据,一个作为上一期的移动平均值,后一个作为当期的移动平均值
t = attribute_history(security_code, 2, '1d', ('close'))
return t['close'][-1]
else:
# 如果全局变量g.EMAs不存在的话,创建一个字典类型的变量,用来记录已经计算出来的EMA值
if 'EMAs' not in dir(g):
g.EMAs={}
# 字典的关键字用股票编码和天数连接起来唯一确定,以免不同股票或者不同天数的指数移动平均弄在一起了
key="%s%d" %(security_code,days)
# 如果关键字存在,说明之前已经计算过EMA了,直接迭代即可
if key in g.EMAs:
#计算alpha值
alpha=(days-1.0)/(days+1.0)
# 获得前一天的EMA(这个是保存下来的了)
EMA_pre=g.EMAs[key]
# EMA迭代计算
EMA_now=EMA_pre*alpha+data[security_code].close*(1.0-alpha)
# 写入新的EMA值
g.EMAs[key]=EMA_now
# 给用户返回昨天和今天的两个EMA值
return EMA_now
# 如果关键字不存在,说明之前没有计算过这个EMA,因此要初始化
else:
# 获得days天的移动平均
ma=get_MA(security_code,days)
# 如果滑动平均存在(不返回NaN)的话,那么我们已经有足够数据可以对这个EMA初始化了
if not(isnan(ma)):
g.EMAs[key]=ma
# 因为刚刚初始化,所以前一期的EMA还不存在
return ma
else:
# 移动平均数据不足days天,只好返回NaN值
return float("nan")
def get_dif():
if g.pre_ema_12 == 0 or g.pre_ema_26 == 0:
return (0, 0)
else:
pre_dif = g.pre_ema_12 - g.pre_ema_26
close_price = attribute_history(g.benchmark, 1, '1d', 'close') #close_price[0]为当天的收盘价
g.pre_ema_12 = g.pre_ema_12 * (g.one - 1) / (g.one + 1) + close_price['close'][0] * 2 / (g.one + 1)
g.pre_ema_26 = g.pre_ema_26 * (g.two - 1) / (g.two + 1) + close_price['close'][0] * 2 / (g.two + 1)
return (pre_dif, g.pre_ema_12 - g.pre_ema_26)
def get_dea(dif):
if g.pre_dea == 0:
return (0, 0)
else:
pre_dea = g.pre_dea
g.pre_dea = g.pre_dea * (g.three-1) / (g.three+1) + dif * 2 / (g.three+1)
return (pre_dea, g.pre_dea)
'''
================================================================================
每天收盘后
================================================================================
'''
# 每日收盘后要做的事情(本策略中不需要)
def after_trading_end(context):
return