Major refactoring step - untangle the messa around connections.
authorEric S. Raymond <esr@thyrsus.com>
Thu, 30 Aug 2012 01:34:12 +0000 (21:34 -0400)
committerEric S. Raymond <esr@thyrsus.com>
Thu, 30 Aug 2012 01:34:12 +0000 (21:34 -0400)
Now we need to reimplement connection limits.

irker.py

index 71f9cf57b0c14ab11b161d97bdfe721be05e8f96..5a1be9c1d2d8f1efb3e3af1ebcb38c8ec8b5d50f 100755 (executable)
--- a/irker.py
+++ b/irker.py
@@ -45,7 +45,7 @@ version = "1.0"
 # Session per given (server, channel) pair.
 #
 # Multiple sessions to the same IRC server may share the same
-# irc.client.ServerConnection object in order to cut down on open sockets,
+# Connection object in order to cut down on open sockets,
 # but because many servers enforce a limit on channels open per incoming
 # socket, not *all* sessions on the same server necessarily do.
 #
@@ -65,34 +65,26 @@ version = "1.0"
 # problem - there is little point in delivery to a relay that is down or
 # unreliable.
 
-class SessionException(exceptions.Exception):
-    def __init__(self, message):
-        exceptions.Exception.__init__(self)
-        self.message = message
-
-class Session():
-    "IRC session and message queue processing."
-    def __init__(self, irker, url):
+class Connection(irc.client.ServerConnection):
+    def __init__(self, irker, servername, port):
         self.irker = irker
-        self.url = url
-        self.server = None
+        self.servername = servername
+        self.port = port
+        self.connection = None
+        self.nick_trial = 1
         self.last_xmit = time.time()
         self.last_ping = time.time()
-        # Server connection setup
-        parsed = urlparse.urlparse(url)
-        irchost, _, ircport = parsed.netloc.partition(':')
-        if not ircport:
-            ircport = 6667
-        self.servername = irchost
-        self.channel = parsed.path.lstrip('/')
-        self.port = int(ircport)
+        self.channels_joined = []
         # The consumer thread
         self.queue = Queue.Queue()
         self.thread = threading.Thread(target=self.dequeue)
         self.thread.start()
-    def enqueue(self, message):
+    def nickname(self, n):
+        "Return a name for the nth server connection."
+        return (NAMESTYLE % n)
+    def enqueue(self, channel, message):
         "Enque a message for transmission."
-        self.queue.put(message)
+        self.queue.put((channel, message))
     def dequeue(self):
         "Try to ship pending messages from the queue."
         while True:
@@ -102,34 +94,51 @@ class Session():
             # the actual server connection when its time-to-live
             # expires, then reconnect and resume transmission if the
             # queue fills up again.
-            if not self.server:
-                self.server = self.irker.open(self.servername, self.port)
-                self.irker.debug(1, "XMIT_TTL bump (connection) at %s" % time.asctime())
+            if not self.connection:
+                self.connection = self.irker.irc.server()
+                self.connection.context = self
+                self.nick_trial = 1
+                self.channels_joined = []
+                self.connection.connect(self.servername,
+                                        self.port,
+                                        nickname=self.nickname(self.nick_trial),
+                                        username="irker",
+                                        ircname="irker relaying client")
+                self.nick_accepted = False
+                self.irker.debug(1, "XMIT_TTL bump (%s connection) at %s" % (self.servername, time.asctime()))
                 self.last_xmit = time.time()
             elif self.queue.empty():
                 now = time.time()
                 if now > self.last_xmit + XMIT_TTL \
                        or now > self.last_ping + PING_TTL:
-                    self.irker.debug(1, "timing out inactive connection at %s" % time.asctime())
-                    self.irker.close(self.servername, self.port)
-                    self.server = None
+                    self.irker.debug(1, "timing out inactive connection to %s at %s" % (self.servername, time.asctime()))
+                    self.connection.context = None
+                    self.connection.close()
+                    self.connection = None
                     break
-            elif self.server.nick_accepted:
-                message = self.queue.get()
-                if self.channel not in self.server.channels_joined:
-                    self.server.join("#" + self.channel)
-                    self.server.channels_joined.append(self.channel)
-                self.server.privmsg("#" + self.channel, message)
+            elif self.nick_accepted:
+                (channel, message) = self.queue.get()
+                if channel not in self.channels_joined:
+                    self.connection.join("#" + channel)
+                    self.channels_joined.append(channel)
+                self.connection.privmsg("#" + channel, message)
                 self.last_xmit = time.time()
