forked from AustinHasten/PlexHolidays
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path__init__.py
120 lines (106 loc) · 4.73 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# Austin Hasten
# Initial Commit - November 16th, 2017
import re, sys, logging, itertools
import tvdb_api
import plexapi.utils
from tqdm import tqdm
from retry import retry
from multiprocessing.pool import ThreadPool
from imdb import IMDb, IMDbDataAccessError
from requests.exceptions import ConnectTimeout
from plexapi.exceptions import BadRequest, NotFound
from http.client import RemoteDisconnected, ResponseNotReady
class Plex():
def __init__(self):
self.account = self.get_account()
self.server = self.get_account_server(self.account)
self.section = self.get_server_section(self.server)
self.media = self.get_flat_media(self.section)
@retry(BadRequest)
def get_account(self):
return plexapi.utils.getMyPlexAccount()
def get_account_server(self, account):
servers = [ _ for _ in account.resources() if _.product == 'Plex Media Server' ]
if not servers:
print('No available servers.')
sys.exit()
return plexapi.utils.choose('Select server index', servers, 'name').connect()
def get_server_section(self, server):
sections = [ _ for _ in server.library.sections() if _.type in {'movie', 'show'} ]
if not sections:
print('No available sections.')
sys.exit()
return plexapi.utils.choose('Select section index', sections, 'title')
def get_flat_media(self, section):
if section.type == 'movie':
return self.section.all()
nested_episodes = [ show.episodes() for show in self.section.all() ]
return list(itertools.chain.from_iterable(nested_episodes))
def create_playlist(self, name, media):
try:
self.server.playlist(name).addItems(media)
except plexapi.exceptions.NotFound:
self.server.createPlaylist(name, media)
class PlexHolidays():
def __init__(self):
# Necessary to disable imdbpy logger to hide timeouts, which are handled
logging.getLogger('imdbpy').disabled = True
logging.getLogger('imdbpy.parser.http.urlopener').disabled = True
self.imdbpy = IMDb()
self.tvdb = tvdb_api.Tvdb()
self.plex = Plex()
self.keyword = input('Keyword (i.e. Holiday name): ').lower()
self.pbar = tqdm(self.plex.media, desc=f'{self.plex.section.title}')
self.results = ThreadPool(10).map(self.find_matches, self.plex.media)
self.matches = [ medium for match, medium in self.results if match ]
if not self.matches:
print('No matching items.')
else:
print(len(self.matches), 'items matching', '\"' + self.keyword + '\":')
for match in self.matches:
print('\t', match.title, '(' + str(match.year) + ')')
self.plex.create_playlist(input('Playlist name: '), self.matches)
print('Playlist created/updated.')
def find_matches(self, medium):
try:
if self.keyword in medium.title.lower() or self.keyword in medium.summary.lower():
return (True, medium)
plex_guid = self.get_plex_guid(medium)
imdb_id = self.get_imdb_id(plex_guid)
imdb_keywords = self.get_imdb_keywords(imdb_id)
return ((self.keyword in imdb_keywords), medium)
finally:
self.pbar.update()
# TODO Handle exception if tries exceeded
@retry(ConnectTimeout, delay=1, tries=5)
def get_plex_guid(self, medium):
return medium.guid
def get_imdb_id(self, plex_guid):
if 'imdb' in plex_guid:
return re.search(r'tt(\d*)\?', plex_guid).groups()[0]
elif 'tvdb' in plex_guid:
return self.get_episode_id(plex_guid)
return None
def get_episode_id(self, plex_guid):
regex = r'\/\/(\d*)\/(\d*)\/(\d*)'
series_id, season, episode = map(int, re.search(regex, plex_guid).groups())
try:
imdb_id = self.tvdb[series_id][season][episode]['imdbId']
except (NameError, tvdb_api.tvdb_episodenotfound, tvdb_api.tvdb_seasonnotfound):
return None
return imdb_id[2:] if imdb_id.startswith('tt') else imdb_id
# TODO Handle exception if tries exceeded
@retry((RemoteDisconnected, ResponseNotReady, AttributeError, BrokenPipeError, OSError), delay=1, tries=5)
def get_tvdb_series(self, series_id):
return self.tvdb.get_series(series_id, 'en')
# TODO Handle exception if tries exceeded
@retry(IMDbDataAccessError, delay=2, tries=5)
def get_imdb_keywords(self, imdb_id):
if not imdb_id:
return []
try:
return self.imdbpy.get_movie_keywords(imdb_id)['data'].get('keywords', [])
except IMDbDataAccessError:
return []
if __name__ == "__main__":
ph = PlexHolidays()