/usr/lib/python3/dist-packages/klaus/utils.py is in python3-klaus 1.2.1-3.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# encoding: utf-8
import os
import re
import time
import datetime
import mimetypes
import locale
import warnings
import subprocess
import six
try:
import chardet
except ImportError:
chardet = None
from werkzeug.contrib.fixers import ProxyFix as WerkzeugProxyFix
from humanize import naturaltime
class ProxyFix(WerkzeugProxyFix):
    """HTTP (reverse) proxy support middleware for a WSGI application (klaus).

    Applying it makes it possible to:

    * Mount the app under a sub-URL (http://example.com/git/...)
    * Use a different HTTP scheme (HTTP vs. HTTPS)
    * Make the app appear under a different domain altogether

    `REMOTE_ADDR`, `HTTP_HOST` and `wsgi.url_scheme` are set from the
    `X-Forwarded-*` headers; `SCRIPT_NAME` is set from the `X-Script-Name`
    header.

    For instance if you have klaus mounted under /git/ and your site uses SSL
    (but your proxy doesn't), make the proxy pass ::

        X-Script-Name = '/git'
        X-Forwarded-Proto = 'https'
        ...

    If you have more than one proxy server in front of your app, set
    `num_proxies` accordingly.

    Do not use this middleware in non-proxy setups for security reasons.

    The original values of `REMOTE_ADDR` and `HTTP_HOST` are stored in
    the WSGI environment as `werkzeug.proxy_fix.orig_remote_addr` and
    `werkzeug.proxy_fix.orig_http_host`.

    :param app: the WSGI application
    :param num_proxies: the number of proxy servers in front of the app.
    """

    def __call__(self, environ, start_response):
        """Apply `X-Script-Name` to `SCRIPT_NAME`, then run Werkzeug's fixer."""
        header_value = environ.get('HTTP_X_SCRIPT_NAME')
        if header_value is not None:
            trimmed = header_value.rstrip('/')
            if trimmed != header_value:
                # A trailing slash is tolerated (it gets stripped), but the
                # operator is told about the misconfiguration.
                warnings.warn(
                    "'X-Script-Name' header should not end in '/' (found: %r). "
                    "Please fix your proxy's configuration." % header_value)
            environ['SCRIPT_NAME'] = trimmed
        return super(ProxyFix, self).__call__(environ, start_response)
class SubUri(object):
    """Deprecated WSGI middleware; use `ProxyFix` instead.

    Tweaks the WSGI environ so that it's possible to serve the wrapped app
    (klaus) under a sub-URL and/or to use a different HTTP scheme
    (http:// vs. https://) for proxy communication.

    This is done by making your proxy pass appropriate HTTP_X_SCRIPT_NAME and
    HTTP_X_SCHEME headers.

    For instance if you have klaus mounted under /git/ and your site uses SSL
    (but your proxy doesn't), make it pass ::

        X-Script-Name = '/git'
        X-Scheme = 'https'

    Snippet stolen from http://flask.pocoo.org/snippets/35/

    :param app: the WSGI application to wrap
    """

    def __init__(self, app):
        warnings.warn(
            "'klaus.utils.SubUri' is deprecated and will be removed. "
            "Please upgrade your code to use 'klaus.utils.ProxyFix' instead.",
            DeprecationWarning,
            # Attribute the warning to the code instantiating the middleware.
            stacklevel=2
        )
        self.app = app

    def __call__(self, environ, start_response):
        """Rewrite `SCRIPT_NAME`, `PATH_INFO` and `wsgi.url_scheme`, then call the app."""
        raw_script_name = environ.get('HTTP_X_SCRIPT_NAME', '')
        if raw_script_name:
            script_name = raw_script_name.rstrip('/')
            environ['SCRIPT_NAME'] = script_name

            # PATH_INFO may legitimately be missing from the environ when it
            # would be empty, so don't index it directly.
            path_info = environ.get('PATH_INFO', '')
            # Strip the *normalised* prefix: stripping the raw header value
            # would also eat the leading '/' of the remaining path when the
            # header ends in a slash ('/git/' + '/git/foo' -> 'foo').
            if script_name and path_info.startswith(script_name):
                environ['PATH_INFO'] = path_info[len(script_name):]

        if 'HTTP_X_SCHEME' in environ:
            environ['wsgi.url_scheme'] = environ['HTTP_X_SCHEME']

        return self.app(environ, start_response)
def timesince(when, now=time.time):
    """Describe, in human readable form, how long ago `when` was.

    `now` is a callable returning the current time; the difference
    `now() - when` is handed to `naturaltime` for formatting.
    """
    elapsed = now() - when
    return naturaltime(elapsed)
def formattimestamp(timestamp):
    """Format `timestamp` (seconds since the epoch) like 'Jul 14, 2017 02:40:00'.

    The conversion goes through `datetime.datetime.fromtimestamp` without a
    timezone argument.
    """
    moment = datetime.datetime.fromtimestamp(timestamp)
    return moment.strftime('%b %d, %Y %H:%M:%S')
def guess_is_binary(dulwich_blob):
    """Return True if any chunk in `dulwich_blob.chunked` contains a NUL byte."""
    for piece in dulwich_blob.chunked:
        if b'\0' in piece:
            return True
    return False
def guess_is_image(filename):
    """Return True if the MIME type guessed from `filename` is an image type.

    Files whose type cannot be guessed are reported as non-images.
    """
    guessed_type = mimetypes.guess_type(filename)[0]
    return guessed_type is not None and guessed_type.startswith('image/')
