/usr/lib/python2.7/dist-packages/openid/yadis/filters.py is in python-openid 2.2.5-6.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
"""This module contains functions and classes used for extracting
endpoint information out of a Yadis XRD file using the ElementTree XML
parser.
"""
# Public names exported by a star-import of this module.  The helper
# mkCompoundFilter and the filter_type_error exception instance are defined
# in this file but are not listed here.
__all__ = [
'BasicServiceEndpoint',
'mkFilter',
'IFilter',
'TransformFilterMaker',
'CompoundFilter',
]
from openid.yadis.etxrd import expandService
class BasicServiceEndpoint(object):
    """Parsed service information for a single Type/URI combination.

    An instance keeps the Yadis URL, the type URIs, the service URI and
    a reference to the service element it was generated from.  When an
    xrd:Service carries more than one xrd:Type or xrd:URI, each instance
    stands for just one of those pairs.

    The class itself can be used as a filter because it provides
    fromBasicServiceEndpoint; the simplest custom filter is any object
    that implements fromBasicServiceEndpoint and accepts one of these
    objects.
    """

    def __init__(self, yadis_url, type_uris, uri, service_element):
        """Store the parsed pieces of one service entry.

        @param yadis_url: The Yadis URL associated with this endpoint
        @param type_uris: The type URIs of the service
        @param uri: The service URI this endpoint represents
        @param service_element: The service element this endpoint was
            generated from
        """
        self.yadis_url = yadis_url
        self.type_uris = type_uris
        self.uri = uri
        self.service_element = service_element

    def matchTypes(self, type_uris):
        """Report which of the given type URIs this endpoint has.

        Handy for endpoint classes that need to check for several
        versions of one protocol at once.

        @param type_uris: The URIs that you wish to check
        @type type_uris: iterable of str

        @return: the members of type_uris that are also present in
            self.type_uris, in the order they were given
        """
        matched = []
        for candidate in type_uris:
            if candidate in self.type_uris:
                matched.append(candidate)
        return matched

    @staticmethod
    def fromBasicServiceEndpoint(endpoint):
        """Identity transform that lets this class act as a filter.

        Subclasses should re-implement this function.

        @param endpoint: An instance of BasicServiceEndpoint

        @return: The very object that was passed in, unprocessed.
        """
        return endpoint
class IFilter(object):
    """Interface that Yadis filter objects follow.

    Other filter-like things can be converted to objects with this
    interface.
    """

    def getServiceEndpoints(self, yadis_url, service_element):
        """Return an iterator of endpoint objects.

        This base implementation always raises NotImplementedError.
        """
        raise NotImplementedError()
class TransformFilterMaker(object):
    """Top-level filter built from a list of basic endpoint transformers.

    Mostly useful for implementing mkFilter, which should only be
    needed for special cases or internal use by this library.

    It suits simple services that use one URI and are specified by one
    Type.  For each expansion of a service element a
    BasicServiceEndpoint is created and handed to the transformer
    functions in order until one of them returns a value.
    """

    def __init__(self, filter_functions):
        """Remember the transformer functions to try.

        @param filter_functions: The endpoint transformer functions to
            apply to the basic endpoint.  They are called in turn
            until one of them does not return None, and that
            transformer's result is used.
        """
        self.filter_functions = filter_functions

    def getServiceEndpoints(self, yadis_url, service_element):
        """Return the list of endpoint objects produced by the filter
        functions for the given service element."""
        # expandService yields one entry per xrd:Type / xrd:URI
        # combination; the third item of each entry is not used here.
        # Each combination is wrapped in a BasicServiceEndpoint and run
        # through the transformers one at a time, in order.
        transformed = (
            self.applyFilters(
                BasicServiceEndpoint(
                    yadis_url, type_uris, uri, service_element))
            for type_uris, uri, _ in expandService(service_element))

        # Combinations that no transformer accepted are dropped.
        return [result for result in transformed if result is not None]

    def applyFilters(self, endpoint):
        """Run the transformers over an endpoint and return the first
        result that is not None, or None when every one declines."""
        for transform in self.filter_functions:
            result = transform(endpoint)
            if result is None:
                continue
            # The first transformer that produces something wins; the
            # remaining ones are not called.
            return result
        return None
