/usr/lib/python3/dist-packages/secretstorage/util.py is in python3-secretstorage 2.3.1-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# SecretStorage module for Python
# Access passwords using the SecretService DBus API
# Author: Dmitry Shachnev, 2013
# License: BSD
"""This module provides some utility functions, but these shouldn't
normally be used by external applications."""
import dbus
import os
from secretstorage.defines import DBUS_UNKNOWN_METHOD, DBUS_NO_SUCH_OBJECT, \
DBUS_SERVICE_UNKNOWN, DBUS_NO_REPLY, DBUS_NOT_SUPPORTED, DBUS_EXEC_FAILED, \
SS_PATH, SS_PREFIX, ALGORITHM_DH, ALGORITHM_PLAIN
from secretstorage.dhcrypto import Session, int_to_bytes
from secretstorage.exceptions import ItemNotFoundException, \
SecretServiceNotAvailableException
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
from cryptography.utils import int_from_bytes
# Default D-Bus service name; bus_get_object() falls back to it when no
# explicit service_name is given.
BUS_NAME = 'org.freedesktop.secrets'
# Full name of the Service interface (SS_PREFIX followed by 'Service'),
# used when wrapping the service object in unlock_objects().
SERVICE_IFACE = SS_PREFIX + 'Service'
class InterfaceWrapper(dbus.Interface):
    """A :class:`dbus.Interface` subclass whose callable attributes
    translate some D-Bus exceptions into
    :doc:`SecretStorage exceptions <exceptions>`."""

    def catch_errors(self, function_in):
        """Returns a function that calls `function_in` with the same
        arguments and converts selected D-Bus errors:

        * ``DBUS_UNKNOWN_METHOD`` and ``DBUS_NO_SUCH_OBJECT`` become
          :exc:`ItemNotFoundException`;
        * ``DBUS_NO_REPLY`` and ``DBUS_NOT_SUPPORTED`` become
          :exc:`SecretServiceNotAvailableException`.

        Any other :exc:`dbus.exceptions.DBusException` is re-raised
        unchanged."""
        def function_out(*args, **kwargs):
            try:
                return function_in(*args, **kwargs)
            except dbus.exceptions.DBusException as error:
                error_name = error.get_dbus_name()
                if error_name == DBUS_UNKNOWN_METHOD:
                    raise ItemNotFoundException('Item does not exist!')
                elif error_name == DBUS_NO_SUCH_OBJECT:
                    raise ItemNotFoundException(error.get_dbus_message())
                elif error_name in (DBUS_NO_REPLY, DBUS_NOT_SUPPORTED):
                    raise SecretServiceNotAvailableException(
                        error.get_dbus_message())
                # Not an error we know how to translate.
                raise
        return function_out

    def __getattr__(self, attribute):
        """Looks `attribute` up through :class:`dbus.Interface` and, if
        the value is callable, returns it wrapped by
        :meth:`catch_errors`."""
        member = dbus.Interface.__getattr__(self, attribute)
        return self.catch_errors(member) if callable(member) else member
def bus_get_object(bus, object_path, service_name=None):
    """Fetches the object at `object_path` from `bus` (a wrapper around
    :meth:`SessionBus.get_object`, called with ``introspect=False``).

    The object is looked up under `service_name`, or under
    ``BUS_NAME`` when `service_name` is empty or not given.

    Raises
    :exc:`~secretstorage.exceptions.SecretServiceNotAvailableException`
    when the D-Bus error is ``DBUS_SERVICE_UNKNOWN``,
    ``DBUS_EXEC_FAILED`` or ``DBUS_NO_REPLY``; other D-Bus errors are
    re-raised unchanged."""
    try:
        return bus.get_object(service_name or BUS_NAME, object_path,
                              introspect=False)
    except dbus.exceptions.DBusException as error:
        unavailable_errors = (DBUS_SERVICE_UNKNOWN, DBUS_EXEC_FAILED,
                              DBUS_NO_REPLY)
        if error.get_dbus_name() not in unavailable_errors:
            raise
        raise SecretServiceNotAvailableException(error.get_dbus_message())
def open_session(bus):
    """Opens and returns a new Secret Service session.

    The ``ALGORITHM_DH`` algorithm is tried first, sending our public
    key to the service. If the service answers with
    ``DBUS_NOT_SUPPORTED``, the session is opened again with
    ``ALGORITHM_PLAIN`` and marked as not encrypted. Any other D-Bus
    error is propagated to the caller."""
    service = dbus.Interface(bus_get_object(bus, SS_PATH),
                             SS_PREFIX+'Service')
    session = Session()
    try:
        output, session_path = service.OpenSession(
            ALGORITHM_DH,
            dbus.ByteArray(int_to_bytes(session.my_public_key)),
            signature='sv')
    except dbus.exceptions.DBusException as error:
        if error.get_dbus_name() != DBUS_NOT_SUPPORTED:
            raise
        # The service cannot do the key exchange: use a plain session.
        output, session_path = service.OpenSession(
            ALGORITHM_PLAIN, '', signature='sv')
        session.encrypted = False
    else:
        # The DH reply carries the server's public key as big-endian bytes.
        server_public_key = int_from_bytes(bytearray(output), 'big')
        session.set_server_public_key(server_public_key)
    session.object_path = session_path
    return session
