irkerd: Add Python-3-compatible string handling
[irker.git] / irkerd
1 #!/usr/bin/env python
2 """
3 irkerd - a simple IRC multiplexer daemon
4
5 Listens for JSON objects of the form {'to':<irc-url>, 'privmsg':<text>}
6 and relays messages to IRC channels. Each request must be followed by
7 a newline.
8
9 The <text> must be a string.  The value of the 'to' attribute can be a
10 string containing an IRC URL (e.g. 'irc://chat.freenet.net/botwar') or
11 a list of such strings; in the latter case the message is broadcast to
12 all listed channels.  Note that the channel portion of the URL need
13 *not* have a leading '#' unless the channel name itself does.
14
15 Design and code by Eric S. Raymond <esr@thyrsus.com>. See the project
16 resource page at <http://www.catb.org/~esr/irker/>.
17
18 Requires Python 2.7, or:
19 * 2.6 with the argparse package installed.
20 """
21
22 from __future__ import unicode_literals
23 from __future__ import with_statement
24
25 # These things might need tuning
26
27 HOST = "localhost"
28 PORT = 6659
29
30 XMIT_TTL = (3 * 60 * 60)        # Time to live, seconds from last transmit
31 PING_TTL = (15 * 60)            # Time to live, seconds from last PING
32 HANDSHAKE_TTL = 60              # Time to live, seconds from nick transmit
33 CHANNEL_TTL = (3 * 60 * 60)     # Time to live, seconds from last transmit
34 DISCONNECT_TTL = (24 * 60 * 60) # Time to live, seconds from last connect
35 UNSEEN_TTL = 60                 # Time to live, seconds since first request
36 CHANNEL_MAX = 18                # Max channels open per socket (default)
37 ANTI_FLOOD_DELAY = 1.0          # Anti-flood delay after transmissions, seconds
38 ANTI_BUZZ_DELAY = 0.09          # Anti-buzz delay after queue-empty check
39 CONNECTION_MAX = 200            # To avoid hitting a thread limit
40
41 # No user-serviceable parts below this line
42
43 version = "2.6"
44
45 import argparse
46 import logging
47 import json
48 try:  # Python 3
49     import queue
50 except ImportError:  # Python 2
51     import Queue as queue
52 import random
53 import re
54 import select
55 import signal
56 import socket
57 try:  # Python 3
58     import socketserver
59 except ImportError:  # Python 2
60     import SocketServer as socketserver
61 import sys
62 import threading
63 import time
64 try:  # Python 3
65     import urllib.parse as urllib_parse
66 except ImportError:  # Python 2
67     import urlparse as urllib_parse
68
69
70 LOG = logging.getLogger(__name__)
71 LOG.setLevel(logging.ERROR)
72 LOG_LEVELS = ['critical', 'error', 'warning', 'info', 'debug']
73
74 try:  # Python 2
75     UNICODE_TYPE = unicode
76 except NameError:  # Python 3
77     UNICODE_TYPE = str
78
79
80 # Sketch of implementation:
81 #
82 # One Irker object manages multiple IRC sessions.  It holds a map of
83 # Dispatcher objects, one per (server, port) combination, which are
84 # responsible for routing messages to one of any number of Connection
85 # objects that do the actual socket conversations.  The reason for the
86 # Dispatcher layer is that IRC daemons limit the number of channels a
87 # client (that is, from the daemon's point of view, a socket) can be
88 # joined to, so each session to a server needs a flock of Connection
89 # instances each with its own socket.
90 #
91 # Connections are timed out and removed when either they haven't seen a
92 # PING for a while (indicating that the server may be stalled or down)
93 # or there has been no message traffic to them for a while, or
94 # even if the queue is nonempty but efforts to connect have failed for
95 # a long time.
96 #
97 # There are multiple threads. One accepts incoming traffic from all
98 # servers.  Each Connection also has a consumer thread and a
99 # thread-safe message queue.  The program main appends messages to
100 # queues as JSON requests are received; the consumer threads try to
101 # ship them to servers.  When a socket write stalls, it only blocks an
102 # individual consumer thread; if it stalls long enough, the session
103 # will be timed out. This solves the biggest problem with a
104 # single-threaded implementation, which is that you can't count on a
105 # single stalled write not hanging all other traffic - you're at the
106 # mercy of the length of the buffers in the TCP/IP layer.
107 #
108 # Message delivery is thus not reliable in the face of network stalls,
109 # but this was considered acceptable because IRC (notoriously) has the
110 # same problem - there is little point in reliable delivery to a relay
111 # that is down or unreliable.
112 #
113 # This code uses only NICK, JOIN, PART, MODE, PRIVMSG, USER, and QUIT. 
114 # It is strictly compliant to RFC1459, except for the interpretation and
115 # use of the DEAF and CHANLIMIT and (obsolete) MAXCHANNELS features.
116 #
117 # CHANLIMIT is as described in the Internet RFC draft
118 # draft-brocklesby-irc-isupport-03 at <http://www.mirc.com/isupport.html>.
119 # The ",isnick" feature is as described in
120 # <http://ftp.ics.uci.edu/pub/ietf/uri/draft-mirashi-url-irc-01.txt>.
121
122 # Historical note: the IRCClient and IRCServerConnection classes
123 # (~270LOC) replace the overweight, overcomplicated 3KLOC mass of
124 # irclib code that irker formerly used as a service library.  They
125 # still look similar to parts of irclib because I contributed to that
126 # code before giving up on it.
127
128 class IRCError(Exception):
129     "An IRC exception"
130     pass
131
132
133 class InvalidRequest (ValueError):
134     "An invalid JSON request"
135     pass
136
137
138 class IRCClient():
139     "An IRC client session to one or more servers."
140     def __init__(self):
141         self.mutex = threading.RLock()
142         self.server_connections = []
143         self.event_handlers = {}
144         self.add_event_handler("ping",
145                                lambda c, e: c.ship("PONG %s" % e.target))
146
147     def newserver(self):
148         "Initialize a new server-connection object."
149         conn = IRCServerConnection(self)
150         with self.mutex:
151             self.server_connections.append(conn)
152         return conn
153
154     def spin(self, timeout=0.2):
155         "Spin processing data from connections forever."
156         # Outer loop should specifically *not* be mutex-locked.
157         # Otherwise no other thread would ever be able to change
158         # the shared state of an IRC object running this function.
159         while True:
160             nextsleep = 0
161             with self.mutex:
162                 connected = [x for x in self.server_connections
163                              if x is not None and x.socket is not None]
164                 sockets = [x.socket for x in connected]
165                 if sockets:
166                     connmap = dict([(c.socket.fileno(), c) for c in connected])
167                     (insocks, _o, _e) = select.select(sockets, [], [], timeout)
168                     for s in insocks:
169                         connmap[s.fileno()].consume()
170                 else:
171                     nextsleep = timeout
172             time.sleep(nextsleep)
173
174     def add_event_handler(self, event, handler):
175         "Set a handler to be called later."
176         with self.mutex:
177             event_handlers = self.event_handlers.setdefault(event, [])
178             event_handlers.append(handler)
179
180     def handle_event(self, connection, event):
181         with self.mutex:
182             h = self.event_handlers
183             th = sorted(h.get("all_events", []) + h.get(event.type, []))
184             for handler in th:
185                 handler(connection, event)
186
187     def drop_connection(self, connection):
188         with self.mutex:
189             self.server_connections.remove(connection)
190
191
192 class LineBufferedStream():
193     "Line-buffer a read stream."
194     _crlf_re = re.compile(b'\r?\n')
195
196     def __init__(self):
197         self.buffer = b''
198
199     def append(self, newbytes):
200         self.buffer += newbytes
201
202     def lines(self):
203         "Iterate over lines in the buffer."
204         lines = self._crlf_re.split(self.buffer)
205         self.buffer = lines.pop()
206         return iter(lines)
207
208     def __iter__(self):
209         return self.lines()
210
211 class IRCServerConnectionError(IRCError):
212     pass
213
214 class IRCServerConnection():
215     command_re = re.compile("^(:(?P<prefix>[^ ]+) +)?(?P<command>[^ ]+)( *(?P<argument> .+))?")
216     # The full list of numeric-to-event mappings is in Perl's Net::IRC.
217     # We only need to ensure that if some ancient server throws numerics
218     # for the ones we actually want to catch, they're mapped.
219     codemap = {
220         "001": "welcome",
221         "005": "featurelist",
222         "432": "erroneusnickname",
223         "433": "nicknameinuse",
224         "436": "nickcollision",
225         "437": "unavailresource",
226     }
227
228     def __init__(self, master):
229         self.master = master
230         self.socket = None
231
232     def connect(self, target, nickname,
233                 password=None, username=None, ircname=None):
234         LOG.debug("connect(server=%r, port=%r, nickname=%r, ...)" % (
235             target.servername, target.port, nickname))
236         if self.socket is not None:
237             self.disconnect("Changing servers")
238
239         self.buffer = LineBufferedStream()
240         self.event_handlers = {}
241         self.real_server_name = ""
242         self.target = target
243         self.nickname = nickname
244         try:
245             self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
246             self.socket.bind(('', 0))
247             self.socket.connect((target.servername, target.port))
248         except socket.error as err:
249             raise IRCServerConnectionError("Couldn't connect to socket: %s" % err)
250
251         if password:
252             self.ship("PASS " + password)
253         self.nick(self.nickname)
254         self.user(username=username or ircname, realname=ircname or nickname)
255         return self
256
257     def close(self):
258         # Without this thread lock, there is a window during which
259         # select() can find a closed socket, leading to an EBADF error.
260         with self.master.mutex:
261             self.disconnect("Closing object")
262             self.master.drop_connection(self)
263
264     def consume(self):
265         try:
266             incoming = self.socket.recv(16384)
267         except socket.error:
268             # Server hung up on us.
269             self.disconnect("Connection reset by peer")
270             return
271         if not incoming:
272             # Dead air also indicates a connection reset.
273             self.disconnect("Connection reset by peer")
274             return
275
276         self.buffer.append(incoming)
277
278         for line in self.buffer:
279             if not isinstance(line, UNICODE_TYPE):
280                 line = UNICODE_TYPE(line, 'utf-8')
281             LOG.debug("FROM: %s" % line)
282
283             if not line:
284                 continue
285
286             prefix = None
287             command = None
288             arguments = None
289             self.handle_event(Event("every_raw_message",
290                                      self.real_server_name,
291                                      None,
292                                      [line]))
293
294             m = IRCServerConnection.command_re.match(line)
295             if m.group("prefix"):
296                 prefix = m.group("prefix")
297                 if not self.real_server_name:
298                     self.real_server_name = prefix
299             if m.group("command"):
300                 command = m.group("command").lower()
301             if m.group("argument"):
302                 a = m.group("argument").split(" :", 1)
303                 arguments = a[0].split()
304                 if len(a) == 2:
305                     arguments.append(a[1])
306
307             command = IRCServerConnection.codemap.get(command, command)
308             if command in ["privmsg", "notice"]:
309                 target = arguments.pop(0)
310             else:
311                 target = None
312
313                 if command == "quit":
314                     arguments = [arguments[0]]
315                 elif command == "ping":
316                     target = arguments[0]
317                 else:
318                     target = arguments[0]
319                     arguments = arguments[1:]
320
321             LOG.debug("command: %s, source: %s, target: %s, arguments: %s" % (
322                 command, prefix, target, arguments))
323             self.handle_event(Event(command, prefix, target, arguments))
324
325     def handle_event(self, event):
326         self.master.handle_event(self, event)
327         if event.type in self.event_handlers:
328             for fn in self.event_handlers[event.type]:
329                 fn(self, event)
330
331     def is_connected(self):
332         return self.socket is not None
333
334     def disconnect(self, message=""):
335         if self.socket is None:
336             return
337         # Don't send a QUIT here - causes infinite loop!
338         try:
339             self.socket.shutdown(socket.SHUT_WR)
340             self.socket.close()
341         except socket.error:
342             pass
343         del self.socket
344         self.socket = None
345         self.handle_event(
346             Event("disconnect", self.target.server, "", [message]))
347
348     def join(self, channel, key=""):
349         self.ship("JOIN %s%s" % (channel, (key and (" " + key))))
350
351     def mode(self, target, command):
352         self.ship("MODE %s %s" % (target, command))
353
354     def nick(self, newnick):
355         self.ship("NICK " + newnick)
356
357     def part(self, channel, message=""):
358         cmd_parts = ['PART', channel]
359         if message:
360             cmd_parts.append(message)
361         self.ship(' '.join(cmd_parts))
362
363     def privmsg(self, target, text):
364         self.ship("PRIVMSG %s :%s" % (target, text))
365
366     def quit(self, message=""):
367         self.ship("QUIT" + (message and (" :" + message)))
368
369     def user(self, username, realname):
370         self.ship("USER %s 0 * :%s" % (username, realname))
371
372     def ship(self, string):
373         "Ship a command to the server, appending CR/LF"
374         try:
375             self.socket.send(string.encode('utf-8') + b'\r\n')
376             LOG.debug("TO: %s" % string)
377         except socket.error:
378             self.disconnect("Connection reset by peer.")
379
380 class Event(object):
381     def __init__(self, evtype, source, target, arguments=None):
382         self.type = evtype
383         self.source = source
384         self.target = target
385         if arguments is None:
386             arguments = []
387         self.arguments = arguments
388
389 def is_channel(string):
390     return string and string[0] in "#&+!"
391
392 class Connection:
393     def __init__(self, irker, target, nick_template, nick_needs_number=False,
394                  password=None, **kwargs):
395         self.irker = irker
396         self.target = target
397         self.nick_template = nick_template
398         self.nick_needs_number = nick_needs_number
399         self.password = password
400         self.kwargs = kwargs
401         self.nick_trial = None
402         self.connection = None
403         self.status = None
404         self.last_xmit = time.time()
405         self.last_ping = time.time()
406         self.channels_joined = {}
407         self.channel_limits = {}
408         # The consumer thread
409         self.queue = queue.Queue()
410         self.thread = None
411     def nickname(self, n=None):
412         "Return a name for the nth server connection."
413         if n is None:
414             n = self.nick_trial
415         if self.nick_needs_number:
416             return (self.nick_template % n)
417         else:
418             return self.nick_template
419     def handle_ping(self):
420         "Register the fact that the server has pinged this connection."
421         self.last_ping = time.time()
422     def handle_welcome(self):
423         "The server says we're OK, with a non-conflicting nick."
424         self.status = "ready"
425         LOG.info("nick %s accepted" % self.nickname())
426         if self.password:
427             self.connection.privmsg("nickserv", "identify %s" % self.password)
428     def handle_badnick(self):
429         "The server says our nick is ill-formed or has a conflict."
430         LOG.info("nick %s rejected" % self.nickname())
431         if self.nick_needs_number:
432             # Randomness prevents a malicious user or bot from
433             # anticipating the next trial name in order to block us
434             # from completing the handshake.
435             self.nick_trial += random.randint(1, 3)
436             self.last_xmit = time.time()
437             self.connection.nick(self.nickname())
438         # Otherwise fall through, it might be possible to
439         # recover manually.
440     def handle_disconnect(self):
441         "Server disconnected us for flooding or some other reason."
442         self.connection = None
443         if self.status != "expired":
444             self.status = "disconnected"
445     def handle_kick(self, outof):
446         "We've been kicked."
447         self.status = "handshaking"
448         try:
449             del self.channels_joined[outof]
450         except KeyError:
451             LOG.error("kicked by %s from %s that's not joined" % (
452                 self.target, outof))
453         qcopy = []
454         while not self.queue.empty():
455             (channel, message, key) = self.queue.get()
456             if channel != outof:
457                 qcopy.append((channel, message, key))
458         for (channel, message, key) in qcopy:
459             self.queue.put((channel, message, key))
460         self.status = "ready"
461     def enqueue(self, channel, message, key, quit_after=False):
462         "Enque a message for transmission."
463         if self.thread is None or not self.thread.is_alive():
464             self.status = "unseen"
465             self.thread = threading.Thread(target=self.dequeue)
466             self.thread.setDaemon(True)
467             self.thread.start()
468         self.queue.put((channel, message, key))
469         if quit_after:
470             self.queue.put((channel, None, key))
471     def dequeue(self):
472         "Try to ship pending messages from the queue."
473         try:
474             while True:
475                 # We want to be kind to the IRC servers and not hold unused
476                 # sockets open forever, so they have a time-to-live.  The
477                 # loop is coded this particular way so that we can drop
478                 # the actual server connection when its time-to-live
479                 # expires, then reconnect and resume transmission if the
480                 # queue fills up again.
481                 if self.queue.empty():
482                     # Queue is empty, at some point we want to time out
483                     # the connection rather than holding a socket open in
484                     # the server forever.
485                     now = time.time()
486                     xmit_timeout = now > self.last_xmit + XMIT_TTL
487                     ping_timeout = now > self.last_ping + PING_TTL
488                     if self.status == "disconnected":
489                         # If the queue is empty, we can drop this connection.
490                         self.status = "expired"
491                         break
492                     elif xmit_timeout or ping_timeout:
493                         LOG.info((
494                             "timing out connection to %s at %s "
495                             "(ping_timeout=%s, xmit_timeout=%s)") % (
496                             self.target, time.asctime(), ping_timeout,
497                             xmit_timeout))
498                         with self.irker.irc.mutex:
499                             self.connection.context = None
500                             self.connection.quit("transmission timeout")
501                             self.connection = None
502                         self.status = "disconnected"
503                     else:
504                         # Prevent this thread from hogging the CPU by pausing
505                         # for just a little bit after the queue-empty check.
506                         # As long as this is less that the duration of a human
507                         # reflex arc it is highly unlikely any human will ever
508                         # notice.
509                         time.sleep(ANTI_BUZZ_DELAY)
510                 elif self.status == "disconnected" \
511                          and time.time() > self.last_xmit + DISCONNECT_TTL:
512                     # Queue is nonempty, but the IRC server might be
513                     # down. Letting failed connections retain queue
514                     # space forever would be a memory leak.
515                     self.status = "expired"
516                     break
517                 elif not self.connection and self.status != "expired":
518                     # Queue is nonempty but server isn't connected.
519                     with self.irker.irc.mutex:
520                         self.connection = self.irker.irc.newserver()
521                         self.connection.context = self
522                         # Try to avoid colliding with other instances
523                         self.nick_trial = random.randint(1, 990)
524                         self.channels_joined = {}
525                         try:
526                             # This will throw
527                             # IRCServerConnectionError on failure
528                             self.connection.connect(
529                                 target=self.target,
530                                 nickname=self.nickname(),
531                                 username="irker",
532                                 ircname="irker relaying client",
533                                 **self.kwargs)
534                             self.status = "handshaking"
535                             LOG.info("XMIT_TTL bump (%s connection) at %s" % (
536                                 self.target, time.asctime()))
537                             self.last_xmit = time.time()
538                             self.last_ping = time.time()
539                         except IRCServerConnectionError:
540                             self.status = "expired"
541                             break
542                 elif self.status == "handshaking":
543                     if time.time() > self.last_xmit + HANDSHAKE_TTL:
544                         self.status = "expired"
545                         break
546                     else:
547                         # Don't buzz on the empty-queue test while we're
548                         # handshaking
549                         time.sleep(ANTI_BUZZ_DELAY)
550                 elif self.status == "unseen" \
551                          and time.time() > self.last_xmit + UNSEEN_TTL:
552                     # Nasty people could attempt a denial-of-service
553                     # attack by flooding us with requests with invalid
554                     # servernames. We guard against this by rapidly
555                     # expiring connections that have a nonempty queue but
556                     # have never had a successful open.
557                     self.status = "expired"
558                     break
559                 elif self.status == "ready":
560                     (channel, message, key) = self.queue.get()
561                     if channel not in self.channels_joined:
562                         self.connection.join(channel, key=key)
563                         LOG.info("joining %s on %s." % (channel, self.target))
564                     # None is magic - it's a request to quit the server
565                     if message is None:
566                         self.connection.quit()
567                     # An empty message might be used as a keepalive or
568                     # to join a channel for logging, so suppress the
569                     # privmsg send unless there is actual traffic.
570                     elif message:
571                         for segment in message.split("\n"):
572                             # Truncate the message if it's too long,
573                             # but we're working with characters here,
574                             # not bytes, so we could be off.
575                             # 500 = 512 - CRLF - 'PRIVMSG ' - ' :'
576                             maxlength = 500 - len(channel)
577                             if len(segment) > maxlength:
578                                 segment = segment[:maxlength]
579                             try:
580                                 self.connection.privmsg(channel, segment)
581                             except ValueError as err:
582                                 LOG.warning((
583                                     "irclib rejected a message to %s on %s "
584                                     "because: %s") % (
585                                     channel, self.target, UNICODE_TYPE(err)))
586                                 LOG.debug(err.format_exc())
587                             time.sleep(ANTI_FLOOD_DELAY)
588                     self.last_xmit = self.channels_joined[channel] = time.time()
589                     LOG.info("XMIT_TTL bump (%s transmission) at %s" % (
590                         self.target, time.asctime()))
591                     self.queue.task_done()
592                 elif self.status == "expired":
593                     LOG.error(
594                         "We're expired but still running! This is a bug.")
595                     break
596         except Exception as e:
597             LOG.error("exception %s in thread for %s" % (e, self.target))
598             # Maybe this should have its own status?
599             self.status = "expired"
600             LOG.debug(e.format_exc())
601         finally:
602             try:
603                 # Make sure we don't leave any zombies behind
604                 self.connection.close()
605             except:
606                 # Irclib has a habit of throwing fresh exceptions here. Ignore that
607                 pass
608     def live(self):
609         "Should this connection not be scavenged?"
610         return self.status != "expired"
611     def joined_to(self, channel):
612         "Is this connection joined to the specified channel?"
613         return channel in self.channels_joined
614     def accepting(self, channel):
615         "Can this connection accept a join of this channel?"
616         if self.channel_limits:
617             match_count = 0
618             for already in self.channels_joined:
619                 # This obscure code is because the RFCs allow separate limits
620                 # by channel type (indicated by the first character of the name)
621                 # a feature that is almost never actually used.
622                 if already[0] == channel[0]:
623                     match_count += 1
624             return match_count < self.channel_limits.get(channel[0], CHANNEL_MAX)
625         else:
626             return len(self.channels_joined) < CHANNEL_MAX
627
628 class Target():
629     "Represent a transmission target."
630     def __init__(self, url):
631         self.url = url
632         parsed = urllib_parse.urlparse(url)
633         irchost, _, ircport = parsed.netloc.partition(':')
634         if not ircport:
635             ircport = 6667
636         self.servername = irchost
637         # IRC channel names are case-insensitive.  If we don't smash
638         # case here we may run into problems later. There was a bug
639         # observed on irc.rizon.net where an irkerd user specified #Channel,
640         # got kicked, and irkerd crashed because the server returned
641         # "#channel" in the notification that our kick handler saw.
642         self.channel = parsed.path.lstrip('/').lower()
643         # This deals with a tweak in recent versions of urlparse.
644         if parsed.fragment:
645             self.channel += "#" + parsed.fragment
646         isnick = self.channel.endswith(",isnick")
647         if isnick:
648             self.channel = self.channel[:-7]
649         if self.channel and not isnick and self.channel[0] not in "#&+":
650             self.channel = "#" + self.channel
651         # support both channel?secret and channel?key=secret
652         self.key = ""
653         if parsed.query:
654             self.key = re.sub("^key=", "", parsed.query)
655         self.port = int(ircport)
656
657     def __str__(self):
658         "Represent this instance as a string"
659         return self.servername or self.url or repr(self)
660
661     def validate(self):
662         "Raise InvalidRequest if the URL is missing a critical component"
663         if not self.servername:
664             raise InvalidRequest(
665                 'target URL missing a servername: %r' % self.url)
666         if not self.channel:
667             raise InvalidRequest(
668                 'target URL missing a channel: %r' % self.url)
669     def server(self):
670         "Return a hashable tuple representing the destination server."
671         return (self.servername, self.port)
672
673 class Dispatcher:
674     "Manage connections to a particular server-port combination."
675     def __init__(self, irker, **kwargs):
676         self.irker = irker
677         self.kwargs = kwargs
678         self.connections = []
679     def dispatch(self, channel, message, key, quit_after=False):
680         "Dispatch messages for our server-port combination."
681         # First, check if there is room for another channel
682         # on any of our existing connections.
683         connections = [x for x in self.connections if x.live()]
684         eligibles = [x for x in connections if x.joined_to(channel)] \
685                     or [x for x in connections if x.accepting(channel)]
686         if eligibles:
687             eligibles[0].enqueue(channel, message, key, quit_after)
688             return
689         # All connections are full up. Look for one old enough to be
690         # scavenged.
691         ancients = []
692         for connection in connections:
693             for (chan, age) in connections.channels_joined.items():
694                 if age < time.time() - CHANNEL_TTL:
695                     ancients.append((connection, chan, age))
696         if ancients:
697             ancients.sort(key=lambda x: x[2]) 
698             (found_connection, drop_channel, _drop_age) = ancients[0]
699             found_connection.part(drop_channel, "scavenged by irkerd")
700             del found_connection.channels_joined[drop_channel]
701             #time.sleep(ANTI_FLOOD_DELAY)
702             found_connection.enqueue(channel, message, key, quit_after)
703             return
704         # All existing channels had recent activity
705         newconn = Connection(self.irker, **self.kwargs)
706         self.connections.append(newconn)
707         newconn.enqueue(channel, message, key, quit_after)
708     def live(self):
709         "Does this server-port combination have any live connections?"
710         self.connections = [x for x in self.connections if x.live()]
711         return len(self.connections) > 0
712     def pending(self):
713         "Return all connections with pending traffic."
714         return [x for x in self.connections if not x.queue.empty()]
715     def last_xmit(self):
716         "Return the time of the most recent transmission."
717         return max(x.last_xmit for x in self.connections)
718
719 class Irker:
720     "Persistent IRC multiplexer."
721     def __init__(self, logfile=None, **kwargs):
722         self.logfile = logfile
723         self.kwargs = kwargs
724         self.irc = IRCClient()
725         self.irc.add_event_handler("ping", self._handle_ping)
726         self.irc.add_event_handler("welcome", self._handle_welcome)
727         self.irc.add_event_handler("erroneusnickname", self._handle_badnick)
728         self.irc.add_event_handler("nicknameinuse", self._handle_badnick)
729         self.irc.add_event_handler("nickcollision", self._handle_badnick)
730         self.irc.add_event_handler("unavailresource", self._handle_badnick)
731         self.irc.add_event_handler("featurelist", self._handle_features)
732         self.irc.add_event_handler("disconnect", self._handle_disconnect)
733         self.irc.add_event_handler("kick", self._handle_kick)
734         self.irc.add_event_handler("every_raw_message", self._handle_every_raw_message)
735         self.servers = {}
736     def thread_launch(self):
737         thread = threading.Thread(target=self.irc.spin)
738         thread.setDaemon(True)
739         self.irc._thread = thread
740         thread.start()
741     def _handle_ping(self, connection, _event):
742         "PING arrived, bump the last-received time for the connection."
743         if connection.context:
744             connection.context.handle_ping()
745     def _handle_welcome(self, connection, _event):
746         "Welcome arrived, nick accepted for this connection."
747         if connection.context:
748             connection.context.handle_welcome()
749     def _handle_badnick(self, connection, _event):
750         "Nick not accepted for this connection."
751         if connection.context:
752             connection.context.handle_badnick()
753     def _handle_features(self, connection, event):
754         "Determine if and how we can set deaf mode."
755         if connection.context:
756             cxt = connection.context
757             arguments = event.arguments
758             for lump in arguments:
759                 if lump.startswith("DEAF="):
760                     if not self.logfile:
761                         connection.mode(cxt.nickname(), "+"+lump[5:])
762                 elif lump.startswith("MAXCHANNELS="):
763                     m = int(lump[12:])
764                     for pref in "#&+":
765                         cxt.channel_limits[pref] = m
766                     LOG.info("%s maxchannels is %d" % (connection.server, m))
767                 elif lump.startswith("CHANLIMIT=#:"):
768                     limits = lump[10:].split(",")
769                     try:
770                         for token in limits:
771                             (prefixes, limit) = token.split(":")
772                             limit = int(limit)
773                             for c in prefixes:
774                                 cxt.channel_limits[c] = limit
775                         LOG.info("%s channel limit map is %s" % (
776                             connection.target, cxt.channel_limits))
777                     except ValueError:
778                         LOG.error("ill-formed CHANLIMIT property")
779     def _handle_disconnect(self, connection, _event):
780         "Server hung up the connection."
781         LOG.info("server %s disconnected" % connection.target)
782         connection.close()
783         if connection.context:
784             connection.context.handle_disconnect()
785     def _handle_kick(self, connection, event):
786         "Server hung up the connection."
787         target = event.target
788         LOG.info("irker has been kicked from %s on %s" % (
789             target, connection.target))
790         if connection.context:
791             connection.context.handle_kick(target)
792     def _handle_every_raw_message(self, _connection, event):
793         "Log all messages when in watcher mode."
794         if self.logfile:
795             with open(self.logfile, "a") as logfp:
796                 logfp.write("%03f|%s|%s\n" % \
797                              (time.time(), event.source, event.arguments[0]))
798     def pending(self):
799         "Do we have any pending message traffic?"
800         return [k for (k, v) in self.servers.items() if v.pending()]
801
802     def _parse_request(self, line):
803         "Request-parsing helper for the handle() method"
804         request = json.loads(line.strip())
805         if not isinstance(request, dict):
806             raise InvalidRequest(
807                 "request is not a JSON dictionary: %r" % request)
808         if "to" not in request or "privmsg" not in request:
809             raise InvalidRequest(
810                 "malformed request - 'to' or 'privmsg' missing: %r" % request)
811         channels = request['to']
812         message = request['privmsg']
813         if not isinstance(channels, (list, UNICODE_TYPE)):
814             raise InvalidRequest(
815                 "malformed request - unexpected channel type: %r" % channels)
816         if not isinstance(message, UNICODE_TYPE):
817             raise InvalidRequest(
818                 "malformed request - unexpected message type: %r" % message)
819         if not isinstance(channels, list):
820             channels = [channels]
821         targets = []
822         for url in channels:
823             try:
824                 if not isinstance(url, UNICODE_TYPE):
825                     raise InvalidRequest(
826                         "malformed request - URL has unexpected type: %r" %
827                         url)
828                 target = Target(url)
829                 target.validate()
830             except InvalidRequest as e:
831                 LOG.error(UNICODE_TYPE(e))
832             else:
833                 targets.append(target)
834         return (targets, message)
835
836     def handle(self, line, quit_after=False):
837         "Perform a JSON relay request."
838         try:
839             targets, message = self._parse_request(line=line)
840             for target in targets:
841                 if target.server() not in self.servers:
842                     self.servers[target.server()] = Dispatcher(
843                         self, target=target, **self.kwargs)
844                 self.servers[target.server()].dispatch(
845                     target.channel, message, target.key, quit_after=quit_after)
846                 # GC dispatchers with no active connections
847                 servernames = self.servers.keys()
848                 for servername in servernames:
849                     if not self.servers[servername].live():
850                         del self.servers[servername]
851                     # If we might be pushing a resource limit even
852                     # after garbage collection, remove a session.  The
853                     # goal here is to head off DoS attacks that aim at
854                     # exhausting thread space or file descriptors.
855                     # The cost is that attempts to DoS this service
856                     # will cause lots of join/leave spam as we
857                     # scavenge old channels after connecting to new
858                     # ones. The particular method used for selecting a
859                     # session to be terminated doesn't matter much; we
860                     # choose the one longest idle on the assumption
861                     # that message activity is likely to be clumpy.
862                     if len(self.servers) >= CONNECTION_MAX:
863                         oldest = min(
864                             self.servers.keys(),
865                             key=lambda name: self.servers[name].last_xmit())
866                         del self.servers[oldest]
867         except InvalidRequest as e:
868             LOG.error(UNICODE_TYPE(e))
869         except ValueError:
870             self.logerr("can't recognize JSON on input: %r" % line)
871         except RuntimeError:
872             self.logerr("wildly malformed JSON blew the parser stack.")
873
874 class IrkerTCPHandler(socketserver.StreamRequestHandler):
875     def handle(self):
876         while True:
877             line = self.rfile.readline()
878             if not line:
879                 break
880             if not isinstance(line, UNICODE_TYPE):
881                 line = UNICODE_TYPE(line, 'utf-8')
882             irker.handle(line=line.strip())
883
884 class IrkerUDPHandler(socketserver.BaseRequestHandler):
885     def handle(self):
886         line = self.request[0].strip()
887         #socket = self.request[1]
888         if not isinstance(line, UNICODE_TYPE):
889             line = UNICODE_TYPE(line, 'utf-8')
890         irker.handle(line=line.strip())
891
892
893 if __name__ == '__main__':
894     parser = argparse.ArgumentParser(
895         description=__doc__.strip().splitlines()[0])
896     parser.add_argument(
897         '-d', '--log-level', metavar='LEVEL', choices=LOG_LEVELS,
898         help='file of trusted certificates for SSL/TLS')
899     parser.add_argument(
900         '-l', '--log-file', metavar='PATH',
901         help='file for saving captured message traffic')
902     parser.add_argument(
903         '-n', '--nick', metavar='NAME', default='irker%03d',
904         help="nickname (optionally with a '%%.*d' server connection marker)")
905     parser.add_argument(
906         '-p', '--password', metavar='PASSWORD',
907         help='NickServ password')
908     parser.add_argument(
909         '-i', '--immediate', action='store_const', const=True,
910         help='disconnect after sending each message')
911     parser.add_argument(
912         '-V', '--version', action='version',
913         version='%(prog)s {0}'.format(version))
914     args = parser.parse_args()
915
916     handler = logging.StreamHandler()
917     LOG.addHandler(handler)
918     if args.log_level:
919         log_level = getattr(logging, args.log_level.upper())
920         LOG.setLevel(log_level)
921
922     irker = Irker(
923         logfile=args.log_file,
924         nick_template=args.nick,
925         nick_needs_number=re.search('%.*d', args.nick),
926         password=args.password,
927         )
928     LOG.info("irkerd version %s" % version)
929     if args.immediate:
930         irker.irc.add_event_handler("quit", lambda _c, _e: sys.exit(0))
931         irker.handle('{"to":"%s","privmsg":"%s"}' % (immediate, arguments[0]), quit_after=True)
932         irker.irc.spin()
933     else:
934         irker.thread_launch()
935         try:
936             tcpserver = socketserver.TCPServer((HOST, PORT), IrkerTCPHandler)
937             udpserver = socketserver.UDPServer((HOST, PORT), IrkerUDPHandler)
938             for server in [tcpserver, udpserver]:
939                 server = threading.Thread(target=server.serve_forever)
940                 server.setDaemon(True)
941                 server.start()
942             try:
943                 signal.pause()
944             except KeyboardInterrupt:
945                 raise SystemExit(1)
946         except socket.error as e:
947             LOG.error("server launch failed: %r\n" % e)
948
949 # end