diff --git a/smartmoneyconcepts/smc.py b/smartmoneyconcepts/smc.py index 9d6e1fb..034ca28 100644 --- a/smartmoneyconcepts/smc.py +++ b/smartmoneyconcepts/smc.py @@ -1,9 +1,13 @@ +from enum import Enum from functools import wraps import pandas as pd import numpy as np from pandas import DataFrame, Series from datetime import datetime +# Docs: https://github.com/joshyattridge/smart-money-concepts/blob/master/README.md + + def inputvalidator(input_="ohlc"): def dfcheck(func): @wraps(func) @@ -11,7 +15,8 @@ def wrap(*args, **kwargs): args = list(args) i = 0 if isinstance(args[0], pd.DataFrame) else 1 - args[i] = args[i].rename(columns={c: c.lower() for c in args[i].columns}) + args[i] = args[i].rename( + columns={c: c.lower() for c in args[i].columns}) inputs = { "o": "open", @@ -27,7 +32,8 @@ def wrap(*args, **kwargs): for l in input_: if inputs[l] not in args[i].columns: raise LookupError( - 'Must have a dataframe column named "{0}"'.format(inputs[l]) + 'Must have a dataframe column named "{0}"'.format( + inputs[l]) ) return func(*args, **kwargs) @@ -71,11 +77,11 @@ def fvg(cls, ohlc: DataFrame, join_consecutive=False) -> Series: fvg = np.where( ( - (ohlc["high"].shift(1) < ohlc["low"].shift(-1)) + (ohlc["high"].shift(2) < ohlc["low"]) & (ohlc["close"] > ohlc["open"]) ) | ( - (ohlc["low"].shift(1) > ohlc["high"].shift(-1)) + (ohlc["low"].shift(2) > ohlc["high"]) & (ohlc["close"] < ohlc["open"]) ), np.where(ohlc["close"] > ohlc["open"], 1, -1), @@ -86,8 +92,8 @@ def fvg(cls, ohlc: DataFrame, join_consecutive=False) -> Series: ~np.isnan(fvg), np.where( ohlc["close"] > ohlc["open"], - ohlc["low"].shift(-1), - ohlc["low"].shift(1), + ohlc["low"], + ohlc["low"].shift(2), ), np.nan, ) @@ -96,8 +102,8 @@ def fvg(cls, ohlc: DataFrame, join_consecutive=False) -> Series: ~np.isnan(fvg), np.where( ohlc["close"] > ohlc["open"], - ohlc["high"].shift(1), - ohlc["high"].shift(-1), + ohlc["high"].shift(2), + ohlc["high"], ), np.nan, ) @@ -114,109 +120,254 @@ def fvg(cls, ohlc: DataFrame, join_consecutive=False) -> Series: for i in np.where(~np.isnan(fvg))[0]: mask = np.zeros(len(ohlc), dtype=np.bool_) if fvg[i] == 1: - mask = ohlc["low"][i + 2 :] <= top[i] + mask = ohlc["low"][i + 2:] <= top[i] elif fvg[i] == -1: - mask = ohlc["high"][i + 2 :] >= bottom[i] + mask = ohlc["high"][i + 2:] >= bottom[i] if np.any(mask): j = np.argmax(mask) + i + 2 mitigated_index[i] = j - mitigated_index = np.where(np.isnan(fvg), np.nan, mitigated_index) - - return pd.concat( - [ - pd.Series(fvg, name="FVG"), - pd.Series(top, name="Top"), - pd.Series(bottom, name="Bottom"), - pd.Series(mitigated_index, name="MitigatedIndex"), - ], + return pd.concat([ + pd.Series(fvg, name="FVG"), + pd.Series(top, name="Top"), + pd.Series(bottom, name="Bottom"), + pd.Series(mitigated_index, name="MitigatedIndex"), + ], axis=1, ) + class SwingMethodEvaluator(Enum): + COMBINED = "combined" + FRACTALS = "fractals" + MOMENTUM = "momentum" + WEIGHTED_ROLLING_WINDOW = "weighted_rolling_window" + DEFAULT = "default" + @classmethod - def swing_highs_lows(cls, ohlc: DataFrame, swing_length: int = 50) -> Series: + def swing_highs_lows(cls, ohlc: DataFrame, swing_evaluator: SwingMethodEvaluator = SwingMethodEvaluator.DEFAULT, + swing_length: int = 10, short_swing_length: int = 10, long_swing_length=50) -> Series: """ - Swing Highs and Lows - A swing high is when the current high is the highest high out of the swing_length amount of candles before and after. - A swing low is when the current low is the lowest low out of the swing_length amount of candles before and after. + Swing Highs and Lows without lookahead bias. + + A swing high is when the current high is the highest high out of + the last `swing_length` candles (including itself). + A swing low is when the current low is the lowest low out of + the last `swing_length` candles (including itself). parameters: - swing_length: int - the amount of candles to look back and forward to determine the swing high or low + swing_length: int - Number of candles to look back to determine swings. + short_swing_length: int - Number of candles for short-term swings. + long_swing_length: int - Number of candles for long-term swings. returns: HighLow = 1 if swing high, -1 if swing low Level = the level of the swing high or low """ - - swing_length *= 2 - # set the highs to 1 if the current high is the highest high in the last 5 candles and next 5 candles - swing_highs_lows = np.where( - ohlc["high"] - == ohlc["high"].shift(-(swing_length // 2)).rolling(swing_length).max(), - 1, - np.where( - ohlc["low"] - == ohlc["low"].shift(-(swing_length // 2)).rolling(swing_length).min(), + def fractals(): + """ + Identifies swing highs/lows based on a smaller subset of candles rather than a fixed-length rolling window. + + How it helps: Fractals use fewer candles (e.g., just the previous two and the next two), which reduces lag. + + Trade-off: Less robust for noisy markets. + """ + swing_highs = np.where( + (ohlc["high"] > ohlc["high"].shift(1)) & + (ohlc["high"] > ohlc["high"].shift(2)) & + (ohlc["high"] > ohlc["high"].rolling(window=swing_length, center=False).max().shift(1)), + 1, + np.nan, + ) + swing_lows = np.where( + (ohlc["low"] < ohlc["low"].shift(1)) & + (ohlc["low"] < ohlc["low"].shift(2)) & + (ohlc["low"] < ohlc["low"].rolling(window=swing_length, center=False).min().shift(1)), -1, np.nan, - ), - ) - - while True: - positions = np.where(~np.isnan(swing_highs_lows))[0] - - if len(positions) < 2: - break - - current = swing_highs_lows[positions[:-1]] - next = swing_highs_lows[positions[1:]] - - highs = ohlc["high"].iloc[positions[:-1]].values - lows = ohlc["low"].iloc[positions[:-1]].values + ) + # Combine swing highs and lows + swing_highs_lows = np.nan_to_num( + swing_highs) + np.nan_to_num(swing_lows) + + # Determine swing levels + level = np.where( + swing_highs_lows == 1, + ohlc["high"], # Level for swing highs + # Level for swing lows + np.where(swing_highs_lows == -1, ohlc["low"], np.nan), + ) + return swing_highs_lows, level + + def momentum(): + """ + Use momentum (e.g., rate of change or RSI) to confirm swing points earlier by identifying + trends or overbought/oversold conditions. + + How it helps: This can confirm swing points without waiting for the full swing_length. + """ + + ohlc["momentum"] = ohlc["close"].diff(swing_length) + swing_highs = np.where( + (ohlc["high"] == ohlc["high"].rolling(window=swing_length).max()) & + (ohlc["momentum"] > 0), + 1, + np.nan, + ) + swing_lows = np.where( + (ohlc["low"] == ohlc["low"].rolling(window=swing_length).min()) & + (ohlc["momentum"] < 0), + -1, + np.nan, + ) + # Combine swing highs and lows + swing_highs_lows = np.nan_to_num( + swing_highs) + np.nan_to_num(swing_lows) + + # Determine swing levels + level = np.where( + swing_highs_lows == 1, + ohlc["high"], # Level for swing highs + # Level for swing lows + np.where(swing_highs_lows == -1, ohlc["low"], np.nan), + ) + return swing_highs_lows, level - next_highs = ohlc["high"].iloc[positions[1:]].values - next_lows = ohlc["low"].iloc[positions[1:]].values + def weighted_rolling_window(): + """ + Instead of treating all candles equally in the rolling window, give more weight to recent candles. - index_to_remove = np.zeros(len(positions), dtype=bool) + How it helps: This makes the indicator react faster to recent price movements. - consecutive_highs = (current == 1) & (next == 1) - index_to_remove[:-1] |= consecutive_highs & (highs < next_highs) - index_to_remove[1:] |= consecutive_highs & (highs >= next_highs) + Trade-off: Swings may not strictly align with traditional high/low definitions. + """ - consecutive_lows = (current == -1) & (next == -1) - index_to_remove[:-1] |= consecutive_lows & (lows > next_lows) - index_to_remove[1:] |= consecutive_lows & (lows <= next_lows) + ohlc["high_ema"] = ohlc["high"].ewm(span=swing_length).mean() + ohlc["low_ema"] = ohlc["low"].ewm(span=swing_length).mean() - if not index_to_remove.any(): - break + swing_highs = np.where(ohlc["high"] >= ohlc["high_ema"], 1, np.nan) + swing_lows = np.where(ohlc["low"] <= ohlc["low_ema"], -1, np.nan) + # Combine swing highs and lows + swing_highs_lows = np.nan_to_num( + swing_highs) + np.nan_to_num(swing_lows) - swing_highs_lows[positions[index_to_remove]] = np.nan + # Determine swing levels + level = np.where( + swing_highs_lows == 1, + ohlc["high"], # Level for swing highs + # Level for swing lows + np.where(swing_highs_lows == -1, ohlc["low"], np.nan), + ) + return swing_highs_lows, level + + def combined(): + """ + Combine Short and Long Swing Lengths for Swing Highs and Lows. + + Parameters: + ohlc: DataFrame - Contains columns 'high' and 'low'. + short_swing_length: int - Number of candles for short-term swings. + long_swing_length: int - Number of candles for long-term swings. + """ + + # Short-term swing highs and lows + short_highs = ohlc["high"].rolling( + window=short_swing_length, min_periods=1).max() + short_lows = ohlc["low"].rolling( + window=short_swing_length, min_periods=1).min() + + short_swing_highs = np.where( + ohlc["high"] == short_highs, 1, np.nan) + short_swing_lows = np.where(ohlc["low"] == short_lows, -1, np.nan) + + # Long-term swing highs and lows + long_highs = ohlc["high"].rolling( + window=long_swing_length, min_periods=1).max() + long_lows = ohlc["low"].rolling( + window=long_swing_length, min_periods=1).min() + + long_swing_highs = np.where(ohlc["high"] == long_highs, 2, np.nan) + long_swing_lows = np.where(ohlc["low"] == long_lows, -2, np.nan) + + # Combine short-term and long-term swings + swing_highs_lows = ( + np.nan_to_num(short_swing_highs) + + np.nan_to_num(short_swing_lows) + + np.nan_to_num(long_swing_highs) + + np.nan_to_num(long_swing_lows) + ) - positions = np.where(~np.isnan(swing_highs_lows))[0] + # Determine swing levels + level = np.where( + swing_highs_lows == 1, ohlc["high"], # Short-term high + np.where( + swing_highs_lows == -1, ohlc["low"], # Short-term low + np.where( + swing_highs_lows == 2, ohlc["high"], # Long-term high + np.where(swing_highs_lows == -2, + ohlc["low"], np.nan) # Long-term low + ) + ) + ) + return swing_highs_lows, level + + def default(): + """ + Swing Highs and Lows without lookahead bias. + + A swing high is when the current high is the highest high out of + the last `swing_length` candles (including itself). + A swing low is when the current low is the lowest low out of + the last `swing_length` candles (including itself). + """ + + # Calculate swing highs + swing_highs = np.where( + ohlc["high"] == ohlc["high"].rolling( + window=swing_length, min_periods=1).max(), + 1, + np.nan, + ) - if len(positions) > 0: - if swing_highs_lows[positions[0]] == 1: - swing_highs_lows[0] = -1 - if swing_highs_lows[positions[0]] == -1: - swing_highs_lows[0] = 1 - if swing_highs_lows[positions[-1]] == -1: - swing_highs_lows[-1] = 1 - if swing_highs_lows[positions[-1]] == 1: - swing_highs_lows[-1] = -1 + # Calculate swing lows + swing_lows = np.where( + ohlc["low"] == ohlc["low"].rolling( + window=swing_length, min_periods=1).min(), + -1, + np.nan, + ) - level = np.where( - ~np.isnan(swing_highs_lows), - np.where(swing_highs_lows == 1, ohlc["high"], ohlc["low"]), - np.nan, - ) + # Combine swing highs and lows + swing_highs_lows = np.nan_to_num( + swing_highs) + np.nan_to_num(swing_lows) - return pd.concat( - [ - pd.Series(swing_highs_lows, name="HighLow"), - pd.Series(level, name="Level"), - ], - axis=1, - ) + # Determine swing levels + level = np.where( + swing_highs_lows == 1, + ohlc["high"], # Level for swing highs + # Level for swing lows + np.where(swing_highs_lows == -1, ohlc["low"], np.nan), + ) + return swing_highs_lows, level + + match swing_evaluator.value: + case "momentum": + swing_highs_lows, level = momentum() + case "weighted_rolling_window": + swing_highs_lows, level = weighted_rolling_window() + case "fractals": + swing_highs_lows, level = fractals() + case "combined": + swing_highs_lows, level = combined() + case "default": + swing_highs_lows, level = default() + case _: + swing_highs_lows, level = default() + + # Return results as a DataFrame + return pd.DataFrame({ + "HighLow": swing_highs_lows, + "Level": level, + }) @classmethod def bos_choch( @@ -337,10 +488,10 @@ def bos_choch( mask = np.zeros(len(ohlc), dtype=np.bool_) # if the bos is 1 then check if the candles high has gone above the level if bos[i] == 1 or choch[i] == 1: - mask = ohlc["close" if close_break else "high"][i + 2 :] > level[i] + mask = ohlc["close" if close_break else "high"][i + 2:] > level[i] # if the bos is -1 then check if the candles low has gone below the level elif bos[i] == -1 or choch[i] == -1: - mask = ohlc["close" if close_break else "low"][i + 2 :] < level[i] + mask = ohlc["close" if close_break else "low"][i + 2:] < level[i] if np.any(mask): j = np.argmax(mask) + i + 2 broken[i] = j @@ -482,7 +633,8 @@ def ob( + _volume[close_index - 2] ) lowVolume[obIndex] = _volume[close_index - 2] - highVolume[obIndex] = _volume[close_index] + _volume[close_index - 1] + highVolume[obIndex] = _volume[close_index] + \ + _volume[close_index - 1] percentage[obIndex] = ( np.min([highVolume[obIndex], lowVolume[obIndex]], axis=0) / np.max([highVolume[obIndex], lowVolume[obIndex]], axis=0) @@ -556,7 +708,8 @@ def ob( + _volume[close_index - 1] + _volume[close_index - 2] ) - lowVolume[obIndex] = _volume[close_index] + _volume[close_index - 1] + lowVolume[obIndex] = _volume[close_index] + \ + _volume[close_index - 1] highVolume[obIndex] = _volume[close_index - 2] percentage[obIndex] = ( np.min([highVolume[obIndex], lowVolume[obIndex]], axis=0) @@ -576,7 +729,8 @@ def ob( top_series = pd.Series(top, name="Top") bottom_series = pd.Series(bottom, name="Bottom") obVolume_series = pd.Series(obVolume, name="OBVolume") - mitigated_index_series = pd.Series(mitigated_index, name="MitigatedIndex") + mitigated_index_series = pd.Series( + mitigated_index, name="MitigatedIndex") percentage_series = pd.Series(percentage, name="Percentage") return pd.concat( @@ -637,13 +791,15 @@ def liquidity( and range_low <= swing_highs_lows["Level"][c] <= range_high ): end = c - temp_liquidity_level.append(swing_highs_lows["Level"][c]) + temp_liquidity_level.append( + swing_highs_lows["Level"][c]) swing_highs_lows.loc[c, "HighLow"] = 0 if ohlc["high"].iloc[c] >= range_high: swept = c break if len(temp_liquidity_level) > 1: - average_high = sum(temp_liquidity_level) / len(temp_liquidity_level) + average_high = sum(temp_liquidity_level) / \ + len(temp_liquidity_level) liquidity[i] = 1 liquidity_level[i] = average_high liquidity_end[i] = end @@ -665,22 +821,26 @@ def liquidity( and range_low <= swing_highs_lows["Level"][c] <= range_high ): end = c - temp_liquidity_level.append(swing_highs_lows["Level"][c]) + temp_liquidity_level.append( + swing_highs_lows["Level"][c]) swing_highs_lows.loc[c, "HighLow"] = 0 if ohlc["low"].iloc[c] <= range_low: swept = c break if len(temp_liquidity_level) > 1: - average_low = sum(temp_liquidity_level) / len(temp_liquidity_level) + average_low = sum(temp_liquidity_level) / \ + len(temp_liquidity_level) liquidity[i] = -1 liquidity_level[i] = average_low liquidity_end[i] = end liquidity_swept[i] = swept liquidity = np.where(liquidity != 0, liquidity, np.nan) - liquidity_level = np.where(~np.isnan(liquidity), liquidity_level, np.nan) + liquidity_level = np.where( + ~np.isnan(liquidity), liquidity_level, np.nan) liquidity_end = np.where(~np.isnan(liquidity), liquidity_end, np.nan) - liquidity_swept = np.where(~np.isnan(liquidity), liquidity_swept, np.nan) + liquidity_swept = np.where( + ~np.isnan(liquidity), liquidity_swept, np.nan) liquidity = pd.Series(liquidity, name="Liquidity") level = pd.Series(liquidity_level, name="Level") @@ -738,7 +898,7 @@ def previous_high_low(cls, ohlc: DataFrame, time_frame: str = "1D") -> Series: currently_broken_low = False last_broken_time = resampled_previous_index - previous_high[i] = resampled_ohlc["high"].iloc[resampled_previous_index] + previous_high[i] = resampled_ohlc["high"].iloc[resampled_previous_index] previous_low[i] = resampled_ohlc["low"].iloc[resampled_previous_index] currently_broken_high = ohlc["high"].iloc[i] > previous_high[i] or currently_broken_high currently_broken_low = ohlc["low"].iloc[i] < previous_low[i] or currently_broken_low @@ -848,7 +1008,8 @@ def sessions( and (start_time <= current_time or current_time <= end_time) ): active[i] = 1 - high[i] = max(ohlc["high"].iloc[i], high[i - 1] if i > 0 else 0) + high[i] = max(ohlc["high"].iloc[i], + high[i - 1] if i > 0 else 0) low[i] = min( ohlc["low"].iloc[i], low[i - 1] if i > 0 and low[i - 1] != 0 else float("inf"), @@ -897,7 +1058,8 @@ def retracements(cls, ohlc: DataFrame, swing_highs_lows: DataFrame) -> Series: if direction[i - 1] == 1: current_retracement[i] = round( - 100 - (((ohlc["low"].iloc[i] - bottom) / (top - bottom)) * 100), 1 + 100 - (((ohlc["low"].iloc[i] - bottom) / + (top - bottom)) * 100), 1 ) deepest_retracement[i] = max( ( @@ -909,7 +1071,8 @@ def retracements(cls, ohlc: DataFrame, swing_highs_lows: DataFrame) -> Series: ) if direction[i] == -1: current_retracement[i] = round( - 100 - ((ohlc["high"].iloc[i] - top) / (bottom - top)) * 100, 1 + 100 - ((ohlc["high"].iloc[i] - top) / + (bottom - top)) * 100, 1 ) deepest_retracement[i] = max( ( @@ -942,7 +1105,9 @@ def retracements(cls, ohlc: DataFrame, swing_highs_lows: DataFrame) -> Series: break direction = pd.Series(direction, name="Direction") - current_retracement = pd.Series(current_retracement, name="CurrentRetracement%") - deepest_retracement = pd.Series(deepest_retracement, name="DeepestRetracement%") + current_retracement = pd.Series( + current_retracement, name="CurrentRetracement%") + deepest_retracement = pd.Series( + deepest_retracement, name="DeepestRetracement%") return pd.concat([direction, current_retracement, deepest_retracement], axis=1) diff --git a/tests/SMC.test.py b/tests/SMC.test.py index 7d5c8eb..1e0d828 100644 --- a/tests/SMC.test.py +++ b/tests/SMC.test.py @@ -9,6 +9,7 @@ import imageio from io import BytesIO from PIL import Image +from tqdm import tqdm sys.path.append(os.path.abspath("../")) from smartmoneyconcepts.smc import smc @@ -441,7 +442,7 @@ def import_data(symbol, start_str, timeframe): df = df.iloc[-500:] def fig_to_buffer(fig): - fig_bytes = fig.to_image(format="png") + fig_bytes = fig.to_image(format="png", scale=5) fig_buffer = BytesIO(fig_bytes) fig_image = Image.open(fig_buffer) return np.array(fig_image) @@ -450,7 +451,7 @@ def fig_to_buffer(fig): gif = [] window = 100 -for pos in range(window, len(df)): +for pos in tqdm(range(window, len(df))): window_df = df.iloc[pos - window : pos] fig = go.Figure(