from cStringIO import StringIO
from email.generator import Generator
from email.parser import Parser
-from email.mime.text import MIMEText
+from email.mime.image import MIMEImage
from email.mime.multipart import MIMEMultipart
+from email.mime.text import MIMEText
from email.utils import getaddresses, formataddr, formatdate, make_msgid
import os
import smtplib
class IncomingEmailDispatcher (object):
"""For reading reply messages.
"""
- def __init__(self, fifo_path=None):
+ def __init__(self, fifo_path=None, verbose=True):
+ self.verbose = verbose
self._cache = []
if fifo_path == None:
self.dir_path = tempfile.mkdtemp(suffix='.pyrisk')
if self.dir_path != None:
os.rmdir(self.dir_path)
def get(self, tag):
- # FIFO blocks on open until a writer also opens
- self.fifo = open(self.fifo_path, 'r')
for msg_tag, msg in self._cache:
if msg_tag == tag:
self._cache.remove(msg)
self._cache.append((msg_tag, msg))
msg = self._get_msg()
msg_tag = self._msg_tag(msg['Subject'])
- self.fifo.close()
+ if self.verbose == True:
+ print >> sys.stderr, msg
return msg
def _msg_tag(self, subject):
""" Return the tag portion of a message subject.
u'[t]'
>>> ied.close()
"""
+ if subject == None:
+ return None
subject = subject.strip()
if subject.startswith('Re:'):
subject = subject[len('Re:'):]
return None
return args[0]+u']'
def _get_msg(self):
+ # FIFO blocks on open until a writer also opens
+ self.fifo = open(self.fifo_path, 'r')
text = self.fifo.read()
+ self.fifo.close()
p = Parser()
return p.parsestr(text)
"""For sending outgoing messages.
"""
def __init__(self, return_address, return_name='PyRisk server',
- sendmail=None, verbose_excute=False,
+ sendmail=None, verbose_execute=False,
smtp_host=None, smtp_port=465,
smtp_user=None, smtp_password=None):
self.return_address = return_address
msg['Message-id'] = make_msgid()
if self.sendmail != None:
- execute(self.sendmail, stdin=self._flatten(msg))
+ self._execute(self.sendmail, stdin=self._flatten(msg))
return None
s = smtplib.SMTP_SSL(self.smtp_host, self.smtp_port)
s.connect()
TODO: details on procmail setup
"""
- def __init__(self, name, address, incoming, outgoing):
+ def __init__(self, name, address, incoming, outgoing, world_renderer=None):
Player.__init__(self, name)
self.address = address
self.outgoing = outgoing
self.incoming = incoming
+ self.world_renderer = world_renderer
def _tag(self):
return '[PyRisk %d]' % (id(self))
def _send_mail(self, world, log, subject, body):
msg.attach(encodedMIMEText(body, filename='body'))
msg.attach(self._log_part(log))
msg.attach(self._world_part(world))
+ if self.world_renderer != None:
+ msg.attach(self._rendered_world_part(world, log))
msg['To'] = formataddr((self.name, self.address))
tag = self._tag()
msg['Subject'] = '%s %s' % (tag, subject)
def _get_mail(self, tag):
msg = self.incoming.get(tag)
msg_charset = msg.get_content_charset('utf-8')
- first_part = [self.msg.walk()][0]
- body = first_part.get_payload(decode=True)
- charset = first_part.get_content_charset(msg_charset)
- mime_type = first_part.get_content_type()
+ if msg.is_multipart():
+ for part in msg.walk():
+ mime_type = part.get_content_type()
+ if mime_type == 'text/plain':
+ break
+ body = part.get_payload(decode=True)
+ charset = part.get_content_charset(msg_charset)
+ else:
+ body = msg.get_payload(decode=True)
+ charset = msg_charset
+ mime_type = msg.get_content_type()
if not mime_type.startswith('text/plain'):
raise PlayerError('Invalid MIME type %s (must be text/plain)'
% mime_type)
else:
body.append(' %s\t%s\t%d' % (terr, terr.player, terr.armies))
return encodedMIMEText('\n'.join(body), filename='world')
+ def _rendered_world_part(self, world, log):
+ start_event = log[0]
+ players = start_event.players
+ body = self.world_renderer.render(world, players)
+ filename,subtype = \
+ self.world_renderer.filename_and_mime_image_type(world)
+ part = MIMEImage(body, subtype)
+ part.add_header('Content-Disposition', 'attachment', filename=filename)
+ return part
def _log_part(self, log):
- return encodedMIMEText('\n'.join(log), filename='log')
+ return encodedMIMEText('\n'.join([str(e) for e in log]),
+ filename='log')
def report(self, world, log):
"""Send reports about death and game endings.
else:
body.append(' %s' % c)
self._send_mail(world, log, 'Drawing cards', '\n'.join(body))
- def select_territory(self, world, log):
+ def __select_territory(self, world, log, error=None):
"""Return the selected territory's name.
"""
body = [
for t in world.territories():
if t.player == None:
body.append(' %s' % t)
+ if error != None:
+ body.insert(0, '')
+ body.insert(0, str(error))
tag = self._send_mail(world, log, 'Select territory', '\n'.join(body))
body = self._get_mail(tag)
name = body.splitlines()[0].strip()
return name
- def play_cards(self, world, log, play_required=True):
+ def play_cards(self, world, log, error=None,
+ play_required=True):
"""Decide whether or not to turn in a set of cards.
Return a list of cards to turn in or None. If play_required
'blank to pass). Available sets are:']
for i,h in enumerate(possibles):
body.append(' %d: %s' % (i, h))
+ if error != None:
+ body.insert(0, '')
+ body.insert(0, str(error))
tag = self._send_mail(world, log, subject, '\n'.join(body))
body = self._get_mail(tag)
text = body.splitlines()[0].strip()
if text == '':
return None
return possibles[int(text)]
- def place_armies(self, world, log, remaining=1, this_round=1):
+ def place_armies(self, world, log, error=None,
+ remaining=1, this_round=1):
"""Both during setup and before each turn.
Return {territory_name: num_armies, ...}
'Your current disposition is:']
for t in self.territories(world):
body.append(' %d : %s' % (t.armies, t))
+ if error != None:
+ body.insert(0, '')
+ body.insert(0, str(error))
tag = self._send_mail(world, log, subject, '\n'.join(body))
body = self._get_mail(tag)
placements = {}
line = line.strip()
if len(line) == 0:
break
+ if line.count(':') != 1:
+ raise PlayerError('Invalid syntax "%s"' % line)
armies,terr_name = [x.strip() for x in line.split(':')]
placements[terr_name] = int(armies)
return placements
- def attack_and_fortify(self, world, log, mode='attack'):
+ def attack_and_fortify(self, world, log, error=None,
+ mode='attack'):
"""Return list of (source, target, armies) tuples. Place None
in the list to end this phase.
"""
' ',
'or',
' Pass']
+ if error != None:
+ body.insert(0, '')
+ body.insert(0, str(error))
tag = self._send_mail(world, log, subject, '\n'.join(body))
body = self._get_mail(tag)
if mode == 'fortify':
elif line.lower() == 'pass' \
or (mode == 'fortify' and len(line) == 0):
return None
- def support_attack(self, world, log, source, target):
+ def support_attack(self, world, log, error,
+ source, target):
"""Follow up on a conquest by moving additional armies.
"""
subject = 'Support conquest of %s by %s' % (target, source)
'Reply with first line(s) of the body of your email set',
'to "<number_of_armies>", or leave the first line blank',
'to pass.']
+ if error != None:
+ body.insert(0, '')
+ body.insert(0, str(error))
tag = self._send_mail(world, log, subject, '\n'.join(body))
body = self._get_mail(tag)
text = body.splitlines()[0].strip()