/usr/lib/python3/dist-packages/txfixtures/mongodb.py is in python3-txfixtures 0.2.6-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
import pymongo
from fixtures import TempDir
from txfixtures.service import Service
# Regular-expression template handed to ``setOutputFormat`` by ``MongoDB``.
# The ``{...}`` fields are str.format-style placeholders; what they are
# filled with is decided by the Service base class (not visible here).
#
# Raw string literals are used so the regex escapes ``\.``, ``\+``, ``\[``
# and ``\]`` reach the regex engine verbatim without relying on Python's
# deprecated handling of invalid escape sequences in ordinary strings.
FORMAT = (
    r"{Y}-{m}-{d}T{H}:{M}:{S}\.{msecs}\+0000 {levelname} "
    r"[A-Z]+ +\[{name}\] {message}")
class MongoDB(Service):
    """Service fixture that runs `mongod` and exposes a connected client."""

    def __init__(self, mongod=b"mongod", args=(), **kwargs):
        """Assemble the base command line and configure the service.

        :param mongod: Executable to run; becomes the first command element.
        :param args: Extra command-line arguments placed after the executable.
        :param kwargs: Passed through unchanged to ``Service.__init__``.
        """
        argv = [mongod]
        argv.extend(args)
        super(MongoDB, self).__init__(argv, **kwargs)

        self.expectOutput("waiting for connections on port")
        self.setOutputFormat(FORMAT)
        # Begin with no extra client options; callers may set them later.
        self.setClientKwargs()

    def setClientKwargs(self, **kwargs):
        """Record the keyword arguments to hand to ``pymongo.MongoClient``.

        Every call replaces whatever was stored by a previous call.
        """
        self.clientKwargs = kwargs

    @property
    def port(self):
        """The port recorded on the protocol as ``expectedPort``."""
        return self.protocol.expectedPort

    def _setUp(self):
        """Prepare port and data directory, run base set-up, connect a client.

        The client is stored on ``self.client`` and its ``close`` method is
        registered as a cleanup.
        """
        allocated = self.allocatePort()
        self.expectPort(allocated)
        self._dbPath = self.useFixture(TempDir())
        # NOTE(review): presumably the base class launches the process here.
        super(MongoDB, self)._setUp()

        uri = "mongodb://localhost:%d" % self.port
        client = pymongo.MongoClient(uri, **self.clientKwargs)
        self.client = client
        self.addCleanup(client.close)

        # XXX Workaround pymongo leaving threads around.
        self.addCleanup(pymongo.periodic_executor._shutdown_executors)

    @property
    def _args(self):
        """Copy of the base command followed by the per-run mongod options."""
        base = self.command[:]
        port_option = b"--port=%d" % self.port
        dbpath_option = b"--dbpath=%s" % self._dbPath.path.encode("utf-8")
        return base + [port_option, dbpath_option, b"--nojournal"]
|