eventsource.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394
  1. #!/usr/bin/env python3
  2. # Contest Management System - http://cms-dev.github.io/
  3. # Copyright © 2013 Luca Wehrstedt <luca.wehrstedt@gmail.com>
  4. #
  5. # This program is free software: you can redistribute it and/or modify
  6. # it under the terms of the GNU Affero General Public License as
  7. # published by the Free Software Foundation, either version 3 of the
  8. # License, or (at your option) any later version.
  9. #
  10. # This program is distributed in the hope that it will be useful,
  11. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. # GNU Affero General Public License for more details.
  14. #
  15. # You should have received a copy of the GNU Affero General Public License
  16. # along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. import re
  18. import time
  19. from collections import deque
  20. from weakref import WeakSet
  21. from gevent import Timeout
  22. from gevent.pywsgi import WSGIHandler
  23. from gevent.queue import Queue, Empty
  24. from werkzeug.exceptions import NotAcceptable
  25. from werkzeug.wrappers import Request
# Public API of this module: the names exported by a star-import.
__all__ = [
    "format_event",
    "Publisher", "Subscriber", "EventSource",
]
  30. def format_event(id_, event, data):
  31. """Format the parameters to be sent on an event stream.
  32. Produce a text that, written on a Server-Sent Events connection,
  33. will cause the client to receive an event of the given type with
  34. the given data, and set the last event ID to the given id. No colon
  35. nor line breaks (i.e. "\\r\\n", "\\r", "\\n") are allowed in the
  36. event name and all line breaks in the event data will become "\\n".
  37. id_ (unicode): the ID of the event.
  38. event (unicode): the name of the event, or None.
  39. data (unicode): the content of the event, or None.
  40. return (bytes): the value to write on the stream.
  41. raise (TypeError): if any parameter isn't unicode.
  42. raise (ValueError): if event contains illegal characters.
  43. """
  44. if not isinstance(id_, str):
  45. raise TypeError("Id isn't unicode.")
  46. result = [b"id:%s" % id_.encode('utf-8')]
  47. if event is not None and event != "message":
  48. if not isinstance(event, str):
  49. raise TypeError("Event isn't unicode.")
  50. if not set("\r\n:").isdisjoint(event):
  51. raise ValueError("Event cannot contain '\\r', '\\n' or ':'.")
  52. result += [b"event:%s" % event.encode('utf-8')]
  53. if data is not None:
  54. if not isinstance(data, str):
  55. raise TypeError("Data isn't unicode.")
  56. for line in re.split("\r\n|(?<!\r)\n|\r(?!\n)", data):
  57. result += [b"data:%s" % line.encode('utf-8')]
  58. result += [b'\n']
  59. return b'\n'.join(result)
  60. class Publisher:
  61. """The publish part of a pub-sub broadcast system.
  62. Publish-subscribe is actually an improper name, as there's just one
  63. "topic", making it a simple broadcast system. The publisher class
  64. is responsible for receiving messages to be sent, keeping them in
  65. a cache for a while, instantiating subscribers, each with its own
  66. queue, and pushing new messages to all these queues.
  67. """
  68. def __init__(self, size):
  69. """Instantiate a new publisher.
  70. size (int): the number of messages to keep in cache.
  71. """
  72. # We use a deque as it's efficient to add messages to one end
  73. # and have the ones at the other end be dropped when the total
  74. # number exceeds the given limit.
  75. self._cache = deque(maxlen=size)
  76. # We use a WeakSet as we want queues to be vanish automatically
  77. # when no one else is using (i.e. fetching from) them.
  78. self._sub_queues = WeakSet()
  79. def put(self, event, data):
  80. """Dispatch a new item to all subscribers.
  81. See format_event for details about the parameters.
  82. event (unicode): the type of event the client will receive.
  83. data (unicode): the associated data.
  84. """
  85. # Number of microseconds since epoch.
  86. key = int(time.time() * 1_000_000)
  87. msg = format_event("%x" % key, event, data)
  88. # Put into cache.
  89. self._cache.append((key, msg))
  90. # Send to all subscribers.
  91. for queue in self._sub_queues:
  92. queue.put(msg)
  93. def get_subscriber(self, last_event_id=None):
  94. """Obtain a new subscriber.
  95. The returned subscriber will receive all messages after the one
  96. with the given index (if they are still in the cache).
  97. last_event_id (unicode|None): the ID of the last message the
  98. client did receive, to request the one generated since
  99. then to be sent again. If not given no past message will
  100. be sent.
  101. return (Subscriber): a new subscriber instance.
  102. """
  103. queue = Queue()
  104. # If a valid last_event_id is provided see if cache can supply
  105. # missed events.
  106. if last_event_id is not None and \
  107. re.match("^[0-9A-Fa-f]+$", last_event_id):
  108. last_event_key = int(last_event_id, 16)
  109. if len(self._cache) > 0 and last_event_key >= self._cache[0][0]:
  110. # All missed events are in cache.
  111. for key, msg in self._cache:
  112. if key > last_event_key:
  113. queue.put(msg)
  114. else:
  115. # Some events may be missing. Ask to reinit.
  116. queue.put(b"event:reinit\n\n")
  117. # Store the queue and return a subscriber bound to it.
  118. self._sub_queues.add(queue)
  119. return Subscriber(queue)
  120. class Subscriber:
  121. """The subscribe part of a pub-sub broadcast system.
  122. This class receives the messages sent to the Publisher that created
  123. it.
  124. """
  125. def __init__(self, queue):
  126. """Create a new subscriber.
  127. Make it wait for messages on the given queue, managed by the
  128. Publisher.
  129. queue (Queue): a message queue.
  130. """
  131. self._queue = queue
  132. def get(self):
  133. """Retrieve new messages.
  134. Obtain all messages that were put in the associated publisher
  135. since this method was last called, or (on the first call) since
  136. the last_event_id given to get_subscriber.
  137. return ([objects]): the items put in the publisher, in order
  138. (actually, returns a generator, not a list).
  139. raise (OutdatedError): if some of the messages it's supposed to
  140. retrieve have already been removed from the cache.
  141. """
  142. # Block until we have something to do.
  143. self._queue.peek()
  144. # Fetch all items that are immediately available.
  145. try:
  146. while True:
  147. yield self._queue.get_nowait()
  148. except Empty:
  149. pass
  150. class EventSource:
  151. """A class that implements a Server-Sent Events [1] handler.
  152. This class is intended to be extended: it takes charge of all the
  153. hard work of managing a stream of events, leaving to the subclass
  154. only the fun of determining which events to send.
  155. Server-Sent Events are a way to make server push using long-polling
  156. over HTTP connections, preferably with chunked transfer encoding.
  157. This use wasn't a design goal of WSGI but this class, which is a
  158. WSGI application, should be able to manage it. It has been written
  159. to work on a gevent.pywsgi server, but should handle other servers
  160. as well.
  161. """
  162. _GLOBAL_TIMEOUT = 600
  163. _WRITE_TIMEOUT = 30
  164. _PING_TIMEOUT = 15
  165. _CACHE_SIZE = 250
  166. def __init__(self):
  167. """Create an event source.
  168. """
  169. self._pub = Publisher(self._CACHE_SIZE)
  170. def send(self, event, data):
  171. """Send the event to the stream.
  172. Intended for subclasses to push new events to clients. See
  173. format_event for the meaning of the parameters.
  174. event (unicode): the type of the event.
  175. data (unicode): the data of the event.
  176. """
  177. self._pub.put(event, data)
  178. def __call__(self, environ, start_response):
  179. """Execute this instance as a WSGI application.
  180. See the PEP for the meaning of parameters. The separation of
  181. __call__ and wsgi_app eases the insertion of middlewares.
  182. """
  183. return self.wsgi_app(environ, start_response)
  184. def wsgi_app(self, environ, start_response):
  185. """Execute this instance as a WSGI application.
  186. See the PEP for the meaning of parameters. The separation of
  187. __call__ and wsgi_app eases the insertion of middlewares.
  188. """
  189. request = Request(environ)
  190. request.encoding_errors = "strict"
  191. # The problem here is that we'd like to send an infinite stream
  192. # of events, but WSGI has been designed to handle only finite
  193. # responses. Hence, to do this we will have to "abuse" the API
  194. # a little. This works well with gevent's pywsgi implementation
  195. # but it may not with others (still PEP-compliant). Therefore,
  196. # just to be extra-safe, we will terminate the response anyway,
  197. # after a long timeout, to make it finite.
  198. # The first such "hack" is the mechanism to trigger the chunked
  199. # transfer-encoding. The PEP states just that "the server *may*
  200. # use chunked encoding" to send each piece of data we give it,
  201. # if we don't specify a Content-Length header and if both the
  202. # client and the server support it. According to the HTTP spec.
  203. # all (and only) HTTP/1.1 compliant clients have to support it.
  204. # We'll assume that the server software supports it too, and
  205. # actually uses it (gevent does!) even if we have no way to
  206. # check it. We cannot try to force such behavior as the PEP
  207. # doesn't even allow us to set the Transfer-Encoding header.
  208. # The second abuse is the use of the write() callable, returned
  209. # by start_response, even if the PEP strongly discourages its
  210. # use in new applications. We do it because we need a way to
  211. # detect when the client disconnects, and we hope to achieve
  212. # this by seeing when a call to write() fails, i.e. raises an
  213. # exception. This behavior isn't documented by the PEP, but it
  214. # seems reasonable and it's present in gevent (which raises a
  215. # OSError).
  216. # The third non-standard behavior that we expect (related to
  217. # the previous one) is that no one in the application-to-client
  218. # chain does response buffering: neither any middleware nor the
  219. # server (gevent doesn't!). This should also hold outside the
  220. # server realm (i.e. no proxy buffering) but that's definitely
  221. # not our responsibility.
  222. # The fourth "hack" is to avoid an error to be printed on the
  223. # logs. If the client terminates the connection, we catch and
  224. # silently ignore the exception and return gracefully making
  225. # the server try to write the last zero-sized chunk (used to
  226. # mark the end of the stream). This will fail and produce an
  227. # error. To avoid this we detect if we're running on a gevent
  228. # server and make it "forget" this was a chunked response.
  229. # Check if the client will understand what we will produce.
  230. if request.accept_mimetypes.quality("text/event-stream") <= 0:
  231. return NotAcceptable()(environ, start_response)
  232. # Initialize the response and get the write() callback. The
  233. # Cache-Control header is useless for conforming clients, as
  234. # the spec. already imposes that behavior on them, but we set
  235. # it explicitly to avoid unwanted caching by unaware proxies and
  236. # middlewares.
  237. write = start_response(
  238. "200 OK", [("Content-Type", "text/event-stream; charset=utf-8"),
  239. ("Cache-Control", "no-cache")])
  240. # This is a part of the fourth hack (see above).
  241. if hasattr(start_response, "__self__") and \
  242. isinstance(start_response.__self__, WSGIHandler):
  243. handler = start_response.__self__
  244. else:
  245. handler = None
  246. # One-shot means that we will terminate the request after the
  247. # first batch of sent events. We do this when we believe the
  248. # client doesn't support chunked transfer. As this encoding has
  249. # been introduced in HTTP/1.1 (as mandatory!) we restrict to
  250. # requests in that HTTP version. Also, if it comes from an
  251. # XMLHttpRequest it has been probably sent from a polyfill (not
  252. # from the native browser implementation) which will be able to
  253. # read the response body only when it has been fully received.
  254. if environ["SERVER_PROTOCOL"] != "HTTP/1.1" or request.is_xhr:
  255. one_shot = True
  256. else:
  257. one_shot = False
  258. # As for the Server-Sent Events [1] spec., this is the way for
  259. # the client to tell us the ID of the last event it received
  260. # and to ask us to send it the ones that happened since then.
  261. # [1] http://www.w3.org/TR/eventsource/
  262. # The spec. requires implementations to retry the connection
  263. # when it fails, adding the "Last-Event-ID" HTTP header. But in
  264. # case of an error they stop, and we have to (manually) delete
  265. # the EventSource and create a new one. To obtain that behavior
  266. # again we give the "last_event_id" as a URL query parameter
  267. # (with lower priority, to have the header override it).
  268. last_event_id = request.headers.get("Last-Event-ID")
  269. if last_event_id is None:
  270. last_event_id = request.args.get("last_event_id")
  271. # We subscribe to the publisher to receive events.
  272. sub = self._pub.get_subscriber(last_event_id)
  273. # Send some data down the pipe. We need that to make the user
  274. # agent announces the connection (see the spec.). Since it's a
  275. # comment it will be ignored.
  276. write(b":\n")
  277. # XXX We could make the client change its reconnection timeout
  278. # by sending a "retry:" line.
  279. # As a last line of defence from very bad-behaving servers we
  280. # don't want to the request to last longer than _GLOBAL_TIMEOUT
  281. # seconds (see above). We use "False" to just cause the control
  282. # exit the with block, instead of raising an exception.
  283. with Timeout(self._GLOBAL_TIMEOUT, False):
  284. # Repeat indefinitely.
  285. while True:
  286. # Proxies often have a read timeout. We try not to hit
  287. # it by not being idle for more than _PING_TIMEOUT
  288. # seconds, sending a ping (i.e. a comment) if there's
  289. # no real data.
  290. try:
  291. with Timeout(self._PING_TIMEOUT):
  292. data = b"".join(sub.get())
  293. got_sth = True
  294. except Timeout:
  295. data = b":\n"
  296. got_sth = False
  297. try:
  298. with Timeout(self._WRITE_TIMEOUT):
  299. write(data)
  300. # The PEP doesn't tell what has to happen when a write
  301. # fails. We're conservative, and allow any unexpected
  302. # event to interrupt the request. We hope it's enough
  303. # to detect when the client disconnects. It is with
  304. # gevent, which raises an OSError. The timeout (we
  305. # catch that too) is just an extra precaution.
  306. except Exception:
  307. # This is part of the fourth hack (see above).
  308. if handler is not None:
  309. handler.response_use_chunked = False
  310. break
  311. # If we decided this is one-shot, stop the long-poll as
  312. # soon as we sent the client some real data.
  313. if one_shot and got_sth:
  314. break
  315. # An empty iterable tells the server not to send anything.
  316. return []