#!/usr/bin/env python
#
-# Copyright (C) 2009-2011 W. Trevor King <wking@drexel.edu>
+# Copyright (C) 2009-2015 W. Trevor King <wking@drexel.edu>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as
Conversion between any of the following formats are supported:
* flac
+* mp4 (decoding only)
* mp3
* ogg (Vorbis)
* wav
External packages required for full functionality:
* lame_ (`lame`)
+* faad_ (`faad`)
* flac_ (`flac`)
* mpg123_ (`mpg123`)
* vorbis_ (`ogg123`, `oggenc`)
* mutagen_ (metadata conversion)
.. _lame: http://lame.sourceforge.net/
+.. _faad: http://www.audiocoding.com/faad2.html
.. _flac: http://flac.sourceforge.net/
.. _mpg123: http://www.mpg123.org/
.. _vorbis: http://www.vorbis.com/
"""
from hashlib import sha256 as _hash
-import os
-import os.path
+import os as _os
import re as _re
-import shutil
-from subprocess import Popen, PIPE
-from tempfile import mkstemp
+import shutil as _shutil
+import subprocess as _subprocess
+import tempfile as _tempfile
try:
- import mutagen.flac
- import mutagen.id3
- import mutagen.mp3
- import mutagen.oggvorbis
-except ImportError, _mutagen_import_error:
- mutagen = None
+ import mutagen as _mutagen
+ import mutagen.flac as _mutagen_flac
+ import mutagen.id3 as _mutagen_id3
+ import mutagen.mp4 as _mutagen_mp4
+ import mutagen.mp3 as _mutagen_mp3
+ import mutagen.oggvorbis as _mutagen_oggvorbis
+except ImportError as error:
+ _mutagen = None
+ _mutagen_import_error = error
-__version__ = '0.3'
+__version__ = '0.5'
def invoke(args, stdin=None, expect=(0,)):
print(' {}'.format(args))
- p = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE)
+ p = _subprocess.Popen(
+ args, stdin=_subprocess.PIPE, stdout=_subprocess.PIPE,
+ stderr=_subprocess.PIPE)
stdout,stderr = p.communicate(stdin)
status = p.wait()
assert status in expect, 'invalid status {} from {}'.format(status, args)
'tyer': 'date',
}
+ mp4_to_vorbis_keys = {
+ '\xa9cmt': 'comment',
+ '\xa9alb': 'album',
+ #'tcom': 'composer',
+ 'cprt': 'copyright',
+ '\xa9nam': 'title',
+ '\xa9ART': 'artist',
+ #'tpe2': 'accompaniment',
+ #'tpe3': 'conductor',
+ 'disk': 'part of set',
+ #'tpub': 'organization', # publisher
+ 'trkn': 'tracknumber',
+ '\xa9day': 'date',
+ }
+
def __init__(self, source_dir, target_dir, target_extension='ogg',
cache_file=None, hash=True, ignore_function=None):
self.source_dir = source_dir
self.target_dir = target_dir
- self._source_extensions = ['flac', 'mp3', 'ogg', 'wav']
+ self._source_extensions = ['flac', 'm4a', 'mp3', 'mp4', 'ogg', 'wav']
self._target_extension = target_extension
self._cache_file = cache_file
self._cache = self._read_cache()
self._hash = hash
self._ignore_function = ignore_function
- f,self._tempfile = mkstemp(prefix='mkogg-')
+ f,self._tempfile = _tempfile.mkstemp(prefix='mkogg-')
def cleanup(self):
- os.remove(self._tempfile)
+ _os.remove(self._tempfile)
self._save_cache()
def _read_cache(self):
return
with open(self._cache_file, 'w') as f:
f.write('# mkogg cache version: {}\n'.format(__version__))
- for key,value in self._cache.iteritems():
+ for key,value in self._cache.items():
f.write('{} -> {}\n'.format(key, value))
def run(self):
self._makedirs(self.target_dir)
- for dirpath,dirnames,filenames in os.walk(self.source_dir):
+ for dirpath,dirnames,filenames in _os.walk(self.source_dir):
for filename in filenames:
- root,ext = os.path.splitext(filename)
+ root,ext = _os.path.splitext(filename)
ext = ext.lower()
if ext.startswith('.'):
ext = ext[1:]
if ext not in self._source_extensions:
print('skip', filename, ext)
continue
- source_path = os.path.join(dirpath, filename)
+ source_path = _os.path.join(dirpath, filename)
if (self._ignore_function is not None and
self._ignore_function(source_path)):
continue
- rel_path = os.path.relpath(dirpath, self.source_dir)
- target_path = os.path.join(
+ rel_path = _os.path.relpath(dirpath, self.source_dir)
+ target_path = _os.path.join(
self.target_dir, rel_path,
'{}.{}'.format(root, self._target_extension))
- target_dir = os.path.dirname(target_path)
+ target_dir = _os.path.dirname(target_path)
self._makedirs(target_dir)
self._convert(source_path, target_path, ext)
def _makedirs(self, target_dir):
- if not os.path.exists(target_dir):
- os.makedirs(target_dir)
+ if not _os.path.exists(target_dir):
+ _os.makedirs(target_dir)
def _convert(self, source, target, ext):
if self._hash:
old_cache_value == self._cache_value(target)):
print('already cached {} to {}'.format(source, target))
return
- elif os.path.exists(target):
+ elif _os.path.exists(target):
print('target {} already exists'.format(target))
return
print('convert {} to {}'.format(source, target))
if ext == self._target_extension:
- shutil.copy(source, target)
+ _shutil.copy(source, target)
return
- convert = getattr(self, 'convert_{}_to_{}'.format(
- ext, self._target_extension))
+ try:
+ convert = getattr(self, 'convert_{}_to_{}'.format(
+ ext, self._target_extension))
+ except AttributeError:
+ to_wav = getattr(self, 'convert_{}_to_wav'.format(ext))
+ from_wav = getattr(self, 'convert_wav_to_{}'.format(
+ self._target_extension))
+ def convert(source, target):
+ to_wav(source, self._tempfile)
+ from_wav(self._tempfile, target)
convert(source, target)
if not getattr(convert, 'handles_metadata', False):
get_metadata = getattr(self, 'get_{}_metadata'.format(ext))
if 'tracktotal' in metadata:
value = []
for i,v in enumerate(metadata['tracknumber']):
- value.append(u'{}/{}'.format(
+ value.append('{}/{}'.format(
v, metadata['tracktotal'][i]))
else:
value = metadata['tracknumber']
for id3_encoding,encoding in [(0, 'ISO-8859-1'), (3, 'utf-8')]:
encoding_success = True
for text in text_list:
- if isinstance(text, unicode):
+ if isinstance(text, str):
try:
text.encode(encoding)
except UnicodeEncodeError:
- encoding_success == False
+ encoding_success = False
break
if encoding_success:
return id3_encoding
raise ValueError(text_list)
- def convert_flac_to_mp3(self, source, target):
- self.convert_flac_to_wav(source, self._tempfile)
- self.convert_wav_to_mp3(self._tempfile, target)
-
def convert_flac_to_wav(self, source, target):
invoke(['ogg123', '-d', 'wav', '-f', target, source])
invoke(['oggenc', '--quiet', '-q', '3', source, '-o', target])
convert_flac_to_ogg.handles_metadata = True
- def convert_mp3_to_flac(self, source, target):
- self.convert_mp3_to_wav(source, self._tempfile)
- self.convert_wav_to_flac(self._tempfile, target)
-
- def convert_mp3_to_ogg(self, source, target):
- self.convert_mp3_to_wav(source, self._tempfile)
- self.convert_wav_to_ogg(self._tempfile, target)
+ def convert_m4a_to_wav(self, source, target):
+ invoke(['faad', '-o', target, source])
def convert_mp3_to_wav(self, source, target):
invoke(['mpg123', '-w', target, source])
- def convert_ogg_to_flac(self, source, target):
- self.convert_ogg_to_wav(source, self._tempfile)
- self.convert_wav_to_flac(self._tempfile, target)
-
- def convert_ogg_to_mp3(self, source, target):
- self.convert_flac_to_mp3(source, target)
+ def convert_mp4_to_wav(self, source, target):
+ invoke(['faad', '-o', target, source])
def convert_ogg_to_wav(self, source, target):
self.convert_flac_to_wav(source, target)
self.convert_flac_to_ogg(source, target)
def get_flac_metadata(self, source):
- if mutagen is None:
+ if _mutagen is None:
raise _mutagen_import_error
- return mutagen.flac.FLAC(source)
+ return _mutagen_flac.FLAC(source)
+
+ def get_m4a_metadata(self, source):
+ return self.get_mp4_metadata(source)
def get_mp3_metadata(self, source):
- if mutagen is None:
+ if _mutagen is None:
raise _mutagen_import_error
- mp3 = mutagen.mp3.MP3(source)
+ mp3 = _mutagen_mp3.MP3(source)
metadata = {}
for key,value in mp3.items():
try:
v = value.text
if vorbis_key == 'tracknumber':
for i,v_entry in enumerate(v):
- if u'/' in v_entry:
- tracknumber,tracktotal = v_entry.split(u'/', 1)
+ if '/' in v_entry:
+ tracknumber,tracktotal = v_entry.split('/', 1)
v[i] = tracknumber
metadata['tracktotal'] = ['tracktotal']
metadata[vorbis_key] = v
return metadata
+ def get_mp4_metadata(self, source):
+ if _mutagen is None:
+ raise _mutagen_import_error
+ mp4 = _mutagen_mp4.MP4(source)
+ metadata = {}
+ for key,value in mp4.items():
+ try:
+ vorbis_key = self.mp4_to_vorbis_keys[key.lower()]
+ except KeyError:
+ continue
+ if vorbis_key == 'tracknumber':
+ tracknumber,tracktotal = value
+ value = tracknumber
+ if tracktotal:
+ metadata['tracktotal'] = [str(tracktotal)]
+ elif vorbis_key == 'part of set':
+ disknumber,disktotal = value
+ value = disknumber
+ if disktotal:
+ metadata['set total'] = [str(disktotal)]
+ try:
+ metadata[vorbis_key] = [str(value)]
+ except UnicodeEncodeError:
+ metadata[vorbis_key] = [value]
+ return metadata
+
def get_ogg_metadata(self, source):
- if mutagen is None:
+ if _mutagen is None:
raise _mutagen_import_error
- return mutagen.oggvorbis.OggVorbis(source)
+ return _mutagen_oggvorbis.OggVorbis(source)
def get_wav_metadata(self, source):
return {}
def set_flac_metadata(self, target, metadata):
- if mutagen is None:
+ if _mutagen is None:
raise _mutagen_import_error
- flac = mutagen.flac.FLAC(target)
+ flac = _mutagen_flac.FLAC(target)
self._set_vorbis_comments(flac, metadata)
flac.save()
def set_mp3_metadata(self, target, metadata):
vorbis_keys_to_id3 = dict(
(v,k) for k,v in self.id3_to_vorbis_keys.items())
- if mutagen is None:
+ if _mutagen is None:
raise _mutagen_import_error
- mp3 = mutagen.mp3.MP3(target)
+ mp3 = _mutagen_mp3.MP3(target)
if mp3.tags is not None:
mp3.tags.delete()
handled_trck = False
+ max_encoding = 0
for key,value in metadata.items():
if key == 'date':
for i,v in enumerate(value):
frame_name = vorbis_keys_to_id3[key].upper()
except KeyError:
continue
- frame = getattr(mutagen.id3, frame_name)
+ frame = getattr(_mutagen_id3, frame_name)
id3_encoding = self._guess_id3_encoding(value)
+ max_encoding = max(max_encoding, id3_encoding)
mp3[frame_name] = frame(encoding=id3_encoding, text=value)
if mp3.tags is None:
return
- mp3.save(v1=2)
+ if max_encoding: # at least one tag doesn't use ISO-8859-1
+ v1 = 0 # remove ID3v1 tags
+ else:
+ v1 = 2 # create and/or update ID3v1 tags
+ mp3.save(v1=v1)
def set_ogg_metadata(self, target, metadata):
- if mutagen is None:
+ if _mutagen is None:
raise _mutagen_import_error
- ogg = mutagen.oggvorbis.OggVorbis(target)
+ ogg = _mutagen_oggvorbis.OggVorbis(target)
self._set_vorbis_comments(ogg, metadata)
ogg.save()
if __name__ == '__main__':
- import optparse
+ import argparse
import sys
- usage = '%prog [options] source-dir target-dir'
- epilog = __doc__
- p = optparse.OptionParser(usage=usage, epilog=epilog)
- p.format_epilog = lambda formatter: epilog+'\n'
- p.add_option('-t', '--target-extension', dest='ext',
- default='ogg', metavar='EXT',
- help='Conversion target type (e.g. flac, mp3) (%default)')
- p.add_option('-c', '--cache', dest='cache', metavar='PATH',
- help=('Save conversion hashes in a cache file to avoid '
- 'repeated previous conversions.'))
- p.add_option('-n', '--no-hash', dest='hash', action='store_false',
- default=True,
- help=("Don't hash files. Just assume matching names would "
- 'have matching hashes.'))
- p.add_option('-i', '--ignore', dest='ignore', metavar='REGEXP',
- help=('Ignore source paths matching REGEXP.'))
- p.add_option('--test', dest='test', action='store_true', default=False,
- help='Run internal tests and exit')
-
- options,args = p.parse_args()
-
- if options.test:
+ class Formatter (argparse.RawDescriptionHelpFormatter,
+ argparse.ArgumentDefaultsHelpFormatter):
+ pass
+
+ p = argparse.ArgumentParser(
+ description=__doc__.splitlines()[0],
+ epilog='\n'.join(__doc__.splitlines()[2:]),
+ formatter_class=Formatter)
+ p.add_argument(
+ '-v', '--version', action='version',
+ version='%(prog)s {}'.format(__version__))
+ p.add_argument(
+ '-t', '--target-extension', dest='ext', metavar='EXT',
+ default='ogg', choices=['flac', 'mp3', 'ogg', 'wav'],
+ help='Conversion target type')
+ p.add_argument(
+ '-c', '--cache', dest='cache', metavar='PATH',
+ help=('Save conversion hashes in a cache file to avoid '
+ 'repeated previous conversions.'))
+ p.add_argument(
+ '-n', '--no-hash', dest='hash',
+ default=True, action='store_const', const=False,
+ help=("Don't hash files. Just assume matching names would "
+ 'have matching hashes.'))
+ p.add_argument(
+ '-i', '--ignore', dest='ignore', metavar='REGEXP',
+ help='Ignore source paths matching REGEXP.')
+ p.add_argument(
+ '--test', dest='test',
+ default=False, action='store_const', const=True,
+ help='Run internal tests and exit')
+ p.add_argument(
+ 'source_dir', metavar='SOURCE', default='.',
+ help='Source directory')
+ p.add_argument(
+ 'target_dir', metavar='TARGET', default='.',
+ help='Target directory')
+
+ args = p.parse_args()
+
+ if args.test:
sys.exit(test())
- if options.ignore is not None:
- ignore_regexp = _re.compile(options.ignore)
+ if args.ignore:
+ ignore_regexp = _re.compile(args.ignore)
ignore_function = ignore_regexp.match
else:
ignore_function = None
- source_dir,target_dir = args
- c = Converter(source_dir, target_dir, target_extension=options.ext,
- cache_file=options.cache, hash=options.hash,
- ignore_function=ignore_function)
+ c = Converter(
+ args.source_dir, args.target_dir, target_extension=args.ext,
+ cache_file=args.cache, hash=args.hash,
+ ignore_function=ignore_function)
try:
c.run()
finally: