/usr/share/pyshared/cyclone/sse.py is in python-cyclone 1.1-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 | # coding: utf-8
#
# Copyright 2010 Alexandre Fiori
# based on the original Tornado by Facebook
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""`Server-sent events <http://en.wikipedia.org/wiki/Server-sent_events>`_
is a technology for providing push notifications from a server to a browser
client in the form of DOM events.
For more information, check out the `SEE demo
<https://github.com/fiorix/cyclone/tree/master/demos/sse>`_.
"""
from cyclone import escape
from cyclone.web import RequestHandler
from twisted.python import log
class SSEHandler(RequestHandler):
"""Subclass this class and define `bind` and `unbind` to get
notified when a new client connects or disconnects, respectively.
Once connected, you may send events to the browser via `sendEvent`.
"""
def __init__(self, application, request):
RequestHandler.__init__(self, application, request)
self.transport = request.connection.transport
self._auto_finish = False
def sendEvent(self, message, event=None, eid=None, retry=None):
"""
sendEvent is the single method to send events to clients.
Parameters:
message: the event itself
event: optional event name
eid: optional event id to be used as Last-Event-ID header or
e.lastEventId property
retry: set the retry timeout in ms. default 3 secs.
"""
if isinstance(message, dict):
message = escape.json_encode(message)
if isinstance(message, unicode):
message = message.encode("utf-8")
assert isinstance(message, str)
if eid:
self.transport.write("id: %s\n" % eid)
if event:
self.transport.write("event: %s\n" % event)
if retry:
self.transport.write("retry: %s\n" % retry)
self.transport.write("data: %s\n\n" % message)
def _execute(self, transforms, *args, **kwargs):
self._transforms = [] # transforms
if self.settings.get("debug"):
log.msg("SSE connection from %s" % self.request.remote_ip)
self.set_header("Content-Type", "text/event-stream")
self.set_header("Cache-Control", "no-cache")
self.set_header("Connection", "keep-alive")
self.flush()
self.request.connection.setRawMode()
self.notifyFinish().addCallback(self.on_connection_closed)
self.bind()
def on_connection_closed(self, *args, **kwargs):
if self.settings.get("debug"):
log.msg("SSE client disconnected %s" % self.request.remote_ip)
self.unbind()
def bind(self):
"""Gets called when a new client connects."""
pass
def unbind(self):
"""Gets called when an existing client disconnects."""
pass
|