/usr/lib/python2.7/dist-packages/autopilot/introspection/dbus.py is in python-autopilot 1.4.1+15.10.20150911-0ubuntu2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 | # -*- Mode: Python; coding: utf-8; indent-tabs-mode: nil; tab-width: 4 -*-
#
# Autopilot Functional Test Tool
# Copyright (C) 2012-2013 Canonical
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
"""This module contains the code to retrieve state via DBus calls.
Under normal circumstances, the only thing you should need to use from this
module is the DBusIntrospectableObject class.
"""
from __future__ import absolute_import
from contextlib import contextmanager
import sys
import logging
import re
import six
from uuid import uuid4
from autopilot.introspection.types import create_value_instance
from autopilot.introspection.utilities import translate_state_keys
from autopilot.utilities import (
get_debug_logger,
sleep,
Timer,
)
_object_registry = {}
_logger = logging.getLogger(__name__)
class StateNotFoundError(RuntimeError):
"""Raised when a piece of state information is not found.
This exception is commonly raised when the application has destroyed (or
not yet created) the object you are trying to access in autopilot. This
typically happens for a number of possible reasons:
* The UI widget you are trying to access with
:py:meth:`~DBusIntrospectionObject.select_single` or
:py:meth:`~DBusIntrospectionObject.wait_select_single` or
:py:meth:`~DBusIntrospectionObject.select_many` does not exist yet.
* The UI widget you are trying to access has been destroyed by the
application.
"""
def __init__(self, class_name=None, **filters):
"""Construct a StateNotFoundError.
:raises ValueError: if neither the class name not keyword arguments
are specified.
"""
if class_name is None and not filters:
raise ValueError("Must specify either class name or filters.")
if class_name is None:
self._message = \
u"Object not found with properties {}.".format(
repr(filters)
)
elif not filters:
self._message = u"Object not found with name '{}'.".format(
class_name
)
else:
self._message = \
u"Object not found with name '{}' and properties {}.".format(
class_name,
repr(filters)
)
def __str__(self):
if six.PY3:
return self._message
else:
return self._message.encode('utf8')
def __unicode__(self):
return self._message
class IntrospectableObjectMetaclass(type):
"""Metaclass to insert appropriate classes into the object registry."""
def __new__(cls, classname, bases, classdict):
"""Add class name to type registry."""
class_object = type.__new__(cls, classname, bases, classdict)
if classname in (
'ApplicationProxyObject',
'CustomEmulatorBase',
'DBusIntrospectionObject',
):
return class_object
if getattr(class_object, '_id', None) is not None:
if class_object._id in _object_registry:
_object_registry[class_object._id][classname] = class_object
else:
_object_registry[class_object._id] = {classname: class_object}
return class_object
def get_classname_from_path(object_path):
return object_path.split("/")[-1]
def _object_passes_filters(instance, **kwargs):
"""Return true if *instance* satisifies all the filters present in
kwargs."""
with instance.no_automatic_refreshing():
for attr, val in kwargs.items():
if not hasattr(instance, attr) or getattr(instance, attr) != val:
# Either attribute is not present, or is present but with
# the wrong value - don't add this instance to the results
# list.
return False
return True
DBusIntrospectionObjectBase = IntrospectableObjectMetaclass(
'DBusIntrospectionObjectBase',
(object,),
{}
)
class DBusIntrospectionObject(DBusIntrospectionObjectBase):
"""A class that supports transparent data retrieval from the application
under test.
This class is the base class for all objects retrieved from the application
under test. It handles transparently refreshing attribute values when
needed, and contains many methods to select child objects in the
introspection tree.
"""
def __init__(self, state_dict, path, backend):
self.__state = {}
self.__refresh_on_attribute = True
self._set_properties(state_dict)
self._path = path
self._poll_time = 10
self._backend = backend
def _set_properties(self, state_dict):
"""Creates and set attributes of *self* based on contents of
*state_dict*.
.. note:: Translates '-' to '_', so a key of 'icon-type' for example
becomes 'icon_type'.
"""
self.__state = {}
for key, value in translate_state_keys(state_dict).items():
# don't store id in state dictionary -make it a proper instance
# attribute
if key == 'id':
self.id = int(value[1])
try:
self.__state[key] = create_value_instance(value, self, key)
except ValueError as e:
_logger.warning(
"While constructing attribute '%s.%s': %s",
self.__class__.__name__,
key,
str(e)
)
def get_children_by_type(self, desired_type, **kwargs):
"""Get a list of children of the specified type.
Keyword arguments can be used to restrict returned instances. For
example::
get_children_by_type('Launcher', monitor=1)
will return only Launcher instances that have an attribute 'monitor'
that is equal to 1. The type can also be specified as a string, which
is useful if there is no emulator class specified::
get_children_by_type('Launcher', monitor=1)
Note however that if you pass a string, and there is an emulator class
defined, autopilot will not use it.
:param desired_type: Either a string naming the type you want, or a
class of the type you want (the latter is used when defining
custom emulators)
.. seealso::
Tutorial Section :ref:`custom_proxy_classes`
"""
#TODO: if kwargs has exactly one item in it we should specify the
# restriction in the XPath query, so it gets processed in the Unity C++
# code rather than in Python.
instances = self.get_children()
result = []
for instance in instances:
# Skip items that are not instances of the desired type:
if isinstance(desired_type, six.string_types):
if instance.__class__.__name__ != desired_type:
continue
elif not isinstance(instance, desired_type):
continue
#skip instances that fail attribute check:
if _object_passes_filters(instance, **kwargs):
result.append(instance)
return result
def get_properties(self):
"""Returns a dictionary of all the properties on this class.
This can be useful when you want to log all the properties exported
from your application for a particular object. Every property in the
returned dictionary can be accessed as attributes of the object as
well.
"""
# Since we're grabbing __state directly there's no implied state
# refresh, so do it manually:
self.refresh_state()
props = self.__state.copy()
props['id'] = self.id
return props
def get_children(self):
"""Returns a list of all child objects.
This returns a list of all children. To return only children of a
specific type, use :meth:`get_children_by_type`. To get objects
further down the introspection tree (i.e.- nodes that may not
necessarily be immeadiate children), use :meth:`select_single` and
:meth:`select_many`.
"""
# Thomi: 2014-03-20: There used to be a call to 'self.refresh_state()'
# here. That's not needed, since the only thing we use is the proxy
# path, which isn't affected by the current state.
query = self.get_class_query_string() + "/*"
state_dicts = self.get_state_by_path(query)
children = [self.make_introspection_object(i) for i in state_dicts]
return children
def get_parent(self):
"""Returns the parent of this object.
If this object has no parent (i.e.- it is the root of the introspection
tree). Then it returns itself.
"""
query = self.get_class_query_string() + "/.."
parent_state_dicts = self.get_state_by_path(query)
parent = self.make_introspection_object(parent_state_dicts[0])
return parent
def select_single(self, type_name='*', **kwargs):
"""Get a single node from the introspection tree, with type equal to
*type_name* and (optionally) matching the keyword filters present in
*kwargs*.
You must specify either *type_name*, keyword filters or both.
This method searches recursively from the instance this method is
called on. Calling :meth:`select_single` on the application (root)
proxy object will search the entire tree. Calling
:meth:`select_single` on an object in the tree will only search it's
descendants.
Example usage::
app.select_single('QPushButton', objectName='clickme')
# returns a QPushButton whose 'objectName' property is 'clickme'.
If nothing is returned from the query, this method raises
StateNotFoundError.
:param type_name: Either a string naming the type you want, or a class
of the appropriate type (the latter case is for overridden emulator
classes).
:raises ValueError: if the query returns more than one item. *If
you want more than one item, use select_many instead*.
:raises TypeError: if neither *type_name* or keyword filters are
provided.
:raises StateNotFoundError: if the requested object was not found.
.. seealso::
Tutorial Section :ref:`custom_proxy_classes`
"""
instances = self.select_many(type_name, **kwargs)
if len(instances) > 1:
raise ValueError("More than one item was returned for query")
if not instances:
raise StateNotFoundError(type_name, **kwargs)
return instances[0]
def wait_select_single(self, type_name='*', **kwargs):
"""Get a proxy object matching some search criteria, retrying if no
object is found until a timeout is reached.
This method is identical to the :meth:`select_single` method, except
that this method will poll the application under test for 10 seconds
in the event that the search criteria does not match anything.
This method will return single proxy object from the introspection
tree, with type equal to *type_name* and (optionally) matching the
keyword filters present in *kwargs*.
You must specify either *type_name*, keyword filters or both.
This method searches recursively from the proxy object this method is
called on. Calling :meth:`select_single` on the application (root)
proxy object will search the entire tree. Calling
:meth:`select_single` on an object in the tree will only search it's
descendants.
Example usage::
app.wait_select_single('QPushButton', objectName='clickme')
# returns a QPushButton whose 'objectName' property is 'clickme'.
# will poll the application until such an object exists, or will
# raise StateNotFoundError after 10 seconds.
If nothing is returned from the query, this method raises
StateNotFoundError after 10 seconds.
:param type_name: Either a string naming the type you want, or a class
of the appropriate type (the latter case is for overridden emulator
classes).
:raises ValueError: if the query returns more than one item. *If
you want more than one item, use select_many instead*.
:raises TypeError: if neither *type_name* or keyword filters are
provided.
:raises StateNotFoundError: if the requested object was not found.
.. seealso::
Tutorial Section :ref:`custom_proxy_classes`
"""
for i in range(self._poll_time):
try:
return self.select_single(type_name, **kwargs)
except StateNotFoundError:
if i == self._poll_time - 1:
raise
sleep(1)
def select_many(self, type_name='*', **kwargs):
"""Get a list of nodes from the introspection tree, with type equal to
*type_name* and (optionally) matching the keyword filters present in
*kwargs*.
You must specify either *type_name*, keyword filters or both.
This method searches recursively from the instance this method is
called on. Calling :meth:`select_many` on the application (root) proxy
object will search the entire tree. Calling :meth:`select_many` on an
object in the tree will only search it's descendants.
Example Usage::
app.select_many('QPushButton', enabled=True)
# returns a list of QPushButtons that are enabled.
As mentioned above, this method searches the object tree recursively::
file_menu = app.select_one('QMenu', title='File')
file_menu.select_many('QAction')
# returns a list of QAction objects who appear below file_menu in
# the object tree.
.. warning::
The order in which objects are returned is not guaranteed. It is
bad practise to write tests that depend on the order in which
this method returns objects. (see :ref:`object_ordering` for more
information).
If you only want to get one item, use :meth:`select_single` instead.
:param type_name: Either a string naming the type you want, or a class
of the appropriate type (the latter case is for overridden emulator
classes).
:raises TypeError: if neither *type_name* or keyword filters are
provided.
.. seealso::
Tutorial Section :ref:`custom_proxy_classes`
"""
if not isinstance(type_name, str) and issubclass(
type_name, DBusIntrospectionObject):
type_name = type_name.__name__
if type_name == "*" and not kwargs:
raise TypeError("You must specify either a type name or a filter.")
_logger.debug(
"Selecting objects of %s with attributes: %r",
'any type' if type_name == '*' else 'type ' + type_name, kwargs)
server_side_filters = []
client_side_filters = {}
for k, v in kwargs.items():
# LP Bug 1209029: The XPathSelect protocol does not allow all valid
# node names or values. We need to decide here whether the filter
# parameters are going to work on the backend or not. If not, we
# just do the processing client-side. See the
# _is_valid_server_side_filter_param function (below) for the
# specific requirements.
if _is_valid_server_side_filter_param(k, v):
server_side_filters.append(
_get_filter_string_for_key_value_pair(k, v)
)
else:
client_side_filters[k] = v
filter_str = '[{}]'.format(','.join(server_side_filters)) \
if server_side_filters else ""
query_path = "%s//%s%s" % (
self.get_class_query_string(),
type_name,
filter_str
)
state_dicts = self.get_state_by_path(query_path)
instances = [self.make_introspection_object(i) for i in state_dicts]
return [i for i in instances
if _object_passes_filters(i, **client_side_filters)]
def refresh_state(self):
"""Refreshes the object's state.
You should probably never have to call this directly. Autopilot
automatically retrieves new state every time this object's attributes
are read.
:raises StateNotFound: if the object in the application under test
has been destroyed.
"""
_, new_state = self.get_new_state()
self._set_properties(new_state)
def get_all_instances(self):
"""Get all instances of this class that exist within the Application
state tree.
For example, to get all the LauncherIcon instances::
icons = LauncherIcon.get_all_instances()
.. warning::
Using this method is slow - it requires a complete scan of the
introspection tree. You should only use this when you're not sure
where the objects you are looking for are located. Depending on
the application you are testing, you may get duplicate results
using this method.
:return: List (possibly empty) of class instances.
"""
cls_name = type(self).__name__
instances = self.get_state_by_path("//%s" % (cls_name))
return [self.make_introspection_object(i) for i in instances]
def get_root_instance(self):
"""Get the object at the root of this tree.
This will return an object that represents the root of the
introspection tree.
"""
instances = self.get_state_by_path("/")
if len(instances) != 1:
_logger.error("Could not retrieve root object.")
return None
return self.make_introspection_object(instances[0])
def __getattr__(self, name):
# avoid recursion if for some reason we have no state set (should never
# happen).
if name == '__state':
raise AttributeError()
if name in self.__state:
if self.__refresh_on_attribute:
self.refresh_state()
return self.__state[name]
# attribute not found.
raise AttributeError(
"Class '%s' has no attribute '%s'." %
(self.__class__.__name__, name))
def get_state_by_path(self, piece):
"""Get state for a particular piece of the state tree.
You should probably never need to call this directly.
:param piece: an XPath-like query that specifies which bit of the tree
you want to look at.
:raises TypeError: on invalid *piece* parameter.
"""
if not isinstance(piece, six.string_types):
raise TypeError(
"XPath query must be a string, not %r", type(piece))
with Timer("GetState %s" % piece):
data = self._backend.introspection_iface.GetState(piece)
if len(data) > 15:
_logger.warning(
"Your query '%s' returned a lot of data (%d items). This "
"is likely to be slow. You may want to consider optimising"
" your query to return fewer items.",
piece,
len(data)
)
return data
def get_new_state(self):
"""Retrieve a new state dictionary for this class instance.
You should probably never need to call this directly.
.. note:: The state keys in the returned dictionary are not translated.
"""
try:
return self.get_state_by_path(self.get_class_query_string())[0]
except IndexError:
raise StateNotFoundError(self.__class__.__name__, id=self.id)
def wait_until_destroyed(self, timeout=10):
"""Block until this object is destroyed in the application.
Block until the object this instance is a proxy for has been destroyed
in the applicaiton under test. This is commonly used to wait until a
UI component has been destroyed.
:param timeout: The number of seconds to wait for the object to be
destroyed. If not specified, defaults to 10 seconds.
:raises RuntimeError: if the method timed out.
"""
for i in range(timeout):
try:
self.get_new_state()
sleep(1)
except StateNotFoundError:
return
else:
raise RuntimeError(
"Object was not destroyed after %d seconds" % timeout
)
def get_class_query_string(self):
"""Get the XPath query string required to refresh this class's
state."""
if not self._path.startswith('/'):
return "//" + self._path + "[id=%d]" % self.id
else:
return self._path + "[id=%d]" % self.id
def make_introspection_object(self, dbus_tuple):
"""Make an introspection object given a DBus tuple of
(path, state_dict).
This only works for classes that derive from DBusIntrospectionObject.
:returns: A proxy object that derives from DBusIntrospectionObject
:raises ValueError: if more than one class is appropriate for this
dbus_tuple
"""
path, state = dbus_tuple
class_object = _get_proxy_object_class(
_object_registry[self._id], type(self), path, state)
return class_object(state, path, self._backend)
def print_tree(self, output=None, maxdepth=None, _curdepth=0):
"""Print properties of the object and its children to a stream.
When writing new tests, this can be called when it is too difficult to
find the widget or property that you are interested in in "vis".
.. warning:: Do not use this in production tests, this is expensive and
not at all appropriate for actual testing. Only call this
temporarily and replace with proper select_single/select_many
calls.
:param output: A file object or path name where the output will be
written to. If not given, write to stdout.
:param maxdepth: If given, limit the maximum recursion level to that
number, i. e. only print children which have at most maxdepth-1
intermediate parents.
"""
if maxdepth is not None and _curdepth > maxdepth:
return
indent = " " * _curdepth
if output is None:
output = sys.stdout
elif isinstance(output, six.string_types):
output = open(output, 'w')
# print path
if _curdepth > 0:
output.write("\n")
output.write("%s== %s ==\n" % (indent, self._path))
# Thomi 2014-03-20: For all levels other than the top level, we can
# avoid an entire dbus round trip if we grab the underlying property
# dictionary directly. We can do this since the print_tree function
# that called us will have retrieved us via a call to get_children(),
# which gets the latest state anyway.
if _curdepth > 0:
properties = self.__state.copy()
else:
properties = self.get_properties()
# print properties
try:
for key in sorted(properties.keys()):
output.write("%s%s: %r\n" % (indent, key, properties[key]))
# print children
if maxdepth is None or _curdepth < maxdepth:
for c in self.get_children():
c.print_tree(output, maxdepth, _curdepth + 1)
except StateNotFoundError as error:
output.write("%sError: %s\n" % (indent, error))
@contextmanager
def no_automatic_refreshing(self):
"""Context manager function to disable automatic DBus refreshing when
retrieving attributes.
Example usage:
with instance.no_automatic_refreshing():
# access lots of attributes.
This can be useful if you need to check lots of attributes in a tight
loop, or if you want to atomicaly check several attributes at once.
"""
try:
self.__refresh_on_attribute = False
yield
finally:
self.__refresh_on_attribute = True
@classmethod
def validate_dbus_object(cls, path, _state):
"""Return whether this class is the appropriate proxy object class for
a given dbus path and state.
The default version matches the name of the dbus object and the class.
Subclasses of CustomProxyObject can override it to define a different
validation method.
:param path: The dbus path of the object to check
:param state: The dbus state dict of the object to check
(ignored in default implementation)
:returns: Whether this class is appropriate for the dbus object
"""
name = get_classname_from_path(path)
return cls.__name__ == name
def _is_valid_server_side_filter_param(key, value):
"""Return True if the key and value parameters are valid for server-side
processing.
"""
key_is_valid = re.match(
r'^[a-zA-Z0-9_\-]+( [a-zA-Z0-9_\-])*$',
key
) is not None
if type(value) == int:
return key_is_valid and (-2**31 <= value <= 2**31 - 1)
elif type(value) == bool:
return key_is_valid
elif type(value) == six.binary_type:
return key_is_valid
elif type(value) == six.text_type:
try:
value.encode('ascii')
return key_is_valid
except UnicodeEncodeError:
pass
return False
def _get_filter_string_for_key_value_pair(key, value):
"""Return a string representing the filter query for this key/value pair.
The value must be suitable for server-side filtering. Raises ValueError if
this is not the case.
"""
if isinstance(value, six.text_type):
if six.PY3:
escaped_value = value.encode("unicode_escape")\
.decode('ASCII')\
.replace("'", "\\'")
else:
escaped_value = value.encode('utf-8').encode("string_escape")
# note: string_escape codec escapes "'" but not '"'...
escaped_value = escaped_value.replace('"', r'\"')
return '{}="{}"'.format(key, escaped_value)
elif isinstance(value, six.binary_type):
if six.PY3:
escaped_value = value.decode('utf-8')\
.encode("unicode_escape")\
.decode('ASCII')\
.replace("'", "\\'")
else:
escaped_value = value.encode("string_escape")
# note: string_escape codec escapes "'" but not '"'...
escaped_value = escaped_value.replace('"', r'\"')
return '{}="{}"'.format(key, escaped_value)
elif isinstance(value, int) or isinstance(value, bool):
return "{}={}".format(key, repr(value))
else:
raise ValueError("Unsupported value type: {}".format(type(value)))
def _get_proxy_object_class(proxy_class_dict, default_class, path, state):
"""Return a custom proxy class, either from the list or the default.
Use helper functions to check the class list or return the default.
:param proxy_class_dict: dict of proxy classes to try
:param default_class: default class to use if nothing in dict matches
:param path: dbus path
:param state: dbus state
:returns: appropriate custom proxy class
:raises ValueError: if more than one class in the dict matches
"""
class_type = _try_custom_proxy_classes(proxy_class_dict, path, state)
if class_type:
return class_type
return _get_default_proxy_class(default_class,
get_classname_from_path(path))
def _try_custom_proxy_classes(proxy_class_dict, path, state):
"""Identify which custom proxy class matches the dbus path and state.
If more than one class in proxy_class_dict matches, raise an exception.
:param proxy_class_dict: dict of proxy classes to try
:param path: dbus path
:param state: dbus state dict
:returns: matching custom proxy class
:raises ValueError: if more than one class matches
"""
possible_classes = [c for c in proxy_class_dict.values() if
c.validate_dbus_object(path, state)]
if len(possible_classes) > 1:
raise ValueError(
'More than one custom proxy class matches this object: '
'Matching classes are: %s. State is %s. Path is %s.'
','.join([repr(c) for c in possible_classes]),
repr(state),
path)
if len(possible_classes) == 1:
return possible_classes[0]
return None
def _get_default_proxy_class(default_class, name):
"""Return a custom proxy object class of the default or a base class.
We want the object to inherit from CustomEmulatorBase, not the object
class that is doing the selecting.
:param default_class: default class to use if no bases match
:param name: name of new class
:returns: custom proxy object class
"""
get_debug_logger().warning(
"Generating introspection instance for type '%s' based on generic "
"class.", name)
for base in default_class.__bases__:
if issubclass(base, CustomEmulatorBase):
base_class = base
break
else:
base_class = default_class
return type(str(name), (base_class,), {})
class _CustomEmulatorMeta(IntrospectableObjectMetaclass):
def __new__(cls, name, bases, d):
# only consider classes derived from CustomEmulatorBase
if name != 'CustomEmulatorBase':
# and only if they don't already have an Id set.
have_id = False
for base in bases:
if hasattr(base, '_id'):
have_id = True
break
if not have_id:
d['_id'] = uuid4()
return super(_CustomEmulatorMeta, cls).__new__(cls, name, bases, d)
CustomEmulatorBase = _CustomEmulatorMeta('CustomEmulatorBase',
(DBusIntrospectionObject, ),
{})
CustomEmulatorBase.__doc__ = \
"""This class must be used as a base class for any custom emulators defined
within a test case.
.. seealso::
Tutorial Section :ref:`custom_proxy_classes`
Information on how to write custom emulators.
"""
|