-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathresults.py
264 lines (249 loc) · 10.9 KB
/
results.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
import pandas as pd
import logging
import sqlite3
import numpy as np
import yfinance as yf
import performance as perf
import sql as sql
import signals as signals
import indicators as indicators
import indicators.vwap as vwap
import indicators.avwap as avwap
import indicators.trendline as trendline
#Plots
import matplotlib
matplotlib.use('Qt5Agg') # or 'TkAgg', 'GTK3Agg', etc.
import matplotlib.pyplot as plt
# getLogger() with no name returns the root logger, so the level set below is
# inherited by every logger that does not set a level of its own.
logger = logging.getLogger()
# At CRITICAL the logger.warning(...) progress messages in this module are
# suppressed and only logger.critical(...) calls are emitted.
logger.setLevel(logging.CRITICAL) #Debug or Critical are used
def processData(df: pd.DataFrame):
    """Compute buy-and-hold and strategy performance for one result frame.

    Args:
        df: Frame handed to ``perf.basePerf``. A copy is made first, so
            ``perf.basePerf`` never receives the caller's own object.

    Returns:
        A ``(holdres, stratres)`` tuple: the result of ``perf.basePerf`` for
        buy-and-hold followed by its result with ``strategy=True``.
    """
    logger.warning('Attempting to processData() df = df.copy()')
    df = df.copy()
    # 252 is forwarded unchanged as basePerf's second argument
    # (presumably trading periods per year -- TODO confirm against perf).
    logger.warning('Attempting to calculate basePerf() for buy and hold')
    holdres = perf.basePerf(df, 252)
    logger.warning('Attempting to calculate basePerf() for strategy')
    stratres = perf.basePerf(df, 252, strategy=True)
    return holdres, stratres
def _mean_or_nan(values):
    """Return the arithmetic mean of *values*, or NaN when *values* is empty."""
    if not values:
        return float("nan")
    return sum(values) / len(values)


def average(tableName: str):
    """Average "Profit Factor" and "PnL" separately for alternating rows.

    Rows of the table are split by position: even-positioned rows (0, 2, ...)
    feed the buy-and-hold averages and odd-positioned rows (1, 3, ...) feed
    the strategy averages. The split is purely positional for both columns,
    so an odd number of rows cannot shift which group a row lands in.

    Args:
        tableName: Name of the table to read from ``stock_data.db`` through
            ``sql.getDataFromTable``.

    Returns:
        ``(bhres, stratres)`` where each element is a
        ``(mean profit factor, mean PnL)`` tuple. A group with no rows
        yields NaN for its means instead of raising ZeroDivisionError.
    """
    results = sql.getDataFromTable(tableName, db="stock_data.db")
    profit_factors = list(results.loc[:, "Profit Factor"])
    pnls = list(results.loc[:, "PnL"])

    bhres = (_mean_or_nan(profit_factors[0::2]), _mean_or_nan(pnls[0::2]))
    stratres = (_mean_or_nan(profit_factors[1::2]), _mean_or_nan(pnls[1::2]))
    return bhres, stratres
def _cache_raw_data(stocks, dateFrom, dateTO, connRaw):
    """Download and store raw prices for each stock that sql.exists reports as missing."""
    for stock in stocks:
        cached = sql.exists(stock, dateFrom, dateTO, connRaw)
        logger.critical('TRUE/FALSE EVALS TO: %s', cached)
        if cached:
            continue
        data = yf.download(stock, dateFrom, dateTO)
        data.rename(columns={'Adj Close': 'AdjClose'}, inplace=True)
        data.reset_index(inplace=True)  # Resetting index if date is index
        # NOTE(review): on pandas >= 2.0 `.loc[:, col] = ...` tries to assign
        # in place and may keep the existing dtype -- confirm the stored
        # 'Date' format is what sql.writeRaw / sql.getData expect.
        data.loc[:, 'Date'] = pd.to_datetime(data.loc[:, 'Date']).dt.strftime('%Y-%m-%d')
        try:
            sql.writeRaw(data, stock)
        except Exception:
            # Best effort: a failed write must not stop the remaining stocks.
            logger.exception('Failed to write raw data for %s; moving on', stock)


