This file is indexed.

/usr/lib/python3/dist-packages/acme/standalone.py is in python3-acme 0.28.0-1~deb9u1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
"""Support for standalone client challenge solvers. """
import argparse
import collections
import functools
import logging
import os
import socket
import sys
import threading

from six.moves import BaseHTTPServer  # type: ignore  # pylint: disable=import-error
from six.moves import http_client  # pylint: disable=import-error
from six.moves import socketserver  # type: ignore  # pylint: disable=import-error

import OpenSSL

from acme import challenges
from acme import crypto_util
from acme.magic_typing import List # pylint: disable=unused-import, no-name-in-module


# Module-level logger, named after this module via `__name__`; the debug and
# info messages emitted further down in this file go through it.
logger = logging.getLogger(__name__)

# six.moves.* | pylint: disable=no-member,attribute-defined-outside-init
# pylint: disable=too-few-public-methods,no-init


class TLSServer(socketserver.TCPServer):
    """TCP server whose listening socket is wrapped by `crypto_util.SSLSocket`.

    The following keyword arguments are consumed by the constructor; all
    remaining positional and keyword arguments are forwarded unchanged to
    `socketserver.TCPServer`:

    * ``ipv6`` (default ``False``): select ``AF_INET6`` instead of ``AF_INET``.
    * ``certs`` (default ``{}``): passed to `crypto_util.SSLSocket`.
    * ``method`` (default ``crypto_util._DEFAULT_TLSSNI01_SSL_METHOD``):
      passed to `crypto_util.SSLSocket`.
    * ``allow_reuse_address`` (default ``True``).

    """

    def __init__(self, *args, **kwargs):
        use_ipv6 = kwargs.pop("ipv6", False)
        self.ipv6 = use_ipv6
        # The address family must be chosen before the base constructor
        # runs, which is why it is set on the instance first.
        self.address_family = socket.AF_INET6 if use_ipv6 else socket.AF_INET
        self.certs = kwargs.pop("certs", {})
        # pylint: disable=protected-access
        fallback_method = crypto_util._DEFAULT_TLSSNI01_SSL_METHOD
        self.method = kwargs.pop("method", fallback_method)
        self.allow_reuse_address = kwargs.pop("allow_reuse_address", True)
        socketserver.TCPServer.__init__(self, *args, **kwargs)

    def _wrap_sock(self):
        """Replace ``self.socket`` with a `crypto_util.SSLSocket` around it."""
        plain_socket = self.socket
        self.socket = crypto_util.SSLSocket(
            plain_socket, certs=self.certs, method=self.method)

    def server_bind(self):
        """Wrap the socket first, then defer to `TCPServer.server_bind`."""
        self._wrap_sock()
        return socketserver.TCPServer.server_bind(self)


class ACMEServerMixin:  # pylint: disable=old-style-class
    """Mixin carrying the settings shared by the ACME servers in this module."""
    # Allow the listening address to be reused.
    allow_reuse_address = True
    # TODO: c.f. #858
    server_version = "ACME client standalone challenge solver"


class BaseDualNetworkedServers(object):
    """Base class for a pair of IPv6 and IPv4 servers.

    Every operation is attempted on each server that could be created, and
    a failure to create one of the two servers does not affect the other.

    If two servers are instantiated, they will serve on the same port.

    :ivar list servers: Server instances that were created successfully.
    :ivar list threads: Threads started by `serve_forever`.

    """

    def __init__(self, ServerClass, server_address, *remaining_args, **kwargs):
        """Try to create an IPv6 server and then an IPv4 server.

        :param ServerClass: Callable invoked as
            ``ServerClass(address, *remaining_args, **kwargs)`` with an
            extra ``ipv6`` keyword argument set to ``True`` / ``False``.
        :param tuple server_address: ``(host, port, ...)``; any elements
            after the port are passed through unchanged.
        :param remaining_args: Extra positional arguments for `ServerClass`.
        :param kwargs: Extra keyword arguments for `ServerClass`.

        :raises socket.error: If neither server could be created.

        """
        port = server_address[1]
        self.threads = [] # type: List[threading.Thread]
        self.servers = [] # type: List[ACMEServerMixin]

        # Must try True first.
        # Ubuntu, for example, will fail to bind to IPv4 if we've already bound
        # to IPv6. But that's ok, since it will accept IPv4 connections on the IPv6
        # socket. On the other hand, FreeBSD will successfully bind to IPv4 on the
        # same port, which means that server will accept the IPv4 connections.
        # If Python is compiled without IPv6, we'll error out but (probably) successfully
        # create the IPv4 server.
        for ip_version in [True, False]:
            family = "IPv6" if ip_version else "IPv4"
            kwargs["ipv6"] = ip_version
            new_address = (server_address[0],) + (port,) + server_address[2:]
            new_args = (new_address,) + remaining_args
            try:
                server = ServerClass(*new_args, **kwargs) # pylint: disable=star-args
            except socket.error:
                if self.servers:
                    # Already bound using IPv6.
                    logger.debug(
                        "Certbot wasn't able to bind to %s:%s using %s, this "
                        "is often expected due to the dual stack nature of "
                        "IPv6 socket implementations.",
                        new_address[0], new_address[1], family)
                else:
                    logger.debug(
                        "Failed to bind to %s:%s using %s", new_address[0],
                        new_address[1], family)
            else:
                logger.debug(
                    "Successfully bound to %s:%s using %s", new_address[0],
                    new_address[1], family)
                self.servers.append(server)
                # If two servers are set up and port 0 was passed in, ensure we always
                # bind to the same port for both servers.
                port = server.socket.getsockname()[1]
        if not self.servers:
            raise socket.error("Could not bind to IPv4 or IPv6.")

    def serve_forever(self):
        """Wraps socketserver.TCPServer.serve_forever

        Starts one thread per server and records it in ``self.threads``;
        returns without waiting for the threads.

        """
        for server in self.servers:
            thread = threading.Thread(
                # pylint: disable=no-member
                target=server.serve_forever)
            thread.start()
            self.threads.append(thread)

    def getsocknames(self):
        """Wraps socketserver.TCPServer.socket.getsockname

        :returns: One ``getsockname()`` result per server.
        :rtype: list

        """
        return [server.socket.getsockname() for server in self.servers]

    def shutdown_and_server_close(self):
        """Wraps socketserver.TCPServer.shutdown, socketserver.TCPServer.server_close, and
           threading.Thread.join

        ``shutdown()`` is only called when `serve_forever` has started
        threads: the stdlib's ``shutdown()`` blocks until a running
        ``serve_forever()`` loop exits and would otherwise wait forever.
        The sockets are closed in either case.

        """
        serving = bool(self.threads)
        for server in self.servers:
            if serving:
                server.shutdown()
            server.server_close()
        for thread in self.threads:
            thread.join()
        self.threads = []


