3 irkerd - a simple IRC multiplexer daemon
5 Listens for JSON objects of the form {'to':<irc-url>, 'privmsg':<text>}
6 and relays messages to IRC channels. Each request must be followed by a newline.
9 The <text> must be a string. The value of the 'to' attribute can be a
10 string containing an IRC URL (e.g. 'irc://chat.freenet.net/botwar') or
11 a list of such strings; in the latter case the message is broadcast to
12 all listed channels. Note that the channel portion of the URL need
13 *not* have a leading '#' unless the channel name itself does.
15 Options: -d sets the debug-message level (probably only of interest to
16 developers). The -V option prints the program version and exits.
18 Design and code by Eric S. Raymond <esr@thyrsus.com>. See the project
19 resource page at <http://www.catb.org/~esr/irker/>.
21 Requires Python 2.6 and the irc client library at version >= 2.0.2: see
23 http://pypi.python.org/pypi/irc/
# These things might need tuning

NAMESTYLE = "irker%03d"         # IRC nick template - must contain '%d'
XMIT_TTL = 3 * 60 * 60          # Time to live, seconds from last transmit
PING_TTL = 15 * 60              # Time to live, seconds from last PING
CHANNEL_TTL = 3 * 60 * 60       # Time to live, seconds from last transmit
DISCONNECT_TTL = 24 * 60 * 60   # Time to live, seconds from last connect
UNSEEN_TTL = 60                 # Time to live, seconds since first request
CHANNEL_MAX = 18                # Max channels open per socket (default)
ANTI_FLOOD_DELAY = 0.5          # Anti-flood delay after transmissions, seconds
ANTI_BUZZ_DELAY = 0.09          # Anti-buzz delay after queue-empty check
40 # No user-serviceable parts below this line
44 # This black magic imports support for green threads (coroutines),
45 # then has kinky sex with the import library internals, replacing
46 # "threading" with a coroutine-using imposter. Threads then become
47 # ultra-light-weight and cooperatively scheduled.
50 eventlet.monkey_patch()
52 # With greenlets we don't worry about thread exhaustion, only the
53 # file descriptor limit (typically 1024 on modern Unixes). Thus we
54 # can handle a lot more concurrent sessions and generate less
55 # join/leave spam under heavy load.
58 # Threads are more expensive if we have to use OS-level ones
59 # rather than greenlets. We need to avoid pushing thread limits
60 # as well as fd limits. See security.txt for discussion.
64 import sys, getopt, urlparse, time, random, socket, signal
65 import threading, Queue, SocketServer
66 import irc.client, logging
68 import simplejson as json # Faster, also makes us Python-2.4-compatible
72 # Sketch of implementation:
74 # One Irker object manages multiple IRC sessions. It holds a map of
75 # Dispatcher objects, one per (server, port) combination, which are
76 # responsible for routing messages to one of any number of Connection
77 # objects that do the actual socket conversations. The reason for the
78 # Dispatcher layer is that IRC daemons limit the number of channels a
79 # client (that is, from the daemon's point of view, a socket) can be
80 # joined to, so each session to a server needs a flock of Connection
81 # instances each with its own socket.
83 # Connections are timed out and removed when either they haven't seen a
84 # PING for a while (indicating that the server may be stalled or down)
85 # or there has been no message traffic to them for a while, or
86 # even if the queue is nonempty but efforts to connect have failed for a long time (DISCONNECT_TTL).
89 # There are multiple threads. One accepts incoming traffic from all servers.
90 # Each Connection also has a consumer thread and a thread-safe message queue.
91 # The program main appends messages to queues as JSON requests are received;
92 # the consumer threads try to ship them to servers. When a socket write
93 # stalls, it only blocks an individual consumer thread; if it stalls long
94 # enough, the session will be timed out.
96 # Message delivery is thus not reliable in the face of network stalls,
97 # but this was considered acceptable because IRC (notoriously) has the
98 # same problem - there is little point in reliable delivery to a relay
99 # that is down or unreliable.
101 # This code uses only NICK, JOIN, MODE, and PRIVMSG. It is strictly
102 # compliant to RFC1459, except for the interpretation and use of the
103 # DEAF and CHANLIMIT and (obsolete) MAXCHANNELS features. CHANLIMIT
104 # is as described in the Internet RFC draft
105 # draft-brocklesby-irc-isupport-03 at <http://www.mirc.com/isupport.html>.
def __init__(self, irkerd, servername, port):
    "Set up bookkeeping for one server session; nothing is opened yet."
    # irkerd is the owning multiplexer: the other methods use it for
    # debug()/logerr() and to obtain IRC server objects.
    self.irker = irkerd
    self.servername = servername
    self.port = port
    self.nick_trial = None          # numeric suffix of the nick being tried
    self.connection = None          # IRC server object once we try to connect
    # NOTE(review): the initial status value is not visible in this
    # excerpt; enqueue() sets "unseen" before the consumer first runs.
    self.status = None
    now = time.time()
    self.last_xmit = now
    self.last_ping = now
    self.channels_joined = {}       # channel name -> time of last transmission
    self.channel_limits = {}        # channel prefix character -> join limit
    # The consumer thread
    self.queue = Queue.Queue()
    self.thread = None              # created lazily by enqueue()
