Skip to content

Commit

Permalink
modified: .gitignore
Browse files Browse the repository at this point in the history
	modified:   bot.py
	modified:   cogs/admin.py
	modified:   cogs/afk.py
	modified:   cogs/dvp.py
	modified:   cogs/gpt.py
	modified:   cogs/message_logger.py
	modified:   cogs/preview.py
	modified:   cogs/rate.py
	modified:   cogs/spc.py
	modified:   cogs/stats.py
	modified:   cogs/uptime.py
	modified:   cogs/user.py
	modified:   ecosystem.config.js
	modified:   logger.py
	modified:   manual_test.py
	modified:   twitch_helix_client.py
	modified:   utils.py
  • Loading branch information
Revulate committed Oct 17, 2024
1 parent e83abe2 commit afa0390
Show file tree
Hide file tree
Showing 18 changed files with 360 additions and 299 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ __pycache__/
*.log
*.log.1
*.db
bot.log.*
*.db-journal

# Virtual Environment
venv/
Expand Down
182 changes: 95 additions & 87 deletions bot.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
import os
import sys
import asyncio
import datetime
from twitchio.ext import commands
from dotenv import load_dotenv
from logger import setup_logger
from logger import log_info, log_error, log_warning, log_debug, get_logger, set_log_level
from utils import setup_database, get_database_connection
from twitch_helix_client import TwitchAPI

# Add the virtual environment's site-packages to sys.path
venv_path = os.getenv("PYTHONPATH")
if venv_path:
sys.path.append(venv_path)

# Load environment variables
load_dotenv()

Expand All @@ -31,13 +37,13 @@

class TwitchBot(commands.Bot):
def __init__(self):
self.logger = setup_logger("twitch_bot")
self.logger = get_logger("twitch_bot")
log_level = os.getenv("LOG_LEVEL", "INFO").upper()
set_log_level(log_level)

# Use environment variables
self.token = os.getenv("ACCESS_TOKEN")
self.client_id = os.getenv("TWITCH_CLIENT_ID")
self.client_secret = os.getenv("TWITCH_CLIENT_SECRET")
self.refresh_token = os.getenv("REFRESH_TOKEN")
nick = os.getenv("BOT_NICK")
prefix = os.getenv("COMMAND_PREFIX", "#")
channels = os.getenv("TWITCH_CHANNELS", "").strip().split(",")
Expand All @@ -46,8 +52,15 @@ def __init__(self):
# Check for missing critical environment variables
self._check_env_variables()

# Initialize TwitchAPI
redirect_uri = os.getenv("TWITCH_REDIRECT_URI", "http://localhost:3000")
self.twitch_api = TwitchAPI(self.client_id, self.client_secret, redirect_uri)

# Ensure token is valid before initializing the bot
asyncio.create_task(self.twitch_api.ensure_token_valid())

