-# Copyright
-
+# Copyright (C) 2014 W. Trevor King <wking@tremily.us>
+#
+# This file is part of package-cache.
+#
+# package-cache is free software: you can redistribute it and/or modify it
+# under the terms of the GNU General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or (at your option) any
+# later version.
+#
+# package-cache is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
+# details.
+#
+# You should have received a copy of the GNU General Public License along with
+# package-cache. If not, see <http://www.gnu.org/licenses/>.
+
+import calendar as _calendar
import email.utils as _email_utils
+import logging as _logging
import mimetypes as _mimetypes
import os as _os
import urllib.error as _urllib_error
-import urllib.parse as _urllib_parse
import urllib.request as _urllib_request
from . import __version__
+LOG = _logging.getLogger(__name__)
+
+
class InvalidFile(ValueError):
    """Error raised when a request URL cannot be mapped to a cached file."""

    def __init__(self, url):
        # Keep the repr of the offending URL in the message so that a
        # missing URL (None) is distinguishable from an empty string.
        message = 'invalid file {!r}'.format(url)
        super().__init__(message)
self.opener.addheaders = [
('User-agent', 'Package-cache/{}'.format(__version__)),
]
- if not _os.path.isdir(self.cache):
- _os.makedirs(self.cache, exist_ok=True)
def __call__(self, environ, start_response):
try:
url = environ.get('PATH_INFO', None)
if url is None:
raise InvalidFile(url=url)
- parsed_url = _urllib_parse.urlparse(url)
- relative_path = parsed_url.path.lstrip('/').replace('/', _os.path.sep)
- cache_path = _os.path.join(self.cache, relative_path)
+ cache_path = self._get_cache_path(url=url)
if not _os.path.exists(path=cache_path):
self._get_file_from_sources(url=url, path=cache_path)
if not _os.path.isfile(path=cache_path):
return self._serve_file(
path=cache_path, environ=environ, start_response=start_response)
+ def _get_cache_path(self, url):
+ relative_path = url.lstrip('/').replace('/', _os.path.sep)
+ cache_path = _os.path.abspath(_os.path.join(self.cache, relative_path))
+ check_relative_path = _os.path.relpath(
+ path=cache_path, start=self.cache)
+ if check_relative_path.startswith(_os.pardir + _os.path.sep):
+ raise InvalidFile(url=url)
+ return cache_path
+
def _get_file_from_sources(self, url, path):
+ dirname = _os.path.dirname(path)
+ if not _os.path.isdir(dirname):
+ _os.makedirs(dirname, exist_ok=True)
for i, source in enumerate(self.sources):
source_url = source.rstrip('/') + url
try:
self._get_file(url=source_url, path=path)
- except _urllib_error.HTTPError:
+ except _urllib_error.HTTPError as e:
+ LOG.warn('error getting {}: {} {}'.format(
+ source_url, e.code, e.reason))
if i == len(self.sources) - 1:
raise
else:
return
def _get_file(self, url, path):
+ LOG.info('GET {}'.format(url))
with self.opener.open(url) as response:
+ last_modified = response.getheader('Last-Modified', None)
content_length = int(response.getheader('Content-Length'))
with open(path, 'wb') as f:
block_size = 8192
f.write(data)
if len(data) < block_size:
break
+ if last_modified:
+ mtime = _calendar.timegm(_email_utils.parsedate(last_modified))
+ _os.utime(path=path, times=(mtime, mtime))
+ LOG.info('got {}'.format(url))
def _serve_file(self, path, environ, start_response):
headers = {