irkerd: Split imported modules onto their own lines
[irker.git] / irkerd
1 #!/usr/bin/env python
2 """
3 irkerd - a simple IRC multiplexer daemon
4
5 Listens for JSON objects of the form {'to':<irc-url>, 'privmsg':<text>}
6 and relays messages to IRC channels. Each request must be followed by
7 a newline.
8
9 The <text> must be a string.  The value of the 'to' attribute can be a
10 string containing an IRC URL (e.g. 'irc://chat.freenet.net/botwar') or
11 a list of such strings; in the latter case the message is broadcast to
12 all listed channels.  Note that the channel portion of the URL need
13 *not* have a leading '#' unless the channel name itself does.
14
15 Options: -d sets the debug-message level (probably only of interest to
16 developers). -l sets a logfile to capture message traffic from
17 channels.  -n sets the nick and -p the nickserv password. The -V
18 option prints the program version and exits.
19
20 Design and code by Eric S. Raymond <esr@thyrsus.com>. See the project
21 resource page at <http://www.catb.org/~esr/irker/>.
22
23 Requires Python 2.6 or 2.5 with the simplejson library installed.
24 """
25
26 from __future__ import with_statement
27
28 # These things might need tuning
29
30 HOST = "localhost"
31 PORT = 6659
32
33 XMIT_TTL = (3 * 60 * 60)        # Time to live, seconds from last transmit
34 PING_TTL = (15 * 60)            # Time to live, seconds from last PING
35 HANDSHAKE_TTL = 60              # Time to live, seconds from nick transmit
36 CHANNEL_TTL = (3 * 60 * 60)     # Time to live, seconds from last transmit
37 DISCONNECT_TTL = (24 * 60 * 60) # Time to live, seconds from last connect
38 UNSEEN_TTL = 60                 # Time to live, seconds since first request
39 CHANNEL_MAX = 18                # Max channels open per socket (default)
40 ANTI_FLOOD_DELAY = 1.0          # Anti-flood delay after transmissions, seconds
41 ANTI_BUZZ_DELAY = 0.09          # Anti-buzz delay after queue-empty check
42 CONNECTION_MAX = 200            # To avoid hitting a thread limit
43
44 # No user-serviceable parts below this line
45
46 version = "2.6"
47
48 import Queue
49 import SocketServer
50 import getopt
51 try:
52     import simplejson as json   # Faster, also makes us Python-2.4-compatible
53 except ImportError:
54     import json
55 import random
56 import re
57 import select
58 import signal
59 import socket
60 import sys
61 import threading
62 import time
63 import urlparse
64
65
66 # Sketch of implementation:
67 #
68 # One Irker object manages multiple IRC sessions.  It holds a map of
69 # Dispatcher objects, one per (server, port) combination, which are
70 # responsible for routing messages to one of any number of Connection
71 # objects that do the actual socket conversations.  The reason for the
72 # Dispatcher layer is that IRC daemons limit the number of channels a
73 # client (that is, from the daemon's point of view, a socket) can be
74 # joined to, so each session to a server needs a flock of Connection
75 # instances each with its own socket.
76 #
77 # Connections are timed out and removed when either they haven't seen a
78 # PING for a while (indicating that the server may be stalled or down)
79 # or there has been no message traffic to them for a while, or
80 # even if the queue is nonempty but efforts to connect have failed for
81 # a long time.
82 #
83 # There are multiple threads. One accepts incoming traffic from all
84 # servers.  Each Connection also has a consumer thread and a
85 # thread-safe message queue.  The program main appends messages to
86 # queues as JSON requests are received; the consumer threads try to
87 # ship them to servers.  When a socket write stalls, it only blocks an
88 # individual consumer thread; if it stalls long enough, the session
89 # will be timed out. This solves the biggest problem with a
90 # single-threaded implementation, which is that you can't count on a
91 # single stalled write not hanging all other traffic - you're at the
92 # mercy of the length of the buffers in the TCP/IP layer.
93 #
94 # Message delivery is thus not reliable in the face of network stalls,
95 # but this was considered acceptable because IRC (notoriously) has the
96 # same problem - there is little point in reliable delivery to a relay
97 # that is down or unreliable.
98 #
99 # This code uses only NICK, JOIN, PART, MODE, PRIVMSG, USER, and QUIT. 
100 # It is strictly compliant to RFC1459, except for the interpretation and
101 # use of the DEAF and CHANLIMIT and (obsolete) MAXCHANNELS features.
102 #
103 # CHANLIMIT is as described in the Internet RFC draft
104 # draft-brocklesby-irc-isupport-03 at <http://www.mirc.com/isupport.html>.
105 # The ",isnick" feature is as described in
106 # <http://ftp.ics.uci.edu/pub/ietf/uri/draft-mirashi-url-irc-01.txt>.
107
108 # Historical note: the IRCClient and IRCServerConnection classes
109 # (~270LOC) replace the overweight, overcomplicated 3KLOC mass of
110 # irclib code that irker formerly used as a service library.  They
111 # still look similar to parts of irclib because I contributed to that
112 # code before giving up on it.
113
114 class IRCError(Exception):
115     "An IRC exception"
116     pass
117
118
119 class InvalidRequest (ValueError):
120     "An invalid JSON request"
121     pass
122
123
124 class IRCClient():
125     "An IRC client session to one or more servers."
126     def __init__(self, debuglevel):
127         self.mutex = threading.RLock()
128         self.server_connections = []
129         self.event_handlers = {}
130         self.add_event_handler("ping",
131                                lambda c, e: c.ship("PONG %s" % e.target))
132         self.debuglevel = debuglevel
133
134     def newserver(self):
135         "Initialize a new server-connection object."
136         conn = IRCServerConnection(self)
137         with self.mutex:
138             self.server_connections.append(conn)
139         return conn
140
141     def spin(self, timeout=0.2):
142         "Spin processing data from connections forever."
143         # Outer loop should specifically *not* be mutex-locked.
144         # Otherwise no other thread would ever be able to change
145         # the shared state of an IRC object running this function.
146         while True:
147             nextsleep = 0
148             with self.mutex:
149                 connected = [x for x in self.server_connections
150                              if x is not None and x.socket is not None]
151                 sockets = [x.socket for x in connected]
152                 if sockets:
153                     connmap = dict([(c.socket.fileno(), c) for c in connected])
154                     (insocks, _o, _e) = select.select(sockets, [], [], timeout)
155                     for s in insocks:
156                         connmap[s.fileno()].consume()
157                 else:
158                     nextsleep = timeout
159             time.sleep(nextsleep)
160
161     def add_event_handler(self, event, handler):
162         "Set a handler to be called later."
163         with self.mutex:
164             event_handlers = self.event_handlers.setdefault(event, [])
165             event_handlers.append(handler)
166
167     def handle_event(self, connection, event):
168         with self.mutex:
169             h = self.event_handlers
170             th = sorted(h.get("all_events", []) + h.get(event.type, []))
171             for handler in th:
172                 handler(connection, event)
173
174     def drop_connection(self, connection):
175         with self.mutex:
176             self.server_connections.remove(connection)
177
178     def debug(self, level, errmsg):
179         "Debugging information."
180         if self.debuglevel >= level:
181             sys.stderr.write("irkerd: %s\n" % errmsg)
182
183 class LineBufferedStream():
184     "Line-buffer a read stream."
185     crlf_re = re.compile(b'\r?\n')
186
187     def __init__(self):
188         self.buffer = ''
189
190     def append(self, newbytes):
191         self.buffer += newbytes
192
193     def lines(self):
194         "Iterate over lines in the buffer."
195         lines = LineBufferedStream.crlf_re.split(self.buffer)
196         self.buffer = lines.pop()
197         return iter(lines)
198
199     def __iter__(self):
200         return self.lines()
201
202 class IRCServerConnectionError(IRCError):
203     pass
204
205 class IRCServerConnection():
206     command_re = re.compile("^(:(?P<prefix>[^ ]+) +)?(?P<command>[^ ]+)( *(?P<argument> .+))?")
207     # The full list of numeric-to-event mappings is in Perl's Net::IRC.
208     # We only need to ensure that if some ancient server throws numerics
209     # for the ones we actually want to catch, they're mapped.
210     codemap = {
211         "001": "welcome",
212         "005": "featurelist",
213         "432": "erroneusnickname",
214         "433": "nicknameinuse",
215         "436": "nickcollision",
216         "437": "unavailresource",
217     }
218
219     def __init__(self, master):
220         self.master = master
221         self.socket = None
222
223     def connect(self, server, port, nickname,
224                 password=None, username=None, ircname=None):
225         self.master.debug(2, "connect(server=%r, port=%r, nickname=%r, ...)" %
226                           (server, port, nickname))
227         if self.socket is not None:
228             self.disconnect("Changing servers")
229
230         self.buffer = LineBufferedStream()
231         self.event_handlers = {}
232         self.real_server_name = ""
233         self.server = server
234         self.nickname = nickname
235         try:
236             self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
237             self.socket.bind(('', 0))
238             self.socket.connect((server, port))
239         except socket.error as err:
240             raise IRCServerConnectionError("Couldn't connect to socket: %s" % err)
241
242         if password:
243             self.ship("PASS " + password)
244         self.nick(self.nickname)
245         self.user(username=username or ircname, realname=ircname or nickname)
246         return self
247
248     def close(self):
249         # Without this thread lock, there is a window during which
250         # select() can find a closed socket, leading to an EBADF error.
251         with self.master.mutex:
252             self.disconnect("Closing object")
253             self.master.drop_connection(self)
254
255     def consume(self):
256         try:
257             incoming = self.socket.recv(16384)
258         except socket.error:
259             # Server hung up on us.
260             self.disconnect("Connection reset by peer")
261             return
262         if not incoming:
263             # Dead air also indicates a connection reset.
264             self.disconnect("Connection reset by peer")
265             return
266
267         self.buffer.append(incoming)
268
269         for line in self.buffer:
270             self.master.debug(2, "FROM: %s" % line)
271
272             if not line:
273                 continue
274
275             prefix = None
276             command = None
277             arguments = None
278             self.handle_event(Event("every_raw_message",
279                                      self.real_server_name,
280                                      None,
281                                      [line]))
282
283             m = IRCServerConnection.command_re.match(line)
284             if m.group("prefix"):
285                 prefix = m.group("prefix")
286                 if not self.real_server_name:
287                     self.real_server_name = prefix
288             if m.group("command"):
289                 command = m.group("command").lower()
290             if m.group("argument"):
291                 a = m.group("argument").split(" :", 1)
292                 arguments = a[0].split()
293                 if len(a) == 2:
294                     arguments.append(a[1])
295
296             command = IRCServerConnection.codemap.get(command, command)
297             if command in ["privmsg", "notice"]:
298                 target = arguments.pop(0)
299             else:
300                 target = None
301
302                 if command == "quit":
303                     arguments = [arguments[0]]
304                 elif command == "ping":
305                     target = arguments[0]
306                 else:
307                     target = arguments[0]
308                     arguments = arguments[1:]
309
310             self.master.debug(2,
311                               "command: %s, source: %s, target: %s, arguments: %s" % (command, prefix, target, arguments))
312             self.handle_event(Event(command, prefix, target, arguments))
313
314     def handle_event(self, event):
315         self.master.handle_event(self, event)
316         if event.type in self.event_handlers:
317             for fn in self.event_handlers[event.type]:
318                 fn(self, event)
319
320     def is_connected(self):
321         return self.socket is not None
322
323     def disconnect(self, message=""):
324         if self.socket is None:
325             return
326         # Don't send a QUIT here - causes infinite loop!
327         try:
328             self.socket.shutdown(socket.SHUT_WR)
329             self.socket.close()
330         except socket.error:
331             pass
332         del self.socket
333         self.socket = None
334         self.handle_event(Event("disconnect", self.server, "", [message]))
335
336     def join(self, channel, key=""):
337         self.ship("JOIN %s%s" % (channel, (key and (" " + key))))
338
339     def mode(self, target, command):
340         self.ship("MODE %s %s" % (target, command))
341
342     def nick(self, newnick):
343         self.ship("NICK " + newnick)
344
345     def part(self, channel, message=""):
346         cmd_parts = ['PART', channel]
347         if message:
348             cmd_parts.append(message)
349         self.ship(' '.join(cmd_parts))
350
351     def privmsg(self, target, text):
352         self.ship("PRIVMSG %s :%s" % (target, text))
353
354     def quit(self, message=""):
355         self.ship("QUIT" + (message and (" :" + message)))
356
357     def user(self, username, realname):
358         self.ship("USER %s 0 * :%s" % (username, realname))
359
360     def ship(self, string):
361         "Ship a command to the server, appending CR/LF"
362         try:
363             self.socket.send(string.encode('utf-8') + b'\r\n')
364             self.master.debug(2, "TO: %s" % string)
365         except socket.error:
366             self.disconnect("Connection reset by peer.")
367
368 class Event(object):
369     def __init__(self, evtype, source, target, arguments=None):
370         self.type = evtype
371         self.source = source
372         self.target = target
373         if arguments is None:
374             arguments = []
375         self.arguments = arguments
376
377 def is_channel(string):
378     return string and string[0] in "#&+!"
379
380 class Connection:
381     def __init__(self, irkerd, servername, port):
382         self.irker = irkerd
383         self.servername = servername
384         self.port = port
385         self.nick_trial = None
386         self.connection = None
387         self.status = None
388         self.last_xmit = time.time()
389         self.last_ping = time.time()
390         self.channels_joined = {}
391         self.channel_limits = {}
392         # The consumer thread
393         self.queue = Queue.Queue()
394         self.thread = None
395     def nickname(self, n=None):
396         "Return a name for the nth server connection."
397         if n is None:
398             n = self.nick_trial
399         if fallback:
400             return (namestyle % n)
401         else:
402             return namestyle
403     def handle_ping(self):
404         "Register the fact that the server has pinged this connection."
405         self.last_ping = time.time()
406     def handle_welcome(self):
407         "The server says we're OK, with a non-conflicting nick."
408         self.status = "ready"
409         self.irker.irc.debug(1, "nick %s accepted" % self.nickname())
410         if password:
411             self.connection.privmsg("nickserv", "identify %s" % password)
412     def handle_badnick(self):
413         "The server says our nick is ill-formed or has a conflict."
414         self.irker.irc.debug(1, "nick %s rejected" % self.nickname())
415         if fallback:
416             # Randomness prevents a malicious user or bot from
417             # anticipating the next trial name in order to block us
418             # from completing the handshake.
419             self.nick_trial += random.randint(1, 3)
420             self.last_xmit = time.time()
421             self.connection.nick(self.nickname())
422         # Otherwise fall through, it might be possible to
423         # recover manually.
424     def handle_disconnect(self):
425         "Server disconnected us for flooding or some other reason."
426         self.connection = None
427         if self.status != "expired":
428             self.status = "disconnected"
429     def handle_kick(self, outof):
430         "We've been kicked."
431         self.status = "handshaking"
432         try:
433             del self.channels_joined[outof]
434         except KeyError:
435             self.irker.logerr("kicked by %s from %s that's not joined"
436                               % (self.servername, outof))
437         qcopy = []
438         while not self.queue.empty():
439             (channel, message, key) = self.queue.get()
440             if channel != outof:
441                 qcopy.append((channel, message, key))
442         for (channel, message, key) in qcopy:
443             self.queue.put((channel, message, key))
444         self.status = "ready"
445     def enqueue(self, channel, message, key, quit_after=False):
446         "Enque a message for transmission."
447         if self.thread is None or not self.thread.is_alive():
448             self.status = "unseen"
449             self.thread = threading.Thread(target=self.dequeue)
450             self.thread.setDaemon(True)
451             self.thread.start()
452         self.queue.put((channel, message, key))
453         if quit_after:
454             self.queue.put((channel, None, key))
455     def dequeue(self):
456         "Try to ship pending messages from the queue."
457         try:
458             while True:
459                 # We want to be kind to the IRC servers and not hold unused
460                 # sockets open forever, so they have a time-to-live.  The
461                 # loop is coded this particular way so that we can drop
462                 # the actual server connection when its time-to-live
463                 # expires, then reconnect and resume transmission if the
464                 # queue fills up again.
465                 if self.queue.empty():
466                     # Queue is empty, at some point we want to time out
467                     # the connection rather than holding a socket open in
468                     # the server forever.
469                     now = time.time()
470                     xmit_timeout = now > self.last_xmit + XMIT_TTL
471                     ping_timeout = now > self.last_ping + PING_TTL
472                     if self.status == "disconnected":
473                         # If the queue is empty, we can drop this connection.
474                         self.status = "expired"
475                         break
476                     elif xmit_timeout or ping_timeout:
477                         self.irker.irc.debug(1, "timing out connection to %s at %s (ping_timeout=%s, xmit_timeout=%s)" % (self.servername, time.asctime(), ping_timeout, xmit_timeout))
478                         with self.irker.irc.mutex:
479                             self.connection.context = None
480                             self.connection.quit("transmission timeout")
481                             self.connection = None
482                         self.status = "disconnected"
483                     else:
484                         # Prevent this thread from hogging the CPU by pausing
485                         # for just a little bit after the queue-empty check.
486                         # As long as this is less that the duration of a human
487                         # reflex arc it is highly unlikely any human will ever
488                         # notice.
489                         time.sleep(ANTI_BUZZ_DELAY)
490                 elif self.status == "disconnected" \
491                          and time.time() > self.last_xmit + DISCONNECT_TTL:
492                     # Queue is nonempty, but the IRC server might be
493                     # down. Letting failed connections retain queue
494                     # space forever would be a memory leak.
495                     self.status = "expired"
496                     break
497                 elif not self.connection and self.status != "expired":
498                     # Queue is nonempty but server isn't connected.
499                     with self.irker.irc.mutex:
500                         self.connection = self.irker.irc.newserver()
501                         self.connection.context = self
502                         # Try to avoid colliding with other instances
503                         self.nick_trial = random.randint(1, 990)
504                         self.channels_joined = {}
505                         try:
506                             # This will throw
507                             # IRCServerConnectionError on failure
508                             self.connection.connect(self.servername,
509                                                 self.port,
510                                                 nickname=self.nickname(),
511                                                 username="irker",
512                                                 ircname="irker relaying client")
513                             self.status = "handshaking"
514                             self.irker.irc.debug(1, "XMIT_TTL bump (%s connection) at %s" % (self.servername, time.asctime()))
515                             self.last_xmit = time.time()
516                             self.last_ping = time.time()
517                         except IRCServerConnectionError:
518                             self.status = "expired"
519                             break
520                 elif self.status == "handshaking":
521                     if time.time() > self.last_xmit + HANDSHAKE_TTL:
522                         self.status = "expired"
523                         break
524                     else:
525                         # Don't buzz on the empty-queue test while we're
526                         # handshaking
527                         time.sleep(ANTI_BUZZ_DELAY)
528                 elif self.status == "unseen" \
529                          and time.time() > self.last_xmit + UNSEEN_TTL:
530                     # Nasty people could attempt a denial-of-service
531                     # attack by flooding us with requests with invalid
532                     # servernames. We guard against this by rapidly
533                     # expiring connections that have a nonempty queue but
534                     # have never had a successful open.
535                     self.status = "expired"
536                     break
537                 elif self.status == "ready":
538                     (channel, message, key) = self.queue.get()
539                     if channel not in self.channels_joined:
540                         self.connection.join(channel, key=key)
541                         self.irker.irc.debug(1, "joining %s on %s." % (channel, self.servername))
542                     # None is magic - it's a request to quit the server
543                     if message is None:
544                         self.connection.quit()
545                     # An empty message might be used as a keepalive or
546                     # to join a channel for logging, so suppress the
547                     # privmsg send unless there is actual traffic.
548                     elif message:
549                         for segment in message.split("\n"):
550                             # Truncate the message if it's too long,
551                             # but we're working with characters here,
552                             # not bytes, so we could be off.
553                             # 500 = 512 - CRLF - 'PRIVMSG ' - ' :'
554                             maxlength = 500 - len(channel)
555                             if len(segment) > maxlength:
556                                 segment = segment[:maxlength]
557                             try:
558                                 self.connection.privmsg(channel, segment)
559                             except ValueError as err:
560                                 self.irker.irc.debug(1, "irclib rejected a message to %s on %s because: %s" % (channel, self.servername, str(err)))
561                                 self.irker.irc.debug(50, err.format_exc())
562                             time.sleep(ANTI_FLOOD_DELAY)
563                     self.last_xmit = self.channels_joined[channel] = time.time()
564                     self.irker.irc.debug(1, "XMIT_TTL bump (%s transmission) at %s" % (self.servername, time.asctime()))
565                     self.queue.task_done()
566                 elif self.status == "expired":
567                     print "We're expired but still running! This is a bug."
568                     break
569         except:
570             (exc_type, _exc_value, exc_traceback) = sys.exc_info()
571             self.irker.logerr("exception %s in thread for %s" % \
572                               (exc_type, self.servername))
573
574             # Maybe this should have its own status?
575             self.status = "expired"
576
577             # This is so we can see tracebacks for errors inside the thread
578             # when we need to be able to for debugging purposes.
579             if debuglvl > 0:
580                 raise exc_type, _exc_value, exc_traceback
581         finally:
582             try:
583                 # Make sure we don't leave any zombies behind
584                 self.connection.close()
585             except:
586                 # Irclib has a habit of throwing fresh exceptions here. Ignore that
587                 pass
588     def live(self):
589         "Should this connection not be scavenged?"
590         return self.status != "expired"
591     def joined_to(self, channel):
592         "Is this connection joined to the specified channel?"
593         return channel in self.channels_joined
594     def accepting(self, channel):
595         "Can this connection accept a join of this channel?"
596         if self.channel_limits:
597             match_count = 0
598             for already in self.channels_joined:
599                 # This obscure code is because the RFCs allow separate limits
600                 # by channel type (indicated by the first character of the name)
601                 # a feature that is almost never actually used.
602                 if already[0] == channel[0]:
603                     match_count += 1
604             return match_count < self.channel_limits.get(channel[0], CHANNEL_MAX)
605         else:
606             return len(self.channels_joined) < CHANNEL_MAX
607
608 class Target():
609     "Represent a transmission target."
610     def __init__(self, url):
611         self.url = url
612         # Pre-2.6 Pythons don't recognize irc: as a valid URL prefix.
613         url = url.replace("irc://", "http://")
614         parsed = urlparse.urlparse(url)
615         irchost, _, ircport = parsed.netloc.partition(':')
616         if not ircport:
617             ircport = 6667
618         self.servername = irchost
619         # IRC channel names are case-insensitive.  If we don't smash
620         # case here we may run into problems later. There was a bug
621         # observed on irc.rizon.net where an irkerd user specified #Channel,
622         # got kicked, and irkerd crashed because the server returned
623         # "#channel" in the notification that our kick handler saw.
624         self.channel = parsed.path.lstrip('/').lower()
625         # This deals with a tweak in recent versions of urlparse.
626         if parsed.fragment:
627             self.channel += "#" + parsed.fragment
628         isnick = self.channel.endswith(",isnick")
629         if isnick:
630             self.channel = self.channel[:-7]
631         if self.channel and not isnick and self.channel[0] not in "#&+":
632             self.channel = "#" + self.channel
633         # support both channel?secret and channel?key=secret
634         self.key = ""
635         if parsed.query:
636             self.key = re.sub("^key=", "", parsed.query)
637         self.port = int(ircport)
638     def validate(self):
639         "Raise InvalidRequest if the URL is missing a critical component"
640         if not self.servername:
641             raise InvalidRequest(
642                 'target URL missing a servername: %r' % self.url)
643         if not self.channel:
644             raise InvalidRequest(
645                 'target URL missing a channel: %r' % self.url)
646     def server(self):
647         "Return a hashable tuple representing the destination server."
648         return (self.servername, self.port)
649
650 class Dispatcher:
651     "Manage connections to a particular server-port combination."
652     def __init__(self, irkerd, servername, port):
653         self.irker = irkerd
654         self.servername = servername
655         self.port = port
656         self.connections = []
657     def dispatch(self, channel, message, key, quit_after=False):
658         "Dispatch messages for our server-port combination."
659         # First, check if there is room for another channel
660         # on any of our existing connections.
661         connections = [x for x in self.connections if x.live()]
662         eligibles = [x for x in connections if x.joined_to(channel)] \
663                     or [x for x in connections if x.accepting(channel)]
664         if eligibles:
665             eligibles[0].enqueue(channel, message, key, quit_after)
666             return
667         # All connections are full up. Look for one old enough to be
668         # scavenged.
669         ancients = []
670         for connection in connections:
671             for (chan, age) in connections.channels_joined.items():
672                 if age < time.time() - CHANNEL_TTL:
673                     ancients.append((connection, chan, age))
674         if ancients:
675             ancients.sort(key=lambda x: x[2]) 
676             (found_connection, drop_channel, _drop_age) = ancients[0]
677             found_connection.part(drop_channel, "scavenged by irkerd")
678             del found_connection.channels_joined[drop_channel]
679             #time.sleep(ANTI_FLOOD_DELAY)
680             found_connection.enqueue(channel, message, key, quit_after)
681             return
682         # Didn't find any channels with no recent activity
683         newconn = Connection(self.irker,
684                              self.servername,
685                              self.port)
686         self.connections.append(newconn)
687         newconn.enqueue(channel, message, key, quit_after)
688     def live(self):
689         "Does this server-port combination have any live connections?"
690         self.connections = [x for x in self.connections if x.live()]
691         return len(self.connections) > 0
692     def pending(self):
693         "Return all connections with pending traffic."
694         return [x for x in self.connections if not x.queue.empty()]
695     def last_xmit(self):
696         "Return the time of the most recent transmission."
697         return max(x.last_xmit for x in self.connections)
698
699 class Irker:
700     "Persistent IRC multiplexer."
701     def __init__(self, debuglevel=0):
702         self.debuglevel = debuglevel
703         self.irc = IRCClient(self.debuglevel)
704         self.irc.add_event_handler("ping", self._handle_ping)
705         self.irc.add_event_handler("welcome", self._handle_welcome)
706         self.irc.add_event_handler("erroneusnickname", self._handle_badnick)
707         self.irc.add_event_handler("nicknameinuse", self._handle_badnick)
708         self.irc.add_event_handler("nickcollision", self._handle_badnick)
709         self.irc.add_event_handler("unavailresource", self._handle_badnick)
710         self.irc.add_event_handler("featurelist", self._handle_features)
711         self.irc.add_event_handler("disconnect", self._handle_disconnect)
712         self.irc.add_event_handler("kick", self._handle_kick)
713         self.irc.add_event_handler("every_raw_message", self._handle_every_raw_message)
714         self.servers = {}
715     def thread_launch(self):
716         thread = threading.Thread(target=self.irc.spin)
717         thread.setDaemon(True)
718         self.irc._thread = thread
719         thread.start()
720     def logerr(self, errmsg):
721         "Log a processing error."
722         sys.stderr.write("irkerd: " + errmsg + "\n")
723     def _handle_ping(self, connection, _event):
724         "PING arrived, bump the last-received time for the connection."
725         if connection.context:
726             connection.context.handle_ping()
727     def _handle_welcome(self, connection, _event):
728         "Welcome arrived, nick accepted for this connection."
729         if connection.context:
730             connection.context.handle_welcome()
731     def _handle_badnick(self, connection, _event):
732         "Nick not accepted for this connection."
733         if connection.context:
734             connection.context.handle_badnick()
735     def _handle_features(self, connection, event):
736         "Determine if and how we can set deaf mode."
737         if connection.context:
738             cxt = connection.context
739             arguments = event.arguments
740             for lump in arguments:
741                 if lump.startswith("DEAF="):
742                     if not logfile:
743                         connection.mode(cxt.nickname(), "+"+lump[5:])
744                 elif lump.startswith("MAXCHANNELS="):
745                     m = int(lump[12:])
746                     for pref in "#&+":
747                         cxt.channel_limits[pref] = m
748                     self.irc.debug(1, "%s maxchannels is %d"
749                                % (connection.server, m))
750                 elif lump.startswith("CHANLIMIT=#:"):
751                     limits = lump[10:].split(",")
752                     try:
753                         for token in limits:
754                             (prefixes, limit) = token.split(":")
755                             limit = int(limit)
756                             for c in prefixes:
757                                 cxt.channel_limits[c] = limit
758                         self.irc.debug(1, "%s channel limit map is %s"
759                                    % (connection.server, cxt.channel_limits))
760                     except ValueError:
761                         self.logerr("ill-formed CHANLIMIT property")
762     def _handle_disconnect(self, connection, _event):
763         "Server hung up the connection."
764         self.irc.debug(1, "server %s disconnected" % connection.server)
765         connection.close()
766         if connection.context:
767             connection.context.handle_disconnect()
768     def _handle_kick(self, connection, event):
769         "Server hung up the connection."
770         target = event.target
771         self.irc.debug(1, "irker has been kicked from %s on %s" % (target, connection.server))
772         if connection.context:
773             connection.context.handle_kick(target)
774     def _handle_every_raw_message(self, _connection, event):
775         "Log all messages when in watcher mode."
776         if logfile:
777             with open(logfile, "a") as logfp:
778                 logfp.write("%03f|%s|%s\n" % \
779                              (time.time(), event.source, event.arguments[0]))
780     def pending(self):
781         "Do we have any pending message traffic?"
782         return [k for (k, v) in self.servers.items() if v.pending()]
783
784     def _parse_request(self, line):
785         "Request-parsing helper for the handle() method"
786         request = json.loads(line.strip())
787         if not isinstance(request, dict):
788             raise InvalidRequest(
789                 "request is not a JSON dictionary: %r" % request)
790         if "to" not in request or "privmsg" not in request:
791             raise InvalidRequest(
792                 "malformed request - 'to' or 'privmsg' missing: %r" % request)
793         channels = request['to']
794         message = request['privmsg']
795         if not isinstance(channels, (list, basestring)):
796             raise InvalidRequest(
797                 "malformed request - unexpected channel type: %r" % channels)
798         if not isinstance(message, basestring):
799             raise InvalidRequest(
800                 "malformed request - unexpected message type: %r" % message)
801         if not isinstance(channels, list):
802             channels = [channels]
803         targets = []
804         for url in channels:
805             try:
806                 if not isinstance(url, basestring):
807                     raise InvalidRequest(
808                         "malformed request - URL has unexpected type: %r" %
809                         url)
810                 target = Target(url)
811                 target.validate()
812             except InvalidRequest, e:
813                 self.logerr(str(e))
814             else:
815                 targets.append(target)
816         return (targets, message)
817
818     def handle(self, line, quit_after=False):
819         "Perform a JSON relay request."
820         try:
821             targets, message = self._parse_request(line=line)
822             for target in targets:
823                 if target.server() not in self.servers:
824                     self.servers[target.server()] = Dispatcher(
825                         self, target.servername, target.port)
826                 self.servers[target.server()].dispatch(
827                     target.channel, message, target.key, quit_after=quit_after)
828                 # GC dispatchers with no active connections
829                 servernames = self.servers.keys()
830                 for servername in servernames:
831                     if not self.servers[servername].live():
832                         del self.servers[servername]
833                     # If we might be pushing a resource limit even
834                     # after garbage collection, remove a session.  The
835                     # goal here is to head off DoS attacks that aim at
836                     # exhausting thread space or file descriptors.
837                     # The cost is that attempts to DoS this service
838                     # will cause lots of join/leave spam as we
839                     # scavenge old channels after connecting to new
840                     # ones. The particular method used for selecting a
841                     # session to be terminated doesn't matter much; we
842                     # choose the one longest idle on the assumption
843                     # that message activity is likely to be clumpy.
844                     if len(self.servers) >= CONNECTION_MAX:
845                         oldest = min(
846                             self.servers.keys(),
847                             key=lambda name: self.servers[name].last_xmit())
848                         del self.servers[oldest]
849         except InvalidRequest, e:
850             self.logerr(str(e))
851         except ValueError:
852             self.logerr("can't recognize JSON on input: %r" % line)
853         except RuntimeError:
854             self.logerr("wildly malformed JSON blew the parser stack.")
855
856 class IrkerTCPHandler(SocketServer.StreamRequestHandler):
857     def handle(self):
858         while True:
859             line = self.rfile.readline()
860             if not line:
861                 break
862             irker.handle(line.strip())
863
864 class IrkerUDPHandler(SocketServer.BaseRequestHandler):
865     def handle(self):
866         data = self.request[0].strip()
867         #socket = self.request[1]
868         irker.handle(data)
869
870 def usage():
871     sys.stdout.write("""
872 Usage:
873   irkerd [-d debuglevel] [-l logfile] [-n nick] [-p password] [-i channel message] [-V] [-h]
874
875 Options
876   -d    set debug level
877   -l    set logfile
878   -n    set nick-style
879   -p    set nickserv password
880   -i    immediate mode
881   -V    return irkerd version
882   -h    print this help dialog
883 """)
884
885 if __name__ == '__main__':
886     debuglvl = 0
887     immediate = None
888     namestyle = "irker%03d"
889     password = None
890     logfile = None
891     try:
892         (options, arguments) = getopt.getopt(sys.argv[1:], "d:i:l:n:p:Vh")
893     except getopt.GetoptError as e:
894         sys.stderr.write("%s" % e)
895         usage()
896         sys.exit(1)
897     for (opt, val) in options:
898         if opt == '-d':         # Enable debug/progress messages
899             debuglvl = int(val)
900         elif opt == '-i':       # Immediate mode - send one message, then exit. 
901             immediate = val
902         elif opt == '-l':       # Logfile mode - report traffic read in
903             logfile = val
904         elif opt == '-n':       # Force the nick
905             namestyle = val
906         elif opt == '-p':       # Set a nickserv password
907             password = val
908         elif opt == '-V':       # Emit version and exit
909             sys.stdout.write("irkerd version %s\n" % version)
910             sys.exit(0)
911         elif opt == '-h':
912             usage()
913             sys.exit(0)
914     fallback = re.search("%.*d", namestyle)
915     irker = Irker(debuglevel=debuglvl)
916     irker.irc.debug(1, "irkerd version %s" % version)
917     if immediate:
918         irker.irc.add_event_handler("quit", lambda _c, _e: sys.exit(0))
919         irker.handle('{"to":"%s","privmsg":"%s"}' % (immediate, arguments[0]), quit_after=True)
920         irker.irc.spin()
921     else:
922         irker.thread_launch()
923         try:
924             tcpserver = SocketServer.TCPServer((HOST, PORT), IrkerTCPHandler)
925             udpserver = SocketServer.UDPServer((HOST, PORT), IrkerUDPHandler)
926             for server in [tcpserver, udpserver]:
927                 server = threading.Thread(target=server.serve_forever)
928                 server.setDaemon(True)
929                 server.start()
930             try:
931                 signal.pause()
932             except KeyboardInterrupt:
933                 raise SystemExit(1)
934         except socket.error, e:
935             sys.stderr.write("irkerd: server launch failed: %r\n" % e)
936
937 # end