From e7e1c4731d3a2733ee2e9c899f2e05f7a7618c07 Mon Sep 17 00:00:00 2001 From: "Eric S. Raymond" Date: Wed, 29 Aug 2012 21:34:12 -0400 Subject: [PATCH] Major refactoring step - untangle the messa around connections. Now we need to reimplement connection limits. --- irker.py | 150 ++++++++++++++++++++++--------------------------------- 1 file changed, 61 insertions(+), 89 deletions(-) diff --git a/irker.py b/irker.py index 71f9cf5..5a1be9c 100755 --- a/irker.py +++ b/irker.py @@ -45,7 +45,7 @@ version = "1.0" # Session per given (server, channel) pair. # # Multiple sessions to the same IRC server may share the same -# irc.client.ServerConnection object in order to cut down on open sockets, +# Connection object in order to cut down on open sockets, # but because many servers enforce a limit on channels open per incoming # socket, not *all* sessions on the same server necessarily do. # @@ -65,34 +65,26 @@ version = "1.0" # problem - there is little point in delivery to a relay that is down or # unreliable. -class SessionException(exceptions.Exception): - def __init__(self, message): - exceptions.Exception.__init__(self) - self.message = message - -class Session(): - "IRC session and message queue processing." - def __init__(self, irker, url): +class Connection(irc.client.ServerConnection): + def __init__(self, irker, servername, port): self.irker = irker - self.url = url - self.server = None + self.servername = servername + self.port = port + self.connection = None + self.nick_trial = 1 self.last_xmit = time.time() self.last_ping = time.time() - # Server connection setup - parsed = urlparse.urlparse(url) - irchost, _, ircport = parsed.netloc.partition(':') - if not ircport: - ircport = 6667 - self.servername = irchost - self.channel = parsed.path.lstrip('/') - self.port = int(ircport) + self.channels_joined = [] # The consumer thread self.queue = Queue.Queue() self.thread = threading.Thread(target=self.dequeue) self.thread.start() - def enqueue(self, message): + def nickname(self, n): + "Return a name for the nth server connection." + return (NAMESTYLE % n) + def enqueue(self, channel, message): "Enque a message for transmission." - self.queue.put(message) + self.queue.put((channel, message)) def dequeue(self): "Try to ship pending messages from the queue." while True: @@ -102,34 +94,51 @@ class Session(): # the actual server connection when its time-to-live # expires, then reconnect and resume transmission if the # queue fills up again. - if not self.server: - self.server = self.irker.open(self.servername, self.port) - self.irker.debug(1, "XMIT_TTL bump (connection) at %s" % time.asctime()) + if not self.connection: + self.connection = self.irker.irc.server() + self.connection.context = self + self.nick_trial = 1 + self.channels_joined = [] + self.connection.connect(self.servername, + self.port, + nickname=self.nickname(self.nick_trial), + username="irker", + ircname="irker relaying client") + self.nick_accepted = False + self.irker.debug(1, "XMIT_TTL bump (%s connection) at %s" % (self.servername, time.asctime())) self.last_xmit = time.time() elif self.queue.empty(): now = time.time() if now > self.last_xmit + XMIT_TTL \ or now > self.last_ping + PING_TTL: - self.irker.debug(1, "timing out inactive connection at %s" % time.asctime()) - self.irker.close(self.servername, self.port) - self.server = None + self.irker.debug(1, "timing out inactive connection to %s at %s" % (self.servername, time.asctime())) + self.connection.context = None + self.connection.close() + self.connection = None break - elif self.server.nick_accepted: - message = self.queue.get() - if self.channel not in self.server.channels_joined: - self.server.join("#" + self.channel) - self.server.channels_joined.append(self.channel) - self.server.privmsg("#" + self.channel, message) + elif self.nick_accepted: + (channel, message) = self.queue.get() + if channel not in self.channels_joined: + self.connection.join("#" + channel) + self.channels_joined.append(channel) + self.connection.privmsg("#" + channel, message) self.last_xmit = time.time() - self.irker.debug(1, "XMIT_TTL bump (transmission) at %s" % time.asctime()) + self.irker.debug(1, "XMIT_TTL bump (%s transmission) at %s" % (self.servername, time.asctime())) self.queue.task_done() - def terminate(self): - "Terminate this session" - self.server.quit("#" + self.channel) - self.server.close() - def await(self): - "Block until processing of all queued messages is done." - self.queue.join() + +class Target(): + "Represent a transmission target." + def __init__(self, url): + parsed = urlparse.urlparse(url) + irchost, _, ircport = parsed.netloc.partition(':') + if not ircport: + ircport = 6667 + self.servername = irchost + self.channel = parsed.path.lstrip('/') + self.port = int(ircport) + def server(self): + "Return a hashable tuple representing the destination server." + return (self.servername, self.port) class Irker: "Persistent IRC multiplexer." @@ -156,53 +165,18 @@ class Irker: "Debugging information." if self.debuglevel >= level: sys.stderr.write("irker: %s\n" % errmsg) - def nickname(self, n): - "Return a name for the nth server connection." - return (NAMESTYLE % n) - def open(self, servername, port): - "Allocate a new server instance." - if not (servername, port) in self.countmap: - self.countmap[(servername, port)] = (CONNECT_MAX+1, None) - count = self.countmap[(servername, port)][0] - if count > CONNECT_MAX: - self.servercount += 1 - newserver = self.irc.server() - newserver.nick_trial = self.servercount - newserver.channels_joined = [] - newserver.connect(servername, - port, - nickname=self.nickname(newserver.nick_trial), - username="irker", - ircname="irker relaying client") - newserver.nick_accepted = False - self.countmap[(servername, port)] = (1, newserver) - self.debug(1, "new server connection %d opened for %s:%s" % \ - (self.servercount, servername, port)) - else: - self.debug(1, "reusing server connection for %s:%s" % \ - (servername, port)) - return self.countmap[(servername, port)][1] - def close(self, servername, port): - "Release a server instance and all sessions using it." - del self.countmap[(servername, port)] - for val in self.sessions.values(): - if (val.servername, val.port) == (servername, port): - self.sessions[val.url].terminate() - del self.sessions[val.url] def _handle_ping(self, connection, event): "PING arrived, bump the last-received time for the connection." - for (name, session) in self.sessions.items(): - if name == connection.server: - session.last_ping = time.time() + connection.context.last_ping = time.time() def _handle_welcome(self, connection, event): "Welcome arrived, nick accepted for this connection." - connection.nick_accepted = True - self.debug(1, "nick %s accepted" % self.nickname(connection.nick_trial)) + connection.context.nick_accepted = True + self.debug(1, "nick %s accepted" % connection.context.nickname(connection.context.nick_trial)) def _handle_badnick(self, connection, event): "Nick not accepted for this connection." - self.debug(1, "nick %s rejected" % self.nickname(connection.nick_trial)) - connection.nick_trial += 1 - connection.nick(self.nickname(connection.nick_trial)) + self.debug(1, "nick %s rejected" % self.nickname(connection.context.nick_trial)) + connection.context.nick_trial += 1 + connection.context.nick(self.nickname(connection.context.nick_trial)) def handle(self, line): "Perform a JSON relay request." try: @@ -222,15 +196,13 @@ class Irker: if type(url) != type(u""): self.logerr("malformed request - unexpected type: %s" % repr(request)) else: - if url not in self.sessions: - self.sessions[url] = Session(self, url) - self.sessions[url].enqueue(message) + target = Target(url) + # FIXME: This doesn't respect the per-channel limit + if target.server() not in self.sessions: + self.sessions[target.server()] = Connection(self, target.servername, target.port) + self.sessions[target.server()].enqueue(target.channel, message) except ValueError: self.logerr("can't recognize JSON on input: %s" % repr(line)) - def terminate(self): - "Ship all pending messages before terminating." - for session in self.sessions.values(): - session.await() class IrkerTCPHandler(SocketServer.StreamRequestHandler): def handle(self): -- 2.26.2