This file is indexed.

/usr/lib/python2.7/dist-packages/keysign/gpgmeh.py is in gnome-keysign 0.9-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
#!/usr/bin/env python
#    Copyright 2016 Tobias Mueller <muelli@cryptobitch.de>
#
#    This file is part of GNOME Keysign.
#
#    GNOME Keysign is free software: you can redistribute it and/or modify
#    it under the terms of the GNU General Public License as published by
#    the Free Software Foundation, either version 3 of the License, or
#    (at your option) any later version.
#
#    GNOME Keysign is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU General Public License for more details.
#
#    You should have received a copy of the GNU General Public License
#    along with GNOME Keysign.  If not, see <http://www.gnu.org/licenses/>.
from __future__ import unicode_literals

import logging
import os  # The SigningKeyring uses os.symlink for the agent
import sys
from tempfile import mkdtemp

import gpg
from gpg.constants import PROTOCOL_OpenPGP


from .gpgkey import Key, UID

# Text type of the running interpreter: ``unicode`` on Python 2, ``str`` on
# Python 3.  GenEdit.edit_cb uses it to coerce the generator's replies.
texttype = unicode if sys.version_info.major < 3 else str

# Module-level logger, named after this module.
log = logging.getLogger(__name__)

#####
## INTERNAL API
##

class GenEdit(object):
    """Adapter that drives a generator from a gpg ``interact`` callback.

    The wrapped generator receives ``(status, args)`` tuples through
    ``send()`` and yields the reply for each of them.  ``edit_cb`` is the
    bound method meant to be handed to ``Context.interact``.
    """

    # Status codes that are never forwarded to the generator; edit_cb
    # answers them with None.
    _ignored_status = (gpg.constants.STATUS_EOF,
                       gpg.constants.STATUS_GOT_IT,
                       gpg.constants.STATUS_NEED_PASSPHRASE,
                       gpg.constants.STATUS_GOOD_PASSPHRASE,
                       gpg.constants.STATUS_BAD_PASSPHRASE,
                       gpg.constants.STATUS_USERID_HINT,
                       gpg.constants.STATUS_SIGEXPIRED,
                       gpg.constants.STATUS_KEYEXPIRED,
                       gpg.constants.STATUS_PROGRESS,
                       gpg.constants.STATUS_KEY_CREATED,
                       gpg.constants.STATUS_ALREADY_SIGNED)

    def __init__(self, generator):
        """Prime ``generator`` so that it can receive values via send().

        The value produced by the generator's first ``yield`` is discarded.
        """
        generator.send(None)
        self.generator = generator
        # Sink offset up to which data has already been read by edit_cb.
        self.last_sink_index = 0

    def edit_cb(self, status, args, sink=None):
        """Answer one callback invocation.

        status -- status code of the event
        args   -- argument accompanying the status (e.g. the prompt name)
        sink   -- optional seekable data object; data written to it since
                  the previous call is read, but only used for logging

        Returns None for ignored or falsy statuses and whenever the
        generator yields None; otherwise the generator's reply converted
        to ``texttype``.
        """
        if status in self._ignored_status:
            log.info("Returning None for %r %r", status, args)
            return None
        if not status:
            log.info("Closing for %r", status)
            self.generator.close()
            return None
        if sink:
            # Remember how far the sink has been written, then read the
            # part that was added since the last invocation.
            current = sink.seek(0, os.SEEK_CUR)
            sink.seek(self.last_sink_index, os.SEEK_SET)
            sinkdata = sink.read(current)
            self.last_sink_index = current
        else:
            sinkdata = None
        log.info("edit_cb: %r %r '%s'", status, args, sinkdata)
        data = self.generator.send((status, args))
        log.info("edit_cb data: %r", data)
        # A generator yielding None means "no reply"; converting it would
        # produce the literal text "None".
        if data is None:
            return None
        return texttype(data)

def del_uids(uids):
    """Generator producing the key-edit commands that delete ``uids``.

    It is driven with ``send((status, arg))`` and yields, in order:
    None, "list", one "uid N" per entry of ``uids``, then (only when
    ``uids`` is non-empty) "deluid" followed by "Y", and finally 'save'.
    The confirmation prompt after "deluid" is checked with assertions.
    """
    def _received(event):
        # Unpack and log what the driver sent in.
        state, detail = event
        log.info("status args: %r %r", state, detail)
        return state, detail

    _received((yield None))
    _received((yield "list"))
    for number in uids:
        _received((yield "uid %d" % number))
    if uids:
        state, detail = _received((yield "deluid"))
        assert state == gpg.constants.STATUS_GET_BOOL, "%r %r" % (state, detail)
        assert detail == 'keyedit.remove.uid.okay'
        _received((yield "Y"))
    yield 'save'


def sign_key(uid=0, sign_cmd=u"sign", expire=False, check=3,
             error_cb=None):
    """Generator producing the key-edit dialogue that signs a key.

    It is driven with ``send((status, prompt))`` and yields the reply to
    each event (None meaning "no reply").

    uid      -- number used in the "uid N" selection command
    sign_cmd -- the signing command sent after the selection
    expire   -- answer 'Y' instead of 'N' to the 'sign_uid.expire' prompt
    check    -- number sent in reply to the 'sign_uid.class' prompt
    error_cb -- called with the prompt on STATUS_ERROR; without it a
                RuntimeError is raised instead

    An event matching none of the handled prompts or statuses raises
    AssertionError.  The dialogue ends by yielding u"save".
    """
    state, request = yield None
    assert state == gpg.constants.STATUS_GET_LINE
    assert request == u"keyedit.prompt"

    # GOT_IT is not expected here; the next event must be a new prompt.
    state, request = yield u"uid %d" % uid
    assert state == gpg.constants.STATUS_GET_LINE

    state, request = yield sign_cmd

    # Answer whatever is asked until the edit prompt shows up again.
    while request != 'keyedit.prompt':
        if request == 'keyedit.sign_all.okay':
            answer = 'Y'
        elif request == 'sign_uid.expire':
            answer = '%s' % ('Y' if expire else 'N')
        elif request == 'sign_uid.class':
            answer = '%d' % check
        elif request == 'sign_uid.okay':
            answer = 'Y'
        elif state == gpg.constants.STATUS_INV_SGNR:
            # When does this actually happen?
            answer = None
        elif state == gpg.constants.STATUS_PINENTRY_LAUNCHED:
            answer = None
        elif state == gpg.constants.STATUS_GOT_IT:
            answer = None
        elif state == gpg.constants.STATUS_ALREADY_SIGNED:
            answer = u'Y'
        elif state == gpg.constants.STATUS_ERROR:
            if not error_cb:
                raise RuntimeError("Error signing key: %s" % request)
            error_cb(request)
            answer = None
        else:
            raise AssertionError("Unexpected state %r %r" % (state, request))
        state, request = yield answer

    yield u"save"




def UIDExport(keydata, uid_i):
    """Export only one UID of a key.

    Unfortunately, GnuPG does not provide something like
    --export-uid-only in order to obtain a UID and its signatures.
    Instead the key is imported into a temporary context, every other
    UID is deleted through the key-edit dialogue, and the remaining key
    is exported.

    keydata -- key data holding exactly one key
    uid_i   -- 1-based number of the UID to keep

    Returns whatever the export data object yields on ``read()``.

    Raises ValueError when ``uid_i`` is smaller than 1 or when the import
    did not consider and import exactly one key, and KeyError when
    ``uid_i`` is larger than the number of UIDs on the key.
    """
    log.debug("Deletion of UID %r from %r", uid_i, keydata)
    if not uid_i >= 1:
        log.debug("Raising because uid: %r", uid_i)
        raise ValueError("Expected UID to be >= 1, but is %r" % (uid_i,))
    ctx = TempContext()
    ctx.op_import(keydata)
    result = ctx.op_import_result()
    if result.considered != 1 or result.imported != 1:
        raise ValueError("Expected exactly one key in keydata. %r" % result)

    assert len(result.imports) == 1
    fpr = result.imports[0].fpr
    key = ctx.get_key(fpr)
    # Every UID number except the requested one gets deleted.
    uids_to_remove = set(range(1, len(key.uids) + 1))
    uids_to_remove.remove(uid_i)
    if uids_to_remove:
        sink = gpg.Data()
        # The sink doubles as fnc_value so that the callback can read
        # what has been written so far.
        ctx.interact(key,
            GenEdit(del_uids(uids_to_remove)).edit_cb,
            fnc_value=sink, sink=sink)
        sink.seek(0, 0)
        log.debug("Data after UIDExport: %s", sink.read())
    uid_data = gpg.Data()
    ctx.op_export_keys([key], 0, uid_data)
    uid_data.seek(0, 0)
    uid_bytes = uid_data.read()
    log.debug("UID %r: %r", uid_i, uid_bytes)
    return uid_bytes

def export_uids(keydata):
    """Yield ``(uid string, exported data)`` for each usable UID of a key.

    The key is imported into a temporary context to enumerate its UIDs;
    UIDs flagged invalid or revoked are skipped and every other one is
    exported on its own through UIDExport.  ValueError is raised when the
    import did not consider and import exactly one key.
    """
    context = TempContext()
    context.op_import(keydata)
    outcome = context.op_import_result()
    log.debug("ExportUIDs: Imported %r", outcome)
    if not (outcome.considered == 1 and outcome.imported == 1):
        raise ValueError("Expected exactly one key in keydata. %r" % outcome)

    assert len(outcome.imports) == 1
    imported_key = context.get_key(outcome.imports[0].fpr)
    # UID numbers are 1-based, matching what UIDExport expects.
    for number, uid in enumerate(imported_key.uids, 1):
        log.info("Potentially deleting UID %d: %r", number, uid)
        if uid.invalid or uid.revoked:
            continue
        single_uid = UIDExport(keydata, number)
        yield (uid.uid, single_uid)





def is_usable(key):
    """Return True when none of the key's invalid, disabled, expired or
    revoked flags is set.  The flags are written to the debug log."""
    invalid, disabled = key.invalid, key.disabled
    expired, revoked = key.expired, key.revoked
    unusable = invalid or disabled or expired or revoked
    log.debug('Key %s is invalid: %s (i:%s, d:%s, e:%s, r:%s)', key, unusable,
        invalid, disabled, expired, revoked)
    return not unusable

def filter_usable_keys(keys):
    """Return a list of Key objects built from those entries of ``keys``
    that pass is_usable()."""
    usable_keys = []
    for candidate in keys:
        if is_usable(candidate):
            usable_keys.append(Key.from_gpgme(candidate))
    log.debug('Identified usable keys: %s', usable_keys)
    return usable_keys



class DirectoryContext(gpg.Context):
    """A gpg.Context whose OpenPGP engine is configured with a given
    home directory."""
    def __init__(self, homedir):
        """Create the context and hand ``homedir`` to set_engine_info().

        ``homedir`` is also kept as ``self.homedir``.  Several callers in
        this module pass None.
        """
        super(DirectoryContext, self).__init__()
        # The second argument is left as None
        # (presumably the engine's default executable -- TODO confirm).
        self.set_engine_info(PROTOCOL_OpenPGP, None, homedir)
        self.homedir = homedir

class TempContext(DirectoryContext):
    """A DirectoryContext whose home directory is a fresh directory
    created with mkdtemp()."""
    def __init__(self):
        """Create the temporary directory and use it as home directory."""
        # homedir is assigned before the base initialiser runs; the base
        # class then assigns the same value again.
        self.homedir = mkdtemp()
        super(TempContext, self).__init__(homedir=self.homedir)

    def __del__(self):
        """Finaliser; currently performs no cleanup."""
        # NOTE(review): the rmtree call below is commented out, so the
        # directory created by mkdtemp() is never removed by this class.
        # Why it was disabled is not visible here -- confirm before
        # re-enabling it.
        try:
            # shutil.rmtree(self.homedir, ignore_errors=True)
            pass
        except:
            log.exception("During cleanup of %r", self.homedir)

class TempContextWithAgent(TempContext):
    """A TempContext that links to the agent socket of another context
    and holds a copy of the public key for each of its secret keys."""
    def __init__(self, oldctx):
        """Set up the temporary home directory based on ``oldctx``.

        oldctx -- context to take the secret key list and the keys from.
                  When it is falsy, ``~/.gnupg`` is used as the old home
                  directory and a DirectoryContext on the default home
                  directory stands in for it.

        A symlink named ``S.gpg-agent`` pointing into the old home
        directory is created in the temporary one (presumably so that
        the running agent gets reused -- TODO confirm).  For every secret
        key listed by ``oldctx`` the export of its first subkey's
        fingerprint is imported here.

        AssertionError is raised when the temporary keyring is not empty
        to begin with, or when it does not hold as many keys as there are
        secret keys afterwards.
        """
        super(TempContextWithAgent, self).__init__()
        homedir = self.homedir

        default_homedir = os.path.join(os.path.expanduser("~"), ".gnupg")
        if oldctx:
            old_homedir = oldctx.engine_info.home_dir or default_homedir
        else:
            old_homedir = default_homedir
            # Without this stand-in the key listing below would fail on
            # a missing context.
            oldctx = DirectoryContext(None)

        log.info("Old homedir: %r", old_homedir)
        old_agent_path = os.path.expanduser(os.path.join(old_homedir, "S.gpg-agent"))
        new_agent_path = os.path.expanduser(os.path.join(homedir, "S.gpg-agent"))
        os.symlink(old_agent_path, new_agent_path)

        assert len(list(self.keylist())) == 0

        def export_key(fpr):
            # FIXME: The Context should really be able to export()
            public_key = gpg.Data()
            oldctx.op_export(fpr, 0, public_key)
            public_key.seek(0, os.SEEK_SET)
            return public_key

        secret_keys = list(oldctx.keylist(secret=True))
        for key in secret_keys:
            keydata = export_key(key.subkeys[0].fpr)
            self.op_import(keydata)
            # FIXME: I guess we should assert on the result

        assert len(list(self.keylist())) == len(secret_keys)



##
## END OF INTERNAL API
#####





def openpgpkey_from_data(keydata):
    """Import ``keydata`` into a temporary context and return the
    imported key as a Key object.

    ValueError is raised unless exactly one key was imported.
    """
    context = TempContext()
    context.op_import(gpg.Data(keydata))
    outcome = context.op_import_result()
    log.debug("Import Result: %s", outcome)
    if outcome.imported != 1:
        raise ValueError("Keydata did not contain exactly one key, but %r" %
            outcome.imported)

    fingerprint = outcome.imports[0].fpr
    return Key.from_gpgme(context.get_key(fingerprint))



def get_public_key_data(fpr, homedir=None):
    """Return the ASCII-armored export of ``fpr`` from the keyring in
    ``homedir``.

    ValueError is raised when the export produced no data.
    """
    context = DirectoryContext(homedir)
    context.armor = True
    exported = gpg.Data()
    # FIXME: There will probably be an export() function
    context.op_export(fpr, 0, exported)
    exported.seek(0, os.SEEK_SET)
    keydata = exported.read()
    log.debug("Exported %r: %r", fpr, keydata)
    if keydata:
        return keydata
    raise ValueError("No data to export for {} (in {})".format(fpr, homedir))



def fingerprint_from_keydata(keydata):
    '''Return the ``fpr`` of the key that openpgpkey_from_data() builds
    from ``keydata``.'''
    return openpgpkey_from_data(keydata).fpr

def get_usable_keys_from_context(ctx, pattern="", secret=False):
    '''Return Key objects for the entries of ``ctx.keylist(pattern,
    secret)`` that pass is_usable().'''
    usable = []
    for candidate in ctx.keylist(pattern=pattern, secret=secret):
        if is_usable(candidate):
            usable.append(Key.from_gpgme(candidate))
    return usable

def get_usable_keys(pattern="", homedir=None):
    '''Return the usable keys matching ``pattern`` from the keyring in
    ``homedir``, i.e. those that are not revoked, expired, disabled or
    invalid.'''
    log.debug('Retrieving keys for %s, %s', pattern, homedir)
    keyring = DirectoryContext(homedir=homedir)
    usable = get_usable_keys_from_context(keyring, pattern=pattern,
                                          secret=False)
    return usable

def get_usable_secret_keys(pattern="", homedir=None):
    '''Return the usable keys matching ``pattern`` from the secret key
    listing of the keyring in ``homedir``.'''
    keyring = DirectoryContext(homedir=homedir)
    usable = get_usable_keys_from_context(keyring, pattern=pattern,
                                          secret=True)
    return usable




def minimise_key(keydata):
    """Return the public key in ``keydata`` exported under the MINIMAL
    export mode.

    The key is imported into a fresh temporary context first.

    Raises ValueError when the import did not consider and import exactly
    one key, and AssertionError when not exactly one of the import
    entries has the status IMPORT_NEW.
    """
    ctx = TempContext()
    ctx.op_import(keydata)
    result = ctx.op_import_result()
    # Either counter being off means this was not exactly one key; this
    # matches the check done by UIDExport and export_uids.
    if result.considered != 1 or result.imported != 1:
        raise ValueError("Expected to load exactly one key. %r" % (result,))

    imports = [i for i in result.imports
               if i.status == gpg.constants.IMPORT_NEW]
    log.debug("Import %r", result)
    assert len(imports) == 1
    # Use the entry that was just checked to be the newly imported one.
    fpr = imports[0].fpr
    key = ctx.get_key(fpr)
    sink = gpg.Data()
    ctx.op_export_keys([key], gpg.constants.EXPORT_MODE_MINIMAL, sink)
    sink.seek(0, 0)
    minimised_key = sink.read()
    return minimised_key

def sign_keydata_and_encrypt(keydata, error_cb=None, homedir=None):
    """Sign the key in ``keydata`` and yield each signed UID, encrypted.

    This is a generator; no work happens before the first item is
    requested.

    keydata  -- key data of the key to sign; it is minimised first
    error_cb -- passed on to sign_key(); called with the prompt when the
                signing dialogue reports an error
    homedir  -- home directory of the keyring providing the secret keys

    Yields ``(UID, ciphertext)`` for every UID that is neither revoked
    nor invalid.  The ciphertext is the export of the signed key reduced
    to that UID, encrypted to the signed key itself.

    Raises ValueError or AssertionError when the import of ``keydata``
    does not look like exactly one key.
    """
    oldctx = DirectoryContext(homedir)
    ctx = TempContextWithAgent(oldctx)
    # We're trying to sign with all available secret keys.  The negation
    # has to cover all four flags: a key with any of them set is skipped.
    available_secret_keys = [
        key for key in ctx.keylist(secret=True)
        if not (key.disabled or key.revoked or key.invalid or key.expired)]
    ctx.signers = available_secret_keys

    ctx.op_import(minimise_key(keydata))
    result = ctx.op_import_result()
    # NOTE(review): this only rejects when both counters differ from 1.
    # It is left that way because ctx already holds keys copied from the
    # user's keyring, so importing an already known key would presumably
    # report imported == 0 -- confirm before tightening this to ``or``.
    if result.considered != 1 and result.imported != 1:
        raise ValueError("Expected to load exactly one key. %r" % (result,))

    imports = result.imports
    assert len(imports) == 1
    fpr = imports[0].fpr
    key = ctx.get_key(fpr)
    sink = gpg.Data()
    # There is op_keysign, but it's only available with gpg 2.1.12
    ctx.interact(key, GenEdit(sign_key(error_cb=error_cb)).edit_cb, sink=sink)
    sink.seek(0, 0)
    log.debug("Sink after signing: %s", sink.read())

    signed_sink = gpg.Data()
    ctx.set_keylist_mode(gpg.constants.KEYLIST_MODE_SIGS)
    ctx.armor = True
    ctx.op_export_keys([key], 0, signed_sink)
    signed_sink.seek(0, 0)
    signed_keydata = signed_sink.read()
    log.debug("Signed Key: %s", signed_keydata)
    # Do I have to re-get the key to make the signatures known?
    key = ctx.get_key(fpr)

    # UID numbers are 1-based, matching what UIDExport expects.
    for i, uid in enumerate(key.uids, start=1):
        if uid.revoked or uid.invalid:
            continue
        uid_data = UIDExport(signed_keydata, i)
        log.debug("Data for uid %d: %r, sigs: %r %r", i, uid, uid.signatures, uid_data)

        # Only the first element of the returned triple is used.
        ciphertext, _, _ = ctx.encrypt(plaintext=uid_data,
                                       recipients=[key],
                                       # We probably have to set owner trust
                                       # in order for it to work out of the box
                                       always_trust=True,
                                       sign=False)
        yield (UID.from_gpgme(uid), ciphertext)