class Entry (dict):
r"""An iCalendar entry (e.g. VEVENT)
- Get an entry.
-
- >>> from .feed import Feed
+ Load example content.
+ >>> import codecs
>>> import os
>>> root_dir = os.curdir
>>> data_file = os.path.abspath(os.path.join(
... root_dir, 'test', 'data', 'geohash.ics'))
- >>> url = 'file://{}'.format(data_file.replace(os.sep, '/'))
+ >>> with codecs.open(data_file, 'r', 'UTF-8') as f:
+ ... content = f.read()
+
+ Make an entry.
- >>> feed = Feed(url=url)
- >>> feed.fetch()
- >>> entry = feed.pop()
+ >>> calendar = Entry(content=content)
Investigate the entry.
- >>> print(entry)
+ >>> print(calendar) # doctest: +REPORT_UDIFF
+ BEGIN:VCALENDAR
+ VERSION:2.0
+ PRODID:-//Example Calendar//NONSGML v1.0//EN
BEGIN:VEVENT
UID:2013-06-30@geohash.invalid
DTSTAMP:2013-06-30T00:00:00Z
LOCATION:Snow Hill\, Dover\, Massachusetts
GEO:42.226663,-71.28676
END:VEVENT
+ END:VCALENDAR
- >>> entry.type
- 'VEVENT'
- >>> entry.content # doctest: +ELLIPSIS
- 'BEGIN:VEVENT\r\nUID:...\r\nEND:VEVENT\r\n'
+ >>> calendar.type
+ 'VCALENDAR'
``Entry`` subclasses Python's ``dict``, so you can access raw
field values in the usual ways.
- >>> entry['LOCATION']
- 'Snow Hill\\, Dover\\, Massachusetts'
- >>> entry.get('LOCATION')
- 'Snow Hill\\, Dover\\, Massachusetts'
- >>> entry.get('missing')
- >>> entry.get('missing', 'some default')
+ >>> calendar['VERSION']
+ '2.0'
+ >>> calendar.get('missing')
+ >>> calendar.get('missing', 'some default')
'some default'
+ >>> sorted(calendar.keys())
+ ['PRODID', 'VERSION', 'VEVENT']
+
+
+ Dig into the children (which are always stored as lists):
+
+ >>> event = calendar['VEVENT'][0]
+
+ >>> event.type
+ 'VEVENT'
+ >>> event.content # doctest: +ELLIPSIS
+ 'BEGIN:VEVENT\r\nUID:...\r\nEND:VEVENT\r\n'
+ >>> sorted(event.keys()) # doctest: +NORMALIZE_WHITESPACE
+ ['DTEND;VALUE=DATE', 'DTSTAMP', 'DTSTART;VALUE=DATE', 'GEO',
+ 'LOCATION', 'SUMMARY', 'UID', 'URL']
+
+ >>> event['LOCATION']
+ 'Snow Hill\\, Dover\\, Massachusetts'
You can also use ``get_text`` to unescape text fields.
- >>> entry.get_text('LOCATION')
+ >>> event.get_text('LOCATION')
'Snow Hill, Dover, Massachusetts'
"""
- def __init__(self, type, content=None):
+ def __init__(self, type=None, content=None):
super(Entry, self).__init__()
+ if type is None and content:
+ firstline = content.splitlines()[0]
+ type = firstline.split(':', 1)[1]
self.type = type
self.content = content
- self.lines = None
+ self._lines = None # unwrapped semantic lines
if content:
self.process()
return '<{} type:{}>'.format(type(self).__name__, self.type)
def process(self):
- self.clear()
self.unfold()
- self._fill_dict()
+ self._parse()
- def _fill_dict(self):
+ def _parse(self):
+ self.clear()
for index,verb,expected in [
[0, 'begin', 'BEGIN:{}'.format(self.type)],
[-1, 'end', 'END:{}'.format(self.type)],
]:
- if self.lines[index] != expected:
+ if self._lines[index] != expected:
raise ValueError('entry should {} with {!r}, not {!r}'.format(
- verb, expected, self.lines[index]))
- for line in self.lines[1:-1]:
+ verb, expected, self._lines[index]))
+ stack = []
+ child_lines = []
+ for i,line in enumerate(self._lines[1:-1]):
key,value = [x.strip() for x in line.split(':', 1)]
- if key in ['BEGIN' or 'END']:
- raise NotImplementedError(line)
- if key in self:
- if type(self[key]) == str:
- self[key] = [self[key]]
- self[key].append(value)
- else:
- self[key] = value
+ if key == 'BEGIN':
+ _LOG.debug('{!r}: begin {}'.format(self, value))
+ stack.append(value)
+ if stack:
+ child_lines.append(line)
+ if key == 'END':
+ _LOG.debug('{!r}: end {}'.format(self, value))
+ if not stack or value != stack[-1]:
+ raise ValueError(
+ ('closing {} on line {}, but current stack is {}'
+ ).format(value, i+1, stack))
+ stack.pop(-1)
+ if not stack:
+ child = Entry(
+ type=value,
+ content='\r\n'.join(child_lines) + '\r\n',
+ )
+ child._lines = child_lines
+ child._parse()
+ self._add_value(key=value, value=child, force_list=True)
+ child_lines = []
+ elif not stack: # our own data, not a child's
+ self._add_value(key=key, value=value)
+
+ def _add_value(self, key, value, force_list=False):
+ if force_list and key not in self:
+ self[key] = []
+ if key in self:
+ if type(self[key]) == str:
+ self[key] = [self[key]]
+ self[key].append(value)
+ else:
+ self[key] = value
def unfold(self):
"""Unfold wrapped lines
Following :RFC:`5545`, section 3.1 (Content Lines)
"""
- self.lines = []
+ self._lines = []
semantic_line_chunks = []
for line in self.content.splitlines():
lstrip = line.lstrip()
semantic_line_chunks.append(lstrip)
else:
if semantic_line_chunks:
- self.lines.append(''.join(semantic_line_chunks))
+ self._lines.append(''.join(semantic_line_chunks))
semantic_line_chunks = [line]
if semantic_line_chunks:
- self.lines.append(''.join(semantic_line_chunks))
+ self._lines.append(''.join(semantic_line_chunks))
def get_text(self, *args, **kwargs):
value = self.get(*args, **kwargs)
_LOG = _logging.getLogger(__name__)
-class Feed (set):
+class Feed (_entry.Entry):
r"""An iCalendar feed (:RFC:`5545`)
Figure out where the example feed is located, relative to the
You can also iterate through events:
- >>> for event in f:
+ >>> for event in f['VEVENT']:
... print(repr(event))
... print(event)
<Entry type:VEVENT>
GEO:42.226663,-71.28676
END:VEVENT
"""
- def __init__(self, url, content=None, user_agent=None):
- super(Feed, self).__init__()
+ def __init__(self, url, user_agent=None):
+ super(Feed, self).__init__(type='VCALENDAR')
self.url = url
- self.content = content
if user_agent is None:
user_agent = _USER_AGENT
self.user_agent = user_agent
- def __str__(self):
- if self.content:
- return self.content.replace('\r\n', '\n').strip()
- return ''
-
def __repr__(self):
return '<{} url:{}>'.format(type(self).__name__, self.url)
raise ValueError(content_type)
byte_content = f.read()
self.content = str(byte_content, encoding='UTF-8')
-
- def process(self):
- _LOG.info('{!r}: processing {} content characters'.format(
- self, len(self.content)))
- entry = None
- stack = []
- for i,line in enumerate(self.content.splitlines()):
- if line.startswith('BEGIN:'):
- _type = line.split(':', 1)[1]
- _LOG.info('{!r}: begin {}'.format(self, _type))
- stack.append(_type)
- if len(stack) == 2:
- if entry is not None:
- raise ValueError('double entry by line {}'.format(i))
- entry = _entry.Entry(type=_type, content=[])
- _LOG.info(stack)
- if entry is not None:
- entry.content.append(line)
- if line.startswith('END:'):
- _type = line.split(':', 1)[1]
- _LOG.info('{!r}: end {}'.format(self, _type))
- if not stack or _type != stack[-1]:
- raise ValueError(
- ('closing {} on line {}, but current stack is {}'
- ).format(_type, i, stack))
- stack.pop(-1)
- if len(stack) == 1:
- entry.content.append('') # trailing blankline
- entry.content = '\r\n'.join(entry.content)
- entry.process()
- self.add(entry)
- entry = None
-
- def write(self, stream):
- stream.write(self.content)