email: Pass 'config' and 'section' through from send() to *_send()
[rss2email.git] / test / test.py
1 #!/usr/bin/env python3
2
3 """Test processing logic on known feeds.
4 """
5
6 import difflib as _difflib
7 import glob as _glob
8 import io as _io
9 import logging as _logging
10 import os as _os
11 import re as _re
12
13 import rss2email as _rss2email
14 import rss2email.config as _rss2email_config
15 import rss2email.feed as _rss2email_feed
16
17
18 # Get a copy of the internal rss2email.CONFIG for copying
19 _stringio = _io.StringIO()
20 _rss2email_config.CONFIG.write(_stringio)
21 BASE_CONFIG_STRING = _stringio.getvalue()
22 del _stringio
23
24 MESSAGE_ID_REGEXP = _re.compile(
25     '^Message-ID: <[^@]*@dev.null.invalid>$', _re.MULTILINE)
26
27
28 class Send (list):
29     def __call__(self, sender, message):
30         self.append((sender, message))
31
32     def as_string(self):
33         chunks = [
34             'SENT BY: {}\n{}\n'.format(sender, message.as_string())
35             for sender,message in self]
36         return '\n'.join(chunks)
37
38
39 def clean_result(text):
40     """Cleanup dynamic portions of the generated email headers
41
42     >>> text = (
43     ...      'Date: Tue, 23 Aug 2011 15:57:37 -0000\\n'
44     ...      'Message-ID: <9dff03db-f5a7@dev.null.invalid>\\n'
45     ...      'User-Agent: rss2email\\n'
46     ...      )
47     >>> print(clean_result(text).rstrip())
48     Date: Tue, 23 Aug 2011 15:57:37 -0000
49     Message-ID: <...@dev.null.invalid>
50     User-Agent: rss2email
51     """
52     return MESSAGE_ID_REGEXP.sub('Message-ID: <...@dev.null.invalid>', text)
53
54 def test(dirname=None, config_path=None, force=False):
55     if dirname is None:
56         dirname = _os.path.dirname(config_path)
57     if config_path is None:
58         _rss2email.LOG.info('testing {}'.format(dirname))
59         for config_path in _glob.glob(_os.path.join(dirname, '*.config')):
60             test(dirname=dirname, config_path=config_path, force=force)
61         return
62     feed_path = _glob.glob(_os.path.join(dirname, 'feed.*'))[0]
63     _rss2email.LOG.info('testing {}'.format(config_path))
64     config = _rss2email_config.Config()
65     config.read_string(BASE_CONFIG_STRING)
66     read_paths = config.read([config_path])
67     feed = _rss2email_feed.Feed(name='test', url=feed_path, config=config)
68     expected_path = config_path.replace('config', 'expected')
69     with open(expected_path, 'r') as f:
70         expected = clean_result(f.read())
71     feed._send = Send()
72     feed.run()
73     generated = feed._send.as_string()
74     if force:
75         with open(expected_path, 'w') as f:
76             f.write(generated)
77     generated = clean_result(generated)
78     if generated != expected:
79         diff_lines = _difflib.unified_diff(
80             expected.splitlines(), generated.splitlines(),
81             'expected', 'generated', lineterm='')
82         raise ValueError(
83             'error processing {}\n{}'.format(
84                 config_path,
85                 '\n'.join(diff_lines)))
86
87
88 if __name__ == '__main__':
89     import argparse
90
91     parser = argparse.ArgumentParser(description=__doc__)
92     parser.add_argument(
93         '--force', action='store_const', const=True,
94         help=(
95             "write output files (useful for figuring out what's expected "
96             'from a new feed).'))
97     parser.add_argument(
98         '-V', '--verbose', default=0, action='count',
99         help='increment verbosity')
100     parser.add_argument(
101         'dir', nargs='*',
102         help='select subdirs to test (tests all subdirs by default)')
103
104     args = parser.parse_args()
105
106     if args.verbose:
107         _rss2email.LOG.setLevel(
108             max(_logging.DEBUG, _logging.ERROR - 10 * args.verbose))
109
110     # no paths on the command line, find all subdirectories
111     this_dir = _os.path.dirname(__file__)
112     if not args.dir:
113         for basename in _os.listdir(this_dir):
114             path = _os.path.join(this_dir, basename)
115             if _os.path.isdir(path):
116                 args.dir.append(path)
117
118     # we need standardized URLs, so change to `this_dir` and adjust paths
119     orig_dir = _os.getcwd()
120     _os.chdir(this_dir)
121
122     # run tests
123     for orig_path in args.dir:
124         this_path = _os.path.relpath(orig_path, start=this_dir)
125         if _os.path.isdir(this_path):
126             test(dirname=this_path, force=args.force)
127         else:
128             test(config_path=this_path, force=args.force)