def nickname(self, n=None):
    "Return a name for the nth server connection."
    # Callers in this file invoke nickname() with no argument right
    # after updating self.nick_trial, so that is the default.
    if n is None:
        n = self.nick_trial
    return NAMESTYLE % n
def handle_ping(self):
    "Register the fact that the server has pinged this connection."
    # The consumer loop compares this stamp against PING_TTL.
    self.last_ping = time.time()
def handle_welcome(self):
    "The server says we're OK, with a non-conflicting nick."
    # "ready" is the state in which the consumer loop ships messages.
    self.status = "ready"
    self.irker.debug(1, "nick %s accepted" % self.nickname())
def handle_badnick(self):
    "The server says our nick has a conflict."
    self.irker.debug(1, "nick %s rejected" % self.nickname())
    # Randomness prevents a malicious user or bot from anticipating the
    # next trial name in order to block us from completing the handshake.
    self.nick_trial += random.randint(1, 3)
    self.connection.nick(self.nickname())
def handle_disconnect(self):
    "Server disconnected us for flooding or some other reason."
    self.connection = None
    # Record the state as well: the consumer loop's idle-timeout branch
    # dereferences self.connection unless status is "disconnected", so
    # leaving the old status in place would make it call methods on None.
    # An already-expired connection stays expired so it is still scavenged.
    if self.status != "expired":
        self.status = "disconnected"
def handle_kick(self, outof):
    "We were kicked from channel outof: forget it and drop its queued traffic."
    # While status is "handshaking" the consumer loop only sleeps, which
    # keeps it away from the queue while we rewrite it.
    self.status = "handshaking"
    try:
        del self.channels_joined[outof]
    except KeyError:
        self.irker.logerr("kicked by %s from %s that's not joined"
                          % (self.servername, outof))
    # Drain the queue, then put back what should survive.
    # NOTE(review): the filter condition sits on a line missing from this
    # excerpt; dropping messages addressed to the kicked channel is the
    # reading consistent with the drain/refill around it - confirm.
    qcopy = []
    while not self.queue.empty():
        (channel, message) = self.queue.get()
        if channel != outof:
            qcopy.append((channel, message))
    for (channel, message) in qcopy:
        self.queue.put((channel, message))
    self.status = "ready"
def enqueue(self, channel, message):
    "Enqueue a message for transmission."
    # (Re)start the consumer if it has never run or has exited.
    if self.thread is None or not self.thread.is_alive():
        self.status = "unseen"
        self.thread = threading.Thread(target=self.dequeue)
        # Daemon thread: it must not keep the process alive at exit.
        self.thread.daemon = True
        self.thread.start()
    self.queue.put((channel, message))
