From b12a99bc295e85aec107fc666f05c5c1b9f78e6f Mon Sep 17 00:00:00 2001 From: solstice0 Date: Sat, 16 Sep 2023 22:01:34 -0500 Subject: [PATCH] Add ability to tag DR value to files using mutagen --- audio_io/audio_io.py | 46 ++++++++++++++------------------------------ main.py | 42 ++++++++++++++++++++++++++++++++++------ requirements.txt | 3 ++- 3 files changed, 52 insertions(+), 39 deletions(-) diff --git a/audio_io/audio_io.py b/audio_io/audio_io.py index c5f1941..8450c58 100755 --- a/audio_io/audio_io.py +++ b/audio_io/audio_io.py @@ -1,5 +1,4 @@ import itertools -import json import os import subprocess as sp import sys @@ -9,6 +8,7 @@ from subprocess import DEVNULL, PIPE from typing import NamedTuple, Iterator, Iterable, List, Optional, Sequence +import mutagen import numpy as np from audio_io.cue.cue_parser import CueCmd, parse_cue_str, read_cue_from_file @@ -63,11 +63,11 @@ class TagKey(str, Enum): def get_tag_with_alternatives(tags: dict, tag_key: TagKey) -> Optional[str]: exact_match = tags.get(tag_key) if exact_match: - return exact_match + return exact_match[0] for alt_key in _tag_alternatives.get(tag_key, ()): v = tags.get(alt_key) if v: - return v + return v[0] return None @@ -267,43 +267,25 @@ def _test_ffmpeg(): sys.exit('ffmpeg not installed, broken or not on PATH') -def _parse_audio_metadata(in_path: str, data_from_ffprobe: dict) -> AudioFileMetadata: - def get(*keys, default_value=None): - d = data_from_ffprobe - for k in keys: - try: - d = d[k] - except (KeyError, IndexError): - return default_value - return d - - tags = {key.upper(): val for key, val in get('format', 'tags', default_value={}).items()} +def _parse_audio_metadata(in_path: str, mutagen_file) -> AudioFileMetadata: + def get_sample_rate(mutagen_file): + if isinstance(mutagen_file, mutagen.oggopus.OggOpus): + return 48000 + return mutagen_file.info.sample_rate + return AudioFileMetadata( file_path=in_path, - channel_count=int(get('streams', 0, 'channels')), - sample_rate=int(get('streams', 0, 'sample_rate')), - tags=tags, - cuesheet=tags.get(TagKey.CUESHEET)) + channel_count=mutagen_file.info.channels, + sample_rate=get_sample_rate(mutagen_file), + tags=mutagen_file.tags, + cuesheet=mutagen_file.get(TagKey.CUESHEET, None)) def read_audio_file_metadata(in_path) -> AudioFileMetadata: if not path.exists(in_path): raise ValueError(f'Path "{in_path}" doesn''t exist') - p = sp.Popen( - (ex_ffprobe, - '-v', 'error', - '-print_format', 'json', - '-select_streams', 'a:0', - '-show_entries', 'stream=channels,sample_rate', - '-show_entries', 'format_tags', - in_path), - stdout=PIPE, stderr=PIPE) - out, err = p.communicate() - returncode = p.returncode - if returncode != 0: - raise Exception('ffprobe returned {}'.format(returncode)) - audio_metadata = _parse_audio_metadata(in_path, json.loads(out)) + audio_metadata = _parse_audio_metadata(in_path, mutagen.File(in_path, easy=True)) assert audio_metadata.channel_count >= 1 return audio_metadata diff --git a/main.py b/main.py index 9c4783e..b9652c6 100755 --- a/main.py +++ b/main.py @@ -7,6 +7,7 @@ from datetime import datetime from typing import Iterable, Tuple, NamedTuple +import mutagen import numpy from audio_io import read_audio_info, read_audio_data, TagKey, TrackInfo, get_tag_with_alternatives @@ -28,7 +29,7 @@ class LogGroup(NamedTuple): albums: Iterable[str] channels: int sample_rate: int - tracks_dr: Iterable[Tuple[int, float, float, int, str]] + tracks_dr: Iterable[Tuple[int, float, float, int, str, str]] def get_group_title(group: LogGroup): @@ -55,7 +56,7 @@ def write_log(write_fun, dr_log_groups: Iterable[LogGroup], average_dr): w(f"{l1}\nAnalyzed: {group_name}\n{l1}\n\nDR Peak RMS Duration Track\n{l1}\n") track_count = 0 - for dr, peak, rms, duration_sec, track_name in group.tracks_dr: + for dr, peak, rms, duration_sec, track_name, file_path in group.tracks_dr: dr_formatted = f"DR{str(dr).ljust(4)}" if dr is not None else "N/A " w(dr_formatted + f"{peak:9.2f} dB" @@ -66,13 +67,29 @@ def write_log(write_fun, dr_log_groups: Iterable[LogGroup], average_dr): w(f"{l1}\n\nNumber of tracks: {track_count}\nOfficial DR value: DR{average_dr}\n\n" f"Samplerate: {group.sample_rate} Hz\nChannels: {group.channels}\n{l2}\n\n") +def write_tags(dr_log_groups: Iterable[LogGroup]): + for group in dr_log_groups: + print(f"writing tags for {get_group_title(group)}...") + for track in group.tracks_dr: + path = track[5] + dr_value = str(track[0]) + mutagen_file = mutagen.File(path, easy=False) + if isinstance(mutagen_file, mutagen.mp3.MP3): + mutagen_file.tags.add(mutagen.id3.TXXX(encoding=mutagen.id3.Encoding.UTF8, desc=u"DR", text=dr_value)) + elif isinstance(mutagen_file, mutagen.mp4.MP4): + mutagen_file["----:com.apple.iTunes:DR"] = mutagen.mp4.MP4FreeForm(dr_value.encode()) + else: + mutagen_file["DR"] = dr_value + mutagen_file.save() + print("DR tags written!") + return def flatmap(f, items): for i in items: yield from f(i) -def make_log_groups(l: Iterable[Tuple[AudioSourceInfo, Iterable[Tuple[int, float, float, int, str]]]]): +def make_log_groups(l: Iterable[Tuple[AudioSourceInfo, Iterable[Tuple[int, float, float, int, str, str]]]]): import itertools grouped = itertools.groupby(l, key=lambda x: (x[0].channel_count, x[0].sample_rate)) @@ -95,6 +112,7 @@ def parse_args(): ap.add_argument("--no-log", help='Do not write log (dr.txt), by default a log file is written after analysis', action='store_true') ap.add_argument("--keep-precision", help='Do not round values, this also disables log', action='store_true') + ap.add_argument("--tag", help='Tag the audio files with the computed DR value. ', action='store_true') ap.add_argument("--no-resample", help='Do not resample everything to 44.1kHz (unlike the "standard" meter), ' 'this also disables log', action='store_true') @@ -120,6 +138,7 @@ def main(): and not args.no_resample keep_precision = args.keep_precision no_resample = args.no_resample + should_tag = args.tag if should_write_log: log_path = get_log_path(input_path) @@ -138,14 +157,25 @@ def track_cb(track_info: TrackInfo, dr): print(f'Official DR = {dr_mean}, Median DR = {dr_median}') print(f'Analyzed all tracks in {time.time() - time_start:.2f} seconds') + dr_log_items_list = [LogGroup(performers=item.performers, + albums=item.albums, + channels=item.channels, + sample_rate=item.sample_rate, + tracks_dr=[tuple(track) for track in item.tracks_dr] + ) for item in dr_log_items] + if should_write_log: # noinspection PyUnboundLocalVariable print(f'writing log to {log_path}') with open(log_path, mode='x', encoding='utf8') as f: - write_log(f.write, dr_log_items, dr_mean) + write_log(f.write, dr_log_items_list, dr_mean) print('…done') else: - write_log(sys.stdout.write, dr_log_items, dr_mean) + write_log(sys.stdout.write, dr_log_items_list, dr_mean) + + if should_tag: + write_tags(dr_log_items_list) + fix_tty() @@ -243,7 +273,7 @@ def process_results(audio_info_part, analyzed_tracks): title = get_tag_with_alternatives(track_info.tags, TagKey.TITLE) dr_log_subitems.append( (dr, dr_metrics.peak, dr_metrics.rms, duration_seconds, - f"{track_info.global_index:02d}-{title}")) + f"{track_info.global_index:02d}-{title}", audio_info_part[0])) return track_results def process_part(map_impl, audio_info_part: AudioSourceInfo): diff --git a/requirements.txt b/requirements.txt index 9f96323..d8cdc2a 100755 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,3 @@ numpy>=1.13.3 -chardet>=3.0.4 \ No newline at end of file +chardet>=3.0.4 +mutagen>= \ No newline at end of file