/usr/lib/python3/dist-packages/wsme/protocol.py is in python3-wsme 0.9.2-0ubuntu2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 | import weakref
import pkg_resources
from wsme.exc import ClientSideError
__all__ = [
'CallContext',
'register_protocol', 'getprotocol',
]
registered_protocols = {}
def _cfg(f):
cfg = getattr(f, '_cfg', None)
if cfg is None:
f._cfg = cfg = {}
return cfg
class expose(object):
def __init__(self, path, content_type):
self.path = path
self.content_type = content_type
def __call__(self, func):
func.exposed = True
cfg = _cfg(func)
cfg['content-type'] = self.content_type
cfg.setdefault('paths', []).append(self.path)
return func
class CallContext(object):
def __init__(self, request):
self._request = weakref.ref(request)
self.path = None
self.func = None
self.funcdef = None
@property
def request(self):
return self._request()
class ObjectDict(object):
def __init__(self, obj):
self.obj = obj
def __getitem__(self, name):
return getattr(self.obj, name)
class Protocol(object):
name = None
displayname = None
content_types = []
def resolve_path(self, path):
if '$' in path:
from string import Template
s = Template(path)
path = s.substitute(ObjectDict(self))
return path
def iter_routes(self):
for attrname in dir(self):
attr = getattr(self, attrname)
if getattr(attr, 'exposed', False):
for path in _cfg(attr)['paths']:
yield self.resolve_path(path), attr
def accept(self, request):
return request.headers.get('Content-Type') in self.content_types
def iter_calls(self, request):
pass
def extract_path(self, context):
pass
def read_arguments(self, context):
pass
def encode_result(self, context, result):
pass
def encode_sample_value(self, datatype, value, format=False):
return ('none', 'N/A')
def encode_sample_params(self, params, format=False):
return ('none', 'N/A')
def encode_sample_result(self, datatype, value, format=False):
return ('none', 'N/A')
def register_protocol(protocol):
registered_protocols[protocol.name] = protocol
def getprotocol(name, **options):
protocol_class = registered_protocols.get(name)
if protocol_class is None:
for entry_point in pkg_resources.iter_entry_points(
'wsme.protocols', name):
if entry_point.name == name:
protocol_class = entry_point.load()
if protocol_class is None:
raise ValueError("Cannot find protocol '%s'" % name)
registered_protocols[name] = protocol_class
return protocol_class(**options)
def media_type_accept(request, content_types):
"""Validate media types against request.method.
When request.method is GET or HEAD compare with the Accept header.
When request.method is POST, PUT or PATCH compare with the Content-Type
header.
When request.method is DELETE media type is irrelevant, so return True.
"""
if request.method in ['GET', 'HEAD']:
if request.accept:
if request.accept.best_match(content_types):
return True
error_message = ('Unacceptable Accept type: %s not in %s'
% (request.accept, content_types))
raise ClientSideError(error_message, status_code=406)
elif request.method in ['PUT', 'POST', 'PATCH']:
content_type = request.headers.get('Content-Type')
if content_type:
for ct in content_types:
if request.headers.get('Content-Type', '').startswith(ct):
return True
error_message = ('Unacceptable Content-Type: %s not in %s'
% (content_type, content_types))
raise ClientSideError(error_message, status_code=415)
else:
raise ClientSideError('missing Content-Type header')
elif request.method in ['DELETE']:
return True
return False
|