irkerd: Replace sys.stderr.write with LOG.error
[irker.git] / irkerd
1 #!/usr/bin/env python
2 """
3 irkerd - a simple IRC multiplexer daemon
4
5 Listens for JSON objects of the form {'to':<irc-url>, 'privmsg':<text>}
6 and relays messages to IRC channels. Each request must be followed by
7 a newline.
8
9 The <text> must be a string.  The value of the 'to' attribute can be a
10 string containing an IRC URL (e.g. 'irc://chat.freenet.net/botwar') or
11 a list of such strings; in the latter case the message is broadcast to
12 all listed channels.  Note that the channel portion of the URL need
13 *not* have a leading '#' unless the channel name itself does.
14
15 Design and code by Eric S. Raymond <esr@thyrsus.com>. See the project
16 resource page at <http://www.catb.org/~esr/irker/>.
17
18 Requires Python 2.7, or:
19 * 2.6 with the argparse package installed.
20 * 2.5 with the argparse and simplejson packages installed.
21 """
22
23 from __future__ import with_statement
24
25 # These things might need tuning
26
27 HOST = "localhost"
28 PORT = 6659
29
30 XMIT_TTL = (3 * 60 * 60)        # Time to live, seconds from last transmit
31 PING_TTL = (15 * 60)            # Time to live, seconds from last PING
32 HANDSHAKE_TTL = 60              # Time to live, seconds from nick transmit
33 CHANNEL_TTL = (3 * 60 * 60)     # Time to live, seconds from last transmit
34 DISCONNECT_TTL = (24 * 60 * 60) # Time to live, seconds from last connect
35 UNSEEN_TTL = 60                 # Time to live, seconds since first request
36 CHANNEL_MAX = 18                # Max channels open per socket (default)
37 ANTI_FLOOD_DELAY = 1.0          # Anti-flood delay after transmissions, seconds
38 ANTI_BUZZ_DELAY = 0.09          # Anti-buzz delay after queue-empty check
39 CONNECTION_MAX = 200            # To avoid hitting a thread limit
40
41 # No user-serviceable parts below this line
42
43 version = "2.6"
44
45 import Queue
46 import SocketServer
47 import argparse
48 import logging
49 try:
50     import simplejson as json   # Faster, also makes us Python-2.4-compatible
51 except ImportError:
52     import json
53 import random
54 import re
55 import select
56 import signal
57 import socket
58 import sys
59 import threading
60 import time
61 import urlparse
62
63
64 LOG = logging.getLogger(__name__)
65 LOG.setLevel(logging.ERROR)
66 LOG_LEVELS = ['critical', 'error', 'warning', 'info', 'debug']
67
68
69 # Sketch of implementation:
70 #
71 # One Irker object manages multiple IRC sessions.  It holds a map of
72 # Dispatcher objects, one per (server, port) combination, which are
73 # responsible for routing messages to one of any number of Connection
74 # objects that do the actual socket conversations.  The reason for the
75 # Dispatcher layer is that IRC daemons limit the number of channels a
76 # client (that is, from the daemon's point of view, a socket) can be
77 # joined to, so each session to a server needs a flock of Connection
78 # instances each with its own socket.
79 #
80 # Connections are timed out and removed when either they haven't seen a
81 # PING for a while (indicating that the server may be stalled or down)
82 # or there has been no message traffic to them for a while, or
83 # even if the queue is nonempty but efforts to connect have failed for
84 # a long time.
85 #
86 # There are multiple threads. One accepts incoming traffic from all
87 # servers.  Each Connection also has a consumer thread and a
88 # thread-safe message queue.  The program main appends messages to
89 # queues as JSON requests are received; the consumer threads try to
90 # ship them to servers.  When a socket write stalls, it only blocks an
91 # individual consumer thread; if it stalls long enough, the session
92 # will be timed out. This solves the biggest problem with a
93 # single-threaded implementation, which is that you can't count on a
94 # single stalled write not hanging all other traffic - you're at the
95 # mercy of the length of the buffers in the TCP/IP layer.
96 #
97 # Message delivery is thus not reliable in the face of network stalls,
98 # but this was considered acceptable because IRC (notoriously) has the
99 # same problem - there is little point in reliable delivery to a relay
100 # that is down or unreliable.
101 #
102 # This code uses only NICK, JOIN, PART, MODE, PRIVMSG, USER, and QUIT. 
103 # It is strictly compliant to RFC1459, except for the interpretation and
104 # use of the DEAF and CHANLIMIT and (obsolete) MAXCHANNELS features.
105 #
106 # CHANLIMIT is as described in the Internet RFC draft
107 # draft-brocklesby-irc-isupport-03 at <http://www.mirc.com/isupport.html>.
108 # The ",isnick" feature is as described in
109 # <http://ftp.ics.uci.edu/pub/ietf/uri/draft-mirashi-url-irc-01.txt>.
110
111 # Historical note: the IRCClient and IRCServerConnection classes
112 # (~270LOC) replace the overweight, overcomplicated 3KLOC mass of
113 # irclib code that irker formerly used as a service library.  They
114 # still look similar to parts of irclib because I contributed to that
115 # code before giving up on it.
116
117 class IRCError(Exception):
118     "An IRC exception"
119     pass
120
121
122 class InvalidRequest (ValueError):
123     "An invalid JSON request"
124     pass
125
126
127 class IRCClient():
128     "An IRC client session to one or more servers."
129     def __init__(self):
130         self.mutex = threading.RLock()
131         self.server_connections = []
132         self.event_handlers = {}
133         self.add_event_handler("ping",
134                                lambda c, e: c.ship("PONG %s" % e.target))
135
136     def newserver(self):
137         "Initialize a new server-connection object."
138         conn = IRCServerConnection(self)
139         with self.mutex:
140             self.server_connections.append(conn)
141         return conn
142
143     def spin(self, timeout=0.2):
144         "Spin processing data from connections forever."
145         # Outer loop should specifically *not* be mutex-locked.
146         # Otherwise no other thread would ever be able to change
147         # the shared state of an IRC object running this function.
148         while True:
149             nextsleep = 0
150             with self.mutex:
151                 connected = [x for x in self.server_connections
152                              if x is not None and x.socket is not None]
153                 sockets = [x.socket for x in connected]
154                 if sockets:
155                     connmap = dict([(c.socket.fileno(), c) for c in connected])
156                     (insocks, _o, _e) = select.select(sockets, [], [], timeout)
157                     for s in insocks:
158                         connmap[s.fileno()].consume()
159                 else:
160                     nextsleep = timeout
161             time.sleep(nextsleep)
162
163     def add_event_handler(self, event, handler):
164         "Set a handler to be called later."
165         with self.mutex:
166             event_handlers = self.event_handlers.setdefault(event, [])
167             event_handlers.append(handler)
168
169     def handle_event(self, connection, event):
170         with self.mutex:
171             h = self.event_handlers
172             th = sorted(h.get("all_events", []) + h.get(event.type, []))
173             for handler in th:
174                 handler(connection, event)
175
176     def drop_connection(self, connection):
177         with self.mutex:
178             self.server_connections.remove(connection)
179
180
181 class LineBufferedStream():
182     "Line-buffer a read stream."
183     crlf_re = re.compile(b'\r?\n')
184
185     def __init__(self):
186         self.buffer = ''
187
188     def append(self, newbytes):
189         self.buffer += newbytes
190
191     def lines(self):
192         "Iterate over lines in the buffer."
193         lines = LineBufferedStream.crlf_re.split(self.buffer)
194         self.buffer = lines.pop()
195         return iter(lines)
196
197     def __iter__(self):
198         return self.lines()
199
200 class IRCServerConnectionError(IRCError):
201     pass
202
203 class IRCServerConnection():
204     command_re = re.compile("^(:(?P<prefix>[^ ]+) +)?(?P<command>[^ ]+)( *(?P<argument> .+))?")
205     # The full list of numeric-to-event mappings is in Perl's Net::IRC.
206     # We only need to ensure that if some ancient server throws numerics
207     # for the ones we actually want to catch, they're mapped.
208     codemap = {
209         "001": "welcome",
210         "005": "featurelist",
211         "432": "erroneusnickname",
212         "433": "nicknameinuse",
213         "436": "nickcollision",
214         "437": "unavailresource",
215     }
216
217     def __init__(self, master):
218         self.master = master
219         self.socket = None
220
221     def connect(self, target, nickname,
222                 password=None, username=None, ircname=None):
223         LOG.debug("connect(server=%r, port=%r, nickname=%r, ...)" % (
224             target.servername, target.port, nickname))
225         if self.socket is not None:
226             self.disconnect("Changing servers")
227
228         self.buffer = LineBufferedStream()
229         self.event_handlers = {}
230         self.real_server_name = ""
231         self.target = target
232         self.nickname = nickname
233         try:
234             self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
235             self.socket.bind(('', 0))
236             self.socket.connect((target.servername, target.port))
237         except socket.error as err:
238             raise IRCServerConnectionError("Couldn't connect to socket: %s" % err)
239
240         if password:
241             self.ship("PASS " + password)
242         self.nick(self.nickname)
243         self.user(username=username or ircname, realname=ircname or nickname)
244         return self
245
246     def close(self):
247         # Without this thread lock, there is a window during which
248         # select() can find a closed socket, leading to an EBADF error.
249         with self.master.mutex:
250             self.disconnect("Closing object")
251             self.master.drop_connection(self)
252
253     def consume(self):
254         try:
255             incoming = self.socket.recv(16384)
256         except socket.error:
257             # Server hung up on us.
258             self.disconnect("Connection reset by peer")
259             return
260         if not incoming:
261             # Dead air also indicates a connection reset.
262             self.disconnect("Connection reset by peer")
263             return
264
265         self.buffer.append(incoming)
266
267         for line in self.buffer:
268             LOG.debug("FROM: %s" % line)
269
270             if not line:
271                 continue
272
273             prefix = None
274             command = None
275             arguments = None
276             self.handle_event(Event("every_raw_message",
277                                      self.real_server_name,
278                                      None,
279                                      [line]))
280
281             m = IRCServerConnection.command_re.match(line)
282             if m.group("prefix"):
283                 prefix = m.group("prefix")
284                 if not self.real_server_name:
285                     self.real_server_name = prefix
286             if m.group("command"):
287                 command = m.group("command").lower()
288             if m.group("argument"):
289                 a = m.group("argument").split(" :", 1)
290                 arguments = a[0].split()
291                 if len(a) == 2:
292                     arguments.append(a[1])
293
294             command = IRCServerConnection.codemap.get(command, command)
295             if command in ["privmsg", "notice"]:
296                 target = arguments.pop(0)
297             else:
298                 target = None
299
300                 if command == "quit":
301                     arguments = [arguments[0]]
302                 elif command == "ping":
303                     target = arguments[0]
304                 else:
305                     target = arguments[0]
306                     arguments = arguments[1:]
307
308             LOG.debug("command: %s, source: %s, target: %s, arguments: %s" % (
309                 command, prefix, target, arguments))
310             self.handle_event(Event(command, prefix, target, arguments))
311
312     def handle_event(self, event):
313         self.master.handle_event(self, event)
314         if event.type in self.event_handlers:
315             for fn in self.event_handlers[event.type]:
316                 fn(self, event)
317
318     def is_connected(self):
319         return self.socket is not None
320
321     def disconnect(self, message=""):
322         if self.socket is None:
323             return
324         # Don't send a QUIT here - causes infinite loop!
325         try:
326             self.socket.shutdown(socket.SHUT_WR)
327             self.socket.close()
328         except socket.error:
329             pass
330         del self.socket
331         self.socket = None
332         self.handle_event(
333             Event("disconnect", self.target.server, "", [message]))
334
335     def join(self, channel, key=""):
336         self.ship("JOIN %s%s" % (channel, (key and (" " + key))))
337
338     def mode(self, target, command):
339         self.ship("MODE %s %s" % (target, command))
340
341     def nick(self, newnick):
342         self.ship("NICK " + newnick)
343
344     def part(self, channel, message=""):
345         cmd_parts = ['PART', channel]
346         if message:
347             cmd_parts.append(message)
348         self.ship(' '.join(cmd_parts))
349
350     def privmsg(self, target, text):
351         self.ship("PRIVMSG %s :%s" % (target, text))
352
353     def quit(self, message=""):
354         self.ship("QUIT" + (message and (" :" + message)))
355
356     def user(self, username, realname):
357         self.ship("USER %s 0 * :%s" % (username, realname))
358
359     def ship(self, string):
360         "Ship a command to the server, appending CR/LF"
361         try:
362             self.socket.send(string.encode('utf-8') + b'\r\n')
363             LOG.debug("TO: %s" % string)
364         except socket.error:
365             self.disconnect("Connection reset by peer.")
366
367 class Event(object):
368     def __init__(self, evtype, source, target, arguments=None):
369         self.type = evtype
370         self.source = source
371         self.target = target
372         if arguments is None:
373             arguments = []
374         self.arguments = arguments
375
376 def is_channel(string):
377     return string and string[0] in "#&+!"
378
379 class Connection:
380     def __init__(self, irker, target, nick_template, nick_needs_number=False,
381                  password=None, **kwargs):
382         self.irker = irker
383         self.target = target
384         self.nick_template = nick_template
385         self.nick_needs_number = nick_needs_number
386         self.password = password
387         self.kwargs = kwargs
388         self.nick_trial = None
389         self.connection = None
390         self.status = None
391         self.last_xmit = time.time()
392         self.last_ping = time.time()
393         self.channels_joined = {}
394         self.channel_limits = {}
395         # The consumer thread
396         self.queue = Queue.Queue()
397         self.thread = None
398     def nickname(self, n=None):
399         "Return a name for the nth server connection."
400         if n is None:
401             n = self.nick_trial
402         if self.nick_needs_number:
403             return (self.nick_template % n)
404         else:
405             return self.nick_template
406     def handle_ping(self):
407         "Register the fact that the server has pinged this connection."
408         self.last_ping = time.time()
409     def handle_welcome(self):
410         "The server says we're OK, with a non-conflicting nick."
411         self.status = "ready"
412         LOG.info("nick %s accepted" % self.nickname())
413         if self.password:
414             self.connection.privmsg("nickserv", "identify %s" % self.password)
415     def handle_badnick(self):
416         "The server says our nick is ill-formed or has a conflict."
417         LOG.info("nick %s rejected" % self.nickname())
418         if self.nick_needs_number:
419             # Randomness prevents a malicious user or bot from
420             # anticipating the next trial name in order to block us
421             # from completing the handshake.
422             self.nick_trial += random.randint(1, 3)
423             self.last_xmit = time.time()
424             self.connection.nick(self.nickname())
425         # Otherwise fall through, it might be possible to
426         # recover manually.
427     def handle_disconnect(self):
428         "Server disconnected us for flooding or some other reason."
429         self.connection = None
430         if self.status != "expired":
431             self.status = "disconnected"
432     def handle_kick(self, outof):
433         "We've been kicked."
434         self.status = "handshaking"
435         try:
436             del self.channels_joined[outof]
437         except KeyError:
438             LOG.error("kicked by %s from %s that's not joined" % (
439                 self.target, outof))
440         qcopy = []
441         while not self.queue.empty():
442             (channel, message, key) = self.queue.get()
443             if channel != outof:
444                 qcopy.append((channel, message, key))
445         for (channel, message, key) in qcopy:
446             self.queue.put((channel, message, key))
447         self.status = "ready"
448     def enqueue(self, channel, message, key, quit_after=False):
449         "Enque a message for transmission."
450         if self.thread is None or not self.thread.is_alive():
451             self.status = "unseen"
452             self.thread = threading.Thread(target=self.dequeue)
453             self.thread.setDaemon(True)
454             self.thread.start()
455         self.queue.put((channel, message, key))
456         if quit_after:
457             self.queue.put((channel, None, key))
458     def dequeue(self):
459         "Try to ship pending messages from the queue."
460         try:
461             while True:
462                 # We want to be kind to the IRC servers and not hold unused
463                 # sockets open forever, so they have a time-to-live.  The
464                 # loop is coded this particular way so that we can drop
465                 # the actual server connection when its time-to-live
466                 # expires, then reconnect and resume transmission if the
467                 # queue fills up again.
468                 if self.queue.empty():
469                     # Queue is empty, at some point we want to time out
470                     # the connection rather than holding a socket open in
471                     # the server forever.
472                     now = time.time()
473                     xmit_timeout = now > self.last_xmit + XMIT_TTL
474                     ping_timeout = now > self.last_ping + PING_TTL
475                     if self.status == "disconnected":
476                         # If the queue is empty, we can drop this connection.
477                         self.status = "expired"
478                         break
479                     elif xmit_timeout or ping_timeout:
480                         LOG.info((
481                             "timing out connection to %s at %s "
482                             "(ping_timeout=%s, xmit_timeout=%s)") % (
483                             self.target, time.asctime(), ping_timeout,
484                             xmit_timeout))
485                         with self.irker.irc.mutex:
486                             self.connection.context = None
487                             self.connection.quit("transmission timeout")
488                             self.connection = None
489                         self.status = "disconnected"
490                     else:
491                         # Prevent this thread from hogging the CPU by pausing
492                         # for just a little bit after the queue-empty check.
493                         # As long as this is less that the duration of a human
494                         # reflex arc it is highly unlikely any human will ever
495                         # notice.
496                         time.sleep(ANTI_BUZZ_DELAY)
497                 elif self.status == "disconnected" \
498                          and time.time() > self.last_xmit + DISCONNECT_TTL:
499                     # Queue is nonempty, but the IRC server might be
500                     # down. Letting failed connections retain queue
501                     # space forever would be a memory leak.
502                     self.status = "expired"
503                     break
504                 elif not self.connection and self.status != "expired":
505                     # Queue is nonempty but server isn't connected.
506                     with self.irker.irc.mutex:
507                         self.connection = self.irker.irc.newserver()
508                         self.connection.context = self
509                         # Try to avoid colliding with other instances
510                         self.nick_trial = random.randint(1, 990)
511                         self.channels_joined = {}
512                         try:
513                             # This will throw
514                             # IRCServerConnectionError on failure
515                             self.connection.connect(
516                                 target=self.target,
517                                 nickname=self.nickname(),
518                                 username="irker",
519                                 ircname="irker relaying client",
520                                 **self.kwargs)
521                             self.status = "handshaking"
522                             LOG.info("XMIT_TTL bump (%s connection) at %s" % (
523                                 self.target, time.asctime()))
524                             self.last_xmit = time.time()
525                             self.last_ping = time.time()
526                         except IRCServerConnectionError:
527                             self.status = "expired"
528                             break
529                 elif self.status == "handshaking":
530                     if time.time() > self.last_xmit + HANDSHAKE_TTL:
531                         self.status = "expired"
532                         break
533                     else:
534                         # Don't buzz on the empty-queue test while we're
535                         # handshaking
536                         time.sleep(ANTI_BUZZ_DELAY)
537                 elif self.status == "unseen" \
538                          and time.time() > self.last_xmit + UNSEEN_TTL:
539                     # Nasty people could attempt a denial-of-service
540                     # attack by flooding us with requests with invalid
541                     # servernames. We guard against this by rapidly
542                     # expiring connections that have a nonempty queue but
543                     # have never had a successful open.
544                     self.status = "expired"
545                     break
546                 elif self.status == "ready":
547                     (channel, message, key) = self.queue.get()
548                     if channel not in self.channels_joined:
549                         self.connection.join(channel, key=key)
550                         LOG.info("joining %s on %s." % (channel, self.target))
551                     # None is magic - it's a request to quit the server
552                     if message is None:
553                         self.connection.quit()
554                     # An empty message might be used as a keepalive or
555                     # to join a channel for logging, so suppress the
556                     # privmsg send unless there is actual traffic.
557                     elif message:
558                         for segment in message.split("\n"):
559                             # Truncate the message if it's too long,
560                             # but we're working with characters here,
561                             # not bytes, so we could be off.
562                             # 500 = 512 - CRLF - 'PRIVMSG ' - ' :'
563                             maxlength = 500 - len(channel)
564                             if len(segment) > maxlength:
565                                 segment = segment[:maxlength]
566                             try:
567                                 self.connection.privmsg(channel, segment)
568                             except ValueError as err:
569                                 LOG.warning((
570                                     "irclib rejected a message to %s on %s "
571                                     "because: %s") % (
572                                     channel, self.target, str(err)))
573                                 LOG.debug(err.format_exc())
574                             time.sleep(ANTI_FLOOD_DELAY)
575                     self.last_xmit = self.channels_joined[channel] = time.time()
576                     LOG.info("XMIT_TTL bump (%s transmission) at %s" % (
577                         self.target, time.asctime()))
578                     self.queue.task_done()
579                 elif self.status == "expired":
580                     LOG.error(
581                         "We're expired but still running! This is a bug.")
582                     break
583         except Exception, e:
584             LOG.error("exception %s in thread for %s" % (e, self.target))
585             # Maybe this should have its own status?
586             self.status = "expired"
587             LOG.debug(e.format_exc())
588         finally:
589             try:
590                 # Make sure we don't leave any zombies behind
591                 self.connection.close()
592             except:
593                 # Irclib has a habit of throwing fresh exceptions here. Ignore that
594                 pass
595     def live(self):
596         "Should this connection not be scavenged?"
597         return self.status != "expired"
598     def joined_to(self, channel):
599         "Is this connection joined to the specified channel?"
600         return channel in self.channels_joined
601     def accepting(self, channel):
602         "Can this connection accept a join of this channel?"
603         if self.channel_limits:
604             match_count = 0
605             for already in self.channels_joined:
606                 # This obscure code is because the RFCs allow separate limits
607                 # by channel type (indicated by the first character of the name)
608                 # a feature that is almost never actually used.
609                 if already[0] == channel[0]:
610                     match_count += 1
611             return match_count < self.channel_limits.get(channel[0], CHANNEL_MAX)
612         else:
613             return len(self.channels_joined) < CHANNEL_MAX
614
615 class Target():
616     "Represent a transmission target."
617     def __init__(self, url):
618         self.url = url
619         # Pre-2.6 Pythons don't recognize irc: as a valid URL prefix.
620         url = url.replace("irc://", "http://")
621         parsed = urlparse.urlparse(url)
622         irchost, _, ircport = parsed.netloc.partition(':')
623         if not ircport:
624             ircport = 6667
625         self.servername = irchost
626         # IRC channel names are case-insensitive.  If we don't smash
627         # case here we may run into problems later. There was a bug
628         # observed on irc.rizon.net where an irkerd user specified #Channel,
629         # got kicked, and irkerd crashed because the server returned
630         # "#channel" in the notification that our kick handler saw.
631         self.channel = parsed.path.lstrip('/').lower()
632         # This deals with a tweak in recent versions of urlparse.
633         if parsed.fragment:
634             self.channel += "#" + parsed.fragment
635         isnick = self.channel.endswith(",isnick")
636         if isnick:
637             self.channel = self.channel[:-7]
638         if self.channel and not isnick and self.channel[0] not in "#&+":
639             self.channel = "#" + self.channel
640         # support both channel?secret and channel?key=secret
641         self.key = ""
642         if parsed.query:
643             self.key = re.sub("^key=", "", parsed.query)
644         self.port = int(ircport)
645
646     def __str__(self):
647         "Represent this instance as a string"
648         return self.servername or self.url or repr(self)
649
650     def validate(self):
651         "Raise InvalidRequest if the URL is missing a critical component"
652         if not self.servername:
653             raise InvalidRequest(
654                 'target URL missing a servername: %r' % self.url)
655         if not self.channel:
656             raise InvalidRequest(
657                 'target URL missing a channel: %r' % self.url)
658     def server(self):
659         "Return a hashable tuple representing the destination server."
660         return (self.servername, self.port)
661
662 class Dispatcher:
663     "Manage connections to a particular server-port combination."
664     def __init__(self, irker, **kwargs):
665         self.irker = irker
666         self.kwargs = kwargs
667         self.connections = []
668     def dispatch(self, channel, message, key, quit_after=False):
669         "Dispatch messages for our server-port combination."
670         # First, check if there is room for another channel
671         # on any of our existing connections.
672         connections = [x for x in self.connections if x.live()]
673         eligibles = [x for x in connections if x.joined_to(channel)] \
674                     or [x for x in connections if x.accepting(channel)]
675         if eligibles:
676             eligibles[0].enqueue(channel, message, key, quit_after)
677             return
678         # All connections are full up. Look for one old enough to be
679         # scavenged.
680         ancients = []
681         for connection in connections:
682             for (chan, age) in connections.channels_joined.items():
683                 if age < time.time() - CHANNEL_TTL:
684                     ancients.append((connection, chan, age))
685         if ancients:
686             ancients.sort(key=lambda x: x[2]) 
687             (found_connection, drop_channel, _drop_age) = ancients[0]
688             found_connection.part(drop_channel, "scavenged by irkerd")
689             del found_connection.channels_joined[drop_channel]
690             #time.sleep(ANTI_FLOOD_DELAY)
691             found_connection.enqueue(channel, message, key, quit_after)
692             return
693         # All existing channels had recent activity
694         newconn = Connection(self.irker, **self.kwargs)
695         self.connections.append(newconn)
696         newconn.enqueue(channel, message, key, quit_after)
697     def live(self):
698         "Does this server-port combination have any live connections?"
699         self.connections = [x for x in self.connections if x.live()]
700         return len(self.connections) > 0
701     def pending(self):
702         "Return all connections with pending traffic."
703         return [x for x in self.connections if not x.queue.empty()]
704     def last_xmit(self):
705         "Return the time of the most recent transmission."
706         return max(x.last_xmit for x in self.connections)
707
708 class Irker:
709     "Persistent IRC multiplexer."
710     def __init__(self, logfile=None, **kwargs):
711         self.logfile = logfile
712         self.kwargs = kwargs
713         self.irc = IRCClient()
714         self.irc.add_event_handler("ping", self._handle_ping)
715         self.irc.add_event_handler("welcome", self._handle_welcome)
716         self.irc.add_event_handler("erroneusnickname", self._handle_badnick)
717         self.irc.add_event_handler("nicknameinuse", self._handle_badnick)
718         self.irc.add_event_handler("nickcollision", self._handle_badnick)
719         self.irc.add_event_handler("unavailresource", self._handle_badnick)
720         self.irc.add_event_handler("featurelist", self._handle_features)
721         self.irc.add_event_handler("disconnect", self._handle_disconnect)
722         self.irc.add_event_handler("kick", self._handle_kick)
723         self.irc.add_event_handler("every_raw_message", self._handle_every_raw_message)
724         self.servers = {}
725     def thread_launch(self):
726         thread = threading.Thread(target=self.irc.spin)
727         thread.setDaemon(True)
728         self.irc._thread = thread
729         thread.start()
730     def _handle_ping(self, connection, _event):
731         "PING arrived, bump the last-received time for the connection."
732         if connection.context:
733             connection.context.handle_ping()
734     def _handle_welcome(self, connection, _event):
735         "Welcome arrived, nick accepted for this connection."
736         if connection.context:
737             connection.context.handle_welcome()
738     def _handle_badnick(self, connection, _event):
739         "Nick not accepted for this connection."
740         if connection.context:
741             connection.context.handle_badnick()
742     def _handle_features(self, connection, event):
743         "Determine if and how we can set deaf mode."
744         if connection.context:
745             cxt = connection.context
746             arguments = event.arguments
747             for lump in arguments:
748                 if lump.startswith("DEAF="):
749                     if not self.logfile:
750                         connection.mode(cxt.nickname(), "+"+lump[5:])
751                 elif lump.startswith("MAXCHANNELS="):
752                     m = int(lump[12:])
753                     for pref in "#&+":
754                         cxt.channel_limits[pref] = m
755                     LOG.info("%s maxchannels is %d" % (connection.server, m))
756                 elif lump.startswith("CHANLIMIT=#:"):
757                     limits = lump[10:].split(",")
758                     try:
759                         for token in limits:
760                             (prefixes, limit) = token.split(":")
761                             limit = int(limit)
762                             for c in prefixes:
763                                 cxt.channel_limits[c] = limit
764                         LOG.info("%s channel limit map is %s" % (
765                             connection.target, cxt.channel_limits))
766                     except ValueError:
767                         LOG.error("ill-formed CHANLIMIT property")
768     def _handle_disconnect(self, connection, _event):
769         "Server hung up the connection."
770         LOG.info("server %s disconnected" % connection.target)
771         connection.close()
772         if connection.context:
773             connection.context.handle_disconnect()
774     def _handle_kick(self, connection, event):
775         "Server hung up the connection."
776         target = event.target
777         LOG.info("irker has been kicked from %s on %s" % (
778             target, connection.target))
779         if connection.context:
780             connection.context.handle_kick(target)
781     def _handle_every_raw_message(self, _connection, event):
782         "Log all messages when in watcher mode."
783         if self.logfile:
784             with open(self.logfile, "a") as logfp:
785                 logfp.write("%03f|%s|%s\n" % \
786                              (time.time(), event.source, event.arguments[0]))
787     def pending(self):
788         "Do we have any pending message traffic?"
789         return [k for (k, v) in self.servers.items() if v.pending()]
790
791     def _parse_request(self, line):
792         "Request-parsing helper for the handle() method"
793         request = json.loads(line.strip())
794         if not isinstance(request, dict):
795             raise InvalidRequest(
796                 "request is not a JSON dictionary: %r" % request)
797         if "to" not in request or "privmsg" not in request:
798             raise InvalidRequest(
799                 "malformed request - 'to' or 'privmsg' missing: %r" % request)
800         channels = request['to']
801         message = request['privmsg']
802         if not isinstance(channels, (list, basestring)):
803             raise InvalidRequest(
804                 "malformed request - unexpected channel type: %r" % channels)
805         if not isinstance(message, basestring):
806             raise InvalidRequest(
807                 "malformed request - unexpected message type: %r" % message)
808         if not isinstance(channels, list):
809             channels = [channels]
810         targets = []
811         for url in channels:
812             try:
813                 if not isinstance(url, basestring):
814                     raise InvalidRequest(
815                         "malformed request - URL has unexpected type: %r" %
816                         url)
817                 target = Target(url)
818                 target.validate()
819             except InvalidRequest, e:
820                 LOG.error(str(e))
821             else:
822                 targets.append(target)
823         return (targets, message)
824
825     def handle(self, line, quit_after=False):
826         "Perform a JSON relay request."
827         try:
828             targets, message = self._parse_request(line=line)
829             for target in targets:
830                 if target.server() not in self.servers:
831                     self.servers[target.server()] = Dispatcher(
832                         self, target=target, **self.kwargs)
833                 self.servers[target.server()].dispatch(
834                     target.channel, message, target.key, quit_after=quit_after)
835                 # GC dispatchers with no active connections
836                 servernames = self.servers.keys()
837                 for servername in servernames:
838                     if not self.servers[servername].live():
839                         del self.servers[servername]
840                     # If we might be pushing a resource limit even
841                     # after garbage collection, remove a session.  The
842                     # goal here is to head off DoS attacks that aim at
843                     # exhausting thread space or file descriptors.
844                     # The cost is that attempts to DoS this service
845                     # will cause lots of join/leave spam as we
846                     # scavenge old channels after connecting to new
847                     # ones. The particular method used for selecting a
848                     # session to be terminated doesn't matter much; we
849                     # choose the one longest idle on the assumption
850                     # that message activity is likely to be clumpy.
851                     if len(self.servers) >= CONNECTION_MAX:
852                         oldest = min(
853                             self.servers.keys(),
854                             key=lambda name: self.servers[name].last_xmit())
855                         del self.servers[oldest]
856         except InvalidRequest, e:
857             LOG.error(str(e))
858         except ValueError:
859             self.logerr("can't recognize JSON on input: %r" % line)
860         except RuntimeError:
861             self.logerr("wildly malformed JSON blew the parser stack.")
862
863 class IrkerTCPHandler(SocketServer.StreamRequestHandler):
864     def handle(self):
865         while True:
866             line = self.rfile.readline()
867             if not line:
868                 break
869             irker.handle(line.strip())
870
871 class IrkerUDPHandler(SocketServer.BaseRequestHandler):
872     def handle(self):
873         data = self.request[0].strip()
874         #socket = self.request[1]
875         irker.handle(data)
876
877
878 if __name__ == '__main__':
879     parser = argparse.ArgumentParser(
880         description=__doc__.strip().splitlines()[0])
881     parser.add_argument(
882         '-d', '--log-level', metavar='LEVEL', choices=LOG_LEVELS,
883         help='file of trusted certificates for SSL/TLS')
884     parser.add_argument(
885         '-l', '--log-file', metavar='PATH',
886         help='file for saving captured message traffic')
887     parser.add_argument(
888         '-n', '--nick', metavar='NAME', default='irker%03d',
889         help="nickname (optionally with a '%%.*d' server connection marker)")
890     parser.add_argument(
891         '-p', '--password', metavar='PASSWORD',
892         help='NickServ password')
893     parser.add_argument(
894         '-i', '--immediate', action='store_const', const=True,
895         help='disconnect after sending each message')
896     parser.add_argument(
897         '-V', '--version', action='version',
898         version='%(prog)s {0}'.format(version))
899     args = parser.parse_args()
900
901     handler = logging.StreamHandler()
902     LOG.addHandler(handler)
903     if args.log_level:
904         log_level = getattr(logging, args.log_level.upper())
905         LOG.setLevel(log_level)
906
907     irker = Irker(
908         logfile=args.log_file,
909         nick_template=args.nick,
910         nick_needs_number=re.search('%.*d', args.nick),
911         password=args.password,
912         )
913     LOG.info("irkerd version %s" % version)
914     if args.immediate:
915         irker.irc.add_event_handler("quit", lambda _c, _e: sys.exit(0))
916         irker.handle('{"to":"%s","privmsg":"%s"}' % (immediate, arguments[0]), quit_after=True)
917         irker.irc.spin()
918     else:
919         irker.thread_launch()
920         try:
921             tcpserver = SocketServer.TCPServer((HOST, PORT), IrkerTCPHandler)
922             udpserver = SocketServer.UDPServer((HOST, PORT), IrkerUDPHandler)
923             for server in [tcpserver, udpserver]:
924                 server = threading.Thread(target=server.serve_forever)
925                 server.setDaemon(True)
926                 server.start()
927             try:
928                 signal.pause()
929             except KeyboardInterrupt:
930                 raise SystemExit(1)
931         except socket.error, e:
932             LOG.error("server launch failed: %r\n" % e)
933
934 # end