irkerd: Transition from getopt to argparse
[irker.git] / irkerd
1 #!/usr/bin/env python
2 """
3 irkerd - a simple IRC multiplexer daemon
4
5 Listens for JSON objects of the form {'to':<irc-url>, 'privmsg':<text>}
6 and relays messages to IRC channels. Each request must be followed by
7 a newline.
8
9 The <text> must be a string.  The value of the 'to' attribute can be a
10 string containing an IRC URL (e.g. 'irc://chat.freenet.net/botwar') or
11 a list of such strings; in the latter case the message is broadcast to
12 all listed channels.  Note that the channel portion of the URL need
13 *not* have a leading '#' unless the channel name itself does.
14
15 Design and code by Eric S. Raymond <esr@thyrsus.com>. See the project
16 resource page at <http://www.catb.org/~esr/irker/>.
17
18 Requires Python 2.7, or:
19 * 2.6 with the argparse package installed.
20 * 2.5 with the argparse and simplejson packages installed.
21 """
22
23 from __future__ import with_statement
24
25 # These things might need tuning
26
27 HOST = "localhost"
28 PORT = 6659
29
30 XMIT_TTL = (3 * 60 * 60)        # Time to live, seconds from last transmit
31 PING_TTL = (15 * 60)            # Time to live, seconds from last PING
32 HANDSHAKE_TTL = 60              # Time to live, seconds from nick transmit
33 CHANNEL_TTL = (3 * 60 * 60)     # Time to live, seconds from last transmit
34 DISCONNECT_TTL = (24 * 60 * 60) # Time to live, seconds from last connect
35 UNSEEN_TTL = 60                 # Time to live, seconds since first request
36 CHANNEL_MAX = 18                # Max channels open per socket (default)
37 ANTI_FLOOD_DELAY = 1.0          # Anti-flood delay after transmissions, seconds
38 ANTI_BUZZ_DELAY = 0.09          # Anti-buzz delay after queue-empty check
39 CONNECTION_MAX = 200            # To avoid hitting a thread limit
40
41 # No user-serviceable parts below this line
42
43 version = "2.6"
44
45 import Queue
46 import SocketServer
47 import argparse
48 import logging
49 try:
50     import simplejson as json   # Faster, also makes us Python-2.4-compatible
51 except ImportError:
52     import json
53 import random
54 import re
55 import select
56 import signal
57 import socket
58 import sys
59 import threading
60 import time
61 import urlparse
62
63
64 LOG = logging.getLogger(__name__)
65 LOG.setLevel(logging.ERROR)
66 LOG_LEVELS = ['critical', 'error', 'warning', 'info', 'debug']
67
68
69 # Sketch of implementation:
70 #
71 # One Irker object manages multiple IRC sessions.  It holds a map of
72 # Dispatcher objects, one per (server, port) combination, which are
73 # responsible for routing messages to one of any number of Connection
74 # objects that do the actual socket conversations.  The reason for the
75 # Dispatcher layer is that IRC daemons limit the number of channels a
76 # client (that is, from the daemon's point of view, a socket) can be
77 # joined to, so each session to a server needs a flock of Connection
78 # instances each with its own socket.
79 #
80 # Connections are timed out and removed when either they haven't seen a
81 # PING for a while (indicating that the server may be stalled or down)
82 # or there has been no message traffic to them for a while, or
83 # even if the queue is nonempty but efforts to connect have failed for
84 # a long time.
85 #
86 # There are multiple threads. One accepts incoming traffic from all
87 # servers.  Each Connection also has a consumer thread and a
88 # thread-safe message queue.  The program main appends messages to
89 # queues as JSON requests are received; the consumer threads try to
90 # ship them to servers.  When a socket write stalls, it only blocks an
91 # individual consumer thread; if it stalls long enough, the session
92 # will be timed out. This solves the biggest problem with a
93 # single-threaded implementation, which is that you can't count on a
94 # single stalled write not hanging all other traffic - you're at the
95 # mercy of the length of the buffers in the TCP/IP layer.
96 #
97 # Message delivery is thus not reliable in the face of network stalls,
98 # but this was considered acceptable because IRC (notoriously) has the
99 # same problem - there is little point in reliable delivery to a relay
100 # that is down or unreliable.
101 #
102 # This code uses only NICK, JOIN, PART, MODE, PRIVMSG, USER, and QUIT. 
103 # It is strictly compliant to RFC1459, except for the interpretation and
104 # use of the DEAF and CHANLIMIT and (obsolete) MAXCHANNELS features.
105 #
106 # CHANLIMIT is as described in the Internet RFC draft
107 # draft-brocklesby-irc-isupport-03 at <http://www.mirc.com/isupport.html>.
108 # The ",isnick" feature is as described in
109 # <http://ftp.ics.uci.edu/pub/ietf/uri/draft-mirashi-url-irc-01.txt>.
110
111 # Historical note: the IRCClient and IRCServerConnection classes
112 # (~270LOC) replace the overweight, overcomplicated 3KLOC mass of
113 # irclib code that irker formerly used as a service library.  They
114 # still look similar to parts of irclib because I contributed to that
115 # code before giving up on it.
116
117 class IRCError(Exception):
118     "An IRC exception"
119     pass
120
121
122 class InvalidRequest (ValueError):
123     "An invalid JSON request"
124     pass
125
126
127 class IRCClient():
128     "An IRC client session to one or more servers."
129     def __init__(self):
130         self.mutex = threading.RLock()
131         self.server_connections = []
132         self.event_handlers = {}
133         self.add_event_handler("ping",
134                                lambda c, e: c.ship("PONG %s" % e.target))
135
136     def newserver(self):
137         "Initialize a new server-connection object."
138         conn = IRCServerConnection(self)
139         with self.mutex:
140             self.server_connections.append(conn)
141         return conn
142
143     def spin(self, timeout=0.2):
144         "Spin processing data from connections forever."
145         # Outer loop should specifically *not* be mutex-locked.
146         # Otherwise no other thread would ever be able to change
147         # the shared state of an IRC object running this function.
148         while True:
149             nextsleep = 0
150             with self.mutex:
151                 connected = [x for x in self.server_connections
152                              if x is not None and x.socket is not None]
153                 sockets = [x.socket for x in connected]
154                 if sockets:
155                     connmap = dict([(c.socket.fileno(), c) for c in connected])
156                     (insocks, _o, _e) = select.select(sockets, [], [], timeout)
157                     for s in insocks:
158                         connmap[s.fileno()].consume()
159                 else:
160                     nextsleep = timeout
161             time.sleep(nextsleep)
162
163     def add_event_handler(self, event, handler):
164         "Set a handler to be called later."
165         with self.mutex:
166             event_handlers = self.event_handlers.setdefault(event, [])
167             event_handlers.append(handler)
168
169     def handle_event(self, connection, event):
170         with self.mutex:
171             h = self.event_handlers
172             th = sorted(h.get("all_events", []) + h.get(event.type, []))
173             for handler in th:
174                 handler(connection, event)
175
176     def drop_connection(self, connection):
177         with self.mutex:
178             self.server_connections.remove(connection)
179
180
181 class LineBufferedStream():
182     "Line-buffer a read stream."
183     crlf_re = re.compile(b'\r?\n')
184
185     def __init__(self):
186         self.buffer = ''
187
188     def append(self, newbytes):
189         self.buffer += newbytes
190
191     def lines(self):
192         "Iterate over lines in the buffer."
193         lines = LineBufferedStream.crlf_re.split(self.buffer)
194         self.buffer = lines.pop()
195         return iter(lines)
196
197     def __iter__(self):
198         return self.lines()
199
200 class IRCServerConnectionError(IRCError):
201     pass
202
203 class IRCServerConnection():
204     command_re = re.compile("^(:(?P<prefix>[^ ]+) +)?(?P<command>[^ ]+)( *(?P<argument> .+))?")
205     # The full list of numeric-to-event mappings is in Perl's Net::IRC.
206     # We only need to ensure that if some ancient server throws numerics
207     # for the ones we actually want to catch, they're mapped.
208     codemap = {
209         "001": "welcome",
210         "005": "featurelist",
211         "432": "erroneusnickname",
212         "433": "nicknameinuse",
213         "436": "nickcollision",
214         "437": "unavailresource",
215     }
216
217     def __init__(self, master):
218         self.master = master
219         self.socket = None
220
221     def connect(self, target, nickname,
222                 password=None, username=None, ircname=None):
223         LOG.debug("connect(server=%r, port=%r, nickname=%r, ...)" % (
224             target.servername, target.port, nickname))
225         if self.socket is not None:
226             self.disconnect("Changing servers")
227
228         self.buffer = LineBufferedStream()
229         self.event_handlers = {}
230         self.real_server_name = ""
231         self.target = target
232         self.nickname = nickname
233         try:
234             self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
235             self.socket.bind(('', 0))
236             self.socket.connect((target.servername, target.port))
237         except socket.error as err:
238             raise IRCServerConnectionError("Couldn't connect to socket: %s" % err)
239
240         if password:
241             self.ship("PASS " + password)
242         self.nick(self.nickname)
243         self.user(username=username or ircname, realname=ircname or nickname)
244         return self
245
246     def close(self):
247         # Without this thread lock, there is a window during which
248         # select() can find a closed socket, leading to an EBADF error.
249         with self.master.mutex:
250             self.disconnect("Closing object")
251             self.master.drop_connection(self)
252
253     def consume(self):
254         try:
255             incoming = self.socket.recv(16384)
256         except socket.error:
257             # Server hung up on us.
258             self.disconnect("Connection reset by peer")
259             return
260         if not incoming:
261             # Dead air also indicates a connection reset.
262             self.disconnect("Connection reset by peer")
263             return
264
265         self.buffer.append(incoming)
266
267         for line in self.buffer:
268             LOG.debug("FROM: %s" % line)
269
270             if not line:
271                 continue
272
273             prefix = None
274             command = None
275             arguments = None
276             self.handle_event(Event("every_raw_message",
277                                      self.real_server_name,
278                                      None,
279                                      [line]))
280
281             m = IRCServerConnection.command_re.match(line)
282             if m.group("prefix"):
283                 prefix = m.group("prefix")
284                 if not self.real_server_name:
285                     self.real_server_name = prefix
286             if m.group("command"):
287                 command = m.group("command").lower()
288             if m.group("argument"):
289                 a = m.group("argument").split(" :", 1)
290                 arguments = a[0].split()
291                 if len(a) == 2:
292                     arguments.append(a[1])
293
294             command = IRCServerConnection.codemap.get(command, command)
295             if command in ["privmsg", "notice"]:
296                 target = arguments.pop(0)
297             else:
298                 target = None
299
300                 if command == "quit":
301                     arguments = [arguments[0]]
302                 elif command == "ping":
303                     target = arguments[0]
304                 else:
305                     target = arguments[0]
306                     arguments = arguments[1:]
307
308             LOG.debug("command: %s, source: %s, target: %s, arguments: %s" % (
309                 command, prefix, target, arguments))
310             self.handle_event(Event(command, prefix, target, arguments))
311
312     def handle_event(self, event):
313         self.master.handle_event(self, event)
314         if event.type in self.event_handlers:
315             for fn in self.event_handlers[event.type]:
316                 fn(self, event)
317
318     def is_connected(self):
319         return self.socket is not None
320
321     def disconnect(self, message=""):
322         if self.socket is None:
323             return
324         # Don't send a QUIT here - causes infinite loop!
325         try:
326             self.socket.shutdown(socket.SHUT_WR)
327             self.socket.close()
328         except socket.error:
329             pass
330         del self.socket
331         self.socket = None
332         self.handle_event(
333             Event("disconnect", self.target.server, "", [message]))
334
335     def join(self, channel, key=""):
336         self.ship("JOIN %s%s" % (channel, (key and (" " + key))))
337
338     def mode(self, target, command):
339         self.ship("MODE %s %s" % (target, command))
340
341     def nick(self, newnick):
342         self.ship("NICK " + newnick)
343
344     def part(self, channel, message=""):
345         cmd_parts = ['PART', channel]
346         if message:
347             cmd_parts.append(message)
348         self.ship(' '.join(cmd_parts))
349
350     def privmsg(self, target, text):
351         self.ship("PRIVMSG %s :%s" % (target, text))
352
353     def quit(self, message=""):
354         self.ship("QUIT" + (message and (" :" + message)))
355
356     def user(self, username, realname):
357         self.ship("USER %s 0 * :%s" % (username, realname))
358
359     def ship(self, string):
360         "Ship a command to the server, appending CR/LF"
361         try:
362             self.socket.send(string.encode('utf-8') + b'\r\n')
363             LOG.debug("TO: %s" % string)
364         except socket.error:
365             self.disconnect("Connection reset by peer.")
366
367 class Event(object):
368     def __init__(self, evtype, source, target, arguments=None):
369         self.type = evtype
370         self.source = source
371         self.target = target
372         if arguments is None:
373             arguments = []
374         self.arguments = arguments
375
376 def is_channel(string):
377     return string and string[0] in "#&+!"
378
379 class Connection:
380     def __init__(self, irker, target, nick_template, nick_needs_number=False,
381                  password=None, **kwargs):
382         self.irker = irker
383         self.target = target
384         self.nick_template = nick_template
385         self.nick_needs_number = nick_needs_number
386         self.password = password
387         self.kwargs = kwargs
388         self.nick_trial = None
389         self.connection = None
390         self.status = None
391         self.last_xmit = time.time()
392         self.last_ping = time.time()
393         self.channels_joined = {}
394         self.channel_limits = {}
395         # The consumer thread
396         self.queue = Queue.Queue()
397         self.thread = None
398     def nickname(self, n=None):
399         "Return a name for the nth server connection."
400         if n is None:
401             n = self.nick_trial
402         if self.nick_needs_number:
403             return (self.nick_template % n)
404         else:
405             return self.nick_template
406     def handle_ping(self):
407         "Register the fact that the server has pinged this connection."
408         self.last_ping = time.time()
409     def handle_welcome(self):
410         "The server says we're OK, with a non-conflicting nick."
411         self.status = "ready"
412         LOG.info("nick %s accepted" % self.nickname())
413         if self.password:
414             self.connection.privmsg("nickserv", "identify %s" % self.password)
415     def handle_badnick(self):
416         "The server says our nick is ill-formed or has a conflict."
417         LOG.info("nick %s rejected" % self.nickname())
418         if self.nick_needs_number:
419             # Randomness prevents a malicious user or bot from
420             # anticipating the next trial name in order to block us
421             # from completing the handshake.
422             self.nick_trial += random.randint(1, 3)
423             self.last_xmit = time.time()
424             self.connection.nick(self.nickname())
425         # Otherwise fall through, it might be possible to
426         # recover manually.
427     def handle_disconnect(self):
428         "Server disconnected us for flooding or some other reason."
429         self.connection = None
430         if self.status != "expired":
431             self.status = "disconnected"
432     def handle_kick(self, outof):
433         "We've been kicked."
434         self.status = "handshaking"
435         try:
436             del self.channels_joined[outof]
437         except KeyError:
438             LOG.error("kicked by %s from %s that's not joined" % (
439                 self.target, outof))
440         qcopy = []
441         while not self.queue.empty():
442             (channel, message, key) = self.queue.get()
443             if channel != outof:
444                 qcopy.append((channel, message, key))
445         for (channel, message, key) in qcopy:
446             self.queue.put((channel, message, key))
447         self.status = "ready"
448     def enqueue(self, channel, message, key, quit_after=False):
449         "Enque a message for transmission."
450         if self.thread is None or not self.thread.is_alive():
451             self.status = "unseen"
452             self.thread = threading.Thread(target=self.dequeue)
453             self.thread.setDaemon(True)
454             self.thread.start()
455         self.queue.put((channel, message, key))
456         if quit_after:
457             self.queue.put((channel, None, key))
458     def dequeue(self):
459         "Try to ship pending messages from the queue."
460         try:
461             while True:
462                 # We want to be kind to the IRC servers and not hold unused
463                 # sockets open forever, so they have a time-to-live.  The
464                 # loop is coded this particular way so that we can drop
465                 # the actual server connection when its time-to-live
466                 # expires, then reconnect and resume transmission if the
467                 # queue fills up again.
468                 if self.queue.empty():
469                     # Queue is empty, at some point we want to time out
470                     # the connection rather than holding a socket open in
471                     # the server forever.
472                     now = time.time()
473                     xmit_timeout = now > self.last_xmit + XMIT_TTL
474                     ping_timeout = now > self.last_ping + PING_TTL
475                     if self.status == "disconnected":
476                         # If the queue is empty, we can drop this connection.
477                         self.status = "expired"
478                         break
479                     elif xmit_timeout or ping_timeout:
480                         LOG.info((
481                             "timing out connection to %s at %s "
482                             "(ping_timeout=%s, xmit_timeout=%s)") % (
483                             self.target, time.asctime(), ping_timeout,
484                             xmit_timeout))
485                         with self.irker.irc.mutex:
486                             self.connection.context = None
487                             self.connection.quit("transmission timeout")
488                             self.connection = None
489                         self.status = "disconnected"
490                     else:
491                         # Prevent this thread from hogging the CPU by pausing
492                         # for just a little bit after the queue-empty check.
493                         # As long as this is less that the duration of a human
494                         # reflex arc it is highly unlikely any human will ever
495                         # notice.
496                         time.sleep(ANTI_BUZZ_DELAY)
497                 elif self.status == "disconnected" \
498                          and time.time() > self.last_xmit + DISCONNECT_TTL:
499                     # Queue is nonempty, but the IRC server might be
500                     # down. Letting failed connections retain queue
501                     # space forever would be a memory leak.
502                     self.status = "expired"
503                     break
504                 elif not self.connection and self.status != "expired":
505                     # Queue is nonempty but server isn't connected.
506                     with self.irker.irc.mutex:
507                         self.connection = self.irker.irc.newserver()
508                         self.connection.context = self
509                         # Try to avoid colliding with other instances
510                         self.nick_trial = random.randint(1, 990)
511                         self.channels_joined = {}
512                         try:
513                             # This will throw
514                             # IRCServerConnectionError on failure
515                             self.connection.connect(
516                                 target=self.target,
517                                 nickname=self.nickname(),
518                                 username="irker",
519                                 ircname="irker relaying client",
520                                 **self.kwargs)
521                             self.status = "handshaking"
522                             LOG.info("XMIT_TTL bump (%s connection) at %s" % (
523                                 self.target, time.asctime()))
524                             self.last_xmit = time.time()
525                             self.last_ping = time.time()
526                         except IRCServerConnectionError:
527                             self.status = "expired"
528                             break
529                 elif self.status == "handshaking":
530                     if time.time() > self.last_xmit + HANDSHAKE_TTL:
531                         self.status = "expired"
532                         break
533                     else:
534                         # Don't buzz on the empty-queue test while we're
535                         # handshaking
536                         time.sleep(ANTI_BUZZ_DELAY)
537                 elif self.status == "unseen" \
538                          and time.time() > self.last_xmit + UNSEEN_TTL:
539                     # Nasty people could attempt a denial-of-service
540                     # attack by flooding us with requests with invalid
541                     # servernames. We guard against this by rapidly
542                     # expiring connections that have a nonempty queue but
543                     # have never had a successful open.
544                     self.status = "expired"
545                     break
546                 elif self.status == "ready":
547                     (channel, message, key) = self.queue.get()
548                     if channel not in self.channels_joined:
549                         self.connection.join(channel, key=key)
550                         LOG.info("joining %s on %s." % (channel, self.target))
551                     # None is magic - it's a request to quit the server
552                     if message is None:
553                         self.connection.quit()
554                     # An empty message might be used as a keepalive or
555                     # to join a channel for logging, so suppress the
556                     # privmsg send unless there is actual traffic.
557                     elif message:
558                         for segment in message.split("\n"):
559                             # Truncate the message if it's too long,
560                             # but we're working with characters here,
561                             # not bytes, so we could be off.
562                             # 500 = 512 - CRLF - 'PRIVMSG ' - ' :'
563                             maxlength = 500 - len(channel)
564                             if len(segment) > maxlength:
565                                 segment = segment[:maxlength]
566                             try:
567                                 self.connection.privmsg(channel, segment)
568                             except ValueError as err:
569                                 LOG.warning((
570                                     "irclib rejected a message to %s on %s "
571                                     "because: %s") % (
572                                     channel, self.target, str(err)))
573                                 LOG.debug(err.format_exc())
574                             time.sleep(ANTI_FLOOD_DELAY)
575                     self.last_xmit = self.channels_joined[channel] = time.time()
576                     LOG.info("XMIT_TTL bump (%s transmission) at %s" % (
577                         self.target, time.asctime()))
578                     self.queue.task_done()
579                 elif self.status == "expired":
580                     print "We're expired but still running! This is a bug."
581                     break
582         except Exception, e:
583             LOG.error("exception %s in thread for %s" % (e, self.target))
584             # Maybe this should have its own status?
585             self.status = "expired"
586             LOG.debug(e.format_exc())
587         finally:
588             try:
589                 # Make sure we don't leave any zombies behind
590                 self.connection.close()
591             except:
592                 # Irclib has a habit of throwing fresh exceptions here. Ignore that
593                 pass
594     def live(self):
595         "Should this connection not be scavenged?"
596         return self.status != "expired"
597     def joined_to(self, channel):
598         "Is this connection joined to the specified channel?"
599         return channel in self.channels_joined
600     def accepting(self, channel):
601         "Can this connection accept a join of this channel?"
602         if self.channel_limits:
603             match_count = 0
604             for already in self.channels_joined:
605                 # This obscure code is because the RFCs allow separate limits
606                 # by channel type (indicated by the first character of the name)
607                 # a feature that is almost never actually used.
608                 if already[0] == channel[0]:
609                     match_count += 1
610             return match_count < self.channel_limits.get(channel[0], CHANNEL_MAX)
611         else:
612             return len(self.channels_joined) < CHANNEL_MAX
613
614 class Target():
615     "Represent a transmission target."
616     def __init__(self, url):
617         self.url = url
618         # Pre-2.6 Pythons don't recognize irc: as a valid URL prefix.
619         url = url.replace("irc://", "http://")
620         parsed = urlparse.urlparse(url)
621         irchost, _, ircport = parsed.netloc.partition(':')
622         if not ircport:
623             ircport = 6667
624         self.servername = irchost
625         # IRC channel names are case-insensitive.  If we don't smash
626         # case here we may run into problems later. There was a bug
627         # observed on irc.rizon.net where an irkerd user specified #Channel,
628         # got kicked, and irkerd crashed because the server returned
629         # "#channel" in the notification that our kick handler saw.
630         self.channel = parsed.path.lstrip('/').lower()
631         # This deals with a tweak in recent versions of urlparse.
632         if parsed.fragment:
633             self.channel += "#" + parsed.fragment
634         isnick = self.channel.endswith(",isnick")
635         if isnick:
636             self.channel = self.channel[:-7]
637         if self.channel and not isnick and self.channel[0] not in "#&+":
638             self.channel = "#" + self.channel
639         # support both channel?secret and channel?key=secret
640         self.key = ""
641         if parsed.query:
642             self.key = re.sub("^key=", "", parsed.query)
643         self.port = int(ircport)
644
645     def __str__(self):
646         "Represent this instance as a string"
647         return self.servername or self.url or repr(self)
648
649     def validate(self):
650         "Raise InvalidRequest if the URL is missing a critical component"
651         if not self.servername:
652             raise InvalidRequest(
653                 'target URL missing a servername: %r' % self.url)
654         if not self.channel:
655             raise InvalidRequest(
656                 'target URL missing a channel: %r' % self.url)
657     def server(self):
658         "Return a hashable tuple representing the destination server."
659         return (self.servername, self.port)
660
661 class Dispatcher:
662     "Manage connections to a particular server-port combination."
663     def __init__(self, irker, **kwargs):
664         self.irker = irker
665         self.kwargs = kwargs
666         self.connections = []
667     def dispatch(self, channel, message, key, quit_after=False):
668         "Dispatch messages for our server-port combination."
669         # First, check if there is room for another channel
670         # on any of our existing connections.
671         connections = [x for x in self.connections if x.live()]
672         eligibles = [x for x in connections if x.joined_to(channel)] \
673                     or [x for x in connections if x.accepting(channel)]
674         if eligibles:
675             eligibles[0].enqueue(channel, message, key, quit_after)
676             return
677         # All connections are full up. Look for one old enough to be
678         # scavenged.
679         ancients = []
680         for connection in connections:
681             for (chan, age) in connections.channels_joined.items():
682                 if age < time.time() - CHANNEL_TTL:
683                     ancients.append((connection, chan, age))
684         if ancients:
685             ancients.sort(key=lambda x: x[2]) 
686             (found_connection, drop_channel, _drop_age) = ancients[0]
687             found_connection.part(drop_channel, "scavenged by irkerd")
688             del found_connection.channels_joined[drop_channel]
689             #time.sleep(ANTI_FLOOD_DELAY)
690             found_connection.enqueue(channel, message, key, quit_after)
691             return
692         # All existing channels had recent activity
693         newconn = Connection(self.irker, **self.kwargs)
694         self.connections.append(newconn)
695         newconn.enqueue(channel, message, key, quit_after)
696     def live(self):
697         "Does this server-port combination have any live connections?"
698         self.connections = [x for x in self.connections if x.live()]
699         return len(self.connections) > 0
700     def pending(self):
701         "Return all connections with pending traffic."
702         return [x for x in self.connections if not x.queue.empty()]
703     def last_xmit(self):
704         "Return the time of the most recent transmission."
705         return max(x.last_xmit for x in self.connections)
706
707 class Irker:
708     "Persistent IRC multiplexer."
709     def __init__(self, logfile=None, **kwargs):
710         self.logfile = logfile
711         self.kwargs = kwargs
712         self.irc = IRCClient()
713         self.irc.add_event_handler("ping", self._handle_ping)
714         self.irc.add_event_handler("welcome", self._handle_welcome)
715         self.irc.add_event_handler("erroneusnickname", self._handle_badnick)
716         self.irc.add_event_handler("nicknameinuse", self._handle_badnick)
717         self.irc.add_event_handler("nickcollision", self._handle_badnick)
718         self.irc.add_event_handler("unavailresource", self._handle_badnick)
719         self.irc.add_event_handler("featurelist", self._handle_features)
720         self.irc.add_event_handler("disconnect", self._handle_disconnect)
721         self.irc.add_event_handler("kick", self._handle_kick)
722         self.irc.add_event_handler("every_raw_message", self._handle_every_raw_message)
723         self.servers = {}
724     def thread_launch(self):
725         thread = threading.Thread(target=self.irc.spin)
726         thread.setDaemon(True)
727         self.irc._thread = thread
728         thread.start()
729     def _handle_ping(self, connection, _event):
730         "PING arrived, bump the last-received time for the connection."
731         if connection.context:
732             connection.context.handle_ping()
733     def _handle_welcome(self, connection, _event):
734         "Welcome arrived, nick accepted for this connection."
735         if connection.context:
736             connection.context.handle_welcome()
737     def _handle_badnick(self, connection, _event):
738         "Nick not accepted for this connection."
739         if connection.context:
740             connection.context.handle_badnick()
741     def _handle_features(self, connection, event):
742         "Determine if and how we can set deaf mode."
743         if connection.context:
744             cxt = connection.context
745             arguments = event.arguments
746             for lump in arguments:
747                 if lump.startswith("DEAF="):
748                     if not self.logfile:
749                         connection.mode(cxt.nickname(), "+"+lump[5:])
750                 elif lump.startswith("MAXCHANNELS="):
751                     m = int(lump[12:])
752                     for pref in "#&+":
753                         cxt.channel_limits[pref] = m
754                     LOG.info("%s maxchannels is %d" % (connection.server, m))
755                 elif lump.startswith("CHANLIMIT=#:"):
756                     limits = lump[10:].split(",")
757                     try:
758                         for token in limits:
759                             (prefixes, limit) = token.split(":")
760                             limit = int(limit)
761                             for c in prefixes:
762                                 cxt.channel_limits[c] = limit
763                         LOG.info("%s channel limit map is %s" % (
764                             connection.target, cxt.channel_limits))
765                     except ValueError:
766                         LOG.error("ill-formed CHANLIMIT property")
767     def _handle_disconnect(self, connection, _event):
768         "Server hung up the connection."
769         LOG.info("server %s disconnected" % connection.target)
770         connection.close()
771         if connection.context:
772             connection.context.handle_disconnect()
773     def _handle_kick(self, connection, event):
774         "Server hung up the connection."
775         target = event.target
776         LOG.info("irker has been kicked from %s on %s" % (
777             target, connection.target))
778         if connection.context:
779             connection.context.handle_kick(target)
780     def _handle_every_raw_message(self, _connection, event):
781         "Log all messages when in watcher mode."
782         if self.logfile:
783             with open(self.logfile, "a") as logfp:
784                 logfp.write("%03f|%s|%s\n" % \
785                              (time.time(), event.source, event.arguments[0]))
786     def pending(self):
787         "Do we have any pending message traffic?"
788         return [k for (k, v) in self.servers.items() if v.pending()]
789
790     def _parse_request(self, line):
791         "Request-parsing helper for the handle() method"
792         request = json.loads(line.strip())
793         if not isinstance(request, dict):
794             raise InvalidRequest(
795                 "request is not a JSON dictionary: %r" % request)
796         if "to" not in request or "privmsg" not in request:
797             raise InvalidRequest(
798                 "malformed request - 'to' or 'privmsg' missing: %r" % request)
799         channels = request['to']
800         message = request['privmsg']
801         if not isinstance(channels, (list, basestring)):
802             raise InvalidRequest(
803                 "malformed request - unexpected channel type: %r" % channels)
804         if not isinstance(message, basestring):
805             raise InvalidRequest(
806                 "malformed request - unexpected message type: %r" % message)
807         if not isinstance(channels, list):
808             channels = [channels]
809         targets = []
810         for url in channels:
811             try:
812                 if not isinstance(url, basestring):
813                     raise InvalidRequest(
814                         "malformed request - URL has unexpected type: %r" %
815                         url)
816                 target = Target(url)
817                 target.validate()
818             except InvalidRequest, e:
819                 LOG.error(str(e))
820             else:
821                 targets.append(target)
822         return (targets, message)
823
824     def handle(self, line, quit_after=False):
825         "Perform a JSON relay request."
826         try:
827             targets, message = self._parse_request(line=line)
828             for target in targets:
829                 if target.server() not in self.servers:
830                     self.servers[target.server()] = Dispatcher(
831                         self, target=target, **self.kwargs)
832                 self.servers[target.server()].dispatch(
833                     target.channel, message, target.key, quit_after=quit_after)
834                 # GC dispatchers with no active connections
835                 servernames = self.servers.keys()
836                 for servername in servernames:
837                     if not self.servers[servername].live():
838                         del self.servers[servername]
839                     # If we might be pushing a resource limit even
840                     # after garbage collection, remove a session.  The
841                     # goal here is to head off DoS attacks that aim at
842                     # exhausting thread space or file descriptors.
843                     # The cost is that attempts to DoS this service
844                     # will cause lots of join/leave spam as we
845                     # scavenge old channels after connecting to new
846                     # ones. The particular method used for selecting a
847                     # session to be terminated doesn't matter much; we
848                     # choose the one longest idle on the assumption
849                     # that message activity is likely to be clumpy.
850                     if len(self.servers) >= CONNECTION_MAX:
851                         oldest = min(
852                             self.servers.keys(),
853                             key=lambda name: self.servers[name].last_xmit())
854                         del self.servers[oldest]
855         except InvalidRequest, e:
856             LOG.error(str(e))
857         except ValueError:
858             self.logerr("can't recognize JSON on input: %r" % line)
859         except RuntimeError:
860             self.logerr("wildly malformed JSON blew the parser stack.")
861
862 class IrkerTCPHandler(SocketServer.StreamRequestHandler):
863     def handle(self):
864         while True:
865             line = self.rfile.readline()
866             if not line:
867                 break
868             irker.handle(line.strip())
869
870 class IrkerUDPHandler(SocketServer.BaseRequestHandler):
871     def handle(self):
872         data = self.request[0].strip()
873         #socket = self.request[1]
874         irker.handle(data)
875
876
877 if __name__ == '__main__':
878     parser = argparse.ArgumentParser(
879         description=__doc__.strip().splitlines()[0])
880     parser.add_argument(
881         '-d', '--log-level', metavar='LEVEL', choices=LOG_LEVELS,
882         help='file of trusted certificates for SSL/TLS')
883     parser.add_argument(
884         '-l', '--log-file', metavar='PATH',
885         help='file for saving captured message traffic')
886     parser.add_argument(
887         '-n', '--nick', metavar='NAME', default='irker%03d',
888         help="nickname (optionally with a '%%.*d' server connection marker)")
889     parser.add_argument(
890         '-p', '--password', metavar='PASSWORD',
891         help='NickServ password')
892     parser.add_argument(
893         '-i', '--immediate', action='store_const', const=True,
894         help='disconnect after sending each message')
895     parser.add_argument(
896         '-V', '--version', action='version',
897         version='%(prog)s {0}'.format(version))
898     args = parser.parse_args()
899
900     handler = logging.StreamHandler()
901     LOG.addHandler(handler)
902     if args.log_level:
903         log_level = getattr(logging, args.log_level.upper())
904         LOG.setLevel(log_level)
905
906     irker = Irker(
907         logfile=args.log_file,
908         nick_template=args.nick,
909         nick_needs_number=re.search('%.*d', args.nick),
910         password=args.password,
911         )
912     LOG.info("irkerd version %s" % version)
913     if args.immediate:
914         irker.irc.add_event_handler("quit", lambda _c, _e: sys.exit(0))
915         irker.handle('{"to":"%s","privmsg":"%s"}' % (immediate, arguments[0]), quit_after=True)
916         irker.irc.spin()
917     else:
918         irker.thread_launch()
919         try:
920             tcpserver = SocketServer.TCPServer((HOST, PORT), IrkerTCPHandler)
921             udpserver = SocketServer.UDPServer((HOST, PORT), IrkerUDPHandler)
922             for server in [tcpserver, udpserver]:
923                 server = threading.Thread(target=server.serve_forever)
924                 server.setDaemon(True)
925                 server.start()
926             try:
927                 signal.pause()
928             except KeyboardInterrupt:
929                 raise SystemExit(1)
930         except socket.error, e:
931             sys.stderr.write("irkerd: server launch failed: %r\n" % e)
932
933 # end