Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add option to tag DR value to files #11

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 14 additions & 32 deletions audio_io/audio_io.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import itertools
import json
import os
import subprocess as sp
import sys
Expand All @@ -9,6 +8,7 @@
from subprocess import DEVNULL, PIPE
from typing import NamedTuple, Iterator, Iterable, List, Optional, Sequence

import mutagen
import numpy as np

from audio_io.cue.cue_parser import CueCmd, parse_cue_str, read_cue_from_file
Expand Down Expand Up @@ -63,11 +63,11 @@ class TagKey(str, Enum):
def get_tag_with_alternatives(tags: dict, tag_key: TagKey) -> Optional[str]:
exact_match = tags.get(tag_key)
if exact_match:
return exact_match
return exact_match[0]
for alt_key in _tag_alternatives.get(tag_key, ()):
v = tags.get(alt_key)
if v:
return v
return v[0]
return None


Expand Down Expand Up @@ -267,43 +267,25 @@ def _test_ffmpeg():
sys.exit('ffmpeg not installed, broken or not on PATH')


def _parse_audio_metadata(in_path: str, data_from_ffprobe: dict) -> AudioFileMetadata:
def get(*keys, default_value=None):
d = data_from_ffprobe
for k in keys:
try:
d = d[k]
except (KeyError, IndexError):
return default_value
return d

tags = {key.upper(): val for key, val in get('format', 'tags', default_value={}).items()}
def _parse_audio_metadata(in_path: str, mutagen_file) -> AudioFileMetadata:
def get_sample_rate(mutagen_file):
if isinstance(mutagen_file, mutagen.oggopus.OggOpus):
return 48000
return mutagen_file.info.sample_rate

return AudioFileMetadata(
file_path=in_path,
channel_count=int(get('streams', 0, 'channels')),
sample_rate=int(get('streams', 0, 'sample_rate')),
tags=tags,
cuesheet=tags.get(TagKey.CUESHEET))
channel_count=mutagen_file.info.channels,
sample_rate=get_sample_rate(mutagen_file),
tags=mutagen_file.tags,
cuesheet=mutagen_file.get(TagKey.CUESHEET, None))


def read_audio_file_metadata(in_path) -> AudioFileMetadata:
if not path.exists(in_path):
raise ValueError(f'Path "{in_path}" doesn''t exist')

p = sp.Popen(
(ex_ffprobe,
'-v', 'error',
'-print_format', 'json',
'-select_streams', 'a:0',
'-show_entries', 'stream=channels,sample_rate',
'-show_entries', 'format_tags',
in_path),
stdout=PIPE, stderr=PIPE)
out, err = p.communicate()
returncode = p.returncode
if returncode != 0:
raise Exception('ffprobe returned {}'.format(returncode))
audio_metadata = _parse_audio_metadata(in_path, json.loads(out))
audio_metadata = _parse_audio_metadata(in_path, mutagen.File(in_path, easy=True))
assert audio_metadata.channel_count >= 1
return audio_metadata

Expand Down
42 changes: 36 additions & 6 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from datetime import datetime
from typing import Iterable, Tuple, NamedTuple

import mutagen
import numpy

from audio_io import read_audio_info, read_audio_data, TagKey, TrackInfo, get_tag_with_alternatives
Expand All @@ -28,7 +29,7 @@ class LogGroup(NamedTuple):
albums: Iterable[str]
channels: int
sample_rate: int
tracks_dr: Iterable[Tuple[int, float, float, int, str]]
tracks_dr: Iterable[Tuple[int, float, float, int, str, str]]


def get_group_title(group: LogGroup):
Expand All @@ -55,7 +56,7 @@ def write_log(write_fun, dr_log_groups: Iterable[LogGroup], average_dr):

w(f"{l1}\nAnalyzed: {group_name}\n{l1}\n\nDR Peak RMS Duration Track\n{l1}\n")
track_count = 0
for dr, peak, rms, duration_sec, track_name in group.tracks_dr:
for dr, peak, rms, duration_sec, track_name, file_path in group.tracks_dr:
dr_formatted = f"DR{str(dr).ljust(4)}" if dr is not None else "N/A "
w(dr_formatted +
f"{peak:9.2f} dB"
Expand All @@ -66,13 +67,29 @@ def write_log(write_fun, dr_log_groups: Iterable[LogGroup], average_dr):
w(f"{l1}\n\nNumber of tracks: {track_count}\nOfficial DR value: DR{average_dr}\n\n"
f"Samplerate: {group.sample_rate} Hz\nChannels: {group.channels}\n{l2}\n\n")

def write_tags(dr_log_groups: Iterable[LogGroup]):
for group in dr_log_groups:
print(f"writing tags for {get_group_title(group)}...")
for track in group.tracks_dr:
path = track[5]
dr_value = str(track[0])
mutagen_file = mutagen.File(path, easy=False)
if isinstance(mutagen_file, mutagen.mp3.MP3):
mutagen_file.tags.add(mutagen.id3.TXXX(encoding=mutagen.id3.Encoding.UTF8, desc=u"DR", text=dr_value))
elif isinstance(mutagen_file, mutagen.mp4.MP4):
mutagen_file["----:com.apple.iTunes:DR"] = mutagen.mp4.MP4FreeForm(dr_value.encode())
else:
mutagen_file["DR"] = dr_value
mutagen_file.save()
print("DR tags written!")
return

def flatmap(f, items):
for i in items:
yield from f(i)


def make_log_groups(l: Iterable[Tuple[AudioSourceInfo, Iterable[Tuple[int, float, float, int, str]]]]):
def make_log_groups(l: Iterable[Tuple[AudioSourceInfo, Iterable[Tuple[int, float, float, int, str, str]]]]):
import itertools
grouped = itertools.groupby(l, key=lambda x: (x[0].channel_count, x[0].sample_rate))

Expand All @@ -95,6 +112,7 @@ def parse_args():
ap.add_argument("--no-log", help='Do not write log (dr.txt), by default a log file is written after analysis',
action='store_true')
ap.add_argument("--keep-precision", help='Do not round values, this also disables log', action='store_true')
ap.add_argument("--tag", help='Tag the audio files with the computed DR value. ', action='store_true')
ap.add_argument("--no-resample", help='Do not resample everything to 44.1kHz (unlike the "standard" meter), '
'this also disables log',
action='store_true')
Expand All @@ -120,6 +138,7 @@ def main():
and not args.no_resample
keep_precision = args.keep_precision
no_resample = args.no_resample
should_tag = args.tag

if should_write_log:
log_path = get_log_path(input_path)
Expand All @@ -138,14 +157,25 @@ def track_cb(track_info: TrackInfo, dr):
print(f'Official DR = {dr_mean}, Median DR = {dr_median}')
print(f'Analyzed all tracks in {time.time() - time_start:.2f} seconds')

dr_log_items_list = [LogGroup(performers=item.performers,
albums=item.albums,
channels=item.channels,
sample_rate=item.sample_rate,
tracks_dr=[tuple(track) for track in item.tracks_dr]
) for item in dr_log_items]

if should_write_log:
# noinspection PyUnboundLocalVariable
print(f'writing log to {log_path}')
with open(log_path, mode='x', encoding='utf8') as f:
write_log(f.write, dr_log_items, dr_mean)
write_log(f.write, dr_log_items_list, dr_mean)
print('…done')
else:
write_log(sys.stdout.write, dr_log_items, dr_mean)
write_log(sys.stdout.write, dr_log_items_list, dr_mean)

if should_tag:
write_tags(dr_log_items_list)

fix_tty()


Expand Down Expand Up @@ -243,7 +273,7 @@ def process_results(audio_info_part, analyzed_tracks):
title = get_tag_with_alternatives(track_info.tags, TagKey.TITLE)
dr_log_subitems.append(
(dr, dr_metrics.peak, dr_metrics.rms, duration_seconds,
f"{track_info.global_index:02d}-{title}"))
f"{track_info.global_index:02d}-{title}", audio_info_part[0]))
return track_results

def process_part(map_impl, audio_info_part: AudioSourceInfo):
Expand Down
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
numpy>=1.13.3
chardet>=3.0.4
chardet>=3.0.4
mutagen>=