irkerd: Replace 'password' global with local Connection.password
[irker.git] / irkerd
1 #!/usr/bin/env python
2 """
3 irkerd - a simple IRC multiplexer daemon
4
5 Listens for JSON objects of the form {'to':<irc-url>, 'privmsg':<text>}
6 and relays messages to IRC channels. Each request must be followed by
7 a newline.
8
9 The <text> must be a string.  The value of the 'to' attribute can be a
10 string containing an IRC URL (e.g. 'irc://chat.freenet.net/botwar') or
11 a list of such strings; in the latter case the message is broadcast to
12 all listed channels.  Note that the channel portion of the URL need
13 *not* have a leading '#' unless the channel name itself does.
14
15 Options: -d sets the debug-message level (probably only of interest to
16 developers). -l sets a logfile to capture message traffic from
17 channels.  -n sets the nick and -p the nickserv password. The -V
18 option prints the program version and exits.
19
20 Design and code by Eric S. Raymond <esr@thyrsus.com>. See the project
21 resource page at <http://www.catb.org/~esr/irker/>.
22
23 Requires Python 2.6 or 2.5 with the simplejson library installed.
24 """
25
26 from __future__ import with_statement
27
28 # These things might need tuning
29
30 HOST = "localhost"
31 PORT = 6659
32
33 XMIT_TTL = (3 * 60 * 60)        # Time to live, seconds from last transmit
34 PING_TTL = (15 * 60)            # Time to live, seconds from last PING
35 HANDSHAKE_TTL = 60              # Time to live, seconds from nick transmit
36 CHANNEL_TTL = (3 * 60 * 60)     # Time to live, seconds from last transmit
37 DISCONNECT_TTL = (24 * 60 * 60) # Time to live, seconds from last connect
38 UNSEEN_TTL = 60                 # Time to live, seconds since first request
39 CHANNEL_MAX = 18                # Max channels open per socket (default)
40 ANTI_FLOOD_DELAY = 1.0          # Anti-flood delay after transmissions, seconds
41 ANTI_BUZZ_DELAY = 0.09          # Anti-buzz delay after queue-empty check
42 CONNECTION_MAX = 200            # To avoid hitting a thread limit
43
44 # No user-serviceable parts below this line
45
46 version = "2.6"
47
48 import Queue
49 import SocketServer
50 import getopt
51 import logging
52 try:
53     import simplejson as json   # Faster, also makes us Python-2.4-compatible
54 except ImportError:
55     import json
56 import random
57 import re
58 import select
59 import signal
60 import socket
61 import sys
62 import threading
63 import time
64 import urlparse
65
66
67 LOG = logging.getLogger(__name__)
68 LOG.setLevel(logging.ERROR)
69 LOG_LEVELS = ['critical', 'error', 'warning', 'info', 'debug']
70
71
72 # Sketch of implementation:
73 #
74 # One Irker object manages multiple IRC sessions.  It holds a map of
75 # Dispatcher objects, one per (server, port) combination, which are
76 # responsible for routing messages to one of any number of Connection
77 # objects that do the actual socket conversations.  The reason for the
78 # Dispatcher layer is that IRC daemons limit the number of channels a
79 # client (that is, from the daemon's point of view, a socket) can be
80 # joined to, so each session to a server needs a flock of Connection
81 # instances each with its own socket.
82 #
83 # Connections are timed out and removed when either they haven't seen a
84 # PING for a while (indicating that the server may be stalled or down)
85 # or there has been no message traffic to them for a while, or
86 # even if the queue is nonempty but efforts to connect have failed for
87 # a long time.
88 #
89 # There are multiple threads. One accepts incoming traffic from all
90 # servers.  Each Connection also has a consumer thread and a
91 # thread-safe message queue.  The program main appends messages to
92 # queues as JSON requests are received; the consumer threads try to
93 # ship them to servers.  When a socket write stalls, it only blocks an
94 # individual consumer thread; if it stalls long enough, the session
95 # will be timed out. This solves the biggest problem with a
96 # single-threaded implementation, which is that you can't count on a
97 # single stalled write not hanging all other traffic - you're at the
98 # mercy of the length of the buffers in the TCP/IP layer.
99 #
100 # Message delivery is thus not reliable in the face of network stalls,
101 # but this was considered acceptable because IRC (notoriously) has the
102 # same problem - there is little point in reliable delivery to a relay
103 # that is down or unreliable.
104 #
105 # This code uses only NICK, JOIN, PART, MODE, PRIVMSG, USER, and QUIT. 
106 # It is strictly compliant to RFC1459, except for the interpretation and
107 # use of the DEAF and CHANLIMIT and (obsolete) MAXCHANNELS features.
108 #
109 # CHANLIMIT is as described in the Internet RFC draft
110 # draft-brocklesby-irc-isupport-03 at <http://www.mirc.com/isupport.html>.
111 # The ",isnick" feature is as described in
112 # <http://ftp.ics.uci.edu/pub/ietf/uri/draft-mirashi-url-irc-01.txt>.
113
114 # Historical note: the IRCClient and IRCServerConnection classes
115 # (~270LOC) replace the overweight, overcomplicated 3KLOC mass of
116 # irclib code that irker formerly used as a service library.  They
117 # still look similar to parts of irclib because I contributed to that
118 # code before giving up on it.
119
120 class IRCError(Exception):
121     "An IRC exception"
122     pass
123
124
125 class InvalidRequest (ValueError):
126     "An invalid JSON request"
127     pass
128
129
130 class IRCClient():
131     "An IRC client session to one or more servers."
132     def __init__(self):
133         self.mutex = threading.RLock()
134         self.server_connections = []
135         self.event_handlers = {}
136         self.add_event_handler("ping",
137                                lambda c, e: c.ship("PONG %s" % e.target))
138
139     def newserver(self):
140         "Initialize a new server-connection object."
141         conn = IRCServerConnection(self)
142         with self.mutex:
143             self.server_connections.append(conn)
144         return conn
145
146     def spin(self, timeout=0.2):
147         "Spin processing data from connections forever."
148         # Outer loop should specifically *not* be mutex-locked.
149         # Otherwise no other thread would ever be able to change
150         # the shared state of an IRC object running this function.
151         while True:
152             nextsleep = 0
153             with self.mutex:
154                 connected = [x for x in self.server_connections
155                              if x is not None and x.socket is not None]
156                 sockets = [x.socket for x in connected]
157                 if sockets:
158                     connmap = dict([(c.socket.fileno(), c) for c in connected])
159                     (insocks, _o, _e) = select.select(sockets, [], [], timeout)
160                     for s in insocks:
161                         connmap[s.fileno()].consume()
162                 else:
163                     nextsleep = timeout
164             time.sleep(nextsleep)
165
166     def add_event_handler(self, event, handler):
167         "Set a handler to be called later."
168         with self.mutex:
169             event_handlers = self.event_handlers.setdefault(event, [])
170             event_handlers.append(handler)
171
172     def handle_event(self, connection, event):
173         with self.mutex:
174             h = self.event_handlers
175             th = sorted(h.get("all_events", []) + h.get(event.type, []))
176             for handler in th:
177                 handler(connection, event)
178
179     def drop_connection(self, connection):
180         with self.mutex:
181             self.server_connections.remove(connection)
182
183
184 class LineBufferedStream():
185     "Line-buffer a read stream."
186     crlf_re = re.compile(b'\r?\n')
187
188     def __init__(self):
189         self.buffer = ''
190
191     def append(self, newbytes):
192         self.buffer += newbytes
193
194     def lines(self):
195         "Iterate over lines in the buffer."
196         lines = LineBufferedStream.crlf_re.split(self.buffer)
197         self.buffer = lines.pop()
198         return iter(lines)
199
200     def __iter__(self):
201         return self.lines()
202
203 class IRCServerConnectionError(IRCError):
204     pass
205
206 class IRCServerConnection():
207     command_re = re.compile("^(:(?P<prefix>[^ ]+) +)?(?P<command>[^ ]+)( *(?P<argument> .+))?")
208     # The full list of numeric-to-event mappings is in Perl's Net::IRC.
209     # We only need to ensure that if some ancient server throws numerics
210     # for the ones we actually want to catch, they're mapped.
211     codemap = {
212         "001": "welcome",
213         "005": "featurelist",
214         "432": "erroneusnickname",
215         "433": "nicknameinuse",
216         "436": "nickcollision",
217         "437": "unavailresource",
218     }
219
220     def __init__(self, master):
221         self.master = master
222         self.socket = None
223
224     def connect(self, target, nickname,
225                 password=None, username=None, ircname=None):
226         LOG.debug("connect(server=%r, port=%r, nickname=%r, ...)" % (
227             target.servername, target.port, nickname))
228         if self.socket is not None:
229             self.disconnect("Changing servers")
230
231         self.buffer = LineBufferedStream()
232         self.event_handlers = {}
233         self.real_server_name = ""
234         self.target = target
235         self.nickname = nickname
236         try:
237             self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
238             self.socket.bind(('', 0))
239             self.socket.connect((target.servername, target.port))
240         except socket.error as err:
241             raise IRCServerConnectionError("Couldn't connect to socket: %s" % err)
242
243         if password:
244             self.ship("PASS " + password)
245         self.nick(self.nickname)
246         self.user(username=username or ircname, realname=ircname or nickname)
247         return self
248
249     def close(self):
250         # Without this thread lock, there is a window during which
251         # select() can find a closed socket, leading to an EBADF error.
252         with self.master.mutex:
253             self.disconnect("Closing object")
254             self.master.drop_connection(self)
255
256     def consume(self):
257         try:
258             incoming = self.socket.recv(16384)
259         except socket.error:
260             # Server hung up on us.
261             self.disconnect("Connection reset by peer")
262             return
263         if not incoming:
264             # Dead air also indicates a connection reset.
265             self.disconnect("Connection reset by peer")
266             return
267
268         self.buffer.append(incoming)
269
270         for line in self.buffer:
271             LOG.debug("FROM: %s" % line)
272
273             if not line:
274                 continue
275
276             prefix = None
277             command = None
278             arguments = None
279             self.handle_event(Event("every_raw_message",
280                                      self.real_server_name,
281                                      None,
282                                      [line]))
283
284             m = IRCServerConnection.command_re.match(line)
285             if m.group("prefix"):
286                 prefix = m.group("prefix")
287                 if not self.real_server_name:
288                     self.real_server_name = prefix
289             if m.group("command"):
290                 command = m.group("command").lower()
291             if m.group("argument"):
292                 a = m.group("argument").split(" :", 1)
293                 arguments = a[0].split()
294                 if len(a) == 2:
295                     arguments.append(a[1])
296
297             command = IRCServerConnection.codemap.get(command, command)
298             if command in ["privmsg", "notice"]:
299                 target = arguments.pop(0)
300             else:
301                 target = None
302
303                 if command == "quit":
304                     arguments = [arguments[0]]
305                 elif command == "ping":
306                     target = arguments[0]
307                 else:
308                     target = arguments[0]
309                     arguments = arguments[1:]
310
311             LOG.debug("command: %s, source: %s, target: %s, arguments: %s" % (
312                 command, prefix, target, arguments))
313             self.handle_event(Event(command, prefix, target, arguments))
314
315     def handle_event(self, event):
316         self.master.handle_event(self, event)
317         if event.type in self.event_handlers:
318             for fn in self.event_handlers[event.type]:
319                 fn(self, event)
320
321     def is_connected(self):
322         return self.socket is not None
323
324     def disconnect(self, message=""):
325         if self.socket is None:
326             return
327         # Don't send a QUIT here - causes infinite loop!
328         try:
329             self.socket.shutdown(socket.SHUT_WR)
330             self.socket.close()
331         except socket.error:
332             pass
333         del self.socket
334         self.socket = None
335         self.handle_event(
336             Event("disconnect", self.target.server, "", [message]))
337
338     def join(self, channel, key=""):
339         self.ship("JOIN %s%s" % (channel, (key and (" " + key))))
340
341     def mode(self, target, command):
342         self.ship("MODE %s %s" % (target, command))
343
344     def nick(self, newnick):
345         self.ship("NICK " + newnick)
346
347     def part(self, channel, message=""):
348         cmd_parts = ['PART', channel]
349         if message:
350             cmd_parts.append(message)
351         self.ship(' '.join(cmd_parts))
352
353     def privmsg(self, target, text):
354         self.ship("PRIVMSG %s :%s" % (target, text))
355
356     def quit(self, message=""):
357         self.ship("QUIT" + (message and (" :" + message)))
358
359     def user(self, username, realname):
360         self.ship("USER %s 0 * :%s" % (username, realname))
361
362     def ship(self, string):
363         "Ship a command to the server, appending CR/LF"
364         try:
365             self.socket.send(string.encode('utf-8') + b'\r\n')
366             LOG.debug("TO: %s" % string)
367         except socket.error:
368             self.disconnect("Connection reset by peer.")
369
370 class Event(object):
371     def __init__(self, evtype, source, target, arguments=None):
372         self.type = evtype
373         self.source = source
374         self.target = target
375         if arguments is None:
376             arguments = []
377         self.arguments = arguments
378
379 def is_channel(string):
380     return string and string[0] in "#&+!"
381
382 class Connection:
383     def __init__(self, irker, target, nick_template, nick_needs_number=False,
384                  password=None, **kwargs):
385         self.irker = irker
386         self.target = target
387         self.nick_template = nick_template
388         self.nick_needs_number = nick_needs_number
389         self.password = password
390         self.kwargs = kwargs
391         self.nick_trial = None
392         self.connection = None
393         self.status = None
394         self.last_xmit = time.time()
395         self.last_ping = time.time()
396         self.channels_joined = {}
397         self.channel_limits = {}
398         # The consumer thread
399         self.queue = Queue.Queue()
400         self.thread = None
401     def nickname(self, n=None):
402         "Return a name for the nth server connection."
403         if n is None:
404             n = self.nick_trial
405         if self.nick_needs_number:
406             return (self.nick_template % n)
407         else:
408             return self.nick_template
409     def handle_ping(self):
410         "Register the fact that the server has pinged this connection."
411         self.last_ping = time.time()
412     def handle_welcome(self):
413         "The server says we're OK, with a non-conflicting nick."
414         self.status = "ready"
415         LOG.info("nick %s accepted" % self.nickname())
416         if self.password:
417             self.connection.privmsg("nickserv", "identify %s" % self.password)
418     def handle_badnick(self):
419         "The server says our nick is ill-formed or has a conflict."
420         LOG.info("nick %s rejected" % self.nickname())
421         if self.nick_needs_number:
422             # Randomness prevents a malicious user or bot from
423             # anticipating the next trial name in order to block us
424             # from completing the handshake.
425             self.nick_trial += random.randint(1, 3)
426             self.last_xmit = time.time()
427             self.connection.nick(self.nickname())
428         # Otherwise fall through, it might be possible to
429         # recover manually.
430     def handle_disconnect(self):
431         "Server disconnected us for flooding or some other reason."
432         self.connection = None
433         if self.status != "expired":
434             self.status = "disconnected"
435     def handle_kick(self, outof):
436         "We've been kicked."
437         self.status = "handshaking"
438         try:
439             del self.channels_joined[outof]
440         except KeyError:
441             LOG.error("kicked by %s from %s that's not joined" % (
442                 self.target, outof))
443         qcopy = []
444         while not self.queue.empty():
445             (channel, message, key) = self.queue.get()
446             if channel != outof:
447                 qcopy.append((channel, message, key))
448         for (channel, message, key) in qcopy:
449             self.queue.put((channel, message, key))
450         self.status = "ready"
451     def enqueue(self, channel, message, key, quit_after=False):
452         "Enque a message for transmission."
453         if self.thread is None or not self.thread.is_alive():
454             self.status = "unseen"
455             self.thread = threading.Thread(target=self.dequeue)
456             self.thread.setDaemon(True)
457             self.thread.start()
458         self.queue.put((channel, message, key))
459         if quit_after:
460             self.queue.put((channel, None, key))
461     def dequeue(self):
462         "Try to ship pending messages from the queue."
463         try:
464             while True:
465                 # We want to be kind to the IRC servers and not hold unused
466                 # sockets open forever, so they have a time-to-live.  The
467                 # loop is coded this particular way so that we can drop
468                 # the actual server connection when its time-to-live
469                 # expires, then reconnect and resume transmission if the
470                 # queue fills up again.
471                 if self.queue.empty():
472                     # Queue is empty, at some point we want to time out
473                     # the connection rather than holding a socket open in
474                     # the server forever.
475                     now = time.time()
476                     xmit_timeout = now > self.last_xmit + XMIT_TTL
477                     ping_timeout = now > self.last_ping + PING_TTL
478                     if self.status == "disconnected":
479                         # If the queue is empty, we can drop this connection.
480                         self.status = "expired"
481                         break
482                     elif xmit_timeout or ping_timeout:
483                         LOG.info((
484                             "timing out connection to %s at %s "
485                             "(ping_timeout=%s, xmit_timeout=%s)") % (
486                             self.target, time.asctime(), ping_timeout,
487                             xmit_timeout))
488                         with self.irker.irc.mutex:
489                             self.connection.context = None
490                             self.connection.quit("transmission timeout")
491                             self.connection = None
492                         self.status = "disconnected"
493                     else:
494                         # Prevent this thread from hogging the CPU by pausing
495                         # for just a little bit after the queue-empty check.
496                         # As long as this is less that the duration of a human
497                         # reflex arc it is highly unlikely any human will ever
498                         # notice.
499                         time.sleep(ANTI_BUZZ_DELAY)
500                 elif self.status == "disconnected" \
501                          and time.time() > self.last_xmit + DISCONNECT_TTL:
502                     # Queue is nonempty, but the IRC server might be
503                     # down. Letting failed connections retain queue
504                     # space forever would be a memory leak.
505                     self.status = "expired"
506                     break
507                 elif not self.connection and self.status != "expired":
508                     # Queue is nonempty but server isn't connected.
509                     with self.irker.irc.mutex:
510                         self.connection = self.irker.irc.newserver()
511                         self.connection.context = self
512                         # Try to avoid colliding with other instances
513                         self.nick_trial = random.randint(1, 990)
514                         self.channels_joined = {}
515                         try:
516                             # This will throw
517                             # IRCServerConnectionError on failure
518                             self.connection.connect(
519                                 target=self.target,
520                                 nickname=self.nickname(),
521                                 username="irker",
522                                 ircname="irker relaying client",
523                                 **self.kwargs)
524                             self.status = "handshaking"
525                             LOG.info("XMIT_TTL bump (%s connection) at %s" % (
526                                 self.target, time.asctime()))
527                             self.last_xmit = time.time()
528                             self.last_ping = time.time()
529                         except IRCServerConnectionError:
530                             self.status = "expired"
531                             break
532                 elif self.status == "handshaking":
533                     if time.time() > self.last_xmit + HANDSHAKE_TTL:
534                         self.status = "expired"
535                         break
536                     else:
537                         # Don't buzz on the empty-queue test while we're
538                         # handshaking
539                         time.sleep(ANTI_BUZZ_DELAY)
540                 elif self.status == "unseen" \
541                          and time.time() > self.last_xmit + UNSEEN_TTL:
542                     # Nasty people could attempt a denial-of-service
543                     # attack by flooding us with requests with invalid
544                     # servernames. We guard against this by rapidly
545                     # expiring connections that have a nonempty queue but
546                     # have never had a successful open.
547                     self.status = "expired"
548                     break
549                 elif self.status == "ready":
550                     (channel, message, key) = self.queue.get()
551                     if channel not in self.channels_joined:
552                         self.connection.join(channel, key=key)
553                         LOG.info("joining %s on %s." % (channel, self.target))
554                     # None is magic - it's a request to quit the server
555                     if message is None:
556                         self.connection.quit()
557                     # An empty message might be used as a keepalive or
558                     # to join a channel for logging, so suppress the
559                     # privmsg send unless there is actual traffic.
560                     elif message:
561                         for segment in message.split("\n"):
562                             # Truncate the message if it's too long,
563                             # but we're working with characters here,
564                             # not bytes, so we could be off.
565                             # 500 = 512 - CRLF - 'PRIVMSG ' - ' :'
566                             maxlength = 500 - len(channel)
567                             if len(segment) > maxlength:
568                                 segment = segment[:maxlength]
569                             try:
570                                 self.connection.privmsg(channel, segment)
571                             except ValueError as err:
572                                 LOG.warning((
573                                     "irclib rejected a message to %s on %s "
574                                     "because: %s") % (
575                                     channel, self.target, str(err)))
576                                 LOG.debug(err.format_exc())
577                             time.sleep(ANTI_FLOOD_DELAY)
578                     self.last_xmit = self.channels_joined[channel] = time.time()
579                     LOG.info("XMIT_TTL bump (%s transmission) at %s" % (
580                         self.target, time.asctime()))
581                     self.queue.task_done()
582                 elif self.status == "expired":
583                     print "We're expired but still running! This is a bug."
584                     break
585         except Exception, e:
586             LOG.error("exception %s in thread for %s" % (e, self.target))
587             # Maybe this should have its own status?
588             self.status = "expired"
589             LOG.debug(e.format_exc())
590         finally:
591             try:
592                 # Make sure we don't leave any zombies behind
593                 self.connection.close()
594             except:
595                 # Irclib has a habit of throwing fresh exceptions here. Ignore that
596                 pass
597     def live(self):
598         "Should this connection not be scavenged?"
599         return self.status != "expired"
600     def joined_to(self, channel):
601         "Is this connection joined to the specified channel?"
602         return channel in self.channels_joined
603     def accepting(self, channel):
604         "Can this connection accept a join of this channel?"
605         if self.channel_limits:
606             match_count = 0
607             for already in self.channels_joined:
608                 # This obscure code is because the RFCs allow separate limits
609                 # by channel type (indicated by the first character of the name)
610                 # a feature that is almost never actually used.
611                 if already[0] == channel[0]:
612                     match_count += 1
613             return match_count < self.channel_limits.get(channel[0], CHANNEL_MAX)
614         else:
615             return len(self.channels_joined) < CHANNEL_MAX
616
617 class Target():
618     "Represent a transmission target."
619     def __init__(self, url):
620         self.url = url
621         # Pre-2.6 Pythons don't recognize irc: as a valid URL prefix.
622         url = url.replace("irc://", "http://")
623         parsed = urlparse.urlparse(url)
624         irchost, _, ircport = parsed.netloc.partition(':')
625         if not ircport:
626             ircport = 6667
627         self.servername = irchost
628         # IRC channel names are case-insensitive.  If we don't smash
629         # case here we may run into problems later. There was a bug
630         # observed on irc.rizon.net where an irkerd user specified #Channel,
631         # got kicked, and irkerd crashed because the server returned
632         # "#channel" in the notification that our kick handler saw.
633         self.channel = parsed.path.lstrip('/').lower()
634         # This deals with a tweak in recent versions of urlparse.
635         if parsed.fragment:
636             self.channel += "#" + parsed.fragment
637         isnick = self.channel.endswith(",isnick")
638         if isnick:
639             self.channel = self.channel[:-7]
640         if self.channel and not isnick and self.channel[0] not in "#&+":
641             self.channel = "#" + self.channel
642         # support both channel?secret and channel?key=secret
643         self.key = ""
644         if parsed.query:
645             self.key = re.sub("^key=", "", parsed.query)
646         self.port = int(ircport)
647
648     def __str__(self):
649         "Represent this instance as a string"
650         return self.servername or self.url or repr(self)
651
652     def validate(self):
653         "Raise InvalidRequest if the URL is missing a critical component"
654         if not self.servername:
655             raise InvalidRequest(
656                 'target URL missing a servername: %r' % self.url)
657         if not self.channel:
658             raise InvalidRequest(
659                 'target URL missing a channel: %r' % self.url)
660     def server(self):
661         "Return a hashable tuple representing the destination server."
662         return (self.servername, self.port)
663
664 class Dispatcher:
665     "Manage connections to a particular server-port combination."
666     def __init__(self, irker, **kwargs):
667         self.irker = irker
668         self.kwargs = kwargs
669         self.connections = []
670     def dispatch(self, channel, message, key, quit_after=False):
671         "Dispatch messages for our server-port combination."
672         # First, check if there is room for another channel
673         # on any of our existing connections.
674         connections = [x for x in self.connections if x.live()]
675         eligibles = [x for x in connections if x.joined_to(channel)] \
676                     or [x for x in connections if x.accepting(channel)]
677         if eligibles:
678             eligibles[0].enqueue(channel, message, key, quit_after)
679             return
680         # All connections are full up. Look for one old enough to be
681         # scavenged.
682         ancients = []
683         for connection in connections:
684             for (chan, age) in connections.channels_joined.items():
685                 if age < time.time() - CHANNEL_TTL:
686                     ancients.append((connection, chan, age))
687         if ancients:
688             ancients.sort(key=lambda x: x[2]) 
689             (found_connection, drop_channel, _drop_age) = ancients[0]
690             found_connection.part(drop_channel, "scavenged by irkerd")
691             del found_connection.channels_joined[drop_channel]
692             #time.sleep(ANTI_FLOOD_DELAY)
693             found_connection.enqueue(channel, message, key, quit_after)
694             return
695         # All existing channels had recent activity
696         newconn = Connection(self.irker, **self.kwargs)
697         self.connections.append(newconn)
698         newconn.enqueue(channel, message, key, quit_after)
699     def live(self):
700         "Does this server-port combination have any live connections?"
701         self.connections = [x for x in self.connections if x.live()]
702         return len(self.connections) > 0
703     def pending(self):
704         "Return all connections with pending traffic."
705         return [x for x in self.connections if not x.queue.empty()]
706     def last_xmit(self):
707         "Return the time of the most recent transmission."
708         return max(x.last_xmit for x in self.connections)
709
710 class Irker:
711     "Persistent IRC multiplexer."
712     def __init__(self, logfile=None, **kwargs):
713         self.logfile = logfile
714         self.kwargs = kwargs
715         self.irc = IRCClient()
716         self.irc.add_event_handler("ping", self._handle_ping)
717         self.irc.add_event_handler("welcome", self._handle_welcome)
718         self.irc.add_event_handler("erroneusnickname", self._handle_badnick)
719         self.irc.add_event_handler("nicknameinuse", self._handle_badnick)
720         self.irc.add_event_handler("nickcollision", self._handle_badnick)
721         self.irc.add_event_handler("unavailresource", self._handle_badnick)
722         self.irc.add_event_handler("featurelist", self._handle_features)
723         self.irc.add_event_handler("disconnect", self._handle_disconnect)
724         self.irc.add_event_handler("kick", self._handle_kick)
725         self.irc.add_event_handler("every_raw_message", self._handle_every_raw_message)
726         self.servers = {}
727     def thread_launch(self):
728         thread = threading.Thread(target=self.irc.spin)
729         thread.setDaemon(True)
730         self.irc._thread = thread
731         thread.start()
732     def _handle_ping(self, connection, _event):
733         "PING arrived, bump the last-received time for the connection."
734         if connection.context:
735             connection.context.handle_ping()
736     def _handle_welcome(self, connection, _event):
737         "Welcome arrived, nick accepted for this connection."
738         if connection.context:
739             connection.context.handle_welcome()
740     def _handle_badnick(self, connection, _event):
741         "Nick not accepted for this connection."
742         if connection.context:
743             connection.context.handle_badnick()
744     def _handle_features(self, connection, event):
745         "Determine if and how we can set deaf mode."
746         if connection.context:
747             cxt = connection.context
748             arguments = event.arguments
749             for lump in arguments:
750                 if lump.startswith("DEAF="):
751                     if not self.logfile:
752                         connection.mode(cxt.nickname(), "+"+lump[5:])
753                 elif lump.startswith("MAXCHANNELS="):
754                     m = int(lump[12:])
755                     for pref in "#&+":
756                         cxt.channel_limits[pref] = m
757                     LOG.info("%s maxchannels is %d" % (connection.server, m))
758                 elif lump.startswith("CHANLIMIT=#:"):
759                     limits = lump[10:].split(",")
760                     try:
761                         for token in limits:
762                             (prefixes, limit) = token.split(":")
763                             limit = int(limit)
764                             for c in prefixes:
765                                 cxt.channel_limits[c] = limit
766                         LOG.info("%s channel limit map is %s" % (
767                             connection.target, cxt.channel_limits))
768                     except ValueError:
769                         LOG.error("ill-formed CHANLIMIT property")
770     def _handle_disconnect(self, connection, _event):
771         "Server hung up the connection."
772         LOG.info("server %s disconnected" % connection.target)
773         connection.close()
774         if connection.context:
775             connection.context.handle_disconnect()
776     def _handle_kick(self, connection, event):
777         "Server hung up the connection."
778         target = event.target
779         LOG.info("irker has been kicked from %s on %s" % (
780             target, connection.target))
781         if connection.context:
782             connection.context.handle_kick(target)
783     def _handle_every_raw_message(self, _connection, event):
784         "Log all messages when in watcher mode."
785         if self.logfile:
786             with open(self.logfile, "a") as logfp:
787                 logfp.write("%03f|%s|%s\n" % \
788                              (time.time(), event.source, event.arguments[0]))
789     def pending(self):
790         "Do we have any pending message traffic?"
791         return [k for (k, v) in self.servers.items() if v.pending()]
792
793     def _parse_request(self, line):
794         "Request-parsing helper for the handle() method"
795         request = json.loads(line.strip())
796         if not isinstance(request, dict):
797             raise InvalidRequest(
798                 "request is not a JSON dictionary: %r" % request)
799         if "to" not in request or "privmsg" not in request:
800             raise InvalidRequest(
801                 "malformed request - 'to' or 'privmsg' missing: %r" % request)
802         channels = request['to']
803         message = request['privmsg']
804         if not isinstance(channels, (list, basestring)):
805             raise InvalidRequest(
806                 "malformed request - unexpected channel type: %r" % channels)
807         if not isinstance(message, basestring):
808             raise InvalidRequest(
809                 "malformed request - unexpected message type: %r" % message)
810         if not isinstance(channels, list):
811             channels = [channels]
812         targets = []
813         for url in channels:
814             try:
815                 if not isinstance(url, basestring):
816                     raise InvalidRequest(
817                         "malformed request - URL has unexpected type: %r" %
818                         url)
819                 target = Target(url)
820                 target.validate()
821             except InvalidRequest, e:
822                 LOG.error(str(e))
823             else:
824                 targets.append(target)
825         return (targets, message)
826
827     def handle(self, line, quit_after=False):
828         "Perform a JSON relay request."
829         try:
830             targets, message = self._parse_request(line=line)
831             for target in targets:
832                 if target.server() not in self.servers:
833                     self.servers[target.server()] = Dispatcher(
834                         self, target=target, **self.kwargs)
835                 self.servers[target.server()].dispatch(
836                     target.channel, message, target.key, quit_after=quit_after)
837                 # GC dispatchers with no active connections
838                 servernames = self.servers.keys()
839                 for servername in servernames:
840                     if not self.servers[servername].live():
841                         del self.servers[servername]
842                     # If we might be pushing a resource limit even
843                     # after garbage collection, remove a session.  The
844                     # goal here is to head off DoS attacks that aim at
845                     # exhausting thread space or file descriptors.
846                     # The cost is that attempts to DoS this service
847                     # will cause lots of join/leave spam as we
848                     # scavenge old channels after connecting to new
849                     # ones. The particular method used for selecting a
850                     # session to be terminated doesn't matter much; we
851                     # choose the one longest idle on the assumption
852                     # that message activity is likely to be clumpy.
853                     if len(self.servers) >= CONNECTION_MAX:
854                         oldest = min(
855                             self.servers.keys(),
856                             key=lambda name: self.servers[name].last_xmit())
857                         del self.servers[oldest]
858         except InvalidRequest, e:
859             LOG.error(str(e))
860         except ValueError:
861             self.logerr("can't recognize JSON on input: %r" % line)
862         except RuntimeError:
863             self.logerr("wildly malformed JSON blew the parser stack.")
864
865 class IrkerTCPHandler(SocketServer.StreamRequestHandler):
866     def handle(self):
867         while True:
868             line = self.rfile.readline()
869             if not line:
870                 break
871             irker.handle(line.strip())
872
873 class IrkerUDPHandler(SocketServer.BaseRequestHandler):
874     def handle(self):
875         data = self.request[0].strip()
876         #socket = self.request[1]
877         irker.handle(data)
878
879 def usage():
880     sys.stdout.write("""
881 Usage:
882   irkerd [-d debuglevel] [-l logfile] [-n nick] [-p password] [-i channel message] [-V] [-h]
883
884 Options
885   -d    set debug level
886   -l    set logfile
887   -n    set nick-style
888   -p    set nickserv password
889   -i    immediate mode
890   -V    return irkerd version
891   -h    print this help dialog
892 """)
893
894 if __name__ == '__main__':
895     log_level = None
896     immediate = None
897     nick_template = "irker%03d"
898     password = None
899     logfile = None
900     try:
901         (options, arguments) = getopt.getopt(sys.argv[1:], "d:i:l:n:p:Vh")
902     except getopt.GetoptError as e:
903         sys.stderr.write("%s" % e)
904         usage()
905         sys.exit(1)
906     for (opt, val) in options:
907         if opt == '-d': # Enable debug/progress messages
908             if val.lower() not in LOG_LEVELS:
909                 sys.stderr.write('invalid log level %r (choices: %s)\n' % (
910                     val, ', '.join(LOG_LEVELS)))
911                 sys.exit(1)
912             log_level = getattr(logging, val.upper())
913         elif opt == '-i':       # Immediate mode - send one message, then exit. 
914             immediate = val
915         elif opt == '-l':       # Logfile mode - report traffic read in
916             logfile = val
917         elif opt == '-n':       # Force the nick
918             nick_template = val
919         elif opt == '-p':       # Set a nickserv password
920             password = val
921         elif opt == '-V':       # Emit version and exit
922             sys.stdout.write("irkerd version %s\n" % version)
923             sys.exit(0)
924         elif opt == '-h':
925             usage()
926             sys.exit(0)
927
928     handler = logging.StreamHandler()
929     LOG.addHandler(handler)
930     if log_level:
931         LOG.setLevel(log_level)
932
933     irker = Irker(
934         logfile=logfile,
935         nick_template=nick_template,
936         nick_needs_number=re.search("%.*d", nick_template),
937         password=password,
938         )
939     LOG.info("irkerd version %s" % version)
940     if immediate:
941         irker.irc.add_event_handler("quit", lambda _c, _e: sys.exit(0))
942         irker.handle('{"to":"%s","privmsg":"%s"}' % (immediate, arguments[0]), quit_after=True)
943         irker.irc.spin()
944     else:
945         irker.thread_launch()
946         try:
947             tcpserver = SocketServer.TCPServer((HOST, PORT), IrkerTCPHandler)
948             udpserver = SocketServer.UDPServer((HOST, PORT), IrkerUDPHandler)
949             for server in [tcpserver, udpserver]:
950                 server = threading.Thread(target=server.serve_forever)
951                 server.setDaemon(True)
952                 server.start()
953             try:
954                 signal.pause()
955             except KeyboardInterrupt:
956                 raise SystemExit(1)
957         except socket.error, e:
958             sys.stderr.write("irkerd: server launch failed: %r\n" % e)
959
960 # end