3 # Copyright (C) 2009-2011 W. Trevor King <wking@tremily.us>
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Lesser General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
10 # This program is distributed in the hope that it will be useful, but
11 # WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 # Lesser General Public License for more details.
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with this program. If not, see
17 # <http://www.gnu.org/licenses/>.
19 """Mirror a tree of mp3/ogg/flac files with Ogg Vorbis versions.
21 Other target formats are also supported. Current conversions:
30 External packages required for full functionality:
36 * vorbis_ (`ogg123`, `oggenc`, `vorbiscomment`)
38 .. _id3v2: http://id3v2.sourceforge.net/
39 .. _lame: http://lame.sourceforge.net
40 .. _flac: http://flac.sourceforge.net
41 .. _mpg123: http://www.mpg123.org/
42 .. _vorbis: http://www.vorbis.com
45 from hashlib import sha256 as _hash
50 from subprocess import Popen, PIPE
51 from tempfile import mkstemp
57 def invoke(args, stdin=None, expect=(0,)):
59 p = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE)
60 stdout,stderr = p.communicate(stdin)
62 assert status in expect, 'invalid status %d from %s' % (status, args)
63 return (status, stdout, stderr)
66 class Converter (object):
67 """Recode audio files from `source_dir` to `target_dir`.
69 `target_extension` sets the target encoding.
74 The `get_` and `set_*_metadata` methods should pass metadata as a
75 `dict` with key/value pairs standardised to match the list of
    Vorbis comment suggestions_ with lowercase keys. The `date` field
77 should be formatted `YYYY[-MM[-DD]]`.
79 .. _suggestions: http://www.xiph.org/vorbis/doc/v-comment.html
81 def __init__(self, source_dir, target_dir, target_extension='ogg',
82 cache_file=None, hash=True, ignore_function=None):
83 self.source_dir = source_dir
84 self.target_dir = target_dir
85 self._source_extensions = ['flac', 'mp3', 'ogg', 'wav']
86 self._target_extension = target_extension
87 self._cache_file = cache_file
88 self._cache = self._read_cache()
90 self._ignore_function = ignore_function
91 f,self._tempfile = mkstemp(prefix='mkogg-')
94 os.remove(self._tempfile)
97 def _read_cache(self):
99 if self._cache_file == None:
102 with open(self._cache_file, 'r') as f:
104 assert line.startswith('# mkogg cache version:'), line
105 version = line.split(':', 1)[-1].strip()
106 if version != __version__:
107 print 'cache version mismatch: %s != %s' % (
108 version, __version__)
109 return cache # old cache, ignore contents
112 key,value = [x.strip() for x in line.split(' -> ')]
120 def _save_cache(self):
121 if self._cache_file == None:
123 with open(self._cache_file, 'w') as f:
124 f.write('# mkogg cache version: %s\n' % __version__)
125 for key,value in self._cache.iteritems():
126 f.write('%s -> %s\n' % (key, value))
129 self._makedirs(self.target_dir)
130 for dirpath,dirnames,filenames in os.walk(self.source_dir):
131 for filename in filenames:
132 root,ext = os.path.splitext(filename)
134 if ext.startswith('.'):
136 if ext not in self._source_extensions:
137 print 'skip', filename, ext
139 source_path = os.path.join(dirpath, filename)
140 if (self._ignore_function is not None and
141 self._ignore_function(source_path)):
143 rel_path = os.path.relpath(dirpath, self.source_dir)
144 target_path = os.path.join(
145 self.target_dir, rel_path,
146 '%s.%s' % (root, self._target_extension))
147 target_dir = os.path.dirname(target_path)
148 self._makedirs(target_dir)
149 self._convert(source_path, target_path, ext)
def _makedirs(self, target_dir):
    """Create `target_dir` (and any missing parents) if needed.

    An already-existing directory is not an error.  Any other
    failure (for example a regular file occupying the path) is
    propagated as `OSError`.
    """
    try:
        os.makedirs(target_dir)
    except OSError:
        # Attempt first, verify afterwards: this avoids the race
        # between a separate existence test and the creation call.
        if not os.path.isdir(target_dir):
            raise
155 def _convert(self, source, target, ext):
157 cache_key = self._cache_key(source)
158 old_cache_value = self._cache.get(cache_key, None)
159 if (old_cache_value != None and
160 old_cache_value == self._cache_value(target)):
161 print 'already cached %s to %s' % (source, target)
163 elif os.path.exists(target):
164 print 'target %s already exists' % (target)
166 print 'convert %s to %s' % (source, target)
167 if ext == self._target_extension:
168 shutil.copy(source, target)
170 convert = getattr(self, 'convert_%s_to_%s'
171 % (ext, self._target_extension))
172 convert(source, target)
173 if not getattr(convert, 'handles_metadata', False):
174 get_metadata = getattr(self, 'get_%s_metadata' % ext)
175 metadata = get_metadata(source)
176 set_metadata = getattr(self, 'set_%s_metadata'
177 % self._target_extension)
178 set_metadata(target, metadata)
180 cache_key = self._cache_key(source)
181 self._cache[cache_key] = self._cache_value(target)
def _cache_key(self, source):
    """Return the cache lookup key for `source`.

    The key is the `repr` of a `(source hash, target extension)`
    tuple.
    """
    source_hash = self._file_hash(source)
    key_fields = (source_hash, self._target_extension)
    return repr(key_fields)
def _cache_value(self, target):
    """Return the value cached for a conversion: the hash of `target`."""
    target_hash = self._file_hash(target)
    return target_hash
