Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added logging statements #13

Closed
wants to merge 2 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 56 additions & 6 deletions src/pycatcher/catch.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import math
import logging
import numpy as np
import pandas as pd
import statsmodels.api as sm
Expand All @@ -8,6 +9,9 @@
from sklearn.base import BaseEstimator
from statsmodels.tsa.stattools import acf

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')


def find_outliers_iqr(df: pd.DataFrame) -> pd.DataFrame:
"""
Expand All @@ -21,6 +25,8 @@ def find_outliers_iqr(df: pd.DataFrame) -> pd.DataFrame:
pd.DataFrame: A DataFrame containing the rows that are considered outliers.
"""

logging.info("Detecting outliers using the IQR method.")

# Calculate Q1 (25th percentile) and Q3 (75th percentile) for the second column
q1 = df.iloc[:, 1].quantile(0.25)
q3 = df.iloc[:, 1].quantile(0.75)
Expand All @@ -31,6 +37,8 @@ def find_outliers_iqr(df: pd.DataFrame) -> pd.DataFrame:
# Identify outliers
outliers = df[((df.iloc[:, 1] < (q1 - 1.5 * iqr)) | (df.iloc[:, 1] > (q3 + 1.5 * iqr)))]

logging.info(f"Outliers detected: {len(outliers)} rows.")

return outliers


Expand All @@ -46,6 +54,8 @@ def anomaly_mad(model_type: BaseEstimator) -> pd.DataFrame:
pd.DataFrame: A DataFrame containing the rows identified as outliers.
"""

logging.info("Detecting outliers using the MAD method.")

# Reshape residuals from the fitted model
residuals = model_type.resid.values.reshape(-1, 1)

Expand All @@ -55,7 +65,9 @@ def anomaly_mad(model_type: BaseEstimator) -> pd.DataFrame:
# Identify outliers using MAD labels (1 indicates an outlier)
is_outlier = mad.labels_ == 1

return(is_outlier)
logging.info(f"Outliers detected by MAD")

return is_outlier


def get_residuals(model_type: BaseEstimator) -> np.ndarray:
Expand All @@ -70,10 +82,14 @@ def get_residuals(model_type: BaseEstimator) -> np.ndarray:
np.ndarray: An array of residuals with NaN values removed.
"""

logging.info("Extracting residuals and removing NaN values.")

# Extract residuals from the model and remove NaN values
residuals = model_type.resid.values
residuals_cleaned = residuals[~np.isnan(residuals)]

logging.info(f"Number of residuals after NaN removal: {len(residuals_cleaned)}")

return residuals_cleaned


Expand All @@ -88,12 +104,16 @@ def sum_of_squares(array: np.ndarray) -> float:
float: The sum of squares of the array elements.
"""

logging.info("Calculating the sum of squares.")

# Flatten the array to a 1D array
flattened_array = array.flatten()

# Calculate the sum of squares of the flattened array
sum_of_squares_value = np.sum(flattened_array ** 2)

logging.info(f"Sum of squares calculated: {sum_of_squares_value:.2f}")

return sum_of_squares_value


Expand All @@ -109,6 +129,8 @@ def get_ssacf(residuals: np.ndarray, df_pandas: pd.DataFrame) -> float:
float: The sum of squares of the ACF of the residuals.
"""

logging.info("Calculating the sum of squares of the ACF of residuals.")

# Calculate the number of lags based on the square root of the data length
range_var = len(df_pandas.index)
nlags = math.isqrt(range_var) + 45
Expand All @@ -119,6 +141,8 @@ def get_ssacf(residuals: np.ndarray, df_pandas: pd.DataFrame) -> float:
# Calculate the sum of squares of the ACF values
ssacf = sum_of_squares(acf_array)

logging.info(f"Sum of squares of ACF: {ssacf:.2f}")

return ssacf


Expand All @@ -135,6 +159,8 @@ def detect_outliers_today(df: pd.DataFrame) -> Union[pd.DataFrame, str]:
str: A message indicating no outliers were found today.
"""

logging.info("Detecting today's outliers.")

# Get the DataFrame of outliers from detect_outliers and select the latest row
df_outliers = detect_outliers(df)
df_last_outlier = df_outliers.tail(1)
Expand All @@ -147,8 +173,10 @@ def detect_outliers_today(df: pd.DataFrame) -> Union[pd.DataFrame, str]:

# Check if the latest outlier occurred today
if last_outlier_date == current_date:
logging.info("Outliers detected today.")
return df_last_outlier
else:
logging.info("No outliers detected today.")
return "No Outliers Today!"


Expand All @@ -163,8 +191,14 @@ def detect_outliers_latest(df: pd.DataFrame) -> pd.DataFrame:
Returns:
pd.DataFrame: A DataFrame containing the latest detected outlier.
"""

logging.info("Detecting the latest outliers.")

df_outliers = detect_outliers(df)
df_latest_outlier = df_outliers.tail(1)

logging.info("Detected the latest outlier!")

return df_latest_outlier


Expand All @@ -181,19 +215,25 @@ def detect_outliers(df: pd.DataFrame) -> Union[pd.DataFrame, str]:
Returns:
str or pd.DataFrame: A message with None found or a DataFrame with detected outliers.
"""

logging.info("Starting outlier detection.")

# Creating a shallow copy of Pandas dataframe to use for seasonal trend
df_pandas = df.copy(deep=False)

# Calculate the length of the time period in years
length_year: float = len(df_pandas.index) / 365.25
print(f"Time-series data in years: {length_year:.2f}")

logging.info(f"Time-series data in years: {length_year:.2f}")

# If the dataset contains at least 2 years of data, use Seasonal Trend Decomposition
if length_year >= 2.0:
logging.info("Using seasonal trend decomposition for outlier detection.")
return _decompose_and_detect(df_pandas)

# If less than 2 years of data, use Inter Quartile Range (IQR) method
else:
logging.info("Using IQR method for outlier detection.")
return _detect_outliers_iqr(df)


Expand All @@ -209,6 +249,8 @@ def _decompose_and_detect(df_pandas: pd.DataFrame) -> Union[pd.DataFrame, str]:
str or pd.DataFrame: A message or a DataFrame with detected outliers.
"""

logging.info("Decomposing time-series for additive and multiplicative models.")

# Ensure the first column is in datetime format and set it as index
df_pandas.iloc[:, 0] = pd.to_datetime(df_pandas.iloc[:, 0])
df_pandas = df_pandas.set_index(df_pandas.columns[0]).asfreq('D').dropna()
Expand All @@ -227,18 +269,21 @@ def _decompose_and_detect(df_pandas: pd.DataFrame) -> Union[pd.DataFrame, str]:

# Return the outliers detected by the model with the smaller ACF value
if ssacf_add < ssacf_mul:
print("Additive Model")
logging.info("Using the additive model for outlier detection.")
is_outlier = anomaly_mad(decomposition_add)
else:
print("Multiplicative Model")
logging.info("Using the multiplicative model for outlier detection.")
is_outlier = anomaly_mad(decomposition_mul)

# Use the aligned boolean Series as the indexer
df_outliers = df_pandas[is_outlier]

if df_outliers.empty:
logging.info("No outliers found.")
return "No outliers found."

logging.info(f"Outliers detected: {len(df_outliers)} rows.")

return df_outliers


Expand All @@ -252,10 +297,15 @@ def _detect_outliers_iqr(df_pandas: pd.DataFrame) -> pd.DataFrame:
Returns:
pd.DataFrame: A DataFrame containing the detected outliers.
"""

logging.info("Detecting outliers using the IQR method.")

# Ensure the second column is numeric
df_pandas.iloc[:,1] = pd.to_numeric(df_pandas.iloc[:,1])
df_pandas.iloc[:, 1] = pd.to_numeric(df_pandas.iloc[:, 1])

# Detect outliers using the IQR method
df_outliers: pd.DataFrame = find_outliers_iqr(df_pandas)
print(f"Record Count: {len(df_outliers)}")

logging.info(f"Outliers detected using IQR: {len(df_outliers)} rows.")

return df_outliers
Loading