mkogg.py: Fix 'self.get_mp4_metadata(self, source)'
[blog.git] / posts / mkogg / mkogg.py
index a9bfdc6554539c5600383352bc3e9bbcc7397de6..9d423b689de5ba0e47e6c4ab4484103d285f55a5 100755 (executable)
@@ -1,6 +1,6 @@
 #!/usr/bin/env python
 #
-# Copyright (C) 2009-2012 W. Trevor King <wking@drexel.edu>
+# Copyright (C) 2009-2015 W. Trevor King <wking@drexel.edu>
 #
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU Lesser General Public License as
@@ -21,7 +21,7 @@
 Conversion between any of the following formats are supported:
 
 * flac
-* m4a (decoding only)
+* mp4 (decoding only)
 * mp3
 * ogg (Vorbis)
 * wav
@@ -44,29 +44,32 @@ External packages required for full functionality:
 """
 
 from hashlib import sha256 as _hash
-import os
-import os.path
+import os as _os
 import re as _re
-import shutil
-from subprocess import Popen, PIPE
-from tempfile import mkstemp
+import shutil as _shutil
+import subprocess as _subprocess
+import tempfile as _tempfile
 
 try:
-    import mutagen.flac
-    import mutagen.id3
-    import mutagen.m4a
-    import mutagen.mp3
-    import mutagen.oggvorbis
-except ImportError, _mutagen_import_error:
-    mutagen = None
+    import mutagen as _mutagen
+    import mutagen.flac as _mutagen_flac
+    import mutagen.id3 as _mutagen_id3
+    import mutagen.mp4 as _mutagen_mp4
+    import mutagen.mp3 as _mutagen_mp3
+    import mutagen.oggvorbis as _mutagen_oggvorbis
+except ImportError as error:
+    _mutagen = None
+    _mutagen_import_error = error
 
 
-__version__ = '0.4'
+__version__ = '0.5'
 
 
 def invoke(args, stdin=None, expect=(0,)):
     print('  {}'.format(args))
-    p = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE)
+    p = _subprocess.Popen(
+        args, stdin=_subprocess.PIPE, stdout=_subprocess.PIPE,
+        stderr=_subprocess.PIPE)
     stdout,stderr = p.communicate(stdin)
     status = p.wait()
     assert status in expect, 'invalid status {} from {}'.format(status, args)
@@ -104,7 +107,7 @@ class Converter (object):
         'tyer': 'date',
         }
 
-    m4a_to_vorbis_keys = {
+    mp4_to_vorbis_keys = {
         '\xa9cmt': 'comment',
         '\xa9alb': 'album',
         #'tcom': 'composer',
@@ -123,16 +126,16 @@ class Converter (object):
                  cache_file=None, hash=True, ignore_function=None):
         self.source_dir = source_dir
         self.target_dir = target_dir
-        self._source_extensions = ['flac', 'm4a', 'mp3', 'ogg', 'wav']
+        self._source_extensions = ['flac', 'm4a', 'mp3', 'mp4', 'ogg', 'wav']
         self._target_extension = target_extension
         self._cache_file = cache_file
         self._cache = self._read_cache()
         self._hash = hash
         self._ignore_function = ignore_function
-        f,self._tempfile = mkstemp(prefix='mkogg-')
+        f,self._tempfile = _tempfile.mkstemp(prefix='mkogg-')
 
     def cleanup(self):
-        os.remove(self._tempfile)
+        _os.remove(self._tempfile)
         self._save_cache()
 
     def _read_cache(self):
@@ -163,35 +166,35 @@ class Converter (object):
             return
         with open(self._cache_file, 'w') as f:
             f.write('# mkogg cache version: {}\n'.format(__version__))
-            for key,value in self._cache.iteritems():
+            for key,value in self._cache.items():
                 f.write('{} -> {}\n'.format(key, value))
 
     def run(self):
         self._makedirs(self.target_dir)
-        for dirpath,dirnames,filenames in os.walk(self.source_dir):
+        for dirpath,dirnames,filenames in _os.walk(self.source_dir):
             for filename in filenames:
-                root,ext = os.path.splitext(filename)
+                root,ext = _os.path.splitext(filename)
                 ext = ext.lower()
                 if ext.startswith('.'):
                     ext = ext[1:]
                 if ext not in self._source_extensions:
                     print('skip', filename, ext)
                     continue
-                source_path = os.path.join(dirpath, filename)
+                source_path = _os.path.join(dirpath, filename)
                 if (self._ignore_function is not None and
                     self._ignore_function(source_path)):
                     continue
-                rel_path = os.path.relpath(dirpath, self.source_dir)
-                target_path = os.path.join(
+                rel_path = _os.path.relpath(dirpath, self.source_dir)
+                target_path = _os.path.join(
                     self.target_dir, rel_path,
                     '{}.{}'.format(root, self._target_extension))
-                target_dir = os.path.dirname(target_path)
+                target_dir = _os.path.dirname(target_path)
                 self._makedirs(target_dir)
                 self._convert(source_path, target_path, ext)
 
     def _makedirs(self, target_dir):
-        if not os.path.exists(target_dir):
-            os.makedirs(target_dir)
+        if not _os.path.exists(target_dir):
+            _os.makedirs(target_dir)
 
     def _convert(self, source, target, ext):
         if self._hash:
@@ -201,12 +204,12 @@ class Converter (object):
                 old_cache_value == self._cache_value(target)):
                 print('already cached {} to {}'.format(source, target))
                 return
-        elif os.path.exists(target):
+        elif _os.path.exists(target):
             print('target {} already exists'.format(target))
             return
         print('convert {} to {}'.format(source, target))
         if ext == self._target_extension:
-            shutil.copy(source, target)
+            _shutil.copy(source, target)
             return
         try:
             convert = getattr(self, 'convert_{}_to_{}'.format(
@@ -295,7 +298,7 @@ class Converter (object):
         if 'tracktotal' in metadata:
             value = []
             for i,v in enumerate(metadata['tracknumber']):
-                value.append(u'{}/{}'.format(
+                value.append('{}/{}'.format(
                         v, metadata['tracktotal'][i]))
         else:
             value = metadata['tracknumber']
@@ -306,7 +309,7 @@ class Converter (object):
         for id3_encoding,encoding in [(0, 'ISO-8859-1'), (3, 'utf-8')]:
             encoding_success = True
             for text in text_list:
-                if isinstance(text, unicode):
+                if isinstance(text, str):
                     try:
                         text.encode(encoding)
                     except UnicodeEncodeError:
@@ -329,6 +332,9 @@ class Converter (object):
     def convert_mp3_to_wav(self, source, target):
         invoke(['mpg123',  '-w', target, source])
 
+    def convert_mp4_to_wav(self, source, target):
+        invoke(['faad', '-o', target, source])
+
     def convert_ogg_to_wav(self, source, target):
         self.convert_flac_to_wav(source, target)
 
@@ -342,18 +348,41 @@ class Converter (object):
         self.convert_flac_to_ogg(source, target)
 
     def get_flac_metadata(self, source):
-        if mutagen is None:
+        if _mutagen is None:
             raise _mutagen_import_error
-        return mutagen.flac.FLAC(source)
+        return _mutagen_flac.FLAC(source)
 
     def get_m4a_metadata(self, source):
-        if mutagen is None:
+        return self.get_mp4_metadata(source)
+
+    def get_mp3_metadata(self, source):
+        if _mutagen is None:
+            raise _mutagen_import_error
+        mp3 = _mutagen_mp3.MP3(source)
+        metadata = {}
+        for key,value in mp3.items():
+            try:
+                vorbis_key = self.id3_to_vorbis_keys[key.lower()]
+            except KeyError:
+                continue
+            v = value.text
+            if vorbis_key == 'tracknumber':
+                for i,v_entry in enumerate(v):
+                    if '/' in v_entry:
+                        tracknumber,tracktotal = v_entry.split('/', 1)
+                        v[i] = tracknumber
+                        metadata['tracktotal'] = ['tracktotal']
+            metadata[vorbis_key] = v
+        return metadata
+
+    def get_mp4_metadata(self, source):
+        if _mutagen is None:
             raise _mutagen_import_error
-        m4a = mutagen.m4a.M4A(source)
+        mp4 = _mutagen_mp4.MP4(source)
         metadata = {}
-        for key,value in m4a.items():
+        for key,value in mp4.items():
             try:
-                vorbis_key = self.m4a_to_vorbis_keys[key.lower()]
+                vorbis_key = self.mp4_to_vorbis_keys[key.lower()]
             except KeyError:
                 continue
             if vorbis_key == 'tracknumber':
@@ -366,50 +395,33 @@ class Converter (object):
                 value = disknumber
                 if disktotal:
                     metadata['set total'] = [str(disktotal)]
-            metadata[vorbis_key] = [str(value)]
-        return metadata
-
-    def get_mp3_metadata(self, source):
-        if mutagen is None:
-            raise _mutagen_import_error
-        mp3 = mutagen.mp3.MP3(source)
-        metadata = {}
-        for key,value in mp3.items():
             try:
-                vorbis_key = self.id3_to_vorbis_keys[key.lower()]
-            except KeyError:
-                continue
-            v = value.text
-            if vorbis_key == 'tracknumber':
-                for i,v_entry in enumerate(v):
-                    if u'/' in v_entry:
-                        tracknumber,tracktotal = v_entry.split(u'/', 1)
-                        v[i] = tracknumber
-                        metadata['tracktotal'] = ['tracktotal']
-            metadata[vorbis_key] = v
+                metadata[vorbis_key] = [str(value)]
+            except UnicodeEncodeError:
+                metadata[vorbis_key] = [value]
         return metadata
 
     def get_ogg_metadata(self, source):
-        if mutagen is None:
+        if _mutagen is None:
             raise _mutagen_import_error        
-        return mutagen.oggvorbis.OggVorbis(source)
+        return _mutagen_oggvorbis.OggVorbis(source)
 
     def get_wav_metadata(self, source):
         return {}
 
     def set_flac_metadata(self, target, metadata):
-        if mutagen is None:
+        if _mutagen is None:
             raise _mutagen_import_error
-        flac = mutagen.flac.FLAC(target)
+        flac = _mutagen_flac.FLAC(target)
         self._set_vorbis_comments(flac, metadata)
         flac.save()
 
     def set_mp3_metadata(self, target, metadata):
         vorbis_keys_to_id3 = dict(
             (v,k) for k,v in self.id3_to_vorbis_keys.items())
-        if mutagen is None:
+        if _mutagen is None:
             raise _mutagen_import_error
-        mp3 = mutagen.mp3.MP3(target)
+        mp3 = _mutagen_mp3.MP3(target)
         if mp3.tags is not None:
             mp3.tags.delete()
         handled_trck = False
@@ -430,7 +442,7 @@ class Converter (object):
                 frame_name = vorbis_keys_to_id3[key].upper()
             except KeyError:
                 continue
-            frame = getattr(mutagen.id3, frame_name)
+            frame = getattr(_mutagen_id3, frame_name)
             id3_encoding = self._guess_id3_encoding(value)
             max_encoding = max(max_encoding, id3_encoding)
             mp3[frame_name] = frame(encoding=id3_encoding, text=value)
@@ -443,9 +455,9 @@ class Converter (object):
         mp3.save(v1=v1)
 
     def set_ogg_metadata(self, target, metadata):
-        if mutagen is None:
+        if _mutagen is None:
             raise _mutagen_import_error
-        ogg = mutagen.oggvorbis.OggVorbis(target)
+        ogg = _mutagen_oggvorbis.OggVorbis(target)
         self._set_vorbis_comments(ogg, metadata)
         ogg.save()
 
@@ -460,43 +472,62 @@ def test():
 
 
 if __name__ == '__main__':
-    import optparse
+    import argparse
     import sys
 
-    usage = '%prog [options] source-dir target-dir'
-    epilog = __doc__
-    p = optparse.OptionParser(usage=usage, epilog=epilog)
-    p.format_epilog = lambda formatter: epilog+'\n'
-    p.add_option('-t', '--target-extension', dest='ext',
-                 default='ogg', metavar='EXT',
-                 help='Conversion target type (e.g. flac, mp3) (%default)')
-    p.add_option('-c', '--cache', dest='cache', metavar='PATH',
-                 help=('Save conversion hashes in a cache file to avoid '
-                       'repeated previous conversions.'))
-    p.add_option('-n', '--no-hash', dest='hash', action='store_false',
-                 default=True,
-                 help=("Don't hash files.  Just assume matching names would "
-                       'have matching hashes.'))
-    p.add_option('-i', '--ignore', dest='ignore', metavar='REGEXP',
-                 help=('Ignore source paths matching REGEXP.'))
-    p.add_option('--test', dest='test', action='store_true', default=False,
-                 help='Run internal tests and exit')
-
-    options,args = p.parse_args()
-
-    if options.test:
+    class Formatter (argparse.RawDescriptionHelpFormatter,
+                     argparse.ArgumentDefaultsHelpFormatter):
+        pass
+
+    p = argparse.ArgumentParser(
+        description=__doc__.splitlines()[0],
+        epilog='\n'.join(__doc__.splitlines()[2:]),
+        formatter_class=Formatter)
+    p.add_argument(
+        '-v', '--version', action='version',
+        version='%(prog)s {}'.format(__version__))
+    p.add_argument(
+        '-t', '--target-extension', dest='ext', metavar='EXT',
+        default='ogg', choices=['flac', 'mp3', 'ogg', 'wav'],
+        help='Conversion target type')
+    p.add_argument(
+        '-c', '--cache', dest='cache', metavar='PATH',
+        help=('Save conversion hashes in a cache file to avoid '
+              'repeated previous conversions.'))
+    p.add_argument(
+        '-n', '--no-hash', dest='hash',
+        default=True, action='store_const', const=False,
+        help=("Don't hash files.  Just assume matching names would "
+              'have matching hashes.'))
+    p.add_argument(
+        '-i', '--ignore', dest='ignore', metavar='REGEXP',
+        help='Ignore source paths matching REGEXP.')
+    p.add_argument(
+        '--test', dest='test',
+        default=False, action='store_const', const=True,
+        help='Run internal tests and exit')
+    p.add_argument(
+        'source_dir', metavar='SOURCE', default='.',
+        help='Source directory')
+    p.add_argument(
+        'target_dir', metavar='TARGET', default='.',
+        help='Target directory')
+
+    args = p.parse_args()
+
+    if args.test:
         sys.exit(test())
 
-    if options.ignore is not None:
-        ignore_regexp = _re.compile(options.ignore)
+    if args.ignore:
+        ignore_regexp = _re.compile(args.ignore)
         ignore_function = ignore_regexp.match
     else:
         ignore_function = None
 
-    source_dir,target_dir = args
-    c = Converter(source_dir, target_dir, target_extension=options.ext,
-                  cache_file=options.cache, hash=options.hash,
-                  ignore_function=ignore_function)
+    c = Converter(
+        args.source_dir, args.target_dir, target_extension=args.ext,
+        cache_file=args.cache, hash=args.hash,
+        ignore_function=ignore_function)
     try:
         c.run()
     finally: