irkerd: Replace 'fallback' global with local 'nick_needs_number'
[irker.git] / irkerd
1 #!/usr/bin/env python
2 """
3 irkerd - a simple IRC multiplexer daemon
4
5 Listens for JSON objects of the form {'to':<irc-url>, 'privmsg':<text>}
6 and relays messages to IRC channels. Each request must be followed by
7 a newline.
8
9 The <text> must be a string.  The value of the 'to' attribute can be a
10 string containing an IRC URL (e.g. 'irc://chat.freenet.net/botwar') or
11 a list of such strings; in the latter case the message is broadcast to
12 all listed channels.  Note that the channel portion of the URL need
13 *not* have a leading '#' unless the channel name itself does.
14
15 Options: -d sets the debug-message level (probably only of interest to
16 developers). -l sets a logfile to capture message traffic from
17 channels.  -n sets the nick and -p the nickserv password. The -V
18 option prints the program version and exits.
19
20 Design and code by Eric S. Raymond <esr@thyrsus.com>. See the project
21 resource page at <http://www.catb.org/~esr/irker/>.
22
23 Requires Python 2.6 or 2.5 with the simplejson library installed.
24 """
25
26 from __future__ import with_statement
27
28 # These things might need tuning
29
30 HOST = "localhost"
31 PORT = 6659
32
33 XMIT_TTL = (3 * 60 * 60)        # Time to live, seconds from last transmit
34 PING_TTL = (15 * 60)            # Time to live, seconds from last PING
35 HANDSHAKE_TTL = 60              # Time to live, seconds from nick transmit
36 CHANNEL_TTL = (3 * 60 * 60)     # Time to live, seconds from last transmit
37 DISCONNECT_TTL = (24 * 60 * 60) # Time to live, seconds from last connect
38 UNSEEN_TTL = 60                 # Time to live, seconds since first request
39 CHANNEL_MAX = 18                # Max channels open per socket (default)
40 ANTI_FLOOD_DELAY = 1.0          # Anti-flood delay after transmissions, seconds
41 ANTI_BUZZ_DELAY = 0.09          # Anti-buzz delay after queue-empty check
42 CONNECTION_MAX = 200            # To avoid hitting a thread limit
43
44 # No user-serviceable parts below this line
45
46 version = "2.6"
47
48 import Queue
49 import SocketServer
50 import getopt
51 import logging
52 try:
53     import simplejson as json   # Faster, also makes us Python-2.4-compatible
54 except ImportError:
55     import json
56 import random
57 import re
58 import select
59 import signal
60 import socket
61 import sys
62 import threading
63 import time
64 import urlparse
65
66
67 LOG = logging.getLogger(__name__)
68 LOG.setLevel(logging.ERROR)
69 LOG_LEVELS = ['critical', 'error', 'warning', 'info', 'debug']
70
71
72 # Sketch of implementation:
73 #
74 # One Irker object manages multiple IRC sessions.  It holds a map of
75 # Dispatcher objects, one per (server, port) combination, which are
76 # responsible for routing messages to one of any number of Connection
77 # objects that do the actual socket conversations.  The reason for the
78 # Dispatcher layer is that IRC daemons limit the number of channels a
79 # client (that is, from the daemon's point of view, a socket) can be
80 # joined to, so each session to a server needs a flock of Connection
81 # instances each with its own socket.
82 #
83 # Connections are timed out and removed when either they haven't seen a
84 # PING for a while (indicating that the server may be stalled or down)
85 # or there has been no message traffic to them for a while, or
86 # even if the queue is nonempty but efforts to connect have failed for
87 # a long time.
88 #
89 # There are multiple threads. One accepts incoming traffic from all
90 # servers.  Each Connection also has a consumer thread and a
91 # thread-safe message queue.  The program main appends messages to
92 # queues as JSON requests are received; the consumer threads try to
93 # ship them to servers.  When a socket write stalls, it only blocks an
94 # individual consumer thread; if it stalls long enough, the session
95 # will be timed out. This solves the biggest problem with a
96 # single-threaded implementation, which is that you can't count on a
97 # single stalled write not hanging all other traffic - you're at the
98 # mercy of the length of the buffers in the TCP/IP layer.
99 #
100 # Message delivery is thus not reliable in the face of network stalls,
101 # but this was considered acceptable because IRC (notoriously) has the
102 # same problem - there is little point in reliable delivery to a relay
103 # that is down or unreliable.
104 #
105 # This code uses only NICK, JOIN, PART, MODE, PRIVMSG, USER, and QUIT. 
106 # It is strictly compliant to RFC1459, except for the interpretation and
107 # use of the DEAF and CHANLIMIT and (obsolete) MAXCHANNELS features.
108 #
109 # CHANLIMIT is as described in the Internet RFC draft
110 # draft-brocklesby-irc-isupport-03 at <http://www.mirc.com/isupport.html>.
111 # The ",isnick" feature is as described in
112 # <http://ftp.ics.uci.edu/pub/ietf/uri/draft-mirashi-url-irc-01.txt>.
113
114 # Historical note: the IRCClient and IRCServerConnection classes
115 # (~270LOC) replace the overweight, overcomplicated 3KLOC mass of
116 # irclib code that irker formerly used as a service library.  They
117 # still look similar to parts of irclib because I contributed to that
118 # code before giving up on it.
119
120 class IRCError(Exception):
121     "An IRC exception"
122     pass
123
124
125 class InvalidRequest (ValueError):
126     "An invalid JSON request"
127     pass
128
129
130 class IRCClient():
131     "An IRC client session to one or more servers."
132     def __init__(self):
133         self.mutex = threading.RLock()
134         self.server_connections = []
135         self.event_handlers = {}
136         self.add_event_handler("ping",
137                                lambda c, e: c.ship("PONG %s" % e.target))
138
139     def newserver(self):
140         "Initialize a new server-connection object."
141         conn = IRCServerConnection(self)
142         with self.mutex:
143             self.server_connections.append(conn)
144         return conn
145
146     def spin(self, timeout=0.2):
147         "Spin processing data from connections forever."
148         # Outer loop should specifically *not* be mutex-locked.
149         # Otherwise no other thread would ever be able to change
150         # the shared state of an IRC object running this function.
151         while True:
152             nextsleep = 0
153             with self.mutex:
154                 connected = [x for x in self.server_connections
155                              if x is not None and x.socket is not None]
156                 sockets = [x.socket for x in connected]
157                 if sockets:
158                     connmap = dict([(c.socket.fileno(), c) for c in connected])
159                     (insocks, _o, _e) = select.select(sockets, [], [], timeout)
160                     for s in insocks:
161                         connmap[s.fileno()].consume()
162                 else:
163                     nextsleep = timeout
164             time.sleep(nextsleep)
165
166     def add_event_handler(self, event, handler):
167         "Set a handler to be called later."
168         with self.mutex:
169             event_handlers = self.event_handlers.setdefault(event, [])
170             event_handlers.append(handler)
171
172     def handle_event(self, connection, event):
173         with self.mutex:
174             h = self.event_handlers
175             th = sorted(h.get("all_events", []) + h.get(event.type, []))
176             for handler in th:
177                 handler(connection, event)
178
179     def drop_connection(self, connection):
180         with self.mutex:
181             self.server_connections.remove(connection)
182
183
184 class LineBufferedStream():
185     "Line-buffer a read stream."
186     crlf_re = re.compile(b'\r?\n')
187
188     def __init__(self):
189         self.buffer = ''
190
191     def append(self, newbytes):
192         self.buffer += newbytes
193
194     def lines(self):
195         "Iterate over lines in the buffer."
196         lines = LineBufferedStream.crlf_re.split(self.buffer)
197         self.buffer = lines.pop()
198         return iter(lines)
199
200     def __iter__(self):
201         return self.lines()
202
203 class IRCServerConnectionError(IRCError):
204     pass
205
206 class IRCServerConnection():
207     command_re = re.compile("^(:(?P<prefix>[^ ]+) +)?(?P<command>[^ ]+)( *(?P<argument> .+))?")
208     # The full list of numeric-to-event mappings is in Perl's Net::IRC.
209     # We only need to ensure that if some ancient server throws numerics
210     # for the ones we actually want to catch, they're mapped.
211     codemap = {
212         "001": "welcome",
213         "005": "featurelist",
214         "432": "erroneusnickname",
215         "433": "nicknameinuse",
216         "436": "nickcollision",
217         "437": "unavailresource",
218     }
219
220     def __init__(self, master):
221         self.master = master
222         self.socket = None
223
224     def connect(self, target, nickname,
225                 password=None, username=None, ircname=None):
226         LOG.debug("connect(server=%r, port=%r, nickname=%r, ...)" % (
227             target.servername, target.port, nickname))
228         if self.socket is not None:
229             self.disconnect("Changing servers")
230
231         self.buffer = LineBufferedStream()
232         self.event_handlers = {}
233         self.real_server_name = ""
234         self.target = target
235         self.nickname = nickname
236         try:
237             self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
238             self.socket.bind(('', 0))
239             self.socket.connect((target.servername, target.port))
240         except socket.error as err:
241             raise IRCServerConnectionError("Couldn't connect to socket: %s" % err)
242
243         if password:
244             self.ship("PASS " + password)
245         self.nick(self.nickname)
246         self.user(username=username or ircname, realname=ircname or nickname)
247         return self
248
249     def close(self):
250         # Without this thread lock, there is a window during which
251         # select() can find a closed socket, leading to an EBADF error.
252         with self.master.mutex:
253             self.disconnect("Closing object")
254             self.master.drop_connection(self)
255
256     def consume(self):
257         try:
258             incoming = self.socket.recv(16384)
259         except socket.error:
260             # Server hung up on us.
261             self.disconnect("Connection reset by peer")
262             return
263         if not incoming:
264             # Dead air also indicates a connection reset.
265             self.disconnect("Connection reset by peer")
266             return
267
268         self.buffer.append(incoming)
269
270         for line in self.buffer:
271             LOG.debug("FROM: %s" % line)
272
273             if not line:
274                 continue
275
276             prefix = None
277             command = None
278             arguments = None
279             self.handle_event(Event("every_raw_message",
280                                      self.real_server_name,
281                                      None,
282                                      [line]))
283
284             m = IRCServerConnection.command_re.match(line)
285             if m.group("prefix"):
286                 prefix = m.group("prefix")
287                 if not self.real_server_name:
288                     self.real_server_name = prefix
289             if m.group("command"):
290                 command = m.group("command").lower()
291             if m.group("argument"):
292                 a = m.group("argument").split(" :", 1)
293                 arguments = a[0].split()
294                 if len(a) == 2:
295                     arguments.append(a[1])
296
297             command = IRCServerConnection.codemap.get(command, command)
298             if command in ["privmsg", "notice"]:
299                 target = arguments.pop(0)
300             else:
301                 target = None
302
303                 if command == "quit":
304                     arguments = [arguments[0]]
305                 elif command == "ping":
306                     target = arguments[0]
307                 else:
308                     target = arguments[0]
309                     arguments = arguments[1:]
310
311             LOG.debug("command: %s, source: %s, target: %s, arguments: %s" % (
312                 command, prefix, target, arguments))
313             self.handle_event(Event(command, prefix, target, arguments))
314
315     def handle_event(self, event):
316         self.master.handle_event(self, event)
317         if event.type in self.event_handlers:
318             for fn in self.event_handlers[event.type]:
319                 fn(self, event)
320
321     def is_connected(self):
322         return self.socket is not None
323
324     def disconnect(self, message=""):
325         if self.socket is None:
326             return
327         # Don't send a QUIT here - causes infinite loop!
328         try:
329             self.socket.shutdown(socket.SHUT_WR)
330             self.socket.close()
331         except socket.error:
332             pass
333         del self.socket
334         self.socket = None
335         self.handle_event(
336             Event("disconnect", self.target.server, "", [message]))
337
338     def join(self, channel, key=""):
339         self.ship("JOIN %s%s" % (channel, (key and (" " + key))))
340
341     def mode(self, target, command):
342         self.ship("MODE %s %s" % (target, command))
343
344     def nick(self, newnick):
345         self.ship("NICK " + newnick)
346
347     def part(self, channel, message=""):
348         cmd_parts = ['PART', channel]
349         if message:
350             cmd_parts.append(message)
351         self.ship(' '.join(cmd_parts))
352
353     def privmsg(self, target, text):
354         self.ship("PRIVMSG %s :%s" % (target, text))
355
356     def quit(self, message=""):
357         self.ship("QUIT" + (message and (" :" + message)))
358
359     def user(self, username, realname):
360         self.ship("USER %s 0 * :%s" % (username, realname))
361
362     def ship(self, string):
363         "Ship a command to the server, appending CR/LF"
364         try:
365             self.socket.send(string.encode('utf-8') + b'\r\n')
366             LOG.debug("TO: %s" % string)
367         except socket.error:
368             self.disconnect("Connection reset by peer.")
369
370 class Event(object):
371     def __init__(self, evtype, source, target, arguments=None):
372         self.type = evtype
373         self.source = source
374         self.target = target
375         if arguments is None:
376             arguments = []
377         self.arguments = arguments
378
379 def is_channel(string):
380     return string and string[0] in "#&+!"
381
382 class Connection:
383     def __init__(self, irker, target, nick_needs_number=False, **kwargs):
384         self.irker = irker
385         self.target = target
386         self.nick_needs_number = nick_needs_number
387         self.kwargs = kwargs
388         self.nick_trial = None
389         self.connection = None
390         self.status = None
391         self.last_xmit = time.time()
392         self.last_ping = time.time()
393         self.channels_joined = {}
394         self.channel_limits = {}
395         # The consumer thread
396         self.queue = Queue.Queue()
397         self.thread = None
398     def nickname(self, n=None):
399         "Return a name for the nth server connection."
400         if n is None:
401             n = self.nick_trial
402         if self.nick_needs_number:
403             return (namestyle % n)
404         else:
405             return namestyle
406     def handle_ping(self):
407         "Register the fact that the server has pinged this connection."
408         self.last_ping = time.time()
409     def handle_welcome(self):
410         "The server says we're OK, with a non-conflicting nick."
411         self.status = "ready"
412         LOG.info("nick %s accepted" % self.nickname())
413         if password:
414             self.connection.privmsg("nickserv", "identify %s" % password)
415     def handle_badnick(self):
416         "The server says our nick is ill-formed or has a conflict."
417         LOG.info("nick %s rejected" % self.nickname())
418         if self.nick_needs_number:
419             # Randomness prevents a malicious user or bot from
420             # anticipating the next trial name in order to block us
421             # from completing the handshake.
422             self.nick_trial += random.randint(1, 3)
423             self.last_xmit = time.time()
424             self.connection.nick(self.nickname())
425         # Otherwise fall through, it might be possible to
426         # recover manually.
427     def handle_disconnect(self):
428         "Server disconnected us for flooding or some other reason."
429         self.connection = None
430         if self.status != "expired":
431             self.status = "disconnected"
432     def handle_kick(self, outof):
433         "We've been kicked."
434         self.status = "handshaking"
435         try:
436             del self.channels_joined[outof]
437         except KeyError:
438             LOG.error("kicked by %s from %s that's not joined" % (
439                 self.target, outof))
440         qcopy = []
441         while not self.queue.empty():
442             (channel, message, key) = self.queue.get()
443             if channel != outof:
444                 qcopy.append((channel, message, key))
445         for (channel, message, key) in qcopy:
446             self.queue.put((channel, message, key))
447         self.status = "ready"
448     def enqueue(self, channel, message, key, quit_after=False):
449         "Enque a message for transmission."
450         if self.thread is None or not self.thread.is_alive():
451             self.status = "unseen"
452             self.thread = threading.Thread(target=self.dequeue)
453             self.thread.setDaemon(True)
454             self.thread.start()
455         self.queue.put((channel, message, key))
456         if quit_after:
457             self.queue.put((channel, None, key))
458     def dequeue(self):
459         "Try to ship pending messages from the queue."
460         try:
461             while True:
462                 # We want to be kind to the IRC servers and not hold unused
463                 # sockets open forever, so they have a time-to-live.  The
464                 # loop is coded this particular way so that we can drop
465                 # the actual server connection when its time-to-live
466                 # expires, then reconnect and resume transmission if the
467                 # queue fills up again.
468                 if self.queue.empty():
469                     # Queue is empty, at some point we want to time out
470                     # the connection rather than holding a socket open in
471                     # the server forever.
472                     now = time.time()
473                     xmit_timeout = now > self.last_xmit + XMIT_TTL
474                     ping_timeout = now > self.last_ping + PING_TTL
475                     if self.status == "disconnected":
476                         # If the queue is empty, we can drop this connection.
477                         self.status = "expired"
478                         break
479                     elif xmit_timeout or ping_timeout:
480                         LOG.info((
481                             "timing out connection to %s at %s "
482                             "(ping_timeout=%s, xmit_timeout=%s)") % (
483                             self.target, time.asctime(), ping_timeout,
484                             xmit_timeout))
485                         with self.irker.irc.mutex:
486                             self.connection.context = None
487                             self.connection.quit("transmission timeout")
488                             self.connection = None
489                         self.status = "disconnected"
490                     else:
491                         # Prevent this thread from hogging the CPU by pausing
492                         # for just a little bit after the queue-empty check.
493                         # As long as this is less that the duration of a human
494                         # reflex arc it is highly unlikely any human will ever
495                         # notice.
496                         time.sleep(ANTI_BUZZ_DELAY)
497                 elif self.status == "disconnected" \
498                          and time.time() > self.last_xmit + DISCONNECT_TTL:
499                     # Queue is nonempty, but the IRC server might be
500                     # down. Letting failed connections retain queue
501                     # space forever would be a memory leak.
502                     self.status = "expired"
503                     break
504                 elif not self.connection and self.status != "expired":
505                     # Queue is nonempty but server isn't connected.
506                     with self.irker.irc.mutex:
507                         self.connection = self.irker.irc.newserver()
508                         self.connection.context = self
509                         # Try to avoid colliding with other instances
510                         self.nick_trial = random.randint(1, 990)
511                         self.channels_joined = {}
512                         try:
513                             # This will throw
514                             # IRCServerConnectionError on failure
515                             self.connection.connect(
516                                 target=self.target,
517                                 nickname=self.nickname(),
518                                 username="irker",
519                                 ircname="irker relaying client",
520                                 **self.kwargs)
521                             self.status = "handshaking"
522                             LOG.info("XMIT_TTL bump (%s connection) at %s" % (
523                                 self.target, time.asctime()))
524                             self.last_xmit = time.time()
525                             self.last_ping = time.time()
526                         except IRCServerConnectionError:
527                             self.status = "expired"
528                             break
529                 elif self.status == "handshaking":
530                     if time.time() > self.last_xmit + HANDSHAKE_TTL:
531                         self.status = "expired"
532                         break
533                     else:
534                         # Don't buzz on the empty-queue test while we're
535                         # handshaking
536                         time.sleep(ANTI_BUZZ_DELAY)
537                 elif self.status == "unseen" \
538                          and time.time() > self.last_xmit + UNSEEN_TTL:
539                     # Nasty people could attempt a denial-of-service
540                     # attack by flooding us with requests with invalid
541                     # servernames. We guard against this by rapidly
542                     # expiring connections that have a nonempty queue but
543                     # have never had a successful open.
544                     self.status = "expired"
545                     break
546                 elif self.status == "ready":
547                     (channel, message, key) = self.queue.get()
548                     if channel not in self.channels_joined:
549                         self.connection.join(channel, key=key)
550                         LOG.info("joining %s on %s." % (channel, self.target))
551                     # None is magic - it's a request to quit the server
552                     if message is None:
553                         self.connection.quit()
554                     # An empty message might be used as a keepalive or
555                     # to join a channel for logging, so suppress the
556                     # privmsg send unless there is actual traffic.
557                     elif message:
558                         for segment in message.split("\n"):
559                             # Truncate the message if it's too long,
560                             # but we're working with characters here,
561                             # not bytes, so we could be off.
562                             # 500 = 512 - CRLF - 'PRIVMSG ' - ' :'
563                             maxlength = 500 - len(channel)
564                             if len(segment) > maxlength:
565                                 segment = segment[:maxlength]
566                             try:
567                                 self.connection.privmsg(channel, segment)
568                             except ValueError as err:
569                                 LOG.warning((
570                                     "irclib rejected a message to %s on %s "
571                                     "because: %s") % (
572                                     channel, self.target, str(err)))
573                                 LOG.debug(err.format_exc())
574                             time.sleep(ANTI_FLOOD_DELAY)
575                     self.last_xmit = self.channels_joined[channel] = time.time()
576                     LOG.info("XMIT_TTL bump (%s transmission) at %s" % (
577                         self.target, time.asctime()))
578                     self.queue.task_done()
579                 elif self.status == "expired":
580                     print "We're expired but still running! This is a bug."
581                     break
582         except Exception, e:
583             LOG.error("exception %s in thread for %s" % (e, self.target))
584             # Maybe this should have its own status?
585             self.status = "expired"
586             LOG.debug(e.format_exc())
587         finally:
588             try:
589                 # Make sure we don't leave any zombies behind
590                 self.connection.close()
591             except:
592                 # Irclib has a habit of throwing fresh exceptions here. Ignore that
593                 pass
594     def live(self):
595         "Should this connection not be scavenged?"
596         return self.status != "expired"
597     def joined_to(self, channel):
598         "Is this connection joined to the specified channel?"
599         return channel in self.channels_joined
600     def accepting(self, channel):
601         "Can this connection accept a join of this channel?"
602         if self.channel_limits:
603             match_count = 0
604             for already in self.channels_joined:
605                 # This obscure code is because the RFCs allow separate limits
606                 # by channel type (indicated by the first character of the name)
607                 # a feature that is almost never actually used.
608                 if already[0] == channel[0]:
609                     match_count += 1
610             return match_count < self.channel_limits.get(channel[0], CHANNEL_MAX)
611         else:
612             return len(self.channels_joined) < CHANNEL_MAX
613
614 class Target():
615     "Represent a transmission target."
616     def __init__(self, url):
617         self.url = url
618         # Pre-2.6 Pythons don't recognize irc: as a valid URL prefix.
619         url = url.replace("irc://", "http://")
620         parsed = urlparse.urlparse(url)
621         irchost, _, ircport = parsed.netloc.partition(':')
622         if not ircport:
623             ircport = 6667
624         self.servername = irchost
625         # IRC channel names are case-insensitive.  If we don't smash
626         # case here we may run into problems later. There was a bug
627         # observed on irc.rizon.net where an irkerd user specified #Channel,
628         # got kicked, and irkerd crashed because the server returned
629         # "#channel" in the notification that our kick handler saw.
630         self.channel = parsed.path.lstrip('/').lower()
631         # This deals with a tweak in recent versions of urlparse.
632         if parsed.fragment:
633             self.channel += "#" + parsed.fragment
634         isnick = self.channel.endswith(",isnick")
635         if isnick:
636             self.channel = self.channel[:-7]
637         if self.channel and not isnick and self.channel[0] not in "#&+":
638             self.channel = "#" + self.channel
639         # support both channel?secret and channel?key=secret
640         self.key = ""
641         if parsed.query:
642             self.key = re.sub("^key=", "", parsed.query)
643         self.port = int(ircport)
644
645     def __str__(self):
646         "Represent this instance as a string"
647         return self.servername or self.url or repr(self)
648
649     def validate(self):
650         "Raise InvalidRequest if the URL is missing a critical component"
651         if not self.servername:
652             raise InvalidRequest(
653                 'target URL missing a servername: %r' % self.url)
654         if not self.channel:
655             raise InvalidRequest(
656                 'target URL missing a channel: %r' % self.url)
657     def server(self):
658         "Return a hashable tuple representing the destination server."
659         return (self.servername, self.port)
660
661 class Dispatcher:
662     "Manage connections to a particular server-port combination."
663     def __init__(self, irker, **kwargs):
664         self.irker = irker
665         self.kwargs = kwargs
666         self.connections = []
667     def dispatch(self, channel, message, key, quit_after=False):
668         "Dispatch messages for our server-port combination."
669         # First, check if there is room for another channel
670         # on any of our existing connections.
671         connections = [x for x in self.connections if x.live()]
672         eligibles = [x for x in connections if x.joined_to(channel)] \
673                     or [x for x in connections if x.accepting(channel)]
674         if eligibles:
675             eligibles[0].enqueue(channel, message, key, quit_after)
676             return
677         # All connections are full up. Look for one old enough to be
678         # scavenged.
679         ancients = []
680         for connection in connections:
681             for (chan, age) in connections.channels_joined.items():
682                 if age < time.time() - CHANNEL_TTL:
683                     ancients.append((connection, chan, age))
684         if ancients:
685             ancients.sort(key=lambda x: x[2]) 
686             (found_connection, drop_channel, _drop_age) = ancients[0]
687             found_connection.part(drop_channel, "scavenged by irkerd")
688             del found_connection.channels_joined[drop_channel]
689             #time.sleep(ANTI_FLOOD_DELAY)
690             found_connection.enqueue(channel, message, key, quit_after)
691             return
692         # All existing channels had recent activity
693         newconn = Connection(self.irker, **self.kwargs)
694         self.connections.append(newconn)
695         newconn.enqueue(channel, message, key, quit_after)
696     def live(self):
697         "Does this server-port combination have any live connections?"
698         self.connections = [x for x in self.connections if x.live()]
699         return len(self.connections) > 0
700     def pending(self):
701         "Return all connections with pending traffic."
702         return [x for x in self.connections if not x.queue.empty()]
703     def last_xmit(self):
704         "Return the time of the most recent transmission."
705         return max(x.last_xmit for x in self.connections)
706
707 class Irker:
708     "Persistent IRC multiplexer."
709     def __init__(self, **kwargs):
710         self.kwargs = kwargs
711         self.irc = IRCClient()
712         self.irc.add_event_handler("ping", self._handle_ping)
713         self.irc.add_event_handler("welcome", self._handle_welcome)
714         self.irc.add_event_handler("erroneusnickname", self._handle_badnick)
715         self.irc.add_event_handler("nicknameinuse", self._handle_badnick)
716         self.irc.add_event_handler("nickcollision", self._handle_badnick)
717         self.irc.add_event_handler("unavailresource", self._handle_badnick)
718         self.irc.add_event_handler("featurelist", self._handle_features)
719         self.irc.add_event_handler("disconnect", self._handle_disconnect)
720         self.irc.add_event_handler("kick", self._handle_kick)
721         self.irc.add_event_handler("every_raw_message", self._handle_every_raw_message)
722         self.servers = {}
723     def thread_launch(self):
724         thread = threading.Thread(target=self.irc.spin)
725         thread.setDaemon(True)
726         self.irc._thread = thread
727         thread.start()
728     def _handle_ping(self, connection, _event):
729         "PING arrived, bump the last-received time for the connection."
730         if connection.context:
731             connection.context.handle_ping()
732     def _handle_welcome(self, connection, _event):
733         "Welcome arrived, nick accepted for this connection."
734         if connection.context:
735             connection.context.handle_welcome()
736     def _handle_badnick(self, connection, _event):
737         "Nick not accepted for this connection."
738         if connection.context:
739             connection.context.handle_badnick()
740     def _handle_features(self, connection, event):
741         "Determine if and how we can set deaf mode."
742         if connection.context:
743             cxt = connection.context
744             arguments = event.arguments
745             for lump in arguments:
746                 if lump.startswith("DEAF="):
747                     if not logfile:
748                         connection.mode(cxt.nickname(), "+"+lump[5:])
749                 elif lump.startswith("MAXCHANNELS="):
750                     m = int(lump[12:])
751                     for pref in "#&+":
752                         cxt.channel_limits[pref] = m
753                     LOG.info("%s maxchannels is %d" % (connection.server, m))
754                 elif lump.startswith("CHANLIMIT=#:"):
755                     limits = lump[10:].split(",")
756                     try:
757                         for token in limits:
758                             (prefixes, limit) = token.split(":")
759                             limit = int(limit)
760                             for c in prefixes:
761                                 cxt.channel_limits[c] = limit
762                         LOG.info("%s channel limit map is %s" % (
763                             connection.target, cxt.channel_limits))
764                     except ValueError:
765                         LOG.error("ill-formed CHANLIMIT property")
766     def _handle_disconnect(self, connection, _event):
767         "Server hung up the connection."
768         LOG.info("server %s disconnected" % connection.target)
769         connection.close()
770         if connection.context:
771             connection.context.handle_disconnect()
772     def _handle_kick(self, connection, event):
773         "Server hung up the connection."
774         target = event.target
775         LOG.info("irker has been kicked from %s on %s" % (
776             target, connection.target))
777         if connection.context:
778             connection.context.handle_kick(target)
779     def _handle_every_raw_message(self, _connection, event):
780         "Log all messages when in watcher mode."
781         if logfile:
782             with open(logfile, "a") as logfp:
783                 logfp.write("%03f|%s|%s\n" % \
784                              (time.time(), event.source, event.arguments[0]))
785     def pending(self):
786         "Do we have any pending message traffic?"
787         return [k for (k, v) in self.servers.items() if v.pending()]
788
789     def _parse_request(self, line):
790         "Request-parsing helper for the handle() method"
791         request = json.loads(line.strip())
792         if not isinstance(request, dict):
793             raise InvalidRequest(
794                 "request is not a JSON dictionary: %r" % request)
795         if "to" not in request or "privmsg" not in request:
796             raise InvalidRequest(
797                 "malformed request - 'to' or 'privmsg' missing: %r" % request)
798         channels = request['to']
799         message = request['privmsg']
800         if not isinstance(channels, (list, basestring)):
801             raise InvalidRequest(
802                 "malformed request - unexpected channel type: %r" % channels)
803         if not isinstance(message, basestring):
804             raise InvalidRequest(
805                 "malformed request - unexpected message type: %r" % message)
806         if not isinstance(channels, list):
807             channels = [channels]
808         targets = []
809         for url in channels:
810             try:
811                 if not isinstance(url, basestring):
812                     raise InvalidRequest(
813                         "malformed request - URL has unexpected type: %r" %
814                         url)
815                 target = Target(url)
816                 target.validate()
817             except InvalidRequest, e:
818                 LOG.error(str(e))
819             else:
820                 targets.append(target)
821         return (targets, message)
822
823     def handle(self, line, quit_after=False):
824         "Perform a JSON relay request."
825         try:
826             targets, message = self._parse_request(line=line)
827             for target in targets:
828                 if target.server() not in self.servers:
829                     self.servers[target.server()] = Dispatcher(
830                         self, target=target, **self.kwargs)
831                 self.servers[target.server()].dispatch(
832                     target.channel, message, target.key, quit_after=quit_after)
833                 # GC dispatchers with no active connections
834                 servernames = self.servers.keys()
835                 for servername in servernames:
836                     if not self.servers[servername].live():
837                         del self.servers[servername]
838                     # If we might be pushing a resource limit even
839                     # after garbage collection, remove a session.  The
840                     # goal here is to head off DoS attacks that aim at
841                     # exhausting thread space or file descriptors.
842                     # The cost is that attempts to DoS this service
843                     # will cause lots of join/leave spam as we
844                     # scavenge old channels after connecting to new
845                     # ones. The particular method used for selecting a
846                     # session to be terminated doesn't matter much; we
847                     # choose the one longest idle on the assumption
848                     # that message activity is likely to be clumpy.
849                     if len(self.servers) >= CONNECTION_MAX:
850                         oldest = min(
851                             self.servers.keys(),
852                             key=lambda name: self.servers[name].last_xmit())
853                         del self.servers[oldest]
854         except InvalidRequest, e:
855             LOG.error(str(e))
856         except ValueError:
857             self.logerr("can't recognize JSON on input: %r" % line)
858         except RuntimeError:
859             self.logerr("wildly malformed JSON blew the parser stack.")
860
861 class IrkerTCPHandler(SocketServer.StreamRequestHandler):
862     def handle(self):
863         while True:
864             line = self.rfile.readline()
865             if not line:
866                 break
867             irker.handle(line.strip())
868
869 class IrkerUDPHandler(SocketServer.BaseRequestHandler):
870     def handle(self):
871         data = self.request[0].strip()
872         #socket = self.request[1]
873         irker.handle(data)
874
875 def usage():
876     sys.stdout.write("""
877 Usage:
878   irkerd [-d debuglevel] [-l logfile] [-n nick] [-p password] [-i channel message] [-V] [-h]
879
880 Options
881   -d    set debug level
882   -l    set logfile
883   -n    set nick-style
884   -p    set nickserv password
885   -i    immediate mode
886   -V    return irkerd version
887   -h    print this help dialog
888 """)
889
890 if __name__ == '__main__':
891     log_level = None
892     immediate = None
893     namestyle = "irker%03d"
894     password = None
895     logfile = None
896     try:
897         (options, arguments) = getopt.getopt(sys.argv[1:], "d:i:l:n:p:Vh")
898     except getopt.GetoptError as e:
899         sys.stderr.write("%s" % e)
900         usage()
901         sys.exit(1)
902     for (opt, val) in options:
903         if opt == '-d': # Enable debug/progress messages
904             if val.lower() not in LOG_LEVELS:
905                 sys.stderr.write('invalid log level %r (choices: %s)\n' % (
906                     val, ', '.join(LOG_LEVELS)))
907                 sys.exit(1)
908             log_level = getattr(logging, val.upper())
909         elif opt == '-i':       # Immediate mode - send one message, then exit. 
910             immediate = val
911         elif opt == '-l':       # Logfile mode - report traffic read in
912             logfile = val
913         elif opt == '-n':       # Force the nick
914             namestyle = val
915         elif opt == '-p':       # Set a nickserv password
916             password = val
917         elif opt == '-V':       # Emit version and exit
918             sys.stdout.write("irkerd version %s\n" % version)
919             sys.exit(0)
920         elif opt == '-h':
921             usage()
922             sys.exit(0)
923
924     handler = logging.StreamHandler()
925     LOG.addHandler(handler)
926     if log_level:
927         LOG.setLevel(log_level)
928
929     irker = Irker(
930         nick_needs_number=re.search("%.*d", namestyle),
931         )
932     LOG.info("irkerd version %s" % version)
933     if immediate:
934         irker.irc.add_event_handler("quit", lambda _c, _e: sys.exit(0))
935         irker.handle('{"to":"%s","privmsg":"%s"}' % (immediate, arguments[0]), quit_after=True)
936         irker.irc.spin()
937     else:
938         irker.thread_launch()
939         try:
940             tcpserver = SocketServer.TCPServer((HOST, PORT), IrkerTCPHandler)
941             udpserver = SocketServer.UDPServer((HOST, PORT), IrkerUDPHandler)
942             for server in [tcpserver, udpserver]:
943                 server = threading.Thread(target=server.serve_forever)
944                 server.setDaemon(True)
945                 server.start()
946             try:
947                 signal.pause()
948             except KeyboardInterrupt:
949                 raise SystemExit(1)
950         except socket.error, e:
951             sys.stderr.write("irkerd: server launch failed: %r\n" % e)
952
953 # end