bb54f85bbc1a5710ea46fa4c185838a86be489af
[irker.git] / irkerd
1 #!/usr/bin/env python
2 """
3 irkerd - a simple IRC multiplexer daemon
4
5 Listens for JSON objects of the form {'to':<irc-url>, 'privmsg':<text>}
6 and relays messages to IRC channels. Each request must be followed by
7 a newline.
8
9 The <text> must be a string.  The value of the 'to' attribute can be a
10 string containing an IRC URL (e.g. 'irc://chat.freenet.net/botwar') or
11 a list of such strings; in the latter case the message is broadcast to
12 all listed channels.  Note that the channel portion of the URL need
13 *not* have a leading '#' unless the channel name itself does.
14
15 Options: -d sets the debug-message level (probably only of interest to
16 developers). -l sets a logfile to capture message traffic from
17 channels.  -n sets the nick and -p the nickserv password. The -V
18 option prints the program version and exits.
19
20 Design and code by Eric S. Raymond <esr@thyrsus.com>. See the project
21 resource page at <http://www.catb.org/~esr/irker/>.
22
23 Requires Python 2.6 or 2.5 with the simplejson library installed.
24 """
25
26 from __future__ import with_statement
27
28 # These things might need tuning
29
30 HOST = "localhost"
31 PORT = 6659
32
33 XMIT_TTL = (3 * 60 * 60)        # Time to live, seconds from last transmit
34 PING_TTL = (15 * 60)            # Time to live, seconds from last PING
35 HANDSHAKE_TTL = 60              # Time to live, seconds from nick transmit
36 CHANNEL_TTL = (3 * 60 * 60)     # Time to live, seconds from last transmit
37 DISCONNECT_TTL = (24 * 60 * 60) # Time to live, seconds from last connect
38 UNSEEN_TTL = 60                 # Time to live, seconds since first request
39 CHANNEL_MAX = 18                # Max channels open per socket (default)
40 ANTI_FLOOD_DELAY = 1.0          # Anti-flood delay after transmissions, seconds
41 ANTI_BUZZ_DELAY = 0.09          # Anti-buzz delay after queue-empty check
42 CONNECTION_MAX = 200            # To avoid hitting a thread limit
43
44 # No user-serviceable parts below this line
45
46 version = "2.6"
47
48 import Queue
49 import SocketServer
50 import getopt
51 import logging
52 try:
53     import simplejson as json   # Faster, also makes us Python-2.4-compatible
54 except ImportError:
55     import json
56 import random
57 import re
58 import select
59 import signal
60 import socket
61 import sys
62 import threading
63 import time
64 import urlparse
65
66
67 LOG = logging.getLogger(__name__)
68 LOG.setLevel(logging.ERROR)
69 LOG_LEVELS = ['critical', 'error', 'warning', 'info', 'debug']
70
71
72 # Sketch of implementation:
73 #
74 # One Irker object manages multiple IRC sessions.  It holds a map of
75 # Dispatcher objects, one per (server, port) combination, which are
76 # responsible for routing messages to one of any number of Connection
77 # objects that do the actual socket conversations.  The reason for the
78 # Dispatcher layer is that IRC daemons limit the number of channels a
79 # client (that is, from the daemon's point of view, a socket) can be
80 # joined to, so each session to a server needs a flock of Connection
81 # instances each with its own socket.
82 #
83 # Connections are timed out and removed when either they haven't seen a
84 # PING for a while (indicating that the server may be stalled or down)
85 # or there has been no message traffic to them for a while, or
86 # even if the queue is nonempty but efforts to connect have failed for
87 # a long time.
88 #
89 # There are multiple threads. One accepts incoming traffic from all
90 # servers.  Each Connection also has a consumer thread and a
91 # thread-safe message queue.  The program main appends messages to
92 # queues as JSON requests are received; the consumer threads try to
93 # ship them to servers.  When a socket write stalls, it only blocks an
94 # individual consumer thread; if it stalls long enough, the session
95 # will be timed out. This solves the biggest problem with a
96 # single-threaded implementation, which is that you can't count on a
97 # single stalled write not hanging all other traffic - you're at the
98 # mercy of the length of the buffers in the TCP/IP layer.
99 #
100 # Message delivery is thus not reliable in the face of network stalls,
101 # but this was considered acceptable because IRC (notoriously) has the
102 # same problem - there is little point in reliable delivery to a relay
103 # that is down or unreliable.
104 #
105 # This code uses only NICK, JOIN, PART, MODE, PRIVMSG, USER, and QUIT. 
106 # It is strictly compliant to RFC1459, except for the interpretation and
107 # use of the DEAF and CHANLIMIT and (obsolete) MAXCHANNELS features.
108 #
109 # CHANLIMIT is as described in the Internet RFC draft
110 # draft-brocklesby-irc-isupport-03 at <http://www.mirc.com/isupport.html>.
111 # The ",isnick" feature is as described in
112 # <http://ftp.ics.uci.edu/pub/ietf/uri/draft-mirashi-url-irc-01.txt>.
113
114 # Historical note: the IRCClient and IRCServerConnection classes
115 # (~270LOC) replace the overweight, overcomplicated 3KLOC mass of
116 # irclib code that irker formerly used as a service library.  They
117 # still look similar to parts of irclib because I contributed to that
118 # code before giving up on it.
119
120 class IRCError(Exception):
121     "An IRC exception"
122     pass
123
124
125 class InvalidRequest (ValueError):
126     "An invalid JSON request"
127     pass
128
129
130 class IRCClient():
131     "An IRC client session to one or more servers."
132     def __init__(self):
133         self.mutex = threading.RLock()
134         self.server_connections = []
135         self.event_handlers = {}
136         self.add_event_handler("ping",
137                                lambda c, e: c.ship("PONG %s" % e.target))
138
139     def newserver(self):
140         "Initialize a new server-connection object."
141         conn = IRCServerConnection(self)
142         with self.mutex:
143             self.server_connections.append(conn)
144         return conn
145
146     def spin(self, timeout=0.2):
147         "Spin processing data from connections forever."
148         # Outer loop should specifically *not* be mutex-locked.
149         # Otherwise no other thread would ever be able to change
150         # the shared state of an IRC object running this function.
151         while True:
152             nextsleep = 0
153             with self.mutex:
154                 connected = [x for x in self.server_connections
155                              if x is not None and x.socket is not None]
156                 sockets = [x.socket for x in connected]
157                 if sockets:
158                     connmap = dict([(c.socket.fileno(), c) for c in connected])
159                     (insocks, _o, _e) = select.select(sockets, [], [], timeout)
160                     for s in insocks:
161                         connmap[s.fileno()].consume()
162                 else:
163                     nextsleep = timeout
164             time.sleep(nextsleep)
165
166     def add_event_handler(self, event, handler):
167         "Set a handler to be called later."
168         with self.mutex:
169             event_handlers = self.event_handlers.setdefault(event, [])
170             event_handlers.append(handler)
171
172     def handle_event(self, connection, event):
173         with self.mutex:
174             h = self.event_handlers
175             th = sorted(h.get("all_events", []) + h.get(event.type, []))
176             for handler in th:
177                 handler(connection, event)
178
179     def drop_connection(self, connection):
180         with self.mutex:
181             self.server_connections.remove(connection)
182
183
184 class LineBufferedStream():
185     "Line-buffer a read stream."
186     crlf_re = re.compile(b'\r?\n')
187
188     def __init__(self):
189         self.buffer = ''
190
191     def append(self, newbytes):
192         self.buffer += newbytes
193
194     def lines(self):
195         "Iterate over lines in the buffer."
196         lines = LineBufferedStream.crlf_re.split(self.buffer)
197         self.buffer = lines.pop()
198         return iter(lines)
199
200     def __iter__(self):
201         return self.lines()
202
203 class IRCServerConnectionError(IRCError):
204     pass
205
206 class IRCServerConnection():
207     command_re = re.compile("^(:(?P<prefix>[^ ]+) +)?(?P<command>[^ ]+)( *(?P<argument> .+))?")
208     # The full list of numeric-to-event mappings is in Perl's Net::IRC.
209     # We only need to ensure that if some ancient server throws numerics
210     # for the ones we actually want to catch, they're mapped.
211     codemap = {
212         "001": "welcome",
213         "005": "featurelist",
214         "432": "erroneusnickname",
215         "433": "nicknameinuse",
216         "436": "nickcollision",
217         "437": "unavailresource",
218     }
219
220     def __init__(self, master):
221         self.master = master
222         self.socket = None
223
224     def connect(self, target, nickname,
225                 password=None, username=None, ircname=None):
226         LOG.debug("connect(server=%r, port=%r, nickname=%r, ...)" % (
227             target.servername, target.port, nickname))
228         if self.socket is not None:
229             self.disconnect("Changing servers")
230
231         self.buffer = LineBufferedStream()
232         self.event_handlers = {}
233         self.real_server_name = ""
234         self.target = target
235         self.nickname = nickname
236         try:
237             self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
238             self.socket.bind(('', 0))
239             self.socket.connect((target.servername, target.port))
240         except socket.error as err:
241             raise IRCServerConnectionError("Couldn't connect to socket: %s" % err)
242
243         if password:
244             self.ship("PASS " + password)
245         self.nick(self.nickname)
246         self.user(username=username or ircname, realname=ircname or nickname)
247         return self
248
249     def close(self):
250         # Without this thread lock, there is a window during which
251         # select() can find a closed socket, leading to an EBADF error.
252         with self.master.mutex:
253             self.disconnect("Closing object")
254             self.master.drop_connection(self)
255
256     def consume(self):
257         try:
258             incoming = self.socket.recv(16384)
259         except socket.error:
260             # Server hung up on us.
261             self.disconnect("Connection reset by peer")
262             return
263         if not incoming:
264             # Dead air also indicates a connection reset.
265             self.disconnect("Connection reset by peer")
266             return
267
268         self.buffer.append(incoming)
269
270         for line in self.buffer:
271             LOG.debug("FROM: %s" % line)
272
273             if not line:
274                 continue
275
276             prefix = None
277             command = None
278             arguments = None
279             self.handle_event(Event("every_raw_message",
280                                      self.real_server_name,
281                                      None,
282                                      [line]))
283
284             m = IRCServerConnection.command_re.match(line)
285             if m.group("prefix"):
286                 prefix = m.group("prefix")
287                 if not self.real_server_name:
288                     self.real_server_name = prefix
289             if m.group("command"):
290                 command = m.group("command").lower()
291             if m.group("argument"):
292                 a = m.group("argument").split(" :", 1)
293                 arguments = a[0].split()
294                 if len(a) == 2:
295                     arguments.append(a[1])
296
297             command = IRCServerConnection.codemap.get(command, command)
298             if command in ["privmsg", "notice"]:
299                 target = arguments.pop(0)
300             else:
301                 target = None
302
303                 if command == "quit":
304                     arguments = [arguments[0]]
305                 elif command == "ping":
306                     target = arguments[0]
307                 else:
308                     target = arguments[0]
309                     arguments = arguments[1:]
310
311             LOG.debug("command: %s, source: %s, target: %s, arguments: %s" % (
312                 command, prefix, target, arguments))
313             self.handle_event(Event(command, prefix, target, arguments))
314
315     def handle_event(self, event):
316         self.master.handle_event(self, event)
317         if event.type in self.event_handlers:
318             for fn in self.event_handlers[event.type]:
319                 fn(self, event)
320
321     def is_connected(self):
322         return self.socket is not None
323
324     def disconnect(self, message=""):
325         if self.socket is None:
326             return
327         # Don't send a QUIT here - causes infinite loop!
328         try:
329             self.socket.shutdown(socket.SHUT_WR)
330             self.socket.close()
331         except socket.error:
332             pass
333         del self.socket
334         self.socket = None
335         self.handle_event(
336             Event("disconnect", self.target.server, "", [message]))
337
338     def join(self, channel, key=""):
339         self.ship("JOIN %s%s" % (channel, (key and (" " + key))))
340
341     def mode(self, target, command):
342         self.ship("MODE %s %s" % (target, command))
343
344     def nick(self, newnick):
345         self.ship("NICK " + newnick)
346
347     def part(self, channel, message=""):
348         cmd_parts = ['PART', channel]
349         if message:
350             cmd_parts.append(message)
351         self.ship(' '.join(cmd_parts))
352
353     def privmsg(self, target, text):
354         self.ship("PRIVMSG %s :%s" % (target, text))
355
356     def quit(self, message=""):
357         self.ship("QUIT" + (message and (" :" + message)))
358
359     def user(self, username, realname):
360         self.ship("USER %s 0 * :%s" % (username, realname))
361
362     def ship(self, string):
363         "Ship a command to the server, appending CR/LF"
364         try:
365             self.socket.send(string.encode('utf-8') + b'\r\n')
366             LOG.debug("TO: %s" % string)
367         except socket.error:
368             self.disconnect("Connection reset by peer.")
369
370 class Event(object):
371     def __init__(self, evtype, source, target, arguments=None):
372         self.type = evtype
373         self.source = source
374         self.target = target
375         if arguments is None:
376             arguments = []
377         self.arguments = arguments
378
379 def is_channel(string):
380     return string and string[0] in "#&+!"
381
382 class Connection:
383     def __init__(self, irker, target, nick_template, nick_needs_number=False,
384                  **kwargs):
385         self.irker = irker
386         self.target = target
387         self.nick_template = nick_template
388         self.nick_needs_number = nick_needs_number
389         self.kwargs = kwargs
390         self.nick_trial = None
391         self.connection = None
392         self.status = None
393         self.last_xmit = time.time()
394         self.last_ping = time.time()
395         self.channels_joined = {}
396         self.channel_limits = {}
397         # The consumer thread
398         self.queue = Queue.Queue()
399         self.thread = None
400     def nickname(self, n=None):
401         "Return a name for the nth server connection."
402         if n is None:
403             n = self.nick_trial
404         if self.nick_needs_number:
405             return (self.nick_template % n)
406         else:
407             return self.nick_template
408     def handle_ping(self):
409         "Register the fact that the server has pinged this connection."
410         self.last_ping = time.time()
411     def handle_welcome(self):
412         "The server says we're OK, with a non-conflicting nick."
413         self.status = "ready"
414         LOG.info("nick %s accepted" % self.nickname())
415         if password:
416             self.connection.privmsg("nickserv", "identify %s" % password)
417     def handle_badnick(self):
418         "The server says our nick is ill-formed or has a conflict."
419         LOG.info("nick %s rejected" % self.nickname())
420         if self.nick_needs_number:
421             # Randomness prevents a malicious user or bot from
422             # anticipating the next trial name in order to block us
423             # from completing the handshake.
424             self.nick_trial += random.randint(1, 3)
425             self.last_xmit = time.time()
426             self.connection.nick(self.nickname())
427         # Otherwise fall through, it might be possible to
428         # recover manually.
429     def handle_disconnect(self):
430         "Server disconnected us for flooding or some other reason."
431         self.connection = None
432         if self.status != "expired":
433             self.status = "disconnected"
434     def handle_kick(self, outof):
435         "We've been kicked."
436         self.status = "handshaking"
437         try:
438             del self.channels_joined[outof]
439         except KeyError:
440             LOG.error("kicked by %s from %s that's not joined" % (
441                 self.target, outof))
442         qcopy = []
443         while not self.queue.empty():
444             (channel, message, key) = self.queue.get()
445             if channel != outof:
446                 qcopy.append((channel, message, key))
447         for (channel, message, key) in qcopy:
448             self.queue.put((channel, message, key))
449         self.status = "ready"
450     def enqueue(self, channel, message, key, quit_after=False):
451         "Enque a message for transmission."
452         if self.thread is None or not self.thread.is_alive():
453             self.status = "unseen"
454             self.thread = threading.Thread(target=self.dequeue)
455             self.thread.setDaemon(True)
456             self.thread.start()
457         self.queue.put((channel, message, key))
458         if quit_after:
459             self.queue.put((channel, None, key))
460     def dequeue(self):
461         "Try to ship pending messages from the queue."
462         try:
463             while True:
464                 # We want to be kind to the IRC servers and not hold unused
465                 # sockets open forever, so they have a time-to-live.  The
466                 # loop is coded this particular way so that we can drop
467                 # the actual server connection when its time-to-live
468                 # expires, then reconnect and resume transmission if the
469                 # queue fills up again.
470                 if self.queue.empty():
471                     # Queue is empty, at some point we want to time out
472                     # the connection rather than holding a socket open in
473                     # the server forever.
474                     now = time.time()
475                     xmit_timeout = now > self.last_xmit + XMIT_TTL
476                     ping_timeout = now > self.last_ping + PING_TTL
477                     if self.status == "disconnected":
478                         # If the queue is empty, we can drop this connection.
479                         self.status = "expired"
480                         break
481                     elif xmit_timeout or ping_timeout:
482                         LOG.info((
483                             "timing out connection to %s at %s "
484                             "(ping_timeout=%s, xmit_timeout=%s)") % (
485                             self.target, time.asctime(), ping_timeout,
486                             xmit_timeout))
487                         with self.irker.irc.mutex:
488                             self.connection.context = None
489                             self.connection.quit("transmission timeout")
490                             self.connection = None
491                         self.status = "disconnected"
492                     else:
493                         # Prevent this thread from hogging the CPU by pausing
494                         # for just a little bit after the queue-empty check.
495                         # As long as this is less that the duration of a human
496                         # reflex arc it is highly unlikely any human will ever
497                         # notice.
498                         time.sleep(ANTI_BUZZ_DELAY)
499                 elif self.status == "disconnected" \
500                          and time.time() > self.last_xmit + DISCONNECT_TTL:
501                     # Queue is nonempty, but the IRC server might be
502                     # down. Letting failed connections retain queue
503                     # space forever would be a memory leak.
504                     self.status = "expired"
505                     break
506                 elif not self.connection and self.status != "expired":
507                     # Queue is nonempty but server isn't connected.
508                     with self.irker.irc.mutex:
509                         self.connection = self.irker.irc.newserver()
510                         self.connection.context = self
511                         # Try to avoid colliding with other instances
512                         self.nick_trial = random.randint(1, 990)
513                         self.channels_joined = {}
514                         try:
515                             # This will throw
516                             # IRCServerConnectionError on failure
517                             self.connection.connect(
518                                 target=self.target,
519                                 nickname=self.nickname(),
520                                 username="irker",
521                                 ircname="irker relaying client",
522                                 **self.kwargs)
523                             self.status = "handshaking"
524                             LOG.info("XMIT_TTL bump (%s connection) at %s" % (
525                                 self.target, time.asctime()))
526                             self.last_xmit = time.time()
527                             self.last_ping = time.time()
528                         except IRCServerConnectionError:
529                             self.status = "expired"
530                             break
531                 elif self.status == "handshaking":
532                     if time.time() > self.last_xmit + HANDSHAKE_TTL:
533                         self.status = "expired"
534                         break
535                     else:
536                         # Don't buzz on the empty-queue test while we're
537                         # handshaking
538                         time.sleep(ANTI_BUZZ_DELAY)
539                 elif self.status == "unseen" \
540                          and time.time() > self.last_xmit + UNSEEN_TTL:
541                     # Nasty people could attempt a denial-of-service
542                     # attack by flooding us with requests with invalid
543                     # servernames. We guard against this by rapidly
544                     # expiring connections that have a nonempty queue but
545                     # have never had a successful open.
546                     self.status = "expired"
547                     break
548                 elif self.status == "ready":
549                     (channel, message, key) = self.queue.get()
550                     if channel not in self.channels_joined:
551                         self.connection.join(channel, key=key)
552                         LOG.info("joining %s on %s." % (channel, self.target))
553                     # None is magic - it's a request to quit the server
554                     if message is None:
555                         self.connection.quit()
556                     # An empty message might be used as a keepalive or
557                     # to join a channel for logging, so suppress the
558                     # privmsg send unless there is actual traffic.
559                     elif message:
560                         for segment in message.split("\n"):
561                             # Truncate the message if it's too long,
562                             # but we're working with characters here,
563                             # not bytes, so we could be off.
564                             # 500 = 512 - CRLF - 'PRIVMSG ' - ' :'
565                             maxlength = 500 - len(channel)
566                             if len(segment) > maxlength:
567                                 segment = segment[:maxlength]
568                             try:
569                                 self.connection.privmsg(channel, segment)
570                             except ValueError as err:
571                                 LOG.warning((
572                                     "irclib rejected a message to %s on %s "
573                                     "because: %s") % (
574                                     channel, self.target, str(err)))
575                                 LOG.debug(err.format_exc())
576                             time.sleep(ANTI_FLOOD_DELAY)
577                     self.last_xmit = self.channels_joined[channel] = time.time()
578                     LOG.info("XMIT_TTL bump (%s transmission) at %s" % (
579                         self.target, time.asctime()))
580                     self.queue.task_done()
581                 elif self.status == "expired":
582                     print "We're expired but still running! This is a bug."
583                     break
584         except Exception, e:
585             LOG.error("exception %s in thread for %s" % (e, self.target))
586             # Maybe this should have its own status?
587             self.status = "expired"
588             LOG.debug(e.format_exc())
589         finally:
590             try:
591                 # Make sure we don't leave any zombies behind
592                 self.connection.close()
593             except:
594                 # Irclib has a habit of throwing fresh exceptions here. Ignore that
595                 pass
596     def live(self):
597         "Should this connection not be scavenged?"
598         return self.status != "expired"
599     def joined_to(self, channel):
600         "Is this connection joined to the specified channel?"
601         return channel in self.channels_joined
602     def accepting(self, channel):
603         "Can this connection accept a join of this channel?"
604         if self.channel_limits:
605             match_count = 0
606             for already in self.channels_joined:
607                 # This obscure code is because the RFCs allow separate limits
608                 # by channel type (indicated by the first character of the name)
609                 # a feature that is almost never actually used.
610                 if already[0] == channel[0]:
611                     match_count += 1
612             return match_count < self.channel_limits.get(channel[0], CHANNEL_MAX)
613         else:
614             return len(self.channels_joined) < CHANNEL_MAX
615
616 class Target():
617     "Represent a transmission target."
618     def __init__(self, url):
619         self.url = url
620         # Pre-2.6 Pythons don't recognize irc: as a valid URL prefix.
621         url = url.replace("irc://", "http://")
622         parsed = urlparse.urlparse(url)
623         irchost, _, ircport = parsed.netloc.partition(':')
624         if not ircport:
625             ircport = 6667
626         self.servername = irchost
627         # IRC channel names are case-insensitive.  If we don't smash
628         # case here we may run into problems later. There was a bug
629         # observed on irc.rizon.net where an irkerd user specified #Channel,
630         # got kicked, and irkerd crashed because the server returned
631         # "#channel" in the notification that our kick handler saw.
632         self.channel = parsed.path.lstrip('/').lower()
633         # This deals with a tweak in recent versions of urlparse.
634         if parsed.fragment:
635             self.channel += "#" + parsed.fragment
636         isnick = self.channel.endswith(",isnick")
637         if isnick:
638             self.channel = self.channel[:-7]
639         if self.channel and not isnick and self.channel[0] not in "#&+":
640             self.channel = "#" + self.channel
641         # support both channel?secret and channel?key=secret
642         self.key = ""
643         if parsed.query:
644             self.key = re.sub("^key=", "", parsed.query)
645         self.port = int(ircport)
646
647     def __str__(self):
648         "Represent this instance as a string"
649         return self.servername or self.url or repr(self)
650
651     def validate(self):
652         "Raise InvalidRequest if the URL is missing a critical component"
653         if not self.servername:
654             raise InvalidRequest(
655                 'target URL missing a servername: %r' % self.url)
656         if not self.channel:
657             raise InvalidRequest(
658                 'target URL missing a channel: %r' % self.url)
659     def server(self):
660         "Return a hashable tuple representing the destination server."
661         return (self.servername, self.port)
662
663 class Dispatcher:
664     "Manage connections to a particular server-port combination."
665     def __init__(self, irker, **kwargs):
666         self.irker = irker
667         self.kwargs = kwargs
668         self.connections = []
669     def dispatch(self, channel, message, key, quit_after=False):
670         "Dispatch messages for our server-port combination."
671         # First, check if there is room for another channel
672         # on any of our existing connections.
673         connections = [x for x in self.connections if x.live()]
674         eligibles = [x for x in connections if x.joined_to(channel)] \
675                     or [x for x in connections if x.accepting(channel)]
676         if eligibles:
677             eligibles[0].enqueue(channel, message, key, quit_after)
678             return
679         # All connections are full up. Look for one old enough to be
680         # scavenged.
681         ancients = []
682         for connection in connections:
683             for (chan, age) in connections.channels_joined.items():
684                 if age < time.time() - CHANNEL_TTL:
685                     ancients.append((connection, chan, age))
686         if ancients:
687             ancients.sort(key=lambda x: x[2]) 
688             (found_connection, drop_channel, _drop_age) = ancients[0]
689             found_connection.part(drop_channel, "scavenged by irkerd")
690             del found_connection.channels_joined[drop_channel]
691             #time.sleep(ANTI_FLOOD_DELAY)
692             found_connection.enqueue(channel, message, key, quit_after)
693             return
694         # All existing channels had recent activity
695         newconn = Connection(self.irker, **self.kwargs)
696         self.connections.append(newconn)
697         newconn.enqueue(channel, message, key, quit_after)
698     def live(self):
699         "Does this server-port combination have any live connections?"
700         self.connections = [x for x in self.connections if x.live()]
701         return len(self.connections) > 0
702     def pending(self):
703         "Return all connections with pending traffic."
704         return [x for x in self.connections if not x.queue.empty()]
705     def last_xmit(self):
706         "Return the time of the most recent transmission."
707         return max(x.last_xmit for x in self.connections)
708
709 class Irker:
710     "Persistent IRC multiplexer."
711     def __init__(self, logfile=None, **kwargs):
712         self.logfile = logfile
713         self.kwargs = kwargs
714         self.irc = IRCClient()
715         self.irc.add_event_handler("ping", self._handle_ping)
716         self.irc.add_event_handler("welcome", self._handle_welcome)
717         self.irc.add_event_handler("erroneusnickname", self._handle_badnick)
718         self.irc.add_event_handler("nicknameinuse", self._handle_badnick)
719         self.irc.add_event_handler("nickcollision", self._handle_badnick)
720         self.irc.add_event_handler("unavailresource", self._handle_badnick)
721         self.irc.add_event_handler("featurelist", self._handle_features)
722         self.irc.add_event_handler("disconnect", self._handle_disconnect)
723         self.irc.add_event_handler("kick", self._handle_kick)
724         self.irc.add_event_handler("every_raw_message", self._handle_every_raw_message)
725         self.servers = {}
726     def thread_launch(self):
727         thread = threading.Thread(target=self.irc.spin)
728         thread.setDaemon(True)
729         self.irc._thread = thread
730         thread.start()
731     def _handle_ping(self, connection, _event):
732         "PING arrived, bump the last-received time for the connection."
733         if connection.context:
734             connection.context.handle_ping()
735     def _handle_welcome(self, connection, _event):
736         "Welcome arrived, nick accepted for this connection."
737         if connection.context:
738             connection.context.handle_welcome()
739     def _handle_badnick(self, connection, _event):
740         "Nick not accepted for this connection."
741         if connection.context:
742             connection.context.handle_badnick()
743     def _handle_features(self, connection, event):
744         "Determine if and how we can set deaf mode."
745         if connection.context:
746             cxt = connection.context
747             arguments = event.arguments
748             for lump in arguments:
749                 if lump.startswith("DEAF="):
750                     if not self.logfile:
751                         connection.mode(cxt.nickname(), "+"+lump[5:])
752                 elif lump.startswith("MAXCHANNELS="):
753                     m = int(lump[12:])
754                     for pref in "#&+":
755                         cxt.channel_limits[pref] = m
756                     LOG.info("%s maxchannels is %d" % (connection.server, m))
757                 elif lump.startswith("CHANLIMIT=#:"):
758                     limits = lump[10:].split(",")
759                     try:
760                         for token in limits:
761                             (prefixes, limit) = token.split(":")
762                             limit = int(limit)
763                             for c in prefixes:
764                                 cxt.channel_limits[c] = limit
765                         LOG.info("%s channel limit map is %s" % (
766                             connection.target, cxt.channel_limits))
767                     except ValueError:
768                         LOG.error("ill-formed CHANLIMIT property")
769     def _handle_disconnect(self, connection, _event):
770         "Server hung up the connection."
771         LOG.info("server %s disconnected" % connection.target)
772         connection.close()
773         if connection.context:
774             connection.context.handle_disconnect()
775     def _handle_kick(self, connection, event):
776         "Server hung up the connection."
777         target = event.target
778         LOG.info("irker has been kicked from %s on %s" % (
779             target, connection.target))
780         if connection.context:
781             connection.context.handle_kick(target)
782     def _handle_every_raw_message(self, _connection, event):
783         "Log all messages when in watcher mode."
784         if self.logfile:
785             with open(self.logfile, "a") as logfp:
786                 logfp.write("%03f|%s|%s\n" % \
787                              (time.time(), event.source, event.arguments[0]))
788     def pending(self):
789         "Do we have any pending message traffic?"
790         return [k for (k, v) in self.servers.items() if v.pending()]
791
792     def _parse_request(self, line):
793         "Request-parsing helper for the handle() method"
794         request = json.loads(line.strip())
795         if not isinstance(request, dict):
796             raise InvalidRequest(
797                 "request is not a JSON dictionary: %r" % request)
798         if "to" not in request or "privmsg" not in request:
799             raise InvalidRequest(
800                 "malformed request - 'to' or 'privmsg' missing: %r" % request)
801         channels = request['to']
802         message = request['privmsg']
803         if not isinstance(channels, (list, basestring)):
804             raise InvalidRequest(
805                 "malformed request - unexpected channel type: %r" % channels)
806         if not isinstance(message, basestring):
807             raise InvalidRequest(
808                 "malformed request - unexpected message type: %r" % message)
809         if not isinstance(channels, list):
810             channels = [channels]
811         targets = []
812         for url in channels:
813             try:
814                 if not isinstance(url, basestring):
815                     raise InvalidRequest(
816                         "malformed request - URL has unexpected type: %r" %
817                         url)
818                 target = Target(url)
819                 target.validate()
820             except InvalidRequest, e:
821                 LOG.error(str(e))
822             else:
823                 targets.append(target)
824         return (targets, message)
825
826     def handle(self, line, quit_after=False):
827         "Perform a JSON relay request."
828         try:
829             targets, message = self._parse_request(line=line)
830             for target in targets:
831                 if target.server() not in self.servers:
832                     self.servers[target.server()] = Dispatcher(
833                         self, target=target, **self.kwargs)
834                 self.servers[target.server()].dispatch(
835                     target.channel, message, target.key, quit_after=quit_after)
836                 # GC dispatchers with no active connections
837                 servernames = self.servers.keys()
838                 for servername in servernames:
839                     if not self.servers[servername].live():
840                         del self.servers[servername]
841                     # If we might be pushing a resource limit even
842                     # after garbage collection, remove a session.  The
843                     # goal here is to head off DoS attacks that aim at
844                     # exhausting thread space or file descriptors.
845                     # The cost is that attempts to DoS this service
846                     # will cause lots of join/leave spam as we
847                     # scavenge old channels after connecting to new
848                     # ones. The particular method used for selecting a
849                     # session to be terminated doesn't matter much; we
850                     # choose the one longest idle on the assumption
851                     # that message activity is likely to be clumpy.
852                     if len(self.servers) >= CONNECTION_MAX:
853                         oldest = min(
854                             self.servers.keys(),
855                             key=lambda name: self.servers[name].last_xmit())
856                         del self.servers[oldest]
857         except InvalidRequest, e:
858             LOG.error(str(e))
859         except ValueError:
860             self.logerr("can't recognize JSON on input: %r" % line)
861         except RuntimeError:
862             self.logerr("wildly malformed JSON blew the parser stack.")
863
864 class IrkerTCPHandler(SocketServer.StreamRequestHandler):
865     def handle(self):
866         while True:
867             line = self.rfile.readline()
868             if not line:
869                 break
870             irker.handle(line.strip())
871
872 class IrkerUDPHandler(SocketServer.BaseRequestHandler):
873     def handle(self):
874         data = self.request[0].strip()
875         #socket = self.request[1]
876         irker.handle(data)
877
878 def usage():
879     sys.stdout.write("""
880 Usage:
881   irkerd [-d debuglevel] [-l logfile] [-n nick] [-p password] [-i channel message] [-V] [-h]
882
883 Options
884   -d    set debug level
885   -l    set logfile
886   -n    set nick-style
887   -p    set nickserv password
888   -i    immediate mode
889   -V    return irkerd version
890   -h    print this help dialog
891 """)
892
893 if __name__ == '__main__':
894     log_level = None
895     immediate = None
896     nick_template = "irker%03d"
897     password = None
898     logfile = None
899     try:
900         (options, arguments) = getopt.getopt(sys.argv[1:], "d:i:l:n:p:Vh")
901     except getopt.GetoptError as e:
902         sys.stderr.write("%s" % e)
903         usage()
904         sys.exit(1)
905     for (opt, val) in options:
906         if opt == '-d': # Enable debug/progress messages
907             if val.lower() not in LOG_LEVELS:
908                 sys.stderr.write('invalid log level %r (choices: %s)\n' % (
909                     val, ', '.join(LOG_LEVELS)))
910                 sys.exit(1)
911             log_level = getattr(logging, val.upper())
912         elif opt == '-i':       # Immediate mode - send one message, then exit. 
913             immediate = val
914         elif opt == '-l':       # Logfile mode - report traffic read in
915             logfile = val
916         elif opt == '-n':       # Force the nick
917             nick_template = val
918         elif opt == '-p':       # Set a nickserv password
919             password = val
920         elif opt == '-V':       # Emit version and exit
921             sys.stdout.write("irkerd version %s\n" % version)
922             sys.exit(0)
923         elif opt == '-h':
924             usage()
925             sys.exit(0)
926
927     handler = logging.StreamHandler()
928     LOG.addHandler(handler)
929     if log_level:
930         LOG.setLevel(log_level)
931
932     irker = Irker(
933         logfile=logfile,
934         nick_template=nick_template,
935         nick_needs_number=re.search("%.*d", nick_template),
936         )
937     LOG.info("irkerd version %s" % version)
938     if immediate:
939         irker.irc.add_event_handler("quit", lambda _c, _e: sys.exit(0))
940         irker.handle('{"to":"%s","privmsg":"%s"}' % (immediate, arguments[0]), quit_after=True)
941         irker.irc.spin()
942     else:
943         irker.thread_launch()
944         try:
945             tcpserver = SocketServer.TCPServer((HOST, PORT), IrkerTCPHandler)
946             udpserver = SocketServer.UDPServer((HOST, PORT), IrkerUDPHandler)
947             for server in [tcpserver, udpserver]:
948                 server = threading.Thread(target=server.serve_forever)
949                 server.setDaemon(True)
950                 server.start()
951             try:
952                 signal.pause()
953             except KeyboardInterrupt:
954                 raise SystemExit(1)
955         except socket.error, e:
956             sys.stderr.write("irkerd: server launch failed: %r\n" % e)
957
958 # end