3 # Copyright (C) 2009-2011 W. Trevor King <wking@drexel.edu>
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Lesser General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
10 # This program is distributed in the hope that it will be useful, but
11 # WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 # Lesser General Public License for more details.
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with this program. If not, see
17 # <http://www.gnu.org/licenses/>.
19 """Mirror a tree of mp3/ogg/flac files with Ogg Vorbis versions.
21 Other target formats are also supported. Current conversions:
30 External packages required for full functionality:
36 * vorbis_ (`ogg123`, `oggenc`, `vorbiscomment`)
38 .. _id3v2: http://id3v2.sourceforge.net/
39 .. _lame: http://lame.sourceforge.net
40 .. _flac: http://flac.sourceforge.net
41 .. _mpg123: http://www.mpg123.org/
42 .. _vorbis: http://www.vorbis.com
45 from hashlib import sha256 as _hash
49 from subprocess import Popen, PIPE
50 from tempfile import mkstemp
56 def invoke(args, stdin=None, expect=(0,)):
58 p = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE)
59 stdout,stderr = p.communicate(stdin)
61 assert status in expect, 'invalid status %d from %s' % (status, args)
62 return (status, stdout, stderr)
65 class Converter (object):
66 """Recode audio files from `source_dir` to `target_dir`.
68 `target_extension` sets the target encoding.
73 The `get_` and `set_*_metadata` methods should pass metadata as a
74 `dict` with key/value pairs standardised to match the list of
75 Vorbis comment suggestions_ with lowercase keys. The `date` field
76 should be formatted `YYYY[-MM[-DD]]`.
78 .. _suggestions: http://www.xiph.org/vorbis/doc/v-comment.html
80 def __init__(self, source_dir, target_dir, target_extension='ogg',
82 self.source_dir = source_dir
83 self.target_dir = target_dir
84 self._source_extensions = ['flac', 'mp3', 'ogg', 'wav']
85 self._target_extension = target_extension
86 self._cache_file = cache_file
87 self._cache = self._read_cache()
88 f,self._tempfile = mkstemp(prefix='mkogg-')
91 os.remove(self._tempfile)
94 def _read_cache(self):
96 if self._cache_file == None:
99 with open(self._cache_file, 'r') as f:
101 assert line.startswith('# mkogg cache version:'), line
102 version = line.split(':', 1)[-1].strip()
103 if version != __version__:
104 print 'cache version mismatch: %s != %s' % (
105 version, __version__)
106 return cache # old cache, ignore contents
109 key,value = [x.strip() for x in line.split(' -> ')]
117 def _save_cache(self):
118 if self._cache_file == None:
120 with open(self._cache_file, 'w') as f:
121 f.write('# mkogg cache version: %s\n' % __version__)
122 for key,value in self._cache.iteritems():
123 f.write('%s -> %s\n' % (key, value))
126 self._makedirs(self.target_dir)
127 for dirpath,dirnames,filenames in os.walk(self.source_dir):
128 for filename in filenames:
129 root,ext = os.path.splitext(filename)
131 if ext.startswith('.'):
133 if ext not in self._source_extensions:
134 print 'skip', filename, ext
136 source_path = os.path.join(dirpath, filename)
137 rel_path = os.path.relpath(dirpath, self.source_dir)
138 target_path = os.path.join(
139 self.target_dir, rel_path,
140 '%s.%s' % (root, self._target_extension))
141 target_dir = os.path.dirname(target_path)
142 self._makedirs(target_dir)
143 self._convert(source_path, target_path, ext)
145 def _makedirs(self, target_dir):
146 if not os.path.exists(target_dir):
147 os.makedirs(target_dir)
149 def _convert(self, source, target, ext):
150 cache_key = self._cache_key(source)
151 old_cache_value = self._cache.get(cache_key, None)
152 if (old_cache_value != None and
153 old_cache_value == self._cache_value(target)):
154 print 'cached %s to %s' % (source, target)
156 print 'convert %s to %s' % (source, target)
157 if ext == self._target_extension:
158 shutil.copy(source, target)
160 convert = getattr(self, 'convert_%s_to_%s'
161 % (ext, self._target_extension))
162 convert(source, target)
163 if not getattr(convert, 'handles_metadata', False):
164 get_metadata = getattr(self, 'get_%s_metadata' % ext)
165 metadata = get_metadata(source)
166 set_metadata = getattr(self, 'set_%s_metadata'
167 % self._target_extension)
168 set_metadata(target, metadata)
169 self._cache[cache_key] = self._cache_value(target)
171 def _cache_key(self, source):
172 return repr((self._file_hash(source), self._target_extension))
174 def _cache_value(self, target):
175 return self._file_hash(target)
177 def _file_hash(self, filename):
181 >>> c = Converter(None, None)
182 >>> h = c._file_hash(__file__)
185 >>> c._file_hash('/highly/unlikely/to/exist') == None
190 chunk_size = 2**20 # 1 Mb
192 with open(filename, 'rb') as f:
194 while len(chunk) > 0:
195 chunk = f.read(chunk_size)
199 return str(h.hexdigest())
201 def _parse_date(self, date):
202 """Parse `date` (`YYYY[-MM[-DD]]`), returning `(year, month, day)`.
206 >>> c = Converter(None, None)
207 >>> c._parse_date('2010')
209 >>> c._parse_date('2010-11')
211 >>> c._parse_date('2010-11-16')
215 fields = date.split('-')
216 assert len(fields) > 0 and len(fields) <= 3, date
217 fields = fields + [None] * (3 - len(fields))
220 def _parse_id3v2_comments(self, stdout):
225 >>> from pprint import pprint
226 >>> c = Converter(None, None)
227 >>> metadata = c._parse_id3v2_comments('\\n'.join([
228 ... 'id3v1 tag info for src/03-Drive_My_Car.mp3:',
229 ... 'Title : The Famous Song Artist: No One You Know',
230 ... 'Album : The Famous Album Year: 1965, Genre: Rock (17)',
231 ... 'Comment: Track: 7',
232 ... 'id3v2 tag info for src/03-Drive_My_Car.mp3:',
233 ... 'TALB (Album/Movie/Show title): The Famous Album',
234 ... 'TPE1 (Lead performer(s)/Soloist(s)): No One You Know',
235 ... 'TIT2 (Title/songname/content description): The Famous Song',
236 ... 'TYER (Year): 1965',
237 ... 'TCON (Content type): Rock (17)',
238 ... 'TRCK (Track number/Position in set): 07/14']))
239 >>> pprint(metadata) # doctest: +REPORT_UDIFF
240 {'album': 'The Famous Album',
241 'artist': 'No One You Know',
244 'title': 'The Famous Song',
258 'tpe2': 'accompaniment',
260 'tpos': 'part of set',
261 'tpub': 'organization',
262 'trck': 'tracknumber',
266 'apic', # attached picture
267 'geob', # general encapsulated object
269 'pcnt', # play counter (incremented with each play)
273 'tope', # original artist (e.g. for a cover)
274 'tlen', # length (in milliseconds)
276 'wxxx', # user defined URL
279 for line in stdout.splitlines():
281 if line.startswith('id3v2 tag info'):
284 key,value = [x.strip() for x in line.split(':', 1)]
285 short_key = key.split()[0]
286 if short_key.lower() in drop_keys:
288 v_key = vorbis_keys[short_key.lower()]
290 value = value.rsplit('(', 1)[0].strip()
291 elif v_key == 'tracknumber' and '/' in value:
292 value,total = value.split('/')
293 metadata['tracktotal'] = total
294 metadata[v_key] = value
297 def _parse_vorbis_comments(self, stdout):
298 """Parse Vorbis comments.
302 >>> from pprint import pprint
303 >>> c = Converter(None, None)
304 >>> metadata = c._parse_vorbis_comments('\\n'.join([
305 ... 'ARTIST=No One You Know',
306 ... 'ALBUM=The Famous Album',
307 ... 'TITLE=The Famous Song',
310 ... 'TRACKNUMBER=07',
312 ... 'CDDB=af08640e']))
313 >>> pprint(metadata) # doctest: +REPORT_UDIFF
314 {'album': 'The Famous Album',
315 'artist': 'No One You Know',
319 'title': 'The Famous Song',
325 for line in stdout.splitlines():
326 key,value = line.split('=', 1)
327 metadata[key.lower()] = value
def convert_flac_to_mp3(self, source, target):
    """Convert `source` to mp3 at `target` via the wav scratch file.

    The source is first decoded to `self._tempfile`, which is then
    encoded to `target`.
    """
    scratch_wav = self._tempfile
    self.convert_flac_to_wav(source, scratch_wav)
    self.convert_wav_to_mp3(scratch_wav, target)
def convert_flac_to_wav(self, source, target):
    """Decode `source` into a wav file at `target` by running `ogg123`."""
    command = ['ogg123', '-d', 'wav', '-f', target, source]
    invoke(command)
def convert_flac_to_ogg(self, source, target):
    """Encode `source` as Ogg Vorbis at `target` by running `oggenc`."""
    command = ['oggenc', '--quiet', '-q', '3', source, '-o', target]
    invoke(command)
# `_convert` looks this flag up with `getattr` and, when it is true,
# skips its own get/set metadata step after the conversion.
convert_flac_to_ogg.handles_metadata = True
def convert_mp3_to_flac(self, source, target):
    """Convert `source` to flac at `target` via the wav scratch file.

    The mp3 is first decoded to `self._tempfile`, which is then
    encoded to `target`.
    """
    scratch_wav = self._tempfile
    self.convert_mp3_to_wav(source, scratch_wav)
    self.convert_wav_to_flac(scratch_wav, target)
def convert_mp3_to_ogg(self, source, target):
    """Convert `source` to Ogg Vorbis at `target` via the wav scratch file.

    The mp3 is first decoded to `self._tempfile`, which is then
    encoded to `target`.
    """
    scratch_wav = self._tempfile
    self.convert_mp3_to_wav(source, scratch_wav)
    self.convert_wav_to_ogg(scratch_wav, target)
def convert_mp3_to_wav(self, source, target):
    """Decode `source` into a wav file at `target` by running `mpg123`."""
    command = ['mpg123', '-w', target, source]
    invoke(command)
def convert_ogg_to_mp3(self, source, target):
    """Convert `source` to mp3 at `target`.

    Delegates to `convert_flac_to_mp3`, reusing its pipeline
    unchanged.
    """
    flac_to_mp3 = self.convert_flac_to_mp3
    flac_to_mp3(source, target)
def convert_ogg_to_wav(self, source, target):
    """Decode `source` into a wav file at `target`.

    Delegates to `convert_flac_to_wav`, reusing its decoder call
    unchanged.
    """
    # The two arguments must be passed separately; the previous
    # `source_target` was an undefined name and raised NameError.
    self.convert_flac_to_wav(source, target)
def convert_wav_to_flac(self, source, target):
    """Encode the wav file `source` as flac at `target` by running `flac`."""
    command = ['flac', '-o', target, source]
    invoke(command)
def convert_wav_to_mp3(self, source, target):
    """Encode the wav file `source` as mp3 at `target` by running `lame`."""
    command = ['lame', '--quiet', '-V', '4', source, target]
    invoke(command)
def convert_wav_to_ogg(self, source, target):
    """Encode the wav file `source` as Ogg Vorbis at `target`.

    Delegates to `convert_flac_to_ogg`, reusing its encoder call
    unchanged.
    """
    flac_to_ogg = self.convert_flac_to_ogg
    flac_to_ogg(source, target)
def get_flac_metadata(self, source):
    """Return the metadata of the flac file `source`.

    The tags are exported by `metaflac` to its stdout and parsed
    with `_parse_vorbis_comments`.
    """
    # This method used to be defined twice in a row; the earlier copy
    # (with its own inline parsing loop) was shadowed by this one and
    # never ran, so only the effective definition is kept.
    status,stdout,stderr = invoke(
        ['metaflac', '--export-tags-to=-', source])
    return self._parse_vorbis_comments(stdout)
def get_mp3_metadata(self, source):
    """Return the metadata of the mp3 file `source`.

    The tags are listed by `id3v2` and parsed with
    `_parse_id3v2_comments`.
    """
    listing = invoke(['id3v2', '--list', source])[1]
    return self._parse_id3v2_comments(listing)
def get_ogg_metadata(self, source):
    """Return the metadata of the Ogg Vorbis file `source`.

    The comments are listed by `vorbiscomment` and parsed with
    `_parse_vorbis_comments`.
    """
    listing = invoke(['vorbiscomment', '--list', source])[1]
    return self._parse_vorbis_comments(listing)
391 def get_wav_metadata(self, source):
def set_flac_metadata(self, target, metadata):
    """Write `metadata` into the flac file `target` using `metaflac`.

    Each item is sent on stdin as a `KEY=value` line, with the key
    upper-cased and the items in sorted order.
    """
    tag_lines = []
    for key,value in sorted(metadata.iteritems()):
        tag_lines.append('%s=%s' % (key.upper(), value))
    tag_text = '\n'.join(tag_lines)
    invoke(['metaflac', '--import-tags-from=-', target], stdin=tag_text)
399 def set_mp3_metadata(self, target, metadata):
401 for key,arg in [('album', '--album'), ('artist', '--artist'),
402 ('title', '--song')]:
404 args.extend([arg, metadata[key]])
405 if 'date' in metadata:
406 year,month,day = self._parse_date(metadata['date'])
407 args.extend(['--year', year])
408 if 'genre' in metadata:
409 genre = metadata['genre']
410 if not hasattr(self, '_id3v1_genres'):
411 status,stdout,stderr = invoke(['id3v2', '--list-genres'])
413 for line in stdout.splitlines():
414 num,name = [x.strip() for x in line.split(':', 1)]
415 genres[name.lower()] = num
416 self._id3v1_genres = genres
418 num = self._id3v1_genres.get(genre.lower(), '12')
419 args.extend(['--genre', num])
420 if 'tracknumber' in metadata:
421 track = metadata['tracknumber']
422 if 'tracktotal' in metadata:
423 track = '%s/%s' % (track, metadata['tracktotal'])
424 args.extend(['--track', track])
def set_ogg_metadata(self, target, metadata):
    """Write `metadata` into the Ogg Vorbis file `target`.

    Each item is sent on stdin to `vorbiscomment --write` as a
    `KEY=value` line, with the key upper-cased and the items in
    sorted order.
    """
    tag_lines = []
    for key,value in sorted(metadata.iteritems()):
        tag_lines.append('%s=%s' % (key.upper(), value))
    tag_text = '\n'.join(tag_lines)
    invoke(['vorbiscomment', '--write', target], stdin=tag_text)
433 def set_wav_metadata(self, target, metadata):
439 results = doctest.testmod()
440 return results.failed % 127
443 if __name__ == '__main__':
447 usage = '%prog [options] source-dir target-dir'
449 p = optparse.OptionParser(usage=usage, epilog=epilog)
450 p.format_epilog = lambda formatter: epilog+'\n'
451 p.add_option('-t', '--target-extension', dest='ext',
452 default='ogg', metavar='EXT',
453 help='Conversion target type (e.g. flac, mp3) (%default)')
454 p.add_option('-c', '--cache', dest='cache', metavar='PATH',
455 help=('Save conversion hashes in a cache file to avoid '
456 'repeated previous conversions.'))
457 p.add_option('--test', dest='test', action='store_true', default=False,
458 help='Run internal tests and exit')
460 options,args = p.parse_args()
465 source_dir,target_dir = args
466 c = Converter(source_dir, target_dir, target_extension=options.ext,
467 cache_file=options.cache)