# Copyright
+import urllib.request as _urllib_request
+
+from . import USER_AGENT as _USER_AGENT
+
+
class Feed (object):
r"""An iCalendar feed (:RFC:`5545`)
- >>> f = Feed(url='http://example.com/calendar.ics')
- >>> f
- <Feed url:http://example.com/calendar.ics>
+ Figure out where the example feed is located, relative to the
+ directory from which you run this doctest (i.e., the project's
+ root directory).
+
+ >>> import os
+ >>> root_dir = os.curdir
+ >>> data_file = os.path.abspath(os.path.join(
+ ... root_dir, 'test', 'data', 'geohash.ics'))
+ >>> url = 'file://{}'.format(data_file.replace(os.sep, '/'))
+
+ Create a new feed pointing to this URL.
+
+ >>> f = Feed(url=url)
+ >>> f # doctest: +ELLIPSIS
+ <Feed url:file://.../test/data/geohash.ics>
>>> print(f)
<BLANKLINE>
- We can't fetch this dummy url, so load the content by hand.
+ Load the feed content.
+
+ >>> f.fetch()
+
+ The ``.__str__`` method displays the feed content using Python's
+ universal newlines.
- >>> import os
- >>> root_dir = os.curdir
- >>> data_file = os.path.join(os.curdir, 'test', 'data', 'geohash.ics')
- >>> with open(data_file, 'r') as data:
- ... f.content = data.read().replace('\n', data.newlines)
- ... assert data.newlines == '\r\n', data.newlines
>>> print(f) # doctest: +REPORT_UDIFF
BEGIN:VCALENDAR
VERSION:2.0
>>> stream.getvalue() # doctest: +ELLIPSIS
'BEGIN:VCALENDAR\r\nVERSION:2.0\r\n...END:VCALENDAR\r\n'
"""
- def __init__(self, url, content=None):
+ def __init__(self, url, content=None, user_agent=None):
self.url = url
self.content = content
+ if user_agent is None:
+ user_agent = _USER_AGENT
+ self.user_agent = user_agent
def __str__(self):
if self.content:
return '<{} url:{}>'.format(type(self).__name__, self.url)
def fetch(self):
- raise NotImplementedError()
+ request = _urllib_request.Request(
+ url=self.url,
+ headers={
+ 'User-Agent': self.user_agent,
+ },
+ )
+ with _urllib_request.urlopen(url=request) as f:
+ info = f.info()
+ content_type = info.get('Content-type', None)
+ if content_type != 'text/calendar':
+ raise ValueError(content_type)
+ byte_content = f.read()
+ self.content = str(byte_content, encoding='UTF-8')
def write(self, stream):
stream.write(self.content)