This file is indexed.

/usr/lib/python2.7/dist-packages/nevow/test/test_compression.py is in python-nevow 0.14.2-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
"""
Tests for on-the-fly content compression encoding.
"""
from StringIO import StringIO
from gzip import GzipFile

from zope.interface import implements

from twisted.trial.unittest import TestCase
from twisted.internet.defer import succeed

from nevow.inevow import IResource, IRequest
from nevow.testutil import FakeRequest
from nevow.context import RequestContext
from nevow.appserver import errorMarker
from nevow.rend import NotFound
from nevow.compression import CompressingResourceWrapper, CompressingRequestWrapper
from nevow.compression import parseAcceptEncoding, _ProxyDescriptor



class HeaderTests(TestCase):
    """
    Checks for the Accept-Encoding header parser.
    """
    def test_parseAcceptEncoding(self):
        """
        Each sample Accept-Encoding field value must parse to the expected
        dictionary.
        """
        expectations = (
            ('compress, gzip',
             {'compress': 1.0, 'gzip': 1.0, 'identity': 0.0001}),
            ('',
             {'identity': 0.0001}),
            ('*',
             {'*': 1}),
            ('compress;q=0.5, gzip;q=1.0',
             {'compress': 0.5, 'gzip': 1.0, 'identity': 0.0001}),
            ('gzip;q=1.0, identity;q=0.5, *;q=0',
             {'gzip': 1.0, 'identity': 0.5, '*': 0}),
        )

        for header, expected in expectations:
            parsed = parseAcceptEncoding(header)
            self.assertEqual(parsed, expected, msg='error parsing %r' % header)



class _Dummy(object):
    """
    Featureless placeholder whose only purpose is to provide an instance
    dict that arbitrary attributes can be set on.
    """



class _Wrapper(object):
    """
    Minimal wrapper used by the descriptor tests: C{x} and C{y} are declared
    as L{_ProxyDescriptor}s and the wrapped object is kept in C{underlying}.
    """
    x = _ProxyDescriptor('x')
    y = _ProxyDescriptor('y')

    def __init__(self, underlying):
        """
        @param underlying: The object to store as C{self.underlying}.
        """
        self.underlying = underlying



class ProxyDescriptorTests(TestCase):
    """
    Tests covering get, set and delete through L{_ProxyDescriptor}.
    """
    def setUp(self):
        """
        Build a L{_Dummy} carrying C{x} and C{y} attributes and wrap it in a
        L{_Wrapper}.
        """
        target = _Dummy()
        target.x = object()
        target.y = object()
        self.dummy = target
        self.wrapper = _Wrapper(target)


    def test_proxyGet(self):
        """
        Reading a proxied attribute yields the underlying object's attribute,
        including after the underlying attribute has been rebound.
        """
        wrapper, dummy = self.wrapper, self.dummy
        self.assertIdentical(wrapper.x, dummy.x)
        self.assertIdentical(wrapper.y, dummy.y)
        dummy.x = object()
        self.assertIdentical(wrapper.x, dummy.x)


    def test_proxyClassGet(self):
        """
        Looking a proxied attribute up on the class itself hands back the
        descriptor object stored in the class dict.
        """
        descriptor = vars(_Wrapper)['x']
        self.assertIdentical(_Wrapper.x, descriptor)


    def test_proxySet(self):
        """
        Assigning to a proxied attribute assigns to the underlying object.
        """
        replacement = object()
        self.wrapper.x = replacement
        self.assertIdentical(self.dummy.x, self.wrapper.x)
        self.wrapper.y = 5
        self.assertEqual(self.dummy.y, 5)


    def test_proxyDelete(self):
        """
        Deleting a proxied attribute removes it from the underlying object.
        """
        self.assertTrue(hasattr(self.dummy, 'x'))
        delattr(self.wrapper, 'x')
        self.assertFalse(hasattr(self.dummy, 'x'))



