diff --git a/src/program/__init__.py b/src/program/__init__.py index 8621dc2b..8d37ccec 100644 --- a/src/program/__init__.py +++ b/src/program/__init__.py @@ -1,4 +1,9 @@ -"""Program main module""" - -from program.media.item import MediaItem # noqa: F401 -from program.program import Event, Program # noqa: F401 +"""Program module.""" + +from loguru import logger + +from program.media.item import MediaItem # noqa: F401 +from program.program import Event, Program # noqa: F401 + +# Add custom log levels +logger.level("RELEASE", no=35, color="") diff --git a/src/program/apis/__init__.py b/src/program/apis/__init__.py index 5fd463b6..ef0f8386 100644 --- a/src/program/apis/__init__.py +++ b/src/program/apis/__init__.py @@ -7,7 +7,7 @@ from .overseerr_api import OverseerrAPI, OverseerrAPIError from .plex_api import PlexAPI, PlexAPIError from .trakt_api import TraktAPI, TraktAPIError - +from program.apis.tvmaze_api import TVMazeAPI def bootstrap_apis(): __setup_trakt() @@ -15,10 +15,11 @@ def bootstrap_apis(): __setup_mdblist() __setup_overseerr() __setup_listrr() + __setup_tvmaze() def __setup_trakt(): - traktApi = TraktAPI(settings_manager.settings.content.trakt) - di[TraktAPI] = traktApi + """Setup Trakt API.""" + di[TraktAPI] = TraktAPI(settings_manager.settings.content.trakt) def __setup_plex(): if not settings_manager.settings.updaters.plex.enabled: @@ -43,3 +44,7 @@ def __setup_listrr(): return listrrApi = ListrrAPI(settings_manager.settings.content.listrr.api_key) di[ListrrAPI] = listrrApi + +def __setup_tvmaze(): + """Setup TVMaze API.""" + di[TVMazeAPI] = TVMazeAPI() diff --git a/src/program/apis/trakt_api.py b/src/program/apis/trakt_api.py index 31ffafe3..fbffdbaa 100644 --- a/src/program/apis/trakt_api.py +++ b/src/program/apis/trakt_api.py @@ -1,8 +1,9 @@ import re -from datetime import datetime +from datetime import datetime, timedelta from types import SimpleNamespace from typing import List, Optional, Union from urllib.parse import urlencode +from zoneinfo import ZoneInfo # Import ZoneInfo from requests import RequestException, Session @@ -358,9 +359,51 @@ def _get_imdb_id_from_list(self, namespaces: List[SimpleNamespace], id_type: str return None def _get_formatted_date(self, data, item_type: str) -> Optional[datetime]: - """Get the formatted aired date from the data.""" + """Get the formatted aired date from the data. + Trakt API provides all dates in UTC/GMT format (ISO 8601). + """ if item_type in ["show", "season", "episode"] and (first_aired := getattr(data, "first_aired", None)): - return datetime.strptime(first_aired, "%Y-%m-%dT%H:%M:%S.%fZ") - if item_type == "movie" and (released := getattr(data, "released", None)): - return datetime.strptime(released, "%Y-%m-%d") + try: + # Parse the UTC time directly from Trakt's first_aired field + utc_time = datetime.fromisoformat(first_aired.replace('Z', '+00:00')) + + # Apply release delay if configured + if self.settings.release_delay_minutes > 0: + utc_time = utc_time + timedelta(minutes=self.settings.release_delay_minutes) + logger.debug(f" Adding {self.settings.release_delay_minutes} minute delay to release time") + + logger.debug(f"Time conversion for {getattr(data, 'title', 'Unknown')}:") + logger.debug(f" 1. Raw time from Trakt (UTC): {first_aired}") + logger.debug(f" 2. Parsed UTC time: {utc_time}") + + # Convert to local time for display + local_time = utc_time.astimezone() + logger.debug(f" 3. Your local time will be: {local_time}") + + # Check if we have timezone information from Trakt + airs = getattr(data, "airs", None) + tz = getattr(airs, "timezone", None) if airs else None + if tz: + logger.debug(f" 4. Show timezone from Trakt: {tz}") + try: + # Convert UTC time to show's timezone for reference + show_time = utc_time.astimezone(ZoneInfo(tz)) + logger.debug(f" 5. Time in show's timezone: {show_time}") + except Exception as e: + logger.error(f"Failed to convert to show timezone: {e}") + + return utc_time + except (ValueError, TypeError) as e: + logger.error(f"Failed to parse airtime: {e}") + return None + + elif item_type == "movie" and (released := getattr(data, "released", None)): + try: + # Movies just have dates, set to midnight UTC + utc_time = datetime.strptime(released, "%Y-%m-%d").replace(hour=0, minute=0, second=0, tzinfo=ZoneInfo("UTC")) + logger.debug(f"Parsed movie release date: {released} -> {utc_time} UTC") + return utc_time + except ValueError: + logger.error(f"Failed to parse release date: {released}") + return None return None \ No newline at end of file diff --git a/src/program/apis/tvmaze_api.py b/src/program/apis/tvmaze_api.py new file mode 100644 index 00000000..d75074cf --- /dev/null +++ b/src/program/apis/tvmaze_api.py @@ -0,0 +1,198 @@ +"""TVMaze API client for fetching show information.""" +from datetime import datetime +from typing import Optional +from zoneinfo import ZoneInfo + +from loguru import logger +from program.utils.request import ( + BaseRequestHandler, + HttpMethod, + ResponseType, + create_service_session, + get_cache_params, + get_rate_limit_params, +) +from requests.exceptions import HTTPError + +class TVMazeAPI: + """Handles TVMaze API communication.""" + + BASE_URL = "https://api.tvmaze.com" + + def __init__(self): + rate_limit_params = get_rate_limit_params(max_calls=20, period=10) # TVMaze allows 20 requests per 10 seconds + tvmaze_cache = get_cache_params("tvmaze", 86400) # Cache for 24 hours + session = create_service_session(rate_limit_params=rate_limit_params, use_cache=True, cache_params=tvmaze_cache) + self.request_handler = BaseRequestHandler(session, response_type=ResponseType.SIMPLE_NAMESPACE) + + def get_show_by_imdb(self, imdb_id: str, show_name: Optional[str] = None, season_number: Optional[int] = None, episode_number: Optional[int] = None) -> Optional[datetime]: + """Get show information from TVMaze using IMDB ID. + + Args: + imdb_id: IMDB ID of the show or episode (with or without 'tt' prefix) + show_name: Optional show name to use for search if IMDB lookup fails + season_number: Optional season number to find specific episode + episode_number: Optional episode number to find specific episode + + Returns: + Next episode airtime in local time if available, None otherwise + """ + try: + # Add 'tt' prefix if not present + if not imdb_id.startswith('tt'): + imdb_id = f'tt{imdb_id}' + + show = None + + # Try singlesearch by show name first if provided, since episode IDs won't work with lookup + if show_name: + logger.debug(f"Trying singlesearch by name: {show_name}") + try: + response = self.request_handler._request(HttpMethod.GET, f"{self.BASE_URL}/singlesearch/shows", params={'q': show_name}) + show = response.data if response.is_ok else None + except HTTPError as e: + if e.response.status_code == 404: + show = None + else: + raise + + # If show name search fails or wasn't provided, try direct lookup + # This will only work for show-level IMDB IDs, not episode IDs + if not show: + try: + response = self.request_handler._request(HttpMethod.GET, f"{self.BASE_URL}/lookup/shows", params={'imdb': imdb_id}) + show = response.data if response.is_ok else None + except HTTPError as e: + if e.response.status_code == 404: + show = None + else: + raise + + # If that fails too, try regular search + if not show and show_name: + logger.debug(f"Singlesearch failed for {show_name}, trying regular search") + try: + response = self.request_handler._request(HttpMethod.GET, f"{self.BASE_URL}/search/shows", params={'q': show_name}) + if response.is_ok and response.data: + # Take the first result with highest score + show = response.data[0].show if response.data else None + except HTTPError as e: + if e.response.status_code == 404: + show = None + else: + raise + + if not show: + logger.debug(f"Could not find show for {imdb_id} / {show_name}") + return None + + # Get next episode + try: + response = self.request_handler._request(HttpMethod.GET, f"{self.BASE_URL}/shows/{show.id}/episodes") + episodes = response.data if response.is_ok else None + except HTTPError as e: + if e.response.status_code == 404: + episodes = None + else: + raise + + if not episodes: + return None + + # Find all unreleased episodes and the next episode + current_time = datetime.now().astimezone() # Make sure current_time has timezone info + unreleased_episodes = [] + next_episode = None + target_episode_time = None + + for episode in sorted(episodes, key=lambda x: (getattr(x, 'season', 0), getattr(x, 'number', 0))): + try: + if not episode.airstamp: + continue + + # First try to get air time using network timezone + air_time = None + if (hasattr(show, 'network') and show.network and + hasattr(show.network, 'country') and show.network.country and + hasattr(show.network.country, 'timezone') and show.network.country.timezone and + episode.airdate and episode.airtime): + + # Combine airdate and airtime in network timezone + network_tz = ZoneInfo(show.network.country.timezone) + air_datetime = f"{episode.airdate}T{episode.airtime}" + try: + # Parse the time in network timezone + air_time = datetime.fromisoformat(air_datetime).replace(tzinfo=network_tz) + # Only log network time for the target episode + if (season_number is not None and episode_number is not None and + hasattr(episode, 'number') and hasattr(episode, 'season') and + episode.season == season_number and episode.number == episode_number): + logger.debug(f"Network airs show at {air_time} ({show.network.country.timezone})") + except Exception as e: + logger.error(f"Failed to parse network air time: {e}") + air_time = None + + # Fallback to airstamp if needed + if not air_time and episode.airstamp: + try: + air_time = datetime.fromisoformat(episode.airstamp.replace('Z', '+00:00')) + if (season_number is not None and episode_number is not None and + hasattr(episode, 'number') and hasattr(episode, 'season') and + episode.season == season_number and episode.number == episode_number): + logger.debug(f"Using UTC airstamp: {air_time}") + except Exception as e: + logger.error(f"Failed to parse airstamp: {e}") + continue + + if not air_time: + continue + + # Convert to local time + air_time = air_time.astimezone(current_time.tzinfo) + + # Check if this is the specific episode we want + if season_number is not None and episode_number is not None: + if (hasattr(episode, 'number') and hasattr(episode, 'season') and + episode.season == season_number and episode.number == episode_number): + # Found our target episode + if hasattr(episode, 'name'): + logger.debug(f"Found S{season_number}E{episode_number} '{episode.name}' airing at {air_time}") + else: + logger.debug(f"Found S{season_number}E{episode_number} airing at {air_time}") + target_episode_time = air_time + + # Add all unreleased episodes to our list + if air_time > current_time: + ep_info = { + 'air_time': air_time, + 'season': getattr(episode, 'season', 0), + 'episode': getattr(episode, 'number', 0), + 'name': getattr(episode, 'name', '') + } + unreleased_episodes.append(ep_info) + # Track next episode separately + if not next_episode or air_time < next_episode: + next_episode = air_time + + except Exception as e: + logger.error(f"Failed to process episode {getattr(episode, 'number', '?')}: {e}") + continue + + # Return target episode time if we found one + if target_episode_time is not None: + return target_episode_time + + # Log all unreleased episodes in sequence + if unreleased_episodes: + unreleased_episodes.sort(key=lambda x: (x['season'], x['episode'])) + for ep in unreleased_episodes: + logger.debug(f"Unreleased: S{ep['season']}E{ep['episode']} '{ep['name']}' airs at {ep['air_time']}") + + # Return next episode air time + if next_episode: + logger.debug(f"Next episode airs at {next_episode}") + return next_episode + + except Exception as e: + logger.error(f"Error fetching TVMaze data for {imdb_id}: {e}") + return None diff --git a/src/program/media/item.py b/src/program/media/item.py index 178a64f6..446acef4 100644 --- a/src/program/media/item.py +++ b/src/program/media/item.py @@ -3,6 +3,7 @@ from enum import Enum from pathlib import Path from typing import List, Optional, Self +from zoneinfo import ZoneInfo import sqlalchemy from loguru import logger @@ -174,9 +175,16 @@ def blacklist_stream(self, stream: Stream): @property def is_released(self) -> bool: """Check if an item has been released.""" - if self.aired_at and self.aired_at <= datetime.now(): - return True - return False + if self.aired_at is None: + return False + + # Get current time with timezone info + if self.aired_at.tzinfo is None: + # If aired_at is naive, assume UTC + self.aired_at = self.aired_at.replace(tzinfo=ZoneInfo("UTC")) + + now = datetime.now(tz=self.aired_at.tzinfo) + return self.aired_at <= now @property def state(self): @@ -391,6 +399,82 @@ def _reset(self): def log_string(self): return self.title or self.id + def get_top_title(self) -> str: + """Get the top title of the item.""" + if self.type == "season": + return self.parent.title + elif self.type == "episode": + return self.parent.parent.title + else: + return self.title + + def get_top_imdb_id(self) -> str: + """Get the imdb_id of the item at the top of the hierarchy.""" + if self.type == "season": + return self.parent.imdb_id + elif self.type == "episode": + return self.parent.parent.imdb_id + return self.imdb_id + + def get_aliases(self) -> dict: + """Get the aliases of the item.""" + if self.type == "season": + return self.parent.aliases + elif self.type == "episode": + return self.parent.parent.aliases + else: + return self.aliases + + def __hash__(self): + return hash(self.id) + + def reset(self): + """Reset item attributes.""" + if self.type == "show": + for season in self.seasons: + for episode in season.episodes: + episode._reset() + season._reset() + elif self.type == "season": + for episode in self.episodes: + episode._reset() + self._reset() + if self.title: + self.store_state(States.Indexed) + else: + self.store_state(States.Requested) + + def _reset(self): + """Reset item attributes for rescraping.""" + if self.symlink_path: + if Path(self.symlink_path).exists(): + Path(self.symlink_path).unlink() + self.set("symlink_path", None) + + try: + for subtitle in self.subtitles: + subtitle.remove() + except Exception as e: + logger.warning(f"Failed to remove subtitles for {self.log_string}: {str(e)}") + + self.set("file", None) + self.set("folder", None) + self.set("alternative_folder", None) + + reset_streams(self) + self.active_stream = {} + + self.set("active_stream", {}) + self.set("symlinked", False) + self.set("symlinked_at", None) + self.set("update_folder", None) + self.set("scraped_at", None) + + self.set("symlinked_times", 0) + self.set("scraped_times", 0) + + logger.debug(f"Item {self.log_string} has been reset") + @property def collection(self): return self.parent.collection if self.parent else self.id @@ -581,7 +665,12 @@ def _determine_state(self): @property def is_released(self) -> bool: - return any(episode.is_released for episode in self.episodes) + """Check if an item has been released.""" + if self.aired_at: + # Use current time for comparison to include time component + current_time = datetime.now() + return self.aired_at <= current_time + return False def __repr__(self): return f"Season:{self.number}:{self.state.name}" diff --git a/src/program/program.py b/src/program/program.py index 1f3fab7c..25e8dc55 100644 --- a/src/program/program.py +++ b/src/program/program.py @@ -2,14 +2,17 @@ import os import threading import time +import logging from concurrent.futures import ThreadPoolExecutor, as_completed -from datetime import datetime +from datetime import datetime, timedelta from queue import Empty +from zoneinfo import ZoneInfo from apscheduler.schedulers.background import BackgroundScheduler +from kink import di from rich.live import Live -from program.apis import bootstrap_apis +from program.apis import bootstrap_apis, TraktAPI, TVMazeAPI from program.managers.event_manager import EventManager from program.media.item import Episode, MediaItem, Movie, Season, Show from program.media.state import States @@ -60,6 +63,18 @@ def __init__(self): self.services = {} self.enable_trace = settings_manager.settings.tracemalloc self.em = EventManager() + self.scheduled_releases = {} # Track scheduled releases + + # Configure scheduler with timezone awareness + self.scheduler = BackgroundScheduler( + timezone=datetime.now().astimezone().tzinfo, # Use system timezone + ) + + # Disable noisy debug logs from APScheduler + logging.getLogger('apscheduler').setLevel(logging.WARNING) + + self.scheduler.start() # Start the scheduler + if self.enable_trace: tracemalloc.start() self.malloc_time = time.monotonic()-50 @@ -178,12 +193,10 @@ def start(self): logger.log("ITEM", f"Total Items: {total_items} (Symlinks: {total_symlinks})") self.executors = [] - self.scheduler = BackgroundScheduler() self._schedule_services() self._schedule_functions() super().start() - self.scheduler.start() logger.success("Riven is running!") self.initialized = True @@ -214,40 +227,259 @@ def _retry_library(self) -> None: def _update_ongoing(self) -> None: """Update state for ongoing and unreleased items.""" - with db.Session() as session: - item_ids = session.execute( - select(MediaItem.id) - .where(MediaItem.type.in_(["movie", "episode"])) - .where(MediaItem.last_state.in_([States.Ongoing, States.Unreleased])) - ).scalars().all() + try: + current_time = datetime.now().astimezone() + logger.log("PROGRAM", "Checking for upcoming releases...") + trakt_api = di[TraktAPI] - if not item_ids: - logger.debug("No ongoing or unreleased items to update.") - return + # Clear old scheduled releases + self.scheduled_releases = {k: v for k, v in self.scheduled_releases.items() if v > current_time} - logger.debug(f"Updating state for {len(item_ids)} ongoing and unreleased items.") + # Track items by show to optimize API calls + checked_shows = set() + upcoming_releases = [] - counter = 0 - for item_id in item_ids: + with db.Session() as session: try: - item = session.execute(select(MediaItem).filter_by(id=item_id)).unique().scalar_one_or_none() - if item: - previous_state, new_state = item.store_state() - if previous_state != new_state: - self.em.add_event(Event(emitted_by="UpdateOngoing", item_id=item_id)) - logger.debug(f"Updated state for {item.log_string} ({item.id}) from {previous_state.name} to {new_state.name}") - counter += 1 - session.merge(item) - session.commit() + items = session.execute( + select(MediaItem, MediaItem.aired_at) + .where(MediaItem.type.in_(["movie", "episode"])) + .where(MediaItem.last_state.in_([States.Ongoing, States.Unreleased])) + ).unique().all() + + for item, aired_at in items: + try: + # Skip if no IMDB ID + if not item.imdb_id: + continue + + trakt_time = None + tvmaze_time = None + + # Get Trakt time + try: + trakt_item = trakt_api.create_item_from_imdb_id(item.imdb_id, type=item.type) + if trakt_item and trakt_item.aired_at: + trakt_time = trakt_item.aired_at + if trakt_time.tzinfo is None: + trakt_time = trakt_time.replace(tzinfo=ZoneInfo("UTC")) + trakt_time = trakt_time.astimezone(current_time.tzinfo) + + if hasattr(trakt_item, "network"): + item.network = trakt_item.network + except Exception as e: + logger.error(f"Failed to fetch Trakt time for {item.log_string}: {e}") + + # Get TVMaze time for episodes only + if item.type == "episode": + try: + tvmaze_api = di[TVMazeAPI] + show_name = item.get_top_title() + if show_name and not show_name.lower().startswith("episode "): + season_number = item.parent.number if isinstance(item, Episode) and item.parent else None + episode_number = item.number if isinstance(item, Episode) else None + + tvmaze_time = tvmaze_api.get_show_by_imdb( + item.imdb_id, + show_name=show_name, + season_number=season_number, + episode_number=episode_number + ) + except Exception as e: + logger.error(f"Failed to fetch TVMaze time for {item.log_string}: {e}") + + # Use earliest available time + times = [t for t in [trakt_time, tvmaze_time] if t is not None] + if not times: + continue + air_time = min(times) + if not air_time: + continue + + # Store the air time + item.aired_at = air_time + session.merge(item) + + # Calculate release time with delay + delay_minutes = settings_manager.settings.content.trakt.release_delay_minutes + release_time = air_time + timedelta(minutes=delay_minutes) + + # Skip if already released + if release_time <= current_time: + if item.last_state in [States.Ongoing, States.Unreleased]: + previous_state, new_state = item.store_state() + if previous_state != new_state: + self.em.add_event(Event("StateTransition", item_id=item.id)) + logger.log("RELEASE", f"🎬 Released (late): {item.log_string}") + continue + + # Check if releasing in next 24 hours + time_until_release = release_time - current_time + if time_until_release <= timedelta(hours=24): + release_time_str = release_time.strftime("%I:%M %p").lstrip('0') + upcoming_releases.append((item.log_string, release_time_str)) + + # Schedule the release + self.scheduled_releases[item.id] = release_time + job_id = f"release_{item.id}" + + try: + if self.scheduler.get_job(job_id): + self.scheduler.remove_job(job_id) + + self.scheduler.add_job( + self._process_release, + 'date', + run_date=release_time, + args=[item.id, item.log_string], + id=job_id, + name=f"Release {item.log_string}" + ) + logger.log("PROGRAM", f"📅 Scheduled: {item.log_string} at {release_time_str}") + except Exception as e: + logger.error(f"Failed to schedule release for {item.log_string}: {e}") + + # If this episode isn't releasing soon, skip rest of the season + # if item.type == "episode": + # checked_shows.add(show_id) + + except Exception as e: + logger.error(f"Failed to process {item.log_string}: {e}") + continue + + session.commit() + + # Log summary of scheduled releases + if upcoming_releases: + logger.log("PROGRAM", "\n📅 Scheduled releases:") + for item_name, release_time in sorted(upcoming_releases, key=lambda x: x[1]): + logger.log("PROGRAM", f" • {item_name} at {release_time}") + else: + logger.log("PROGRAM", "No upcoming releases found") + except Exception as e: - logger.error(f"Failed to update state for item with ID {item_id}: {e}") + session.rollback() + logger.error(f"Database error in _update_ongoing: {e}") + raise + + except Exception as e: + logger.error(f"Error in _update_ongoing: {e}") + + def _update_item_state(self, item_id: str) -> None: + """Update the state of a single item.""" + with db.Session() as session: + try: + item = session.execute( + select(MediaItem).where(MediaItem.id == item_id) + ).unique().scalar_one_or_none() + + if not item: + logger.error(f"Item {item_id} not found") + return - logger.debug(f"Found {counter} items with updated state.") + # Check if this item should be scheduled for release today + current_time = datetime.now().astimezone() + if item.aired_at: + delay_minutes = settings_manager.settings.content.trakt.release_delay_minutes + delayed_time = item.aired_at + timedelta(minutes=delay_minutes) + + # Check if it's for today (regardless of time) + is_today = ( + item.aired_at.year == current_time.year and + item.aired_at.month == current_time.month and + item.aired_at.day == current_time.day + ) + + if is_today: + release_time_str = delayed_time.strftime("%I:%M %p").lstrip('0') + # If it's in the future, schedule it + if delayed_time > current_time: + # Only schedule if item is still unreleased + if item.last_state in [States.Ongoing, States.Unreleased]: + logger.log("PROGRAM", f"Scheduling {item.log_string} for release at {release_time_str}") + self.scheduled_releases[item.id] = delayed_time + + # Schedule a one-time job at the release time + job_id = f"release_{item.id}" + try: + # Remove any existing job first + if self.scheduler.get_job(job_id): + self.scheduler.remove_job(job_id) + + # Add the new job + self.scheduler.add_job( + self._process_release, + 'date', + run_date=delayed_time, + args=[item.id, item.log_string], + id=job_id, + name=f"Release {item.log_string}" + ) + except Exception as e: + logger.error(f"Failed to schedule release job for {item.log_string}: {e}") + # If it's in the past (for today), release it now + else: + logger.log("PROGRAM", f"Releasing {item.log_string} now (scheduled for {release_time_str})") + previous_state, new_state = item.store_state() + if previous_state != new_state: + self.em.add_event(Event("StateTransition", item_id=item.id)) + logger.log("RELEASE", f"🎬 Released (late): {item.log_string}") + + session.merge(item) + session.commit() + except Exception as e: + session.rollback() + logger.error(f"Failed to update scheduled state for item with ID {item_id}: {e}") + finally: + # Remove from scheduled releases after processing + if item_id in self.scheduled_releases and self.scheduled_releases[item_id] <= current_time: + del self.scheduled_releases[item_id] + + def _process_release(self, item_id: str, log_string: str) -> None: + """Process a scheduled release at its designated time.""" + try: + with db.Session() as session: + item = session.execute( + select(MediaItem).where(MediaItem.id == item_id) + ).unique().scalar_one_or_none() + + if item: + # Only process if item is still unreleased + if item.last_state in [States.Ongoing, States.Unreleased]: + previous_state, new_state = item.store_state() + if previous_state != new_state: + self.em.add_event(Event("StateTransition", item_id=item_id)) + release_time = datetime.now().astimezone().strftime("%I:%M %p").lstrip('0') + logger.log("RELEASE", f"🎬 Released at {release_time}: {log_string}") + session.merge(item) + session.commit() + else: + logger.error(f"Item {item_id} not found during scheduled release") + + # Clean up the scheduled release + self.scheduled_releases.pop(item_id, None) + except Exception as e: + logger.error(f"Failed to process scheduled release for {log_string}: {e}") def _schedule_functions(self) -> None: """Schedule each service based on its update interval.""" + # Schedule the ongoing state update function to run at midnight + current_time = datetime.now().astimezone() + midnight = (current_time + timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0) + + self.scheduler.add_job( + self._update_ongoing, + 'cron', + hour=0, + minute=0, + id="update_ongoing", + replace_existing=True + ) + + # Run update_ongoing immediately on startup + self._update_ongoing() + scheduled_functions = { - self._update_ongoing: {"interval": 60 * 60 * 24}, self._retry_library: {"interval": 60 * 60 * 24}, log_cleaner: {"interval": 60 * 60}, vacuum_and_analyze_index_maintenance: {"interval": 60 * 60 * 24}, @@ -269,10 +501,9 @@ def _schedule_functions(self) -> None: id=f"{func.__name__}", max_instances=config.get("max_instances", 1), replace_existing=True, - next_run_time=datetime.now(), - misfire_grace_time=30 + next_run_time=datetime.now() ) - logger.debug(f"Scheduled {func.__name__} to run every {config['interval']} seconds.") + logger.log("PROGRAM", f"Scheduled {func.__name__} to run every {config['interval']} seconds.") def _schedule_services(self) -> None: """Schedule each service based on its update interval.""" @@ -294,31 +525,31 @@ def _schedule_services(self) -> None: next_run_time=datetime.now() if service_cls != SymlinkLibrary else None, coalesce=False, ) - logger.debug(f"Scheduled {service_cls.__name__} to run every {update_interval} seconds.") + logger.log("PROGRAM", f"Scheduled {service_cls.__name__} to run every {update_interval} seconds.") def display_top_allocators(self, snapshot, key_type="lineno", limit=10): import psutil process = psutil.Process(os.getpid()) top_stats = snapshot.compare_to(self.last_snapshot, "lineno") - logger.debug("Top %s lines" % limit) + logger.log("PROGRAM", "Top %s lines" % limit) for index, stat in enumerate(top_stats[:limit], 1): frame = stat.traceback[0] # replace "/path/to/module/file.py" with "module/file.py" filename = os.sep.join(frame.filename.split(os.sep)[-2:]) - logger.debug("#%s: %s:%s: %.1f KiB" + logger.log("PROGRAM", "#%s: %s:%s: %.1f KiB" % (index, filename, frame.lineno, stat.size / 1024)) line = linecache.getline(frame.filename, frame.lineno).strip() if line: - logger.debug(" %s" % line) + logger.log("PROGRAM", " %s" % line) other = top_stats[limit:] if other: size = sum(stat.size for stat in other) - logger.debug("%s other: %.1f MiB" % (len(other), size / (1024 * 1024))) + logger.log("PROGRAM", "%s other: %.1f MiB" % (len(other), size / (1024 * 1024))) total = sum(stat.size for stat in top_stats) - logger.debug("Total allocated size: %.1f MiB" % (total / (1024 * 1024))) - logger.debug(f"Process memory: {process.memory_info().rss / (1024 * 1024):.2f} MiB") + logger.log("PROGRAM", "Total allocated size: %.1f MiB" % (total / (1024 * 1024))) + logger.log("PROGRAM", f"Process memory: {process.memory_info().rss / (1024 * 1024):.2f} MiB") def dump_tracemalloc(self): if time.monotonic() - self.malloc_time > 60: @@ -365,16 +596,12 @@ def run(self): self.em.add_event_to_running(event) self.em.submit_job(next_service, self, event) - def stop(self): - if not self.initialized: - return - - if hasattr(self, "executors"): - for executor in self.executors: - if not executor["_executor"]._shutdown: - executor["_executor"].shutdown(wait=False) - if hasattr(self, "scheduler") and self.scheduler.running: - self.scheduler.shutdown(wait=False) + def stop(self) -> None: + """Stop the program.""" + self.running = False + # Shutdown scheduler properly + if self.scheduler.running: + self.scheduler.shutdown() logger.log("PROGRAM", "Riven has been stopped.") def _enhance_item(self, item: MediaItem) -> MediaItem | None: diff --git a/src/program/settings/models.py b/src/program/settings/models.py index ed18d06a..6b0800c7 100644 --- a/src/program/settings/models.py +++ b/src/program/settings/models.py @@ -173,6 +173,7 @@ class TraktModel(Updatable): most_watched_period: str = "weekly" most_watched_count: int = 10 update_interval: int = 86400 + release_delay_minutes: int = Field(default=60, description="Number of minutes to delay the state change after release") oauth: TraktOauthModel = TraktOauthModel() diff --git a/src/routers/secure/items.py b/src/routers/secure/items.py index b5a49808..038dc8b7 100644 --- a/src/routers/secure/items.py +++ b/src/routers/secure/items.py @@ -213,10 +213,11 @@ async def add_items(request: Request, imdb_ids: str = None) -> MessageResponse: return {"message": f"Added {len(valid_ids)} item(s) to the queue"} + @router.get( "/{id}", - summary="Retrieve Media Item", - description="Fetch a single media item by ID", + summary="Retrieve Media Item By ID", + description="Fetch a media item by its ID", operation_id="get_item", ) async def get_item(_: Request, id: str, use_tmdb_id: Optional[bool] = False) -> dict: @@ -227,10 +228,24 @@ async def get_item(_: Request, id: str, use_tmdb_id: Optional[bool] = False) -> query = query.where(MediaItem.tmdb_id == id) else: query = query.where(MediaItem.id == id) - item = session.execute(query).unique().scalar_one() + + # Get all matching items and use the first one + items = session.execute(query).unique().scalars().all() + if not items: + raise HTTPException(status_code=404, detail="Item not found") + + # Use the first item if there are multiple + item = items[0] + if len(items) > 1: + # Log details about each duplicate + logger.warning(f"Multiple items found for ID {id}:") + for i, dupe in enumerate(items): + logger.warning(f" {i+1}. {dupe.type} - {dupe.log_string} (ID: {dupe.id})") + logger.warning(f"Using first item: {item.type} - {item.log_string}") + + return item.to_extended_dict(with_streams=False) except NoResultFound: raise HTTPException(status_code=404, detail="Item not found") - return item.to_extended_dict(with_streams=False) @router.get(