class Target():
"Represent a transmission target."
def __init__(self, url):
+ self.url = url
# Pre-2.6 Pythons don't recognize irc: as a valid URL prefix.
url = url.replace("irc://", "http://")
parsed = urlparse.urlparse(url)
if parsed.query:
self.key = re.sub("^key=", "", parsed.query)
self.port = int(ircport)
- def valid(self):
- "Both components must be present for a valid target."
- return self.servername and self.channel
+ def validate(self):
+ "Raise InvalidRequest if the URL is missing a critical component"
+ if not self.servername:
+ raise InvalidRequest(
+ 'target URL missing a servername: %r' % self.url)
+ if not self.channel:
+ raise InvalidRequest(
+ 'target URL missing a channel: %r' % self.url)
def server(self):
"Return a hashable tuple representing the destination server."
return (self.servername, self.port)
def pending(self):
"Do we have any pending message traffic?"
return [k for (k, v) in self.servers.items() if v.pending()]
+
+ def _parse_request(self, line):
+ "Request-parsing helper for the handle() method"
+ request = json.loads(line.strip())
+ if not isinstance(request, dict):
+ raise InvalidRequest(
+ "request is not a JSON dictionary: %r" % request)
+ if "to" not in request or "privmsg" not in request:
+ raise InvalidRequest(
+ "malformed request - 'to' or 'privmsg' missing: %r" % request)
+ channels = request['to']
+ message = request['privmsg']
+ if not isinstance(channels, (list, basestring)):
+ raise InvalidRequest(
+ "malformed request - unexpected channel type: %r" % channels)
+ if not isinstance(message, basestring):
+ raise InvalidRequest(
+ "malformed request - unexpected message type: %r" % message)
+ if not isinstance(channels, list):
+ channels = [channels]
+ targets = []
+ for url in channels:
+ try:
+ if not isinstance(url, basestring):
+ raise InvalidRequest(
+ "malformed request - URL has unexpected type: %r" %
+ url)
+ target = Target(url)
+ target.validate()
+ except InvalidRequest, e:
+ self.logerr(str(e))
+ else:
+ targets.append(target)
+ return (targets, message)
+
def handle(self, line, quit_after=False):
"Perform a JSON relay request."
try:
- request = json.loads(line.strip())
- if not isinstance(request, dict):
- raise InvalidRequest(
- "request is not a JSON dictionary: %r" % request)
- if "to" not in request or "privmsg" not in request:
- raise InvalidRequest(
- "malformed request - 'to' or 'privmsg' missing: %r" %
- request)
- channels = request['to']
- message = request['privmsg']
- if not isinstance(channels, (list, basestring)):
- raise InvalidRequest(
- "malformed request - unexpected channel type: %r"
- % channels)
- if not isinstance(message, basestring):
- raise InvalidRequest(
- "malformed request - unexpected message type: %r" %
- message)
- if not isinstance(channels, list):
- channels = [channels]
- for url in channels:
- try:
- if not isinstance(url, basestring):
- raise InvalidRequest(
- "malformed request - URL has unexpected type: %r" %
- url)
- except InvalidRequest, e:
- self.logerr(str(e))
- else:
- target = Target(url)
- if not target.valid():
- return
- if target.server() not in self.servers:
- self.servers[target.server()] = Dispatcher(
- self, target.servername, target.port)
- self.servers[target.server()].dispatch(
- target.channel, message, target.key,
- quit_after=quit_after)
- # GC dispatchers with no active connections
- servernames = self.servers.keys()
- for servername in servernames:
- if not self.servers[servername].live():
- del self.servers[servername]
- # If we might be pushing a resource limit even
- # after garbage collection, remove a session.
- # The goal here is to head off DoS attacks
- # that aim at exhausting thread space or file
- # descriptors. The cost is that attempts to
- # DoS this service will cause lots of
- # join/leave spam as we scavenge old channels
- # after connecting to new ones. The particular
- # method used for selecting a session to be
- # terminated doesn't matter much; we choose
- # the one longest idle on the assumption that
- # message activity is likely to be clumpy.
- if len(self.servers) >= CONNECTION_MAX:
- oldest = min(
- self.servers.keys(),
- key=lambda name: self.servers[name].last_xmit())
- del self.servers[oldest]
+ targets, message = self._parse_request(line=line)
+ for target in targets:
+ if target.server() not in self.servers:
+ self.servers[target.server()] = Dispatcher(
+ self, target.servername, target.port)
+ self.servers[target.server()].dispatch(
+ target.channel, message, target.key, quit_after=quit_after)
+ # GC dispatchers with no active connections
+ servernames = self.servers.keys()
+ for servername in servernames:
+ if not self.servers[servername].live():
+ del self.servers[servername]
+ # If we might be pushing a resource limit even
+ # after garbage collection, remove a session. The
+ # goal here is to head off DoS attacks that aim at
+ # exhausting thread space or file descriptors.
+ # The cost is that attempts to DoS this service
+ # will cause lots of join/leave spam as we
+ # scavenge old channels after connecting to new
+ # ones. The particular method used for selecting a
+ # session to be terminated doesn't matter much; we
+ # choose the one longest idle on the assumption
+ # that message activity is likely to be clumpy.
+ if len(self.servers) >= CONNECTION_MAX:
+ oldest = min(
+ self.servers.keys(),
+ key=lambda name: self.servers[name].last_xmit())
+ del self.servers[oldest]
except InvalidRequest, e:
self.logerr(str(e))
except ValueError: