class IncomingEmailDispatcher (object):
"""For reading reply messages.
"""
- def __init__(self, fifo_path=None):
+ def __init__(self, fifo_path=None, verbose=True):
+ self.verbose = verbose
self._cache = []
if fifo_path == None:
self.dir_path = tempfile.mkdtemp(suffix='.pyrisk')
if self.dir_path != None:
os.rmdir(self.dir_path)
def get(self, tag):
- # FIFO blocks on open until a writer also opens
- self.fifo = open(self.fifo_path, 'r')
for msg_tag, msg in self._cache:
if msg_tag == tag:
self._cache.remove(msg)
self._cache.append((msg_tag, msg))
msg = self._get_msg()
msg_tag = self._msg_tag(msg['Subject'])
- self.fifo.close()
+ if self.verbose == True:
+ print >> sys.stderr, msg
return msg
def _msg_tag(self, subject):
""" Return the tag portion of a message subject.
u'[t]'
>>> ied.close()
"""
+ if subject == None:
+ return None
subject = subject.strip()
if subject.startswith('Re:'):
subject = subject[len('Re:'):]
return None
return args[0]+u']'
def _get_msg(self):
+ # FIFO blocks on open until a writer also opens
+ self.fifo = open(self.fifo_path, 'r')
text = self.fifo.read()
+ self.fifo.close()
p = Parser()
return p.parsestr(text)
msg['Message-id'] = make_msgid()
if self.sendmail != None:
- execute(self.sendmail, stdin=self._flatten(msg))
+ self._execute(self.sendmail, stdin=self._flatten(msg))
return None
s = smtplib.SMTP_SSL(self.smtp_host, self.smtp_port)
s.connect()
def _get_mail(self, tag):
msg = self.incoming.get(tag)
msg_charset = msg.get_content_charset('utf-8')
- first_part = [self.msg.walk()][0]
- body = first_part.get_payload(decode=True)
- charset = first_part.get_content_charset(msg_charset)
- mime_type = first_part.get_content_type()
+ if msg.is_multipart():
+ for part in msg.walk():
+ mime_type = part.get_content_type()
+ if mime_type == 'text/plain':
+ break
+ body = part.get_payload(decode=True)
+ charset = part.get_content_charset(msg_charset)
+ else:
+ body = msg.get_payload(decode=True)
+ charset = msg_charset
+ mime_type = msg.get_content_type()
if not mime_type.startswith('text/plain'):
raise PlayerError('Invalid MIME type %s (must be text/plain)'
% mime_type)
else:
body.append(' %s' % c)
self._send_mail(world, log, 'Drawing cards', '\n'.join(body))
- def select_territory(self, world, log):
+ def __select_territory(self, world, log, error=None):
"""Return the selected territory's name.
"""
body = [
for t in world.territories():
if t.player == None:
body.append(' %s' % t)
+ if error != None:
+ body.insert(0, '')
+ body.insert(0, str(error))
tag = self._send_mail(world, log, 'Select territory', '\n'.join(body))
body = self._get_mail(tag)
name = body.splitlines()[0].strip()
return name
- def play_cards(self, world, log, play_required=True):
+ def play_cards(self, world, log, error=None,
+ play_required=True):
"""Decide whether or not to turn in a set of cards.
Return a list of cards to turn in or None. If play_required
'blank to pass). Available sets are:']
for i,h in enumerate(possibles):
body.append(' %d: %s' % (i, h))
+ if error != None:
+ body.insert(0, '')
+ body.insert(0, str(error))
tag = self._send_mail(world, log, subject, '\n'.join(body))
body = self._get_mail(tag)
text = body.splitlines()[0].strip()
if text == '':
return None
return possibles[int(text)]
- def place_armies(self, world, log, remaining=1, this_round=1):
+ def place_armies(self, world, log, error=None,
+ remaining=1, this_round=1):
"""Both during setup and before each turn.
Return {territory_name: num_armies, ...}
'Your current disposition is:']
for t in self.territories(world):
body.append(' %d : %s' % (t.armies, t))
+ if error != None:
+ body.insert(0, '')
+ body.insert(0, str(error))
tag = self._send_mail(world, log, subject, '\n'.join(body))
body = self._get_mail(tag)
placements = {}
line = line.strip()
if len(line) == 0:
break
+ if line.count(':') != 1:
+ raise PlayerError('Invalid syntax "%s"' % line)
armies,terr_name = [x.strip() for x in line.split(':')]
placements[terr_name] = int(armies)
return placements
- def attack_and_fortify(self, world, log, mode='attack'):
+ def attack_and_fortify(self, world, log, error=None,
+ mode='attack'):
"""Return list of (source, target, armies) tuples. Place None
in the list to end this phase.
"""
' ',
'or',
' Pass']
+ if error != None:
+ body.insert(0, '')
+ body.insert(0, str(error))
tag = self._send_mail(world, log, subject, '\n'.join(body))
body = self._get_mail(tag)
if mode == 'fortify':
elif line.lower() == 'pass' \
or (mode == 'fortify' and len(line) == 0):
return None
- def support_attack(self, world, log, source, target):
+ def support_attack(self, world, log, error,
+ source, target):
"""Follow up on a conquest by moving additional armies.
"""
subject = 'Support conquest of %s by %s' % (target, source)
'Reply with first line(s) of the body of your email set',
'to "<number_of_armies>", or leave the first line blank',
'to pass.']
+ if error != None:
+ body.insert(0, '')
+ body.insert(0, str(error))
tag = self._send_mail(world, log, subject, '\n'.join(body))
body = self._get_mail(tag)
text = body.splitlines()[0].strip()