/usr/share/pyshared/txaws/server/registry.py is in python-txaws 0.2.3-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
from txaws.server.exception import APIError
class Registry(object):
    """Register API L{Method}s for handling specific actions and versions.

    Method classes are stored per action and, within an action, per version.
    A method class registered with version C{None} acts as a catch-all that
    handles every version which has no dedicated registration.
    """

    def __init__(self):
        # Maps action -> {version: method_class}; the C{None} version key
        # holds the catch-all method class for that action.
        self._by_action = {}

    def add(self, method_class, action, version=None):
        """Add a method class to the registry.

        @param method_class: The method class to add
        @param action: The action that the method class can handle
        @param version: The version that the method class can handle, or
            C{None} to register it as the catch-all for the action.
        @raises RuntimeError: If a method class was already registered for
            the given action and version.
        """
        by_version = self._by_action.setdefault(action, {})
        if version in by_version:
            raise RuntimeError("A method was already registered for action"
                               " %s in version %s" % (action, version))
        by_version[version] = method_class

    def check(self, action, version=None):
        """Check if the given action is supported in the given version.

        @param action: The action to look up.
        @param version: The version the action is requested in.
        @raises APIError: If there's no method class registered for handling
            the given action or version.
        """
        if action not in self._by_action:
            raise APIError(400, "InvalidAction", "The action %s is not valid "
                           "for this web service." % action)
        by_version = self._by_action[action]
        # A catch-all (None) registration accepts any version; without one
        # the requested version must have been registered explicitly.
        if None not in by_version and version not in by_version:
            raise APIError(400, "InvalidVersion", "Invalid API version.")

    def get(self, action, version=None):
        """Get the method class handling the given action and version.

        The version-specific method class is preferred; if there is none,
        the catch-all one registered for the action is returned.

        @param action: The action to look up.
        @param version: The version the action is requested in.
        @raises KeyError: If the action is not registered, or if neither a
            version-specific nor a catch-all method class is registered
            for it.
        """
        by_version = self._by_action[action]
        if version in by_version:
            return by_version[version]
        if None in by_version:
            return by_version[None]
        # Name the failing action and version instead of letting the
        # catch-all lookup surface as an opaque KeyError(None).
        raise KeyError("No method registered for action %s in version %s"
                       % (action, version))

    def scan(self, module, onerror=None, ignore=None):
        """Scan the given module object for L{Method}s and register them.

        @param module: The module object handed to the venusian scanner.
        @param onerror: Forwarded as the scanner's C{onerror} argument.
        @param ignore: Forwarded as the scanner's C{ignore} argument, but
            only when it is not C{None}.
        """
        # Imported here rather than at module level, so venusian is only
        # required when scanning is actually used.
        from venusian import Scanner

        options = {"onerror": onerror, "categories": ["method"]}
        if ignore is not None:
            # Only pass it if specified, for backward compatibility
            # (presumably with scanners that do not accept "ignore").
            options["ignore"] = ignore
        Scanner(registry=self).scan(module, **options)
|