This file is indexed.

/usr/lib/python2.7/dist-packages/ipaserver/secrets/client.py is in python-ipaserver 4.7.0~pre1+git20180411-2ubuntu2.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
# Copyright (C) 2015  IPA Project Contributors, see COPYING for license

from __future__ import print_function
# pylint: disable=relative-import
from custodia.message.kem import KEMClient, KEY_USAGE_SIG, KEY_USAGE_ENC
# pylint: enable=relative-import
from jwcrypto.common import json_decode
from jwcrypto.jwk import JWK
from ipaserver.secrets.kem import IPAKEMKeys
from ipaserver.secrets.store import iSecStore
from ipaplatform.paths import paths
from base64 import b64encode
import ldapurl
import gssapi
import os
import urllib3
import requests


class CustodiaClient(object):

    def _client_keys(self):
        return self.ikk.server_keys

    def _server_keys(self, server, realm):
        principal = 'host/%s@%s' % (server, realm)
        sk = JWK(**json_decode(self.ikk.find_key(principal, KEY_USAGE_SIG)))
        ek = JWK(**json_decode(self.ikk.find_key(principal, KEY_USAGE_ENC)))
        return (sk, ek)

    def _ldap_uri(self, realm):
        dashrealm = '-'.join(realm.split('.'))
        socketpath = paths.SLAPD_INSTANCE_SOCKET_TEMPLATE % (dashrealm,)
        return 'ldapi://' + ldapurl.ldapUrlEscape(socketpath)

    def _keystore(self, realm, ldap_uri, auth_type):
        config = dict()
        if ldap_uri is None:
            config['ldap_uri'] = self._ldap_uri(realm)
        else:
            config['ldap_uri'] = ldap_uri
        if auth_type is not None:
            config['auth_type'] = auth_type

        return iSecStore(config)

    def __init__(
            self, client_service, keyfile, keytab, server, realm,
            ldap_uri=None, auth_type=None):
        self.client_service = client_service
        self.keytab = keytab

        # Init creds immediately to make sure they are valid.  Creds
        # can also be re-inited by _auth_header to avoid expiry.
        #
        self.creds = self.init_creds()

        self.service_name = gssapi.Name('HTTP@%s' % (server,),
                                        gssapi.NameType.hostbased_service)
        self.server = server

        self.ikk = IPAKEMKeys({'server_keys': keyfile, 'ldap_uri': ldap_uri})

        self.kemcli = KEMClient(self._server_keys(server, realm),
                                self._client_keys())

        self.keystore = self._keystore(realm, ldap_uri, auth_type)

        # FIXME: Remove warnings about missing subjAltName for the
        #        requests module
        urllib3.disable_warnings()

    def init_creds(self):
        name = gssapi.Name(self.client_service,
                           gssapi.NameType.hostbased_service)
        store = {'client_keytab': self.keytab,
                 'ccache': 'MEMORY:Custodia_%s' % b64encode(
                     os.urandom(8)).decode('ascii')}
        return gssapi.Credentials(name=name, store=store, usage='initiate')

    def _auth_header(self):
        if not self.creds or self.creds.lifetime < 300:
            self.creds = self.init_creds()
        ctx = gssapi.SecurityContext(name=self.service_name, creds=self.creds)
        authtok = ctx.step()
        return {'Authorization': 'Negotiate %s' % b64encode(
            authtok).decode('ascii')}

    def fetch_key(self, keyname, store=True):

        # Prepare URL
        url = 'https://%s/ipa/keys/%s' % (self.server, keyname)

        # Prepare signed/encrypted request
        encalg = ('RSA-OAEP', 'A256CBC-HS512')
        request = self.kemcli.make_request(keyname, encalg=encalg)

        # Prepare Authentication header
        headers = self._auth_header()

        # Perform request
        r = requests.get(url, headers=headers,
                         verify=paths.IPA_CA_CRT,
                         params={'type': 'kem', 'value': request})
        r.raise_for_status()
        reply = r.json()

        if 'type' not in reply or reply['type'] != 'kem':
            raise RuntimeError('Invlid JSON response type')

        value = self.kemcli.parse_reply(keyname, reply['value'])

        if store:
            self.keystore.set('keys/%s' % keyname, value)
        else:
            return value