"""An email interface for players.
"""
-from ..base import Player
+from __future__ import absolute_import
+from cStringIO import StringIO
+from email.generator import Generator
+from email.parser import Parser
+from email.mime.text import MIMEText
+from email.mime.multipart import MIMEMultipart
+from email.utils import getaddresses, formataddr, formatdate, make_msgid
+import os
+import smtplib
+import subprocess
+import sys
+import tempfile
+
+from ..base import Player, PlayerError
+
+
+# Configure alternative sendmail command in case smtplib is too
+# annoying. Set SENDMAIL to None to use smtplib.
+SENDMAIL = None
+#SENDMAIL = ['/usr/sbin/sendmail', '-t']
+#SENDMAIL = ['/usr/bin/msmtp', '-t']
+
+
+class IncomingEmailDispatcher (object):
+ """For reading reply messages.
+ """
+ def __init__(self, fifo_path=None):
+ self._cache = []
+ if fifo_path == None:
+ self.dir_path = tempfile.mkdtemp(suffix='.pyrisk')
+ self.fifo_path = os.path.join(self.dir_path, 'incoming')
+ else:
+ self.dir_path = None
+ self.fifo_path = os.path.abspath(fifo_path)
+ os.mkfifo(self.fifo_path)
+ def close(self):
+ os.remove(self.fifo_path)
+ if self.dir_path != None:
+ os.rmdir(self.dir_path)
+ def get(self, tag):
+ # FIFO blocks on open until a writer also opens
+ self.fifo = open(self.fifo_path, 'r')
+ for msg_tag, msg in self._cache:
+ if msg_tag == tag:
+ self._cache.remove(msg)
+ return msg
+ msg = self._get_msg()
+ msg_tag = self._msg_tag(msg['Subject'])
+ while msg_tag != tag:
+ self._cache.append((msg_tag, msg))
+ msg = self._get_msg()
+ msg_tag = self._msg_tag(msg['Subject'])
+ self.fifo.close()
+ return msg
+ def _msg_tag(self, subject):
+ """ Return the tag portion of a message subject.
+
+ >>> ied = IncomingEmailDispatcher()
+ >>> ied._msg_tag('[TAG] Hi there')
+ u'[TAG]'
+ >>> ied._msg_tag(' [tg] Hi there')
+ u'[tg]'
+ >>> ied._msg_tag(' Re: [t] Hi there')
+ u'[t]'
+ >>> ied.close()
+ """
+ subject = subject.strip()
+ if subject.startswith('Re:'):
+ subject = subject[len('Re:'):]
+ subject = subject.strip()
+ args = subject.split(u']',1)
+ if len(args) < 1:
+ return None
+ return args[0]+u']'
+ def _get_msg(self):
+ text = self.fifo.read()
+ p = Parser()
+ return p.parsestr(text)
+
+class OutgoingEmailDispatcher (object):
+ """For sending outgoing messages.
+ """
+ def __init__(self, return_address, return_name='PyRisk server',
+ sendmail=None, verbose_excute=False,
+ smtp_host=None, smtp_port=465,
+ smtp_user=None, smtp_password=None):
+ self.return_address = return_address
+ self.return_name = return_name
+ self.sendmail = sendmail
+ if self.sendmail == None:
+ self.sendmail = SENDMAIL
+ self.verbose_execute = verbose_execute
+ self.smtp_host = smtp_host
+ self.smtp_port = smtp_port
+ self.smtp_user = smtp_user
+ self.smtp_password = smtp_password
+ def send(self, msg):
+ """Send an email Message instance on its merry way.
+ """
+ msg['From'] = formataddr((self.return_name, self.return_address))
+ msg['Reply-to'] = msg['From']
+ msg['Date'] = formatdate()
+ msg['Message-id'] = make_msgid()
+
+ if self.sendmail != None:
+ execute(self.sendmail, stdin=self._flatten(msg))
+ return None
+ s = smtplib.SMTP_SSL(self.smtp_host, self.smtp_port)
+ s.connect()
+ s.login(self.smtp_user, self.smtp_password)
+ s.sendmail(from_addr=self._source_email(msg),
+ to_addrs=self._target_emails(msg),
+ msg=self._flatten(msg))
+ s.close()
+ def _execute(self, args, stdin=None, expect=(0,)):
+ """
+ Execute a command (allows us to drive gpg).
+ """
+ if self.verbose_execute == True:
+ print >> sys.stderr, '$ '+args
+ try:
+ p = subprocess.Popen(args,
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ shell=False, close_fds=True)
+ except OSError, e:
+ strerror = '%s\nwhile executing %s' % (e.args[1], args)
+ raise Exception, strerror
+ output, error = p.communicate(input=stdin)
+ status = p.wait()
+ if self.verbose_execute == True:
+ print >> sys.stderr, '(status: %d)\n%s%s' % (status, output, error)
+ if status not in expect:
+ strerror = '%s\nwhile executing %s\n%s\n%d' % (args[1], args, error, status)
+ raise Exception, strerror
+ return status, output, error
+ def _source_email(self, msg, return_realname=False):
+ """
+ Search the header of an email Message instance to find the
+ sender's email address.
+ """
+ froms = msg.get_all('from', [])
+ from_tuples = getaddresses(froms) # [(realname, email_address), ...]
+ if return_realname == True:
+ return from_tuples[0] # (realname, email_address)
+ return from_tuples[0][1] # email_address
+ def _target_emails(self, msg):
+ """
+ Search the header of an email Message instance to find a
+ list of recipient's email addresses.
+ """
+ tos = msg.get_all('to', [])
+ ccs = msg.get_all('cc', [])
+ bccs = msg.get_all('bcc', [])
+ resent_tos = msg.get_all('resent-to', [])
+ resent_ccs = msg.get_all('resent-cc', [])
+ resent_bccs = msg.get_all('resent-bcc', [])
+ resent = resent_tos + resent_ccs + resent_bccs
+ if len(resent) > 0:
+ all_recipients = getaddresses(resent)
+ else:
+ all_recipients = getaddresses(tos + ccs + bccs)
+ return [addr[1] for addr in all_recipients]
+ def _flatten(self, msg, to_unicode=False):
+ """
+ Produce flat text output from an email Message instance.
+ """
+ assert msg != None
+ fp = StringIO()
+ g = Generator(fp, mangle_from_=False)
+ g.flatten(msg)
+ text = fp.getvalue()
+ if to_unicode == True:
+ encoding = msg.get_content_charset('utf-8')
+ text = unicode(text, encoding=encoding)
+ return text
+
+def encodedMIMEText(body, encoding='us-ascii', disposition='inline', filename=None):
+ if encoding == 'us-ascii':
+ part = MIMEText(body)
+ else:
+ # Create the message ('plain' stands for Content-Type: text/plain)
+ part = MIMEText(body.encode(encoding), 'plain', encoding)
+ if filename == None:
+ part.add_header('Content-Disposition', disposition)
+ else:
+ part.add_header('Content-Disposition', disposition, filename=filename)
+ return part
+
class EmailPlayer (Player):
"""Human Player with an email interface.
TODO: details on procmail setup
"""
- def __init__(self, name, address, return_address):
+ def __init__(self, name, address, incoming, outgoing):
Player.__init__(self, name)
self.address = address
- self.return_address = return_address
- def _send_email(self, world, log, subject, body):
- wpart = self._world_part(world)
- lpart = self._world_part(log)
- print 'Subject: %s\n\n%s\n' % (subject, body)
- def _get_email(self):
- body = raw_input('response? ')
- body = body.replace(r'\n', '\n')
- if len(body) == 0 or body[-1] != '\n':
- body += '\n'
+ self.outgoing = outgoing
+ self.incoming = incoming
+ def _tag(self):
+ return '[PyRisk %d]' % (id(self))
+ def _send_mail(self, world, log, subject, body):
+ msg = MIMEMultipart()
+ msg.attach(encodedMIMEText(body, filename='body'))
+ msg.attach(self._log_part(log))
+ msg.attach(self._world_part(world))
+ msg['To'] = formataddr((self.name, self.address))
+ tag = self._tag()
+ msg['Subject'] = '%s %s' % (tag, subject)
+ self.outgoing.send(msg)
+ return tag
+ def _get_mail(self, tag):
+ msg = self.incoming.get(tag)
+ msg_charset = msg.get_content_charset('utf-8')
+ first_part = [self.msg.walk()][0]
+ body = first_part.get_payload(decode=True)
+ charset = first_part.get_content_charset(msg_charset)
+ mime_type = first_part.get_content_type()
+ if not mime_type.startswith('text/plain'):
+ raise PlayerError('Invalid MIME type %s (must be text/plain)'
+ % mime_type)
+ body = unicode(body, charset) # convert text types to unicode
+ if len(body) == 0 or body[-1] != u'\n':
+ body += u'\n'
return body
def _world_part(self, world):
- pass
+ body = []
+ for continent in world:
+ body.append(str(continent))
+ for terr in continent:
+ if terr.player == None:
+ body.append(' %s\t%s' % (terr, terr.player))
+ else:
+ body.append(' %s\t%s\t%d' % (terr, terr.player, terr.armies))
+ return encodedMIMEText('\n'.join(body), filename='world')
def _log_part(self, log):
- pass
+ return encodedMIMEText('\n'.join(log), filename='log')
def report(self, world, log):
"""Send reports about death and game endings.
--------
draw - another notification-only method
"""
- self._send_email(world, log, 'Report', Player.report())
+ self._send_mail(world, log, 'Report: %s' % log[-1],
+ Player.report(world, log))
def draw(self, world, log, cards=[]):
"""Only called if you earned a new card (or cards).
body.append(' %s (owned)' % c)
else:
body.append(' %s' % c)
- self._send_email(world, log, 'Drawing cards', '\n'.join(body))
+ self._send_mail(world, log, 'Drawing cards', '\n'.join(body))
def select_territory(self, world, log):
"""Return the selected territory's name.
"""
for t in world.territories():
if t.player == None:
body.append(' %s' % t)
- self._send_email(world, log, 'Select territory', '\n'.join(body))
- body = self._get_email()
+ tag = self._send_mail(world, log, 'Select territory', '\n'.join(body))
+ body = self._get_mail(tag)
name = body.splitlines()[0].strip()
return name
def play_cards(self, world, log, play_required=True):
'blank to pass). Available sets are:']
for i,h in enumerate(possibles):
body.append(' %d: %s' % (i, h))
- self._send_email(world, log, subject, '\n'.join(body))
- body = self._get_email()
+ tag = self._send_mail(world, log, subject, '\n'.join(body))
+ body = self._get_mail(tag)
text = body.splitlines()[0].strip()
if text == '':
return None
'Your current disposition is:']
for t in self.territories(world):
body.append(' %d : %s' % (t.armies, t))
- self._send_email(world, log, subject, '\n'.join(body))
- body = self._get_email()
+ tag = self._send_mail(world, log, subject, '\n'.join(body))
+ body = self._get_mail(tag)
placements = {}
for line in body.splitlines():
line = line.strip()
' ',
'or',
' Pass']
- self._send_email(world, log, subject, '\n'.join(body))
- body = self._get_email()
+ tag = self._send_mail(world, log, subject, '\n'.join(body))
+ body = self._get_mail(tag)
if mode == 'fortify':
return [self._parse_attack_or_fortify_line(
body.splitlines()[0], mode)]
'Reply with first line(s) of the body of your email set',
'to "<number_of_armies>", or leave the first line blank',
'to pass.']
- self._send_email(world, log, subject, '\n'.join(body))
- body = self._get_email()
+ tag = self._send_mail(world, log, subject, '\n'.join(body))
+ body = self._get_mail(tag)
text = body.splitlines()[0].strip()
if text == '':
return 0
return int(text)
+
+
+def test():
+ import doctest
+ failures,tests = doctest.testmod(sys.modules[__name__])
+ return failures