/usr/lib/python3/dist-packages/aiohttp/client_proto.py is in python3-aiohttp 3.0.1-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
import asyncio
import asyncio.streams
from contextlib import suppress
from .client_exceptions import (ClientOSError, ClientPayloadError,
ServerDisconnectedError)
from .http import HttpResponseParser
from .streams import EMPTY_PAYLOAD, DataQueue
class ResponseHandler(DataQueue, asyncio.streams.FlowControlMixin):
    """Helper class to adapt between Protocol and StreamReader.

    Bytes arriving through ``data_received`` take one of three routes:

    * a custom payload parser installed with ``set_parser`` consumes them;
    * the HTTP response parser created by ``set_response_params`` turns them
      into ``(message, payload)`` pairs that are pushed into the queue with
      ``feed_data``;
    * when neither parser can take them yet, they are accumulated in
      ``self._tail`` and replayed once a parser is installed.
    """

    def __init__(self, *, loop=None):
        asyncio.streams.FlowControlMixin.__init__(self, loop=loop)
        DataQueue.__init__(self, loop=loop)

        # Assigned in connection_made(); reset to None by close() and
        # connection_lost().
        self.transport = None
        self._should_close = False

        # Most recent parsed message and its payload.
        self._message = None
        self._payload = None
        self._skip_payload = False
        # Custom payload parser installed via set_parser(), if any.
        self._payload_parser = None
        self._reading_paused = False

        self._timer = None

        # Bytes received while no parser could consume them.
        self._tail = b''
        self._upgraded = False
        # HTTP response parser, created in set_response_params().
        self._parser = None

    @property
    def upgraded(self):
        """Upgrade flag reported by the last HTTP parser ``feed_data`` call."""
        return self._upgraded

    @property
    def should_close(self):
        """Return ``True`` if any recorded state marks the connection to close.

        That is the case when the current payload has not reached EOF, the
        connection was upgraded, the close flag was set, an exception is
        stored, a custom payload parser is still installed, or unconsumed
        items/bytes remain in the queue or in ``self._tail``.
        """
        payload = self._payload
        if (payload is not None and not payload.is_eof()) or self._upgraded:
            return True

        # ``_upgraded`` is known to be false here.  Coerce to bool so the
        # property never leaks the int from len() or the raw tail bytes.
        return bool(self._should_close or
                    self.exception() is not None or
                    self._payload_parser is not None or
                    len(self) or
                    self._tail)

    def close(self):
        """Close the transport, if any, and return it (``None`` otherwise)."""
        transport = self.transport
        if transport is not None:
            transport.close()
            self.transport = None
            self._payload = None
        return transport

    def is_connected(self):
        """Return ``True`` while a transport is attached."""
        return self.transport is not None

    def connection_made(self, transport):
        """Remember the transport handed over when the connection is made."""
        self.transport = transport

    def connection_lost(self, exc):
        """Flush the parsers, record the failure and reset connection state.

        ``exc`` is the error that caused the loss, or ``None``.  If the queue
        has not reached EOF, an ``OSError`` is re-wrapped as
        ``ClientOSError`` and a missing error becomes
        ``ServerDisconnectedError`` before being stored via
        ``set_exception``.
        """
        if self._payload_parser is not None:
            with suppress(Exception):
                self._payload_parser.feed_eof()

        try:
            # ``self._parser`` may still be None here; the resulting
            # AttributeError is deliberately handled by the same branch as
            # a parser failure.
            uncompleted = self._parser.feed_eof()
        except Exception:
            uncompleted = None
            if self._payload is not None:
                self._payload.set_exception(
                    ClientPayloadError('Response payload is not completed'))

        if not self.is_eof():
            if isinstance(exc, OSError):
                exc = ClientOSError(*exc.args)
            if exc is None:
                exc = ServerDisconnectedError(uncompleted)
            # assigns self._should_close to True as side effect,
            # we do it anyway below
            self.set_exception(exc)

        self.transport = None
        self._should_close = True
        self._parser = None
        self._message = None
        self._payload = None
        self._payload_parser = None
        self._reading_paused = False

        super().connection_lost(exc)

    def eof_received(self):
        """Do nothing on EOF from the peer.

        Returns ``None``; under the asyncio protocol contract a false value
        lets the transport close itself.
        """
        pass

    def pause_reading(self):
        """Ask the transport to stop delivering data (best effort)."""
        if not self._reading_paused:
            # AttributeError covers ``self.transport`` being None; the
            # other errors are presumably raised by transports that cannot
            # pause -- either way the flag is still flipped.
            with suppress(AttributeError, NotImplementedError, RuntimeError):
                self.transport.pause_reading()
            self._reading_paused = True

    def resume_reading(self):
        """Ask the transport to deliver data again (best effort)."""
        if self._reading_paused:
            # Same tolerated errors as in pause_reading().
            with suppress(AttributeError, NotImplementedError, RuntimeError):
                self.transport.resume_reading()
            self._reading_paused = False

    def set_exception(self, exc):
        """Store ``exc`` on the queue and mark the connection to be closed."""
        self._should_close = True
        super().set_exception(exc)

    def set_parser(self, parser, payload):
        """Install a custom payload parser and replay any buffered bytes."""
        self._payload = payload
        self._payload_parser = parser

        if self._tail:
            data, self._tail = self._tail, b''
            self.data_received(data)

    def set_response_params(self, *, timer=None,
                            skip_payload=False,
                            read_until_eof=False,
                            auto_decompress=True):
        """Create the HTTP response parser and replay any buffered bytes.

        ``skip_payload`` makes ``data_received`` queue ``EMPTY_PAYLOAD``
        instead of the parsed payload.  The remaining keyword arguments are
        forwarded unchanged to ``HttpResponseParser``.
        """
        self._skip_payload = skip_payload
        self._parser = HttpResponseParser(
            self, self._loop, timer=timer,
            payload_exception=ClientPayloadError,
            read_until_eof=read_until_eof,
            auto_decompress=auto_decompress)

        if self._tail:
            data, self._tail = self._tail, b''
            self.data_received(data)

    def data_received(self, data):
        """Route incoming bytes to the active parser or buffer them."""
        if not data:
            return

        # custom payload parser
        if self._payload_parser is not None:
            eof, tail = self._payload_parser.feed_data(data)
            if eof:
                self._payload = None
                self._payload_parser = None

                if tail:
                    # Leftover bytes go through the normal dispatch again.
                    self.data_received(tail)
            return

        if self._upgraded or self._parser is None:
            # i.e. websocket connection, websocket parser is not set yet
            self._tail += data
            return

        # parse http messages
        try:
            messages, upgraded, tail = self._parser.feed_data(data)
        except BaseException as exc:
            # close() and connection_lost() reset the transport to None;
            # guard so the parser error is recorded instead of being
            # replaced by an AttributeError.
            if self.transport is not None:
                self.transport.close()
            # should_close is True after the call
            self.set_exception(exc)
            return

        self._upgraded = upgraded

        for message, payload in messages:
            if message.should_close:
                self._should_close = True

            self._message = message
            self._payload = payload

            if self._skip_payload or message.code in (204, 304):
                self.feed_data((message, EMPTY_PAYLOAD), 0)
            else:
                self.feed_data((message, payload), 0)

        if tail:
            if upgraded:
                # After an upgrade the remainder is re-dispatched, which
                # buffers it until set_parser() installs a parser.
                self.data_received(tail)
            else:
                self._tail = tail
|