824dae6fdc70ace1b659b50e6658d54db82bfcb9
[irker.git] / irkerd
1 #!/usr/bin/env python
2 """
3 irkerd - a simple IRC multiplexer daemon
4
5 Listens for JSON objects of the form {'to':<irc-url>, 'privmsg':<text>}
6 and relays messages to IRC channels. Each request must be followed by
7 a newline.
8
9 The <text> must be a string.  The value of the 'to' attribute can be a
10 string containing an IRC URL (e.g. 'irc://chat.freenet.net/botwar') or
11 a list of such strings; in the latter case the message is broadcast to
12 all listed channels.  Note that the channel portion of the URL need
13 *not* have a leading '#' unless the channel name itself does.
14
15 Options: -d sets the debug-message level (probably only of interest to
16 developers). -l sets a logfile to capture message traffic from
17 channels.  -n sets the nick and -p the nickserv password. The -V
18 option prints the program version and exits.
19
20 Design and code by Eric S. Raymond <esr@thyrsus.com>. See the project
21 resource page at <http://www.catb.org/~esr/irker/>.
22
23 Requires Python 2.6 or 2.5 with the simplejson library installed.
24 """
25
26 from __future__ import with_statement
27
28 # These things might need tuning
29
30 HOST = "localhost"
31 PORT = 6659
32
33 XMIT_TTL = (3 * 60 * 60)        # Time to live, seconds from last transmit
34 PING_TTL = (15 * 60)            # Time to live, seconds from last PING
35 HANDSHAKE_TTL = 60              # Time to live, seconds from nick transmit
36 CHANNEL_TTL = (3 * 60 * 60)     # Time to live, seconds from last transmit
37 DISCONNECT_TTL = (24 * 60 * 60) # Time to live, seconds from last connect
38 UNSEEN_TTL = 60                 # Time to live, seconds since first request
39 CHANNEL_MAX = 18                # Max channels open per socket (default)
40 ANTI_FLOOD_DELAY = 1.0          # Anti-flood delay after transmissions, seconds
41 ANTI_BUZZ_DELAY = 0.09          # Anti-buzz delay after queue-empty check
42 CONNECTION_MAX = 200            # To avoid hitting a thread limit
43
44 # No user-serviceable parts below this line
45
46 version = "2.6"
47
48 import Queue
49 import SocketServer
50 import getopt
51 import logging
52 try:
53     import simplejson as json   # Faster, also makes us Python-2.4-compatible
54 except ImportError:
55     import json
56 import random
57 import re
58 import select
59 import signal
60 import socket
61 import sys
62 import threading
63 import time
64 import urlparse
65
66
67 LOG = logging.getLogger(__name__)
68 LOG.setLevel(logging.ERROR)
69 LOG_LEVELS = ['critical', 'error', 'warning', 'info', 'debug']
70
71
72 # Sketch of implementation:
73 #
74 # One Irker object manages multiple IRC sessions.  It holds a map of
75 # Dispatcher objects, one per (server, port) combination, which are
76 # responsible for routing messages to one of any number of Connection
77 # objects that do the actual socket conversations.  The reason for the
78 # Dispatcher layer is that IRC daemons limit the number of channels a
79 # client (that is, from the daemon's point of view, a socket) can be
80 # joined to, so each session to a server needs a flock of Connection
81 # instances each with its own socket.
82 #
83 # Connections are timed out and removed when either they haven't seen a
84 # PING for a while (indicating that the server may be stalled or down)
85 # or there has been no message traffic to them for a while, or
86 # even if the queue is nonempty but efforts to connect have failed for
87 # a long time.
88 #
89 # There are multiple threads. One accepts incoming traffic from all
90 # servers.  Each Connection also has a consumer thread and a
91 # thread-safe message queue.  The program main appends messages to
92 # queues as JSON requests are received; the consumer threads try to
93 # ship them to servers.  When a socket write stalls, it only blocks an
94 # individual consumer thread; if it stalls long enough, the session
95 # will be timed out. This solves the biggest problem with a
96 # single-threaded implementation, which is that you can't count on a
97 # single stalled write not hanging all other traffic - you're at the
98 # mercy of the length of the buffers in the TCP/IP layer.
99 #
100 # Message delivery is thus not reliable in the face of network stalls,
101 # but this was considered acceptable because IRC (notoriously) has the
102 # same problem - there is little point in reliable delivery to a relay
103 # that is down or unreliable.
104 #
105 # This code uses only NICK, JOIN, PART, MODE, PRIVMSG, USER, and QUIT. 
106 # It is strictly compliant to RFC1459, except for the interpretation and
107 # use of the DEAF and CHANLIMIT and (obsolete) MAXCHANNELS features.
108 #
109 # CHANLIMIT is as described in the Internet RFC draft
110 # draft-brocklesby-irc-isupport-03 at <http://www.mirc.com/isupport.html>.
111 # The ",isnick" feature is as described in
112 # <http://ftp.ics.uci.edu/pub/ietf/uri/draft-mirashi-url-irc-01.txt>.
113
114 # Historical note: the IRCClient and IRCServerConnection classes
115 # (~270LOC) replace the overweight, overcomplicated 3KLOC mass of
116 # irclib code that irker formerly used as a service library.  They
117 # still look similar to parts of irclib because I contributed to that
118 # code before giving up on it.
119
120 class IRCError(Exception):
121     "An IRC exception"
122     pass
123
124
125 class InvalidRequest (ValueError):
126     "An invalid JSON request"
127     pass
128
129
130 class IRCClient():
131     "An IRC client session to one or more servers."
132     def __init__(self):
133         self.mutex = threading.RLock()
134         self.server_connections = []
135         self.event_handlers = {}
136         self.add_event_handler("ping",
137                                lambda c, e: c.ship("PONG %s" % e.target))
138
139     def newserver(self):
140         "Initialize a new server-connection object."
141         conn = IRCServerConnection(self)
142         with self.mutex:
143             self.server_connections.append(conn)
144         return conn
145
146     def spin(self, timeout=0.2):
147         "Spin processing data from connections forever."
148         # Outer loop should specifically *not* be mutex-locked.
149         # Otherwise no other thread would ever be able to change
150         # the shared state of an IRC object running this function.
151         while True:
152             nextsleep = 0
153             with self.mutex:
154                 connected = [x for x in self.server_connections
155                              if x is not None and x.socket is not None]
156                 sockets = [x.socket for x in connected]
157                 if sockets:
158                     connmap = dict([(c.socket.fileno(), c) for c in connected])
159                     (insocks, _o, _e) = select.select(sockets, [], [], timeout)
160                     for s in insocks:
161                         connmap[s.fileno()].consume()
162                 else:
163                     nextsleep = timeout
164             time.sleep(nextsleep)
165
166     def add_event_handler(self, event, handler):
167         "Set a handler to be called later."
168         with self.mutex:
169             event_handlers = self.event_handlers.setdefault(event, [])
170             event_handlers.append(handler)
171
172     def handle_event(self, connection, event):
173         with self.mutex:
174             h = self.event_handlers
175             th = sorted(h.get("all_events", []) + h.get(event.type, []))
176             for handler in th:
177                 handler(connection, event)
178
179     def drop_connection(self, connection):
180         with self.mutex:
181             self.server_connections.remove(connection)
182
183
184 class LineBufferedStream():
185     "Line-buffer a read stream."
186     crlf_re = re.compile(b'\r?\n')
187
188     def __init__(self):
189         self.buffer = ''
190
191     def append(self, newbytes):
192         self.buffer += newbytes
193
194     def lines(self):
195         "Iterate over lines in the buffer."
196         lines = LineBufferedStream.crlf_re.split(self.buffer)
197         self.buffer = lines.pop()
198         return iter(lines)
199
200     def __iter__(self):
201         return self.lines()
202
203 class IRCServerConnectionError(IRCError):
204     pass
205
206 class IRCServerConnection():
207     command_re = re.compile("^(:(?P<prefix>[^ ]+) +)?(?P<command>[^ ]+)( *(?P<argument> .+))?")
208     # The full list of numeric-to-event mappings is in Perl's Net::IRC.
209     # We only need to ensure that if some ancient server throws numerics
210     # for the ones we actually want to catch, they're mapped.
211     codemap = {
212         "001": "welcome",
213         "005": "featurelist",
214         "432": "erroneusnickname",
215         "433": "nicknameinuse",
216         "436": "nickcollision",
217         "437": "unavailresource",
218     }
219
220     def __init__(self, master):
221         self.master = master
222         self.socket = None
223
224     def connect(self, target, nickname,
225                 password=None, username=None, ircname=None):
226         LOG.debug("connect(server=%r, port=%r, nickname=%r, ...)" % (
227             target.servername, target.port, nickname))
228         if self.socket is not None:
229             self.disconnect("Changing servers")
230
231         self.buffer = LineBufferedStream()
232         self.event_handlers = {}
233         self.real_server_name = ""
234         self.target = target
235         self.nickname = nickname
236         try:
237             self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
238             self.socket.bind(('', 0))
239             self.socket.connect((target.servername, target.port))
240         except socket.error as err:
241             raise IRCServerConnectionError("Couldn't connect to socket: %s" % err)
242
243         if password:
244             self.ship("PASS " + password)
245         self.nick(self.nickname)
246         self.user(username=username or ircname, realname=ircname or nickname)
247         return self
248
249     def close(self):
250         # Without this thread lock, there is a window during which
251         # select() can find a closed socket, leading to an EBADF error.
252         with self.master.mutex:
253             self.disconnect("Closing object")
254             self.master.drop_connection(self)
255
256     def consume(self):
257         try:
258             incoming = self.socket.recv(16384)
259         except socket.error:
260             # Server hung up on us.
261             self.disconnect("Connection reset by peer")
262             return
263         if not incoming:
264             # Dead air also indicates a connection reset.
265             self.disconnect("Connection reset by peer")
266             return
267
268         self.buffer.append(incoming)
269
270         for line in self.buffer:
271             LOG.debug("FROM: %s" % line)
272
273             if not line:
274                 continue
275
276             prefix = None
277             command = None
278             arguments = None
279             self.handle_event(Event("every_raw_message",
280                                      self.real_server_name,
281                                      None,
282                                      [line]))
283
284             m = IRCServerConnection.command_re.match(line)
285             if m.group("prefix"):
286                 prefix = m.group("prefix")
287                 if not self.real_server_name:
288                     self.real_server_name = prefix
289             if m.group("command"):
290                 command = m.group("command").lower()
291             if m.group("argument"):
292                 a = m.group("argument").split(" :", 1)
293                 arguments = a[0].split()
294                 if len(a) == 2:
295                     arguments.append(a[1])
296
297             command = IRCServerConnection.codemap.get(command, command)
298             if command in ["privmsg", "notice"]:
299                 target = arguments.pop(0)
300             else:
301                 target = None
302
303                 if command == "quit":
304                     arguments = [arguments[0]]
305                 elif command == "ping":
306                     target = arguments[0]
307                 else:
308                     target = arguments[0]
309                     arguments = arguments[1:]
310
311             LOG.debug("command: %s, source: %s, target: %s, arguments: %s" % (
312                 command, prefix, target, arguments))
313             self.handle_event(Event(command, prefix, target, arguments))
314
315     def handle_event(self, event):
316         self.master.handle_event(self, event)
317         if event.type in self.event_handlers:
318             for fn in self.event_handlers[event.type]:
319                 fn(self, event)
320
321     def is_connected(self):
322         return self.socket is not None
323
324     def disconnect(self, message=""):
325         if self.socket is None:
326             return
327         # Don't send a QUIT here - causes infinite loop!
328         try:
329             self.socket.shutdown(socket.SHUT_WR)
330             self.socket.close()
331         except socket.error:
332             pass
333         del self.socket
334         self.socket = None
335         self.handle_event(
336             Event("disconnect", self.target.server, "", [message]))
337
338     def join(self, channel, key=""):
339         self.ship("JOIN %s%s" % (channel, (key and (" " + key))))
340
341     def mode(self, target, command):
342         self.ship("MODE %s %s" % (target, command))
343
344     def nick(self, newnick):
345         self.ship("NICK " + newnick)
346
347     def part(self, channel, message=""):
348         cmd_parts = ['PART', channel]
349         if message:
350             cmd_parts.append(message)
351         self.ship(' '.join(cmd_parts))
352
353     def privmsg(self, target, text):
354         self.ship("PRIVMSG %s :%s" % (target, text))
355
356     def quit(self, message=""):
357         self.ship("QUIT" + (message and (" :" + message)))
358
359     def user(self, username, realname):
360         self.ship("USER %s 0 * :%s" % (username, realname))
361
362     def ship(self, string):
363         "Ship a command to the server, appending CR/LF"
364         try:
365             self.socket.send(string.encode('utf-8') + b'\r\n')
366             LOG.debug("TO: %s" % string)
367         except socket.error:
368             self.disconnect("Connection reset by peer.")
369
370 class Event(object):
371     def __init__(self, evtype, source, target, arguments=None):
372         self.type = evtype
373         self.source = source
374         self.target = target
375         if arguments is None:
376             arguments = []
377         self.arguments = arguments
378
379 def is_channel(string):
380     return string and string[0] in "#&+!"
381
382 class Connection:
383     def __init__(self, irker, target, **kwargs):
384         self.irker = irker
385         self.target = target
386         self.kwargs = kwargs
387         self.nick_trial = None
388         self.connection = None
389         self.status = None
390         self.last_xmit = time.time()
391         self.last_ping = time.time()
392         self.channels_joined = {}
393         self.channel_limits = {}
394         # The consumer thread
395         self.queue = Queue.Queue()
396         self.thread = None
397     def nickname(self, n=None):
398         "Return a name for the nth server connection."
399         if n is None:
400             n = self.nick_trial
401         if fallback:
402             return (namestyle % n)
403         else:
404             return namestyle
405     def handle_ping(self):
406         "Register the fact that the server has pinged this connection."
407         self.last_ping = time.time()
408     def handle_welcome(self):
409         "The server says we're OK, with a non-conflicting nick."
410         self.status = "ready"
411         LOG.info("nick %s accepted" % self.nickname())
412         if password:
413             self.connection.privmsg("nickserv", "identify %s" % password)
414     def handle_badnick(self):
415         "The server says our nick is ill-formed or has a conflict."
416         LOG.info("nick %s rejected" % self.nickname())
417         if fallback:
418             # Randomness prevents a malicious user or bot from
419             # anticipating the next trial name in order to block us
420             # from completing the handshake.
421             self.nick_trial += random.randint(1, 3)
422             self.last_xmit = time.time()
423             self.connection.nick(self.nickname())
424         # Otherwise fall through, it might be possible to
425         # recover manually.
426     def handle_disconnect(self):
427         "Server disconnected us for flooding or some other reason."
428         self.connection = None
429         if self.status != "expired":
430             self.status = "disconnected"
431     def handle_kick(self, outof):
432         "We've been kicked."
433         self.status = "handshaking"
434         try:
435             del self.channels_joined[outof]
436         except KeyError:
437             LOG.error("kicked by %s from %s that's not joined" % (
438                 self.target, outof))
439         qcopy = []
440         while not self.queue.empty():
441             (channel, message, key) = self.queue.get()
442             if channel != outof:
443                 qcopy.append((channel, message, key))
444         for (channel, message, key) in qcopy:
445             self.queue.put((channel, message, key))
446         self.status = "ready"
447     def enqueue(self, channel, message, key, quit_after=False):
448         "Enque a message for transmission."
449         if self.thread is None or not self.thread.is_alive():
450             self.status = "unseen"
451             self.thread = threading.Thread(target=self.dequeue)
452             self.thread.setDaemon(True)
453             self.thread.start()
454         self.queue.put((channel, message, key))
455         if quit_after:
456             self.queue.put((channel, None, key))
457     def dequeue(self):
458         "Try to ship pending messages from the queue."
459         try:
460             while True:
461                 # We want to be kind to the IRC servers and not hold unused
462                 # sockets open forever, so they have a time-to-live.  The
463                 # loop is coded this particular way so that we can drop
464                 # the actual server connection when its time-to-live
465                 # expires, then reconnect and resume transmission if the
466                 # queue fills up again.
467                 if self.queue.empty():
468                     # Queue is empty, at some point we want to time out
469                     # the connection rather than holding a socket open in
470                     # the server forever.
471                     now = time.time()
472                     xmit_timeout = now > self.last_xmit + XMIT_TTL
473                     ping_timeout = now > self.last_ping + PING_TTL
474                     if self.status == "disconnected":
475                         # If the queue is empty, we can drop this connection.
476                         self.status = "expired"
477                         break
478                     elif xmit_timeout or ping_timeout:
479                         LOG.info((
480                             "timing out connection to %s at %s "
481                             "(ping_timeout=%s, xmit_timeout=%s)") % (
482                             self.target, time.asctime(), ping_timeout,
483                             xmit_timeout))
484                         with self.irker.irc.mutex:
485                             self.connection.context = None
486                             self.connection.quit("transmission timeout")
487                             self.connection = None
488                         self.status = "disconnected"
489                     else:
490                         # Prevent this thread from hogging the CPU by pausing
491                         # for just a little bit after the queue-empty check.
492                         # As long as this is less that the duration of a human
493                         # reflex arc it is highly unlikely any human will ever
494                         # notice.
495                         time.sleep(ANTI_BUZZ_DELAY)
496                 elif self.status == "disconnected" \
497                          and time.time() > self.last_xmit + DISCONNECT_TTL:
498                     # Queue is nonempty, but the IRC server might be
499                     # down. Letting failed connections retain queue
500                     # space forever would be a memory leak.
501                     self.status = "expired"
502                     break
503                 elif not self.connection and self.status != "expired":
504                     # Queue is nonempty but server isn't connected.
505                     with self.irker.irc.mutex:
506                         self.connection = self.irker.irc.newserver()
507                         self.connection.context = self
508                         # Try to avoid colliding with other instances
509                         self.nick_trial = random.randint(1, 990)
510                         self.channels_joined = {}
511                         try:
512                             # This will throw
513                             # IRCServerConnectionError on failure
514                             self.connection.connect(
515                                 target=self.target,
516                                 nickname=self.nickname(),
517                                 username="irker",
518                                 ircname="irker relaying client",
519                                 **self.kwargs)
520                             self.status = "handshaking"
521                             LOG.info("XMIT_TTL bump (%s connection) at %s" % (
522                                 self.target, time.asctime()))
523                             self.last_xmit = time.time()
524                             self.last_ping = time.time()
525                         except IRCServerConnectionError:
526                             self.status = "expired"
527                             break
528                 elif self.status == "handshaking":
529                     if time.time() > self.last_xmit + HANDSHAKE_TTL:
530                         self.status = "expired"
531                         break
532                     else:
533                         # Don't buzz on the empty-queue test while we're
534                         # handshaking
535                         time.sleep(ANTI_BUZZ_DELAY)
536                 elif self.status == "unseen" \
537                          and time.time() > self.last_xmit + UNSEEN_TTL:
538                     # Nasty people could attempt a denial-of-service
539                     # attack by flooding us with requests with invalid
540                     # servernames. We guard against this by rapidly
541                     # expiring connections that have a nonempty queue but
542                     # have never had a successful open.
543                     self.status = "expired"
544                     break
545                 elif self.status == "ready":
546                     (channel, message, key) = self.queue.get()
547                     if channel not in self.channels_joined:
548                         self.connection.join(channel, key=key)
549                         LOG.info("joining %s on %s." % (channel, self.target))
550                     # None is magic - it's a request to quit the server
551                     if message is None:
552                         self.connection.quit()
553                     # An empty message might be used as a keepalive or
554                     # to join a channel for logging, so suppress the
555                     # privmsg send unless there is actual traffic.
556                     elif message:
557                         for segment in message.split("\n"):
558                             # Truncate the message if it's too long,
559                             # but we're working with characters here,
560                             # not bytes, so we could be off.
561                             # 500 = 512 - CRLF - 'PRIVMSG ' - ' :'
562                             maxlength = 500 - len(channel)
563                             if len(segment) > maxlength:
564                                 segment = segment[:maxlength]
565                             try:
566                                 self.connection.privmsg(channel, segment)
567                             except ValueError as err:
568                                 LOG.warning((
569                                     "irclib rejected a message to %s on %s "
570                                     "because: %s") % (
571                                     channel, self.target, str(err)))
572                                 LOG.debug(err.format_exc())
573                             time.sleep(ANTI_FLOOD_DELAY)
574                     self.last_xmit = self.channels_joined[channel] = time.time()
575                     LOG.info("XMIT_TTL bump (%s transmission) at %s" % (
576                         self.target, time.asctime()))
577                     self.queue.task_done()
578                 elif self.status == "expired":
579                     print "We're expired but still running! This is a bug."
580                     break
581         except Exception, e:
582             LOG.error("exception %s in thread for %s" % (e, self.target))
583             # Maybe this should have its own status?
584             self.status = "expired"
585             LOG.debug(e.format_exc())
586         finally:
587             try:
588                 # Make sure we don't leave any zombies behind
589                 self.connection.close()
590             except:
591                 # Irclib has a habit of throwing fresh exceptions here. Ignore that
592                 pass
593     def live(self):
594         "Should this connection not be scavenged?"
595         return self.status != "expired"
596     def joined_to(self, channel):
597         "Is this connection joined to the specified channel?"
598         return channel in self.channels_joined
599     def accepting(self, channel):
600         "Can this connection accept a join of this channel?"
601         if self.channel_limits:
602             match_count = 0
603             for already in self.channels_joined:
604                 # This obscure code is because the RFCs allow separate limits
605                 # by channel type (indicated by the first character of the name)
606                 # a feature that is almost never actually used.
607                 if already[0] == channel[0]:
608                     match_count += 1
609             return match_count < self.channel_limits.get(channel[0], CHANNEL_MAX)
610         else:
611             return len(self.channels_joined) < CHANNEL_MAX
612
613 class Target():
614     "Represent a transmission target."
615     def __init__(self, url):
616         self.url = url
617         # Pre-2.6 Pythons don't recognize irc: as a valid URL prefix.
618         url = url.replace("irc://", "http://")
619         parsed = urlparse.urlparse(url)
620         irchost, _, ircport = parsed.netloc.partition(':')
621         if not ircport:
622             ircport = 6667
623         self.servername = irchost
624         # IRC channel names are case-insensitive.  If we don't smash
625         # case here we may run into problems later. There was a bug
626         # observed on irc.rizon.net where an irkerd user specified #Channel,
627         # got kicked, and irkerd crashed because the server returned
628         # "#channel" in the notification that our kick handler saw.
629         self.channel = parsed.path.lstrip('/').lower()
630         # This deals with a tweak in recent versions of urlparse.
631         if parsed.fragment:
632             self.channel += "#" + parsed.fragment
633         isnick = self.channel.endswith(",isnick")
634         if isnick:
635             self.channel = self.channel[:-7]
636         if self.channel and not isnick and self.channel[0] not in "#&+":
637             self.channel = "#" + self.channel
638         # support both channel?secret and channel?key=secret
639         self.key = ""
640         if parsed.query:
641             self.key = re.sub("^key=", "", parsed.query)
642         self.port = int(ircport)
643
644     def __str__(self):
645         "Represent this instance as a string"
646         return self.servername or self.url or repr(self)
647
648     def validate(self):
649         "Raise InvalidRequest if the URL is missing a critical component"
650         if not self.servername:
651             raise InvalidRequest(
652                 'target URL missing a servername: %r' % self.url)
653         if not self.channel:
654             raise InvalidRequest(
655                 'target URL missing a channel: %r' % self.url)
656     def server(self):
657         "Return a hashable tuple representing the destination server."
658         return (self.servername, self.port)
659
660 class Dispatcher:
661     "Manage connections to a particular server-port combination."
662     def __init__(self, irker, **kwargs):
663         self.irker = irker
664         self.kwargs = kwargs
665         self.connections = []
666     def dispatch(self, channel, message, key, quit_after=False):
667         "Dispatch messages for our server-port combination."
668         # First, check if there is room for another channel
669         # on any of our existing connections.
670         connections = [x for x in self.connections if x.live()]
671         eligibles = [x for x in connections if x.joined_to(channel)] \
672                     or [x for x in connections if x.accepting(channel)]
673         if eligibles:
674             eligibles[0].enqueue(channel, message, key, quit_after)
675             return
676         # All connections are full up. Look for one old enough to be
677         # scavenged.
678         ancients = []
679         for connection in connections:
680             for (chan, age) in connections.channels_joined.items():
681                 if age < time.time() - CHANNEL_TTL:
682                     ancients.append((connection, chan, age))
683         if ancients:
684             ancients.sort(key=lambda x: x[2]) 
685             (found_connection, drop_channel, _drop_age) = ancients[0]
686             found_connection.part(drop_channel, "scavenged by irkerd")
687             del found_connection.channels_joined[drop_channel]
688             #time.sleep(ANTI_FLOOD_DELAY)
689             found_connection.enqueue(channel, message, key, quit_after)
690             return
691         # All existing channels had recent activity
692         newconn = Connection(self.irker, **self.kwargs)
693         self.connections.append(newconn)
694         newconn.enqueue(channel, message, key, quit_after)
695     def live(self):
696         "Does this server-port combination have any live connections?"
697         self.connections = [x for x in self.connections if x.live()]
698         return len(self.connections) > 0
699     def pending(self):
700         "Return all connections with pending traffic."
701         return [x for x in self.connections if not x.queue.empty()]
702     def last_xmit(self):
703         "Return the time of the most recent transmission."
704         return max(x.last_xmit for x in self.connections)
705
706 class Irker:
707     "Persistent IRC multiplexer."
708     def __init__(self, **kwargs):
709         self.kwargs = kwargs
710         self.irc = IRCClient()
711         self.irc.add_event_handler("ping", self._handle_ping)
712         self.irc.add_event_handler("welcome", self._handle_welcome)
713         self.irc.add_event_handler("erroneusnickname", self._handle_badnick)
714         self.irc.add_event_handler("nicknameinuse", self._handle_badnick)
715         self.irc.add_event_handler("nickcollision", self._handle_badnick)
716         self.irc.add_event_handler("unavailresource", self._handle_badnick)
717         self.irc.add_event_handler("featurelist", self._handle_features)
718         self.irc.add_event_handler("disconnect", self._handle_disconnect)
719         self.irc.add_event_handler("kick", self._handle_kick)
720         self.irc.add_event_handler("every_raw_message", self._handle_every_raw_message)
721         self.servers = {}
722     def thread_launch(self):
723         thread = threading.Thread(target=self.irc.spin)
724         thread.setDaemon(True)
725         self.irc._thread = thread
726         thread.start()
727     def _handle_ping(self, connection, _event):
728         "PING arrived, bump the last-received time for the connection."
729         if connection.context:
730             connection.context.handle_ping()
731     def _handle_welcome(self, connection, _event):
732         "Welcome arrived, nick accepted for this connection."
733         if connection.context:
734             connection.context.handle_welcome()
735     def _handle_badnick(self, connection, _event):
736         "Nick not accepted for this connection."
737         if connection.context:
738             connection.context.handle_badnick()
739     def _handle_features(self, connection, event):
740         "Determine if and how we can set deaf mode."
741         if connection.context:
742             cxt = connection.context
743             arguments = event.arguments
744             for lump in arguments:
745                 if lump.startswith("DEAF="):
746                     if not logfile:
747                         connection.mode(cxt.nickname(), "+"+lump[5:])
748                 elif lump.startswith("MAXCHANNELS="):
749                     m = int(lump[12:])
750                     for pref in "#&+":
751                         cxt.channel_limits[pref] = m
752                     LOG.info("%s maxchannels is %d" % (connection.server, m))
753                 elif lump.startswith("CHANLIMIT=#:"):
754                     limits = lump[10:].split(",")
755                     try:
756                         for token in limits:
757                             (prefixes, limit) = token.split(":")
758                             limit = int(limit)
759                             for c in prefixes:
760                                 cxt.channel_limits[c] = limit
761                         LOG.info("%s channel limit map is %s" % (
762                             connection.target, cxt.channel_limits))
763                     except ValueError:
764                         LOG.error("ill-formed CHANLIMIT property")
765     def _handle_disconnect(self, connection, _event):
766         "Server hung up the connection."
767         LOG.info("server %s disconnected" % connection.target)
768         connection.close()
769         if connection.context:
770             connection.context.handle_disconnect()
771     def _handle_kick(self, connection, event):
772         "Server hung up the connection."
773         target = event.target
774         LOG.info("irker has been kicked from %s on %s" % (
775             target, connection.target))
776         if connection.context:
777             connection.context.handle_kick(target)
778     def _handle_every_raw_message(self, _connection, event):
779         "Log all messages when in watcher mode."
780         if logfile:
781             with open(logfile, "a") as logfp:
782                 logfp.write("%03f|%s|%s\n" % \
783                              (time.time(), event.source, event.arguments[0]))
784     def pending(self):
785         "Do we have any pending message traffic?"
786         return [k for (k, v) in self.servers.items() if v.pending()]
787
788     def _parse_request(self, line):
789         "Request-parsing helper for the handle() method"
790         request = json.loads(line.strip())
791         if not isinstance(request, dict):
792             raise InvalidRequest(
793                 "request is not a JSON dictionary: %r" % request)
794         if "to" not in request or "privmsg" not in request:
795             raise InvalidRequest(
796                 "malformed request - 'to' or 'privmsg' missing: %r" % request)
797         channels = request['to']
798         message = request['privmsg']
799         if not isinstance(channels, (list, basestring)):
800             raise InvalidRequest(
801                 "malformed request - unexpected channel type: %r" % channels)
802         if not isinstance(message, basestring):
803             raise InvalidRequest(
804                 "malformed request - unexpected message type: %r" % message)
805         if not isinstance(channels, list):
806             channels = [channels]
807         targets = []
808         for url in channels:
809             try:
810                 if not isinstance(url, basestring):
811                     raise InvalidRequest(
812                         "malformed request - URL has unexpected type: %r" %
813                         url)
814                 target = Target(url)
815                 target.validate()
816             except InvalidRequest, e:
817                 LOG.error(str(e))
818             else:
819                 targets.append(target)
820         return (targets, message)
821
822     def handle(self, line, quit_after=False):
823         "Perform a JSON relay request."
824         try:
825             targets, message = self._parse_request(line=line)
826             for target in targets:
827                 if target.server() not in self.servers:
828                     self.servers[target.server()] = Dispatcher(
829                         self, target=target, **self.kwargs)
830                 self.servers[target.server()].dispatch(
831                     target.channel, message, target.key, quit_after=quit_after)
832                 # GC dispatchers with no active connections
833                 servernames = self.servers.keys()
834                 for servername in servernames:
835                     if not self.servers[servername].live():
836                         del self.servers[servername]
837                     # If we might be pushing a resource limit even
838                     # after garbage collection, remove a session.  The
839                     # goal here is to head off DoS attacks that aim at
840                     # exhausting thread space or file descriptors.
841                     # The cost is that attempts to DoS this service
842                     # will cause lots of join/leave spam as we
843                     # scavenge old channels after connecting to new
844                     # ones. The particular method used for selecting a
845                     # session to be terminated doesn't matter much; we
846                     # choose the one longest idle on the assumption
847                     # that message activity is likely to be clumpy.
848                     if len(self.servers) >= CONNECTION_MAX:
849                         oldest = min(
850                             self.servers.keys(),
851                             key=lambda name: self.servers[name].last_xmit())
852                         del self.servers[oldest]
853         except InvalidRequest, e:
854             LOG.error(str(e))
855         except ValueError:
856             self.logerr("can't recognize JSON on input: %r" % line)
857         except RuntimeError:
858             self.logerr("wildly malformed JSON blew the parser stack.")
859
860 class IrkerTCPHandler(SocketServer.StreamRequestHandler):
861     def handle(self):
862         while True:
863             line = self.rfile.readline()
864             if not line:
865                 break
866             irker.handle(line.strip())
867
868 class IrkerUDPHandler(SocketServer.BaseRequestHandler):
869     def handle(self):
870         data = self.request[0].strip()
871         #socket = self.request[1]
872         irker.handle(data)
873
874 def usage():
875     sys.stdout.write("""
876 Usage:
877   irkerd [-d debuglevel] [-l logfile] [-n nick] [-p password] [-i channel message] [-V] [-h]
878
879 Options
880   -d    set debug level
881   -l    set logfile
882   -n    set nick-style
883   -p    set nickserv password
884   -i    immediate mode
885   -V    return irkerd version
886   -h    print this help dialog
887 """)
888
889 if __name__ == '__main__':
890     log_level = None
891     immediate = None
892     namestyle = "irker%03d"
893     password = None
894     logfile = None
895     try:
896         (options, arguments) = getopt.getopt(sys.argv[1:], "d:i:l:n:p:Vh")
897     except getopt.GetoptError as e:
898         sys.stderr.write("%s" % e)
899         usage()
900         sys.exit(1)
901     for (opt, val) in options:
902         if opt == '-d': # Enable debug/progress messages
903             if val.lower() not in LOG_LEVELS:
904                 sys.stderr.write('invalid log level %r (choices: %s)\n' % (
905                     val, ', '.join(LOG_LEVELS)))
906                 sys.exit(1)
907             log_level = getattr(logging, val.upper())
908         elif opt == '-i':       # Immediate mode - send one message, then exit. 
909             immediate = val
910         elif opt == '-l':       # Logfile mode - report traffic read in
911             logfile = val
912         elif opt == '-n':       # Force the nick
913             namestyle = val
914         elif opt == '-p':       # Set a nickserv password
915             password = val
916         elif opt == '-V':       # Emit version and exit
917             sys.stdout.write("irkerd version %s\n" % version)
918             sys.exit(0)
919         elif opt == '-h':
920             usage()
921             sys.exit(0)
922
923     handler = logging.StreamHandler()
924     LOG.addHandler(handler)
925     if log_level:
926         LOG.setLevel(log_level)
927
928     fallback = re.search("%.*d", namestyle)
929     irker = Irker()
930     LOG.info("irkerd version %s" % version)
931     if immediate:
932         irker.irc.add_event_handler("quit", lambda _c, _e: sys.exit(0))
933         irker.handle('{"to":"%s","privmsg":"%s"}' % (immediate, arguments[0]), quit_after=True)
934         irker.irc.spin()
935     else:
936         irker.thread_launch()
937         try:
938             tcpserver = SocketServer.TCPServer((HOST, PORT), IrkerTCPHandler)
939             udpserver = SocketServer.UDPServer((HOST, PORT), IrkerUDPHandler)
940             for server in [tcpserver, udpserver]:
941                 server = threading.Thread(target=server.serve_forever)
942                 server.setDaemon(True)
943                 server.start()
944             try:
945                 signal.pause()
946             except KeyboardInterrupt:
947                 raise SystemExit(1)
948         except socket.error, e:
949             sys.stderr.write("irkerd: server launch failed: %r\n" % e)
950
951 # end