From: Eric S. Raymond Date: Mon, 8 Oct 2012 04:57:57 +0000 (-0400) Subject: Wrap library calls in a mutex for thread safety. X-Git-Tag: 1.9~7 X-Git-Url: http://git.tremily.us/?a=commitdiff_plain;h=897be7c8b3380935759502091568ff0a9b80a479;p=irker.git Wrap library calls in a mutex for thread safety. --- diff --git a/irkerd b/irkerd index 2a73835..b9814c1 100755 --- a/irkerd +++ b/irkerd @@ -189,10 +189,11 @@ class Connection: ping_timeout = now > self.last_ping + PING_TTL if (xmit_timeout or ping_timeout) and self.status != "disconnected": self.irker.debug(1, "timing out connection to %s at %s (ping_timeout=%s, xmit_timeout=%s)" % (self.servername, time.asctime(), ping_timeout, xmit_timeout)) - self.connection.context = None - self.connection.quit("transmission timeout") - self.connection.close() - self.connection = None + with self.irker.library_lock: + self.connection.context = None + self.connection.quit("transmission timeout") + self.connection.close() + self.connection = None self.status = "disconnected" else: # Prevent this thread from hogging the CPU by pausing @@ -203,23 +204,25 @@ class Connection: time.sleep(ANTI_BUZZ_DELAY) elif not self.connection: # Queue is nonempty but server isn't connected. - self.connection = self.irker.irc.server() - self.connection.context = self - # Try to avoid colliding with other instances - self.nick_trial = random.randint(1, 990) - self.channels_joined = {} - # This will throw irc.client.ServerConnectionError on failure - try: - self.connection.connect(self.servername, - self.port, - nickname=self.nickname(), - username="irker", - ircname="irker relaying client") - self.status = "handshaking" - self.irker.debug(1, "XMIT_TTL bump (%s connection) at %s" % (self.servername, time.asctime())) - self.last_xmit = time.time() - except irc.client.ServerConnectionError: - self.status = "disconnected" + with self.irker.library_lock: + self.connection = self.irker.irc.server() + self.connection.context = self + # Try to avoid colliding with other instances + self.nick_trial = random.randint(1, 990) + self.channels_joined = {} + try: + # This will throw + # irc.client.ServerConnectionError on failure + self.connection.connect(self.servername, + self.port, + nickname=self.nickname(), + username="irker", + ircname="irker relaying client") + self.status = "handshaking" + self.irker.debug(1, "XMIT_TTL bump (%s connection) at %s" % (self.servername, time.asctime())) + self.last_xmit = time.time() + except irc.client.ServerConnectionError: + self.status = "disconnected" elif self.status == "handshaking": if time.time() > self.last_xmit + HANDSHAKE_TTL: self.status = "expired" @@ -245,16 +248,17 @@ class Connection: self.status = "expired" break elif self.status == "ready": - (channel, message) = self.queue.get() - if channel not in self.channels_joined: - self.connection.join(channel) - self.irker.debug(1, "joining %s on %s." % (channel, self.servername)) - for segment in message.split("\n"): - self.connection.privmsg(channel, segment) - time.sleep(ANTI_FLOOD_DELAY) - self.last_xmit = self.channels_joined[channel] = time.time() - self.irker.debug(1, "XMIT_TTL bump (%s transmission) at %s" % (self.servername, time.asctime())) - self.queue.task_done() + with self.irker.library_lock: + (channel, message) = self.queue.get() + if channel not in self.channels_joined: + self.connection.join(channel) + self.irker.debug(1, "joining %s on %s." % (channel, self.servername)) + for segment in message.split("\n"): + self.connection.privmsg(channel, segment) + time.sleep(ANTI_FLOOD_DELAY) + self.last_xmit = self.channels_joined[channel] = time.time() + self.irker.debug(1, "XMIT_TTL bump (%s transmission) at %s" % (self.servername, time.asctime())) + self.queue.task_done() except: (exc_type, _exc_value, exc_traceback) = sys.exc_info() self.irker.logerr("exception %s in thread for %s" % \ @@ -367,7 +371,8 @@ class Irker: self.irc.add_global_handler("featurelist", self._handle_features) self.irc.add_global_handler("disconnect", self._handle_disconnect) self.irc.add_global_handler("kick", self._handle_kick) - thread = threading.Thread(target=self.irc.process_forever) + self.library_lock = threading.Lock() + thread = threading.Thread(target=self._process_forever) thread.setDaemon(True) self.irc._thread = thread thread.start() @@ -379,6 +384,12 @@ class Irker: "Debugging information." if self.debuglevel >= level: sys.stderr.write("irkerd: %s\n" % errmsg) + def _process_forever(self): + "IRC library process_forever with mutex." + self.debug(1, "process_forever()") + while True: + with self.library_lock: + self.irc.process_once() def _handle_ping(self, connection, _event): "PING arrived, bump the last-received time for the connection." if connection.context: