/usr/lib/python3/dist-packages/waitress/channel.py is in python3-waitress 0.8.10-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 | ##############################################################################
#
# Copyright (c) 2001, 2002 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
import asyncore
import socket
import threading
import time
import traceback
from waitress.buffers import (
OverflowableBuffer,
ReadOnlyFileBasedBuffer,
)
from waitress.parser import HTTPRequestParser
from waitress.task import (
ErrorTask,
WSGITask,
)
from waitress.utilities import (
logging_dispatcher,
InternalServerError,
)
class HTTPChannel(logging_dispatcher, object):
"""
Setting self.requests = [somerequest] prevents more requests from being
received until the out buffers have been flushed.
Setting self.requests = [] allows more requests to be received.
"""
task_class = WSGITask
error_task_class = ErrorTask
parser_class = HTTPRequestParser
request = None # A request parser instance
last_activity = 0 # Time of last activity
will_close = False # set to True to close the socket.
close_when_flushed = False # set to True to close the socket when flushed
requests = () # currently pending requests
sent_continue = False # used as a latch after sending 100 continue
force_flush = False # indicates a need to flush the outbuf
#
# ASYNCHRONOUS METHODS (including __init__)
#
def __init__(
self,
server,
sock,
addr,
adj,
map=None,
):
self.server = server
self.adj = adj
self.outbufs = [OverflowableBuffer(adj.outbuf_overflow)]
self.creation_time = self.last_activity = time.time()
# task_lock used to push/pop requests
self.task_lock = threading.Lock()
# outbuf_lock used to access any outbuf
self.outbuf_lock = threading.Lock()
asyncore.dispatcher.__init__(self, sock, map=map)
# Don't let asyncore.dispatcher throttle self.addr on us.
self.addr = addr
def any_outbuf_has_data(self):
for outbuf in self.outbufs:
if bool(outbuf):
return True
return False
def total_outbufs_len(self):
# genexpr == more funccalls
# use b.__len__ rather than len(b) FBO of not getting OverflowError
# on Python 2
return sum([b.__len__() for b in self.outbufs])
def writable(self):
# if there's data in the out buffer or we've been instructed to close
# the channel (possibly by our server maintenance logic), run
# handle_write
return self.any_outbuf_has_data() or self.will_close
def handle_write(self):
# Precondition: there's data in the out buffer to be sent, or
# there's a pending will_close request
if not self.connected:
# we dont want to close the channel twice
return
# try to flush any pending output
if not self.requests:
# 1. There are no running tasks, so we don't need to try to lock
# the outbuf before sending
# 2. The data in the out buffer should be sent as soon as possible
# because it's either data left over from task output
# or a 100 Continue line sent within "received".
flush = self._flush_some
elif self.force_flush:
# 1. There's a running task, so we need to try to lock
# the outbuf before sending
# 2. This is the last chunk sent by the Nth of M tasks in a
# sequence on this channel, so flush it regardless of whether
# it's >= self.adj.send_bytes. We need to do this now, or it
# won't get done.
flush = self._flush_some_if_lockable
self.force_flush = False
elif (self.total_outbufs_len() >= self.adj.send_bytes):
# 1. There's a running task, so we need to try to lock
# the outbuf before sending
# 2. Only try to send if the data in the out buffer is larger
# than self.adj_bytes to avoid TCP fragmentation
flush = self._flush_some_if_lockable
else:
# 1. There's not enough data in the out buffer to bother to send
# right now.
flush = None
if flush:
try:
flush()
except socket.error:
if self.adj.log_socket_errors:
self.logger.exception('Socket error')
self.will_close = True
except:
self.logger.exception('Unexpected exception when flushing')
self.will_close = True
if self.close_when_flushed and not self.any_outbuf_has_data():
self.close_when_flushed = False
self.will_close = True
if self.will_close:
self.handle_close()
def readable(self):
# We might want to create a new task. We can only do this if:
# 1. We're not already about to close the connection.
# 2. There's no already currently running task(s).
# 3. There's no data in the output buffer that needs to be sent
# before we potentially create a new task.
return not (self.will_close or self.requests or
self.any_outbuf_has_data())
def handle_read(self):
try:
data = self.recv(self.adj.recv_bytes)
except socket.error:
if self.adj.log_socket_errors:
self.logger.exception('Socket error')
self.handle_close()
return
if data:
self.last_activity = time.time()
self.received(data)
def received(self, data):
"""
Receives input asynchronously and assigns one or more requests to the
channel.
"""
# Preconditions: there's no task(s) already running
request = self.request
requests = []
if not data:
return False
while data:
if request is None:
request = self.parser_class(self.adj)
n = request.received(data)
if request.expect_continue and request.headers_finished:
# guaranteed by parser to be a 1.1 request
request.expect_continue = False
if not self.sent_continue:
# there's no current task, so we don't need to try to
# lock the outbuf to append to it.
self.outbufs[-1].append(b'HTTP/1.1 100 Continue\r\n\r\n')
self.sent_continue = True
self._flush_some()
request.completed = False
if request.completed:
# The request (with the body) is ready to use.
self.request = None
if not request.empty:
requests.append(request)
request = None
else:
self.request = request
if n >= len(data):
break
data = data[n:]
if requests:
self.requests = requests
self.server.add_task(self)
return True
def _flush_some_if_lockable(self):
# Since our task may be appending to the outbuf, we try to acquire
# the lock, but we don't block if we can't.
locked = self.outbuf_lock.acquire(False)
if locked:
try:
self._flush_some()
finally:
self.outbuf_lock.release()
def _flush_some(self):
# Send as much data as possible to our client
sent = 0
dobreak = False
while True:
outbuf = self.outbufs[0]
# use outbuf.__len__ rather than len(outbuf) FBO of not getting
# OverflowError on Python 2
outbuflen = outbuf.__len__()
if outbuflen <= 0:
# self.outbufs[-1] must always be a writable outbuf
if len(self.outbufs) > 1:
toclose = self.outbufs.pop(0)
try:
toclose.close()
except:
self.logger.exception(
'Unexpected error when closing an outbuf')
continue # pragma: no cover (coverage bug, it is hit)
else:
dobreak = True
while outbuflen > 0:
chunk = outbuf.get(self.adj.send_bytes)
num_sent = self.send(chunk)
if num_sent:
outbuf.skip(num_sent, True)
outbuflen -= num_sent
sent += num_sent
else:
dobreak = True
break
if dobreak:
break
if sent:
self.last_activity = time.time()
return True
return False
def handle_close(self):
for outbuf in self.outbufs:
try:
outbuf.close()
except:
self.logger.exception(
'Unknown exception while trying to close outbuf')
self.connected = False
asyncore.dispatcher.close(self)
def add_channel(self, map=None):
"""See asyncore.dispatcher
This hook keeps track of opened channels.
"""
asyncore.dispatcher.add_channel(self, map)
self.server.active_channels[self._fileno] = self
def del_channel(self, map=None):
"""See asyncore.dispatcher
This hook keeps track of closed channels.
"""
fd = self._fileno # next line sets this to None
asyncore.dispatcher.del_channel(self, map)
ac = self.server.active_channels
if fd in ac:
del ac[fd]
#
# SYNCHRONOUS METHODS
#
def write_soon(self, data):
if data:
# the async mainloop might be popping data off outbuf; we can
# block here waiting for it because we're in a task thread
with self.outbuf_lock:
if data.__class__ is ReadOnlyFileBasedBuffer:
# they used wsgi.file_wrapper
self.outbufs.append(data)
nextbuf = OverflowableBuffer(self.adj.outbuf_overflow)
self.outbufs.append(nextbuf)
else:
self.outbufs[-1].append(data)
# XXX We might eventually need to pull the trigger here (to
# instruct select to stop blocking), but it slows things down so
# much that I'll hold off for now; "server push" on otherwise
# unbusy systems may suffer.
return len(data)
return 0
def service(self):
"""Execute all pending requests """
with self.task_lock:
while self.requests:
request = self.requests[0]
if request.error:
task = self.error_task_class(self, request)
else:
task = self.task_class(self, request)
try:
task.service()
except:
self.logger.exception('Exception when serving %s' %
task.request.path)
if not task.wrote_header:
if self.adj.expose_tracebacks:
body = traceback.format_exc()
else:
body = ('The server encountered an unexpected '
'internal server error')
req_version = request.version
req_headers = request.headers
request = self.parser_class(self.adj)
request.error = InternalServerError(body)
# copy some original request attributes to fulfill
# HTTP 1.1 requirements
request.version = req_version
try:
request.headers['CONNECTION'] = req_headers[
'CONNECTION']
except KeyError:
pass
task = self.error_task_class(self, request)
task.service() # must not fail
else:
task.close_on_finish = True
# we cannot allow self.requests to drop to empty til
# here; otherwise the mainloop gets confused
if task.close_on_finish:
self.close_when_flushed = True
for request in self.requests:
request.close()
self.requests = []
else:
request = self.requests.pop(0)
request.close()
self.force_flush = True
self.server.pull_trigger()
self.last_activity = time.time()
def cancel(self):
""" Cancels all pending requests """
self.force_flush = True
self.last_activity = time.time()
self.requests = []
def defer(self):
pass
|