/usr/share/pyshared/paste/gzipper.py is in python-paste 1.7.5.1-4.1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# (c) 2005 Ian Bicking and contributors; written for Paste (http://pythonpaste.org)
# Licensed under the MIT license: http://www.opensource.org/licenses/mit-license.php
# (c) 2005 Ian Bicking and contributors; written for Paste (http://pythonpaste.org)
# Licensed under the MIT license: http://www.opensource.org/licenses/mit-license.php
"""
WSGI middleware
Gzip-encodes the response.
"""
import gzip
from paste.response import header_value, remove_header
from paste.httpheaders import CONTENT_LENGTH
try:
from cStringIO import StringIO
except ImportError:
from StringIO import StringIO
class GzipOutput(object):
    """Empty placeholder class; it defines no attributes or methods."""
class middleware(object):
    """
    WSGI middleware that gzip-encodes the wrapped application's response
    when the client's ``Accept-Encoding`` header allows gzip.
    """

    def __init__(self, application, compress_level=6):
        """
        ``application`` is the WSGI application to wrap; ``compress_level``
        is coerced with ``int()`` so string values from config files work.
        """
        self.application = application
        self.compress_level = int(compress_level)

    def __call__(self, environ, start_response):
        """
        Call the wrapped application, returning its response unchanged
        when the client does not accept gzip, or the buffered (and
        possibly compressed) body otherwise.
        """
        if not self._accepts_gzip(environ.get('HTTP_ACCEPT_ENCODING', '')):
            # nothing for us to do, so this middleware will
            # be a no-op:
            return self.application(environ, start_response)
        response = GzipResponse(start_response, self.compress_level)
        app_iter = self.application(environ,
                                    response.gzip_start_response)
        if app_iter is not None:
            response.finish_response(app_iter)
        return response.write()

    @staticmethod
    def _accepts_gzip(accept_encoding):
        """
        Return True if the ``Accept-Encoding`` header value lists ``gzip``
        (or its alias ``x-gzip``) with a non-zero quality value.

        A plain substring test would treat ``gzip;q=0`` -- an explicit
        refusal of gzip -- as acceptance, so each entry is parsed instead.
        Coding names are compared case-insensitively.
        """
        for entry in accept_encoding.split(','):
            parts = entry.split(';')
            if parts[0].strip().lower() not in ('gzip', 'x-gzip'):
                continue
            quality = 1.0
            for param in parts[1:]:
                name, _, value = param.partition('=')
                if name.strip().lower() != 'q':
                    continue
                try:
                    quality = float(value.strip())
                except ValueError:
                    # Malformed q-value: stay lenient and treat the
                    # coding as acceptable.
                    quality = 1.0
            if quality > 0:
                return True
        return False
class GzipResponse(object):
    """
    Buffers a single WSGI response in memory, gzip-compressing the body
    when the response headers allow it, and then hands the final status
    and headers to the real ``start_response``.
    """

    def __init__(self, start_response, compress_level):
        self.start_response = start_response
        self.compress_level = compress_level
        self.buffer = StringIO()
        self.compressible = False
        self.content_length = None
        # exc_info passed by the application to gzip_start_response();
        # forwarded to the real start_response by finish_response().
        self.exc_info = None
        # GzipFile wrapping self.buffer.  Opened lazily on the first body
        # write, because an application written as a generator only calls
        # start_response (which decides compressibility) once iteration
        # has begun.
        self._gzip_file = None

    def gzip_start_response(self, status, headers, exc_info=None):
        """
        Stand-in for ``start_response`` given to the wrapped application.

        Records ``status``, ``headers`` and ``exc_info``, decides whether
        the body will be compressed, and returns the write callable.
        ``headers`` is modified in place: ``content-length`` is removed
        and ``content-encoding: gzip`` is appended when compressing.
        """
        if self._gzip_file is not None or self.buffer.tell():
            # start_response was called again after body data had been
            # buffered.  Nothing has reached the client yet, so the new
            # call replaces the response: drop the stale body rather than
            # mixing it (possibly under a different encoding) with the
            # new one.
            self.buffer = StringIO()
            self._gzip_file = None
        ct = header_value(headers, 'content-type')
        ce = header_value(headers, 'content-encoding')
        # Only text/* and application/* bodies are compressed, and never
        # ones that are already zipped or already carry a content-encoding.
        self.compressible = bool(
            ct
            and ct.startswith(('text/', 'application/'))
            and 'zip' not in ct
            and not ce)
        if self.compressible:
            headers.append(('content-encoding', 'gzip'))
        remove_header(headers, 'content-length')
        self.headers = headers
        self.status = status
        self.exc_info = exc_info
        # Route write() data through the same (possibly gzip) stream as
        # the iterable's data, so the body matches the advertised
        # content-encoding.
        return self._write_body

    def _output(self):
        """Return the stream body data goes to, opening gzip on demand."""
        if not self.compressible:
            return self.buffer
        if self._gzip_file is None:
            self._gzip_file = gzip.GzipFile(
                mode='wb', compresslevel=self.compress_level,
                fileobj=self.buffer)
        return self._gzip_file

    def _write_body(self, data):
        """Append one chunk of body data to the buffered response."""
        if data:
            self._output().write(data)

    def write(self):
        """
        Return the buffered body as a one-element list and close the
        buffer; the response object cannot be used afterwards.
        """
        out = self.buffer
        s = out.getvalue()
        out.close()
        return [s]

    def finish_response(self, app_iter):
        """
        Consume ``app_iter`` into the buffer, close it if it has a
        ``close`` method, then call the real ``start_response`` with the
        recorded status and headers, updated via ``CONTENT_LENGTH`` with
        the buffered body size.
        """
        try:
            for s in app_iter:
                self._write_body(s)
        finally:
            if hasattr(app_iter, 'close'):
                app_iter.close()
        if self.compressible:
            # Closing the GzipFile flushes the trailer into self.buffer.
            # _output() opens the stream first if nothing was written, so
            # an empty body is still a valid gzip stream.
            self._output().close()
        content_length = self.buffer.tell()
        self.content_length = content_length
        CONTENT_LENGTH.update(self.headers, content_length)
        # Drop our reference to exc_info before forwarding it, so the
        # traceback is not kept alive by this object.
        exc_info, self.exc_info = self.exc_info, None
        if exc_info is not None:
            self.start_response(self.status, self.headers, exc_info)
        else:
            self.start_response(self.status, self.headers)
def filter_factory(application, **conf):
    """
    Deprecated factory: emits a ``DeprecationWarning`` and returns a
    function that wraps an application in ``middleware`` with default
    settings.  The arguments passed to the factory itself are not used.
    """
    import warnings

    message = 'This function is deprecated; use make_gzip_middleware instead'
    warnings.warn(message, DeprecationWarning, 2)

    def filter(application):
        return middleware(application)

    return filter
def make_gzip_middleware(app, global_conf, compress_level=6):
    """
    Build the gzip ``middleware`` around ``app``.

    Responses are gzipped when the browser supports it and the content
    is of type ``text/*`` or ``application/*``.  ``compress_level`` is
    converted with ``int()`` before being passed on; ``global_conf`` is
    accepted but not used here.
    """
    level = int(compress_level)
    return middleware(app, compress_level=level)
|