- request = json.loads(line.strip())
- if not isinstance(request, dict):
- self.logerr("request is not a JSON dictionary: %r" % request)
- elif "to" not in request or "privmsg" not in request:
- self.logerr("malformed request - 'to' or 'privmsg' missing: %r" % request)
- else:
- channels = request['to']
- message = request['privmsg']
- if not isinstance(channels, (list, basestring)):
- self.logerr("malformed request - unexpected channel type: %r" % channels)
- if not isinstance(message, basestring):
- self.logerr("malformed request - unexpected message type: %r" % message)
- else:
- if not isinstance(channels, list):
- channels = [channels]
- for url in channels:
- if not isinstance(url, basestring):
- self.logerr("malformed request - URL has unexpected type: %r" % url)
- else:
- target = Target(url)
- if not target.valid():
- return
- if target.server() not in self.servers:
- self.servers[target.server()] = Dispatcher(self, target.servername, target.port)
- self.servers[target.server()].dispatch(target.channel, message, target.key)
- # GC dispatchers with no active connections
- servernames = self.servers.keys()
- for servername in servernames:
- if not self.servers[servername].live():
- del self.servers[servername]
- # If we might be pushing a resource limit
- # even after garbage collection, remove a
- # session. The goal here is to head off
- # DoS attacks that aim at exhausting
- # thread space or file descriptors. The
- # cost is that attempts to DoS this
- # service will cause lots of join/leave
- # spam as we scavenge old channels after
- # connecting to new ones. The particular
- # method used for selecting a session to
- # be terminated doesn't matter much; we
- # choose the one longest idle on the
- # assumption that message activity is likely
- # to be clumpy.
- if len(self.servers) >= CONNECTION_MAX:
- oldest = min(self.servers.keys(), key=lambda name: self.servers[name].last_xmit())
- del self.servers[oldest]
+ targets, message = self._parse_request(line=line)
+ for target in targets:
+ if target.server() not in self.servers:
+ self.servers[target.server()] = Dispatcher(
+ self, target=target, **self.kwargs)
+ self.servers[target.server()].dispatch(
+ target.channel, message, target.key, quit_after=quit_after)
+ # GC dispatchers with no active connections
+ servernames = self.servers.keys()
+ for servername in servernames:
+ if not self.servers[servername].live():
+ del self.servers[servername]
+ # If we might be pushing a resource limit even
+ # after garbage collection, remove a session. The
+ # goal here is to head off DoS attacks that aim at
+ # exhausting thread space or file descriptors.
+ # The cost is that attempts to DoS this service
+ # will cause lots of join/leave spam as we
+ # scavenge old channels after connecting to new
+ # ones. The particular method used for selecting a
+ # session to be terminated doesn't matter much; we
+ # choose the one longest idle on the assumption
+ # that message activity is likely to be clumpy.
+ if len(self.servers) >= CONNECTION_MAX:
+ oldest = min(
+ self.servers.keys(),
+ key=lambda name: self.servers[name].last_xmit())
+ del self.servers[oldest]
+ except InvalidRequest, e:
+ LOG.error(str(e))