189 def _file_hash(self, filename):
193 >>> c = Converter(None, None)
194 >>> h = c._file_hash(__file__)
197 >>> c._file_hash('/highly/unlikely/to/exist') == None
202 chunk_size = 2**20 # 1 Mb
204 with open(filename, 'rb') as f:
206 while len(chunk) > 0:
207 chunk = f.read(chunk_size)
211 return str(h.hexdigest())
213 def _parse_date(self, date):
214 """Parse `date` (`YYYY[-MM[-DD]]`), returning `(year, month, day)`.
218 >>> c = Converter(None, None)
219 >>> c._parse_date('2010')
221 >>> c._parse_date('2010-11')
223 >>> c._parse_date('2010-11-16')
227 fields = date.split('-')
228 assert len(fields) > 0 and len(fields) <= 3, date
229 fields = fields + [None] * (3 - len(fields))
232 def _parse_id3v2_comments(self, stdout):
237 >>> from pprint import pprint
238 >>> c = Converter(None, None)
239 >>> metadata = c._parse_id3v2_comments('\\n'.join([
240 ... 'id3v1 tag info for src/03-Drive_My_Car.mp3:',
241 ... 'Title : The Famous Song Artist: No One You Know',
242 ... 'Album : The Famous Album Year: 1965, Genre: Rock (17)',
243 ... 'Comment: Track: 7',
244 ... 'id3v2 tag info for src/03-Drive_My_Car.mp3:',
245 ... 'TALB (Album/Movie/Show title): The Famous Album',
246 ... 'TPE1 (Lead performer(s)/Soloist(s)): No One You Know',
247 ... 'TT2 (Title/songname/content description): The Famous Song',
248 ... 'TYER (Year): 1965',
249 ... 'TCON (Content type): Rock (17)',
250 ... 'TRCK (Track number/Position in set): 07/14']))
251 >>> pprint(metadata) # doctest: +REPORT_UDIFF
252 {'album': 'The Famous Album',
253 'artist': 'No One You Know',
256 'title': 'The Famous Song',
270 'tpe2': 'accompaniment',
272 'tpos': 'part of set',
273 'tpub': 'organization', # publisher
274 'trck': 'tracknumber',
278 'apic', # attached picture
279 'geob', # general encapsulated object
281 'pcnt', # play counter (incremented with each play)
282 'pic', # attached picture
284 'tbp', # beats per minute
285 'tco', # content type
289 'tope', # original artist (e.g. for a cover)
290 'tlen', # length (in milliseconds)
292 'txxx', # user defined text information
293 'ufi', # unique file identifier
294 'uslt', # unsynchronized lyric/text transcription
295 'wcom', # commercial information
296 'woar', # official artist/performer webpage
297 'wxxx', # user defined URL
311 for line in stdout.splitlines():
313 if line.startswith('id3v2 tag info'):
316 key,value = [x.strip() for x in line.split(':', 1)]
317 if value.lower() == 'no id3v1 tag':
319 short_key = key.split()[0].lower()
320 short_key = key_translations.get(short_key, short_key)
321 if short_key in drop_keys:
323 v_key = vorbis_keys[short_key]
325 value = value.rsplit('(', 1)[0].strip()
326 elif v_key == 'tracknumber' and '/' in value:
327 value,total = value.split('/')
328 metadata['tracktotal'] = total
329 metadata[v_key] = value
332 def _parse_vorbis_comments(self, stdout):
333 """Parse Vorbis comments.
337 >>> from pprint import pprint
338 >>> c = Converter(None, None)
339 >>> metadata = c._parse_vorbis_comments('\\n'.join([
340 ... 'ARTIST=No One You Know',
341 ... 'ALBUM=The Famous Album',
342 ... 'TITLE=The Famous Song',
345 ... 'TRACKNUMBER=07',
347 ... 'CDDB=af08640e']))
348 >>> pprint(metadata) # doctest: +REPORT_UDIFF
349 {'album': 'The Famous Album',
350 'artist': 'No One You Know',
354 'title': 'The Famous Song',
360 for line in stdout.splitlines():
361 key,value = line.split('=', 1)
362 metadata[key.lower()] = value
def convert_flac_to_mp3(self, source, target):
    """Recode `source` to MP3 at `target` via an intermediate WAV.

    The WAV is written to the instance's scratch file
    (`self._tempfile`).
    """
    scratch_wav = self._tempfile
    self.convert_flac_to_wav(source, scratch_wav)
    self.convert_wav_to_mp3(scratch_wav, target)
def convert_flac_to_wav(self, source, target):
    """Decode `source` to a WAV file at `target` by running `ogg123`."""
    command = ['ogg123', '-d', 'wav', '-f', target, source]
    invoke(command)
def convert_flac_to_ogg(self, source, target):
    """Encode `source` as Ogg Vorbis at `target` by running `oggenc`."""
    command = ['oggenc', '--quiet', '-q', '3', source, '-o', target]
    invoke(command)
# `_convert` skips its separate get/set metadata pass for converters
# carrying this flag.
convert_flac_to_ogg.handles_metadata = True
def convert_mp3_to_flac(self, source, target):
    """Recode `source` to FLAC at `target` via an intermediate WAV.

    The WAV is written to the instance's scratch file
    (`self._tempfile`).
    """
    scratch_wav = self._tempfile
    self.convert_mp3_to_wav(source, scratch_wav)
    self.convert_wav_to_flac(scratch_wav, target)
def convert_mp3_to_ogg(self, source, target):
    """Recode `source` to Ogg Vorbis at `target` via an intermediate WAV.

    The WAV is written to the instance's scratch file
    (`self._tempfile`).
    """
    scratch_wav = self._tempfile
    self.convert_mp3_to_wav(source, scratch_wav)
    self.convert_wav_to_ogg(scratch_wav, target)
def convert_mp3_to_wav(self, source, target):
    """Decode `source` to a WAV file at `target` by running `mpg123`."""
    command = ['mpg123', '-w', target, source]
    invoke(command)
def convert_ogg_to_mp3(self, source, target):
    """Recode the Ogg file `source` to MP3 at `target`.

    Delegates to `convert_flac_to_mp3`, so the same decode-to-WAV
    then encode pipeline is used for both source formats.
    """
    recode = self.convert_flac_to_mp3
    recode(source, target)
def convert_ogg_to_wav(self, source, target):
    """Decode the Ogg file `source` to a WAV file at `target`.

    Delegates to `convert_flac_to_wav`, so the same `ogg123`
    invocation is used for both source formats.
    """
    # Pass both paths through; the previous call referenced an
    # undefined `source_target` name and raised NameError.
    self.convert_flac_to_wav(source, target)
def convert_wav_to_flac(self, source, target):
    """Encode the WAV file `source` as FLAC at `target` by running `flac`."""
    command = ['flac', '-o', target, source]
    invoke(command)
def convert_wav_to_mp3(self, source, target):
    """Encode the WAV file `source` as MP3 at `target` by running `lame`."""
    command = ['lame', '--quiet', '-V', '4', source, target]
    invoke(command)
def convert_wav_to_ogg(self, source, target):
    """Encode the WAV file `source` as Ogg Vorbis at `target`.

    Delegates to `convert_flac_to_ogg`, which hands `source` to
    `oggenc`.
    """
    encode = self.convert_flac_to_ogg
    encode(source, target)
def get_flac_metadata(self, source):
    """Return the tags of the FLAC file `source` as a `dict`.

    Runs `metaflac --export-tags-to=-` and parses its output with
    `_parse_vorbis_comments`.
    """
    # This method used to be defined twice in a row; the earlier
    # definition was shadowed by this one and never ran, so only the
    # effective definition is kept.
    status,stdout,stderr = invoke(
        ['metaflac', '--export-tags-to=-', source])
    return self._parse_vorbis_comments(stdout)
def get_mp3_metadata(self, source):
    """Return the tags of the MP3 file `source` as a `dict`.

    Runs `id3v2 --list` and parses its output with
    `_parse_id3v2_comments`.
    """
    command = ['id3v2', '--list', source]
    result = invoke(command)
    listing = result[1]  # (status, stdout, stderr)
    return self._parse_id3v2_comments(listing)
def get_ogg_metadata(self, source):
    """Return the tags of the Ogg file `source` as a `dict`.

    Runs `vorbiscomment --list` and parses its output with
    `_parse_vorbis_comments`.
    """
    command = ['vorbiscomment', '--list', source]
    result = invoke(command)
    listing = result[1]  # (status, stdout, stderr)
    return self._parse_vorbis_comments(listing)
426 def get_wav_metadata(self, source):
def set_flac_metadata(self, target, metadata):
    """Write `metadata` into the FLAC file `target` with `metaflac`.

    Tags are fed to `metaflac --import-tags-from=-` on stdin as
    `KEY=value` lines, sorted by key, with keys upper-cased.
    """
    tag_lines = []
    for key, value in sorted(metadata.iteritems()):
        tag_lines.append('%s=%s' % (key.upper(), value))
    invoke(['metaflac', '--import-tags-from=-', target],
           stdin='\n'.join(tag_lines))
434 def set_mp3_metadata(self, target, metadata):
436 for key,arg in [('album', '--album'), ('artist', '--artist'),
437 ('title', '--song')]:
439 args.extend([arg, metadata[key]])
440 if 'date' in metadata:
441 year,month,day = self._parse_date(metadata['date'])
442 args.extend(['--year', year])
443 if 'genre' in metadata:
444 genre = metadata['genre']
445 if not hasattr(self, '_id3v1_genres'):
446 status,stdout,stderr = invoke(['id3v2', '--list-genres'])
448 for line in stdout.splitlines():
449 num,name = [x.strip() for x in line.split(':', 1)]
450 genres[name.lower()] = num
451 self._id3v1_genres = genres
453 num = self._id3v1_genres.get(genre.lower(), '12')
454 args.extend(['--genre', num])
455 if 'tracknumber' in metadata:
456 track = metadata['tracknumber']
457 if 'tracktotal' in metadata:
458 track = '%s/%s' % (track, metadata['tracktotal'])
459 args.extend(['--track', track])
def set_ogg_metadata(self, target, metadata):
    """Write `metadata` into the Ogg file `target` with `vorbiscomment`.

    Tags are fed to `vorbiscomment --write` on stdin as `KEY=value`
    lines, sorted by key, with keys upper-cased.
    """
    tag_lines = []
    for key, value in sorted(metadata.iteritems()):
        tag_lines.append('%s=%s' % (key.upper(), value))
    invoke(['vorbiscomment', '--write', target],
           stdin='\n'.join(tag_lines))
468 def set_wav_metadata(self, target, metadata):
474 results = doctest.testmod()
475 return results.failed % 127
478 if __name__ == '__main__':
482 usage = '%prog [options] source-dir target-dir'
484 p = optparse.OptionParser(usage=usage, epilog=epilog)
485 p.format_epilog = lambda formatter: epilog+'\n'
486 p.add_option('-t', '--target-extension', dest='ext',
487 default='ogg', metavar='EXT',
488 help='Conversion target type (e.g. flac, mp3) (%default)')
489 p.add_option('-c', '--cache', dest='cache', metavar='PATH',
490 help=('Save conversion hashes in a cache file to avoid '
491 'repeated previous conversions.'))
492 p.add_option('-n', '--no-hash', dest='hash', action='store_false',
494 help=("Don't hash files. Just assume matching names would "
495 'have matching hashes.'))
496 p.add_option('-i', '--ignore', dest='ignore', metavar='REGEXP',
497 help=('Ignore source paths matching REGEXP.'))
498 p.add_option('--test', dest='test', action='store_true', default=False,
499 help='Run internal tests and exit')
501 options,args = p.parse_args()
506 if options.ignore is not None:
507 ignore_regexp = _re.compile(options.ignore)
508 ignore_function = ignore_regexp.match
510 ignore_function = None
512 source_dir,target_dir = args
513 c = Converter(source_dir, target_dir, target_extension=options.ext,
514 cache_file=options.cache, hash=options.hash,
515 ignore_function=ignore_function)