def format_secret(session, secret, content_type):
    """Packs `secret` into the :class:`dbus.Struct` that is passed to
    the Secret Service API.

    A `secret` that is not :class:`bytes` is encoded as UTF-8 first.
    For a non-encrypted `session` the secret is sent as is, with an
    empty parameters field. Otherwise it is padded, encrypted with
    AES in CBC mode using ``session.aes_key`` and a fresh random IV,
    and the IV is sent alongside the ciphertext."""
    if not isinstance(secret, bytes):
        secret = secret.encode('utf-8')
    if session.encrypted:
        # PKCS-7 style padding: always 1..16 bytes, so a secret whose
        # length is already a multiple of 16 gets a full extra block.
        pad_length = 0x10 - (len(secret) & 0xf)
        padded_secret = secret + bytes(bytearray((pad_length,)) * pad_length)
        init_vector = os.urandom(0x10)
        cipher = Cipher(algorithms.AES(session.aes_key),
                        modes.CBC(init_vector), default_backend())
        encryptor = cipher.encryptor()
        ciphertext = encryptor.update(padded_secret) + encryptor.finalize()
        fields = (
            session.object_path,
            dbus.Array(init_vector),
            dbus.Array(bytearray(ciphertext)),
            content_type,
        )
    else:
        fields = (session.object_path, '',
                  dbus.ByteArray(secret), content_type)
    return dbus.Struct(fields)
def exec_prompt(bus, prompt, callback):
    """Executes the given `prompt`, when complete calls `callback`
    function with two arguments: a boolean representing whether the
    operation was dismissed and a list of unlocked item paths. A main
    loop should be running and registered for this function to work.

    The ``Completed`` signal handler is connected *before* the prompt
    is started, so a prompt that finishes immediately cannot emit the
    signal while nobody is listening. If starting the prompt fails,
    the handler is disconnected again and the error is re-raised."""
    prompt_obj = bus_get_object(bus, prompt)
    prompt_iface = dbus.Interface(prompt_obj, SS_PREFIX+'Prompt')

    def new_callback(dismissed, unlocked):
        # Hand plain Python types to the user callback.
        if isinstance(unlocked, dbus.Array):
            unlocked = list(unlocked)
        callback(bool(dismissed), unlocked)

    # Subscribe first, then trigger: avoids losing an early signal.
    signal_match = prompt_iface.connect_to_signal('Completed', new_callback)
    try:
        prompt_iface.Prompt('', signature='s')
    except Exception:
        # Do not leave a dangling handler for a prompt that never started.
        signal_match.remove()
        raise
def exec_prompt_glib(bus, prompt):
    """Synchronous variant of :func:`exec_prompt` that blocks in a
    GLib main loop until the prompt completes.
    Returns (*dismissed*, *unlocked*) tuple."""
    from gi.repository import GLib
    main_loop = GLib.MainLoop()
    completions = []

    def on_completed(dismissed, unlocked):
        # Remember the outcome and let main_loop.run() below return.
        completions.append((dismissed, unlocked))
        main_loop.quit()

    exec_prompt(bus, prompt, on_completed)
    main_loop.run()
    return completions[0]
def exec_prompt_qt(bus, prompt):
    """Synchronous variant of :func:`exec_prompt` that blocks in a
    PyQt5 ``QCoreApplication`` event loop until the prompt completes.
    Returns (*dismissed*, *unlocked*) tuple."""
    from PyQt5.QtCore import QCoreApplication
    application = QCoreApplication([])
    completions = []

    def on_completed(dismissed, unlocked):
        # Remember the outcome and let application.exec_() below return.
        completions.append((dismissed, unlocked))
        application.quit()

    exec_prompt(bus, prompt, on_completed)
    application.exec_()
    return completions[0]
def unlock_objects(bus, paths, callback=None):
    """Requests unlocking objects specified in `paths`.

    If `callback` is specified, calls it when unlocking is complete
    (see :func:`exec_prompt` description for details) and returns
    ``None``. When no prompt is needed the callback is invoked
    immediately with ``False`` and the list of unlocked paths.

    Otherwise, uses the loop from GLib API and returns a boolean
    representing whether the operation was dismissed. When no prompt
    is needed this is ``False`` (nothing was dismissed).

    .. versionadded:: 2.1.2"""
    service_obj = bus_get_object(bus, SS_PATH)
    service_iface = InterfaceWrapper(service_obj, SERVICE_IFACE)
    unlocked_paths, prompt = service_iface.Unlock(paths, signature='ao')
    unlocked_paths = list(unlocked_paths)  # Convert from dbus.Array
    # A prompt path longer than one character means there is a prompt to
    # run (presumably the service returns '/' when none is needed --
    # confirm against the Secret Service specification).
    if len(prompt) > 1:
        if callback:
            exec_prompt(bus, prompt, callback)
            return None
        return exec_prompt_glib(bus, prompt)[0]
    if callback:
        # We still need to call it.
        callback(False, unlocked_paths)
        return None
    # Synchronous mode without a prompt: honour the documented boolean
    # return value instead of falling through with None.
    return False
def to_unicode(string):
    """Returns `string` (typically a D-Bus string) converted to the
    interpreter's unicode text type."""
    try:
        # Python 2 has a separate ``unicode`` type.
        text_type = unicode
    except NameError:
        # Python 3: ``str`` is already unicode.
        text_type = str
    return text_type(string)
|