class RequestWrapperTests(TestCase):
    """
    Tests for the behaviour of L{CompressingRequestWrapper}.
    """
    def setUp(self):
        """
        Create a L{FakeRequest} and a L{CompressingRequestWrapper} around it.
        """
        fake = FakeRequest()
        self.request = fake
        self.wrapper = CompressingRequestWrapper(fake)


    def test_attributes(self):
        """
        Reading a forwarded attribute from the wrapper gives the very same
        object as reading it from the underlying request.
        """
        for attrName in ('method', 'uri', 'path', 'args', 'requestHeaders'):
            viaWrapper = getattr(self.wrapper, attrName)
            viaRequest = getattr(self.request, attrName)
            self.assertIdentical(viaWrapper, viaRequest)


    def test_missingAttributes(self):
        """
        Names outside the proxied interfaces raise C{AttributeError} on the
        wrapper, even when the underlying request has such an attribute.
        """
        self.assertRaises(AttributeError, getattr, self.wrapper, 'doesntexist')
        # Present on the request, but still must not leak through the wrapper.
        setattr(self.request, '_privateTestAttribute', 42)
        self.assertRaises(AttributeError, getattr, self.wrapper, '_privateTestAttribute')


    def test_contentLength(self):
        """
        No Content-Length response header survives when compression is in
        use, whether it is set through the wrapper or was already set on the
        request before wrapping.
        """
        def hasContentLength():
            return self.request.responseHeaders.hasHeader('content-length')

        self.assertFalse(hasContentLength())
        self.wrapper.setHeader('content-length', 1234)
        self.assertFalse(hasContentLength())

        # Set the header directly on the request, then wrap it afresh.
        self.request.setHeader('content-length', 1234)
        self.wrapper = CompressingRequestWrapper(self.request)
        self.assertFalse(hasContentLength())


    def test_responseHeaders(self):
        """
        Wrapping the request leaves a Content-Encoding of C{gzip} in the
        response headers.
        """
        encodings = self.request.responseHeaders.getRawHeaders('content-encoding')
        self.assertEqual(encodings, ['gzip'])


    def test_lazySetup(self):
        """
        Nothing, not even the gzip prelude, reaches the underlying request
        until real data is written.

        This is necessary to avoid terminating the header too quickly.
        """
        self.assertEqual(self.request.accumulator, '')
        self.wrapper.write('foo')
        self.assertNotEqual(self.request.accumulator, '')


    def _ungzip(self, data):
        """
        Decompress a gzip-encoded string and return the decoded contents.
        """
        reader = GzipFile(mode='rb', fileobj=StringIO(data))
        return reader.read()


    def test_encoding(self):
        """
        Data written through the wrapper arrives at the underlying request
        gzip-compressed and decodes back to what was written.
        """
        for chunk in ('foo', 'bar'):
            self.wrapper.write(chunk)
        self.wrapper.finishRequest(True)
        decoded = self._ungzip(self.request.accumulator)
        self.assertEqual(decoded, 'foobar')


    def test_finish(self):
        """
        C{finishRequest()} on the wrapper marks the underlying request as
        finished.
        """
        self.wrapper.finishRequest(True)
        self.assertTrue(self.request.finished)



class TestResource(object):
    """
    L{IResource} implementation for testing.

    @ivar lastRequest: The last request we were rendered with.
    @type lastRequest: L{IRequest} or C{None}
    @ivar segments: The segments we were constructed with.
    @type segments: C{list}
    @ivar html: The data to return from C{renderHTTP}.
    """
    implements(IResource)

    lastRequest = None

    def __init__(self, segments=None, html='o hi'):
        """
        @param segments: The segments to remember; a new empty list is used
            when omitted or C{None}.
        @param html: The value C{renderHTTP} will return.
        """
        # A literal [] default would be created once and then shared by every
        # instance built without explicit segments, so build a fresh list.
        if segments is None:
            segments = []
        self.segments = segments
        self.html = html


    def locateChild(self, ctx, segments):
        """
        Construct a new resource of our type.

        We hand out child resources for any segments, as this is the simplest
        thing to do.

        @return: A 2-tuple of the new resource, constructed with C{segments},
            and an empty list.
        """
        return type(self)(segments), []


    def renderHTTP(self, ctx):
        """
        Stash the request for later inspection.

        @return: The C{html} value this resource was constructed with.
        """
        self.lastRequest = IRequest(ctx)
        return self.html