# Consumer loop for this connection: enqueue() starts a thread whose
# target is self.dequeue. Each pass inspects the queue and self.status
# and either times the connection out, (re)connects, waits, expires the
# connection, or ships one queued message.
# NOTE(review): several lines of this method are missing from this
# excerpt - the "def" header, the enclosing try/loop, the assignment of
# "now", an "else:", the "try:" before connect(), two connect()
# arguments, whatever follows each 'self.status = "expired"', and the
# "except" header near the end. Confirm against the full file before
# changing any logic here.
169 "Try to ship pending messages from the queue."
172 # We want to be kind to the IRC servers and not hold unused
173 # sockets open forever, so they have a time-to-live. The
174 # loop is coded this particular way so that we can drop
175 # the actual server connection when its time-to-live
176 # expires, then reconnect and resume transmission if the
177 # queue fills up again.
178 if self.queue.empty():
179 # Queue is empty, at some point we want to time out
180 # the connection rather than holding a socket open in
181 # the server forever.
# Idle timeout: fires when the last transmission or the last PING is
# older than its TTL, unless we are already marked disconnected.
183 if (now > self.last_xmit + XMIT_TTL \
184 or now > self.last_ping + PING_TTL) \
185 and self.status != "disconnected":
186 self.irker.debug(1, "timing out inactive connection to %s at %s" % (self.servername, time.asctime()))
# Clearing context first makes the global event handlers (which test
# connection.context) ignore later events from this socket.
# NOTE(review): self.connection is dereferenced unconditionally here,
# while handle_disconnect() sets it to None without touching status;
# that combination looks able to raise AttributeError - confirm.
187 self.connection.context = None
188 self.connection.quit("transmission timeout")
189 self.connection.close()
190 self.connection = None
191 self.status = "disconnected"
193 # Prevent this thread from hogging the CPU by pausing
194 # for just a little bit after the queue-empty check.
195 # As long as this is less that the duration of a human
196 # reflex arc it is highly unlikely any human will ever
198 time.sleep(ANTI_BUZZ_DELAY)
199 elif not self.connection:
200 # Queue is nonempty but server isn't connected.
201 self.connection = self.irker.irc.server()
# Back-pointer used by the event handlers to find this object.
202 self.connection.context = self
203 # Try to avoid colliding with other instances
204 self.nick_trial = random.randint(1, 990)
# A fresh socket has joined nothing yet.
205 self.channels_joined = {}
206 # This will throw irc.client.ServerConnectionError on failure
208 self.connection.connect(self.servername,
210 nickname=self.nickname(),
212 ircname="irker relaying client")
# Wait in "handshaking" until handle_welcome() flips status to "ready".
213 self.status = "handshaking"
214 self.irker.debug(1, "XMIT_TTL bump (%s connection) at %s" % (self.servername, time.asctime()))
# A successful connect counts as activity for the TTL checks.
215 self.last_xmit = time.time()
216 except irc.client.ServerConnectionError:
217 self.status = "disconnected"
218 elif self.status == "handshaking":
219 # Don't buzz on the empty-queue test while we're
221 time.sleep(ANTI_BUZZ_DELAY)
222 elif self.status == "disconnected" \
223 and time.time() > self.last_xmit + DISCONNECT_TTL:
224 # Queue is nonempty, but the IRC server might be
225 # down. Letting failed connections retain queue
226 # space forever would be a memory leak.
# "expired" is the state live() reports as scavengeable.
227 self.status = "expired"
229 elif self.status == "unseen" \
230 and time.time() > self.last_xmit + UNSEEN_TTL:
231 # Nasty people could attempt a denial-of-service
232 # attack by flooding us with requests with invalid
233 # servernames. We guard against this by rapidly
234 # expiring connections that have a nonempty queue but
235 # have never had a successful open.
236 self.status = "expired"
# Connected and welcomed: take one queued message and send it.
238 elif self.status == "ready":
239 (channel, message) = self.queue.get()
# Join lazily, the first time a message targets a channel.
240 if channel not in self.channels_joined:
241 self.connection.join(channel)
242 self.irker.debug(1, "joining %s on %s." % (channel, self.servername))
# One PRIVMSG per line of the message text.
243 for segment in message.split("\n"):
244 self.connection.privmsg(channel, segment)
245 time.sleep(ANTI_FLOOD_DELAY)
# The send time is stamped on the connection and on the channel; the
# per-channel stamp is what dispatch() ages against CHANNEL_TTL.
246 self.last_xmit = self.channels_joined[channel] = time.time()
247 self.irker.debug(1, "XMIT_TTL bump (%s transmission) at %s" % (self.servername, time.asctime()))
248 self.queue.task_done()
# Error path: log the type of whatever exception escaped the loop.
250 (exc_type, _exc_value, _exc_traceback) = sys.exc_info()
251 self.irker.logerr("exception %s in thread for %s" % \
252 (exc_type, self.servername))
def live(self):
    "Should this connection not be scavenged?"
    # The consumer loop marks a connection "expired" once it gives up on it.
    return self.status != "expired"