def encode_for_git(s):
    """Return `s` encoded as UTF-8."""
    # XXX UTF-8 is assumed unconditionally here
    encoded = s.encode('utf8')
    return encoded
def decode_from_git(b):
    """Return `b` decoded as UTF-8."""
    # XXX UTF-8 is assumed unconditionally here
    decoded = b.decode('utf8')
    return decoded
def force_unicode(s):
    """Do all kinds of magic to turn `s` into unicode.

    Text input is returned unchanged. Byte input is decoded by trying, in
    order: UTF-8, the locale's preferred encoding, and (if the optional
    `chardet` module is available) whatever encoding chardet detects.

    :raises UnicodeDecodeError: the error from the UTF-8 attempt, if none of
        the encodings worked.
    """
    # It's already unicode, don't do anything:
    if isinstance(s, six.text_type):
        return s

    # Try some default encodings:
    try:
        return s.decode('utf-8')
    except UnicodeDecodeError as exc:
        # Python 3 unbinds `exc` as soon as this except block ends, so keep a
        # separate reference for the final `raise` below.
        utf8_error = exc

    try:
        return s.decode(locale.getpreferredencoding())
    except UnicodeDecodeError:
        pass

    if chardet is not None:
        # Try chardet, if available
        encoding = chardet.detect(s)['encoding']
        if encoding is not None:
            try:
                return s.decode(encoding)
            except (UnicodeDecodeError, LookupError):
                # chardet guessed wrong, or named a codec Python doesn't know.
                pass

    raise utf8_error  # Give up.
def extract_author_name(email):
    """Return the name part of a "Name <address>" string.

    >>> extract_author_name("John <john@example.com>")
    'John'

    Input that does not end in an ``<...>`` part is returned unchanged.

    >>> extract_author_name("noname@example.com")
    'noname@example.com'
    """
    found = re.match('^(.*?)<.*?>$', email)
    if found is None:
        return email
    name_part = found.group(1)
    return name_part.strip()
def shorten_sha1(sha1):
    """Abbreviate `sha1` to its first 7 characters if it looks like a hash.

    "Looks like a hash" means it starts with at least 20 lowercase letters
    or digits; anything else is returned unchanged.
    """
    looks_like_hash = re.match(r'[a-z\d]{20,40}', sha1) is not None
    return sha1[:7] if looks_like_hash else sha1
def parent_directory(path):
    """Return the head of `path`, i.e. everything before its last component."""
    head, _tail = os.path.split(path)
    return head
def subpaths(path):
    """Yield a `(last part, subpath)` pair for each '/'-separated prefix of `path`.

    >>> list(subpaths("foo/bar/spam"))
    [('foo', 'foo'), ('bar', 'foo/bar'), ('spam', 'foo/bar/spam')]
    """
    components = path.split('/')
    for end, component in enumerate(components, 1):
        yield component, '/'.join(components[:end])
def shorten_message(msg):
    """Return the first line of `msg` (everything before the first newline)."""
    first_line, _sep, _rest = msg.partition('\n')
    return first_line
def replace_dupes(ls, replacement):
    """Overwrite, in place, each item of `ls` that repeats the item before it.

    A run of equal items keeps its first element; the following ones become
    `replacement`. Nothing is returned.

    >>> ls = [1, 2, 2, 3, 2, 2, 2]
    >>> replace_dupes(ls, 'x')
    >>> ls
    [1, 2, 'x', 3, 2, 'x', 'x']
    """
    # Fresh sentinel, so the very first item never counts as a repeat.
    previous = object()
    for index, item in enumerate(ls):
        if not (previous == item):
            previous = item
            continue
        ls[index] = replacement
def guess_git_revision():
    """Try to guess whether this instance of klaus is run directly from a klaus
    git checkout. If it is, guess and return the currently checked-out commit
    SHA. If it's not (installed using pip, setup.py or the like), return None.

    This is used to display the "powered by klaus $VERSION" footer on each page,
    $VERSION being either the SHA guessed by this function or the latest release number.

    :returns: the abbreviated commit hash as text, or None if it could not be
        determined.
    """
    git_dir = os.path.join(os.path.dirname(__file__), '..', '.git')
    try:
        output = subprocess.check_output(
            ['git', 'log', '--format=%h', '-n', '1'],
            cwd=git_dir
        )
    except (OSError, subprocess.CalledProcessError):
        # OSError: either the git executable couldn't be found in the OS's PATH
        # or no ".git" directory exists, i.e. this is no "bleeding-edge" installation.
        # CalledProcessError: git ran but exited non-zero; this is only a
        # best-effort guess, so treat that as "unknown" rather than crashing.
        return None
    return force_unicode(output.strip())
def sanitize_branch_name(name, chars='./', repl='-'):
    """Return `name` with every occurrence of each item of `chars` replaced by `repl`.

    The replacements are applied one after another, in the order `chars`
    yields them.
    """
    sanitized = name
    for unwanted in chars:
        sanitized = sanitized.replace(unwanted, repl)
    return sanitized
def escape_html(s):
    """Return the byte string `s` with `&`, `<`, `>` and `"` replaced by HTML entities.

    The ampersand is handled first so that the entities introduced by the
    later replacements are not escaped a second time.
    """
    return s.replace(b'&', b'&amp;').replace(b'<', b'&lt;') \
            .replace(b'>', b'&gt;').replace(b'"', b'&quot;')
|