Add metadata conversion to mkogg.py.
authorW. Trevor King <wking@drexel.edu>
Tue, 16 Nov 2010 16:10:21 +0000 (11:10 -0500)
committerW. Trevor King <wking@drexel.edu>
Tue, 16 Nov 2010 16:10:21 +0000 (11:10 -0500)
posts/mkogg/mkogg.py

index 76755c7fd74489037e739b0f7fda35dbdca2d1bb..f6a0d29d67b4887298532a531d80d5e9c10067f7 100755 (executable)
 # License along with this program.  If not, see
 # <http://www.gnu.org/licenses/>.
 
-"""Mirror a tree of mp3/ogg/flac files with ogg-vorbis versions
+"""Mirror a tree of mp3/ogg/flac files with Ogg Vorbis versions.
+
+Other target formats are also supported.  Current conversions:
+
+* flac -> ogg
+* flac -> wav -> mp3
+* ogg -> wav -> mp3
+* mp3 -> ogg
+
+External packages required for full functionality:
+
+* id3v2_ (`id3v2`)
+* lame_ (`lame`)
+* flac_ (`metaflac`)
+* mpg123_ (`mpg123`)
+* vorbis_ (`ogg123`, `oggenc`, `vorbiscomment`)
+
+.. _id3v2: http://id3v2.sourceforge.net/
+.. _lame: http://lame.sourceforge.net
+.. _flac: http://flac.sourceforge.net
+.. _mpg123: http://www.mpg123.org/
+.. _vorbis: http://www.vorbis.com
 """
 
 import shutil
@@ -26,11 +47,34 @@ import os
 import os.path
 
 
+def invoke(args, stdin=None, expect=(0,)):
+    print '  %s' % args
+    p = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE)
+    stdout,stderr = p.communicate(stdin)
+    status = p.wait()
+    assert status in expect, 'invalid status %d from %s' % (status, args)
+    return (status, stdout, stderr)
+
+
 class Converter (object):
+    """Recode audio files from `source_dir` to `target_dir`.
+
+    `target_extension` sets the target encoding.
+
+    Notes
+    -----
+
+    The `get_` and `set_*_metadata` methods should pass metadata as a
+    `dict` with key/value pairs standardised to match the list of
+    Vorbis comment suggestions_ with lowecase keys.  The `date` field
+    should be formatted `YYYY[-MM[-DD]]`.
+
+    .. _suggestions: http://www.xiph.org/vorbis/doc/v-comment.html
+    """
     def __init__(self, source_dir, target_dir, target_extension='ogg'):
         self.source_dir = source_dir
         self.target_dir = target_dir
-        self._source_extensions = ['flac', 'mp3', 'ogg', 'wav']
+        self._source_extensions = ['flac', 'mp3', 'ogg']
         self._target_extension = target_extension
         f,self._tempfile = mkstemp(prefix='mkogg-')
 
@@ -67,28 +111,136 @@ class Converter (object):
             shutil.copy(source, target)
         else:
             convert = getattr(self, 'convert_%s_to_%s'
-                                 % (ext, self._target_extension))
+                              % (ext, self._target_extension))
             convert(source, target)
+            if not getattr(convert, 'handles_metadata', False):
+                get_metadata = getattr(self, 'get_%s_metadata' % ext)
+                metadata = get_metadata(source)
+                set_metadata = getattr(self, 'set_%s_metadata'
+                                       % self._target_extension)
+                set_metadata(target, metadata)
+
+    def _parse_date(self, date):
+        """Parse `date` (`YYYY[-MM[-DD]]`), returning `(year, month, day)`.
+
+        Examples
+        --------
+        >>> c = Converter(None, None)
+        >>> c._parse_date('2010')
+        ['2010', None, None]
+        >>> c._parse_date('2010-11')
+        ['2010', '11', None]
+        >>> c._parse_date('2010-11-16')
+        ['2010', '11', '16']
+        """
+        fields = date.split('-')
+        assert len(fields) > 0 and len(fields) <= 3, date
+        fields = fields + [None] * (3 - len(fields))
+        return fields
+
+    def _parse_id3v2_comments(self, stdout):
+        """Parse ID3v2 tags.
+
+        Examples
+        --------
+        >>> from pprint import pprint
+        >>> c = Converter(None, None)
+        >>> metadata = c._parse_id3v2_comments('\\n'.join([
+        ...     'id3v1 tag info for src/03-Drive_My_Car.mp3:',
+        ...     'Title  : The Famous Song     Artist: No One You Know',
+        ...     'Album  : The Famous Album    Year: 1965, Genre: Rock (17)',
+        ...     'Comment:                     Track: 7',
+        ...     'id3v2 tag info for src/03-Drive_My_Car.mp3:',
+        ...     'TALB (Album/Movie/Show title): The Famous Album',
+        ...     'TPE1 (Lead performer(s)/Soloist(s)): No One You Know',
+        ...     'TIT2 (Title/songname/content description): The Famous Song',
+        ...     'TYER (Year): 1965',
+        ...     'TCON (Content type): Rock (17)',
+        ...     'TRCK (Track number/Position in set): 07/14']))
+        >>> pprint(metadata)  # doctest: +REPORT_UDIFF
+        {'album': 'The Famous Album',
+         'artist': 'No One You Know',
+         'date': '1965',
+         'genre': 'Rock',
+         'title': 'The Famous Song',
+         'tracknumber': '07',
+         'tracktotal': '14'}
+        """
+        metadata = {}
+        vorbis_keys = {
+            'talb': 'album',
+            'tpe1': 'artist',
+            'tit2': 'title',
+            'trck': 'tracknumber',
+            'tyer': 'date',
+            'tcon': 'genre',
+            }
+        in_v2 = False
+        for line in stdout.splitlines():
+            if not in_v2:
+                if line.startswith('id3v2 tag info'):
+                    in_v2 = True
+                continue
+            key,value = [x.strip() for x in line.split(':', 1)]
+            short_key = key.split()[0]
+            v_key = vorbis_keys[short_key.lower()]
+            if v_key == 'genre':
+                value = value.rsplit('(', 1)[0].strip()
+            elif v_key == 'tracknumber' and '/' in value:
+                value,total = value.split('/')
+                metadata['tracktotal'] = total
+            metadata[v_key] = value
+        return metadata
+
+    def _parse_vorbis_comments(self, stdout):
+        """Parse Vorbis comments.
+
+        Examples
+        --------
+        >>> from pprint import pprint
+        >>> c = Converter(None, None)
+        >>> metadata = c._parse_vorbis_comments('\\n'.join([
+        ...     'ARTIST=No One You Know',
+        ...     'ALBUM=The Famous Album',
+        ...     'TITLE=The Famous Song',
+        ...     'DATE=1965',
+        ...     'GENRE=Rock',
+        ...     'TRACKNUMBER=07',
+        ...     'TRACKTOTAL=14',
+        ...     'CDDB=af08640e']))
+        >>> pprint(metadata)  # doctest: +REPORT_UDIFF
+        {'album': 'The Famous Album',
+         'artist': 'No One You Know',
+         'cddb': 'af08640e',
+         'date': '1965',
+         'genre': 'Rock',
+         'title': 'The Famous Song',
+         'tracknumber': '07',
+         'tracktotal': '14'}
+        """
+        metadata = {}
+        for line in stdout.splitlines():
+            key,value = line.split('=', 1)
+            metadata[key.lower()] = value
+        return metadata
 
     def convert_flac_to_mp3(self, source, target):
         self.convert_flac_to_wav(source, self._tempfile)
         self.convert_wav_to_mp3(self._tempfile, target)
 
     def convert_flac_to_wav(self, source, target):
-        p = Popen(['ogg123', '--quiet', '-d', 'wav', '-f', target, source])
-        p.wait()        
+        invoke(['ogg123', '-d', 'wav', '-f', target, source])
 
     def convert_flac_to_ogg(self, source, target):
-        p = Popen(['oggenc', '-q', '3', source, '-o', target])
-        p.wait()
+        invoke(['oggenc', '--quiet', '-q', '3', source, '-o', target])
+    convert_flac_to_ogg.handles_metadata = True
 
     def convert_mp3_to_ogg(self, source, target):
         self.convert_mp3_to_wav(source, self._tempfile)
         self.convert_wav_to_ogg(self._tempfile, target)
 
     def convert_mp3_to_wav(self, source, target):
-        p = Popen(['mpg123', source, '-w', target])
-        p.wait()
+        invoke(['mpg123',  '-w', target, source])
 
     def convert_ogg_to_mp3(self, source, target):
         self.convert_flac_to_mp3(source, target)
@@ -97,27 +249,99 @@ class Converter (object):
         self.convert_flac_to_wav(source_target)
 
     def convert_wav_to_mp3(self, source, target):
-        p = Popen(['lame', '--silent', '-V', '4', source, target])
-        p.wait()
+        invoke(['lame', '--quiet', '-V', '4', source, target])
 
     def convert_wav_to_ogg(self, source, target):
         self.convert_flac_to_ogg(source, target)
 
+    def get_flac_metadata(self, source):
+        status,stdout,stderr = invoke(
+            ['metaflac', '--export-tags-to=-', source])
+        metadata = {}
+        for line in stdout.splitlines():
+            key,value = line.split('=', 1)
+            metadata[key.lower()] = value
+        return metadata
+
+    def get_flac_metadata(self, source):
+        status,stdout,stderr = invoke(
+            ['metaflac', '--export-tags-to=-', source])
+        return self._parse_vorbis_comments(stdout)
+
+    def get_mp3_metadata(self, source):
+        status,stdout,stderr = invoke(
+            ['id3v2', '--list', source])
+        return self._parse_id3v2_comments(stdout)
+
+    def get_ogg_metadata(self, source):
+        status,stdout,stderr = invoke(
+            ['vorbiscomment', '--list', source])
+        return self._parse_vorbis_comments(stdout)
+
+    def set_mp3_metadata(self, target, metadata):
+        args = ['id3v2']
+        for key,arg in [('album', '--album'), ('artist', '--artist'),
+                        ('title', '--song')]:
+            if key in metadata:
+                args.extend([arg, metadata[key]])
+        if 'date' in metadata:
+            year,month,day = self._parse_date(metadata['date'])
+            args.extend(['--year', year])
+        if 'genre' in metadata:
+            genre = metadata['genre']
+            if not hasattr(self, '_id3v1_genres'):
+                status,stdout,stderr = invoke(['id3v2', '--list-genres'])
+                genres = {}
+                for line in stdout.splitlines():
+                    num,name = [x.strip() for x in line.split(':', 1)]
+                    genres[name.lower()] = num
+                self._id3v1_genres = genres
+            num = self._id3v1_genres[genre.lower()]
+            args.extend(['--genre', num])
+        if 'tracknumber' in metadata:
+            track = metadata['tracknumber']
+            if 'tracktotal' in metadata:
+                track = '%s/%s' % (track, metadata['tracktotal'])
+            args.extend(['--track', track])
+        args.append(target)
+        invoke(args)
+
+    def set_ogg_metadata(self, target, metadata):
+        stdin = '\n'.join(['%s=%s' % (k.upper(), v)
+                           for k,v in metadata.iteritems()])
+        invoke(['vorbiscomment', '--write', target], stdin=stdin)
+        return self._parse_vorbis_comments(stdin)
+
+
+def test():
+    import doctest
+    results = doctest.testmod()
+    return results.failed % 127
+
 
 if __name__ == '__main__':
     import optparse
+    import sys
 
-    p = optparse.OptionParser('%prog [options] source-dir target-dir')
+    usage = '%prog [options] source-dir target-dir'
+    epilog = __doc__
+    p = optparse.OptionParser(usage=usage, epilog=epilog)
+    p.format_epilog = lambda formatter: epilog+'\n'
     p.add_option('-t', '--target-extension', dest='ext',
                  default='ogg', metavar='EXT',
                  help='Conversion target type (e.g. flac, mp3) (%default)')
+    p.add_option('--test', dest='test', action='store_true', default=False,
+                 help='Run internal tests and exit')
 
     options,args = p.parse_args()
     source_dir,target_dir = args
 
+    if options.test:
+        sys.exit(test())
+
     c = Converter(source_dir, target_dir, target_extension=options.ext)
     try:
         c.run()
     finally:
-        #c.cleanup()
+        c.cleanup()
         pass