diff --git a/blackhole.py b/blackhole.py index ec7cb88..66197a6 100644 --- a/blackhole.py +++ b/blackhole.py @@ -12,7 +12,7 @@ from shared.discord import discordError, discordUpdate from shared.shared import realdebrid, torbox, blackhole, plex, checkRequiredEnvs from shared.arr import Arr, Radarr, Sonarr -from shared.debrid import TorrentBase, RealDebridTorrent, RealDebridMagnet, TorboxTorrent, TorboxMagnet +from shared.debrid import FileBase, RealDebridTorrent, RealDebridMagnet, TorboxTorrent, TorboxMagnet, TorboxNZB _print = print @@ -41,23 +41,25 @@ def __init__(self, filename, filenameWithoutExt, filePath, filePathProcessing, f self.folderPathCompleted = folderPathCompleted class TorrentInfo(): - def __init__(self, isTorrentOrMagnet, isDotTorrentFile) -> None: - self.isTorrentOrMagnet = isTorrentOrMagnet + def __init__(self, isTorrentMagnetOrNZB, isDotTorrentFile, isDotNZBFile) -> None: + self.isTorrentMagnetOrNZB = isTorrentMagnetOrNZB self.isDotTorrentFile = isDotTorrentFile + self.isDotNZBFile = isDotNZBFile def __init__(self, filename, isRadarr) -> None: print('filename:', filename) baseBath = getPath(isRadarr) uniqueId = str(uuid.uuid4())[:8] # Generate a unique identifier isDotTorrentFile = filename.casefold().endswith('.torrent') - isTorrentOrMagnet = isDotTorrentFile or filename.casefold().endswith('.magnet') + isDotNZBFile = filename.casefold().endswith('.nzb') + isTorrentMagnetOrNZB = isDotNZBFile or isDotTorrentFile or filename.casefold().endswith('.magnet') filenameWithoutExt, ext = os.path.splitext(filename) filePath = os.path.join(baseBath, filename) filePathProcessing = os.path.join(baseBath, 'processing', f"{filenameWithoutExt}_{uniqueId}{ext}") folderPathCompleted = os.path.join(baseBath, 'completed', filenameWithoutExt) self.fileInfo = self.FileInfo(filename, filenameWithoutExt, filePath, filePathProcessing, folderPathCompleted) - self.torrentInfo = self.TorrentInfo(isTorrentOrMagnet, isDotTorrentFile) + self.torrentInfo = self.TorrentInfo(isTorrentMagnetOrNZB, isDotTorrentFile, isDotNZBFile) def getPath(isRadarr, create=False): baseWatchPath = blackhole['baseWatchPath'] @@ -137,13 +139,13 @@ def print(*values: object): import signal -async def processTorrent(torrent: TorrentBase, file: TorrentFileInfo, arr: Arr) -> bool: +async def processTorrent(torrent: FileBase, file: TorrentFileInfo, arr: Arr) -> bool: _print = globals()['print'] def print(*values: object): _print(f"[{torrent.__class__.__name__}] [{file.fileInfo.filenameWithoutExt}]", *values) - if not torrent.submitTorrent(): + if not torrent.submitFile(): return False count = 0 @@ -179,7 +181,7 @@ def print(*values: object): while True: existsCount += 1 - folderPathMountTorrent = await torrent.getTorrentPath() + folderPathMountTorrent = await torrent.getFilePath() if folderPathMountTorrent: multiSeasonRegex1 = r'(?<=[\W_][Ss]eason[\W_])[\d][\W_][\d]{1,2}(?=[\W_])' multiSeasonRegex2 = r'(?<=[\W_][Ss])[\d]{2}[\W_][Ss]?[\d]{2}(?=[\W_])' @@ -253,7 +255,7 @@ def print(*values: object): async def processFile(file: TorrentFileInfo, arr: Arr, isRadarr): try: _print = globals()['print'] - + def print(*values: object): _print(f"[{file.fileInfo.filenameWithoutExt}]", *values) @@ -278,16 +280,19 @@ async def is_accessible(path, timeout=10): time.sleep(.1) # Wait before processing the file in case it isn't fully written yet. os.renames(file.fileInfo.filePath, file.fileInfo.filePathProcessing) + + if file.torrentInfo.isDotNZBFile and not torbox['enabled']: + raise Exception("Cannot process NZB, no NZB client enabled (i.e. Torbox)") - with open(file.fileInfo.filePathProcessing, 'rb' if file.torrentInfo.isDotTorrentFile else 'r') as f: + with open(file.fileInfo.filePathProcessing, 'rb' if file.torrentInfo.isDotTorrentFile or file.torrentInfo.isDotNZBFile else 'r') as f: fileData = f.read() f.seek(0) torrentConstructors = [] - if realdebrid['enabled']: + if realdebrid['enabled'] and not file.torrentInfo.isDotNZBFile: torrentConstructors.append(RealDebridTorrent if file.torrentInfo.isDotTorrentFile else RealDebridMagnet) if torbox['enabled']: - torrentConstructors.append(TorboxTorrent if file.torrentInfo.isDotTorrentFile else TorboxMagnet) + torrentConstructors.append(TorboxNZB if file.torrentInfo.isDotNZBFile else TorboxTorrent if file.torrentInfo.isDotTorrentFile else TorboxMagnet) onlyLargestFile = isRadarr or bool(re.search(r'S[\d]{2}E[\d]{2}(?![\W_][\d]{2}[\W_])', file.fileInfo.filename)) if not blackhole['failIfNotCached']: @@ -315,7 +320,7 @@ async def is_accessible(path, timeout=10): discordError(f"Error processing {file.fileInfo.filenameWithoutExt}", e) -async def fail(torrent: TorrentBase, arr: Arr): +async def fail(torrent: FileBase, arr: Arr): _print = globals()['print'] def print(*values: object): @@ -339,7 +344,7 @@ def print(*values: object): def getFiles(isRadarr): print('getFiles') files = (TorrentFileInfo(filename, isRadarr) for filename in os.listdir(getPath(isRadarr)) if filename not in ['processing', 'completed']) - return [file for file in files if file.torrentInfo.isTorrentOrMagnet] + return [file for file in files if file.torrentInfo.isTorrentMagnetOrNZB] async def on_created(isRadarr): print("Enter 'on_created'") @@ -360,7 +365,7 @@ async def on_created(isRadarr): if files: futures.append(asyncio.gather(*(processFile(file, arr, isRadarr) for file in files))) elif firstGo: - print('No torrent files found') + print('No torrent or NZB files found') firstGo = False await asyncio.sleep(1) diff --git a/blackhole_watcher.py b/blackhole_watcher.py index 180b687..ee0c5c3 100644 --- a/blackhole_watcher.py +++ b/blackhole_watcher.py @@ -10,7 +10,7 @@ def __init__(self, is_radarr): self.path_name = getPath(is_radarr, create=True) def on_created(self, event): - if not event.is_directory and event.src_path.lower().endswith((".torrent", ".magnet")): + if not event.is_directory and event.src_path.lower().endswith((".torrent", ".magnet", ".nzb")): asyncio.run(on_created(self.is_radarr)) async def on_run(self): diff --git a/shared/debrid.py b/shared/debrid.py index f9260a0..46865b7 100644 --- a/shared/debrid.py +++ b/shared/debrid.py @@ -95,7 +95,7 @@ def validateTorboxMountTorrentsPath(): checkRequiredEnvs(requiredEnvs) -class TorrentBase(ABC): +class FileBase(ABC): STATUS_WAITING_FILES_SELECTION = 'waiting_files_selection' STATUS_DOWNLOADING = 'downloading' STATUS_COMPLETED = 'completed' @@ -118,7 +118,7 @@ def print(self, *values: object): print(f"[{datetime.now()}] [{self.__class__.__name__}] [{self.file.fileInfo.filenameWithoutExt}]", *values) @abstractmethod - def submitTorrent(self): + def submitFile(self): pass @abstractmethod @@ -126,7 +126,7 @@ def getHash(self): pass @abstractmethod - def addTorrent(self): + def addFile(self): pass @abstractmethod @@ -142,7 +142,7 @@ def delete(self): pass @abstractmethod - async def getTorrentPath(self): + async def getFilePath(self): pass @abstractmethod @@ -152,25 +152,29 @@ def _addTorrentFile(self): @abstractmethod def _addMagnetFile(self): pass + + @abstractmethod + def _addNZBFile(self): + pass def _enforceId(self): if not self.id: - raise Exception("Id is required. Must be acquired via successfully running submitTorrent() first.") + raise Exception("Id is required. Must be acquired via successfully running submitFile() first.") -class RealDebrid(TorrentBase): +class RealDebrid(FileBase): def __init__(self, f, fileData, file, failIfNotCached, onlyLargestFile) -> None: super().__init__(f, fileData, file, failIfNotCached, onlyLargestFile) self.headers = {'Authorization': f'Bearer {realdebrid["apiKey"]}'} self.mountTorrentsPath = realdebrid["mountTorrentsPath"] - def submitTorrent(self): + def submitFile(self): if self.failIfNotCached: instantAvailability = self._getInstantAvailability() self.print('instantAvailability:', not not instantAvailability) if not instantAvailability: return False - return not not self.addTorrent() + return not not self.addFile() def _getInstantAvailability(self, refresh=False): if refresh or not self._instantAvailability: @@ -281,7 +285,7 @@ def delete(self): return not not deleteRequest - async def getTorrentPath(self): + async def getFilePath(self): filename = (await self.getInfo())['filename'] originalFilename = (await self.getInfo())['original_filename'] @@ -325,6 +329,9 @@ def _addTorrentFile(self): def _addMagnetFile(self): return self._addFile(requests.post, "torrents/addMagnet", {'magnet': self.fileData}) + def _addNZBFile(self): + raise Exception("Not Implemented") + def _normalize_status(self, status): if status in ['waiting_files_selection']: return self.STATUS_WAITING_FILES_SELECTION @@ -336,7 +343,7 @@ def _normalize_status(self, status): return self.STATUS_ERROR return status -class Torbox(TorrentBase): +class Torbox(FileBase): def __init__(self, f, fileData, file, failIfNotCached, onlyLargestFile) -> None: super().__init__(f, fileData, file, failIfNotCached, onlyLargestFile) self.headers = {'Authorization': f'Bearer {torbox["apiKey"]}'} @@ -352,14 +359,14 @@ def __init__(self, f, fileData, file, failIfNotCached, onlyLargestFile) -> None: userInfo = userInfoRequest.json() self.authId = userInfo['data']['auth_id'] - def submitTorrent(self): + def submitFile(self): if self.failIfNotCached: instantAvailability = self._getInstantAvailability() self.print('instantAvailability:', not not instantAvailability) if not instantAvailability: return False - if self.addTorrent(): + if self.addFile(): self.submittedTime = datetime.now() return True return False @@ -438,7 +445,7 @@ def delete(self): ) return not not deleteRequest - async def getTorrentPath(self): + async def getFilePath(self): filename = (await self.getInfo())['name'] folderPathMountFilenameTorrent = os.path.join(self.mountTorrentsPath, filename) @@ -461,7 +468,7 @@ def _addFile(self, data=None, files=None): response = request.json() self.print('response info:', response) - if response.get('detail') == 'queued': + if 'queued' in response.get('detail'): return None self.id = response['data']['torrent_id'] @@ -475,6 +482,9 @@ def _addTorrentFile(self): def _addMagnetFile(self): return self._addFile(data={'magnet': self.fileData}) + + def _addNZBFile(self): + raise Exception("Not Implemented") def _normalize_status(self, status, download_finished): if download_finished: @@ -490,7 +500,7 @@ def _normalize_status(self, status, download_finished): return self.STATUS_ERROR return status -class Torrent(TorrentBase): +class Torrent(FileBase): def getHash(self): if not self._hash: @@ -499,10 +509,10 @@ def getHash(self): return self._hash - def addTorrent(self): + def addFile(self): return self._addTorrentFile() -class Magnet(TorrentBase): +class Magnet(FileBase): def getHash(self): if not self._hash: @@ -511,7 +521,7 @@ def getHash(self): return self._hash - def addTorrent(self): + def addFile(self): return self._addMagnetFile() @@ -526,3 +536,170 @@ class TorboxTorrent(Torbox, Torrent): class TorboxMagnet(Torbox, Magnet): pass + + +class UsenetTorbox(FileBase): + def __init__(self, f, fileData, file, failIfNotCached, onlyLargestFile) -> None: + super().__init__(f, fileData, file, failIfNotCached, onlyLargestFile) + self.headers = {'Authorization': f'Bearer {torbox["apiKey"]}'} + self.mountTorrentsPath = torbox["mountTorrentsPath"] + self.submittedTime = None + self.lastInactiveCheck = None + + userInfoRequest = retryRequest( + lambda: requests.get(urljoin(torbox['host'], "user/me"), headers=self.headers), + print=self.print + ) + if userInfoRequest is not None: + userInfo = userInfoRequest.json() + self.authId = userInfo['data']['auth_id'] + + def submitFile(self): + if self.failIfNotCached: + instantAvailability = self._getInstantAvailability() + self.print('instantAvailability:', not not instantAvailability) + if not instantAvailability: + return False + + if self.addFile(): + self.submittedTime = datetime.now() + return True + return False + + def _getInstantAvailability(self, refresh=False): + if refresh or not self._instantAvailability: + usenetHash = self.getHash() + self.print('hash:', usenetHash) + + instantAvailabilityRequest = retryRequest( + lambda: requests.get( + urljoin(torbox['host'], "usenet/checkcached"), + headers=self.headers, + params={'hash': usenetHash, 'format': 'object'} + ), + print=self.print + ) + if instantAvailabilityRequest is None: + return None + + instantAvailabilities = instantAvailabilityRequest.json() + self.print('instantAvailabilities:', instantAvailabilities) + + # Check if 'data' exists and is not None or False + if instantAvailabilities and 'data' in instantAvailabilities and instantAvailabilities['data']: + self._instantAvailability = instantAvailabilities['data'] + else: + self._instantAvailability = None + + return self._instantAvailability + + async def getInfo(self, refresh=False): + self._enforceId() + + if refresh or not self._info: + if not self.authId: + return None + + for _ in range(60): + infoRequest = retryRequest( + lambda: requests.get(urljoin(torbox['host'], "usenet/mylist"), headers=self.headers), + print=self.print + ) + if infoRequest is None: + return None + + usenetData = infoRequest.json()['data'] + + for usenet in usenetData: + if usenet['id'] == self.id: + usenet['status'] = self._normalize_status(usenet['download_state'], usenet['download_finished']) + usenet['progress'] = usenet['progress'] * 100 + self._info = usenet + return self._info + + await asyncio.sleep(1) + return self._info + + async def selectFiles(self): + pass + + def delete(self): + self._enforceId() + + deleteRequest = retryRequest( + lambda: requests.delete(urljoin(torbox['host'], "usenet/controlusenetdownload"), headers=self.headers, data={'usenet_id': self.id, 'operation': "delete"}), + print=self.print + ) + return not not deleteRequest + + async def getFilePath(self): + filename = (await self.getInfo())['name'] + + folderPathMountFilenameUsenet = os.path.join(self.mountTorrentsPath, filename) + + if os.path.exists(folderPathMountFilenameUsenet) and os.listdir(folderPathMountFilenameUsenet): + folderPathMountUsenet = folderPathMountFilenameUsenet + else: + folderPathMountUsenet = None + + return folderPathMountUsenet + + def _addFile(self, data=None, files=None): + request = retryRequest( + lambda: requests.post(urljoin(torbox['host'], "usenet/createusenetdownload"), headers=self.headers, data=data, files=files), + print=self.print + ) + if request is None: + return None + + response = request.json() + self.print('response info:', response) + + if response.get('detail') == 'queued': + return None + + self.id = response['data']['usenetdownload_id'] + + return self.id + + def _addTorrentFile(self): + raise Exception("Not Implemented") + + def _addMagnetFile(self): + raise Exception("Not Implemented") + + def _addNZBFile(self): + nameusenet = self.f.name.split('/')[-1] + files = {'file': (nameusenet, self.f, 'application/octet-stream')} + return self._addFile(files=files) + + def _normalize_status(self, status, download_finished): + if download_finished: + return self.STATUS_COMPLETED + elif status in [ + 'completed', 'cached', 'paused', 'downloading', 'uploading', + 'checkingResumeData', 'metaDL', 'pausedUP', 'queuedUP', 'checkingUP', + 'forcedUP', 'allocating', 'downloading', 'metaDL', 'pausedDL', + 'queuedDL', 'checkingDL', 'forcedDL', 'checkingResumeData', 'moving' + ]: + return self.STATUS_DOWNLOADING + elif status in ['error', 'stalledUP', 'stalledDL', 'stalled (no seeds)', 'missingFiles']: + return self.STATUS_ERROR + return status + + +class NZB(FileBase): + def getHash(self): + + #TODO: Verify that torbox uses md5 hash for nzb files + if not self._hash: + self._hash = hashlib.md5(self.fileData).hexdigest() + + return self._hash + + def addFile(self): + return self._addNZBFile() + + +class TorboxNZB(UsenetTorbox, NZB): + pass \ No newline at end of file