Change email address from drexel to tremily.
[blog.git] / posts / mkogg / mkogg.py
1 #!/usr/bin/env python
2 #
3 # Copyright (C) 2009-2011 W. Trevor King <wking@tremily.us>
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Lesser General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful, but
11 # WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13 # Lesser General Public License for more details.
14 #
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with this program.  If not, see
17 # <http://www.gnu.org/licenses/>.
18
19 """Mirror a tree of mp3/ogg/flac files with Ogg Vorbis versions.
20
21 Other target formats are also supported.  Current conversions:
22
23 * flac -> ogg
24 * flac -> wav -> mp3
25 * ogg -> wav -> flac
26 * ogg -> wav -> mp3
27 * mp3 -> wav -> flac
28 * mp3 -> wav -> ogg
29
30 External packages required for full functionality:
31
32 * id3v2_ (`id3v2`)
33 * lame_ (`lame`)
34 * flac_ (`metaflac`)
35 * mpg123_ (`mpg123`)
36 * vorbis_ (`ogg123`, `oggenc`, `vorbiscomment`)
37
38 .. _id3v2: http://id3v2.sourceforge.net/
39 .. _lame: http://lame.sourceforge.net
40 .. _flac: http://flac.sourceforge.net
41 .. _mpg123: http://www.mpg123.org/
42 .. _vorbis: http://www.vorbis.com
43 """
44
45 from hashlib import sha256 as _hash
46 import os
47 import os.path
48 import re as _re
49 import shutil
50 from subprocess import Popen, PIPE
51 from tempfile import mkstemp
52
53
54 __version__ = '0.2'
55
56
57 def invoke(args, stdin=None, expect=(0,)):
58     print '  %s' % args
59     p = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE)
60     stdout,stderr = p.communicate(stdin)
61     status = p.wait()
62     assert status in expect, 'invalid status %d from %s' % (status, args)
63     return (status, stdout, stderr)
64
65
66 class Converter (object):
67     """Recode audio files from `source_dir` to `target_dir`.
68
69     `target_extension` sets the target encoding.
70
71     Notes
72     -----
73
74     The `get_` and `set_*_metadata` methods should pass metadata as a
75     `dict` with key/value pairs standardised to match the list of
76     Vorbis comment suggestions_ with lowecase keys.  The `date` field
77     should be formatted `YYYY[-MM[-DD]]`.
78
79     .. _suggestions: http://www.xiph.org/vorbis/doc/v-comment.html
80     """
81     def __init__(self, source_dir, target_dir, target_extension='ogg',
82                  cache_file=None, hash=True, ignore_function=None):
83         self.source_dir = source_dir
84         self.target_dir = target_dir
85         self._source_extensions = ['flac', 'mp3', 'ogg', 'wav']
86         self._target_extension = target_extension
87         self._cache_file = cache_file
88         self._cache = self._read_cache()
89         self._hash = hash
90         self._ignore_function = ignore_function
91         f,self._tempfile = mkstemp(prefix='mkogg-')
92
93     def cleanup(self):
94         os.remove(self._tempfile)
95         self._save_cache()
96
97     def _read_cache(self):
98         cache = {}
99         if self._cache_file == None:
100             return cache
101         try:
102             with open(self._cache_file, 'r') as f:
103                 line = f.readline()
104                 assert line.startswith('# mkogg cache version:'), line
105                 version = line.split(':', 1)[-1].strip()
106                 if version != __version__:
107                     print 'cache version mismatch: %s != %s' % (
108                         version, __version__)
109                     return cache  # old cache, ignore contents
110                 for line in f:
111                     try:
112                         key,value = [x.strip() for x in line.split(' -> ')]
113                     except ValueError:
114                         pass
115                     cache[key] = value
116         except IOError:
117             pass
118         return cache
119
120     def _save_cache(self):
121         if self._cache_file == None:
122             return
123         with open(self._cache_file, 'w') as f:
124             f.write('# mkogg cache version: %s\n' % __version__)
125             for key,value in self._cache.iteritems():
126                 f.write('%s -> %s\n' % (key, value))
127
128     def run(self):
129         self._makedirs(self.target_dir)
130         for dirpath,dirnames,filenames in os.walk(self.source_dir):
131             for filename in filenames:
132                 root,ext = os.path.splitext(filename)
133                 ext = ext.lower()
134                 if ext.startswith('.'):
135                     ext = ext[1:]
136                 if ext not in self._source_extensions:
137                     print 'skip', filename, ext
138                     continue
139                 source_path = os.path.join(dirpath, filename)
140                 if (self._ignore_function is not None and
141                     self._ignore_function(source_path)):
142                     continue
143                 rel_path = os.path.relpath(dirpath, self.source_dir)
144                 target_path = os.path.join(
145                     self.target_dir, rel_path,
146                     '%s.%s' % (root, self._target_extension))
147                 target_dir = os.path.dirname(target_path)
148                 self._makedirs(target_dir)
149                 self._convert(source_path, target_path, ext)
150
151     def _makedirs(self, target_dir):
152         if not os.path.exists(target_dir):
153             os.makedirs(target_dir)
154
155     def _convert(self, source, target, ext):
156         if self._hash:
157             cache_key = self._cache_key(source)
158             old_cache_value = self._cache.get(cache_key, None)
159             if (old_cache_value != None and
160                 old_cache_value == self._cache_value(target)):
161                 print 'already cached %s to %s' % (source, target)
162                 return
163         elif os.path.exists(target):
164             print 'target %s already exists' % (target)
165             return
166         print 'convert %s to %s' % (source, target)
167         if ext == self._target_extension:
168             shutil.copy(source, target)
169             return
170         convert = getattr(self, 'convert_%s_to_%s'
171                           % (ext, self._target_extension))
172         convert(source, target)
173         if not getattr(convert, 'handles_metadata', False):
174             get_metadata = getattr(self, 'get_%s_metadata' % ext)
175             metadata = get_metadata(source)
176             set_metadata = getattr(self, 'set_%s_metadata'
177                                    % self._target_extension)
178             set_metadata(target, metadata)
179         if not self._hash:
180             cache_key = self._cache_key(source)
181         self._cache[cache_key] = self._cache_value(target)
182
183     def _cache_key(self, source):
184         return repr((self._file_hash(source), self._target_extension))
185
186     def _cache_value(self, target):
187         return self._file_hash(target)
188
189     def _file_hash(self, filename):
190         """
191         Examples
192         --------
193         >>> c = Converter(None, None)
194         >>> h = c._file_hash(__file__)
195         >>> len(h)
196         64
197         >>> c._file_hash('/highly/unlikely/to/exist') == None
198         True
199         >>> c.cleanup()
200         """
201         h = _hash()
202         chunk_size = 2**20  # 1 Mb
203         try:
204             with open(filename, 'rb') as f:
205                 chunk = ' '
206                 while len(chunk) > 0:
207                     chunk = f.read(chunk_size)
208                     h.update(chunk)
209         except IOError:
210             return None
211         return str(h.hexdigest())
212
213     def _parse_date(self, date):
214         """Parse `date` (`YYYY[-MM[-DD]]`), returning `(year, month, day)`.
215
216         Examples
217         --------
218         >>> c = Converter(None, None)
219         >>> c._parse_date('2010')
220         ['2010', None, None]
221         >>> c._parse_date('2010-11')
222         ['2010', '11', None]
223         >>> c._parse_date('2010-11-16')
224         ['2010', '11', '16']
225         >>> c.cleanup()
226         """
227         fields = date.split('-')
228         assert len(fields) > 0 and len(fields) <= 3, date
229         fields = fields + [None] * (3 - len(fields))
230         return fields
231
232     def _parse_id3v2_comments(self, stdout):
233         """Parse ID3v2 tags.
234
235         Examples
236         --------
237         >>> from pprint import pprint
238         >>> c = Converter(None, None)
239         >>> metadata = c._parse_id3v2_comments('\\n'.join([
240         ...     'id3v1 tag info for src/03-Drive_My_Car.mp3:',
241         ...     'Title  : The Famous Song     Artist: No One You Know',
242         ...     'Album  : The Famous Album    Year: 1965, Genre: Rock (17)',
243         ...     'Comment:                     Track: 7',
244         ...     'id3v2 tag info for src/03-Drive_My_Car.mp3:',
245         ...     'TALB (Album/Movie/Show title): The Famous Album',
246         ...     'TPE1 (Lead performer(s)/Soloist(s)): No One You Know',
247         ...     'TT2 (Title/songname/content description): The Famous Song',
248         ...     'TYER (Year): 1965',
249         ...     'TCON (Content type): Rock (17)',
250         ...     'TRCK (Track number/Position in set): 07/14']))
251         >>> pprint(metadata)  # doctest: +REPORT_UDIFF
252         {'album': 'The Famous Album',
253          'artist': 'No One You Know',
254          'date': '1965',
255          'genre': 'Rock',
256          'title': 'The Famous Song',
257          'tracknumber': '07',
258          'tracktotal': '14'}
259         >>> c.cleanup()
260         """
261         metadata = {}
262         vorbis_keys = {
263             'comm': 'comment',
264             'talb': 'album',
265             'tcom': 'composer',
266             'tcon': 'genre',
267             'tcop': 'copyright',
268             'tit2': 'title',
269             'tpe1': 'artist',
270             'tpe2': 'accompaniment',
271             'tpe3': 'conductor',
272             'tpos': 'part of set',
273             'tpub': 'organization',  # publisher
274             'trck': 'tracknumber',
275             'tyer': 'date',
276             }
277         drop_keys = [
278             'apic',  # attached picture
279             'geob',  # general encapsulated object
280             'ncon',  # ?
281             'pcnt',  # play counter (incremented with each play)
282             'pic',   # attached picture
283             'priv',  # private
284             'tbp',   # beats per minute
285             'tco',   # content type
286             'tcp',   # frame?
287             'tenc',  # encoded by
288             'tflt',  # file type
289             'tope',  # original artist (e.g. for a cover)
290             'tlen',  # length (in milliseconds)
291             'tmed',  # media type
292             'txxx',  # user defined text information
293             'ufi',   # unique file identifier
294             'uslt',  # unsynchronized lyric/text transcription
295             'wcom',  # commercial information
296             'woar',  # official artist/performer webpage
297             'wxxx',  # user defined URL
298             ]
299         key_translations = {
300             'com': 'comm',
301             'ten': 'tenc',
302             'tal': 'talb',
303             'tcm': 'tcom',
304             'tt2': 'tit2',
305             'tp1': 'tpe1',
306             'tpa': 'tpos',
307             'trk': 'trck',
308             'tye': 'tyer',
309             }
310         in_v2 = False
311         for line in stdout.splitlines():
312             if not in_v2:
313                 if line.startswith('id3v2 tag info'):
314                     in_v2 = True
315                 continue
316             key,value = [x.strip() for x in line.split(':', 1)]
317             if value.lower() == 'no id3v1 tag':
318                 continue
319             short_key = key.split()[0].lower()
320             short_key = key_translations.get(short_key, short_key)
321             if short_key in drop_keys:
322                 continue
323             v_key = vorbis_keys[short_key]
324             if v_key == 'genre':
325                 value = value.rsplit('(', 1)[0].strip()
326             elif v_key == 'tracknumber' and '/' in value:
327                 value,total = value.split('/')
328                 metadata['tracktotal'] = total
329             metadata[v_key] = value
330         return metadata
331
332     def _parse_vorbis_comments(self, stdout):
333         """Parse Vorbis comments.
334
335         Examples
336         --------
337         >>> from pprint import pprint
338         >>> c = Converter(None, None)
339         >>> metadata = c._parse_vorbis_comments('\\n'.join([
340         ...     'ARTIST=No One You Know',
341         ...     'ALBUM=The Famous Album',
342         ...     'TITLE=The Famous Song',
343         ...     'DATE=1965',
344         ...     'GENRE=Rock',
345         ...     'TRACKNUMBER=07',
346         ...     'TRACKTOTAL=14',
347         ...     'CDDB=af08640e']))
348         >>> pprint(metadata)  # doctest: +REPORT_UDIFF
349         {'album': 'The Famous Album',
350          'artist': 'No One You Know',
351          'cddb': 'af08640e',
352          'date': '1965',
353          'genre': 'Rock',
354          'title': 'The Famous Song',
355          'tracknumber': '07',
356          'tracktotal': '14'}
357         >>> c.cleanup()
358         """
359         metadata = {}
360         for line in stdout.splitlines():
361             key,value = line.split('=', 1)
362             metadata[key.lower()] = value
363         return metadata
364
365     def convert_flac_to_mp3(self, source, target):
366         self.convert_flac_to_wav(source, self._tempfile)
367         self.convert_wav_to_mp3(self._tempfile, target)
368
369     def convert_flac_to_wav(self, source, target):
370         invoke(['ogg123', '-d', 'wav', '-f', target, source])
371
372     def convert_flac_to_ogg(self, source, target):
373         invoke(['oggenc', '--quiet', '-q', '3', source, '-o', target])
374     convert_flac_to_ogg.handles_metadata = True
375
376     def convert_mp3_to_flac(self, source, target):
377         self.convert_mp3_to_wav(source, self._tempfile)
378         self.convert_wav_to_flac(self._tempfile, target)
379
380     def convert_mp3_to_ogg(self, source, target):
381         self.convert_mp3_to_wav(source, self._tempfile)
382         self.convert_wav_to_ogg(self._tempfile, target)
383
384     def convert_mp3_to_wav(self, source, target):
385         invoke(['mpg123',  '-w', target, source])
386
387     def convert_ogg_to_mp3(self, source, target):
388         self.convert_flac_to_mp3(source, target)
389
390     def convert_ogg_to_wav(self, source, target):
391         self.convert_flac_to_wav(source_target)
392
393     def convert_wav_to_flac(self, source, target):
394         invoke(['flac', '-o', target, source])
395
396     def convert_wav_to_mp3(self, source, target):
397         invoke(['lame', '--quiet', '-V', '4', source, target])
398
399     def convert_wav_to_ogg(self, source, target):
400         self.convert_flac_to_ogg(source, target)
401
402     def get_flac_metadata(self, source):
403         status,stdout,stderr = invoke(
404             ['metaflac', '--export-tags-to=-', source])
405         metadata = {}
406         for line in stdout.splitlines():
407             key,value = line.split('=', 1)
408             metadata[key.lower()] = value
409         return metadata
410
411     def get_flac_metadata(self, source):
412         status,stdout,stderr = invoke(
413             ['metaflac', '--export-tags-to=-', source])
414         return self._parse_vorbis_comments(stdout)
415
416     def get_mp3_metadata(self, source):
417         status,stdout,stderr = invoke(
418             ['id3v2', '--list', source])
419         return self._parse_id3v2_comments(stdout)
420
421     def get_ogg_metadata(self, source):
422         status,stdout,stderr = invoke(
423             ['vorbiscomment', '--list', source])
424         return self._parse_vorbis_comments(stdout)
425
426     def get_wav_metadata(self, source):
427         return {}
428
429     def set_flac_metadata(self, target, metadata):
430         stdin = '\n'.join(['%s=%s' % (k.upper(), v)
431                            for k,v in sorted(metadata.iteritems())])
432         invoke(['metaflac', '--import-tags-from=-', target], stdin=stdin)
433
434     def set_mp3_metadata(self, target, metadata):
435         args = ['id3v2']
436         for key,arg in [('album', '--album'), ('artist', '--artist'),
437                         ('title', '--song')]:
438             if key in metadata:
439                 args.extend([arg, metadata[key]])
440         if 'date' in metadata:
441             year,month,day = self._parse_date(metadata['date'])
442             args.extend(['--year', year])
443         if 'genre' in metadata:
444             genre = metadata['genre']
445             if not hasattr(self, '_id3v1_genres'):
446                 status,stdout,stderr = invoke(['id3v2', '--list-genres'])
447                 genres = {}
448                 for line in stdout.splitlines():
449                     num,name = [x.strip() for x in line.split(':', 1)]
450                     genres[name.lower()] = num
451                 self._id3v1_genres = genres
452             # Genre 12 = "Other"
453             num = self._id3v1_genres.get(genre.lower(), '12')
454             args.extend(['--genre', num])
455         if 'tracknumber' in metadata:
456             track = metadata['tracknumber']
457             if 'tracktotal' in metadata:
458                 track = '%s/%s' % (track, metadata['tracktotal'])
459             args.extend(['--track', track])
460         args.append(target)
461         invoke(args)
462
463     def set_ogg_metadata(self, target, metadata):
464         stdin = '\n'.join(['%s=%s' % (k.upper(), v)
465                            for k,v in sorted(metadata.iteritems())])
466         invoke(['vorbiscomment', '--write', target], stdin=stdin)
467
468     def set_wav_metadata(self, target, metadata):
469         pass
470
471
472 def test():
473     import doctest
474     results = doctest.testmod()
475     return results.failed % 127
476
477
478 if __name__ == '__main__':
479     import optparse
480     import sys
481
482     usage = '%prog [options] source-dir target-dir'
483     epilog = __doc__
484     p = optparse.OptionParser(usage=usage, epilog=epilog)
485     p.format_epilog = lambda formatter: epilog+'\n'
486     p.add_option('-t', '--target-extension', dest='ext',
487                  default='ogg', metavar='EXT',
488                  help='Conversion target type (e.g. flac, mp3) (%default)')
489     p.add_option('-c', '--cache', dest='cache', metavar='PATH',
490                  help=('Save conversion hashes in a cache file to avoid '
491                        'repeated previous conversions.'))
492     p.add_option('-n', '--no-hash', dest='hash', action='store_false',
493                  default=True,
494                  help=("Don't hash files.  Just assume matching names would "
495                        'have matching hashes.'))
496     p.add_option('-i', '--ignore', dest='ignore', metavar='REGEXP',
497                  help=('Ignore source paths matching REGEXP.'))
498     p.add_option('--test', dest='test', action='store_true', default=False,
499                  help='Run internal tests and exit')
500
501     options,args = p.parse_args()
502
503     if options.test:
504         sys.exit(test())
505
506     if options.ignore is not None:
507         ignore_regexp = _re.compile(options.ignore)
508         ignore_function = ignore_regexp.match
509     else:
510         ignore_function = None
511
512     source_dir,target_dir = args
513     c = Converter(source_dir, target_dir, target_extension=options.ext,
514                   cache_file=options.cache, hash=options.hash,
515                   ignore_function=ignore_function)
516     try:
517         c.run()
518     finally:
519         c.cleanup()
520         pass