This file is indexed.

/usr/lib/python2.7/dist-packages/swift/common/middleware/dlo.py is in python-swift 2.7.0-0ubuntu2.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
# Copyright (c) 2013 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Middleware that will provide Dynamic Large Object (DLO) support.

---------------
Using ``swift``
---------------

The quickest way to try out this feature is use the ``swift`` Swift Tool
included with the `python-swiftclient`_ library.  You can use the ``-S``
option to specify the segment size to use when splitting a large file. For
example::

    swift upload test_container -S 1073741824 large_file

This would split the large_file into 1G segments and begin uploading those
segments in parallel. Once all the segments have been uploaded, ``swift`` will
then create the manifest file so the segments can be downloaded as one.

So now, the following ``swift`` command would download the entire large
object::

    swift download test_container large_file

``swift`` command uses a strict convention for its segmented object
support. In the above example it will upload all the segments into a
second container named test_container_segments. These segments will
have names like large_file/1290206778.25/21474836480/00000000,
large_file/1290206778.25/21474836480/00000001, etc.

The main benefit for using a separate container is that the main container
listings will not be polluted with all the segment names. The reason for using
the segment name format of <name>/<timestamp>/<size>/<segment> is so that an
upload of a new file with the same name won't overwrite the contents of the
first until the last moment when the manifest file is updated.

``swift`` will manage these segment files for you, deleting old segments on
deletes and overwrites, etc. You can override this behavior with the
``--leave-segments`` option if desired; this is useful if you want to have
multiple versions of the same large object available.

.. _`python-swiftclient`: http://github.com/openstack/python-swiftclient

----------
Direct API
----------

You can also work with the segments and manifests directly with HTTP
requests instead of having ``swift`` do that for you. You can just
upload the segments like you would any other object and the manifest
is just a zero-byte (not enforced) file with an extra
``X-Object-Manifest`` header.

All the object segments need to be in the same container, have a common object
name prefix, and sort in the order in which they should be concatenated.
Object names are sorted lexicographically as UTF-8 byte strings.
They don't have to be in the same container as the manifest file will be, which
is useful to keep container listings clean as explained above with ``swift``.

The manifest file is simply a zero-byte (not enforced) file with the extra
``X-Object-Manifest: <container>/<prefix>`` header, where ``<container>`` is
the container the object segments are in and ``<prefix>`` is the common prefix
for all the segments.

It is best to upload all the segments first and then create or update the
manifest. In this way, the full object won't be available for downloading
until the upload is complete. Also, you can upload a new set of segments to
a second location and then update the manifest to point to this new location.
During the upload of the new segments, the original manifest will still be
available to download the first set of segments.

.. note::

    The manifest file should have no content. However, this is not enforced.
    If the manifest path itself conforms to container/prefix specified in
    X-Object-Manifest, and if manifest has some content/data in it, it would
    also be considered as segment and manifest's content will be part of the
    concatenated GET response. The order of concatenation follows the usual DLO
    logic which is - the order of concatenation adheres to order returned when
    segment names are sorted.


Here's an example using ``curl`` with tiny 1-byte segments::

    # First, upload the segments
    curl -X PUT -H 'X-Auth-Token: <token>' \
        http://<storage_url>/container/myobject/00000001 --data-binary '1'
    curl -X PUT -H 'X-Auth-Token: <token>' \
        http://<storage_url>/container/myobject/00000002 --data-binary '2'
    curl -X PUT -H 'X-Auth-Token: <token>' \
        http://<storage_url>/container/myobject/00000003 --data-binary '3'

    # Next, create the manifest file
    curl -X PUT -H 'X-Auth-Token: <token>' \
        -H 'X-Object-Manifest: container/myobject/' \
        http://<storage_url>/container/myobject --data-binary ''

    # And now we can download the segments as a single object
    curl -H 'X-Auth-Token: <token>' \
        http://<storage_url>/container/myobject