def joined_to(self, channel):
    "Is this connection joined to the specified channel?"
    return channel in self.channels_joined
def accepting(self, channel):
    "Can this connection accept a join of this channel?"
    if self.channel_limits:
        # The server advertised per-prefix limits: count only the joined
        # channels that share this channel's prefix character.
        prefix = channel[0]
        match_count = sum(1 for already in self.channels_joined
                          if already[0] == prefix)
        return match_count < self.channel_limits.get(prefix, CHANNEL_MAX)
    # No advertised limits: fall back to the built-in default.
    return len(self.channels_joined) < CHANNEL_MAX
271 "Represent a transmission target."
def __init__(self, url):
    "Split an IRC URL into server name, port and normalized channel name."
    parsed = urlparse.urlparse(url)
    irchost, _, ircport = parsed.netloc.partition(':')
    if not ircport:
        # No explicit port in the URL; int("") below would raise.
        # NOTE(review): 6667 (the customary IRC port) is assumed as the
        # default - the original default is on a line missing from this
        # excerpt; confirm.
        ircport = 6667
    self.servername = irchost
    # IRC channel names are case-insensitive. If we don't smash
    # case here we may run into problems later. There was a bug
    # observed on irc.rizon.net where an irkerd user specified #Channel,
    # got kicked, and irkerd crashed because the server returned
    # "#channel" in the notification that our kick handler saw.
    self.channel = parsed.path.lstrip('/').lower()
    # A bare channel name gets the usual '#' prefix.
    if self.channel and self.channel[0] not in "#&+":
        self.channel = "#" + self.channel
    self.port = int(ircport)
def valid(self):
    "Both components must be present for a valid target."
    # Return a real boolean rather than whichever string decided the test.
    return bool(self.servername and self.channel)
def server(self):
    "Return a hashable tuple representing the destination server."
    # This (servername, port) pair is the key of the dispatcher map.
    return (self.servername, self.port)
295 "Manage connections to a particular server-port combination."
def __init__(self, irkerd, servername, port):
    "Remember which server-port pair we serve; no connections exist yet."
    # irkerd, servername and port are passed on to every Connection
    # this dispatcher creates.
    self.irker = irkerd
    self.servername = servername
    self.port = port
    self.connections = []
