f104029419a736723ae8a64fc8b1e2ad7ce8e145
[pycalendar.git] / pycalendar / feed.py
1 # Copyright
2
3 import logging as _logging
4 import urllib.request as _urllib_request
5
6 from . import USER_AGENT as _USER_AGENT
7 from . import entry as _entry
8
9
10 _LOG = _logging.getLogger(__name__)
11
12
class Feed(set):
    r"""An iCalendar feed (:RFC:`5545`)

    A ``Feed`` is a ``set`` holding one ``Entry`` for every component
    nested directly inside the outermost component of the calendar
    text (e.g. each ``VEVENT`` inside ``VCALENDAR``).  The raw text
    is kept in ``.content``.

    Figure out where the example feed is located, relative to the
    directory from which you run this doctest (i.e., the project's
    root directory).

    >>> import os
    >>> root_dir = os.curdir
    >>> data_file = os.path.abspath(os.path.join(
    ...         root_dir, 'test', 'data', 'geohash.ics'))
    >>> url = 'file://{}'.format(data_file.replace(os.sep, '/'))

    Create a new feed pointing to this URL.

    >>> f = Feed(url=url)
    >>> f  # doctest: +ELLIPSIS
    <Feed url:file://.../test/data/geohash.ics>
    >>> print(f)
    <BLANKLINE>

    Load the feed content.

    >>> f.fetch()

    The ``.__str__`` method displays the feed content using Python's
    universal newlines.

    >>> print(f)  # doctest: +REPORT_UDIFF
    BEGIN:VCALENDAR
    VERSION:2.0
    PRODID:-//Example Calendar//NONSGML v1.0//EN
    BEGIN:VEVENT
    UID:2013-06-30@geohash.invalid
    DTSTAMP:2013-06-30T00:00:00Z
    DTSTART;VALUE=DATE:20130630
    DTEND;VALUE=DATE:20130701
    SUMMARY:XKCD geohashing\, Boston graticule
    URL:http://xkcd.com/426/
    LOCATION:Snow Hill\, Dover\, Massachusetts
    GEO:42.226663,-71.28676
    END:VEVENT
    END:VCALENDAR

    To get the CRLF line endings specified in :RFC:`5545`, use the
    ``.write`` method.

    >>> import io
    >>> stream = io.StringIO()
    >>> f.write(stream=stream)
    >>> stream.getvalue()  # doctest: +ELLIPSIS
    'BEGIN:VCALENDAR\r\nVERSION:2.0\r\n...END:VCALENDAR\r\n'

    You can also iterate through events:

    >>> for event in f:
    ...     print(repr(event))
    ...     print(event)
    <Entry type:VEVENT>
    BEGIN:VEVENT
    UID:2013-06-30@geohash.invalid
    DTSTAMP:2013-06-30T00:00:00Z
    DTSTART;VALUE=DATE:20130630
    DTEND;VALUE=DATE:20130701
    SUMMARY:XKCD geohashing\, Boston graticule
    URL:http://xkcd.com/426/
    LOCATION:Snow Hill\, Dover\, Massachusetts
    GEO:42.226663,-71.28676
    END:VEVENT
    """
    def __init__(self, url, content=None, user_agent=None):
        """Create an empty feed pointing at ``url``.

        ``content`` optionally supplies already-downloaded feed text
        (it is stored but not parsed here; call ``.process()`` for
        that).  ``user_agent`` is sent as the ``User-Agent`` request
        header and defaults to the package-wide ``USER_AGENT``.
        """
        super().__init__()
        self.url = url
        self.content = content
        if user_agent is None:
            user_agent = _USER_AGENT
        self.user_agent = user_agent

    def __str__(self):
        """Return the content with ``\\n`` newlines, or ``''`` if empty."""
        if self.content:
            return self.content.replace('\r\n', '\n').strip()
        return ''

    def __repr__(self):
        return '<{} url:{}>'.format(type(self).__name__, self.url)

    def fetch(self, force=False):
        """Download and parse the feed.

        Nothing happens if content is already present, unless
        ``force`` is true, in which case the feed is downloaded and
        parsed again.
        """
        if self.content is None or force:
            self._fetch()
            self.process()

    def _fetch(self):
        """Download ``self.url`` and store the decoded text in ``.content``.

        Raises ``ValueError`` (carrying the raw ``Content-type`` header
        value, or ``None`` if the header is missing) when the response
        is not of media type ``text/calendar``.
        """
        request = _urllib_request.Request(
            url=self.url,
            headers={
                'User-Agent': self.user_agent,
                },
            )
        with _urllib_request.urlopen(url=request) as f:
            info = f.info()
            content_type = info.get('Content-type', None)
            # Compare only the media type: the header may carry
            # parameters (e.g. "text/calendar; charset=utf-8") and
            # media types are case-insensitive.
            media_type = None
            if content_type is not None:
                media_type = content_type.split(';', 1)[0].strip().lower()
            if media_type != 'text/calendar':
                raise ValueError(content_type)
            byte_content = f.read()
        self.content = str(byte_content, encoding='UTF-8')

    def process(self):
        """Parse ``.content`` into entries and add them to this set.

        Each component opened at nesting depth two (directly inside
        the outermost component) becomes one ``Entry``; its lines,
        including any components nested inside it, are joined with
        CRLF, handed to ``Entry.process()``, and the entry is added to
        the feed.  Entries from a previous call are discarded first.

        Raises ``ValueError`` if an ``END:`` line does not match the
        innermost open ``BEGIN:``, or if the content ends while
        components are still open.  Line numbers in the error messages
        are 1-based.
        """
        _LOG.info(
            '%r: processing %s content characters', self, len(self.content))
        # Re-processing (e.g. after fetch(force=True)) must not leave
        # entries from the previous content behind.
        self.clear()
        entry = None
        stack = []  # types of the currently open components, outermost first
        for lineno, line in enumerate(self.content.splitlines(), start=1):
            if line.startswith('BEGIN:'):
                _type = line.split(':', 1)[1]
                _LOG.info('%r: begin %s', self, _type)
                stack.append(_type)
                if len(stack) == 2:
                    if entry is not None:
                        raise ValueError(
                            'double entry by line {}'.format(lineno))
                    entry = _entry.Entry(type=_type, content=[])
            _LOG.info('%s', stack)
            if entry is not None:
                entry.content.append(line)
            if line.startswith('END:'):
                _type = line.split(':', 1)[1]
                _LOG.info('%r: end %s', self, _type)
                if not stack or _type != stack[-1]:
                    raise ValueError(
                        ('closing {} on line {}, but current stack is {}'
                         ).format(_type, lineno, stack))
                stack.pop(-1)
                if len(stack) == 1:
                    # Back at depth one: the entry is complete.
                    entry.content.append('')  # trailing blankline
                    entry.content = '\r\n'.join(entry.content)
                    entry.process()
                    self.add(entry)
                    entry = None
        if stack:
            # Truncated content would otherwise silently drop the
            # partially collected entry.
            raise ValueError(
                'content ended with unclosed components: {}'.format(stack))

    def write(self, stream):
        """Write the stored content, unchanged, to ``stream``.

        Nothing is written if no content has been loaded yet.
        """
        if self.content is None:
            return
        stream.write(self.content)