feat: enhance session management and event processing (#842)
- Added asyncio support for asynchronous operations in scraping sessions.
- Improved session management with additional checks for item existence.
- Enhanced event manager to handle cancelled futures and improve logging.
- Updated state transition logic to handle NoneType for existing items.
- Introduced joinedload to ensure we always get a MediaItem and its parent id.
iPromKnight authored Nov 3, 2024
1 parent 86e6fd0 commit 13aa94e
Showing 4 changed files with 33 additions and 17 deletions.
15 changes: 9 additions & 6 deletions src/program/db/db_functions.py
@@ -7,7 +7,7 @@
 from loguru import logger
 from sqlalchemy import delete, desc, func, insert, select, text
 from sqlalchemy.exc import SQLAlchemyError
-from sqlalchemy.orm import Session, selectinload
+from sqlalchemy.orm import Session, selectinload, joinedload

 from program.media.stream import Stream, StreamBlacklistRelation, StreamRelation
 from program.services.libraries.symlink import fix_broken_symlinks
@@ -51,11 +51,14 @@ def get_item_by_external_id(imdb_id: str = None, tvdb_id: int = None, tmdb_id: i
     from program.media.item import MediaItem, Season, Show

     _session = session if session else db.Session()
-    query = (select(MediaItem)
-             .options(
-                 selectinload(Show.seasons)
-                 .selectinload(Season.episodes)
-             ))
+    query = (
+        select(MediaItem)
+        .options(
+            joinedload(Show.seasons)
+            .joinedload(Season.episodes),
+            joinedload(Season.episodes)
+        )
+    )

     if imdb_id:
         query = query.where(MediaItem.imdb_id == imdb_id)
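
The switch from selectinload to joinedload means the parent row and its children come back in one JOINed SELECT rather than a follow-up SELECT ... IN query, so the item is always materialized together with its relationships. A minimal, self-contained sketch of the pattern, using simplified stand-in models (one level of nesting, not the project's real MediaItem/Show/Season hierarchy):

# Minimal sketch of the joinedload change, with simplified stand-in models.
from sqlalchemy import ForeignKey, create_engine, select
from sqlalchemy.orm import (DeclarativeBase, Mapped, Session, joinedload,
                            mapped_column, relationship)

class Base(DeclarativeBase):
    pass

class Show(Base):
    __tablename__ = "show"
    id: Mapped[int] = mapped_column(primary_key=True)
    imdb_id: Mapped[str]
    seasons: Mapped[list["Season"]] = relationship(back_populates="show")

class Season(Base):
    __tablename__ = "season"
    id: Mapped[int] = mapped_column(primary_key=True)
    show_id: Mapped[int] = mapped_column(ForeignKey("show.id"))
    show: Mapped[Show] = relationship(back_populates="seasons")

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add(Show(imdb_id="tt0000001", seasons=[Season(), Season()]))
    session.commit()

    # joinedload emits a single JOINed SELECT; selectinload would instead
    # issue a second SELECT ... IN (...) query after the parent loads.
    show = session.execute(
        select(Show)
        .options(joinedload(Show.seasons))
        .where(Show.imdb_id == "tt0000001")
    ).unique().scalar_one()  # .unique() is required: the JOIN duplicates parent rows
    print(len(show.seasons))  # 2, loaded in the same round trip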
6 changes: 6 additions & 0 deletions src/program/managers/event_manager.py
@@ -1,6 +1,7 @@
 import os
 import threading
 import traceback
+from asyncio import CancelledError
 from concurrent.futures import Future, ThreadPoolExecutor
 from datetime import datetime
 from queue import Empty
@@ -65,6 +66,11 @@ def _process_future(self, future, service):
             future (concurrent.futures.Future): The future to process.
             service (type): The service class associated with the future.
         """
+
+        if future.cancelled():
+            logger.debug(f"Future for {future} was cancelled.")
+            return  # Skip processing if the future was cancelled
+
         try:
             result = future.result()
             if future in self._futures:
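
The guard matters because calling result() on a cancelled future raises CancelledError (since Python 3.8, concurrent.futures.CancelledError is an alias of asyncio.CancelledError, which is why the asyncio import above resolves the right exception). A minimal sketch of the same pattern, outside the project's EventManager:

# Minimal sketch of the cancelled-future guard: checking cancelled()
# first lets the callback skip quietly instead of raising from result().
from concurrent.futures import Future

def process_future(future: Future) -> None:
    if future.cancelled():
        print("future was cancelled, skipping")
        return  # future.result() would raise CancelledError here
    try:
        print("result:", future.result())
    except Exception as exc:
        print("service failed:", exc)

pending = Future()       # still PENDING, so cancel() succeeds
pending.cancel()
process_future(pending)  # -> "future was cancelled, skipping"

done = Future()
done.set_result(42)
process_future(done)     # -> "result: 42"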
14 changes: 7 additions & 7 deletions src/program/state_transition.py
@@ -17,11 +17,11 @@ def process_event(emitted_by: Service, existing_item: MediaItem | None = None, c
     items_to_submit = []

     #TODO - Reindex non-released badly indexed items here
-    if content_item or existing_item.last_state == States.Requested:
+    if content_item or (existing_item is not None and existing_item.last_state == States.Requested):
         next_service = TraktIndexer
         return next_service, [content_item or existing_item]

-    elif existing_item.last_state in [States.PartiallyCompleted, States.Ongoing]:
+    elif existing_item is not None and existing_item.last_state in [States.PartiallyCompleted, States.Ongoing]:
         if existing_item.type == "show":
             for season in existing_item.seasons:
                 if season.last_state not in [States.Completed, States.Unreleased]:
@@ -33,7 +33,7 @@
                 _, sub_items = process_event(emitted_by, episode, None)
                 items_to_submit += sub_items

-    elif existing_item.last_state == States.Indexed:
+    elif existing_item is not None and existing_item.last_state == States.Indexed:
         next_service = Scraping
         if emitted_by != Scraping and Scraping.can_we_scrape(existing_item):
             items_to_submit = [existing_item]
@@ -42,19 +42,19 @@
         elif existing_item.type == "season":
             items_to_submit = [e for e in existing_item.episodes if e.last_state != States.Completed and Scraping.can_we_scrape(e)]

-    elif existing_item.last_state == States.Scraped:
+    elif existing_item is not None and existing_item.last_state == States.Scraped:
         next_service = Downloader
         items_to_submit = [existing_item]

-    elif existing_item.last_state == States.Downloaded:
+    elif existing_item is not None and existing_item.last_state == States.Downloaded:
         next_service = Symlinker
         items_to_submit = [existing_item]

-    elif existing_item.last_state == States.Symlinked:
+    elif existing_item is not None and existing_item.last_state == States.Symlinked:
         next_service = Updater
         items_to_submit = [existing_item]

-    elif existing_item.last_state == States.Completed:
+    elif existing_item is not None and existing_item.last_state == States.Completed:
         # If a user manually retries an item, lets not notify them again
         if emitted_by not in ["RetryItem", PostProcessing]:
             notify(existing_item)
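
Every branch now checks existing_item is not None before reading last_state, so an event that arrives without a matching database row falls through to the default return instead of raising AttributeError: 'NoneType' object has no attribute 'last_state'. A stripped-down sketch of the guarded dispatch, with stand-in states and service names rather than the project's real classes:

# Stripped-down sketch of the None-guarded elif chain. States and the
# returned service names are stand-ins, not the project's real types.
from enum import Enum, auto
from typing import Optional

class States(Enum):
    Requested = auto()
    Indexed = auto()
    Scraped = auto()

class Item:
    def __init__(self, state: States) -> None:
        self.last_state = state

def process_event(existing_item: Optional[Item], content_item: Optional[dict]):
    # Without the "is not None" guards, a fresh content_item with no DB
    # row would crash on existing_item.last_state in the first test.
    if content_item or (existing_item is not None
                        and existing_item.last_state == States.Requested):
        return "Indexer", [content_item or existing_item]
    elif existing_item is not None and existing_item.last_state == States.Indexed:
        return "Scraping", [existing_item]
    elif existing_item is not None and existing_item.last_state == States.Scraped:
        return "Downloader", [existing_item]
    return None, []

print(process_event(None, {"imdb_id": "tt0000001"}))  # routed to the indexer
print(process_event(None, None))                      # (None, []) -- no crash
print(process_event(Item(States.Scraped), None))      # routed to the downloader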
15 changes: 11 additions & 4 deletions src/routers/secure/scrape.py
@@ -1,3 +1,4 @@
+import asyncio
 from datetime import datetime, timedelta
 from typing import Dict, List, Literal, Optional, TypeAlias, Union
 from uuid import uuid4
@@ -320,7 +321,7 @@ def manual_select_files(request: Request, session_id, files: Container) -> Selec
     summary="Match container files to item",
     operation_id="manual_update_attributes"
 )
-def manual_update_attributes(request: Request, session_id, data: Union[ContainerFile, ShowFileData]) -> UpdateAttributesResponse:
+async def manual_update_attributes(request: Request, session_id, data: Union[ContainerFile, ShowFileData]) -> UpdateAttributesResponse:
     session = session_manager.get_session(session_id)
     log_string = None
     if not session:
@@ -330,13 +331,13 @@ def manual_update_attributes(request: Request, session_id, data: Union[Container
         raise HTTPException(status_code=500, detail="")

     with db.Session() as db_session:
-        if str(session.item_id).startswith("tt") and not db_functions.get_item_by_external_id(imdb_id=session.item_id):
+        if str(session.item_id).startswith("tt") and not db_functions.get_item_by_external_id(imdb_id=session.item_id) and not db_functions.get_item_by_id(session.item_id):
             prepared_item = MediaItem({"imdb_id": session.item_id})
             item = next(TraktIndexer().run(prepared_item))
             db_session.merge(item)
             db_session.commit()
         else:
-            item : MediaItem = db_functions.get_item_by_id(session.item_id)
+            item = db_functions.get_item_by_id(session.item_id)

         if not item:
             raise HTTPException(status_code=404, detail="Item not found")
@@ -354,11 +355,17 @@
                 item.streams.append(ItemStream(torrent))
                 item_ids_to_submit.append(item.id)
     else:
+        request.app.program.em.cancel_job(item.id)
+        await asyncio.sleep(0.2)
+        for season in item.seasons:
+            request.app.program.em.cancel_job(season.id)
+            await asyncio.sleep(0.2)
         for season, episodes in data.root.items():
             for episode, episode_data in episodes.items():
                 item_episode: Episode = next((_episode for _season in item.seasons if _season.number == season for _episode in _season.episodes if _episode.number == episode), None)
                 if item_episode:
-                    request.app.program.em.cancel_job(item.id)
+                    request.app.program.em.cancel_job(item_episode.id)
+                    await asyncio.sleep(0.2)
                     item_episode.reset()
                     item_episode.file = episode_data.filename
                     item_episode.folder = episode_data.filename
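
Turning the handler into async def is what makes the new await asyncio.sleep(0.2) pauses possible: the coroutine yields to the event loop while each cancellation settles, instead of tying up a worker with a blocking sleep. A minimal sketch of that shape, with a hypothetical in-memory EventManager standing in for request.app.program.em:

# Minimal sketch of the async-handler shape: cancel one job per item,
# awaiting briefly between cancellations. EventManager is a stand-in.
import asyncio
from fastapi import FastAPI

class EventManager:
    def __init__(self) -> None:
        self.cancelled: list[str] = []

    def cancel_job(self, item_id: str) -> None:
        self.cancelled.append(item_id)

app = FastAPI()
em = EventManager()

@app.post("/sessions/{session_id}/update")
async def manual_update_attributes(session_id: str) -> dict:
    season_ids = ["show-1", "s1", "s2"]  # stand-in for the item and its seasons
    for item_id in season_ids:
        em.cancel_job(item_id)
        # Brief pause so pending futures can unwind before the next
        # cancellation; awaiting keeps the event loop responsive.
        await asyncio.sleep(0.2)
    return {"session": session_id, "cancelled": em.cancelled}

# Run with: uvicorn this_module:app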
