/usr/share/apt-xapian-index/update-apt-xapian-index-dbus is in apt-xapian-index 0.47ubuntu8.
This file is owned by root:root, with mode 0o755.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 | #! /usr/bin/python3
import logging
import os
import string
import subprocess
try:
import glib
import gobject
import dbus
import dbus.service
import dbus.mainloop.glib
except ImportError as e:
import sys
sys.stderr.write("Failed to import '%s', can not use dbus" % e)
sys.exit(1)
class PermissionDeniedError(dbus.DBusException):
" permission denied by policy "
pass
class AptXapianIndexDBusService(dbus.service.Object):
DBUS_INTERFACE_NAME = "org.debian.AptXapianIndex"
def __init__(self):
bus_name = dbus.service.BusName(self.DBUS_INTERFACE_NAME,
bus=dbus.SystemBus())
dbus.service.Object.__init__(self, bus_name, '/')
self._active_axi = None
def _authWithPolicyKit(self, sender, connection, priv):
system_bus = dbus.SystemBus()
obj = system_bus.get_object("org.freedesktop.PolicyKit1",
"/org/freedesktop/PolicyKit1/Authority",
"org.freedesktop.PolicyKit1.Authority")
policykit = dbus.Interface(obj, "org.freedesktop.PolicyKit1.Authority")
subject = ('system-bus-name',
{ 'name': dbus.String(sender, variant_level = 1) }
)
details = { '' : '' }
flags = dbus.UInt32(1) # AllowUserInteraction = 0x00000001
cancel_id = ''
(ok, notused, details) = policykit.CheckAuthorization(
subject, priv, details, flags, cancel_id)
return ok
@dbus.service.signal(dbus_interface=DBUS_INTERFACE_NAME,
signature="b")
def UpdateFinished(self, res):
logging.debug("Emitting UpdateFinished: %s" % res)
@dbus.service.signal(dbus_interface=DBUS_INTERFACE_NAME,
signature="i")
def UpdateProgress(self, percent):
logging.debug("Emitting UpdateProgress: %s" % percent)
def _update_apt_xapian_index(self, cmd):
p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
self._active_axi = p
while True:
while gobject.main_context_default().pending():
gobject.main_context_default().iteration()
res = p.poll()
if res is not None:
break
line = p.stdout.readline().strip()
if not line:
continue
try:
(op, progress) = string.split(line, sep=":", maxsplit=1)
if op == "progress":
percent = int(progress.split("/")[0])
self.UpdateProgress(percent)
except ValueError:
pass
# axi finished
self._active_axi = None
# emit finish signal
self.UpdateFinished(res == 0)
@dbus.service.method(DBUS_INTERFACE_NAME,
in_signature='bb',
out_signature='',
sender_keyword='sender',
connection_keyword='conn')
def update_async(self, force, update_only, sender=None, conn=None):
if not self._authWithPolicyKit(sender, conn,
"org.debian.aptxapianindex.update"):
raise PermissionDeniedError("Permission denied by policy")
# do not start update-apt-xapian-index twice, the clients will
# get the finished signal from the previous running one
if self._active_axi:
return
cmd = ["/usr/sbin/update-apt-xapian-index", "--batch-mode"]
if force:
cmd.append("--force")
if update_only:
cmd.append("--update")
glib.timeout_add(100, self._update_apt_xapian_index, cmd)
if __name__ == "__main__":
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
server = AptXapianIndexDBusService()
gobject.MainLoop().run()
|