1 # Copyright (C) 2012 W. Trevor King <wking@tremily.us>
3 # This file is part of pygrader.
# pygrader is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
10 # pygrader is distributed in the hope that it will be useful, but WITHOUT ANY
11 # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
12 # A PARTICULAR PURPOSE. See the GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License along with
15 # pygrader. If not, see <http://www.gnu.org/licenses/>.
17 import asyncore as _asyncore
18 import email as _email
19 import smtpd as _smptd
20 import socket as _socket
22 from .. import LOG as _LOG
class SMTPChannel (_smptd.SMTPChannel):
    """SMTP channel that reports its own closing to the owning server.

    The server in this module keeps a count of open channels (see
    ``SMTPServer.channel_closed``); this subclass supplies the
    notification that keeps that count accurate.
    """

    def close(self):
        """Close the channel, log it, then notify the server."""
        super(SMTPChannel, self).close()
        _LOG.debug('close {}'.format(self))
        # ``smtp_server`` is the server instance stored on the channel by
        # the stock ``smtpd.SMTPChannel`` constructor.
        self.smtp_server.channel_closed()
class SMTPServer (_smptd.SMTPServer):
    """An SMTP server for testing pygrader.

    On top of the arguments of ``smtpd.SMTPServer`` two keyword
    arguments are accepted:

    ``count``
        Number of messages to receive before the server closes itself
        (once every open channel has closed).  ``None``, the default,
        means there is no limit and the server never closes on its own.
    ``process``
        Optional callable invoked as
        ``process(peer, mailfrom, rcpttos, data)`` for each message.

    >>> from asyncore import loop
    >>> from smtplib import SMTP
    >>> from pgp_mime.email import encodedMIMEText
    >>> from pygrader.test.client import MessageSender

    >>> def process(peer, mailfrom, rcpttos, data):
    ...     print('peer: {}'.format(peer))
    ...     print('mailfrom: {}'.format(mailfrom))
    ...     print('rcpttos: {}'.format(rcpttos))
    ...     print(data)
    >>> server = SMTPServer(
    ...     ('localhost', 1025), None, process=process, count=3)

    >>> message = encodedMIMEText('Ping')
    >>> message['From'] = 'a@example.com'
    >>> message['To'] = 'b@example.com, c@example.com'
    >>> message['Cc'] = 'd@example.com'
    >>> messages = [message, message, message]
    >>> ms = MessageSender(address=('localhost', 1025), messages=messages)
    >>> loop()  # doctest: +REPORT_UDIFF, +ELLIPSIS
    peer: ('127.0.0.1', ...)
    mailfrom: a@example.com
    rcpttos: ['b@example.com', 'c@example.com', 'd@example.com']
    Content-Type: text/plain; charset="us-ascii"
    MIME-Version: 1.0
    Content-Transfer-Encoding: 7bit
    Content-Disposition: inline
    From: a@example.com
    To: b@example.com, c@example.com
    Cc: d@example.com
    <BLANKLINE>
    Ping
    peer: ('127.0.0.1', ...)
    mailfrom: a@example.com
    rcpttos: ['b@example.com', 'c@example.com', 'd@example.com']
    Content-Type: text/plain; charset="us-ascii"
    MIME-Version: 1.0
    Content-Transfer-Encoding: 7bit
    Content-Disposition: inline
    From: a@example.com
    To: b@example.com, c@example.com
    Cc: d@example.com
    <BLANKLINE>
    Ping
    peer: ('127.0.0.1', ...)
    mailfrom: a@example.com
    rcpttos: ['b@example.com', 'c@example.com', 'd@example.com']
    Content-Type: text/plain; charset="us-ascii"
    MIME-Version: 1.0
    Content-Transfer-Encoding: 7bit
    Content-Disposition: inline
    From: a@example.com
    To: b@example.com, c@example.com
    Cc: d@example.com
    <BLANKLINE>
    Ping
    """
    channel_class = SMTPChannel

    # Method names asyncore's ``type`` argument may be mapped onto.
    # NOTE(review): assumes ``_LOG`` offers the standard
    # ``logging.Logger`` methods -- confirm against the package root.
    _LOG_SEVERITIES = ('debug', 'info', 'warning', 'error', 'critical')

    def __init__(self, *args, **kwargs):
        """Strip the test-only keywords, then defer to ``smtpd``."""
        self.count = kwargs.pop('count', None)
        self.process = kwargs.pop('process', None)
        self.channels_open = 0
        super(SMTPServer, self).__init__(*args, **kwargs)

    def _exhausted(self):
        """Return True once a finite message budget has been used up.

        An unlimited server (``count is None``) is never exhausted;
        comparing ``None`` with ``0`` directly would raise ``TypeError``
        on Python 3.
        """
        return self.count is not None and self.count <= 0

    def log_info(self, message, type='info'):
        """Send asyncore's diagnostics to the package logger.

        ``type`` is the severity name passed by asyncore.  The parameter
        name shadows a builtin, but it is fixed by the signature of
        ``asyncore.dispatcher.log_info`` which this method overrides.
        Unknown severities fall back to ``info``.
        """
        if type in self._LOG_SEVERITIES:
            emit = getattr(_LOG, type)
        else:
            emit = _LOG.info
        emit(message)

    def handle_accepted(self, conn, addr):
        """Open a channel for ``conn`` unless the budget is spent."""
        if self._exhausted():
            # The server is only waiting for the remaining channels to
            # finish before closing; turn late arrivals away.
            conn.close()
            return
        super(SMTPServer, self).handle_accepted(conn, addr)
        self.channels_open += 1

    def channel_closed(self):
        """Note a closed channel; close the server after the last one.

        Called by ``SMTPChannel.close``.  The server closes itself only
        when no channel is left open and the message budget is spent.
        """
        self.channels_open -= 1
        if self.channels_open == 0 and self._exhausted():
            _LOG.debug('close {}'.format(self))
            self.close()

    def process_message(self, peer, mailfrom, rcpttos, data, **kwargs):
        """Account for one received message and hand it to ``process``.

        ``**kwargs`` absorbs the ``mail_options`` / ``rcpt_options``
        keywords that newer ``smtpd`` versions pass; they are ignored.
        Returns ``None`` implicitly, which ``smtpd`` treats as success.
        """
        if self.count is not None:
            self.count -= 1
            _LOG.debug('Count: {}'.format(self.count))
        _LOG.debug('receiving message from: {}'.format(peer))
        _LOG.debug('message addressed from: {}'.format(mailfrom))
        _LOG.debug('message addressed to : {}'.format(rcpttos))
        _LOG.debug('message length : {}'.format(len(data)))
        # ``process`` defaults to None, in which case there is nothing
        # to call.
        if self.process:
            self.process(peer, mailfrom, rcpttos, data)