def dispatch(self, channel, message):
    "Dispatch messages for our server-port combination."
    # First, check if there is room for another channel
    # on any of our existing connections. A connection already
    # joined to the channel always wins over one that merely has room.
    connections = [x for x in self.connections if x.live()]
    eligibles = [x for x in connections if x.joined_to(channel)] \
                or [x for x in connections if x.accepting(channel)]
    if eligibles:
        eligibles[0].enqueue(channel, message)
        return
    # All connections are full up. Look for a joined channel old enough
    # to be scavenged, i.e. idle for longer than CHANNEL_TTL.
    cutoff = time.time() - CHANNEL_TTL
    ancients = []
    for connection in connections:
        for (chan, age) in connection.channels_joined.items():
            if age < cutoff:
                ancients.append((connection, chan, age))
    if ancients:
        # Evict the channel that has been idle longest.
        (found_connection, drop_channel, _drop_age) = \
            min(ancients, key=lambda x: x[2])
        # part() belongs to the underlying IRC connection object, not to
        # our Connection wrapper; that object is None while disconnected.
        if found_connection.connection:
            found_connection.connection.part(drop_channel,
                                             "scavenged by irkerd")
        del found_connection.channels_joined[drop_channel]
        found_connection.enqueue(channel, message)
        return
    # Didn't find any channels with no recent activity
    newconn = Connection(self.irker, self.servername, self.port)
    self.connections.append(newconn)
    newconn.enqueue(channel, message)
def live(self):
    "Does this server-port combination have any live connections?"
    # Side effect: expired connections are pruned from the list.
    self.connections = [x for x in self.connections if x.live()]
    return bool(self.connections)
def last_xmit(self):
    "Return the time of the most recent transmission."
    stamps = [x.last_xmit for x in self.connections]
    if not stamps:
        # max() of an empty list raises; a dispatcher with no connections
        # reports 0 so it sorts as the longest idle.
        return 0
    return max(stamps)
341 "Persistent IRC multiplexer."
def __init__(self, debuglevel=0):
    "Create the IRC client object, register event handlers, start its thread."
    self.debuglevel = debuglevel
    # Map of (servername, port) -> dispatcher, filled in by handle().
    self.servers = {}
    self.irc = irc.client.IRC()
    # Several nick-related error replies share one handler.
    for (event, handler) in (
            ("ping", self._handle_ping),
            ("welcome", self._handle_welcome),
            ("erroneusnickname", self._handle_badnick),
            ("nicknameinuse", self._handle_badnick),
            ("nickcollision", self._handle_badnick),
            ("unavailresource", self._handle_badnick),
            ("featurelist", self._handle_features),
            ("disconnect", self._handle_disconnect),
            ("kick", self._handle_kick)):
        self.irc.add_global_handler(event, handler)
    # One daemon thread pumps events for every IRC socket.
    thread = threading.Thread(target=self.irc.process_forever)
    thread.daemon = True
    self.irc._thread = thread
    # NOTE(review): the start() call is on a line missing from this
    # excerpt; without it no IRC event would ever be processed - confirm.
    thread.start()
def logerr(self, errmsg):
    "Log a processing error."
    # Errors are always reported, whatever the debug level.
    sys.stderr.write("irkerd: " + errmsg + "\n")
def debug(self, level, errmsg):
    "Debugging information."
    # Emit only messages at or below the configured verbosity.
    if self.debuglevel >= level:
        sys.stderr.write("irkerd: %s\n" % errmsg)
def _handle_ping(self, connection, _event):
    "PING arrived, bump the last-received time for the connection."
    # context is cleared when a connection is timed out; ignore those.
    cxt = connection.context
    if cxt:
        cxt.handle_ping()
def _handle_welcome(self, connection, _event):
    "Welcome arrived, nick accepted for this connection."
    # context is cleared when a connection is timed out; ignore those.
    cxt = connection.context
    if cxt:
        cxt.handle_welcome()
def _handle_badnick(self, connection, _event):
    "Nick not accepted for this connection."
    # context is cleared when a connection is timed out; ignore those.
    cxt = connection.context
    if cxt:
        cxt.handle_badnick()
