/usr/lib/python3/dist-packages/willow/registry.py is in python3-willow 1.1-3.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
from collections import defaultdict
class UnrecognisedOperationError(LookupError):
    """
    Raised when the operation isn't in any of the known image classes.
    """
class UnavailableOperationError(LookupError):
    """
    Raised when none of the image classes that implement the operation are
    available (most likely due to a missing image library).
    """
class UnroutableOperationError(LookupError):
    """
    Raised when there is no way to convert the image into an image class that
    supports the operation.
    """
class WillowRegistry(object):
    """
    Keeps track of image classes, the operations implemented on them and the
    converters between them, and works out which conversions are needed to
    run a requested operation on an image of a given class.
    """

    def __init__(self):
        # Every image class passed to register_image_class()
        self._registered_image_classes = set()
        # image class -> exception raised by its check() method
        self._unavailable_image_classes = dict()
        # image class -> {operation name -> function}
        self._registered_operations = defaultdict(dict)
        # (from class, to class) -> converter function
        self._registered_converters = dict()
        # (from class, to class) -> cost (only present when a cost was given)
        self._registered_converter_costs = dict()

    def register_operation(self, image_class, operation_name, func):
        """
        Registers func as the implementation of operation_name on image_class,
        replacing any previous registration for that pair.
        """
        self._registered_operations[image_class][operation_name] = func

    def register_converter(self, from_image_class, to_image_class, func, cost=None):
        """
        Registers func as the converter from from_image_class to
        to_image_class. The cost is only recorded when it is not None;
        otherwise get_converter_cost() falls back to its default.
        """
        self._registered_converters[from_image_class, to_image_class] = func

        if cost is not None:
            self._registered_converter_costs[from_image_class, to_image_class] = cost

    def register_image_class(self, image_class):
        """
        Registers an image class along with any operations and converters
        found on its attributes.

        If image_class.check() raises, the exception is stored and the class
        is treated as unavailable. Its operations/converters are still
        registered.
        """
        self._registered_image_classes.add(image_class)

        # Check the image class. Any failure is recorded (not re-raised) so it
        # can be reported later by get_image_classes().
        try:
            image_class.check()
        except Exception as e:
            self._unavailable_image_classes[image_class] = e

        # Find and register operations/converters
        for attr in dir(image_class):
            val = getattr(image_class, attr)

            if hasattr(val, '_willow_operation'):
                self.register_operation(image_class, val.__name__, val)
            elif hasattr(val, '_willow_converter_to'):
                # _willow_converter_to is a (target class, cost) pair
                to_class, cost = val._willow_converter_to[0], val._willow_converter_to[1]
                self.register_converter(image_class, to_class, val, cost=cost)
            elif hasattr(val, '_willow_converter_from'):
                # _willow_converter_from is an iterable of (source class, cost) pairs
                for converter_from, cost in val._willow_converter_from:
                    self.register_converter(converter_from, image_class, val, cost=cost)

    def register_plugin(self, plugin):
        """
        Registers everything a plugin declares through its optional
        willow_image_classes, willow_operations and willow_converters
        attributes. Operations and converters are sequences whose first three
        items are passed to register_operation / register_converter.
        """
        image_classes = getattr(plugin, 'willow_image_classes', [])
        operations = getattr(plugin, 'willow_operations', [])
        converters = getattr(plugin, 'willow_converters', [])

        for image_class in image_classes:
            self.register_image_class(image_class)

        for operation in operations:
            self.register_operation(operation[0], operation[1], operation[2])

        for converter in converters:
            self.register_converter(converter[0], converter[1], converter[2])

    def get_operation(self, image_class, operation_name):
        """
        Returns the function registered for operation_name on image_class.

        Raises KeyError (a LookupError) if there is no such registration.
        """
        # .get() rather than indexing: indexing the defaultdict would insert
        # an empty entry for every unknown class that is looked up.
        return self._registered_operations.get(image_class, {})[operation_name]

    def operation_exists(self, operation_name):
        """
        Returns True if any image class has operation_name registered.
        """
        return any(
            operation_name in image_class_operations
            for image_class_operations in self._registered_operations.values()
        )

    def get_converter(self, from_image_class, to_image_class):
        """
        Returns the converter registered between the two classes.

        Raises KeyError if no such converter is registered.
        """
        return self._registered_converters[from_image_class, to_image_class]

    def get_converter_cost(self, from_image_class, to_image_class):
        """
        Returns the cost registered for the conversion, or 100 if none was
        registered.
        """
        return self._registered_converter_costs.get((from_image_class, to_image_class), 100)

    def get_image_classes(self, with_operation=None, available=None):
        """
        Returns a set of registered image classes.

        If with_operation is given, only classes implementing that operation
        are kept and UnrecognisedOperationError is raised if there are none.

        If available is truthy, classes whose check() failed are removed and
        UnavailableOperationError is raised if nothing is left.
        """
        image_classes = self._registered_image_classes.copy()

        if with_operation:
            image_classes = {
                image_class for image_class in image_classes
                if with_operation in self._registered_operations.get(image_class, ())
            }

            if not image_classes:
                raise UnrecognisedOperationError(
                    "Could not find image class with the '{0}' operation".format(with_operation))

        if not available:
            return image_classes

        # Remove unavailable image classes
        available_image_classes = image_classes - set(self._unavailable_image_classes)

        # Raise error if all image classes failed the check
        if not available_image_classes:
            lines = [
                "The operation '{0}' is available in the following image classes "
                "but they all raised errors:".format(with_operation)
            ]
            # Sorted by name so the message is the same on every run
            for image_class in sorted(image_classes, key=lambda c: c.__name__):
                lines.append("{image_class_name}: {error_message}".format(
                    image_class_name=image_class.__name__,
                    error_message=str(self._unavailable_image_classes.get(image_class, "Unknown error"))
                ))
            raise UnavailableOperationError('\n'.join(lines))

        return available_image_classes

    # Routing

    # In some cases, it may not be possible to convert directly between two
    # image classes, so we need to use one or more intermediate classes in order
    # to get to where we want to be.

    # For example, the OpenCV plugin doesn't load JPEG images, so the image
    # needs to be loaded into either Pillow or Wand first and converted to
    # OpenCV.

    # Using a routing algorithm, we're able to work out the best path to take.

    def get_converters_from(self, from_image_class):
        """
        Yields a tuple for each image class that can be directly converted
        from the specified image class. The tuple contains the converter
        function and the image class.

        For example:

        >>> list(registry.get_converters_from(Pillow))
        [
            (convert_pillow_to_wand, Wand),
            (save_as_jpeg, JpegFile)
            ...
        ]
        """
        for (c_from, c_to), converter in self._registered_converters.items():
            if c_from is from_image_class:
                yield converter, c_to

    def find_all_paths(self, start, end, path=None, seen_classes=None):
        """
        Returns all paths between two image classes.

        Each path is a list of tuples representing the steps to take in order to
        convert to the new class. Each tuple contains two items: The converter
        function to call and the class that step converts to.

        The order of the paths returned is undefined.

        path and seen_classes carry the state of the recursive search (the
        steps taken so far and the classes already visited); callers normally
        leave them unset.

        For example:

        >>> registry.find_all_paths(JpegFile, OpenCV)
        [
            [
                (load_jpeg_into_pillow, Pillow),
                (convert_pillow_to_opencv, OpenCV)
            ],
            [
                (load_jpeg_into_wand, Wand),
                (convert_wand_to_opencv, OpenCV)
            ]
        ]
        """
        # Implementation based on https://www.python.org/doc/essays/graphs/
        if path is None:
            path = []
        if seen_classes is None:
            seen_classes = set()

        if start == end:
            return [path]

        # Already visited on this route: stop here to avoid going in circles
        if start in seen_classes:
            return []

        # Can't route through classes that are unregistered or failed their check
        if start not in self._registered_image_classes or start in self._unavailable_image_classes:
            return []

        visited = seen_classes.union({start})
        paths = []
        for converter, next_class in self.get_converters_from(start):
            paths.extend(self.find_all_paths(
                next_class, end,
                path + [(converter, next_class)],
                visited))

        return paths

    def get_path_cost(self, start, path):
        """
        Costs up a path and returns the sum of the costs of each step.
        """
        last_class = start
        total_cost = 0

        for converter, next_class in path:
            total_cost += self.get_converter_cost(last_class, next_class)
            last_class = next_class

        return total_cost

    def find_shortest_path(self, start, end):
        """
        Finds the shortest path between two image classes.

        This is similar to the find_all_paths function, except it only returns
        the path with the lowest cost.

        Returns a (path, cost) tuple, or (None, None) if there is no path.
        """
        current_path = None
        current_cost = None

        for path in self.find_all_paths(start, end):
            cost = self.get_path_cost(start, path)

            if current_cost is None or cost < current_cost:
                current_cost = cost
                current_path = path

        return current_path, current_cost

    def find_closest_image_class(self, start, image_classes):
        """
        Finds which of the specified image classes is the closest, based on the
        sum of the costs for the conversions needed to convert the image into it.

        Returns a (class, path, cost) tuple, or (None, None, None) if none of
        the image classes can be reached from start.
        """
        current_class = None
        current_path = None
        current_cost = None

        for image_class in image_classes:
            path, cost = self.find_shortest_path(start, image_class)

            # No route to this class. Skip it: comparing a None cost against
            # a number raises TypeError on Python 3.
            if cost is None:
                continue

            if current_cost is None or cost < current_cost:
                current_class = image_class
                current_cost = cost
                current_path = path

        return current_class, current_path, current_cost

    def find_operation(self, from_class, operation_name):
        """
        Finds an operation that can be used by an image in the specified from_class.

        This function returns four values:
         - The operation function
         - The class which the operation is implemented on
         - A path to convert the image into the correct class for the operation
         - The total cost of all the conversions

        The path (third value) is a list of two-element tuples. Each tuple contains
        a function to call and a reference to the class that step converts to. See
        below for an example.

        How it works:

        If the specified operation_name is implemented for from_class, that is returned
        with an empty conversion path.

        If the specified operation_name is implemented on another class (but not from_class)
        that operation is returned with the conversion path to that new class.

        If it's implemented on multiple image classes, the closest one is chosen (based
        on the sum of the costs of each conversion step).

        If the operation_name is not implemented anywhere, there is no route to
        any image class that implements it or all the image classes that implement
        it are unavailable, a LookupError will be raised.

        Basic example:

        >>> func, cls, path, cost = registry.find_operation(JPEGImageFile, 'resize')
        >>> func
        PillowImage.resize
        >>> cls
        PillowImage
        >>> path
        [
            (PillowImage.open, PillowImage)
        ]
        >>> cost
        100

        To run the found operation on an image, run each conversion function on that
        image then run the operation function:

        >>> image = Image.open(...)
        >>> func, cls, path, cost = registry.find_operation(type(image), operation_name)
        >>> for converter, new_class in path:
        ...    image = converter(image)
        ...
        >>> func(image, *args, **kwargs)
        """
        try:
            # Firstly, we check if the operation is implemented on from_class
            func = self.get_operation(from_class, operation_name)
            cls = from_class
            path = []
            cost = 0
        except LookupError:
            # Not implemented on the current class. Find the closest, available,
            # routable class that has it instead
            image_classes = self.get_image_classes(
                with_operation=operation_name,
                available=True)

            # Choose an image class
            # image_classes will always have a value here as get_image_classes raises
            # LookupError if there are no image classes available.
            cls, path, cost = self.find_closest_image_class(from_class, image_classes)

            if path is None:
                raise UnroutableOperationError(
                    "The operation '{0}' is available in the image class '{1}'"
                    " but it can't be converted to from '{2}'".format(
                        operation_name,
                        ', '.join(sorted(image_class.__name__ for image_class in image_classes)),
                        from_class.__name__
                    ))

            # Get the operation function
            func = self.get_operation(cls, operation_name)

        return func, cls, path, cost
# Module-level WillowRegistry instance, created when this module is imported.
registry = WillowRegistry()
|