581631ac3f5beafe5ff15558e1a8a67af9c12c44
[blog.git] / posts / mkogg / mkogg.py
1 #!/usr/bin/env python
2 #
3 # Copyright (C) 2009-2012 W. Trevor King <wking@drexel.edu>
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Lesser General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful, but
11 # WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13 # Lesser General Public License for more details.
14 #
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with this program.  If not, see
17 # <http://www.gnu.org/licenses/>.
18
19 """Mirror a tree of audio files in another format.
20
21 Conversion between any of the following formats are supported:
22
23 * flac
24 * m4a (decoding only)
25 * mp3
26 * ogg (Vorbis)
27 * wav
28
29 External packages required for full functionality:
30
31 * lame_ (`lame`)
32 * faad_ (`faad`)
33 * flac_ (`flac`)
34 * mpg123_ (`mpg123`)
35 * vorbis_ (`ogg123`, `oggenc`)
36 * mutagen_ (metadata conversion)
37
38 .. _lame: http://lame.sourceforge.net/
39 .. _faad: http://www.audiocoding.com/faad2.html
40 .. _flac: http://flac.sourceforge.net/
41 .. _mpg123: http://www.mpg123.org/
42 .. _vorbis: http://www.vorbis.com/
43 .. _mutagen: http://code.google.com/p/mutagen/
44 """
45
46 from hashlib import sha256 as _hash
47 import os as _os
48 import re as _re
49 import shutil as _shutil
50 import subprocess as _subprocess
51 import tempfile as _tempfile
52
53 try:
54     import mutagen as _mutagen
55     import mutagen.flac as _mutagen_flag
56     import mutagen.id3 as _mutagen_id3
57     import mutagen.m4a as _mutagen_m4a
58     import mutagen.mp3 as _mutagen_mp3
59     import mutagen.oggvorbis as _mutagen_oggvorbis
60 except ImportError as _mutagen_import_error:
61     _mutagen = None
62
63
64 __version__ = '0.4'
65
66
67 def invoke(args, stdin=None, expect=(0,)):
68     print('  {}'.format(args))
69     p = _subprocess.Popen(
70         args, stdin=_subprocess.PIPE, stdout=_subprocess.PIPE,
71         stderr=_subprocess.PIPE)
72     stdout,stderr = p.communicate(stdin)
73     status = p.wait()
74     assert status in expect, 'invalid status {} from {}'.format(status, args)
75     return (status, stdout, stderr)
76
77
78 class Converter (object):
79     """Recode audio files from `source_dir` to `target_dir`.
80
81     `target_extension` sets the target encoding.
82
83     Notes
84     -----
85
86     The `get_` and `set_*_metadata` methods should pass metadata as a
87     `dict` with key/value pairs standardised to match the list of
88     Vorbis comment suggestions_ with lowercase keys.  The `date` field
89     should be formatted `YYYY[-MM[-DD]]`.  The dict values should be
90     lists to support repeated entries for a given tag.
91
92     .. _suggestions: http://www.xiph.org/vorbis/doc/v-comment.html
93     """
94     id3_to_vorbis_keys = {
95         'comm': 'comment',
96         'talb': 'album',
97         'tcom': 'composer',
98         'tcop': 'copyright',
99         'tit2': 'title',
100         'tpe1': 'artist',
101         'tpe2': 'accompaniment',
102         'tpe3': 'conductor',
103         'tpos': 'part of set',
104         'tpub': 'organization',  # publisher
105         'trck': 'tracknumber',
106         'tyer': 'date',
107         }
108
109     m4a_to_vorbis_keys = {
110         '\xa9cmt': 'comment',
111         '\xa9alb': 'album',
112         #'tcom': 'composer',
113         'cprt': 'copyright',
114         '\xa9nam': 'title',
115         '\xa9ART': 'artist',
116         #'tpe2': 'accompaniment',
117         #'tpe3': 'conductor',
118         'disk': 'part of set',
119         #'tpub': 'organization',  # publisher
120         'trkn': 'tracknumber',
121         '\xa9day': 'date',
122         }
123
124     def __init__(self, source_dir, target_dir, target_extension='ogg',
125                  cache_file=None, hash=True, ignore_function=None):
126         self.source_dir = source_dir
127         self.target_dir = target_dir
128         self._source_extensions = ['flac', 'm4a', 'mp3', 'ogg', 'wav']
129         self._target_extension = target_extension
130         self._cache_file = cache_file
131         self._cache = self._read_cache()
132         self._hash = hash
133         self._ignore_function = ignore_function
134         f,self._tempfile = _tempfile.mkstemp(prefix='mkogg-')
135
136     def cleanup(self):
137         _os.remove(self._tempfile)
138         self._save_cache()
139
140     def _read_cache(self):
141         cache = {}
142         if self._cache_file == None:
143             return cache
144         try:
145             with open(self._cache_file, 'r') as f:
146                 line = f.readline()
147                 assert line.startswith('# mkogg cache version:'), line
148                 version = line.split(':', 1)[-1].strip()
149                 if version != __version__:
150                     print('cache version mismatch: {} != {}'.format(
151                             version, __version__))
152                     return cache  # old cache, ignore contents
153                 for line in f:
154                     try:
155                         key,value = [x.strip() for x in line.split(' -> ')]
156                     except ValueError:
157                         pass
158                     cache[key] = value
159         except IOError:
160             pass
161         return cache
162
163     def _save_cache(self):
164         if self._cache_file == None:
165             return
166         with open(self._cache_file, 'w') as f:
167             f.write('# mkogg cache version: {}\n'.format(__version__))
168             for key,value in self._cache.iteritems():
169                 f.write('{} -> {}\n'.format(key, value))
170
171     def run(self):
172         self._makedirs(self.target_dir)
173         for dirpath,dirnames,filenames in _os.walk(self.source_dir):
174             for filename in filenames:
175                 root,ext = _os.path.splitext(filename)
176                 ext = ext.lower()
177                 if ext.startswith('.'):
178                     ext = ext[1:]
179                 if ext not in self._source_extensions:
180                     print('skip', filename, ext)
181                     continue
182                 source_path = _os.path.join(dirpath, filename)
183                 if (self._ignore_function is not None and
184                     self._ignore_function(source_path)):
185                     continue
186                 rel_path = _os.path.relpath(dirpath, self.source_dir)
187                 target_path = _os.path.join(
188                     self.target_dir, rel_path,
189                     '{}.{}'.format(root, self._target_extension))
190                 target_dir = _os.path.dirname(target_path)
191                 self._makedirs(target_dir)
192                 self._convert(source_path, target_path, ext)
193
194     def _makedirs(self, target_dir):
195         if not _os.path.exists(target_dir):
196             _os.makedirs(target_dir)
197
198     def _convert(self, source, target, ext):
199         if self._hash:
200             cache_key = self._cache_key(source)
201             old_cache_value = self._cache.get(cache_key, None)
202             if (old_cache_value != None and
203                 old_cache_value == self._cache_value(target)):
204                 print('already cached {} to {}'.format(source, target))
205                 return
206         elif _os.path.exists(target):
207             print('target {} already exists'.format(target))
208             return
209         print('convert {} to {}'.format(source, target))
210         if ext == self._target_extension:
211             _shutil.copy(source, target)
212             return
213         try:
214             convert = getattr(self, 'convert_{}_to_{}'.format(
215                     ext, self._target_extension))
216         except AttributeError:
217             to_wav = getattr(self, 'convert_{}_to_wav'.format(ext))
218             from_wav = getattr(self, 'convert_wav_to_{}'.format(
219                     self._target_extension))
220             def convert(source, target):
221                 to_wav(source, self._tempfile)
222                 from_wav(self._tempfile, target)
223         convert(source, target)
224         if not getattr(convert, 'handles_metadata', False):
225             get_metadata = getattr(self, 'get_{}_metadata'.format(ext))
226             metadata = get_metadata(source)
227             set_metadata = getattr(self, 'set_{}_metadata'.format(
228                     self._target_extension))
229             set_metadata(target, metadata)
230         if not self._hash:
231             cache_key = self._cache_key(source)
232         self._cache[cache_key] = self._cache_value(target)
233
234     def _cache_key(self, source):
235         return repr((self._file_hash(source), self._target_extension))
236
237     def _cache_value(self, target):
238         return self._file_hash(target)
239
240     def _file_hash(self, filename):
241         """
242         Examples
243         --------
244         >>> c = Converter(None, None)
245         >>> h = c._file_hash(__file__)
246         >>> len(h)
247         64
248         >>> c._file_hash('/highly/unlikely/to/exist') == None
249         True
250         >>> c.cleanup()
251         """
252         h = _hash()
253         chunk_size = 2**20  # 1 Mb
254         try:
255             with open(filename, 'rb') as f:
256                 chunk = ' '
257                 while len(chunk) > 0:
258                     chunk = f.read(chunk_size)
259                     h.update(chunk)
260         except IOError:
261             return None
262         return str(h.hexdigest())
263
264     def _set_vorbis_comments(self, container, metadata):
265         container.delete()
266         if type(metadata) == dict:
267             items = sorted(metadata.items())
268         else:
269             items = metadata.items()
270         for key,value in items:
271             # leave key case alone, because Mutagen downcases Vorbis
272             # keys internally.
273             container[key] = value
274
275     def _parse_date(self, date):
276         """Parse `date` (`YYYY[-MM[-DD]]`), returning `(year, month, day)`.
277
278         Examples
279         --------
280         >>> c = Converter(None, None)
281         >>> c._parse_date('2010')
282         ['2010', None, None]
283         >>> c._parse_date('2010-11')
284         ['2010', '11', None]
285         >>> c._parse_date('2010-11-16')
286         ['2010', '11', '16']
287         >>> c.cleanup()
288         """
289         fields = date.split('-')
290         assert len(fields) > 0 and len(fields) <= 3, date
291         fields = fields + [None] * (3 - len(fields))
292         return fields
293
294     def _construct_id3_trck(self, metadata):
295         if 'tracknumber' not in metadata:
296             return (None, None)
297         if 'tracktotal' in metadata:
298             value = []
299             for i,v in enumerate(metadata['tracknumber']):
300                 value.append('{}/{}'.format(
301                         v, metadata['tracktotal'][i]))
302         else:
303             value = metadata['tracknumber']
304         key = 'tracknumber'
305         return (key, value)
306
307     def _guess_id3_encoding(self, text_list):
308         for id3_encoding,encoding in [(0, 'ISO-8859-1'), (3, 'utf-8')]:
309             encoding_success = True
310             for text in text_list:
311                 if isinstance(text, unicode):
312                     try:
313                         text.encode(encoding)
314                     except UnicodeEncodeError:
315                         encoding_success = False
316                         break
317             if encoding_success:
318                 return id3_encoding
319         raise ValueError(text_list)
320
321     def convert_flac_to_wav(self, source, target):
322         invoke(['ogg123', '-d', 'wav', '-f', target, source])
323
324     def convert_flac_to_ogg(self, source, target):
325         invoke(['oggenc', '--quiet', '-q', '3', source, '-o', target])
326     convert_flac_to_ogg.handles_metadata = True
327
328     def convert_m4a_to_wav(self, source, target):
329         invoke(['faad', '-o', target, source])
330
331     def convert_mp3_to_wav(self, source, target):
332         invoke(['mpg123',  '-w', target, source])
333
334     def convert_ogg_to_wav(self, source, target):
335         self.convert_flac_to_wav(source, target)
336
337     def convert_wav_to_flac(self, source, target):
338         invoke(['flac', '--force', '--output-name', target, source])
339
340     def convert_wav_to_mp3(self, source, target):
341         invoke(['lame', '--quiet', '-V', '4', source, target])
342
343     def convert_wav_to_ogg(self, source, target):
344         self.convert_flac_to_ogg(source, target)
345
346     def get_flac_metadata(self, source):
347         if _mutagen is None:
348             raise _mutagen_import_error
349         return _mutagen_flac.FLAC(source)
350
351     def get_m4a_metadata(self, source):
352         if _mutagen is None:
353             raise _mutagen_import_error
354         m4a = _mutagen_m4a.M4A(source)
355         metadata = {}
356         for key,value in m4a.items():
357             try:
358                 vorbis_key = self.m4a_to_vorbis_keys[key.lower()]
359             except KeyError:
360                 continue
361             if vorbis_key == 'tracknumber':
362                 tracknumber,tracktotal = value
363                 value = tracknumber
364                 if tracktotal:
365                     metadata['tracktotal'] = [str(tracktotal)]
366             elif vorbis_key == 'part of set':
367                 disknumber,disktotal = value
368                 value = disknumber
369                 if disktotal:
370                     metadata['set total'] = [str(disktotal)]
371             metadata[vorbis_key] = [str(value)]
372         return metadata
373
374     def get_mp3_metadata(self, source):
375         if _mutagen is None:
376             raise _mutagen_import_error
377         mp3 = _mutagen_mp3.MP3(source)
378         metadata = {}
379         for key,value in mp3.items():
380             try:
381                 vorbis_key = self.id3_to_vorbis_keys[key.lower()]
382             except KeyError:
383                 continue
384             v = value.text
385             if vorbis_key == 'tracknumber':
386                 for i,v_entry in enumerate(v):
387                     if '/' in v_entry:
388                         tracknumber,tracktotal = v_entry.split('/', 1)
389                         v[i] = tracknumber
390                         metadata['tracktotal'] = ['tracktotal']
391             metadata[vorbis_key] = v
392         return metadata
393
394     def get_ogg_metadata(self, source):
395         if _mutagen is None:
396             raise _mutagen_import_error        
397         return _mutagen_oggvorbis.OggVorbis(source)
398
399     def get_wav_metadata(self, source):
400         return {}
401
402     def set_flac_metadata(self, target, metadata):
403         if _mutagen is None:
404             raise _mutagen_import_error
405         flac = _mutagen_flac.FLAC(target)
406         self._set_vorbis_comments(flac, metadata)
407         flac.save()
408
409     def set_mp3_metadata(self, target, metadata):
410         vorbis_keys_to_id3 = dict(
411             (v,k) for k,v in self.id3_to_vorbis_keys.items())
412         if _mutagen is None:
413             raise _mutagen_import_error
414         mp3 = _mutagen_mp3.MP3(target)
415         if mp3.tags is not None:
416             mp3.tags.delete()
417         handled_trck = False
418         max_encoding = 0
419         for key,value in metadata.items():
420             if key == 'date':
421                 for i,v in enumerate(value):
422                     year,month,day = self._parse_date(v)
423                     value[i] = year
424             elif key in ['tracknumber', 'tracktotal']:
425                 if handled_trck is True:
426                     continue
427                 handled_trck = True
428                 key,value = self._construct_id3_trck(metadata)
429                 if value is None:
430                     continue
431             try:
432                 frame_name = vorbis_keys_to_id3[key].upper()
433             except KeyError:
434                 continue
435             frame = getattr(_mutagen_id3, frame_name)
436             id3_encoding = self._guess_id3_encoding(value)
437             max_encoding = max(max_encoding, id3_encoding)
438             mp3[frame_name] = frame(encoding=id3_encoding, text=value)
439         if mp3.tags is None:
440             return
441         if max_encoding:  # at least one tag doesn't use ISO-8859-1
442             v1 = 0  # remove ID3v1 tags
443         else:
444             v1 = 2  # create and/or update ID3v1 tags
445         mp3.save(v1=v1)
446
447     def set_ogg_metadata(self, target, metadata):
448         if _mutagen is None:
449             raise _mutagen_import_error
450         ogg = _mutagen_oggvorbis.OggVorbis(target)
451         self._set_vorbis_comments(ogg, metadata)
452         ogg.save()
453
454     def set_wav_metadata(self, target, metadata):
455         pass
456
457
458 def test():
459     import doctest
460     results = doctest.testmod()
461     return results.failed % 127
462
463
464 if __name__ == '__main__':
465     import optparse
466     import sys
467
468     usage = '%prog [options] source-dir target-dir'
469     epilog = __doc__
470     p = optparse.OptionParser(usage=usage, epilog=epilog)
471     p.format_epilog = lambda formatter: epilog+'\n'
472     p.add_option('-t', '--target-extension', dest='ext',
473                  default='ogg', metavar='EXT',
474                  help='Conversion target type (e.g. flac, mp3) (%default)')
475     p.add_option('-c', '--cache', dest='cache', metavar='PATH',
476                  help=('Save conversion hashes in a cache file to avoid '
477                        'repeated previous conversions.'))
478     p.add_option('-n', '--no-hash', dest='hash', action='store_false',
479                  default=True,
480                  help=("Don't hash files.  Just assume matching names would "
481                        'have matching hashes.'))
482     p.add_option('-i', '--ignore', dest='ignore', metavar='REGEXP',
483                  help=('Ignore source paths matching REGEXP.'))
484     p.add_option('--test', dest='test', action='store_true', default=False,
485                  help='Run internal tests and exit')
486
487     options,args = p.parse_args()
488
489     if options.test:
490         sys.exit(test())
491
492     if options.ignore is not None:
493         ignore_regexp = _re.compile(options.ignore)
494         ignore_function = ignore_regexp.match
495     else:
496         ignore_function = None
497
498     source_dir,target_dir = args
499     c = Converter(source_dir, target_dir, target_extension=options.ext,
500                   cache_file=options.cache, hash=options.hash,
501                   ignore_function=ignore_function)
502     try:
503         c.run()
504     finally:
505         c.cleanup()
506         pass