def _handle_features(self, connection, event):
    "Determine if and how we can set deaf mode, and learn channel limits."
    cxt = connection.context
    if not cxt:
        return
    for lump in event.arguments():
        if lump.startswith("DEAF="):
            # The token's value is the user-mode letter for deaf mode.
            connection.mode(cxt.nickname(), "+" + lump[5:])
        elif lump.startswith("MAXCHANNELS="):
            try:
                m = int(lump[12:])
            except ValueError:
                self.logerr("ill-formed MAXCHANNELS property")
                continue
            # One overall limit: record it for every prefix a channel
            # name can start with after URL normalization ("#&+").
            # NOTE(review): the prefix set is on a line missing from
            # this excerpt - confirm.
            for pref in "#&+":
                cxt.channel_limits[pref] = m
            self.debug(1, "%s maxchannels is %d"
                       % (connection.server, m))
        elif lump.startswith("CHANLIMIT="):
            # Value is a comma-separated list of <prefixes>:<limit>
            # pairs; the first prefix group need not be exactly "#".
            limits = lump[10:].split(",")
            try:
                for token in limits:
                    (prefixes, limit) = token.split(":")
                    # Stored as int so it can be compared with a count.
                    limit = int(limit)
                    for c in prefixes:
                        cxt.channel_limits[c] = limit
                self.debug(1, "%s channel limit map is %s"
                           % (connection.server, cxt.channel_limits))
            except ValueError:
                self.logerr("ill-formed CHANLIMIT property")
def _handle_disconnect(self, connection, _event):
    "Server hung up the connection."
    self.debug(1, "server %s disconnected" % connection.server)
    # context is cleared when we drop a connection ourselves.
    cxt = connection.context
    if cxt:
        cxt.handle_disconnect()
def _handle_kick(self, connection, event):
    "We were kicked out of a channel on this connection."
    # event.target() is the channel we were removed from.
    outof = event.target()
    self.debug(1, "irker has been kicked from %s on %s" % (outof, connection.server))
    if connection.context:
        connection.context.handle_kick(outof)
# Entry point for one request line: parse it as JSON, validate the
# 'to' and 'privmsg' fields, hand the message to the dispatcher of
# every target server, then garbage-collect idle dispatchers.
# NOTE(review): many control-flow lines of this method are missing
# from this excerpt (the "try:", "else:" branches, the loop over
# channels, construction of "target", the assignment of "oldest" and
# both "except" headers). Confirm against the full file before
# changing any logic here.
413 def handle(self, line):
414 "Perform a JSON relay request."
416 request = json.loads(line.strip())
417 if not isinstance(request, dict):
418 self.logerr("request is not a JSON dictionary: %r" % request)
419 elif "to" not in request or "privmsg" not in request:
420 self.logerr("malformed request - 'to' or 'privmsg' missing: %r" % request)
422 channels = request['to']
423 message = request['privmsg']
# 'to' may be a single URL string or a list of them; 'privmsg' must
# be a string.
# NOTE(review): no early exit is visible between the two type checks
# below, so a bad 'to' type combined with a valid 'privmsg' looks able
# to fall through to dispatch - confirm.
424 if type(channels) not in (type([]), type(""), type(u"")):
425 self.logerr("malformed request - unexpected channel type: %r" % channels)
426 if type(message) not in (type(""), type(u"")):
427 self.logerr("malformed request - unexpected message type: %r" % message)
# Normalize a single URL into a one-element list.
429 if type(channels) != type([]):
430 channels = [channels]
432 if not type(url) in (type(""), type(u"")):
433 self.logerr("malformed request - URL has unexpected type: %r" % url)
# NOTE(review): "target" is built on a line not visible here; it
# supplies valid(), server(), servername, port and channel.
436 if not target.valid():
# One Dispatcher per (servername, port) key, created on first use.
438 if target.server() not in self.servers:
439 self.servers[target.server()] = Dispatcher(self, target.servername, target.port)
440 self.servers[target.server()].dispatch(target.channel, message)
441 # GC dispatchers with no active connections
# The key list is taken before the loop because entries are deleted
# while iterating (keys() returns a list under Python 2).
442 servernames = self.servers.keys()
443 for servername in servernames:
444 if not self.servers[servername].live():
445 del self.servers[servername]
446 # If we might be pushing a resource limit
447 # even after garbage collection, remove a
448 # session. The goal here is to head off
449 # DoS attacks that aim at exhausting
450 # thread space or file descriptors. The
451 # cost is that attempts to DoS this
452 # service will cause lots of join/leave
453 # spam as we scavenge old channels after
454 # connecting to new ones. The particular
455 # method used for selecting a session to
456 # be terminated doesn't matter much; we
457 # choose the one longest idle on the
458 # assumption that message activity is likely
# Linear scan for the dispatcher with the smallest last_xmit().
461 oldtime = float("inf")
462 if len(self.servers) >= CONNECTION_MAX:
463 for (name, server) in self.servers.items():
464 if server.last_xmit() < oldtime:
466 oldtime = server.last_xmit()
# NOTE(review): "oldest" is assigned on a line not visible here.
467 del self.servers[oldest]
# Error paths: both only log; the exception types caught are on lines
# missing from this excerpt.
469 self.logerr("can't recognize JSON on input: %r" % line)
471 self.logerr("wildly malformed JSON blew the parser stack.")
class IrkerTCPHandler(SocketServer.StreamRequestHandler):
    "Feed newline-terminated requests from one TCP client to the multiplexer."
    def handle(self):
        # NOTE(review): the loop and end-of-stream test were rebuilt from
        # gaps in this excerpt (a readline() result that is empty means
        # the peer closed the stream) - confirm against the full file.
        while True:
            line = self.rfile.readline()
            if not line:
                break
            # irker is the module-level multiplexer created at startup.
            irker.handle(line.strip())