"""

import json
import os

import six
from six.moves.configparser import ConfigParser, NoSectionError, NoOptionError
from six.moves.urllib.parse import unquote

from hashlib import md5
from swift.common import constraints
from swift.common.exceptions import ListingIterError, SegmentError
from swift.common.http import is_success
from swift.common.swob import Request, Response, \
    HTTPRequestedRangeNotSatisfiable, HTTPBadRequest, HTTPConflict
from swift.common.utils import get_logger, \
    RateLimitedIterator, read_conf_dir, quote, close_if_possible, \
    closing_if_possible
from swift.common.request_helpers import SegmentedIterable
from swift.common.wsgi import WSGIContext, make_subrequest


class GetContext(WSGIContext):
    def __init__(self, dlo, logger):
        super(GetContext, self).__init__(dlo.app)
        self.dlo = dlo
        self.logger = logger

    def _get_container_listing(self, req, version, account, container,
                               prefix, marker=''):
        con_req = make_subrequest(
            req.environ, path='/'.join(['', version, account, container]),
            method='GET',
            headers={'x-auth-token': req.headers.get('x-auth-token')},
            agent=('%(orig)s ' + 'DLO MultipartGET'), swift_source='DLO')
        con_req.query_string = 'format=json&prefix=%s' % quote(prefix)
        if marker:
            con_req.query_string += '&marker=%s' % quote(marker)

        con_resp = con_req.get_response(self.dlo.app)
        if not is_success(con_resp.status_int):
            return con_resp, None
        with closing_if_possible(con_resp.app_iter):
            return None, json.loads(''.join(con_resp.app_iter))

    def _segment_listing_iterator(self, req, version, account, container,
                                  prefix, segments, first_byte=None,
                                  last_byte=None):
        # It's sort of hokey that this thing takes in the first page of
        # segments as an argument, but we need to compute the etag and content
        # length from the first page, and it's better to have a hokey
        # interface than to make redundant requests.
        if first_byte is None:
            first_byte = 0
        if last_byte is None:
            last_byte = float("inf")

        marker = ''
        while True:
            for segment in segments:
                seg_length = int(segment['bytes'])

                if first_byte >= seg_length:
                    # don't need any bytes from this segment
                    first_byte = max(first_byte - seg_length, -1)
                    last_byte = max(last_byte - seg_length, -1)
                    continue
                elif last_byte < 0:
                    # no bytes are needed from this or any future segment
                    break

                seg_name = segment['name']
                if isinstance(seg_name, six.text_type):
                    seg_name = seg_name.encode("utf-8")

                # (obj path, etag, size, first byte, last byte)
                yield ("/" + "/".join((version, account, container,
                                       seg_name)),
                       # We deliberately omit the etag and size here;
                       # SegmentedIterable will check size and etag if
                       # specified, but we don't want it to. DLOs only care
                       # that the objects' names match the specified prefix.
                       None, None,
                       (None if first_byte <= 0 else first_byte),
                       (None if last_byte >= seg_length - 1 else last_byte))

                first_byte = max(first_byte - seg_length, -1)
                last_byte = max(last_byte - seg_length, -1)

            if len(segments) < constraints.CONTAINER_LISTING_LIMIT:
                # a short page means that we're done with the listing
                break
            elif last_byte < 0:
                break

            marker = segments[-1]['name']
            error_response, segments = self._get_container_listing(
                req, version, account, container, prefix, marker)
            if error_response:
                # we've already started sending the response body to the
                # client, so all we can do is raise an exception to make the
                # WSGI server close the connection early
                close_if_possible(error_response.app_iter)
                raise ListingIterError(
                    "Got status %d listing container /%s/%s" %
                    (error_response.status_int, account, container))

    def get_or_head_response(self, req, x_object_manifest,
                             response_headers=None):
        if response_headers is None:
            response_headers = self._response_headers

        container, obj_prefix = x_object_manifest.split('/', 1)
        container = unquote(container)
        obj_prefix = unquote(obj_prefix)

        # manifest might point to a different container
        req.acl = None
        version, account, _junk = req.split_path(2, 3, True)
        error_response, segments = self._get_container_listing(
            req, version, account, container, obj_prefix)
        if error_response:
            return error_response
        have_complete_listing = len(segments) < \
            constraints.CONTAINER_LISTING_LIMIT

        first_byte = last_byte = None
        actual_content_length = None
        content_length_for_swob_range = None
        if req.range and len(req.range.ranges) == 1:
            content_length_for_swob_range = sum(o['bytes'] for o in segments)

            # This is a hack to handle suffix byte ranges (e.g. "bytes=-5"),
            # which we can't honor unless we have a complete listing.
            _junk, range_end = req.range.ranges_for_length(float("inf"))[0]

            # If this is all the segments, we know whether or not this
            # range request is satisfiable.
            #
            # Alternately, we may not have all the segments, but this range
            # falls entirely within the first page's segments, so we know
            # that it is satisfiable.
            if (have_complete_listing
               or range_end < content_length_for_swob_range):
                byteranges = req.range.ranges_for_length(
                    content_length_for_swob_range)
                if not byteranges:
                    return HTTPRequestedRangeNotSatisfiable(request=req)
                first_byte, last_byte = byteranges[0]
                # For some reason, swob.Range.ranges_for_length adds 1 to the
                # last byte's position.
                last_byte -= 1
                actual_content_length = last_byte - first_byte + 1
            else:
                # The range may or may not be satisfiable, but we can't tell
                # based on just one page of listing, and we're not going to go
                # get more pages because that would use up too many resources,
                # so we ignore the Range header and return the whole object.
                actual_content_length = None
                content_length_for_swob_range = None
                req.range = None

        response_headers = [
            (h, v) for h, v in response_headers
            if h.lower() not in ("content-length", "content-range")]

        if content_length_for_swob_range is not None:
            # Here, we have to give swob a big-enough content length so that
            # it can compute the actual content length based on the Range
            # header. This value will not be visible to the client; swob will
            # substitute its own Content-Length.
            #
            # Note: if the manifest points to at least CONTAINER_LISTING_LIMIT
            # segments, this may be less than the sum of all the segments'
            # sizes. However, it'll still be greater than the last byte in the
            # Range header, so it's good enough for swob.
            response_headers.append(('Content-Length',
                                     str(content_length_for_swob_range)))
        elif have_complete_listing:
            actual_content_length = sum(o['bytes'] for o in segments)
            response_headers.append(('Content-Length',
                                     str(actual_content_length)))

        if have_complete_listing:
            response_headers = [(h, v) for h, v in response_headers
                                if h.lower() != "etag"]
            etag = md5()
            for seg_dict in segments:
                etag.update(seg_dict['hash'].strip('"'))
            response_headers.append(('Etag', '"%s"' % etag.hexdigest()))

        app_iter = None
        if req.method == 'GET':
            listing_iter = RateLimitedIterator(
                self._segment_listing_iterator(
                    req, version, account, container, obj_prefix, segments,
                    first_byte=first_byte, last_byte=last_byte),
                self.dlo.rate_limit_segments_per_sec,
                limit_after=self.dlo.rate_limit_after_segment)

            app_iter = SegmentedIterable(
                req, self.dlo.app, listing_iter, ua_suffix="DLO MultipartGET",
                swift_source="DLO", name=req.path, logger=self.logger,
                max_get_time=self.dlo.max_get_time,
                response_body_length=actual_content_length)

            try:
                app_iter.validate_first_segment()
            except (SegmentError, ListingIterError):
                return HTTPConflict(request=req)

        resp = Response(request=req, headers=response_headers,
                        conditional_response=True,
                        app_iter=app_iter)

        return resp

    def handle_request(self, req, start_response):
        """
        Take a GET or HEAD request, and if it is for a dynamic large object
        manifest, return an appropriate response.

        Otherwise, simply pass it through.
        """
        resp_iter = self._app_call(req.environ)

        # make sure this response is for a dynamic large object manifest
        for header, value in self._response_headers:
            if (header.lower() == 'x-object-manifest'):
                close_if_possible(resp_iter)
                response = self.get_or_head_response(req, value)
                return response(req.environ, start_response)
        else:
            # Not a dynamic large object manifest; just pass it through.
            start_response(self._response_status,
                           self._response_headers,
                           self._response_exc_info)
            return resp_iter


class DynamicLargeObject(object):
    def __init__(self, app, conf):
        self.app = app
        self.logger = get_logger(conf, log_route='dlo')

        # DLO functionality used to live in the proxy server, not middleware,
        # so let's try to go find config values in the proxy's config section
        # to ease cluster upgrades.
        self._populate_config_from_old_location(conf)

        self.max_get_time = int(conf.get('max_get_time', '86400'))
        self.rate_limit_after_segment = int(conf.get(
            'rate_limit_after_segment', '10'))
        self.rate_limit_segments_per_sec = int(conf.get(
            'rate_limit_segments_per_sec', '1'))

    def _populate_config_from_old_location(self, conf):
        if ('rate_limit_after_segment' in conf or
                'rate_limit_segments_per_sec' in conf or
                'max_get_time' in conf or
                '__file__' not in conf):
            return

        cp = ConfigParser()
        if os.path.isdir(conf['__file__']):
            read_conf_dir(cp, conf['__file__'])
        else:
            cp.read(conf['__file__'])

        try:
            pipe = cp.get("pipeline:main", "pipeline")
        except (NoSectionError, NoOptionError):
            return

        proxy_name = pipe.rsplit(None, 1)[-1]
        proxy_section = "app:" + proxy_name
        for setting in ('rate_limit_after_segment',
                        'rate_limit_segments_per_sec',
                        'max_get_time'):
            try:
                conf[setting] = cp.get(proxy_section, setting)
            except (NoSectionError, NoOptionError):
                pass

    def __call__(self, env, start_response):
        """
        WSGI entry point
        """
        req = Request(env)
        try:
            vrs, account, container, obj = req.split_path(4, 4, True)
        except ValueError:
            return self.app(env, start_response)

        # install our COPY-callback hook
        env['swift.copy_hook'] = self.copy_hook(
            env.get('swift.copy_hook',
                    lambda src_req, src_resp, sink_req: src_resp))

        if ((req.method == 'GET' or req.method == 'HEAD') and
                req.params.get('multipart-manifest') != 'get'):
            return GetContext(self, self.logger).\
                handle_request(req, start_response)
        elif req.method == 'PUT':
            error_response = self._validate_x_object_manifest_header(req)
            if error_response:
                return error_response(env, start_response)
        return self.app(env, start_response)

    def _validate_x_object_manifest_header(self, req):
        """
        Make sure that X-Object-Manifest is valid if present.
        """
        if 'X-Object-Manifest' in req.headers:
            value = req.headers['X-Object-Manifest']
            container = prefix = None
            try:
                container, prefix = value.split('/', 1)
            except ValueError:
                pass
            if not container or not prefix or '?' in value or '&' in value or \
                    prefix.startswith('/'):
                return HTTPBadRequest(
                    request=req,
                    body=('X-Object-Manifest must be in the '
                          'format container/prefix'))

    def copy_hook(self, inner_hook):

        def dlo_copy_hook(source_req, source_resp, sink_req):
            x_o_m = source_resp.headers.get('X-Object-Manifest')
            if x_o_m:
                if source_req.params.get('multipart-manifest') == 'get':
                    # To copy the manifest, we let the copy proceed as normal,
                    # but ensure that X-Object-Manifest is set on the new
                    # object.
                    sink_req.headers['X-Object-Manifest'] = x_o_m
                else:
                    ctx = GetContext(self, self.logger)
                    source_resp = ctx.get_or_head_response(
                        source_req, x_o_m, source_resp.headers.items())
            return inner_hook(source_req, source_resp, sink_req)

        return dlo_copy_hook


def filter_factory(global_conf, **local_conf):
    conf = global_conf.copy()
    conf.update(local_conf)

    def dlo_filter(app):
        return DynamicLargeObject(app, conf)
    return dlo_filter