/usr/lib/python3/dist-packages/botocore/translate.py is in python3-botocore 0.62.0-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

# Copyright (c) 2012-2013 Mitch Garnaat http://garnaat.org/
# Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.

"""Translate the raw json files into python specific descriptions."""
import os
import re
from copy import deepcopy

import jmespath

from botocore.compat import OrderedDict, json
from botocore.utils import merge_dicts
from botocore import xform_name


class ModelFiles(object):
    """Container object to hold all the various parsed json files.

    Includes:

        * The json service description.
        * The _retry.json file.
        * The <service>.extra.json enhancements file.
        * The name of the service.

    """
    def __init__(self, model, retry, enhancements, name=''):
        self.model = model
        self.retry = retry
        self.enhancements = enhancements
        self.name = name


def load_model_files(args):
    with open(args.modelfile) as f:
        model = json.load(f, object_pairs_hook=OrderedDict)
    with open(args.retry_file) as f:
        retry = json.load(f, object_pairs_hook=OrderedDict)
    enhancements = _load_enhancements_file(args.enhancements_file)
    service_name = os.path.splitext(os.path.basename(args.modelfile))[0]
    return ModelFiles(model, retry, enhancements, name=service_name)


def _load_enhancements_file(file_path):
    if not os.path.isfile(file_path):
        return {}
    with open(file_path) as f:
        return json.load(f, object_pairs_hook=OrderedDict)


def translate(model):
    new_model = deepcopy(model.model)
    new_model.update(model.enhancements.get('extra', {}))
    try:
        del new_model['pagination']
    except KeyError:
        pass
    handle_op_renames(new_model, model.enhancements)
    handle_remove_deprecated_params(new_model, model.enhancements)
    handle_remove_deprecated_operations(new_model, model.enhancements)
    handle_filter_documentation(new_model, model.enhancements)
    handle_rename_params(new_model, model.enhancements)
    add_pagination_configs(
        new_model,
        model.enhancements.get('pagination', {}))
    add_waiter_configs(
        new_model,
        model.enhancements.get('waiters', {}))
    # Merge in any per operation overrides defined in the .extra.json file.
    merge_dicts(new_model['operations'],
                model.enhancements.get('operations', {}))
    add_retry_configs(
        new_model, model.retry.get('retry', {}),
        definitions=model.retry.get('definitions', {}))
    return new_model
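
# An illustrative usage sketch (hypothetical file names; ``args`` here
# mimics the parsed command line arguments the loader expects):
#
#     from argparse import Namespace
#     files = load_model_files(Namespace(
#         modelfile='ec2.json', retry_file='_retry.json',
#         enhancements_file='ec2.extra.json'))
#     new_model = translate(files)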


def handle_op_renames(new_model, enhancements):
    # This allows for operations to be renamed.  The only
    # implemented transformation is removing part of the operation name
    # (because that's all we currently need).
    remove = enhancements.get('transformations', {}).get(
        'operation-name', {}).get('remove')
    if remove is not None:
        # We're going to recreate the dictionary because we want to preserve
        # the order.  This is the only option we have unless we have our own
        # custom OrderedDict.
        remove_regex = re.compile(remove)
        operations = new_model['operations']
        new_operation = OrderedDict()
        for key in operations:
            new_key = remove_regex.sub('', key)
            new_operation[new_key] = operations[key]
        new_model['operations'] = new_operation
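
# An illustrative sketch (hypothetical enhancements data): a ``remove``
# regex strips the matching portion from every operation name, so
#
#     enhancements = {'transformations': {
#         'operation-name': {'remove': r'^2013_02_01'}}}
#
# would rename an operation '2013_02_01CopyImage' to 'CopyImage'.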


def handle_remove_deprecated_operations(new_model, enhancements):
    # This removes any operation whose documentation string contains
    # the specified phrase that marks a deprecated operation.
    keyword = enhancements.get('transformations', {}).get(
        'remove-deprecated-operations', {}).get('deprecated_keyword')
    remove = []
    if keyword is not None:
        operations = new_model['operations']
        for op_name in operations:
            operation = operations[op_name]
            if operation:
                docs = operation['documentation']
                if docs and keyword in docs:
                    remove.append(op_name)
    for op in remove:
        del new_model['operations'][op]


def handle_remove_deprecated_params(new_model, enhancements):
    # This removes any parameter whose documentation string contains
    # the specified phrase that marks a deprecated parameter.
    keyword = enhancements.get('transformations', {}).get(
        'remove-deprecated-params', {}).get('deprecated_keyword')
    if keyword is not None:
        operations = new_model['operations']
        for op_name in operations:
            operation = operations[op_name]
            params = operation.get('input', {}).get('members')
            if params:
                new_params = OrderedDict()
                for param_name in params:
                    param = params[param_name]
                    docs = param['documentation']
                    if docs and keyword in docs:
                        continue
                    new_params[param_name] = param
                operation['input']['members'] = new_params
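
# An illustrative sketch (hypothetical enhancements data) covering both
# deprecation transformations above: any operation, or any input member,
# whose documentation contains the keyword is dropped from the model.
#
#     enhancements = {'transformations': {
#         'remove-deprecated-operations': {
#             'deprecated_keyword': 'Deprecated'},
#         'remove-deprecated-params': {
#             'deprecated_keyword': 'Deprecated'}}}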


def _filter_param_doc(param, replacement, regex):
    # Recurse into complex parameters looking for documentation.
    doc = param.get('documentation')
    if doc:
        param['documentation'] = regex.sub(replacement, doc)
    if param['type'] == 'structure':
        for member_name in param['members']:
            member = param['members'][member_name]
            _filter_param_doc(member, replacement, regex)
    if param['type'] == 'map':
        _filter_param_doc(param['keys'], replacement, regex)
        _filter_param_doc(param['members'], replacement, regex)
    elif param['type'] == 'list':
        _filter_param_doc(param['members'], replacement, regex)


def handle_filter_documentation(new_model, enhancements):
    # This provides a way to filter undesirable content (e.g. CDATA)
    # from documentation strings.
    doc_filter = enhancements.get('transformations', {}).get(
        'filter-documentation', {}).get('filter')
    if doc_filter is not None:
        filter_regex = re.compile(doc_filter.get('regex', ''), re.DOTALL)
        replacement = doc_filter.get('replacement')
        operations = new_model['operations']
        for op_name in operations:
            operation = operations[op_name]
            doc = operation.get('documentation')
            if doc:
                new_doc = filter_regex.sub(replacement, doc)
                operation['documentation'] = new_doc
            params = operation.get('input', {}).get('members')
            if params:
                for param_name in params:
                    param = params[param_name]
                    _filter_param_doc(param, replacement, filter_regex)
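
# An illustrative sketch (hypothetical filter): strip CDATA wrappers
# from every operation and parameter documentation string.
#
#     enhancements = {'transformations': {'filter-documentation': {
#         'filter': {
#             'regex': r'<!\[CDATA\[(.*)\]\]>',
#             'replacement': r'\1'}}}}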


def handle_rename_params(new_model, enhancements):
    renames = enhancements.get('transformations', {}).get(
        'renames', {})
    if not renames:
        return
    # This is *extremely* specific to botocore's translations, but
    # we support a restricted set of argument renames based on a
    # jmespath expression.
    for expression, new_value in renames.items():
        # First we take everything up until the last dot.
        parent_expression, key = expression.rsplit('.', 1)
        matched = jmespath.search(parent_expression, new_model['operations'])
        current = matched[key]
        del matched[key]
        matched[new_value] = current
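
# An illustrative sketch (hypothetical rename): each key is a jmespath
# expression into ``new_model['operations']`` and the value is the new
# name for the last element of that expression, so
#
#     enhancements = {'transformations': {'renames': {
#         'DescribeCluster.input.members.ClusterId': 'ClusterIdentifier'}}}
#
# moves operations['DescribeCluster']['input']['members']['ClusterId']
# to the key 'ClusterIdentifier'.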


def resembles_jmespath_exp(value):
    # For now, we'll do a naive check.
    if '.' in value or '[' in value:
        return True
    return False


def add_pagination_configs(new_model, pagination):
    # Adding in pagination configs means copying the config to a top level
    # 'pagination' key in the new model, and it also means adding the
    # pagination config to each individual operation.
    # Also, the input_token needs to be transformed to the python specific
    # name, so we're adding a py_input_token (e.g. NextToken -> next_token).
    if pagination:
        new_model['pagination'] = pagination
    for name in pagination:
        config = pagination[name]
        _check_known_pagination_keys(config)
        if 'py_input_token' not in config:
            _add_py_input_token(config)
        _validate_result_key_exists(config)
        _validate_referenced_operation_exists(new_model, name)
        operation = new_model['operations'][name]
        _validate_operation_has_output(operation, name)
        _check_input_keys_match(config, operation)
        _check_output_keys_match(config, operation,
                                 new_model.get('endpoint_prefix', ''))
        operation['pagination'] = config.copy()
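
# An illustrative sketch (hypothetical pagination data): each entry is
# keyed by operation name, validated against that operation's input and
# output members, and given a ``py_input_token`` (NextToken ->
# next_token) if one isn't supplied explicitly.
#
#     pagination = {'DescribeInstances': {
#         'input_token': 'NextToken',
#         'output_token': 'NextToken',
#         'limit_key': 'MaxResults',
#         'result_key': 'Reservations'}}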


def _validate_operation_has_output(operation, name):
    if not operation['output']:
        raise ValueError("Trying to add pagination config for an "
                         "operation with no output members: %s" % name)


def _validate_referenced_operation_exists(new_model, name):
    if name not in new_model['operations']:
        raise ValueError("Trying to add pagination config for non "
                         "existent operation: %s" % name)


def _validate_result_key_exists(config):
    # result_key must be defined.
    if 'result_key' not in config:
        raise ValueError("Required key 'result_key' is missing from "
                         "from pagination config: %s" % config)


def _add_py_input_token(config):
    input_token = config['input_token']
    if isinstance(input_token, list):
        py_input_token = []
        for token in input_token:
            py_input_token.append(xform_name(token))
        config['py_input_token'] = py_input_token
    else:
        config['py_input_token'] = xform_name(input_token)


def add_waiter_configs(new_model, waiters):
    if waiters:
        denormalized = denormalize_waiters(waiters)
        # Before adding it to the new model, we need to verify the
        # final denormalized model.
        for value in denormalized.values():
            if value['operation'] not in new_model['operations']:
                raise ValueError("Trying to add waiter config for "
                                 "non existent operation: %s"
                                 % value['operation'])
        new_model['waiters'] = denormalized


def denormalize_waiters(waiters):
    # The waiter configuration is normalized to avoid duplication.
    # You can inherit defaults, and extend from other definitions.
    # We're going to denormalize this so that the implementation for
    # consuming waiters is simple.
    default = waiters.get('__default__', {})
    new_waiters = {}
    for key, value in waiters.items():
        if key.startswith('__'):
            # Keys that start with '__' are considered abstract/internal
            # and are only used for inheritance.  Because we're going
            # to denormalize the configs and perform all the lookups
            # during this translation process, the abstract/internal
            # configs don't need to make it into the final translated
            # config so we can just skip these.
            continue
        new_waiters[key] = denormalize_single_waiter(value, default, waiters)
    return new_waiters
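
# An illustrative sketch (hypothetical waiter data): the '__default__'
# config and any '__'-prefixed abstract configs are folded into each
# concrete waiter and dropped from the result.
#
#     waiters = {
#         '__default__': {'interval': 15, 'max_attempts': 40,
#                         'operation': 'DescribeInstances'},
#         '__InstanceState__': {
#             'acceptor_path': 'Reservations[].Instances[].State.Name'},
#         'InstanceRunning': {'extends': '__InstanceState__',
#                             'acceptor_type': 'output',
#                             'success_value': 'running',
#                             'failure_value': 'terminated'}}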


def denormalize_single_waiter(value, default, waiters):
    """Denormalize a single waiter config.

    :param value: The dictionary of a single waiter config, e.g.
        the ``InstanceRunning`` or ``TableExists`` config.  This
        is the config we're going to denormalize.
    :param default: The ``__default__`` (if any) configuration.
        This is needed to resolve the lookup process.
    :param waiters: The full configuration of the waiters.
        This is needed when we have to look up a parent config that
        the current config extends.
    :return: The denormalized config.
    :rtype: dict

    """
    # First we need to resolve all the keys based on the inheritance
    # hierarchy.  The lookup process is:
    # The bottom-most (leaf) config is ``value``.  From there we need
    # to look up anything it inherits from (denoted via the ``extends``
    # key).  We need to perform this process recursively until we hit
    # a config that has no ``extends`` key.
    # And finally if we haven't found our value yet, we check in the
    # ``__default__`` key.
    # So the first thing we need to do is build the lookup chain that
    # starts with ``value`` and ends with ``__default__``.
    lookup_chain = [value]
    current = value
    while True:
        if 'extends' not in current:
            break
        current = waiters[current.get('extends')]
        lookup_chain.append(current)
    lookup_chain.append(default)
    new_waiter = {}
    # Now that we have this lookup chain we can build the entire set
    # of values by starting at the most distant ancestor and walking down
    # to the children.  At each step the child is merged onto the parent's
    # config items.  This is the desired behavior, as a child's values
    # override its parent's.  This is what the ``reversed(...)`` call
    # is for.
    for element in reversed(lookup_chain):
        new_waiter.update(element)
    # We don't care about 'extends' so we can safely remove that key.
    new_waiter.pop('extends', {})
    # Now we need to resolve the success/failure values.  We
    # want to completely remove the acceptor types.
    # The logic here is that if there is no success/failure_* variable
    # defined, it inherits this value from the matching acceptor_* variable.
    new_waiter['success_type'] = new_waiter.get(
        'success_type', new_waiter.get('acceptor_type'))
    new_waiter['success_path'] = new_waiter.get(
        'success_path', new_waiter.get('acceptor_path'))
    new_waiter['success_value'] = new_waiter.get(
        'success_value', new_waiter.get('acceptor_value'))
    new_waiter['failure_type'] = new_waiter.get(
        'failure_type', new_waiter.get('acceptor_type'))
    new_waiter['failure_path'] = new_waiter.get(
        'failure_path', new_waiter.get('acceptor_path'))
    new_waiter['failure_value'] = new_waiter.get(
        'failure_value', new_waiter.get('acceptor_value'))
    # We can remove acceptor_* vars because they're only used for lookups
    # and we've already performed this step in the lines above.
    new_waiter.pop('acceptor_type', '')
    new_waiter.pop('acceptor_path', '')
    new_waiter.pop('acceptor_value', '')
    # Remove any keys with a None value.
    for key in list(new_waiter.keys()):
        if new_waiter[key] is None:
            del new_waiter[key]
    # Check required keys.
    for required in ['operation', 'success_type']:
        if required not in new_waiter:
            raise ValueError('Missing required waiter configuration '
                             'value "%s": %s' % (required, new_waiter))
        if new_waiter.get(required) is None:
            raise ValueError('Required waiter configuration '
                             'value cannot be None "%s": %s' %
                             (required, new_waiter))
    # Finally, success/failure values can be a scalar or a list.  We're going
    # to just always make them a list.
    if 'success_value' in new_waiter and not \
            isinstance(new_waiter['success_value'], list):
        new_waiter['success_value'] = [new_waiter['success_value']]
    if 'failure_value' in new_waiter and not \
            isinstance(new_waiter['failure_value'], list):
        new_waiter['failure_value'] = [new_waiter['failure_value']]
    _transform_waiter(new_waiter)
    return new_waiter
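
# For the hypothetical waiters sketch shown after ``denormalize_waiters``
# above, the 'InstanceRunning' config would denormalize (after
# ``_transform_waiter`` runs) to:
#
#     {'interval': 15, 'max_attempts': 40,
#      'operation': 'DescribeInstances',
#      'success': {'type': 'output',
#                  'path': 'Reservations[].Instances[].State.Name',
#                  'value': ['running']},
#      'failure': {'type': 'output',
#                  'path': 'Reservations[].Instances[].State.Name',
#                  'value': ['terminated']}}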


def _transform_waiter(new_waiter):
    # This transforms the waiters into a format that's slightly
    # easier to consume.
    if 'success_type' in new_waiter:
        success = {'type': new_waiter.pop('success_type')}
        if 'success_path' in new_waiter:
            success['path'] = new_waiter.pop('success_path')
        if 'success_value' in new_waiter:
            success['value'] = new_waiter.pop('success_value')
        new_waiter['success'] = success
    if 'failure_type' in new_waiter:
        failure = {'type': new_waiter.pop('failure_type')}
        if 'failure_path' in new_waiter:
            failure['path'] = new_waiter.pop('failure_path')
        if 'failure_value' in new_waiter:
            failure['value'] = new_waiter.pop('failure_value')
        new_waiter['failure'] = failure


def _check_known_pagination_keys(config):
    # Verify that the pagination config only has keys we expect to see.
    expected = set(['input_token', 'py_input_token', 'output_token',
                    'result_key', 'limit_key', 'more_results',
                    'non_aggregate_keys'])
    for key in config:
        if key not in expected:
            raise ValueError("Unknown key in pagination config: %s" % key)


def _check_output_keys_match(config, operation, service_name):
    output_members = list(operation['output']['members'])
    jmespath_seen = False
    for output_key in _get_all_page_output_keys(config):
        if resembles_jmespath_exp(output_key):
            # We don't validate jmespath expressions for now.
            jmespath_seen = True
            continue
        if output_key not in output_members:
            raise ValueError("Key %r is not an output member: %s" %
                             (output_key,
                              output_members))
        output_members.remove(output_key)
    # Some services echo the input parameters in the response
    # output.  We should not trigger a validation error
    # if those params are still not accounted for.
    for input_name in operation['input']['members']:
        if input_name in output_members:
            output_members.remove(input_name)
    if not jmespath_seen and output_members:
        # Because we can't validate jmespath expressions yet,
        # we can't say for sure whether output_members actually has
        # remaining keys or not.
        if service_name == 's3' and output_members == ['Name']:
            # The S3 model uses 'Name' for the output key, which
            # actually maps to the 'Bucket' input param so we don't
            # need to validate this output member.  This is the only
            # model that has this, so we can just special case this
            # for now.
            return
        raise ValueError("Output members still exist for operation %s: %s" % (
            operation['name'], output_members))


def _get_all_page_output_keys(config):
    if not isinstance(config['result_key'], list):
        yield config['result_key']
    else:
        for result_key in config['result_key']:
            yield result_key
    if not isinstance(config['output_token'], list):
        yield config['output_token']
    else:
        for result_key in config['output_token']:
            yield result_key
    if 'more_results' in config:
        yield config['more_results']
    for key in config.get('non_aggregate_keys', []):
        yield key
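
# For the hypothetical pagination sketch shown after
# ``add_pagination_configs`` above, this would yield 'Reservations'
# (the result_key) followed by 'NextToken' (the output_token).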


def _check_input_keys_match(config, operation):
    input_tokens = config['input_token']
    if not isinstance(input_tokens, list):
        input_tokens = [input_tokens]
    valid_input_names = operation['input']['members']
    for token in input_tokens:
        if token not in valid_input_names:
            raise ValueError("input_token refers to a non existent "
                             "input name for operation %s: %s.  "
                             "Must be one of: %s" % (operation['name'], token,
                                                     list(valid_input_names)))
    if 'limit_key' in config and config['limit_key'] not in valid_input_names:
        raise ValueError("limit_key refers to a non existent input name for "
                         "operation %s: %s.  Must be one of: %s" % (
                             operation['name'], config['limit_key'],
                             list(valid_input_names)))


def add_retry_configs(new_model, retry_model, definitions):
    if not retry_model:
        new_model['retry'] = {}
        return
    # The service specific retry config is keyed off of the endpoint
    # prefix as defined in the JSON model.
    endpoint_prefix = new_model.get('endpoint_prefix', '')
    service_config = retry_model.get(endpoint_prefix, {})
    resolve_references(service_config, definitions)
    # We want to merge the global defaults with the service specific
    # defaults, with the service specific defaults taking precedence.
    # So we use the global defaults as the base.
    final_retry_config = {'__default__': retry_model.get('__default__', {})}
    resolve_references(final_retry_config, definitions)
    # Then merge the service specific config on top.
    merge_dicts(final_retry_config, service_config)
    new_model['retry'] = final_retry_config
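
# An illustrative sketch (hypothetical retry data, keyed by endpoint
# prefix): the top level '__default__' applies to every service, and a
# service's own section is merged on top of it.
#
#     definitions = {'throttling': {'applies_when': {'response': {
#         'service_error_code': 'Throttling',
#         'http_status_code': 400}}}}
#     retry_model = {
#         '__default__': {'max_attempts': 5,
#                         'delay': {'type': 'exponential',
#                                   'base': 'rand',
#                                   'growth_factor': 2}},
#         'dynamodb': {'__default__': {'policies': {
#             'throttling': {'$ref': 'throttling'}}}}}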


def resolve_references(config, definitions):
    """Recursively replace $ref keys.

    To cut down on duplication, common definitions can be declared
    (and passed in via the ``definitions`` attribute) and then
    references as {"$ref": "name"}, when this happens the reference
    dict is placed with the value from the ``definition`` dict.

    This is recursively done.

    """
    for key, value in config.items():
        if isinstance(value, dict):
            if len(value) == 1 and list(value.keys())[0] == '$ref':
                # Then we need to resolve this reference.
                config[key] = definitions[list(value.values())[0]]
            else:
                resolve_references(value, definitions)
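
# An illustrative sketch (hypothetical data): given
#
#     definitions = {'throttling': {'http_status_code': 400}}
#     config = {'policies': {'throttling': {'$ref': 'throttling'}}}
#
# calling ``resolve_references(config, definitions)`` rewrites ``config``
# in place to:
#
#     {'policies': {'throttling': {'http_status_code': 400}}}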