Implement a trivial dispatch policy.
authorEric S. Raymond <esr@thyrsus.com>
Thu, 30 Aug 2012 12:54:58 +0000 (08:54 -0400)
committerEric S. Raymond <esr@thyrsus.com>
Thu, 30 Aug 2012 12:54:58 +0000 (08:54 -0400)
This version works, but doesn't respect CONNECT_MAX.

irker.py

index 5a1be9c1d2d8f1efb3e3af1ebcb38c8ec8b5d50f..dcc6e699e4e90448d3811d20ca9d2da960d5aa18 100755 (executable)
--- a/irker.py
+++ b/irker.py
@@ -39,22 +39,20 @@ version = "1.0"
 
 # Sketch of implementation:
 #
-# One Irker object manages multiple IRC sessions.  Each Session object
-# corresponds to a destination IRC URL that the daemon has seen and handles
-# traffic for one channel on one server.  There is never more than one
-# Session per given (server, channel) pair.
+# One Irker object manages multiple IRC sessions.  It holds a map of
+# Dispatcher objects, one per (server, port) combination, which are
+# responsible for routing messages to one of any bumber of Connection
+# objects that do the actual socket conversation.  The reason for
+# the Dispatcher layer is that IRC daemons limit the number of channels
+# a client (that is, from the daemon's point of view, a socket) can be
+# joined to.
 #
-# Multiple sessions to the same IRC server may share the same
-# Connection object in order to cut down on open sockets,
-# but because many servers enforce a limit on channels open per incoming
-# socket, not *all* sessions on the same server necessarily do.
-#
-# Sessions are timed out and removed when either they haven't seen a
+# Connections are timed out and removed when either they haven't seen a
 # PING for a while (indicating that the server may be stalled or down)
 # or there has been no message traffic to them for a while.
 #
 # There are multiple threads. One accepts incoming traffic from all servers.
-# Each Session also has a consumer thread and a thread-safe message queue.
+# Each Connection also has a consumer thread and a thread-safe message queue.
 # The program main appends messages to queues as JSON requests are received;
 # the consumer threads try to ship them to servers.  When a socket write
 # stalls, it only blocks an individual consumer thread; if it stalls long
@@ -140,6 +138,17 @@ class Target():
         "Return a hashable tuple representing the destination server."
         return (self.servername, self.port)
 
+class Dispatcher:
+    "Dispatch messages for a particular server-port combination."
+    # FIXME: Implement a nontivial policy that respects CONNECT_MAX.
+    def __init__(self, irker, servername, port):
+        self.irker = irker
+        self.servername = servername
+        self.port = port
+        self.connection = Connection(self.irker, servername, port)
+    def dispatch(self, channel, message):
+        self.connection.enqueue(channel, message)
+
 class Irker:
     "Persistent IRC multiplexer."
     def __init__(self, debuglevel=0):
@@ -155,9 +164,7 @@ class Irker:
         thread = threading.Thread(target=self.irc.process_forever)
         self.irc._thread = thread
         thread.start()
-        self.sessions = {}
-        self.countmap = {}
-        self.servercount = 0
+        self.ircds = {}
     def logerr(self, errmsg):
         "Log a processing error."
         sys.stderr.write("irker: " + errmsg + "\n")
@@ -197,10 +204,9 @@ class Irker:
                             self.logerr("malformed request - unexpected type: %s" % repr(request))
                         else:
                             target = Target(url)
-                            # FIXME: This doesn't respect the per-channel limit 
-                            if target.server() not in self.sessions:
-                                self.sessions[target.server()] = Connection(self, target.servername, target.port)
-                            self.sessions[target.server()].enqueue(target.channel, message)
+                            if target.server() not in self.ircds:
+                                self.ircds[target.server()] = Dispatcher(self, target.servername, target.port)
+                            self.ircds[target.server()].dispatch(target.channel, message)
         except ValueError:
             self.logerr("can't recognize JSON on input: %s" % repr(line))