mkogg.py: Fix 'self.get_mp4_metadata(self, source)'
[blog.git] / posts / mkogg / mkogg.py
1 #!/usr/bin/env python
2 #
3 # Copyright (C) 2009-2015 W. Trevor King <wking@drexel.edu>
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Lesser General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful, but
11 # WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13 # Lesser General Public License for more details.
14 #
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with this program.  If not, see
17 # <http://www.gnu.org/licenses/>.
18
19 """Mirror a tree of audio files in another format.
20
21 Conversion between any of the following formats are supported:
22
23 * flac
24 * mp4 (decoding only)
25 * mp3
26 * ogg (Vorbis)
27 * wav
28
29 External packages required for full functionality:
30
31 * lame_ (`lame`)
32 * faad_ (`faad`)
33 * flac_ (`flac`)
34 * mpg123_ (`mpg123`)
35 * vorbis_ (`ogg123`, `oggenc`)
36 * mutagen_ (metadata conversion)
37
38 .. _lame: http://lame.sourceforge.net/
39 .. _faad: http://www.audiocoding.com/faad2.html
40 .. _flac: http://flac.sourceforge.net/
41 .. _mpg123: http://www.mpg123.org/
42 .. _vorbis: http://www.vorbis.com/
43 .. _mutagen: http://code.google.com/p/mutagen/
44 """
45
46 from hashlib import sha256 as _hash
47 import os as _os
48 import re as _re
49 import shutil as _shutil
50 import subprocess as _subprocess
51 import tempfile as _tempfile
52
53 try:
54     import mutagen as _mutagen
55     import mutagen.flac as _mutagen_flac
56     import mutagen.id3 as _mutagen_id3
57     import mutagen.mp4 as _mutagen_mp4
58     import mutagen.mp3 as _mutagen_mp3
59     import mutagen.oggvorbis as _mutagen_oggvorbis
60 except ImportError as error:
61     _mutagen = None
62     _mutagen_import_error = error
63
64
65 __version__ = '0.5'
66
67
68 def invoke(args, stdin=None, expect=(0,)):
69     print('  {}'.format(args))
70     p = _subprocess.Popen(
71         args, stdin=_subprocess.PIPE, stdout=_subprocess.PIPE,
72         stderr=_subprocess.PIPE)
73     stdout,stderr = p.communicate(stdin)
74     status = p.wait()
75     assert status in expect, 'invalid status {} from {}'.format(status, args)
76     return (status, stdout, stderr)
77
78
79 class Converter (object):
80     """Recode audio files from `source_dir` to `target_dir`.
81
82     `target_extension` sets the target encoding.
83
84     Notes
85     -----
86
87     The `get_` and `set_*_metadata` methods should pass metadata as a
88     `dict` with key/value pairs standardised to match the list of
89     Vorbis comment suggestions_ with lowercase keys.  The `date` field
90     should be formatted `YYYY[-MM[-DD]]`.  The dict values should be
91     lists to support repeated entries for a given tag.
92
93     .. _suggestions: http://www.xiph.org/vorbis/doc/v-comment.html
94     """
95     id3_to_vorbis_keys = {
96         'comm': 'comment',
97         'talb': 'album',
98         'tcom': 'composer',
99         'tcop': 'copyright',
100         'tit2': 'title',
101         'tpe1': 'artist',
102         'tpe2': 'accompaniment',
103         'tpe3': 'conductor',
104         'tpos': 'part of set',
105         'tpub': 'organization',  # publisher
106         'trck': 'tracknumber',
107         'tyer': 'date',
108         }
109
110     mp4_to_vorbis_keys = {
111         '\xa9cmt': 'comment',
112         '\xa9alb': 'album',
113         #'tcom': 'composer',
114         'cprt': 'copyright',
115         '\xa9nam': 'title',
116         '\xa9ART': 'artist',
117         #'tpe2': 'accompaniment',
118         #'tpe3': 'conductor',
119         'disk': 'part of set',
120         #'tpub': 'organization',  # publisher
121         'trkn': 'tracknumber',
122         '\xa9day': 'date',
123         }
124
125     def __init__(self, source_dir, target_dir, target_extension='ogg',
126                  cache_file=None, hash=True, ignore_function=None):
127         self.source_dir = source_dir
128         self.target_dir = target_dir
129         self._source_extensions = ['flac', 'm4a', 'mp3', 'mp4', 'ogg', 'wav']
130         self._target_extension = target_extension
131         self._cache_file = cache_file
132         self._cache = self._read_cache()
133         self._hash = hash
134         self._ignore_function = ignore_function
135         f,self._tempfile = _tempfile.mkstemp(prefix='mkogg-')
136
137     def cleanup(self):
138         _os.remove(self._tempfile)
139         self._save_cache()
140
141     def _read_cache(self):
142         cache = {}
143         if self._cache_file == None:
144             return cache
145         try:
146             with open(self._cache_file, 'r') as f:
147                 line = f.readline()
148                 assert line.startswith('# mkogg cache version:'), line
149                 version = line.split(':', 1)[-1].strip()
150                 if version != __version__:
151                     print('cache version mismatch: {} != {}'.format(
152                             version, __version__))
153                     return cache  # old cache, ignore contents
154                 for line in f:
155                     try:
156                         key,value = [x.strip() for x in line.split(' -> ')]
157                     except ValueError:
158                         pass
159                     cache[key] = value
160         except IOError:
161             pass
162         return cache
163
164     def _save_cache(self):
165         if self._cache_file == None:
166             return
167         with open(self._cache_file, 'w') as f:
168             f.write('# mkogg cache version: {}\n'.format(__version__))
169             for key,value in self._cache.items():
170                 f.write('{} -> {}\n'.format(key, value))
171
172     def run(self):
173         self._makedirs(self.target_dir)
174         for dirpath,dirnames,filenames in _os.walk(self.source_dir):
175             for filename in filenames:
176                 root,ext = _os.path.splitext(filename)
177                 ext = ext.lower()
178                 if ext.startswith('.'):
179                     ext = ext[1:]
180                 if ext not in self._source_extensions:
181                     print('skip', filename, ext)
182                     continue
183                 source_path = _os.path.join(dirpath, filename)
184                 if (self._ignore_function is not None and
185                     self._ignore_function(source_path)):
186                     continue
187                 rel_path = _os.path.relpath(dirpath, self.source_dir)
188                 target_path = _os.path.join(
189                     self.target_dir, rel_path,
190                     '{}.{}'.format(root, self._target_extension))
191                 target_dir = _os.path.dirname(target_path)
192                 self._makedirs(target_dir)
193                 self._convert(source_path, target_path, ext)
194
195     def _makedirs(self, target_dir):
196         if not _os.path.exists(target_dir):
197             _os.makedirs(target_dir)
198
199     def _convert(self, source, target, ext):
200         if self._hash:
201             cache_key = self._cache_key(source)
202             old_cache_value = self._cache.get(cache_key, None)
203             if (old_cache_value != None and
204                 old_cache_value == self._cache_value(target)):
205                 print('already cached {} to {}'.format(source, target))
206                 return
207         elif _os.path.exists(target):
208             print('target {} already exists'.format(target))
209             return
210         print('convert {} to {}'.format(source, target))
211         if ext == self._target_extension:
212             _shutil.copy(source, target)
213             return
214         try:
215             convert = getattr(self, 'convert_{}_to_{}'.format(
216                     ext, self._target_extension))
217         except AttributeError:
218             to_wav = getattr(self, 'convert_{}_to_wav'.format(ext))
219             from_wav = getattr(self, 'convert_wav_to_{}'.format(
220                     self._target_extension))
221             def convert(source, target):
222                 to_wav(source, self._tempfile)
223                 from_wav(self._tempfile, target)
224         convert(source, target)
225         if not getattr(convert, 'handles_metadata', False):
226             get_metadata = getattr(self, 'get_{}_metadata'.format(ext))
227             metadata = get_metadata(source)
228             set_metadata = getattr(self, 'set_{}_metadata'.format(
229                     self._target_extension))
230             set_metadata(target, metadata)
231         if not self._hash:
232             cache_key = self._cache_key(source)
233         self._cache[cache_key] = self._cache_value(target)
234
235     def _cache_key(self, source):
236         return repr((self._file_hash(source), self._target_extension))
237
238     def _cache_value(self, target):
239         return self._file_hash(target)
240
241     def _file_hash(self, filename):
242         """
243         Examples
244         --------
245         >>> c = Converter(None, None)
246         >>> h = c._file_hash(__file__)
247         >>> len(h)
248         64
249         >>> c._file_hash('/highly/unlikely/to/exist') == None
250         True
251         >>> c.cleanup()
252         """
253         h = _hash()
254         chunk_size = 2**20  # 1 Mb
255         try:
256             with open(filename, 'rb') as f:
257                 chunk = ' '
258                 while len(chunk) > 0:
259                     chunk = f.read(chunk_size)
260                     h.update(chunk)
261         except IOError:
262             return None
263         return str(h.hexdigest())
264
265     def _set_vorbis_comments(self, container, metadata):
266         container.delete()
267         if type(metadata) == dict:
268             items = sorted(metadata.items())
269         else:
270             items = metadata.items()
271         for key,value in items:
272             # leave key case alone, because Mutagen downcases Vorbis
273             # keys internally.
274             container[key] = value
275
276     def _parse_date(self, date):
277         """Parse `date` (`YYYY[-MM[-DD]]`), returning `(year, month, day)`.
278
279         Examples
280         --------
281         >>> c = Converter(None, None)
282         >>> c._parse_date('2010')
283         ['2010', None, None]
284         >>> c._parse_date('2010-11')
285         ['2010', '11', None]
286         >>> c._parse_date('2010-11-16')
287         ['2010', '11', '16']
288         >>> c.cleanup()
289         """
290         fields = date.split('-')
291         assert len(fields) > 0 and len(fields) <= 3, date
292         fields = fields + [None] * (3 - len(fields))
293         return fields
294
295     def _construct_id3_trck(self, metadata):
296         if 'tracknumber' not in metadata:
297             return (None, None)
298         if 'tracktotal' in metadata:
299             value = []
300             for i,v in enumerate(metadata['tracknumber']):
301                 value.append('{}/{}'.format(
302                         v, metadata['tracktotal'][i]))
303         else:
304             value = metadata['tracknumber']
305         key = 'tracknumber'
306         return (key, value)
307
308     def _guess_id3_encoding(self, text_list):
309         for id3_encoding,encoding in [(0, 'ISO-8859-1'), (3, 'utf-8')]:
310             encoding_success = True
311             for text in text_list:
312                 if isinstance(text, str):
313                     try:
314                         text.encode(encoding)
315                     except UnicodeEncodeError:
316                         encoding_success = False
317                         break
318             if encoding_success:
319                 return id3_encoding
320         raise ValueError(text_list)
321
322     def convert_flac_to_wav(self, source, target):
323         invoke(['ogg123', '-d', 'wav', '-f', target, source])
324
325     def convert_flac_to_ogg(self, source, target):
326         invoke(['oggenc', '--quiet', '-q', '3', source, '-o', target])
327     convert_flac_to_ogg.handles_metadata = True
328
329     def convert_m4a_to_wav(self, source, target):
330         invoke(['faad', '-o', target, source])
331
332     def convert_mp3_to_wav(self, source, target):
333         invoke(['mpg123',  '-w', target, source])
334
335     def convert_mp4_to_wav(self, source, target):
336         invoke(['faad', '-o', target, source])
337
338     def convert_ogg_to_wav(self, source, target):
339         self.convert_flac_to_wav(source, target)
340
341     def convert_wav_to_flac(self, source, target):
342         invoke(['flac', '--force', '--output-name', target, source])
343
344     def convert_wav_to_mp3(self, source, target):
345         invoke(['lame', '--quiet', '-V', '4', source, target])
346
347     def convert_wav_to_ogg(self, source, target):
348         self.convert_flac_to_ogg(source, target)
349
350     def get_flac_metadata(self, source):
351         if _mutagen is None:
352             raise _mutagen_import_error
353         return _mutagen_flac.FLAC(source)
354
355     def get_m4a_metadata(self, source):
356         return self.get_mp4_metadata(source)
357
358     def get_mp3_metadata(self, source):
359         if _mutagen is None:
360             raise _mutagen_import_error
361         mp3 = _mutagen_mp3.MP3(source)
362         metadata = {}
363         for key,value in mp3.items():
364             try:
365                 vorbis_key = self.id3_to_vorbis_keys[key.lower()]
366             except KeyError:
367                 continue
368             v = value.text
369             if vorbis_key == 'tracknumber':
370                 for i,v_entry in enumerate(v):
371                     if '/' in v_entry:
372                         tracknumber,tracktotal = v_entry.split('/', 1)
373                         v[i] = tracknumber
374                         metadata['tracktotal'] = ['tracktotal']
375             metadata[vorbis_key] = v
376         return metadata
377
378     def get_mp4_metadata(self, source):
379         if _mutagen is None:
380             raise _mutagen_import_error
381         mp4 = _mutagen_mp4.MP4(source)
382         metadata = {}
383         for key,value in mp4.items():
384             try:
385                 vorbis_key = self.mp4_to_vorbis_keys[key.lower()]
386             except KeyError:
387                 continue
388             if vorbis_key == 'tracknumber':
389                 tracknumber,tracktotal = value
390                 value = tracknumber
391                 if tracktotal:
392                     metadata['tracktotal'] = [str(tracktotal)]
393             elif vorbis_key == 'part of set':
394                 disknumber,disktotal = value
395                 value = disknumber
396                 if disktotal:
397                     metadata['set total'] = [str(disktotal)]
398             try:
399                 metadata[vorbis_key] = [str(value)]
400             except UnicodeEncodeError:
401                 metadata[vorbis_key] = [value]
402         return metadata
403
404     def get_ogg_metadata(self, source):
405         if _mutagen is None:
406             raise _mutagen_import_error        
407         return _mutagen_oggvorbis.OggVorbis(source)
408
409     def get_wav_metadata(self, source):
410         return {}
411
412     def set_flac_metadata(self, target, metadata):
413         if _mutagen is None:
414             raise _mutagen_import_error
415         flac = _mutagen_flac.FLAC(target)
416         self._set_vorbis_comments(flac, metadata)
417         flac.save()
418
419     def set_mp3_metadata(self, target, metadata):
420         vorbis_keys_to_id3 = dict(
421             (v,k) for k,v in self.id3_to_vorbis_keys.items())
422         if _mutagen is None:
423             raise _mutagen_import_error
424         mp3 = _mutagen_mp3.MP3(target)
425         if mp3.tags is not None:
426             mp3.tags.delete()
427         handled_trck = False
428         max_encoding = 0
429         for key,value in metadata.items():
430             if key == 'date':
431                 for i,v in enumerate(value):
432                     year,month,day = self._parse_date(v)
433                     value[i] = year
434             elif key in ['tracknumber', 'tracktotal']:
435                 if handled_trck is True:
436                     continue
437                 handled_trck = True
438                 key,value = self._construct_id3_trck(metadata)
439                 if value is None:
440                     continue
441             try:
442                 frame_name = vorbis_keys_to_id3[key].upper()
443             except KeyError:
444                 continue
445             frame = getattr(_mutagen_id3, frame_name)
446             id3_encoding = self._guess_id3_encoding(value)
447             max_encoding = max(max_encoding, id3_encoding)
448             mp3[frame_name] = frame(encoding=id3_encoding, text=value)
449         if mp3.tags is None:
450             return
451         if max_encoding:  # at least one tag doesn't use ISO-8859-1
452             v1 = 0  # remove ID3v1 tags
453         else:
454             v1 = 2  # create and/or update ID3v1 tags
455         mp3.save(v1=v1)
456
457     def set_ogg_metadata(self, target, metadata):
458         if _mutagen is None:
459             raise _mutagen_import_error
460         ogg = _mutagen_oggvorbis.OggVorbis(target)
461         self._set_vorbis_comments(ogg, metadata)
462         ogg.save()
463
464     def set_wav_metadata(self, target, metadata):
465         pass
466
467
468 def test():
469     import doctest
470     results = doctest.testmod()
471     return results.failed % 127
472
473
474 if __name__ == '__main__':
475     import argparse
476     import sys
477
478     class Formatter (argparse.RawDescriptionHelpFormatter,
479                      argparse.ArgumentDefaultsHelpFormatter):
480         pass
481
482     p = argparse.ArgumentParser(
483         description=__doc__.splitlines()[0],
484         epilog='\n'.join(__doc__.splitlines()[2:]),
485         formatter_class=Formatter)
486     p.add_argument(
487         '-v', '--version', action='version',
488         version='%(prog)s {}'.format(__version__))
489     p.add_argument(
490         '-t', '--target-extension', dest='ext', metavar='EXT',
491         default='ogg', choices=['flac', 'mp3', 'ogg', 'wav'],
492         help='Conversion target type')
493     p.add_argument(
494         '-c', '--cache', dest='cache', metavar='PATH',
495         help=('Save conversion hashes in a cache file to avoid '
496               'repeated previous conversions.'))
497     p.add_argument(
498         '-n', '--no-hash', dest='hash',
499         default=True, action='store_const', const=False,
500         help=("Don't hash files.  Just assume matching names would "
501               'have matching hashes.'))
502     p.add_argument(
503         '-i', '--ignore', dest='ignore', metavar='REGEXP',
504         help='Ignore source paths matching REGEXP.')
505     p.add_argument(
506         '--test', dest='test',
507         default=False, action='store_const', const=True,
508         help='Run internal tests and exit')
509     p.add_argument(
510         'source_dir', metavar='SOURCE', default='.',
511         help='Source directory')
512     p.add_argument(
513         'target_dir', metavar='TARGET', default='.',
514         help='Target directory')
515
516     args = p.parse_args()
517
518     if args.test:
519         sys.exit(test())
520
521     if args.ignore:
522         ignore_regexp = _re.compile(args.ignore)
523         ignore_function = ignore_regexp.match
524     else:
525         ignore_function = None
526
527     c = Converter(
528         args.source_dir, args.target_dir, target_extension=args.ext,
529         cache_file=args.cache, hash=args.hash,
530         ignore_function=ignore_function)
531     try:
532         c.run()
533     finally:
534         c.cleanup()
535         pass