def _compound_equity(returns, initial_balance, position_fraction):
    """Return the running balance after applying each return to a fixed fraction of it."""
    balance = initial_balance
    curve = []
    for period_return in returns:
        balance += balance * position_fraction * period_return
        curve.append(balance)
    return curve


def _plot_stock(stock, data, data2):
    """Draw the 2x2 figure (slopes, close, drawdown, equity curve) for one stock."""
    _, axs = plt.subplots(2, 2, figsize=(24, 16))
    axs = axs.flatten()  # Flatten the array for easier indexing
    data['support_slope'].plot(ax=axs[0], figsize=(24, 16), title=f"{stock} SLOPES", label='Support Slope', color='green', legend=True)
    data['resist_slope'].plot(ax=axs[0], label='Resistance Slope', color='red', legend=True)
    data2['position'].plot(ax=axs[0], label='position', color='white', legend=True, secondary_y=True)
    data2['Close'].plot(ax=axs[1], figsize=(24, 16), title=f"{stock} Close", label='Close', color='blue', legend=True)
    data2['position'].plot(ax=axs[1], label='position', color='white', legend=True, secondary_y=True)
    data2['maxDailyDrawdown'].plot(ax=axs[2], figsize=(24, 16), title=f"DRAWDOWN B&H vs Strategy {stock}", fontsize=12, legend=True)
    data2['drawmaxstrat'].plot(ax=axs[2], legend=True)
    data2['equitycurve'].plot(ax=axs[3], figsize=(24, 16), title=f"Strategy Equity curve vs B&H curve {stock}", fontsize=12, legend=True)
    (100 + data2['cumreturns']).plot(ax=axs[3], legend=True)


