This file is indexed.

/usr/lib/python3/dist-packages/txfixtures/mongodb.py is in python3-txfixtures 0.2.6-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import pymongo

from fixtures import TempDir

from txfixtures.service import Service

# Pattern template describing one line of mongod log output; it is handed to
# ``setOutputFormat`` by ``MongoDB.__init__``.  The ``{...}`` placeholders are
# presumably substituted by the Service machinery -- confirm against
# txfixtures.service.
#
# Raw string literals are used so that the regex escapes (``\.``, ``\+``,
# ``\[``, ``\]``) are not treated as (invalid) Python string escapes, which
# emit a DeprecationWarning/SyntaxWarning on modern interpreters.  The
# resulting string value is unchanged.
FORMAT = (
    r"{Y}-{m}-{d}T{H}:{M}:{S}\.{msecs}\+0000 {levelname} "
    r"[A-Z]+ +\[{name}\] {message}")


class MongoDB(Service):
    """Start and stop a `mongodb` process in the background.

    After set up, a connected ``pymongo.MongoClient`` is available as
    ``self.client`` and the port the server was told to use as ``self.port``.
    """

    def __init__(self, mongod=b"mongod", args=(), **kwargs):
        """Build the base command line and configure output expectations.

        :param mongod: The ``mongod`` executable to run (bytes).
        :param args: Extra command line arguments, placed right after the
            executable.
        :param kwargs: Forwarded untouched to ``Service.__init__``.
        """
        command = [mongod]
        command.extend(args)
        super(MongoDB, self).__init__(command, **kwargs)

        # Presumably the text mongod logs once it is ready -- the exact
        # semantics of expectOutput live in Service.
        self.expectOutput("waiting for connections on port")
        self.setOutputFormat(FORMAT)

        # Start with no extra client keyword arguments.
        self.setClientKwargs()

    def setClientKwargs(self, **kwargs):
        """Additional keyword arguments to pass to the client."""
        self.clientKwargs = kwargs

    @property
    def port(self):
        """The port recorded on the protocol as the expected one."""
        protocol = self.protocol
        return protocol.expectedPort

    def _setUp(self):
        """Spawn the service, then connect a client to it.

        A port is allocated and a temporary directory is created for the
        database files before delegating to ``Service._setUp``.
        """
        allocated = self.allocatePort()
        self.expectPort(allocated)
        self._dbPath = self.useFixture(TempDir())

        super(MongoDB, self)._setUp()

        uri = "mongodb://localhost:%d" % (self.port,)
        client = pymongo.MongoClient(uri, **self.clientKwargs)
        self.client = client
        self.addCleanup(client.close)

        # XXX Workaround pymongo leaving threads around.
        shutdown = pymongo.periodic_executor._shutdown_executors
        self.addCleanup(shutdown)

    @property
    def _args(self):
        """Full argument list: the base command plus port/dbpath options."""
        arguments = self.command[:]
        port_option = b"--port=%d" % self.port
        dbpath_option = b"--dbpath=%s" % self._dbPath.path.encode("utf-8")
        arguments.extend([port_option, dbpath_option, b"--nojournal"])
        return arguments