forked from alpacahq/Momentum-Trading-Example
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathalgo.py
396 lines (356 loc) · 13.7 KB
/
algo.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
import alpaca_trade_api as tradeapi
import requests
import time
from ta.trend import macd
import numpy as np
from datetime import datetime, timedelta
from pytz import timezone
# Replace these with your API connection info from the dashboard
base_url = 'Your API URL'
api_key_id = 'Your API Key'
api_secret = 'Your API Secret'
api = tradeapi.REST(
base_url=base_url,
key_id=api_key_id,
secret_key=api_secret
)
session = requests.session()
# We only consider stocks with per-share prices inside this range
min_share_price = 2.0
max_share_price = 13.0
# Minimum previous-day dollar volume for a stock we might consider
min_last_dv = 500000
# Stop limit to default to
default_stop = .95
# How much of our portfolio to allocate to any one position
risk = 0.001
def get_1000m_history_data(symbols):
print('Getting historical data...')
minute_history = {}
c = 0
for symbol in symbols:
minute_history[symbol] = api.polygon.historic_agg_v2(
timespan="minute", symbol=symbol, limit=1000, _from='2020-06-26', to='2020-06-26', multiplier=1
).df
c += 1
print('{}/{}'.format(c, len(symbols)))
print('Success.')
return minute_history
def get_tickers():
print('Getting current ticker data...')
tickers = api.polygon.all_tickers()
print('Success.')
assets = api.list_assets()
symbols = [asset.symbol for asset in assets if asset.tradable]
return [ticker for ticker in tickers if (
ticker.ticker in symbols and
ticker.lastTrade['p'] >= min_share_price and
ticker.lastTrade['p'] <= max_share_price and
ticker.prevDay['v'] * ticker.lastTrade['p'] > min_last_dv and
ticker.todaysChangePerc >= 3.5
)]
def find_stop(current_value, minute_history, now):
series = minute_history['low'][-100:] \
.dropna().resample('5min').min()
series = series[now.floor('1D'):]
diff = np.diff(series.values)
low_index = np.where((diff[:-1] <= 0) & (diff[1:] > 0))[0] + 1
if len(low_index) > 0:
return series[low_index[-1]] - 0.01
return current_value * default_stop
def run(tickers, market_open_dt, market_close_dt):
# Establish streaming connection
conn = tradeapi.StreamConn(base_url=base_url, key_id=api_key_id, secret_key=api_secret)
# Update initial state with information from tickers
volume_today = {}
prev_closes = {}
for ticker in tickers:
symbol = ticker.ticker
prev_closes[symbol] = ticker.prevDay['c']
volume_today[symbol] = ticker.day['v']
symbols = [ticker.ticker for ticker in tickers]
print('Tracking {} symbols.'.format(len(symbols)))
minute_history = get_1000m_history_data(symbols)
portfolio_value = float(api.get_account().portfolio_value)
open_orders = {}
positions = {}
# Cancel any existing open orders on watched symbols
existing_orders = api.list_orders(limit=500)
for order in existing_orders:
if order.symbol in symbols:
api.cancel_order(order.id)
stop_prices = {}
latest_cost_basis = {}
# Track any positions bought during previous executions
existing_positions = api.list_positions()
for position in existing_positions:
if position.symbol in symbols:
positions[position.symbol] = float(position.qty)
# Recalculate cost basis and stop price
latest_cost_basis[position.symbol] = float(position.cost_basis)
stop_prices[position.symbol] = (
float(position.cost_basis) * default_stop
)
# Keep track of what we're buying/selling
target_prices = {}
partial_fills = {}
# Use trade updates to keep track of our portfolio
@conn.on(r'trade_update')
async def handle_trade_update(conn, channel, data):
symbol = data.order['symbol']
last_order = open_orders.get(symbol)
if last_order is not None:
event = data.event
if event == 'partial_fill':
qty = int(data.order['filled_qty'])
if data.order['side'] == 'sell':
qty = qty * -1
positions[symbol] = (
positions.get(symbol, 0) - partial_fills.get(symbol, 0)
)
partial_fills[symbol] = qty
positions[symbol] += qty
open_orders[symbol] = data.order
elif event == 'fill':
qty = int(data.order['filled_qty'])
if data.order['side'] == 'sell':
qty = qty * -1
positions[symbol] = (
positions.get(symbol, 0) - partial_fills.get(symbol, 0)
)
partial_fills[symbol] = 0
positions[symbol] += qty
open_orders[symbol] = None
elif event == 'canceled' or event == 'rejected':
partial_fills[symbol] = 0
open_orders[symbol] = None
@conn.on(r'A$')
async def handle_second_bar(conn, channel, data):
symbol = data.symbol
# First, aggregate 1s bars for up-to-date MACD calculations
ts = data.start
ts -= timedelta(seconds=ts.second, microseconds=ts.microsecond)
try:
current = minute_history[data.symbol].loc[ts]
except KeyError:
current = None
new_data = []
if current is None:
new_data = [
data.open,
data.high,
data.low,
data.close,
data.volume
]
else:
new_data = [
current.open,
data.high if data.high > current.high else current.high,
data.low if data.low < current.low else current.low,
data.close,
current.volume + data.volume
]
minute_history[symbol].loc[ts] = new_data
# Next, check for existing orders for the stock
existing_order = open_orders.get(symbol)
if existing_order is not None:
# Make sure the order's not too old
submission_ts = existing_order.submitted_at.astimezone(
timezone('America/New_York')
)
order_lifetime = ts - submission_ts
if order_lifetime.seconds // 60 > 1:
# Cancel it so we can try again for a fill
api.cancel_order(existing_order.id)
return
# Now we check to see if it might be time to buy or sell
since_market_open = ts - market_open_dt
until_market_close = market_close_dt - ts
if (
since_market_open.seconds // 60 > 15 and
since_market_open.seconds // 60 < 60
):
# Check for buy signals
# See if we've already bought in first
position = positions.get(symbol, 0)
if position > 0:
return
# See how high the price went during the first 15 minutes
lbound = market_open_dt
ubound = lbound + timedelta(minutes=15)
high_15m = 0
try:
high_15m = minute_history[symbol][lbound:ubound]['high'].max()
except Exception as e:
# Because we're aggregating on the fly, sometimes the datetime
# index can get messy until it's healed by the minute bars
return
# Get the change since yesterday's market close
daily_pct_change = (
(data.close - prev_closes[symbol]) / prev_closes[symbol]
)
if (
daily_pct_change > .04 and
data.close > high_15m and
volume_today[symbol] > 30000
):
# check for a positive, increasing MACD
hist = macd(
minute_history[symbol]['close'].dropna(),
n_fast=12,
n_slow=26
)
if (
hist[-1] < 0 or
not (hist[-3] < hist[-2] < hist[-1])
):
return
hist = macd(
minute_history[symbol]['close'].dropna(),
n_fast=40,
n_slow=60
)
if hist[-1] < 0 or np.diff(hist)[-1] < 0:
return
# Stock has passed all checks; figure out how much to buy
stop_price = find_stop(
data.close, minute_history[symbol], ts
)
stop_prices[symbol] = stop_price
target_prices[symbol] = data.close + (
(data.close - stop_price) * 3
)
shares_to_buy = portfolio_value * risk // (
data.close - stop_price
)
if shares_to_buy == 0:
shares_to_buy = 1
shares_to_buy -= positions.get(symbol, 0)
if shares_to_buy <= 0:
return
print('Submitting buy for {} shares of {} at {}'.format(
shares_to_buy, symbol, data.close
))
try:
o = api.submit_order(
symbol=symbol, qty=str(shares_to_buy), side='buy',
type='limit', time_in_force='day',
limit_price=str(data.close)
)
open_orders[symbol] = o
latest_cost_basis[symbol] = data.close
except Exception as e:
print(e)
return
if(
since_market_open.seconds // 60 >= 24 and
until_market_close.seconds // 60 > 15
):
# Check for liquidation signals
# We can't liquidate if there's no position
position = positions.get(symbol, 0)
if position == 0:
return
# Sell for a loss if it's fallen below our stop price
# Sell for a loss if it's below our cost basis and MACD < 0
# Sell for a profit if it's above our target price
hist = macd(
minute_history[symbol]['close'].dropna(),
n_fast=13,
n_slow=21
)
if (
data.close <= stop_prices[symbol] or
(data.close >= target_prices[symbol] and hist[-1] <= 0) or
(data.close <= latest_cost_basis[symbol] and hist[-1] <= 0)
):
print('Submitting sell for {} shares of {} at {}'.format(
position, symbol, data.close
))
try:
o = api.submit_order(
symbol=symbol, qty=str(position), side='sell',
type='limit', time_in_force='day',
limit_price=str(data.close)
)
open_orders[symbol] = o
latest_cost_basis[symbol] = data.close
except Exception as e:
print(e)
return
elif (
until_market_close.seconds // 60 <= 15
):
# Liquidate remaining positions on watched symbols at market
try:
position = api.get_position(symbol)
except Exception as e:
# Exception here indicates that we have no position
return
print('Trading over, liquidating remaining position in {}'.format(
symbol)
)
api.submit_order(
symbol=symbol, qty=position.qty, side='sell',
type='market', time_in_force='day'
)
symbols.remove(symbol)
if len(symbols) <= 0:
conn.close()
conn.deregister([
'A.{}'.format(symbol),
'AM.{}'.format(symbol)
])
# Replace aggregated 1s bars with incoming 1m bars
@conn.on(r'AM$')
async def handle_minute_bar(conn, channel, data):
ts = data.start
ts -= timedelta(microseconds=ts.microsecond)
minute_history[data.symbol].loc[ts] = [
data.open,
data.high,
data.low,
data.close,
data.volume
]
volume_today[data.symbol] += data.volume
channels = ['trade_updates']
for symbol in symbols:
symbol_channels = ['A.{}'.format(symbol), 'AM.{}'.format(symbol)]
channels += symbol_channels
print('Watching {} symbols.'.format(len(symbols)))
run_ws(conn, channels)
# Handle failed websocket connections by reconnecting
def run_ws(conn, channels):
try:
conn.run(channels)
except Exception as e:
print(e)
conn.close()
run_ws(conn, channels)
if __name__ == "__main__":
# Get when the market opens or opened today
nyc = timezone('America/New_York')
today = datetime.today().astimezone(nyc)
today_str = datetime.today().astimezone(nyc).strftime('%Y-%m-%d')
calendar = api.get_calendar(start=today_str, end=today_str)[0]
market_open = today.replace(
hour=calendar.open.hour,
minute=calendar.open.minute,
second=0
)
market_open = market_open.astimezone(nyc)
market_close = today.replace(
hour=calendar.close.hour,
minute=calendar.close.minute,
second=0
)
market_close = market_close.astimezone(nyc)
# Wait until just before we might want to trade
current_dt = datetime.today().astimezone(nyc)
since_market_open = current_dt - market_open
while since_market_open.seconds // 60 <= 14:
time.sleep(1)
since_market_open = current_dt - market_open
run(get_tickers(), market_open, market_close)