class IrkerUDPHandler(SocketServer.BaseRequestHandler):
    "Hand the payload of one UDP datagram to the multiplexer."
    def handle(self):
        # For datagram servers self.request is a (data, socket) pair;
        # only the data is needed because no reply is sent.
        data = self.request[0].strip()
        # NOTE(review): the hand-off below is on a line missing from this
        # excerpt; it mirrors the TCP handler - confirm.
        irker.handle(data)
# Script entry point: parse the -d / -V options, create the Irker
# multiplexer, then serve requests over both TCP and UDP on (HOST, PORT)
# from daemon threads.
# NOTE(review): several lines are missing from this excerpt - the
# initialisation and assignment of "debuglvl", the exit after -V, the
# "try:" headers, the thread start and the wait that the
# KeyboardInterrupt handler guards. HOST, PORT and version are also
# defined on lines not visible here. Confirm against the full file.
487 if __name__ == '__main__':
489 (options, arguments) = getopt.getopt(sys.argv[1:], "d:V")
490 for (opt, val) in options:
491 if opt == '-d': # Enable debug/progress messages
494 logging.basicConfig(level=logging.DEBUG)
495 elif opt == '-V': # Emit version and exit
496 sys.stdout.write("irkerd version %s\n" % version)
# "irker" is a module-level name; both request handler classes call
# irker.handle().
498 irker = Irker(debuglevel=debuglvl)
499 irker.debug(1, "irkerd version %s" % version)
# Both servers listen on the same address, one per protocol.
501 tcpserver = SocketServer.TCPServer((HOST, PORT), IrkerTCPHandler)
502 udpserver = SocketServer.UDPServer((HOST, PORT), IrkerUDPHandler)
503 for server in [tcpserver, udpserver]:
# The loop variable is rebound from the server object to its thread.
504 server = threading.Thread(target=server.serve_forever)
505 server.setDaemon(True)
509 except KeyboardInterrupt:
# Python 2 "except X, e" syntax: this file does not parse under Python 3.
511 except socket.error, e:
512 sys.stderr.write("irkerd: server launch failed: %r\n" % e)