/usr/lib/python3/dist-packages/waitress/utilities.py is in python3-waitress 0.8.10-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 | ##############################################################################
#
# Copyright (c) 2004 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Utility functions
"""
import asyncore
import errno
import logging
import os
import re
import stat
import time
import calendar
logger = logging.getLogger('waitress')
def find_double_newline(s):
"""Returns the position just after a double newline in the given string."""
pos1 = s.find(b'\n\r\n') # One kind of double newline
if pos1 >= 0:
pos1 += 3
pos2 = s.find(b'\n\n') # Another kind of double newline
if pos2 >= 0:
pos2 += 2
if pos1 >= 0:
if pos2 >= 0:
return min(pos1, pos2)
else:
return pos1
else:
return pos2
def concat(*args):
return ''.join(args)
def join(seq, field=' '):
return field.join(seq)
def group(s):
return '(' + s + ')'
short_days = ['sun', 'mon', 'tue', 'wed', 'thu', 'fri', 'sat']
long_days = ['sunday', 'monday', 'tuesday', 'wednesday',
'thursday', 'friday', 'saturday']
short_day_reg = group(join(short_days, '|'))
long_day_reg = group(join(long_days, '|'))
daymap = {}
for i in range(7):
daymap[short_days[i]] = i
daymap[long_days[i]] = i
hms_reg = join(3 * [group('[0-9][0-9]')], ':')
months = ['jan', 'feb', 'mar', 'apr', 'may', 'jun', 'jul',
'aug', 'sep', 'oct', 'nov', 'dec']
monmap = {}
for i in range(12):
monmap[months[i]] = i + 1
months_reg = group(join(months, '|'))
# From draft-ietf-http-v11-spec-07.txt/3.3.1
# Sun, 06 Nov 1994 08:49:37 GMT ; RFC 822, updated by RFC 1123
# Sunday, 06-Nov-94 08:49:37 GMT ; RFC 850, obsoleted by RFC 1036
# Sun Nov 6 08:49:37 1994 ; ANSI C's asctime() format
# rfc822 format
rfc822_date = join(
[concat(short_day_reg, ','), # day
group('[0-9][0-9]?'), # date
months_reg, # month
group('[0-9]+'), # year
hms_reg, # hour minute second
'gmt'
],
' '
)
rfc822_reg = re.compile(rfc822_date)
def unpack_rfc822(m):
g = m.group
return (
int(g(4)), # year
monmap[g(3)], # month
int(g(2)), # day
int(g(5)), # hour
int(g(6)), # minute
int(g(7)), # second
0,
0,
0,
)
# rfc850 format
rfc850_date = join(
[concat(long_day_reg, ','),
join(
[group('[0-9][0-9]?'),
months_reg,
group('[0-9]+')
],
'-'
),
hms_reg,
'gmt'
],
' '
)
rfc850_reg = re.compile(rfc850_date)
# they actually unpack the same way
def unpack_rfc850(m):
g = m.group
yr = g(4)
if len(yr) == 2:
yr = '19' + yr
return (
int(yr), # year
monmap[g(3)], # month
int(g(2)), # day
int(g(5)), # hour
int(g(6)), # minute
int(g(7)), # second
0,
0,
0
)
# parsdate.parsedate - ~700/sec.
# parse_http_date - ~1333/sec.
weekdayname = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']
monthname = [None, 'Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
def build_http_date(when):
year, month, day, hh, mm, ss, wd, y, z = time.gmtime(when)
return "%s, %02d %3s %4d %02d:%02d:%02d GMT" % (
weekdayname[wd],
day, monthname[month], year,
hh, mm, ss)
def parse_http_date(d):
d = d.lower()
m = rfc850_reg.match(d)
if m and m.end() == len(d):
retval = int(calendar.timegm(unpack_rfc850(m)))
else:
m = rfc822_reg.match(d)
if m and m.end() == len(d):
retval = int(calendar.timegm(unpack_rfc822(m)))
else:
return 0
return retval
class logging_dispatcher(asyncore.dispatcher):
logger = logger
def log_info(self, message, type='info'):
severity = {
'info': logging.INFO,
'warning': logging.WARN,
'error': logging.ERROR,
}
self.logger.log(severity.get(type, logging.INFO), message)
def cleanup_unix_socket(path):
try:
st = os.stat(path)
except OSError as exc:
if exc.errno != errno.ENOENT:
raise # pragma: no cover
else:
if stat.S_ISSOCK(st.st_mode):
try:
os.remove(path)
except OSError: # pragma: no cover
# avoid race condition error during tests
pass
class Error(object):
def __init__(self, body):
self.body = body
class BadRequest(Error):
code = 400
reason = 'Bad Request'
class RequestHeaderFieldsTooLarge(BadRequest):
code = 431
reason = 'Request Header Fields Too Large'
class RequestEntityTooLarge(BadRequest):
code = 413
reason = 'Request Entity Too Large'
class InternalServerError(Error):
code = 500
reason = 'Internal Server Error'
|