-                self.irker.debug(1, "XMIT_TTL bump (transmission) at %s" % time.asctime())
+                self.irker.debug(1, "XMIT_TTL bump (%s transmission) at %s" % (self.servername, time.asctime()))
                 self.queue.task_done()
-    def terminate(self):
-        "Terminate this session"
-        self.server.quit("#" + self.channel)
-        self.server.close()
-    def await(self):
-        "Block until processing of all queued messages is done."
-        self.queue.join()
+
+class Target():
+    "Represent a transmission target."
+    def __init__(self, url):
+        parsed = urlparse.urlparse(url)
+        irchost, _, ircport = parsed.netloc.partition(':')
+        if not ircport:
+            ircport = 6667
+        self.servername = irchost
+        self.channel = parsed.path.lstrip('/')
+        self.port = int(ircport)
+    def server(self):
+        "Return a hashable tuple representing the destination server."
+        return (self.servername, self.port)
 
 class Irker:
     "Persistent IRC multiplexer."
@@ -156,53 +165,18 @@ class Irker:
         "Debugging information."
         if self.debuglevel >= level:
             sys.stderr.write("irker: %s\n" % errmsg)
-    def nickname(self, n):
-        "Return a name for the nth server connection."
-        return (NAMESTYLE % n)
-    def open(self, servername, port):
-        "Allocate a new server instance."
-        if not (servername, port) in self.countmap:
-            self.countmap[(servername, port)] = (CONNECT_MAX+1, None)
-        count = self.countmap[(servername, port)][0]
-        if count > CONNECT_MAX:
-            self.servercount += 1
-            newserver = self.irc.server()
-            newserver.nick_trial = self.servercount
-            newserver.channels_joined = []
-            newserver.connect(servername,
-                              port,
-                              nickname=self.nickname(newserver.nick_trial),
-                              username="irker",
-                              ircname="irker relaying client")
-            newserver.nick_accepted = False
-            self.countmap[(servername, port)] = (1, newserver)
-            self.debug(1, "new server connection %d opened for %s:%s" % \
-                       (self.servercount, servername, port))
-        else:
-            self.debug(1, "reusing server connection for %s:%s" % \
-                       (servername, port))
-        return self.countmap[(servername, port)][1]
-    def close(self, servername, port):
-        "Release a server instance and all sessions using it."
-        del self.countmap[(servername, port)]
-        for val in self.sessions.values():
-            if (val.servername, val.port) == (servername, port):
-                self.sessions[val.url].terminate()
-                del self.sessions[val.url]
     def _handle_ping(self, connection, event):
         "PING arrived, bump the last-received time for the connection."
-        for (name, session) in self.sessions.items():
-            if name == connection.server:
-                session.last_ping = time.time()
+        connection.context.last_ping = time.time()
     def _handle_welcome(self, connection, event):
         "Welcome arrived, nick accepted for this connection."
-        connection.nick_accepted = True
-        self.debug(1, "nick %s accepted" % self.nickname(connection.nick_trial))
+        connection.context.nick_accepted = True
+        self.debug(1, "nick %s accepted" % connection.context.nickname(connection.context.nick_trial))
     def _handle_badnick(self, connection, event):
         "Nick not accepted for this connection."
-        self.debug(1, "nick %s rejected" % self.nickname(connection.nick_trial))
-        connection.nick_trial += 1
-        connection.nick(self.nickname(connection.nick_trial))
+        self.debug(1, "nick %s rejected" % self.nickname(connection.context.nick_trial))
+        connection.context.nick_trial += 1
+        connection.context.nick(self.nickname(connection.context.nick_trial))
     def handle(self, line):
         "Perform a JSON relay request."
         try:
@@ -222,15 +196,13 @@ class Irker:
                         if type(url) != type(u""):
                             self.logerr("malformed request - unexpected type: %s" % repr(request))
                         else:
-                            if url not in self.sessions:
-                                self.sessions[url] = Session(self, url)
-                            self.sessions[url].enqueue(message)
+                            target = Target(url)
+                            # FIXME: This doesn't respect the per-channel limit 
+                            if target.server() not in self.sessions:
+                                self.sessions[target.server()] = Connection(self, target.servername, target.port)
+                            self.sessions[target.server()].enqueue(target.channel, message)
         except ValueError:
             self.logerr("can't recognize JSON on input: %s" % repr(line))
-    def terminate(self):
-        "Ship all pending messages before terminating."
-        for session in self.sessions.values():
-            session.await()
 
 class IrkerTCPHandler(SocketServer.StreamRequestHandler):
     def handle(self):