Add 'wav' source and drop 'tflt' id3 tags in mkogg.py.
[blog.git] / posts / mkogg / mkogg.py
1 #!/usr/bin/env python
2 #
3 # Copyright (C) 2009-2011 W. Trevor King <wking@drexel.edu>
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Lesser General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful, but
11 # WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13 # Lesser General Public License for more details.
14 #
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with this program.  If not, see
17 # <http://www.gnu.org/licenses/>.
18
19 """Mirror a tree of mp3/ogg/flac files with Ogg Vorbis versions.
20
21 Other target formats are also supported.  Current conversions:
22
23 * flac -> ogg
24 * flac -> wav -> mp3
25 * ogg -> wav -> flac
26 * ogg -> wav -> mp3
27 * mp3 -> wav -> flac
28 * mp3 -> wav -> ogg
29
30 External packages required for full functionality:
31
32 * id3v2_ (`id3v2`)
33 * lame_ (`lame`)
34 * flac_ (`metaflac`)
35 * mpg123_ (`mpg123`)
36 * vorbis_ (`ogg123`, `oggenc`, `vorbiscomment`)
37
38 .. _id3v2: http://id3v2.sourceforge.net/
39 .. _lame: http://lame.sourceforge.net
40 .. _flac: http://flac.sourceforge.net
41 .. _mpg123: http://www.mpg123.org/
42 .. _vorbis: http://www.vorbis.com
43 """
44
45 from hashlib import sha256 as _hash
46 import os
47 import os.path
48 import shutil
49 from subprocess import Popen, PIPE
50 from tempfile import mkstemp
51
52
53 __version__ = '0.2'
54
55
56 def invoke(args, stdin=None, expect=(0,)):
57     print '  %s' % args
58     p = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE)
59     stdout,stderr = p.communicate(stdin)
60     status = p.wait()
61     assert status in expect, 'invalid status %d from %s' % (status, args)
62     return (status, stdout, stderr)
63
64
65 class Converter (object):
66     """Recode audio files from `source_dir` to `target_dir`.
67
68     `target_extension` sets the target encoding.
69
70     Notes
71     -----
72
73     The `get_` and `set_*_metadata` methods should pass metadata as a
74     `dict` with key/value pairs standardised to match the list of
75     Vorbis comment suggestions_ with lowecase keys.  The `date` field
76     should be formatted `YYYY[-MM[-DD]]`.
77
78     .. _suggestions: http://www.xiph.org/vorbis/doc/v-comment.html
79     """
80     def __init__(self, source_dir, target_dir, target_extension='ogg',
81                  cache_file=None):
82         self.source_dir = source_dir
83         self.target_dir = target_dir
84         self._source_extensions = ['flac', 'mp3', 'ogg', 'wav']
85         self._target_extension = target_extension
86         self._cache_file = cache_file
87         self._cache = self._read_cache()
88         f,self._tempfile = mkstemp(prefix='mkogg-')
89
90     def cleanup(self):
91         os.remove(self._tempfile)
92         self._save_cache()
93
94     def _read_cache(self):
95         cache = {}
96         if self._cache_file == None:
97             return cache
98         try:
99             with open(self._cache_file, 'r') as f:
100                 line = f.readline()
101                 assert line.startswith('# mkogg cache version:'), line
102                 version = line.split(':', 1)[-1].strip()
103                 if version != __version__:
104                     print 'cache version mismatch: %s != %s' % (
105                         version, __version__)
106                     return cache  # old cache, ignore contents
107                 for line in f:
108                     try:
109                         key,value = [x.strip() for x in line.split(' -> ')]
110                     except ValueError:
111                         pass
112                     cache[key] = value
113         except IOError:
114             pass
115         return cache
116
117     def _save_cache(self):
118         if self._cache_file == None:
119             return
120         with open(self._cache_file, 'w') as f:
121             f.write('# mkogg cache version: %s\n' % __version__)
122             for key,value in self._cache.iteritems():
123                 f.write('%s -> %s\n' % (key, value))
124
125     def run(self):
126         self._makedirs(self.target_dir)
127         for dirpath,dirnames,filenames in os.walk(self.source_dir):
128             for filename in filenames:
129                 root,ext = os.path.splitext(filename)
130                 ext = ext.lower()
131                 if ext.startswith('.'):
132                     ext = ext[1:]
133                 if ext not in self._source_extensions:
134                     print 'skip', filename, ext
135                     continue
136                 source_path = os.path.join(dirpath, filename)
137                 rel_path = os.path.relpath(dirpath, self.source_dir)
138                 target_path = os.path.join(
139                     self.target_dir, rel_path,
140                     '%s.%s' % (root, self._target_extension))
141                 target_dir = os.path.dirname(target_path)
142                 self._makedirs(target_dir)
143                 self._convert(source_path, target_path, ext)
144
145     def _makedirs(self, target_dir):
146         if not os.path.exists(target_dir):
147             os.makedirs(target_dir)
148
149     def _convert(self, source, target, ext):
150         cache_key = self._cache_key(source)
151         old_cache_value = self._cache.get(cache_key, None)
152         if (old_cache_value != None and
153             old_cache_value == self._cache_value(target)):
154             print 'cached %s to %s' % (source, target)
155             return
156         print 'convert %s to %s' % (source, target)
157         if ext == self._target_extension:
158             shutil.copy(source, target)
159             return
160         convert = getattr(self, 'convert_%s_to_%s'
161                           % (ext, self._target_extension))
162         convert(source, target)
163         if not getattr(convert, 'handles_metadata', False):
164             get_metadata = getattr(self, 'get_%s_metadata' % ext)
165             metadata = get_metadata(source)
166             set_metadata = getattr(self, 'set_%s_metadata'
167                                    % self._target_extension)
168             set_metadata(target, metadata)
169         self._cache[cache_key] = self._cache_value(target)
170
171     def _cache_key(self, source):
172         return repr((self._file_hash(source), self._target_extension))
173
174     def _cache_value(self, target):
175         return self._file_hash(target)
176
177     def _file_hash(self, filename):
178         """
179         Examples
180         --------
181         >>> c = Converter(None, None)
182         >>> h = c._file_hash(__file__)
183         >>> len(h)
184         64
185         >>> c._file_hash('/highly/unlikely/to/exist') == None
186         True
187         >>> c.cleanup()
188         """
189         h = _hash()
190         chunk_size = 2**20  # 1 Mb
191         try:
192             with open(filename, 'rb') as f:
193                 chunk = ' '
194                 while len(chunk) > 0:
195                     chunk = f.read(chunk_size)
196                     h.update(chunk)
197         except IOError:
198             return None
199         return str(h.hexdigest())
200
201     def _parse_date(self, date):
202         """Parse `date` (`YYYY[-MM[-DD]]`), returning `(year, month, day)`.
203
204         Examples
205         --------
206         >>> c = Converter(None, None)
207         >>> c._parse_date('2010')
208         ['2010', None, None]
209         >>> c._parse_date('2010-11')
210         ['2010', '11', None]
211         >>> c._parse_date('2010-11-16')
212         ['2010', '11', '16']
213         >>> c.cleanup()
214         """
215         fields = date.split('-')
216         assert len(fields) > 0 and len(fields) <= 3, date
217         fields = fields + [None] * (3 - len(fields))
218         return fields
219
220     def _parse_id3v2_comments(self, stdout):
221         """Parse ID3v2 tags.
222
223         Examples
224         --------
225         >>> from pprint import pprint
226         >>> c = Converter(None, None)
227         >>> metadata = c._parse_id3v2_comments('\\n'.join([
228         ...     'id3v1 tag info for src/03-Drive_My_Car.mp3:',
229         ...     'Title  : The Famous Song     Artist: No One You Know',
230         ...     'Album  : The Famous Album    Year: 1965, Genre: Rock (17)',
231         ...     'Comment:                     Track: 7',
232         ...     'id3v2 tag info for src/03-Drive_My_Car.mp3:',
233         ...     'TALB (Album/Movie/Show title): The Famous Album',
234         ...     'TPE1 (Lead performer(s)/Soloist(s)): No One You Know',
235         ...     'TIT2 (Title/songname/content description): The Famous Song',
236         ...     'TYER (Year): 1965',
237         ...     'TCON (Content type): Rock (17)',
238         ...     'TRCK (Track number/Position in set): 07/14']))
239         >>> pprint(metadata)  # doctest: +REPORT_UDIFF
240         {'album': 'The Famous Album',
241          'artist': 'No One You Know',
242          'date': '1965',
243          'genre': 'Rock',
244          'title': 'The Famous Song',
245          'tracknumber': '07',
246          'tracktotal': '14'}
247         >>> c.cleanup()
248         """
249         metadata = {}
250         vorbis_keys = {
251             'comm': 'comment',
252             'talb': 'album',
253             'tcom': 'composer',
254             'tcon': 'genre',
255             'tcop': 'copyright',
256             'tit2': 'title',
257             'tpe1': 'artist',
258             'tpe2': 'accompaniment',
259             'tpe3': 'conductor',
260             'tpos': 'part of set',
261             'tpub': 'organization',
262             'trck': 'tracknumber',
263             'tyer': 'date',
264             }
265         drop_keys = [
266             'apic',  # attached picture
267             'geob',  # general encapsulated object
268             'ncon',  # ?
269             'pcnt',  # play counter (incremented with each play)
270             'priv',  # private
271             'tenc',  # encoded by
272             'tflt',  # file type
273             'tope',  # original artist (e.g. for a cover)
274             'tlen',  # length (in milliseconds)
275             'tmed',  # media type
276             'wxxx',  # user defined URL
277             ]
278         in_v2 = False
279         for line in stdout.splitlines():
280             if not in_v2:
281                 if line.startswith('id3v2 tag info'):
282                     in_v2 = True
283                 continue
284             key,value = [x.strip() for x in line.split(':', 1)]
285             short_key = key.split()[0]
286             if short_key.lower() in drop_keys:
287                 continue
288             v_key = vorbis_keys[short_key.lower()]
289             if v_key == 'genre':
290                 value = value.rsplit('(', 1)[0].strip()
291             elif v_key == 'tracknumber' and '/' in value:
292                 value,total = value.split('/')
293                 metadata['tracktotal'] = total
294             metadata[v_key] = value
295         return metadata
296
297     def _parse_vorbis_comments(self, stdout):
298         """Parse Vorbis comments.
299
300         Examples
301         --------
302         >>> from pprint import pprint
303         >>> c = Converter(None, None)
304         >>> metadata = c._parse_vorbis_comments('\\n'.join([
305         ...     'ARTIST=No One You Know',
306         ...     'ALBUM=The Famous Album',
307         ...     'TITLE=The Famous Song',
308         ...     'DATE=1965',
309         ...     'GENRE=Rock',
310         ...     'TRACKNUMBER=07',
311         ...     'TRACKTOTAL=14',
312         ...     'CDDB=af08640e']))
313         >>> pprint(metadata)  # doctest: +REPORT_UDIFF
314         {'album': 'The Famous Album',
315          'artist': 'No One You Know',
316          'cddb': 'af08640e',
317          'date': '1965',
318          'genre': 'Rock',
319          'title': 'The Famous Song',
320          'tracknumber': '07',
321          'tracktotal': '14'}
322         >>> c.cleanup()
323         """
324         metadata = {}
325         for line in stdout.splitlines():
326             key,value = line.split('=', 1)
327             metadata[key.lower()] = value
328         return metadata
329
330     def convert_flac_to_mp3(self, source, target):
331         self.convert_flac_to_wav(source, self._tempfile)
332         self.convert_wav_to_mp3(self._tempfile, target)
333
334     def convert_flac_to_wav(self, source, target):
335         invoke(['ogg123', '-d', 'wav', '-f', target, source])
336
337     def convert_flac_to_ogg(self, source, target):
338         invoke(['oggenc', '--quiet', '-q', '3', source, '-o', target])
339     convert_flac_to_ogg.handles_metadata = True
340
341     def convert_mp3_to_flac(self, source, target):
342         self.convert_mp3_to_wav(source, self._tempfile)
343         self.convert_wav_to_flac(self._tempfile, target)
344
345     def convert_mp3_to_ogg(self, source, target):
346         self.convert_mp3_to_wav(source, self._tempfile)
347         self.convert_wav_to_ogg(self._tempfile, target)
348
349     def convert_mp3_to_wav(self, source, target):
350         invoke(['mpg123',  '-w', target, source])
351
352     def convert_ogg_to_mp3(self, source, target):
353         self.convert_flac_to_mp3(source, target)
354
355     def convert_ogg_to_wav(self, source, target):
356         self.convert_flac_to_wav(source_target)
357
358     def convert_wav_to_flac(self, source, target):
359         invoke(['flac', '-o', target, source])
360
361     def convert_wav_to_mp3(self, source, target):
362         invoke(['lame', '--quiet', '-V', '4', source, target])
363
364     def convert_wav_to_ogg(self, source, target):
365         self.convert_flac_to_ogg(source, target)
366
367     def get_flac_metadata(self, source):
368         status,stdout,stderr = invoke(
369             ['metaflac', '--export-tags-to=-', source])
370         metadata = {}
371         for line in stdout.splitlines():
372             key,value = line.split('=', 1)
373             metadata[key.lower()] = value
374         return metadata
375
376     def get_flac_metadata(self, source):
377         status,stdout,stderr = invoke(
378             ['metaflac', '--export-tags-to=-', source])
379         return self._parse_vorbis_comments(stdout)
380
381     def get_mp3_metadata(self, source):
382         status,stdout,stderr = invoke(
383             ['id3v2', '--list', source])
384         return self._parse_id3v2_comments(stdout)
385
386     def get_ogg_metadata(self, source):
387         status,stdout,stderr = invoke(
388             ['vorbiscomment', '--list', source])
389         return self._parse_vorbis_comments(stdout)
390
391     def get_wav_metadata(self, source):
392         return {}
393
394     def set_flac_metadata(self, target, metadata):
395         stdin = '\n'.join(['%s=%s' % (k.upper(), v)
396                            for k,v in sorted(metadata.iteritems())])
397         invoke(['metaflac', '--import-tags-from=-', target], stdin=stdin)
398
399     def set_mp3_metadata(self, target, metadata):
400         args = ['id3v2']
401         for key,arg in [('album', '--album'), ('artist', '--artist'),
402                         ('title', '--song')]:
403             if key in metadata:
404                 args.extend([arg, metadata[key]])
405         if 'date' in metadata:
406             year,month,day = self._parse_date(metadata['date'])
407             args.extend(['--year', year])
408         if 'genre' in metadata:
409             genre = metadata['genre']
410             if not hasattr(self, '_id3v1_genres'):
411                 status,stdout,stderr = invoke(['id3v2', '--list-genres'])
412                 genres = {}
413                 for line in stdout.splitlines():
414                     num,name = [x.strip() for x in line.split(':', 1)]
415                     genres[name.lower()] = num
416                 self._id3v1_genres = genres
417             # Genre 12 = "Other"
418             num = self._id3v1_genres.get(genre.lower(), '12')
419             args.extend(['--genre', num])
420         if 'tracknumber' in metadata:
421             track = metadata['tracknumber']
422             if 'tracktotal' in metadata:
423                 track = '%s/%s' % (track, metadata['tracktotal'])
424             args.extend(['--track', track])
425         args.append(target)
426         invoke(args)
427
428     def set_ogg_metadata(self, target, metadata):
429         stdin = '\n'.join(['%s=%s' % (k.upper(), v)
430                            for k,v in sorted(metadata.iteritems())])
431         invoke(['vorbiscomment', '--write', target], stdin=stdin)
432
433     def set_wav_metadata(self, target, metadata):
434         pass
435
436
437 def test():
438     import doctest
439     results = doctest.testmod()
440     return results.failed % 127
441
442
443 if __name__ == '__main__':
444     import optparse
445     import sys
446
447     usage = '%prog [options] source-dir target-dir'
448     epilog = __doc__
449     p = optparse.OptionParser(usage=usage, epilog=epilog)
450     p.format_epilog = lambda formatter: epilog+'\n'
451     p.add_option('-t', '--target-extension', dest='ext',
452                  default='ogg', metavar='EXT',
453                  help='Conversion target type (e.g. flac, mp3) (%default)')
454     p.add_option('-c', '--cache', dest='cache', metavar='PATH',
455                  help=('Save conversion hashes in a cache file to avoid '
456                        'repeated previous conversions.'))
457     p.add_option('--test', dest='test', action='store_true', default=False,
458                  help='Run internal tests and exit')
459
460     options,args = p.parse_args()
461
462     if options.test:
463         sys.exit(test())
464
465     source_dir,target_dir = args
466     c = Converter(source_dir, target_dir, target_extension=options.ext,
467                   cache_file=options.cache)
468     try:
469         c.run()
470     finally:
471         c.cleanup()
472         pass