irkerd: Add Target.__str__ for pretty-printing targets in log messages
[irker.git] / irkerd
1 #!/usr/bin/env python
2 """
3 irkerd - a simple IRC multiplexer daemon
4
5 Listens for JSON objects of the form {'to':<irc-url>, 'privmsg':<text>}
6 and relays messages to IRC channels. Each request must be followed by
7 a newline.
8
9 The <text> must be a string.  The value of the 'to' attribute can be a
10 string containing an IRC URL (e.g. 'irc://chat.freenet.net/botwar') or
11 a list of such strings; in the latter case the message is broadcast to
12 all listed channels.  Note that the channel portion of the URL need
13 *not* have a leading '#' unless the channel name itself does.
14
15 Options: -d sets the debug-message level (probably only of interest to
16 developers). -l sets a logfile to capture message traffic from
17 channels.  -n sets the nick and -p the nickserv password. The -V
18 option prints the program version and exits.
19
20 Design and code by Eric S. Raymond <esr@thyrsus.com>. See the project
21 resource page at <http://www.catb.org/~esr/irker/>.
22
23 Requires Python 2.6 or 2.5 with the simplejson library installed.
24 """
25
26 from __future__ import with_statement
27
28 # These things might need tuning
29
30 HOST = "localhost"
31 PORT = 6659
32
33 XMIT_TTL = (3 * 60 * 60)        # Time to live, seconds from last transmit
34 PING_TTL = (15 * 60)            # Time to live, seconds from last PING
35 HANDSHAKE_TTL = 60              # Time to live, seconds from nick transmit
36 CHANNEL_TTL = (3 * 60 * 60)     # Time to live, seconds from last transmit
37 DISCONNECT_TTL = (24 * 60 * 60) # Time to live, seconds from last connect
38 UNSEEN_TTL = 60                 # Time to live, seconds since first request
39 CHANNEL_MAX = 18                # Max channels open per socket (default)
40 ANTI_FLOOD_DELAY = 1.0          # Anti-flood delay after transmissions, seconds
41 ANTI_BUZZ_DELAY = 0.09          # Anti-buzz delay after queue-empty check
42 CONNECTION_MAX = 200            # To avoid hitting a thread limit
43
44 # No user-serviceable parts below this line
45
46 version = "2.6"
47
48 import Queue
49 import SocketServer
50 import getopt
51 import logging
52 try:
53     import simplejson as json   # Faster, also makes us Python-2.4-compatible
54 except ImportError:
55     import json
56 import random
57 import re
58 import select
59 import signal
60 import socket
61 import sys
62 import threading
63 import time
64 import urlparse
65
66
67 LOG = logging.getLogger(__name__)
68 LOG.setLevel(logging.ERROR)
69 LOG_LEVELS = ['critical', 'error', 'warning', 'info', 'debug']
70
71
72 # Sketch of implementation:
73 #
74 # One Irker object manages multiple IRC sessions.  It holds a map of
75 # Dispatcher objects, one per (server, port) combination, which are
76 # responsible for routing messages to one of any number of Connection
77 # objects that do the actual socket conversations.  The reason for the
78 # Dispatcher layer is that IRC daemons limit the number of channels a
79 # client (that is, from the daemon's point of view, a socket) can be
80 # joined to, so each session to a server needs a flock of Connection
81 # instances each with its own socket.
82 #
83 # Connections are timed out and removed when either they haven't seen a
84 # PING for a while (indicating that the server may be stalled or down)
85 # or there has been no message traffic to them for a while, or
86 # even if the queue is nonempty but efforts to connect have failed for
87 # a long time.
88 #
89 # There are multiple threads. One accepts incoming traffic from all
90 # servers.  Each Connection also has a consumer thread and a
91 # thread-safe message queue.  The program main appends messages to
92 # queues as JSON requests are received; the consumer threads try to
93 # ship them to servers.  When a socket write stalls, it only blocks an
94 # individual consumer thread; if it stalls long enough, the session
95 # will be timed out. This solves the biggest problem with a
96 # single-threaded implementation, which is that you can't count on a
97 # single stalled write not hanging all other traffic - you're at the
98 # mercy of the length of the buffers in the TCP/IP layer.
99 #
100 # Message delivery is thus not reliable in the face of network stalls,
101 # but this was considered acceptable because IRC (notoriously) has the
102 # same problem - there is little point in reliable delivery to a relay
103 # that is down or unreliable.
104 #
105 # This code uses only NICK, JOIN, PART, MODE, PRIVMSG, USER, and QUIT. 
106 # It is strictly compliant to RFC1459, except for the interpretation and
107 # use of the DEAF and CHANLIMIT and (obsolete) MAXCHANNELS features.
108 #
109 # CHANLIMIT is as described in the Internet RFC draft
110 # draft-brocklesby-irc-isupport-03 at <http://www.mirc.com/isupport.html>.
111 # The ",isnick" feature is as described in
112 # <http://ftp.ics.uci.edu/pub/ietf/uri/draft-mirashi-url-irc-01.txt>.
113
114 # Historical note: the IRCClient and IRCServerConnection classes
115 # (~270LOC) replace the overweight, overcomplicated 3KLOC mass of
116 # irclib code that irker formerly used as a service library.  They
117 # still look similar to parts of irclib because I contributed to that
118 # code before giving up on it.
119
120 class IRCError(Exception):
121     "An IRC exception"
122     pass
123
124
125 class InvalidRequest (ValueError):
126     "An invalid JSON request"
127     pass
128
129
130 class IRCClient():
131     "An IRC client session to one or more servers."
132     def __init__(self):
133         self.mutex = threading.RLock()
134         self.server_connections = []
135         self.event_handlers = {}
136         self.add_event_handler("ping",
137                                lambda c, e: c.ship("PONG %s" % e.target))
138
139     def newserver(self):
140         "Initialize a new server-connection object."
141         conn = IRCServerConnection(self)
142         with self.mutex:
143             self.server_connections.append(conn)
144         return conn
145
146     def spin(self, timeout=0.2):
147         "Spin processing data from connections forever."
148         # Outer loop should specifically *not* be mutex-locked.
149         # Otherwise no other thread would ever be able to change
150         # the shared state of an IRC object running this function.
151         while True:
152             nextsleep = 0
153             with self.mutex:
154                 connected = [x for x in self.server_connections
155                              if x is not None and x.socket is not None]
156                 sockets = [x.socket for x in connected]
157                 if sockets:
158                     connmap = dict([(c.socket.fileno(), c) for c in connected])
159                     (insocks, _o, _e) = select.select(sockets, [], [], timeout)
160                     for s in insocks:
161                         connmap[s.fileno()].consume()
162                 else:
163                     nextsleep = timeout
164             time.sleep(nextsleep)
165
166     def add_event_handler(self, event, handler):
167         "Set a handler to be called later."
168         with self.mutex:
169             event_handlers = self.event_handlers.setdefault(event, [])
170             event_handlers.append(handler)
171
172     def handle_event(self, connection, event):
173         with self.mutex:
174             h = self.event_handlers
175             th = sorted(h.get("all_events", []) + h.get(event.type, []))
176             for handler in th:
177                 handler(connection, event)
178
179     def drop_connection(self, connection):
180         with self.mutex:
181             self.server_connections.remove(connection)
182
183
184 class LineBufferedStream():
185     "Line-buffer a read stream."
186     crlf_re = re.compile(b'\r?\n')
187
188     def __init__(self):
189         self.buffer = ''
190
191     def append(self, newbytes):
192         self.buffer += newbytes
193
194     def lines(self):
195         "Iterate over lines in the buffer."
196         lines = LineBufferedStream.crlf_re.split(self.buffer)
197         self.buffer = lines.pop()
198         return iter(lines)
199
200     def __iter__(self):
201         return self.lines()
202
203 class IRCServerConnectionError(IRCError):
204     pass
205
206 class IRCServerConnection():
207     command_re = re.compile("^(:(?P<prefix>[^ ]+) +)?(?P<command>[^ ]+)( *(?P<argument> .+))?")
208     # The full list of numeric-to-event mappings is in Perl's Net::IRC.
209     # We only need to ensure that if some ancient server throws numerics
210     # for the ones we actually want to catch, they're mapped.
211     codemap = {
212         "001": "welcome",
213         "005": "featurelist",
214         "432": "erroneusnickname",
215         "433": "nicknameinuse",
216         "436": "nickcollision",
217         "437": "unavailresource",
218     }
219
220     def __init__(self, master):
221         self.master = master
222         self.socket = None
223
224     def connect(self, server, port, nickname,
225                 password=None, username=None, ircname=None):
226         LOG.debug("connect(server=%r, port=%r, nickname=%r, ...)" % (
227             server, port, nickname))
228         if self.socket is not None:
229             self.disconnect("Changing servers")
230
231         self.buffer = LineBufferedStream()
232         self.event_handlers = {}
233         self.real_server_name = ""
234         self.server = server
235         self.nickname = nickname
236         try:
237             self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
238             self.socket.bind(('', 0))
239             self.socket.connect((server, port))
240         except socket.error as err:
241             raise IRCServerConnectionError("Couldn't connect to socket: %s" % err)
242
243         if password:
244             self.ship("PASS " + password)
245         self.nick(self.nickname)
246         self.user(username=username or ircname, realname=ircname or nickname)
247         return self
248
249     def close(self):
250         # Without this thread lock, there is a window during which
251         # select() can find a closed socket, leading to an EBADF error.
252         with self.master.mutex:
253             self.disconnect("Closing object")
254             self.master.drop_connection(self)
255
256     def consume(self):
257         try:
258             incoming = self.socket.recv(16384)
259         except socket.error:
260             # Server hung up on us.
261             self.disconnect("Connection reset by peer")
262             return
263         if not incoming:
264             # Dead air also indicates a connection reset.
265             self.disconnect("Connection reset by peer")
266             return
267
268         self.buffer.append(incoming)
269
270         for line in self.buffer:
271             LOG.debug("FROM: %s" % line)
272
273             if not line:
274                 continue
275
276             prefix = None
277             command = None
278             arguments = None
279             self.handle_event(Event("every_raw_message",
280                                      self.real_server_name,
281                                      None,
282                                      [line]))
283
284             m = IRCServerConnection.command_re.match(line)
285             if m.group("prefix"):
286                 prefix = m.group("prefix")
287                 if not self.real_server_name:
288                     self.real_server_name = prefix
289             if m.group("command"):
290                 command = m.group("command").lower()
291             if m.group("argument"):
292                 a = m.group("argument").split(" :", 1)
293                 arguments = a[0].split()
294                 if len(a) == 2:
295                     arguments.append(a[1])
296
297             command = IRCServerConnection.codemap.get(command, command)
298             if command in ["privmsg", "notice"]:
299                 target = arguments.pop(0)
300             else:
301                 target = None
302
303                 if command == "quit":
304                     arguments = [arguments[0]]
305                 elif command == "ping":
306                     target = arguments[0]
307                 else:
308                     target = arguments[0]
309                     arguments = arguments[1:]
310
311             LOG.debug("command: %s, source: %s, target: %s, arguments: %s" % (
312                 command, prefix, target, arguments))
313             self.handle_event(Event(command, prefix, target, arguments))
314
315     def handle_event(self, event):
316         self.master.handle_event(self, event)
317         if event.type in self.event_handlers:
318             for fn in self.event_handlers[event.type]:
319                 fn(self, event)
320
321     def is_connected(self):
322         return self.socket is not None
323
324     def disconnect(self, message=""):
325         if self.socket is None:
326             return
327         # Don't send a QUIT here - causes infinite loop!
328         try:
329             self.socket.shutdown(socket.SHUT_WR)
330             self.socket.close()
331         except socket.error:
332             pass
333         del self.socket
334         self.socket = None
335         self.handle_event(Event("disconnect", self.server, "", [message]))
336
337     def join(self, channel, key=""):
338         self.ship("JOIN %s%s" % (channel, (key and (" " + key))))
339
340     def mode(self, target, command):
341         self.ship("MODE %s %s" % (target, command))
342
343     def nick(self, newnick):
344         self.ship("NICK " + newnick)
345
346     def part(self, channel, message=""):
347         cmd_parts = ['PART', channel]
348         if message:
349             cmd_parts.append(message)
350         self.ship(' '.join(cmd_parts))
351
352     def privmsg(self, target, text):
353         self.ship("PRIVMSG %s :%s" % (target, text))
354
355     def quit(self, message=""):
356         self.ship("QUIT" + (message and (" :" + message)))
357
358     def user(self, username, realname):
359         self.ship("USER %s 0 * :%s" % (username, realname))
360
361     def ship(self, string):
362         "Ship a command to the server, appending CR/LF"
363         try:
364             self.socket.send(string.encode('utf-8') + b'\r\n')
365             LOG.debug("TO: %s" % string)
366         except socket.error:
367             self.disconnect("Connection reset by peer.")
368
369 class Event(object):
370     def __init__(self, evtype, source, target, arguments=None):
371         self.type = evtype
372         self.source = source
373         self.target = target
374         if arguments is None:
375             arguments = []
376         self.arguments = arguments
377
378 def is_channel(string):
379     return string and string[0] in "#&+!"
380
381 class Connection:
382     def __init__(self, irkerd, servername, port):
383         self.irker = irkerd
384         self.servername = servername
385         self.port = port
386         self.nick_trial = None
387         self.connection = None
388         self.status = None
389         self.last_xmit = time.time()
390         self.last_ping = time.time()
391         self.channels_joined = {}
392         self.channel_limits = {}
393         # The consumer thread
394         self.queue = Queue.Queue()
395         self.thread = None
396     def nickname(self, n=None):
397         "Return a name for the nth server connection."
398         if n is None:
399             n = self.nick_trial
400         if fallback:
401             return (namestyle % n)
402         else:
403             return namestyle
404     def handle_ping(self):
405         "Register the fact that the server has pinged this connection."
406         self.last_ping = time.time()
407     def handle_welcome(self):
408         "The server says we're OK, with a non-conflicting nick."
409         self.status = "ready"
410         LOG.info("nick %s accepted" % self.nickname())
411         if password:
412             self.connection.privmsg("nickserv", "identify %s" % password)
413     def handle_badnick(self):
414         "The server says our nick is ill-formed or has a conflict."
415         LOG.info("nick %s rejected" % self.nickname())
416         if fallback:
417             # Randomness prevents a malicious user or bot from
418             # anticipating the next trial name in order to block us
419             # from completing the handshake.
420             self.nick_trial += random.randint(1, 3)
421             self.last_xmit = time.time()
422             self.connection.nick(self.nickname())
423         # Otherwise fall through, it might be possible to
424         # recover manually.
425     def handle_disconnect(self):
426         "Server disconnected us for flooding or some other reason."
427         self.connection = None
428         if self.status != "expired":
429             self.status = "disconnected"
430     def handle_kick(self, outof):
431         "We've been kicked."
432         self.status = "handshaking"
433         try:
434             del self.channels_joined[outof]
435         except KeyError:
436             LOG.error("kicked by %s from %s that's not joined" % (
437                 self.servername, outof))
438         qcopy = []
439         while not self.queue.empty():
440             (channel, message, key) = self.queue.get()
441             if channel != outof:
442                 qcopy.append((channel, message, key))
443         for (channel, message, key) in qcopy:
444             self.queue.put((channel, message, key))
445         self.status = "ready"
446     def enqueue(self, channel, message, key, quit_after=False):
447         "Enque a message for transmission."
448         if self.thread is None or not self.thread.is_alive():
449             self.status = "unseen"
450             self.thread = threading.Thread(target=self.dequeue)
451             self.thread.setDaemon(True)
452             self.thread.start()
453         self.queue.put((channel, message, key))
454         if quit_after:
455             self.queue.put((channel, None, key))
456     def dequeue(self):
457         "Try to ship pending messages from the queue."
458         try:
459             while True:
460                 # We want to be kind to the IRC servers and not hold unused
461                 # sockets open forever, so they have a time-to-live.  The
462                 # loop is coded this particular way so that we can drop
463                 # the actual server connection when its time-to-live
464                 # expires, then reconnect and resume transmission if the
465                 # queue fills up again.
466                 if self.queue.empty():
467                     # Queue is empty, at some point we want to time out
468                     # the connection rather than holding a socket open in
469                     # the server forever.
470                     now = time.time()
471                     xmit_timeout = now > self.last_xmit + XMIT_TTL
472                     ping_timeout = now > self.last_ping + PING_TTL
473                     if self.status == "disconnected":
474                         # If the queue is empty, we can drop this connection.
475                         self.status = "expired"
476                         break
477                     elif xmit_timeout or ping_timeout:
478                         LOG.info((
479                             "timing out connection to %s at %s "
480                             "(ping_timeout=%s, xmit_timeout=%s)") % (
481                             self.servername, time.asctime(), ping_timeout,
482                             xmit_timeout))
483                         with self.irker.irc.mutex:
484                             self.connection.context = None
485                             self.connection.quit("transmission timeout")
486                             self.connection = None
487                         self.status = "disconnected"
488                     else:
489                         # Prevent this thread from hogging the CPU by pausing
490                         # for just a little bit after the queue-empty check.
491                         # As long as this is less that the duration of a human
492                         # reflex arc it is highly unlikely any human will ever
493                         # notice.
494                         time.sleep(ANTI_BUZZ_DELAY)
495                 elif self.status == "disconnected" \
496                          and time.time() > self.last_xmit + DISCONNECT_TTL:
497                     # Queue is nonempty, but the IRC server might be
498                     # down. Letting failed connections retain queue
499                     # space forever would be a memory leak.
500                     self.status = "expired"
501                     break
502                 elif not self.connection and self.status != "expired":
503                     # Queue is nonempty but server isn't connected.
504                     with self.irker.irc.mutex:
505                         self.connection = self.irker.irc.newserver()
506                         self.connection.context = self
507                         # Try to avoid colliding with other instances
508                         self.nick_trial = random.randint(1, 990)
509                         self.channels_joined = {}
510                         try:
511                             # This will throw
512                             # IRCServerConnectionError on failure
513                             self.connection.connect(self.servername,
514                                                 self.port,
515                                                 nickname=self.nickname(),
516                                                 username="irker",
517                                                 ircname="irker relaying client")
518                             self.status = "handshaking"
519                             LOG.info("XMIT_TTL bump (%s connection) at %s" % (
520                                 self.servername, time.asctime()))
521                             self.last_xmit = time.time()
522                             self.last_ping = time.time()
523                         except IRCServerConnectionError:
524                             self.status = "expired"
525                             break
526                 elif self.status == "handshaking":
527                     if time.time() > self.last_xmit + HANDSHAKE_TTL:
528                         self.status = "expired"
529                         break
530                     else:
531                         # Don't buzz on the empty-queue test while we're
532                         # handshaking
533                         time.sleep(ANTI_BUZZ_DELAY)
534                 elif self.status == "unseen" \
535                          and time.time() > self.last_xmit + UNSEEN_TTL:
536                     # Nasty people could attempt a denial-of-service
537                     # attack by flooding us with requests with invalid
538                     # servernames. We guard against this by rapidly
539                     # expiring connections that have a nonempty queue but
540                     # have never had a successful open.
541                     self.status = "expired"
542                     break
543                 elif self.status == "ready":
544                     (channel, message, key) = self.queue.get()
545                     if channel not in self.channels_joined:
546                         self.connection.join(channel, key=key)
547                         LOG.info("joining %s on %s." % (
548                             channel, self.servername))
549                     # None is magic - it's a request to quit the server
550                     if message is None:
551                         self.connection.quit()
552                     # An empty message might be used as a keepalive or
553                     # to join a channel for logging, so suppress the
554                     # privmsg send unless there is actual traffic.
555                     elif message:
556                         for segment in message.split("\n"):
557                             # Truncate the message if it's too long,
558                             # but we're working with characters here,
559                             # not bytes, so we could be off.
560                             # 500 = 512 - CRLF - 'PRIVMSG ' - ' :'
561                             maxlength = 500 - len(channel)
562                             if len(segment) > maxlength:
563                                 segment = segment[:maxlength]
564                             try:
565                                 self.connection.privmsg(channel, segment)
566                             except ValueError as err:
567                                 LOG.warning((
568                                     "irclib rejected a message to %s on %s "
569                                     "because: %s") % (
570                                     channel, self.servername, str(err)))
571                                 LOG.debug(err.format_exc())
572                             time.sleep(ANTI_FLOOD_DELAY)
573                     self.last_xmit = self.channels_joined[channel] = time.time()
574                     LOG.info("XMIT_TTL bump (%s transmission) at %s" % (
575                         self.servername, time.asctime()))
576                     self.queue.task_done()
577                 elif self.status == "expired":
578                     print "We're expired but still running! This is a bug."
579                     break
580         except Exception, e:
581             LOG.error("exception %s in thread for %s" % (
582                 e, self.servername))
583             # Maybe this should have its own status?
584             self.status = "expired"
585             LOG.debug(e.format_exc())
586         finally:
587             try:
588                 # Make sure we don't leave any zombies behind
589                 self.connection.close()
590             except:
591                 # Irclib has a habit of throwing fresh exceptions here. Ignore that
592                 pass
593     def live(self):
594         "Should this connection not be scavenged?"
595         return self.status != "expired"
596     def joined_to(self, channel):
597         "Is this connection joined to the specified channel?"
598         return channel in self.channels_joined
599     def accepting(self, channel):
600         "Can this connection accept a join of this channel?"
601         if self.channel_limits:
602             match_count = 0
603             for already in self.channels_joined:
604                 # This obscure code is because the RFCs allow separate limits
605                 # by channel type (indicated by the first character of the name)
606                 # a feature that is almost never actually used.
607                 if already[0] == channel[0]:
608                     match_count += 1
609             return match_count < self.channel_limits.get(channel[0], CHANNEL_MAX)
610         else:
611             return len(self.channels_joined) < CHANNEL_MAX
612
613 class Target():
614     "Represent a transmission target."
615     def __init__(self, url):
616         self.url = url
617         # Pre-2.6 Pythons don't recognize irc: as a valid URL prefix.
618         url = url.replace("irc://", "http://")
619         parsed = urlparse.urlparse(url)
620         irchost, _, ircport = parsed.netloc.partition(':')
621         if not ircport:
622             ircport = 6667
623         self.servername = irchost
624         # IRC channel names are case-insensitive.  If we don't smash
625         # case here we may run into problems later. There was a bug
626         # observed on irc.rizon.net where an irkerd user specified #Channel,
627         # got kicked, and irkerd crashed because the server returned
628         # "#channel" in the notification that our kick handler saw.
629         self.channel = parsed.path.lstrip('/').lower()
630         # This deals with a tweak in recent versions of urlparse.
631         if parsed.fragment:
632             self.channel += "#" + parsed.fragment
633         isnick = self.channel.endswith(",isnick")
634         if isnick:
635             self.channel = self.channel[:-7]
636         if self.channel and not isnick and self.channel[0] not in "#&+":
637             self.channel = "#" + self.channel
638         # support both channel?secret and channel?key=secret
639         self.key = ""
640         if parsed.query:
641             self.key = re.sub("^key=", "", parsed.query)
642         self.port = int(ircport)
643
644     def __str__(self):
645         "Represent this instance as a string"
646         return self.servername or self.url or repr(self)
647
648     def validate(self):
649         "Raise InvalidRequest if the URL is missing a critical component"
650         if not self.servername:
651             raise InvalidRequest(
652                 'target URL missing a servername: %r' % self.url)
653         if not self.channel:
654             raise InvalidRequest(
655                 'target URL missing a channel: %r' % self.url)
656     def server(self):
657         "Return a hashable tuple representing the destination server."
658         return (self.servername, self.port)
659
660 class Dispatcher:
661     "Manage connections to a particular server-port combination."
662     def __init__(self, irkerd, servername, port):
663         self.irker = irkerd
664         self.servername = servername
665         self.port = port
666         self.connections = []
667     def dispatch(self, channel, message, key, quit_after=False):
668         "Dispatch messages for our server-port combination."
669         # First, check if there is room for another channel
670         # on any of our existing connections.
671         connections = [x for x in self.connections if x.live()]
672         eligibles = [x for x in connections if x.joined_to(channel)] \
673                     or [x for x in connections if x.accepting(channel)]
674         if eligibles:
675             eligibles[0].enqueue(channel, message, key, quit_after)
676             return
677         # All connections are full up. Look for one old enough to be
678         # scavenged.
679         ancients = []
680         for connection in connections:
681             for (chan, age) in connections.channels_joined.items():
682                 if age < time.time() - CHANNEL_TTL:
683                     ancients.append((connection, chan, age))
684         if ancients:
685             ancients.sort(key=lambda x: x[2]) 
686             (found_connection, drop_channel, _drop_age) = ancients[0]
687             found_connection.part(drop_channel, "scavenged by irkerd")
688             del found_connection.channels_joined[drop_channel]
689             #time.sleep(ANTI_FLOOD_DELAY)
690             found_connection.enqueue(channel, message, key, quit_after)
691             return
692         # Didn't find any channels with no recent activity
693         newconn = Connection(self.irker,
694                              self.servername,
695                              self.port)
696         self.connections.append(newconn)
697         newconn.enqueue(channel, message, key, quit_after)
698     def live(self):
699         "Does this server-port combination have any live connections?"
700         self.connections = [x for x in self.connections if x.live()]
701         return len(self.connections) > 0
702     def pending(self):
703         "Return all connections with pending traffic."
704         return [x for x in self.connections if not x.queue.empty()]
705     def last_xmit(self):
706         "Return the time of the most recent transmission."
707         return max(x.last_xmit for x in self.connections)
708
709 class Irker:
710     "Persistent IRC multiplexer."
711     def __init__(self)
712         self.irc = IRCClient()
713         self.irc.add_event_handler("ping", self._handle_ping)
714         self.irc.add_event_handler("welcome", self._handle_welcome)
715         self.irc.add_event_handler("erroneusnickname", self._handle_badnick)
716         self.irc.add_event_handler("nicknameinuse", self._handle_badnick)
717         self.irc.add_event_handler("nickcollision", self._handle_badnick)
718         self.irc.add_event_handler("unavailresource", self._handle_badnick)
719         self.irc.add_event_handler("featurelist", self._handle_features)
720         self.irc.add_event_handler("disconnect", self._handle_disconnect)
721         self.irc.add_event_handler("kick", self._handle_kick)
722         self.irc.add_event_handler("every_raw_message", self._handle_every_raw_message)
723         self.servers = {}
724     def thread_launch(self):
725         thread = threading.Thread(target=self.irc.spin)
726         thread.setDaemon(True)
727         self.irc._thread = thread
728         thread.start()
729     def _handle_ping(self, connection, _event):
730         "PING arrived, bump the last-received time for the connection."
731         if connection.context:
732             connection.context.handle_ping()
733     def _handle_welcome(self, connection, _event):
734         "Welcome arrived, nick accepted for this connection."
735         if connection.context:
736             connection.context.handle_welcome()
737     def _handle_badnick(self, connection, _event):
738         "Nick not accepted for this connection."
739         if connection.context:
740             connection.context.handle_badnick()
741     def _handle_features(self, connection, event):
742         "Determine if and how we can set deaf mode."
743         if connection.context:
744             cxt = connection.context
745             arguments = event.arguments
746             for lump in arguments:
747                 if lump.startswith("DEAF="):
748                     if not logfile:
749                         connection.mode(cxt.nickname(), "+"+lump[5:])
750                 elif lump.startswith("MAXCHANNELS="):
751                     m = int(lump[12:])
752                     for pref in "#&+":
753                         cxt.channel_limits[pref] = m
754                     LOG.info("%s maxchannels is %d" % (connection.server, m))
755                 elif lump.startswith("CHANLIMIT=#:"):
756                     limits = lump[10:].split(",")
757                     try:
758                         for token in limits:
759                             (prefixes, limit) = token.split(":")
760                             limit = int(limit)
761                             for c in prefixes:
762                                 cxt.channel_limits[c] = limit
763                         LOG.info("%s channel limit map is %s" % (
764                             connection.server, cxt.channel_limits))
765                     except ValueError:
766                         LOG.error("ill-formed CHANLIMIT property")
767     def _handle_disconnect(self, connection, _event):
768         "Server hung up the connection."
769         LOG.info("server %s disconnected" % connection.server)
770         connection.close()
771         if connection.context:
772             connection.context.handle_disconnect()
773     def _handle_kick(self, connection, event):
774         "Server hung up the connection."
775         target = event.target
776         LOG.info("irker has been kicked from %s on %s" % (
777             target, connection.server))
778         if connection.context:
779             connection.context.handle_kick(target)
780     def _handle_every_raw_message(self, _connection, event):
781         "Log all messages when in watcher mode."
782         if logfile:
783             with open(logfile, "a") as logfp:
784                 logfp.write("%03f|%s|%s\n" % \
785                              (time.time(), event.source, event.arguments[0]))
786     def pending(self):
787         "Do we have any pending message traffic?"
788         return [k for (k, v) in self.servers.items() if v.pending()]
789
790     def _parse_request(self, line):
791         "Request-parsing helper for the handle() method"
792         request = json.loads(line.strip())
793         if not isinstance(request, dict):
794             raise InvalidRequest(
795                 "request is not a JSON dictionary: %r" % request)
796         if "to" not in request or "privmsg" not in request:
797             raise InvalidRequest(
798                 "malformed request - 'to' or 'privmsg' missing: %r" % request)
799         channels = request['to']
800         message = request['privmsg']
801         if not isinstance(channels, (list, basestring)):
802             raise InvalidRequest(
803                 "malformed request - unexpected channel type: %r" % channels)
804         if not isinstance(message, basestring):
805             raise InvalidRequest(
806                 "malformed request - unexpected message type: %r" % message)
807         if not isinstance(channels, list):
808             channels = [channels]
809         targets = []
810         for url in channels:
811             try:
812                 if not isinstance(url, basestring):
813                     raise InvalidRequest(
814                         "malformed request - URL has unexpected type: %r" %
815                         url)
816                 target = Target(url)
817                 target.validate()
818             except InvalidRequest, e:
819                 LOG.error(str(e))
820             else:
821                 targets.append(target)
822         return (targets, message)
823
824     def handle(self, line, quit_after=False):
825         "Perform a JSON relay request."
826         try:
827             targets, message = self._parse_request(line=line)
828             for target in targets:
829                 if target.server() not in self.servers:
830                     self.servers[target.server()] = Dispatcher(
831                         self, target.servername, target.port)
832                 self.servers[target.server()].dispatch(
833                     target.channel, message, target.key, quit_after=quit_after)
834                 # GC dispatchers with no active connections
835                 servernames = self.servers.keys()
836                 for servername in servernames:
837                     if not self.servers[servername].live():
838                         del self.servers[servername]
839                     # If we might be pushing a resource limit even
840                     # after garbage collection, remove a session.  The
841                     # goal here is to head off DoS attacks that aim at
842                     # exhausting thread space or file descriptors.
843                     # The cost is that attempts to DoS this service
844                     # will cause lots of join/leave spam as we
845                     # scavenge old channels after connecting to new
846                     # ones. The particular method used for selecting a
847                     # session to be terminated doesn't matter much; we
848                     # choose the one longest idle on the assumption
849                     # that message activity is likely to be clumpy.
850                     if len(self.servers) >= CONNECTION_MAX:
851                         oldest = min(
852                             self.servers.keys(),
853                             key=lambda name: self.servers[name].last_xmit())
854                         del self.servers[oldest]
855         except InvalidRequest, e:
856             LOG.error(str(e))
857         except ValueError:
858             self.logerr("can't recognize JSON on input: %r" % line)
859         except RuntimeError:
860             self.logerr("wildly malformed JSON blew the parser stack.")
861
862 class IrkerTCPHandler(SocketServer.StreamRequestHandler):
863     def handle(self):
864         while True:
865             line = self.rfile.readline()
866             if not line:
867                 break
868             irker.handle(line.strip())
869
870 class IrkerUDPHandler(SocketServer.BaseRequestHandler):
871     def handle(self):
872         data = self.request[0].strip()
873         #socket = self.request[1]
874         irker.handle(data)
875
876 def usage():
877     sys.stdout.write("""
878 Usage:
879   irkerd [-d debuglevel] [-l logfile] [-n nick] [-p password] [-i channel message] [-V] [-h]
880
881 Options
882   -d    set debug level
883   -l    set logfile
884   -n    set nick-style
885   -p    set nickserv password
886   -i    immediate mode
887   -V    return irkerd version
888   -h    print this help dialog
889 """)
890
891 if __name__ == '__main__':
892     log_level = None
893     immediate = None
894     namestyle = "irker%03d"
895     password = None
896     logfile = None
897     try:
898         (options, arguments) = getopt.getopt(sys.argv[1:], "d:i:l:n:p:Vh")
899     except getopt.GetoptError as e:
900         sys.stderr.write("%s" % e)
901         usage()
902         sys.exit(1)
903     for (opt, val) in options:
904         if opt == '-d': # Enable debug/progress messages
905             if val.lower() not in LOG_LEVELS:
906                 sys.stderr.write('invalid log level %r (choices: %s)\n' % (
907                     val, ', '.join(LOG_LEVELS)))
908                 sys.exit(1)
909             log_level = getattr(logging, val.upper())
910         elif opt == '-i':       # Immediate mode - send one message, then exit. 
911             immediate = val
912         elif opt == '-l':       # Logfile mode - report traffic read in
913             logfile = val
914         elif opt == '-n':       # Force the nick
915             namestyle = val
916         elif opt == '-p':       # Set a nickserv password
917             password = val
918         elif opt == '-V':       # Emit version and exit
919             sys.stdout.write("irkerd version %s\n" % version)
920             sys.exit(0)
921         elif opt == '-h':
922             usage()
923             sys.exit(0)
924
925     handler = logging.StreamHandler()
926     LOG.addHandler(handler)
927     if log_level:
928         LOG.setLevel(log_level)
929
930     fallback = re.search("%.*d", namestyle)
931     irker = Irker()
932     LOG.info("irkerd version %s" % version)
933     if immediate:
934         irker.irc.add_event_handler("quit", lambda _c, _e: sys.exit(0))
935         irker.handle('{"to":"%s","privmsg":"%s"}' % (immediate, arguments[0]), quit_after=True)
936         irker.irc.spin()
937     else:
938         irker.thread_launch()
939         try:
940             tcpserver = SocketServer.TCPServer((HOST, PORT), IrkerTCPHandler)
941             udpserver = SocketServer.UDPServer((HOST, PORT), IrkerUDPHandler)
942             for server in [tcpserver, udpserver]:
943                 server = threading.Thread(target=server.serve_forever)
944                 server.setDaemon(True)
945                 server.start()
946             try:
947                 signal.pause()
948             except KeyboardInterrupt:
949                 raise SystemExit(1)
950         except socket.error, e:
951             sys.stderr.write("irkerd: server launch failed: %r\n" % e)
952
953 # end