class TLSSNI01Server(TLSServer, ACMEServerMixin):
    """`TLSServer` preconfigured for TLSSNI01.

    Requests are handled by `BaseRequestHandlerWithLogging`.

    """

    def __init__(self, server_address, certs, ipv6=False):
        handler_class = BaseRequestHandlerWithLogging
        TLSServer.__init__(
            self, server_address, handler_class, ipv6=ipv6, certs=certs)


class TLSSNI01DualNetworkedServers(BaseDualNetworkedServers):
    """Dual IPv6/IPv4 wrapper around `TLSSNI01Server`.

    Tries everything for both servers; failures for one don't affect the
    other.

    """

    def __init__(self, *args, **kwargs):
        # Only the server class is fixed here; everything else is forwarded.
        super(TLSSNI01DualNetworkedServers, self).__init__(
            TLSSNI01Server, *args, **kwargs)


class BaseRequestHandlerWithLogging(socketserver.BaseRequestHandler):
    """`socketserver.BaseRequestHandler` that logs each incoming request."""

    def log_message(self, format, *args):  # pylint: disable=redefined-builtin
        """Emit ``format % args`` at debug level, prefixed by the client host."""
        client_host = self.client_address[0]
        rendered = format % args
        logger.debug("%s - - %s", client_host, rendered)

    def handle(self):
        """Log the request, then run the base class handler."""
        self.log_message("Incoming request")
        socketserver.BaseRequestHandler.handle(self)


class HTTPServer(BaseHTTPServer.HTTPServer):
    """HTTP server that can be switched between IPv4 and IPv6.

    The ``ipv6`` keyword argument (default ``False``) is consumed here;
    all other arguments are forwarded to `BaseHTTPServer.HTTPServer`.

    """

    def __init__(self, *args, **kwargs):
        use_ipv6 = kwargs.pop("ipv6", False)
        self.ipv6 = use_ipv6
        # Set before the base constructor runs so that it sees the family.
        self.address_family = socket.AF_INET6 if use_ipv6 else socket.AF_INET
        BaseHTTPServer.HTTPServer.__init__(self, *args, **kwargs)


class HTTP01Server(HTTPServer, ACMEServerMixin):
    """`HTTPServer` preconfigured for HTTP01.

    Requests are handled by `HTTP01RequestHandler`, pre-bound to the given
    resources.

    """

    def __init__(self, server_address, resources, ipv6=False):
        handler_factory = HTTP01RequestHandler.partial_init(
            simple_http_resources=resources)
        HTTPServer.__init__(self, server_address, handler_factory, ipv6=ipv6)


class HTTP01DualNetworkedServers(BaseDualNetworkedServers):
    """Dual IPv6/IPv4 wrapper around `HTTP01Server`.

    Tries everything for both servers; failures for one don't affect the
    other.

    """

    def __init__(self, *args, **kwargs):
        # Only the server class is fixed here; everything else is forwarded.
        super(HTTP01DualNetworkedServers, self).__init__(
            HTTP01Server, *args, **kwargs)


class HTTP01RequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    """Request handler serving HTTP01 challenge responses.

    Adheres to the stdlib's `socketserver.BaseRequestHandler` interface.

    :ivar set simple_http_resources: A set of `HTTP01Resource`
        objects. TODO: better name?

    """
    HTTP01Resource = collections.namedtuple(
        "HTTP01Resource", "chall response validation")

    def __init__(self, *args, **kwargs):
        # Must be set before the base constructor runs, since the stdlib
        # base class handles the request from inside its constructor.
        resources = kwargs.pop("simple_http_resources", set())
        self.simple_http_resources = resources
        BaseHTTPServer.BaseHTTPRequestHandler.__init__(self, *args, **kwargs)

    def log_message(self, format, *args):  # pylint: disable=redefined-builtin
        """Emit ``format % args`` at debug level, prefixed by the client host."""
        client_host = self.client_address[0]
        rendered = format % args
        logger.debug("%s - - %s", client_host, rendered)

    def handle(self):
        """Log the request, then run the base class handler."""
        self.log_message("Incoming request")
        BaseHTTPServer.BaseHTTPRequestHandler.handle(self)

    def do_GET(self):  # pylint: disable=invalid-name
        """Dispatch a GET to the index, challenge or 404 handler."""
        if self.path == "/":
            self.handle_index()
            return
        if self.path.startswith("/" + challenges.HTTP01.URI_ROOT_PATH):
            self.handle_simple_http_resource()
            return
        self.handle_404()

    def handle_index(self):
        """Serve the index page: the server's version string."""
        self.send_response(200)
        self.send_header("Content-Type", "text/html")
        self.end_headers()
        banner = self.server.server_version.encode()
        self.wfile.write(banner)

    def handle_404(self):
        """Answer with 404 Not Found."""
        self.send_response(http_client.NOT_FOUND, message="Not Found")
        self.send_header("Content-type", "text/html")
        self.end_headers()
        self.wfile.write(b"404")

    def handle_simple_http_resource(self):
        """Serve the provisioned HTTP01 resource whose path was requested.

        If no resource matches, two messages are logged and no response is
        written.

        """
        for resource in self.simple_http_resources:
            if resource.chall.path != self.path:
                continue
            self.log_message("Serving HTTP01 with token %r",
                             resource.chall.encode("token"))
            self.send_response(http_client.OK)
            self.end_headers()
            self.wfile.write(resource.validation.encode())
            return
        # Reached whenever the loop finishes without serving anything,
        # including when there are no resources at all.
        self.log_message("No resources to serve")
        self.log_message("%s does not correspond to any resource. ignoring",
                         self.path)

    @classmethod
    def partial_init(cls, simple_http_resources):
        """Return this handler class pre-bound to the given resources.

        This is useful because `socketserver.BaseServer` takes
        uninitialized handler and initializes it with the current
        request.

        """
        bound_kwargs = {"simple_http_resources": simple_http_resources}
        return functools.partial(cls, **bound_kwargs)


def simple_tls_sni_01_server(cli_args, forever=True):
    """Run simple standalone TLSSNI01 server.

    Each immediate subdirectory of the current working directory is
    treated as a host name and must contain ``cert.pem`` and ``key.pem``.

    :param list cli_args: Full argument vector; the first element is
        skipped and the rest is parsed for ``-p`` / ``--port``.
    :param bool forever: Serve until interrupted if true, otherwise
        handle a single request.

    """
    logging.basicConfig(level=logging.DEBUG)

    port_help = ("Port to serve at. By default "
                 "picks random free port.")
    parser = argparse.ArgumentParser()
    parser.add_argument("-p", "--port", default=0, help=port_help)
    args = parser.parse_args(cli_args[1:])

    # Only the first entry yielded by os.walk (the current directory) is used.
    _, hosts, _ = next(os.walk('.')) # type: ignore # https://github.com/python/mypy/issues/465

    certs = {}
    for host in hosts:
        cert_path = os.path.join(host, "cert.pem")
        with open(cert_path) as cert_file:
            cert_contents = cert_file.read()
        key_path = os.path.join(host, "key.pem")
        with open(key_path) as key_file:
            key_contents = key_file.read()
        private_key = OpenSSL.crypto.load_privatekey(
            OpenSSL.crypto.FILETYPE_PEM, key_contents)
        certificate = OpenSSL.crypto.load_certificate(
            OpenSSL.crypto.FILETYPE_PEM, cert_contents)
        certs[host.encode()] = (private_key, certificate)

    listen_address = ('', int(args.port))
    server = TLSSNI01Server(listen_address, certs=certs)
    bound = server.socket.getsockname()[:2]
    logger.info("Serving at https://%s:%s...", *bound)
    if forever:  # pragma: no cover
        server.serve_forever()
    else:
        server.handle_request()


# Script entry point: run the TLSSNI01 server with the process arguments and
# hand its return value to sys.exit.
if __name__ == "__main__":
    exit_status = simple_tls_sni_01_server(sys.argv)  # pragma: no cover
    sys.exit(exit_status)  # pragma: no cover