/usr/lib/python2.7/dist-packages/ginga/web/pgw/PgHelp.py is in python-ginga 2.6.1-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 | #
# PgHelp.py -- web application threading help routines.
#
# This is open-source software licensed under a BSD license.
# Please see the file LICENSE.txt for details.
#
import tornado.web
import tornado.websocket
import tornado.template
from tornado.ioloop import IOLoop
import random
import json
import os, time
import datetime
import binascii
from collections import namedtuple
from io import BytesIO
from ginga.misc import Bunch
from ginga.util import io_rgb
default_interval = 10
ConfigEvent = namedtuple("ConfigEvent", ["type", "id", "width", "height"])
InputEvent = namedtuple("InputEvent", ["type", "id", "x", "y", "button",
"delta", "alt_key", "ctrl_key",
"meta_key", "shift_key", "key_code"])
GestureEvent = namedtuple("GestureEvent", ["type", "id", "x", "y", "dx", "dy",
"distance",
"theta", "direction", "vx", "vy",
"scale", "rotation", "isfirst",
"isfinal"])
WidgetEvent = namedtuple("WidgetEvent", ["type", "id", "value"])
TimerEvent = namedtuple("TimerEvent", ["type", "id", "value"])
class ApplicationHandler(tornado.websocket.WebSocketHandler):
def initialize(self, name, app):
self.name = name
self.app = app
self.app.add_ws_handler(self)
self.event_callbacks = {
"activate": WidgetEvent,
"setbounds": ConfigEvent,
"mousedown": InputEvent,
"mouseup": InputEvent,
"mousemove": InputEvent,
"mouseout": InputEvent,
"mouseover": InputEvent,
"mousewheel": InputEvent,
"wheel": InputEvent,
"click": InputEvent,
"dblclick": InputEvent,
"keydown": InputEvent,
"keyup": InputEvent,
"keypress": InputEvent,
"resize": ConfigEvent,
"focus": InputEvent,
"focusout": InputEvent,
"blur": InputEvent,
"drop": InputEvent,
#"paste": InputEvent,
# These are all Hammer.js events
"pinch": GestureEvent,
"pinchstart": GestureEvent,
"pinchend": GestureEvent,
"rotate": GestureEvent,
"rotatestart": GestureEvent,
"rotateend": GestureEvent,
"pan": GestureEvent,
"panstart": GestureEvent,
"panend": GestureEvent,
"tap": GestureEvent,
"swipe": GestureEvent,
}
#self.interval = 10
interval = self.settings.get("timer_interval", default_interval)
if self.name in self.settings:
interval = self.settings[self.name].get("timer_interval", interval)
self.interval = interval
# randomize the first timeout so we don't get every timer
# expiring at the same time
interval = random.randint(1, self.interval)
delta = datetime.timedelta(milliseconds=interval)
self.timeout = IOLoop.current().add_timeout(delta, self.timer_tick)
def add_event_type(self, msg_type, event_class):
self.event_callbacks[msg_type] = event_class
def on_open(self, *args, **kwdargs):
self.set_nodelay(True)
def on_close(self):
IOLoop.current().remove_timeout(self.timeout)
def on_message(self, raw_message):
message = json.loads(raw_message)
event_type = message.get("type")
try:
event_class = self.event_callbacks[event_type]
except KeyError:
print("I don't know how to process '%s' events!" % (
event_type))
return
event = event_class(**message)
self.app.widget_event(event)
def do_operation(self, operation, **kwargs):
message = dict(kwargs, operation=operation)
raw_message = json.dumps(message)
self.write_message(raw_message)
def timer_tick(self):
event = TimerEvent(type="timer", id=0, value=time.time())
# TODO: should exceptions thrown from this be caught and ignored
self.app.widget_event(event)
delta = datetime.timedelta(milliseconds = self.interval)
self.timeout = IOLoop.current().add_timeout(delta, self.timer_tick)
class WindowHandler(tornado.web.RequestHandler):
def initialize(self, name, url, app):
self.app = app
self.logger = app.logger
self.logger.info("windowhandler initialize")
self.name = name
self.url = url
def make_index(self, wids):
template = '''
<!doctype html>
<html>
<head>
<title>%(title)s</title>
</head>
<body>
%(content)s
</body>
</html>'''
content = ["<ul>"]
for wid in wids:
content.append('''<li><a href="%s?id=%s">Window %s</a></li>''' % (
self.url, wid, wid))
content.append("</ul>")
return template % dict(title="Window index", content=''.join(content))
def get(self):
self.logger.info("windowhandler get")
# Collect arguments
wid = self.get_argument('id', None)
# Get window with this id
wids = self.app.get_wids()
if wid in wids:
window = self.app.get_window(wid)
output = window.render()
else:
output = self.make_index(wids)
self.write(output)
class Timer(object):
"""Abstraction of a GUI-toolkit implemented timer."""
def __init__(self, ival_sec, expire_cb, data=None, mplcanvas=None):
"""Create a timer set to expire after `ival_sec` and which will
call the callable `expire_cb` when it expires.
"""
self.ival_sec = ival_sec
self.cb = expire_cb
self.data = data
self._timer = mplcanvas.new_timer()
self._timer.single_shot = True
self._timer.add_callback(self._redirect_cb)
def start(self, ival_sec=None):
"""Start the timer. If `ival_sec` is not None, it should
specify the time to expiration in seconds.
"""
if ival_sec is None:
ival_sec = self.ival_sec
self.cancel()
# Matplotlib timer set in milliseconds
time_ms = int(ival_sec * 1000.0)
self._timer.interval = time_ms
self._timer.start()
def _redirect_cb(self):
self.cb(self)
def cancel(self):
"""Cancel this timer. If the timer is not running, there
is no error.
"""
try:
self._timer.stop()
except:
pass
def get_image_src_from_buffer(img_buf, imgtype='png'):
img_string = binascii.b2a_base64(img_buf)
if isinstance(img_string, bytes):
img_string = img_string.decode("utf-8")
return ('data:image/%s;base64,' % imgtype) + img_string
def get_icon(iconpath, size=None, format='png'):
image = io_rgb.PILimage.open(iconpath)
if size is not None:
wd, ht = size
else:
wd, ht = 24, 24
image = image.resize((wd, ht))
img_buf = BytesIO()
image.save(img_buf, format=format)
icon = get_image_src_from_buffer(img_buf.getvalue(), imgtype=format)
return icon
def get_font(font_family, point_size):
font = '%s %d' % (font_family, point_size)
return font
#END
|