def processDataMultiple(stocks: list, dateFrom: str, dateTO: str, lookback: int, view: int, strat = 1, plotMe = True):
    """Run the strategy over several stocks and store the combined results.

    Raw prices are fetched into ``raw_data.db`` when ``sql.exists`` reports
    them missing, then each stock is processed and its result rows are
    appended to one frame that is written to ``stock_data.db``.

    Args:
        stocks: Ticker symbols to process.
        dateFrom: Start date; its '-' characters are stripped for the table name.
        dateTO: End date, handled like ``dateFrom``.
        lookback: Indicator variable forwarded to ``trendline.getTrend``.
        view: Strategy variable forwarded to ``trendline.getTrend``.
        strat: Strategy selector forwarded to ``signals.signal``.
        plotMe: When true, build one 2x2 figure per stock and show them.

    Returns:
        The concatenated results frame; each stock contributes a buy-and-hold
        row followed by a strategy row.
    """
    initial_balance = 100  # Example initial balance
    position_size_percent = 0.9  # fraction of the balance committed per trade
    # Connect to SQLite database (or create it if it doesn't exist)
    conn = sqlite3.connect('stock_data.db')
    connRaw = sqlite3.connect('raw_data.db')
    try:
        _cache_raw_data(stocks, dateFrom, dateTO, connRaw)

        logger.warning('Attempting to processDataMultiple()')
        logger.warning('Create ans as empty data frame')
        ans = pd.DataFrame()
        logger.warning('Beginning loop through stocks')
        for stock in stocks:
            logger.warning('Attempting to work on %s', stock)
            logger.warning('Attempting to download data')
            data = sql.getData(stock, dateFrom, dateTO, connRaw)
            logger.critical(data.columns)
            data['VWAP'] = vwap.VWAP(data)
            #window_days = 2 is vwap anchored to yesterday
            data['AVWAP'] = avwap.AVWAP(data, 2)

            logger.warning('Creating copy of data called data2 for real pricing performance calculations')
            #Save copy of data for real pricing for performance calcs
            data2 = data.copy()
            data, data2 = trendline.getTrend(data, data2, lookback, view)

            logger.warning('Attempting to set position based on data')
            #STRATEGY
            #CHANGE THIS TO CHANGE STRATEGY
            positions = signals.signal(data, strat)[0]
            data2.loc[:, 'position'] = positions
            data.loc[:, 'position'] = positions

            #Calculate buy and hold returns for raw data and log data
            logger.warning('Attempting to set Buy and hold returns')
            data['Returnsb&h'] = data.loc[:, 'Close'] - data.loc[:, 'Close'].shift(1)
            data2['Returnsb&h'] = data2.loc[:, 'Close'].pct_change()

            #Calculate strategy returns based on position for both log and real
            logger.warning('Attempting to set strategy returns based on position')
            data['Strategy'] = data.loc[:, 'Returnsb&h'] * data.loc[:, 'position'].shift(1)
            data2['Strategy'] = data2.loc[:, 'Returnsb&h'] * data2.loc[:, 'position'].shift(1)

            logger.warning('Attempting to drop NaNs')
            data2 = data2.dropna()
            data = data.dropna()

            #Calculate cumulative compounding returns for buying and holding along with our strategy
            # * 100 to make it a %
            logger.warning('Attempting to set cumreturns for strat and hold for both log and real')
            data2['cumreturns'] = ((1 + data2.loc[:, 'Returnsb&h']).cumprod() - 1) * 100
            data2['cumreturnsstrat'] = ((1 + data2.loc[:, 'Strategy']).cumprod() - 1) * 100
            data['cumreturns'] = ((1 + data.loc[:, 'Returnsb&h']).cumprod() - 1) * 100
            data['cumreturnsstrat'] = ((1 + data.loc[:, 'Strategy']).cumprod() - 1) * 100

            # The balance restarts at initial_balance for every stock. The
            # whole column is assigned at once so it is float from the start.
            data2['equitycurve'] = _compound_equity(
                data2['Strategy'].to_numpy(), initial_balance, position_size_percent
            )
            data2['peakcurve'] = data2['equitycurve'].cummax()

            #Equity Curve
            data['equitycurve'] = 100 * (1 + data.loc[:, 'cumreturnsstrat'] / 100)
            data['peakcurve'] = data['equitycurve'].cummax()
            logger.warning('Attempting to drop NaNs')
            data2 = data2.dropna()
            data = data.dropna()

            logger.warning('Calling calcData on data2')
            data2 = perf.calc(data2)
            logger.warning('Attempting to drop NaNs from data2')
            data2 = data2.dropna()
            logger.warning('Attempting to set z to processData(data2) (real pricing)')
            z = processData(data2)

            # .iloc[-1] reads the last row by position regardless of index type.
            res2 = pd.DataFrame({
                "Max Drawdown": [data2['maxDailyDrawdown'].iloc[-1], data2['drawmaxstrat'].iloc[-1]],
                "Max Profit": [data2['cummax'].iloc[-1], data2['cummaxstrat'].iloc[-1]],
            })
            # signals.signal is evaluated again here on the frame as it stands
            # after the NaN rows were dropped, exactly where the count is used.
            res3 = pd.DataFrame({
                "Num Positions": [1, signals.signal(data, strat)[1]],
                "PnL": [
                    1 + data2['cumreturns'].iloc[-1] / 100,
                    1 + data2['cumreturnsstrat'].iloc[-1] / 100,
                ],
            })

            #0 = hold 1 = strat
            logger.warning('Attempting to set a new dataFrame res to pd.DataFrame(z) where z is results')
            res1 = pd.DataFrame(z)
            res = pd.concat([res2, res1], axis=1)
            res = pd.concat([res, res3], axis=1)
            logger.warning('Attempting to concatanate ans along with res')
            ans = pd.concat([ans, res], axis=0)

            logger.warning('Attempting to plot slopes and pos')
            plt.style.use('dark_background')
            # Create a new figure for each stock with a 2x2 grid of subplots
            if plotMe:
                logger.warning('Attempting to plot things')
                _plot_stock(stock, data, data2)

        if plotMe:
            for fig_num in plt.get_fignums():
                plt.figure(fig_num)
            plt.show()

        logger.warning('Attempting to save to sql')
        stringPlaceHolder = "".join(stocks)
        #DO NOT CHANGE THIS
        #DO NOT CHANGE THIS FORMATTING
        #DO NOT CHANGE THIS
        d1 = dateFrom.replace('-', "")
        d2 = dateTO.replace('-', "")
        table_name = f"strat_{strat}_view_{view}_lookback_{lookback}_from_{d1}_to_{d2}_stocks_{stringPlaceHolder}"
        table_name = table_name.replace('.', "")
        ans.to_sql(table_name, conn, if_exists='replace', index=False)
    finally:
        # Release both database handles even when a stock fails mid-run.
        connRaw.close()
        conn.close()
    logger.warning('Attempting to return ans')
    return ans