class TestChildlessResource(object):
    """
    An L{IResource} implementation that never has any children.
    """
    implements(IResource)

    def locateChild(self, ctx, segments):
        """
        Answer every child lookup with C{NotFound}, whatever the segments.
        """
        return NotFound



class TestDeferredResource(object):
    """
    An L{IResource} implementation whose children arrive inside a deferred.
    """
    implements(IResource)

    def locateChild(self, ctx, segments):
        """
        Hand out a new resource of our own type for any segments.

        The child is not returned directly: it is wrapped in an
        already-fired deferred, paired with an empty list.
        """
        child = type(self)()
        return succeed(child), []



class TestResourceWrapper(CompressingResourceWrapper):
    """
    Trivial L{CompressingResourceWrapper} subclass that adds nothing; it
    exists only so the tests have a distinct wrapper type.
    """


class TestBrokenResource(object):
    """
    An L{IResource} implementation whose C{locateChild} result is garbage.
    """
    implements(IResource)

    def locateChild(self, ctx, segments):
        """
        Return a bare integer instead of a proper C{locateChild} result.
        """
        bogus = 42
        return bogus



class ResourceWrapper(TestCase):
    """
    Tests for L{CompressingResourceWrapper}.

    @ivar resource: The underlying resource for testing.
    @type resource: L{TestResource}
    @ivar wrapped: The wrapped resource.
    @type wrapped: L{CompressingResourceWrapper}
    @ivar request: A fake request.
    @type request: L{FakeRequest}
    @ivar ctx: A dummy context.
    @type ctx: L{RequestContext}
    """
    def setUp(self):
        """
        Create the resource, its compressing wrapper, a fake request and a
        context tagged with that request.
        """
        self.resource = TestResource()
        self.wrapped = CompressingResourceWrapper(self.resource)
        self.request = FakeRequest()
        self.ctx = RequestContext(tag=self.request)


    def test_rendering(self):
        """
        Rendering a wrapped resource renders the underlying resource with a
        wrapped request if compression is available.
        """
        self.wrapped.canCompress = lambda req: True
        self.wrapped.renderHTTP(self.ctx)
        self.assertEqual(type(self.resource.lastRequest), CompressingRequestWrapper)
        self.assertIdentical(self.resource.lastRequest.underlying, self.request)


    def test_renderingUnwrapped(self):
        """
        Rendering a wrapped resource renders the underlying resource with an
        unwrapped request if compression is not available.
        """
        self.wrapped.canCompress = lambda req: False
        self.wrapped.renderHTTP(self.ctx)
        self.assertIdentical(self.resource.lastRequest, self.request)


    def test_awfulHack(self):
        """
        Rendering a wrapped resource should finish off the request, and return
        a special sentinel value to prevent the Site machinery from trying to
        finish it again.
        """
        def _cbCheckReturn(result):
            self.rendered = True
            self.assertIdentical(result, errorMarker)
            self.assertTrue(self.request.finished)

        self.rendered = False
        self.wrapped.canCompress = lambda req: True
        self.wrapped.renderHTTP(self.ctx).addCallback(_cbCheckReturn)
        # The callback should run synchronously
        self.assertTrue(self.rendered)


    # This test used to be a second method named test_rendering; that
    # definition replaced the first one in the class namespace, so the
    # wrapped-request test above was silently never run.
    def test_renderingNonString(self):
        """
        Returning something other than C{str} causes the value to be passed
        through.
        """
        def _cbResult(result):
            self.result = result

        marker = object()
        self.resource.html = marker
        self.wrapped.canCompress = lambda req: True
        self.wrapped.renderHTTP(self.ctx).addCallback(_cbResult)
        self.assertIdentical(marker, self.result)


    def _cbCheckChild(self, result):
        """
        Record that the child check callback ran by setting C{self.checked}.
        """
        self.checked = True


    def _locateChild(self, resource, segments):
        """
        Helper function for retrieving a child synchronously.

        @param resource: The resource whose C{locateChild} is called (with a
            C{None} context); it must return a deferred.
        @param segments: The segments to pass to C{locateChild}.
        @return: The result the deferred fired with.
        @raise: Whatever exception the deferred failed with, if it failed.
        """
        def _cbGotChild(result):
            self.gotChild = True
            self.result = result

        def _ebChild(f):
            self.gotChild = 'error'
            self.f = f

        self.gotChild = False
        resource.locateChild(None, segments).addCallbacks(_cbGotChild, _ebChild)
        # Either callback leaves a truthy value, so this only fails when the
        # deferred has not fired synchronously.
        self.assertTrue(self.gotChild)
        if self.gotChild == 'error':
            self.f.raiseException()
        return self.result


    def test_wrapChildren(self):
        """
        Any children of the wrapped resource should also be wrapped.
        """
        self.checked = False
        child, segments = self._locateChild(self.wrapped, ['some', 'child', 'segments'])
        self.assertIdentical(type(child), type(self.wrapped))
        self.assertEqual(child.underlying.segments, ['some', 'child', 'segments'])


    def test_wrapChildrenSubclass(self):
        """
        The request wrapper should wrap children with the same type.
        """
        self.wrapped = TestResourceWrapper(self.resource)
        self.test_wrapChildren()


    def test_childNotFound(self):
        """
        The request wrapper should pass C{NotFound} through.
        """
        wrapped = CompressingResourceWrapper(TestChildlessResource())
        result = self._locateChild(wrapped, ['foo'])
        self.assertEqual(result, NotFound)


    def test_deferredChild(self):
        """
        The wrapper should deal with a resource wrapped in a deferred returned
        from locateChild.
        """
        wrapped = CompressingResourceWrapper(TestDeferredResource())
        child, segments = self._locateChild(wrapped, ['foo'])
        self.assertEqual(type(child.underlying), TestDeferredResource)
        self.assertEqual(segments, [])


    def test_brokenChild(self):
        """
        C{ValueError} should be raised if the underlying C{locateChild} returns
        something bogus.
        """
        wrapped = CompressingResourceWrapper(TestBrokenResource())
        self.assertRaises(ValueError, self._locateChild, wrapped, ['foo'])


    def test_negotiation(self):
        """
        Request wrapping should only occur when the client has indicated they
        can accept compression.
        """
        # No Accept-Encoding header has been set on the request yet.
        self.assertFalse(self.wrapped.canCompress(self.request))

        # Encodings are offered, but gzip is not among them.
        self.request.requestHeaders.setRawHeaders(
            'accept-encoding', ['foo;q=1.0, bar;q=0.5, baz'])
        self.assertFalse(self.wrapped.canCompress(self.request))

        self.request.requestHeaders.setRawHeaders('accept-encoding', ['gzip'])
        self.assertTrue(self.wrapped.canCompress(self.request))

        self.request.requestHeaders.setRawHeaders(
            'accept-encoding', ['gzip;q=0.5'])
        self.assertTrue(self.wrapped.canCompress(self.request))

        # gzip is listed, but with a q value of 0.
        self.request.requestHeaders.setRawHeaders(
            'accept-encoding', ['gzip;q=0'])
        self.assertFalse(self.wrapped.canCompress(self.request))