This file is indexed.

/usr/lib/python3/dist-packages/pydbus/registration.py is in python3-pydbus 0.6.0-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
from __future__ import print_function
import sys, traceback
from gi.repository import GLib, Gio
from . import generic
from .exitable import ExitableWithAliases
from functools import partial
from .method_call_context import MethodCallContext
import logging

try:
	from inspect import signature, Parameter
except:
	from ._inspect3 import signature, Parameter

class ObjectWrapper(ExitableWithAliases("unwrap")):
	__slots__ = ["object", "outargs", "readable_properties", "writable_properties"]

	def __init__(self, object, interfaces):
		self.object = object

		self.outargs = {}
		for iface in interfaces:
			for method in iface.methods:
				self.outargs[iface.name + "." + method.name] = [arg.signature for arg in method.out_args]

		self.readable_properties = {}
		self.writable_properties = {}
		for iface in interfaces:
			for prop in iface.properties:
				if prop.flags & Gio.DBusPropertyInfoFlags.READABLE:
					self.readable_properties[iface.name + "." + prop.name] = prop.signature
				if prop.flags & Gio.DBusPropertyInfoFlags.WRITABLE:
					self.writable_properties[iface.name + "." + prop.name] = prop.signature

		for iface in interfaces:
			for signal in iface.signals:
				s_name = signal.name
				def EmitSignal(iface, signal):
					return lambda *args: self.SignalEmitted(iface.name, signal.name, GLib.Variant("(" + "".join(s.signature for s in signal.args) + ")", args))
				self._at_exit(getattr(object, signal.name).connect(EmitSignal(iface, signal)).__exit__)

		if "org.freedesktop.DBus.Properties" not in (iface.name for iface in interfaces):
			try:
				def onPropertiesChanged(iface, changed, invalidated):
					changed = {key: GLib.Variant(self.readable_properties[iface + "." + key], val) for key, val in changed.items()}
					args = GLib.Variant("(sa{sv}as)", (iface, changed, invalidated))
					self.SignalEmitted("org.freedesktop.DBus.Properties", "PropertiesChanged", args)
				self._at_exit(object.PropertiesChanged.connect(onPropertiesChanged).__exit__)
			except AttributeError:
				pass

	SignalEmitted = generic.signal()

	def call_method(self, connection, sender, object_path, interface_name, method_name, parameters, invocation):
		try:
			try:
				outargs = self.outargs[interface_name + "." + method_name]
				method = getattr(self.object, method_name)
			except KeyError:
				if interface_name == "org.freedesktop.DBus.Properties":
					if method_name == "Get":
						method = self.Get
						outargs = ["v"]
					elif method_name == "GetAll":
						method = self.GetAll
						outargs = ["a{sv}"]
					elif method_name == "Set":
						method = self.Set
						outargs = []
					else:
						raise
				else:
					raise

			sig = signature(method)

			kwargs = {}
			if "dbus_context" in sig.parameters and sig.parameters["dbus_context"].kind in (Parameter.POSITIONAL_OR_KEYWORD, Parameter.KEYWORD_ONLY):
				kwargs["dbus_context"] = MethodCallContext(invocation)

			result = method(*parameters, **kwargs)

			if len(outargs) == 0:
				invocation.return_value(None)
			elif len(outargs) == 1:
				invocation.return_value(GLib.Variant("(" + "".join(outargs) + ")", (result,)))
			else:
				invocation.return_value(GLib.Variant("(" + "".join(outargs) + ")", result))

		except Exception as e:
			logger = logging.getLogger(__name__)
			logger.exception("Exception while handling %s.%s()", interface_name, method_name)

			#TODO Think of a better way to translate Python exception types to DBus error types.
			e_type = type(e).__name__
			if not "." in e_type:
				e_type = "unknown." + e_type
			invocation.return_dbus_error(e_type, str(e))

	def Get(self, interface_name, property_name):
		type = self.readable_properties[interface_name + "." + property_name]
		result = getattr(self.object, property_name)
		return GLib.Variant(type, result)

	def GetAll(self, interface_name):
		ret = {}
		for name, type in self.readable_properties.items():
			ns, local = name.rsplit(".", 1)
			if ns == interface_name:
				ret[local] = GLib.Variant(type, getattr(self.object, local))
		return ret

	def Set(self, interface_name, property_name, value):
		self.writable_properties[interface_name + "." + property_name]
		setattr(self.object, property_name, value)

class ObjectRegistration(ExitableWithAliases("unregister")):
	__slots__ = ()

	def __init__(self, bus, path, interfaces, wrapper, own_wrapper=False):
		if own_wrapper:
			self._at_exit(wrapper.__exit__)

		def func(interface_name, signal_name, parameters):
			bus.con.emit_signal(None, path, interface_name, signal_name, parameters)

		self._at_exit(wrapper.SignalEmitted.connect(func).__exit__)

		try:
			ids = [bus.con.register_object(path, interface, wrapper.call_method, None, None) for interface in interfaces]
		except TypeError as e:
			if str(e).startswith("argument vtable: Expected Gio.DBusInterfaceVTable"):
				raise Exception("GLib 2.46 is required to publish objects; it is impossible in older versions.")
			else:
				raise

		self._at_exit(lambda: [bus.con.unregister_object(id) for id in ids])

class RegistrationMixin:
	__slots__ = ()

	def register_object(self, path, object, node_info):
		if node_info is None:
			try:
				node_info = type(object).dbus
			except AttributeError:
				node_info = type(object).__doc__

		if type(node_info) != list and type(node_info) != tuple:
			node_info = [node_info]

		node_info = [Gio.DBusNodeInfo.new_for_xml(ni) for ni in node_info]
		interfaces = sum((ni.interfaces for ni in node_info), [])

		wrapper = ObjectWrapper(object, interfaces)
		return ObjectRegistration(self, path, interfaces, wrapper, own_wrapper=True)