super().__init__(
token=self.token,
token=self.twitch_api.oauth_token,
client_id=self.client_id,
nick=nick,
prefix=prefix,
Expand All @@ -58,21 +71,16 @@ def __init__(self):
self.http_session = None
self.token_check_task = None

# Initialize TwitchAPI
redirect_uri = os.getenv("TWITCH_REDIRECT_URI", "http://localhost:3000")
self.twitch_api = TwitchAPI(self.client_id, self.client_secret, redirect_uri)
self.twitch_api.oauth_token = self.token
self.twitch_api.refresh_token = self.refresh_token
self.logger.info("TwitchAPI instance created and tokens saved")
log_info("TwitchAPI instance created and tokens saved")

self._connection_retries = 0
self._max_retries = 5
self._closing = asyncio.Event()
self.cog_tasks = []

async def event_ready(self):
self.logger.info(f"Logged in as | {self.nick}")
await self.ensure_valid_token()
log_info(f"Logged in as | {self.nick}")
await self.twitch_api.ensure_token_valid()
await self.fetch_user_id()
await self.fetch_example_streams()
self.load_modules()
Expand All @@ -83,51 +91,52 @@ async def event_ready(self):
# Test API call
try:
user_info = await self.twitch_api.get_users([self.nick])
self.logger.info(f"Successfully fetched user info: {user_info}")
log_info(f"Successfully fetched user info: {user_info}")
except Exception as e:
self.logger.error(f"Error fetching user info: {e}")
log_error(f"Error fetching user info: {e}")

# Check token status
self.logger.info(f"Current access token: {self.twitch_api.oauth_token[:10]}...")
self.logger.info(
log_info(f"Current access token: {self.twitch_api.oauth_token[:10]}...")
log_info(
f"Current refresh token: {self.twitch_api.refresh_token[:10]}..."
if self.twitch_api.refresh_token
else "No refresh token"
)
self.logger.info(f"Token expiry: {self.twitch_api.token_expiry}")
log_info(f"Token expiry: {self.twitch_api.token_expiry}")

async def start(self):
await self.ensure_valid_token()
await self.twitch_api.ensure_token_valid()
self.token = self.twitch_api.oauth_token # Update the token before starting
while self._connection_retries < self._max_retries:
try:
await super().start()
break
except Exception as e:
self._connection_retries += 1
self.logger.error(f"Connection attempt {self._connection_retries} failed: {e}")
log_error(f"Connection attempt {self._connection_retries} failed: {e}")
if self._connection_retries < self._max_retries:
await asyncio.sleep(5 * self._connection_retries) # Exponential backoff
else:
self.logger.error("Max retries reached. Unable to connect.")
log_error("Max retries reached. Unable to connect.")
raise

async def close(self):
self._closing.set()
try:
for task in self.cog_tasks:
task.cancel()
await asyncio.gather(*self.cog_tasks, return_exceptions=True)
if task:
task.cancel()
await asyncio.gather(*[t for t in self.cog_tasks if t], return_exceptions=True)

if self._connection and hasattr(self._connection, "_close"):
await self._connection._close()
if hasattr(self, "_http") and self._http:
await self._http.close()
if self.http_session:
await self.http_session.close()
if self.twitch_api:
await self.twitch_api.close()
await self.twitch_api.close()
except Exception as e:
self.logger.error(f"Error during close: {e}")
log_error(f"Error during close: {e}")
finally:
await super().close()

Expand All @@ -137,101 +146,66 @@ async def _close_cogs(self):
try:
await cog.cog_unload()
except Exception as e:
self.logger.error(f"Error unloading cog {name}: {e}")

async def ensure_valid_token(self):
if not self.twitch_api.token_expiry or self.twitch_api.token_expiry <= datetime.datetime.now():
self.logger.info("Token expired or close to expiry. Refreshing...")
try:
success = await asyncio.wait_for(self.twitch_api.refresh_oauth_token(), timeout=10)
if success:
self.token = self.twitch_api.oauth_token
self.refresh_token = self.twitch_api.refresh_token
self._update_env_file()
load_dotenv(override=True)
self.logger.info("Access token refreshed successfully")
else:
self.logger.error("Failed to refresh access token")
except asyncio.TimeoutError:
self.logger.error("Token refresh timed out")
except Exception as e:
self.logger.error(f"Error during token refresh: {e}")
return True

def _update_env_file(self):
env_path = ".env"
with open(env_path, "r") as file:
lines = file.readlines()

with open(env_path, "w") as file:
for line in lines:
if line.startswith("ACCESS_TOKEN="):
file.write(f"ACCESS_TOKEN={self.token}\n")
elif line.startswith("REFRESH_TOKEN="):
file.write(f"REFRESH_TOKEN={self.refresh_token}\n")
else:
file.write(line)
log_error(f"Error unloading cog {name}: {e}")

def _check_env_variables(self):
"""Check for missing critical environment variables."""
required_vars = ["ACCESS_TOKEN", "TWITCH_CLIENT_ID", "TWITCH_CLIENT_SECRET", "BOT_NICK", "REFRESH_TOKEN"]
required_vars = ["TWITCH_CLIENT_ID", "TWITCH_CLIENT_SECRET", "BOT_NICK"]
missing_vars = [var for var in required_vars if not os.getenv(var)]
if missing_vars:
error_msg = f"The following environment variables are missing: {', '.join(missing_vars)}"
self.logger.error(error_msg)
log_error(error_msg)
raise ValueError(error_msg)

async def fetch_user_id(self):
retries = 3
base_delay = 5
for attempt in range(1, retries + 1):
try:
users = await self.fetch_users(names=[self.nick])
if users:
self.bot_user_id = users[0].id
self.logger.info(f"User ID is | {self.bot_user_id}")
users = await self.twitch_api.get_users([self.nick])
if users and users.get("data"):
self.bot_user_id = users["data"][0]["id"]
log_info(f"User ID is | {self.bot_user_id}")
return
else:
self.logger.error("Failed to fetch user data.")
log_error("Failed to fetch user data.")
except Exception as e:
self.logger.error(f"Attempt {attempt} - Error fetching user data: {e}", exc_info=True)
log_error(f"Attempt {attempt} - Error fetching user data: {e}", exc_info=True)

if attempt < retries:
delay = base_delay * (2 ** (attempt - 1))
self.logger.info(f"Retrying to fetch user data in {delay:.2f} seconds...")
log_info(f"Retrying to fetch user data in {delay:.2f} seconds...")
await asyncio.sleep(delay)

self.logger.error("Exceeded maximum retries to fetch user data.")
log_error("Exceeded maximum retries to fetch user data.")

async def fetch_example_streams(self):
try:
streams = await self.twitch_api.get_streams(["afro", "cohhcarnage"])
for stream in streams.get("data", []):
self.logger.info(
f"Stream found: {stream.get('user_name')} is live with {stream.get('viewer_count')} viewers."
)
log_info(f"Stream found: {stream.get('user_name')} is live with {stream.get('viewer_count')} viewers.")
except Exception as e:
self.logger.error(f"Error fetching streams: {e}", exc_info=True)
log_error(f"Error fetching streams: {e}", exc_info=True)

def load_modules(self):
for cog in COGS:
try:
self.logger.info(f"Attempting to load extension: {cog}")
log_info(f"Attempting to load extension: {cog}")
self.load_module(cog)
self.logger.info(f"Loaded extension: {cog}")
log_info(f"Loaded extension: {cog}")
if hasattr(self.get_cog(cog.split(".")[-1]), "initialize"):
task = asyncio.create_task(self.get_cog(cog.split(".")[-1]).initialize())
self.cog_tasks.append(task)
except Exception as e:
self.logger.error(f"Failed to load extension {cog}: {e}")
log_error(f"Failed to load extension {cog}: {e}")

async def join_channels(self):
try:
self.logger.info(f"Attempting to join channels: {self.initial_channels}")
log_info(f"Attempting to join channels: {self.initial_channels}")
await self._connection.join_channels(self.initial_channels)
self.logger.info(f"Successfully joined channels: {self.initial_channels}")
log_info(f"Successfully joined channels: {self.initial_channels}")
except Exception as e:
self.logger.error(f"Failed to join channels: {e}")
log_error(f"Failed to join channels: {e}")

@commands.command(name="listcommands")
async def list_commands(self, ctx: commands.Context):
Expand All @@ -242,26 +216,60 @@ async def event_error(self, error: Exception, data: str = None):
"""
Handles errors in the bot's event loop.
"""
self.logger.error(f"Error in event loop: {error}")
self.logger.error(f"Error traceback: {error.__traceback__}")
log_error(f"Error in event loop: {error}")
log_error(f"Error traceback: {error.__traceback__}")
if data:
self.logger.error(f"Error data: {data}")
log_error(f"Error data: {data}")

async def check_token_regularly(self):
while not self._closing.is_set():
await asyncio.sleep(3600) # Check every hour
await self.ensure_valid_token()
try:
await self.twitch_api.ensure_token_valid()
# Test the token with a simple API call
user_info = await self.twitch_api.get_users([self.nick])
if not user_info:
raise Exception("API call failed after token refresh")
self.token = self.twitch_api.oauth_token # Update the bot's token
except Exception as e:
log_error(f"Token validation failed: {e}")
# Force a token refresh
self.twitch_api.token_expiry = None
await self.twitch_api.ensure_token_valid()
self.token = self.twitch_api.oauth_token # Update the bot's token

async def handle_api_failure(self):
log_warning("Entering reduced functionality mode due to API issues")
# Disable features that require API calls
for cog in self.cogs.values():
if hasattr(cog, "disable_api_features"):
cog.disable_api_features()
# Notify channels about the issue
for channel in self.connected_channels:
await channel.send(
"Bot is currently operating with reduced functionality due to API issues. Some features may be unavailable."
)

async def check_bot_state(self):
while not self._closing.is_set():
await asyncio.sleep(300) # Check every 5 minutes
if not self.token or not self._http.token:
log_warning("Inconsistent token state detected. Refreshing token.")
await self.twitch_api.ensure_token_valid()
self.token = self.twitch_api.oauth_token
self._http.token = self.token
# Add other state checks as needed


async def main():
bot = TwitchBot()
try:
await bot.start()
except KeyboardInterrupt:
bot.logger.info("Received keyboard interrupt. Shutting down...")
log_info("Received keyboard interrupt. Shutting down...")
except Exception as e:
bot.logger.error(f"An unexpected error occurred: {e}")
bot.logger.error(f"Error traceback: {e.__traceback__}")
log_error(f"An unexpected error occurred: {e}")
log_error(f"Error traceback: {e.__traceback__}")
finally:
await bot.close()

Expand Down
Loading

0 comments on commit afa0390

Please sign in to comment.