From 3000c0a80db319394c91beb3fc35b2da855532d4 Mon Sep 17 00:00:00 2001 From: Konstantin Date: Fri, 11 Oct 2024 00:05:52 +0200 Subject: [PATCH] annotate types --- beetsplug/vocadb.py | 323 ++++++++++++++++++++++++++------------------ 1 file changed, 189 insertions(+), 134 deletions(-) diff --git a/beetsplug/vocadb.py b/beetsplug/vocadb.py index 087b9f9..b875d5c 100644 --- a/beetsplug/vocadb.py +++ b/beetsplug/vocadb.py @@ -1,15 +1,17 @@ from datetime import datetime from json import load from re import match, search -from typing import NamedTuple +from typing import Any, NamedTuple, Optional, Sequence from urllib.error import HTTPError from urllib.parse import quote, urljoin from urllib.request import Request, urlopen import beets from beets import autotag, config, library, ui, util -from beets.autotag.hooks import AlbumInfo, TrackInfo +from beets.autotag.hooks import AlbumInfo, TrackInfo, Distance +from beets.library import Item, Library from beets.plugins import BeetsPlugin, apply_item_changes, get_distance +from beets.ui import show_model_changes, Subcommand USER_AGENT = f"beets/{beets.__version__} +https://beets.io/" HEADERS = {"accept": "application/json", "User-Agent": USER_AGENT} @@ -22,13 +24,13 @@ class VocaDBInstance(NamedTuple): class VocaDBPlugin(BeetsPlugin): - def __init__(self): + def __init__(self) -> None: super().__init__() - self.data_source = "VocaDB" - self.instance = VocaDBInstance( - base_url = "https://vocadb.net/", - api_url = "https://vocadb.net/api/", - subcommand = "vdbsync" + self.data_source: str = "VocaDB" + self.instance: VocaDBInstance = VocaDBInstance( + base_url="https://vocadb.net/", + api_url="https://vocadb.net/api/", + subcommand="vdbsync", ) self.config.add( { @@ -38,8 +40,11 @@ def __init__(self): } ) - def commands(self): - cmd = ui.Subcommand(self.instance.subcommand, help=f"update metadata from {self.data_source}") + def commands(self) -> list[Subcommand]: + cmd: Subcommand = Subcommand( + self.instance.subcommand, + help=f"update metadata from {self.data_source}", + ) cmd.parser.add_option( "-p", "--pretend", @@ -72,35 +77,39 @@ def commands(self): cmd.func = self.func return [cmd] - def func(self, lib, opts, args): + def func(self, lib: Library, opts, args) -> None: """Command handler for the *dbsync function.""" - move = ui.should_move(opts.move) + move: bool = ui.should_move(opts.move) pretend = opts.pretend - write = ui.should_write(opts.write) + write: bool = ui.should_write(opts.write) query = ui.decargs(args) self.singletons(lib, query, move, pretend, write) self.albums(lib, query, move, pretend, write) - def singletons(self, lib, query, move, pretend, write): + def singletons(self, lib: Library, query, move: bool, pretend, write: bool) -> None: """Retrieve and apply info from the autotagger for items matched by query. """ for item in lib.items(query + ["singleton:true"]): - item_formatted = format(item) + item_formatted: str = format(item) if not item.mb_trackid: self._log.debug( - "Skipping singleton with no mb_trackid: {0}", item_formatted + "Skipping singleton with no mb_trackid: {0}", + item_formatted, ) continue if not ( - item.get("data_source") == self.data_source and item.mb_trackid.isnumeric() + item.get("data_source") == self.data_source + and item.mb_trackid.isnumeric() ): self._log.debug( - "Skipping non-{0} singleton: {1}", self.data_source, item_formatted + "Skipping non-{0} singleton: {1}", + self.data_source, + item_formatted, ) continue - track_info = self.track_for_id(item.mb_trackid) + track_info: Optional[TrackInfo] = self.track_for_id(item.mb_trackid) if not track_info: self._log.info( "Recording ID not found: {0} for track {0}", @@ -110,29 +119,33 @@ def singletons(self, lib, query, move, pretend, write): continue with lib.transaction(): autotag.apply_item_metadata(item, track_info) - ui.show_model_changes(item) + show_model_changes(item) apply_item_changes(lib, item, move, pretend, write) - def albums(self, lib, query, move, pretend, write): + def albums(self, lib: Library, query, move: bool, pretend, write: bool) -> None: """Retrieve and apply info from the autotagger for albums matched by query and their items. """ for album in lib.albums(query): - album_formatted = format(album) + album_formatted: str = format(album) if not album.mb_albumid: self._log.debug( - "Skipping album with no mb_albumid: {0}", album_formatted + "Skipping album with no mb_albumid: {0}", + album_formatted, ) continue - items = list(album.items()) + items: Sequence[Item] = album.items() if not ( - album.get("data_source") == self.data_source and album.mb_albumid.isnumeric() + album.get("data_source") == self.data_source + and album.mb_albumid.isnumeric() ): self._log.debug( - "Skipping non-{0} album: {1}", self.data_source, album_formatted + "Skipping non-{0} album: {1}", + self.data_source, + album_formatted, ) continue - album_info = self.album_for_id(album.mb_albumid) + album_info: Optional[AlbumInfo] = self.album_for_id(album.mb_albumid) if not album_info: self._log.info( "Release ID {0} not found for album {1}", @@ -140,12 +153,14 @@ def albums(self, lib, query, move, pretend, write): album_formatted, ) continue - trackid_to_trackinfo = { + trackid_to_trackinfo: dict[Optional[str], TrackInfo] = { track.track_id: track for track in album_info.tracks } - library_trackid_to_item = {item.mb_trackid: item for item in items} - mapping = {} - missing_tracks = [] + library_trackid_to_item: dict[Optional[str], Item] = { + item.mb_trackid: item for item in items + } + mapping: dict[Item, TrackInfo] = {} + missing_tracks: list[Optional[str]] = [] for track_id, item in library_trackid_to_item.items(): if track_id in trackid_to_trackinfo: mapping[item] = trackid_to_trackinfo[track_id] @@ -154,23 +169,26 @@ def albums(self, lib, query, move, pretend, write): self._log.debug( "Missing track ID {0} in album info for {1}", track_id, - album_formatted + album_formatted, ) if missing_tracks: self._log.warning( - "The following track IDs were missing in the VocaDB album info for {0}: {1}", + "The following track IDs were missing in the VocaDB album \ + info for {0}: {1}", album_formatted, - ', '.join(missing_tracks) + ", ".join( + str(track) for track in missing_tracks if track is not None + ), ) self._log.debug("applying changes to {}", album_formatted) with lib.transaction(): autotag.apply_metadata(album_info, mapping) - changed = False - any_changed_item = items[0] + changed: bool = False + any_changed_item: Item = items[0] for item in items: - item_changed = ui.show_model_changes(item) + item_changed: bool = show_model_changes(item) changed |= item_changed if item_changed: any_changed_item = item @@ -191,29 +209,42 @@ def albums(self, lib, query, move, pretend, write): self._log.debug("moving album {0}", album_formatted) album.move() - def track_distance(self, item, info): + def track_distance(self, item: Item, info: TrackInfo) -> Distance: """Returns the track distance.""" return get_distance(data_source=self.data_source, info=info, config=self.config) - def album_distance(self, items, album_info, mapping): + def album_distance( + self, + items: Sequence[Item], + album_info: AlbumInfo, + mapping: dict[Item, TrackInfo], + ) -> Distance: """Returns the album distance.""" return get_distance( data_source=self.data_source, info=album_info, config=self.config ) - def candidates(self, items, artist, album, va_likely, extra_tags=None): + def candidates( + self, + items: Sequence[Item], + artist: str, + album: str, + va_likely: bool, + extra_tags: Optional[dict] = None, + ) -> list[Optional[AlbumInfo]]: self._log.debug("Searching for album {0}", album) - url = urljoin( + url: str = urljoin( self.instance.api_url, f"albums/?query={quote(album)}&maxResults=5&nameMatchMode=Auto", ) - request = Request(url, headers=HEADERS) + request: Request = Request(url, headers=HEADERS) try: with urlopen(request) as result: if result: result = load(result) - # songFields parameter doesn't exist for album search so we'll get albums by their id - ids = [x["id"] for x in result["items"]] + # songFields parameter doesn't exist for album search + # so we'll get albums by their id + ids: list[str] = [str(x["id"]) for x in result["items"]] return [album for album in map(self.album_for_id, ids) if album] else: self._log.debug("API Error: Returned empty page (query: {0})", url) @@ -222,17 +253,20 @@ def candidates(self, items, artist, album, va_likely, extra_tags=None): self._log.debug("API Error: {0} (query: {1})", e, url) return [] - def item_candidates(self, item, artist, title): + def item_candidates( + self, item: Item, artist: str, title: str + ) -> list[Optional[TrackInfo]]: self._log.debug("Searching for track {0}", item) - language = self.get_lang(config["import"]["languages"].as_str_seq()) - url = urljoin( + language: str = self.get_lang(config["import"]["languages"].as_str_seq()) + url: str = urljoin( self.instance.api_url, f"songs/?query={quote(title)}" + f"&fields={self.get_song_fields()}" + f"&lang={language}" - + "&maxResults=5&sort=SongType&preferAccurateMatches=true&nameMatchMode=Auto", + + "&maxResults=5&sort=SongType\ + &preferAccurateMatches=true&nameMatchMode=Auto", ) - request = Request(url, headers=HEADERS) + request: Request = Request(url, headers=HEADERS) try: with urlopen(request) as result: if result: @@ -249,10 +283,10 @@ def item_candidates(self, item, artist, title): self._log.debug("API Error: {0} (query: {1})", e, url) return [] - def album_for_id(self, album_id): + def album_for_id(self, album_id: Optional[str]) -> Optional[AlbumInfo]: self._log.debug("Searching for album {0}", album_id) - language = self.get_lang(config["import"]["languages"].as_str_seq()) - url = urljoin( + language: str = self.get_lang(config["import"]["languages"].as_str_seq()) + url: str = urljoin( self.instance.api_url, f"albums/{album_id}" + "?fields=Artists,Discs,Tags,Tracks,WebLinks" @@ -267,21 +301,21 @@ def album_for_id(self, album_id): return self.album_info(result, search_lang=language) else: self._log.debug("API Error: Returned empty page (query: {0})", url) - return + return None except HTTPError as e: self._log.debug("API Error: {0} (query: {1})", e, url) - return + return None - def track_for_id(self, track_id): + def track_for_id(self, track_id: Optional[str]) -> Optional[TrackInfo]: self._log.debug("Searching for track {0}", track_id) - language = self.get_lang(config["import"]["languages"].as_str_seq()) - url = urljoin( + language: str = self.get_lang(config["import"]["languages"].as_str_seq()) + url: str = urljoin( self.instance.api_url, f"songs/{track_id}" + f"?fields={self.get_song_fields()}" + f"&lang={language}", ) - request = Request(url, headers=HEADERS) + request: Request = Request(url, headers=HEADERS) try: with urlopen(request) as result: if result: @@ -289,19 +323,21 @@ def track_for_id(self, track_id): return self.track_info(result, search_lang=language) else: self._log.debug("API Error: Returned empty page (query: {0})", url) - return + return None except HTTPError as e: self._log.debug("API Error: {0} (query: {1})", e, url) - return + return None - def album_info(self, release, search_lang=None): - discs = len(set([x["discNumber"] for x in release["tracks"]])) + def album_info( + self, release: dict[str, Any], search_lang: Optional[str] = None + ) -> AlbumInfo: + discs: int = len(set([x["discNumber"] for x in release["tracks"]])) if not release["discs"]: release["discs"] = [ {"discNumber": x + 1, "name": "CD", "mediaType": "Audio"} for x in range(discs) ] - ignored_discs = [] + ignored_discs: list[int] = [] for x in release["discs"]: if ( x["mediaType"] == "Video" @@ -319,16 +355,16 @@ def album_info(self, release, search_lang=None): key=lambda y: y["trackNumber"], )["trackNumber"] - va = release.get("discType", "") == "Compilation" - album = release["name"] - album_id = str(release["id"]) + va: bool = release.get("discType", "") == "Compilation" + album: Optional[str] = release["name"] + album_id: Optional[str] = str(release["id"]) artist_categories, artist = self.get_artists( release["artists"], album=True, comp=va ) if artist == "Various artists": va = True - artists = [] - artists_ids = [] + artists: list[str] = [] + artists_ids: list[str] = [] for category in artist_categories.values(): artists.extend( [artist for artist in category.keys() if artist not in artists] @@ -336,41 +372,43 @@ def album_info(self, release, search_lang=None): artists_ids.extend( [id for id in category.values() if id not in artists_ids] ) + artist_id: Optional[str] = None try: artist_id = artists_ids[0] except IndexError: - artist_id = None + pass tracks, script, language = self.get_album_track_infos( release["tracks"], release["discs"], ignored_discs, search_lang ) - asin = None + asin: Optional[str] = None for x in release.get("webLinks", []): if not x["disabled"] and match( "Amazon( \\((LE|RE|JP|US)\\).*)?$", x["description"] ): - asin = search("\\/dp\\/(.+?)(\\/|$)", x["url"]) + asin = str(search("\\/dp\\/(.+?)(\\/|$)", x["url"])) if asin: asin = asin[1] break - albumtype = release.get("discType", "").lower() - albumtypes = [albumtype] - date = release.get("releaseDate", {}) - year = date.get("year", None) - month = date.get("month", None) - day = date.get("day", None) - label = None + albumtype: Optional[str] = release.get("discType", "").lower() + albumtypes: Optional[list[str]] = [albumtype] if albumtype else None + date: dict[str, Any] = release.get("releaseDate", {}) + year: Optional[int] = date.get("year", None) + month: Optional[int] = date.get("month", None) + day: Optional[int] = date.get("day", None) + label: Optional[str] = None for x in release.get("artists", []): if "Label" in x.get("categories", ""): label = x["name"] break - mediums = len(release["discs"]) - catalognum = release.get("catalogNumber", None) - genre = self.get_genres(release) + mediums: Optional[int] = len(release["discs"]) + catalognum: Optional[str] = release.get("catalogNumber", None) + genre: Optional[str] = self.get_genres(release) + media: Optional[str] try: media = release["discs"][0]["name"] except IndexError: media = None - data_url = urljoin(self.instance.base_url, f"Al/{album_id}") + data_url: Optional[str] = urljoin(self.instance.base_url, f"Al/{album_id}") return AlbumInfo( album=album, album_id=album_id, @@ -399,19 +437,19 @@ def album_info(self, release, search_lang=None): def track_info( self, - recording, - index=None, - media=None, - medium=None, - medium_index=None, - medium_total=None, - search_lang=None, - ): - title = recording["name"] - track_id = str(recording["id"]) + recording: dict[str, Any], + index: Optional[int] = None, + media: Optional[str] = None, + medium: Optional[int] = None, + medium_index: Optional[int] = None, + medium_total: Optional[int] = None, + search_lang: Optional[str] = None, + ) -> TrackInfo: + title: str = recording["name"] + track_id: str = str(recording["id"]) artist_categories, artist = self.get_artists(recording["artists"]) - artists = [] - artists_ids = [] + artists: list[str] = [] + artists_ids: list[str] = [] for category in artist_categories.values(): artists.extend( [artist for artist in category.keys() if artist not in artists] @@ -419,29 +457,29 @@ def track_info( artists_ids.extend( [id for id in category.values() if id not in artists_ids] ) + artist_id: Optional[str] = None try: artist_id = artists_ids[0] except IndexError: - artist_id = None - arranger = ", ".join(artist_categories["arrangers"]) - composer = ", ".join(artist_categories["composers"]) - lyricist = ", ".join(artist_categories["lyricists"]) - length = recording.get("lengthSeconds", 0) - data_url = urljoin(self.instance.base_url, f"S/{track_id}") - bpm = str(recording.get("maxMilliBpm", 0) // 1000) - genre = self.get_genres(recording) + pass + arranger: str = ", ".join(artist_categories["arrangers"]) + composer: str = ", ".join(artist_categories["composers"]) + lyricist: str = ", ".join(artist_categories["lyricists"]) + length: float = recording.get("lengthSeconds", 0) + data_url: str = urljoin(self.instance.base_url, f"S/{track_id}") + bpm: str = str(recording.get("maxMilliBpm", 0) // 1000) + genre: str = self.get_genres(recording) script, language, lyrics = self.get_lyrics( recording.get("lyrics", {}), search_lang ) + original_day: Optional[int] = None + original_month: Optional[int] = None + original_year: Optional[int] = None if "publishDate" in recording: - date = datetime.fromisoformat(recording["publishDate"][:-1]) + date: datetime = datetime.fromisoformat(recording["publishDate"][:-1]) original_day = date.day original_month = date.month original_year = date.year - else: - original_day = None - original_month = None - original_year = None return TrackInfo( title=title, track_id=track_id, @@ -471,16 +509,22 @@ def track_info( original_year=original_year, ) - def get_album_track_infos(self, tracks, discs, ignored_discs, search_lang): - track_infos = [] - script = None - language = None + def get_album_track_infos( + self, + tracks: list[TrackInfo], + discs: list[dict[str, Any]], + ignored_discs: list[int], + search_lang: Optional[str], + ) -> tuple[list[TrackInfo], Optional[str], Optional[str]]: + track_infos: list[TrackInfo] = [] + script: Optional[str] = None + language: Optional[str] = None for index, track in enumerate(tracks): if track["discNumber"] in ignored_discs or "song" not in track: continue - format = discs[track["discNumber"] - 1]["name"] - total = discs[track["discNumber"] - 1]["total"] - track_info = self.track_info( + format: Optional[str] = discs[track["discNumber"] - 1]["name"] + total: Optional[int] = discs[track["discNumber"] - 1]["total"] + track_info: TrackInfo = self.track_info( track["song"], index=index + 1, media=format, @@ -503,11 +547,13 @@ def get_album_track_infos(self, tracks, discs, ignored_discs, search_lang): track.language = language return track_infos, script, language - def get_song_fields(self): + def get_song_fields(self) -> str: return "Artists,Tags,Bpm,Lyrics" - def get_artists(self, artists, album=False, comp=False): - out = { + def get_artists( + self, artists: list[dict[str, Any]], album=False, comp=False + ) -> tuple[dict[str, dict[str, Any]], str]: + out: dict[str, dict[str, Any]] = { "producers": {}, "circles": {}, "vocalists": {}, @@ -516,26 +562,31 @@ def get_artists(self, artists, album=False, comp=False): "lyricists": {}, } for artist in artists: - parent = artist.get("artist", {}) + parent: dict[str, str] = artist.get("artist", {}) + name: str + id: str if parent: name = parent.get("name", "") id = str(parent.get("id", "")) else: name = artist.get("name", "") id = "" - if "Producer" in artist["categories"] or "Band" in artist["categories"]: + categories: str = artist["categories"] + effectiveRoles: str = artist["effectiveRoles"] + if "Producer" in categories or "Band" in categories: if "Default" in artist["effectiveRoles"]: artist["effectiveRoles"] += ",Arranger,Composer,Lyricist" + effectiveRoles = artist["effectiveRoles"] out["producers"][name] = id - if "Circle" in artist["categories"]: + if "Circle" in categories: out["circles"][name] = id - if "Arranger" in artist["effectiveRoles"]: + if "Arranger" in effectiveRoles: out["arrangers"][name] = id - if "Composer" in artist["effectiveRoles"]: + if "Composer" in effectiveRoles: out["composers"][name] = id - if "Lyricist" in artist["effectiveRoles"]: + if "Lyricist" in effectiveRoles: out["lyricists"][name] = id - if "Vocalist" in artist["categories"] and not artist["isSupport"]: + if "Vocalist" in categories and not artist["isSupport"]: out["vocalists"][name] = id if not out["producers"] and out["vocalists"]: out["producers"] = out["vocalists"] @@ -547,19 +598,19 @@ def get_artists(self, artists, album=False, comp=False): out["lyricists"] = out["producers"] if comp or len(out["producers"]) > 5: return out, "Various artists" - artistString = ", ".join( + artistString: str = ", ".join( list(out["producers"].keys()) + list(out["circles"].keys()) ) if not album and out["vocalists"]: - featuring = [ + featuring: list[str] = [ name for name in out["vocalists"] if name not in out["producers"] ] if featuring: artistString += " feat. " + ", ".join(featuring) return out, artistString - def get_genres(self, info): - genres = [] + def get_genres(self, info: dict[str, Any]) -> str: + genres: list[str] = [] for tag in sorted(info.get("tags", {}), reverse=True, key=lambda x: x["count"]): if tag["tag"]["categoryName"] == "Genres": genres.append(tag["tag"]["name"].title()) @@ -576,10 +627,12 @@ def get_lang(self, languages): return "English" return "English" - def get_lyrics(self, lyrics, language): - out_script = None - out_language = None - out_lyrics = None + def get_lyrics( + self, lyrics: list[dict[str, Any]], language: Optional[str] + ) -> tuple[Optional[str], Optional[str], Optional[str]]: + out_script: Optional[str] = None + out_language: Optional[str] = None + out_lyrics: Optional[str] = None for x in lyrics: if "en" in x["cultureCodes"]: if x["translationType"] == "Original": @@ -591,7 +644,7 @@ def get_lyrics(self, lyrics, language): if x["translationType"] == "Original": out_script = "Jpan" out_language = "jpn" - if not self.config["translated_lyrics"] and language == "Japanese": + if not (self.config["translated_lyrics"] and language == "Japanese"): out_lyrics = x["value"] if ( not self.config["translated_lyrics"] @@ -603,7 +656,9 @@ def get_lyrics(self, lyrics, language): out_lyrics = self.get_fallback_lyrics(lyrics, language) return out_script, out_language, out_lyrics - def get_fallback_lyrics(self, lyrics, language): + def get_fallback_lyrics( + self, lyrics: list[dict[str, Any]], language: Optional[str] + ) -> Optional[str]: if language == "English": for x in lyrics: if "en" in x["cultureCodes"]: