This file is indexed.

/usr/lib/python2.7/dist-packages/couchdb/multipart.py is in python-couchdb 0.10-1.1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
# -*- coding: utf-8 -*-
#
# Copyright (C) 2008-2009 Christopher Lenz
# All rights reserved.
#
# This software is licensed as described in the file COPYING, which
# you should have received as part of this distribution.

"""Support for streamed reading and writing of multipart MIME content."""

from base64 import b64encode
from cgi import parse_header
from email import header
try:
    from hashlib import md5
except ImportError:
    from md5 import new as md5
import sys

from couchdb import util

__all__ = ['read_multipart', 'write_multipart']
__docformat__ = 'restructuredtext en'


CRLF = b'\r\n'


def read_multipart(fileobj, boundary=None):
    """Simple streaming MIME multipart parser.
    
    This function takes a file-like object reading a MIME envelope, and yields
    a ``(headers, is_multipart, payload)`` tuple for every part found, where
    ``headers`` is a dictionary containing the MIME headers of that part (with
    names lower-cased), ``is_multipart`` is a boolean indicating whether the
    part is itself multipart, and ``payload`` is either a string (if
    ``is_multipart`` is false), or an iterator over the nested parts.
    
    Note that the iterator produced for nested multipart payloads MUST be fully
    consumed, even if you wish to skip over the content.
    
    :param fileobj: a file-like object
    :param boundary: the part boundary string, will generally be determined
                     automatically from the headers of the outermost multipart
                     envelope
    :return: an iterator over the parts
    :since: 0.5
    """
    headers = {}
    buf = []
    outer = in_headers = boundary is None

    next_boundary = boundary and ('--' + boundary + '\n').encode('ascii') or None
    last_boundary = boundary and ('--' + boundary + '--\n').encode('ascii') or None

    def _current_part():
        payload = b''.join(buf)
        if payload.endswith(b'\r\n'):
            payload = payload[:-2]
        elif payload.endswith(b'\n'):
            payload = payload[:-1]
        content_md5 = headers.get(b'content-md5')
        if content_md5:
            h = b64encode(md5(payload).digest())
            if content_md5 != h:
                raise ValueError('data integrity check failed')
        return headers, False, payload

    for line in fileobj:
        if in_headers:
            line = line.replace(CRLF, b'\n')
            if line != b'\n':
                name, value = [item.strip() for item in line.split(b':', 1)]
                name = name.lower().decode('ascii')
                value, charset = header.decode_header(value.decode('utf-8'))[0]
                if charset is None:
                    headers[name] = value
                else:
                    headers[name] = value.decode(charset)
            else:
                in_headers = False
                mimetype, params = parse_header(headers.get('content-type'))
                if mimetype.startswith('multipart/'):
                    sub_boundary = params['boundary']
                    sub_parts = read_multipart(fileobj, boundary=sub_boundary)
                    if boundary is not None:
                        yield headers, True, sub_parts
                        headers.clear()
                        del buf[:]
                    else:
                        for part in sub_parts:
                            yield part
                        return

        elif line.replace(CRLF, b'\n') == next_boundary:
            # We've reached the start of a new part, as indicated by the
            # boundary
            if headers:
                if not outer:
                    yield _current_part()
                else:
                    outer = False
                headers.clear()
                del buf[:]
            in_headers = True

        elif line.replace(CRLF, b'\n') == last_boundary:
            # We're done with this multipart envelope
            break

        else:
            buf.append(line)

    if not outer and headers:
        yield _current_part()


class MultipartWriter(object):

    def __init__(self, fileobj, headers=None, subtype='mixed', boundary=None):
        self.fileobj = fileobj
        if boundary is None:
            boundary = self._make_boundary()
        self.boundary = boundary
        if headers is None:
            headers = {}
        headers['Content-Type'] = 'multipart/%s; boundary="%s"' % (
            subtype, self.boundary
        )
        self._write_headers(headers)

    def open(self, headers=None, subtype='mixed', boundary=None):
        self.fileobj.write(b'--')
        self.fileobj.write(self.boundary.encode('utf-8'))
        self.fileobj.write(CRLF)
        return MultipartWriter(self.fileobj, headers=headers, subtype=subtype,
                               boundary=boundary)

    def add(self, mimetype, content, headers=None):
        self.fileobj.write(b'--')
        self.fileobj.write(self.boundary.encode('utf-8'))
        self.fileobj.write(CRLF)
        if headers is None:
            headers = {}

        ctype, params = parse_header(mimetype)
        if isinstance(content, util.utype):
            if 'charset' in params:
                content = content.encode(params['charset'])
            else:
                content = content.encode('utf-8')
                mimetype = mimetype + ';charset=utf-8'
        elif 'charset' not in params:
            try:
                content.decode('utf-8')
            finally:
                mimetype = mimetype + ';charset=utf-8'

        headers['Content-Type'] = mimetype
        if content:
            headers['Content-Length'] = str(len(content))
            hash = b64encode(md5(content).digest()).decode('ascii')
            headers['Content-MD5'] = hash
        self._write_headers(headers)
        if content:
            # XXX: throw an exception if a boundary appears in the content??
            self.fileobj.write(content)
            self.fileobj.write(CRLF)

    def close(self):
        self.fileobj.write(b'--')
        self.fileobj.write(self.boundary.encode('ascii'))
        self.fileobj.write(b'--')
        self.fileobj.write(CRLF)

    def _make_boundary(self):
        try:
            from uuid import uuid4
            return '==' + uuid4().hex + '=='
        except ImportError:
            from random import randrange
            nonce = ('%%0%dd' % len(repr(sys.maxsize - 1))) % token
            return '===============' + nonce + '=='

    def _write_headers(self, headers):
        if headers:
            for name in sorted(headers.keys()):
                value = headers[name]
                if value.encode('ascii', 'ignore') != value.encode('utf-8'):
                    value = header.make_header([(value, 'utf-8')]).encode()
                self.fileobj.write(name.encode('utf-8'))
                self.fileobj.write(b': ')
                self.fileobj.write(value.encode('utf-8'))
                self.fileobj.write(CRLF)
        self.fileobj.write(CRLF)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()


def write_multipart(fileobj, subtype='mixed', boundary=None):
    r"""Simple streaming MIME multipart writer.

    This function returns a `MultipartWriter` object that has a few methods to
    control the nested MIME parts. For example, to write a flat multipart
    envelope you call the ``add(mimetype, content, [headers])`` method for
    every part, and finally call the ``close()`` method.

    >>> from couchdb.util import StringIO

    >>> buf = StringIO()
    >>> envelope = write_multipart(buf, boundary='==123456789==')
    >>> envelope.add('text/plain', 'Just testing')
    >>> envelope.close()
    >>> print(buf.getvalue().replace(b'\r\n', b'\n').decode('utf-8'))
    Content-Type: multipart/mixed; boundary="==123456789=="
    <BLANKLINE>
    --==123456789==
    Content-Length: 12
    Content-MD5: nHmX4a6el41B06x2uCpglQ==
    Content-Type: text/plain;charset=utf-8
    <BLANKLINE>
    Just testing
    --==123456789==--
    <BLANKLINE>

    Note that an explicit boundary is only specified for testing purposes. If
    the `boundary` parameter is omitted, the multipart writer will generate a
    random string for the boundary.

    To write nested structures, call the ``open([headers])`` method on the
    respective envelope, and finish each envelope using the ``close()`` method:

    >>> buf = StringIO()
    >>> envelope = write_multipart(buf, boundary='==123456789==')
    >>> part = envelope.open(boundary='==abcdefghi==')
    >>> part.add('text/plain', 'Just testing')
    >>> part.close()
    >>> envelope.close()
    >>> print(buf.getvalue().replace(b'\r\n', b'\n').decode('utf-8')) #:doctest +ELLIPSIS
    Content-Type: multipart/mixed; boundary="==123456789=="
    <BLANKLINE>
    --==123456789==
    Content-Type: multipart/mixed; boundary="==abcdefghi=="
    <BLANKLINE>
    --==abcdefghi==
    Content-Length: 12
    Content-MD5: nHmX4a6el41B06x2uCpglQ==
    Content-Type: text/plain;charset=utf-8
    <BLANKLINE>
    Just testing
    --==abcdefghi==--
    --==123456789==--
    <BLANKLINE>
    
    :param fileobj: a writable file-like object that the output should get
                    written to
    :param subtype: the subtype of the multipart MIME type (e.g. "mixed")
    :param boundary: the boundary to use to separate the different parts
    :since: 0.6
    """
    return MultipartWriter(fileobj, subtype=subtype, boundary=boundary)