# License along with this program. If not, see
# <http://www.gnu.org/licenses/>.
-"""Mirror a tree of mp3/ogg/flac files with Ogg Vorbis versions.
+"""Mirror a tree of audio files in another format.
-Other target formats are also supported. Current conversions:
+Conversion between any of the following formats are supported:
-* flac -> ogg
-* flac -> wav -> mp3
-* ogg -> wav -> flac
-* ogg -> wav -> mp3
-* mp3 -> wav -> flac
-* mp3 -> wav -> ogg
+* flac
+* mp3
+* ogg (Vorbis)
+* wav
External packages required for full functionality:
-* id3v2_ (`id3v2`)
* lame_ (`lame`)
-* flac_ (`metaflac`)
+* flac_ (`flac`)
* mpg123_ (`mpg123`)
-* vorbis_ (`ogg123`, `oggenc`, `vorbiscomment`)
+* vorbis_ (`ogg123`, `oggenc`)
+* mutagen_ (metadata conversion)
-.. _id3v2: http://id3v2.sourceforge.net/
-.. _lame: http://lame.sourceforge.net
-.. _flac: http://flac.sourceforge.net
+.. _lame: http://lame.sourceforge.net/
+.. _flac: http://flac.sourceforge.net/
.. _mpg123: http://www.mpg123.org/
-.. _vorbis: http://www.vorbis.com
+.. _vorbis: http://www.vorbis.com/
+.. _mutagen: http://code.google.com/p/mutagen/
"""
from hashlib import sha256 as _hash
from subprocess import Popen, PIPE
from tempfile import mkstemp
+try:
+ import mutagen.flac
+ import mutagen.id3
+ import mutagen.mp3
+ import mutagen.oggvorbis
+except ImportError, _mutagen_import_error:
+ mutagen = None
-__version__ = '0.2'
+
+__version__ = '0.3'
def invoke(args, stdin=None, expect=(0,)):
- print ' %s' % args
+ print(' {}'.format(args))
p = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE)
stdout,stderr = p.communicate(stdin)
status = p.wait()
- assert status in expect, 'invalid status %d from %s' % (status, args)
+ assert status in expect, 'invalid status {} from {}'.format(status, args)
return (status, stdout, stderr)
The `get_` and `set_*_metadata` methods should pass metadata as a
`dict` with key/value pairs standardised to match the list of
- Vorbis comment suggestions_ with lowecase keys. The `date` field
- should be formatted `YYYY[-MM[-DD]]`.
+ Vorbis comment suggestions_ with lowercase keys. The `date` field
+ should be formatted `YYYY[-MM[-DD]]`. The dict values should be
+ lists to support repeated entries for a given tag.
.. _suggestions: http://www.xiph.org/vorbis/doc/v-comment.html
"""
+ id3_to_vorbis_keys = {
+ 'comm': 'comment',
+ 'talb': 'album',
+ 'tcom': 'composer',
+ 'tcop': 'copyright',
+ 'tit2': 'title',
+ 'tpe1': 'artist',
+ 'tpe2': 'accompaniment',
+ 'tpe3': 'conductor',
+ 'tpos': 'part of set',
+ 'tpub': 'organization', # publisher
+ 'trck': 'tracknumber',
+ 'tyer': 'date',
+ }
+
def __init__(self, source_dir, target_dir, target_extension='ogg',
cache_file=None, hash=True, ignore_function=None):
self.source_dir = source_dir
assert line.startswith('# mkogg cache version:'), line
version = line.split(':', 1)[-1].strip()
if version != __version__:
- print 'cache version mismatch: %s != %s' % (
- version, __version__)
+ print('cache version mismatch: {} != {}'.format(
+ version, __version__))
return cache # old cache, ignore contents
for line in f:
try:
if self._cache_file == None:
return
with open(self._cache_file, 'w') as f:
- f.write('# mkogg cache version: %s\n' % __version__)
+ f.write('# mkogg cache version: {}\n'.format(__version__))
for key,value in self._cache.iteritems():
- f.write('%s -> %s\n' % (key, value))
+ f.write('{} -> {}\n'.format(key, value))
def run(self):
self._makedirs(self.target_dir)
if ext.startswith('.'):
ext = ext[1:]
if ext not in self._source_extensions:
- print 'skip', filename, ext
+ print('skip', filename, ext)
continue
source_path = os.path.join(dirpath, filename)
if (self._ignore_function is not None and
rel_path = os.path.relpath(dirpath, self.source_dir)
target_path = os.path.join(
self.target_dir, rel_path,
- '%s.%s' % (root, self._target_extension))
+ '{}.{}'.format(root, self._target_extension))
target_dir = os.path.dirname(target_path)
self._makedirs(target_dir)
self._convert(source_path, target_path, ext)
old_cache_value = self._cache.get(cache_key, None)
if (old_cache_value != None and
old_cache_value == self._cache_value(target)):
- print 'already cached %s to %s' % (source, target)
+ print('already cached {} to {}'.format(source, target))
return
elif os.path.exists(target):
- print 'target %s already exists' % (target)
+ print('target {} already exists'.format(target))
return
- print 'convert %s to %s' % (source, target)
+ print('convert {} to {}'.format(source, target))
if ext == self._target_extension:
shutil.copy(source, target)
return
- convert = getattr(self, 'convert_%s_to_%s'
- % (ext, self._target_extension))
+ convert = getattr(self, 'convert_{}_to_{}'.format(
+ ext, self._target_extension))
convert(source, target)
if not getattr(convert, 'handles_metadata', False):
- get_metadata = getattr(self, 'get_%s_metadata' % ext)
+ get_metadata = getattr(self, 'get_{}_metadata'.format(ext))
metadata = get_metadata(source)
- set_metadata = getattr(self, 'set_%s_metadata'
- % self._target_extension)
+ set_metadata = getattr(self, 'set_{}_metadata'.format(
+ self._target_extension))
set_metadata(target, metadata)
if not self._hash:
cache_key = self._cache_key(source)
return None
return str(h.hexdigest())
+ def _set_vorbis_comments(self, container, metadata):
+ container.delete()
+ if type(metadata) == dict:
+ items = sorted(metadata.items())
+ else:
+ items = metadata.items()
+ for key,value in items:
+ # leave key case alone, because Mutagen downcases Vorbis
+ # keys internally.
+ container[key] = value
+
def _parse_date(self, date):
"""Parse `date` (`YYYY[-MM[-DD]]`), returning `(year, month, day)`.
fields = fields + [None] * (3 - len(fields))
return fields
- def _parse_id3v2_comments(self, stdout):
- """Parse ID3v2 tags.
-
- Examples
- --------
- >>> from pprint import pprint
- >>> c = Converter(None, None)
- >>> metadata = c._parse_id3v2_comments('\\n'.join([
- ... 'id3v1 tag info for src/03-Drive_My_Car.mp3:',
- ... 'Title : The Famous Song Artist: No One You Know',
- ... 'Album : The Famous Album Year: 1965, Genre: Rock (17)',
- ... 'Comment: Track: 7',
- ... 'id3v2 tag info for src/03-Drive_My_Car.mp3:',
- ... 'TALB (Album/Movie/Show title): The Famous Album',
- ... 'TPE1 (Lead performer(s)/Soloist(s)): No One You Know',
- ... 'TT2 (Title/songname/content description): The Famous Song',
- ... 'TYER (Year): 1965',
- ... 'TCON (Content type): Rock (17)',
- ... 'TRCK (Track number/Position in set): 07/14']))
- >>> pprint(metadata) # doctest: +REPORT_UDIFF
- {'album': 'The Famous Album',
- 'artist': 'No One You Know',
- 'date': '1965',
- 'genre': 'Rock',
- 'title': 'The Famous Song',
- 'tracknumber': '07',
- 'tracktotal': '14'}
- >>> c.cleanup()
- """
- metadata = {}
- vorbis_keys = {
- 'comm': 'comment',
- 'talb': 'album',
- 'tcom': 'composer',
- 'tcon': 'genre',
- 'tcop': 'copyright',
- 'tit2': 'title',
- 'tpe1': 'artist',
- 'tpe2': 'accompaniment',
- 'tpe3': 'conductor',
- 'tpos': 'part of set',
- 'tpub': 'organization', # publisher
- 'trck': 'tracknumber',
- 'tyer': 'date',
- }
- drop_keys = [
- 'apic', # attached picture
- 'geob', # general encapsulated object
- 'ncon', # ?
- 'pcnt', # play counter (incremented with each play)
- 'pic', # attached picture
- 'priv', # private
- 'tbp', # beats per minute
- 'tco', # content type
- 'tcp', # frame?
- 'tenc', # encoded by
- 'tflt', # file type
- 'tope', # original artist (e.g. for a cover)
- 'tlen', # length (in milliseconds)
- 'tmed', # media type
- 'txxx', # user defined text information
- 'ufi', # unique file identifier
- 'uslt', # unsynchronized lyric/text transcription
- 'wcom', # commercial information
- 'woar', # official artist/performer webpage
- 'wxxx', # user defined URL
- ]
- key_translations = {
- 'com': 'comm',
- 'ten': 'tenc',
- 'tal': 'talb',
- 'tcm': 'tcom',
- 'tt2': 'tit2',
- 'tp1': 'tpe1',
- 'tpa': 'tpos',
- 'trk': 'trck',
- 'tye': 'tyer',
- }
- in_v2 = False
- for line in stdout.splitlines():
- if not in_v2:
- if line.startswith('id3v2 tag info'):
- in_v2 = True
- continue
- key,value = [x.strip() for x in line.split(':', 1)]
- if value.lower() == 'no id3v1 tag':
- continue
- short_key = key.split()[0].lower()
- short_key = key_translations.get(short_key, short_key)
- if short_key in drop_keys:
- continue
- v_key = vorbis_keys[short_key]
- if v_key == 'genre':
- value = value.rsplit('(', 1)[0].strip()
- elif v_key == 'tracknumber' and '/' in value:
- value,total = value.split('/')
- metadata['tracktotal'] = total
- metadata[v_key] = value
- return metadata
-
- def _parse_vorbis_comments(self, stdout):
- """Parse Vorbis comments.
-
- Examples
- --------
- >>> from pprint import pprint
- >>> c = Converter(None, None)
- >>> metadata = c._parse_vorbis_comments('\\n'.join([
- ... 'ARTIST=No One You Know',
- ... 'ALBUM=The Famous Album',
- ... 'TITLE=The Famous Song',
- ... 'DATE=1965',
- ... 'GENRE=Rock',
- ... 'TRACKNUMBER=07',
- ... 'TRACKTOTAL=14',
- ... 'CDDB=af08640e']))
- >>> pprint(metadata) # doctest: +REPORT_UDIFF
- {'album': 'The Famous Album',
- 'artist': 'No One You Know',
- 'cddb': 'af08640e',
- 'date': '1965',
- 'genre': 'Rock',
- 'title': 'The Famous Song',
- 'tracknumber': '07',
- 'tracktotal': '14'}
- >>> c.cleanup()
- """
- metadata = {}
- for line in stdout.splitlines():
- key,value = line.split('=', 1)
- metadata[key.lower()] = value
- return metadata
+ def _construct_id3_trck(self, metadata):
+ if 'tracknumber' not in metadata:
+ return (None, None)
+ if 'tracktotal' in metadata:
+ value = []
+ for i,v in enumerate(metadata['tracknumber']):
+ value.append(u'{}/{}'.format(
+ v, metadata['tracktotal'][i]))
+ else:
+ value = metadata['tracknumber']
+ key = 'tracknumber'
+ return (key, value)
+
+ def _guess_id3_encoding(self, text_list):
+ for id3_encoding,encoding in [(0, 'ISO-8859-1'), (3, 'utf-8')]:
+ encoding_success = True
+ for text in text_list:
+ if isinstance(text, unicode):
+ try:
+ text.encode(encoding)
+ except UnicodeEncodeError:
+ encoding_success == False
+ break
+ if encoding_success:
+ return id3_encoding
+ raise ValueError(text_list)
def convert_flac_to_mp3(self, source, target):
self.convert_flac_to_wav(source, self._tempfile)
def convert_mp3_to_wav(self, source, target):
invoke(['mpg123', '-w', target, source])
+ def convert_ogg_to_flac(self, source, target):
+ self.convert_ogg_to_wav(source, self._tempfile)
+ self.convert_wav_to_flac(self._tempfile, target)
+
def convert_ogg_to_mp3(self, source, target):
self.convert_flac_to_mp3(source, target)
def convert_ogg_to_wav(self, source, target):
- self.convert_flac_to_wav(source_target)
+ self.convert_flac_to_wav(source, target)
def convert_wav_to_flac(self, source, target):
- invoke(['flac', '-o', target, source])
+ invoke(['flac', '--force', '--output-name', target, source])
def convert_wav_to_mp3(self, source, target):
invoke(['lame', '--quiet', '-V', '4', source, target])
self.convert_flac_to_ogg(source, target)
def get_flac_metadata(self, source):
- status,stdout,stderr = invoke(
- ['metaflac', '--export-tags-to=-', source])
- metadata = {}
- for line in stdout.splitlines():
- key,value = line.split('=', 1)
- metadata[key.lower()] = value
- return metadata
-
- def get_flac_metadata(self, source):
- status,stdout,stderr = invoke(
- ['metaflac', '--export-tags-to=-', source])
- return self._parse_vorbis_comments(stdout)
+ if mutagen is None:
+ raise _mutagen_import_error
+ return mutagen.flac.FLAC(source)
def get_mp3_metadata(self, source):
- status,stdout,stderr = invoke(
- ['id3v2', '--list', source])
- return self._parse_id3v2_comments(stdout)
+ if mutagen is None:
+ raise _mutagen_import_error
+ mp3 = mutagen.mp3.MP3(source)
+ metadata = {}
+ for key,value in mp3.items():
+ try:
+ vorbis_key = self.id3_to_vorbis_keys[key.lower()]
+ except KeyError:
+ continue
+ v = value.text
+ if vorbis_key == 'tracknumber':
+ for i,v_entry in enumerate(v):
+ if u'/' in v_entry:
+ tracknumber,tracktotal = v_entry.split(u'/', 1)
+ v[i] = tracknumber
+ metadata['tracktotal'] = ['tracktotal']
+ metadata[vorbis_key] = v
+ return metadata
def get_ogg_metadata(self, source):
- status,stdout,stderr = invoke(
- ['vorbiscomment', '--list', source])
- return self._parse_vorbis_comments(stdout)
+ if mutagen is None:
+ raise _mutagen_import_error
+ return mutagen.oggvorbis.OggVorbis(source)
def get_wav_metadata(self, source):
return {}
def set_flac_metadata(self, target, metadata):
- stdin = '\n'.join(['%s=%s' % (k.upper(), v)
- for k,v in sorted(metadata.iteritems())])
- invoke(['metaflac', '--import-tags-from=-', target], stdin=stdin)
+ if mutagen is None:
+ raise _mutagen_import_error
+ flac = mutagen.flac.FLAC(target)
+ self._set_vorbis_comments(flac, metadata)
+ flac.save()
def set_mp3_metadata(self, target, metadata):
- args = ['id3v2']
- for key,arg in [('album', '--album'), ('artist', '--artist'),
- ('title', '--song')]:
- if key in metadata:
- args.extend([arg, metadata[key]])
- if 'date' in metadata:
- year,month,day = self._parse_date(metadata['date'])
- args.extend(['--year', year])
- if 'genre' in metadata:
- genre = metadata['genre']
- if not hasattr(self, '_id3v1_genres'):
- status,stdout,stderr = invoke(['id3v2', '--list-genres'])
- genres = {}
- for line in stdout.splitlines():
- num,name = [x.strip() for x in line.split(':', 1)]
- genres[name.lower()] = num
- self._id3v1_genres = genres
- # Genre 12 = "Other"
- num = self._id3v1_genres.get(genre.lower(), '12')
- args.extend(['--genre', num])
- if 'tracknumber' in metadata:
- track = metadata['tracknumber']
- if 'tracktotal' in metadata:
- track = '%s/%s' % (track, metadata['tracktotal'])
- args.extend(['--track', track])
- args.append(target)
- invoke(args)
+ vorbis_keys_to_id3 = dict(
+ (v,k) for k,v in self.id3_to_vorbis_keys.items())
+ if mutagen is None:
+ raise _mutagen_import_error
+ mp3 = mutagen.mp3.MP3(target)
+ if mp3.tags is not None:
+ mp3.tags.delete()
+ handled_trck = False
+ for key,value in metadata.items():
+ if key == 'date':
+ for i,v in enumerate(value):
+ year,month,day = self._parse_date(v)
+ value[i] = year
+ elif key in ['tracknumber', 'tracktotal']:
+ if handled_trck is True:
+ continue
+ handled_trck = True
+ key,value = self._construct_id3_trck(metadata)
+ if value is None:
+ continue
+ try:
+ frame_name = vorbis_keys_to_id3[key].upper()
+ except KeyError:
+ continue
+ frame = getattr(mutagen.id3, frame_name)
+ id3_encoding = self._guess_id3_encoding(value)
+ mp3[frame_name] = frame(encoding=id3_encoding, text=value)
+ if mp3.tags is None:
+ return
+ mp3.save(v1=2)
def set_ogg_metadata(self, target, metadata):
- stdin = '\n'.join(['%s=%s' % (k.upper(), v)
- for k,v in sorted(metadata.iteritems())])
- invoke(['vorbiscomment', '--write', target], stdin=stdin)
+ if mutagen is None:
+ raise _mutagen_import_error
+ ogg = mutagen.oggvorbis.OggVorbis(target)
+ self._set_vorbis_comments(ogg, metadata)
+ ogg.save()
def set_wav_metadata(self, target, metadata):
pass