This file is indexed.

/usr/lib/python2.7/dist-packages/mistralclient/auth/keycloak.py is in python-mistralclient 1:3.3.0-0ubuntu1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
# Copyright 2016 - Nokia Networks
#
#    Licensed under the Apache License, Version 2.0 (the "License");
#    you may not use this file except in compliance with the License.
#    You may obtain a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.

import logging
import os
import pprint

import requests
from six.moves import urllib

from mistralclient import auth


LOG = logging.getLogger(__name__)


class KeycloakAuthHandler(auth.AuthHandler):

    def authenticate(self, req, session=None):
        """Performs authentication using Keycloak OpenID Protocol.

        :param req: Request dict containing list of parameters required
            for Keycloak authentication.

            * auth_url: Base authentication url of KeyCloak server (e.g.
                "https://my.keycloak:8443/auth"
            * client_id: Client ID (according to OpenID Connect protocol).
            * client_secret: Client secret (according to OpenID Connect
                protocol).
            * project_name: KeyCloak realm name.
            * username: User name (Optional, if None then access_token must be
                provided).
            * api_key: Password (Optional).
            * access_token: Access token. If passed, username and password are
                not used and this method just validates the token and refreshes
                it if needed (Optional, if None then username must be
                provided).
            * cacert: SSL certificate file (Optional).
            * insecure: If True, SSL certificate is not verified (Optional).

        :param session: Keystone session object. Not used by this plugin.

        """

        if not isinstance(req, dict):
            raise TypeError('The input "req" is not typeof dict.')

        auth_url = req.get('auth_url')
        client_id = req.get('client_id')
        client_secret = req.get('client_secret')
        realm_name = req.get('project_name')
        username = req.get('username')
        password = req.get('api_key')
        access_token = req.get('access_token')
        cacert = req.get('cacert')
        insecure = req.get('insecure', False)

        if not auth_url:
            raise ValueError('Base authentication url is not provided.')

        if not client_id:
            raise ValueError('Client ID is not provided.')

        if not realm_name:
            raise ValueError('Project(realm) name is not provided.')

        if username and access_token:
            raise ValueError(
                "User name and access token can't be "
                "provided at the same time."
            )

        if not username and not access_token:
            raise ValueError(
                'Either user name or access token must be provided.'
            )

        if access_token:
            response = self._authenticate_with_token(
                auth_url,
                client_id,
                client_secret,
                access_token,
                cacert,
                insecure
            )
        else:
            response = self._authenticate_with_password(
                auth_url,
                client_id,
                client_secret,
                realm_name,
                username,
                password,
                cacert,
                insecure
            )

        return {'auth_token': response, 'project_id': realm_name}

    @staticmethod
    def _authenticate_with_token(auth_url, client_id, client_secret,
                                 auth_token, cacert=None, insecure=None):
        # TODO(rakhmerov): Implement.
        raise NotImplementedError

    @staticmethod
    def _authenticate_with_password(auth_url, client_id, client_secret,
                                    realm_name, username, password,
                                    cacert=None, insecure=None):
        access_token_endpoint = (
            "%s/realms/%s/protocol/openid-connect/token" %
            (auth_url, realm_name)
        )

        verify = None
        if urllib.parse.urlparse(access_token_endpoint).scheme == "https":
            verify = False if insecure else cacert if cacert else True

        body = {
            'grant_type': 'password',
            'username': username,
            'password': password,
            'client_id': client_id,
            'scope': 'profile'
        }

        if client_secret:
            body['client_secret'] = client_secret,

        resp = requests.post(
            access_token_endpoint,
            data=body,
            verify=verify
        )

        try:
            resp.raise_for_status()
        except Exception as e:
            raise Exception("Failed to get access token:\n %s" % str(e))

        LOG.debug("HTTP response from OIDC provider: %s",
                  pprint.pformat(resp.json()))

        return resp.json()['access_token']


def get_system_ca_file():
    """Return path to system default CA file."""
    # Standard CA file locations for Debian/Ubuntu, RedHat/Fedora,
    # Suse, FreeBSD/OpenBSD, MacOSX, and the bundled ca
    ca_path = ['/etc/ssl/certs/ca-certificates.crt',
               '/etc/pki/tls/certs/ca-bundle.crt',
               '/etc/ssl/ca-bundle.pem',
               '/etc/ssl/cert.pem',
               '/System/Library/OpenSSL/certs/cacert.pem',
               requests.certs.where()]
    for ca in ca_path:
        LOG.debug("Looking for ca file %s", ca)
        if os.path.exists(ca):
            LOG.debug("Using ca file %s", ca)
            return ca
    LOG.warning("System ca file could not be found.")

# An example of working curl request to keycloak
# curl -d "client_id=admin-cli" -d "client_secret=secret"
# -d "username=admin" -d "password=qwerty" -d "grant_type=password"
# "http://localhost:8080/auth/realms/master/protocol/openid-connect/token"

# An example of using KeyCloak OpenID authentication.
if __name__ == '__main__':
    print("Using username/password to get access token from KeyCloak...")

    auth_handler = KeycloakAuthHandler()

    a_token = auth_handler.authenticate(
        dict(
            "https://my.keycloak:8443/auth",
            client_id="mistral_client",
            client_secret="secret",
            project_name="mistral",
            username="user",
            api_key="secret",
            insecure=True
        )
    )['auth_token']

    print("Auth token: %s" % a_token)