With::
- pics$ gallery.py some_directory another_directory
+ pics$ gallery.py
Note that you can store a caption for ``<PICTURE>`` as plain text in
``<PICTURE>.txt``.
import logging as _logging
import logging.handlers as _logging_handlers
import math as _math
+import mimetypes as _mimetypes
import os as _os
import os.path as _os_path
import random as _random
IMAGE_EXTENSIONS = ['.jpg', '.jpeg', '.tif', '.tiff', '.png', '.gif']
VIDEO_EXTENSIONS = ['.mov', '.mp4', '.ogv']
+STREAMING_TYPES = ['video/ogg']
RESPONSES = { # httplib takes half a second to load
200: 'OK',
404: 'Not Found',
def __init__(self, command, status, stdout=None, stderr=None):
strerror = ['Command failed (%d):\n %s\n' % (status, stderr),
'while executing\n %s' % str(command)]
- Exception.__init__(self, '\n'.join(strerror))
+ super(CommandError, self).__init__('\n'.join(strerror))
self.command = command
self.status = status
self.stdout = stdout
self.stderr = stderr
+class HTTPError(Exception):
+ def __init__(self, status, message=None, content=None):
+ if message is None:
+ message = RESPONSES[status]
+ super(HTTPError, self).__init__('{} {}'.format(status, message))
+ self.status = status
+ self.message = message
+ self.content = content
+
+
class ProcessingComplete(Exception):
pass
raise CommandError(args, status, stdout, stderr)
return status, stdout, stderr
-def is_picture(filename):
+def is_image(filename):
+ for extension in IMAGE_EXTENSIONS:
+ if filename.lower().endswith(extension):
+ return True
+ return False
+
+def is_video(filename):
for extension in IMAGE_EXTENSIONS:
if filename.lower().endswith(extension):
return True
class CGIGalleryServer (object):
- def __init__(self, base_path='/var/www/localhost/htdocs/gallery/',
- base_url='/cgi-bin/gallery.py',
- cache_path='/tmp/gallery-cache/'):
+ def __init__(self, base_path='.',
+ base_url='/',
+ cache_path='/tmp/gallery-cache/',
+ serve_originals=True):
self._base_path = _os_path.abspath(base_path)
self._base_url = base_url
self._cache_path = cache_path
- self._url_regexp = _re.compile('^[a-z0-9._/-]*$')
+ self._serve_originals = serve_originals
+ self._url_regexp = _re.compile('^[a-zA-Z0-9._/-]*$')
self._rows = 3
self._columns = 3
self.header = []
content = RESPONSES[status]
self._response(header=header, content=content, stream=stream)
- def validate_url(self, url, stream=None):
- LOG.debug('validating {}'.format(repr(url)))
+ def validate_url(self, url, exists=True, directory=False):
+ LOG.debug('validating {} (exists={}, directory={})'.format(
+ repr(url), exists, directory))
if url is None:
return
elif (not self._url_regexp.match(url) or
'..' in url
):
LOG.error('invalid url')
- self._error(404, stream=stream)
- path = _os_path.join(self._base_path, url)
- if _os_path.exists(path) and not _os_path.isdir(path):
- LOG.error('nonexistent directory')
- self._error(404, stream=stream)
+ raise HTTPError(404)
+ if exists:
+ path = _os_path.join(self._base_path, url)
+ if directory:
+ if not _os_path.isdir(path):
+ LOG.error('nonexistent directory')
+ raise HTTPError(404)
+ else:
+ if not _os_path.isfile(path):
+ raise HTTPError(404, 'nonexistent file')
def serve(self, url=None, page=0, stream=None):
LOG.info('serving url {} (page {})'.format(url, page))
try:
- if url is None:
- self.index(stream=stream)
- elif url.endswith('random'):
- self.random(
- url=url, stream=stream, max_width=500, max_height=500)
- elif self.is_cached(url=url):
- self.cached(url=url, stream=stream)
- elif url.endswith('.png'):
- self._thumb(url=url, stream=stream)
- else:
- self.validate_url(url=url, stream=stream)
- self.page(url=url, page=page, stream=stream)
- LOG.error('unexpected url type')
- self._error(404, stream=stream)
+ try:
+ if url is None:
+ self.index(stream=stream)
+ elif url.endswith('random'):
+ self.random(
+ url=url, stream=stream, max_width=500, max_height=500)
+ elif self.is_cacheable(url=url):
+ self.validate_url(url=url, exists=False)
+ self.cached(url=url, stream=stream)
+ else:
+ self.validate_url(url=url, exists=False, directory=True)
+ self.page(url=url, page=page, stream=stream)
+ raise HTTPError(404, 'unexpected URL type')
+ except HTTPError as e:
+ LOG.error(e.message)
+ self._error(e.status, content=e.content, stream=stream)
except ProcessingComplete:
pass
- def relative_url(self, url, stream=None):
+ def relative_url(self, url):
if url is None:
return url
if not url.startswith(self._base_url):
- LOG.error('cannot convert {} to a relative URL of {}'.format(
- url, self._base_url))
- return self._error(404, stream=stream)
+ message = 'cannot convert {} to a relative URL of {}'.format(
+ url, self._base_url)
+ raise HTTPError(404, message)
if url == self._base_url:
return None
return url[len(self._base_url):]
if p.startswith('.') or p.endswith('~'):
continue
picture_path = _os_path.join(path, p)
- if is_picture(picture_path):
+ if is_image(picture_path):
yield picture_path
def index(self, stream=None):
LOG.debug('index page')
return self._directory(self._base_path, stream=stream)
- def _thumb(self, image, max_width=None, max_height=None, stream=None):
+ def _original_url(self, url):
+ """Reverse thumbnail URL mapping
+
+ Returns (original_url, generating_callback, callback_kwargs).
+ """
+ base,extension = _os_path.splitext(url)
+ if extension in ['.png']:
+ try:
+ root,width,height = base.rsplit('-', 2)
+ except ValueError:
+ raise HTTPError(404, 'missing width/height in {}'.format(base))
+ try:
+ width = int(width)
+ height = int(height)
+ except ValueError as e:
+ raise HTTPError(404, 'invalid width/height: {}'.format(e))
+ return (
+ root + '.jpg',
+ self._thumb,
+ {'max_width': width,
+ 'max_height': height},
+ )
+ elif extension in VIDEO_EXTENSIONS:
+ return (
+ base + '.mov',
+ getattr(self, '_{}'.format(extension), None),
+ {},
+ )
+ raise HTTPError(404, 'no original URL for {}'.format(url))
+
+ def _thumb(self, image, max_width=None, max_height=None):
if not _os_path.exists(self._cache_path):
_os.makedirs(self._cache_path)
dirname,filename = _os_path.split(image)
thumb_path = _os_path.join(cache_dir, thumb_filename)
image_path = _os_path.join(self._base_path, image)
if not _os_path.isfile(image_path):
- LOG.error('image path for thumbnail does not exist')
- return self._error(404, stream=stream)
+ raise HTTPError(404, 'image path for thumbnail does not exist')
if (not _os_path.isfile(thumb_path)
or _os_path.getmtime(image_path) > _os_path.getmtime(thumb_path)):
invoke(['convert', '-format', 'png', '-strip', '-quality', '95',
image_path,
'-thumbnail', '{:d}x{:d}'.format(max_width, max_height),
thumb_path])
- return thumb_url
+ return (thumb_path, self._url(thumb_url))
- def _mp4(self, video, stream=None):
+ def _mp4(self, video):
if not video.endswith('.mov'):
- LOG.error("can't translate {} to MPEGv4".format(video))
+ raise HTTPError(404, "can't translate {} to MPEGv4".format(video))
dirname,filename = _os_path.split(video)
mp4_filename = image_base(filename) + '.mp4'
reldir = _os_path.relpath(dirname, self._base_path)
mp4_url = _os_path.join(dirname, mp4_filename)
mp4_path = _os_path.join(cache_dir, mp4_filename)
if not _os_path.isfile(video):
- LOG.error('source video path does not exist')
- return self._error(404, stream=stream)
+ raise HTTPError(404, 'source video path does not exist')
if (not _os_path.isfile(mp4_path)
or _os_path.getmtime(video) > _os_path.getmtime(mp4_path)):
arg = ['ffmpeg', '-i', video, '-acodec', 'libfaac', '-aq', '200',
arg.extend(args)
arg.append(mp4_path)
invoke(arg)
- return self._url(mp4_url)
+ return (mp4_path, self._url(mp4_url))
def _ogv(self, video, stream=None):
if not video.endswith('.mov'):
ogv_path = _os_path.join(cache_dir, ogv_filename)
if not _os_path.isfile(video):
LOG.error('source video path does not exist')
- return self._error(404, stream=stream)
if (not _os_path.isfile(ogv_path)
or _os_path.getmtime(video) > _os_path.getmtime(ogv_path)):
arg = ['ffmpeg2theora', '--optimize']
arg.extend(args)
arg.extend(['--output', ogv_path, video])
invoke(arg)
- return self._url(ogv_url)
+ return (ogv_path, self._url(ogv_url))
def _get_image_caption(self, path):
caption_path = path + '.txt'
except IOError:
return None
- def _get_image_video(self, path, fallback=None, stream=None):
+ def _get_image_video(self, path, fallback=None):
base_path = image_base(path)
for extension in VIDEO_EXTENSIONS:
video_path = base_path + extension
if _os_path.isfile(video_path):
- return self._video(
- video_path, fallback=fallback, stream=stream)
+ return self._video(video_path, fallback=fallback)
return None
- def _captioned_video(self, path, href=None, stream=None):
- img = self._image(path, max_width=640, max_height=480, stream=stream)
+ def _captioned_video(self, path, href=None):
+ img = self._image(path, max_width=640, max_height=480)
caption = self._get_image_caption(path)
- video = self._get_image_video(path, fallback=[img], stream=stream)
+ video = self._get_image_video(path, fallback=[img])
content = []
if video:
content.extend(video)
content.append('<p>{}</p>'.format(caption))
return content
- def _video(self, video, fallback=None, stream=None, **kwargs):
+ def _video(self, video, fallback=None, **kwargs):
if fallback is None:
fallback = [
'<p>Your browser does not support the <video> tag, try',
'</p>',
]
fallback = [' '+line for line in fallback]
- ogv = self._ogv(video, stream=stream)
- mp4 = self._mp4(video, stream=stream)
+ ogv_path,ogv_url = self._ogv(video)
+ mp4_path,mp4_url = self._mp4(video)
return [
'<p>',
(' <video preloads="none" controls="controls" '
'width="640" height="480">'),
- ' <source src="{}"'.format(mp4),
+ ' <source src="{}"'.format(mp4_url),
(''' type='video/mp4; '''
'''codecs="avc1.42E01E, mp4a.40.2"' />'''),
- ' <source src="{}"'.format(ogv),
+ ' <source src="{}"'.format(ogv_url),
''' type='video/ogg; codecs="theora,vorbis"' />''',
] + fallback + [
' </video>',
'</p>',
'<p>Download as',
- ' <a href="{}">Ogg/Theora/Vorbis</a> or'.format(ogv),
+ ' <a href="{}">Ogg/Theora/Vorbis</a> or'.format(ogv_url),
(' <a href="{}">Mpeg4/H.264(ConstrainedBaselineProfile)/AAC</a>.'
- ).format(mp4),
+ ).format(mp4_url),
'<p>',
]
def _image(self, image, **kwargs):
if kwargs:
- image = self._thumb(image, **kwargs)
- return '<img src="{}" />'.format(self._url(image))
+ image_path,image_url = self._thumb(image, **kwargs)
+ return '<img src="{}" />'.format(image_url)
def _image_page(self, image):
return image_base(image) + '/'
def random(self, url=None, stream=None, **kwargs):
LOG.debug('random image')
if url.endswith('/random'):
- base_dir = _os_path.join(
- self._base_path, url[:(-len('/random'))])
+ url = url[:(-len('/random'))]
+ self.validate_url(url=url, directory=True, stream=stream)
+ base_dir = _os_path.join(self._base_path, url)
elif url == 'random':
base_dir = self._base_path
else:
- self._error(404, stream=stream)
+ raise HTTPError(404)
images = []
for dirpath,dirnames,filenames in _os.walk(base_dir):
for filename in filenames:
- if is_picture(filename):
+ if is_image(filename):
images.append(_os_path.join(dirpath, filename))
if not images:
self._response(content='<p>no images to choose from</p>',
image = _random.choice(images)
LOG.debug('selected random image {}'.format(image))
page = self._image_page(image)
- content = self._captioned_video(path=image, href=page, stream=stream)
+ content = self._captioned_video(path=image, href=page)
self._response(content='\n'.join(content), stream=stream)
- def is_cached(self, url):
- for extension in ['.png', '.mp4', '.ogv']:
- if url.endswith(extension):
- return True
- return False
+ def is_cacheable(self, url):
+ return is_image(url) or is_video(url)
def cached(self, url, stream=None):
- LOG.debug('retrieving cached item')
- if url.endswith('.png'):
- mime = 'image/png'
- elif url.endswith('.ogv'):
- mime = 'video/ogg'
- elif url.endswith('.mp4'):
- mime = 'video/mp4'
- else:
- raise NotImplementedError()
- header = self._http_header(mime=mime)
+ LOG.debug('retrieving possibly cached item')
+ mime = _mimetypes.guess_type(url)[0]
+ if mime is None:
+ raise HTTPError(404, 'unknown mime type for {}'.format(url))
cache_path = _os_path.join(self._cache_path, url)
+ original_path = _os_path.join(self._base_path, url)
+ path = None
+ if _os_path.isfile(cache_path):
+ LOG.debug('return cached item {}'.format(cache_path))
+ path = cache_path
+ elif self._serve_originals and _os_path.isfile(original_path):
+ LOG.debug('return original item {}'.format(original_path))
+ path = original_path
+ else:
+ LOG.debug('possibly create cached item {}'.format(cache_path))
+ original_url,callback,kwargs = self._original_url(url)
+ original_path = _os_path.join(self._base_path, original_url)
+ if callback and _os_path.isfile(original_path):
+ path,cache_url = callback(original_path, **kwargs)
+ if not path:
+ raise HTTPError(404)
try:
- content = open(cache_path, 'rb')
- except IOError, e:
- LOG.error('invalid url')
+ content = open(path, 'rb')
+ except IOError as e:
LOG.error(e)
- self._error(404, stream=stream)
- if mime in ['video/ogg']:
+ raise HTTPError(404, 'item not found {}'.format(url))
+ header = self._http_header(mime=mime)
+ if mime in STREAMING_TYPES:
self._response_stream(
header=header, content=content, stream=stream)
content = content.read()
self._response(header=header, content=content, stream=stream)
def page(self, url, page=0, stream=None):
- LOG.debug('HTML page')
+ LOG.debug('HTML page {} {}'.format(url, page))
if not url.endswith('/'):
- LOG.error('HTML page URLs must end with a slash')
- self._error(404, stream=stream)
+ raise HTTPError(404, 'HTML page URLs must end with a slash')
abspath = _os_path.join(self._base_path, url)
if _os_path.isdir(abspath):
self._directory(path=abspath, page=page, stream=stream)
file_path = abspath[:-1] + extension
if _os_path.isfile(file_path):
self._page(path=file_path, stream=stream)
- LOG.debug('unknown HTML page')
- self._error(404, stream=stream)
+ raise HTTPError(404, 'unknown HTML page {}'.format(url))
def _directory_header(self, path):
relpath = _os_path.relpath(path, self._base_path)
content.append('</ul>')
return content
- def _directory_images(self, path, images, stream=None):
+ def _directory_images(self, path, images):
content = ['<table style="margin-left: auto; margin-right: auto;">']
column = 0
for image in images:
page = self._image_page(image)
- img = self._image(
- image, max_width=300, max_height=300, stream=stream)
+ img = self._image(image, max_width=300, max_height=300)
link = self._link(page, img)
if column == 0:
content.append(' <tr>')
return content
def _directory(self, path, page=0, stream=None):
- LOG.debug('directory page')
+ LOG.debug('directory page {} {}'.format(path, page))
images = list(self._images(path))
images_per_page = self._rows * self._columns
pages = int(_math.ceil(float(len(images)) / images_per_page)) or 1
if page < 0 or page >= pages:
- LOG.error(
+ raise HTTPError(
+ 404,
'page out of bounds for this gallery 0 <= {:d} < {:d}'.format(
page, pages))
- self._error(404, stream=stream)
first_image = images_per_page * page
images = images[first_image:first_image+images_per_page]
content = []
nav = self._directory_page_navigation(path, page=page, pages=pages)
content.extend(nav)
content.extend(self._directory_subdirs(path))
- content.extend(self._directory_images(
- path, images=images, stream=stream))
+ content.extend(self._directory_images(path, images=images))
content.extend(nav)
content.extend(self.footer)
self._response(content='\n'.join(content), stream=stream)
def _page(self, path, stream=None):
- LOG.debug('image page')
+ LOG.debug('image page {}'.format(path))
gallery = _os_path.dirname(path)
images = list(self._images(gallery))
images_per_page = self._rows * self._columns
self._link(next_page, 'next'),
'</p>',
])
- content.extend(self._captioned_video(path, stream=stream))
+ content.extend(self._captioned_video(path))
content.append('</div>')
content.extend(self.footer)
self._response(content='\n'.join(content), stream=stream)
except ValueError:
pass
try:
- url = server.relative_url(url=url, stream=output)
+ try:
+ url = server.relative_url(url=url)
+ except HTTPError as e:
+ LOG.error(e.message)
+ server._error(e.status, content=e.content, stream=stream)
except ProcessingComplete:
pass
else: