diff --git a/dockerignore b/.dockerignore similarity index 58% rename from dockerignore rename to .dockerignore index be89f67..0ad3229 100644 --- a/dockerignore +++ b/.dockerignore @@ -1,2 +1,4 @@ .git +.github +tests/ .deploy_tmp_path \ No newline at end of file diff --git a/Dockerfile b/Dockerfile index 6c97445..e13e5e6 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,14 +1,32 @@ -# 基于Python3.9 alpine镜像 -FROM python:3.9.17-alpine3.18 +# 第一阶段:安装GCC +FROM python:3.12.1-alpine as gcc_installer + +# 安装GCC +RUN apk add --no-cache gcc musl-dev + +# 第二阶段:安装Python依赖 +FROM gcc_installer as requirements_installer # 设置工作目录 WORKDIR /app -# 将源代码复制到Docker镜像中 -COPY ./LrcApi /app +# 只复制 requirements.txt,充分利用 Docker 缓存层 +COPY ./LrcApi/requirements.txt /app/ + +# 安装Python依赖 +RUN pip install --no-user --prefix=/install -r requirements.txt + +# 第三阶段:运行环境 +FROM python:3.12.1-alpine -# 安装Python项目依赖 -RUN pip install -r /app/requirements.txt +# 设置工作目录 +WORKDIR /app + +# 复制Python依赖 +COPY --from=requirements_installer /install /usr/local + +# 复制项目代码 +COPY ./LrcApi /app # 设置启动命令 -CMD ["python", "/app/app.py"] \ No newline at end of file +CMD ["python", "/app/app.py"] diff --git a/app.py b/app.py index b3ebc4e..1f456a4 100644 --- a/app.py +++ b/app.py @@ -1,17 +1,18 @@ -import hashlib -import logging import os -from urllib.parse import unquote_plus - import shutil +import hashlib +import logging import requests -from flask import Flask, request, abort, redirect, send_from_directory, Response, jsonify, render_template_string, \ +import concurrent.futures + +from flask import Flask, Blueprint, request, abort, redirect, send_from_directory, jsonify, render_template_string, \ make_response from flask_caching import Cache from waitress import serve -import concurrent.futures +from urllib.parse import unquote_plus -from mod import search, lrc, tags +from mod import search, lrc +from mod import tag from mod.auth import webui, cookie from mod.auth.authentication import require_auth from mod.args import GlobalArgs @@ -19,6 +20,11 @@ args = GlobalArgs() app = Flask(__name__) +v1_bp = Blueprint('v1', __name__, url_prefix='/api/v1') + +# Blueprint直接复制app配置项 +v1_bp.config = app.config.copy() + # 缓存逻辑 cache_dir = './flask_cache' try: @@ -26,6 +32,7 @@ shutil.rmtree(cache_dir) except FileNotFoundError: pass +# 定义缓存逻辑为本地文件缓存,目录为cache_dir = './flask_cache' cache = Cache(app, config={ 'CACHE_TYPE': 'filesystem', 'CACHE_DIR': cache_dir @@ -78,6 +85,7 @@ def read_file_with_encoding(file_path, encodings): @app.route('/lyrics', methods=['GET']) +@v1_bp.route('/lyrics/single', methods=['GET']) @cache.cached(timeout=86400, key_prefix=make_cache_key) def lyrics(): match require_auth(request=request): @@ -97,7 +105,7 @@ def lyrics(): if file_content is not None: return lrc.standard(file_content) try: - lrc_in = tags.r_lrc(path) + lrc_in = tag.tout(path).get("lyrics", "") if type(lrc_in) is str and len(lrc_in) > 0: return lrc_in except: @@ -117,6 +125,7 @@ def lyrics(): @app.route('/jsonapi', methods=['GET']) +@v1_bp.route('/lyrics/advance', methods=['GET']) @cache.cached(timeout=86400, key_prefix=make_cache_key) def lrc_json(): match require_auth(request=request): @@ -126,8 +135,8 @@ def lrc_json(): return render_template_string(webui.error()), 421 if not bool(request.args): abort(404, "请携带参数访问") - path = unquote_plus(request.args.get('path')) - title = unquote_plus(request.args.get('title')) + path = unquote_plus(request.args.get('path', '')) + title = unquote_plus(request.args.get('title', '')) artist = unquote_plus(request.args.get('artist', '')) album = unquote_plus(request.args.get('album', '')) response = [] @@ -154,10 +163,13 @@ def lrc_json(): "artist": artist, "lyrics": i }) + _response = jsonify(response) + _response.headers['Content-Type'] = 'application/json; charset=utf-8' return jsonify(response) @app.route('/cover', methods=['GET']) +@v1_bp.route('/cover', methods=['GET']) @cache.cached(timeout=86400, key_prefix=make_cache_key) def cover_api(): req_args = {key: request.args.get(key) for key in request.args} @@ -185,9 +197,10 @@ def validate_json_structure(data): return True -@app.route('/tag', methods=['POST']) +@app.route('/tag', methods=['GET', 'POST', 'PUT']) +@v1_bp.route('/tag', methods=['GET', 'POST', 'PUT']) def setTag(): - match require_auth(request=request): + match require_auth(request=request, permission='rw'): case -1: return render_template_string(webui.error()), 403 case -2: @@ -205,41 +218,68 @@ def setTag(): return "File not found.", 404 supported_tags = { - "title": "title", - "artist": "artist", - "album": "album", - "lyrics": "lyrics" + "tracktitle": {"allow": (str, bool, type(None)), "caption": "Track Title"}, + "artist": {"allow": (str, bool, type(None)), "caption": "Artists"}, + "album": {"allow": (str, bool, type(None)), "caption": "Albums"}, + "year": {"allow": (int, bool, type(None)), "caption": "Album year"}, + "lyrics": {"allow": (str, bool, type(None)), "caption": "Lyrics text"} } - - tags_to_set = {supported_tags[key]: value for key, value in musicData.items() if key in supported_tags} - result = tags.w_file(audio_path, tags_to_set) - if result == 0: - return "OK", 200 - elif result == -1: - return "Failed to write lyrics", 523 - elif result == -2: - return "Failed to write tags", 524 - else: - return "Unknown error", 525 + if "title" in musicData and "tracktitle" not in musicData: + musicData["tracktitle"] = musicData["title"] + tags_to_set = {} + for key, value in musicData.items(): + if key in supported_tags and isinstance(value, supported_tags[key]["allow"]): + tags_to_set[key] = value + try: + tag.tin(tags=tags_to_set, file=audio_path) + except TypeError as e: + return str(e), 524 + except FileNotFoundError as e: + return str(e), 404 + except Exception as e: + return str(e), 500 + return "Succeed", 200 @app.route('/') def redirect_to_welcome(): + """ + 重定向至/src,显示主页 + :return: + """ return redirect('/src') @app.route('/favicon.ico') def favicon(): + """ + favicon位置,返回图片 + :return: + """ return send_from_directory('src', 'img/Logo_Design.svg') @app.route('/src') def return_index(): + """ + 显示主页 + :return: index page + """ return send_from_directory('src', 'index.html') @app.route('/src/') def serve_file(filename): + """ + 路由/src/ + 路径下的静态资源 + :param filename: + :return: + """ + FORBIDDEN_EXTENSIONS = ('.exe', '.bat', '.dll', '.sh', '.so', '.php', '.sql', '.db', '.mdb', '.gz', '.tar', '.bak', + '.tmp', '.key', '.pem', '.crt', '.csr', '.log') + if filename.lower().endswith(FORBIDDEN_EXTENSIONS): + abort(404) try: return send_from_directory('src', filename) except FileNotFoundError: @@ -248,6 +288,11 @@ def serve_file(filename): @app.route('/login') def login_check(): + """ + 登录页面 + 未登录时返回页面,已登录时重定向至主页 + :return: + """ if require_auth(request=request) < 0 and args.auth: return render_template_string(webui.html_login()) @@ -256,6 +301,12 @@ def login_check(): @app.route('/login-api', methods=['POST']) def login_api(): + """ + 登录对接的API + 包括验证和下发Cookie + success提示成功与否 + :return: + """ data = request.get_json() if 'password' in data: pwd = data['password'] @@ -269,12 +320,18 @@ def login_api(): def main(): + # Waitress WSGI 服务器 serve(app, host=args.ip, port=args.port, threads=32, channel_timeout=30) + # Debug服务器 # app.run(host='0.0.0.0', port=args.port) if __name__ == '__main__': + # 日志配置 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger('') logger.info("正在启动服务器") + # 注册 Blueprint 到 Flask 应用 + app.register_blueprint(v1_bp) + # 启动 main() diff --git a/mod/music_tag/__init__.py b/mod/music_tag/__init__.py new file mode 100644 index 0000000..3ad7bf8 --- /dev/null +++ b/mod/music_tag/__init__.py @@ -0,0 +1,81 @@ +#!/usr/bin/env python +# This whole module is just id3 tag part of MusicBrainz Picard +# The idea is to get Musicbrainz's layer on top of mutagen without +# dependancies (like qt) + +import logging +import os + +import mutagen + +from . import file +from . import util +from . import aac +from . import aiff +from . import apev2 +from . import asf +from . import dsf +from . import flac +from . import id3 +from . import mp4 +from . import smf +from . import vorbis +from . import wave + +from .file import Artwork, MetadataItem, NotAppendable, AudioFile + +__version__ = """0.4.3""" + +logger = logging.getLogger("music_tag") +log = logger + + +def _subclass_spider_dfs(kls, _lst=None): + if _lst is None: + _lst = [] + for sub in kls.__subclasses__(): + _subclass_spider_dfs(sub, _lst=_lst) + _lst.append(kls) + return _lst + + +def load_file(file_spec, err='raise'): + if isinstance(file_spec, mutagen.FileType): + mfile = file_spec + filename = mfile.filename + else: + filename = file_spec + if not os.path.exists(filename): + if os.path.exists(os.path.expanduser(os.path.expandvars(filename))): + filename = os.path.expanduser(os.path.expandvars(filename)) + elif os.path.exists(os.path.expanduser(filename)): + filename = os.path.expanduser(filename) + mfile = mutagen.File(filename, easy=False) + + ret = None + + for kls in _subclass_spider_dfs(file.AudioFile): + # print("checking against:", kls, kls.mutagen_kls) + if kls.mutagen_kls is not None and isinstance(mfile, kls.mutagen_kls): + ret = kls(filename, _mfile=mfile) + break + + if ret is None and err == 'raise': + raise NotImplementedError("Mutagen type {0} not implemented" + "".format(type(mfile))) + + return ret + + +__all__ = ['file', 'util', + 'aac', 'aiff', 'apev2', 'asf', 'dsf', 'flac', + 'id3', 'mp4', 'smf', 'vorbis', 'wave', + 'logger', 'log', + 'Artwork', 'MetadataItem', 'NotAppendable', + 'AudioFile', + 'load_file', + ] + +## +## EOF +## diff --git a/mod/music_tag/__main__.py b/mod/music_tag/__main__.py new file mode 100644 index 0000000..9badef0 --- /dev/null +++ b/mod/music_tag/__main__.py @@ -0,0 +1,178 @@ +#!/usr/bin/env python +# pylint: disable=unused-import +r"""CLI for music_tag + +Examples + + Printing tags: + + $ # Print tags from all audio files in sample directory + $ python -m music_tag --print ./sample + + $ # Print specific tags from all audio files in sample directory + $ python -m music_tag --print --tags="Title : Album" ./sample + + $ # Write tags from all audio files in sample directory to a csv file + $ python -m music_tag --to-csv tags.csv ./sample + + $ # Write specific tags from all audio files in sample directory to a csv file + $ python -m music_tag --tags="Title : Album" --to-csv tags.csv ./sample + + Setting tags: + + $ # Set a couple tags for multiple files + $ python -m music_tag --set "genre:Pop" --set "comment:cli test" \ + $ ./sample/440Hz.aac ./sample/440Hz.flac + + $ # Write tags from csv file to audio files (assuming file paths in + $ # the csv file are relative to the sample directory + $ python -m music_tag --from-csv tags.csv +""" + +from __future__ import print_function +import argparse +from argparse import RawTextHelpFormatter +import csv +import fnmatch +import os +import sys + +from .. import music_tag + + +_audio_pattern = ['*.wav', '*.aac', '*.aiff', '*.dsf', '*.flac', + '*.m4a', '*.mp3', '*.ogg', '*.opus', '*.wv'] + +_default_tags = ('Disc Number : Total Discs : Track Number : Total Tracks ' + ': Title : Artist : Album : Album Artist ' + ': Year : Genre : Comment') + + +def _expand_files(files): + ret = [] + for f in files: + if os.path.isdir(f): + # walk directory looking for music files + for root, dirs, files in os.walk(f): + for pattern in _audio_pattern: + for filename in fnmatch.filter(files, pattern): + ret.append(os.path.join(root, filename)) + else: + ret.append(f) + return ret + + +def _main(): + parser = argparse.ArgumentParser(prog='python -m music_tag', + description=__doc__, + formatter_class=RawTextHelpFormatter) + action_group = parser.add_mutually_exclusive_group(required=True) + action_group.add_argument('--version', action='version', + version='music-tag ' + music_tag.__version__) + action_group.add_argument('--print', action='store_true', + help='print tags') + action_group.add_argument('--set', action='append', default=[], + help='set tag') + action_group.add_argument('--to-csv', action='store', + help='write tags to csv file') + action_group.add_argument('--from-csv', action='store', + help='write tags from csv file') + + parser.add_argument('--tags', action='store', default=_default_tags, + help='tags to print') + parser.add_argument('-I', '--ignore-missing', action='store_true', + help='ignore missing audio files when using from-csv') + parser.add_argument('-D', '--csv-dialect', action='store', default='excel', + help='csv file dialect (excel | excel_tab | unix)') + parser.add_argument('--resolve', action='store_true', + help='Use resolve to discern missing tags') + parser.add_argument('files', nargs='*') + + args = parser.parse_args() + + + if args.print: + print() + fnames = _expand_files(args.files) + tags = [t.strip() for t in args.tags.split(':')] + + for fname in fnames: + f = music_tag.load_file(fname) + print(f.info(tags=tags, show_empty=True, resolve=args.resolve)) + print() + + if args.set: + set_key_vals = [s.split(':') for s in args.set] + set_key_vals = [(kv[0], ':'.join(kv[1:])) for kv in set_key_vals] + + fnames = _expand_files(args.files) + for fname in fnames: + mt_f = music_tag.load_file(fname) + for kv in set_key_vals: + key, val = kv[0], kv[1] + if val: + mt_f[key] = val + else: + del mt_f[key] + mt_f.save() + + if args.to_csv: + fnames = _expand_files(args.files) + tags = [t.strip() for t in args.tags.split(':')] + + with open(args.to_csv, 'w', newline='') as fout: + csvwriter = csv.writer(fout, delimiter=',', quotechar='"', + dialect=args.csv_dialect, + quoting=csv.QUOTE_MINIMAL) + csvwriter.writerow(tags + ['filename']) + for fname in fnames: + mt_f = music_tag.load_file(fname) + if args.resolve: + row = [mt_f.resolve(k) for k in tags] + [fname] + else: + row = [mt_f[k] for k in tags] + [fname] + csvwriter.writerow(row) + + if args.from_csv: + pth0 = '' + if args.files and os.path.isdir(args.files[0]): + pth0 = args.files[0] + + with open(args.from_csv, newline='') as fin: + csvreader = csv.reader(fin, delimiter=',', quotechar='"', + dialect=args.csv_dialect) + tags = [] + for row in csvreader: + if not tags: + tags = row[:-1] + else: + fname = row[-1] + if pth0: + fname = os.path.join(pth0, fname) + + if os.path.isfile(fname): + print('editing', fname) + else: + if args.ignore_missing: + print('missing file', fname, '; continuing anyway') + continue + else: + print('missing file', fname, '; stopping now') + return 1 + + mt_f = music_tag.load_file(fname) + for key, val in zip(tags, row[:-1]): + if val: + mt_f[key] = val + else: + del mt_f[key] + mt_f.save() + + return 0 + +if __name__ == "__main__": + sys.exit(_main()) + +## +## EOF +## diff --git a/mod/music_tag/aac.py b/mod/music_tag/aac.py new file mode 100644 index 0000000..40cfd03 --- /dev/null +++ b/mod/music_tag/aac.py @@ -0,0 +1,16 @@ +#!/usr/bin/env python +# coding: utf-8 + +import mutagen.aac + +from .file import TAG_MAP_ENTRY +from .apev2 import Apev2File + + +class AacFile(Apev2File): + tag_format = "AAC" + mutagen_kls = mutagen.aac.AAC + + _TAG_MAP = Apev2File._TAG_MAP.copy() + _TAG_MAP.update({ + }) diff --git a/mod/music_tag/aiff.py b/mod/music_tag/aiff.py new file mode 100644 index 0000000..2808f30 --- /dev/null +++ b/mod/music_tag/aiff.py @@ -0,0 +1,22 @@ +#!/usr/bin/env python +# coding: utf-8 + +import mutagen.aiff + +from .file import TAG_MAP_ENTRY +from .id3 import Id3File + + +class AiffFile(Id3File): + tag_format = "AIFF" + mutagen_kls = mutagen.aiff.AIFF + + def __init__(self, filename, **kwargs): + super(AiffFile, self).__init__(filename, **kwargs) + + self.tag_map = self.tag_map.copy() + self.tag_map.update({ + '#codec': TAG_MAP_ENTRY(getter=lambda afile, norm_key: 'aiff', + type=str), + '#bitspersample': TAG_MAP_ENTRY(getter='sample_size', type=int), + }) diff --git a/mod/music_tag/apev2.py b/mod/music_tag/apev2.py new file mode 100644 index 0000000..0ecff1e --- /dev/null +++ b/mod/music_tag/apev2.py @@ -0,0 +1,153 @@ +#!/usr/bin/env python +# coding: utf-8 + +import base64 + +import mutagen.apev2 +import mutagen.wavpack +import mutagen.musepack +import mutagen.monkeysaudio +import mutagen.optimfrog +from mutagen.id3 import PictureType + +from . import util +from .file import Artwork, AudioFile, MetadataItem, TAG_MAP_ENTRY + + +# FIXME: find more complete mapping between id3 tag pic types and apev2 tags +pic_type2tag = { + PictureType.COVER_FRONT: 'Cover Art (Front)', + PictureType.COVER_BACK: 'Cover Art (Back)', +} +pic_tag2type = {} +for key, val in pic_type2tag.items(): + pic_tag2type[val] = key +del key, val + + +def get_tracknum(afile, norm_key): + return util.get_easy_tracknum(afile, norm_key, _tag_name='Track') +def set_tracknum(afile, norm_key, val): + return util.set_easy_tracknum(afile, norm_key, val, _tag_name='Track') +def get_totaltracks(afile, norm_key): + return util.get_easy_totaltracks(afile, norm_key, _tag_name='Track') +def set_totaltracks(afile, norm_key, val): + return util.set_easy_totaltracks(afile, norm_key, val, _tag_name='Track') + +def get_discnum(afile, norm_key): + return util.get_easy_discnum(afile, norm_key, _tag_name='Disc') +def set_discnum(afile, norm_key, val): + return util.set_easy_discnum(afile, norm_key, val, _tag_name='Disc') +def get_totaldiscs(afile, norm_key): + return util.get_easy_totaldiscs(afile, norm_key, _tag_name='Disc') +def set_totaldiscs(afile, norm_key, val): + return util.set_easy_totaldiscs(afile, norm_key, val, _tag_name='Disc') + + +def get_pictures(afile, norm_key): + artworks = [] + for pic_tag, pic_type in pic_tag2type.items(): + if pic_tag in afile.mfile.tags: + p = afile.mfile.tags[pic_tag].value + try: + artwork = Artwork(p, pic_type=pic_type) + except OSError: + artwork = Artwork(p.split(b'\0', 1)[1], pic_type=pic_type) + artworks.append(artwork) + return MetadataItem(Artwork, None, artworks) + +def set_pictures(afile, norm_key, artworks): + for art in artworks.values: + pic_tag = pic_type2tag[art.pic_type] + raw = (pic_tag + '.jpg').encode('ascii') + b'\0' + art.raw + afile.mfile.tags[pic_tag] = raw + + +class Apev2File(AudioFile): + tag_format = "APEv2" + mutagen_kls = mutagen.apev2.APEv2File + + _TAG_MAP = { + 'tracktitle': TAG_MAP_ENTRY(getter='Title', setter='Title', type=str), + 'artist': TAG_MAP_ENTRY(getter='Artist', setter='Artist', type=str), + 'album': TAG_MAP_ENTRY(getter='Album', setter='Album', type=str), + 'albumartist': TAG_MAP_ENTRY(getter='Album Artist', setter='Album Artist', + type=str), + 'composer': TAG_MAP_ENTRY(getter='Composer', setter='Composer', type=str), + 'tracknumber': TAG_MAP_ENTRY(getter=get_tracknum, + setter=set_tracknum, + remover='Track', + type=int), + 'totaltracks': TAG_MAP_ENTRY(getter=get_totaltracks, + setter=set_totaltracks, + remover='Track', + type=int), + 'discnumber': TAG_MAP_ENTRY(getter=get_discnum, + setter=set_discnum, + remover='Disc', + type=int), + 'totaldiscs': TAG_MAP_ENTRY(getter=get_totaldiscs, + setter=set_totaldiscs, + remover='Disc', + type=int), + 'genre': TAG_MAP_ENTRY(getter='Genre', setter='Genre', type=str), + 'year': TAG_MAP_ENTRY(getter='Year', setter='Year', type=int, + sanitizer=util.sanitize_year), + 'comment': TAG_MAP_ENTRY(getter='Comment', setter='Comment', type=str), + 'lyrics': TAG_MAP_ENTRY(getter='Lyrics', setter='Lyrics', type=str), + 'isrc': TAG_MAP_ENTRY(getter='ISRC', setter='ISRC', type=str), + 'compilation': TAG_MAP_ENTRY(getter='Compilation', setter='Compilation', + type=int, sanitizer=util.sanitize_bool), + + 'artwork': TAG_MAP_ENTRY(getter=get_pictures, setter=set_pictures, + remover=list(pic_tag2type.keys()), + type=Artwork), + + '#codec': TAG_MAP_ENTRY(getter=lambda afile, norm_key: None, + type=str), + '#channels': TAG_MAP_ENTRY(getter=lambda afile, norm_key: None, + type=int), + '#bitspersample': TAG_MAP_ENTRY(getter=lambda afile, norm_key: None, + type=int), + '#samplerate': TAG_MAP_ENTRY(getter=lambda afile, norm_key: None, + type=int), + } + + def _ft_getter(self, key): + val = self.mfile.tags.get(key, None) + if val is not None: + val = str(val) + return val + + def _ft_setter(self, key, md_val, appendable=True): + self.mfile.tags[key] = str(md_val) + + +class WavePackFile(Apev2File): + tag_format = "WavPack" + mutagen_kls = mutagen.wavpack.WavPack + + _TAG_MAP = Apev2File._TAG_MAP.copy() + _TAG_MAP.update({ + '#codec': TAG_MAP_ENTRY(getter=lambda afile, norm_key: 'WavePack', + type=str), + '#bitrate': TAG_MAP_ENTRY(getter=lambda afile, norm_key: None, + type=int), + '#bitspersample': TAG_MAP_ENTRY(getter=lambda afile, norm_key: None, + type=int), + }) + + +class MusepackFile(Apev2File): + tag_format = "Musepack" + mutagen_kls = mutagen.musepack.Musepack + + +class MonkeysAudioFile(Apev2File): + tag_format = "MonkeysAudio" + mutagen_kls = mutagen.monkeysaudio.MonkeysAudio + + +class OptimFrogFile(Apev2File): + tag_format = "OptimFROG" + mutagen_kls = mutagen.optimfrog.OptimFROG diff --git a/mod/music_tag/asf.py b/mod/music_tag/asf.py new file mode 100644 index 0000000..8b13774 --- /dev/null +++ b/mod/music_tag/asf.py @@ -0,0 +1,79 @@ +#!/usr/bin/env python +# coding: utf-8 + +# asf is a microsoft format (wma, wmv, etc.) + +import mutagen.asf +from mutagen.id3 import PictureType + +from . import util + +from .file import AudioFile, TAG_MAP_ENTRY, Artwork, MetadataItem + +pic_type2tag = { + PictureType.COVER_FRONT: 'Cover Art (Front)', + PictureType.COVER_BACK: 'Cover Art (Back)', +} +pic_tag2type = {} +for key, val in pic_type2tag.items(): + pic_tag2type[val] = key +del key, val + + +def get_pictures(afile, norm_key): + artworks = [] + if "WM/Picture" in afile.mfile.tags: + p = afile.mfile.tags["WM/Picture"][0].value + if not isinstance(p, bytes): + p = eval(p) + try: + artwork = Artwork(p) + except OSError: + artwork = Artwork(p.split(b'\0', 1)[1]) + artworks.append(artwork) + return MetadataItem(Artwork, None, artworks) + + +def set_pictures(afile, norm_key, artworks): + for art in artworks.values: + pic_tag = "WM/Picture" + raw = (pic_tag + '.jpg').encode('ascii') + b'\0' + art.raw + afile.mfile.tags[pic_tag] = raw + + +class AsfFile(AudioFile): + tag_format = "ASF" + mutagen_kls = mutagen.asf.ASF + _TAG_MAP = { + 'tracktitle': TAG_MAP_ENTRY(getter='Title', setter='Title', type=str), + 'artist': TAG_MAP_ENTRY(getter='Author', setter='Author', type=str), + 'album': TAG_MAP_ENTRY(getter='WM/AlbumTitle', setter='WM/AlbumTitle', type=str), + 'albumartist': TAG_MAP_ENTRY(getter='albumartist', setter='albumartist', + type=str), + 'composer': TAG_MAP_ENTRY(getter='composer', setter='composer', type=str), + 'tracknumber': TAG_MAP_ENTRY(getter='WM/TrackNumber', setter='WM/TrackNumber', + type=str), + 'totaltracks': TAG_MAP_ENTRY(getter='tracktotal', setter='tracktotal', + type=int), + 'discnumber': TAG_MAP_ENTRY(getter='discnumber', setter='discnumber', + type=int), + 'totaldiscs': TAG_MAP_ENTRY(getter='disctotal', setter='disctotal', + type=int), + 'genre': TAG_MAP_ENTRY(getter='WM/Genre', setter='WM/Genre', type=str), + 'year': TAG_MAP_ENTRY(getter=('TDOR', 'originaldate'), + setter=('TDOR', 'originaldate'), + type=int, sanitizer=util.sanitize_year), + 'lyrics': TAG_MAP_ENTRY(getter='lyrics-eng', setter='lyrics-eng', type=str), + 'isrc': TAG_MAP_ENTRY(getter='isrc', setter='isrc', type=str), + 'comment': TAG_MAP_ENTRY(getter='Description', setter='Description', type=str), + 'compilation': TAG_MAP_ENTRY(getter='compilation', setter='compilation', + type=int, sanitizer=util.sanitize_bool), + 'artwork': TAG_MAP_ENTRY(getter=get_pictures, setter=set_pictures, + remover=list(pic_tag2type.keys()), + type=Artwork), + '#codec': TAG_MAP_ENTRY(getter=lambda afile, norm_key: 'flac', + type=str), + } + + def __init__(self, filename, **kwargs): + super(AsfFile, self).__init__(filename, **kwargs) diff --git a/mod/music_tag/dsf.py b/mod/music_tag/dsf.py new file mode 100644 index 0000000..df5a8ee --- /dev/null +++ b/mod/music_tag/dsf.py @@ -0,0 +1,21 @@ +#!/usr/bin/env python +# coding: utf-8 + +import mutagen.dsf + +from .file import TAG_MAP_ENTRY +from .id3 import Id3File + + +class DsfFile(Id3File): + tag_format = "DSF" + mutagen_kls = mutagen.dsf.DSF + + def __init__(self, filename, **kwargs): + super(DsfFile, self).__init__(filename, **kwargs) + + self.tag_map = self.tag_map.copy() + self.tag_map.update({ + '#codec': TAG_MAP_ENTRY(getter=lambda afile, norm_key: 'dsf', + type=str), + }) diff --git a/mod/music_tag/file.py b/mod/music_tag/file.py new file mode 100644 index 0000000..9ef82dd --- /dev/null +++ b/mod/music_tag/file.py @@ -0,0 +1,616 @@ +#!/usr/bin/env python + +from collections import namedtuple +import hashlib +import io +import shutil + +import mutagen +from mutagen.id3 import PictureType +try: + import PIL + from PIL import Image + BICUBIC = PIL.Image.BICUBIC + _HAS_PIL = True +except ImportError: + BICUBIC = None + _HAS_PIL = False + +from . import util + + +def getter_not_implemented(afile, norm_key): + raise NotImplementedError("getter: '{0}' not implemented for {1}" + "".format(norm_key, type(afile))) + +def setter_not_implemented(afile, norm_key, val): + raise NotImplementedError("setter: '{0}' not implemented for {1}" + "".format(norm_key, type(afile))) + +def albumartist_from_comp(afile, norm_key): + ret = None + if afile.get('compilation', default=None): + ret = 'Various Artists' + return ret + +def comp_from_albumartist(afile, norm_key): + ret = None + albumartist = afile.get('albumartist', default=None) + if albumartist: + albumartist = albumartist.first.lower().replace(' ', '') + if albumartist in ('various', 'variousartists'): + ret = True + else: + ret = False + return ret + + +TAG_MAP_ENTRY = namedtuple('TAG_MAP_ENTRY', ('getter', 'setter', 'remover', + 'type', 'sanitizer')) +TAG_MAP_ENTRY.__new__.__defaults__ = (getter_not_implemented, # getter + setter_not_implemented, # setter + None, # remover + str, # type + None, # sanitizer + ) + + +class MetadataItem(object): + def __init__(self, typ, sanitizer, val): + self._values = None + + if isinstance(val, MetadataItem): + val = val.values + + self.type = typ + self.sanitizer = sanitizer + self.values = val + + @property + def ismissing(self): + return bool(self.values) + @property + def isna(self): + return bool(self.values) + + @property + def values(self): + return self._values + @values.setter + def values(self, val): + if isinstance(val, (list, tuple)): + self._values = list(val) + elif val is None: + self._values = [] + else: + self._values = [val] + + for i, v in enumerate(self._values): + if self.sanitizer is not None: + v = self.sanitizer(v) + if not (self.type is None or v is None or isinstance(v, self.type)): + v = self.type(v) + self._values[i] = v + + @property + def value(self): + try: + if self.type is None: + if len(self.values) == 1: + val = self.values[0] + else: + val = str(self) + else: + val = self.type(self) + except TypeError: + values = self.values + if not values: + raise ValueError("No values exist") + elif len(values) > 1: + raise ValueError("Multiple values exist: {0}".format(repr(values))) + val = values[0] + return val + @property + def val(self): + return self.value + + @property + def first(self): + try: + return self._values[0] + except IndexError: + return None + + def append(self, val): + if self.sanitizer is not None: + val = self.sanitizer(val) + if not (self.type is None or val is None or isinstance(val, self.type)): + val = self.type(val) + + if self._values: + self._values.append(val) + else: + self._values = [val] + + def __len__(self): + return len(self._values) + + def __str__(self): + return ', '.join(str(li) for li in self._values) + + def __int__(self): + if not self._values: + val = 0 + elif len(self._values) == 1: + val = int(self._values[0]) + else: + raise ValueError("Metadata must have 1 value to cast to int") + return val + + def __bool__(self): + return any(self._values) + + def __list__(self): + return list(self._values) + + def __tuple__(self): + return tuple(self._values) + + def __repr__(self): + return ''.format(self.__str__()) + + +class Artwork(object): + def __init__(self, raw, width=None, height=None, fmt=None, depth=None, + pic_type=PictureType.COVER_FRONT): + if isinstance(raw, Artwork): + orig = raw + raw = orig.raw + width, height = orig.width, orig.height + fmt, depth = orig.fmt, orig.depth + pic_type = orig.pic_type + del orig + + if not isinstance(raw, bytes): + raise TypeError("image data must have type 'bytes'") + + self.raw = raw + + if any(v is None for v in (width, height, fmt, depth)): + try: + img = self.image + width = img.width + height = img.height + fmt = img.format.lower() + mode2depth = {'1': 1, 'L': 8, 'P': 8, 'RGB': 24, 'RGBA': 32, + 'CMYK': 32, 'YCbCr': 24, 'I': 32, 'F': 32} + depth = mode2depth[img.mode] + except ImportError: + width = None + height = None + fmt = None + depth = None + + self.width = width + self.height = height + self.depth = depth + self.format = fmt + self.mime = "image/{0}".format(self.format) + + # ``pic_type`` should be one of ``mutagen.id3.PictureType.*`` + self.pic_type = pic_type + + # Image.open(io.BytesIO(self.data)) # for testing + + @property + def data(self): + return self.raw + + @property + def image(self): + img = None + if _HAS_PIL: + img = Image.open(io.BytesIO(self.raw)) + else: + raise ImportError("PIL (Pillow) not installed") + return img + + def thumbnail(self, size, method=BICUBIC): + image = self.image + image.thumbnail(size, method) + return image + + def raw_thumbnail(self, size, method=BICUBIC, format=None, quality=95, + return_info=False): + thumb = self.thumbnail(size, method=method) + if format is None: + format = thumb.format + + with io.BytesIO() as output: + thumb.save(output, format=format, quality=quality) + raw = output.getvalue() + + if return_info: + info = {'width': thumb.width, 'height': thumb.height} + return raw, info + else: + return raw + + def __str__(self): + md5 = hashlib.md5() + md5.update(self.data) + return "{0} {1}x{2} {3}".format(self.mime, self.width, self.height, + md5.hexdigest()) + + +class RawProxy(object): + def __init__(self, parent): + self.parent = parent + + def resolve(self, norm_key, default=None): + return self.parent.resolve(norm_key, default, typeless=True) + + def get(self, norm_key, default=None): + raw_key = norm_key + norm_key = self.parent._normalize_norm_key(norm_key) + if norm_key in self.parent.tag_map: + md_item = self.parent.get(norm_key, default=default, typeless=True) + return md_item + else: + return self.parent.mfile[raw_key] + + def set(self, norm_key, val): + raw_key = norm_key + norm_key = self.parent._normalize_norm_key(norm_key) + if norm_key in self.parent.tag_map: + self.parent.set(norm_key, val, typeless=True) + else: + self.parent.mfile[raw_key] = val + + def __getitem__(self, norm_key): + return self.get(norm_key, default=None) + def __setitem__(self, norm_key, val): + self.set(norm_key, val) + + +class NotAppendable(Exception): + pass + + +class AudioFile(object): + tag_format = "None" + mutagen_kls = None + + appendable = True + + # The _DEFAULT_* attributes should not be overridden in subclasses + _DEFAULT_TAG_ALIASES = { + 'title': 'tracktitle', + 'name': 'tracktitle', + 'disknumber': 'discnumber', + 'totaldisks': 'totaldiscs', + } + + _DEFAULT_TAG_MAP = { + 'tracktitle': TAG_MAP_ENTRY(type=str), + 'artist': TAG_MAP_ENTRY(type=str), + 'album': TAG_MAP_ENTRY(type=str), + 'albumartist': TAG_MAP_ENTRY(type=str), + 'composer': TAG_MAP_ENTRY(type=str), + 'tracknumber': TAG_MAP_ENTRY(type=int), + 'totaltracks': TAG_MAP_ENTRY(type=int), + 'discnumber': TAG_MAP_ENTRY(type=int), + 'totaldiscs': TAG_MAP_ENTRY(type=int), + 'genre': TAG_MAP_ENTRY(type=str), + 'year': TAG_MAP_ENTRY(type=int, sanitizer=util.sanitize_year), + 'compilation': TAG_MAP_ENTRY(type=bool), + 'lyrics': TAG_MAP_ENTRY(type=str), + 'isrc': TAG_MAP_ENTRY(type=str), + 'comment': TAG_MAP_ENTRY(type=str), + + 'artwork': TAG_MAP_ENTRY(type=Artwork), + + '#bitrate': TAG_MAP_ENTRY(getter='bitrate', type=int), + '#codec': TAG_MAP_ENTRY(getter='codec', type=str), + '#length': TAG_MAP_ENTRY(getter='length', type=float), + '#channels': TAG_MAP_ENTRY(getter='channels', type=int), + '#bitspersample': TAG_MAP_ENTRY(getter='bits_per_sample', type=int), + '#samplerate': TAG_MAP_ENTRY(getter='sample_rate', type=int), + } + + _DEFAULT_RESOLVERS = { + 'albumartist': ('albumartist', albumartist_from_comp, 'artist'), + 'artist': ('artist', 'albumartist'), + 'compilation': ('compilation', comp_from_albumartist), + 'discnumber': ('discnumber', + lambda afile, norm_key: 1 + ), + 'totaldiscs': ('totaldiscs', + lambda afile, norm_key: afile.get('discnumber', 1) + ), + } + + _DEFAULT_SINGULAR_KEYS = ['tracknumber', 'totaltracks', + 'discnumber', 'totaldiscs', + 'year', 'compilation', + ] + + # these 3 attributes may be overridden in subclasses + _TAG_ALIASES = {} + _TAG_MAP = {} + _RESOLVERS = {} + _SINGULAR_KEYS = [] + + + def __init__(self, filename, _mfile=None): + self.tag_aliases = self._DEFAULT_TAG_ALIASES.copy() + self.tag_aliases.update(self._TAG_ALIASES) + + self.tag_map = self._DEFAULT_TAG_MAP.copy() + self.tag_map.update(self._TAG_MAP) + + self.resolvers = self._DEFAULT_RESOLVERS.copy() + self.resolvers.update(self._RESOLVERS) + + self.singular_keys = self._DEFAULT_SINGULAR_KEYS.copy() + self.singular_keys += self._SINGULAR_KEYS + + self.filename = filename + if _mfile is None: + self.mfile = mutagen.File(filename) + else: + self.mfile = _mfile + + if self.mfile.tags is None: + self.mfile.add_tags() + + @property + def raw(self): + return RawProxy(self) + + def save(self, filename=None, **kwargs): + """BE CAREFUL, I doubt I did a good job testing tag editing""" + if filename is None: + self.mfile.save(**kwargs) + filename = self.filename + else: + shutil.copyfile(self.filename, filename) + self.mfile.save(filename, **kwargs) + + def _normalize_norm_key(self, norm_key): + norm_key = norm_key.replace(' ', '').replace('_', '').replace('-', '').lower() + if self.tag_aliases and norm_key in self.tag_aliases: + norm_key = self.tag_aliases[norm_key] + return norm_key + + def resolve(self, norm_key, default=None, typeless=False): + norm_key = self._normalize_norm_key(norm_key) + tmap = self.tag_map[norm_key] + md_type = None if typeless else tmap.type + md_sanitizer = None if typeless else tmap.sanitizer + + ret = None + if norm_key in self.resolvers: + for resolver in self.resolvers[norm_key]: + if hasattr(resolver, '__call__'): + ret = resolver(self, norm_key) + else: + ret = self.get(resolver, default=None, _raw_default=True, + typeless=typeless) + if ret is not None: + break + else: + ret = self.get(norm_key, default=None, _raw_default=True, + typeless=typeless) + + if not (ret is None or isinstance(ret, MetadataItem)): + ret = MetadataItem(md_type, md_sanitizer, ret) + + if ret is None: + ret = MetadataItem(md_type, md_sanitizer, default) + + return ret + + def _ft_getter(self, key): + return self.mfile.tags.get(key, None) + + def get(self, norm_key, default=None, _raw_default=False, typeless=False): + norm_key = self._normalize_norm_key(norm_key) + tmap = self.tag_map[norm_key] + md_type = None if typeless else tmap.type + md_sanitizer = None if typeless else tmap.sanitizer + + ret = None + if hasattr(tmap.getter, '__call__'): + val = tmap.getter(self, norm_key) + ret = None if val is None else MetadataItem(md_type, md_sanitizer, + val) + elif norm_key.startswith('#'): + val = getattr(self.mfile.info, tmap.getter) + if not typeless: + val = tmap.type(val) + ret = None if val is None else MetadataItem(md_type, md_sanitizer, + val) + elif isinstance(tmap.getter, (list, tuple)): + val = None + for getter in tmap.getter: + if val is not None: + break + if hasattr(getter, '__call__'): + val = getter(self, norm_key) + elif getter in self.mfile.tags: + val = self._ft_getter(getter) + ret = None if val is None else MetadataItem(md_type, md_sanitizer, + val) + else: + try: + val = self._ft_getter(tmap.getter) + except KeyError: + val = None + ret = None if val is None else MetadataItem(md_type, md_sanitizer, + val) + + if ret is None: + if _raw_default: + ret = default + else: + ret = MetadataItem(md_type, md_sanitizer, default) + + return ret + + def _ft_setter(self, key, md_val, appendable=True): + if self.appendable and appendable: + self.mfile.tags[key] = md_val.values + else: + self.mfile.tags[key] = md_val.value + + def set_raw(self, norm_key, key, md_val, appendable=True): + if not isinstance(md_val, MetadataItem): + if isinstance(md_val, (list, tuple)): + md_val = MetadataItem(type(md_val[0]), None, md_val) + else: + md_val = MetadataItem(type(md_val), None, md_val) + + appendable = appendable and norm_key not in self.singular_keys + if norm_key in self.singular_keys and len(md_val.values) > 1: + raise ValueError("Key '{0}' can not have multiple values; {1}" + "".format(norm_key, md_val.values)) + + try: + self._ft_setter(key, md_val, appendable=appendable) + except (TypeError, ValueError): + try: + v = [str(vi) for vi in md_val.values] + self._ft_setter(key, MetadataItem(str, None, v), + appendable=appendable) + except Exception: + success = False + else: + success = True + if not success: + raise + + def set(self, norm_key, val, typeless=False): + norm_key = self._normalize_norm_key(norm_key) + tmap = self.tag_map[norm_key] + md_type = None if typeless else tmap.type + md_sanitizer = None if typeless else tmap.sanitizer + + if not isinstance(val, MetadataItem): + val = MetadataItem(md_type, md_sanitizer, val) + + if hasattr(tmap.setter, '__call__'): + tmap.setter(self, norm_key, val) + elif norm_key.startswith('#'): + raise KeyError("Can not set file info (tags that begin with #)") + elif isinstance(tmap.setter, (list, tuple)): + value_set = False + for setter in tmap.setter: + if value_set: + break + if hasattr(tmap.setter, '__call__'): + tmap.setter(self, norm_key, val) + value_set = True + elif setter in self.mfile.tags: + self.set_raw(norm_key, setter, val) + value_set = True + if not value_set: + self.set_raw(norm_key, tmap.setter[0], val) + else: + self.set_raw(norm_key, tmap.setter, val) + + def append_tag(self, norm_key, val): + norm_key = self._normalize_norm_key(norm_key) + if not self.appendable: + raise NotAppendable("{0} can not have multiple values for tags" + "".format(self.__class__.__name__)) + if norm_key in self.singular_keys: + raise NotAppendable("{0} can not have multiple values for '{1}'" + "".format(self.__class__.__name__, norm_key)) + + existing_val = self.get(norm_key, default=None) + if existing_val is None: + new_val = val + else: + existing_val.append(val) + new_val = existing_val + self.set(norm_key, new_val) + + def append(self, norm_key, val): + # I'm not sure how i feel about this synonym since append usually + # takes a single argument in python (i.e. lists etc) + return self.append_tag(norm_key, val) + + def _ft_rmtag(self, key): + if key in self.mfile.tags: + del self.mfile.tags[key] + + def remove_tag(self, norm_key): + norm_key = self._normalize_norm_key(norm_key) + + if norm_key.startswith('#'): + raise KeyError("Can not remove tags that start with '#' since " + "they are not real tags") + + tmap = self.tag_map[norm_key] + + remover = None + if tmap.remover: + remover = tmap.remover + + if not remover: + if isinstance(tmap.getter, (list, tuple)): + remover = [g for g in tmap.getter if isinstance(g, util.string_types)] + if isinstance(tmap.getter, util.string_types): + remover = [tmap.getter] + + if not remover: + if isinstance(tmap.setter, (list, tuple)): + remover = [s for s in tmap.setter if isinstance(s, util.string_types)] + if isinstance(tmap.setter, util.string_types): + remover = [tmap.setter] + + if remover is not None: + if hasattr(remover, '__call__'): + remover(self, norm_key) + elif isinstance(remover, (list, tuple)): + for key in remover: + self._ft_rmtag(key) + elif isinstance(remover, util.string_types): + self._ft_rmtag(remover) + + def info(self, tags=None, show_empty=False, resolve=False): + if not tags: + tags = self._TAG_MAP.keys() + + t_lst = [] + for tag in tags: + if resolve: + mdi = self.resolve(tag, None) + else: + mdi = self.get(tag, None) + + if mdi or show_empty: + t_lst.append('{0}: {1}'.format(tag, str(mdi))) + + return '\n'.join(t_lst) + + + def __getitem__(self, norm_key): + return self.get(norm_key, default=None) + + def __setitem__(self, norm_key, val): + self.set(norm_key, val) + + def __contains__(self, key): + return self[key].values != [] + + def __delitem__(self, norm_key): + self.remove_tag(norm_key) + + def __str__(self): + return self.info(show_empty=True) + +## +## EOF +## diff --git a/mod/music_tag/flac.py b/mod/music_tag/flac.py new file mode 100644 index 0000000..b61d04b --- /dev/null +++ b/mod/music_tag/flac.py @@ -0,0 +1,82 @@ +#!/usr/bin/env python +# coding: utf-8 + +import base64 + +import mutagen.flac + +from . import util +from .file import Artwork, AudioFile, MetadataItem, TAG_MAP_ENTRY + + +def get_pictures(afile, norm_key): + artworks = [Artwork(p.data, width=p.width, height=p.height, + fmt=p.mime.split('/')[-1], pic_type=p.type) + for p in afile.mfile.pictures] + return MetadataItem(Artwork, None, artworks) + + +def set_pictures(afile, norm_key, artworks): + if not isinstance(artworks, MetadataItem): + raise TypeError() + + afile.mfile.clear_pictures() + for i, art in enumerate(artworks.values): + if any(v is None for v in (art.mime, art.width, art.height, art.depth)): + raise ImportError("Please install Pillow to properly handle images") + pic = mutagen.flac.Picture() + pic.data = art.raw + pic.type = art.pic_type + pic.mime = art.mime + pic.width = art.width + pic.height = art.height + pic.depth = art.depth + afile.mfile.add_picture(pic) + + +def rm_pictures(afile, norm_key): + afile.mfile.clear_pictures() + + +class FlacFile(AudioFile): + tag_format = "FLAC" + mutagen_kls = mutagen.flac.FLAC + + _TAG_MAP = { + 'tracktitle': TAG_MAP_ENTRY(getter='title', setter='title', type=str), + 'artist': TAG_MAP_ENTRY(getter='artist', setter='artist', type=str), + 'album': TAG_MAP_ENTRY(getter='album', setter='album', type=str), + 'albumartist': TAG_MAP_ENTRY(getter='albumartist', setter='albumartist', + type=str), + 'composer': TAG_MAP_ENTRY(getter='composer', setter='composer', type=str), + 'tracknumber': TAG_MAP_ENTRY(getter='tracknumber', setter='tracknumber', + type=int), + 'totaltracks': TAG_MAP_ENTRY(getter='tracktotal', setter='tracktotal', + type=int), + 'discnumber': TAG_MAP_ENTRY(getter='discnumber', setter='discnumber', + type=int), + 'totaldiscs': TAG_MAP_ENTRY(getter='disctotal', setter='disctotal', + type=int), + 'genre': TAG_MAP_ENTRY(getter='genre', setter='genre', type=str), + 'year': TAG_MAP_ENTRY(getter=('date', 'originaldate'), + setter=('date', 'originaldate'), + type=int, sanitizer=util.sanitize_year), + 'lyrics': TAG_MAP_ENTRY(getter='lyrics', setter='lyrics', type=str), + 'isrc': TAG_MAP_ENTRY(getter='isrc', setter='isrc', type=str), + 'comment': TAG_MAP_ENTRY(getter='comment', setter='comment', type=str), + 'compilation': TAG_MAP_ENTRY(getter='compilation', setter='compilation', + type=int, sanitizer=util.sanitize_bool), + + 'artwork': TAG_MAP_ENTRY(getter=get_pictures, setter=set_pictures, + remover=rm_pictures, + type=Artwork), + + '#codec': TAG_MAP_ENTRY(getter=lambda afile, norm_key: 'flac', + type=str), + } + + def _ft_setter(self, key, md_val, appendable=True): + if self.appendable and appendable: + self.mfile.tags[key] = [str(v) for v in md_val.values] + else: + self.mfile.tags[key] = str(md_val.value) diff --git a/mod/music_tag/id3.py b/mod/music_tag/id3.py new file mode 100644 index 0000000..6e7405b --- /dev/null +++ b/mod/music_tag/id3.py @@ -0,0 +1,293 @@ +#!/usr/bin/env python +# coding: utf-8 + +# FIXME: there's probably a more standard way of setting desc or repeated tags +# FIXME: there's probably a better way of dealing with pic_type + +import mutagen +import mutagen.id3 +import mutagen.easyid3 +import mutagen.mp3 +import mutagen.trueaudio + +from . import util +from .file import Artwork, AudioFile, MetadataItem, TAG_MAP_ENTRY + + +def get_tracknumA(afile, norm_key): + return util.get_easy_tracknum(afile, norm_key, _tag_name='TRK') +def set_tracknumA(afile, norm_key, val): + return util.set_easy_tracknum(afile, norm_key, val, _tag_name='TRK') +def get_totaltracksA(afile, norm_key): + return util.get_easy_totaltracks(afile, norm_key, _tag_name='TRK') +def set_totaltracksA(afile, norm_key, val): + return util.set_easy_totaltracks(afile, norm_key, val, _tag_name='TRK') + +def get_discnumA(afile, norm_key): + return util.get_easy_discnum(afile, norm_key, _tag_name='TPA') +def set_discnumA(afile, norm_key, val): + return util.set_easy_discnum(afile, norm_key, val, _tag_name='TPA') +def get_totaldiscsA(afile, norm_key): + return util.get_easy_totaldiscs(afile, norm_key, _tag_name='TPA') +def set_totaldiscsA(afile, norm_key, val): + return util.set_easy_totaldiscs(afile, norm_key, val, _tag_name='TPA') + +def get_tracknumB(afile, norm_key): + return util.get_easy_tracknum(afile, norm_key, _tag_name='TRCK') +def set_tracknumB(afile, norm_key, val): + return util.set_easy_tracknum(afile, norm_key, val, _tag_name='TRCK') +def get_totaltracksB(afile, norm_key): + return util.get_easy_totaltracks(afile, norm_key, _tag_name='TRCK') +def set_totaltracksB(afile, norm_key, val): + return util.set_easy_totaltracks(afile, norm_key, val, _tag_name='TRCK') + +def get_discnumB(afile, norm_key): + return util.get_easy_discnum(afile, norm_key, _tag_name='TPOS') +def set_discnumB(afile, norm_key, val): + return util.set_easy_discnum(afile, norm_key, val, _tag_name='TPOS') +def get_totaldiscsB(afile, norm_key): + return util.get_easy_totaldiscs(afile, norm_key, _tag_name='TPOS') +def set_totaldiscsB(afile, norm_key, val): + return util.set_easy_totaldiscs(afile, norm_key, val, _tag_name='TPOS') + +def get_pictures(afile, norm_key): + pics = afile.mfile.tags.getall('APIC') + afile.mfile.tags.getall('PIC') + artworks = [] + for p in pics: + artworks.append(Artwork(p.data, pic_type=p.type)) + return MetadataItem(Artwork, None, artworks) + +def set_pictures(afile, norm_key, artworks): + if afile.mfile.tags.getall('APIC'): + kls = mutagen.id3.APIC + elif afile.mfile.tags.getall('PIC'): + kls = mutagen.id3.PIC + else: + if afile.mfile.tags.version[:2] == (2, 2): + kls = mutagen.id3.PIC + else: + kls = mutagen.id3.APIC + + tag = str(kls.__name__).strip(':') + afile.mfile.tags.delall(tag) + for i, art in enumerate(artworks.values): + if kls == mutagen.id3.PIC: + mime = { + 'image/jpeg': 'JPG', + 'image/jpg': 'JPG', + 'image/png': 'PNG' + }[art.mime.lower()] + else: + mime = art.mime + afile.mfile.tags.add(kls(data=art.raw, type=art.pic_type, desc=str(i), + mime=mime)) + +# https://github.com/tilo/ID3/tree/master/docs + +_TAG_MAP_ID3_1 = { + 'tracktitle': TAG_MAP_ENTRY(getter='title', setter='title', type=str), + 'artist': TAG_MAP_ENTRY(getter='artist', setter='artist', type=str), + 'album': TAG_MAP_ENTRY(getter='album', setter='album', type=str), + 'year': TAG_MAP_ENTRY(getter='year', setter='year', type=int, + sanitizer=util.sanitize_year), + 'comment': TAG_MAP_ENTRY(getter='comment', setter='comment', type=str), + 'tracknumber': TAG_MAP_ENTRY(getter='track', setter='track', type=int), + 'genre': TAG_MAP_ENTRY(getter='genre', setter='genre', type=str), +} + +_TAG_MAP_ID3_2_2 = { + 'tracktitle': TAG_MAP_ENTRY(getter='TT2', setter='TT2', type=str), + 'artist': TAG_MAP_ENTRY(getter='TP1', setter='TP1', type=str), + 'album': TAG_MAP_ENTRY(getter='TAL', setter='TAL', type=str), + 'albumartist': TAG_MAP_ENTRY(getter='TP2', setter='TP2', type=str), + 'composer': TAG_MAP_ENTRY(getter='TCM', setter='TCM', type=str), + 'tracknumber': TAG_MAP_ENTRY(getter=get_tracknumA, + setter=set_tracknumA, + type=int), + 'totaltracks': TAG_MAP_ENTRY(getter=get_totaltracksA, + setter=set_totaltracksA, + type=int), + 'discnumber': TAG_MAP_ENTRY(getter=get_discnumA, + setter=set_discnumA, + type=int), + 'totaldiscs': TAG_MAP_ENTRY(getter=get_totaldiscsA, + setter=set_totaldiscsA, + type=int), + 'genre': TAG_MAP_ENTRY(getter='TCO', setter='TCO', type=str), + 'year': TAG_MAP_ENTRY(getter=('TYE', 'TDA', 'TRD', 'TOR'), + setter=('TYE', 'TDA', 'TRD', 'TOR'), + type=int, sanitizer=util.sanitize_year), + 'isrc': TAG_MAP_ENTRY(getter='TRC', setter='TRC', type=str), + # 'comment': TAG_MAP_ENTRY(getter='COMM', setter='COMM', type=str, + # sanitizer=util.sanitize_bool), + # 'lyrics': TAG_MAP_ENTRY(getter='USLT', setter='USLT', type=str), + # 'compilation': TAG_MAP_ENTRY(getter='TCMP', setter='TCMP', type=int), + + 'artwork': TAG_MAP_ENTRY(getter=get_pictures, setter=set_pictures, + type=Artwork), +} + +_TAG_MAP_ID3_2_3 = { + 'tracktitle': TAG_MAP_ENTRY(getter='TIT2', setter='TIT2', type=str), + 'artist': TAG_MAP_ENTRY(getter='TPE1', setter='TPE1', type=str), + 'album': TAG_MAP_ENTRY(getter='TALB', setter='TALB', type=str), + 'albumartist': TAG_MAP_ENTRY(getter='TPE2', setter='TPE2', type=str), + 'composer': TAG_MAP_ENTRY(getter='TCOM', setter='TCOM', type=str), + 'tracknumber': TAG_MAP_ENTRY(getter=get_tracknumB, + setter=set_tracknumB, + type=int), + 'totaltracks': TAG_MAP_ENTRY(getter=get_totaltracksB, + setter=set_totaltracksB, + type=int), + 'discnumber': TAG_MAP_ENTRY(getter=get_discnumB, + setter=set_discnumB, + type=int), + 'totaldiscs': TAG_MAP_ENTRY(getter=get_totaldiscsB, + setter=set_totaldiscsB, + type=int), + 'genre': TAG_MAP_ENTRY(getter='TCON', setter='TCON', type=str), + 'year': TAG_MAP_ENTRY(getter=('TYER', 'TDAT', 'TDRC'), + setter=('TYER', 'TDAT', 'TDRC'), + type=int, sanitizer=util.sanitize_year), + 'comment': TAG_MAP_ENTRY(getter='COMM', setter='COMM', type=str), + 'lyrics': TAG_MAP_ENTRY(getter='USLT', setter='USLT', type=str), + 'isrc': TAG_MAP_ENTRY(getter='TSRC', setter='TSRC', type=str), + 'compilation': TAG_MAP_ENTRY(getter='TCMP', setter='TCMP', type=int, + sanitizer=util.sanitize_bool), + + 'artwork': TAG_MAP_ENTRY(getter=get_pictures, setter=set_pictures, + type=Artwork), +} + +_TAG_MAP_ID3_2_4 = { + 'tracktitle': TAG_MAP_ENTRY(getter='TIT2', setter='TIT2', type=str), + 'artist': TAG_MAP_ENTRY(getter='TPE1', setter='TPE1', type=str), + 'album': TAG_MAP_ENTRY(getter='TALB', setter='TALB', type=str), + 'albumartist': TAG_MAP_ENTRY(getter='TPE2', setter='TPE2', type=str), + 'composer': TAG_MAP_ENTRY(getter='TCOM', setter='TCOM', type=str), + 'tracknumber': TAG_MAP_ENTRY(getter=get_tracknumB, + setter=set_tracknumB, + remover=('TRK', 'TRCK'), + type=int), + 'totaltracks': TAG_MAP_ENTRY(getter=get_totaltracksB, + setter=set_totaltracksB, + remover=('TRK', 'TRCK'), + type=int), + 'discnumber': TAG_MAP_ENTRY(getter=get_discnumB, + setter=set_discnumB, + remover=('TPA', 'TPOS'), + type=int), + 'totaldiscs': TAG_MAP_ENTRY(getter=get_totaldiscsB, + setter=set_totaldiscsB, + remover=('TPA', 'TPOS'), + type=int), + 'genre': TAG_MAP_ENTRY(getter='TCON', setter='TCON', + type=str), + 'year': TAG_MAP_ENTRY(getter=('TDOR', 'TORY', 'TYER', 'TDAT', 'TDRC'), + setter=('TDOR', 'TYER', 'TDAT', 'TDRC'), + type=int, sanitizer=util.sanitize_year), + 'comment': TAG_MAP_ENTRY(getter='COMM', setter='COMM', type=str), + 'lyrics': TAG_MAP_ENTRY(getter='USLT', setter='USLT', type=str), + 'isrc': TAG_MAP_ENTRY(getter='TSRC', setter='TSRC', type=str), + 'compilation': TAG_MAP_ENTRY(getter='TCMP', setter='TCMP', type=int, + sanitizer=util.sanitize_bool), + + 'artwork': TAG_MAP_ENTRY(getter=get_pictures, setter=set_pictures, + remover=('APIC', 'PIC'), + type=Artwork), +} + + +class Id3File(AudioFile): + tag_format = "Id3" + mutagen_kls = mutagen.id3.ID3FileType + + def __init__(self, filename, **kwargs): + mfile = kwargs.get('_mfile', None) + if mfile is None: + mfile = mutagen.File(filename) + kwargs['_mfile'] = mfile + + if mfile.tags: + id3_ver = mfile.tags.version[:2] + else: + id3_ver = (2, 4) + + # by default, mutagen presents all files using id3v2.4 + if id3_ver[0] == 1: + self._TAG_MAP = _TAG_MAP_ID3_2_4 + elif id3_ver == (2, 2): + self._TAG_MAP = _TAG_MAP_ID3_2_4 + elif id3_ver == (2, 3): + self._TAG_MAP = _TAG_MAP_ID3_2_4 + elif id3_ver == (2, 4): + self._TAG_MAP = _TAG_MAP_ID3_2_4 + else: + raise NotImplementedError("Unexpected id3 tag version: {0}" + "".format(mfile.tags.version)) + + super(Id3File, self).__init__(filename, **kwargs) + + def _ft_getter(self, key): + vals = self.mfile.tags.getall(key) + ret = [] + for val in vals: + if isinstance(val.text, (list, tuple)): + ret += [str(t) for t in val.text] + else: + ret += [str(val.text)] + if not ret: + ret = None + return ret + + def _ft_setter(self, key, md_val, appendable=True): + self.mfile.tags.delall(key) + kls = getattr(mutagen.id3, key.split(':')[0]) + + kwargs = {} + _o = kls() + if hasattr(_o, 'lang'): + # so, it's a little anglo-centric to do this, but + # this matches the behavior of kid3 and MusicBrainz Picard + kwargs['lang'] = 'eng' + + self.mfile.tags.add(kls(text=str(md_val), **kwargs)) + + def _ft_rmtag(self, key): + self.mfile.tags.delall(key) + + +class Mp3File(Id3File): + tag_format = "Mp3" + mutagen_kls = mutagen.mp3.MP3 + + def __init__(self, filename, **kwargs): + super(Mp3File, self).__init__(filename, **kwargs) + + self.tag_map = self.tag_map.copy() + self.tag_map.update({ + '#codec': TAG_MAP_ENTRY(getter=lambda afile, norm_key: 'mp3', + type=str), + '#bitspersample': TAG_MAP_ENTRY(getter=lambda afile, norm_key: None, + type=int), + }) + + +class EasyMp3File(AudioFile): + tag_format = "EasyMP3" + mutagen_kls = mutagen.mp3.EasyMP3 + + +class EasyId3File(AudioFile): + tag_format = "EasyID3" + mutagen_kls = mutagen.easyid3.EasyID3FileType + + +class TrueAudioFile(Id3File): + tag_format = "TrueAudio" + mutagen_kls = mutagen.trueaudio.TrueAudio + + +class EasyTrueAudioFile(AudioFile): + tag_format = "EasyTrueAudio" + mutagen_kls = mutagen.trueaudio.EasyTrueAudio diff --git a/mod/music_tag/mp4.py b/mod/music_tag/mp4.py new file mode 100644 index 0000000..b4a65f9 --- /dev/null +++ b/mod/music_tag/mp4.py @@ -0,0 +1,180 @@ +#!/usr/bin/env python +# coding: utf-8 + +# FIXME: does artwork need a proper shim? + +import mutagen.mp4 +import mutagen.easymp4 +from mutagen.mp4 import MP4FreeForm + +from . import util +from .file import Artwork, AudioFile, MetadataItem, TAG_MAP_ENTRY + + +mutagen.easymp4.EasyMP4Tags.RegisterTextKey("compilation", "cpil") + + +_MP4_ISRC_KEY = '----:com.apple.iTunes:ISRC' + + +def get_tracknum(afile, norm_key): + trkn = afile.mfile.get('trkn', [(None, None)])[0] + try: + return trkn[0] + except IndexError: + return None + +def set_tracknum(afile, norm_key, val): + trkn = list(afile.mfile.tags.get('trkn', [(0, 0)])[0]) + trkn += [0] * (2 - len(trkn)) + trkn[0] = int(val) + trkn = tuple([0 if i is None else int(i) for i in trkn]) + afile.mfile.tags['trkn'] = [trkn] + +def get_totaltracks(afile, norm_key): + trkn = afile.mfile.get('trkn', [(None, None)])[0] + try: + return trkn[1] + except IndexError: + return None + +def set_totaltracks(afile, norm_key, val): + trkn = list(afile.mfile.tags.get('trkn', [(0, 0)])[0]) + trkn += [0] * (2 - len(trkn)) + trkn[1] = int(val) + trkn = tuple([0 if i is None else int(i) for i in trkn]) + afile.mfile.tags['trkn'] = [trkn] + +def get_discnum(afile, norm_key): + trkn = afile.mfile.get('disk', [(None, None)])[0] + try: + return trkn[0] + except IndexError: + return None + +def set_discnum(afile, norm_key, val): + disc = list(afile.mfile.tags.get('disk', [(0, 0)])[0]) + disc += [0] * (2 - len(disc)) + disc[0] = int(val) + disc = [0 if i is None else i for i in disc] + afile.mfile.tags['disk'] = [tuple(disc)] + +def get_totaldiscs(afile, norm_key): + trkn = afile.mfile.get('disk', [(None, None)])[0] + try: + return trkn[1] + except IndexError: + return None + +def set_totaldiscs(afile, norm_key, val): + disc = list(afile.mfile.tags.get('disk', [(0, 0)])[0]) + disc += [0] * (2 - len(disc)) + disc[1] = int(val) + disc = [0 if i is None else i for i in disc] + afile.mfile.tags['disk'] = [tuple(disc)] + +def get_artwork(afile, norm_key): + fmt_lut = {mutagen.mp4.MP4Cover.FORMAT_JPEG: 'jpeg', + mutagen.mp4.MP4Cover.FORMAT_PNG: 'png', + } + artworks = [Artwork(bytes(p)) for p in afile.mfile.tags['covr']] + + return MetadataItem(Artwork, None, artworks) + +def set_artwork(afile, norm_key, artworks): + if not isinstance(artworks, MetadataItem): + raise TypeError() + + pics = [] + for art in artworks.values: + if any(v is None for v in (art.mime, )): + raise ImportError("Please install Pillow to properly handle images") + + mime_fmt = art.mime.split('/')[1].upper() + if mime_fmt == 'JPEG': + img_fmt = mutagen.mp4.MP4Cover.FORMAT_JPEG + elif mime_fmt == 'PNG': + img_fmt = mutagen.mp4.MP4Cover.FORMAT_PNG + else: + raise TypeError('mp4 artwork should be either jpeg or png') + + pics.append(mutagen.mp4.MP4Cover(art.raw, imageformat=img_fmt)) + afile.mfile.tags['covr'] = pics + +def freeform_get(afile, norm_key): + return [val.decode() for val in afile.mfile.get(norm_key, [])] + +def freeform_set(afile, norm_key, val): + ff_vals = [MP4FreeForm(v.encode('utf-8')) for v in val.values] + afile.mfile.tags[norm_key] = ff_vals + + +class Mp4File(AudioFile): + tag_format = "mp4" + mutagen_kls = mutagen.mp4.MP4 + + _TAG_MAP = { + 'tracktitle': TAG_MAP_ENTRY(getter='©nam', setter='©nam', type=str), + 'artist': TAG_MAP_ENTRY(getter='©ART', setter='©ART', type=str), + 'album': TAG_MAP_ENTRY(getter='©alb', setter='©alb', type=str), + 'albumartist': TAG_MAP_ENTRY(getter='aART', setter='aART', type=str), + 'composer': TAG_MAP_ENTRY(getter='©wrt', setter='©wrt', type=str), + 'tracknumber': TAG_MAP_ENTRY(getter=get_tracknum, + setter=set_tracknum, + type=int), + 'totaltracks': TAG_MAP_ENTRY(getter=get_totaltracks, + setter=set_totaltracks, + type=int), + 'discnumber': TAG_MAP_ENTRY(getter=get_discnum, + setter=set_discnum, + type=int), + 'totaldiscs': TAG_MAP_ENTRY(getter=get_totaldiscs, + setter=set_totaldiscs, + type=int), + 'genre': TAG_MAP_ENTRY(getter='©gen', setter='©gen', type=str), + 'year': TAG_MAP_ENTRY(getter='©day', setter='©day', type=int, + sanitizer=util.sanitize_year), + 'lyrics': TAG_MAP_ENTRY(getter='©lyr', setter='©lyr', type=str), + 'isrc': TAG_MAP_ENTRY(getter=lambda f, k: freeform_get(f, _MP4_ISRC_KEY), + setter=lambda f, k, v: freeform_set(f, _MP4_ISRC_KEY, v), + remover=_MP4_ISRC_KEY, + type=str), + 'comment': TAG_MAP_ENTRY(getter='©cmt', setter='©cmt', type=str), + 'compilation': TAG_MAP_ENTRY(getter='cpil', setter='cpil', type=bool, + sanitizer=util.sanitize_bool), + + 'artwork': TAG_MAP_ENTRY(getter=get_artwork, setter=set_artwork, + type=Artwork), + } + + +class EasyMp4File(Mp4File): + tag_format = "mp4" + mutagen_kls = mutagen.easymp4.EasyMP4 + + _TAG_MAP = Mp4File._TAG_MAP.copy() + _TAG_MAP.update({ + 'tracktitle': TAG_MAP_ENTRY(getter='title', setter='title', type=str), + 'artist': TAG_MAP_ENTRY(getter='artist', setter='artist', type=str), + 'album': TAG_MAP_ENTRY(getter='album', setter='album', type=str), + 'albumartist': TAG_MAP_ENTRY(getter='albumartist', setter='albumartist', type=str), + 'tracknumber': TAG_MAP_ENTRY(getter=util.get_easy_tracknum, + setter=util.set_easy_tracknum, + type=int), + 'totaltracks': TAG_MAP_ENTRY(getter=util.get_easy_totaltracks, + setter=util.set_easy_totaltracks, + type=int), + 'discnumber': TAG_MAP_ENTRY(getter=util.get_easy_discnum, + setter=util.set_easy_discnum, + type=int), + 'totaldiscs': TAG_MAP_ENTRY(getter=util.get_easy_totaldiscs, + setter=util.set_easy_totaldiscs, + type=int), + 'genre': TAG_MAP_ENTRY(getter='genre', setter='genre', type=str), + 'year': TAG_MAP_ENTRY(getter='date', setter='date', type=int, + sanitizer=util.sanitize_year), + 'compilation': TAG_MAP_ENTRY(getter='compilation', setter='compilation', + type=bool), + + 'artwork': TAG_MAP_ENTRY(getter='covr', type=Artwork), + }) diff --git a/mod/music_tag/smf.py b/mod/music_tag/smf.py new file mode 100644 index 0000000..cb99448 --- /dev/null +++ b/mod/music_tag/smf.py @@ -0,0 +1,17 @@ +#!/usr/bin/env python +# coding: utf-8 + +import mutagen.smf + +from .file import AudioFile + + +# smf: standard midi file + + +class SmfFile(AudioFile): + tag_format = "SMF" + mutagen_kls = mutagen.smf.SMF + + def __init__(self, filename, **kwargs): + raise NotImplementedError("SMF format not implemented") diff --git a/mod/music_tag/util.py b/mod/music_tag/util.py new file mode 100644 index 0000000..c1fd1e4 --- /dev/null +++ b/mod/music_tag/util.py @@ -0,0 +1,154 @@ +from collections import namedtuple +import re +import struct + + +string_types = str + + +def as_str(value): + if isinstance(value, (list, tuple)): + value = ', '.join(value) + return str(value) + +def sanitize_year(year): + try: + if ',' in year: + year = year.split(',')[0] + # TODO: warn that we're dropping a 2nd year + except TypeError: + pass + + try: + year = int(year) + except ValueError: + if re.match(r'^[0-9]{4}[-\s][0-9]{2}[-\s][0-9]{2}$', year): + year = int(year[:4]) + elif re.match(r'^[0-9]{2}[-/\s][0-9]{2}[-/\s][0-9]{4}$', year): + year = int(year[-4:]) + else: + raise ValueError("Could not extract year from: {0}".format(year)) + return year + +def sanitize_int(val): + try: + ret = int(val) + except ValueError: + m = re.match(r'^.*?([0-9]+).*?$', val) + if m: + ret = int(m.group(1)) + else: + raise ValueError('int contains no in {0}'.format(val)) + return ret + +def sanitize_bool(val): + val = str(val).strip().lower() + if val in ('true', '1'): + return True + elif val in ('false', '0', ''): + return False + else: + return int(val) !=0 + +def get_easy_tracknum(afile, norm_key, _tag_name='tracknumber'): + tracknumber = str(afile.mfile.get(_tag_name, None)) + if tracknumber in (None, 'None'): + tracknumber = None + else: + tracknumber = tracknumber.split('/')[0] + return tracknumber + +def set_easy_tracknum(afile, norm_key, val, _tag_name='tracknumber'): + tracknumber = [i for i in str(afile.mfile.get(_tag_name, '0/0')).split('/')] + tracknumber += [0] * (2 - len(tracknumber)) + tracknumber[0] = val + afile.set_raw(norm_key, _tag_name, + '/'.join(str(i) for i in tracknumber), + appendable=False) + +def get_easy_totaltracks(afile, norm_key, _tag_name='tracknumber'): + tracknumber = str(afile.mfile.get(_tag_name, None)) + if tracknumber in (None, 'None'): + tracknumber = None + else: + try: + tracknumber = tracknumber.split('/')[1] + except IndexError: + tracknumber = None + return tracknumber + +def set_easy_totaltracks(afile, norm_key, val, _tag_name='tracknumber'): + tracknumber = [i for i in str(afile.mfile.get(_tag_name, '0/0')).split('/')] + tracknumber += [0] * (2 - len(tracknumber)) + tracknumber[1] = val + afile.set_raw(norm_key, _tag_name, + '/'.join(str(i) for i in tracknumber), + appendable=False) + +def get_easy_discnum(afile, norm_key, _tag_name='discnumber'): + discnumber = str(afile.mfile.get(_tag_name, None)) + if discnumber in (None, 'None'): + discnumber = None + else: + discnumber = discnumber.split('/')[0] + return discnumber + +def set_easy_discnum(afile, norm_key, val, _tag_name='discnumber'): + discnumber = [i for i in str(afile.mfile.get(_tag_name, '0/0')).split('/')] + discnumber += [0] * (2 - len(discnumber)) + discnumber[0] = val + afile.set_raw(norm_key, _tag_name, + '/'.join(str(i) for i in discnumber), + appendable=False) + +def get_easy_totaldiscs(afile, norm_key, _tag_name='discnumber'): + discnumber = str(afile.mfile.get(_tag_name, None)) + if discnumber in (None, 'None'): + discnumber = None + else: + try: + discnumber = discnumber.split('/')[1] + except IndexError: + discnumber = None + return discnumber + +def set_easy_totaldiscs(afile, norm_key, val, _tag_name='discnumber'): + discnumber = [i for i in str(afile.mfile.get(_tag_name, '0/0')).split('/')] + discnumber += [0] * (2 - len(discnumber)) + discnumber[1] = val + afile.set_raw(norm_key, _tag_name, + '/'.join(str(i) for i in discnumber), + appendable=False) + +PicBlock = namedtuple('PicBlock', ('typeid', 'picturetype', 'mime', 'format', + 'descr', 'width', 'height', 'color_depth', + 'colors_indexed', 'data')) +PICTURE_TYPE_LUT = {0: 'other', 1: 'icon', 2: 'other icon', 3: 'front cover', + 4: 'back cover', 5: 'leaflet', 6: 'media', 7: 'lead artist', + 8: 'artist', 9: 'conductor', 10: 'band', 11: 'composer', + 12: 'lyricist', 13: 'recording location', + 14: 'during recording', 15: 'during performance', + 16: 'screen capture', 17: 'coloured fish', 18: 'illustration', + 19: 'artist logo', 20: 'publisher logo'} + +def _split(it, i): + return it[:i], it[i:] + +def parse_picture_block(dat): + head, rest = _split(dat, 2 * 4) + typeid, mime_len = struct.unpack('>ii', head) + mime, rest = _split(rest, mime_len) + mime = mime.decode('ascii').lower() + head, rest = _split(rest, 1 * 4) + descr_len, = struct.unpack('>i', head) + descr, rest = _split(rest, descr_len) + descr = descr.decode('utf-8') + head, rest = _split(rest, 5 * 4) + width, height, cdepth, cidx, dat_len = struct.unpack('>iiiii', head) + dat = rest + pic = PicBlock(typeid=typeid, picturetype=PICTURE_TYPE_LUT[typeid], + mime=mime, format=mime.split('/')[1], + descr=descr, width=width, height=height, color_depth=cdepth, + colors_indexed=cidx, data=dat) + assert len(dat) == dat_len + return pic diff --git a/mod/music_tag/vorbis.py b/mod/music_tag/vorbis.py new file mode 100644 index 0000000..86246c3 --- /dev/null +++ b/mod/music_tag/vorbis.py @@ -0,0 +1,143 @@ +#!/usr/bin/env python +# coding: utf-8 + +import base64 +import itertools + +import mutagen.ogg +import mutagen.oggvorbis +import mutagen.oggopus +import mutagen.oggflac +import mutagen.oggtheora +import mutagen.oggspeex + +from . import util +from .file import Artwork, AudioFile, MetadataItem, TAG_MAP_ENTRY + + +def get_pictures(afile, norm_key): + artworks = [] + + pics_dat = afile.mfile.get("coverart", []) + mimes = afile.mfile.get("coverartmime", []) + for dat, mime in itertools.zip_longest(pics_dat, mimes, fillvalue=""): + image_data = base64.b64decode(dat.encode("ascii")) + artworks = Artwork(image_data) + + for p in afile.mfile.tags['metadata_block_picture']: + pb = util.parse_picture_block(base64.standard_b64decode(p)) + art = Artwork(pb.data, width=pb.width, height=pb.height, fmt=pb.format) + artworks.append(art) + + return MetadataItem(Artwork, None, artworks) + +def set_pictures(afile, norm_key, artworks): + if not isinstance(artworks, MetadataItem): + raise TypeError() + + pics = [] + for i, art in enumerate(artworks.values): + if any(v is None for v in (art.mime, art.width, art.height, art.depth)): + raise ImportError("Please install Pillow to properly handle images") + pic = mutagen.flac.Picture() + pic.data = art.raw + pic.type = art.pic_type + pic.mime = art.mime + pic.width = art.width + pic.height = art.height + pic.depth = art.depth + + pic_data = base64.b64encode(pic.write()).decode('ascii') + pics.append(pic_data) + afile.mfile.tags['metadata_block_picture'] = pics + +def rm_pictures(afile, norm_key): + for k in ('coverart', 'coverartmime', 'metadata_block_picture'): + if k in afile.mfile.tags: + del afile.mfile.tags[k] + + +class OggFile(AudioFile): + tag_format = "Ogg" + mutagen_kls = mutagen.ogg.OggFileType + + _TAG_MAP = { + 'tracktitle': TAG_MAP_ENTRY(getter='title', setter='title', type=str), + 'artist': TAG_MAP_ENTRY(getter='artist', setter='artist', type=str), + 'album': TAG_MAP_ENTRY(getter='album', setter='album', type=str), + 'albumartist': TAG_MAP_ENTRY(getter='albumartist', setter='albumartist', + type=str), + 'composer': TAG_MAP_ENTRY(getter='composer', setter='composer', type=str), + 'tracknumber': TAG_MAP_ENTRY(getter='tracknumber', setter='tracknumber', + type=int), + 'totaltracks': TAG_MAP_ENTRY(getter='tracktotal', setter='tracktotal', + type=int), + 'discnumber': TAG_MAP_ENTRY(getter='discnumber', setter='discnumber', + type=int), + 'totaldiscs': TAG_MAP_ENTRY(getter='disctotal', setter='disctotal', + type=int), + 'genre': TAG_MAP_ENTRY(getter='genre', setter='genre', type=str), + 'year': TAG_MAP_ENTRY(getter=('date', 'originaldate'), + setter=('date', 'originaldate'), + type=int, sanitizer=util.sanitize_year), + 'lyrics': TAG_MAP_ENTRY(getter='lyrics', setter='lyrics', type=str), + 'isrc': TAG_MAP_ENTRY(getter='isrc', setter='isrc', type=str), + 'comment': TAG_MAP_ENTRY(getter='comment', setter='comment', type=str), + 'compilation': TAG_MAP_ENTRY(getter='compilation', setter='compilation', + type=int, sanitizer=util.sanitize_bool), + + 'artwork': TAG_MAP_ENTRY(getter=get_pictures, setter=set_pictures, + remover=rm_pictures, + type=Artwork), + } + + def _ft_setter(self, key, md_val, appendable=True): + if self.appendable and appendable: + self.mfile.tags[key] = [str(v) for v in md_val.values] + else: + self.mfile.tags[key] = str(md_val.value) + + +class OggFlacFile(OggFile): + tag_format = "OggFlac" + mutagen_kls = mutagen.oggflac.OggFLAC + + +class OggSpeexFile(OggFile): + tag_format = "OggSpeex" + mutagen_kls = mutagen.oggspeex.OggSpeex + + +class OggTheoraFile(OggFile): + tag_format = "OggTheora" + mutagen_kls = mutagen.oggtheora.OggTheora + + +class OggVorbisFile(OggFile): + tag_format = "OggVorbis" + mutagen_kls = mutagen.oggvorbis.OggVorbis + + _TAG_MAP = OggFile._TAG_MAP.copy() + _TAG_MAP.update({ + '#codec': TAG_MAP_ENTRY(getter=lambda afile, norm_key: 'Ogg Vorbis', + type=str), + '#bitspersample': TAG_MAP_ENTRY(getter=lambda afile, norm_key: None, + type=int), + }) + + +class OggOpusFile(OggFile): + tag_format = "OggOpus" + mutagen_kls = mutagen.oggopus.OggOpus + + _TAG_MAP = OggFile._TAG_MAP.copy() + _TAG_MAP.update({ + '#codec': TAG_MAP_ENTRY(getter=lambda afile, norm_key: 'Ogg Opus', + type=str), + '#bitspersample': TAG_MAP_ENTRY(getter=lambda afile, norm_key: None, + type=int), + '#samplerate': TAG_MAP_ENTRY(getter=lambda afile, norm_key: None, + type=int), + '#bitrate': TAG_MAP_ENTRY(getter=lambda afile, norm_key: None, + type=int), + }) diff --git a/mod/music_tag/wave.py b/mod/music_tag/wave.py new file mode 100644 index 0000000..c7c9176 --- /dev/null +++ b/mod/music_tag/wave.py @@ -0,0 +1,26 @@ +#!/usr/bin/env python +# coding: utf-8 + +try: + import mutagen.wave + + from component.music_tag.id3 import Id3File + + + class WaveId3File(Id3File): + tag_format = "Wave[Id3]" + mutagen_kls = mutagen.wave.WAVE + + def __init__(self, filename, **kwargs): + super(WaveId3File, self).__init__(filename, **kwargs) + + # self.tag_map = self.tag_map.copy() + # self.tag_map.update({ + # '#codec': TAG_MAP_ENTRY(getter=lambda afile, norm_key: 'mp3', + # type=str), + # '#bitspersample': TAG_MAP_ENTRY(getter=lambda afile, norm_key: None, + # type=int), + # }) + +except ImportError: + pass diff --git a/mod/tag.py b/mod/tag.py new file mode 100644 index 0000000..226fd74 --- /dev/null +++ b/mod/tag.py @@ -0,0 +1,80 @@ +import base64 +import os +import io + +from PIL import Image + +from mod import music_tag + +TAG_MAP = { + 'tracktitle': '曲目标题', + 'artist': '艺术家', + 'album': '专辑', + 'year': '年份', + 'lyrics': '歌词', + 'artwork': '封面' +} + + +def dump_b64(album_art: music_tag.file.MetadataItem): + """ + 以图片加载MetadataItem对象并进行base64编码 + :param album_art: + :return: + """ + artwork = album_art.values[0] + img_data = artwork.data + img_format = artwork.format + img = Image.open(io.BytesIO(img_data)) + img_byte_arr = io.BytesIO() + img.save(img_byte_arr, format=img_format) + # 将字节流编码为base64字符串 + img_base64 = base64.b64encode(img_byte_arr.getvalue()) + return img_base64 + + +def tin(tags: dict, file: any) -> None: + """ + :param tags: 字典,包含Tags数据,详见TAG_MAP + :param file: string, file-like object, io.StringIO, etc. + :return: None + """ + if not isinstance(tags, dict): + raise TypeError(f'Tags should be dict, but {type(tags).__name__} found.') + file_path = file if isinstance(file, str) else (file.name if hasattr(file, 'name') else None) + if not file_path or not os.path.exists(file_path): + raise FileNotFoundError(f'File {file_path} does not exist or path is invalid.') + + music_file_obj = music_tag.load_file(file) + for tag_name, tag_value in tags.items(): + if tag_name in TAG_MAP and tag_value: + music_file_obj[tag_name] = tag_value + else: + # 具体是跳过还是del待定 + continue + + music_file_obj.save() + + +def tout(file: any) -> dict: + file_path = file if isinstance(file, str) else (file.name if hasattr(file, 'name') else None) + if not file_path or not os.path.exists(file_path): + return {} + result = {} + for tag_name, tag_func in TAG_MAP.items(): + if tag_name == "artwork": + result[tag_name] = dump_b64(music_tag.load_file(file_path).resolve(tag_name)) + else: + result[tag_name] = str(music_tag.load_file(file_path).resolve(tag_name)) + return result + + +if __name__ == '__main__': + val_tags = { + 'tracktitle': '曲目标题', + 'artist': '艺术家', + 'album': '专辑', + 'year': 2022, + 'lyrics': '歌词' + } + print(tout(r'H:\sp\test.mp3')) diff --git a/mod/tags.py b/mod/tags.py deleted file mode 100644 index 95ff894..0000000 --- a/mod/tags.py +++ /dev/null @@ -1,97 +0,0 @@ -from mutagen import File - - -def id3_lrc(path, lrc, read=False): - from mutagen.id3 import ID3, USLT - audio = ID3(path) - if read: - matching_keys = [key for key in audio.keys() if key.startswith("USLT")] - for key in matching_keys: - try: - return audio[key].text - except: - continue - else: - # 设置歌词 - audio["USLT"] = USLT(encoding=3, lang="eng", text=lrc) - # 保存更改 - audio.save(path) - return True - - -def ogg_lrc(path, lrc, read=False): - from mutagen.oggvorbis import OggVorbis - # 读取 Ogg Vorbis 文件 - audio = OggVorbis(path) - if read: - return audio["lyrics"] - else: - # 设置歌词 - audio["lyrics"] = lrc - # 保存更改 - audio.save() - return True - - -def lrcs(path, lrc, read=False): - """ - 通过读取ID3和Vorbis标签查找信息 - :param path: - :param lrc: - :param read: - :return: - """ - tag_objs = [id3_lrc, ogg_lrc] - for tag_obj in tag_objs: - try: - result = tag_obj(path, lrc, read) - return result - except Exception: - continue - return False - - -def w_file(path: str, tags: dict) -> int: - audio = File(path, easy=True) - title = tags.get("title", None) - artist = tags.get("artist", None) - album = tags.get("album", None) - lyrics = tags.get("lyrics", None) - if title is not None: - audio["title"] = title - if artist is not None: - audio["artist"] = artist - if album is not None: - audio["album"] = album - try: - if lyrics is not None: - audio["lyrics"] = lyrics - audio.save() - return 0 - except Exception as e: - result = lrcs(path, lyrics) - if not result: - return -1 - try: - audio.save() - except Exception as e: - return -2 - return 0 - - -def r_lrc(path: str) -> str: - audio = File(path, easy=True) - lyrics = audio.get("lyrics", None) - if lyrics is None: - lyrics = lrcs(path, None, read=True) - if isinstance(lyrics, list): - result = lyrics[0] - else: - result = lyrics - if not isinstance(result, str): - raise TypeError("The result should be a string.") - return result - - -if __name__ == "__main__": - file_path = "" diff --git a/requirements.txt b/requirements.txt index 57fb323..2792edd 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,4 +4,5 @@ Requests==2.31.0 tinytag==1.9.0 waitress==2.1.2 mutagen==1.46.0 -aiohttp>=3.9.1 \ No newline at end of file +aiohttp>=3.9.1 +Pillow>=10.1.0 \ No newline at end of file