class CompoundFilter(object):
    """Filter that runs several subfilters over the same service element
    and gathers everything they produce.
    """

    def __init__(self, subfilters):
        """Keep the subfilters whose results will be combined.

        @param subfilters: objects providing getServiceEndpoints
        """
        self.subfilters = subfilters

    def getServiceEndpoints(self, yadis_url, service_element):
        """Return one list holding the endpoint objects of every
        subfilter, concatenated in subfilter order."""
        return [
            endpoint
            for subfilter in self.subfilters
            for endpoint in subfilter.getServiceEndpoints(
                yadis_url, service_element)
        ]
# Exception raised when something is not able to be turned into a filter.
# A single TypeError instance is created here, once, at import time;
# mkCompoundFilter raises this same object whenever it meets a part that is
# neither iterable, a filter, an endpoint, nor a callable.
filter_type_error = TypeError(
'Expected a filter, an endpoint, a callable or a list of any of these.')
def mkFilter(parts):
    """Turn a filter-convertable thing into a filter.

    None selects the default, which is the BasicServiceEndpoint class
    on its own.  Anything list() accepts is expanded into a list of
    parts; anything else is treated as a single part.  The actual
    conversion is delegated to mkCompoundFilter.

    @param parts: a filter, an endpoint, a callable, or a list of any
        of these.

    @return: whatever mkCompoundFilter builds from the parts
    """
    if parts is None:
        parts = [BasicServiceEndpoint]

    try:
        part_list = list(parts)
    except TypeError:
        # Not iterable, so it is a lone filter-like thing.
        part_list = [parts]

    return mkCompoundFilter(part_list)
def mkCompoundFilter(parts):
    """Create a filter out of a list of filter-like things

    Used by mkFilter

    Each element of C{parts} is classified as follows:

      - a byte or text string is rejected (see below);
      - anything else that list() accepts is treated as a nested group
        and converted by a recursive call;
      - an object with a getServiceEndpoints attribute is used as a
        full filter as-is;
      - an object with a fromBasicServiceEndpoint attribute contributes
        that attribute as an endpoint transformer;
      - any other callable is used directly as an endpoint transformer.

    All collected transformers are wrapped in one TransformFilterMaker,
    which is placed after the full filters.

    @param parts: list of filter, endpoint, callable or list of any of these

    @return: the single resulting filter when there is exactly one,
        otherwise a CompoundFilter over all of them

    @raise TypeError: the module-level filter_type_error instance, when
        an element is a string or matches none of the categories above
    """
    # Byte and text string types on both Python 2 and Python 3.
    string_types = (bytes, type(u''))

    # Separate into a list of callables and a list of filter objects
    transformers = []
    filters = []
    for subfilter in parts:
        if isinstance(subfilter, string_types):
            # A string is iterable, and iterating it yields more strings
            # (list('a') == ['a']), so treating it as a nested group
            # would recurse without end.  A string is never a valid
            # filter part, so reject it up front.
            raise filter_type_error

        try:
            subfilter = list(subfilter)
        except TypeError:
            # If it's not an iterable
            if hasattr(subfilter, 'getServiceEndpoints'):
                # It's a full filter
                filters.append(subfilter)
            elif hasattr(subfilter, 'fromBasicServiceEndpoint'):
                # It's an endpoint object, so put its endpoint
                # conversion attribute into the list of endpoint
                # transformers
                transformers.append(subfilter.fromBasicServiceEndpoint)
            elif callable(subfilter):
                # It's a simple callable, so add it to the list of
                # endpoint transformers
                transformers.append(subfilter)
            else:
                raise filter_type_error
        else:
            # An iterable group of parts: build a filter for the group.
            filters.append(mkCompoundFilter(subfilter))

    if transformers:
        filters.append(TransformFilterMaker(transformers))

    # A lone filter needs no CompoundFilter wrapper around it.
    if len(filters) == 1:
        return filters[0]
    else:
        return CompoundFilter(filters)
|