3 # Copyright (C) 2009-2015 W. Trevor King <wking@drexel.edu>
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Lesser General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
10 # This program is distributed in the hope that it will be useful, but
11 # WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 # Lesser General Public License for more details.
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with this program. If not, see
17 # <http://www.gnu.org/licenses/>.
19 """Mirror a tree of audio files in another format.
Conversion between any of the following formats is supported:
29 External packages required for full functionality:
35 * vorbis_ (`ogg123`, `oggenc`)
36 * mutagen_ (metadata conversion)
38 .. _lame: http://lame.sourceforge.net/
39 .. _faad: http://www.audiocoding.com/faad2.html
40 .. _flac: http://flac.sourceforge.net/
41 .. _mpg123: http://www.mpg123.org/
42 .. _vorbis: http://www.vorbis.com/
43 .. _mutagen: http://code.google.com/p/mutagen/
46 from hashlib import sha256 as _hash
49 import shutil as _shutil
50 import subprocess as _subprocess
51 import tempfile as _tempfile
54 import mutagen as _mutagen
55 import mutagen.flac as _mutagen_flac
56 import mutagen.id3 as _mutagen_id3
57 import mutagen.mp4 as _mutagen_mp4
58 import mutagen.mp3 as _mutagen_mp3
59 import mutagen.oggvorbis as _mutagen_oggvorbis
60 except ImportError as error:
62 _mutagen_import_error = error
68 def invoke(args, stdin=None, expect=(0,)):
69 print(' {}'.format(args))
70 p = _subprocess.Popen(
71 args, stdin=_subprocess.PIPE, stdout=_subprocess.PIPE,
72 stderr=_subprocess.PIPE)
73 stdout,stderr = p.communicate(stdin)
75 assert status in expect, 'invalid status {} from {}'.format(status, args)
76 return (status, stdout, stderr)
79 class Converter (object):
80 """Recode audio files from `source_dir` to `target_dir`.
82 `target_extension` sets the target encoding.
87 The `get_` and `set_*_metadata` methods should pass metadata as a
88 `dict` with key/value pairs standardised to match the list of
89 Vorbis comment suggestions_ with lowercase keys. The `date` field
90 should be formatted `YYYY[-MM[-DD]]`. The dict values should be
91 lists to support repeated entries for a given tag.
93 .. _suggestions: http://www.xiph.org/vorbis/doc/v-comment.html
95 id3_to_vorbis_keys = {
102 'tpe2': 'accompaniment',
104 'tpos': 'part of set',
105 'tpub': 'organization', # publisher
106 'trck': 'tracknumber',
110 mp4_to_vorbis_keys = {
111 '\xa9cmt': 'comment',
117 #'tpe2': 'accompaniment',
118 #'tpe3': 'conductor',
119 'disk': 'part of set',
120 #'tpub': 'organization', # publisher
121 'trkn': 'tracknumber',
125 def __init__(self, source_dir, target_dir, target_extension='ogg',
126 cache_file=None, hash=True, ignore_function=None):
127 self.source_dir = source_dir
128 self.target_dir = target_dir
129 self._source_extensions = ['flac', 'm4a', 'mp3', 'mp4', 'ogg', 'wav']
130 self._target_extension = target_extension
131 self._cache_file = cache_file
132 self._cache = self._read_cache()
134 self._ignore_function = ignore_function
135 f,self._tempfile = _tempfile.mkstemp(prefix='mkogg-')
138 _os.remove(self._tempfile)
141 def _read_cache(self):
143 if self._cache_file == None:
146 with open(self._cache_file, 'r') as f:
148 assert line.startswith('# mkogg cache version:'), line
149 version = line.split(':', 1)[-1].strip()
150 if version != __version__:
151 print('cache version mismatch: {} != {}'.format(
152 version, __version__))
153 return cache # old cache, ignore contents
156 key,value = [x.strip() for x in line.split(' -> ')]
164 def _save_cache(self):
165 if self._cache_file == None:
167 with open(self._cache_file, 'w') as f:
168 f.write('# mkogg cache version: {}\n'.format(__version__))
169 for key,value in self._cache.items():
170 f.write('{} -> {}\n'.format(key, value))
173 self._makedirs(self.target_dir)
174 for dirpath,dirnames,filenames in _os.walk(self.source_dir):
175 for filename in filenames:
176 root,ext = _os.path.splitext(filename)
178 if ext.startswith('.'):
180 if ext not in self._source_extensions:
181 print('skip', filename, ext)
183 source_path = _os.path.join(dirpath, filename)
184 if (self._ignore_function is not None and
185 self._ignore_function(source_path)):
187 rel_path = _os.path.relpath(dirpath, self.source_dir)
188 target_path = _os.path.join(
189 self.target_dir, rel_path,
190 '{}.{}'.format(root, self._target_extension))
191 target_dir = _os.path.dirname(target_path)
192 self._makedirs(target_dir)
193 self._convert(source_path, target_path, ext)
195 def _makedirs(self, target_dir):
196 if not _os.path.exists(target_dir):
197 _os.makedirs(target_dir)
199 def _convert(self, source, target, ext):
201 cache_key = self._cache_key(source)
202 old_cache_value = self._cache.get(cache_key, None)
203 if (old_cache_value != None and
204 old_cache_value == self._cache_value(target)):
205 print('already cached {} to {}'.format(source, target))
207 elif _os.path.exists(target):
208 print('target {} already exists'.format(target))
210 print('convert {} to {}'.format(source, target))
211 if ext == self._target_extension:
212 _shutil.copy(source, target)
215 convert = getattr(self, 'convert_{}_to_{}'.format(
216 ext, self._target_extension))
217 except AttributeError:
218 to_wav = getattr(self, 'convert_{}_to_wav'.format(ext))
219 from_wav = getattr(self, 'convert_wav_to_{}'.format(
220 self._target_extension))
221 def convert(source, target):
222 to_wav(source, self._tempfile)
223 from_wav(self._tempfile, target)
224 convert(source, target)
225 if not getattr(convert, 'handles_metadata', False):
226 get_metadata = getattr(self, 'get_{}_metadata'.format(ext))
227 metadata = get_metadata(source)
228 set_metadata = getattr(self, 'set_{}_metadata'.format(
229 self._target_extension))
230 set_metadata(target, metadata)
232 cache_key = self._cache_key(source)
233 self._cache[cache_key] = self._cache_value(target)
235 def _cache_key(self, source):
236 return repr((self._file_hash(source), self._target_extension))
238 def _cache_value(self, target):
239 return self._file_hash(target)
241 def _file_hash(self, filename):
245 >>> c = Converter(None, None)
246 >>> h = c._file_hash(__file__)
249 >>> c._file_hash('/highly/unlikely/to/exist') == None
254 chunk_size = 2**20 # 1 Mb
256 with open(filename, 'rb') as f:
258 while len(chunk) > 0:
259 chunk = f.read(chunk_size)
263 return str(h.hexdigest())
265 def _set_vorbis_comments(self, container, metadata):
267 if type(metadata) == dict:
268 items = sorted(metadata.items())
270 items = metadata.items()
271 for key,value in items:
272 # leave key case alone, because Mutagen downcases Vorbis
274 container[key] = value
276 def _parse_date(self, date):
277 """Parse `date` (`YYYY[-MM[-DD]]`), returning `(year, month, day)`.
281 >>> c = Converter(None, None)
282 >>> c._parse_date('2010')
284 >>> c._parse_date('2010-11')
286 >>> c._parse_date('2010-11-16')
290 fields = date.split('-')
291 assert len(fields) > 0 and len(fields) <= 3, date
292 fields = fields + [None] * (3 - len(fields))
295 def _construct_id3_trck(self, metadata):
296 if 'tracknumber' not in metadata:
298 if 'tracktotal' in metadata:
300 for i,v in enumerate(metadata['tracknumber']):
301 value.append('{}/{}'.format(
302 v, metadata['tracktotal'][i]))
304 value = metadata['tracknumber']
308 def _guess_id3_encoding(self, text_list):
309 for id3_encoding,encoding in [(0, 'ISO-8859-1'), (3, 'utf-8')]:
310 encoding_success = True
311 for text in text_list:
312 if isinstance(text, str):
314 text.encode(encoding)
315 except UnicodeEncodeError:
316 encoding_success = False
320 raise ValueError(text_list)
def convert_flac_to_wav(self, source, target):
    """Decode `source` into a WAV file at `target`.

    Runs `ogg123 -d wav -f <target> <source>` through `invoke`.
    """
    command = ['ogg123', '-d', 'wav', '-f', target, source]
    invoke(command)
def convert_flac_to_ogg(self, source, target):
    """Encode `source` into an Ogg file at `target`.

    Runs `oggenc --quiet -q 3 <source> -o <target>` through `invoke`.
    """
    command = ['oggenc', '--quiet', '-q', '3', source, '-o', target]
    invoke(command)
# Flag read via `getattr(convert, 'handles_metadata', False)` by the
# caller to decide whether a separate metadata copy is still needed.
convert_flac_to_ogg.handles_metadata = True
def convert_m4a_to_wav(self, source, target):
    """Decode `source` into a WAV file at `target`.

    Runs `faad -o <target> <source>` through `invoke`.
    """
    command = ['faad', '-o', target, source]
    invoke(command)
def convert_mp3_to_wav(self, source, target):
    """Decode `source` into a WAV file at `target`.

    Runs `mpg123 -w <target> <source>` through `invoke`.
    """
    command = ['mpg123', '-w', target, source]
    invoke(command)
def convert_mp4_to_wav(self, source, target):
    """Decode `source` into a WAV file at `target`.

    Runs `faad -o <target> <source>` through `invoke`.
    """
    command = ['faad', '-o', target, source]
    invoke(command)
def convert_ogg_to_wav(self, source, target):
    """Decode `source` into a WAV file at `target`.

    Delegates to `convert_flac_to_wav`, reusing its decoder command.
    """
    decode = self.convert_flac_to_wav
    decode(source, target)
def convert_wav_to_flac(self, source, target):
    """Encode the WAV file `source` into a FLAC file at `target`.

    Runs `flac --force --output-name <target> <source>` through
    `invoke`.
    """
    command = ['flac', '--force', '--output-name', target, source]
    invoke(command)
def convert_wav_to_mp3(self, source, target):
    """Encode the WAV file `source` into an MP3 file at `target`.

    Runs `lame --quiet -V 4 <source> <target>` through `invoke`.
    """
    command = ['lame', '--quiet', '-V', '4', source, target]
    invoke(command)
def convert_wav_to_ogg(self, source, target):
    """Encode the WAV file `source` into an Ogg file at `target`.

    Delegates to `convert_flac_to_ogg`, reusing its encoder command.
    """
    encode = self.convert_flac_to_ogg
    encode(source, target)
350 def get_flac_metadata(self, source):
352 raise _mutagen_import_error
353 return _mutagen_flac.FLAC(source)
def get_m4a_metadata(self, source):
    """Return the metadata of the M4A file `source`.

    Delegates to `get_mp4_metadata` and returns its result unchanged.
    """
    # `self.get_mp4_metadata` is already bound; passing `self` again
    # (as the original did) raises a TypeError for every .m4a file.
    return self.get_mp4_metadata(source)
358 def get_mp3_metadata(self, source):
360 raise _mutagen_import_error
361 mp3 = _mutagen_mp3.MP3(source)
363 for key,value in mp3.items():
365 vorbis_key = self.id3_to_vorbis_keys[key.lower()]
369 if vorbis_key == 'tracknumber':
370 for i,v_entry in enumerate(v):
372 tracknumber,tracktotal = v_entry.split('/', 1)
374 metadata['tracktotal'] = ['tracktotal']
375 metadata[vorbis_key] = v
378 def get_mp4_metadata(self, source):
380 raise _mutagen_import_error
381 mp4 = _mutagen_mp4.MP4(source)
383 for key,value in mp4.items():
385 vorbis_key = self.mp4_to_vorbis_keys[key.lower()]
388 if vorbis_key == 'tracknumber':
389 tracknumber,tracktotal = value
392 metadata['tracktotal'] = [str(tracktotal)]
393 elif vorbis_key == 'part of set':
394 disknumber,disktotal = value
397 metadata['set total'] = [str(disktotal)]
399 metadata[vorbis_key] = [str(value)]
400 except UnicodeEncodeError:
401 metadata[vorbis_key] = [value]
404 def get_ogg_metadata(self, source):
406 raise _mutagen_import_error
407 return _mutagen_oggvorbis.OggVorbis(source)
409 def get_wav_metadata(self, source):
412 def set_flac_metadata(self, target, metadata):
414 raise _mutagen_import_error
415 flac = _mutagen_flac.FLAC(target)
416 self._set_vorbis_comments(flac, metadata)
419 def set_mp3_metadata(self, target, metadata):
420 vorbis_keys_to_id3 = dict(
421 (v,k) for k,v in self.id3_to_vorbis_keys.items())
423 raise _mutagen_import_error
424 mp3 = _mutagen_mp3.MP3(target)
425 if mp3.tags is not None:
429 for key,value in metadata.items():
431 for i,v in enumerate(value):
432 year,month,day = self._parse_date(v)
434 elif key in ['tracknumber', 'tracktotal']:
435 if handled_trck is True:
438 key,value = self._construct_id3_trck(metadata)
442 frame_name = vorbis_keys_to_id3[key].upper()
445 frame = getattr(_mutagen_id3, frame_name)
446 id3_encoding = self._guess_id3_encoding(value)
447 max_encoding = max(max_encoding, id3_encoding)
448 mp3[frame_name] = frame(encoding=id3_encoding, text=value)
451 if max_encoding: # at least one tag doesn't use ISO-8859-1
452 v1 = 0 # remove ID3v1 tags
454 v1 = 2 # create and/or update ID3v1 tags
457 def set_ogg_metadata(self, target, metadata):
459 raise _mutagen_import_error
460 ogg = _mutagen_oggvorbis.OggVorbis(target)
461 self._set_vorbis_comments(ogg, metadata)
464 def set_wav_metadata(self, target, metadata):
470 results = doctest.testmod()
471 return results.failed % 127
474 if __name__ == '__main__':
478 class Formatter (argparse.RawDescriptionHelpFormatter,
479 argparse.ArgumentDefaultsHelpFormatter):
482 p = argparse.ArgumentParser(
483 description=__doc__.splitlines()[0],
484 epilog='\n'.join(__doc__.splitlines()[2:]),
485 formatter_class=Formatter)
487 '-v', '--version', action='version',
488 version='%(prog)s {}'.format(__version__))
490 '-t', '--target-extension', dest='ext', metavar='EXT',
491 default='ogg', choices=['flac', 'mp3', 'ogg', 'wav'],
492 help='Conversion target type')
494 '-c', '--cache', dest='cache', metavar='PATH',
495 help=('Save conversion hashes in a cache file to avoid '
496 'repeated previous conversions.'))
498 '-n', '--no-hash', dest='hash',
499 default=True, action='store_const', const=False,
500 help=("Don't hash files. Just assume matching names would "
501 'have matching hashes.'))
503 '-i', '--ignore', dest='ignore', metavar='REGEXP',
504 help='Ignore source paths matching REGEXP.')
506 '--test', dest='test',
507 default=False, action='store_const', const=True,
508 help='Run internal tests and exit')
510 'source_dir', metavar='SOURCE', default='.',
511 help='Source directory')
513 'target_dir', metavar='TARGET', default='.',
514 help='Target directory')
516 args = p.parse_args()
522 ignore_regexp = _re.compile(args.ignore)
523 ignore_function = ignore_regexp.match
525 ignore_function = None
528 args.source_dir, args.target_dir, target_extension=args.ext,
529 cache_file=args.cache, hash=args.hash,
530 ignore_function=ignore_function)